1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <stddef.h>
6 #include <stdint.h>
7 
8 #include <memory>
9 
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/message_loop/message_loop.h"
15 #include "mojo/edk/embedder/embedder.h"
16 #include "mojo/edk/embedder/platform_channel_pair.h"
17 #include "mojo/edk/system/test_utils.h"
18 #include "mojo/edk/system/waiter.h"
19 #include "mojo/edk/test/mojo_test_base.h"
20 #include "mojo/public/c/system/data_pipe.h"
21 #include "mojo/public/c/system/functions.h"
22 #include "mojo/public/c/system/message_pipe.h"
23 #include "testing/gtest/include/gtest/gtest.h"
24 
25 namespace mojo {
26 namespace edk {
27 namespace {
28 
29 const uint32_t kSizeOfOptions =
30     static_cast<uint32_t>(sizeof(MojoCreateDataPipeOptions));
31 
32 // In various places, we have to poll (since, e.g., we can't yet wait for a
33 // certain amount of data to be available). This is the maximum number of
34 // iterations (separated by a short sleep).
35 // TODO(vtl): Get rid of this.
36 const size_t kMaxPoll = 100;
37 
38 // Used in Multiprocess test.
39 const size_t kMultiprocessCapacity = 37;
40 const char kMultiprocessTestData[] = "hello i'm a string that is 36 bytes";
41 const int kMultiprocessMaxIter = 5;
42 
43 class DataPipeTest : public test::MojoTestBase {
44  public:
DataPipeTest()45   DataPipeTest() : producer_(MOJO_HANDLE_INVALID),
46                    consumer_(MOJO_HANDLE_INVALID) {}
47 
~DataPipeTest()48   ~DataPipeTest() override {
49     if (producer_ != MOJO_HANDLE_INVALID)
50       CHECK_EQ(MOJO_RESULT_OK, MojoClose(producer_));
51     if (consumer_ != MOJO_HANDLE_INVALID)
52       CHECK_EQ(MOJO_RESULT_OK, MojoClose(consumer_));
53   }
54 
Create(const MojoCreateDataPipeOptions * options)55   MojoResult Create(const MojoCreateDataPipeOptions* options) {
56     return MojoCreateDataPipe(options, &producer_, &consumer_);
57   }
58 
WriteData(const void * elements,uint32_t * num_bytes,bool all_or_none=false)59   MojoResult WriteData(const void* elements,
60                        uint32_t* num_bytes,
61                        bool all_or_none = false) {
62     return MojoWriteData(producer_, elements, num_bytes,
63                          all_or_none ? MOJO_WRITE_DATA_FLAG_ALL_OR_NONE
64                                      : MOJO_WRITE_DATA_FLAG_NONE);
65   }
66 
ReadData(void * elements,uint32_t * num_bytes,bool all_or_none=false,bool peek=false)67   MojoResult ReadData(void* elements,
68                       uint32_t* num_bytes,
69                       bool all_or_none = false,
70                       bool peek = false) {
71     MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
72     if (all_or_none)
73       flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
74     if (peek)
75       flags |= MOJO_READ_DATA_FLAG_PEEK;
76     return MojoReadData(consumer_, elements, num_bytes, flags);
77   }
78 
QueryData(uint32_t * num_bytes)79   MojoResult QueryData(uint32_t* num_bytes) {
80     return MojoReadData(consumer_, nullptr, num_bytes,
81                         MOJO_READ_DATA_FLAG_QUERY);
82   }
83 
DiscardData(uint32_t * num_bytes,bool all_or_none=false)84   MojoResult DiscardData(uint32_t* num_bytes, bool all_or_none = false) {
85     MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_DISCARD;
86     if (all_or_none)
87       flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
88     return MojoReadData(consumer_, nullptr, num_bytes, flags);
89   }
90 
BeginReadData(const void ** elements,uint32_t * num_bytes,bool all_or_none=false)91   MojoResult BeginReadData(const void** elements,
92                            uint32_t* num_bytes,
93                            bool all_or_none = false) {
94     MojoReadDataFlags flags = MOJO_READ_DATA_FLAG_NONE;
95     if (all_or_none)
96       flags |= MOJO_READ_DATA_FLAG_ALL_OR_NONE;
97     return MojoBeginReadData(consumer_, elements, num_bytes, flags);
98   }
99 
EndReadData(uint32_t num_bytes_read)100   MojoResult EndReadData(uint32_t num_bytes_read) {
101     return MojoEndReadData(consumer_, num_bytes_read);
102   }
103 
BeginWriteData(void ** elements,uint32_t * num_bytes,bool all_or_none=false)104   MojoResult BeginWriteData(void** elements,
105                             uint32_t* num_bytes,
106                             bool all_or_none = false) {
107     MojoReadDataFlags flags = MOJO_WRITE_DATA_FLAG_NONE;
108     if (all_or_none)
109       flags |= MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
110     return MojoBeginWriteData(producer_, elements, num_bytes, flags);
111   }
112 
EndWriteData(uint32_t num_bytes_written)113   MojoResult EndWriteData(uint32_t num_bytes_written) {
114     return MojoEndWriteData(producer_, num_bytes_written);
115   }
116 
CloseProducer()117   MojoResult CloseProducer() {
118     MojoResult rv = MojoClose(producer_);
119     producer_ = MOJO_HANDLE_INVALID;
120     return rv;
121   }
122 
CloseConsumer()123   MojoResult CloseConsumer() {
124     MojoResult rv = MojoClose(consumer_);
125     consumer_ = MOJO_HANDLE_INVALID;
126     return rv;
127   }
128 
129   MojoHandle producer_, consumer_;
130 
131  private:
132   DISALLOW_COPY_AND_ASSIGN(DataPipeTest);
133 };
134 
// Smoke test: writes two elements through the producer, waits for the consumer
// to become readable, and reads the same two elements back.
TEST_F(DataPipeTest, Basic) {
  const MojoCreateDataPipeOptions options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
      1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
  };

  ASSERT_EQ(MOJO_RESULT_OK, Create(&options));

  // We can write to a data pipe handle immediately.
  int32_t elements[10] = {};
  elements[0] = 123;
  elements[1] = 456;
  uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
  ASSERT_EQ(MOJO_RESULT_OK, WriteData(&elements[0], &num_bytes));

  // Now wait for the other side to become readable.
  MojoHandleSignalsState state;
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
                     MOJO_DEADLINE_INDEFINITE, &state));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, state.satisfied_signals);

  // Clobber the local copies so the read below has to restore them.
  elements[0] = -1;
  elements[1] = -1;
  // Ask for the two elements explicitly rather than relying on whatever value
  // the write left in |num_bytes|.
  num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
  ASSERT_EQ(MOJO_RESULT_OK, ReadData(&elements[0], &num_bytes));
  ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
  ASSERT_EQ(123, elements[0]);
  ASSERT_EQ(456, elements[1]);
}
171 
172 // Tests creation of data pipes with various (valid) options.
TEST_F(DataPipeTest,CreateAndMaybeTransfer)173 TEST_F(DataPipeTest, CreateAndMaybeTransfer) {
174   MojoCreateDataPipeOptions test_options[] = {
175       // Default options.
176       {},
177       // Trivial element size, non-default capacity.
178       {kSizeOfOptions,                           // |struct_size|.
179        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
180        1,                                        // |element_num_bytes|.
181        1000},                                    // |capacity_num_bytes|.
182       // Nontrivial element size, non-default capacity.
183       {kSizeOfOptions,                           // |struct_size|.
184        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
185        4,                                        // |element_num_bytes|.
186        4000},                                    // |capacity_num_bytes|.
187       // Nontrivial element size, default capacity.
188       {kSizeOfOptions,                           // |struct_size|.
189        MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
190        100,                                      // |element_num_bytes|.
191        0}                                        // |capacity_num_bytes|.
192   };
193   for (size_t i = 0; i < arraysize(test_options); i++) {
194     MojoHandle producer_handle, consumer_handle;
195     MojoCreateDataPipeOptions* options =
196         i ? &test_options[i] : nullptr;
197     ASSERT_EQ(MOJO_RESULT_OK,
198               MojoCreateDataPipe(options, &producer_handle, &consumer_handle));
199     ASSERT_EQ(MOJO_RESULT_OK, MojoClose(producer_handle));
200     ASSERT_EQ(MOJO_RESULT_OK, MojoClose(consumer_handle));
201   }
202 }
203 
TEST_F(DataPipeTest,SimpleReadWrite)204 TEST_F(DataPipeTest, SimpleReadWrite) {
205   const MojoCreateDataPipeOptions options = {
206       kSizeOfOptions,                           // |struct_size|.
207       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
208       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
209       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
210   };
211 
212   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
213   MojoHandleSignalsState hss;
214 
215   int32_t elements[10] = {};
216   uint32_t num_bytes = 0;
217 
218   // Try reading; nothing there yet.
219   num_bytes =
220       static_cast<uint32_t>(arraysize(elements) * sizeof(elements[0]));
221   ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, ReadData(elements, &num_bytes));
222 
223   // Query; nothing there yet.
224   num_bytes = 0;
225   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
226   ASSERT_EQ(0u, num_bytes);
227 
228   // Discard; nothing there yet.
229   num_bytes = static_cast<uint32_t>(5u * sizeof(elements[0]));
230   ASSERT_EQ(MOJO_RESULT_SHOULD_WAIT, DiscardData(&num_bytes));
231 
232   // Read with invalid |num_bytes|.
233   num_bytes = sizeof(elements[0]) + 1;
234   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, ReadData(elements, &num_bytes));
235 
236   // Write two elements.
237   elements[0] = 123;
238   elements[1] = 456;
239   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
240   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
241   // It should have written everything (even without "all or none").
242   ASSERT_EQ(2u * sizeof(elements[0]), num_bytes);
243 
244   // Wait.
245   ASSERT_EQ(MOJO_RESULT_OK,
246             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
247                      MOJO_DEADLINE_INDEFINITE, &hss));
248   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
249   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
250             hss.satisfiable_signals);
251 
252   // Query.
253   // TODO(vtl): It's theoretically possible (though not with the current
254   // implementation/configured limits) that not all the data has arrived yet.
255   // (The theoretically-correct assertion here is that |num_bytes| is |1 * ...|
256   // or |2 * ...|.)
257   num_bytes = 0;
258   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
259   ASSERT_EQ(2 * sizeof(elements[0]), num_bytes);
260 
261   // Read one element.
262   elements[0] = -1;
263   elements[1] = -1;
264   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
265   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes));
266   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
267   ASSERT_EQ(123, elements[0]);
268   ASSERT_EQ(-1, elements[1]);
269 
270   // Query.
271   // TODO(vtl): See previous TODO. (If we got 2 elements there, however, we
272   // should get 1 here.)
273   num_bytes = 0;
274   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
275   ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
276 
277   // Peek one element.
278   elements[0] = -1;
279   elements[1] = -1;
280   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
281   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, true));
282   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
283   ASSERT_EQ(456, elements[0]);
284   ASSERT_EQ(-1, elements[1]);
285 
286   // Query. Still has 1 element remaining.
287   num_bytes = 0;
288   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
289   ASSERT_EQ(1 * sizeof(elements[0]), num_bytes);
290 
291   // Try to read two elements, with "all or none".
292   elements[0] = -1;
293   elements[1] = -1;
294   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
295   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE,
296             ReadData(elements, &num_bytes, true, false));
297   ASSERT_EQ(-1, elements[0]);
298   ASSERT_EQ(-1, elements[1]);
299 
300   // Try to read two elements, without "all or none".
301   elements[0] = -1;
302   elements[1] = -1;
303   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
304   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, false, false));
305   ASSERT_EQ(1u * sizeof(elements[0]), num_bytes);
306   ASSERT_EQ(456, elements[0]);
307   ASSERT_EQ(-1, elements[1]);
308 
309   // Query.
310   num_bytes = 0;
311   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
312   ASSERT_EQ(0u, num_bytes);
313 }
314 
315 // Note: The "basic" waiting tests test that the "wait states" are correct in
316 // various situations; they don't test that waiters are properly awoken on state
317 // changes. (For that, we need to use multiple threads.)
TEST_F(DataPipeTest,BasicProducerWaiting)318 TEST_F(DataPipeTest, BasicProducerWaiting) {
319   // Note: We take advantage of the fact that current for current
320   // implementations capacities are strict maximums. This is not guaranteed by
321   // the API.
322 
323   const MojoCreateDataPipeOptions options = {
324       kSizeOfOptions,                           // |struct_size|.
325       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
326       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
327       2 * sizeof(int32_t)                       // |capacity_num_bytes|.
328   };
329   Create(&options);
330   MojoHandleSignalsState hss;
331 
332   // Never readable.
333   hss = MojoHandleSignalsState();
334   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
335             MojoWait(producer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
336   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
337   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
338             hss.satisfiable_signals);
339 
340   // Already writable.
341   hss = MojoHandleSignalsState();
342   ASSERT_EQ(MOJO_RESULT_OK,
343             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
344 
345   // Write two elements.
346   int32_t elements[2] = {123, 456};
347   uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
348   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
349   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
350 
351   // Wait for data to become available to the consumer.
352   ASSERT_EQ(MOJO_RESULT_OK,
353             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
354                      MOJO_DEADLINE_INDEFINITE, &hss));
355   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
356   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
357             hss.satisfiable_signals);
358 
359   // Peek one element.
360   elements[0] = -1;
361   elements[1] = -1;
362   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
363   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
364   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
365   ASSERT_EQ(123, elements[0]);
366   ASSERT_EQ(-1, elements[1]);
367 
368   // Read one element.
369   elements[0] = -1;
370   elements[1] = -1;
371   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
372   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, false));
373   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
374   ASSERT_EQ(123, elements[0]);
375   ASSERT_EQ(-1, elements[1]);
376 
377   // Try writing, using a two-phase write.
378   void* buffer = nullptr;
379   num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
380   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes));
381   EXPECT_TRUE(buffer);
382   ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
383 
384   static_cast<int32_t*>(buffer)[0] = 789;
385   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(static_cast<uint32_t>(
386                                          1u * sizeof(elements[0]))));
387 
388   // Read one element, using a two-phase read.
389   const void* read_buffer = nullptr;
390   num_bytes = 0u;
391   ASSERT_EQ(MOJO_RESULT_OK,
392             BeginReadData(&read_buffer, &num_bytes, false));
393   EXPECT_TRUE(read_buffer);
394   // The two-phase read should be able to read at least one element.
395   ASSERT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(elements[0])));
396   ASSERT_EQ(456, static_cast<const int32_t*>(read_buffer)[0]);
397   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(static_cast<uint32_t>(
398                                         1u * sizeof(elements[0]))));
399 
400   // Write one element.
401   elements[0] = 123;
402   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
403   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes));
404   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
405 
406   // Close the consumer.
407   CloseConsumer();
408 
409   // It should now be never-writable.
410   hss = MojoHandleSignalsState();
411   ASSERT_EQ(MOJO_RESULT_OK,
412             MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
413                      MOJO_DEADLINE_INDEFINITE, &hss));
414   hss = MojoHandleSignalsState();
415   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
416             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
417   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
418   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
419 }
420 
// Checks that closing the consumer signals PEER_CLOSED on the producer and
// leaves PEER_CLOSED as the producer's only satisfiable signal.
TEST_F(DataPipeTest, PeerClosedProducerWaiting) {
  const MojoCreateDataPipeOptions options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
      2 * sizeof(int32_t)                       // |capacity_num_bytes|.
  };
  ASSERT_EQ(MOJO_RESULT_OK, Create(&options));

  // Close the consumer, and make sure the close itself succeeded.
  ASSERT_EQ(MOJO_RESULT_OK, CloseConsumer());

  // It should be signaled.
  MojoHandleSignalsState hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
}
442 
// Checks that closing the producer of an empty pipe signals PEER_CLOSED on the
// consumer and leaves PEER_CLOSED as the consumer's only satisfiable signal.
TEST_F(DataPipeTest, PeerClosedConsumerWaiting) {
  const MojoCreateDataPipeOptions options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
      2 * sizeof(int32_t)                       // |capacity_num_bytes|.
  };
  ASSERT_EQ(MOJO_RESULT_OK, Create(&options));

  // Close the producer, and make sure the close itself succeeded.
  ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());

  // It should be signaled.
  MojoHandleSignalsState hss = MojoHandleSignalsState();
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
                     MOJO_DEADLINE_INDEFINITE, &hss));
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
  ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
}
464 
TEST_F(DataPipeTest,BasicConsumerWaiting)465 TEST_F(DataPipeTest, BasicConsumerWaiting) {
466   const MojoCreateDataPipeOptions options = {
467       kSizeOfOptions,                           // |struct_size|.
468       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
469       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
470       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
471   };
472   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
473   MojoHandleSignalsState hss;
474 
475   // Never writable.
476   hss = MojoHandleSignalsState();
477   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
478             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_WRITABLE,
479                      MOJO_DEADLINE_INDEFINITE, &hss));
480   ASSERT_EQ(0u, hss.satisfied_signals);
481   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
482             hss.satisfiable_signals);
483 
484   // Write two elements.
485   int32_t elements[2] = {123, 456};
486   uint32_t num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
487   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
488 
489   // Wait for readability.
490   hss = MojoHandleSignalsState();
491   ASSERT_EQ(MOJO_RESULT_OK,
492             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
493                      MOJO_DEADLINE_INDEFINITE, &hss));
494   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
495   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
496             hss.satisfiable_signals);
497 
498   // Discard one element.
499   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
500   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
501   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
502 
503   // Should still be readable.
504   hss = MojoHandleSignalsState();
505   ASSERT_EQ(MOJO_RESULT_OK,
506             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
507                      MOJO_DEADLINE_INDEFINITE, &hss));
508   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
509   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
510             hss.satisfiable_signals);
511 
512   // Peek one element.
513   elements[0] = -1;
514   elements[1] = -1;
515   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
516   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true, true));
517   ASSERT_EQ(456, elements[0]);
518   ASSERT_EQ(-1, elements[1]);
519 
520   // Should still be readable.
521   hss = MojoHandleSignalsState();
522   ASSERT_EQ(MOJO_RESULT_OK,
523             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
524                      MOJO_DEADLINE_INDEFINITE, &hss));
525   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
526   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
527             hss.satisfiable_signals);
528 
529   // Read one element.
530   elements[0] = -1;
531   elements[1] = -1;
532   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
533   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
534   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
535   ASSERT_EQ(456, elements[0]);
536   ASSERT_EQ(-1, elements[1]);
537 
538   // Write one element.
539   elements[0] = 789;
540   elements[1] = -1;
541   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
542   ASSERT_EQ(MOJO_RESULT_OK, WriteData(elements, &num_bytes, true));
543 
544   // Waiting should now succeed.
545   hss = MojoHandleSignalsState();
546   ASSERT_EQ(MOJO_RESULT_OK,
547             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
548                      MOJO_DEADLINE_INDEFINITE, &hss));
549   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
550   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
551             hss.satisfiable_signals);
552 
553   // Close the producer.
554   CloseProducer();
555 
556   // Should still be readable.
557   hss = MojoHandleSignalsState();
558   ASSERT_EQ(MOJO_RESULT_OK,
559             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
560                      MOJO_DEADLINE_INDEFINITE, &hss));
561   ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE) != 0);
562   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
563             hss.satisfiable_signals);
564 
565   // Wait for the peer closed signal.
566   hss = MojoHandleSignalsState();
567   ASSERT_EQ(MOJO_RESULT_OK,
568             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
569                      MOJO_DEADLINE_INDEFINITE, &hss));
570   ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED) != 0);
571   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
572             hss.satisfiable_signals);
573 
574   // Read one element.
575   elements[0] = -1;
576   elements[1] = -1;
577   num_bytes = static_cast<uint32_t>(1u * sizeof(elements[0]));
578   ASSERT_EQ(MOJO_RESULT_OK, ReadData(elements, &num_bytes, true));
579   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
580   ASSERT_EQ(789, elements[0]);
581   ASSERT_EQ(-1, elements[1]);
582 
583   // Should be never-readable.
584   hss = MojoHandleSignalsState();
585   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
586             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
587                      MOJO_DEADLINE_INDEFINITE, &hss));
588   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
589   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
590 }
591 
592 // Test with two-phase APIs and also closing the producer with an active
593 // consumer waiter.
TEST_F(DataPipeTest,ConsumerWaitingTwoPhase)594 TEST_F(DataPipeTest, ConsumerWaitingTwoPhase) {
595   const MojoCreateDataPipeOptions options = {
596       kSizeOfOptions,                           // |struct_size|.
597       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
598       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
599       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
600   };
601   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
602   MojoHandleSignalsState hss;
603 
604   // Write two elements.
605   int32_t* elements = nullptr;
606   void* buffer = nullptr;
607   // Request room for three (but we'll only write two).
608   uint32_t num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
609   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&buffer, &num_bytes, true));
610   EXPECT_TRUE(buffer);
611   EXPECT_GE(num_bytes, static_cast<uint32_t>(3u * sizeof(elements[0])));
612   elements = static_cast<int32_t*>(buffer);
613   elements[0] = 123;
614   elements[1] = 456;
615   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(2u * sizeof(elements[0])));
616 
617   // Wait for readability.
618   hss = MojoHandleSignalsState();
619   ASSERT_EQ(MOJO_RESULT_OK,
620             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
621                      MOJO_DEADLINE_INDEFINITE, &hss));
622   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
623   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
624             hss.satisfiable_signals);
625 
626   // Read one element.
627   // Request two in all-or-none mode, but only read one.
628   const void* read_buffer = nullptr;
629   num_bytes = static_cast<uint32_t>(2u * sizeof(elements[0]));
630   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes, true));
631   EXPECT_TRUE(read_buffer);
632   ASSERT_EQ(static_cast<uint32_t>(2u * sizeof(elements[0])), num_bytes);
633   const int32_t* read_elements = static_cast<const int32_t*>(read_buffer);
634   ASSERT_EQ(123, read_elements[0]);
635   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
636 
637   // Should still be readable.
638   hss = MojoHandleSignalsState();
639   ASSERT_EQ(MOJO_RESULT_OK,
640             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
641                      MOJO_DEADLINE_INDEFINITE, &hss));
642   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
643   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
644             hss.satisfiable_signals);
645 
646   // Read one element.
647   // Request three, but not in all-or-none mode.
648   read_buffer = nullptr;
649   num_bytes = static_cast<uint32_t>(3u * sizeof(elements[0]));
650   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer, &num_bytes));
651   EXPECT_TRUE(read_buffer);
652   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(elements[0])), num_bytes);
653   read_elements = static_cast<const int32_t*>(read_buffer);
654   ASSERT_EQ(456, read_elements[0]);
655   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(1u * sizeof(elements[0])));
656 
657   // Close the producer.
658   CloseProducer();
659 
660   // Should be never-readable.
661   hss = MojoHandleSignalsState();
662   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
663             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
664                      MOJO_DEADLINE_INDEFINITE, &hss));
665   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
666   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
667 }
668 
669 // Tests that data pipes aren't writable/readable during two-phase writes/reads.
TEST_F(DataPipeTest,BasicTwoPhaseWaiting)670 TEST_F(DataPipeTest, BasicTwoPhaseWaiting) {
671   const MojoCreateDataPipeOptions options = {
672       kSizeOfOptions,                           // |struct_size|.
673       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
674       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
675       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
676   };
677   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
678   MojoHandleSignalsState hss;
679 
680   // It should be writable.
681   hss = MojoHandleSignalsState();
682   ASSERT_EQ(MOJO_RESULT_OK,
683             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
684   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
685   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
686             hss.satisfiable_signals);
687 
688   uint32_t num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
689   void* write_ptr = nullptr;
690   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
691   EXPECT_TRUE(write_ptr);
692   EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
693 
694   // At this point, it shouldn't be writable.
695   hss = MojoHandleSignalsState();
696   ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
697             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
698   ASSERT_EQ(0u, hss.satisfied_signals);
699   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
700             hss.satisfiable_signals);
701 
702   // It shouldn't be readable yet either (we'll wait later).
703   hss = MojoHandleSignalsState();
704   ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
705             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
706   ASSERT_EQ(0u, hss.satisfied_signals);
707   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
708             hss.satisfiable_signals);
709 
710   static_cast<int32_t*>(write_ptr)[0] = 123;
711   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(1u * sizeof(int32_t)));
712 
713   // It should immediately be writable again.
714   hss = MojoHandleSignalsState();
715   ASSERT_EQ(MOJO_RESULT_OK,
716             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
717   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
718   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
719             hss.satisfiable_signals);
720 
721   // It should become readable.
722   hss = MojoHandleSignalsState();
723   ASSERT_EQ(MOJO_RESULT_OK,
724             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
725                      MOJO_DEADLINE_INDEFINITE, &hss));
726   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
727   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
728             hss.satisfiable_signals);
729 
730   // Start another two-phase write and check that it's readable even in the
731   // middle of it.
732   num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
733   write_ptr = nullptr;
734   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
735   EXPECT_TRUE(write_ptr);
736   EXPECT_GE(num_bytes, static_cast<uint32_t>(1u * sizeof(int32_t)));
737 
738   // It should be readable.
739   hss = MojoHandleSignalsState();
740   ASSERT_EQ(MOJO_RESULT_OK,
741             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
742                      MOJO_DEADLINE_INDEFINITE, &hss));
743   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
744   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
745             hss.satisfiable_signals);
746 
747   // End the two-phase write without writing anything.
748   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0u));
749 
750   // Start a two-phase read.
751   num_bytes = static_cast<uint32_t>(1u * sizeof(int32_t));
752   const void* read_ptr = nullptr;
753   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
754   EXPECT_TRUE(read_ptr);
755   ASSERT_EQ(static_cast<uint32_t>(1u * sizeof(int32_t)), num_bytes);
756 
757   // At this point, it should still be writable.
758   hss = MojoHandleSignalsState();
759   ASSERT_EQ(MOJO_RESULT_OK,
760             MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
761   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
762   ASSERT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
763             hss.satisfiable_signals);
764 
765   // But not readable.
766   hss = MojoHandleSignalsState();
767   ASSERT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
768             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
769   ASSERT_EQ(0u, hss.satisfied_signals);
770   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
771             hss.satisfiable_signals);
772 
773   // End the two-phase read without reading anything.
774   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0u));
775 
776   // It should be readable again.
777   hss = MojoHandleSignalsState();
778   ASSERT_EQ(MOJO_RESULT_OK,
779             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
780   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
781   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
782             hss.satisfiable_signals);
783 }
784 
// Fills the first |count| slots of |out| with the consecutive values |start|,
// |start| + 1, |start| + 2, ... The caller must supply room for at least
// |count| elements; nothing is written when |count| is zero.
void Seq(int32_t start, size_t count, int32_t* out) {
  size_t index = 0;
  while (index < count) {
    out[index] = start + static_cast<int32_t>(index);
    ++index;
  }
}
789 
TEST_F(DataPipeTest,AllOrNone)790 TEST_F(DataPipeTest, AllOrNone) {
791   const MojoCreateDataPipeOptions options = {
792       kSizeOfOptions,                           // |struct_size|.
793       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
794       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
795       10 * sizeof(int32_t)                      // |capacity_num_bytes|.
796   };
797   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
798   MojoHandleSignalsState hss;
799 
800   // Try writing way too much.
801   uint32_t num_bytes = 20u * sizeof(int32_t);
802   int32_t buffer[100];
803   Seq(0, arraysize(buffer), buffer);
804   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
805 
806   // Should still be empty.
807   num_bytes = ~0u;
808   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
809   ASSERT_EQ(0u, num_bytes);
810 
811   // Write some data.
812   num_bytes = 5u * sizeof(int32_t);
813   Seq(100, arraysize(buffer), buffer);
814   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
815   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
816 
817   // Wait for data.
818   // TODO(vtl): There's no real guarantee that all the data will become
819   // available at once (except that in current implementations, with reasonable
820   // limits, it will). Eventually, we'll be able to wait for a specified amount
821   // of data to become available.
822   hss = MojoHandleSignalsState();
823   ASSERT_EQ(MOJO_RESULT_OK,
824             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
825                      MOJO_DEADLINE_INDEFINITE, &hss));
826   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
827   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
828             hss.satisfiable_signals);
829 
830   // Half full.
831   num_bytes = 0u;
832   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
833   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
834 
835   /* TODO(jam): enable if we end up observing max capacity
836   // Too much.
837   num_bytes = 6u * sizeof(int32_t);
838   Seq(200, arraysize(buffer), buffer);
839   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, WriteData(buffer, &num_bytes, true));
840   */
841 
842   // Try reading too much.
843   num_bytes = 11u * sizeof(int32_t);
844   memset(buffer, 0xab, sizeof(buffer));
845   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
846   int32_t expected_buffer[100];
847   memset(expected_buffer, 0xab, sizeof(expected_buffer));
848   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
849 
850   // Try discarding too much.
851   num_bytes = 11u * sizeof(int32_t);
852   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
853 
854   // Just a little.
855   num_bytes = 2u * sizeof(int32_t);
856   Seq(300, arraysize(buffer), buffer);
857   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
858   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
859 
860   // Just right.
861   num_bytes = 3u * sizeof(int32_t);
862   Seq(400, arraysize(buffer), buffer);
863   ASSERT_EQ(MOJO_RESULT_OK, WriteData(buffer, &num_bytes, true));
864   ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
865 
866   // TODO(vtl): Hack (see also the TODO above): We can't currently wait for a
867   // specified amount of data to be available, so poll.
868   for (size_t i = 0; i < kMaxPoll; i++) {
869     num_bytes = 0u;
870     ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
871     if (num_bytes >= 10u * sizeof(int32_t))
872       break;
873 
874     test::Sleep(test::EpsilonDeadline());
875   }
876   ASSERT_EQ(10u * sizeof(int32_t), num_bytes);
877 
878   // Read half.
879   num_bytes = 5u * sizeof(int32_t);
880   memset(buffer, 0xab, sizeof(buffer));
881   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
882   ASSERT_EQ(5u * sizeof(int32_t), num_bytes);
883   memset(expected_buffer, 0xab, sizeof(expected_buffer));
884   Seq(100, 5, expected_buffer);
885   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
886 
887   // Try reading too much again.
888   num_bytes = 6u * sizeof(int32_t);
889   memset(buffer, 0xab, sizeof(buffer));
890   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, ReadData(buffer, &num_bytes, true));
891   memset(expected_buffer, 0xab, sizeof(expected_buffer));
892   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
893 
894   // Try discarding too much again.
895   num_bytes = 6u * sizeof(int32_t);
896   ASSERT_EQ(MOJO_RESULT_OUT_OF_RANGE, DiscardData(&num_bytes, true));
897 
898   // Discard a little.
899   num_bytes = 2u * sizeof(int32_t);
900   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
901   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
902 
903   // Three left.
904   num_bytes = 0u;
905   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
906   ASSERT_EQ(3u * sizeof(int32_t), num_bytes);
907 
908   // Close the producer, then test producer-closed cases.
909   CloseProducer();
910 
911   // Wait.
912   hss = MojoHandleSignalsState();
913   ASSERT_EQ(MOJO_RESULT_OK,
914             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
915                      MOJO_DEADLINE_INDEFINITE, &hss));
916   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
917             hss.satisfied_signals);
918   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
919             hss.satisfiable_signals);
920 
921   // Try reading too much; "failed precondition" since the producer is closed.
922   num_bytes = 4u * sizeof(int32_t);
923   memset(buffer, 0xab, sizeof(buffer));
924   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
925             ReadData(buffer, &num_bytes, true));
926   memset(expected_buffer, 0xab, sizeof(expected_buffer));
927   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
928 
929   // Try discarding too much; "failed precondition" again.
930   num_bytes = 4u * sizeof(int32_t);
931   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes, true));
932 
933   // Read a little.
934   num_bytes = 2u * sizeof(int32_t);
935   memset(buffer, 0xab, sizeof(buffer));
936   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, true));
937   ASSERT_EQ(2u * sizeof(int32_t), num_bytes);
938   memset(expected_buffer, 0xab, sizeof(expected_buffer));
939   Seq(400, 2, expected_buffer);
940   ASSERT_EQ(0, memcmp(buffer, expected_buffer, sizeof(buffer)));
941 
942   // Discard the remaining element.
943   num_bytes = 1u * sizeof(int32_t);
944   ASSERT_EQ(MOJO_RESULT_OK, DiscardData(&num_bytes, true));
945   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
946 
947   // Empty again.
948   num_bytes = ~0u;
949   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
950   ASSERT_EQ(0u, num_bytes);
951 }
952 
953 // Tests that |ProducerWriteData()| and |ConsumerReadData()| writes and reads,
954 // respectively, as much as possible, even if it may have to "wrap around" the
955 // internal circular buffer. (Note that the two-phase write and read need not do
956 // this.)
TEST_F(DataPipeTest,WrapAround)957 TEST_F(DataPipeTest, WrapAround) {
958   unsigned char test_data[1000];
959   for (size_t i = 0; i < arraysize(test_data); i++)
960     test_data[i] = static_cast<unsigned char>(i);
961 
962   const MojoCreateDataPipeOptions options = {
963       kSizeOfOptions,                           // |struct_size|.
964       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
965       1u,                                       // |element_num_bytes|.
966       100u                                      // |capacity_num_bytes|.
967   };
968 
969   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
970   MojoHandleSignalsState hss;
971 
972   // Write 20 bytes.
973   uint32_t num_bytes = 20u;
974   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&test_data[0], &num_bytes, true));
975   ASSERT_EQ(20u, num_bytes);
976 
977   // Wait for data.
978   ASSERT_EQ(MOJO_RESULT_OK,
979             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
980                      MOJO_DEADLINE_INDEFINITE, &hss));
981   ASSERT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE) != 0);
982   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
983             hss.satisfiable_signals);
984 
985   // Read 10 bytes.
986   unsigned char read_buffer[1000] = {0};
987   num_bytes = 10u;
988   ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes, true));
989   ASSERT_EQ(10u, num_bytes);
990   ASSERT_EQ(0, memcmp(read_buffer, &test_data[0], 10u));
991 
992   // Check that a two-phase write can now only write (at most) 80 bytes. (This
993   // checks an implementation detail; this behavior is not guaranteed.)
994   void* write_buffer_ptr = nullptr;
995   num_bytes = 0u;
996   ASSERT_EQ(MOJO_RESULT_OK,
997             BeginWriteData(&write_buffer_ptr, &num_bytes, false));
998   EXPECT_TRUE(write_buffer_ptr);
999   ASSERT_EQ(80u, num_bytes);
1000   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(0));
1001 
1002   size_t total_num_bytes = 0;
1003   while (total_num_bytes < 90) {
1004     // Wait to write.
1005     ASSERT_EQ(MOJO_RESULT_OK,
1006               MojoWait(producer_, MOJO_HANDLE_SIGNAL_WRITABLE,
1007                        MOJO_DEADLINE_INDEFINITE, &hss));
1008     ASSERT_EQ(hss.satisfied_signals, MOJO_HANDLE_SIGNAL_WRITABLE);
1009     ASSERT_EQ(hss.satisfiable_signals,
1010               MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED);
1011 
1012     // Write as much as we can.
1013     num_bytes = 100;
1014     ASSERT_EQ(MOJO_RESULT_OK,
1015               WriteData(&test_data[20 + total_num_bytes], &num_bytes, false));
1016     total_num_bytes += num_bytes;
1017   }
1018 
1019   ASSERT_EQ(90u, total_num_bytes);
1020 
1021   num_bytes = 0;
1022   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1023   ASSERT_EQ(100u, num_bytes);
1024 
1025   // Check that a two-phase read can now only read (at most) 90 bytes. (This
1026   // checks an implementation detail; this behavior is not guaranteed.)
1027   const void* read_buffer_ptr = nullptr;
1028   num_bytes = 0;
1029   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes, false));
1030   EXPECT_TRUE(read_buffer_ptr);
1031   ASSERT_EQ(90u, num_bytes);
1032   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(0));
1033 
1034   // Read as much as possible. We should read 100 bytes.
1035   num_bytes = static_cast<uint32_t>(arraysize(read_buffer) *
1036                                     sizeof(read_buffer[0]));
1037   memset(read_buffer, 0, num_bytes);
1038   ASSERT_EQ(MOJO_RESULT_OK, ReadData(read_buffer, &num_bytes));
1039   ASSERT_EQ(100u, num_bytes);
1040   ASSERT_EQ(0, memcmp(read_buffer, &test_data[10], 100u));
1041 }
1042 
1043 // Tests the behavior of writing (simple and two-phase), closing the producer,
1044 // then reading (simple and two-phase).
TEST_F(DataPipeTest,WriteCloseProducerRead)1045 TEST_F(DataPipeTest, WriteCloseProducerRead) {
1046   const char kTestData[] = "hello world";
1047   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1048 
1049   const MojoCreateDataPipeOptions options = {
1050       kSizeOfOptions,                           // |struct_size|.
1051       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1052       1u,                                       // |element_num_bytes|.
1053       1000u                                     // |capacity_num_bytes|.
1054   };
1055   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1056 
1057   // Write some data, so we'll have something to read.
1058   uint32_t num_bytes = kTestDataSize;
1059   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1060   ASSERT_EQ(kTestDataSize, num_bytes);
1061 
1062   // Write it again, so we'll have something left over.
1063   num_bytes = kTestDataSize;
1064   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes, false));
1065   ASSERT_EQ(kTestDataSize, num_bytes);
1066 
1067   // Start two-phase write.
1068   void* write_buffer_ptr = nullptr;
1069   num_bytes = 0u;
1070   ASSERT_EQ(MOJO_RESULT_OK,
1071             BeginWriteData(&write_buffer_ptr, &num_bytes, false));
1072   EXPECT_TRUE(write_buffer_ptr);
1073   EXPECT_GT(num_bytes, 0u);
1074 
1075   // TODO(vtl): (See corresponding TODO in TwoPhaseAllOrNone.)
1076   for (size_t i = 0; i < kMaxPoll; i++) {
1077     num_bytes = 0u;
1078     ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1079     if (num_bytes >= 2u * kTestDataSize)
1080       break;
1081 
1082     test::Sleep(test::EpsilonDeadline());
1083   }
1084   ASSERT_EQ(2u * kTestDataSize, num_bytes);
1085 
1086   // Start two-phase read.
1087   const void* read_buffer_ptr = nullptr;
1088   num_bytes = 0u;
1089   ASSERT_EQ(MOJO_RESULT_OK,
1090             BeginReadData(&read_buffer_ptr, &num_bytes));
1091   EXPECT_TRUE(read_buffer_ptr);
1092   ASSERT_EQ(2u * kTestDataSize, num_bytes);
1093 
1094   // Close the producer.
1095   CloseProducer();
1096 
1097   // The consumer can finish its two-phase read.
1098   ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1099   ASSERT_EQ(MOJO_RESULT_OK, EndReadData(kTestDataSize));
1100 
1101   // And start another.
1102   read_buffer_ptr = nullptr;
1103   num_bytes = 0u;
1104   ASSERT_EQ(MOJO_RESULT_OK,
1105             BeginReadData(&read_buffer_ptr, &num_bytes));
1106   EXPECT_TRUE(read_buffer_ptr);
1107   ASSERT_EQ(kTestDataSize, num_bytes);
1108 }
1109 
1110 
1111 // Tests the behavior of interrupting a two-phase read and write by closing the
1112 // consumer.
TEST_F(DataPipeTest,TwoPhaseWriteReadCloseConsumer)1113 TEST_F(DataPipeTest, TwoPhaseWriteReadCloseConsumer) {
1114   const char kTestData[] = "hello world";
1115   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1116 
1117   const MojoCreateDataPipeOptions options = {
1118       kSizeOfOptions,                           // |struct_size|.
1119       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1120       1u,                                       // |element_num_bytes|.
1121       1000u                                     // |capacity_num_bytes|.
1122   };
1123   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1124   MojoHandleSignalsState hss;
1125 
1126   // Write some data, so we'll have something to read.
1127   uint32_t num_bytes = kTestDataSize;
1128   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1129   ASSERT_EQ(kTestDataSize, num_bytes);
1130 
1131   // Start two-phase write.
1132   void* write_buffer_ptr = nullptr;
1133   num_bytes = 0u;
1134   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1135   EXPECT_TRUE(write_buffer_ptr);
1136   ASSERT_GT(num_bytes, kTestDataSize);
1137 
1138   // Wait for data.
1139   // TODO(vtl): (See corresponding TODO in AllOrNone.)
1140   hss = MojoHandleSignalsState();
1141   ASSERT_EQ(MOJO_RESULT_OK,
1142             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1143                      MOJO_DEADLINE_INDEFINITE, &hss));
1144   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1145   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1146             hss.satisfiable_signals);
1147 
1148   // Start two-phase read.
1149   const void* read_buffer_ptr = nullptr;
1150   num_bytes = 0u;
1151   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &num_bytes));
1152   EXPECT_TRUE(read_buffer_ptr);
1153   ASSERT_EQ(kTestDataSize, num_bytes);
1154 
1155   // Close the consumer.
1156   CloseConsumer();
1157 
1158   // Wait for producer to know that the consumer is closed.
1159   hss = MojoHandleSignalsState();
1160   ASSERT_EQ(MOJO_RESULT_OK,
1161             MojoWait(producer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1162                      MOJO_DEADLINE_INDEFINITE, &hss));
1163   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1164   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
1165 
1166   // Actually write some data. (Note: Premature freeing of the buffer would
1167   // probably only be detected under ASAN or similar.)
1168   memcpy(write_buffer_ptr, kTestData, kTestDataSize);
1169   // Note: Even though the consumer has been closed, ending the two-phase
1170   // write will report success.
1171   ASSERT_EQ(MOJO_RESULT_OK, EndWriteData(kTestDataSize));
1172 
1173   // But trying to write should result in failure.
1174   num_bytes = kTestDataSize;
1175   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, WriteData(kTestData, &num_bytes));
1176 
1177   // As will trying to start another two-phase write.
1178   write_buffer_ptr = nullptr;
1179   num_bytes = 0u;
1180   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1181             BeginWriteData(&write_buffer_ptr, &num_bytes));
1182 }
1183 
1184 // Tests the behavior of "interrupting" a two-phase write by closing both the
1185 // producer and the consumer.
TEST_F(DataPipeTest,TwoPhaseWriteCloseBoth)1186 TEST_F(DataPipeTest, TwoPhaseWriteCloseBoth) {
1187   const uint32_t kTestDataSize = 15u;
1188 
1189   const MojoCreateDataPipeOptions options = {
1190       kSizeOfOptions,                           // |struct_size|.
1191       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1192       1u,                                       // |element_num_bytes|.
1193       1000u                                     // |capacity_num_bytes|.
1194   };
1195   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1196 
1197   // Start two-phase write.
1198   void* write_buffer_ptr = nullptr;
1199   uint32_t num_bytes = 0u;
1200   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_buffer_ptr, &num_bytes));
1201   EXPECT_TRUE(write_buffer_ptr);
1202   ASSERT_GT(num_bytes, kTestDataSize);
1203 }
1204 
1205 // Tests the behavior of writing, closing the producer, and then reading (with
1206 // and without data remaining).
TEST_F(DataPipeTest,WriteCloseProducerReadNoData)1207 TEST_F(DataPipeTest, WriteCloseProducerReadNoData) {
1208   const char kTestData[] = "hello world";
1209   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1210 
1211   const MojoCreateDataPipeOptions options = {
1212       kSizeOfOptions,                           // |struct_size|.
1213       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1214       1u,                                       // |element_num_bytes|.
1215       1000u                                     // |capacity_num_bytes|.
1216   };
1217   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1218   MojoHandleSignalsState hss;
1219 
1220   // Write some data, so we'll have something to read.
1221   uint32_t num_bytes = kTestDataSize;
1222   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1223   ASSERT_EQ(kTestDataSize, num_bytes);
1224 
1225   // Close the producer.
1226   CloseProducer();
1227 
1228   // Wait. (Note that once the consumer knows that the producer is closed, it
1229   // must also know about all the data that was sent.)
1230   hss = MojoHandleSignalsState();
1231   ASSERT_EQ(MOJO_RESULT_OK,
1232             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1233                      MOJO_DEADLINE_INDEFINITE, &hss));
1234   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1235             hss.satisfied_signals);
1236   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1237             hss.satisfiable_signals);
1238 
1239   // Peek that data.
1240   char buffer[1000];
1241   num_bytes = static_cast<uint32_t>(sizeof(buffer));
1242   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes, false, true));
1243   ASSERT_EQ(kTestDataSize, num_bytes);
1244   ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1245 
1246   // Read that data.
1247   memset(buffer, 0, 1000);
1248   num_bytes = static_cast<uint32_t>(sizeof(buffer));
1249   ASSERT_EQ(MOJO_RESULT_OK, ReadData(buffer, &num_bytes));
1250   ASSERT_EQ(kTestDataSize, num_bytes);
1251   ASSERT_EQ(0, memcmp(buffer, kTestData, kTestDataSize));
1252 
1253   // A second read should fail.
1254   num_bytes = static_cast<uint32_t>(sizeof(buffer));
1255   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, ReadData(buffer, &num_bytes));
1256 
1257   // A two-phase read should also fail.
1258   const void* read_buffer_ptr = nullptr;
1259   num_bytes = 0u;
1260   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1261             BeginReadData(&read_buffer_ptr, &num_bytes));
1262 
1263   // Ditto for discard.
1264   num_bytes = 10u;
1265   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, DiscardData(&num_bytes));
1266 }
1267 
1268 // Test that during a two phase read the memory stays valid even if more data
1269 // comes in.
TEST_F(DataPipeTest,TwoPhaseReadMemoryStable)1270 TEST_F(DataPipeTest, TwoPhaseReadMemoryStable) {
1271   const char kTestData[] = "hello world";
1272   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1273 
1274   const MojoCreateDataPipeOptions options = {
1275       kSizeOfOptions,                           // |struct_size|.
1276       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1277       1u,                                       // |element_num_bytes|.
1278       1000u                                     // |capacity_num_bytes|.
1279   };
1280   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1281   MojoHandleSignalsState hss;
1282 
1283   // Write some data.
1284   uint32_t num_bytes = kTestDataSize;
1285   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1286   ASSERT_EQ(kTestDataSize, num_bytes);
1287 
1288   // Wait for the data.
1289   hss = MojoHandleSignalsState();
1290   ASSERT_EQ(MOJO_RESULT_OK,
1291             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1292                      MOJO_DEADLINE_INDEFINITE, &hss));
1293   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1294   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1295             hss.satisfiable_signals);
1296 
1297   // Begin a two-phase read.
1298   const void* read_buffer_ptr = nullptr;
1299   uint32_t read_buffer_size = 0u;
1300   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_buffer_ptr, &read_buffer_size));
1301 
1302   // Write more data.
1303   const char kExtraData[] = "bye world";
1304   const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
1305   num_bytes = kExtraDataSize;
1306   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
1307   ASSERT_EQ(kExtraDataSize, num_bytes);
1308 
1309   // Close the producer.
1310   CloseProducer();
1311 
1312   // Wait. (Note that once the consumer knows that the producer is closed, it
1313   // must also have received the extra data).
1314   hss = MojoHandleSignalsState();
1315   ASSERT_EQ(MOJO_RESULT_OK,
1316             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1317                      MOJO_DEADLINE_INDEFINITE, &hss));
1318   ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
1319   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1320             hss.satisfiable_signals);
1321 
1322   // Read the two phase memory to check it's still valid.
1323   ASSERT_EQ(0, memcmp(read_buffer_ptr, kTestData, kTestDataSize));
1324   EndReadData(read_buffer_size);
1325 }
1326 
1327 // Test that two-phase reads/writes behave correctly when given invalid
1328 // arguments.
TEST_F(DataPipeTest,TwoPhaseMoreInvalidArguments)1329 TEST_F(DataPipeTest, TwoPhaseMoreInvalidArguments) {
1330   const MojoCreateDataPipeOptions options = {
1331       kSizeOfOptions,                           // |struct_size|.
1332       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1333       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
1334       10 * sizeof(int32_t)                      // |capacity_num_bytes|.
1335   };
1336   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1337   MojoHandleSignalsState hss;
1338 
1339   // No data.
1340   uint32_t num_bytes = 1000u;
1341   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1342   ASSERT_EQ(0u, num_bytes);
1343 
1344   // Try "ending" a two-phase write when one isn't active.
1345   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
1346             EndWriteData(1u * sizeof(int32_t)));
1347 
1348   // Wait a bit, to make sure that if a signal were (incorrectly) sent, it'd
1349   // have time to propagate.
1350   test::Sleep(test::EpsilonDeadline());
1351 
1352   // Still no data.
1353   num_bytes = 1000u;
1354   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1355   ASSERT_EQ(0u, num_bytes);
1356 
1357   // Try ending a two-phase write with an invalid amount (too much).
1358   num_bytes = 0u;
1359   void* write_ptr = nullptr;
1360   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1361   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1362             EndWriteData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1363 
1364   // But the two-phase write still ended.
1365   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1366 
1367   // Wait a bit (as above).
1368   test::Sleep(test::EpsilonDeadline());
1369 
1370   // Still no data.
1371   num_bytes = 1000u;
1372   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1373   ASSERT_EQ(0u, num_bytes);
1374 
1375   // Try ending a two-phase write with an invalid amount (not a multiple of the
1376   // element size).
1377   num_bytes = 0u;
1378   write_ptr = nullptr;
1379   ASSERT_EQ(MOJO_RESULT_OK, BeginWriteData(&write_ptr, &num_bytes));
1380   EXPECT_GE(num_bytes, 1u);
1381   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndWriteData(1u));
1382 
1383   // But the two-phase write still ended.
1384   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndWriteData(0u));
1385 
1386   // Wait a bit (as above).
1387   test::Sleep(test::EpsilonDeadline());
1388 
1389   // Still no data.
1390   num_bytes = 1000u;
1391   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1392   ASSERT_EQ(0u, num_bytes);
1393 
1394   // Now write some data, so we'll be able to try reading.
1395   int32_t element = 123;
1396   num_bytes = 1u * sizeof(int32_t);
1397   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&element, &num_bytes));
1398 
1399   // Wait for data.
1400   // TODO(vtl): (See corresponding TODO in AllOrNone.)
1401   hss = MojoHandleSignalsState();
1402   ASSERT_EQ(MOJO_RESULT_OK,
1403             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1404                      MOJO_DEADLINE_INDEFINITE, &hss));
1405   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1406   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1407             hss.satisfiable_signals);
1408 
1409   // One element available.
1410   num_bytes = 0u;
1411   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1412   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1413 
1414   // Try "ending" a two-phase read when one isn't active.
1415   ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION, EndReadData(1u * sizeof(int32_t)));
1416 
1417   // Still one element available.
1418   num_bytes = 0u;
1419   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1420   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1421 
1422   // Try ending a two-phase read with an invalid amount (too much).
1423   num_bytes = 0u;
1424   const void* read_ptr = nullptr;
1425   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1426   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT,
1427             EndReadData(num_bytes + static_cast<uint32_t>(sizeof(int32_t))));
1428 
1429   // Still one element available.
1430   num_bytes = 0u;
1431   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1432   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1433 
1434   // Try ending a two-phase read with an invalid amount (not a multiple of the
1435   // element size).
1436   num_bytes = 0u;
1437   read_ptr = nullptr;
1438   ASSERT_EQ(MOJO_RESULT_OK, BeginReadData(&read_ptr, &num_bytes));
1439   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1440   ASSERT_EQ(123, static_cast<const int32_t*>(read_ptr)[0]);
1441   ASSERT_EQ(MOJO_RESULT_INVALID_ARGUMENT, EndReadData(1u));
1442 
1443   // Still one element available.
1444   num_bytes = 0u;
1445   ASSERT_EQ(MOJO_RESULT_OK, QueryData(&num_bytes));
1446   ASSERT_EQ(1u * sizeof(int32_t), num_bytes);
1447 }
1448 
1449 // Test that a producer can be sent over a MP.
TEST_F(DataPipeTest,SendProducer)1450 TEST_F(DataPipeTest, SendProducer) {
1451   const char kTestData[] = "hello world";
1452   const uint32_t kTestDataSize = static_cast<uint32_t>(sizeof(kTestData));
1453 
1454   const MojoCreateDataPipeOptions options = {
1455       kSizeOfOptions,                           // |struct_size|.
1456       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1457       1u,                                       // |element_num_bytes|.
1458       1000u                                     // |capacity_num_bytes|.
1459   };
1460   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1461   MojoHandleSignalsState hss;
1462 
1463   // Write some data.
1464   uint32_t num_bytes = kTestDataSize;
1465   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kTestData, &num_bytes));
1466   ASSERT_EQ(kTestDataSize, num_bytes);
1467 
1468   // Wait for the data.
1469   hss = MojoHandleSignalsState();
1470   ASSERT_EQ(MOJO_RESULT_OK,
1471             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1472                      MOJO_DEADLINE_INDEFINITE, &hss));
1473   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1474   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1475             hss.satisfiable_signals);
1476 
1477   // Check the data.
1478   const void* read_buffer = nullptr;
1479   num_bytes = 0u;
1480   ASSERT_EQ(MOJO_RESULT_OK,
1481             BeginReadData(&read_buffer, &num_bytes, false));
1482   ASSERT_EQ(0, memcmp(read_buffer, kTestData, kTestDataSize));
1483   EndReadData(num_bytes);
1484 
1485   // Now send the producer over a MP so that it's serialized.
1486   MojoHandle pipe0, pipe1;
1487   ASSERT_EQ(MOJO_RESULT_OK,
1488             MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
1489 
1490   ASSERT_EQ(MOJO_RESULT_OK,
1491             MojoWriteMessage(pipe0, nullptr, 0, &producer_, 1,
1492                              MOJO_WRITE_MESSAGE_FLAG_NONE));
1493   producer_ = MOJO_HANDLE_INVALID;
1494   ASSERT_EQ(MOJO_RESULT_OK, MojoWait(pipe1, MOJO_HANDLE_SIGNAL_READABLE,
1495                                      MOJO_DEADLINE_INDEFINITE, &hss));
1496   uint32_t num_handles = 1;
1497   ASSERT_EQ(MOJO_RESULT_OK,
1498             MojoReadMessage(pipe1, nullptr, 0, &producer_, &num_handles,
1499                             MOJO_READ_MESSAGE_FLAG_NONE));
1500   ASSERT_EQ(num_handles, 1u);
1501 
1502   // Write more data.
1503   const char kExtraData[] = "bye world";
1504   const uint32_t kExtraDataSize = static_cast<uint32_t>(sizeof(kExtraData));
1505   num_bytes = kExtraDataSize;
1506   ASSERT_EQ(MOJO_RESULT_OK, WriteData(kExtraData, &num_bytes));
1507   ASSERT_EQ(kExtraDataSize, num_bytes);
1508 
1509   // Wait for it.
1510   hss = MojoHandleSignalsState();
1511   ASSERT_EQ(MOJO_RESULT_OK,
1512             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_READABLE,
1513                      MOJO_DEADLINE_INDEFINITE, &hss));
1514   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
1515   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1516             hss.satisfiable_signals);
1517 
1518   // Check the second write.
1519   num_bytes = 0u;
1520   ASSERT_EQ(MOJO_RESULT_OK,
1521             BeginReadData(&read_buffer, &num_bytes, false));
1522   ASSERT_EQ(0, memcmp(read_buffer, kExtraData, kExtraDataSize));
1523   EndReadData(num_bytes);
1524 
1525   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
1526   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
1527 }
1528 
1529 // Ensures that if a data pipe consumer whose producer has closed is passed over
1530 // a message pipe, the deserialized dispatcher is also marked as having a closed
1531 // peer.
TEST_F(DataPipeTest,ConsumerWithClosedProducerSent)1532 TEST_F(DataPipeTest, ConsumerWithClosedProducerSent) {
1533   const MojoCreateDataPipeOptions options = {
1534       kSizeOfOptions,                           // |struct_size|.
1535       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1536       static_cast<uint32_t>(sizeof(int32_t)),   // |element_num_bytes|.
1537       1000 * sizeof(int32_t)                    // |capacity_num_bytes|.
1538   };
1539 
1540   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1541 
1542   // We can write to a data pipe handle immediately.
1543   int32_t data = 123;
1544   uint32_t num_bytes = sizeof(data);
1545   ASSERT_EQ(MOJO_RESULT_OK, WriteData(&data, &num_bytes));
1546   ASSERT_EQ(MOJO_RESULT_OK, CloseProducer());
1547 
1548   // Now wait for the other side to become readable and to see the peer closed.
1549   MojoHandleSignalsState state;
1550   ASSERT_EQ(MOJO_RESULT_OK,
1551             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1552                      MOJO_DEADLINE_INDEFINITE, &state));
1553   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1554             state.satisfied_signals);
1555   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1556             state.satisfiable_signals);
1557 
1558   // Now send the consumer over a MP so that it's serialized.
1559   MojoHandle pipe0, pipe1;
1560   ASSERT_EQ(MOJO_RESULT_OK,
1561             MojoCreateMessagePipe(nullptr, &pipe0, &pipe1));
1562 
1563   ASSERT_EQ(MOJO_RESULT_OK,
1564             MojoWriteMessage(pipe0, nullptr, 0, &consumer_, 1,
1565                              MOJO_WRITE_MESSAGE_FLAG_NONE));
1566   consumer_ = MOJO_HANDLE_INVALID;
1567   ASSERT_EQ(MOJO_RESULT_OK, MojoWait(pipe1, MOJO_HANDLE_SIGNAL_READABLE,
1568                                      MOJO_DEADLINE_INDEFINITE, &state));
1569   uint32_t num_handles = 1;
1570   ASSERT_EQ(MOJO_RESULT_OK,
1571             MojoReadMessage(pipe1, nullptr, 0, &consumer_, &num_handles,
1572                             MOJO_READ_MESSAGE_FLAG_NONE));
1573   ASSERT_EQ(num_handles, 1u);
1574 
1575   ASSERT_EQ(MOJO_RESULT_OK,
1576             MojoWait(consumer_, MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1577                      MOJO_DEADLINE_INDEFINITE, &state));
1578   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1579             state.satisfied_signals);
1580   ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1581             state.satisfiable_signals);
1582 
1583   int32_t read_data;
1584   ASSERT_EQ(MOJO_RESULT_OK, ReadData(&read_data, &num_bytes));
1585   ASSERT_EQ(sizeof(read_data), num_bytes);
1586   ASSERT_EQ(data, read_data);
1587 
1588   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe0));
1589   ASSERT_EQ(MOJO_RESULT_OK, MojoClose(pipe1));
1590 }
1591 
WriteAllData(MojoHandle producer,const void * elements,uint32_t num_bytes)1592 bool WriteAllData(MojoHandle producer,
1593                   const void* elements,
1594                   uint32_t num_bytes) {
1595   for (size_t i = 0; i < kMaxPoll; i++) {
1596     // Write as much data as we can.
1597     uint32_t write_bytes = num_bytes;
1598     MojoResult result = MojoWriteData(producer, elements, &write_bytes,
1599                                       MOJO_WRITE_DATA_FLAG_NONE);
1600     if (result == MOJO_RESULT_OK) {
1601       num_bytes -= write_bytes;
1602       elements = static_cast<const uint8_t*>(elements) + write_bytes;
1603       if (num_bytes == 0)
1604         return true;
1605     } else {
1606       EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
1607     }
1608 
1609     MojoHandleSignalsState hss = MojoHandleSignalsState();
1610     EXPECT_EQ(MOJO_RESULT_OK, MojoWait(producer, MOJO_HANDLE_SIGNAL_WRITABLE,
1611                                        MOJO_DEADLINE_INDEFINITE, &hss));
1612     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
1613     EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1614               hss.satisfiable_signals);
1615   }
1616 
1617   return false;
1618 }
1619 
1620 // If |expect_empty| is true, expect |consumer| to be empty after reading.
ReadAllData(MojoHandle consumer,void * elements,uint32_t num_bytes,bool expect_empty)1621 bool ReadAllData(MojoHandle consumer,
1622                  void* elements,
1623                  uint32_t num_bytes,
1624                  bool expect_empty) {
1625   for (size_t i = 0; i < kMaxPoll; i++) {
1626     // Read as much data as we can.
1627     uint32_t read_bytes = num_bytes;
1628     MojoResult result =
1629         MojoReadData(consumer, elements, &read_bytes, MOJO_READ_DATA_FLAG_NONE);
1630     if (result == MOJO_RESULT_OK) {
1631       num_bytes -= read_bytes;
1632       elements = static_cast<uint8_t*>(elements) + read_bytes;
1633       if (num_bytes == 0) {
1634         if (expect_empty) {
1635           // Expect no more data.
1636           test::Sleep(test::TinyDeadline());
1637           MojoReadData(consumer, nullptr, &num_bytes,
1638                        MOJO_READ_DATA_FLAG_QUERY);
1639           EXPECT_EQ(0u, num_bytes);
1640         }
1641         return true;
1642       }
1643     } else {
1644       EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT, result);
1645     }
1646 
1647     MojoHandleSignalsState hss = MojoHandleSignalsState();
1648     EXPECT_EQ(MOJO_RESULT_OK, MojoWait(consumer, MOJO_HANDLE_SIGNAL_READABLE,
1649                                        MOJO_DEADLINE_INDEFINITE, &hss));
1650     // Peer could have become closed while we're still waiting for data.
1651     EXPECT_TRUE(MOJO_HANDLE_SIGNAL_READABLE & hss.satisfied_signals);
1652     EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
1653               hss.satisfiable_signals);
1654   }
1655 
1656   return num_bytes == 0;
1657 }
1658 
1659 #if !defined(OS_IOS)
1660 
TEST_F(DataPipeTest,Multiprocess)1661 TEST_F(DataPipeTest, Multiprocess) {
1662   const uint32_t kTestDataSize =
1663       static_cast<uint32_t>(sizeof(kMultiprocessTestData));
1664   const MojoCreateDataPipeOptions options = {
1665       kSizeOfOptions,                           // |struct_size|.
1666       MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
1667       1,                                        // |element_num_bytes|.
1668       kMultiprocessCapacity                     // |capacity_num_bytes|.
1669   };
1670   ASSERT_EQ(MOJO_RESULT_OK, Create(&options));
1671 
1672   RUN_CHILD_ON_PIPE(MultiprocessClient, server_mp)
1673     // Send some data before serialising and sending the data pipe over.
1674     // This is the first write so we don't need to use WriteAllData.
1675     uint32_t num_bytes = kTestDataSize;
1676     ASSERT_EQ(MOJO_RESULT_OK, WriteData(kMultiprocessTestData, &num_bytes,
1677                                         MOJO_WRITE_DATA_FLAG_ALL_OR_NONE));
1678     ASSERT_EQ(kTestDataSize, num_bytes);
1679 
1680     // Send child process the data pipe.
1681     ASSERT_EQ(MOJO_RESULT_OK,
1682               MojoWriteMessage(server_mp, nullptr, 0, &consumer_, 1,
1683                                MOJO_WRITE_MESSAGE_FLAG_NONE));
1684 
1685     // Send a bunch of data of varying sizes.
1686     uint8_t buffer[100];
1687     int seq = 0;
1688     for (int i = 0; i < kMultiprocessMaxIter; ++i) {
1689       for (uint32_t size = 1; size <= kMultiprocessCapacity; size++) {
1690         for (unsigned int j = 0; j < size; ++j)
1691           buffer[j] = seq + j;
1692         EXPECT_TRUE(WriteAllData(producer_, buffer, size));
1693         seq += size;
1694       }
1695     }
1696 
1697     // Write the test string in again.
1698     ASSERT_TRUE(WriteAllData(producer_, kMultiprocessTestData, kTestDataSize));
1699 
1700     // Swap ends.
1701     ASSERT_EQ(MOJO_RESULT_OK,
1702               MojoWriteMessage(server_mp, nullptr, 0, &producer_, 1,
1703                                MOJO_WRITE_MESSAGE_FLAG_NONE));
1704 
1705     // Receive the consumer from the other side.
1706     producer_ = MOJO_HANDLE_INVALID;
1707     MojoHandleSignalsState hss = MojoHandleSignalsState();
1708     ASSERT_EQ(MOJO_RESULT_OK, MojoWait(server_mp, MOJO_HANDLE_SIGNAL_READABLE,
1709                                        MOJO_DEADLINE_INDEFINITE, &hss));
1710     MojoHandle handles[2];
1711     uint32_t num_handles = arraysize(handles);
1712     ASSERT_EQ(MOJO_RESULT_OK,
1713               MojoReadMessage(server_mp, nullptr, 0, handles, &num_handles,
1714                               MOJO_READ_MESSAGE_FLAG_NONE));
1715     ASSERT_EQ(1u, num_handles);
1716     consumer_ = handles[0];
1717 
1718     // Read the test string twice. Once for when we sent it, and once for the
1719     // other end sending it.
1720     for (int i = 0; i < 2; ++i) {
1721       EXPECT_TRUE(ReadAllData(consumer_, buffer, kTestDataSize, i == 1));
1722       EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
1723     }
1724 
1725     WriteMessage(server_mp, "quit");
1726 
1727     // Don't have to close the consumer here because it will be done for us.
1728   END_CHILD()
1729 }
1730 
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient,DataPipeTest,client_mp)1731 DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessClient, DataPipeTest, client_mp) {
1732   const uint32_t kTestDataSize =
1733       static_cast<uint32_t>(sizeof(kMultiprocessTestData));
1734 
1735   // Receive the data pipe from the other side.
1736   MojoHandle consumer = MOJO_HANDLE_INVALID;
1737   MojoHandleSignalsState hss = MojoHandleSignalsState();
1738   ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE,
1739                                      MOJO_DEADLINE_INDEFINITE, &hss));
1740   MojoHandle handles[2];
1741   uint32_t num_handles = arraysize(handles);
1742   ASSERT_EQ(MOJO_RESULT_OK,
1743             MojoReadMessage(client_mp, nullptr, 0, handles, &num_handles,
1744                             MOJO_READ_MESSAGE_FLAG_NONE));
1745   ASSERT_EQ(1u, num_handles);
1746   consumer = handles[0];
1747 
1748   // Read the initial string that was sent.
1749   int32_t buffer[100];
1750   EXPECT_TRUE(ReadAllData(consumer, buffer, kTestDataSize, false));
1751   EXPECT_EQ(0, memcmp(buffer, kMultiprocessTestData, kTestDataSize));
1752 
1753   // Receive the main data and check it is correct.
1754   int seq = 0;
1755   uint8_t expected_buffer[100];
1756   for (int i = 0; i < kMultiprocessMaxIter; ++i) {
1757     for (uint32_t size = 1; size <= kMultiprocessCapacity; ++size) {
1758       for (unsigned int j = 0; j < size; ++j)
1759         expected_buffer[j] = seq + j;
1760       EXPECT_TRUE(ReadAllData(consumer, buffer, size, false));
1761       EXPECT_EQ(0, memcmp(buffer, expected_buffer, size));
1762 
1763       seq += size;
1764     }
1765   }
1766 
1767   // Swap ends.
1768   ASSERT_EQ(MOJO_RESULT_OK, MojoWriteMessage(client_mp, nullptr, 0, &consumer,
1769                                              1, MOJO_WRITE_MESSAGE_FLAG_NONE));
1770 
1771   // Receive the producer from the other side.
1772   MojoHandle producer = MOJO_HANDLE_INVALID;
1773   hss = MojoHandleSignalsState();
1774   ASSERT_EQ(MOJO_RESULT_OK, MojoWait(client_mp, MOJO_HANDLE_SIGNAL_READABLE,
1775                                      MOJO_DEADLINE_INDEFINITE, &hss));
1776   num_handles = arraysize(handles);
1777   ASSERT_EQ(MOJO_RESULT_OK,
1778             MojoReadMessage(client_mp, nullptr, 0, handles, &num_handles,
1779                             MOJO_READ_MESSAGE_FLAG_NONE));
1780   ASSERT_EQ(1u, num_handles);
1781   producer = handles[0];
1782 
1783   // Write the test string one more time.
1784   EXPECT_TRUE(WriteAllData(producer, kMultiprocessTestData, kTestDataSize));
1785 
1786   // We swapped ends, so close the producer.
1787   EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));
1788 
1789   // Wait to receive a "quit" message before exiting.
1790   EXPECT_EQ("quit", ReadMessage(client_mp));
1791 }
1792 
// Test client: receives a producer handle together with a message, writes the
// message text into the data pipe, closes the producer and then waits to be
// told to quit.
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(WriteAndCloseProducer, DataPipeTest, h) {
  MojoHandle producer;
  const std::string payload = ReadMessageWithHandles(h, &producer, 1);
  const uint32_t payload_size = static_cast<uint32_t>(payload.size());

  // Push the whole payload into the pipe with a single write.
  uint32_t bytes_written = payload_size;
  const MojoResult write_result = MojoWriteData(
      producer, payload.data(), &bytes_written, MOJO_WRITE_DATA_FLAG_NONE);
  EXPECT_EQ(MOJO_RESULT_OK, write_result);
  EXPECT_EQ(bytes_written, payload_size);

  // The producer is closed while this process is still alive.
  EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));

  // Stay around until the parent says we are done.
  EXPECT_EQ("quit", ReadMessage(h));
}
1809 
// Test client: receives a consumer handle together with the text it should
// find in the data pipe, reads the pipe, compares the contents, closes the
// consumer and then waits to be told to quit.
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(ReadAndCloseConsumer, DataPipeTest, h) {
  MojoHandle consumer;
  const std::string expected_message = ReadMessageWithHandles(h, &consumer, 1);

  // Block until there is something to read.
  const MojoResult wait_result =
      MojoWait(consumer, MOJO_HANDLE_SIGNAL_READABLE, MOJO_DEADLINE_INDEFINITE,
               nullptr);
  EXPECT_EQ(MOJO_RESULT_OK, wait_result);

  // Read as many bytes as the expected message is long, in a single call.
  std::vector<char> received(expected_message.size());
  uint32_t bytes_read = static_cast<uint32_t>(expected_message.size());
  const MojoResult read_result = MojoReadData(
      consumer, received.data(), &bytes_read, MOJO_READ_DATA_FLAG_NONE);
  EXPECT_EQ(MOJO_RESULT_OK, read_result);
  EXPECT_EQ(bytes_read, static_cast<uint32_t>(received.size()));

  const std::string actual_message(received.begin(), received.end());
  EXPECT_EQ(expected_message, actual_message);

  EXPECT_EQ(MOJO_RESULT_OK, MojoClose(consumer));

  // Stay around until the parent says we are done.
  EXPECT_EQ("quit", ReadMessage(h));
}
1833 
// Creates a data pipe and hands its two ends to two different child
// processes: WriteAndCloseProducer writes the message into the producer and
// closes it, ReadAndCloseConsumer reads the consumer and checks that it finds
// the same message.
TEST_F(DataPipeTest, SendConsumerAndCloseProducer) {
  // Create a new data pipe. This must succeed before the handles may be sent
  // anywhere, so a failure aborts the test instead of continuing with
  // uninitialized handles.
  MojoHandle p = MOJO_HANDLE_INVALID;
  MojoHandle c = MOJO_HANDLE_INVALID;
  ASSERT_EQ(MOJO_RESULT_OK, MojoCreateDataPipe(nullptr, &p, &c));

  RUN_CHILD_ON_PIPE(WriteAndCloseProducer, producer_client)
    RUN_CHILD_ON_PIPE(ReadAndCloseConsumer, consumer_client)
      // Each child receives one end of the pipe along with the message text.
      const std::string kMessage = "Hello, world!";
      WriteMessageWithHandles(producer_client, kMessage, &p, 1);
      WriteMessageWithHandles(consumer_client, kMessage, &c, 1);

      WriteMessage(consumer_client, "quit");
    END_CHILD()

    WriteMessage(producer_client, "quit");
  END_CHILD()
}
1851 
// Test client: creates a data pipe in the child process, sends the consumer
// end to the parent together with the text that will be written, writes that
// text into the producer, closes it and then waits to be told to quit.
DEFINE_TEST_CLIENT_TEST_WITH_PIPE(CreateAndWrite, DataPipeTest, h) {
  const MojoCreateDataPipeOptions pipe_options = {
      kSizeOfOptions,                           // |struct_size|.
      MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE,  // |flags|.
      1,                                        // |element_num_bytes|.
      kMultiprocessCapacity                     // |capacity_num_bytes|.
  };

  MojoHandle producer;
  MojoHandle consumer;
  ASSERT_EQ(MOJO_RESULT_OK,
            MojoCreateDataPipe(&pipe_options, &producer, &consumer));

  // The consumer goes to the parent before anything has been written.
  const std::string kMessage = "Hello, world!";
  WriteMessageWithHandles(h, kMessage, &consumer, 1);

  // Fill the pipe with the message text using a single write.
  const uint32_t message_size = static_cast<uint32_t>(kMessage.size());
  uint32_t bytes_written = message_size;
  const MojoResult write_result = MojoWriteData(
      producer, kMessage.data(), &bytes_written, MOJO_WRITE_DATA_FLAG_NONE);
  EXPECT_EQ(MOJO_RESULT_OK, write_result);
  EXPECT_EQ(bytes_written, message_size);
  EXPECT_EQ(MOJO_RESULT_OK, MojoClose(producer));

  // Stay around until the parent says we are done.
  EXPECT_EQ("quit", ReadMessage(h));
}
1876 
// Parent side of the CreateAndWrite client: receives the consumer end of a
// data pipe created in the child and checks that the data read from it equals
// the text of the message the handle arrived with.
TEST_F(DataPipeTest, CreateInChild) {
  RUN_CHILD_ON_PIPE(CreateAndWrite, child)
    MojoHandle consumer;
    const std::string expected_message =
        ReadMessageWithHandles(child, &consumer, 1);

    // Block until there is something to read.
    const MojoResult wait_result =
        MojoWait(consumer, MOJO_HANDLE_SIGNAL_READABLE,
                 MOJO_DEADLINE_INDEFINITE, nullptr);
    EXPECT_EQ(MOJO_RESULT_OK, wait_result);

    // Read as many bytes as the expected message is long, in a single call.
    std::vector<char> received(expected_message.size());
    uint32_t bytes_read = static_cast<uint32_t>(expected_message.size());
    const MojoResult read_result = MojoReadData(
        consumer, received.data(), &bytes_read, MOJO_READ_DATA_FLAG_NONE);
    EXPECT_EQ(MOJO_RESULT_OK, read_result);
    EXPECT_EQ(bytes_read, static_cast<uint32_t>(received.size()));

    const std::string actual_message(received.begin(), received.end());
    EXPECT_EQ(expected_message, actual_message);

    EXPECT_EQ(MOJO_RESULT_OK, MojoClose(consumer));
    WriteMessage(child, "quit");
  END_CHILD()
}
1900 
1901 #endif  // !defined(OS_IOS)
1902 
1903 }  // namespace
1904 }  // namespace edk
1905 }  // namespace mojo
1906