1 use std::future::Future;
2 use std::pin::Pin;
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::task::{Context, Poll};
5 use std::thread;
6 use std::time::Duration;
7 
8 use async_task::Runnable;
9 use easy_parallel::Parallel;
10 use smol::future;
11 
12 // Creates a future with event counters.
13 //
14 // Usage: `future!(f, POLL, DROP_F, DROP_T)`
15 //
// The future `f` sleeps for 400 ms and outputs `Poll::Ready`.
17 // When it gets polled, `POLL` is incremented.
18 // When it gets dropped, `DROP_F` is incremented.
19 // When the output gets dropped, `DROP_T` is incremented.
macro_rules! future {
    ($name:pat, $poll:ident, $drop_f:ident, $drop_t:ident) => {
        // One counter per observed event; the caller picks the names.
        static $poll: AtomicUsize = AtomicUsize::new(0);
        static $drop_f: AtomicUsize = AtomicUsize::new(0);
        static $drop_t: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            // Output of the future. The flag is `true` only for values produced
            // by `poll`; a `Default`-constructed value carries `false` and is
            // therefore not counted when it is dropped.
            #[derive(Default)]
            struct Out(Box<i32>, bool);

            impl Drop for Out {
                fn drop(&mut self) {
                    if !self.1 {
                        return;
                    }
                    $drop_t.fetch_add(1, Ordering::SeqCst);
                }
            }

            // The future itself; dropping it bumps the `$drop_f` counter.
            struct Fut(Box<i32>);

            impl Drop for Fut {
                fn drop(&mut self) {
                    $drop_f.fetch_add(1, Ordering::SeqCst);
                }
            }

            impl Future for Fut {
                type Output = Out;

                fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Out> {
                    // Record the poll, then block the polling thread for 400 ms
                    // before completing with a counted output.
                    $poll.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(ms(400));
                    let output = Out(Box::new(0), true);
                    Poll::Ready(output)
                }
            }

            Fut(Box::new(0))
        };
    };
}
60 
61 // Creates a schedule function with event counters.
62 //
63 // Usage: `schedule!(s, SCHED, DROP)`
64 //
65 // The schedule function `s` does nothing.
66 // When it gets invoked, `SCHED` is incremented.
67 // When it gets dropped, `DROP` is incremented.
macro_rules! schedule {
    ($name:pat, $sched:ident, $drop:ident) => {
        // Counters named by the caller: `$drop` tracks drops of the schedule
        // function, `$sched` tracks its invocations.
        static $drop: AtomicUsize = AtomicUsize::new(0);
        static $sched: AtomicUsize = AtomicUsize::new(0);

        let $name = {
            // Owned by the closure below, so dropping the closure drops the
            // guard and bumps `$drop`.
            struct Guard(Box<i32>);

            impl Drop for Guard {
                fn drop(&mut self) {
                    $drop.fetch_add(1, Ordering::SeqCst);
                }
            }

            let guard = Guard(Box::new(0));
            move |_runnable: Runnable| {
                // Mention `guard` so the `move` closure takes ownership of it.
                // `let _ =` avoids the "unused borrow that must be used"
                // warning that a bare `&guard;` statement produces.
                let _ = &guard;
                $sched.fetch_add(1, Ordering::SeqCst);
            }
        };
    };
}
90 
// Shorthand for a `Duration` of `ms` milliseconds, used to keep the
// `thread::sleep` calls in this file short.
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
94 
// Drops the `Task` handle from a second thread while the first thread is still
// inside `runnable.run()`, and checks that nothing is dropped until the run ends.
#[test]
fn cancel_during_run() {
    // Current value of a single event counter.
    fn count(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    future!(f, POLL, DROP_F, DROP_T);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Thread 1: drives the task. The single poll sleeps for 400 ms.
    let runner = || {
        runnable.run();

        // Once `run` has returned, the future, the schedule function and the
        // output are each expected to have been dropped exactly once.
        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 1);
        assert_eq!(count(&DROP_S), 1);
        assert_eq!(count(&DROP_T), 1);
    };

    // Thread 2: drops the task handle half-way through the poll.
    let canceller = || {
        // Wait until the other thread is in the middle of `poll`.
        thread::sleep(ms(200));

        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 0);
        assert_eq!(count(&DROP_S), 0);
        assert_eq!(count(&DROP_T), 0);

        // Dropping the handle mid-run must not drop anything right away.
        drop(task);
        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 0);
        assert_eq!(count(&DROP_S), 0);
        assert_eq!(count(&DROP_T), 0);

        // Give the poll time to finish; cleanup is expected to be done by then.
        thread::sleep(ms(400));

        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 1);
        assert_eq!(count(&DROP_S), 1);
        assert_eq!(count(&DROP_T), 1);
    };

    Parallel::new().add(runner).add(canceller).run();
}
136 
// Blocks on the `Task` from a second thread while the first thread is still
// inside `runnable.run()`, and checks the counters once the output is received.
#[test]
fn join_during_run() {
    // Current value of a single event counter.
    fn count(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    future!(f, POLL, DROP_F, DROP_T);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Thread 1: drives the task. The single poll sleeps for 400 ms.
    let runner = || {
        runnable.run();
        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 1);

        // The schedule function is only checked after a grace period, leaving
        // the joining thread time to finish with the task.
        thread::sleep(ms(200));

        assert_eq!(count(&DROP_S), 1);
    };

    // Thread 2: starts waiting for the output half-way through the poll.
    let joiner = || {
        thread::sleep(ms(200));

        // The output returned by `block_on` is discarded immediately, which is
        // why `DROP_T` is already 1 in the assertions below.
        future::block_on(task);
        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 1);
        assert_eq!(count(&DROP_T), 1);

        thread::sleep(ms(200));

        assert_eq!(count(&DROP_S), 1);
    };

    Parallel::new().add(runner).add(joiner).run();
}
169 
// Races the still-running `Task` against an already-ready default output from a
// second thread, and checks that this attempt leaves every counter untouched.
#[test]
fn try_join_during_run() {
    // Current value of a single event counter.
    fn count(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    future!(f, POLL, DROP_F, DROP_T);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, mut task) = async_task::spawn(f, s);

    // Thread 1: drives the task. The single poll sleeps for 400 ms.
    let runner = || {
        runnable.run();

        // Once `run` has returned, the future, the schedule function and the
        // output are each expected to have been dropped exactly once.
        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 1);
        assert_eq!(count(&DROP_S), 1);
        assert_eq!(count(&DROP_T), 1);
    };

    // Thread 2: makes one non-blocking attempt to join half-way through the poll.
    let try_joiner = || {
        thread::sleep(ms(200));

        // The fallback is a `Default` output, which `future!` does not count
        // when dropped, so `DROP_T` must still be 0 afterwards.
        let fallback = future::ready(Default::default());
        future::block_on(future::or(&mut task, fallback));
        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 0);
        assert_eq!(count(&DROP_S), 0);
        assert_eq!(count(&DROP_T), 0);
        drop(task);
    };

    Parallel::new().add(runner).add(try_joiner).run();
}
198 
// Detaches the `Task` from a second thread while the first thread is still
// inside `runnable.run()`, and checks that detaching drops nothing by itself.
#[test]
fn detach_during_run() {
    // Current value of a single event counter.
    fn count(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::SeqCst)
    }

    future!(f, POLL, DROP_F, DROP_T);
    schedule!(s, SCHEDULE, DROP_S);
    let (runnable, task) = async_task::spawn(f, s);

    // Thread 1: drives the task. The single poll sleeps for 400 ms.
    let runner = || {
        runnable.run();

        // Once `run` has returned, the future, the schedule function and the
        // output are each expected to have been dropped exactly once.
        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 1);
        assert_eq!(count(&DROP_S), 1);
        assert_eq!(count(&DROP_T), 1);
    };

    // Thread 2: detaches the task half-way through the poll.
    let detacher = || {
        thread::sleep(ms(200));

        task.detach();
        assert_eq!(count(&POLL), 1);
        assert_eq!(count(&SCHEDULE), 0);
        assert_eq!(count(&DROP_F), 0);
        assert_eq!(count(&DROP_S), 0);
        assert_eq!(count(&DROP_T), 0);
    };

    Parallel::new().add(runner).add(detacher).run();
}
226