diff options
author | David Blajda <blajda@hotmail.com> | 2018-12-18 05:06:05 +0000 |
---|---|---|
committer | David Blajda <blajda@hotmail.com> | 2018-12-18 05:06:05 +0000 |
commit | d34229bc3e495d2927415f408b18aec51a4574e2 (patch) | |
tree | 42f892b93d1459a55b52ccb009ddafd9166e0a8b | |
parent | 21ebcdb53db06557fe73e195742c038ed91ef331 (diff) |
Implement auth client and auth barrier
-rw-r--r-- | src/bin/main.rs | 8 | ||||
-rw-r--r-- | src/error.rs | 32 | ||||
-rw-r--r-- | src/helix/mod.rs | 285 | ||||
-rw-r--r-- | src/helix/models.rs | 10 | ||||
-rw-r--r-- | src/helix/namespaces/auth.rs | 43 | ||||
-rw-r--r-- | src/helix/namespaces/clips.rs | 4 | ||||
-rw-r--r-- | src/helix/namespaces/mod.rs | 1 | ||||
-rw-r--r-- | src/lib.rs | 2 |
8 files changed, 313 insertions, 72 deletions
diff --git a/src/bin/main.rs b/src/bin/main.rs index dbaaa41..9e01a29 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -7,6 +7,7 @@ extern crate twitch_api; use futures::future::Future; use std::env; use twitch_api::Client; +use twitch_api::HelixClient; fn main() { dotenv::dotenv().unwrap(); @@ -14,7 +15,12 @@ fn main() { let client = Client::new(client_id); - let clip = client.helix + let authed_client = + client.helix.clone() + .authenticate(&env::var("TWITCH_SECRET").unwrap()) + .build(); + + let clip = authed_client .clips() .clip(&"EnergeticApatheticTarsierThisIsSparta") .map_err(|err| { diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..07bb578 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,32 @@ +use reqwest::Error as ReqwestError; +use std::convert::From; + +#[derive(Debug)] +enum Kind { + Reqwest(ReqwestError), + ClientError(String), +} + +#[derive(Debug)] +pub struct Error { + inner: Kind +} + + +impl From<reqwest::Error> for Error { + + fn from(err: ReqwestError) -> Error { + Error { + inner: Kind::Reqwest(err) + } + } +} + +impl From<futures::Canceled> for Error { + + fn from(_err: futures::Canceled) -> Error { + Error { + inner: Kind::ClientError("Oneshot channel unexpectedly closed".to_owned()) + } + } +} diff --git a/src/helix/mod.rs b/src/helix/mod.rs index b1edc29..7ceecb6 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -8,24 +8,15 @@ pub mod models; pub mod namespaces; use std::collections::HashSet; - -use self::models::{DataContainer, PaginationContainer, Clip}; use futures::{Sink, Stream}; -type EndPointResult<T> = Box<Future<Item=T, Error=reqwest::Error> + Send>; - -pub trait UsersEndpoint {} -pub trait VideosEndpoint {} - - -pub trait AuthEndpoint {} +use super::error::Error; pub struct Namespace<T> { client: Client, _type: PhantomData<T> } - impl<T> Namespace<T> { pub fn new(client: &Client) -> Self { Namespace { @@ -55,11 +46,18 @@ pub struct Client { use reqwest::r#async::Response; use futures::sync::oneshot; +#[derive(Clone, PartialEq)] +enum AuthState { + Unauth, + Auth, +} + struct MutClientRef { token: Option<String>, scopes: Vec<Scope>, previous: Option<Client>, - chan: Option<mpsc::Sender<(Arc<RequestRef>, oneshot::Sender<Response>)>>, + chan: Option<mpsc::Sender<(AuthWaiter, oneshot::Sender<AuthWaiter>)>>, + auth_state: AuthState, } use futures::sync::mpsc; @@ -67,6 +65,7 @@ use futures::sync::mpsc; struct ClientRef { id: String, + secret: Option<String>, client: ReqwestClient, inner: Mutex<MutClientRef>, } @@ -83,12 +82,14 @@ impl Client { inner: Arc::new(ClientRef { id: id.to_owned(), client: client, + secret: None, inner: Mutex::new( MutClientRef { chan: None, token: None, scopes: Vec::new(), - previous: None + previous: None, + auth_state: AuthState::Auth, }) }) } @@ -114,8 +115,27 @@ impl Client { } */ - pub fn authenticate(self) -> AuthClientBuilder { - AuthClientBuilder::new(self) + /* The 'bottom' client must always be a client that is not authorized. + * This which allows for calls to Auth endpoints using the same control flow + * as other requests. + * + * Clients created with 'new' are bottom clients and calls + * to authenticate stack a authed client on top + */ + fn get_bottom_client(&self) -> Client { + let mut_client = self.inner.inner.lock().unwrap(); + match &mut_client.previous { + Some(client) => { + client.get_bottom_client() + }, + None => { + self.clone() + } + } + } + + pub fn authenticate(self, secret: &str) -> AuthClientBuilder { + AuthClientBuilder::new(self, secret) } pub fn deauthenticate(self) -> Client { @@ -129,7 +149,16 @@ impl Client { pub fn apply_standard_headers(&self, request: RequestBuilder) -> RequestBuilder { + let mut_client = self.inner.inner.lock().unwrap(); let client_header = header::HeaderValue::from_str(self.id()).unwrap(); + + let request = + if let Some(token) = &mut_client.token { + let value = "Bearer ".to_owned() + token; + let token_header = header::HeaderValue::from_str(&value).unwrap(); + request.header("Authorization", token_header) + } else { request }; + request.header("Client-ID", client_header) } } @@ -141,7 +170,7 @@ use reqwest::header; pub struct AuthClientBuilder { scopes: HashSet<Scope>, - secret: Option<String>, + secret: String, token: Option<String>, client: Client, /*If the user supplies a token, @@ -150,18 +179,34 @@ pub struct AuthClientBuilder { } impl AuthClientBuilder { - pub fn new(client: Client) -> AuthClientBuilder { + pub fn new(client: Client, secret: &str) -> AuthClientBuilder { AuthClientBuilder { scopes: HashSet::new(), client: client, - secret: None, + secret: secret.to_owned(), token: None, } } - /*TODO: Stack a new client ontop*/ pub fn build(self) -> Client { - self.client + let auth_state = if self.token.is_some() { AuthState::Auth } else { AuthState::Unauth }; + let old_client = self.client; + Client { + inner: Arc::new(ClientRef { + id: old_client.inner.id.clone(), + client: old_client.inner.client.clone(), + secret: Some(self.secret), + inner: Mutex::new ( + MutClientRef { + chan: None, + token: self.token, + scopes: Vec::new(), + previous: Some(old_client), + auth_state: auth_state, + }) + + }) + } } pub fn scope(mut self, scope: Scope) -> AuthClientBuilder { @@ -185,17 +230,18 @@ impl AuthClientBuilder { } use std::collections::BTreeMap; +use reqwest::Method; struct RequestRef { url: String, params: BTreeMap<String, String>, client: Client, - + method: Method, } enum RequestState<T> { Uninitalized, - PollChannel(oneshot::Receiver<Response>), + WaitAuth(oneshot::Receiver<AuthWaiter>), PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>), } @@ -208,7 +254,8 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { pub fn new(url: String, params: BTreeMap<String, String>, - client: Client + client: Client, + method: Method, ) -> ApiRequest<T> { ApiRequest { @@ -216,6 +263,7 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { url: url, params: params, client: client, + method: method, }), state: RequestState::Uninitalized } @@ -228,70 +276,169 @@ use serde::de::DeserializeOwned; use futures::Async; use futures::try_ready; -fn handle_requests(channel: mpsc::Receiver<(Arc<RequestRef>, oneshot::Sender<Response>)>) - -> impl Future<Item=(), Error=()> -{ - channel.for_each(|(request, notify)| { - let _request = request.client.client().get(&request.url); - let _request = request.client.apply_standard_headers(_request); - let _request = _request.query(&request.params); - - let f = _request - .send() - .map(move |response| { - notify.send(response); +/* Consider creating a barrier future which simple takes ownership of the request + * and returns it after syncing is complete + */ + +pub trait Waiter { + + fn is_locked(&self) -> bool; + fn poll(&self) -> Box<Future<Item=(), Error=()> + Send>; +} + +struct AuthWaiter { + waiter: Client, +} + +impl Waiter for AuthWaiter { + + fn is_locked(&self) -> bool { + let mut_client = self.waiter.inner.inner.lock().unwrap(); + mut_client.auth_state == AuthState::Unauth + } + + fn poll(&self) -> Box<Future<Item=(), Error=()> + Send> { + let bottom_client = self.waiter.get_bottom_client(); + let secret = self.waiter.inner.secret.as_ref().unwrap(); + let client = self.waiter.clone(); + + let auth_future = + bottom_client + .auth() + .client_credentials(secret) + .map(move |credentials| { + println!("{:?}", credentials); + let mut mut_client = client.inner.inner.lock().unwrap(); + mut_client.auth_state = AuthState::Auth; + mut_client.token = Some(credentials.access_token.clone()); + () + }) + .map_err(|err| { + println!("{:?}", err); () - }). - map_err(|_| { - panic!("TODO....") }); - tokio::spawn(f); - - Ok(()) + Box::new(auth_future) + } +} + +/* Todo: If the polled futures returns an error than all the waiters should + * get that error + */ + +fn create_barrier<T: Send + Waiter + 'static>() + -> mpsc::Sender<(T, oneshot::Sender<T>)> +{ + enum Message<T> { + Request((T, oneshot::Sender<T>)), + OnCondition, + } + let (sender, receiver): + (mpsc::Sender<(T, oneshot::Sender<T>)>, mpsc::Receiver<(T, oneshot::Sender<T>)>) = mpsc::channel(200); + + let mut polling = false; + + let (on_condition_tx, on_condition_rx) = mpsc::unbounded(); + let mut waiters = Vec::new(); + + let f1 = receiver.map(|request| Message::Request(request)); + let f2 = on_condition_rx.map(|complete| Message::OnCondition); + + let mut inner_sender = sender.clone(); + let inner_condition = on_condition_tx.clone(); + let f = + f1.select(f2).for_each(move |message| { + match message { + Message::Request((waiter, backchan)) => { + if waiter.is_locked() && !polling { + println!("locked"); + + let c1 = inner_condition.clone(); + let c2 = inner_condition.clone(); + let f = waiter + .poll() + .map(move |_| {c1.send(()).wait(); ()}) + .map_err(move |_| {c2.send(()).wait(); ()}); + tokio::spawn(f); + polling = true; + + waiters.push((waiter, backchan)); + } else if waiter.is_locked() || polling { + println!("polling"); + waiters.push((waiter, backchan)); + } else { + println!("Pass along waiter!"); + backchan.send(waiter); + } + }, + Message::OnCondition => { + /*Resubmit all waiters back to the request channel + * At least one waiter will pass the barrier + */ + polling = false; + let mut sender = inner_sender.clone(); + while waiters.len() > 0 { + let waiter = waiters.pop().unwrap(); + /* Spawn this */ + let f = sender.clone().send(waiter); + tokio::spawn(f.map(|_| ()).map_err(|_| ())); + } + } + } + + Ok(()) }) .map(|_| ()) - .map_err(|_| ()) + .map_err(|_| ()); + + tokio::spawn(f); + + sender } + impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { type Item = T; - type Error = reqwest::Error; + type Error = Error; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { loop { match &mut self.state { RequestState::Uninitalized => { - /*TODO use poll_ready*/ let mut mut_client = self.inner.client.inner.inner.lock().unwrap(); let (resp_tx, resp_rx) = oneshot::channel(); - match &mut mut_client.chan { - Some(chan) => { - chan.try_send((self.inner.clone(), resp_tx)); - }, - None => { - let (mut chan_tx, chan_rx) = mpsc::channel(30); - chan_tx.try_send((self.inner.clone(), resp_tx)); - - tokio::spawn(handle_requests(chan_rx)); - mut_client.chan.replace(chan_tx); - } - } - - self.state = RequestState::PollChannel(resp_rx); - }, - RequestState::PollChannel(chan) => { - let status = chan.poll(); - match status { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(mut res)) => { - let f = res.json::<T>(); - self.state = RequestState::PollParse(Box::new(f)); - continue; - }, - _ => panic!("TODO...") - } + let chan = + mut_client.chan + .get_or_insert_with(|| { + create_barrier() + }); + + /*TODO use poll_ready*/ + chan.try_send((AuthWaiter{ waiter: self.inner.client.clone() }, resp_tx)); + + self.state = RequestState::WaitAuth(resp_rx); }, + RequestState::WaitAuth(chan) => { + let waiter = try_ready!(chan.poll()); + let client = &self.inner.client; + let reqwest = client.client(); + + let builder = reqwest.request(self.inner.method.clone(), &self.inner.url); + let builder = client.apply_standard_headers(builder); + let r = builder.query(&self.inner.params); + + let f = r.send() + .map(|mut response| { + println!("{:?}", response); + response.json::<T>() + }) + .and_then(|json| { + json + }); + + self.state = RequestState::PollParse(Box::new(f)); + continue; + } RequestState::PollParse(future) => { let res = try_ready!(future.poll()); return Ok(Async::Ready(res)); diff --git a/src/helix/models.rs b/src/helix/models.rs index e4c58c3..86b7560 100644 --- a/src/helix/models.rs +++ b/src/helix/models.rs @@ -80,3 +80,13 @@ pub struct Clip { pub thumbnail_url: Url, pub view_count: i32, } + + +#[derive(Debug, Deserialize)] +pub struct Credentials { + pub access_token: String, + pub refresh_token: Option<String>, + pub expires_in: u32, + pub scope: Option<Vec<String>>, + pub token_type: String, +} diff --git a/src/helix/namespaces/auth.rs b/src/helix/namespaces/auth.rs new file mode 100644 index 0000000..8ad7956 --- /dev/null +++ b/src/helix/namespaces/auth.rs @@ -0,0 +1,43 @@ +use futures::future::Future; +use std::collections::BTreeMap; +use super::super::models::Credentials; +use super::super::Client; +const ID_DOMAIN: &'static str = "id.twitch.tv"; +use super::super::Namespace; + +pub struct Auth {} +type AuthNamespace = Namespace<Auth>; + +impl AuthNamespace { + pub fn client_credentials(self, secret: &str) + -> ApiRequest<Credentials> { + use self::client_credentials; + client_credentials(self.client, secret) + } +} + +impl Client { + pub fn auth(&self) -> AuthNamespace { + AuthNamespace::new(self) + } +} + +use super::super::ApiRequest; +use reqwest::Method; + +//TODO: Implement scopes +pub fn client_credentials(client: Client, secret: &str) + -> ApiRequest<Credentials> { + + let url = + String::from("https://") + + ID_DOMAIN + "/oauth2/token"; + + let mut params = BTreeMap::new(); + params.insert("client_id".to_owned(), client.id().to_owned()); + params.insert("client_secret".to_owned(), secret.to_owned()); + params.insert("grant_type".to_owned(), "client_credentials".to_owned()); + params.insert("scope".to_owned(), "".to_owned()); + + ApiRequest::new(url, params, client, Method::POST) +} diff --git a/src/helix/namespaces/clips.rs b/src/helix/namespaces/clips.rs index 351f006..ac820e8 100644 --- a/src/helix/namespaces/clips.rs +++ b/src/helix/namespaces/clips.rs @@ -23,7 +23,7 @@ impl Client { } use super::super::ApiRequest; - +use reqwest::Method; pub fn clip(client: Client, id: &str) -> ApiRequest<DataContainer<Clip>> @@ -34,5 +34,5 @@ pub fn clip(client: Client, id: &str) let params = BTreeMap::new(); - ApiRequest::new( url, params, client) + ApiRequest::new(url, params, client, Method::GET) } diff --git a/src/helix/namespaces/mod.rs b/src/helix/namespaces/mod.rs index ad73aa3..d1c44bd 100644 --- a/src/helix/namespaces/mod.rs +++ b/src/helix/namespaces/mod.rs @@ -1,3 +1,4 @@ pub mod clips; pub mod users; pub mod videos; +pub mod auth; @@ -1,3 +1,4 @@ +#![recursion_limit="128"] #![feature(option_replace)] extern crate futures; extern crate reqwest; @@ -8,6 +9,7 @@ extern crate chrono; pub mod helix; pub mod kraken; pub mod types; +pub mod error; pub use self::helix::Client as HelixClient; pub use self::kraken::Client as KrakenClient; |