1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/data_pipe_producer_dispatcher.h"
6 
7 #include <stddef.h>
8 #include <stdint.h>
9 
10 #include <utility>
11 
12 #include "base/bind.h"
13 #include "base/logging.h"
14 #include "base/memory/ref_counted.h"
15 #include "mojo/core/configuration.h"
16 #include "mojo/core/core.h"
17 #include "mojo/core/data_pipe_control_message.h"
18 #include "mojo/core/node_controller.h"
19 #include "mojo/core/platform_handle_utils.h"
20 #include "mojo/core/request_context.h"
21 #include "mojo/core/user_message_impl.h"
22 #include "mojo/public/c/system/data_pipe.h"
23 
24 namespace mojo {
25 namespace core {
26 
27 namespace {
28 
29 const uint8_t kFlagPeerClosed = 0x01;
30 
31 #pragma pack(push, 1)
32 
33 struct SerializedState {
34   MojoCreateDataPipeOptions options;
35   uint64_t pipe_id;
36   uint32_t write_offset;
37   uint32_t available_capacity;
38   uint8_t flags;
39   uint64_t buffer_guid_high;
40   uint64_t buffer_guid_low;
41   char padding[7];
42 };
43 
44 static_assert(sizeof(SerializedState) % 8 == 0,
45               "Invalid SerializedState size.");
46 
47 #pragma pack(pop)
48 
49 }  // namespace
50 
51 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a
52 // reference to the dispatcher to ensure it lives as long as the observed port.
53 class DataPipeProducerDispatcher::PortObserverThunk
54     : public NodeController::PortObserver {
55  public:
PortObserverThunk(scoped_refptr<DataPipeProducerDispatcher> dispatcher)56   explicit PortObserverThunk(
57       scoped_refptr<DataPipeProducerDispatcher> dispatcher)
58       : dispatcher_(dispatcher) {}
59 
60  private:
~PortObserverThunk()61   ~PortObserverThunk() override {}
62 
63   // NodeController::PortObserver:
OnPortStatusChanged()64   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
65 
66   scoped_refptr<DataPipeProducerDispatcher> dispatcher_;
67 
68   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
69 };
70 
71 // static
Create(NodeController * node_controller,const ports::PortRef & control_port,base::UnsafeSharedMemoryRegion shared_ring_buffer,const MojoCreateDataPipeOptions & options,uint64_t pipe_id)72 scoped_refptr<DataPipeProducerDispatcher> DataPipeProducerDispatcher::Create(
73     NodeController* node_controller,
74     const ports::PortRef& control_port,
75     base::UnsafeSharedMemoryRegion shared_ring_buffer,
76     const MojoCreateDataPipeOptions& options,
77     uint64_t pipe_id) {
78   scoped_refptr<DataPipeProducerDispatcher> producer =
79       new DataPipeProducerDispatcher(node_controller, control_port,
80                                      std::move(shared_ring_buffer), options,
81                                      pipe_id);
82   base::AutoLock lock(producer->lock_);
83   if (!producer->InitializeNoLock())
84     return nullptr;
85   return producer;
86 }
87 
GetType() const88 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
89   return Type::DATA_PIPE_PRODUCER;
90 }
91 
Close()92 MojoResult DataPipeProducerDispatcher::Close() {
93   base::AutoLock lock(lock_);
94   DVLOG(1) << "Closing data pipe producer " << pipe_id_;
95   return CloseNoLock();
96 }
97 
WriteData(const void * elements,uint32_t * num_bytes,const MojoWriteDataOptions & options)98 MojoResult DataPipeProducerDispatcher::WriteData(
99     const void* elements,
100     uint32_t* num_bytes,
101     const MojoWriteDataOptions& options) {
102   base::AutoLock lock(lock_);
103   if (!shared_ring_buffer_.IsValid() || in_transit_)
104     return MOJO_RESULT_INVALID_ARGUMENT;
105 
106   if (in_two_phase_write_)
107     return MOJO_RESULT_BUSY;
108 
109   if (peer_closed_)
110     return MOJO_RESULT_FAILED_PRECONDITION;
111 
112   if (*num_bytes % options_.element_num_bytes != 0)
113     return MOJO_RESULT_INVALID_ARGUMENT;
114   if (*num_bytes == 0)
115     return MOJO_RESULT_OK;  // Nothing to do.
116 
117   if ((options.flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) &&
118       (*num_bytes > available_capacity_)) {
119     // Don't return "should wait" since you can't wait for a specified amount of
120     // data.
121     return MOJO_RESULT_OUT_OF_RANGE;
122   }
123 
124   DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
125   uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
126   if (num_bytes_to_write == 0)
127     return MOJO_RESULT_SHOULD_WAIT;
128 
129   *num_bytes = num_bytes_to_write;
130 
131   CHECK(ring_buffer_mapping_.IsValid());
132   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_.memory());
133   CHECK(data);
134 
135   const uint8_t* source = static_cast<const uint8_t*>(elements);
136   CHECK(source);
137 
138   DCHECK_LE(write_offset_, options_.capacity_num_bytes);
139   uint32_t tail_bytes_to_write =
140       std::min(options_.capacity_num_bytes - write_offset_, num_bytes_to_write);
141   uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
142 
143   DCHECK_GT(tail_bytes_to_write, 0u);
144   memcpy(data + write_offset_, source, tail_bytes_to_write);
145   if (head_bytes_to_write > 0)
146     memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
147 
148   DCHECK_LE(num_bytes_to_write, available_capacity_);
149   available_capacity_ -= num_bytes_to_write;
150   write_offset_ =
151       (write_offset_ + num_bytes_to_write) % options_.capacity_num_bytes;
152 
153   watchers_.NotifyState(GetHandleSignalsStateNoLock());
154 
155   base::AutoUnlock unlock(lock_);
156   NotifyWrite(num_bytes_to_write);
157 
158   return MOJO_RESULT_OK;
159 }
160 
BeginWriteData(void ** buffer,uint32_t * buffer_num_bytes)161 MojoResult DataPipeProducerDispatcher::BeginWriteData(
162     void** buffer,
163     uint32_t* buffer_num_bytes) {
164   base::AutoLock lock(lock_);
165   if (!shared_ring_buffer_.IsValid() || in_transit_)
166     return MOJO_RESULT_INVALID_ARGUMENT;
167 
168   if (in_two_phase_write_)
169     return MOJO_RESULT_BUSY;
170   if (peer_closed_)
171     return MOJO_RESULT_FAILED_PRECONDITION;
172 
173   if (available_capacity_ == 0) {
174     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
175                         : MOJO_RESULT_SHOULD_WAIT;
176   }
177 
178   in_two_phase_write_ = true;
179   *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_,
180                                available_capacity_);
181   DCHECK_GT(*buffer_num_bytes, 0u);
182 
183   CHECK(ring_buffer_mapping_.IsValid());
184   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_.memory());
185   *buffer = data + write_offset_;
186 
187   return MOJO_RESULT_OK;
188 }
189 
EndWriteData(uint32_t num_bytes_written)190 MojoResult DataPipeProducerDispatcher::EndWriteData(
191     uint32_t num_bytes_written) {
192   base::AutoLock lock(lock_);
193   if (is_closed_ || in_transit_)
194     return MOJO_RESULT_INVALID_ARGUMENT;
195 
196   if (!in_two_phase_write_)
197     return MOJO_RESULT_FAILED_PRECONDITION;
198 
199   // Note: Allow successful completion of the two-phase write even if the other
200   // side has been closed.
201   MojoResult rv = MOJO_RESULT_OK;
202   if (num_bytes_written > available_capacity_ ||
203       num_bytes_written % options_.element_num_bytes != 0 ||
204       write_offset_ + num_bytes_written > options_.capacity_num_bytes) {
205     rv = MOJO_RESULT_INVALID_ARGUMENT;
206   } else {
207     DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes);
208     available_capacity_ -= num_bytes_written;
209     write_offset_ =
210         (write_offset_ + num_bytes_written) % options_.capacity_num_bytes;
211 
212     base::AutoUnlock unlock(lock_);
213     NotifyWrite(num_bytes_written);
214   }
215 
216   in_two_phase_write_ = false;
217 
218   // If we're now writable, we *became* writable (since we weren't writable
219   // during the two-phase write), so notify watchers.
220   watchers_.NotifyState(GetHandleSignalsStateNoLock());
221 
222   return rv;
223 }
224 
GetHandleSignalsState() const225 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
226   base::AutoLock lock(lock_);
227   return GetHandleSignalsStateNoLock();
228 }
229 
AddWatcherRef(const scoped_refptr<WatcherDispatcher> & watcher,uintptr_t context)230 MojoResult DataPipeProducerDispatcher::AddWatcherRef(
231     const scoped_refptr<WatcherDispatcher>& watcher,
232     uintptr_t context) {
233   base::AutoLock lock(lock_);
234   if (is_closed_ || in_transit_)
235     return MOJO_RESULT_INVALID_ARGUMENT;
236   return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
237 }
238 
RemoveWatcherRef(WatcherDispatcher * watcher,uintptr_t context)239 MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
240     WatcherDispatcher* watcher,
241     uintptr_t context) {
242   base::AutoLock lock(lock_);
243   if (is_closed_ || in_transit_)
244     return MOJO_RESULT_INVALID_ARGUMENT;
245   return watchers_.Remove(watcher, context);
246 }
247 
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)248 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
249                                                 uint32_t* num_ports,
250                                                 uint32_t* num_handles) {
251   base::AutoLock lock(lock_);
252   DCHECK(in_transit_);
253   *num_bytes = sizeof(SerializedState);
254   *num_ports = 1;
255   *num_handles = 1;
256 }
257 
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * platform_handles)258 bool DataPipeProducerDispatcher::EndSerialize(
259     void* destination,
260     ports::PortName* ports,
261     PlatformHandle* platform_handles) {
262   SerializedState* state = static_cast<SerializedState*>(destination);
263   memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
264   memset(state->padding, 0, sizeof(state->padding));
265 
266   base::AutoLock lock(lock_);
267   DCHECK(in_transit_);
268   state->pipe_id = pipe_id_;
269   state->write_offset = write_offset_;
270   state->available_capacity = available_capacity_;
271   state->flags = peer_closed_ ? kFlagPeerClosed : 0;
272 
273   auto region_handle =
274       base::UnsafeSharedMemoryRegion::TakeHandleForSerialization(
275           std::move(shared_ring_buffer_));
276   const base::UnguessableToken& guid = region_handle.GetGUID();
277   state->buffer_guid_high = guid.GetHighForSerialization();
278   state->buffer_guid_low = guid.GetLowForSerialization();
279 
280   ports[0] = control_port_.name();
281 
282   PlatformHandle handle;
283   PlatformHandle ignored_handle;
284   ExtractPlatformHandlesFromSharedMemoryRegionHandle(
285       region_handle.PassPlatformHandle(), &handle, &ignored_handle);
286   if (!handle.is_valid() || ignored_handle.is_valid())
287     return false;
288 
289   platform_handles[0] = std::move(handle);
290   return true;
291 }
292 
BeginTransit()293 bool DataPipeProducerDispatcher::BeginTransit() {
294   base::AutoLock lock(lock_);
295   if (in_transit_)
296     return false;
297   in_transit_ = !in_two_phase_write_;
298   return in_transit_;
299 }
300 
CompleteTransitAndClose()301 void DataPipeProducerDispatcher::CompleteTransitAndClose() {
302   node_controller_->SetPortObserver(control_port_, nullptr);
303 
304   base::AutoLock lock(lock_);
305   DCHECK(in_transit_);
306   transferred_ = true;
307   in_transit_ = false;
308   CloseNoLock();
309 }
310 
CancelTransit()311 void DataPipeProducerDispatcher::CancelTransit() {
312   base::AutoLock lock(lock_);
313   DCHECK(in_transit_);
314   in_transit_ = false;
315 
316   HandleSignalsState state = GetHandleSignalsStateNoLock();
317   watchers_.NotifyState(state);
318 }
319 
320 // static
321 scoped_refptr<DataPipeProducerDispatcher>
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)322 DataPipeProducerDispatcher::Deserialize(const void* data,
323                                         size_t num_bytes,
324                                         const ports::PortName* ports,
325                                         size_t num_ports,
326                                         PlatformHandle* handles,
327                                         size_t num_handles) {
328   if (num_ports != 1 || num_handles != 1 ||
329       num_bytes != sizeof(SerializedState)) {
330     return nullptr;
331   }
332 
333   const SerializedState* state = static_cast<const SerializedState*>(data);
334   if (!state->options.capacity_num_bytes || !state->options.element_num_bytes ||
335       state->options.capacity_num_bytes < state->options.element_num_bytes) {
336     return nullptr;
337   }
338 
339   NodeController* node_controller = Core::Get()->GetNodeController();
340   ports::PortRef port;
341   if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
342     return nullptr;
343 
344   auto region_handle = CreateSharedMemoryRegionHandleFromPlatformHandles(
345       std::move(handles[0]), PlatformHandle());
346   auto region = base::subtle::PlatformSharedMemoryRegion::Take(
347       std::move(region_handle),
348       base::subtle::PlatformSharedMemoryRegion::Mode::kUnsafe,
349       state->options.capacity_num_bytes,
350       base::UnguessableToken::Deserialize(state->buffer_guid_high,
351                                           state->buffer_guid_low));
352   auto ring_buffer =
353       base::UnsafeSharedMemoryRegion::Deserialize(std::move(region));
354   if (!ring_buffer.IsValid()) {
355     DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
356     return nullptr;
357   }
358 
359   scoped_refptr<DataPipeProducerDispatcher> dispatcher =
360       new DataPipeProducerDispatcher(node_controller, port,
361                                      std::move(ring_buffer), state->options,
362                                      state->pipe_id);
363 
364   {
365     base::AutoLock lock(dispatcher->lock_);
366     dispatcher->write_offset_ = state->write_offset;
367     dispatcher->available_capacity_ = state->available_capacity;
368     dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
369     if (!dispatcher->InitializeNoLock())
370       return nullptr;
371     dispatcher->UpdateSignalsStateNoLock();
372   }
373 
374   return dispatcher;
375 }
376 
DataPipeProducerDispatcher(NodeController * node_controller,const ports::PortRef & control_port,base::UnsafeSharedMemoryRegion shared_ring_buffer,const MojoCreateDataPipeOptions & options,uint64_t pipe_id)377 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
378     NodeController* node_controller,
379     const ports::PortRef& control_port,
380     base::UnsafeSharedMemoryRegion shared_ring_buffer,
381     const MojoCreateDataPipeOptions& options,
382     uint64_t pipe_id)
383     : options_(options),
384       node_controller_(node_controller),
385       control_port_(control_port),
386       pipe_id_(pipe_id),
387       watchers_(this),
388       shared_ring_buffer_(std::move(shared_ring_buffer)),
389       available_capacity_(options_.capacity_num_bytes) {}
390 
~DataPipeProducerDispatcher()391 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
392   DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_.IsValid() &&
393          !ring_buffer_mapping_.IsValid());
394 }
395 
InitializeNoLock()396 bool DataPipeProducerDispatcher::InitializeNoLock() {
397   lock_.AssertAcquired();
398   if (!shared_ring_buffer_.IsValid())
399     return false;
400 
401   DCHECK(!ring_buffer_mapping_.IsValid());
402   ring_buffer_mapping_ = shared_ring_buffer_.Map();
403   if (!ring_buffer_mapping_.IsValid()) {
404     DLOG(ERROR) << "Failed to map shared buffer.";
405     shared_ring_buffer_ = base::UnsafeSharedMemoryRegion();
406     return false;
407   }
408 
409   base::AutoUnlock unlock(lock_);
410   node_controller_->SetPortObserver(
411       control_port_, base::MakeRefCounted<PortObserverThunk>(this));
412 
413   return true;
414 }
415 
CloseNoLock()416 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
417   lock_.AssertAcquired();
418   if (is_closed_ || in_transit_)
419     return MOJO_RESULT_INVALID_ARGUMENT;
420   is_closed_ = true;
421   ring_buffer_mapping_ = base::WritableSharedMemoryMapping();
422   shared_ring_buffer_ = base::UnsafeSharedMemoryRegion();
423 
424   watchers_.NotifyClosed();
425   if (!transferred_) {
426     base::AutoUnlock unlock(lock_);
427     node_controller_->ClosePort(control_port_);
428   }
429 
430   return MOJO_RESULT_OK;
431 }
432 
GetHandleSignalsStateNoLock() const433 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
434     const {
435   lock_.AssertAcquired();
436   HandleSignalsState rv;
437   if (!peer_closed_) {
438     if (!in_two_phase_write_ && shared_ring_buffer_.IsValid() &&
439         available_capacity_ > 0)
440       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
441     if (peer_remote_)
442       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
443     rv.satisfiable_signals |=
444         MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_REMOTE;
445   } else {
446     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
447   }
448   rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
449   return rv;
450 }
451 
NotifyWrite(uint32_t num_bytes)452 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
453   DVLOG(1) << "Data pipe producer " << pipe_id_
454            << " notifying peer: " << num_bytes
455            << " bytes written. [control_port=" << control_port_.name() << "]";
456 
457   SendDataPipeControlMessage(node_controller_, control_port_,
458                              DataPipeCommand::DATA_WAS_WRITTEN, num_bytes);
459 }
460 
OnPortStatusChanged()461 void DataPipeProducerDispatcher::OnPortStatusChanged() {
462   DCHECK(RequestContext::current());
463 
464   base::AutoLock lock(lock_);
465 
466   // We stop observing the control port as soon it's transferred, but this can
467   // race with events which are raised right before that happens. This is fine
468   // to ignore.
469   if (transferred_)
470     return;
471 
472   DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
473 
474   UpdateSignalsStateNoLock();
475 }
476 
UpdateSignalsStateNoLock()477 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
478   lock_.AssertAcquired();
479 
480   const bool was_peer_closed = peer_closed_;
481   const bool was_peer_remote = peer_remote_;
482   size_t previous_capacity = available_capacity_;
483 
484   ports::PortStatus port_status;
485   int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
486   peer_remote_ = rv == ports::OK && port_status.peer_remote;
487   if (rv != ports::OK || !port_status.receiving_messages) {
488     DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure"
489              << " [control_port=" << control_port_.name() << "]";
490     peer_closed_ = true;
491   } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
492     std::unique_ptr<ports::UserMessageEvent> message_event;
493     do {
494       int rv = node_controller_->node()->GetMessage(control_port_,
495                                                     &message_event, nullptr);
496       if (rv != ports::OK)
497         peer_closed_ = true;
498       if (message_event) {
499         auto* message = message_event->GetMessage<UserMessageImpl>();
500         if (message->user_payload_size() < sizeof(DataPipeControlMessage)) {
501           peer_closed_ = true;
502           break;
503         }
504 
505         const DataPipeControlMessage* m =
506             static_cast<const DataPipeControlMessage*>(message->user_payload());
507 
508         if (m->command != DataPipeCommand::DATA_WAS_READ) {
509           DLOG(ERROR) << "Unexpected message from consumer.";
510           peer_closed_ = true;
511           break;
512         }
513 
514         if (static_cast<size_t>(available_capacity_) + m->num_bytes >
515             options_.capacity_num_bytes) {
516           DLOG(ERROR) << "Consumer claims to have read too many bytes.";
517           break;
518         }
519 
520         DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that "
521                  << m->num_bytes
522                  << " bytes were read. [control_port=" << control_port_.name()
523                  << "]";
524 
525         available_capacity_ += m->num_bytes;
526       }
527     } while (message_event);
528   }
529 
530   if (peer_closed_ != was_peer_closed ||
531       available_capacity_ != previous_capacity ||
532       was_peer_remote != peer_remote_) {
533     watchers_.NotifyState(GetHandleSignalsStateNoLock());
534   }
535 }
536 
537 }  // namespace core
538 }  // namespace mojo
539