summaryrefslogtreecommitdiff
path: root/src/sync
diff options
context:
space:
mode:
authorDavid Blajda <blajda@hotmail.com>2018-12-20 00:09:49 +0000
committerDavid Blajda <blajda@hotmail.com>2018-12-20 00:09:49 +0000
commit61507ec1fa61fe134547a7eac9541375c6215b33 (patch)
treecf7fb84300454680dd0acd06434fa23a71d6211f /src/sync
parenta8e42248cf617767dfd890c0832ea233bb4608fc (diff)
Replace channel based barrier with shared future based barrier
Diffstat (limited to 'src/sync')
-rw-r--r--src/sync/barrier.rs143
-rw-r--r--src/sync/waiter.rs12
2 files changed, 36 insertions, 119 deletions
diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs
index 7e53b12..c0f21b8 100644
--- a/src/sync/barrier.rs
+++ b/src/sync/barrier.rs
@@ -1,130 +1,47 @@
use super::waiter::Waiter;
-use futures::sync::mpsc;
-use futures::sync::oneshot;
use futures::prelude::*;
+use std::sync::{Arc, Mutex};
+use futures::future::{Shared, SharedError};
-pub trait BarrierSync<W: Waiter> {
- fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send>;
-}
+use crate::error::ConditionError;
-pub struct Barrier<W: Waiter> {
- sink: Option<mpsc::Sender<(W, oneshot::Sender<Result<W::Item, W::Error>>)>>,
+#[derive(Clone)]
+pub struct Barrier {
+ inner: Arc<Mutex<BarrierRef>>,
}
-impl<W: Waiter + 'static + Send> BarrierSync<W> for Barrier<W> {
- fn wait_for(&mut self, waiter: W) -> Box<Future<Item=W::Item, Error=W::Error> + Send> {
- let (resp_tx, resp_rx) = oneshot::channel();
-
- if self.sink.is_none() {
- let (barrier_tx, barrier_rx) = mpsc::channel(40);
- self.barrier_task(barrier_rx);
- self.sink.replace(barrier_tx);
- }
-
- let chan = self.sink.as_mut().unwrap().clone();
-
- /*TODO: I want meaningful error types... */
- let f1 = chan
- .send((waiter, resp_tx))
- .map_err(|err| W::Error::from(()))
- .and_then(|_| {
- resp_rx.then(|result| {
- match result {
- Ok(Ok(result)) => Ok(result),
- Ok(Err(err)) => Err(err),
- Err(err) => Err(W::Error::from(())),
- }
- })
- });
-
- Box::new(f1)
- }
+struct BarrierRef {
+ condition: Option<Shared<Box<Future<Item=(), Error=ConditionError> + Send>>>
}
-impl<W: Waiter + 'static + Send> Barrier<W> {
- pub fn new() -> Barrier<W> {
+impl Barrier {
+
+ pub fn new() -> Barrier {
Barrier {
- sink: None,
+ inner: Arc::new(Mutex::new(
+ BarrierRef {
+ condition: None,
+ }))
}
}
- fn barrier_task(&self, receiver: mpsc::Receiver<(W, oneshot::Sender<Result<W::Item, W::Error>>)>) {
-
- enum Message<W: Waiter> {
- Request((W, oneshot::Sender<Result<<W as Waiter>::Item, <W as Waiter>::Error>>)),
- OnCondition(Result<(), <W as Waiter>::ConditionError>),
- }
+ pub fn condition(&self, waiter: &impl Waiter)
+ -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>>
+ {
+ let mut mut_barrier = self.inner.lock().unwrap();
+ let maybe_condition = &mut mut_barrier.condition;
- let mut polling = false;
- let (on_condition_tx, on_condition_rx) = mpsc::unbounded();
- let mut waiters = Vec::new();
- let f1 = receiver.map(|request| Message::Request(request));
- let f2 = on_condition_rx.map(|result| Message::OnCondition(result));
+ let f = maybe_condition.get_or_insert_with(|| {
+ waiter.condition()
+ });
- let inner_condition = on_condition_tx.clone();
let f =
- f1.select(f2).for_each(move |message| {
- match message {
- Message::Request((waiter, backchan)) => {
- if waiter.blocked() && !polling {
- println!("locked");
-
- let c1 = inner_condition.clone();
- let f = waiter
- .condition_poller()
- .map(|_| ())
- .then(|result| {
- c1.send(result).wait();
- Ok(())
- });
- tokio::spawn(f);
- polling = true;
-
- waiters.push((waiter, backchan));
- } else if waiter.blocked() || polling {
- println!("polling");
- waiters.push((waiter, backchan));
- } else {
- println!("Pass along waiter!");
- let f = waiter.into_future()
- .then(|res| {
- backchan.send(res);
- Ok(())
- });
-
- tokio::spawn(f);
- }
- },
- Message::OnCondition(result) => {
- polling = false;
- /*Resubmit all waiters back to the request channel
- * At least one waiter will pass the barrier
- */
- match result {
- Ok(_) => {
- while waiters.len() > 0 {
- let (waiter, backchan) = waiters.pop().unwrap();
- let f = waiter.into_future()
- .then(|res| {
- backchan.send(res);
- Ok(())
- });
-
- tokio::spawn(f);
- }
- },
- _ => { panic!("condition channel closed") }
- }
- }
- }
-
-
-
- Ok(())
- })
- .map(|_| ())
- .map_err(|_| ());
-
- tokio::spawn(f);
+ if let Some(_) = f.peek() {
+ let condition = waiter.condition();
+ maybe_condition.replace(condition);
+ maybe_condition.as_ref().unwrap()
+ } else { f };
+ f.clone()
}
}
+
diff --git a/src/sync/waiter.rs b/src/sync/waiter.rs
index 656c42e..8039280 100644
--- a/src/sync/waiter.rs
+++ b/src/sync/waiter.rs
@@ -1,13 +1,13 @@
use futures::sync::oneshot;
use futures::Future;
+use futures::future::{Shared, SharedError};
+use crate::error::ConditionError;
pub trait Waiter {
- type Item: Send + 'static;
- type Error: From<Self::ConditionError>
- + From<oneshot::Canceled> + From<()> + Send + 'static;
- type ConditionError: Send + Clone + 'static;
+ type Item: Default;
+ type Error: From<SharedError<ConditionError>>;
fn blocked(&self) -> bool;
- fn condition_poller(&self) -> Box<Future<Item=(), Error=Self::ConditionError> + Send>;
- fn into_future(self) -> Box<Future<Item=Self::Item, Error=Self::Error> + Send>;
+ fn condition(&self)
+ -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>>;
}