1 use futures::future;
2 use futures::executor::block_on;
3 use futures::channel::oneshot::{self, Canceled};
4 use std::sync::mpsc::{channel, TryRecvError};
5 
6 mod support;
7 use support::*;
8 
unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E>9 fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> {
10     match r {
11         Ok(Either::Left((t, _))) |
12         Ok(Either::Right((t, _))) => Ok(t),
13         Err(Either::Left((e, _))) |
14         Err(Either::Right((e, _))) => Err(e),
15     }
16 }
17 
/// Smoke test for the basic combinators applied to already-resolved futures.
#[test]
fn result_smoke() {
    // Compile-time check only: the argument must be a future whose item and
    // error types match the explicitly supplied `A` and `B`.
    fn check_future_type<A, B, C>(_: C)
    where
        A: Send + 'static,
        B: Send + 'static,
        C: Future<Item = A, Error = B>,
    {
    }

    check_future_type::<i32, u32, _>(f_ok(1).map(|v| v + 1));
    check_future_type::<i32, u32, _>(f_ok(1).map_err(|e| e + 1));
    check_future_type::<i32, u32, _>(f_ok(1).and_then(Ok));
    check_future_type::<i32, u32, _>(f_ok(1).or_else(Err));
    check_future_type::<(i32, i32), u32, _>(f_ok(1).join(Err(3)));
    check_future_type::<i32, u32, _>(f_ok(1).map(f_ok).flatten());

    // Plain constructors.
    assert_done(|| f_ok(1), r_ok(1));
    assert_done(|| f_err(1), r_err(1));
    assert_done(|| result(Ok(1)), r_ok(1));
    assert_done(|| result(Err(1)), r_err(1));
    assert_done(|| ok(1), r_ok(1));
    assert_done(|| err(1), r_err(1));

    // `map` only touches the success value, `map_err` only the error.
    assert_done(|| f_ok(1).map(|v| v + 2), r_ok(3));
    assert_done(|| f_err(1).map(|v| v + 2), r_err(1));
    assert_done(|| f_ok(1).map_err(|e| e + 2), r_ok(1));
    assert_done(|| f_err(1).map_err(|e| e + 2), r_err(3));

    // `and_then` runs its closure on success only.
    assert_done(|| f_ok(1).and_then(|v| Ok(v + 2)), r_ok(3));
    assert_done(|| f_err(1).and_then(|v| Ok(v + 2)), r_err(1));
    assert_done(|| f_ok(1).and_then(|v| Err(v as u32 + 3)), r_err(4));
    assert_done(|| f_err(1).and_then(|v| Err(v as u32 + 4)), r_err(1));

    // `or_else` runs its closure on failure only.
    assert_done(|| f_ok(1).or_else(|e| Ok(e as i32 + 2)), r_ok(1));
    assert_done(|| f_err(1).or_else(|e| Ok(e as i32 + 2)), r_ok(3));
    assert_done(|| f_ok(1).or_else(|e| Err(e + 3)), r_ok(1));
    assert_done(|| f_err(1).or_else(|e| Err(e + 4)), r_err(5));

    // `select`, flattened through `unselect`.
    assert_done(|| f_ok(1).select(f_err(2)).then(unselect), r_ok(1));
    assert_done(|| f_ok(1).select(Ok(2)).then(unselect), r_ok(1));
    assert_done(|| f_err(1).select(f_ok(1)).then(unselect), r_err(1));
    assert_done(|| f_ok(1).select(empty()).then(unselect), Ok(1));
    assert_done(|| empty().select(f_ok(1)).then(unselect), Ok(1));

    // `join` yields both values, or the error if either side fails.
    assert_done(|| f_ok(1).join(f_err(1)), Err(1));
    assert_done(|| f_ok(1).join(Ok(2)), Ok((1, 2)));
    assert_done(|| f_err(1).join(f_ok(1)), Err(1));

    // `then` replaces the outcome regardless of success or failure.
    assert_done(|| f_ok(1).then(|_| Ok(2)), r_ok(2));
    assert_done(|| f_ok(1).then(|_| Err(2)), r_err(2));
    assert_done(|| f_err(1).then(|_| Ok(2)), r_ok(2));
    assert_done(|| f_err(1).then(|_| Err(2)), r_err(2));
}
64 
/// Combinators that involve `future::empty()` are checked with `assert_empty`.
#[test]
fn test_empty() {
    // Pins the item/error types so every call below is fully inferred.
    fn never() -> Empty<i32, u32> {
        future::empty()
    }

    assert_empty(never);
    assert_empty(|| never().select(never()));
    assert_empty(|| never().join(never()));
    assert_empty(|| never().join(f_ok(1)));
    assert_empty(|| f_ok(1).join(never()));
    assert_empty(|| never().or_else(|_| never()));
    assert_empty(|| never().and_then(|_| never()));
    assert_empty(|| f_err(1).or_else(|_| never()));
    assert_empty(|| f_ok(1).and_then(|_| never()));
    assert_empty(|| never().map(|v| v + 1));
    assert_empty(|| never().map_err(|e| e + 1));
    assert_empty(|| never().then(|outcome| outcome));
}
82 
/// `ok` and `err` resolve to the value they were built from.
#[test]
fn test_ok() {
    let expected_ok = r_ok(1);
    assert_done(|| ok(1), expected_ok);

    let expected_err = r_err(1);
    assert_done(|| err(1), expected_err);
}
88 
/// `flatten` on a future of futures: inner success, inner failure, outer
/// failure, and the never-resolving cases.
#[test]
fn flatten() {
    // Ready future with the error type pinned to `u32`.
    fn ok_u32<T>(value: T) -> FutureResult<T, u32>
    where
        T: Send + 'static,
    {
        future::ok(value)
    }

    // Failed future with the item type pinned to `i32`.
    fn err_i32<E>(error: E) -> FutureResult<i32, E>
    where
        E: Send + 'static,
    {
        future::err(error)
    }

    assert_done(|| ok_u32(ok_u32(1)).flatten(), r_ok(1));
    assert_done(|| ok_u32(err_i32(1)).flatten(), r_err(1));
    assert_done(|| err_i32(1u32).map(ok_u32).flatten(), r_err(1));
    assert_done(|| future::ok(future::ok(1)).flatten(), r_ok(1));
    assert_empty(|| ok_u32(empty::<i32, u32>()).flatten());
    assert_empty(|| empty::<i32, u32>().map(ok_u32).flatten());
}
105 
/// Basic behaviour of a `oneshot` channel: delivery, cancellation, and a
/// receiver whose sender is still alive.
#[test]
fn smoke_oneshot() {
    // A value sent up front is what the receiver resolves to.
    assert_done(
        || {
            let (sender, receiver) = oneshot::channel();
            sender.send(1).unwrap();
            receiver
        },
        Ok(1),
    );

    // Dropping the sender makes the receiver resolve to `Canceled`.
    assert_done(
        || {
            let (sender, receiver) = oneshot::channel::<i32>();
            drop(sender);
            receiver
        },
        Err(Canceled),
    );

    // The senders are stored in a vector declared outside the closure, so
    // they are not dropped while `assert_empty` runs.
    let mut live_senders = Vec::new();
    assert_empty(|| {
        let (sender, receiver) = oneshot::channel::<i32>();
        live_senders.push(sender);
        receiver
    });

    // Polling directly after the sender is gone yields an error.
    let (sender, mut receiver) = oneshot::channel::<i32>();
    drop(sender);
    let res = panic_waker_lw(|lw| receiver.poll(lw));
    assert!(res.is_err());

    // The same cancelled receiver still reaches its `then` continuation.
    let (sender, receiver) = oneshot::channel::<i32>();
    drop(sender);
    let (tx, rx) = channel();
    receiver.then(move |_| tx.send(())).forget();
    rx.recv().unwrap();
}
137 
/// Once one side of a `select` completes, the other side's reporting sender
/// must be gone (observed as a failing `recv` on its mpsc channel).
#[test]
fn select_cancels() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (first_seen_tx, first_seen_rx) = channel();
    let (second_seen_tx, second_seen_rx) = channel();
    // Each side reports the value it resolved with over its own mpsc channel.
    let first = first_rx.map(move |value| {
        first_seen_tx.send(value).unwrap();
        value
    });
    let second = second_rx.map(move |value| {
        second_seen_tx.send(value).unwrap();
        value
    });

    let mut selected = first.select(second).then(unselect);
    // Disabled check carried over unchanged:
    // assert!(f.poll(&mut Task::new()).is_pending());
    assert!(first_seen_rx.try_recv().is_err());
    assert!(second_seen_rx.try_recv().is_err());
    first_tx.send(1).unwrap();
    noop_waker_lw(|lw| {
        let res = selected.poll(lw);
        assert!(res.ok().unwrap().is_ready());
        assert_eq!(first_seen_rx.recv().unwrap(), 1);
        drop(second_tx);
        assert!(second_seen_rx.recv().is_err());

        // Second round: poll while pending first, then complete one side.
        let (first_tx, first_rx) = oneshot::channel::<i32>();
        let (second_tx, second_rx) = oneshot::channel::<i32>();
        // The first receiver stays bound so the `send` in the closure cannot fail.
        let (first_seen_tx, _first_seen_rx) = channel();
        let (second_seen_tx, second_seen_rx) = channel();
        let first = first_rx.map(move |value| {
            first_seen_tx.send(value).unwrap();
            value
        });
        let second = second_rx.map(move |value| {
            second_seen_tx.send(value).unwrap();
            value
        });

        let mut selected = first.select(second).then(unselect);
        assert!(selected.poll(lw).ok().unwrap().is_pending());
        assert!(selected.poll(lw).ok().unwrap().is_pending());
        first_tx.send(1).unwrap();
        assert!(selected.poll(lw).ok().unwrap().is_ready());
        drop((second_tx, selected));
        assert!(second_seen_rx.recv().is_err());
    })
}
171 
/// When one side of a `join` is cancelled, the join fails and the other
/// side's reporting sender is gone (its mpsc `recv` fails).
#[test]
fn join_cancels() {
    // Round one: poll the join by hand.
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    // The first receiver stays bound so the `send` in the closure cannot fail.
    let (first_seen_tx, _first_seen_rx) = channel();
    let (second_seen_tx, second_seen_rx) = channel();
    let first = first_rx.map(move |value| {
        first_seen_tx.send(value).unwrap();
        value
    });
    let second = second_rx.map(move |value| {
        second_seen_tx.send(value).unwrap();
        value
    });

    let mut joined = first.join(second);
    drop(first_tx);
    let res = panic_waker_lw(|lw| joined.poll(lw));
    assert!(res.is_err());
    drop(second_tx);
    assert!(second_seen_rx.recv().is_err());

    // Round two: same setup, but the join is handed to `forget()` and the
    // outcome is observed through a separate mpsc channel.
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (first_seen_tx, _first_seen_rx) = channel();
    let (second_seen_tx, second_seen_rx) = channel();
    let first = first_rx.map(move |value| {
        first_seen_tx.send(value).unwrap();
        value
    });
    let second = second_rx.map(move |value| {
        second_seen_tx.send(value).unwrap();
        value
    });

    let (done_tx, done_rx) = channel();
    let joined = first.join(second);
    joined
        .then(move |_| {
            done_tx.send(()).unwrap();
            Ok::<(), ()>(())
        })
        .forget();
    assert!(done_rx.try_recv().is_err());
    drop(first_tx);
    done_rx.recv().unwrap();
    drop(second_tx);
    assert!(second_seen_rx.recv().is_err());
}
204 
/// A `join` with one ready side and one pending side stays pending until the
/// pending side is resolved or cancelled, in either argument position.
#[test]
fn join_incomplete() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let (report_tx, report_rx) = channel();
    noop_waker_lw(|lw| {
        // Ready on the left, pending on the right; then the right side succeeds.
        let mut joined = ok(1).join(receiver).map(move |pair| report_tx.send(pair).unwrap());
        assert!(joined.poll(lw).ok().unwrap().is_pending());
        assert!(report_rx.try_recv().is_err());
        sender.send(2).unwrap();
        assert!(joined.poll(lw).ok().unwrap().is_ready());
        assert_eq!(report_rx.recv().unwrap(), (1, 2));

        // Pending on the left, ready on the right; then the left side succeeds.
        let (sender, receiver) = oneshot::channel::<i32>();
        let (report_tx, report_rx) = channel();
        let mut joined = receiver.join(Ok(2)).map(move |pair| report_tx.send(pair).unwrap());
        assert!(joined.poll(lw).ok().unwrap().is_pending());
        assert!(report_rx.try_recv().is_err());
        sender.send(1).unwrap();
        assert!(joined.poll(lw).ok().unwrap().is_ready());
        assert_eq!(report_rx.recv().unwrap(), (1, 2));

        // Ready on the left, pending on the right; then the right side is cancelled.
        let (sender, receiver) = oneshot::channel::<i32>();
        let (report_tx, report_rx) = channel();
        let mut joined = ok(1).join(receiver).map_err(move |_err| report_tx.send(2).unwrap());
        assert!(joined.poll(lw).ok().unwrap().is_pending());
        assert!(report_rx.try_recv().is_err());
        drop(sender);
        assert!(joined.poll(lw).is_err());
        assert_eq!(report_rx.recv().unwrap(), 2);

        // Pending on the left, ready on the right; then the left side is cancelled.
        let (sender, receiver) = oneshot::channel::<i32>();
        let (report_tx, report_rx) = channel();
        let mut joined = receiver.join(Ok(2)).map_err(move |_err| report_tx.send(1).unwrap());
        assert!(joined.poll(lw).ok().unwrap().is_pending());
        assert!(report_rx.try_recv().is_err());
        drop(sender);
        assert!(joined.poll(lw).is_err());
        assert_eq!(report_rx.recv().unwrap(), 1);
    })
}
245 
246 
/// `select` scenarios: ready vs. never-ready sides, chaining onto the
/// remaining future, and release of both sides on drop or cancellation.
#[test]
fn select2() {
    // A resolved side wins over a never-resolving one, in either position.
    assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2));
    assert_done(|| empty().select(f_ok(2)).then(unselect), Ok(2));
    assert_done(|| f_err(2).select(empty()).then(unselect), Err(2));
    assert_done(|| empty().select(f_err(2)).then(unselect), Err(2));

    // The future handed back alongside the winner can be awaited afterwards.
    assert_done(
        || {
            f_ok(1)
                .select(f_ok(2))
                .map_err(|_| 0)
                .and_then(|either_tup| {
                    let (first, rest) = either_tup.into_inner();
                    rest.map(move |second| first + second)
                })
        },
        Ok(3),
    );

    // Finish one half of a select and then fail the second, ensuring that we
    // get the notification of the second one.
    {
        let (first_tx, first_rx) = oneshot::channel::<i32>();
        let (second_tx, second_rx) = oneshot::channel::<i32>();
        let selected = first_rx.select(second_rx);
        let (report_tx, report_rx) = channel();
        selected.map(move |won| report_tx.send(won).unwrap()).forget();
        first_tx.send(1).unwrap();
        let (val, next) = report_rx.recv().unwrap().into_inner();
        assert_eq!(val, 1);
        let (report_tx, report_rx) = channel();
        next.map_err(move |_err| report_tx.send(2).unwrap()).forget();
        assert_eq!(report_rx.try_recv().err().unwrap(), TryRecvError::Empty);
        drop(second_tx);
        assert_eq!(report_rx.recv().unwrap(), 2);
    }

    // Fail the second half and ensure that we see the first one finish
    {
        let (first_tx, first_rx) = oneshot::channel::<i32>();
        let (second_tx, second_rx) = oneshot::channel::<i32>();
        let selected = first_rx.select(second_rx);
        let (report_tx, report_rx) = channel();
        selected
            .map_err(move |lost| report_tx.send((1, lost.into_inner().1)).unwrap())
            .forget();
        drop(second_tx);
        let (val, next) = report_rx.recv().unwrap();
        assert_eq!(val, 1);
        let (report_tx, report_rx) = channel();
        next.map(move |value| report_tx.send(value).unwrap()).forget();
        assert_eq!(report_rx.try_recv().err().unwrap(), TryRecvError::Empty);
        first_tx.send(2).unwrap();
        assert_eq!(report_rx.recv().unwrap(), 2);
    }

    // Cancelling the first half should cancel the second
    {
        // Both oneshot senders stay bound; only dropping the select ends things.
        let (_first_tx, first_rx) = oneshot::channel::<i32>();
        let (_second_tx, second_rx) = oneshot::channel::<i32>();
        let (first_seen_tx, first_seen_rx) = channel();
        let (second_seen_tx, second_seen_rx) = channel();
        let first = first_rx.map(move |v| {
            first_seen_tx.send(v).unwrap();
            v
        });
        let second = second_rx.map(move |v| {
            second_seen_tx.send(v).unwrap();
            v
        });
        let selected = first.select(second);
        drop(selected);
        assert!(second_seen_rx.recv().is_err());
        assert!(first_seen_rx.recv().is_err());
    }

    // Cancel after a schedule
    {
        let (_first_tx, first_rx) = oneshot::channel::<i32>();
        let (_second_tx, second_rx) = oneshot::channel::<i32>();
        let (first_seen_tx, first_seen_rx) = channel();
        let (second_seen_tx, second_seen_rx) = channel();
        let first = first_rx.map(move |v| {
            first_seen_tx.send(v).unwrap();
            v
        });
        let second = second_rx.map(move |v| {
            second_seen_tx.send(v).unwrap();
            v
        });
        let mut selected = first.select(second);
        // Poll once before dropping; the poll result itself is not inspected.
        let _res = noop_waker_lw(|lw| selected.poll(lw));
        drop(selected);
        assert!(second_seen_rx.recv().is_err());
        assert!(first_seen_rx.recv().is_err());
    }

    // Cancel propagates
    {
        let (first_tx, first_rx) = oneshot::channel::<i32>();
        let (_second_tx, second_rx) = oneshot::channel::<i32>();
        let (first_seen_tx, first_seen_rx) = channel();
        let (second_seen_tx, second_seen_rx) = channel();
        let first = first_rx.map(move |v| {
            first_seen_tx.send(v).unwrap();
            v
        });
        let second = second_rx.map(move |v| {
            second_seen_tx.send(v).unwrap();
            v
        });
        let (done_tx, done_rx) = channel();
        first
            .select(second)
            .map(move |_| done_tx.send(()).unwrap())
            .forget();
        drop(first_tx);
        assert!(second_seen_rx.recv().is_err());
        assert!(first_seen_rx.recv().is_err());
        assert!(done_rx.recv().is_err());
    }

    // Cancel on early drop
    {
        let (report_tx, report_rx) = channel();
        let selected = f_ok(1).select(empty::<_, ()>().map(move |()| {
            report_tx.send(()).unwrap();
            1
        }));
        drop(selected);
        assert!(report_rx.recv().is_err());
    }
}
346 
/// An `Option` of a future converts into a future of an `Option`.
#[test]
fn option() {
    // `Some(future)` resolves to `Some` of the inner value.
    let from_some = block_on(Some(ok::<(), ()>(())).into_future());
    assert_eq!(Ok(Some(())), from_some);

    // `None` resolves to `None`.
    let from_none = block_on(None::<FutureResult<(), ()>>.into_future());
    assert_eq!(Ok::<_, ()>(None::<()>), from_none);
}
352