1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/threading/worker_pool_posix.h"
6
7 #include <set>
8
9 #include "base/bind.h"
10 #include "base/callback.h"
11 #include "base/macros.h"
12 #include "base/synchronization/condition_variable.h"
13 #include "base/synchronization/lock.h"
14 #include "base/synchronization/waitable_event.h"
15 #include "base/threading/platform_thread.h"
16 #include "testing/gtest/include/gtest/gtest.h"
17
18 namespace base {
19
20 // Peer class to provide passthrough access to PosixDynamicThreadPool internals.
21 class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
22 public:
PosixDynamicThreadPoolPeer(PosixDynamicThreadPool * pool)23 explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool)
24 : pool_(pool) {}
25
lock()26 Lock* lock() { return &pool_->lock_; }
pending_tasks_available_cv()27 ConditionVariable* pending_tasks_available_cv() {
28 return &pool_->pending_tasks_available_cv_;
29 }
pending_tasks() const30 const std::queue<PendingTask>& pending_tasks() const {
31 return pool_->pending_tasks_;
32 }
num_idle_threads() const33 int num_idle_threads() const { return pool_->num_idle_threads_; }
num_idle_threads_cv()34 ConditionVariable* num_idle_threads_cv() {
35 return pool_->num_idle_threads_cv_.get();
36 }
set_num_idle_threads_cv(ConditionVariable * cv)37 void set_num_idle_threads_cv(ConditionVariable* cv) {
38 pool_->num_idle_threads_cv_.reset(cv);
39 }
40
41 private:
42 PosixDynamicThreadPool* pool_;
43
44 DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer);
45 };
46
47 namespace {
48
49 // IncrementingTask's main purpose is to increment a counter. It also updates a
50 // set of unique thread ids, and signals a ConditionVariable on completion.
51 // Note that since it does not block, there is no way to control the number of
52 // threads used if more than one IncrementingTask is consecutively posted to the
53 // thread pool, since the first one might finish executing before the subsequent
54 // PostTask() calls get invoked.
IncrementingTask(Lock * counter_lock,int * counter,Lock * unique_threads_lock,std::set<PlatformThreadId> * unique_threads)55 void IncrementingTask(Lock* counter_lock,
56 int* counter,
57 Lock* unique_threads_lock,
58 std::set<PlatformThreadId>* unique_threads) {
59 {
60 base::AutoLock locked(*unique_threads_lock);
61 unique_threads->insert(PlatformThread::CurrentId());
62 }
63 base::AutoLock locked(*counter_lock);
64 (*counter)++;
65 }
66
67 // BlockingIncrementingTask is a simple wrapper around IncrementingTask that
68 // allows for waiting at the start of Run() for a WaitableEvent to be signalled.
69 struct BlockingIncrementingTaskArgs {
70 Lock* counter_lock;
71 int* counter;
72 Lock* unique_threads_lock;
73 std::set<PlatformThreadId>* unique_threads;
74 Lock* num_waiting_to_start_lock;
75 int* num_waiting_to_start;
76 ConditionVariable* num_waiting_to_start_cv;
77 base::WaitableEvent* start;
78 };
79
BlockingIncrementingTask(const BlockingIncrementingTaskArgs & args)80 void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) {
81 {
82 base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock);
83 (*args.num_waiting_to_start)++;
84 }
85 args.num_waiting_to_start_cv->Signal();
86 args.start->Wait();
87 IncrementingTask(args.counter_lock, args.counter, args.unique_threads_lock,
88 args.unique_threads);
89 }
90
91 class PosixDynamicThreadPoolTest : public testing::Test {
92 protected:
PosixDynamicThreadPoolTest()93 PosixDynamicThreadPoolTest()
94 : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60 * 60)),
95 peer_(pool_.get()),
96 counter_(0),
97 num_waiting_to_start_(0),
98 num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
99 start_(true, false) {}
100
SetUp()101 void SetUp() override {
102 peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
103 }
104
TearDown()105 void TearDown() override {
106 // Wake up the idle threads so they can terminate.
107 if (pool_.get())
108 pool_->Terminate();
109 }
110
WaitForTasksToStart(int num_tasks)111 void WaitForTasksToStart(int num_tasks) {
112 base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
113 while (num_waiting_to_start_ < num_tasks) {
114 num_waiting_to_start_cv_.Wait();
115 }
116 }
117
WaitForIdleThreads(int num_idle_threads)118 void WaitForIdleThreads(int num_idle_threads) {
119 base::AutoLock pool_locked(*peer_.lock());
120 while (peer_.num_idle_threads() < num_idle_threads) {
121 peer_.num_idle_threads_cv()->Wait();
122 }
123 }
124
CreateNewIncrementingTaskCallback()125 base::Closure CreateNewIncrementingTaskCallback() {
126 return base::Bind(&IncrementingTask, &counter_lock_, &counter_,
127 &unique_threads_lock_, &unique_threads_);
128 }
129
CreateNewBlockingIncrementingTaskCallback()130 base::Closure CreateNewBlockingIncrementingTaskCallback() {
131 BlockingIncrementingTaskArgs args = {
132 &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
133 &num_waiting_to_start_lock_, &num_waiting_to_start_,
134 &num_waiting_to_start_cv_, &start_
135 };
136 return base::Bind(&BlockingIncrementingTask, args);
137 }
138
139 scoped_refptr<base::PosixDynamicThreadPool> pool_;
140 base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
141 Lock counter_lock_;
142 int counter_;
143 Lock unique_threads_lock_;
144 std::set<PlatformThreadId> unique_threads_;
145 Lock num_waiting_to_start_lock_;
146 int num_waiting_to_start_;
147 ConditionVariable num_waiting_to_start_cv_;
148 base::WaitableEvent start_;
149 };
150
151 } // namespace
152
TEST_F(PosixDynamicThreadPoolTest,Basic)153 TEST_F(PosixDynamicThreadPoolTest, Basic) {
154 EXPECT_EQ(0, peer_.num_idle_threads());
155 EXPECT_EQ(0U, unique_threads_.size());
156 EXPECT_EQ(0U, peer_.pending_tasks().size());
157
158 // Add one task and wait for it to be completed.
159 pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
160
161 WaitForIdleThreads(1);
162
163 EXPECT_EQ(1U, unique_threads_.size()) <<
164 "There should be only one thread allocated for one task.";
165 EXPECT_EQ(1, counter_);
166 }
167
TEST_F(PosixDynamicThreadPoolTest,ReuseIdle)168 TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
169 // Add one task and wait for it to be completed.
170 pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
171
172 WaitForIdleThreads(1);
173
174 // Add another 2 tasks. One should reuse the existing worker thread.
175 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
176 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
177
178 WaitForTasksToStart(2);
179 start_.Signal();
180 WaitForIdleThreads(2);
181
182 EXPECT_EQ(2U, unique_threads_.size());
183 EXPECT_EQ(2, peer_.num_idle_threads());
184 EXPECT_EQ(3, counter_);
185 }
186
TEST_F(PosixDynamicThreadPoolTest,TwoActiveTasks)187 TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
188 // Add two blocking tasks.
189 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
190 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
191
192 EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";
193
194 WaitForTasksToStart(2);
195 start_.Signal();
196 WaitForIdleThreads(2);
197
198 EXPECT_EQ(2U, unique_threads_.size());
199 EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
200 EXPECT_EQ(2, counter_);
201 }
202
TEST_F(PosixDynamicThreadPoolTest,Complex)203 TEST_F(PosixDynamicThreadPoolTest, Complex) {
204 // Add two non blocking tasks and wait for them to finish.
205 pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
206
207 WaitForIdleThreads(1);
208
209 // Add two blocking tasks, start them simultaneously, and wait for them to
210 // finish.
211 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
212 pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
213
214 WaitForTasksToStart(2);
215 start_.Signal();
216 WaitForIdleThreads(2);
217
218 EXPECT_EQ(3, counter_);
219 EXPECT_EQ(2, peer_.num_idle_threads());
220 EXPECT_EQ(2U, unique_threads_.size());
221
222 // Wake up all idle threads so they can exit.
223 {
224 base::AutoLock locked(*peer_.lock());
225 while (peer_.num_idle_threads() > 0) {
226 peer_.pending_tasks_available_cv()->Signal();
227 peer_.num_idle_threads_cv()->Wait();
228 }
229 }
230
231 // Add another non blocking task. There are no threads to reuse.
232 pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
233 WaitForIdleThreads(1);
234
235 // The POSIX implementation of PlatformThread::CurrentId() uses pthread_self()
236 // which is not guaranteed to be unique after a thread joins. The OS X
237 // implemntation of pthread_self() returns the address of the pthread_t, which
238 // is merely a malloc()ed pointer stored in the first TLS slot. When a thread
239 // joins and that structure is freed, the block of memory can be put on the
240 // OS free list, meaning the same address could be reused in a subsequent
241 // allocation. This in fact happens when allocating in a loop as this test
242 // does.
243 //
244 // Because there are two concurrent threads, there's at least the guarantee
245 // of having two unique thread IDs in the set. But after those two threads are
246 // joined, the next-created thread can get a re-used ID if the allocation of
247 // the pthread_t structure is taken from the free list. Therefore, there can
248 // be either 2 or 3 unique thread IDs in the set at this stage in the test.
249 EXPECT_TRUE(unique_threads_.size() >= 2 && unique_threads_.size() <= 3)
250 << "unique_threads_.size() = " << unique_threads_.size();
251 EXPECT_EQ(1, peer_.num_idle_threads());
252 EXPECT_EQ(4, counter_);
253 }
254
255 } // namespace base
256