1 /*
2  *  Copyright 2019 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #ifndef RTC_BASE_OPERATIONS_CHAIN_H_
12 #define RTC_BASE_OPERATIONS_CHAIN_H_
13 
14 #include <functional>
15 #include <memory>
16 #include <queue>
17 #include <set>
18 #include <type_traits>
19 #include <utility>
20 
21 #include "api/scoped_refptr.h"
22 #include "rtc_base/checks.h"
23 #include "rtc_base/constructor_magic.h"
24 #include "rtc_base/ref_count.h"
25 #include "rtc_base/ref_counted_object.h"
26 #include "rtc_base/synchronization/sequence_checker.h"
27 
28 namespace rtc {
29 
30 namespace rtc_operations_chain_internal {
31 
32 // Abstract base class for operations on the OperationsChain. Run() must be
33 // invoked exactly once during the Operation's lifespan.
34 class Operation {
35  public:
~Operation()36   virtual ~Operation() {}
37 
38   virtual void Run() = 0;
39 };
40 
41 // FunctorT is the same as in OperationsChain::ChainOperation(). |callback_| is
42 // passed on to the |functor_| and is used to inform the OperationsChain that
43 // the operation completed. The functor is responsible for invoking the
44 // callback when the operation has completed.
45 template <typename FunctorT>
46 class OperationWithFunctor final : public Operation {
47  public:
OperationWithFunctor(FunctorT && functor,std::function<void ()> callback)48   OperationWithFunctor(FunctorT&& functor, std::function<void()> callback)
49       : functor_(std::forward<FunctorT>(functor)),
50         callback_(std::move(callback)) {}
51 
~OperationWithFunctor()52   ~OperationWithFunctor() override { RTC_DCHECK(has_run_); }
53 
Run()54   void Run() override {
55     RTC_DCHECK(!has_run_);
56 #ifdef RTC_DCHECK_IS_ON
57     has_run_ = true;
58 #endif  // RTC_DCHECK_IS_ON
59     // The functor being executed may invoke the callback synchronously,
60     // marking the operation as complete. As such, |this| OperationWithFunctor
61     // object may get deleted here, including destroying |functor_|. To
62     // protect the functor from self-destruction while running, it is moved to
63     // a local variable.
64     auto functor = std::move(functor_);
65     functor(std::move(callback_));
66     // |this| may now be deleted; don't touch any member variables.
67   }
68 
69  private:
70   typename std::remove_reference<FunctorT>::type functor_;
71   std::function<void()> callback_;
72 #ifdef RTC_DCHECK_IS_ON
73   bool has_run_ = false;
74 #endif  // RTC_DCHECK_IS_ON
75 };
76 
77 }  // namespace rtc_operations_chain_internal
78 
79 // An implementation of an operations chain. An operations chain is used to
80 // ensure that asynchronous tasks are executed in-order with at most one task
81 // running at a time. The notion of an operation chain is defined in
82 // https://w3c.github.io/webrtc-pc/#dfn-operations-chain, though unlike this
83 // implementation, the referenced definition is coupled with a peer connection.
84 //
85 // An operation is an asynchronous task. The operation starts when its functor
86 // is invoked, and completes when the callback that is passed to functor is
87 // invoked by the operation. The operation must start and complete on the same
88 // sequence that the operation was "chained" on. As such, the OperationsChain
89 // operates in a "single-threaded" fashion, but the asynchronous operations may
90 // use any number of threads to achieve "in parallel" behavior.
91 //
92 // When an operation is chained onto the OperationsChain, it is enqueued to be
93 // executed. Operations are executed in FIFO order, where the next operation
94 // does not start until the previous operation has completed. OperationsChain
95 // guarantees that:
96 // - If the operations chain is empty when an operation is chained, the
97 //   operation starts immediately, inside ChainOperation().
98 // - If the operations chain is not empty when an operation is chained, the
99 //   operation starts upon the previous operation completing, inside the
100 //   callback.
101 //
102 // An operation is contractually obligated to invoke the completion callback
103 // exactly once. Cancelling a chained operation is not supported by the
104 // OperationsChain; an operation that wants to be cancellable is responsible for
105 // aborting its own steps. The callback must still be invoked.
106 //
107 // The OperationsChain is kept-alive through reference counting if there are
108 // operations pending. This, together with the contract, guarantees that all
109 // operations that are chained get executed.
110 class OperationsChain final : public RefCountedObject<RefCountInterface> {
111  public:
112   static scoped_refptr<OperationsChain> Create();
113   ~OperationsChain();
114 
115   // Chains an operation. Chained operations are executed in FIFO order. The
116   // operation starts when |functor| is executed by the OperationsChain and is
117   // contractually obligated to invoke the callback passed to it when the
118   // operation is complete. Operations must start and complete on the same
119   // sequence that this method was invoked on.
120   //
121   // If the OperationsChain is empty, the operation starts immediately.
122   // Otherwise it starts upon the previous operation completing.
123   //
124   // Requirements of FunctorT:
125   // - FunctorT is movable.
126   // - FunctorT implements "T operator()(std::function<void()> callback)" or
127   //   "T operator()(std::function<void()> callback) const" for some T (if T is
128   //   not void, the return value is discarded in the invoking sequence). The
129   //   operator starts the operation; when the operation is complete, "callback"
130   //   MUST be invoked, and it MUST be so on the sequence that ChainOperation()
131   //   was invoked on.
132   //
133   // Lambda expressions are valid functors.
134   template <typename FunctorT>
ChainOperation(FunctorT && functor)135   void ChainOperation(FunctorT&& functor) {
136     RTC_DCHECK_RUN_ON(&sequence_checker_);
137     chained_operations_.push(
138         std::make_unique<
139             rtc_operations_chain_internal::OperationWithFunctor<FunctorT>>(
140             std::forward<FunctorT>(functor), CreateOperationsChainCallback()));
141     // If this is the only operation in the chain we execute it immediately.
142     // Otherwise the callback will get invoked when the pending operation
143     // completes which will trigger the next operation to execute.
144     if (chained_operations_.size() == 1) {
145       chained_operations_.front()->Run();
146     }
147   }
148 
149  private:
150   friend class CallbackHandle;
151 
152   // The callback that is passed to an operation's functor (that is used to
153   // inform the OperationsChain that the operation has completed) is of type
154   // std::function<void()>, which is a copyable type. To allow the callback to
155   // be copyable, it is backed up by this reference counted handle. See
156   // CreateOperationsChainCallback().
157   class CallbackHandle final : public RefCountedObject<RefCountInterface> {
158    public:
159     explicit CallbackHandle(scoped_refptr<OperationsChain> operations_chain);
160     ~CallbackHandle();
161 
162     void OnOperationComplete();
163 
164    private:
165     scoped_refptr<OperationsChain> operations_chain_;
166 #ifdef RTC_DCHECK_IS_ON
167     bool has_run_ = false;
168 #endif  // RTC_DCHECK_IS_ON
169 
170     RTC_DISALLOW_COPY_AND_ASSIGN(CallbackHandle);
171   };
172 
173   OperationsChain();
174 
175   std::function<void()> CreateOperationsChainCallback();
176   void OnOperationComplete();
177 
178   webrtc::SequenceChecker sequence_checker_;
179   // FIFO-list of operations that are chained. An operation that is executing
180   // remains on this list until it has completed by invoking the callback passed
181   // to it.
182   std::queue<std::unique_ptr<rtc_operations_chain_internal::Operation>>
183       chained_operations_ RTC_GUARDED_BY(sequence_checker_);
184 
185   RTC_DISALLOW_COPY_AND_ASSIGN(OperationsChain);
186 };
187 
188 }  // namespace rtc
189 
190 #endif  // RTC_BASE_OPERATIONS_CHAIN_H_
191