1 // Copyright 2018 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
6 #define BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
7 
8 #include <stddef.h>
9 
10 #include <memory>
11 #include <set>
12 
13 #include "base/callback.h"
14 #include "base/containers/circular_deque.h"
15 #include "base/macros.h"
16 #include "base/memory/weak_ptr.h"
17 #include "base/message_loop/message_loop.h"
18 #include "base/pending_task.h"
19 #include "base/task/sequence_manager/enqueue_order.h"
20 #include "base/task/sequence_manager/intrusive_heap.h"
21 #include "base/task/sequence_manager/lazily_deallocated_deque.h"
22 #include "base/task/sequence_manager/sequenced_task_source.h"
23 #include "base/task/sequence_manager/task_queue.h"
24 #include "base/threading/thread_checker.h"
25 #include "base/trace_event/trace_event.h"
26 #include "base/trace_event/trace_event_argument.h"
27 
28 namespace base {
29 namespace sequence_manager {
30 
31 class LazyNow;
32 class TimeDomain;
33 
34 namespace internal {
35 
36 class SequenceManagerImpl;
37 class WorkQueue;
38 class WorkQueueSets;
39 
40 struct IncomingImmediateWorkList {
41   IncomingImmediateWorkList* next = nullptr;
42   TaskQueueImpl* queue = nullptr;
43   internal::EnqueueOrder order;
44 };
45 
46 // TaskQueueImpl has four main queues:
47 //
48 // Immediate (non-delayed) tasks:
49 //    |immediate_incoming_queue| - PostTask enqueues tasks here.
50 //    |immediate_work_queue| - SequenceManager takes immediate tasks here.
51 //
52 // Delayed tasks
53 //    |delayed_incoming_queue| - PostDelayedTask enqueues tasks here.
54 //    |delayed_work_queue| - SequenceManager takes delayed tasks here.
55 //
56 // The |immediate_incoming_queue| can be accessed from any thread, the other
57 // queues are main-thread only. To reduce the overhead of locking,
58 // |immediate_work_queue| is swapped with |immediate_incoming_queue| when
59 // |immediate_work_queue| becomes empty.
60 //
61 // Delayed tasks are initially posted to |delayed_incoming_queue| and a wake-up
62 // is scheduled with the TimeDomain.  When the delay has elapsed, the TimeDomain
63 // calls UpdateDelayedWorkQueue and ready delayed tasks are moved into the
64 // |delayed_work_queue|. Note the EnqueueOrder (used for ordering) for a delayed
65 // task is not set until it's moved into the |delayed_work_queue|.
66 //
67 // TaskQueueImpl uses the WorkQueueSets and the TaskQueueSelector to implement
68 // prioritization. Task selection is done by the TaskQueueSelector and when a
69 // queue is selected, it round-robins between the |immediate_work_queue| and
70 // |delayed_work_queue|.  The reason for this is we want to make sure delayed
71 // tasks (normally the most common type) don't starve out immediate work.
72 class BASE_EXPORT TaskQueueImpl {
73  public:
74   TaskQueueImpl(SequenceManagerImpl* sequence_manager,
75                 TimeDomain* time_domain,
76                 const TaskQueue::Spec& spec);
77 
78   ~TaskQueueImpl();
79 
80   // Represents a time at which a task wants to run. Tasks scheduled for the
81   // same point in time will be ordered by their sequence numbers.
82   struct DelayedWakeUp {
83     TimeTicks time;
84     int sequence_num;
85 
86     bool operator!=(const DelayedWakeUp& other) const {
87       return time != other.time || other.sequence_num != sequence_num;
88     }
89 
90     bool operator==(const DelayedWakeUp& other) const {
91       return !(*this != other);
92     }
93 
94     bool operator<=(const DelayedWakeUp& other) const {
95       if (time == other.time) {
96         // Debug gcc builds can compare an element against itself.
97         DCHECK(sequence_num != other.sequence_num || this == &other);
98         // |PostedTask::sequence_num| is int and might wrap around to
99         // a negative number when casted from EnqueueOrder.
100         // This way of comparison handles that properly.
101         return (sequence_num - other.sequence_num) <= 0;
102       }
103       return time < other.time;
104     }
105   };
106 
107   class BASE_EXPORT Task : public TaskQueue::Task {
108    public:
109     Task(TaskQueue::PostedTask task,
110          TimeTicks desired_run_time,
111          EnqueueOrder sequence_number);
112 
113     Task(TaskQueue::PostedTask task,
114          TimeTicks desired_run_time,
115          EnqueueOrder sequence_number,
116          EnqueueOrder enqueue_order);
117 
delayed_wake_up()118     DelayedWakeUp delayed_wake_up() const {
119       // Since we use |sequence_num| in DelayedWakeUp for ordering purposes
120       // and integer overflow handling is type-sensitive it's worth to protect
121       // it from an unnoticed potential change in the PendingTask base class.
122       static_assert(std::is_same<decltype(sequence_num), int>::value, "");
123       return DelayedWakeUp{delayed_run_time, sequence_num};
124     }
125 
enqueue_order()126     EnqueueOrder enqueue_order() const {
127       DCHECK(enqueue_order_);
128       return enqueue_order_;
129     }
130 
set_enqueue_order(EnqueueOrder enqueue_order)131     void set_enqueue_order(EnqueueOrder enqueue_order) {
132       DCHECK(!enqueue_order_);
133       enqueue_order_ = enqueue_order;
134     }
135 
enqueue_order_set()136     bool enqueue_order_set() const { return enqueue_order_; }
137 
138    private:
139     // Similar to sequence number, but ultimately the |enqueue_order_| is what
140     // the scheduler uses for task ordering. For immediate tasks |enqueue_order|
141     // is set when posted, but for delayed tasks it's not defined until they are
142     // enqueued on the |delayed_work_queue_|. This is because otherwise delayed
143     // tasks could run before an immediate task posted after the delayed task.
144     EnqueueOrder enqueue_order_;
145   };
146 
147   // A result retuned by PostDelayedTask. When scheduler failed to post a task
148   // due to being shutdown a task is returned to be destroyed outside the lock.
149   struct PostTaskResult {
150     PostTaskResult();
151     PostTaskResult(bool success, TaskQueue::PostedTask task);
152     PostTaskResult(PostTaskResult&& move_from);
153     PostTaskResult(const PostTaskResult& copy_from) = delete;
154     ~PostTaskResult();
155 
156     static PostTaskResult Success();
157     static PostTaskResult Fail(TaskQueue::PostedTask task);
158 
159     bool success;
160     TaskQueue::PostedTask task;
161   };
162 
163   // Types of queues TaskQueueImpl is maintaining internally.
164   enum class WorkQueueType { kImmediate, kDelayed };
165 
166   // Non-nestable tasks may get deferred but such queue is being maintained on
167   // SequenceManager side, so we need to keep information how to requeue it.
168   struct DeferredNonNestableTask {
169     internal::TaskQueueImpl::Task task;
170     internal::TaskQueueImpl* task_queue;
171     WorkQueueType work_queue_type;
172   };
173 
174   using OnNextWakeUpChangedCallback = RepeatingCallback<void(TimeTicks)>;
175   using OnTaskStartedHandler =
176       RepeatingCallback<void(const TaskQueue::Task&,
177                              const TaskQueue::TaskTiming&)>;
178   using OnTaskCompletedHandler =
179       RepeatingCallback<void(const TaskQueue::Task&,
180                              const TaskQueue::TaskTiming&)>;
181 
182   // TaskQueue implementation.
183   const char* GetName() const;
184   bool RunsTasksInCurrentSequence() const;
185   PostTaskResult PostDelayedTask(TaskQueue::PostedTask task);
186   // Require a reference to enclosing task queue for lifetime control.
187   std::unique_ptr<TaskQueue::QueueEnabledVoter> CreateQueueEnabledVoter(
188       scoped_refptr<TaskQueue> owning_task_queue);
189   bool IsQueueEnabled() const;
190   bool IsEmpty() const;
191   size_t GetNumberOfPendingTasks() const;
192   bool HasTaskToRunImmediately() const;
193   Optional<TimeTicks> GetNextScheduledWakeUp();
194   Optional<DelayedWakeUp> GetNextScheduledWakeUpImpl();
195   void SetQueuePriority(TaskQueue::QueuePriority priority);
196   TaskQueue::QueuePriority GetQueuePriority() const;
197   void AddTaskObserver(MessageLoop::TaskObserver* task_observer);
198   void RemoveTaskObserver(MessageLoop::TaskObserver* task_observer);
199   void SetTimeDomain(TimeDomain* time_domain);
200   TimeDomain* GetTimeDomain() const;
201   void SetBlameContext(trace_event::BlameContext* blame_context);
202   void InsertFence(TaskQueue::InsertFencePosition position);
203   void InsertFenceAt(TimeTicks time);
204   void RemoveFence();
205   bool HasActiveFence();
206   bool BlockedByFence() const;
207   // Implementation of TaskQueue::SetObserver.
208   void SetOnNextWakeUpChangedCallback(OnNextWakeUpChangedCallback callback);
209 
210   void UnregisterTaskQueue();
211 
212   // Returns true if a (potentially hypothetical) task with the specified
213   // |enqueue_order| could run on the queue. Must be called from the main
214   // thread.
215   bool CouldTaskRun(EnqueueOrder enqueue_order) const;
216 
217   // Must only be called from the thread this task queue was created on.
218   void ReloadImmediateWorkQueueIfEmpty();
219 
220   void AsValueInto(TimeTicks now, trace_event::TracedValue* state) const;
221 
GetQuiescenceMonitored()222   bool GetQuiescenceMonitored() const { return should_monitor_quiescence_; }
GetShouldNotifyObservers()223   bool GetShouldNotifyObservers() const { return should_notify_observers_; }
224 
225   void NotifyWillProcessTask(const PendingTask& pending_task);
226   void NotifyDidProcessTask(const PendingTask& pending_task);
227 
228   // Check for available tasks in immediate work queues.
229   // Used to check if we need to generate notifications about delayed work.
230   bool HasPendingImmediateWork();
231 
delayed_work_queue()232   WorkQueue* delayed_work_queue() {
233     return main_thread_only().delayed_work_queue.get();
234   }
235 
delayed_work_queue()236   const WorkQueue* delayed_work_queue() const {
237     return main_thread_only().delayed_work_queue.get();
238   }
239 
immediate_work_queue()240   WorkQueue* immediate_work_queue() {
241     return main_thread_only().immediate_work_queue.get();
242   }
243 
immediate_work_queue()244   const WorkQueue* immediate_work_queue() const {
245     return main_thread_only().immediate_work_queue.get();
246   }
247 
248   // Protected by SequenceManagerImpl's AnyThread lock.
immediate_work_list_storage()249   IncomingImmediateWorkList* immediate_work_list_storage() {
250     return &immediate_work_list_storage_;
251   }
252 
253   // Enqueues any delayed tasks which should be run now on the
254   // |delayed_work_queue|.
255   // Must be called from the main thread.
256   void WakeUpForDelayedWork(LazyNow* lazy_now);
257 
heap_handle()258   HeapHandle heap_handle() const { return main_thread_only().heap_handle; }
259 
set_heap_handle(HeapHandle heap_handle)260   void set_heap_handle(HeapHandle heap_handle) {
261     main_thread_only().heap_handle = heap_handle;
262   }
263 
264   // Pushes |task| onto the front of the specified work queue. Caution must be
265   // taken with this API because you could easily starve out other work.
266   // TODO(kraynov): Simplify non-nestable task logic https://crbug.com/845437.
267   void RequeueDeferredNonNestableTask(DeferredNonNestableTask task);
268 
269   void PushImmediateIncomingTaskForTest(TaskQueueImpl::Task&& task);
270 
271   class QueueEnabledVoterImpl : public TaskQueue::QueueEnabledVoter {
272    public:
273     explicit QueueEnabledVoterImpl(scoped_refptr<TaskQueue> task_queue);
274     ~QueueEnabledVoterImpl() override;
275 
276     // QueueEnabledVoter implementation.
277     void SetQueueEnabled(bool enabled) override;
278 
GetTaskQueueForTest()279     TaskQueueImpl* GetTaskQueueForTest() const {
280       return task_queue_->GetTaskQueueImpl();
281     }
282 
283    private:
284     friend class TaskQueueImpl;
285 
286     scoped_refptr<TaskQueue> task_queue_;
287     bool enabled_;
288   };
289 
290   // Iterates over |delayed_incoming_queue| removing canceled tasks.
291   void SweepCanceledDelayedTasks(TimeTicks now);
292 
293   // Allows wrapping TaskQueue to set a handler to subscribe for notifications
294   // about started and completed tasks.
295   void SetOnTaskStartedHandler(OnTaskStartedHandler handler);
296   void OnTaskStarted(const TaskQueue::Task& task,
297                      const TaskQueue::TaskTiming& task_timing);
298   void SetOnTaskCompletedHandler(OnTaskCompletedHandler handler);
299   void OnTaskCompleted(const TaskQueue::Task& task,
300                        const TaskQueue::TaskTiming& task_timing);
301   bool RequiresTaskTiming() const;
302 
303   WeakPtr<SequenceManagerImpl> GetSequenceManagerWeakPtr();
304 
305   scoped_refptr<GracefulQueueShutdownHelper> GetGracefulQueueShutdownHelper();
306 
307   // Returns true if this queue is unregistered or task queue manager is deleted
308   // and this queue can be safely deleted on any thread.
309   bool IsUnregistered() const;
310 
311   // Disables queue for testing purposes, when a QueueEnabledVoter can't be
312   // constructed due to not having TaskQueue.
313   void SetQueueEnabledForTest(bool enabled);
314 
315  protected:
316   void SetDelayedWakeUpForTesting(Optional<DelayedWakeUp> wake_up);
317 
318  private:
319   friend class WorkQueue;
320   friend class WorkQueueTest;
321 
322   struct AnyThread {
323     AnyThread(SequenceManagerImpl* sequence_manager, TimeDomain* time_domain);
324     ~AnyThread();
325 
326     // SequenceManagerImpl, TimeDomain and Observer are maintained in two
327     // copies: inside AnyThread and inside MainThreadOnly. They can be changed
328     // only from main thread, so it should be locked before accessing from other
329     // threads.
330     SequenceManagerImpl* sequence_manager;
331     TimeDomain* time_domain;
332     // Callback corresponding to TaskQueue::Observer::OnQueueNextChanged.
333     OnNextWakeUpChangedCallback on_next_wake_up_changed_callback;
334   };
335 
336   struct MainThreadOnly {
337     MainThreadOnly(SequenceManagerImpl* sequence_manager,
338                    TaskQueueImpl* task_queue,
339                    TimeDomain* time_domain);
340     ~MainThreadOnly();
341 
342     // Another copy of SequenceManagerImpl, TimeDomain and Observer
343     // for lock-free access from the main thread.
344     // See description inside struct AnyThread for details.
345     SequenceManagerImpl* sequence_manager;
346     TimeDomain* time_domain;
347     // Callback corresponding to TaskQueue::Observer::OnQueueNextChanged.
348     OnNextWakeUpChangedCallback on_next_wake_up_changed_callback;
349 
350     std::unique_ptr<WorkQueue> delayed_work_queue;
351     std::unique_ptr<WorkQueue> immediate_work_queue;
352     std::priority_queue<TaskQueueImpl::Task> delayed_incoming_queue;
353     ObserverList<MessageLoop::TaskObserver> task_observers;
354     size_t set_index;
355     HeapHandle heap_handle;
356     int is_enabled_refcount;
357     int voter_refcount;
358     trace_event::BlameContext* blame_context;  // Not owned.
359     EnqueueOrder current_fence;
360     Optional<TimeTicks> delayed_fence;
361     OnTaskStartedHandler on_task_started_handler;
362     OnTaskCompletedHandler on_task_completed_handler;
363     // Last reported wake up, used only in UpdateWakeUp to avoid
364     // excessive calls.
365     Optional<DelayedWakeUp> scheduled_wake_up;
366     // If false, queue will be disabled. Used only for tests.
367     bool is_enabled_for_test;
368   };
369 
370   PostTaskResult PostImmediateTaskImpl(TaskQueue::PostedTask task);
371   PostTaskResult PostDelayedTaskImpl(TaskQueue::PostedTask task);
372 
373   // Push the task onto the |delayed_incoming_queue|. Lock-free main thread
374   // only fast path.
375   void PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,
376                                                   TimeTicks now);
377 
378   // Push the task onto the |delayed_incoming_queue|.  Slow path from other
379   // threads.
380   void PushOntoDelayedIncomingQueueLocked(Task pending_task);
381 
382   void ScheduleDelayedWorkTask(Task pending_task);
383 
384   void MoveReadyImmediateTasksToImmediateWorkQueueLocked();
385 
386   // Push the task onto the |immediate_incoming_queue| and for auto pumped
387   // queues it calls MaybePostDoWorkOnMainRunner if the Incoming queue was
388   // empty.
389   void PushOntoImmediateIncomingQueueLocked(Task task);
390 
391   using TaskDeque = circular_deque<Task>;
392 
393   // Extracts all the tasks from the immediate incoming queue and swaps it with
394   // |queue| which must be empty.
395   // Can be called from any thread.
396   void ReloadEmptyImmediateQueue(TaskDeque* queue);
397 
398   void TraceQueueSize() const;
399   static void QueueAsValueInto(const TaskDeque& queue,
400                                TimeTicks now,
401                                trace_event::TracedValue* state);
402   static void QueueAsValueInto(const std::priority_queue<Task>& queue,
403                                TimeTicks now,
404                                trace_event::TracedValue* state);
405   static void TaskAsValueInto(const Task& task,
406                               TimeTicks now,
407                               trace_event::TracedValue* state);
408 
409   void RemoveQueueEnabledVoter(const QueueEnabledVoterImpl* voter);
410   void OnQueueEnabledVoteChanged(bool enabled);
411   void EnableOrDisableWithSelector(bool enable);
412 
413   // Schedules delayed work on time domain and calls the observer.
414   void UpdateDelayedWakeUp(LazyNow* lazy_now);
415   void UpdateDelayedWakeUpImpl(LazyNow* lazy_now,
416                                Optional<DelayedWakeUp> wake_up);
417 
418   // Activate a delayed fence if a time has come.
419   void ActivateDelayedFenceIfNeeded(TimeTicks now);
420 
421   const char* name_;
422 
423   const PlatformThreadId thread_id_;
424 
425   mutable Lock any_thread_lock_;
426   AnyThread any_thread_;
any_thread()427   struct AnyThread& any_thread() {
428     any_thread_lock_.AssertAcquired();
429     return any_thread_;
430   }
any_thread()431   const struct AnyThread& any_thread() const {
432     any_thread_lock_.AssertAcquired();
433     return any_thread_;
434   }
435 
436   ThreadChecker main_thread_checker_;
437   MainThreadOnly main_thread_only_;
main_thread_only()438   MainThreadOnly& main_thread_only() {
439     DCHECK(main_thread_checker_.CalledOnValidThread());
440     return main_thread_only_;
441   }
main_thread_only()442   const MainThreadOnly& main_thread_only() const {
443     DCHECK(main_thread_checker_.CalledOnValidThread());
444     return main_thread_only_;
445   }
446 
447   mutable Lock immediate_incoming_queue_lock_;
448   TaskDeque immediate_incoming_queue_;
immediate_incoming_queue()449   TaskDeque& immediate_incoming_queue() {
450     immediate_incoming_queue_lock_.AssertAcquired();
451     return immediate_incoming_queue_;
452   }
immediate_incoming_queue()453   const TaskDeque& immediate_incoming_queue() const {
454     immediate_incoming_queue_lock_.AssertAcquired();
455     return immediate_incoming_queue_;
456   }
457 
458   // Protected by SequenceManagerImpl's AnyThread lock.
459   IncomingImmediateWorkList immediate_work_list_storage_;
460 
461   const bool should_monitor_quiescence_;
462   const bool should_notify_observers_;
463 
464   DISALLOW_COPY_AND_ASSIGN(TaskQueueImpl);
465 };
466 
467 }  // namespace internal
468 }  // namespace sequence_manager
469 }  // namespace base
470 
471 #endif  // BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
472