From a4c842eae14bef20d3d04ee4313251344edf431f Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sat, 15 Dec 2018 06:21:52 +0000 Subject: Deserialize Urls and Dates. Also implement custom Id types --- src/helix/endpoints.rs | 38 +++----- src/helix/mod.rs | 243 +++++++++++++++++++++++++++++++++++++++++++++++++ src/helix/models.rs | 46 ++++++---- 3 files changed, 283 insertions(+), 44 deletions(-) (limited to 'src/helix') diff --git a/src/helix/endpoints.rs b/src/helix/endpoints.rs index 5c3aa05..bb61ae9 100644 --- a/src/helix/endpoints.rs +++ b/src/helix/endpoints.rs @@ -1,40 +1,29 @@ use futures::future::Future; use reqwest::header; -use reqwest::r#async::{Chunk, Decoder, Request, Response}; +use reqwest::r#async::{RequestBuilder}; use reqwest::r#async::Client as ReqwestClient; use super::models::{DataContainer, PaginationContainer, User, Video, Clip}; - +use super::Client; const API_DOMAIN: &'static str = "api.twitch.tv"; /* When Client owns a ReqwestClient, any futures spawned do not immediately * terminate but 'hang'. When creating a new client for each request this problem * does not occur. This would need to be resolved so we can benefit from keep alive * connections. - * */ -pub struct Client { - id: String, -} - impl Client { - pub fn new(client_id: &str) -> Client { - Client { - id: client_id.to_owned(), - } - } - fn create_client(&self) -> ReqwestClient { - let mut headers = header::HeaderMap::new(); - let auth_key = &self.id; - let header_value = header::HeaderValue::from_str(auth_key).unwrap(); - headers.insert("Client-ID", header_value); + fn apply_standard_headers(&self, request: RequestBuilder) + -> RequestBuilder + { + let client_header = header::HeaderValue::from_str(&self.inner.id).unwrap(); - let client = ReqwestClient::builder().default_headers(headers).build().unwrap(); - client + request.header("Client-ID", client_header) } +/* pub fn users( &self, id: Vec<&str>, @@ -99,6 +88,7 @@ impl Client { return f; } +*/ pub fn clip(&self, id: &str) -> impl Future, Error=reqwest::Error> @@ -107,15 +97,15 @@ impl Client { String::from("https://") + API_DOMAIN + "/helix/clips" + "?id=" + id; - let f = self.create_client() - .get(&url) + let request = self.inner.client.get(&url); + let request = self.apply_standard_headers(request); + + request .send() .map(|mut res| { println!("{:?}", res); res.json::>() }) - .and_then(|json| json); - - return f; + .and_then(|json| json) } } diff --git a/src/helix/mod.rs b/src/helix/mod.rs index ce4a7a2..d8c9952 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -1,2 +1,245 @@ +use std::sync::Arc; +use reqwest::r#async::Client as ReqwestClient; +pub use super::types; + pub mod endpoints; pub mod models; + +#[derive(Clone)] +pub struct Client { + inner: Arc +} + +impl Client { + pub fn new(id: &str) -> Client { + Client { + inner: Arc::new(ClientRef { + id: id.to_owned(), + client: ReqwestClient::new(), + }) + } + } + + pub fn new_with_client(id: &str, client: ReqwestClient) -> Client { + Client { + inner: Arc::new(ClientRef { + id: id.to_owned(), + client: client, + }) + } + + } +} + +struct ClientRef { + id: String, + client: ReqwestClient, +} + +/* + +pub struct Limits { + global: LimiterRef +} + +#[derive(Clone)] +pub struct LimiterRef { + inner: Arc> +} + +trait RateLimiter { + fn remaining(&self) -> usize; + fn limit(&self) -> usize; +} + +impl RateLimiter for LimiterRef { + + fn remaining(&self) -> usize { + let limits = self.inner.lock().unwrap(); + limits.remaining + } + + fn limit(&self) -> usize { + let limits = self.inner.lock().unwrap(); + limits.limit + } +} + +struct RequestJob { + pub request: Request, + pub on_complete: futures::sync::oneshot::Sender, +} +*/ + +/* API requests should be placed in a priority queue to prevent stravation. + * This implies that all requests are sent a single location and then returned + * to their callers upon completion. + * When a request is 'owned' by the queue it can be retryed when the rate limit + * is hit and allows inspect of response headers to determine remaining resources. + */ + +/* + +enum Task { + Add(RequestJob), + Drain, +} + +pub struct Limiter { + pub remaining: u32, + pub limit: u32, + in_transit: u32, + pub last_request: Option>, + + pub remaining_key: String, + pub limit_key: String, + + pub queue: Vec, + pub client: ReqwestClient, + pub chan: mpsc::UnboundedSender, +} +use futures::sync::oneshot; + +fn handle_request(limits_ref: LimiterRef, request: RequestJob) { + let limits = limits_ref.inner.lock().unwrap(); + limits.queue.push(request); + limits.chan.unbounded_send(Task::Drain); +} + +fn handle_drain(limits_ref: LimiterRef) { + + let jobs = { + let limits = limits_ref.inner.lock().unwrap(); + let take = + std::cmp::max(limits.remaining - limits.in_transit, 0); + let jobs = Vec::new(); + for i in 0..std::cmp::min(limits.queue.len() as u32, take) { + jobs.push(limits.queue.pop().unwrap()); + } + limits.in_transit += jobs.len() as u32; + jobs + }; + + let client = { + let limits = limits_ref.inner.lock().unwrap(); + &limits.client + }; + + if jobs.len() > 0 { + for job in jobs { + let clone = job.request.clone(); + let f = + client.execute(job.request) + .and_then(move |response| { + let mut limits = limit_ref.inner.lock().unwrap(); + limits.in_transit = + std::cmp::max(0, limits.in_transit - 1); + if response.status().is_success() { + let remaining = response.headers() + .get(limits.remaining_key) + .and_then(|value| value.to_str().ok()) + .and_then(|remaining| remaining.parse::().ok()); + + let limit = response.headers() + .get(limits.limit_key) + .and_then(|value| value.to_str().ok()) + .and_then(|remaining| remaining.parse::().ok()); + + if let Some(remaining) = remaining { + limits.remaining = remaining; + } + + if let Some(limit) = remaining { + limits.limit = limit; + } + + job.on_complete.send(Ok(response)); + } else if response.status().is_client_error() { + limit.chan_tx.send(Handle( + RequestJob { + request: clone, + on_complete: job.on_complete.clone(), + })) + println!("Hit rate limit! or invalid client") + } + +} + +impl LimiterRef { + + + fn handle_drain(&self) { + + } + + fn handle_requests() { + + chan_rx.for_each(move |task| { + match task { + Handle(request) => { + handle_request( tfdsf, request); + }, + Drain => { + } + } else { + /*sleep...*/ + } + } + } + Ok(()) + }) + .map(|_| ()) + .map_err(|_| ()) + } + + + fn new(limit: u32, remaining_key: &str, limit_key: &str, client: ReqwestClient) + -> LimiterRef + { + let (chan_tx, chan_rx) = mpsc::unbounded(); + + let limiter = Limiter { + remaining: limit, + limit: limit, + in_transit: 0, + last_request: None, + remaining_key: remaining_key.to_owned(), + limit_key: limit_key.to_owned(), + queue: Vec::new(), + client: client, + chan: chan_tx, + }; + + let _ref = LimiterRef { + inner: Arc::new(Mutex::new(limiter)) + }; + + + + return _ref; + } + + fn queue(&self, request: Request) + -> impl Future, Error=oneshot::Canceled> { + let mut limits = self.inner.lock().unwrap(); + let limit_ref = self.clone(); + let (tx, rx) = futures::sync::oneshot::channel(); + + let job = RequestJob { + request: request, + on_complete: tx, + }; + + limits.queue.push(job); + rx + } + + /* Insert the request into a queue */ + /* + Ok(response) + }) + } + */ + +} +*/ diff --git a/src/helix/models.rs b/src/helix/models.rs index 2b01e7c..e4c58c3 100644 --- a/src/helix/models.rs +++ b/src/helix/models.rs @@ -1,8 +1,9 @@ extern crate serde_json; extern crate chrono; -use chrono::{Duration, DateTime, Utc}; - +use url::Url; +use chrono::{DateTime, Utc}; +use super::types::{UserId, VideoId, ChannelId}; #[derive(Debug, Deserialize)] pub struct DataContainer { @@ -22,17 +23,17 @@ pub struct PaginationContainer { #[derive(Debug, Deserialize)] pub struct Video { - pub id: String, - pub user_id: String, + pub id: VideoId, + pub user_id: UserId, pub user_name: String, pub title: String, pub description: String, - //Should be converted to a DateTime - pub created_at: String, - pub published_at: String, - //Should be converted to a URL - pub url: String, - pub thumbnail_url: String, + pub created_at: DateTime, + pub published_at: DateTime, + #[serde(with = "url_serde")] + pub url: Url, + #[serde(with = "url_serde")] + pub thumbnail_url: Url, pub viewable: String, pub view_count: i32, pub language: String, @@ -44,15 +45,17 @@ pub struct Video { #[derive(Debug, Deserialize)] pub struct User { - pub id: String, + pub id: UserId, pub login: String, pub display_name: String, #[serde(rename = "type")] pub user_type: String, pub broadcaster_type: String, pub description: String, - pub profile_image_url: String, - pub offline_image_url: String, + #[serde(with = "url_serde")] + pub profile_image_url: Url, + #[serde(with = "url_serde")] + pub offline_image_url: Url, pub view_count: u32, pub email: Option, } @@ -60,17 +63,20 @@ pub struct User { #[derive(Debug, Deserialize)] pub struct Clip { pub id: String, - pub url: String, - pub embed_url: String, - pub broadcaster_id: String, + #[serde(with = "url_serde")] + pub url: Url, + #[serde(with = "url_serde")] + pub embed_url: Url, + pub broadcaster_id: ChannelId, pub broadcaster_name: String, - pub creator_id: String, + pub creator_id: UserId, pub creator_name: String, - pub video_id: String, + pub video_id: VideoId, pub game_id: String, pub language: String, pub title: String, - pub created_at: String, - pub thumbnail_url: String, + pub created_at: DateTime, + #[serde(with = "url_serde")] + pub thumbnail_url: Url, pub view_count: i32, } -- cgit v1.2.3