1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/data_pipe_producer_dispatcher.h"
6 
7 #include <stddef.h>
8 #include <stdint.h>
9 
10 #include <utility>
11 
12 #include "base/bind.h"
13 #include "base/logging.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/message_loop/message_loop.h"
16 #include "mojo/edk/embedder/embedder_internal.h"
17 #include "mojo/edk/embedder/platform_shared_buffer.h"
18 #include "mojo/edk/system/configuration.h"
19 #include "mojo/edk/system/core.h"
20 #include "mojo/edk/system/data_pipe_control_message.h"
21 #include "mojo/edk/system/node_controller.h"
22 #include "mojo/edk/system/ports_message.h"
23 #include "mojo/edk/system/request_context.h"
24 #include "mojo/public/c/system/data_pipe.h"
25 
26 namespace mojo {
27 namespace edk {
28 
29 namespace {
30 
31 const uint8_t kFlagPeerClosed = 0x01;
32 
33 #pragma pack(push, 1)
34 
35 struct SerializedState {
36   MojoCreateDataPipeOptions options;
37   uint64_t pipe_id;
38   uint32_t write_offset;
39   uint32_t available_capacity;
40   uint8_t flags;
41   char padding[7];
42 };
43 
44 static_assert(sizeof(SerializedState) % 8 == 0,
45               "Invalid SerializedState size.");
46 
47 #pragma pack(pop)
48 
49 }  // namespace
50 
51 // A PortObserver which forwards to a DataPipeProducerDispatcher. This owns a
52 // reference to the dispatcher to ensure it lives as long as the observed port.
53 class DataPipeProducerDispatcher::PortObserverThunk
54     : public NodeController::PortObserver {
55  public:
PortObserverThunk(scoped_refptr<DataPipeProducerDispatcher> dispatcher)56   explicit PortObserverThunk(
57       scoped_refptr<DataPipeProducerDispatcher> dispatcher)
58       : dispatcher_(dispatcher) {}
59 
60  private:
~PortObserverThunk()61   ~PortObserverThunk() override {}
62 
63   // NodeController::PortObserver:
OnPortStatusChanged()64   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
65 
66   scoped_refptr<DataPipeProducerDispatcher> dispatcher_;
67 
68   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
69 };
70 
DataPipeProducerDispatcher(NodeController * node_controller,const ports::PortRef & control_port,scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,const MojoCreateDataPipeOptions & options,bool initialized,uint64_t pipe_id)71 DataPipeProducerDispatcher::DataPipeProducerDispatcher(
72     NodeController* node_controller,
73     const ports::PortRef& control_port,
74     scoped_refptr<PlatformSharedBuffer> shared_ring_buffer,
75     const MojoCreateDataPipeOptions& options,
76     bool initialized,
77     uint64_t pipe_id)
78     : options_(options),
79       node_controller_(node_controller),
80       control_port_(control_port),
81       pipe_id_(pipe_id),
82       shared_ring_buffer_(shared_ring_buffer),
83       available_capacity_(options_.capacity_num_bytes) {
84   if (initialized) {
85     base::AutoLock lock(lock_);
86     InitializeNoLock();
87   }
88 }
89 
GetType() const90 Dispatcher::Type DataPipeProducerDispatcher::GetType() const {
91   return Type::DATA_PIPE_PRODUCER;
92 }
93 
Close()94 MojoResult DataPipeProducerDispatcher::Close() {
95   base::AutoLock lock(lock_);
96   DVLOG(1) << "Closing data pipe producer " << pipe_id_;
97   return CloseNoLock();
98 }
99 
Watch(MojoHandleSignals signals,const Watcher::WatchCallback & callback,uintptr_t context)100 MojoResult DataPipeProducerDispatcher::Watch(
101     MojoHandleSignals signals,
102     const Watcher::WatchCallback& callback,
103     uintptr_t context) {
104   base::AutoLock lock(lock_);
105 
106   if (is_closed_ || in_transit_)
107     return MOJO_RESULT_INVALID_ARGUMENT;
108 
109   return awakable_list_.AddWatcher(
110       signals, callback, context, GetHandleSignalsStateNoLock());
111 }
112 
CancelWatch(uintptr_t context)113 MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
114   base::AutoLock lock(lock_);
115 
116   if (is_closed_ || in_transit_)
117     return MOJO_RESULT_INVALID_ARGUMENT;
118 
119   return awakable_list_.RemoveWatcher(context);
120 }
121 
WriteData(const void * elements,uint32_t * num_bytes,MojoWriteDataFlags flags)122 MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
123                                                  uint32_t* num_bytes,
124                                                  MojoWriteDataFlags flags) {
125   base::AutoLock lock(lock_);
126   if (!shared_ring_buffer_ || in_transit_)
127     return MOJO_RESULT_INVALID_ARGUMENT;
128 
129   if (in_two_phase_write_)
130     return MOJO_RESULT_BUSY;
131 
132   if (peer_closed_)
133     return MOJO_RESULT_FAILED_PRECONDITION;
134 
135   if (*num_bytes % options_.element_num_bytes != 0)
136     return MOJO_RESULT_INVALID_ARGUMENT;
137   if (*num_bytes == 0)
138     return MOJO_RESULT_OK;  // Nothing to do.
139 
140   bool all_or_none = flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE;
141   uint32_t min_num_bytes_to_write = all_or_none ? *num_bytes : 0;
142   if (min_num_bytes_to_write > options_.capacity_num_bytes) {
143     // Don't return "should wait" since you can't wait for a specified amount of
144     // data.
145     return MOJO_RESULT_OUT_OF_RANGE;
146   }
147 
148   DCHECK_LE(available_capacity_, options_.capacity_num_bytes);
149   uint32_t num_bytes_to_write = std::min(*num_bytes, available_capacity_);
150   if (num_bytes_to_write == 0)
151     return MOJO_RESULT_SHOULD_WAIT;
152 
153   HandleSignalsState old_state = GetHandleSignalsStateNoLock();
154 
155   *num_bytes = num_bytes_to_write;
156 
157   CHECK(ring_buffer_mapping_);
158   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
159   CHECK(data);
160 
161   const uint8_t* source = static_cast<const uint8_t*>(elements);
162   CHECK(source);
163 
164   DCHECK_LE(write_offset_, options_.capacity_num_bytes);
165   uint32_t tail_bytes_to_write =
166       std::min(options_.capacity_num_bytes - write_offset_,
167                num_bytes_to_write);
168   uint32_t head_bytes_to_write = num_bytes_to_write - tail_bytes_to_write;
169 
170   DCHECK_GT(tail_bytes_to_write, 0u);
171   memcpy(data + write_offset_, source, tail_bytes_to_write);
172   if (head_bytes_to_write > 0)
173     memcpy(data, source + tail_bytes_to_write, head_bytes_to_write);
174 
175   DCHECK_LE(num_bytes_to_write, available_capacity_);
176   available_capacity_ -= num_bytes_to_write;
177   write_offset_ = (write_offset_ + num_bytes_to_write) %
178       options_.capacity_num_bytes;
179 
180   HandleSignalsState new_state = GetHandleSignalsStateNoLock();
181   if (!new_state.equals(old_state))
182     awakable_list_.AwakeForStateChange(new_state);
183 
184   base::AutoUnlock unlock(lock_);
185   NotifyWrite(num_bytes_to_write);
186 
187   return MOJO_RESULT_OK;
188 }
189 
BeginWriteData(void ** buffer,uint32_t * buffer_num_bytes,MojoWriteDataFlags flags)190 MojoResult DataPipeProducerDispatcher::BeginWriteData(
191     void** buffer,
192     uint32_t* buffer_num_bytes,
193     MojoWriteDataFlags flags) {
194   base::AutoLock lock(lock_);
195   if (!shared_ring_buffer_ || in_transit_)
196     return MOJO_RESULT_INVALID_ARGUMENT;
197   if (in_two_phase_write_)
198     return MOJO_RESULT_BUSY;
199   if (peer_closed_)
200     return MOJO_RESULT_FAILED_PRECONDITION;
201 
202   if (available_capacity_ == 0) {
203     return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
204                         : MOJO_RESULT_SHOULD_WAIT;
205   }
206 
207   in_two_phase_write_ = true;
208   *buffer_num_bytes = std::min(options_.capacity_num_bytes - write_offset_,
209                                available_capacity_);
210   DCHECK_GT(*buffer_num_bytes, 0u);
211 
212   CHECK(ring_buffer_mapping_);
213   uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase());
214   *buffer = data + write_offset_;
215 
216   return MOJO_RESULT_OK;
217 }
218 
EndWriteData(uint32_t num_bytes_written)219 MojoResult DataPipeProducerDispatcher::EndWriteData(
220     uint32_t num_bytes_written) {
221   base::AutoLock lock(lock_);
222   if (is_closed_ || in_transit_)
223     return MOJO_RESULT_INVALID_ARGUMENT;
224 
225   if (!in_two_phase_write_)
226     return MOJO_RESULT_FAILED_PRECONDITION;
227 
228   DCHECK(shared_ring_buffer_);
229   DCHECK(ring_buffer_mapping_);
230 
231   // Note: Allow successful completion of the two-phase write even if the other
232   // side has been closed.
233   MojoResult rv = MOJO_RESULT_OK;
234   if (num_bytes_written > available_capacity_ ||
235       num_bytes_written % options_.element_num_bytes != 0 ||
236       write_offset_ + num_bytes_written > options_.capacity_num_bytes) {
237     rv = MOJO_RESULT_INVALID_ARGUMENT;
238   } else {
239     DCHECK_LE(num_bytes_written + write_offset_, options_.capacity_num_bytes);
240     available_capacity_ -= num_bytes_written;
241     write_offset_ = (write_offset_ + num_bytes_written) %
242         options_.capacity_num_bytes;
243 
244     base::AutoUnlock unlock(lock_);
245     NotifyWrite(num_bytes_written);
246   }
247 
248   in_two_phase_write_ = false;
249 
250   // If we're now writable, we *became* writable (since we weren't writable
251   // during the two-phase write), so awake producer awakables.
252   HandleSignalsState new_state = GetHandleSignalsStateNoLock();
253   if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
254     awakable_list_.AwakeForStateChange(new_state);
255 
256   return rv;
257 }
258 
GetHandleSignalsState() const259 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
260   base::AutoLock lock(lock_);
261   return GetHandleSignalsStateNoLock();
262 }
263 
AddAwakable(Awakable * awakable,MojoHandleSignals signals,uintptr_t context,HandleSignalsState * signals_state)264 MojoResult DataPipeProducerDispatcher::AddAwakable(
265     Awakable* awakable,
266     MojoHandleSignals signals,
267     uintptr_t context,
268     HandleSignalsState* signals_state) {
269   base::AutoLock lock(lock_);
270   if (!shared_ring_buffer_ || in_transit_) {
271     if (signals_state)
272       *signals_state = HandleSignalsState();
273     return MOJO_RESULT_INVALID_ARGUMENT;
274   }
275   UpdateSignalsStateNoLock();
276   HandleSignalsState state = GetHandleSignalsStateNoLock();
277   if (state.satisfies(signals)) {
278     if (signals_state)
279       *signals_state = state;
280     return MOJO_RESULT_ALREADY_EXISTS;
281   }
282   if (!state.can_satisfy(signals)) {
283     if (signals_state)
284       *signals_state = state;
285     return MOJO_RESULT_FAILED_PRECONDITION;
286   }
287 
288   awakable_list_.Add(awakable, signals, context);
289   return MOJO_RESULT_OK;
290 }
291 
RemoveAwakable(Awakable * awakable,HandleSignalsState * signals_state)292 void DataPipeProducerDispatcher::RemoveAwakable(
293     Awakable* awakable,
294     HandleSignalsState* signals_state) {
295   base::AutoLock lock(lock_);
296   if ((!shared_ring_buffer_ || in_transit_) && signals_state)
297     *signals_state = HandleSignalsState();
298   else if (signals_state)
299     *signals_state = GetHandleSignalsStateNoLock();
300   awakable_list_.Remove(awakable);
301 }
302 
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)303 void DataPipeProducerDispatcher::StartSerialize(uint32_t* num_bytes,
304                                                 uint32_t* num_ports,
305                                                 uint32_t* num_handles) {
306   base::AutoLock lock(lock_);
307   DCHECK(in_transit_);
308   *num_bytes = sizeof(SerializedState);
309   *num_ports = 1;
310   *num_handles = 1;
311 }
312 
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * platform_handles)313 bool DataPipeProducerDispatcher::EndSerialize(
314     void* destination,
315     ports::PortName* ports,
316     PlatformHandle* platform_handles) {
317   SerializedState* state = static_cast<SerializedState*>(destination);
318   memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions));
319   memset(state->padding, 0, sizeof(state->padding));
320 
321   base::AutoLock lock(lock_);
322   DCHECK(in_transit_);
323   state->pipe_id = pipe_id_;
324   state->write_offset = write_offset_;
325   state->available_capacity = available_capacity_;
326   state->flags = peer_closed_ ? kFlagPeerClosed : 0;
327 
328   ports[0] = control_port_.name();
329 
330   buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle();
331   platform_handles[0] = buffer_handle_for_transit_.get();
332 
333   return true;
334 }
335 
BeginTransit()336 bool DataPipeProducerDispatcher::BeginTransit() {
337   base::AutoLock lock(lock_);
338   if (in_transit_)
339     return false;
340   in_transit_ = !in_two_phase_write_;
341   return in_transit_;
342 }
343 
CompleteTransitAndClose()344 void DataPipeProducerDispatcher::CompleteTransitAndClose() {
345   node_controller_->SetPortObserver(control_port_, nullptr);
346 
347   base::AutoLock lock(lock_);
348   DCHECK(in_transit_);
349   transferred_ = true;
350   in_transit_ = false;
351   ignore_result(buffer_handle_for_transit_.release());
352   CloseNoLock();
353 }
354 
CancelTransit()355 void DataPipeProducerDispatcher::CancelTransit() {
356   base::AutoLock lock(lock_);
357   DCHECK(in_transit_);
358   in_transit_ = false;
359   buffer_handle_for_transit_.reset();
360   awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
361 }
362 
363 // static
364 scoped_refptr<DataPipeProducerDispatcher>
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)365 DataPipeProducerDispatcher::Deserialize(const void* data,
366                                         size_t num_bytes,
367                                         const ports::PortName* ports,
368                                         size_t num_ports,
369                                         PlatformHandle* handles,
370                                         size_t num_handles) {
371   if (num_ports != 1 || num_handles != 1 ||
372       num_bytes != sizeof(SerializedState)) {
373     return nullptr;
374   }
375 
376   const SerializedState* state = static_cast<const SerializedState*>(data);
377 
378   NodeController* node_controller = internal::g_core->GetNodeController();
379   ports::PortRef port;
380   if (node_controller->node()->GetPort(ports[0], &port) != ports::OK)
381     return nullptr;
382 
383   PlatformHandle buffer_handle;
384   std::swap(buffer_handle, handles[0]);
385   scoped_refptr<PlatformSharedBuffer> ring_buffer =
386       PlatformSharedBuffer::CreateFromPlatformHandle(
387           state->options.capacity_num_bytes,
388           false /* read_only */,
389           ScopedPlatformHandle(buffer_handle));
390   if (!ring_buffer) {
391     DLOG(ERROR) << "Failed to deserialize shared buffer handle.";
392     return nullptr;
393   }
394 
395   scoped_refptr<DataPipeProducerDispatcher> dispatcher =
396       new DataPipeProducerDispatcher(node_controller, port, ring_buffer,
397                                      state->options, false /* initialized */,
398                                      state->pipe_id);
399 
400   {
401     base::AutoLock lock(dispatcher->lock_);
402     dispatcher->write_offset_ = state->write_offset;
403     dispatcher->available_capacity_ = state->available_capacity;
404     dispatcher->peer_closed_ = state->flags & kFlagPeerClosed;
405     dispatcher->InitializeNoLock();
406   }
407 
408   return dispatcher;
409 }
410 
~DataPipeProducerDispatcher()411 DataPipeProducerDispatcher::~DataPipeProducerDispatcher() {
412   DCHECK(is_closed_ && !in_transit_ && !shared_ring_buffer_ &&
413          !ring_buffer_mapping_);
414 }
415 
InitializeNoLock()416 void DataPipeProducerDispatcher::InitializeNoLock() {
417   lock_.AssertAcquired();
418 
419   if (shared_ring_buffer_) {
420     ring_buffer_mapping_ =
421         shared_ring_buffer_->Map(0, options_.capacity_num_bytes);
422     if (!ring_buffer_mapping_) {
423       DLOG(ERROR) << "Failed to map shared buffer.";
424       shared_ring_buffer_ = nullptr;
425     }
426   }
427 
428   base::AutoUnlock unlock(lock_);
429   node_controller_->SetPortObserver(
430       control_port_,
431       make_scoped_refptr(new PortObserverThunk(this)));
432 }
433 
CloseNoLock()434 MojoResult DataPipeProducerDispatcher::CloseNoLock() {
435   lock_.AssertAcquired();
436   if (is_closed_ || in_transit_)
437     return MOJO_RESULT_INVALID_ARGUMENT;
438   is_closed_ = true;
439   ring_buffer_mapping_.reset();
440   shared_ring_buffer_ = nullptr;
441 
442   awakable_list_.CancelAll();
443   if (!transferred_) {
444     base::AutoUnlock unlock(lock_);
445     node_controller_->ClosePort(control_port_);
446   }
447 
448   return MOJO_RESULT_OK;
449 }
450 
GetHandleSignalsStateNoLock() const451 HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
452     const {
453   lock_.AssertAcquired();
454   HandleSignalsState rv;
455   if (!peer_closed_) {
456     if (!in_two_phase_write_ && shared_ring_buffer_ &&
457         available_capacity_ > 0)
458       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
459     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
460   } else {
461     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
462   }
463   rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
464   return rv;
465 }
466 
NotifyWrite(uint32_t num_bytes)467 void DataPipeProducerDispatcher::NotifyWrite(uint32_t num_bytes) {
468   DVLOG(1) << "Data pipe producer " << pipe_id_ << " notifying peer: "
469            << num_bytes << " bytes written. [control_port="
470            << control_port_.name() << "]";
471 
472   SendDataPipeControlMessage(node_controller_, control_port_,
473                              DataPipeCommand::DATA_WAS_WRITTEN, num_bytes);
474 }
475 
OnPortStatusChanged()476 void DataPipeProducerDispatcher::OnPortStatusChanged() {
477   DCHECK(RequestContext::current());
478 
479   base::AutoLock lock(lock_);
480 
481   // We stop observing the control port as soon it's transferred, but this can
482   // race with events which are raised right before that happens. This is fine
483   // to ignore.
484   if (transferred_)
485     return;
486 
487   DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_;
488 
489   UpdateSignalsStateNoLock();
490 }
491 
UpdateSignalsStateNoLock()492 void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
493   lock_.AssertAcquired();
494 
495   bool was_peer_closed = peer_closed_;
496   size_t previous_capacity = available_capacity_;
497 
498   ports::PortStatus port_status;
499   int rv = node_controller_->node()->GetStatus(control_port_, &port_status);
500   if (rv != ports::OK || !port_status.receiving_messages) {
501     DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware of peer closure"
502              << " [control_port=" << control_port_.name() << "]";
503     peer_closed_ = true;
504   } else if (rv == ports::OK && port_status.has_messages && !in_transit_) {
505     ports::ScopedMessage message;
506     do {
507       int rv = node_controller_->node()->GetMessageIf(control_port_, nullptr,
508                                                       &message);
509       if (rv != ports::OK)
510         peer_closed_ = true;
511       if (message) {
512         if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) {
513           peer_closed_ = true;
514           break;
515         }
516 
517         const DataPipeControlMessage* m =
518             static_cast<const DataPipeControlMessage*>(
519                 message->payload_bytes());
520 
521         if (m->command != DataPipeCommand::DATA_WAS_READ) {
522           DLOG(ERROR) << "Unexpected message from consumer.";
523           peer_closed_ = true;
524           break;
525         }
526 
527         if (static_cast<size_t>(available_capacity_) + m->num_bytes >
528               options_.capacity_num_bytes) {
529           DLOG(ERROR) << "Consumer claims to have read too many bytes.";
530           break;
531         }
532 
533         DVLOG(1) << "Data pipe producer " << pipe_id_ << " is aware that "
534                  << m->num_bytes << " bytes were read. [control_port="
535                  << control_port_.name() << "]";
536 
537         available_capacity_ += m->num_bytes;
538       }
539     } while (message);
540   }
541 
542   if (peer_closed_ != was_peer_closed ||
543       available_capacity_ != previous_capacity) {
544     awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
545   }
546 }
547 
548 }  // namespace edk
549 }  // namespace mojo
550