1 use futures::future;
2 use futures::executor::block_on;
3 use futures::channel::oneshot::{self, Canceled};
4 use std::sync::mpsc::{channel, TryRecvError};
5
6 mod support;
7 use support::*;
8
unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E>9 fn unselect<T, E, A, B>(r: Result<Either<(T, B), (T, A)>, Either<(E, B), (E, A)>>) -> Result<T, E> {
10 match r {
11 Ok(Either::Left((t, _))) |
12 Ok(Either::Right((t, _))) => Ok(t),
13 Err(Either::Left((e, _))) |
14 Err(Either::Right((e, _))) => Err(e),
15 }
16 }
17
18 #[test]
result_smoke()19 fn result_smoke() {
20 fn is_future_v<A, B, C>(_: C)
21 where A: Send + 'static,
22 B: Send + 'static,
23 C: Future<Item=A, Error=B>
24 {}
25
26 is_future_v::<i32, u32, _>(f_ok(1).map(|a| a + 1));
27 is_future_v::<i32, u32, _>(f_ok(1).map_err(|a| a + 1));
28 is_future_v::<i32, u32, _>(f_ok(1).and_then(Ok));
29 is_future_v::<i32, u32, _>(f_ok(1).or_else(Err));
30 is_future_v::<(i32, i32), u32, _>(f_ok(1).join(Err(3)));
31 is_future_v::<i32, u32, _>(f_ok(1).map(f_ok).flatten());
32
33 assert_done(|| f_ok(1), r_ok(1));
34 assert_done(|| f_err(1), r_err(1));
35 assert_done(|| result(Ok(1)), r_ok(1));
36 assert_done(|| result(Err(1)), r_err(1));
37 assert_done(|| ok(1), r_ok(1));
38 assert_done(|| err(1), r_err(1));
39 assert_done(|| f_ok(1).map(|a| a + 2), r_ok(3));
40 assert_done(|| f_err(1).map(|a| a + 2), r_err(1));
41 assert_done(|| f_ok(1).map_err(|a| a + 2), r_ok(1));
42 assert_done(|| f_err(1).map_err(|a| a + 2), r_err(3));
43 assert_done(|| f_ok(1).and_then(|a| Ok(a + 2)), r_ok(3));
44 assert_done(|| f_err(1).and_then(|a| Ok(a + 2)), r_err(1));
45 assert_done(|| f_ok(1).and_then(|a| Err(a as u32 + 3)), r_err(4));
46 assert_done(|| f_err(1).and_then(|a| Err(a as u32 + 4)), r_err(1));
47 assert_done(|| f_ok(1).or_else(|a| Ok(a as i32 + 2)), r_ok(1));
48 assert_done(|| f_err(1).or_else(|a| Ok(a as i32 + 2)), r_ok(3));
49 assert_done(|| f_ok(1).or_else(|a| Err(a + 3)), r_ok(1));
50 assert_done(|| f_err(1).or_else(|a| Err(a + 4)), r_err(5));
51 assert_done(|| f_ok(1).select(f_err(2)).then(unselect), r_ok(1));
52 assert_done(|| f_ok(1).select(Ok(2)).then(unselect), r_ok(1));
53 assert_done(|| f_err(1).select(f_ok(1)).then(unselect), r_err(1));
54 assert_done(|| f_ok(1).select(empty()).then(unselect), Ok(1));
55 assert_done(|| empty().select(f_ok(1)).then(unselect), Ok(1));
56 assert_done(|| f_ok(1).join(f_err(1)), Err(1));
57 assert_done(|| f_ok(1).join(Ok(2)), Ok((1, 2)));
58 assert_done(|| f_err(1).join(f_ok(1)), Err(1));
59 assert_done(|| f_ok(1).then(|_| Ok(2)), r_ok(2));
60 assert_done(|| f_ok(1).then(|_| Err(2)), r_err(2));
61 assert_done(|| f_err(1).then(|_| Ok(2)), r_ok(2));
62 assert_done(|| f_err(1).then(|_| Err(2)), r_err(2));
63 }
64
65 #[test]
test_empty()66 fn test_empty() {
67 fn empty() -> Empty<i32, u32> { future::empty() }
68
69 assert_empty(|| empty());
70 assert_empty(|| empty().select(empty()));
71 assert_empty(|| empty().join(empty()));
72 assert_empty(|| empty().join(f_ok(1)));
73 assert_empty(|| f_ok(1).join(empty()));
74 assert_empty(|| empty().or_else(move |_| empty()));
75 assert_empty(|| empty().and_then(move |_| empty()));
76 assert_empty(|| f_err(1).or_else(move |_| empty()));
77 assert_empty(|| f_ok(1).and_then(move |_| empty()));
78 assert_empty(|| empty().map(|a| a + 1));
79 assert_empty(|| empty().map_err(|a| a + 1));
80 assert_empty(|| empty().then(|a| a));
81 }
82
83 #[test]
test_ok()84 fn test_ok() {
85 assert_done(|| ok(1), r_ok(1));
86 assert_done(|| err(1), r_err(1));
87 }
88
89 #[test]
flatten()90 fn flatten() {
91 fn ok<T: Send + 'static>(a: T) -> FutureResult<T, u32> {
92 future::ok(a)
93 }
94 fn err<E: Send + 'static>(b: E) -> FutureResult<i32, E> {
95 future::err(b)
96 }
97
98 assert_done(|| ok(ok(1)).flatten(), r_ok(1));
99 assert_done(|| ok(err(1)).flatten(), r_err(1));
100 assert_done(|| err(1u32).map(ok).flatten(), r_err(1));
101 assert_done(|| future::ok(future::ok(1)).flatten(), r_ok(1));
102 assert_empty(|| ok(empty::<i32, u32>()).flatten());
103 assert_empty(|| empty::<i32, u32>().map(ok).flatten());
104 }
105
106 #[test]
smoke_oneshot()107 fn smoke_oneshot() {
108 assert_done(|| {
109 let (c, p) = oneshot::channel();
110 c.send(1).unwrap();
111 p
112 }, Ok(1));
113 assert_done(|| {
114 let (c, p) = oneshot::channel::<i32>();
115 drop(c);
116 p
117 }, Err(Canceled));
118 let mut completes = Vec::new();
119 assert_empty(|| {
120 let (a, b) = oneshot::channel::<i32>();
121 completes.push(a);
122 b
123 });
124
125 let (c, mut p) = oneshot::channel::<i32>();
126 drop(c);
127 let res = panic_waker_lw(|lw| p.poll(lw));
128 assert!(res.is_err());
129 let (c, p) = oneshot::channel::<i32>();
130 drop(c);
131 let (tx, rx) = channel();
132 p.then(move |_| {
133 tx.send(())
134 }).forget();
135 rx.recv().unwrap();
136 }
137
138 #[test]
select_cancels()139 fn select_cancels() {
140 let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
141 let ((btx, brx), (dtx, drx)) = (channel(), channel());
142 let b = b.map(move |b| { btx.send(b).unwrap(); b });
143 let d = d.map(move |d| { dtx.send(d).unwrap(); d });
144
145 let mut f = b.select(d).then(unselect);
146 // assert!(f.poll(&mut Task::new()).is_pending());
147 assert!(brx.try_recv().is_err());
148 assert!(drx.try_recv().is_err());
149 a.send(1).unwrap();
150 noop_waker_lw(|lw| {
151 let res = f.poll(lw);
152 assert!(res.ok().unwrap().is_ready());
153 assert_eq!(brx.recv().unwrap(), 1);
154 drop(c);
155 assert!(drx.recv().is_err());
156
157 let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
158 let ((btx, _brx), (dtx, drx)) = (channel(), channel());
159 let b = b.map(move |b| { btx.send(b).unwrap(); b });
160 let d = d.map(move |d| { dtx.send(d).unwrap(); d });
161
162 let mut f = b.select(d).then(unselect);
163 assert!(f.poll(lw).ok().unwrap().is_pending());
164 assert!(f.poll(lw).ok().unwrap().is_pending());
165 a.send(1).unwrap();
166 assert!(f.poll(lw).ok().unwrap().is_ready());
167 drop((c, f));
168 assert!(drx.recv().is_err());
169 })
170 }
171
172 #[test]
join_cancels()173 fn join_cancels() {
174 let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
175 let ((btx, _brx), (dtx, drx)) = (channel(), channel());
176 let b = b.map(move |b| { btx.send(b).unwrap(); b });
177 let d = d.map(move |d| { dtx.send(d).unwrap(); d });
178
179 let mut f = b.join(d);
180 drop(a);
181 let res = panic_waker_lw(|lw| f.poll(lw));
182 assert!(res.is_err());
183 drop(c);
184 assert!(drx.recv().is_err());
185
186 let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
187 let ((btx, _brx), (dtx, drx)) = (channel(), channel());
188 let b = b.map(move |b| { btx.send(b).unwrap(); b });
189 let d = d.map(move |d| { dtx.send(d).unwrap(); d });
190
191 let (tx, rx) = channel();
192 let f = b.join(d);
193 f.then(move |_| {
194 tx.send(()).unwrap();
195 let res: Result<(), ()> = Ok(());
196 res
197 }).forget();
198 assert!(rx.try_recv().is_err());
199 drop(a);
200 rx.recv().unwrap();
201 drop(c);
202 assert!(drx.recv().is_err());
203 }
204
205 #[test]
join_incomplete()206 fn join_incomplete() {
207 let (a, b) = oneshot::channel::<i32>();
208 let (tx, rx) = channel();
209 noop_waker_lw(|lw| {
210 let mut f = ok(1).join(b).map(move |r| tx.send(r).unwrap());
211 assert!(f.poll(lw).ok().unwrap().is_pending());
212 assert!(rx.try_recv().is_err());
213 a.send(2).unwrap();
214 assert!(f.poll(lw).ok().unwrap().is_ready());
215 assert_eq!(rx.recv().unwrap(), (1, 2));
216
217 let (a, b) = oneshot::channel::<i32>();
218 let (tx, rx) = channel();
219 let mut f = b.join(Ok(2)).map(move |r| tx.send(r).unwrap());
220 assert!(f.poll(lw).ok().unwrap().is_pending());
221 assert!(rx.try_recv().is_err());
222 a.send(1).unwrap();
223 assert!(f.poll(lw).ok().unwrap().is_ready());
224 assert_eq!(rx.recv().unwrap(), (1, 2));
225
226 let (a, b) = oneshot::channel::<i32>();
227 let (tx, rx) = channel();
228 let mut f = ok(1).join(b).map_err(move |_r| tx.send(2).unwrap());
229 assert!(f.poll(lw).ok().unwrap().is_pending());
230 assert!(rx.try_recv().is_err());
231 drop(a);
232 assert!(f.poll(lw).is_err());
233 assert_eq!(rx.recv().unwrap(), 2);
234
235 let (a, b) = oneshot::channel::<i32>();
236 let (tx, rx) = channel();
237 let mut f = b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap());
238 assert!(f.poll(lw).ok().unwrap().is_pending());
239 assert!(rx.try_recv().is_err());
240 drop(a);
241 assert!(f.poll(lw).is_err());
242 assert_eq!(rx.recv().unwrap(), 1);
243 })
244 }
245
246
247 #[test]
select2()248 fn select2() {
249 assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2));
250 assert_done(|| empty().select(f_ok(2)).then(unselect), Ok(2));
251 assert_done(|| f_err(2).select(empty()).then(unselect), Err(2));
252 assert_done(|| empty().select(f_err(2)).then(unselect), Err(2));
253
254 assert_done(|| {
255 f_ok(1).select(f_ok(2))
256 .map_err(|_| 0)
257 .and_then(|either_tup| {
258 let (a, b) = either_tup.into_inner();
259 b.map(move |b| a + b)
260 })
261 }, Ok(3));
262
263 // Finish one half of a select and then fail the second, ensuring that we
264 // get the notification of the second one.
265 {
266 let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
267 let f = b.select(d);
268 let (tx, rx) = channel();
269 f.map(move |r| tx.send(r).unwrap()).forget();
270 a.send(1).unwrap();
271 let (val, next) = rx.recv().unwrap().into_inner();
272 assert_eq!(val, 1);
273 let (tx, rx) = channel();
274 next.map_err(move |_r| tx.send(2).unwrap()).forget();
275 assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty);
276 drop(c);
277 assert_eq!(rx.recv().unwrap(), 2);
278 }
279
280 // Fail the second half and ensure that we see the first one finish
281 {
282 let ((a, b), (c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
283 let f = b.select(d);
284 let (tx, rx) = channel();
285 f.map_err(move |r| tx.send((1, r.into_inner().1)).unwrap()).forget();
286 drop(c);
287 let (val, next) = rx.recv().unwrap();
288 assert_eq!(val, 1);
289 let (tx, rx) = channel();
290 next.map(move |r| tx.send(r).unwrap()).forget();
291 assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty);
292 a.send(2).unwrap();
293 assert_eq!(rx.recv().unwrap(), 2);
294 }
295
296 // Cancelling the first half should cancel the second
297 {
298 let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
299 let ((btx, brx), (dtx, drx)) = (channel(), channel());
300 let b = b.map(move |v| { btx.send(v).unwrap(); v });
301 let d = d.map(move |v| { dtx.send(v).unwrap(); v });
302 let f = b.select(d);
303 drop(f);
304 assert!(drx.recv().is_err());
305 assert!(brx.recv().is_err());
306 }
307
308 // Cancel after a schedule
309 {
310 let ((_a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
311 let ((btx, brx), (dtx, drx)) = (channel(), channel());
312 let b = b.map(move |v| { btx.send(v).unwrap(); v });
313 let d = d.map(move |v| { dtx.send(v).unwrap(); v });
314 let mut f = b.select(d);
315 let _res = noop_waker_lw(|lw| f.poll(lw));
316 drop(f);
317 assert!(drx.recv().is_err());
318 assert!(brx.recv().is_err());
319 }
320
321 // Cancel propagates
322 {
323 let ((a, b), (_c, d)) = (oneshot::channel::<i32>(), oneshot::channel::<i32>());
324 let ((btx, brx), (dtx, drx)) = (channel(), channel());
325 let b = b.map(move |v| { btx.send(v).unwrap(); v });
326 let d = d.map(move |v| { dtx.send(v).unwrap(); v });
327 let (tx, rx) = channel();
328 b.select(d).map(move |_| tx.send(()).unwrap()).forget();
329 drop(a);
330 assert!(drx.recv().is_err());
331 assert!(brx.recv().is_err());
332 assert!(rx.recv().is_err());
333 }
334
335 // Cancel on early drop
336 {
337 let (tx, rx) = channel();
338 let f = f_ok(1).select(empty::<_, ()>().map(move |()| {
339 tx.send(()).unwrap();
340 1
341 }));
342 drop(f);
343 assert!(rx.recv().is_err());
344 }
345 }
346
347 #[test]
option()348 fn option() {
349 assert_eq!(Ok(Some(())), block_on(Some(ok::<(), ()>(())).into_future()));
350 assert_eq!(Ok::<_, ()>(None::<()>), block_on(None::<FutureResult<(), ()>>.into_future()));
351 }
352