summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Blajda <blajda@hotmail.com>2018-12-24 23:08:26 +0000
committerDavid Blajda <blajda@hotmail.com>2018-12-24 23:08:26 +0000
commit2f1682162174f5fee4f4297f616e8f66b7c70dca (patch)
treedad5dff552442b1801553c9944a1967121e667cb
parent298806448db4a4e74306ec648bfc0e43a76c6bc3 (diff)
Fix lifetimes on ratelimit trait
-rw-r--r--src/helix/mod.rs161
-rw-r--r--src/helix/models.rs46
-rw-r--r--src/helix/namespaces/clips.rs3
3 files changed, 126 insertions, 84 deletions
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index c6eb1f5..5c51c22 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -9,7 +9,6 @@ use futures::Poll;
use serde::de::DeserializeOwned;
use futures::Async;
use futures::try_ready;
-use std::iter::FromIterator;
use crate::error::ConditionError;
@@ -68,7 +67,7 @@ impl ClientTrait for Client {
}
}
- fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> {
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
use self::ClientType::*;
match self.inner.as_ref() {
Unauth(inner) => inner.ratelimit(key),
@@ -120,7 +119,7 @@ pub trait ClientTrait {
fn id<'a>(&'a self) -> &'a str;
fn domain<'a>(&'a self) -> &'a str;
- fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit>;
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit>;
fn authenticated(&self) -> bool;
fn scopes(&self) -> Vec<Scope>;
@@ -135,8 +134,8 @@ impl ClientTrait for UnauthClient {
&self.domain
}
- fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> {
- None
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
+ self.ratelimits.get(&key)
}
fn authenticated(&self) -> bool {
@@ -171,7 +170,7 @@ impl ClientTrait for AuthClient {
}
}
- fn ratelimit<'a>(&self, key: RatelimitKey) -> Option<&'a Ratelimit> {
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
match self.previous.inner.as_ref() {
ClientType::Auth(auth) => auth.ratelimit(key),
ClientType::Unauth(unauth) => unauth.ratelimit(key),
@@ -201,17 +200,6 @@ struct AuthStateRef {
state: AuthState,
}
-struct ClientRef {
- id: String,
- secret: Option<String>,
- reqwest: ReqwestClient,
- domain: &'static str,
- ratelimits: RatelimitMap,
- auth_state: Mutex<AuthStateRef>,
- auth_barrier: Barrier,
- previous: Option<Client>,
-}
-
impl Client {
pub fn new(id: &str) -> Client {
let client = ReqwestClient::new();
@@ -359,10 +347,6 @@ impl AuthClientBuilder {
use std::collections::BTreeMap;
use reqwest::Method;
-struct Request {
- inner: Arc<RequestRef>,
-}
-
struct RequestRef {
url: String,
params: BTreeMap<String, String>,
@@ -382,10 +366,13 @@ enum RequestState<T> {
pub struct ApiRequest<T> {
inner: Arc<RequestRef>,
- state: RequestState<T>
+ state: RequestState<T>,
+ attempt: u32,
+ max_attempts: u32,
}
-impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
+use self::models::PaginationTrait;
+impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> {
pub fn new(url: String,
params: BTreeMap<&str, &str>,
@@ -407,7 +394,9 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
method: method,
ratelimit: ratelimit,
}),
- state: RequestState::Uninitalized
+ state: RequestState::Uninitalized,
+ attempt: 0,
+ max_attempts: 1,
}
}
}
@@ -449,6 +438,8 @@ impl Ratelimit {
}
}
+
+
#[derive(Debug, Clone)]
pub struct RatelimitRef {
limit: i32,
@@ -460,6 +451,41 @@ pub struct RatelimitRef {
header_reset: String,
}
+
+impl RatelimitRef {
+ pub fn update_from_headers(&mut self, headers: &reqwest::header::HeaderMap) {
+ let maybe_limit =
+ headers
+ .get(&self.header_limit)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<i32>().ok());
+
+ if let Some(limit) = maybe_limit {
+ self.limit = limit;
+ }
+
+ let maybe_remaining =
+ headers
+ .get(&self.header_remaining)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<i32>().ok());
+
+ if let Some(limit) = maybe_remaining {
+ self.remaining = limit;
+ }
+
+ let maybe_reset =
+ headers
+ .get(&self.header_reset)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<u32>().ok());
+
+ if let Some(reset) = maybe_reset {
+ self.reset = Some(reset);
+ }
+ }
+}
+
use futures::future::SharedError;
use crate::sync::barrier::Barrier;
use crate::sync::waiter::Waiter;
@@ -576,6 +602,26 @@ impl Waiter for RatelimitWaiter {
* get that error
*/
+/* Macro ripped directly from try_ready and simplies retries if any error occurs
+ * and there are remaning retry attempt
+ */
+#[macro_export]
+macro_rules! retry_ready {
+ ($s:expr, $e:expr) => (match $e {
+ Ok(futures::prelude::Async::Ready(t)) => t,
+ Ok(futures::prelude::Async::NotReady) => return Ok(futures::prelude::Async::NotReady),
+ Err(e) => {
+ if $s.attempt < $s.max_attempts {
+ $s.attempt += 1;
+ $s.state = RequestState::Uninitalized;
+ continue;
+ } else {
+ return Err(e.into());
+ }
+ }
+ })
+}
+
impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
type Item = T;
type Error = Error;
@@ -600,7 +646,7 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
}
},
RequestState::WaitAuth(auth) => {
- let _waiter = try_ready!(auth.poll());
+ let _waiter = retry_ready!(self, auth.poll());
self.state = RequestState::SetupRatelimit;
},
RequestState::SetupRatelimit => {
@@ -624,16 +670,17 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
}
},
RequestState::WaitLimit(limit) => {
- let _waiter = try_ready!(limit.poll());
+ let _waiter = retry_ready!(self, limit.poll());
self.state = RequestState::WaitRequest;
},
RequestState::WaitRequest => {
- let client = &self.inner.client;
+ let client = self.inner.client.clone();
+ let c_ref = &client;
let reqwest = client.reqwest();
let limits =
self.inner.ratelimit.as_ref().and_then(|key| {
- client.ratelimit(key.clone())
+ c_ref.ratelimit(key.clone())
});
if let Some(limits) = limits {
@@ -645,53 +692,30 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
let builder = client.apply_standard_headers(builder);
let r = builder.query(&self.inner.params);
- let limits_err = limits.clone();
- let limits_ok = limits.clone();
+ let key_err = self.inner.ratelimit.clone();
+ let key_ok = self.inner.ratelimit.clone();
+ let client_err = client.clone();
+ let client_ok = client.clone();
- let f = r.send()
+ let f =
+ r.send()
.map_err(move |err| {
-
- if let Some(limits) = limits_err {
- let mut mut_limits = limits.inner.lock().unwrap();
- mut_limits.inflight = mut_limits.inflight - 1;
+ if let Some(key) = key_err {
+ if let Some(limits) = client_err.ratelimit(key) {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight - 1;
+ }
}
err
})
.map(move |mut response| {
println!("{:?}", response);
- if let Some(limits) = limits_ok {
- let mut mut_limits = limits.inner.lock().unwrap();
- mut_limits.inflight = mut_limits.inflight - 1;
-
- let maybe_limit =
- response.headers()
- .get(&mut_limits.header_limit)
- .and_then(|x| x.to_str().ok())
- .and_then(|x| x.parse::<i32>().ok());
-
- if let Some(limit) = maybe_limit {
- mut_limits.limit = limit;
- }
-
- let maybe_remaining =
- response.headers()
- .get(&mut_limits.header_remaining)
- .and_then(|x| x.to_str().ok())
- .and_then(|x| x.parse::<i32>().ok());
-
- if let Some(limit) = maybe_remaining {
- mut_limits.remaining = limit;
- }
-
- let maybe_reset =
- response.headers()
- .get(&mut_limits.header_reset)
- .and_then(|x| x.to_str().ok())
- .and_then(|x| x.parse::<u32>().ok());
-
- if let Some(reset) = maybe_reset {
- mut_limits.reset = Some(reset);
+ if let Some(key) = key_ok {
+ if let Some(limits) = client_ok.ratelimit(key) {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight - 1;
+ mut_limits.update_from_headers(response.headers());
}
}
@@ -700,11 +724,10 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
.and_then(|json| {
json
});
-
self.state = RequestState::PollParse(Box::new(f));
},
RequestState::PollParse(future) => {
- let res = try_ready!(future.poll());
+ let res = retry_ready!(self, future.poll());
return Ok(Async::Ready(res));
}
}
diff --git a/src/helix/models.rs b/src/helix/models.rs
index 86b7560..bdb8438 100644
--- a/src/helix/models.rs
+++ b/src/helix/models.rs
@@ -5,14 +5,29 @@ use url::Url;
use chrono::{DateTime, Utc};
use super::types::{UserId, VideoId, ChannelId};
+pub trait PaginationTrait {
+ fn cursor<'a>(&'a self) -> &'a Option<Cursor>;
+ fn set_request(&mut self);
+}
+
#[derive(Debug, Deserialize)]
pub struct DataContainer<T> {
pub data: Vec<T>
}
-#[derive(Debug, Deserialize)]
-pub struct Cursor {
- cursor: String
+impl<T> PaginationTrait for DataContainer<T> {
+ fn cursor<'a>(&'a self) -> &'a Option<Cursor> { &None }
+ fn set_request(&mut self) {}
+}
+
+impl<T> PaginationTrait for PaginationContainer<T> {
+ fn cursor<'a>(&'a self) -> &'a Option<Cursor> { &self.pagination }
+ fn set_request(&mut self) {}
+}
+
+impl PaginationTrait for Credentials {
+ fn cursor<'a>(&'a self) -> &'a Option<Cursor> { &None }
+ fn set_request(&mut self) {}
}
#[derive(Debug, Deserialize)]
@@ -22,6 +37,21 @@ pub struct PaginationContainer<T> {
}
#[derive(Debug, Deserialize)]
+pub struct Cursor {
+ cursor: String
+}
+
+#[derive(Debug, Deserialize)]
+pub struct Credentials {
+ pub access_token: String,
+ pub refresh_token: Option<String>,
+ pub expires_in: u32,
+ pub scope: Option<Vec<String>>,
+ pub token_type: String,
+}
+
+
+#[derive(Debug, Deserialize)]
pub struct Video {
pub id: VideoId,
pub user_id: UserId,
@@ -80,13 +110,3 @@ pub struct Clip {
pub thumbnail_url: Url,
pub view_count: i32,
}
-
-
-#[derive(Debug, Deserialize)]
-pub struct Credentials {
- pub access_token: String,
- pub refresh_token: Option<String>,
- pub expires_in: u32,
- pub scope: Option<Vec<String>>,
- pub token_type: String,
-}
diff --git a/src/helix/namespaces/clips.rs b/src/helix/namespaces/clips.rs
index 19293cc..a73d2e9 100644
--- a/src/helix/namespaces/clips.rs
+++ b/src/helix/namespaces/clips.rs
@@ -1,9 +1,8 @@
use std::collections::BTreeMap;
-use super::super::models::{DataContainer, PaginationContainer, User, Video, Clip};
+use super::super::models::{DataContainer, Clip};
use super::super::Client;
use super::super::ClientTrait;
use super::super::RatelimitKey;
-const API_DOMAIN: &'static str = "api.twitch.tv";
use super::Namespace;
pub struct Clips {}