1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/message_pump/handle_watcher.h"
6 
7 #include <memory>
8 #include <string>
9 
10 #include "base/at_exit.h"
11 #include "base/auto_reset.h"
12 #include "base/bind.h"
13 #include "base/macros.h"
14 #include "base/memory/scoped_vector.h"
15 #include "base/run_loop.h"
16 #include "base/test/simple_test_tick_clock.h"
17 #include "base/threading/thread.h"
18 #include "mojo/message_pump/message_pump_mojo.h"
19 #include "mojo/message_pump/time_helper.h"
20 #include "mojo/public/cpp/system/core.h"
21 #include "mojo/public/cpp/test_support/test_utils.h"
22 #include "testing/gtest/include/gtest/gtest.h"
23 
24 namespace mojo {
25 namespace common {
26 namespace test {
27 
// Selects which kind of base::MessageLoop a parameterized test runs on; see
// CreateMessageLoop(). The enumerators keep their numeric values 0 and 1.
enum MessageLoopConfig {
  // A default-constructed base::MessageLoop.
  MESSAGE_LOOP_CONFIG_DEFAULT,
  // A base::MessageLoop driven by a MessagePumpMojo.
  MESSAGE_LOOP_CONFIG_MOJO,
};
32 
// Watch callback for tests that only need to record the outcome: stores
// |result| into |*result_observed| and sets |*was_signaled| to true.
void ObserveCallback(bool* was_signaled,
                     MojoResult* result_observed,
                     MojoResult result) {
  *result_observed = result;
  *was_signaled = true;
}
39 
RunUntilIdle()40 void RunUntilIdle() {
41   base::RunLoop run_loop;
42   run_loop.RunUntilIdle();
43 }
44 
DeleteWatcherAndForwardResult(HandleWatcher * watcher,base::Callback<void (MojoResult)> next_callback,MojoResult result)45 void DeleteWatcherAndForwardResult(
46     HandleWatcher* watcher,
47     base::Callback<void(MojoResult)> next_callback,
48     MojoResult result) {
49   delete watcher;
50   next_callback.Run(result);
51 }
52 
CreateMessageLoop(MessageLoopConfig config)53 std::unique_ptr<base::MessageLoop> CreateMessageLoop(MessageLoopConfig config) {
54   std::unique_ptr<base::MessageLoop> loop;
55   if (config == MESSAGE_LOOP_CONFIG_DEFAULT)
56     loop.reset(new base::MessageLoop());
57   else
58     loop.reset(new base::MessageLoop(MessagePumpMojo::Create()));
59   return loop;
60 }
61 
62 // Helper class to manage the callback and running the message loop waiting for
63 // message to be received. Typical usage is something like:
64 //   Schedule callback returned from GetCallback().
65 //   RunUntilGotCallback();
66 //   EXPECT_TRUE(got_callback());
67 //   clear_callback();
68 class CallbackHelper {
69  public:
CallbackHelper()70   CallbackHelper()
71       : got_callback_(false),
72         run_loop_(NULL),
73         weak_factory_(this) {}
~CallbackHelper()74   ~CallbackHelper() {}
75 
76   // See description above |got_callback_|.
got_callback() const77   bool got_callback() const { return got_callback_; }
clear_callback()78   void clear_callback() { got_callback_ = false; }
79 
80   // Runs the current MessageLoop until the callback returned from GetCallback()
81   // is notified.
RunUntilGotCallback()82   void RunUntilGotCallback() {
83     ASSERT_TRUE(run_loop_ == NULL);
84     base::RunLoop run_loop;
85     base::AutoReset<base::RunLoop*> reseter(&run_loop_, &run_loop);
86     run_loop.Run();
87   }
88 
GetCallback()89   base::Callback<void(MojoResult)> GetCallback() {
90     return base::Bind(&CallbackHelper::OnCallback, weak_factory_.GetWeakPtr());
91   }
92 
Start(HandleWatcher * watcher,const MessagePipeHandle & handle)93   void Start(HandleWatcher* watcher, const MessagePipeHandle& handle) {
94     StartWithCallback(watcher, handle, GetCallback());
95   }
96 
StartWithCallback(HandleWatcher * watcher,const MessagePipeHandle & handle,const base::Callback<void (MojoResult)> & callback)97   void StartWithCallback(HandleWatcher* watcher,
98                          const MessagePipeHandle& handle,
99                          const base::Callback<void(MojoResult)>& callback) {
100     watcher->Start(handle, MOJO_HANDLE_SIGNAL_READABLE,
101                    MOJO_DEADLINE_INDEFINITE, callback);
102   }
103 
104  private:
OnCallback(MojoResult result)105   void OnCallback(MojoResult result) {
106     got_callback_ = true;
107     if (run_loop_)
108       run_loop_->Quit();
109   }
110 
111   // Set to true when the callback is called.
112   bool got_callback_;
113 
114   // If non-NULL we're in RunUntilGotCallback().
115   base::RunLoop* run_loop_;
116 
117   base::WeakPtrFactory<CallbackHelper> weak_factory_;
118 
119  private:
120   DISALLOW_COPY_AND_ASSIGN(CallbackHelper);
121 };
122 
123 class HandleWatcherTest : public testing::TestWithParam<MessageLoopConfig> {
124  public:
HandleWatcherTest()125   HandleWatcherTest()
126       : at_exit_(new base::ShadowingAtExitManager),
127         message_loop_(CreateMessageLoop(GetParam())) {}
~HandleWatcherTest()128   virtual ~HandleWatcherTest() {
129     // By explicitly destroying |at_exit_| before resetting the tick clock, it
130     // ensures that the handle watcher thread (if there is one) is shut down,
131     // preventing a race with users of the tick clock in MessagePumpMojo.
132     at_exit_.reset();
133     test::SetTickClockForTest(NULL);
134   }
135 
136  protected:
TearDownMessageLoop()137   void TearDownMessageLoop() {
138     message_loop_.reset();
139   }
140 
141   // This should be called at the beginning of any test that needs it, so that
142   // it is installed before the handle watcher thread starts.
InstallTickClock()143   void InstallTickClock() {
144     test::SetTickClockForTest(&tick_clock_);
145   }
146 
147   base::SimpleTestTickClock tick_clock_;
148 
149  private:
150   std::unique_ptr<base::ShadowingAtExitManager> at_exit_;
151   std::unique_ptr<base::MessageLoop> message_loop_;
152 
153   DISALLOW_COPY_AND_ASSIGN(HandleWatcherTest);
154 };
155 
156 INSTANTIATE_TEST_CASE_P(
157     MultipleMessageLoopConfigs, HandleWatcherTest,
158     testing::Values(MESSAGE_LOOP_CONFIG_DEFAULT, MESSAGE_LOOP_CONFIG_MOJO));
159 
160 // Trivial test case with a single handle to watch.
TEST_P(HandleWatcherTest,SingleHandler)161 TEST_P(HandleWatcherTest, SingleHandler) {
162   MessagePipe test_pipe;
163   ASSERT_TRUE(test_pipe.handle0.is_valid());
164   CallbackHelper callback_helper;
165   HandleWatcher watcher;
166   callback_helper.Start(&watcher, test_pipe.handle0.get());
167   RunUntilIdle();
168   EXPECT_FALSE(callback_helper.got_callback());
169   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
170                                            std::string()));
171   callback_helper.RunUntilGotCallback();
172   EXPECT_TRUE(callback_helper.got_callback());
173 }
174 
175 // Creates three handles and notfies them in reverse order ensuring each one is
176 // notified appropriately.
TEST_P(HandleWatcherTest,ThreeHandles)177 TEST_P(HandleWatcherTest, ThreeHandles) {
178   MessagePipe test_pipe1;
179   MessagePipe test_pipe2;
180   MessagePipe test_pipe3;
181   CallbackHelper callback_helper1;
182   CallbackHelper callback_helper2;
183   CallbackHelper callback_helper3;
184   ASSERT_TRUE(test_pipe1.handle0.is_valid());
185   ASSERT_TRUE(test_pipe2.handle0.is_valid());
186   ASSERT_TRUE(test_pipe3.handle0.is_valid());
187 
188   HandleWatcher watcher1;
189   callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
190   RunUntilIdle();
191   EXPECT_FALSE(callback_helper1.got_callback());
192   EXPECT_FALSE(callback_helper2.got_callback());
193   EXPECT_FALSE(callback_helper3.got_callback());
194 
195   HandleWatcher watcher2;
196   callback_helper2.Start(&watcher2, test_pipe2.handle0.get());
197   RunUntilIdle();
198   EXPECT_FALSE(callback_helper1.got_callback());
199   EXPECT_FALSE(callback_helper2.got_callback());
200   EXPECT_FALSE(callback_helper3.got_callback());
201 
202   HandleWatcher watcher3;
203   callback_helper3.Start(&watcher3, test_pipe3.handle0.get());
204   RunUntilIdle();
205   EXPECT_FALSE(callback_helper1.got_callback());
206   EXPECT_FALSE(callback_helper2.got_callback());
207   EXPECT_FALSE(callback_helper3.got_callback());
208 
209   // Write to 3 and make sure it's notified.
210   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(),
211                                            std::string()));
212   callback_helper3.RunUntilGotCallback();
213   EXPECT_FALSE(callback_helper1.got_callback());
214   EXPECT_FALSE(callback_helper2.got_callback());
215   EXPECT_TRUE(callback_helper3.got_callback());
216   callback_helper3.clear_callback();
217 
218   // Write to 1 and 3. Only 1 should be notified since 3 was is no longer
219   // running.
220   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
221                                            std::string()));
222   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe3.handle1.get(),
223                                            std::string()));
224   callback_helper1.RunUntilGotCallback();
225   EXPECT_TRUE(callback_helper1.got_callback());
226   EXPECT_FALSE(callback_helper2.got_callback());
227   EXPECT_FALSE(callback_helper3.got_callback());
228   callback_helper1.clear_callback();
229 
230   // Write to 1 and 2. Only 2 should be notified (since 1 was already notified).
231   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
232                                            std::string()));
233   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(),
234                                            std::string()));
235   callback_helper2.RunUntilGotCallback();
236   EXPECT_FALSE(callback_helper1.got_callback());
237   EXPECT_TRUE(callback_helper2.got_callback());
238   EXPECT_FALSE(callback_helper3.got_callback());
239 }
240 
241 // Verifies Start() invoked a second time works.
TEST_P(HandleWatcherTest,Restart)242 TEST_P(HandleWatcherTest, Restart) {
243   MessagePipe test_pipe1;
244   MessagePipe test_pipe2;
245   CallbackHelper callback_helper1;
246   CallbackHelper callback_helper2;
247   ASSERT_TRUE(test_pipe1.handle0.is_valid());
248   ASSERT_TRUE(test_pipe2.handle0.is_valid());
249 
250   HandleWatcher watcher1;
251   callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
252   RunUntilIdle();
253   EXPECT_FALSE(callback_helper1.got_callback());
254   EXPECT_FALSE(callback_helper2.got_callback());
255 
256   HandleWatcher watcher2;
257   callback_helper2.Start(&watcher2, test_pipe2.handle0.get());
258   RunUntilIdle();
259   EXPECT_FALSE(callback_helper1.got_callback());
260   EXPECT_FALSE(callback_helper2.got_callback());
261 
262   // Write to 1 and make sure it's notified.
263   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
264                                            std::string()));
265   callback_helper1.RunUntilGotCallback();
266   EXPECT_TRUE(callback_helper1.got_callback());
267   EXPECT_FALSE(callback_helper2.got_callback());
268   callback_helper1.clear_callback();
269   EXPECT_TRUE(mojo::test::DiscardMessage(test_pipe1.handle0.get()));
270 
271   // Write to 2 and make sure it's notified.
272   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe2.handle1.get(),
273                                            std::string()));
274   callback_helper2.RunUntilGotCallback();
275   EXPECT_FALSE(callback_helper1.got_callback());
276   EXPECT_TRUE(callback_helper2.got_callback());
277   callback_helper2.clear_callback();
278 
279   // Listen on 1 again.
280   callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
281   RunUntilIdle();
282   EXPECT_FALSE(callback_helper1.got_callback());
283   EXPECT_FALSE(callback_helper2.got_callback());
284 
285   // Write to 1 and make sure it's notified.
286   EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe1.handle1.get(),
287                                            std::string()));
288   callback_helper1.RunUntilGotCallback();
289   EXPECT_TRUE(callback_helper1.got_callback());
290   EXPECT_FALSE(callback_helper2.got_callback());
291 }
292 
293 // Verifies Start() invoked a second time on the same handle works.
TEST_P(HandleWatcherTest,RestartOnSameHandle)294 TEST_P(HandleWatcherTest, RestartOnSameHandle) {
295   MessagePipe test_pipe;
296   CallbackHelper callback_helper;
297   ASSERT_TRUE(test_pipe.handle0.is_valid());
298 
299   HandleWatcher watcher;
300   callback_helper.Start(&watcher, test_pipe.handle0.get());
301   RunUntilIdle();
302   EXPECT_FALSE(callback_helper.got_callback());
303 
304   callback_helper.Start(&watcher, test_pipe.handle0.get());
305   RunUntilIdle();
306   EXPECT_FALSE(callback_helper.got_callback());
307 }
308 
309 // Verifies deadline is honored.
TEST_P(HandleWatcherTest,Deadline)310 TEST_P(HandleWatcherTest, Deadline) {
311   InstallTickClock();
312 
313   MessagePipe test_pipe1;
314   MessagePipe test_pipe2;
315   MessagePipe test_pipe3;
316   CallbackHelper callback_helper1;
317   CallbackHelper callback_helper2;
318   CallbackHelper callback_helper3;
319   ASSERT_TRUE(test_pipe1.handle0.is_valid());
320   ASSERT_TRUE(test_pipe2.handle0.is_valid());
321   ASSERT_TRUE(test_pipe3.handle0.is_valid());
322 
323   // Add a watcher with an infinite timeout.
324   HandleWatcher watcher1;
325   callback_helper1.Start(&watcher1, test_pipe1.handle0.get());
326   RunUntilIdle();
327   EXPECT_FALSE(callback_helper1.got_callback());
328   EXPECT_FALSE(callback_helper2.got_callback());
329   EXPECT_FALSE(callback_helper3.got_callback());
330 
331   // Add another watcher wth a timeout of 500 microseconds.
332   HandleWatcher watcher2;
333   watcher2.Start(test_pipe2.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE, 500,
334                  callback_helper2.GetCallback());
335   RunUntilIdle();
336   EXPECT_FALSE(callback_helper1.got_callback());
337   EXPECT_FALSE(callback_helper2.got_callback());
338   EXPECT_FALSE(callback_helper3.got_callback());
339 
340   // Advance the clock passed the deadline. We also have to start another
341   // watcher to wake up the background thread.
342   tick_clock_.Advance(base::TimeDelta::FromMicroseconds(501));
343 
344   HandleWatcher watcher3;
345   callback_helper3.Start(&watcher3, test_pipe3.handle0.get());
346 
347   callback_helper2.RunUntilGotCallback();
348   EXPECT_FALSE(callback_helper1.got_callback());
349   EXPECT_TRUE(callback_helper2.got_callback());
350   EXPECT_FALSE(callback_helper3.got_callback());
351 }
352 
// Verifies that a HandleWatcher can be deleted from within its own callback
// and that the result is still forwarded afterwards.
TEST_P(HandleWatcherTest, DeleteInCallback) {
  MessagePipe pipe;
  CallbackHelper helper;

  // Heap-allocated on purpose: DeleteWatcherAndForwardResult() deletes the
  // watcher when the notification arrives.
  HandleWatcher* watcher = new HandleWatcher();
  helper.StartWithCallback(
      watcher, pipe.handle1.get(),
      base::Bind(&DeleteWatcherAndForwardResult, watcher,
                 helper.GetCallback()));

  EXPECT_TRUE(mojo::test::WriteTextMessage(pipe.handle0.get(), std::string()));
  helper.RunUntilGotCallback();
  EXPECT_TRUE(helper.got_callback());
}
367 
// Verifies that a pending watch is completed with MOJO_RESULT_ABORTED when the
// message loop is destroyed.
TEST_P(HandleWatcherTest, AbortedOnMessageLoopDestruction) {
  bool signaled = false;
  MojoResult observed_result = MOJO_RESULT_OK;

  MessagePipe pipe;
  HandleWatcher watcher;
  watcher.Start(pipe.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE,
                MOJO_DEADLINE_INDEFINITE,
                base::Bind(&ObserveCallback, &signaled, &observed_result));

  // Now, let the MessageLoop get torn down. We expect our callback to run.
  TearDownMessageLoop();

  EXPECT_TRUE(signaled);
  EXPECT_EQ(MOJO_RESULT_ABORTED, observed_result);
}
385 
NeverReached(MojoResult result)386 void NeverReached(MojoResult result) {
387   FAIL() << "Callback should never be invoked " << result;
388 }
389 
390 // Called on the main thread when a thread is done. Decrements |active_count|
391 // and if |active_count| is zero quits |run_loop|.
StressThreadDone(base::RunLoop * run_loop,int * active_count)392 void StressThreadDone(base::RunLoop* run_loop, int* active_count) {
393   (*active_count)--;
394   EXPECT_GE(*active_count, 0);
395   if (*active_count == 0)
396     run_loop->Quit();
397 }
398 
399 // See description of StressTest. This is called on the background thread.
400 // |count| is the number of HandleWatchers to create. |active_count| is the
401 // number of outstanding threads, |task_runner| the task runner for the main
402 // thread and |run_loop| the run loop that should be quit when there are no more
403 // threads running. When done StressThreadDone() is invoked on the main thread.
404 // |active_count| and |run_loop| should only be used on the main thread.
RunStressTest(int count,scoped_refptr<base::TaskRunner> task_runner,base::RunLoop * run_loop,int * active_count)405 void RunStressTest(int count,
406                    scoped_refptr<base::TaskRunner> task_runner,
407                    base::RunLoop* run_loop,
408                    int* active_count) {
409   struct TestData {
410     MessagePipe pipe;
411     HandleWatcher watcher;
412   };
413   ScopedVector<TestData> data_vector;
414   for (int i = 0; i < count; ++i) {
415     if (i % 20 == 0) {
416       // Every so often we wait. This results in some level of thread balancing
417       // as well as making sure HandleWatcher has time to actually start some
418       // watches.
419       MessagePipe test_pipe;
420       ASSERT_TRUE(test_pipe.handle0.is_valid());
421       CallbackHelper callback_helper;
422       HandleWatcher watcher;
423       callback_helper.Start(&watcher, test_pipe.handle0.get());
424       RunUntilIdle();
425       EXPECT_FALSE(callback_helper.got_callback());
426       EXPECT_TRUE(mojo::test::WriteTextMessage(test_pipe.handle1.get(),
427                                                std::string()));
428       base::MessageLoop::ScopedNestableTaskAllower scoper(
429           base::MessageLoop::current());
430       callback_helper.RunUntilGotCallback();
431       EXPECT_TRUE(callback_helper.got_callback());
432     } else {
433       std::unique_ptr<TestData> test_data(new TestData);
434       ASSERT_TRUE(test_data->pipe.handle0.is_valid());
435       test_data->watcher.Start(test_data->pipe.handle0.get(),
436                     MOJO_HANDLE_SIGNAL_READABLE,
437                     MOJO_DEADLINE_INDEFINITE,
438                     base::Bind(&NeverReached));
439       data_vector.push_back(test_data.release());
440     }
441     if (i % 15 == 0)
442       data_vector.clear();
443   }
444   task_runner->PostTask(FROM_HERE,
445                         base::Bind(&StressThreadDone, run_loop,
446                                    active_count));
447 }
448 
449 // This test is meant to stress HandleWatcher. It uses from various threads
450 // repeatedly starting and stopping watches. It spins up kThreadCount
451 // threads. Each thread creates kWatchCount watches. Every so often each thread
452 // writes to a pipe and waits for the response.
TEST(HandleWatcherCleanEnvironmentTest,StressTest)453 TEST(HandleWatcherCleanEnvironmentTest, StressTest) {
454 #if defined(NDEBUG)
455   const int kThreadCount = 15;
456   const int kWatchCount = 400;
457 #else
458   const int kThreadCount = 10;
459   const int kWatchCount = 250;
460 #endif
461 
462   base::ShadowingAtExitManager at_exit;
463   base::MessageLoop message_loop;
464   base::RunLoop run_loop;
465   ScopedVector<base::Thread> threads;
466   int threads_active_counter = kThreadCount;
467   // Starts the threads first and then post the task in hopes of having more
468   // threads running at once.
469   for (int i = 0; i < kThreadCount; ++i) {
470     std::unique_ptr<base::Thread> thread(new base::Thread("test thread"));
471     if (i % 2) {
472       base::Thread::Options thread_options;
473       thread_options.message_pump_factory =
474           base::Bind(&MessagePumpMojo::Create);
475       thread->StartWithOptions(thread_options);
476     } else {
477       thread->Start();
478     }
479     threads.push_back(thread.release());
480   }
481   for (int i = 0; i < kThreadCount; ++i) {
482     threads[i]->task_runner()->PostTask(
483         FROM_HERE, base::Bind(&RunStressTest, kWatchCount,
484                               message_loop.task_runner(),
485                               &run_loop, &threads_active_counter));
486   }
487   run_loop.Run();
488   ASSERT_EQ(0, threads_active_counter);
489 }
490 
491 }  // namespace test
492 }  // namespace common
493 }  // namespace mojo
494