1 //! Runs `!Send` futures on the current thread.
2 use crate::runtime::task::{self, JoinHandle, Task};
3 use crate::sync::AtomicWaker;
4 use crate::util::linked_list::{Link, LinkedList};
5 
6 use std::cell::{Cell, RefCell};
7 use std::collections::VecDeque;
8 use std::fmt;
9 use std::future::Future;
10 use std::marker::PhantomData;
11 use std::pin::Pin;
12 use std::sync::{Arc, Mutex};
13 use std::task::Poll;
14 
15 use pin_project_lite::pin_project;
16 
17 cfg_rt! {
18     /// A set of tasks which are executed on the same thread.
19     ///
20     /// In some cases, it is necessary to run one or more futures that do not
21     /// implement [`Send`] and thus are unsafe to send between threads. In these
22     /// cases, a [local task set] may be used to schedule one or more `!Send`
23     /// futures to run together on the same thread.
24     ///
25     /// For example, the following code will not compile:
26     ///
27     /// ```rust,compile_fail
28     /// use std::rc::Rc;
29     ///
30     /// #[tokio::main]
31     /// async fn main() {
32     ///     // `Rc` does not implement `Send`, and thus may not be sent between
33     ///     // threads safely.
34     ///     let unsend_data = Rc::new("my unsend data...");
35     ///
36     ///     let unsend_data = unsend_data.clone();
37     ///     // Because the `async` block here moves `unsend_data`, the future is `!Send`.
38     ///     // Since `tokio::spawn` requires the spawned future to implement `Send`, this
39     ///     // will not compile.
40     ///     tokio::spawn(async move {
41     ///         println!("{}", unsend_data);
42     ///         // ...
43     ///     }).await.unwrap();
44     /// }
45     /// ```
46     ///
47     /// # Use with `run_until`
48     ///
49     /// To spawn `!Send` futures, we can use a local task set to schedule them
50     /// on the thread calling [`Runtime::block_on`]. When running inside of the
51     /// local task set, we can use [`task::spawn_local`], which can spawn
52     /// `!Send` futures. For example:
53     ///
54     /// ```rust
55     /// use std::rc::Rc;
56     /// use tokio::task;
57     ///
58     /// #[tokio::main]
59     /// async fn main() {
60     ///     let unsend_data = Rc::new("my unsend data...");
61     ///
62     ///     // Construct a local task set that can run `!Send` futures.
63     ///     let local = task::LocalSet::new();
64     ///
65     ///     // Run the local task set.
66     ///     local.run_until(async move {
67     ///         let unsend_data = unsend_data.clone();
68     ///         // `spawn_local` ensures that the future is spawned on the local
69     ///         // task set.
70     ///         task::spawn_local(async move {
71     ///             println!("{}", unsend_data);
72     ///             // ...
73     ///         }).await.unwrap();
74     ///     }).await;
75     /// }
76     /// ```
77     /// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
78     /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
79     /// cannot be used inside a task spawned with `tokio::spawn`.
80     ///
81     /// ## Awaiting a `LocalSet`
82     ///
83     /// Additionally, a `LocalSet` itself implements `Future`, completing when
84     /// *all* tasks spawned on the `LocalSet` complete. This can be used to run
85     /// several futures on a `LocalSet` and drive the whole set until they
86     /// complete. For example,
87     ///
88     /// ```rust
89     /// use tokio::{task, time};
90     /// use std::rc::Rc;
91     ///
92     /// #[tokio::main]
93     /// async fn main() {
94     ///     let unsend_data = Rc::new("world");
95     ///     let local = task::LocalSet::new();
96     ///
97     ///     let unsend_data2 = unsend_data.clone();
98     ///     local.spawn_local(async move {
99     ///         // ...
100     ///         println!("hello {}", unsend_data2)
101     ///     });
102     ///
103     ///     local.spawn_local(async move {
104     ///         time::sleep(time::Duration::from_millis(100)).await;
105     ///         println!("goodbye {}", unsend_data)
106     ///     });
107     ///
108     ///     // ...
109     ///
110     ///     local.await;
111     /// }
112     /// ```
113     /// **Note:** Awaiting a `LocalSet` can only be done inside
114     /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
115     /// [`Runtime::block_on`]. It cannot be used inside a task spawned with
116     /// `tokio::spawn`.
117     ///
118     /// ## Use inside `tokio::spawn`
119     ///
120     /// The two methods mentioned above cannot be used inside `tokio::spawn`, so
121     /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
122     /// something else. The solution is to create the `LocalSet` somewhere else,
123     /// and communicate with it using an [`mpsc`] channel.
124     ///
125     /// The following example puts the `LocalSet` inside a new thread.
126     /// ```
127     /// use tokio::runtime::Builder;
128     /// use tokio::sync::{mpsc, oneshot};
129     /// use tokio::task::LocalSet;
130     ///
131     /// // This struct describes the task you want to spawn. Here we include
132     /// // some simple examples. The oneshot channel allows sending a response
133     /// // to the spawner.
134     /// #[derive(Debug)]
135     /// enum Task {
136     ///     PrintNumber(u32),
137     ///     AddOne(u32, oneshot::Sender<u32>),
138     /// }
139     ///
140     /// #[derive(Clone)]
141     /// struct LocalSpawner {
142     ///    send: mpsc::UnboundedSender<Task>,
143     /// }
144     ///
145     /// impl LocalSpawner {
146     ///     pub fn new() -> Self {
147     ///         let (send, mut recv) = mpsc::unbounded_channel();
148     ///
149     ///         let rt = Builder::new_current_thread()
150     ///             .enable_all()
151     ///             .build()
152     ///             .unwrap();
153     ///
154     ///         std::thread::spawn(move || {
155     ///             let local = LocalSet::new();
156     ///
157     ///             local.spawn_local(async move {
158     ///                 while let Some(new_task) = recv.recv().await {
159     ///                     tokio::task::spawn_local(run_task(new_task));
160     ///                 }
    ///                 // If the while loop returns, then all the LocalSpawner
    ///                 // objects have been dropped.
163     ///             });
164     ///
165     ///             // This will return once all senders are dropped and all
166     ///             // spawned tasks have returned.
167     ///             rt.block_on(local);
168     ///         });
169     ///
170     ///         Self {
171     ///             send,
172     ///         }
173     ///     }
174     ///
175     ///     pub fn spawn(&self, task: Task) {
176     ///         self.send.send(task).expect("Thread with LocalSet has shut down.");
177     ///     }
178     /// }
179     ///
180     /// // This task may do !Send stuff. We use printing a number as an example,
181     /// // but it could be anything.
182     /// //
183     /// // The Task struct is an enum to support spawning many different kinds
184     /// // of operations.
185     /// async fn run_task(task: Task) {
186     ///     match task {
187     ///         Task::PrintNumber(n) => {
188     ///             println!("{}", n);
189     ///         },
190     ///         Task::AddOne(n, response) => {
191     ///             // We ignore failures to send the response.
192     ///             let _ = response.send(n + 1);
193     ///         },
194     ///     }
195     /// }
196     ///
197     /// #[tokio::main]
198     /// async fn main() {
199     ///     let spawner = LocalSpawner::new();
200     ///
201     ///     let (send, response) = oneshot::channel();
202     ///     spawner.spawn(Task::AddOne(10, send));
203     ///     let eleven = response.await.unwrap();
204     ///     assert_eq!(eleven, 11);
205     /// }
206     /// ```
207     ///
208     /// [`Send`]: trait@std::marker::Send
209     /// [local task set]: struct@LocalSet
210     /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
211     /// [`task::spawn_local`]: fn@spawn_local
212     /// [`mpsc`]: mod@crate::sync::mpsc
    pub struct LocalSet {
        /// Current scheduler tick.
        ///
        /// Advanced (with wrap-around) every time the scheduler looks for the
        /// next task; its value decides whether the remote queue or the local
        /// queue is checked first.
        tick: Cell<u8>,

        /// State available from thread-local.
        ///
        /// A reference to this value is installed in `CURRENT` while the set
        /// is being driven.
        context: Context,

        /// This type should not be Send.
        ///
        /// Raw pointers are `!Send`, so this marker opts the struct out of
        /// `Send` without storing any data.
        _not_send: PhantomData<*const ()>,
    }
223 }
224 
/// State available from the thread-local.
///
/// `LocalSet::with` installs a reference to this value in `CURRENT`; the free
/// function `spawn_local` and the `Schedule` implementation read it from there.
struct Context {
    /// Owned task set and local run queue.
    ///
    /// Kept in a `RefCell` because it is mutated through shared references.
    tasks: RefCell<Tasks>,

    /// State shared between threads.
    shared: Arc<Shared>,
}
233 
/// Task bookkeeping that is only reached through `Context::tasks`.
struct Tasks {
    /// Collection of all active tasks spawned onto this executor.
    ///
    /// Entries are added by `Schedule::bind`, removed by `Schedule::release`,
    /// and drained when the `LocalSet` is dropped.
    owned: LinkedList<Task<Arc<Shared>>, <Task<Arc<Shared>> as Link>::Target>,

    /// Local run queue: notified tasks waiting to be run by the scheduler.
    queue: VecDeque<task::Notified<Arc<Shared>>>,
}
241 
/// LocalSet state shared between threads.
struct Shared {
    /// Remote run queue.
    ///
    /// `Shared::schedule` pushes here (under the mutex) whenever the caller is
    /// not running inside this set's own context.
    queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>,

    /// Wake the `LocalSet` task.
    ///
    /// The waker of whoever polls the set is registered here; it is woken by
    /// `LocalSet::spawn_local` and after a push onto the remote queue.
    waker: AtomicWaker,
}
250 
// Future returned (internally) by `LocalSet::run_until`: it polls `future`
// and ticks `local_set` on every poll until `future` completes.
pin_project! {
    #[derive(Debug)]
    struct RunUntil<'a, F> {
        // The set whose tasks are driven while `future` is pending.
        local_set: &'a LocalSet,
        // The user future; structurally pinned so it can be polled in place.
        #[pin]
        future: F,
    }
}
259 
// Scoped thread-local holding the `Context` of the `LocalSet` currently being
// driven on this thread. It is set by `LocalSet::with`; readers receive an
// optional reference and must handle the "no local set" case.
scoped_thread_local!(static CURRENT: Context);
261 
262 cfg_rt! {
263     /// Spawns a `!Send` future on the local task set.
264     ///
    /// The spawned future will be run on the same thread that called `spawn_local`.
266     /// This may only be called from the context of a local task set.
267     ///
268     /// # Panics
269     ///
270     /// - This function panics if called outside of a local task set.
271     ///
272     /// # Examples
273     ///
274     /// ```rust
275     /// use std::rc::Rc;
276     /// use tokio::task;
277     ///
278     /// #[tokio::main]
279     /// async fn main() {
280     ///     let unsend_data = Rc::new("my unsend data...");
281     ///
282     ///     let local = task::LocalSet::new();
283     ///
284     ///     // Run the local task set.
285     ///     local.run_until(async move {
286     ///         let unsend_data = unsend_data.clone();
287     ///         task::spawn_local(async move {
288     ///             println!("{}", unsend_data);
289     ///             // ...
290     ///         }).await.unwrap();
291     ///     }).await;
292     /// }
293     /// ```
294     #[cfg_attr(tokio_track_caller, track_caller)]
295     pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
296     where
297         F: Future + 'static,
298         F::Output: 'static,
299     {
300         let future = crate::util::trace::task(future, "local");
301         CURRENT.with(|maybe_cx| {
302             let cx = maybe_cx
303                 .expect("`spawn_local` called from outside of a `task::LocalSet`");
304 
305             // Safety: Tasks are only polled and dropped from the thread that
306             // spawns them.
307             let (task, handle) = unsafe { task::joinable_local(future) };
308             cx.tasks.borrow_mut().queue.push_back(task);
309             handle
310         })
311     }
312 }
313 
/// Initial capacity, in tasks, of both the local and the remote run queue.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often the remote queue is checked first: whenever the tick counter is
/// a multiple of this value, `next_task` tries the remote queue before the
/// local one.
const REMOTE_FIRST_INTERVAL: u8 = 31;
322 
323 impl LocalSet {
324     /// Returns a new local task set.
new() -> LocalSet325     pub fn new() -> LocalSet {
326         LocalSet {
327             tick: Cell::new(0),
328             context: Context {
329                 tasks: RefCell::new(Tasks {
330                     owned: LinkedList::new(),
331                     queue: VecDeque::with_capacity(INITIAL_CAPACITY),
332                 }),
333                 shared: Arc::new(Shared {
334                     queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
335                     waker: AtomicWaker::new(),
336                 }),
337             },
338             _not_send: PhantomData,
339         }
340     }
341 
342     /// Spawns a `!Send` task onto the local task set.
343     ///
344     /// This task is guaranteed to be run on the current thread.
345     ///
346     /// Unlike the free function [`spawn_local`], this method may be used to
347     /// spawn local tasks when the task set is _not_ running. For example:
348     /// ```rust
349     /// use tokio::task;
350     ///
351     /// #[tokio::main]
352     /// async fn main() {
353     ///     let local = task::LocalSet::new();
354     ///
355     ///     // Spawn a future on the local set. This future will be run when
356     ///     // we call `run_until` to drive the task set.
357     ///     local.spawn_local(async {
358     ///        // ...
359     ///     });
360     ///
361     ///     // Run the local task set.
362     ///     local.run_until(async move {
363     ///         // ...
364     ///     }).await;
365     ///
    ///     // When `run_until` finishes, we can spawn _more_ futures, which will
367     ///     // run in subsequent calls to `run_until`.
368     ///     local.spawn_local(async {
369     ///        // ...
370     ///     });
371     ///
372     ///     local.run_until(async move {
373     ///         // ...
374     ///     }).await;
375     /// }
376     /// ```
377     /// [`spawn_local`]: fn@spawn_local
378     #[cfg_attr(tokio_track_caller, track_caller)]
spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,379     pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
380     where
381         F: Future + 'static,
382         F::Output: 'static,
383     {
384         let future = crate::util::trace::task(future, "local");
385         let (task, handle) = unsafe { task::joinable_local(future) };
386         self.context.tasks.borrow_mut().queue.push_back(task);
387         self.context.shared.waker.wake();
388         handle
389     }
390 
391     /// Runs a future to completion on the provided runtime, driving any local
392     /// futures spawned on this task set on the current thread.
393     ///
394     /// This runs the given future on the runtime, blocking until it is
395     /// complete, and yielding its resolved result. Any tasks or timers which
396     /// the future spawns internally will be executed on the runtime. The future
    /// may also call [`spawn_local`] to spawn additional local futures on the
398     /// current thread.
399     ///
400     /// This method should not be called from an asynchronous context.
401     ///
402     /// # Panics
403     ///
404     /// This function panics if the executor is at capacity, if the provided
405     /// future panics, or if called within an asynchronous execution context.
406     ///
407     /// # Notes
408     ///
409     /// Since this function internally calls [`Runtime::block_on`], and drives
410     /// futures in the local task set inside that call to `block_on`, the local
411     /// futures may not use [in-place blocking]. If a blocking call needs to be
412     /// issued from a local task, the [`spawn_blocking`] API may be used instead.
413     ///
414     /// For example, this will panic:
415     /// ```should_panic
416     /// use tokio::runtime::Runtime;
417     /// use tokio::task;
418     ///
419     /// let rt  = Runtime::new().unwrap();
420     /// let local = task::LocalSet::new();
421     /// local.block_on(&rt, async {
422     ///     let join = task::spawn_local(async {
423     ///         let blocking_result = task::block_in_place(|| {
424     ///             // ...
425     ///         });
426     ///         // ...
427     ///     });
428     ///     join.await.unwrap();
429     /// })
430     /// ```
431     /// This, however, will not panic:
432     /// ```
433     /// use tokio::runtime::Runtime;
434     /// use tokio::task;
435     ///
436     /// let rt  = Runtime::new().unwrap();
437     /// let local = task::LocalSet::new();
438     /// local.block_on(&rt, async {
439     ///     let join = task::spawn_local(async {
440     ///         let blocking_result = task::spawn_blocking(|| {
441     ///             // ...
442     ///         }).await;
443     ///         // ...
444     ///     });
445     ///     join.await.unwrap();
446     /// })
447     /// ```
448     ///
449     /// [`spawn_local`]: fn@spawn_local
450     /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
451     /// [in-place blocking]: fn@crate::task::block_in_place
452     /// [`spawn_blocking`]: fn@crate::task::spawn_blocking
453     #[cfg(feature = "rt")]
454     #[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output where F: Future,455     pub fn block_on<F>(&self, rt: &crate::runtime::Runtime, future: F) -> F::Output
456     where
457         F: Future,
458     {
459         rt.block_on(self.run_until(future))
460     }
461 
462     /// Run a future to completion on the local set, returning its output.
463     ///
464     /// This returns a future that runs the given future with a local set,
465     /// allowing it to call [`spawn_local`] to spawn additional `!Send` futures.
466     /// Any local futures spawned on the local set will be driven in the
467     /// background until the future passed to `run_until` completes. When the future
    /// passed to `run_until` finishes, any local futures which have not completed
469     /// will remain on the local set, and will be driven on subsequent calls to
470     /// `run_until` or when [awaiting the local set] itself.
471     ///
472     /// # Examples
473     ///
474     /// ```rust
475     /// use tokio::task;
476     ///
477     /// #[tokio::main]
478     /// async fn main() {
479     ///     task::LocalSet::new().run_until(async {
480     ///         task::spawn_local(async move {
481     ///             // ...
482     ///         }).await.unwrap();
483     ///         // ...
484     ///     }).await;
485     /// }
486     /// ```
487     ///
488     /// [`spawn_local`]: fn@spawn_local
489     /// [awaiting the local set]: #awaiting-a-localset
run_until<F>(&self, future: F) -> F::Output where F: Future,490     pub async fn run_until<F>(&self, future: F) -> F::Output
491     where
492         F: Future,
493     {
494         let run_until = RunUntil {
495             future,
496             local_set: self,
497         };
498         run_until.await
499     }
500 
501     /// Tick the scheduler, returning whether the local future needs to be
502     /// notified again.
tick(&self) -> bool503     fn tick(&self) -> bool {
504         for _ in 0..MAX_TASKS_PER_TICK {
505             match self.next_task() {
506                 // Run the task
507                 //
508                 // Safety: As spawned tasks are `!Send`, `run_unchecked` must be
509                 // used. We are responsible for maintaining the invariant that
510                 // `run_unchecked` is only called on threads that spawned the
511                 // task initially. Because `LocalSet` itself is `!Send`, and
512                 // `spawn_local` spawns into the `LocalSet` on the current
513                 // thread, the invariant is maintained.
514                 Some(task) => crate::coop::budget(|| task.run()),
515                 // We have fully drained the queue of notified tasks, so the
516                 // local future doesn't need to be notified again — it can wait
517                 // until something else wakes a task in the local set.
518                 None => return false,
519             }
520         }
521 
522         true
523     }
524 
next_task(&self) -> Option<task::Notified<Arc<Shared>>>525     fn next_task(&self) -> Option<task::Notified<Arc<Shared>>> {
526         let tick = self.tick.get();
527         self.tick.set(tick.wrapping_add(1));
528 
529         if tick % REMOTE_FIRST_INTERVAL == 0 {
530             self.context
531                 .shared
532                 .queue
533                 .lock()
534                 .unwrap()
535                 .pop_front()
536                 .or_else(|| self.context.tasks.borrow_mut().queue.pop_front())
537         } else {
538             self.context
539                 .tasks
540                 .borrow_mut()
541                 .queue
542                 .pop_front()
543                 .or_else(|| self.context.shared.queue.lock().unwrap().pop_front())
544         }
545     }
546 
with<T>(&self, f: impl FnOnce() -> T) -> T547     fn with<T>(&self, f: impl FnOnce() -> T) -> T {
548         CURRENT.set(&self.context, f)
549     }
550 }
551 
552 impl fmt::Debug for LocalSet {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result553     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
554         fmt.debug_struct("LocalSet").finish()
555     }
556 }
557 
558 impl Future for LocalSet {
559     type Output = ();
560 
poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>561     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
562         // Register the waker before starting to work
563         self.context.shared.waker.register_by_ref(cx.waker());
564 
565         if self.with(|| self.tick()) {
566             // If `tick` returns true, we need to notify the local future again:
567             // there are still tasks remaining in the run queue.
568             cx.waker().wake_by_ref();
569             Poll::Pending
570         } else if self.context.tasks.borrow().owned.is_empty() {
571             // If the scheduler has no remaining futures, we're done!
572             Poll::Ready(())
573         } else {
574             // There are still futures in the local set, but we've polled all the
575             // futures in the run queue. Therefore, we can just return Pending
576             // since the remaining futures will be woken from somewhere else.
577             Poll::Pending
578         }
579     }
580 }
581 
582 impl Default for LocalSet {
default() -> LocalSet583     fn default() -> LocalSet {
584         LocalSet::new()
585     }
586 }
587 
impl Drop for LocalSet {
    /// Shuts down every task still held by the set: first the owned tasks,
    /// then whatever is left in the local and the remote run queue.
    ///
    /// The work runs inside `self.with`, so `CURRENT` refers to this set's
    /// context for the whole shutdown.
    fn drop(&mut self) {
        self.with(|| {
            // Loop required here to ensure borrow is dropped between iterations
            #[allow(clippy::while_let_loop)]
            loop {
                let task = match self.context.tasks.borrow_mut().owned.pop_back() {
                    Some(task) => task,
                    None => break,
                };

                // The `RefCell` borrow taken above has ended by this point.
                // NOTE(review): presumably `shutdown` can re-enter
                // `Schedule::release`, which borrows `tasks` again — confirm.
                task.shutdown();
            }

            // Shut down tasks still sitting in the local run queue.
            for task in self.context.tasks.borrow_mut().queue.drain(..) {
                task.shutdown();
            }

            // Shut down tasks still sitting in the remote run queue.
            for task in self.context.shared.queue.lock().unwrap().drain(..) {
                task.shutdown();
            }

            // Every owned task must be gone once shutdown has finished.
            assert!(self.context.tasks.borrow().owned.is_empty());
        });
    }
}
615 
// === impl RunUntil ===
617 
618 impl<T: Future> Future for RunUntil<'_, T> {
619     type Output = T::Output;
620 
poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output>621     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
622         let me = self.project();
623 
624         me.local_set.with(|| {
625             me.local_set
626                 .context
627                 .shared
628                 .waker
629                 .register_by_ref(cx.waker());
630 
631             let _no_blocking = crate::runtime::enter::disallow_blocking();
632             let f = me.future;
633 
634             if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
635                 return Poll::Ready(output);
636             }
637 
638             if me.local_set.tick() {
639                 // If `tick` returns `true`, we need to notify the local future again:
640                 // there are still tasks remaining in the run queue.
641                 cx.waker().wake_by_ref();
642             }
643 
644             Poll::Pending
645         })
646     }
647 }
648 
649 impl Shared {
650     /// Schedule the provided task on the scheduler.
schedule(&self, task: task::Notified<Arc<Self>>)651     fn schedule(&self, task: task::Notified<Arc<Self>>) {
652         CURRENT.with(|maybe_cx| match maybe_cx {
653             Some(cx) if cx.shared.ptr_eq(self) => {
654                 cx.tasks.borrow_mut().queue.push_back(task);
655             }
656             _ => {
657                 self.queue.lock().unwrap().push_back(task);
658                 self.waker.wake();
659             }
660         });
661     }
662 
ptr_eq(&self, other: &Shared) -> bool663     fn ptr_eq(&self, other: &Shared) -> bool {
664         std::ptr::eq(self, other)
665     }
666 }
667 
668 impl task::Schedule for Arc<Shared> {
bind(task: Task<Self>) -> Arc<Shared>669     fn bind(task: Task<Self>) -> Arc<Shared> {
670         CURRENT.with(|maybe_cx| {
671             let cx = maybe_cx.expect("scheduler context missing");
672             cx.tasks.borrow_mut().owned.push_front(task);
673             cx.shared.clone()
674         })
675     }
676 
release(&self, task: &Task<Self>) -> Option<Task<Self>>677     fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
678         use std::ptr::NonNull;
679 
680         CURRENT.with(|maybe_cx| {
681             let cx = maybe_cx.expect("scheduler context missing");
682 
683             assert!(cx.shared.ptr_eq(self));
684 
685             let ptr = NonNull::from(task.header());
686             // safety: task must be contained by list. It is inserted into the
687             // list in `bind`.
688             unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
689         })
690     }
691 
schedule(&self, task: task::Notified<Self>)692     fn schedule(&self, task: task::Notified<Self>) {
693         Shared::schedule(self, task);
694     }
695 }
696