use crate::job::{JobFifo, JobRef, StackJob};
use crate::latch::{AsCoreLatch, CoreLatch, CountLatch, Latch, LockLatch, SpinLatch};
use crate::log::Event::*;
use crate::log::Logger;
use crate::sleep::Sleep;
use crate::unwind;
use crate::util::leak;
use crate::{
    ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder,
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::fmt;
use std::hash::Hasher;
use std::io;
use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Once};
use std::thread;
use std::usize;

/// Thread builder used for customization via
/// [`ThreadPoolBuilder::spawn_handler`](struct.ThreadPoolBuilder.html#method.spawn_handler).
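///
/// A minimal sketch of a custom spawn handler that receives this type
/// (assumes only the `spawn_handler` API linked above; error handling
/// and stack size are omitted):
///
/// ```ignore
/// let pool = ThreadPoolBuilder::new()
///     .spawn_handler(|thread| {
///         // `thread` is a `ThreadBuilder`; we must eventually call `run()`.
///         std::thread::Builder::new()
///             .name(thread.name().unwrap_or("rayon-worker").to_owned())
///             .spawn(|| thread.run())?;
///         Ok(())
///     })
///     .build()?;
/// ```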
pub struct ThreadBuilder {
    name: Option<String>,
    stack_size: Option<usize>,
    worker: Worker<JobRef>,
    registry: Arc<Registry>,
    index: usize,
}

impl ThreadBuilder {
    /// Gets the index of this thread in the pool, within `0..num_threads`.
    pub fn index(&self) -> usize {
        self.index
    }

    /// Gets the string that was specified by `ThreadPoolBuilder::name()`.
    pub fn name(&self) -> Option<&str> {
        self.name.as_ref().map(String::as_str)
    }

    /// Gets the value that was specified by `ThreadPoolBuilder::stack_size()`.
    pub fn stack_size(&self) -> Option<usize> {
        self.stack_size
    }

    /// Executes the main loop for this thread. This will not return until the
    /// thread pool is dropped.
    pub fn run(self) {
        unsafe { main_loop(self.worker, self.registry, self.index) }
    }
}

impl fmt::Debug for ThreadBuilder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ThreadBuilder")
            .field("pool", &self.registry.id())
            .field("index", &self.index)
            .field("name", &self.name)
            .field("stack_size", &self.stack_size)
            .finish()
    }
}

/// Generalized trait for spawning a thread in the `Registry`.
///
/// This trait is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
pub trait ThreadSpawn {
    private_decl! {}

    /// Spawn a thread with the `ThreadBuilder` parameters, and then
    /// call `ThreadBuilder::run()`.
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()>;
}

/// Spawns a thread in the "normal" way with `std::thread::Builder`.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug, Default)]
pub struct DefaultSpawn;

impl ThreadSpawn for DefaultSpawn {
    private_impl! {}

    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        let mut b = thread::Builder::new();
        if let Some(name) = thread.name() {
            b = b.name(name.to_owned());
        }
        if let Some(stack_size) = thread.stack_size() {
            b = b.stack_size(stack_size);
        }
        b.spawn(|| thread.run())?;
        Ok(())
    }
}

/// Spawns a thread with a user's custom callback.
///
/// This type is pub-in-private -- E0445 forces us to make it public,
/// but we don't actually want to expose these details in the API.
#[derive(Debug)]
pub struct CustomSpawn<F>(F);

impl<F> CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    pub(super) fn new(spawn: F) -> Self {
        CustomSpawn(spawn)
    }
}

impl<F> ThreadSpawn for CustomSpawn<F>
where
    F: FnMut(ThreadBuilder) -> io::Result<()>,
{
    private_impl! {}

    #[inline]
    fn spawn(&mut self, thread: ThreadBuilder) -> io::Result<()> {
        (self.0)(thread)
    }
}

pub(super) struct Registry {
    logger: Logger,
    thread_infos: Vec<ThreadInfo>,
    sleep: Sleep,
    injected_jobs: Injector<JobRef>,
    panic_handler: Option<Box<PanicHandler>>,
    start_handler: Option<Box<StartHandler>>,
    exit_handler: Option<Box<ExitHandler>>,

    // When this counter reaches 0, it means that all work on this
    // registry must be complete. This is ensured in the following ways:
    //
    // - if this is the global registry, there is a ref-count that never
    //   gets released.
    // - if this is a user-created thread-pool, then so long as the thread-pool
    //   exists, it holds a reference.
    // - when we inject a "blocking job" into the registry with `ThreadPool::install()`,
    //   no adjustment is needed; the `ThreadPool` holds the reference, and since we won't
    //   return until the blocking job is complete, that ref will continue to be held.
    // - when `join()` or `scope()` is invoked, similarly, no adjustments are needed.
    //   These are always owned by some other job (e.g., one injected by `ThreadPool::install()`)
    //   and that job will keep the pool alive.
    terminate_count: AtomicUsize,
}

/// ////////////////////////////////////////////////////////////////////////
/// Initialization

static mut THE_REGISTRY: Option<&'static Arc<Registry>> = None;
static THE_REGISTRY_SET: Once = Once::new();

/// Starts the worker threads (if that has not already happened). If
/// initialization has not already occurred, use the default
/// configuration.
fn global_registry() -> &'static Arc<Registry> {
    set_global_registry(|| Registry::new(ThreadPoolBuilder::new()))
        .or_else(|err| unsafe { THE_REGISTRY.ok_or(err) })
        .expect("The global thread pool has not been initialized.")
}

/// Starts the worker threads (if that has not already happened) with
/// the given builder.
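///
/// A sketch of how this is reached from the public API (assuming the
/// crate-level `ThreadPoolBuilder::build_global`, which forwards here):
///
/// ```ignore
/// // Succeeds at most once; later calls fail with GlobalPoolAlreadyInitialized.
/// ThreadPoolBuilder::new().num_threads(4).build_global()?;
/// ```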
pub(super) fn init_global_registry<S>(
    builder: ThreadPoolBuilder<S>,
) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    S: ThreadSpawn,
{
    set_global_registry(|| Registry::new(builder))
}

/// Starts the worker threads (if that has not already happened)
/// by creating a registry with the given callback.
fn set_global_registry<F>(registry: F) -> Result<&'static Arc<Registry>, ThreadPoolBuildError>
where
    F: FnOnce() -> Result<Arc<Registry>, ThreadPoolBuildError>,
{
    let mut result = Err(ThreadPoolBuildError::new(
        ErrorKind::GlobalPoolAlreadyInitialized,
    ));
    THE_REGISTRY_SET.call_once(|| {
        result = registry().map(|registry| {
            let registry = leak(registry);
            unsafe {
                THE_REGISTRY = Some(registry);
            }
            registry
        });
    });
    result
}

struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
    fn drop(&mut self) {
        self.0.terminate()
    }
}

impl Registry {
    pub(super) fn new<S>(
        mut builder: ThreadPoolBuilder<S>,
    ) -> Result<Arc<Self>, ThreadPoolBuildError>
    where
        S: ThreadSpawn,
    {
        let n_threads = builder.get_num_threads();
        let breadth_first = builder.get_breadth_first();

        let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads)
            .map(|_| {
                let worker = if breadth_first {
                    Worker::new_fifo()
                } else {
                    Worker::new_lifo()
                };

                let stealer = worker.stealer();
                (worker, stealer)
            })
            .unzip();

        let logger = Logger::new(n_threads);
        let registry = Arc::new(Registry {
            logger: logger.clone(),
            thread_infos: stealers.into_iter().map(ThreadInfo::new).collect(),
            sleep: Sleep::new(logger, n_threads),
            injected_jobs: Injector::new(),
            terminate_count: AtomicUsize::new(1),
            panic_handler: builder.take_panic_handler(),
            start_handler: builder.take_start_handler(),
            exit_handler: builder.take_exit_handler(),
        });

        // If we return early or panic, make sure to terminate existing threads.
        let t1000 = Terminator(&registry);

        for (index, worker) in workers.into_iter().enumerate() {
            let thread = ThreadBuilder {
                name: builder.get_thread_name(index),
                stack_size: builder.get_stack_size(),
                registry: registry.clone(),
                worker,
                index,
            };
            if let Err(e) = builder.get_spawn_handler().spawn(thread) {
                return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)));
            }
        }

        // Returning normally now, without termination.
        mem::forget(t1000);

        Ok(registry.clone())
    }

    pub(super) fn current() -> Arc<Registry> {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().clone()
            } else {
                (*worker_thread).registry.clone()
            }
        }
    }

    /// Returns the number of threads in the current registry.  This
    /// is better than `Registry::current().num_threads()` because it
    /// avoids incrementing the `Arc`.
    pub(super) fn current_num_threads() -> usize {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                global_registry().num_threads()
            } else {
                (*worker_thread).registry.num_threads()
            }
        }
    }

    /// Returns the current `WorkerThread` if it's part of this `Registry`.
    pub(super) fn current_thread(&self) -> Option<&WorkerThread> {
        unsafe {
            let worker = WorkerThread::current().as_ref()?;
            if worker.registry().id() == self.id() {
                Some(worker)
            } else {
                None
            }
        }
    }

    /// Returns an opaque identifier for this registry.
    pub(super) fn id(&self) -> RegistryId {
        // We can rely on `self` not to change since we only ever create
        // registries that are boxed up in an `Arc` (see `new()` above).
        RegistryId {
            addr: self as *const Self as usize,
        }
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
        self.logger.log(event)
    }

    pub(super) fn num_threads(&self) -> usize {
        self.thread_infos.len()
    }

    pub(super) fn handle_panic(&self, err: Box<dyn Any + Send>) {
        match self.panic_handler {
            Some(ref handler) => {
                // If the customizable panic handler itself panics,
                // then we abort.
                let abort_guard = unwind::AbortIfPanic;
                handler(err);
                mem::forget(abort_guard);
            }
            None => {
                // Default panic handler aborts.
                let _ = unwind::AbortIfPanic; // let this drop.
            }
        }
    }

    /// Waits for the worker threads to get up and running.  This is
    /// meant to be used for benchmarking purposes, primarily, so that
    /// you can get more consistent numbers by having everything
    /// "ready to go".
    pub(super) fn wait_until_primed(&self) {
        for info in &self.thread_infos {
            info.primed.wait();
        }
    }

    /// Waits for the worker threads to stop. This is used for testing
    /// -- so we can check that termination actually works.
    #[cfg(test)]
    pub(super) fn wait_until_stopped(&self) {
        for info in &self.thread_infos {
            info.stopped.wait();
        }
    }

    /// ////////////////////////////////////////////////////////////////////////
    /// MAIN LOOP
    ///
    /// So long as all of the worker threads are hanging out in their
    /// top-level loop, there is no work to be done.

    /// Push a job into the given `registry`. If we are running on a
    /// worker thread for the registry, this will push onto the
    /// deque. Else, it will inject from the outside (which is slower).
    pub(super) fn inject_or_push(&self, job_ref: JobRef) {
        let worker_thread = WorkerThread::current();
        unsafe {
            if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
                (*worker_thread).push(job_ref);
            } else {
                self.inject(&[job_ref]);
            }
        }
    }

    /// Push a job into the "external jobs" queue; it will be taken by
    /// whatever worker has nothing to do. Use this if you know that
    /// you are not on a worker of this registry.
    pub(super) fn inject(&self, injected_jobs: &[JobRef]) {
        self.log(|| JobsInjected {
            count: injected_jobs.len(),
        });

        // It should not be possible for `state.terminate` to be true
        // here. It is only set to true when the user creates (and
        // drops) a `ThreadPool`; and, in that case, they cannot be
        // calling `inject()` later, since they dropped their
        // `ThreadPool`.
        debug_assert_ne!(
            self.terminate_count.load(Ordering::Acquire),
            0,
            "inject() sees state.terminate as true"
        );

        let queue_was_empty = self.injected_jobs.is_empty();

        for &job_ref in injected_jobs {
            self.injected_jobs.push(job_ref);
        }

        self.sleep
            .new_injected_jobs(usize::MAX, injected_jobs.len() as u32, queue_was_empty);
    }

    fn has_injected_job(&self) -> bool {
        !self.injected_jobs.is_empty()
    }

    fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
        loop {
            match self.injected_jobs.steal() {
                Steal::Success(job) => {
                    self.log(|| JobUninjected {
                        worker: worker_index,
                    });
                    return Some(job);
                }
                Steal::Empty => return None,
                Steal::Retry => {}
            }
        }
    }

    /// If already in a worker-thread of this registry, just execute `op`.
    /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
    /// completes and return its return value. If `op` panics, that panic will
    /// be propagated as well.  The second argument indicates `true` if injection
    /// was performed, `false` if executed directly.
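    ///
    /// A sketch of the two cases (hypothetical call site):
    ///
    /// ```ignore
    /// let n = registry.in_worker(|worker_thread, injected| {
    ///     // On a worker of this registry, `injected` is false and `op`
    ///     // runs inline; otherwise the closure was injected and blocked on.
    ///     worker_thread.index()
    /// });
    /// ```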
    pub(super) fn in_worker<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        unsafe {
            let worker_thread = WorkerThread::current();
            if worker_thread.is_null() {
                self.in_worker_cold(op)
            } else if (*worker_thread).registry().id() != self.id() {
                self.in_worker_cross(&*worker_thread, op)
            } else {
                // Perfectly valid to give them a `&T`: this is the
                // current thread, so we know the data structure won't be
                // invalidated until we return.
                op(&*worker_thread, false)
            }
        }
    }

    #[cold]
    unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        thread_local!(static LOCK_LATCH: LockLatch = LockLatch::new());

        LOCK_LATCH.with(|l| {
            // This thread isn't a member of *any* thread pool, so just block.
            debug_assert!(WorkerThread::current().is_null());
            let job = StackJob::new(
                |injected| {
                    let worker_thread = WorkerThread::current();
                    assert!(injected && !worker_thread.is_null());
                    op(&*worker_thread, true)
                },
                l,
            );
            self.inject(&[job.as_job_ref()]);
            job.latch.wait_and_reset(); // Make sure we can use the same latch again next time.

            // flush accumulated logs as we exit the thread
            self.logger.log(|| Flush);

            job.into_result()
        })
    }

    #[cold]
    unsafe fn in_worker_cross<OP, R>(&self, current_thread: &WorkerThread, op: OP) -> R
    where
        OP: FnOnce(&WorkerThread, bool) -> R + Send,
        R: Send,
    {
        // This thread is a member of a different pool, so let it process
        // other work while waiting for this `op` to complete.
        debug_assert!(current_thread.registry().id() != self.id());
        let latch = SpinLatch::cross(current_thread);
        let job = StackJob::new(
            |injected| {
                let worker_thread = WorkerThread::current();
                assert!(injected && !worker_thread.is_null());
                op(&*worker_thread, true)
            },
            latch,
        );
        self.inject(&[job.as_job_ref()]);
        current_thread.wait_until(&job.latch);
        job.into_result()
    }

    /// Increments the terminate counter. This increment should be
    /// balanced by a call to `terminate`, which will decrement. This
    /// is used when spawning asynchronous work, which needs to
    /// prevent the registry from terminating so long as it is active.
    ///
    /// Note that blocking functions such as `join` and `scope` do not
    /// need to concern themselves with this fn; their context is
    /// responsible for ensuring the current thread-pool will not
    /// terminate until they return.
    ///
    /// The global thread-pool always has an outstanding reference
    /// (the initial one). Custom thread-pools have one outstanding
    /// reference that is dropped when the `ThreadPool` is dropped:
    /// since installing the thread-pool blocks until any joins/scopes
    /// complete, this ensures that joins/scopes are covered.
    ///
    /// The exception is `::spawn()`, which can create a job outside
    /// of any blocking scope. In that case, the job itself holds a
    /// terminate count and is responsible for invoking `terminate()`
    /// when finished.
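    ///
    /// A sketch of the pairing that such a spawn relies on (hypothetical
    /// caller; the real job plumbing lives elsewhere):
    ///
    /// ```ignore
    /// registry.increment_terminate_count(); // keep the pool alive
    /// // ... run the detached job ...
    /// registry.terminate(); // balance the increment once the job is done
    /// ```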
    pub(super) fn increment_terminate_count(&self) {
        let previous = self.terminate_count.fetch_add(1, Ordering::AcqRel);
        debug_assert!(previous != 0, "registry ref count incremented from zero");
        assert!(
            previous != std::usize::MAX,
            "overflow in registry ref count"
        );
    }

    /// Signals that the thread-pool which owns this registry has been
    /// dropped. The worker threads will gradually terminate, once any
    /// extant work is completed.
    pub(super) fn terminate(&self) {
        if self.terminate_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            for (i, thread_info) in self.thread_infos.iter().enumerate() {
                thread_info.terminate.set_and_tickle_one(self, i);
            }
        }
    }

    /// Notify the worker that the latch they are sleeping on has been "set".
    pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
        self.sleep.notify_worker_latch_is_set(target_worker_index);
    }
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(super) struct RegistryId {
    addr: usize,
}

struct ThreadInfo {
    /// Latch set once thread has started and we are entering into the
    /// main loop. Used to wait for worker threads to become primed,
    /// primarily of interest for benchmarking.
    primed: LockLatch,

    /// Latch is set once worker thread has completed. Used to wait
    /// until workers have stopped; only used for tests.
    stopped: LockLatch,

    /// The latch used to signal that termination has been requested.
    /// This latch is *set* by the `terminate` method on the
    /// `Registry`, once the registry's main "terminate" counter
    /// reaches zero.
    ///
    /// NB. We use a `CountLatch` here because it has no lifetimes and is
    /// meant for async use, but the count never gets higher than one.
    terminate: CountLatch,

    /// the "stealer" half of the worker's deque
    stealer: Stealer<JobRef>,
}

impl ThreadInfo {
    fn new(stealer: Stealer<JobRef>) -> ThreadInfo {
        ThreadInfo {
            primed: LockLatch::new(),
            stopped: LockLatch::new(),
            terminate: CountLatch::new(),
            stealer,
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////
/// WorkerThread identifiers

pub(super) struct WorkerThread {
    /// the "worker" half of our local deque
    worker: Worker<JobRef>,

    /// local queue used for `spawn_fifo` indirection
    fifo: JobFifo,

    index: usize,

    /// A weak random number generator.
    rng: XorShift64Star,

    registry: Arc<Registry>,
}

// This is a bit sketchy, but basically: the WorkerThread is
// allocated on the stack of the worker on entry and stored into this
// thread local variable. So it will remain valid at least until the
// worker is fully unwound. Using an unsafe pointer avoids the need
// for a RefCell<T> etc.
thread_local! {
    static WORKER_THREAD_STATE: Cell<*const WorkerThread> = Cell::new(ptr::null());
}

impl Drop for WorkerThread {
    fn drop(&mut self) {
        // Undo `set_current`
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().eq(&(self as *const _)));
            t.set(ptr::null());
        });
    }
}

impl WorkerThread {
    /// Gets the `WorkerThread` index for the current thread; returns
    /// NULL if this is not a worker thread. This pointer is valid
    /// anywhere on the current thread.
    #[inline]
    pub(super) fn current() -> *const WorkerThread {
        WORKER_THREAD_STATE.with(Cell::get)
    }

    /// Sets `self` as the worker thread index for the current thread.
    /// This is done during worker thread startup.
    unsafe fn set_current(thread: *const WorkerThread) {
        WORKER_THREAD_STATE.with(|t| {
            assert!(t.get().is_null());
            t.set(thread);
        });
    }

    /// Returns the registry that owns this worker thread.
    #[inline]
    pub(super) fn registry(&self) -> &Arc<Registry> {
        &self.registry
    }

    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> crate::log::Event) {
        self.registry.logger.log(event)
    }

    /// Our index amongst the worker threads (ranges from `0..self.num_threads()`).
    #[inline]
    pub(super) fn index(&self) -> usize {
        self.index
    }

    #[inline]
    pub(super) unsafe fn push(&self, job: JobRef) {
        self.log(|| JobPushed { worker: self.index });
        let queue_was_empty = self.worker.is_empty();
        self.worker.push(job);
        self.registry
            .sleep
            .new_internal_jobs(self.index, 1, queue_was_empty);
    }

    #[inline]
    pub(super) unsafe fn push_fifo(&self, job: JobRef) {
        self.push(self.fifo.push(job));
    }

    #[inline]
    pub(super) fn local_deque_is_empty(&self) -> bool {
        self.worker.is_empty()
    }

    /// Attempts to obtain a "local" job -- typically this means
    /// popping from the top of the stack, though if we are configured
    /// for breadth-first execution, it would mean dequeuing from the
    /// bottom.
    #[inline]
    pub(super) unsafe fn take_local_job(&self) -> Option<JobRef> {
        let popped_job = self.worker.pop();

        if popped_job.is_some() {
            self.log(|| JobPopped { worker: self.index });
        }

        popped_job
    }

    /// Wait until the latch is set. Try to keep busy by popping and
    /// stealing tasks as necessary.
    #[inline]
    pub(super) unsafe fn wait_until<L: AsCoreLatch + ?Sized>(&self, latch: &L) {
        let latch = latch.as_core_latch();
        if !latch.probe() {
            self.wait_until_cold(latch);
        }
    }

    #[cold]
    unsafe fn wait_until_cold(&self, latch: &CoreLatch) {
        // The code below should swallow all panics and hence never
        // unwind; but if something goes wrong, we want to abort,
        // because otherwise other code in rayon may assume that the
        // latch has been signaled, and that can lead to random memory
        // accesses, which would be *very bad*.
        let abort_guard = unwind::AbortIfPanic;

        let mut idle_state = self.registry.sleep.start_looking(self.index, latch);
        while !latch.probe() {
            // Try to find some work to do. We give preference first
            // to things in our local deque, then in other workers'
            // deques, and finally to injected jobs from the
            // outside. The idea is to finish what we started before
            // we take on something new.
            if let Some(job) = self
                .take_local_job()
                .or_else(|| self.steal())
                .or_else(|| self.registry.pop_injected_job(self.index))
            {
                self.registry.sleep.work_found(idle_state);
                self.execute(job);
                idle_state = self.registry.sleep.start_looking(self.index, latch);
            } else {
                self.registry
                    .sleep
                    .no_work_found(&mut idle_state, latch, || self.registry.has_injected_job())
            }
        }

        // If we were sleepy, we are not anymore. We "found work" --
        // whatever the surrounding thread was doing before it had to
        // wait.
        self.registry.sleep.work_found(idle_state);

        self.log(|| ThreadSawLatchSet {
            worker: self.index,
            latch_addr: latch.addr(),
        });
        mem::forget(abort_guard); // successful execution, do not abort
    }

    #[inline]
    pub(super) unsafe fn execute(&self, job: JobRef) {
        job.execute();
    }

    /// Try to steal a single job and return it.
    ///
    /// This should only be done as a last resort, when there is no
    /// local work to do.
    unsafe fn steal(&self) -> Option<JobRef> {
        // we only steal when we don't have any work to do locally
        debug_assert!(self.local_deque_is_empty());

        // otherwise, try to steal
        let thread_infos = &self.registry.thread_infos.as_slice();
        let num_threads = thread_infos.len();
        if num_threads <= 1 {
            return None;
        }

        loop {
            let mut retry = false;
            let start = self.rng.next_usize(num_threads);
            let job = (start..num_threads)
                .chain(0..start)
                .filter(move |&i| i != self.index)
                .find_map(|victim_index| {
                    let victim = &thread_infos[victim_index];
                    match victim.stealer.steal() {
                        Steal::Success(job) => {
                            self.log(|| JobStolen {
                                worker: self.index,
                                victim: victim_index,
                            });
                            Some(job)
                        }
                        Steal::Empty => None,
                        Steal::Retry => {
                            retry = true;
                            None
                        }
                    }
                });
            if job.is_some() || !retry {
                return job;
            }
        }
    }
}

/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
    let worker_thread = &WorkerThread {
        worker,
        fifo: JobFifo::new(),
        index,
        rng: XorShift64Star::new(),
        registry: registry.clone(),
    };
    WorkerThread::set_current(worker_thread);

    // let registry know we are ready to do work
    registry.thread_infos[index].primed.set();

    // Worker threads should not panic. If they do, just abort, as the
    // internal state of the threadpool is corrupted. Note that if
    // **user code** panics, we should catch that and redirect.
    let abort_guard = unwind::AbortIfPanic;

    // Inform a user callback that we started a thread.
    if let Some(ref handler) = registry.start_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
    }

    let my_terminate_latch = &registry.thread_infos[index].terminate;
    worker_thread.log(|| ThreadStart {
        worker: index,
        terminate_addr: my_terminate_latch.as_core_latch().addr(),
    });
    worker_thread.wait_until(my_terminate_latch);

    // Should not be any work left in our queue.
    debug_assert!(worker_thread.take_local_job().is_none());

    // let registry know we are done
    registry.thread_infos[index].stopped.set();

    // Normal termination, do not abort.
    mem::forget(abort_guard);

    worker_thread.log(|| ThreadTerminate { worker: index });

    // Inform a user callback that we exited a thread.
    if let Some(ref handler) = registry.exit_handler {
        let registry = registry.clone();
        match unwind::halt_unwinding(|| handler(index)) {
            Ok(()) => {}
            Err(err) => {
                registry.handle_panic(err);
            }
        }
        // We're already exiting the thread, there's nothing else to do.
    }
}

/// If already in a worker-thread, just execute `op`.  Otherwise,
/// execute `op` in the default thread-pool. Either way, block until
/// `op` completes and return its return value. If `op` panics, that
/// panic will be propagated as well.  The second argument indicates
/// `true` if injection was performed, `false` if executed directly.
pub(super) fn in_worker<OP, R>(op: OP) -> R
where
    OP: FnOnce(&WorkerThread, bool) -> R + Send,
    R: Send,
{
    unsafe {
        let owner_thread = WorkerThread::current();
        if !owner_thread.is_null() {
            // Perfectly valid to give them a `&T`: this is the
            // current thread, so we know the data structure won't be
            // invalidated until we return.
            op(&*owner_thread, false)
        } else {
            global_registry().in_worker_cold(op)
        }
    }
}

/// [xorshift*] is a fast pseudorandom number generator which will
/// even tolerate weak seeding, as long as it's not zero.
///
/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
struct XorShift64Star {
    state: Cell<u64>,
}

impl XorShift64Star {
    fn new() -> Self {
        // Any non-zero seed will do -- this uses the hash of a global counter.
        let mut seed = 0;
        while seed == 0 {
            let mut hasher = DefaultHasher::new();
            static COUNTER: AtomicUsize = AtomicUsize::new(0);
            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
            seed = hasher.finish();
        }

        XorShift64Star {
            state: Cell::new(seed),
        }
    }

    fn next(&self) -> u64 {
        let mut x = self.state.get();
        debug_assert_ne!(x, 0);
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        self.state.set(x);
        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
    }

    /// Return a value from `0..n`.
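    ///
    /// Note that `%` has a slight modulo bias when `n` does not divide
    /// `2^64` evenly; that is acceptable here, where it is used to pick
    /// a random starting victim for work-stealing.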
    fn next_usize(&self, n: usize) -> usize {
        (self.next() % n as u64) as usize
    }
}