1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
6 #define BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
7 
#include <stddef.h>

#include <memory>

#include "base/base_export.h"
#include "base/callback.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/pending_task.h"
#include "base/sequence_checker.h"
#include "base/synchronization/lock.h"
#include "base/time/time.h"
16 
17 namespace base {
18 
19 class BasicPostTaskPerfTest;
20 
21 namespace internal {
22 
23 // Implements a queue of tasks posted to the message loop running on the current
24 // thread. This class takes care of synchronizing posting tasks from different
25 // threads and together with MessageLoop ensures clean shutdown.
26 class BASE_EXPORT IncomingTaskQueue
27     : public RefCountedThreadSafe<IncomingTaskQueue> {
28  public:
29   // TODO(gab): Move this to SequencedTaskSource::Observer in
30   // https://chromium-review.googlesource.com/c/chromium/src/+/1088762.
31   class Observer {
32    public:
33     virtual ~Observer() = default;
34 
35     // Notifies this Observer that it is about to enqueue |task|. The Observer
36     // may alter |task| as a result (e.g. add metadata to the PendingTask
37     // struct). This may be called while holding a lock and shouldn't perform
38     // logic requiring synchronization (override DidQueueTask() for that).
39     virtual void WillQueueTask(PendingTask* task) = 0;
40 
41     // Notifies this Observer that a task was queued in the IncomingTaskQueue it
42     // observes. |was_empty| is true if the task source was empty (i.e.
43     // |!HasTasks()|) before this task was posted. DidQueueTask() can be invoked
44     // from any thread.
45     virtual void DidQueueTask(bool was_empty) = 0;
46   };
47 
48   // Provides a read and remove only view into a task queue.
49   class ReadAndRemoveOnlyQueue {
50    public:
51     ReadAndRemoveOnlyQueue() = default;
52     virtual ~ReadAndRemoveOnlyQueue() = default;
53 
54     // Returns the next task. HasTasks() is assumed to be true.
55     virtual const PendingTask& Peek() = 0;
56 
57     // Removes and returns the next task. HasTasks() is assumed to be true.
58     virtual PendingTask Pop() = 0;
59 
60     // Whether this queue has tasks.
61     virtual bool HasTasks() = 0;
62 
63     // Removes all tasks.
64     virtual void Clear() = 0;
65 
66    private:
67     DISALLOW_COPY_AND_ASSIGN(ReadAndRemoveOnlyQueue);
68   };
69 
70   // Provides a read-write task queue.
71   class Queue : public ReadAndRemoveOnlyQueue {
72    public:
73     Queue() = default;
74     ~Queue() override = default;
75 
76     // Adds the task to the end of the queue.
77     virtual void Push(PendingTask pending_task) = 0;
78 
79    private:
80     DISALLOW_COPY_AND_ASSIGN(Queue);
81   };
82 
83   // Constructs an IncomingTaskQueue which will invoke |task_queue_observer|
84   // when tasks are queued. |task_queue_observer| will be bound to this
85   // IncomingTaskQueue's lifetime. Ownership is required as opposed to a raw
86   // pointer since IncomingTaskQueue is ref-counted. For the same reasons,
87   // |task_queue_observer| needs to support being invoked racily during
88   // shutdown).
89   explicit IncomingTaskQueue(std::unique_ptr<Observer> task_queue_observer);
90 
91   // Appends a task to the incoming queue. Posting of all tasks is routed though
92   // AddToIncomingQueue() or TryAddToIncomingQueue() to make sure that posting
93   // task is properly synchronized between different threads.
94   //
95   // Returns true if the task was successfully added to the queue, otherwise
96   // returns false. In all cases, the ownership of |task| is transferred to the
97   // called method.
98   bool AddToIncomingQueue(const Location& from_here,
99                           OnceClosure task,
100                           TimeDelta delay,
101                           Nestable nestable);
102 
103   // Instructs this IncomingTaskQueue to stop accepting tasks, this cannot be
104   // undone. Note that the registered IncomingTaskQueue::Observer may still
105   // racily receive a few DidQueueTask() calls while the Shutdown() signal
106   // propagates to other threads and it needs to support that.
107   void Shutdown();
108 
triage_tasks()109   ReadAndRemoveOnlyQueue& triage_tasks() { return triage_tasks_; }
110 
delayed_tasks()111   Queue& delayed_tasks() { return delayed_tasks_; }
112 
deferred_tasks()113   Queue& deferred_tasks() { return deferred_tasks_; }
114 
HasPendingHighResolutionTasks()115   bool HasPendingHighResolutionTasks() const {
116     DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
117     return delayed_tasks_.HasPendingHighResolutionTasks();
118   }
119 
120   // Reports UMA metrics about its queues before the MessageLoop goes to sleep
121   // per being idle.
122   void ReportMetricsOnIdle() const;
123 
124  private:
125   friend class base::BasicPostTaskPerfTest;
126   friend class RefCountedThreadSafe<IncomingTaskQueue>;
127 
128   // These queues below support the previous MessageLoop behavior of
129   // maintaining three queue queues to process tasks:
130   //
131   // TriageQueue
132   // The first queue to receive all tasks for the processing sequence (when
133   // reloading from the thread-safe |incoming_queue_|). Tasks are generally
134   // either dispatched immediately or sent to the queues below.
135   //
136   // DelayedQueue
137   // The queue for holding tasks that should be run later and sorted by expected
138   // run time.
139   //
140   // DeferredQueue
141   // The queue for holding tasks that couldn't be run while the MessageLoop was
142   // nested. These are generally processed during the idle stage.
143   //
144   // Many of these do not share implementations even though they look like they
145   // could because of small quirks (reloading semantics) or differing underlying
146   // data strucutre (TaskQueue vs DelayedTaskQueue).
147 
148   // The starting point for all tasks on the sequence processing the tasks.
149   class TriageQueue : public ReadAndRemoveOnlyQueue {
150    public:
151     TriageQueue(IncomingTaskQueue* outer);
152     ~TriageQueue() override;
153 
154     // ReadAndRemoveOnlyQueue:
155     // The methods below will attempt to reload from the incoming queue if the
156     // queue itself is empty (Clear() has special logic to reload only once
157     // should destructors post more tasks).
158     const PendingTask& Peek() override;
159     PendingTask Pop() override;
160     // Whether this queue has tasks after reloading from the incoming queue.
161     bool HasTasks() override;
162     void Clear() override;
163 
164    private:
165     void ReloadFromIncomingQueueIfEmpty();
166 
167     IncomingTaskQueue* const outer_;
168     TaskQueue queue_;
169 
170     DISALLOW_COPY_AND_ASSIGN(TriageQueue);
171   };
172 
173   class DelayedQueue : public Queue {
174    public:
175     DelayedQueue();
176     ~DelayedQueue() override;
177 
178     // Queue:
179     const PendingTask& Peek() override;
180     PendingTask Pop() override;
181     // Whether this queue has tasks after sweeping the cancelled ones in front.
182     bool HasTasks() override;
183     void Clear() override;
184     void Push(PendingTask pending_task) override;
185 
186     size_t Size() const;
HasPendingHighResolutionTasks()187     bool HasPendingHighResolutionTasks() const {
188       return pending_high_res_tasks_ > 0;
189     }
190 
191    private:
192     DelayedTaskQueue queue_;
193 
194     // Number of high resolution tasks in |queue_|.
195     int pending_high_res_tasks_ = 0;
196 
197     SEQUENCE_CHECKER(sequence_checker_);
198 
199     DISALLOW_COPY_AND_ASSIGN(DelayedQueue);
200   };
201 
202   class DeferredQueue : public Queue {
203    public:
204     DeferredQueue();
205     ~DeferredQueue() override;
206 
207     // Queue:
208     const PendingTask& Peek() override;
209     PendingTask Pop() override;
210     bool HasTasks() override;
211     void Clear() override;
212     void Push(PendingTask pending_task) override;
213 
214    private:
215     TaskQueue queue_;
216 
217     SEQUENCE_CHECKER(sequence_checker_);
218 
219     DISALLOW_COPY_AND_ASSIGN(DeferredQueue);
220   };
221 
222   virtual ~IncomingTaskQueue();
223 
224   // Adds a task to |incoming_queue_|. The caller retains ownership of
225   // |pending_task|, but this function will reset the value of
226   // |pending_task->task|. This is needed to ensure that the posting call stack
227   // does not retain |pending_task->task| beyond this function call.
228   bool PostPendingTask(PendingTask* pending_task);
229 
230   // Does the real work of posting a pending task. Returns true if
231   // |incoming_queue_| was empty before |pending_task| was posted.
232   bool PostPendingTaskLockRequired(PendingTask* pending_task);
233 
234   // Loads tasks from the |incoming_queue_| into |*work_queue|. Must be called
235   // from the sequence processing the tasks.
236   void ReloadWorkQueue(TaskQueue* work_queue);
237 
238   // Checks calls made only on the MessageLoop thread.
239   SEQUENCE_CHECKER(sequence_checker_);
240 
241   const std::unique_ptr<Observer> task_queue_observer_;
242 
243   // Queue for initial triaging of tasks on the |sequence_checker_| sequence.
244   TriageQueue triage_tasks_;
245 
246   // Queue for delayed tasks on the |sequence_checker_| sequence.
247   DelayedQueue delayed_tasks_;
248 
249   // Queue for non-nestable deferred tasks on the |sequence_checker_| sequence.
250   DeferredQueue deferred_tasks_;
251 
252   // Synchronizes access to all members below this line.
253   base::Lock incoming_queue_lock_;
254 
255   // An incoming queue of tasks that are acquired under a mutex for processing
256   // on this instance's thread. These tasks have not yet been been pushed to
257   // |triage_tasks_|.
258   TaskQueue incoming_queue_;
259 
260   // True if new tasks should be accepted.
261   bool accept_new_tasks_ = true;
262 
263   // The next sequence number to use for delayed tasks.
264   int next_sequence_num_ = 0;
265 
266   // True if the outgoing queue (|triage_tasks_|) is empty. Toggled under
267   // |incoming_queue_lock_| in ReloadWorkQueue() so that
268   // PostPendingTaskLockRequired() can tell, without accessing the thread unsafe
269   // |triage_tasks_|, if the IncomingTaskQueue has been made non-empty by a
270   // PostTask() (and needs to inform its Observer).
271   bool triage_queue_empty_ = true;
272 
273   DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue);
274 };
275 
276 }  // namespace internal
277 }  // namespace base
278 
279 #endif  // BASE_MESSAGE_LOOP_INCOMING_TASK_QUEUE_H_
280