diff options
author | David Blajda <blajda@hotmail.com> | 2018-12-15 06:21:52 +0000 |
---|---|---|
committer | David Blajda <blajda@hotmail.com> | 2018-12-15 06:21:52 +0000 |
commit | a4c842eae14bef20d3d04ee4313251344edf431f (patch) | |
tree | 1685f11fd6bbe9f85cb51d479770b04692250c0c | |
parent | 8615cc2f030240ba2982dba893fe63f11a0c8a88 (diff) |
Deserialize Urls and Dates. Also implement custom Id types
-rw-r--r-- | Cargo.toml | 6 | ||||
-rw-r--r-- | src/bin/main.rs | 49 | ||||
-rw-r--r-- | src/deserializers.rs | 0 | ||||
-rw-r--r-- | src/helix/endpoints.rs | 38 | ||||
-rw-r--r-- | src/helix/mod.rs | 243 | ||||
-rw-r--r-- | src/helix/models.rs | 46 | ||||
-rw-r--r-- | src/kraken/endpoints.rs | 6 | ||||
-rw-r--r-- | src/kraken/mod.rs | 40 | ||||
-rw-r--r-- | src/kraken/models.rs | 32 | ||||
-rw-r--r-- | src/lib.rs | 16 | ||||
-rw-r--r-- | src/types.rs | 85 |
11 files changed, 447 insertions, 114 deletions
@@ -1,6 +1,6 @@ [package] name = "twitch_api" -version = "0.0.25" +version = "0.0.30" authors = ["David Blajda <blajda@hotmail.com>"] edition = "2018" @@ -12,4 +12,6 @@ dotenv = "0.13.0" serde = "1.0.81" serde_json = "1.0.33" serde_derive = "1.0.81" -chrono = "0.4.6" +chrono = { version = "0.4.6", features = ["serde"]} +url = "1.7.2" +url_serde = "0.2.0" diff --git a/src/bin/main.rs b/src/bin/main.rs index 1aa55ed..7fdde47 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -6,7 +6,6 @@ extern crate twitch_api; use futures::future::Future; use std::env; -use twitch_api::HelixClient; use twitch_api::Client; fn main() { @@ -14,41 +13,8 @@ fn main() { let client_id = &env::var("TWITCH_API").unwrap(); let client = Client::new(client_id); - /* - let users = twitch_api - .users(vec![], vec!["shroud", "ninja"]) - .and_then(|json| { - println!("{:?}", json); - println!("len {}", json.data.len()); - Ok(json) - }) - .map(|_| ()) - .map_err(|err| { - println!("{:?}", err); - () - }); - - let videos = twitch_api - .videos(None, Some("37402112"), None) - .and_then(|json| { - println!("{:?}", json); - Ok(json) - }) - .map(|_| ()) - .map_err(|err| { - println!("{:?}", err); - () - }); - */ - - let clip = client.helix .clip(&"EnergeticApatheticTarsierThisIsSparta") - .and_then(|json| { - println!("{:?}", json); - Ok(json) - }) - .map(|_| ()) .map_err(|err| { println!("{:?}", err); () @@ -56,15 +22,18 @@ fn main() { let clip2 = client.kraken .clip(&"EnergeticApatheticTarsierThisIsSparta") - .and_then(|json| { - print!("{:?}", json); - Ok(json) - }) - .map(|_| ()) .map_err(|err| { println!("{:?}", err); () }); - tokio::run(clip.join(clip2).map(|_| ()).map_err(|_| ())); + tokio::run( + clip.join(clip2) + .and_then(|(c1, c2)| { + println!("{:?} {:?}", c1, c2); + Ok((c1, c2)) + }) + .map(|_| { client.nop(); ()}) + .map_err(|_| ()) + ); } diff --git a/src/deserializers.rs b/src/deserializers.rs new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/src/deserializers.rs diff --git a/src/helix/endpoints.rs b/src/helix/endpoints.rs index 5c3aa05..bb61ae9 100644 --- a/src/helix/endpoints.rs +++ b/src/helix/endpoints.rs @@ -1,40 +1,29 @@ use futures::future::Future; use reqwest::header; -use reqwest::r#async::{Chunk, Decoder, Request, Response}; +use reqwest::r#async::{RequestBuilder}; use reqwest::r#async::Client as ReqwestClient; use super::models::{DataContainer, PaginationContainer, User, Video, Clip}; - +use super::Client; const API_DOMAIN: &'static str = "api.twitch.tv"; /* When Client owns a ReqwestClient, any futures spawned do not immediately * terminate but 'hang'. When creating a new client for each request this problem * does not occur. This would need to be resolved so we can benefit from keep alive * connections. - * */ -pub struct Client { - id: String, -} - impl Client { - pub fn new(client_id: &str) -> Client { - Client { - id: client_id.to_owned(), - } - } - fn create_client(&self) -> ReqwestClient { - let mut headers = header::HeaderMap::new(); - let auth_key = &self.id; - let header_value = header::HeaderValue::from_str(auth_key).unwrap(); - headers.insert("Client-ID", header_value); + fn apply_standard_headers(&self, request: RequestBuilder) + -> RequestBuilder + { + let client_header = header::HeaderValue::from_str(&self.inner.id).unwrap(); - let client = ReqwestClient::builder().default_headers(headers).build().unwrap(); - client + request.header("Client-ID", client_header) } +/* pub fn users( &self, id: Vec<&str>, @@ -99,6 +88,7 @@ impl Client { return f; } +*/ pub fn clip(&self, id: &str) -> impl Future<Item=DataContainer<Clip>, Error=reqwest::Error> @@ -107,15 +97,15 @@ impl Client { String::from("https://") + API_DOMAIN + "/helix/clips" + "?id=" + id; - let f = self.create_client() - .get(&url) + let request = self.inner.client.get(&url); + let request = self.apply_standard_headers(request); + + request .send() .map(|mut res| { println!("{:?}", res); res.json::<DataContainer<Clip>>() }) - .and_then(|json| json); - - return f; + .and_then(|json| json) } } diff --git a/src/helix/mod.rs b/src/helix/mod.rs index ce4a7a2..d8c9952 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -1,2 +1,245 @@ +use std::sync::Arc; +use reqwest::r#async::Client as ReqwestClient; +pub use super::types; + pub mod endpoints; pub mod models; + +#[derive(Clone)] +pub struct Client { + inner: Arc<ClientRef> +} + +impl Client { + pub fn new(id: &str) -> Client { + Client { + inner: Arc::new(ClientRef { + id: id.to_owned(), + client: ReqwestClient::new(), + }) + } + } + + pub fn new_with_client(id: &str, client: ReqwestClient) -> Client { + Client { + inner: Arc::new(ClientRef { + id: id.to_owned(), + client: client, + }) + } + + } +} + +struct ClientRef { + id: String, + client: ReqwestClient, +} + +/* + +pub struct Limits { + global: LimiterRef +} + +#[derive(Clone)] +pub struct LimiterRef { + inner: Arc<Mutex<Limiter>> +} + +trait RateLimiter { + fn remaining(&self) -> usize; + fn limit(&self) -> usize; +} + +impl RateLimiter for LimiterRef { + + fn remaining(&self) -> usize { + let limits = self.inner.lock().unwrap(); + limits.remaining + } + + fn limit(&self) -> usize { + let limits = self.inner.lock().unwrap(); + limits.limit + } +} + +struct RequestJob { + pub request: Request, + pub on_complete: futures::sync::oneshot::Sender<Response>, +} +*/ + +/* API requests should be placed in a priority queue to prevent stravation. + * This implies that all requests are sent a single location and then returned + * to their callers upon completion. + * When a request is 'owned' by the queue it can be retryed when the rate limit + * is hit and allows inspect of response headers to determine remaining resources. + */ + +/* + +enum Task { + Add(RequestJob), + Drain, +} + +pub struct Limiter { + pub remaining: u32, + pub limit: u32, + in_transit: u32, + pub last_request: Option<DateTime<Utc>>, + + pub remaining_key: String, + pub limit_key: String, + + pub queue: Vec<RequestJob>, + pub client: ReqwestClient, + pub chan: mpsc::UnboundedSender<Task>, +} +use futures::sync::oneshot; + +fn handle_request(limits_ref: LimiterRef, request: RequestJob) { + let limits = limits_ref.inner.lock().unwrap(); + limits.queue.push(request); + limits.chan.unbounded_send(Task::Drain); +} + +fn handle_drain(limits_ref: LimiterRef) { + + let jobs = { + let limits = limits_ref.inner.lock().unwrap(); + let take = + std::cmp::max(limits.remaining - limits.in_transit, 0); + let jobs = Vec::new(); + for i in 0..std::cmp::min(limits.queue.len() as u32, take) { + jobs.push(limits.queue.pop().unwrap()); + } + limits.in_transit += jobs.len() as u32; + jobs + }; + + let client = { + let limits = limits_ref.inner.lock().unwrap(); + &limits.client + }; + + if jobs.len() > 0 { + for job in jobs { + let clone = job.request.clone(); + let f = + client.execute(job.request) + .and_then(move |response| { + let mut limits = limit_ref.inner.lock().unwrap(); + limits.in_transit = + std::cmp::max(0, limits.in_transit - 1); + if response.status().is_success() { + let remaining = response.headers() + .get(limits.remaining_key) + .and_then(|value| value.to_str().ok()) + .and_then(|remaining| remaining.parse::<usize>().ok()); + + let limit = response.headers() + .get(limits.limit_key) + .and_then(|value| value.to_str().ok()) + .and_then(|remaining| remaining.parse::<usize>().ok()); + + if let Some(remaining) = remaining { + limits.remaining = remaining; + } + + if let Some(limit) = remaining { + limits.limit = limit; + } + + job.on_complete.send(Ok(response)); + } else if response.status().is_client_error() { + limit.chan_tx.send(Handle( + RequestJob { + request: clone, + on_complete: job.on_complete.clone(), + })) + println!("Hit rate limit! or invalid client") + } + +} + +impl LimiterRef { + + + fn handle_drain(&self) { + + } + + fn handle_requests() { + + chan_rx.for_each(move |task| { + match task { + Handle(request) => { + handle_request( tfdsf, request); + }, + Drain => { + } + } else { + /*sleep...*/ + } + } + } + Ok(()) + }) + .map(|_| ()) + .map_err(|_| ()) + } + + + fn new(limit: u32, remaining_key: &str, limit_key: &str, client: ReqwestClient) + -> LimiterRef + { + let (chan_tx, chan_rx) = mpsc::unbounded(); + + let limiter = Limiter { + remaining: limit, + limit: limit, + in_transit: 0, + last_request: None, + remaining_key: remaining_key.to_owned(), + limit_key: limit_key.to_owned(), + queue: Vec::new(), + client: client, + chan: chan_tx, + }; + + let _ref = LimiterRef { + inner: Arc::new(Mutex::new(limiter)) + }; + + + + return _ref; + } + + fn queue(&self, request: Request) + -> impl Future<Item=Result<Response, reqwest::Error>, Error=oneshot::Canceled> { + let mut limits = self.inner.lock().unwrap(); + let limit_ref = self.clone(); + let (tx, rx) = futures::sync::oneshot::channel(); + + let job = RequestJob { + request: request, + on_complete: tx, + }; + + limits.queue.push(job); + rx + } + + /* Insert the request into a queue */ + /* + Ok(response) + }) + } + */ + +} +*/ diff --git a/src/helix/models.rs b/src/helix/models.rs index 2b01e7c..e4c58c3 100644 --- a/src/helix/models.rs +++ b/src/helix/models.rs @@ -1,8 +1,9 @@ extern crate serde_json; extern crate chrono; -use chrono::{Duration, DateTime, Utc}; - +use url::Url; +use chrono::{DateTime, Utc}; +use super::types::{UserId, VideoId, ChannelId}; #[derive(Debug, Deserialize)] pub struct DataContainer<T> { @@ -22,17 +23,17 @@ pub struct PaginationContainer<T> { #[derive(Debug, Deserialize)] pub struct Video { - pub id: String, - pub user_id: String, + pub id: VideoId, + pub user_id: UserId, pub user_name: String, pub title: String, pub description: String, - //Should be converted to a DateTime - pub created_at: String, - pub published_at: String, - //Should be converted to a URL - pub url: String, - pub thumbnail_url: String, + pub created_at: DateTime<Utc>, + pub published_at: DateTime<Utc>, + #[serde(with = "url_serde")] + pub url: Url, + #[serde(with = "url_serde")] + pub thumbnail_url: Url, pub viewable: String, pub view_count: i32, pub language: String, @@ -44,15 +45,17 @@ pub struct Video { #[derive(Debug, Deserialize)] pub struct User { - pub id: String, + pub id: UserId, pub login: String, pub display_name: String, #[serde(rename = "type")] pub user_type: String, pub broadcaster_type: String, pub description: String, - pub profile_image_url: String, - pub offline_image_url: String, + #[serde(with = "url_serde")] + pub profile_image_url: Url, + #[serde(with = "url_serde")] + pub offline_image_url: Url, pub view_count: u32, pub email: Option<String>, } @@ -60,17 +63,20 @@ pub struct User { #[derive(Debug, Deserialize)] pub struct Clip { pub id: String, - pub url: String, - pub embed_url: String, - pub broadcaster_id: String, + #[serde(with = "url_serde")] + pub url: Url, + #[serde(with = "url_serde")] + pub embed_url: Url, + pub broadcaster_id: ChannelId, pub broadcaster_name: String, - pub creator_id: String, + pub creator_id: UserId, pub creator_name: String, - pub video_id: String, + pub video_id: VideoId, pub game_id: String, pub language: String, pub title: String, - pub created_at: String, - pub thumbnail_url: String, + pub created_at: DateTime<Utc>, + #[serde(with = "url_serde")] + pub thumbnail_url: Url, pub view_count: i32, } diff --git a/src/kraken/endpoints.rs b/src/kraken/endpoints.rs index 2dbc8d1..7ae273c 100644 --- a/src/kraken/endpoints.rs +++ b/src/kraken/endpoints.rs @@ -9,10 +9,10 @@ impl Client { -> impl Future<Item=Clip, Error=reqwest::Error> { let url = String::from("https://") + API_DOMAIN + "/kraken/clips/" + id; - let client = self.create_reqwest_client(); + let request = self.inner.client.get(&url); + let request = self.apply_standard_headers(request); - client - .get(&url) + request .send() .map(|mut res| res.json::<Clip>()) .and_then(|json| json) diff --git a/src/kraken/mod.rs b/src/kraken/mod.rs index 2015781..59d00c0 100644 --- a/src/kraken/mod.rs +++ b/src/kraken/mod.rs @@ -1,33 +1,53 @@ use reqwest::header; -use reqwest::r#async::{Chunk, Decoder, Request, Response}; +use std::sync::Arc; +use reqwest::r#async::RequestBuilder; use reqwest::r#async::Client as ReqwestClient; +pub use super::types; mod endpoints; mod models; + const ACCEPT: &str = "application/vnd.twitchtv.v5+json"; pub const API_DOMAIN: &str = "api.twitch.tv"; +#[derive(Clone)] pub struct Client { + inner: Arc<ClientRef>, +} + +struct ClientRef { id: String, + client: ReqwestClient } impl Client { pub fn new(id: &str) -> Client { Client { - id: id.to_owned(), + inner: Arc::new(ClientRef { + id: id.to_owned(), + client: ReqwestClient::new(), + }) + } + } + + pub fn new_with_client(id: &str, client: ReqwestClient) -> Client { + Client { + inner: Arc::new(ClientRef { + id: id.to_owned(), + client: client, + }) } } - fn create_reqwest_client(&self) -> ReqwestClient { - let mut headers = header::HeaderMap::new(); - let auth_key = &self.id; - let client_header = header::HeaderValue::from_str(auth_key).unwrap(); + fn apply_standard_headers(&self, request: RequestBuilder) + -> RequestBuilder + { + let client_header = header::HeaderValue::from_str(&self.inner.id).unwrap(); let accept_header = header::HeaderValue::from_str(ACCEPT).unwrap(); - headers.insert("Client-ID", client_header); - headers.insert("Accept", accept_header); - let client = ReqwestClient::builder().default_headers(headers).build().unwrap(); - client + request + .header("Accept", accept_header) + .header("Client-ID", client_header) } } diff --git a/src/kraken/models.rs b/src/kraken/models.rs index 13c524c..4863641 100644 --- a/src/kraken/models.rs +++ b/src/kraken/models.rs @@ -1,12 +1,19 @@ extern crate serde_json; extern crate chrono; +extern crate url; + +use url::Url; +use chrono::{DateTime, Utc}; +use super::types::{UserId, VideoId}; #[derive(Debug, Deserialize)] pub struct Clip { pub slug: String, pub tracking_id: String, - pub url: String, - pub embed_url: String, + #[serde(with = "url_serde")] + pub url: Url, + #[serde(with = "url_serde")] + pub embed_url: Url, pub embed_html: String, pub broadcaster: UserData, pub curator: UserData, @@ -16,29 +23,34 @@ pub struct Clip { pub title: String, pub views: i32, pub duration: f32, - pub created_at: String, + pub created_at: DateTime<Utc>, pub thumbnails: Thumbnails, } #[derive(Debug, Deserialize)] pub struct Thumbnails { - pub medium: String, - pub small: String, - pub tiny: String, + #[serde(with = "url_serde")] + pub medium: Url, + #[serde(with = "url_serde")] + pub small: Url, + #[serde(with = "url_serde")] + pub tiny: Url, } #[derive(Debug, Deserialize)] pub struct UserData { - pub id: String, + pub id: UserId, pub name: String, pub display_name: String, - pub channel_url: String, + #[serde(with = "url_serde")] + pub channel_url: Url, pub logo: String, } #[derive(Debug, Deserialize)] pub struct Vod { - pub id: String, - pub url: String, + pub id: VideoId, + #[serde(with = "url_serde")] + pub url: Url, } @@ -6,11 +6,13 @@ extern crate chrono; #[macro_use] extern crate serde_derive; -mod helix; -mod kraken; +pub mod helix; +pub mod kraken; +pub mod types; -pub use self::helix::endpoints::Client as HelixClient; +pub use self::helix::Client as HelixClient; pub use self::kraken::Client as KrakenClient; +use reqwest::r#async::Client as ReqwestClient; pub struct Client { pub helix: HelixClient, @@ -19,9 +21,13 @@ pub struct Client { impl Client { pub fn new(client_id: &str) -> Client { + let client = ReqwestClient::new(); Client { - helix: HelixClient::new(client_id), - kraken: KrakenClient::new(client_id), + helix: HelixClient::new_with_client(client_id, client.clone()), + kraken: KrakenClient::new_with_client(client_id, client.clone()), } } + + pub fn nop(self) { + } } diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 0000000..ef5c486 --- /dev/null +++ b/src/types.rs @@ -0,0 +1,85 @@ +use std::convert::{AsRef, Into}; +use std::sync::Arc; + +/* Used for Id's that can be interpreted as integers but aren't returned as + * an int by Twitch's API. (Maybe to allow a quick switch to a different representation + * without breaking the json schema?) + */ + +#[derive(Clone)] +pub struct IntegerId { + inner: Arc<IntegerIdRef> +} + +struct IntegerIdRef { + id: String, + int: u32, +} + +impl AsRef<u32> for IntegerId { + fn as_ref(&self) -> &u32 { + &self.inner.int + } +} + +impl AsRef<str> for IntegerId { + + fn as_ref(&self) -> &str { + &self.inner.id + } +} + +use std::fmt; +use std::fmt::{Display, Debug, Formatter}; + +impl Display for IntegerId { + + fn fmt(&self, f: &mut Formatter) -> fmt::Result + { + write!(f, "{}", &self.inner.id) + } +} + +impl Debug for IntegerId { + fn fmt(&self, f: &mut Formatter) -> fmt::Result + { + write!(f, "{:?}", &self.inner.id) + } +} + +use serde::{Deserialize, Deserializer}; +impl<'de> Deserialize<'de> for IntegerId { + + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where D: Deserializer<'de> + { + let id = String::deserialize(deserializer)?; + let int = (&id).parse::<u32>().map_err(serde::de::Error::custom)?; + + Ok(IntegerId { + inner: Arc::new(IntegerIdRef { + id: id, + int: int, + }) + }) + } +} + + +pub struct Id { + inner: String +} + +impl AsRef<str> for Id { + + fn as_ref(&self) -> &str { + &self.inner + } +} + +pub type UserId = IntegerId; +pub type ChannelId = UserId; +pub type VideoId = IntegerId; + + +pub type ClipId = Id; |