summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Blajda <blajda@hotmail.com>2018-12-19 16:14:14 +0000
committerDavid Blajda <blajda@hotmail.com>2018-12-19 16:14:14 +0000
commit17893388feed5f91ebd254ac7ad8e2801ca8a6d0 (patch)
treec4fb2710eb264562aa3721dadf5f43b183c6f42d
parentfbee478ad333732982f7e0eecdcc3681a6d71f2f (diff)
Place barrier and waiters in their own modules
-rw-r--r--Cargo.toml1
-rw-r--r--src/error.rs11
-rw-r--r--src/helix/mod.rs245
-rw-r--r--src/lib.rs24
-rw-r--r--src/sync/barrier.rs130
-rw-r--r--src/sync/mod.rs44
-rw-r--r--src/sync/waiter.rs13
7 files changed, 262 insertions, 206 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 58e4fc6..4ba5dbd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,3 +15,4 @@ serde_derive = "1.0.81"
chrono = { version = "0.4.6", features = ["serde"]}
url = "1.7.2"
url_serde = "0.2.0"
+futures-timer = "0.1.1"
diff --git a/src/error.rs b/src/error.rs
index 6b9f5d5..20563b1 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -39,3 +39,14 @@ impl From<futures::Canceled> for Error {
}
}
}
+
+use std::sync::mpsc::SendError;
+
+impl<T> From<SendError<T>> for Error {
+
+ fn from(_err: SendError<T>) -> Error {
+ Error {
+ inner: Kind::ClientError("Channel unexpectedly closed".to_owned())
+ }
+ }
+}
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index b7d05f3..b555069 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -232,10 +232,15 @@ impl AuthClientBuilder {
use std::collections::BTreeMap;
use reqwest::Method;
+struct Request {
+ inner: Arc<RequestRef>,
+}
+
struct RequestRef {
url: String,
params: BTreeMap<String, String>,
client: Client,
+ ratelimit: Option<Ratelimit>,
method: Method,
}
@@ -264,6 +269,7 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
params: params,
client: client,
method: method,
+ ratelimit: None,
}),
state: RequestState::Uninitalized
}
@@ -281,197 +287,27 @@ struct AuthWaiter {
waiter: Client,
}
-//f.barrier(auth).barrier(ratelimit).and_then(|result| {})
-//A ratelimiter must be aware when a limit is hit, the upper limit,
-//and remaining requests. (use case specific)
-//
-//This can be done by either letting the ratelimiter drive the request
-//so it can inspect returned headers or by maybe? using a channel to inform
-//the limiter
-//
-//Submit task to ratelimiter.
-//Check if the limit is hit and if we are polling
-// 1 if we hit the limit and are not polling, add to the queue and start
-// polling.
-// 1. if we are polling add the request to the queue
-// 2. if we are not polling and not locked then
-// send the request and increment the in-flight counter.
-//
-// when the request has completed without errors then decrement
-// the in-flight counter, update limiter data, and return the
-// result to the requester.
-//
-// On error, EITHER:
-// 1. If the error is rate limiter related place the request
-// back in a queue, return other errors. (Prevents starvation)
-// 2. Return all errors back to the Requester they can resubmit
-// the request
-//
-// The main difference is that the condition is dependent on the waiter's
-// future result.
-//
-// For auth requests we can use an OkFuture that returns the waiter and never errs
-//
-// So waiters must provide IntoFuture, a future than can poll the condition,
-// and a is locked.
-// The lock check must be pure (no side effects) but IntoFuture may
-// have side effects (eg. increments in-flight counter)
-//
-// The result of the IntoFuture is returned to caller or the Err of the poll
-// Future. For simplicity these will be the same type.
-//
-// Should the poll condition trait be located on the Waiter or the Barrier?
-// All waiters in a barrier must use the same condition.
-
-pub trait Waiter {
- type Item: Send + 'static;
- type Error: From<Self::ConditionError> + From<oneshot::Canceled> + Send + 'static;
- type ConditionError: Send + Clone + 'static;
-
- fn blocked(&self) -> bool;
- fn condition_poller(&self) -> Box<Future<Item=(), Error=Self::ConditionError> + Send>;
- fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send>;
-}
-pub trait BarrierSync<W: Waiter> {
- fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send>;
+pub struct RatelimitWaiter {
+ limit: Ratelimit,
+ request: Request,
}
-pub struct Barrier<W: Waiter> {
- //queue: Vec<(W, oneshot::Sender<Result<W::Item, W::Error>>)>,
- sink: Option<mpsc::Sender<(W, oneshot::Sender<Result<W::Item, W::Error>>)>>,
+#[derive(Debug, Clone)]
+pub struct Ratelimit {
+ inner: Arc<Mutex<RatelimitRef>>
}
-impl<W: Waiter + 'static + Send> Barrier<W> {
- pub fn new() -> Barrier<W> {
-
- //let f = barrier_rx.for_each(|_| Ok(())).map(|_| ()).map_err(|_| ());
- //tokio::spawn(f);
-
- Barrier {
- sink: None,
- }
- }
-
- fn barrier_task(&self, receiver: mpsc::Receiver<(W, oneshot::Sender<Result<W::Item, W::Error>>)>) {
-
- enum Message<W: Waiter> {
- Request((W, oneshot::Sender<Result<<W as Waiter>::Item, <W as Waiter>::Error>>)),
- OnCondition(Result<(), <W as Waiter>::ConditionError>),
- }
-
- let mut polling = false;
- let (on_condition_tx, on_condition_rx) = mpsc::unbounded();
- let mut waiters = Vec::new();
- let f1 = receiver.map(|request| Message::Request(request));
- let f2 = on_condition_rx.map(|result| Message::OnCondition(result));
-
- let inner_condition = on_condition_tx.clone();
- let f =
- f1.select(f2).for_each(move |message| {
- match message {
- Message::Request((waiter, backchan)) => {
- if waiter.blocked() && !polling {
- println!("locked");
-
- let c1 = inner_condition.clone();
- let f = waiter
- .condition_poller()
- .map(|_| ())
- .then(|result| {
- c1.send(result).wait();
- Ok(())
- });
- tokio::spawn(f);
- polling = true;
-
- waiters.push((waiter, backchan));
- } else if waiter.blocked() || polling {
- println!("polling");
- waiters.push((waiter, backchan));
- } else {
- println!("Pass along waiter!");
- //Execute the waiters future//
- //backchan.send(Ok(waiter));
- let f = waiter.into_future()
- .then(|res| {
- backchan.send(res);
- Ok(())
- });
-
- tokio::spawn(f);
- }
- },
- Message::OnCondition(result) => {
- polling = false;
- /*Resubmit all waiters back to the request channel
- * At least one waiter will pass the barrier
- */
- match result {
- Ok(_) => {
- while waiters.len() > 0 {
- //Execute the waiters future//
- //backchan.send(Ok(waiter));
- let (waiter, backchan) = waiters.pop().unwrap();
- let f = waiter.into_future()
- .then(|res| {
- backchan.send(res);
- Ok(())
- });
-
- tokio::spawn(f);
- }
- },
- Err(err) => {
- /*
- while waiters.len() > 0 {
- let (waiter, backchan) = waiters.pop().unwrap();
- backchan.send(Err(<W as Waiter>::Error::from(err.clone())));
- }
- */
- }
- }
- }
- }
-
-
-
- Ok(())
- })
- .map(|_| ())
- .map_err(|_| ());
-
- tokio::spawn(f);
- }
+#[derive(Debug, Clone)]
+pub struct RatelimitRef {
+ remaining: i32,
+ inflight: i32,
+ quota: i32,
+ reset: Option<u32>,
}
-impl<W: Waiter + 'static + Send> BarrierSync<W> for Barrier<W> {
- fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send> {
- let (resp_tx, resp_rx) = oneshot::channel();
-
- if self.sink.is_none() {
- let (barrier_tx, barrier_rx) = mpsc::channel(40);
- self.barrier_task(barrier_rx);
- self.sink.replace(barrier_tx);
- }
-
- let chan = self.sink.as_mut().unwrap();
-
- /*Clean this up. join it with f2*/
- let f = chan.clone().send((waiter, resp_tx)).map(|_| ()).map_err(|_| ());
- tokio::spawn(f);
-
- let f2 = resp_rx.then(|result| {
- match result {
- Ok(Ok(result)) => Ok(result),
- Ok(Err(err)) => Err(err),
- Err(err) => Err(W::Error::from(err)),
- }
- });
-
- Box::new(f2)
- }
-}
+use crate::sync::waiter::Waiter;
+use crate::sync::barrier::{BarrierSync, Barrier};
impl Waiter for AuthWaiter {
type Item = Self;
@@ -511,6 +347,47 @@ impl Waiter for AuthWaiter {
}
}
+impl Waiter for RatelimitWaiter {
+ type Item = reqwest::r#async::Response;
+ type Error = Error;
+ type ConditionError = ();
+
+ fn blocked(&self) -> bool {
+ let limits = self.limit.inner.lock().unwrap();
+ limits.remaining - limits.inflight <= 0
+ }
+
+ fn condition_poller(&self)
+ -> Box<Future<Item=(), Error=Self::ConditionError> + Send>
+ {
+ /*TODO: Really basic for now*/
+ use futures_timer::Delay;
+ use std::time::Duration;
+ Box::new(
+ Delay::new(Duration::from_secs(60))
+ .map_err(|_| ())
+ )
+ }
+
+ fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> {
+ let client = &self.request.inner.client;
+ let reqwest = client.client();
+ let method = &self.request.inner.method;
+ let url = &self.request.inner.url;
+ let params = &self.request.inner.params;
+
+ let builder = reqwest.request(method.clone(), url);
+ let builder = client.apply_standard_headers(builder);
+ let r = builder.query(params);
+
+ let limits = &self.limit.clone();
+
+ /* TODO update limits */
+ Box::new(r.send().map_err(|err| Error::from(err)))
+ }
+
+}
+
/* Todo: If the polled futures returns an error than all the waiters should
* get that error
*/
diff --git a/src/lib.rs b/src/lib.rs
index 27dffea..6407236 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,15 +10,13 @@ pub mod helix;
pub mod kraken;
pub mod types;
pub mod error;
+pub mod sync;
+
pub use self::helix::Client as HelixClient;
pub use self::kraken::Client as KrakenClient;
use reqwest::r#async::Client as ReqwestClient;
-use reqwest::header::HeaderMap;
-use std::marker::PhantomData;
-use std::sync::Arc;
-use std::collections::BTreeMap;
pub struct Client {
pub helix: HelixClient,
@@ -34,21 +32,3 @@ impl Client {
}
}
}
-
-trait Request<T> {
- fn url(&self) -> String;
- fn headers(&self) -> &HeaderMap;
- fn query(&self) -> &BTreeMap<String, String>;
- fn returns(&self) -> T;
-}
-
-pub struct GetRequest<T> {
- inner: Arc<GetRequestRef<T>>,
-}
-
-struct GetRequestRef<T> {
- url: String,
-// headers: HeaderMap,
-// query: BTreeMap<String, String>,
- returns: PhantomData<T>,
-}
diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs
new file mode 100644
index 0000000..7e53b12
--- /dev/null
+++ b/src/sync/barrier.rs
@@ -0,0 +1,130 @@
+use super::waiter::Waiter;
+use futures::sync::mpsc;
+use futures::sync::oneshot;
+use futures::prelude::*;
+
+pub trait BarrierSync<W: Waiter> {
+ fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send>;
+}
+
+pub struct Barrier<W: Waiter> {
+ sink: Option<mpsc::Sender<(W, oneshot::Sender<Result<W::Item, W::Error>>)>>,
+}
+
+impl<W: Waiter + 'static + Send> BarrierSync<W> for Barrier<W> {
+ fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send> {
+ let (resp_tx, resp_rx) = oneshot::channel();
+
+ if self.sink.is_none() {
+ let (barrier_tx, barrier_rx) = mpsc::channel(40);
+ self.barrier_task(barrier_rx);
+ self.sink.replace(barrier_tx);
+ }
+
+ let chan = self.sink.as_mut().unwrap().clone();
+
+ /*TODO: I want meaningful error types... */
+ let f1 = chan
+ .send((waiter, resp_tx))
+ .map_err(|err| W::Error::from(()))
+ .and_then(|_| {
+ resp_rx.then(|result| {
+ match result {
+ Ok(Ok(result)) => Ok(result),
+ Ok(Err(err)) => Err(err),
+ Err(err) => Err(W::Error::from(())),
+ }
+ })
+ });
+
+ Box::new(f1)
+ }
+}
+
+impl<W: Waiter + 'static + Send> Barrier<W> {
+ pub fn new() -> Barrier<W> {
+ Barrier {
+ sink: None,
+ }
+ }
+
+ fn barrier_task(&self, receiver: mpsc::Receiver<(W, oneshot::Sender<Result<W::Item, W::Error>>)>) {
+
+ enum Message<W: Waiter> {
+ Request((W, oneshot::Sender<Result<<W as Waiter>::Item, <W as Waiter>::Error>>)),
+ OnCondition(Result<(), <W as Waiter>::ConditionError>),
+ }
+
+ let mut polling = false;
+ let (on_condition_tx, on_condition_rx) = mpsc::unbounded();
+ let mut waiters = Vec::new();
+ let f1 = receiver.map(|request| Message::Request(request));
+ let f2 = on_condition_rx.map(|result| Message::OnCondition(result));
+
+ let inner_condition = on_condition_tx.clone();
+ let f =
+ f1.select(f2).for_each(move |message| {
+ match message {
+ Message::Request((waiter, backchan)) => {
+ if waiter.blocked() && !polling {
+ println!("locked");
+
+ let c1 = inner_condition.clone();
+ let f = waiter
+ .condition_poller()
+ .map(|_| ())
+ .then(|result| {
+ c1.send(result).wait();
+ Ok(())
+ });
+ tokio::spawn(f);
+ polling = true;
+
+ waiters.push((waiter, backchan));
+ } else if waiter.blocked() || polling {
+ println!("polling");
+ waiters.push((waiter, backchan));
+ } else {
+ println!("Pass along waiter!");
+ let f = waiter.into_future()
+ .then(|res| {
+ backchan.send(res);
+ Ok(())
+ });
+
+ tokio::spawn(f);
+ }
+ },
+ Message::OnCondition(result) => {
+ polling = false;
+ /*Resubmit all waiters back to the request channel
+ * At least one waiter will pass the barrier
+ */
+ match result {
+ Ok(_) => {
+ while waiters.len() > 0 {
+ let (waiter, backchan) = waiters.pop().unwrap();
+ let f = waiter.into_future()
+ .then(|res| {
+ backchan.send(res);
+ Ok(())
+ });
+
+ tokio::spawn(f);
+ }
+ },
+ _ => { panic!("condition channel closed") }
+ }
+ }
+ }
+
+
+
+ Ok(())
+ })
+ .map(|_| ())
+ .map_err(|_| ());
+
+ tokio::spawn(f);
+ }
+}
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
new file mode 100644
index 0000000..ca06d32
--- /dev/null
+++ b/src/sync/mod.rs
@@ -0,0 +1,44 @@
+//f.barrier(auth).barrier(ratelimit).and_then(|result| {})
+//A ratelimiter must be aware when a limit is hit, the upper limit,
+//and remaining requests. (use case specific)
+//
+//This can be done by either letting the ratelimiter drive the request
+//so it can inspect returned headers or by maybe? using a channel to inform
+//the limiter
+//
+//Submit task to ratelimiter.
+//Check if the limit is hit and if we are polling
+// 1 if we hit the limit and are not polling, add to the queue and start
+// polling.
+// 2. if we are polling add the request to the queue
+// 3. if we are not polling and not locked then
+// send the request and increment the in-flight counter.
+//
+// when the request has completed without errors then decrement
+// the in-flight counter, update limiter data, and return the
+// result to the requester.
+//
+// On error, EITHER:
+// 1. If the error is rate limiter related place the request
+// back in a queue, return other errors. (Prevents starvation)
+// 2. Return all errors back to the Requester they can resubmit
+// the request
+//
+// The main difference is that the condition is dependent on the waiter's
+// future result.
+//
+// For auth requests we can use an OkFuture that returns the waiter and never errs
+//
+// So waiters must provide IntoFuture, a future than can poll the condition,
+// and a is locked.
+// The lock check must be pure (no side effects) but IntoFuture may
+// have side effects (eg. increments in-flight counter)
+//
+// The result of the IntoFuture is returned to caller or the Err of the poll
+// Future. For simplicity these will be the same type.
+//
+// Should the poll condition trait be located on the Waiter or the Barrier?
+// All waiters in a barrier must use the same condition.
+
+pub mod barrier;
+pub mod waiter;
diff --git a/src/sync/waiter.rs b/src/sync/waiter.rs
new file mode 100644
index 0000000..656c42e
--- /dev/null
+++ b/src/sync/waiter.rs
@@ -0,0 +1,13 @@
+use futures::sync::oneshot;
+use futures::Future;
+
+pub trait Waiter {
+ type Item: Send + 'static;
+ type Error: From<Self::ConditionError>
+ + From<oneshot::Canceled> + From<()> + Send + 'static;
+ type ConditionError: Send + Clone + 'static;
+
+ fn blocked(&self) -> bool;
+ fn condition_poller(&self) -> Box<Future<Item=(), Error=Self::ConditionError> + Send>;
+ fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send>;
+}