1 use std::future::Future;
2 use std::panic::catch_unwind;
3 use std::pin::Pin;
4 use std::sync::atomic::{AtomicUsize, Ordering};
5 use std::task::{Context, Poll};
6 use std::thread;
7 use std::time::Duration;
8 
9 use async_task::Runnable;
10 use easy_parallel::Parallel;
11 use smol::future;
12 
13 // Creates a future with event counters.
14 //
15 // Usage: `future!(f, POLL, DROP)`
16 //
17 // The future `f` sleeps for 200 ms and then panics.
18 // When it gets polled, `POLL` is incremented.
19 // When it gets dropped, `DROP` is incremented.
20 macro_rules! future {
21     ($name:pat, $poll:ident, $drop:ident) => {
22         static $poll: AtomicUsize = AtomicUsize::new(0);
23         static $drop: AtomicUsize = AtomicUsize::new(0);
24 
25         let $name = {
26             struct Fut(Box<i32>);
27 
28             impl Future for Fut {
29                 type Output = ();
30 
31                 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
32                     $poll.fetch_add(1, Ordering::SeqCst);
33                     thread::sleep(ms(400));
34                     panic!()
35                 }
36             }
37 
38             impl Drop for Fut {
39                 fn drop(&mut self) {
40                     $drop.fetch_add(1, Ordering::SeqCst);
41                 }
42             }
43 
44             Fut(Box::new(0))
45         };
46     };
47 }
48 
49 // Creates a schedule function with event counters.
50 //
51 // Usage: `schedule!(s, SCHED, DROP)`
52 //
53 // The schedule function `s` does nothing.
54 // When it gets invoked, `SCHED` is incremented.
55 // When it gets dropped, `DROP` is incremented.
56 macro_rules! schedule {
57     ($name:pat, $sched:ident, $drop:ident) => {
58         static $drop: AtomicUsize = AtomicUsize::new(0);
59         static $sched: AtomicUsize = AtomicUsize::new(0);
60 
61         let $name = {
62             struct Guard(Box<i32>);
63 
64             impl Drop for Guard {
65                 fn drop(&mut self) {
66                     $drop.fetch_add(1, Ordering::SeqCst);
67                 }
68             }
69 
70             let guard = Guard(Box::new(0));
71             move |_runnable: Runnable| {
72                 &guard;
73                 $sched.fetch_add(1, Ordering::SeqCst);
74             }
75         };
76     };
77 }
78 
ms(ms: u64) -> Duration79 fn ms(ms: u64) -> Duration {
80     Duration::from_millis(ms)
81 }
82 
83 #[test]
cancel_during_run()84 fn cancel_during_run() {
85     future!(f, POLL, DROP_F);
86     schedule!(s, SCHEDULE, DROP_S);
87     let (runnable, task) = async_task::spawn(f, s);
88 
89     Parallel::new()
90         .add(|| {
91             assert!(catch_unwind(|| runnable.run()).is_err());
92             assert_eq!(POLL.load(Ordering::SeqCst), 1);
93             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
94             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
95             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
96         })
97         .add(|| {
98             thread::sleep(ms(200));
99 
100             drop(task);
101             assert_eq!(POLL.load(Ordering::SeqCst), 1);
102             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
103             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
104             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
105         })
106         .run();
107 }
108 
109 #[test]
run_and_join()110 fn run_and_join() {
111     future!(f, POLL, DROP_F);
112     schedule!(s, SCHEDULE, DROP_S);
113     let (runnable, task) = async_task::spawn(f, s);
114 
115     assert!(catch_unwind(|| runnable.run()).is_err());
116     assert_eq!(POLL.load(Ordering::SeqCst), 1);
117     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
118     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
119     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
120 
121     assert!(catch_unwind(|| future::block_on(task)).is_err());
122     assert_eq!(POLL.load(Ordering::SeqCst), 1);
123     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
124     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
125     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
126 }
127 
128 #[test]
try_join_and_run_and_join()129 fn try_join_and_run_and_join() {
130     future!(f, POLL, DROP_F);
131     schedule!(s, SCHEDULE, DROP_S);
132     let (runnable, mut task) = async_task::spawn(f, s);
133 
134     future::block_on(future::or(&mut task, future::ready(Default::default())));
135     assert_eq!(POLL.load(Ordering::SeqCst), 0);
136     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
137     assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
138     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
139 
140     assert!(catch_unwind(|| runnable.run()).is_err());
141     assert_eq!(POLL.load(Ordering::SeqCst), 1);
142     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
143     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
144     assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
145 
146     assert!(catch_unwind(|| future::block_on(task)).is_err());
147     assert_eq!(POLL.load(Ordering::SeqCst), 1);
148     assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
149     assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
150     assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
151 }
152 
153 #[test]
join_during_run()154 fn join_during_run() {
155     future!(f, POLL, DROP_F);
156     schedule!(s, SCHEDULE, DROP_S);
157     let (runnable, task) = async_task::spawn(f, s);
158 
159     Parallel::new()
160         .add(|| {
161             assert!(catch_unwind(|| runnable.run()).is_err());
162             assert_eq!(POLL.load(Ordering::SeqCst), 1);
163             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
164             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
165 
166             thread::sleep(ms(200));
167             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
168         })
169         .add(|| {
170             thread::sleep(ms(200));
171 
172             assert!(catch_unwind(|| future::block_on(task)).is_err());
173             assert_eq!(POLL.load(Ordering::SeqCst), 1);
174             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
175             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
176 
177             thread::sleep(ms(200));
178             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
179         })
180         .run();
181 }
182 
183 #[test]
try_join_during_run()184 fn try_join_during_run() {
185     future!(f, POLL, DROP_F);
186     schedule!(s, SCHEDULE, DROP_S);
187     let (runnable, mut task) = async_task::spawn(f, s);
188 
189     Parallel::new()
190         .add(|| {
191             assert!(catch_unwind(|| runnable.run()).is_err());
192             assert_eq!(POLL.load(Ordering::SeqCst), 1);
193             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
194             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
195             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
196         })
197         .add(|| {
198             thread::sleep(ms(200));
199 
200             future::block_on(future::or(&mut task, future::ready(Default::default())));
201             assert_eq!(POLL.load(Ordering::SeqCst), 1);
202             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
203             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
204             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
205             drop(task);
206         })
207         .run();
208 }
209 
210 #[test]
detach_during_run()211 fn detach_during_run() {
212     future!(f, POLL, DROP_F);
213     schedule!(s, SCHEDULE, DROP_S);
214     let (runnable, task) = async_task::spawn(f, s);
215 
216     Parallel::new()
217         .add(|| {
218             assert!(catch_unwind(|| runnable.run()).is_err());
219             assert_eq!(POLL.load(Ordering::SeqCst), 1);
220             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
221             assert_eq!(DROP_F.load(Ordering::SeqCst), 1);
222             assert_eq!(DROP_S.load(Ordering::SeqCst), 1);
223         })
224         .add(|| {
225             thread::sleep(ms(200));
226 
227             task.detach();
228             assert_eq!(POLL.load(Ordering::SeqCst), 1);
229             assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0);
230             assert_eq!(DROP_F.load(Ordering::SeqCst), 0);
231             assert_eq!(DROP_S.load(Ordering::SeqCst), 0);
232         })
233         .run();
234 }
235