summaryrefslogtreecommitdiff
path: root/src/helix/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/helix/mod.rs')
-rw-r--r--src/helix/mod.rs340
1 files changed, 183 insertions, 157 deletions
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index f7a09f6..c0dd2a9 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -1,17 +1,23 @@
use futures::future::Future;
use std::sync::{Arc, Mutex};
use reqwest::r#async::Client as ReqwestClient;
-pub use super::types;
+use std::collections::HashSet;
+use super::error::Error;
use std::marker::PhantomData;
-pub mod models;
-pub mod namespaces;
+use futures::future::Shared;
+use futures::Poll;
+use serde::de::DeserializeOwned;
+use futures::Async;
+use futures::try_ready;
-use std::collections::HashSet;
-use futures::{Sink, Stream};
+use crate::error::ConditionError;
-use super::error::Error;
-use futures::prelude::*;
+
+pub use super::types;
+
+pub mod models;
+pub mod namespaces;
pub struct Namespace<T> {
client: Client,
@@ -44,33 +50,27 @@ pub struct Client {
inner: Arc<ClientRef>,
}
-use reqwest::r#async::Response;
-use futures::sync::oneshot;
-
#[derive(Clone, PartialEq)]
enum AuthState {
Unauth,
Auth,
}
-use futures::future::Shared;
struct MutClientRef {
token: Option<String>,
scopes: Vec<Scope>,
previous: Option<Client>,
- auth_barrier: Barrier<AuthWaiter>,
auth_state: AuthState,
- auth_future: Option<Shared<Box<Future<Item=(), Error=()> + Send>>>
+ auth_future: Option<Shared<Box<Future<Item=(), Error=ConditionError> + Send>>>
}
-use futures::sync::mpsc;
-
-
struct ClientRef {
id: String,
secret: Option<String>,
client: ReqwestClient,
+ auth_barrier: Barrier,
+ ratelimit_default: Ratelimit,
inner: Mutex<MutClientRef>,
}
@@ -80,6 +80,10 @@ impl Client {
Client::new_with_client(id, client)
}
+ pub fn default_ratelimit(&self) -> Ratelimit {
+ self.inner.ratelimit_default.clone()
+ }
+
pub fn new_with_client(id: &str, client: ReqwestClient) -> Client {
Client {
@@ -87,9 +91,10 @@ impl Client {
id: id.to_owned(),
client: client,
secret: None,
+ auth_barrier: Barrier::new(),
+ ratelimit_default: Ratelimit::new(30, "Ratelimit-Limit", "Ratelimit-Remaining", "Ratelimit-Reset"),
inner: Mutex::new(
MutClientRef {
- auth_barrier: Barrier::new(),
token: None,
scopes: Vec::new(),
previous: None,
@@ -201,9 +206,10 @@ impl AuthClientBuilder {
id: old_client.inner.id.clone(),
client: old_client.inner.client.clone(),
secret: Some(self.secret),
+ auth_barrier: Barrier::new(),
+ ratelimit_default: old_client.default_ratelimit(),
inner: Mutex::new (
MutClientRef {
- auth_barrier: Barrier::new(),
token: self.token,
scopes: Vec::new(),
previous: Some(old_client),
@@ -252,7 +258,9 @@ struct RequestRef {
enum RequestState<T> {
Uninitalized,
- WaitAuth(AuthWaiter2),
+ WaitAuth(WaiterState<AuthWaiter>),
+ WaitLimit(WaiterState<RatelimitWaiter>),
+ WaitRequest,
PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>),
}
@@ -267,6 +275,7 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
params: BTreeMap<String, String>,
client: Client,
method: Method,
+ ratelimit: Option<Ratelimit>,
) -> ApiRequest<T>
{
ApiRequest {
@@ -275,7 +284,7 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
params: params,
client: client,
method: method,
- ratelimit: None,
+ ratelimit: ratelimit,
}),
state: RequestState::Uninitalized
}
@@ -283,140 +292,113 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
}
-use futures::Poll;
-use serde::de::DeserializeOwned;
-use futures::Async;
-use futures::try_ready;
-
-
-struct AuthWaiter {
- waiter: Client,
-}
-
-
pub struct RatelimitWaiter {
limit: Ratelimit,
- request: Request,
}
-#[derive(Debug, Clone)]
+#[derive(Clone)]
pub struct Ratelimit {
- inner: Arc<Mutex<RatelimitRef>>
+ inner: Arc<Mutex<RatelimitRef>>,
+ barrier: Barrier,
+}
+
+impl Ratelimit {
+ pub fn new(limit: i32,
+ header_limit: &str,
+ header_remaining: &str,
+ header_reset: &str)
+ -> Ratelimit
+ {
+ Ratelimit {
+ inner: Arc::new(
+ Mutex::new(
+ RatelimitRef {
+ limit: limit,
+ remaining: limit,
+ inflight: 0,
+ reset: None,
+ header_limit: header_limit.to_owned(),
+ header_remaining: header_remaining.to_owned(),
+ header_reset: header_reset.to_owned(),
+ }
+ )
+ ),
+ barrier: Barrier::new(),
+ }
+ }
}
#[derive(Debug, Clone)]
pub struct RatelimitRef {
+ limit: i32,
remaining: i32,
inflight: i32,
- quota: i32,
reset: Option<u32>,
+ header_limit: String,
+ header_remaining: String,
+ header_reset: String,
}
-struct AuthWaiter2 {
- waiter: Client,
- shared_future: Option<(Shared<Box<Future<Item=(), Error=()> + Send>>)>,
+use futures::future::SharedError;
+use crate::sync::barrier::Barrier;
+use crate::sync::waiter::Waiter;
+
+struct WaiterState<W: Waiter> {
polling: bool,
+ shared_future: Option<(Shared<Box< Future<Item=(), Error=ConditionError> + Send>>)>,
+ waiter: W,
+ barrier: Barrier,
}
+impl<W: Waiter> WaiterState<W> {
+ fn new(waiter: W, barrier: &Barrier) -> WaiterState<W> {
+ WaiterState {
+ polling: false,
+ shared_future: None,
+ waiter: waiter,
+ barrier: barrier.clone(),
+ }
+ }
+}
-use futures::future::{SharedError, SharedItem};
-impl Future for AuthWaiter2 {
- type Item = ();
- type Error = ();
+impl<W: Waiter> Future for WaiterState<W> {
+ type Item = <W as Waiter>::Item;
+ type Error = <W as Waiter>::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
- let blocked = self.blocked();
+ let blocked = self.waiter.blocked();
if blocked && !self.polling {
- println!("not polling and blocked");
- let fut = self.condition();
+ let fut = self.barrier.condition(&self.waiter);
self.shared_future = Some(fut);
self.polling = true;
} else if blocked || self.polling {
- println!("polling");
let f = self.shared_future.as_mut().unwrap();
- let r = f.poll();
- if let Ok(Async::NotReady) = r {
- return Ok(Async::NotReady);
- }
+ try_ready!(f.poll());
self.polling = false;
} else {
- println!("okay");
- return Ok(Async::Ready(()));
+ return Ok(Async::Ready(<W as Waiter>::Item::default()));
}
}
}
}
-impl AuthWaiter2 {
-
- fn blocked(&self) -> bool {
- let mut_client = self.waiter.inner.inner.lock().unwrap();
- mut_client.auth_state == AuthState::Unauth
- }
-
- fn condition(&self)
- -> Shared<Box<Future<Item=(), Error=()> + Send>>
- {
- /*NOTE: careful with the creation of new_condition
- * since it may lead to a deadlock
- */
- let new_condition = self.condition_future();
- let mut mut_client = self.waiter.inner.inner.lock().unwrap();
- let maybe_future = &mut mut_client.auth_future;
-
- let new_condition_clone = new_condition.clone();
- let f = maybe_future.get_or_insert_with(|| {
- new_condition
- });
-
- let f =
- if let Some(_) = f.peek() {
- maybe_future.replace(new_condition_clone);
- maybe_future.as_ref().unwrap()
- } else { f };
- f.clone()
- }
-
- fn condition_future(&self) ->
- Shared<Box<Future<Item=(), Error=()> + Send>> {
- let bottom_client = self.waiter.get_bottom_client();
- let secret = self.waiter.inner.secret.as_ref().unwrap();
- let client = self.waiter.clone();
- let auth_future =
- bottom_client
- .auth()
- .client_credentials(secret)
- .map(move |credentials| {
- println!("{:?}", credentials);
- let mut mut_client = client.inner.inner.lock().unwrap();
- mut_client.auth_state = AuthState::Auth;
- mut_client.token = Some(credentials.access_token.clone());
- ()
- })
- .map_err(|_| ());
-
- Future::shared(Box::new(auth_future))
- }
+struct AuthWaiter {
+ waiter: Client,
}
-use crate::sync::waiter::Waiter;
-use crate::sync::barrier::{BarrierSync, Barrier};
-
impl Waiter for AuthWaiter {
- type Item = Self;
- type Error = Error;
- type ConditionError = ();
+ type Item = ();
+ type Error = ConditionError;
fn blocked(&self) -> bool {
let mut_client = self.waiter.inner.inner.lock().unwrap();
mut_client.auth_state == AuthState::Unauth
}
- fn condition_poller(&self)
- -> Box<Future<Item=(), Error=Self::ConditionError> + Send>
- {
+ fn condition(&self) ->
+ Shared<Box<Future<Item=(), Error=ConditionError> + Send>> {
let bottom_client = self.waiter.get_bottom_client();
let secret = self.waiter.inner.secret.as_ref().unwrap();
let client = self.waiter.clone();
@@ -432,55 +414,32 @@ impl Waiter for AuthWaiter {
mut_client.token = Some(credentials.access_token.clone());
()
})
- .map_err(|_| ());
-
- Box::new(auth_future)
- }
+ .map_err(|_| ConditionError{});
- fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> {
- Box::new(futures::future::ok(self))
+ Future::shared(Box::new(auth_future))
}
}
impl Waiter for RatelimitWaiter {
- type Item = reqwest::r#async::Response;
- type Error = Error;
- type ConditionError = ();
+ type Item = ();
+ type Error = ConditionError;
fn blocked(&self) -> bool {
let limits = self.limit.inner.lock().unwrap();
+ println!("{}, {}, {}", limits.limit, limits.remaining, limits.inflight);
limits.remaining - limits.inflight <= 0
}
- fn condition_poller(&self)
- -> Box<Future<Item=(), Error=Self::ConditionError> + Send>
+ fn condition(&self)
+ -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>>
{
/*TODO: Really basic for now*/
use futures_timer::Delay;
use std::time::Duration;
- Box::new(
- Delay::new(Duration::from_secs(60))
- .map_err(|_| ())
- )
+ Future::shared(Box::new(
+ Delay::new(Duration::from_secs(60)).map_err(|_| ConditionError{})
+ ))
}
-
- fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send> {
- let client = &self.request.inner.client;
- let reqwest = client.client();
- let method = &self.request.inner.method;
- let url = &self.request.inner.url;
- let params = &self.request.inner.params;
-
- let builder = reqwest.request(method.clone(), url);
- let builder = client.apply_standard_headers(builder);
- let r = builder.query(params);
-
- let limits = &self.limit.clone();
-
- /* TODO update limits */
- Box::new(r.send().map_err(|err| Error::from(err)))
- }
-
}
/* Todo: If the polled futures returns an error than all the waiters should
@@ -495,33 +454,101 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
loop {
match &mut self.state {
RequestState::Uninitalized => {
- let mut mut_client = self.inner.client.inner.inner.lock().unwrap();
- let waiter = AuthWaiter {
- waiter: self.inner.client.clone()
- };
+ let mut_client = self.inner.client.inner.inner.lock().unwrap();
- let waiter2 = AuthWaiter2 {
+ let waiter = AuthWaiter {
waiter: self.inner.client.clone(),
- shared_future: None,
- polling: false,
};
- let f = waiter2;
+ let f = WaiterState::new(waiter,
+ &self.inner.client.inner.auth_barrier);
self.state = RequestState::WaitAuth(f);
},
- RequestState::WaitAuth(chan) => {
- let _waiter = try_ready!(chan.poll());
-
+ RequestState::WaitAuth(auth) => {
+ let _waiter = try_ready!(auth.poll());
+ match self.inner.ratelimit {
+ Some(ref limit) => {
+ let barrier = limit.barrier.clone();
+ let waiter = RatelimitWaiter {
+ limit: limit.clone(),
+ };
+ let f = WaiterState::new(waiter,
+ &barrier);
+ self.state = RequestState::WaitLimit(f);
+ },
+ None => {
+ self.state = RequestState::WaitRequest;
+ }
+ }
+ },
+ RequestState::WaitLimit(limit) => {
+ let _waiter = try_ready!(limit.poll());
+ self.state = RequestState::WaitRequest;
+ },
+ RequestState::WaitRequest => {
let client = &self.inner.client;
let reqwest = client.client();
+ if let Some(limits) = &self.inner.ratelimit {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight + 1;
+ }
+
let builder = reqwest.request(self.inner.method.clone(), &self.inner.url);
let builder = client.apply_standard_headers(builder);
let r = builder.query(&self.inner.params);
+ /*TODO add 1 to inflight*/
+
+ let ratelimit_err = self.inner.ratelimit.clone();
+ let ratelimit_ok = self.inner.ratelimit.clone();
let f = r.send()
+ .map_err(|err| {
+
+ if let Some(limits) = ratelimit_err {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight - 1;
+ }
+
+ err
+ })
.map(|mut response| {
println!("{:?}", response);
+ if let Some(limits) = ratelimit_ok {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight - 1;
+
+ let maybe_limit =
+ response.headers()
+ .get(&mut_limits.header_limit)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<i32>().ok());
+
+ if let Some(limit) = maybe_limit {
+ mut_limits.limit = limit;
+ }
+
+ let maybe_remaining =
+ response.headers()
+ .get(&mut_limits.header_limit)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<i32>().ok());
+
+ if let Some(limit) = maybe_remaining {
+ mut_limits.remaining = limit;
+ }
+
+ let maybe_reset =
+ response.headers()
+ .get(&mut_limits.header_limit)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<u32>().ok());
+
+ if let Some(reset) = maybe_reset {
+ mut_limits.reset = Some(reset);
+ }
+ }
+
response.json::<T>()
})
.and_then(|json| {
@@ -529,8 +556,7 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
});
self.state = RequestState::PollParse(Box::new(f));
- continue;
- }
+ },
RequestState::PollParse(future) => {
let res = try_ready!(future.poll());
return Ok(Async::Ready(res));