summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Blajda <blajda@hotmail.com>2018-12-17 07:02:07 +0000
committerDavid Blajda <blajda@hotmail.com>2018-12-17 07:02:07 +0000
commit21ebcdb53db06557fe73e195742c038ed91ef331 (patch)
tree0e197f06683e8b14251e9f6579d747ec828f775d
parentd9887360a2b015405abe550858391034e9dda9a1 (diff)
:WIP: requests are sent to a central location
-rw-r--r--src/helix/mod.rs108
1 files changed, 86 insertions, 22 deletions
diff --git a/src/helix/mod.rs b/src/helix/mod.rs
index 0ddd910..b1edc29 100644
--- a/src/helix/mod.rs
+++ b/src/helix/mod.rs
@@ -10,6 +10,7 @@ pub mod namespaces;
use std::collections::HashSet;
use self::models::{DataContainer, PaginationContainer, Clip};
+use futures::{Sink, Stream};
type EndPointResult<T> = Box<Future<Item=T, Error=reqwest::Error> + Send>;
@@ -24,6 +25,7 @@ pub struct Namespace<T> {
_type: PhantomData<T>
}
+
impl<T> Namespace<T> {
pub fn new(client: &Client) -> Self {
Namespace {
@@ -50,12 +52,19 @@ pub struct Client {
inner: Arc<ClientRef>,
}
+use reqwest::r#async::Response;
+use futures::sync::oneshot;
+
struct MutClientRef {
token: Option<String>,
scopes: Vec<Scope>,
- previous: Option<Client>
+ previous: Option<Client>,
+ chan: Option<mpsc::Sender<(Arc<RequestRef>, oneshot::Sender<Response>)>>,
}
+use futures::sync::mpsc;
+
+
struct ClientRef {
id: String,
client: ReqwestClient,
@@ -69,12 +78,14 @@ impl Client {
}
pub fn new_with_client(id: &str, client: ReqwestClient) -> Client {
+
Client {
inner: Arc::new(ClientRef {
id: id.to_owned(),
client: client,
inner: Mutex::new(
MutClientRef {
+ chan: None,
token: None,
scopes: Vec::new(),
previous: None
@@ -115,7 +126,7 @@ impl Client {
}
}
- fn apply_standard_headers(&self, request: RequestBuilder)
+ pub fn apply_standard_headers(&self, request: RequestBuilder)
-> RequestBuilder
{
let client_header = header::HeaderValue::from_str(self.id()).unwrap();
@@ -175,16 +186,22 @@ impl AuthClientBuilder {
use std::collections::BTreeMap;
+struct RequestRef {
+ url: String,
+ params: BTreeMap<String, String>,
+ client: Client,
+
+}
+
enum RequestState<T> {
Uninitalized,
- Polling(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>)
+ PollChannel(oneshot::Receiver<Response>),
+ PollParse(Box<dyn Future<Item=T, Error=reqwest::Error> + Send>),
}
pub struct ApiRequest<T> {
- url: String,
- params: BTreeMap<String, String>,
- client: Client,
- state: RequestState<T>,
+ inner: Arc<RequestRef>,
+ state: RequestState<T>
}
impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
@@ -195,19 +212,48 @@ impl<T: DeserializeOwned + 'static + Send> ApiRequest<T> {
) -> ApiRequest<T>
{
ApiRequest {
- url,
- params,
- client: client,
+ inner: Arc::new( RequestRef {
+ url: url,
+ params: params,
+ client: client,
+ }),
state: RequestState::Uninitalized
}
}
}
+
use futures::Poll;
use serde::de::DeserializeOwned;
use futures::Async;
use futures::try_ready;
+fn handle_requests(channel: mpsc::Receiver<(Arc<RequestRef>, oneshot::Sender<Response>)>)
+ -> impl Future<Item=(), Error=()>
+{
+ channel.for_each(|(request, notify)| {
+ let _request = request.client.client().get(&request.url);
+ let _request = request.client.apply_standard_headers(_request);
+ let _request = _request.query(&request.params);
+
+ let f = _request
+ .send()
+ .map(move |response| {
+ notify.send(response);
+ ()
+ }).
+ map_err(|_| {
+ panic!("TODO....")
+ });
+
+ tokio::spawn(f);
+
+ Ok(())
+ })
+ .map(|_| ())
+ .map_err(|_| ())
+}
+
impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
type Item = T;
type Error = reqwest::Error;
@@ -216,19 +262,37 @@ impl<T: DeserializeOwned + 'static + Send> Future for ApiRequest<T> {
loop {
match &mut self.state {
RequestState::Uninitalized => {
- let request = self.client.client().get(&self.url);
- let request = self.client.apply_standard_headers(request);
- let request = request.query(&self.params);
-
- let f = request
- .send()
- .map(|mut res| {
- res.json::<T>()
- })
- .and_then(|json| json);
- self.state = RequestState::Polling(Box::new(f));
+ /*TODO use poll_ready*/
+ let mut mut_client = self.inner.client.inner.inner.lock().unwrap();
+ let (resp_tx, resp_rx) = oneshot::channel();
+ match &mut mut_client.chan {
+ Some(chan) => {
+ chan.try_send((self.inner.clone(), resp_tx));
+ },
+ None => {
+ let (mut chan_tx, chan_rx) = mpsc::channel(30);
+ chan_tx.try_send((self.inner.clone(), resp_tx));
+
+ tokio::spawn(handle_requests(chan_rx));
+ mut_client.chan.replace(chan_tx);
+ }
+ }
+
+ self.state = RequestState::PollChannel(resp_rx);
+ },
+ RequestState::PollChannel(chan) => {
+ let status = chan.poll();
+ match status {
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Ok(Async::Ready(mut res)) => {
+ let f = res.json::<T>();
+ self.state = RequestState::PollParse(Box::new(f));
+ continue;
+ },
+ _ => panic!("TODO...")
+ }
},
- RequestState::Polling(future) => {
+ RequestState::PollParse(future) => {
let res = try_ready!(future.poll());
return Ok(Async::Ready(res));
}