1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <memory>
6 #include <vector>
7 
8 #include "base/atomicops.h"
9 #include "base/bind.h"
10 #include "base/callback.h"
11 #include "base/macros.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/run_loop.h"
15 #include "base/strings/stringprintf.h"
16 #include "base/synchronization/atomic_flag.h"
17 #include "base/synchronization/waitable_event.h"
18 #include "base/threading/platform_thread.h"
19 #include "base/threading/sequenced_task_runner_handle.h"
20 #include "base/time/time.h"
21 #include "testing/gtest/include/gtest/gtest.h"
22 #include "testing/perf/perf_test.h"
23 
24 namespace base {
25 
26 namespace {
27 
28 // A thread that waits for the caller to signal an event before proceeding to
29 // call Action::Run().
30 class PostingThread {
31  public:
32   class Action {
33    public:
34     virtual ~Action() = default;
35 
36     // Called after the thread is started and |start_event_| is signalled.
37     virtual void Run() = 0;
38 
39    protected:
40     Action() = default;
41 
42    private:
43     DISALLOW_COPY_AND_ASSIGN(Action);
44   };
45 
46   // Creates a PostingThread where the thread waits on |start_event| before
47   // calling action->Run(). If a thread is returned, the thread is guaranteed to
48   // be allocated and running and the caller must call Join() before destroying
49   // the PostingThread.
Create(WaitableEvent * start_event,std::unique_ptr<Action> action)50   static std::unique_ptr<PostingThread> Create(WaitableEvent* start_event,
51                                                std::unique_ptr<Action> action) {
52     auto posting_thread =
53         WrapUnique(new PostingThread(start_event, std::move(action)));
54 
55     if (!posting_thread->Start())
56       return nullptr;
57 
58     return posting_thread;
59   }
60 
~PostingThread()61   ~PostingThread() { DCHECK_EQ(!thread_handle_.is_null(), join_called_); }
62 
Join()63   void Join() {
64     PlatformThread::Join(thread_handle_);
65     join_called_ = true;
66   }
67 
68  private:
69   class Delegate final : public PlatformThread::Delegate {
70    public:
Delegate(PostingThread * outer,std::unique_ptr<Action> action)71     Delegate(PostingThread* outer, std::unique_ptr<Action> action)
72         : outer_(outer), action_(std::move(action)) {
73       DCHECK(outer_);
74       DCHECK(action_);
75     }
76 
77     ~Delegate() override = default;
78 
79    private:
ThreadMain()80     void ThreadMain() override {
81       outer_->thread_started_.Signal();
82       outer_->start_event_->Wait();
83       action_->Run();
84     }
85 
86     PostingThread* const outer_;
87     const std::unique_ptr<Action> action_;
88 
89     DISALLOW_COPY_AND_ASSIGN(Delegate);
90   };
91 
PostingThread(WaitableEvent * start_event,std::unique_ptr<Action> delegate)92   PostingThread(WaitableEvent* start_event, std::unique_ptr<Action> delegate)
93       : start_event_(start_event),
94         thread_started_(WaitableEvent::ResetPolicy::MANUAL,
95                         WaitableEvent::InitialState::NOT_SIGNALED),
96         delegate_(this, std::move(delegate)) {
97     DCHECK(start_event_);
98   }
99 
Start()100   bool Start() {
101     bool thread_created =
102         PlatformThread::Create(0, &delegate_, &thread_handle_);
103     if (thread_created)
104       thread_started_.Wait();
105 
106     return thread_created;
107   }
108 
109   bool join_called_ = false;
110   WaitableEvent* const start_event_;
111   WaitableEvent thread_started_;
112   Delegate delegate_;
113 
114   PlatformThreadHandle thread_handle_;
115 
116   DISALLOW_COPY_AND_ASSIGN(PostingThread);
117 };
118 
119 class MessageLoopPerfTest : public ::testing::TestWithParam<int> {
120  public:
MessageLoopPerfTest()121   MessageLoopPerfTest()
122       : message_loop_task_runner_(SequencedTaskRunnerHandle::Get()),
123         run_posting_threads_(WaitableEvent::ResetPolicy::MANUAL,
124                              WaitableEvent::InitialState::NOT_SIGNALED) {}
125 
ParamInfoToString(::testing::TestParamInfo<int> param_info)126   static std::string ParamInfoToString(
127       ::testing::TestParamInfo<int> param_info) {
128     return PostingThreadCountToString(param_info.param);
129   }
130 
PostingThreadCountToString(int posting_threads)131   static std::string PostingThreadCountToString(int posting_threads) {
132     // Special case 1 thread for thread vs threads.
133     if (posting_threads == 1)
134       return "1_Posting_Thread";
135 
136     return StringPrintf("%d_Posting_Threads", posting_threads);
137   }
138 
139  protected:
140   class ContinuouslyPostTasks final : public PostingThread::Action {
141    public:
ContinuouslyPostTasks(MessageLoopPerfTest * outer)142     ContinuouslyPostTasks(MessageLoopPerfTest* outer) : outer_(outer) {
143       DCHECK(outer_);
144     }
145     ~ContinuouslyPostTasks() override = default;
146 
147    private:
Run()148     void Run() override {
149       RepeatingClosure task_to_run =
150           BindRepeating([](size_t* num_tasks_run) { ++*num_tasks_run; },
151                         &outer_->num_tasks_run_);
152       while (!outer_->stop_posting_threads_.IsSet()) {
153         outer_->message_loop_task_runner_->PostTask(FROM_HERE, task_to_run);
154         subtle::NoBarrier_AtomicIncrement(&outer_->num_tasks_posted_, 1);
155       }
156     }
157 
158     MessageLoopPerfTest* const outer_;
159 
160     DISALLOW_COPY_AND_ASSIGN(ContinuouslyPostTasks);
161   };
162 
SetUp()163   void SetUp() override {
164     // This check is here because we can't ASSERT_TRUE in the constructor.
165     ASSERT_TRUE(message_loop_task_runner_);
166   }
167 
168   // Runs ActionType::Run() on |num_posting_threads| and requests test
169   // termination around |duration|.
170   template <typename ActionType>
RunTest(const int num_posting_threads,TimeDelta duration)171   void RunTest(const int num_posting_threads, TimeDelta duration) {
172     std::vector<std::unique_ptr<PostingThread>> threads;
173     for (int i = 0; i < num_posting_threads; ++i) {
174       threads.emplace_back(PostingThread::Create(
175           &run_posting_threads_, std::make_unique<ActionType>(this)));
176       // Don't assert here to simplify the code that requires a Join() call for
177       // every created PostingThread.
178       EXPECT_TRUE(threads[i]);
179     }
180 
181     RunLoop run_loop;
182     message_loop_task_runner_->PostDelayedTask(
183         FROM_HERE,
184         BindOnce(
185             [](RunLoop* run_loop, AtomicFlag* stop_posting_threads) {
186               stop_posting_threads->Set();
187               run_loop->Quit();
188             },
189             &run_loop, &stop_posting_threads_),
190         duration);
191 
192     TimeTicks post_task_start = TimeTicks::Now();
193     run_posting_threads_.Signal();
194 
195     TimeTicks run_loop_start = TimeTicks::Now();
196     run_loop.Run();
197     tasks_run_duration_ = TimeTicks::Now() - run_loop_start;
198 
199     for (auto& thread : threads)
200       thread->Join();
201 
202     tasks_posted_duration_ = TimeTicks::Now() - post_task_start;
203   }
204 
num_tasks_posted() const205   size_t num_tasks_posted() const {
206     return subtle::NoBarrier_Load(&num_tasks_posted_);
207   }
208 
tasks_posted_duration() const209   TimeDelta tasks_posted_duration() const { return tasks_posted_duration_; }
210 
num_tasks_run() const211   size_t num_tasks_run() const { return num_tasks_run_; }
212 
tasks_run_duration() const213   TimeDelta tasks_run_duration() const { return tasks_run_duration_; }
214 
215  private:
216   MessageLoop message_loop_;
217 
218   // Accessed on multiple threads, thread-safe or constant:
219   const scoped_refptr<SequencedTaskRunner> message_loop_task_runner_;
220   WaitableEvent run_posting_threads_;
221   AtomicFlag stop_posting_threads_;
222   subtle::AtomicWord num_tasks_posted_ = 0;
223 
224   // Accessed only on the test case thread:
225   TimeDelta tasks_posted_duration_;
226   TimeDelta tasks_run_duration_;
227   size_t num_tasks_run_ = 0;
228 
229   DISALLOW_COPY_AND_ASSIGN(MessageLoopPerfTest);
230 };
231 
232 }  // namespace
233 
TEST_P(MessageLoopPerfTest, PostTaskRate) {
  // Posts tasks from GetParam() threads for about three seconds, then reports
  // the average time per posted task and the average time per task run by the
  // message loop.
  const int posting_thread_count = GetParam();
  RunTest<ContinuouslyPostTasks>(posting_thread_count,
                                 TimeDelta::FromSeconds(3));

  // Average microseconds spent per PostTask() call across all threads.
  const double us_per_task_posted = tasks_posted_duration().InMicroseconds() /
                                    static_cast<double>(num_tasks_posted());
  perf_test::PrintResult("task_posting", "",
                         PostingThreadCountToString(posting_thread_count),
                         us_per_task_posted, "us/task", true);

  // Average microseconds of run loop time per task that was executed.
  const double us_per_task_run = tasks_run_duration().InMicroseconds() /
                                 static_cast<double>(num_tasks_run());
  perf_test::PrintResult("task_running", "",
                         PostingThreadCountToString(posting_thread_count),
                         us_per_task_run, "us/task", true);
}
249 
250 INSTANTIATE_TEST_CASE_P(,
251                         MessageLoopPerfTest,
252                         ::testing::Values(1, 5, 10),
253                         MessageLoopPerfTest::ParamInfoToString);
254 }  // namespace base
255