1 /// Full runtime loom tests. These are heavy tests and take significant time to
2 /// run on CI.
3 ///
4 /// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test.
5 ///
6 /// In order to speed up the C
7 use crate::future::poll_fn;
8 use crate::runtime::tests::loom_oneshot as oneshot;
9 use crate::runtime::{self, Runtime};
10 use crate::{spawn, task};
11 use tokio_test::assert_ok;
12 
13 use loom::sync::atomic::{AtomicBool, AtomicUsize};
14 use loom::sync::{Arc, Mutex};
15 
16 use pin_project_lite::pin_project;
17 use std::future::Future;
18 use std::pin::Pin;
19 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
20 use std::task::{Context, Poll};
21 
22 /// Tests are divided into groups to make the runs faster on CI.
23 mod group_a {
24     use super::*;
25 
26     #[test]
racy_shutdown()27     fn racy_shutdown() {
28         loom::model(|| {
29             let pool = mk_pool(1);
30 
31             // here's the case we want to exercise:
32             //
33             // a worker that still has tasks in its local queue gets sent to the blocking pool (due to
34             // block_in_place). the blocking pool is shut down, so drops the worker. the worker's
35             // shutdown method never gets run.
36             //
37             // we do this by spawning two tasks on one worker, the first of which does block_in_place,
38             // and then immediately drop the pool.
39 
40             pool.spawn(track(async {
41                 crate::task::block_in_place(|| {});
42             }));
43             pool.spawn(track(async {}));
44             drop(pool);
45         });
46     }
47 
48     #[test]
pool_multi_spawn()49     fn pool_multi_spawn() {
50         loom::model(|| {
51             let pool = mk_pool(2);
52             let c1 = Arc::new(AtomicUsize::new(0));
53 
54             let (tx, rx) = oneshot::channel();
55             let tx1 = Arc::new(Mutex::new(Some(tx)));
56 
57             // Spawn a task
58             let c2 = c1.clone();
59             let tx2 = tx1.clone();
60             pool.spawn(track(async move {
61                 spawn(track(async move {
62                     if 1 == c1.fetch_add(1, Relaxed) {
63                         tx1.lock().unwrap().take().unwrap().send(());
64                     }
65                 }));
66             }));
67 
68             // Spawn a second task
69             pool.spawn(track(async move {
70                 spawn(track(async move {
71                     if 1 == c2.fetch_add(1, Relaxed) {
72                         tx2.lock().unwrap().take().unwrap().send(());
73                     }
74                 }));
75             }));
76 
77             rx.recv();
78         });
79     }
80 
only_blocking_inner(first_pending: bool)81     fn only_blocking_inner(first_pending: bool) {
82         loom::model(move || {
83             let pool = mk_pool(1);
84             let (block_tx, block_rx) = oneshot::channel();
85 
86             pool.spawn(track(async move {
87                 crate::task::block_in_place(move || {
88                     block_tx.send(());
89                 });
90                 if first_pending {
91                     task::yield_now().await
92                 }
93             }));
94 
95             block_rx.recv();
96             drop(pool);
97         });
98     }
99 
100     #[test]
only_blocking_without_pending()101     fn only_blocking_without_pending() {
102         only_blocking_inner(false)
103     }
104 
105     #[test]
only_blocking_with_pending()106     fn only_blocking_with_pending() {
107         only_blocking_inner(true)
108     }
109 }
110 
111 mod group_b {
112     use super::*;
113 
blocking_and_regular_inner(first_pending: bool)114     fn blocking_and_regular_inner(first_pending: bool) {
115         const NUM: usize = 3;
116         loom::model(move || {
117             let pool = mk_pool(1);
118             let cnt = Arc::new(AtomicUsize::new(0));
119 
120             let (block_tx, block_rx) = oneshot::channel();
121             let (done_tx, done_rx) = oneshot::channel();
122             let done_tx = Arc::new(Mutex::new(Some(done_tx)));
123 
124             pool.spawn(track(async move {
125                 crate::task::block_in_place(move || {
126                     block_tx.send(());
127                 });
128                 if first_pending {
129                     task::yield_now().await
130                 }
131             }));
132 
133             for _ in 0..NUM {
134                 let cnt = cnt.clone();
135                 let done_tx = done_tx.clone();
136 
137                 pool.spawn(track(async move {
138                     if NUM == cnt.fetch_add(1, Relaxed) + 1 {
139                         done_tx.lock().unwrap().take().unwrap().send(());
140                     }
141                 }));
142             }
143 
144             done_rx.recv();
145             block_rx.recv();
146 
147             drop(pool);
148         });
149     }
150 
151     #[test]
blocking_and_regular()152     fn blocking_and_regular() {
153         blocking_and_regular_inner(false);
154     }
155 
156     #[test]
blocking_and_regular_with_pending()157     fn blocking_and_regular_with_pending() {
158         blocking_and_regular_inner(true);
159     }
160 
161     #[test]
pool_shutdown()162     fn pool_shutdown() {
163         loom::model(|| {
164             let pool = mk_pool(2);
165 
166             pool.spawn(track(async move {
167                 gated2(true).await;
168             }));
169 
170             pool.spawn(track(async move {
171                 gated2(false).await;
172             }));
173 
174             drop(pool);
175         });
176     }
177 
178     #[test]
join_output()179     fn join_output() {
180         loom::model(|| {
181             let rt = mk_pool(1);
182 
183             rt.block_on(async {
184                 let t = crate::spawn(track(async { "hello" }));
185 
186                 let out = assert_ok!(t.await);
187                 assert_eq!("hello", out.into_inner());
188             });
189         });
190     }
191 
192     #[test]
poll_drop_handle_then_drop()193     fn poll_drop_handle_then_drop() {
194         loom::model(|| {
195             let rt = mk_pool(1);
196 
197             rt.block_on(async move {
198                 let mut t = crate::spawn(track(async { "hello" }));
199 
200                 poll_fn(|cx| {
201                     let _ = Pin::new(&mut t).poll(cx);
202                     Poll::Ready(())
203                 })
204                 .await;
205             });
206         })
207     }
208 
209     #[test]
complete_block_on_under_load()210     fn complete_block_on_under_load() {
211         loom::model(|| {
212             let pool = mk_pool(1);
213 
214             pool.block_on(async {
215                 // Trigger a re-schedule
216                 crate::spawn(track(async {
217                     for _ in 0..2 {
218                         task::yield_now().await;
219                     }
220                 }));
221 
222                 gated2(true).await
223             });
224         });
225     }
226 }
227 
228 mod group_c {
229     use super::*;
230 
231     #[test]
shutdown_with_notification()232     fn shutdown_with_notification() {
233         use crate::sync::oneshot;
234 
235         loom::model(|| {
236             let rt = mk_pool(2);
237             let (done_tx, done_rx) = oneshot::channel::<()>();
238 
239             rt.spawn(track(async move {
240                 let (tx, rx) = oneshot::channel::<()>();
241 
242                 crate::spawn(async move {
243                     crate::task::spawn_blocking(move || {
244                         let _ = tx.send(());
245                     });
246 
247                     let _ = done_rx.await;
248                 });
249 
250                 let _ = rx.await;
251 
252                 let _ = done_tx.send(());
253             }));
254         });
255     }
256 }
257 
258 mod group_d {
259     use super::*;
260 
261     #[test]
pool_multi_notify()262     fn pool_multi_notify() {
263         loom::model(|| {
264             let pool = mk_pool(2);
265 
266             let c1 = Arc::new(AtomicUsize::new(0));
267 
268             let (done_tx, done_rx) = oneshot::channel();
269             let done_tx1 = Arc::new(Mutex::new(Some(done_tx)));
270 
271             // Spawn a task
272             let c2 = c1.clone();
273             let done_tx2 = done_tx1.clone();
274             pool.spawn(track(async move {
275                 gated().await;
276                 gated().await;
277 
278                 if 1 == c1.fetch_add(1, Relaxed) {
279                     done_tx1.lock().unwrap().take().unwrap().send(());
280                 }
281             }));
282 
283             // Spawn a second task
284             pool.spawn(track(async move {
285                 gated().await;
286                 gated().await;
287 
288                 if 1 == c2.fetch_add(1, Relaxed) {
289                     done_tx2.lock().unwrap().take().unwrap().send(());
290                 }
291             }));
292 
293             done_rx.recv();
294         });
295     }
296 }
297 
mk_pool(num_threads: usize) -> Runtime298 fn mk_pool(num_threads: usize) -> Runtime {
299     runtime::Builder::new_multi_thread()
300         .worker_threads(num_threads)
301         .build()
302         .unwrap()
303 }
304 
gated() -> impl Future<Output = &'static str>305 fn gated() -> impl Future<Output = &'static str> {
306     gated2(false)
307 }
308 
gated2(thread: bool) -> impl Future<Output = &'static str>309 fn gated2(thread: bool) -> impl Future<Output = &'static str> {
310     use loom::thread;
311     use std::sync::Arc;
312 
313     let gate = Arc::new(AtomicBool::new(false));
314     let mut fired = false;
315 
316     poll_fn(move |cx| {
317         if !fired {
318             let gate = gate.clone();
319             let waker = cx.waker().clone();
320 
321             if thread {
322                 thread::spawn(move || {
323                     gate.store(true, SeqCst);
324                     waker.wake_by_ref();
325                 });
326             } else {
327                 spawn(track(async move {
328                     gate.store(true, SeqCst);
329                     waker.wake_by_ref();
330                 }));
331             }
332 
333             fired = true;
334 
335             return Poll::Pending;
336         }
337 
338         if gate.load(SeqCst) {
339             Poll::Ready("hello world")
340         } else {
341             Poll::Pending
342         }
343     })
344 }
345 
track<T: Future>(f: T) -> Track<T>346 fn track<T: Future>(f: T) -> Track<T> {
347     Track {
348         inner: f,
349         arc: Arc::new(()),
350     }
351 }
352 
353 pin_project! {
354     struct Track<T> {
355         #[pin]
356         inner: T,
357         // Arc is used to hook into loom's leak tracking.
358         arc: Arc<()>,
359     }
360 }
361 
362 impl<T> Track<T> {
into_inner(self) -> T363     fn into_inner(self) -> T {
364         self.inner
365     }
366 }
367 
368 impl<T: Future> Future for Track<T> {
369     type Output = Track<T::Output>;
370 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>371     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
372         let me = self.project();
373 
374         Poll::Ready(Track {
375             inner: ready!(me.inner.poll(cx)),
376             arc: me.arc.clone(),
377         })
378     }
379 }
380