1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use tokio::runtime::Runtime;
5 use tokio::sync::oneshot;
6 use tokio_test::{assert_err, assert_ok};
7
8 use std::thread;
9 use tokio::time::{timeout, Duration};
10
11 mod support {
12 pub(crate) mod mpsc_stream;
13 }
14
15 #[test]
spawned_task_does_not_progress_without_block_on()16 fn spawned_task_does_not_progress_without_block_on() {
17 let (tx, mut rx) = oneshot::channel();
18
19 let rt = rt();
20
21 rt.spawn(async move {
22 assert_ok!(tx.send("hello"));
23 });
24
25 thread::sleep(Duration::from_millis(50));
26
27 assert_err!(rx.try_recv());
28
29 let out = rt.block_on(async { assert_ok!(rx.await) });
30
31 assert_eq!(out, "hello");
32 }
33
34 #[test]
no_extra_poll()35 fn no_extra_poll() {
36 use pin_project_lite::pin_project;
37 use std::pin::Pin;
38 use std::sync::{
39 atomic::{AtomicUsize, Ordering::SeqCst},
40 Arc,
41 };
42 use std::task::{Context, Poll};
43 use tokio_stream::{Stream, StreamExt};
44
45 pin_project! {
46 struct TrackPolls<S> {
47 npolls: Arc<AtomicUsize>,
48 #[pin]
49 s: S,
50 }
51 }
52
53 impl<S> Stream for TrackPolls<S>
54 where
55 S: Stream,
56 {
57 type Item = S::Item;
58 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59 let this = self.project();
60 this.npolls.fetch_add(1, SeqCst);
61 this.s.poll_next(cx)
62 }
63 }
64
65 let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>();
66 let rx = TrackPolls {
67 npolls: Arc::new(AtomicUsize::new(0)),
68 s: rx,
69 };
70 let npolls = Arc::clone(&rx.npolls);
71
72 let rt = rt();
73
74 // TODO: could probably avoid this, but why not.
75 let mut rx = Box::pin(rx);
76
77 rt.spawn(async move { while rx.next().await.is_some() {} });
78 rt.block_on(async {
79 tokio::task::yield_now().await;
80 });
81
82 // should have been polled exactly once: the initial poll
83 assert_eq!(npolls.load(SeqCst), 1);
84
85 tx.send(()).unwrap();
86 rt.block_on(async {
87 tokio::task::yield_now().await;
88 });
89
90 // should have been polled twice more: once to yield Some(), then once to yield Pending
91 assert_eq!(npolls.load(SeqCst), 1 + 2);
92
93 drop(tx);
94 rt.block_on(async {
95 tokio::task::yield_now().await;
96 });
97
98 // should have been polled once more: to yield None
99 assert_eq!(npolls.load(SeqCst), 1 + 2 + 1);
100 }
101
102 #[test]
acquire_mutex_in_drop()103 fn acquire_mutex_in_drop() {
104 use futures::future::pending;
105 use tokio::task;
106
107 let (tx1, rx1) = oneshot::channel();
108 let (tx2, rx2) = oneshot::channel();
109
110 let rt = rt();
111
112 rt.spawn(async move {
113 let _ = rx2.await;
114 unreachable!();
115 });
116
117 rt.spawn(async move {
118 let _ = rx1.await;
119 tx2.send(()).unwrap();
120 unreachable!();
121 });
122
123 // Spawn a task that will never notify
124 rt.spawn(async move {
125 pending::<()>().await;
126 tx1.send(()).unwrap();
127 });
128
129 // Tick the loop
130 rt.block_on(async {
131 task::yield_now().await;
132 });
133
134 // Drop the rt
135 drop(rt);
136 }
137
138 #[test]
139 #[should_panic(
140 expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
141 )]
timeout_panics_when_no_time_handle()142 fn timeout_panics_when_no_time_handle() {
143 let rt = tokio::runtime::Builder::new_current_thread()
144 .build()
145 .unwrap();
146 rt.block_on(async {
147 let (_tx, rx) = oneshot::channel::<()>();
148 let dur = Duration::from_millis(20);
149 let _ = timeout(dur, rx).await;
150 });
151 }
152
rt() -> Runtime153 fn rt() -> Runtime {
154 tokio::runtime::Builder::new_current_thread()
155 .enable_all()
156 .build()
157 .unwrap()
158 }
159