1 /*
2  *  Copyright 2019 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "rtc_base/operations_chain.h"
12 
13 #include <functional>
14 #include <memory>
15 #include <utility>
16 #include <vector>
17 
18 #include "rtc_base/bind.h"
19 #include "rtc_base/event.h"
20 #include "rtc_base/thread.h"
21 #include "test/gmock.h"
22 #include "test/gtest.h"
23 
24 namespace rtc {
25 
26 using ::testing::ElementsAre;
27 
28 class OperationTracker {
29  public:
OperationTracker()30   OperationTracker() : background_thread_(Thread::Create()) {
31     background_thread_->Start();
32   }
33   // The caller is responsible for ensuring that no operations are pending.
~OperationTracker()34   ~OperationTracker() {}
35 
36   // Creates a binding for the synchronous operation (see
37   // StartSynchronousOperation() below).
BindSynchronousOperation(Event * operation_complete_event)38   std::function<void(std::function<void()>)> BindSynchronousOperation(
39       Event* operation_complete_event) {
40     return [this, operation_complete_event](std::function<void()> callback) {
41       StartSynchronousOperation(operation_complete_event, std::move(callback));
42     };
43   }
44 
45   // Creates a binding for the asynchronous operation (see
46   // StartAsynchronousOperation() below).
BindAsynchronousOperation(Event * unblock_operation_event,Event * operation_complete_event)47   std::function<void(std::function<void()>)> BindAsynchronousOperation(
48       Event* unblock_operation_event,
49       Event* operation_complete_event) {
50     return [this, unblock_operation_event,
51             operation_complete_event](std::function<void()> callback) {
52       StartAsynchronousOperation(unblock_operation_event,
53                                  operation_complete_event, std::move(callback));
54     };
55   }
56 
57   // When an operation is completed, its associated Event* is added to this
58   // list, in chronological order. This allows you to verify the order that
59   // operations are executed.
completed_operation_events() const60   const std::vector<Event*>& completed_operation_events() const {
61     return completed_operation_events_;
62   }
63 
64  private:
65   // This operation is completed synchronously; the callback is invoked before
66   // the function returns.
StartSynchronousOperation(Event * operation_complete_event,std::function<void ()> callback)67   void StartSynchronousOperation(Event* operation_complete_event,
68                                  std::function<void()> callback) {
69     completed_operation_events_.push_back(operation_complete_event);
70     operation_complete_event->Set();
71     callback();
72   }
73 
74   // This operation is completed asynchronously; it pings |background_thread_|,
75   // blocking that thread until |unblock_operation_event| is signaled and then
76   // completes upon posting back to the thread that the operation started on.
77   // Note that this requires the starting thread to be executing tasks (handle
78   // messages), i.e. must not be blocked.
StartAsynchronousOperation(Event * unblock_operation_event,Event * operation_complete_event,std::function<void ()> callback)79   void StartAsynchronousOperation(Event* unblock_operation_event,
80                                   Event* operation_complete_event,
81                                   std::function<void()> callback) {
82     Thread* current_thread = Thread::Current();
83     background_thread_->PostTask(
84         RTC_FROM_HERE, [this, current_thread, unblock_operation_event,
85                         operation_complete_event, callback]() {
86           unblock_operation_event->Wait(Event::kForever);
87           current_thread->PostTask(
88               RTC_FROM_HERE, [this, operation_complete_event, callback]() {
89                 completed_operation_events_.push_back(operation_complete_event);
90                 operation_complete_event->Set();
91                 callback();
92               });
93         });
94   }
95 
96   std::unique_ptr<Thread> background_thread_;
97   std::vector<Event*> completed_operation_events_;
98 };
99 
100 // The OperationTrackerProxy ensures all operations are chained on a separate
101 // thread. This allows tests to block while chained operations are posting
102 // between threads.
103 class OperationTrackerProxy {
104  public:
OperationTrackerProxy()105   OperationTrackerProxy()
106       : operations_chain_thread_(Thread::Create()),
107         operation_tracker_(nullptr),
108         operations_chain_(nullptr) {
109     operations_chain_thread_->Start();
110   }
111 
Initialize()112   std::unique_ptr<Event> Initialize() {
113     std::unique_ptr<Event> event = std::make_unique<Event>();
114     operations_chain_thread_->PostTask(
115         RTC_FROM_HERE, [this, event_ptr = event.get()]() {
116           operation_tracker_ = std::make_unique<OperationTracker>();
117           operations_chain_ = OperationsChain::Create();
118           event_ptr->Set();
119         });
120     return event;
121   }
122 
ReleaseOperationChain()123   std::unique_ptr<Event> ReleaseOperationChain() {
124     std::unique_ptr<Event> event = std::make_unique<Event>();
125     operations_chain_thread_->PostTask(RTC_FROM_HERE,
126                                        [this, event_ptr = event.get()]() {
127                                          operations_chain_ = nullptr;
128                                          event_ptr->Set();
129                                        });
130     return event;
131   }
132 
133   // Chains a synchronous operation on the operation chain's thread.
PostSynchronousOperation()134   std::unique_ptr<Event> PostSynchronousOperation() {
135     std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
136     operations_chain_thread_->PostTask(
137         RTC_FROM_HERE, [this, operation_complete_event_ptr =
138                                   operation_complete_event.get()]() {
139           operations_chain_->ChainOperation(
140               operation_tracker_->BindSynchronousOperation(
141                   operation_complete_event_ptr));
142         });
143     return operation_complete_event;
144   }
145 
146   // Chains an asynchronous operation on the operation chain's thread. This
147   // involves the operation chain thread and an additional background thread.
PostAsynchronousOperation(Event * unblock_operation_event)148   std::unique_ptr<Event> PostAsynchronousOperation(
149       Event* unblock_operation_event) {
150     std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
151     operations_chain_thread_->PostTask(
152         RTC_FROM_HERE,
153         [this, unblock_operation_event,
154          operation_complete_event_ptr = operation_complete_event.get()]() {
155           operations_chain_->ChainOperation(
156               operation_tracker_->BindAsynchronousOperation(
157                   unblock_operation_event, operation_complete_event_ptr));
158         });
159     return operation_complete_event;
160   }
161 
162   // The order of completed events. Touches the |operation_tracker_| on the
163   // calling thread, this is only thread safe if all chained operations have
164   // completed.
completed_operation_events() const165   const std::vector<Event*>& completed_operation_events() const {
166     return operation_tracker_->completed_operation_events();
167   }
168 
169  private:
170   std::unique_ptr<Thread> operations_chain_thread_;
171   std::unique_ptr<OperationTracker> operation_tracker_;
172   scoped_refptr<OperationsChain> operations_chain_;
173 };
174 
175 // On destruction, sets a boolean flag to true.
176 class SignalOnDestruction final {
177  public:
SignalOnDestruction(bool * destructor_called)178   SignalOnDestruction(bool* destructor_called)
179       : destructor_called_(destructor_called) {
180     RTC_DCHECK(destructor_called_);
181   }
~SignalOnDestruction()182   ~SignalOnDestruction() {
183     // Moved objects will have |destructor_called_| set to null. Destroying a
184     // moved SignalOnDestruction should not signal.
185     if (destructor_called_) {
186       *destructor_called_ = true;
187     }
188   }
189 
190   // Move operators.
SignalOnDestruction(SignalOnDestruction && other)191   SignalOnDestruction(SignalOnDestruction&& other)
192       : SignalOnDestruction(other.destructor_called_) {
193     other.destructor_called_ = nullptr;
194   }
operator =(SignalOnDestruction && other)195   SignalOnDestruction& operator=(SignalOnDestruction&& other) {
196     destructor_called_ = other.destructor_called_;
197     other.destructor_called_ = nullptr;
198     return *this;
199   }
200 
201  private:
202   bool* destructor_called_;
203 
204   RTC_DISALLOW_COPY_AND_ASSIGN(SignalOnDestruction);
205 };
206 
// Chains one synchronous operation through the proxy and blocks until its
// completion event has been signaled.
TEST(OperationsChainTest, SynchronousOperation) {
  OperationTrackerProxy proxy;
  auto initialized = proxy.Initialize();
  initialized->Wait(Event::kForever);

  auto sync_completed = proxy.PostSynchronousOperation();
  sync_completed->Wait(Event::kForever);
}
213 
// Chains one asynchronous operation and verifies that it does not complete
// before it is unblocked, and that it does complete afterwards.
TEST(OperationsChainTest, AsynchronousOperation) {
  OperationTrackerProxy proxy;
  auto initialized = proxy.Initialize();
  initialized->Wait(Event::kForever);

  Event unblock;
  auto async_completed = proxy.PostAsynchronousOperation(&unblock);
  // The operation is still blocked, so its completion event must not be
  // signaled yet (zero-timeout wait).
  EXPECT_FALSE(async_completed->Wait(0));
  // Let the operation proceed, then block until it has finished.
  unblock.Set();
  async_completed->Wait(Event::kForever);
}
228 
// A synchronous operation chained on an empty chain must have run by the time
// ChainOperation() returns.
TEST(OperationsChainTest,
     SynchronousOperationsAreExecutedImmediatelyWhenChainIsEmpty) {
  // Synchronicity has to be tested without the OperationTrackerProxy so that
  // no messages are processed in parallel: the synchronous operations used
  // here complete on the calling thread without posting to any other thread.
  scoped_refptr<OperationsChain> chain = OperationsChain::Create();
  OperationTracker tracker;

  Event first_completed;
  chain->ChainOperation(tracker.BindSynchronousOperation(&first_completed));
  // Must already be signaled. (Waiting longer could not help anyway, since
  // that would block the very thread the operation runs on.)
  EXPECT_TRUE(first_completed.Wait(0));

  // The chain should be empty again, so a second operation is expected to
  // execute immediately as well.
  Event second_completed;
  chain->ChainOperation(tracker.BindSynchronousOperation(&second_completed));
  EXPECT_TRUE(second_completed.Wait(0));
}
249 
// A synchronous operation chained behind a pending asynchronous operation must
// not complete before the asynchronous one has.
TEST(OperationsChainTest, AsynchronousOperationBlocksSynchronousOperation) {
  OperationTrackerProxy proxy;
  auto initialized = proxy.Initialize();
  initialized->Wait(Event::kForever);

  Event unblock_async;
  auto async_completed = proxy.PostAsynchronousOperation(&unblock_async);
  auto sync_completed = proxy.PostSynchronousOperation();

  unblock_async.Set();

  sync_completed->Wait(Event::kForever);
  // The asynchronous operation should have blocked the synchronous one, so by
  // now its completion event has to be signaled already.
  EXPECT_TRUE(async_completed->Wait(0));
}
269 
TEST(OperationsChainTest,OperationsAreExecutedInOrder)270 TEST(OperationsChainTest, OperationsAreExecutedInOrder) {
271   OperationTrackerProxy operation_tracker_proxy;
272   operation_tracker_proxy.Initialize()->Wait(Event::kForever);
273 
274   // Chain a mix of asynchronous and synchronous operations.
275   Event operation0_unblock_event;
276   auto operation0_completed_event =
277       operation_tracker_proxy.PostAsynchronousOperation(
278           &operation0_unblock_event);
279 
280   Event operation1_unblock_event;
281   auto operation1_completed_event =
282       operation_tracker_proxy.PostAsynchronousOperation(
283           &operation1_unblock_event);
284 
285   auto operation2_completed_event =
286       operation_tracker_proxy.PostSynchronousOperation();
287 
288   auto operation3_completed_event =
289       operation_tracker_proxy.PostSynchronousOperation();
290 
291   Event operation4_unblock_event;
292   auto operation4_completed_event =
293       operation_tracker_proxy.PostAsynchronousOperation(
294           &operation4_unblock_event);
295 
296   auto operation5_completed_event =
297       operation_tracker_proxy.PostSynchronousOperation();
298 
299   Event operation6_unblock_event;
300   auto operation6_completed_event =
301       operation_tracker_proxy.PostAsynchronousOperation(
302           &operation6_unblock_event);
303 
304   // Unblock events in reverse order. Operations 5, 3 and 2 are synchronous and
305   // don't need to be unblocked.
306   operation6_unblock_event.Set();
307   operation4_unblock_event.Set();
308   operation1_unblock_event.Set();
309   operation0_unblock_event.Set();
310   // Await all operations. The await-order shouldn't matter since they all get
311   // executed eventually.
312   operation0_completed_event->Wait(Event::kForever);
313   operation1_completed_event->Wait(Event::kForever);
314   operation2_completed_event->Wait(Event::kForever);
315   operation3_completed_event->Wait(Event::kForever);
316   operation4_completed_event->Wait(Event::kForever);
317   operation5_completed_event->Wait(Event::kForever);
318   operation6_completed_event->Wait(Event::kForever);
319 
320   EXPECT_THAT(
321       operation_tracker_proxy.completed_operation_events(),
322       ElementsAre(
323           operation0_completed_event.get(), operation1_completed_event.get(),
324           operation2_completed_event.get(), operation3_completed_event.get(),
325           operation4_completed_event.get(), operation5_completed_event.get(),
326           operation6_completed_event.get()));
327 }
328 
// Dropping the proxy's reference to the OperationsChain while an asynchronous
// operation is still pending must not prevent that operation from completing.
TEST(OperationsChainTest,
     SafeToReleaseReferenceToOperationChainWhileOperationIsPending) {
  OperationTrackerProxy proxy;
  auto initialized = proxy.Initialize();
  initialized->Wait(Event::kForever);

  Event unblock_async;
  auto async_completed = proxy.PostAsynchronousOperation(&unblock_async);

  // The expectation under test: pending operations keep the OperationsChain
  // alive, so the reference can be released before the asynchronous operation
  // is unblocked.
  auto released = proxy.ReleaseOperationChain();
  released->Wait(Event::kForever);

  unblock_async.Set();
  async_completed->Wait(Event::kForever);
}
346 
// The functor passed to ChainOperation() must stay alive for as long as it is
// executing, even after it has invoked its completion callback, and must be
// destroyed once it has finished.
TEST(OperationsChainTest, FunctorIsNotDestroyedWhileExecuting) {
  scoped_refptr<OperationsChain> chain = OperationsChain::Create();

  bool destructor_called = false;
  SignalOnDestruction destruction_signal(&destructor_called);

  // |captured_signal| is moved into the functor; its destruction (and thereby
  // the functor's destruction) flips |destructor_called|.
  chain->ChainOperation(
      [captured_signal = std::move(destruction_signal),
       &destructor_called](std::function<void()> done) {
        EXPECT_FALSE(destructor_called);
        // Calling |done| marks the operation as complete, which pops the
        // Operation object from the OperationsChain's internal queue.
        done();
        // The internal Operation object is gone, yet everything captured by
        // this lambda has to remain valid: the functor must not be deleted
        // while it is still executing.
        EXPECT_FALSE(destructor_called);
      });
  // The lambda ran synchronously and has completed, so its captures should
  // have been destroyed by now.
  EXPECT_TRUE(destructor_called);
}
369 
370 #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
371 
// An operation that returns without ever invoking its completion callback is
// expected to crash the process (presumably through a DCHECK, given that this
// test is only compiled when RTC_DCHECK_IS_ON).
TEST(OperationsChainDeathTest, OperationNotInvokingCallbackShouldCrash) {
  scoped_refptr<OperationsChain> chain = OperationsChain::Create();
  // Deliberately ignores |callback|.
  auto never_completes = [](std::function<void()> callback) {};
  EXPECT_DEATH(chain->ChainOperation(std::move(never_completes)), "");
}
378 
// An operation that reports completion more than once is expected to crash the
// process (presumably through a DCHECK, given that this test is only compiled
// when RTC_DCHECK_IS_ON).
TEST(OperationsChainDeathTest,
     OperationInvokingCallbackMultipleTimesShouldCrash) {
  scoped_refptr<OperationsChain> chain = OperationsChain::Create();
  auto completes_twice = [](std::function<void()> callback) {
    // Signal completion of the operation two times in a row.
    callback();
    callback();
  };
  EXPECT_DEATH(chain->ChainOperation(std::move(completes_twice)), "");
}
390 
391 #endif  // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
392 
393 }  // namespace rtc
394