diff options
author | David Blajda <blajda@hotmail.com> | 2018-12-19 19:06:51 +0000 |
---|---|---|
committer | David Blajda <blajda@hotmail.com> | 2018-12-19 19:06:51 +0000 |
commit | a8e42248cf617767dfd890c0832ea233bb4608fc (patch) | |
tree | 0b74e6638b64eb24f0b29ce9d6195ba26f3d2537 | |
parent | 17893388feed5f91ebd254ac7ad8e2801ca8a6d0 (diff) |
Another Barrier reimplementation
-rw-r--r-- | src/bin/main.rs | 10 | ||||
-rw-r--r-- | src/helix/mod.rs | 105 |
2 files changed, 113 insertions, 2 deletions
diff --git a/src/bin/main.rs b/src/bin/main.rs index 8b46196..c5b7ea2 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -28,12 +28,22 @@ fn main() { () }); + let clip2 = authed_client + .clips() + .clip(&"EnergeticApatheticTarsierThisIsSparta") + .map_err(|err| { + println!("{:?}", err); + () + }); + + /* let clip2 = client.kraken .clip(&"EnergeticApatheticTarsierThisIsSparta") .map_err(|err| { println!("{:?}", err); () }); + */ /* Prevents tokio from **hanging** * since tokio::run blocks the current thread and waits for the entire runtime diff --git a/src/helix/mod.rs b/src/helix/mod.rs index b555069..f7a09f6 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -11,6 +11,7 @@ use std::collections::HashSet; use futures::{Sink, Stream}; use super::error::Error; +use futures::prelude::*; pub struct Namespace<T> { client: Client, @@ -52,12 +53,15 @@ enum AuthState { Auth, } +use futures::future::Shared; + struct MutClientRef { token: Option<String>, scopes: Vec<Scope>, previous: Option<Client>, auth_barrier: Barrier<AuthWaiter>, auth_state: AuthState, + auth_future: Option<Shared<Box<Future<Item=(), Error=()> + Send>>> } use futures::sync::mpsc; @@ -90,6 +94,7 @@ impl Client { scopes: Vec::new(), previous: None, auth_state: AuthState::Auth, + auth_future: None, }) }) } @@ -203,6 +208,7 @@ impl AuthClientBuilder { scopes: Vec::new(), previous: Some(old_client), auth_state: auth_state, + auth_future: None, }) }) @@ -246,7 +252,7 @@ struct RequestRef { enum RequestState<T> { Uninitalized, - WaitAuth(Box<dyn Future<Item=<AuthWaiter as Waiter>::Item, Error=<AuthWaiter as Waiter>::Error> + Send>), + WaitAuth(AuthWaiter2), PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>), } @@ -306,6 +312,95 @@ pub struct RatelimitRef { reset: Option<u32>, } +struct AuthWaiter2 { + waiter: Client, + shared_future: Option<(Shared<Box<Future<Item=(), Error=()> + Send>>)>, + polling: bool, +} + + +use futures::future::{SharedError, SharedItem}; +impl Future for AuthWaiter2 { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + loop { + let blocked = self.blocked(); + if blocked && !self.polling { + println!("not polling and blocked"); + let fut = self.condition(); + self.shared_future = Some(fut); + self.polling = true; + } else if blocked || self.polling { + println!("polling"); + let f = self.shared_future.as_mut().unwrap(); + let r = f.poll(); + if let Ok(Async::NotReady) = r { + return Ok(Async::NotReady); + } + self.polling = false; + } else { + println!("okay"); + return Ok(Async::Ready(())); + } + } + } +} + +impl AuthWaiter2 { + + fn blocked(&self) -> bool { + let mut_client = self.waiter.inner.inner.lock().unwrap(); + mut_client.auth_state == AuthState::Unauth + } + + fn condition(&self) + -> Shared<Box<Future<Item=(), Error=()> + Send>> + { + /*NOTE: careful with the creation of new_condition + * since it may lead to a deadlock + */ + let new_condition = self.condition_future(); + let mut mut_client = self.waiter.inner.inner.lock().unwrap(); + let maybe_future = &mut mut_client.auth_future; + + let new_condition_clone = new_condition.clone(); + let f = maybe_future.get_or_insert_with(|| { + new_condition + }); + + let f = + if let Some(_) = f.peek() { + maybe_future.replace(new_condition_clone); + maybe_future.as_ref().unwrap() + } else { f }; + f.clone() + } + + fn condition_future(&self) -> + Shared<Box<Future<Item=(), Error=()> + Send>> { + let bottom_client = self.waiter.get_bottom_client(); + let secret = self.waiter.inner.secret.as_ref().unwrap(); + let client = self.waiter.clone(); + + let auth_future = + bottom_client + .auth() + .client_credentials(secret) + .map(move |credentials| { + println!("{:?}", credentials); + let mut mut_client = client.inner.inner.lock().unwrap(); + mut_client.auth_state = AuthState::Auth; + mut_client.token = Some(credentials.access_token.clone()); + () + }) + .map_err(|_| ()); + + Future::shared(Box::new(auth_future)) + } +} + use crate::sync::waiter::Waiter; use crate::sync::barrier::{BarrierSync, Barrier}; @@ -405,7 +500,13 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { waiter: self.inner.client.clone() }; - let f = mut_client.auth_barrier.wait_for(waiter); + let waiter2 = AuthWaiter2 { + waiter: self.inner.client.clone(), + shared_future: None, + polling: false, + }; + + let f = waiter2; self.state = RequestState::WaitAuth(f); }, RequestState::WaitAuth(chan) => { |