From 2e08d0c8abbfb9f989c61acb4f6c580719a65b42 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 27 Dec 2018 02:29:37 +0000 Subject: :Allow iteration over endpoints that provide pagination --- src/helix/mod.rs | 122 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 75 insertions(+), 47 deletions(-) (limited to 'src/helix/mod.rs') diff --git a/src/helix/mod.rs b/src/helix/mod.rs index 5daa498..15f2008 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -247,7 +247,7 @@ impl Client { * as other requests. * * Clients created with 'new' are bottom clients and calls - * to authenticate stack a authed client on top + * to authenticate stack an authed client on top */ fn get_bottom_client(&self) -> Client { match self.inner.as_ref() { @@ -355,6 +355,29 @@ struct RequestRef { method: Method, } +impl RequestRef { + pub fn new(url: String, + params: BTreeMap<&str, &str>, + client: Client, + method: Method, + ratelimit: Option, + ) -> RequestRef + { + let mut owned_params = BTreeMap::new(); + for (key, value) in params { + owned_params.insert(key.to_owned(), value.to_owned()); + } + + RequestRef { + url: url, + params: owned_params, + client: client, + method: method, + ratelimit: ratelimit, + } + } +} + enum RequestState { SetupRequest, SetupBarriers, @@ -381,7 +404,6 @@ enum IterableApiRequestState { pub struct IterableApiRequest { inner: Arc, - pagination: Option, state: IterableApiRequestState, } @@ -395,19 +417,8 @@ impl ApiRequest { ratelimit: Option, ) -> ApiRequest { - let mut owned_params = BTreeMap::new(); - for (key, value) in params { - owned_params.insert(key.to_owned(), value.to_owned()); - } - ApiRequest { - inner: Arc::new( RequestRef { - url: url, - params: owned_params, - client: client, - method: method, - ratelimit: ratelimit, - }), + inner: Arc::new(RequestRef::new(url, params, client, method, ratelimit)), state: RequestState::SetupRequest, attempt: 0, max_attempts: 1, @@ -416,6 +427,25 @@ impl ApiRequest { } } +impl IterableApiRequest { + + pub fn new(url: String, + params: BTreeMap<&str, &str>, + client: Client, + method: Method, + ratelimit: Option + ) -> IterableApiRequest + { + let request_ref = + Arc::new(RequestRef::new(url, params, client, method, ratelimit)); + + IterableApiRequest { + inner: request_ref, + state: IterableApiRequestState::Start, + } + } +} + pub struct RatelimitWaiter { limit: Ratelimit, @@ -639,33 +669,7 @@ macro_rules! retry_ready { use futures::Stream; -/* -impl Future for ApiRequest { - type Item = ::Item; - type Error = ::Error; - - fn poll(&mut self) -> Poll { - let stream = self as &mut Stream; - let state = stream.poll(); - - match state { - Err(err) => Err(err), - Ok(maybe_ready) => { - match maybe_ready { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(item) => { - match item { - Some(result) => Ok(Async::Ready(result)), - None => panic!("Request future returned None for first result!") - } - } - } - } - } - } -} -*/ -impl Stream for IterableApiRequest { +impl Stream for IterableApiRequest { type Item = T; type Error = Error; @@ -694,8 +698,25 @@ impl Stream for IterableApiRequest { match state { Async::NotReady => return Ok(Async::NotReady), Async::Ready(res) => { - //TODO check pagination - self.state = IterableApiRequestState::Finished; + let cursor = + res.cursor().as_ref() + .and_then(|cursor| cursor.cursor.clone()); + + match cursor { + Some(cursor) => { + self.state = IterableApiRequestState::PollInner( + ApiRequest { + inner: self.inner.clone(), + state: RequestState::SetupRequest, + attempt: 0, + max_attempts: 1, + pagination: Some(cursor.clone()), + }); + }, + None => { + self.state = IterableApiRequestState::Finished; + } + } return Ok(Async::Ready(Some(res))); } } @@ -780,9 +801,16 @@ impl Future for ApiRequest { mut_limits.inflight = mut_limits.inflight + 1; } - let builder = reqwest.request(self.inner.method.clone(), &self.inner.url); - let builder = client.apply_standard_headers(builder); - let r = builder.query(&self.inner.params); + let mut builder = reqwest.request(self.inner.method.clone(), &self.inner.url); + builder = client.apply_standard_headers(builder); + builder = builder.query(&self.inner.params); + builder = + if let Some(cursor) = &self.pagination { + builder.query(&[("after", cursor)]) + } else { + builder + }; + let key_err = self.inner.ratelimit.clone(); let key_ok = self.inner.ratelimit.clone(); @@ -790,7 +818,7 @@ impl Future for ApiRequest { let client_ok = client.clone(); let f = - r.send() + builder.send() .map_err(move |err| { if let Some(key) = key_err { if let Some(limits) = client_err.ratelimit(key) { -- cgit v1.2.3