1 // Copyright 2016 Google Inc. All rights reserved 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #include "thread_pool.h" 16 17 #include <condition_variable> 18 #include <mutex> 19 #include <stack> 20 #include <thread> 21 #include <vector> 22 23 #include "affinity.h" 24 25 class ThreadPoolImpl : public ThreadPool { 26 public: ThreadPoolImpl(int num_threads)27 explicit ThreadPoolImpl(int num_threads) 28 : is_waiting_(false) { 29 SetAffinityForMultiThread(); 30 threads_.reserve(num_threads); 31 for (int i = 0; i < num_threads; i++) { 32 threads_.push_back(thread([this]() { Loop(); })); 33 } 34 } 35 ~ThreadPoolImpl()36 virtual ~ThreadPoolImpl() override { 37 } 38 Submit(function<void (void)> task)39 virtual void Submit(function<void(void)> task) override { 40 unique_lock<mutex> lock(mu_); 41 tasks_.push(task); 42 cond_.notify_one(); 43 } 44 Wait()45 virtual void Wait() override { 46 { 47 unique_lock<mutex> lock(mu_); 48 is_waiting_ = true; 49 cond_.notify_all(); 50 } 51 52 for (thread& th : threads_) { 53 th.join(); 54 } 55 56 SetAffinityForSingleThread(); 57 } 58 59 private: Loop()60 void Loop() { 61 while (true) { 62 function<void(void)> task; 63 { 64 unique_lock<mutex> lock(mu_); 65 if (tasks_.empty()) { 66 if (is_waiting_) 67 return; 68 cond_.wait(lock); 69 } 70 71 if (tasks_.empty()) 72 continue; 73 74 task = tasks_.top(); 75 tasks_.pop(); 76 } 77 task(); 78 } 79 } 80 81 vector<thread> threads_; 82 mutex mu_; 83 condition_variable cond_; 84 stack<function<void(void)>> tasks_; 85 bool is_waiting_; 86 }; 87 NewThreadPool(int num_threads)88ThreadPool* NewThreadPool(int num_threads) { 89 return new ThreadPoolImpl(num_threads); 90 } 91