diff options
author | David Blajda <blajda@hotmail.com> | 2018-12-16 20:24:36 +0000 |
---|---|---|
committer | David Blajda <blajda@hotmail.com> | 2018-12-16 20:24:36 +0000 |
commit | 27267ed98fc839b51ae4621fd1aa230f7f068bdc (patch) | |
tree | 94c6899c8ab0a6fae8e8f4f7295e0a027933e703 | |
parent | be68a7da226743edbce5b52e506d9083e2859578 (diff) |
Prototype of a non-trait based client
-rw-r--r-- | src/bin/main.rs | 2 | ||||
-rw-r--r-- | src/helix/endpoints.rs | 16 | ||||
-rw-r--r-- | src/helix/mod.rs | 368 | ||||
-rw-r--r-- | src/lib.rs | 1 |
4 files changed, 85 insertions, 302 deletions
diff --git a/src/bin/main.rs b/src/bin/main.rs index 45b25df..3467711 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -8,8 +8,6 @@ use futures::future::Future; use std::env; use twitch_api::Client; -use twitch_api::helix::HelixClient; - fn main() { dotenv::dotenv().unwrap(); let client_id = &env::var("TWITCH_API").unwrap(); diff --git a/src/helix/endpoints.rs b/src/helix/endpoints.rs index 80b4ebb..a02bf73 100644 --- a/src/helix/endpoints.rs +++ b/src/helix/endpoints.rs @@ -8,19 +8,7 @@ use super::models::{DataContainer, PaginationContainer, User, Video, Clip}; use super::Client; const API_DOMAIN: &'static str = "api.twitch.tv"; -/* When Client owns a ReqwestClient, any futures spawned do not immediately - * terminate but 'hang'. When creating a new client for each request this problem - * does not occur. This would need to be resolved so we can benefit from keep alive - * connections. - */ - -use super::super::GetRequest; -use super::super::GetRequestRef; -use std::marker::PhantomData; - -use super::HelixClient; - -fn apply_standard_headers(client: &Box<dyn HelixClient>, request: RequestBuilder) +fn apply_standard_headers(client: Client, request: RequestBuilder) -> RequestBuilder { let client_header = header::HeaderValue::from_str(client.id()).unwrap(); @@ -28,7 +16,7 @@ fn apply_standard_headers(client: &Box<dyn HelixClient>, request: RequestBuilder request.header("Client-ID", client_header) } -pub fn clip(client: &Box<dyn HelixClient>, id: &str) +pub fn clip(client: Client, id: &str) -> impl Future<Item=DataContainer<Clip>, Error=reqwest::Error> { let url = diff --git a/src/helix/mod.rs b/src/helix/mod.rs index 50412fb..fe873dd 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -3,98 +3,125 @@ use std::sync::{Arc, Mutex}; use reqwest::r#async::Client as ReqwestClient; pub use super::types; +use std::marker::PhantomData; + pub mod endpoints; pub mod models; use std::collections::HashSet; -#[derive(Clone)] -pub struct Client { - inner: Arc<ClientRef> -} - -struct ClientRef { - id: String, - client: ReqwestClient, -} - use self::models::{DataContainer, PaginationContainer, Clip}; +type EndPointResult<T> = Box<Future<Item=T, Error=reqwest::Error> + Send>; + pub trait UsersEndpoint {} pub trait VideosEndpoint {} + pub trait ClipsEndpointTrait { - fn clip(&self, id: &str) -> Box<Future<Item=DataContainer<Clip>, Error=reqwest::Error> + Send>; + fn clip(&self, id: &str) -> EndPointResult<DataContainer<Clip>>; } pub trait AuthEndpoint {} -pub struct ClipsEndpoint { - client: Box<dyn HelixClient> +pub struct Namespace<T> { + client: Client, + _type: PhantomData<T> } -impl ClipsEndpointTrait for ClipsEndpoint { - fn clip(&self, id: &str) -> Box<Future<Item=DataContainer<Clip>, Error=reqwest::Error> + Send> { - use self::endpoints::clip; - Box::new(clip(&self.client, id)) +impl<T> Namespace<T> { + pub fn new(client: &Client) -> Self { + Namespace { + client: client.clone(), + _type: PhantomData, + } } - } -pub trait HelixClient { - //fn users(&self) -> Box<dyn UsersEndpoint>; - //fn videos(&self) -> Box<dyn VideosEndpoint>; - fn clips(&self) -> Box<dyn ClipsEndpointTrait>; - //fn auth(&self) -> Box<dyn AuthEndpoint>; +pub struct Clips {} - fn id(&self) -> &str; - fn authenticated(&self) -> bool; - fn with_auth(self, secret: &str) -> AuthClientBuilder; - fn unauth(self) -> Box<dyn HelixClient>; +type ClipsNamespace = Namespace<Clips>; - /* I don't want to expose this. Temp work around*/ - fn client(&self) -> &ReqwestClient; +#[derive(Clone)] +pub struct Client { + inner: Arc<ClientRef>, } -impl HelixClient for Client { +struct MutClientRef { + token: Option<String>, + scopes: Vec<Scope>, + previous: Option<Client> +} - fn clips(&self) -> Box<dyn ClipsEndpointTrait> { - Box::new(ClipsEndpoint { - client: Box::new(self.clone()) - }) - } - - fn id(&self) -> &str { - &self.inner.id - } +struct ClientRef { + id: String, + client: ReqwestClient, + inner: Mutex<MutClientRef>, +} - fn authenticated(&self) -> bool { - false +impl Client { + pub fn new(id: &str) -> Client { + let client = ReqwestClient::new(); + Client::new_with_client(id, client) } - fn with_auth(self, secret: &str) -> AuthClientBuilder { - AuthClientBuilder::new(Box::new(self), secret) + pub fn new_with_client(id: &str, client: ReqwestClient) -> Client { + Client { + inner: Arc::new(ClientRef { + id: id.to_owned(), + client: client, + inner: Mutex::new( + MutClientRef { + token: None, + scopes: Vec::new(), + previous: None + }) + }) + } } +} + - fn unauth(self) -> Box<dyn HelixClient> { - Box::new(self) +use reqwest::r#async::{RequestBuilder}; +use reqwest::header; + +impl Client { + + pub fn id(&self) -> &str { + &self.inner.id } - fn client(&self) -> &ReqwestClient { + pub fn client(&self) -> &ReqwestClient { &self.inner.client } + + fn apply_standard_headers(&self, request: RequestBuilder) + -> RequestBuilder + { + let client_header = header::HeaderValue::from_str(self.id()).unwrap(); + request.header("Client-ID", client_header) + } } +impl Client { + + pub fn clips(&self) -> ClipsNamespace { + ClipsNamespace::new(self) + } -#[derive(Clone)] -pub struct AuthClient { +} +impl ClipsNamespace { + pub fn clip(self, id: &str) -> impl Future<Item=DataContainer<Clip>, Error=reqwest::Error> { + use self::endpoints::clip; + clip(self.client, id) + } } pub struct AuthClientBuilder { scopes: HashSet<Scope>, secret: String, - client: Box<HelixClient>, + client: Client, /*If the user supplies a token, * then we can skip fetching it from the server and are authenticated */ @@ -102,7 +129,7 @@ pub struct AuthClientBuilder { } impl AuthClientBuilder { - pub fn new(client: Box<dyn HelixClient>, secret: &str) -> AuthClientBuilder { + pub fn new(client: Client, secret: &str) -> AuthClientBuilder { AuthClientBuilder { scopes: HashSet::new(), secret: secret.to_owned(), @@ -111,12 +138,10 @@ impl AuthClientBuilder { } } - /* - pub fn build(self) -> Box<dyn HelixClient> { - AuthClient { - } + /*TODO: Stack a new client ontop*/ + pub fn build(self) -> Client { + self.client } - */ pub fn scope(mut self, scope: Scope) -> AuthClientBuilder { let scopes = &mut self.scopes; @@ -132,7 +157,8 @@ impl AuthClientBuilder { self } - pub fn token(self, token: &str) -> AuthClientBuilder { + pub fn token(mut self, token: &str) -> AuthClientBuilder { + self.token.replace(token.to_owned()); self } } @@ -149,233 +175,3 @@ pub enum Scope { UserReadBroadcast, UserReadEmail, } - -impl Client { - pub fn new(id: &str) -> Client { - Client { - inner: Arc::new(ClientRef { - id: id.to_owned(), - client: ReqwestClient::new(), - }) - } - } - - pub fn new_with_client(id: &str, client: ReqwestClient) -> Client { - Client { - inner: Arc::new(ClientRef { - id: id.to_owned(), - client: client, - }) - } - - } -} - - -/* - -pub struct Limits { - global: LimiterRef -} - -#[derive(Clone)] -pub struct LimiterRef { - inner: Arc<Mutex<Limiter>> -} - -trait RateLimiter { - fn remaining(&self) -> usize; - fn limit(&self) -> usize; -} - -impl RateLimiter for LimiterRef { - - fn remaining(&self) -> usize { - let limits = self.inner.lock().unwrap(); - limits.remaining - } - - fn limit(&self) -> usize { - let limits = self.inner.lock().unwrap(); - limits.limit - } -} - -struct RequestJob { - pub request: Request, - pub on_complete: futures::sync::oneshot::Sender<Response>, -} -*/ - -/* API requests should be placed in a priority queue to prevent stravation. - * This implies that all requests are sent a single location and then returned - * to their callers upon completion. - * When a request is 'owned' by the queue it can be retryed when the rate limit - * is hit and allows inspect of response headers to determine remaining resources. - */ - -/* - -enum Task { - Add(RequestJob), - Drain, -} - -pub struct Limiter { - pub remaining: u32, - pub limit: u32, - in_transit: u32, - pub last_request: Option<DateTime<Utc>>, - - pub remaining_key: String, - pub limit_key: String, - - pub queue: Vec<RequestJob>, - pub client: ReqwestClient, - pub chan: mpsc::UnboundedSender<Task>, -} -use futures::sync::oneshot; - -fn handle_request(limits_ref: LimiterRef, request: RequestJob) { - let limits = limits_ref.inner.lock().unwrap(); - limits.queue.push(request); - limits.chan.unbounded_send(Task::Drain); -} - -fn handle_drain(limits_ref: LimiterRef) { - - let jobs = { - let limits = limits_ref.inner.lock().unwrap(); - let take = - std::cmp::max(limits.remaining - limits.in_transit, 0); - let jobs = Vec::new(); - for i in 0..std::cmp::min(limits.queue.len() as u32, take) { - jobs.push(limits.queue.pop().unwrap()); - } - limits.in_transit += jobs.len() as u32; - jobs - }; - - let client = { - let limits = limits_ref.inner.lock().unwrap(); - &limits.client - }; - - if jobs.len() > 0 { - for job in jobs { - let clone = job.request.clone(); - let f = - client.execute(job.request) - .and_then(move |response| { - let mut limits = limit_ref.inner.lock().unwrap(); - limits.in_transit = - std::cmp::max(0, limits.in_transit - 1); - if response.status().is_success() { - let remaining = response.headers() - .get(limits.remaining_key) - .and_then(|value| value.to_str().ok()) - .and_then(|remaining| remaining.parse::<usize>().ok()); - - let limit = response.headers() - .get(limits.limit_key) - .and_then(|value| value.to_str().ok()) - .and_then(|remaining| remaining.parse::<usize>().ok()); - - if let Some(remaining) = remaining { - limits.remaining = remaining; - } - - if let Some(limit) = remaining { - limits.limit = limit; - } - - job.on_complete.send(Ok(response)); - } else if response.status().is_client_error() { - limit.chan_tx.send(Handle( - RequestJob { - request: clone, - on_complete: job.on_complete.clone(), - })) - println!("Hit rate limit! or invalid client") - } - -} - -impl LimiterRef { - - - fn handle_drain(&self) { - - } - - fn handle_requests() { - - chan_rx.for_each(move |task| { - match task { - Handle(request) => { - handle_request( tfdsf, request); - }, - Drain => { - } - } else { - /*sleep...*/ - } - } - } - Ok(()) - }) - .map(|_| ()) - .map_err(|_| ()) - } - - - fn new(limit: u32, remaining_key: &str, limit_key: &str, client: ReqwestClient) - -> LimiterRef - { - let (chan_tx, chan_rx) = mpsc::unbounded(); - - let limiter = Limiter { - remaining: limit, - limit: limit, - in_transit: 0, - last_request: None, - remaining_key: remaining_key.to_owned(), - limit_key: limit_key.to_owned(), - queue: Vec::new(), - client: client, - chan: chan_tx, - }; - - let _ref = LimiterRef { - inner: Arc::new(Mutex::new(limiter)) - }; - - - - return _ref; - } - - fn queue(&self, request: Request) - -> impl Future<Item=Result<Response, reqwest::Error>, Error=oneshot::Canceled> { - let mut limits = self.inner.lock().unwrap(); - let limit_ref = self.clone(); - let (tx, rx) = futures::sync::oneshot::channel(); - - let job = RequestJob { - request: request, - on_complete: tx, - }; - - limits.queue.push(job); - rx - } - - /* Insert the request into a queue */ - /* - Ok(response) - }) - } - */ - -} -*/ @@ -1,3 +1,4 @@ +#![feature(option_replace)] extern crate futures; extern crate reqwest; extern crate serde; |