1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/incoming_task_queue.h"
6 
7 #include <limits>
8 #include <utility>
9 
10 #include "base/bind.h"
11 #include "base/callback_helpers.h"
12 #include "base/location.h"
13 #include "base/metrics/histogram_macros.h"
14 #include "base/synchronization/waitable_event.h"
15 #include "base/time/time.h"
16 #include "build/build_config.h"
17 
18 namespace base {
19 namespace internal {
20 
21 namespace {
22 
#if DCHECK_IS_ON()
// Delays larger than this are often bogus, and a warning should be emitted in
// debug builds to warn developers.  http://crbug.com/450045
// Only defined when DCHECK_IS_ON(); within this file it is referenced solely
// by the DLOG_IF warning in IncomingTaskQueue::AddToIncomingQueue().
constexpr TimeDelta kTaskDelayWarningThreshold = TimeDelta::FromDays(14);
#endif
28 
CalculateDelayedRuntime(TimeDelta delay)29 TimeTicks CalculateDelayedRuntime(TimeDelta delay) {
30   TimeTicks delayed_run_time;
31   if (delay > TimeDelta())
32     delayed_run_time = TimeTicks::Now() + delay;
33   else
34     DCHECK_EQ(delay.InMilliseconds(), 0) << "delay should not be negative";
35   return delayed_run_time;
36 }
37 
38 }  // namespace
39 
// Takes ownership of |task_queue_observer| and hands |triage_tasks_| a
// back-pointer to this queue.
IncomingTaskQueue::IncomingTaskQueue(
    std::unique_ptr<Observer> task_queue_observer)
    : task_queue_observer_(std::move(task_queue_observer)),
      triage_tasks_(this) {
  // The constructing sequence is not necessarily the running sequence, e.g. in
  // the case of a MessageLoop created unbound.
  DETACH_FROM_SEQUENCE(sequence_checker_);
}

// Defaulted out-of-line destructor.
IncomingTaskQueue::~IncomingTaskQueue() = default;
50 
AddToIncomingQueue(const Location & from_here,OnceClosure task,TimeDelta delay,Nestable nestable)51 bool IncomingTaskQueue::AddToIncomingQueue(const Location& from_here,
52                                            OnceClosure task,
53                                            TimeDelta delay,
54                                            Nestable nestable) {
55   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
56   // for details.
57   CHECK(task);
58   DLOG_IF(WARNING, delay > kTaskDelayWarningThreshold)
59       << "Requesting super-long task delay period of " << delay.InSeconds()
60       << " seconds from here: " << from_here.ToString();
61 
62   PendingTask pending_task(from_here, std::move(task),
63                            CalculateDelayedRuntime(delay), nestable);
64 #if defined(OS_WIN)
65   // We consider the task needs a high resolution timer if the delay is
66   // more than 0 and less than 32ms. This caps the relative error to
67   // less than 50% : a 33ms wait can wake at 48ms since the default
68   // resolution on Windows is between 10 and 15ms.
69   if (delay > TimeDelta() &&
70       delay.InMilliseconds() < (2 * Time::kMinLowResolutionThresholdMs)) {
71     pending_task.is_high_res = true;
72   }
73 #endif
74 
75   if (!delay.is_zero())
76     UMA_HISTOGRAM_LONG_TIMES("MessageLoop.DelayedTaskQueue.PostedDelay", delay);
77 
78   return PostPendingTask(&pending_task);
79 }
80 
// Stops the queue from accepting new tasks. The flag is flipped while holding
// |incoming_queue_lock_|; PostPendingTask() reads it under the same lock and
// rejects tasks once it is false.
void IncomingTaskQueue::Shutdown() {
  AutoLock auto_lock(incoming_queue_lock_);
  accept_new_tasks_ = false;
}
85 
ReportMetricsOnIdle() const86 void IncomingTaskQueue::ReportMetricsOnIdle() const {
87   UMA_HISTOGRAM_COUNTS_1M(
88       "MessageLoop.DelayedTaskQueue.PendingTasksCountOnIdle",
89       delayed_tasks_.Size());
90 }
91 
// Stores |outer|, the IncomingTaskQueue this triage queue reloads from and
// whose sequence checker it validates against.
IncomingTaskQueue::TriageQueue::TriageQueue(IncomingTaskQueue* outer)
    : outer_(outer) {}

// Defaulted out-of-line destructor.
IncomingTaskQueue::TriageQueue::~TriageQueue() = default;
96 
// Returns a reference to the task at the front of the triage queue without
// removing it. If the local queue is empty it is first refilled from the
// incoming queue; a task must be available afterwards (DCHECKed).
const PendingTask& IncomingTaskQueue::TriageQueue::Peek() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
  ReloadFromIncomingQueueIfEmpty();
  DCHECK(!queue_.empty());
  return queue_.front();
}
103 
Pop()104 PendingTask IncomingTaskQueue::TriageQueue::Pop() {
105   DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
106   ReloadFromIncomingQueueIfEmpty();
107   DCHECK(!queue_.empty());
108   PendingTask pending_task = std::move(queue_.front());
109   queue_.pop();
110   return pending_task;
111 }
112 
// Returns true if a task is available, after refilling the local queue from
// the incoming queue when the local queue is empty.
bool IncomingTaskQueue::TriageQueue::HasTasks() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
  ReloadFromIncomingQueueIfEmpty();
  return !queue_.empty();
}
118 
Clear()119 void IncomingTaskQueue::TriageQueue::Clear() {
120   DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
121 
122   // Clear() should be invoked before WillDestroyCurrentMessageLoop().
123   DCHECK(outer_->accept_new_tasks_);
124 
125   // Delete all currently pending tasks but not tasks potentially posted from
126   // their destructors. See ~MessageLoop() for the full logic mitigating against
127   // infite loops when clearing pending tasks. The ScopedClosureRunner below
128   // will be bound to a task posted at the end of the queue. After it is posted,
129   // tasks will be deleted one by one, when the bound ScopedClosureRunner is
130   // deleted and sets |deleted_all_originally_pending|, we know we've deleted
131   // all originally pending tasks.
132   bool deleted_all_originally_pending = false;
133   ScopedClosureRunner capture_deleted_all_originally_pending(BindOnce(
134       [](bool* deleted_all_originally_pending) {
135         *deleted_all_originally_pending = true;
136       },
137       Unretained(&deleted_all_originally_pending)));
138   outer_->AddToIncomingQueue(
139       FROM_HERE,
140       BindOnce([](ScopedClosureRunner) {},
141                std::move(capture_deleted_all_originally_pending)),
142       TimeDelta(), Nestable::kNestable);
143 
144   while (!deleted_all_originally_pending) {
145     PendingTask pending_task = Pop();
146 
147     if (!pending_task.delayed_run_time.is_null()) {
148       outer_->delayed_tasks().Push(std::move(pending_task));
149     }
150   }
151 }
152 
ReloadFromIncomingQueueIfEmpty()153 void IncomingTaskQueue::TriageQueue::ReloadFromIncomingQueueIfEmpty() {
154   DCHECK_CALLED_ON_VALID_SEQUENCE(outer_->sequence_checker_);
155   if (queue_.empty()) {
156     outer_->ReloadWorkQueue(&queue_);
157   }
158 }
159 
// Detaches |sequence_checker_| at construction.
// NOTE(review): presumably because the queue may be constructed on a different
// sequence than the one that later uses it -- confirm.
IncomingTaskQueue::DelayedQueue::DelayedQueue() {
  DETACH_FROM_SEQUENCE(sequence_checker_);
}

// Defaulted out-of-line destructor.
IncomingTaskQueue::DelayedQueue::~DelayedQueue() = default;
165 
Push(PendingTask pending_task)166 void IncomingTaskQueue::DelayedQueue::Push(PendingTask pending_task) {
167   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
168 
169   if (pending_task.is_high_res)
170     ++pending_high_res_tasks_;
171 
172   queue_.push(std::move(pending_task));
173 }
174 
// Returns a reference to the task at the top of |queue_| without removing it.
// The queue must not be empty (DCHECKed).
const PendingTask& IncomingTaskQueue::DelayedQueue::Peek() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!queue_.empty());
  return queue_.top();
}
180 
Pop()181 PendingTask IncomingTaskQueue::DelayedQueue::Pop() {
182   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
183   DCHECK(!queue_.empty());
184   PendingTask delayed_task = std::move(const_cast<PendingTask&>(queue_.top()));
185   queue_.pop();
186 
187   if (delayed_task.is_high_res)
188     --pending_high_res_tasks_;
189   DCHECK_GE(pending_high_res_tasks_, 0);
190 
191   return delayed_task;
192 }
193 
HasTasks()194 bool IncomingTaskQueue::DelayedQueue::HasTasks() {
195   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
196   // TODO(robliao): The other queues don't check for IsCancelled(). Should they?
197   while (!queue_.empty() && Peek().task.IsCancelled())
198     Pop();
199 
200   return !queue_.empty();
201 }
202 
// Empties the queue by popping one task at a time. Going through Pop() (as
// opposed to dropping the container wholesale) keeps
// |pending_high_res_tasks_| in sync with the queue contents.
void IncomingTaskQueue::DelayedQueue::Clear() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  while (!queue_.empty())
    Pop();
}
208 
// Returns the number of tasks currently held in |queue_|. Cancelled tasks
// that have not been pruned yet are included in the count.
size_t IncomingTaskQueue::DelayedQueue::Size() const {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  return queue_.size();
}
213 
// Detaches |sequence_checker_| at construction.
// NOTE(review): presumably because the queue may be constructed on a different
// sequence than the one that later uses it -- confirm.
IncomingTaskQueue::DeferredQueue::DeferredQueue() {
  DETACH_FROM_SEQUENCE(sequence_checker_);
}

// Defaulted out-of-line destructor.
IncomingTaskQueue::DeferredQueue::~DeferredQueue() = default;
219 
// Appends |pending_task| to the deferred queue.
void IncomingTaskQueue::DeferredQueue::Push(PendingTask pending_task) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  queue_.push(std::move(pending_task));
}
224 
// Returns a reference to the task at the front of the deferred queue without
// removing it. The queue must not be empty (DCHECKed).
const PendingTask& IncomingTaskQueue::DeferredQueue::Peek() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  DCHECK(!queue_.empty());
  return queue_.front();
}
230 
Pop()231 PendingTask IncomingTaskQueue::DeferredQueue::Pop() {
232   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
233   DCHECK(!queue_.empty());
234   PendingTask deferred_task = std::move(queue_.front());
235   queue_.pop();
236   return deferred_task;
237 }
238 
// Returns true if the deferred queue holds at least one task. Unlike
// DelayedQueue::HasTasks(), no cancellation check is performed here.
bool IncomingTaskQueue::DeferredQueue::HasTasks() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  return !queue_.empty();
}
243 
// Empties the deferred queue by popping (and thereby destroying) its tasks
// one at a time.
void IncomingTaskQueue::DeferredQueue::Clear() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  while (!queue_.empty())
    Pop();
}
249 
PostPendingTask(PendingTask * pending_task)250 bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
251   // Warning: Don't try to short-circuit, and handle this thread's tasks more
252   // directly, as it could starve handling of foreign threads.  Put every task
253   // into this queue.
254   bool accept_new_tasks;
255   bool was_empty = false;
256   {
257     AutoLock auto_lock(incoming_queue_lock_);
258     accept_new_tasks = accept_new_tasks_;
259     if (accept_new_tasks) {
260       was_empty =
261           PostPendingTaskLockRequired(pending_task) && triage_queue_empty_;
262     }
263   }
264 
265   if (!accept_new_tasks) {
266     // Clear the pending task outside of |incoming_queue_lock_| to prevent any
267     // chance of self-deadlock if destroying a task also posts a task to this
268     // queue.
269     pending_task->task.Reset();
270     return false;
271   }
272 
273   // Let |task_queue_observer_| know of the queued task. This is done outside
274   // |incoming_queue_lock_| to avoid conflating locks (DidQueueTask() can also
275   // use a lock).
276   task_queue_observer_->DidQueueTask(was_empty);
277 
278   return true;
279 }
280 
PostPendingTaskLockRequired(PendingTask * pending_task)281 bool IncomingTaskQueue::PostPendingTaskLockRequired(PendingTask* pending_task) {
282   incoming_queue_lock_.AssertAcquired();
283 
284   // Initialize the sequence number. The sequence number is used for delayed
285   // tasks (to facilitate FIFO sorting when two tasks have the same
286   // delayed_run_time value) and for identifying the task in about:tracing.
287   pending_task->sequence_num = next_sequence_num_++;
288 
289   task_queue_observer_->WillQueueTask(pending_task);
290 
291   bool was_empty = incoming_queue_.empty();
292   incoming_queue_.push(std::move(*pending_task));
293   return was_empty;
294 }
295 
ReloadWorkQueue(TaskQueue * work_queue)296 void IncomingTaskQueue::ReloadWorkQueue(TaskQueue* work_queue) {
297   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
298 
299   // Make sure no tasks are lost.
300   DCHECK(work_queue->empty());
301 
302   // Acquire all we can from the inter-thread queue with one lock acquisition.
303   AutoLock lock(incoming_queue_lock_);
304   incoming_queue_.swap(*work_queue);
305   triage_queue_empty_ = work_queue->empty();
306 }
307 
308 }  // namespace internal
309 }  // namespace base
310