1 /*
2  *  Copyright 2019 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 #include "api/task_queue/task_queue_test.h"
11 
12 #include "absl/memory/memory.h"
13 #include "absl/strings/string_view.h"
14 #include "rtc_base/event.h"
15 #include "rtc_base/ref_counter.h"
16 #include "rtc_base/task_utils/to_queued_task.h"
17 #include "rtc_base/time_utils.h"
18 
19 namespace webrtc {
20 namespace {
21 
CreateTaskQueue(const std::unique_ptr<webrtc::TaskQueueFactory> & factory,absl::string_view task_queue_name,TaskQueueFactory::Priority priority=TaskQueueFactory::Priority::NORMAL)22 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
23     const std::unique_ptr<webrtc::TaskQueueFactory>& factory,
24     absl::string_view task_queue_name,
25     TaskQueueFactory::Priority priority = TaskQueueFactory::Priority::NORMAL) {
26   return factory->CreateTaskQueue(task_queue_name, priority);
27 }
28 
TEST_P(TaskQueueTest,Construct)29 TEST_P(TaskQueueTest, Construct) {
30   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
31   auto queue = CreateTaskQueue(factory, "Construct");
32   EXPECT_FALSE(queue->IsCurrent());
33 }
34 
TEST_P(TaskQueueTest,PostAndCheckCurrent)35 TEST_P(TaskQueueTest, PostAndCheckCurrent) {
36   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
37   rtc::Event event;
38   auto queue = CreateTaskQueue(factory, "PostAndCheckCurrent");
39 
40   // We're not running a task, so |queue| shouldn't be current.
41   // Note that because rtc::Thread also supports the TQ interface and
42   // TestMainImpl::Init wraps the main test thread (bugs.webrtc.org/9714), that
43   // means that TaskQueueBase::Current() will still return a valid value.
44   EXPECT_FALSE(queue->IsCurrent());
45 
46   queue->PostTask(ToQueuedTask([&event, &queue] {
47     EXPECT_TRUE(queue->IsCurrent());
48     event.Set();
49   }));
50   EXPECT_TRUE(event.Wait(1000));
51 }
52 
TEST_P(TaskQueueTest,PostCustomTask)53 TEST_P(TaskQueueTest, PostCustomTask) {
54   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
55   rtc::Event ran;
56   auto queue = CreateTaskQueue(factory, "PostCustomImplementation");
57 
58   class CustomTask : public QueuedTask {
59    public:
60     explicit CustomTask(rtc::Event* ran) : ran_(ran) {}
61 
62    private:
63     bool Run() override {
64       ran_->Set();
65       return false;  // Do not allow the task to be deleted by the queue.
66     }
67 
68     rtc::Event* const ran_;
69   } my_task(&ran);
70 
71   queue->PostTask(absl::WrapUnique(&my_task));
72   EXPECT_TRUE(ran.Wait(1000));
73 }
74 
TEST_P(TaskQueueTest,PostDelayedZero)75 TEST_P(TaskQueueTest, PostDelayedZero) {
76   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
77   rtc::Event event;
78   auto queue = CreateTaskQueue(factory, "PostDelayedZero");
79 
80   queue->PostDelayedTask(ToQueuedTask([&event] { event.Set(); }), 0);
81   EXPECT_TRUE(event.Wait(1000));
82 }
83 
TEST_P(TaskQueueTest,PostFromQueue)84 TEST_P(TaskQueueTest, PostFromQueue) {
85   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
86   rtc::Event event;
87   auto queue = CreateTaskQueue(factory, "PostFromQueue");
88 
89   queue->PostTask(ToQueuedTask([&event, &queue] {
90     queue->PostTask(ToQueuedTask([&event] { event.Set(); }));
91   }));
92   EXPECT_TRUE(event.Wait(1000));
93 }
94 
TEST_P(TaskQueueTest,PostDelayed)95 TEST_P(TaskQueueTest, PostDelayed) {
96   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
97   rtc::Event event;
98   auto queue =
99       CreateTaskQueue(factory, "PostDelayed", TaskQueueFactory::Priority::HIGH);
100 
101   int64_t start = rtc::TimeMillis();
102   queue->PostDelayedTask(ToQueuedTask([&event, &queue] {
103                            EXPECT_TRUE(queue->IsCurrent());
104                            event.Set();
105                          }),
106                          100);
107   EXPECT_TRUE(event.Wait(1000));
108   int64_t end = rtc::TimeMillis();
109   // These tests are a little relaxed due to how "powerful" our test bots can
110   // be.  Most recently we've seen windows bots fire the callback after 94-99ms,
111   // which is why we have a little bit of leeway backwards as well.
112   EXPECT_GE(end - start, 90u);
113   EXPECT_NEAR(end - start, 190u, 100u);  // Accept 90-290.
114 }
115 
TEST_P(TaskQueueTest,PostMultipleDelayed)116 TEST_P(TaskQueueTest, PostMultipleDelayed) {
117   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
118   auto queue = CreateTaskQueue(factory, "PostMultipleDelayed");
119 
120   std::vector<rtc::Event> events(100);
121   for (int i = 0; i < 100; ++i) {
122     rtc::Event* event = &events[i];
123     queue->PostDelayedTask(ToQueuedTask([event, &queue] {
124                              EXPECT_TRUE(queue->IsCurrent());
125                              event->Set();
126                            }),
127                            i);
128   }
129 
130   for (rtc::Event& e : events)
131     EXPECT_TRUE(e.Wait(1000));
132 }
133 
TEST_P(TaskQueueTest,PostDelayedAfterDestruct)134 TEST_P(TaskQueueTest, PostDelayedAfterDestruct) {
135   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
136   rtc::Event run;
137   rtc::Event deleted;
138   auto queue = CreateTaskQueue(factory, "PostDelayedAfterDestruct");
139   queue->PostDelayedTask(
140       ToQueuedTask([&run] { run.Set(); }, [&deleted] { deleted.Set(); }), 100);
141   // Destroy the queue.
142   queue = nullptr;
143   // Task might outlive the TaskQueue, but still should be deleted.
144   EXPECT_TRUE(deleted.Wait(1000));
145   EXPECT_FALSE(run.Wait(0));  // and should not run.
146 }
147 
TEST_P(TaskQueueTest,PostAndReuse)148 TEST_P(TaskQueueTest, PostAndReuse) {
149   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
150   rtc::Event event;
151   auto post_queue = CreateTaskQueue(factory, "PostQueue");
152   auto reply_queue = CreateTaskQueue(factory, "ReplyQueue");
153 
154   int call_count = 0;
155 
156   class ReusedTask : public QueuedTask {
157    public:
158     ReusedTask(int* counter, TaskQueueBase* reply_queue, rtc::Event* event)
159         : counter_(*counter), reply_queue_(reply_queue), event_(*event) {
160       EXPECT_EQ(counter_, 0);
161     }
162 
163    private:
164     bool Run() override {
165       if (++counter_ == 1) {
166         reply_queue_->PostTask(absl::WrapUnique(this));
167         // At this point, the object is owned by reply_queue_ and it's
168         // theoratically possible that the object has been deleted (e.g. if
169         // posting wasn't possible).  So, don't touch any member variables here.
170 
171         // Indicate to the current queue that ownership has been transferred.
172         return false;
173       } else {
174         EXPECT_EQ(counter_, 2);
175         EXPECT_TRUE(reply_queue_->IsCurrent());
176         event_.Set();
177         return true;  // Indicate that the object should be deleted.
178       }
179     }
180 
181     int& counter_;
182     TaskQueueBase* const reply_queue_;
183     rtc::Event& event_;
184   };
185 
186   auto task =
187       std::make_unique<ReusedTask>(&call_count, reply_queue.get(), &event);
188   post_queue->PostTask(std::move(task));
189   EXPECT_TRUE(event.Wait(1000));
190 }
191 
TEST_P(TaskQueueTest,PostALot)192 TEST_P(TaskQueueTest, PostALot) {
193   // Waits until DecrementCount called |count| times. Thread safe.
194   class BlockingCounter {
195    public:
196     explicit BlockingCounter(int initial_count) : count_(initial_count) {}
197 
198     void DecrementCount() {
199       if (count_.DecRef() == rtc::RefCountReleaseStatus::kDroppedLastRef) {
200         event_.Set();
201       }
202     }
203     bool Wait(int give_up_after_ms) { return event_.Wait(give_up_after_ms); }
204 
205    private:
206     webrtc_impl::RefCounter count_;
207     rtc::Event event_;
208   };
209 
210   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
211   static constexpr int kTaskCount = 0xffff;
212   rtc::Event posting_done;
213   BlockingCounter all_destroyed(kTaskCount);
214 
215   int tasks_executed = 0;
216   auto task_queue = CreateTaskQueue(factory, "PostALot");
217 
218   task_queue->PostTask(ToQueuedTask([&] {
219     // Post tasks from the queue to guarantee that the 1st task won't be
220     // executed before the last one is posted.
221     for (int i = 0; i < kTaskCount; ++i) {
222       task_queue->PostTask(ToQueuedTask(
223           [&] { ++tasks_executed; }, [&] { all_destroyed.DecrementCount(); }));
224     }
225 
226     posting_done.Set();
227   }));
228 
229   // Before destroying the task queue wait until all child tasks are posted.
230   posting_done.Wait(rtc::Event::kForever);
231   // Destroy the task queue.
232   task_queue = nullptr;
233 
234   // Expect all tasks are destroyed eventually. In some task queue
235   // implementations that might happen on a different thread after task queue is
236   // destroyed.
237   EXPECT_TRUE(all_destroyed.Wait(60000));
238   EXPECT_LE(tasks_executed, kTaskCount);
239 }
240 
241 // Test posting two tasks that have shared state not protected by a
242 // lock. The TaskQueue should guarantee memory read-write order and
243 // FIFO task execution order, so the second task should always see the
244 // changes that were made by the first task.
245 //
246 // If the TaskQueue doesn't properly synchronize the execution of
247 // tasks, there will be a data race, which is undefined behavior. The
248 // EXPECT calls may randomly catch this, but to make the most of this
249 // unit test, run it under TSan or some other tool that is able to
250 // directly detect data races.
TEST_P(TaskQueueTest,PostTwoWithSharedUnprotectedState)251 TEST_P(TaskQueueTest, PostTwoWithSharedUnprotectedState) {
252   std::unique_ptr<webrtc::TaskQueueFactory> factory = GetParam()();
253   struct SharedState {
254     // First task will set this value to 1 and second will assert it.
255     int state = 0;
256   } state;
257 
258   auto queue = CreateTaskQueue(factory, "PostTwoWithSharedUnprotectedState");
259   rtc::Event done;
260   queue->PostTask(ToQueuedTask([&state, &queue, &done] {
261     // Post tasks from queue to guarantee, that 1st task won't be
262     // executed before the second one will be posted.
263     queue->PostTask(ToQueuedTask([&state] { state.state = 1; }));
264     queue->PostTask(ToQueuedTask([&state, &done] {
265       EXPECT_EQ(state.state, 1);
266       done.Set();
267     }));
268     // Check, that state changing tasks didn't start yet.
269     EXPECT_EQ(state.state, 0);
270   }));
271   EXPECT_TRUE(done.Wait(1000));
272 }
273 
274 // TaskQueueTest is a set of tests for any implementation of the TaskQueueBase.
275 // Tests are instantiated next to the concrete implementation(s).
276 // https://github.com/google/googletest/blob/master/googletest/docs/advanced.md#creating-value-parameterized-abstract-tests
277 GTEST_ALLOW_UNINSTANTIATED_PARAMETERIZED_TEST(TaskQueueTest);
278 
279 }  // namespace
280 }  // namespace webrtc
281