1 /// Full runtime loom tests. These are heavy tests and take significant time to
2 /// run on CI.
3 ///
4 /// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test.
5 ///
6 /// In order to speed up the C
7 use crate::future::poll_fn;
8 use crate::runtime::tests::loom_oneshot as oneshot;
9 use crate::runtime::{self, Runtime};
10 use crate::{spawn, task};
11 use tokio_test::assert_ok;
12
13 use loom::sync::atomic::{AtomicBool, AtomicUsize};
14 use loom::sync::{Arc, Mutex};
15
16 use pin_project_lite::pin_project;
17 use std::future::Future;
18 use std::pin::Pin;
19 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
20 use std::task::{Context, Poll};
21
22 /// Tests are divided into groups to make the runs faster on CI.
23 mod group_a {
24 use super::*;
25
26 #[test]
racy_shutdown()27 fn racy_shutdown() {
28 loom::model(|| {
29 let pool = mk_pool(1);
30
31 // here's the case we want to exercise:
32 //
33 // a worker that still has tasks in its local queue gets sent to the blocking pool (due to
34 // block_in_place). the blocking pool is shut down, so drops the worker. the worker's
35 // shutdown method never gets run.
36 //
37 // we do this by spawning two tasks on one worker, the first of which does block_in_place,
38 // and then immediately drop the pool.
39
40 pool.spawn(track(async {
41 crate::task::block_in_place(|| {});
42 }));
43 pool.spawn(track(async {}));
44 drop(pool);
45 });
46 }
47
48 #[test]
pool_multi_spawn()49 fn pool_multi_spawn() {
50 loom::model(|| {
51 let pool = mk_pool(2);
52 let c1 = Arc::new(AtomicUsize::new(0));
53
54 let (tx, rx) = oneshot::channel();
55 let tx1 = Arc::new(Mutex::new(Some(tx)));
56
57 // Spawn a task
58 let c2 = c1.clone();
59 let tx2 = tx1.clone();
60 pool.spawn(track(async move {
61 spawn(track(async move {
62 if 1 == c1.fetch_add(1, Relaxed) {
63 tx1.lock().unwrap().take().unwrap().send(());
64 }
65 }));
66 }));
67
68 // Spawn a second task
69 pool.spawn(track(async move {
70 spawn(track(async move {
71 if 1 == c2.fetch_add(1, Relaxed) {
72 tx2.lock().unwrap().take().unwrap().send(());
73 }
74 }));
75 }));
76
77 rx.recv();
78 });
79 }
80
only_blocking_inner(first_pending: bool)81 fn only_blocking_inner(first_pending: bool) {
82 loom::model(move || {
83 let pool = mk_pool(1);
84 let (block_tx, block_rx) = oneshot::channel();
85
86 pool.spawn(track(async move {
87 crate::task::block_in_place(move || {
88 block_tx.send(());
89 });
90 if first_pending {
91 task::yield_now().await
92 }
93 }));
94
95 block_rx.recv();
96 drop(pool);
97 });
98 }
99
100 #[test]
only_blocking_without_pending()101 fn only_blocking_without_pending() {
102 only_blocking_inner(false)
103 }
104
105 #[test]
only_blocking_with_pending()106 fn only_blocking_with_pending() {
107 only_blocking_inner(true)
108 }
109 }
110
111 mod group_b {
112 use super::*;
113
blocking_and_regular_inner(first_pending: bool)114 fn blocking_and_regular_inner(first_pending: bool) {
115 const NUM: usize = 3;
116 loom::model(move || {
117 let pool = mk_pool(1);
118 let cnt = Arc::new(AtomicUsize::new(0));
119
120 let (block_tx, block_rx) = oneshot::channel();
121 let (done_tx, done_rx) = oneshot::channel();
122 let done_tx = Arc::new(Mutex::new(Some(done_tx)));
123
124 pool.spawn(track(async move {
125 crate::task::block_in_place(move || {
126 block_tx.send(());
127 });
128 if first_pending {
129 task::yield_now().await
130 }
131 }));
132
133 for _ in 0..NUM {
134 let cnt = cnt.clone();
135 let done_tx = done_tx.clone();
136
137 pool.spawn(track(async move {
138 if NUM == cnt.fetch_add(1, Relaxed) + 1 {
139 done_tx.lock().unwrap().take().unwrap().send(());
140 }
141 }));
142 }
143
144 done_rx.recv();
145 block_rx.recv();
146
147 drop(pool);
148 });
149 }
150
151 #[test]
blocking_and_regular()152 fn blocking_and_regular() {
153 blocking_and_regular_inner(false);
154 }
155
156 #[test]
blocking_and_regular_with_pending()157 fn blocking_and_regular_with_pending() {
158 blocking_and_regular_inner(true);
159 }
160
161 #[test]
pool_shutdown()162 fn pool_shutdown() {
163 loom::model(|| {
164 let pool = mk_pool(2);
165
166 pool.spawn(track(async move {
167 gated2(true).await;
168 }));
169
170 pool.spawn(track(async move {
171 gated2(false).await;
172 }));
173
174 drop(pool);
175 });
176 }
177
178 #[test]
join_output()179 fn join_output() {
180 loom::model(|| {
181 let rt = mk_pool(1);
182
183 rt.block_on(async {
184 let t = crate::spawn(track(async { "hello" }));
185
186 let out = assert_ok!(t.await);
187 assert_eq!("hello", out.into_inner());
188 });
189 });
190 }
191
192 #[test]
poll_drop_handle_then_drop()193 fn poll_drop_handle_then_drop() {
194 loom::model(|| {
195 let rt = mk_pool(1);
196
197 rt.block_on(async move {
198 let mut t = crate::spawn(track(async { "hello" }));
199
200 poll_fn(|cx| {
201 let _ = Pin::new(&mut t).poll(cx);
202 Poll::Ready(())
203 })
204 .await;
205 });
206 })
207 }
208
209 #[test]
complete_block_on_under_load()210 fn complete_block_on_under_load() {
211 loom::model(|| {
212 let pool = mk_pool(1);
213
214 pool.block_on(async {
215 // Trigger a re-schedule
216 crate::spawn(track(async {
217 for _ in 0..2 {
218 task::yield_now().await;
219 }
220 }));
221
222 gated2(true).await
223 });
224 });
225 }
226 }
227
228 mod group_c {
229 use super::*;
230
231 #[test]
shutdown_with_notification()232 fn shutdown_with_notification() {
233 use crate::sync::oneshot;
234
235 loom::model(|| {
236 let rt = mk_pool(2);
237 let (done_tx, done_rx) = oneshot::channel::<()>();
238
239 rt.spawn(track(async move {
240 let (tx, rx) = oneshot::channel::<()>();
241
242 crate::spawn(async move {
243 crate::task::spawn_blocking(move || {
244 let _ = tx.send(());
245 });
246
247 let _ = done_rx.await;
248 });
249
250 let _ = rx.await;
251
252 let _ = done_tx.send(());
253 }));
254 });
255 }
256 }
257
258 mod group_d {
259 use super::*;
260
261 #[test]
pool_multi_notify()262 fn pool_multi_notify() {
263 loom::model(|| {
264 let pool = mk_pool(2);
265
266 let c1 = Arc::new(AtomicUsize::new(0));
267
268 let (done_tx, done_rx) = oneshot::channel();
269 let done_tx1 = Arc::new(Mutex::new(Some(done_tx)));
270
271 // Spawn a task
272 let c2 = c1.clone();
273 let done_tx2 = done_tx1.clone();
274 pool.spawn(track(async move {
275 gated().await;
276 gated().await;
277
278 if 1 == c1.fetch_add(1, Relaxed) {
279 done_tx1.lock().unwrap().take().unwrap().send(());
280 }
281 }));
282
283 // Spawn a second task
284 pool.spawn(track(async move {
285 gated().await;
286 gated().await;
287
288 if 1 == c2.fetch_add(1, Relaxed) {
289 done_tx2.lock().unwrap().take().unwrap().send(());
290 }
291 }));
292
293 done_rx.recv();
294 });
295 }
296 }
297
mk_pool(num_threads: usize) -> Runtime298 fn mk_pool(num_threads: usize) -> Runtime {
299 runtime::Builder::new_multi_thread()
300 .worker_threads(num_threads)
301 .build()
302 .unwrap()
303 }
304
gated() -> impl Future<Output = &'static str>305 fn gated() -> impl Future<Output = &'static str> {
306 gated2(false)
307 }
308
gated2(thread: bool) -> impl Future<Output = &'static str>309 fn gated2(thread: bool) -> impl Future<Output = &'static str> {
310 use loom::thread;
311 use std::sync::Arc;
312
313 let gate = Arc::new(AtomicBool::new(false));
314 let mut fired = false;
315
316 poll_fn(move |cx| {
317 if !fired {
318 let gate = gate.clone();
319 let waker = cx.waker().clone();
320
321 if thread {
322 thread::spawn(move || {
323 gate.store(true, SeqCst);
324 waker.wake_by_ref();
325 });
326 } else {
327 spawn(track(async move {
328 gate.store(true, SeqCst);
329 waker.wake_by_ref();
330 }));
331 }
332
333 fired = true;
334
335 return Poll::Pending;
336 }
337
338 if gate.load(SeqCst) {
339 Poll::Ready("hello world")
340 } else {
341 Poll::Pending
342 }
343 })
344 }
345
track<T: Future>(f: T) -> Track<T>346 fn track<T: Future>(f: T) -> Track<T> {
347 Track {
348 inner: f,
349 arc: Arc::new(()),
350 }
351 }
352
353 pin_project! {
354 struct Track<T> {
355 #[pin]
356 inner: T,
357 // Arc is used to hook into loom's leak tracking.
358 arc: Arc<()>,
359 }
360 }
361
362 impl<T> Track<T> {
into_inner(self) -> T363 fn into_inner(self) -> T {
364 self.inner
365 }
366 }
367
368 impl<T: Future> Future for Track<T> {
369 type Output = Track<T::Output>;
370
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>371 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
372 let me = self.project();
373
374 Poll::Ready(Track {
375 inner: ready!(me.inner.poll(cx)),
376 arc: me.arc.clone(),
377 })
378 }
379 }
380