diff options
Diffstat (limited to 'src/helix/mod.rs')
-rw-r--r-- | src/helix/mod.rs | 161 |
1 files changed, 92 insertions, 69 deletions
diff --git a/src/helix/mod.rs b/src/helix/mod.rs index c6eb1f5..5c51c22 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -9,7 +9,6 @@ use futures::Poll; use serde::de::DeserializeOwned; use futures::Async; use futures::try_ready; -use std::iter::FromIterator; use crate::error::ConditionError; @@ -68,7 +67,7 @@ impl ClientTrait for Client { } } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { use self::ClientType::*; match self.inner.as_ref() { Unauth(inner) => inner.ratelimit(key), @@ -120,7 +119,7 @@ pub trait ClientTrait { fn id<'a>(&'a self) -> &'a str; fn domain<'a>(&'a self) -> &'a str; - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit>; + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit>; fn authenticated(&self) -> bool; fn scopes(&self) -> Vec<Scope>; @@ -135,8 +134,8 @@ impl ClientTrait for UnauthClient { &self.domain } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { - None + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { + self.ratelimits.get(&key) } fn authenticated(&self) -> bool { @@ -171,7 +170,7 @@ impl ClientTrait for AuthClient { } } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { match self.previous.inner.as_ref() { ClientType::Auth(auth) => auth.ratelimit(key), ClientType::Unauth(unauth) => unauth.ratelimit(key), @@ -201,17 +200,6 @@ struct AuthStateRef { state: AuthState, } -struct ClientRef { - id: String, - secret: Option<String>, - reqwest: ReqwestClient, - domain: &'static str, - ratelimits: RatelimitMap, - auth_state: Mutex<AuthStateRef>, - auth_barrier: Barrier, - previous: Option<Client>, -} - impl Client { pub fn new(id: &str) -> Client { let client = ReqwestClient::new(); @@ -359,10 +347,6 @@ impl AuthClientBuilder { use std::collections::BTreeMap; use reqwest::Method; -struct Request { - inner: Arc<RequestRef>, -} - struct RequestRef { url: String, params: BTreeMap<String, String>, @@ -382,10 +366,13 @@ enum RequestState<T> { pub struct ApiRequest<T> { inner: Arc<RequestRef>, - state: RequestState<T> + state: RequestState<T>, + attempt: u32, + max_attempts: u32, } -impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { +use self::models::PaginationTrait; +impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> { pub fn new(url: String, params: BTreeMap<&str, &str>, @@ -407,7 +394,9 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { method: method, ratelimit: ratelimit, }), - state: RequestState::Uninitalized + state: RequestState::Uninitalized, + attempt: 0, + max_attempts: 1, } } } @@ -449,6 +438,8 @@ impl Ratelimit { } } + + #[derive(Debug, Clone)] pub struct RatelimitRef { limit: i32, @@ -460,6 +451,41 @@ pub struct RatelimitRef { header_reset: String, } + +impl RatelimitRef { + pub fn update_from_headers(&mut self, headers: &reqwest::header::HeaderMap) { + let maybe_limit = + headers + .get(&self.header_limit) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::<i32>().ok()); + + if let Some(limit) = maybe_limit { + self.limit = limit; + } + + let maybe_remaining = + headers + .get(&self.header_remaining) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::<i32>().ok()); + + if let Some(limit) = maybe_remaining { + self.remaining = limit; + } + + let maybe_reset = + headers + .get(&self.header_reset) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::<u32>().ok()); + + if let Some(reset) = maybe_reset { + self.reset = Some(reset); + } + } +} + use futures::future::SharedError; use crate::sync::barrier::Barrier; use crate::sync::waiter::Waiter; @@ -576,6 +602,26 @@ impl Waiter for RatelimitWaiter { * get that error */ +/* Macro ripped directly from try_ready and simplies retries if any error occurs + * and there are remaning retry attempt + */ +#[macro_export] +macro_rules! retry_ready { + ($s:expr, $e:expr) => (match $e { + Ok(futures::prelude::Async::Ready(t)) => t, + Ok(futures::prelude::Async::NotReady) => return Ok(futures::prelude::Async::NotReady), + Err(e) => { + if $s.attempt < $s.max_attempts { + $s.attempt += 1; + $s.state = RequestState::Uninitalized; + continue; + } else { + return Err(e.into()); + } + } + }) +} + impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { type Item = T; type Error = Error; @@ -600,7 +646,7 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { } }, RequestState::WaitAuth(auth) => { - let _waiter = try_ready!(auth.poll()); + let _waiter = retry_ready!(self, auth.poll()); self.state = RequestState::SetupRatelimit; }, RequestState::SetupRatelimit => { @@ -624,16 +670,17 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { } }, RequestState::WaitLimit(limit) => { - let _waiter = try_ready!(limit.poll()); + let _waiter = retry_ready!(self, limit.poll()); self.state = RequestState::WaitRequest; }, RequestState::WaitRequest => { - let client = &self.inner.client; + let client = self.inner.client.clone(); + let c_ref = &client; let reqwest = client.reqwest(); let limits = self.inner.ratelimit.as_ref().and_then(|key| { - client.ratelimit(key.clone()) + c_ref.ratelimit(key.clone()) }); if let Some(limits) = limits { @@ -645,53 +692,30 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { let builder = client.apply_standard_headers(builder); let r = builder.query(&self.inner.params); - let limits_err = limits.clone(); - let limits_ok = limits.clone(); + let key_err = self.inner.ratelimit.clone(); + let key_ok = self.inner.ratelimit.clone(); + let client_err = client.clone(); + let client_ok = client.clone(); - let f = r.send() + let f = + r.send() .map_err(move |err| { - - if let Some(limits) = limits_err { - let mut mut_limits = limits.inner.lock().unwrap(); - mut_limits.inflight = mut_limits.inflight - 1; + if let Some(key) = key_err { + if let Some(limits) = client_err.ratelimit(key) { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight - 1; + } } err }) .map(move |mut response| { println!("{:?}", response); - if let Some(limits) = limits_ok { - let mut mut_limits = limits.inner.lock().unwrap(); - mut_limits.inflight = mut_limits.inflight - 1; - - let maybe_limit = - response.headers() - .get(&mut_limits.header_limit) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::<i32>().ok()); - - if let Some(limit) = maybe_limit { - mut_limits.limit = limit; - } - - let maybe_remaining = - response.headers() - .get(&mut_limits.header_remaining) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::<i32>().ok()); - - if let Some(limit) = maybe_remaining { - mut_limits.remaining = limit; - } - - let maybe_reset = - response.headers() - .get(&mut_limits.header_reset) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::<u32>().ok()); - - if let Some(reset) = maybe_reset { - mut_limits.reset = Some(reset); + if let Some(key) = key_ok { + if let Some(limits) = client_ok.ratelimit(key) { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight - 1; + mut_limits.update_from_headers(response.headers()); } } @@ -700,11 +724,10 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { .and_then(|json| { json }); - self.state = RequestState::PollParse(Box::new(f)); }, RequestState::PollParse(future) => { - let res = try_ready!(future.poll()); + let res = retry_ready!(self, future.poll()); return Ok(Async::Ready(res)); } } |