summaryrefslogtreecommitdiff
path: root/src/helix/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/helix/mod.rs')
-rw-r--r--src/helix/mod.rs161
1 files changed, 92 insertions, 69 deletions
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index c6eb1f5..5c51c22 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -9,7 +9,6 @@ use futures::Poll;
use serde::de::DeserializeOwned;
use futures::Async;
use futures::try_ready;
-use std::iter::FromIterator;
use crate::error::ConditionError;
@@ -68,7 +67,7 @@ impl ClientTrait for Client {
}
}
- fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> {
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
use self::ClientType::*;
match self.inner.as_ref() {
Unauth(inner) => inner.ratelimit(key),
@@ -120,7 +119,7 @@ pub trait ClientTrait {
fn id<'a>(&'a self) -> &'a str;
fn domain<'a>(&'a self) -> &'a str;
- fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit>;
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit>;
fn authenticated(&self) -> bool;
fn scopes(&self) -> Vec<Scope>;
@@ -135,8 +134,8 @@ impl ClientTrait for UnauthClient {
&self.domain
}
- fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> {
- None
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
+ self.ratelimits.get(&key)
}
fn authenticated(&self) -> bool {
@@ -171,7 +170,7 @@ impl ClientTrait for AuthClient {
}
}
- fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> {
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
match self.previous.inner.as_ref() {
ClientType::Auth(auth) => auth.ratelimit(key),
ClientType::Unauth(unauth) => unauth.ratelimit(key),
@@ -201,17 +200,6 @@ struct AuthStateRef {
state: AuthState,
}
-struct ClientRef {
- id: String,
- secret: Option<String>,
- reqwest: ReqwestClient,
- domain: &'static str,
- ratelimits: RatelimitMap,
- auth_state: Mutex<AuthStateRef>,
- auth_barrier: Barrier,
- previous: Option<Client>,
-}
-
impl Client {
pub fn new(id: &str) -> Client {
let client = ReqwestClient::new();
@@ -359,10 +347,6 @@ impl AuthClientBuilder {
use std::collections::BTreeMap;
use reqwest::Method;
-struct Request {
- inner: Arc<RequestRef>,
-}
-
struct RequestRef {
url: String,
params: BTreeMap<String, String>,
@@ -382,10 +366,13 @@ enum RequestState<T> {
pub struct ApiRequest<T> {
inner: Arc<RequestRef>,
- state: RequestState<T>
+ state: RequestState<T>,
+ attempt: u32,
+ max_attempts: u32,
}
-impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
+use self::models::PaginationTrait;
+impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> {
pub fn new(url: String,
params: BTreeMap<&str, &str>,
@@ -407,7 +394,9 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
method: method,
ratelimit: ratelimit,
}),
- state: RequestState::Uninitalized
+ state: RequestState::Uninitalized,
+ attempt: 0,
+ max_attempts: 1,
}
}
}
@@ -449,6 +438,8 @@ impl Ratelimit {
}
}
+
+
#[derive(Debug, Clone)]
pub struct RatelimitRef {
limit: i32,
@@ -460,6 +451,41 @@ pub struct RatelimitRef {
header_reset: String,
}
+
+impl RatelimitRef {
+ pub fn update_from_headers(&mut self, headers: &reqwest::header::HeaderMap) {
+ let maybe_limit =
+ headers
+ .get(&self.header_limit)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<i32>().ok());
+
+ if let Some(limit) = maybe_limit {
+ self.limit = limit;
+ }
+
+ let maybe_remaining =
+ headers
+ .get(&self.header_remaining)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<i32>().ok());
+
+ if let Some(limit) = maybe_remaining {
+ self.remaining = limit;
+ }
+
+ let maybe_reset =
+ headers
+ .get(&self.header_reset)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<u32>().ok());
+
+ if let Some(reset) = maybe_reset {
+ self.reset = Some(reset);
+ }
+ }
+}
+
use futures::future::SharedError;
use crate::sync::barrier::Barrier;
use crate::sync::waiter::Waiter;
@@ -576,6 +602,26 @@ impl Waiter for RatelimitWaiter {
* get that error
*/
+/* Macro ripped directly from try_ready and simplies retries if any error occurs
+ * and there are remaning retry attempt
+ */
+#[macro_export]
+macro_rules! retry_ready {
+ ($s:expr, $e:expr) => (match $e {
+ Ok(futures::prelude::Async::Ready(t)) => t,
+ Ok(futures::prelude::Async::NotReady) => return Ok(futures::prelude::Async::NotReady),
+ Err(e) => {
+ if $s.attempt < $s.max_attempts {
+ $s.attempt += 1;
+ $s.state = RequestState::Uninitalized;
+ continue;
+ } else {
+ return Err(e.into());
+ }
+ }
+ })
+}
+
impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
type Item = T;
type Error = Error;
@@ -600,7 +646,7 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
}
},
RequestState::WaitAuth(auth) => {
- let _waiter = try_ready!(auth.poll());
+ let _waiter = retry_ready!(self, auth.poll());
self.state = RequestState::SetupRatelimit;
},
RequestState::SetupRatelimit => {
@@ -624,16 +670,17 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
}
},
RequestState::WaitLimit(limit) => {
- let _waiter = try_ready!(limit.poll());
+ let _waiter = retry_ready!(self, limit.poll());
self.state = RequestState::WaitRequest;
},
RequestState::WaitRequest => {
- let client = &self.inner.client;
+ let client = self.inner.client.clone();
+ let c_ref = &client;
let reqwest = client.reqwest();
let limits =
self.inner.ratelimit.as_ref().and_then(|key| {
- client.ratelimit(key.clone())
+ c_ref.ratelimit(key.clone())
});
if let Some(limits) = limits {
@@ -645,53 +692,30 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
let builder = client.apply_standard_headers(builder);
let r = builder.query(&self.inner.params);
- let limits_err = limits.clone();
- let limits_ok = limits.clone();
+ let key_err = self.inner.ratelimit.clone();
+ let key_ok = self.inner.ratelimit.clone();
+ let client_err = client.clone();
+ let client_ok = client.clone();
- let f = r.send()
+ let f =
+ r.send()
.map_err(move |err| {
-
- if let Some(limits) = limits_err {
- let mut mut_limits = limits.inner.lock().unwrap();
- mut_limits.inflight = mut_limits.inflight - 1;
+ if let Some(key) = key_err {
+ if let Some(limits) = client_err.ratelimit(key) {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight - 1;
+ }
}
err
})
.map(move |mut response| {
println!("{:?}", response);
- if let Some(limits) = limits_ok {
- let mut mut_limits = limits.inner.lock().unwrap();
- mut_limits.inflight = mut_limits.inflight - 1;
-
- let maybe_limit =
- response.headers()
- .get(&mut_limits.header_limit)
- .and_then(|x| x.to_str().ok())
- .and_then(|x| x.parse::<i32>().ok());
-
- if let Some(limit) = maybe_limit {
- mut_limits.limit = limit;
- }
-
- let maybe_remaining =
- response.headers()
- .get(&mut_limits.header_remaining)
- .and_then(|x| x.to_str().ok())
- .and_then(|x| x.parse::<i32>().ok());
-
- if let Some(limit) = maybe_remaining {
- mut_limits.remaining = limit;
- }
-
- let maybe_reset =
- response.headers()
- .get(&mut_limits.header_reset)
- .and_then(|x| x.to_str().ok())
- .and_then(|x| x.parse::<u32>().ok());
-
- if let Some(reset) = maybe_reset {
- mut_limits.reset = Some(reset);
+ if let Some(key) = key_ok {
+ if let Some(limits) = client_ok.ratelimit(key) {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight - 1;
+ mut_limits.update_from_headers(response.headers());
}
}
@@ -700,11 +724,10 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
.and_then(|json| {
json
});
-
self.state = RequestState::PollParse(Box::new(f));
},
RequestState::PollParse(future) => {
- let res = try_ready!(future.poll());
+ let res = retry_ready!(self, future.poll());
return Ok(Async::Ready(res));
}
}