1 /*
2  * Copyright (C) 2012 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "thread_pool.h"
18 
19 #include <string>
20 
21 #include "atomic.h"
22 #include "common_runtime_test.h"
23 #include "scoped_thread_state_change-inl.h"
24 #include "thread-inl.h"
25 
26 namespace art {
27 
28 class CountTask : public Task {
29  public:
CountTask(AtomicInteger * count)30   explicit CountTask(AtomicInteger* count) : count_(count), verbose_(false) {}
31 
Run(Thread * self)32   void Run(Thread* self) {
33     if (verbose_) {
34       LOG(INFO) << "Running: " << *self;
35     }
36     // Simulate doing some work.
37     usleep(100);
38     // Increment the counter which keeps track of work completed.
39     ++*count_;
40   }
41 
Finalize()42   void Finalize() {
43     if (verbose_) {
44       LOG(INFO) << "Finalizing: " << *Thread::Current();
45     }
46     delete this;
47   }
48 
49  private:
50   AtomicInteger* const count_;
51   const bool verbose_;
52 };
53 
54 class ThreadPoolTest : public CommonRuntimeTest {
55  public:
56   static int32_t num_threads;
57 };
58 
59 int32_t ThreadPoolTest::num_threads = 4;
60 
61 // Check that the thread pool actually runs tasks that you assign it.
TEST_F(ThreadPoolTest,CheckRun)62 TEST_F(ThreadPoolTest, CheckRun) {
63   Thread* self = Thread::Current();
64   ThreadPool thread_pool("Thread pool test thread pool", num_threads);
65   AtomicInteger count(0);
66   static const int32_t num_tasks = num_threads * 4;
67   for (int32_t i = 0; i < num_tasks; ++i) {
68     thread_pool.AddTask(self, new CountTask(&count));
69   }
70   thread_pool.StartWorkers(self);
71   // Wait for tasks to complete.
72   thread_pool.Wait(self, true, false);
73   // Make sure that we finished all the work.
74   EXPECT_EQ(num_tasks, count.LoadSequentiallyConsistent());
75 }
76 
TEST_F(ThreadPoolTest,StopStart)77 TEST_F(ThreadPoolTest, StopStart) {
78   Thread* self = Thread::Current();
79   ThreadPool thread_pool("Thread pool test thread pool", num_threads);
80   AtomicInteger count(0);
81   static const int32_t num_tasks = num_threads * 4;
82   for (int32_t i = 0; i < num_tasks; ++i) {
83     thread_pool.AddTask(self, new CountTask(&count));
84   }
85   usleep(200);
86   // Check that no threads started prematurely.
87   EXPECT_EQ(0, count.LoadSequentiallyConsistent());
88   // Signal the threads to start processing tasks.
89   thread_pool.StartWorkers(self);
90   usleep(200);
91   thread_pool.StopWorkers(self);
92   AtomicInteger bad_count(0);
93   thread_pool.AddTask(self, new CountTask(&bad_count));
94   usleep(200);
95   // Ensure that the task added after the workers were stopped doesn't get run.
96   EXPECT_EQ(0, bad_count.LoadSequentiallyConsistent());
97   // Allow tasks to finish up and delete themselves.
98   thread_pool.StartWorkers(self);
99   thread_pool.Wait(self, false, false);
100 }
101 
TEST_F(ThreadPoolTest,StopWait)102 TEST_F(ThreadPoolTest, StopWait) {
103   Thread* self = Thread::Current();
104   ThreadPool thread_pool("Thread pool test thread pool", num_threads);
105 
106   AtomicInteger count(0);
107   static const int32_t num_tasks = num_threads * 100;
108   for (int32_t i = 0; i < num_tasks; ++i) {
109     thread_pool.AddTask(self, new CountTask(&count));
110   }
111 
112   // Signal the threads to start processing tasks.
113   thread_pool.StartWorkers(self);
114   usleep(200);
115   thread_pool.StopWorkers(self);
116 
117   thread_pool.Wait(self, false, false);  // We should not deadlock here.
118 
119   // Drain the task list. Note: we have to restart here, as no tasks will be finished when
120   // the pool is stopped.
121   thread_pool.StartWorkers(self);
122   thread_pool.Wait(self, /* do_work */ true, false);
123 }
124 
125 class TreeTask : public Task {
126  public:
TreeTask(ThreadPool * const thread_pool,AtomicInteger * count,int depth)127   TreeTask(ThreadPool* const thread_pool, AtomicInteger* count, int depth)
128       : thread_pool_(thread_pool),
129         count_(count),
130         depth_(depth) {}
131 
Run(Thread * self)132   void Run(Thread* self) {
133     if (depth_ > 1) {
134       thread_pool_->AddTask(self, new TreeTask(thread_pool_, count_, depth_ - 1));
135       thread_pool_->AddTask(self, new TreeTask(thread_pool_, count_, depth_ - 1));
136     }
137     // Increment the counter which keeps track of work completed.
138     ++*count_;
139   }
140 
Finalize()141   void Finalize() {
142     delete this;
143   }
144 
145  private:
146   ThreadPool* const thread_pool_;
147   AtomicInteger* const count_;
148   const int depth_;
149 };
150 
151 // Test that adding new tasks from within a task works.
TEST_F(ThreadPoolTest,RecursiveTest)152 TEST_F(ThreadPoolTest, RecursiveTest) {
153   Thread* self = Thread::Current();
154   ThreadPool thread_pool("Thread pool test thread pool", num_threads);
155   AtomicInteger count(0);
156   static const int depth = 8;
157   thread_pool.AddTask(self, new TreeTask(&thread_pool, &count, depth));
158   thread_pool.StartWorkers(self);
159   thread_pool.Wait(self, true, false);
160   EXPECT_EQ((1 << depth) - 1, count.LoadSequentiallyConsistent());
161 }
162 
163 class PeerTask : public Task {
164  public:
PeerTask()165   PeerTask() {}
166 
Run(Thread * self)167   void Run(Thread* self) {
168     ScopedObjectAccess soa(self);
169     CHECK(self->GetPeer() != nullptr);
170   }
171 
Finalize()172   void Finalize() {
173     delete this;
174   }
175 };
176 
177 class NoPeerTask : public Task {
178  public:
NoPeerTask()179   NoPeerTask() {}
180 
Run(Thread * self)181   void Run(Thread* self) {
182     ScopedObjectAccess soa(self);
183     CHECK(self->GetPeer() == nullptr);
184   }
185 
Finalize()186   void Finalize() {
187     delete this;
188   }
189 };
190 
191 // Tests for create_peer functionality.
TEST_F(ThreadPoolTest,PeerTest)192 TEST_F(ThreadPoolTest, PeerTest) {
193   Thread* self = Thread::Current();
194   {
195     ThreadPool thread_pool("Thread pool test thread pool", 1);
196     thread_pool.AddTask(self, new NoPeerTask());
197     thread_pool.StartWorkers(self);
198     thread_pool.Wait(self, false, false);
199   }
200 
201   {
202     // To create peers, the runtime needs to be started.
203     self->TransitionFromSuspendedToRunnable();
204     bool started = runtime_->Start();
205     ASSERT_TRUE(started);
206 
207     ThreadPool thread_pool("Thread pool test thread pool", 1, true);
208     thread_pool.AddTask(self, new PeerTask());
209     thread_pool.StartWorkers(self);
210     thread_pool.Wait(self, false, false);
211   }
212 }
213 
214 }  // namespace art
215