summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Blajda <blajda@hotmail.com>2018-12-15 06:21:52 +0000
committerDavid Blajda <blajda@hotmail.com>2018-12-15 06:21:52 +0000
commita4c842eae14bef20d3d04ee4313251344edf431f (patch)
tree1685f11fd6bbe9f85cb51d479770b04692250c0c
parent8615cc2f030240ba2982dba893fe63f11a0c8a88 (diff)
Deserialize Urls and Dates. Also implement custom Id types
-rw-r--r--Cargo.toml6
-rw-r--r--src/bin/main.rs49
-rw-r--r--src/deserializers.rs0
-rw-r--r--src/helix/endpoints.rs38
-rw-r--r--src/helix/mod.rs243
-rw-r--r--src/helix/models.rs46
-rw-r--r--src/kraken/endpoints.rs6
-rw-r--r--src/kraken/mod.rs40
-rw-r--r--src/kraken/models.rs32
-rw-r--r--src/lib.rs16
-rw-r--r--src/types.rs85
11 files changed, 447 insertions, 114 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 473e8e3..58e4fc6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "twitch_api"
-version = "0.0.25"
+version = "0.0.30"
authors = ["David Blajda <blajda@hotmail.com>"]
edition = "2018"
@@ -12,4 +12,6 @@ dotenv = "0.13.0"
serde = "1.0.81"
serde_json = "1.0.33"
serde_derive = "1.0.81"
-chrono = "0.4.6"
+chrono = { version = "0.4.6", features = ["serde"]}
+url = "1.7.2"
+url_serde = "0.2.0"
diff --git a/src/bin/main.rs b/src/bin/main.rs
index 1aa55ed..7fdde47 100644
--- a/src/bin/main.rs
+++ b/src/bin/main.rs
@@ -6,7 +6,6 @@ extern crate twitch_api;
use futures::future::Future;
use std::env;
-use twitch_api::HelixClient;
use twitch_api::Client;
fn main() {
@@ -14,41 +13,8 @@ fn main() {
let client_id = &env::var("TWITCH_API").unwrap();
let client = Client::new(client_id);
- /*
- let users = twitch_api
- .users(vec![], vec!["shroud", "ninja"])
- .and_then(|json| {
- println!("{:?}", json);
- println!("len {}", json.data.len());
- Ok(json)
- })
- .map(|_| ())
- .map_err(|err| {
- println!("{:?}", err);
- ()
- });
-
- let videos = twitch_api
- .videos(None, Some("37402112"), None)
- .and_then(|json| {
- println!("{:?}", json);
- Ok(json)
- })
- .map(|_| ())
- .map_err(|err| {
- println!("{:?}", err);
- ()
- });
- */
-
-
let clip = client.helix
.clip(&"EnergeticApatheticTarsierThisIsSparta")
- .and_then(|json| {
- println!("{:?}", json);
- Ok(json)
- })
- .map(|_| ())
.map_err(|err| {
println!("{:?}", err);
()
@@ -56,15 +22,18 @@ fn main() {
let clip2 = client.kraken
.clip(&"EnergeticApatheticTarsierThisIsSparta")
- .and_then(|json| {
- print!("{:?}", json);
- Ok(json)
- })
- .map(|_| ())
.map_err(|err| {
println!("{:?}", err);
()
});
- tokio::run(clip.join(clip2).map(|_| ()).map_err(|_| ()));
+ tokio::run(
+ clip.join(clip2)
+ .and_then(|(c1, c2)| {
+ println!("{:?} {:?}", c1, c2);
+ Ok((c1, c2))
+ })
+ .map(|_| { client.nop(); ()})
+ .map_err(|_| ())
+ );
}
diff --git a/src/deserializers.rs b/src/deserializers.rs
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/deserializers.rs
diff --git a/src/helix/endpoints.rs b/src/helix/endpoints.rs
index 5c3aa05..bb61ae9 100644
--- a/src/helix/endpoints.rs
+++ b/src/helix/endpoints.rs
@@ -1,40 +1,29 @@
use futures::future::Future;
use reqwest::header;
-use reqwest::r#async::{Chunk, Decoder, Request, Response};
+use reqwest::r#async::{RequestBuilder};
use reqwest::r#async::Client as ReqwestClient;
use super::models::{DataContainer, PaginationContainer, User, Video, Clip};
-
+use super::Client;
const API_DOMAIN: &'static str = "api.twitch.tv";
/* When Client owns a ReqwestClient, any futures spawned do not immediately
* terminate but 'hang'. When creating a new client for each request this problem
* does not occur. This would need to be resolved so we can benefit from keep alive
* connections.
- *
*/
-pub struct Client {
- id: String,
-}
-
impl Client {
- pub fn new(client_id: &str) -> Client {
- Client {
- id: client_id.to_owned(),
- }
- }
- fn create_client(&self) -> ReqwestClient {
- let mut headers = header::HeaderMap::new();
- let auth_key = &self.id;
- let header_value = header::HeaderValue::from_str(auth_key).unwrap();
- headers.insert("Client-ID", header_value);
+ fn apply_standard_headers(&self, request: RequestBuilder)
+ -> RequestBuilder
+ {
+ let client_header = header::HeaderValue::from_str(&self.inner.id).unwrap();
- let client = ReqwestClient::builder().default_headers(headers).build().unwrap();
- client
+ request.header("Client-ID", client_header)
}
+/*
pub fn users(
&self,
id: Vec<&str>,
@@ -99,6 +88,7 @@ impl Client {
return f;
}
+*/
pub fn clip(&self, id: &str)
-> impl Future<Item=DataContainer<Clip>, Error=reqwest::Error>
@@ -107,15 +97,15 @@ impl Client {
String::from("https://") +
API_DOMAIN + "/helix/clips" + "?id=" + id;
- let f = self.create_client()
- .get(&url)
+ let request = self.inner.client.get(&url);
+ let request = self.apply_standard_headers(request);
+
+ request
.send()
.map(|mut res| {
println!("{:?}", res);
res.json::<DataContainer<Clip>>()
})
- .and_then(|json| json);
-
- return f;
+ .and_then(|json| json)
}
}
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index ce4a7a2..d8c9952 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -1,2 +1,245 @@
+use std::sync::Arc;
+use reqwest::r#async::Client as ReqwestClient;
+pub use super::types;
+
pub mod endpoints;
pub mod models;
+
+#[derive(Clone)]
+pub struct Client {
+ inner: Arc<ClientRef>
+}
+
+impl Client {
+ pub fn new(id: &str) -> Client {
+ Client {
+ inner: Arc::new(ClientRef {
+ id: id.to_owned(),
+ client: ReqwestClient::new(),
+ })
+ }
+ }
+
+ pub fn new_with_client(id: &str, client: ReqwestClient) -> Client {
+ Client {
+ inner: Arc::new(ClientRef {
+ id: id.to_owned(),
+ client: client,
+ })
+ }
+
+ }
+}
+
+struct ClientRef {
+ id: String,
+ client: ReqwestClient,
+}
+
+/*
+
+pub struct Limits {
+ global: LimiterRef
+}
+
+#[derive(Clone)]
+pub struct LimiterRef {
+ inner: Arc<Mutex<Limiter>>
+}
+
+trait RateLimiter {
+ fn remaining(&self) -> usize;
+ fn limit(&self) -> usize;
+}
+
+impl RateLimiter for LimiterRef {
+
+ fn remaining(&self) -> usize {
+ let limits = self.inner.lock().unwrap();
+ limits.remaining
+ }
+
+ fn limit(&self) -> usize {
+ let limits = self.inner.lock().unwrap();
+ limits.limit
+ }
+}
+
+struct RequestJob {
+ pub request: Request,
+ pub on_complete: futures::sync::oneshot::Sender<Response>,
+}
+*/
+
+/* API requests should be placed in a priority queue to prevent stravation.
+ * This implies that all requests are sent a single location and then returned
+ * to their callers upon completion.
+ * When a request is 'owned' by the queue it can be retryed when the rate limit
+ * is hit and allows inspect of response headers to determine remaining resources.
+ */
+
+/*
+
+enum Task {
+ Add(RequestJob),
+ Drain,
+}
+
+pub struct Limiter {
+ pub remaining: u32,
+ pub limit: u32,
+ in_transit: u32,
+ pub last_request: Option<DateTime<Utc>>,
+
+ pub remaining_key: String,
+ pub limit_key: String,
+
+ pub queue: Vec<RequestJob>,
+ pub client: ReqwestClient,
+ pub chan: mpsc::UnboundedSender<Task>,
+}
+use futures::sync::oneshot;
+
+fn handle_request(limits_ref: LimiterRef, request: RequestJob) {
+ let limits = limits_ref.inner.lock().unwrap();
+ limits.queue.push(request);
+ limits.chan.unbounded_send(Task::Drain);
+}
+
+fn handle_drain(limits_ref: LimiterRef) {
+
+ let jobs = {
+ let limits = limits_ref.inner.lock().unwrap();
+ let take =
+ std::cmp::max(limits.remaining - limits.in_transit, 0);
+ let jobs = Vec::new();
+ for i in 0..std::cmp::min(limits.queue.len() as u32, take) {
+ jobs.push(limits.queue.pop().unwrap());
+ }
+ limits.in_transit += jobs.len() as u32;
+ jobs
+ };
+
+ let client = {
+ let limits = limits_ref.inner.lock().unwrap();
+ &limits.client
+ };
+
+ if jobs.len() > 0 {
+ for job in jobs {
+ let clone = job.request.clone();
+ let f =
+ client.execute(job.request)
+ .and_then(move |response| {
+ let mut limits = limit_ref.inner.lock().unwrap();
+ limits.in_transit =
+ std::cmp::max(0, limits.in_transit - 1);
+ if response.status().is_success() {
+ let remaining = response.headers()
+ .get(limits.remaining_key)
+ .and_then(|value| value.to_str().ok())
+ .and_then(|remaining| remaining.parse::<usize>().ok());
+
+ let limit = response.headers()
+ .get(limits.limit_key)
+ .and_then(|value| value.to_str().ok())
+ .and_then(|remaining| remaining.parse::<usize>().ok());
+
+ if let Some(remaining) = remaining {
+ limits.remaining = remaining;
+ }
+
+ if let Some(limit) = remaining {
+ limits.limit = limit;
+ }
+
+ job.on_complete.send(Ok(response));
+ } else if response.status().is_client_error() {
+ limit.chan_tx.send(Handle(
+ RequestJob {
+ request: clone,
+ on_complete: job.on_complete.clone(),
+ }))
+ println!("Hit rate limit! or invalid client")
+ }
+
+}
+
+impl LimiterRef {
+
+
+ fn handle_drain(&self) {
+
+ }
+
+ fn handle_requests() {
+
+ chan_rx.for_each(move |task| {
+ match task {
+ Handle(request) => {
+ handle_request( tfdsf, request);
+ },
+ Drain => {
+ }
+ } else {
+ /*sleep...*/
+ }
+ }
+ }
+ Ok(())
+ })
+ .map(|_| ())
+ .map_err(|_| ())
+ }
+
+
+ fn new(limit: u32, remaining_key: &str, limit_key: &str, client: ReqwestClient)
+ -> LimiterRef
+ {
+ let (chan_tx, chan_rx) = mpsc::unbounded();
+
+ let limiter = Limiter {
+ remaining: limit,
+ limit: limit,
+ in_transit: 0,
+ last_request: None,
+ remaining_key: remaining_key.to_owned(),
+ limit_key: limit_key.to_owned(),
+ queue: Vec::new(),
+ client: client,
+ chan: chan_tx,
+ };
+
+ let _ref = LimiterRef {
+ inner: Arc::new(Mutex::new(limiter))
+ };
+
+
+
+ return _ref;
+ }
+
+ fn queue(&self, request: Request)
+ -> impl Future<Item=Result<Response, reqwest::Error>, Error=oneshot::Canceled> {
+ let mut limits = self.inner.lock().unwrap();
+ let limit_ref = self.clone();
+ let (tx, rx) = futures::sync::oneshot::channel();
+
+ let job = RequestJob {
+ request: request,
+ on_complete: tx,
+ };
+
+ limits.queue.push(job);
+ rx
+ }
+
+ /* Insert the request into a queue */
+ /*
+ Ok(response)
+ })
+ }
+ */
+
+}
+*/
diff --git a/src/helix/models.rs b/src/helix/models.rs
index 2b01e7c..e4c58c3 100644
--- a/src/helix/models.rs
+++ b/src/helix/models.rs
@@ -1,8 +1,9 @@
extern crate serde_json;
extern crate chrono;
-use chrono::{Duration, DateTime, Utc};
-
+use url::Url;
+use chrono::{DateTime, Utc};
+use super::types::{UserId, VideoId, ChannelId};
#[derive(Debug, Deserialize)]
pub struct DataContainer<T> {
@@ -22,17 +23,17 @@ pub struct PaginationContainer<T> {
#[derive(Debug, Deserialize)]
pub struct Video {
- pub id: String,
- pub user_id: String,
+ pub id: VideoId,
+ pub user_id: UserId,
pub user_name: String,
pub title: String,
pub description: String,
- //Should be converted to a DateTime
- pub created_at: String,
- pub published_at: String,
- //Should be converted to a URL
- pub url: String,
- pub thumbnail_url: String,
+ pub created_at: DateTime<Utc>,
+ pub published_at: DateTime<Utc>,
+ #[serde(with = "url_serde")]
+ pub url: Url,
+ #[serde(with = "url_serde")]
+ pub thumbnail_url: Url,
pub viewable: String,
pub view_count: i32,
pub language: String,
@@ -44,15 +45,17 @@ pub struct Video {
#[derive(Debug, Deserialize)]
pub struct User {
- pub id: String,
+ pub id: UserId,
pub login: String,
pub display_name: String,
#[serde(rename = "type")]
pub user_type: String,
pub broadcaster_type: String,
pub description: String,
- pub profile_image_url: String,
- pub offline_image_url: String,
+ #[serde(with = "url_serde")]
+ pub profile_image_url: Url,
+ #[serde(with = "url_serde")]
+ pub offline_image_url: Url,
pub view_count: u32,
pub email: Option<String>,
}
@@ -60,17 +63,20 @@ pub struct User {
#[derive(Debug, Deserialize)]
pub struct Clip {
pub id: String,
- pub url: String,
- pub embed_url: String,
- pub broadcaster_id: String,
+ #[serde(with = "url_serde")]
+ pub url: Url,
+ #[serde(with = "url_serde")]
+ pub embed_url: Url,
+ pub broadcaster_id: ChannelId,
pub broadcaster_name: String,
- pub creator_id: String,
+ pub creator_id: UserId,
pub creator_name: String,
- pub video_id: String,
+ pub video_id: VideoId,
pub game_id: String,
pub language: String,
pub title: String,
- pub created_at: String,
- pub thumbnail_url: String,
+ pub created_at: DateTime<Utc>,
+ #[serde(with = "url_serde")]
+ pub thumbnail_url: Url,
pub view_count: i32,
}
diff --git a/src/kraken/endpoints.rs b/src/kraken/endpoints.rs
index 2dbc8d1..7ae273c 100644
--- a/src/kraken/endpoints.rs
+++ b/src/kraken/endpoints.rs
@@ -9,10 +9,10 @@ impl Client {
-> impl Future<Item=Clip, Error=reqwest::Error>
{
let url = String::from("https://") + API_DOMAIN + "/kraken/clips/" + id;
- let client = self.create_reqwest_client();
+ let request = self.inner.client.get(&url);
+ let request = self.apply_standard_headers(request);
- client
- .get(&url)
+ request
.send()
.map(|mut res| res.json::<Clip>())
.and_then(|json| json)
diff --git a/src/kraken/mod.rs b/src/kraken/mod.rs
index 2015781..59d00c0 100644
--- a/src/kraken/mod.rs
+++ b/src/kraken/mod.rs
@@ -1,33 +1,53 @@
use reqwest::header;
-use reqwest::r#async::{Chunk, Decoder, Request, Response};
+use std::sync::Arc;
+use reqwest::r#async::RequestBuilder;
use reqwest::r#async::Client as ReqwestClient;
+pub use super::types;
mod endpoints;
mod models;
+
const ACCEPT: &str = "application/vnd.twitchtv.v5+json";
pub const API_DOMAIN: &str = "api.twitch.tv";
+#[derive(Clone)]
pub struct Client {
+ inner: Arc<ClientRef>,
+}
+
+struct ClientRef {
id: String,
+ client: ReqwestClient
}
impl Client {
pub fn new(id: &str) -> Client {
Client {
- id: id.to_owned(),
+ inner: Arc::new(ClientRef {
+ id: id.to_owned(),
+ client: ReqwestClient::new(),
+ })
+ }
+ }
+
+ pub fn new_with_client(id: &str, client: ReqwestClient) -> Client {
+ Client {
+ inner: Arc::new(ClientRef {
+ id: id.to_owned(),
+ client: client,
+ })
}
}
- fn create_reqwest_client(&self) -> ReqwestClient {
- let mut headers = header::HeaderMap::new();
- let auth_key = &self.id;
- let client_header = header::HeaderValue::from_str(auth_key).unwrap();
+ fn apply_standard_headers(&self, request: RequestBuilder)
+ -> RequestBuilder
+ {
+ let client_header = header::HeaderValue::from_str(&self.inner.id).unwrap();
let accept_header = header::HeaderValue::from_str(ACCEPT).unwrap();
- headers.insert("Client-ID", client_header);
- headers.insert("Accept", accept_header);
- let client = ReqwestClient::builder().default_headers(headers).build().unwrap();
- client
+ request
+ .header("Accept", accept_header)
+ .header("Client-ID", client_header)
}
}
diff --git a/src/kraken/models.rs b/src/kraken/models.rs
index 13c524c..4863641 100644
--- a/src/kraken/models.rs
+++ b/src/kraken/models.rs
@@ -1,12 +1,19 @@
extern crate serde_json;
extern crate chrono;
+extern crate url;
+
+use url::Url;
+use chrono::{DateTime, Utc};
+use super::types::{UserId, VideoId};
#[derive(Debug, Deserialize)]
pub struct Clip {
pub slug: String,
pub tracking_id: String,
- pub url: String,
- pub embed_url: String,
+ #[serde(with = "url_serde")]
+ pub url: Url,
+ #[serde(with = "url_serde")]
+ pub embed_url: Url,
pub embed_html: String,
pub broadcaster: UserData,
pub curator: UserData,
@@ -16,29 +23,34 @@ pub struct Clip {
pub title: String,
pub views: i32,
pub duration: f32,
- pub created_at: String,
+ pub created_at: DateTime<Utc>,
pub thumbnails: Thumbnails,
}
#[derive(Debug, Deserialize)]
pub struct Thumbnails {
- pub medium: String,
- pub small: String,
- pub tiny: String,
+ #[serde(with = "url_serde")]
+ pub medium: Url,
+ #[serde(with = "url_serde")]
+ pub small: Url,
+ #[serde(with = "url_serde")]
+ pub tiny: Url,
}
#[derive(Debug, Deserialize)]
pub struct UserData {
- pub id: String,
+ pub id: UserId,
pub name: String,
pub display_name: String,
- pub channel_url: String,
+ #[serde(with = "url_serde")]
+ pub channel_url: Url,
pub logo: String,
}
#[derive(Debug, Deserialize)]
pub struct Vod {
- pub id: String,
- pub url: String,
+ pub id: VideoId,
+ #[serde(with = "url_serde")]
+ pub url: Url,
}
diff --git a/src/lib.rs b/src/lib.rs
index 06239b9..e0100fa 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,11 +6,13 @@ extern crate chrono;
#[macro_use]
extern crate serde_derive;
-mod helix;
-mod kraken;
+pub mod helix;
+pub mod kraken;
+pub mod types;
-pub use self::helix::endpoints::Client as HelixClient;
+pub use self::helix::Client as HelixClient;
pub use self::kraken::Client as KrakenClient;
+use reqwest::r#async::Client as ReqwestClient;
pub struct Client {
pub helix: HelixClient,
@@ -19,9 +21,13 @@ pub struct Client {
impl Client {
pub fn new(client_id: &str) -> Client {
+ let client = ReqwestClient::new();
Client {
- helix: HelixClient::new(client_id),
- kraken: KrakenClient::new(client_id),
+ helix: HelixClient::new_with_client(client_id, client.clone()),
+ kraken: KrakenClient::new_with_client(client_id, client.clone()),
}
}
+
+ pub fn nop(self) {
+ }
}
diff --git a/src/types.rs b/src/types.rs
new file mode 100644
index 0000000..ef5c486
--- /dev/null
+++ b/src/types.rs
@@ -0,0 +1,85 @@
+use std::convert::{AsRef, Into};
+use std::sync::Arc;
+
+/* Used for Id's that can be interpreted as integers but aren't returned as
+ * an int by Twitch's API. (Maybe to allow a quick switch to a different representation
+ * without breaking the json schema?)
+ */
+
+#[derive(Clone)]
+pub struct IntegerId {
+ inner: Arc<IntegerIdRef>
+}
+
+struct IntegerIdRef {
+ id: String,
+ int: u32,
+}
+
+impl AsRef<u32> for IntegerId {
+ fn as_ref(&self) -> &u32 {
+ &self.inner.int
+ }
+}
+
+impl AsRef<str> for IntegerId {
+
+ fn as_ref(&self) -> &str {
+ &self.inner.id
+ }
+}
+
+use std::fmt;
+use std::fmt::{Display, Debug, Formatter};
+
+impl Display for IntegerId {
+
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result
+ {
+ write!(f, "{}", &self.inner.id)
+ }
+}
+
+impl Debug for IntegerId {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result
+ {
+ write!(f, "{:?}", &self.inner.id)
+ }
+}
+
+use serde::{Deserialize, Deserializer};
+impl<'de> Deserialize<'de> for IntegerId {
+
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where D: Deserializer<'de>
+ {
+ let id = String::deserialize(deserializer)?;
+ let int = (&id).parse::<u32>().map_err(serde::de::Error::custom)?;
+
+ Ok(IntegerId {
+ inner: Arc::new(IntegerIdRef {
+ id: id,
+ int: int,
+ })
+ })
+ }
+}
+
+
+pub struct Id {
+ inner: String
+}
+
+impl AsRef<str> for Id {
+
+ fn as_ref(&self) -> &str {
+ &self.inner
+ }
+}
+
+pub type UserId = IntegerId;
+pub type ChannelId = UserId;
+pub type VideoId = IntegerId;
+
+
+pub type ClipId = Id;