1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/connector.h"
6 
7 #include <stddef.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <utility>
11 
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/callback_helpers.h"
15 #include "base/message_loop/message_loop.h"
16 #include "base/run_loop.h"
17 #include "base/threading/thread.h"
18 #include "base/threading/thread_task_runner_handle.h"
19 #include "mojo/public/cpp/bindings/message.h"
20 #include "mojo/public/cpp/bindings/tests/message_queue.h"
21 #include "testing/gtest/include/gtest/gtest.h"
22 
23 namespace mojo {
24 namespace test {
25 namespace {
26 
27 class MessageAccumulator : public MessageReceiver {
28  public:
MessageAccumulator()29   MessageAccumulator() {}
MessageAccumulator(const base::Closure & closure)30   explicit MessageAccumulator(const base::Closure& closure)
31       : closure_(closure) {}
32 
Accept(Message * message)33   bool Accept(Message* message) override {
34     queue_.Push(message);
35     if (!closure_.is_null())
36       base::ResetAndReturn(&closure_).Run();
37     return true;
38   }
39 
IsEmpty() const40   bool IsEmpty() const { return queue_.IsEmpty(); }
41 
Pop(Message * message)42   void Pop(Message* message) { queue_.Pop(message); }
43 
set_closure(const base::Closure & closure)44   void set_closure(const base::Closure& closure) { closure_ = closure; }
45 
size() const46   size_t size() const { return queue_.size(); }
47 
48  private:
49   MessageQueue queue_;
50   base::Closure closure_;
51 };
52 
53 class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
54  public:
ConnectorDeletingMessageAccumulator(Connector ** connector)55   ConnectorDeletingMessageAccumulator(Connector** connector)
56       : connector_(connector) {}
57 
Accept(Message * message)58   bool Accept(Message* message) override {
59     delete *connector_;
60     *connector_ = nullptr;
61     return MessageAccumulator::Accept(message);
62   }
63 
64  private:
65   Connector** connector_;
66 };
67 
68 class ReentrantMessageAccumulator : public MessageAccumulator {
69  public:
ReentrantMessageAccumulator(Connector * connector)70   ReentrantMessageAccumulator(Connector* connector)
71       : connector_(connector), number_of_calls_(0) {}
72 
Accept(Message * message)73   bool Accept(Message* message) override {
74     if (!MessageAccumulator::Accept(message))
75       return false;
76     number_of_calls_++;
77     if (number_of_calls_ == 1) {
78       return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
79     }
80     return true;
81   }
82 
number_of_calls()83   int number_of_calls() { return number_of_calls_; }
84 
85  private:
86   Connector* connector_;
87   int number_of_calls_;
88 };
89 
90 class ConnectorTest : public testing::Test {
91  public:
ConnectorTest()92   ConnectorTest() {}
93 
SetUp()94   void SetUp() override { CreateMessagePipe(nullptr, &handle0_, &handle1_); }
95 
TearDown()96   void TearDown() override {}
97 
CreateMessage(const char * text,std::vector<ScopedHandle> handles=std::vector<ScopedHandle> ())98   Message CreateMessage(
99       const char* text,
100       std::vector<ScopedHandle> handles = std::vector<ScopedHandle>()) {
101     const size_t size = strlen(text) + 1;  // Plus null terminator.
102     Message message(1, 0, size, 0, &handles);
103     memcpy(message.payload_buffer()->AllocateAndGet(size), text, size);
104     return message;
105   }
106 
107  protected:
108   ScopedMessagePipeHandle handle0_;
109   ScopedMessagePipeHandle handle1_;
110 
111  private:
112   base::MessageLoop loop_;
113 };
114 
TEST_F(ConnectorTest,Basic)115 TEST_F(ConnectorTest, Basic) {
116   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
117                        base::ThreadTaskRunnerHandle::Get());
118   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
119                        base::ThreadTaskRunnerHandle::Get());
120 
121   const char kText[] = "hello world";
122   Message message = CreateMessage(kText);
123   connector0.Accept(&message);
124 
125   base::RunLoop run_loop;
126   MessageAccumulator accumulator(run_loop.QuitClosure());
127   connector1.set_incoming_receiver(&accumulator);
128 
129   run_loop.Run();
130 
131   ASSERT_FALSE(accumulator.IsEmpty());
132 
133   Message message_received;
134   accumulator.Pop(&message_received);
135 
136   EXPECT_EQ(
137       std::string(kText),
138       std::string(reinterpret_cast<const char*>(message_received.payload())));
139 }
140 
TEST_F(ConnectorTest,Basic_Synchronous)141 TEST_F(ConnectorTest, Basic_Synchronous) {
142   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
143                        base::ThreadTaskRunnerHandle::Get());
144   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
145                        base::ThreadTaskRunnerHandle::Get());
146 
147   const char kText[] = "hello world";
148   Message message = CreateMessage(kText);
149   connector0.Accept(&message);
150 
151   MessageAccumulator accumulator;
152   connector1.set_incoming_receiver(&accumulator);
153 
154   connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
155 
156   ASSERT_FALSE(accumulator.IsEmpty());
157 
158   Message message_received;
159   accumulator.Pop(&message_received);
160 
161   EXPECT_EQ(
162       std::string(kText),
163       std::string(reinterpret_cast<const char*>(message_received.payload())));
164 }
165 
TEST_F(ConnectorTest,Basic_EarlyIncomingReceiver)166 TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) {
167   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
168                        base::ThreadTaskRunnerHandle::Get());
169   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
170                        base::ThreadTaskRunnerHandle::Get());
171 
172   base::RunLoop run_loop;
173   MessageAccumulator accumulator(run_loop.QuitClosure());
174   connector1.set_incoming_receiver(&accumulator);
175 
176   const char kText[] = "hello world";
177   Message message = CreateMessage(kText);
178   connector0.Accept(&message);
179 
180   run_loop.Run();
181 
182   ASSERT_FALSE(accumulator.IsEmpty());
183 
184   Message message_received;
185   accumulator.Pop(&message_received);
186 
187   EXPECT_EQ(
188       std::string(kText),
189       std::string(reinterpret_cast<const char*>(message_received.payload())));
190 }
191 
TEST_F(ConnectorTest,Basic_TwoMessages)192 TEST_F(ConnectorTest, Basic_TwoMessages) {
193   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
194                        base::ThreadTaskRunnerHandle::Get());
195   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
196                        base::ThreadTaskRunnerHandle::Get());
197 
198   const char* kText[] = {"hello", "world"};
199   for (size_t i = 0; i < arraysize(kText); ++i) {
200     Message message = CreateMessage(kText[i]);
201     connector0.Accept(&message);
202   }
203 
204   MessageAccumulator accumulator;
205   connector1.set_incoming_receiver(&accumulator);
206 
207   for (size_t i = 0; i < arraysize(kText); ++i) {
208     if (accumulator.IsEmpty()) {
209       base::RunLoop run_loop;
210       accumulator.set_closure(run_loop.QuitClosure());
211       run_loop.Run();
212     }
213     ASSERT_FALSE(accumulator.IsEmpty());
214 
215     Message message_received;
216     accumulator.Pop(&message_received);
217 
218     EXPECT_EQ(
219         std::string(kText[i]),
220         std::string(reinterpret_cast<const char*>(message_received.payload())));
221   }
222 }
223 
TEST_F(ConnectorTest,Basic_TwoMessages_Synchronous)224 TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) {
225   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
226                        base::ThreadTaskRunnerHandle::Get());
227   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
228                        base::ThreadTaskRunnerHandle::Get());
229 
230   const char* kText[] = {"hello", "world"};
231   for (size_t i = 0; i < arraysize(kText); ++i) {
232     Message message = CreateMessage(kText[i]);
233     connector0.Accept(&message);
234   }
235 
236   MessageAccumulator accumulator;
237   connector1.set_incoming_receiver(&accumulator);
238 
239   connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
240 
241   ASSERT_FALSE(accumulator.IsEmpty());
242 
243   Message message_received;
244   accumulator.Pop(&message_received);
245 
246   EXPECT_EQ(
247       std::string(kText[0]),
248       std::string(reinterpret_cast<const char*>(message_received.payload())));
249 
250   ASSERT_TRUE(accumulator.IsEmpty());
251 }
252 
TEST_F(ConnectorTest,WriteToClosedPipe)253 TEST_F(ConnectorTest, WriteToClosedPipe) {
254   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
255                        base::ThreadTaskRunnerHandle::Get());
256 
257   const char kText[] = "hello world";
258   Message message = CreateMessage(kText);
259 
260   // Close the other end of the pipe.
261   handle1_.reset();
262 
263   // Not observed yet because we haven't spun the message loop yet.
264   EXPECT_FALSE(connector0.encountered_error());
265 
266   // Write failures are not reported.
267   bool ok = connector0.Accept(&message);
268   EXPECT_TRUE(ok);
269 
270   // Still not observed.
271   EXPECT_FALSE(connector0.encountered_error());
272 
273   // Spin the message loop, and then we should start observing the closed pipe.
274   base::RunLoop run_loop;
275   connector0.set_connection_error_handler(run_loop.QuitClosure());
276   run_loop.Run();
277 
278   EXPECT_TRUE(connector0.encountered_error());
279 }
280 
TEST_F(ConnectorTest,MessageWithHandles)281 TEST_F(ConnectorTest, MessageWithHandles) {
282   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
283                        base::ThreadTaskRunnerHandle::Get());
284   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
285                        base::ThreadTaskRunnerHandle::Get());
286 
287   const char kText[] = "hello world";
288 
289   MessagePipe pipe;
290   std::vector<ScopedHandle> handles;
291   handles.emplace_back(ScopedHandle::From(std::move(pipe.handle0)));
292   Message message1 = CreateMessage(kText, std::move(handles));
293 
294   connector0.Accept(&message1);
295 
296   base::RunLoop run_loop;
297   MessageAccumulator accumulator(run_loop.QuitClosure());
298   connector1.set_incoming_receiver(&accumulator);
299 
300   run_loop.Run();
301 
302   ASSERT_FALSE(accumulator.IsEmpty());
303 
304   Message message_received;
305   accumulator.Pop(&message_received);
306 
307   EXPECT_EQ(
308       std::string(kText),
309       std::string(reinterpret_cast<const char*>(message_received.payload())));
310   ASSERT_EQ(1U, message_received.handles()->size());
311 
312   // Now send a message to the transferred handle and confirm it's sent through
313   // to the orginal pipe.
314   auto pipe_handle = ScopedMessagePipeHandle::From(
315       std::move(message_received.mutable_handles()->front()));
316   Connector connector_received(std::move(pipe_handle),
317                                Connector::SINGLE_THREADED_SEND,
318                                base::ThreadTaskRunnerHandle::Get());
319   Connector connector_original(std::move(pipe.handle1),
320                                Connector::SINGLE_THREADED_SEND,
321                                base::ThreadTaskRunnerHandle::Get());
322 
323   Message message2 = CreateMessage(kText);
324   connector_received.Accept(&message2);
325   base::RunLoop run_loop2;
326   MessageAccumulator accumulator2(run_loop2.QuitClosure());
327   connector_original.set_incoming_receiver(&accumulator2);
328   run_loop2.Run();
329 
330   ASSERT_FALSE(accumulator2.IsEmpty());
331 
332   accumulator2.Pop(&message_received);
333 
334   EXPECT_EQ(
335       std::string(kText),
336       std::string(reinterpret_cast<const char*>(message_received.payload())));
337 }
338 
TEST_F(ConnectorTest,WaitForIncomingMessageWithError)339 TEST_F(ConnectorTest, WaitForIncomingMessageWithError) {
340   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
341                        base::ThreadTaskRunnerHandle::Get());
342   // Close the other end of the pipe.
343   handle1_.reset();
344   ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE));
345 }
346 
TEST_F(ConnectorTest,WaitForIncomingMessageWithDeletion)347 TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) {
348   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
349                        base::ThreadTaskRunnerHandle::Get());
350   Connector* connector1 =
351       new Connector(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
352                     base::ThreadTaskRunnerHandle::Get());
353 
354   const char kText[] = "hello world";
355   Message message = CreateMessage(kText);
356   connector0.Accept(&message);
357 
358   ConnectorDeletingMessageAccumulator accumulator(&connector1);
359   connector1->set_incoming_receiver(&accumulator);
360 
361   connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
362 
363   ASSERT_FALSE(connector1);
364   ASSERT_FALSE(accumulator.IsEmpty());
365 
366   Message message_received;
367   accumulator.Pop(&message_received);
368 
369   EXPECT_EQ(
370       std::string(kText),
371       std::string(reinterpret_cast<const char*>(message_received.payload())));
372 }
373 
TEST_F(ConnectorTest,WaitForIncomingMessageWithReentrancy)374 TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) {
375   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
376                        base::ThreadTaskRunnerHandle::Get());
377   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
378                        base::ThreadTaskRunnerHandle::Get());
379 
380   const char* kText[] = {"hello", "world"};
381   for (size_t i = 0; i < arraysize(kText); ++i) {
382     Message message = CreateMessage(kText[i]);
383     connector0.Accept(&message);
384   }
385 
386   ReentrantMessageAccumulator accumulator(&connector1);
387   connector1.set_incoming_receiver(&accumulator);
388 
389   for (size_t i = 0; i < arraysize(kText); ++i) {
390     if (accumulator.IsEmpty()) {
391       base::RunLoop run_loop;
392       accumulator.set_closure(run_loop.QuitClosure());
393       run_loop.Run();
394     }
395     ASSERT_FALSE(accumulator.IsEmpty());
396 
397     Message message_received;
398     accumulator.Pop(&message_received);
399 
400     EXPECT_EQ(
401         std::string(kText[i]),
402         std::string(reinterpret_cast<const char*>(message_received.payload())));
403   }
404 
405   ASSERT_EQ(2, accumulator.number_of_calls());
406 }
407 
ForwardErrorHandler(bool * called,const base::Closure & callback)408 void ForwardErrorHandler(bool* called, const base::Closure& callback) {
409   *called = true;
410   callback.Run();
411 }
412 
TEST_F(ConnectorTest,RaiseError)413 TEST_F(ConnectorTest, RaiseError) {
414   base::RunLoop run_loop, run_loop2;
415   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
416                        base::ThreadTaskRunnerHandle::Get());
417   bool error_handler_called0 = false;
418   connector0.set_connection_error_handler(base::Bind(
419       &ForwardErrorHandler, &error_handler_called0, run_loop.QuitClosure()));
420 
421   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
422                        base::ThreadTaskRunnerHandle::Get());
423   bool error_handler_called1 = false;
424   connector1.set_connection_error_handler(base::Bind(
425       &ForwardErrorHandler, &error_handler_called1, run_loop2.QuitClosure()));
426 
427   const char kText[] = "hello world";
428   Message message = CreateMessage(kText);
429   connector0.Accept(&message);
430   connector0.RaiseError();
431 
432   base::RunLoop run_loop3;
433   MessageAccumulator accumulator(run_loop3.QuitClosure());
434   connector1.set_incoming_receiver(&accumulator);
435 
436   run_loop3.Run();
437 
438   // Messages sent prior to RaiseError() still arrive at the other end.
439   ASSERT_FALSE(accumulator.IsEmpty());
440 
441   Message message_received;
442   accumulator.Pop(&message_received);
443 
444   EXPECT_EQ(
445       std::string(kText),
446       std::string(reinterpret_cast<const char*>(message_received.payload())));
447 
448   run_loop.Run();
449   run_loop2.Run();
450 
451   // Connection error handler is called at both sides.
452   EXPECT_TRUE(error_handler_called0);
453   EXPECT_TRUE(error_handler_called1);
454 
455   // The error flag is set at both sides.
456   EXPECT_TRUE(connector0.encountered_error());
457   EXPECT_TRUE(connector1.encountered_error());
458 
459   // The message pipe handle is valid at both sides.
460   EXPECT_TRUE(connector0.is_valid());
461   EXPECT_TRUE(connector1.is_valid());
462 }
463 
PauseConnectorAndRunClosure(Connector * connector,const base::Closure & closure)464 void PauseConnectorAndRunClosure(Connector* connector,
465                                  const base::Closure& closure) {
466   connector->PauseIncomingMethodCallProcessing();
467   closure.Run();
468 }
469 
TEST_F(ConnectorTest,PauseWithQueuedMessages)470 TEST_F(ConnectorTest, PauseWithQueuedMessages) {
471   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
472                        base::ThreadTaskRunnerHandle::Get());
473   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
474                        base::ThreadTaskRunnerHandle::Get());
475 
476   const char kText[] = "hello world";
477 
478   // Queue up two messages.
479   Message message = CreateMessage(kText);
480   connector0.Accept(&message);
481   message = CreateMessage(kText);
482   connector0.Accept(&message);
483 
484   base::RunLoop run_loop;
485   // Configure the accumulator such that it pauses after the first message is
486   // received.
487   MessageAccumulator accumulator(base::Bind(
488       &PauseConnectorAndRunClosure, &connector1, run_loop.QuitClosure()));
489   connector1.set_incoming_receiver(&accumulator);
490 
491   run_loop.Run();
492 
493   // As we paused after the first message we should only have gotten one
494   // message.
495   ASSERT_EQ(1u, accumulator.size());
496 }
497 
AccumulateWithNestedLoop(MessageAccumulator * accumulator,const base::Closure & closure)498 void AccumulateWithNestedLoop(MessageAccumulator* accumulator,
499                               const base::Closure& closure) {
500   base::RunLoop nested_run_loop(base::RunLoop::Type::kNestableTasksAllowed);
501   accumulator->set_closure(nested_run_loop.QuitClosure());
502   nested_run_loop.Run();
503   closure.Run();
504 }
505 
TEST_F(ConnectorTest,ProcessWhenNested)506 TEST_F(ConnectorTest, ProcessWhenNested) {
507   Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
508                        base::ThreadTaskRunnerHandle::Get());
509   Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
510                        base::ThreadTaskRunnerHandle::Get());
511 
512   const char kText[] = "hello world";
513 
514   // Queue up two messages.
515   Message message = CreateMessage(kText);
516   connector0.Accept(&message);
517   message = CreateMessage(kText);
518   connector0.Accept(&message);
519 
520   base::RunLoop run_loop;
521   MessageAccumulator accumulator;
522   // When the accumulator gets the first message it spins a nested message
523   // loop. The loop is quit when another message is received.
524   accumulator.set_closure(base::Bind(&AccumulateWithNestedLoop, &accumulator,
525                                      run_loop.QuitClosure()));
526   connector1.set_incoming_receiver(&accumulator);
527 
528   run_loop.Run();
529 
530   ASSERT_EQ(2u, accumulator.size());
531 }
532 
TEST_F(ConnectorTest,DestroyOnDifferentThreadAfterClose)533 TEST_F(ConnectorTest, DestroyOnDifferentThreadAfterClose) {
534   std::unique_ptr<Connector> connector(
535       new Connector(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
536                     base::ThreadTaskRunnerHandle::Get()));
537 
538   connector->CloseMessagePipe();
539 
540   base::Thread another_thread("ThreadForDestroyingConnector");
541   another_thread.Start();
542 
543   base::RunLoop run_loop;
544   another_thread.task_runner()->PostTaskAndReply(
545       FROM_HERE,
546       base::Bind(
547           [](std::unique_ptr<Connector> connector) { connector.reset(); },
548           base::Passed(std::move(connector))),
549       run_loop.QuitClosure());
550 
551   run_loop.Run();
552 }
553 
554 }  // namespace
555 }  // namespace test
556 }  // namespace mojo
557