summaryrefslogtreecommitdiff
path: root/src/helix/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/helix/mod.rs')
-rw-r--r--src/helix/mod.rs122
1 files changed, 75 insertions, 47 deletions
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index 5daa498..15f2008 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -247,7 +247,7 @@ impl Client {
* as other requests.
*
* Clients created with 'new' are bottom clients and calls
- * to authenticate stack a authed client on top
+ * to authenticate stack an authed client on top
*/
fn get_bottom_client(&self) -> Client {
match self.inner.as_ref() {
@@ -355,6 +355,29 @@ struct RequestRef {
method: Method,
}
+impl RequestRef {
+ pub fn new(url: String,
+ params: BTreeMap<&str, &str>,
+ client: Client,
+ method: Method,
+ ratelimit: Option<RatelimitKey>,
+ ) -> RequestRef
+ {
+ let mut owned_params = BTreeMap::new();
+ for (key, value) in params {
+ owned_params.insert(key.to_owned(), value.to_owned());
+ }
+
+ RequestRef {
+ url: url,
+ params: owned_params,
+ client: client,
+ method: method,
+ ratelimit: ratelimit,
+ }
+ }
+}
+
enum RequestState<T> {
SetupRequest,
SetupBarriers,
@@ -381,7 +404,6 @@ enum IterableApiRequestState<T> {
pub struct IterableApiRequest<T> {
inner: Arc<RequestRef>,
- pagination: Option<String>,
state: IterableApiRequestState<T>,
}
@@ -395,19 +417,8 @@ impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> {
ratelimit: Option<RatelimitKey>,
) -> ApiRequest<T>
{
- let mut owned_params = BTreeMap::new();
- for (key, value) in params {
- owned_params.insert(key.to_owned(), value.to_owned());
- }
-
ApiRequest {
- inner: Arc::new( RequestRef {
- url: url,
- params: owned_params,
- client: client,
- method: method,
- ratelimit: ratelimit,
- }),
+ inner: Arc::new(RequestRef::new(url, params, client, method, ratelimit)),
state: RequestState::SetupRequest,
attempt: 0,
max_attempts: 1,
@@ -416,6 +427,25 @@ impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> {
}
}
+impl<T: DeserializeOwned + PaginationTrait + 'static + Send> IterableApiRequest<T> {
+
+ pub fn new(url: String,
+ params: BTreeMap<&str, &str>,
+ client: Client,
+ method: Method,
+ ratelimit: Option<RatelimitKey>
+ ) -> IterableApiRequest<T>
+ {
+ let request_ref =
+ Arc::new(RequestRef::new(url, params, client, method, ratelimit));
+
+ IterableApiRequest {
+ inner: request_ref,
+ state: IterableApiRequestState::Start,
+ }
+ }
+}
+
pub struct RatelimitWaiter {
limit: Ratelimit,
@@ -639,33 +669,7 @@ macro_rules! retry_ready {
use futures::Stream;
-/*
-impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
- type Item = <Self as Stream>::Item;
- type Error = <Self as Stream>::Error;
-
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- let stream = self as &mut Stream<Item=Self::Item, Error=Self::Error>;
- let state = stream.poll();
-
- match state {
- Err(err) => Err(err),
- Ok(maybe_ready) => {
- match maybe_ready {
- Async::NotReady => Ok(Async::NotReady),
- Async::Ready(item) => {
- match item {
- Some(result) => Ok(Async::Ready(result)),
- None => panic!("Request future returned None for first result!")
- }
- }
- }
- }
- }
- }
-}
-*/
-impl<T: DeserializeOwned + 'static + Send> Stream for IterableApiRequest<T> {
+impl<T: DeserializeOwned + PaginationTrait + 'static + Send> Stream for IterableApiRequest<T> {
type Item = T;
type Error = Error;
@@ -694,8 +698,25 @@ impl<T: DeserializeOwned + 'static + Send> Stream for IterableApiRequest<T> {
match state {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(res) => {
- //TODO check pagination
- self.state = IterableApiRequestState::Finished;
+ let cursor =
+ res.cursor().as_ref()
+ .and_then(|cursor| cursor.cursor.clone());
+
+ match cursor {
+ Some(cursor) => {
+ self.state = IterableApiRequestState::PollInner(
+ ApiRequest {
+ inner: self.inner.clone(),
+ state: RequestState::SetupRequest,
+ attempt: 0,
+ max_attempts: 1,
+ pagination: Some(cursor.clone()),
+ });
+ },
+ None => {
+ self.state = IterableApiRequestState::Finished;
+ }
+ }
return Ok(Async::Ready(Some(res)));
}
}
@@ -780,9 +801,16 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
mut_limits.inflight = mut_limits.inflight + 1;
}
- let builder = reqwest.request(self.inner.method.clone(), &self.inner.url);
- let builder = client.apply_standard_headers(builder);
- let r = builder.query(&self.inner.params);
+ let mut builder = reqwest.request(self.inner.method.clone(), &self.inner.url);
+ builder = client.apply_standard_headers(builder);
+ builder = builder.query(&self.inner.params);
+ builder =
+ if let Some(cursor) = &self.pagination {
+ builder.query(&[("after", cursor)])
+ } else {
+ builder
+ };
+
let key_err = self.inner.ratelimit.clone();
let key_ok = self.inner.ratelimit.clone();
@@ -790,7 +818,7 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
let client_ok = client.clone();
let f =
- r.send()
+ builder.send()
.map_err(move |err| {
if let Some(key) = key_err {
if let Some(limits) = client_err.ratelimit(key) {