1 // Copyright 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
6 #define BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
7 
8 #include <list>
9 #include <map>
10 #include <memory>
11 #include <random>
12 #include <set>
13 #include <unordered_map>
14 #include <utility>
15 
16 #include "base/atomic_sequence_num.h"
17 #include "base/cancelable_callback.h"
18 #include "base/containers/circular_deque.h"
19 #include "base/debug/task_annotator.h"
20 #include "base/macros.h"
21 #include "base/memory/scoped_refptr.h"
22 #include "base/memory/weak_ptr.h"
23 #include "base/message_loop/message_loop.h"
24 #include "base/pending_task.h"
25 #include "base/run_loop.h"
26 #include "base/single_thread_task_runner.h"
27 #include "base/synchronization/lock.h"
28 #include "base/task/sequence_manager/enqueue_order.h"
29 #include "base/task/sequence_manager/graceful_queue_shutdown_helper.h"
30 #include "base/task/sequence_manager/moveable_auto_lock.h"
31 #include "base/task/sequence_manager/sequence_manager.h"
32 #include "base/task/sequence_manager/task_queue_impl.h"
33 #include "base/task/sequence_manager/task_queue_selector.h"
34 #include "base/task/sequence_manager/thread_controller.h"
35 #include "base/threading/thread_checker.h"
36 
37 namespace base {
38 
39 namespace debug {
   // Forward declaration only: this header stores CrashKeyString solely through
   // pointers (see the crash-key members of SequenceManagerImpl::MainThreadOnly).
40 struct CrashKeyString;
41 }  // namespace debug
42 
43 namespace trace_event {
   // Forward declaration only: this header names ConvertableToTraceFormat solely
   // inside a std::unique_ptr return type (AsValueWithSelectorResult()).
44 class ConvertableToTraceFormat;
45 }  // namespace trace_event
46 
47 namespace sequence_manager {
48 
49 class SequenceManagerForTest;  // Befriended by SequenceManagerImpl.
   // NOTE(review): SequenceManagerImpl uses the nested types
   // TaskQueue::TaskTiming (by value) and TaskQueue::Spec, which need the
   // complete TaskQueue definition - presumably supplied by one of the headers
   // included above; confirm.
50 class TaskQueue;
51 class TaskTimeObserver;  // Used only through pointers in this header.
52 class TimeDomain;        // Used only through pointers in this header.
53 
54 namespace internal {
55 
56 class RealTimeDomain;  // Held through std::unique_ptr in MainThreadOnly.
   // NOTE(review): task_queue_impl.h is also included above, and nested types such
   // as TaskQueueImpl::Task are used by value below, so this forward declaration
   // looks redundant - confirm before removing.
57 class TaskQueueImpl;
58 
59 // The task queue manager (this class, SequenceManagerImpl) provides N task
60 // queues and a selector interface for choosing which task queue to service
61 // next. Each task queue consists of two sub queues:
62 //
63 // 1. Incoming task queue. Tasks that are posted get immediately appended here.
64 //    When a task is appended into an empty incoming queue, the task manager
65 //    work function (DoWork()) is scheduled to run on the main task runner.
66 //
67 // 2. Work queue. If a work queue is empty when DoWork() is entered, tasks from
68 //    the incoming task queue (if any) are moved here. The work queues are
69 //    registered with the selector as input to the scheduling decision.
70 //
   // State is split in two groups: |any_thread_| is only reachable through
   // any_thread(), which asserts that |any_thread_lock_| is held, while
   // |main_thread_only_| is only reachable through main_thread_only(), which
   // DCHECKs the calling thread against |main_thread_checker_|.
71 class BASE_EXPORT SequenceManagerImpl
72     : public SequenceManager,
73       public internal::SequencedTaskSource,
74       public internal::TaskQueueSelector::Observer,
75       public RunLoop::NestingObserver {
76  public:
77   using Observer = SequenceManager::Observer;
78
79   ~SequenceManagerImpl() override;
80
81   // Takes direct control over the current thread and creates a
82   // SequenceManager for it. This function should be called only once per
83   // thread, and it assumes that a MessageLoop has already been initialized for
84   // the current thread.
85   static std::unique_ptr<SequenceManagerImpl> CreateOnCurrentThread();
86
87   // SequenceManager implementation:
88   void SetObserver(Observer* observer) override;
89   void AddTaskObserver(MessageLoop::TaskObserver* task_observer) override;
90   void RemoveTaskObserver(MessageLoop::TaskObserver* task_observer) override;
91   void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
92   void RemoveTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
93   void RegisterTimeDomain(TimeDomain* time_domain) override;
94   void UnregisterTimeDomain(TimeDomain* time_domain) override;
95   TimeDomain* GetRealTimeDomain() const override;
96   const TickClock* GetTickClock() const override;
97   TimeTicks NowTicks() const override;
98   void SetDefaultTaskRunner(
99       scoped_refptr<SingleThreadTaskRunner> task_runner) override;
100   void SweepCanceledDelayedTasks() override;
101   bool GetAndClearSystemIsQuiescentBit() override;
102   void SetWorkBatchSize(int work_batch_size) override;
103   void EnableCrashKeys(const char* file_name_crash_key,
104                        const char* function_name_crash_key) override;
105   const MetricRecordingSettings& GetMetricRecordingSettings() const override;
106
107   // Implementation of SequencedTaskSource:
108   Optional<PendingTask> TakeTask() override;
109   void DidRunTask() override;
110   TimeDelta DelayTillNextTask(LazyNow* lazy_now) override;
111
112   // Requests that a task to process work is posted on the main task runner.
113   // These tasks are de-duplicated in two buckets: main-thread and all other
114   // threads. This distinction is made to reduce the overhead from locks; we
115   // assume the main-thread path will be hot.
116   void MaybeScheduleImmediateWork(const Location& from_here);
117
118   // Requests that a delayed task to process work is posted on the main task
119   // runner. These delayed tasks are de-duplicated. Must be called on the thread
120   // this class was created on.
      // NOTE(review): the paragraph above is not attached to any declaration;
      // presumably it is a leftover that describes SetNextDelayedDoWork() below.
      // Confirm, then merge or remove it.
121
122   // Schedules the next wake-up at |run_time|, cancelling any previous request.
123   // Use TimeTicks::Max() to cancel a wake-up.
124   // Must be called from a TimeDomain only.
125   void SetNextDelayedDoWork(LazyNow* lazy_now, TimeTicks run_time);
126
127   // Returns the currently executing TaskQueue if any. Must be called on the
128   // thread this class was created on.
129   internal::TaskQueueImpl* currently_executing_task_queue() const;
130
131   // Unregisters a TaskQueue previously created by |NewTaskQueue()|.
132   // No tasks will run on this queue after this call. Ownership of
      // |task_queue| is passed in (it is taken as a std::unique_ptr by value).
133   void UnregisterTaskQueueImpl(
134       std::unique_ptr<internal::TaskQueueImpl> task_queue);
135
136   scoped_refptr<internal::GracefulQueueShutdownHelper>
137   GetGracefulQueueShutdownHelper() const;
138
139   WeakPtr<SequenceManagerImpl> GetWeakPtr();
140
141  protected:
142   // Creates a task queue manager where |controller| controls the thread
143   // on which the tasks are eventually run. The constructor is protected, so
      // outside code obtains instances through CreateOnCurrentThread().
144   explicit SequenceManagerImpl(
145       std::unique_ptr<internal::ThreadController> controller);
146
147   friend class internal::TaskQueueImpl;
148   friend class ::base::sequence_manager::SequenceManagerForTest;
149
150  private:
      // NOTE(review): no other declaration in this header refers to
      // ProcessTaskResult; confirm whether it is still used by the implementation.
151   enum class ProcessTaskResult {
152     kDeferred,
153     kExecuted,
154     kSequenceManagerDeleted,
155   };
156
      // State reached through any_thread(), which asserts that
      // |any_thread_lock_| is held.
157   struct AnyThread {
158     AnyThread();
159     ~AnyThread();
160
161     // Task queues with newly available work on the incoming queue.
162     internal::IncomingImmediateWorkList* incoming_immediate_work_list = nullptr;
163   };
164
165   // SequenceManager maintains a queue of non-nestable tasks since they're
166   // uncommon and allocating an extra deque per TaskQueue would waste memory.
167   using NonNestableTaskDeque =
168       circular_deque<internal::TaskQueueImpl::DeferredNonNestableTask>;
169
170   // We have to track reentrancy because we support nested runloops but the
171   // selector interface is unaware of those.  This struct keeps track of all
172   // task related state needed to make pairs of TakeTask() / DidRunTask() work.
173   struct ExecutingTask {
        // |pending_task| is moved into the new entry; |task_queue| is stored as
        // a raw pointer and |task_timing| is copied.
ExecutingTaskExecutingTask174     ExecutingTask(internal::TaskQueueImpl::Task&& pending_task,
175                   internal::TaskQueueImpl* task_queue,
176                   TaskQueue::TaskTiming task_timing)
177         : pending_task(std::move(pending_task)),
178           task_queue(task_queue),
179           task_timing(task_timing) {}
180
181     internal::TaskQueueImpl::Task pending_task;
182     internal::TaskQueueImpl* task_queue = nullptr;
183     TaskQueue::TaskTiming task_timing;
184   };
185
      // State reached through main_thread_only(), which DCHECKs the calling
      // thread against |main_thread_checker_|.
186   struct MainThreadOnly {
187     MainThreadOnly();
188     ~MainThreadOnly();
189
190     int nesting_depth = 0;
191     NonNestableTaskDeque non_nestable_task_queue;
192     // TODO(altimin): Switch to instruction pointer crash key when it's
193     // available.
194     debug::CrashKeyString* file_name_crash_key = nullptr;
195     debug::CrashKeyString* function_name_crash_key = nullptr;
196
197     std::mt19937_64 random_generator;
198     std::uniform_real_distribution<double> uniform_distribution;
199
200     internal::TaskQueueSelector selector;
201     ObserverList<MessageLoop::TaskObserver> task_observers;
202     ObserverList<TaskTimeObserver> task_time_observers;
203     std::set<TimeDomain*> time_domains;
204     std::unique_ptr<internal::RealTimeDomain> real_time_domain;
205
206     // List of task queues managed by this SequenceManager.
207     // - active_queues contains queues that are still running tasks.
208     //   Most often they are owned by relevant TaskQueues, but
209     //   queues_to_gracefully_shutdown are included here too.
210     // - queues_to_gracefully_shutdown contains queues which should be deleted
211     //   when they become empty.
212     // - queues_to_delete contains soon-to-be-deleted queues, because some
213     //   internal scheduling code does not expect queues to be pulled
214     //   from underneath.
215
216     std::set<internal::TaskQueueImpl*> active_queues;
217     std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>>
218         queues_to_gracefully_shutdown;
219     std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>>
220         queues_to_delete;
221
222     // Scratch space used to store the contents of
223     // any_thread().incoming_immediate_work_list for use by
224     // ReloadEmptyWorkQueues.  We keep hold of this vector to avoid unnecessary
225     // memory allocations.
        // NOTE(review): std::vector is used here but <vector> is not among this
        // header's direct includes; it is presumably pulled in transitively.
226     std::vector<internal::TaskQueueImpl*> queues_to_reload;
227
228     bool task_was_run_on_quiescence_monitored_queue = false;
229
230     // Due to nested runloops more than one task can be executing concurrently.
231     std::list<ExecutingTask> task_execution_stack;
232
233     Observer* observer = nullptr;  // NOT OWNED
234   };
235
236   // TaskQueueSelector::Observer:
237   void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override;
238
239   // RunLoop::NestingObserver:
240   void OnBeginNestedRunLoop() override;
241   void OnExitNestedRunLoop() override;
242
243   // Called by the task queue to inform this SequenceManager of a task that's
244   // about to be queued. This SequenceManager may use this opportunity to add
245   // metadata to |pending_task| before it is moved into the queue.
246   void WillQueueTask(internal::TaskQueueImpl::Task* pending_task);
247
248   // Enqueues delayed tasks with run_times <= Now() onto the work queue and
249   // reloads any empty work queues.
250   void WakeUpReadyDelayedQueues(LazyNow* lazy_now);
251
252   void NotifyWillProcessTask(ExecutingTask* task, LazyNow* time_before_task);
253   void NotifyDidProcessTask(ExecutingTask* task, LazyNow* time_after_task);
254
255   internal::EnqueueOrder GetNextSequenceNumber();
256
257   std::unique_ptr<trace_event::ConvertableToTraceFormat>
258   AsValueWithSelectorResult(bool should_run,
259                             internal::WorkQueue* selected_work_queue) const;
260
261   // Adds |queue| to |any_thread().incoming_immediate_work_list| and if
262   // |queue_is_blocked| is false it makes sure a DoWork is posted.
263   // Can be called from any thread.
264   void OnQueueHasIncomingImmediateWork(internal::TaskQueueImpl* queue,
265                                        internal::EnqueueOrder enqueue_order,
266                                        bool queue_is_blocked);
267
268   // Returns true if |task_queue| was added to the list, or false if it was
269   // already in the list.  If |task_queue| was inserted, the |order| is set
270   // with |enqueue_order|.
271   bool AddToIncomingImmediateWorkList(internal::TaskQueueImpl* task_queue,
272                                       internal::EnqueueOrder enqueue_order);
273   void RemoveFromIncomingImmediateWorkList(internal::TaskQueueImpl* task_queue);
274
275   // Calls |ReloadImmediateWorkQueueIfEmpty| on all queues in
276   // |main_thread_only().queues_to_reload|.
277   void ReloadEmptyWorkQueues();
278
      // Overrides a base-class virtual (presumably declared by SequenceManager;
      // confirm) but is kept private in this class.
279   std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl(
280       const TaskQueue::Spec& spec) override;
281
282   void TakeQueuesToGracefullyShutdownFromHelper();
283
284   // Deletes queues marked for deletion and empty queues marked for shutdown.
285   void CleanUpQueues();
286
287   bool ShouldRecordCPUTimeForTask();
288
289   // Determines if wall time or thread time should be recorded for the next
290   // task.
291   TaskQueue::TaskTiming InitializeTaskTiming(
292       internal::TaskQueueImpl* task_queue);
293
294   const scoped_refptr<internal::GracefulQueueShutdownHelper>
295       graceful_shutdown_helper_;
296
297   internal::EnqueueOrder::Generator enqueue_order_generator_;
298
299   std::unique_ptr<internal::ThreadController> controller_;
300
      // |any_thread_lock_| must be held whenever |any_thread_| is accessed; use
      // the any_thread() accessors below, which assert this.
301   mutable Lock any_thread_lock_;
302   AnyThread any_thread_;
303
any_thread()304   struct AnyThread& any_thread() {
305     any_thread_lock_.AssertAcquired();
306     return any_thread_;
307   }
any_thread()308   const struct AnyThread& any_thread() const {
309     any_thread_lock_.AssertAcquired();
310     return any_thread_;
311   }
312
313   const MetricRecordingSettings metric_recording_settings_;
314
315   // A check to bail out early during memory corruption.
316   // https://crbug.com/757940
317   bool Validate();
318
      // NOTE(review): presumably the value inspected by Validate(); confirm
      // against the implementation file.
319   int32_t memory_corruption_sentinel_;
320
      // |main_thread_only_| must only be touched on the thread accepted by
      // |main_thread_checker_|; use the main_thread_only() accessors below, which
      // DCHECK this.
321   THREAD_CHECKER(main_thread_checker_);
322   MainThreadOnly main_thread_only_;
main_thread_only()323   MainThreadOnly& main_thread_only() {
324     DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
325     return main_thread_only_;
326   }
main_thread_only()327   const MainThreadOnly& main_thread_only() const {
328     DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
329     return main_thread_only_;
330   }
331
      // Declared after every other data member of this class.
332   WeakPtrFactory<SequenceManagerImpl> weak_factory_;
333
334   DISALLOW_COPY_AND_ASSIGN(SequenceManagerImpl);
335 };
336 
337 }  // namespace internal
338 }  // namespace sequence_manager
339 }  // namespace base
340 
341 #endif  // BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
342