1 use crate::enter;
2 use futures_core::future::Future;
3 use futures_core::stream::Stream;
4 use futures_core::task::{Context, Poll};
5 use futures_task::{waker_ref, ArcWake};
6 use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
7 use futures_util::pin_mut;
8 use futures_util::stream::FuturesUnordered;
9 use futures_util::stream::StreamExt;
10 use std::cell::RefCell;
11 use std::ops::{Deref, DerefMut};
12 use std::rc::{Rc, Weak};
13 use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
14 use std::thread::{self, Thread};
15 
/// A single-threaded task pool for polling futures to completion.
///
/// This executor allows you to multiplex any number of tasks onto a single
/// thread. It's appropriate to poll strictly I/O-bound futures that do very
/// little work in between I/O actions.
///
/// To get a handle to the pool that implements
/// [`Spawn`](futures_task::Spawn), use the
/// [`spawner()`](LocalPool::spawner) method. Because the executor is
/// single-threaded, it supports a special form of task spawning for non-`Send`
/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
#[derive(Debug)]
pub struct LocalPool {
    /// The tasks currently owned by the pool; this is the set that gets
    /// polled when the pool is driven.
    pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
    /// Queue of newly spawned tasks that have not been moved into `pool`
    /// yet. Spawners only hold a `Weak` reference to it, so the pool is the
    /// owner that keeps the queue alive.
    incoming: Rc<Incoming>,
}
32 
/// A handle to a [`LocalPool`](LocalPool) that implements
/// [`Spawn`](futures_task::Spawn).
#[derive(Clone, Debug)]
pub struct LocalSpawner {
    /// Weak reference to the pool's queue of incoming tasks. Spawning fails
    /// with a shutdown error once this can no longer be upgraded.
    incoming: Weak<Incoming>,
}
39 
// Queue of tasks pushed by `LocalSpawner`s and later drained into the pool.
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
41 
/// Wake-up state for an executor thread.
///
/// Through its `ArcWake` implementation this is what backs the waker passed
/// to the futures polled by the executor functions in this file.
pub(crate) struct ThreadNotify {
    /// The (single) executor thread.
    thread: Thread,
    /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
    /// before the next `park()`, which may otherwise happen if the code
    /// being executed as part of the future(s) being polled makes use of
    /// park / unpark calls of its own, i.e. we cannot assume that no other
    /// code uses park / unpark on the executing `thread`.
    unparked: AtomicBool,
}
52 
thread_local! {
    // One `ThreadNotify` per thread. `thread::current()` is captured when the
    // value is initialized, so wake-ups sent through it unpark the thread
    // that owns this instance. The flag starts out as "no wake-up pending".
    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
        thread: thread::current(),
        unparked: AtomicBool::new(false),
    });
}
59 
60 impl ArcWake for ThreadNotify {
wake_by_ref(arc_self: &Arc<Self>)61     fn wake_by_ref(arc_self: &Arc<Self>) {
62         // Make sure the wakeup is remembered until the next `park()`.
63         let unparked = arc_self.unparked.swap(true, Ordering::Relaxed);
64         if !unparked {
65             // If the thread has not been unparked yet, it must be done
66             // now. If it was actually parked, it will run again,
67             // otherwise the token made available by `unpark`
68             // may be consumed before reaching `park()`, but `unparked`
69             // ensures it is not forgotten.
70             arc_self.thread.unpark();
71         }
72     }
73 }
74 
75 // Set up and run a basic single-threaded spawner loop, invoking `f` on each
76 // turn.
run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T77 fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
78     let _enter = enter().expect(
79         "cannot execute `LocalPool` executor from within \
80          another executor",
81     );
82 
83     CURRENT_THREAD_NOTIFY.with(|thread_notify| {
84         let waker = waker_ref(thread_notify);
85         let mut cx = Context::from_waker(&waker);
86         loop {
87             if let Poll::Ready(t) = f(&mut cx) {
88                 return t;
89             }
90             // Consume the wakeup that occurred while executing `f`, if any.
91             let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
92             if !unparked {
93                 // No wakeup occurred. It may occur now, right before parking,
94                 // but in that case the token made available by `unpark()`
95                 // is guaranteed to still be available and `park()` is a no-op.
96                 thread::park();
97                 // When the thread is unparked, `unparked` will have been set
98                 // and needs to be unset before the next call to `f` to avoid
99                 // a redundant loop iteration.
100                 thread_notify.unparked.store(false, Ordering::Release);
101             }
102         }
103     })
104 }
105 
poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T106 fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
107     let _enter = enter().expect(
108         "cannot execute `LocalPool` executor from within \
109          another executor",
110     );
111 
112     CURRENT_THREAD_NOTIFY.with(|thread_notify| {
113         let waker = waker_ref(thread_notify);
114         let mut cx = Context::from_waker(&waker);
115         f(&mut cx)
116     })
117 }
118 
119 impl LocalPool {
120     /// Create a new, empty pool of tasks.
new() -> Self121     pub fn new() -> Self {
122         Self {
123             pool: FuturesUnordered::new(),
124             incoming: Default::default(),
125         }
126     }
127 
128     /// Get a clonable handle to the pool as a [`Spawn`].
spawner(&self) -> LocalSpawner129     pub fn spawner(&self) -> LocalSpawner {
130         LocalSpawner {
131             incoming: Rc::downgrade(&self.incoming),
132         }
133     }
134 
135     /// Run all tasks in the pool to completion.
136     ///
137     /// ```
138     /// use futures::executor::LocalPool;
139     ///
140     /// let mut pool = LocalPool::new();
141     ///
142     /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
143     ///
144     /// // run *all* tasks in the pool to completion, including any newly-spawned ones.
145     /// pool.run();
146     /// ```
147     ///
148     /// The function will block the calling thread until *all* tasks in the pool
149     /// are complete, including any spawned while running existing tasks.
run(&mut self)150     pub fn run(&mut self) {
151         run_executor(|cx| self.poll_pool(cx))
152     }
153 
154     /// Runs all the tasks in the pool until the given future completes.
155     ///
156     /// ```
157     /// use futures::executor::LocalPool;
158     ///
159     /// let mut pool = LocalPool::new();
160     /// # let my_app  = async {};
161     ///
162     /// // run tasks in the pool until `my_app` completes
163     /// pool.run_until(my_app);
164     /// ```
165     ///
166     /// The function will block the calling thread *only* until the future `f`
167     /// completes; there may still be incomplete tasks in the pool, which will
168     /// be inert after the call completes, but can continue with further use of
169     /// one of the pool's run or poll methods. While the function is running,
170     /// however, all tasks in the pool will try to make progress.
run_until<F: Future>(&mut self, future: F) -> F::Output171     pub fn run_until<F: Future>(&mut self, future: F) -> F::Output {
172         pin_mut!(future);
173 
174         run_executor(|cx| {
175             {
176                 // if our main task is done, so are we
177                 let result = future.as_mut().poll(cx);
178                 if let Poll::Ready(output) = result {
179                     return Poll::Ready(output);
180                 }
181             }
182 
183             let _ = self.poll_pool(cx);
184             Poll::Pending
185         })
186     }
187 
188     /// Runs all tasks and returns after completing one future or until no more progress
189     /// can be made. Returns `true` if one future was completed, `false` otherwise.
190     ///
191     /// ```
192     /// use futures::executor::LocalPool;
193     /// use futures::task::LocalSpawnExt;
194     /// use futures::future::{ready, pending};
195     ///
196     /// let mut pool = LocalPool::new();
197     /// let spawner = pool.spawner();
198     ///
199     /// spawner.spawn_local(ready(())).unwrap();
200     /// spawner.spawn_local(ready(())).unwrap();
201     /// spawner.spawn_local(pending()).unwrap();
202     ///
203     /// // Run the two ready tasks and return true for them.
204     /// pool.try_run_one(); // returns true after completing one of the ready futures
205     /// pool.try_run_one(); // returns true after completing the other ready future
206     ///
207     /// // the remaining task can not be completed
208     /// assert!(!pool.try_run_one()); // returns false
209     /// ```
210     ///
211     /// This function will not block the calling thread and will return the moment
212     /// that there are no tasks left for which progress can be made or after exactly one
213     /// task was completed; Remaining incomplete tasks in the pool can continue with
214     /// further use of one of the pool's run or poll methods.
215     /// Though only one task will be completed, progress may be made on multiple tasks.
try_run_one(&mut self) -> bool216     pub fn try_run_one(&mut self) -> bool {
217         poll_executor(|ctx| {
218             loop {
219                 let ret = self.poll_pool_once(ctx);
220 
221                 // return if we have executed a future
222                 if let Poll::Ready(Some(_)) = ret {
223                     return true;
224                 }
225 
226                 // if there are no new incoming futures
227                 // then there is no feature that can make progress
228                 // and we can return without having completed a single future
229                 if self.incoming.borrow().is_empty() {
230                     return false;
231                 }
232             }
233         })
234     }
235 
236     /// Runs all tasks in the pool and returns if no more progress can be made
237     /// on any task.
238     ///
239     /// ```
240     /// use futures::executor::LocalPool;
241     /// use futures::task::LocalSpawnExt;
242     /// use futures::future::{ready, pending};
243     ///
244     /// let mut pool = LocalPool::new();
245     /// let spawner = pool.spawner();
246     ///
247     /// spawner.spawn_local(ready(())).unwrap();
248     /// spawner.spawn_local(ready(())).unwrap();
249     /// spawner.spawn_local(pending()).unwrap();
250     ///
251     /// // Runs the two ready task and returns.
252     /// // The empty task remains in the pool.
253     /// pool.run_until_stalled();
254     /// ```
255     ///
256     /// This function will not block the calling thread and will return the moment
257     /// that there are no tasks left for which progress can be made;
258     /// remaining incomplete tasks in the pool can continue with further use of one
259     /// of the pool's run or poll methods. While the function is running, all tasks
260     /// in the pool will try to make progress.
run_until_stalled(&mut self)261     pub fn run_until_stalled(&mut self) {
262         poll_executor(|ctx| {
263             let _ = self.poll_pool(ctx);
264         });
265     }
266 
267     // Make maximal progress on the entire pool of spawned task, returning `Ready`
268     // if the pool is empty and `Pending` if no further progress can be made.
poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()>269     fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
270         // state for the FuturesUnordered, which will never be used
271         loop {
272             let ret = self.poll_pool_once(cx);
273 
274             // we queued up some new tasks; add them and poll again
275             if !self.incoming.borrow().is_empty() {
276                 continue;
277             }
278 
279             // no queued tasks; we may be done
280             match ret {
281                 Poll::Pending => return Poll::Pending,
282                 Poll::Ready(None) => return Poll::Ready(()),
283                 _ => {}
284             }
285         }
286     }
287 
288     // Try make minimal progress on the pool of spawned tasks
poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>>289     fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
290         // empty the incoming queue of newly-spawned tasks
291         {
292             let mut incoming = self.incoming.borrow_mut();
293             for task in incoming.drain(..) {
294                 self.pool.push(task)
295             }
296         }
297 
298         // try to execute the next ready future
299         self.pool.poll_next_unpin(cx)
300     }
301 }
302 
impl Default for LocalPool {
    /// Equivalent to [`LocalPool::new`]: an empty pool with nothing queued.
    fn default() -> Self {
        Self::new()
    }
}
308 
309 /// Run a future to completion on the current thread.
310 ///
311 /// This function will block the caller until the given future has completed.
312 ///
313 /// Use a [`LocalPool`](LocalPool) if you need finer-grained control over
314 /// spawned tasks.
block_on<F: Future>(f: F) -> F::Output315 pub fn block_on<F: Future>(f: F) -> F::Output {
316     pin_mut!(f);
317     run_executor(|cx| f.as_mut().poll(cx))
318 }
319 
/// Turn a stream into a blocking iterator.
///
/// When `next` is called on the resulting `BlockingStream`, the caller
/// will be blocked until the next element of the `Stream` becomes available.
///
/// This call itself only wraps the stream; it does not poll it.
pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
    BlockingStream { stream }
}
327 
/// An iterator which blocks on values from a stream until they become available.
#[derive(Debug)]
pub struct BlockingStream<S: Stream + Unpin> {
    /// The wrapped stream that the iterator pulls its items from.
    stream: S,
}
333 
impl<S: Stream + Unpin> Deref for BlockingStream<S> {
    type Target = S;
    /// Gives shared access to the wrapped stream.
    fn deref(&self) -> &Self::Target {
        &self.stream
    }
}
340 
impl<S: Stream + Unpin> DerefMut for BlockingStream<S> {
    /// Gives exclusive access to the wrapped stream.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.stream
    }
}
346 
impl<S: Stream + Unpin> BlockingStream<S> {
    /// Convert this `BlockingStream` into the inner `Stream` type.
    ///
    /// Consumes the wrapper and returns the stream it was built from.
    pub fn into_inner(self) -> S {
        self.stream
    }
}
353 
354 impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
355     type Item = S::Item;
356 
next(&mut self) -> Option<Self::Item>357     fn next(&mut self) -> Option<Self::Item> {
358         LocalPool::new().run_until(self.stream.next())
359     }
360 
size_hint(&self) -> (usize, Option<usize>)361     fn size_hint(&self) -> (usize, Option<usize>) {
362         self.stream.size_hint()
363     }
364 }
365 
366 impl Spawn for LocalSpawner {
spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError>367     fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
368         if let Some(incoming) = self.incoming.upgrade() {
369             incoming.borrow_mut().push(future.into());
370             Ok(())
371         } else {
372             Err(SpawnError::shutdown())
373         }
374     }
375 
status(&self) -> Result<(), SpawnError>376     fn status(&self) -> Result<(), SpawnError> {
377         if self.incoming.upgrade().is_some() {
378             Ok(())
379         } else {
380             Err(SpawnError::shutdown())
381         }
382     }
383 }
384 
385 impl LocalSpawn for LocalSpawner {
spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError>386     fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
387         if let Some(incoming) = self.incoming.upgrade() {
388             incoming.borrow_mut().push(future);
389             Ok(())
390         } else {
391             Err(SpawnError::shutdown())
392         }
393     }
394 
status_local(&self) -> Result<(), SpawnError>395     fn status_local(&self) -> Result<(), SpawnError> {
396         if self.incoming.upgrade().is_some() {
397             Ok(())
398         } else {
399             Err(SpawnError::shutdown())
400         }
401     }
402 }
403