/// Checks that `FuturesOrdered` yields results in insertion order even when
/// the futures complete out of order.
#[test]
fn works_1() {
    use futures::channel::oneshot;
    use futures::executor::block_on_stream;
    use futures::stream::{FuturesOrdered, StreamExt};
    use futures_test::task::noop_context;

    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (third_tx, third_rx) = oneshot::channel::<i32>();

    let mut ordered: FuturesOrdered<_> = vec![first_rx, second_rx, third_rx]
        .into_iter()
        .collect();

    // Only the second receiver has a value so far; the stream is expected to
    // stay pending because the first one has nothing to yield yet.
    second_tx.send(99).unwrap();
    let mut cx = noop_context();
    assert!(ordered.poll_next_unpin(&mut cx).is_pending());

    first_tx.send(33).unwrap();
    third_tx.send(33).unwrap();

    // Values must come out in the order the receivers were collected,
    // followed by the end of the stream.
    let mut outputs = block_on_stream(ordered);
    for &expected in &[33, 99, 33] {
        assert_eq!(Some(Ok(expected)), outputs.next());
    }
    assert_eq!(None, outputs.next());
}
26
/// Checks that `FuturesOrdered` yields the head future's output as soon as it
/// is ready, and holds back until a later future (here a `join` of two
/// receivers) has every input it needs.
///
/// The yielded values are asserted exactly rather than only checking
/// readiness, so an early end of stream, an error, or a wrong sum fails the
/// test.
#[test]
fn works_2() {
    use futures::channel::oneshot;
    use futures::future::{join, FutureExt};
    use futures::stream::{FuturesOrdered, StreamExt};
    use futures_test::task::noop_context;
    use std::task::Poll;

    let (a_tx, a_rx) = oneshot::channel::<i32>();
    let (b_tx, b_rx) = oneshot::channel::<i32>();
    let (c_tx, c_rx) = oneshot::channel::<i32>();

    let mut stream = vec![
        a_rx.boxed(),
        // Resolves to the sum of both receivers; `?` propagates a failure
        // from either receiver.
        join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(),
    ]
    .into_iter()
    .collect::<FuturesOrdered<_>>();

    let mut cx = noop_context();
    a_tx.send(33).unwrap();
    b_tx.send(33).unwrap();

    // The first future is complete, so its value is yielded immediately.
    assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(33))));

    // The joined future still waits on `c_rx`.
    assert!(stream.poll_next_unpin(&mut cx).is_pending());

    c_tx.send(33).unwrap();

    // Both halves of the join are now available: 33 + 33.
    assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(66))));
}
51
/// Checks that a `FuturesOrdered` can be built by collecting an iterator of
/// futures, reports the number of queued futures via `len`, and yields their
/// outputs in iteration order.
#[test]
fn from_iterator() {
    use futures::executor::block_on;
    use futures::future;
    use futures::stream::{FuturesOrdered, StreamExt};

    let ready_futures = vec![
        future::ready::<i32>(1),
        future::ready::<i32>(2),
        future::ready::<i32>(3),
    ];
    let ordered: FuturesOrdered<_> = ready_futures.into_iter().collect();
    assert_eq!(ordered.len(), 3);

    // Drain the stream to completion and compare the collected outputs.
    let outputs = block_on(ordered.collect::<Vec<_>>());
    assert_eq!(outputs, vec![1, 2, 3]);
}
66
/// Checks that the stream keeps reporting `Pending` while the first queued
/// future has not produced a value, no matter how often it is polled and even
/// after values have been sent to the receivers behind it.
#[test]
fn queue_never_unblocked() {
    use futures::channel::oneshot;
    use futures::future::{self, Future, TryFutureExt};
    use futures::stream::{FuturesOrdered, StreamExt};
    use futures_test::task::noop_context;
    use std::any::Any;

    // The head sender is bound to a name but never used to send anything.
    let (_head_tx, head_rx) = oneshot::channel::<Box<dyn Any + Send>>();
    let (left_tx, left_rx) = oneshot::channel::<Box<dyn Any + Send>>();
    let (right_tx, right_rx) = oneshot::channel::<Box<dyn Any + Send>>();

    // Both entries are boxed to a common trait-object type so they can share
    // one `Vec`; the second combines the remaining receivers with
    // `future::try_select` and re-boxes its success value.
    let mut queue = vec![
        Box::new(head_rx) as Box<dyn Future<Output = _> + Unpin>,
        Box::new(
            future::try_select(left_rx, right_rx)
                .map_err(|e| e.factor_first().0)
                .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>)),
        ) as _,
    ]
    .into_iter()
    .collect::<FuturesOrdered<_>>();

    let mut cx = noop_context();

    // Repeated polling with nothing sent must not produce an item.
    for _ in 0..10 {
        assert!(queue.poll_next_unpin(&mut cx).is_pending());
    }

    // Sending to the receivers behind the head must not produce an item either.
    left_tx.send(Box::new(())).unwrap();
    assert!(queue.poll_next_unpin(&mut cx).is_pending());
    right_tx.send(Box::new(())).unwrap();
    assert!(queue.poll_next_unpin(&mut cx).is_pending());
    assert!(queue.poll_next_unpin(&mut cx).is_pending());
}
97