1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/message_pump/handle_watcher.h"
6
7 #include <memory>
8 #include <string>
9
10 #include "base/at_exit.h"
11 #include "base/auto_reset.h"
12 #include "base/bind.h"
13 #include "base/macros.h"
14 #include "base/memory/scoped_vector.h"
15 #include "base/run_loop.h"
16 #include "base/test/simple_test_tick_clock.h"
17 #include "base/threading/thread.h"
18 #include "mojo/message_pump/message_pump_mojo.h"
19 #include "mojo/message_pump/time_helper.h"
20 #include "mojo/public/cpp/system/core.h"
21 #include "mojo/public/cpp/test_support/test_utils.h"
22 #include "testing/gtest/include/gtest/gtest.h"
23
24 namespace mojo {
25 namespace common {
26 namespace test {
27
// Selects which kind of message loop a parameterized test runs on; consumed by
// CreateMessageLoop().
enum MessageLoopConfig {
  // A default-constructed base::MessageLoop.
  MESSAGE_LOOP_CONFIG_DEFAULT = 0,
  // A base::MessageLoop driven by a MessagePumpMojo.
  MESSAGE_LOOP_CONFIG_MOJO = 1
};
32
// Watcher callback that records its invocation: stores |result| in
// |*result_observed| and sets |*was_signaled| to true.
void ObserveCallback(bool* was_signaled,
                     MojoResult* result_observed,
                     MojoResult result) {
  *result_observed = result;
  *was_signaled = true;
}
39
RunUntilIdle()40 void RunUntilIdle() {
41 base::RunLoop run_loop;
42 run_loop.RunUntilIdle();
43 }
44
DeleteWatcherAndForwardResult(HandleWatcher * watcher,base::Callback<void (MojoResult)> next_callback,MojoResult result)45 void DeleteWatcherAndForwardResult(
46 HandleWatcher* watcher,
47 base::Callback<void(MojoResult)> next_callback,
48 MojoResult result) {
49 delete watcher;
50 next_callback.Run(result);
51 }
52
CreateMessageLoop(MessageLoopConfig config)53 std::unique_ptr<base::MessageLoop> CreateMessageLoop(MessageLoopConfig config) {
54 std::unique_ptr<base::MessageLoop> loop;
55 if (config == MESSAGE_LOOP_CONFIG_DEFAULT)
56 loop.reset(new base::MessageLoop());
57 else
58 loop.reset(new base::MessageLoop(MessagePumpMojo::Create()));
59 return loop;
60 }
61
62 // Helper class to manage the callback and running the message loop waiting for
63 // message to be received. Typical usage is something like:
64 // Schedule callback returned from GetCallback().
65 // RunUntilGotCallback();
66 // EXPECT_TRUE(got_callback());
67 // clear_callback();
68 class CallbackHelper {
69 public:
CallbackHelper()70 CallbackHelper()
71 : got_callback_(false),
72 run_loop_(NULL),
73 weak_factory_(this) {}
~CallbackHelper()74 ~CallbackHelper() {}
75
76 // See description above |got_callback_|.
got_callback() const77 bool got_callback() const { return got_callback_; }
clear_callback()78 void clear_callback() { got_callback_ = false; }
79
80 // Runs the current MessageLoop until the callback returned from GetCallback()
81 // is notified.
RunUntilGotCallback()82 void RunUntilGotCallback() {
83 ASSERT_TRUE(run_loop_ == NULL);
84 base::RunLoop run_loop;
85 base::AutoReset<base::RunLoop*> reseter(&run_loop_, &run_loop);
86 run_loop.Run();
87 }
88
GetCallback()89 base::Callback<void(MojoResult)> GetCallback() {
90 return base::Bind(&CallbackHelper::OnCallback, weak_factory_.GetWeakPtr());
91 }
92
Start(HandleWatcher * watcher,const MessagePipeHandle & handle)93 void Start(HandleWatcher* watcher, const MessagePipeHandle& handle) {
94 StartWithCallback(watcher, handle, GetCallback());
95 }
96
StartWithCallback(HandleWatcher * watcher,const MessagePipeHandle & handle,const base::Callback<void (MojoResult)> & callback)97 void StartWithCallback(HandleWatcher* watcher,
98 const MessagePipeHandle& handle,
99 const base::Callback<void(MojoResult)>& callback) {
100 watcher->Start(handle, MOJO_HANDLE_SIGNAL_READABLE,
101 MOJO_DEADLINE_INDEFINITE, callback);
102 }
103
104 private:
OnCallback(MojoResult result)105 void OnCallback(MojoResult result) {
106 got_callback_ = true;
107 if (run_loop_)
108 run_loop_->Quit();
109 }
110
111 // Set to true when the callback is called.
112 bool got_callback_;
113
114 // If non-NULL we're in RunUntilGotCallback().
115 base::RunLoop* run_loop_;
116
117 base::WeakPtrFactory<CallbackHelper> weak_factory_;
118
119 private:
120 DISALLOW_COPY_AND_ASSIGN(CallbackHelper);
121 };
122
123 class HandleWatcherTest : public testing::TestWithParam<MessageLoopConfig> {
124 public:
HandleWatcherTest()125 HandleWatcherTest()
126 : at_exit_(new base::ShadowingAtExitManager),
127 message_loop_(CreateMessageLoop(GetParam())) {}
~HandleWatcherTest()128 virtual ~HandleWatcherTest() {
129 // By explicitly destroying |at_exit_| before resetting the tick clock, it
130 // ensures that the handle watcher thread (if there is one) is shut down,
131 // preventing a race with users of the tick clock in MessagePumpMojo.
132 at_exit_.reset();
133 test::SetTickClockForTest(NULL);
134 }
135
136 protected:
TearDownMessageLoop()137 void TearDownMessageLoop() {
138 message_loop_.reset();
139 }
140
141 // This should be called at the beginning of any test that needs it, so that
142 // it is installed before the handle watcher thread starts.
InstallTickClock()143 void InstallTickClock() {
144 test::SetTickClockForTest(&tick_clock_);
145 }
146
147 base::SimpleTestTickClock tick_clock_;
148
149 private:
150 std::unique_ptr<base::ShadowingAtExitManager> at_exit_;
151 std::unique_ptr<base::MessageLoop> message_loop_;
152
153 DISALLOW_COPY_AND_ASSIGN(HandleWatcherTest);
154 };
155
156 INSTANTIATE_TEST_CASE_P(
157 MultipleMessageLoopConfigs, HandleWatcherTest,
158 testing::Values(MESSAGE_LOOP_CONFIG_DEFAULT, MESSAGE_LOOP_CONFIG_MOJO));
159
160 // Trivial test case with a single handle to watch.
TEST_P(HandleWatcherTest,SingleHandler)161 TEST_P(HandleWatcherTest, SingleHandler) {
162 MessagePipe test_pipe;
163 ASSERT_TRUE(test_pipe.handle0.is_valid());
164 CallbackHelper callback_helper;
165 HandleWatcher watcher;
166 callback_helper.Start(&watcher, test_pipe.handle0.get());
167 RunUntilIdle();
168 EXPECT_FALSE(callback_helper.got_callback());
169 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
170 std::string()));
171 callback_helper.RunUntilGotCallback();
172 EXPECT_TRUE(callback_helper.got_callback());
173 }
174
175 // Creates three handles and notfies them in reverse order ensuring each one is
176 // notified appropriately.
TEST_P(HandleWatcherTest,ThreeHandles)177 TEST_P(HandleWatcherTest, ThreeHandles) {
178 MessagePipe test_pipe1;
179 MessagePipe test_pipe2;
180 MessagePipe test_pipe3;
181 CallbackHelper callback_helper1;
182 CallbackHelper callback_helper2;
183 CallbackHelper callback_helper3;
184 ASSERT_TRUE(test_pipe1.handle0.is_valid());
185 ASSERT_TRUE(test_pipe2.handle0.is_valid());
186 ASSERT_TRUE(test_pipe3.handle0.is_valid());
187
188 HandleWatcher watcher1;
189 callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
190 RunUntilIdle();
191 EXPECT_FALSE(callback_helper1.got_callback());
192 EXPECT_FALSE(callback_helper2.got_callback());
193 EXPECT_FALSE(callback_helper3.got_callback());
194
195 HandleWatcher watcher2;
196 callback_helper2.Start(&watcher2, test_pipe2.handle0.get());
197 RunUntilIdle();
198 EXPECT_FALSE(callback_helper1.got_callback());
199 EXPECT_FALSE(callback_helper2.got_callback());
200 EXPECT_FALSE(callback_helper3.got_callback());
201
202 HandleWatcher watcher3;
203 callback_helper3.Start(&watcher3, test_pipe3.handle0.get());
204 RunUntilIdle();
205 EXPECT_FALSE(callback_helper1.got_callback());
206 EXPECT_FALSE(callback_helper2.got_callback());
207 EXPECT_FALSE(callback_helper3.got_callback());
208
209 // Write to 3 and make sure it's notified.
210 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(),
211 std::string()));
212 callback_helper3.RunUntilGotCallback();
213 EXPECT_FALSE(callback_helper1.got_callback());
214 EXPECT_FALSE(callback_helper2.got_callback());
215 EXPECT_TRUE(callback_helper3.got_callback());
216 callback_helper3.clear_callback();
217
218 // Write to 1 and 3. Only 1 should be notified since 3 was is no longer
219 // running.
220 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
221 std::string()));
222 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(),
223 std::string()));
224 callback_helper1.RunUntilGotCallback();
225 EXPECT_TRUE(callback_helper1.got_callback());
226 EXPECT_FALSE(callback_helper2.got_callback());
227 EXPECT_FALSE(callback_helper3.got_callback());
228 callback_helper1.clear_callback();
229
230 // Write to 1 and 2. Only 2 should be notified (since 1 was already notified).
231 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
232 std::string()));
233 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(),
234 std::string()));
235 callback_helper2.RunUntilGotCallback();
236 EXPECT_FALSE(callback_helper1.got_callback());
237 EXPECT_TRUE(callback_helper2.got_callback());
238 EXPECT_FALSE(callback_helper3.got_callback());
239 }
240
241 // Verifies Start() invoked a second time works.
TEST_P(HandleWatcherTest,Restart)242 TEST_P(HandleWatcherTest, Restart) {
243 MessagePipe test_pipe1;
244 MessagePipe test_pipe2;
245 CallbackHelper callback_helper1;
246 CallbackHelper callback_helper2;
247 ASSERT_TRUE(test_pipe1.handle0.is_valid());
248 ASSERT_TRUE(test_pipe2.handle0.is_valid());
249
250 HandleWatcher watcher1;
251 callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
252 RunUntilIdle();
253 EXPECT_FALSE(callback_helper1.got_callback());
254 EXPECT_FALSE(callback_helper2.got_callback());
255
256 HandleWatcher watcher2;
257 callback_helper2.Start(&watcher2, test_pipe2.handle0.get());
258 RunUntilIdle();
259 EXPECT_FALSE(callback_helper1.got_callback());
260 EXPECT_FALSE(callback_helper2.got_callback());
261
262 // Write to 1 and make sure it's notified.
263 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
264 std::string()));
265 callback_helper1.RunUntilGotCallback();
266 EXPECT_TRUE(callback_helper1.got_callback());
267 EXPECT_FALSE(callback_helper2.got_callback());
268 callback_helper1.clear_callback();
269 EXPECT_TRUE(mojo::test::DiscardMessage(test_pipe1.handle0.get()));
270
271 // Write to 2 and make sure it's notified.
272 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(),
273 std::string()));
274 callback_helper2.RunUntilGotCallback();
275 EXPECT_FALSE(callback_helper1.got_callback());
276 EXPECT_TRUE(callback_helper2.got_callback());
277 callback_helper2.clear_callback();
278
279 // Listen on 1 again.
280 callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
281 RunUntilIdle();
282 EXPECT_FALSE(callback_helper1.got_callback());
283 EXPECT_FALSE(callback_helper2.got_callback());
284
285 // Write to 1 and make sure it's notified.
286 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
287 std::string()));
288 callback_helper1.RunUntilGotCallback();
289 EXPECT_TRUE(callback_helper1.got_callback());
290 EXPECT_FALSE(callback_helper2.got_callback());
291 }
292
293 // Verifies Start() invoked a second time on the same handle works.
TEST_P(HandleWatcherTest,RestartOnSameHandle)294 TEST_P(HandleWatcherTest, RestartOnSameHandle) {
295 MessagePipe test_pipe;
296 CallbackHelper callback_helper;
297 ASSERT_TRUE(test_pipe.handle0.is_valid());
298
299 HandleWatcher watcher;
300 callback_helper.Start(&watcher, test_pipe.handle0.get());
301 RunUntilIdle();
302 EXPECT_FALSE(callback_helper.got_callback());
303
304 callback_helper.Start(&watcher, test_pipe.handle0.get());
305 RunUntilIdle();
306 EXPECT_FALSE(callback_helper.got_callback());
307 }
308
309 // Verifies deadline is honored.
TEST_P(HandleWatcherTest,Deadline)310 TEST_P(HandleWatcherTest, Deadline) {
311 InstallTickClock();
312
313 MessagePipe test_pipe1;
314 MessagePipe test_pipe2;
315 MessagePipe test_pipe3;
316 CallbackHelper callback_helper1;
317 CallbackHelper callback_helper2;
318 CallbackHelper callback_helper3;
319 ASSERT_TRUE(test_pipe1.handle0.is_valid());
320 ASSERT_TRUE(test_pipe2.handle0.is_valid());
321 ASSERT_TRUE(test_pipe3.handle0.is_valid());
322
323 // Add a watcher with an infinite timeout.
324 HandleWatcher watcher1;
325 callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
326 RunUntilIdle();
327 EXPECT_FALSE(callback_helper1.got_callback());
328 EXPECT_FALSE(callback_helper2.got_callback());
329 EXPECT_FALSE(callback_helper3.got_callback());
330
331 // Add another watcher wth a timeout of 500 microseconds.
332 HandleWatcher watcher2;
333 watcher2.Start(test_pipe2.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE, 500,
334 callback_helper2.GetCallback());
335 RunUntilIdle();
336 EXPECT_FALSE(callback_helper1.got_callback());
337 EXPECT_FALSE(callback_helper2.got_callback());
338 EXPECT_FALSE(callback_helper3.got_callback());
339
340 // Advance the clock passed the deadline. We also have to start another
341 // watcher to wake up the background thread.
342 tick_clock_.Advance(base::TimeDelta::FromMicroseconds(501));
343
344 HandleWatcher watcher3;
345 callback_helper3.Start(&watcher3, test_pipe3.handle0.get());
346
347 callback_helper2.RunUntilGotCallback();
348 EXPECT_FALSE(callback_helper1.got_callback());
349 EXPECT_TRUE(callback_helper2.got_callback());
350 EXPECT_FALSE(callback_helper3.got_callback());
351 }
352
// Checks that a HandleWatcher may be deleted from inside its own callback.
TEST_P(HandleWatcherTest, DeleteInCallback) {
  MessagePipe pipe;
  CallbackHelper helper;

  // Heap-allocated on purpose: DeleteWatcherAndForwardResult() deletes it when
  // the notification arrives, then forwards to the helper's callback.
  HandleWatcher* watcher = new HandleWatcher();
  const base::Callback<void(MojoResult)> deleting_callback =
      base::Bind(&DeleteWatcherAndForwardResult, watcher, helper.GetCallback());
  helper.StartWithCallback(watcher, pipe.handle1.get(), deleting_callback);

  EXPECT_TRUE(mojo::test::WriteTextMessage(pipe.handle0.get(), std::string()));
  helper.RunUntilGotCallback();
  EXPECT_TRUE(helper.got_callback());
}
367
// Checks that a pending watch is completed with MOJO_RESULT_ABORTED when the
// message loop is destroyed underneath it.
TEST_P(HandleWatcherTest, AbortedOnMessageLoopDestruction) {
  bool callback_ran = false;
  MojoResult observed_result = MOJO_RESULT_OK;

  MessagePipe pipe;
  HandleWatcher watcher;
  watcher.Start(pipe.handle0.get(),
                MOJO_HANDLE_SIGNAL_READABLE,
                MOJO_DEADLINE_INDEFINITE,
                base::Bind(&ObserveCallback, &callback_ran, &observed_result));

  // Destroy the MessageLoop; the callback is expected to run as part of that.
  TearDownMessageLoop();

  EXPECT_TRUE(callback_ran);
  EXPECT_EQ(MOJO_RESULT_ABORTED, observed_result);
}
385
NeverReached(MojoResult result)386 void NeverReached(MojoResult result) {
387 FAIL() << "Callback should never be invoked " << result;
388 }
389
390 // Called on the main thread when a thread is done. Decrements |active_count|
391 // and if |active_count| is zero quits |run_loop|.
StressThreadDone(base::RunLoop * run_loop,int * active_count)392 void StressThreadDone(base::RunLoop* run_loop, int* active_count) {
393 (*active_count)--;
394 EXPECT_GE(*active_count, 0);
395 if (*active_count == 0)
396 run_loop->Quit();
397 }
398
399 // See description of StressTest. This is called on the background thread.
400 // |count| is the number of HandleWatchers to create. |active_count| is the
401 // number of outstanding threads, |task_runner| the task runner for the main
402 // thread and |run_loop| the run loop that should be quit when there are no more
403 // threads running. When done StressThreadDone() is invoked on the main thread.
404 // |active_count| and |run_loop| should only be used on the main thread.
RunStressTest(int count,scoped_refptr<base::TaskRunner> task_runner,base::RunLoop * run_loop,int * active_count)405 void RunStressTest(int count,
406 scoped_refptr<base::TaskRunner> task_runner,
407 base::RunLoop* run_loop,
408 int* active_count) {
409 struct TestData {
410 MessagePipe pipe;
411 HandleWatcher watcher;
412 };
413 ScopedVector<TestData> data_vector;
414 for (int i = 0; i < count; ++i) {
415 if (i % 20 == 0) {
416 // Every so often we wait. This results in some level of thread balancing
417 // as well as making sure HandleWatcher has time to actually start some
418 // watches.
419 MessagePipe test_pipe;
420 ASSERT_TRUE(test_pipe.handle0.is_valid());
421 CallbackHelper callback_helper;
422 HandleWatcher watcher;
423 callback_helper.Start(&watcher, test_pipe.handle0.get());
424 RunUntilIdle();
425 EXPECT_FALSE(callback_helper.got_callback());
426 EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
427 std::string()));
428 base::MessageLoop::ScopedNestableTaskAllower scoper(
429 base::MessageLoop::current());
430 callback_helper.RunUntilGotCallback();
431 EXPECT_TRUE(callback_helper.got_callback());
432 } else {
433 std::unique_ptr<TestData> test_data(new TestData);
434 ASSERT_TRUE(test_data->pipe.handle0.is_valid());
435 test_data->watcher.Start(test_data->pipe.handle0.get(),
436 MOJO_HANDLE_SIGNAL_READABLE,
437 MOJO_DEADLINE_INDEFINITE,
438 base::Bind(&NeverReached));
439 data_vector.push_back(test_data.release());
440 }
441 if (i % 15 == 0)
442 data_vector.clear();
443 }
444 task_runner->PostTask(FROM_HERE,
445 base::Bind(&StressThreadDone, run_loop,
446 active_count));
447 }
448
449 // This test is meant to stress HandleWatcher. It uses from various threads
450 // repeatedly starting and stopping watches. It spins up kThreadCount
451 // threads. Each thread creates kWatchCount watches. Every so often each thread
452 // writes to a pipe and waits for the response.
TEST(HandleWatcherCleanEnvironmentTest,StressTest)453 TEST(HandleWatcherCleanEnvironmentTest, StressTest) {
454 #if defined(NDEBUG)
455 const int kThreadCount = 15;
456 const int kWatchCount = 400;
457 #else
458 const int kThreadCount = 10;
459 const int kWatchCount = 250;
460 #endif
461
462 base::ShadowingAtExitManager at_exit;
463 base::MessageLoop message_loop;
464 base::RunLoop run_loop;
465 ScopedVector<base::Thread> threads;
466 int threads_active_counter = kThreadCount;
467 // Starts the threads first and then post the task in hopes of having more
468 // threads running at once.
469 for (int i = 0; i < kThreadCount; ++i) {
470 std::unique_ptr<base::Thread> thread(new base::Thread("test thread"));
471 if (i % 2) {
472 base::Thread::Options thread_options;
473 thread_options.message_pump_factory =
474 base::Bind(&MessagePumpMojo::Create);
475 thread->StartWithOptions(thread_options);
476 } else {
477 thread->Start();
478 }
479 threads.push_back(thread.release());
480 }
481 for (int i = 0; i < kThreadCount; ++i) {
482 threads[i]->task_runner()->PostTask(
483 FROM_HERE, base::Bind(&RunStressTest, kWatchCount,
484 message_loop.task_runner(),
485 &run_loop, &threads_active_counter));
486 }
487 run_loop.Run();
488 ASSERT_EQ(0, threads_active_counter);
489 }
490
491 } // namespace test
492 } // namespace common
493 } // namespace mojo
494