1 //! Tests for channel selection using the `Select` struct.
2 
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
9 use crossbeam_utils::thread::scope;
10 
/// Shorthand used throughout these tests: turns a millisecond count into a
/// [`Duration`].
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
14 
/// Selects over two unbounded receivers and checks that the one holding a
/// message is the operation that gets picked.
#[test]
fn smoke1() {
    let (tx_a, rx_a) = unbounded::<usize>();
    let (tx_b, rx_b) = unbounded::<usize>();

    // Only the first channel has something to receive.
    tx_a.send(1).unwrap();

    let mut select = Select::new();
    let on_a = select.recv(&rx_a);
    let on_b = select.recv(&rx_b);
    let ready = select.select();
    let idx = ready.index();
    if idx == on_a {
        assert_eq!(ready.recv(&rx_a), Ok(1));
    } else if idx == on_b {
        panic!();
    } else {
        unreachable!();
    }

    // Now only the second channel has something to receive.
    tx_b.send(2).unwrap();

    let mut select = Select::new();
    let on_a = select.recv(&rx_a);
    let on_b = select.recv(&rx_b);
    let ready = select.select();
    let idx = ready.index();
    if idx == on_a {
        panic!();
    } else if idx == on_b {
        assert_eq!(ready.recv(&rx_b), Ok(2));
    } else {
        unreachable!();
    }
}
44 
/// Registers five receivers of which only the last holds a message and checks
/// that `select` reports exactly that one.
#[test]
fn smoke2() {
    // Every sender stays alive for the whole test, so no channel is
    // disconnected while selecting.
    let channels: Vec<_> = (0..5).map(|_| unbounded::<i32>()).collect();

    channels[4].0.send(5).unwrap();

    let mut select = Select::new();
    // Operations are registered in channel order.
    let registered: Vec<usize> = channels.iter().map(|(_, rx)| select.recv(rx)).collect();
    let ready = select.select();
    let chosen = ready.index();
    match registered.iter().position(|&idx| idx == chosen) {
        Some(4) => assert_eq!(ready.recv(&channels[4].1), Ok(5)),
        Some(_) => panic!(),
        None => unreachable!(),
    }
}
71 
/// Checks that a receive operation on a channel whose sender was dropped is
/// selected (and then fails), both while another channel is still pending and
/// when the disconnect happens during the wait.
#[test]
fn disconnected() {
    let (tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    scope(|sc| {
        sc.spawn(|_| {
            drop(tx1);
            thread::sleep(ms(500));
            tx2.send(5).unwrap();
        });

        let mut select = Select::new();
        let on_rx1 = select.recv(&rx1);
        let on_rx2 = select.recv(&rx2);
        let ready = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        let idx = ready.index();
        if idx == on_rx1 {
            assert!(ready.recv(&rx1).is_err());
        } else if idx == on_rx2 {
            panic!();
        } else {
            unreachable!();
        }

        // Consume the message the helper thread sends after its sleep.
        rx2.recv().unwrap();
    })
    .unwrap();

    // The first channel is still disconnected and the second is empty again.
    let mut select = Select::new();
    let on_rx1 = select.recv(&rx1);
    let on_rx2 = select.recv(&rx2);
    let ready = select
        .select_timeout(ms(1000))
        .unwrap_or_else(|_| panic!());
    let idx = ready.index();
    if idx == on_rx1 {
        assert!(ready.recv(&rx1).is_err());
    } else if idx == on_rx2 {
        panic!();
    } else {
        unreachable!();
    }

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            drop(tx2);
        });

        // The sender is dropped while this select is waiting.
        let mut select = Select::new();
        let on_rx2 = select.recv(&rx2);
        let ready = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        if ready.index() == on_rx2 {
            assert!(ready.recv(&rx2).is_err());
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
133 
/// Exercises `try_select` across several channel states: nothing ready, a
/// disconnected receiver, a pending message, an emptied channel, and finally
/// a `Select` with no operations at all.
#[test]
fn default() {
    let (tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    // Both channels are empty and connected: expect no ready operation.
    let mut select = Select::new();
    let _on_rx1 = select.recv(&rx1);
    let _on_rx2 = select.recv(&rx2);
    match select.try_select() {
        Ok(_) => panic!(),
        Err(_) => {}
    }

    drop(tx1);

    // The first channel is now disconnected: expect its receive to be
    // selected and to fail.
    let mut select = Select::new();
    let on_rx1 = select.recv(&rx1);
    let on_rx2 = select.recv(&rx2);
    let ready = select.try_select().unwrap_or_else(|_| panic!());
    let idx = ready.index();
    if idx == on_rx1 {
        assert!(ready.recv(&rx1).is_err());
    } else if idx == on_rx2 {
        panic!();
    } else {
        unreachable!();
    }

    tx2.send(2).unwrap();

    // A message is waiting on the second channel.
    let mut select = Select::new();
    let on_rx2 = select.recv(&rx2);
    let ready = select.try_select().unwrap_or_else(|_| panic!());
    if ready.index() == on_rx2 {
        assert_eq!(ready.recv(&rx2), Ok(2));
    } else {
        unreachable!();
    }

    // The message was consumed above, so nothing is ready any more.
    let mut select = Select::new();
    let _on_rx2 = select.recv(&rx2);
    match select.try_select() {
        Ok(_) => panic!(),
        Err(_) => {}
    }

    // No operations registered at all.
    let mut select = Select::new();
    match select.try_select() {
        Ok(_) => panic!(),
        Err(_) => {}
    }
}
191 
/// Exercises `select_timeout`: a wait that is expected to expire before the
/// message arrives, a second wait that is expected to see it, and a timeout on
/// a `Select` with no operations followed by a check of a disconnected channel.
#[test]
fn timeout() {
    let (_tx1, rx1) = unbounded::<i32>();
    let (tx2, rx2) = unbounded::<i32>();

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(1500));
            tx2.send(2).unwrap();
        });

        // The message is sent after 1500 ms, so a 1000 ms wait should expire.
        let mut select = Select::new();
        let on_rx1 = select.recv(&rx1);
        let on_rx2 = select.recv(&rx2);
        if let Ok(ready) = select.select_timeout(ms(1000)) {
            let idx = ready.index();
            if idx == on_rx1 || idx == on_rx2 {
                panic!();
            } else {
                unreachable!();
            }
        }

        // The second wait should observe the message on the second channel.
        let mut select = Select::new();
        let on_rx1 = select.recv(&rx1);
        let on_rx2 = select.recv(&rx2);
        let ready = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        let idx = ready.index();
        if idx == on_rx1 {
            panic!();
        } else if idx == on_rx2 {
            assert_eq!(ready.recv(&rx2), Ok(2));
        } else {
            unreachable!();
        }
    })
    .unwrap();

    scope(|sc| {
        let (tx, rx) = unbounded::<i32>();

        sc.spawn(move |_| {
            thread::sleep(ms(500));
            drop(tx);
        });

        // A `Select` without operations can only time out.
        let mut empty = Select::new();
        match empty.select_timeout(ms(1000)) {
            Ok(_) => unreachable!(),
            Err(_) => {
                // The sender was dropped after 500 ms, so by now the receive
                // is expected to be selected and to fail.
                let mut select = Select::new();
                let on_rx = select.recv(&rx);
                let ready = select.try_select().unwrap_or_else(|_| panic!());
                if ready.index() == on_rx {
                    assert!(ready.recv(&rx).is_err());
                } else {
                    unreachable!();
                }
            }
        }
    })
    .unwrap();
}
259 
/// Checks that operations on channels whose other half was dropped right away
/// are selected by both `try_select` and `select_timeout`, and that completing
/// them returns an error.
#[test]
fn default_when_disconnected() {
    // Receiver without a sender, non-blocking select.
    {
        let (_, rx) = unbounded::<i32>();

        let mut select = Select::new();
        let on_rx = select.recv(&rx);
        let ready = select.try_select().unwrap_or_else(|_| panic!());
        if ready.index() == on_rx {
            assert!(ready.recv(&rx).is_err());
        } else {
            unreachable!();
        }
    }

    // Receiver without a sender, select with a timeout.
    {
        let (_, rx) = unbounded::<i32>();

        let mut select = Select::new();
        let on_rx = select.recv(&rx);
        let ready = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        if ready.index() == on_rx {
            assert!(ready.recv(&rx).is_err());
        } else {
            unreachable!();
        }
    }

    // Sender without a receiver, non-blocking select.
    {
        let (tx, _) = bounded::<i32>(0);

        let mut select = Select::new();
        let on_tx = select.send(&tx);
        let ready = select.try_select().unwrap_or_else(|_| panic!());
        if ready.index() == on_tx {
            assert!(ready.send(&tx, 0).is_err());
        } else {
            unreachable!();
        }
    }

    // Sender without a receiver, select with a timeout.
    {
        let (tx, _) = bounded::<i32>(0);

        let mut select = Select::new();
        let on_tx = select.send(&tx);
        let ready = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        if ready.index() == on_tx {
            assert!(ready.send(&tx, 0).is_err());
        } else {
            unreachable!();
        }
    }
}
314 
/// Checks the timing of a `Select` with no operations: `try_select` must fail
/// within 50 ms, and `select_timeout(500 ms)` must fail after 450–550 ms.
#[test]
fn default_only() {
    let began = Instant::now();

    let mut select = Select::new();
    let outcome = select.try_select();
    assert!(outcome.is_err());
    let spent = Instant::now() - began;
    assert!(spent <= ms(50));

    let began = Instant::now();
    let mut select = Select::new();
    let outcome = select.select_timeout(ms(500));
    assert!(outcome.is_err());
    let spent = Instant::now() - began;
    assert!(spent >= ms(450));
    assert!(spent <= ms(550));
}
333 
/// Checks that a select blocked on zero-capacity channels is woken up when a
/// counterpart shows up on another thread, for both receive and send
/// operations.
#[test]
fn unblocks() {
    let (tx1, rx1) = bounded::<i32>(0);
    let (tx2, rx2) = bounded::<i32>(0);

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            tx2.send(2).unwrap();
        });

        // Only the second channel gets a sender.
        let mut select = Select::new();
        let on_rx1 = select.recv(&rx1);
        let on_rx2 = select.recv(&rx2);
        let ready = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        let idx = ready.index();
        if idx == on_rx1 {
            panic!();
        } else if idx == on_rx2 {
            assert_eq!(ready.recv(&rx2), Ok(2));
        } else {
            unreachable!();
        }
    })
    .unwrap();

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            assert_eq!(rx1.recv().unwrap(), 1);
        });

        // Only the first channel gets a receiver.
        let mut select = Select::new();
        let on_tx1 = select.send(&tx1);
        let on_tx2 = select.send(&tx2);
        let ready = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        let idx = ready.index();
        if idx == on_tx1 {
            ready.send(&tx1, 1).unwrap();
        } else if idx == on_tx2 {
            panic!();
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
381 
/// Runs two selects over a receive and a send on zero-capacity channels while
/// a helper thread first sends on one channel and then receives on the other.
#[test]
fn both_ready() {
    let (tx1, rx1) = bounded(0);
    let (tx2, rx2) = bounded(0);

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            tx1.send(1).unwrap();
            assert_eq!(rx2.recv().unwrap(), 2);
        });

        for _ in 0..2 {
            let mut select = Select::new();
            let on_recv = select.recv(&rx1);
            let on_send = select.send(&tx2);
            let ready = select.select();
            let idx = ready.index();
            if idx == on_recv {
                assert_eq!(ready.recv(&rx1), Ok(1));
            } else if idx == on_send {
                ready.send(&tx2, 2).unwrap();
            } else {
                unreachable!();
            }
        }
    })
    .unwrap();
}
408 
/// Repeatedly runs three threads: one polls `try_select` to send on `s1`, one
/// polls to receive from `r2`, and one performs a single timed select over
/// both channels before dropping `s_end`, which the pollers watch for in
/// order to stop.
#[test]
fn loop_try() {
    const RUNS: usize = 20;

    for _ in 0..RUNS {
        let (s1, r1) = bounded::<i32>(0);
        let (s2, r2) = bounded::<i32>(0);
        let (s_end, r_end) = bounded::<()>(0);

        scope(|sc| {
            // Polling sender.
            sc.spawn(|_| loop {
                let mut sel = Select::new();
                let send_idx = sel.send(&s1);
                if let Ok(op) = sel.try_select() {
                    if op.index() == send_idx {
                        let _ = op.send(&s1, 1);
                        break;
                    } else {
                        unreachable!();
                    }
                }

                // Stop as soon as the end channel becomes selectable.
                let mut sel = Select::new();
                let end_idx = sel.recv(&r_end);
                if let Ok(op) = sel.try_select() {
                    if op.index() == end_idx {
                        let _ = op.recv(&r_end);
                        break;
                    } else {
                        unreachable!();
                    }
                }
            });

            // Polling receiver.
            sc.spawn(|_| loop {
                if let Ok(value) = r2.try_recv() {
                    assert_eq!(value, 2);
                    break;
                }

                // Stop as soon as the end channel becomes selectable.
                let mut sel = Select::new();
                let end_idx = sel.recv(&r_end);
                if let Ok(op) = sel.try_select() {
                    if op.index() == end_idx {
                        let _ = op.recv(&r_end);
                        break;
                    } else {
                        unreachable!();
                    }
                }
            });

            // Single timed select that pairs up with one of the pollers.
            sc.spawn(|_| {
                thread::sleep(ms(500));

                let mut sel = Select::new();
                let recv_idx = sel.recv(&r1);
                let send_idx = sel.send(&s2);
                if let Ok(op) = sel.select_timeout(ms(1000)) {
                    let idx = op.index();
                    if idx == recv_idx {
                        assert_eq!(op.recv(&r1), Ok(1));
                    } else if idx == send_idx {
                        assert!(op.send(&s2, 2).is_ok());
                    } else {
                        unreachable!();
                    }
                }

                // Dropping the sender disconnects `r_end` for the pollers.
                drop(s_end);
            });
        })
        .unwrap();
    }
}
504 
/// Selects over two receivers while a helper thread clones and immediately
/// drops a sender before sending; `s3`/`r3` act as a handshake between the
/// two threads.
#[test]
fn cloning1() {
    scope(|sc| {
        let (tx1, rx1) = unbounded::<i32>();
        let (_tx2, rx2) = unbounded::<i32>();
        let (tx3, rx3) = unbounded::<()>();

        sc.spawn(move |_| {
            rx3.recv().unwrap();
            // Create and discard a temporary clone of the sender.
            drop(tx1.clone());
            assert!(rx3.try_recv().is_err());
            tx1.send(1).unwrap();
            rx3.recv().unwrap();
        });

        tx3.send(()).unwrap();

        let mut select = Select::new();
        let on_rx1 = select.recv(&rx1);
        let on_rx2 = select.recv(&rx2);
        let ready = select.select();
        let idx = ready.index();
        if idx == on_rx1 {
            drop(ready.recv(&rx1));
        } else if idx == on_rx2 {
            drop(ready.recv(&rx2));
        } else {
            unreachable!();
        }

        tx3.send(()).unwrap();
    })
    .unwrap();
}
536 
/// Checks that cloning and dropping a sender of the first channel while
/// another thread is selecting does not make that channel's receive get
/// selected; the select must complete through the second channel.
#[test]
fn cloning2() {
    let (tx1, rx1) = unbounded::<()>();
    let (tx2, rx2) = unbounded::<()>();
    let (_tx3, _rx3) = unbounded::<()>();

    scope(|sc| {
        sc.spawn(move |_| {
            let mut select = Select::new();
            let on_rx1 = select.recv(&rx1);
            let on_rx2 = select.recv(&rx2);
            let ready = select.select();
            let idx = ready.index();
            if idx == on_rx1 {
                panic!();
            } else if idx == on_rx2 {
                drop(ready.recv(&rx2));
            } else {
                unreachable!();
            }
        });

        thread::sleep(ms(500));
        // Create and discard a temporary clone; `tx1` itself stays alive.
        drop(tx1.clone());
        tx2.send(()).unwrap();
    })
    .unwrap();
}
562 
/// Selects on a receiver that already has a message queued before the
/// `Select` is built.
#[test]
fn preflight1() {
    let (tx, rx) = unbounded();
    tx.send(()).unwrap();

    let mut select = Select::new();
    let on_rx = select.recv(&rx);
    let ready = select.select();
    if ready.index() == on_rx {
        drop(ready.recv(&rx));
    } else {
        unreachable!();
    }
}
576 
/// Selects on a receiver whose senders are all gone but which still has one
/// queued message: the message must be delivered first, and only afterwards
/// is the channel reported as disconnected.
#[test]
fn preflight2() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);

    let mut select = Select::new();
    let on_rx = select.recv(&rx);
    let ready = select.select();
    if ready.index() == on_rx {
        assert_eq!(ready.recv(&rx), Ok(()));
    } else {
        unreachable!();
    }

    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}
594 
/// Selects on a receiver that is both drained and disconnected: the receive
/// operation must be selected and must fail.
#[test]
fn preflight3() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);
    // Drain the only message so the channel is empty and disconnected.
    rx.recv().unwrap();

    let mut select = Select::new();
    let on_rx = select.recv(&rx);
    let ready = select.select();
    if ready.index() == on_rx {
        assert!(ready.recv(&rx).is_err());
    } else {
        unreachable!();
    }
}
611 
/// Registers two receives and two sends on the same channel and keeps
/// selecting until each of the four operations has been chosen at least once.
#[test]
fn duplicate_operations() {
    let (tx, rx) = unbounded::<i32>();
    // One flag per registered operation, set once that operation was chosen.
    let hit: Vec<Cell<bool>> = (0..4).map(|_| Cell::new(false)).collect();

    while !hit.iter().all(|flag| flag.get()) {
        let mut select = Select::new();
        let recv_a = select.recv(&rx);
        let recv_b = select.recv(&rx);
        let send_a = select.send(&tx);
        let send_b = select.send(&tx);
        let ready = select.select();

        let idx = ready.index();
        let slot = if idx == recv_a {
            0
        } else if idx == recv_b {
            1
        } else if idx == send_a {
            2
        } else if idx == send_b {
            3
        } else {
            unreachable!()
        };

        // Slots 0 and 1 are the receives, 2 and 3 are the sends.
        if slot < 2 {
            assert!(ready.recv(&rx).is_ok());
        } else {
            assert!(ready.send(&tx, 0).is_ok());
        }
        hit[slot].set(true);
    }
}
645 
/// Runs four selects nested inside one another on a single unbounded channel,
/// alternating send and receive, and checks the values round-trip in order.
#[test]
fn nesting() {
    let (tx, rx) = unbounded::<i32>();

    let mut outer = Select::new();
    let outer_idx = outer.send(&tx);
    let outer_op = outer.select();
    if outer_op.index() == outer_idx {
        assert!(outer_op.send(&tx, 0).is_ok());

        let mut second = Select::new();
        let second_idx = second.recv(&rx);
        let second_op = second.select();
        if second_op.index() == second_idx {
            assert_eq!(second_op.recv(&rx), Ok(0));

            let mut third = Select::new();
            let third_idx = third.send(&tx);
            let third_op = third.select();
            if third_op.index() == third_idx {
                assert!(third_op.send(&tx, 1).is_ok());

                let mut inner = Select::new();
                let inner_idx = inner.recv(&rx);
                let inner_op = inner.select();
                if inner_op.index() == inner_idx {
                    assert_eq!(inner_op.recv(&rx), Ok(1));
                } else {
                    unreachable!();
                }
            } else {
                unreachable!();
            }
        } else {
            unreachable!();
        }
    } else {
        unreachable!();
    }
}
690 
/// Stress test for receive operations: a producer alternates between two
/// channels, waiting for an acknowledgement on `r3` after every message, while
/// the main thread selects over both receivers and checks each value.
#[test]
fn stress_recv() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(5);
    let (s3, r3) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for value in 0..COUNT {
                s1.send(value).unwrap();
                r3.recv().unwrap();

                s2.send(value).unwrap();
                r3.recv().unwrap();
            }
        });

        for expected in 0..COUNT {
            // Each value arrives twice, once per channel.
            for _ in 0..2 {
                let mut select = Select::new();
                let on_r1 = select.recv(&r1);
                let on_r2 = select.recv(&r2);
                let ready = select.select();
                let idx = ready.index();
                if idx == on_r1 {
                    assert_eq!(ready.recv(&r1), Ok(expected));
                } else if idx == on_r2 {
                    assert_eq!(ready.recv(&r2), Ok(expected));
                } else {
                    unreachable!();
                }

                // Acknowledge so the producer sends the next message.
                s3.send(()).unwrap();
            }
        }
    })
    .unwrap();
}
728 
/// Stress test for send operations: the main thread selects over two
/// zero-capacity senders twice per round while a consumer receives from the
/// first and then the second channel and checks each value.
#[test]
fn stress_send() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for expected in 0..COUNT {
                assert_eq!(r1.recv().unwrap(), expected);
                assert_eq!(r2.recv().unwrap(), expected);
                r3.recv().unwrap();
            }
        });

        for value in 0..COUNT {
            for _ in 0..2 {
                let mut select = Select::new();
                let on_s1 = select.send(&s1);
                let on_s2 = select.send(&s2);
                let ready = select.select();
                let idx = ready.index();
                if idx == on_s1 {
                    assert!(ready.send(&s1, value).is_ok());
                } else if idx == on_s2 {
                    assert!(ready.send(&s2, value).is_ok());
                } else {
                    unreachable!();
                }
            }
            // One end-of-round message per round for the consumer.
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
763 
/// Stress test mixing a receive and a send in one select: the helper thread
/// sends on the first channel and receives from the second each round, and
/// the main thread serves both through two selects per round.
#[test]
fn stress_mixed() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for value in 0..COUNT {
                s1.send(value).unwrap();
                assert_eq!(r2.recv().unwrap(), value);
                r3.recv().unwrap();
            }
        });

        for value in 0..COUNT {
            for _ in 0..2 {
                let mut select = Select::new();
                let on_recv = select.recv(&r1);
                let on_send = select.send(&s2);
                let ready = select.select();
                let idx = ready.index();
                if idx == on_recv {
                    assert_eq!(ready.recv(&r1), Ok(value));
                } else if idx == on_send {
                    assert!(ready.send(&s2, value).is_ok());
                } else {
                    unreachable!();
                }
            }
            // One end-of-round message per round for the helper thread.
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
798 
/// Stress test for `select_timeout` with one sending and one receiving
/// thread on a channel of capacity 2.
///
/// Both threads sleep for 500 ms before every even-numbered message and retry
/// their operation with a 100 ms timeout until it goes through. The receiver
/// checks that the values arrive in order.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (s, r) = bounded(2);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until the send succeeds; the loop is left only via
                // `break`, so no completion flag is needed.
                loop {
                    let mut sel = Select::new();
                    let oper1 = sel.send(&s);
                    match sel.select_timeout(ms(100)) {
                        // Timed out: try again.
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => {
                                assert!(oper.send(&s, i).is_ok());
                                break;
                            }
                            _ => unreachable!(),
                        },
                    }
                }
            }
        });

        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until the receive succeeds, mirroring the sender.
                loop {
                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r);
                    match sel.select_timeout(ms(100)) {
                        // Timed out: try again.
                        Err(_) => {}
                        Ok(oper) => match oper.index() {
                            ix if ix == oper1 => {
                                assert_eq!(oper.recv(&r), Ok(i));
                                break;
                            }
                            _ => unreachable!(),
                        },
                    }
                }
            }
        });
    })
    .unwrap();
}
858 
/// Registers a send and a receive on the same channel in one `Select`.
///
/// On a zero-capacity channel neither operation is expected to be selected
/// before the timeout. On an unbounded channel the send is expected to be
/// selected.
#[test]
fn send_recv_same_channel() {
    let (tx, rx) = bounded::<i32>(0);
    let mut select = Select::new();
    let on_send = select.send(&tx);
    let on_recv = select.recv(&rx);
    if let Ok(ready) = select.select_timeout(ms(100)) {
        let idx = ready.index();
        if idx == on_send || idx == on_recv {
            panic!();
        } else {
            unreachable!();
        }
    }

    let (tx, rx) = unbounded::<i32>();
    let mut select = Select::new();
    let on_send = select.send(&tx);
    let on_recv = select.recv(&rx);
    let ready = select
        .select_timeout(ms(100))
        .unwrap_or_else(|_| panic!());
    let idx = ready.index();
    if idx == on_send {
        assert!(ready.send(&tx, 0).is_ok());
    } else if idx == on_recv {
        panic!();
    } else {
        unreachable!();
    }
}
889 
/// Spawns 44 threads that each select between receiving from and sending
/// their own id on one zero-capacity channel. A receiver must never get its
/// own id, and the channel must be empty once all threads have finished.
#[test]
fn matching() {
    const THREADS: usize = 44;

    let channel = bounded::<usize>(0);
    // Plain references are `Copy`, so every `move` closure gets its own pair.
    let (tx, rx) = (&channel.0, &channel.1);

    scope(|sc| {
        for id in 0..THREADS {
            sc.spawn(move |_| {
                let mut select = Select::new();
                let on_recv = select.recv(rx);
                let on_send = select.send(tx);
                let ready = select.select();
                let idx = ready.index();
                if idx == on_recv {
                    assert_ne!(ready.recv(rx), Ok(id));
                } else if idx == on_send {
                    assert!(ready.send(tx, id).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
915 
/// Same pairing test as `matching`, but with 55 threads: the main thread
/// performs one extra send of `!0` from inside the scope. The channel must be
/// empty once everything has finished.
#[test]
fn matching_with_leftover() {
    const THREADS: usize = 55;

    let channel = bounded::<usize>(0);
    // Plain references are `Copy`, so every `move` closure gets its own pair.
    let (tx, rx) = (&channel.0, &channel.1);

    scope(|sc| {
        for id in 0..THREADS {
            sc.spawn(move |_| {
                let mut select = Select::new();
                let on_recv = select.recv(rx);
                let on_send = select.send(tx);
                let ready = select.select();
                let idx = ready.index();
                if idx == on_recv {
                    assert_ne!(ready.recv(rx), Ok(id));
                } else if idx == on_send {
                    assert!(ready.send(tx, id).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
        // Extra send from the main thread.
        tx.send(!0).unwrap();
    })
    .unwrap();

    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
942 
/// Sends channels through channels: for capacities 0, 1 and 2 the sender
/// repeatedly creates a fresh channel, ships its receiver (boxed as `dyn Any`)
/// over the current channel and switches to the new sender, while the
/// receiving thread unpacks each receiver and switches to it.
#[test]
fn channel_through_channel() {
    const COUNT: usize = 1000;

    type T = Box<dyn Any + Send>;

    for cap in 0..3 {
        let (first_tx, first_rx) = bounded::<T>(cap);

        scope(|sc| {
            sc.spawn(move |_| {
                let mut current = first_tx;

                for _ in 0..COUNT {
                    let (next_tx, next_rx) = bounded(cap);
                    // Wrapped in `Option` so the receiving side can `take()`
                    // the receiver out of the box.
                    let payload: T = Box::new(Some(next_rx));

                    {
                        // Scoped so the borrow of `current` ends before it is
                        // reassigned below.
                        let mut select = Select::new();
                        let on_send = select.send(&current);
                        let ready = select.select();
                        if ready.index() == on_send {
                            assert!(ready.send(&current, payload).is_ok());
                        } else {
                            unreachable!();
                        }
                    }

                    current = next_tx;
                }
            });

            sc.spawn(move |_| {
                let mut current = first_rx;

                for _ in 0..COUNT {
                    let mut payload = {
                        // Scoped so the borrow of `current` ends before it is
                        // reassigned below.
                        let mut select = Select::new();
                        let on_recv = select.recv(&current);
                        let ready = select.select();
                        if ready.index() != on_recv {
                            unreachable!();
                        }
                        ready.recv(&current).unwrap()
                    };
                    current = payload
                        .downcast_mut::<Option<Receiver<T>>>()
                        .unwrap()
                        .take()
                        .unwrap();
                }
            });
        })
        .unwrap();
    }
}
1000 
/// Linearizability check for `try_select`, run once with bounded(1) channels
/// and once with unbounded ones.
///
/// In every round the helper thread sends on `s1` and then requires
/// `try_select` over `r1` and `r2` to find a ready operation, while the main
/// thread concurrently sends on `s2` and tries to take the message from `r1`.
/// `start`/`end` rendezvous channels keep the two threads in lock-step.
#[test]
fn linearizable_try() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_tx, start_rx) = bounded::<()>(0);
        let (end_tx, end_rx) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = match step {
            0 => (bounded::<i32>(1), bounded::<i32>(1)),
            _ => (unbounded::<i32>(), unbounded::<i32>()),
        };

        scope(|sc| {
            sc.spawn(|_| {
                for _ in 0..COUNT {
                    start_tx.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut select = Select::new();
                    let on_r1 = select.recv(&r1);
                    let on_r2 = select.recv(&r2);
                    let ready = match select.try_select() {
                        Ok(op) => op,
                        Err(_) => unreachable!(),
                    };
                    let idx = ready.index();
                    if idx == on_r1 {
                        assert!(ready.recv(&r1).is_ok());
                    } else if idx == on_r2 {
                        assert!(ready.recv(&r2).is_ok());
                    } else {
                        unreachable!();
                    }

                    end_tx.send(()).unwrap();
                    // Clear any leftover message before the next round.
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_rx.recv().unwrap();

                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_rx.recv().unwrap();
            }
        })
        .unwrap();
    }
}
1052 
/// Checks that `select_timeout` with a zero timeout observes channel operations
/// in a linearizable order.
///
/// The scenario runs twice: first over two `bounded(1)` channels, then over two
/// unbounded ones. Zero-capacity `start`/`end` channels make both threads enter
/// and leave every round together.
#[test]
fn linearizable_timeout() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        // Rendezvous channels bracketing each round.
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = match step {
            0 => (bounded::<i32>(1), bounded::<i32>(1)),
            _ => (unbounded::<i32>(), unbounded::<i32>()),
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    // The main thread sends on `s2` before it polls `r1`, so if the
                    // value sent above is already gone, channel 2 holds a message.
                    // The zero-timeout select is therefore expected to always succeed.
                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r1);
                    let oper2 = sel.recv(&r2);
                    let oper = match sel.select_timeout(ms(0)) {
                        Ok(oper) => oper,
                        Err(_) => unreachable!(),
                    };
                    let ix = oper.index();
                    if ix == oper1 {
                        assert!(oper.recv(&r1).is_ok());
                    } else if ix == oper2 {
                        assert!(oper.recv(&r2).is_ok());
                    } else {
                        unreachable!();
                    }

                    end_s.send(()).unwrap();
                    // Discard any message still sitting in channel 2.
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                // Order matters: publish on channel 2 first, then poll channel 1.
                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
1104 
/// Checks that `select` spreads its choices across four receivers.
///
/// Two channels are pre-filled with `COUNT` messages each, and every iteration
/// additionally creates zero-duration `after` and `tick` receivers. Over `COUNT`
/// selections each of the four receivers must be chosen at least
/// `COUNT / 4 / 2` times.
#[test]
fn fairness1() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = unbounded::<()>();

    // Pre-load both channels with as many messages as there will be selections.
    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    // One counter per registered receiver, in registration order.
    let hits = vec![Cell::new(0usize); 4];
    let bump = |slot: usize| hits[slot].set(hits[slot].get() + 1);

    for _ in 0..COUNT {
        let after_r = after(ms(0));
        let tick_r = tick(ms(0));

        let mut sel = Select::new();
        let oper1 = sel.recv(&r1);
        let oper2 = sel.recv(&r2);
        let oper3 = sel.recv(&after_r);
        let oper4 = sel.recv(&tick_r);

        let oper = sel.select();
        let ix = oper.index();
        if ix == oper1 {
            oper.recv(&r1).unwrap();
            bump(0);
        } else if ix == oper2 {
            oper.recv(&r2).unwrap();
            bump(1);
        } else if ix == oper3 {
            oper.recv(&after_r).unwrap();
            bump(2);
        } else if ix == oper4 {
            oper.recv(&tick_r).unwrap();
            bump(3);
        } else {
            unreachable!();
        }
    }
    assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
}
1150 
/// Checks that `select` spreads its choices across an unbounded, a `bounded(1)`
/// and a zero-capacity channel while another thread feeds them.
///
/// The spawned thread performs `COUNT` send selections. It offers a send on
/// channels 1 and 2 only when they are currently empty, and always offers one on
/// channel 3. The main thread performs `COUNT` receive selections and requires
/// every channel to be chosen at least `COUNT / 3 / 50` times.
#[test]
fn fairness2() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = bounded::<()>(1);
    let (s3, r3) = bounded::<()>(0);

    scope(|scope| {
        scope.spawn(|_| {
            for _ in 0..COUNT {
                let mut sel = Select::new();
                // Only register the buffered channels while they hold nothing.
                let oper1 = if s1.is_empty() {
                    Some(sel.send(&s1))
                } else {
                    None
                };
                let oper2 = if s2.is_empty() {
                    Some(sel.send(&s2))
                } else {
                    None
                };
                let oper3 = sel.send(&s3);

                let oper = sel.select();
                let ix = oper.index();
                if Some(ix) == oper1 {
                    assert!(oper.send(&s1, ()).is_ok());
                } else if Some(ix) == oper2 {
                    assert!(oper.send(&s2, ()).is_ok());
                } else if ix == oper3 {
                    assert!(oper.send(&s3, ()).is_ok());
                } else {
                    unreachable!();
                }
            }
        });

        // One counter per receiver, in registration order.
        let hits = vec![Cell::new(0usize); 3];
        let bump = |slot: usize| hits[slot].set(hits[slot].get() + 1);

        for _ in 0..COUNT {
            let mut sel = Select::new();
            let oper1 = sel.recv(&r1);
            let oper2 = sel.recv(&r2);
            let oper3 = sel.recv(&r3);

            let oper = sel.select();
            let ix = oper.index();
            if ix == oper1 {
                oper.recv(&r1).unwrap();
                bump(0);
            } else if ix == oper2 {
                oper.recv(&r2).unwrap();
                bump(1);
            } else if ix == oper3 {
                oper.recv(&r3).unwrap();
                bump(2);
            } else {
                unreachable!();
            }
        }
        assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50));
    })
    .unwrap();
}
1209 
/// Checks that a `Select` can be shared by reference between threads and cloned
/// inside each of them.
///
/// The selector holds one receive and one send operation on the same
/// zero-capacity channel. Each of the `THREADS` threads clones it, runs one
/// selection and completes the chosen operation, asserting that a received value
/// is never the thread's own index. The channel must be empty afterwards.
#[test]
fn sync_and_clone() {
    const THREADS: usize = 20;

    let (s, r) = &bounded::<usize>(0);

    let mut sel = Select::new();
    let oper1 = sel.recv(&r);
    let oper2 = sel.send(&s);
    // Threads capture only this shared reference, never the selector itself.
    let shared = &sel;

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                let mut sel = shared.clone();
                let oper = sel.select();
                let ix = oper.index();
                if ix == oper1 {
                    assert_ne!(oper.recv(&r), Ok(i));
                } else if ix == oper2 {
                    assert!(oper.send(&s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
1238 
/// Checks that a cloned `Select` can be moved into another thread and used there.
///
/// The selector holds one receive and one send operation on the same
/// zero-capacity channel. For each of the `THREADS` threads a clone is made on
/// the spawning thread and moved into the new one, which runs one selection and
/// completes the chosen operation, asserting that a received value is never the
/// thread's own index. The channel must be empty afterwards.
#[test]
fn send_and_clone() {
    const THREADS: usize = 20;

    let (s, r) = &bounded::<usize>(0);

    let mut sel = Select::new();
    let oper1 = sel.recv(&r);
    let oper2 = sel.send(&s);

    scope(|scope| {
        for i in 0..THREADS {
            // Clone here so that the owned copy itself crosses the thread boundary.
            let mut local = sel.clone();
            scope.spawn(move |_| {
                let oper = local.select();
                let ix = oper.index();
                if ix == oper1 {
                    assert_ne!(oper.recv(&r), Ok(i));
                } else if ix == oper2 {
                    assert!(oper.send(&s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
1266 
/// Checks that a single `Select` can be reused for many consecutive selections.
///
/// For every `i` the spawned thread sends `i` on channel 1, expects `i` back on
/// channel 2 and then waits for a token on channel 3. The main thread registers
/// a receive on channel 1 and a send on channel 2 once, runs two selections per
/// `i` on that same selector, and then sends the token on channel 3.
#[test]
fn reuse() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        // Registered once; every selection below goes through this instance.
        let mut sel = Select::new();
        let oper1 = sel.recv(&r1);
        let oper2 = sel.send(&s2);

        for i in 0..COUNT {
            for _ in 0..2 {
                let oper = sel.select();
                let ix = oper.index();
                if ix == oper1 {
                    assert_eq!(oper.recv(&r1), Ok(i));
                } else if ix == oper2 {
                    assert!(oper.send(&s2, i).is_ok());
                } else {
                    unreachable!();
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
1302