diff options
author | David Blajda <blajda@hotmail.com> | 2018-12-18 23:13:54 +0000 |
---|---|---|
committer | David Blajda <blajda@hotmail.com> | 2018-12-18 23:13:54 +0000 |
commit | fbee478ad333732982f7e0eecdcc3681a6d71f2f (patch) | |
tree | 03059df9fba2429c599588907681d9e94f8e9f7b | |
parent | d34229bc3e495d2927415f408b18aec51a4574e2 (diff) |
Refactor barrier to use traits
-rw-r--r-- | src/bin/main.rs | 1 | ||||
-rw-r--r-- | src/error.rs | 9 | ||||
-rw-r--r-- | src/helix/mod.rs | 312 |
3 files changed, 223 insertions, 99 deletions
diff --git a/src/bin/main.rs b/src/bin/main.rs index 9e01a29..8b46196 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -40,6 +40,7 @@ fn main() { * to become idle but it will never becomes idle since we keep a reference * to a reqwest client which maintains a connection pool. */ + std::mem::drop(authed_client); std::mem::drop(client); tokio::run( clip.join(clip2) diff --git a/src/error.rs b/src/error.rs index 07bb578..6b9f5d5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -22,6 +22,15 @@ impl From<reqwest::Error> for Error { } } +impl From<()> for Error { + + fn from(err: ()) -> Error { + Error { + inner: Kind::ClientError("Internal error".to_owned()) + } + } +} + impl From<futures::Canceled> for Error { fn from(_err: futures::Canceled) -> Error { diff --git a/src/helix/mod.rs b/src/helix/mod.rs index 7ceecb6..b7d05f3 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -56,7 +56,7 @@ struct MutClientRef { token: Option<String>, scopes: Vec<Scope>, previous: Option<Client>, - chan: Option<mpsc::Sender<(AuthWaiter, oneshot::Sender<AuthWaiter>)>>, + auth_barrier: Barrier<AuthWaiter>, auth_state: AuthState, } @@ -85,7 +85,7 @@ impl Client { secret: None, inner: Mutex::new( MutClientRef { - chan: None, + auth_barrier: Barrier::new(), token: None, scopes: Vec::new(), previous: None, @@ -198,7 +198,7 @@ impl AuthClientBuilder { secret: Some(self.secret), inner: Mutex::new ( MutClientRef { - chan: None, + auth_barrier: Barrier::new(), token: self.token, scopes: Vec::new(), previous: Some(old_client), @@ -241,7 +241,7 @@ struct RequestRef { enum RequestState<T> { Uninitalized, - WaitAuth(oneshot::Receiver<AuthWaiter>), + WaitAuth(Box<dyn Future<Item=<AuthWaiter as Waiter>::Item, Error=<AuthWaiter as Waiter>::Error> + Send>), PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>), } @@ -276,28 +276,216 @@ use serde::de::DeserializeOwned; use futures::Async; use futures::try_ready; -/* Consider creating a barrier future which simple takes ownership of the request - * and returns it after syncing is complete - */ + +struct AuthWaiter { + waiter: Client, +} + +//f.barrier(auth).barrier(ratelimit).and_then(|result| {}) +//A ratelimiter must be aware when a limit is hit, the upper limit, +//and remaining requests. (use case specific) +// +//This can be done by either letting the ratelimiter drive the request +//so it can inspect returned headers or by maybe? using a channel to inform +//the limiter +// +//Submit task to ratelimiter. +//Check if the limit is hit and if we are polling +// 1 if we hit the limit and are not polling, add to the queue and start +// polling. +// 1. if we are polling add the request to the queue +// 2. if we are not polling and not locked then +// send the request and increment the in-flight counter. +// +// when the request has completed without errors then decrement +// the in-flight counter, update limiter data, and return the +// result to the requester. +// +// On error, EITHER: +// 1. If the error is rate limiter related place the request +// back in a queue, return other errors. (Prevents starvation) +// 2. Return all errors back to the Requester they can resubmit +// the request +// +// The main difference is that the condition is dependent on the waiter's +// future result. +// +// For auth requests we can use an OkFuture that returns the waiter and never errs +// +// So waiters must provide IntoFuture, a future than can poll the condition, +// and a is locked. +// The lock check must be pure (no side effects) but IntoFuture may +// have side effects (eg. increments in-flight counter) +// +// The result of the IntoFuture is returned to caller or the Err of the poll +// Future. For simplicity these will be the same type. +// +// Should the poll condition trait be located on the Waiter or the Barrier? +// All waiters in a barrier must use the same condition. pub trait Waiter { + type Item: Send + 'static; + type Error: From<Self::ConditionError> + From<oneshot::Canceled> + Send + 'static; + type ConditionError: Send + Clone + 'static; - fn is_locked(&self) -> bool; - fn poll(&self) -> Box<Future<Item=(), Error=()> + Send>; + fn blocked(&self) -> bool; + fn condition_poller(&self) -> Box<Future<Item=(), Error=Self::ConditionError> + Send>; + fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send>; } -struct AuthWaiter { - waiter: Client, +pub trait BarrierSync<W: Waiter> { + fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send>; +} + +pub struct Barrier<W: Waiter> { + //queue: Vec<(W, oneshot::Sender<Result<W::Item, W::Error>>)>, + sink: Option<mpsc::Sender<(W, oneshot::Sender<Result<W::Item, W::Error>>)>>, +} + +impl<W: Waiter + 'static + Send> Barrier<W> { + pub fn new() -> Barrier<W> { + + //let f = barrier_rx.for_each(|_| Ok(())).map(|_| ()).map_err(|_| ()); + //tokio::spawn(f); + + Barrier { + sink: None, + } + } + + fn barrier_task(&self, receiver: mpsc::Receiver<(W, oneshot::Sender<Result<W::Item, W::Error>>)>) { + + enum Message<W: Waiter> { + Request((W, oneshot::Sender<Result<<W as Waiter>::Item, <W as Waiter>::Error>>)), + OnCondition(Result<(), <W as Waiter>::ConditionError>), + } + + let mut polling = false; + let (on_condition_tx, on_condition_rx) = mpsc::unbounded(); + let mut waiters = Vec::new(); + let f1 = receiver.map(|request| Message::Request(request)); + let f2 = on_condition_rx.map(|result| Message::OnCondition(result)); + + let inner_condition = on_condition_tx.clone(); + let f = + f1.select(f2).for_each(move |message| { + match message { + Message::Request((waiter, backchan)) => { + if waiter.blocked() && !polling { + println!("locked"); + + let c1 = inner_condition.clone(); + let f = waiter + .condition_poller() + .map(|_| ()) + .then(|result| { + c1.send(result).wait(); + Ok(()) + }); + tokio::spawn(f); + polling = true; + + waiters.push((waiter, backchan)); + } else if waiter.blocked() || polling { + println!("polling"); + waiters.push((waiter, backchan)); + } else { + println!("Pass along waiter!"); + //Execute the waiters future// + //backchan.send(Ok(waiter)); + let f = waiter.into_future() + .then(|res| { + backchan.send(res); + Ok(()) + }); + + tokio::spawn(f); + } + }, + Message::OnCondition(result) => { + polling = false; + /*Resubmit all waiters back to the request channel + * At least one waiter will pass the barrier + */ + match result { + Ok(_) => { + while waiters.len() > 0 { + //Execute the waiters future// + //backchan.send(Ok(waiter)); + let (waiter, backchan) = waiters.pop().unwrap(); + let f = waiter.into_future() + .then(|res| { + backchan.send(res); + Ok(()) + }); + + tokio::spawn(f); + } + }, + Err(err) => { + /* + while waiters.len() > 0 { + let (waiter, backchan) = waiters.pop().unwrap(); + backchan.send(Err(<W as Waiter>::Error::from(err.clone()))); + } + */ + } + } + } + } + + + + Ok(()) + }) + .map(|_| ()) + .map_err(|_| ()); + + tokio::spawn(f); + } +} + +impl<W: Waiter + 'static + Send> BarrierSync<W> for Barrier<W> { + fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send> { + let (resp_tx, resp_rx) = oneshot::channel(); + + if self.sink.is_none() { + let (barrier_tx, barrier_rx) = mpsc::channel(40); + self.barrier_task(barrier_rx); + self.sink.replace(barrier_tx); + } + + let chan = self.sink.as_mut().unwrap(); + + /*Clean this up. join it with f2*/ + let f = chan.clone().send((waiter, resp_tx)).map(|_| ()).map_err(|_| ()); + tokio::spawn(f); + + let f2 = resp_rx.then(|result| { + match result { + Ok(Ok(result)) => Ok(result), + Ok(Err(err)) => Err(err), + Err(err) => Err(W::Error::from(err)), + } + }); + + Box::new(f2) + } } impl Waiter for AuthWaiter { + type Item = Self; + type Error = Error; + type ConditionError = (); - fn is_locked(&self) -> bool { + fn blocked(&self) -> bool { let mut_client = self.waiter.inner.inner.lock().unwrap(); mut_client.auth_state == AuthState::Unauth } - fn poll(&self) -> Box<Future<Item=(), Error=()> + Send> { + fn condition_poller(&self) + -> Box<Future<Item=(), Error=Self::ConditionError> + Send> + { let bottom_client = self.waiter.get_bottom_client(); let secret = self.waiter.inner.secret.as_ref().unwrap(); let client = self.waiter.clone(); @@ -313,90 +501,20 @@ impl Waiter for AuthWaiter { mut_client.token = Some(credentials.access_token.clone()); () }) - .map_err(|err| { - println!("{:?}", err); - () - }); + .map_err(|_| ()); Box::new(auth_future) } + + fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> { + Box::new(futures::future::ok(self)) + } } /* Todo: If the polled futures returns an error than all the waiters should * get that error */ -fn create_barrier<T: Send + Waiter + 'static>() - -> mpsc::Sender<(T, oneshot::Sender<T>)> -{ - enum Message<T> { - Request((T, oneshot::Sender<T>)), - OnCondition, - } - let (sender, receiver): - (mpsc::Sender<(T, oneshot::Sender<T>)>, mpsc::Receiver<(T, oneshot::Sender<T>)>) = mpsc::channel(200); - - let mut polling = false; - - let (on_condition_tx, on_condition_rx) = mpsc::unbounded(); - let mut waiters = Vec::new(); - - let f1 = receiver.map(|request| Message::Request(request)); - let f2 = on_condition_rx.map(|complete| Message::OnCondition); - - let mut inner_sender = sender.clone(); - let inner_condition = on_condition_tx.clone(); - let f = - f1.select(f2).for_each(move |message| { - match message { - Message::Request((waiter, backchan)) => { - if waiter.is_locked() && !polling { - println!("locked"); - - let c1 = inner_condition.clone(); - let c2 = inner_condition.clone(); - let f = waiter - .poll() - .map(move |_| {c1.send(()).wait(); ()}) - .map_err(move |_| {c2.send(()).wait(); ()}); - tokio::spawn(f); - polling = true; - - waiters.push((waiter, backchan)); - } else if waiter.is_locked() || polling { - println!("polling"); - waiters.push((waiter, backchan)); - } else { - println!("Pass along waiter!"); - backchan.send(waiter); - } - }, - Message::OnCondition => { - /*Resubmit all waiters back to the request channel - * At least one waiter will pass the barrier - */ - polling = false; - let mut sender = inner_sender.clone(); - while waiters.len() > 0 { - let waiter = waiters.pop().unwrap(); - /* Spawn this */ - let f = sender.clone().send(waiter); - tokio::spawn(f.map(|_| ()).map_err(|_| ())); - } - } - } - - Ok(()) - }) - .map(|_| ()) - .map_err(|_| ()); - - tokio::spawn(f); - - sender -} - - impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { type Item = T; type Error = Error; @@ -406,20 +524,16 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { match &mut self.state { RequestState::Uninitalized => { let mut mut_client = self.inner.client.inner.inner.lock().unwrap(); - let (resp_tx, resp_rx) = oneshot::channel(); - let chan = - mut_client.chan - .get_or_insert_with(|| { - create_barrier() - }); - - /*TODO use poll_ready*/ - chan.try_send((AuthWaiter{ waiter: self.inner.client.clone() }, resp_tx)); + let waiter = AuthWaiter { + waiter: self.inner.client.clone() + }; - self.state = RequestState::WaitAuth(resp_rx); + let f = mut_client.auth_barrier.wait_for(waiter); + self.state = RequestState::WaitAuth(f); }, RequestState::WaitAuth(chan) => { - let waiter = try_ready!(chan.poll()); + let _waiter = try_ready!(chan.poll()); + let client = &self.inner.client; let reqwest = client.client(); |