1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef ART_RUNTIME_THREAD_POOL_H_ 18 #define ART_RUNTIME_THREAD_POOL_H_ 19 20 #include <deque> 21 #include <vector> 22 23 #include "barrier.h" 24 #include "base/mutex.h" 25 #include "mem_map.h" 26 27 namespace art { 28 29 class ThreadPool; 30 31 class Closure { 32 public: ~Closure()33 virtual ~Closure() { } 34 virtual void Run(Thread* self) = 0; 35 }; 36 37 class Task : public Closure { 38 public: 39 // Called after Closure::Run has been called. Finalize()40 virtual void Finalize() { } 41 }; 42 43 class SelfDeletingTask : public Task { 44 public: ~SelfDeletingTask()45 virtual ~SelfDeletingTask() { } Finalize()46 virtual void Finalize() { 47 delete this; 48 } 49 }; 50 51 class ThreadPoolWorker { 52 public: 53 static const size_t kDefaultStackSize = 1 * MB; 54 GetStackSize()55 size_t GetStackSize() const { 56 DCHECK(stack_.get() != nullptr); 57 return stack_->Size(); 58 } 59 60 virtual ~ThreadPoolWorker(); 61 62 protected: 63 ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size); 64 static void* Callback(void* arg) LOCKS_EXCLUDED(Locks::mutator_lock_); 65 virtual void Run(); 66 67 ThreadPool* const thread_pool_; 68 const std::string name_; 69 std::unique_ptr<MemMap> stack_; 70 pthread_t pthread_; 71 72 private: 73 friend class ThreadPool; 74 DISALLOW_COPY_AND_ASSIGN(ThreadPoolWorker); 75 }; 76 77 class ThreadPool { 78 public: 79 // Returns the number of threads in the thread pool. GetThreadCount()80 size_t GetThreadCount() const { 81 return threads_.size(); 82 } 83 84 // Broadcast to the workers and tell them to empty out the work queue. 85 void StartWorkers(Thread* self); 86 87 // Do not allow workers to grab any new tasks. 88 void StopWorkers(Thread* self); 89 90 // Add a new task, the first available started worker will process it. Does not delete the task 91 // after running it, it is the caller's responsibility. 92 void AddTask(Thread* self, Task* task); 93 94 explicit ThreadPool(const char* name, size_t num_threads); 95 virtual ~ThreadPool(); 96 97 // Wait for all tasks currently on queue to get completed. 98 void Wait(Thread* self, bool do_work, bool may_hold_locks); 99 100 size_t GetTaskCount(Thread* self); 101 102 // Returns the total amount of workers waited for tasks. GetWaitTime()103 uint64_t GetWaitTime() const { 104 return total_wait_time_; 105 } 106 107 // Provides a way to bound the maximum number of worker threads, threads must be less the the 108 // thread count of the thread pool. 109 void SetMaxActiveWorkers(size_t threads); 110 111 protected: 112 // get a task to run, blocks if there are no tasks left 113 virtual Task* GetTask(Thread* self); 114 115 // Try to get a task, returning null if there is none available. 116 Task* TryGetTask(Thread* self); 117 Task* TryGetTaskLocked() EXCLUSIVE_LOCKS_REQUIRED(task_queue_lock_); 118 119 // Are we shutting down? IsShuttingDown()120 bool IsShuttingDown() const EXCLUSIVE_LOCKS_REQUIRED(task_queue_lock_) { 121 return shutting_down_; 122 } 123 124 const std::string name_; 125 Mutex task_queue_lock_; 126 ConditionVariable task_queue_condition_ GUARDED_BY(task_queue_lock_); 127 ConditionVariable completion_condition_ GUARDED_BY(task_queue_lock_); 128 volatile bool started_ GUARDED_BY(task_queue_lock_); 129 volatile bool shutting_down_ GUARDED_BY(task_queue_lock_); 130 // How many worker threads are waiting on the condition. 131 volatile size_t waiting_count_ GUARDED_BY(task_queue_lock_); 132 std::deque<Task*> tasks_ GUARDED_BY(task_queue_lock_); 133 // TODO: make this immutable/const? 134 std::vector<ThreadPoolWorker*> threads_; 135 // Work balance detection. 136 uint64_t start_time_ GUARDED_BY(task_queue_lock_); 137 uint64_t total_wait_time_; 138 Barrier creation_barier_; 139 size_t max_active_workers_ GUARDED_BY(task_queue_lock_); 140 141 private: 142 friend class ThreadPoolWorker; 143 friend class WorkStealingWorker; 144 DISALLOW_COPY_AND_ASSIGN(ThreadPool); 145 }; 146 147 class WorkStealingTask : public Task { 148 public: WorkStealingTask()149 WorkStealingTask() : ref_count_(0) {} 150 GetRefCount()151 size_t GetRefCount() const { 152 return ref_count_; 153 } 154 155 virtual void StealFrom(Thread* self, WorkStealingTask* source) = 0; 156 157 private: 158 // How many people are referencing this task. 159 size_t ref_count_; 160 161 friend class WorkStealingWorker; 162 }; 163 164 class WorkStealingWorker : public ThreadPoolWorker { 165 public: 166 virtual ~WorkStealingWorker(); 167 IsRunningTask()168 bool IsRunningTask() const { 169 return task_ != nullptr; 170 } 171 172 protected: 173 WorkStealingTask* task_; 174 175 WorkStealingWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size); 176 virtual void Run(); 177 178 private: 179 friend class WorkStealingThreadPool; 180 DISALLOW_COPY_AND_ASSIGN(WorkStealingWorker); 181 }; 182 183 class WorkStealingThreadPool : public ThreadPool { 184 public: 185 explicit WorkStealingThreadPool(const char* name, size_t num_threads); 186 virtual ~WorkStealingThreadPool(); 187 188 private: 189 Mutex work_steal_lock_; 190 // Which thread we are stealing from (round robin). 191 size_t steal_index_; 192 193 // Find a task to steal from 194 WorkStealingTask* FindTaskToStealFrom() EXCLUSIVE_LOCKS_REQUIRED(work_steal_lock_); 195 196 friend class WorkStealingWorker; 197 }; 198 199 } // namespace art 200 201 #endif // ART_RUNTIME_THREAD_POOL_H_ 202