diff options
author | David Blajda <blajda@hotmail.com> | 2018-12-17 07:02:07 +0000 |
---|---|---|
committer | David Blajda <blajda@hotmail.com> | 2018-12-17 07:02:07 +0000 |
commit | 21ebcdb53db06557fe73e195742c038ed91ef331 (patch) | |
tree | 0e197f06683e8b14251e9f6579d747ec828f775d | |
parent | d9887360a2b015405abe550858391034e9dda9a1 (diff) |
:WIP: requests are sent to a central location
-rw-r--r-- | src/helix/mod.rs | 108 |
1 files changed, 86 insertions, 22 deletions
diff --git a/src/helix/mod.rs b/src/helix/mod.rs index 0ddd910..b1edc29 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -10,6 +10,7 @@ pub mod namespaces; use std::collections::HashSet; use self::models::{DataContainer, PaginationContainer, Clip}; +use futures::{Sink, Stream}; type EndPointResult<T> = Box<Future<Item=T, Error=reqwest::Error> + Send>; @@ -24,6 +25,7 @@ pub struct Namespace<T> { _type: PhantomData<T> } + impl<T> Namespace<T> { pub fn new(client: &Client) -> Self { Namespace { @@ -50,12 +52,19 @@ pub struct Client { inner: Arc<ClientRef>, } +use reqwest::r#async::Response; +use futures::sync::oneshot; + struct MutClientRef { token: Option<String>, scopes: Vec<Scope>, - previous: Option<Client> + previous: Option<Client>, + chan: Option<mpsc::Sender<(Arc<RequestRef>, oneshot::Sender<Response>)>>, } +use futures::sync::mpsc; + + struct ClientRef { id: String, client: ReqwestClient, @@ -69,12 +78,14 @@ impl Client { } pub fn new_with_client(id: &str, client: ReqwestClient) -> Client { + Client { inner: Arc::new(ClientRef { id: id.to_owned(), client: client, inner: Mutex::new( MutClientRef { + chan: None, token: None, scopes: Vec::new(), previous: None @@ -115,7 +126,7 @@ impl Client { } } - fn apply_standard_headers(&self, request: RequestBuilder) + pub fn apply_standard_headers(&self, request: RequestBuilder) -> RequestBuilder { let client_header = header::HeaderValue::from_str(self.id()).unwrap(); @@ -175,16 +186,22 @@ impl AuthClientBuilder { use std::collections::BTreeMap; +struct RequestRef { + url: String, + params: BTreeMap<String, String>, + client: Client, + +} + enum RequestState<T> { Uninitalized, - Polling(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>) + PollChannel(oneshot::Receiver<Response>), + PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>), } pub struct ApiRequest<T> { - url: String, - params: BTreeMap<String, String>, - client: Client, - state: RequestState<T>, + inner: Arc<RequestRef>, + state: RequestState<T> } impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { @@ -195,19 +212,48 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> { ) -> ApiRequest<T> { ApiRequest { - url, - params, - client: client, + inner: Arc::new( RequestRef { + url: url, + params: params, + client: client, + }), state: RequestState::Uninitalized } } } + use futures::Poll; use serde::de::DeserializeOwned; use futures::Async; use futures::try_ready; +fn handle_requests(channel: mpsc::Receiver<(Arc<RequestRef>, oneshot::Sender<Response>)>) + -> impl Future<Item=(), Error=()> +{ + channel.for_each(|(request, notify)| { + let _request = request.client.client().get(&request.url); + let _request = request.client.apply_standard_headers(_request); + let _request = _request.query(&request.params); + + let f = _request + .send() + .map(move |response| { + notify.send(response); + () + }). + map_err(|_| { + panic!("TODO....") + }); + + tokio::spawn(f); + + Ok(()) + }) + .map(|_| ()) + .map_err(|_| ()) +} + impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { type Item = T; type Error = reqwest::Error; @@ -216,19 +262,37 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> { loop { match &mut self.state { RequestState::Uninitalized => { - let request = self.client.client().get(&self.url); - let request = self.client.apply_standard_headers(request); - let request = request.query(&self.params); - - let f = request - .send() - .map(|mut res| { - res.json::<T>() - }) - .and_then(|json| json); - self.state = RequestState::Polling(Box::new(f)); + /*TODO use poll_ready*/ + let mut mut_client = self.inner.client.inner.inner.lock().unwrap(); + let (resp_tx, resp_rx) = oneshot::channel(); + match &mut mut_client.chan { + Some(chan) => { + chan.try_send((self.inner.clone(), resp_tx)); + }, + None => { + let (mut chan_tx, chan_rx) = mpsc::channel(30); + chan_tx.try_send((self.inner.clone(), resp_tx)); + + tokio::spawn(handle_requests(chan_rx)); + mut_client.chan.replace(chan_tx); + } + } + + self.state = RequestState::PollChannel(resp_rx); + }, + RequestState::PollChannel(chan) => { + let status = chan.poll(); + match status { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(mut res)) => { + let f = res.json::<T>(); + self.state = RequestState::PollParse(Box::new(f)); + continue; + }, + _ => panic!("TODO...") + } }, - RequestState::Polling(future) => { + RequestState::PollParse(future) => { let res = try_ready!(future.poll()); return Ok(Async::Ready(res)); } |