1 //! Tests for channel selection using the `Select` struct.
2
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
9 use crossbeam_utils::thread::scope;
10
/// Shorthand used throughout these tests: builds a [`Duration`] of `ms` milliseconds.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
14
/// With two unbounded receivers registered, the selection must pick the one
/// whose channel currently holds a message.
#[test]
fn smoke1() {
    let (s1, r1) = unbounded::<usize>();
    let (s2, r2) = unbounded::<usize>();

    // Only the first channel has a message queued.
    s1.send(1).unwrap();

    let mut selector = Select::new();
    let first = selector.recv(&r1);
    let second = selector.recv(&r2);
    let selected = selector.select();
    let idx = selected.index();
    if idx == first {
        assert_eq!(selected.recv(&r1), Ok(1));
    } else if idx == second {
        panic!();
    } else {
        unreachable!();
    }

    // Now only the second channel has a message queued.
    s2.send(2).unwrap();

    let mut selector = Select::new();
    let first = selector.recv(&r1);
    let second = selector.recv(&r2);
    let selected = selector.select();
    let idx = selected.index();
    if idx == first {
        panic!();
    } else if idx == second {
        assert_eq!(selected.recv(&r2), Ok(2));
    } else {
        unreachable!();
    }
}
44
/// Out of five registered receivers only the last one has a message, so the
/// selection must report that operation.
#[test]
fn smoke2() {
    // The senders of the idle channels are kept alive so they stay connected.
    let (_s1, r1) = unbounded::<i32>();
    let (_s2, r2) = unbounded::<i32>();
    let (_s3, r3) = unbounded::<i32>();
    let (_s4, r4) = unbounded::<i32>();
    let (s5, r5) = unbounded::<i32>();

    s5.send(5).unwrap();

    let mut selector = Select::new();
    let idle = [
        selector.recv(&r1),
        selector.recv(&r2),
        selector.recv(&r3),
        selector.recv(&r4),
    ];
    let ready = selector.recv(&r5);
    let selected = selector.select();
    let idx = selected.index();
    if idx == ready {
        assert_eq!(selected.recv(&r5), Ok(5));
    } else if idle.contains(&idx) {
        panic!();
    } else {
        unreachable!();
    }
}
71
/// A receive on a channel whose sender was dropped is selected and completes
/// with an error, both while another thread is running and afterwards.
#[test]
fn disconnected() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|sc| {
        sc.spawn(|_| {
            drop(s1);
            thread::sleep(ms(500));
            s2.send(5).unwrap();
        });

        let mut selector = Select::new();
        let first = selector.recv(&r1);
        let second = selector.recv(&r2);
        let outcome = selector.select_timeout(ms(1000));
        match outcome {
            Ok(selected) => {
                let idx = selected.index();
                if idx == first {
                    assert!(selected.recv(&r1).is_err());
                } else if idx == second {
                    panic!();
                } else {
                    unreachable!();
                }
            }
            Err(_) => panic!(),
        }

        // Drain the message the helper thread sends on the second channel.
        r2.recv().unwrap();
    })
    .unwrap();

    // The first channel stays disconnected; the second one is empty again.
    let mut selector = Select::new();
    let first = selector.recv(&r1);
    let second = selector.recv(&r2);
    let outcome = selector.select_timeout(ms(1000));
    match outcome {
        Ok(selected) => {
            let idx = selected.index();
            if idx == first {
                assert!(selected.recv(&r1).is_err());
            } else if idx == second {
                panic!();
            } else {
                unreachable!();
            }
        }
        Err(_) => panic!(),
    }

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            drop(s2);
        });

        // Only the second channel is registered; its sender is dropped mid-wait.
        let mut selector = Select::new();
        let only = selector.recv(&r2);
        let outcome = selector.select_timeout(ms(1000));
        match outcome {
            Ok(selected) => {
                if selected.index() == only {
                    assert!(selected.recv(&r2).is_err());
                } else {
                    unreachable!();
                }
            }
            Err(_) => panic!(),
        }
    })
    .unwrap();
}
133
/// Exercises `try_select`: it must fail when nothing is ready and succeed when a
/// channel is disconnected or holds a message.
#[test]
fn default() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    // Both channels are empty and connected: nothing can be selected.
    let mut selector = Select::new();
    let _first = selector.recv(&r1);
    let _second = selector.recv(&r2);
    let outcome = selector.try_select();
    match outcome {
        Ok(_) => panic!(),
        Err(_) => {}
    }

    drop(s1);

    // The first channel is now disconnected, so its receive is selected.
    let mut selector = Select::new();
    let first = selector.recv(&r1);
    let second = selector.recv(&r2);
    let outcome = selector.try_select();
    match outcome {
        Ok(selected) => {
            let idx = selected.index();
            if idx == first {
                assert!(selected.recv(&r1).is_err());
            } else if idx == second {
                panic!();
            } else {
                unreachable!();
            }
        }
        Err(_) => panic!(),
    }

    s2.send(2).unwrap();

    // A queued message makes the receive on the second channel selectable.
    let mut selector = Select::new();
    let only = selector.recv(&r2);
    let outcome = selector.try_select();
    match outcome {
        Ok(selected) => {
            if selected.index() == only {
                assert_eq!(selected.recv(&r2), Ok(2));
            } else {
                unreachable!();
            }
        }
        Err(_) => panic!(),
    }

    // The message has been consumed, so the channel is empty again.
    let mut selector = Select::new();
    let _only = selector.recv(&r2);
    let outcome = selector.try_select();
    match outcome {
        Ok(_) => panic!(),
        Err(_) => {}
    }

    // A selector with no registered operations never selects anything.
    let mut selector = Select::new();
    let outcome = selector.try_select();
    match outcome {
        Ok(_) => panic!(),
        Err(_) => {}
    }
}
191
/// Exercises `select_timeout`: it must time out while no message is available
/// and succeed once a message arrives within the deadline.
#[test]
fn timeout() {
    let (_s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(1500));
            s2.send(2).unwrap();
        });

        // The message is sent only after 1500 ms, so a 1000 ms wait times out.
        let mut selector = Select::new();
        let first = selector.recv(&r1);
        let second = selector.recv(&r2);
        let outcome = selector.select_timeout(ms(1000));
        match outcome {
            Ok(selected) => {
                let idx = selected.index();
                if idx == first || idx == second {
                    panic!();
                } else {
                    unreachable!();
                }
            }
            Err(_) => {}
        }

        // A second 1000 ms wait covers the moment the message is sent.
        let mut selector = Select::new();
        let first = selector.recv(&r1);
        let second = selector.recv(&r2);
        let outcome = selector.select_timeout(ms(1000));
        match outcome {
            Ok(selected) => {
                let idx = selected.index();
                if idx == first {
                    panic!();
                } else if idx == second {
                    assert_eq!(selected.recv(&r2), Ok(2));
                } else {
                    unreachable!();
                }
            }
            Err(_) => panic!(),
        }
    })
    .unwrap();

    scope(|sc| {
        let (s, r) = unbounded::<i32>();

        sc.spawn(move |_| {
            thread::sleep(ms(500));
            drop(s);
        });

        // A selector without operations can only time out.
        let mut empty = Select::new();
        let outcome = empty.select_timeout(ms(1000));
        match outcome {
            Ok(_) => unreachable!(),
            Err(_) => {
                // By now the helper thread has dropped the sender.
                let mut selector = Select::new();
                let only = selector.recv(&r);
                let inner = selector.try_select();
                match inner {
                    Ok(selected) => {
                        if selected.index() == only {
                            assert!(selected.recv(&r).is_err());
                        } else {
                            unreachable!();
                        }
                    }
                    Err(_) => panic!(),
                }
            }
        }
    })
    .unwrap();
}
259
/// On a channel whose other half was dropped immediately, both `try_select` and
/// `select_timeout` must select the operation, which then completes with an error.
#[test]
fn default_when_disconnected() {
    // Receive side, non-blocking selection.
    let (_, r) = unbounded::<i32>();

    let mut selector = Select::new();
    let only = selector.recv(&r);
    let outcome = selector.try_select();
    match outcome {
        Ok(selected) => {
            if selected.index() == only {
                assert!(selected.recv(&r).is_err());
            } else {
                unreachable!();
            }
        }
        Err(_) => panic!(),
    }

    // Receive side, selection with a timeout.
    let (_, r) = unbounded::<i32>();

    let mut selector = Select::new();
    let only = selector.recv(&r);
    let outcome = selector.select_timeout(ms(1000));
    match outcome {
        Ok(selected) => {
            if selected.index() == only {
                assert!(selected.recv(&r).is_err());
            } else {
                unreachable!();
            }
        }
        Err(_) => panic!(),
    }

    // Send side, non-blocking selection.
    let (s, _) = bounded::<i32>(0);

    let mut selector = Select::new();
    let only = selector.send(&s);
    let outcome = selector.try_select();
    match outcome {
        Ok(selected) => {
            if selected.index() == only {
                assert!(selected.send(&s, 0).is_err());
            } else {
                unreachable!();
            }
        }
        Err(_) => panic!(),
    }

    // Send side, selection with a timeout.
    let (s, _) = bounded::<i32>(0);

    let mut selector = Select::new();
    let only = selector.send(&s);
    let outcome = selector.select_timeout(ms(1000));
    match outcome {
        Ok(selected) => {
            if selected.index() == only {
                assert!(selected.send(&s, 0).is_err());
            } else {
                unreachable!();
            }
        }
        Err(_) => panic!(),
    }
}
314
/// A selector with no operations must fail immediately for `try_select` and
/// after roughly the requested duration for `select_timeout`.
#[test]
fn default_only() {
    // Non-blocking attempt: expected to return within 50 ms.
    let begin = Instant::now();

    let mut selector = Select::new();
    let outcome = selector.try_select();
    assert!(outcome.is_err());
    let elapsed = Instant::now().duration_since(begin);
    assert!(elapsed <= ms(50));

    // 500 ms timeout: expected to return within a +/- 50 ms window.
    let begin = Instant::now();
    let mut selector = Select::new();
    let outcome = selector.select_timeout(ms(500));
    assert!(outcome.is_err());
    let elapsed = Instant::now().duration_since(begin);
    assert!(elapsed >= ms(450));
    assert!(elapsed <= ms(550));
}
333
/// A blocked selection over zero-capacity channels must wake up once another
/// thread starts the matching send or receive.
#[test]
fn unblocks() {
    let (s1, r1) = bounded::<i32>(0);
    let (s2, r2) = bounded::<i32>(0);

    // Blocked receive is woken by a send on the second channel.
    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            s2.send(2).unwrap();
        });

        let mut selector = Select::new();
        let first = selector.recv(&r1);
        let second = selector.recv(&r2);
        let outcome = selector.select_timeout(ms(1000));
        match outcome {
            Ok(selected) => {
                let idx = selected.index();
                if idx == first {
                    panic!();
                } else if idx == second {
                    assert_eq!(selected.recv(&r2), Ok(2));
                } else {
                    unreachable!();
                }
            }
            Err(_) => panic!(),
        }
    })
    .unwrap();

    // Blocked send is woken by a receive on the first channel.
    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            assert_eq!(r1.recv().unwrap(), 1);
        });

        let mut selector = Select::new();
        let first = selector.send(&s1);
        let second = selector.send(&s2);
        let outcome = selector.select_timeout(ms(1000));
        match outcome {
            Ok(selected) => {
                let idx = selected.index();
                if idx == first {
                    selected.send(&s1, 1).unwrap();
                } else if idx == second {
                    panic!();
                } else {
                    unreachable!();
                }
            }
            Err(_) => panic!(),
        }
    })
    .unwrap();
}
381
/// Two consecutive selections over a receive and a send must together complete
/// both the helper thread's send and its receive.
#[test]
fn both_ready() {
    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            s1.send(1).unwrap();
            assert_eq!(r2.recv().unwrap(), 2);
        });

        for _ in 0..2 {
            let mut selector = Select::new();
            let recv_op = selector.recv(&r1);
            let send_op = selector.send(&s2);
            let selected = selector.select();
            let idx = selected.index();
            if idx == recv_op {
                assert_eq!(selected.recv(&r1), Ok(1));
            } else if idx == send_op {
                selected.send(&s2, 2).unwrap();
            } else {
                unreachable!();
            }
        }
    })
    .unwrap();
}
408
/// Two threads spin on `try_select`/`try_recv` while a third performs a single
/// blocking selection and then drops `s_end` to tell the spinners to stop.
#[test]
fn loop_try() {
    const RUNS: usize = 20;

    for _ in 0..RUNS {
        let (s1, r1) = bounded::<i32>(0);
        let (s2, r2) = bounded::<i32>(0);
        let (s_end, r_end) = bounded::<()>(0);

        scope(|sc| {
            // Spinner that keeps trying to send on `s1`.
            sc.spawn(|_| loop {
                let mut selector = Select::new();
                let send_op = selector.send(&s1);
                let outcome = selector.try_select();
                if let Ok(selected) = outcome {
                    if selected.index() == send_op {
                        let _ = selected.send(&s1, 1);
                        break;
                    } else {
                        unreachable!();
                    }
                }

                // Stop as soon as the end channel becomes selectable.
                let mut selector = Select::new();
                let end_op = selector.recv(&r_end);
                let outcome = selector.try_select();
                if let Ok(selected) = outcome {
                    if selected.index() == end_op {
                        let _ = selected.recv(&r_end);
                        break;
                    } else {
                        unreachable!();
                    }
                }
            });

            // Spinner that keeps trying to receive from `r2`.
            sc.spawn(|_| loop {
                if let Ok(x) = r2.try_recv() {
                    assert_eq!(x, 2);
                    break;
                }

                let mut selector = Select::new();
                let end_op = selector.recv(&r_end);
                let outcome = selector.try_select();
                if let Ok(selected) = outcome {
                    if selected.index() == end_op {
                        let _ = selected.recv(&r_end);
                        break;
                    } else {
                        unreachable!();
                    }
                }
            });

            // Performs one selection against the spinners, then signals the end.
            sc.spawn(|_| {
                thread::sleep(ms(500));

                let mut selector = Select::new();
                let recv_op = selector.recv(&r1);
                let send_op = selector.send(&s2);
                let outcome = selector.select_timeout(ms(1000));
                if let Ok(selected) = outcome {
                    let idx = selected.index();
                    if idx == recv_op {
                        assert_eq!(selected.recv(&r1), Ok(1));
                    } else if idx == send_op {
                        assert!(selected.send(&s2, 2).is_ok());
                    } else {
                        unreachable!();
                    }
                }

                drop(s_end);
            });
        })
        .unwrap();
    }
}
504
/// Creating and dropping a clone of a sender in another thread must not disturb
/// a selection that is waiting on the corresponding receiver.
#[test]
fn cloning1() {
    scope(|sc| {
        let (s1, r1) = unbounded::<i32>();
        let (_s2, r2) = unbounded::<i32>();
        let (s3, r3) = unbounded::<()>();

        sc.spawn(move |_| {
            r3.recv().unwrap();
            drop(s1.clone());
            // The second handshake message must not have been sent yet.
            assert!(r3.try_recv().is_err());
            s1.send(1).unwrap();
            r3.recv().unwrap();
        });

        s3.send(()).unwrap();

        let mut selector = Select::new();
        let first = selector.recv(&r1);
        let second = selector.recv(&r2);
        let selected = selector.select();
        let idx = selected.index();
        if idx == first {
            drop(selected.recv(&r1));
        } else if idx == second {
            drop(selected.recv(&r2));
        } else {
            unreachable!();
        }

        s3.send(()).unwrap();
    })
    .unwrap();
}
536
/// Dropping a clone of `s1` while another thread is selecting must not make the
/// receive on `r1` fire; only the message sent on `s2` may be selected.
#[test]
fn cloning2() {
    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = unbounded::<()>();
    let (_s3, _r3) = unbounded::<()>();

    scope(|sc| {
        sc.spawn(move |_| {
            let mut selector = Select::new();
            let first = selector.recv(&r1);
            let second = selector.recv(&r2);
            let selected = selector.select();
            let idx = selected.index();
            if idx == first {
                panic!();
            } else if idx == second {
                drop(selected.recv(&r2));
            } else {
                unreachable!();
            }
        });

        thread::sleep(ms(500));
        drop(s1.clone());
        s2.send(()).unwrap();
    })
    .unwrap();
}
562
/// A message queued before the selection starts must be selectable right away.
#[test]
fn preflight1() {
    let (s, r) = unbounded();
    s.send(()).unwrap();

    let mut selector = Select::new();
    let only = selector.recv(&r);
    let selected = selector.select();
    if selected.index() == only {
        drop(selected.recv(&r));
    } else {
        unreachable!();
    }
}
576
/// A message queued before all senders were dropped must still be delivered by
/// the selection; only afterwards does the channel report disconnection.
#[test]
fn preflight2() {
    let (s, r) = unbounded();
    drop(s.clone());
    s.send(()).unwrap();
    drop(s);

    let mut selector = Select::new();
    let only = selector.recv(&r);
    let selected = selector.select();
    if selected.index() == only {
        assert_eq!(selected.recv(&r), Ok(()));
    } else {
        unreachable!();
    }

    assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
}
594
/// Once the channel has been drained and every sender dropped, the selected
/// receive must complete with an error.
#[test]
fn preflight3() {
    let (s, r) = unbounded();
    drop(s.clone());
    s.send(()).unwrap();
    drop(s);
    r.recv().unwrap();

    let mut selector = Select::new();
    let only = selector.recv(&r);
    let selected = selector.select();
    if selected.index() == only {
        assert!(selected.recv(&r).is_err());
    } else {
        unreachable!();
    }
}
611
/// Registers the same receiver twice and the same sender twice, and keeps
/// selecting until each of the four operations has been chosen at least once.
#[test]
fn duplicate_operations() {
    let (s, r) = unbounded::<i32>();
    let seen = vec![Cell::new(false); 4];

    // Loop until every slot has been marked.
    while !seen.iter().all(|flag| flag.get()) {
        let mut selector = Select::new();
        let recv_a = selector.recv(&r);
        let recv_b = selector.recv(&r);
        let send_a = selector.send(&s);
        let send_b = selector.send(&s);
        let selected = selector.select();
        let idx = selected.index();
        if idx == recv_a {
            assert!(selected.recv(&r).is_ok());
            seen[0].set(true);
        } else if idx == recv_b {
            assert!(selected.recv(&r).is_ok());
            seen[1].set(true);
        } else if idx == send_a {
            assert!(selected.send(&s, 0).is_ok());
            seen[2].set(true);
        } else if idx == send_b {
            assert!(selected.send(&s, 0).is_ok());
            seen[3].set(true);
        } else {
            unreachable!();
        }
    }
}
645
/// Runs four selections nested inside each other's handlers, alternating
/// between sending a value and receiving it back on one unbounded channel.
#[test]
fn nesting() {
    let (s, r) = unbounded::<i32>();

    let mut outer = Select::new();
    let outer_op = outer.send(&s);
    let outer_sel = outer.select();
    if outer_sel.index() == outer_op {
        assert!(outer_sel.send(&s, 0).is_ok());

        let mut second = Select::new();
        let second_op = second.recv(&r);
        let second_sel = second.select();
        if second_sel.index() == second_op {
            assert_eq!(second_sel.recv(&r), Ok(0));

            let mut third = Select::new();
            let third_op = third.send(&s);
            let third_sel = third.select();
            if third_sel.index() == third_op {
                assert!(third_sel.send(&s, 1).is_ok());

                let mut innermost = Select::new();
                let innermost_op = innermost.recv(&r);
                let innermost_sel = innermost.select();
                if innermost_sel.index() == innermost_op {
                    assert_eq!(innermost_sel.recv(&r), Ok(1));
                } else {
                    unreachable!();
                }
            } else {
                unreachable!();
            }
        } else {
            unreachable!();
        }
    } else {
        unreachable!();
    }
}
690
/// Stress test: a producer alternates between two channels, waiting for an
/// acknowledgement on a third after each send, while the main thread receives
/// every value through a selection.
#[test]
fn stress_recv() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(5);
    let (s3, r3) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                r3.recv().unwrap();

                s2.send(i).unwrap();
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            // Each value arrives once per data channel.
            for _ in 0..2 {
                let mut selector = Select::new();
                let first = selector.recv(&r1);
                let second = selector.recv(&r2);
                let selected = selector.select();
                let idx = selected.index();
                if idx == first {
                    assert_eq!(selected.recv(&r1), Ok(i));
                } else if idx == second {
                    assert_eq!(selected.recv(&r2), Ok(i));
                } else {
                    unreachable!();
                }

                // Acknowledge so the producer can continue.
                s3.send(()).unwrap();
            }
        }
    })
    .unwrap();
}
728
/// Stress test: the main thread sends each value on two zero-capacity channels
/// through a selection while a consumer receives from them in a fixed order.
#[test]
fn stress_send() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for i in 0..COUNT {
                assert_eq!(r1.recv().unwrap(), i);
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut selector = Select::new();
                let first = selector.send(&s1);
                let second = selector.send(&s2);
                let selected = selector.select();
                let idx = selected.index();
                if idx == first {
                    assert!(selected.send(&s1, i).is_ok());
                } else if idx == second {
                    assert!(selected.send(&s2, i).is_ok());
                } else {
                    unreachable!();
                }
            }
            // One token per round on the third channel.
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
763
/// Stress test: every round the main thread both receives a value from and
/// sends a value to a partner thread through selections mixing `recv` and `send`.
#[test]
fn stress_mixed() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        for i in 0..COUNT {
            for _ in 0..2 {
                let mut selector = Select::new();
                let recv_op = selector.recv(&r1);
                let send_op = selector.send(&s2);
                let selected = selector.select();
                let idx = selected.index();
                if idx == recv_op {
                    assert_eq!(selected.recv(&r1), Ok(i));
                } else if idx == send_op {
                    assert!(selected.send(&s2, i).is_ok());
                } else {
                    unreachable!();
                }
            }
            // One token per round on the third channel.
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
798
/// Stress test: a sender and a receiver thread exchange `COUNT` values over a
/// small bounded channel, each retrying `select_timeout` until its operation
/// goes through.
///
/// Both threads sleep before every even-numbered item, and the 100 ms selection
/// timeout is shorter than that sleep.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (s, r) = bounded(2);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until the send is selected; a timeout just loops again.
                loop {
                    let mut sel = Select::new();
                    let oper1 = sel.send(&s);
                    let oper = sel.select_timeout(ms(100));
                    match oper {
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => {
                                assert!(oper.send(&s, i).is_ok());
                                break;
                            }
                            _ => unreachable!(),
                        },
                    }
                }
            }
        });

        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until the receive is selected; values must arrive in order.
                loop {
                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r);
                    let oper = sel.select_timeout(ms(100));
                    match oper {
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => {
                                assert_eq!(oper.recv(&r), Ok(i));
                                break;
                            }
                            _ => unreachable!(),
                        },
                    }
                }
            }
        });
    })
    .unwrap();
}
858
/// Registers a send and a receive on the same channel from one thread.
///
/// On a zero-capacity channel the selection is expected to time out; on an
/// unbounded channel the send is expected to be selected.
#[test]
fn send_recv_same_channel() {
    let (s, r) = bounded::<i32>(0);
    let mut selector = Select::new();
    let send_op = selector.send(&s);
    let recv_op = selector.recv(&r);
    let outcome = selector.select_timeout(ms(100));
    match outcome {
        Ok(selected) => {
            let idx = selected.index();
            if idx == send_op || idx == recv_op {
                panic!();
            } else {
                unreachable!();
            }
        }
        Err(_) => {}
    }

    let (s, r) = unbounded::<i32>();
    let mut selector = Select::new();
    let send_op = selector.send(&s);
    let recv_op = selector.recv(&r);
    let outcome = selector.select_timeout(ms(100));
    match outcome {
        Ok(selected) => {
            let idx = selected.index();
            if idx == send_op {
                assert!(selected.send(&s, 0).is_ok());
            } else if idx == recv_op {
                panic!();
            } else {
                unreachable!();
            }
        }
        Err(_) => panic!(),
    }
}
889
/// An even number of threads each select between sending their own id and
/// receiving on a shared zero-capacity channel; no thread may receive its own id
/// and the channel must be empty afterwards.
#[test]
fn matching() {
    const THREADS: usize = 44;

    let (s, r) = &bounded::<usize>(0);

    scope(|sc| {
        for i in 0..THREADS {
            sc.spawn(move |_| {
                let mut selector = Select::new();
                let recv_op = selector.recv(&r);
                let send_op = selector.send(&s);
                let selected = selector.select();
                let idx = selected.index();
                if idx == recv_op {
                    assert_ne!(selected.recv(&r), Ok(i));
                } else if idx == send_op {
                    assert!(selected.send(&s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
915
/// Like `matching`, but with an odd number of threads: the main thread sends one
/// extra value (`!0`) so that the leftover thread has something to receive.
#[test]
fn matching_with_leftover() {
    const THREADS: usize = 55;

    let (s, r) = &bounded::<usize>(0);

    scope(|sc| {
        for i in 0..THREADS {
            sc.spawn(move |_| {
                let mut selector = Select::new();
                let recv_op = selector.recv(&r);
                let send_op = selector.send(&s);
                let selected = selector.select();
                let idx = selected.index();
                if idx == recv_op {
                    assert_ne!(selected.recv(&r), Ok(i));
                } else if idx == send_op {
                    assert!(selected.send(&s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
        // `!0` is not a thread id, so no thread rejects it as its own.
        s.send(!0).unwrap();
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
942
/// Repeatedly sends the receiving half of a fresh channel through the current
/// channel, with both threads switching to the new channel after every hop.
/// Runs for capacities 0, 1 and 2.
#[test]
fn channel_through_channel() {
    const COUNT: usize = 1000;

    type T = Box<dyn Any + Send>;

    for cap in 0..3 {
        let (s, r) = bounded::<T>(cap);

        scope(|sc| {
            sc.spawn(move |_| {
                let mut sender = s;

                for _ in 0..COUNT {
                    let (next_sender, next_receiver) = bounded(cap);
                    // Wrapped in `Option` so the receiving side can `take()` it out.
                    let payload: T = Box::new(Some(next_receiver));

                    {
                        let mut selector = Select::new();
                        let send_op = selector.send(&sender);
                        let selected = selector.select();
                        if selected.index() == send_op {
                            assert!(selected.send(&sender, payload).is_ok());
                        } else {
                            unreachable!();
                        }
                    }

                    sender = next_sender;
                }
            });

            sc.spawn(move |_| {
                let mut receiver = r;

                for _ in 0..COUNT {
                    let next = {
                        let mut selector = Select::new();
                        let recv_op = selector.recv(&receiver);
                        let selected = selector.select();
                        if selected.index() != recv_op {
                            unreachable!();
                        }
                        selected
                            .recv(&receiver)
                            .unwrap()
                            .downcast_mut::<Option<Receiver<T>>>()
                            .unwrap()
                            .take()
                            .unwrap()
                    };
                    receiver = next;
                }
            });
        })
        .unwrap();
    }
}
1000
/// After a thread has sent on `s1`, its `try_select` over `r1` and `r2` must
/// always find a message, even while another thread races to send on `s2` and
/// steal from `r1`. Runs once with bounded(1) and once with unbounded channels.
#[test]
fn linearizable_try() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|sc| {
            sc.spawn(|_| {
                for _ in 0..COUNT {
                    // Rendezvous with the main thread at the start of a round.
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut selector = Select::new();
                    let first = selector.recv(&r1);
                    let second = selector.recv(&r2);
                    let outcome = selector.try_select();
                    match outcome {
                        Ok(selected) => {
                            let idx = selected.index();
                            if idx == first {
                                assert!(selected.recv(&r1).is_ok());
                            } else if idx == second {
                                assert!(selected.recv(&r2).is_ok());
                            } else {
                                unreachable!();
                            }
                        }
                        Err(_) => unreachable!(),
                    }

                    // Rendezvous at the end of the round, then clear leftovers.
                    end_s.send(()).unwrap();
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
1052
/// Same race as `linearizable_try`, but the selecting thread uses
/// `select_timeout` with a zero timeout instead of `try_select`.
#[test]
fn linearizable_timeout() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = if step == 0 {
            (bounded::<i32>(1), bounded::<i32>(1))
        } else {
            (unbounded::<i32>(), unbounded::<i32>())
        };

        scope(|sc| {
            sc.spawn(|_| {
                for _ in 0..COUNT {
                    // Rendezvous with the main thread at the start of a round.
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut selector = Select::new();
                    let first = selector.recv(&r1);
                    let second = selector.recv(&r2);
                    let outcome = selector.select_timeout(ms(0));
                    match outcome {
                        Ok(selected) => {
                            let idx = selected.index();
                            if idx == first {
                                assert!(selected.recv(&r1).is_ok());
                            } else if idx == second {
                                assert!(selected.recv(&r2).is_ok());
                            } else {
                                unreachable!();
                            }
                        }
                        Err(_) => unreachable!(),
                    }

                    // Rendezvous at the end of the round, then clear leftovers.
                    end_s.send(()).unwrap();
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
1104
/// Selects `COUNT` times over four receivers (two pre-filled channels plus
/// fresh zero-duration `after` and `tick` channels) and requires each to be
/// chosen at least `COUNT / 4 / 2` times.
#[test]
fn fairness1() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = unbounded::<()>();

    // Pre-fill both channels with one message per selection round.
    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    let mut hits = [0usize; 4];
    for _ in 0..COUNT {
        let after_rx = after(ms(0));
        let tick_rx = tick(ms(0));

        let mut selector = Select::new();
        let op1 = selector.recv(&r1);
        let op2 = selector.recv(&r2);
        let op3 = selector.recv(&after_rx);
        let op4 = selector.recv(&tick_rx);
        let selected = selector.select();
        let idx = selected.index();
        let slot = if idx == op1 {
            selected.recv(&r1).unwrap();
            0
        } else if idx == op2 {
            selected.recv(&r2).unwrap();
            1
        } else if idx == op3 {
            selected.recv(&after_rx).unwrap();
            2
        } else if idx == op4 {
            selected.recv(&tick_rx).unwrap();
            3
        } else {
            unreachable!()
        };
        hits[slot] += 1;
    }
    assert!(hits.iter().all(|&n| n >= COUNT / hits.len() / 2));
}
1150
/// A producer selects among sends on three channels of different flavours while
/// the main thread selects among the matching receives; each receiver must be
/// chosen at least `COUNT / 3 / 50` times.
#[test]
fn fairness2() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = bounded::<()>(1);
    let (s3, r3) = bounded::<()>(0);

    scope(|sc| {
        sc.spawn(|_| {
            for _ in 0..COUNT {
                let mut selector = Select::new();
                // The buffered channels are only offered while they are empty.
                let op1 = if s1.is_empty() {
                    Some(selector.send(&s1))
                } else {
                    None
                };
                let op2 = if s2.is_empty() {
                    Some(selector.send(&s2))
                } else {
                    None
                };
                let op3 = selector.send(&s3);
                let selected = selector.select();
                let idx = selected.index();
                if Some(idx) == op1 {
                    assert!(selected.send(&s1, ()).is_ok());
                } else if Some(idx) == op2 {
                    assert!(selected.send(&s2, ()).is_ok());
                } else if idx == op3 {
                    assert!(selected.send(&s3, ()).is_ok());
                } else {
                    unreachable!();
                }
            }
        });

        let mut hits = [0usize; 3];
        for _ in 0..COUNT {
            let mut selector = Select::new();
            let op1 = selector.recv(&r1);
            let op2 = selector.recv(&r2);
            let op3 = selector.recv(&r3);
            let selected = selector.select();
            let idx = selected.index();
            let slot = if idx == op1 {
                selected.recv(&r1).unwrap();
                0
            } else if idx == op2 {
                selected.recv(&r2).unwrap();
                1
            } else if idx == op3 {
                selected.recv(&r3).unwrap();
                2
            } else {
                unreachable!()
            };
            hits[slot] += 1;
        }
        assert!(hits.iter().all(|&n| n >= COUNT / hits.len() / 50));
    })
    .unwrap();
}
1209
/// Shares one prepared `Select` by reference across threads; each thread clones
/// it locally and runs the selection on its own copy.
#[test]
fn sync_and_clone() {
    const THREADS: usize = 20;

    let (s, r) = &bounded::<usize>(0);

    let mut selector = Select::new();
    let recv_op = selector.recv(&r);
    let send_op = selector.send(&s);
    let shared = &selector;

    scope(|sc| {
        for i in 0..THREADS {
            sc.spawn(move |_| {
                // The clone is made inside the spawned thread, from the shared reference.
                let mut local = shared.clone();
                let selected = local.select();
                let idx = selected.index();
                if idx == recv_op {
                    assert_ne!(selected.recv(&r), Ok(i));
                } else if idx == send_op {
                    assert!(selected.send(&s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
1238
/// Clones one prepared `Select` on the main thread and moves each clone into its
/// own thread, where the selection is then run.
#[test]
fn send_and_clone() {
    const THREADS: usize = 20;

    let (s, r) = &bounded::<usize>(0);

    let mut selector = Select::new();
    let recv_op = selector.recv(&r);
    let send_op = selector.send(&s);

    scope(|sc| {
        for i in 0..THREADS {
            // The clone is made here and then moved into the spawned thread.
            let mut local = selector.clone();
            sc.spawn(move |_| {
                let selected = local.select();
                let idx = selected.index();
                if idx == recv_op {
                    assert_ne!(selected.recv(&r), Ok(i));
                } else if idx == send_op {
                    assert!(selected.send(&s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
1266
/// Builds a single `Select` once and reuses it for every selection in the same
/// exchange pattern that `stress_mixed` runs with a fresh selector per round.
#[test]
fn reuse() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        // Registered once, outside the loop.
        let mut selector = Select::new();
        let recv_op = selector.recv(&r1);
        let send_op = selector.send(&s2);

        for i in 0..COUNT {
            for _ in 0..2 {
                let selected = selector.select();
                let idx = selected.index();
                if idx == recv_op {
                    assert_eq!(selected.recv(&r1), Ok(i));
                } else if idx == send_op {
                    assert!(selected.send(&s2, i).is_ok());
                } else {
                    unreachable!();
                }
            }
            // One token per round on the third channel.
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
1302