1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef ART_RUNTIME_THREAD_POOL_H_
18 #define ART_RUNTIME_THREAD_POOL_H_
19 
20 #include <deque>
21 #include <functional>
22 #include <vector>
23 
24 #include "barrier.h"
25 #include "base/macros.h"
26 #include "base/mem_map.h"
27 #include "base/mutex.h"
28 
29 namespace art HIDDEN {
30 
31 class AbstractThreadPool;
32 
33 class Closure {
34  public:
~Closure()35   virtual ~Closure() { }
36   virtual void Run(Thread* self) = 0;
37 };
38 
39 class FunctionClosure : public Closure {
40  public:
FunctionClosure(std::function<void (Thread *)> && f)41   explicit FunctionClosure(std::function<void(Thread*)>&& f) : func_(std::move(f)) {}
Run(Thread * self)42   void Run(Thread* self) override {
43     func_(self);
44   }
45 
46  private:
47   std::function<void(Thread*)> func_;
48 };
49 
50 class Task : public Closure {
51  public:
52   // Called after Closure::Run has been called.
Finalize()53   virtual void Finalize() { }
54 };
55 
56 class SelfDeletingTask : public Task {
57  public:
~SelfDeletingTask()58   virtual ~SelfDeletingTask() { }
Finalize()59   virtual void Finalize() {
60     delete this;
61   }
62 };
63 
64 class FunctionTask : public SelfDeletingTask {
65  public:
FunctionTask(std::function<void (Thread *)> && func)66   explicit FunctionTask(std::function<void(Thread*)>&& func) : func_(std::move(func)) {}
67 
Run(Thread * self)68   void Run(Thread* self) override {
69     func_(self);
70   }
71 
72  private:
73   std::function<void(Thread*)> func_;
74 };
75 
76 class ThreadPoolWorker {
77  public:
78   static const size_t kDefaultStackSize = 1 * MB;
79 
GetStackSize()80   size_t GetStackSize() const {
81     DCHECK(stack_.IsValid());
82     return stack_.Size();
83   }
84 
85   virtual ~ThreadPoolWorker();
86 
87   // Set the "nice" priority for this worker.
88   void SetPthreadPriority(int priority);
89 
90   // Get the "nice" priority for this worker.
91   int GetPthreadPriority();
92 
GetThread()93   Thread* GetThread() const { return thread_; }
94 
95  protected:
96   ThreadPoolWorker(AbstractThreadPool* thread_pool, const std::string& name, size_t stack_size);
97   static void* Callback(void* arg) REQUIRES(!Locks::mutator_lock_);
98   virtual void Run();
99 
100   AbstractThreadPool* const thread_pool_;
101   const std::string name_;
102   MemMap stack_;
103   pthread_t pthread_;
104   Thread* thread_;
105 
106  private:
107   friend class AbstractThreadPool;
108   DISALLOW_COPY_AND_ASSIGN(ThreadPoolWorker);
109 };
110 
111 // Note that thread pool workers will set Thread#setCanCallIntoJava to false.
112 class AbstractThreadPool {
113  public:
114   // Returns the number of threads in the thread pool.
GetThreadCount()115   size_t GetThreadCount() const {
116     return threads_.size();
117   }
118 
119   EXPORT const std::vector<ThreadPoolWorker*>& GetWorkers();
120 
121   // Broadcast to the workers and tell them to empty out the work queue.
122   EXPORT void StartWorkers(Thread* self) REQUIRES(!task_queue_lock_);
123 
124   // Do not allow workers to grab any new tasks.
125   EXPORT void StopWorkers(Thread* self) REQUIRES(!task_queue_lock_);
126 
127   // Returns if the thread pool has started.
128   bool HasStarted(Thread* self) REQUIRES(!task_queue_lock_);
129 
130   // Add a new task, the first available started worker will process it. Does not delete the task
131   // after running it, it is the caller's responsibility.
132   virtual void AddTask(Thread* self, Task* task) REQUIRES(!task_queue_lock_) = 0;
133 
134   // Remove all tasks in the queue.
135   virtual void RemoveAllTasks(Thread* self) REQUIRES(!task_queue_lock_) = 0;
136 
137   virtual size_t GetTaskCount(Thread* self) REQUIRES(!task_queue_lock_) = 0;
138 
139   // Create the threads of this pool.
140   EXPORT void CreateThreads();
141 
142   // Stops and deletes all threads in this pool.
143   void DeleteThreads();
144 
145   // Wait for all tasks currently on queue to get completed. If the pool has been stopped, only
146   // wait till all already running tasks are done.
147   // When the pool was created with peers for workers, do_work must not be true (see ThreadPool()).
148   EXPORT void Wait(Thread* self, bool do_work, bool may_hold_locks) REQUIRES(!task_queue_lock_);
149 
150   // Returns the total amount of workers waited for tasks.
GetWaitTime()151   uint64_t GetWaitTime() const {
152     return total_wait_time_;
153   }
154 
155   // Provides a way to bound the maximum number of worker threads, threads must be less the the
156   // thread count of the thread pool.
157   void SetMaxActiveWorkers(size_t threads) REQUIRES(!task_queue_lock_);
158 
159   // Set the "nice" priority for threads in the pool.
160   void SetPthreadPriority(int priority);
161 
162   // CHECK that the "nice" priority of threads in the pool is the given
163   // `priority`.
164   void CheckPthreadPriority(int priority);
165 
166   // Wait for workers to be created.
167   void WaitForWorkersToBeCreated();
168 
~AbstractThreadPool()169   virtual ~AbstractThreadPool() {}
170 
171  protected:
172   // get a task to run, blocks if there are no tasks left
173   Task* GetTask(Thread* self) REQUIRES(!task_queue_lock_);
174 
175   // Try to get a task, returning null if there is none available.
176   Task* TryGetTask(Thread* self) REQUIRES(!task_queue_lock_);
177   virtual Task* TryGetTaskLocked() REQUIRES(task_queue_lock_) = 0;
178 
179   // Are we shutting down?
IsShuttingDown()180   bool IsShuttingDown() const REQUIRES(task_queue_lock_) {
181     return shutting_down_;
182   }
183 
184   virtual bool HasOutstandingTasks() const REQUIRES(task_queue_lock_) = 0;
185 
186   EXPORT AbstractThreadPool(const char* name,
187                             size_t num_threads,
188                             bool create_peers,
189                             size_t worker_stack_size);
190 
191   const std::string name_;
192   Mutex task_queue_lock_;
193   ConditionVariable task_queue_condition_ GUARDED_BY(task_queue_lock_);
194   ConditionVariable completion_condition_ GUARDED_BY(task_queue_lock_);
195   volatile bool started_ GUARDED_BY(task_queue_lock_);
196   volatile bool shutting_down_ GUARDED_BY(task_queue_lock_);
197   // How many worker threads are waiting on the condition.
198   volatile size_t waiting_count_ GUARDED_BY(task_queue_lock_);
199   std::vector<ThreadPoolWorker*> threads_;
200   // Work balance detection.
201   uint64_t start_time_ GUARDED_BY(task_queue_lock_);
202   uint64_t total_wait_time_;
203   Barrier creation_barier_;
204   size_t max_active_workers_ GUARDED_BY(task_queue_lock_);
205   const bool create_peers_;
206   const size_t worker_stack_size_;
207 
208  private:
209   friend class ThreadPoolWorker;
210   friend class WorkStealingWorker;
211   DISALLOW_COPY_AND_ASSIGN(AbstractThreadPool);
212 };
213 
214 class EXPORT ThreadPool : public AbstractThreadPool {
215  public:
216   // Create a named thread pool with the given number of threads.
217   //
218   // If create_peers is true, all worker threads will have a Java peer object. Note that if the
219   // pool is asked to do work on the current thread (see Wait), a peer may not be available. Wait
220   // will conservatively abort if create_peers and do_work are true.
221   static ThreadPool* Create(const char* name,
222                             size_t num_threads,
223                             bool create_peers = false,
224                             size_t worker_stack_size = ThreadPoolWorker::kDefaultStackSize) {
225     ThreadPool* pool = new ThreadPool(name, num_threads, create_peers, worker_stack_size);
226     pool->CreateThreads();
227     return pool;
228   }
229 
230   void AddTask(Thread* self, Task* task) REQUIRES(!task_queue_lock_) override;
231   size_t GetTaskCount(Thread* self) REQUIRES(!task_queue_lock_) override;
232   void RemoveAllTasks(Thread* self) REQUIRES(!task_queue_lock_) override;
233   ~ThreadPool() override;
234 
235  protected:
236   Task* TryGetTaskLocked() REQUIRES(task_queue_lock_) override;
237 
HasOutstandingTasks()238   bool HasOutstandingTasks() const REQUIRES(task_queue_lock_) override {
239     return started_ && !tasks_.empty();
240   }
241 
ThreadPool(const char * name,size_t num_threads,bool create_peers,size_t worker_stack_size)242   ThreadPool(const char* name,
243              size_t num_threads,
244              bool create_peers,
245              size_t worker_stack_size)
246       : AbstractThreadPool(name, num_threads, create_peers, worker_stack_size) {}
247 
248  private:
249   std::deque<Task*> tasks_ GUARDED_BY(task_queue_lock_);
250 
251   DISALLOW_COPY_AND_ASSIGN(ThreadPool);
252 };
253 
254 }  // namespace art
255 
256 #endif  // ART_RUNTIME_THREAD_POOL_H_
257