1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/public/cpp/bindings/connector.h"
6
7 #include <stddef.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <utility>
11
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/callback_helpers.h"
15 #include "base/message_loop/message_loop.h"
16 #include "base/run_loop.h"
17 #include "base/threading/thread.h"
18 #include "base/threading/thread_task_runner_handle.h"
19 #include "mojo/public/cpp/bindings/message.h"
20 #include "mojo/public/cpp/bindings/tests/message_queue.h"
21 #include "testing/gtest/include/gtest/gtest.h"
22
23 namespace mojo {
24 namespace test {
25 namespace {
26
27 class MessageAccumulator : public MessageReceiver {
28 public:
MessageAccumulator()29 MessageAccumulator() {}
MessageAccumulator(const base::Closure & closure)30 explicit MessageAccumulator(const base::Closure& closure)
31 : closure_(closure) {}
32
Accept(Message * message)33 bool Accept(Message* message) override {
34 queue_.Push(message);
35 if (!closure_.is_null())
36 base::ResetAndReturn(&closure_).Run();
37 return true;
38 }
39
IsEmpty() const40 bool IsEmpty() const { return queue_.IsEmpty(); }
41
Pop(Message * message)42 void Pop(Message* message) { queue_.Pop(message); }
43
set_closure(const base::Closure & closure)44 void set_closure(const base::Closure& closure) { closure_ = closure; }
45
size() const46 size_t size() const { return queue_.size(); }
47
48 private:
49 MessageQueue queue_;
50 base::Closure closure_;
51 };
52
53 class ConnectorDeletingMessageAccumulator : public MessageAccumulator {
54 public:
ConnectorDeletingMessageAccumulator(Connector ** connector)55 ConnectorDeletingMessageAccumulator(Connector** connector)
56 : connector_(connector) {}
57
Accept(Message * message)58 bool Accept(Message* message) override {
59 delete *connector_;
60 *connector_ = nullptr;
61 return MessageAccumulator::Accept(message);
62 }
63
64 private:
65 Connector** connector_;
66 };
67
68 class ReentrantMessageAccumulator : public MessageAccumulator {
69 public:
ReentrantMessageAccumulator(Connector * connector)70 ReentrantMessageAccumulator(Connector* connector)
71 : connector_(connector), number_of_calls_(0) {}
72
Accept(Message * message)73 bool Accept(Message* message) override {
74 if (!MessageAccumulator::Accept(message))
75 return false;
76 number_of_calls_++;
77 if (number_of_calls_ == 1) {
78 return connector_->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
79 }
80 return true;
81 }
82
number_of_calls()83 int number_of_calls() { return number_of_calls_; }
84
85 private:
86 Connector* connector_;
87 int number_of_calls_;
88 };
89
90 class ConnectorTest : public testing::Test {
91 public:
ConnectorTest()92 ConnectorTest() {}
93
SetUp()94 void SetUp() override { CreateMessagePipe(nullptr, &handle0_, &handle1_); }
95
TearDown()96 void TearDown() override {}
97
CreateMessage(const char * text,std::vector<ScopedHandle> handles=std::vector<ScopedHandle> ())98 Message CreateMessage(
99 const char* text,
100 std::vector<ScopedHandle> handles = std::vector<ScopedHandle>()) {
101 const size_t size = strlen(text) + 1; // Plus null terminator.
102 Message message(1, 0, size, 0, &handles);
103 memcpy(message.payload_buffer()->AllocateAndGet(size), text, size);
104 return message;
105 }
106
107 protected:
108 ScopedMessagePipeHandle handle0_;
109 ScopedMessagePipeHandle handle1_;
110
111 private:
112 base::MessageLoop loop_;
113 };
114
TEST_F(ConnectorTest,Basic)115 TEST_F(ConnectorTest, Basic) {
116 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
117 base::ThreadTaskRunnerHandle::Get());
118 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
119 base::ThreadTaskRunnerHandle::Get());
120
121 const char kText[] = "hello world";
122 Message message = CreateMessage(kText);
123 connector0.Accept(&message);
124
125 base::RunLoop run_loop;
126 MessageAccumulator accumulator(run_loop.QuitClosure());
127 connector1.set_incoming_receiver(&accumulator);
128
129 run_loop.Run();
130
131 ASSERT_FALSE(accumulator.IsEmpty());
132
133 Message message_received;
134 accumulator.Pop(&message_received);
135
136 EXPECT_EQ(
137 std::string(kText),
138 std::string(reinterpret_cast<const char*>(message_received.payload())));
139 }
140
TEST_F(ConnectorTest,Basic_Synchronous)141 TEST_F(ConnectorTest, Basic_Synchronous) {
142 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
143 base::ThreadTaskRunnerHandle::Get());
144 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
145 base::ThreadTaskRunnerHandle::Get());
146
147 const char kText[] = "hello world";
148 Message message = CreateMessage(kText);
149 connector0.Accept(&message);
150
151 MessageAccumulator accumulator;
152 connector1.set_incoming_receiver(&accumulator);
153
154 connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
155
156 ASSERT_FALSE(accumulator.IsEmpty());
157
158 Message message_received;
159 accumulator.Pop(&message_received);
160
161 EXPECT_EQ(
162 std::string(kText),
163 std::string(reinterpret_cast<const char*>(message_received.payload())));
164 }
165
TEST_F(ConnectorTest,Basic_EarlyIncomingReceiver)166 TEST_F(ConnectorTest, Basic_EarlyIncomingReceiver) {
167 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
168 base::ThreadTaskRunnerHandle::Get());
169 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
170 base::ThreadTaskRunnerHandle::Get());
171
172 base::RunLoop run_loop;
173 MessageAccumulator accumulator(run_loop.QuitClosure());
174 connector1.set_incoming_receiver(&accumulator);
175
176 const char kText[] = "hello world";
177 Message message = CreateMessage(kText);
178 connector0.Accept(&message);
179
180 run_loop.Run();
181
182 ASSERT_FALSE(accumulator.IsEmpty());
183
184 Message message_received;
185 accumulator.Pop(&message_received);
186
187 EXPECT_EQ(
188 std::string(kText),
189 std::string(reinterpret_cast<const char*>(message_received.payload())));
190 }
191
TEST_F(ConnectorTest,Basic_TwoMessages)192 TEST_F(ConnectorTest, Basic_TwoMessages) {
193 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
194 base::ThreadTaskRunnerHandle::Get());
195 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
196 base::ThreadTaskRunnerHandle::Get());
197
198 const char* kText[] = {"hello", "world"};
199 for (size_t i = 0; i < arraysize(kText); ++i) {
200 Message message = CreateMessage(kText[i]);
201 connector0.Accept(&message);
202 }
203
204 MessageAccumulator accumulator;
205 connector1.set_incoming_receiver(&accumulator);
206
207 for (size_t i = 0; i < arraysize(kText); ++i) {
208 if (accumulator.IsEmpty()) {
209 base::RunLoop run_loop;
210 accumulator.set_closure(run_loop.QuitClosure());
211 run_loop.Run();
212 }
213 ASSERT_FALSE(accumulator.IsEmpty());
214
215 Message message_received;
216 accumulator.Pop(&message_received);
217
218 EXPECT_EQ(
219 std::string(kText[i]),
220 std::string(reinterpret_cast<const char*>(message_received.payload())));
221 }
222 }
223
TEST_F(ConnectorTest,Basic_TwoMessages_Synchronous)224 TEST_F(ConnectorTest, Basic_TwoMessages_Synchronous) {
225 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
226 base::ThreadTaskRunnerHandle::Get());
227 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
228 base::ThreadTaskRunnerHandle::Get());
229
230 const char* kText[] = {"hello", "world"};
231 for (size_t i = 0; i < arraysize(kText); ++i) {
232 Message message = CreateMessage(kText[i]);
233 connector0.Accept(&message);
234 }
235
236 MessageAccumulator accumulator;
237 connector1.set_incoming_receiver(&accumulator);
238
239 connector1.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
240
241 ASSERT_FALSE(accumulator.IsEmpty());
242
243 Message message_received;
244 accumulator.Pop(&message_received);
245
246 EXPECT_EQ(
247 std::string(kText[0]),
248 std::string(reinterpret_cast<const char*>(message_received.payload())));
249
250 ASSERT_TRUE(accumulator.IsEmpty());
251 }
252
TEST_F(ConnectorTest,WriteToClosedPipe)253 TEST_F(ConnectorTest, WriteToClosedPipe) {
254 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
255 base::ThreadTaskRunnerHandle::Get());
256
257 const char kText[] = "hello world";
258 Message message = CreateMessage(kText);
259
260 // Close the other end of the pipe.
261 handle1_.reset();
262
263 // Not observed yet because we haven't spun the message loop yet.
264 EXPECT_FALSE(connector0.encountered_error());
265
266 // Write failures are not reported.
267 bool ok = connector0.Accept(&message);
268 EXPECT_TRUE(ok);
269
270 // Still not observed.
271 EXPECT_FALSE(connector0.encountered_error());
272
273 // Spin the message loop, and then we should start observing the closed pipe.
274 base::RunLoop run_loop;
275 connector0.set_connection_error_handler(run_loop.QuitClosure());
276 run_loop.Run();
277
278 EXPECT_TRUE(connector0.encountered_error());
279 }
280
TEST_F(ConnectorTest,MessageWithHandles)281 TEST_F(ConnectorTest, MessageWithHandles) {
282 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
283 base::ThreadTaskRunnerHandle::Get());
284 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
285 base::ThreadTaskRunnerHandle::Get());
286
287 const char kText[] = "hello world";
288
289 MessagePipe pipe;
290 std::vector<ScopedHandle> handles;
291 handles.emplace_back(ScopedHandle::From(std::move(pipe.handle0)));
292 Message message1 = CreateMessage(kText, std::move(handles));
293
294 connector0.Accept(&message1);
295
296 base::RunLoop run_loop;
297 MessageAccumulator accumulator(run_loop.QuitClosure());
298 connector1.set_incoming_receiver(&accumulator);
299
300 run_loop.Run();
301
302 ASSERT_FALSE(accumulator.IsEmpty());
303
304 Message message_received;
305 accumulator.Pop(&message_received);
306
307 EXPECT_EQ(
308 std::string(kText),
309 std::string(reinterpret_cast<const char*>(message_received.payload())));
310 ASSERT_EQ(1U, message_received.handles()->size());
311
312 // Now send a message to the transferred handle and confirm it's sent through
313 // to the orginal pipe.
314 auto pipe_handle = ScopedMessagePipeHandle::From(
315 std::move(message_received.mutable_handles()->front()));
316 Connector connector_received(std::move(pipe_handle),
317 Connector::SINGLE_THREADED_SEND,
318 base::ThreadTaskRunnerHandle::Get());
319 Connector connector_original(std::move(pipe.handle1),
320 Connector::SINGLE_THREADED_SEND,
321 base::ThreadTaskRunnerHandle::Get());
322
323 Message message2 = CreateMessage(kText);
324 connector_received.Accept(&message2);
325 base::RunLoop run_loop2;
326 MessageAccumulator accumulator2(run_loop2.QuitClosure());
327 connector_original.set_incoming_receiver(&accumulator2);
328 run_loop2.Run();
329
330 ASSERT_FALSE(accumulator2.IsEmpty());
331
332 accumulator2.Pop(&message_received);
333
334 EXPECT_EQ(
335 std::string(kText),
336 std::string(reinterpret_cast<const char*>(message_received.payload())));
337 }
338
TEST_F(ConnectorTest,WaitForIncomingMessageWithError)339 TEST_F(ConnectorTest, WaitForIncomingMessageWithError) {
340 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
341 base::ThreadTaskRunnerHandle::Get());
342 // Close the other end of the pipe.
343 handle1_.reset();
344 ASSERT_FALSE(connector0.WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE));
345 }
346
TEST_F(ConnectorTest,WaitForIncomingMessageWithDeletion)347 TEST_F(ConnectorTest, WaitForIncomingMessageWithDeletion) {
348 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
349 base::ThreadTaskRunnerHandle::Get());
350 Connector* connector1 =
351 new Connector(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
352 base::ThreadTaskRunnerHandle::Get());
353
354 const char kText[] = "hello world";
355 Message message = CreateMessage(kText);
356 connector0.Accept(&message);
357
358 ConnectorDeletingMessageAccumulator accumulator(&connector1);
359 connector1->set_incoming_receiver(&accumulator);
360
361 connector1->WaitForIncomingMessage(MOJO_DEADLINE_INDEFINITE);
362
363 ASSERT_FALSE(connector1);
364 ASSERT_FALSE(accumulator.IsEmpty());
365
366 Message message_received;
367 accumulator.Pop(&message_received);
368
369 EXPECT_EQ(
370 std::string(kText),
371 std::string(reinterpret_cast<const char*>(message_received.payload())));
372 }
373
TEST_F(ConnectorTest,WaitForIncomingMessageWithReentrancy)374 TEST_F(ConnectorTest, WaitForIncomingMessageWithReentrancy) {
375 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
376 base::ThreadTaskRunnerHandle::Get());
377 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
378 base::ThreadTaskRunnerHandle::Get());
379
380 const char* kText[] = {"hello", "world"};
381 for (size_t i = 0; i < arraysize(kText); ++i) {
382 Message message = CreateMessage(kText[i]);
383 connector0.Accept(&message);
384 }
385
386 ReentrantMessageAccumulator accumulator(&connector1);
387 connector1.set_incoming_receiver(&accumulator);
388
389 for (size_t i = 0; i < arraysize(kText); ++i) {
390 if (accumulator.IsEmpty()) {
391 base::RunLoop run_loop;
392 accumulator.set_closure(run_loop.QuitClosure());
393 run_loop.Run();
394 }
395 ASSERT_FALSE(accumulator.IsEmpty());
396
397 Message message_received;
398 accumulator.Pop(&message_received);
399
400 EXPECT_EQ(
401 std::string(kText[i]),
402 std::string(reinterpret_cast<const char*>(message_received.payload())));
403 }
404
405 ASSERT_EQ(2, accumulator.number_of_calls());
406 }
407
ForwardErrorHandler(bool * called,const base::Closure & callback)408 void ForwardErrorHandler(bool* called, const base::Closure& callback) {
409 *called = true;
410 callback.Run();
411 }
412
TEST_F(ConnectorTest,RaiseError)413 TEST_F(ConnectorTest, RaiseError) {
414 base::RunLoop run_loop, run_loop2;
415 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
416 base::ThreadTaskRunnerHandle::Get());
417 bool error_handler_called0 = false;
418 connector0.set_connection_error_handler(base::Bind(
419 &ForwardErrorHandler, &error_handler_called0, run_loop.QuitClosure()));
420
421 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
422 base::ThreadTaskRunnerHandle::Get());
423 bool error_handler_called1 = false;
424 connector1.set_connection_error_handler(base::Bind(
425 &ForwardErrorHandler, &error_handler_called1, run_loop2.QuitClosure()));
426
427 const char kText[] = "hello world";
428 Message message = CreateMessage(kText);
429 connector0.Accept(&message);
430 connector0.RaiseError();
431
432 base::RunLoop run_loop3;
433 MessageAccumulator accumulator(run_loop3.QuitClosure());
434 connector1.set_incoming_receiver(&accumulator);
435
436 run_loop3.Run();
437
438 // Messages sent prior to RaiseError() still arrive at the other end.
439 ASSERT_FALSE(accumulator.IsEmpty());
440
441 Message message_received;
442 accumulator.Pop(&message_received);
443
444 EXPECT_EQ(
445 std::string(kText),
446 std::string(reinterpret_cast<const char*>(message_received.payload())));
447
448 run_loop.Run();
449 run_loop2.Run();
450
451 // Connection error handler is called at both sides.
452 EXPECT_TRUE(error_handler_called0);
453 EXPECT_TRUE(error_handler_called1);
454
455 // The error flag is set at both sides.
456 EXPECT_TRUE(connector0.encountered_error());
457 EXPECT_TRUE(connector1.encountered_error());
458
459 // The message pipe handle is valid at both sides.
460 EXPECT_TRUE(connector0.is_valid());
461 EXPECT_TRUE(connector1.is_valid());
462 }
463
PauseConnectorAndRunClosure(Connector * connector,const base::Closure & closure)464 void PauseConnectorAndRunClosure(Connector* connector,
465 const base::Closure& closure) {
466 connector->PauseIncomingMethodCallProcessing();
467 closure.Run();
468 }
469
TEST_F(ConnectorTest,PauseWithQueuedMessages)470 TEST_F(ConnectorTest, PauseWithQueuedMessages) {
471 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
472 base::ThreadTaskRunnerHandle::Get());
473 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
474 base::ThreadTaskRunnerHandle::Get());
475
476 const char kText[] = "hello world";
477
478 // Queue up two messages.
479 Message message = CreateMessage(kText);
480 connector0.Accept(&message);
481 message = CreateMessage(kText);
482 connector0.Accept(&message);
483
484 base::RunLoop run_loop;
485 // Configure the accumulator such that it pauses after the first message is
486 // received.
487 MessageAccumulator accumulator(base::Bind(
488 &PauseConnectorAndRunClosure, &connector1, run_loop.QuitClosure()));
489 connector1.set_incoming_receiver(&accumulator);
490
491 run_loop.Run();
492
493 // As we paused after the first message we should only have gotten one
494 // message.
495 ASSERT_EQ(1u, accumulator.size());
496 }
497
AccumulateWithNestedLoop(MessageAccumulator * accumulator,const base::Closure & closure)498 void AccumulateWithNestedLoop(MessageAccumulator* accumulator,
499 const base::Closure& closure) {
500 base::RunLoop nested_run_loop(base::RunLoop::Type::kNestableTasksAllowed);
501 accumulator->set_closure(nested_run_loop.QuitClosure());
502 nested_run_loop.Run();
503 closure.Run();
504 }
505
TEST_F(ConnectorTest,ProcessWhenNested)506 TEST_F(ConnectorTest, ProcessWhenNested) {
507 Connector connector0(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
508 base::ThreadTaskRunnerHandle::Get());
509 Connector connector1(std::move(handle1_), Connector::SINGLE_THREADED_SEND,
510 base::ThreadTaskRunnerHandle::Get());
511
512 const char kText[] = "hello world";
513
514 // Queue up two messages.
515 Message message = CreateMessage(kText);
516 connector0.Accept(&message);
517 message = CreateMessage(kText);
518 connector0.Accept(&message);
519
520 base::RunLoop run_loop;
521 MessageAccumulator accumulator;
522 // When the accumulator gets the first message it spins a nested message
523 // loop. The loop is quit when another message is received.
524 accumulator.set_closure(base::Bind(&AccumulateWithNestedLoop, &accumulator,
525 run_loop.QuitClosure()));
526 connector1.set_incoming_receiver(&accumulator);
527
528 run_loop.Run();
529
530 ASSERT_EQ(2u, accumulator.size());
531 }
532
TEST_F(ConnectorTest,DestroyOnDifferentThreadAfterClose)533 TEST_F(ConnectorTest, DestroyOnDifferentThreadAfterClose) {
534 std::unique_ptr<Connector> connector(
535 new Connector(std::move(handle0_), Connector::SINGLE_THREADED_SEND,
536 base::ThreadTaskRunnerHandle::Get()));
537
538 connector->CloseMessagePipe();
539
540 base::Thread another_thread("ThreadForDestroyingConnector");
541 another_thread.Start();
542
543 base::RunLoop run_loop;
544 another_thread.task_runner()->PostTaskAndReply(
545 FROM_HERE,
546 base::Bind(
547 [](std::unique_ptr<Connector> connector) { connector.reset(); },
548 base::Passed(std::move(connector))),
549 run_loop.QuitClosure());
550
551 run_loop.Run();
552 }
553
554 } // namespace
555 } // namespace test
556 } // namespace mojo
557