diff options
author | David Blajda <blajda@hotmail.com> | 2018-12-19 16:14:14 +0000 |
---|---|---|
committer | David Blajda <blajda@hotmail.com> | 2018-12-19 16:14:14 +0000 |
commit | 17893388feed5f91ebd254ac7ad8e2801ca8a6d0 (patch) | |
tree | c4fb2710eb264562aa3721dadf5f43b183c6f42d | |
parent | fbee478ad333732982f7e0eecdcc3681a6d71f2f (diff) |
Place barrier and waiters in their own modules
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/error.rs | 11 | ||||
-rw-r--r-- | src/helix/mod.rs | 245 | ||||
-rw-r--r-- | src/lib.rs | 24 | ||||
-rw-r--r-- | src/sync/barrier.rs | 130 | ||||
-rw-r--r-- | src/sync/mod.rs | 44 | ||||
-rw-r--r-- | src/sync/waiter.rs | 13 |
7 files changed, 262 insertions, 206 deletions
@@ -15,3 +15,4 @@ serde_derive = "1.0.81" chrono = { version = "0.4.6", features = ["serde"]} url = "1.7.2" url_serde = "0.2.0" +futures-timer = "0.1.1" diff --git a/src/error.rs b/src/error.rs index 6b9f5d5..20563b1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -39,3 +39,14 @@ impl From<futures::Canceled> for Error { } } } + +use std::sync::mpsc::SendError; + +impl<T> From<SendError<T>> for Error { + + fn from(_err: SendError<T>) -> Error { + Error { + inner: Kind::ClientError("Channel unexpectedly closed".to_owned()) + } + } +} diff --git a/src/helix/mod.rs b/src/helix/mod.rs index b7d05f3..b555069 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -232,10 +232,15 @@ impl AuthClientBuilder { use std::collections::BTreeMap; use reqwest::Method; +struct Request { + inner: Arc<RequestRef>, +} + struct RequestRef { url: String, params: BTreeMap<String, String>, client: Client, + ratelimit: Option<Ratelimit>, method: Method, } @@ -264,6 +269,7 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { params: params, client: client, method: method, + ratelimit: None, }), state: RequestState::Uninitalized } @@ -281,197 +287,27 @@ struct AuthWaiter { waiter: Client, } -//f.barrier(auth).barrier(ratelimit).and_then(|result| {}) -//A ratelimiter must be aware when a limit is hit, the upper limit, -//and remaining requests. (use case specific) -// -//This can be done by either letting the ratelimiter drive the request -//so it can inspect returned headers or by maybe? using a channel to inform -//the limiter -// -//Submit task to ratelimiter. -//Check if the limit is hit and if we are polling -// 1 if we hit the limit and are not polling, add to the queue and start -// polling. -// 1. if we are polling add the request to the queue -// 2. if we are not polling and not locked then -// send the request and increment the in-flight counter. -// -// when the request has completed without errors then decrement -// the in-flight counter, update limiter data, and return the -// result to the requester. -// -// On error, EITHER: -// 1. If the error is rate limiter related place the request -// back in a queue, return other errors. (Prevents starvation) -// 2. Return all errors back to the Requester they can resubmit -// the request -// -// The main difference is that the condition is dependent on the waiter's -// future result. -// -// For auth requests we can use an OkFuture that returns the waiter and never errs -// -// So waiters must provide IntoFuture, a future than can poll the condition, -// and a is locked. -// The lock check must be pure (no side effects) but IntoFuture may -// have side effects (eg. increments in-flight counter) -// -// The result of the IntoFuture is returned to caller or the Err of the poll -// Future. For simplicity these will be the same type. -// -// Should the poll condition trait be located on the Waiter or the Barrier? -// All waiters in a barrier must use the same condition. - -pub trait Waiter { - type Item: Send + 'static; - type Error: From<Self::ConditionError> + From<oneshot::Canceled> + Send + 'static; - type ConditionError: Send + Clone + 'static; - - fn blocked(&self) -> bool; - fn condition_poller(&self) -> Box<Future<Item=(), Error=Self::ConditionError> + Send>; - fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send>; -} -pub trait BarrierSync<W: Waiter> { - fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send>; +pub struct RatelimitWaiter { + limit: Ratelimit, + request: Request, } -pub struct Barrier<W: Waiter> { - //queue: Vec<(W, oneshot::Sender<Result<W::Item, W::Error>>)>, - sink: Option<mpsc::Sender<(W, oneshot::Sender<Result<W::Item, W::Error>>)>>, +#[derive(Debug, Clone)] +pub struct Ratelimit { + inner: Arc<Mutex<RatelimitRef>> } -impl<W: Waiter + 'static + Send> Barrier<W> { - pub fn new() -> Barrier<W> { - - //let f = barrier_rx.for_each(|_| Ok(())).map(|_| ()).map_err(|_| ()); - //tokio::spawn(f); - - Barrier { - sink: None, - } - } - - fn barrier_task(&self, receiver: mpsc::Receiver<(W, oneshot::Sender<Result<W::Item, W::Error>>)>) { - - enum Message<W: Waiter> { - Request((W, oneshot::Sender<Result<<W as Waiter>::Item, <W as Waiter>::Error>>)), - OnCondition(Result<(), <W as Waiter>::ConditionError>), - } - - let mut polling = false; - let (on_condition_tx, on_condition_rx) = mpsc::unbounded(); - let mut waiters = Vec::new(); - let f1 = receiver.map(|request| Message::Request(request)); - let f2 = on_condition_rx.map(|result| Message::OnCondition(result)); - - let inner_condition = on_condition_tx.clone(); - let f = - f1.select(f2).for_each(move |message| { - match message { - Message::Request((waiter, backchan)) => { - if waiter.blocked() && !polling { - println!("locked"); - - let c1 = inner_condition.clone(); - let f = waiter - .condition_poller() - .map(|_| ()) - .then(|result| { - c1.send(result).wait(); - Ok(()) - }); - tokio::spawn(f); - polling = true; - - waiters.push((waiter, backchan)); - } else if waiter.blocked() || polling { - println!("polling"); - waiters.push((waiter, backchan)); - } else { - println!("Pass along waiter!"); - //Execute the waiters future// - //backchan.send(Ok(waiter)); - let f = waiter.into_future() - .then(|res| { - backchan.send(res); - Ok(()) - }); - - tokio::spawn(f); - } - }, - Message::OnCondition(result) => { - polling = false; - /*Resubmit all waiters back to the request channel - * At least one waiter will pass the barrier - */ - match result { - Ok(_) => { - while waiters.len() > 0 { - //Execute the waiters future// - //backchan.send(Ok(waiter)); - let (waiter, backchan) = waiters.pop().unwrap(); - let f = waiter.into_future() - .then(|res| { - backchan.send(res); - Ok(()) - }); - - tokio::spawn(f); - } - }, - Err(err) => { - /* - while waiters.len() > 0 { - let (waiter, backchan) = waiters.pop().unwrap(); - backchan.send(Err(<W as Waiter>::Error::from(err.clone()))); - } - */ - } - } - } - } - - - - Ok(()) - }) - .map(|_| ()) - .map_err(|_| ()); - - tokio::spawn(f); - } +#[derive(Debug, Clone)] +pub struct RatelimitRef { + remaining: i32, + inflight: i32, + quota: i32, + reset: Option<u32>, } -impl<W: Waiter + 'static + Send> BarrierSync<W> for Barrier<W> { - fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send> { - let (resp_tx, resp_rx) = oneshot::channel(); - - if self.sink.is_none() { - let (barrier_tx, barrier_rx) = mpsc::channel(40); - self.barrier_task(barrier_rx); - self.sink.replace(barrier_tx); - } - - let chan = self.sink.as_mut().unwrap(); - - /*Clean this up. join it with f2*/ - let f = chan.clone().send((waiter, resp_tx)).map(|_| ()).map_err(|_| ()); - tokio::spawn(f); - - let f2 = resp_rx.then(|result| { - match result { - Ok(Ok(result)) => Ok(result), - Ok(Err(err)) => Err(err), - Err(err) => Err(W::Error::from(err)), - } - }); - - Box::new(f2) - } -} +use crate::sync::waiter::Waiter; +use crate::sync::barrier::{BarrierSync, Barrier}; impl Waiter for AuthWaiter { type Item = Self; @@ -511,6 +347,47 @@ impl Waiter for AuthWaiter { } } +impl Waiter for RatelimitWaiter { + type Item = reqwest::r#async::Response; + type Error = Error; + type ConditionError = (); + + fn blocked(&self) -> bool { + let limits = self.limit.inner.lock().unwrap(); + limits.remaining - limits.inflight <= 0 + } + + fn condition_poller(&self) + -> Box<Future<Item=(), Error=Self::ConditionError> + Send> + { + /*TODO: Really basic for now*/ + use futures_timer::Delay; + use std::time::Duration; + Box::new( + Delay::new(Duration::from_secs(60)) + .map_err(|_| ()) + ) + } + + fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> { + let client = &self.request.inner.client; + let reqwest = client.client(); + let method = &self.request.inner.method; + let url = &self.request.inner.url; + let params = &self.request.inner.params; + + let builder = reqwest.request(method.clone(), url); + let builder = client.apply_standard_headers(builder); + let r = builder.query(params); + + let limits = &self.limit.clone(); + + /* TODO update limits */ + Box::new(r.send().map_err(|err| Error::from(err))) + } + +} + /* Todo: If the polled futures returns an error than all the waiters should * get that error */ @@ -10,15 +10,13 @@ pub mod helix; pub mod kraken; pub mod types; pub mod error; +pub mod sync; + pub use self::helix::Client as HelixClient; pub use self::kraken::Client as KrakenClient; use reqwest::r#async::Client as ReqwestClient; -use reqwest::header::HeaderMap; -use std::marker::PhantomData; -use std::sync::Arc; -use std::collections::BTreeMap; pub struct Client { pub helix: HelixClient, @@ -34,21 +32,3 @@ impl Client { } } } - -trait Request<T> { - fn url(&self) -> String; - fn headers(&self) -> &HeaderMap; - fn query(&self) -> &BTreeMap<String, String>; - fn returns(&self) -> T; -} - -pub struct GetRequest<T> { - inner: Arc<GetRequestRef<T>>, -} - -struct GetRequestRef<T> { - url: String, -// headers: HeaderMap, -// query: BTreeMap<String, String>, - returns: PhantomData<T>, -} diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs new file mode 100644 index 0000000..7e53b12 --- /dev/null +++ b/src/sync/barrier.rs @@ -0,0 +1,130 @@ +use super::waiter::Waiter; +use futures::sync::mpsc; +use futures::sync::oneshot; +use futures::prelude::*; + +pub trait BarrierSync<W: Waiter> { + fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send>; +} + +pub struct Barrier<W: Waiter> { + sink: Option<mpsc::Sender<(W, oneshot::Sender<Result<W::Item, W::Error>>)>>, +} + +impl<W: Waiter + 'static + Send> BarrierSync<W> for Barrier<W> { + fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send> { + let (resp_tx, resp_rx) = oneshot::channel(); + + if self.sink.is_none() { + let (barrier_tx, barrier_rx) = mpsc::channel(40); + self.barrier_task(barrier_rx); + self.sink.replace(barrier_tx); + } + + let chan = self.sink.as_mut().unwrap().clone(); + + /*TODO: I want meaningful error types... */ + let f1 = chan + .send((waiter, resp_tx)) + .map_err(|err| W::Error::from(())) + .and_then(|_| { + resp_rx.then(|result| { + match result { + Ok(Ok(result)) => Ok(result), + Ok(Err(err)) => Err(err), + Err(err) => Err(W::Error::from(())), + } + }) + }); + + Box::new(f1) + } +} + +impl<W: Waiter + 'static + Send> Barrier<W> { + pub fn new() -> Barrier<W> { + Barrier { + sink: None, + } + } + + fn barrier_task(&self, receiver: mpsc::Receiver<(W, oneshot::Sender<Result<W::Item, W::Error>>)>) { + + enum Message<W: Waiter> { + Request((W, oneshot::Sender<Result<<W as Waiter>::Item, <W as Waiter>::Error>>)), + OnCondition(Result<(), <W as Waiter>::ConditionError>), + } + + let mut polling = false; + let (on_condition_tx, on_condition_rx) = mpsc::unbounded(); + let mut waiters = Vec::new(); + let f1 = receiver.map(|request| Message::Request(request)); + let f2 = on_condition_rx.map(|result| Message::OnCondition(result)); + + let inner_condition = on_condition_tx.clone(); + let f = + f1.select(f2).for_each(move |message| { + match message { + Message::Request((waiter, backchan)) => { + if waiter.blocked() && !polling { + println!("locked"); + + let c1 = inner_condition.clone(); + let f = waiter + .condition_poller() + .map(|_| ()) + .then(|result| { + c1.send(result).wait(); + Ok(()) + }); + tokio::spawn(f); + polling = true; + + waiters.push((waiter, backchan)); + } else if waiter.blocked() || polling { + println!("polling"); + waiters.push((waiter, backchan)); + } else { + println!("Pass along waiter!"); + let f = waiter.into_future() + .then(|res| { + backchan.send(res); + Ok(()) + }); + + tokio::spawn(f); + } + }, + Message::OnCondition(result) => { + polling = false; + /*Resubmit all waiters back to the request channel + * At least one waiter will pass the barrier + */ + match result { + Ok(_) => { + while waiters.len() > 0 { + let (waiter, backchan) = waiters.pop().unwrap(); + let f = waiter.into_future() + .then(|res| { + backchan.send(res); + Ok(()) + }); + + tokio::spawn(f); + } + }, + _ => { panic!("condition channel closed") } + } + } + } + + + + Ok(()) + }) + .map(|_| ()) + .map_err(|_| ()); + + tokio::spawn(f); + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs new file mode 100644 index 0000000..ca06d32 --- /dev/null +++ b/src/sync/mod.rs @@ -0,0 +1,44 @@ +//f.barrier(auth).barrier(ratelimit).and_then(|result| {}) +//A ratelimiter must be aware when a limit is hit, the upper limit, +//and remaining requests. (use case specific) +// +//This can be done by either letting the ratelimiter drive the request +//so it can inspect returned headers or by maybe? using a channel to inform +//the limiter +// +//Submit task to ratelimiter. +//Check if the limit is hit and if we are polling +// 1 if we hit the limit and are not polling, add to the queue and start +// polling. +// 2. if we are polling add the request to the queue +// 3. if we are not polling and not locked then +// send the request and increment the in-flight counter. +// +// when the request has completed without errors then decrement +// the in-flight counter, update limiter data, and return the +// result to the requester. +// +// On error, EITHER: +// 1. If the error is rate limiter related place the request +// back in a queue, return other errors. (Prevents starvation) +// 2. Return all errors back to the Requester they can resubmit +// the request +// +// The main difference is that the condition is dependent on the waiter's +// future result. +// +// For auth requests we can use an OkFuture that returns the waiter and never errs +// +// So waiters must provide IntoFuture, a future than can poll the condition, +// and a is locked. +// The lock check must be pure (no side effects) but IntoFuture may +// have side effects (eg. increments in-flight counter) +// +// The result of the IntoFuture is returned to caller or the Err of the poll +// Future. For simplicity these will be the same type. +// +// Should the poll condition trait be located on the Waiter or the Barrier? +// All waiters in a barrier must use the same condition. + +pub mod barrier; +pub mod waiter; diff --git a/src/sync/waiter.rs b/src/sync/waiter.rs new file mode 100644 index 0000000..656c42e --- /dev/null +++ b/src/sync/waiter.rs @@ -0,0 +1,13 @@ +use futures::sync::oneshot; +use futures::Future; + +pub trait Waiter { + type Item: Send + 'static; + type Error: From<Self::ConditionError> + + From<oneshot::Canceled> + From<()> + Send + 'static; + type ConditionError: Send + Clone + 'static; + + fn blocked(&self) -> bool; + fn condition_poller(&self) -> Box<Future<Item=(), Error=Self::ConditionError> + Send>; + fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send>; +} |