1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/threading/sequenced_worker_pool.h"
6 
7 #include <stdint.h>
8 
9 #include <list>
10 #include <map>
11 #include <memory>
12 #include <set>
13 #include <utility>
14 #include <vector>
15 
16 #include "base/atomic_sequence_num.h"
17 #include "base/callback.h"
18 #include "base/compiler_specific.h"
19 #include "base/critical_closure.h"
20 #include "base/lazy_instance.h"
21 #include "base/logging.h"
22 #include "base/macros.h"
23 #include "base/memory/ptr_util.h"
24 #include "base/stl_util.h"
25 #include "base/strings/stringprintf.h"
26 #include "base/synchronization/condition_variable.h"
27 #include "base/synchronization/lock.h"
28 #include "base/threading/platform_thread.h"
29 #include "base/threading/simple_thread.h"
30 #include "base/threading/thread_local.h"
31 #include "base/threading/thread_restrictions.h"
32 #include "base/threading/thread_task_runner_handle.h"
33 #include "base/time/time.h"
34 #include "base/trace_event/heap_profiler.h"
35 #include "base/trace_event/trace_event.h"
36 #include "base/tracked_objects.h"
37 #include "build/build_config.h"
38 
39 #if defined(OS_MACOSX)
40 #include "base/mac/scoped_nsautorelease_pool.h"
41 #elif defined(OS_WIN)
42 #include "base/win/scoped_com_initializer.h"
43 #endif
44 
45 #if !defined(OS_NACL)
46 #include "base/metrics/histogram.h"
47 #endif
48 
49 namespace base {
50 
51 namespace {
52 
53 struct SequencedTask : public TrackingInfo  {
SequencedTaskbase::__anon3cfb63c70111::SequencedTask54   SequencedTask()
55       : sequence_token_id(0),
56         trace_id(0),
57         sequence_task_number(0),
58         shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
59 
SequencedTaskbase::__anon3cfb63c70111::SequencedTask60   explicit SequencedTask(const tracked_objects::Location& from_here)
61       : base::TrackingInfo(from_here, TimeTicks()),
62         sequence_token_id(0),
63         trace_id(0),
64         sequence_task_number(0),
65         shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
66 
~SequencedTaskbase::__anon3cfb63c70111::SequencedTask67   ~SequencedTask() {}
68 
69   int sequence_token_id;
70   int trace_id;
71   int64_t sequence_task_number;
72   SequencedWorkerPool::WorkerShutdown shutdown_behavior;
73   tracked_objects::Location posted_from;
74   Closure task;
75 
76   // Non-delayed tasks and delayed tasks are managed together by time-to-run
77   // order. We calculate the time by adding the posted time and the given delay.
78   TimeTicks time_to_run;
79 };
80 
81 struct SequencedTaskLessThan {
82  public:
operator ()base::__anon3cfb63c70111::SequencedTaskLessThan83   bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
84     if (lhs.time_to_run < rhs.time_to_run)
85       return true;
86 
87     if (lhs.time_to_run > rhs.time_to_run)
88       return false;
89 
90     // If the time happen to match, then we use the sequence number to decide.
91     return lhs.sequence_task_number < rhs.sequence_task_number;
92   }
93 };
94 
// SequencedWorkerPoolTaskRunner ---------------------------------------------
// A TaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed ShutdownBehavior.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
class SequencedWorkerPoolTaskRunner : public TaskRunner {
 public:
  // |pool| receives every task posted through this runner.
  // |shutdown_behavior| is attached to tasks posted with a zero delay.
  SequencedWorkerPoolTaskRunner(
      scoped_refptr<SequencedWorkerPool> pool,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation.
  // A zero |delay| posts through PostWorkerTaskWithShutdownBehavior(); any
  // other delay posts through PostDelayedWorkerTask(), which is not passed
  // the stored shutdown behavior.
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const Closure& task,
                       TimeDelta delay) override;
  // Forwards to the pool's RunsTasksOnCurrentThread().
  bool RunsTasksOnCurrentThread() const override;

 private:
  // Private: instances are ref-counted (see the class comment).
  ~SequencedWorkerPoolTaskRunner() override;

  // The pool tasks are forwarded to.
  const scoped_refptr<SequencedWorkerPool> pool_;

  // Shutdown behavior applied to non-delayed tasks.
  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
};
121 
// Takes |pool| by value and moves it into |pool_| so that no extra reference
// copy is made; |shutdown_behavior| is stored as-is.
SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
    scoped_refptr<SequencedWorkerPool> pool,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(std::move(pool)), shutdown_behavior_(shutdown_behavior) {}
126 
~SequencedWorkerPoolTaskRunner()127 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
128 }
129 
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)130 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
131     const tracked_objects::Location& from_here,
132     const Closure& task,
133     TimeDelta delay) {
134   if (delay.is_zero()) {
135     return pool_->PostWorkerTaskWithShutdownBehavior(
136         from_here, task, shutdown_behavior_);
137   }
138   return pool_->PostDelayedWorkerTask(from_here, task, delay);
139 }
140 
RunsTasksOnCurrentThread() const141 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
142   return pool_->RunsTasksOnCurrentThread();
143 }
144 
// SequencedWorkerPoolSequencedTaskRunner ------------------------------------
// A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
// fixed sequence token.
//
// Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
 public:
  // |pool| receives every task posted through this runner, |token| is the
  // sequence token attached to each of them, and |shutdown_behavior| is
  // attached to tasks posted with a zero delay.
  SequencedWorkerPoolSequencedTaskRunner(
      scoped_refptr<SequencedWorkerPool> pool,
      SequencedWorkerPool::SequenceToken token,
      SequencedWorkerPool::WorkerShutdown shutdown_behavior);

  // TaskRunner implementation.
  // A zero |delay| posts through
  // PostSequencedWorkerTaskWithShutdownBehavior(); any other delay posts
  // through PostDelayedSequencedWorkerTask(), which is not passed the stored
  // shutdown behavior.
  bool PostDelayedTask(const tracked_objects::Location& from_here,
                       const Closure& task,
                       TimeDelta delay) override;
  // Asks the pool whether the calling thread is currently running this
  // runner's sequence token.
  bool RunsTasksOnCurrentThread() const override;

  // SequencedTaskRunner implementation.
  // Forwards to PostDelayedTask().
  bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
                                  const Closure& task,
                                  TimeDelta delay) override;

 private:
  // Private: instances are ref-counted (see the class comment).
  ~SequencedWorkerPoolSequencedTaskRunner() override;

  // The pool tasks are forwarded to.
  const scoped_refptr<SequencedWorkerPool> pool_;

  // Sequence token shared by every task posted through this runner.
  const SequencedWorkerPool::SequenceToken token_;

  // Shutdown behavior applied to non-delayed tasks.
  const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;

  DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
};
179 
// Takes |pool| by value and moves it into |pool_| so that no extra reference
// copy is made; |token| and |shutdown_behavior| are stored as-is.
SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
    scoped_refptr<SequencedWorkerPool> pool,
    SequencedWorkerPool::SequenceToken token,
    SequencedWorkerPool::WorkerShutdown shutdown_behavior)
    : pool_(std::move(pool)),
      token_(token),
      shutdown_behavior_(shutdown_behavior) {}
187 
188 SequencedWorkerPoolSequencedTaskRunner::
~SequencedWorkerPoolSequencedTaskRunner()189 ~SequencedWorkerPoolSequencedTaskRunner() {
190 }
191 
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)192 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
193     const tracked_objects::Location& from_here,
194     const Closure& task,
195     TimeDelta delay) {
196   if (delay.is_zero()) {
197     return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
198         token_, from_here, task, shutdown_behavior_);
199   }
200   return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
201 }
202 
RunsTasksOnCurrentThread() const203 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
204   return pool_->IsRunningSequenceOnCurrentThread(token_);
205 }
206 
PostNonNestableDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)207 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
208     const tracked_objects::Location& from_here,
209     const Closure& task,
210     TimeDelta delay) {
211   // There's no way to run nested tasks, so simply forward to
212   // PostDelayedTask.
213   return PostDelayedTask(from_here, task, delay);
214 }
215 
216 // Create a process-wide unique ID to represent this task in trace events. This
217 // will be mangled with a Process ID hash to reduce the likelyhood of colliding
218 // with MessageLoop pointers on other processes.
GetTaskTraceID(const SequencedTask & task,void * pool)219 uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) {
220   return (static_cast<uint64_t>(task.trace_id) << 32) |
221          static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool));
222 }
223 
224 }  // namespace
225 
226 // Worker ---------------------------------------------------------------------
227 
228 class SequencedWorkerPool::Worker : public SimpleThread {
229  public:
230   // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
231   // around as long as we are running.
232   Worker(scoped_refptr<SequencedWorkerPool> worker_pool,
233          int thread_number,
234          const std::string& thread_name_prefix);
235   ~Worker() override;
236 
237   // SimpleThread implementation. This actually runs the background thread.
238   void Run() override;
239 
240   // Gets the worker for the current thread out of thread-local storage.
241   static Worker* GetForCurrentThread();
242 
243   // Indicates that a task is about to be run. The parameters provide
244   // additional metainformation about the task being run.
set_running_task_info(SequenceToken token,WorkerShutdown shutdown_behavior)245   void set_running_task_info(SequenceToken token,
246                              WorkerShutdown shutdown_behavior) {
247     is_processing_task_ = true;
248     task_sequence_token_ = token;
249     task_shutdown_behavior_ = shutdown_behavior;
250   }
251 
252   // Indicates that the task has finished running.
reset_running_task_info()253   void reset_running_task_info() { is_processing_task_ = false; }
254 
255   // Whether the worker is processing a task.
is_processing_task()256   bool is_processing_task() { return is_processing_task_; }
257 
task_sequence_token() const258   SequenceToken task_sequence_token() const {
259     DCHECK(is_processing_task_);
260     return task_sequence_token_;
261   }
262 
task_shutdown_behavior() const263   WorkerShutdown task_shutdown_behavior() const {
264     DCHECK(is_processing_task_);
265     return task_shutdown_behavior_;
266   }
267 
worker_pool() const268   scoped_refptr<SequencedWorkerPool> worker_pool() const {
269     return worker_pool_;
270   }
271 
272  private:
273   static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
274       lazy_tls_ptr_;
275 
276   scoped_refptr<SequencedWorkerPool> worker_pool_;
277   // The sequence token of the task being processed. Only valid when
278   // is_processing_task_ is true.
279   SequenceToken task_sequence_token_;
280   // The shutdown behavior of the task being processed. Only valid when
281   // is_processing_task_ is true.
282   WorkerShutdown task_shutdown_behavior_;
283   // Whether the Worker is processing a task.
284   bool is_processing_task_;
285 
286   DISALLOW_COPY_AND_ASSIGN(Worker);
287 };
288 
289 // Inner ----------------------------------------------------------------------
290 
291 class SequencedWorkerPool::Inner {
292  public:
293   // Take a raw pointer to |worker| to avoid cycles (since we're owned
294   // by it).
295   Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
296         const std::string& thread_name_prefix,
297         TestingObserver* observer);
298 
299   ~Inner();
300 
301   static SequenceToken GetSequenceToken();
302 
303   SequenceToken GetNamedSequenceToken(const std::string& name);
304 
305   // This function accepts a name and an ID. If the name is null, the
306   // token ID is used. This allows us to implement the optional name lookup
307   // from a single function without having to enter the lock a separate time.
308   bool PostTask(const std::string* optional_token_name,
309                 SequenceToken sequence_token,
310                 WorkerShutdown shutdown_behavior,
311                 const tracked_objects::Location& from_here,
312                 const Closure& task,
313                 TimeDelta delay);
314 
315   bool RunsTasksOnCurrentThread() const;
316 
317   bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
318 
319   bool IsRunningSequence(SequenceToken sequence_token) const;
320 
321   void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
322                                           WorkerShutdown shutdown_behavior);
323 
324   void CleanupForTesting();
325 
326   void SignalHasWorkForTesting();
327 
328   int GetWorkSignalCountForTesting() const;
329 
330   void Shutdown(int max_blocking_tasks_after_shutdown);
331 
332   bool IsShutdownInProgress();
333 
334   // Runs the worker loop on the background thread.
335   void ThreadLoop(Worker* this_worker);
336 
337  private:
338   enum GetWorkStatus {
339     GET_WORK_FOUND,
340     GET_WORK_NOT_FOUND,
341     GET_WORK_WAIT,
342   };
343 
344   enum CleanupState {
345     CLEANUP_REQUESTED,
346     CLEANUP_STARTING,
347     CLEANUP_RUNNING,
348     CLEANUP_FINISHING,
349     CLEANUP_DONE,
350   };
351 
352   // Called from within the lock, this converts the given token name into a
353   // token ID, creating a new one if necessary.
354   int LockedGetNamedTokenID(const std::string& name);
355 
356   // Called from within the lock, this returns the next sequence task number.
357   int64_t LockedGetNextSequenceTaskNumber();
358 
359   // Gets new task. There are 3 cases depending on the return value:
360   //
361   // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
362   //    be run immediately.
363   // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
364   //    and |task| is not filled in. In this case, the caller should wait until
365   //    a task is posted.
366   // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
367   //    immediately, and |task| is not filled in. Likewise, |wait_time| is
368   //    filled in the time to wait until the next task to run. In this case, the
369   //    caller should wait the time.
370   //
371   // In any case, the calling code should clear the given
372   // delete_these_outside_lock vector the next time the lock is released.
373   // See the implementation for a more detailed description.
374   GetWorkStatus GetWork(SequencedTask* task,
375                         TimeDelta* wait_time,
376                         std::vector<Closure>* delete_these_outside_lock);
377 
378   void HandleCleanup();
379 
380   // Peforms init and cleanup around running the given task. WillRun...
381   // returns the value from PrepareToStartAdditionalThreadIfNecessary.
382   // The calling code should call FinishStartingAdditionalThread once the
383   // lock is released if the return values is nonzero.
384   int WillRunWorkerTask(const SequencedTask& task);
385   void DidRunWorkerTask(const SequencedTask& task);
386 
387   // Returns true if there are no threads currently running the given
388   // sequence token.
389   bool IsSequenceTokenRunnable(int sequence_token_id) const;
390 
391   // Checks if all threads are busy and the addition of one more could run an
392   // additional task waiting in the queue. This must be called from within
393   // the lock.
394   //
395   // If another thread is helpful, this will mark the thread as being in the
396   // process of starting and returns the index of the new thread which will be
397   // 0 or more. The caller should then call FinishStartingAdditionalThread to
398   // complete initialization once the lock is released.
399   //
400   // If another thread is not necessary, returne 0;
401   //
402   // See the implementedion for more.
403   int PrepareToStartAdditionalThreadIfHelpful();
404 
405   // The second part of thread creation after
406   // PrepareToStartAdditionalThreadIfHelpful with the thread number it
407   // generated. This actually creates the thread and should be called outside
408   // the lock to avoid blocking important work starting a thread in the lock.
409   void FinishStartingAdditionalThread(int thread_number);
410 
411   // Signal |has_work_| and increment |has_work_signal_count_|.
412   void SignalHasWork();
413 
414   // Checks whether there is work left that's blocking shutdown. Must be
415   // called inside the lock.
416   bool CanShutdown() const;
417 
418   SequencedWorkerPool* const worker_pool_;
419 
420   // The last sequence number used. Managed by GetSequenceToken, since this
421   // only does threadsafe increment operations, you do not need to hold the
422   // lock. This is class-static to make SequenceTokens issued by
423   // GetSequenceToken unique across SequencedWorkerPool instances.
424   static base::StaticAtomicSequenceNumber g_last_sequence_number_;
425 
426   // This lock protects |everything in this class|. Do not read or modify
427   // anything without holding this lock. Do not block while holding this
428   // lock.
429   mutable Lock lock_;
430 
431   // Condition variable that is waited on by worker threads until new
432   // tasks are posted or shutdown starts.
433   ConditionVariable has_work_cv_;
434 
435   // Condition variable that is waited on by non-worker threads (in
436   // Shutdown()) until CanShutdown() goes to true.
437   ConditionVariable can_shutdown_cv_;
438 
439   // The maximum number of worker threads we'll create.
440   const size_t max_threads_;
441 
442   const std::string thread_name_prefix_;
443 
444   // Associates all known sequence token names with their IDs.
445   std::map<std::string, int> named_sequence_tokens_;
446 
447   // Owning pointers to all threads we've created so far, indexed by
448   // ID. Since we lazily create threads, this may be less than
449   // max_threads_ and will be initially empty.
450   using ThreadMap = std::map<PlatformThreadId, std::unique_ptr<Worker>>;
451   ThreadMap threads_;
452 
453   // Set to true when we're in the process of creating another thread.
454   // See PrepareToStartAdditionalThreadIfHelpful for more.
455   bool thread_being_created_;
456 
457   // Number of threads currently waiting for work.
458   size_t waiting_thread_count_;
459 
460   // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
461   // or SKIP_ON_SHUTDOWN flag set.
462   size_t blocking_shutdown_thread_count_;
463 
464   // A set of all pending tasks in time-to-run order. These are tasks that are
465   // either waiting for a thread to run on, waiting for their time to run,
466   // or blocked on a previous task in their sequence. We have to iterate over
467   // the tasks by time-to-run order, so we use the set instead of the
468   // traditional priority_queue.
469   typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
470   PendingTaskSet pending_tasks_;
471 
472   // The next sequence number for a new sequenced task.
473   int64_t next_sequence_task_number_;
474 
475   // Number of tasks in the pending_tasks_ list that are marked as blocking
476   // shutdown.
477   size_t blocking_shutdown_pending_task_count_;
478 
479   // Lists all sequence tokens currently executing.
480   std::set<int> current_sequences_;
481 
482   // An ID for each posted task to distinguish the task from others in traces.
483   int trace_id_;
484 
485   // Set when Shutdown is called and no further tasks should be
486   // allowed, though we may still be running existing tasks.
487   bool shutdown_called_;
488 
489   // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown()
490   // has been called.
491   int max_blocking_tasks_after_shutdown_;
492 
493   // State used to cleanup for testing, all guarded by lock_.
494   CleanupState cleanup_state_;
495   size_t cleanup_idlers_;
496   ConditionVariable cleanup_cv_;
497 
498   TestingObserver* const testing_observer_;
499 
500   DISALLOW_COPY_AND_ASSIGN(Inner);
501 };
502 
503 // Worker definitions ---------------------------------------------------------
504 
// Names the thread "<prefix>Worker<thread_number>", takes a reference to the
// pool, and starts the thread right away. Every member is set in the
// initializer list before Start() is called.
SequencedWorkerPool::Worker::Worker(
    scoped_refptr<SequencedWorkerPool> worker_pool,
    int thread_number,
    const std::string& prefix)
    : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
      worker_pool_(std::move(worker_pool)),
      task_shutdown_behavior_(BLOCK_SHUTDOWN),
      is_processing_task_(false) {
  Start();
}
515 
~Worker()516 SequencedWorkerPool::Worker::~Worker() {
517 }
518 
Run()519 void SequencedWorkerPool::Worker::Run() {
520 #if defined(OS_WIN)
521   win::ScopedCOMInitializer com_initializer;
522 #endif
523 
524   // Store a pointer to this worker in thread local storage for static function
525   // access.
526   DCHECK(!lazy_tls_ptr_.Get().Get());
527   lazy_tls_ptr_.Get().Set(this);
528 
529   // Just jump back to the Inner object to run the thread, since it has all the
530   // tracking information and queues. It might be more natural to implement
531   // using DelegateSimpleThread and have Inner implement the Delegate to avoid
532   // having these worker objects at all, but that method lacks the ability to
533   // send thread-specific information easily to the thread loop.
534   worker_pool_->inner_->ThreadLoop(this);
535   // Release our cyclic reference once we're done.
536   worker_pool_ = nullptr;
537 }
538 
539 // static
540 SequencedWorkerPool::Worker*
GetForCurrentThread()541 SequencedWorkerPool::Worker::GetForCurrentThread() {
542   // Don't construct lazy instance on check.
543   if (lazy_tls_ptr_ == nullptr)
544     return nullptr;
545 
546   return lazy_tls_ptr_.Get().Get();
547 }
548 
// static
// Thread-local slot holding the Worker of the current thread: Run() stores
// the worker here and GetForCurrentThread() reads it back.
LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
    SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER;
552 
553 // Inner definitions ---------------------------------------------------------
554 
Inner(SequencedWorkerPool * worker_pool,size_t max_threads,const std::string & thread_name_prefix,TestingObserver * observer)555 SequencedWorkerPool::Inner::Inner(
556     SequencedWorkerPool* worker_pool,
557     size_t max_threads,
558     const std::string& thread_name_prefix,
559     TestingObserver* observer)
560     : worker_pool_(worker_pool),
561       lock_(),
562       has_work_cv_(&lock_),
563       can_shutdown_cv_(&lock_),
564       max_threads_(max_threads),
565       thread_name_prefix_(thread_name_prefix),
566       thread_being_created_(false),
567       waiting_thread_count_(0),
568       blocking_shutdown_thread_count_(0),
569       next_sequence_task_number_(0),
570       blocking_shutdown_pending_task_count_(0),
571       trace_id_(0),
572       shutdown_called_(false),
573       max_blocking_tasks_after_shutdown_(0),
574       cleanup_state_(CLEANUP_DONE),
575       cleanup_idlers_(0),
576       cleanup_cv_(&lock_),
577       testing_observer_(observer) {}
578 
~Inner()579 SequencedWorkerPool::Inner::~Inner() {
580   // You must call Shutdown() before destroying the pool.
581   DCHECK(shutdown_called_);
582 
583   // Need to explicitly join with the threads before they're destroyed or else
584   // they will be running when our object is half torn down.
585   for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
586     it->second->Join();
587   threads_.clear();
588 
589   if (testing_observer_)
590     testing_observer_->OnDestruct();
591 }
592 
593 // static
594 SequencedWorkerPool::SequenceToken
GetSequenceToken()595 SequencedWorkerPool::Inner::GetSequenceToken() {
596   // Need to add one because StaticAtomicSequenceNumber starts at zero, which
597   // is used as a sentinel value in SequenceTokens.
598   return SequenceToken(g_last_sequence_number_.GetNext() + 1);
599 }
600 
601 SequencedWorkerPool::SequenceToken
GetNamedSequenceToken(const std::string & name)602 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
603   AutoLock lock(lock_);
604   return SequenceToken(LockedGetNamedTokenID(name));
605 }
606 
PostTask(const std::string * optional_token_name,SequenceToken sequence_token,WorkerShutdown shutdown_behavior,const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)607 bool SequencedWorkerPool::Inner::PostTask(
608     const std::string* optional_token_name,
609     SequenceToken sequence_token,
610     WorkerShutdown shutdown_behavior,
611     const tracked_objects::Location& from_here,
612     const Closure& task,
613     TimeDelta delay) {
614   DCHECK(delay.is_zero() || shutdown_behavior == SKIP_ON_SHUTDOWN);
615   SequencedTask sequenced(from_here);
616   sequenced.sequence_token_id = sequence_token.id_;
617   sequenced.shutdown_behavior = shutdown_behavior;
618   sequenced.posted_from = from_here;
619   sequenced.task =
620       shutdown_behavior == BLOCK_SHUTDOWN ?
621       base::MakeCriticalClosure(task) : task;
622   sequenced.time_to_run = TimeTicks::Now() + delay;
623 
624   int create_thread_id = 0;
625   {
626     AutoLock lock(lock_);
627     if (shutdown_called_) {
628       // Don't allow a new task to be posted if it doesn't block shutdown.
629       if (shutdown_behavior != BLOCK_SHUTDOWN)
630         return false;
631 
632       // If the current thread is running a task, and that task doesn't block
633       // shutdown, then it shouldn't be allowed to post any more tasks.
634       ThreadMap::const_iterator found =
635           threads_.find(PlatformThread::CurrentId());
636       if (found != threads_.end() && found->second->is_processing_task() &&
637           found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
638         return false;
639       }
640 
641       if (max_blocking_tasks_after_shutdown_ <= 0) {
642         DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
643         return false;
644       }
645       max_blocking_tasks_after_shutdown_ -= 1;
646     }
647 
648     // The trace_id is used for identifying the task in about:tracing.
649     sequenced.trace_id = trace_id_++;
650 
651     TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
652         "SequencedWorkerPool::Inner::PostTask",
653         TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
654         TRACE_EVENT_FLAG_FLOW_OUT);
655 
656     sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
657 
658     // Now that we have the lock, apply the named token rules.
659     if (optional_token_name)
660       sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
661 
662     pending_tasks_.insert(sequenced);
663     if (shutdown_behavior == BLOCK_SHUTDOWN)
664       blocking_shutdown_pending_task_count_++;
665 
666     create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
667   }
668 
669   // Actually start the additional thread or signal an existing one now that
670   // we're outside the lock.
671   if (create_thread_id)
672     FinishStartingAdditionalThread(create_thread_id);
673   else
674     SignalHasWork();
675 
676   return true;
677 }
678 
RunsTasksOnCurrentThread() const679 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
680   AutoLock lock(lock_);
681   return ContainsKey(threads_, PlatformThread::CurrentId());
682 }
683 
IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const684 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
685     SequenceToken sequence_token) const {
686   AutoLock lock(lock_);
687   ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
688   if (found == threads_.end())
689     return false;
690   return found->second->is_processing_task() &&
691          sequence_token.Equals(found->second->task_sequence_token());
692 }
693 
IsRunningSequence(SequenceToken sequence_token) const694 bool SequencedWorkerPool::Inner::IsRunningSequence(
695     SequenceToken sequence_token) const {
696   DCHECK(sequence_token.IsValid());
697   AutoLock lock(lock_);
698   return !IsSequenceTokenRunnable(sequence_token.id_);
699 }
700 
SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,WorkerShutdown shutdown_behavior)701 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
702     SequenceToken sequence_token,
703     WorkerShutdown shutdown_behavior) {
704   AutoLock lock(lock_);
705   ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
706   DCHECK(found != threads_.end());
707   DCHECK(found->second->is_processing_task());
708   DCHECK(!found->second->task_sequence_token().IsValid());
709   found->second->set_running_task_info(sequence_token, shutdown_behavior);
710 
711   // Mark the sequence token as in use.
712   bool success = current_sequences_.insert(sequence_token.id_).second;
713   DCHECK(success);
714 }
715 
716 // See https://code.google.com/p/chromium/issues/detail?id=168415
CleanupForTesting()717 void SequencedWorkerPool::Inner::CleanupForTesting() {
718   DCHECK(!RunsTasksOnCurrentThread());
719   base::ThreadRestrictions::ScopedAllowWait allow_wait;
720   AutoLock lock(lock_);
721   CHECK_EQ(CLEANUP_DONE, cleanup_state_);
722   if (shutdown_called_)
723     return;
724   if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
725     return;
726   cleanup_state_ = CLEANUP_REQUESTED;
727   cleanup_idlers_ = 0;
728   has_work_cv_.Signal();
729   while (cleanup_state_ != CLEANUP_DONE)
730     cleanup_cv_.Wait();
731 }
732 
SignalHasWorkForTesting()733 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
734   SignalHasWork();
735 }
736 
Shutdown(int max_new_blocking_tasks_after_shutdown)737 void SequencedWorkerPool::Inner::Shutdown(
738     int max_new_blocking_tasks_after_shutdown) {
739   DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
740   {
741     AutoLock lock(lock_);
742     // Cleanup and Shutdown should not be called concurrently.
743     CHECK_EQ(CLEANUP_DONE, cleanup_state_);
744     if (shutdown_called_)
745       return;
746     shutdown_called_ = true;
747     max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
748 
749     // Tickle the threads. This will wake up a waiting one so it will know that
750     // it can exit, which in turn will wake up any other waiting ones.
751     SignalHasWork();
752 
753     // There are no pending or running tasks blocking shutdown, we're done.
754     if (CanShutdown())
755       return;
756   }
757 
758   // If we're here, then something is blocking shutdown.  So wait for
759   // CanShutdown() to go to true.
760 
761   if (testing_observer_)
762     testing_observer_->WillWaitForShutdown();
763 
764 #if !defined(OS_NACL)
765   TimeTicks shutdown_wait_begin = TimeTicks::Now();
766 #endif
767 
768   {
769     base::ThreadRestrictions::ScopedAllowWait allow_wait;
770     AutoLock lock(lock_);
771     while (!CanShutdown())
772       can_shutdown_cv_.Wait();
773   }
774 #if !defined(OS_NACL)
775   UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
776                       TimeTicks::Now() - shutdown_wait_begin);
777 #endif
778 }
779 
IsShutdownInProgress()780 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
781     AutoLock lock(lock_);
782     return shutdown_called_;
783 }
784 
// Main loop of a worker thread. Registers |this_worker| in |threads_| (taking
// ownership of it), then repeatedly asks GetWork() for a task and runs it with
// |lock_| released, sleeping on |has_work_cv_| when nothing is runnable. The
// loop exits once shutdown has been called and no shutdown-blocking tasks
// remain pending; on exit the next worker and any Shutdown() waiter are
// signaled.
void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
  {
    AutoLock lock(lock_);
    // This thread's creation is now complete; record it so that another
    // thread may be created and so that CanShutdown() is not held up.
    DCHECK(thread_being_created_);
    thread_being_created_ = false;
    auto result = threads_.insert(
        std::make_pair(this_worker->tid(), WrapUnique(this_worker)));
    DCHECK(result.second);

    while (true) {
#if defined(OS_MACOSX)
      base::mac::ScopedNSAutoreleasePool autorelease_pool;
#endif

      // Participate in any cleanup pass that is requested or in progress.
      HandleCleanup();

      // See GetWork for what delete_these_outside_lock is doing.
      SequencedTask task;
      TimeDelta wait_time;
      std::vector<Closure> delete_these_outside_lock;
      GetWorkStatus status =
          GetWork(&task, &wait_time, &delete_these_outside_lock);
      if (status == GET_WORK_FOUND) {
        TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
            "SequencedWorkerPool::Inner::ThreadLoop",
            TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))),
            TRACE_EVENT_FLAG_FLOW_IN,
            "src_file", task.posted_from.file_name(),
            "src_func", task.posted_from.function_name());
        TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION task_event(
            task.posted_from.file_name());
        // Book-keeping under the lock; a non-zero result means this thread
        // must finish creating a new worker once the lock is dropped.
        int new_thread_id = WillRunWorkerTask(task);
        {
          AutoUnlock unlock(lock_);
          // There may be more work available, so wake up another
          // worker thread. (Technically not required, since we
          // already get a signal for each new task, but it doesn't
          // hurt.)
          SignalHasWork();
          delete_these_outside_lock.clear();

          // Complete thread creation outside the lock if necessary.
          if (new_thread_id)
            FinishStartingAdditionalThread(new_thread_id);

          this_worker->set_running_task_info(
              SequenceToken(task.sequence_token_id), task.shutdown_behavior);

          tracked_objects::TaskStopwatch stopwatch;
          stopwatch.Start();
          task.task.Run();
          stopwatch.Stop();

          tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
              task, stopwatch);

          // Update the sequence token in case it has been set from within the
          // task, so it can be removed from the set of currently running
          // sequences in DidRunWorkerTask() below.
          task.sequence_token_id = this_worker->task_sequence_token().id_;

          // Make sure our task is erased outside the lock for the
          // same reason we do this with delete_these_outside_lock.
          // Also, do it before calling reset_running_task_info() so
          // that sequence-checking from within the task's destructor
          // still works.
          task.task = Closure();

          this_worker->reset_running_task_info();
        }
        DidRunWorkerTask(task);  // Must be done inside the lock.
      } else if (cleanup_state_ == CLEANUP_RUNNING) {
        // No task to run while a cleanup pass is in the CLEANUP_RUNNING state.
        switch (status) {
          case GET_WORK_WAIT: {
              // GetWork() removed a deferred task; destroy its closure with
              // the lock released, then loop again.
              AutoUnlock unlock(lock_);
              delete_these_outside_lock.clear();
            }
            break;
          case GET_WORK_NOT_FOUND:
            // Nothing left to process: move cleanup to its finishing phase
            // and wake everyone waiting on |cleanup_cv_|.
            CHECK(delete_these_outside_lock.empty());
            cleanup_state_ = CLEANUP_FINISHING;
            cleanup_cv_.Broadcast();
            break;
          default:
            NOTREACHED();
        }
      } else {
        // When we're terminating and there's no more work, we can
        // shut down, other workers can complete any pending or new tasks.
        // We can get additional tasks posted after shutdown_called_ is set
        // but only worker threads are allowed to post tasks at that time, and
        // the workers responsible for posting those tasks will be available
        // to run them. Also, there may be some tasks stuck behind running
        // ones with the same sequence token, but additional threads won't
        // help this case.
        if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) {
          AutoUnlock unlock(lock_);
          delete_these_outside_lock.clear();
          break;
        }

        // No work was found, but there are tasks that need deletion. The
        // deletion must happen outside of the lock.
        if (delete_these_outside_lock.size()) {
          AutoUnlock unlock(lock_);
          delete_these_outside_lock.clear();

          // Since the lock has been released, |status| may no longer be
          // accurate. It might read GET_WORK_WAIT even if there are tasks
          // ready to perform work. Jump to the top of the loop to recalculate
          // |status|.
          continue;
        }

        // Count this thread as idle for the duration of the wait below.
        waiting_thread_count_++;

        switch (status) {
          case GET_WORK_NOT_FOUND:
            has_work_cv_.Wait();
            break;
          case GET_WORK_WAIT:
            // A task exists but is not due yet; sleep at most until it is.
            has_work_cv_.TimedWait(wait_time);
            break;
          default:
            NOTREACHED();
        }
        waiting_thread_count_--;
      }
    }
  }  // Release lock_.

  // We noticed we should exit. Wake up the next worker so it knows it should
  // exit as well (because the Shutdown() code only signals once).
  SignalHasWork();

  // Possibly unblock shutdown.
  can_shutdown_cv_.Signal();
}
923 
// Advances the cleanup state machine on behalf of the calling worker thread.
// Must be called with |lock_| held. Returns immediately when
// |cleanup_state_| is CLEANUP_DONE; otherwise the action taken depends on the
// current state (REQUESTED, STARTING or FINISHING) as described below.
void SequencedWorkerPool::Inner::HandleCleanup() {
  lock_.AssertAcquired();
  if (cleanup_state_ == CLEANUP_DONE)
    return;
  if (cleanup_state_ == CLEANUP_REQUESTED) {
    // We win, we get to do the cleanup as soon as the others wise up and idle.
    cleanup_state_ = CLEANUP_STARTING;
    // Wait until no thread is mid-creation and every other thread in
    // |threads_| has registered itself as an idler, nudging sleepers awake
    // via |has_work_cv_| each time around.
    while (thread_being_created_ ||
           cleanup_idlers_ != threads_.size() - 1) {
      has_work_cv_.Signal();
      cleanup_cv_.Wait();
    }
    cleanup_state_ = CLEANUP_RUNNING;
    return;
  }
  if (cleanup_state_ == CLEANUP_STARTING) {
    // Another worker thread is cleaning up, we idle here until that's done.
    ++cleanup_idlers_;
    cleanup_cv_.Broadcast();
    while (cleanup_state_ != CLEANUP_FINISHING) {
      cleanup_cv_.Wait();
    }
    // Deregister and notify, so the thread waiting for the idler count to
    // reach zero can make progress.
    --cleanup_idlers_;
    cleanup_cv_.Broadcast();
    return;
  }
  if (cleanup_state_ == CLEANUP_FINISHING) {
    // We wait for all idlers to wake up prior to being DONE.
    while (cleanup_idlers_ != 0) {
      cleanup_cv_.Broadcast();
      cleanup_cv_.Wait();
    }
    // The state is re-checked because |lock_| was released inside Wait().
    if (cleanup_state_ == CLEANUP_FINISHING) {
      cleanup_state_ = CLEANUP_DONE;
      cleanup_cv_.Signal();
    }
    return;
  }
}
963 
LockedGetNamedTokenID(const std::string & name)964 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
965     const std::string& name) {
966   lock_.AssertAcquired();
967   DCHECK(!name.empty());
968 
969   std::map<std::string, int>::const_iterator found =
970       named_sequence_tokens_.find(name);
971   if (found != named_sequence_tokens_.end())
972     return found->second;  // Got an existing one.
973 
974   // Create a new one for this name.
975   SequenceToken result = GetSequenceToken();
976   named_sequence_tokens_.insert(std::make_pair(name, result.id_));
977   return result.id_;
978 }
979 
LockedGetNextSequenceTaskNumber()980 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
981   lock_.AssertAcquired();
982   // We assume that we never create enough tasks to wrap around.
983   return next_sequence_task_number_++;
984 }
985 
GetWork(SequencedTask * task,TimeDelta * wait_time,std::vector<Closure> * delete_these_outside_lock)986 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
987     SequencedTask* task,
988     TimeDelta* wait_time,
989     std::vector<Closure>* delete_these_outside_lock) {
990   lock_.AssertAcquired();
991 
992   // Find the next task with a sequence token that's not currently in use.
993   // If the token is in use, that means another thread is running something
994   // in that sequence, and we can't run it without going out-of-order.
995   //
996   // This algorithm is simple and fair, but inefficient in some cases. For
997   // example, say somebody schedules 1000 slow tasks with the same sequence
998   // number. We'll have to go through all those tasks each time we feel like
999   // there might be work to schedule. If this proves to be a problem, we
1000   // should make this more efficient.
1001   //
1002   // One possible enhancement would be to keep a map from sequence ID to a
1003   // list of pending but currently blocked SequencedTasks for that ID.
1004   // When a worker finishes a task of one sequence token, it can pick up the
1005   // next one from that token right away.
1006   //
1007   // This may lead to starvation if there are sufficient numbers of sequences
1008   // in use. To alleviate this, we could add an incrementing priority counter
1009   // to each SequencedTask. Then maintain a priority_queue of all runnable
1010   // tasks, sorted by priority counter. When a sequenced task is completed
1011   // we would pop the head element off of that tasks pending list and add it
1012   // to the priority queue. Then we would run the first item in the priority
1013   // queue.
1014 
1015   GetWorkStatus status = GET_WORK_NOT_FOUND;
1016   int unrunnable_tasks = 0;
1017   PendingTaskSet::iterator i = pending_tasks_.begin();
1018   // We assume that the loop below doesn't take too long and so we can just do
1019   // a single call to TimeTicks::Now().
1020   const TimeTicks current_time = TimeTicks::Now();
1021   while (i != pending_tasks_.end()) {
1022     if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
1023       unrunnable_tasks++;
1024       ++i;
1025       continue;
1026     }
1027 
1028     if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
1029       // We're shutting down and the task we just found isn't blocking
1030       // shutdown. Delete it and get more work.
1031       //
1032       // Note that we do not want to delete unrunnable tasks. Deleting a task
1033       // can have side effects (like freeing some objects) and deleting a
1034       // task that's supposed to run after one that's currently running could
1035       // cause an obscure crash.
1036       //
1037       // We really want to delete these tasks outside the lock in case the
1038       // closures are holding refs to objects that want to post work from
1039       // their destructorss (which would deadlock). The closures are
1040       // internally refcounted, so we just need to keep a copy of them alive
1041       // until the lock is exited. The calling code can just clear() the
1042       // vector they passed to us once the lock is exited to make this
1043       // happen.
1044       delete_these_outside_lock->push_back(i->task);
1045       pending_tasks_.erase(i++);
1046       continue;
1047     }
1048 
1049     if (i->time_to_run > current_time) {
1050       // The time to run has not come yet.
1051       *wait_time = i->time_to_run - current_time;
1052       status = GET_WORK_WAIT;
1053       if (cleanup_state_ == CLEANUP_RUNNING) {
1054         // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
1055         delete_these_outside_lock->push_back(i->task);
1056         pending_tasks_.erase(i);
1057       }
1058       break;
1059     }
1060 
1061     // Found a runnable task.
1062     *task = *i;
1063     pending_tasks_.erase(i);
1064     if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
1065       blocking_shutdown_pending_task_count_--;
1066     }
1067 
1068     status = GET_WORK_FOUND;
1069     break;
1070   }
1071 
1072   return status;
1073 }
1074 
WillRunWorkerTask(const SequencedTask & task)1075 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1076   lock_.AssertAcquired();
1077 
1078   // Mark the task's sequence number as in use.
1079   if (task.sequence_token_id)
1080     current_sequences_.insert(task.sequence_token_id);
1081 
1082   // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1083   // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1084   // completes.
1085   if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
1086     blocking_shutdown_thread_count_++;
1087 
1088   // We just picked up a task. Since StartAdditionalThreadIfHelpful only
1089   // creates a new thread if there is no free one, there is a race when posting
1090   // tasks that many tasks could have been posted before a thread started
1091   // running them, so only one thread would have been created. So we also check
1092   // whether we should create more threads after removing our task from the
1093   // queue, which also has the nice side effect of creating the workers from
1094   // background threads rather than the main thread of the app.
1095   //
1096   // If another thread wasn't created, we want to wake up an existing thread
1097   // if there is one waiting to pick up the next task.
1098   //
1099   // Note that we really need to do this *before* running the task, not
1100   // after. Otherwise, if more than one task is posted, the creation of the
1101   // second thread (since we only create one at a time) will be blocked by
1102   // the execution of the first task, which could be arbitrarily long.
1103   return PrepareToStartAdditionalThreadIfHelpful();
1104 }
1105 
DidRunWorkerTask(const SequencedTask & task)1106 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1107   lock_.AssertAcquired();
1108 
1109   if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1110     DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1111     blocking_shutdown_thread_count_--;
1112   }
1113 
1114   if (task.sequence_token_id)
1115     current_sequences_.erase(task.sequence_token_id);
1116 }
1117 
IsSequenceTokenRunnable(int sequence_token_id) const1118 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1119     int sequence_token_id) const {
1120   lock_.AssertAcquired();
1121   return !sequence_token_id ||
1122       current_sequences_.find(sequence_token_id) ==
1123           current_sequences_.end();
1124 }
1125 
PrepareToStartAdditionalThreadIfHelpful()1126 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1127   lock_.AssertAcquired();
1128   // How thread creation works:
1129   //
1130   // We'de like to avoid creating threads with the lock held. However, we
1131   // need to be sure that we have an accurate accounting of the threads for
1132   // proper Joining and deltion on shutdown.
1133   //
1134   // We need to figure out if we need another thread with the lock held, which
1135   // is what this function does. It then marks us as in the process of creating
1136   // a thread. When we do shutdown, we wait until the thread_being_created_
1137   // flag is cleared, which ensures that the new thread is properly added to
1138   // all the data structures and we can't leak it. Once shutdown starts, we'll
1139   // refuse to create more threads or they would be leaked.
1140   //
1141   // Note that this creates a mostly benign race condition on shutdown that
1142   // will cause fewer workers to be created than one would expect. It isn't
1143   // much of an issue in real life, but affects some tests. Since we only spawn
1144   // one worker at a time, the following sequence of events can happen:
1145   //
1146   //  1. Main thread posts a bunch of unrelated tasks that would normally be
1147   //     run on separate threads.
1148   //  2. The first task post causes us to start a worker. Other tasks do not
1149   //     cause a worker to start since one is pending.
1150   //  3. Main thread initiates shutdown.
1151   //  4. No more threads are created since the shutdown_called_ flag is set.
1152   //
1153   // The result is that one may expect that max_threads_ workers to be created
1154   // given the workload, but in reality fewer may be created because the
1155   // sequence of thread creation on the background threads is racing with the
1156   // shutdown call.
1157   if (!shutdown_called_ &&
1158       !thread_being_created_ &&
1159       cleanup_state_ == CLEANUP_DONE &&
1160       threads_.size() < max_threads_ &&
1161       waiting_thread_count_ == 0) {
1162     // We could use an additional thread if there's work to be done.
1163     for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
1164          i != pending_tasks_.end(); ++i) {
1165       if (IsSequenceTokenRunnable(i->sequence_token_id)) {
1166         // Found a runnable task, mark the thread as being started.
1167         thread_being_created_ = true;
1168         return static_cast<int>(threads_.size() + 1);
1169       }
1170     }
1171   }
1172   return 0;
1173 }
1174 
FinishStartingAdditionalThread(int thread_number)1175 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1176     int thread_number) {
1177   // Called outside of the lock.
1178   DCHECK_GT(thread_number, 0);
1179 
1180   // The worker is assigned to the list when the thread actually starts, which
1181   // will manage the memory of the pointer.
1182   new Worker(worker_pool_, thread_number, thread_name_prefix_);
1183 }
1184 
SignalHasWork()1185 void SequencedWorkerPool::Inner::SignalHasWork() {
1186   has_work_cv_.Signal();
1187   if (testing_observer_) {
1188     testing_observer_->OnHasWork();
1189   }
1190 }
1191 
CanShutdown() const1192 bool SequencedWorkerPool::Inner::CanShutdown() const {
1193   lock_.AssertAcquired();
1194   // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1195   return !thread_being_created_ &&
1196          blocking_shutdown_thread_count_ == 0 &&
1197          blocking_shutdown_pending_task_count_ == 0;
1198 }
1199 
// Out-of-line definition of Inner's static atomic sequence-number counter.
base::StaticAtomicSequenceNumber
SequencedWorkerPool::Inner::g_last_sequence_number_;
1202 
1203 // SequencedWorkerPool --------------------------------------------------------
1204 
ToString() const1205 std::string SequencedWorkerPool::SequenceToken::ToString() const {
1206   return base::StringPrintf("[%d]", id_);
1207 }
1208 
1209 // static
1210 SequencedWorkerPool::SequenceToken
GetSequenceTokenForCurrentThread()1211 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1212   Worker* worker = Worker::GetForCurrentThread();
1213   if (!worker)
1214     return SequenceToken();
1215 
1216   return worker->task_sequence_token();
1217 }
1218 
1219 // static
1220 scoped_refptr<SequencedWorkerPool>
GetWorkerPoolForCurrentThread()1221 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1222   Worker* worker = Worker::GetForCurrentThread();
1223   if (!worker)
1224     return nullptr;
1225 
1226   return worker->worker_pool();
1227 }
1228 
1229 // static
1230 scoped_refptr<SequencedTaskRunner>
GetSequencedTaskRunnerForCurrentThread()1231 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
1232   Worker* worker = Worker::GetForCurrentThread();
1233 
1234   // If there is no worker, this thread is not a worker thread. Otherwise, it is
1235   // currently running a task (sequenced or unsequenced).
1236   if (!worker)
1237     return nullptr;
1238 
1239   scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
1240   SequenceToken sequence_token = worker->task_sequence_token();
1241   WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
1242   if (!sequence_token.IsValid()) {
1243     // Create a new sequence token and bind this thread to it, to make sure that
1244     // a task posted to the SequencedTaskRunner we are going to return is not
1245     // immediately going to run on a different thread.
1246     sequence_token = Inner::GetSequenceToken();
1247     pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
1248                                                      shutdown_behavior);
1249   }
1250 
1251   DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
1252   return new SequencedWorkerPoolSequencedTaskRunner(
1253       std::move(pool), sequence_token, shutdown_behavior);
1254 }
1255 
SequencedWorkerPool(size_t max_threads,const std::string & thread_name_prefix)1256 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1257                                          const std::string& thread_name_prefix)
1258     : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1259       inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1260 }
1261 
SequencedWorkerPool(size_t max_threads,const std::string & thread_name_prefix,TestingObserver * observer)1262 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1263                                          const std::string& thread_name_prefix,
1264                                          TestingObserver* observer)
1265     : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1266       inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1267 }
1268 
~SequencedWorkerPool()1269 SequencedWorkerPool::~SequencedWorkerPool() {}
1270 
OnDestruct() const1271 void SequencedWorkerPool::OnDestruct() const {
1272   // Avoid deleting ourselves on a worker thread (which would deadlock).
1273   if (RunsTasksOnCurrentThread()) {
1274     constructor_task_runner_->DeleteSoon(FROM_HERE, this);
1275   } else {
1276     delete this;
1277   }
1278 }
1279 
1280 // static
GetSequenceToken()1281 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1282   return Inner::GetSequenceToken();
1283 }
1284 
GetNamedSequenceToken(const std::string & name)1285 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1286     const std::string& name) {
1287   return inner_->GetNamedSequenceToken(name);
1288 }
1289 
GetSequencedTaskRunner(SequenceToken token)1290 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1291     SequenceToken token) {
1292   return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
1293 }
1294 
1295 scoped_refptr<SequencedTaskRunner>
GetSequencedTaskRunnerWithShutdownBehavior(SequenceToken token,WorkerShutdown shutdown_behavior)1296 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1297     SequenceToken token, WorkerShutdown shutdown_behavior) {
1298   return new SequencedWorkerPoolSequencedTaskRunner(
1299       this, token, shutdown_behavior);
1300 }
1301 
1302 scoped_refptr<TaskRunner>
GetTaskRunnerWithShutdownBehavior(WorkerShutdown shutdown_behavior)1303 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1304     WorkerShutdown shutdown_behavior) {
1305   return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1306 }
1307 
PostWorkerTask(const tracked_objects::Location & from_here,const Closure & task)1308 bool SequencedWorkerPool::PostWorkerTask(
1309     const tracked_objects::Location& from_here,
1310     const Closure& task) {
1311   return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
1312                           from_here, task, TimeDelta());
1313 }
1314 
PostDelayedWorkerTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1315 bool SequencedWorkerPool::PostDelayedWorkerTask(
1316     const tracked_objects::Location& from_here,
1317     const Closure& task,
1318     TimeDelta delay) {
1319   WorkerShutdown shutdown_behavior =
1320       delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1321   return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1322                           from_here, task, delay);
1323 }
1324 
PostWorkerTaskWithShutdownBehavior(const tracked_objects::Location & from_here,const Closure & task,WorkerShutdown shutdown_behavior)1325 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1326     const tracked_objects::Location& from_here,
1327     const Closure& task,
1328     WorkerShutdown shutdown_behavior) {
1329   return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1330                           from_here, task, TimeDelta());
1331 }
1332 
PostSequencedWorkerTask(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task)1333 bool SequencedWorkerPool::PostSequencedWorkerTask(
1334     SequenceToken sequence_token,
1335     const tracked_objects::Location& from_here,
1336     const Closure& task) {
1337   return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
1338                           from_here, task, TimeDelta());
1339 }
1340 
PostDelayedSequencedWorkerTask(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1341 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1342     SequenceToken sequence_token,
1343     const tracked_objects::Location& from_here,
1344     const Closure& task,
1345     TimeDelta delay) {
1346   WorkerShutdown shutdown_behavior =
1347       delay.is_zero() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1348   return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1349                           from_here, task, delay);
1350 }
1351 
PostNamedSequencedWorkerTask(const std::string & token_name,const tracked_objects::Location & from_here,const Closure & task)1352 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1353     const std::string& token_name,
1354     const tracked_objects::Location& from_here,
1355     const Closure& task) {
1356   DCHECK(!token_name.empty());
1357   return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1358                           from_here, task, TimeDelta());
1359 }
1360 
PostSequencedWorkerTaskWithShutdownBehavior(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task,WorkerShutdown shutdown_behavior)1361 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1362     SequenceToken sequence_token,
1363     const tracked_objects::Location& from_here,
1364     const Closure& task,
1365     WorkerShutdown shutdown_behavior) {
1366   return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1367                           from_here, task, TimeDelta());
1368 }
1369 
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1370 bool SequencedWorkerPool::PostDelayedTask(
1371     const tracked_objects::Location& from_here,
1372     const Closure& task,
1373     TimeDelta delay) {
1374   return PostDelayedWorkerTask(from_here, task, delay);
1375 }
1376 
RunsTasksOnCurrentThread() const1377 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1378   return inner_->RunsTasksOnCurrentThread();
1379 }
1380 
IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const1381 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1382     SequenceToken sequence_token) const {
1383   return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1384 }
1385 
IsRunningSequence(SequenceToken sequence_token) const1386 bool SequencedWorkerPool::IsRunningSequence(
1387     SequenceToken sequence_token) const {
1388   return inner_->IsRunningSequence(sequence_token);
1389 }
1390 
FlushForTesting()1391 void SequencedWorkerPool::FlushForTesting() {
1392   inner_->CleanupForTesting();
1393 }
1394 
SignalHasWorkForTesting()1395 void SequencedWorkerPool::SignalHasWorkForTesting() {
1396   inner_->SignalHasWorkForTesting();
1397 }
1398 
Shutdown(int max_new_blocking_tasks_after_shutdown)1399 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1400   DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1401   inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1402 }
1403 
IsShutdownInProgress()1404 bool SequencedWorkerPool::IsShutdownInProgress() {
1405   return inner_->IsShutdownInProgress();
1406 }
1407 
1408 }  // namespace base
1409