/// Checks that `FuturesOrdered` yields outputs in the order the futures were
/// inserted rather than the order in which they complete.
#[test]
fn works_1() {
    use futures::channel::oneshot;
    use futures::executor::block_on_stream;
    use futures::stream::{FuturesOrdered, StreamExt};
    use futures_test::task::noop_context;

    let (a_tx, a_rx) = oneshot::channel::<i32>();
    let (b_tx, b_rx) = oneshot::channel::<i32>();
    let (c_tx, c_rx) = oneshot::channel::<i32>();

    let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesOrdered<_>>();

    // Only the second future has a value so far; the first one (`a_rx`) is
    // still unresolved, so the stream must not yield anything yet.
    b_tx.send(99).unwrap();
    assert!(stream.poll_next_unpin(&mut noop_context()).is_pending());

    a_tx.send(33).unwrap();
    c_tx.send(33).unwrap();

    // With every sender fired, the values come out in insertion order
    // (a, b, c) and the stream then terminates.
    let mut iter = block_on_stream(stream);
    assert_eq!(Some(Ok(33)), iter.next());
    assert_eq!(Some(Ok(99)), iter.next());
    assert_eq!(Some(Ok(33)), iter.next());
    assert_eq!(None, iter.next());
}
/// Checks readiness transitions of `FuturesOrdered` when the second queued
/// future is a `join` of two receivers that complete at different times.
#[test]
fn works_2() {
    use futures::channel::oneshot;
    use futures::future::{join, FutureExt};
    use futures::stream::{FuturesOrdered, StreamExt};
    use futures_test::task::noop_context;

    let (a_tx, a_rx) = oneshot::channel::<i32>();
    let (b_tx, b_rx) = oneshot::channel::<i32>();
    let (c_tx, c_rx) = oneshot::channel::<i32>();

    // Both entries are boxed so that the two different future types can live
    // in the same collection.
    let mut stream = vec![
        a_rx.boxed(),
        join(b_rx, c_rx).map(|(b, c)| Ok(b? + c?)).boxed(),
    ]
    .into_iter()
    .collect::<FuturesOrdered<_>>();

    let mut cx = noop_context();
    a_tx.send(33).unwrap();
    b_tx.send(33).unwrap();
    // The first future (`a_rx`) has its value, so the stream is ready once.
    assert!(stream.poll_next_unpin(&mut cx).is_ready());
    // The joined future still waits on `c_rx`.
    assert!(stream.poll_next_unpin(&mut cx).is_pending());
    c_tx.send(33).unwrap();
    // Both halves of the join are now complete.
    assert!(stream.poll_next_unpin(&mut cx).is_ready());
}
/// Checks that a `FuturesOrdered` collected from an iterator of futures
/// reports the right length and yields the outputs in iteration order.
#[test]
fn from_iterator() {
    use futures::executor::block_on;
    use futures::future;
    use futures::stream::{FuturesOrdered, StreamExt};

    let stream = vec![
        future::ready::<i32>(1),
        future::ready::<i32>(2),
        future::ready::<i32>(3),
    ]
    .into_iter()
    .collect::<FuturesOrdered<_>>();
    assert_eq!(stream.len(), 3);
    assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
}
/// Checks that the stream stays pending for as long as the first queued
/// future is unresolved, even after the later future has had its inputs sent.
#[test]
fn queue_never_unblocked() {
    use futures::channel::oneshot;
    use futures::future::{self, Future, TryFutureExt};
    use futures::stream::{FuturesOrdered, StreamExt};
    use futures_test::task::noop_context;
    use std::any::Any;

    // `_a_tx` is bound (not discarded with `_`) so the sender stays alive for
    // the whole test; nothing is ever sent on it.
    let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
    let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
    let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();

    // The two entries have different concrete types, so both are erased to
    // the same boxed trait object.
    let mut stream = vec![
        Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>,
        Box::new(
            future::try_select(b_rx, c_rx)
                .map_err(|e| e.factor_first().0)
                .and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>)),
        ) as _,
    ]
    .into_iter()
    .collect::<FuturesOrdered<_>>();

    let cx = &mut noop_context();
    // Repeated polling with nothing sent must keep returning pending.
    for _ in 0..10 {
        assert!(stream.poll_next_unpin(cx).is_pending());
    }

    // Sending on the channels behind the second entry must not make the
    // stream yield: the first entry (`a_rx`) never receives a value.
    b_tx.send(Box::new(())).unwrap();
    assert!(stream.poll_next_unpin(cx).is_pending());
    c_tx.send(Box::new(())).unwrap();
    assert!(stream.poll_next_unpin(cx).is_pending());
    assert!(stream.poll_next_unpin(cx).is_pending());
}