1 #[test]
select()2 fn select() {
3     use futures::executor::block_on;
4     use futures::stream::{self, StreamExt};
5 
6     fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) {
7         let a = stream::iter(a);
8         let b = stream::iter(b);
9         let vec = block_on(stream::select(a, b).collect::<Vec<_>>());
10         assert_eq!(vec, expected);
11     }
12 
13     select_and_compare(vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]);
14     select_and_compare(vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]);
15     select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]);
16 }
17 
18 #[test]
flat_map()19 fn flat_map() {
20     use futures::stream::{self, StreamExt};
21 
22     futures::executor::block_on(async {
23         let st = stream::iter(vec![
24             stream::iter(0..=4u8),
25             stream::iter(6..=10),
26             stream::iter(0..=2),
27         ]);
28 
29         let values: Vec<_> = st
30             .flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0)))
31             .collect()
32             .await;
33 
34         assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]);
35     });
36 }
37 
38 #[test]
scan()39 fn scan() {
40     use futures::stream::{self, StreamExt};
41 
42     futures::executor::block_on(async {
43         assert_eq!(
44             stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2])
45                 .scan(1, |state, e| {
46                     *state += 1;
47                     futures::future::ready(if e < *state { Some(e) } else { None })
48                 })
49                 .collect::<Vec<_>>()
50                 .await,
51             vec![1u8, 2, 3, 4]
52         );
53     });
54 }
55 
56 #[test]
take_until()57 fn take_until() {
58     use futures::future::{self, Future};
59     use futures::stream::{self, StreamExt};
60     use futures::task::Poll;
61 
62     fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
63         let mut i = 0;
64         future::poll_fn(move |_cx| {
65             i += 1;
66             if i <= stop_on {
67                 Poll::Pending
68             } else {
69                 Poll::Ready(())
70             }
71         })
72     }
73 
74     futures::executor::block_on(async {
75         // Verify stopping works:
76         let stream = stream::iter(1u32..=10);
77         let stop_fut = make_stop_fut(5);
78 
79         let stream = stream.take_until(stop_fut);
80         let last = stream.fold(0, |_, i| async move { i }).await;
81         assert_eq!(last, 5);
82 
83         // Verify take_future() works:
84         let stream = stream::iter(1..=10);
85         let stop_fut = make_stop_fut(5);
86 
87         let mut stream = stream.take_until(stop_fut);
88 
89         assert_eq!(stream.next().await, Some(1));
90         assert_eq!(stream.next().await, Some(2));
91 
92         stream.take_future();
93 
94         let last = stream.fold(0, |_, i| async move { i }).await;
95         assert_eq!(last, 10);
96 
97         // Verify take_future() returns None if stream is stopped:
98         let stream = stream::iter(1u32..=10);
99         let stop_fut = make_stop_fut(1);
100         let mut stream = stream.take_until(stop_fut);
101         assert_eq!(stream.next().await, Some(1));
102         assert_eq!(stream.next().await, None);
103         assert!(stream.take_future().is_none());
104 
105         // Verify TakeUntil is fused:
106         let mut i = 0;
107         let stream = stream::poll_fn(move |_cx| {
108             i += 1;
109             match i {
110                 1 => Poll::Ready(Some(1)),
111                 2 => Poll::Ready(None),
112                 _ => panic!("TakeUntil not fused"),
113             }
114         });
115 
116         let stop_fut = make_stop_fut(1);
117         let mut stream = stream.take_until(stop_fut);
118         assert_eq!(stream.next().await, Some(1));
119         assert_eq!(stream.next().await, None);
120         assert_eq!(stream.next().await, None);
121     });
122 }
123 
124 #[test]
125 #[should_panic]
ready_chunks_panic_on_cap_zero()126 fn ready_chunks_panic_on_cap_zero() {
127     use futures::channel::mpsc;
128     use futures::stream::StreamExt;
129 
130     let (_, rx1) = mpsc::channel::<()>(1);
131 
132     let _ = rx1.ready_chunks(0);
133 }
134 
135 #[test]
ready_chunks()136 fn ready_chunks() {
137     use futures::channel::mpsc;
138     use futures::stream::StreamExt;
139     use futures::sink::SinkExt;
140     use futures::FutureExt;
141     use futures_test::task::noop_context;
142 
143     let (mut tx, rx1) = mpsc::channel::<i32>(16);
144 
145     let mut s = rx1.ready_chunks(2);
146 
147     let mut cx = noop_context();
148     assert!(s.next().poll_unpin(&mut cx).is_pending());
149 
150     futures::executor::block_on(async {
151         tx.send(1).await.unwrap();
152 
153         assert_eq!(s.next().await.unwrap(), vec![1]);
154         tx.send(2).await.unwrap();
155         tx.send(3).await.unwrap();
156         tx.send(4).await.unwrap();
157         assert_eq!(s.next().await.unwrap(), vec![2,3]);
158         assert_eq!(s.next().await.unwrap(), vec![4]);
159     });
160 }
161