From 2f1682162174f5fee4f4297f616e8f66b7c70dca Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 24 Dec 2018 23:08:26 +0000 Subject: Fix lifetimes on ratelimit trait --- src/helix/mod.rs | 161 ++++++++++++++++++++++++------------------ src/helix/models.rs | 46 ++++++++---- src/helix/namespaces/clips.rs | 3 +- 3 files changed, 126 insertions(+), 84 deletions(-) diff --git a/src/helix/mod.rs b/src/helix/mod.rs index c6eb1f5..5c51c22 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -9,7 +9,6 @@ use futures::Poll; use serde::de::DeserializeOwned; use futures::Async; use futures::try_ready; -use std::iter::FromIterator; use crate::error::ConditionError; @@ -68,7 +67,7 @@ impl ClientTrait for Client { } } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { use self::ClientType::*; match self.inner.as_ref() { Unauth(inner) => inner.ratelimit(key), @@ -120,7 +119,7 @@ pub trait ClientTrait { fn id<'a>(&'a self) -> &'a str; fn domain<'a>(&'a self) -> &'a str; - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit>; + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit>; fn authenticated(&self) -> bool; fn scopes(&self) -> Vec; @@ -135,8 +134,8 @@ impl ClientTrait for UnauthClient { &self.domain } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { - None + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { + self.ratelimits.get(&key) } fn authenticated(&self) -> bool { @@ -171,7 +170,7 @@ impl ClientTrait for AuthClient { } } - fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> { + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { match self.previous.inner.as_ref() { ClientType::Auth(auth) => auth.ratelimit(key), ClientType::Unauth(unauth) => unauth.ratelimit(key), @@ -201,17 +200,6 @@ struct AuthStateRef { state: AuthState, } -struct ClientRef { - id: String, - secret: Option, - reqwest: ReqwestClient, - domain: &'static str, - ratelimits: RatelimitMap, - auth_state: Mutex, - auth_barrier: Barrier, - previous: Option, -} - impl Client { pub fn new(id: &str) -> Client { let client = ReqwestClient::new(); @@ -359,10 +347,6 @@ impl AuthClientBuilder { use std::collections::BTreeMap; use reqwest::Method; -struct Request { - inner: Arc, -} - struct RequestRef { url: String, params: BTreeMap, @@ -382,10 +366,13 @@ enum RequestState { pub struct ApiRequest { inner: Arc, - state: RequestState + state: RequestState, + attempt: u32, + max_attempts: u32, } -impl ApiRequest { +use self::models::PaginationTrait; +impl ApiRequest { pub fn new(url: String, params: BTreeMap<&str, &str>, @@ -407,7 +394,9 @@ impl ApiRequest { method: method, ratelimit: ratelimit, }), - state: RequestState::Uninitalized + state: RequestState::Uninitalized, + attempt: 0, + max_attempts: 1, } } } @@ -449,6 +438,8 @@ impl Ratelimit { } } + + #[derive(Debug, Clone)] pub struct RatelimitRef { limit: i32, @@ -460,6 +451,41 @@ pub struct RatelimitRef { header_reset: String, } + +impl RatelimitRef { + pub fn update_from_headers(&mut self, headers: &reqwest::header::HeaderMap) { + let maybe_limit = + headers + .get(&self.header_limit) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::().ok()); + + if let Some(limit) = maybe_limit { + self.limit = limit; + } + + let maybe_remaining = + headers + .get(&self.header_remaining) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::().ok()); + + if let Some(limit) = maybe_remaining { + self.remaining = limit; + } + + let maybe_reset = + headers + .get(&self.header_reset) + .and_then(|x| x.to_str().ok()) + .and_then(|x| x.parse::().ok()); + + if let Some(reset) = maybe_reset { + self.reset = Some(reset); + } + } +} + use futures::future::SharedError; use crate::sync::barrier::Barrier; use crate::sync::waiter::Waiter; @@ -576,6 +602,26 @@ impl Waiter for RatelimitWaiter { * get that error */ +/* Macro ripped directly from try_ready and simplies retries if any error occurs + * and there are remaning retry attempt + */ +#[macro_export] +macro_rules! retry_ready { + ($s:expr, $e:expr) => (match $e { + Ok(futures::prelude::Async::Ready(t)) => t, + Ok(futures::prelude::Async::NotReady) => return Ok(futures::prelude::Async::NotReady), + Err(e) => { + if $s.attempt < $s.max_attempts { + $s.attempt += 1; + $s.state = RequestState::Uninitalized; + continue; + } else { + return Err(e.into()); + } + } + }) +} + impl Future for ApiRequest { type Item = T; type Error = Error; @@ -600,7 +646,7 @@ impl Future for ApiRequest { } }, RequestState::WaitAuth(auth) => { - let _waiter = try_ready!(auth.poll()); + let _waiter = retry_ready!(self, auth.poll()); self.state = RequestState::SetupRatelimit; }, RequestState::SetupRatelimit => { @@ -624,16 +670,17 @@ impl Future for ApiRequest { } }, RequestState::WaitLimit(limit) => { - let _waiter = try_ready!(limit.poll()); + let _waiter = retry_ready!(self, limit.poll()); self.state = RequestState::WaitRequest; }, RequestState::WaitRequest => { - let client = &self.inner.client; + let client = self.inner.client.clone(); + let c_ref = &client; let reqwest = client.reqwest(); let limits = self.inner.ratelimit.as_ref().and_then(|key| { - client.ratelimit(key.clone()) + c_ref.ratelimit(key.clone()) }); if let Some(limits) = limits { @@ -645,53 +692,30 @@ impl Future for ApiRequest { let builder = client.apply_standard_headers(builder); let r = builder.query(&self.inner.params); - let limits_err = limits.clone(); - let limits_ok = limits.clone(); + let key_err = self.inner.ratelimit.clone(); + let key_ok = self.inner.ratelimit.clone(); + let client_err = client.clone(); + let client_ok = client.clone(); - let f = r.send() + let f = + r.send() .map_err(move |err| { - - if let Some(limits) = limits_err { - let mut mut_limits = limits.inner.lock().unwrap(); - mut_limits.inflight = mut_limits.inflight - 1; + if let Some(key) = key_err { + if let Some(limits) = client_err.ratelimit(key) { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight - 1; + } } err }) .map(move |mut response| { println!("{:?}", response); - if let Some(limits) = limits_ok { - let mut mut_limits = limits.inner.lock().unwrap(); - mut_limits.inflight = mut_limits.inflight - 1; - - let maybe_limit = - response.headers() - .get(&mut_limits.header_limit) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::().ok()); - - if let Some(limit) = maybe_limit { - mut_limits.limit = limit; - } - - let maybe_remaining = - response.headers() - .get(&mut_limits.header_remaining) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::().ok()); - - if let Some(limit) = maybe_remaining { - mut_limits.remaining = limit; - } - - let maybe_reset = - response.headers() - .get(&mut_limits.header_reset) - .and_then(|x| x.to_str().ok()) - .and_then(|x| x.parse::().ok()); - - if let Some(reset) = maybe_reset { - mut_limits.reset = Some(reset); + if let Some(key) = key_ok { + if let Some(limits) = client_ok.ratelimit(key) { + let mut mut_limits = limits.inner.lock().unwrap(); + mut_limits.inflight = mut_limits.inflight - 1; + mut_limits.update_from_headers(response.headers()); } } @@ -700,11 +724,10 @@ impl Future for ApiRequest { .and_then(|json| { json }); - self.state = RequestState::PollParse(Box::new(f)); }, RequestState::PollParse(future) => { - let res = try_ready!(future.poll()); + let res = retry_ready!(self, future.poll()); return Ok(Async::Ready(res)); } } diff --git a/src/helix/models.rs b/src/helix/models.rs index 86b7560..bdb8438 100644 --- a/src/helix/models.rs +++ b/src/helix/models.rs @@ -5,14 +5,29 @@ use url::Url; use chrono::{DateTime, Utc}; use super::types::{UserId, VideoId, ChannelId}; +pub trait PaginationTrait { + fn cursor<'a>(&'a self) -> &'a Option; + fn set_request(&mut self); +} + #[derive(Debug, Deserialize)] pub struct DataContainer { pub data: Vec } -#[derive(Debug, Deserialize)] -pub struct Cursor { - cursor: String +impl PaginationTrait for DataContainer { + fn cursor<'a>(&'a self) -> &'a Option { &None } + fn set_request(&mut self) {} +} + +impl PaginationTrait for PaginationContainer { + fn cursor<'a>(&'a self) -> &'a Option { &self.pagination } + fn set_request(&mut self) {} +} + +impl PaginationTrait for Credentials { + fn cursor<'a>(&'a self) -> &'a Option { &None } + fn set_request(&mut self) {} } #[derive(Debug, Deserialize)] @@ -21,6 +36,21 @@ pub struct PaginationContainer { pub pagination: Option } +#[derive(Debug, Deserialize)] +pub struct Cursor { + cursor: String +} + +#[derive(Debug, Deserialize)] +pub struct Credentials { + pub access_token: String, + pub refresh_token: Option, + pub expires_in: u32, + pub scope: Option>, + pub token_type: String, +} + + #[derive(Debug, Deserialize)] pub struct Video { pub id: VideoId, @@ -80,13 +110,3 @@ pub struct Clip { pub thumbnail_url: Url, pub view_count: i32, } - - -#[derive(Debug, Deserialize)] -pub struct Credentials { - pub access_token: String, - pub refresh_token: Option, - pub expires_in: u32, - pub scope: Option>, - pub token_type: String, -} diff --git a/src/helix/namespaces/clips.rs b/src/helix/namespaces/clips.rs index 19293cc..a73d2e9 100644 --- a/src/helix/namespaces/clips.rs +++ b/src/helix/namespaces/clips.rs @@ -1,9 +1,8 @@ use std::collections::BTreeMap; -use super::super::models::{DataContainer, PaginationContainer, User, Video, Clip}; +use super::super::models::{DataContainer, Clip}; use super::super::Client; use super::super::ClientTrait; use super::super::RatelimitKey; -const API_DOMAIN: &'static str = "api.twitch.tv"; use super::Namespace; pub struct Clips {} -- cgit v1.2.3