summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Blajda <blajda@hotmail.com>2018-12-20 00:09:49 +0000
committerDavid Blajda <blajda@hotmail.com>2018-12-20 00:09:49 +0000
commit61507ec1fa61fe134547a7eac9541375c6215b33 (patch)
treecf7fb84300454680dd0acd06434fa23a71d6211f
parenta8e42248cf617767dfd890c0832ea233bb4608fc (diff)
Replace channel based barrier with shared future based barrier
-rw-r--r--src/error.rs23
-rw-r--r--src/helix/mod.rs340
-rw-r--r--src/helix/namespaces/auth.rs2
-rw-r--r--src/helix/namespaces/clips.rs3
-rw-r--r--src/lib.rs1
-rw-r--r--src/sync/barrier.rs143
-rw-r--r--src/sync/waiter.rs12
7 files changed, 246 insertions, 278 deletions
diff --git a/src/error.rs b/src/error.rs
index 20563b1..88f2713 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -1,6 +1,20 @@
use reqwest::Error as ReqwestError;
+use futures::future::SharedError;
use std::convert::From;
+/*TODO: How should condition errors be handled?
+ * Ultimately the future must resolve so if the condition
+ * errs than all it's waiters must err.
+ */
+#[derive(Clone, Debug)]
+pub struct ConditionError{}
+
+impl From<SharedError<ConditionError>> for ConditionError {
+ fn from(other: SharedError<ConditionError>) -> Self {
+ ConditionError{}
+ }
+}
+
#[derive(Debug)]
enum Kind {
Reqwest(ReqwestError),
@@ -50,3 +64,12 @@ impl<T> From<SendError<T>> for Error {
}
}
}
+
+impl From<ConditionError> for Error {
+
+ fn from(_err: ConditionError) -> Error {
+ Error {
+ inner: Kind::ClientError("Oneshot channel unexpectedly closed".to_owned())
+ }
+ }
+}
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index f7a09f6..c0dd2a9 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -1,17 +1,23 @@
use futures::future::Future;
use std::sync::{Arc, Mutex};
use reqwest::r#async::Client as ReqwestClient;
-pub use super::types;
+use std::collections::HashSet;
+use super::error::Error;
use std::marker::PhantomData;
-pub mod models;
-pub mod namespaces;
+use futures::future::Shared;
+use futures::Poll;
+use serde::de::DeserializeOwned;
+use futures::Async;
+use futures::try_ready;
-use std::collections::HashSet;
-use futures::{Sink, Stream};
+use crate::error::ConditionError;
-use super::error::Error;
-use futures::prelude::*;
+
+pub use super::types;
+
+pub mod models;
+pub mod namespaces;
pub struct Namespace<T> {
client: Client,
@@ -44,33 +50,27 @@ pub struct Client {
inner: Arc<ClientRef>,
}
-use reqwest::r#async::Response;
-use futures::sync::oneshot;
-
#[derive(Clone, PartialEq)]
enum AuthState {
Unauth,
Auth,
}
-use futures::future::Shared;
struct MutClientRef {
token: Option<String>,
scopes: Vec<Scope>,
previous: Option<Client>,
- auth_barrier: Barrier<AuthWaiter>,
auth_state: AuthState,
- auth_future: Option<Shared<Box<Future<Item=(), Error=()> + Send>>>
+ auth_future: Option<Shared<Box<Future<Item=(), Error=ConditionError> + Send>>>
}
-use futures::sync::mpsc;
-
-
struct ClientRef {
id: String,
secret: Option<String>,
client: ReqwestClient,
+ auth_barrier: Barrier,
+ ratelimit_default: Ratelimit,
inner: Mutex<MutClientRef>,
}
@@ -80,6 +80,10 @@ impl Client {
Client::new_with_client(id, client)
}
+ pub fn default_ratelimit(&self) -> Ratelimit {
+ self.inner.ratelimit_default.clone()
+ }
+
pub fn new_with_client(id: &str, client: ReqwestClient) -> Client {
Client {
@@ -87,9 +91,10 @@ impl Client {
id: id.to_owned(),
client: client,
secret: None,
+ auth_barrier: Barrier::new(),
+ ratelimit_default: Ratelimit::new(30, "Ratelimit-Limit", "Ratelimit-Remaining", "Ratelimit-Reset"),
inner: Mutex::new(
MutClientRef {
- auth_barrier: Barrier::new(),
token: None,
scopes: Vec::new(),
previous: None,
@@ -201,9 +206,10 @@ impl AuthClientBuilder {
id: old_client.inner.id.clone(),
client: old_client.inner.client.clone(),
secret: Some(self.secret),
+ auth_barrier: Barrier::new(),
+ ratelimit_default: old_client.default_ratelimit(),
inner: Mutex::new (
MutClientRef {
- auth_barrier: Barrier::new(),
token: self.token,
scopes: Vec::new(),
previous: Some(old_client),
@@ -252,7 +258,9 @@ struct RequestRef {
enum RequestState<T> {
Uninitalized,
- WaitAuth(AuthWaiter2),
+ WaitAuth(WaiterState<AuthWaiter>),
+ WaitLimit(WaiterState<RatelimitWaiter>),
+ WaitRequest,
PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>),
}
@@ -267,6 +275,7 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
params: BTreeMap<String, String>,
client: Client,
method: Method,
+ ratelimit: Option<Ratelimit>,
) -> ApiRequest<T>
{
ApiRequest {
@@ -275,7 +284,7 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
params: params,
client: client,
method: method,
- ratelimit: None,
+ ratelimit: ratelimit,
}),
state: RequestState::Uninitalized
}
@@ -283,140 +292,113 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
}
-use futures::Poll;
-use serde::de::DeserializeOwned;
-use futures::Async;
-use futures::try_ready;
-
-
-struct AuthWaiter {
- waiter: Client,
-}
-
-
pub struct RatelimitWaiter {
limit: Ratelimit,
- request: Request,
}
-#[derive(Debug, Clone)]
+#[derive(Clone)]
pub struct Ratelimit {
- inner: Arc<Mutex<RatelimitRef>>
+ inner: Arc<Mutex<RatelimitRef>>,
+ barrier: Barrier,
+}
+
+impl Ratelimit {
+ pub fn new(limit: i32,
+ header_limit: &str,
+ header_remaining: &str,
+ header_reset: &str)
+ -> Ratelimit
+ {
+ Ratelimit {
+ inner: Arc::new(
+ Mutex::new(
+ RatelimitRef {
+ limit: limit,
+ remaining: limit,
+ inflight: 0,
+ reset: None,
+ header_limit: header_limit.to_owned(),
+ header_remaining: header_remaining.to_owned(),
+ header_reset: header_reset.to_owned(),
+ }
+ )
+ ),
+ barrier: Barrier::new(),
+ }
+ }
}
#[derive(Debug, Clone)]
pub struct RatelimitRef {
+ limit: i32,
remaining: i32,
inflight: i32,
- quota: i32,
reset: Option<u32>,
+ header_limit: String,
+ header_remaining: String,
+ header_reset: String,
}
-struct AuthWaiter2 {
- waiter: Client,
- shared_future: Option<(Shared<Box<Future<Item=(), Error=()> + Send>>)>,
+use futures::future::SharedError;
+use crate::sync::barrier::Barrier;
+use crate::sync::waiter::Waiter;
+
+struct WaiterState<W: Waiter> {
polling: bool,
+ shared_future: Option<(Shared<Box< Future<Item=(), Error=ConditionError> + Send>>)>,
+ waiter: W,
+ barrier: Barrier,
}
+impl<W: Waiter> WaiterState<W> {
+ fn new(waiter: W, barrier: &Barrier) -> WaiterState<W> {
+ WaiterState {
+ polling: false,
+ shared_future: None,
+ waiter: waiter,
+ barrier: barrier.clone(),
+ }
+ }
+}
-use futures::future::{SharedError, SharedItem};
-impl Future for AuthWaiter2 {
- type Item = ();
- type Error = ();
+impl<W: Waiter> Future for WaiterState<W> {
+ type Item = <W as Waiter>::Item;
+ type Error = <W as Waiter>::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
- let blocked = self.blocked();
+ let blocked = self.waiter.blocked();
if blocked && !self.polling {
- println!("not polling and blocked");
- let fut = self.condition();
+ let fut = self.barrier.condition(&self.waiter);
self.shared_future = Some(fut);
self.polling = true;
} else if blocked || self.polling {
- println!("polling");
let f = self.shared_future.as_mut().unwrap();
- let r = f.poll();
- if let Ok(Async::NotReady) = r {
- return Ok(Async::NotReady);
- }
+ try_ready!(f.poll());
self.polling = false;
} else {
- println!("okay");
- return Ok(Async::Ready(()));
+ return Ok(Async::Ready(<W as Waiter>::Item::default()));
}
}
}
}
-impl AuthWaiter2 {
-
- fn blocked(&self) -> bool {
- let mut_client = self.waiter.inner.inner.lock().unwrap();
- mut_client.auth_state == AuthState::Unauth
- }
-
- fn condition(&self)
- -> Shared<Box<Future<Item=(), Error=()> + Send>>
- {
- /*NOTE: careful with the creation of new_condition
- * since it may lead to a deadlock
- */
- let new_condition = self.condition_future();
- let mut mut_client = self.waiter.inner.inner.lock().unwrap();
- let maybe_future = &mut mut_client.auth_future;
-
- let new_condition_clone = new_condition.clone();
- let f = maybe_future.get_or_insert_with(|| {
- new_condition
- });
-
- let f =
- if let Some(_) = f.peek() {
- maybe_future.replace(new_condition_clone);
- maybe_future.as_ref().unwrap()
- } else { f };
- f.clone()
- }
-
- fn condition_future(&self) ->
- Shared<Box<Future<Item=(), Error=()> + Send>> {
- let bottom_client = self.waiter.get_bottom_client();
- let secret = self.waiter.inner.secret.as_ref().unwrap();
- let client = self.waiter.clone();
- let auth_future =
- bottom_client
- .auth()
- .client_credentials(secret)
- .map(move |credentials| {
- println!("{:?}", credentials);
- let mut mut_client = client.inner.inner.lock().unwrap();
- mut_client.auth_state = AuthState::Auth;
- mut_client.token = Some(credentials.access_token.clone());
- ()
- })
- .map_err(|_| ());
-
- Future::shared(Box::new(auth_future))
- }
+struct AuthWaiter {
+ waiter: Client,
}
-use crate::sync::waiter::Waiter;
-use crate::sync::barrier::{BarrierSync, Barrier};
-
impl Waiter for AuthWaiter {
- type Item = Self;
- type Error = Error;
- type ConditionError = ();
+ type Item = ();
+ type Error = ConditionError;
fn blocked(&self) -> bool {
let mut_client = self.waiter.inner.inner.lock().unwrap();
mut_client.auth_state == AuthState::Unauth
}
- fn condition_poller(&self)
- -> Box<Future<Item=(), Error=Self::ConditionError> + Send>
- {
+ fn condition(&self) ->
+ Shared<Box<Future<Item=(), Error=ConditionError> + Send>> {
let bottom_client = self.waiter.get_bottom_client();
let secret = self.waiter.inner.secret.as_ref().unwrap();
let client = self.waiter.clone();
@@ -432,55 +414,32 @@ impl Waiter for AuthWaiter {
mut_client.token = Some(credentials.access_token.clone());
()
})
- .map_err(|_| ());
-
- Box::new(auth_future)
- }
+ .map_err(|_| ConditionError{});
- fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> {
- Box::new(futures::future::ok(self))
+ Future::shared(Box::new(auth_future))
}
}
impl Waiter for RatelimitWaiter {
- type Item = reqwest::r#async::Response;
- type Error = Error;
- type ConditionError = ();
+ type Item = ();
+ type Error = ConditionError;
fn blocked(&self) -> bool {
let limits = self.limit.inner.lock().unwrap();
+ println!("{}, {}, {}", limits.limit, limits.remaining, limits.inflight);
limits.remaining - limits.inflight <= 0
}
- fn condition_poller(&self)
- -> Box<Future<Item=(), Error=Self::ConditionError> + Send>
+ fn condition(&self)
+ -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>>
{
/*TODO: Really basic for now*/
use futures_timer::Delay;
use std::time::Duration;
- Box::new(
- Delay::new(Duration::from_secs(60))
- .map_err(|_| ())
- )
+ Future::shared(Box::new(
+ Delay::new(Duration::from_secs(60)).map_err(|_| ConditionError{})
+ ))
}
-
- fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> {
- let client = &self.request.inner.client;
- let reqwest = client.client();
- let method = &self.request.inner.method;
- let url = &self.request.inner.url;
- let params = &self.request.inner.params;
-
- let builder = reqwest.request(method.clone(), url);
- let builder = client.apply_standard_headers(builder);
- let r = builder.query(params);
-
- let limits = &self.limit.clone();
-
- /* TODO update limits */
- Box::new(r.send().map_err(|err| Error::from(err)))
- }
-
}
/* Todo: If the polled futures returns an error than all the waiters should
@@ -495,33 +454,101 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
loop {
match &mut self.state {
RequestState::Uninitalized => {
- let mut mut_client = self.inner.client.inner.inner.lock().unwrap();
- let waiter = AuthWaiter {
- waiter: self.inner.client.clone()
- };
+ let mut_client = self.inner.client.inner.inner.lock().unwrap();
- let waiter2 = AuthWaiter2 {
+ let waiter = AuthWaiter {
waiter: self.inner.client.clone(),
- shared_future: None,
- polling: false,
};
- let f = waiter2;
+ let f = WaiterState::new(waiter,
+ &self.inner.client.inner.auth_barrier);
self.state = RequestState::WaitAuth(f);
},
- RequestState::WaitAuth(chan) => {
- let _waiter = try_ready!(chan.poll());
-
+ RequestState::WaitAuth(auth) => {
+ let _waiter = try_ready!(auth.poll());
+ match self.inner.ratelimit {
+ Some(ref limit) => {
+ let barrier = limit.barrier.clone();
+ let waiter = RatelimitWaiter {
+ limit: limit.clone(),
+ };
+ let f = WaiterState::new(waiter,
+ &barrier);
+ self.state = RequestState::WaitLimit(f);
+ },
+ None => {
+ self.state = RequestState::WaitRequest;
+ }
+ }
+ },
+ RequestState::WaitLimit(limit) => {
+ let _waiter = try_ready!(limit.poll());
+ self.state = RequestState::WaitRequest;
+ },
+ RequestState::WaitRequest => {
let client = &self.inner.client;
let reqwest = client.client();
+ if let Some(limits) = &self.inner.ratelimit {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight + 1;
+ }
+
let builder = reqwest.request(self.inner.method.clone(), &self.inner.url);
let builder = client.apply_standard_headers(builder);
let r = builder.query(&self.inner.params);
+ /*TODO add 1 to inflight*/
+
+ let ratelimit_err = self.inner.ratelimit.clone();
+ let ratelimit_ok = self.inner.ratelimit.clone();
let f = r.send()
+ .map_err(|err| {
+
+ if let Some(limits) = ratelimit_err {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight - 1;
+ }
+
+ err
+ })
.map(|mut response| {
println!("{:?}", response);
+ if let Some(limits) = ratelimit_ok {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight - 1;
+
+ let maybe_limit =
+ response.headers()
+ .get(&mut_limits.header_limit)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<i32>().ok());
+
+ if let Some(limit) = maybe_limit {
+ mut_limits.limit = limit;
+ }
+
+ let maybe_remaining =
+ response.headers()
+ .get(&mut_limits.header_limit)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<i32>().ok());
+
+ if let Some(limit) = maybe_remaining {
+ mut_limits.remaining = limit;
+ }
+
+ let maybe_reset =
+ response.headers()
+ .get(&mut_limits.header_limit)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<u32>().ok());
+
+ if let Some(reset) = maybe_reset {
+ mut_limits.reset = Some(reset);
+ }
+ }
+
response.json::<T>()
})
.and_then(|json| {
@@ -529,8 +556,7 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
});
self.state = RequestState::PollParse(Box::new(f));
- continue;
- }
+ },
RequestState::PollParse(future) => {
let res = try_ready!(future.poll());
return Ok(Async::Ready(res));
diff --git a/src/helix/namespaces/auth.rs b/src/helix/namespaces/auth.rs
index 8ad7956..5efc0fe 100644
--- a/src/helix/namespaces/auth.rs
+++ b/src/helix/namespaces/auth.rs
@@ -39,5 +39,5 @@ pub fn client_credentials(client: Client, secret: &str)
params.insert("grant_type".to_owned(), "client_credentials".to_owned());
params.insert("scope".to_owned(), "".to_owned());
- ApiRequest::new(url, params, client, Method::POST)
+ ApiRequest::new(url, params, client, Method::POST, None)
}
diff --git a/src/helix/namespaces/clips.rs b/src/helix/namespaces/clips.rs
index ac820e8..083e5c4 100644
--- a/src/helix/namespaces/clips.rs
+++ b/src/helix/namespaces/clips.rs
@@ -33,6 +33,7 @@ pub fn clip(client: Client, id: &str)
API_DOMAIN + "/helix/clips" + "?id=" + id;
let params = BTreeMap::new();
+ let limit = client.default_ratelimit();
- ApiRequest::new(url, params, client, Method::GET)
+ ApiRequest::new(url, params, client, Method::GET, Some(limit))
}
diff --git a/src/lib.rs b/src/lib.rs
index 6407236..70f03dd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,6 @@
#![recursion_limit="128"]
#![feature(option_replace)]
+#![feature(associated_type_defaults)]
extern crate futures;
extern crate reqwest;
extern crate serde;
diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs
index 7e53b12..c0f21b8 100644
--- a/src/sync/barrier.rs
+++ b/src/sync/barrier.rs
@@ -1,130 +1,47 @@
use super::waiter::Waiter;
-use futures::sync::mpsc;
-use futures::sync::oneshot;
use futures::prelude::*;
+use std::sync::{Arc, Mutex};
+use futures::future::{Shared, SharedError};
-pub trait BarrierSync<W: Waiter> {
- fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send>;
-}
+use crate::error::ConditionError;
-pub struct Barrier<W: Waiter> {
- sink: Option<mpsc::Sender<(W, oneshot::Sender<Result<W::Item, W::Error>>)>>,
+#[derive(Clone)]
+pub struct Barrier {
+ inner: Arc<Mutex<BarrierRef>>,
}
-impl<W: Waiter + 'static + Send> BarrierSync<W> for Barrier<W> {
- fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send> {
- let (resp_tx, resp_rx) = oneshot::channel();
-
- if self.sink.is_none() {
- let (barrier_tx, barrier_rx) = mpsc::channel(40);
- self.barrier_task(barrier_rx);
- self.sink.replace(barrier_tx);
- }
-
- let chan = self.sink.as_mut().unwrap().clone();
-
- /*TODO: I want meaningful error types... */
- let f1 = chan
- .send((waiter, resp_tx))
- .map_err(|err| W::Error::from(()))
- .and_then(|_| {
- resp_rx.then(|result| {
- match result {
- Ok(Ok(result)) => Ok(result),
- Ok(Err(err)) => Err(err),
- Err(err) => Err(W::Error::from(())),
- }
- })
- });
-
- Box::new(f1)
- }
+struct BarrierRef {
+ condition: Option<Shared<Box<Future<Item=(), Error=ConditionError> + Send>>>
}
-impl<W: Waiter + 'static + Send> Barrier<W> {
- pub fn new() -> Barrier<W> {
+impl Barrier {
+
+ pub fn new() -> Barrier {
Barrier {
- sink: None,
+ inner: Arc::new(Mutex::new(
+ BarrierRef {
+ condition: None,
+ }))
}
}
- fn barrier_task(&self, receiver: mpsc::Receiver<(W, oneshot::Sender<Result<W::Item, W::Error>>)>) {
-
- enum Message<W: Waiter> {
- Request((W, oneshot::Sender<Result<<W as Waiter>::Item, <W as Waiter>::Error>>)),
- OnCondition(Result<(), <W as Waiter>::ConditionError>),
- }
+ pub fn condition(&self, waiter: &impl Waiter)
+ -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>>
+ {
+ let mut mut_barrier = self.inner.lock().unwrap();
+ let maybe_condition = &mut mut_barrier.condition;
- let mut polling = false;
- let (on_condition_tx, on_condition_rx) = mpsc::unbounded();
- let mut waiters = Vec::new();
- let f1 = receiver.map(|request| Message::Request(request));
- let f2 = on_condition_rx.map(|result| Message::OnCondition(result));
+ let f = maybe_condition.get_or_insert_with(|| {
+ waiter.condition()
+ });
- let inner_condition = on_condition_tx.clone();
let f =
- f1.select(f2).for_each(move |message| {
- match message {
- Message::Request((waiter, backchan)) => {
- if waiter.blocked() && !polling {
- println!("locked");
-
- let c1 = inner_condition.clone();
- let f = waiter
- .condition_poller()
- .map(|_| ())
- .then(|result| {
- c1.send(result).wait();
- Ok(())
- });
- tokio::spawn(f);
- polling = true;
-
- waiters.push((waiter, backchan));
- } else if waiter.blocked() || polling {
- println!("polling");
- waiters.push((waiter, backchan));
- } else {
- println!("Pass along waiter!");
- let f = waiter.into_future()
- .then(|res| {
- backchan.send(res);
- Ok(())
- });
-
- tokio::spawn(f);
- }
- },
- Message::OnCondition(result) => {
- polling = false;
- /*Resubmit all waiters back to the request channel
- * At least one waiter will pass the barrier
- */
- match result {
- Ok(_) => {
- while waiters.len() > 0 {
- let (waiter, backchan) = waiters.pop().unwrap();
- let f = waiter.into_future()
- .then(|res| {
- backchan.send(res);
- Ok(())
- });
-
- tokio::spawn(f);
- }
- },
- _ => { panic!("condition channel closed") }
- }
- }
- }
-
-
-
- Ok(())
- })
- .map(|_| ())
- .map_err(|_| ());
-
- tokio::spawn(f);
+ if let Some(_) = f.peek() {
+ let condition = waiter.condition();
+ maybe_condition.replace(condition);
+ maybe_condition.as_ref().unwrap()
+ } else { f };
+ f.clone()
}
}
+
diff --git a/src/sync/waiter.rs b/src/sync/waiter.rs
index 656c42e..8039280 100644
--- a/src/sync/waiter.rs
+++ b/src/sync/waiter.rs
@@ -1,13 +1,13 @@
use futures::sync::oneshot;
use futures::Future;
+use futures::future::{Shared, SharedError};
+use crate::error::ConditionError;
pub trait Waiter {
- type Item: Send + 'static;
- type Error: From<Self::ConditionError>
- + From<oneshot::Canceled> + From<()> + Send + 'static;
- type ConditionError: Send + Clone + 'static;
+ type Item: Default;
+ type Error: From<SharedError<ConditionError>>;
fn blocked(&self) -> bool;
- fn condition_poller(&self) -> Box<Future<Item=(), Error=Self::ConditionError> + Send>;
- fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send>;
+ fn condition(&self)
+ -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>>;
}