1 use crate::latch::Latch;
2 use crate::unwind;
3 use crossbeam_deque::{Injector, Steal};
4 use std::any::Any;
5 use std::cell::UnsafeCell;
6 use std::mem;
7 
8 pub(super) enum JobResult<T> {
9     None,
10     Ok(T),
11     Panic(Box<dyn Any + Send>),
12 }
13 
14 /// A `Job` is used to advertise work for other threads that they may
15 /// want to steal. In accordance with time honored tradition, jobs are
16 /// arranged in a deque, so that thieves can take from the top of the
17 /// deque while the main worker manages the bottom of the deque. This
18 /// deque is managed by the `thread_pool` module.
19 pub(super) trait Job {
20     /// Unsafe: this may be called from a different thread than the one
21     /// which scheduled the job, so the implementer must ensure the
22     /// appropriate traits are met, whether `Send`, `Sync`, or both.
execute(this: *const Self)23     unsafe fn execute(this: *const Self);
24 }
25 
26 /// Effectively a Job trait object. Each JobRef **must** be executed
27 /// exactly once, or else data may leak.
28 ///
29 /// Internally, we store the job's data in a `*const ()` pointer.  The
30 /// true type is something like `*const StackJob<...>`, but we hide
31 /// it. We also carry the "execute fn" from the `Job` trait.
32 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
33 pub(super) struct JobRef {
34     pointer: *const (),
35     execute_fn: unsafe fn(*const ()),
36 }
37 
38 unsafe impl Send for JobRef {}
39 unsafe impl Sync for JobRef {}
40 
41 impl JobRef {
42     /// Unsafe: caller asserts that `data` will remain valid until the
43     /// job is executed.
new<T>(data: *const T) -> JobRef where T: Job,44     pub(super) unsafe fn new<T>(data: *const T) -> JobRef
45     where
46         T: Job,
47     {
48         let fn_ptr: unsafe fn(*const T) = <T as Job>::execute;
49 
50         // erase types:
51         JobRef {
52             pointer: data as *const (),
53             execute_fn: mem::transmute(fn_ptr),
54         }
55     }
56 
57     #[inline]
execute(&self)58     pub(super) unsafe fn execute(&self) {
59         (self.execute_fn)(self.pointer)
60     }
61 }
62 
63 /// A job that will be owned by a stack slot. This means that when it
64 /// executes it need not free any heap data, the cleanup occurs when
65 /// the stack frame is later popped.  The function parameter indicates
66 /// `true` if the job was stolen -- executed on a different thread.
67 pub(super) struct StackJob<L, F, R>
68 where
69     L: Latch + Sync,
70     F: FnOnce(bool) -> R + Send,
71     R: Send,
72 {
73     pub(super) latch: L,
74     func: UnsafeCell<Option<F>>,
75     result: UnsafeCell<JobResult<R>>,
76 }
77 
78 impl<L, F, R> StackJob<L, F, R>
79 where
80     L: Latch + Sync,
81     F: FnOnce(bool) -> R + Send,
82     R: Send,
83 {
new(func: F, latch: L) -> StackJob<L, F, R>84     pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> {
85         StackJob {
86             latch,
87             func: UnsafeCell::new(Some(func)),
88             result: UnsafeCell::new(JobResult::None),
89         }
90     }
91 
as_job_ref(&self) -> JobRef92     pub(super) unsafe fn as_job_ref(&self) -> JobRef {
93         JobRef::new(self)
94     }
95 
run_inline(self, stolen: bool) -> R96     pub(super) unsafe fn run_inline(self, stolen: bool) -> R {
97         self.func.into_inner().unwrap()(stolen)
98     }
99 
into_result(self) -> R100     pub(super) unsafe fn into_result(self) -> R {
101         self.result.into_inner().into_return_value()
102     }
103 }
104 
105 impl<L, F, R> Job for StackJob<L, F, R>
106 where
107     L: Latch + Sync,
108     F: FnOnce(bool) -> R + Send,
109     R: Send,
110 {
execute(this: *const Self)111     unsafe fn execute(this: *const Self) {
112         fn call<R>(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R {
113             move || func(true)
114         }
115 
116         let this = &*this;
117         let abort = unwind::AbortIfPanic;
118         let func = (*this.func.get()).take().unwrap();
119         (*this.result.get()) = match unwind::halt_unwinding(call(func)) {
120             Ok(x) => JobResult::Ok(x),
121             Err(x) => JobResult::Panic(x),
122         };
123         this.latch.set();
124         mem::forget(abort);
125     }
126 }
127 
128 /// Represents a job stored in the heap. Used to implement
129 /// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply
130 /// invokes a closure, which then triggers the appropriate logic to
131 /// signal that the job executed.
132 ///
133 /// (Probably `StackJob` should be refactored in a similar fashion.)
134 pub(super) struct HeapJob<BODY>
135 where
136     BODY: FnOnce() + Send,
137 {
138     job: UnsafeCell<Option<BODY>>,
139 }
140 
141 impl<BODY> HeapJob<BODY>
142 where
143     BODY: FnOnce() + Send,
144 {
new(func: BODY) -> Self145     pub(super) fn new(func: BODY) -> Self {
146         HeapJob {
147             job: UnsafeCell::new(Some(func)),
148         }
149     }
150 
151     /// Creates a `JobRef` from this job -- note that this hides all
152     /// lifetimes, so it is up to you to ensure that this JobRef
153     /// doesn't outlive any data that it closes over.
as_job_ref(self: Box<Self>) -> JobRef154     pub(super) unsafe fn as_job_ref(self: Box<Self>) -> JobRef {
155         let this: *const Self = mem::transmute(self);
156         JobRef::new(this)
157     }
158 }
159 
160 impl<BODY> Job for HeapJob<BODY>
161 where
162     BODY: FnOnce() + Send,
163 {
execute(this: *const Self)164     unsafe fn execute(this: *const Self) {
165         let this: Box<Self> = mem::transmute(this);
166         let job = (*this.job.get()).take().unwrap();
167         job();
168     }
169 }
170 
171 impl<T> JobResult<T> {
172     /// Convert the `JobResult` for a job that has finished (and hence
173     /// its JobResult is populated) into its return value.
174     ///
175     /// NB. This will panic if the job panicked.
into_return_value(self) -> T176     pub(super) fn into_return_value(self) -> T {
177         match self {
178             JobResult::None => unreachable!(),
179             JobResult::Ok(x) => x,
180             JobResult::Panic(x) => unwind::resume_unwinding(x),
181         }
182     }
183 }
184 
185 /// Indirect queue to provide FIFO job priority.
186 pub(super) struct JobFifo {
187     inner: Injector<JobRef>,
188 }
189 
190 impl JobFifo {
new() -> Self191     pub(super) fn new() -> Self {
192         JobFifo {
193             inner: Injector::new(),
194         }
195     }
196 
push(&self, job_ref: JobRef) -> JobRef197     pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef {
198         // A little indirection ensures that spawns are always prioritized in FIFO order.  The
199         // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
200         // (FIFO), but either way they will end up popping from the front of this queue.
201         self.inner.push(job_ref);
202         JobRef::new(self)
203     }
204 }
205 
206 impl Job for JobFifo {
execute(this: *const Self)207     unsafe fn execute(this: *const Self) {
208         // We "execute" a queue by executing its first job, FIFO.
209         loop {
210             match (*this).inner.steal() {
211                 Steal::Success(job_ref) => break job_ref.execute(),
212                 Steal::Empty => panic!("FIFO is empty"),
213                 Steal::Retry => {}
214             }
215         }
216     }
217 }
218