1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <stddef.h>
6 #include <stdint.h>
7
8 #include <memory>
9
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/message_loop/message_loop.h"
15 #include "mojo/edk/embedder/embedder.h"
16 #include "mojo/edk/embedder/platform_channel_pair.h"
17 #include "mojo/edk/system/test_utils.h"
18 #include "mojo/edk/system/waiter.h"
19 #include "mojo/edk/test/mojo_test_base.h"
20 #include "mojo/public/c/system/data_pipe.h"
21 #include "mojo/public/c/system/functions.h"
22 #include "mojo/public/c/system/message_pipe.h"
23 #include "testing/gtest/include/gtest/gtest.h"
24
25 namespace mojo {
26 namespace edk {
27 namespace {
28
29 const uint32_t kSizeOfOptions =
30 static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
31
32 // In various places, we have to poll (since, e.g., we can't yet wait for a
33 // certain amount of data to be available). This is the maximum number of
34 // iterations (separated by a short sleep).
35 // TODO(vtl): Get rid of this.
36 const size_t kMaxPoll = 100;
37
38 // Used in Multiprocess test.
39 const size_t kMultiprocessCapacity = 37;
40 const char kMultiprocessTestData[] = "hello i'm a string that is 36 bytes";
41 const int kMultiprocessMaxIter = 5;
42
43 class DataPipeTest : public test::MojoTestBase {
44 public:
DataPipeTest()45 DataPipeTest() : producer_(MOJO_HANDLE_INVALID),
46 consumer_(MOJO_HANDLE_INVALID) {}
47
~DataPipeTest()48 ~DataPipeTest() override {
49 if (producer_ != MOJO_HANDLE_INVALID)
50 CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
51 if (consumer_ != MOJO_HANDLE_INVALID)
52 CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
53 }
54
Create(const MojoCreateDataPipeOptions * options)55 MojoResult Create(const MojoCreateDataPipeOptions* options) {
56 return MojoCreateDataPipe(options, &producer_, &consumer_);
57 }
58
WriteData(const void * elements,uint32_t * num_bytes,bool all_or_none=false)59 MojoResult WriteData(const void* elements,
60 uint32_t* num_bytes,
61 bool all_or_none = false) {
62 return MojoWriteData(producer_, elements, num_bytes,
63 all_or_none ? MOJO_WRITE_DATA_FLAG_ALL_OR_NONE
64 : MOJO_WRITE_DATA_FLAG_NONE);
65 }
66
ReadData(void * elements,uint32_t * num_bytes,bool all_or_none=false,bool peek=false)67 MojoResult ReadData(void* elements,
68 uint32_t* num_bytes,
69 bool all_or_none = false,
70 bool peek = false) {
71 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
72 if (all_or_none)
73 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
74 if (peek)
75 flags |= MOJO_READ_DATA_FLAG_PEEK;
76 return MojoReadData(consumer_, elements, num_bytes, flags);
77 }
78
QueryData(uint32_t * num_bytes)79 MojoResult QueryData(uint32_t* num_bytes) {
80 return MojoReadData(consumer_, nullptr, num_bytes,
81 MOJO_READ_DATA_FLAG_QUERY);
82 }
83
DiscardData(uint32_t * num_bytes,bool all_or_none=false)84 MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
85 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
86 if (all_or_none)
87 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
88 return MojoReadData(consumer_, nullptr, num_bytes, flags);
89 }
90
BeginReadData(const void ** elements,uint32_t * num_bytes,bool all_or_none=false)91 MojoResult BeginReadData(const void** elements,
92 uint32_t* num_bytes,
93 bool all_or_none = false) {
94 MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
95 if (all_or_none)
96 flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
97 return MojoBeginReadData(consumer_, elements, num_bytes, flags);
98 }
99
EndReadData(uint32_t num_bytes_read)100 MojoResult EndReadData(uint32_t num_bytes_read) {
101 return MojoEndReadData(consumer_, num_bytes_read);
102 }
103
BeginWriteData(void ** elements,uint32_t * num_bytes,bool all_or_none=false)104 MojoResult BeginWriteData(void** elements,
105 uint32_t* num_bytes,
106 bool all_or_none = false) {
107 MojoReadDataFlags flags = MOJO_WRITE_DATA_FLAG_NONE;
108 if (all_or_none)
109 flags |= MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
110 return MojoBeginWriteData(producer_, elements, num_bytes, flags);
111 }
112
EndWriteData(uint32_t num_bytes_written)113 MojoResult EndWriteData(uint32_t num_bytes_written) {
114 return MojoEndWriteData(producer_, num_bytes_written);
115 }
116
CloseProducer()117 MojoResult CloseProducer() {
118 MojoResult rv = MojoClose(producer_);
119 producer_ = MOJO_HANDLE_INVALID;
120 return rv;
121 }
122
CloseConsumer()123 MojoResult CloseConsumer() {
124 MojoResult rv = MojoClose(consumer_);
125 consumer_ = MOJO_HANDLE_INVALID;
126 return rv;
127 }
128
129 MojoHandle producer_, consumer_;
130
131 private:
132 DISALLOW_COPY_AND_ASSIGN(DataPipeTest);
133 };
134
// Smoke test: writes two elements to the producer, waits for the consumer to
// become readable, and reads the same two elements back.
TEST_F(DataPipeTest, Basic) {
  const MojoCreateDataPipeOptions options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
      1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
  };

  ASSERT_EQ(MOJO_RESULT_OK, Create(&options));

  // We can write to a data pipe handle immediately.
  int32_t elements[10] = {};
  elements[0] = 123;
  elements[1] = 456;
  uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
  ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));

  // Now wait for the other side to become readable.
  MojoHandleSignalsState state;
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, &state));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, state.satisfied_signals);

  // Overwrite the local copies so the checks below can only pass if the read
  // actually filled the buffer.
  elements[0] = -1;
  elements[1] = -1;
  ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
  ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
  ASSERT_EQ(123, elements[0]);
  ASSERT_EQ(456, elements[1]);
}
171
172 // Tests creation of data pipes with various (valid) options.
TEST_F(DataPipeTest,CreateAndMaybeTransfer)173 TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
174 MojoCreateDataPipeOptions test_options[] = {
175 // Default options.
176 {},
177 // Trivial element size, non-default capacity.
178 {kSizeOfOptions, // |struct_size|.
179 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
180 1, // |element_num_bytes|.
181 1000}, // |capacity_num_bytes|.
182 // Nontrivial element size, non-default capacity.
183 {kSizeOfOptions, // |struct_size|.
184 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
185 4, // |element_num_bytes|.
186 4000}, // |capacity_num_bytes|.
187 // Nontrivial element size, default capacity.
188 {kSizeOfOptions, // |struct_size|.
189 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
190 100, // |element_num_bytes|.
191 0} // |capacity_num_bytes|.
192 };
193 for (size_t i = 0; i < arraysize(test_options); i++) {
194 MojoHandle producer_handle, consumer_handle;
195 MojoCreateDataPipeOptions* options =
196 i ? &test_options[i] : nullptr;
197 ASSERT_EQ(MOJO_RESULT_OK,
198 MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
199 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
200 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
201 }
202 }
203
// Walks through the one-shot consumer operations (read, query, discard, peek,
// all-or-none read) against an empty pipe and then against a pipe holding two
// elements.
TEST_F(DataPipeTest, SimpleReadWrite) {
  const MojoCreateDataPipeOptions options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
      1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
  };
  ASSERT_EQ(MOJO_RESULT_OK, Create(&options));

  int32_t elements[10] = {};
  // Size of one element in bytes, as the type the data pipe calls use.
  const uint32_t kElementSize = static_cast<uint32_t>(sizeof(elements[0]));
  MojoHandleSignalsState hss;

  // Try reading; nothing there yet.
  uint32_t num_bytes =
      static_cast<uint32_t>(arraysize(elements)) * kElementSize;
  ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));

  // Query; nothing there yet.
  num_bytes = 0;
  ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
  ASSERT_EQ(0u, num_bytes);

  // Discard; nothing there yet.
  num_bytes = 5u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));

  // Read with invalid |num_bytes| (not a multiple of the element size).
  num_bytes = kElementSize + 1u;
  ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));

  // Write two elements.
  elements[0] = 123;
  elements[1] = 456;
  num_bytes = 2u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
  // It should have written everything (even without "all or none").
  ASSERT_EQ(2u * kElementSize, num_bytes);

  // Wait.
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);

  // Query.
  // TODO(vtl): It's theoretically possible (though not with the current
  // implementation/configured limits) that not all the data has arrived yet.
  // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
  // or |2 * ...|.)
  num_bytes = 0;
  ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
  ASSERT_EQ(2u * kElementSize, num_bytes);

  // Read one element.
  elements[0] = -1;
  elements[1] = -1;
  num_bytes = kElementSize;
  ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
  ASSERT_EQ(kElementSize, num_bytes);
  ASSERT_EQ(123, elements[0]);
  ASSERT_EQ(-1, elements[1]);

  // Query.
  // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
  // should get 1 here.)
  num_bytes = 0;
  ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
  ASSERT_EQ(kElementSize, num_bytes);

  // Peek one element.
  elements[0] = -1;
  elements[1] = -1;
  num_bytes = kElementSize;
  ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
  ASSERT_EQ(kElementSize, num_bytes);
  ASSERT_EQ(456, elements[0]);
  ASSERT_EQ(-1, elements[1]);

  // Query. Still has 1 element remaining, since the peek did not consume it.
  num_bytes = 0;
  ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
  ASSERT_EQ(kElementSize, num_bytes);

  // Try to read two elements, with "all or none".
  elements[0] = -1;
  elements[1] = -1;
  num_bytes = 2u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
            ReadData(elements, &num_bytes, true, false));
  ASSERT_EQ(-1, elements[0]);
  ASSERT_EQ(-1, elements[1]);

  // Try to read two elements, without "all or none".
  elements[0] = -1;
  elements[1] = -1;
  num_bytes = 2u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
  ASSERT_EQ(kElementSize, num_bytes);
  ASSERT_EQ(456, elements[0]);
  ASSERT_EQ(-1, elements[1]);

  // Query; the pipe should be empty again.
  num_bytes = 0;
  ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
  ASSERT_EQ(0u, num_bytes);
}
314
315 // Note: The "basic" waiting tests test that the "wait states" are correct in
316 // various situations; they don't test that waiters are properly awoken on state
317 // changes. (For that, we need to use multiple threads.)
TEST_F(DataPipeTest,BasicProducerWaiting)318 TEST_F(DataPipeTest, BasicProducerWaiting) {
319 // Note: We take advantage of the fact that current for current
320 // implementations capacities are strict maximums. This is not guaranteed by
321 // the API.
322
323 const MojoCreateDataPipeOptions options = {
324 kSizeOfOptions, // |struct_size|.
325 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
326 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
327 2 * sizeof(int32_t) // |capacity_num_bytes|.
328 };
329 Create(&options);
330 MojoHandleSignalsState hss;
331
332 // Never readable.
333 hss = MojoHandleSignalsState();
334 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
335 MojoWait(producer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
336 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
337 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
338 hss.satisfiable_signals);
339
340 // Already writable.
341 hss = MojoHandleSignalsState();
342 ASSERT_EQ(MOJO_RESULT_OK,
343 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
344
345 // Write two elements.
346 int32_t elements[2] = {123, 456};
347 uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
348 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
349 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
350
351 // Wait for data to become available to the consumer.
352 ASSERT_EQ(MOJO_RESULT_OK,
353 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
354 MOJO_DEADLINE_INDEFINITE, &hss));
355 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
356 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
357 hss.satisfiable_signals);
358
359 // Peek one element.
360 elements[0] = -1;
361 elements[1] = -1;
362 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
363 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
364 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
365 ASSERT_EQ(123, elements[0]);
366 ASSERT_EQ(-1, elements[1]);
367
368 // Read one element.
369 elements[0] = -1;
370 elements[1] = -1;
371 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
372 ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
373 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
374 ASSERT_EQ(123, elements[0]);
375 ASSERT_EQ(-1, elements[1]);
376
377 // Try writing, using a two-phase write.
378 void* buffer = nullptr;
379 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
380 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
381 EXPECT_TRUE(buffer);
382 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
383
384 static_cast<int32_t*>(buffer)[0] = 789;
385 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(static_cast<uint32_t>(
386 1u * sizeof(elements[0]))));
387
388 // Read one element, using a two-phase read.
389 const void* read_buffer = nullptr;
390 num_bytes = 0u;
391 ASSERT_EQ(MOJO_RESULT_OK,
392 BeginReadData(&read_buffer, &num_bytes, false));
393 EXPECT_TRUE(read_buffer);
394 // The two-phase read should be able to read at least one element.
395 ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
396 ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
397 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(static_cast<uint32_t>(
398 1u * sizeof(elements[0]))));
399
400 // Write one element.
401 elements[0] = 123;
402 num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
403 ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
404 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
405
406 // Close the consumer.
407 CloseConsumer();
408
409 // It should now be never-writable.
410 hss = MojoHandleSignalsState();
411 ASSERT_EQ(MOJO_RESULT_OK,
412 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
413 MOJO_DEADLINE_INDEFINITE, &hss));
414 hss = MojoHandleSignalsState();
415 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
416 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
417 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
418 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
419 }
420
// Checks that closing the consumer signals PEER_CLOSED on the producer, and
// that PEER_CLOSED is then the only satisfied and satisfiable signal.
TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
  const MojoCreateDataPipeOptions options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
      2 * sizeof(int32_t)                       // |capacity_num_bytes|.
  };
  ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
  MojoHandleSignalsState hss;

  // Close the consumer. The close itself must succeed for the wait below to
  // be meaningful.
  ASSERT_EQ(MOJO_RESULT_OK, CloseConsumer());

  // It should be signaled.
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
}
442
// Checks that closing the producer signals PEER_CLOSED on the consumer, and
// that PEER_CLOSED is then the only satisfied and satisfiable signal.
TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
  const MojoCreateDataPipeOptions options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
      2 * sizeof(int32_t)                       // |capacity_num_bytes|.
  };
  ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
  MojoHandleSignalsState hss;

  // Close the producer. The close itself must succeed for the wait below to
  // be meaningful.
  ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());

  // It should be signaled.
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
}
464
// Checks the consumer's signal state while data is written, discarded, peeked
// and read, and after the producer has been closed (both while data remains
// and once the pipe has been drained).
TEST_F(DataPipeTest, BasicConsumerWaiting) {
  const MojoCreateDataPipeOptions options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
      1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
  };
  ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
  MojoHandleSignalsState hss;

  // Never writable.
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(0u, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);

  // Write two elements.
  int32_t elements[2] = {123, 456};
  uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
  ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));

  // Wait for readability.
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);

  // Discard one element.
  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
  ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
  ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);

  // Should still be readable.
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);

  // Peek one element.
  elements[0] = -1;
  elements[1] = -1;
  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
  ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
  // An all-or-none peek that succeeded must report exactly the requested size.
  ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
  ASSERT_EQ(456, elements[0]);
  ASSERT_EQ(-1, elements[1]);

  // Should still be readable.
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);

  // Read one element.
  elements[0] = -1;
  elements[1] = -1;
  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
  ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
  ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
  ASSERT_EQ(456, elements[0]);
  ASSERT_EQ(-1, elements[1]);

  // Write one element.
  elements[0] = 789;
  elements[1] = -1;
  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
  ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));

  // Waiting should now succeed.
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);

  // Close the producer.
  ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());

  // Should still be readable. Only the READABLE bit is checked here because
  // PEER_CLOSED may or may not have been observed yet.
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE) != 0);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);

  // Wait for the peer closed signal.
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) != 0);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            hss.satisfiable_signals);

  // Read one element.
  elements[0] = -1;
  elements[1] = -1;
  num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
  ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
  ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
  ASSERT_EQ(789, elements[0]);
  ASSERT_EQ(-1, elements[1]);

  // Should be never-readable.
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
}
591
592 // Test with two-phase APIs and also closing the producer with an active
593 // consumer waiter.
TEST_F(DataPipeTest,ConsumerWaitingTwoPhase)594 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
595 const MojoCreateDataPipeOptions options = {
596 kSizeOfOptions, // |struct_size|.
597 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
598 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
599 1000 * sizeof(int32_t) // |capacity_num_bytes|.
600 };
601 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
602 MojoHandleSignalsState hss;
603
604 // Write two elements.
605 int32_t* elements = nullptr;
606 void* buffer = nullptr;
607 // Request room for three (but we'll only write two).
608 uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
609 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes, true));
610 EXPECT_TRUE(buffer);
611 EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
612 elements = static_cast<int32_t*>(buffer);
613 elements[0] = 123;
614 elements[1] = 456;
615 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
616
617 // Wait for readability.
618 hss = MojoHandleSignalsState();
619 ASSERT_EQ(MOJO_RESULT_OK,
620 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
621 MOJO_DEADLINE_INDEFINITE, &hss));
622 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
623 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
624 hss.satisfiable_signals);
625
626 // Read one element.
627 // Request two in all-or-none mode, but only read one.
628 const void* read_buffer = nullptr;
629 num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
630 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes, true));
631 EXPECT_TRUE(read_buffer);
632 ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
633 const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
634 ASSERT_EQ(123, read_elements[0]);
635 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
636
637 // Should still be readable.
638 hss = MojoHandleSignalsState();
639 ASSERT_EQ(MOJO_RESULT_OK,
640 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
641 MOJO_DEADLINE_INDEFINITE, &hss));
642 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
643 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
644 hss.satisfiable_signals);
645
646 // Read one element.
647 // Request three, but not in all-or-none mode.
648 read_buffer = nullptr;
649 num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
650 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
651 EXPECT_TRUE(read_buffer);
652 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
653 read_elements = static_cast<const int32_t*>(read_buffer);
654 ASSERT_EQ(456, read_elements[0]);
655 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
656
657 // Close the producer.
658 CloseProducer();
659
660 // Should be never-readable.
661 hss = MojoHandleSignalsState();
662 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
663 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
664 MOJO_DEADLINE_INDEFINITE, &hss));
665 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
666 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
667 }
668
669 // Tests that data pipes aren't writable/readable during two-phase writes/reads.
TEST_F(DataPipeTest,BasicTwoPhaseWaiting)670 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
671 const MojoCreateDataPipeOptions options = {
672 kSizeOfOptions, // |struct_size|.
673 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
674 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
675 1000 * sizeof(int32_t) // |capacity_num_bytes|.
676 };
677 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
678 MojoHandleSignalsState hss;
679
680 // It should be writable.
681 hss = MojoHandleSignalsState();
682 ASSERT_EQ(MOJO_RESULT_OK,
683 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
684 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
685 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
686 hss.satisfiable_signals);
687
688 uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
689 void* write_ptr = nullptr;
690 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
691 EXPECT_TRUE(write_ptr);
692 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
693
694 // At this point, it shouldn't be writable.
695 hss = MojoHandleSignalsState();
696 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
697 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
698 ASSERT_EQ(0u, hss.satisfied_signals);
699 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
700 hss.satisfiable_signals);
701
702 // It shouldn't be readable yet either (we'll wait later).
703 hss = MojoHandleSignalsState();
704 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
705 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
706 ASSERT_EQ(0u, hss.satisfied_signals);
707 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
708 hss.satisfiable_signals);
709
710 static_cast<int32_t*>(write_ptr)[0] = 123;
711 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
712
713 // It should immediately be writable again.
714 hss = MojoHandleSignalsState();
715 ASSERT_EQ(MOJO_RESULT_OK,
716 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
717 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
718 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
719 hss.satisfiable_signals);
720
721 // It should become readable.
722 hss = MojoHandleSignalsState();
723 ASSERT_EQ(MOJO_RESULT_OK,
724 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
725 MOJO_DEADLINE_INDEFINITE, &hss));
726 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
727 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
728 hss.satisfiable_signals);
729
730 // Start another two-phase write and check that it's readable even in the
731 // middle of it.
732 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
733 write_ptr = nullptr;
734 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
735 EXPECT_TRUE(write_ptr);
736 EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
737
738 // It should be readable.
739 hss = MojoHandleSignalsState();
740 ASSERT_EQ(MOJO_RESULT_OK,
741 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
742 MOJO_DEADLINE_INDEFINITE, &hss));
743 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
744 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
745 hss.satisfiable_signals);
746
747 // End the two-phase write without writing anything.
748 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
749
750 // Start a two-phase read.
751 num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
752 const void* read_ptr = nullptr;
753 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
754 EXPECT_TRUE(read_ptr);
755 ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
756
757 // At this point, it should still be writable.
758 hss = MojoHandleSignalsState();
759 ASSERT_EQ(MOJO_RESULT_OK,
760 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
761 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
762 ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
763 hss.satisfiable_signals);
764
765 // But not readable.
766 hss = MojoHandleSignalsState();
767 ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
768 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
769 ASSERT_EQ(0u, hss.satisfied_signals);
770 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
771 hss.satisfiable_signals);
772
773 // End the two-phase read without reading anything.
774 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
775
776 // It should be readable again.
777 hss = MojoHandleSignalsState();
778 ASSERT_EQ(MOJO_RESULT_OK,
779 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
780 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
781 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
782 hss.satisfiable_signals);
783 }
784
// Fills the first |count| entries of |out| with consecutive integers beginning
// at |start| (i.e. out[i] == start + i). |out| must have room for at least
// |count| elements; nothing is written when |count| is zero.
void Seq(int32_t start, size_t count, int32_t* out) {
  size_t index = 0;
  while (index < count) {
    out[index] = start + static_cast<int32_t>(index);
    ++index;
  }
}
789
// Exercises all-or-none writes, reads and discards on a pipe sized for ten
// int32_t elements. Requests that cannot be satisfied in full are expected to
// fail without transferring anything, and the error code expected for
// oversized reads/discards changes once the producer has been closed.
TEST_F(DataPipeTest, AllOrNone) {
  const size_t kElementSize = sizeof(int32_t);
  const MojoCreateDataPipeOptions options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
      10 * sizeof(int32_t)                      // |capacity_num_bytes|.
  };
  ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
  MojoHandleSignalsState signals;

  // A 20-element write is twice the requested capacity and must be rejected.
  int32_t values[100];
  Seq(0, arraysize(values), values);
  uint32_t byte_count = 20u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(values, &byte_count, true));

  // The rejected write must not have queued anything.
  byte_count = ~0u;
  ASSERT_EQ(MOJO_RESULT_OK, QueryData(&byte_count));
  ASSERT_EQ(0u, byte_count);

  // Queue five elements: 100..104.
  Seq(100, arraysize(values), values);
  byte_count = 5u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OK, WriteData(values, &byte_count, true));
  ASSERT_EQ(5u * kElementSize, byte_count);

  // Block until the consumer reports readable.
  // TODO(vtl): There's no real guarantee that all the data will become
  // available at once (except that in current implementations, with reasonable
  // limits, it will). Eventually, we'll be able to wait for a specified amount
  // of data to become available.
  signals = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, &signals));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, signals.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            signals.satisfiable_signals);

  // Five of the ten elements are now queued.
  byte_count = 0u;
  ASSERT_EQ(MOJO_RESULT_OK, QueryData(&byte_count));
  ASSERT_EQ(5u * kElementSize, byte_count);

  // TODO(jam): enable if we end up observing max capacity
  //   // Too much.
  //   byte_count = 6u * kElementSize;
  //   Seq(200, arraysize(values), values);
  //   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
  //             WriteData(values, &byte_count, true));

  // An all-or-none read of eleven elements must fail and leave the destination
  // buffer exactly as it was (filled with 0xab).
  int32_t expected[100];
  byte_count = 11u * kElementSize;
  memset(values, 0xab, sizeof(values));
  ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(values, &byte_count, true));
  memset(expected, 0xab, sizeof(expected));
  ASSERT_EQ(0, memcmp(values, expected, sizeof(values)));

  // The same goes for an all-or-none discard of eleven elements.
  byte_count = 11u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&byte_count, true));

  // Add two more elements (300, 301)...
  Seq(300, arraysize(values), values);
  byte_count = 2u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OK, WriteData(values, &byte_count, true));
  ASSERT_EQ(2u * kElementSize, byte_count);

  // ...and three more (400..402), for a total of ten written.
  Seq(400, arraysize(values), values);
  byte_count = 3u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OK, WriteData(values, &byte_count, true));
  ASSERT_EQ(3u * kElementSize, byte_count);

  // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
  // specified amount of data to be available, so poll.
  for (size_t attempt = 0; attempt != kMaxPoll; ++attempt) {
    byte_count = 0u;
    ASSERT_EQ(MOJO_RESULT_OK, QueryData(&byte_count));
    if (byte_count < 10u * kElementSize) {
      test::Sleep(test::EpsilonDeadline());
      continue;
    }
    break;
  }
  ASSERT_EQ(10u * kElementSize, byte_count);

  // Consume the first five elements; only those five slots of the destination
  // may change, and they must hold 100..104.
  byte_count = 5u * kElementSize;
  memset(values, 0xab, sizeof(values));
  ASSERT_EQ(MOJO_RESULT_OK, ReadData(values, &byte_count, true));
  ASSERT_EQ(5u * kElementSize, byte_count);
  memset(expected, 0xab, sizeof(expected));
  Seq(100, 5, expected);
  ASSERT_EQ(0, memcmp(values, expected, sizeof(values)));

  // Five elements remain, so asking for six must fail and touch nothing.
  byte_count = 6u * kElementSize;
  memset(values, 0xab, sizeof(values));
  ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(values, &byte_count, true));
  memset(expected, 0xab, sizeof(expected));
  ASSERT_EQ(0, memcmp(values, expected, sizeof(values)));

  // Discarding six must fail as well.
  byte_count = 6u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&byte_count, true));

  // Discarding two is fine.
  byte_count = 2u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&byte_count, true));
  ASSERT_EQ(2u * kElementSize, byte_count);

  // That leaves three elements.
  byte_count = 0u;
  ASSERT_EQ(MOJO_RESULT_OK, QueryData(&byte_count));
  ASSERT_EQ(3u * kElementSize, byte_count);

  // From here on, repeat the checks with the producer closed.
  CloseProducer();

  // Wait for the consumer to observe the closure; the remaining data keeps it
  // readable.
  signals = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
                     MOJO_DEADLINE_INDEFINITE, &signals));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            signals.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
            signals.satisfiable_signals);

  // An oversized read now reports "failed precondition" (the producer is
  // closed), again without touching the destination.
  byte_count = 4u * kElementSize;
  memset(values, 0xab, sizeof(values));
  ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
            ReadData(values, &byte_count, true));
  memset(expected, 0xab, sizeof(expected));
  ASSERT_EQ(0, memcmp(values, expected, sizeof(values)));

  // An oversized discard reports "failed precondition" too.
  byte_count = 4u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&byte_count, true));

  // Two of the three remaining elements (400, 401) can still be read.
  byte_count = 2u * kElementSize;
  memset(values, 0xab, sizeof(values));
  ASSERT_EQ(MOJO_RESULT_OK, ReadData(values, &byte_count, true));
  ASSERT_EQ(2u * kElementSize, byte_count);
  memset(expected, 0xab, sizeof(expected));
  Seq(400, 2, expected);
  ASSERT_EQ(0, memcmp(values, expected, sizeof(values)));

  // Drop the last element.
  byte_count = 1u * kElementSize;
  ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&byte_count, true));
  ASSERT_EQ(1u * kElementSize, byte_count);

  // Nothing is left.
  byte_count = ~0u;
  ASSERT_EQ(MOJO_RESULT_OK, QueryData(&byte_count));
  ASSERT_EQ(0u, byte_count);
}
952
953 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
954 // respectively, as much as possible, even if it may have to "wrap around" the
955 // internal circular buffer. (Note that the two-phase write and read need not do
956 // this.)
TEST_F(DataPipeTest,WrapAround)957 TEST_F(DataPipeTest, WrapAround) {
958 unsigned char test_data[1000];
959 for (size_t i = 0; i < arraysize(test_data); i++)
960 test_data[i] = static_cast<unsigned char>(i);
961
962 const MojoCreateDataPipeOptions options = {
963 kSizeOfOptions, // |struct_size|.
964 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
965 1u, // |element_num_bytes|.
966 100u // |capacity_num_bytes|.
967 };
968
969 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
970 MojoHandleSignalsState hss;
971
972 // Write 20 bytes.
973 uint32_t num_bytes = 20u;
974 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&test_data[0], &num_bytes, true));
975 ASSERT_EQ(20u, num_bytes);
976
977 // Wait for data.
978 ASSERT_EQ(MOJO_RESULT_OK,
979 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
980 MOJO_DEADLINE_INDEFINITE, &hss));
981 ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE) != 0);
982 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
983 hss.satisfiable_signals);
984
985 // Read 10 bytes.
986 unsigned char read_buffer[1000] = {0};
987 num_bytes = 10u;
988 ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes, true));
989 ASSERT_EQ(10u, num_bytes);
990 ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
991
992 // Check that a two-phase write can now only write (at most) 80 bytes. (This
993 // checks an implementation detail; this behavior is not guaranteed.)
994 void* write_buffer_ptr = nullptr;
995 num_bytes = 0u;
996 ASSERT_EQ(MOJO_RESULT_OK,
997 BeginWriteData(&write_buffer_ptr, &num_bytes, false));
998 EXPECT_TRUE(write_buffer_ptr);
999 ASSERT_EQ(80u, num_bytes);
1000 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0));
1001
1002 size_t total_num_bytes = 0;
1003 while (total_num_bytes < 90) {
1004 // Wait to write.
1005 ASSERT_EQ(MOJO_RESULT_OK,
1006 MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE,
1007 MOJO_DEADLINE_INDEFINITE, &hss));
1008 ASSERT_EQ(hss.satisfied_signals, MOJO_HANDLE_SIGNAL_WRITABLE);
1009 ASSERT_EQ(hss.satisfiable_signals,
1010 MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED);
1011
1012 // Write as much as we can.
1013 num_bytes = 100;
1014 ASSERT_EQ(MOJO_RESULT_OK,
1015 WriteData(&test_data[20 + total_num_bytes], &num_bytes, false));
1016 total_num_bytes += num_bytes;
1017 }
1018
1019 ASSERT_EQ(90u, total_num_bytes);
1020
1021 num_bytes = 0;
1022 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1023 ASSERT_EQ(100u, num_bytes);
1024
1025 // Check that a two-phase read can now only read (at most) 90 bytes. (This
1026 // checks an implementation detail; this behavior is not guaranteed.)
1027 const void* read_buffer_ptr = nullptr;
1028 num_bytes = 0;
1029 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes, false));
1030 EXPECT_TRUE(read_buffer_ptr);
1031 ASSERT_EQ(90u, num_bytes);
1032 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0));
1033
1034 // Read as much as possible. We should read 100 bytes.
1035 num_bytes = static_cast<uint32_t>(arraysize(read_buffer) *
1036 sizeof(read_buffer[0]));
1037 memset(read_buffer, 0, num_bytes);
1038 ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes));
1039 ASSERT_EQ(100u, num_bytes);
1040 ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
1041 }
1042
1043 // Tests the behavior of writing (simple and two-phase), closing the producer,
1044 // then reading (simple and two-phase).
TEST_F(DataPipeTest,WriteCloseProducerRead)1045 TEST_F(DataPipeTest, WriteCloseProducerRead) {
1046 const char kTestData[] = "hello world";
1047 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1048
1049 const MojoCreateDataPipeOptions options = {
1050 kSizeOfOptions, // |struct_size|.
1051 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1052 1u, // |element_num_bytes|.
1053 1000u // |capacity_num_bytes|.
1054 };
1055 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1056
1057 // Write some data, so we'll have something to read.
1058 uint32_t num_bytes = kTestDataSize;
1059 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1060 ASSERT_EQ(kTestDataSize, num_bytes);
1061
1062 // Write it again, so we'll have something left over.
1063 num_bytes = kTestDataSize;
1064 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1065 ASSERT_EQ(kTestDataSize, num_bytes);
1066
1067 // Start two-phase write.
1068 void* write_buffer_ptr = nullptr;
1069 num_bytes = 0u;
1070 ASSERT_EQ(MOJO_RESULT_OK,
1071 BeginWriteData(&write_buffer_ptr, &num_bytes, false));
1072 EXPECT_TRUE(write_buffer_ptr);
1073 EXPECT_GT(num_bytes, 0u);
1074
1075 // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1076 for (size_t i = 0; i < kMaxPoll; i++) {
1077 num_bytes = 0u;
1078 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1079 if (num_bytes >= 2u * kTestDataSize)
1080 break;
1081
1082 test::Sleep(test::EpsilonDeadline());
1083 }
1084 ASSERT_EQ(2u * kTestDataSize, num_bytes);
1085
1086 // Start two-phase read.
1087 const void* read_buffer_ptr = nullptr;
1088 num_bytes = 0u;
1089 ASSERT_EQ(MOJO_RESULT_OK,
1090 BeginReadData(&read_buffer_ptr, &num_bytes));
1091 EXPECT_TRUE(read_buffer_ptr);
1092 ASSERT_EQ(2u * kTestDataSize, num_bytes);
1093
1094 // Close the producer.
1095 CloseProducer();
1096
1097 // The consumer can finish its two-phase read.
1098 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1099 ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
1100
1101 // And start another.
1102 read_buffer_ptr = nullptr;
1103 num_bytes = 0u;
1104 ASSERT_EQ(MOJO_RESULT_OK,
1105 BeginReadData(&read_buffer_ptr, &num_bytes));
1106 EXPECT_TRUE(read_buffer_ptr);
1107 ASSERT_EQ(kTestDataSize, num_bytes);
1108 }
1109
1110
1111 // Tests the behavior of interrupting a two-phase read and write by closing the
1112 // consumer.
TEST_F(DataPipeTest,TwoPhaseWriteReadCloseConsumer)1113 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
1114 const char kTestData[] = "hello world";
1115 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1116
1117 const MojoCreateDataPipeOptions options = {
1118 kSizeOfOptions, // |struct_size|.
1119 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1120 1u, // |element_num_bytes|.
1121 1000u // |capacity_num_bytes|.
1122 };
1123 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1124 MojoHandleSignalsState hss;
1125
1126 // Write some data, so we'll have something to read.
1127 uint32_t num_bytes = kTestDataSize;
1128 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1129 ASSERT_EQ(kTestDataSize, num_bytes);
1130
1131 // Start two-phase write.
1132 void* write_buffer_ptr = nullptr;
1133 num_bytes = 0u;
1134 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1135 EXPECT_TRUE(write_buffer_ptr);
1136 ASSERT_GT(num_bytes, kTestDataSize);
1137
1138 // Wait for data.
1139 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1140 hss = MojoHandleSignalsState();
1141 ASSERT_EQ(MOJO_RESULT_OK,
1142 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1143 MOJO_DEADLINE_INDEFINITE, &hss));
1144 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1145 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1146 hss.satisfiable_signals);
1147
1148 // Start two-phase read.
1149 const void* read_buffer_ptr = nullptr;
1150 num_bytes = 0u;
1151 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1152 EXPECT_TRUE(read_buffer_ptr);
1153 ASSERT_EQ(kTestDataSize, num_bytes);
1154
1155 // Close the consumer.
1156 CloseConsumer();
1157
1158 // Wait for producer to know that the consumer is closed.
1159 hss = MojoHandleSignalsState();
1160 ASSERT_EQ(MOJO_RESULT_OK,
1161 MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1162 MOJO_DEADLINE_INDEFINITE, &hss));
1163 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1164 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1165
1166 // Actually write some data. (Note: Premature freeing of the buffer would
1167 // probably only be detected under ASAN or similar.)
1168 memcpy(write_buffer_ptr, kTestData, kTestDataSize);
1169 // Note: Even though the consumer has been closed, ending the two-phase
1170 // write will report success.
1171 ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
1172
1173 // But trying to write should result in failure.
1174 num_bytes = kTestDataSize;
1175 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
1176
1177 // As will trying to start another two-phase write.
1178 write_buffer_ptr = nullptr;
1179 num_bytes = 0u;
1180 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1181 BeginWriteData(&write_buffer_ptr, &num_bytes));
1182 }
1183
1184 // Tests the behavior of "interrupting" a two-phase write by closing both the
1185 // producer and the consumer.
TEST_F(DataPipeTest,TwoPhaseWriteCloseBoth)1186 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
1187 const uint32_t kTestDataSize = 15u;
1188
1189 const MojoCreateDataPipeOptions options = {
1190 kSizeOfOptions, // |struct_size|.
1191 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1192 1u, // |element_num_bytes|.
1193 1000u // |capacity_num_bytes|.
1194 };
1195 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1196
1197 // Start two-phase write.
1198 void* write_buffer_ptr = nullptr;
1199 uint32_t num_bytes = 0u;
1200 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1201 EXPECT_TRUE(write_buffer_ptr);
1202 ASSERT_GT(num_bytes, kTestDataSize);
1203 }
1204
1205 // Tests the behavior of writing, closing the producer, and then reading (with
1206 // and without data remaining).
TEST_F(DataPipeTest,WriteCloseProducerReadNoData)1207 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
1208 const char kTestData[] = "hello world";
1209 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1210
1211 const MojoCreateDataPipeOptions options = {
1212 kSizeOfOptions, // |struct_size|.
1213 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1214 1u, // |element_num_bytes|.
1215 1000u // |capacity_num_bytes|.
1216 };
1217 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1218 MojoHandleSignalsState hss;
1219
1220 // Write some data, so we'll have something to read.
1221 uint32_t num_bytes = kTestDataSize;
1222 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1223 ASSERT_EQ(kTestDataSize, num_bytes);
1224
1225 // Close the producer.
1226 CloseProducer();
1227
1228 // Wait. (Note that once the consumer knows that the producer is closed, it
1229 // must also know about all the data that was sent.)
1230 hss = MojoHandleSignalsState();
1231 ASSERT_EQ(MOJO_RESULT_OK,
1232 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1233 MOJO_DEADLINE_INDEFINITE, &hss));
1234 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1235 hss.satisfied_signals);
1236 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1237 hss.satisfiable_signals);
1238
1239 // Peek that data.
1240 char buffer[1000];
1241 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1242 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
1243 ASSERT_EQ(kTestDataSize, num_bytes);
1244 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1245
1246 // Read that data.
1247 memset(buffer, 0, 1000);
1248 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1249 ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
1250 ASSERT_EQ(kTestDataSize, num_bytes);
1251 ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1252
1253 // A second read should fail.
1254 num_bytes = static_cast<uint32_t>(sizeof(buffer));
1255 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
1256
1257 // A two-phase read should also fail.
1258 const void* read_buffer_ptr = nullptr;
1259 num_bytes = 0u;
1260 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1261 BeginReadData(&read_buffer_ptr, &num_bytes));
1262
1263 // Ditto for discard.
1264 num_bytes = 10u;
1265 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
1266 }
1267
1268 // Test that during a two phase read the memory stays valid even if more data
1269 // comes in.
TEST_F(DataPipeTest,TwoPhaseReadMemoryStable)1270 TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) {
1271 const char kTestData[] = "hello world";
1272 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1273
1274 const MojoCreateDataPipeOptions options = {
1275 kSizeOfOptions, // |struct_size|.
1276 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1277 1u, // |element_num_bytes|.
1278 1000u // |capacity_num_bytes|.
1279 };
1280 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1281 MojoHandleSignalsState hss;
1282
1283 // Write some data.
1284 uint32_t num_bytes = kTestDataSize;
1285 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1286 ASSERT_EQ(kTestDataSize, num_bytes);
1287
1288 // Wait for the data.
1289 hss = MojoHandleSignalsState();
1290 ASSERT_EQ(MOJO_RESULT_OK,
1291 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1292 MOJO_DEADLINE_INDEFINITE, &hss));
1293 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1294 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1295 hss.satisfiable_signals);
1296
1297 // Begin a two-phase read.
1298 const void* read_buffer_ptr = nullptr;
1299 uint32_t read_buffer_size = 0u;
1300 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size));
1301
1302 // Write more data.
1303 const char kExtraData[] = "bye world";
1304 const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
1305 num_bytes = kExtraDataSize;
1306 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
1307 ASSERT_EQ(kExtraDataSize, num_bytes);
1308
1309 // Close the producer.
1310 CloseProducer();
1311
1312 // Wait. (Note that once the consumer knows that the producer is closed, it
1313 // must also have received the extra data).
1314 hss = MojoHandleSignalsState();
1315 ASSERT_EQ(MOJO_RESULT_OK,
1316 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1317 MOJO_DEADLINE_INDEFINITE, &hss));
1318 ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1319 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1320 hss.satisfiable_signals);
1321
1322 // Read the two phase memory to check it's still valid.
1323 ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1324 EndReadData(read_buffer_size);
1325 }
1326
1327 // Test that two-phase reads/writes behave correctly when given invalid
1328 // arguments.
TEST_F(DataPipeTest,TwoPhaseMoreInvalidArguments)1329 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
1330 const MojoCreateDataPipeOptions options = {
1331 kSizeOfOptions, // |struct_size|.
1332 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1333 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1334 10 * sizeof(int32_t) // |capacity_num_bytes|.
1335 };
1336 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1337 MojoHandleSignalsState hss;
1338
1339 // No data.
1340 uint32_t num_bytes = 1000u;
1341 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1342 ASSERT_EQ(0u, num_bytes);
1343
1344 // Try "ending" a two-phase write when one isn't active.
1345 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1346 EndWriteData(1u * sizeof(int32_t)));
1347
1348 // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
1349 // have time to propagate.
1350 test::Sleep(test::EpsilonDeadline());
1351
1352 // Still no data.
1353 num_bytes = 1000u;
1354 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1355 ASSERT_EQ(0u, num_bytes);
1356
1357 // Try ending a two-phase write with an invalid amount (too much).
1358 num_bytes = 0u;
1359 void* write_ptr = nullptr;
1360 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1361 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1362 EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1363
1364 // But the two-phase write still ended.
1365 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1366
1367 // Wait a bit (as above).
1368 test::Sleep(test::EpsilonDeadline());
1369
1370 // Still no data.
1371 num_bytes = 1000u;
1372 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1373 ASSERT_EQ(0u, num_bytes);
1374
1375 // Try ending a two-phase write with an invalid amount (not a multiple of the
1376 // element size).
1377 num_bytes = 0u;
1378 write_ptr = nullptr;
1379 ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1380 EXPECT_GE(num_bytes, 1u);
1381 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
1382
1383 // But the two-phase write still ended.
1384 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1385
1386 // Wait a bit (as above).
1387 test::Sleep(test::EpsilonDeadline());
1388
1389 // Still no data.
1390 num_bytes = 1000u;
1391 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1392 ASSERT_EQ(0u, num_bytes);
1393
1394 // Now write some data, so we'll be able to try reading.
1395 int32_t element = 123;
1396 num_bytes = 1u * sizeof(int32_t);
1397 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
1398
1399 // Wait for data.
1400 // TODO(vtl): (See corresponding TODO in AllOrNone.)
1401 hss = MojoHandleSignalsState();
1402 ASSERT_EQ(MOJO_RESULT_OK,
1403 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1404 MOJO_DEADLINE_INDEFINITE, &hss));
1405 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1406 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1407 hss.satisfiable_signals);
1408
1409 // One element available.
1410 num_bytes = 0u;
1411 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1412 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1413
1414 // Try "ending" a two-phase read when one isn't active.
1415 ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
1416
1417 // Still one element available.
1418 num_bytes = 0u;
1419 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1420 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1421
1422 // Try ending a two-phase read with an invalid amount (too much).
1423 num_bytes = 0u;
1424 const void* read_ptr = nullptr;
1425 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1426 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1427 EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1428
1429 // Still one element available.
1430 num_bytes = 0u;
1431 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1432 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1433
1434 // Try ending a two-phase read with an invalid amount (not a multiple of the
1435 // element size).
1436 num_bytes = 0u;
1437 read_ptr = nullptr;
1438 ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1439 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1440 ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
1441 ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
1442
1443 // Still one element available.
1444 num_bytes = 0u;
1445 ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1446 ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1447 }
1448
1449 // Test that a producer can be sent over a MP.
TEST_F(DataPipeTest,SendProducer)1450 TEST_F(DataPipeTest, SendProducer) {
1451 const char kTestData[] = "hello world";
1452 const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1453
1454 const MojoCreateDataPipeOptions options = {
1455 kSizeOfOptions, // |struct_size|.
1456 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1457 1u, // |element_num_bytes|.
1458 1000u // |capacity_num_bytes|.
1459 };
1460 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1461 MojoHandleSignalsState hss;
1462
1463 // Write some data.
1464 uint32_t num_bytes = kTestDataSize;
1465 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1466 ASSERT_EQ(kTestDataSize, num_bytes);
1467
1468 // Wait for the data.
1469 hss = MojoHandleSignalsState();
1470 ASSERT_EQ(MOJO_RESULT_OK,
1471 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1472 MOJO_DEADLINE_INDEFINITE, &hss));
1473 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1474 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1475 hss.satisfiable_signals);
1476
1477 // Check the data.
1478 const void* read_buffer = nullptr;
1479 num_bytes = 0u;
1480 ASSERT_EQ(MOJO_RESULT_OK,
1481 BeginReadData(&read_buffer, &num_bytes, false));
1482 ASSERT_EQ(0, memcmp(read_buffer, kTestData, kTestDataSize));
1483 EndReadData(num_bytes);
1484
1485 // Now send the producer over a MP so that it's serialized.
1486 MojoHandle pipe0, pipe1;
1487 ASSERT_EQ(MOJO_RESULT_OK,
1488 MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
1489
1490 ASSERT_EQ(MOJO_RESULT_OK,
1491 MojoWriteMessage(pipe0, nullptr, 0, &producer_, 1,
1492 MOJO_WRITE_MESSAGE_FLAG_NONE));
1493 producer_ = MOJO_HANDLE_INVALID;
1494 ASSERT_EQ(MOJO_RESULT_OK, MojoWait(pipe1, MOJO_HANDLE_SIGNAL_READABLE,
1495 MOJO_DEADLINE_INDEFINITE, &hss));
1496 uint32_t num_handles = 1;
1497 ASSERT_EQ(MOJO_RESULT_OK,
1498 MojoReadMessage(pipe1, nullptr, 0, &producer_, &num_handles,
1499 MOJO_READ_MESSAGE_FLAG_NONE));
1500 ASSERT_EQ(num_handles, 1u);
1501
1502 // Write more data.
1503 const char kExtraData[] = "bye world";
1504 const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
1505 num_bytes = kExtraDataSize;
1506 ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
1507 ASSERT_EQ(kExtraDataSize, num_bytes);
1508
1509 // Wait for it.
1510 hss = MojoHandleSignalsState();
1511 ASSERT_EQ(MOJO_RESULT_OK,
1512 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1513 MOJO_DEADLINE_INDEFINITE, &hss));
1514 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1515 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1516 hss.satisfiable_signals);
1517
1518 // Check the second write.
1519 num_bytes = 0u;
1520 ASSERT_EQ(MOJO_RESULT_OK,
1521 BeginReadData(&read_buffer, &num_bytes, false));
1522 ASSERT_EQ(0, memcmp(read_buffer, kExtraData, kExtraDataSize));
1523 EndReadData(num_bytes);
1524
1525 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
1526 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
1527 }
1528
1529 // Ensures that if a data pipe consumer whose producer has closed is passed over
1530 // a message pipe, the deserialized dispatcher is also marked as having a closed
1531 // peer.
TEST_F(DataPipeTest,ConsumerWithClosedProducerSent)1532 TEST_F(DataPipeTest, ConsumerWithClosedProducerSent) {
1533 const MojoCreateDataPipeOptions options = {
1534 kSizeOfOptions, // |struct_size|.
1535 MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, // |flags|.
1536 static_cast<uint32_t>(sizeof(int32_t)), // |element_num_bytes|.
1537 1000 * sizeof(int32_t) // |capacity_num_bytes|.
1538 };
1539
1540 ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1541
1542 // We can write to a data pipe handle immediately.
1543 int32_t data = 123;
1544 uint32_t num_bytes = sizeof(data);
1545 ASSERT_EQ(MOJO_RESULT_OK, WriteData(&data, &num_bytes));
1546 ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());
1547
1548 // Now wait for the other side to become readable and to see the peer closed.
1549 MojoHandleSignalsState state;
1550 ASSERT_EQ(MOJO_RESULT_OK,
1551 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1552 MOJO_DEADLINE_INDEFINITE, &state));
1553 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1554 state.satisfied_signals);
1555 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1556 state.satisfiable_signals);
1557
1558 // Now send the consumer over a MP so that it's serialized.
1559 MojoHandle pipe0, pipe1;
1560 ASSERT_EQ(MOJO_RESULT_OK,
1561 MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
1562
1563 ASSERT_EQ(MOJO_RESULT_OK,
1564 MojoWriteMessage(pipe0, nullptr, 0, &consumer_, 1,
1565 MOJO_WRITE_MESSAGE_FLAG_NONE));
1566 consumer_ = MOJO_HANDLE_INVALID;
1567 ASSERT_EQ(MOJO_RESULT_OK, MojoWait(pipe1, MOJO_HANDLE_SIGNAL_READABLE,
1568 MOJO_DEADLINE_INDEFINITE, &state));
1569 uint32_t num_handles = 1;
1570 ASSERT_EQ(MOJO_RESULT_OK,
1571 MojoReadMessage(pipe1, nullptr, 0, &consumer_, &num_handles,
1572 MOJO_READ_MESSAGE_FLAG_NONE));
1573 ASSERT_EQ(num_handles, 1u);
1574
1575 ASSERT_EQ(MOJO_RESULT_OK,
1576 MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1577 MOJO_DEADLINE_INDEFINITE, &state));
1578 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1579 state.satisfied_signals);
1580 ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1581 state.satisfiable_signals);
1582
1583 int32_t read_data;
1584 ASSERT_EQ(MOJO_RESULT_OK, ReadData(&read_data, &num_bytes));
1585 ASSERT_EQ(sizeof(read_data), num_bytes);
1586 ASSERT_EQ(data, read_data);
1587
1588 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
1589 ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
1590 }
1591
WriteAllData(MojoHandle producer,const void * elements,uint32_t num_bytes)1592 bool WriteAllData(MojoHandle producer,
1593 const void* elements,
1594 uint32_t num_bytes) {
1595 for (size_t i = 0; i < kMaxPoll; i++) {
1596 // Write as much data as we can.
1597 uint32_t write_bytes = num_bytes;
1598 MojoResult result = MojoWriteData(producer, elements, &write_bytes,
1599 MOJO_WRITE_DATA_FLAG_NONE);
1600 if (result == MOJO_RESULT_OK) {
1601 num_bytes -= write_bytes;
1602 elements = static_cast<const uint8_t*>(elements) + write_bytes;
1603 if (num_bytes == 0)
1604 return true;
1605 } else {
1606 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
1607 }
1608
1609 MojoHandleSignalsState hss = MojoHandleSignalsState();
1610 EXPECT_EQ(MOJO_RESULT_OK, MojoWait(producer, MOJO_HANDLE_SIGNAL_WRITABLE,
1611 MOJO_DEADLINE_INDEFINITE, &hss));
1612 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
1613 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1614 hss.satisfiable_signals);
1615 }
1616
1617 return false;
1618 }
1619
1620 // If |expect_empty| is true, expect |consumer| to be empty after reading.
ReadAllData(MojoHandle consumer,void * elements,uint32_t num_bytes,bool expect_empty)1621 bool ReadAllData(MojoHandle consumer,
1622 void* elements,
1623 uint32_t num_bytes,
1624 bool expect_empty) {
1625 for (size_t i = 0; i < kMaxPoll; i++) {
1626 // Read as much data as we can.
1627 uint32_t read_bytes = num_bytes;
1628 MojoResult result =
1629 MojoReadData(consumer, elements, &read_bytes, MOJO_READ_DATA_FLAG_NONE);
1630 if (result == MOJO_RESULT_OK) {
1631 num_bytes -= read_bytes;
1632 elements = static_cast<uint8_t*>(elements) + read_bytes;
1633 if (num_bytes == 0) {
1634 if (expect_empty) {
1635 // Expect no more data.
1636 test::Sleep(test::TinyDeadline());
1637 MojoReadData(consumer, nullptr, &num_bytes,
1638 MOJO_READ_DATA_FLAG_QUERY);
1639 EXPECT_EQ(0u, num_bytes);
1640 }
1641 return true;
1642 }
1643 } else {
1644 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
1645 }
1646
1647 MojoHandleSignalsState hss = MojoHandleSignalsState();
1648 EXPECT_EQ(MOJO_RESULT_OK, MojoWait(consumer, MOJO_HANDLE_SIGNAL_READABLE,
1649 MOJO_DEADLINE_INDEFINITE, &hss));
1650 // Peer could have become closed while we're still waiting for data.
1651 EXPECT_TRUE(MOJO_HANDLE_SIGNAL_READABLE & hss.satisfied_signals);
1652 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1653 hss.satisfiable_signals);
1654 }
1655
1656 return num_bytes == 0;
1657 }
1658
1659 #if !defined(OS_IOS)
1660
// Passes both ends of a data pipe to a MultiprocessClient child, one after the
// other, over a message pipe. This side first acts as the producer, then gets
// the consumer back and verifies what is left in the pipe.
TEST_F(DataPipeTest, Multiprocess) {
  const uint32_t kTestDataSize =
      static_cast<uint32_t>(sizeof(kMultiprocessTestData));
  const MojoCreateDataPipeOptions options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      1,                                        // |element_num_bytes|.
      kMultiprocessCapacity                     // |capacity_num_bytes|.
  };
  ASSERT_EQ(MOJO_RESULT_OK, Create(&options));

  RUN_CHILD_ON_PIPE(MultiprocessClient, server_mp)
    // Put the test string into the pipe before the consumer is serialised and
    // shipped off. Nothing has been written yet, so a plain all-or-none write
    // is enough and WriteAllData is not needed.
    uint32_t bytes_written = kTestDataSize;
    ASSERT_EQ(MOJO_RESULT_OK,
              WriteData(kMultiprocessTestData, &bytes_written,
                        MOJO_WRITE_DATA_FLAG_ALL_OR_NONE));
    ASSERT_EQ(kTestDataSize, bytes_written);

    // Hand the consumer end to the child process.
    ASSERT_EQ(MOJO_RESULT_OK,
              MojoWriteMessage(server_mp, nullptr, 0, &consumer_, 1,
                               MOJO_WRITE_MESSAGE_FLAG_NONE));

    // Stream chunks of every size from 1 up to the pipe capacity, several
    // times over. The bytes form one running sequence across all chunks.
    uint8_t payload[100];
    int next_value = 0;
    for (int round = 0; round < kMultiprocessMaxIter; ++round) {
      for (uint32_t chunk_size = 1; chunk_size <= kMultiprocessCapacity;
           ++chunk_size) {
        for (unsigned int offset = 0; offset < chunk_size; ++offset)
          payload[offset] = static_cast<uint8_t>(next_value + offset);
        EXPECT_TRUE(WriteAllData(producer_, payload, chunk_size));
        next_value += chunk_size;
      }
    }

    // Finish with the test string once more.
    ASSERT_TRUE(
        WriteAllData(producer_, kMultiprocessTestData, kTestDataSize));

    // Swap ends: send the producer away...
    ASSERT_EQ(MOJO_RESULT_OK,
              MojoWriteMessage(server_mp, nullptr, 0, &producer_, 1,
                               MOJO_WRITE_MESSAGE_FLAG_NONE));

    // ...and take the consumer back from the other side.
    producer_ = MOJO_HANDLE_INVALID;
    MojoHandleSignalsState wait_state = MojoHandleSignalsState();
    ASSERT_EQ(MOJO_RESULT_OK,
              MojoWait(server_mp, MOJO_HANDLE_SIGNAL_READABLE,
                       MOJO_DEADLINE_INDEFINITE, &wait_state));
    MojoHandle received[2];
    uint32_t received_count = arraysize(received);
    ASSERT_EQ(MOJO_RESULT_OK,
              MojoReadMessage(server_mp, nullptr, 0, received,
                              &received_count, MOJO_READ_MESSAGE_FLAG_NONE));
    ASSERT_EQ(1u, received_count);
    consumer_ = received[0];

    // The test string should come out twice: the copy we wrote last, and the
    // copy written by the other end. Only after the second one is the pipe
    // expected to be empty.
    const bool kExpectEmptyAfterRead[] = {false, true};
    for (bool expect_empty : kExpectEmptyAfterRead) {
      EXPECT_TRUE(
          ReadAllData(consumer_, payload, kTestDataSize, expect_empty));
      EXPECT_EQ(0, memcmp(payload, kMultiprocessTestData, kTestDataSize));
    }

    WriteMessage(server_mp, "quit");

    // Don't have to close the consumer here because it will be done for us.
  END_CHILD()
}
1730
// Child-process counterpart of the Multiprocess test. Receives the consumer
// end of a data pipe over |client_mp|, checks the data that arrives on it,
// sends the consumer back, receives the producer end in exchange, writes the
// test string through it and then waits for a "quit" message.
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient, DataPipeTest, client_mp) {
  const uint32_t kTestDataSize =
      static_cast<uint32_t>(sizeof(kMultiprocessTestData));

  // Receive the data pipe from the other side.
  MojoHandle consumer = MOJO_HANDLE_INVALID;
  MojoHandleSignalsState hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE,
                                     MOJO_DEADLINE_INDEFINITE, &hss));
  MojoHandle handles[2];
  uint32_t num_handles = arraysize(handles);
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoReadMessage(client_mp, nullptr, 0, handles, &num_handles,
                            MOJO_READ_MESSAGE_FLAG_NONE));
  ASSERT_EQ(1u, num_handles);
  consumer = handles[0];

  // Read the initial string that was sent.
  // The buffer only ever holds raw bytes (at most |kTestDataSize| or
  // |kMultiprocessCapacity| of them per read), so it is a byte array just like
  // |expected_buffer| below and the buffer used by the Multiprocess test.
  uint8_t buffer[100];
  EXPECT_TRUE(ReadAllData(consumer, buffer, kTestDataSize, false));
  EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));

  // Receive the main data and check it is correct. The expected bytes are
  // regenerated here with the same running sequence the Multiprocess test
  // uses when writing.
  int seq = 0;
  uint8_t expected_buffer[100];
  for (int i = 0; i < kMultiprocessMaxIter; ++i) {
    for (uint32_t size = 1; size <= kMultiprocessCapacity; ++size) {
      for (unsigned int j = 0; j < size; ++j)
        expected_buffer[j] = seq + j;
      EXPECT_TRUE(ReadAllData(consumer, buffer, size, false));
      EXPECT_EQ(0, memcmp(buffer, expected_buffer, size));

      seq += size;
    }
  }

  // Swap ends.
  ASSERT_EQ(MOJO_RESULT_OK, MojoWriteMessage(client_mp, nullptr, 0, &consumer,
                                             1, MOJO_WRITE_MESSAGE_FLAG_NONE));

  // Receive the producer from the other side.
  MojoHandle producer = MOJO_HANDLE_INVALID;
  hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE,
                                     MOJO_DEADLINE_INDEFINITE, &hss));
  num_handles = arraysize(handles);
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoReadMessage(client_mp, nullptr, 0, handles, &num_handles,
                            MOJO_READ_MESSAGE_FLAG_NONE));
  ASSERT_EQ(1u, num_handles);
  producer = handles[0];

  // Write the test string one more time.
  EXPECT_TRUE(WriteAllData(producer, kMultiprocessTestData, kTestDataSize));

  // We swapped ends, so close the producer.
  EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));

  // Wait to receive a "quit" message before exiting.
  EXPECT_EQ("quit", ReadMessage(client_mp));
}
1792
// Test client: receives a message together with a data pipe producer handle on
// |h|, writes the message bytes into that producer, closes it and then waits
// for a "quit" message.
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer, DataPipeTest, h) {
  MojoHandle producer;
  const std::string payload = ReadMessageWithHandles(h, &producer, 1);

  // Push the payload into the data pipe; the write is expected to take all of
  // it in one go.
  const uint32_t payload_size = static_cast<uint32_t>(payload.size());
  uint32_t bytes_written = payload_size;
  EXPECT_EQ(MOJO_RESULT_OK,
            MojoWriteData(producer, payload.data(), &bytes_written,
                          MOJO_WRITE_DATA_FLAG_NONE));
  EXPECT_EQ(bytes_written, payload_size);

  // The producer is closed before this client quits.
  EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));

  // Stay alive until told to quit.
  EXPECT_EQ("quit", ReadMessage(h));
}
1809
// Test client: receives a message together with a data pipe consumer handle on
// |h|, reads from the consumer and expects to find exactly that message, then
// closes the consumer and waits for a "quit" message.
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) {
  MojoHandle consumer;
  const std::string expected_message =
      ReadMessageWithHandles(h, &consumer, 1);

  // Block until there is something to read.
  EXPECT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, nullptr));

  // Read as many bytes as the expected message is long; a single read is
  // expected to deliver all of them.
  std::vector<char> received(expected_message.size());
  uint32_t bytes_read = static_cast<uint32_t>(expected_message.size());
  EXPECT_EQ(MOJO_RESULT_OK,
            MojoReadData(consumer, received.data(), &bytes_read,
                         MOJO_READ_DATA_FLAG_NONE));
  EXPECT_EQ(bytes_read, static_cast<uint32_t>(received.size()));

  const std::string message(received.begin(), received.end());
  EXPECT_EQ(expected_message, message);

  EXPECT_EQ(MOJO_RESULT_OK, MojoClose(consumer));

  // Stay alive until told to quit.
  EXPECT_EQ("quit", ReadMessage(h));
}
1833
// Creates a data pipe and gives each end to a different child process: the
// producer goes to a WriteAndCloseProducer client and the consumer to a
// ReadAndCloseConsumer client. Both receive the same message, which the first
// writes into the pipe and the second expects to read back from it.
TEST_F(DataPipeTest, SendConsumerAndCloseProducer) {
  // Create a new data pipe. This has to succeed before going any further:
  // otherwise the handles below would be sent to the children without ever
  // having been set to a real pipe end.
  MojoHandle p = MOJO_HANDLE_INVALID;
  MojoHandle c = MOJO_HANDLE_INVALID;
  ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p, &c));

  RUN_CHILD_ON_PIPE(WriteAndCloseProducer, producer_client)
    RUN_CHILD_ON_PIPE(ReadAndCloseConsumer, consumer_client)
      const std::string kMessage = "Hello, world!";
      WriteMessageWithHandles(producer_client, kMessage, &p, 1);
      WriteMessageWithHandles(consumer_client, kMessage, &c, 1);

      // The consumer client is told to quit first, from inside its own scope.
      WriteMessage(consumer_client, "quit");
    END_CHILD()

    WriteMessage(producer_client, "quit");
  END_CHILD()
}
1851
// Test client: creates a data pipe in this process, sends the consumer end
// (along with a message) over |h|, writes the same message into the producer,
// closes the producer and then waits for a "quit" message.
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) {
  const MojoCreateDataPipeOptions pipe_options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      1,                                        // |element_num_bytes|.
      kMultiprocessCapacity                     // |capacity_num_bytes|.
  };

  MojoHandle producer;
  MojoHandle consumer;
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoCreateDataPipe(&pipe_options, &producer, &consumer));

  // Ship the consumer end, together with the message to expect, over |h|.
  const std::string kMessage = "Hello, world!";
  WriteMessageWithHandles(h, kMessage, &consumer, 1);

  // Write the message into the pipe; the write is expected to take all of it
  // in one go. Then close the producer.
  const uint32_t message_size = static_cast<uint32_t>(kMessage.size());
  uint32_t bytes_written = message_size;
  EXPECT_EQ(MOJO_RESULT_OK,
            MojoWriteData(producer, kMessage.data(), &bytes_written,
                          MOJO_WRITE_DATA_FLAG_NONE));
  EXPECT_EQ(bytes_written, message_size);
  EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));

  // Stay alive until told to quit.
  EXPECT_EQ("quit", ReadMessage(h));
}
1876
// Runs a CreateAndWrite child, which creates the data pipe itself, and checks
// that the consumer end it sends over yields the message it announced.
TEST_F(DataPipeTest, CreateInChild) {
  RUN_CHILD_ON_PIPE(CreateAndWrite, child)
    MojoHandle consumer;
    const std::string expected_message =
        ReadMessageWithHandles(child, &consumer, 1);

    // Block until there is something to read.
    EXPECT_EQ(MOJO_RESULT_OK,
              MojoWait(consumer, MOJO_HANDLE_SIGNAL_READABLE,
                       MOJO_DEADLINE_INDEFINITE, nullptr));

    // Read as many bytes as the expected message is long; a single read is
    // expected to deliver all of them.
    std::vector<char> received(expected_message.size());
    uint32_t bytes_read = static_cast<uint32_t>(expected_message.size());
    EXPECT_EQ(MOJO_RESULT_OK,
              MojoReadData(consumer, received.data(), &bytes_read,
                           MOJO_READ_DATA_FLAG_NONE));
    EXPECT_EQ(bytes_read, static_cast<uint32_t>(received.size()));

    const std::string message(received.begin(), received.end());
    EXPECT_EQ(expected_message, message);

    EXPECT_EQ(MOJO_RESULT_OK, MojoClose(consumer));
    WriteMessage(child, "quit");
  END_CHILD()
}
1900
1901 #endif // !defined(OS_IOS)
1902
1903 } // namespace
1904 } // namespace edk
1905 } // namespace mojo
1906