1 /*
2  * Copyright (c) 2016-present, Facebook, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under both the BSD-style license (found in the
6  * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7  * in the COPYING file in the root directory of this source tree).
8  */
9 #pragma once
10 
11 #include "utils/Buffer.h"
12 
13 #include <atomic>
14 #include <cassert>
15 #include <cstddef>
16 #include <condition_variable>
17 #include <cstddef>
18 #include <functional>
19 #include <mutex>
20 #include <queue>
21 
22 namespace pzstd {
23 
24 /// Unbounded thread-safe work queue.
25 template <typename T>
26 class WorkQueue {
27   // Protects all member variable access
28   std::mutex mutex_;
29   std::condition_variable readerCv_;
30   std::condition_variable writerCv_;
31   std::condition_variable finishCv_;
32 
33   std::queue<T> queue_;
34   bool done_;
35   std::size_t maxSize_;
36 
37   // Must have lock to call this function
full()38   bool full() const {
39     if (maxSize_ == 0) {
40       return false;
41     }
42     return queue_.size() >= maxSize_;
43   }
44 
45  public:
46   /**
47    * Constructs an empty work queue with an optional max size.
48    * If `maxSize == 0` the queue size is unbounded.
49    *
50    * @param maxSize The maximum allowed size of the work queue.
51    */
done_(false)52   WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
53 
54   /**
55    * Push an item onto the work queue.  Notify a single thread that work is
56    * available.  If `finish()` has been called, do nothing and return false.
57    * If `push()` returns false, then `item` has not been moved from.
58    *
59    * @param item  Item to push onto the queue.
60    * @returns     True upon success, false if `finish()` has been called.  An
61    *               item was pushed iff `push()` returns true.
62    */
push(T && item)63   bool push(T&& item) {
64     {
65       std::unique_lock<std::mutex> lock(mutex_);
66       while (full() && !done_) {
67         writerCv_.wait(lock);
68       }
69       if (done_) {
70         return false;
71       }
72       queue_.push(std::move(item));
73     }
74     readerCv_.notify_one();
75     return true;
76   }
77 
78   /**
79    * Attempts to pop an item off the work queue.  It will block until data is
80    * available or `finish()` has been called.
81    *
82    * @param[out] item  If `pop` returns `true`, it contains the popped item.
83    *                    If `pop` returns `false`, it is unmodified.
84    * @returns          True upon success.  False if the queue is empty and
85    *                    `finish()` has been called.
86    */
pop(T & item)87   bool pop(T& item) {
88     {
89       std::unique_lock<std::mutex> lock(mutex_);
90       while (queue_.empty() && !done_) {
91         readerCv_.wait(lock);
92       }
93       if (queue_.empty()) {
94         assert(done_);
95         return false;
96       }
97       item = std::move(queue_.front());
98       queue_.pop();
99     }
100     writerCv_.notify_one();
101     return true;
102   }
103 
104   /**
105    * Sets the maximum queue size.  If `maxSize == 0` then it is unbounded.
106    *
107    * @param maxSize The new maximum queue size.
108    */
setMaxSize(std::size_t maxSize)109   void setMaxSize(std::size_t maxSize) {
110     {
111       std::lock_guard<std::mutex> lock(mutex_);
112       maxSize_ = maxSize;
113     }
114     writerCv_.notify_all();
115   }
116 
117   /**
118    * Promise that `push()` won't be called again, so once the queue is empty
119    * there will never any more work.
120    */
finish()121   void finish() {
122     {
123       std::lock_guard<std::mutex> lock(mutex_);
124       assert(!done_);
125       done_ = true;
126     }
127     readerCv_.notify_all();
128     writerCv_.notify_all();
129     finishCv_.notify_all();
130   }
131 
132   /// Blocks until `finish()` has been called (but the queue may not be empty).
waitUntilFinished()133   void waitUntilFinished() {
134     std::unique_lock<std::mutex> lock(mutex_);
135     while (!done_) {
136       finishCv_.wait(lock);
137     }
138   }
139 };
140 
141 /// Work queue for `Buffer`s that knows the total number of bytes in the queue.
142 class BufferWorkQueue {
143   WorkQueue<Buffer> queue_;
144   std::atomic<std::size_t> size_;
145 
146  public:
queue_(maxSize)147   BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {}
148 
push(Buffer buffer)149   void push(Buffer buffer) {
150     size_.fetch_add(buffer.size());
151     queue_.push(std::move(buffer));
152   }
153 
pop(Buffer & buffer)154   bool pop(Buffer& buffer) {
155     bool result = queue_.pop(buffer);
156     if (result) {
157       size_.fetch_sub(buffer.size());
158     }
159     return result;
160   }
161 
setMaxSize(std::size_t maxSize)162   void setMaxSize(std::size_t maxSize) {
163     queue_.setMaxSize(maxSize);
164   }
165 
finish()166   void finish() {
167     queue_.finish();
168   }
169 
170   /**
171    * Blocks until `finish()` has been called.
172    *
173    * @returns The total number of bytes of all the `Buffer`s currently in the
174    *           queue.
175    */
size()176   std::size_t size() {
177     queue_.waitUntilFinished();
178     return size_.load();
179   }
180 };
181 }
182