1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/task_scheduler/scheduler_worker_pool_impl.h"
6 
7 #include <stddef.h>
8 
9 #include <algorithm>
10 #include <utility>
11 
12 #include "base/atomicops.h"
13 #include "base/auto_reset.h"
14 #include "base/bind.h"
15 #include "base/bind_helpers.h"
16 #include "base/compiler_specific.h"
17 #include "base/location.h"
18 #include "base/memory/ptr_util.h"
19 #include "base/metrics/histogram.h"
20 #include "base/sequence_token.h"
21 #include "base/strings/string_util.h"
22 #include "base/strings/stringprintf.h"
23 #include "base/task_scheduler/scheduler_worker_pool_params.h"
24 #include "base/task_scheduler/task_tracker.h"
25 #include "base/task_scheduler/task_traits.h"
26 #include "base/threading/platform_thread.h"
27 #include "base/threading/scoped_blocking_call.h"
28 #include "base/threading/thread_checker.h"
29 #include "base/threading/thread_restrictions.h"
30 
31 #if defined(OS_WIN)
32 #include "base/win/scoped_com_initializer.h"
33 #include "base/win/scoped_windows_thread_environment.h"
34 #include "base/win/scoped_winrt_initializer.h"
35 #include "base/win/windows_version.h"
36 #endif  // defined(OS_WIN)
37 
38 namespace base {
39 namespace internal {
40 
41 constexpr TimeDelta SchedulerWorkerPoolImpl::kBlockedWorkersPollPeriod;
42 
43 namespace {
44 
45 constexpr char kPoolNameSuffix[] = "Pool";
46 constexpr char kDetachDurationHistogramPrefix[] =
47     "TaskScheduler.DetachDuration.";
48 constexpr char kNumTasksBeforeDetachHistogramPrefix[] =
49     "TaskScheduler.NumTasksBeforeDetach.";
50 constexpr char kNumTasksBetweenWaitsHistogramPrefix[] =
51     "TaskScheduler.NumTasksBetweenWaits.";
52 constexpr size_t kMaxNumberOfWorkers = 256;
53 
54 // Only used in DCHECKs.
ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>> & workers,const SchedulerWorker * worker)55 bool ContainsWorker(const std::vector<scoped_refptr<SchedulerWorker>>& workers,
56                     const SchedulerWorker* worker) {
57   auto it = std::find_if(workers.begin(), workers.end(),
58                          [worker](const scoped_refptr<SchedulerWorker>& i) {
59                            return i.get() == worker;
60                          });
61   return it != workers.end();
62 }
63 
64 }  // namespace
65 
66 class SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl
67     : public SchedulerWorker::Delegate,
68       public BlockingObserver {
69  public:
70   // |outer| owns the worker for which this delegate is constructed.
71   SchedulerWorkerDelegateImpl(TrackedRef<SchedulerWorkerPoolImpl> outer);
72   ~SchedulerWorkerDelegateImpl() override;
73 
74   // SchedulerWorker::Delegate:
75   void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override;
76   SchedulerWorker::ThreadLabel GetThreadLabel() const override;
77   void OnMainEntry(const SchedulerWorker* worker) override;
78   scoped_refptr<Sequence> GetWork(SchedulerWorker* worker) override;
79   void DidRunTask() override;
80   void ReEnqueueSequence(scoped_refptr<Sequence> sequence) override;
81   TimeDelta GetSleepTimeout() override;
82   void OnMainExit(SchedulerWorker* worker) override;
83 
84   // BlockingObserver:
85   void BlockingStarted(BlockingType blocking_type) override;
86   void BlockingTypeUpgraded() override;
87   void BlockingEnded() override;
88 
89   void MayBlockEntered();
90   void WillBlockEntered();
91 
92   // Returns true iff this worker has been within a MAY_BLOCK ScopedBlockingCall
93   // for more than |outer_->MayBlockThreshold()|. The max tasks must be
94   // incremented if this returns true.
95   bool MustIncrementMaxTasksLockRequired();
96 
is_running_background_task_lock_required() const97   bool is_running_background_task_lock_required() const {
98     outer_->lock_.AssertAcquired();
99     return is_running_background_task_;
100   }
101 
102  private:
103   // Returns true if |worker| is allowed to cleanup and remove itself from the
104   // pool. Called from GetWork() when no work is available.
105   bool CanCleanupLockRequired(const SchedulerWorker* worker) const;
106 
107   // Calls cleanup on |worker| and removes it from the pool. Called from
108   // GetWork() when no work is available and CanCleanupLockRequired() returns
109   // true.
110   void CleanupLockRequired(SchedulerWorker* worker);
111 
112   // Called in GetWork() when a worker becomes idle.
113   void OnWorkerBecomesIdleLockRequired(SchedulerWorker* worker);
114 
115   const TrackedRef<SchedulerWorkerPoolImpl> outer_;
116 
117   // Time of the last detach.
118   TimeTicks last_detach_time_;
119 
120   // Number of tasks executed since the last time the
121   // TaskScheduler.NumTasksBetweenWaits histogram was recorded.
122   size_t num_tasks_since_last_wait_ = 0;
123 
124   // Number of tasks executed since the last time the
125   // TaskScheduler.NumTasksBeforeDetach histogram was recorded.
126   size_t num_tasks_since_last_detach_ = 0;
127 
128   // Whether |outer_->max_tasks_| was incremented due to a ScopedBlockingCall on
129   // the thread. Access synchronized by |outer_->lock_|.
130   bool incremented_max_tasks_since_blocked_ = false;
131 
132   // Time when MayBlockScopeEntered() was last called. Reset when
133   // BlockingScopeExited() is called. Access synchronized by |outer_->lock_|.
134   TimeTicks may_block_start_time_;
135 
136   // Whether this worker is currently running a task (i.e. GetWork() has
137   // returned a non-empty sequence and DidRunTask() hasn't been called yet).
138   bool is_running_task_ = false;
139 
140   // Whether this worker is currently running a TaskPriority::BACKGROUND task.
141   // Writes are made from the worker thread and are protected by
142   // |outer_->lock_|. Reads are made from any thread, they are protected by
143   // |outer_->lock_| when made outside of the worker thread.
144   bool is_running_background_task_ = false;
145 
146 #if defined(OS_WIN)
147   std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment_;
148 #endif  // defined(OS_WIN)
149 
150   // Verifies that specific calls are always made from the worker thread.
151   THREAD_CHECKER(worker_thread_checker_);
152 
153   DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerDelegateImpl);
154 };
155 
SchedulerWorkerPoolImpl(StringPiece histogram_label,StringPiece pool_label,ThreadPriority priority_hint,TrackedRef<TaskTracker> task_tracker,DelayedTaskManager * delayed_task_manager)156 SchedulerWorkerPoolImpl::SchedulerWorkerPoolImpl(
157     StringPiece histogram_label,
158     StringPiece pool_label,
159     ThreadPriority priority_hint,
160     TrackedRef<TaskTracker> task_tracker,
161     DelayedTaskManager* delayed_task_manager)
162     : SchedulerWorkerPool(std::move(task_tracker), delayed_task_manager),
163       pool_label_(pool_label.as_string()),
164       priority_hint_(priority_hint),
165       lock_(shared_priority_queue_.container_lock()),
166       idle_workers_stack_cv_for_testing_(lock_.CreateConditionVariable()),
167       // Mimics the UMA_HISTOGRAM_LONG_TIMES macro.
168       detach_duration_histogram_(Histogram::FactoryTimeGet(
169           JoinString({kDetachDurationHistogramPrefix, histogram_label,
170                       kPoolNameSuffix},
171                      ""),
172           TimeDelta::FromMilliseconds(1),
173           TimeDelta::FromHours(1),
174           50,
175           HistogramBase::kUmaTargetedHistogramFlag)),
176       // Mimics the UMA_HISTOGRAM_COUNTS_1000 macro. When a worker runs more
177       // than 1000 tasks before detaching, there is no need to know the exact
178       // number of tasks that ran.
179       num_tasks_before_detach_histogram_(Histogram::FactoryGet(
180           JoinString({kNumTasksBeforeDetachHistogramPrefix, histogram_label,
181                       kPoolNameSuffix},
182                      ""),
183           1,
184           1000,
185           50,
186           HistogramBase::kUmaTargetedHistogramFlag)),
187       // Mimics the UMA_HISTOGRAM_COUNTS_100 macro. A SchedulerWorker is
188       // expected to run between zero and a few tens of tasks between waits.
189       // When it runs more than 100 tasks, there is no need to know the exact
190       // number of tasks that ran.
191       num_tasks_between_waits_histogram_(Histogram::FactoryGet(
192           JoinString({kNumTasksBetweenWaitsHistogramPrefix, histogram_label,
193                       kPoolNameSuffix},
194                      ""),
195           1,
196           100,
197           50,
198           HistogramBase::kUmaTargetedHistogramFlag)),
199       tracked_ref_factory_(this) {
200   DCHECK(!histogram_label.empty());
201   DCHECK(!pool_label_.empty());
202 }
203 
Start(const SchedulerWorkerPoolParams & params,int max_background_tasks,scoped_refptr<TaskRunner> service_thread_task_runner,SchedulerWorkerObserver * scheduler_worker_observer,WorkerEnvironment worker_environment)204 void SchedulerWorkerPoolImpl::Start(
205     const SchedulerWorkerPoolParams& params,
206     int max_background_tasks,
207     scoped_refptr<TaskRunner> service_thread_task_runner,
208     SchedulerWorkerObserver* scheduler_worker_observer,
209     WorkerEnvironment worker_environment) {
210   AutoSchedulerLock auto_lock(lock_);
211 
212   DCHECK(workers_.empty());
213 
214   max_tasks_ = params.max_tasks();
215   DCHECK_GE(max_tasks_, 1U);
216   initial_max_tasks_ = max_tasks_;
217   DCHECK_LE(initial_max_tasks_, kMaxNumberOfWorkers);
218   max_background_tasks_ = max_background_tasks;
219   suggested_reclaim_time_ = params.suggested_reclaim_time();
220   backward_compatibility_ = params.backward_compatibility();
221   worker_environment_ = worker_environment;
222 
223   service_thread_task_runner_ = std::move(service_thread_task_runner);
224 
225   DCHECK(!scheduler_worker_observer_);
226   scheduler_worker_observer_ = scheduler_worker_observer;
227 
228   // The initial number of workers is |num_wake_ups_before_start_| + 1 to try to
229   // keep one at least one standby thread at all times (capacity permitting).
230   const int num_initial_workers =
231       std::min(num_wake_ups_before_start_ + 1, static_cast<int>(max_tasks_));
232   workers_.reserve(num_initial_workers);
233 
234   for (int index = 0; index < num_initial_workers; ++index) {
235     SchedulerWorker* worker =
236         CreateRegisterAndStartSchedulerWorkerLockRequired();
237 
238     // CHECK that the first worker can be started (assume that failure means
239     // that threads can't be created on this machine).
240     CHECK(worker || index > 0);
241 
242     if (worker) {
243       if (index < num_wake_ups_before_start_) {
244         worker->WakeUp();
245       } else {
246         idle_workers_stack_.Push(worker);
247       }
248     }
249   }
250 }
251 
~SchedulerWorkerPoolImpl()252 SchedulerWorkerPoolImpl::~SchedulerWorkerPoolImpl() {
253   // SchedulerWorkerPool should only ever be deleted:
254   //  1) In tests, after JoinForTesting().
255   //  2) In production, iff initialization failed.
256   // In both cases |workers_| should be empty.
257   DCHECK(workers_.empty());
258 }
259 
OnCanScheduleSequence(scoped_refptr<Sequence> sequence)260 void SchedulerWorkerPoolImpl::OnCanScheduleSequence(
261     scoped_refptr<Sequence> sequence) {
262   const auto sequence_sort_key = sequence->GetSortKey();
263   shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
264                                                   sequence_sort_key);
265 
266   WakeUpOneWorker();
267 }
268 
GetHistograms(std::vector<const HistogramBase * > * histograms) const269 void SchedulerWorkerPoolImpl::GetHistograms(
270     std::vector<const HistogramBase*>* histograms) const {
271   histograms->push_back(detach_duration_histogram_);
272   histograms->push_back(num_tasks_between_waits_histogram_);
273 }
274 
GetMaxConcurrentNonBlockedTasksDeprecated() const275 int SchedulerWorkerPoolImpl::GetMaxConcurrentNonBlockedTasksDeprecated() const {
276 #if DCHECK_IS_ON()
277   AutoSchedulerLock auto_lock(lock_);
278   DCHECK_NE(initial_max_tasks_, 0U)
279       << "GetMaxConcurrentTasksDeprecated() should only be called after the "
280       << "worker pool has started.";
281 #endif
282   return initial_max_tasks_;
283 }
284 
WaitForWorkersIdleForTesting(size_t n)285 void SchedulerWorkerPoolImpl::WaitForWorkersIdleForTesting(size_t n) {
286   AutoSchedulerLock auto_lock(lock_);
287 
288 #if DCHECK_IS_ON()
289   DCHECK(!some_workers_cleaned_up_for_testing_)
290       << "Workers detached prior to waiting for a specific number of idle "
291          "workers. Doing the wait under such conditions is flaky. Consider "
292          "using |suggested_reclaim_time_ = TimeDelta::Max()| for this test.";
293 #endif
294 
295   WaitForWorkersIdleLockRequiredForTesting(n);
296 }
297 
WaitForAllWorkersIdleForTesting()298 void SchedulerWorkerPoolImpl::WaitForAllWorkersIdleForTesting() {
299   AutoSchedulerLock auto_lock(lock_);
300   WaitForWorkersIdleLockRequiredForTesting(workers_.size());
301 }
302 
WaitForWorkersCleanedUpForTesting(size_t n)303 void SchedulerWorkerPoolImpl::WaitForWorkersCleanedUpForTesting(size_t n) {
304   AutoSchedulerLock auto_lock(lock_);
305 
306   if (!num_workers_cleaned_up_for_testing_cv_)
307     num_workers_cleaned_up_for_testing_cv_ = lock_.CreateConditionVariable();
308 
309   while (num_workers_cleaned_up_for_testing_ < n)
310     num_workers_cleaned_up_for_testing_cv_->Wait();
311 
312   num_workers_cleaned_up_for_testing_ = 0;
313 }
314 
JoinForTesting()315 void SchedulerWorkerPoolImpl::JoinForTesting() {
316 #if DCHECK_IS_ON()
317   join_for_testing_started_.Set();
318 #endif
319 
320   decltype(workers_) workers_copy;
321   {
322     AutoSchedulerLock auto_lock(lock_);
323 
324     DCHECK_GT(workers_.size(), size_t(0)) << "Joined an unstarted worker pool.";
325 
326     // Ensure SchedulerWorkers in |workers_| do not attempt to cleanup while
327     // being joined.
328     worker_cleanup_disallowed_for_testing_ = true;
329 
330     // Make a copy of the SchedulerWorkers so that we can call
331     // SchedulerWorker::JoinForTesting() without holding |lock_| since
332     // SchedulerWorkers may need to access |workers_|.
333     workers_copy = workers_;
334   }
335   for (const auto& worker : workers_copy)
336     worker->JoinForTesting();
337 
338   AutoSchedulerLock auto_lock(lock_);
339   DCHECK(workers_ == workers_copy);
340   // Release |workers_| to clear their TrackedRef against |this|.
341   workers_.clear();
342 }
343 
NumberOfWorkersForTesting() const344 size_t SchedulerWorkerPoolImpl::NumberOfWorkersForTesting() const {
345   AutoSchedulerLock auto_lock(lock_);
346   return workers_.size();
347 }
348 
GetMaxTasksForTesting() const349 size_t SchedulerWorkerPoolImpl::GetMaxTasksForTesting() const {
350   AutoSchedulerLock auto_lock(lock_);
351   return max_tasks_;
352 }
353 
NumberOfIdleWorkersForTesting() const354 size_t SchedulerWorkerPoolImpl::NumberOfIdleWorkersForTesting() const {
355   AutoSchedulerLock auto_lock(lock_);
356   return idle_workers_stack_.Size();
357 }
358 
MaximizeMayBlockThresholdForTesting()359 void SchedulerWorkerPoolImpl::MaximizeMayBlockThresholdForTesting() {
360   maximum_blocked_threshold_for_testing_.Set();
361 }
362 
363 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
SchedulerWorkerDelegateImpl(TrackedRef<SchedulerWorkerPoolImpl> outer)364     SchedulerWorkerDelegateImpl(TrackedRef<SchedulerWorkerPoolImpl> outer)
365     : outer_(std::move(outer)) {
366   // Bound in OnMainEntry().
367   DETACH_FROM_THREAD(worker_thread_checker_);
368 }
369 
370 // OnMainExit() handles the thread-affine cleanup; SchedulerWorkerDelegateImpl
371 // can thereafter safely be deleted from any thread.
372 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
373     ~SchedulerWorkerDelegateImpl() = default;
374 
375 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
OnCanScheduleSequence(scoped_refptr<Sequence> sequence)376     OnCanScheduleSequence(scoped_refptr<Sequence> sequence) {
377   outer_->OnCanScheduleSequence(std::move(sequence));
378 }
379 
380 SchedulerWorker::ThreadLabel
GetThreadLabel() const381 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetThreadLabel() const {
382   return SchedulerWorker::ThreadLabel::POOLED;
383 }
384 
OnMainEntry(const SchedulerWorker * worker)385 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainEntry(
386     const SchedulerWorker* worker) {
387   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
388 
389   {
390 #if DCHECK_IS_ON()
391     AutoSchedulerLock auto_lock(outer_->lock_);
392     DCHECK(ContainsWorker(outer_->workers_, worker));
393 #endif
394   }
395 
396 #if defined(OS_WIN)
397   if (outer_->worker_environment_ == WorkerEnvironment::COM_MTA) {
398     if (win::GetVersion() >= win::VERSION_WIN8) {
399       win_thread_environment_ = std::make_unique<win::ScopedWinrtInitializer>();
400     } else {
401       win_thread_environment_ = std::make_unique<win::ScopedCOMInitializer>(
402           win::ScopedCOMInitializer::kMTA);
403     }
404     DCHECK(win_thread_environment_->Succeeded());
405   }
406 #endif  // defined(OS_WIN)
407 
408   DCHECK_EQ(num_tasks_since_last_wait_, 0U);
409 
410   PlatformThread::SetName(
411       StringPrintf("TaskScheduler%sWorker", outer_->pool_label_.c_str()));
412 
413   outer_->BindToCurrentThread();
414   SetBlockingObserverForCurrentThread(this);
415 }
416 
417 scoped_refptr<Sequence>
GetWork(SchedulerWorker * worker)418 SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::GetWork(
419     SchedulerWorker* worker) {
420   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
421   DCHECK(!is_running_task_);
422   DCHECK(!is_running_background_task_);
423 
424   {
425     AutoSchedulerLock auto_lock(outer_->lock_);
426 
427     DCHECK(ContainsWorker(outer_->workers_, worker));
428 
429     // Calling GetWork() while on the idle worker stack indicates that we
430     // must've reached GetWork() because of the WaitableEvent timing out. In
431     // which case, we return no work and possibly cleanup the worker. To avoid
432     // searching through the idle stack : use GetLastUsedTime() not being null
433     // (or being directly on top of the idle stack) as a proxy for being on the
434     // idle stack.
435     const bool is_on_idle_workers_stack =
436         outer_->idle_workers_stack_.Peek() == worker ||
437         !worker->GetLastUsedTime().is_null();
438     DCHECK_EQ(is_on_idle_workers_stack,
439               outer_->idle_workers_stack_.Contains(worker));
440     if (is_on_idle_workers_stack) {
441       if (CanCleanupLockRequired(worker))
442         CleanupLockRequired(worker);
443       return nullptr;
444     }
445 
446     // Excess workers should not get work, until they are no longer excess (i.e.
447     // max tasks increases or another worker cleans up). This ensures that if we
448     // have excess workers in the pool, they get a chance to no longer be excess
449     // before being cleaned up.
450     if (outer_->NumberOfExcessWorkersLockRequired() >
451         outer_->idle_workers_stack_.Size()) {
452       OnWorkerBecomesIdleLockRequired(worker);
453       return nullptr;
454     }
455   }
456   scoped_refptr<Sequence> sequence;
457   {
458     std::unique_ptr<PriorityQueue::Transaction> transaction(
459         outer_->shared_priority_queue_.BeginTransaction());
460 
461     if (transaction->IsEmpty()) {
462       // |transaction| is kept alive while |worker| is added to
463       // |idle_workers_stack_| to avoid this race:
464       // 1. This thread creates a Transaction, finds |shared_priority_queue_|
465       //    empty and ends the Transaction.
466       // 2. Other thread creates a Transaction, inserts a Sequence into
467       //    |shared_priority_queue_| and ends the Transaction. This can't happen
468       //    if the Transaction of step 1 is still active because because there
469       //    can only be one active Transaction per PriorityQueue at a time.
470       // 3. Other thread calls WakeUpOneWorker(). No thread is woken up because
471       //    |idle_workers_stack_| is empty.
472       // 4. This thread adds itself to |idle_workers_stack_| and goes to sleep.
473       //    No thread runs the Sequence inserted in step 2.
474       AutoSchedulerLock auto_lock(outer_->lock_);
475       OnWorkerBecomesIdleLockRequired(worker);
476       return nullptr;
477     }
478 
479     // Enforce that no more than |max_background_tasks_| run concurrently.
480     const TaskPriority priority = transaction->PeekSortKey().priority();
481     if (priority == TaskPriority::BACKGROUND) {
482       AutoSchedulerLock auto_lock(outer_->lock_);
483       if (outer_->num_running_background_tasks_ <
484           outer_->max_background_tasks_) {
485         ++outer_->num_running_background_tasks_;
486         is_running_background_task_ = true;
487       } else {
488         OnWorkerBecomesIdleLockRequired(worker);
489         return nullptr;
490       }
491     }
492 
493     sequence = transaction->PopSequence();
494   }
495   DCHECK(sequence);
496 #if DCHECK_IS_ON()
497   {
498     AutoSchedulerLock auto_lock(outer_->lock_);
499     DCHECK(!outer_->idle_workers_stack_.Contains(worker));
500   }
501 #endif
502 
503   is_running_task_ = true;
504   return sequence;
505 }
506 
DidRunTask()507 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::DidRunTask() {
508   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
509   DCHECK(may_block_start_time_.is_null());
510   DCHECK(!incremented_max_tasks_since_blocked_);
511   DCHECK(is_running_task_);
512 
513   is_running_task_ = false;
514 
515   if (is_running_background_task_) {
516     AutoSchedulerLock auto_lock(outer_->lock_);
517     --outer_->num_running_background_tasks_;
518     is_running_background_task_ = false;
519   }
520 
521   ++num_tasks_since_last_wait_;
522   ++num_tasks_since_last_detach_;
523 }
524 
525 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
ReEnqueueSequence(scoped_refptr<Sequence> sequence)526     ReEnqueueSequence(scoped_refptr<Sequence> sequence) {
527   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
528 
529   const SequenceSortKey sequence_sort_key = sequence->GetSortKey();
530   outer_->shared_priority_queue_.BeginTransaction()->Push(std::move(sequence),
531                                                           sequence_sort_key);
532   // This worker will soon call GetWork(). Therefore, there is no need to wake
533   // up a worker to run the sequence that was just inserted into
534   // |outer_->shared_priority_queue_|.
535 }
536 
537 TimeDelta SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
GetSleepTimeout()538     GetSleepTimeout() {
539   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
540   // Sleep for an extra 10% to avoid the following pathological case:
541   //   0) A task is running on a timer which matches |suggested_reclaim_time_|.
542   //   1) The timer fires and this worker is created by
543   //      MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
544   //      worker was assigned the task.
545   //   2) This worker begins sleeping |suggested_reclaim_time_| (on top of the
546   //      idle stack).
547   //   3) The task assigned to the other worker completes and the worker goes
548   //      back on the idle stack (this worker is now second on the idle stack;
549   //      its GetLastUsedTime() is set to Now()).
550   //   4) The sleep in (2) expires. Since (3) was fast this worker is likely to
551   //      have been second on the idle stack long enough for
552   //      CanCleanupLockRequired() to be satisfied in which case this worker is
553   //      cleaned up.
554   //   5) The timer fires at roughly the same time and we're back to (1) if (4)
555   //      resulted in a clean up; causing thread churn.
556   //
557   //   Sleeping 10% longer in (2) makes it much less likely that (4) occurs
558   //   before (5). In that case (5) will cause (3) and refresh this worker's
559   //   GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
560   //   and avoiding churn.
561   //
562   //   Of course the same problem arises if in (0) the timer matches
563   //   |suggested_reclaim_time_ * 1.1| but it's expected that any timer slower
564   //   than |suggested_reclaim_time_| will cause such churn during long idle
565   //   periods. If this is a problem in practice, the standby thread
566   //   configuration and algorithm should be revisited.
567   return outer_->suggested_reclaim_time_ * 1.1;
568 }
569 
570 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
CanCleanupLockRequired(const SchedulerWorker * worker) const571     CanCleanupLockRequired(const SchedulerWorker* worker) const {
572   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
573 
574   const TimeTicks last_used_time = worker->GetLastUsedTime();
575   return !last_used_time.is_null() &&
576          TimeTicks::Now() - last_used_time >= outer_->suggested_reclaim_time_ &&
577          LIKELY(!outer_->worker_cleanup_disallowed_for_testing_);
578 }
579 
CleanupLockRequired(SchedulerWorker * worker)580 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::CleanupLockRequired(
581     SchedulerWorker* worker) {
582   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
583 
584   outer_->lock_.AssertAcquired();
585   outer_->num_tasks_before_detach_histogram_->Add(num_tasks_since_last_detach_);
586   outer_->cleanup_timestamps_.push(TimeTicks::Now());
587   worker->Cleanup();
588   outer_->RemoveFromIdleWorkersStackLockRequired(worker);
589 
590   // Remove the worker from |workers_|.
591   auto worker_iter =
592       std::find(outer_->workers_.begin(), outer_->workers_.end(), worker);
593   DCHECK(worker_iter != outer_->workers_.end());
594   outer_->workers_.erase(worker_iter);
595 
596   ++outer_->num_workers_cleaned_up_for_testing_;
597 #if DCHECK_IS_ON()
598   outer_->some_workers_cleaned_up_for_testing_ = true;
599 #endif
600   if (outer_->num_workers_cleaned_up_for_testing_cv_)
601     outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
602 }
603 
604 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
OnWorkerBecomesIdleLockRequired(SchedulerWorker * worker)605     OnWorkerBecomesIdleLockRequired(SchedulerWorker* worker) {
606   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
607 
608   outer_->lock_.AssertAcquired();
609   // Record the TaskScheduler.NumTasksBetweenWaits histogram. After GetWork()
610   // returns nullptr, the SchedulerWorker will perform a wait on its
611   // WaitableEvent, so we record how many tasks were ran since the last wait
612   // here.
613   outer_->num_tasks_between_waits_histogram_->Add(num_tasks_since_last_wait_);
614   num_tasks_since_last_wait_ = 0;
615   outer_->AddToIdleWorkersStackLockRequired(worker);
616 }
617 
OnMainExit(SchedulerWorker * worker)618 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::OnMainExit(
619     SchedulerWorker* worker) {
620   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
621 
622 #if DCHECK_IS_ON()
623   {
624     bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
625     AutoSchedulerLock auto_lock(outer_->lock_);
626 
627     // |worker| should already have been removed from the idle workers stack and
628     // |workers_| by the time the thread is about to exit. (except in the cases
629     // where the pool is no longer going to be used - in which case, it's fine
630     // for there to be invalid workers in the pool.
631     if (!shutdown_complete && !outer_->join_for_testing_started_.IsSet()) {
632       DCHECK(!outer_->idle_workers_stack_.Contains(worker));
633       DCHECK(!ContainsWorker(outer_->workers_, worker));
634     }
635   }
636 #endif
637 
638 #if defined(OS_WIN)
639   win_thread_environment_.reset();
640 #endif  // defined(OS_WIN)
641 }
642 
BlockingStarted(BlockingType blocking_type)643 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::BlockingStarted(
644     BlockingType blocking_type) {
645   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
646 
647   // Blocking calls made outside of tasks should not influence the max tasks.
648   if (!is_running_task_)
649     return;
650 
651   switch (blocking_type) {
652     case BlockingType::MAY_BLOCK:
653       MayBlockEntered();
654       break;
655     case BlockingType::WILL_BLOCK:
656       WillBlockEntered();
657       break;
658   }
659 }
660 
661 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
BlockingTypeUpgraded()662     BlockingTypeUpgraded() {
663   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
664 
665   {
666     AutoSchedulerLock auto_lock(outer_->lock_);
667 
668     // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
669     // same scope already caused the max tasks to be incremented.
670     if (incremented_max_tasks_since_blocked_)
671       return;
672 
673     // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
674     // same scope.
675     if (!may_block_start_time_.is_null()) {
676       may_block_start_time_ = TimeTicks();
677       --outer_->num_pending_may_block_workers_;
678       if (is_running_background_task_)
679         --outer_->num_pending_background_may_block_workers_;
680     }
681   }
682 
683   WillBlockEntered();
684 }
685 
BlockingEnded()686 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::BlockingEnded() {
687   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
688 
689   // Ignore blocking calls made outside of tasks.
690   if (!is_running_task_)
691     return;
692 
693   AutoSchedulerLock auto_lock(outer_->lock_);
694   if (incremented_max_tasks_since_blocked_) {
695     outer_->DecrementMaxTasksLockRequired(is_running_background_task_);
696   } else {
697     DCHECK(!may_block_start_time_.is_null());
698     --outer_->num_pending_may_block_workers_;
699     if (is_running_background_task_)
700       --outer_->num_pending_background_may_block_workers_;
701   }
702 
703   incremented_max_tasks_since_blocked_ = false;
704   may_block_start_time_ = TimeTicks();
705 }
706 
MayBlockEntered()707 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::MayBlockEntered() {
708   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
709 
710   {
711     AutoSchedulerLock auto_lock(outer_->lock_);
712 
713     DCHECK(!incremented_max_tasks_since_blocked_);
714     DCHECK(may_block_start_time_.is_null());
715     may_block_start_time_ = TimeTicks::Now();
716     ++outer_->num_pending_may_block_workers_;
717     if (is_running_background_task_)
718       ++outer_->num_pending_background_may_block_workers_;
719   }
720   outer_->ScheduleAdjustMaxTasksIfNeeded();
721 }
722 
WillBlockEntered()723 void SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::WillBlockEntered() {
724   DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
725 
726   bool wake_up_allowed = false;
727   {
728     std::unique_ptr<PriorityQueue::Transaction> transaction(
729         outer_->shared_priority_queue_.BeginTransaction());
730     AutoSchedulerLock auto_lock(outer_->lock_);
731 
732     DCHECK(!incremented_max_tasks_since_blocked_);
733     DCHECK(may_block_start_time_.is_null());
734     incremented_max_tasks_since_blocked_ = true;
735     outer_->IncrementMaxTasksLockRequired(is_running_background_task_);
736 
737     // If the number of workers was less than the old max tasks, PostTask
738     // would've handled creating extra workers during WakeUpOneWorker.
739     // Therefore, we don't need to do anything here.
740     if (outer_->workers_.size() < outer_->max_tasks_ - 1)
741       return;
742 
743     if (transaction->IsEmpty()) {
744       outer_->MaintainAtLeastOneIdleWorkerLockRequired();
745     } else {
746       // TODO(crbug.com/757897): We may create extra workers in this case:
747       // |workers.size()| was equal to the old |max_tasks_|, we had multiple
748       // ScopedBlockingCalls in parallel and we had work on the PQ.
749       wake_up_allowed = outer_->WakeUpOneWorkerLockRequired();
750       // |wake_up_allowed| is true when the pool is started, and a WILL_BLOCK
751       // scope cannot be entered before the pool starts.
752       DCHECK(wake_up_allowed);
753     }
754   }
755   // TODO(crbug.com/813857): This can be better handled in the PostTask()
756   // codepath. We really only should do this if there are tasks pending.
757   if (wake_up_allowed)
758     outer_->ScheduleAdjustMaxTasksIfNeeded();
759 }
760 
761 bool SchedulerWorkerPoolImpl::SchedulerWorkerDelegateImpl::
MustIncrementMaxTasksLockRequired()762     MustIncrementMaxTasksLockRequired() {
763   outer_->lock_.AssertAcquired();
764 
765   if (!incremented_max_tasks_since_blocked_ &&
766       !may_block_start_time_.is_null() &&
767       TimeTicks::Now() - may_block_start_time_ >= outer_->MayBlockThreshold()) {
768     incremented_max_tasks_since_blocked_ = true;
769 
770     // Reset |may_block_start_time_| so that BlockingScopeExited() knows that it
771     // doesn't have to decrement the number of pending MAY_BLOCK workers.
772     may_block_start_time_ = TimeTicks();
773     --outer_->num_pending_may_block_workers_;
774     if (is_running_background_task_)
775       --outer_->num_pending_background_may_block_workers_;
776 
777     return true;
778   }
779 
780   return false;
781 }
782 
WaitForWorkersIdleLockRequiredForTesting(size_t n)783 void SchedulerWorkerPoolImpl::WaitForWorkersIdleLockRequiredForTesting(
784     size_t n) {
785   lock_.AssertAcquired();
786 
787   // Make sure workers do not cleanup while watching the idle count.
788   AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);
789 
790   while (idle_workers_stack_.Size() < n)
791     idle_workers_stack_cv_for_testing_->Wait();
792 }
793 
WakeUpOneWorkerLockRequired()794 bool SchedulerWorkerPoolImpl::WakeUpOneWorkerLockRequired() {
795   lock_.AssertAcquired();
796 
797   if (workers_.empty()) {
798     ++num_wake_ups_before_start_;
799     return false;
800   }
801 
802   // Ensure that there is one worker that can run tasks on top of the idle
803   // stack, capacity permitting.
804   MaintainAtLeastOneIdleWorkerLockRequired();
805 
806   // If the worker on top of the idle stack can run tasks, wake it up.
807   if (NumberOfExcessWorkersLockRequired() < idle_workers_stack_.Size()) {
808     SchedulerWorker* worker = idle_workers_stack_.Pop();
809     if (worker) {
810       worker->WakeUp();
811     }
812   }
813 
814   // Ensure that there is one worker that can run tasks on top of the idle
815   // stack, capacity permitting.
816   MaintainAtLeastOneIdleWorkerLockRequired();
817 
818   return true;
819 }
820 
WakeUpOneWorker()821 void SchedulerWorkerPoolImpl::WakeUpOneWorker() {
822   bool wake_up_allowed;
823   {
824     AutoSchedulerLock auto_lock(lock_);
825     wake_up_allowed = WakeUpOneWorkerLockRequired();
826   }
827   if (wake_up_allowed)
828     ScheduleAdjustMaxTasksIfNeeded();
829 }
830 
MaintainAtLeastOneIdleWorkerLockRequired()831 void SchedulerWorkerPoolImpl::MaintainAtLeastOneIdleWorkerLockRequired() {
832   lock_.AssertAcquired();
833 
834   if (workers_.size() == kMaxNumberOfWorkers)
835     return;
836   DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
837 
838   if (idle_workers_stack_.IsEmpty() && workers_.size() < max_tasks_) {
839     SchedulerWorker* new_worker =
840         CreateRegisterAndStartSchedulerWorkerLockRequired();
841     if (new_worker)
842       idle_workers_stack_.Push(new_worker);
843   }
844 }
845 
AddToIdleWorkersStackLockRequired(SchedulerWorker * worker)846 void SchedulerWorkerPoolImpl::AddToIdleWorkersStackLockRequired(
847     SchedulerWorker* worker) {
848   lock_.AssertAcquired();
849 
850   DCHECK(!idle_workers_stack_.Contains(worker));
851   idle_workers_stack_.Push(worker);
852 
853   DCHECK_LE(idle_workers_stack_.Size(), workers_.size());
854 
855   idle_workers_stack_cv_for_testing_->Broadcast();
856 }
857 
RemoveFromIdleWorkersStackLockRequired(SchedulerWorker * worker)858 void SchedulerWorkerPoolImpl::RemoveFromIdleWorkersStackLockRequired(
859     SchedulerWorker* worker) {
860   lock_.AssertAcquired();
861   idle_workers_stack_.Remove(worker);
862 }
863 
864 SchedulerWorker*
CreateRegisterAndStartSchedulerWorkerLockRequired()865 SchedulerWorkerPoolImpl::CreateRegisterAndStartSchedulerWorkerLockRequired() {
866   lock_.AssertAcquired();
867 
868   DCHECK_LT(workers_.size(), max_tasks_);
869   DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
870   // SchedulerWorker needs |lock_| as a predecessor for its thread lock
871   // because in WakeUpOneWorker, |lock_| is first acquired and then
872   // the thread lock is acquired when WakeUp is called on the worker.
873   scoped_refptr<SchedulerWorker> worker = MakeRefCounted<SchedulerWorker>(
874       priority_hint_,
875       std::make_unique<SchedulerWorkerDelegateImpl>(
876           tracked_ref_factory_.GetTrackedRef()),
877       task_tracker_, &lock_, backward_compatibility_);
878 
879   if (!worker->Start(scheduler_worker_observer_))
880     return nullptr;
881 
882   workers_.push_back(worker);
883   DCHECK_LE(workers_.size(), max_tasks_);
884 
885   if (!cleanup_timestamps_.empty()) {
886     detach_duration_histogram_->AddTime(TimeTicks::Now() -
887                                         cleanup_timestamps_.top());
888     cleanup_timestamps_.pop();
889   }
890   return worker.get();
891 }
892 
NumberOfExcessWorkersLockRequired() const893 size_t SchedulerWorkerPoolImpl::NumberOfExcessWorkersLockRequired() const {
894   lock_.AssertAcquired();
895   return std::max<int>(0, workers_.size() - max_tasks_);
896 }
897 
AdjustMaxTasks()898 void SchedulerWorkerPoolImpl::AdjustMaxTasks() {
899   DCHECK(service_thread_task_runner_->RunsTasksInCurrentSequence());
900 
901   std::unique_ptr<PriorityQueue::Transaction> transaction(
902       shared_priority_queue_.BeginTransaction());
903   AutoSchedulerLock auto_lock(lock_);
904 
905   const size_t previous_max_tasks = max_tasks_;
906 
907   // Increment max tasks for each worker that has been within a MAY_BLOCK
908   // ScopedBlockingCall for more than MayBlockThreshold().
909   for (scoped_refptr<SchedulerWorker> worker : workers_) {
910     // The delegates of workers inside a SchedulerWorkerPoolImpl should be
911     // SchedulerWorkerDelegateImpls.
912     SchedulerWorkerDelegateImpl* delegate =
913         static_cast<SchedulerWorkerDelegateImpl*>(worker->delegate());
914     if (delegate->MustIncrementMaxTasksLockRequired()) {
915       IncrementMaxTasksLockRequired(
916           delegate->is_running_background_task_lock_required());
917     }
918   }
919 
920   // Wake up a worker per pending sequence, capacity permitting.
921   const size_t num_pending_sequences = transaction->Size();
922   const size_t num_wake_ups_needed =
923       std::min(max_tasks_ - previous_max_tasks, num_pending_sequences);
924 
925   for (size_t i = 0; i < num_wake_ups_needed; ++i) {
926     // No need to call ScheduleAdjustMaxTasksIfNeeded() as the caller will
927     // take care of that for us.
928     WakeUpOneWorkerLockRequired();
929   }
930 
931   MaintainAtLeastOneIdleWorkerLockRequired();
932 }
933 
MayBlockThreshold() const934 TimeDelta SchedulerWorkerPoolImpl::MayBlockThreshold() const {
935   if (maximum_blocked_threshold_for_testing_.IsSet())
936     return TimeDelta::Max();
937   // This value was set unscientifically based on intuition and may be adjusted
938   // in the future. This value is smaller than |kBlockedWorkersPollPeriod|
939   // because we hope than when multiple workers block around the same time, a
940   // single AdjustMaxTasks() call will perform all the necessary max tasks
941   // adjustments.
942   return TimeDelta::FromMilliseconds(10);
943 }
944 
ScheduleAdjustMaxTasksIfNeeded()945 void SchedulerWorkerPoolImpl::ScheduleAdjustMaxTasksIfNeeded() {
946   {
947     AutoSchedulerLock auto_lock(lock_);
948     if (polling_max_tasks_ || !ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
949       return;
950     }
951     polling_max_tasks_ = true;
952   }
953   service_thread_task_runner_->PostDelayedTask(
954       FROM_HERE,
955       BindOnce(&SchedulerWorkerPoolImpl::AdjustMaxTasksFunction,
956                Unretained(this)),
957       kBlockedWorkersPollPeriod);
958 }
959 
AdjustMaxTasksFunction()960 void SchedulerWorkerPoolImpl::AdjustMaxTasksFunction() {
961   DCHECK(service_thread_task_runner_->RunsTasksInCurrentSequence());
962 
963   AdjustMaxTasks();
964   {
965     AutoSchedulerLock auto_lock(lock_);
966     DCHECK(polling_max_tasks_);
967 
968     if (!ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
969       polling_max_tasks_ = false;
970       return;
971     }
972   }
973   service_thread_task_runner_->PostDelayedTask(
974       FROM_HERE,
975       BindOnce(&SchedulerWorkerPoolImpl::AdjustMaxTasksFunction,
976                Unretained(this)),
977       kBlockedWorkersPollPeriod);
978 }
979 
ShouldPeriodicallyAdjustMaxTasksLockRequired()980 bool SchedulerWorkerPoolImpl::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
981   lock_.AssertAcquired();
982 
983   // The maximum number of background tasks that can run concurrently must be
984   // adjusted periodically when (1) the number of background tasks that are
985   // currently running is equal to it and (2) there are workers running
986   // background tasks within the scope of a MAY_BLOCK ScopedBlockingCall but
987   // haven't cause a max background tasks increment yet.
988   // - When (1) is false: A newly posted background task will be allowed to run
989   //   normally. There is no hurry to increase max background tasks.
990   // - When (2) is false: AdjustMaxTasks() wouldn't affect
991   //   |max_background_tasks_|.
992   if (num_running_background_tasks_ >= max_background_tasks_ &&
993       num_pending_background_may_block_workers_ > 0) {
994     return true;
995   }
996 
997   // The maximum number of tasks that can run concurrently must be adjusted
998   // periodically when (1) there are no idle workers that can do work (2) there
999   // are workers that are within the scope of a MAY_BLOCK ScopedBlockingCall but
1000   // haven't cause a max tasks increment yet.
1001   // - When (1) is false: A newly posted task will run on one of the idle
1002   //   workers that are allowed to do work. There is no hurry to increase max
1003   //   tasks.
1004   // - When (2) is false: AdjustMaxTasks() wouldn't affect |max_tasks_|.
1005   const int idle_workers_that_can_do_work =
1006       idle_workers_stack_.Size() - NumberOfExcessWorkersLockRequired();
1007   return idle_workers_that_can_do_work <= 0 &&
1008          num_pending_may_block_workers_ > 0;
1009 }
1010 
DecrementMaxTasksLockRequired(bool is_running_background_task)1011 void SchedulerWorkerPoolImpl::DecrementMaxTasksLockRequired(
1012     bool is_running_background_task) {
1013   lock_.AssertAcquired();
1014   --max_tasks_;
1015   if (is_running_background_task)
1016     --max_background_tasks_;
1017 }
1018 
IncrementMaxTasksLockRequired(bool is_running_background_task)1019 void SchedulerWorkerPoolImpl::IncrementMaxTasksLockRequired(
1020     bool is_running_background_task) {
1021   lock_.AssertAcquired();
1022   ++max_tasks_;
1023   if (is_running_background_task)
1024     ++max_background_tasks_;
1025 }
1026 
1027 }  // namespace internal
1028 }  // namespace base
1029