1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/task_scheduler/scheduler_single_thread_task_runner_manager.h"
6
7 #include <algorithm>
8 #include <memory>
9 #include <string>
10 #include <utility>
11
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/single_thread_task_runner.h"
16 #include "base/strings/stringprintf.h"
17 #include "base/synchronization/atomic_flag.h"
18 #include "base/task_scheduler/delayed_task_manager.h"
19 #include "base/task_scheduler/scheduler_worker.h"
20 #include "base/task_scheduler/sequence.h"
21 #include "base/task_scheduler/task.h"
22 #include "base/task_scheduler/task_tracker.h"
23 #include "base/task_scheduler/task_traits.h"
24 #include "base/threading/platform_thread.h"
25 #include "base/time/time.h"
26
27 #if defined(OS_WIN)
28 #include <windows.h>
29
30 #include "base/win/scoped_com_initializer.h"
31 #endif // defined(OS_WIN)
32
33 namespace base {
34 namespace internal {
35
36 namespace {
37
// Boolean indicating whether there's a SchedulerSingleThreadTaskRunnerManager
// instance alive in this process. This variable should only be set when the
// SchedulerSingleThreadTaskRunnerManager instance is brought up (on the main
// thread; before any tasks are posted) and cleared when the instance is
// brought down (i.e., only when unit tests tear down the task environment and
// never in production). This makes the variable const while worker threads are
// up and as such it doesn't need to be atomic. It is used to tell when a task
// is posted from the main thread after the task environment was brought down in
// unit tests so that SchedulerSingleThreadTaskRunnerManager bound TaskRunners
// can return false on PostTask, letting such callers know they should complete
// necessary work synchronously. Note: |!g_manager_is_alive| is generally
// equivalent to |!TaskScheduler::GetInstance()| but has the advantage of being
// valid in task_scheduler unit tests that don't instantiate a full
// TaskScheduler.
52 bool g_manager_is_alive = false;
53
54 // Allows for checking the PlatformThread::CurrentRef() against a set
55 // PlatformThreadRef atomically without using locks.
56 class AtomicThreadRefChecker {
57 public:
58 AtomicThreadRefChecker() = default;
59 ~AtomicThreadRefChecker() = default;
60
Set()61 void Set() {
62 thread_ref_ = PlatformThread::CurrentRef();
63 is_set_.Set();
64 }
65
IsCurrentThreadSameAsSetThread()66 bool IsCurrentThreadSameAsSetThread() {
67 return is_set_.IsSet() && thread_ref_ == PlatformThread::CurrentRef();
68 }
69
70 private:
71 AtomicFlag is_set_;
72 PlatformThreadRef thread_ref_;
73
74 DISALLOW_COPY_AND_ASSIGN(AtomicThreadRefChecker);
75 };
76
77 class SchedulerWorkerDelegate : public SchedulerWorker::Delegate {
78 public:
SchedulerWorkerDelegate(const std::string & thread_name,SchedulerWorker::ThreadLabel thread_label)79 SchedulerWorkerDelegate(const std::string& thread_name,
80 SchedulerWorker::ThreadLabel thread_label)
81 : thread_name_(thread_name), thread_label_(thread_label) {}
82
set_worker(SchedulerWorker * worker)83 void set_worker(SchedulerWorker* worker) {
84 DCHECK(!worker_);
85 worker_ = worker;
86 }
87
88 // SchedulerWorker::Delegate:
OnCanScheduleSequence(scoped_refptr<Sequence> sequence)89 void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override {
90 DCHECK(worker_);
91 ReEnqueueSequence(std::move(sequence));
92 worker_->WakeUp();
93 }
94
GetThreadLabel() const95 SchedulerWorker::ThreadLabel GetThreadLabel() const final {
96 return thread_label_;
97 }
98
OnMainEntry(const SchedulerWorker *)99 void OnMainEntry(const SchedulerWorker* /* worker */) override {
100 thread_ref_checker_.Set();
101 PlatformThread::SetName(thread_name_);
102 }
103
GetWork(SchedulerWorker * worker)104 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
105 AutoSchedulerLock auto_lock(sequence_lock_);
106 bool has_work = has_work_;
107 has_work_ = false;
108 return has_work ? sequence_ : nullptr;
109 }
110
DidRunTask()111 void DidRunTask() override {}
112
ReEnqueueSequence(scoped_refptr<Sequence> sequence)113 void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override {
114 AutoSchedulerLock auto_lock(sequence_lock_);
115 // We've shut down, so no-op this work request. Any sequence cleanup will
116 // occur in the caller's context.
117 if (!sequence_)
118 return;
119
120 DCHECK_EQ(sequence, sequence_);
121 DCHECK(!has_work_);
122 has_work_ = true;
123 }
124
GetSleepTimeout()125 TimeDelta GetSleepTimeout() override { return TimeDelta::Max(); }
126
RunsTasksInCurrentSequence()127 bool RunsTasksInCurrentSequence() {
128 // We check the thread ref instead of the sequence for the benefit of COM
129 // callbacks which may execute without a sequence context.
130 return thread_ref_checker_.IsCurrentThreadSameAsSetThread();
131 }
132
OnMainExit(SchedulerWorker *)133 void OnMainExit(SchedulerWorker* /* worker */) override {
134 // Move |sequence_| to |local_sequence| so that if we have the last
135 // reference to the sequence we don't destroy it (and its tasks) within
136 // |sequence_lock_|.
137 scoped_refptr<Sequence> local_sequence;
138 {
139 AutoSchedulerLock auto_lock(sequence_lock_);
140 // To reclaim skipped tasks on shutdown, we null out the sequence to allow
141 // the tasks to destroy themselves.
142 local_sequence = std::move(sequence_);
143 }
144 }
145
146 // SchedulerWorkerDelegate:
147
148 // Consumers should release their sequence reference as soon as possible to
149 // ensure timely cleanup for general shutdown.
sequence()150 scoped_refptr<Sequence> sequence() {
151 AutoSchedulerLock auto_lock(sequence_lock_);
152 return sequence_;
153 }
154
155 private:
156 const std::string thread_name_;
157 const SchedulerWorker::ThreadLabel thread_label_;
158
159 // The SchedulerWorker that has |this| as a delegate. Must be set before
160 // starting or posting a task to the SchedulerWorker, because it's used in
161 // OnMainEntry() and OnCanScheduleSequence() (called when a sequence held up
162 // by WillScheduleSequence() in PostTaskNow() can be scheduled).
163 SchedulerWorker* worker_ = nullptr;
164
165 // Synchronizes access to |sequence_| and |has_work_|.
166 SchedulerLock sequence_lock_;
167 scoped_refptr<Sequence> sequence_ = new Sequence;
168 bool has_work_ = false;
169
170 AtomicThreadRefChecker thread_ref_checker_;
171
172 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegate);
173 };
174
175 #if defined(OS_WIN)
176
177 class SchedulerWorkerCOMDelegate : public SchedulerWorkerDelegate {
178 public:
SchedulerWorkerCOMDelegate(const std::string & thread_name,SchedulerWorker::ThreadLabel thread_label,TrackedRef<TaskTracker> task_tracker)179 SchedulerWorkerCOMDelegate(const std::string& thread_name,
180 SchedulerWorker::ThreadLabel thread_label,
181 TrackedRef<TaskTracker> task_tracker)
182 : SchedulerWorkerDelegate(thread_name, thread_label),
183 task_tracker_(std::move(task_tracker)) {}
184
~SchedulerWorkerCOMDelegate()185 ~SchedulerWorkerCOMDelegate() override { DCHECK(!scoped_com_initializer_); }
186
187 // SchedulerWorker::Delegate:
OnMainEntry(const SchedulerWorker * worker)188 void OnMainEntry(const SchedulerWorker* worker) override {
189 SchedulerWorkerDelegate::OnMainEntry(worker);
190
191 scoped_com_initializer_ = std::make_unique<win::ScopedCOMInitializer>();
192 }
193
GetWork(SchedulerWorker * worker)194 scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override {
195 // This scheme below allows us to cover the following scenarios:
196 // * Only SchedulerWorkerDelegate::GetWork() has work:
197 // Always return the sequence from GetWork().
198 // * Only the Windows Message Queue has work:
199 // Always return the sequence from GetWorkFromWindowsMessageQueue();
200 // * Both SchedulerWorkerDelegate::GetWork() and the Windows Message Queue
201 // have work:
202 // Process sequences from each source round-robin style.
203 scoped_refptr<Sequence> sequence;
204 if (get_work_first_) {
205 sequence = SchedulerWorkerDelegate::GetWork(worker);
206 if (sequence)
207 get_work_first_ = false;
208 }
209
210 if (!sequence) {
211 sequence = GetWorkFromWindowsMessageQueue();
212 if (sequence)
213 get_work_first_ = true;
214 }
215
216 if (!sequence && !get_work_first_) {
217 // This case is important if we checked the Windows Message Queue first
218 // and found there was no work. We don't want to return null immediately
219 // as that could cause the thread to go to sleep while work is waiting via
220 // SchedulerWorkerDelegate::GetWork().
221 sequence = SchedulerWorkerDelegate::GetWork(worker);
222 }
223 return sequence;
224 }
225
OnMainExit(SchedulerWorker *)226 void OnMainExit(SchedulerWorker* /* worker */) override {
227 scoped_com_initializer_.reset();
228 }
229
WaitForWork(WaitableEvent * wake_up_event)230 void WaitForWork(WaitableEvent* wake_up_event) override {
231 DCHECK(wake_up_event);
232 const TimeDelta sleep_time = GetSleepTimeout();
233 const DWORD milliseconds_wait =
234 sleep_time.is_max() ? INFINITE : sleep_time.InMilliseconds();
235 const HANDLE wake_up_event_handle = wake_up_event->handle();
236 MsgWaitForMultipleObjectsEx(1, &wake_up_event_handle, milliseconds_wait,
237 QS_ALLINPUT, 0);
238 }
239
240 private:
GetWorkFromWindowsMessageQueue()241 scoped_refptr<Sequence> GetWorkFromWindowsMessageQueue() {
242 MSG msg;
243 if (PeekMessage(&msg, nullptr, 0, 0, PM_REMOVE) != FALSE) {
244 Task pump_message_task(FROM_HERE,
245 Bind(
246 [](MSG msg) {
247 TranslateMessage(&msg);
248 DispatchMessage(&msg);
249 },
250 std::move(msg)),
251 TaskTraits(MayBlock()), TimeDelta());
252 if (task_tracker_->WillPostTask(&pump_message_task)) {
253 bool was_empty =
254 message_pump_sequence_->PushTask(std::move(pump_message_task));
255 DCHECK(was_empty) << "GetWorkFromWindowsMessageQueue() does not expect "
256 "queueing of pump tasks.";
257 return message_pump_sequence_;
258 }
259 }
260 return nullptr;
261 }
262
263 bool get_work_first_ = true;
264 const scoped_refptr<Sequence> message_pump_sequence_ = new Sequence;
265 const TrackedRef<TaskTracker> task_tracker_;
266 std::unique_ptr<win::ScopedCOMInitializer> scoped_com_initializer_;
267
268 DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerCOMDelegate);
269 };
270
271 #endif // defined(OS_WIN)
272
273 } // namespace
274
275 class SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner
276 : public SingleThreadTaskRunner {
277 public:
278 // Constructs a SchedulerSingleThreadTaskRunner that indirectly controls the
279 // lifetime of a dedicated |worker| for |traits|.
SchedulerSingleThreadTaskRunner(SchedulerSingleThreadTaskRunnerManager * const outer,const TaskTraits & traits,SchedulerWorker * worker,SingleThreadTaskRunnerThreadMode thread_mode)280 SchedulerSingleThreadTaskRunner(
281 SchedulerSingleThreadTaskRunnerManager* const outer,
282 const TaskTraits& traits,
283 SchedulerWorker* worker,
284 SingleThreadTaskRunnerThreadMode thread_mode)
285 : outer_(outer),
286 traits_(traits),
287 worker_(worker),
288 thread_mode_(thread_mode) {
289 DCHECK(outer_);
290 DCHECK(worker_);
291 }
292
293 // SingleThreadTaskRunner:
PostDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)294 bool PostDelayedTask(const Location& from_here,
295 OnceClosure closure,
296 TimeDelta delay) override {
297 if (!g_manager_is_alive)
298 return false;
299
300 Task task(from_here, std::move(closure), traits_, delay);
301 task.single_thread_task_runner_ref = this;
302
303 if (!outer_->task_tracker_->WillPostTask(&task))
304 return false;
305
306 if (task.delayed_run_time.is_null()) {
307 PostTaskNow(std::move(task));
308 } else {
309 outer_->delayed_task_manager_->AddDelayedTask(
310 std::move(task),
311 BindOnce(&SchedulerSingleThreadTaskRunner::PostTaskNow,
312 Unretained(this)));
313 }
314 return true;
315 }
316
PostNonNestableDelayedTask(const Location & from_here,OnceClosure closure,TimeDelta delay)317 bool PostNonNestableDelayedTask(const Location& from_here,
318 OnceClosure closure,
319 TimeDelta delay) override {
320 // Tasks are never nested within the task scheduler.
321 return PostDelayedTask(from_here, std::move(closure), delay);
322 }
323
RunsTasksInCurrentSequence() const324 bool RunsTasksInCurrentSequence() const override {
325 if (!g_manager_is_alive)
326 return false;
327 return GetDelegate()->RunsTasksInCurrentSequence();
328 }
329
330 private:
~SchedulerSingleThreadTaskRunner()331 ~SchedulerSingleThreadTaskRunner() override {
332 // Only unregister if this is a DEDICATED SingleThreadTaskRunner. SHARED
333 // task runner SchedulerWorkers are managed separately as they are reused.
334 // |g_manager_is_alive| avoids a use-after-free should this
335 // SchedulerSingleThreadTaskRunner outlive its manager. It is safe to access
336 // |g_manager_is_alive| without synchronization primitives as it is const
337 // for the lifetime of the manager and ~SchedulerSingleThreadTaskRunner()
338 // either happens prior to the end of JoinForTesting() (which happens-before
339 // manager's destruction) or on main thread after the task environment's
340 // entire destruction (which happens-after the manager's destruction). Yes,
341 // there's a theoretical use case where the last ref to this
342 // SchedulerSingleThreadTaskRunner is handed to a thread not controlled by
343 // task_scheduler and that this ends up causing
344 // ~SchedulerSingleThreadTaskRunner() to race with
345 // ~SchedulerSingleThreadTaskRunnerManager() but this is intentionally not
346 // supported (and it doesn't matter in production where we leak the task
347 // environment for such reasons). TSan should catch this weird paradigm
348 // should anyone elect to use it in a unit test and the error would point
349 // here.
350 if (g_manager_is_alive &&
351 thread_mode_ == SingleThreadTaskRunnerThreadMode::DEDICATED) {
352 outer_->UnregisterSchedulerWorker(worker_);
353 }
354 }
355
PostTaskNow(Task task)356 void PostTaskNow(Task task) {
357 scoped_refptr<Sequence> sequence = GetDelegate()->sequence();
358 // If |sequence| is null, then the thread is effectively gone (either
359 // shutdown or joined).
360 if (!sequence)
361 return;
362
363 const bool sequence_was_empty = sequence->PushTask(std::move(task));
364 if (sequence_was_empty) {
365 sequence = outer_->task_tracker_->WillScheduleSequence(
366 std::move(sequence), GetDelegate());
367 if (sequence) {
368 GetDelegate()->ReEnqueueSequence(std::move(sequence));
369 worker_->WakeUp();
370 }
371 }
372 }
373
GetDelegate() const374 SchedulerWorkerDelegate* GetDelegate() const {
375 return static_cast<SchedulerWorkerDelegate*>(worker_->delegate());
376 }
377
378 SchedulerSingleThreadTaskRunnerManager* const outer_;
379 const TaskTraits traits_;
380 SchedulerWorker* const worker_;
381 const SingleThreadTaskRunnerThreadMode thread_mode_;
382
383 DISALLOW_COPY_AND_ASSIGN(SchedulerSingleThreadTaskRunner);
384 };
385
SchedulerSingleThreadTaskRunnerManager(TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)386 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunnerManager(
387 TrackedRef<TaskTracker> task_tracker,
388 DelayedTaskManager* delayed_task_manager)
389 : task_tracker_(std::move(task_tracker)),
390 delayed_task_manager_(delayed_task_manager) {
391 DCHECK(task_tracker_);
392 DCHECK(delayed_task_manager_);
393 #if defined(OS_WIN)
394 static_assert(arraysize(shared_com_scheduler_workers_) ==
395 arraysize(shared_scheduler_workers_),
396 "The size of |shared_com_scheduler_workers_| must match "
397 "|shared_scheduler_workers_|");
398 static_assert(arraysize(shared_com_scheduler_workers_[0]) ==
399 arraysize(shared_scheduler_workers_[0]),
400 "The size of |shared_com_scheduler_workers_| must match "
401 "|shared_scheduler_workers_|");
402 #endif // defined(OS_WIN)
403 DCHECK(!g_manager_is_alive);
404 g_manager_is_alive = true;
405 }
406
407 SchedulerSingleThreadTaskRunnerManager::
~SchedulerSingleThreadTaskRunnerManager()408 ~SchedulerSingleThreadTaskRunnerManager() {
409 DCHECK(g_manager_is_alive);
410 g_manager_is_alive = false;
411 }
412
Start(SchedulerWorkerObserver * scheduler_worker_observer)413 void SchedulerSingleThreadTaskRunnerManager::Start(
414 SchedulerWorkerObserver* scheduler_worker_observer) {
415 DCHECK(!scheduler_worker_observer_);
416 scheduler_worker_observer_ = scheduler_worker_observer;
417
418 decltype(workers_) workers_to_start;
419 {
420 AutoSchedulerLock auto_lock(lock_);
421 started_ = true;
422 workers_to_start = workers_;
423 }
424
425 // Start workers that were created before this method was called.
426 // Workers that already need to wake up are already signaled as part of
427 // SchedulerSingleThreadTaskRunner::PostTaskNow(). As a result, it's
428 // unnecessary to call WakeUp() for each worker (in fact, an extraneous
429 // WakeUp() would be racy and wrong - see https://crbug.com/862582).
430 for (scoped_refptr<SchedulerWorker> worker : workers_to_start)
431 worker->Start(scheduler_worker_observer_);
432 }
433
434 scoped_refptr<SingleThreadTaskRunner>
CreateSingleThreadTaskRunnerWithTraits(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)435 SchedulerSingleThreadTaskRunnerManager::CreateSingleThreadTaskRunnerWithTraits(
436 const TaskTraits& traits,
437 SingleThreadTaskRunnerThreadMode thread_mode) {
438 return CreateTaskRunnerWithTraitsImpl<SchedulerWorkerDelegate>(traits,
439 thread_mode);
440 }
441
442 #if defined(OS_WIN)
443 scoped_refptr<SingleThreadTaskRunner>
CreateCOMSTATaskRunnerWithTraits(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)444 SchedulerSingleThreadTaskRunnerManager::CreateCOMSTATaskRunnerWithTraits(
445 const TaskTraits& traits,
446 SingleThreadTaskRunnerThreadMode thread_mode) {
447 return CreateTaskRunnerWithTraitsImpl<SchedulerWorkerCOMDelegate>(
448 traits, thread_mode);
449 }
450 #endif // defined(OS_WIN)
451
452 // static
453 SchedulerSingleThreadTaskRunnerManager::ContinueOnShutdown
TraitsToContinueOnShutdown(const TaskTraits & traits)454 SchedulerSingleThreadTaskRunnerManager::TraitsToContinueOnShutdown(
455 const TaskTraits& traits) {
456 if (traits.shutdown_behavior() == TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN)
457 return IS_CONTINUE_ON_SHUTDOWN;
458 return IS_NOT_CONTINUE_ON_SHUTDOWN;
459 }
460
461 template <typename DelegateType>
462 scoped_refptr<
463 SchedulerSingleThreadTaskRunnerManager::SchedulerSingleThreadTaskRunner>
CreateTaskRunnerWithTraitsImpl(const TaskTraits & traits,SingleThreadTaskRunnerThreadMode thread_mode)464 SchedulerSingleThreadTaskRunnerManager::CreateTaskRunnerWithTraitsImpl(
465 const TaskTraits& traits,
466 SingleThreadTaskRunnerThreadMode thread_mode) {
467 DCHECK(thread_mode != SingleThreadTaskRunnerThreadMode::SHARED ||
468 !traits.with_base_sync_primitives())
469 << "Using WithBaseSyncPrimitives() on a shared SingleThreadTaskRunner "
470 "may cause deadlocks. Either reevaluate your usage (e.g. use "
471 "SequencedTaskRunner) or use "
472 "SingleThreadTaskRunnerThreadMode::DEDICATED.";
473 // To simplify the code, |dedicated_worker| is a local only variable that
474 // allows the code to treat both the DEDICATED and SHARED cases similarly for
475 // SingleThreadTaskRunnerThreadMode. In DEDICATED, the scoped_refptr is backed
476 // by a local variable and in SHARED, the scoped_refptr is backed by a member
477 // variable.
478 SchedulerWorker* dedicated_worker = nullptr;
479 SchedulerWorker*& worker =
480 thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
481 ? dedicated_worker
482 : GetSharedSchedulerWorkerForTraits<DelegateType>(traits);
483 bool new_worker = false;
484 bool started;
485 {
486 AutoSchedulerLock auto_lock(lock_);
487 if (!worker) {
488 const auto& environment_params =
489 kEnvironmentParams[GetEnvironmentIndexForTraits(traits)];
490 std::string worker_name;
491 if (thread_mode == SingleThreadTaskRunnerThreadMode::SHARED)
492 worker_name += "Shared";
493 worker_name += environment_params.name_suffix;
494 worker = CreateAndRegisterSchedulerWorker<DelegateType>(
495 worker_name, thread_mode,
496 CanUseBackgroundPriorityForSchedulerWorker()
497 ? environment_params.priority_hint
498 : ThreadPriority::NORMAL);
499 new_worker = true;
500 }
501 started = started_;
502 }
503
504 if (new_worker && started)
505 worker->Start(scheduler_worker_observer_);
506
507 return MakeRefCounted<SchedulerSingleThreadTaskRunner>(this, traits, worker,
508 thread_mode);
509 }
510
JoinForTesting()511 void SchedulerSingleThreadTaskRunnerManager::JoinForTesting() {
512 decltype(workers_) local_workers;
513 {
514 AutoSchedulerLock auto_lock(lock_);
515 local_workers = std::move(workers_);
516 }
517
518 for (const auto& worker : local_workers)
519 worker->JoinForTesting();
520
521 {
522 AutoSchedulerLock auto_lock(lock_);
523 DCHECK(workers_.empty())
524 << "New worker(s) unexpectedly registered during join.";
525 workers_ = std::move(local_workers);
526 }
527
528 // Release shared SchedulerWorkers at the end so they get joined above. If
529 // this call happens before the joins, the SchedulerWorkers are effectively
530 // detached and may outlive the SchedulerSingleThreadTaskRunnerManager.
531 ReleaseSharedSchedulerWorkers();
532 }
533
534 template <>
535 std::unique_ptr<SchedulerWorkerDelegate>
CreateSchedulerWorkerDelegate(const std::string & name,int id,SingleThreadTaskRunnerThreadMode thread_mode)536 SchedulerSingleThreadTaskRunnerManager::CreateSchedulerWorkerDelegate<
537 SchedulerWorkerDelegate>(const std::string& name,
538 int id,
539 SingleThreadTaskRunnerThreadMode thread_mode) {
540 return std::make_unique<SchedulerWorkerDelegate>(
541 StringPrintf("TaskSchedulerSingleThread%s%d", name.c_str(), id),
542 thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
543 ? SchedulerWorker::ThreadLabel::DEDICATED
544 : SchedulerWorker::ThreadLabel::SHARED);
545 }
546
547 #if defined(OS_WIN)
548 template <>
549 std::unique_ptr<SchedulerWorkerDelegate>
CreateSchedulerWorkerDelegate(const std::string & name,int id,SingleThreadTaskRunnerThreadMode thread_mode)550 SchedulerSingleThreadTaskRunnerManager::CreateSchedulerWorkerDelegate<
551 SchedulerWorkerCOMDelegate>(const std::string& name,
552 int id,
553 SingleThreadTaskRunnerThreadMode thread_mode) {
554 return std::make_unique<SchedulerWorkerCOMDelegate>(
555 StringPrintf("TaskSchedulerSingleThreadCOMSTA%s%d", name.c_str(), id),
556 thread_mode == SingleThreadTaskRunnerThreadMode::DEDICATED
557 ? SchedulerWorker::ThreadLabel::DEDICATED_COM
558 : SchedulerWorker::ThreadLabel::SHARED_COM,
559 task_tracker_);
560 }
561 #endif // defined(OS_WIN)
562
563 template <typename DelegateType>
564 SchedulerWorker*
CreateAndRegisterSchedulerWorker(const std::string & name,SingleThreadTaskRunnerThreadMode thread_mode,ThreadPriority priority_hint)565 SchedulerSingleThreadTaskRunnerManager::CreateAndRegisterSchedulerWorker(
566 const std::string& name,
567 SingleThreadTaskRunnerThreadMode thread_mode,
568 ThreadPriority priority_hint) {
569 lock_.AssertAcquired();
570 int id = next_worker_id_++;
571 std::unique_ptr<SchedulerWorkerDelegate> delegate =
572 CreateSchedulerWorkerDelegate<DelegateType>(name, id, thread_mode);
573 SchedulerWorkerDelegate* delegate_raw = delegate.get();
574 scoped_refptr<SchedulerWorker> worker = MakeRefCounted<SchedulerWorker>(
575 priority_hint, std::move(delegate), task_tracker_);
576 delegate_raw->set_worker(worker.get());
577 workers_.emplace_back(std::move(worker));
578 return workers_.back().get();
579 }
580
581 template <>
582 SchedulerWorker*&
GetSharedSchedulerWorkerForTraits(const TaskTraits & traits)583 SchedulerSingleThreadTaskRunnerManager::GetSharedSchedulerWorkerForTraits<
584 SchedulerWorkerDelegate>(const TaskTraits& traits) {
585 return shared_scheduler_workers_[GetEnvironmentIndexForTraits(traits)]
586 [TraitsToContinueOnShutdown(traits)];
587 }
588
589 #if defined(OS_WIN)
590 template <>
591 SchedulerWorker*&
GetSharedSchedulerWorkerForTraits(const TaskTraits & traits)592 SchedulerSingleThreadTaskRunnerManager::GetSharedSchedulerWorkerForTraits<
593 SchedulerWorkerCOMDelegate>(const TaskTraits& traits) {
594 return shared_com_scheduler_workers_[GetEnvironmentIndexForTraits(traits)]
595 [TraitsToContinueOnShutdown(traits)];
596 }
597 #endif // defined(OS_WIN)
598
UnregisterSchedulerWorker(SchedulerWorker * worker)599 void SchedulerSingleThreadTaskRunnerManager::UnregisterSchedulerWorker(
600 SchedulerWorker* worker) {
601 // Cleanup uses a SchedulerLock, so call Cleanup() after releasing
602 // |lock_|.
603 scoped_refptr<SchedulerWorker> worker_to_destroy;
604 {
605 AutoSchedulerLock auto_lock(lock_);
606
607 // Skip when joining (the join logic takes care of the rest).
608 if (workers_.empty())
609 return;
610
611 auto worker_iter =
612 std::find_if(workers_.begin(), workers_.end(),
613 [worker](const scoped_refptr<SchedulerWorker>& candidate) {
614 return candidate.get() == worker;
615 });
616 DCHECK(worker_iter != workers_.end());
617 worker_to_destroy = std::move(*worker_iter);
618 workers_.erase(worker_iter);
619 }
620 worker_to_destroy->Cleanup();
621 }
622
ReleaseSharedSchedulerWorkers()623 void SchedulerSingleThreadTaskRunnerManager::ReleaseSharedSchedulerWorkers() {
624 decltype(shared_scheduler_workers_) local_shared_scheduler_workers;
625 #if defined(OS_WIN)
626 decltype(shared_com_scheduler_workers_) local_shared_com_scheduler_workers;
627 #endif
628 {
629 AutoSchedulerLock auto_lock(lock_);
630 for (size_t i = 0; i < arraysize(shared_scheduler_workers_); ++i) {
631 for (size_t j = 0; j < arraysize(shared_scheduler_workers_[i]); ++j) {
632 local_shared_scheduler_workers[i][j] = shared_scheduler_workers_[i][j];
633 shared_scheduler_workers_[i][j] = nullptr;
634 #if defined(OS_WIN)
635 local_shared_com_scheduler_workers[i][j] =
636 shared_com_scheduler_workers_[i][j];
637 shared_com_scheduler_workers_[i][j] = nullptr;
638 #endif
639 }
640 }
641 }
642
643 for (size_t i = 0; i < arraysize(local_shared_scheduler_workers); ++i) {
644 for (size_t j = 0; j < arraysize(local_shared_scheduler_workers[i]); ++j) {
645 if (local_shared_scheduler_workers[i][j])
646 UnregisterSchedulerWorker(local_shared_scheduler_workers[i][j]);
647 #if defined(OS_WIN)
648 if (local_shared_com_scheduler_workers[i][j])
649 UnregisterSchedulerWorker(local_shared_com_scheduler_workers[i][j]);
650 #endif
651 }
652 }
653 }
654
655 } // namespace internal
656 } // namespace base
657