diff options
author | David Blajda <blajda@hotmail.com> | 2018-12-20 00:09:49 +0000 |
---|---|---|
committer | David Blajda <blajda@hotmail.com> | 2018-12-20 00:09:49 +0000 |
commit | 61507ec1fa61fe134547a7eac9541375c6215b33 (patch) | |
tree | cf7fb84300454680dd0acd06434fa23a71d6211f | |
parent | a8e42248cf617767dfd890c0832ea233bb4608fc (diff) |
Replace channel based barrier with shared future based barrier
-rw-r--r-- | src/error.rs | 23 | ||||
-rw-r--r-- | src/helix/mod.rs | 340 | ||||
-rw-r--r-- | src/helix/namespaces/auth.rs | 2 | ||||
-rw-r--r-- | src/helix/namespaces/clips.rs | 3 | ||||
-rw-r--r-- | src/lib.rs | 1 | ||||
-rw-r--r-- | src/sync/barrier.rs | 143 | ||||
-rw-r--r-- | src/sync/waiter.rs | 12 |
7 files changed, 246 insertions, 278 deletions
diff --git a/src/error.rs b/src/error.rs index 20563b1..88f2713 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,20 @@ use reqwest::Error as ReqwestError; +use futures::future::SharedError; use std::convert::From; +/*TODO: How should condition errors be handled? + * Ultimately the future must resolve so if the condition + * errs than all it's waiters must err. + */ +#[derive(Clone, Debug)] +pub struct ConditionError{} + +impl From<SharedError<ConditionError>> for ConditionError { + fn from(other: SharedError<ConditionError>) -> Self { + ConditionError{} + } +} + #[derive(Debug)] enum Kind { Reqwest(ReqwestError), @@ -50,3 +64,12 @@ impl<T> From<SendError<T>> for Error { } } } + +impl From<ConditionError> for Error { + + fn from(_err: ConditionError) -> Error { + Error { + inner: Kind::ClientError("Oneshot channel unexpectedly closed".to_owned()) + } + } +} diff --git a/src/helix/mod.rs b/src/helix/mod.rs index f7a09f6..c0dd2a9 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -1,17 +1,23 @@ use futures::future::Future; use std::sync::{Arc, Mutex}; use reqwest::r#async::Client as ReqwestClient; -pub use super::types; +use std::collections::HashSet; +use super::error::Error; use std::marker::PhantomData; -pub mod models; -pub mod namespaces; +use futures::future::Shared; +use futures::Poll; +use serde::de::DeserializeOwned; +use futures::Async; +use futures::try_ready; -use std::collections::HashSet; -use futures::{Sink, Stream}; +use crate::error::ConditionError; -use super::error::Error; -use futures::prelude::*; + +pub use super::types; + +pub mod models; +pub mod namespaces; pub struct Namespace<T> { client: Client, @@ -44,33 +50,27 @@ pub struct Client { inner: Arc<ClientRef>, } -use reqwest::r#async::Response; -use futures::sync::oneshot; - #[derive(Clone, PartialEq)] enum AuthState { Unauth, Auth, } -use futures::future::Shared; struct MutClientRef { token: Option<String>, scopes: Vec<Scope>, previous: Option<Client>, - auth_barrier: Barrier<AuthWaiter>, auth_state: AuthState, - auth_future: Option<Shared<Box<Future<Item=(), Error=()> + Send>>> + auth_future: Option<Shared<Box<Future<Item=(), Error=ConditionError> + Send>>> } -use futures::sync::mpsc; - - struct ClientRef { id: String, secret: Option<String>, client: ReqwestClient, + auth_barrier: Barrier, + ratelimit_default: Ratelimit, inner: Mutex<MutClientRef>, } @@ -80,6 +80,10 @@ impl Client { Client::new_with_client(id, client) } + pub fn default_ratelimit(&self) -> Ratelimit { + self.inner.ratelimit_default.clone() + } + pub fn new_with_client(id: &str, client: ReqwestClient) -> Client { Client { @@ -87,9 +91,10 @@ impl Client { id: id.to_owned(), client: client, secret: None, + auth_barrier: Barrier::new(), + ratelimit_default: Ratelimit::new(30, "Ratelimit-Limit", "Ratelimit-Remaining", "Ratelimit-Reset"), inner: Mutex::new( MutClientRef { - auth_barrier: Barrier::new(), token: None, scopes: Vec::new(), previous: None, @@ -201,9 +206,10 @@ impl AuthClientBuilder { id: old_client.inner.id.clone(), client: old_client.inner.client.clone(), secret: Some(self.secret), + auth_barrier: Barrier::new(), + ratelimit_default: old_client.default_ratelimit(), inner: Mutex::new ( MutClientRef { - auth_barrier: Barrier::new(), token: self.token, scopes: Vec::new(), previous: Some(old_client), @@ -252,7 +258,9 @@ struct RequestRef { enum RequestState<T> { Uninitalized, - WaitAuth(AuthWaiter2), + WaitAuth(WaiterState<AuthWaiter>), + WaitLimit(WaiterState<RatelimitWaiter>), + WaitRequest, PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>), } @@ -267,6 +275,7 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { params: BTreeMap<String, String>, client: Client, method: Method, + ratelimit: Option<Ratelimit>, ) -> ApiRequest<T> { ApiRequest { @@ -275,7 +284,7 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { params: params, client: client, method: method, - ratelimit: None, + ratelimit: ratelimit, }), state: RequestState::Uninitalized } @@ -283,140 +292,113 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { } -use futures::Poll; -use serde::de::DeserializeOwned; -use futures::Async; -use futures::try_ready; - - -struct AuthWaiter { - waiter: Client, -} - - pub struct RatelimitWaiter { limit: Ratelimit, - request: Request, } -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct Ratelimit { - inner: Arc<Mutex<RatelimitRef>> + inner: Arc<Mutex<RatelimitRef>>, + barrier: Barrier, +} + +impl Ratelimit { + pub fn new(limit: i32, + header_limit: &str, + header_remaining: &str, + header_reset: &str) + -> Ratelimit + { + Ratelimit { + inner: Arc::new( + Mutex::new( + RatelimitRef { + limit: limit, + remaining: limit, + inflight: 0, + reset: None, + header_limit: header_limit.to_owned(), + header_remaining: header_remaining.to_owned(), + header_reset: header_reset.to_owned(), + } + ) + ), + barrier: Barrier::new(), + } + } } #[derive(Debug, Clone)] pub struct RatelimitRef { + limit: i32, remaining: i32, inflight: i32, - quota: i32, reset: Option<u32>, + header_limit: String, + header_remaining: String, + header_reset: String, } -struct AuthWaiter2 { - waiter: Client, - shared_future: Option<(Shared<Box<Future<Item=(), Error=()> + Send>>)>, +use futures::future::SharedError; +use crate::sync::barrier::Barrier; +use crate::sync::waiter::Waiter; + +struct WaiterState<W: Waiter> { polling: bool, + shared_future: Option<(Shared<Box< Future<Item=(), Error=ConditionError> + Send>>)>, + waiter: W, + barrier: Barrier, } +impl<W: Waiter> WaiterState<W> { + fn new(waiter: W, barrier: &Barrier) -> WaiterState<W> { + WaiterState { + polling: false, + shared_future: None, + waiter: waiter, + barrier: barrier.clone(), + } + } +} -use futures::future::{SharedError, SharedItem}; -impl Future for AuthWaiter2 { - type Item = (); - type Error = (); +impl<W: Waiter> Future for WaiterState<W> { + type Item = <W as Waiter>::Item; + type Error = <W as Waiter>::Error; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { loop { - let blocked = self.blocked(); + let blocked = self.waiter.blocked(); if blocked && !self.polling { - println!("not polling and blocked"); - let fut = self.condition(); + let fut = self.barrier.condition(&self.waiter); self.shared_future = Some(fut); self.polling = true; } else if blocked || self.polling { - println!("polling"); let f = self.shared_future.as_mut().unwrap(); - let r = f.poll(); - if let Ok(Async::NotReady) = r { - return Ok(Async::NotReady); - } + try_ready!(f.poll()); self.polling = false; } else { - println!("okay"); - return Ok(Async::Ready(())); + return Ok(Async::Ready(<W as Waiter>::Item::default())); } } } } -impl AuthWaiter2 { - - fn blocked(&self) -> bool { - let mut_client = self.waiter.inner.inner.lock().unwrap(); - mut_client.auth_state == AuthState::Unauth - } - - fn condition(&self) - -> Shared<Box<Future<Item=(), Error=()> + Send>> - { - /*NOTE: careful with the creation of new_condition - * since it may lead to a deadlock - */ - let new_condition = self.condition_future(); - let mut mut_client = self.waiter.inner.inner.lock().unwrap(); - let maybe_future = &mut mut_client.auth_future; - - let new_condition_clone = new_condition.clone(); - let f = maybe_future.get_or_insert_with(|| { - new_condition - }); - - let f = - if let Some(_) = f.peek() { - maybe_future.replace(new_condition_clone); - maybe_future.as_ref().unwrap() - } else { f }; - f.clone() - } - - fn condition_future(&self) -> - Shared<Box<Future<Item=(), Error=()> + Send>> { - let bottom_client = self.waiter.get_bottom_client(); - let secret = self.waiter.inner.secret.as_ref().unwrap(); - let client = self.waiter.clone(); - let auth_future = - bottom_client - .auth() - .client_credentials(secret) - .map(move |credentials| { - println!("{:?}", credentials); - let mut mut_client = client.inner.inner.lock().unwrap(); - mut_client.auth_state = AuthState::Auth; - mut_client.token = Some(credentials.access_token.clone()); - () - }) - .map_err(|_| ()); - - Future::shared(Box::new(auth_future)) - } +struct AuthWaiter { + waiter: Client, } -use crate::sync::waiter::Waiter; -use crate::sync::barrier::{BarrierSync, Barrier}; - impl Waiter for AuthWaiter { - type Item = Self; - type Error = Error; - type ConditionError = (); + type Item = (); + type Error = ConditionError; fn blocked(&self) -> bool { let mut_client = self.waiter.inner.inner.lock().unwrap(); mut_client.auth_state == AuthState::Unauth } - fn condition_poller(&self) - -> Box<Future<Item=(), Error=Self::ConditionError> + Send> - { + fn condition(&self) -> + Shared<Box<Future<Item=(), Error=ConditionError> + Send>> { let bottom_client = self.waiter.get_bottom_client(); let secret = self.waiter.inner.secret.as_ref().unwrap(); let client = self.waiter.clone(); @@ -432,55 +414,32 @@ impl Waiter for AuthWaiter { mut_client.token = Some(credentials.access_token.clone()); () }) - .map_err(|_| ()); - - Box::new(auth_future) - } + .map_err(|_| ConditionError{}); - fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> { - Box::new(futures::future::ok(self)) + Future::shared(Box::new(auth_future)) } } impl Waiter for RatelimitWaiter { - type Item = reqwest::r#async::Response; - type Error = Error; - type ConditionError = (); + type Item = (); + type Error = ConditionError; fn blocked(&self) -> bool { let limits = self.limit.inner.lock().unwrap(); + println!("{}, {}, {}", limits.limit, limits.remaining, limits.inflight); limits.remaining - limits.inflight <= 0 } - fn condition_poller(&self) - -> Box<Future<Item=(), Error=Self::ConditionError> + Send> + fn condition(&self) + -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>> { /*TODO: Really basic for now*/ use futures_timer::Delay; use std::time::Duration; - Box::new( - Delay::new(Duration::from_secs(60)) - .map_err(|_| ()) - ) + Future::shared(Box::new( + Delay::new(Duration::from_secs(60)).map_err(|_| ConditionError{}) + )) } - - fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> { - let client = &self.request.inner.client; - let reqwest = client.client(); - let method = &self.request.inner.method; - let url = &self.request.inner.url; - let params = &self.request.inner.params; - - let builder = reqwest.request(method.clone(), url); - let builder = client.apply_standard_headers(builder); - let r = builder.query(params); - - let limits = &self.limit.clone(); - - /* TODO update limits */ - Box::new(r.send().map_err(|err| Error::from(err))) - } - } /* Todo: If the polled futures returns an error than all the waiters should @@ -495,33 +454,101 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { loop { match &mut self.state { RequestState::Uninitalized => { - let mut mut_client = self.inner.client.inner.inner.lock().unwrap(); - let waiter = AuthWaiter { - waiter: self.inner.client.clone() - }; + let mut_client = self.inner.client.inner.inner.lock().unwrap(); - let waiter2 = AuthWaiter2 { + let waiter = AuthWaiter { waiter: self.inner.client.clone(), - shared_future: None, - polling: false, }; - let f = waiter2; + let f = WaiterState::new(waiter, + &self.inner.client.inner.auth_barrier); self.state = RequestState::WaitAuth(f); }, - RequestState::WaitAuth(chan) => { - let _waiter = try_ready!(chan.poll()); - + RequestState::WaitAuth(auth) => { + let _waiter = try_ready!(auth.poll()); + match self.inner.ratelimit { + Some(ref limit) => { + let barrier = limit.barrier.clone(); + let waiter = RatelimitWaiter { + limit: limit.clone(), + }; + let f = WaiterState::new(waiter, + &barrier); + self.state = RequestState::WaitLimit(f); + }, + None => { + self.state = RequestState::WaitRequest; + } + } + }, + RequestState::WaitLimit(limit) => { + let _waiter = try_ready!(limit.poll()); + self.state = RequestState::WaitRequest; + }, + RequestState::WaitRequest => { let client = &self.inner.client; let reqwest = client.client(); + if let Some(limits) = &self.inner.ratelimit { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight + 1; + } + let builder = reqwest.request(self.inner.method.clone(), &self.inner.url); let builder = client.apply_standard_headers(builder); let r = builder.query(&self.inner.params); + /*TODO add 1 to inflight*/ + + let ratelimit_err = self.inner.ratelimit.clone(); + let ratelimit_ok = self.inner.ratelimit.clone(); let f = r.send() + .map_err(|err| { + + if let Some(limits) = ratelimit_err { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight - 1; + } + + err + }) .map(|mut response| { println!("{:?}", response); + if let Some(limits) = ratelimit_ok { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight - 1; + + let maybe_limit = + response.headers() + .get(&mut_limits.header_limit) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::<i32>().ok()); + + if let Some(limit) = maybe_limit { + mut_limits.limit = limit; + } + + let maybe_remaining = + response.headers() + .get(&mut_limits.header_limit) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::<i32>().ok()); + + if let Some(limit) = maybe_remaining { + mut_limits.remaining = limit; + } + + let maybe_reset = + response.headers() + .get(&mut_limits.header_limit) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::<u32>().ok()); + + if let Some(reset) = maybe_reset { + mut_limits.reset = Some(reset); + } + } + response.json::<T>() }) .and_then(|json| { @@ -529,8 +556,7 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { }); self.state = RequestState::PollParse(Box::new(f)); - continue; - } + }, RequestState::PollParse(future) => { let res = try_ready!(future.poll()); return Ok(Async::Ready(res)); diff --git a/src/helix/namespaces/auth.rs b/src/helix/namespaces/auth.rs index 8ad7956..5efc0fe 100644 --- a/src/helix/namespaces/auth.rs +++ b/src/helix/namespaces/auth.rs @@ -39,5 +39,5 @@ pub fn client_credentials(client: Client, secret: &str) params.insert("grant_type".to_owned(), "client_credentials".to_owned()); params.insert("scope".to_owned(), "".to_owned()); - ApiRequest::new(url, params, client, Method::POST) + ApiRequest::new(url, params, client, Method::POST, None) } diff --git a/src/helix/namespaces/clips.rs b/src/helix/namespaces/clips.rs index ac820e8..083e5c4 100644 --- a/src/helix/namespaces/clips.rs +++ b/src/helix/namespaces/clips.rs @@ -33,6 +33,7 @@ pub fn clip(client: Client, id: &str) API_DOMAIN + "/helix/clips" + "?id=" + id; let params = BTreeMap::new(); + let limit = client.default_ratelimit(); - ApiRequest::new(url, params, client, Method::GET) + ApiRequest::new(url, params, client, Method::GET, Some(limit)) } @@ -1,5 +1,6 @@ #![recursion_limit="128"] #![feature(option_replace)] +#![feature(associated_type_defaults)] extern crate futures; extern crate reqwest; extern crate serde; diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs index 7e53b12..c0f21b8 100644 --- a/src/sync/barrier.rs +++ b/src/sync/barrier.rs @@ -1,130 +1,47 @@ use super::waiter::Waiter; -use futures::sync::mpsc; -use futures::sync::oneshot; use futures::prelude::*; +use std::sync::{Arc, Mutex}; +use futures::future::{Shared, SharedError}; -pub trait BarrierSync<W: Waiter> { - fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send>; -} +use crate::error::ConditionError; -pub struct Barrier<W: Waiter> { - sink: Option<mpsc::Sender<(W, oneshot::Sender<Result<W::Item, W::Error>>)>>, +#[derive(Clone)] +pub struct Barrier { + inner: Arc<Mutex<BarrierRef>>, } -impl<W: Waiter + 'static + Send> BarrierSync<W> for Barrier<W> { - fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send> { - let (resp_tx, resp_rx) = oneshot::channel(); - - if self.sink.is_none() { - let (barrier_tx, barrier_rx) = mpsc::channel(40); - self.barrier_task(barrier_rx); - self.sink.replace(barrier_tx); - } - - let chan = self.sink.as_mut().unwrap().clone(); - - /*TODO: I want meaningful error types... */ - let f1 = chan - .send((waiter, resp_tx)) - .map_err(|err| W::Error::from(())) - .and_then(|_| { - resp_rx.then(|result| { - match result { - Ok(Ok(result)) => Ok(result), - Ok(Err(err)) => Err(err), - Err(err) => Err(W::Error::from(())), - } - }) - }); - - Box::new(f1) - } +struct BarrierRef { + condition: Option<Shared<Box<Future<Item=(), Error=ConditionError> + Send>>> } -impl<W: Waiter + 'static + Send> Barrier<W> { - pub fn new() -> Barrier<W> { +impl Barrier { + + pub fn new() -> Barrier { Barrier { - sink: None, + inner: Arc::new(Mutex::new( + BarrierRef { + condition: None, + })) } } - fn barrier_task(&self, receiver: mpsc::Receiver<(W, oneshot::Sender<Result<W::Item, W::Error>>)>) { - - enum Message<W: Waiter> { - Request((W, oneshot::Sender<Result<<W as Waiter>::Item, <W as Waiter>::Error>>)), - OnCondition(Result<(), <W as Waiter>::ConditionError>), - } + pub fn condition(&self, waiter: &impl Waiter) + -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>> + { + let mut mut_barrier = self.inner.lock().unwrap(); + let maybe_condition = &mut mut_barrier.condition; - let mut polling = false; - let (on_condition_tx, on_condition_rx) = mpsc::unbounded(); - let mut waiters = Vec::new(); - let f1 = receiver.map(|request| Message::Request(request)); - let f2 = on_condition_rx.map(|result| Message::OnCondition(result)); + let f = maybe_condition.get_or_insert_with(|| { + waiter.condition() + }); - let inner_condition = on_condition_tx.clone(); let f = - f1.select(f2).for_each(move |message| { - match message { - Message::Request((waiter, backchan)) => { - if waiter.blocked() && !polling { - println!("locked"); - - let c1 = inner_condition.clone(); - let f = waiter - .condition_poller() - .map(|_| ()) - .then(|result| { - c1.send(result).wait(); - Ok(()) - }); - tokio::spawn(f); - polling = true; - - waiters.push((waiter, backchan)); - } else if waiter.blocked() || polling { - println!("polling"); - waiters.push((waiter, backchan)); - } else { - println!("Pass along waiter!"); - let f = waiter.into_future() - .then(|res| { - backchan.send(res); - Ok(()) - }); - - tokio::spawn(f); - } - }, - Message::OnCondition(result) => { - polling = false; - /*Resubmit all waiters back to the request channel - * At least one waiter will pass the barrier - */ - match result { - Ok(_) => { - while waiters.len() > 0 { - let (waiter, backchan) = waiters.pop().unwrap(); - let f = waiter.into_future() - .then(|res| { - backchan.send(res); - Ok(()) - }); - - tokio::spawn(f); - } - }, - _ => { panic!("condition channel closed") } - } - } - } - - - - Ok(()) - }) - .map(|_| ()) - .map_err(|_| ()); - - tokio::spawn(f); + if let Some(_) = f.peek() { + let condition = waiter.condition(); + maybe_condition.replace(condition); + maybe_condition.as_ref().unwrap() + } else { f }; + f.clone() } } + diff --git a/src/sync/waiter.rs b/src/sync/waiter.rs index 656c42e..8039280 100644 --- a/src/sync/waiter.rs +++ b/src/sync/waiter.rs @@ -1,13 +1,13 @@ use futures::sync::oneshot; use futures::Future; +use futures::future::{Shared, SharedError}; +use crate::error::ConditionError; pub trait Waiter { - type Item: Send + 'static; - type Error: From<Self::ConditionError> - + From<oneshot::Canceled> + From<()> + Send + 'static; - type ConditionError: Send + Clone + 'static; + type Item: Default; + type Error: From<SharedError<ConditionError>>; fn blocked(&self) -> bool; - fn condition_poller(&self) -> Box<Future<Item=(), Error=Self::ConditionError> + Send>; - fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send>; + fn condition(&self) + -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>>; } |