summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Blajda <blajda@hotmail.com>2018-12-19 19:06:51 +0000
committerDavid Blajda <blajda@hotmail.com>2018-12-19 19:06:51 +0000
commita8e42248cf617767dfd890c0832ea233bb4608fc (patch)
tree0b74e6638b64eb24f0b29ce9d6195ba26f3d2537
parent17893388feed5f91ebd254ac7ad8e2801ca8a6d0 (diff)
Another Barrier reimplementation
-rw-r--r--src/bin/main.rs10
-rw-r--r--src/helix/mod.rs105
2 files changed, 113 insertions, 2 deletions
diff --git a/src/bin/main.rs b/src/bin/main.rs
index 8b46196..c5b7ea2 100644
--- a/src/bin/main.rs
+++ b/src/bin/main.rs
@@ -28,12 +28,22 @@ fn main() {
()
});
+ let clip2 = authed_client
+ .clips()
+ .clip(&"EnergeticApatheticTarsierThisIsSparta")
+ .map_err(|err| {
+ println!("{:?}", err);
+ ()
+ });
+
+ /*
let clip2 = client.kraken
.clip(&"EnergeticApatheticTarsierThisIsSparta")
.map_err(|err| {
println!("{:?}", err);
()
});
+ */
/* Prevents tokio from **hanging**
* since tokio::run blocks the current thread and waits for the entire runtime
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index b555069..f7a09f6 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -11,6 +11,7 @@ use std::collections::HashSet;
use futures::{Sink, Stream};
use super::error::Error;
+use futures::prelude::*;
pub struct Namespace<T> {
client: Client,
@@ -52,12 +53,15 @@ enum AuthState {
Auth,
}
+use futures::future::Shared;
+
struct MutClientRef {
token: Option<String>,
scopes: Vec<Scope>,
previous: Option<Client>,
auth_barrier: Barrier<AuthWaiter>,
auth_state: AuthState,
+ auth_future: Option<Shared<Box<Future<Item=(), Error=()> + Send>>>
}
use futures::sync::mpsc;
@@ -90,6 +94,7 @@ impl Client {
scopes: Vec::new(),
previous: None,
auth_state: AuthState::Auth,
+ auth_future: None,
})
})
}
@@ -203,6 +208,7 @@ impl AuthClientBuilder {
scopes: Vec::new(),
previous: Some(old_client),
auth_state: auth_state,
+ auth_future: None,
})
})
@@ -246,7 +252,7 @@ struct RequestRef {
enum RequestState<T> {
Uninitalized,
- WaitAuth(Box<dyn Future<Item=<AuthWaiter as Waiter>::Item, Error=<AuthWaiter as Waiter>::Error> + Send>),
+ WaitAuth(AuthWaiter2),
PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>),
}
@@ -306,6 +312,95 @@ pub struct RatelimitRef {
reset: Option<u32>,
}
+struct AuthWaiter2 {
+ waiter: Client,
+ shared_future: Option<(Shared<Box<Future<Item=(), Error=()> + Send>>)>,
+ polling: bool,
+}
+
+
+use futures::future::{SharedError, SharedItem};
+impl Future for AuthWaiter2 {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ loop {
+ let blocked = self.blocked();
+ if blocked && !self.polling {
+ println!("not polling and blocked");
+ let fut = self.condition();
+ self.shared_future = Some(fut);
+ self.polling = true;
+ } else if blocked || self.polling {
+ println!("polling");
+ let f = self.shared_future.as_mut().unwrap();
+ let r = f.poll();
+ if let Ok(Async::NotReady) = r {
+ return Ok(Async::NotReady);
+ }
+ self.polling = false;
+ } else {
+ println!("okay");
+ return Ok(Async::Ready(()));
+ }
+ }
+ }
+}
+
+impl AuthWaiter2 {
+
+ fn blocked(&self) -> bool {
+ let mut_client = self.waiter.inner.inner.lock().unwrap();
+ mut_client.auth_state == AuthState::Unauth
+ }
+
+ fn condition(&self)
+ -> Shared<Box<Future<Item=(), Error=()> + Send>>
+ {
+ /*NOTE: careful with the creation of new_condition
+ * since it may lead to a deadlock
+ */
+ let new_condition = self.condition_future();
+ let mut mut_client = self.waiter.inner.inner.lock().unwrap();
+ let maybe_future = &mut mut_client.auth_future;
+
+ let new_condition_clone = new_condition.clone();
+ let f = maybe_future.get_or_insert_with(|| {
+ new_condition
+ });
+
+ let f =
+ if let Some(_) = f.peek() {
+ maybe_future.replace(new_condition_clone);
+ maybe_future.as_ref().unwrap()
+ } else { f };
+ f.clone()
+ }
+
+ fn condition_future(&self) ->
+ Shared<Box<Future<Item=(), Error=()> + Send>> {
+ let bottom_client = self.waiter.get_bottom_client();
+ let secret = self.waiter.inner.secret.as_ref().unwrap();
+ let client = self.waiter.clone();
+
+ let auth_future =
+ bottom_client
+ .auth()
+ .client_credentials(secret)
+ .map(move |credentials| {
+ println!("{:?}", credentials);
+ let mut mut_client = client.inner.inner.lock().unwrap();
+ mut_client.auth_state = AuthState::Auth;
+ mut_client.token = Some(credentials.access_token.clone());
+ ()
+ })
+ .map_err(|_| ());
+
+ Future::shared(Box::new(auth_future))
+ }
+}
+
use crate::sync::waiter::Waiter;
use crate::sync::barrier::{BarrierSync, Barrier};
@@ -405,7 +500,13 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
waiter: self.inner.client.clone()
};
- let f = mut_client.auth_barrier.wait_for(waiter);
+ let waiter2 = AuthWaiter2 {
+ waiter: self.inner.client.clone(),
+ shared_future: None,
+ polling: false,
+ };
+
+ let f = waiter2;
self.state = RequestState::WaitAuth(f);
},
RequestState::WaitAuth(chan) => {