diff options
-rw-r--r-- | src/bin/main.rs | 43 | ||||
-rw-r--r-- | src/client.rs | 406 | ||||
-rw-r--r-- | src/error.rs | 79 | ||||
-rw-r--r-- | src/helix/mod.rs | 52 | ||||
-rw-r--r-- | src/helix/namespaces/videos.rs | 4 | ||||
-rw-r--r-- | src/kraken/mod.rs | 26 | ||||
-rw-r--r-- | src/kraken/models.rs | 18 | ||||
-rw-r--r-- | src/kraken/namespaces/mod.rs | 1 | ||||
-rw-r--r-- | src/kraken/namespaces/users.rs | 32 | ||||
-rw-r--r-- | src/lib.rs | 5 | ||||
-rw-r--r-- | src/models.rs | 14 | ||||
-rw-r--r-- | src/sync/mod.rs | 42 | ||||
-rw-r--r-- | src/sync/waiter.rs | 1 | ||||
-rw-r--r-- | src/types.rs | 4 | ||||
-rw-r--r-- | tests/common/mod.rs | 32 | ||||
-rw-r--r-- | tests/helix.rs | 253 | ||||
-rw-r--r-- | tests/kraken.rs | 76 |
17 files changed, 906 insertions, 182 deletions
diff --git a/src/bin/main.rs b/src/bin/main.rs index b8025d9..a545ec1 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -3,22 +3,28 @@ extern crate futures; extern crate serde; extern crate tokio; extern crate twitch_api; +extern crate env_logger; use futures::future::Future; -use futures::Stream; use std::env; use twitch_api::HelixClient; use twitch_api::KrakenClient; -use std::str::FromStr; - -use twitch_api::types::UserId; -use twitch_api::types::ClipId; +use twitch_api::ClientConfig; +use twitch_api::client::RatelimitMap; fn main() { dotenv::dotenv().unwrap(); + env_logger::init(); + + let config = ClientConfig { + max_retrys: 0, + ratelimits: RatelimitMap::empty(), + ..ClientConfig::default() + }; + let client_id = &env::var("TWITCH_API").unwrap(); - let helix_client = HelixClient::new(client_id); + let helix_client = HelixClient::new_with_config(client_id, config); let kraken_client = KrakenClient::new(client_id); /* @@ -26,13 +32,15 @@ fn main() { .build(); */ +/* let clip = helix_client .clips() - .clip(&ClipId::new("EnergeticApatheticTarsierThisIsSparta")) + .clip(&"EnergeticApatheticTarsierThisIsSparta") .map_err(|err| { println!("{:?}", err); () }); + */ /* let clip2 = authed_client @@ -60,6 +68,7 @@ fn main() { */ +/* let clip2 = kraken_client .clips() .clip(&"EnergeticApatheticTarsierThisIsSparta") @@ -67,12 +76,20 @@ fn main() { println!("{:?}", err); () }); +*/ + - let u = helix_client - .users() - .users(&vec!(), &vec!("freakey")) - .map(|res| {println!("{:?}", res); ()}) - .map_err(|res| {println!("{:?}", res); ()}); + let f = futures::future::ok(1).and_then(move |_| { + for i in 0..80 { + let u = helix_client + .users() + .users(&vec!(), &vec!("freakey")) + .map(|res| {println!("{:?}", res); ()}) + .map_err(|res| {println!("{:?}", res); ()}); + tokio::spawn(u); + } + Ok(()) + }); /* Prevents tokio from **hanging** * since tokio::run blocks the current thread and waits for the entire runtime @@ -81,7 +98,7 @@ fn main() { */ //std::mem::drop(authed_client); tokio::run( - u + f /* clip.join(clip2) .and_then(|(c1, c2)| { diff --git a/src/client.rs b/src/client.rs index a8bc0b5..0307a05 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,10 @@ +use crate::models::Message; +use std::convert::TryFrom; use futures::future::Future; use std::sync::{Arc, Mutex}; use reqwest::r#async::Client as ReqwestClient; +use reqwest::Error as ReqwestError; +use reqwest::r#async::{Request, Response}; use std::collections::{HashSet, HashMap}; use super::error::Error; @@ -9,6 +13,8 @@ use futures::Poll; use serde::de::DeserializeOwned; use futures::Async; use futures::try_ready; +use serde_json::Value; +use futures::future::Either; use crate::error::ConditionError; @@ -18,7 +24,10 @@ pub use super::types; pub enum RatelimitKey { Default, } -type RatelimitMap = HashMap<RatelimitKey, Ratelimit>; + +pub struct RatelimitMap { + pub inner: HashMap<RatelimitKey, Ratelimit> +} const API_DOMAIN: &'static str = "api.twitch.tv"; const AUTH_DOMAIN: &'static str = "id.twitch.tv"; @@ -35,12 +44,170 @@ pub struct Client { inner: Arc<ClientType>, } +#[derive(Debug)] +pub struct ScopeParseError {} +use std::fmt; +impl fmt::Display for ScopeParseError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Scope Parse Error") + } +} + /*TODO*/ -#[derive(PartialEq, Hash, Eq, Clone)] +#[derive(PartialEq, Hash, Eq, Clone, Debug)] pub enum Scope { + Helix(HelixScope), + Kraken(KrakenScope), +} + +impl TryFrom<&str> for Scope { + type Error = ScopeParseError; + fn try_from(s: &str) -> Result<Scope, Self::Error> { + if let Ok(scope) = HelixScope::try_from(s) { + return Ok(Scope::Helix(scope)); + } + if let Ok(scope) = KrakenScope::try_from(s) { + return Ok(Scope::Kraken(scope)); + } + Err(ScopeParseError {}) + } +} +use serde::{Deserialize, Deserializer}; +impl<'de> Deserialize<'de> for Scope { + + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where D: Deserializer<'de> + { + let id = String::deserialize(deserializer)?; + Scope::try_from(&id[0..]).map_err(serde::de::Error::custom) + } +} + +#[derive(PartialEq, Hash, Eq, Clone, Debug)] +pub enum HelixScope { + AnalyticsReadExtensions, + AnalyticsReadGames, + BitsRead, + ChannelReadSubscriptions, + ClipsEdit, + UserEdit, + UserEditBroadcast, + UserReadBroadcast, UserReadEmail, } +impl HelixScope { + pub fn to_str(&self) -> &'static str { + use self::HelixScope::*; + match self { + AnalyticsReadExtensions => "analytics:read:extensions", + AnalyticsReadGames => "analytics:read:games", + BitsRead => "bits:read", + ChannelReadSubscriptions => "channel:read:subscriptions", + ClipsEdit => "clips:edit", + UserEdit => "user:edit", + UserEditBroadcast => "user:edit:broadcast", + UserReadBroadcast => "user:read:broadcast", + UserReadEmail => "user:read:email", + } + } +} + +impl TryFrom<&str> for HelixScope { + type Error = ScopeParseError; + fn try_from(s: &str) -> Result<HelixScope, Self::Error> { + use self::HelixScope::*; + Ok( match s { + "analytics:read:extensions" => AnalyticsReadExtensions, + "analytics:read:games" => AnalyticsReadGames, + "bits:read" => BitsRead, + "channel:read:subscriptions" => ChannelReadSubscriptions, + "clips:edit" => ClipsEdit, + "user:edit" => UserEdit, + "user:edit:broadcast" => UserEditBroadcast, + "user:read:broadcast" => UserReadBroadcast, + "user:read:email" => UserReadEmail, + _ => return Err(ScopeParseError{}) + }) + } +} + +#[derive(PartialEq, Hash, Eq, Clone, Debug)] +pub enum KrakenScope { + ChannelCheckSubscription, + ChannelCommercial, + ChannelEditor, + ChannelFeedEdit, + ChannelFeedRead, + ChannelRead, + ChannelStream, + ChannelSubscriptions, + CollectionsEdit, + CommunitiesEdit, + CommunitiesModerate, + Openid, + UserBlocksEdit, + UserBlocksRead, + UserFollowsEdit, + UserRead, + UserSubscriptions, + ViewingActivityRead, +} + +impl KrakenScope { + pub fn to_str(&self) -> &'static str { + use self::KrakenScope::*; + match self { + ChannelCheckSubscription => "channel_check_subscription", + ChannelCommercial => "channel_commercial", + ChannelEditor => "channel_editor", + ChannelFeedEdit => "channel_feed_edit", + ChannelFeedRead => "channel_feed_read", + ChannelRead => "channel_read", + ChannelStream => "channel_stream", + ChannelSubscriptions => "channel_subscriptions", + CollectionsEdit => "collections_edit", + CommunitiesEdit => "communities_edit", + CommunitiesModerate => "communities_moderate", + Openid => "openid", + UserBlocksEdit => "user_blocks_edit", + UserBlocksRead => "user_blocks_read", + UserFollowsEdit => "user_follows_edit", + UserRead => "user_read", + UserSubscriptions => "user_subscriptions", + ViewingActivityRead => "viewing_activity_read", + } + } +} + +impl TryFrom<&str> for KrakenScope { + type Error = ScopeParseError; + fn try_from(s: &str) -> Result<KrakenScope, Self::Error> { + use self::KrakenScope::*; + Ok( match s { + "channel_check_subscription" => ChannelCheckSubscription, + "channel_commercial" => ChannelCommercial, + "channel_editor" => ChannelEditor, + "channel_feed_edit" => ChannelFeedEdit, + "channel_feed_read" => ChannelFeedRead, + "channel_read" => ChannelRead, + "channel_stream" => ChannelStream, + "channel_subscriptions" => ChannelSubscriptions, + "collections_edit" => CollectionsEdit, + "communities_edit" => CommunitiesEdit, + "communities_moderate" => CommunitiesModerate, + "openid" => Openid, + "user_blocks_edit" => UserBlocksEdit, + "user_blocks_read" => UserBlocksRead, + "user_follows_edit" => UserFollowsEdit, + "user_read" => UserRead, + "user_subscriptions" => UserSubscriptions, + "viewing_activity_read" => ViewingActivityRead, + _ => return Err(ScopeParseError {}) + }) + } +} + #[derive(Clone)] pub enum Version { Helix, @@ -62,17 +229,94 @@ impl Client { } } +pub struct TestConfigRef { + pub requests: Vec<Result<Request, ReqwestError>>, + pub responses: Vec<Response>, +} + +#[derive(Clone)] +pub struct TestConfig { + pub inner: Arc<Mutex<TestConfigRef>> +} + +impl TestConfig { + + pub fn push_response(&self, response: Response) { + let inner = &mut self.inner.lock().unwrap(); + inner.responses.push(response); + } +} + +impl Default for TestConfig { + + fn default() -> Self { + TestConfig { + inner: Arc::new( + Mutex::new( + TestConfigRef { + requests: Vec::new(), + responses: Vec::new(), + } + ) + ) + } + } +} + enum ClientType { Unauth(UnauthClient), Auth(AuthClient), } + +pub struct ClientConfig { + pub reqwest: ReqwestClient, + pub domain: String, + pub auth_domain: String, + pub ratelimits: RatelimitMap, + pub max_retrys: u32, + pub test_config: Option<TestConfig>, +} + +impl Default for RatelimitMap { + + fn default() -> Self { + let mut limits = HashMap::new(); + limits.insert(RatelimitKey::Default, Ratelimit::new(30, "Ratelimit-Limit", "Ratelimit-Remaining", "Ratelimit-Reset")); + RatelimitMap { + inner: limits + } + } +} + +impl RatelimitMap { + pub fn empty() -> RatelimitMap { + RatelimitMap { + inner: HashMap::new() + } + } +} + +impl Default for ClientConfig { + + fn default() -> Self { + let reqwest = ReqwestClient::new(); + let ratelimits = RatelimitMap::default(); + + ClientConfig { + reqwest, + domain: API_DOMAIN.to_owned(), + auth_domain: AUTH_DOMAIN.to_owned(), + ratelimits, + max_retrys: 1, + test_config: None, + } + } +} + pub struct UnauthClient { id: String, - reqwest: ReqwestClient, - domain: String, - auth_domain: String, - ratelimits: RatelimitMap, + config: ClientConfig, version: Version, } @@ -86,6 +330,7 @@ pub struct AuthClient { pub trait ClientTrait { fn id<'a>(&'a self) -> &'a str; + fn config<'a>(&'a self) -> &'a ClientConfig; fn domain<'a>(&'a self) -> &'a str; fn auth_domain<'a>(&'a self) -> &'a str; fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit>; @@ -100,21 +345,25 @@ impl ClientTrait for UnauthClient { } fn domain<'a>(&'a self) -> &'a str { - &self.domain + &self.config.domain } fn auth_domain<'a>(&'a self) -> &'a str { - &self.auth_domain + &self.config.auth_domain } fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { - self.ratelimits.get(&key) + self.config.ratelimits.inner.get(&key) } fn authenticated(&self) -> bool { false } + fn config<'a>(&'a self) -> &'a ClientConfig { + &self.config + } + fn scopes(&self) -> Vec<Scope> { Vec::with_capacity(0) } @@ -146,6 +395,14 @@ impl ClientTrait for Client { } } + fn config<'a>(&'a self) -> &'a ClientConfig { + use self::ClientType::*; + match self.inner.as_ref() { + Unauth(inner) => inner.config(), + Auth(inner) => inner.config(), + } + } + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { use self::ClientType::*; match self.inner.as_ref() { @@ -194,6 +451,13 @@ impl ClientTrait for AuthClient { } } + fn config<'a>(&'a self) -> &'a ClientConfig { + match self.previous.inner.as_ref() { + ClientType::Auth(auth) => auth.config(), + ClientType::Unauth(unauth) => unauth.config(), + } + } + fn ratelimit<'a>(&'a self, key: RatelimitKey) -> Option<&'a Ratelimit> { match self.previous.inner.as_ref() { ClientType::Auth(auth) => auth.ratelimit(key), @@ -208,7 +472,7 @@ impl ClientTrait for AuthClient { fn scopes(&self) -> Vec<Scope> { let auth = self.auth_state.lock().expect("Auth Lock is poisoned"); - Vec::with_capacity(0) + auth.scopes.clone() } } @@ -225,28 +489,13 @@ struct AuthStateRef { } impl Client { - pub fn new(id: &str, version: Version) -> Client { + pub fn new(id: &str, config: ClientConfig, version: Version) -> Client { let client = ReqwestClient::new(); - Client::new_with_client(id, client, version) - } - - fn default_ratelimits() -> RatelimitMap { - let mut limits = RatelimitMap::new(); - limits.insert(RatelimitKey::Default, Ratelimit::new(30, "Ratelimit-Limit", "Ratelimit-Remaining", "Ratelimit-Reset")); - - limits - } - - pub fn new_with_client(id: &str, reqwest: ReqwestClient, version: Version) -> Client { - Client { inner: Arc::new( ClientType::Unauth(UnauthClient { id: id.to_owned(), - reqwest: reqwest, - domain: API_DOMAIN.to_owned(), - auth_domain: AUTH_DOMAIN.to_owned(), - ratelimits: Self::default_ratelimits(), + config: config, version: version, })) } @@ -271,11 +520,23 @@ impl Client { fn reqwest(&self) -> ReqwestClient { use self::ClientType::*; match self.inner.as_ref() { - Unauth(inner) => inner.reqwest.clone(), + Unauth(inner) => inner.config.reqwest.clone(), Auth(inner) => inner.previous.reqwest(), } } + fn send(&self, builder: RequestBuilder) -> Box<dyn Future<Item=Response, Error=reqwest::Error> + Send> { + if let Some(test_config) = &self.config().test_config { + let config: &mut TestConfigRef = &mut test_config.inner.lock().expect("Test Config poisoned"); + println!("{}", config.responses.len()); + config.requests.push(builder.build()); + let res = config.responses.pop().expect("Ran out of test responses!"); + Box::new(futures::future::ok(res)) + } else { + Box::new(builder.send()) + } + } + /* The 'bottom' client must always be a client that is not authorized. * This which allows for calls to Auth endpoints using the same control flow * as other requests. @@ -437,7 +698,7 @@ enum RequestState<T> { SetupRatelimit, WaitLimit(WaiterState<RatelimitWaiter>), WaitRequest, - PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>), + PollParse(Box<dyn Future<Item=T, Error=Error> + Send>), } pub struct ApiRequest<T> { @@ -468,11 +729,12 @@ impl<T: DeserializeOwned + PaginationTrait + 'static + Send> ApiRequest<T> { ratelimit: Option<RatelimitKey>, ) -> ApiRequest<T> { + let max_attempts = client.config().max_retrys; ApiRequest { inner: Arc::new(RequestRef::new(url, params, client, method, ratelimit)), state: RequestState::SetupRequest, attempt: 0, - max_attempts: 1, + max_attempts, pagination: None, } } @@ -582,7 +844,6 @@ impl RatelimitRef { } } -use futures::future::SharedError; use crate::sync::barrier::Barrier; use crate::sync::waiter::Waiter; @@ -662,10 +923,13 @@ impl Waiter for AuthWaiter { let mut auth = inner.auth_state.lock().unwrap(); auth.state = AuthState::Auth; auth.token = Some(credentials.access_token.clone()); + if let Some(scopes) = credentials.scope { + for scope in scopes { auth.scopes.push(scope) } + } } () }) - .map_err(|_| ConditionError{}); + .map_err(|err| err.into()); Future::shared(Box::new(auth_future)) } @@ -694,15 +958,11 @@ impl Waiter for RatelimitWaiter { limits.remaining = limits.limit; () }) - .map_err(|_| ConditionError{}) + .map_err(|err| Error::from(err).into()) )) } } -/* Todo: If the polled futures returns an error than all the waiters should - * get that error - */ - /* Macro ripped directly from try_ready and simplies retries if any error occurs * and there are remaning retry attempt */ @@ -739,7 +999,7 @@ impl<T: DeserializeOwned + PaginationTrait + 'static + Send> Stream for Iterable inner: self.inner.clone(), state: RequestState::SetupRequest, attempt: 0, - max_attempts: 1, + max_attempts: self.inner.client.config().max_retrys, pagination: None }); }, @@ -762,7 +1022,7 @@ impl<T: DeserializeOwned + PaginationTrait + 'static + Send> Stream for Iterable inner: self.inner.clone(), state: RequestState::SetupRequest, attempt: 0, - max_attempts: 1, + max_attempts: self.inner.client.config().max_retrys, pagination: Some(cursor.to_owned()), }); }, @@ -865,37 +1125,55 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { }; - let key_err = self.inner.ratelimit.clone(); - let key_ok = self.inner.ratelimit.clone(); - let client_err = client.clone(); - let client_ok = client.clone(); - + let ratelimit_key = self.inner.ratelimit.clone(); + let client_cloned = client.clone(); + /* + Allow testing by capturing the request and returning a predetermined response + If testing is set in the client config then `Pending` is captured and saved and a future::ok(Resposne) is returned. + */ let f = - builder.send() - .map_err(move |err| { - if let Some(key) = key_err { - if let Some(limits) = client_err.ratelimit(key) { + client.send(builder) + .then(move |result| { + trace!("[TWITCH_API] {:?}", result); + if let Some(ratelimit_key) = ratelimit_key { + if let Some(limits) = client_cloned.ratelimit(ratelimit_key) { let mut mut_limits = limits.inner.lock().unwrap(); mut_limits.inflight = mut_limits.inflight - 1; } } - - err + result }) - .map(move |mut response| { - println!("{:?}", response); - if let Some(key) = key_ok { - if let Some(limits) = client_ok.ratelimit(key) { - let mut mut_limits = limits.inner.lock().unwrap(); - mut_limits.inflight = mut_limits.inflight - 1; - mut_limits.update_from_headers(response.headers()); - } + .map_err(|err| err.into()) + .and_then(|mut response| { + let status = response.status(); + if status.is_success() { + Either::A( + response.json().map_err(|err| Error::from(err)).and_then(|json| { + trace!("[TWITCH_API] {}", json); + serde_json::from_value(json).map_err(|err| err.into()) + }) + ) + } else { + Either::B( + response.json::<Message>() + .then(|res| { + match res { + Ok(message) => futures::future::err(Some(message)), + Err(_err) => futures::future::err(None) + } + }) + .map_err(move |maybe_message| { + let status = response.status(); + if status == 401 || status == 403 { + Error::auth_error(maybe_message) + } else if status == 429 { + Error::ratelimit_error(maybe_message) + } else { + Error::auth_error(maybe_message) + } + }) + ) } - - response.json::<T>() - }) - .and_then(|json| { - json }); self.state = RequestState::PollParse(Box::new(f)); }, @@ -906,4 +1184,4 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { } } } -} +}
\ No newline at end of file diff --git a/src/error.rs b/src/error.rs index 88f2713..c291b5d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,24 +1,35 @@ use reqwest::Error as ReqwestError; use futures::future::SharedError; use std::convert::From; +use std::sync::Arc; +use serde_json::Error as JsonError; +use crate::models::Message; -/*TODO: How should condition errors be handled? - * Ultimately the future must resolve so if the condition - * errs than all it's waiters must err. - */ #[derive(Clone, Debug)] -pub struct ConditionError{} +pub struct ConditionError { + inner: Arc<Error>, +} impl From<SharedError<ConditionError>> for ConditionError { fn from(other: SharedError<ConditionError>) -> Self { - ConditionError{} + (*other).clone() + } +} + +impl From<Error> for ConditionError { + fn from(other: Error) -> Self { + ConditionError{ inner: Arc::new(other) } } } #[derive(Debug)] enum Kind { Reqwest(ReqwestError), - ClientError(String), + ConditionError(ConditionError), + Io(std::io::Error), + Json(JsonError), + AuthError(Option<Message>), + RatelimitError(Option<Message>), } #[derive(Debug)] @@ -26,50 +37,58 @@ pub struct Error { inner: Kind } +impl Error { + pub fn auth_error(message: Option<Message>) -> Error { + Error { inner: Kind::AuthError(message) } + } -impl From<reqwest::Error> for Error { + pub fn ratelimit_error(message: Option<Message>) -> Error { + Error { inner: Kind::RatelimitError(message) } + } - fn from(err: ReqwestError) -> Error { - Error { - inner: Kind::Reqwest(err) + pub fn is_auth_error(&self) -> bool { + match &self.inner { + Kind::AuthError(_) => true, + Kind::ConditionError(condition) => condition.inner.is_auth_error(), + _ => false, } } -} - -impl From<()> for Error { - fn from(err: ()) -> Error { - Error { - inner: Kind::ClientError("Internal error".to_owned()) + pub fn is_ratelimit_error(&self) -> bool { + match &self.inner { + Kind::RatelimitError(_) => true, + Kind::ConditionError(condition) => condition.inner.is_ratelimit_error(), + _ => false, } } } -impl From<futures::Canceled> for Error { - fn from(_err: futures::Canceled) -> Error { +impl From<reqwest::Error> for Error { + + fn from(err: ReqwestError) -> Error { Error { - inner: Kind::ClientError("Oneshot channel unexpectedly closed".to_owned()) + inner: Kind::Reqwest(err) } } } -use std::sync::mpsc::SendError; +impl From<std::io::Error> for Error { + fn from(err: std::io::Error) -> Self { + Error { inner: Kind::Io(err) } + } +} -impl<T> From<SendError<T>> for Error { +impl From<JsonError> for Error { - fn from(_err: SendError<T>) -> Error { - Error { - inner: Kind::ClientError("Channel unexpectedly closed".to_owned()) - } + fn from(err: JsonError) -> Error { + Error { inner: Kind::Json(err) } } } impl From<ConditionError> for Error { - fn from(_err: ConditionError) -> Error { - Error { - inner: Kind::ClientError("Oneshot channel unexpectedly closed".to_owned()) - } + fn from(err: ConditionError) -> Error { + Error { inner: Kind::ConditionError(err) } } } diff --git a/src/helix/mod.rs b/src/helix/mod.rs index 53a4ffb..da24aa4 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -1,24 +1,12 @@ -pub mod models; -pub mod namespaces; - use crate::client::Client as GenericClient; -use crate::client::Version; +use crate::client::{Version, ClientConfig}; use crate::client::ClientTrait; -/* -#[derive(PartialEq, Hash, Eq, Clone)] -pub enum Scope { - AnalyticsReadExtensions, - AnalyticsReadGames, - BitsRead, - ClipsEdit, - UserEdit, - UserEditBroadcast, - UserReadBroadcast, - UserReadEmail, -} -*/ -use crate::client::Scope; +use crate::client::{HelixScope, Scope}; + +pub mod models; +pub mod namespaces; + #[derive(Clone)] pub struct Client { @@ -27,14 +15,32 @@ pub struct Client { impl Client { pub fn new(id: &str) -> Client { + let config = ClientConfig::default(); Client { - inner: GenericClient::new(id, Version::Helix) + inner: GenericClient::new(id, config, Version::Helix) + } + } + + pub fn new_with_config(id: &str, config: ClientConfig) -> Client { + Client { + inner: GenericClient::new(id, config, Version::Helix) } } pub fn authenticate(self, secret: &str) -> AuthClientBuilder { AuthClientBuilder::new(self, secret) } + + pub fn id<'a>(&'a self) -> &'a str { &self.inner.id() } + pub fn domain<'a>(&'a self) -> &'a str { &self.inner.domain() } + pub fn auth_domain<'a>(&'a self) -> &'a str { &self.inner.auth_domain() } + pub fn authenticated(&self) -> bool { self.inner.authenticated() } + + pub fn scopes(&self) -> Vec<HelixScope> { + self.inner.scopes().into_iter().filter_map(|item| { + if let Scope::Helix(scope) = item { Some(scope) } else { None } + }).collect() + } } use crate::client::AuthClientBuilder as GenericAuthClientBuilder; @@ -57,15 +63,15 @@ impl AuthClientBuilder { } } - pub fn scope(self, scope: Scope) -> AuthClientBuilder { + pub fn scope(self, scope: HelixScope) -> AuthClientBuilder { AuthClientBuilder { - inner: self.inner.scope(scope) + inner: self.inner.scope(Scope::Helix(scope)) } } - pub fn scopes(self, scopes: Vec<Scope>) -> AuthClientBuilder { + pub fn scopes(self, scopes: Vec<HelixScope>) -> AuthClientBuilder { AuthClientBuilder { - inner: self.inner.scopes(scopes) + inner: self.inner.scopes(scopes.into_iter().map(|e| Scope::Helix(e)).collect()) } } diff --git a/src/helix/namespaces/videos.rs b/src/helix/namespaces/videos.rs index 9f2fd2b..b603b8f 100644 --- a/src/helix/namespaces/videos.rs +++ b/src/helix/namespaces/videos.rs @@ -13,13 +13,13 @@ impl VideosNamespace { by_id(self.client, ids) } - pub fn by_user(self, user_id: &UserId) + pub fn by_user<S: ToString>(self, user_id: &S) -> IterableApiRequest<PaginationContainer<Video>> { use self::by_user; by_user(self.client, user_id) } - pub fn for_game(self, game_id: &GameId) + pub fn for_game<S: ToString>(self, game_id: &S) -> IterableApiRequest<PaginationContainer<Video>> { use self::for_game; for_game(self.client, game_id) diff --git a/src/kraken/mod.rs b/src/kraken/mod.rs index fbec045..4046377 100644 --- a/src/kraken/mod.rs +++ b/src/kraken/mod.rs @@ -1,7 +1,10 @@ use crate::client::Client as GenericClient; -use crate::client::Version; +use crate::client::{Version, ClientConfig}; +use crate::client::ClientTrait; pub use super::types; +use crate::client::{KrakenScope, Scope}; + mod namespaces; pub mod models; @@ -12,17 +15,34 @@ pub struct Client { impl Client { pub fn new(id: &str) -> Client { + let config = ClientConfig::default(); + Client { + inner: GenericClient::new(id, config, Version::Kraken) + } + } + + pub fn new_with_config(id: &str, config: ClientConfig) -> Client { Client { - inner: GenericClient::new(id, Version::Kraken) + inner: GenericClient::new(id, config, Version::Kraken) } } pub fn authenticate(self, secret: &str) -> AuthClientBuilder { AuthClientBuilder::new(self, secret) } + + pub fn id<'a>(&'a self) -> &'a str { &self.inner.id() } + pub fn domain<'a>(&'a self) -> &'a str { &self.inner.domain() } + pub fn auth_domain<'a>(&'a self) -> &'a str { &self.inner.auth_domain() } + pub fn authenticated(&self) -> bool { self.inner.authenticated() } + + pub fn scopes(&self) -> Vec<KrakenScope> { + self.inner.scopes().into_iter().filter_map(|item| { + if let Scope::Kraken(scope) = item { Some(scope) } else { None } + }).collect() + } } -use crate::client::Scope; use crate::client::AuthClientBuilder as GenericAuthClientBuilder; pub struct AuthClientBuilder { diff --git a/src/kraken/models.rs b/src/kraken/models.rs index e8f2d6c..5b8c30b 100644 --- a/src/kraken/models.rs +++ b/src/kraken/models.rs @@ -8,6 +8,20 @@ use super::types::{UserId, VideoId}; use crate::client::PaginationTrait; #[derive(Debug, Deserialize, Serialize)] +pub struct User { + pub _id: String, + pub bio: String, + pub created_at: DateTime<Utc>, + pub display_name: String, + #[serde(with = "url_serde")] + pub logo: Url, + pub name: String, + #[serde(rename = "type")] + pub user_type: String, + pub updated_at: DateTime<Utc>, +} + +#[derive(Debug, Deserialize, Serialize)] pub struct Clip { pub slug: String, pub tracking_id: String, @@ -32,6 +46,10 @@ impl PaginationTrait for Clip { fn cursor<'a>(&'a self) -> Option<&'a str> { None } } +impl PaginationTrait for User { + fn cursor<'a>(&'a self) -> Option<&'a str> { None } +} + #[derive(Debug, Deserialize, Serialize)] pub struct Thumbnails { diff --git a/src/kraken/namespaces/mod.rs b/src/kraken/namespaces/mod.rs index 5f4b421..d8a065f 100644 --- a/src/kraken/namespaces/mod.rs +++ b/src/kraken/namespaces/mod.rs @@ -3,6 +3,7 @@ pub use super::models; pub use super::Client; pub mod clips; +pub mod users; pub struct Namespace<T> { client: Client, diff --git a/src/kraken/namespaces/users.rs b/src/kraken/namespaces/users.rs new file mode 100644 index 0000000..d4adb8e --- /dev/null +++ b/src/kraken/namespaces/users.rs @@ -0,0 +1,32 @@ +use std::collections::BTreeMap; +use super::super::models::{User}; +use super::super::Client; +use crate::client::{RatelimitKey, ClientTrait, ApiRequest}; +use reqwest::Method; +use super::Namespace; + +pub struct Users {} +type UsersNamespace = Namespace<Users>; + +impl UsersNamespace { + pub fn by_id(self, id: &str) -> ApiRequest<User> { + use self::by_id; + by_id(self.client, id) + } +} + +impl Client { + pub fn users(&self) -> UsersNamespace { + UsersNamespace::new(self) + } +} + +pub fn by_id(client: Client, id: &str) + -> ApiRequest<User> +{ + let client = client.inner; + let url = String::from("https://") + client.domain() + "/kraken/users/" + id; + let params = BTreeMap::new(); + + ApiRequest::new(url, params, client, Method::GET, Some(RatelimitKey::Default)) +} @@ -1,10 +1,12 @@ #![recursion_limit="128"] -#![feature(option_replace)] +#![feature(try_from)] extern crate futures; extern crate reqwest; extern crate serde; extern crate chrono; +extern crate serde_json; #[macro_use] extern crate serde_derive; +#[macro_use] extern crate log; pub mod helix; pub mod kraken; @@ -17,3 +19,4 @@ pub mod models; pub use self::helix::Client as HelixClient; pub use self::kraken::Client as KrakenClient; +pub use self::client::{ClientConfig, TestConfig}; diff --git a/src/models.rs b/src/models.rs index f68e60f..cc5442c 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,16 +1,28 @@ extern crate serde_json; use crate::client::PaginationTrait; +use crate::client::Scope; impl PaginationTrait for Credentials { fn cursor<'a>(&'a self) -> Option<&'a str> { None } } +impl PaginationTrait for Message { + fn cursor<'a>(&'a self) -> Option<&'a str> { None } +} + #[derive(Debug, Deserialize)] pub struct Credentials { pub access_token: String, pub refresh_token: Option<String>, pub expires_in: u32, - pub scope: Option<Vec<String>>, + pub scope: Option<Vec<Scope>>, pub token_type: String, } + +#[derive(Debug, Deserialize)] +pub struct Message { + pub error: Option<String>, + pub message: String, + pub status: u32, +}
\ No newline at end of file diff --git a/src/sync/mod.rs b/src/sync/mod.rs index ca06d32..74e4235 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,44 +1,2 @@ -//f.barrier(auth).barrier(ratelimit).and_then(|result| {}) -//A ratelimiter must be aware when a limit is hit, the upper limit, -//and remaining requests. (use case specific) -// -//This can be done by either letting the ratelimiter drive the request -//so it can inspect returned headers or by maybe? using a channel to inform -//the limiter -// -//Submit task to ratelimiter. -//Check if the limit is hit and if we are polling -// 1 if we hit the limit and are not polling, add to the queue and start -// polling. -// 2. if we are polling add the request to the queue -// 3. if we are not polling and not locked then -// send the request and increment the in-flight counter. -// -// when the request has completed without errors then decrement -// the in-flight counter, update limiter data, and return the -// result to the requester. -// -// On error, EITHER: -// 1. If the error is rate limiter related place the request -// back in a queue, return other errors. (Prevents starvation) -// 2. Return all errors back to the Requester they can resubmit -// the request -// -// The main difference is that the condition is dependent on the waiter's -// future result. -// -// For auth requests we can use an OkFuture that returns the waiter and never errs -// -// So waiters must provide IntoFuture, a future than can poll the condition, -// and a is locked. -// The lock check must be pure (no side effects) but IntoFuture may -// have side effects (eg. increments in-flight counter) -// -// The result of the IntoFuture is returned to caller or the Err of the poll -// Future. For simplicity these will be the same type. -// -// Should the poll condition trait be located on the Waiter or the Barrier? -// All waiters in a barrier must use the same condition. - pub mod barrier; pub mod waiter; diff --git a/src/sync/waiter.rs b/src/sync/waiter.rs index 8039280..1005e3d 100644 --- a/src/sync/waiter.rs +++ b/src/sync/waiter.rs @@ -1,4 +1,3 @@ -use futures::sync::oneshot; use futures::Future; use futures::future::{Shared, SharedError}; use crate::error::ConditionError; diff --git a/src/types.rs b/src/types.rs index 56891d2..38ab50c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -177,12 +177,12 @@ mod tests { let u2 = u2.unwrap(); assert_eq!(u1, u2); - assert_eq!(u1, 1234); + //assert_eq!(u1, 1234); assert_eq!(&u1, "1234"); let u2 = UserId::from_str("1235").unwrap(); assert_ne!(u1, u2); - assert_ne!(u1, 1235); + //assert_ne!(u1, 1235); assert_ne!(&u1, "1235"); /* This must give a compile error */ diff --git a/tests/common/mod.rs b/tests/common/mod.rs new file mode 100644 index 0000000..ff2eb08 --- /dev/null +++ b/tests/common/mod.rs @@ -0,0 +1,32 @@ +extern crate twitch_api; +extern crate hyper; +extern crate futures; +extern crate reqwest; +extern crate url; +extern crate http; + +use tokio::runtime::current_thread::Runtime; +use twitch_api::{ClientConfig, TestConfig}; +use reqwest::r#async::Response; +use http::response::Builder; + +pub const CLIENT_ID: &str = "cfabdegwdoklmawdzdo98xt2fo512y"; +pub const CLIENT_SECRET: &str = "nyo51xcdrerl8z9m56w9w6wg"; + +pub fn test_config() -> (ClientConfig, TestConfig) { + let test_config = TestConfig::default(); + (ClientConfig { + test_config: Some(test_config.clone()), + max_retrys: 0, + ..ClientConfig::default() + }, test_config) +} + +pub fn okay_response(data: &'static str) -> Response { + + let response = + Builder::new() + .status(200) + .body(data).unwrap(); + Response::from(response) +}
\ No newline at end of file diff --git a/tests/helix.rs b/tests/helix.rs new file mode 100644 index 0000000..8c85c3b --- /dev/null +++ b/tests/helix.rs @@ -0,0 +1,253 @@ +extern crate twitch_api; +extern crate tokio; +extern crate hyper; +extern crate futures; +extern crate url; +extern crate http; + +pub mod common; + +use futures::Stream; +use twitch_api::HelixClient; +use twitch_api::{ClientConfig, TestConfig}; +use twitch_api::client::ClientTrait; +use twitch_api::error::Error; +use twitch_api::client::HelixScope; + + +use tokio::runtime::current_thread::Runtime; +use reqwest::r#async::Response; +use http::response::Builder; + +use crate::common::*; + +const USER_RESPONSE: &str = r#"{ + "data": [{ + "id": "44322889", + "login": "dallas", + "display_name": "dallas", + "type": "staff", + "broadcaster_type": "", + "description": "Just a gamer playing games and chatting. :)", + "profile_image_url": "https://static-cdn.jtvnw.net/jtv_user_pictures/dallas-profile_image-1a2c906ee2c35f12-300x300.png", + "offline_image_url": "https://static-cdn.jtvnw.net/jtv_user_pictures/dallas-channel_offline_image-1a2c906ee2c35f12-1920x1080.png", + "view_count": 191836881, + "email": "login@provider.com" + }]}"#; + + +const AUTH_RESPONSE: &str = r#" +{ + "access_token": "prau3ol6mg5glgek8m89ec2s9q5i3i", + "refresh_token": "", + "expires_in": 3600, + "scope": ["user:read:email", "bits:read"], + "token_type": "bearer" +}"#; + +#[test] +fn test_invalid_client_id() { + let response = r#"{"error":"Unauthorized","status":401,"message":"Must provide a valid Client-ID or OAuth token"}"#; + + let response = + Builder::new() + .status(401) + .body(response).unwrap(); + let response = Response::from(response); + + let (config, test_config) = test_config(); + test_config.push_response(response); + + let mut runtime = Runtime::new().unwrap(); + + let client = HelixClient::new_with_config(CLIENT_ID, config); + let user_future = client.users().users(&[], &["freakey"]); + let result = runtime.block_on(user_future); + + assert!(result.is_err()); + if let Err(err) = result { + assert!(err.is_auth_error()) + } +} + +#[test] +fn test_invalid_auth() { + let response = r#"{"message":"invalid client secret","status":403}"#; + + let response = + Builder::new() + .status(403) + .body(response).unwrap(); + let response = Response::from(response); + + let (config, test_config) = test_config(); + test_config.push_response(response); + let mut runtime = Runtime::new().unwrap(); + + let client = HelixClient::new_with_config(CLIENT_ID, config) + .authenticate(CLIENT_SECRET).build(); + assert!(!client.authenticated()); + + let user_future = client.users().users(&[], &["dallas"]); + let result = runtime.block_on(user_future); + + assert!(!client.authenticated()); + assert!(result.is_err()); + if let Err(err) = result { + println!("{:?}", err); + assert!(err.is_auth_error()) + } +} + +#[test] +fn test_ratelimit_hit() { + let response = r#"{"error":"Too Many Requests","message":"Thou Shall Not Pass","status":429}"#; + let response = + Builder::new() + .status(429) + .body(response).unwrap(); + let response = Response::from(response); + + let (config, test_config) = test_config(); + test_config.push_response(response); + let mut runtime = Runtime::new().unwrap(); + + let client = HelixClient::new_with_config(CLIENT_ID, config); + let user_future = client.users().users(&[], &["freakey"]); + let result = runtime.block_on(user_future); + + assert!(result.is_err()); + if let Err(err) = result { + assert!(err.is_ratelimit_error()) + } +} + +#[test] +fn test_client_header() { + let (config, test_config) = test_config(); + test_config.push_response(okay_response(USER_RESPONSE)); + let mut runtime = Runtime::new().unwrap(); + + let client = HelixClient::new_with_config(CLIENT_ID, config); + let user_future = client.users().users(&[], &["dallas"]); + let result = runtime.block_on(user_future); + assert!(result.is_ok()); + + let request = { + let config = &mut test_config.inner.lock().unwrap(); + config.requests.pop().unwrap().unwrap() + }; + + let headers = request.headers(); + let header = headers.get("Client-Id"); + assert_eq!(Some(CLIENT_ID), header.and_then(|s| Some(s.to_str().unwrap()))); +} + +#[test] +fn test_auth_header() { + + let (config, test_config) = test_config(); + test_config.push_response(okay_response(USER_RESPONSE)); + test_config.push_response(okay_response(AUTH_RESPONSE)); + let mut runtime = Runtime::new().unwrap(); + + let client = HelixClient::new_with_config(CLIENT_ID, config) + .authenticate(CLIENT_SECRET).build(); + + /*Authentication is lazy*/ + assert!(!client.authenticated()); + + let user_future = client.users().users(&[], &["dallas"]); + let result = runtime.block_on(user_future); + assert!(result.is_ok()); + + let request = { + let config = &mut test_config.inner.lock().unwrap(); + config.requests.pop().unwrap().unwrap() + }; + + let headers = request.headers(); + let client_id = headers.get("Client-Id"); + let secret = headers.get("Authorization"); + + let scopes = client.scopes(); + + assert!(client.authenticated()); + assert_eq!(Some(CLIENT_ID), client_id.and_then(|s| Some(s.to_str().unwrap()))); + assert_eq!(Some("Bearer ".to_owned() + "prau3ol6mg5glgek8m89ec2s9q5i3i"), secret.and_then(|s| Some(s.to_str().unwrap().to_owned()))); + assert!(scopes.contains(&HelixScope::UserReadEmail)); + assert!(scopes.contains(&HelixScope::BitsRead)); +} + + +#[test] +fn test_single_request() { + let (config, test_config) = test_config(); + test_config.push_response(okay_response(USER_RESPONSE)); + let mut runtime = Runtime::new().unwrap(); + + let client = HelixClient::new_with_config(CLIENT_ID, config); + let user_future = client.users().users(&[], &["dallas"]); + let result = runtime.block_on(user_future); + + assert!(result.is_ok()) +} + +#[test] +fn test_pagination() { + + let response = r#"{ + "data": [{ + "id": "234482848", + "user_id": "67955580", + "user_name": "ChewieMelodies", + "title": "-", + "description": "", + "created_at": "2018-03-02T20:53:41Z", + "published_at": "2018-03-02T20:53:41Z", + "url": "https://www.twitch.tv/videos/234482848", + "thumbnail_url": "https://static-cdn.jtvnw.net/s3_vods/bebc8cba2926d1967418_chewiemelodies_27786761696_805342775/thumb/thumb0-%{width}x%{height}.jpg", + "viewable": "public", + "view_count": 142, + "language": "en", + "type": "archive", + "duration": "3h8m33s" + }], + "pagination":{"cursor":"eyJiIjpudWxsLCJhIjoiMTUwMzQ0MTc3NjQyNDQyMjAwMCJ9"} + }"#; + + let response2 = r#"{ + "data": [{ + "id": "234482848", + "user_id": "67955580", + "user_name": "ChewieMelodies", + "title": "-", + "description": "", + "created_at": "2018-03-02T20:53:41Z", + "published_at": "2018-03-02T20:53:41Z", + "url": "https://www.twitch.tv/videos/234482848", + "thumbnail_url": "https://static-cdn.jtvnw.net/s3_vods/bebc8cba2926d1967418_chewiemelodies_27786761696_805342775/thumb/thumb0-%{width}x%{height}.jpg", + "viewable": "public", + "view_count": 142, + "language": "en", + "type": "archive", + "duration": "3h8m33s" + }], + "pagination":{"cursor":""} + }"#; + + let (config, test_config) = test_config(); + test_config.push_response(okay_response(response)); + test_config.push_response(okay_response(response2)); + let mut runtime = Runtime::new().unwrap(); + + let client = HelixClient::new_with_config(CLIENT_ID, config); + let video_future = client.videos().by_user(&"67955580"); + let result = runtime.block_on(video_future.into_future()); + assert!(result.is_ok()); + if let Ok((Some(data), next)) = result { + + let result = runtime.block_on(next.into_future()); + assert!(result.is_ok()); + } else {unreachable!()} +}
\ No newline at end of file diff --git a/tests/kraken.rs b/tests/kraken.rs new file mode 100644 index 0000000..8300b69 --- /dev/null +++ b/tests/kraken.rs @@ -0,0 +1,76 @@ +extern crate twitch_api; +extern crate tokio; +extern crate hyper; +extern crate futures; +extern crate url; +extern crate http; + +pub mod common; + +use twitch_api::KrakenClient; +use twitch_api::{ClientConfig, TestConfig}; +use twitch_api::client::ClientTrait; +use twitch_api::error::Error; +use twitch_api::client::KrakenScope; + +use tokio::runtime::current_thread::Runtime; +use reqwest::r#async::Response; +use http::response::Builder; + +use crate::common::*; + +const AUTH_RESPONSE: &str = r#" +{ + "access_token": "prau3ol6mg5glgek8m89ec2s9q5i3i", + "refresh_token": "", + "expires_in": 3600, + "scope": ["user_read", "channel_commercial"], + "token_type": "bearer" +}"#; + +const USER_RESPONSE: &str = r#"{ + "_id": "44322889", + "bio": "Just a gamer playing games and chatting. :)", + "created_at": "2013-06-03T19:12:02.580593Z", + "display_name": "dallas", + "logo": "https://static-cdn.jtvnw.net/jtv_user_pictures/dallas-profile_image-1a2c906ee2c35f12-300x300.png", + "name": "dallas", + "type": "staff", + "updated_at": "2016-12-13T16:31:55.958584Z" +}"#; + +#[test] +fn test_auth_header() { + + let (config, test_config) = test_config(); + test_config.push_response(okay_response(USER_RESPONSE)); + test_config.push_response(okay_response(AUTH_RESPONSE)); + let mut runtime = Runtime::new().unwrap(); + + let client = KrakenClient::new_with_config(CLIENT_ID, config) + .authenticate(CLIENT_SECRET).build(); + + /*Authentication is lazy*/ + assert!(!client.authenticated()); + + let user_future = client.users().by_id(&"44322889"); + let result = runtime.block_on(user_future); + assert!(result.is_ok()); + + let request = { + let config = &mut test_config.inner.lock().unwrap(); + config.requests.pop().unwrap().unwrap() + }; + + let headers = request.headers(); + let client_id = headers.get("Client-Id"); + let secret = headers.get("Authorization"); + + let scopes = client.scopes(); + + assert!(client.authenticated()); + assert_eq!(Some(CLIENT_ID), client_id.and_then(|s| Some(s.to_str().unwrap()))); + assert_eq!(Some("OAuth ".to_owned() + "prau3ol6mg5glgek8m89ec2s9q5i3i"), secret.and_then(|s| Some(s.to_str().unwrap().to_owned()))); + assert!(scopes.contains(&KrakenScope::UserRead)); + assert!(scopes.contains(&KrakenScope::ChannelCommercial)); +}
\ No newline at end of file |