1 use crate::future::poll_fn;
2 use crate::loom::sync::atomic::AtomicBool;
3 use crate::loom::sync::Mutex;
4 use crate::park::{Park, Unpark};
5 use crate::runtime::task::{self, JoinHandle, Schedule, Task};
6 use crate::sync::notify::Notify;
7 use crate::util::linked_list::{Link, LinkedList};
8 use crate::util::{waker_ref, Wake, WakerRef};
9 
10 use std::cell::RefCell;
11 use std::collections::VecDeque;
12 use std::fmt;
13 use std::future::Future;
14 use std::ptr::NonNull;
15 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
16 use std::sync::Arc;
17 use std::task::Poll::{Pending, Ready};
18 use std::time::Duration;
19 
20 /// Executes tasks on the current thread
21 pub(crate) struct BasicScheduler<P: Park> {
22     /// Inner state guarded by a mutex that is shared
23     /// between all `block_on` calls.
24     inner: Mutex<Option<Inner<P>>>,
25 
26     /// Notifier for waking up other threads to steal the
27     /// parker.
28     notify: Notify,
29 
30     /// Sendable task spawner
31     spawner: Spawner,
32 }
33 
34 /// The inner scheduler that owns the task queue and the main parker P.
35 struct Inner<P: Park> {
36     /// Scheduler run queue
37     ///
38     /// When the scheduler is executed, the queue is removed from `self` and
39     /// moved into `Context`.
40     ///
41     /// This indirection is to allow `BasicScheduler` to be `Send`.
42     tasks: Option<Tasks>,
43 
44     /// Sendable task spawner
45     spawner: Spawner,
46 
47     /// Current tick
48     tick: u8,
49 
50     /// Thread park handle
51     park: P,
52 }
53 
54 #[derive(Clone)]
55 pub(crate) struct Spawner {
56     shared: Arc<Shared>,
57 }
58 
59 struct Tasks {
60     /// Collection of all active tasks spawned onto this executor.
61     owned: LinkedList<Task<Arc<Shared>>, <Task<Arc<Shared>> as Link>::Target>,
62 
63     /// Local run queue.
64     ///
65     /// Tasks notified from the current thread are pushed into this queue.
66     queue: VecDeque<task::Notified<Arc<Shared>>>,
67 }
68 
69 /// A remote scheduler entry.
70 ///
71 /// These are filled in by remote threads sending instructions to the scheduler.
72 enum Entry {
73     /// A remote thread wants to spawn a task.
74     Schedule(task::Notified<Arc<Shared>>),
75     /// A remote thread wants a task to be released by the scheduler. We only
76     /// have access to its header.
77     Release(NonNull<task::Header>),
78 }
79 
80 // Safety: Used correctly, the task header is "thread safe". Ultimately the task
81 // is owned by the current thread executor, for which this instruction is being
82 // sent.
83 unsafe impl Send for Entry {}
84 
85 /// Scheduler state shared between threads.
86 struct Shared {
87     /// Remote run queue
88     queue: Mutex<VecDeque<Entry>>,
89 
90     /// Unpark the blocked thread
91     unpark: Box<dyn Unpark>,
92 
93     // indicates whether the blocked on thread was woken
94     woken: AtomicBool,
95 }
96 
97 /// Thread-local context.
98 struct Context {
99     /// Shared scheduler state
100     shared: Arc<Shared>,
101 
102     /// Local queue
103     tasks: RefCell<Tasks>,
104 }
105 
106 /// Initial queue capacity.
107 const INITIAL_CAPACITY: usize = 64;
108 
109 /// Max number of tasks to poll per tick.
110 #[cfg(loom)]
111 const MAX_TASKS_PER_TICK: usize = 4;
112 #[cfg(not(loom))]
113 const MAX_TASKS_PER_TICK: usize = 61;
114 
115 /// How often to check the remote queue first.
116 const REMOTE_FIRST_INTERVAL: u8 = 31;
117 
118 // Tracks the current BasicScheduler.
119 scoped_thread_local!(static CURRENT: Context);
120 
121 impl<P: Park> BasicScheduler<P> {
new(park: P) -> BasicScheduler<P>122     pub(crate) fn new(park: P) -> BasicScheduler<P> {
123         let unpark = Box::new(park.unpark());
124 
125         let spawner = Spawner {
126             shared: Arc::new(Shared {
127                 queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
128                 unpark: unpark as Box<dyn Unpark>,
129                 woken: AtomicBool::new(false),
130             }),
131         };
132 
133         let inner = Mutex::new(Some(Inner {
134             tasks: Some(Tasks {
135                 owned: LinkedList::new(),
136                 queue: VecDeque::with_capacity(INITIAL_CAPACITY),
137             }),
138             spawner: spawner.clone(),
139             tick: 0,
140             park,
141         }));
142 
143         BasicScheduler {
144             inner,
145             notify: Notify::new(),
146             spawner,
147         }
148     }
149 
spawner(&self) -> &Spawner150     pub(crate) fn spawner(&self) -> &Spawner {
151         &self.spawner
152     }
153 
block_on<F: Future>(&self, future: F) -> F::Output154     pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
155         pin!(future);
156 
157         // Attempt to steal the dedicated parker and block_on the future if we can there,
158         // otherwise, lets select on a notification that the parker is available
159         // or the future is complete.
160         loop {
161             if let Some(inner) = &mut self.take_inner() {
162                 return inner.block_on(future);
163             } else {
164                 let mut enter = crate::runtime::enter(false);
165 
166                 let notified = self.notify.notified();
167                 pin!(notified);
168 
169                 if let Some(out) = enter
170                     .block_on(poll_fn(|cx| {
171                         if notified.as_mut().poll(cx).is_ready() {
172                             return Ready(None);
173                         }
174 
175                         if let Ready(out) = future.as_mut().poll(cx) {
176                             return Ready(Some(out));
177                         }
178 
179                         Pending
180                     }))
181                     .expect("Failed to `Enter::block_on`")
182                 {
183                     return out;
184                 }
185             }
186         }
187     }
188 
take_inner(&self) -> Option<InnerGuard<'_, P>>189     fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
190         let inner = self.inner.lock().take()?;
191 
192         Some(InnerGuard {
193             inner: Some(inner),
194             basic_scheduler: &self,
195         })
196     }
197 }
198 
199 impl<P: Park> Inner<P> {
200     /// Block on the future provided and drive the runtime's driver.
block_on<F: Future>(&mut self, future: F) -> F::Output201     fn block_on<F: Future>(&mut self, future: F) -> F::Output {
202         enter(self, |scheduler, context| {
203             let _enter = crate::runtime::enter(false);
204             let waker = scheduler.spawner.waker_ref();
205             let mut cx = std::task::Context::from_waker(&waker);
206             let mut polled = false;
207 
208             pin!(future);
209 
210             'outer: loop {
211                 if scheduler.spawner.was_woken() || !polled {
212                     polled = true;
213                     if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
214                         return v;
215                     }
216                 }
217 
218                 for _ in 0..MAX_TASKS_PER_TICK {
219                     // Get and increment the current tick
220                     let tick = scheduler.tick;
221                     scheduler.tick = scheduler.tick.wrapping_add(1);
222 
223                     let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
224                         scheduler.spawner.pop().or_else(|| {
225                             context
226                                 .tasks
227                                 .borrow_mut()
228                                 .queue
229                                 .pop_front()
230                                 .map(Entry::Schedule)
231                         })
232                     } else {
233                         context
234                             .tasks
235                             .borrow_mut()
236                             .queue
237                             .pop_front()
238                             .map(Entry::Schedule)
239                             .or_else(|| scheduler.spawner.pop())
240                     };
241 
242                     let entry = match entry {
243                         Some(entry) => entry,
244                         None => {
245                             // Park until the thread is signaled
246                             scheduler.park.park().expect("failed to park");
247 
248                             // Try polling the `block_on` future next
249                             continue 'outer;
250                         }
251                     };
252 
253                     match entry {
254                         Entry::Schedule(task) => crate::coop::budget(|| task.run()),
255                         Entry::Release(ptr) => {
256                             // Safety: the task header is only legally provided
257                             // internally in the header, so we know that it is a
258                             // valid (or in particular *allocated*) header that
259                             // is part of the linked list.
260                             unsafe {
261                                 let removed = context.tasks.borrow_mut().owned.remove(ptr);
262 
263                                 // TODO: This seems like it should hold, because
264                                 // there doesn't seem to be an avenue for anyone
265                                 // else to fiddle with the owned tasks
266                                 // collection *after* a remote thread has marked
267                                 // it as released, and at that point, the only
268                                 // location at which it can be removed is here
269                                 // or in the Drop implementation of the
270                                 // scheduler.
271                                 debug_assert!(removed.is_some());
272                             }
273                         }
274                     }
275                 }
276 
277                 // Yield to the park, this drives the timer and pulls any pending
278                 // I/O events.
279                 scheduler
280                     .park
281                     .park_timeout(Duration::from_millis(0))
282                     .expect("failed to park");
283             }
284         })
285     }
286 }
287 
288 /// Enter the scheduler context. This sets the queue and other necessary
289 /// scheduler state in the thread-local
enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R where F: FnOnce(&mut Inner<P>, &Context) -> R, P: Park,290 fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
291 where
292     F: FnOnce(&mut Inner<P>, &Context) -> R,
293     P: Park,
294 {
295     // Ensures the run queue is placed back in the `BasicScheduler` instance
296     // once `block_on` returns.`
297     struct Guard<'a, P: Park> {
298         context: Option<Context>,
299         scheduler: &'a mut Inner<P>,
300     }
301 
302     impl<P: Park> Drop for Guard<'_, P> {
303         fn drop(&mut self) {
304             let Context { tasks, .. } = self.context.take().expect("context missing");
305             self.scheduler.tasks = Some(tasks.into_inner());
306         }
307     }
308 
309     // Remove `tasks` from `self` and place it in a `Context`.
310     let tasks = scheduler.tasks.take().expect("invalid state");
311 
312     let guard = Guard {
313         context: Some(Context {
314             shared: scheduler.spawner.shared.clone(),
315             tasks: RefCell::new(tasks),
316         }),
317         scheduler,
318     };
319 
320     let context = guard.context.as_ref().unwrap();
321     let scheduler = &mut *guard.scheduler;
322 
323     CURRENT.set(context, || f(scheduler, context))
324 }
325 
326 impl<P: Park> Drop for BasicScheduler<P> {
drop(&mut self)327     fn drop(&mut self) {
328         // Avoid a double panic if we are currently panicking and
329         // the lock may be poisoned.
330 
331         let mut inner = match self.inner.lock().take() {
332             Some(inner) => inner,
333             None if std::thread::panicking() => return,
334             None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
335         };
336 
337         enter(&mut inner, |scheduler, context| {
338             // Loop required here to ensure borrow is dropped between iterations
339             #[allow(clippy::while_let_loop)]
340             loop {
341                 let task = match context.tasks.borrow_mut().owned.pop_back() {
342                     Some(task) => task,
343                     None => break,
344                 };
345 
346                 task.shutdown();
347             }
348 
349             // Drain local queue
350             for task in context.tasks.borrow_mut().queue.drain(..) {
351                 task.shutdown();
352             }
353 
354             // Drain remote queue
355             for entry in scheduler.spawner.shared.queue.lock().drain(..) {
356                 match entry {
357                     Entry::Schedule(task) => {
358                         task.shutdown();
359                     }
360                     Entry::Release(..) => {
361                         // Do nothing, each entry in the linked list was *just*
362                         // dropped by the scheduler above.
363                     }
364                 }
365             }
366 
367             assert!(context.tasks.borrow().owned.is_empty());
368         });
369     }
370 }
371 
372 impl<P: Park> fmt::Debug for BasicScheduler<P> {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result373     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
374         fmt.debug_struct("BasicScheduler").finish()
375     }
376 }
377 
378 // ===== impl Spawner =====
379 
380 impl Spawner {
381     /// Spawns a future onto the thread pool
spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,382     pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
383     where
384         F: Future + Send + 'static,
385         F::Output: Send + 'static,
386     {
387         let (task, handle) = task::joinable(future);
388         self.shared.schedule(task);
389         handle
390     }
391 
pop(&self) -> Option<Entry>392     fn pop(&self) -> Option<Entry> {
393         self.shared.queue.lock().pop_front()
394     }
395 
waker_ref(&self) -> WakerRef<'_>396     fn waker_ref(&self) -> WakerRef<'_> {
397         // clear the woken bit
398         self.shared.woken.swap(false, AcqRel);
399         waker_ref(&self.shared)
400     }
401 
was_woken(&self) -> bool402     fn was_woken(&self) -> bool {
403         self.shared.woken.load(Acquire)
404     }
405 }
406 
407 impl fmt::Debug for Spawner {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result408     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
409         fmt.debug_struct("Spawner").finish()
410     }
411 }
412 
413 // ===== impl Shared =====
414 
415 impl Schedule for Arc<Shared> {
bind(task: Task<Self>) -> Arc<Shared>416     fn bind(task: Task<Self>) -> Arc<Shared> {
417         CURRENT.with(|maybe_cx| {
418             let cx = maybe_cx.expect("scheduler context missing");
419             cx.tasks.borrow_mut().owned.push_front(task);
420             cx.shared.clone()
421         })
422     }
423 
release(&self, task: &Task<Self>) -> Option<Task<Self>>424     fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
425         CURRENT.with(|maybe_cx| {
426             let ptr = NonNull::from(task.header());
427 
428             if let Some(cx) = maybe_cx {
429                 // safety: the task is inserted in the list in `bind`.
430                 unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
431             } else {
432                 self.queue.lock().push_back(Entry::Release(ptr));
433                 self.unpark.unpark();
434                 // Returning `None` here prevents the task plumbing from being
435                 // freed. It is then up to the scheduler through the queue we
436                 // just added to, or its Drop impl to free the task.
437                 None
438             }
439         })
440     }
441 
schedule(&self, task: task::Notified<Self>)442     fn schedule(&self, task: task::Notified<Self>) {
443         CURRENT.with(|maybe_cx| match maybe_cx {
444             Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
445                 cx.tasks.borrow_mut().queue.push_back(task);
446             }
447             _ => {
448                 self.queue.lock().push_back(Entry::Schedule(task));
449                 self.unpark.unpark();
450             }
451         });
452     }
453 }
454 
455 impl Wake for Shared {
wake(self: Arc<Self>)456     fn wake(self: Arc<Self>) {
457         Wake::wake_by_ref(&self)
458     }
459 
460     /// Wake by reference
wake_by_ref(arc_self: &Arc<Self>)461     fn wake_by_ref(arc_self: &Arc<Self>) {
462         arc_self.woken.store(true, Release);
463         arc_self.unpark.unpark();
464     }
465 }
466 
467 // ===== InnerGuard =====
468 
469 /// Used to ensure we always place the Inner value
470 /// back into its slot in `BasicScheduler`, even if the
471 /// future panics.
472 struct InnerGuard<'a, P: Park> {
473     inner: Option<Inner<P>>,
474     basic_scheduler: &'a BasicScheduler<P>,
475 }
476 
477 impl<P: Park> InnerGuard<'_, P> {
block_on<F: Future>(&mut self, future: F) -> F::Output478     fn block_on<F: Future>(&mut self, future: F) -> F::Output {
479         // The only time inner gets set to `None` is if we have dropped
480         // already so this unwrap is safe.
481         self.inner.as_mut().unwrap().block_on(future)
482     }
483 }
484 
485 impl<P: Park> Drop for InnerGuard<'_, P> {
drop(&mut self)486     fn drop(&mut self) {
487         if let Some(scheduler) = self.inner.take() {
488             let mut lock = self.basic_scheduler.inner.lock();
489 
490             // Replace old scheduler back into the state to allow
491             // other threads to pick it up and drive it.
492             lock.replace(scheduler);
493 
494             // Wake up other possible threads that could steal
495             // the dedicated parker P.
496             self.basic_scheduler.notify.notify_one()
497         }
498     }
499 }
500