1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <utility>
11
12 #include "base/bind.h"
13 #include "base/logging.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/message_loop/message_loop.h"
16 #include "mojo/edk/embedder/embedder_internal.h"
17 #include "mojo/edk/embedder/platform_shared_buffer.h"
18 #include "mojo/edk/system/configuration.h"
19 #include "mojo/edk/system/core.h"
20 #include "mojo/edk/system/data_pipe_control_message.h"
21 #include "mojo/edk/system/node_controller.h"
22 #include "mojo/edk/system/ports_message.h"
23 #include "mojo/edk/system/request_context.h"
24 #include "mojo/public/c/system/data_pipe.h"
25
26 namespace mojo {
27 namespace edk {
28
29 namespace {
30
31 const uint8_t kFlagPeerClosed = 0x01;
32
33 #pragma pack(push, 1)
34
35 struct SerializedState {
36 MojoCreateDataPipeOptions options;
37 uint64_t pipe_id;
38 uint32_t write_offset;
39 uint32_t available_capacity;
40 uint8_t flags;
41 char padding[7];
42 };
43
44 static_assert(sizeof(SerializedState) % 8 == 0,
45 "Invalid SerializedState size.");
46
47 #pragma pack(pop)
48
49 } // namespace
50
51 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a
52 // reference to the dispatcher to ensure it lives as long as the observed port.
53 class DataPipeProducerDispatcher::PortObserverThunk
54 : public NodeController::PortObserver {
55 public:
PortObserverThunk(scoped_refptr<DataPipeProducerDispatcher> dispatcher)56 explicit PortObserverThunk(
57 scoped_refptr<DataPipeProducerDispatcher> dispatcher)
58 : dispatcher_(dispatcher) {}
59
60 private:
~PortObserverThunk()61 ~PortObserverThunk() override {}
62
63 // NodeController::PortObserver:
OnPortStatusChanged()64 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
65
66 scoped_refptr<DataPipeProducerDispatcher> dispatcher_;
67
68 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
69 };
70
DataPipeProducerDispatcher(NodeController * node_controller,const ports::PortRef & control_port,scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,const MojoCreateDataPipeOptions & options,bool initialized,uint64_t pipe_id)71 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
72 NodeController* node_controller,
73 const ports::PortRef& control_port,
74 scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
75 const MojoCreateDataPipeOptions& options,
76 bool initialized,
77 uint64_t pipe_id)
78 : options_(options),
79 node_controller_(node_controller),
80 control_port_(control_port),
81 pipe_id_(pipe_id),
82 shared_ring_buffer_(shared_ring_buffer),
83 available_capacity_(options_.capacity_num_bytes) {
84 if (initialized) {
85 base::AutoLock lock(lock_);
86 InitializeNoLock();
87 }
88 }
89
GetType() const90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
91 return Type::DATA_PIPE_PRODUCER;
92 }
93
Close()94 MojoResult DataPipeProducerDispatcher::Close() {
95 base::AutoLock lock(lock_);
96 DVLOG(1) << "Closing data pipe producer " << pipe_id_;
97 return CloseNoLock();
98 }
99
Watch(MojoHandleSignals signals,const Watcher::WatchCallback & callback,uintptr_t context)100 MojoResult DataPipeProducerDispatcher::Watch(
101 MojoHandleSignals signals,
102 const Watcher::WatchCallback& callback,
103 uintptr_t context) {
104 base::AutoLock lock(lock_);
105
106 if (is_closed_ || in_transit_)
107 return MOJO_RESULT_INVALID_ARGUMENT;
108
109 return awakable_list_.AddWatcher(
110 signals, callback, context, GetHandleSignalsStateNoLock());
111 }
112
CancelWatch(uintptr_t context)113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
114 base::AutoLock lock(lock_);
115
116 if (is_closed_ || in_transit_)
117 return MOJO_RESULT_INVALID_ARGUMENT;
118
119 return awakable_list_.RemoveWatcher(context);
120 }
121
WriteData(const void * elements,uint32_t * num_bytes,MojoWriteDataFlags flags)122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
123 uint32_t* num_bytes,
124 MojoWriteDataFlags flags) {
125 base::AutoLock lock(lock_);
126 if (!shared_ring_buffer_ || in_transit_)
127 return MOJO_RESULT_INVALID_ARGUMENT;
128
129 if (in_two_phase_write_)
130 return MOJO_RESULT_BUSY;
131
132 if (peer_closed_)
133 return MOJO_RESULT_FAILED_PRECONDITION;
134
135 if (*num_bytes % options_.element_num_bytes != 0)
136 return MOJO_RESULT_INVALID_ARGUMENT;
137 if (*num_bytes == 0)
138 return MOJO_RESULT_OK; // Nothing to do.
139
140 bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
141 uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
142 if (min_num_bytes_to_write > options_.capacity_num_bytes) {
143 // Don't return "should wait" since you can't wait for a specified amount of
144 // data.
145 return MOJO_RESULT_OUT_OF_RANGE;
146 }
147
148 DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
149 uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
150 if (num_bytes_to_write == 0)
151 return MOJO_RESULT_SHOULD_WAIT;
152
153 HandleSignalsState old_state = GetHandleSignalsStateNoLock();
154
155 *num_bytes = num_bytes_to_write;
156
157 CHECK(ring_buffer_mapping_);
158 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
159 CHECK(data);
160
161 const uint8_t* source = static_cast<const uint8_t*>(elements);
162 CHECK(source);
163
164 DCHECK_LE(write_offset_, options_.capacity_num_bytes);
165 uint32_t tail_bytes_to_write =
166 std::min(options_.capacity_num_bytes - write_offset_,
167 num_bytes_to_write);
168 uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
169
170 DCHECK_GT(tail_bytes_to_write, 0u);
171 memcpy(data + write_offset_, source, tail_bytes_to_write);
172 if (head_bytes_to_write > 0)
173 memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
174
175 DCHECK_LE(num_bytes_to_write, available_capacity_);
176 available_capacity_ -= num_bytes_to_write;
177 write_offset_ = (write_offset_ + num_bytes_to_write) %
178 options_.capacity_num_bytes;
179
180 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
181 if (!new_state.equals(old_state))
182 awakable_list_.AwakeForStateChange(new_state);
183
184 base::AutoUnlock unlock(lock_);
185 NotifyWrite(num_bytes_to_write);
186
187 return MOJO_RESULT_OK;
188 }
189
BeginWriteData(void ** buffer,uint32_t * buffer_num_bytes,MojoWriteDataFlags flags)190 MojoResult DataPipeProducerDispatcher::BeginWriteData(
191 void** buffer,
192 uint32_t* buffer_num_bytes,
193 MojoWriteDataFlags flags) {
194 base::AutoLock lock(lock_);
195 if (!shared_ring_buffer_ || in_transit_)
196 return MOJO_RESULT_INVALID_ARGUMENT;
197 if (in_two_phase_write_)
198 return MOJO_RESULT_BUSY;
199 if (peer_closed_)
200 return MOJO_RESULT_FAILED_PRECONDITION;
201
202 if (available_capacity_ == 0) {
203 return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
204 : MOJO_RESULT_SHOULD_WAIT;
205 }
206
207 in_two_phase_write_ = true;
208 *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_,
209 available_capacity_);
210 DCHECK_GT(*buffer_num_bytes, 0u);
211
212 CHECK(ring_buffer_mapping_);
213 uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
214 *buffer = data + write_offset_;
215
216 return MOJO_RESULT_OK;
217 }
218
EndWriteData(uint32_t num_bytes_written)219 MojoResult DataPipeProducerDispatcher::EndWriteData(
220 uint32_t num_bytes_written) {
221 base::AutoLock lock(lock_);
222 if (is_closed_ || in_transit_)
223 return MOJO_RESULT_INVALID_ARGUMENT;
224
225 if (!in_two_phase_write_)
226 return MOJO_RESULT_FAILED_PRECONDITION;
227
228 DCHECK(shared_ring_buffer_);
229 DCHECK(ring_buffer_mapping_);
230
231 // Note: Allow successful completion of the two-phase write even if the other
232 // side has been closed.
233 MojoResult rv = MOJO_RESULT_OK;
234 if (num_bytes_written > available_capacity_ ||
235 num_bytes_written % options_.element_num_bytes != 0 ||
236 write_offset_ + num_bytes_written > options_.capacity_num_bytes) {
237 rv = MOJO_RESULT_INVALID_ARGUMENT;
238 } else {
239 DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes);
240 available_capacity_ -= num_bytes_written;
241 write_offset_ = (write_offset_ + num_bytes_written) %
242 options_.capacity_num_bytes;
243
244 base::AutoUnlock unlock(lock_);
245 NotifyWrite(num_bytes_written);
246 }
247
248 in_two_phase_write_ = false;
249
250 // If we're now writable, we *became* writable (since we weren't writable
251 // during the two-phase write), so awake producer awakables.
252 HandleSignalsState new_state = GetHandleSignalsStateNoLock();
253 if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
254 awakable_list_.AwakeForStateChange(new_state);
255
256 return rv;
257 }
258
GetHandleSignalsState() const259 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
260 base::AutoLock lock(lock_);
261 return GetHandleSignalsStateNoLock();
262 }
263
AddAwakable(Awakable * awakable,MojoHandleSignals signals,uintptr_t context,HandleSignalsState * signals_state)264 MojoResult DataPipeProducerDispatcher::AddAwakable(
265 Awakable* awakable,
266 MojoHandleSignals signals,
267 uintptr_t context,
268 HandleSignalsState* signals_state) {
269 base::AutoLock lock(lock_);
270 if (!shared_ring_buffer_ || in_transit_) {
271 if (signals_state)
272 *signals_state = HandleSignalsState();
273 return MOJO_RESULT_INVALID_ARGUMENT;
274 }
275 UpdateSignalsStateNoLock();
276 HandleSignalsState state = GetHandleSignalsStateNoLock();
277 if (state.satisfies(signals)) {
278 if (signals_state)
279 *signals_state = state;
280 return MOJO_RESULT_ALREADY_EXISTS;
281 }
282 if (!state.can_satisfy(signals)) {
283 if (signals_state)
284 *signals_state = state;
285 return MOJO_RESULT_FAILED_PRECONDITION;
286 }
287
288 awakable_list_.Add(awakable, signals, context);
289 return MOJO_RESULT_OK;
290 }
291
RemoveAwakable(Awakable * awakable,HandleSignalsState * signals_state)292 void DataPipeProducerDispatcher::RemoveAwakable(
293 Awakable* awakable,
294 HandleSignalsState* signals_state) {
295 base::AutoLock lock(lock_);
296 if ((!shared_ring_buffer_ || in_transit_) && signals_state)
297 *signals_state = HandleSignalsState();
298 else if (signals_state)
299 *signals_state = GetHandleSignalsStateNoLock();
300 awakable_list_.Remove(awakable);
301 }
302
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)303 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
304 uint32_t* num_ports,
305 uint32_t* num_handles) {
306 base::AutoLock lock(lock_);
307 DCHECK(in_transit_);
308 *num_bytes = sizeof(SerializedState);
309 *num_ports = 1;
310 *num_handles = 1;
311 }
312
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * platform_handles)313 bool DataPipeProducerDispatcher::EndSerialize(
314 void* destination,
315 ports::PortName* ports,
316 PlatformHandle* platform_handles) {
317 SerializedState* state = static_cast<SerializedState*>(destination);
318 memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
319 memset(state->padding, 0, sizeof(state->padding));
320
321 base::AutoLock lock(lock_);
322 DCHECK(in_transit_);
323 state->pipe_id = pipe_id_;
324 state->write_offset = write_offset_;
325 state->available_capacity = available_capacity_;
326 state->flags = peer_closed_ ? kFlagPeerClosed : 0;
327
328 ports[0] = control_port_.name();
329
330 buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
331 platform_handles[0] = buffer_handle_for_transit_.get();
332
333 return true;
334 }
335
BeginTransit()336 bool DataPipeProducerDispatcher::BeginTransit() {
337 base::AutoLock lock(lock_);
338 if (in_transit_)
339 return false;
340 in_transit_ = !in_two_phase_write_;
341 return in_transit_;
342 }
343
CompleteTransitAndClose()344 void DataPipeProducerDispatcher::CompleteTransitAndClose() {
345 node_controller_->SetPortObserver(control_port_, nullptr);
346
347 base::AutoLock lock(lock_);
348 DCHECK(in_transit_);
349 transferred_ = true;
350 in_transit_ = false;
351 ignore_result(buffer_handle_for_transit_.release());
352 CloseNoLock();
353 }
354
CancelTransit()355 void DataPipeProducerDispatcher::CancelTransit() {
356 base::AutoLock lock(lock_);
357 DCHECK(in_transit_);
358 in_transit_ = false;
359 buffer_handle_for_transit_.reset();
360 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
361 }
362
363 // static
364 scoped_refptr<DataPipeProducerDispatcher>
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)365 DataPipeProducerDispatcher::Deserialize(const void* data,
366 size_t num_bytes,
367 const ports::PortName* ports,
368 size_t num_ports,
369 PlatformHandle* handles,
370 size_t num_handles) {
371 if (num_ports != 1 || num_handles != 1 ||
372 num_bytes != sizeof(SerializedState)) {
373 return nullptr;
374 }
375
376 const SerializedState* state = static_cast<const SerializedState*>(data);
377
378 NodeController* node_controller = internal::g_core->GetNodeController();
379 ports::PortRef port;
380 if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
381 return nullptr;
382
383 PlatformHandle buffer_handle;
384 std::swap(buffer_handle, handles[0]);
385 scoped_refptr<PlatformSharedBuffer> ring_buffer =
386 PlatformSharedBuffer::CreateFromPlatformHandle(
387 state->options.capacity_num_bytes,
388 false /* read_only */,
389 ScopedPlatformHandle(buffer_handle));
390 if (!ring_buffer) {
391 DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
392 return nullptr;
393 }
394
395 scoped_refptr<DataPipeProducerDispatcher> dispatcher =
396 new DataPipeProducerDispatcher(node_controller, port, ring_buffer,
397 state->options, false /* initialized */,
398 state->pipe_id);
399
400 {
401 base::AutoLock lock(dispatcher->lock_);
402 dispatcher->write_offset_ = state->write_offset;
403 dispatcher->available_capacity_ = state->available_capacity;
404 dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
405 dispatcher->InitializeNoLock();
406 }
407
408 return dispatcher;
409 }
410
~DataPipeProducerDispatcher()411 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
412 DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ &&
413 !ring_buffer_mapping_);
414 }
415
InitializeNoLock()416 void DataPipeProducerDispatcher::InitializeNoLock() {
417 lock_.AssertAcquired();
418
419 if (shared_ring_buffer_) {
420 ring_buffer_mapping_ =
421 shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
422 if (!ring_buffer_mapping_) {
423 DLOG(ERROR) << "Failed to map shared buffer.";
424 shared_ring_buffer_ = nullptr;
425 }
426 }
427
428 base::AutoUnlock unlock(lock_);
429 node_controller_->SetPortObserver(
430 control_port_,
431 make_scoped_refptr(new PortObserverThunk(this)));
432 }
433
CloseNoLock()434 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
435 lock_.AssertAcquired();
436 if (is_closed_ || in_transit_)
437 return MOJO_RESULT_INVALID_ARGUMENT;
438 is_closed_ = true;
439 ring_buffer_mapping_.reset();
440 shared_ring_buffer_ = nullptr;
441
442 awakable_list_.CancelAll();
443 if (!transferred_) {
444 base::AutoUnlock unlock(lock_);
445 node_controller_->ClosePort(control_port_);
446 }
447
448 return MOJO_RESULT_OK;
449 }
450
GetHandleSignalsStateNoLock() const451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
452 const {
453 lock_.AssertAcquired();
454 HandleSignalsState rv;
455 if (!peer_closed_) {
456 if (!in_two_phase_write_ && shared_ring_buffer_ &&
457 available_capacity_ > 0)
458 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
459 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
460 } else {
461 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
462 }
463 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
464 return rv;
465 }
466
NotifyWrite(uint32_t num_bytes)467 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
468 DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: "
469 << num_bytes << " bytes written. [control_port="
470 << control_port_.name() << "]";
471
472 SendDataPipeControlMessage(node_controller_, control_port_,
473 DataPipeCommand::DATA_WAS_WRITTEN, num_bytes);
474 }
475
OnPortStatusChanged()476 void DataPipeProducerDispatcher::OnPortStatusChanged() {
477 DCHECK(RequestContext::current());
478
479 base::AutoLock lock(lock_);
480
481 // We stop observing the control port as soon it's transferred, but this can
482 // race with events which are raised right before that happens. This is fine
483 // to ignore.
484 if (transferred_)
485 return;
486
487 DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
488
489 UpdateSignalsStateNoLock();
490 }
491
UpdateSignalsStateNoLock()492 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
493 lock_.AssertAcquired();
494
495 bool was_peer_closed = peer_closed_;
496 size_t previous_capacity = available_capacity_;
497
498 ports::PortStatus port_status;
499 int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
500 if (rv != ports::OK || !port_status.receiving_messages) {
501 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure"
502 << " [control_port=" << control_port_.name() << "]";
503 peer_closed_ = true;
504 } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
505 ports::ScopedMessage message;
506 do {
507 int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
508 &message);
509 if (rv != ports::OK)
510 peer_closed_ = true;
511 if (message) {
512 if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
513 peer_closed_ = true;
514 break;
515 }
516
517 const DataPipeControlMessage* m =
518 static_cast<const DataPipeControlMessage*>(
519 message->payload_bytes());
520
521 if (m->command != DataPipeCommand::DATA_WAS_READ) {
522 DLOG(ERROR) << "Unexpected message from consumer.";
523 peer_closed_ = true;
524 break;
525 }
526
527 if (static_cast<size_t>(available_capacity_) + m->num_bytes >
528 options_.capacity_num_bytes) {
529 DLOG(ERROR) << "Consumer claims to have read too many bytes.";
530 break;
531 }
532
533 DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that "
534 << m->num_bytes << " bytes were read. [control_port="
535 << control_port_.name() << "]";
536
537 available_capacity_ += m->num_bytes;
538 }
539 } while (message);
540 }
541
542 if (peer_closed_ != was_peer_closed ||
543 available_capacity_ != previous_capacity) {
544 awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
545 }
546 }
547
548 } // namespace edk
549 } // namespace mojo
550