/// Pins how `SelectAll::is_terminated` changes across polls and pushes.
///
/// The sequence asserted below is:
/// 1. a new, empty set reports "not terminated";
/// 2. polling the empty set yields `Ready(None)`, after which it reports
///    "terminated" while still being empty with length 0;
/// 3. pushing a stream makes it report "not terminated" again;
/// 4. after the pushed stream's single item is yielded the set is still
///    "not terminated" until a further poll returns `Ready(None)`.
#[test]
fn is_terminated() {
    use futures::future::{self, FutureExt};
    use futures::stream::{FusedStream, SelectAll, StreamExt};
    use futures::task::Poll;
    use futures_test::task::noop_context;

    let mut cx = noop_context();
    let mut tasks = SelectAll::new();

    // Termination is only reported once a poll has actually returned `None`.
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
    assert!(tasks.is_terminated());

    // Test that the sentinel value doesn't leak
    assert!(tasks.is_empty());
    assert_eq!(tasks.len(), 0);

    tasks.push(future::ready(1).into_stream());

    assert!(!tasks.is_empty());
    assert_eq!(tasks.len(), 1);

    // Pushing a new stream clears the terminated state.
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
    // Yielding the last item is not enough; the end must be observed by a poll.
    assert!(!tasks.is_terminated());
    assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
    assert!(tasks.is_terminated());
}
30 
/// Checks the order in which `select_all` yields items from two ready
/// iterator-backed streams of different lengths.
///
/// The expected sequence alternates between the two streams while both
/// still have items, then drains the remainder of the longer one.
/// NOTE(review): the name suggests a regression test for issue #1626 —
/// confirm against the project's issue tracker.
#[test]
fn issue_1626() {
    use futures::executor::block_on_stream;
    use futures::stream;

    let short = stream::iter(0..=2);
    let long = stream::iter(10..=14);

    let mut merged = block_on_stream(stream::select_all(vec![short, long]));

    // Items must arrive in exactly this order.
    let expected_order = [0, 10, 1, 11, 2, 12, 13, 14];
    for &expected in &expected_order {
        assert_eq!(merged.next(), Some(expected));
    }

    // Both inputs are exhausted, so the merged stream ends.
    assert_eq!(merged.next(), None);
}
51 
/// Drives `select_all` over three unbounded mpsc receivers and checks which
/// item comes out after each batch of sends.
///
/// In the first two rounds the value is sent on channel `b` before channel
/// `a`, yet the assertions expect `a`'s value (33) to be yielded first.
/// NOTE(review): presumably this reflects the order in which the receivers
/// were handed to `select_all` — confirm against the `SelectAll` docs.
#[test]
fn works_1() {
    use futures::channel::mpsc;
    use futures::executor::block_on_stream;
    use futures::stream::select_all;

    let (tx_a, rx_a) = mpsc::unbounded::<u32>();
    let (tx_b, rx_b) = mpsc::unbounded::<u32>();
    let (tx_c, rx_c) = mpsc::unbounded::<u32>();

    let mut merged = block_on_stream(select_all(vec![rx_a, rx_b, rx_c]));

    // Two identical rounds: send on `b` then `a`, expect `a`'s item first.
    for _ in 0..2 {
        tx_b.unbounded_send(99).unwrap();
        tx_a.unbounded_send(33).unwrap();
        assert_eq!(Some(33), merged.next());
        assert_eq!(Some(99), merged.next());
    }

    // With a single pending item, that item is what comes out.
    tx_c.unbounded_send(42).unwrap();
    assert_eq!(Some(42), merged.next());
    tx_a.unbounded_send(43).unwrap();
    assert_eq!(Some(43), merged.next());

    // Once every sender is gone the merged stream finishes.
    drop(tx_a);
    drop(tx_b);
    drop(tx_c);
    assert_eq!(None, merged.next());
}
84