1 #![allow(clippy::cognitive_complexity)]
2 #![warn(rust_2018_idioms)]
3 #![cfg(feature = "full")]
4
5 use tokio::sync::watch;
6 use tokio_test::task::spawn;
7 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
8
9 #[test]
single_rx_recv()10 fn single_rx_recv() {
11 let (tx, mut rx) = watch::channel("one");
12
13 {
14 // Not initially notified
15 let mut t = spawn(rx.changed());
16 assert_pending!(t.poll());
17 }
18 assert_eq!(*rx.borrow(), "one");
19
20 {
21 let mut t = spawn(rx.changed());
22 assert_pending!(t.poll());
23
24 tx.send("two").unwrap();
25
26 assert!(t.is_woken());
27
28 assert_ready_ok!(t.poll());
29 }
30 assert_eq!(*rx.borrow(), "two");
31
32 {
33 let mut t = spawn(rx.changed());
34 assert_pending!(t.poll());
35
36 drop(tx);
37
38 assert!(t.is_woken());
39 assert_ready_err!(t.poll());
40 }
41 assert_eq!(*rx.borrow(), "two");
42 }
43
44 #[test]
multi_rx()45 fn multi_rx() {
46 let (tx, mut rx1) = watch::channel("one");
47 let mut rx2 = rx1.clone();
48
49 {
50 let mut t1 = spawn(rx1.changed());
51 let mut t2 = spawn(rx2.changed());
52
53 assert_pending!(t1.poll());
54 assert_pending!(t2.poll());
55 }
56 assert_eq!(*rx1.borrow(), "one");
57 assert_eq!(*rx2.borrow(), "one");
58
59 let mut t2 = spawn(rx2.changed());
60
61 {
62 let mut t1 = spawn(rx1.changed());
63
64 assert_pending!(t1.poll());
65 assert_pending!(t2.poll());
66
67 tx.send("two").unwrap();
68
69 assert!(t1.is_woken());
70 assert!(t2.is_woken());
71
72 assert_ready_ok!(t1.poll());
73 }
74 assert_eq!(*rx1.borrow(), "two");
75
76 {
77 let mut t1 = spawn(rx1.changed());
78
79 assert_pending!(t1.poll());
80
81 tx.send("three").unwrap();
82
83 assert!(t1.is_woken());
84 assert!(t2.is_woken());
85
86 assert_ready_ok!(t1.poll());
87 assert_ready_ok!(t2.poll());
88 }
89 assert_eq!(*rx1.borrow(), "three");
90
91 drop(t2);
92
93 assert_eq!(*rx2.borrow(), "three");
94
95 {
96 let mut t1 = spawn(rx1.changed());
97 let mut t2 = spawn(rx2.changed());
98
99 assert_pending!(t1.poll());
100 assert_pending!(t2.poll());
101
102 tx.send("four").unwrap();
103
104 assert_ready_ok!(t1.poll());
105 assert_ready_ok!(t2.poll());
106 }
107 assert_eq!(*rx1.borrow(), "four");
108 assert_eq!(*rx2.borrow(), "four");
109 }
110
111 #[test]
rx_observes_final_value()112 fn rx_observes_final_value() {
113 // Initial value
114
115 let (tx, mut rx) = watch::channel("one");
116 drop(tx);
117
118 {
119 let mut t1 = spawn(rx.changed());
120 assert_ready_err!(t1.poll());
121 }
122 assert_eq!(*rx.borrow(), "one");
123
124 // Sending a value
125
126 let (tx, mut rx) = watch::channel("one");
127
128 tx.send("two").unwrap();
129
130 {
131 let mut t1 = spawn(rx.changed());
132 assert_ready_ok!(t1.poll());
133 }
134 assert_eq!(*rx.borrow(), "two");
135
136 {
137 let mut t1 = spawn(rx.changed());
138 assert_pending!(t1.poll());
139
140 tx.send("three").unwrap();
141 drop(tx);
142
143 assert!(t1.is_woken());
144
145 assert_ready_ok!(t1.poll());
146 }
147 assert_eq!(*rx.borrow(), "three");
148
149 {
150 let mut t1 = spawn(rx.changed());
151 assert_ready_err!(t1.poll());
152 }
153 assert_eq!(*rx.borrow(), "three");
154 }
155
156 #[test]
poll_close()157 fn poll_close() {
158 let (tx, rx) = watch::channel("one");
159
160 {
161 let mut t = spawn(tx.closed());
162 assert_pending!(t.poll());
163
164 drop(rx);
165
166 assert!(t.is_woken());
167 assert_ready!(t.poll());
168 }
169
170 assert!(tx.send("two").is_err());
171 }
172