1 // Copyright 2016 Google Inc. All rights reserved
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include "thread_pool.h"

#include <condition_variable>
#include <mutex>
#include <stack>
#include <thread>
#include <utility>
#include <vector>

#include "affinity.h"
24 
25 class ThreadPoolImpl : public ThreadPool {
26  public:
ThreadPoolImpl(int num_threads)27   explicit ThreadPoolImpl(int num_threads)
28       : is_waiting_(false) {
29     SetAffinityForMultiThread();
30     threads_.reserve(num_threads);
31     for (int i = 0; i < num_threads; i++) {
32       threads_.push_back(thread([this]() { Loop(); }));
33     }
34   }
35 
~ThreadPoolImpl()36   virtual ~ThreadPoolImpl() override {
37   }
38 
Submit(function<void (void)> task)39   virtual void Submit(function<void(void)> task) override {
40     unique_lock<mutex> lock(mu_);
41     tasks_.push(task);
42     cond_.notify_one();
43   }
44 
Wait()45   virtual void Wait() override {
46     {
47       unique_lock<mutex> lock(mu_);
48       is_waiting_ = true;
49       cond_.notify_all();
50     }
51 
52     for (thread& th : threads_) {
53       th.join();
54     }
55 
56     SetAffinityForSingleThread();
57   }
58 
59  private:
Loop()60   void Loop() {
61     while (true) {
62       function<void(void)> task;
63       {
64         unique_lock<mutex> lock(mu_);
65         if (tasks_.empty()) {
66           if (is_waiting_)
67             return;
68           cond_.wait(lock);
69         }
70 
71         if (tasks_.empty())
72           continue;
73 
74         task = tasks_.top();
75         tasks_.pop();
76       }
77       task();
78     }
79   }
80 
81   vector<thread> threads_;
82   mutex mu_;
83   condition_variable cond_;
84   stack<function<void(void)>> tasks_;
85   bool is_waiting_;
86 };
87 
NewThreadPool(int num_threads)88 ThreadPool* NewThreadPool(int num_threads) {
89   return new ThreadPoolImpl(num_threads);
90 }
91