summaryrefslogtreecommitdiff
path: root/src/sync/barrier.rs
blob: c0f21b82e9a87adeaca9f0c73f0af512f3835dc7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
use super::waiter::Waiter;
use futures::prelude::*;
use std::sync::{Arc, Mutex};
use futures::future::{Shared, SharedError};

use crate::error::ConditionError;

#[derive(Clone)]
pub struct Barrier {
    inner: Arc<Mutex<BarrierRef>>,
}

struct BarrierRef {
    condition: Option<Shared<Box<Future<Item=(), Error=ConditionError> + Send>>>
}

impl Barrier {

    pub fn new() -> Barrier {
        Barrier {
            inner: Arc::new(Mutex::new(
            BarrierRef {
                condition: None,
            }))
        }
    }

    pub fn condition(&self, waiter: &impl Waiter) 
        -> Shared<Box<Future<Item=(), Error=ConditionError> + Send>> 
    {
        let mut mut_barrier = self.inner.lock().unwrap();
        let maybe_condition = &mut mut_barrier.condition;

        let f = maybe_condition.get_or_insert_with(|| {
            waiter.condition()
        });

        let f =
            if let Some(_) = f.peek() {
                let condition = waiter.condition();
                maybe_condition.replace(condition);
                maybe_condition.as_ref().unwrap()
            } else { f };
        f.clone()
    }
}