1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/cpp/server/dynamic_thread_pool.h"
20 
21 #include <mutex>
22 
23 #include <grpc/support/log.h>
24 
25 #include "src/core/lib/gprpp/thd.h"
26 
27 namespace grpc {
28 
DynamicThread(DynamicThreadPool * pool)29 DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
30     : pool_(pool),
31       thd_("grpcpp_dynamic_pool",
32            [](void* th) {
33              static_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc();
34            },
35            this) {
36   thd_.Start();
37 }
~DynamicThread()38 DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); }
39 
ThreadFunc()40 void DynamicThreadPool::DynamicThread::ThreadFunc() {
41   pool_->ThreadFunc();
42   // Now that we have killed ourselves, we should reduce the thread count
43   std::unique_lock<std::mutex> lock(pool_->mu_);
44   pool_->nthreads_--;
45   // Move ourselves to dead list
46   pool_->dead_threads_.push_back(this);
47 
48   if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
49     pool_->shutdown_cv_.notify_one();
50   }
51 }
52 
ThreadFunc()53 void DynamicThreadPool::ThreadFunc() {
54   for (;;) {
55     // Wait until work is available or we are shutting down.
56     std::unique_lock<std::mutex> lock(mu_);
57     if (!shutdown_ && callbacks_.empty()) {
58       // If there are too many threads waiting, then quit this thread
59       if (threads_waiting_ >= reserve_threads_) {
60         break;
61       }
62       threads_waiting_++;
63       cv_.wait(lock);
64       threads_waiting_--;
65     }
66     // Drain callbacks before considering shutdown to ensure all work
67     // gets completed.
68     if (!callbacks_.empty()) {
69       auto cb = callbacks_.front();
70       callbacks_.pop();
71       lock.unlock();
72       cb();
73     } else if (shutdown_) {
74       break;
75     }
76   }
77 }
78 
DynamicThreadPool(int reserve_threads)79 DynamicThreadPool::DynamicThreadPool(int reserve_threads)
80     : shutdown_(false),
81       reserve_threads_(reserve_threads),
82       nthreads_(0),
83       threads_waiting_(0) {
84   for (int i = 0; i < reserve_threads_; i++) {
85     std::lock_guard<std::mutex> lock(mu_);
86     nthreads_++;
87     new DynamicThread(this);
88   }
89 }
90 
ReapThreads(std::list<DynamicThread * > * tlist)91 void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
92   for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
93     delete *t;
94   }
95 }
96 
~DynamicThreadPool()97 DynamicThreadPool::~DynamicThreadPool() {
98   std::unique_lock<std::mutex> lock(mu_);
99   shutdown_ = true;
100   cv_.notify_all();
101   while (nthreads_ != 0) {
102     shutdown_cv_.wait(lock);
103   }
104   ReapThreads(&dead_threads_);
105 }
106 
Add(const std::function<void ()> & callback)107 void DynamicThreadPool::Add(const std::function<void()>& callback) {
108   std::lock_guard<std::mutex> lock(mu_);
109   // Add works to the callbacks list
110   callbacks_.push(callback);
111   // Increase pool size or notify as needed
112   if (threads_waiting_ == 0) {
113     // Kick off a new thread
114     nthreads_++;
115     new DynamicThread(this);
116   } else {
117     cv_.notify_one();
118   }
119   // Also use this chance to harvest dead threads
120   if (!dead_threads_.empty()) {
121     ReapThreads(&dead_threads_);
122   }
123 }
124 
125 }  // namespace grpc
126