1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/threading/worker_pool_posix.h"
6 
7 #include <set>
8 
9 #include "base/bind.h"
10 #include "base/callback.h"
11 #include "base/macros.h"
12 #include "base/synchronization/condition_variable.h"
13 #include "base/synchronization/lock.h"
14 #include "base/synchronization/waitable_event.h"
15 #include "base/threading/platform_thread.h"
16 #include "testing/gtest/include/gtest/gtest.h"
17 
18 namespace base {
19 
20 // Peer class to provide passthrough access to PosixDynamicThreadPool internals.
21 class PosixDynamicThreadPool::PosixDynamicThreadPoolPeer {
22  public:
PosixDynamicThreadPoolPeer(PosixDynamicThreadPool * pool)23   explicit PosixDynamicThreadPoolPeer(PosixDynamicThreadPool* pool)
24       : pool_(pool) {}
25 
lock()26   Lock* lock() { return &pool_->lock_; }
pending_tasks_available_cv()27   ConditionVariable* pending_tasks_available_cv() {
28     return &pool_->pending_tasks_available_cv_;
29   }
pending_tasks() const30   const std::queue<PendingTask>& pending_tasks() const {
31     return pool_->pending_tasks_;
32   }
num_idle_threads() const33   int num_idle_threads() const { return pool_->num_idle_threads_; }
num_idle_threads_cv()34   ConditionVariable* num_idle_threads_cv() {
35     return pool_->num_idle_threads_cv_.get();
36   }
set_num_idle_threads_cv(ConditionVariable * cv)37   void set_num_idle_threads_cv(ConditionVariable* cv) {
38     pool_->num_idle_threads_cv_.reset(cv);
39   }
40 
41  private:
42   PosixDynamicThreadPool* pool_;
43 
44   DISALLOW_COPY_AND_ASSIGN(PosixDynamicThreadPoolPeer);
45 };
46 
47 namespace {
48 
49 // IncrementingTask's main purpose is to increment a counter.  It also updates a
50 // set of unique thread ids, and signals a ConditionVariable on completion.
51 // Note that since it does not block, there is no way to control the number of
52 // threads used if more than one IncrementingTask is consecutively posted to the
53 // thread pool, since the first one might finish executing before the subsequent
54 // PostTask() calls get invoked.
IncrementingTask(Lock * counter_lock,int * counter,Lock * unique_threads_lock,std::set<PlatformThreadId> * unique_threads)55 void IncrementingTask(Lock* counter_lock,
56                       int* counter,
57                       Lock* unique_threads_lock,
58                       std::set<PlatformThreadId>* unique_threads) {
59   {
60     base::AutoLock locked(*unique_threads_lock);
61     unique_threads->insert(PlatformThread::CurrentId());
62   }
63   base::AutoLock locked(*counter_lock);
64   (*counter)++;
65 }
66 
67 // BlockingIncrementingTask is a simple wrapper around IncrementingTask that
68 // allows for waiting at the start of Run() for a WaitableEvent to be signalled.
69 struct BlockingIncrementingTaskArgs {
70   Lock* counter_lock;
71   int* counter;
72   Lock* unique_threads_lock;
73   std::set<PlatformThreadId>* unique_threads;
74   Lock* num_waiting_to_start_lock;
75   int* num_waiting_to_start;
76   ConditionVariable* num_waiting_to_start_cv;
77   base::WaitableEvent* start;
78 };
79 
BlockingIncrementingTask(const BlockingIncrementingTaskArgs & args)80 void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) {
81   {
82     base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock);
83     (*args.num_waiting_to_start)++;
84   }
85   args.num_waiting_to_start_cv->Signal();
86   args.start->Wait();
87   IncrementingTask(args.counter_lock, args.counter, args.unique_threads_lock,
88                    args.unique_threads);
89 }
90 
91 class PosixDynamicThreadPoolTest : public testing::Test {
92  protected:
PosixDynamicThreadPoolTest()93   PosixDynamicThreadPoolTest()
94       : pool_(new base::PosixDynamicThreadPool("dynamic_pool", 60 * 60)),
95         peer_(pool_.get()),
96         counter_(0),
97         num_waiting_to_start_(0),
98         num_waiting_to_start_cv_(&num_waiting_to_start_lock_),
99         start_(true, false) {}
100 
SetUp()101   void SetUp() override {
102     peer_.set_num_idle_threads_cv(new ConditionVariable(peer_.lock()));
103   }
104 
TearDown()105   void TearDown() override {
106     // Wake up the idle threads so they can terminate.
107     if (pool_.get())
108       pool_->Terminate();
109   }
110 
WaitForTasksToStart(int num_tasks)111   void WaitForTasksToStart(int num_tasks) {
112     base::AutoLock num_waiting_to_start_locked(num_waiting_to_start_lock_);
113     while (num_waiting_to_start_ < num_tasks) {
114       num_waiting_to_start_cv_.Wait();
115     }
116   }
117 
WaitForIdleThreads(int num_idle_threads)118   void WaitForIdleThreads(int num_idle_threads) {
119     base::AutoLock pool_locked(*peer_.lock());
120     while (peer_.num_idle_threads() < num_idle_threads) {
121       peer_.num_idle_threads_cv()->Wait();
122     }
123   }
124 
CreateNewIncrementingTaskCallback()125   base::Closure CreateNewIncrementingTaskCallback() {
126     return base::Bind(&IncrementingTask, &counter_lock_, &counter_,
127                       &unique_threads_lock_, &unique_threads_);
128   }
129 
CreateNewBlockingIncrementingTaskCallback()130   base::Closure CreateNewBlockingIncrementingTaskCallback() {
131     BlockingIncrementingTaskArgs args = {
132         &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
133         &num_waiting_to_start_lock_, &num_waiting_to_start_,
134         &num_waiting_to_start_cv_, &start_
135     };
136     return base::Bind(&BlockingIncrementingTask, args);
137   }
138 
139   scoped_refptr<base::PosixDynamicThreadPool> pool_;
140   base::PosixDynamicThreadPool::PosixDynamicThreadPoolPeer peer_;
141   Lock counter_lock_;
142   int counter_;
143   Lock unique_threads_lock_;
144   std::set<PlatformThreadId> unique_threads_;
145   Lock num_waiting_to_start_lock_;
146   int num_waiting_to_start_;
147   ConditionVariable num_waiting_to_start_cv_;
148   base::WaitableEvent start_;
149 };
150 
151 }  // namespace
152 
// One non-blocking task on a fresh pool: exactly one thread runs it and the
// counter ends at 1.
TEST_F(PosixDynamicThreadPoolTest, Basic) {
  // Nothing has been posted yet: no idle threads, no recorded thread ids, and
  // an empty pending-task queue.
  EXPECT_EQ(0, peer_.num_idle_threads());
  EXPECT_EQ(0U, unique_threads_.size());
  EXPECT_EQ(0U, peer_.pending_tasks().size());

  // Post a single task, then block until a thread has gone idle, i.e. the
  // task has finished.
  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
  WaitForIdleThreads(1);

  EXPECT_EQ(1U, unique_threads_.size())
      << "There should be only one thread allocated for one task.";
  EXPECT_EQ(1, counter_);
}
167 
// After one task has left an idle worker behind, two concurrent blocking tasks
// should need only two distinct threads in total.
TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
  const int kNumBlockingTasks = 2;

  // Run one non-blocking task to completion so that a worker is idle.
  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
  WaitForIdleThreads(1);

  // Post the blocking tasks.  One of them should reuse the existing worker
  // thread.
  for (int i = 0; i < kNumBlockingTasks; ++i)
    pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());

  // Release the tasks only once all of them are running, then wait for the
  // workers to become idle again.
  WaitForTasksToStart(kNumBlockingTasks);
  start_.Signal();
  WaitForIdleThreads(kNumBlockingTasks);

  EXPECT_EQ(2U, unique_threads_.size());
  EXPECT_EQ(2, peer_.num_idle_threads());
  EXPECT_EQ(3, counter_);
}
186 
// Two blocking tasks on a fresh pool run on two distinct threads, and both
// threads are idle once the tasks have been released and finished.
TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
  const int kNumBlockingTasks = 2;

  for (int i = 0; i < kNumBlockingTasks; ++i)
    pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());

  // The tasks increment the counter only after |start_| is signalled, which
  // has not happened yet.
  EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";

  // Release the tasks once both are running and wait for both workers to go
  // idle.
  WaitForTasksToStart(kNumBlockingTasks);
  start_.Signal();
  WaitForIdleThreads(kNumBlockingTasks);

  EXPECT_EQ(2U, unique_threads_.size());
  EXPECT_EQ(2, peer_.num_idle_threads()) << "Existing threads are now idle.";
  EXPECT_EQ(2, counter_);
}
202 
// Mixed scenario: one non-blocking task, two concurrent blocking tasks, then
// all idle threads are woken so they exit, and finally one more non-blocking
// task runs with no thread left to reuse.
TEST_F(PosixDynamicThreadPoolTest, Complex) {
  const int kNumBlockingTasks = 2;

  // Add one non-blocking task and wait for it to finish.
  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
  WaitForIdleThreads(1);

  // Add the blocking tasks, start them simultaneously, and wait for them to
  // finish.
  for (int i = 0; i < kNumBlockingTasks; ++i)
    pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());

  WaitForTasksToStart(kNumBlockingTasks);
  start_.Signal();
  WaitForIdleThreads(kNumBlockingTasks);

  EXPECT_EQ(3, counter_);
  EXPECT_EQ(2, peer_.num_idle_threads());
  EXPECT_EQ(2U, unique_threads_.size());

  // Wake up all idle threads so they can exit.  Each pass signals the
  // pending-tasks condition variable and then waits on the idle-count
  // condition variable, until the pool reports no idle threads.
  {
    ConditionVariable* const tasks_available_cv =
        peer_.pending_tasks_available_cv();
    ConditionVariable* const idle_count_cv = peer_.num_idle_threads_cv();

    base::AutoLock pool_guard(*peer_.lock());
    while (peer_.num_idle_threads() > 0) {
      tasks_available_cv->Signal();
      idle_count_cv->Wait();
    }
  }

  // Add another non-blocking task.  There are no threads to reuse.
  pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
  WaitForIdleThreads(1);

  // The POSIX implementation of PlatformThread::CurrentId() uses
  // pthread_self(), which is not guaranteed to be unique after a thread
  // joins. The OS X implementation of pthread_self() returns the address of
  // the pthread_t, which is merely a malloc()ed pointer stored in the first
  // TLS slot. When a thread joins and that structure is freed, the block of
  // memory can be put on the OS free list, meaning the same address could be
  // reused in a subsequent allocation. This in fact happens when allocating
  // in a loop as this test does.
  //
  // Because two threads ran concurrently, the set is guaranteed to hold at
  // least two unique thread IDs. But after those two threads are joined, the
  // next-created thread can get a re-used ID if its pthread_t structure is
  // allocated from the free list. Therefore, there can be either 2 or 3
  // unique thread IDs in the set at this stage in the test.
  EXPECT_TRUE(unique_threads_.size() >= 2 && unique_threads_.size() <= 3)
      << "unique_threads_.size() = " << unique_threads_.size();
  EXPECT_EQ(1, peer_.num_idle_threads());
  EXPECT_EQ(4, counter_);
}
254 
255 }  // namespace base
256