1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/threading/sequenced_worker_pool.h"
6
7 #include <stdint.h>
8
9 #include <list>
10 #include <map>
11 #include <set>
12 #include <utility>
13 #include <vector>
14
15 #include "base/atomic_sequence_num.h"
16 #include "base/callback.h"
17 #include "base/compiler_specific.h"
18 #include "base/critical_closure.h"
19 #include "base/lazy_instance.h"
20 #include "base/logging.h"
21 #include "base/macros.h"
22 #include "base/memory/linked_ptr.h"
23 #include "base/stl_util.h"
24 #include "base/strings/stringprintf.h"
25 #include "base/synchronization/condition_variable.h"
26 #include "base/synchronization/lock.h"
27 #include "base/thread_task_runner_handle.h"
28 #include "base/threading/platform_thread.h"
29 #include "base/threading/simple_thread.h"
30 #include "base/threading/thread_local.h"
31 #include "base/threading/thread_restrictions.h"
32 #include "base/time/time.h"
33 #include "base/trace_event/trace_event.h"
34 #include "base/tracked_objects.h"
35 #include "build/build_config.h"
36
37 #if defined(OS_MACOSX)
38 #include "base/mac/scoped_nsautorelease_pool.h"
39 #elif defined(OS_WIN)
40 #include "base/win/scoped_com_initializer.h"
41 #endif
42
43 #if !defined(OS_NACL)
44 #include "base/metrics/histogram.h"
45 #endif
46
47 namespace base {
48
49 namespace {
50
51 struct SequencedTask : public TrackingInfo {
SequencedTaskbase::__anon3cfb63c70111::SequencedTask52 SequencedTask()
53 : sequence_token_id(0),
54 trace_id(0),
55 sequence_task_number(0),
56 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
57
SequencedTaskbase::__anon3cfb63c70111::SequencedTask58 explicit SequencedTask(const tracked_objects::Location& from_here)
59 : base::TrackingInfo(from_here, TimeTicks()),
60 sequence_token_id(0),
61 trace_id(0),
62 sequence_task_number(0),
63 shutdown_behavior(SequencedWorkerPool::BLOCK_SHUTDOWN) {}
64
~SequencedTaskbase::__anon3cfb63c70111::SequencedTask65 ~SequencedTask() {}
66
67 int sequence_token_id;
68 int trace_id;
69 int64_t sequence_task_number;
70 SequencedWorkerPool::WorkerShutdown shutdown_behavior;
71 tracked_objects::Location posted_from;
72 Closure task;
73
74 // Non-delayed tasks and delayed tasks are managed together by time-to-run
75 // order. We calculate the time by adding the posted time and the given delay.
76 TimeTicks time_to_run;
77 };
78
79 struct SequencedTaskLessThan {
80 public:
operator ()base::__anon3cfb63c70111::SequencedTaskLessThan81 bool operator()(const SequencedTask& lhs, const SequencedTask& rhs) const {
82 if (lhs.time_to_run < rhs.time_to_run)
83 return true;
84
85 if (lhs.time_to_run > rhs.time_to_run)
86 return false;
87
88 // If the time happen to match, then we use the sequence number to decide.
89 return lhs.sequence_task_number < rhs.sequence_task_number;
90 }
91 };
92
93 // SequencedWorkerPoolTaskRunner ---------------------------------------------
94 // A TaskRunner which posts tasks to a SequencedWorkerPool with a
95 // fixed ShutdownBehavior.
96 //
97 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
98 class SequencedWorkerPoolTaskRunner : public TaskRunner {
99 public:
100 SequencedWorkerPoolTaskRunner(
101 const scoped_refptr<SequencedWorkerPool>& pool,
102 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
103
104 // TaskRunner implementation
105 bool PostDelayedTask(const tracked_objects::Location& from_here,
106 const Closure& task,
107 TimeDelta delay) override;
108 bool RunsTasksOnCurrentThread() const override;
109
110 private:
111 ~SequencedWorkerPoolTaskRunner() override;
112
113 const scoped_refptr<SequencedWorkerPool> pool_;
114
115 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
116
117 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolTaskRunner);
118 };
119
SequencedWorkerPoolTaskRunner(const scoped_refptr<SequencedWorkerPool> & pool,SequencedWorkerPool::WorkerShutdown shutdown_behavior)120 SequencedWorkerPoolTaskRunner::SequencedWorkerPoolTaskRunner(
121 const scoped_refptr<SequencedWorkerPool>& pool,
122 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
123 : pool_(pool),
124 shutdown_behavior_(shutdown_behavior) {
125 }
126
~SequencedWorkerPoolTaskRunner()127 SequencedWorkerPoolTaskRunner::~SequencedWorkerPoolTaskRunner() {
128 }
129
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)130 bool SequencedWorkerPoolTaskRunner::PostDelayedTask(
131 const tracked_objects::Location& from_here,
132 const Closure& task,
133 TimeDelta delay) {
134 if (delay == TimeDelta()) {
135 return pool_->PostWorkerTaskWithShutdownBehavior(
136 from_here, task, shutdown_behavior_);
137 }
138 return pool_->PostDelayedWorkerTask(from_here, task, delay);
139 }
140
RunsTasksOnCurrentThread() const141 bool SequencedWorkerPoolTaskRunner::RunsTasksOnCurrentThread() const {
142 return pool_->RunsTasksOnCurrentThread();
143 }
144
145 // SequencedWorkerPoolSequencedTaskRunner ------------------------------------
146 // A SequencedTaskRunner which posts tasks to a SequencedWorkerPool with a
147 // fixed sequence token.
148 //
149 // Note that this class is RefCountedThreadSafe (inherited from TaskRunner).
150 class SequencedWorkerPoolSequencedTaskRunner : public SequencedTaskRunner {
151 public:
152 SequencedWorkerPoolSequencedTaskRunner(
153 const scoped_refptr<SequencedWorkerPool>& pool,
154 SequencedWorkerPool::SequenceToken token,
155 SequencedWorkerPool::WorkerShutdown shutdown_behavior);
156
157 // TaskRunner implementation
158 bool PostDelayedTask(const tracked_objects::Location& from_here,
159 const Closure& task,
160 TimeDelta delay) override;
161 bool RunsTasksOnCurrentThread() const override;
162
163 // SequencedTaskRunner implementation
164 bool PostNonNestableDelayedTask(const tracked_objects::Location& from_here,
165 const Closure& task,
166 TimeDelta delay) override;
167
168 private:
169 ~SequencedWorkerPoolSequencedTaskRunner() override;
170
171 const scoped_refptr<SequencedWorkerPool> pool_;
172
173 const SequencedWorkerPool::SequenceToken token_;
174
175 const SequencedWorkerPool::WorkerShutdown shutdown_behavior_;
176
177 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPoolSequencedTaskRunner);
178 };
179
SequencedWorkerPoolSequencedTaskRunner(const scoped_refptr<SequencedWorkerPool> & pool,SequencedWorkerPool::SequenceToken token,SequencedWorkerPool::WorkerShutdown shutdown_behavior)180 SequencedWorkerPoolSequencedTaskRunner::SequencedWorkerPoolSequencedTaskRunner(
181 const scoped_refptr<SequencedWorkerPool>& pool,
182 SequencedWorkerPool::SequenceToken token,
183 SequencedWorkerPool::WorkerShutdown shutdown_behavior)
184 : pool_(pool),
185 token_(token),
186 shutdown_behavior_(shutdown_behavior) {
187 }
188
189 SequencedWorkerPoolSequencedTaskRunner::
~SequencedWorkerPoolSequencedTaskRunner()190 ~SequencedWorkerPoolSequencedTaskRunner() {
191 }
192
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)193 bool SequencedWorkerPoolSequencedTaskRunner::PostDelayedTask(
194 const tracked_objects::Location& from_here,
195 const Closure& task,
196 TimeDelta delay) {
197 if (delay == TimeDelta()) {
198 return pool_->PostSequencedWorkerTaskWithShutdownBehavior(
199 token_, from_here, task, shutdown_behavior_);
200 }
201 return pool_->PostDelayedSequencedWorkerTask(token_, from_here, task, delay);
202 }
203
RunsTasksOnCurrentThread() const204 bool SequencedWorkerPoolSequencedTaskRunner::RunsTasksOnCurrentThread() const {
205 return pool_->IsRunningSequenceOnCurrentThread(token_);
206 }
207
PostNonNestableDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)208 bool SequencedWorkerPoolSequencedTaskRunner::PostNonNestableDelayedTask(
209 const tracked_objects::Location& from_here,
210 const Closure& task,
211 TimeDelta delay) {
212 // There's no way to run nested tasks, so simply forward to
213 // PostDelayedTask.
214 return PostDelayedTask(from_here, task, delay);
215 }
216
217 // Create a process-wide unique ID to represent this task in trace events. This
218 // will be mangled with a Process ID hash to reduce the likelyhood of colliding
219 // with MessageLoop pointers on other processes.
GetTaskTraceID(const SequencedTask & task,void * pool)220 uint64_t GetTaskTraceID(const SequencedTask& task, void* pool) {
221 return (static_cast<uint64_t>(task.trace_id) << 32) |
222 static_cast<uint64_t>(reinterpret_cast<intptr_t>(pool));
223 }
224
225 } // namespace
226
227 // Worker ---------------------------------------------------------------------
228
229 class SequencedWorkerPool::Worker : public SimpleThread {
230 public:
231 // Hold a (cyclic) ref to |worker_pool|, since we want to keep it
232 // around as long as we are running.
233 Worker(const scoped_refptr<SequencedWorkerPool>& worker_pool,
234 int thread_number,
235 const std::string& thread_name_prefix);
236 ~Worker() override;
237
238 // SimpleThread implementation. This actually runs the background thread.
239 void Run() override;
240
241 // Gets the worker for the current thread out of thread-local storage.
242 static Worker* GetForCurrentThread();
243
244 // Indicates that a task is about to be run. The parameters provide
245 // additional metainformation about the task being run.
set_running_task_info(SequenceToken token,WorkerShutdown shutdown_behavior)246 void set_running_task_info(SequenceToken token,
247 WorkerShutdown shutdown_behavior) {
248 is_processing_task_ = true;
249 task_sequence_token_ = token;
250 task_shutdown_behavior_ = shutdown_behavior;
251 }
252
253 // Indicates that the task has finished running.
reset_running_task_info()254 void reset_running_task_info() { is_processing_task_ = false; }
255
256 // Whether the worker is processing a task.
is_processing_task()257 bool is_processing_task() { return is_processing_task_; }
258
task_sequence_token() const259 SequenceToken task_sequence_token() const {
260 DCHECK(is_processing_task_);
261 return task_sequence_token_;
262 }
263
task_shutdown_behavior() const264 WorkerShutdown task_shutdown_behavior() const {
265 DCHECK(is_processing_task_);
266 return task_shutdown_behavior_;
267 }
268
worker_pool() const269 scoped_refptr<SequencedWorkerPool> worker_pool() const {
270 return worker_pool_;
271 }
272
273 private:
274 static LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
275 lazy_tls_ptr_;
276
277 scoped_refptr<SequencedWorkerPool> worker_pool_;
278 // The sequence token of the task being processed. Only valid when
279 // is_processing_task_ is true.
280 SequenceToken task_sequence_token_;
281 // The shutdown behavior of the task being processed. Only valid when
282 // is_processing_task_ is true.
283 WorkerShutdown task_shutdown_behavior_;
284 // Whether the Worker is processing a task.
285 bool is_processing_task_;
286
287 DISALLOW_COPY_AND_ASSIGN(Worker);
288 };
289
290 // Inner ----------------------------------------------------------------------
291
292 class SequencedWorkerPool::Inner {
293 public:
294 // Take a raw pointer to |worker| to avoid cycles (since we're owned
295 // by it).
296 Inner(SequencedWorkerPool* worker_pool, size_t max_threads,
297 const std::string& thread_name_prefix,
298 TestingObserver* observer);
299
300 ~Inner();
301
302 static SequenceToken GetSequenceToken();
303
304 SequenceToken GetNamedSequenceToken(const std::string& name);
305
306 // This function accepts a name and an ID. If the name is null, the
307 // token ID is used. This allows us to implement the optional name lookup
308 // from a single function without having to enter the lock a separate time.
309 bool PostTask(const std::string* optional_token_name,
310 SequenceToken sequence_token,
311 WorkerShutdown shutdown_behavior,
312 const tracked_objects::Location& from_here,
313 const Closure& task,
314 TimeDelta delay);
315
316 bool RunsTasksOnCurrentThread() const;
317
318 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
319
320 bool IsRunningSequence(SequenceToken sequence_token) const;
321
322 void SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,
323 WorkerShutdown shutdown_behavior);
324
325 void CleanupForTesting();
326
327 void SignalHasWorkForTesting();
328
329 int GetWorkSignalCountForTesting() const;
330
331 void Shutdown(int max_blocking_tasks_after_shutdown);
332
333 bool IsShutdownInProgress();
334
335 // Runs the worker loop on the background thread.
336 void ThreadLoop(Worker* this_worker);
337
338 private:
339 enum GetWorkStatus {
340 GET_WORK_FOUND,
341 GET_WORK_NOT_FOUND,
342 GET_WORK_WAIT,
343 };
344
345 enum CleanupState {
346 CLEANUP_REQUESTED,
347 CLEANUP_STARTING,
348 CLEANUP_RUNNING,
349 CLEANUP_FINISHING,
350 CLEANUP_DONE,
351 };
352
353 // Called from within the lock, this converts the given token name into a
354 // token ID, creating a new one if necessary.
355 int LockedGetNamedTokenID(const std::string& name);
356
357 // Called from within the lock, this returns the next sequence task number.
358 int64_t LockedGetNextSequenceTaskNumber();
359
360 // Gets new task. There are 3 cases depending on the return value:
361 //
362 // 1) If the return value is |GET_WORK_FOUND|, |task| is filled in and should
363 // be run immediately.
364 // 2) If the return value is |GET_WORK_NOT_FOUND|, there are no tasks to run,
365 // and |task| is not filled in. In this case, the caller should wait until
366 // a task is posted.
367 // 3) If the return value is |GET_WORK_WAIT|, there are no tasks to run
368 // immediately, and |task| is not filled in. Likewise, |wait_time| is
369 // filled in the time to wait until the next task to run. In this case, the
370 // caller should wait the time.
371 //
372 // In any case, the calling code should clear the given
373 // delete_these_outside_lock vector the next time the lock is released.
374 // See the implementation for a more detailed description.
375 GetWorkStatus GetWork(SequencedTask* task,
376 TimeDelta* wait_time,
377 std::vector<Closure>* delete_these_outside_lock);
378
379 void HandleCleanup();
380
381 // Peforms init and cleanup around running the given task. WillRun...
382 // returns the value from PrepareToStartAdditionalThreadIfNecessary.
383 // The calling code should call FinishStartingAdditionalThread once the
384 // lock is released if the return values is nonzero.
385 int WillRunWorkerTask(const SequencedTask& task);
386 void DidRunWorkerTask(const SequencedTask& task);
387
388 // Returns true if there are no threads currently running the given
389 // sequence token.
390 bool IsSequenceTokenRunnable(int sequence_token_id) const;
391
392 // Checks if all threads are busy and the addition of one more could run an
393 // additional task waiting in the queue. This must be called from within
394 // the lock.
395 //
396 // If another thread is helpful, this will mark the thread as being in the
397 // process of starting and returns the index of the new thread which will be
398 // 0 or more. The caller should then call FinishStartingAdditionalThread to
399 // complete initialization once the lock is released.
400 //
401 // If another thread is not necessary, returne 0;
402 //
403 // See the implementedion for more.
404 int PrepareToStartAdditionalThreadIfHelpful();
405
406 // The second part of thread creation after
407 // PrepareToStartAdditionalThreadIfHelpful with the thread number it
408 // generated. This actually creates the thread and should be called outside
409 // the lock to avoid blocking important work starting a thread in the lock.
410 void FinishStartingAdditionalThread(int thread_number);
411
412 // Signal |has_work_| and increment |has_work_signal_count_|.
413 void SignalHasWork();
414
415 // Checks whether there is work left that's blocking shutdown. Must be
416 // called inside the lock.
417 bool CanShutdown() const;
418
419 SequencedWorkerPool* const worker_pool_;
420
421 // The last sequence number used. Managed by GetSequenceToken, since this
422 // only does threadsafe increment operations, you do not need to hold the
423 // lock. This is class-static to make SequenceTokens issued by
424 // GetSequenceToken unique across SequencedWorkerPool instances.
425 static base::StaticAtomicSequenceNumber g_last_sequence_number_;
426
427 // This lock protects |everything in this class|. Do not read or modify
428 // anything without holding this lock. Do not block while holding this
429 // lock.
430 mutable Lock lock_;
431
432 // Condition variable that is waited on by worker threads until new
433 // tasks are posted or shutdown starts.
434 ConditionVariable has_work_cv_;
435
436 // Condition variable that is waited on by non-worker threads (in
437 // Shutdown()) until CanShutdown() goes to true.
438 ConditionVariable can_shutdown_cv_;
439
440 // The maximum number of worker threads we'll create.
441 const size_t max_threads_;
442
443 const std::string thread_name_prefix_;
444
445 // Associates all known sequence token names with their IDs.
446 std::map<std::string, int> named_sequence_tokens_;
447
448 // Owning pointers to all threads we've created so far, indexed by
449 // ID. Since we lazily create threads, this may be less than
450 // max_threads_ and will be initially empty.
451 typedef std::map<PlatformThreadId, linked_ptr<Worker> > ThreadMap;
452 ThreadMap threads_;
453
454 // Set to true when we're in the process of creating another thread.
455 // See PrepareToStartAdditionalThreadIfHelpful for more.
456 bool thread_being_created_;
457
458 // Number of threads currently waiting for work.
459 size_t waiting_thread_count_;
460
461 // Number of threads currently running tasks that have the BLOCK_SHUTDOWN
462 // or SKIP_ON_SHUTDOWN flag set.
463 size_t blocking_shutdown_thread_count_;
464
465 // A set of all pending tasks in time-to-run order. These are tasks that are
466 // either waiting for a thread to run on, waiting for their time to run,
467 // or blocked on a previous task in their sequence. We have to iterate over
468 // the tasks by time-to-run order, so we use the set instead of the
469 // traditional priority_queue.
470 typedef std::set<SequencedTask, SequencedTaskLessThan> PendingTaskSet;
471 PendingTaskSet pending_tasks_;
472
473 // The next sequence number for a new sequenced task.
474 int64_t next_sequence_task_number_;
475
476 // Number of tasks in the pending_tasks_ list that are marked as blocking
477 // shutdown.
478 size_t blocking_shutdown_pending_task_count_;
479
480 // Lists all sequence tokens currently executing.
481 std::set<int> current_sequences_;
482
483 // An ID for each posted task to distinguish the task from others in traces.
484 int trace_id_;
485
486 // Set when Shutdown is called and no further tasks should be
487 // allowed, though we may still be running existing tasks.
488 bool shutdown_called_;
489
490 // The number of new BLOCK_SHUTDOWN tasks that may be posted after Shudown()
491 // has been called.
492 int max_blocking_tasks_after_shutdown_;
493
494 // State used to cleanup for testing, all guarded by lock_.
495 CleanupState cleanup_state_;
496 size_t cleanup_idlers_;
497 ConditionVariable cleanup_cv_;
498
499 TestingObserver* const testing_observer_;
500
501 DISALLOW_COPY_AND_ASSIGN(Inner);
502 };
503
504 // Worker definitions ---------------------------------------------------------
505
Worker(const scoped_refptr<SequencedWorkerPool> & worker_pool,int thread_number,const std::string & prefix)506 SequencedWorkerPool::Worker::Worker(
507 const scoped_refptr<SequencedWorkerPool>& worker_pool,
508 int thread_number,
509 const std::string& prefix)
510 : SimpleThread(prefix + StringPrintf("Worker%d", thread_number)),
511 worker_pool_(worker_pool),
512 task_shutdown_behavior_(BLOCK_SHUTDOWN),
513 is_processing_task_(false) {
514 Start();
515 }
516
~Worker()517 SequencedWorkerPool::Worker::~Worker() {
518 }
519
Run()520 void SequencedWorkerPool::Worker::Run() {
521 #if defined(OS_WIN)
522 win::ScopedCOMInitializer com_initializer;
523 #endif
524
525 // Store a pointer to this worker in thread local storage for static function
526 // access.
527 DCHECK(!lazy_tls_ptr_.Get().Get());
528 lazy_tls_ptr_.Get().Set(this);
529
530 // Just jump back to the Inner object to run the thread, since it has all the
531 // tracking information and queues. It might be more natural to implement
532 // using DelegateSimpleThread and have Inner implement the Delegate to avoid
533 // having these worker objects at all, but that method lacks the ability to
534 // send thread-specific information easily to the thread loop.
535 worker_pool_->inner_->ThreadLoop(this);
536 // Release our cyclic reference once we're done.
537 worker_pool_ = nullptr;
538 }
539
540 // static
541 SequencedWorkerPool::Worker*
GetForCurrentThread()542 SequencedWorkerPool::Worker::GetForCurrentThread() {
543 // Don't construct lazy instance on check.
544 if (lazy_tls_ptr_ == nullptr)
545 return nullptr;
546
547 return lazy_tls_ptr_.Get().Get();
548 }
549
550 // static
551 LazyInstance<ThreadLocalPointer<SequencedWorkerPool::Worker>>::Leaky
552 SequencedWorkerPool::Worker::lazy_tls_ptr_ = LAZY_INSTANCE_INITIALIZER;
553
554 // Inner definitions ---------------------------------------------------------
555
Inner(SequencedWorkerPool * worker_pool,size_t max_threads,const std::string & thread_name_prefix,TestingObserver * observer)556 SequencedWorkerPool::Inner::Inner(
557 SequencedWorkerPool* worker_pool,
558 size_t max_threads,
559 const std::string& thread_name_prefix,
560 TestingObserver* observer)
561 : worker_pool_(worker_pool),
562 lock_(),
563 has_work_cv_(&lock_),
564 can_shutdown_cv_(&lock_),
565 max_threads_(max_threads),
566 thread_name_prefix_(thread_name_prefix),
567 thread_being_created_(false),
568 waiting_thread_count_(0),
569 blocking_shutdown_thread_count_(0),
570 next_sequence_task_number_(0),
571 blocking_shutdown_pending_task_count_(0),
572 trace_id_(0),
573 shutdown_called_(false),
574 max_blocking_tasks_after_shutdown_(0),
575 cleanup_state_(CLEANUP_DONE),
576 cleanup_idlers_(0),
577 cleanup_cv_(&lock_),
578 testing_observer_(observer) {}
579
~Inner()580 SequencedWorkerPool::Inner::~Inner() {
581 // You must call Shutdown() before destroying the pool.
582 DCHECK(shutdown_called_);
583
584 // Need to explicitly join with the threads before they're destroyed or else
585 // they will be running when our object is half torn down.
586 for (ThreadMap::iterator it = threads_.begin(); it != threads_.end(); ++it)
587 it->second->Join();
588 threads_.clear();
589
590 if (testing_observer_)
591 testing_observer_->OnDestruct();
592 }
593
594 // static
595 SequencedWorkerPool::SequenceToken
GetSequenceToken()596 SequencedWorkerPool::Inner::GetSequenceToken() {
597 // Need to add one because StaticAtomicSequenceNumber starts at zero, which
598 // is used as a sentinel value in SequenceTokens.
599 return SequenceToken(g_last_sequence_number_.GetNext() + 1);
600 }
601
602 SequencedWorkerPool::SequenceToken
GetNamedSequenceToken(const std::string & name)603 SequencedWorkerPool::Inner::GetNamedSequenceToken(const std::string& name) {
604 AutoLock lock(lock_);
605 return SequenceToken(LockedGetNamedTokenID(name));
606 }
607
PostTask(const std::string * optional_token_name,SequenceToken sequence_token,WorkerShutdown shutdown_behavior,const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)608 bool SequencedWorkerPool::Inner::PostTask(
609 const std::string* optional_token_name,
610 SequenceToken sequence_token,
611 WorkerShutdown shutdown_behavior,
612 const tracked_objects::Location& from_here,
613 const Closure& task,
614 TimeDelta delay) {
615 DCHECK(delay == TimeDelta() || shutdown_behavior == SKIP_ON_SHUTDOWN);
616 SequencedTask sequenced(from_here);
617 sequenced.sequence_token_id = sequence_token.id_;
618 sequenced.shutdown_behavior = shutdown_behavior;
619 sequenced.posted_from = from_here;
620 sequenced.task =
621 shutdown_behavior == BLOCK_SHUTDOWN ?
622 base::MakeCriticalClosure(task) : task;
623 sequenced.time_to_run = TimeTicks::Now() + delay;
624
625 int create_thread_id = 0;
626 {
627 AutoLock lock(lock_);
628 if (shutdown_called_) {
629 // Don't allow a new task to be posted if it doesn't block shutdown.
630 if (shutdown_behavior != BLOCK_SHUTDOWN)
631 return false;
632
633 // If the current thread is running a task, and that task doesn't block
634 // shutdown, then it shouldn't be allowed to post any more tasks.
635 ThreadMap::const_iterator found =
636 threads_.find(PlatformThread::CurrentId());
637 if (found != threads_.end() && found->second->is_processing_task() &&
638 found->second->task_shutdown_behavior() != BLOCK_SHUTDOWN) {
639 return false;
640 }
641
642 if (max_blocking_tasks_after_shutdown_ <= 0) {
643 DLOG(WARNING) << "BLOCK_SHUTDOWN task disallowed";
644 return false;
645 }
646 max_blocking_tasks_after_shutdown_ -= 1;
647 }
648
649 // The trace_id is used for identifying the task in about:tracing.
650 sequenced.trace_id = trace_id_++;
651
652 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
653 "SequencedWorkerPool::Inner::PostTask",
654 TRACE_ID_MANGLE(GetTaskTraceID(sequenced, static_cast<void*>(this))),
655 TRACE_EVENT_FLAG_FLOW_OUT);
656
657 sequenced.sequence_task_number = LockedGetNextSequenceTaskNumber();
658
659 // Now that we have the lock, apply the named token rules.
660 if (optional_token_name)
661 sequenced.sequence_token_id = LockedGetNamedTokenID(*optional_token_name);
662
663 pending_tasks_.insert(sequenced);
664 if (shutdown_behavior == BLOCK_SHUTDOWN)
665 blocking_shutdown_pending_task_count_++;
666
667 create_thread_id = PrepareToStartAdditionalThreadIfHelpful();
668 }
669
670 // Actually start the additional thread or signal an existing one now that
671 // we're outside the lock.
672 if (create_thread_id)
673 FinishStartingAdditionalThread(create_thread_id);
674 else
675 SignalHasWork();
676
677 return true;
678 }
679
RunsTasksOnCurrentThread() const680 bool SequencedWorkerPool::Inner::RunsTasksOnCurrentThread() const {
681 AutoLock lock(lock_);
682 return ContainsKey(threads_, PlatformThread::CurrentId());
683 }
684
IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const685 bool SequencedWorkerPool::Inner::IsRunningSequenceOnCurrentThread(
686 SequenceToken sequence_token) const {
687 AutoLock lock(lock_);
688 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
689 if (found == threads_.end())
690 return false;
691 return found->second->is_processing_task() &&
692 sequence_token.Equals(found->second->task_sequence_token());
693 }
694
IsRunningSequence(SequenceToken sequence_token) const695 bool SequencedWorkerPool::Inner::IsRunningSequence(
696 SequenceToken sequence_token) const {
697 DCHECK(sequence_token.IsValid());
698 AutoLock lock(lock_);
699 return !IsSequenceTokenRunnable(sequence_token.id_);
700 }
701
SetRunningTaskInfoForCurrentThread(SequenceToken sequence_token,WorkerShutdown shutdown_behavior)702 void SequencedWorkerPool::Inner::SetRunningTaskInfoForCurrentThread(
703 SequenceToken sequence_token,
704 WorkerShutdown shutdown_behavior) {
705 AutoLock lock(lock_);
706 ThreadMap::const_iterator found = threads_.find(PlatformThread::CurrentId());
707 DCHECK(found != threads_.end());
708 DCHECK(found->second->is_processing_task());
709 DCHECK(!found->second->task_sequence_token().IsValid());
710 found->second->set_running_task_info(sequence_token, shutdown_behavior);
711
712 // Mark the sequence token as in use.
713 bool success = current_sequences_.insert(sequence_token.id_).second;
714 DCHECK(success);
715 }
716
717 // See https://code.google.com/p/chromium/issues/detail?id=168415
CleanupForTesting()718 void SequencedWorkerPool::Inner::CleanupForTesting() {
719 DCHECK(!RunsTasksOnCurrentThread());
720 base::ThreadRestrictions::ScopedAllowWait allow_wait;
721 AutoLock lock(lock_);
722 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
723 if (shutdown_called_)
724 return;
725 if (pending_tasks_.empty() && waiting_thread_count_ == threads_.size())
726 return;
727 cleanup_state_ = CLEANUP_REQUESTED;
728 cleanup_idlers_ = 0;
729 has_work_cv_.Signal();
730 while (cleanup_state_ != CLEANUP_DONE)
731 cleanup_cv_.Wait();
732 }
733
SignalHasWorkForTesting()734 void SequencedWorkerPool::Inner::SignalHasWorkForTesting() {
735 SignalHasWork();
736 }
737
Shutdown(int max_new_blocking_tasks_after_shutdown)738 void SequencedWorkerPool::Inner::Shutdown(
739 int max_new_blocking_tasks_after_shutdown) {
740 DCHECK_GE(max_new_blocking_tasks_after_shutdown, 0);
741 {
742 AutoLock lock(lock_);
743 // Cleanup and Shutdown should not be called concurrently.
744 CHECK_EQ(CLEANUP_DONE, cleanup_state_);
745 if (shutdown_called_)
746 return;
747 shutdown_called_ = true;
748 max_blocking_tasks_after_shutdown_ = max_new_blocking_tasks_after_shutdown;
749
750 // Tickle the threads. This will wake up a waiting one so it will know that
751 // it can exit, which in turn will wake up any other waiting ones.
752 SignalHasWork();
753
754 // There are no pending or running tasks blocking shutdown, we're done.
755 if (CanShutdown())
756 return;
757 }
758
759 // If we're here, then something is blocking shutdown. So wait for
760 // CanShutdown() to go to true.
761
762 if (testing_observer_)
763 testing_observer_->WillWaitForShutdown();
764
765 #if !defined(OS_NACL)
766 TimeTicks shutdown_wait_begin = TimeTicks::Now();
767 #endif
768
769 {
770 base::ThreadRestrictions::ScopedAllowWait allow_wait;
771 AutoLock lock(lock_);
772 while (!CanShutdown())
773 can_shutdown_cv_.Wait();
774 }
775 #if !defined(OS_NACL)
776 UMA_HISTOGRAM_TIMES("SequencedWorkerPool.ShutdownDelayTime",
777 TimeTicks::Now() - shutdown_wait_begin);
778 #endif
779 }
780
IsShutdownInProgress()781 bool SequencedWorkerPool::Inner::IsShutdownInProgress() {
782 AutoLock lock(lock_);
783 return shutdown_called_;
784 }
785
ThreadLoop(Worker * this_worker)786 void SequencedWorkerPool::Inner::ThreadLoop(Worker* this_worker) {
787 {
788 AutoLock lock(lock_);
789 DCHECK(thread_being_created_);
790 thread_being_created_ = false;
791 std::pair<ThreadMap::iterator, bool> result =
792 threads_.insert(
793 std::make_pair(this_worker->tid(), make_linked_ptr(this_worker)));
794 DCHECK(result.second);
795
796 while (true) {
797 #if defined(OS_MACOSX)
798 base::mac::ScopedNSAutoreleasePool autorelease_pool;
799 #endif
800
801 HandleCleanup();
802
803 // See GetWork for what delete_these_outside_lock is doing.
804 SequencedTask task;
805 TimeDelta wait_time;
806 std::vector<Closure> delete_these_outside_lock;
807 GetWorkStatus status =
808 GetWork(&task, &wait_time, &delete_these_outside_lock);
809 if (status == GET_WORK_FOUND) {
810 TRACE_EVENT_WITH_FLOW2(TRACE_DISABLED_BY_DEFAULT("toplevel.flow"),
811 "SequencedWorkerPool::Inner::ThreadLoop",
812 TRACE_ID_MANGLE(GetTaskTraceID(task, static_cast<void*>(this))),
813 TRACE_EVENT_FLAG_FLOW_IN,
814 "src_file", task.posted_from.file_name(),
815 "src_func", task.posted_from.function_name());
816 int new_thread_id = WillRunWorkerTask(task);
817 {
818 AutoUnlock unlock(lock_);
819 // There may be more work available, so wake up another
820 // worker thread. (Technically not required, since we
821 // already get a signal for each new task, but it doesn't
822 // hurt.)
823 SignalHasWork();
824 delete_these_outside_lock.clear();
825
826 // Complete thread creation outside the lock if necessary.
827 if (new_thread_id)
828 FinishStartingAdditionalThread(new_thread_id);
829
830 this_worker->set_running_task_info(
831 SequenceToken(task.sequence_token_id), task.shutdown_behavior);
832
833 tracked_objects::TaskStopwatch stopwatch;
834 stopwatch.Start();
835 task.task.Run();
836 stopwatch.Stop();
837
838 tracked_objects::ThreadData::TallyRunOnNamedThreadIfTracking(
839 task, stopwatch);
840
841 // Update the sequence token in case it has been set from within the
842 // task, so it can be removed from the set of currently running
843 // sequences in DidRunWorkerTask() below.
844 task.sequence_token_id = this_worker->task_sequence_token().id_;
845
846 // Make sure our task is erased outside the lock for the
847 // same reason we do this with delete_these_oustide_lock.
848 // Also, do it before calling reset_running_task_info() so
849 // that sequence-checking from within the task's destructor
850 // still works.
851 task.task = Closure();
852
853 this_worker->reset_running_task_info();
854 }
855 DidRunWorkerTask(task); // Must be done inside the lock.
856 } else if (cleanup_state_ == CLEANUP_RUNNING) {
857 switch (status) {
858 case GET_WORK_WAIT: {
859 AutoUnlock unlock(lock_);
860 delete_these_outside_lock.clear();
861 }
862 break;
863 case GET_WORK_NOT_FOUND:
864 CHECK(delete_these_outside_lock.empty());
865 cleanup_state_ = CLEANUP_FINISHING;
866 cleanup_cv_.Broadcast();
867 break;
868 default:
869 NOTREACHED();
870 }
871 } else {
872 // When we're terminating and there's no more work, we can
873 // shut down, other workers can complete any pending or new tasks.
874 // We can get additional tasks posted after shutdown_called_ is set
875 // but only worker threads are allowed to post tasks at that time, and
876 // the workers responsible for posting those tasks will be available
877 // to run them. Also, there may be some tasks stuck behind running
878 // ones with the same sequence token, but additional threads won't
879 // help this case.
880 if (shutdown_called_ && blocking_shutdown_pending_task_count_ == 0) {
881 AutoUnlock unlock(lock_);
882 delete_these_outside_lock.clear();
883 break;
884 }
885
886 // No work was found, but there are tasks that need deletion. The
887 // deletion must happen outside of the lock.
888 if (delete_these_outside_lock.size()) {
889 AutoUnlock unlock(lock_);
890 delete_these_outside_lock.clear();
891
892 // Since the lock has been released, |status| may no longer be
893 // accurate. It might read GET_WORK_WAIT even if there are tasks
894 // ready to perform work. Jump to the top of the loop to recalculate
895 // |status|.
896 continue;
897 }
898
899 waiting_thread_count_++;
900
901 switch (status) {
902 case GET_WORK_NOT_FOUND:
903 has_work_cv_.Wait();
904 break;
905 case GET_WORK_WAIT:
906 has_work_cv_.TimedWait(wait_time);
907 break;
908 default:
909 NOTREACHED();
910 }
911 waiting_thread_count_--;
912 }
913 }
914 } // Release lock_.
915
916 // We noticed we should exit. Wake up the next worker so it knows it should
917 // exit as well (because the Shutdown() code only signals once).
918 SignalHasWork();
919
920 // Possibly unblock shutdown.
921 can_shutdown_cv_.Signal();
922 }
923
HandleCleanup()924 void SequencedWorkerPool::Inner::HandleCleanup() {
925 lock_.AssertAcquired();
926 if (cleanup_state_ == CLEANUP_DONE)
927 return;
928 if (cleanup_state_ == CLEANUP_REQUESTED) {
929 // We win, we get to do the cleanup as soon as the others wise up and idle.
930 cleanup_state_ = CLEANUP_STARTING;
931 while (thread_being_created_ ||
932 cleanup_idlers_ != threads_.size() - 1) {
933 has_work_cv_.Signal();
934 cleanup_cv_.Wait();
935 }
936 cleanup_state_ = CLEANUP_RUNNING;
937 return;
938 }
939 if (cleanup_state_ == CLEANUP_STARTING) {
940 // Another worker thread is cleaning up, we idle here until thats done.
941 ++cleanup_idlers_;
942 cleanup_cv_.Broadcast();
943 while (cleanup_state_ != CLEANUP_FINISHING) {
944 cleanup_cv_.Wait();
945 }
946 --cleanup_idlers_;
947 cleanup_cv_.Broadcast();
948 return;
949 }
950 if (cleanup_state_ == CLEANUP_FINISHING) {
951 // We wait for all idlers to wake up prior to being DONE.
952 while (cleanup_idlers_ != 0) {
953 cleanup_cv_.Broadcast();
954 cleanup_cv_.Wait();
955 }
956 if (cleanup_state_ == CLEANUP_FINISHING) {
957 cleanup_state_ = CLEANUP_DONE;
958 cleanup_cv_.Signal();
959 }
960 return;
961 }
962 }
963
LockedGetNamedTokenID(const std::string & name)964 int SequencedWorkerPool::Inner::LockedGetNamedTokenID(
965 const std::string& name) {
966 lock_.AssertAcquired();
967 DCHECK(!name.empty());
968
969 std::map<std::string, int>::const_iterator found =
970 named_sequence_tokens_.find(name);
971 if (found != named_sequence_tokens_.end())
972 return found->second; // Got an existing one.
973
974 // Create a new one for this name.
975 SequenceToken result = GetSequenceToken();
976 named_sequence_tokens_.insert(std::make_pair(name, result.id_));
977 return result.id_;
978 }
979
LockedGetNextSequenceTaskNumber()980 int64_t SequencedWorkerPool::Inner::LockedGetNextSequenceTaskNumber() {
981 lock_.AssertAcquired();
982 // We assume that we never create enough tasks to wrap around.
983 return next_sequence_task_number_++;
984 }
985
GetWork(SequencedTask * task,TimeDelta * wait_time,std::vector<Closure> * delete_these_outside_lock)986 SequencedWorkerPool::Inner::GetWorkStatus SequencedWorkerPool::Inner::GetWork(
987 SequencedTask* task,
988 TimeDelta* wait_time,
989 std::vector<Closure>* delete_these_outside_lock) {
990 lock_.AssertAcquired();
991
992 // Find the next task with a sequence token that's not currently in use.
993 // If the token is in use, that means another thread is running something
994 // in that sequence, and we can't run it without going out-of-order.
995 //
996 // This algorithm is simple and fair, but inefficient in some cases. For
997 // example, say somebody schedules 1000 slow tasks with the same sequence
998 // number. We'll have to go through all those tasks each time we feel like
999 // there might be work to schedule. If this proves to be a problem, we
1000 // should make this more efficient.
1001 //
1002 // One possible enhancement would be to keep a map from sequence ID to a
1003 // list of pending but currently blocked SequencedTasks for that ID.
1004 // When a worker finishes a task of one sequence token, it can pick up the
1005 // next one from that token right away.
1006 //
1007 // This may lead to starvation if there are sufficient numbers of sequences
1008 // in use. To alleviate this, we could add an incrementing priority counter
1009 // to each SequencedTask. Then maintain a priority_queue of all runnable
1010 // tasks, sorted by priority counter. When a sequenced task is completed
1011 // we would pop the head element off of that tasks pending list and add it
1012 // to the priority queue. Then we would run the first item in the priority
1013 // queue.
1014
1015 GetWorkStatus status = GET_WORK_NOT_FOUND;
1016 int unrunnable_tasks = 0;
1017 PendingTaskSet::iterator i = pending_tasks_.begin();
1018 // We assume that the loop below doesn't take too long and so we can just do
1019 // a single call to TimeTicks::Now().
1020 const TimeTicks current_time = TimeTicks::Now();
1021 while (i != pending_tasks_.end()) {
1022 if (!IsSequenceTokenRunnable(i->sequence_token_id)) {
1023 unrunnable_tasks++;
1024 ++i;
1025 continue;
1026 }
1027
1028 if (shutdown_called_ && i->shutdown_behavior != BLOCK_SHUTDOWN) {
1029 // We're shutting down and the task we just found isn't blocking
1030 // shutdown. Delete it and get more work.
1031 //
1032 // Note that we do not want to delete unrunnable tasks. Deleting a task
1033 // can have side effects (like freeing some objects) and deleting a
1034 // task that's supposed to run after one that's currently running could
1035 // cause an obscure crash.
1036 //
1037 // We really want to delete these tasks outside the lock in case the
1038 // closures are holding refs to objects that want to post work from
1039 // their destructorss (which would deadlock). The closures are
1040 // internally refcounted, so we just need to keep a copy of them alive
1041 // until the lock is exited. The calling code can just clear() the
1042 // vector they passed to us once the lock is exited to make this
1043 // happen.
1044 delete_these_outside_lock->push_back(i->task);
1045 pending_tasks_.erase(i++);
1046 continue;
1047 }
1048
1049 if (i->time_to_run > current_time) {
1050 // The time to run has not come yet.
1051 *wait_time = i->time_to_run - current_time;
1052 status = GET_WORK_WAIT;
1053 if (cleanup_state_ == CLEANUP_RUNNING) {
1054 // Deferred tasks are deleted when cleaning up, see Inner::ThreadLoop.
1055 delete_these_outside_lock->push_back(i->task);
1056 pending_tasks_.erase(i);
1057 }
1058 break;
1059 }
1060
1061 // Found a runnable task.
1062 *task = *i;
1063 pending_tasks_.erase(i);
1064 if (task->shutdown_behavior == BLOCK_SHUTDOWN) {
1065 blocking_shutdown_pending_task_count_--;
1066 }
1067
1068 status = GET_WORK_FOUND;
1069 break;
1070 }
1071
1072 return status;
1073 }
1074
WillRunWorkerTask(const SequencedTask & task)1075 int SequencedWorkerPool::Inner::WillRunWorkerTask(const SequencedTask& task) {
1076 lock_.AssertAcquired();
1077
1078 // Mark the task's sequence number as in use.
1079 if (task.sequence_token_id)
1080 current_sequences_.insert(task.sequence_token_id);
1081
1082 // Ensure that threads running tasks posted with either SKIP_ON_SHUTDOWN
1083 // or BLOCK_SHUTDOWN will prevent shutdown until that task or thread
1084 // completes.
1085 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN)
1086 blocking_shutdown_thread_count_++;
1087
1088 // We just picked up a task. Since StartAdditionalThreadIfHelpful only
1089 // creates a new thread if there is no free one, there is a race when posting
1090 // tasks that many tasks could have been posted before a thread started
1091 // running them, so only one thread would have been created. So we also check
1092 // whether we should create more threads after removing our task from the
1093 // queue, which also has the nice side effect of creating the workers from
1094 // background threads rather than the main thread of the app.
1095 //
1096 // If another thread wasn't created, we want to wake up an existing thread
1097 // if there is one waiting to pick up the next task.
1098 //
1099 // Note that we really need to do this *before* running the task, not
1100 // after. Otherwise, if more than one task is posted, the creation of the
1101 // second thread (since we only create one at a time) will be blocked by
1102 // the execution of the first task, which could be arbitrarily long.
1103 return PrepareToStartAdditionalThreadIfHelpful();
1104 }
1105
DidRunWorkerTask(const SequencedTask & task)1106 void SequencedWorkerPool::Inner::DidRunWorkerTask(const SequencedTask& task) {
1107 lock_.AssertAcquired();
1108
1109 if (task.shutdown_behavior != CONTINUE_ON_SHUTDOWN) {
1110 DCHECK_GT(blocking_shutdown_thread_count_, 0u);
1111 blocking_shutdown_thread_count_--;
1112 }
1113
1114 if (task.sequence_token_id)
1115 current_sequences_.erase(task.sequence_token_id);
1116 }
1117
IsSequenceTokenRunnable(int sequence_token_id) const1118 bool SequencedWorkerPool::Inner::IsSequenceTokenRunnable(
1119 int sequence_token_id) const {
1120 lock_.AssertAcquired();
1121 return !sequence_token_id ||
1122 current_sequences_.find(sequence_token_id) ==
1123 current_sequences_.end();
1124 }
1125
PrepareToStartAdditionalThreadIfHelpful()1126 int SequencedWorkerPool::Inner::PrepareToStartAdditionalThreadIfHelpful() {
1127 lock_.AssertAcquired();
1128 // How thread creation works:
1129 //
1130 // We'de like to avoid creating threads with the lock held. However, we
1131 // need to be sure that we have an accurate accounting of the threads for
1132 // proper Joining and deltion on shutdown.
1133 //
1134 // We need to figure out if we need another thread with the lock held, which
1135 // is what this function does. It then marks us as in the process of creating
1136 // a thread. When we do shutdown, we wait until the thread_being_created_
1137 // flag is cleared, which ensures that the new thread is properly added to
1138 // all the data structures and we can't leak it. Once shutdown starts, we'll
1139 // refuse to create more threads or they would be leaked.
1140 //
1141 // Note that this creates a mostly benign race condition on shutdown that
1142 // will cause fewer workers to be created than one would expect. It isn't
1143 // much of an issue in real life, but affects some tests. Since we only spawn
1144 // one worker at a time, the following sequence of events can happen:
1145 //
1146 // 1. Main thread posts a bunch of unrelated tasks that would normally be
1147 // run on separate threads.
1148 // 2. The first task post causes us to start a worker. Other tasks do not
1149 // cause a worker to start since one is pending.
1150 // 3. Main thread initiates shutdown.
1151 // 4. No more threads are created since the shutdown_called_ flag is set.
1152 //
1153 // The result is that one may expect that max_threads_ workers to be created
1154 // given the workload, but in reality fewer may be created because the
1155 // sequence of thread creation on the background threads is racing with the
1156 // shutdown call.
1157 if (!shutdown_called_ &&
1158 !thread_being_created_ &&
1159 cleanup_state_ == CLEANUP_DONE &&
1160 threads_.size() < max_threads_ &&
1161 waiting_thread_count_ == 0) {
1162 // We could use an additional thread if there's work to be done.
1163 for (PendingTaskSet::const_iterator i = pending_tasks_.begin();
1164 i != pending_tasks_.end(); ++i) {
1165 if (IsSequenceTokenRunnable(i->sequence_token_id)) {
1166 // Found a runnable task, mark the thread as being started.
1167 thread_being_created_ = true;
1168 return static_cast<int>(threads_.size() + 1);
1169 }
1170 }
1171 }
1172 return 0;
1173 }
1174
FinishStartingAdditionalThread(int thread_number)1175 void SequencedWorkerPool::Inner::FinishStartingAdditionalThread(
1176 int thread_number) {
1177 // Called outside of the lock.
1178 DCHECK_GT(thread_number, 0);
1179
1180 // The worker is assigned to the list when the thread actually starts, which
1181 // will manage the memory of the pointer.
1182 new Worker(worker_pool_, thread_number, thread_name_prefix_);
1183 }
1184
SignalHasWork()1185 void SequencedWorkerPool::Inner::SignalHasWork() {
1186 has_work_cv_.Signal();
1187 if (testing_observer_) {
1188 testing_observer_->OnHasWork();
1189 }
1190 }
1191
CanShutdown() const1192 bool SequencedWorkerPool::Inner::CanShutdown() const {
1193 lock_.AssertAcquired();
1194 // See PrepareToStartAdditionalThreadIfHelpful for how thread creation works.
1195 return !thread_being_created_ &&
1196 blocking_shutdown_thread_count_ == 0 &&
1197 blocking_shutdown_pending_task_count_ == 0;
1198 }
1199
1200 base::StaticAtomicSequenceNumber
1201 SequencedWorkerPool::Inner::g_last_sequence_number_;
1202
1203 // SequencedWorkerPool --------------------------------------------------------
1204
ToString() const1205 std::string SequencedWorkerPool::SequenceToken::ToString() const {
1206 return base::StringPrintf("[%d]", id_);
1207 }
1208
1209 // static
1210 SequencedWorkerPool::SequenceToken
GetSequenceTokenForCurrentThread()1211 SequencedWorkerPool::GetSequenceTokenForCurrentThread() {
1212 Worker* worker = Worker::GetForCurrentThread();
1213 if (!worker)
1214 return SequenceToken();
1215
1216 return worker->task_sequence_token();
1217 }
1218
1219 // static
1220 scoped_refptr<SequencedWorkerPool>
GetWorkerPoolForCurrentThread()1221 SequencedWorkerPool::GetWorkerPoolForCurrentThread() {
1222 Worker* worker = Worker::GetForCurrentThread();
1223 if (!worker)
1224 return nullptr;
1225
1226 return worker->worker_pool();
1227 }
1228
1229 // static
1230 scoped_refptr<SequencedTaskRunner>
GetSequencedTaskRunnerForCurrentThread()1231 SequencedWorkerPool::GetSequencedTaskRunnerForCurrentThread() {
1232 Worker* worker = Worker::GetForCurrentThread();
1233
1234 // If there is no worker, this thread is not a worker thread. Otherwise, it is
1235 // currently running a task (sequenced or unsequenced).
1236 if (!worker)
1237 return nullptr;
1238
1239 scoped_refptr<SequencedWorkerPool> pool = worker->worker_pool();
1240 SequenceToken sequence_token = worker->task_sequence_token();
1241 WorkerShutdown shutdown_behavior = worker->task_shutdown_behavior();
1242 if (!sequence_token.IsValid()) {
1243 // Create a new sequence token and bind this thread to it, to make sure that
1244 // a task posted to the SequencedTaskRunner we are going to return is not
1245 // immediately going to run on a different thread.
1246 sequence_token = Inner::GetSequenceToken();
1247 pool->inner_->SetRunningTaskInfoForCurrentThread(sequence_token,
1248 shutdown_behavior);
1249 }
1250
1251 DCHECK(pool->IsRunningSequenceOnCurrentThread(sequence_token));
1252 return new SequencedWorkerPoolSequencedTaskRunner(
1253 std::move(pool), sequence_token, shutdown_behavior);
1254 }
1255
SequencedWorkerPool(size_t max_threads,const std::string & thread_name_prefix)1256 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1257 const std::string& thread_name_prefix)
1258 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1259 inner_(new Inner(this, max_threads, thread_name_prefix, NULL)) {
1260 }
1261
SequencedWorkerPool(size_t max_threads,const std::string & thread_name_prefix,TestingObserver * observer)1262 SequencedWorkerPool::SequencedWorkerPool(size_t max_threads,
1263 const std::string& thread_name_prefix,
1264 TestingObserver* observer)
1265 : constructor_task_runner_(ThreadTaskRunnerHandle::Get()),
1266 inner_(new Inner(this, max_threads, thread_name_prefix, observer)) {
1267 }
1268
~SequencedWorkerPool()1269 SequencedWorkerPool::~SequencedWorkerPool() {}
1270
OnDestruct() const1271 void SequencedWorkerPool::OnDestruct() const {
1272 // Avoid deleting ourselves on a worker thread (which would deadlock).
1273 if (RunsTasksOnCurrentThread()) {
1274 constructor_task_runner_->DeleteSoon(FROM_HERE, this);
1275 } else {
1276 delete this;
1277 }
1278 }
1279
1280 // static
GetSequenceToken()1281 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetSequenceToken() {
1282 return Inner::GetSequenceToken();
1283 }
1284
GetNamedSequenceToken(const std::string & name)1285 SequencedWorkerPool::SequenceToken SequencedWorkerPool::GetNamedSequenceToken(
1286 const std::string& name) {
1287 return inner_->GetNamedSequenceToken(name);
1288 }
1289
GetSequencedTaskRunner(SequenceToken token)1290 scoped_refptr<SequencedTaskRunner> SequencedWorkerPool::GetSequencedTaskRunner(
1291 SequenceToken token) {
1292 return GetSequencedTaskRunnerWithShutdownBehavior(token, BLOCK_SHUTDOWN);
1293 }
1294
1295 scoped_refptr<SequencedTaskRunner>
GetSequencedTaskRunnerWithShutdownBehavior(SequenceToken token,WorkerShutdown shutdown_behavior)1296 SequencedWorkerPool::GetSequencedTaskRunnerWithShutdownBehavior(
1297 SequenceToken token, WorkerShutdown shutdown_behavior) {
1298 return new SequencedWorkerPoolSequencedTaskRunner(
1299 this, token, shutdown_behavior);
1300 }
1301
1302 scoped_refptr<TaskRunner>
GetTaskRunnerWithShutdownBehavior(WorkerShutdown shutdown_behavior)1303 SequencedWorkerPool::GetTaskRunnerWithShutdownBehavior(
1304 WorkerShutdown shutdown_behavior) {
1305 return new SequencedWorkerPoolTaskRunner(this, shutdown_behavior);
1306 }
1307
PostWorkerTask(const tracked_objects::Location & from_here,const Closure & task)1308 bool SequencedWorkerPool::PostWorkerTask(
1309 const tracked_objects::Location& from_here,
1310 const Closure& task) {
1311 return inner_->PostTask(NULL, SequenceToken(), BLOCK_SHUTDOWN,
1312 from_here, task, TimeDelta());
1313 }
1314
PostDelayedWorkerTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1315 bool SequencedWorkerPool::PostDelayedWorkerTask(
1316 const tracked_objects::Location& from_here,
1317 const Closure& task,
1318 TimeDelta delay) {
1319 WorkerShutdown shutdown_behavior =
1320 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1321 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1322 from_here, task, delay);
1323 }
1324
PostWorkerTaskWithShutdownBehavior(const tracked_objects::Location & from_here,const Closure & task,WorkerShutdown shutdown_behavior)1325 bool SequencedWorkerPool::PostWorkerTaskWithShutdownBehavior(
1326 const tracked_objects::Location& from_here,
1327 const Closure& task,
1328 WorkerShutdown shutdown_behavior) {
1329 return inner_->PostTask(NULL, SequenceToken(), shutdown_behavior,
1330 from_here, task, TimeDelta());
1331 }
1332
PostSequencedWorkerTask(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task)1333 bool SequencedWorkerPool::PostSequencedWorkerTask(
1334 SequenceToken sequence_token,
1335 const tracked_objects::Location& from_here,
1336 const Closure& task) {
1337 return inner_->PostTask(NULL, sequence_token, BLOCK_SHUTDOWN,
1338 from_here, task, TimeDelta());
1339 }
1340
PostDelayedSequencedWorkerTask(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1341 bool SequencedWorkerPool::PostDelayedSequencedWorkerTask(
1342 SequenceToken sequence_token,
1343 const tracked_objects::Location& from_here,
1344 const Closure& task,
1345 TimeDelta delay) {
1346 WorkerShutdown shutdown_behavior =
1347 delay == TimeDelta() ? BLOCK_SHUTDOWN : SKIP_ON_SHUTDOWN;
1348 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1349 from_here, task, delay);
1350 }
1351
PostNamedSequencedWorkerTask(const std::string & token_name,const tracked_objects::Location & from_here,const Closure & task)1352 bool SequencedWorkerPool::PostNamedSequencedWorkerTask(
1353 const std::string& token_name,
1354 const tracked_objects::Location& from_here,
1355 const Closure& task) {
1356 DCHECK(!token_name.empty());
1357 return inner_->PostTask(&token_name, SequenceToken(), BLOCK_SHUTDOWN,
1358 from_here, task, TimeDelta());
1359 }
1360
PostSequencedWorkerTaskWithShutdownBehavior(SequenceToken sequence_token,const tracked_objects::Location & from_here,const Closure & task,WorkerShutdown shutdown_behavior)1361 bool SequencedWorkerPool::PostSequencedWorkerTaskWithShutdownBehavior(
1362 SequenceToken sequence_token,
1363 const tracked_objects::Location& from_here,
1364 const Closure& task,
1365 WorkerShutdown shutdown_behavior) {
1366 return inner_->PostTask(NULL, sequence_token, shutdown_behavior,
1367 from_here, task, TimeDelta());
1368 }
1369
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,TimeDelta delay)1370 bool SequencedWorkerPool::PostDelayedTask(
1371 const tracked_objects::Location& from_here,
1372 const Closure& task,
1373 TimeDelta delay) {
1374 return PostDelayedWorkerTask(from_here, task, delay);
1375 }
1376
RunsTasksOnCurrentThread() const1377 bool SequencedWorkerPool::RunsTasksOnCurrentThread() const {
1378 return inner_->RunsTasksOnCurrentThread();
1379 }
1380
IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const1381 bool SequencedWorkerPool::IsRunningSequenceOnCurrentThread(
1382 SequenceToken sequence_token) const {
1383 return inner_->IsRunningSequenceOnCurrentThread(sequence_token);
1384 }
1385
IsRunningSequence(SequenceToken sequence_token) const1386 bool SequencedWorkerPool::IsRunningSequence(
1387 SequenceToken sequence_token) const {
1388 return inner_->IsRunningSequence(sequence_token);
1389 }
1390
FlushForTesting()1391 void SequencedWorkerPool::FlushForTesting() {
1392 inner_->CleanupForTesting();
1393 }
1394
SignalHasWorkForTesting()1395 void SequencedWorkerPool::SignalHasWorkForTesting() {
1396 inner_->SignalHasWorkForTesting();
1397 }
1398
Shutdown(int max_new_blocking_tasks_after_shutdown)1399 void SequencedWorkerPool::Shutdown(int max_new_blocking_tasks_after_shutdown) {
1400 DCHECK(constructor_task_runner_->BelongsToCurrentThread());
1401 inner_->Shutdown(max_new_blocking_tasks_after_shutdown);
1402 }
1403
IsShutdownInProgress()1404 bool SequencedWorkerPool::IsShutdownInProgress() {
1405 return inner_->IsShutdownInProgress();
1406 }
1407
1408 } // namespace base
1409