1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/data_pipe_consumer_dispatcher.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <algorithm>
11 #include <limits>
12 #include <utility>
13
14 #include "base/bind.h"
15 #include "base/logging.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/message_loop/message_loop.h"
18 #include "mojo/edk/embedder/embedder_internal.h"
19 #include "mojo/edk/embedder/platform_shared_buffer.h"
20 #include "mojo/edk/system/core.h"
21 #include "mojo/edk/system/data_pipe_control_message.h"
22 #include "mojo/edk/system/node_controller.h"
23 #include "mojo/edk/system/ports_message.h"
24 #include "mojo/edk/system/request_context.h"
25 #include "mojo/public/c/system/data_pipe.h"
26
27 namespace mojo {
28 namespace edk {
29
30 namespace {
31
// Bit set in SerializedState::flags when the consumer already knows its peer
// (the producer end) has been closed.
constexpr uint8_t kFlagPeerClosed = 1u << 0;
33
34 #pragma pack(push, 1)
35
36 struct SerializedState {
37 MojoCreateDataPipeOptions options;
38 uint64_t pipe_id;
39 uint32_t read_offset;
40 uint32_t bytes_available;
41 uint8_t flags;
42 char padding[7];
43 };
44
45 static_assert(sizeof(SerializedState) % 8 == 0,
46 "Invalid SerializedState size.");
47
48 #pragma pack(pop)
49
50 } // namespace
51
52 // A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a
53 // reference to the dispatcher to ensure it lives as long as the observed port.
54 class DataPipeConsumerDispatcher::PortObserverThunk
55 : public NodeController::PortObserver {
56 public:
PortObserverThunk(scoped_refptr<DataPipeConsumerDispatcher> dispatcher)57 explicit PortObserverThunk(
58 scoped_refptr<DataPipeConsumerDispatcher> dispatcher)
59 : dispatcher_(dispatcher) {}
60
61 private:
~PortObserverThunk()62 ~PortObserverThunk() override {}
63
64 // NodeController::PortObserver:
OnPortStatusChanged()65 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
66
67 scoped_refptr<DataPipeConsumerDispatcher> dispatcher_;
68
69 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
70 };
71
DataPipeConsumerDispatcher(NodeController * node_controller,const ports::PortRef & control_port,scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,const MojoCreateDataPipeOptions & options,bool initialized,uint64_t pipe_id)72 DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
73 NodeController* node_controller,
74 const ports::PortRef& control_port,
75 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
76 const MojoCreateDataPipeOptions& options,
77 bool initialized,
78 uint64_t pipe_id)
79 : options_(options),
80 node_controller_(node_controller),
81 control_port_(control_port),
82 pipe_id_(pipe_id),
83 shared_ring_buffer_(shared_ring_buffer) {
84 if (initialized) {
85 base::AutoLock lock(lock_);
86 InitializeNoLock();
87 }
88 }
89
GetType() const90 Dispatcher::Type DataPipeConsumerDispatcher::GetType() const {
91 return Type::DATA_PIPE_CONSUMER;
92 }
93
Close()94 MojoResult DataPipeConsumerDispatcher::Close() {
95 base::AutoLock lock(lock_);
96 DVLOG(1) << "Closing data pipe consumer " << pipe_id_;
97 return CloseNoLock();
98 }
99
100
Watch(MojoHandleSignals signals,const Watcher::WatchCallback & callback,uintptr_t context)101 MojoResult DataPipeConsumerDispatcher::Watch(
102 MojoHandleSignals signals,
103 const Watcher::WatchCallback& callback,
104 uintptr_t context) {
105 base::AutoLock lock(lock_);
106
107 if (is_closed_ || in_transit_)
108 return MOJO_RESULT_INVALID_ARGUMENT;
109
110 return awakable_list_.AddWatcher(
111 signals, callback, context, GetHandleSignalsStateNoLock());
112 }
113
CancelWatch(uintptr_t context)114 MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
115 base::AutoLock lock(lock_);
116
117 if (is_closed_ || in_transit_)
118 return MOJO_RESULT_INVALID_ARGUMENT;
119
120 return awakable_list_.RemoveWatcher(context);
121 }
122
ReadData(void * elements,uint32_t * num_bytes,MojoReadDataFlags flags)123 MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
124 uint32_t* num_bytes,
125 MojoReadDataFlags flags) {
126 base::AutoLock lock(lock_);
127 if (!shared_ring_buffer_ || in_transit_)
128 return MOJO_RESULT_INVALID_ARGUMENT;
129
130 if (in_two_phase_read_)
131 return MOJO_RESULT_BUSY;
132
133 if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
134 if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
135 (flags & MOJO_READ_DATA_FLAG_DISCARD))
136 return MOJO_RESULT_INVALID_ARGUMENT;
137 DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above.
138 DVLOG_IF(2, elements)
139 << "Query mode: ignoring non-null |elements|";
140 *num_bytes = static_cast<uint32_t>(bytes_available_);
141 return MOJO_RESULT_OK;
142 }
143
144 bool discard = false;
145 if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) {
146 // These flags are mutally exclusive.
147 if (flags & MOJO_READ_DATA_FLAG_PEEK)
148 return MOJO_RESULT_INVALID_ARGUMENT;
149 DVLOG_IF(2, elements)
150 << "Discard mode: ignoring non-null |elements|";
151 discard = true;
152 }
153
154 uint32_t max_num_bytes_to_read = *num_bytes;
155 if (max_num_bytes_to_read % options_.element_num_bytes != 0)
156 return MOJO_RESULT_INVALID_ARGUMENT;
157
158 bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE;
159 uint32_t min_num_bytes_to_read =
160 all_or_none ? max_num_bytes_to_read : 0;
161
162 if (min_num_bytes_to_read > bytes_available_) {
163 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
164 : MOJO_RESULT_OUT_OF_RANGE;
165 }
166
167 uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
168 if (bytes_to_read == 0) {
169 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
170 : MOJO_RESULT_SHOULD_WAIT;
171 }
172
173 if (!discard) {
174 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
175 CHECK(data);
176
177 uint8_t* destination = static_cast<uint8_t*>(elements);
178 CHECK(destination);
179
180 DCHECK_LE(read_offset_, options_.capacity_num_bytes);
181 uint32_t tail_bytes_to_copy =
182 std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read);
183 uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy;
184 if (tail_bytes_to_copy > 0)
185 memcpy(destination, data + read_offset_, tail_bytes_to_copy);
186 if (head_bytes_to_copy > 0)
187 memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy);
188 }
189 *num_bytes = bytes_to_read;
190
191 bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK);
192 if (discard || !peek) {
193 read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes;
194 bytes_available_ -= bytes_to_read;
195
196 base::AutoUnlock unlock(lock_);
197 NotifyRead(bytes_to_read);
198 }
199
200 return MOJO_RESULT_OK;
201 }
202
BeginReadData(const void ** buffer,uint32_t * buffer_num_bytes,MojoReadDataFlags flags)203 MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
204 uint32_t* buffer_num_bytes,
205 MojoReadDataFlags flags) {
206 base::AutoLock lock(lock_);
207 if (!shared_ring_buffer_ || in_transit_)
208 return MOJO_RESULT_INVALID_ARGUMENT;
209
210 if (in_two_phase_read_)
211 return MOJO_RESULT_BUSY;
212
213 // These flags may not be used in two-phase mode.
214 if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
215 (flags & MOJO_READ_DATA_FLAG_QUERY) ||
216 (flags & MOJO_READ_DATA_FLAG_PEEK))
217 return MOJO_RESULT_INVALID_ARGUMENT;
218
219 if (bytes_available_ == 0) {
220 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
221 : MOJO_RESULT_SHOULD_WAIT;
222 }
223
224 DCHECK_LT(read_offset_, options_.capacity_num_bytes);
225 uint32_t bytes_to_read = std::min(bytes_available_,
226 options_.capacity_num_bytes - read_offset_);
227
228 CHECK(ring_buffer_mapping_);
229 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
230 CHECK(data);
231
232 in_two_phase_read_ = true;
233 *buffer = data + read_offset_;
234 *buffer_num_bytes = bytes_to_read;
235 two_phase_max_bytes_read_ = bytes_to_read;
236
237 return MOJO_RESULT_OK;
238 }
239
EndReadData(uint32_t num_bytes_read)240 MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
241 base::AutoLock lock(lock_);
242 if (!in_two_phase_read_)
243 return MOJO_RESULT_FAILED_PRECONDITION;
244
245 if (in_transit_)
246 return MOJO_RESULT_INVALID_ARGUMENT;
247
248 CHECK(shared_ring_buffer_);
249
250 HandleSignalsState old_state = GetHandleSignalsStateNoLock();
251 MojoResult rv;
252 if (num_bytes_read > two_phase_max_bytes_read_ ||
253 num_bytes_read % options_.element_num_bytes != 0) {
254 rv = MOJO_RESULT_INVALID_ARGUMENT;
255 } else {
256 rv = MOJO_RESULT_OK;
257 read_offset_ =
258 (read_offset_ + num_bytes_read) % options_.capacity_num_bytes;
259
260 DCHECK_GE(bytes_available_, num_bytes_read);
261 bytes_available_ -= num_bytes_read;
262
263 base::AutoUnlock unlock(lock_);
264 NotifyRead(num_bytes_read);
265 }
266
267 in_two_phase_read_ = false;
268 two_phase_max_bytes_read_ = 0;
269
270 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
271 if (!new_state.equals(old_state))
272 awakable_list_.AwakeForStateChange(new_state);
273
274 return rv;
275 }
276
GetHandleSignalsState() const277 HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
278 base::AutoLock lock(lock_);
279 return GetHandleSignalsStateNoLock();
280 }
281
AddAwakable(Awakable * awakable,MojoHandleSignals signals,uintptr_t context,HandleSignalsState * signals_state)282 MojoResult DataPipeConsumerDispatcher::AddAwakable(
283 Awakable* awakable,
284 MojoHandleSignals signals,
285 uintptr_t context,
286 HandleSignalsState* signals_state) {
287 base::AutoLock lock(lock_);
288 if (!shared_ring_buffer_ || in_transit_) {
289 if (signals_state)
290 *signals_state = HandleSignalsState();
291 return MOJO_RESULT_INVALID_ARGUMENT;
292 }
293 UpdateSignalsStateNoLock();
294 HandleSignalsState state = GetHandleSignalsStateNoLock();
295 if (state.satisfies(signals)) {
296 if (signals_state)
297 *signals_state = state;
298 return MOJO_RESULT_ALREADY_EXISTS;
299 }
300 if (!state.can_satisfy(signals)) {
301 if (signals_state)
302 *signals_state = state;
303 return MOJO_RESULT_FAILED_PRECONDITION;
304 }
305
306 awakable_list_.Add(awakable, signals, context);
307 return MOJO_RESULT_OK;
308 }
309
RemoveAwakable(Awakable * awakable,HandleSignalsState * signals_state)310 void DataPipeConsumerDispatcher::RemoveAwakable(
311 Awakable* awakable,
312 HandleSignalsState* signals_state) {
313 base::AutoLock lock(lock_);
314 if ((!shared_ring_buffer_ || in_transit_) && signals_state)
315 *signals_state = HandleSignalsState();
316 else if (signals_state)
317 *signals_state = GetHandleSignalsStateNoLock();
318 awakable_list_.Remove(awakable);
319 }
320
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)321 void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes,
322 uint32_t* num_ports,
323 uint32_t* num_handles) {
324 base::AutoLock lock(lock_);
325 DCHECK(in_transit_);
326 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
327 *num_ports = 1;
328 *num_handles = 1;
329 }
330
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * platform_handles)331 bool DataPipeConsumerDispatcher::EndSerialize(
332 void* destination,
333 ports::PortName* ports,
334 PlatformHandle* platform_handles) {
335 SerializedState* state = static_cast<SerializedState*>(destination);
336 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
337 memset(state->padding, 0, sizeof(state->padding));
338
339 base::AutoLock lock(lock_);
340 DCHECK(in_transit_);
341 state->pipe_id = pipe_id_;
342 state->read_offset = read_offset_;
343 state->bytes_available = bytes_available_;
344 state->flags = peer_closed_ ? kFlagPeerClosed : 0;
345
346 ports[0] = control_port_.name();
347
348 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
349 platform_handles[0] = buffer_handle_for_transit_.get();
350
351 return true;
352 }
353
BeginTransit()354 bool DataPipeConsumerDispatcher::BeginTransit() {
355 base::AutoLock lock(lock_);
356 if (in_transit_)
357 return false;
358 in_transit_ = !in_two_phase_read_;
359 return in_transit_;
360 }
361
CompleteTransitAndClose()362 void DataPipeConsumerDispatcher::CompleteTransitAndClose() {
363 node_controller_->SetPortObserver(control_port_, nullptr);
364
365 base::AutoLock lock(lock_);
366 DCHECK(in_transit_);
367 in_transit_ = false;
368 transferred_ = true;
369 ignore_result(buffer_handle_for_transit_.release());
370 CloseNoLock();
371 }
372
CancelTransit()373 void DataPipeConsumerDispatcher::CancelTransit() {
374 base::AutoLock lock(lock_);
375 DCHECK(in_transit_);
376 in_transit_ = false;
377 buffer_handle_for_transit_.reset();
378 UpdateSignalsStateNoLock();
379 }
380
381 // static
382 scoped_refptr<DataPipeConsumerDispatcher>
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)383 DataPipeConsumerDispatcher::Deserialize(const void* data,
384 size_t num_bytes,
385 const ports::PortName* ports,
386 size_t num_ports,
387 PlatformHandle* handles,
388 size_t num_handles) {
389 if (num_ports != 1 || num_handles != 1 ||
390 num_bytes != sizeof(SerializedState)) {
391 return nullptr;
392 }
393
394 const SerializedState* state = static_cast<const SerializedState*>(data);
395
396 NodeController* node_controller = internal::g_core->GetNodeController();
397 ports::PortRef port;
398 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
399 return nullptr;
400
401 PlatformHandle buffer_handle;
402 std::swap(buffer_handle, handles[0]);
403 scoped_refptr<PlatformSharedBuffer> ring_buffer =
404 PlatformSharedBuffer::CreateFromPlatformHandle(
405 state->options.capacity_num_bytes,
406 false /* read_only */,
407 ScopedPlatformHandle(buffer_handle));
408 if (!ring_buffer) {
409 DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
410 return nullptr;
411 }
412
413 scoped_refptr<DataPipeConsumerDispatcher> dispatcher =
414 new DataPipeConsumerDispatcher(node_controller, port, ring_buffer,
415 state->options, false /* initialized */,
416 state->pipe_id);
417
418 {
419 base::AutoLock lock(dispatcher->lock_);
420 dispatcher->read_offset_ = state->read_offset;
421 dispatcher->bytes_available_ = state->bytes_available;
422 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
423 dispatcher->InitializeNoLock();
424 }
425
426 return dispatcher;
427 }
428
~DataPipeConsumerDispatcher()429 DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() {
430 DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ &&
431 !in_transit_);
432 }
433
InitializeNoLock()434 void DataPipeConsumerDispatcher::InitializeNoLock() {
435 lock_.AssertAcquired();
436
437 if (shared_ring_buffer_) {
438 DCHECK(!ring_buffer_mapping_);
439 ring_buffer_mapping_ =
440 shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
441 if (!ring_buffer_mapping_) {
442 DLOG(ERROR) << "Failed to map shared buffer.";
443 shared_ring_buffer_ = nullptr;
444 }
445 }
446
447 base::AutoUnlock unlock(lock_);
448 node_controller_->SetPortObserver(
449 control_port_,
450 make_scoped_refptr(new PortObserverThunk(this)));
451 }
452
CloseNoLock()453 MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
454 lock_.AssertAcquired();
455 if (is_closed_ || in_transit_)
456 return MOJO_RESULT_INVALID_ARGUMENT;
457 is_closed_ = true;
458 ring_buffer_mapping_.reset();
459 shared_ring_buffer_ = nullptr;
460
461 awakable_list_.CancelAll();
462 if (!transferred_) {
463 base::AutoUnlock unlock(lock_);
464 node_controller_->ClosePort(control_port_);
465 }
466
467 return MOJO_RESULT_OK;
468 }
469
470 HandleSignalsState
GetHandleSignalsStateNoLock() const471 DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
472 lock_.AssertAcquired();
473
474 HandleSignalsState rv;
475 if (shared_ring_buffer_ && bytes_available_) {
476 if (!in_two_phase_read_)
477 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
478 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
479 } else if (!peer_closed_ && shared_ring_buffer_) {
480 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
481 }
482
483 if (peer_closed_)
484 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
485 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
486 return rv;
487 }
488
NotifyRead(uint32_t num_bytes)489 void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) {
490 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: "
491 << num_bytes << " bytes read. [control_port="
492 << control_port_.name() << "]";
493
494 SendDataPipeControlMessage(node_controller_, control_port_,
495 DataPipeCommand::DATA_WAS_READ, num_bytes);
496 }
497
OnPortStatusChanged()498 void DataPipeConsumerDispatcher::OnPortStatusChanged() {
499 DCHECK(RequestContext::current());
500
501 base::AutoLock lock(lock_);
502
503 // We stop observing the control port as soon it's transferred, but this can
504 // race with events which are raised right before that happens. This is fine
505 // to ignore.
506 if (transferred_)
507 return;
508
509 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
510
511 UpdateSignalsStateNoLock();
512 }
513
UpdateSignalsStateNoLock()514 void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
515 lock_.AssertAcquired();
516
517 bool was_peer_closed = peer_closed_;
518 size_t previous_bytes_available = bytes_available_;
519
520 ports::PortStatus port_status;
521 int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
522 if (rv != ports::OK || !port_status.receiving_messages) {
523 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure"
524 << " [control_port=" << control_port_.name() << "]";
525 peer_closed_ = true;
526 } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
527 ports::ScopedMessage message;
528 do {
529 int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
530 &message);
531 if (rv != ports::OK)
532 peer_closed_ = true;
533 if (message) {
534 if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
535 peer_closed_ = true;
536 break;
537 }
538
539 const DataPipeControlMessage* m =
540 static_cast<const DataPipeControlMessage*>(
541 message->payload_bytes());
542
543 if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) {
544 DLOG(ERROR) << "Unexpected control message from producer.";
545 peer_closed_ = true;
546 break;
547 }
548
549 if (static_cast<size_t>(bytes_available_) + m->num_bytes >
550 options_.capacity_num_bytes) {
551 DLOG(ERROR) << "Producer claims to have written too many bytes.";
552 peer_closed_ = true;
553 break;
554 }
555
556 DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that "
557 << m->num_bytes << " bytes were written. [control_port="
558 << control_port_.name() << "]";
559
560 bytes_available_ += m->num_bytes;
561 }
562 } while (message);
563 }
564
565 if (peer_closed_ != was_peer_closed ||
566 bytes_available_ != previous_bytes_available) {
567 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
568 }
569 }
570
571 } // namespace edk
572 } // namespace mojo
573