1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <bluetooth/log.h>
20 #include <unistd.h>
21 
22 #include <functional>
23 #include <mutex>
24 #include <queue>
25 
26 #include "common/bind.h"
27 #include "common/callback.h"
28 #include "os/handler.h"
29 #include "os/linux_generic/reactive_semaphore.h"
30 #include "os/log.h"
31 
32 namespace bluetooth {
33 namespace os {
34 
35 // See documentation for |Queue|
36 template <typename T>
37 class IQueueEnqueue {
38  public:
39   using EnqueueCallback = common::Callback<std::unique_ptr<T>()>;
40   virtual ~IQueueEnqueue() = default;
41   virtual void RegisterEnqueue(Handler* handler, EnqueueCallback callback) = 0;
42   virtual void UnregisterEnqueue() = 0;
43 };
44 
45 // See documentation for |Queue|
46 template <typename T>
47 class IQueueDequeue {
48  public:
49   using DequeueCallback = common::Callback<void()>;
50   virtual ~IQueueDequeue() = default;
51   virtual void RegisterDequeue(Handler* handler, DequeueCallback callback) = 0;
52   virtual void UnregisterDequeue() = 0;
53   virtual std::unique_ptr<T> TryDequeue() = 0;
54 };
55 
56 template <typename T>
57 class Queue : public IQueueEnqueue<T>, public IQueueDequeue<T> {
58  public:
59   // A function moving data from enqueue end buffer to queue, it will be continually be invoked until queue
60   // is full. Enqueue end should make sure buffer isn't empty and UnregisterEnqueue when buffer become empty.
61   using EnqueueCallback = common::Callback<std::unique_ptr<T>()>;
62   // A function moving data form queue to dequeue end buffer, it will be continually be invoked until queue
63   // is empty. TryDequeue should be use in this function to get data from queue.
64   using DequeueCallback = common::Callback<void()>;
65   // Create a queue with |capacity| is the maximum number of messages a queue can contain
66   explicit Queue(size_t capacity);
67   ~Queue();
68   // Register |callback| that will be called on |handler| when the queue is able to enqueue one piece of data.
69   // This will cause a crash if handler or callback has already been registered before.
70   void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override;
71   // Unregister current EnqueueCallback from this queue, this will cause a crash if not registered yet.
72   void UnregisterEnqueue() override;
73   // Register |callback| that will be called on |handler| when the queue has at least one piece of data ready
74   // for dequeue. This will cause a crash if handler or callback has already been registered before.
75   void RegisterDequeue(Handler* handler, DequeueCallback callback) override;
76   // Unregister current DequeueCallback from this queue, this will cause a crash if not registered yet.
77   void UnregisterDequeue() override;
78 
79   // Try to dequeue an item from this queue. Return nullptr when there is nothing in the queue.
80   std::unique_ptr<T> TryDequeue() override;
81 
82  private:
83   void EnqueueCallbackInternal(EnqueueCallback callback);
84   // An internal queue that holds at most |capacity| pieces of data
85   std::queue<std::unique_ptr<T>> queue_;
86   // A mutex that guards data in this queue
87   std::mutex mutex_;
88 
89   class QueueEndpoint {
90    public:
QueueEndpoint(unsigned int initial_value)91     explicit QueueEndpoint(unsigned int initial_value)
92         : reactive_semaphore_(initial_value), handler_(nullptr), reactable_(nullptr) {}
93     ReactiveSemaphore reactive_semaphore_;
94     Handler* handler_;
95     Reactor::Reactable* reactable_;
96   };
97 
98   QueueEndpoint enqueue_;
99   QueueEndpoint dequeue_;
100 };
101 
102 template <typename T>
103 class EnqueueBuffer {
104  public:
EnqueueBuffer(IQueueEnqueue<T> * queue)105   EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {}
106 
~EnqueueBuffer()107   ~EnqueueBuffer() {
108     if (enqueue_registered_.exchange(false)) {
109       queue_->UnregisterEnqueue();
110     }
111   }
112 
Enqueue(std::unique_ptr<T> t,os::Handler * handler)113   void Enqueue(std::unique_ptr<T> t, os::Handler* handler) {
114     std::lock_guard<std::mutex> lock(mutex_);
115     buffer_.push(std::move(t));
116     if (!enqueue_registered_.exchange(true)) {
117       queue_->RegisterEnqueue(handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this)));
118     }
119   }
120 
Clear()121   void Clear() {
122     std::lock_guard<std::mutex> lock(mutex_);
123     if (enqueue_registered_.exchange(false)) {
124       queue_->UnregisterEnqueue();
125       std::queue<std::unique_ptr<T>> empty;
126       std::swap(buffer_, empty);
127     }
128   }
129 
Size()130   auto Size() const {
131     return buffer_.size();
132   }
133 
NotifyOnEmpty(common::OnceClosure callback)134   void NotifyOnEmpty(common::OnceClosure callback) {
135     std::lock_guard<std::mutex> lock(mutex_);
136     log::assert_that(callback_on_empty_.is_null(), "assert failed: callback_on_empty_.is_null()");
137     callback_on_empty_ = std::move(callback);
138   }
139 
140  private:
enqueue_callback()141   std::unique_ptr<T> enqueue_callback() {
142     std::lock_guard<std::mutex> lock(mutex_);
143     std::unique_ptr<T> enqueued_t = std::move(buffer_.front());
144     buffer_.pop();
145     if (buffer_.empty() && enqueue_registered_.exchange(false)) {
146       queue_->UnregisterEnqueue();
147       if (!callback_on_empty_.is_null()) {
148         std::move(callback_on_empty_).Run();
149       }
150     }
151     return enqueued_t;
152   }
153 
154   mutable std::mutex mutex_;
155   IQueueEnqueue<T>* queue_;
156   std::atomic_bool enqueue_registered_ = false;
157   std::queue<std::unique_ptr<T>> buffer_;
158   common::OnceClosure callback_on_empty_;
159 };
160 
161 template <typename T>
Queue(size_t capacity)162 Queue<T>::Queue(size_t capacity) : enqueue_(capacity), dequeue_(0){};
163 
164 template <typename T>
~Queue()165 Queue<T>::~Queue() {
166   log::assert_that(enqueue_.handler_ == nullptr, "Enqueue is not unregistered");
167   log::assert_that(dequeue_.handler_ == nullptr, "Dequeue is not unregistered");
168 };
169 
170 template <typename T>
RegisterEnqueue(Handler * handler,EnqueueCallback callback)171 void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) {
172   std::lock_guard<std::mutex> lock(mutex_);
173   log::assert_that(enqueue_.handler_ == nullptr, "assert failed: enqueue_.handler_ == nullptr");
174   log::assert_that(enqueue_.reactable_ == nullptr, "assert failed: enqueue_.reactable_ == nullptr");
175   enqueue_.handler_ = handler;
176   enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register(
177       enqueue_.reactive_semaphore_.GetFd(),
178       base::Bind(&Queue<T>::EnqueueCallbackInternal, base::Unretained(this), std::move(callback)),
179       base::Closure());
180 }
181 
182 template <typename T>
UnregisterEnqueue()183 void Queue<T>::UnregisterEnqueue() {
184   Reactor* reactor = nullptr;
185   Reactor::Reactable* to_unregister = nullptr;
186   bool wait_for_unregister = false;
187   {
188     std::lock_guard<std::mutex> lock(mutex_);
189     log::assert_that(
190         enqueue_.reactable_ != nullptr, "assert failed: enqueue_.reactable_ != nullptr");
191     reactor = enqueue_.handler_->thread_->GetReactor();
192     wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread());
193     to_unregister = enqueue_.reactable_;
194     enqueue_.reactable_ = nullptr;
195     enqueue_.handler_ = nullptr;
196   }
197   reactor->Unregister(to_unregister);
198   if (wait_for_unregister) {
199     reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
200   }
201 }
202 
203 template <typename T>
RegisterDequeue(Handler * handler,DequeueCallback callback)204 void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) {
205   std::lock_guard<std::mutex> lock(mutex_);
206   log::assert_that(dequeue_.handler_ == nullptr, "assert failed: dequeue_.handler_ == nullptr");
207   log::assert_that(dequeue_.reactable_ == nullptr, "assert failed: dequeue_.reactable_ == nullptr");
208   dequeue_.handler_ = handler;
209   dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(
210       dequeue_.reactive_semaphore_.GetFd(), callback, base::Closure());
211 }
212 
213 template <typename T>
UnregisterDequeue()214 void Queue<T>::UnregisterDequeue() {
215   Reactor* reactor = nullptr;
216   Reactor::Reactable* to_unregister = nullptr;
217   bool wait_for_unregister = false;
218   {
219     std::lock_guard<std::mutex> lock(mutex_);
220     log::assert_that(
221         dequeue_.reactable_ != nullptr, "assert failed: dequeue_.reactable_ != nullptr");
222     reactor = dequeue_.handler_->thread_->GetReactor();
223     wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread());
224     to_unregister = dequeue_.reactable_;
225     dequeue_.reactable_ = nullptr;
226     dequeue_.handler_ = nullptr;
227   }
228   reactor->Unregister(to_unregister);
229   if (wait_for_unregister) {
230     reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000));
231   }
232 }
233 
234 template <typename T>
TryDequeue()235 std::unique_ptr<T> Queue<T>::TryDequeue() {
236   std::lock_guard<std::mutex> lock(mutex_);
237 
238   if (queue_.empty()) {
239     return nullptr;
240   }
241 
242   dequeue_.reactive_semaphore_.Decrease();
243 
244   std::unique_ptr<T> data = std::move(queue_.front());
245   queue_.pop();
246 
247   enqueue_.reactive_semaphore_.Increase();
248 
249   return data;
250 }
251 
252 template <typename T>
EnqueueCallbackInternal(EnqueueCallback callback)253 void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) {
254   std::unique_ptr<T> data = callback.Run();
255   log::assert_that(data != nullptr, "assert failed: data != nullptr");
256   std::lock_guard<std::mutex> lock(mutex_);
257   enqueue_.reactive_semaphore_.Decrease();
258   queue_.push(std::move(data));
259   dequeue_.reactive_semaphore_.Increase();
260 }
261 
262 }  // namespace os
263 }  // namespace bluetooth
264