From 61507ec1fa61fe134547a7eac9541375c6215b33 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 20 Dec 2018 00:09:49 +0000 Subject: Replace channel based barrier with shared future based barrier --- src/sync/barrier.rs | 143 +++++++++++----------------------------------------- src/sync/waiter.rs | 12 ++--- 2 files changed, 36 insertions(+), 119 deletions(-) (limited to 'src/sync') diff --git a/src/sync/barrier.rs b/src/sync/barrier.rs index 7e53b12..c0f21b8 100644 --- a/src/sync/barrier.rs +++ b/src/sync/barrier.rs @@ -1,130 +1,47 @@ use super::waiter::Waiter; -use futures::sync::mpsc; -use futures::sync::oneshot; use futures::prelude::*; +use std::sync::{Arc, Mutex}; +use futures::future::{Shared, SharedError}; -pub trait BarrierSync { - fn wait_for(&mut self, waiter: W) -> Box + Send>; -} +use crate::error::ConditionError; -pub struct Barrier { - sink: Option>)>>, +#[derive(Clone)] +pub struct Barrier { + inner: Arc>, } -impl BarrierSync for Barrier { - fn wait_for(&mut self, waiter: W) -> Box + Send> { - let (resp_tx, resp_rx) = oneshot::channel(); - - if self.sink.is_none() { - let (barrier_tx, barrier_rx) = mpsc::channel(40); - self.barrier_task(barrier_rx); - self.sink.replace(barrier_tx); - } - - let chan = self.sink.as_mut().unwrap().clone(); - - /*TODO: I want meaningful error types... */ - let f1 = chan - .send((waiter, resp_tx)) - .map_err(|err| W::Error::from(())) - .and_then(|_| { - resp_rx.then(|result| { - match result { - Ok(Ok(result)) => Ok(result), - Ok(Err(err)) => Err(err), - Err(err) => Err(W::Error::from(())), - } - }) - }); - - Box::new(f1) - } +struct BarrierRef { + condition: Option + Send>>> } -impl Barrier { - pub fn new() -> Barrier { +impl Barrier { + + pub fn new() -> Barrier { Barrier { - sink: None, + inner: Arc::new(Mutex::new( + BarrierRef { + condition: None, + })) } } - fn barrier_task(&self, receiver: mpsc::Receiver<(W, oneshot::Sender>)>) { - - enum Message { - Request((W, oneshot::Sender::Item, ::Error>>)), - OnCondition(Result<(), ::ConditionError>), - } + pub fn condition(&self, waiter: &impl Waiter) + -> Shared + Send>> + { + let mut mut_barrier = self.inner.lock().unwrap(); + let maybe_condition = &mut mut_barrier.condition; - let mut polling = false; - let (on_condition_tx, on_condition_rx) = mpsc::unbounded(); - let mut waiters = Vec::new(); - let f1 = receiver.map(|request| Message::Request(request)); - let f2 = on_condition_rx.map(|result| Message::OnCondition(result)); + let f = maybe_condition.get_or_insert_with(|| { + waiter.condition() + }); - let inner_condition = on_condition_tx.clone(); let f = - f1.select(f2).for_each(move |message| { - match message { - Message::Request((waiter, backchan)) => { - if waiter.blocked() && !polling { - println!("locked"); - - let c1 = inner_condition.clone(); - let f = waiter - .condition_poller() - .map(|_| ()) - .then(|result| { - c1.send(result).wait(); - Ok(()) - }); - tokio::spawn(f); - polling = true; - - waiters.push((waiter, backchan)); - } else if waiter.blocked() || polling { - println!("polling"); - waiters.push((waiter, backchan)); - } else { - println!("Pass along waiter!"); - let f = waiter.into_future() - .then(|res| { - backchan.send(res); - Ok(()) - }); - - tokio::spawn(f); - } - }, - Message::OnCondition(result) => { - polling = false; - /*Resubmit all waiters back to the request channel - * At least one waiter will pass the barrier - */ - match result { - Ok(_) => { - while waiters.len() > 0 { - let (waiter, backchan) = waiters.pop().unwrap(); - let f = waiter.into_future() - .then(|res| { - backchan.send(res); - Ok(()) - }); - - tokio::spawn(f); - } - }, - _ => { panic!("condition channel closed") } - } - } - } - - - - Ok(()) - }) - .map(|_| ()) - .map_err(|_| ()); - - tokio::spawn(f); + if let Some(_) = f.peek() { + let condition = waiter.condition(); + maybe_condition.replace(condition); + maybe_condition.as_ref().unwrap() + } else { f }; + f.clone() } } + diff --git a/src/sync/waiter.rs b/src/sync/waiter.rs index 656c42e..8039280 100644 --- a/src/sync/waiter.rs +++ b/src/sync/waiter.rs @@ -1,13 +1,13 @@ use futures::sync::oneshot; use futures::Future; +use futures::future::{Shared, SharedError}; +use crate::error::ConditionError; pub trait Waiter { - type Item: Send + 'static; - type Error: From - + From + From<()> + Send + 'static; - type ConditionError: Send + Clone + 'static; + type Item: Default; + type Error: From>; fn blocked(&self) -> bool; - fn condition_poller(&self) -> Box + Send>; - fn into_future(self) -> Box + Send>; + fn condition(&self) + -> Shared + Send>>; } -- cgit v1.2.3