summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Blajda <blajda@hotmail.com>2018-12-27 04:46:10 +0000
committerDavid Blajda <blajda@hotmail.com>2018-12-27 04:46:10 +0000
commitcb1b144e48ee357a76f551433d4886f092d259c8 (patch)
tree15606dc621e4d68f3bf3e9d27a44e6fd02a00fc5
parent2e08d0c8abbfb9f989c61acb4f6c580719a65b42 (diff)
Use a generic client for both helix and kraken
-rw-r--r--src/bin/main.rs5
-rw-r--r--src/client.rs875
-rw-r--r--src/helix/mod.rs836
-rw-r--r--src/helix/models.rs42
-rw-r--r--src/helix/namespaces/auth.rs9
-rw-r--r--src/helix/namespaces/clips.rs6
-rw-r--r--src/helix/namespaces/videos.rs8
-rw-r--r--src/lib.rs6
-rw-r--r--src/models.rs16
-rw-r--r--src/namespace.rs0
-rw-r--r--src/namespace/auth.rs55
-rw-r--r--src/namespace/mod.rs1
12 files changed, 997 insertions, 862 deletions
diff --git a/src/bin/main.rs b/src/bin/main.rs
index 982dc9d..ba1482a 100644
--- a/src/bin/main.rs
+++ b/src/bin/main.rs
@@ -15,10 +15,11 @@ fn main() {
let client = HelixClient::new(client_id);
- let authed_client =
- client
+ let authed_client = client;
+ /*
.authenticate(&env::var("TWITCH_SECRET").unwrap())
.build();
+ */
let clip = authed_client
.clips()
diff --git a/src/client.rs b/src/client.rs
new file mode 100644
index 0000000..fd3af32
--- /dev/null
+++ b/src/client.rs
@@ -0,0 +1,875 @@
+use futures::future::Future;
+use std::sync::{Arc, Mutex};
+use reqwest::r#async::Client as ReqwestClient;
+
+use std::collections::{HashSet, HashMap};
+use super::error::Error;
+use futures::future::Shared;
+use futures::Poll;
+use serde::de::DeserializeOwned;
+use futures::Async;
+use futures::try_ready;
+
+use crate::error::ConditionError;
+
+pub use super::types;
+
+#[derive(PartialEq, Eq, Hash, Clone)]
+pub enum RatelimitKey {
+ Default,
+}
+type RatelimitMap = HashMap<RatelimitKey, Ratelimit>;
+
+const API_DOMAIN: &'static str = "api.twitch.tv";
+
+pub trait PaginationTrait {
+ fn cursor<'a>(&'a self) -> Option<&'a str>;
+}
+
+
+#[derive(Clone)]
+pub struct Client {
+ inner: Arc<ClientType>,
+}
+
+/*TODO*/
+#[derive(PartialEq, Hash, Eq, Clone)]
+pub enum Scope {
+ UserReadEmail,
+}
+
+#[derive(Clone)]
+pub enum Version {
+ Helix,
+ Kraken,
+}
+
+impl Client {
+
+ pub fn authenticate(self, secret: &str) -> AuthClientBuilder {
+ AuthClientBuilder::new(self, secret)
+ }
+
+ pub fn deauthenticate(self) -> Client {
+ use self::ClientType::*;
+ match self.inner.as_ref() {
+ Unauth(_inner) => self,
+ Auth(inner) => inner.previous.clone(),
+ }
+ }
+}
+
+enum ClientType {
+ Unauth(UnauthClient),
+ Auth(AuthClient),
+}
+
+pub struct UnauthClient {
+ id: String,
+ reqwest: ReqwestClient,
+ domain: String,
+ ratelimits: RatelimitMap,
+ version: Version,
+}
+
+pub struct AuthClient {
+ secret: String,
+ auth_state: Mutex<AuthStateRef>,
+ auth_barrier: Barrier,
+ previous: Client,
+}
+
+pub trait ClientTrait {
+
+ fn id<'a>(&'a self) -> &'a str;
+ fn domain<'a>(&'a self) -> &'a str;
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit>;
+
+ fn authenticated(&self) -> bool;
+ fn scopes(&self) -> Vec<Scope>;
+}
+
+impl ClientTrait for UnauthClient {
+ fn id<'a>(&'a self) -> &'a str {
+ &self.id
+ }
+
+ fn domain<'a>(&'a self) -> &'a str {
+ &self.domain
+ }
+
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
+ self.ratelimits.get(&key)
+ }
+
+ fn authenticated(&self) -> bool {
+ false
+ }
+
+ fn scopes(&self) -> Vec<Scope> {
+ Vec::with_capacity(0)
+ }
+}
+
+impl ClientTrait for Client {
+
+ fn id<'a>(&'a self) -> &'a str {
+ use self::ClientType::*;
+ match self.inner.as_ref() {
+ Unauth(inner) => inner.id(),
+ Auth(inner) => inner.id(),
+ }
+ }
+
+ fn domain<'a>(&'a self) -> &'a str {
+ use self::ClientType::*;
+ match self.inner.as_ref() {
+ Unauth(inner) => inner.domain(),
+ Auth(inner) => inner.domain(),
+ }
+ }
+
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
+ use self::ClientType::*;
+ match self.inner.as_ref() {
+ Unauth(inner) => inner.ratelimit(key),
+ Auth(inner) => inner.ratelimit(key),
+ }
+ }
+
+ fn authenticated(&self) -> bool {
+ use self::ClientType::*;
+ match self.inner.as_ref() {
+ Unauth(inner) => inner.authenticated(),
+ Auth(inner) => inner.authenticated(),
+ }
+ }
+
+ fn scopes(&self) -> Vec<Scope> {
+ use self::ClientType::*;
+ match self.inner.as_ref() {
+ Unauth(inner) => inner.scopes(),
+ Auth(inner) => inner.scopes(),
+ }
+ }
+}
+
+/*TODO I'd be nice to remove this boiler plate */
+impl ClientTrait for AuthClient {
+ fn id<'a>(&'a self) -> &'a str {
+ match self.previous.inner.as_ref() {
+ ClientType::Auth(auth) => auth.id(),
+ ClientType::Unauth(unauth) => unauth.id(),
+ }
+ }
+
+ fn domain<'a>(&'a self) -> &'a str {
+ match self.previous.inner.as_ref() {
+ ClientType::Auth(auth) => auth.domain(),
+ ClientType::Unauth(unauth) => unauth.domain(),
+ }
+ }
+
+ fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
+ match self.previous.inner.as_ref() {
+ ClientType::Auth(auth) => auth.ratelimit(key),
+ ClientType::Unauth(unauth) => unauth.ratelimit(key),
+ }
+ }
+
+ fn authenticated(&self) -> bool {
+ let auth = self.auth_state.lock().expect("Auth Lock is poisoned");
+ auth.state == AuthState::Auth
+ }
+
+ fn scopes(&self) -> Vec<Scope> {
+ let auth = self.auth_state.lock().expect("Auth Lock is poisoned");
+ Vec::with_capacity(0)
+ }
+}
+
+#[derive(Clone, PartialEq)]
+enum AuthState {
+ Unauth,
+ Auth,
+}
+
+struct AuthStateRef {
+ token: Option<String>,
+ scopes: Vec<Scope>,
+ state: AuthState,
+}
+
+impl Client {
+ pub fn new(id: &str, version: Version) -> Client {
+ let client = ReqwestClient::new();
+ Client::new_with_client(id, client, version)
+ }
+
+ fn default_ratelimits() -> RatelimitMap {
+ let mut limits = RatelimitMap::new();
+ limits.insert(RatelimitKey::Default, Ratelimit::new(30, "Ratelimit-Limit", "Ratelimit-Remaining", "Ratelimit-Reset"));
+
+ limits
+ }
+
+ pub fn new_with_client(id: &str, reqwest: ReqwestClient, version: Version) -> Client {
+
+ Client {
+ inner: Arc::new(
+ ClientType::Unauth(UnauthClient {
+ id: id.to_owned(),
+ reqwest: reqwest,
+ domain: API_DOMAIN.to_owned(),
+ ratelimits: Self::default_ratelimits(),
+ version: version,
+ }))
+ }
+ }
+
+ fn secret<'a>(&'a self) -> Option<&'a str> {
+ use self::ClientType::*;
+ match self.inner.as_ref() {
+ Unauth(_) => None,
+ Auth(inner) => Some(&inner.secret),
+ }
+ }
+
+ fn version(&self) -> Version {
+ use self::ClientType::*;
+ match self.inner.as_ref() {
+ Unauth(inner) => inner.version.clone(),
+ Auth(inner) => inner.previous.version(),
+ }
+ }
+
+ fn reqwest(&self) -> ReqwestClient {
+ use self::ClientType::*;
+ match self.inner.as_ref() {
+ Unauth(inner) => inner.reqwest.clone(),
+ Auth(inner) => inner.previous.reqwest(),
+ }
+ }
+
+ /* The 'bottom' client must always be a client that is not authorized.
+ * This which allows for calls to Auth endpoints using the same control flow
+ * as other requests.
+ *
+ * Clients created with 'new' are bottom clients and calls
+ * to authenticate stack an authed client on top
+ */
+ fn get_bottom_client(&self) -> Client {
+ match self.inner.as_ref() {
+ ClientType::Auth(inner) => inner.previous.get_bottom_client(),
+ ClientType::Unauth(_) => self.clone(),
+ }
+ }
+
+ fn apply_standard_headers(&self, request: RequestBuilder)
+ -> RequestBuilder
+ {
+ match self.version() {
+ Version::Helix => {
+ let token = match self.inner.as_ref() {
+ ClientType::Auth(inner) => {
+ let auth = inner.auth_state.lock().expect("Authlock is poisoned");
+ auth.token.as_ref().map(|s| s.to_owned())
+ }
+ ClientType::Unauth(_) => None,
+ };
+
+ let client_header = header::HeaderValue::from_str(self.id()).unwrap();
+
+ let request =
+ if let Some(token) = token {
+ let value = "Bearer ".to_owned() + &token;
+ let token_header = header::HeaderValue::from_str(&value).unwrap();
+ request.header("Authorization", token_header)
+ } else { request };
+
+ request.header("Client-ID", client_header)
+ },
+ Version::Kraken => {
+ request
+ }
+ }
+ }
+}
+
+
+use reqwest::r#async::{RequestBuilder};
+use reqwest::header;
+
+
+pub struct AuthClientBuilder {
+ scopes: HashSet<Scope>,
+ secret: String,
+ token: Option<String>,
+ client: Client,
+ /*If the user supplies a token,
+ * then we can skip fetching it from the server and are authenticated
+ */
+}
+
+impl AuthClientBuilder {
+ pub fn new(client: Client, secret: &str) -> AuthClientBuilder {
+ AuthClientBuilder {
+ scopes: HashSet::new(),
+ client: client,
+ secret: secret.to_owned(),
+ token: None,
+ }
+ }
+
+ pub fn build(self) -> Client {
+ let auth_state = if self.token.is_some() { AuthState::Auth } else { AuthState::Unauth };
+ let old_client = self.client;
+ Client {
+ inner: Arc::new(ClientType::Auth(
+ AuthClient {
+ secret: self.secret,
+ auth_barrier: Barrier::new(),
+ auth_state: Mutex::new (
+ AuthStateRef {
+ token: self.token,
+ scopes: Vec::new(),
+ state: auth_state,
+ }),
+ previous: old_client,
+ }))
+ }
+ }
+
+ pub fn scope(mut self, scope: Scope) -> AuthClientBuilder {
+ let scopes = &mut self.scopes;
+ scopes.insert(scope);
+ self
+ }
+
+ pub fn scopes(mut self, scopes: Vec<Scope>) -> AuthClientBuilder {
+ let _scopes = &mut self.scopes;
+ for scope in scopes {
+ _scopes.insert(scope);
+ }
+ self
+ }
+
+ pub fn token(mut self, token: &str) -> AuthClientBuilder {
+ self.token.replace(token.to_owned());
+ self
+ }
+}
+
+use std::collections::BTreeMap;
+use reqwest::Method;
+
+struct RequestRef {
+ url: String,
+ params: BTreeMap<String, String>,
+ client: Client,
+ ratelimit: Option<RatelimitKey>,
+ method: Method,
+}
+
+impl RequestRef {
+ pub fn new(url: String,
+ params: BTreeMap<&str, &str>,
+ client: Client,
+ method: Method,
+ ratelimit: Option<RatelimitKey>,
+ ) -> RequestRef
+ {
+ let mut owned_params = BTreeMap::new();
+ for (key, value) in params {
+ owned_params.insert(key.to_owned(), value.to_owned());
+ }
+
+ RequestRef {
+ url: url,
+ params: owned_params,
+ client: client,
+ method: method,
+ ratelimit: ratelimit,
+ }
+ }
+}
+
+enum RequestState<T> {
+ SetupRequest,
+ SetupBarriers,
+ WaitAuth(WaiterState<AuthWaiter>),
+ SetupRatelimit,
+ WaitLimit(WaiterState<RatelimitWaiter>),
+ WaitRequest,
+ PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>),
+}
+
+pub struct ApiRequest<T> {
+ inner: Arc<RequestRef>,
+ state: RequestState<T>,
+ attempt: u32,
+ max_attempts: u32,
+ pagination: Option<String>,
+}
+
+enum IterableApiRequestState<T> {
+ Start,
+ PollInner(ApiRequest<T>),
+ Finished,
+}
+
+pub struct IterableApiRequest<T> {
+ inner: Arc<RequestRef>,
+ state: IterableApiRequestState<T>,
+}
+
+impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> {
+
+ pub fn new(url: String,
+ params: BTreeMap<&str, &str>,
+ client: Client,
+ method: Method,
+ ratelimit: Option<RatelimitKey>,
+ ) -> ApiRequest<T>
+ {
+ ApiRequest {
+ inner: Arc::new(RequestRef::new(url, params, client, method, ratelimit)),
+ state: RequestState::SetupRequest,
+ attempt: 0,
+ max_attempts: 1,
+ pagination: None,
+ }
+ }
+}
+
+impl<T: DeserializeOwned + PaginationTrait + 'static + Send> IterableApiRequest<T> {
+
+ pub fn new(url: String,
+ params: BTreeMap<&str, &str>,
+ client: Client,
+ method: Method,
+ ratelimit: Option<RatelimitKey>
+ ) -> IterableApiRequest<T>
+ {
+ let request_ref =
+ Arc::new(RequestRef::new(url, params, client, method, ratelimit));
+
+ IterableApiRequest {
+ inner: request_ref,
+ state: IterableApiRequestState::Start,
+ }
+ }
+}
+
+
+pub struct RatelimitWaiter {
+ limit: Ratelimit,
+}
+
+#[derive(Clone)]
+pub struct Ratelimit {
+ inner: Arc<Mutex<RatelimitRef>>,
+ barrier: Barrier,
+}
+
+impl Ratelimit {
+ pub fn new(limit: i32,
+ header_limit: &str,
+ header_remaining: &str,
+ header_reset: &str)
+ -> Ratelimit
+ {
+ Ratelimit {
+ inner: Arc::new(
+ Mutex::new(
+ RatelimitRef {
+ limit: limit,
+ remaining: limit,
+ inflight: 0,
+ reset: None,
+ header_limit: header_limit.to_owned(),
+ header_remaining: header_remaining.to_owned(),
+ header_reset: header_reset.to_owned(),
+ }
+ )
+ ),
+ barrier: Barrier::new(),
+ }
+ }
+}
+
+
+
+#[derive(Debug, Clone)]
+pub struct RatelimitRef {
+ limit: i32,
+ remaining: i32,
+ inflight: i32,
+ reset: Option<u32>,
+ header_limit: String,
+ header_remaining: String,
+ header_reset: String,
+}
+
+
+impl RatelimitRef {
+ pub fn update_from_headers(&mut self, headers: &reqwest::header::HeaderMap) {
+ let maybe_limit =
+ headers
+ .get(&self.header_limit)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<i32>().ok());
+
+ if let Some(limit) = maybe_limit {
+ self.limit = limit;
+ }
+
+ let maybe_remaining =
+ headers
+ .get(&self.header_remaining)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<i32>().ok());
+
+ if let Some(limit) = maybe_remaining {
+ self.remaining = limit;
+ }
+
+ let maybe_reset =
+ headers
+ .get(&self.header_reset)
+ .and_then(|x| x.to_str().ok())
+ .and_then(|x| x.parse::<u32>().ok());
+
+ if let Some(reset) = maybe_reset {
+ self.reset = Some(reset);
+ }
+ }
+}
+
+use futures::future::SharedError;
+use crate::sync::barrier::Barrier;
+use crate::sync::waiter::Waiter;
+
+struct WaiterState<W: Waiter> {
+ polling: bool,
+ shared_future: Option<(Shared<Box<Future<Item=(), Error=ConditionError> + Send>>)>,
+ waiter: W,
+ barrier: Barrier,
+}
+
+impl<W: Waiter> WaiterState<W> {
+ fn new(waiter: W, barrier: &Barrier) -> WaiterState<W> {
+ WaiterState {
+ polling: false,
+ shared_future: None,
+ waiter: waiter,
+ barrier: barrier.clone(),
+ }
+ }
+}
+
+impl<W: Waiter> Future for WaiterState<W> {
+ type Item = <W as Waiter>::Item;
+ type Error = <W as Waiter>::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ loop {
+ let blocked = self.waiter.blocked();
+ if blocked && !self.polling {
+ let fut = self.barrier.condition(&self.waiter);
+ self.shared_future = Some(fut);
+ self.polling = true;
+ } else if blocked || self.polling {
+ let f = self.shared_future.as_mut().unwrap();
+ try_ready!(f.poll());
+ self.polling = false;
+ } else {
+ return Ok(Async::Ready(<W as Waiter>::Item::default()));
+ }
+ }
+ }
+}
+
+
+struct AuthWaiter {
+ waiter: Client,
+}
+
+impl Waiter for AuthWaiter {
+ type Item = ();
+ type Error = ConditionError;
+
+ fn blocked(&self) -> bool {
+ match self.waiter.inner.as_ref() {
+ ClientType::Unauth(_) => false,
+ ClientType::Auth(inner) => {
+ let auth = inner.auth_state.lock()
+ .expect("unable to lock auth state");
+ auth.state == AuthState::Unauth
+ }
+ }
+ }
+
+ fn condition(&self) ->
+ Shared<Box<Future<Item=(), Error=ConditionError> + Send>> {
+ /* If a secret is not provided then just immediately return */
+ let secret = self.waiter.secret().unwrap();
+ let bottom_client = self.waiter.get_bottom_client();
+ let client = self.waiter.clone();
+
+ let auth_future =
+ bottom_client
+ .auth()
+ .client_credentials(secret)
+ .map(move |credentials| {
+ println!("{:?}", credentials);
+ if let ClientType::Auth(inner) = client.inner.as_ref() {
+ let mut auth = inner.auth_state.lock().unwrap();
+ auth.state = AuthState::Auth;
+ auth.token = Some(credentials.access_token.clone());
+ }
+ ()
+ })
+ .map_err(|_| ConditionError{});
+
+ Future::shared(Box::new(auth_future))
+ }
+}
+
+impl Waiter for RatelimitWaiter {
+ type Item = ();
+ type Error = ConditionError;
+
+ fn blocked(&self) -> bool {
+ let limits = self.limit.inner.lock().unwrap();
+ println!("{}, {}, {}", limits.limit, limits.remaining, limits.inflight);
+ limits.remaining - limits.inflight <= 0
+ }
+
+ fn condition(&self)
+ -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>>
+ {
+ /*TODO: Really basic for now*/
+ use futures_timer::Delay;
+ use std::time::Duration;
+ let limits = self.limit.clone();
+ Future::shared(Box::new(
+ Delay::new(Duration::from_secs(60))
+ .map(move |_res| {
+ let mut limits = limits.inner.lock().unwrap();
+ limits.remaining = limits.limit;
+ ()
+ })
+ .map_err(|_| ConditionError{})
+ ))
+ }
+}
+
+/* Todo: If the polled futures returns an error than all the waiters should
+ * get that error
+ */
+
+/* Macro ripped directly from try_ready and simplies retries if any error occurs
+ * and there are remaning retry attempt
+ */
+#[macro_export]
+macro_rules! retry_ready {
+ ($s:expr, $e:expr) => (match $e {
+ Ok(futures::prelude::Async::Ready(t)) => t,
+ Ok(futures::prelude::Async::NotReady) => return Ok(futures::prelude::Async::NotReady),
+ Err(e) => {
+ if $s.attempt < $s.max_attempts {
+ $s.attempt += 1;
+ $s.state = RequestState::SetupBarriers;
+ continue;
+ } else {
+ return Err(e.into());
+ }
+ }
+ })
+}
+
+use futures::Stream;
+
+impl<T: DeserializeOwned + PaginationTrait + 'static + Send> Stream for IterableApiRequest<T> {
+ type Item = T;
+ type Error = Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ match &mut self.state {
+ IterableApiRequestState::Start => {
+ self.state =
+ IterableApiRequestState::PollInner(
+ ApiRequest {
+ inner: self.inner.clone(),
+ state: RequestState::SetupRequest,
+ attempt: 0,
+ max_attempts: 1,
+ pagination: None
+ });
+ },
+ IterableApiRequestState::PollInner(request) => {
+ let f = request as &mut Future<Item=Self::Item, Error=Self::Error>;
+ match f.poll() {
+ Err(err) => {
+ self.state = IterableApiRequestState::Finished;
+ return Err(err);
+ },
+ Ok(state) => {
+ match state {
+ Async::NotReady => return Ok(Async::NotReady),
+ Async::Ready(res) => {
+ let cursor = res.cursor();
+ match cursor {
+ Some(cursor) => {
+ self.state = IterableApiRequestState::PollInner(
+ ApiRequest {
+ inner: self.inner.clone(),
+ state: RequestState::SetupRequest,
+ attempt: 0,
+ max_attempts: 1,
+ pagination: Some(cursor.to_owned()),
+ });
+ },
+ None => {
+ self.state = IterableApiRequestState::Finished;
+ }
+ }
+ return Ok(Async::Ready(Some(res)));
+ }
+ }
+ }
+ }
+ },
+ IterableApiRequestState::Finished => {
+ return Ok(Async::Ready(None));
+ }
+ }
+ }
+ }
+}
+
+impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
+ type Item = T;
+ type Error = Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ loop {
+ match &mut self.state {
+ RequestState::SetupRequest => {
+ self.attempt = 0;
+ self.state = RequestState::SetupBarriers;
+ }
+ RequestState::SetupBarriers => {
+ match self.inner.client.inner.as_ref() {
+ ClientType::Auth(inner) => {
+ let waiter = AuthWaiter {
+ waiter: self.inner.client.clone(),
+ };
+
+ let f = WaiterState::new(waiter,
+ &inner.auth_barrier);
+ self.state = RequestState::WaitAuth(f);
+ },
+ ClientType::Unauth(_) => {
+ self.state = RequestState::SetupRatelimit;
+ }
+ }
+ },
+ RequestState::WaitAuth(auth) => {
+ let _waiter = retry_ready!(self, auth.poll());
+ self.state = RequestState::SetupRatelimit;
+ },
+ RequestState::SetupRatelimit => {
+ let limits =
+ self.inner.ratelimit.as_ref().and_then(|key| {
+ self.inner.client.ratelimit(key.clone())
+ });
+ match limits {
+ Some(ratelimit) => {
+ let barrier = ratelimit.barrier.clone();
+ let waiter = RatelimitWaiter {
+ limit: ratelimit.clone(),
+ };
+ let f = WaiterState::new(waiter,
+ &barrier);
+ self.state = RequestState::WaitLimit(f);
+ },
+ None => {
+ self.state = RequestState::WaitRequest;
+ }
+ }
+ },
+ RequestState::WaitLimit(limit) => {
+ let _waiter = retry_ready!(self, limit.poll());
+ self.state = RequestState::WaitRequest;
+ },
+ RequestState::WaitRequest => {
+ let client = self.inner.client.clone();
+ let c_ref = &client;
+ let reqwest = client.reqwest();
+
+ let limits =
+ self.inner.ratelimit.as_ref().and_then(|key| {
+ c_ref.ratelimit(key.clone())
+ });
+
+ if let Some(limits) = limits {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight + 1;
+ }
+
+ let mut builder = reqwest.request(self.inner.method.clone(), &self.inner.url);
+ builder = client.apply_standard_headers(builder);
+ builder = builder.query(&self.inner.params);
+ builder =
+ if let Some(cursor) = &self.pagination {
+ builder.query(&[("after", cursor)])
+ } else {
+ builder
+ };
+
+
+ let key_err = self.inner.ratelimit.clone();
+ let key_ok = self.inner.ratelimit.clone();
+ let client_err = client.clone();
+ let client_ok = client.clone();
+
+ let f =
+ builder.send()
+ .map_err(move |err| {
+ if let Some(key) = key_err {
+ if let Some(limits) = client_err.ratelimit(key) {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight - 1;
+ }
+ }
+
+ err
+ })
+ .map(move |mut response| {
+ println!("{:?}", response);
+ if let Some(key) = key_ok {
+ if let Some(limits) = client_ok.ratelimit(key) {
+ let mut mut_limits = limits.inner.lock().unwrap();
+ mut_limits.inflight = mut_limits.inflight - 1;
+ mut_limits.update_from_headers(response.headers());
+ }
+ }
+
+ response.json::<T>()
+ })
+ .and_then(|json| {
+ json
+ });
+ self.state = RequestState::PollParse(Box::new(f));
+ },
+ RequestState::PollParse(future) => {
+ let res = retry_ready!(self, future.poll());
+ return Ok(Async::Ready(res));
+ },
+ }
+ }
+ }
+}
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index 15f2008..8fbca9f 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -1,30 +1,10 @@
-use futures::future::Future;
-use std::sync::{Arc, Mutex};
-use reqwest::r#async::Client as ReqwestClient;
-
-use std::collections::{HashSet, HashMap};
-use super::error::Error;
-use futures::future::Shared;
-use futures::Poll;
-use serde::de::DeserializeOwned;
-use futures::Async;
-use futures::try_ready;
-
-use crate::error::ConditionError;
-
-
-pub use super::types;
-
pub mod models;
pub mod namespaces;
-const API_DOMAIN: &'static str = "api.twitch.tv";
-
-#[derive(PartialEq, Eq, Hash, Clone)]
-pub enum RatelimitKey {
- Default,
-}
-type RatelimitMap = HashMap<RatelimitKey, Ratelimit>;
+use crate::client::Client as GenericClient;
+use crate::client::AuthClientBuilder;
+use crate::client::Version;
+use crate::client::ClientTrait;
#[derive(PartialEq, Hash, Eq, Clone)]
pub enum Scope {
@@ -40,817 +20,17 @@ pub enum Scope {
#[derive(Clone)]
pub struct Client {
- inner: Arc<ClientType>,
-}
-
-enum ClientType {
- Unauth(UnauthClient),
- Auth(AuthClient),
-}
-
-/*TODO: Try to remove this boilerplate too*/
-impl ClientTrait for Client {
-
- fn id<'a>(&'a self) -> &'a str {
- use self::ClientType::*;
- match self.inner.as_ref() {
- Unauth(inner) => inner.id(),
- Auth(inner) => inner.id(),
- }
- }
-
- fn domain<'a>(&'a self) -> &'a str {
- use self::ClientType::*;
- match self.inner.as_ref() {
- Unauth(inner) => inner.domain(),
- Auth(inner) => inner.domain(),
- }
- }
-
- fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
- use self::ClientType::*;
- match self.inner.as_ref() {
- Unauth(inner) => inner.ratelimit(key),
- Auth(inner) => inner.ratelimit(key),
- }
- }
-
- fn authenticated(&self) -> bool {
- use self::ClientType::*;
- match self.inner.as_ref() {
- Unauth(inner) => inner.authenticated(),
- Auth(inner) => inner.authenticated(),
- }
- }
-
- fn scopes(&self) -> Vec<Scope> {
- use self::ClientType::*;
- match self.inner.as_ref() {
- Unauth(inner) => inner.scopes(),
- Auth(inner) => inner.scopes(),
- }
- }
-}
-
-pub struct UnauthClient {
- id: String,
- reqwest: ReqwestClient,
- domain: String,
- ratelimits: RatelimitMap,
-}
-
-impl Client {
-
- pub fn authenticate(self, secret: &str) -> AuthClientBuilder {
- AuthClientBuilder::new(self, secret)
- }
-
- pub fn deauthenticate(self) -> Client {
- use self::ClientType::*;
- match self.inner.as_ref() {
- Unauth(_inner) => self,
- Auth(inner) => inner.previous.clone(),
- }
- }
-}
-
-
-pub trait ClientTrait {
-
- fn id<'a>(&'a self) -> &'a str;
- fn domain<'a>(&'a self) -> &'a str;
- fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit>;
-
- fn authenticated(&self) -> bool;
- fn scopes(&self) -> Vec<Scope>;
-}
-
-impl ClientTrait for UnauthClient {
- fn id<'a>(&'a self) -> &'a str {
- &self.id
- }
-
- fn domain<'a>(&'a self) -> &'a str {
- &self.domain
- }
-
- fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
- self.ratelimits.get(&key)
- }
-
- fn authenticated(&self) -> bool {
- false
- }
-
- fn scopes(&self) -> Vec<Scope> {
- Vec::with_capacity(0)
- }
-}
-
-pub struct AuthClient {
- secret: String,
- auth_state: Mutex<AuthStateRef>,
- auth_barrier: Barrier,
- previous: Client,
-}
-
-/*TODO I'd be nice to remove this boiler plate */
-impl ClientTrait for AuthClient {
- fn id<'a>(&'a self) -> &'a str {
- match self.previous.inner.as_ref() {
- ClientType::Auth(auth) => auth.id(),
- ClientType::Unauth(unauth) => unauth.id(),
- }
- }
-
- fn domain<'a>(&'a self) -> &'a str {
- match self.previous.inner.as_ref() {
- ClientType::Auth(auth) => auth.domain(),
- ClientType::Unauth(unauth) => unauth.domain(),
- }
- }
-
- fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> {
- match self.previous.inner.as_ref() {
- ClientType::Auth(auth) => auth.ratelimit(key),
- ClientType::Unauth(unauth) => unauth.ratelimit(key),
- }
- }
-
- fn authenticated(&self) -> bool {
- let auth = self.auth_state.lock().expect("Auth Lock is poisoned");
- auth.state == AuthState::Auth
- }
-
- fn scopes(&self) -> Vec<Scope> {
- let auth = self.auth_state.lock().expect("Auth Lock is poisoned");
- Vec::with_capacity(0)
- }
-}
-
-#[derive(Clone, PartialEq)]
-enum AuthState {
- Unauth,
- Auth,
-}
-
-struct AuthStateRef {
- token: Option<String>,
- scopes: Vec<Scope>,
- state: AuthState,
+ inner: GenericClient
}
impl Client {
pub fn new(id: &str) -> Client {
- let client = ReqwestClient::new();
- Client::new_with_client(id, client)
- }
-
- fn default_ratelimits() -> RatelimitMap {
- let mut limits = RatelimitMap::new();
- limits.insert(RatelimitKey::Default, Ratelimit::new(30, "Ratelimit-Limit", "Ratelimit-Remaining", "Ratelimit-Reset"));
-
- limits
- }
-
- pub fn new_with_client(id: &str, reqwest: ReqwestClient) -> Client {
-
- Client {
- inner: Arc::new(
- ClientType::Unauth(UnauthClient {
- id: id.to_owned(),
- reqwest: reqwest,
- domain: API_DOMAIN.to_owned(),
- ratelimits: Self::default_ratelimits(),
- }))
- }
- }
-
- fn secret<'a>(&'a self) -> Option<&'a str> {
- use self::ClientType::*;
- match self.inner.as_ref() {
- Unauth(_) => None,
- Auth(inner) => Some(&inner.secret),
- }
- }
-
- fn reqwest(&self) -> ReqwestClient {
- use self::ClientType::*;
- match self.inner.as_ref() {
- Unauth(inner) => inner.reqwest.clone(),
- Auth(inner) => inner.previous.reqwest(),
- }
- }
-
- /* The 'bottom' client must always be a client that is not authorized.
- * This which allows for calls to Auth endpoints using the same control flow
- * as other requests.
- *
- * Clients created with 'new' are bottom clients and calls
- * to authenticate stack an authed client on top
- */
- fn get_bottom_client(&self) -> Client {
- match self.inner.as_ref() {
- ClientType::Auth(inner) => inner.previous.get_bottom_client(),
- ClientType::Unauth(_) => self.clone(),
- }
- }
-
- fn apply_standard_headers(&self, request: RequestBuilder)
- -> RequestBuilder
- {
- let token = match self.inner.as_ref() {
- ClientType::Auth(inner) => {
- let auth = inner.auth_state.lock().expect("Authlock is poisoned");
- auth.token.as_ref().map(|s| s.to_owned())
- }
- ClientType::Unauth(_) => None,
- };
-
- let client_header = header::HeaderValue::from_str(self.id()).unwrap();
-
- let request =
- if let Some(token) = token {
- let value = "Bearer ".to_owned() + &token;
- let token_header = header::HeaderValue::from_str(&value).unwrap();
- request.header("Authorization", token_header)
- } else { request };
-
- request.header("Client-ID", client_header)
- }
-}
-
-
-use reqwest::r#async::{RequestBuilder};
-use reqwest::header;
-
-
-pub struct AuthClientBuilder {
- scopes: HashSet<Scope>,
- secret: String,
- token: Option<String>,
- client: Client,
- /*If the user supplies a token,
- * then we can skip fetching it from the server and are authenticated
- */
-}
-
-impl AuthClientBuilder {
- pub fn new(client: Client, secret: &str) -> AuthClientBuilder {
- AuthClientBuilder {
- scopes: HashSet::new(),
- client: client,
- secret: secret.to_owned(),
- token: None,
- }
- }
-
- pub fn build(self) -> Client {
- let auth_state = if self.token.is_some() { AuthState::Auth } else { AuthState::Unauth };
- let old_client = self.client;
Client {
- inner: Arc::new(ClientType::Auth(
- AuthClient {
- secret: self.secret,
- auth_barrier: Barrier::new(),
- auth_state: Mutex::new (
- AuthStateRef {
- token: self.token,
- scopes: Vec::new(),
- state: auth_state,
- }),
- previous: old_client,
- }))
- }
- }
-
- pub fn scope(mut self, scope: Scope) -> AuthClientBuilder {
- let scopes = &mut self.scopes;
- scopes.insert(scope);
- self
- }
-
- pub fn scopes(mut self, scopes: Vec<Scope>) -> AuthClientBuilder {
- let _scopes = &mut self.scopes;
- for scope in scopes {
- _scopes.insert(scope);
- }
- self
- }
-
- pub fn token(mut self, token: &str) -> AuthClientBuilder {
- self.token.replace(token.to_owned());
- self
- }
-}
-
-use std::collections::BTreeMap;
-use reqwest::Method;
-
-struct RequestRef {
- url: String,
- params: BTreeMap<String, String>,
- client: Client,
- ratelimit: Option<RatelimitKey>,
- method: Method,
-}
-
-impl RequestRef {
- pub fn new(url: String,
- params: BTreeMap<&str, &str>,
- client: Client,
- method: Method,
- ratelimit: Option<RatelimitKey>,
- ) -> RequestRef
- {
- let mut owned_params = BTreeMap::new();
- for (key, value) in params {
- owned_params.insert(key.to_owned(), value.to_owned());
- }
-
- RequestRef {
- url: url,
- params: owned_params,
- client: client,
- method: method,
- ratelimit: ratelimit,
- }
- }
-}
-
-enum RequestState<T> {
- SetupRequest,
- SetupBarriers,
- WaitAuth(WaiterState<AuthWaiter>),
- SetupRatelimit,
- WaitLimit(WaiterState<RatelimitWaiter>),
- WaitRequest,
- PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>),
-}
-
-pub struct ApiRequest<T> {
- inner: Arc<RequestRef>,
- state: RequestState<T>,
- attempt: u32,
- max_attempts: u32,
- pagination: Option<String>,
-}
-
-enum IterableApiRequestState<T> {
- Start,
- PollInner(ApiRequest<T>),
- Finished,
-}
-
-pub struct IterableApiRequest<T> {
- inner: Arc<RequestRef>,
- state: IterableApiRequestState<T>,
-}
-
-use self::models::PaginationTrait;
-impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> {
-
- pub fn new(url: String,
- params: BTreeMap<&str, &str>,
- client: Client,
- method: Method,
- ratelimit: Option<RatelimitKey>,
- ) -> ApiRequest<T>
- {
- ApiRequest {
- inner: Arc::new(RequestRef::new(url, params, client, method, ratelimit)),
- state: RequestState::SetupRequest,
- attempt: 0,
- max_attempts: 1,
- pagination: None,
+ inner: GenericClient::new(id, Version::Helix)
}
}
-}
-
-impl<T: DeserializeOwned + PaginationTrait + 'static + Send> IterableApiRequest<T> {
-
- pub fn new(url: String,
- params: BTreeMap<&str, &str>,
- client: Client,
- method: Method,
- ratelimit: Option<RatelimitKey>
- ) -> IterableApiRequest<T>
- {
- let request_ref =
- Arc::new(RequestRef::new(url, params, client, method, ratelimit));
- IterableApiRequest {
- inner: request_ref,
- state: IterableApiRequestState::Start,
- }
- }
-}
-
-
-pub struct RatelimitWaiter {
- limit: Ratelimit,
-}
-
-#[derive(Clone)]
-pub struct Ratelimit {
- inner: Arc<Mutex<RatelimitRef>>,
- barrier: Barrier,
-}
-
-impl Ratelimit {
- pub fn new(limit: i32,
- header_limit: &str,
- header_remaining: &str,
- header_reset: &str)
- -> Ratelimit
- {
- Ratelimit {
- inner: Arc::new(
- Mutex::new(
- RatelimitRef {
- limit: limit,
- remaining: limit,
- inflight: 0,
- reset: None,
- header_limit: header_limit.to_owned(),
- header_remaining: header_remaining.to_owned(),
- header_reset: header_reset.to_owned(),
- }
- )
- ),
- barrier: Barrier::new(),
- }
- }
-}
-
-
-
-#[derive(Debug, Clone)]
-pub struct RatelimitRef {
- limit: i32,
- remaining: i32,
- inflight: i32,
- reset: Option<u32>,
- header_limit: String,
- header_remaining: String,
- header_reset: String,
-}
-
-
-impl RatelimitRef {
- pub fn update_from_headers(&mut self, headers: &reqwest::header::HeaderMap) {
- let maybe_limit =
- headers
- .get(&self.header_limit)
- .and_then(|x| x.to_str().ok())
- .and_then(|x| x.parse::<i32>().ok());
-
- if let Some(limit) = maybe_limit {
- self.limit = limit;
- }
-
- let maybe_remaining =
- headers
- .get(&self.header_remaining)
- .and_then(|x| x.to_str().ok())
- .and_then(|x| x.parse::<i32>().ok());
-
- if let Some(limit) = maybe_remaining {
- self.remaining = limit;
- }
-
- let maybe_reset =
- headers
- .get(&self.header_reset)
- .and_then(|x| x.to_str().ok())
- .and_then(|x| x.parse::<u32>().ok());
-
- if let Some(reset) = maybe_reset {
- self.reset = Some(reset);
- }
- }
-}
-
-use futures::future::SharedError;
-use crate::sync::barrier::Barrier;
-use crate::sync::waiter::Waiter;
-
-struct WaiterState<W: Waiter> {
- polling: bool,
- shared_future: Option<(Shared<Box<Future<Item=(), Error=ConditionError> + Send>>)>,
- waiter: W,
- barrier: Barrier,
-}
-
-impl<W: Waiter> WaiterState<W> {
- fn new(waiter: W, barrier: &Barrier) -> WaiterState<W> {
- WaiterState {
- polling: false,
- shared_future: None,
- waiter: waiter,
- barrier: barrier.clone(),
- }
- }
-}
-
-impl<W: Waiter> Future for WaiterState<W> {
- type Item = <W as Waiter>::Item;
- type Error = <W as Waiter>::Error;
-
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- loop {
- let blocked = self.waiter.blocked();
- if blocked && !self.polling {
- let fut = self.barrier.condition(&self.waiter);
- self.shared_future = Some(fut);
- self.polling = true;
- } else if blocked || self.polling {
- let f = self.shared_future.as_mut().unwrap();
- try_ready!(f.poll());
- self.polling = false;
- } else {
- return Ok(Async::Ready(<W as Waiter>::Item::default()));
- }
- }
- }
-}
-
-
-struct AuthWaiter {
- waiter: Client,
-}
-
-impl Waiter for AuthWaiter {
- type Item = ();
- type Error = ConditionError;
-
- fn blocked(&self) -> bool {
- match self.waiter.inner.as_ref() {
- ClientType::Unauth(_) => false,
- ClientType::Auth(inner) => {
- let auth = inner.auth_state.lock()
- .expect("unable to lock auth state");
- auth.state == AuthState::Unauth
- }
- }
- }
-
- fn condition(&self) ->
- Shared<Box<Future<Item=(), Error=ConditionError> + Send>> {
- /* If a secret is not provided then just immediately return */
- let secret = self.waiter.secret().unwrap();
- let bottom_client = self.waiter.get_bottom_client();
- let client = self.waiter.clone();
-
- let auth_future =
- bottom_client
- .auth()
- .client_credentials(secret)
- .map(move |credentials| {
- println!("{:?}", credentials);
- if let ClientType::Auth(inner) = client.inner.as_ref() {
- let mut auth = inner.auth_state.lock().unwrap();
- auth.state = AuthState::Auth;
- auth.token = Some(credentials.access_token.clone());
- }
- ()
- })
- .map_err(|_| ConditionError{});
-
- Future::shared(Box::new(auth_future))
- }
-}
-
-impl Waiter for RatelimitWaiter {
- type Item = ();
- type Error = ConditionError;
-
- fn blocked(&self) -> bool {
- let limits = self.limit.inner.lock().unwrap();
- println!("{}, {}, {}", limits.limit, limits.remaining, limits.inflight);
- limits.remaining - limits.inflight <= 0
- }
-
- fn condition(&self)
- -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>>
- {
- /*TODO: Really basic for now*/
- use futures_timer::Delay;
- use std::time::Duration;
- Future::shared(Box::new(
- Delay::new(Duration::from_secs(60)).map_err(|_| ConditionError{})
- ))
- }
-}
-
-/* Todo: If the polled futures returns an error than all the waiters should
- * get that error
- */
-
-/* Macro ripped directly from try_ready and simplies retries if any error occurs
- * and there are remaning retry attempt
- */
-#[macro_export]
-macro_rules! retry_ready {
- ($s:expr, $e:expr) => (match $e {
- Ok(futures::prelude::Async::Ready(t)) => t,
- Ok(futures::prelude::Async::NotReady) => return Ok(futures::prelude::Async::NotReady),
- Err(e) => {
- if $s.attempt < $s.max_attempts {
- $s.attempt += 1;
- $s.state = RequestState::SetupBarriers;
- continue;
- } else {
- return Err(e.into());
- }
- }
- })
-}
-
-use futures::Stream;
-
-impl<T: DeserializeOwned + PaginationTrait + 'static + Send> Stream for IterableApiRequest<T> {
- type Item = T;
- type Error = Error;
-
- fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
- loop {
- match &mut self.state {
- IterableApiRequestState::Start => {
- self.state =
- IterableApiRequestState::PollInner(
- ApiRequest {
- inner: self.inner.clone(),
- state: RequestState::SetupRequest,
- attempt: 0,
- max_attempts: 1,
- pagination: None
- });
- },
- IterableApiRequestState::PollInner(request) => {
- let f = request as &mut Future<Item=Self::Item, Error=Self::Error>;
- match f.poll() {
- Err(err) => {
- self.state = IterableApiRequestState::Finished;
- return Err(err);
- },
- Ok(state) => {
- match state {
- Async::NotReady => return Ok(Async::NotReady),
- Async::Ready(res) => {
- let cursor =
- res.cursor().as_ref()
- .and_then(|cursor| cursor.cursor.clone());
-
- match cursor {
- Some(cursor) => {
- self.state = IterableApiRequestState::PollInner(
- ApiRequest {
- inner: self.inner.clone(),
- state: RequestState::SetupRequest,
- attempt: 0,
- max_attempts: 1,
- pagination: Some(cursor.clone()),
- });
- },
- None => {
- self.state = IterableApiRequestState::Finished;
- }
- }
- return Ok(Async::Ready(Some(res)));
- }
- }
- }
- }
- },
- IterableApiRequestState::Finished => {
- return Ok(Async::Ready(None));
- }
- }
- }
- }
-}
-
-impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
- type Item = T;
- type Error = Error;
-
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- loop {
- match &mut self.state {
- RequestState::SetupRequest => {
- self.attempt = 0;
- self.state = RequestState::SetupBarriers;
- }
- RequestState::SetupBarriers => {
- match self.inner.client.inner.as_ref() {
- ClientType::Auth(inner) => {
- let waiter = AuthWaiter {
- waiter: self.inner.client.clone(),
- };
-
- let f = WaiterState::new(waiter,
- &inner.auth_barrier);
- self.state = RequestState::WaitAuth(f);
- },
- ClientType::Unauth(_) => {
- self.state = RequestState::SetupRatelimit;
- }
- }
- },
- RequestState::WaitAuth(auth) => {
- let _waiter = retry_ready!(self, auth.poll());
- self.state = RequestState::SetupRatelimit;
- },
- RequestState::SetupRatelimit => {
- let limits =
- self.inner.ratelimit.as_ref().and_then(|key| {
- self.inner.client.ratelimit(key.clone())
- });
- match limits {
- Some(ratelimit) => {
- let barrier = ratelimit.barrier.clone();
- let waiter = RatelimitWaiter {
- limit: ratelimit.clone(),
- };
- let f = WaiterState::new(waiter,
- &barrier);
- self.state = RequestState::WaitLimit(f);
- },
- None => {
- self.state = RequestState::WaitRequest;
- }
- }
- },
- RequestState::WaitLimit(limit) => {
- let _waiter = retry_ready!(self, limit.poll());
- self.state = RequestState::WaitRequest;
- },
- RequestState::WaitRequest => {
- let client = self.inner.client.clone();
- let c_ref = &client;
- let reqwest = client.reqwest();
-
- let limits =
- self.inner.ratelimit.as_ref().and_then(|key| {
- c_ref.ratelimit(key.clone())
- });
-
- if let Some(limits) = limits {
- let mut mut_limits = limits.inner.lock().unwrap();
- mut_limits.inflight = mut_limits.inflight + 1;
- }
-
- let mut builder = reqwest.request(self.inner.method.clone(), &self.inner.url);
- builder = client.apply_standard_headers(builder);
- builder = builder.query(&self.inner.params);
- builder =
- if let Some(cursor) = &self.pagination {
- builder.query(&[("after", cursor)])
- } else {
- builder
- };
-
-
- let key_err = self.inner.ratelimit.clone();
- let key_ok = self.inner.ratelimit.clone();
- let client_err = client.clone();
- let client_ok = client.clone();
-
- let f =
- builder.send()
- .map_err(move |err| {
- if let Some(key) = key_err {
- if let Some(limits) = client_err.ratelimit(key) {
- let mut mut_limits = limits.inner.lock().unwrap();
- mut_limits.inflight = mut_limits.inflight - 1;
- }
- }
-
- err
- })
- .map(move |mut response| {
- println!("{:?}", response);
- if let Some(key) = key_ok {
- if let Some(limits) = client_ok.ratelimit(key) {
- let mut mut_limits = limits.inner.lock().unwrap();
- mut_limits.inflight = mut_limits.inflight - 1;
- mut_limits.update_from_headers(response.headers());
- }
- }
-
- response.json::<T>()
- })
- .and_then(|json| {
- json
- });
- self.state = RequestState::PollParse(Box::new(f));
- },
- RequestState::PollParse(future) => {
- let res = retry_ready!(self, future.poll());
- return Ok(Async::Ready(res));
- },
- }
- }
+ pub fn authenticate(self, secret: &str) -> AuthClientBuilder {
+ AuthClientBuilder::new(self.inner, secret)
}
}
diff --git a/src/helix/models.rs b/src/helix/models.rs
index 4124fd2..47a1288 100644
--- a/src/helix/models.rs
+++ b/src/helix/models.rs
@@ -3,11 +3,9 @@ extern crate chrono;
use url::Url;
use chrono::{DateTime, Utc};
-use super::types::{UserId, VideoId, ChannelId};
+use crate::types::{UserId, VideoId, ChannelId};
-pub trait PaginationTrait {
- fn cursor<'a>(&'a self) -> &'a Option<Cursor>;
-}
+use crate::client::PaginationTrait;
#[derive(Debug, Deserialize)]
pub struct DataContainer<T> {
@@ -15,15 +13,25 @@ pub struct DataContainer<T> {
}
impl<T> PaginationTrait for DataContainer<T> {
- fn cursor<'a>(&'a self) -> &'a Option<Cursor> { &None }
+ fn cursor<'a>(&'a self) -> Option<&'a str> { None }
}
impl<T> PaginationTrait for PaginationContainer<T> {
- fn cursor<'a>(&'a self) -> &'a Option<Cursor> { &self.pagination }
+ fn cursor<'a>(&'a self) -> Option<&'a str> {
+ match self.pagination.as_ref() {
+ Some(cursor) => {
+ match cursor.cursor.as_ref() {
+ Some(cursor) => Some(cursor),
+ None => None,
+ }
+ },
+ None => None
+ }
+ }
}
impl PaginationTrait for Credentials {
- fn cursor<'a>(&'a self) -> &'a Option<Cursor> { &None }
+ fn cursor<'a>(&'a self) -> Option<&'a str> { None }
}
#[derive(Debug, Deserialize)]
@@ -37,17 +45,6 @@ pub struct Cursor {
pub cursor: Option<String>
}
-
-#[derive(Debug, Deserialize)]
-pub struct Credentials {
- pub access_token: String,
- pub refresh_token: Option<String>,
- pub expires_in: u32,
- pub scope: Option<Vec<String>>,
- pub token_type: String,
-}
-
-
#[derive(Debug, Deserialize)]
pub struct Video {
pub id: VideoId,
@@ -107,3 +104,12 @@ pub struct Clip {
pub thumbnail_url: Url,
pub view_count: i32,
}
+
+#[derive(Debug, Deserialize)]
+pub struct Credentials {
+ pub access_token: String,
+ pub refresh_token: Option<String>,
+ pub expires_in: u32,
+ pub scope: Option<Vec<String>>,
+ pub token_type: String,
+}
diff --git a/src/helix/namespaces/auth.rs b/src/helix/namespaces/auth.rs
index 478c1af..1ad5c57 100644
--- a/src/helix/namespaces/auth.rs
+++ b/src/helix/namespaces/auth.rs
@@ -1,9 +1,10 @@
use std::collections::BTreeMap;
-use super::super::models::Credentials;
+use crate::helix::models::Credentials;
use super::super::Client;
const ID_DOMAIN: &'static str = "id.twitch.tv";
use super::Namespace;
-use super::super::ClientTrait;
+use crate::client::{ClientTrait, ApiRequest};
+use reqwest::Method;
pub struct Auth {}
type AuthNamespace = Namespace<Auth>;
@@ -22,13 +23,11 @@ impl Client {
}
}
-use super::super::ApiRequest;
-use reqwest::Method;
-
//TODO: Implement scopes
pub fn client_credentials(client: Client, secret: &str)
-> ApiRequest<Credentials> {
+ let client = client.inner;
let url =
String::from("https://") +
ID_DOMAIN + "/oauth2/token";
diff --git a/src/helix/namespaces/clips.rs b/src/helix/namespaces/clips.rs
index a73d2e9..28b66f7 100644
--- a/src/helix/namespaces/clips.rs
+++ b/src/helix/namespaces/clips.rs
@@ -1,10 +1,10 @@
use std::collections::BTreeMap;
use super::super::models::{DataContainer, Clip};
use super::super::Client;
-use super::super::ClientTrait;
-use super::super::RatelimitKey;
use super::Namespace;
+use crate::client::{RatelimitKey, ClientTrait, ApiRequest};
+
pub struct Clips {}
type ClipsNamespace = Namespace<Clips>;
@@ -22,12 +22,12 @@ impl Client {
}
}
-use super::super::ApiRequest;
use reqwest::Method;
pub fn clip(client: Client, id: &str)
-> ApiRequest<DataContainer<Clip>>
{
+ let client = client.inner;
let url =
String::from("https://") +
client.domain() + "/helix/clips" + "?id=" + id;
diff --git a/src/helix/namespaces/videos.rs b/src/helix/namespaces/videos.rs
index 382c9ea..a8021f3 100644
--- a/src/helix/namespaces/videos.rs
+++ b/src/helix/namespaces/videos.rs
@@ -1,13 +1,12 @@
use super::super::models::{PaginationContainer, Video};
use super::super::Client;
-use super::super::ClientTrait;
-use super::super::RatelimitKey;
-use super::super::IterableApiRequest;
use super::Namespace;
+use crate::client::{ClientTrait, RatelimitKey, IterableApiRequest};
use std::collections::BTreeMap;
use reqwest::Method;
+
pub struct Videos {}
type VideosNamespace = Namespace<Videos>;
@@ -40,6 +39,7 @@ impl Client {
pub fn by_id(client: Client, ids: Vec<&str>)
-> IterableApiRequest<PaginationContainer<Video>> {
+ let client = client.inner;
let url =
String::from("https://") + client.domain() + &String::from("/helix/videos");
@@ -54,6 +54,7 @@ pub fn by_id(client: Client, ids: Vec<&str>)
pub fn by_user(client: Client, user_id: &str)
-> IterableApiRequest<PaginationContainer<Video>> {
+ let client = client.inner;
let url =
String::from("https://") + client.domain() + &String::from("/helix/videos");
@@ -66,6 +67,7 @@ pub fn by_user(client: Client, user_id: &str)
pub fn for_game(client: Client, game_id: &str)
-> IterableApiRequest<PaginationContainer<Video>> {
+ let client = client.inner;
let url =
String::from("https://") + client.domain() + &String::from("/helix/videos");
diff --git a/src/lib.rs b/src/lib.rs
index e18f243..5392b8e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,14 +6,14 @@ extern crate serde;
extern crate chrono;
#[macro_use] extern crate serde_derive;
-use reqwest::r#async::Client as ReqwestClient;
-
pub mod helix;
pub mod kraken;
pub mod types;
pub mod error;
pub mod sync;
pub mod namespace;
+pub mod client;
+pub mod models;
pub use self::helix::Client as HelixClient;
-pub use self::kraken::Client as KrakenClient;
+//pub use self::kraken::Client as KrakenClient;
diff --git a/src/models.rs b/src/models.rs
new file mode 100644
index 0000000..f68e60f
--- /dev/null
+++ b/src/models.rs
@@ -0,0 +1,16 @@
+extern crate serde_json;
+
+use crate::client::PaginationTrait;
+
+impl PaginationTrait for Credentials {
+ fn cursor<'a>(&'a self) -> Option<&'a str> { None }
+}
+
+#[derive(Debug, Deserialize)]
+pub struct Credentials {
+ pub access_token: String,
+ pub refresh_token: Option<String>,
+ pub expires_in: u32,
+ pub scope: Option<Vec<String>>,
+ pub token_type: String,
+}
diff --git a/src/namespace.rs b/src/namespace.rs
deleted file mode 100644
index e69de29..0000000
--- a/src/namespace.rs
+++ /dev/null
diff --git a/src/namespace/auth.rs b/src/namespace/auth.rs
new file mode 100644
index 0000000..577aa92
--- /dev/null
+++ b/src/namespace/auth.rs
@@ -0,0 +1,55 @@
+use std::collections::BTreeMap;
+use crate::models::Credentials;
+use crate::client::Client;
+const ID_DOMAIN: &'static str = "id.twitch.tv";
+use crate::client::{ClientTrait, ApiRequest};
+use reqwest::Method;
+use std::marker::PhantomData;
+
+pub struct Namespace<T> {
+ client: Client,
+ _type: PhantomData<T>
+}
+
+impl<T> Namespace<T> {
+ pub fn new(client: &Client) -> Self {
+ Namespace {
+ client: client.clone(),
+ _type: PhantomData,
+ }
+ }
+}
+
+pub struct Auth {}
+type AuthNamespace = Namespace<Auth>;
+
+impl AuthNamespace {
+ pub fn client_credentials(self, secret: &str)
+ -> ApiRequest<Credentials> {
+ use self::client_credentials;
+ client_credentials(self.client, secret)
+ }
+}
+
+impl Client {
+ pub fn auth(&self) -> AuthNamespace {
+ AuthNamespace::new(self)
+ }
+}
+
+//TODO: Implement scopes
+pub fn client_credentials(client: Client, secret: &str)
+ -> ApiRequest<Credentials> {
+
+ let url =
+ String::from("https://") +
+ ID_DOMAIN + "/oauth2/token";
+
+ let mut params = BTreeMap::new();
+ params.insert("client_id", client.id());
+ params.insert("client_secret", secret);
+ params.insert("grant_type", "client_credentials");
+ params.insert("scope", "");
+
+ ApiRequest::new(url, params, client.clone(), Method::POST, None)
+}
diff --git a/src/namespace/mod.rs b/src/namespace/mod.rs
new file mode 100644
index 0000000..0e4a05d
--- /dev/null
+++ b/src/namespace/mod.rs
@@ -0,0 +1 @@
+pub mod auth;