1 /*
2 * Copyright 2019 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "rtc_base/operations_chain.h"
12
13 #include <functional>
14 #include <memory>
15 #include <utility>
16 #include <vector>
17
18 #include "rtc_base/bind.h"
19 #include "rtc_base/event.h"
20 #include "rtc_base/thread.h"
21 #include "test/gmock.h"
22 #include "test/gtest.h"
23
24 namespace rtc {
25
26 using ::testing::ElementsAre;
27
28 class OperationTracker {
29 public:
OperationTracker()30 OperationTracker() : background_thread_(Thread::Create()) {
31 background_thread_->Start();
32 }
33 // The caller is responsible for ensuring that no operations are pending.
~OperationTracker()34 ~OperationTracker() {}
35
36 // Creates a binding for the synchronous operation (see
37 // StartSynchronousOperation() below).
BindSynchronousOperation(Event * operation_complete_event)38 std::function<void(std::function<void()>)> BindSynchronousOperation(
39 Event* operation_complete_event) {
40 return [this, operation_complete_event](std::function<void()> callback) {
41 StartSynchronousOperation(operation_complete_event, std::move(callback));
42 };
43 }
44
45 // Creates a binding for the asynchronous operation (see
46 // StartAsynchronousOperation() below).
BindAsynchronousOperation(Event * unblock_operation_event,Event * operation_complete_event)47 std::function<void(std::function<void()>)> BindAsynchronousOperation(
48 Event* unblock_operation_event,
49 Event* operation_complete_event) {
50 return [this, unblock_operation_event,
51 operation_complete_event](std::function<void()> callback) {
52 StartAsynchronousOperation(unblock_operation_event,
53 operation_complete_event, std::move(callback));
54 };
55 }
56
57 // When an operation is completed, its associated Event* is added to this
58 // list, in chronological order. This allows you to verify the order that
59 // operations are executed.
completed_operation_events() const60 const std::vector<Event*>& completed_operation_events() const {
61 return completed_operation_events_;
62 }
63
64 private:
65 // This operation is completed synchronously; the callback is invoked before
66 // the function returns.
StartSynchronousOperation(Event * operation_complete_event,std::function<void ()> callback)67 void StartSynchronousOperation(Event* operation_complete_event,
68 std::function<void()> callback) {
69 completed_operation_events_.push_back(operation_complete_event);
70 operation_complete_event->Set();
71 callback();
72 }
73
74 // This operation is completed asynchronously; it pings |background_thread_|,
75 // blocking that thread until |unblock_operation_event| is signaled and then
76 // completes upon posting back to the thread that the operation started on.
77 // Note that this requires the starting thread to be executing tasks (handle
78 // messages), i.e. must not be blocked.
StartAsynchronousOperation(Event * unblock_operation_event,Event * operation_complete_event,std::function<void ()> callback)79 void StartAsynchronousOperation(Event* unblock_operation_event,
80 Event* operation_complete_event,
81 std::function<void()> callback) {
82 Thread* current_thread = Thread::Current();
83 background_thread_->PostTask(
84 RTC_FROM_HERE, [this, current_thread, unblock_operation_event,
85 operation_complete_event, callback]() {
86 unblock_operation_event->Wait(Event::kForever);
87 current_thread->PostTask(
88 RTC_FROM_HERE, [this, operation_complete_event, callback]() {
89 completed_operation_events_.push_back(operation_complete_event);
90 operation_complete_event->Set();
91 callback();
92 });
93 });
94 }
95
96 std::unique_ptr<Thread> background_thread_;
97 std::vector<Event*> completed_operation_events_;
98 };
99
100 // The OperationTrackerProxy ensures all operations are chained on a separate
101 // thread. This allows tests to block while chained operations are posting
102 // between threads.
103 class OperationTrackerProxy {
104 public:
OperationTrackerProxy()105 OperationTrackerProxy()
106 : operations_chain_thread_(Thread::Create()),
107 operation_tracker_(nullptr),
108 operations_chain_(nullptr) {
109 operations_chain_thread_->Start();
110 }
111
Initialize()112 std::unique_ptr<Event> Initialize() {
113 std::unique_ptr<Event> event = std::make_unique<Event>();
114 operations_chain_thread_->PostTask(
115 RTC_FROM_HERE, [this, event_ptr = event.get()]() {
116 operation_tracker_ = std::make_unique<OperationTracker>();
117 operations_chain_ = OperationsChain::Create();
118 event_ptr->Set();
119 });
120 return event;
121 }
122
ReleaseOperationChain()123 std::unique_ptr<Event> ReleaseOperationChain() {
124 std::unique_ptr<Event> event = std::make_unique<Event>();
125 operations_chain_thread_->PostTask(RTC_FROM_HERE,
126 [this, event_ptr = event.get()]() {
127 operations_chain_ = nullptr;
128 event_ptr->Set();
129 });
130 return event;
131 }
132
133 // Chains a synchronous operation on the operation chain's thread.
PostSynchronousOperation()134 std::unique_ptr<Event> PostSynchronousOperation() {
135 std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
136 operations_chain_thread_->PostTask(
137 RTC_FROM_HERE, [this, operation_complete_event_ptr =
138 operation_complete_event.get()]() {
139 operations_chain_->ChainOperation(
140 operation_tracker_->BindSynchronousOperation(
141 operation_complete_event_ptr));
142 });
143 return operation_complete_event;
144 }
145
146 // Chains an asynchronous operation on the operation chain's thread. This
147 // involves the operation chain thread and an additional background thread.
PostAsynchronousOperation(Event * unblock_operation_event)148 std::unique_ptr<Event> PostAsynchronousOperation(
149 Event* unblock_operation_event) {
150 std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
151 operations_chain_thread_->PostTask(
152 RTC_FROM_HERE,
153 [this, unblock_operation_event,
154 operation_complete_event_ptr = operation_complete_event.get()]() {
155 operations_chain_->ChainOperation(
156 operation_tracker_->BindAsynchronousOperation(
157 unblock_operation_event, operation_complete_event_ptr));
158 });
159 return operation_complete_event;
160 }
161
162 // The order of completed events. Touches the |operation_tracker_| on the
163 // calling thread, this is only thread safe if all chained operations have
164 // completed.
completed_operation_events() const165 const std::vector<Event*>& completed_operation_events() const {
166 return operation_tracker_->completed_operation_events();
167 }
168
169 private:
170 std::unique_ptr<Thread> operations_chain_thread_;
171 std::unique_ptr<OperationTracker> operation_tracker_;
172 scoped_refptr<OperationsChain> operations_chain_;
173 };
174
175 // On destruction, sets a boolean flag to true.
176 class SignalOnDestruction final {
177 public:
SignalOnDestruction(bool * destructor_called)178 SignalOnDestruction(bool* destructor_called)
179 : destructor_called_(destructor_called) {
180 RTC_DCHECK(destructor_called_);
181 }
~SignalOnDestruction()182 ~SignalOnDestruction() {
183 // Moved objects will have |destructor_called_| set to null. Destroying a
184 // moved SignalOnDestruction should not signal.
185 if (destructor_called_) {
186 *destructor_called_ = true;
187 }
188 }
189
190 // Move operators.
SignalOnDestruction(SignalOnDestruction && other)191 SignalOnDestruction(SignalOnDestruction&& other)
192 : SignalOnDestruction(other.destructor_called_) {
193 other.destructor_called_ = nullptr;
194 }
operator =(SignalOnDestruction && other)195 SignalOnDestruction& operator=(SignalOnDestruction&& other) {
196 destructor_called_ = other.destructor_called_;
197 other.destructor_called_ = nullptr;
198 return *this;
199 }
200
201 private:
202 bool* destructor_called_;
203
204 RTC_DISALLOW_COPY_AND_ASSIGN(SignalOnDestruction);
205 };
206
// Smoke test: a single synchronous operation chained through the proxy runs to
// completion.
TEST(OperationsChainTest, SynchronousOperation) {
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  // Chain one synchronous operation and block until it reports completion.
  std::unique_ptr<Event> completed = proxy.PostSynchronousOperation();
  completed->Wait(Event::kForever);
}
213
// A single asynchronous operation stays pending until it is unblocked and then
// runs to completion.
TEST(OperationsChainTest, AsynchronousOperation) {
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  Event unblock;
  std::unique_ptr<Event> async_operation_completed_event =
      proxy.PostAsynchronousOperation(&unblock);
  // The operation is still parked on |unblock|, so completion must not have
  // been signaled yet.
  EXPECT_FALSE(async_operation_completed_event->Wait(0));
  // Release the operation, then block until it reports completion.
  unblock.Set();
  async_operation_completed_event->Wait(Event::kForever);
}
228
TEST(OperationsChainTest,
     SynchronousOperationsAreExecutedImmediatelyWhenChainIsEmpty) {
  // Synchronicity has to be checked without the OperationTrackerProxy so that
  // no messages are processed in parallel: the chain is driven directly from
  // this thread and no other thread takes part in executing the operations.
  scoped_refptr<OperationsChain> chain = OperationsChain::Create();
  OperationTracker tracker;

  Event event0;
  chain->ChainOperation(tracker.BindSynchronousOperation(&event0));
  // Must already be signaled. (Waiting longer could not help: this thread is
  // the only one that would ever run the operation.)
  EXPECT_TRUE(event0.Wait(0));

  // The chain should be empty again, so a second operation is expected to run
  // immediately as well.
  Event event1;
  chain->ChainOperation(tracker.BindSynchronousOperation(&event1));
  EXPECT_TRUE(event1.Wait(0));
}
249
// A synchronous operation chained behind a pending asynchronous one must not
// complete before the asynchronous one has.
TEST(OperationsChainTest, AsynchronousOperationBlocksSynchronousOperation) {
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  Event unblock;
  std::unique_ptr<Event> async_operation_completed_event =
      proxy.PostAsynchronousOperation(&unblock);

  std::unique_ptr<Event> sync_completed = proxy.PostSynchronousOperation();

  unblock.Set();

  sync_completed->Wait(Event::kForever);
  // The asynchronous operation should have held back the synchronous one, so
  // by the time the latter is done the former must already be signaled.
  EXPECT_TRUE(async_operation_completed_event->Wait(0));
}
269
// Operations complete in the order they were chained, regardless of the order
// in which the asynchronous ones are unblocked.
TEST(OperationsChainTest, OperationsAreExecutedInOrder) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  // Queue seven operations. 0, 1, 4 and 6 are asynchronous, each gated on its
  // own unblock event; 2, 3 and 5 are synchronous.
  Event unblock0;
  std::unique_ptr<Event> done0 =
      operation_tracker_proxy.PostAsynchronousOperation(&unblock0);

  Event unblock1;
  std::unique_ptr<Event> done1 =
      operation_tracker_proxy.PostAsynchronousOperation(&unblock1);

  std::unique_ptr<Event> done2 =
      operation_tracker_proxy.PostSynchronousOperation();

  std::unique_ptr<Event> done3 =
      operation_tracker_proxy.PostSynchronousOperation();

  Event unblock4;
  std::unique_ptr<Event> done4 =
      operation_tracker_proxy.PostAsynchronousOperation(&unblock4);

  std::unique_ptr<Event> done5 =
      operation_tracker_proxy.PostSynchronousOperation();

  Event unblock6;
  std::unique_ptr<Event> done6 =
      operation_tracker_proxy.PostAsynchronousOperation(&unblock6);

  // Release the asynchronous operations last-to-first. The synchronous ones
  // (5, 3 and 2) have nothing to release.
  unblock6.Set();
  unblock4.Set();
  unblock1.Set();
  unblock0.Set();

  // Wait for every operation. The order of the waits is irrelevant since all
  // of them finish eventually.
  done0->Wait(Event::kForever);
  done1->Wait(Event::kForever);
  done2->Wait(Event::kForever);
  done3->Wait(Event::kForever);
  done4->Wait(Event::kForever);
  done5->Wait(Event::kForever);
  done6->Wait(Event::kForever);

  // The completion log must match the chaining order exactly.
  EXPECT_THAT(operation_tracker_proxy.completed_operation_events(),
              ElementsAre(done0.get(), done1.get(), done2.get(), done3.get(),
                          done4.get(), done5.get(), done6.get()));
}
328
TEST(OperationsChainTest,
     SafeToReleaseReferenceToOperationChainWhileOperationIsPending) {
  OperationTrackerProxy proxy;
  proxy.Initialize()->Wait(Event::kForever);

  Event unblock;
  std::unique_ptr<Event> completed = proxy.PostAsynchronousOperation(&unblock);

  // A pending operation is expected to keep the OperationsChain alive, so the
  // test can drop its own reference before the asynchronous operation has been
  // unblocked.
  proxy.ReleaseOperationChain()->Wait(Event::kForever);

  // The operation must still be able to run to completion afterwards.
  unblock.Set();
  completed->Wait(Event::kForever);
}
346
// The functor passed to ChainOperation() must stay alive for the whole of its
// own invocation, even after it has reported completion.
TEST(OperationsChainTest, FunctorIsNotDestroyedWhileExecuting) {
  scoped_refptr<OperationsChain> chain = OperationsChain::Create();

  bool destructor_called = false;
  SignalOnDestruction destruction_probe(&destructor_called);

  chain->ChainOperation(
      [destruction_probe = std::move(destruction_probe),
       &destructor_called](std::function<void()> done) {
        EXPECT_FALSE(destructor_called);
        // Reporting completion lets the chain discard its bookkeeping for
        // this operation.
        done();
        // Even so, the lambda's captures must still be intact: the functor
        // itself must not be deleted while it is executing.
        EXPECT_FALSE(destructor_called);
      });
  // The lambda ran synchronously and has finished, so by now its captures are
  // expected to have been destroyed.
  EXPECT_TRUE(destructor_called);
}
369
370 #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
371
// An operation that returns without ever invoking its completion callback is
// expected to terminate the process.
TEST(OperationsChainDeathTest, OperationNotInvokingCallbackShouldCrash) {
  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
  // The empty regex accepts any death message.
  EXPECT_DEATH(
      operations_chain->ChainOperation([](std::function<void()> callback) {}),
      "");
}
378
// An operation that reports completion more than once is expected to
// terminate the process.
TEST(OperationsChainDeathTest,
     OperationInvokingCallbackMultipleTimesShouldCrash) {
  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
  // The empty regex accepts any death message.
  EXPECT_DEATH(
      operations_chain->ChainOperation([](std::function<void()> callback) {
        // Signal that the operation has completed multiple times.
        callback();
        callback();
      }),
      "");
}
390
391 #endif // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
392
393 } // namespace rtc
394