1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_
6 #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_
7 
8 #include <stddef.h>
9 
10 #include <cstddef>
11 #include <memory>
12 #include <string>
13 
14 #include "base/base_export.h"
15 #include "base/callback_forward.h"
16 #include "base/macros.h"
17 #include "base/memory/ref_counted.h"
18 #include "base/single_thread_task_runner.h"
19 #include "base/task_runner.h"
20 
21 namespace tracked_objects {
22 class Location;
23 }  // namespace tracked_objects
24 
25 namespace base {
26 
27 class SingleThreadTaskRunner;
28 
29 template <class T> class DeleteHelper;
30 
31 class SequencedTaskRunner;
32 
33 // A worker thread pool that enforces ordering between sets of tasks. It also
34 // allows you to specify what should happen to your tasks on shutdown.
35 //
36 // To enforce ordering, get a unique sequence token from the pool and post all
37 // tasks you want to order with the token. All tasks with the same token are
38 // guaranteed to execute serially, though not necessarily on the same thread.
39 // This means that:
40 //
41 //   - No two tasks with the same token will run at the same time.
42 //
43 //   - Given two tasks T1 and T2 with the same token such that T2 will
44 //     run after T1, then T2 will start after T1 is destroyed.
45 //
46 //   - If T2 will run after T1, then all memory changes in T1 and T1's
47 //     destruction will be visible to T2.
48 //
49 // Example:
50 //   SequencedWorkerPool::SequenceToken token =
51 //       SequencedWorkerPool::GetSequenceToken();
52 //   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
53 //                                FROM_HERE, base::Bind(...));
54 //   pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN,
55 //                                FROM_HERE, base::Bind(...));
56 //
57 // You can make named sequence tokens to make it easier to share a token
58 // across different components.
59 //
60 // You can also post tasks to the pool without ordering using PostWorkerTask.
61 // These will be executed in an unspecified order. The order of execution
62 // between tasks with different sequence tokens is also unspecified.
63 //
64 // This class may be leaked on shutdown to facilitate fast shutdown. The
65 // expected usage, however, is to call Shutdown(), which correctly accounts
66 // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN
67 // behavior.
68 //
69 // Implementation note: This does not use a base::WorkerPool since that does
70 // not enforce shutdown semantics or allow us to specify how many worker
71 // threads to run. For the typical use case of random background work, we don't
72 // necessarily want to be super aggressive about creating threads.
73 //
74 // Note that SequencedWorkerPool is RefCountedThreadSafe (inherited
75 // from TaskRunner).
76 //
77 // Test-only code should wrap this in a base::SequencedWorkerPoolOwner to avoid
78 // memory leaks. See http://crbug.com/273800
79 class BASE_EXPORT SequencedWorkerPool : public TaskRunner {
80  public:
81   // Defines what should happen to a task posted to the worker pool on
82   // shutdown.
83   enum WorkerShutdown {
84     // Tasks posted with this mode which have not run at shutdown will be
85     // deleted rather than run, and any tasks with this mode running at
86     // shutdown will be ignored (the worker thread will not be joined).
87     //
88     // This option provides a nice way to post stuff you don't want blocking
89     // shutdown. For example, you might be doing a slow DNS lookup and if it's
90     // blocked on the OS, you may not want to stop shutdown, since the result
91     // doesn't really matter at that point.
92     //
93     // However, you need to be very careful what you do in your callback when
94     // you use this option. Since the thread will continue to run until the OS
95     // terminates the process, the app can be in the process of tearing down
96     // when you're running. This means any singletons or global objects you
97     // use may suddenly become invalid out from under you. For this reason,
98     // it's best to use this only for slow but simple operations like the DNS
99     // example.
100     CONTINUE_ON_SHUTDOWN,
101 
102     // Tasks posted with this mode that have not started executing at
103     // shutdown will be deleted rather than executed. However, any tasks that
104     // have already begun executing when shutdown is called will be allowed
105     // to continue, and will block shutdown until completion.
106     //
107     // Note: Because Shutdown() may block while these tasks are executing,
108     // care must be taken to ensure that they do not block on the thread that
109     // called Shutdown(), as this may lead to deadlock.
110     SKIP_ON_SHUTDOWN,
111 
112     // Tasks posted with this mode will block shutdown until they're
113     // executed. Since this can have significant performance implications,
114     // use sparingly.
115     //
116     // Generally, this should be used only for user data, for example, a task
117     // writing a preference file.
118     //
119     // If a task is posted during shutdown, it will not get run since the
120     // workers may already be stopped. In this case, the post operation will
121     // fail (return false) and the task will be deleted.
122     BLOCK_SHUTDOWN,
123   };
124 
125   // Opaque identifier that defines sequencing of tasks posted to the worker
126   // pool.
127   class BASE_EXPORT SequenceToken {
128    public:
SequenceToken()129     SequenceToken() : id_(0) {}
~SequenceToken()130     ~SequenceToken() {}
131 
Equals(const SequenceToken & other)132     bool Equals(const SequenceToken& other) const {
133       return id_ == other.id_;
134     }
135 
136     // Returns false if current thread is executing an unsequenced task.
IsValid()137     bool IsValid() const {
138       return id_ != 0;
139     }
140 
141     // Returns a string representation of this token. This method should only be
142     // used for debugging.
143     std::string ToString() const;
144 
145    private:
146     friend class SequencedWorkerPool;
147 
SequenceToken(int id)148     explicit SequenceToken(int id) : id_(id) {}
149 
150     int id_;
151   };
152 
153   // Allows tests to perform certain actions.
154   class TestingObserver {
155    public:
~TestingObserver()156     virtual ~TestingObserver() {}
157     virtual void OnHasWork() = 0;
158     virtual void WillWaitForShutdown() = 0;
159     virtual void OnDestruct() = 0;
160   };
161 
162   // Gets the SequencedToken of the current thread.
163   // If current thread is not a SequencedWorkerPool worker thread or is running
164   // an unsequenced task, returns an invalid SequenceToken.
165   static SequenceToken GetSequenceTokenForCurrentThread();
166 
167   // Gets a SequencedTaskRunner for the current thread. If the current thread is
168   // running an unsequenced task, a new SequenceToken will be generated and set,
169   // so that the returned SequencedTaskRunner is guaranteed to run tasks after
170   // the current task has finished running.
171   static scoped_refptr<SequencedTaskRunner>
172   GetSequencedTaskRunnerForCurrentThread();
173 
174   // Returns a unique token that can be used to sequence tasks posted to
175   // PostSequencedWorkerTask(). Valid tokens are always nonzero.
176   // TODO(bauerb): Rename this to better differentiate from
177   // GetSequenceTokenForCurrentThread().
178   static SequenceToken GetSequenceToken();
179 
180   // Returns the SequencedWorkerPool that owns this thread, or null if the
181   // current thread is not a SequencedWorkerPool worker thread.
182   static scoped_refptr<SequencedWorkerPool> GetWorkerPoolForCurrentThread();
183 
184   // When constructing a SequencedWorkerPool, there must be a
185   // ThreadTaskRunnerHandle on the current thread unless you plan to
186   // deliberately leak it.
187 
188   // Pass the maximum number of threads (they will be lazily created as needed)
189   // and a prefix for the thread name to aid in debugging.
190   SequencedWorkerPool(size_t max_threads,
191                       const std::string& thread_name_prefix);
192 
193   // Like above, but with |observer| for testing.  Does not take ownership of
194   // |observer|.
195   SequencedWorkerPool(size_t max_threads,
196                       const std::string& thread_name_prefix,
197                       TestingObserver* observer);
198 
199   // Returns the sequence token associated with the given name. Calling this
200   // function multiple times with the same string will always produce the
201   // same sequence token. If the name has not been used before, a new token
202   // will be created.
203   SequenceToken GetNamedSequenceToken(const std::string& name);
204 
205   // Returns a SequencedTaskRunner wrapper which posts to this
206   // SequencedWorkerPool using the given sequence token. Tasks with nonzero
207   // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
208   // are posted with BLOCK_SHUTDOWN behavior.
209   scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunner(
210       SequenceToken token);
211 
212   // Returns a SequencedTaskRunner wrapper which posts to this
213   // SequencedWorkerPool using the given sequence token. Tasks with nonzero
214   // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay
215   // are posted with the given shutdown behavior.
216   scoped_refptr<SequencedTaskRunner> GetSequencedTaskRunnerWithShutdownBehavior(
217       SequenceToken token,
218       WorkerShutdown shutdown_behavior);
219 
220   // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using
221   // the given shutdown behavior. Tasks with nonzero delay are posted with
222   // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the
223   // given shutdown behavior.
224   scoped_refptr<TaskRunner> GetTaskRunnerWithShutdownBehavior(
225       WorkerShutdown shutdown_behavior);
226 
227   // Posts the given task for execution in the worker pool. Tasks posted with
228   // this function will execute in an unspecified order on a background thread.
229   // Returns true if the task was posted. If your tasks have ordering
230   // requirements, see PostSequencedWorkerTask().
231   //
232   // This class will attempt to delete tasks that aren't run
233   // (non-block-shutdown semantics) but can't guarantee that this happens. If
234   // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there
235   // will be no workers available to delete these tasks. And there may be
236   // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN
237   // tasks. Deleting those tasks before the previous one has completed could
238   // cause nondeterministic crashes because the task could be keeping some
239   // objects alive which do work in their destructor, which could voilate the
240   // assumptions of the running task.
241   //
242   // The task will be guaranteed to run to completion before shutdown
243   // (BLOCK_SHUTDOWN semantics).
244   //
245   // Returns true if the task was posted successfully. This may fail during
246   // shutdown regardless of the specified ShutdownBehavior.
247   bool PostWorkerTask(const tracked_objects::Location& from_here,
248                       const Closure& task);
249 
250   // Same as PostWorkerTask but allows a delay to be specified (although doing
251   // so changes the shutdown behavior). The task will be run after the given
252   // delay has elapsed.
253   //
254   // If the delay is nonzero, the task won't be guaranteed to run to completion
255   // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
256   // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the
257   // task will be guaranteed to run to completion before shutdown
258   // (BLOCK_SHUTDOWN semantics).
259   bool PostDelayedWorkerTask(const tracked_objects::Location& from_here,
260                              const Closure& task,
261                              TimeDelta delay);
262 
263   // Same as PostWorkerTask but allows specification of the shutdown behavior.
264   bool PostWorkerTaskWithShutdownBehavior(
265       const tracked_objects::Location& from_here,
266       const Closure& task,
267       WorkerShutdown shutdown_behavior);
268 
269   // Like PostWorkerTask above, but provides sequencing semantics. This means
270   // that tasks posted with the same sequence token (see GetSequenceToken())
271   // are guaranteed to execute in order. This is useful in cases where you're
272   // doing operations that may depend on previous ones, like appending to a
273   // file.
274   //
275   // The task will be guaranteed to run to completion before shutdown
276   // (BLOCK_SHUTDOWN semantics).
277   //
278   // Returns true if the task was posted successfully. This may fail during
279   // shutdown regardless of the specified ShutdownBehavior.
280   bool PostSequencedWorkerTask(SequenceToken sequence_token,
281                                const tracked_objects::Location& from_here,
282                                const Closure& task);
283 
284   // Like PostSequencedWorkerTask above, but allows you to specify a named
285   // token, which saves an extra call to GetNamedSequenceToken.
286   bool PostNamedSequencedWorkerTask(const std::string& token_name,
287                                     const tracked_objects::Location& from_here,
288                                     const Closure& task);
289 
290   // Same as PostSequencedWorkerTask but allows a delay to be specified
291   // (although doing so changes the shutdown behavior). The task will be run
292   // after the given delay has elapsed.
293   //
294   // If the delay is nonzero, the task won't be guaranteed to run to completion
295   // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs.
296   // If the delay is zero, this behaves exactly like PostSequencedWorkerTask,
297   // i.e. the task will be guaranteed to run to completion before shutdown
298   // (BLOCK_SHUTDOWN semantics).
299   bool PostDelayedSequencedWorkerTask(
300       SequenceToken sequence_token,
301       const tracked_objects::Location& from_here,
302       const Closure& task,
303       TimeDelta delay);
304 
305   // Same as PostSequencedWorkerTask but allows specification of the shutdown
306   // behavior.
307   bool PostSequencedWorkerTaskWithShutdownBehavior(
308       SequenceToken sequence_token,
309       const tracked_objects::Location& from_here,
310       const Closure& task,
311       WorkerShutdown shutdown_behavior);
312 
313   // TaskRunner implementation. Forwards to PostDelayedWorkerTask().
314   bool PostDelayedTask(const tracked_objects::Location& from_here,
315                        const Closure& task,
316                        TimeDelta delay) override;
317   bool RunsTasksOnCurrentThread() const override;
318 
319   // Returns true if the current thread is processing a task with the given
320   // sequence_token.
321   bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const;
322 
323   // Returns true if any thread is currently processing a task with the given
324   // sequence token. Should only be called with a valid sequence token.
325   bool IsRunningSequence(SequenceToken sequence_token) const;
326 
327   // Blocks until all pending tasks are complete. This should only be called in
328   // unit tests when you want to validate something that should have happened.
329   // This will not flush delayed tasks; delayed tasks get deleted.
330   //
331   // Note that calling this will not prevent other threads from posting work to
332   // the queue while the calling thread is waiting on Flush(). In this case,
333   // Flush will return only when there's no more work in the queue. Normally,
334   // this doesn't come up since in a test, all the work is being posted from
335   // the main thread.
336   void FlushForTesting();
337 
338   // Spuriously signal that there is work to be done.
339   void SignalHasWorkForTesting();
340 
341   // Implements the worker pool shutdown. This should be called during app
342   // shutdown, and will discard/join with appropriate tasks before returning.
343   // After this call, subsequent calls to post tasks will fail.
344   //
345   // Must be called from the same thread this object was constructed on.
Shutdown()346   void Shutdown() { Shutdown(0); }
347 
348   // A variant that allows an arbitrary number of new blocking tasks to be
349   // posted during shutdown. The tasks cannot be posted within the execution
350   // context of tasks whose shutdown behavior is not BLOCKING_SHUTDOWN. Once
351   // the limit is reached, subsequent calls to post task fail in all cases.
352   // Must be called from the same thread this object was constructed on.
353   void Shutdown(int max_new_blocking_tasks_after_shutdown);
354 
355   // Check if Shutdown was called for given threading pool. This method is used
356   // for aborting time consuming operation to avoid blocking shutdown.
357   //
358   // Can be called from any thread.
359   bool IsShutdownInProgress();
360 
361  protected:
362   ~SequencedWorkerPool() override;
363 
364   void OnDestruct() const override;
365 
366  private:
367   friend class RefCountedThreadSafe<SequencedWorkerPool>;
368   friend class DeleteHelper<SequencedWorkerPool>;
369 
370   class Inner;
371   class Worker;
372 
373   const scoped_refptr<SingleThreadTaskRunner> constructor_task_runner_;
374 
375   // Avoid pulling in too many headers by putting (almost) everything
376   // into |inner_|.
377   const std::unique_ptr<Inner> inner_;
378 
379   DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool);
380 };
381 
382 }  // namespace base
383 
384 #endif  // BASE_THREADING_SEQUENCED_WORKER_POOL_H_
385