From 21ebcdb53db06557fe73e195742c038ed91ef331 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 17 Dec 2018 07:02:07 +0000 Subject: :WIP: requests are sent to a central location --- src/helix/mod.rs | 108 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 86 insertions(+), 22 deletions(-) (limited to 'src/helix') diff --git a/src/helix/mod.rs b/src/helix/mod.rs index 0ddd910..b1edc29 100644 --- a/src/helix/mod.rs +++ b/src/helix/mod.rs @@ -10,6 +10,7 @@ pub mod namespaces; use std::collections::HashSet; use self::models::{DataContainer, PaginationContainer, Clip}; +use futures::{Sink, Stream}; type EndPointResult = Box + Send>; @@ -24,6 +25,7 @@ pub struct Namespace { _type: PhantomData } + impl Namespace { pub fn new(client: &Client) -> Self { Namespace { @@ -50,12 +52,19 @@ pub struct Client { inner: Arc, } +use reqwest::r#async::Response; +use futures::sync::oneshot; + struct MutClientRef { token: Option, scopes: Vec, - previous: Option + previous: Option, + chan: Option, oneshot::Sender)>>, } +use futures::sync::mpsc; + + struct ClientRef { id: String, client: ReqwestClient, @@ -69,12 +78,14 @@ impl Client { } pub fn new_with_client(id: &str, client: ReqwestClient) -> Client { + Client { inner: Arc::new(ClientRef { id: id.to_owned(), client: client, inner: Mutex::new( MutClientRef { + chan: None, token: None, scopes: Vec::new(), previous: None @@ -115,7 +126,7 @@ impl Client { } } - fn apply_standard_headers(&self, request: RequestBuilder) + pub fn apply_standard_headers(&self, request: RequestBuilder) -> RequestBuilder { let client_header = header::HeaderValue::from_str(self.id()).unwrap(); @@ -175,16 +186,22 @@ impl AuthClientBuilder { use std::collections::BTreeMap; +struct RequestRef { + url: String, + params: BTreeMap, + client: Client, + +} + enum RequestState { Uninitalized, - Polling(Box + Send>) + PollChannel(oneshot::Receiver), + PollParse(Box + Send>), } pub struct ApiRequest { - url: String, - params: BTreeMap, - client: Client, - state: RequestState, + inner: Arc, + state: RequestState } impl ApiRequest { @@ -195,19 +212,48 @@ impl ApiRequest { ) -> ApiRequest { ApiRequest { - url, - params, - client: client, + inner: Arc::new( RequestRef { + url: url, + params: params, + client: client, + }), state: RequestState::Uninitalized } } } + use futures::Poll; use serde::de::DeserializeOwned; use futures::Async; use futures::try_ready; +fn handle_requests(channel: mpsc::Receiver<(Arc, oneshot::Sender)>) + -> impl Future +{ + channel.for_each(|(request, notify)| { + let _request = request.client.client().get(&request.url); + let _request = request.client.apply_standard_headers(_request); + let _request = _request.query(&request.params); + + let f = _request + .send() + .map(move |response| { + notify.send(response); + () + }). + map_err(|_| { + panic!("TODO....") + }); + + tokio::spawn(f); + + Ok(()) + }) + .map(|_| ()) + .map_err(|_| ()) +} + impl Future for ApiRequest { type Item = T; type Error = reqwest::Error; @@ -216,19 +262,37 @@ impl Future for ApiRequest { loop { match &mut self.state { RequestState::Uninitalized => { - let request = self.client.client().get(&self.url); - let request = self.client.apply_standard_headers(request); - let request = request.query(&self.params); - - let f = request - .send() - .map(|mut res| { - res.json::() - }) - .and_then(|json| json); - self.state = RequestState::Polling(Box::new(f)); + /*TODO use poll_ready*/ + let mut mut_client = self.inner.client.inner.inner.lock().unwrap(); + let (resp_tx, resp_rx) = oneshot::channel(); + match &mut mut_client.chan { + Some(chan) => { + chan.try_send((self.inner.clone(), resp_tx)); + }, + None => { + let (mut chan_tx, chan_rx) = mpsc::channel(30); + chan_tx.try_send((self.inner.clone(), resp_tx)); + + tokio::spawn(handle_requests(chan_rx)); + mut_client.chan.replace(chan_tx); + } + } + + self.state = RequestState::PollChannel(resp_rx); + }, + RequestState::PollChannel(chan) => { + let status = chan.poll(); + match status { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(mut res)) => { + let f = res.json::(); + self.state = RequestState::PollParse(Box::new(f)); + continue; + }, + _ => panic!("TODO...") + } }, - RequestState::Polling(future) => { + RequestState::PollParse(future) => { let res = try_ready!(future.poll()); return Ok(Async::Ready(res)); } -- cgit v1.2.3