1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_
6 #define BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_
7 
8 #include <stddef.h>
9 
10 #include <memory>
11 #include <string>
12 #include <vector>
13 
14 #include "base/base_export.h"
15 #include "base/containers/stack.h"
16 #include "base/logging.h"
17 #include "base/macros.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/strings/string_piece.h"
20 #include "base/synchronization/atomic_flag.h"
21 #include "base/synchronization/condition_variable.h"
22 #include "base/synchronization/waitable_event.h"
23 #include "base/task_runner.h"
24 #include "base/task_scheduler/priority_queue.h"
25 #include "base/task_scheduler/scheduler_lock.h"
26 #include "base/task_scheduler/scheduler_worker.h"
27 #include "base/task_scheduler/scheduler_worker_pool.h"
28 #include "base/task_scheduler/scheduler_worker_stack.h"
29 #include "base/task_scheduler/sequence.h"
30 #include "base/task_scheduler/task.h"
31 #include "base/task_scheduler/tracked_ref.h"
32 #include "base/time/time.h"
33 #include "build/build_config.h"
34 
35 namespace base {
36 
37 class HistogramBase;
38 class SchedulerWorkerObserver;
39 class SchedulerWorkerPoolParams;
40 
41 namespace internal {
42 
43 class DelayedTaskManager;
44 class TaskTracker;
45 
46 // A pool of workers that run Tasks.
47 //
48 // The pool doesn't create threads until Start() is called. Tasks can be posted
49 // at any time but will not run until after Start() is called.
50 //
51 // This class is thread-safe.
52 class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
53  public:
     // Per-worker environment that can be requested through Start().
54   enum class WorkerEnvironment {
55     // No special worker environment required.
56     NONE,
57 #if defined(OS_WIN)
58     // Initialize a COM MTA on the worker.
59     COM_MTA,
60 #endif  // defined(OS_WIN)
61   };
62 
63   // Constructs a pool without workers.
64   //
65   // |histogram_label| is used to label the pool's histograms ("TaskScheduler."
66   // + histogram_name + "." + |histogram_label| + extra suffixes), it must not
67   // be empty. |pool_label| is used to label the pool's threads, it must not be
68   // empty. |priority_hint| is the preferred thread priority; the actual thread
69   // priority depends on shutdown state and platform capabilities.
70   // |task_tracker| keeps track of tasks. |delayed_task_manager| handles tasks
71   // posted with a delay.
72   SchedulerWorkerPoolImpl(StringPiece histogram_label,
73                           StringPiece pool_label,
74                           ThreadPriority priority_hint,
75                           TrackedRef<TaskTracker> task_tracker,
76                           DelayedTaskManager* delayed_task_manager);
77 
78   // Creates workers following the |params| specification, allowing existing and
79   // future tasks to run. The pool will run at most |max_background_tasks|
80   // unblocked TaskPriority::BACKGROUND tasks concurrently. Uses
81   // |service_thread_task_runner| to monitor for blocked threads in the pool. If
82   // specified, |scheduler_worker_observer| will be notified when a worker
83   // enters and exits its main function. It must not be destroyed before
84   // JoinForTesting() has returned (must never be destroyed in production).
85   // |worker_environment| specifies any requested environment to execute the
86   // tasks. Can only be called once. CHECKs on failure.
87   void Start(const SchedulerWorkerPoolParams& params,
88              int max_background_tasks,
89              scoped_refptr<TaskRunner> service_thread_task_runner,
90              SchedulerWorkerObserver* scheduler_worker_observer,
91              WorkerEnvironment worker_environment);
92 
93   // Destroying a SchedulerWorkerPoolImpl returned by Create() is not allowed in
94   // production; it is always leaked. In tests, it can only be destroyed after
95   // JoinForTesting() has returned.
     // NOTE(review): no Create() is declared in this class; the reference above
     // may be stale -- confirm.
96   ~SchedulerWorkerPoolImpl() override;
97 
98   // SchedulerWorkerPool:
99   void JoinForTesting() override;
100 
      // Returns the TaskScheduler.NumTasksBeforeDetach.[worker pool name]
      // histogram.
num_tasks_before_detach_histogram()101   const HistogramBase* num_tasks_before_detach_histogram() const {
102     return num_tasks_before_detach_histogram_;
103   }
104 
      // Returns the TaskScheduler.NumTasksBetweenWaits.[worker pool name]
      // histogram.
num_tasks_between_waits_histogram()105   const HistogramBase* num_tasks_between_waits_histogram() const {
106     return num_tasks_between_waits_histogram_;
107   }
108 
      // Presumably outputs this pool's histograms through |histograms|; the
      // implementation is not in this header -- confirm in the .cc file.
109   void GetHistograms(std::vector<const HistogramBase*>* histograms) const;
110 
111   // Returns the maximum number of non-blocked tasks that can run concurrently
112   // in this pool.
113   //
114   // TODO(fdoray): Remove this method. https://crbug.com/687264
115   int GetMaxConcurrentNonBlockedTasksDeprecated() const;
116 
117   // Waits until at least |n| workers are idle. Workers are disallowed from
118   // cleaning up during this call. However, tests using a custom
119   // |suggested_reclaim_time_| must invoke this swiftly after unblocking the
120   // waited-upon workers: if a worker is already detached by the time this is
121   // invoked, it will never make it onto the idle stack and this call will
122   // hang.
123   void WaitForWorkersIdleForTesting(size_t n);
124 
125   // Waits until all workers are idle.
126   void WaitForAllWorkersIdleForTesting();
127 
128   // Waits until |n| workers have cleaned up (since the last call to
129   // WaitForWorkersCleanedUpForTesting() or Start() if it wasn't called yet).
130   void WaitForWorkersCleanedUpForTesting(size_t n);
131 
132   // Returns the number of workers in this worker pool.
133   size_t NumberOfWorkersForTesting() const;
134 
135   // Returns |max_tasks_|.
136   size_t GetMaxTasksForTesting() const;
137 
138   // Returns the number of workers that are idle (i.e. not running tasks).
139   size_t NumberOfIdleWorkersForTesting() const;
140 
141   // Sets the MayBlock waiting threshold to TimeDelta::Max().
142   void MaximizeMayBlockThresholdForTesting();
143 
144  private:
      // Nested delegate class; only forward-declared in this header.
145   class SchedulerWorkerDelegateImpl;
146 
147   // Friend tests so that they can access |kBlockedWorkersPollPeriod| and
148   // MayBlockThreshold().
149   friend class TaskSchedulerWorkerPoolBlockingTest;
150   friend class TaskSchedulerWorkerPoolMayBlockTest;
151 
152   // The period between calls to AdjustMaxTasks() when the pool is at capacity.
153   // This value was set unscientifically based on intuition and may be adjusted
154   // in the future.
155   static constexpr TimeDelta kBlockedWorkersPollPeriod =
156       TimeDelta::FromMilliseconds(50);
157 
158   // SchedulerWorkerPool:
159   void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override;
160 
161   // Waits until at least |n| workers are idle. |lock_| must be held to call
162   // this function.
163   void WaitForWorkersIdleLockRequiredForTesting(size_t n);
164 
165   // Wakes up the last worker from this worker pool to go idle, if any.
166   void WakeUpOneWorker();
167 
168   // Performs the same action as WakeUpOneWorker() except asserts |lock_| is
169   // acquired rather than acquires it and returns true if worker wakeups are
170   // permitted.
171   bool WakeUpOneWorkerLockRequired();
172 
173   // Adds a worker, if needed, to maintain one idle worker, |max_tasks_|
174   // permitting.
175   void MaintainAtLeastOneIdleWorkerLockRequired();
176 
177   // Adds |worker| to |idle_workers_stack_|.
178   void AddToIdleWorkersStackLockRequired(SchedulerWorker* worker);
179 
180   // Removes |worker| from |idle_workers_stack_|.
181   void RemoveFromIdleWorkersStackLockRequired(SchedulerWorker* worker);
182 
183   // Returns true if worker cleanup is permitted.
184   bool CanWorkerCleanupForTestingLockRequired();
185 
186   // Tries to add a new SchedulerWorker to the pool. Returns the new
187   // SchedulerWorker on success, nullptr otherwise. Cannot be called before
188   // Start(). Must be called under the protection of |lock_|.
189   SchedulerWorker* CreateRegisterAndStartSchedulerWorkerLockRequired();
190 
191   // Returns the number of workers in the pool that should not run tasks due to
192   // the pool being over capacity.
193   size_t NumberOfExcessWorkersLockRequired() const;
194 
195   // Examines the list of SchedulerWorkers and increments |max_tasks_| for each
196   // worker that has been within the scope of a MAY_BLOCK ScopedBlockingCall for
197   // more than MayBlockThreshold().
198   void AdjustMaxTasks();
199 
200   // Returns the threshold after which the max tasks is increased to compensate
201   // for a worker that is within a MAY_BLOCK ScopedBlockingCall.
202   TimeDelta MayBlockThreshold() const;
203 
204   // Starts calling AdjustMaxTasks() periodically on
205   // |service_thread_task_runner_| if not already requested.
206   void ScheduleAdjustMaxTasksIfNeeded();
207 
208   // Calls AdjustMaxTasks() and schedules it again as necessary. May only be
209   // called from the service thread.
210   void AdjustMaxTasksFunction();
211 
212   // Returns true if AdjustMaxTasks() should periodically be called on
213   // |service_thread_task_runner_|.
214   bool ShouldPeriodicallyAdjustMaxTasksLockRequired();
215 
216   // Increments/decrements the number of tasks that can run in this pool.
217   // |is_running_background_task| indicates whether the worker causing the
218   // change is currently running a TaskPriority::BACKGROUND task.
219   void DecrementMaxTasksLockRequired(bool is_running_background_task);
220   void IncrementMaxTasksLockRequired(bool is_running_background_task);
221 
      // Presumably the constructor's |pool_label| (used to label the pool's
      // threads) and |priority_hint| (preferred thread priority) -- confirm in
      // the .cc file.
222   const std::string pool_label_;
223   const ThreadPriority priority_hint_;
224 
225   // PriorityQueue from which all threads of this worker pool get work.
226   PriorityQueue shared_priority_queue_;
227 
228   // Suggested reclaim time for workers. Initialized by Start(). Never modified
229   // afterwards (i.e. can be read without synchronization after Start()).
230   TimeDelta suggested_reclaim_time_;
231 
      // NOTE(review): has no in-class initializer; presumably set in Start()
      // from |params| -- confirm in the .cc file.
232   SchedulerBackwardCompatibility backward_compatibility_;
233 
234   // Synchronizes accesses to |workers_|, |max_tasks_|, |max_background_tasks_|,
235   // |num_running_background_tasks_|, |num_pending_may_block_workers_|,
236   // |idle_workers_stack_|, |idle_workers_stack_cv_for_testing_|,
237   // |num_wake_ups_before_start_|, |cleanup_timestamps_|, |polling_max_tasks_|,
238   // |worker_cleanup_disallowed_for_testing_|,
239   // |num_workers_cleaned_up_for_testing_|,
240   // |SchedulerWorkerDelegateImpl::is_on_idle_workers_stack_|,
241   // |SchedulerWorkerDelegateImpl::incremented_max_tasks_since_blocked_| and
242   // |SchedulerWorkerDelegateImpl::may_block_start_time_|. Has
243   // |shared_priority_queue_|'s lock as its predecessor so that a worker can be
244   // pushed to |idle_workers_stack_| within the scope of a Transaction (more
245   // details in GetWork()).
      // NOTE(review): |num_pending_background_may_block_workers_| is not listed
      // above although its sibling |num_pending_may_block_workers_| is -- confirm
      // whether it is also guarded by |lock_|.
246   mutable SchedulerLock lock_;
247 
248   // All workers owned by this worker pool.
249   std::vector<scoped_refptr<SchedulerWorker>> workers_;
250 
251   // The maximum number of tasks that can run concurrently in this pool. Workers
252   // can be added as needed up until there are |max_tasks_| workers.
253   size_t max_tasks_ = 0;
254 
255   // Initial value of |max_tasks_| as set in Start().
256   size_t initial_max_tasks_ = 0;
257 
258   // The maximum number of background tasks that can run concurrently in this
259   // pool.
260   int max_background_tasks_ = 0;
261 
262   // The number of background tasks that are currently running in this pool.
263   int num_running_background_tasks_ = 0;
264 
265   // Number of workers that are within the scope of a MAY_BLOCK
266   // ScopedBlockingCall but haven't caused a max task increase yet.
267   int num_pending_may_block_workers_ = 0;
268 
269   // Number of workers that are running a TaskPriority::BACKGROUND task and are
270   // within the scope of a MAY_BLOCK ScopedBlockingCall but haven't caused a max
271   // task increase yet.
272   int num_pending_background_may_block_workers_ = 0;
273 
274   // Environment to be initialized per worker.
275   WorkerEnvironment worker_environment_ = WorkerEnvironment::NONE;
276 
277   // Stack of idle workers. Initially, all workers are on this stack. A worker
278   // is removed from the stack before its WakeUp() function is called and when
279   // it receives work from GetWork() (a worker calls GetWork() when its sleep
280   // timeout expires, even if its WakeUp() method hasn't been called). A worker
281   // is pushed on this stack when it receives nullptr from GetWork().
282   SchedulerWorkerStack idle_workers_stack_;
283 
284   // Signaled when a worker is added to the idle workers stack.
285   std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_;
286 
287   // Number of wake ups that occurred before Start(). Never modified after
288   // Start() (i.e. can be read without synchronization after Start()).
289   int num_wake_ups_before_start_ = 0;
290 
291   // Stack that contains the timestamps of when workers get cleaned up.
292   // Timestamps get popped off the stack as new workers are added.
293   base::stack<TimeTicks, std::vector<TimeTicks>> cleanup_timestamps_;
294 
295   // Whether we are currently polling for necessary adjustments to |max_tasks_|.
296   bool polling_max_tasks_ = false;
297 
298   // Indicates to the delegates that workers are not permitted to cleanup.
299   bool worker_cleanup_disallowed_for_testing_ = false;
300 
301   // Counts the number of workers cleaned up since the last call to
302   // WaitForWorkersCleanedUpForTesting() (or Start() if it wasn't called yet).
303   // |some_workers_cleaned_up_for_testing_| is true if this was ever
304   // incremented. Tests with a custom |suggested_reclaim_time_| can wait on a
305   // specific number of workers being cleaned up via
306   // WaitForWorkersCleanedUpForTesting().
307   size_t num_workers_cleaned_up_for_testing_ = 0;
308 #if DCHECK_IS_ON()
309   bool some_workers_cleaned_up_for_testing_ = false;
310 #endif
311 
312   // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
313   // incremented.
314   std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_;
315 
316   // Used for testing and makes MayBlockThreshold() return the maximum
317   // TimeDelta.
318   AtomicFlag maximum_blocked_threshold_for_testing_;
319 
320 #if DCHECK_IS_ON()
321   // Set at the start of JoinForTesting().
322   AtomicFlag join_for_testing_started_;
323 #endif
324 
325   // TaskScheduler.DetachDuration.[worker pool name] histogram. Intentionally
326   // leaked.
327   HistogramBase* const detach_duration_histogram_;
328 
329   // TaskScheduler.NumTasksBeforeDetach.[worker pool name] histogram.
330   // Intentionally leaked.
331   HistogramBase* const num_tasks_before_detach_histogram_;
332 
333   // TaskScheduler.NumTasksBetweenWaits.[worker pool name] histogram.
334   // Intentionally leaked.
335   HistogramBase* const num_tasks_between_waits_histogram_;
336 
      // Task runner on which AdjustMaxTasks() is periodically scheduled (see
      // ScheduleAdjustMaxTasksIfNeeded()). Presumably set from Start()'s
      // |service_thread_task_runner| argument -- confirm in the .cc file.
337   scoped_refptr<TaskRunner> service_thread_task_runner_;
338 
339   // Optional observer notified when a worker enters and exits its main
340   // function. Set in Start() and never modified afterwards.
341   SchedulerWorkerObserver* scheduler_worker_observer_ = nullptr;
342 
343   // Ensures recently cleaned up workers (ref.
344   // SchedulerWorkerDelegateImpl::CleanupLockRequired()) had time to exit as
345   // they have a raw reference to |this| (and to TaskTracker) which can
346   // otherwise result in racy use-after-frees per no longer being part of
347   // |workers_| and hence not being explicitly joined in JoinForTesting() :
348   // https://crbug.com/810464.
349   TrackedRefFactory<SchedulerWorkerPoolImpl> tracked_ref_factory_;
350 
351   DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerPoolImpl);
352 };
353 
354 }  // namespace internal
355 }  // namespace base
356 
357 #endif  // BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_
358