1 use futures::channel::oneshot;
2 use futures::executor::ThreadPool;
3 use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
4 use futures::task::SpawnExt;
5 use std::sync::mpsc;
6 use std::thread;
7 
run<F: Future + Send + 'static>(future: F)8 fn run<F: Future + Send + 'static>(future: F) {
9     let tp = ThreadPool::new().unwrap();
10     tp.spawn(future.map(drop)).unwrap();
11 }
12 
13 #[test]
join1()14 fn join1() {
15     let (tx, rx) = mpsc::channel();
16     run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap()));
17     assert_eq!(rx.recv(), Ok((1, 2)));
18     assert!(rx.recv().is_err());
19 }
20 
21 #[test]
join2()22 fn join2() {
23     let (c1, p1) = oneshot::channel::<i32>();
24     let (c2, p2) = oneshot::channel::<i32>();
25     let (tx, rx) = mpsc::channel();
26     run(future::try_join(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
27     assert!(rx.try_recv().is_err());
28     c1.send(1).unwrap();
29     assert!(rx.try_recv().is_err());
30     c2.send(2).unwrap();
31     assert_eq!(rx.recv(), Ok((1, 2)));
32     assert!(rx.recv().is_err());
33 }
34 
35 #[test]
join3()36 fn join3() {
37     let (c1, p1) = oneshot::channel::<i32>();
38     let (c2, p2) = oneshot::channel::<i32>();
39     let (tx, rx) = mpsc::channel();
40     run(future::try_join(p1, p2).map_err(move |_v| tx.send(1).unwrap()));
41     assert!(rx.try_recv().is_err());
42     drop(c1);
43     assert_eq!(rx.recv(), Ok(1));
44     assert!(rx.recv().is_err());
45     drop(c2);
46 }
47 
48 #[test]
join4()49 fn join4() {
50     let (c1, p1) = oneshot::channel::<i32>();
51     let (c2, p2) = oneshot::channel::<i32>();
52     let (tx, rx) = mpsc::channel();
53     run(future::try_join(p1, p2).map_err(move |v| tx.send(v).unwrap()));
54     assert!(rx.try_recv().is_err());
55     drop(c1);
56     assert!(rx.recv().is_ok());
57     drop(c2);
58     assert!(rx.recv().is_err());
59 }
60 
61 #[test]
join5()62 fn join5() {
63     let (c1, p1) = oneshot::channel::<i32>();
64     let (c2, p2) = oneshot::channel::<i32>();
65     let (c3, p3) = oneshot::channel::<i32>();
66     let (tx, rx) = mpsc::channel();
67     run(future::try_join(future::try_join(p1, p2), p3).map_ok(move |v| tx.send(v).unwrap()));
68     assert!(rx.try_recv().is_err());
69     c1.send(1).unwrap();
70     assert!(rx.try_recv().is_err());
71     c2.send(2).unwrap();
72     assert!(rx.try_recv().is_err());
73     c3.send(3).unwrap();
74     assert_eq!(rx.recv(), Ok(((1, 2), 3)));
75     assert!(rx.recv().is_err());
76 }
77 
78 #[test]
select1()79 fn select1() {
80     let (c1, p1) = oneshot::channel::<i32>();
81     let (c2, p2) = oneshot::channel::<i32>();
82     let (tx, rx) = mpsc::channel();
83     run(future::try_select(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
84     assert!(rx.try_recv().is_err());
85     c1.send(1).unwrap();
86     let (v, p2) = rx.recv().unwrap().into_inner();
87     assert_eq!(v, 1);
88     assert!(rx.recv().is_err());
89 
90     let (tx, rx) = mpsc::channel();
91     run(p2.map_ok(move |v| tx.send(v).unwrap()));
92     c2.send(2).unwrap();
93     assert_eq!(rx.recv(), Ok(2));
94     assert!(rx.recv().is_err());
95 }
96 
97 #[test]
select2()98 fn select2() {
99     let (c1, p1) = oneshot::channel::<i32>();
100     let (c2, p2) = oneshot::channel::<i32>();
101     let (tx, rx) = mpsc::channel();
102     run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
103     assert!(rx.try_recv().is_err());
104     drop(c1);
105     let (v, p2) = rx.recv().unwrap();
106     assert_eq!(v, 1);
107     assert!(rx.recv().is_err());
108 
109     let (tx, rx) = mpsc::channel();
110     run(p2.map_ok(move |v| tx.send(v).unwrap()));
111     c2.send(2).unwrap();
112     assert_eq!(rx.recv(), Ok(2));
113     assert!(rx.recv().is_err());
114 }
115 
116 #[test]
select3()117 fn select3() {
118     let (c1, p1) = oneshot::channel::<i32>();
119     let (c2, p2) = oneshot::channel::<i32>();
120     let (tx, rx) = mpsc::channel();
121     run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
122     assert!(rx.try_recv().is_err());
123     drop(c1);
124     let (v, p2) = rx.recv().unwrap();
125     assert_eq!(v, 1);
126     assert!(rx.recv().is_err());
127 
128     let (tx, rx) = mpsc::channel();
129     run(p2.map_err(move |_v| tx.send(2).unwrap()));
130     drop(c2);
131     assert_eq!(rx.recv(), Ok(2));
132     assert!(rx.recv().is_err());
133 }
134 
135 #[test]
select4()136 fn select4() {
137     let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>();
138 
139     let t = thread::spawn(move || {
140         for c in rx {
141             c.send(1).unwrap();
142         }
143     });
144 
145     let (tx2, rx2) = mpsc::channel();
146     for _ in 0..10000 {
147         let (c1, p1) = oneshot::channel::<i32>();
148         let (c2, p2) = oneshot::channel::<i32>();
149 
150         let tx3 = tx2.clone();
151         run(future::try_select(p1, p2).map_ok(move |_| tx3.send(()).unwrap()));
152         tx.send(c1).unwrap();
153         rx2.recv().unwrap();
154         drop(c2);
155     }
156     drop(tx);
157 
158     t.join().unwrap();
159 }
160