1 /*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "task_processor.h"
18 #include "base/time_utils.h"
19 #include "common_runtime_test.h"
20 #include "thread-current-inl.h"
21 #include "thread_pool.h"
22
23 namespace art {
24 namespace gc {
25
26 class TaskProcessorTest : public CommonRuntimeTest {
27 public:
28 };
29
30 class RecursiveTask : public HeapTask {
31 public:
RecursiveTask(TaskProcessor * task_processor,Atomic<size_t> * counter,size_t max_recursion)32 RecursiveTask(TaskProcessor* task_processor, Atomic<size_t>* counter, size_t max_recursion)
33 : HeapTask(NanoTime() + MsToNs(10)), task_processor_(task_processor), counter_(counter),
34 max_recursion_(max_recursion) {
35 }
Run(Thread * self)36 void Run(Thread* self) override {
37 if (max_recursion_ > 0) {
38 task_processor_->AddTask(self,
39 new RecursiveTask(task_processor_, counter_, max_recursion_ - 1));
40 counter_->fetch_add(1U, std::memory_order_seq_cst);
41 }
42 }
43
44 private:
45 TaskProcessor* const task_processor_;
46 Atomic<size_t>* const counter_;
47 const size_t max_recursion_;
48 };
49
50 class WorkUntilDoneTask : public SelfDeletingTask {
51 public:
WorkUntilDoneTask(TaskProcessor * task_processor,Atomic<bool> * done_running)52 WorkUntilDoneTask(TaskProcessor* task_processor, Atomic<bool>* done_running)
53 : task_processor_(task_processor), done_running_(done_running) {
54 }
Run(Thread * self)55 void Run(Thread* self) override {
56 task_processor_->RunAllTasks(self);
57 done_running_->store(true, std::memory_order_seq_cst);
58 }
59
60 private:
61 TaskProcessor* const task_processor_;
62 Atomic<bool>* done_running_;
63 };
64
TEST_F(TaskProcessorTest,Interrupt)65 TEST_F(TaskProcessorTest, Interrupt) {
66 ThreadPool thread_pool("task processor test", 1U);
67 Thread* const self = Thread::Current();
68 TaskProcessor task_processor;
69 static constexpr size_t kRecursion = 10;
70 Atomic<bool> done_running(false);
71 Atomic<size_t> counter(0);
72 task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion));
73 task_processor.Start(self);
74 // Add a task which will wait until interrupted to the thread pool.
75 thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
76 thread_pool.StartWorkers(self);
77 ASSERT_FALSE(done_running);
78 // Wait until all the tasks are done, but since we didn't interrupt, done_running should be 0.
79 while (counter.load(std::memory_order_seq_cst) != kRecursion) {
80 usleep(10);
81 }
82 ASSERT_FALSE(done_running);
83 task_processor.Stop(self);
84 thread_pool.Wait(self, true, false);
85 // After the interrupt and wait, the WorkUntilInterruptedTasktask should have terminated and
86 // set done_running_ to true.
87 ASSERT_TRUE(done_running.load(std::memory_order_seq_cst));
88
89 // Test that we finish remaining tasks before returning from RunTasksUntilInterrupted.
90 counter.store(0, std::memory_order_seq_cst);
91 done_running.store(false, std::memory_order_seq_cst);
92 // Self interrupt before any of the other tasks run, but since we added them we should keep on
93 // working until all the tasks are completed.
94 task_processor.Stop(self);
95 task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion));
96 thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
97 thread_pool.StartWorkers(self);
98 thread_pool.Wait(self, true, false);
99 ASSERT_TRUE(done_running.load(std::memory_order_seq_cst));
100 ASSERT_EQ(counter.load(std::memory_order_seq_cst), kRecursion);
101 }
102
103 class TestOrderTask : public HeapTask {
104 public:
TestOrderTask(uint64_t expected_time,size_t expected_counter,size_t * counter)105 TestOrderTask(uint64_t expected_time, size_t expected_counter, size_t* counter)
106 : HeapTask(expected_time), expected_counter_(expected_counter), counter_(counter) {
107 }
Run(Thread * thread ATTRIBUTE_UNUSED)108 void Run(Thread* thread ATTRIBUTE_UNUSED) override {
109 ASSERT_EQ(*counter_, expected_counter_);
110 ++*counter_;
111 }
112
113 private:
114 const size_t expected_counter_;
115 size_t* const counter_;
116 };
117
TEST_F(TaskProcessorTest,Ordering)118 TEST_F(TaskProcessorTest, Ordering) {
119 static const size_t kNumTasks = 25;
120 const uint64_t current_time = NanoTime();
121 Thread* const self = Thread::Current();
122 TaskProcessor task_processor;
123 task_processor.Stop(self);
124 size_t counter = 0;
125 std::vector<std::pair<uint64_t, size_t>> orderings;
126 for (size_t i = 0; i < kNumTasks; ++i) {
127 orderings.push_back(std::make_pair(current_time + MsToNs(10U * i), i));
128 }
129 for (size_t i = 0; i < kNumTasks; ++i) {
130 std::swap(orderings[i], orderings[(i * 87654231 + 12345) % orderings.size()]);
131 }
132 for (const auto& pair : orderings) {
133 auto* task = new TestOrderTask(pair.first, pair.second, &counter);
134 task_processor.AddTask(self, task);
135 }
136 ThreadPool thread_pool("task processor test", 1U);
137 Atomic<bool> done_running(false);
138 // Add a task which will wait until interrupted to the thread pool.
139 thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
140 ASSERT_FALSE(done_running.load(std::memory_order_seq_cst));
141 thread_pool.StartWorkers(self);
142 thread_pool.Wait(self, true, false);
143 ASSERT_TRUE(done_running.load(std::memory_order_seq_cst));
144 ASSERT_EQ(counter, kNumTasks);
145 }
146
147 } // namespace gc
148 } // namespace art
149