1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "sync")]
4
5 use tokio::sync::broadcast;
6 use tokio_test::task;
7 use tokio_test::{
8 assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
9 };
10
11 use std::sync::Arc;
12
13 macro_rules! assert_recv {
14 ($e:expr) => {
15 match $e.try_recv() {
16 Ok(value) => value,
17 Err(e) => panic!("expected recv; got = {:?}", e),
18 }
19 };
20 }
21
22 macro_rules! assert_empty {
23 ($e:expr) => {
24 match $e.try_recv() {
25 Ok(value) => panic!("expected empty; got = {:?}", value),
26 Err(broadcast::error::TryRecvError::Empty) => {}
27 Err(e) => panic!("expected empty; got = {:?}", e),
28 }
29 };
30 }
31
32 macro_rules! assert_lagged {
33 ($e:expr, $n:expr) => {
34 match assert_err!($e) {
35 broadcast::error::TryRecvError::Lagged(n) => {
36 assert_eq!(n, $n);
37 }
38 _ => panic!("did not lag"),
39 }
40 };
41 }
42
43 macro_rules! assert_closed {
44 ($e:expr) => {
45 match assert_err!($e) {
46 broadcast::error::TryRecvError::Closed => {}
47 _ => panic!("did not lag"),
48 }
49 };
50 }
51
52 trait AssertSend: Send + Sync {}
53 impl AssertSend for broadcast::Sender<i32> {}
54 impl AssertSend for broadcast::Receiver<i32> {}
55
56 #[test]
send_try_recv_bounded()57 fn send_try_recv_bounded() {
58 let (tx, mut rx) = broadcast::channel(16);
59
60 assert_empty!(rx);
61
62 let n = assert_ok!(tx.send("hello"));
63 assert_eq!(n, 1);
64
65 let val = assert_recv!(rx);
66 assert_eq!(val, "hello");
67
68 assert_empty!(rx);
69 }
70
71 #[test]
send_two_recv()72 fn send_two_recv() {
73 let (tx, mut rx1) = broadcast::channel(16);
74 let mut rx2 = tx.subscribe();
75
76 assert_empty!(rx1);
77 assert_empty!(rx2);
78
79 let n = assert_ok!(tx.send("hello"));
80 assert_eq!(n, 2);
81
82 let val = assert_recv!(rx1);
83 assert_eq!(val, "hello");
84
85 let val = assert_recv!(rx2);
86 assert_eq!(val, "hello");
87
88 assert_empty!(rx1);
89 assert_empty!(rx2);
90 }
91
92 #[test]
send_recv_bounded()93 fn send_recv_bounded() {
94 let (tx, mut rx) = broadcast::channel(16);
95
96 let mut recv = task::spawn(rx.recv());
97
98 assert_pending!(recv.poll());
99
100 assert_ok!(tx.send("hello"));
101
102 assert!(recv.is_woken());
103 let val = assert_ready_ok!(recv.poll());
104 assert_eq!(val, "hello");
105 }
106
107 #[test]
send_two_recv_bounded()108 fn send_two_recv_bounded() {
109 let (tx, mut rx1) = broadcast::channel(16);
110 let mut rx2 = tx.subscribe();
111
112 let mut recv1 = task::spawn(rx1.recv());
113 let mut recv2 = task::spawn(rx2.recv());
114
115 assert_pending!(recv1.poll());
116 assert_pending!(recv2.poll());
117
118 assert_ok!(tx.send("hello"));
119
120 assert!(recv1.is_woken());
121 assert!(recv2.is_woken());
122
123 let val1 = assert_ready_ok!(recv1.poll());
124 let val2 = assert_ready_ok!(recv2.poll());
125 assert_eq!(val1, "hello");
126 assert_eq!(val2, "hello");
127
128 drop((recv1, recv2));
129
130 let mut recv1 = task::spawn(rx1.recv());
131 let mut recv2 = task::spawn(rx2.recv());
132
133 assert_pending!(recv1.poll());
134
135 assert_ok!(tx.send("world"));
136
137 assert!(recv1.is_woken());
138 assert!(!recv2.is_woken());
139
140 let val1 = assert_ready_ok!(recv1.poll());
141 let val2 = assert_ready_ok!(recv2.poll());
142 assert_eq!(val1, "world");
143 assert_eq!(val2, "world");
144 }
145
146 #[test]
change_tasks()147 fn change_tasks() {
148 let (tx, mut rx) = broadcast::channel(1);
149
150 let mut recv = Box::pin(rx.recv());
151
152 let mut task1 = task::spawn(&mut recv);
153 assert_pending!(task1.poll());
154
155 let mut task2 = task::spawn(&mut recv);
156 assert_pending!(task2.poll());
157
158 tx.send("hello").unwrap();
159
160 assert!(task2.is_woken());
161 }
162
163 #[test]
send_slow_rx()164 fn send_slow_rx() {
165 let (tx, mut rx1) = broadcast::channel(16);
166 let mut rx2 = tx.subscribe();
167
168 {
169 let mut recv2 = task::spawn(rx2.recv());
170
171 {
172 let mut recv1 = task::spawn(rx1.recv());
173
174 assert_pending!(recv1.poll());
175 assert_pending!(recv2.poll());
176
177 assert_ok!(tx.send("one"));
178
179 assert!(recv1.is_woken());
180 assert!(recv2.is_woken());
181
182 assert_ok!(tx.send("two"));
183
184 let val = assert_ready_ok!(recv1.poll());
185 assert_eq!(val, "one");
186 }
187
188 let val = assert_ready_ok!(task::spawn(rx1.recv()).poll());
189 assert_eq!(val, "two");
190
191 let mut recv1 = task::spawn(rx1.recv());
192
193 assert_pending!(recv1.poll());
194
195 assert_ok!(tx.send("three"));
196
197 assert!(recv1.is_woken());
198
199 let val = assert_ready_ok!(recv1.poll());
200 assert_eq!(val, "three");
201
202 let val = assert_ready_ok!(recv2.poll());
203 assert_eq!(val, "one");
204 }
205
206 let val = assert_recv!(rx2);
207 assert_eq!(val, "two");
208
209 let val = assert_recv!(rx2);
210 assert_eq!(val, "three");
211 }
212
213 #[test]
drop_rx_while_values_remain()214 fn drop_rx_while_values_remain() {
215 let (tx, mut rx1) = broadcast::channel(16);
216 let mut rx2 = tx.subscribe();
217
218 assert_ok!(tx.send("one"));
219 assert_ok!(tx.send("two"));
220
221 assert_recv!(rx1);
222 assert_recv!(rx2);
223
224 drop(rx2);
225 drop(rx1);
226 }
227
228 #[test]
lagging_rx()229 fn lagging_rx() {
230 let (tx, mut rx1) = broadcast::channel(2);
231 let mut rx2 = tx.subscribe();
232
233 assert_ok!(tx.send("one"));
234 assert_ok!(tx.send("two"));
235
236 assert_eq!("one", assert_recv!(rx1));
237
238 assert_ok!(tx.send("three"));
239
240 // Lagged too far
241 let x = dbg!(rx2.try_recv());
242 assert_lagged!(x, 1);
243
244 // Calling again gets the next value
245 assert_eq!("two", assert_recv!(rx2));
246
247 assert_eq!("two", assert_recv!(rx1));
248 assert_eq!("three", assert_recv!(rx1));
249
250 assert_ok!(tx.send("four"));
251 assert_ok!(tx.send("five"));
252
253 assert_lagged!(rx2.try_recv(), 1);
254
255 assert_ok!(tx.send("six"));
256
257 assert_lagged!(rx2.try_recv(), 1);
258 }
259
260 #[test]
send_no_rx()261 fn send_no_rx() {
262 let (tx, _) = broadcast::channel(16);
263
264 assert_err!(tx.send("hello"));
265
266 let mut rx = tx.subscribe();
267
268 assert_ok!(tx.send("world"));
269
270 let val = assert_recv!(rx);
271 assert_eq!("world", val);
272 }
273
274 #[test]
275 #[should_panic]
zero_capacity()276 fn zero_capacity() {
277 broadcast::channel::<()>(0);
278 }
279
280 #[test]
281 #[should_panic]
capacity_too_big()282 fn capacity_too_big() {
283 use std::usize;
284
285 broadcast::channel::<()>(1 + (usize::MAX >> 1));
286 }
287
288 #[test]
289 #[cfg(not(target_os = "android"))]
panic_in_clone()290 fn panic_in_clone() {
291 use std::panic::{self, AssertUnwindSafe};
292
293 #[derive(Eq, PartialEq, Debug)]
294 struct MyVal(usize);
295
296 impl Clone for MyVal {
297 fn clone(&self) -> MyVal {
298 assert_ne!(0, self.0);
299 MyVal(self.0)
300 }
301 }
302
303 let (tx, mut rx) = broadcast::channel(16);
304
305 assert_ok!(tx.send(MyVal(0)));
306 assert_ok!(tx.send(MyVal(1)));
307
308 let res = panic::catch_unwind(AssertUnwindSafe(|| {
309 let _ = rx.try_recv();
310 }));
311
312 assert_err!(res);
313
314 let val = assert_recv!(rx);
315 assert_eq!(val, MyVal(1));
316 }
317
318 #[test]
dropping_tx_notifies_rx()319 fn dropping_tx_notifies_rx() {
320 let (tx, mut rx1) = broadcast::channel::<()>(16);
321 let mut rx2 = tx.subscribe();
322
323 let tx2 = tx.clone();
324
325 let mut recv1 = task::spawn(rx1.recv());
326 let mut recv2 = task::spawn(rx2.recv());
327
328 assert_pending!(recv1.poll());
329 assert_pending!(recv2.poll());
330
331 drop(tx);
332
333 assert_pending!(recv1.poll());
334 assert_pending!(recv2.poll());
335
336 drop(tx2);
337
338 assert!(recv1.is_woken());
339 assert!(recv2.is_woken());
340
341 let err = assert_ready_err!(recv1.poll());
342 assert!(is_closed(err));
343
344 let err = assert_ready_err!(recv2.poll());
345 assert!(is_closed(err));
346 }
347
348 #[test]
unconsumed_messages_are_dropped()349 fn unconsumed_messages_are_dropped() {
350 let (tx, rx) = broadcast::channel(16);
351
352 let msg = Arc::new(());
353
354 assert_ok!(tx.send(msg.clone()));
355
356 assert_eq!(2, Arc::strong_count(&msg));
357
358 drop(rx);
359
360 assert_eq!(1, Arc::strong_count(&msg));
361 }
362
363 #[test]
single_capacity_recvs()364 fn single_capacity_recvs() {
365 let (tx, mut rx) = broadcast::channel(1);
366
367 assert_ok!(tx.send(1));
368
369 assert_eq!(assert_recv!(rx), 1);
370 assert_empty!(rx);
371 }
372
373 #[test]
single_capacity_recvs_after_drop_1()374 fn single_capacity_recvs_after_drop_1() {
375 let (tx, mut rx) = broadcast::channel(1);
376
377 assert_ok!(tx.send(1));
378 drop(tx);
379
380 assert_eq!(assert_recv!(rx), 1);
381 assert_closed!(rx.try_recv());
382 }
383
384 #[test]
single_capacity_recvs_after_drop_2()385 fn single_capacity_recvs_after_drop_2() {
386 let (tx, mut rx) = broadcast::channel(1);
387
388 assert_ok!(tx.send(1));
389 assert_ok!(tx.send(2));
390 drop(tx);
391
392 assert_lagged!(rx.try_recv(), 1);
393 assert_eq!(assert_recv!(rx), 2);
394 assert_closed!(rx.try_recv());
395 }
396
397 #[test]
dropping_sender_does_not_overwrite()398 fn dropping_sender_does_not_overwrite() {
399 let (tx, mut rx) = broadcast::channel(2);
400
401 assert_ok!(tx.send(1));
402 assert_ok!(tx.send(2));
403 drop(tx);
404
405 assert_eq!(assert_recv!(rx), 1);
406 assert_eq!(assert_recv!(rx), 2);
407 assert_closed!(rx.try_recv());
408 }
409
410 #[test]
lagging_receiver_recovers_after_wrap_closed_1()411 fn lagging_receiver_recovers_after_wrap_closed_1() {
412 let (tx, mut rx) = broadcast::channel(2);
413
414 assert_ok!(tx.send(1));
415 assert_ok!(tx.send(2));
416 assert_ok!(tx.send(3));
417 drop(tx);
418
419 assert_lagged!(rx.try_recv(), 1);
420 assert_eq!(assert_recv!(rx), 2);
421 assert_eq!(assert_recv!(rx), 3);
422 assert_closed!(rx.try_recv());
423 }
424
425 #[test]
lagging_receiver_recovers_after_wrap_closed_2()426 fn lagging_receiver_recovers_after_wrap_closed_2() {
427 let (tx, mut rx) = broadcast::channel(2);
428
429 assert_ok!(tx.send(1));
430 assert_ok!(tx.send(2));
431 assert_ok!(tx.send(3));
432 assert_ok!(tx.send(4));
433 drop(tx);
434
435 assert_lagged!(rx.try_recv(), 2);
436 assert_eq!(assert_recv!(rx), 3);
437 assert_eq!(assert_recv!(rx), 4);
438 assert_closed!(rx.try_recv());
439 }
440
441 #[test]
lagging_receiver_recovers_after_wrap_open()442 fn lagging_receiver_recovers_after_wrap_open() {
443 let (tx, mut rx) = broadcast::channel(2);
444
445 assert_ok!(tx.send(1));
446 assert_ok!(tx.send(2));
447 assert_ok!(tx.send(3));
448
449 assert_lagged!(rx.try_recv(), 1);
450 assert_eq!(assert_recv!(rx), 2);
451 assert_eq!(assert_recv!(rx), 3);
452 assert_empty!(rx);
453 }
454
is_closed(err: broadcast::error::RecvError) -> bool455 fn is_closed(err: broadcast::error::RecvError) -> bool {
456 matches!(err, broadcast::error::RecvError::Closed)
457 }
458