1 use futures::task;
2 use futures::stream;
3 use futures::future;
4 use futures_util::lock::BiLock;
5 use std::thread;
6 
7 mod support;
8 use support::*;
9 
/// Walks a `BiLock` through one full cycle inside a single lazily-built
/// future: lock through one half, mutate the value, check that neither half
/// can lock while the guard is alive, check that both can once it is gone,
/// read the update back through the other half, and finally `reunite` the
/// halves to recover the stored value.
#[test]
fn smoke() {
    let scenario = future::lazy(|_| {
        let (left, right) = BiLock::new(1);

        {
            // Nothing holds the lock yet, so this acquisition must succeed.
            let mut guard = match left.poll_lock() {
                Poll::Pending => panic!("poll not ready"),
                Poll::Ready(held) => held,
            };
            assert_eq!(*guard, 1);
            *guard = 2;

            // While `guard` is alive, both halves are expected to be refused.
            assert!(right.poll_lock().is_pending());
            assert!(left.poll_lock().is_pending());
        }

        // `guard` has left scope; each half should be able to lock again.
        assert!(right.poll_lock().is_ready());
        assert!(left.poll_lock().is_ready());

        {
            // The value written through `left` must be visible via `right`.
            let guard = match right.poll_lock() {
                Poll::Pending => panic!("poll not ready"),
                Poll::Ready(held) => held,
            };
            assert_eq!(*guard, 2);
        }

        let recovered = left.reunite(right).expect("bilock/smoke: reunite error");
        assert_eq!(recovered, 2);

        Ok::<(), ()>(())
    });

    // A single poll is expected to run the whole scenario to completion.
    let outcome = task::spawn(scenario)
        .poll_future_notify(&notify_noop(), 0)
        .expect("failure in poll");
    assert!(outcome.is_ready());
}
48 
/// Increments one shared `BiLock<usize>` counter from two threads, `N` times
/// through each half, then checks that both halves observe `2 * N` and that
/// `reunite` returns the same total.
#[test]
fn concurrent() {
    // Hand-written future that bumps the locked counter `remaining` times and
    // then resolves to its half of the lock.
    struct Increment {
        remaining: usize,
        a: Option<BiLock<usize>>,
    }

    impl Future for Increment {
        type Item = BiLock<usize>;
        type Error = ();

        fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
            while self.remaining > 0 {
                let half = self.a.as_ref().unwrap();
                let mut counter = match half.poll_lock() {
                    // Lock is held elsewhere; report pending and retry on the next poll.
                    Poll::Pending => return Ok(Poll::Pending),
                    Poll::Ready(held) => held,
                };
                self.remaining -= 1;
                *counter += 1;
            }

            // Every increment has been applied: hand the half back.
            Ok(self.a.take().unwrap().into())
        }
    }

    const N: usize = 10000;
    let (first, second) = BiLock::new(0);

    // The first half is driven by the manual `Increment` future.
    let stepper = Increment {
        remaining: N,
        a: Some(first),
    };
    // The second half is driven through `lock()` / `unlock()` in a stream fold.
    let folder = stream::iter_ok(0..N).fold(second, |half, _n| {
        half.lock().map(|mut held| {
            *held += 1;
            held.unlock()
        })
    });

    // `stepper` runs on a spawned thread while `folder` is waited on here.
    let worker = thread::spawn(move || stepper.wait());
    let second = folder.wait().expect("b error");
    let first = worker.join().unwrap().expect("a error");

    // Both sides have finished, so each half must see the full count.
    match first.poll_lock() {
        Poll::Pending => panic!("poll not ready"),
        Poll::Ready(held) => assert_eq!(*held, 2 * N),
    }
    match second.poll_lock() {
        Poll::Pending => panic!("poll not ready"),
        Poll::Ready(held) => assert_eq!(*held, 2 * N),
    }

    let total = first
        .reunite(second)
        .expect("bilock/concurrent: reunite error");
    assert_eq!(total, 2 * N);
}
106