1 /* 2 * Copyright (C) 2012 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef ART_RUNTIME_THREAD_POOL_H_ 18 #define ART_RUNTIME_THREAD_POOL_H_ 19 20 #include <deque> 21 #include <vector> 22 23 #include "barrier.h" 24 #include "base/mutex.h" 25 #include "closure.h" 26 #include "mem_map.h" 27 28 namespace art { 29 30 class ThreadPool; 31 32 class Task : public Closure { 33 public: 34 // Called when references reaches 0. Finalize()35 virtual void Finalize() { } 36 }; 37 38 class ThreadPoolWorker { 39 public: 40 static const size_t kDefaultStackSize = 1 * MB; 41 GetStackSize()42 size_t GetStackSize() const { 43 DCHECK(stack_.get() != nullptr); 44 return stack_->Size(); 45 } 46 47 virtual ~ThreadPoolWorker(); 48 49 protected: 50 ThreadPoolWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size); 51 static void* Callback(void* arg) LOCKS_EXCLUDED(Locks::mutator_lock_); 52 virtual void Run(); 53 54 ThreadPool* const thread_pool_; 55 const std::string name_; 56 std::unique_ptr<MemMap> stack_; 57 pthread_t pthread_; 58 59 private: 60 friend class ThreadPool; 61 DISALLOW_COPY_AND_ASSIGN(ThreadPoolWorker); 62 }; 63 64 class ThreadPool { 65 public: 66 // Returns the number of threads in the thread pool. GetThreadCount()67 size_t GetThreadCount() const { 68 return threads_.size(); 69 } 70 71 // Broadcast to the workers and tell them to empty out the work queue. 72 void StartWorkers(Thread* self); 73 74 // Do not allow workers to grab any new tasks. 75 void StopWorkers(Thread* self); 76 77 // Add a new task, the first available started worker will process it. Does not delete the task 78 // after running it, it is the caller's responsibility. 79 void AddTask(Thread* self, Task* task); 80 81 explicit ThreadPool(const char* name, size_t num_threads); 82 virtual ~ThreadPool(); 83 84 // Wait for all tasks currently on queue to get completed. 85 void Wait(Thread* self, bool do_work, bool may_hold_locks); 86 87 size_t GetTaskCount(Thread* self); 88 89 // Returns the total amount of workers waited for tasks. GetWaitTime()90 uint64_t GetWaitTime() const { 91 return total_wait_time_; 92 } 93 94 // Provides a way to bound the maximum number of worker threads, threads must be less the the 95 // thread count of the thread pool. 96 void SetMaxActiveWorkers(size_t threads); 97 98 protected: 99 // get a task to run, blocks if there are no tasks left 100 virtual Task* GetTask(Thread* self); 101 102 // Try to get a task, returning NULL if there is none available. 103 Task* TryGetTask(Thread* self); 104 Task* TryGetTaskLocked(Thread* self) EXCLUSIVE_LOCKS_REQUIRED(task_queue_lock_); 105 106 // Are we shutting down? IsShuttingDown()107 bool IsShuttingDown() const EXCLUSIVE_LOCKS_REQUIRED(task_queue_lock_) { 108 return shutting_down_; 109 } 110 111 const std::string name_; 112 Mutex task_queue_lock_; 113 ConditionVariable task_queue_condition_ GUARDED_BY(task_queue_lock_); 114 ConditionVariable completion_condition_ GUARDED_BY(task_queue_lock_); 115 volatile bool started_ GUARDED_BY(task_queue_lock_); 116 volatile bool shutting_down_ GUARDED_BY(task_queue_lock_); 117 // How many worker threads are waiting on the condition. 118 volatile size_t waiting_count_ GUARDED_BY(task_queue_lock_); 119 std::deque<Task*> tasks_ GUARDED_BY(task_queue_lock_); 120 // TODO: make this immutable/const? 121 std::vector<ThreadPoolWorker*> threads_; 122 // Work balance detection. 123 uint64_t start_time_ GUARDED_BY(task_queue_lock_); 124 uint64_t total_wait_time_; 125 Barrier creation_barier_; 126 size_t max_active_workers_ GUARDED_BY(task_queue_lock_); 127 128 private: 129 friend class ThreadPoolWorker; 130 friend class WorkStealingWorker; 131 DISALLOW_COPY_AND_ASSIGN(ThreadPool); 132 }; 133 134 class WorkStealingTask : public Task { 135 public: WorkStealingTask()136 WorkStealingTask() : ref_count_(0) {} 137 GetRefCount()138 size_t GetRefCount() const { 139 return ref_count_; 140 } 141 142 virtual void StealFrom(Thread* self, WorkStealingTask* source) = 0; 143 144 private: 145 // How many people are referencing this task. 146 size_t ref_count_; 147 148 friend class WorkStealingWorker; 149 }; 150 151 class WorkStealingWorker : public ThreadPoolWorker { 152 public: 153 virtual ~WorkStealingWorker(); 154 IsRunningTask()155 bool IsRunningTask() const { 156 return task_ != NULL; 157 } 158 159 protected: 160 WorkStealingTask* task_; 161 162 WorkStealingWorker(ThreadPool* thread_pool, const std::string& name, size_t stack_size); 163 virtual void Run(); 164 165 private: 166 friend class WorkStealingThreadPool; 167 DISALLOW_COPY_AND_ASSIGN(WorkStealingWorker); 168 }; 169 170 class WorkStealingThreadPool : public ThreadPool { 171 public: 172 explicit WorkStealingThreadPool(const char* name, size_t num_threads); 173 virtual ~WorkStealingThreadPool(); 174 175 private: 176 Mutex work_steal_lock_; 177 // Which thread we are stealing from (round robin). 178 size_t steal_index_; 179 180 // Find a task to steal from 181 WorkStealingTask* FindTaskToStealFrom(Thread* self) EXCLUSIVE_LOCKS_REQUIRED(work_steal_lock_); 182 183 friend class WorkStealingWorker; 184 }; 185 186 } // namespace art 187 188 #endif // ART_RUNTIME_THREAD_POOL_H_ 189