From 2f1682162174f5fee4f4297f616e8f66b7c70dca Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 24 Dec 2018 23:08:26 +0000 Subject: Fix lifetimes on ratelimit trait --- src/helix/mod.rs | 161 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 92 insertions(+), 69 deletions(-) (limited to 'src/helix/mod.rs') diff --git a/src/helix/mod.rs b/src/helix/mod.rs index c6eb1f5..5c51c22 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -9,7 +9,6 @@ use futures::Poll; use serde::de::DeserializeOwned; use futures::Async; use futures::try_ready; -use std::iter::FromIterator; use crate::error::ConditionError; @@ -68,7 +67,7 @@ impl ClientTrait for Client { } } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { use self::ClientType::*; match self.inner.as_ref() { Unauth(inner) => inner.ratelimit(key), @@ -120,7 +119,7 @@ pub trait ClientTrait { fn id<'a>(&'a self) -> &'a str; fn domain<'a>(&'a self) -> &'a str; - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit>; + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit>; fn authenticated(&self) -> bool; fn scopes(&self) -> Vec; @@ -135,8 +134,8 @@ impl ClientTrait for UnauthClient { &self.domain } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { - None + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { + self.ratelimits.get(&key) } fn authenticated(&self) -> bool { @@ -171,7 +170,7 @@ impl ClientTrait for AuthClient { } } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { match self.previous.inner.as_ref() { ClientType::Auth(auth) => auth.ratelimit(key), ClientType::Unauth(unauth) => unauth.ratelimit(key), @@ -201,17 +200,6 @@ struct AuthStateRef { state: AuthState, } -struct ClientRef { - id: String, - secret: Option, - reqwest: ReqwestClient, - domain: &'static str, - ratelimits: RatelimitMap, - auth_state: Mutex, - auth_barrier: Barrier, - previous: Option, -} - impl Client { pub fn new(id: &str) -> Client { let client = ReqwestClient::new(); @@ -359,10 +347,6 @@ impl AuthClientBuilder { use std::collections::BTreeMap; use reqwest::Method; -struct Request { - inner: Arc, -} - struct RequestRef { url: String, params: BTreeMap, @@ -382,10 +366,13 @@ enum RequestState { pub struct ApiRequest { inner: Arc, - state: RequestState + state: RequestState, + attempt: u32, + max_attempts: u32, } -impl ApiRequest { +use self::models::PaginationTrait; +impl ApiRequest { pub fn new(url: String, params: BTreeMap<&str, &str>, @@ -407,7 +394,9 @@ impl ApiRequest { method: method, ratelimit: ratelimit, }), - state: RequestState::Uninitalized + state: RequestState::Uninitalized, + attempt: 0, + max_attempts: 1, } } } @@ -449,6 +438,8 @@ impl Ratelimit { } } + + #[derive(Debug, Clone)] pub struct RatelimitRef { limit: i32, @@ -460,6 +451,41 @@ pub struct RatelimitRef { header_reset: String, } + +impl RatelimitRef { + pub fn update_from_headers(&mut self, headers: &reqwest::header::HeaderMap) { + let maybe_limit = + headers + .get(&self.header_limit) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::().ok()); + + if let Some(limit) = maybe_limit { + self.limit = limit; + } + + let maybe_remaining = + headers + .get(&self.header_remaining) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::().ok()); + + if let Some(limit) = maybe_remaining { + self.remaining = limit; + } + + let maybe_reset = + headers + .get(&self.header_reset) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::().ok()); + + if let Some(reset) = maybe_reset { + self.reset = Some(reset); + } + } +} + use futures::future::SharedError; use crate::sync::barrier::Barrier; use crate::sync::waiter::Waiter; @@ -576,6 +602,26 @@ impl Waiter for RatelimitWaiter { * get that error */ +/* Macro ripped directly from try_ready and simplies retries if any error occurs + * and there are remaning retry attempt + */ +#[macro_export] +macro_rules! retry_ready { + ($s:expr, $e:expr) => (match $e { + Ok(futures::prelude::Async::Ready(t)) => t, + Ok(futures::prelude::Async::NotReady) => return Ok(futures::prelude::Async::NotReady), + Err(e) => { + if $s.attempt < $s.max_attempts { + $s.attempt += 1; + $s.state = RequestState::Uninitalized; + continue; + } else { + return Err(e.into()); + } + } + }) +} + impl Future for ApiRequest { type Item = T; type Error = Error; @@ -600,7 +646,7 @@ impl Future for ApiRequest { } }, RequestState::WaitAuth(auth) => { - let _waiter = try_ready!(auth.poll()); + let _waiter = retry_ready!(self, auth.poll()); self.state = RequestState::SetupRatelimit; }, RequestState::SetupRatelimit => { @@ -624,16 +670,17 @@ impl Future for ApiRequest { } }, RequestState::WaitLimit(limit) => { - let _waiter = try_ready!(limit.poll()); + let _waiter = retry_ready!(self, limit.poll()); self.state = RequestState::WaitRequest; }, RequestState::WaitRequest => { - let client = &self.inner.client; + let client = self.inner.client.clone(); + let c_ref = &client; let reqwest = client.reqwest(); let limits = self.inner.ratelimit.as_ref().and_then(|key| { - client.ratelimit(key.clone()) + c_ref.ratelimit(key.clone()) }); if let Some(limits) = limits { @@ -645,53 +692,30 @@ impl Future for ApiRequest { let builder = client.apply_standard_headers(builder); let r = builder.query(&self.inner.params); - let limits_err = limits.clone(); - let limits_ok = limits.clone(); + let key_err = self.inner.ratelimit.clone(); + let key_ok = self.inner.ratelimit.clone(); + let client_err = client.clone(); + let client_ok = client.clone(); - let f = r.send() + let f = + r.send() .map_err(move |err| { - - if let Some(limits) = limits_err { - let mut mut_limits = limits.inner.lock().unwrap(); - mut_limits.inflight = mut_limits.inflight - 1; + if let Some(key) = key_err { + if let Some(limits) = client_err.ratelimit(key) { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight - 1; + } } err }) .map(move |mut response| { println!("{:?}", response); - if let Some(limits) = limits_ok { - let mut mut_limits = limits.inner.lock().unwrap(); - mut_limits.inflight = mut_limits.inflight - 1; - - let maybe_limit = - response.headers() - .get(&mut_limits.header_limit) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::().ok()); - - if let Some(limit) = maybe_limit { - mut_limits.limit = limit; - } - - let maybe_remaining = - response.headers() - .get(&mut_limits.header_remaining) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::().ok()); - - if let Some(limit) = maybe_remaining { - mut_limits.remaining = limit; - } - - let maybe_reset = - response.headers() - .get(&mut_limits.header_reset) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::().ok()); - - if let Some(reset) = maybe_reset { - mut_limits.reset = Some(reset); + if let Some(key) = key_ok { + if let Some(limits) = client_ok.ratelimit(key) { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight - 1; + mut_limits.update_from_headers(response.headers()); } } @@ -700,11 +724,10 @@ impl Future for ApiRequest { .and_then(|json| { json }); - self.state = RequestState::PollParse(Box::new(f)); }, RequestState::PollParse(future) => { - let res = try_ready!(future.poll()); + let res = retry_ready!(self, future.poll()); return Ok(Async::Ready(res)); } } -- cgit v1.2.3