1 // This file is part of Eigen, a lightweight C++ template library
2 // for linear algebra.
3 //
4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
5 //
6 // This Source Code Form is subject to the terms of the Mozilla
7 // Public License v. 2.0. If a copy of the MPL was not distributed
8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
9 
10 #ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
11 #define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
12 
13 
14 namespace Eigen {
15 
// RunQueue is a fixed-size, partially non-blocking deque of Work items.
17 // Operations on front of the queue must be done by a single thread (owner),
18 // operations on back of the queue can be done by multiple threads concurrently.
19 //
20 // Algorithm outline:
21 // All remote threads operating on the queue back are serialized by a mutex.
22 // This ensures that at most two threads access state: owner and one remote
23 // thread (Size aside). The algorithm ensures that the occupied region of the
24 // underlying array is logically continuous (can wraparound, but no stray
25 // occupied elements). Owner operates on one end of this region, remote thread
26 // operates on the other end. Synchronization between these threads
27 // (potential consumption of the last element and take up of the last empty
28 // element) happens by means of state variable in each element. States are:
// empty, busy (in process of insertion or removal) and ready. Threads claim
// elements (empty->busy and ready->busy transitions) by means of a CAS
// operation. The finishing transitions (busy->empty and busy->ready) are done
// with a plain store as the element is exclusively owned by the current thread.
33 //
34 // Note: we could permit only pointers as elements, then we would not need
35 // separate state variable as null/non-null pointer value would serve as state,
36 // but that would require malloc/free per operation for large, complex values
// (and this is designed to store std::function<void()>).
// Fixed-capacity deque of Work items backed by an in-object array.
//
// Template parameters:
//   Work  - element type. It is default-constructed to signal "nothing to
//           return" and is moved in and out of the slots.
//   kSize - capacity. Must be a power of two, greater than 2 and at most 64K
//           (checked at compile time when a queue is constructed).
//
// Threading contract:
//   * PushFront/PopFront must only be called by the single owner thread.
//   * PushBack/PopBack/PopBackHalf may be called by any thread; they are
//     serialized against each other by mutex_.
//   * Size/Empty may be called by any thread at any time.
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  // Creates an empty queue with every slot marked kEmpty.
  RunQueue() : front_(0), back_(0) {
    // The constraints on kSize are compile-time properties, so they are
    // enforced with static_assert rather than a runtime assertion that
    // disappears in release builds.
    static_assert((kSize & (kSize - 1)) == 0,
                  "RunQueue: kSize must be a power of two (fast masking)");
    static_assert(kSize > 2, "RunQueue: kSize must be greater than 2");
    static_assert(kSize <= (64 << 10),
                  "RunQueue: kSize must leave enough bits for the counter");
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  // The queue must be drained before it is destroyed.
  ~RunQueue() { eigen_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  // Owner thread only.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (empty -> busy); failure means the slot is occupied.
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    // Advance the position by one and bump the modification counter, which
    // lives in the bits above the low log(kSize) + 1 position bits.
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    // Publish the element (busy -> ready).
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue was empty returns default-constructed Work.
  // Owner thread only.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (ready -> busy); failure means there is nothing to pop.
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    // Release the slot (busy -> empty).
    e->state.store(kEmpty, std::memory_order_release);
    // Step the position back by one while leaving the counter bits intact.
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  // Callable from any thread; blocks on mutex_.
  Work PushBack(Work w) {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (empty -> busy); failure means the slot is occupied.
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    // Step the position back by one while leaving the counter bits intact.
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    // Publish the element (busy -> ready).
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // Returns default-constructed Work if the queue is empty.
  // Can fail spuriously: the mutex is only tried, never waited on, so a
  // default-constructed Work is also returned when another thread holds it.
  Work PopBack() {
    if (Empty()) return Work();
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return Work();
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    // Claim the slot (ready -> busy); failure means there is nothing to pop.
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    // Release the slot (busy -> empty).
    e->state.store(kEmpty, std::memory_order_release);
    // Advance the position by one and bump the modification counter.
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes roughly half of the elements from the back of the
  // queue and appends them to *result, starting from the middle of the queue
  // and walking towards the back.
  // Returns number of elements removed. Can also fail spuriously (returning
  // 0) because the mutex is only tried, never waited on.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return 0;
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    // Index of the element furthest from the back that we try to take.
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    // Walk from mid down to back; the signed difference turns negative once
    // mid has stepped below back.
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        // The first element taken is the only one that has to be claimed
        // with a CAS; skip slots that are not ready.
        if (s != kReady ||
            !e->state.compare_exchange_strong(s, kBusy,
                                              std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store temporal kBusy, we exclusively own these
        // elements.
        eigen_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    // The new back is just past the first element taken; also bump the
    // modification counter.
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }

  // Size returns current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const {
    // Emptiness plays critical role in thread pool blocking. So we go to great
    // effort to not produce false positives (claim non-empty queue as empty).
    for (;;) {
      // Capture a consistent snapshot of front/tail: re-read front_ after
      // back_ and retry if it changed in between.
      unsigned front = front_.load(std::memory_order_acquire);
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) continue;
      int size = static_cast<int>((front & kMask2) - (back & kMask2));
      // Fix overflow: positions roll over modulo 2 * kSize.
      if (size < 0) size += static_cast<int>(2 * kSize);
      // Order of modification in push/pop is crafted to make the queue look
      // larger than it is during concurrent modifications. E.g. pop can
      // decrement size before the corresponding push has incremented it.
      // So the computed size can be up to kSize + 1, fix it.
      if (size > static_cast<int>(kSize)) size = static_cast<int>(kSize);
      return static_cast<unsigned>(size);
    }
  }

  // Empty tests whether container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return Size() == 0; }

 private:
  // Mask selecting a slot index in array_.
  static const unsigned kMask = kSize - 1;
  // Mask selecting the rolling position bits (log(kSize) + 1 bits) of
  // front_/back_.
  static const unsigned kMask2 = (kSize << 1) - 1;
  // One slot of the queue: the payload plus its state (one of the enum
  // values below).
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  // Slot states. kBusy marks a slot that is in the process of insertion or
  // removal.
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  // Serializes the operations on the back of the queue.
  std::mutex mutex_;
  // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
  // front/back, respectively. The remaining bits contain modification counters
  // that are incremented on Push operations. This allows us to (1) distinguish
  // between empty and full conditions (if we would use log(kSize) bits for
  // position, these conditions would be indistinguishable); (2) obtain
  // consistent snapshot of front_/back_ for Size operation using the
  // modification counters.
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};
207 
208 }  // namespace Eigen
209 
210 #endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
211