1 use futures::channel::oneshot;
2 use futures::executor::ThreadPool;
3 use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
4 use futures::task::SpawnExt;
5 use std::sync::mpsc;
6 use std::thread;
7
run<F: Future + Send + 'static>(future: F)8 fn run<F: Future + Send + 'static>(future: F) {
9 let tp = ThreadPool::new().unwrap();
10 tp.spawn(future.map(drop)).unwrap();
11 }
12
13 #[test]
join1()14 fn join1() {
15 let (tx, rx) = mpsc::channel();
16 run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap()));
17 assert_eq!(rx.recv(), Ok((1, 2)));
18 assert!(rx.recv().is_err());
19 }
20
21 #[test]
join2()22 fn join2() {
23 let (c1, p1) = oneshot::channel::<i32>();
24 let (c2, p2) = oneshot::channel::<i32>();
25 let (tx, rx) = mpsc::channel();
26 run(future::try_join(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
27 assert!(rx.try_recv().is_err());
28 c1.send(1).unwrap();
29 assert!(rx.try_recv().is_err());
30 c2.send(2).unwrap();
31 assert_eq!(rx.recv(), Ok((1, 2)));
32 assert!(rx.recv().is_err());
33 }
34
35 #[test]
join3()36 fn join3() {
37 let (c1, p1) = oneshot::channel::<i32>();
38 let (c2, p2) = oneshot::channel::<i32>();
39 let (tx, rx) = mpsc::channel();
40 run(future::try_join(p1, p2).map_err(move |_v| tx.send(1).unwrap()));
41 assert!(rx.try_recv().is_err());
42 drop(c1);
43 assert_eq!(rx.recv(), Ok(1));
44 assert!(rx.recv().is_err());
45 drop(c2);
46 }
47
48 #[test]
join4()49 fn join4() {
50 let (c1, p1) = oneshot::channel::<i32>();
51 let (c2, p2) = oneshot::channel::<i32>();
52 let (tx, rx) = mpsc::channel();
53 run(future::try_join(p1, p2).map_err(move |v| tx.send(v).unwrap()));
54 assert!(rx.try_recv().is_err());
55 drop(c1);
56 assert!(rx.recv().is_ok());
57 drop(c2);
58 assert!(rx.recv().is_err());
59 }
60
61 #[test]
join5()62 fn join5() {
63 let (c1, p1) = oneshot::channel::<i32>();
64 let (c2, p2) = oneshot::channel::<i32>();
65 let (c3, p3) = oneshot::channel::<i32>();
66 let (tx, rx) = mpsc::channel();
67 run(future::try_join(future::try_join(p1, p2), p3).map_ok(move |v| tx.send(v).unwrap()));
68 assert!(rx.try_recv().is_err());
69 c1.send(1).unwrap();
70 assert!(rx.try_recv().is_err());
71 c2.send(2).unwrap();
72 assert!(rx.try_recv().is_err());
73 c3.send(3).unwrap();
74 assert_eq!(rx.recv(), Ok(((1, 2), 3)));
75 assert!(rx.recv().is_err());
76 }
77
78 #[test]
select1()79 fn select1() {
80 let (c1, p1) = oneshot::channel::<i32>();
81 let (c2, p2) = oneshot::channel::<i32>();
82 let (tx, rx) = mpsc::channel();
83 run(future::try_select(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
84 assert!(rx.try_recv().is_err());
85 c1.send(1).unwrap();
86 let (v, p2) = rx.recv().unwrap().into_inner();
87 assert_eq!(v, 1);
88 assert!(rx.recv().is_err());
89
90 let (tx, rx) = mpsc::channel();
91 run(p2.map_ok(move |v| tx.send(v).unwrap()));
92 c2.send(2).unwrap();
93 assert_eq!(rx.recv(), Ok(2));
94 assert!(rx.recv().is_err());
95 }
96
97 #[test]
select2()98 fn select2() {
99 let (c1, p1) = oneshot::channel::<i32>();
100 let (c2, p2) = oneshot::channel::<i32>();
101 let (tx, rx) = mpsc::channel();
102 run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
103 assert!(rx.try_recv().is_err());
104 drop(c1);
105 let (v, p2) = rx.recv().unwrap();
106 assert_eq!(v, 1);
107 assert!(rx.recv().is_err());
108
109 let (tx, rx) = mpsc::channel();
110 run(p2.map_ok(move |v| tx.send(v).unwrap()));
111 c2.send(2).unwrap();
112 assert_eq!(rx.recv(), Ok(2));
113 assert!(rx.recv().is_err());
114 }
115
116 #[test]
select3()117 fn select3() {
118 let (c1, p1) = oneshot::channel::<i32>();
119 let (c2, p2) = oneshot::channel::<i32>();
120 let (tx, rx) = mpsc::channel();
121 run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
122 assert!(rx.try_recv().is_err());
123 drop(c1);
124 let (v, p2) = rx.recv().unwrap();
125 assert_eq!(v, 1);
126 assert!(rx.recv().is_err());
127
128 let (tx, rx) = mpsc::channel();
129 run(p2.map_err(move |_v| tx.send(2).unwrap()));
130 drop(c2);
131 assert_eq!(rx.recv(), Ok(2));
132 assert!(rx.recv().is_err());
133 }
134
135 #[test]
select4()136 fn select4() {
137 let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>();
138
139 let t = thread::spawn(move || {
140 for c in rx {
141 c.send(1).unwrap();
142 }
143 });
144
145 let (tx2, rx2) = mpsc::channel();
146 for _ in 0..10000 {
147 let (c1, p1) = oneshot::channel::<i32>();
148 let (c2, p2) = oneshot::channel::<i32>();
149
150 let tx3 = tx2.clone();
151 run(future::try_select(p1, p2).map_ok(move |_| tx3.send(()).unwrap()));
152 tx.send(c1).unwrap();
153 rx2.recv().unwrap();
154 drop(c2);
155 }
156 drop(tx);
157
158 t.join().unwrap();
159 }
160