1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <condition_variable>
20 #include <functional>
21 #include <list>
22 #include <mutex>
23 #include <optional>
24 #include "android-base/thread_annotations.h"
25 
26 namespace android {
27 
28 /**
29  * A thread-safe FIFO queue. This list-backed queue stores up to <i>capacity</i> objects if
30  * a capacity is provided at construction, and is otherwise unbounded.
31  * Objects can always be added. Objects are added immediately.
32  * If the queue is full, new objects cannot be added.
33  *
34  * The action of retrieving an object will block until an element is available.
35  */
36 template <class T>
37 class BlockingQueue {
38 public:
39     explicit BlockingQueue() = default;
40 
BlockingQueue(size_t capacity)41     explicit BlockingQueue(size_t capacity) : mCapacity(capacity){};
42 
43     /**
44      * Retrieve and remove the oldest object.
45      * Blocks execution indefinitely while queue is empty.
46      */
pop()47     T pop() {
48         std::unique_lock lock(mLock);
49         android::base::ScopedLockAssertion assumeLock(mLock);
50         mHasElements.wait(lock, [this]() REQUIRES(mLock) { return !this->mQueue.empty(); });
51         T t = std::move(mQueue.front());
52         mQueue.erase(mQueue.begin());
53         return t;
54     };
55 
56     /**
57      * Retrieve and remove the oldest object.
58      * Blocks execution for the given duration while queue is empty, and returns std::nullopt
59      * if the queue was empty for the entire duration.
60      */
popWithTimeout(std::chrono::nanoseconds duration)61     std::optional<T> popWithTimeout(std::chrono::nanoseconds duration) {
62         std::unique_lock lock(mLock);
63         android::base::ScopedLockAssertion assumeLock(mLock);
64         if (!mHasElements.wait_for(lock, duration,
65                                    [this]() REQUIRES(mLock) { return !this->mQueue.empty(); })) {
66             return {};
67         }
68         T t = std::move(mQueue.front());
69         mQueue.erase(mQueue.begin());
70         return t;
71     };
72 
73     /**
74      * Add a new object to the queue.
75      * Does not block.
76      * Return true if an element was successfully added.
77      * Return false if the queue is full.
78      */
push(T && t)79     bool push(T&& t) {
80         { // acquire lock
81             std::scoped_lock lock(mLock);
82             if (mCapacity && mQueue.size() == mCapacity) {
83                 return false;
84             }
85             mQueue.push_back(std::move(t));
86         } // release lock
87         mHasElements.notify_one();
88         return true;
89     };
90 
91     /**
92      * Construct a new object into the queue.
93      * Does not block.
94      * Return true if an element was successfully added.
95      * Return false if the queue is full.
96      */
97     template <class... Args>
emplace(Args &&...args)98     bool emplace(Args&&... args) {
99         { // acquire lock
100             std::scoped_lock lock(mLock);
101             if (mCapacity && mQueue.size() == mCapacity) {
102                 return false;
103             }
104             mQueue.emplace_back(args...);
105         } // release lock
106         mHasElements.notify_one();
107         return true;
108     };
109 
erase_if(const std::function<bool (const T &)> & pred)110     void erase_if(const std::function<bool(const T&)>& pred) {
111         std::scoped_lock lock(mLock);
112         std::erase_if(mQueue, pred);
113     }
114 
115     /**
116      * Remove all elements.
117      * Does not block.
118      */
clear()119     void clear() {
120         std::scoped_lock lock(mLock);
121         mQueue.clear();
122     };
123 
124     /**
125      * How many elements are currently stored in the queue.
126      * Primary used for debugging.
127      * Does not block.
128      */
size()129     size_t size() {
130         std::scoped_lock lock(mLock);
131         return mQueue.size();
132     }
133 
134 private:
135     const std::optional<size_t> mCapacity;
136     /**
137      * Used to signal that mQueue is non-empty.
138      */
139     std::condition_variable mHasElements;
140     /**
141      * Lock for accessing and waiting on elements.
142      */
143     std::mutex mLock;
144     std::list<T> mQueue GUARDED_BY(mLock);
145 };
146 
147 } // namespace android
148