1 //! Coordinates idling workers
2
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Mutex;
5
6 use std::fmt;
7 use std::sync::atomic::Ordering::{self, SeqCst};
8
9 pub(super) struct Idle {
10 /// Tracks both the number of searching workers and the number of unparked
11 /// workers.
12 ///
13 /// Used as a fast-path to avoid acquiring the lock when needed.
14 state: AtomicUsize,
15
16 /// Sleeping workers
17 sleepers: Mutex<Vec<usize>>,
18
19 /// Total number of workers.
20 num_workers: usize,
21 }
22
/// Bit offset at which the unparked-worker count starts inside the packed
/// state word.
const UNPARK_SHIFT: usize = 16;
/// Selects the low `UNPARK_SHIFT` bits, which hold the searching-worker count.
const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
/// Selects every remaining (high) bit, which holds the unparked-worker count.
const UNPARK_MASK: usize = !SEARCH_MASK;
26
/// Plain-value view of the packed worker counters stored in a single `usize`.
#[derive(Clone, Copy)]
struct State(usize);
29
30 impl Idle {
new(num_workers: usize) -> Idle31 pub(super) fn new(num_workers: usize) -> Idle {
32 let init = State::new(num_workers);
33
34 Idle {
35 state: AtomicUsize::new(init.into()),
36 sleepers: Mutex::new(Vec::with_capacity(num_workers)),
37 num_workers,
38 }
39 }
40
41 /// If there are no workers actively searching, returns the index of a
42 /// worker currently sleeping.
worker_to_notify(&self) -> Option<usize>43 pub(super) fn worker_to_notify(&self) -> Option<usize> {
44 // If at least one worker is spinning, work being notified will
45 // eventully be found. A searching thread will find **some** work and
46 // notify another worker, eventually leading to our work being found.
47 //
48 // For this to happen, this load must happen before the thread
49 // transitioning `num_searching` to zero. Acquire / Relese does not
50 // provide sufficient guarantees, so this load is done with `SeqCst` and
51 // will pair with the `fetch_sub(1)` when transitioning out of
52 // searching.
53 if !self.notify_should_wakeup() {
54 return None;
55 }
56
57 // Acquire the lock
58 let mut sleepers = self.sleepers.lock();
59
60 // Check again, now that the lock is acquired
61 if !self.notify_should_wakeup() {
62 return None;
63 }
64
65 // A worker should be woken up, atomically increment the number of
66 // searching workers as well as the number of unparked workers.
67 State::unpark_one(&self.state);
68
69 // Get the worker to unpark
70 let ret = sleepers.pop();
71 debug_assert!(ret.is_some());
72
73 ret
74 }
75
76 /// Returns `true` if the worker needs to do a final check for submitted
77 /// work.
transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool78 pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
79 // Acquire the lock
80 let mut sleepers = self.sleepers.lock();
81
82 // Decrement the number of unparked threads
83 let ret = State::dec_num_unparked(&self.state, is_searching);
84
85 // Track the sleeping worker
86 sleepers.push(worker);
87
88 ret
89 }
90
transition_worker_to_searching(&self) -> bool91 pub(super) fn transition_worker_to_searching(&self) -> bool {
92 let state = State::load(&self.state, SeqCst);
93 if 2 * state.num_searching() >= self.num_workers {
94 return false;
95 }
96
97 // It is possible for this routine to allow more than 50% of the workers
98 // to search. That is OK. Limiting searchers is only an optimization to
99 // prevent too much contention.
100 State::inc_num_searching(&self.state, SeqCst);
101 true
102 }
103
104 /// A lightweight transition from searching -> running.
105 ///
106 /// Returns `true` if this is the final searching worker. The caller
107 /// **must** notify a new worker.
transition_worker_from_searching(&self) -> bool108 pub(super) fn transition_worker_from_searching(&self) -> bool {
109 State::dec_num_searching(&self.state)
110 }
111
112 /// Unpark a specific worker. This happens if tasks are submitted from
113 /// within the worker's park routine.
unpark_worker_by_id(&self, worker_id: usize)114 pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
115 let mut sleepers = self.sleepers.lock();
116
117 for index in 0..sleepers.len() {
118 if sleepers[index] == worker_id {
119 sleepers.swap_remove(index);
120
121 // Update the state accordingly while the lock is held.
122 State::unpark_one(&self.state);
123
124 return;
125 }
126 }
127 }
128
129 /// Returns `true` if `worker_id` is contained in the sleep set
is_parked(&self, worker_id: usize) -> bool130 pub(super) fn is_parked(&self, worker_id: usize) -> bool {
131 let sleepers = self.sleepers.lock();
132 sleepers.contains(&worker_id)
133 }
134
notify_should_wakeup(&self) -> bool135 fn notify_should_wakeup(&self) -> bool {
136 let state = State(self.state.fetch_add(0, SeqCst));
137 state.num_searching() == 0 && state.num_unparked() < self.num_workers
138 }
139 }
140
141 impl State {
new(num_workers: usize) -> State142 fn new(num_workers: usize) -> State {
143 // All workers start in the unparked state
144 let ret = State(num_workers << UNPARK_SHIFT);
145 debug_assert_eq!(num_workers, ret.num_unparked());
146 debug_assert_eq!(0, ret.num_searching());
147 ret
148 }
149
load(cell: &AtomicUsize, ordering: Ordering) -> State150 fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
151 State(cell.load(ordering))
152 }
153
unpark_one(cell: &AtomicUsize)154 fn unpark_one(cell: &AtomicUsize) {
155 cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst);
156 }
157
inc_num_searching(cell: &AtomicUsize, ordering: Ordering)158 fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
159 cell.fetch_add(1, ordering);
160 }
161
162 /// Returns `true` if this is the final searching worker
dec_num_searching(cell: &AtomicUsize) -> bool163 fn dec_num_searching(cell: &AtomicUsize) -> bool {
164 let state = State(cell.fetch_sub(1, SeqCst));
165 state.num_searching() == 1
166 }
167
168 /// Track a sleeping worker
169 ///
170 /// Returns `true` if this is the final searching worker.
dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool171 fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
172 let mut dec = 1 << UNPARK_SHIFT;
173
174 if is_searching {
175 dec += 1;
176 }
177
178 let prev = State(cell.fetch_sub(dec, SeqCst));
179 is_searching && prev.num_searching() == 1
180 }
181
182 /// Number of workers currently searching
num_searching(self) -> usize183 fn num_searching(self) -> usize {
184 self.0 & SEARCH_MASK
185 }
186
187 /// Number of workers currently unparked
num_unparked(self) -> usize188 fn num_unparked(self) -> usize {
189 (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
190 }
191 }
192
193 impl From<usize> for State {
from(src: usize) -> State194 fn from(src: usize) -> State {
195 State(src)
196 }
197 }
198
199 impl From<State> for usize {
from(src: State) -> usize200 fn from(src: State) -> usize {
201 src.0
202 }
203 }
204
205 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result206 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
207 fmt.debug_struct("worker::State")
208 .field("num_unparked", &self.num_unparked())
209 .field("num_searching", &self.num_searching())
210 .finish()
211 }
212 }
213
#[test]
fn test_state() {
    // The two masks must not share any bit...
    assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
    // ...and together they must cover every bit of the word.
    assert_eq!(!0usize, UNPARK_MASK | SEARCH_MASK);

    // A fresh state has every worker unparked and none searching.
    let fresh = State::new(10);
    assert_eq!(10, fresh.num_unparked());
    assert_eq!(0, fresh.num_searching());
}
223