1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/system/wait_set.h"
6 
7 #include <set>
8 #include <vector>
9 
10 #include "base/bind.h"
11 #include "base/callback.h"
12 #include "base/memory/ptr_util.h"
13 #include "base/synchronization/waitable_event.h"
14 #include "base/threading/platform_thread.h"
15 #include "base/threading/simple_thread.h"
16 #include "mojo/public/cpp/system/message_pipe.h"
17 #include "mojo/public/cpp/system/wait.h"
18 #include "testing/gtest/include/gtest/gtest.h"
19 
20 namespace mojo {
21 namespace {
22 
23 using WaitSetTest = testing::Test;
24 
WriteMessage(const ScopedMessagePipeHandle & handle,const base::StringPiece & message)25 void WriteMessage(const ScopedMessagePipeHandle& handle,
26                   const base::StringPiece& message) {
27   MojoResult rv = WriteMessageRaw(handle.get(), message.data(),
28                                   static_cast<uint32_t>(message.size()),
29                                   nullptr, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
30   CHECK_EQ(MOJO_RESULT_OK, rv);
31 }
32 
ReadMessage(const ScopedMessagePipeHandle & handle)33 std::string ReadMessage(const ScopedMessagePipeHandle& handle) {
34   std::vector<uint8_t> bytes;
35   MojoResult rv = ReadMessageRaw(handle.get(), &bytes, nullptr,
36                                  MOJO_READ_MESSAGE_FLAG_NONE);
37   CHECK_EQ(MOJO_RESULT_OK, rv);
38   return std::string(bytes.begin(), bytes.end());
39 }
40 
41 class ThreadedRunner : public base::SimpleThread {
42  public:
ThreadedRunner(const base::Closure & callback)43   explicit ThreadedRunner(const base::Closure& callback)
44       : SimpleThread("ThreadedRunner"), callback_(callback) {}
~ThreadedRunner()45   ~ThreadedRunner() override { Join(); }
46 
Run()47   void Run() override { callback_.Run(); }
48 
49  private:
50   const base::Closure callback_;
51 
52   DISALLOW_COPY_AND_ASSIGN(ThreadedRunner);
53 };
54 
TEST_F(WaitSetTest,Satisfied)55 TEST_F(WaitSetTest, Satisfied) {
56   WaitSet wait_set;
57   MessagePipe p;
58 
59   const char kTestMessage1[] = "hello wake up";
60 
61   // Watch only one handle and write to the other.
62 
63   wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
64   WriteMessage(p.handle0, kTestMessage1);
65 
66   size_t num_ready_handles = 2;
67   Handle ready_handles[2];
68   MojoResult ready_results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
69   HandleSignalsState hss[2];
70   wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
71 
72   EXPECT_EQ(1u, num_ready_handles);
73   EXPECT_EQ(p.handle1.get(), ready_handles[0]);
74   EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
75   EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());
76 
77   wait_set.RemoveHandle(p.handle1.get());
78 
79   // Now watch only the other handle and write to the first one.
80 
81   wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
82   WriteMessage(p.handle1, kTestMessage1);
83 
84   num_ready_handles = 2;
85   ready_results[0] = MOJO_RESULT_UNKNOWN;
86   ready_results[1] = MOJO_RESULT_UNKNOWN;
87   wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
88 
89   EXPECT_EQ(1u, num_ready_handles);
90   EXPECT_EQ(p.handle0.get(), ready_handles[0]);
91   EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
92   EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());
93 
94   // Now wait on both of them.
95   wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
96 
97   num_ready_handles = 2;
98   ready_results[0] = MOJO_RESULT_UNKNOWN;
99   ready_results[1] = MOJO_RESULT_UNKNOWN;
100   wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
101   EXPECT_EQ(2u, num_ready_handles);
102   EXPECT_TRUE((ready_handles[0] == p.handle0.get() &&
103                ready_handles[1] == p.handle1.get()) ||
104               (ready_handles[0] == p.handle1.get() &&
105                ready_handles[1] == p.handle0.get()));
106   EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
107   EXPECT_EQ(MOJO_RESULT_OK, ready_results[1]);
108   EXPECT_TRUE(hss[0].readable() && hss[0].writable() && !hss[0].peer_closed());
109   EXPECT_TRUE(hss[1].readable() && hss[1].writable() && !hss[1].peer_closed());
110 
111   // Wait on both again, but with only enough output space for one result.
112   num_ready_handles = 1;
113   ready_results[0] = MOJO_RESULT_UNKNOWN;
114   wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
115   EXPECT_EQ(1u, num_ready_handles);
116   EXPECT_TRUE(ready_handles[0] == p.handle0.get() ||
117               ready_handles[0] == p.handle1.get());
118   EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
119 
120   // Remove the ready handle from the set and wait one more time.
121   EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(ready_handles[0]));
122 
123   num_ready_handles = 1;
124   ready_results[0] = MOJO_RESULT_UNKNOWN;
125   wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results, hss);
126   EXPECT_EQ(1u, num_ready_handles);
127   EXPECT_TRUE(ready_handles[0] == p.handle0.get() ||
128               ready_handles[0] == p.handle1.get());
129   EXPECT_EQ(MOJO_RESULT_OK, ready_results[0]);
130 
131   EXPECT_EQ(MOJO_RESULT_OK, wait_set.RemoveHandle(ready_handles[0]));
132 
133   // The wait set should be empty now. Nothing to wait on.
134   num_ready_handles = 2;
135   wait_set.Wait(nullptr, &num_ready_handles, ready_handles, ready_results);
136   EXPECT_EQ(0u, num_ready_handles);
137 }
138 
// Verifies that closing the peer of a watched handle causes that handle, and
// only that handle, to be reported with MOJO_RESULT_FAILED_PRECONDITION.
TEST_F(WaitSetTest, Unsatisfiable) {
  MessagePipe p;
  MessagePipe q;
  WaitSet wait_set;

  // Both ends of |q| are watched but left untouched, so they should never
  // show up in the results. Only one end of |p| is watched.
  wait_set.AddHandle(q.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(q.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);

  size_t ready_count = 2;
  Handle handles[2];
  MojoResult results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};

  // Close the unwatched end of |p| before waiting.
  p.handle1.reset();

  wait_set.Wait(nullptr, &ready_count, handles, results);
  EXPECT_EQ(1u, ready_count);
  EXPECT_EQ(p.handle0.get(), handles[0]);
  EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, results[0]);
}
157 
// Verifies that closing a watched handle from another thread while Wait() is
// in progress is reported as MOJO_RESULT_CANCELLED, and that the handle is no
// longer in the set afterwards.
TEST_F(WaitSetTest, CloseWhileWaiting) {
  MessagePipe p;
  WaitSet wait_set;

  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);

  // Remember the raw handle value; |p.handle0| is reset by the other thread.
  const Handle handle0_value = p.handle0.get();

  ThreadedRunner close_after_delay(base::Bind(
      [](ScopedMessagePipeHandle* handle) {
        // Sleep briefly before closing the handle, so that the main thread
        // has most likely entered Wait() by then.
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        handle->reset();
      },
      &p.handle0));
  close_after_delay.Start();

  size_t ready_count = 2;
  Handle handles[2];
  MojoResult results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
  wait_set.Wait(nullptr, &ready_count, handles, results);
  EXPECT_EQ(1u, ready_count);
  EXPECT_EQ(handle0_value, handles[0]);
  EXPECT_EQ(MOJO_RESULT_CANCELLED, results[0]);

  // A closed handle can't be removed explicitly any more.
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value));
}
184 
// Verifies that handles closed while nobody is waiting are still reported as
// MOJO_RESULT_CANCELLED, one per Wait() call when there is output space for
// only a single result, and that each closed handle is reported exactly once.
TEST_F(WaitSetTest, CloseBeforeWaiting) {
  MessagePipe p;
  WaitSet wait_set;

  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);

  // Remember the raw handle values before the scoped handles are reset.
  const Handle handle0_value = p.handle0.get();
  const Handle handle1_value = p.handle1.get();

  p.handle0.reset();
  p.handle1.reset();

  // Ensure that the WaitSet user is always made aware of all cancellations even
  // if they happen while not waiting, or they have to be returned over the span
  // of multiple Wait() calls due to insufficient output storage.

  size_t num_ready_handles = 1;
  Handle ready_handle;
  MojoResult ready_result = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_TRUE(ready_handle == handle0_value || ready_handle == handle1_value);
  EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_result);
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value));

  // Keep the first reported handle so the second report can be checked
  // against it: both cancellations must surface, not the same one twice.
  const Handle first_cancelled_handle = ready_handle;

  // Reset the outputs so that stale values from the first call can't satisfy
  // the expectations for the second one.
  num_ready_handles = 1;
  ready_result = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(1u, num_ready_handles);
  EXPECT_TRUE(ready_handle == handle0_value || ready_handle == handle1_value);
  EXPECT_FALSE(ready_handle == first_cancelled_handle);
  EXPECT_EQ(MOJO_RESULT_CANCELLED, ready_result);

  // With both cancellations delivered, neither handle may remain in the set.
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle0_value));
  EXPECT_EQ(MOJO_RESULT_NOT_FOUND, wait_set.RemoveHandle(handle1_value));

  // Nothing more to wait on.
  num_ready_handles = 1;
  wait_set.Wait(nullptr, &num_ready_handles, &ready_handle, &ready_result);
  EXPECT_EQ(0u, num_ready_handles);
}
221 
// Verifies that a handle stops being reported once its message has been read,
// and that a later Wait() blocks until some other watched handle becomes
// readable.
TEST_F(WaitSetTest, SatisfiedThenUnsatisfied) {
  MessagePipe p;
  WaitSet wait_set;

  wait_set.AddHandle(p.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);

  // A message written to handle0 makes handle1 readable.
  const char kTestMessage1[] = "testing testing testing";
  WriteMessage(p.handle0, kTestMessage1);

  size_t ready_count = 2;
  Handle handles[2];
  MojoResult results[2] = {MOJO_RESULT_UNKNOWN, MOJO_RESULT_UNKNOWN};
  wait_set.Wait(nullptr, &ready_count, handles, results);
  EXPECT_EQ(1u, ready_count);
  EXPECT_EQ(p.handle1.get(), handles[0]);
  EXPECT_EQ(MOJO_RESULT_OK, results[0]);

  // Consume the message, so handle1 has nothing left to read.
  EXPECT_EQ(kTestMessage1, ReadMessage(p.handle1));

  ThreadedRunner write_after_delay(base::Bind(
      [](ScopedMessagePipeHandle* handle) {
        // Sleep briefly before writing, so that the main thread has most
        // likely entered Wait() by then.
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        WriteMessage(*handle, "wakey wakey");
      },
      &p.handle1));
  write_after_delay.Start();

  // The delayed write goes to handle1, so handle0 is the end expected to be
  // reported as readable.
  ready_count = 2;
  wait_set.Wait(nullptr, &ready_count, handles, results);
  EXPECT_EQ(1u, ready_count);
  EXPECT_EQ(p.handle0.get(), handles[0]);
  EXPECT_EQ(MOJO_RESULT_OK, results[0]);
}
257 
// Verifies that a WaitSet containing only an already-signaled event reports
// that event and no handles.
TEST_F(WaitSetTest, EventOnly) {
  base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL,
                            base::WaitableEvent::InitialState::SIGNALED);
  WaitSet wait_set;
  wait_set.AddEvent(&event);

  // Offer room for one handle result even though no handle was added.
  size_t ready_count = 1;
  Handle ready_handle;
  MojoResult ready_result = MOJO_RESULT_UNKNOWN;
  base::WaitableEvent* signaled_event = nullptr;
  wait_set.Wait(&signaled_event, &ready_count, &ready_handle, &ready_result);

  EXPECT_EQ(0u, ready_count);
  EXPECT_EQ(&event, signaled_event);
}
272 
// Verifies a WaitSet holding both a handle and an event: first only the
// readable handle is reported, then, once its message has been read, a Wait()
// is woken up by the event being signaled from another thread.
TEST_F(WaitSetTest, EventAndHandle) {
  const char kTestMessage[] = "hello hello";

  // Make handle1 readable before it is even added to the set.
  MessagePipe p;
  WriteMessage(p.handle0, kTestMessage);

  base::WaitableEvent event(base::WaitableEvent::ResetPolicy::MANUAL,
                            base::WaitableEvent::InitialState::NOT_SIGNALED);

  WaitSet wait_set;
  wait_set.AddHandle(p.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  wait_set.AddEvent(&event);

  base::WaitableEvent* signaled_event = nullptr;
  size_t ready_count = 1;
  Handle ready_handle;
  MojoResult ready_result = MOJO_RESULT_UNKNOWN;
  wait_set.Wait(&signaled_event, &ready_count, &ready_handle, &ready_result);

  // The event has not been signaled yet, so only the handle is reported.
  EXPECT_EQ(1u, ready_count);
  EXPECT_EQ(nullptr, signaled_event);
  EXPECT_EQ(p.handle1.get(), ready_handle);
  EXPECT_EQ(MOJO_RESULT_OK, ready_result);

  // Consume the message, so the handle has nothing left to read.
  EXPECT_EQ(kTestMessage, ReadMessage(p.handle1));

  ThreadedRunner signal_after_delay(base::Bind(
      [](base::WaitableEvent* event) {
        // Sleep briefly before signaling the event, so that the main thread
        // has most likely entered Wait() by then.
        base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(200));
        event->Signal();
      },
      &event));
  signal_after_delay.Start();

  // This time the event is what should be reported, with no handles.
  wait_set.Wait(&signaled_event, &ready_count, &ready_handle, &ready_result);
  EXPECT_EQ(0u, ready_count);
  EXPECT_EQ(&event, signaled_event);
}
311 
// Verifies that when many handles and events are ready at the same time,
// repeated single-result Wait() calls eventually report every one of them. If
// some source were never reported the inner loop below would not terminate,
// so a regression shows up as a hang rather than a failed expectation.
TEST_F(WaitSetTest, NoStarvation) {
  const char kTestMessage[] = "wait for it";
  const size_t kNumTestPipes = 50;
  const size_t kNumTestEvents = 10;
  // Both ends of every pipe are watched.
  const size_t kNumTestHandles = kNumTestPipes * 2;

  // Create a bunch of handles and events which are always ready and add them
  // to a shared WaitSet.

  WaitSet wait_set;

  MessagePipe pipes[kNumTestPipes];
  for (MessagePipe& pipe : pipes) {
    // Write in each direction and wait for the message to become readable
    // on the opposite end before adding both ends to the set.
    WriteMessage(pipe.handle0, kTestMessage);
    Wait(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);

    WriteMessage(pipe.handle1, kTestMessage);
    Wait(pipe.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);

    wait_set.AddHandle(pipe.handle0.get(), MOJO_HANDLE_SIGNAL_READABLE);
    wait_set.AddHandle(pipe.handle1.get(), MOJO_HANDLE_SIGNAL_READABLE);
  }

  // The events are signaled once here and never reset by this test.
  std::vector<std::unique_ptr<base::WaitableEvent>> events;
  events.reserve(kNumTestEvents);
  for (size_t i = 0; i < kNumTestEvents; ++i) {
    events.push_back(std::make_unique<base::WaitableEvent>(
        base::WaitableEvent::ResetPolicy::MANUAL,
        base::WaitableEvent::InitialState::NOT_SIGNALED));
    events.back()->Signal();
    wait_set.AddEvent(events.back().get());
  }

  // Now verify that all handle and event signals are detected within a finite
  // number of consecutive Wait() calls. Do it a few times for good measure.
  for (size_t round = 0; round < 3; ++round) {
    std::set<base::WaitableEvent*> seen_events;
    std::set<Handle> seen_handles;

    // Keep waiting until every event and every handle has been seen at least
    // once during this round.
    while (!(seen_events.size() >= kNumTestEvents &&
             seen_handles.size() >= kNumTestHandles)) {
      base::WaitableEvent* signaled_event = nullptr;
      size_t ready_count = 1;
      Handle ready_handle;
      MojoResult ready_result = MOJO_RESULT_UNKNOWN;
      wait_set.Wait(&signaled_event, &ready_count, &ready_handle,
                    &ready_result);

      if (signaled_event != nullptr)
        seen_events.insert(signaled_event);

      if (ready_count == 0)
        continue;

      EXPECT_EQ(1u, ready_count);
      EXPECT_EQ(MOJO_RESULT_OK, ready_result);
      seen_handles.insert(ready_handle);
    }
  }
}
367 
368 }  // namespace
369 }  // namespace mojo
370