diff options
author | David Blajda <blajda@hotmail.com> | 2018-12-24 23:08:26 +0000 |
---|---|---|
committer | David Blajda <blajda@hotmail.com> | 2018-12-24 23:08:26 +0000 |
commit | 2f1682162174f5fee4f4297f616e8f66b7c70dca (patch) | |
tree | dad5dff552442b1801553c9944a1967121e667cb | |
parent | 298806448db4a4e74306ec648bfc0e43a76c6bc3 (diff) |
Fix lifetimes on ratelimit trait
-rw-r--r-- | src/helix/mod.rs | 161 | ||||
-rw-r--r-- | src/helix/models.rs | 46 | ||||
-rw-r--r-- | src/helix/namespaces/clips.rs | 3 |
3 files changed, 126 insertions, 84 deletions
diff --git a/src/helix/mod.rs b/src/helix/mod.rs index c6eb1f5..5c51c22 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -9,7 +9,6 @@ use futures::Poll; use serde::de::DeserializeOwned; use futures::Async; use futures::try_ready; -use std::iter::FromIterator; use crate::error::ConditionError; @@ -68,7 +67,7 @@ impl ClientTrait for Client { } } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { use self::ClientType::*; match self.inner.as_ref() { Unauth(inner) => inner.ratelimit(key), @@ -120,7 +119,7 @@ pub trait ClientTrait { fn id<'a>(&'a self) -> &'a str; fn domain<'a>(&'a self) -> &'a str; - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit>; + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit>; fn authenticated(&self) -> bool; fn scopes(&self) -> Vec<Scope>; @@ -135,8 +134,8 @@ impl ClientTrait for UnauthClient { &self.domain } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { - None + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { + self.ratelimits.get(&key) } fn authenticated(&self) -> bool { @@ -171,7 +170,7 @@ impl ClientTrait for AuthClient { } } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { match self.previous.inner.as_ref() { ClientType::Auth(auth) => auth.ratelimit(key), ClientType::Unauth(unauth) => unauth.ratelimit(key), @@ -201,17 +200,6 @@ struct AuthStateRef { state: AuthState, } -struct ClientRef { - id: String, - secret: Option<String>, - reqwest: ReqwestClient, - domain: &'static str, - ratelimits: RatelimitMap, - auth_state: Mutex<AuthStateRef>, - auth_barrier: Barrier, - previous: Option<Client>, -} - impl Client { pub fn new(id: &str) -> Client { let client = ReqwestClient::new(); @@ -359,10 +347,6 @@ impl AuthClientBuilder { use std::collections::BTreeMap; use reqwest::Method; -struct Request { - inner: Arc<RequestRef>, -} - struct RequestRef { url: String, params: BTreeMap<String, String>, @@ -382,10 +366,13 @@ enum RequestState<T> { pub struct ApiRequest<T> { inner: Arc<RequestRef>, - state: RequestState<T> + state: RequestState<T>, + attempt: u32, + max_attempts: u32, } -impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { +use self::models::PaginationTrait; +impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> { pub fn new(url: String, params: BTreeMap<&str, &str>, @@ -407,7 +394,9 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { method: method, ratelimit: ratelimit, }), - state: RequestState::Uninitalized + state: RequestState::Uninitalized, + attempt: 0, + max_attempts: 1, } } } @@ -449,6 +438,8 @@ impl Ratelimit { } } + + #[derive(Debug, Clone)] pub struct RatelimitRef { limit: i32, @@ -460,6 +451,41 @@ pub struct RatelimitRef { header_reset: String, } + +impl RatelimitRef { + pub fn update_from_headers(&mut self, headers: &reqwest::header::HeaderMap) { + let maybe_limit = + headers + .get(&self.header_limit) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::<i32>().ok()); + + if let Some(limit) = maybe_limit { + self.limit = limit; + } + + let maybe_remaining = + headers + .get(&self.header_remaining) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::<i32>().ok()); + + if let Some(limit) = maybe_remaining { + self.remaining = limit; + } + + let maybe_reset = + headers + .get(&self.header_reset) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::<u32>().ok()); + + if let Some(reset) = maybe_reset { + self.reset = Some(reset); + } + } +} + use futures::future::SharedError; use crate::sync::barrier::Barrier; use crate::sync::waiter::Waiter; @@ -576,6 +602,26 @@ impl Waiter for RatelimitWaiter { * get that error */ +/* Macro ripped directly from try_ready and simplies retries if any error occurs + * and there are remaning retry attempt + */ +#[macro_export] +macro_rules! retry_ready { + ($s:expr, $e:expr) => (match $e { + Ok(futures::prelude::Async::Ready(t)) => t, + Ok(futures::prelude::Async::NotReady) => return Ok(futures::prelude::Async::NotReady), + Err(e) => { + if $s.attempt < $s.max_attempts { + $s.attempt += 1; + $s.state = RequestState::Uninitalized; + continue; + } else { + return Err(e.into()); + } + } + }) +} + impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { type Item = T; type Error = Error; @@ -600,7 +646,7 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { } }, RequestState::WaitAuth(auth) => { - let _waiter = try_ready!(auth.poll()); + let _waiter = retry_ready!(self, auth.poll()); self.state = RequestState::SetupRatelimit; }, RequestState::SetupRatelimit => { @@ -624,16 +670,17 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { } }, RequestState::WaitLimit(limit) => { - let _waiter = try_ready!(limit.poll()); + let _waiter = retry_ready!(self, limit.poll()); self.state = RequestState::WaitRequest; }, RequestState::WaitRequest => { - let client = &self.inner.client; + let client = self.inner.client.clone(); + let c_ref = &client; let reqwest = client.reqwest(); let limits = self.inner.ratelimit.as_ref().and_then(|key| { - client.ratelimit(key.clone()) + c_ref.ratelimit(key.clone()) }); if let Some(limits) = limits { @@ -645,53 +692,30 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { let builder = client.apply_standard_headers(builder); let r = builder.query(&self.inner.params); - let limits_err = limits.clone(); - let limits_ok = limits.clone(); + let key_err = self.inner.ratelimit.clone(); + let key_ok = self.inner.ratelimit.clone(); + let client_err = client.clone(); + let client_ok = client.clone(); - let f = r.send() + let f = + r.send() .map_err(move |err| { - - if let Some(limits) = limits_err { - let mut mut_limits = limits.inner.lock().unwrap(); - mut_limits.inflight = mut_limits.inflight - 1; + if let Some(key) = key_err { + if let Some(limits) = client_err.ratelimit(key) { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight - 1; + } } err }) .map(move |mut response| { println!("{:?}", response); - if let Some(limits) = limits_ok { - let mut mut_limits = limits.inner.lock().unwrap(); - mut_limits.inflight = mut_limits.inflight - 1; - - let maybe_limit = - response.headers() - .get(&mut_limits.header_limit) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::<i32>().ok()); - - if let Some(limit) = maybe_limit { - mut_limits.limit = limit; - } - - let maybe_remaining = - response.headers() - .get(&mut_limits.header_remaining) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::<i32>().ok()); - - if let Some(limit) = maybe_remaining { - mut_limits.remaining = limit; - } - - let maybe_reset = - response.headers() - .get(&mut_limits.header_reset) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::<u32>().ok()); - - if let Some(reset) = maybe_reset { - mut_limits.reset = Some(reset); + if let Some(key) = key_ok { + if let Some(limits) = client_ok.ratelimit(key) { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight - 1; + mut_limits.update_from_headers(response.headers()); } } @@ -700,11 +724,10 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { .and_then(|json| { json }); - self.state = RequestState::PollParse(Box::new(f)); }, RequestState::PollParse(future) => { - let res = try_ready!(future.poll()); + let res = retry_ready!(self, future.poll()); return Ok(Async::Ready(res)); } } diff --git a/src/helix/models.rs b/src/helix/models.rs index 86b7560..bdb8438 100644 --- a/src/helix/models.rs +++ b/src/helix/models.rs @@ -5,14 +5,29 @@ use url::Url; use chrono::{DateTime, Utc}; use super::types::{UserId, VideoId, ChannelId}; +pub trait PaginationTrait { + fn cursor<'a>(&'a self) -> &'a Option<Cursor>; + fn set_request(&mut self); +} + #[derive(Debug, Deserialize)] pub struct DataContainer<T> { pub data: Vec<T> } -#[derive(Debug, Deserialize)] -pub struct Cursor { - cursor: String +impl<T> PaginationTrait for DataContainer<T> { + fn cursor<'a>(&'a self) -> &'a Option<Cursor> { &None } + fn set_request(&mut self) {} +} + +impl<T> PaginationTrait for PaginationContainer<T> { + fn cursor<'a>(&'a self) -> &'a Option<Cursor> { &self.pagination } + fn set_request(&mut self) {} +} + +impl PaginationTrait for Credentials { + fn cursor<'a>(&'a self) -> &'a Option<Cursor> { &None } + fn set_request(&mut self) {} } #[derive(Debug, Deserialize)] @@ -22,6 +37,21 @@ pub struct PaginationContainer<T> { } #[derive(Debug, Deserialize)] +pub struct Cursor { + cursor: String +} + +#[derive(Debug, Deserialize)] +pub struct Credentials { + pub access_token: String, + pub refresh_token: Option<String>, + pub expires_in: u32, + pub scope: Option<Vec<String>>, + pub token_type: String, +} + + +#[derive(Debug, Deserialize)] pub struct Video { pub id: VideoId, pub user_id: UserId, @@ -80,13 +110,3 @@ pub struct Clip { pub thumbnail_url: Url, pub view_count: i32, } - - -#[derive(Debug, Deserialize)] -pub struct Credentials { - pub access_token: String, - pub refresh_token: Option<String>, - pub expires_in: u32, - pub scope: Option<Vec<String>>, - pub token_type: String, -} diff --git a/src/helix/namespaces/clips.rs b/src/helix/namespaces/clips.rs index 19293cc..a73d2e9 100644 --- a/src/helix/namespaces/clips.rs +++ b/src/helix/namespaces/clips.rs @@ -1,9 +1,8 @@ use std::collections::BTreeMap; -use super::super::models::{DataContainer, PaginationContainer, User, Video, Clip}; +use super::super::models::{DataContainer, Clip}; use super::super::Client; use super::super::ClientTrait; use super::super::RatelimitKey; -const API_DOMAIN: &'static str = "api.twitch.tv"; use super::Namespace; pub struct Clips {} |