1 use std::cell::Cell;
2 use std::future::Future;
3 use std::panic::{catch_unwind, AssertUnwindSafe};
4 use std::pin::Pin;
5 use std::sync::atomic::{AtomicUsize, Ordering};
6 use std::task::{Context, Poll};
7 use std::thread;
8 use std::time::Duration;
9
10 use async_task::Runnable;
11 use atomic_waker::AtomicWaker;
12 use easy_parallel::Parallel;
13 use smol::future;
14
15 // Creates a future with event counters.
16 //
17 // Usage: `future!(f, get_waker, POLL, DROP)`
18 //
// The future `f` always sleeps for 400 ms, and panics the second time it is polled.
20 // When it gets polled, `POLL` is incremented.
21 // When it gets dropped, `DROP` is incremented.
22 //
23 // Every time the future is run, it stores the waker into a global variable.
24 // This waker can be extracted using the `get_waker()` function.
macro_rules! future {
    ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => {
        // Event counters named by the caller, plus the slot holding the most recent waker.
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static WAKER: AtomicWaker = AtomicWaker::new();

        let ($name, $get_waker) = {
            struct SleepyFut {
                // Set on the first poll; a poll that finds it already set panics.
                polled_before: Cell<bool>,
                // Heap allocation owned by the future; never read.
                _payload: Box<i32>,
            }

            impl Drop for SleepyFut {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            impl Future for SleepyFut {
                type Output = ();

                fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                    // Publish the waker first so `get_waker()` can retrieve it after this poll.
                    WAKER.register(cx.waker());
                    $poll.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(ms(400));

                    // `replace` hands back the previous flag value while marking the future
                    // as polled: first poll sees `false`, every later poll sees `true`.
                    if self.polled_before.replace(true) {
                        panic!()
                    }
                    Poll::Pending
                }
            }

            let fut = SleepyFut {
                polled_before: Cell::new(false),
                _payload: Box::new(0),
            };
            // Panics (via `unwrap`) if no waker is currently stored.
            let take_waker = || WAKER.take().unwrap();

            (fut, take_waker)
        };
    };
}
61
62 // Creates a schedule function with event counters.
63 //
64 // Usage: `schedule!(s, chan, SCHED, DROP)`
65 //
66 // The schedule function `s` pushes the task into `chan`.
67 // When it gets invoked, `SCHED` is incremented.
68 // When it gets dropped, `DROP` is incremented.
69 //
70 // Receiver `chan` extracts the task when it is scheduled.
macro_rules! schedule {
    ($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
        // Event counters named by the caller.
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $sched: AtomicUsize = AtomicUsize::new(0);

        let ($name, $chan) = {
            let (s, r) = flume::unbounded();

            // Owned by the schedule closure; its destructor makes dropping the closure
            // observable through the `$drop` counter.
            struct Guard(Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let guard = Guard(Box::new(0));
            let sched = move |runnable: Runnable| {
                // Mention `guard` so the `move` closure takes ownership of it. Binding the
                // borrow to `_` avoids the "unused borrow that must be used" warning that a
                // bare `&guard;` statement produces.
                let _ = &guard;
                $sched.fetch_add(1, Ordering::SeqCst);
                s.send(runnable).unwrap();
            };

            (sched, r)
        };
    };
}
98
// Shorthand for building a `Duration` from a number of milliseconds.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
102
try_await<T>(f: impl Future<Output = T>) -> Option<T>103 fn try_await<T>(f: impl Future<Output = T>) -> Option<T> {
104 future::block_on(future::poll_once(f))
105 }
106
// Wakes and detaches the task from a second thread while the first thread is running the
// poll that panics, then checks that the future and the schedule function are each dropped
// exactly once and that nothing is rescheduled.
#[test]
fn wake_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // First poll returns `Pending`; waking by reference pushes the task back into `chan`.
    runnable.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    // (polls, schedules, future drops, schedule-fn drops, queued runnables)
    let snapshot = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    Parallel::new()
        .add(|| {
            // Second poll: sleeps 400 ms inside `poll`, then panics.
            assert!(catch_unwind(|| runnable.run()).is_err());
            drop(get_waker());
            assert_eq!(snapshot(), (2, 1, 1, 1, 0));
        })
        .add(|| {
            // Act 200 ms in, while the other thread's 400 ms poll should still be running.
            thread::sleep(ms(200));

            waker.wake();
            task.detach();
            assert_eq!(snapshot(), (2, 1, 0, 0, 0));

            // Give the other thread time to finish its poll and clean up.
            thread::sleep(ms(400));

            assert_eq!(snapshot(), (2, 1, 1, 1, 0));
        })
        .run();
}
149
// Drops the `Task` handle from a second thread while the first thread is running the poll
// that panics, then checks that the future and the schedule function are each dropped
// exactly once and that nothing is rescheduled.
#[test]
fn cancel_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // First poll returns `Pending`; consuming the waker pushes the task back into `chan`.
    runnable.run();
    let waker = get_waker();
    waker.wake();
    let runnable = chan.recv().unwrap();

    // (polls, schedules, future drops, schedule-fn drops, queued runnables)
    let snapshot = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    Parallel::new()
        .add(|| {
            // Second poll: sleeps 400 ms inside `poll`, then panics.
            assert!(catch_unwind(|| runnable.run()).is_err());
            drop(get_waker());
            assert_eq!(snapshot(), (2, 1, 1, 1, 0));
        })
        .add(|| {
            // Act 200 ms in, while the other thread's 400 ms poll should still be running.
            thread::sleep(ms(200));

            drop(task);
            assert_eq!(snapshot(), (2, 1, 0, 0, 0));

            // Give the other thread time to finish its poll and clean up.
            thread::sleep(ms(400));

            assert_eq!(snapshot(), (2, 1, 1, 1, 0));
        })
        .run();
}
191
// From a second thread, wakes the task and then drops the `Task` handle while the first
// thread is running the poll that panics. Neither action may trigger a reschedule or an
// early drop; cleanup happens exactly once after the poll ends.
#[test]
fn wake_and_cancel_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // First poll returns `Pending`; waking by reference pushes the task back into `chan`.
    runnable.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    // (polls, schedules, future drops, schedule-fn drops, queued runnables)
    let snapshot = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    Parallel::new()
        .add(|| {
            // Second poll: sleeps 400 ms inside `poll`, then panics.
            assert!(catch_unwind(|| runnable.run()).is_err());
            drop(get_waker());
            assert_eq!(snapshot(), (2, 1, 1, 1, 0));
        })
        .add(|| {
            // Act 200 ms in, while the other thread's 400 ms poll should still be running.
            thread::sleep(ms(200));

            // Wake first ...
            waker.wake();
            assert_eq!(snapshot(), (2, 1, 0, 0, 0));

            // ... then cancel.
            drop(task);
            assert_eq!(snapshot(), (2, 1, 0, 0, 0));

            // Give the other thread time to finish its poll and clean up.
            thread::sleep(ms(400));

            assert_eq!(snapshot(), (2, 1, 1, 1, 0));
        })
        .run();
}
240
// From a second thread, drops the `Task` handle and then wakes the task while the first
// thread is running the poll that panics. Neither action may trigger a reschedule or an
// early drop; cleanup happens exactly once after the poll ends.
#[test]
fn cancel_and_wake_during_run() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // First poll returns `Pending`; waking by reference pushes the task back into `chan`.
    runnable.run();
    let waker = get_waker();
    waker.wake_by_ref();
    let runnable = chan.recv().unwrap();

    // (polls, schedules, future drops, schedule-fn drops, queued runnables)
    let snapshot = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
            chan.len(),
        )
    };

    Parallel::new()
        .add(|| {
            // Second poll: sleeps 400 ms inside `poll`, then panics.
            assert!(catch_unwind(|| runnable.run()).is_err());
            drop(get_waker());
            assert_eq!(snapshot(), (2, 1, 1, 1, 0));
        })
        .add(|| {
            // Act 200 ms in, while the other thread's 400 ms poll should still be running.
            thread::sleep(ms(200));

            // Cancel first ...
            drop(task);
            assert_eq!(snapshot(), (2, 1, 0, 0, 0));

            // ... then wake.
            waker.wake();
            assert_eq!(snapshot(), (2, 1, 0, 0, 0));

            // Give the other thread time to finish its poll and clean up.
            thread::sleep(ms(400));

            assert_eq!(snapshot(), (2, 1, 1, 1, 0));
        })
        .run();
}
289
// Single-threaded scenario: the future panics on its second poll, and polling the `Task`
// handle afterwards panics as well. The schedule function is only dropped once the last
// waker and the task handle are gone.
#[test]
fn panic_and_poll() {
    future!(f, get_waker, POLL, DROP_F);
    schedule!(s, chan, SCHEDULE, DROP_S);
    let (runnable, mut task) = async_task::spawn(f, s);

    // (polls, schedules, future drops, schedule-fn drops)
    let counts = || {
        (
            POLL.load(Ordering::SeqCst),
            SCHEDULE.load(Ordering::SeqCst),
            DROP_F.load(Ordering::SeqCst),
            DROP_S.load(Ordering::SeqCst),
        )
    };

    // First poll returns `Pending`; waking schedules the task once.
    runnable.run();
    get_waker().wake();
    assert_eq!(counts(), (1, 1, 0, 0));

    // The task has produced no output yet.
    assert!(try_await(&mut task).is_none());

    // Second poll panics and the future is dropped.
    let runnable = chan.recv().unwrap();
    assert!(catch_unwind(|| runnable.run()).is_err());
    assert_eq!(counts(), (2, 1, 1, 0));

    // Polling the handle of a task whose future panicked panics too; the `&mut task`
    // capture is not `UnwindSafe`, hence the `AssertUnwindSafe` wrapper.
    assert!(catch_unwind(AssertUnwindSafe(|| try_await(&mut task))).is_err());
    assert_eq!(counts(), (2, 1, 1, 0));

    // Releasing the stored waker and the handle finally drops the schedule function.
    drop(get_waker());
    drop(task);
    assert_eq!(counts(), (2, 1, 1, 1));
}
326