summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Blajda <blajda@hotmail.com>2018-12-18 05:06:05 +0000
committerDavid Blajda <blajda@hotmail.com>2018-12-18 05:06:05 +0000
commitd34229bc3e495d2927415f408b18aec51a4574e2 (patch)
tree42f892b93d1459a55b52ccb009ddafd9166e0a8b
parent21ebcdb53db06557fe73e195742c038ed91ef331 (diff)
Implement auth client and auth barrier
-rw-r--r--src/bin/main.rs8
-rw-r--r--src/error.rs32
-rw-r--r--src/helix/mod.rs285
-rw-r--r--src/helix/models.rs10
-rw-r--r--src/helix/namespaces/auth.rs43
-rw-r--r--src/helix/namespaces/clips.rs4
-rw-r--r--src/helix/namespaces/mod.rs1
-rw-r--r--src/lib.rs2
8 files changed, 313 insertions, 72 deletions
diff --git a/src/bin/main.rs b/src/bin/main.rs
index dbaaa41..9e01a29 100644
--- a/src/bin/main.rs
+++ b/src/bin/main.rs
@@ -7,6 +7,7 @@ extern crate twitch_api;
use futures::future::Future;
use std::env;
use twitch_api::Client;
+use twitch_api::HelixClient;
fn main() {
dotenv::dotenv().unwrap();
@@ -14,7 +15,12 @@ fn main() {
let client = Client::new(client_id);
- let clip = client.helix
+ let authed_client =
+ client.helix.clone()
+ .authenticate(&env::var("TWITCH_SECRET").unwrap())
+ .build();
+
+ let clip = authed_client
.clips()
.clip(&"EnergeticApatheticTarsierThisIsSparta")
.map_err(|err| {
diff --git a/src/error.rs b/src/error.rs
new file mode 100644
index 0000000..07bb578
--- /dev/null
+++ b/src/error.rs
@@ -0,0 +1,32 @@
+use reqwest::Error as ReqwestError;
+use std::convert::From;
+
+#[derive(Debug)]
+enum Kind {
+ Reqwest(ReqwestError),
+ ClientError(String),
+}
+
+#[derive(Debug)]
+pub struct Error {
+ inner: Kind
+}
+
+
+impl From<reqwest::Error> for Error {
+
+ fn from(err: ReqwestError) -> Error {
+ Error {
+ inner: Kind::Reqwest(err)
+ }
+ }
+}
+
+impl From<futures::Canceled> for Error {
+
+ fn from(_err: futures::Canceled) -> Error {
+ Error {
+ inner: Kind::ClientError("Oneshot channel unexpectedly closed".to_owned())
+ }
+ }
+}
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index b1edc29..7ceecb6 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -8,24 +8,15 @@ pub mod models;
pub mod namespaces;
use std::collections::HashSet;
-
-use self::models::{DataContainer, PaginationContainer, Clip};
use futures::{Sink, Stream};
-type EndPointResult<T> = Box<Future<Item=T, Error=reqwest::Error> + Send>;
-
-pub trait UsersEndpoint {}
-pub trait VideosEndpoint {}
-
-
-pub trait AuthEndpoint {}
+use super::error::Error;
pub struct Namespace<T> {
client: Client,
_type: PhantomData<T>
}
-
impl<T> Namespace<T> {
pub fn new(client: &Client) -> Self {
Namespace {
@@ -55,11 +46,18 @@ pub struct Client {
use reqwest::r#async::Response;
use futures::sync::oneshot;
+#[derive(Clone, PartialEq)]
+enum AuthState {
+ Unauth,
+ Auth,
+}
+
struct MutClientRef {
token: Option<String>,
scopes: Vec<Scope>,
previous: Option<Client>,
- chan: Option<mpsc::Sender<(Arc<RequestRef>, oneshot::Sender<Response>)>>,
+ chan: Option<mpsc::Sender<(AuthWaiter, oneshot::Sender<AuthWaiter>)>>,
+ auth_state: AuthState,
}
use futures::sync::mpsc;
@@ -67,6 +65,7 @@ use futures::sync::mpsc;
struct ClientRef {
id: String,
+ secret: Option<String>,
client: ReqwestClient,
inner: Mutex<MutClientRef>,
}
@@ -83,12 +82,14 @@ impl Client {
inner: Arc::new(ClientRef {
id: id.to_owned(),
client: client,
+ secret: None,
inner: Mutex::new(
MutClientRef {
chan: None,
token: None,
scopes: Vec::new(),
- previous: None
+ previous: None,
+ auth_state: AuthState::Auth,
})
})
}
@@ -114,8 +115,27 @@ impl Client {
}
*/
- pub fn authenticate(self) -> AuthClientBuilder {
- AuthClientBuilder::new(self)
+ /* The 'bottom' client must always be a client that is not authorized.
+ * This which allows for calls to Auth endpoints using the same control flow
+ * as other requests.
+ *
+ * Clients created with 'new' are bottom clients and calls
+ * to authenticate stack a authed client on top
+ */
+ fn get_bottom_client(&self) -> Client {
+ let mut_client = self.inner.inner.lock().unwrap();
+ match &mut_client.previous {
+ Some(client) => {
+ client.get_bottom_client()
+ },
+ None => {
+ self.clone()
+ }
+ }
+ }
+
+ pub fn authenticate(self, secret: &str) -> AuthClientBuilder {
+ AuthClientBuilder::new(self, secret)
}
pub fn deauthenticate(self) -> Client {
@@ -129,7 +149,16 @@ impl Client {
pub fn apply_standard_headers(&self, request: RequestBuilder)
-> RequestBuilder
{
+ let mut_client = self.inner.inner.lock().unwrap();
let client_header = header::HeaderValue::from_str(self.id()).unwrap();
+
+ let request =
+ if let Some(token) = &mut_client.token {
+ let value = "Bearer ".to_owned() + token;
+ let token_header = header::HeaderValue::from_str(&value).unwrap();
+ request.header("Authorization", token_header)
+ } else { request };
+
request.header("Client-ID", client_header)
}
}
@@ -141,7 +170,7 @@ use reqwest::header;
pub struct AuthClientBuilder {
scopes: HashSet<Scope>,
- secret: Option<String>,
+ secret: String,
token: Option<String>,
client: Client,
/*If the user supplies a token,
@@ -150,18 +179,34 @@ pub struct AuthClientBuilder {
}
impl AuthClientBuilder {
- pub fn new(client: Client) -> AuthClientBuilder {
+ pub fn new(client: Client, secret: &str) -> AuthClientBuilder {
AuthClientBuilder {
scopes: HashSet::new(),
client: client,
- secret: None,
+ secret: secret.to_owned(),
token: None,
}
}
- /*TODO: Stack a new client ontop*/
pub fn build(self) -> Client {
- self.client
+ let auth_state = if self.token.is_some() { AuthState::Auth } else { AuthState::Unauth };
+ let old_client = self.client;
+ Client {
+ inner: Arc::new(ClientRef {
+ id: old_client.inner.id.clone(),
+ client: old_client.inner.client.clone(),
+ secret: Some(self.secret),
+ inner: Mutex::new (
+ MutClientRef {
+ chan: None,
+ token: self.token,
+ scopes: Vec::new(),
+ previous: Some(old_client),
+ auth_state: auth_state,
+ })
+
+ })
+ }
}
pub fn scope(mut self, scope: Scope) -> AuthClientBuilder {
@@ -185,17 +230,18 @@ impl AuthClientBuilder {
}
use std::collections::BTreeMap;
+use reqwest::Method;
struct RequestRef {
url: String,
params: BTreeMap<String, String>,
client: Client,
-
+ method: Method,
}
enum RequestState<T> {
Uninitalized,
- PollChannel(oneshot::Receiver<Response>),
+ WaitAuth(oneshot::Receiver<AuthWaiter>),
PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>),
}
@@ -208,7 +254,8 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
pub fn new(url: String,
params: BTreeMap<String, String>,
- client: Client
+ client: Client,
+ method: Method,
) -> ApiRequest<T>
{
ApiRequest {
@@ -216,6 +263,7 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
url: url,
params: params,
client: client,
+ method: method,
}),
state: RequestState::Uninitalized
}
@@ -228,70 +276,169 @@ use serde::de::DeserializeOwned;
use futures::Async;
use futures::try_ready;
-fn handle_requests(channel: mpsc::Receiver<(Arc<RequestRef>, oneshot::Sender<Response>)>)
- -> impl Future<Item=(), Error=()>
-{
- channel.for_each(|(request, notify)| {
- let _request = request.client.client().get(&request.url);
- let _request = request.client.apply_standard_headers(_request);
- let _request = _request.query(&request.params);
-
- let f = _request
- .send()
- .map(move |response| {
- notify.send(response);
+/* Consider creating a barrier future which simple takes ownership of the request
+ * and returns it after syncing is complete
+ */
+
+pub trait Waiter {
+
+ fn is_locked(&self) -> bool;
+ fn poll(&self) -> Box<Future<Item=(), Error=()> + Send>;
+}
+
+struct AuthWaiter {
+ waiter: Client,
+}
+
+impl Waiter for AuthWaiter {
+
+ fn is_locked(&self) -> bool {
+ let mut_client = self.waiter.inner.inner.lock().unwrap();
+ mut_client.auth_state == AuthState::Unauth
+ }
+
+ fn poll(&self) -> Box<Future<Item=(), Error=()> + Send> {
+ let bottom_client = self.waiter.get_bottom_client();
+ let secret = self.waiter.inner.secret.as_ref().unwrap();
+ let client = self.waiter.clone();
+
+ let auth_future =
+ bottom_client
+ .auth()
+ .client_credentials(secret)
+ .map(move |credentials| {
+ println!("{:?}", credentials);
+ let mut mut_client = client.inner.inner.lock().unwrap();
+ mut_client.auth_state = AuthState::Auth;
+ mut_client.token = Some(credentials.access_token.clone());
+ ()
+ })
+ .map_err(|err| {
+ println!("{:?}", err);
()
- }).
- map_err(|_| {
- panic!("TODO....")
});
- tokio::spawn(f);
-
- Ok(())
+ Box::new(auth_future)
+ }
+}
+
+/* Todo: If the polled futures returns an error than all the waiters should
+ * get that error
+ */
+
+fn create_barrier<T: Send + Waiter + 'static>()
+ -> mpsc::Sender<(T, oneshot::Sender<T>)>
+{
+ enum Message<T> {
+ Request((T, oneshot::Sender<T>)),
+ OnCondition,
+ }
+ let (sender, receiver):
+ (mpsc::Sender<(T, oneshot::Sender<T>)>, mpsc::Receiver<(T, oneshot::Sender<T>)>) = mpsc::channel(200);
+
+ let mut polling = false;
+
+ let (on_condition_tx, on_condition_rx) = mpsc::unbounded();
+ let mut waiters = Vec::new();
+
+ let f1 = receiver.map(|request| Message::Request(request));
+ let f2 = on_condition_rx.map(|complete| Message::OnCondition);
+
+ let mut inner_sender = sender.clone();
+ let inner_condition = on_condition_tx.clone();
+ let f =
+ f1.select(f2).for_each(move |message| {
+ match message {
+ Message::Request((waiter, backchan)) => {
+ if waiter.is_locked() && !polling {
+ println!("locked");
+
+ let c1 = inner_condition.clone();
+ let c2 = inner_condition.clone();
+ let f = waiter
+ .poll()
+ .map(move |_| {c1.send(()).wait(); ()})
+ .map_err(move |_| {c2.send(()).wait(); ()});
+ tokio::spawn(f);
+ polling = true;
+
+ waiters.push((waiter, backchan));
+ } else if waiter.is_locked() || polling {
+ println!("polling");
+ waiters.push((waiter, backchan));
+ } else {
+ println!("Pass along waiter!");
+ backchan.send(waiter);
+ }
+ },
+ Message::OnCondition => {
+ /*Resubmit all waiters back to the request channel
+ * At least one waiter will pass the barrier
+ */
+ polling = false;
+ let mut sender = inner_sender.clone();
+ while waiters.len() > 0 {
+ let waiter = waiters.pop().unwrap();
+ /* Spawn this */
+ let f = sender.clone().send(waiter);
+ tokio::spawn(f.map(|_| ()).map_err(|_| ()));
+ }
+ }
+ }
+
+ Ok(())
})
.map(|_| ())
- .map_err(|_| ())
+ .map_err(|_| ());
+
+ tokio::spawn(f);
+
+ sender
}
+
impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
type Item = T;
- type Error = reqwest::Error;
+ type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match &mut self.state {
RequestState::Uninitalized => {
- /*TODO use poll_ready*/
let mut mut_client = self.inner.client.inner.inner.lock().unwrap();
let (resp_tx, resp_rx) = oneshot::channel();
- match &mut mut_client.chan {
- Some(chan) => {
- chan.try_send((self.inner.clone(), resp_tx));
- },
- None => {
- let (mut chan_tx, chan_rx) = mpsc::channel(30);
- chan_tx.try_send((self.inner.clone(), resp_tx));
-
- tokio::spawn(handle_requests(chan_rx));
- mut_client.chan.replace(chan_tx);
- }
- }
-
- self.state = RequestState::PollChannel(resp_rx);
- },
- RequestState::PollChannel(chan) => {
- let status = chan.poll();
- match status {
- Ok(Async::NotReady) => return Ok(Async::NotReady),
- Ok(Async::Ready(mut res)) => {
- let f = res.json::<T>();
- self.state = RequestState::PollParse(Box::new(f));
- continue;
- },
- _ => panic!("TODO...")
- }
+ let chan =
+ mut_client.chan
+ .get_or_insert_with(|| {
+ create_barrier()
+ });
+
+ /*TODO use poll_ready*/
+ chan.try_send((AuthWaiter{ waiter: self.inner.client.clone() }, resp_tx));
+
+ self.state = RequestState::WaitAuth(resp_rx);
},
+ RequestState::WaitAuth(chan) => {
+ let waiter = try_ready!(chan.poll());
+ let client = &self.inner.client;
+ let reqwest = client.client();
+
+ let builder = reqwest.request(self.inner.method.clone(), &self.inner.url);
+ let builder = client.apply_standard_headers(builder);
+ let r = builder.query(&self.inner.params);
+
+ let f = r.send()
+ .map(|mut response| {
+ println!("{:?}", response);
+ response.json::<T>()
+ })
+ .and_then(|json| {
+ json
+ });
+
+ self.state = RequestState::PollParse(Box::new(f));
+ continue;
+ }
RequestState::PollParse(future) => {
let res = try_ready!(future.poll());
return Ok(Async::Ready(res));
diff --git a/src/helix/models.rs b/src/helix/models.rs
index e4c58c3..86b7560 100644
--- a/src/helix/models.rs
+++ b/src/helix/models.rs
@@ -80,3 +80,13 @@ pub struct Clip {
pub thumbnail_url: Url,
pub view_count: i32,
}
+
+
+#[derive(Debug, Deserialize)]
+pub struct Credentials {
+ pub access_token: String,
+ pub refresh_token: Option<String>,
+ pub expires_in: u32,
+ pub scope: Option<Vec<String>>,
+ pub token_type: String,
+}
diff --git a/src/helix/namespaces/auth.rs b/src/helix/namespaces/auth.rs
new file mode 100644
index 0000000..8ad7956
--- /dev/null
+++ b/src/helix/namespaces/auth.rs
@@ -0,0 +1,43 @@
+use futures::future::Future;
+use std::collections::BTreeMap;
+use super::super::models::Credentials;
+use super::super::Client;
+const ID_DOMAIN: &'static str = "id.twitch.tv";
+use super::super::Namespace;
+
+pub struct Auth {}
+type AuthNamespace = Namespace<Auth>;
+
+impl AuthNamespace {
+ pub fn client_credentials(self, secret: &str)
+ -> ApiRequest<Credentials> {
+ use self::client_credentials;
+ client_credentials(self.client, secret)
+ }
+}
+
+impl Client {
+ pub fn auth(&self) -> AuthNamespace {
+ AuthNamespace::new(self)
+ }
+}
+
+use super::super::ApiRequest;
+use reqwest::Method;
+
+//TODO: Implement scopes
+pub fn client_credentials(client: Client, secret: &str)
+ -> ApiRequest<Credentials> {
+
+ let url =
+ String::from("https://") +
+ ID_DOMAIN + "/oauth2/token";
+
+ let mut params = BTreeMap::new();
+ params.insert("client_id".to_owned(), client.id().to_owned());
+ params.insert("client_secret".to_owned(), secret.to_owned());
+ params.insert("grant_type".to_owned(), "client_credentials".to_owned());
+ params.insert("scope".to_owned(), "".to_owned());
+
+ ApiRequest::new(url, params, client, Method::POST)
+}
diff --git a/src/helix/namespaces/clips.rs b/src/helix/namespaces/clips.rs
index 351f006..ac820e8 100644
--- a/src/helix/namespaces/clips.rs
+++ b/src/helix/namespaces/clips.rs
@@ -23,7 +23,7 @@ impl Client {
}
use super::super::ApiRequest;
-
+use reqwest::Method;
pub fn clip(client: Client, id: &str)
-> ApiRequest<DataContainer<Clip>>
@@ -34,5 +34,5 @@ pub fn clip(client: Client, id: &str)
let params = BTreeMap::new();
- ApiRequest::new( url, params, client)
+ ApiRequest::new(url, params, client, Method::GET)
}
diff --git a/src/helix/namespaces/mod.rs b/src/helix/namespaces/mod.rs
index ad73aa3..d1c44bd 100644
--- a/src/helix/namespaces/mod.rs
+++ b/src/helix/namespaces/mod.rs
@@ -1,3 +1,4 @@
pub mod clips;
pub mod users;
pub mod videos;
+pub mod auth;
diff --git a/src/lib.rs b/src/lib.rs
index 5144895..27dffea 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,4 @@
+#![recursion_limit="128"]
#![feature(option_replace)]
extern crate futures;
extern crate reqwest;
@@ -8,6 +9,7 @@ extern crate chrono;
pub mod helix;
pub mod kraken;
pub mod types;
+pub mod error;
pub use self::helix::Client as HelixClient;
pub use self::kraken::Client as KrakenClient;