1 use std::sync::atomic::{AtomicUsize, Ordering};
2 use std::sync::{Arc, Condvar, Mutex};
3 use std::usize;
4 
5 use crate::registry::{Registry, WorkerThread};
6 
7 /// We define various kinds of latches, which are all a primitive signaling
8 /// mechanism. A latch starts as false. Eventually someone calls `set()` and
9 /// it becomes true. You can test if it has been set by calling `probe()`.
10 ///
11 /// Some kinds of latches, but not all, support a `wait()` operation
12 /// that will wait until the latch is set, blocking efficiently. That
13 /// is not part of the trait since it is not possibly to do with all
14 /// latches.
15 ///
16 /// The intention is that `set()` is called once, but `probe()` may be
17 /// called any number of times. Once `probe()` returns true, the memory
18 /// effects that occurred before `set()` become visible.
19 ///
20 /// It'd probably be better to refactor the API into two paired types,
21 /// but that's a bit of work, and this is not a public API.
22 ///
23 /// ## Memory ordering
24 ///
25 /// Latches need to guarantee two things:
26 ///
27 /// - Once `probe()` returns true, all memory effects from the `set()`
28 ///   are visible (in other words, the set should synchronize-with
29 ///   the probe).
30 /// - Once `set()` occurs, the next `probe()` *will* observe it.  This
31 ///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
32 ///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
33 pub(super) trait Latch {
34     /// Set the latch, signalling others.
35     ///
36     /// # WARNING
37     ///
38     /// Setting a latch triggers other threads to wake up and (in some
39     /// cases) complete. This may, in turn, cause memory to be
40     /// allocated and so forth.  One must be very careful about this,
41     /// and it's typically better to read all the fields you will need
42     /// to access *before* a latch is set!
set(&self)43     fn set(&self);
44 }
45 
46 pub(super) trait AsCoreLatch {
as_core_latch(&self) -> &CoreLatch47     fn as_core_latch(&self) -> &CoreLatch;
48 }
49 
50 /// Latch is not set, owning thread is awake
51 const UNSET: usize = 0;
52 
53 /// Latch is not set, owning thread is going to sleep on this latch
54 /// (but has not yet fallen asleep).
55 const SLEEPY: usize = 1;
56 
57 /// Latch is not set, owning thread is asleep on this latch and
58 /// must be awoken.
59 const SLEEPING: usize = 2;
60 
61 /// Latch is set.
62 const SET: usize = 3;
63 
64 /// Spin latches are the simplest, most efficient kind, but they do
65 /// not support a `wait()` operation. They just have a boolean flag
66 /// that becomes true when `set()` is called.
67 #[derive(Debug)]
68 pub(super) struct CoreLatch {
69     state: AtomicUsize,
70 }
71 
72 impl CoreLatch {
73     #[inline]
new() -> Self74     fn new() -> Self {
75         Self {
76             state: AtomicUsize::new(0),
77         }
78     }
79 
80     /// Returns the address of this core latch as an integer. Used
81     /// for logging.
82     #[inline]
addr(&self) -> usize83     pub(super) fn addr(&self) -> usize {
84         self as *const CoreLatch as usize
85     }
86 
87     /// Invoked by owning thread as it prepares to sleep. Returns true
88     /// if the owning thread may proceed to fall asleep, false if the
89     /// latch was set in the meantime.
90     #[inline]
get_sleepy(&self) -> bool91     pub(super) fn get_sleepy(&self) -> bool {
92         self.state
93             .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
94             .is_ok()
95     }
96 
97     /// Invoked by owning thread as it falls asleep sleep. Returns
98     /// true if the owning thread should block, or false if the latch
99     /// was set in the meantime.
100     #[inline]
fall_asleep(&self) -> bool101     pub(super) fn fall_asleep(&self) -> bool {
102         self.state
103             .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
104             .is_ok()
105     }
106 
107     /// Invoked by owning thread as it falls asleep sleep. Returns
108     /// true if the owning thread should block, or false if the latch
109     /// was set in the meantime.
110     #[inline]
wake_up(&self)111     pub(super) fn wake_up(&self) {
112         if !self.probe() {
113             let _ =
114                 self.state
115                     .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
116         }
117     }
118 
119     /// Set the latch. If this returns true, the owning thread was sleeping
120     /// and must be awoken.
121     ///
122     /// This is private because, typically, setting a latch involves
123     /// doing some wakeups; those are encapsulated in the surrounding
124     /// latch code.
125     #[inline]
set(&self) -> bool126     fn set(&self) -> bool {
127         let old_state = self.state.swap(SET, Ordering::AcqRel);
128         old_state == SLEEPING
129     }
130 
131     /// Test if this latch has been set.
132     #[inline]
probe(&self) -> bool133     pub(super) fn probe(&self) -> bool {
134         self.state.load(Ordering::Acquire) == SET
135     }
136 }
137 
138 /// Spin latches are the simplest, most efficient kind, but they do
139 /// not support a `wait()` operation. They just have a boolean flag
140 /// that becomes true when `set()` is called.
141 pub(super) struct SpinLatch<'r> {
142     core_latch: CoreLatch,
143     registry: &'r Arc<Registry>,
144     target_worker_index: usize,
145     cross: bool,
146 }
147 
148 impl<'r> SpinLatch<'r> {
149     /// Creates a new spin latch that is owned by `thread`. This means
150     /// that `thread` is the only thread that should be blocking on
151     /// this latch -- it also means that when the latch is set, we
152     /// will wake `thread` if it is sleeping.
153     #[inline]
new(thread: &'r WorkerThread) -> SpinLatch<'r>154     pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
155         SpinLatch {
156             core_latch: CoreLatch::new(),
157             registry: thread.registry(),
158             target_worker_index: thread.index(),
159             cross: false,
160         }
161     }
162 
163     /// Creates a new spin latch for cross-threadpool blocking.  Notably, we
164     /// need to make sure the registry is kept alive after setting, so we can
165     /// safely call the notification.
166     #[inline]
cross(thread: &'r WorkerThread) -> SpinLatch<'r>167     pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
168         SpinLatch {
169             cross: true,
170             ..SpinLatch::new(thread)
171         }
172     }
173 
174     #[inline]
probe(&self) -> bool175     pub(super) fn probe(&self) -> bool {
176         self.core_latch.probe()
177     }
178 }
179 
180 impl<'r> AsCoreLatch for SpinLatch<'r> {
181     #[inline]
as_core_latch(&self) -> &CoreLatch182     fn as_core_latch(&self) -> &CoreLatch {
183         &self.core_latch
184     }
185 }
186 
187 impl<'r> Latch for SpinLatch<'r> {
188     #[inline]
set(&self)189     fn set(&self) {
190         let cross_registry;
191 
192         let registry = if self.cross {
193             // Ensure the registry stays alive while we notify it.
194             // Otherwise, it would be possible that we set the spin
195             // latch and the other thread sees it and exits, causing
196             // the registry to be deallocated, all before we get a
197             // chance to invoke `registry.notify_worker_latch_is_set`.
198             cross_registry = Arc::clone(self.registry);
199             &cross_registry
200         } else {
201             // If this is not a "cross-registry" spin-latch, then the
202             // thread which is performing `set` is itself ensuring
203             // that the registry stays alive.
204             self.registry
205         };
206         let target_worker_index = self.target_worker_index;
207 
208         // NOTE: Once we `set`, the target may proceed and invalidate `&self`!
209         if self.core_latch.set() {
210             // Subtle: at this point, we can no longer read from
211             // `self`, because the thread owning this spin latch may
212             // have awoken and deallocated the latch. Therefore, we
213             // only use fields whose values we already read.
214             registry.notify_worker_latch_is_set(target_worker_index);
215         }
216     }
217 }
218 
219 /// A Latch starts as false and eventually becomes true. You can block
220 /// until it becomes true.
221 pub(super) struct LockLatch {
222     m: Mutex<bool>,
223     v: Condvar,
224 }
225 
226 impl LockLatch {
227     #[inline]
new() -> LockLatch228     pub(super) fn new() -> LockLatch {
229         LockLatch {
230             m: Mutex::new(false),
231             v: Condvar::new(),
232         }
233     }
234 
235     /// Block until latch is set, then resets this lock latch so it can be reused again.
wait_and_reset(&self)236     pub(super) fn wait_and_reset(&self) {
237         let mut guard = self.m.lock().unwrap();
238         while !*guard {
239             guard = self.v.wait(guard).unwrap();
240         }
241         *guard = false;
242     }
243 
244     /// Block until latch is set.
wait(&self)245     pub(super) fn wait(&self) {
246         let mut guard = self.m.lock().unwrap();
247         while !*guard {
248             guard = self.v.wait(guard).unwrap();
249         }
250     }
251 }
252 
253 impl Latch for LockLatch {
254     #[inline]
set(&self)255     fn set(&self) {
256         let mut guard = self.m.lock().unwrap();
257         *guard = true;
258         self.v.notify_all();
259     }
260 }
261 
262 /// Counting latches are used to implement scopes. They track a
263 /// counter. Unlike other latches, calling `set()` does not
264 /// necessarily make the latch be considered `set()`; instead, it just
265 /// decrements the counter. The latch is only "set" (in the sense that
266 /// `probe()` returns true) once the counter reaches zero.
267 ///
268 /// Note: like a `SpinLatch`, count laches are always associated with
269 /// some registry that is probing them, which must be tickled when
270 /// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
271 /// reference to that registry. This is because in some cases the
272 /// registry owns the count-latch, and that would create a cycle. So a
273 /// `CountLatch` must be given a reference to its owning registry when
274 /// it is set. For this reason, it does not implement the `Latch`
275 /// trait (but it doesn't have to, as it is not used in those generic
276 /// contexts).
277 #[derive(Debug)]
278 pub(super) struct CountLatch {
279     core_latch: CoreLatch,
280     counter: AtomicUsize,
281 }
282 
283 impl CountLatch {
284     #[inline]
new() -> CountLatch285     pub(super) fn new() -> CountLatch {
286         CountLatch {
287             core_latch: CoreLatch::new(),
288             counter: AtomicUsize::new(1),
289         }
290     }
291 
292     #[inline]
increment(&self)293     pub(super) fn increment(&self) {
294         debug_assert!(!self.core_latch.probe());
295         self.counter.fetch_add(1, Ordering::Relaxed);
296     }
297 
298     /// Decrements the latch counter by one. If this is the final
299     /// count, then the latch is **set**, and calls to `probe()` will
300     /// return true. Returns whether the latch was set. This is an
301     /// internal operation, as it does not tickle, and to fail to
302     /// tickle would lead to deadlock.
303     #[inline]
set(&self) -> bool304     fn set(&self) -> bool {
305         if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
306             self.core_latch.set();
307             true
308         } else {
309             false
310         }
311     }
312 
313     /// Decrements the latch counter by one and possibly set it.  If
314     /// the latch is set, then the specific worker thread is tickled,
315     /// which should be the one that owns this latch.
316     #[inline]
set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize)317     pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) {
318         if self.set() {
319             registry.notify_worker_latch_is_set(target_worker_index);
320         }
321     }
322 }
323 
324 impl AsCoreLatch for CountLatch {
325     #[inline]
as_core_latch(&self) -> &CoreLatch326     fn as_core_latch(&self) -> &CoreLatch {
327         &self.core_latch
328     }
329 }
330 
331 impl<'a, L> Latch for &'a L
332 where
333     L: Latch,
334 {
335     #[inline]
set(&self)336     fn set(&self) {
337         L::set(self);
338     }
339 }
340