1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 7 8 #include <stddef.h> 9 10 #include <cstddef> 11 #include <string> 12 13 #include "base/base_export.h" 14 #include "base/callback_forward.h" 15 #include "base/macros.h" 16 #include "base/memory/ref_counted.h" 17 #include "base/memory/scoped_ptr.h" 18 #include "base/single_thread_task_runner.h" 19 #include "base/task_runner.h" 20 21 namespace tracked_objects { 22 class Location; 23 } // namespace tracked_objects 24 25 namespace base { 26 27 class SingleThreadTaskRunner; 28 29 template <class T> class DeleteHelper; 30 31 class SequencedTaskRunner; 32 33 // A worker thread pool that enforces ordering between sets of tasks. It also 34 // allows you to specify what should happen to your tasks on shutdown. 35 // 36 // To enforce ordering, get a unique sequence token from the pool and post all 37 // tasks you want to order with the token. All tasks with the same token are 38 // guaranteed to execute serially, though not necessarily on the same thread. 39 // This means that: 40 // 41 // - No two tasks with the same token will run at the same time. 42 // 43 // - Given two tasks T1 and T2 with the same token such that T2 will 44 // run after T1, then T2 will start after T1 is destroyed. 45 // 46 // - If T2 will run after T1, then all memory changes in T1 and T1's 47 // destruction will be visible to T2. 48 // 49 // Example: 50 // SequencedWorkerPool::SequenceToken token = 51 // SequencedWorkerPool::GetSequenceToken(); 52 // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, 53 // FROM_HERE, base::Bind(...)); 54 // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, 55 // FROM_HERE, base::Bind(...)); 56 // 57 // You can make named sequence tokens to make it easier to share a token 58 // across different components. 59 // 60 // You can also post tasks to the pool without ordering using PostWorkerTask. 61 // These will be executed in an unspecified order. The order of execution 62 // between tasks with different sequence tokens is also unspecified. 63 // 64 // This class may be leaked on shutdown to facilitate fast shutdown. The 65 // expected usage, however, is to call Shutdown(), which correctly accounts 66 // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN 67 // behavior. 68 // 69 // Implementation note: This does not use a base::WorkerPool since that does 70 // not enforce shutdown semantics or allow us to specify how many worker 71 // threads to run. For the typical use case of random background work, we don't 72 // necessarily want to be super aggressive about creating threads. 73 // 74 // Note that SequencedWorkerPool is RefCountedThreadSafe (inherited 75 // from TaskRunner). 76 // 77 // Test-only code should wrap this in a base::SequencedWorkerPoolOwner to avoid 78 // memory leaks. See http://crbug.com/273800 79 class BASE_EXPORT SequencedWorkerPool : public TaskRunner { 80 public: 81 // Defines what should happen to a task posted to the worker pool on 82 // shutdown. 83 enum WorkerShutdown { 84 // Tasks posted with this mode which have not run at shutdown will be 85 // deleted rather than run, and any tasks with this mode running at 86 // shutdown will be ignored (the worker thread will not be joined). 87 // 88 // This option provides a nice way to post stuff you don't want blocking 89 // shutdown. For example, you might be doing a slow DNS lookup and if it's 90 // blocked on the OS, you may not want to stop shutdown, since the result 91 // doesn't really matter at that point. 92 // 93 // However, you need to be very careful what you do in your callback when 94 // you use this option. Since the thread will continue to run until the OS 95 // terminates the process, the app can be in the process of tearing down 96 // when you're running. This means any singletons or global objects you 97 // use may suddenly become invalid out from under you. For this reason, 98 // it's best to use this only for slow but simple operations like the DNS 99 // example. 100 CONTINUE_ON_SHUTDOWN, 101 102 // Tasks posted with this mode that have not started executing at 103 // shutdown will be deleted rather than executed. However, any tasks that 104 // have already begun executing when shutdown is called will be allowed 105 // to continue, and will block shutdown until completion. 106 // 107 // Note: Because Shutdown() may block while these tasks are executing, 108 // care must be taken to ensure that they do not block on the thread that 109 // called Shutdown(), as this may lead to deadlock. 110 SKIP_ON_SHUTDOWN, 111 112 // Tasks posted with this mode will block shutdown until they're 113 // executed. Since this can have significant performance implications, 114 // use sparingly. 115 // 116 // Generally, this should be used only for user data, for example, a task 117 // writing a preference file. 118 // 119 // If a task is posted during shutdown, it will not get run since the 120 // workers may already be stopped. In this case, the post operation will 121 // fail (return false) and the task will be deleted. 122 BLOCK_SHUTDOWN, 123 }; 124 125 // Opaque identifier that defines sequencing of tasks posted to the worker 126 // pool. 127 class BASE_EXPORT SequenceToken { 128 public: SequenceToken()129 SequenceToken() : id_(0) {} ~SequenceToken()130 ~SequenceToken() {} 131 Equals(const SequenceToken & other)132 bool Equals(const SequenceToken& other) const { 133 return id_ == other.id_; 134 } 135 136 // Returns false if current thread is executing an unsequenced task. IsValid()137 bool IsValid() const { 138 return id_ != 0; 139 } 140 141 // Returns a string representation of this token. This method should only be 142 // used for debugging. 143 std::string ToString() const; 144 145 private: 146 friend class SequencedWorkerPool; 147 SequenceToken(int id)148 explicit SequenceToken(int id) : id_(id) {} 149 150 int id_; 151 }; 152 153 // Allows tests to perform certain actions. 154 class TestingObserver { 155 public: ~TestingObserver()156 virtual ~TestingObserver() {} 157 virtual void OnHasWork() = 0; 158 virtual void WillWaitForShutdown() = 0; 159 virtual void OnDestruct() = 0; 160 }; 161 162 // Gets the SequencedToken of the current thread. 163 // If current thread is not a SequencedWorkerPool worker thread or is running 164 // an unsequenced task, returns an invalid SequenceToken. 165 static SequenceToken GetSequenceTokenForCurrentThread(); 166 167 // Gets a SequencedTaskRunner for the current thread. If the current thread is 168 // running an unsequenced task, a new SequenceToken will be generated and set, 169 // so that the returned SequencedTaskRunner is guaranteed to run tasks after 170 // the current task has finished running. 171 static scoped_refptr<SequencedTaskRunner> 172 GetSequencedTaskRunnerForCurrentThread(); 173 174 // Returns a unique token that can be used to sequence tasks posted to 175 // PostSequencedWorkerTask(). Valid tokens are always nonzero. 176 // TODO(bauerb): Rename this to better differentiate from 177 // GetSequenceTokenForCurrentThread(). 178 static SequenceToken GetSequenceToken(); 179 180 // Returns the SequencedWorkerPool that owns this thread, or null if the 181 // current thread is not a SequencedWorkerPool worker thread. 182 static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread(); 183 184 // When constructing a SequencedWorkerPool, there must be a 185 // ThreadTaskRunnerHandle on the current thread unless you plan to 186 // deliberately leak it. 187 188 // Pass the maximum number of threads (they will be lazily created as needed) 189 // and a prefix for the thread name to aid in debugging. 190 SequencedWorkerPool(size_t max_threads, 191 const std::string& thread_name_prefix); 192 193 // Like above, but with |observer| for testing. Does not take ownership of 194 // |observer|. 195 SequencedWorkerPool(size_t max_threads, 196 const std::string& thread_name_prefix, 197 TestingObserver* observer); 198 199 // Returns the sequence token associated with the given name. Calling this 200 // function multiple times with the same string will always produce the 201 // same sequence token. If the name has not been used before, a new token 202 // will be created. 203 SequenceToken GetNamedSequenceToken(const std::string& name); 204 205 // Returns a SequencedTaskRunner wrapper which posts to this 206 // SequencedWorkerPool using the given sequence token. Tasks with nonzero 207 // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay 208 // are posted with BLOCK_SHUTDOWN behavior. 209 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner( 210 SequenceToken token); 211 212 // Returns a SequencedTaskRunner wrapper which posts to this 213 // SequencedWorkerPool using the given sequence token. Tasks with nonzero 214 // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay 215 // are posted with the given shutdown behavior. 216 scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior( 217 SequenceToken token, 218 WorkerShutdown shutdown_behavior); 219 220 // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using 221 // the given shutdown behavior. Tasks with nonzero delay are posted with 222 // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the 223 // given shutdown behavior. 224 scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior( 225 WorkerShutdown shutdown_behavior); 226 227 // Posts the given task for execution in the worker pool. Tasks posted with 228 // this function will execute in an unspecified order on a background thread. 229 // Returns true if the task was posted. If your tasks have ordering 230 // requirements, see PostSequencedWorkerTask(). 231 // 232 // This class will attempt to delete tasks that aren't run 233 // (non-block-shutdown semantics) but can't guarantee that this happens. If 234 // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there 235 // will be no workers available to delete these tasks. And there may be 236 // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN 237 // tasks. Deleting those tasks before the previous one has completed could 238 // cause nondeterministic crashes because the task could be keeping some 239 // objects alive which do work in their destructor, which could voilate the 240 // assumptions of the running task. 241 // 242 // The task will be guaranteed to run to completion before shutdown 243 // (BLOCK_SHUTDOWN semantics). 244 // 245 // Returns true if the task was posted successfully. This may fail during 246 // shutdown regardless of the specified ShutdownBehavior. 247 bool PostWorkerTask(const tracked_objects::Location& from_here, 248 const Closure& task); 249 250 // Same as PostWorkerTask but allows a delay to be specified (although doing 251 // so changes the shutdown behavior). The task will be run after the given 252 // delay has elapsed. 253 // 254 // If the delay is nonzero, the task won't be guaranteed to run to completion 255 // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. 256 // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the 257 // task will be guaranteed to run to completion before shutdown 258 // (BLOCK_SHUTDOWN semantics). 259 bool PostDelayedWorkerTask(const tracked_objects::Location& from_here, 260 const Closure& task, 261 TimeDelta delay); 262 263 // Same as PostWorkerTask but allows specification of the shutdown behavior. 264 bool PostWorkerTaskWithShutdownBehavior( 265 const tracked_objects::Location& from_here, 266 const Closure& task, 267 WorkerShutdown shutdown_behavior); 268 269 // Like PostWorkerTask above, but provides sequencing semantics. This means 270 // that tasks posted with the same sequence token (see GetSequenceToken()) 271 // are guaranteed to execute in order. This is useful in cases where you're 272 // doing operations that may depend on previous ones, like appending to a 273 // file. 274 // 275 // The task will be guaranteed to run to completion before shutdown 276 // (BLOCK_SHUTDOWN semantics). 277 // 278 // Returns true if the task was posted successfully. This may fail during 279 // shutdown regardless of the specified ShutdownBehavior. 280 bool PostSequencedWorkerTask(SequenceToken sequence_token, 281 const tracked_objects::Location& from_here, 282 const Closure& task); 283 284 // Like PostSequencedWorkerTask above, but allows you to specify a named 285 // token, which saves an extra call to GetNamedSequenceToken. 286 bool PostNamedSequencedWorkerTask(const std::string& token_name, 287 const tracked_objects::Location& from_here, 288 const Closure& task); 289 290 // Same as PostSequencedWorkerTask but allows a delay to be specified 291 // (although doing so changes the shutdown behavior). The task will be run 292 // after the given delay has elapsed. 293 // 294 // If the delay is nonzero, the task won't be guaranteed to run to completion 295 // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. 296 // If the delay is zero, this behaves exactly like PostSequencedWorkerTask, 297 // i.e. the task will be guaranteed to run to completion before shutdown 298 // (BLOCK_SHUTDOWN semantics). 299 bool PostDelayedSequencedWorkerTask( 300 SequenceToken sequence_token, 301 const tracked_objects::Location& from_here, 302 const Closure& task, 303 TimeDelta delay); 304 305 // Same as PostSequencedWorkerTask but allows specification of the shutdown 306 // behavior. 307 bool PostSequencedWorkerTaskWithShutdownBehavior( 308 SequenceToken sequence_token, 309 const tracked_objects::Location& from_here, 310 const Closure& task, 311 WorkerShutdown shutdown_behavior); 312 313 // TaskRunner implementation. Forwards to PostDelayedWorkerTask(). 314 bool PostDelayedTask(const tracked_objects::Location& from_here, 315 const Closure& task, 316 TimeDelta delay) override; 317 bool RunsTasksOnCurrentThread() const override; 318 319 // Returns true if the current thread is processing a task with the given 320 // sequence_token. 321 bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; 322 323 // Returns true if any thread is currently processing a task with the given 324 // sequence token. Should only be called with a valid sequence token. 325 bool IsRunningSequence(SequenceToken sequence_token) const; 326 327 // Blocks until all pending tasks are complete. This should only be called in 328 // unit tests when you want to validate something that should have happened. 329 // This will not flush delayed tasks; delayed tasks get deleted. 330 // 331 // Note that calling this will not prevent other threads from posting work to 332 // the queue while the calling thread is waiting on Flush(). In this case, 333 // Flush will return only when there's no more work in the queue. Normally, 334 // this doesn't come up since in a test, all the work is being posted from 335 // the main thread. 336 void FlushForTesting(); 337 338 // Spuriously signal that there is work to be done. 339 void SignalHasWorkForTesting(); 340 341 // Implements the worker pool shutdown. This should be called during app 342 // shutdown, and will discard/join with appropriate tasks before returning. 343 // After this call, subsequent calls to post tasks will fail. 344 // 345 // Must be called from the same thread this object was constructed on. Shutdown()346 void Shutdown() { Shutdown(0); } 347 348 // A variant that allows an arbitrary number of new blocking tasks to be 349 // posted during shutdown. The tasks cannot be posted within the execution 350 // context of tasks whose shutdown behavior is not BLOCKING_SHUTDOWN. Once 351 // the limit is reached, subsequent calls to post task fail in all cases. 352 // Must be called from the same thread this object was constructed on. 353 void Shutdown(int max_new_blocking_tasks_after_shutdown); 354 355 // Check if Shutdown was called for given threading pool. This method is used 356 // for aborting time consuming operation to avoid blocking shutdown. 357 // 358 // Can be called from any thread. 359 bool IsShutdownInProgress(); 360 361 protected: 362 ~SequencedWorkerPool() override; 363 364 void OnDestruct() const override; 365 366 private: 367 friend class RefCountedThreadSafe<SequencedWorkerPool>; 368 friend class DeleteHelper<SequencedWorkerPool>; 369 370 class Inner; 371 class Worker; 372 373 const scoped_refptr<SingleThreadTaskRunner> constructor_task_runner_; 374 375 // Avoid pulling in too many headers by putting (almost) everything 376 // into |inner_|. 377 const scoped_ptr<Inner> inner_; 378 379 DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); 380 }; 381 382 } // namespace base 383 384 #endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_ 385