1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task/sequence_manager/task_queue_impl.h"
6 
7 #include <memory>
8 #include <utility>
9 
10 #include "base/strings/stringprintf.h"
11 #include "base/task/sequence_manager/sequence_manager_impl.h"
12 #include "base/task/sequence_manager/time_domain.h"
13 #include "base/task/sequence_manager/work_queue.h"
14 #include "base/time/time.h"
15 #include "base/trace_event/blame_context.h"
16 
17 namespace base {
18 namespace sequence_manager {
19 
20 // static
PriorityToString(TaskQueue::QueuePriority priority)21 const char* TaskQueue::PriorityToString(TaskQueue::QueuePriority priority) {
22   switch (priority) {
23     case kControlPriority:
24       return "control";
25     case kHighestPriority:
26       return "highest";
27     case kHighPriority:
28       return "high";
29     case kNormalPriority:
30       return "normal";
31     case kLowPriority:
32       return "low";
33     case kBestEffortPriority:
34       return "best_effort";
35     default:
36       NOTREACHED();
37       return nullptr;
38   }
39 }
40 
41 namespace internal {
42 
TaskQueueImpl(SequenceManagerImpl * sequence_manager,TimeDomain * time_domain,const TaskQueue::Spec & spec)43 TaskQueueImpl::TaskQueueImpl(SequenceManagerImpl* sequence_manager,
44                              TimeDomain* time_domain,
45                              const TaskQueue::Spec& spec)
46     : name_(spec.name),
47       thread_id_(PlatformThread::CurrentId()),
48       any_thread_(sequence_manager, time_domain),
49       main_thread_only_(sequence_manager, this, time_domain),
50       should_monitor_quiescence_(spec.should_monitor_quiescence),
51       should_notify_observers_(spec.should_notify_observers) {
52   DCHECK(time_domain);
53 }
54 
~TaskQueueImpl()55 TaskQueueImpl::~TaskQueueImpl() {
56 #if DCHECK_IS_ON()
57   AutoLock lock(any_thread_lock_);
58   // NOTE this check shouldn't fire because |SequenceManagerImpl::queues_|
59   // contains a strong reference to this TaskQueueImpl and the
60   // SequenceManagerImpl destructor calls UnregisterTaskQueue on all task
61   // queues.
62   DCHECK(!any_thread().sequence_manager)
63       << "UnregisterTaskQueue must be called first!";
64 #endif
65 }
66 
PostTaskResult()67 TaskQueueImpl::PostTaskResult::PostTaskResult()
68     : success(false), task(OnceClosure(), Location()) {}
69 
PostTaskResult(bool success,TaskQueue::PostedTask task)70 TaskQueueImpl::PostTaskResult::PostTaskResult(bool success,
71                                               TaskQueue::PostedTask task)
72     : success(success), task(std::move(task)) {}
73 
PostTaskResult(PostTaskResult && move_from)74 TaskQueueImpl::PostTaskResult::PostTaskResult(PostTaskResult&& move_from)
75     : success(move_from.success), task(std::move(move_from.task)) {}
76 
77 TaskQueueImpl::PostTaskResult::~PostTaskResult() = default;
78 
Success()79 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostTaskResult::Success() {
80   return PostTaskResult(true, TaskQueue::PostedTask(OnceClosure(), Location()));
81 }
82 
Fail(TaskQueue::PostedTask task)83 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostTaskResult::Fail(
84     TaskQueue::PostedTask task) {
85   return PostTaskResult(false, std::move(task));
86 }
87 
Task(TaskQueue::PostedTask task,TimeTicks desired_run_time,EnqueueOrder sequence_number)88 TaskQueueImpl::Task::Task(TaskQueue::PostedTask task,
89                           TimeTicks desired_run_time,
90                           EnqueueOrder sequence_number)
91     : TaskQueue::Task(std::move(task), desired_run_time) {
92   // It might wrap around to a negative number but it's handled properly.
93   sequence_num = static_cast<int>(sequence_number);
94 }
95 
Task(TaskQueue::PostedTask task,TimeTicks desired_run_time,EnqueueOrder sequence_number,EnqueueOrder enqueue_order)96 TaskQueueImpl::Task::Task(TaskQueue::PostedTask task,
97                           TimeTicks desired_run_time,
98                           EnqueueOrder sequence_number,
99                           EnqueueOrder enqueue_order)
100     : TaskQueue::Task(std::move(task), desired_run_time),
101       enqueue_order_(enqueue_order) {
102   // It might wrap around to a negative number but it's handled properly.
103   sequence_num = static_cast<int>(sequence_number);
104 }
105 
AnyThread(SequenceManagerImpl * sequence_manager,TimeDomain * time_domain)106 TaskQueueImpl::AnyThread::AnyThread(SequenceManagerImpl* sequence_manager,
107                                     TimeDomain* time_domain)
108     : sequence_manager(sequence_manager), time_domain(time_domain) {}
109 
110 TaskQueueImpl::AnyThread::~AnyThread() = default;
111 
MainThreadOnly(SequenceManagerImpl * sequence_manager,TaskQueueImpl * task_queue,TimeDomain * time_domain)112 TaskQueueImpl::MainThreadOnly::MainThreadOnly(
113     SequenceManagerImpl* sequence_manager,
114     TaskQueueImpl* task_queue,
115     TimeDomain* time_domain)
116     : sequence_manager(sequence_manager),
117       time_domain(time_domain),
118       delayed_work_queue(
119           new WorkQueue(task_queue, "delayed", WorkQueue::QueueType::kDelayed)),
120       immediate_work_queue(new WorkQueue(task_queue,
121                                          "immediate",
122                                          WorkQueue::QueueType::kImmediate)),
123       set_index(0),
124       is_enabled_refcount(0),
125       voter_refcount(0),
126       blame_context(nullptr),
127       is_enabled_for_test(true) {}
128 
129 TaskQueueImpl::MainThreadOnly::~MainThreadOnly() = default;
130 
UnregisterTaskQueue()131 void TaskQueueImpl::UnregisterTaskQueue() {
132   TaskDeque immediate_incoming_queue;
133 
134   {
135     AutoLock lock(any_thread_lock_);
136     AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
137 
138     if (main_thread_only().time_domain)
139       main_thread_only().time_domain->UnregisterQueue(this);
140 
141     if (!any_thread().sequence_manager)
142       return;
143 
144     main_thread_only().on_task_completed_handler = OnTaskCompletedHandler();
145     any_thread().time_domain = nullptr;
146     main_thread_only().time_domain = nullptr;
147 
148     any_thread().sequence_manager = nullptr;
149     main_thread_only().sequence_manager = nullptr;
150     any_thread().on_next_wake_up_changed_callback =
151         OnNextWakeUpChangedCallback();
152     main_thread_only().on_next_wake_up_changed_callback =
153         OnNextWakeUpChangedCallback();
154     immediate_incoming_queue.swap(immediate_incoming_queue_);
155   }
156 
157   // It is possible for a task to hold a scoped_refptr to this, which
158   // will lead to TaskQueueImpl destructor being called when deleting a task.
159   // To avoid use-after-free, we need to clear all fields of a task queue
160   // before starting to delete the tasks.
161   // All work queues and priority queues containing tasks should be moved to
162   // local variables on stack (std::move for unique_ptrs and swap for queues)
163   // before clearing them and deleting tasks.
164 
165   // Flush the queues outside of the lock because TSAN complains about a lock
166   // order inversion for tasks that are posted from within a lock, with a
167   // destructor that acquires the same lock.
168 
169   std::priority_queue<Task> delayed_incoming_queue;
170   delayed_incoming_queue.swap(main_thread_only().delayed_incoming_queue);
171 
172   std::unique_ptr<WorkQueue> immediate_work_queue =
173       std::move(main_thread_only().immediate_work_queue);
174   std::unique_ptr<WorkQueue> delayed_work_queue =
175       std::move(main_thread_only().delayed_work_queue);
176 }
177 
GetName() const178 const char* TaskQueueImpl::GetName() const {
179   return name_;
180 }
181 
RunsTasksInCurrentSequence() const182 bool TaskQueueImpl::RunsTasksInCurrentSequence() const {
183   return PlatformThread::CurrentId() == thread_id_;
184 }
185 
PostDelayedTask(TaskQueue::PostedTask task)186 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTask(
187     TaskQueue::PostedTask task) {
188   if (task.delay.is_zero())
189     return PostImmediateTaskImpl(std::move(task));
190 
191   return PostDelayedTaskImpl(std::move(task));
192 }
193 
PostImmediateTaskImpl(TaskQueue::PostedTask task)194 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostImmediateTaskImpl(
195     TaskQueue::PostedTask task) {
196   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
197   // for details.
198   CHECK(task.callback);
199   AutoLock lock(any_thread_lock_);
200   if (!any_thread().sequence_manager)
201     return PostTaskResult::Fail(std::move(task));
202 
203   EnqueueOrder sequence_number =
204       any_thread().sequence_manager->GetNextSequenceNumber();
205 
206   PushOntoImmediateIncomingQueueLocked(Task(std::move(task),
207                                             any_thread().time_domain->Now(),
208                                             sequence_number, sequence_number));
209   return PostTaskResult::Success();
210 }
211 
PostDelayedTaskImpl(TaskQueue::PostedTask task)212 TaskQueueImpl::PostTaskResult TaskQueueImpl::PostDelayedTaskImpl(
213     TaskQueue::PostedTask task) {
214   // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167
215   // for details.
216   CHECK(task.callback);
217   DCHECK_GT(task.delay, TimeDelta());
218   if (PlatformThread::CurrentId() == thread_id_) {
219     // Lock-free fast path for delayed tasks posted from the main thread.
220     if (!main_thread_only().sequence_manager)
221       return PostTaskResult::Fail(std::move(task));
222 
223     EnqueueOrder sequence_number =
224         main_thread_only().sequence_manager->GetNextSequenceNumber();
225 
226     TimeTicks time_domain_now = main_thread_only().time_domain->Now();
227     TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay;
228     PushOntoDelayedIncomingQueueFromMainThread(
229         Task(std::move(task), time_domain_delayed_run_time, sequence_number),
230         time_domain_now);
231   } else {
232     // NOTE posting a delayed task from a different thread is not expected to
233     // be common. This pathway is less optimal than perhaps it could be
234     // because it causes two main thread tasks to be run.  Should this
235     // assumption prove to be false in future, we may need to revisit this.
236     AutoLock lock(any_thread_lock_);
237     if (!any_thread().sequence_manager)
238       return PostTaskResult::Fail(std::move(task));
239 
240     EnqueueOrder sequence_number =
241         any_thread().sequence_manager->GetNextSequenceNumber();
242 
243     TimeTicks time_domain_now = any_thread().time_domain->Now();
244     TimeTicks time_domain_delayed_run_time = time_domain_now + task.delay;
245     PushOntoDelayedIncomingQueueLocked(
246         Task(std::move(task), time_domain_delayed_run_time, sequence_number));
247   }
248   return PostTaskResult::Success();
249 }
250 
PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,TimeTicks now)251 void TaskQueueImpl::PushOntoDelayedIncomingQueueFromMainThread(
252     Task pending_task,
253     TimeTicks now) {
254   main_thread_only().sequence_manager->WillQueueTask(&pending_task);
255   main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
256 
257   LazyNow lazy_now(now);
258   UpdateDelayedWakeUp(&lazy_now);
259 
260   TraceQueueSize();
261 }
262 
PushOntoDelayedIncomingQueueLocked(Task pending_task)263 void TaskQueueImpl::PushOntoDelayedIncomingQueueLocked(Task pending_task) {
264   any_thread().sequence_manager->WillQueueTask(&pending_task);
265 
266   EnqueueOrder thread_hop_task_sequence_number =
267       any_thread().sequence_manager->GetNextSequenceNumber();
268   // TODO(altimin): Add a copy method to Task to capture metadata here.
269   PushOntoImmediateIncomingQueueLocked(Task(
270       TaskQueue::PostedTask(BindOnce(&TaskQueueImpl::ScheduleDelayedWorkTask,
271                                      Unretained(this), std::move(pending_task)),
272                             FROM_HERE, TimeDelta(), Nestable::kNonNestable,
273                             pending_task.task_type()),
274       TimeTicks(), thread_hop_task_sequence_number,
275       thread_hop_task_sequence_number));
276 }
277 
ScheduleDelayedWorkTask(Task pending_task)278 void TaskQueueImpl::ScheduleDelayedWorkTask(Task pending_task) {
279   DCHECK(main_thread_checker_.CalledOnValidThread());
280   TimeTicks delayed_run_time = pending_task.delayed_run_time;
281   TimeTicks time_domain_now = main_thread_only().time_domain->Now();
282   if (delayed_run_time <= time_domain_now) {
283     // If |delayed_run_time| is in the past then push it onto the work queue
284     // immediately. To ensure the right task ordering we need to temporarily
285     // push it onto the |delayed_incoming_queue|.
286     delayed_run_time = time_domain_now;
287     pending_task.delayed_run_time = time_domain_now;
288     main_thread_only().delayed_incoming_queue.push(std::move(pending_task));
289     LazyNow lazy_now(time_domain_now);
290     WakeUpForDelayedWork(&lazy_now);
291   } else {
292     // If |delayed_run_time| is in the future we can queue it as normal.
293     PushOntoDelayedIncomingQueueFromMainThread(std::move(pending_task),
294                                                time_domain_now);
295   }
296   TraceQueueSize();
297 }
298 
PushOntoImmediateIncomingQueueLocked(Task task)299 void TaskQueueImpl::PushOntoImmediateIncomingQueueLocked(Task task) {
300   // If the |immediate_incoming_queue| is empty we need a DoWork posted to make
301   // it run.
302   bool was_immediate_incoming_queue_empty;
303 
304   EnqueueOrder sequence_number = task.enqueue_order();
305   TimeTicks desired_run_time = task.delayed_run_time;
306 
307   {
308     AutoLock lock(immediate_incoming_queue_lock_);
309     was_immediate_incoming_queue_empty = immediate_incoming_queue().empty();
310     any_thread().sequence_manager->WillQueueTask(&task);
311     immediate_incoming_queue().push_back(std::move(task));
312   }
313 
314   if (was_immediate_incoming_queue_empty) {
315     // However there's no point posting a DoWork for a blocked queue. NB we can
316     // only tell if it's disabled from the main thread.
317     bool queue_is_blocked =
318         RunsTasksInCurrentSequence() &&
319         (!IsQueueEnabled() || main_thread_only().current_fence);
320     any_thread().sequence_manager->OnQueueHasIncomingImmediateWork(
321         this, sequence_number, queue_is_blocked);
322     if (!any_thread().on_next_wake_up_changed_callback.is_null())
323       any_thread().on_next_wake_up_changed_callback.Run(desired_run_time);
324   }
325 
326   TraceQueueSize();
327 }
328 
ReloadImmediateWorkQueueIfEmpty()329 void TaskQueueImpl::ReloadImmediateWorkQueueIfEmpty() {
330   if (!main_thread_only().immediate_work_queue->Empty())
331     return;
332 
333   main_thread_only().immediate_work_queue->ReloadEmptyImmediateQueue();
334 }
335 
ReloadEmptyImmediateQueue(TaskDeque * queue)336 void TaskQueueImpl::ReloadEmptyImmediateQueue(TaskDeque* queue) {
337   DCHECK(queue->empty());
338 
339   AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
340   queue->swap(immediate_incoming_queue());
341 
342   // Activate delayed fence if necessary. This is ideologically similar to
343   // ActivateDelayedFenceIfNeeded, but due to immediate tasks being posted
344   // from any thread we can't generate an enqueue order for the fence there,
345   // so we have to check all immediate tasks and use their enqueue order for
346   // a fence.
347   if (main_thread_only().delayed_fence) {
348     for (const Task& task : *queue) {
349       if (task.delayed_run_time >= main_thread_only().delayed_fence.value()) {
350         main_thread_only().delayed_fence = nullopt;
351         DCHECK(!main_thread_only().current_fence);
352         main_thread_only().current_fence = task.enqueue_order();
353         // Do not trigger WorkQueueSets notification when taking incoming
354         // immediate queue.
355         main_thread_only().immediate_work_queue->InsertFenceSilently(
356             main_thread_only().current_fence);
357         main_thread_only().delayed_work_queue->InsertFenceSilently(
358             main_thread_only().current_fence);
359         break;
360       }
361     }
362   }
363 }
364 
IsEmpty() const365 bool TaskQueueImpl::IsEmpty() const {
366   if (!main_thread_only().delayed_work_queue->Empty() ||
367       !main_thread_only().delayed_incoming_queue.empty() ||
368       !main_thread_only().immediate_work_queue->Empty()) {
369     return false;
370   }
371 
372   AutoLock lock(immediate_incoming_queue_lock_);
373   return immediate_incoming_queue().empty();
374 }
375 
GetNumberOfPendingTasks() const376 size_t TaskQueueImpl::GetNumberOfPendingTasks() const {
377   size_t task_count = 0;
378   task_count += main_thread_only().delayed_work_queue->Size();
379   task_count += main_thread_only().delayed_incoming_queue.size();
380   task_count += main_thread_only().immediate_work_queue->Size();
381 
382   AutoLock lock(immediate_incoming_queue_lock_);
383   task_count += immediate_incoming_queue().size();
384   return task_count;
385 }
386 
HasTaskToRunImmediately() const387 bool TaskQueueImpl::HasTaskToRunImmediately() const {
388   // Any work queue tasks count as immediate work.
389   if (!main_thread_only().delayed_work_queue->Empty() ||
390       !main_thread_only().immediate_work_queue->Empty()) {
391     return true;
392   }
393 
394   // Tasks on |delayed_incoming_queue| that could run now, count as
395   // immediate work.
396   if (!main_thread_only().delayed_incoming_queue.empty() &&
397       main_thread_only().delayed_incoming_queue.top().delayed_run_time <=
398           main_thread_only().time_domain->CreateLazyNow().Now()) {
399     return true;
400   }
401 
402   // Finally tasks on |immediate_incoming_queue| count as immediate work.
403   AutoLock lock(immediate_incoming_queue_lock_);
404   return !immediate_incoming_queue().empty();
405 }
406 
407 Optional<TaskQueueImpl::DelayedWakeUp>
GetNextScheduledWakeUpImpl()408 TaskQueueImpl::GetNextScheduledWakeUpImpl() {
409   // Note we don't scheduled a wake-up for disabled queues.
410   if (main_thread_only().delayed_incoming_queue.empty() || !IsQueueEnabled())
411     return nullopt;
412 
413   return main_thread_only().delayed_incoming_queue.top().delayed_wake_up();
414 }
415 
GetNextScheduledWakeUp()416 Optional<TimeTicks> TaskQueueImpl::GetNextScheduledWakeUp() {
417   Optional<DelayedWakeUp> wake_up = GetNextScheduledWakeUpImpl();
418   if (!wake_up)
419     return nullopt;
420   return wake_up->time;
421 }
422 
WakeUpForDelayedWork(LazyNow * lazy_now)423 void TaskQueueImpl::WakeUpForDelayedWork(LazyNow* lazy_now) {
424   // Enqueue all delayed tasks that should be running now, skipping any that
425   // have been canceled.
426   while (!main_thread_only().delayed_incoming_queue.empty()) {
427     Task& task =
428         const_cast<Task&>(main_thread_only().delayed_incoming_queue.top());
429     if (!task.task || task.task.IsCancelled()) {
430       main_thread_only().delayed_incoming_queue.pop();
431       continue;
432     }
433     if (task.delayed_run_time > lazy_now->Now())
434       break;
435     ActivateDelayedFenceIfNeeded(task.delayed_run_time);
436     task.set_enqueue_order(
437         main_thread_only().sequence_manager->GetNextSequenceNumber());
438     main_thread_only().delayed_work_queue->Push(std::move(task));
439     main_thread_only().delayed_incoming_queue.pop();
440 
441     // Normally WakeUpForDelayedWork is called inside DoWork, but it also
442     // can be called elsewhere (e.g. tests and fast-path for posting
443     // delayed tasks). Ensure that there is a DoWork posting. No-op inside
444     // existing DoWork due to DoWork deduplication.
445     if (IsQueueEnabled() || !main_thread_only().current_fence) {
446       main_thread_only().sequence_manager->MaybeScheduleImmediateWork(
447           FROM_HERE);
448     }
449   }
450 
451   UpdateDelayedWakeUp(lazy_now);
452 }
453 
TraceQueueSize() const454 void TaskQueueImpl::TraceQueueSize() const {
455   bool is_tracing;
456   TRACE_EVENT_CATEGORY_GROUP_ENABLED(
457       TRACE_DISABLED_BY_DEFAULT("sequence_manager"), &is_tracing);
458   if (!is_tracing)
459     return;
460 
461   // It's only safe to access the work queues from the main thread.
462   // TODO(alexclarke): We should find another way of tracing this
463   if (PlatformThread::CurrentId() != thread_id_)
464     return;
465 
466   AutoLock lock(immediate_incoming_queue_lock_);
467   TRACE_COUNTER1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"), GetName(),
468                  immediate_incoming_queue().size() +
469                      main_thread_only().immediate_work_queue->Size() +
470                      main_thread_only().delayed_work_queue->Size() +
471                      main_thread_only().delayed_incoming_queue.size());
472 }
473 
SetQueuePriority(TaskQueue::QueuePriority priority)474 void TaskQueueImpl::SetQueuePriority(TaskQueue::QueuePriority priority) {
475   if (!main_thread_only().sequence_manager || priority == GetQueuePriority())
476     return;
477   main_thread_only()
478       .sequence_manager->main_thread_only()
479       .selector.SetQueuePriority(this, priority);
480 }
481 
GetQueuePriority() const482 TaskQueue::QueuePriority TaskQueueImpl::GetQueuePriority() const {
483   size_t set_index = immediate_work_queue()->work_queue_set_index();
484   DCHECK_EQ(set_index, delayed_work_queue()->work_queue_set_index());
485   return static_cast<TaskQueue::QueuePriority>(set_index);
486 }
487 
AsValueInto(TimeTicks now,trace_event::TracedValue * state) const488 void TaskQueueImpl::AsValueInto(TimeTicks now,
489                                 trace_event::TracedValue* state) const {
490   AutoLock lock(any_thread_lock_);
491   AutoLock immediate_incoming_queue_lock(immediate_incoming_queue_lock_);
492   state->BeginDictionary();
493   state->SetString("name", GetName());
494   if (!main_thread_only().sequence_manager) {
495     state->SetBoolean("unregistered", true);
496     state->EndDictionary();
497     return;
498   }
499   DCHECK(main_thread_only().time_domain);
500   DCHECK(main_thread_only().delayed_work_queue);
501   DCHECK(main_thread_only().immediate_work_queue);
502 
503   state->SetString(
504       "task_queue_id",
505       StringPrintf("0x%" PRIx64,
506                    static_cast<uint64_t>(reinterpret_cast<uintptr_t>(this))));
507   state->SetBoolean("enabled", IsQueueEnabled());
508   state->SetString("time_domain_name",
509                    main_thread_only().time_domain->GetName());
510   state->SetInteger("immediate_incoming_queue_size",
511                     immediate_incoming_queue().size());
512   state->SetInteger("delayed_incoming_queue_size",
513                     main_thread_only().delayed_incoming_queue.size());
514   state->SetInteger("immediate_work_queue_size",
515                     main_thread_only().immediate_work_queue->Size());
516   state->SetInteger("delayed_work_queue_size",
517                     main_thread_only().delayed_work_queue->Size());
518 
519   if (!main_thread_only().delayed_incoming_queue.empty()) {
520     TimeDelta delay_to_next_task =
521         (main_thread_only().delayed_incoming_queue.top().delayed_run_time -
522          main_thread_only().time_domain->CreateLazyNow().Now());
523     state->SetDouble("delay_to_next_task_ms",
524                      delay_to_next_task.InMillisecondsF());
525   }
526   if (main_thread_only().current_fence)
527     state->SetInteger("current_fence", main_thread_only().current_fence);
528   if (main_thread_only().delayed_fence) {
529     state->SetDouble(
530         "delayed_fence_seconds_from_now",
531         (main_thread_only().delayed_fence.value() - now).InSecondsF());
532   }
533 
534   bool verbose = false;
535   TRACE_EVENT_CATEGORY_GROUP_ENABLED(
536       TRACE_DISABLED_BY_DEFAULT("sequence_manager.verbose_snapshots"),
537       &verbose);
538 
539   if (verbose) {
540     state->BeginArray("immediate_incoming_queue");
541     QueueAsValueInto(immediate_incoming_queue(), now, state);
542     state->EndArray();
543     state->BeginArray("delayed_work_queue");
544     main_thread_only().delayed_work_queue->AsValueInto(now, state);
545     state->EndArray();
546     state->BeginArray("immediate_work_queue");
547     main_thread_only().immediate_work_queue->AsValueInto(now, state);
548     state->EndArray();
549     state->BeginArray("delayed_incoming_queue");
550     QueueAsValueInto(main_thread_only().delayed_incoming_queue, now, state);
551     state->EndArray();
552   }
553   state->SetString("priority", TaskQueue::PriorityToString(GetQueuePriority()));
554   state->EndDictionary();
555 }
556 
AddTaskObserver(MessageLoop::TaskObserver * task_observer)557 void TaskQueueImpl::AddTaskObserver(MessageLoop::TaskObserver* task_observer) {
558   main_thread_only().task_observers.AddObserver(task_observer);
559 }
560 
RemoveTaskObserver(MessageLoop::TaskObserver * task_observer)561 void TaskQueueImpl::RemoveTaskObserver(
562     MessageLoop::TaskObserver* task_observer) {
563   main_thread_only().task_observers.RemoveObserver(task_observer);
564 }
565 
NotifyWillProcessTask(const PendingTask & pending_task)566 void TaskQueueImpl::NotifyWillProcessTask(const PendingTask& pending_task) {
567   DCHECK(should_notify_observers_);
568   if (main_thread_only().blame_context)
569     main_thread_only().blame_context->Enter();
570   for (auto& observer : main_thread_only().task_observers)
571     observer.WillProcessTask(pending_task);
572 }
573 
NotifyDidProcessTask(const PendingTask & pending_task)574 void TaskQueueImpl::NotifyDidProcessTask(const PendingTask& pending_task) {
575   DCHECK(should_notify_observers_);
576   for (auto& observer : main_thread_only().task_observers)
577     observer.DidProcessTask(pending_task);
578   if (main_thread_only().blame_context)
579     main_thread_only().blame_context->Leave();
580 }
581 
SetTimeDomain(TimeDomain * time_domain)582 void TaskQueueImpl::SetTimeDomain(TimeDomain* time_domain) {
583   {
584     AutoLock lock(any_thread_lock_);
585     DCHECK(time_domain);
586     // NOTE this is similar to checking |any_thread().sequence_manager| but
587     // the TaskQueueSelectorTests constructs TaskQueueImpl directly with a null
588     // sequence_manager.  Instead we check |any_thread().time_domain| which is
589     // another way of asserting that UnregisterTaskQueue has not been called.
590     DCHECK(any_thread().time_domain);
591     if (!any_thread().time_domain)
592       return;
593     DCHECK(main_thread_checker_.CalledOnValidThread());
594     if (time_domain == main_thread_only().time_domain)
595       return;
596 
597     any_thread().time_domain = time_domain;
598   }
599 
600   main_thread_only().time_domain->UnregisterQueue(this);
601   main_thread_only().time_domain = time_domain;
602 
603   LazyNow lazy_now = time_domain->CreateLazyNow();
604   // Clear scheduled wake up to ensure that new notifications are issued
605   // correctly.
606   // TODO(altimin): Remove this when we won't have to support changing time
607   // domains.
608   main_thread_only().scheduled_wake_up = nullopt;
609   UpdateDelayedWakeUp(&lazy_now);
610 }
611 
GetTimeDomain() const612 TimeDomain* TaskQueueImpl::GetTimeDomain() const {
613   if (PlatformThread::CurrentId() == thread_id_)
614     return main_thread_only().time_domain;
615 
616   AutoLock lock(any_thread_lock_);
617   return any_thread().time_domain;
618 }
619 
SetBlameContext(trace_event::BlameContext * blame_context)620 void TaskQueueImpl::SetBlameContext(trace_event::BlameContext* blame_context) {
621   main_thread_only().blame_context = blame_context;
622 }
623 
InsertFence(TaskQueue::InsertFencePosition position)624 void TaskQueueImpl::InsertFence(TaskQueue::InsertFencePosition position) {
625   if (!main_thread_only().sequence_manager)
626     return;
627 
628   // Only one fence may be present at a time.
629   main_thread_only().delayed_fence = nullopt;
630 
631   EnqueueOrder previous_fence = main_thread_only().current_fence;
632   EnqueueOrder current_fence =
633       position == TaskQueue::InsertFencePosition::kNow
634           ? main_thread_only().sequence_manager->GetNextSequenceNumber()
635           : EnqueueOrder::blocking_fence();
636 
637   // Tasks posted after this point will have a strictly higher enqueue order
638   // and will be blocked from running.
639   main_thread_only().current_fence = current_fence;
640   bool task_unblocked =
641       main_thread_only().immediate_work_queue->InsertFence(current_fence);
642   task_unblocked |=
643       main_thread_only().delayed_work_queue->InsertFence(current_fence);
644 
645   if (!task_unblocked && previous_fence && previous_fence < current_fence) {
646     AutoLock lock(immediate_incoming_queue_lock_);
647     if (!immediate_incoming_queue().empty() &&
648         immediate_incoming_queue().front().enqueue_order() > previous_fence &&
649         immediate_incoming_queue().front().enqueue_order() < current_fence) {
650       task_unblocked = true;
651     }
652   }
653 
654   if (IsQueueEnabled() && task_unblocked) {
655     main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE);
656   }
657 }
658 
InsertFenceAt(TimeTicks time)659 void TaskQueueImpl::InsertFenceAt(TimeTicks time) {
660   // Task queue can have only one fence, delayed or not.
661   RemoveFence();
662   main_thread_only().delayed_fence = time;
663 }
664 
RemoveFence()665 void TaskQueueImpl::RemoveFence() {
666   if (!main_thread_only().sequence_manager)
667     return;
668 
669   EnqueueOrder previous_fence = main_thread_only().current_fence;
670   main_thread_only().current_fence = EnqueueOrder::none();
671   main_thread_only().delayed_fence = nullopt;
672 
673   bool task_unblocked = main_thread_only().immediate_work_queue->RemoveFence();
674   task_unblocked |= main_thread_only().delayed_work_queue->RemoveFence();
675 
676   if (!task_unblocked && previous_fence) {
677     AutoLock lock(immediate_incoming_queue_lock_);
678     if (!immediate_incoming_queue().empty() &&
679         immediate_incoming_queue().front().enqueue_order() > previous_fence) {
680       task_unblocked = true;
681     }
682   }
683 
684   if (IsQueueEnabled() && task_unblocked) {
685     main_thread_only().sequence_manager->MaybeScheduleImmediateWork(FROM_HERE);
686   }
687 }
688 
BlockedByFence() const689 bool TaskQueueImpl::BlockedByFence() const {
690   if (!main_thread_only().current_fence)
691     return false;
692 
693   if (!main_thread_only().immediate_work_queue->BlockedByFence() ||
694       !main_thread_only().delayed_work_queue->BlockedByFence()) {
695     return false;
696   }
697 
698   AutoLock lock(immediate_incoming_queue_lock_);
699   if (immediate_incoming_queue().empty())
700     return true;
701 
702   return immediate_incoming_queue().front().enqueue_order() >
703          main_thread_only().current_fence;
704 }
705 
HasActiveFence()706 bool TaskQueueImpl::HasActiveFence() {
707   if (main_thread_only().delayed_fence &&
708       main_thread_only().time_domain->Now() >
709           main_thread_only().delayed_fence.value()) {
710     return true;
711   }
712   return !!main_thread_only().current_fence;
713 }
714 
CouldTaskRun(EnqueueOrder enqueue_order) const715 bool TaskQueueImpl::CouldTaskRun(EnqueueOrder enqueue_order) const {
716   if (!IsQueueEnabled())
717     return false;
718 
719   if (!main_thread_only().current_fence)
720     return true;
721 
722   return enqueue_order < main_thread_only().current_fence;
723 }
724 
725 // static
QueueAsValueInto(const TaskDeque & queue,TimeTicks now,trace_event::TracedValue * state)726 void TaskQueueImpl::QueueAsValueInto(const TaskDeque& queue,
727                                      TimeTicks now,
728                                      trace_event::TracedValue* state) {
729   for (const Task& task : queue) {
730     TaskAsValueInto(task, now, state);
731   }
732 }
733 
734 // static
QueueAsValueInto(const std::priority_queue<Task> & queue,TimeTicks now,trace_event::TracedValue * state)735 void TaskQueueImpl::QueueAsValueInto(const std::priority_queue<Task>& queue,
736                                      TimeTicks now,
737                                      trace_event::TracedValue* state) {
738   // Remove const to search |queue| in the destructive manner. Restore the
739   // content from |visited| later.
740   std::priority_queue<Task>* mutable_queue =
741       const_cast<std::priority_queue<Task>*>(&queue);
742   std::priority_queue<Task> visited;
743   while (!mutable_queue->empty()) {
744     TaskAsValueInto(mutable_queue->top(), now, state);
745     visited.push(std::move(const_cast<Task&>(mutable_queue->top())));
746     mutable_queue->pop();
747   }
748   *mutable_queue = std::move(visited);
749 }
750 
751 // static
TaskAsValueInto(const Task & task,TimeTicks now,trace_event::TracedValue * state)752 void TaskQueueImpl::TaskAsValueInto(const Task& task,
753                                     TimeTicks now,
754                                     trace_event::TracedValue* state) {
755   state->BeginDictionary();
756   state->SetString("posted_from", task.posted_from.ToString());
757   if (task.enqueue_order_set())
758     state->SetInteger("enqueue_order", task.enqueue_order());
759   state->SetInteger("sequence_num", task.sequence_num);
760   state->SetBoolean("nestable", task.nestable == Nestable::kNestable);
761   state->SetBoolean("is_high_res", task.is_high_res);
762   state->SetBoolean("is_cancelled", task.task.IsCancelled());
763   state->SetDouble("delayed_run_time",
764                    (task.delayed_run_time - TimeTicks()).InMillisecondsF());
765   state->SetDouble("delayed_run_time_milliseconds_from_now",
766                    (task.delayed_run_time - now).InMillisecondsF());
767   state->EndDictionary();
768 }
769 
QueueEnabledVoterImpl(scoped_refptr<TaskQueue> task_queue)770 TaskQueueImpl::QueueEnabledVoterImpl::QueueEnabledVoterImpl(
771     scoped_refptr<TaskQueue> task_queue)
772     : task_queue_(task_queue), enabled_(true) {}
773 
~QueueEnabledVoterImpl()774 TaskQueueImpl::QueueEnabledVoterImpl::~QueueEnabledVoterImpl() {
775   if (task_queue_->GetTaskQueueImpl())
776     task_queue_->GetTaskQueueImpl()->RemoveQueueEnabledVoter(this);
777 }
778 
SetQueueEnabled(bool enabled)779 void TaskQueueImpl::QueueEnabledVoterImpl::SetQueueEnabled(bool enabled) {
780   if (enabled_ == enabled)
781     return;
782 
783   task_queue_->GetTaskQueueImpl()->OnQueueEnabledVoteChanged(enabled);
784   enabled_ = enabled;
785 }
786 
RemoveQueueEnabledVoter(const QueueEnabledVoterImpl * voter)787 void TaskQueueImpl::RemoveQueueEnabledVoter(
788     const QueueEnabledVoterImpl* voter) {
789   // Bail out if we're being called from TaskQueueImpl::UnregisterTaskQueue.
790   if (!main_thread_only().time_domain)
791     return;
792 
793   bool was_enabled = IsQueueEnabled();
794   if (voter->enabled_) {
795     main_thread_only().is_enabled_refcount--;
796     DCHECK_GE(main_thread_only().is_enabled_refcount, 0);
797   }
798 
799   main_thread_only().voter_refcount--;
800   DCHECK_GE(main_thread_only().voter_refcount, 0);
801 
802   bool is_enabled = IsQueueEnabled();
803   if (was_enabled != is_enabled)
804     EnableOrDisableWithSelector(is_enabled);
805 }
806 
IsQueueEnabled() const807 bool TaskQueueImpl::IsQueueEnabled() const {
808   // By default is_enabled_refcount and voter_refcount both equal zero.
809   return (main_thread_only().is_enabled_refcount ==
810           main_thread_only().voter_refcount) &&
811          main_thread_only().is_enabled_for_test;
812 }
813 
OnQueueEnabledVoteChanged(bool enabled)814 void TaskQueueImpl::OnQueueEnabledVoteChanged(bool enabled) {
815   bool was_enabled = IsQueueEnabled();
816   if (enabled) {
817     main_thread_only().is_enabled_refcount++;
818     DCHECK_LE(main_thread_only().is_enabled_refcount,
819               main_thread_only().voter_refcount);
820   } else {
821     main_thread_only().is_enabled_refcount--;
822     DCHECK_GE(main_thread_only().is_enabled_refcount, 0);
823   }
824 
825   bool is_enabled = IsQueueEnabled();
826   if (was_enabled != is_enabled)
827     EnableOrDisableWithSelector(is_enabled);
828 }
829 
EnableOrDisableWithSelector(bool enable)830 void TaskQueueImpl::EnableOrDisableWithSelector(bool enable) {
831   if (!main_thread_only().sequence_manager)
832     return;
833 
834   LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow();
835   UpdateDelayedWakeUp(&lazy_now);
836 
837   if (enable) {
838     if (HasPendingImmediateWork() &&
839         !main_thread_only().on_next_wake_up_changed_callback.is_null()) {
840       // Delayed work notification will be issued via time domain.
841       main_thread_only().on_next_wake_up_changed_callback.Run(TimeTicks());
842     }
843 
844     // Note the selector calls SequenceManager::OnTaskQueueEnabled which posts
845     // a DoWork if needed.
846     main_thread_only()
847         .sequence_manager->main_thread_only()
848         .selector.EnableQueue(this);
849   } else {
850     main_thread_only()
851         .sequence_manager->main_thread_only()
852         .selector.DisableQueue(this);
853   }
854 }
855 
856 std::unique_ptr<TaskQueue::QueueEnabledVoter>
CreateQueueEnabledVoter(scoped_refptr<TaskQueue> task_queue)857 TaskQueueImpl::CreateQueueEnabledVoter(scoped_refptr<TaskQueue> task_queue) {
858   DCHECK_EQ(task_queue->GetTaskQueueImpl(), this);
859   main_thread_only().voter_refcount++;
860   main_thread_only().is_enabled_refcount++;
861   return std::make_unique<QueueEnabledVoterImpl>(task_queue);
862 }
863 
SweepCanceledDelayedTasks(TimeTicks now)864 void TaskQueueImpl::SweepCanceledDelayedTasks(TimeTicks now) {
865   if (main_thread_only().delayed_incoming_queue.empty())
866     return;
867 
868   // Remove canceled tasks.
869   std::priority_queue<Task> remaining_tasks;
870   while (!main_thread_only().delayed_incoming_queue.empty()) {
871     if (!main_thread_only().delayed_incoming_queue.top().task.IsCancelled()) {
872       remaining_tasks.push(std::move(
873           const_cast<Task&>(main_thread_only().delayed_incoming_queue.top())));
874     }
875     main_thread_only().delayed_incoming_queue.pop();
876   }
877 
878   main_thread_only().delayed_incoming_queue = std::move(remaining_tasks);
879 
880   LazyNow lazy_now(now);
881   UpdateDelayedWakeUp(&lazy_now);
882 }
883 
PushImmediateIncomingTaskForTest(TaskQueueImpl::Task && task)884 void TaskQueueImpl::PushImmediateIncomingTaskForTest(
885     TaskQueueImpl::Task&& task) {
886   AutoLock lock(immediate_incoming_queue_lock_);
887   immediate_incoming_queue().push_back(std::move(task));
888 }
889 
RequeueDeferredNonNestableTask(DeferredNonNestableTask task)890 void TaskQueueImpl::RequeueDeferredNonNestableTask(
891     DeferredNonNestableTask task) {
892   DCHECK(task.task.nestable == Nestable::kNonNestable);
893   // The re-queued tasks have to be pushed onto the front because we'd otherwise
894   // violate the strict monotonically increasing enqueue order within the
895   // WorkQueue.  We can't assign them a new enqueue order here because that will
896   // not behave correctly with fences and things will break (e.g Idle TQ).
897   if (task.work_queue_type == WorkQueueType::kDelayed) {
898     main_thread_only().delayed_work_queue->PushNonNestableTaskToFront(
899         std::move(task.task));
900   } else {
901     main_thread_only().immediate_work_queue->PushNonNestableTaskToFront(
902         std::move(task.task));
903   }
904 }
905 
SetOnNextWakeUpChangedCallback(TaskQueueImpl::OnNextWakeUpChangedCallback callback)906 void TaskQueueImpl::SetOnNextWakeUpChangedCallback(
907     TaskQueueImpl::OnNextWakeUpChangedCallback callback) {
908 #if DCHECK_IS_ON()
909   if (callback) {
910     DCHECK(main_thread_only().on_next_wake_up_changed_callback.is_null())
911         << "Can't assign two different observers to "
912            "blink::scheduler::TaskQueue";
913   }
914 #endif
915   AutoLock lock(any_thread_lock_);
916   any_thread().on_next_wake_up_changed_callback = callback;
917   main_thread_only().on_next_wake_up_changed_callback = callback;
918 }
919 
UpdateDelayedWakeUp(LazyNow * lazy_now)920 void TaskQueueImpl::UpdateDelayedWakeUp(LazyNow* lazy_now) {
921   return UpdateDelayedWakeUpImpl(lazy_now, GetNextScheduledWakeUpImpl());
922 }
923 
UpdateDelayedWakeUpImpl(LazyNow * lazy_now,Optional<TaskQueueImpl::DelayedWakeUp> wake_up)924 void TaskQueueImpl::UpdateDelayedWakeUpImpl(
925     LazyNow* lazy_now,
926     Optional<TaskQueueImpl::DelayedWakeUp> wake_up) {
927   if (main_thread_only().scheduled_wake_up == wake_up)
928     return;
929   main_thread_only().scheduled_wake_up = wake_up;
930 
931   if (wake_up &&
932       !main_thread_only().on_next_wake_up_changed_callback.is_null() &&
933       !HasPendingImmediateWork()) {
934     main_thread_only().on_next_wake_up_changed_callback.Run(wake_up->time);
935   }
936 
937   main_thread_only().time_domain->SetNextWakeUpForQueue(this, wake_up,
938                                                         lazy_now);
939 }
940 
SetDelayedWakeUpForTesting(Optional<TaskQueueImpl::DelayedWakeUp> wake_up)941 void TaskQueueImpl::SetDelayedWakeUpForTesting(
942     Optional<TaskQueueImpl::DelayedWakeUp> wake_up) {
943   LazyNow lazy_now = main_thread_only().time_domain->CreateLazyNow();
944   UpdateDelayedWakeUpImpl(&lazy_now, wake_up);
945 }
946 
HasPendingImmediateWork()947 bool TaskQueueImpl::HasPendingImmediateWork() {
948   // Any work queue tasks count as immediate work.
949   if (!main_thread_only().delayed_work_queue->Empty() ||
950       !main_thread_only().immediate_work_queue->Empty()) {
951     return true;
952   }
953 
954   // Finally tasks on |immediate_incoming_queue| count as immediate work.
955   AutoLock lock(immediate_incoming_queue_lock_);
956   return !immediate_incoming_queue().empty();
957 }
958 
SetOnTaskStartedHandler(TaskQueueImpl::OnTaskStartedHandler handler)959 void TaskQueueImpl::SetOnTaskStartedHandler(
960     TaskQueueImpl::OnTaskStartedHandler handler) {
961   main_thread_only().on_task_started_handler = std::move(handler);
962 }
963 
OnTaskStarted(const TaskQueue::Task & task,const TaskQueue::TaskTiming & task_timing)964 void TaskQueueImpl::OnTaskStarted(const TaskQueue::Task& task,
965                                   const TaskQueue::TaskTiming& task_timing) {
966   if (!main_thread_only().on_task_started_handler.is_null())
967     main_thread_only().on_task_started_handler.Run(task, task_timing);
968 }
969 
SetOnTaskCompletedHandler(TaskQueueImpl::OnTaskCompletedHandler handler)970 void TaskQueueImpl::SetOnTaskCompletedHandler(
971     TaskQueueImpl::OnTaskCompletedHandler handler) {
972   main_thread_only().on_task_completed_handler = std::move(handler);
973 }
974 
OnTaskCompleted(const TaskQueue::Task & task,const TaskQueue::TaskTiming & task_timing)975 void TaskQueueImpl::OnTaskCompleted(const TaskQueue::Task& task,
976                                     const TaskQueue::TaskTiming& task_timing) {
977   if (!main_thread_only().on_task_completed_handler.is_null())
978     main_thread_only().on_task_completed_handler.Run(task, task_timing);
979 }
980 
RequiresTaskTiming() const981 bool TaskQueueImpl::RequiresTaskTiming() const {
982   return !main_thread_only().on_task_started_handler.is_null() ||
983          !main_thread_only().on_task_completed_handler.is_null();
984 }
985 
IsUnregistered() const986 bool TaskQueueImpl::IsUnregistered() const {
987   AutoLock lock(any_thread_lock_);
988   return !any_thread().sequence_manager;
989 }
990 
GetSequenceManagerWeakPtr()991 WeakPtr<SequenceManagerImpl> TaskQueueImpl::GetSequenceManagerWeakPtr() {
992   return main_thread_only().sequence_manager->GetWeakPtr();
993 }
994 
995 scoped_refptr<GracefulQueueShutdownHelper>
GetGracefulQueueShutdownHelper()996 TaskQueueImpl::GetGracefulQueueShutdownHelper() {
997   return main_thread_only().sequence_manager->GetGracefulQueueShutdownHelper();
998 }
999 
SetQueueEnabledForTest(bool enabled)1000 void TaskQueueImpl::SetQueueEnabledForTest(bool enabled) {
1001   main_thread_only().is_enabled_for_test = enabled;
1002   EnableOrDisableWithSelector(IsQueueEnabled());
1003 }
1004 
ActivateDelayedFenceIfNeeded(TimeTicks now)1005 void TaskQueueImpl::ActivateDelayedFenceIfNeeded(TimeTicks now) {
1006   if (!main_thread_only().delayed_fence)
1007     return;
1008   if (main_thread_only().delayed_fence.value() > now)
1009     return;
1010   InsertFence(TaskQueue::InsertFencePosition::kNow);
1011   main_thread_only().delayed_fence = nullopt;
1012 }
1013 
1014 }  // namespace internal
1015 }  // namespace sequence_manager
1016 }  // namespace base
1017