// TODO(@jeehoonkang): we mutate `batch_size` inside `for i in 0..batch_size {}`. It is difficult
// to read because we're mutating the range bound.
3 #![allow(clippy::mut_range_bound)]
4 
5 use std::cell::{Cell, UnsafeCell};
6 use std::cmp;
7 use std::fmt;
8 use std::iter::FromIterator;
9 use std::marker::PhantomData;
10 use std::mem::{self, MaybeUninit};
11 use std::ptr;
12 use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
13 use std::sync::Arc;
14 
15 use crate::epoch::{self, Atomic, Owned};
16 use crate::utils::{Backoff, CachePadded};
17 
18 // Minimum buffer capacity.
19 const MIN_CAP: usize = 64;
20 // Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
21 const MAX_BATCH: usize = 32;
22 // If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
23 // deallocated as soon as possible.
24 const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
25 
/// A buffer that holds tasks in a worker queue.
///
/// This is just a pointer to the buffer and its capacity - dropping an instance of this struct
/// will *not* deallocate the buffer. The allocation has to be released explicitly with `dealloc`.
struct Buffer<T> {
    /// Pointer to the allocated memory.
    ptr: *mut T,

    /// Capacity of the buffer. Always a power of two, so that `cap - 1` can be used as an index
    /// mask.
    cap: usize,
}
37 
// `Buffer` contains a raw pointer and is therefore not `Send` automatically; this impl opts in
// manually.
// NOTE(review): there is no `T: Send` bound here - presumably the handle types that own a
// `Buffer` are expected to enforce it; confirm.
unsafe impl<T> Send for Buffer<T> {}
39 
40 impl<T> Buffer<T> {
41     /// Allocates a new buffer with the specified capacity.
alloc(cap: usize) -> Buffer<T>42     fn alloc(cap: usize) -> Buffer<T> {
43         debug_assert_eq!(cap, cap.next_power_of_two());
44 
45         let mut v = Vec::with_capacity(cap);
46         let ptr = v.as_mut_ptr();
47         mem::forget(v);
48 
49         Buffer { ptr, cap }
50     }
51 
52     /// Deallocates the buffer.
dealloc(self)53     unsafe fn dealloc(self) {
54         drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
55     }
56 
57     /// Returns a pointer to the task at the specified `index`.
at(&self, index: isize) -> *mut T58     unsafe fn at(&self, index: isize) -> *mut T {
59         // `self.cap` is always a power of two.
60         self.ptr.offset(index & (self.cap - 1) as isize)
61     }
62 
63     /// Writes `task` into the specified `index`.
64     ///
65     /// This method might be concurrently called with another `read` at the same index, which is
66     /// technically speaking a data race and therefore UB. We should use an atomic store here, but
67     /// that would be more expensive and difficult to implement generically for all types `T`.
68     /// Hence, as a hack, we use a volatile write instead.
write(&self, index: isize, task: T)69     unsafe fn write(&self, index: isize, task: T) {
70         ptr::write_volatile(self.at(index), task)
71     }
72 
73     /// Reads a task from the specified `index`.
74     ///
75     /// This method might be concurrently called with another `write` at the same index, which is
76     /// technically speaking a data race and therefore UB. We should use an atomic load here, but
77     /// that would be more expensive and difficult to implement generically for all types `T`.
78     /// Hence, as a hack, we use a volatile write instead.
read(&self, index: isize) -> T79     unsafe fn read(&self, index: isize) -> T {
80         ptr::read_volatile(self.at(index))
81     }
82 }
83 
84 impl<T> Clone for Buffer<T> {
clone(&self) -> Buffer<T>85     fn clone(&self) -> Buffer<T> {
86         Buffer {
87             ptr: self.ptr,
88             cap: self.cap,
89         }
90     }
91 }
92 
// A `Buffer` is only a pointer and a capacity, so it can be copied freely; every copy aliases the
// same allocation.
impl<T> Copy for Buffer<T> {}
94 
95 /// Internal queue data shared between the worker and stealers.
96 ///
97 /// The implementation is based on the following work:
98 ///
99 /// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
100 /// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
101 ///    PPoPP 2013.][weak-mem]
102 /// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
103 ///    atomics. OOPSLA 2013.][checker]
104 ///
105 /// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
106 /// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
107 /// [checker]: https://dl.acm.org/citation.cfm?id=2509514
struct Inner<T> {
    /// The front index. Stealers (and FIFO workers) take tasks from this end.
    front: AtomicIsize,

    /// The back index. The worker pushes tasks at this end (and LIFO workers pop from it).
    back: AtomicIsize,

    /// The underlying buffer. The worker swaps in a new one whenever the queue is resized.
    buffer: CachePadded<Atomic<Buffer<T>>>,
}
118 
119 impl<T> Drop for Inner<T> {
drop(&mut self)120     fn drop(&mut self) {
121         // Load the back index, front index, and buffer.
122         let b = self.back.load(Ordering::Relaxed);
123         let f = self.front.load(Ordering::Relaxed);
124 
125         unsafe {
126             let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
127 
128             // Go through the buffer from front to back and drop all tasks in the queue.
129             let mut i = f;
130             while i != b {
131                 buffer.deref().at(i).drop_in_place();
132                 i = i.wrapping_add(1);
133             }
134 
135             // Free the memory allocated by the buffer.
136             buffer.into_owned().into_box().dealloc();
137         }
138     }
139 }
140 
/// Worker queue flavor: FIFO or LIFO.
///
/// The flavor decides which end of the queue the owning worker pops from; stealers always take
/// tasks from the front.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Flavor {
    /// The first-in first-out flavor: the worker pops from the front of the queue.
    Fifo,

    /// The last-in first-out flavor: the worker pops from the back of the queue.
    Lifo,
}
150 
151 /// A worker queue.
152 ///
153 /// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
154 /// tasks from it. Task schedulers typically create a single worker queue per thread.
155 ///
156 /// # Examples
157 ///
158 /// A FIFO worker:
159 ///
160 /// ```
161 /// use crossbeam_deque::{Steal, Worker};
162 ///
163 /// let w = Worker::new_fifo();
164 /// let s = w.stealer();
165 ///
166 /// w.push(1);
167 /// w.push(2);
168 /// w.push(3);
169 ///
170 /// assert_eq!(s.steal(), Steal::Success(1));
171 /// assert_eq!(w.pop(), Some(2));
172 /// assert_eq!(w.pop(), Some(3));
173 /// ```
174 ///
175 /// A LIFO worker:
176 ///
177 /// ```
178 /// use crossbeam_deque::{Steal, Worker};
179 ///
180 /// let w = Worker::new_lifo();
181 /// let s = w.stealer();
182 ///
183 /// w.push(1);
184 /// w.push(2);
185 /// w.push(3);
186 ///
187 /// assert_eq!(s.steal(), Steal::Success(1));
188 /// assert_eq!(w.pop(), Some(3));
189 /// assert_eq!(w.pop(), Some(2));
190 /// ```
pub struct Worker<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,

    /// A copy of `inner.buffer` for quick access.
    ///
    /// Stored in a `Cell` because the worker replaces it through `&self` when resizing.
    buffer: Cell<Buffer<T>>,

    /// The flavor of the queue.
    flavor: Flavor,

    /// Indicates that the worker cannot be shared among threads.
    _marker: PhantomData<*mut ()>, // !Send + !Sync
}
204 
// The `PhantomData<*mut ()>` marker removes both auto traits; `Send` is restored here for
// `T: Send`, while `Sync` stays unimplemented.
unsafe impl<T: Send> Send for Worker<T> {}
206 
207 impl<T> Worker<T> {
208     /// Creates a FIFO worker queue.
209     ///
210     /// Tasks are pushed and popped from opposite ends.
211     ///
212     /// # Examples
213     ///
214     /// ```
215     /// use crossbeam_deque::Worker;
216     ///
217     /// let w = Worker::<i32>::new_fifo();
218     /// ```
new_fifo() -> Worker<T>219     pub fn new_fifo() -> Worker<T> {
220         let buffer = Buffer::alloc(MIN_CAP);
221 
222         let inner = Arc::new(CachePadded::new(Inner {
223             front: AtomicIsize::new(0),
224             back: AtomicIsize::new(0),
225             buffer: CachePadded::new(Atomic::new(buffer)),
226         }));
227 
228         Worker {
229             inner,
230             buffer: Cell::new(buffer),
231             flavor: Flavor::Fifo,
232             _marker: PhantomData,
233         }
234     }
235 
236     /// Creates a LIFO worker queue.
237     ///
238     /// Tasks are pushed and popped from the same end.
239     ///
240     /// # Examples
241     ///
242     /// ```
243     /// use crossbeam_deque::Worker;
244     ///
245     /// let w = Worker::<i32>::new_lifo();
246     /// ```
new_lifo() -> Worker<T>247     pub fn new_lifo() -> Worker<T> {
248         let buffer = Buffer::alloc(MIN_CAP);
249 
250         let inner = Arc::new(CachePadded::new(Inner {
251             front: AtomicIsize::new(0),
252             back: AtomicIsize::new(0),
253             buffer: CachePadded::new(Atomic::new(buffer)),
254         }));
255 
256         Worker {
257             inner,
258             buffer: Cell::new(buffer),
259             flavor: Flavor::Lifo,
260             _marker: PhantomData,
261         }
262     }
263 
264     /// Creates a stealer for this queue.
265     ///
266     /// The returned stealer can be shared among threads and cloned.
267     ///
268     /// # Examples
269     ///
270     /// ```
271     /// use crossbeam_deque::Worker;
272     ///
273     /// let w = Worker::<i32>::new_lifo();
274     /// let s = w.stealer();
275     /// ```
stealer(&self) -> Stealer<T>276     pub fn stealer(&self) -> Stealer<T> {
277         Stealer {
278             inner: self.inner.clone(),
279             flavor: self.flavor,
280         }
281     }
282 
283     /// Resizes the internal buffer to the new capacity of `new_cap`.
284     #[cold]
resize(&self, new_cap: usize)285     unsafe fn resize(&self, new_cap: usize) {
286         // Load the back index, front index, and buffer.
287         let b = self.inner.back.load(Ordering::Relaxed);
288         let f = self.inner.front.load(Ordering::Relaxed);
289         let buffer = self.buffer.get();
290 
291         // Allocate a new buffer and copy data from the old buffer to the new one.
292         let new = Buffer::alloc(new_cap);
293         let mut i = f;
294         while i != b {
295             ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
296             i = i.wrapping_add(1);
297         }
298 
299         let guard = &epoch::pin();
300 
301         // Replace the old buffer with the new one.
302         self.buffer.replace(new);
303         let old =
304             self.inner
305                 .buffer
306                 .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
307 
308         // Destroy the old buffer later.
309         guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
310 
311         // If the buffer is very large, then flush the thread-local garbage in order to deallocate
312         // it as soon as possible.
313         if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
314             guard.flush();
315         }
316     }
317 
318     /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
319     /// buffer.
reserve(&self, reserve_cap: usize)320     fn reserve(&self, reserve_cap: usize) {
321         if reserve_cap > 0 {
322             // Compute the current length.
323             let b = self.inner.back.load(Ordering::Relaxed);
324             let f = self.inner.front.load(Ordering::SeqCst);
325             let len = b.wrapping_sub(f) as usize;
326 
327             // The current capacity.
328             let cap = self.buffer.get().cap;
329 
330             // Is there enough capacity to push `reserve_cap` tasks?
331             if cap - len < reserve_cap {
332                 // Keep doubling the capacity as much as is needed.
333                 let mut new_cap = cap * 2;
334                 while new_cap - len < reserve_cap {
335                     new_cap *= 2;
336                 }
337 
338                 // Resize the buffer.
339                 unsafe {
340                     self.resize(new_cap);
341                 }
342             }
343         }
344     }
345 
346     /// Returns `true` if the queue is empty.
347     ///
348     /// ```
349     /// use crossbeam_deque::Worker;
350     ///
351     /// let w = Worker::new_lifo();
352     ///
353     /// assert!(w.is_empty());
354     /// w.push(1);
355     /// assert!(!w.is_empty());
356     /// ```
is_empty(&self) -> bool357     pub fn is_empty(&self) -> bool {
358         let b = self.inner.back.load(Ordering::Relaxed);
359         let f = self.inner.front.load(Ordering::SeqCst);
360         b.wrapping_sub(f) <= 0
361     }
362 
363     /// Returns the number of tasks in the deque.
364     ///
365     /// ```
366     /// use crossbeam_deque::Worker;
367     ///
368     /// let w = Worker::new_lifo();
369     ///
370     /// assert_eq!(w.len(), 0);
371     /// w.push(1);
372     /// assert_eq!(w.len(), 1);
373     /// w.push(1);
374     /// assert_eq!(w.len(), 2);
375     /// ```
len(&self) -> usize376     pub fn len(&self) -> usize {
377         let b = self.inner.back.load(Ordering::Relaxed);
378         let f = self.inner.front.load(Ordering::SeqCst);
379         b.wrapping_sub(f).max(0) as usize
380     }
381 
382     /// Pushes a task into the queue.
383     ///
384     /// # Examples
385     ///
386     /// ```
387     /// use crossbeam_deque::Worker;
388     ///
389     /// let w = Worker::new_lifo();
390     /// w.push(1);
391     /// w.push(2);
392     /// ```
push(&self, task: T)393     pub fn push(&self, task: T) {
394         // Load the back index, front index, and buffer.
395         let b = self.inner.back.load(Ordering::Relaxed);
396         let f = self.inner.front.load(Ordering::Acquire);
397         let mut buffer = self.buffer.get();
398 
399         // Calculate the length of the queue.
400         let len = b.wrapping_sub(f);
401 
402         // Is the queue full?
403         if len >= buffer.cap as isize {
404             // Yes. Grow the underlying buffer.
405             unsafe {
406                 self.resize(2 * buffer.cap);
407             }
408             buffer = self.buffer.get();
409         }
410 
411         // Write `task` into the slot.
412         unsafe {
413             buffer.write(b, task);
414         }
415 
416         atomic::fence(Ordering::Release);
417 
418         // Increment the back index.
419         //
420         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
421         // races because it doesn't understand fences.
422         self.inner.back.store(b.wrapping_add(1), Ordering::Release);
423     }
424 
425     /// Pops a task from the queue.
426     ///
427     /// # Examples
428     ///
429     /// ```
430     /// use crossbeam_deque::Worker;
431     ///
432     /// let w = Worker::new_fifo();
433     /// w.push(1);
434     /// w.push(2);
435     ///
436     /// assert_eq!(w.pop(), Some(1));
437     /// assert_eq!(w.pop(), Some(2));
438     /// assert_eq!(w.pop(), None);
439     /// ```
    pub fn pop(&self) -> Option<T> {
        // Load the back and front index.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);

        // Calculate the length of the queue.
        let len = b.wrapping_sub(f);

        // Is the queue empty?
        if len <= 0 {
            return None;
        }

        match self.flavor {
            // Pop from the front of the queue.
            Flavor::Fifo => {
                // Try incrementing the front index to pop the task. The increment is
                // unconditional; `f` is the index that was claimed by it.
                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
                let new_f = f.wrapping_add(1);

                // If the new front index is past the back index, the front must have moved since
                // the emptiness check above and there was nothing left to claim: undo the
                // increment and report an empty queue.
                if b.wrapping_sub(new_f) < 0 {
                    self.inner.front.store(f, Ordering::Relaxed);
                    return None;
                }

                unsafe {
                    // Read the popped task.
                    let buffer = self.buffer.get();
                    let task = buffer.read(f);

                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
                    // The buffer never shrinks below `MIN_CAP`.
                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
                        self.resize(buffer.cap / 2);
                    }

                    Some(task)
                }
            }

            // Pop from the back of the queue.
            Flavor::Lifo => {
                // Decrement the back index. This tentatively claims the last task.
                let b = b.wrapping_sub(1);
                self.inner.back.store(b, Ordering::Relaxed);

                // The fence orders the store to `back` above before the load of `front` below.
                atomic::fence(Ordering::SeqCst);

                // Load the front index.
                let f = self.inner.front.load(Ordering::Relaxed);

                // Compute the length after the back index was decremented.
                let len = b.wrapping_sub(f);

                if len < 0 {
                    // The queue is empty. Restore the back index to the original task.
                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    None
                } else {
                    // Read the task to be popped. It is kept in an `Option` so that it can be
                    // discarded below if the pop turns out to have failed.
                    let buffer = self.buffer.get();
                    let mut task = unsafe { Some(buffer.read(b)) };

                    // Are we popping the last task from the queue?
                    if len == 0 {
                        // Try incrementing the front index. For the last task, ownership is
                        // decided by this compare-exchange on `front`.
                        if self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            )
                            .is_err()
                        {
                            // Failed. We didn't pop anything. The value read above is only a
                            // bitwise copy that we do not own, so it is forgotten rather than
                            // dropped; `task` becomes `None`.
                            mem::forget(task.take());
                        }

                        // Restore the back index to the original task.
                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    } else {
                        // Shrink the buffer if `len` is less than one fourth of the capacity.
                        // The buffer never shrinks below `MIN_CAP`.
                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
                            unsafe {
                                self.resize(buffer.cap / 2);
                            }
                        }
                    }

                    task
                }
            }
        }
    }
536 }
537 
538 impl<T> fmt::Debug for Worker<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result539     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
540         f.pad("Worker { .. }")
541     }
542 }
543 
544 /// A stealer handle of a worker queue.
545 ///
546 /// Stealers can be shared among threads.
547 ///
548 /// Task schedulers typically have a single worker queue per worker thread.
549 ///
550 /// # Examples
551 ///
552 /// ```
553 /// use crossbeam_deque::{Steal, Worker};
554 ///
555 /// let w = Worker::new_lifo();
556 /// w.push(1);
557 /// w.push(2);
558 ///
559 /// let s = w.stealer();
560 /// assert_eq!(s.steal(), Steal::Success(1));
561 /// assert_eq!(s.steal(), Steal::Success(2));
562 /// assert_eq!(s.steal(), Steal::Empty);
563 /// ```
pub struct Stealer<T> {
    /// A reference to the inner representation of the queue, shared with the `Worker` this
    /// stealer was created from.
    inner: Arc<CachePadded<Inner<T>>>,

    /// The flavor of the queue, copied from the worker when the stealer was created.
    flavor: Flavor,
}
571 
// Stealers may be sent to and shared between threads whenever the tasks themselves are `Send`;
// they access the queue only through the shared `Inner` state.
unsafe impl<T: Send> Send for Stealer<T> {}
unsafe impl<T: Send> Sync for Stealer<T> {}
574 
575 impl<T> Stealer<T> {
576     /// Returns `true` if the queue is empty.
577     ///
578     /// ```
579     /// use crossbeam_deque::Worker;
580     ///
581     /// let w = Worker::new_lifo();
582     /// let s = w.stealer();
583     ///
584     /// assert!(s.is_empty());
585     /// w.push(1);
586     /// assert!(!s.is_empty());
587     /// ```
is_empty(&self) -> bool588     pub fn is_empty(&self) -> bool {
589         let f = self.inner.front.load(Ordering::Acquire);
590         atomic::fence(Ordering::SeqCst);
591         let b = self.inner.back.load(Ordering::Acquire);
592         b.wrapping_sub(f) <= 0
593     }
594 
595     /// Steals a task from the queue.
596     ///
597     /// # Examples
598     ///
599     /// ```
600     /// use crossbeam_deque::{Steal, Worker};
601     ///
602     /// let w = Worker::new_lifo();
603     /// w.push(1);
604     /// w.push(2);
605     ///
606     /// let s = w.stealer();
607     /// assert_eq!(s.steal(), Steal::Success(1));
608     /// assert_eq!(s.steal(), Steal::Success(2));
609     /// ```
steal(&self) -> Steal<T>610     pub fn steal(&self) -> Steal<T> {
611         // Load the front index.
612         let f = self.inner.front.load(Ordering::Acquire);
613 
614         // A SeqCst fence is needed here.
615         //
616         // If the current thread is already pinned (reentrantly), we must manually issue the
617         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
618         // have to.
619         if epoch::is_pinned() {
620             atomic::fence(Ordering::SeqCst);
621         }
622 
623         let guard = &epoch::pin();
624 
625         // Load the back index.
626         let b = self.inner.back.load(Ordering::Acquire);
627 
628         // Is the queue empty?
629         if b.wrapping_sub(f) <= 0 {
630             return Steal::Empty;
631         }
632 
633         // Load the buffer and read the task at the front.
634         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
635         let task = unsafe { buffer.deref().read(f) };
636 
637         // Try incrementing the front index to steal the task.
638         if self
639             .inner
640             .front
641             .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
642             .is_err()
643         {
644             // We didn't steal this task, forget it.
645             mem::forget(task);
646             return Steal::Retry;
647         }
648 
649         // Return the stolen task.
650         Steal::Success(task)
651     }
652 
653     /// Steals a batch of tasks and pushes them into another worker.
654     ///
655     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
656     /// steal around half of the tasks in the queue, but also not more than some constant limit.
657     ///
658     /// # Examples
659     ///
660     /// ```
661     /// use crossbeam_deque::Worker;
662     ///
663     /// let w1 = Worker::new_fifo();
664     /// w1.push(1);
665     /// w1.push(2);
666     /// w1.push(3);
667     /// w1.push(4);
668     ///
669     /// let s = w1.stealer();
670     /// let w2 = Worker::new_fifo();
671     ///
672     /// let _ = s.steal_batch(&w2);
673     /// assert_eq!(w2.pop(), Some(1));
674     /// assert_eq!(w2.pop(), Some(2));
675     /// ```
steal_batch(&self, dest: &Worker<T>) -> Steal<()>676     pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
677         if Arc::ptr_eq(&self.inner, &dest.inner) {
678             if dest.is_empty() {
679                 return Steal::Empty;
680             } else {
681                 return Steal::Success(());
682             }
683         }
684 
685         // Load the front index.
686         let mut f = self.inner.front.load(Ordering::Acquire);
687 
688         // A SeqCst fence is needed here.
689         //
690         // If the current thread is already pinned (reentrantly), we must manually issue the
691         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
692         // have to.
693         if epoch::is_pinned() {
694             atomic::fence(Ordering::SeqCst);
695         }
696 
697         let guard = &epoch::pin();
698 
699         // Load the back index.
700         let b = self.inner.back.load(Ordering::Acquire);
701 
702         // Is the queue empty?
703         let len = b.wrapping_sub(f);
704         if len <= 0 {
705             return Steal::Empty;
706         }
707 
708         // Reserve capacity for the stolen batch.
709         let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH);
710         dest.reserve(batch_size);
711         let mut batch_size = batch_size as isize;
712 
713         // Get the destination buffer and back index.
714         let dest_buffer = dest.buffer.get();
715         let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
716 
717         // Load the buffer.
718         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
719 
720         match self.flavor {
721             // Steal a batch of tasks from the front at once.
722             Flavor::Fifo => {
723                 // Copy the batch from the source to the destination buffer.
724                 match dest.flavor {
725                     Flavor::Fifo => {
726                         for i in 0..batch_size {
727                             unsafe {
728                                 let task = buffer.deref().read(f.wrapping_add(i));
729                                 dest_buffer.write(dest_b.wrapping_add(i), task);
730                             }
731                         }
732                     }
733                     Flavor::Lifo => {
734                         for i in 0..batch_size {
735                             unsafe {
736                                 let task = buffer.deref().read(f.wrapping_add(i));
737                                 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
738                             }
739                         }
740                     }
741                 }
742 
743                 // Try incrementing the front index to steal the batch.
744                 if self
745                     .inner
746                     .front
747                     .compare_exchange(
748                         f,
749                         f.wrapping_add(batch_size),
750                         Ordering::SeqCst,
751                         Ordering::Relaxed,
752                     )
753                     .is_err()
754                 {
755                     return Steal::Retry;
756                 }
757 
758                 dest_b = dest_b.wrapping_add(batch_size);
759             }
760 
761             // Steal a batch of tasks from the front one by one.
762             Flavor::Lifo => {
763                 for i in 0..batch_size {
764                     // If this is not the first steal, check whether the queue is empty.
765                     if i > 0 {
766                         // We've already got the current front index. Now execute the fence to
767                         // synchronize with other threads.
768                         atomic::fence(Ordering::SeqCst);
769 
770                         // Load the back index.
771                         let b = self.inner.back.load(Ordering::Acquire);
772 
773                         // Is the queue empty?
774                         if b.wrapping_sub(f) <= 0 {
775                             batch_size = i;
776                             break;
777                         }
778                     }
779 
780                     // Read the task at the front.
781                     let task = unsafe { buffer.deref().read(f) };
782 
783                     // Try incrementing the front index to steal the task.
784                     if self
785                         .inner
786                         .front
787                         .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
788                         .is_err()
789                     {
790                         // We didn't steal this task, forget it and break from the loop.
791                         mem::forget(task);
792                         batch_size = i;
793                         break;
794                     }
795 
796                     // Write the stolen task into the destination buffer.
797                     unsafe {
798                         dest_buffer.write(dest_b, task);
799                     }
800 
801                     // Move the source front index and the destination back index one step forward.
802                     f = f.wrapping_add(1);
803                     dest_b = dest_b.wrapping_add(1);
804                 }
805 
806                 // If we didn't steal anything, the operation needs to be retried.
807                 if batch_size == 0 {
808                     return Steal::Retry;
809                 }
810 
811                 // If stealing into a FIFO queue, stolen tasks need to be reversed.
812                 if dest.flavor == Flavor::Fifo {
813                     for i in 0..batch_size / 2 {
814                         unsafe {
815                             let i1 = dest_b.wrapping_sub(batch_size - i);
816                             let i2 = dest_b.wrapping_sub(i + 1);
817                             let t1 = dest_buffer.read(i1);
818                             let t2 = dest_buffer.read(i2);
819                             dest_buffer.write(i1, t2);
820                             dest_buffer.write(i2, t1);
821                         }
822                     }
823                 }
824             }
825         }
826 
827         atomic::fence(Ordering::Release);
828 
829         // Update the back index in the destination queue.
830         //
831         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
832         // races because it doesn't understand fences.
833         dest.inner.back.store(dest_b, Ordering::Release);
834 
835         // Return with success.
836         Steal::Success(())
837     }
838 
839     /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
840     ///
841     /// How many tasks exactly will be stolen is not specified. That said, this method will try to
842     /// steal around half of the tasks in the queue, but also not more than some constant limit.
843     ///
844     /// # Examples
845     ///
846     /// ```
847     /// use crossbeam_deque::{Steal, Worker};
848     ///
849     /// let w1 = Worker::new_fifo();
850     /// w1.push(1);
851     /// w1.push(2);
852     /// w1.push(3);
853     /// w1.push(4);
854     ///
855     /// let s = w1.stealer();
856     /// let w2 = Worker::new_fifo();
857     ///
858     /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
859     /// assert_eq!(w2.pop(), Some(2));
860     /// ```
steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T>861     pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
862         if Arc::ptr_eq(&self.inner, &dest.inner) {
863             match dest.pop() {
864                 None => return Steal::Empty,
865                 Some(task) => return Steal::Success(task),
866             }
867         }
868 
869         // Load the front index.
870         let mut f = self.inner.front.load(Ordering::Acquire);
871 
872         // A SeqCst fence is needed here.
873         //
874         // If the current thread is already pinned (reentrantly), we must manually issue the
875         // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
876         // have to.
877         if epoch::is_pinned() {
878             atomic::fence(Ordering::SeqCst);
879         }
880 
881         let guard = &epoch::pin();
882 
883         // Load the back index.
884         let b = self.inner.back.load(Ordering::Acquire);
885 
886         // Is the queue empty?
887         let len = b.wrapping_sub(f);
888         if len <= 0 {
889             return Steal::Empty;
890         }
891 
892         // Reserve capacity for the stolen batch.
893         let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1);
894         dest.reserve(batch_size);
895         let mut batch_size = batch_size as isize;
896 
897         // Get the destination buffer and back index.
898         let dest_buffer = dest.buffer.get();
899         let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
900 
901         // Load the buffer
902         let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
903 
904         // Read the task at the front.
905         let mut task = unsafe { buffer.deref().read(f) };
906 
907         match self.flavor {
908             // Steal a batch of tasks from the front at once.
909             Flavor::Fifo => {
910                 // Copy the batch from the source to the destination buffer.
911                 match dest.flavor {
912                     Flavor::Fifo => {
913                         for i in 0..batch_size {
914                             unsafe {
915                                 let task = buffer.deref().read(f.wrapping_add(i + 1));
916                                 dest_buffer.write(dest_b.wrapping_add(i), task);
917                             }
918                         }
919                     }
920                     Flavor::Lifo => {
921                         for i in 0..batch_size {
922                             unsafe {
923                                 let task = buffer.deref().read(f.wrapping_add(i + 1));
924                                 dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
925                             }
926                         }
927                     }
928                 }
929 
930                 // Try incrementing the front index to steal the batch.
931                 if self
932                     .inner
933                     .front
934                     .compare_exchange(
935                         f,
936                         f.wrapping_add(batch_size + 1),
937                         Ordering::SeqCst,
938                         Ordering::Relaxed,
939                     )
940                     .is_err()
941                 {
942                     // We didn't steal this task, forget it.
943                     mem::forget(task);
944                     return Steal::Retry;
945                 }
946 
947                 dest_b = dest_b.wrapping_add(batch_size);
948             }
949 
950             // Steal a batch of tasks from the front one by one.
951             Flavor::Lifo => {
952                 // Try incrementing the front index to steal the task.
953                 if self
954                     .inner
955                     .front
956                     .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
957                     .is_err()
958                 {
959                     // We didn't steal this task, forget it.
960                     mem::forget(task);
961                     return Steal::Retry;
962                 }
963 
964                 // Move the front index one step forward.
965                 f = f.wrapping_add(1);
966 
967                 // Repeat the same procedure for the batch steals.
968                 for i in 0..batch_size {
969                     // We've already got the current front index. Now execute the fence to
970                     // synchronize with other threads.
971                     atomic::fence(Ordering::SeqCst);
972 
973                     // Load the back index.
974                     let b = self.inner.back.load(Ordering::Acquire);
975 
976                     // Is the queue empty?
977                     if b.wrapping_sub(f) <= 0 {
978                         batch_size = i;
979                         break;
980                     }
981 
982                     // Read the task at the front.
983                     let tmp = unsafe { buffer.deref().read(f) };
984 
985                     // Try incrementing the front index to steal the task.
986                     if self
987                         .inner
988                         .front
989                         .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
990                         .is_err()
991                     {
992                         // We didn't steal this task, forget it and break from the loop.
993                         mem::forget(tmp);
994                         batch_size = i;
995                         break;
996                     }
997 
998                     // Write the previously stolen task into the destination buffer.
999                     unsafe {
1000                         dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1001                     }
1002 
1003                     // Move the source front index and the destination back index one step forward.
1004                     f = f.wrapping_add(1);
1005                     dest_b = dest_b.wrapping_add(1);
1006                 }
1007 
1008                 // If stealing into a FIFO queue, stolen tasks need to be reversed.
1009                 if dest.flavor == Flavor::Fifo {
1010                     for i in 0..batch_size / 2 {
1011                         unsafe {
1012                             let i1 = dest_b.wrapping_sub(batch_size - i);
1013                             let i2 = dest_b.wrapping_sub(i + 1);
1014                             let t1 = dest_buffer.read(i1);
1015                             let t2 = dest_buffer.read(i2);
1016                             dest_buffer.write(i1, t2);
1017                             dest_buffer.write(i2, t1);
1018                         }
1019                     }
1020                 }
1021             }
1022         }
1023 
1024         atomic::fence(Ordering::Release);
1025 
1026         // Update the back index in the destination queue.
1027         //
1028         // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1029         // races because it doesn't understand fences.
1030         dest.inner.back.store(dest_b, Ordering::Release);
1031 
1032         // Return with success.
1033         Steal::Success(task)
1034     }
1035 }
1036 
1037 impl<T> Clone for Stealer<T> {
clone(&self) -> Stealer<T>1038     fn clone(&self) -> Stealer<T> {
1039         Stealer {
1040             inner: self.inner.clone(),
1041             flavor: self.flavor,
1042         }
1043     }
1044 }
1045 
1046 impl<T> fmt::Debug for Stealer<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1047     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1048         f.pad("Stealer { .. }")
1049     }
1050 }
1051 
// Bits indicating the state of a slot:
// * If a task has been written into the slot, `WRITE` is set.
// * If a task has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
//
// The bits are OR-ed into `Slot::state` with `fetch_or`, so more than one can be set at a time.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices. The offset of an index inside its block is computed
// as `(index >> SHIFT) % LAP`.
const LAP: usize = 64;
// The maximum number of values a block can hold. The one remaining offset of every lap
// (`offset == BLOCK_CAP`) never holds a task: an index sitting there means the next block is
// still being installed, and callers wait until it moves on.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata. Head and tail indices are kept shifted left by
// this amount.
const SHIFT: usize = 1;
// Indicates that the block is not the last one. Kept in the lowest bit of the head index.
const HAS_NEXT: usize = 1;
1068 
/// A slot in a block.
///
/// A slot stores at most one task together with a small bit set (`WRITE`, `READ`, `DESTROY`)
/// describing how far the slot has progressed.
struct Slot<T> {
    /// The task.
    ///
    /// Only initialized once the `WRITE` bit is set in `state`.
    task: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    ///
    /// A combination of the `WRITE`, `READ` and `DESTROY` bits; zero for a fresh slot.
    state: AtomicUsize,
}
1077 
1078 impl<T> Slot<T> {
1079     /// Waits until a task is written into the slot.
wait_write(&self)1080     fn wait_write(&self) {
1081         let backoff = Backoff::new();
1082         while self.state.load(Ordering::Acquire) & WRITE == 0 {
1083             backoff.snooze();
1084         }
1085     }
1086 }
1087 
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` values.
struct Block<T> {
    /// The next block in the linked list.
    ///
    /// Null until the following block has been installed.
    next: AtomicPtr<Block<T>>,

    /// Slots for values.
    slots: [Slot<T>; BLOCK_CAP],
}
1098 
1099 impl<T> Block<T> {
1100     /// Creates an empty block that starts at `start_index`.
new() -> Block<T>1101     fn new() -> Block<T> {
1102         // SAFETY: This is safe because:
1103         //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1104         //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1105         //  [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1106         //       holds a MaybeUninit.
1107         //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1108         unsafe { MaybeUninit::zeroed().assume_init() }
1109     }
1110 
1111     /// Waits until the next pointer is set.
wait_next(&self) -> *mut Block<T>1112     fn wait_next(&self) -> *mut Block<T> {
1113         let backoff = Backoff::new();
1114         loop {
1115             let next = self.next.load(Ordering::Acquire);
1116             if !next.is_null() {
1117                 return next;
1118             }
1119             backoff.snooze();
1120         }
1121     }
1122 
1123     /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
destroy(this: *mut Block<T>, count: usize)1124     unsafe fn destroy(this: *mut Block<T>, count: usize) {
1125         // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1126         // begun destruction of the block.
1127         for i in (0..count).rev() {
1128             let slot = (*this).slots.get_unchecked(i);
1129 
1130             // Mark the `DESTROY` bit if a thread is still using the slot.
1131             if slot.state.load(Ordering::Acquire) & READ == 0
1132                 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1133             {
1134                 // If a thread is still using the slot, it will continue destruction of the block.
1135                 return;
1136             }
1137         }
1138 
1139         // No thread is using the block, now it is safe to destroy it.
1140         drop(Box::from_raw(this));
1141     }
1142 }
1143 
/// A position in a queue.
///
/// Used for both the head and the tail of an `Injector`.
struct Position<T> {
    /// The index in the queue.
    ///
    /// Stored shifted left by `SHIFT`; the lower bits carry metadata such as `HAS_NEXT`.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}
1152 
/// An injector queue.
///
/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
/// a single injector queue, which is the entry point for new tasks.
///
/// Tasks are stored in a linked list of fixed-size blocks; `head` and `tail` each track an index
/// and the block that index currently falls into.
///
/// # Examples
///
/// ```
/// use crossbeam_deque::{Injector, Steal};
///
/// let q = Injector::new();
/// q.push(1);
/// q.push(2);
///
/// assert_eq!(q.steal(), Steal::Success(1));
/// assert_eq!(q.steal(), Steal::Success(2));
/// assert_eq!(q.steal(), Steal::Empty);
/// ```
pub struct Injector<T> {
    /// The head of the queue.
    head: CachePadded<Position<T>>,

    /// The tail of the queue.
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping an `Injector<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}
1181 
// NOTE(review): both impls only require `T: Send` (in particular `Sync` does not require
// `T: Sync`) — presumably because the queue only moves tasks by value between threads (`push`
// takes a `T`, the `steal*` methods give a `T` back) and never hands out references to them.
unsafe impl<T: Send> Send for Injector<T> {}
unsafe impl<T: Send> Sync for Injector<T> {}
1184 
1185 impl<T> Default for Injector<T> {
default() -> Self1186     fn default() -> Self {
1187         let block = Box::into_raw(Box::new(Block::<T>::new()));
1188         Self {
1189             head: CachePadded::new(Position {
1190                 block: AtomicPtr::new(block),
1191                 index: AtomicUsize::new(0),
1192             }),
1193             tail: CachePadded::new(Position {
1194                 block: AtomicPtr::new(block),
1195                 index: AtomicUsize::new(0),
1196             }),
1197             _marker: PhantomData,
1198         }
1199     }
1200 }
1201 
1202 impl<T> Injector<T> {
1203     /// Creates a new injector queue.
1204     ///
1205     /// # Examples
1206     ///
1207     /// ```
1208     /// use crossbeam_deque::Injector;
1209     ///
1210     /// let q = Injector::<i32>::new();
1211     /// ```
new() -> Injector<T>1212     pub fn new() -> Injector<T> {
1213         Self::default()
1214     }
1215 
    /// Pushes a task into the queue.
    ///
    /// The call claims a slot by advancing the tail index with a compare-and-swap and then
    /// writes the task into that slot. It loops until a slot has been claimed, waiting while the
    /// next block is being installed by another thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::Injector;
    ///
    /// let w = Injector::new();
    /// w.push(1);
    /// w.push(2);
    /// ```
    pub fn push(&self, task: T) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        // Block allocated ahead of time in case this call ends up installing the next block.
        let mut next_block = None;

        loop {
            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        // `new_tail` sits at offset `BLOCK_CAP`, which holds no task; one more
                        // step moves the tail to the first slot of the new block.
                        let next_index = new_tail.wrapping_add(1 << SHIFT);

                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.store(next_index, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    // Write the task into the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.task.get().write(MaybeUninit::new(task));
                    // Publish the task: readers wait for the `WRITE` bit before reading it.
                    slot.state.fetch_or(WRITE, Ordering::Release);

                    return;
                },
                Err(t) => {
                    // Lost the race; retry with the tail value the CAS reported and a freshly
                    // loaded block pointer.
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
1286 
1287     /// Steals a task from the queue.
1288     ///
1289     /// # Examples
1290     ///
1291     /// ```
1292     /// use crossbeam_deque::{Injector, Steal};
1293     ///
1294     /// let q = Injector::new();
1295     /// q.push(1);
1296     /// q.push(2);
1297     ///
1298     /// assert_eq!(q.steal(), Steal::Success(1));
1299     /// assert_eq!(q.steal(), Steal::Success(2));
1300     /// assert_eq!(q.steal(), Steal::Empty);
1301     /// ```
steal(&self) -> Steal<T>1302     pub fn steal(&self) -> Steal<T> {
1303         let mut head;
1304         let mut block;
1305         let mut offset;
1306 
1307         let backoff = Backoff::new();
1308         loop {
1309             head = self.head.index.load(Ordering::Acquire);
1310             block = self.head.block.load(Ordering::Acquire);
1311 
1312             // Calculate the offset of the index into the block.
1313             offset = (head >> SHIFT) % LAP;
1314 
1315             // If we reached the end of the block, wait until the next one is installed.
1316             if offset == BLOCK_CAP {
1317                 backoff.snooze();
1318             } else {
1319                 break;
1320             }
1321         }
1322 
1323         let mut new_head = head + (1 << SHIFT);
1324 
1325         if new_head & HAS_NEXT == 0 {
1326             atomic::fence(Ordering::SeqCst);
1327             let tail = self.tail.index.load(Ordering::Relaxed);
1328 
1329             // If the tail equals the head, that means the queue is empty.
1330             if head >> SHIFT == tail >> SHIFT {
1331                 return Steal::Empty;
1332             }
1333 
1334             // If head and tail are not in the same block, set `HAS_NEXT` in head.
1335             if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1336                 new_head |= HAS_NEXT;
1337             }
1338         }
1339 
1340         // Try moving the head index forward.
1341         if self
1342             .head
1343             .index
1344             .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1345             .is_err()
1346         {
1347             return Steal::Retry;
1348         }
1349 
1350         unsafe {
1351             // If we've reached the end of the block, move to the next one.
1352             if offset + 1 == BLOCK_CAP {
1353                 let next = (*block).wait_next();
1354                 let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1355                 if !(*next).next.load(Ordering::Relaxed).is_null() {
1356                     next_index |= HAS_NEXT;
1357                 }
1358 
1359                 self.head.block.store(next, Ordering::Release);
1360                 self.head.index.store(next_index, Ordering::Release);
1361             }
1362 
1363             // Read the task.
1364             let slot = (*block).slots.get_unchecked(offset);
1365             slot.wait_write();
1366             let task = slot.task.get().read().assume_init();
1367 
1368             // Destroy the block if we've reached the end, or if another thread wanted to destroy
1369             // but couldn't because we were busy reading from the slot.
1370             if (offset + 1 == BLOCK_CAP) || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0) {
1371                 Block::destroy(block, offset);
1372             }
1373 
1374             Steal::Success(task)
1375         }
1376     }
1377 
    /// Steals a batch of tasks and pushes them into a worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// A batch never crosses a block boundary. Returns `Steal::Empty` if the queue looked empty,
    /// `Steal::Retry` if advancing the head lost against a concurrent operation, and
    /// `Steal::Success(())` once the stolen tasks have been published in `dest`.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    ///
    /// let w = Worker::new_fifo();
    /// let _ = q.steal_batch(&w);
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), Some(2));
    /// ```
    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        // Number of slots to claim in this call.
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
            // the right batch size to steal.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(MAX_BATCH);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(MAX_BATCH);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(MAX_BATCH);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch.
        let batch_size = new_offset - offset;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                // `new_head` sits at offset `BLOCK_CAP`, which holds no task; one more step
                // moves the head to the first slot of the next block.
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Copy values from the injector into the destination queue.
            match dest.flavor {
                Flavor::Fifo => {
                    // Keep the injector's order in the destination buffer.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let task = slot.task.get().read().assume_init();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    // Store the batch in reverse, so that the oldest stolen task ends up at the
                    // back of the destination buffer.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i);
                        slot.wait_write();
                        let task = slot.task.get().read().assume_init();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                // Mark every slot of the batch as read. If one of them already carries the
                // `DESTROY` bit, destruction of the block was handed over to us.
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(())
        }
    }
1537 
    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
    ///
    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
    /// steal around half of the tasks in the queue, but also not more than some constant limit.
    ///
    /// The first stolen task is returned directly; only the remaining ones are written into
    /// `dest`. A batch never crosses a block boundary. Returns `Steal::Empty` if the queue looked
    /// empty and `Steal::Retry` if advancing the head lost against a concurrent operation.
    ///
    /// # Examples
    ///
    /// ```
    /// use crossbeam_deque::{Injector, Steal, Worker};
    ///
    /// let q = Injector::new();
    /// q.push(1);
    /// q.push(2);
    /// q.push(3);
    /// q.push(4);
    ///
    /// let w = Worker::new_fifo();
    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
    /// assert_eq!(w.pop(), Some(2));
    /// ```
    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
        let mut head;
        let mut block;
        let mut offset;

        let backoff = Backoff::new();
        loop {
            head = self.head.index.load(Ordering::Acquire);
            block = self.head.block.load(Ordering::Acquire);

            // Calculate the offset of the index into the block.
            offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
            } else {
                break;
            }
        }

        let mut new_head = head;
        // Number of slots to claim in this call, including the task that is returned directly
        // (hence the `MAX_BATCH + 1` caps below).
        let advance;

        if new_head & HAS_NEXT == 0 {
            atomic::fence(Ordering::SeqCst);
            let tail = self.tail.index.load(Ordering::Relaxed);

            // If the tail equals the head, that means the queue is empty.
            if head >> SHIFT == tail >> SHIFT {
                return Steal::Empty;
            }

            // If head and tail are not in the same block, set `HAS_NEXT` in head.
            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                new_head |= HAS_NEXT;
                // We can steal all tasks till the end of the block.
                advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
            } else {
                let len = (tail - head) >> SHIFT;
                // Steal half of the available tasks.
                advance = ((len + 1) / 2).min(MAX_BATCH + 1);
            }
        } else {
            // We can steal all tasks till the end of the block.
            advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
        }

        new_head += advance << SHIFT;
        let new_offset = offset + advance;

        // Try moving the head index forward.
        if self
            .head
            .index
            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
            .is_err()
        {
            return Steal::Retry;
        }

        // Reserve capacity for the stolen batch. The first claimed task is returned to the
        // caller, so it needs no room in `dest`.
        let batch_size = new_offset - offset - 1;
        dest.reserve(batch_size);

        // Get the destination buffer and back index.
        let dest_buffer = dest.buffer.get();
        let dest_b = dest.inner.back.load(Ordering::Relaxed);

        unsafe {
            // If we've reached the end of the block, move to the next one.
            if new_offset == BLOCK_CAP {
                let next = (*block).wait_next();
                // `new_head` sits at offset `BLOCK_CAP`, which holds no task; one more step
                // moves the head to the first slot of the next block.
                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
                if !(*next).next.load(Ordering::Relaxed).is_null() {
                    next_index |= HAS_NEXT;
                }

                self.head.block.store(next, Ordering::Release);
                self.head.index.store(next_index, Ordering::Release);
            }

            // Read the task. This is the one handed back to the caller.
            let slot = (*block).slots.get_unchecked(offset);
            slot.wait_write();
            let task = slot.task.get().read().assume_init();

            match dest.flavor {
                Flavor::Fifo => {
                    // Copy values from the injector into the destination queue.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let task = slot.task.get().read().assume_init();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
                    }
                }

                Flavor::Lifo => {
                    // Copy values from the injector into the destination queue. They are stored
                    // in reverse, so that the oldest of them ends up at the back of the
                    // destination buffer.
                    for i in 0..batch_size {
                        // Read the task.
                        let slot = (*block).slots.get_unchecked(offset + i + 1);
                        slot.wait_write();
                        let task = slot.task.get().read().assume_init();

                        // Write it into the destination queue.
                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
                    }
                }
            }

            atomic::fence(Ordering::Release);

            // Update the back index in the destination queue.
            //
            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
            // data races because it doesn't understand fences.
            dest.inner
                .back
                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);

            // Destroy the block if we've reached the end, or if another thread wanted to destroy
            // but couldn't because we were busy reading from the slot.
            if new_offset == BLOCK_CAP {
                Block::destroy(block, offset);
            } else {
                // Mark every claimed slot (the returned one included) as read. If one of them
                // already carries the `DESTROY` bit, destruction of the block was handed over
                // to us.
                for i in offset..new_offset {
                    let slot = (*block).slots.get_unchecked(i);

                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
                        Block::destroy(block, offset);
                        break;
                    }
                }
            }

            Steal::Success(task)
        }
    }
1701 
1702     /// Returns `true` if the queue is empty.
1703     ///
1704     /// # Examples
1705     ///
1706     /// ```
1707     /// use crossbeam_deque::Injector;
1708     ///
1709     /// let q = Injector::new();
1710     ///
1711     /// assert!(q.is_empty());
1712     /// q.push(1);
1713     /// assert!(!q.is_empty());
1714     /// ```
is_empty(&self) -> bool1715     pub fn is_empty(&self) -> bool {
1716         let head = self.head.index.load(Ordering::SeqCst);
1717         let tail = self.tail.index.load(Ordering::SeqCst);
1718         head >> SHIFT == tail >> SHIFT
1719     }
1720 
1721     /// Returns the number of tasks in the queue.
1722     ///
1723     /// # Examples
1724     ///
1725     /// ```
1726     /// use crossbeam_deque::Injector;
1727     ///
1728     /// let q = Injector::new();
1729     ///
1730     /// assert_eq!(q.len(), 0);
1731     /// q.push(1);
1732     /// assert_eq!(q.len(), 1);
1733     /// q.push(1);
1734     /// assert_eq!(q.len(), 2);
1735     /// ```
len(&self) -> usize1736     pub fn len(&self) -> usize {
1737         loop {
1738             // Load the tail index, then load the head index.
1739             let mut tail = self.tail.index.load(Ordering::SeqCst);
1740             let mut head = self.head.index.load(Ordering::SeqCst);
1741 
1742             // If the tail index didn't change, we've got consistent indices to work with.
1743             if self.tail.index.load(Ordering::SeqCst) == tail {
1744                 // Erase the lower bits.
1745                 tail &= !((1 << SHIFT) - 1);
1746                 head &= !((1 << SHIFT) - 1);
1747 
1748                 // Fix up indices if they fall onto block ends.
1749                 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1750                     tail = tail.wrapping_add(1 << SHIFT);
1751                 }
1752                 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1753                     head = head.wrapping_add(1 << SHIFT);
1754                 }
1755 
1756                 // Rotate indices so that head falls into the first block.
1757                 let lap = (head >> SHIFT) / LAP;
1758                 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1759                 head = head.wrapping_sub((lap * LAP) << SHIFT);
1760 
1761                 // Remove the lower bits.
1762                 tail >>= SHIFT;
1763                 head >>= SHIFT;
1764 
1765                 // Return the difference minus the number of blocks between tail and head.
1766                 return tail - head - tail / LAP;
1767             }
1768         }
1769     }
1770 }
1771 
// Tears the injector down: drops every task still queued between the head and tail indices and
// frees each block of the chain that starts at `head.block`.
impl<T> Drop for Injector<T> {
    fn drop(&mut self) {
        // `&mut self` gives exclusive access to the queue, so plain `Relaxed` loads are used to
        // read the current indices and the first block pointer.
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits (everything below `SHIFT`) so that the two indices can be compared
        // with `!=` and advanced in steps of `1 << SHIFT`.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
            while head != tail {
                // Position of `head` within the current lap of `LAP` positions.
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the task in the slot.
                    //
                    // The slot is treated as initialized: the task is dropped in place through the
                    // pointer obtained from `as_mut_ptr()`, without moving it out first.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.task.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    //
                    // Offsets at or past `BLOCK_CAP` hold no task. The `next` pointer is read
                    // before the block is freed because it lives inside that block.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                // Step to the following position.
                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            //
            // This is the block `head` ended up in; when the loop never ran it is simply the
            // block loaded from `self.head.block`.
            // NOTE(review): assumes `block` is non-null and was allocated through `Box` - confirm
            // against the constructor and `push`.
            drop(Box::from_raw(block));
        }
    }
}
1807 
1808 impl<T> fmt::Debug for Injector<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1809     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1810         f.pad("Worker { .. }")
1811     }
1812 }
1813 
/// Possible outcomes of a steal operation.
///
/// A steal ends in exactly one of three ways: the queue was empty ([`Steal::Empty`]), a task was
/// obtained ([`Steal::Success`]), or the attempt has to be repeated ([`Steal::Retry`]).
///
/// When several results are combined, either by collecting an iterator of `Steal` values or with
/// [`Steal::or_else`], a `Success` takes precedence over everything else and a `Retry` takes
/// precedence over `Empty`.
///
/// The type is marked `#[must_use]`, so discarding a steal result triggers a compiler warning.
///
/// # Examples
///
/// There are lots of ways to chain results of steal operations together:
///
/// ```
/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
///
/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
///
/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
///
/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
/// ```
#[must_use]
#[derive(PartialEq, Eq, Copy, Clone)]
pub enum Steal<T> {
    /// The queue was empty at the time of stealing.
    Empty,

    /// At least one task was successfully stolen.
    ///
    /// The payload is the value handed back to the caller.
    Success(T),

    /// The steal operation needs to be retried.
    ///
    /// No task was obtained by this attempt.
    Retry,
}
1844 
1845 impl<T> Steal<T> {
1846     /// Returns `true` if the queue was empty at the time of stealing.
1847     ///
1848     /// # Examples
1849     ///
1850     /// ```
1851     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1852     ///
1853     /// assert!(!Success(7).is_empty());
1854     /// assert!(!Retry::<i32>.is_empty());
1855     ///
1856     /// assert!(Empty::<i32>.is_empty());
1857     /// ```
is_empty(&self) -> bool1858     pub fn is_empty(&self) -> bool {
1859         match self {
1860             Steal::Empty => true,
1861             _ => false,
1862         }
1863     }
1864 
1865     /// Returns `true` if at least one task was stolen.
1866     ///
1867     /// # Examples
1868     ///
1869     /// ```
1870     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1871     ///
1872     /// assert!(!Empty::<i32>.is_success());
1873     /// assert!(!Retry::<i32>.is_success());
1874     ///
1875     /// assert!(Success(7).is_success());
1876     /// ```
is_success(&self) -> bool1877     pub fn is_success(&self) -> bool {
1878         match self {
1879             Steal::Success(_) => true,
1880             _ => false,
1881         }
1882     }
1883 
1884     /// Returns `true` if the steal operation needs to be retried.
1885     ///
1886     /// # Examples
1887     ///
1888     /// ```
1889     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1890     ///
1891     /// assert!(!Empty::<i32>.is_retry());
1892     /// assert!(!Success(7).is_retry());
1893     ///
1894     /// assert!(Retry::<i32>.is_retry());
1895     /// ```
is_retry(&self) -> bool1896     pub fn is_retry(&self) -> bool {
1897         match self {
1898             Steal::Retry => true,
1899             _ => false,
1900         }
1901     }
1902 
1903     /// Returns the result of the operation, if successful.
1904     ///
1905     /// # Examples
1906     ///
1907     /// ```
1908     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1909     ///
1910     /// assert_eq!(Empty::<i32>.success(), None);
1911     /// assert_eq!(Retry::<i32>.success(), None);
1912     ///
1913     /// assert_eq!(Success(7).success(), Some(7));
1914     /// ```
success(self) -> Option<T>1915     pub fn success(self) -> Option<T> {
1916         match self {
1917             Steal::Success(res) => Some(res),
1918             _ => None,
1919         }
1920     }
1921 
1922     /// If no task was stolen, attempts another steal operation.
1923     ///
1924     /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
1925     ///
1926     /// * If the second steal resulted in `Success`, it is returned.
1927     /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
1928     /// * If both resulted in `None`, then `None` is returned.
1929     ///
1930     /// # Examples
1931     ///
1932     /// ```
1933     /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1934     ///
1935     /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
1936     /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
1937     ///
1938     /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
1939     /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
1940     ///
1941     /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
1942     /// ```
or_else<F>(self, f: F) -> Steal<T> where F: FnOnce() -> Steal<T>,1943     pub fn or_else<F>(self, f: F) -> Steal<T>
1944     where
1945         F: FnOnce() -> Steal<T>,
1946     {
1947         match self {
1948             Steal::Empty => f(),
1949             Steal::Success(_) => self,
1950             Steal::Retry => {
1951                 if let Steal::Success(res) = f() {
1952                     Steal::Success(res)
1953                 } else {
1954                     Steal::Retry
1955                 }
1956             }
1957         }
1958     }
1959 }
1960 
1961 impl<T> fmt::Debug for Steal<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1962     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1963         match self {
1964             Steal::Empty => f.pad("Empty"),
1965             Steal::Success(_) => f.pad("Success(..)"),
1966             Steal::Retry => f.pad("Retry"),
1967         }
1968     }
1969 }
1970 
1971 impl<T> FromIterator<Steal<T>> for Steal<T> {
1972     /// Consumes items until a `Success` is found and returns it.
1973     ///
1974     /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
1975     /// Otherwise, `Empty` is returned.
from_iter<I>(iter: I) -> Steal<T> where I: IntoIterator<Item = Steal<T>>,1976     fn from_iter<I>(iter: I) -> Steal<T>
1977     where
1978         I: IntoIterator<Item = Steal<T>>,
1979     {
1980         let mut retry = false;
1981         for s in iter {
1982             match &s {
1983                 Steal::Empty => {}
1984                 Steal::Success(_) => return s,
1985                 Steal::Retry => retry = true,
1986             }
1987         }
1988 
1989         if retry {
1990             Steal::Retry
1991         } else {
1992             Steal::Empty
1993         }
1994     }
1995 }
1996