1 //! A simple single-threaded executor that can spawn non-`Send` futures.
2
3 use std::cell::Cell;
4 use std::future::Future;
5 use std::rc::Rc;
6
7 use async_task::{Runnable, Task};
8
9 thread_local! {
10 // A queue that holds scheduled tasks.
11 static QUEUE: (flume::Sender<Runnable>, flume::Receiver<Runnable>) = flume::unbounded();
12 }
13
14 /// Spawns a future on the executor.
spawn<F, T>(future: F) -> Task<T> where F: Future<Output = T> + 'static, T: 'static,15 fn spawn<F, T>(future: F) -> Task<T>
16 where
17 F: Future<Output = T> + 'static,
18 T: 'static,
19 {
20 // Create a task that is scheduled by pushing itself into the queue.
21 let schedule = |runnable| QUEUE.with(|(s, _)| s.send(runnable).unwrap());
22 let (runnable, task) = async_task::spawn_local(future, schedule);
23
24 // Schedule the task by pushing it into the queue.
25 runnable.schedule();
26
27 task
28 }
29
30 /// Runs a future to completion.
run<F, T>(future: F) -> T where F: Future<Output = T> + 'static, T: 'static,31 fn run<F, T>(future: F) -> T
32 where
33 F: Future<Output = T> + 'static,
34 T: 'static,
35 {
36 // Spawn a task that sends its result through a channel.
37 let (s, r) = flume::unbounded();
38 spawn(async move { drop(s.send(future.await)) }).detach();
39
40 loop {
41 // If the original task has completed, return its result.
42 if let Ok(val) = r.try_recv() {
43 return val;
44 }
45
46 // Otherwise, take a task from the queue and run it.
47 QUEUE.with(|(_, r)| r.recv().unwrap().run());
48 }
49 }
50
main()51 fn main() {
52 let val = Rc::new(Cell::new(0));
53
54 // Run a future that increments a non-`Send` value.
55 run({
56 let val = val.clone();
57 async move {
58 // Spawn a future that increments the value.
59 let task = spawn({
60 let val = val.clone();
61 async move {
62 val.set(dbg!(val.get()) + 1);
63 }
64 });
65
66 val.set(dbg!(val.get()) + 1);
67 task.await;
68 }
69 });
70
71 // The value should be 2 at the end of the program.
72 dbg!(val.get());
73 }
74