1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "os/queue.h"
18 
19 #include <sys/eventfd.h>
20 
21 #include <atomic>
22 #include <future>
23 #include <unordered_map>
24 
25 #include "common/bind.h"
26 #include "gtest/gtest.h"
27 #include "os/reactor.h"
28 
29 namespace bluetooth {
30 namespace os {
31 namespace {
32 
33 constexpr int kQueueSize = 10;
34 constexpr int kHalfOfQueueSize = kQueueSize / 2;
35 constexpr int kDoubleOfQueueSize = kQueueSize * 2;
36 constexpr int kQueueSizeOne = 1;
37 
38 class QueueTest : public ::testing::Test {
39  protected:
SetUp()40   void SetUp() override {
41     enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL);
42     enqueue_handler_ = new Handler(enqueue_thread_);
43     dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL);
44     dequeue_handler_ = new Handler(dequeue_thread_);
45   }
TearDown()46   void TearDown() override {
47     enqueue_handler_->Clear();
48     delete enqueue_handler_;
49     delete enqueue_thread_;
50     dequeue_handler_->Clear();
51     delete dequeue_handler_;
52     delete dequeue_thread_;
53     enqueue_handler_ = nullptr;
54     enqueue_thread_ = nullptr;
55     dequeue_handler_ = nullptr;
56     dequeue_thread_ = nullptr;
57   }
58 
59   Thread* enqueue_thread_;
60   Handler* enqueue_handler_;
61   Thread* dequeue_thread_;
62   Handler* dequeue_handler_;
63 };
64 
65 class TestEnqueueEnd {
66  public:
TestEnqueueEnd(Queue<std::string> * queue,Handler * handler)67   explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
68       : count(0), handler_(handler), queue_(queue), delay_(0) {}
69 
~TestEnqueueEnd()70   ~TestEnqueueEnd() {}
71 
RegisterEnqueue(std::unordered_map<int,std::promise<int>> * promise_map)72   void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
73     promise_map_ = promise_map;
74     handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this)));
75   }
76 
UnregisterEnqueue()77   void UnregisterEnqueue() {
78     std::promise<void> promise;
79     auto future = promise.get_future();
80 
81     handler_->Post(
82         common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue, common::Unretained(this), std::move(promise)));
83     future.wait();
84   }
85 
EnqueueCallbackForTest()86   std::unique_ptr<std::string> EnqueueCallbackForTest() {
87     if (delay_ != 0) {
88       std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
89     }
90 
91     count++;
92     std::unique_ptr<std::string> data = std::move(buffer_.front());
93     buffer_.pop();
94     std::string copy = *data;
95     if (buffer_.empty()) {
96       queue_->UnregisterEnqueue();
97     }
98 
99     auto pair = promise_map_->find(buffer_.size());
100     if (pair != promise_map_->end()) {
101       pair->second.set_value(pair->first);
102       promise_map_->erase(pair->first);
103     }
104     return data;
105   }
106 
setDelay(int value)107   void setDelay(int value) {
108     delay_ = value;
109   }
110 
111   std::queue<std::unique_ptr<std::string>> buffer_;
112   int count;
113 
114  private:
115   Handler* handler_;
116   Queue<std::string>* queue_;
117   std::unordered_map<int, std::promise<int>>* promise_map_;
118   int delay_;
119 
handle_register_enqueue()120   void handle_register_enqueue() {
121     queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest, common::Unretained(this)));
122   }
123 
handle_unregister_enqueue(std::promise<void> promise)124   void handle_unregister_enqueue(std::promise<void> promise) {
125     queue_->UnregisterEnqueue();
126     promise.set_value();
127   }
128 };
129 
130 class TestDequeueEnd {
131  public:
TestDequeueEnd(Queue<std::string> * queue,Handler * handler,int capacity)132   explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
133       : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}
134 
~TestDequeueEnd()135   ~TestDequeueEnd() {}
136 
RegisterDequeue(std::unordered_map<int,std::promise<int>> * promise_map)137   void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
138     promise_map_ = promise_map;
139     handler_->Post(common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this)));
140   }
141 
UnregisterDequeue()142   void UnregisterDequeue() {
143     std::promise<void> promise;
144     auto future = promise.get_future();
145 
146     handler_->Post(
147         common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue, common::Unretained(this), std::move(promise)));
148     future.wait();
149   }
150 
DequeueCallbackForTest()151   void DequeueCallbackForTest() {
152     if (delay_ != 0) {
153       std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
154     }
155 
156     count++;
157     std::unique_ptr<std::string> data = queue_->TryDequeue();
158     buffer_.push(std::move(data));
159 
160     if (buffer_.size() == capacity_) {
161       queue_->UnregisterDequeue();
162     }
163 
164     auto pair = promise_map_->find(buffer_.size());
165     if (pair != promise_map_->end()) {
166       pair->second.set_value(pair->first);
167       promise_map_->erase(pair->first);
168     }
169   }
170 
setDelay(int value)171   void setDelay(int value) {
172     delay_ = value;
173   }
174 
175   std::queue<std::unique_ptr<std::string>> buffer_;
176   int count;
177 
178  private:
179   Handler* handler_;
180   Queue<std::string>* queue_;
181   std::unordered_map<int, std::promise<int>>* promise_map_;
182   int capacity_;
183   int delay_;
184 
handle_register_dequeue()185   void handle_register_dequeue() {
186     queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest, common::Unretained(this)));
187   }
188 
handle_unregister_dequeue(std::promise<void> promise)189   void handle_unregister_dequeue(std::promise<void> promise) {
190     queue_->UnregisterDequeue();
191     promise.set_value();
192   }
193 };
194 
195 // Enqueue end level : 0 -> queue is full, 1 - >  queue isn't full
196 // Dequeue end level : 0 -> queue is empty, 1 - >  queue isn't empty
197 
198 // Test 1 : Queue is empty
199 
200 // Enqueue end level : 1
201 // Dequeue end level : 0
202 // Test 1-1 EnqueueCallback should continually be invoked when queue isn't full
TEST_F(QueueTest,register_enqueue_with_empty_queue)203 TEST_F(QueueTest, register_enqueue_with_empty_queue) {
204   Queue<std::string> queue(kQueueSize);
205   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
206 
207   // Push kQueueSize data to enqueue_end buffer
208   for (int i = 0; i < kQueueSize; i++) {
209     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
210     test_enqueue_end.buffer_.push(std::move(data));
211   }
212   EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
213 
214   // Register enqueue and expect data move to Queue
215   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
216   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
217   auto enqueue_future = enqueue_promise_map[0].get_future();
218   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
219   enqueue_future.wait();
220   EXPECT_EQ(enqueue_future.get(), 0);
221   std::this_thread::sleep_for(std::chrono::milliseconds(20));
222 }
223 
224 // Enqueue end level : 1
225 // Dequeue end level : 0
226 // Test 1-2 DequeueCallback shouldn't be invoked when queue is empty
TEST_F(QueueTest,register_dequeue_with_empty_queue)227 TEST_F(QueueTest, register_dequeue_with_empty_queue) {
228   Queue<std::string> queue(kQueueSize);
229   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
230 
231   // Register dequeue, DequeueCallback shouldn't be invoked
232   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
233   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
234   std::this_thread::sleep_for(std::chrono::milliseconds(20));
235   EXPECT_EQ(test_dequeue_end.count, 0);
236 
237   test_dequeue_end.UnregisterDequeue();
238 }
239 
240 // Test 2 : Queue is full
241 
242 // Enqueue end level : 0
243 // Dequeue end level : 1
244 // Test 2-1 EnqueueCallback shouldn't be invoked when queue is full
TEST_F(QueueTest,register_enqueue_with_full_queue)245 TEST_F(QueueTest, register_enqueue_with_full_queue) {
246   Queue<std::string> queue(kQueueSize);
247   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
248 
249   // make Queue full
250   for (int i = 0; i < kQueueSize; i++) {
251     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
252     test_enqueue_end.buffer_.push(std::move(data));
253   }
254   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
255   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
256   auto enqueue_future = enqueue_promise_map[0].get_future();
257   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
258   enqueue_future.wait();
259   EXPECT_EQ(enqueue_future.get(), 0);
260 
261   // push some data to enqueue_end buffer and register enqueue;
262   for (int i = 0; i < kHalfOfQueueSize; i++) {
263     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
264     test_enqueue_end.buffer_.push(std::move(data));
265   }
266   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
267 
268   // EnqueueCallback shouldn't be invoked
269   std::this_thread::sleep_for(std::chrono::milliseconds(20));
270   EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize);
271   EXPECT_EQ(test_enqueue_end.count, kQueueSize);
272 
273   test_enqueue_end.UnregisterEnqueue();
274 }
275 
276 // Enqueue end level : 0
277 // Dequeue end level : 1
278 // Test 2-2 DequeueCallback should continually be invoked when queue isn't empty
TEST_F(QueueTest,register_dequeue_with_full_queue)279 TEST_F(QueueTest, register_dequeue_with_full_queue) {
280   Queue<std::string> queue(kQueueSize);
281   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
282   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
283 
284   // make Queue full
285   for (int i = 0; i < kQueueSize; i++) {
286     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
287     test_enqueue_end.buffer_.push(std::move(data));
288   }
289   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
290   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
291   auto enqueue_future = enqueue_promise_map[0].get_future();
292   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
293   enqueue_future.wait();
294   EXPECT_EQ(enqueue_future.get(), 0);
295 
296   // Register dequeue and expect data move to dequeue end buffer
297   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
298   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
299   auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
300   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
301   dequeue_future.wait();
302   EXPECT_EQ(dequeue_future.get(), kQueueSize);
303 
304   test_dequeue_end.UnregisterDequeue();
305 }
306 
307 // Test 3 : Queue is non-empty and non-full
308 
309 // Enqueue end level : 1
310 // Dequeue end level : 1
311 // Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked
TEST_F(QueueTest,register_enqueue_with_half_empty_queue)312 TEST_F(QueueTest, register_enqueue_with_half_empty_queue) {
313   Queue<std::string> queue(kQueueSize);
314   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
315 
316   // make Queue half empty
317   for (int i = 0; i < kHalfOfQueueSize; i++) {
318     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
319     test_enqueue_end.buffer_.push(std::move(data));
320   }
321   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
322   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
323   auto enqueue_future = enqueue_promise_map[0].get_future();
324   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
325   enqueue_future.wait();
326   EXPECT_EQ(enqueue_future.get(), 0);
327 
328   // push some data to enqueue_end buffer and register enqueue;
329   for (int i = 0; i < kHalfOfQueueSize; i++) {
330     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
331     test_enqueue_end.buffer_.push(std::move(data));
332   }
333 
334   // Register enqueue and expect data move to Queue
335   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
336   enqueue_future = enqueue_promise_map[0].get_future();
337   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
338   enqueue_future.wait();
339   EXPECT_EQ(enqueue_future.get(), 0);
340 }
341 
342 // Enqueue end level : 1
343 // Dequeue end level : 1
344 // Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked
TEST_F(QueueTest,register_dequeue_with_half_empty_queue)345 TEST_F(QueueTest, register_dequeue_with_half_empty_queue) {
346   Queue<std::string> queue(kQueueSize);
347   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
348   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
349 
350   // make Queue half empty
351   for (int i = 0; i < kHalfOfQueueSize; i++) {
352     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
353     test_enqueue_end.buffer_.push(std::move(data));
354   }
355   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
356   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
357   auto enqueue_future = enqueue_promise_map[0].get_future();
358   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
359   enqueue_future.wait();
360   EXPECT_EQ(enqueue_future.get(), 0);
361 
362   // Register dequeue and expect data move to dequeue end buffer
363   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
364   dequeue_promise_map.emplace(
365       std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple());
366   auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
367   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
368   dequeue_future.wait();
369   EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
370 
371   test_dequeue_end.UnregisterDequeue();
372 }
373 
374 // Dynamic level test
375 
376 // Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked
377 
378 // Enqueue end level : 1 -> 0
379 // Dequeue end level : 1
380 // Test 4-1 Queue becomes full due to only register EnqueueCallback
TEST_F(QueueTest,queue_becomes_full_enqueue_callback_only)381 TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) {
382   Queue<std::string> queue(kQueueSize);
383   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
384 
385   // push double of kQueueSize to enqueue end buffer
386   for (int i = 0; i < kDoubleOfQueueSize; i++) {
387     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
388     test_enqueue_end.buffer_.push(std::move(data));
389   }
390 
391   // Register enqueue and expect kQueueSize data move to Queue
392   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
393   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
394   auto enqueue_future = enqueue_promise_map[kQueueSize].get_future();
395   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
396   enqueue_future.wait();
397   EXPECT_EQ(enqueue_future.get(), kQueueSize);
398 
399   // EnqueueCallback shouldn't be invoked and buffer size stay in kQueueSize
400   std::this_thread::sleep_for(std::chrono::milliseconds(20));
401   EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kQueueSize);
402   EXPECT_EQ(test_enqueue_end.count, kQueueSize);
403 
404   test_enqueue_end.UnregisterEnqueue();
405 }
406 
407 // Enqueue end level : 1 -> 0
408 // Dequeue end level : 1
409 // Test 4-2 Queue becomes full due to DequeueCallback unregister during test
TEST_F(QueueTest,queue_becomes_full_dequeue_callback_unregister)410 TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) {
411   Queue<std::string> queue(kQueueSize);
412   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
413   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);
414 
415   // push double of kQueueSize to enqueue end buffer
416   for (int i = 0; i < kDoubleOfQueueSize; i++) {
417     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
418     test_enqueue_end.buffer_.push(std::move(data));
419   }
420 
421   // Register dequeue
422   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
423   dequeue_promise_map.emplace(
424       std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple());
425   auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
426   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
427 
428   // Register enqueue
429   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
430   enqueue_promise_map.emplace(
431       std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple());
432   auto enqueue_future = enqueue_promise_map[kHalfOfQueueSize].get_future();
433   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
434 
435   // Dequeue end will unregister when buffer size is kHalfOfQueueSize
436   dequeue_future.wait();
437   EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
438 
439   // EnqueueCallback shouldn't be invoked and buffer size stay in kHalfOfQueueSize
440   enqueue_future.wait();
441   EXPECT_EQ(enqueue_future.get(), kHalfOfQueueSize);
442   std::this_thread::sleep_for(std::chrono::milliseconds(20));
443   EXPECT_EQ(test_enqueue_end.buffer_.size(), (size_t)kHalfOfQueueSize);
444   EXPECT_EQ(test_enqueue_end.count, kQueueSize + kHalfOfQueueSize);
445 
446   test_enqueue_end.UnregisterEnqueue();
447 }
448 
449 // Enqueue end level : 1 -> 0
450 // Dequeue end level : 1
451 // Test 4-3 Queue becomes full due to DequeueCallback is slower
TEST_F(QueueTest,queue_becomes_full_dequeue_callback_slower)452 TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) {
453   Queue<std::string> queue(kQueueSize);
454   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
455   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
456 
457   // push double of kDoubleOfQueueSize to enqueue end buffer
458   for (int i = 0; i < kDoubleOfQueueSize; i++) {
459     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
460     test_enqueue_end.buffer_.push(std::move(data));
461   }
462 
463   // Set 20 ms delay for callback and register dequeue
464   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
465   test_dequeue_end.setDelay(20);
466   auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
467   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
468 
469   // Register enqueue
470   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
471   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
472   auto enqueue_future = enqueue_promise_map[0].get_future();
473   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
474 
475   // Wait for enqueue buffer empty and expect queue is full
476   enqueue_future.wait();
477   EXPECT_EQ(enqueue_future.get(), 0);
478   EXPECT_GE(test_dequeue_end.buffer_.size(), kQueueSize - 1);
479 
480   test_dequeue_end.UnregisterDequeue();
481 }
482 
483 // Enqueue end level : 0 -> 1
484 // Dequeue end level : 1 -> 0
485 // Test 5 Queue becomes full and non empty at same time.
TEST_F(QueueTest,queue_becomes_full_and_non_empty_at_same_time)486 TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) {
487   Queue<std::string> queue(kQueueSizeOne);
488   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
489   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
490 
491   // push double of kQueueSize to enqueue end buffer
492   for (int i = 0; i < kQueueSize; i++) {
493     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
494     test_enqueue_end.buffer_.push(std::move(data));
495   }
496 
497   // Register dequeue
498   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
499   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
500   auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
501   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
502 
503   // Register enqueue
504   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
505   auto enqueue_future = enqueue_promise_map[0].get_future();
506   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
507 
508   // Wait for all data move from enqueue end buffer to dequeue end buffer
509   dequeue_future.wait();
510   EXPECT_EQ(dequeue_future.get(), kQueueSize);
511 
512   test_dequeue_end.UnregisterDequeue();
513 }
514 
515 // Enqueue end level : 1 -> 0
516 // Dequeue end level : 1
517 // Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked
TEST_F(QueueTest,queue_becomes_non_full_during_test)518 TEST_F(QueueTest, queue_becomes_non_full_during_test) {
519   Queue<std::string> queue(kQueueSize);
520   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
521   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3);
522 
523   // make Queue full
524   for (int i = 0; i < kDoubleOfQueueSize; i++) {
525     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
526     test_enqueue_end.buffer_.push(std::move(data));
527   }
528   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
529   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
530   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
531   auto enqueue_future = enqueue_promise_map[kQueueSize].get_future();
532   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
533   enqueue_future.wait();
534   EXPECT_EQ(enqueue_future.get(), kQueueSize);
535 
536   // Expect kQueueSize data block in enqueue end buffer
537   std::this_thread::sleep_for(std::chrono::milliseconds(20));
538   EXPECT_EQ(test_enqueue_end.buffer_.size(), kQueueSize);
539 
540   // Register dequeue
541   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
542   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
543 
544   // Expect enqueue end will empty
545   enqueue_future = enqueue_promise_map[0].get_future();
546   enqueue_future.wait();
547   EXPECT_EQ(enqueue_future.get(), 0);
548 
549   test_dequeue_end.UnregisterDequeue();
550 }
551 
552 // Enqueue end level : 0 -> 1
553 // Dequeue end level : 1 -> 0
554 // Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5)
TEST_F(QueueTest,queue_becomes_non_full_and_empty_at_same_time)555 TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) {
556   Queue<std::string> queue(kQueueSizeOne);
557   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
558   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);
559 
560   // push double of kQueueSize to enqueue end buffer
561   for (int i = 0; i < kQueueSize; i++) {
562     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
563     test_enqueue_end.buffer_.push(std::move(data));
564   }
565 
566   // Register dequeue
567   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
568   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
569   auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
570   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
571 
572   // Register enqueue
573   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
574   auto enqueue_future = enqueue_promise_map[0].get_future();
575   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
576 
577   // Wait for all data move from enqueue end buffer to dequeue end buffer
578   dequeue_future.wait();
579   EXPECT_EQ(dequeue_future.get(), kQueueSize);
580 
581   test_dequeue_end.UnregisterDequeue();
582 }
583 
584 // Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked
585 
586 // Enqueue end level : 1
587 // Dequeue end level : 1 -> 0
588 // Test 8-1 Queue becomes empty due to only register DequeueCallback
TEST_F(QueueTest,queue_becomes_empty_dequeue_callback_only)589 TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) {
590   Queue<std::string> queue(kQueueSize);
591   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
592   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kHalfOfQueueSize);
593 
594   // make Queue half empty
595   for (int i = 0; i < kHalfOfQueueSize; i++) {
596     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
597     test_enqueue_end.buffer_.push(std::move(data));
598   }
599   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
600   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
601   auto enqueue_future = enqueue_promise_map[0].get_future();
602   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
603   enqueue_future.wait();
604   EXPECT_EQ(enqueue_future.get(), 0);
605 
606   // Register dequeue, expect kHalfOfQueueSize data move to dequeue end buffer
607   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
608   dequeue_promise_map.emplace(
609       std::piecewise_construct, std::forward_as_tuple(kHalfOfQueueSize), std::forward_as_tuple());
610   auto dequeue_future = dequeue_promise_map[kHalfOfQueueSize].get_future();
611   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
612   dequeue_future.wait();
613   EXPECT_EQ(dequeue_future.get(), kHalfOfQueueSize);
614 
615   // Expect DequeueCallback should stop to be invoked
616   std::this_thread::sleep_for(std::chrono::milliseconds(20));
617   EXPECT_EQ(test_dequeue_end.count, kHalfOfQueueSize);
618 }
619 
620 // Enqueue end level : 1
621 // Dequeue end level : 1 -> 0
622 // Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test
TEST_F(QueueTest,queue_becomes_empty_enqueue_callback_unregister)623 TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) {
624   Queue<std::string> queue(kQueueSize);
625   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
626   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
627 
628   // make Queue half empty
629   for (int i = 0; i < kHalfOfQueueSize; i++) {
630     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
631     test_enqueue_end.buffer_.push(std::move(data));
632   }
633   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
634   enqueue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(0), std::forward_as_tuple());
635   auto enqueue_future = enqueue_promise_map[0].get_future();
636   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
637   enqueue_future.wait();
638   EXPECT_EQ(enqueue_future.get(), 0);
639 
640   // push kHalfOfQueueSize to enqueue end buffer and register enqueue.
641   for (int i = 0; i < kHalfOfQueueSize; i++) {
642     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
643     test_enqueue_end.buffer_.push(std::move(data));
644   }
645   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
646 
647   // Register dequeue, expect kQueueSize move to dequeue end buffer
648   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
649   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
650   auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
651   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
652   dequeue_future.wait();
653   EXPECT_EQ(dequeue_future.get(), kQueueSize);
654 
655   // Expect DequeueCallback should stop to be invoked
656   std::this_thread::sleep_for(std::chrono::milliseconds(20));
657   EXPECT_EQ(test_dequeue_end.count, kQueueSize);
658 }
659 
660 // Enqueue end level : 1
661 // Dequeue end level : 0 -> 1
662 // Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked
TEST_F(QueueTest,queue_becomes_non_empty_during_test)663 TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
664   Queue<std::string> queue(kQueueSize);
665   TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
666   TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);
667 
668   // Register dequeue
669   std::unordered_map<int, std::promise<int>> dequeue_promise_map;
670   dequeue_promise_map.emplace(std::piecewise_construct, std::forward_as_tuple(kQueueSize), std::forward_as_tuple());
671   test_dequeue_end.RegisterDequeue(&dequeue_promise_map);
672 
673   // push kQueueSize data to enqueue end buffer and register enqueue
674   for (int i = 0; i < kQueueSize; i++) {
675     std::unique_ptr<std::string> data = std::make_unique<std::string>(std::to_string(i));
676     test_enqueue_end.buffer_.push(std::move(data));
677   }
678   std::unordered_map<int, std::promise<int>> enqueue_promise_map;
679   test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
680 
681   // Expect kQueueSize data move to dequeue end buffer
682   auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
683   dequeue_future.wait();
684   EXPECT_EQ(dequeue_future.get(), kQueueSize);
685 }
686 
TEST_F(QueueTest,pass_smart_pointer_and_unregister)687 TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
688   Queue<std::string>* queue = new Queue<std::string>(kQueueSize);
689 
690   // Enqueue a string
691   std::string valid = "Valid String";
692   std::shared_ptr<std::string> shared = std::make_shared<std::string>(valid);
693   queue->RegisterEnqueue(
694       enqueue_handler_,
695       common::Bind(
696           [](Queue<std::string>* queue, std::shared_ptr<std::string> shared) {
697             queue->UnregisterEnqueue();
698             return std::make_unique<std::string>(*shared);
699           },
700           common::Unretained(queue),
701           shared));
702 
703   // Dequeue the string
704   queue->RegisterDequeue(
705       dequeue_handler_,
706       common::Bind(
707           [](Queue<std::string>* queue, std::string valid) {
708             queue->UnregisterDequeue();
709             auto answer = *queue->TryDequeue();
710             ASSERT_EQ(answer, valid);
711           },
712           common::Unretained(queue),
713           valid));
714 
715   // Wait for both handlers to finish and delete the Queue
716   std::promise<void> promise;
717   auto future = promise.get_future();
718 
719   enqueue_handler_->Post(common::BindOnce(
720       [](os::Handler* dequeue_handler, Queue<std::string>* queue, std::promise<void>* promise) {
721         dequeue_handler->Post(common::BindOnce(
722             [](Queue<std::string>* queue, std::promise<void>* promise) {
723               delete queue;
724               promise->set_value();
725             },
726             common::Unretained(queue),
727             common::Unretained(promise)));
728       },
729       common::Unretained(dequeue_handler_),
730       common::Unretained(queue),
731       common::Unretained(&promise)));
732   future.wait();
733 }
734 
sleep_and_enqueue_callback(int * to_increase)735 std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
736   std::this_thread::sleep_for(std::chrono::milliseconds(100));
737   (*to_increase)++;
738   return std::make_unique<std::string>("Hello");
739 }
740 
TEST_F(QueueTest,unregister_enqueue_and_wait)741 TEST_F(QueueTest, unregister_enqueue_and_wait) {
742   Queue<std::string> queue(10);
743   int* indicator = new int(100);
744   queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(indicator)));
745   std::this_thread::sleep_for(std::chrono::milliseconds(50));
746   queue.UnregisterEnqueue();
747   EXPECT_EQ(*indicator, 101);
748   delete indicator;
749 }
750 
sleep_and_enqueue_callback_and_unregister(int * to_increase,Queue<std::string> * queue,std::atomic_bool * is_registered)751 std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(
752     int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) {
753   std::this_thread::sleep_for(std::chrono::milliseconds(100));
754   (*to_increase)++;
755   if (is_registered->exchange(false)) {
756     queue->UnregisterEnqueue();
757   }
758   return std::make_unique<std::string>("Hello");
759 }
760 
TEST_F(QueueTest,unregister_enqueue_and_wait_maybe_unregistered)761 TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
762   Queue<std::string> queue(10);
763   int* indicator = new int(100);
764   std::atomic_bool is_registered = true;
765   queue.RegisterEnqueue(
766       enqueue_handler_,
767       common::Bind(
768           &sleep_and_enqueue_callback_and_unregister,
769           common::Unretained(indicator),
770           common::Unretained(&queue),
771           common::Unretained(&is_registered)));
772   std::this_thread::sleep_for(std::chrono::milliseconds(50));
773   if (is_registered.exchange(false)) {
774     queue.UnregisterEnqueue();
775   }
776   EXPECT_EQ(*indicator, 101);
777   delete indicator;
778 }
779 
sleep_and_dequeue_callback(int * to_increase)780 void sleep_and_dequeue_callback(int* to_increase) {
781   std::this_thread::sleep_for(std::chrono::milliseconds(100));
782   (*to_increase)++;
783 }
784 
TEST_F(QueueTest,unregister_dequeue_and_wait)785 TEST_F(QueueTest, unregister_dequeue_and_wait) {
786   int* indicator = new int(100);
787   Queue<std::string> queue(10);
788   queue.RegisterEnqueue(
789       enqueue_handler_,
790       common::Bind(
791           [](Queue<std::string>* queue) {
792             queue->UnregisterEnqueue();
793             return std::make_unique<std::string>("Hello");
794           },
795           common::Unretained(&queue)));
796   queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(indicator)));
797   std::this_thread::sleep_for(std::chrono::milliseconds(50));
798   queue.UnregisterDequeue();
799   EXPECT_EQ(*indicator, 101);
800   delete indicator;
801 }
802 
803 // Create all threads for death tests in the function that dies
804 class QueueDeathTest : public ::testing::Test {
805  public:
RegisterEnqueueAndDelete()806   void RegisterEnqueueAndDelete() {
807     Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL);
808     Handler* enqueue_handler = new Handler(enqueue_thread);
809     Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
810     queue->RegisterEnqueue(
811         enqueue_handler, common::Bind([]() { return std::make_unique<std::string>("A string to fill the queue"); }));
812     delete queue;
813   }
814 
RegisterDequeueAndDelete()815   void RegisterDequeueAndDelete() {
816     Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL);
817     Handler* dequeue_handler = new Handler(dequeue_thread);
818     Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
819     queue->RegisterDequeue(
820         dequeue_handler,
821         common::Bind([](Queue<std::string>* queue) { queue->TryDequeue(); }, common::Unretained(queue)));
822     delete queue;
823   }
824 };
825 
TEST_F(QueueDeathTest,die_if_enqueue_not_unregistered)826 TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) {
827   EXPECT_DEATH(RegisterEnqueueAndDelete(), "nqueue");
828 }
829 
TEST_F(QueueDeathTest,die_if_dequeue_not_unregistered)830 TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) {
831   EXPECT_DEATH(RegisterDequeueAndDelete(), "equeue");
832 }
833 
834 class MockIQueueEnqueue : public IQueueEnqueue<int> {
835  public:
RegisterEnqueue(Handler * handler,EnqueueCallback callback)836   void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override {
837     EXPECT_FALSE(registered_);
838     registered_ = true;
839     handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue, common::Unretained(this), callback));
840   }
841 
handle_register_enqueue(EnqueueCallback callback)842   void handle_register_enqueue(EnqueueCallback callback) {
843     if (dont_handle_register_enqueue_) {
844       return;
845     }
846     while (registered_) {
847       std::unique_ptr<int> front = callback.Run();
848       queue_.push(*front);
849     }
850   }
851 
UnregisterEnqueue()852   void UnregisterEnqueue() override {
853     EXPECT_TRUE(registered_);
854     registered_ = false;
855   }
856 
857   bool dont_handle_register_enqueue_ = false;
858   bool registered_ = false;
859   std::queue<int> queue_;
860 };
861 
862 class EnqueueBufferTest : public ::testing::Test {
863  protected:
SetUp()864   void SetUp() override {
865     thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
866     handler_ = new Handler(thread_);
867   }
868 
TearDown()869   void TearDown() override {
870     handler_->Clear();
871     delete handler_;
872     delete thread_;
873   }
874 
SynchronizeHandler()875   void SynchronizeHandler() {
876     std::promise<void> promise;
877     auto future = promise.get_future();
878     handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); }, std::move(promise)));
879     future.wait();
880   }
881 
882   MockIQueueEnqueue enqueue_;
883   EnqueueBuffer<int> enqueue_buffer_{&enqueue_};
884   Thread* thread_;
885   Handler* handler_;
886 };
887 
TEST_F(EnqueueBufferTest,enqueue)888 TEST_F(EnqueueBufferTest, enqueue) {
889   int num_items = 10;
890   for (int i = 0; i < num_items; i++) {
891     enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
892   }
893   SynchronizeHandler();
894   for (int i = 0; i < num_items; i++) {
895     ASSERT_EQ(enqueue_.queue_.front(), i);
896     enqueue_.queue_.pop();
897   }
898   ASSERT_FALSE(enqueue_.registered_);
899 }
900 
TEST_F(EnqueueBufferTest,clear)901 TEST_F(EnqueueBufferTest, clear) {
902   enqueue_.dont_handle_register_enqueue_ = true;
903   int num_items = 10;
904   for (int i = 0; i < num_items; i++) {
905     enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
906   }
907   ASSERT_TRUE(enqueue_.registered_);
908   enqueue_buffer_.Clear();
909   ASSERT_FALSE(enqueue_.registered_);
910 }
911 
TEST_F(EnqueueBufferTest,delete_when_in_callback)912 TEST_F(EnqueueBufferTest, delete_when_in_callback) {
913   Queue<int>* queue = new Queue<int>(kQueueSize);
914   EnqueueBuffer<int>* enqueue_buffer = new EnqueueBuffer<int>(queue);
915   int num_items = 10;
916   for (int i = 0; i < num_items; i++) {
917     enqueue_buffer->Enqueue(std::make_unique<int>(i), handler_);
918   }
919 
920   delete enqueue_buffer;
921   delete queue;
922 }
923 
924 }  // namespace
925 }  // namespace os
926 }  // namespace bluetooth
927