1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/queue.h"
18
19 #include <sys/eventfd.h>
20
21 #include <atomic>
22 #include <future>
23 #include <unordered_map>
24
25 #include "common/bind.h"
26 #include "gtest/gtest.h"
27 #include "os/reactor.h"
28
29 namespace bluetooth {
30 namespace os {
31 namespace {
32
// Capacities and element counts shared by the tests below.
constexpr int kQueueSize{10};
constexpr int kHalfOfQueueSize{kQueueSize / 2};
constexpr int kDoubleOfQueueSize{kQueueSize * 2};
// Capacity used by the tests that need a queue holding a single element.
constexpr int kQueueSizeOne{1};
37
38 class QueueTest : public ::testing::Test {
39 protected:
SetUp()40 void SetUp() override {
41 enqueue_thread_ = new Thread("enqueue_thread", Thread::Priority::NORMAL);
42 enqueue_handler_ = new Handler(enqueue_thread_);
43 dequeue_thread_ = new Thread("dequeue_thread", Thread::Priority::NORMAL);
44 dequeue_handler_ = new Handler(dequeue_thread_);
45 }
TearDown()46 void TearDown() override {
47 enqueue_handler_->Clear();
48 delete enqueue_handler_;
49 delete enqueue_thread_;
50 dequeue_handler_->Clear();
51 delete dequeue_handler_;
52 delete dequeue_thread_;
53 enqueue_handler_ = nullptr;
54 enqueue_thread_ = nullptr;
55 dequeue_handler_ = nullptr;
56 dequeue_thread_ = nullptr;
57 }
58
59 Thread* enqueue_thread_;
60 Handler* enqueue_handler_;
61 Thread* dequeue_thread_;
62 Handler* dequeue_handler_;
63 };
64
65 class TestEnqueueEnd {
66 public:
TestEnqueueEnd(Queue<std::string> * queue,Handler * handler)67 explicit TestEnqueueEnd(Queue<std::string>* queue, Handler* handler)
68 : count(0), handler_(handler), queue_(queue), delay_(0) {}
69
~TestEnqueueEnd()70 ~TestEnqueueEnd() {}
71
RegisterEnqueue(std::unordered_map<int,std::promise<int>> * promise_map)72 void RegisterEnqueue(std::unordered_map<int, std::promise<int>>* promise_map) {
73 promise_map_ = promise_map;
74 handler_->Post(common::BindOnce(&TestEnqueueEnd::handle_register_enqueue, common::Unretained(this)));
75 }
76
UnregisterEnqueue()77 void UnregisterEnqueue() {
78 std::promise<void> promise;
79 auto future = promise.get_future();
80
81 handler_->Post(
82 common::BindOnce(&TestEnqueueEnd::handle_unregister_enqueue, common::Unretained(this), std::move(promise)));
83 future.wait();
84 }
85
EnqueueCallbackForTest()86 std::unique_ptr<std::string> EnqueueCallbackForTest() {
87 if (delay_ != 0) {
88 std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
89 }
90
91 count++;
92 std::unique_ptr<std::string> data = std::move(buffer_.front());
93 buffer_.pop();
94 std::string copy = *data;
95 if (buffer_.empty()) {
96 queue_->UnregisterEnqueue();
97 }
98
99 auto pair = promise_map_->find(buffer_.size());
100 if (pair != promise_map_->end()) {
101 pair->second.set_value(pair->first);
102 promise_map_->erase(pair->first);
103 }
104 return data;
105 }
106
setDelay(int value)107 void setDelay(int value) {
108 delay_ = value;
109 }
110
111 std::queue<std::unique_ptr<std::string>> buffer_;
112 int count;
113
114 private:
115 Handler* handler_;
116 Queue<std::string>* queue_;
117 std::unordered_map<int, std::promise<int>>* promise_map_;
118 int delay_;
119
handle_register_enqueue()120 void handle_register_enqueue() {
121 queue_->RegisterEnqueue(handler_, common::Bind(&TestEnqueueEnd::EnqueueCallbackForTest, common::Unretained(this)));
122 }
123
handle_unregister_enqueue(std::promise<void> promise)124 void handle_unregister_enqueue(std::promise<void> promise) {
125 queue_->UnregisterEnqueue();
126 promise.set_value();
127 }
128 };
129
130 class TestDequeueEnd {
131 public:
TestDequeueEnd(Queue<std::string> * queue,Handler * handler,int capacity)132 explicit TestDequeueEnd(Queue<std::string>* queue, Handler* handler, int capacity)
133 : count(0), handler_(handler), queue_(queue), capacity_(capacity), delay_(0) {}
134
~TestDequeueEnd()135 ~TestDequeueEnd() {}
136
RegisterDequeue(std::unordered_map<int,std::promise<int>> * promise_map)137 void RegisterDequeue(std::unordered_map<int, std::promise<int>>* promise_map) {
138 promise_map_ = promise_map;
139 handler_->Post(common::BindOnce(&TestDequeueEnd::handle_register_dequeue, common::Unretained(this)));
140 }
141
UnregisterDequeue()142 void UnregisterDequeue() {
143 std::promise<void> promise;
144 auto future = promise.get_future();
145
146 handler_->Post(
147 common::BindOnce(&TestDequeueEnd::handle_unregister_dequeue, common::Unretained(this), std::move(promise)));
148 future.wait();
149 }
150
DequeueCallbackForTest()151 void DequeueCallbackForTest() {
152 if (delay_ != 0) {
153 std::this_thread::sleep_for(std::chrono::milliseconds(delay_));
154 }
155
156 count++;
157 std::unique_ptr<std::string> data = queue_->TryDequeue();
158 buffer_.push(std::move(data));
159
160 if (buffer_.size() == capacity_) {
161 queue_->UnregisterDequeue();
162 }
163
164 auto pair = promise_map_->find(buffer_.size());
165 if (pair != promise_map_->end()) {
166 pair->second.set_value(pair->first);
167 promise_map_->erase(pair->first);
168 }
169 }
170
setDelay(int value)171 void setDelay(int value) {
172 delay_ = value;
173 }
174
175 std::queue<std::unique_ptr<std::string>> buffer_;
176 int count;
177
178 private:
179 Handler* handler_;
180 Queue<std::string>* queue_;
181 std::unordered_map<int, std::promise<int>>* promise_map_;
182 int capacity_;
183 int delay_;
184
handle_register_dequeue()185 void handle_register_dequeue() {
186 queue_->RegisterDequeue(handler_, common::Bind(&TestDequeueEnd::DequeueCallbackForTest, common::Unretained(this)));
187 }
188
handle_unregister_dequeue(std::promise<void> promise)189 void handle_unregister_dequeue(std::promise<void> promise) {
190 queue_->UnregisterDequeue();
191 promise.set_value();
192 }
193 };
194
// Enqueue end level : 0 -> queue is full, 1 -> queue isn't full
// Dequeue end level : 0 -> queue is empty, 1 -> queue isn't empty
197
198 // Test 1 : Queue is empty
199
200 // Enqueue end level : 1
201 // Dequeue end level : 0
202 // Test 1-1 EnqueueCallback should continually be invoked when queue isn't full
TEST_F(QueueTest, register_enqueue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd producer(&queue, enqueue_handler_);

  // Stage kQueueSize items in the producer's buffer.
  for (int i = 0; i < kQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  EXPECT_EQ(producer.buffer_.size(), static_cast<size_t>(kQueueSize));

  // After registration the whole buffer should move into the queue; the promise keyed by 0
  // is fulfilled once the producer's buffer is empty.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  producer.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);
  // NOTE(review): presumably gives the enqueue thread time to leave the last callback before
  // the locals above are destroyed -- confirm.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
223
224 // Enqueue end level : 1
225 // Dequeue end level : 0
226 // Test 1-2 DequeueCallback shouldn't be invoked when queue is empty
TEST_F(QueueTest, register_dequeue_with_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestDequeueEnd consumer(&queue, dequeue_handler_, kQueueSize);

  // Nothing was ever enqueued, so the dequeue callback must not have run after a short grace period.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  consumer.RegisterDequeue(&dequeue_promises);
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(consumer.count, 0);

  consumer.UnregisterDequeue();
}
239
240 // Test 2 : Queue is full
241
242 // Enqueue end level : 0
243 // Dequeue end level : 1
244 // Test 2-1 EnqueueCallback shouldn't be invoked when queue is full
TEST_F(QueueTest, register_enqueue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd producer(&queue, enqueue_handler_);

  // Fill the queue completely: stage kQueueSize items and wait until the buffer is drained.
  for (int i = 0; i < kQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  producer.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage more items and register again while the queue is still full.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  producer.RegisterEnqueue(&enqueue_promises);

  // The enqueue callback must not run: the new items stay staged and the call count is unchanged.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(producer.buffer_.size(), static_cast<size_t>(kHalfOfQueueSize));
  EXPECT_EQ(producer.count, kQueueSize);

  producer.UnregisterEnqueue();
}
275
276 // Enqueue end level : 0
277 // Dequeue end level : 1
278 // Test 2-2 DequeueCallback should continually be invoked when queue isn't empty
TEST_F(QueueTest, register_dequeue_with_full_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd producer(&queue, enqueue_handler_);
  TestDequeueEnd consumer(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Fill the queue completely: stage kQueueSize items and wait until the buffer is drained.
  for (int i = 0; i < kQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  producer.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Registering the consumer should move all kQueueSize items into its buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_received = dequeue_promises[kQueueSize].get_future();
  consumer.RegisterDequeue(&dequeue_promises);
  all_received.wait();
  EXPECT_EQ(all_received.get(), kQueueSize);

  consumer.UnregisterDequeue();
}
306
307 // Test 3 : Queue is non-empty and non-full
308
309 // Enqueue end level : 1
310 // Dequeue end level : 1
311 // Test 3-1 Register enqueue with half empty queue, EnqueueCallback should continually be invoked
TEST_F(QueueTest, register_enqueue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd producer(&queue, enqueue_handler_);

  // Fill half of the queue: stage kHalfOfQueueSize items and wait until the buffer is drained.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  producer.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage another kHalfOfQueueSize items.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register again with a fresh promise for "buffer empty"; the remaining free space in the
  // queue should take all of the newly staged items.
  buffer_drained = enqueue_promises[0].get_future();
  producer.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);
}
341
342 // Enqueue end level : 1
343 // Dequeue end level : 1
344 // Test 3-2 Register dequeue with half empty queue, DequeueCallback should continually be invoked
TEST_F(QueueTest, register_dequeue_with_half_empty_queue) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd producer(&queue, enqueue_handler_);
  TestDequeueEnd consumer(&queue, dequeue_handler_, kQueueSize);

  // Fill half of the queue: stage kHalfOfQueueSize items and wait until the buffer is drained.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  producer.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Registering the consumer should move the kHalfOfQueueSize queued items into its buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_received = dequeue_promises[kHalfOfQueueSize].get_future();
  consumer.RegisterDequeue(&dequeue_promises);
  half_received.wait();
  EXPECT_EQ(half_received.get(), kHalfOfQueueSize);

  consumer.UnregisterDequeue();
}
373
374 // Dynamic level test
375
376 // Test 4 : Queue becomes full during test, EnqueueCallback should stop to be invoked
377
378 // Enqueue end level : 1 -> 0
379 // Dequeue end level : 1
380 // Test 4-1 Queue becomes full due to only register EnqueueCallback
TEST_F(QueueTest, queue_becomes_full_enqueue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd producer(&queue, enqueue_handler_);

  // Stage twice as many items as the queue can hold.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // The promise keyed by kQueueSize fires when kQueueSize items are left in the buffer,
  // i.e. when kQueueSize items have been handed to the queue.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto queue_filled = enqueue_promises[kQueueSize].get_future();
  producer.RegisterEnqueue(&enqueue_promises);
  queue_filled.wait();
  EXPECT_EQ(queue_filled.get(), kQueueSize);

  // With the queue full, the callback must not run again: buffer size and call count stay put.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(producer.buffer_.size(), static_cast<size_t>(kQueueSize));
  EXPECT_EQ(producer.count, kQueueSize);

  producer.UnregisterEnqueue();
}
406
407 // Enqueue end level : 1 -> 0
408 // Dequeue end level : 1
409 // Test 4-2 Queue becomes full due to DequeueCallback unregister during test
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd producer(&queue, enqueue_handler_);
  TestDequeueEnd consumer(&queue, dequeue_handler_, kHalfOfQueueSize);

  // Stage twice as many items as the queue can hold.
  for (int i = 0; i < kDoubleOfQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register the consumer first; it unregisters itself after receiving kHalfOfQueueSize items.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_received = dequeue_promises[kHalfOfQueueSize].get_future();
  consumer.RegisterDequeue(&dequeue_promises);

  // Register the producer; its promise fires when kHalfOfQueueSize items are left staged.
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto half_remaining = enqueue_promises[kHalfOfQueueSize].get_future();
  producer.RegisterEnqueue(&enqueue_promises);

  half_received.wait();
  EXPECT_EQ(half_received.get(), kHalfOfQueueSize);

  // Once the consumer is gone and the queue is full again, the enqueue callback must stop:
  // kHalfOfQueueSize items remain staged after kQueueSize + kHalfOfQueueSize invocations.
  half_remaining.wait();
  EXPECT_EQ(half_remaining.get(), kHalfOfQueueSize);
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(producer.buffer_.size(), static_cast<size_t>(kHalfOfQueueSize));
  EXPECT_EQ(producer.count, kQueueSize + kHalfOfQueueSize);

  producer.UnregisterEnqueue();
}
448
449 // Enqueue end level : 1 -> 0
450 // Dequeue end level : 1
451 // Test 4-3 Queue becomes full due to DequeueCallback is slower
TEST_F(QueueTest, queue_becomes_full_dequeue_callback_slower) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Push kDoubleOfQueueSize items to the enqueue end buffer
  for (int i = 0; i < kDoubleOfQueueSize; i++) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Set a 20 ms delay for the dequeue callback and register dequeue. No dequeue-side promise
  // is needed: this test only synchronizes on the enqueue end.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.setDelay(20);
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register enqueue; the promise keyed by 0 fires when the enqueue end buffer is empty
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  auto enqueue_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Wait for the enqueue buffer to become empty. All kDoubleOfQueueSize items have then been
  // handed to a queue that holds at most kQueueSize, so the slow dequeue end is expected to
  // have received at least kQueueSize - 1 of them.
  enqueue_future.wait();
  EXPECT_EQ(enqueue_future.get(), 0);
  // Cast keeps the comparison unsigned-vs-unsigned, matching the size checks in the other tests.
  EXPECT_GE(test_dequeue_end.buffer_.size(), static_cast<size_t>(kQueueSize - 1));

  test_dequeue_end.UnregisterDequeue();
}
482
// Enqueue end level : 1 -> 0
// Dequeue end level : 0 -> 1
// Test 5 Queue becomes full and non empty at same time.
TEST_F(QueueTest, queue_becomes_full_and_non_empty_at_same_time) {
  // A queue of capacity one is full as soon as it is non-empty.
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Push kQueueSize items to the enqueue end buffer
  for (int i = 0; i < kQueueSize; i++) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register dequeue; the promise keyed by kQueueSize fires when every item has arrived
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register enqueue. No enqueue-side promise is needed: the test only waits on the dequeue end.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Wait for all data to move from the enqueue end buffer to the dequeue end buffer
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}
514
// Enqueue end level : 0 -> 1
// Dequeue end level : 1
// Test 6 Queue becomes not full during test, EnqueueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_full_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize * 3);

  // Stage kDoubleOfQueueSize items so the queue fills up with kQueueSize items still staged
  for (int i = 0; i < kDoubleOfQueueSize; i++) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Obtain BOTH futures before registering. Once the enqueue end is registered, its callback
  // looks up and erases entries of this map on the enqueue thread, so the map must not be
  // touched from the test thread any more; a late operator[] could also silently create a new,
  // never-fulfilled promise if the original one had already been consumed.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  auto queue_full_future = enqueue_promise_map[kQueueSize].get_future();
  auto buffer_empty_future = enqueue_promise_map[0].get_future();
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);
  queue_full_future.wait();
  EXPECT_EQ(queue_full_future.get(), kQueueSize);

  // Expect kQueueSize items to stay blocked in the enqueue end buffer while the queue is full
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(test_enqueue_end.buffer_.size(), static_cast<size_t>(kQueueSize));

  // Register dequeue, which makes room in the queue again
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Expect the enqueue end buffer to drain completely
  buffer_empty_future.wait();
  EXPECT_EQ(buffer_empty_future.get(), 0);

  test_dequeue_end.UnregisterDequeue();
}
551
552 // Enqueue end level : 0 -> 1
553 // Dequeue end level : 1 -> 0
554 // Test 7 Queue becomes non full and empty at same time. (Exactly same as Test 5)
TEST_F(QueueTest, queue_becomes_non_full_and_empty_at_same_time) {
  // A queue of capacity one becomes empty as soon as it stops being full.
  Queue<std::string> queue(kQueueSizeOne);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kDoubleOfQueueSize);

  // Push kQueueSize items to the enqueue end buffer
  for (int i = 0; i < kQueueSize; i++) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }

  // Register dequeue; the promise keyed by kQueueSize fires when every item has arrived
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Register enqueue. No enqueue-side promise is needed: the test only waits on the dequeue end.
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Wait for all data to move from the enqueue end buffer to the dequeue end buffer
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);

  test_dequeue_end.UnregisterDequeue();
}
583
584 // Test 8 : Queue becomes empty during test, DequeueCallback should stop to be invoked
585
586 // Enqueue end level : 1
587 // Dequeue end level : 1 -> 0
588 // Test 8-1 Queue becomes empty due to only register DequeueCallback
TEST_F(QueueTest, queue_becomes_empty_dequeue_callback_only) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd producer(&queue, enqueue_handler_);
  TestDequeueEnd consumer(&queue, dequeue_handler_, kHalfOfQueueSize);

  // Fill half of the queue: stage kHalfOfQueueSize items and wait until the buffer is drained.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  producer.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Registering the consumer should move the kHalfOfQueueSize queued items into its buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto half_received = dequeue_promises[kHalfOfQueueSize].get_future();
  consumer.RegisterDequeue(&dequeue_promises);
  half_received.wait();
  EXPECT_EQ(half_received.get(), kHalfOfQueueSize);

  // Afterwards the dequeue callback must not be invoked any more.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(consumer.count, kHalfOfQueueSize);
}
619
620 // Enqueue end level : 1
621 // Dequeue end level : 1 -> 0
622 // Test 8-2 Queue becomes empty due to EnqueueCallback unregister during test
TEST_F(QueueTest, queue_becomes_empty_enqueue_callback_unregister) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd producer(&queue, enqueue_handler_);
  TestDequeueEnd consumer(&queue, dequeue_handler_, kQueueSize);

  // Fill half of the queue: stage kHalfOfQueueSize items and wait until the buffer is drained.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promises;
  auto buffer_drained = enqueue_promises[0].get_future();
  producer.RegisterEnqueue(&enqueue_promises);
  buffer_drained.wait();
  EXPECT_EQ(buffer_drained.get(), 0);

  // Stage another kHalfOfQueueSize items and register the producer again.
  for (int i = 0; i < kHalfOfQueueSize; ++i) {
    producer.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  producer.RegisterEnqueue(&enqueue_promises);

  // Registering the consumer should bring all kQueueSize items into its buffer.
  std::unordered_map<int, std::promise<int>> dequeue_promises;
  auto all_received = dequeue_promises[kQueueSize].get_future();
  consumer.RegisterDequeue(&dequeue_promises);
  all_received.wait();
  EXPECT_EQ(all_received.get(), kQueueSize);

  // Afterwards the dequeue callback must not be invoked any more.
  std::this_thread::sleep_for(std::chrono::milliseconds(20));
  EXPECT_EQ(consumer.count, kQueueSize);
}
659
660 // Enqueue end level : 1
661 // Dequeue end level : 0 -> 1
662 // Test 9 Queue becomes not empty during test, DequeueCallback should start to be invoked
TEST_F(QueueTest, queue_becomes_non_empty_during_test) {
  Queue<std::string> queue(kQueueSize);
  TestEnqueueEnd test_enqueue_end(&queue, enqueue_handler_);
  TestDequeueEnd test_dequeue_end(&queue, dequeue_handler_, kQueueSize);

  // Obtain the future BEFORE registering. Once the dequeue end is registered, its callback
  // looks up and erases entries of this map on the dequeue thread, so the map must not be
  // touched from the test thread any more; a late operator[] could also silently create a new,
  // never-fulfilled promise if the original one had already been consumed.
  std::unordered_map<int, std::promise<int>> dequeue_promise_map;
  auto dequeue_future = dequeue_promise_map[kQueueSize].get_future();
  test_dequeue_end.RegisterDequeue(&dequeue_promise_map);

  // Push kQueueSize items to the enqueue end buffer and register enqueue
  for (int i = 0; i < kQueueSize; i++) {
    test_enqueue_end.buffer_.push(std::make_unique<std::string>(std::to_string(i)));
  }
  std::unordered_map<int, std::promise<int>> enqueue_promise_map;
  test_enqueue_end.RegisterEnqueue(&enqueue_promise_map);

  // Expect all kQueueSize items to arrive in the dequeue end buffer
  dequeue_future.wait();
  EXPECT_EQ(dequeue_future.get(), kQueueSize);
}
686
TEST_F(QueueTest, pass_smart_pointer_and_unregister) {
  // Heap-allocated because the queue is deleted from a task posted to the dequeue handler below.
  Queue<std::string>* queue = new Queue<std::string>(kQueueSize);

  // Producer: unregisters itself on its first invocation and enqueues a copy of the shared string.
  std::string expected = "Valid String";
  std::shared_ptr<std::string> payload = std::make_shared<std::string>(expected);
  queue->RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          [](Queue<std::string>* q, std::shared_ptr<std::string> text) {
            q->UnregisterEnqueue();
            return std::make_unique<std::string>(*text);
          },
          common::Unretained(queue),
          payload));

  // Consumer: unregisters itself on its first invocation and checks the dequeued string.
  queue->RegisterDequeue(
      dequeue_handler_,
      common::Bind(
          [](Queue<std::string>* q, std::string want) {
            q->UnregisterDequeue();
            auto got = *q->TryDequeue();
            ASSERT_EQ(got, want);
          },
          common::Unretained(queue),
          expected));

  // Chain a task through the enqueue handler into the dequeue handler; the inner task deletes
  // the queue and then releases the test thread.
  std::promise<void> done;
  auto done_future = done.get_future();

  enqueue_handler_->Post(common::BindOnce(
      [](os::Handler* dequeue_handler, Queue<std::string>* q, std::promise<void>* finished) {
        dequeue_handler->Post(common::BindOnce(
            [](Queue<std::string>* inner_q, std::promise<void>* inner_finished) {
              delete inner_q;
              inner_finished->set_value();
            },
            common::Unretained(q),
            common::Unretained(finished)));
      },
      common::Unretained(dequeue_handler_),
      common::Unretained(queue),
      common::Unretained(&done)));
  done_future.wait();
}
734
// Slow enqueue callback: sleeps for 100 ms, increments |*to_increase| and produces "Hello".
std::unique_ptr<std::string> sleep_and_enqueue_callback(int* to_increase) {
  const std::chrono::milliseconds callback_duration(100);
  std::this_thread::sleep_for(callback_duration);
  ++*to_increase;
  return std::make_unique<std::string>("Hello");
}
740
// UnregisterEnqueue() called from another thread is expected to return only after the enqueue
// callback that is currently running has finished.
TEST_F(QueueTest, unregister_enqueue_and_wait) {
  Queue<std::string> queue(10);
  // Automatic storage instead of new/delete: the callback only borrows the pointer, and the
  // value outlives the registration because UnregisterEnqueue() is called before it goes out
  // of scope.
  int indicator = 100;
  queue.RegisterEnqueue(enqueue_handler_, common::Bind(&sleep_and_enqueue_callback, common::Unretained(&indicator)));
  // The callback sleeps for 100 ms, so after 50 ms it is expected to still be in progress.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterEnqueue();
  // Exactly one callback ran to completion before UnregisterEnqueue() returned.
  EXPECT_EQ(indicator, 101);
}
750
sleep_and_enqueue_callback_and_unregister(int * to_increase,Queue<std::string> * queue,std::atomic_bool * is_registered)751 std::unique_ptr<std::string> sleep_and_enqueue_callback_and_unregister(
752 int* to_increase, Queue<std::string>* queue, std::atomic_bool* is_registered) {
753 std::this_thread::sleep_for(std::chrono::milliseconds(100));
754 (*to_increase)++;
755 if (is_registered->exchange(false)) {
756 queue->UnregisterEnqueue();
757 }
758 return std::make_unique<std::string>("Hello");
759 }
760
// Same scenario as a plain unregister-and-wait, except that the callback itself may also unregister: the test body
// and the callback both do is_registered.exchange(false), and only the one that gets back true calls
// UnregisterEnqueue(). Either way the callback's increment is expected to be visible afterwards.
TEST_F(QueueTest, unregister_enqueue_and_wait_maybe_unregistered) {
  // Automatic storage instead of new/delete; declared first so that it outlives |queue|.
  int indicator = 100;
  Queue<std::string> queue(10);
  std::atomic_bool is_registered = true;
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          &sleep_and_enqueue_callback_and_unregister,
          common::Unretained(&indicator),
          common::Unretained(&queue),
          common::Unretained(&is_registered)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  if (is_registered.exchange(false)) {
    queue.UnregisterEnqueue();
  }
  EXPECT_EQ(indicator, 101);
}
779
// Deliberately slow dequeue callback: blocks the calling thread for 100 ms and then adds one to the counter behind
// |to_increase|. It does not take anything out of a queue itself.
void sleep_and_dequeue_callback(int* to_increase) {
  const auto callback_duration = std::chrono::milliseconds(100);
  std::this_thread::sleep_for(callback_duration);
  ++*to_increase;
}
784
// Puts a single element into the queue, registers a dequeue callback that needs 100 ms to finish, unregisters it
// after only 50 ms, and expects the callback's increment to be visible as soon as UnregisterDequeue() has returned.
TEST_F(QueueTest, unregister_dequeue_and_wait) {
  // Automatic storage instead of new/delete; declared before |queue| so that it outlives it.
  int indicator = 100;
  Queue<std::string> queue(10);
  // One-shot enqueue: the callback unregisters itself and supplies exactly one element.
  queue.RegisterEnqueue(
      enqueue_handler_,
      common::Bind(
          [](Queue<std::string>* queue) {
            queue->UnregisterEnqueue();
            return std::make_unique<std::string>("Hello");
          },
          common::Unretained(&queue)));
  // NOTE(review): the dequeue callback is registered on enqueue_handler_ rather than dequeue_handler_; kept exactly
  // as it was - confirm that sharing the handler is intended.
  queue.RegisterDequeue(enqueue_handler_, common::Bind(&sleep_and_dequeue_callback, common::Unretained(&indicator)));
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  queue.UnregisterDequeue();
  EXPECT_EQ(indicator, 101);
}
802
803 // Create all threads for death tests in the function that dies
804 class QueueDeathTest : public ::testing::Test {
805 public:
RegisterEnqueueAndDelete()806 void RegisterEnqueueAndDelete() {
807 Thread* enqueue_thread = new Thread("enqueue_thread", Thread::Priority::NORMAL);
808 Handler* enqueue_handler = new Handler(enqueue_thread);
809 Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
810 queue->RegisterEnqueue(
811 enqueue_handler, common::Bind([]() { return std::make_unique<std::string>("A string to fill the queue"); }));
812 delete queue;
813 }
814
RegisterDequeueAndDelete()815 void RegisterDequeueAndDelete() {
816 Thread* dequeue_thread = new Thread("dequeue_thread", Thread::Priority::NORMAL);
817 Handler* dequeue_handler = new Handler(dequeue_thread);
818 Queue<std::string>* queue = new Queue<std::string>(kQueueSizeOne);
819 queue->RegisterDequeue(
820 dequeue_handler,
821 common::Bind([](Queue<std::string>* queue) { queue->TryDequeue(); }, common::Unretained(queue)));
822 delete queue;
823 }
824 };
825
// Deleting a queue that still has an enqueue callback registered must terminate the process with output that
// matches the pattern below.
TEST_F(QueueDeathTest, die_if_enqueue_not_unregistered) {
  // The leading letter is left out of the pattern, presumably so that it matches any capitalization - TODO confirm.
  constexpr char kExpectedOutputPattern[] = "nqueue";
  EXPECT_DEATH(RegisterEnqueueAndDelete(), kExpectedOutputPattern);
}
829
// Deleting a queue that still has a dequeue callback registered must terminate the process with output that
// matches the pattern below.
TEST_F(QueueDeathTest, die_if_dequeue_not_unregistered) {
  // The leading letter is left out of the pattern, presumably so that it matches any capitalization - TODO confirm.
  constexpr char kExpectedOutputPattern[] = "equeue";
  EXPECT_DEATH(RegisterDequeueAndDelete(), kExpectedOutputPattern);
}
833
834 class MockIQueueEnqueue : public IQueueEnqueue<int> {
835 public:
RegisterEnqueue(Handler * handler,EnqueueCallback callback)836 void RegisterEnqueue(Handler* handler, EnqueueCallback callback) override {
837 EXPECT_FALSE(registered_);
838 registered_ = true;
839 handler->Post(common::BindOnce(&MockIQueueEnqueue::handle_register_enqueue, common::Unretained(this), callback));
840 }
841
handle_register_enqueue(EnqueueCallback callback)842 void handle_register_enqueue(EnqueueCallback callback) {
843 if (dont_handle_register_enqueue_) {
844 return;
845 }
846 while (registered_) {
847 std::unique_ptr<int> front = callback.Run();
848 queue_.push(*front);
849 }
850 }
851
UnregisterEnqueue()852 void UnregisterEnqueue() override {
853 EXPECT_TRUE(registered_);
854 registered_ = false;
855 }
856
857 bool dont_handle_register_enqueue_ = false;
858 bool registered_ = false;
859 std::queue<int> queue_;
860 };
861
862 class EnqueueBufferTest : public ::testing::Test {
863 protected:
SetUp()864 void SetUp() override {
865 thread_ = new Thread("test_thread", Thread::Priority::NORMAL);
866 handler_ = new Handler(thread_);
867 }
868
TearDown()869 void TearDown() override {
870 handler_->Clear();
871 delete handler_;
872 delete thread_;
873 }
874
SynchronizeHandler()875 void SynchronizeHandler() {
876 std::promise<void> promise;
877 auto future = promise.get_future();
878 handler_->Post(common::BindOnce([](std::promise<void> promise) { promise.set_value(); }, std::move(promise)));
879 future.wait();
880 }
881
882 MockIQueueEnqueue enqueue_;
883 EnqueueBuffer<int> enqueue_buffer_{&enqueue_};
884 Thread* thread_;
885 Handler* handler_;
886 };
887
// Enqueues ten integers through the buffer and checks that the mock received exactly those values, in order, and
// that the buffer has unregistered itself from the mock afterwards.
TEST_F(EnqueueBufferTest, enqueue) {
  int num_items = 10;
  for (int i = 0; i < num_items; i++) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(i), handler_);
  }
  SynchronizeHandler();
  // front()/pop() on an empty std::queue is undefined behaviour, so verify the element count before reading.
  ASSERT_EQ(enqueue_.queue_.size(), static_cast<size_t>(num_items));
  for (int i = 0; i < num_items; i++) {
    ASSERT_EQ(enqueue_.queue_.front(), i);
    enqueue_.queue_.pop();
  }
  // Nothing beyond the enqueued items may have reached the mock.
  ASSERT_TRUE(enqueue_.queue_.empty());
  ASSERT_FALSE(enqueue_.registered_);
}
900
// With the mock told not to consume anything, enqueuing leaves the mock registered; Clear() on the buffer must
// then unregister it.
TEST_F(EnqueueBufferTest, clear) {
  constexpr int kItemCount = 10;
  enqueue_.dont_handle_register_enqueue_ = true;
  for (int value = 0; value != kItemCount; ++value) {
    enqueue_buffer_.Enqueue(std::make_unique<int>(value), handler_);
  }
  ASSERT_TRUE(enqueue_.registered_);

  enqueue_buffer_.Clear();

  ASSERT_FALSE(enqueue_.registered_);
}
911
// Enqueues ten integers into a buffer backed by a real Queue<int> and then immediately deletes the buffer followed
// by the queue, without waiting for the handler. The test has no explicit assertions.
TEST_F(EnqueueBufferTest, delete_when_in_callback) {
  auto* backing_queue = new Queue<int>(kQueueSize);
  auto* buffer = new EnqueueBuffer<int>(backing_queue);
  constexpr int kItemCount = 10;
  for (int value = 0; value != kItemCount; ++value) {
    buffer->Enqueue(std::make_unique<int>(value), handler_);
  }

  // The buffer is deleted before the queue it was constructed with.
  delete buffer;
  delete backing_queue;
}
923
924 } // namespace
925 } // namespace os
926 } // namespace bluetooth
927