/// Checks the order in which `stream::select` yields items from two streams.
///
/// The expected vectors pin the behaviour: items alternate between the two
/// inputs beginning with the first one, and once either input is exhausted
/// the remaining items of the other are yielded in order.
#[test]
fn select() {
    use futures::executor::block_on;
    use futures::stream::{self, StreamExt};

    // Merges `a` and `b` with `stream::select`, drives the result to
    // completion and compares the collected items with `expected`.
    fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) {
        let merged = stream::select(stream::iter(a), stream::iter(b));
        let collected: Vec<u32> = block_on(merged.collect());
        assert_eq!(collected, expected);
    }

    // (first input, second input, expected merge order)
    let cases = vec![
        // Equal lengths.
        (vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]),
        // Second input ends first.
        (vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]),
        // First input ends first.
        (vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]),
    ];

    for (a, b, expected) in cases {
        select_and_compare(a, b, expected);
    }
}
17
/// Checks that `StreamExt::flat_map` concatenates the streams produced by
/// its closure, in the order of the outer stream.
///
/// Each inner stream is filtered down to its even values before being
/// flattened, so the result is the even numbers of `0..=4`, `6..=10` and
/// `0..=2`, one range after another.
#[test]
fn flat_map() {
    use futures::executor::block_on;
    use futures::future;
    use futures::stream::{self, StreamExt};

    block_on(async {
        let inner_streams = vec![
            stream::iter(0..=4u8),
            stream::iter(6..=10),
            stream::iter(0..=2),
        ];

        // Keep only the even values of every inner stream, then flatten.
        let evens = stream::iter(inner_streams)
            .flat_map(|inner| inner.filter(|n| future::ready(n % 2 == 0)));

        let values = evens.collect::<Vec<_>>().await;

        assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]);
    });
}
37
/// Checks that `StreamExt::scan` threads mutable state through the closure
/// and ends the stream as soon as the closure resolves to `None`.
///
/// The state starts at 1 and is bumped before each comparison, so the limits
/// seen by successive items are 2, 3, 4, 5, 6, ... The item `6` is the first
/// one that is not below its limit; the expected output stops right before
/// it, and the trailing `8` and `2` are never yielded.
#[test]
fn scan() {
    use futures::stream::{self, StreamExt};

    futures::executor::block_on(async {
        let input = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]);

        let below_limit = input.scan(1, |limit, item| {
            *limit += 1;
            let outcome = if item < *limit { Some(item) } else { None };
            futures::future::ready(outcome)
        });

        let output = below_limit.collect::<Vec<_>>().await;

        assert_eq!(output, vec![1u8, 2, 3, 4]);
    });
}
55
/// Exercises `StreamExt::take_until` in four scenarios: stopping when the
/// stop future resolves, detaching the stop future with `take_future()`,
/// `take_future()` returning `None` once the stream has stopped, and the
/// combinator not polling the inner stream again after it has finished.
#[test]
fn take_until() {
    use futures::future::{self, Future};
    use futures::stream::{self, StreamExt};
    use futures::task::Poll;

    // Builds a future that reports `Pending` for its first `stop_on` polls
    // and `Ready(())` on every poll after that.
    fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
        let mut polls = 0;
        future::poll_fn(move |_cx| {
            polls += 1;
            if polls > stop_on {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
    }

    futures::executor::block_on(async {
        // Verify stopping works:
        let stopped = stream::iter(1u32..=10).take_until(make_stop_fut(5));

        // Folding down to the most recent item leaves the last one yielded.
        let last = stopped.fold(0, |_, item| future::ready(item)).await;
        assert_eq!(last, 5);

        // Verify take_future() works:
        let mut released = stream::iter(1..=10).take_until(make_stop_fut(5));

        assert_eq!(released.next().await, Some(1));
        assert_eq!(released.next().await, Some(2));

        // Pull the stop future out and discard it; the assertion below
        // expects the stream to then run through to its final item.
        released.take_future();

        let last = released.fold(0, |_, item| future::ready(item)).await;
        assert_eq!(last, 10);

        // Verify take_future() returns None if stream is stopped:
        let mut halted = stream::iter(1u32..=10).take_until(make_stop_fut(1));
        assert_eq!(halted.next().await, Some(1));
        assert_eq!(halted.next().await, None);
        assert!(halted.take_future().is_none());

        // Verify TakeUntil is fused:
        // The inner stream panics if it is polled after returning `None`.
        let mut calls = 0;
        let one_shot = stream::poll_fn(move |_cx| {
            calls += 1;
            match calls {
                1 => Poll::Ready(Some(1)),
                2 => Poll::Ready(None),
                _ => panic!("TakeUntil not fused"),
            }
        });

        let mut fused = one_shot.take_until(make_stop_fut(1));
        assert_eq!(fused.next().await, Some(1));
        assert_eq!(fused.next().await, None);
        assert_eq!(fused.next().await, None);
    });
}
123
/// Expects a panic when `ready_chunks` is asked for a chunk capacity of
/// zero.
#[test]
#[should_panic]
fn ready_chunks_panic_on_cap_zero() {
    use futures::channel::mpsc;
    use futures::stream::StreamExt;

    // Only the receiving half is needed; the sender is dropped right away.
    let receiver = mpsc::channel::<()>(1).1;

    drop(receiver.ready_chunks(0));
}
134
/// Checks how `StreamExt::ready_chunks` groups items arriving on an mpsc
/// channel when the chunk capacity is 2.
///
/// With nothing sent, the first poll is pending. A single queued item comes
/// out as a one-element chunk, and three queued items come out as a full
/// chunk of two followed by a chunk holding the remaining one.
#[test]
fn ready_chunks() {
    use futures::channel::mpsc;
    use futures::sink::SinkExt;
    use futures::stream::StreamExt;
    use futures::FutureExt;
    use futures_test::task::noop_context;

    let (mut sender, receiver) = mpsc::channel::<i32>(16);
    let mut chunks = receiver.ready_chunks(2);

    // Nothing has been sent yet, so a single manual poll must not complete.
    let mut cx = noop_context();
    assert!(chunks.next().poll_unpin(&mut cx).is_pending());

    futures::executor::block_on(async {
        sender.send(1).await.unwrap();
        assert_eq!(chunks.next().await, Some(vec![1]));

        // Queue three items before reading again.
        for value in 2..=4 {
            sender.send(value).await.unwrap();
        }

        assert_eq!(chunks.next().await, Some(vec![2, 3]));
        assert_eq!(chunks.next().await, Some(vec![4]));
    });
}
161