diff options
author | David Blajda <blajda@hotmail.com> | 2018-12-26 06:17:23 +0000 |
---|---|---|
committer | David Blajda <blajda@hotmail.com> | 2018-12-26 06:17:23 +0000 |
commit | adcc342c9c1674ce88ebaf7f9359bde9665ca3f8 (patch) | |
tree | d75e5866350f9c7fe59f31acd6ccf89c16d8ce7d | |
parent | 2f1682162174f5fee4f4297f616e8f66b7c70dca (diff) |
:WIP: Allow iteration over endpoints that provide pagination
-rw-r--r-- | src/helix/mod.rs | 106 |
1 files changed, 99 insertions, 7 deletions
diff --git a/src/helix/mod.rs b/src/helix/mod.rs index 5c51c22..5daa498 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -356,7 +356,8 @@ struct RequestRef { } enum RequestState<T> { - Uninitalized, + SetupRequest, + SetupBarriers, WaitAuth(WaiterState<AuthWaiter>), SetupRatelimit, WaitLimit(WaiterState<RatelimitWaiter>), @@ -369,6 +370,19 @@ pub struct ApiRequest<T> { state: RequestState<T>, attempt: u32, max_attempts: u32, + pagination: Option<String>, +} + +enum IterableApiRequestState<T> { + Start, + PollInner(ApiRequest<T>), + Finished, +} + +pub struct IterableApiRequest<T> { + inner: Arc<RequestRef>, + pagination: Option<String>, + state: IterableApiRequestState<T>, } use self::models::PaginationTrait; @@ -394,9 +408,10 @@ impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> { method: method, ratelimit: ratelimit, }), - state: RequestState::Uninitalized, + state: RequestState::SetupRequest, attempt: 0, max_attempts: 1, + pagination: None, } } } @@ -492,7 +507,7 @@ use crate::sync::waiter::Waiter; struct WaiterState<W: Waiter> { polling: bool, - shared_future: Option<(Shared<Box< Future<Item=(), Error=ConditionError> + Send>>)>, + shared_future: Option<(Shared<Box<Future<Item=(), Error=ConditionError> + Send>>)>, waiter: W, barrier: Barrier, } @@ -552,7 +567,7 @@ impl Waiter for AuthWaiter { fn condition(&self) -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>> { - /* If a secret is not provided than just immediately return */ + /* If a secret is not provided then just immediately return */ let secret = self.waiter.secret().unwrap(); let bottom_client = self.waiter.get_bottom_client(); let client = self.waiter.clone(); @@ -613,7 +628,7 @@ macro_rules! retry_ready { Err(e) => { if $s.attempt < $s.max_attempts { $s.attempt += 1; - $s.state = RequestState::Uninitalized; + $s.state = RequestState::SetupBarriers; continue; } else { return Err(e.into()); @@ -622,6 +637,79 @@ macro_rules! retry_ready { }) } +use futures::Stream; + +/* +impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { + type Item = <Self as Stream>::Item; + type Error = <Self as Stream>::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let stream = self as &mut Stream<Item=Self::Item, Error=Self::Error>; + let state = stream.poll(); + + match state { + Err(err) => Err(err), + Ok(maybe_ready) => { + match maybe_ready { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(item) => { + match item { + Some(result) => Ok(Async::Ready(result)), + None => panic!("Request future returned None for first result!") + } + } + } + } + } + } +} +*/ +impl<T: DeserializeOwned + 'static + Send> Stream for IterableApiRequest<T> { + type Item = T; + type Error = Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + loop { + match &mut self.state { + IterableApiRequestState::Start => { + self.state = + IterableApiRequestState::PollInner( + ApiRequest { + inner: self.inner.clone(), + state: RequestState::SetupRequest, + attempt: 0, + max_attempts: 1, + pagination: None + }); + }, + IterableApiRequestState::PollInner(request) => { + let f = request as &mut Future<Item=Self::Item, Error=Self::Error>; + match f.poll() { + Err(err) => { + self.state = IterableApiRequestState::Finished; + return Err(err); + }, + Ok(state) => { + match state { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(res) => { + //TODO check pagination + self.state = IterableApiRequestState::Finished; + return Ok(Async::Ready(Some(res))); + } + } + } + } + }, + IterableApiRequestState::Finished => { + return Ok(Async::Ready(None)); + } + } + } + } +} + impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { type Item = T; type Error = Error; @@ -629,7 +717,11 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> { loop { match &mut self.state { - RequestState::Uninitalized => { + RequestState::SetupRequest => { + self.attempt = 0; + self.state = RequestState::SetupBarriers; + } + RequestState::SetupBarriers => { match self.inner.client.inner.as_ref() { ClientType::Auth(inner) => { let waiter = AuthWaiter { @@ -729,7 +821,7 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { RequestState::PollParse(future) => { let res = retry_ready!(self, future.poll()); return Ok(Async::Ready(res)); - } + }, } } } |