1 //! A scheduler is initialized with a fixed number of workers. Each worker is
2 //! driven by a thread. Each worker has a "core" which contains data such as the
3 //! run queue and other state. When `block_in_place` is called, the worker's
4 //! "core" is handed off to a new thread allowing the scheduler to continue to
5 //! make progress while the originating thread blocks.
6 
7 use crate::coop;
8 use crate::loom::rand::seed;
9 use crate::loom::sync::{Arc, Mutex};
10 use crate::park::{Park, Unpark};
11 use crate::runtime;
12 use crate::runtime::enter::EnterContext;
13 use crate::runtime::park::{Parker, Unparker};
14 use crate::runtime::thread_pool::{AtomicCell, Idle};
15 use crate::runtime::{queue, task};
16 use crate::util::linked_list::{Link, LinkedList};
17 use crate::util::FastRand;
18 
19 use std::cell::RefCell;
20 use std::time::Duration;
21 
22 /// A scheduler worker
23 pub(super) struct Worker {
24     /// Reference to shared state
25     shared: Arc<Shared>,
26 
27     /// Index holding this worker's remote state
28     index: usize,
29 
30     /// Used to hand-off a worker's core to another thread.
31     core: AtomicCell<Core>,
32 }
33 
34 /// Core data
35 struct Core {
36     /// Used to schedule bookkeeping tasks every so often.
37     tick: u8,
38 
39     /// When a task is scheduled from a worker, it is stored in this slot. The
40     /// worker will check this slot for a task **before** checking the run
41     /// queue. This effectively results in the **last** scheduled task to be run
42     /// next (LIFO). This is an optimization for message passing patterns and
43     /// helps to reduce latency.
44     lifo_slot: Option<Notified>,
45 
46     /// The worker-local run queue.
47     run_queue: queue::Local<Arc<Worker>>,
48 
49     /// True if the worker is currently searching for more work. Searching
50     /// involves attempting to steal from other workers.
51     is_searching: bool,
52 
53     /// True if the scheduler is being shutdown
54     is_shutdown: bool,
55 
56     /// Tasks owned by the core
57     tasks: LinkedList<Task, <Task as Link>::Target>,
58 
59     /// Parker
60     ///
61     /// Stored in an `Option` as the parker is added / removed to make the
62     /// borrow checker happy.
63     park: Option<Parker>,
64 
65     /// Fast random number generator.
66     rand: FastRand,
67 }
68 
69 /// State shared across all workers
70 pub(super) struct Shared {
71     /// Per-worker remote state. All other workers have access to this and is
72     /// how they communicate between each other.
73     remotes: Box<[Remote]>,
74 
75     /// Submit work to the scheduler while **not** currently on a worker thread.
76     inject: queue::Inject<Arc<Worker>>,
77 
78     /// Coordinates idle workers
79     idle: Idle,
80 
81     /// Cores that have observed the shutdown signal
82     ///
83     /// The core is **not** placed back in the worker to avoid it from being
84     /// stolen by a thread that was spawned as part of `block_in_place`.
85     #[allow(clippy::vec_box)] // we're moving an already-boxed value
86     shutdown_cores: Mutex<Vec<Box<Core>>>,
87 }
88 
89 /// Used to communicate with a worker from other threads.
90 struct Remote {
91     /// Steal tasks from this worker.
92     steal: queue::Steal<Arc<Worker>>,
93 
94     /// Transfers tasks to be released. Any worker pushes tasks, only the owning
95     /// worker pops.
96     pending_drop: task::TransferStack<Arc<Worker>>,
97 
98     /// Unparks the associated worker thread
99     unpark: Unparker,
100 }
101 
102 /// Thread-local context
103 struct Context {
104     /// Worker
105     worker: Arc<Worker>,
106 
107     /// Core data
108     core: RefCell<Option<Box<Core>>>,
109 }
110 
111 /// Starts the workers
112 pub(crate) struct Launch(Vec<Arc<Worker>>);
113 
114 /// Running a task may consume the core. If the core is still available when
115 /// running the task completes, it is returned. Otherwise, the worker will need
116 /// to stop processing.
117 type RunResult = Result<Box<Core>, ()>;
118 
119 /// A task handle
120 type Task = task::Task<Arc<Worker>>;
121 
122 /// A notified task handle
123 type Notified = task::Notified<Arc<Worker>>;
124 
125 // Tracks thread-local state
126 scoped_thread_local!(static CURRENT: Context);
127 
create(size: usize, park: Parker) -> (Arc<Shared>, Launch)128 pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
129     let mut cores = vec![];
130     let mut remotes = vec![];
131 
132     // Create the local queues
133     for _ in 0..size {
134         let (steal, run_queue) = queue::local();
135 
136         let park = park.clone();
137         let unpark = park.unpark();
138 
139         cores.push(Box::new(Core {
140             tick: 0,
141             lifo_slot: None,
142             run_queue,
143             is_searching: false,
144             is_shutdown: false,
145             tasks: LinkedList::new(),
146             park: Some(park),
147             rand: FastRand::new(seed()),
148         }));
149 
150         remotes.push(Remote {
151             steal,
152             pending_drop: task::TransferStack::new(),
153             unpark,
154         });
155     }
156 
157     let shared = Arc::new(Shared {
158         remotes: remotes.into_boxed_slice(),
159         inject: queue::Inject::new(),
160         idle: Idle::new(size),
161         shutdown_cores: Mutex::new(vec![]),
162     });
163 
164     let mut launch = Launch(vec![]);
165 
166     for (index, core) in cores.drain(..).enumerate() {
167         launch.0.push(Arc::new(Worker {
168             shared: shared.clone(),
169             index,
170             core: AtomicCell::new(Some(core)),
171         }));
172     }
173 
174     (shared, launch)
175 }
176 
block_in_place<F, R>(f: F) -> R where F: FnOnce() -> R,177 pub(crate) fn block_in_place<F, R>(f: F) -> R
178 where
179     F: FnOnce() -> R,
180 {
181     // Try to steal the worker core back
182     struct Reset(coop::Budget);
183 
184     impl Drop for Reset {
185         fn drop(&mut self) {
186             CURRENT.with(|maybe_cx| {
187                 if let Some(cx) = maybe_cx {
188                     let core = cx.worker.core.take();
189                     let mut cx_core = cx.core.borrow_mut();
190                     assert!(cx_core.is_none());
191                     *cx_core = core;
192 
193                     // Reset the task budget as we are re-entering the
194                     // runtime.
195                     coop::set(self.0);
196                 }
197             });
198         }
199     }
200 
201     let mut had_entered = false;
202 
203     CURRENT.with(|maybe_cx| {
204         match (crate::runtime::enter::context(), maybe_cx.is_some()) {
205             (EnterContext::Entered { .. }, true) => {
206                 // We are on a thread pool runtime thread, so we just need to set up blocking.
207                 had_entered = true;
208             }
209             (EnterContext::Entered { allow_blocking }, false) => {
210                 // We are on an executor, but _not_ on the thread pool.
211                 // That is _only_ okay if we are in a thread pool runtime's block_on method:
212                 if allow_blocking {
213                     had_entered = true;
214                     return;
215                 } else {
216                     // This probably means we are on the basic_scheduler or in a LocalSet,
217                     // where it is _not_ okay to block.
218                     panic!("can call blocking only when running on the multi-threaded runtime");
219                 }
220             }
221             (EnterContext::NotEntered, true) => {
222                 // This is a nested call to block_in_place (we already exited).
223                 // All the necessary setup has already been done.
224                 return;
225             }
226             (EnterContext::NotEntered, false) => {
227                 // We are outside of the tokio runtime, so blocking is fine.
228                 // We can also skip all of the thread pool blocking setup steps.
229                 return;
230             }
231         }
232 
233         let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
234 
235         // Get the worker core. If none is set, then blocking is fine!
236         let core = match cx.core.borrow_mut().take() {
237             Some(core) => core,
238             None => return,
239         };
240 
241         // The parker should be set here
242         assert!(core.park.is_some());
243 
244         // In order to block, the core must be sent to another thread for
245         // execution.
246         //
247         // First, move the core back into the worker's shared core slot.
248         cx.worker.core.set(core);
249 
250         // Next, clone the worker handle and send it to a new thread for
251         // processing.
252         //
253         // Once the blocking task is done executing, we will attempt to
254         // steal the core back.
255         let worker = cx.worker.clone();
256         runtime::spawn_blocking(move || run(worker));
257     });
258 
259     if had_entered {
260         // Unset the current task's budget. Blocking sections are not
261         // constrained by task budgets.
262         let _reset = Reset(coop::stop());
263 
264         crate::runtime::enter::exit(f)
265     } else {
266         f()
267     }
268 }
269 
270 /// After how many ticks is the global queue polled. This helps to ensure
271 /// fairness.
272 ///
273 /// The number is fairly arbitrary. I believe this value was copied from golang.
274 const GLOBAL_POLL_INTERVAL: u8 = 61;
275 
276 impl Launch {
launch(mut self)277     pub(crate) fn launch(mut self) {
278         for worker in self.0.drain(..) {
279             runtime::spawn_blocking(move || run(worker));
280         }
281     }
282 }
283 
run(worker: Arc<Worker>)284 fn run(worker: Arc<Worker>) {
285     // Acquire a core. If this fails, then another thread is running this
286     // worker and there is nothing further to do.
287     let core = match worker.core.take() {
288         Some(core) => core,
289         None => return,
290     };
291 
292     // Set the worker context.
293     let cx = Context {
294         worker,
295         core: RefCell::new(None),
296     };
297 
298     let _enter = crate::runtime::enter(true);
299 
300     CURRENT.set(&cx, || {
301         // This should always be an error. It only returns a `Result` to support
302         // using `?` to short circuit.
303         assert!(cx.run(core).is_err());
304     });
305 }
306 
307 impl Context {
run(&self, mut core: Box<Core>) -> RunResult308     fn run(&self, mut core: Box<Core>) -> RunResult {
309         while !core.is_shutdown {
310             // Increment the tick
311             core.tick();
312 
313             // Run maintenance, if needed
314             core = self.maintenance(core);
315 
316             // First, check work available to the current worker.
317             if let Some(task) = core.next_task(&self.worker) {
318                 core = self.run_task(task, core)?;
319                 continue;
320             }
321 
322             // There is no more **local** work to process, try to steal work
323             // from other workers.
324             if let Some(task) = core.steal_work(&self.worker) {
325                 core = self.run_task(task, core)?;
326             } else {
327                 // Wait for work
328                 core = self.park(core);
329             }
330         }
331 
332         core.pre_shutdown(&self.worker);
333 
334         // Signal shutdown
335         self.worker.shared.shutdown(core);
336         Err(())
337     }
338 
run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult339     fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
340         // Make sure the worker is not in the **searching** state. This enables
341         // another idle worker to try to steal work.
342         core.transition_from_searching(&self.worker);
343 
344         // Make the core available to the runtime context
345         *self.core.borrow_mut() = Some(core);
346 
347         // Run the task
348         coop::budget(|| {
349             task.run();
350 
351             // As long as there is budget remaining and a task exists in the
352             // `lifo_slot`, then keep running.
353             loop {
354                 // Check if we still have the core. If not, the core was stolen
355                 // by another worker.
356                 let mut core = match self.core.borrow_mut().take() {
357                     Some(core) => core,
358                     None => return Err(()),
359                 };
360 
361                 // Check for a task in the LIFO slot
362                 let task = match core.lifo_slot.take() {
363                     Some(task) => task,
364                     None => return Ok(core),
365                 };
366 
367                 if coop::has_budget_remaining() {
368                     // Run the LIFO task, then loop
369                     *self.core.borrow_mut() = Some(core);
370                     task.run();
371                 } else {
372                     // Not enough budget left to run the LIFO task, push it to
373                     // the back of the queue and return.
374                     core.run_queue.push_back(task, self.worker.inject());
375                     return Ok(core);
376                 }
377             }
378         })
379     }
380 
maintenance(&self, mut core: Box<Core>) -> Box<Core>381     fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
382         if core.tick % GLOBAL_POLL_INTERVAL == 0 {
383             // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
384             // to run without actually putting the thread to sleep.
385             core = self.park_timeout(core, Some(Duration::from_millis(0)));
386 
387             // Run regularly scheduled maintenance
388             core.maintenance(&self.worker);
389         }
390 
391         core
392     }
393 
park(&self, mut core: Box<Core>) -> Box<Core>394     fn park(&self, mut core: Box<Core>) -> Box<Core> {
395         core.transition_to_parked(&self.worker);
396 
397         while !core.is_shutdown {
398             core = self.park_timeout(core, None);
399 
400             // Run regularly scheduled maintenance
401             core.maintenance(&self.worker);
402 
403             if core.transition_from_parked(&self.worker) {
404                 return core;
405             }
406         }
407 
408         core
409     }
410 
park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core>411     fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
412         // Take the parker out of core
413         let mut park = core.park.take().expect("park missing");
414 
415         // Store `core` in context
416         *self.core.borrow_mut() = Some(core);
417 
418         // Park thread
419         if let Some(timeout) = duration {
420             park.park_timeout(timeout).expect("park failed");
421         } else {
422             park.park().expect("park failed");
423         }
424 
425         // Remove `core` from context
426         core = self.core.borrow_mut().take().expect("core missing");
427 
428         // Place `park` back in `core`
429         core.park = Some(park);
430 
431         // If there are tasks available to steal, notify a worker
432         if core.run_queue.is_stealable() {
433             self.worker.shared.notify_parked();
434         }
435 
436         core
437     }
438 }
439 
440 impl Core {
441     /// Increment the tick
tick(&mut self)442     fn tick(&mut self) {
443         self.tick = self.tick.wrapping_add(1);
444     }
445 
446     /// Return the next notified task available to this worker.
next_task(&mut self, worker: &Worker) -> Option<Notified>447     fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
448         if self.tick % GLOBAL_POLL_INTERVAL == 0 {
449             worker.inject().pop().or_else(|| self.next_local_task())
450         } else {
451             self.next_local_task().or_else(|| worker.inject().pop())
452         }
453     }
454 
next_local_task(&mut self) -> Option<Notified>455     fn next_local_task(&mut self) -> Option<Notified> {
456         self.lifo_slot.take().or_else(|| self.run_queue.pop())
457     }
458 
steal_work(&mut self, worker: &Worker) -> Option<Notified>459     fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
460         if !self.transition_to_searching(worker) {
461             return None;
462         }
463 
464         let num = worker.shared.remotes.len();
465         // Start from a random worker
466         let start = self.rand.fastrand_n(num as u32) as usize;
467 
468         for i in 0..num {
469             let i = (start + i) % num;
470 
471             // Don't steal from ourself! We know we don't have work.
472             if i == worker.index {
473                 continue;
474             }
475 
476             let target = &worker.shared.remotes[i];
477             if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
478                 return Some(task);
479             }
480         }
481 
482         // Fallback on checking the global queue
483         worker.shared.inject.pop()
484     }
485 
transition_to_searching(&mut self, worker: &Worker) -> bool486     fn transition_to_searching(&mut self, worker: &Worker) -> bool {
487         if !self.is_searching {
488             self.is_searching = worker.shared.idle.transition_worker_to_searching();
489         }
490 
491         self.is_searching
492     }
493 
transition_from_searching(&mut self, worker: &Worker)494     fn transition_from_searching(&mut self, worker: &Worker) {
495         if !self.is_searching {
496             return;
497         }
498 
499         self.is_searching = false;
500         worker.shared.transition_worker_from_searching();
501     }
502 
503     /// Prepare the worker state for parking
transition_to_parked(&mut self, worker: &Worker)504     fn transition_to_parked(&mut self, worker: &Worker) {
505         // When the final worker transitions **out** of searching to parked, it
506         // must check all the queues one last time in case work materialized
507         // between the last work scan and transitioning out of searching.
508         let is_last_searcher = worker
509             .shared
510             .idle
511             .transition_worker_to_parked(worker.index, self.is_searching);
512 
513         // The worker is no longer searching. Setting this is the local cache
514         // only.
515         self.is_searching = false;
516 
517         if is_last_searcher {
518             worker.shared.notify_if_work_pending();
519         }
520     }
521 
522     /// Returns `true` if the transition happened.
transition_from_parked(&mut self, worker: &Worker) -> bool523     fn transition_from_parked(&mut self, worker: &Worker) -> bool {
524         // If a task is in the lifo slot, then we must unpark regardless of
525         // being notified
526         if self.lifo_slot.is_some() {
527             worker.shared.idle.unpark_worker_by_id(worker.index);
528             self.is_searching = true;
529             return true;
530         }
531 
532         if worker.shared.idle.is_parked(worker.index) {
533             return false;
534         }
535 
536         // When unparked, the worker is in the searching state.
537         self.is_searching = true;
538         true
539     }
540 
541     /// Runs maintenance work such as free pending tasks and check the pool's
542     /// state.
maintenance(&mut self, worker: &Worker)543     fn maintenance(&mut self, worker: &Worker) {
544         self.drain_pending_drop(worker);
545 
546         if !self.is_shutdown {
547             // Check if the scheduler has been shutdown
548             self.is_shutdown = worker.inject().is_closed();
549         }
550     }
551 
552     // Signals all tasks to shut down, and waits for them to complete. Must run
553     // before we enter the single-threaded phase of shutdown processing.
pre_shutdown(&mut self, worker: &Worker)554     fn pre_shutdown(&mut self, worker: &Worker) {
555         // Signal to all tasks to shut down.
556         for header in self.tasks.iter() {
557             header.shutdown();
558         }
559 
560         loop {
561             self.drain_pending_drop(worker);
562 
563             if self.tasks.is_empty() {
564                 break;
565             }
566 
567             // Wait until signalled
568             let park = self.park.as_mut().expect("park missing");
569             park.park().expect("park failed");
570         }
571     }
572 
573     // Shutdown the core
shutdown(&mut self)574     fn shutdown(&mut self) {
575         assert!(self.tasks.is_empty());
576 
577         // Take the core
578         let mut park = self.park.take().expect("park missing");
579 
580         // Drain the queue
581         while self.next_local_task().is_some() {}
582 
583         park.shutdown();
584     }
585 
drain_pending_drop(&mut self, worker: &Worker)586     fn drain_pending_drop(&mut self, worker: &Worker) {
587         use std::mem::ManuallyDrop;
588 
589         for task in worker.remote().pending_drop.drain() {
590             let task = ManuallyDrop::new(task);
591 
592             // safety: tasks are only pushed into the `pending_drop` stacks that
593             // are associated with the list they are inserted into. When a task
594             // is pushed into `pending_drop`, the ref-inc is skipped, so we must
595             // not ref-dec here.
596             //
597             // See `bind` and `release` implementations.
598             unsafe {
599                 self.tasks.remove(task.header().into());
600             }
601         }
602     }
603 }
604 
605 impl Worker {
606     /// Returns a reference to the scheduler's injection queue
inject(&self) -> &queue::Inject<Arc<Worker>>607     fn inject(&self) -> &queue::Inject<Arc<Worker>> {
608         &self.shared.inject
609     }
610 
611     /// Return a reference to this worker's remote data
remote(&self) -> &Remote612     fn remote(&self) -> &Remote {
613         &self.shared.remotes[self.index]
614     }
615 
eq(&self, other: &Worker) -> bool616     fn eq(&self, other: &Worker) -> bool {
617         self.shared.ptr_eq(&other.shared) && self.index == other.index
618     }
619 }
620 
621 impl task::Schedule for Arc<Worker> {
bind(task: Task) -> Arc<Worker>622     fn bind(task: Task) -> Arc<Worker> {
623         CURRENT.with(|maybe_cx| {
624             let cx = maybe_cx.expect("scheduler context missing");
625 
626             // Track the task
627             cx.core
628                 .borrow_mut()
629                 .as_mut()
630                 .expect("scheduler core missing")
631                 .tasks
632                 .push_front(task);
633 
634             // Return a clone of the worker
635             cx.worker.clone()
636         })
637     }
638 
release(&self, task: &Task) -> Option<Task>639     fn release(&self, task: &Task) -> Option<Task> {
640         use std::ptr::NonNull;
641 
642         enum Immediate {
643             // Task has been synchronously removed from the Core owned by the
644             // current thread
645             Removed(Option<Task>),
646             // Task is owned by another thread, so we need to notify it to clean
647             // up the task later.
648             MaybeRemote,
649         }
650 
651         let immediate = CURRENT.with(|maybe_cx| {
652             let cx = match maybe_cx {
653                 Some(cx) => cx,
654                 None => return Immediate::MaybeRemote,
655             };
656 
657             if !self.eq(&cx.worker) {
658                 // Task owned by another core, so we need to notify it.
659                 return Immediate::MaybeRemote;
660             }
661 
662             let mut maybe_core = cx.core.borrow_mut();
663 
664             if let Some(core) = &mut *maybe_core {
665                 // Directly remove the task
666                 //
667                 // safety: the task is inserted in the list in `bind`.
668                 unsafe {
669                     let ptr = NonNull::from(task.header());
670                     return Immediate::Removed(core.tasks.remove(ptr));
671                 }
672             }
673 
674             Immediate::MaybeRemote
675         });
676 
677         // Checks if we were called from within a worker, allowing for immediate
678         // removal of a scheduled task. Else we have to go through the slower
679         // process below where we remotely mark a task as dropped.
680         match immediate {
681             Immediate::Removed(task) => return task,
682             Immediate::MaybeRemote => (),
683         };
684 
685         // Track the task to be released by the worker that owns it
686         //
687         // Safety: We get a new handle without incrementing the ref-count.
688         // A ref-count is held by the "owned" linked list and it is only
689         // ever removed from that list as part of the release process: this
690         // method or popping the task from `pending_drop`. Thus, we can rely
691         // on the ref-count held by the linked-list to keep the memory
692         // alive.
693         //
694         // When the task is removed from the stack, it is forgotten instead
695         // of dropped.
696         let task = unsafe { Task::from_raw(task.header().into()) };
697 
698         self.remote().pending_drop.push(task);
699 
700         // The worker core has been handed off to another thread. In the
701         // event that the scheduler is currently shutting down, the thread
702         // that owns the task may be waiting on the release to complete
703         // shutdown.
704         if self.inject().is_closed() {
705             self.remote().unpark.unpark();
706         }
707 
708         None
709     }
710 
schedule(&self, task: Notified)711     fn schedule(&self, task: Notified) {
712         self.shared.schedule(task, false);
713     }
714 
yield_now(&self, task: Notified)715     fn yield_now(&self, task: Notified) {
716         self.shared.schedule(task, true);
717     }
718 }
719 
720 impl Shared {
schedule(&self, task: Notified, is_yield: bool)721     pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
722         CURRENT.with(|maybe_cx| {
723             if let Some(cx) = maybe_cx {
724                 // Make sure the task is part of the **current** scheduler.
725                 if self.ptr_eq(&cx.worker.shared) {
726                     // And the current thread still holds a core
727                     if let Some(core) = cx.core.borrow_mut().as_mut() {
728                         self.schedule_local(core, task, is_yield);
729                         return;
730                     }
731                 }
732             }
733 
734             // Otherwise, use the inject queue
735             self.inject.push(task);
736             self.notify_parked();
737         });
738     }
739 
schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool)740     fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
741         // Spawning from the worker thread. If scheduling a "yield" then the
742         // task must always be pushed to the back of the queue, enabling other
743         // tasks to be executed. If **not** a yield, then there is more
744         // flexibility and the task may go to the front of the queue.
745         let should_notify = if is_yield {
746             core.run_queue.push_back(task, &self.inject);
747             true
748         } else {
749             // Push to the LIFO slot
750             let prev = core.lifo_slot.take();
751             let ret = prev.is_some();
752 
753             if let Some(prev) = prev {
754                 core.run_queue.push_back(prev, &self.inject);
755             }
756 
757             core.lifo_slot = Some(task);
758 
759             ret
760         };
761 
762         // Only notify if not currently parked. If `park` is `None`, then the
763         // scheduling is from a resource driver. As notifications often come in
764         // batches, the notification is delayed until the park is complete.
765         if should_notify && core.park.is_some() {
766             self.notify_parked();
767         }
768     }
769 
close(&self)770     pub(super) fn close(&self) {
771         if self.inject.close() {
772             self.notify_all();
773         }
774     }
775 
notify_parked(&self)776     fn notify_parked(&self) {
777         if let Some(index) = self.idle.worker_to_notify() {
778             self.remotes[index].unpark.unpark();
779         }
780     }
781 
notify_all(&self)782     fn notify_all(&self) {
783         for remote in &self.remotes[..] {
784             remote.unpark.unpark();
785         }
786     }
787 
notify_if_work_pending(&self)788     fn notify_if_work_pending(&self) {
789         for remote in &self.remotes[..] {
790             if !remote.steal.is_empty() {
791                 self.notify_parked();
792                 return;
793             }
794         }
795 
796         if !self.inject.is_empty() {
797             self.notify_parked();
798         }
799     }
800 
transition_worker_from_searching(&self)801     fn transition_worker_from_searching(&self) {
802         if self.idle.transition_worker_from_searching() {
803             // We are the final searching worker. Because work was found, we
804             // need to notify another worker.
805             self.notify_parked();
806         }
807     }
808 
809     /// Signals that a worker has observed the shutdown signal and has replaced
810     /// its core back into its handle.
811     ///
812     /// If all workers have reached this point, the final cleanup is performed.
shutdown(&self, core: Box<Core>)813     fn shutdown(&self, core: Box<Core>) {
814         let mut cores = self.shutdown_cores.lock();
815         cores.push(core);
816 
817         if cores.len() != self.remotes.len() {
818             return;
819         }
820 
821         for mut core in cores.drain(..) {
822             core.shutdown();
823         }
824 
825         // Drain the injection queue
826         while self.inject.pop().is_some() {}
827     }
828 
ptr_eq(&self, other: &Shared) -> bool829     fn ptr_eq(&self, other: &Shared) -> bool {
830         std::ptr::eq(self, other)
831     }
832 }
833