1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #pragma once
18
19 #include <bluetooth/log.h>
20 #include <unistd.h>
21
22 #include <functional>
23 #include <mutex>
24 #include <queue>
25
26 #include "common/bind.h"
27 #include "common/callback.h"
28 #include "os/handler.h"
29 #include "os/linux_generic/reactive_semaphore.h"
30 #include "os/log.h"
31
32 namespace bluetooth {
33 namespace os {
34
35 // See documentation for |Queue|
36 template <typename T>
37 class IQueueEnqueue {
38 public:
39 using EnqueueCallback = common::Callback<std::unique_ptr<T>()>;
40 virtual ~IQueueEnqueue() = default;
41 virtual void RegisterEnqueue(Handler* handler, EnqueueCallback callback) = 0;
42 virtual void UnregisterEnqueue() = 0;
43 };
44
45 // See documentation for |Queue|
46 template <typename T>
47 class IQueueDequeue {
48 public:
49 using DequeueCallback = common::Callback<void()>;
50 virtual ~IQueueDequeue() = default;
51 virtual void RegisterDequeue(Handler* handler, DequeueCallback callback) = 0;
52 virtual void UnregisterDequeue() = 0;
53 virtual std::unique_ptr<T> TryDequeue() = 0;
54 };
55
56 template <typename T>
57 class Queue : public IQueueEnqueue<T>, public IQueueDequeue<T> {
58 public:
59 // A function moving data from enqueue end buffer to queue, it will be continually be invoked until queue
60 // is full. Enqueue end should make sure buffer isn't empty and UnregisterEnqueue when buffer become empty.
61 using EnqueueCallback = common::Callback<std::unique_ptr<T>()>;
62 // A function moving data form queue to dequeue end buffer, it will be continually be invoked until queue
63 // is empty. TryDequeue should be use in this function to get data from queue.
64 using DequeueCallback = common::Callback<void()>;
65 // Create a queue with |capacity| is the maximum number of messages a queue can contain
66 explicit Queue(size_t capacity);
67 ~Queue();
68 // Register |callback| that will be called on |handler| when the queue is able to enqueue one piece of data.
69 // This will cause a crash if handler or callback has already been registered before.
70 void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override;
71 // Unregister current EnqueueCallback from this queue, this will cause a crash if not registered yet.
72 void UnregisterEnqueue() override;
73 // Register |callback| that will be called on |handler| when the queue has at least one piece of data ready
74 // for dequeue. This will cause a crash if handler or callback has already been registered before.
75 void RegisterDequeue(Handler* handler, DequeueCallback callback) override;
76 // Unregister current DequeueCallback from this queue, this will cause a crash if not registered yet.
77 void UnregisterDequeue() override;
78
79 // Try to dequeue an item from this queue. Return nullptr when there is nothing in the queue.
80 std::unique_ptr<T> TryDequeue() override;
81
82 private:
83 void EnqueueCallbackInternal(EnqueueCallback callback);
84 // An internal queue that holds at most |capacity| pieces of data
85 std::queue<std::unique_ptr<T>> queue_;
86 // A mutex that guards data in this queue
87 std::mutex mutex_;
88
89 class QueueEndpoint {
90 public:
QueueEndpoint(unsigned int initial_value)91 explicit QueueEndpoint(unsigned int initial_value)
92 : reactive_semaphore_(initial_value), handler_(nullptr), reactable_(nullptr) {}
93 ReactiveSemaphore reactive_semaphore_;
94 Handler* handler_;
95 Reactor::Reactable* reactable_;
96 };
97
98 QueueEndpoint enqueue_;
99 QueueEndpoint dequeue_;
100 };
101
102 template <typename T>
103 class EnqueueBuffer {
104 public:
EnqueueBuffer(IQueueEnqueue<T> * queue)105 EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {}
106
~EnqueueBuffer()107 ~EnqueueBuffer() {
108 if (enqueue_registered_.exchange(false)) {
109 queue_->UnregisterEnqueue();
110 }
111 }
112
Enqueue(std::unique_ptr<T> t,os::Handler * handler)113 void Enqueue(std::unique_ptr<T> t, os::Handler* handler) {
114 std::lock_guard<std::mutex> lock(mutex_);
115 buffer_.push(std::move(t));
116 if (!enqueue_registered_.exchange(true)) {
117 queue_->RegisterEnqueue(handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this)));
118 }
119 }
120
Clear()121 void Clear() {
122 std::lock_guard<std::mutex> lock(mutex_);
123 if (enqueue_registered_.exchange(false)) {
124 queue_->UnregisterEnqueue();
125 std::queue<std::unique_ptr<T>> empty;
126 std::swap(buffer_, empty);
127 }
128 }
129
Size()130 auto Size() const {
131 return buffer_.size();
132 }
133
NotifyOnEmpty(common::OnceClosure callback)134 void NotifyOnEmpty(common::OnceClosure callback) {
135 std::lock_guard<std::mutex> lock(mutex_);
136 log::assert_that(callback_on_empty_.is_null(), "assert failed: callback_on_empty_.is_null()");
137 callback_on_empty_ = std::move(callback);
138 }
139
140 private:
enqueue_callback()141 std::unique_ptr<T> enqueue_callback() {
142 std::lock_guard<std::mutex> lock(mutex_);
143 std::unique_ptr<T> enqueued_t = std::move(buffer_.front());
144 buffer_.pop();
145 if (buffer_.empty() && enqueue_registered_.exchange(false)) {
146 queue_->UnregisterEnqueue();
147 if (!callback_on_empty_.is_null()) {
148 std::move(callback_on_empty_).Run();
149 }
150 }
151 return enqueued_t;
152 }
153
154 mutable std::mutex mutex_;
155 IQueueEnqueue<T>* queue_;
156 std::atomic_bool enqueue_registered_ = false;
157 std::queue<std::unique_ptr<T>> buffer_;
158 common::OnceClosure callback_on_empty_;
159 };
160
161 template <typename T>
Queue(size_t capacity)162 Queue<T>::Queue(size_t capacity) : enqueue_(capacity), dequeue_(0){};
163
164 template <typename T>
~Queue()165 Queue<T>::~Queue() {
166 log::assert_that(enqueue_.handler_ == nullptr, "Enqueue is not unregistered");
167 log::assert_that(dequeue_.handler_ == nullptr, "Dequeue is not unregistered");
168 };
169
170 template <typename T>
RegisterEnqueue(Handler * handler,EnqueueCallback callback)171 void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
172 std::lock_guard<std::mutex> lock(mutex_);
173 log::assert_that(enqueue_.handler_ == nullptr, "assert failed: enqueue_.handler_ == nullptr");
174 log::assert_that(enqueue_.reactable_ == nullptr, "assert failed: enqueue_.reactable_ == nullptr");
175 enqueue_.handler_ = handler;
176 enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register(
177 enqueue_.reactive_semaphore_.GetFd(),
178 base::Bind(&Queue<T>::EnqueueCallbackInternal, base::Unretained(this), std::move(callback)),
179 base::Closure());
180 }
181
182 template <typename T>
UnregisterEnqueue()183 void Queue<T>::UnregisterEnqueue() {
184 Reactor* reactor = nullptr;
185 Reactor::Reactable* to_unregister = nullptr;
186 bool wait_for_unregister = false;
187 {
188 std::lock_guard<std::mutex> lock(mutex_);
189 log::assert_that(
190 enqueue_.reactable_ != nullptr, "assert failed: enqueue_.reactable_ != nullptr");
191 reactor = enqueue_.handler_->thread_->GetReactor();
192 wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread());
193 to_unregister = enqueue_.reactable_;
194 enqueue_.reactable_ = nullptr;
195 enqueue_.handler_ = nullptr;
196 }
197 reactor->Unregister(to_unregister);
198 if (wait_for_unregister) {
199 reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
200 }
201 }
202
203 template <typename T>
RegisterDequeue(Handler * handler,DequeueCallback callback)204 void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {
205 std::lock_guard<std::mutex> lock(mutex_);
206 log::assert_that(dequeue_.handler_ == nullptr, "assert failed: dequeue_.handler_ == nullptr");
207 log::assert_that(dequeue_.reactable_ == nullptr, "assert failed: dequeue_.reactable_ == nullptr");
208 dequeue_.handler_ = handler;
209 dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(
210 dequeue_.reactive_semaphore_.GetFd(), callback, base::Closure());
211 }
212
213 template <typename T>
UnregisterDequeue()214 void Queue<T>::UnregisterDequeue() {
215 Reactor* reactor = nullptr;
216 Reactor::Reactable* to_unregister = nullptr;
217 bool wait_for_unregister = false;
218 {
219 std::lock_guard<std::mutex> lock(mutex_);
220 log::assert_that(
221 dequeue_.reactable_ != nullptr, "assert failed: dequeue_.reactable_ != nullptr");
222 reactor = dequeue_.handler_->thread_->GetReactor();
223 wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread());
224 to_unregister = dequeue_.reactable_;
225 dequeue_.reactable_ = nullptr;
226 dequeue_.handler_ = nullptr;
227 }
228 reactor->Unregister(to_unregister);
229 if (wait_for_unregister) {
230 reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
231 }
232 }
233
234 template <typename T>
TryDequeue()235 std::unique_ptr<T> Queue<T>::TryDequeue() {
236 std::lock_guard<std::mutex> lock(mutex_);
237
238 if (queue_.empty()) {
239 return nullptr;
240 }
241
242 dequeue_.reactive_semaphore_.Decrease();
243
244 std::unique_ptr<T> data = std::move(queue_.front());
245 queue_.pop();
246
247 enqueue_.reactive_semaphore_.Increase();
248
249 return data;
250 }
251
252 template <typename T>
EnqueueCallbackInternal(EnqueueCallback callback)253 void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) {
254 std::unique_ptr<T> data = callback.Run();
255 log::assert_that(data != nullptr, "assert failed: data != nullptr");
256 std::lock_guard<std::mutex> lock(mutex_);
257 enqueue_.reactive_semaphore_.Decrease();
258 queue_.push(std::move(data));
259 dequeue_.reactive_semaphore_.Increase();
260 }
261
262 } // namespace os
263 } // namespace bluetooth
264