summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Blajda <blajda@hotmail.com>2018-12-26 06:17:23 +0000
committerDavid Blajda <blajda@hotmail.com>2018-12-26 06:17:23 +0000
commitadcc342c9c1674ce88ebaf7f9359bde9665ca3f8 (patch)
treed75e5866350f9c7fe59f31acd6ccf89c16d8ce7d
parent2f1682162174f5fee4f4297f616e8f66b7c70dca (diff)
:WIP: Allow iteration over endpoints that provide pagination
-rw-r--r--src/helix/mod.rs106
1 files changed, 99 insertions, 7 deletions
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index 5c51c22..5daa498 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -356,7 +356,8 @@ struct RequestRef {
}
enum RequestState<T> {
- Uninitalized,
+ SetupRequest,
+ SetupBarriers,
WaitAuth(WaiterState<AuthWaiter>),
SetupRatelimit,
WaitLimit(WaiterState<RatelimitWaiter>),
@@ -369,6 +370,19 @@ pub struct ApiRequest<T> {
state: RequestState<T>,
attempt: u32,
max_attempts: u32,
+ pagination: Option<String>,
+}
+
+enum IterableApiRequestState<T> {
+ Start,
+ PollInner(ApiRequest<T>),
+ Finished,
+}
+
+pub struct IterableApiRequest<T> {
+ inner: Arc<RequestRef>,
+ pagination: Option<String>,
+ state: IterableApiRequestState<T>,
}
use self::models::PaginationTrait;
@@ -394,9 +408,10 @@ impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> {
method: method,
ratelimit: ratelimit,
}),
- state: RequestState::Uninitalized,
+ state: RequestState::SetupRequest,
attempt: 0,
max_attempts: 1,
+ pagination: None,
}
}
}
@@ -492,7 +507,7 @@ use crate::sync::waiter::Waiter;
struct WaiterState<W: Waiter> {
polling: bool,
- shared_future: Option<(Shared<Box< Future<Item=(), Error=ConditionError> + Send>>)>,
+ shared_future: Option<(Shared<Box<Future<Item=(), Error=ConditionError> + Send>>)>,
waiter: W,
barrier: Barrier,
}
@@ -552,7 +567,7 @@ impl Waiter for AuthWaiter {
fn condition(&self) ->
Shared<Box<Future<Item=(), Error=ConditionError> + Send>> {
- /* If a secret is not provided than just immediately return */
+ /* If a secret is not provided then just immediately return */
let secret = self.waiter.secret().unwrap();
let bottom_client = self.waiter.get_bottom_client();
let client = self.waiter.clone();
@@ -613,7 +628,7 @@ macro_rules! retry_ready {
Err(e) => {
if $s.attempt < $s.max_attempts {
$s.attempt += 1;
- $s.state = RequestState::Uninitalized;
+ $s.state = RequestState::SetupBarriers;
continue;
} else {
return Err(e.into());
@@ -622,6 +637,79 @@ macro_rules! retry_ready {
})
}
+use futures::Stream;
+
+/*
+impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
+ type Item = <Self as Stream>::Item;
+ type Error = <Self as Stream>::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let stream = self as &mut Stream<Item=Self::Item, Error=Self::Error>;
+ let state = stream.poll();
+
+ match state {
+ Err(err) => Err(err),
+ Ok(maybe_ready) => {
+ match maybe_ready {
+ Async::NotReady => Ok(Async::NotReady),
+ Async::Ready(item) => {
+ match item {
+ Some(result) => Ok(Async::Ready(result)),
+ None => panic!("Request future returned None for first result!")
+ }
+ }
+ }
+ }
+ }
+ }
+}
+*/
+impl<T: DeserializeOwned + 'static + Send> Stream for IterableApiRequest<T> {
+ type Item = T;
+ type Error = Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ match &mut self.state {
+ IterableApiRequestState::Start => {
+ self.state =
+ IterableApiRequestState::PollInner(
+ ApiRequest {
+ inner: self.inner.clone(),
+ state: RequestState::SetupRequest,
+ attempt: 0,
+ max_attempts: 1,
+ pagination: None
+ });
+ },
+ IterableApiRequestState::PollInner(request) => {
+ let f = request as &mut Future<Item=Self::Item, Error=Self::Error>;
+ match f.poll() {
+ Err(err) => {
+ self.state = IterableApiRequestState::Finished;
+ return Err(err);
+ },
+ Ok(state) => {
+ match state {
+ Async::NotReady => return Ok(Async::NotReady),
+ Async::Ready(res) => {
+ //TODO check pagination
+ self.state = IterableApiRequestState::Finished;
+ return Ok(Async::Ready(Some(res)));
+ }
+ }
+ }
+ }
+ },
+ IterableApiRequestState::Finished => {
+ return Ok(Async::Ready(None));
+ }
+ }
+ }
+ }
+}
+
impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
type Item = T;
type Error = Error;
@@ -629,7 +717,11 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match &mut self.state {
- RequestState::Uninitalized => {
+ RequestState::SetupRequest => {
+ self.attempt = 0;
+ self.state = RequestState::SetupBarriers;
+ }
+ RequestState::SetupBarriers => {
match self.inner.client.inner.as_ref() {
ClientType::Auth(inner) => {
let waiter = AuthWaiter {
@@ -729,7 +821,7 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
RequestState::PollParse(future) => {
let res = retry_ready!(self, future.poll());
return Ok(Async::Ready(res));
- }
+ },
}
}
}