author    David Blajda <blajda@hotmail.com>  2018-12-18 23:13:54 +0000
committer David Blajda <blajda@hotmail.com>  2018-12-18 23:13:54 +0000
commit    fbee478ad333732982f7e0eecdcc3681a6d71f2f (patch)
tree      03059df9fba2429c599588907681d9e94f8e9f7b
parent    d34229bc3e495d2927415f408b18aec51a4574e2 (diff)
Refactor barrier to use traits
-rw-r--r--  src/bin/main.rs      1
-rw-r--r--  src/error.rs         9
-rw-r--r--  src/helix/mod.rs   312
3 files changed, 223 insertions, 99 deletions
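
This commit replaces the ad-hoc mpsc/oneshot channel stored in MutClientRef with
a Barrier<AuthWaiter> built on two new traits, Waiter and BarrierSync. The old
is_locked/poll pair becomes blocked(), condition_poller(), and into_future(), so
the lock check stays pure while side effects happen only when a waiter is
released. A minimal sketch of the new surface, assuming the futures 0.1 /
tokio 0.1 APIs already used in this crate (NoopWaiter is illustrative and not
part of the commit; the crate's Error type satisfies the bounds because this
commit also adds From<()> alongside the existing From<futures::Canceled>):

    // Hypothetical waiter that is never blocked.
    struct NoopWaiter;

    impl Waiter for NoopWaiter {
        type Item = NoopWaiter;
        type Error = Error;     // crate error type: From<()> + From<oneshot::Canceled>
        type ConditionError = ();

        // Pure check: is the shared resource currently unavailable?
        fn blocked(&self) -> bool { false }

        // Future that resolves once queued waiters may be released.
        fn condition_poller(&self) -> Box<Future<Item=(), Error=()> + Send> {
            Box::new(futures::future::ok(()))
        }

        // Consumes the waiter; this is where side effects belong
        // (e.g. incrementing an in-flight counter).
        fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> {
            Box::new(futures::future::ok(self))
        }
    }

    // Usage, inside a running tokio executor (wait_for spawns tasks):
    // let mut barrier: Barrier<NoopWaiter> = Barrier::new();
    // let f = barrier.wait_for(NoopWaiter);   // Future<Item=NoopWaiter, Error=Error>

wait_for lazily spawns the barrier task on first use and hands back a boxed
future, which is what RequestState::WaitAuth now stores instead of a oneshot
receiver.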
diff --git a/src/bin/main.rs b/src/bin/main.rs
index 9e01a29..8b46196 100644
--- a/src/bin/main.rs
+++ b/src/bin/main.rs
@@ -40,6 +40,7 @@ fn main() {
* to become idle but it will never become idle since we keep a reference
* to a reqwest client which maintains a connection pool.
*/
+ std::mem::drop(authed_client);
std::mem::drop(client);
tokio::run(
clip.join(clip2)
diff --git a/src/error.rs b/src/error.rs
index 07bb578..6b9f5d5 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -22,6 +22,15 @@ impl From<reqwest::Error> for Error {
}
}
+impl From<()> for Error {
+
+    fn from(_err: ()) -> Error {
+ Error {
+ inner: Kind::ClientError("Internal error".to_owned())
+ }
+ }
+}
+
impl From<futures::Canceled> for Error {
fn from(_err: futures::Canceled) -> Error {
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index 7ceecb6..b7d05f3 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -56,7 +56,7 @@ struct MutClientRef {
token: Option<String>,
scopes: Vec<Scope>,
previous: Option<Client>,
- chan: Option<mpsc::Sender<(AuthWaiter, oneshot::Sender<AuthWaiter>)>>,
+ auth_barrier: Barrier<AuthWaiter>,
auth_state: AuthState,
}
@@ -85,7 +85,7 @@ impl Client {
secret: None,
inner: Mutex::new(
MutClientRef {
- chan: None,
+ auth_barrier: Barrier::new(),
token: None,
scopes: Vec::new(),
previous: None,
@@ -198,7 +198,7 @@ impl AuthClientBuilder {
secret: Some(self.secret),
inner: Mutex::new (
MutClientRef {
- chan: None,
+ auth_barrier: Barrier::new(),
token: self.token,
scopes: Vec::new(),
previous: Some(old_client),
@@ -241,7 +241,7 @@ struct RequestRef {
enum RequestState<T> {
Uninitalized,
- WaitAuth(oneshot::Receiver<AuthWaiter>),
+ WaitAuth(Box<dyn Future<Item=<AuthWaiter as Waiter>::Item, Error=<AuthWaiter as Waiter>::Error> + Send>),
PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>),
}
@@ -276,28 +276,216 @@ use serde::de::DeserializeOwned;
use futures::Async;
use futures::try_ready;
-/* Consider creating a barrier future which simple takes ownership of the request
- * and returns it after syncing is complete
- */
+
+struct AuthWaiter {
+ waiter: Client,
+}
+
+//f.barrier(auth).barrier(ratelimit).and_then(|result| {})
+//A ratelimiter must be aware of when a limit is hit, the upper limit,
+//and the remaining requests. (use-case specific)
+//
+//This can be done either by letting the ratelimiter drive the request
+//so it can inspect the returned headers, or possibly by using a channel
+//to inform the limiter.
+//
+//Submit the task to the ratelimiter.
+//Check if the limit is hit and whether we are polling:
+// 1. if we hit the limit and are not polling, add the request to the
+//    queue and start polling.
+// 2. if we are already polling, add the request to the queue.
+// 3. if we are not polling and not locked, send the request and
+//    increment the in-flight counter.
+//
+// When the request completes without errors, decrement
+// the in-flight counter, update the limiter data, and return the
+// result to the requester.
+//
+// On error, EITHER:
+//  1. If the error is rate-limiter related, place the request
+//     back in the queue; return other errors. (Prevents starvation)
+//  2. Return all errors back to the requester so they can resubmit
+//     the request.
+//
+// The main difference is that the condition is dependent on the waiter's
+// future result.
+//
+// For auth requests we can use an OkFuture that returns the waiter and never errs
+//
+// So waiters must provide an IntoFuture, a future that can poll the condition,
+// and a lock check.
+// The lock check must be pure (no side effects) but IntoFuture may
+// have side effects (e.g. incrementing the in-flight counter).
+//
+// The result of the IntoFuture is returned to the caller, or the Err of the
+// poll future. For simplicity these will be the same type.
+//
+// Should the poll condition trait be located on the Waiter or the Barrier?
+// All waiters in a barrier must use the same condition.
pub trait Waiter {
+ type Item: Send + 'static;
+ type Error: From<Self::ConditionError> + From<oneshot::Canceled> + Send + 'static;
+ type ConditionError: Send + Clone + 'static;
- fn is_locked(&self) -> bool;
- fn poll(&self) -> Box<Future<Item=(), Error=()> + Send>;
+ fn blocked(&self) -> bool;
+ fn condition_poller(&self) -> Box<Future<Item=(), Error=Self::ConditionError> + Send>;
+ fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send>;
}
-struct AuthWaiter {
- waiter: Client,
+pub trait BarrierSync<W: Waiter> {
+ fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send>;
+}
+
+pub struct Barrier<W: Waiter> {
+ //queue: Vec<(W, oneshot::Sender<Result<W::Item, W::Error>>)>,
+ sink: Option<mpsc::Sender<(W, oneshot::Sender<Result<W::Item, W::Error>>)>>,
+}
+
+impl<W: Waiter + 'static + Send> Barrier<W> {
+ pub fn new() -> Barrier<W> {
+
+ //let f = barrier_rx.for_each(|_| Ok(())).map(|_| ()).map_err(|_| ());
+ //tokio::spawn(f);
+
+ Barrier {
+ sink: None,
+ }
+ }
+
+ fn barrier_task(&self, receiver: mpsc::Receiver<(W, oneshot::Sender<Result<W::Item, W::Error>>)>) {
+
+ enum Message<W: Waiter> {
+ Request((W, oneshot::Sender<Result<<W as Waiter>::Item, <W as Waiter>::Error>>)),
+ OnCondition(Result<(), <W as Waiter>::ConditionError>),
+ }
+
+ let mut polling = false;
+ let (on_condition_tx, on_condition_rx) = mpsc::unbounded();
+ let mut waiters = Vec::new();
+ let f1 = receiver.map(|request| Message::Request(request));
+ let f2 = on_condition_rx.map(|result| Message::OnCondition(result));
+
+ let inner_condition = on_condition_tx.clone();
+ let f =
+ f1.select(f2).for_each(move |message| {
+ match message {
+ Message::Request((waiter, backchan)) => {
+ if waiter.blocked() && !polling {
+ println!("locked");
+
+ let c1 = inner_condition.clone();
+ let f = waiter
+ .condition_poller()
+ .map(|_| ())
+ .then(|result| {
+ c1.send(result).wait();
+ Ok(())
+ });
+ tokio::spawn(f);
+ polling = true;
+
+ waiters.push((waiter, backchan));
+ } else if waiter.blocked() || polling {
+ println!("polling");
+ waiters.push((waiter, backchan));
+ } else {
+ println!("Pass along waiter!");
+ //Execute the waiters future//
+ //backchan.send(Ok(waiter));
+ let f = waiter.into_future()
+ .then(|res| {
+ backchan.send(res);
+ Ok(())
+ });
+
+ tokio::spawn(f);
+ }
+ },
+ Message::OnCondition(result) => {
+ polling = false;
+ /*Resubmit all waiters back to the request channel
+ * At least one waiter will pass the barrier
+ */
+ match result {
+ Ok(_) => {
+ while waiters.len() > 0 {
+ //Execute the waiters future//
+ //backchan.send(Ok(waiter));
+ let (waiter, backchan) = waiters.pop().unwrap();
+ let f = waiter.into_future()
+ .then(|res| {
+ backchan.send(res);
+ Ok(())
+ });
+
+ tokio::spawn(f);
+ }
+ },
+ Err(err) => {
+ /*
+ while waiters.len() > 0 {
+ let (waiter, backchan) = waiters.pop().unwrap();
+ backchan.send(Err(<W as Waiter>::Error::from(err.clone())));
+ }
+ */
+ }
+ }
+ }
+ }
+
+
+
+ Ok(())
+ })
+ .map(|_| ())
+ .map_err(|_| ());
+
+ tokio::spawn(f);
+ }
+}
+
+impl<W: Waiter + 'static + Send> BarrierSync<W> for Barrier<W> {
+ fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send> {
+ let (resp_tx, resp_rx) = oneshot::channel();
+
+ if self.sink.is_none() {
+ let (barrier_tx, barrier_rx) = mpsc::channel(40);
+ self.barrier_task(barrier_rx);
+ self.sink.replace(barrier_tx);
+ }
+
+ let chan = self.sink.as_mut().unwrap();
+
+ /*Clean this up. join it with f2*/
+ let f = chan.clone().send((waiter, resp_tx)).map(|_| ()).map_err(|_| ());
+ tokio::spawn(f);
+
+ let f2 = resp_rx.then(|result| {
+ match result {
+ Ok(Ok(result)) => Ok(result),
+ Ok(Err(err)) => Err(err),
+ Err(err) => Err(W::Error::from(err)),
+ }
+ });
+
+ Box::new(f2)
+ }
}
impl Waiter for AuthWaiter {
+ type Item = Self;
+ type Error = Error;
+ type ConditionError = ();
- fn is_locked(&self) -> bool {
+ fn blocked(&self) -> bool {
let mut_client = self.waiter.inner.inner.lock().unwrap();
mut_client.auth_state == AuthState::Unauth
}
- fn poll(&self) -> Box<Future<Item=(), Error=()> + Send> {
+ fn condition_poller(&self)
+ -> Box<Future<Item=(), Error=Self::ConditionError> + Send>
+ {
let bottom_client = self.waiter.get_bottom_client();
let secret = self.waiter.inner.secret.as_ref().unwrap();
let client = self.waiter.clone();
@@ -313,90 +501,20 @@ impl Waiter for AuthWaiter {
mut_client.token = Some(credentials.access_token.clone());
()
})
- .map_err(|err| {
- println!("{:?}", err);
- ()
- });
+ .map_err(|_| ());
Box::new(auth_future)
}
+
+ fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> {
+ Box::new(futures::future::ok(self))
+ }
}
/* Todo: If the polled future returns an error then all the waiters should
* get that error
*/
-fn create_barrier<T: Send + Waiter + 'static>()
- -> mpsc::Sender<(T, oneshot::Sender<T>)>
-{
- enum Message<T> {
- Request((T, oneshot::Sender<T>)),
- OnCondition,
- }
- let (sender, receiver):
- (mpsc::Sender<(T, oneshot::Sender<T>)>, mpsc::Receiver<(T, oneshot::Sender<T>)>) = mpsc::channel(200);
-
- let mut polling = false;
-
- let (on_condition_tx, on_condition_rx) = mpsc::unbounded();
- let mut waiters = Vec::new();
-
- let f1 = receiver.map(|request| Message::Request(request));
- let f2 = on_condition_rx.map(|complete| Message::OnCondition);
-
- let mut inner_sender = sender.clone();
- let inner_condition = on_condition_tx.clone();
- let f =
- f1.select(f2).for_each(move |message| {
- match message {
- Message::Request((waiter, backchan)) => {
- if waiter.is_locked() && !polling {
- println!("locked");
-
- let c1 = inner_condition.clone();
- let c2 = inner_condition.clone();
- let f = waiter
- .poll()
- .map(move |_| {c1.send(()).wait(); ()})
- .map_err(move |_| {c2.send(()).wait(); ()});
- tokio::spawn(f);
- polling = true;
-
- waiters.push((waiter, backchan));
- } else if waiter.is_locked() || polling {
- println!("polling");
- waiters.push((waiter, backchan));
- } else {
- println!("Pass along waiter!");
- backchan.send(waiter);
- }
- },
- Message::OnCondition => {
- /*Resubmit all waiters back to the request channel
- * At least one waiter will pass the barrier
- */
- polling = false;
- let mut sender = inner_sender.clone();
- while waiters.len() > 0 {
- let waiter = waiters.pop().unwrap();
- /* Spawn this */
- let f = sender.clone().send(waiter);
- tokio::spawn(f.map(|_| ()).map_err(|_| ()));
- }
- }
- }
-
- Ok(())
- })
- .map(|_| ())
- .map_err(|_| ());
-
- tokio::spawn(f);
-
- sender
-}
-
-
impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
type Item = T;
type Error = Error;
@@ -406,20 +524,16 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
match &mut self.state {
RequestState::Uninitalized => {
let mut mut_client = self.inner.client.inner.inner.lock().unwrap();
- let (resp_tx, resp_rx) = oneshot::channel();
- let chan =
- mut_client.chan
- .get_or_insert_with(|| {
- create_barrier()
- });
-
- /*TODO use poll_ready*/
- chan.try_send((AuthWaiter{ waiter: self.inner.client.clone() }, resp_tx));
+ let waiter = AuthWaiter {
+ waiter: self.inner.client.clone()
+ };
- self.state = RequestState::WaitAuth(resp_rx);
+ let f = mut_client.auth_barrier.wait_for(waiter);
+ self.state = RequestState::WaitAuth(f);
},
RequestState::WaitAuth(chan) => {
- let waiter = try_ready!(chan.poll());
+ let _waiter = try_ready!(chan.poll());
+
let client = &self.inner.client;
let reqwest = client.client();
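
The design notes added to src/helix/mod.rs anticipate a second Waiter
implementation for rate limiting, where the condition poller waits for the
window to reset and into_future() accounts for the request. A rough sketch of
that direction under the same futures 0.1 / tokio 0.1 assumptions; RatelimitWaiter,
RatelimitState, the fixed 60-second delay, and the window size of 30 are
hypothetical and not part of this commit:

    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};

    // Hypothetical shared bookkeeping for one rate-limit window.
    struct RatelimitState {
        remaining: u32,
        in_flight: u32,
    }

    struct RatelimitWaiter {
        state: Arc<Mutex<RatelimitState>>,
    }

    impl Waiter for RatelimitWaiter {
        type Item = RatelimitWaiter;
        type Error = Error;
        type ConditionError = ();

        // Pure check: blocked once the window is exhausted.
        fn blocked(&self) -> bool {
            self.state.lock().unwrap().remaining == 0
        }

        // Wait out the window; a real limiter would take the reset time from
        // the response headers rather than a fixed delay.
        fn condition_poller(&self) -> Box<Future<Item=(), Error=()> + Send> {
            let state = Arc::clone(&self.state);
            let delay = tokio::timer::Delay::new(Instant::now() + Duration::from_secs(60));
            Box::new(delay.map_err(|_| ()).map(move |_| {
                state.lock().unwrap().remaining = 30; // assumed window size
            }))
        }

        // Side effect on passing the barrier: account for the request.
        fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> {
            {
                let mut state = self.state.lock().unwrap();
                state.remaining = state.remaining.saturating_sub(1);
                state.in_flight += 1;
            }
            Box::new(futures::future::ok(self))
        }
    }

Since ConditionError must be Send + Clone, a failed condition poll could also be
fanned out to every queued waiter, which is what the Todo comment and the
commented-out Err branch in barrier_task point toward.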