1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task_scheduler/task_scheduler_impl.h"
6 
7 #include <algorithm>
8 #include <string>
9 #include <utility>
10 
11 #include "base/compiler_specific.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/metrics/field_trial_params.h"
14 #include "base/stl_util.h"
15 #include "base/strings/string_util.h"
16 #include "base/task_scheduler/delayed_task_manager.h"
17 #include "base/task_scheduler/environment_config.h"
18 #include "base/task_scheduler/scheduler_worker_pool_params.h"
19 #include "base/task_scheduler/sequence.h"
20 #include "base/task_scheduler/sequence_sort_key.h"
21 #include "base/task_scheduler/service_thread.h"
22 #include "base/task_scheduler/task.h"
23 #include "base/task_scheduler/task_tracker.h"
24 #include "base/time/time.h"
25 
26 namespace base {
27 namespace internal {
28 
TaskSchedulerImpl(StringPiece histogram_label)29 TaskSchedulerImpl::TaskSchedulerImpl(StringPiece histogram_label)
30     : TaskSchedulerImpl(histogram_label,
31                         std::make_unique<TaskTrackerImpl>(histogram_label)) {}
32 
TaskSchedulerImpl(StringPiece histogram_label,std::unique_ptr<TaskTrackerImpl> task_tracker)33 TaskSchedulerImpl::TaskSchedulerImpl(
34     StringPiece histogram_label,
35     std::unique_ptr<TaskTrackerImpl> task_tracker)
36     : task_tracker_(std::move(task_tracker)),
37       service_thread_(std::make_unique<ServiceThread>(task_tracker_.get())),
38       single_thread_task_runner_manager_(task_tracker_->GetTrackedRef(),
39                                          &delayed_task_manager_) {
40   DCHECK(!histogram_label.empty());
41 
42   static_assert(arraysize(environment_to_worker_pool_) == ENVIRONMENT_COUNT,
43                 "The size of |environment_to_worker_pool_| must match "
44                 "ENVIRONMENT_COUNT.");
45   static_assert(
46       size(kEnvironmentParams) == ENVIRONMENT_COUNT,
47       "The size of |kEnvironmentParams| must match ENVIRONMENT_COUNT.");
48 
49   int num_pools_to_create = CanUseBackgroundPriorityForSchedulerWorker()
50                                 ? ENVIRONMENT_COUNT
51                                 : ENVIRONMENT_COUNT_WITHOUT_BACKGROUND_PRIORITY;
52   for (int environment_type = 0; environment_type < num_pools_to_create;
53        ++environment_type) {
54     worker_pools_.emplace_back(std::make_unique<SchedulerWorkerPoolImpl>(
55         JoinString(
56             {histogram_label, kEnvironmentParams[environment_type].name_suffix},
57             "."),
58         kEnvironmentParams[environment_type].name_suffix,
59         kEnvironmentParams[environment_type].priority_hint,
60         task_tracker_->GetTrackedRef(), &delayed_task_manager_));
61   }
62 
63   // Map environment indexes to pools.
64   environment_to_worker_pool_[FOREGROUND] = worker_pools_[FOREGROUND].get();
65   environment_to_worker_pool_[FOREGROUND_BLOCKING] =
66       worker_pools_[FOREGROUND_BLOCKING].get();
67 
68   if (CanUseBackgroundPriorityForSchedulerWorker()) {
69     environment_to_worker_pool_[BACKGROUND] = worker_pools_[BACKGROUND].get();
70     environment_to_worker_pool_[BACKGROUND_BLOCKING] =
71         worker_pools_[BACKGROUND_BLOCKING].get();
72   } else {
73     // On platforms without background thread priority, tasks posted to the
74     // background environment are run by foreground pools.
75     environment_to_worker_pool_[BACKGROUND] = worker_pools_[FOREGROUND].get();
76     environment_to_worker_pool_[BACKGROUND_BLOCKING] =
77         worker_pools_[FOREGROUND_BLOCKING].get();
78   }
79 }
80 
~TaskSchedulerImpl()81 TaskSchedulerImpl::~TaskSchedulerImpl() {
82 #if DCHECK_IS_ON()
83   DCHECK(join_for_testing_returned_.IsSet());
84 #endif
85 }
86 
Start(const TaskScheduler::InitParams & init_params,SchedulerWorkerObserver * scheduler_worker_observer)87 void TaskSchedulerImpl::Start(
88     const TaskScheduler::InitParams& init_params,
89     SchedulerWorkerObserver* scheduler_worker_observer) {
90   // This is set in Start() and not in the constructor because variation params
91   // are usually not ready when TaskSchedulerImpl is instantiated in a process.
92   if (base::GetFieldTrialParamValue("BrowserScheduler",
93                                     "AllTasksUserBlocking") == "true") {
94     all_tasks_user_blocking_.Set();
95   }
96 
97   // Start the service thread. On platforms that support it (POSIX except NaCL
98   // SFI), the service thread runs a MessageLoopForIO which is used to support
99   // FileDescriptorWatcher in the scope in which tasks run.
100   ServiceThread::Options service_thread_options;
101   service_thread_options.message_loop_type =
102 #if defined(OS_POSIX) && !defined(OS_NACL_SFI)
103       MessageLoop::TYPE_IO;
104 #else
105       MessageLoop::TYPE_DEFAULT;
106 #endif
107   service_thread_options.timer_slack = TIMER_SLACK_MAXIMUM;
108   CHECK(service_thread_->StartWithOptions(service_thread_options));
109 
110 #if defined(OS_POSIX) && !defined(OS_NACL_SFI)
111   // Needs to happen after starting the service thread to get its
112   // message_loop().
113   task_tracker_->set_watch_file_descriptor_message_loop(
114       static_cast<MessageLoopForIO*>(service_thread_->message_loop()));
115 
116 #if DCHECK_IS_ON()
117   task_tracker_->set_service_thread_handle(service_thread_->GetThreadHandle());
118 #endif  // DCHECK_IS_ON()
119 #endif  // defined(OS_POSIX) && !defined(OS_NACL_SFI)
120 
121   // Needs to happen after starting the service thread to get its task_runner().
122   scoped_refptr<TaskRunner> service_thread_task_runner =
123       service_thread_->task_runner();
124   delayed_task_manager_.Start(service_thread_task_runner);
125 
126   single_thread_task_runner_manager_.Start(scheduler_worker_observer);
127 
128   const SchedulerWorkerPoolImpl::WorkerEnvironment worker_environment =
129 #if defined(OS_WIN)
130       init_params.shared_worker_pool_environment ==
131               InitParams::SharedWorkerPoolEnvironment::COM_MTA
132           ? SchedulerWorkerPoolImpl::WorkerEnvironment::COM_MTA
133           : SchedulerWorkerPoolImpl::WorkerEnvironment::NONE;
134 #else
135       SchedulerWorkerPoolImpl::WorkerEnvironment::NONE;
136 #endif
137 
138   // On platforms that can't use the background thread priority, background
139   // tasks run in foreground pools. A cap is set on the number of background
140   // tasks that can run in foreground pools to ensure that there is always room
141   // for incoming foreground tasks and to minimize the performance impact of
142   // background tasks.
143   const int max_background_tasks_in_foreground_pool = std::max(
144       1, std::min(init_params.background_worker_pool_params.max_tasks(),
145                   init_params.foreground_worker_pool_params.max_tasks() / 2));
146   worker_pools_[FOREGROUND]->Start(
147       init_params.foreground_worker_pool_params,
148       max_background_tasks_in_foreground_pool, service_thread_task_runner,
149       scheduler_worker_observer, worker_environment);
150   const int max_background_tasks_in_foreground_blocking_pool = std::max(
151       1,
152       std::min(
153           init_params.background_blocking_worker_pool_params.max_tasks(),
154           init_params.foreground_blocking_worker_pool_params.max_tasks() / 2));
155   worker_pools_[FOREGROUND_BLOCKING]->Start(
156       init_params.foreground_blocking_worker_pool_params,
157       max_background_tasks_in_foreground_blocking_pool,
158       service_thread_task_runner, scheduler_worker_observer,
159       worker_environment);
160 
161   if (CanUseBackgroundPriorityForSchedulerWorker()) {
162     worker_pools_[BACKGROUND]->Start(
163         init_params.background_worker_pool_params,
164         init_params.background_worker_pool_params.max_tasks(),
165         service_thread_task_runner, scheduler_worker_observer,
166         worker_environment);
167     worker_pools_[BACKGROUND_BLOCKING]->Start(
168         init_params.background_blocking_worker_pool_params,
169         init_params.background_blocking_worker_pool_params.max_tasks(),
170         service_thread_task_runner, scheduler_worker_observer,
171         worker_environment);
172   }
173 }
174 
PostDelayedTaskWithTraits(const Location & from_here,const TaskTraits & traits,OnceClosure task,TimeDelta delay)175 void TaskSchedulerImpl::PostDelayedTaskWithTraits(const Location& from_here,
176                                                   const TaskTraits& traits,
177                                                   OnceClosure task,
178                                                   TimeDelta delay) {
179   // Post |task| as part of a one-off single-task Sequence.
180   const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits);
181   GetWorkerPoolForTraits(new_traits)
182       ->PostTaskWithSequence(
183           Task(from_here, std::move(task), new_traits, delay),
184           MakeRefCounted<Sequence>());
185 }
186 
CreateTaskRunnerWithTraits(const TaskTraits & traits)187 scoped_refptr<TaskRunner> TaskSchedulerImpl::CreateTaskRunnerWithTraits(
188     const TaskTraits& traits) {
189   const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits);
190   return GetWorkerPoolForTraits(new_traits)
191       ->CreateTaskRunnerWithTraits(new_traits);
192 }
193 
194 scoped_refptr<SequencedTaskRunner>
CreateSequencedTaskRunnerWithTraits(const TaskTraits & traits)195 TaskSchedulerImpl::CreateSequencedTaskRunnerWithTraits(
196     const TaskTraits& traits) {
197   const TaskTraits new_traits = SetUserBlockingPriorityIfNeeded(traits);
198   return GetWorkerPoolForTraits(new_traits)
199       ->CreateSequencedTaskRunnerWithTraits(new_traits);
200 }
201 
202 scoped_refptr<SingleThreadTaskRunner>
CreateSingleThreadTaskRunnerWithTraits(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)203 TaskSchedulerImpl::CreateSingleThreadTaskRunnerWithTraits(
204     const TaskTraits& traits,
205     SingleThreadTaskRunnerThreadMode thread_mode) {
206   return single_thread_task_runner_manager_
207       .CreateSingleThreadTaskRunnerWithTraits(
208           SetUserBlockingPriorityIfNeeded(traits), thread_mode);
209 }
210 
211 #if defined(OS_WIN)
212 scoped_refptr<SingleThreadTaskRunner>
CreateCOMSTATaskRunnerWithTraits(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)213 TaskSchedulerImpl::CreateCOMSTATaskRunnerWithTraits(
214     const TaskTraits& traits,
215     SingleThreadTaskRunnerThreadMode thread_mode) {
216   return single_thread_task_runner_manager_.CreateCOMSTATaskRunnerWithTraits(
217       SetUserBlockingPriorityIfNeeded(traits), thread_mode);
218 }
219 #endif  // defined(OS_WIN)
220 
GetHistograms() const221 std::vector<const HistogramBase*> TaskSchedulerImpl::GetHistograms() const {
222   std::vector<const HistogramBase*> histograms;
223   for (const auto& worker_pool : worker_pools_)
224     worker_pool->GetHistograms(&histograms);
225 
226   return histograms;
227 }
228 
GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated(const TaskTraits & traits) const229 int TaskSchedulerImpl::GetMaxConcurrentNonBlockedTasksWithTraitsDeprecated(
230     const TaskTraits& traits) const {
231   // This method does not support getting the maximum number of BACKGROUND tasks
232   // that can run concurrently in a pool.
233   DCHECK_NE(traits.priority(), TaskPriority::BACKGROUND);
234   return GetWorkerPoolForTraits(traits)
235       ->GetMaxConcurrentNonBlockedTasksDeprecated();
236 }
237 
Shutdown()238 void TaskSchedulerImpl::Shutdown() {
239   // TODO(fdoray): Increase the priority of BACKGROUND tasks blocking shutdown.
240   task_tracker_->Shutdown();
241 }
242 
FlushForTesting()243 void TaskSchedulerImpl::FlushForTesting() {
244   task_tracker_->FlushForTesting();
245 }
246 
FlushAsyncForTesting(OnceClosure flush_callback)247 void TaskSchedulerImpl::FlushAsyncForTesting(OnceClosure flush_callback) {
248   task_tracker_->FlushAsyncForTesting(std::move(flush_callback));
249 }
250 
JoinForTesting()251 void TaskSchedulerImpl::JoinForTesting() {
252 #if DCHECK_IS_ON()
253   DCHECK(!join_for_testing_returned_.IsSet());
254 #endif
255   // The service thread must be stopped before the workers are joined, otherwise
256   // tasks scheduled by the DelayedTaskManager might be posted between joining
257   // those workers and stopping the service thread which will cause a CHECK. See
258   // https://crbug.com/771701.
259   service_thread_->Stop();
260   single_thread_task_runner_manager_.JoinForTesting();
261   for (const auto& worker_pool : worker_pools_)
262     worker_pool->JoinForTesting();
263 #if DCHECK_IS_ON()
264   join_for_testing_returned_.Set();
265 #endif
266 }
267 
GetWorkerPoolForTraits(const TaskTraits & traits) const268 SchedulerWorkerPoolImpl* TaskSchedulerImpl::GetWorkerPoolForTraits(
269     const TaskTraits& traits) const {
270   return environment_to_worker_pool_[GetEnvironmentIndexForTraits(traits)];
271 }
272 
SetUserBlockingPriorityIfNeeded(const TaskTraits & traits) const273 TaskTraits TaskSchedulerImpl::SetUserBlockingPriorityIfNeeded(
274     const TaskTraits& traits) const {
275   return all_tasks_user_blocking_.IsSet()
276              ? TaskTraits::Override(traits, {TaskPriority::USER_BLOCKING})
277              : traits;
278 }
279 
280 }  // namespace internal
281 }  // namespace base
282