1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4 
5 use tokio::sync::broadcast;
6 use tokio_test::task;
7 use tokio_test::{
8     assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
9 };
10 
11 use std::sync::Arc;
12 
13 macro_rules! assert_recv {
14     ($e:expr) => {
15         match $e.try_recv() {
16             Ok(value) => value,
17             Err(e) => panic!("expected recv; got = {:?}", e),
18         }
19     };
20 }
21 
22 macro_rules! assert_empty {
23     ($e:expr) => {
24         match $e.try_recv() {
25             Ok(value) => panic!("expected empty; got = {:?}", value),
26             Err(broadcast::error::TryRecvError::Empty) => {}
27             Err(e) => panic!("expected empty; got = {:?}", e),
28         }
29     };
30 }
31 
32 macro_rules! assert_lagged {
33     ($e:expr, $n:expr) => {
34         match assert_err!($e) {
35             broadcast::error::TryRecvError::Lagged(n) => {
36                 assert_eq!(n, $n);
37             }
38             _ => panic!("did not lag"),
39         }
40     };
41 }
42 
43 macro_rules! assert_closed {
44     ($e:expr) => {
45         match assert_err!($e) {
46             broadcast::error::TryRecvError::Closed => {}
47             _ => panic!("did not lag"),
48         }
49     };
50 }
51 
52 trait AssertSend: Send + Sync {}
53 impl AssertSend for broadcast::Sender<i32> {}
54 impl AssertSend for broadcast::Receiver<i32> {}
55 
56 #[test]
send_try_recv_bounded()57 fn send_try_recv_bounded() {
58     let (tx, mut rx) = broadcast::channel(16);
59 
60     assert_empty!(rx);
61 
62     let n = assert_ok!(tx.send("hello"));
63     assert_eq!(n, 1);
64 
65     let val = assert_recv!(rx);
66     assert_eq!(val, "hello");
67 
68     assert_empty!(rx);
69 }
70 
71 #[test]
send_two_recv()72 fn send_two_recv() {
73     let (tx, mut rx1) = broadcast::channel(16);
74     let mut rx2 = tx.subscribe();
75 
76     assert_empty!(rx1);
77     assert_empty!(rx2);
78 
79     let n = assert_ok!(tx.send("hello"));
80     assert_eq!(n, 2);
81 
82     let val = assert_recv!(rx1);
83     assert_eq!(val, "hello");
84 
85     let val = assert_recv!(rx2);
86     assert_eq!(val, "hello");
87 
88     assert_empty!(rx1);
89     assert_empty!(rx2);
90 }
91 
92 #[test]
send_recv_bounded()93 fn send_recv_bounded() {
94     let (tx, mut rx) = broadcast::channel(16);
95 
96     let mut recv = task::spawn(rx.recv());
97 
98     assert_pending!(recv.poll());
99 
100     assert_ok!(tx.send("hello"));
101 
102     assert!(recv.is_woken());
103     let val = assert_ready_ok!(recv.poll());
104     assert_eq!(val, "hello");
105 }
106 
107 #[test]
send_two_recv_bounded()108 fn send_two_recv_bounded() {
109     let (tx, mut rx1) = broadcast::channel(16);
110     let mut rx2 = tx.subscribe();
111 
112     let mut recv1 = task::spawn(rx1.recv());
113     let mut recv2 = task::spawn(rx2.recv());
114 
115     assert_pending!(recv1.poll());
116     assert_pending!(recv2.poll());
117 
118     assert_ok!(tx.send("hello"));
119 
120     assert!(recv1.is_woken());
121     assert!(recv2.is_woken());
122 
123     let val1 = assert_ready_ok!(recv1.poll());
124     let val2 = assert_ready_ok!(recv2.poll());
125     assert_eq!(val1, "hello");
126     assert_eq!(val2, "hello");
127 
128     drop((recv1, recv2));
129 
130     let mut recv1 = task::spawn(rx1.recv());
131     let mut recv2 = task::spawn(rx2.recv());
132 
133     assert_pending!(recv1.poll());
134 
135     assert_ok!(tx.send("world"));
136 
137     assert!(recv1.is_woken());
138     assert!(!recv2.is_woken());
139 
140     let val1 = assert_ready_ok!(recv1.poll());
141     let val2 = assert_ready_ok!(recv2.poll());
142     assert_eq!(val1, "world");
143     assert_eq!(val2, "world");
144 }
145 
146 #[test]
change_tasks()147 fn change_tasks() {
148     let (tx, mut rx) = broadcast::channel(1);
149 
150     let mut recv = Box::pin(rx.recv());
151 
152     let mut task1 = task::spawn(&mut recv);
153     assert_pending!(task1.poll());
154 
155     let mut task2 = task::spawn(&mut recv);
156     assert_pending!(task2.poll());
157 
158     tx.send("hello").unwrap();
159 
160     assert!(task2.is_woken());
161 }
162 
163 #[test]
send_slow_rx()164 fn send_slow_rx() {
165     let (tx, mut rx1) = broadcast::channel(16);
166     let mut rx2 = tx.subscribe();
167 
168     {
169         let mut recv2 = task::spawn(rx2.recv());
170 
171         {
172             let mut recv1 = task::spawn(rx1.recv());
173 
174             assert_pending!(recv1.poll());
175             assert_pending!(recv2.poll());
176 
177             assert_ok!(tx.send("one"));
178 
179             assert!(recv1.is_woken());
180             assert!(recv2.is_woken());
181 
182             assert_ok!(tx.send("two"));
183 
184             let val = assert_ready_ok!(recv1.poll());
185             assert_eq!(val, "one");
186         }
187 
188         let val = assert_ready_ok!(task::spawn(rx1.recv()).poll());
189         assert_eq!(val, "two");
190 
191         let mut recv1 = task::spawn(rx1.recv());
192 
193         assert_pending!(recv1.poll());
194 
195         assert_ok!(tx.send("three"));
196 
197         assert!(recv1.is_woken());
198 
199         let val = assert_ready_ok!(recv1.poll());
200         assert_eq!(val, "three");
201 
202         let val = assert_ready_ok!(recv2.poll());
203         assert_eq!(val, "one");
204     }
205 
206     let val = assert_recv!(rx2);
207     assert_eq!(val, "two");
208 
209     let val = assert_recv!(rx2);
210     assert_eq!(val, "three");
211 }
212 
213 #[test]
drop_rx_while_values_remain()214 fn drop_rx_while_values_remain() {
215     let (tx, mut rx1) = broadcast::channel(16);
216     let mut rx2 = tx.subscribe();
217 
218     assert_ok!(tx.send("one"));
219     assert_ok!(tx.send("two"));
220 
221     assert_recv!(rx1);
222     assert_recv!(rx2);
223 
224     drop(rx2);
225     drop(rx1);
226 }
227 
228 #[test]
lagging_rx()229 fn lagging_rx() {
230     let (tx, mut rx1) = broadcast::channel(2);
231     let mut rx2 = tx.subscribe();
232 
233     assert_ok!(tx.send("one"));
234     assert_ok!(tx.send("two"));
235 
236     assert_eq!("one", assert_recv!(rx1));
237 
238     assert_ok!(tx.send("three"));
239 
240     // Lagged too far
241     let x = dbg!(rx2.try_recv());
242     assert_lagged!(x, 1);
243 
244     // Calling again gets the next value
245     assert_eq!("two", assert_recv!(rx2));
246 
247     assert_eq!("two", assert_recv!(rx1));
248     assert_eq!("three", assert_recv!(rx1));
249 
250     assert_ok!(tx.send("four"));
251     assert_ok!(tx.send("five"));
252 
253     assert_lagged!(rx2.try_recv(), 1);
254 
255     assert_ok!(tx.send("six"));
256 
257     assert_lagged!(rx2.try_recv(), 1);
258 }
259 
260 #[test]
send_no_rx()261 fn send_no_rx() {
262     let (tx, _) = broadcast::channel(16);
263 
264     assert_err!(tx.send("hello"));
265 
266     let mut rx = tx.subscribe();
267 
268     assert_ok!(tx.send("world"));
269 
270     let val = assert_recv!(rx);
271     assert_eq!("world", val);
272 }
273 
274 #[test]
275 #[should_panic]
zero_capacity()276 fn zero_capacity() {
277     broadcast::channel::<()>(0);
278 }
279 
280 #[test]
281 #[should_panic]
capacity_too_big()282 fn capacity_too_big() {
283     use std::usize;
284 
285     broadcast::channel::<()>(1 + (usize::MAX >> 1));
286 }
287 
288 #[test]
289 #[cfg(not(target_os = "android"))]
panic_in_clone()290 fn panic_in_clone() {
291     use std::panic::{self, AssertUnwindSafe};
292 
293     #[derive(Eq, PartialEq, Debug)]
294     struct MyVal(usize);
295 
296     impl Clone for MyVal {
297         fn clone(&self) -> MyVal {
298             assert_ne!(0, self.0);
299             MyVal(self.0)
300         }
301     }
302 
303     let (tx, mut rx) = broadcast::channel(16);
304 
305     assert_ok!(tx.send(MyVal(0)));
306     assert_ok!(tx.send(MyVal(1)));
307 
308     let res = panic::catch_unwind(AssertUnwindSafe(|| {
309         let _ = rx.try_recv();
310     }));
311 
312     assert_err!(res);
313 
314     let val = assert_recv!(rx);
315     assert_eq!(val, MyVal(1));
316 }
317 
318 #[test]
dropping_tx_notifies_rx()319 fn dropping_tx_notifies_rx() {
320     let (tx, mut rx1) = broadcast::channel::<()>(16);
321     let mut rx2 = tx.subscribe();
322 
323     let tx2 = tx.clone();
324 
325     let mut recv1 = task::spawn(rx1.recv());
326     let mut recv2 = task::spawn(rx2.recv());
327 
328     assert_pending!(recv1.poll());
329     assert_pending!(recv2.poll());
330 
331     drop(tx);
332 
333     assert_pending!(recv1.poll());
334     assert_pending!(recv2.poll());
335 
336     drop(tx2);
337 
338     assert!(recv1.is_woken());
339     assert!(recv2.is_woken());
340 
341     let err = assert_ready_err!(recv1.poll());
342     assert!(is_closed(err));
343 
344     let err = assert_ready_err!(recv2.poll());
345     assert!(is_closed(err));
346 }
347 
348 #[test]
unconsumed_messages_are_dropped()349 fn unconsumed_messages_are_dropped() {
350     let (tx, rx) = broadcast::channel(16);
351 
352     let msg = Arc::new(());
353 
354     assert_ok!(tx.send(msg.clone()));
355 
356     assert_eq!(2, Arc::strong_count(&msg));
357 
358     drop(rx);
359 
360     assert_eq!(1, Arc::strong_count(&msg));
361 }
362 
363 #[test]
single_capacity_recvs()364 fn single_capacity_recvs() {
365     let (tx, mut rx) = broadcast::channel(1);
366 
367     assert_ok!(tx.send(1));
368 
369     assert_eq!(assert_recv!(rx), 1);
370     assert_empty!(rx);
371 }
372 
373 #[test]
single_capacity_recvs_after_drop_1()374 fn single_capacity_recvs_after_drop_1() {
375     let (tx, mut rx) = broadcast::channel(1);
376 
377     assert_ok!(tx.send(1));
378     drop(tx);
379 
380     assert_eq!(assert_recv!(rx), 1);
381     assert_closed!(rx.try_recv());
382 }
383 
384 #[test]
single_capacity_recvs_after_drop_2()385 fn single_capacity_recvs_after_drop_2() {
386     let (tx, mut rx) = broadcast::channel(1);
387 
388     assert_ok!(tx.send(1));
389     assert_ok!(tx.send(2));
390     drop(tx);
391 
392     assert_lagged!(rx.try_recv(), 1);
393     assert_eq!(assert_recv!(rx), 2);
394     assert_closed!(rx.try_recv());
395 }
396 
397 #[test]
dropping_sender_does_not_overwrite()398 fn dropping_sender_does_not_overwrite() {
399     let (tx, mut rx) = broadcast::channel(2);
400 
401     assert_ok!(tx.send(1));
402     assert_ok!(tx.send(2));
403     drop(tx);
404 
405     assert_eq!(assert_recv!(rx), 1);
406     assert_eq!(assert_recv!(rx), 2);
407     assert_closed!(rx.try_recv());
408 }
409 
410 #[test]
lagging_receiver_recovers_after_wrap_closed_1()411 fn lagging_receiver_recovers_after_wrap_closed_1() {
412     let (tx, mut rx) = broadcast::channel(2);
413 
414     assert_ok!(tx.send(1));
415     assert_ok!(tx.send(2));
416     assert_ok!(tx.send(3));
417     drop(tx);
418 
419     assert_lagged!(rx.try_recv(), 1);
420     assert_eq!(assert_recv!(rx), 2);
421     assert_eq!(assert_recv!(rx), 3);
422     assert_closed!(rx.try_recv());
423 }
424 
425 #[test]
lagging_receiver_recovers_after_wrap_closed_2()426 fn lagging_receiver_recovers_after_wrap_closed_2() {
427     let (tx, mut rx) = broadcast::channel(2);
428 
429     assert_ok!(tx.send(1));
430     assert_ok!(tx.send(2));
431     assert_ok!(tx.send(3));
432     assert_ok!(tx.send(4));
433     drop(tx);
434 
435     assert_lagged!(rx.try_recv(), 2);
436     assert_eq!(assert_recv!(rx), 3);
437     assert_eq!(assert_recv!(rx), 4);
438     assert_closed!(rx.try_recv());
439 }
440 
441 #[test]
lagging_receiver_recovers_after_wrap_open()442 fn lagging_receiver_recovers_after_wrap_open() {
443     let (tx, mut rx) = broadcast::channel(2);
444 
445     assert_ok!(tx.send(1));
446     assert_ok!(tx.send(2));
447     assert_ok!(tx.send(3));
448 
449     assert_lagged!(rx.try_recv(), 1);
450     assert_eq!(assert_recv!(rx), 2);
451     assert_eq!(assert_recv!(rx), 3);
452     assert_empty!(rx);
453 }
454 
is_closed(err: broadcast::error::RecvError) -> bool455 fn is_closed(err: broadcast::error::RecvError) -> bool {
456     matches!(err, broadcast::error::RecvError::Closed)
457 }
458