1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 
7 #include <limits>
8 #include <memory>
9 
10 #include "base/logging.h"
11 #include "base/macros.h"
12 #include "base/memory/ref_counted.h"
13 #include "mojo/edk/embedder/embedder_internal.h"
14 #include "mojo/edk/system/core.h"
15 #include "mojo/edk/system/message_for_transit.h"
16 #include "mojo/edk/system/node_controller.h"
17 #include "mojo/edk/system/ports_message.h"
18 #include "mojo/edk/system/request_context.h"
19 
20 namespace mojo {
21 namespace edk {
22 
23 namespace {
24 
25 using DispatcherHeader = MessageForTransit::DispatcherHeader;
26 using MessageHeader = MessageForTransit::MessageHeader;
27 
28 #pragma pack(push, 1)
29 
30 struct SerializedState {
31   uint64_t pipe_id;
32   int8_t endpoint;
33   char padding[7];
34 };
35 
36 static_assert(sizeof(SerializedState) % 8 == 0,
37               "Invalid SerializedState size.");
38 
39 #pragma pack(pop)
40 
41 }  // namespace
42 
43 // A PortObserver which forwards to a MessagePipeDispatcher. This owns a
44 // reference to the MPD to ensure it lives as long as the observed port.
45 class MessagePipeDispatcher::PortObserverThunk
46     : public NodeController::PortObserver {
47  public:
PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)48   explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)
49       : dispatcher_(dispatcher) {}
50 
51  private:
~PortObserverThunk()52   ~PortObserverThunk() override {}
53 
54   // NodeController::PortObserver:
OnPortStatusChanged()55   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
56 
57   scoped_refptr<MessagePipeDispatcher> dispatcher_;
58 
59   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
60 };
61 
MessagePipeDispatcher(NodeController * node_controller,const ports::PortRef & port,uint64_t pipe_id,int endpoint)62 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
63                                              const ports::PortRef& port,
64                                              uint64_t pipe_id,
65                                              int endpoint)
66     : node_controller_(node_controller),
67       port_(port),
68       pipe_id_(pipe_id),
69       endpoint_(endpoint) {
70   DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
71            << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
72 
73   node_controller_->SetPortObserver(
74       port_,
75       make_scoped_refptr(new PortObserverThunk(this)));
76 }
77 
Fuse(MessagePipeDispatcher * other)78 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
79   node_controller_->SetPortObserver(port_, nullptr);
80   node_controller_->SetPortObserver(other->port_, nullptr);
81 
82   ports::PortRef port0;
83   {
84     base::AutoLock lock(signal_lock_);
85     port0 = port_;
86     port_closed_.Set(true);
87     awakables_.CancelAll();
88   }
89 
90   ports::PortRef port1;
91   {
92     base::AutoLock lock(other->signal_lock_);
93     port1 = other->port_;
94     other->port_closed_.Set(true);
95     other->awakables_.CancelAll();
96   }
97 
98   // Both ports are always closed by this call.
99   int rv = node_controller_->MergeLocalPorts(port0, port1);
100   return rv == ports::OK;
101 }
102 
GetType() const103 Dispatcher::Type MessagePipeDispatcher::GetType() const {
104   return Type::MESSAGE_PIPE;
105 }
106 
Close()107 MojoResult MessagePipeDispatcher::Close() {
108   base::AutoLock lock(signal_lock_);
109   DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
110            << " [port=" << port_.name() << "]";
111   return CloseNoLock();
112 }
113 
Watch(MojoHandleSignals signals,const Watcher::WatchCallback & callback,uintptr_t context)114 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
115                                         const Watcher::WatchCallback& callback,
116                                         uintptr_t context) {
117   base::AutoLock lock(signal_lock_);
118 
119   if (port_closed_ || in_transit_)
120     return MOJO_RESULT_INVALID_ARGUMENT;
121 
122   return awakables_.AddWatcher(
123       signals, callback, context, GetHandleSignalsStateNoLock());
124 }
125 
CancelWatch(uintptr_t context)126 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
127   base::AutoLock lock(signal_lock_);
128 
129   if (port_closed_ || in_transit_)
130     return MOJO_RESULT_INVALID_ARGUMENT;
131 
132   return awakables_.RemoveWatcher(context);
133 }
134 
WriteMessage(std::unique_ptr<MessageForTransit> message,MojoWriteMessageFlags flags)135 MojoResult MessagePipeDispatcher::WriteMessage(
136     std::unique_ptr<MessageForTransit> message,
137     MojoWriteMessageFlags flags) {
138   if (port_closed_ || in_transit_)
139     return MOJO_RESULT_INVALID_ARGUMENT;
140 
141   size_t num_bytes = message->num_bytes();
142   int rv = node_controller_->SendMessage(port_, message->TakePortsMessage());
143 
144   DVLOG(2) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
145            << " [port=" << port_.name() << "; rv=" << rv
146            << "; num_bytes=" << num_bytes << "]";
147 
148   if (rv != ports::OK) {
149     if (rv == ports::ERROR_PORT_UNKNOWN ||
150         rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
151         rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
152       return MOJO_RESULT_INVALID_ARGUMENT;
153     } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
154       base::AutoLock lock(signal_lock_);
155       awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
156       return MOJO_RESULT_FAILED_PRECONDITION;
157     }
158 
159     NOTREACHED();
160     return MOJO_RESULT_UNKNOWN;
161   }
162 
163   return MOJO_RESULT_OK;
164 }
165 
ReadMessage(std::unique_ptr<MessageForTransit> * message,uint32_t * num_bytes,MojoHandle * handles,uint32_t * num_handles,MojoReadMessageFlags flags,bool read_any_size)166 MojoResult MessagePipeDispatcher::ReadMessage(
167     std::unique_ptr<MessageForTransit>* message,
168     uint32_t* num_bytes,
169     MojoHandle* handles,
170     uint32_t* num_handles,
171     MojoReadMessageFlags flags,
172     bool read_any_size) {
173   // We can't read from a port that's closed or in transit!
174   if (port_closed_ || in_transit_)
175     return MOJO_RESULT_INVALID_ARGUMENT;
176 
177   bool no_space = false;
178   bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
179   bool invalid_message = false;
180 
181   // Grab a message if the provided handles buffer is large enough. If the input
182   // |num_bytes| is provided and |read_any_size| is false, we also ensure
183   // that it specifies a size at least as large as the next available payload.
184   //
185   // If |read_any_size| is true, the input value of |*num_bytes| is ignored.
186   // This flag exists to support both new and old API behavior.
187 
188   ports::ScopedMessage ports_message;
189   int rv = node_controller_->node()->GetMessageIf(
190       port_,
191       [read_any_size, num_bytes, num_handles, &no_space, &may_discard,
192        &invalid_message](
193           const ports::Message& next_message) {
194         const PortsMessage& message =
195             static_cast<const PortsMessage&>(next_message);
196         if (message.num_payload_bytes() < sizeof(MessageHeader)) {
197           invalid_message = true;
198           return true;
199         }
200 
201         const MessageHeader* header =
202             static_cast<const MessageHeader*>(message.payload_bytes());
203         if (header->header_size > message.num_payload_bytes()) {
204           invalid_message = true;
205           return true;
206         }
207 
208         uint32_t bytes_to_read = 0;
209         uint32_t bytes_available =
210             static_cast<uint32_t>(message.num_payload_bytes()) -
211             header->header_size;
212         if (num_bytes) {
213           bytes_to_read = std::min(*num_bytes, bytes_available);
214           *num_bytes = bytes_available;
215         }
216 
217         uint32_t handles_to_read = 0;
218         uint32_t handles_available = header->num_dispatchers;
219         if (num_handles) {
220           handles_to_read = std::min(*num_handles, handles_available);
221           *num_handles = handles_available;
222         }
223 
224         if (handles_to_read < handles_available ||
225             (!read_any_size && bytes_to_read < bytes_available)) {
226           no_space = true;
227           return may_discard;
228         }
229 
230         return true;
231       },
232       &ports_message);
233 
234   if (invalid_message)
235     return MOJO_RESULT_UNKNOWN;
236 
237   if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
238     if (rv == ports::ERROR_PORT_UNKNOWN ||
239         rv == ports::ERROR_PORT_STATE_UNEXPECTED)
240       return MOJO_RESULT_INVALID_ARGUMENT;
241 
242     NOTREACHED();
243     return MOJO_RESULT_UNKNOWN;  // TODO: Add a better error code here?
244   }
245 
246   if (no_space) {
247     // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't
248     // sufficient to hold this message's data. The message will still be in
249     // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
250     return MOJO_RESULT_RESOURCE_EXHAUSTED;
251   }
252 
253   if (!ports_message) {
254     // No message was available in queue.
255 
256     if (rv == ports::OK)
257       return MOJO_RESULT_SHOULD_WAIT;
258 
259     // Peer is closed and there are no more messages to read.
260     DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
261     base::AutoLock lock(signal_lock_);
262     awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
263     return MOJO_RESULT_FAILED_PRECONDITION;
264   }
265 
266   // Alright! We have a message and the caller has provided sufficient storage
267   // in which to receive it.
268 
269   std::unique_ptr<PortsMessage> msg(
270       static_cast<PortsMessage*>(ports_message.release()));
271 
272   const MessageHeader* header =
273       static_cast<const MessageHeader*>(msg->payload_bytes());
274   const DispatcherHeader* dispatcher_headers =
275       reinterpret_cast<const DispatcherHeader*>(header + 1);
276 
277   if (header->num_dispatchers > std::numeric_limits<uint16_t>::max())
278     return MOJO_RESULT_UNKNOWN;
279 
280   // Deserialize dispatchers.
281   if (header->num_dispatchers > 0) {
282     CHECK(handles);
283     std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers);
284     size_t data_payload_index = sizeof(MessageHeader) +
285         header->num_dispatchers * sizeof(DispatcherHeader);
286     if (data_payload_index > header->header_size)
287       return MOJO_RESULT_UNKNOWN;
288     const char* dispatcher_data = reinterpret_cast<const char*>(
289         dispatcher_headers + header->num_dispatchers);
290     size_t port_index = 0;
291     size_t platform_handle_index = 0;
292     ScopedPlatformHandleVectorPtr msg_handles = msg->TakeHandles();
293     const size_t num_msg_handles = msg_handles ? msg_handles->size() : 0;
294     for (size_t i = 0; i < header->num_dispatchers; ++i) {
295       const DispatcherHeader& dh = dispatcher_headers[i];
296       Type type = static_cast<Type>(dh.type);
297 
298       size_t next_payload_index = data_payload_index + dh.num_bytes;
299       if (msg->num_payload_bytes() < next_payload_index ||
300           next_payload_index < data_payload_index) {
301         return MOJO_RESULT_UNKNOWN;
302       }
303 
304       size_t next_port_index = port_index + dh.num_ports;
305       if (msg->num_ports() < next_port_index || next_port_index < port_index)
306         return MOJO_RESULT_UNKNOWN;
307 
308       size_t next_platform_handle_index =
309           platform_handle_index + dh.num_platform_handles;
310       if (num_msg_handles < next_platform_handle_index ||
311           next_platform_handle_index < platform_handle_index) {
312         return MOJO_RESULT_UNKNOWN;
313       }
314 
315       PlatformHandle* out_handles =
316           num_msg_handles ? msg_handles->data() + platform_handle_index
317                           : nullptr;
318       dispatchers[i].dispatcher = Dispatcher::Deserialize(
319           type, dispatcher_data, dh.num_bytes, msg->ports() + port_index,
320           dh.num_ports, out_handles, dh.num_platform_handles);
321       if (!dispatchers[i].dispatcher)
322         return MOJO_RESULT_UNKNOWN;
323 
324       dispatcher_data += dh.num_bytes;
325       data_payload_index = next_payload_index;
326       port_index = next_port_index;
327       platform_handle_index = next_platform_handle_index;
328     }
329 
330     if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers,
331                                                              handles))
332       return MOJO_RESULT_UNKNOWN;
333   }
334 
335   CHECK(msg);
336   *message = MessageForTransit::WrapPortsMessage(std::move(msg));
337   return MOJO_RESULT_OK;
338 }
339 
340 HandleSignalsState
GetHandleSignalsState() const341 MessagePipeDispatcher::GetHandleSignalsState() const {
342   base::AutoLock lock(signal_lock_);
343   return GetHandleSignalsStateNoLock();
344 }
345 
AddAwakable(Awakable * awakable,MojoHandleSignals signals,uintptr_t context,HandleSignalsState * signals_state)346 MojoResult MessagePipeDispatcher::AddAwakable(
347     Awakable* awakable,
348     MojoHandleSignals signals,
349     uintptr_t context,
350     HandleSignalsState* signals_state) {
351   base::AutoLock lock(signal_lock_);
352 
353   if (port_closed_ || in_transit_) {
354     if (signals_state)
355       *signals_state = HandleSignalsState();
356     return MOJO_RESULT_INVALID_ARGUMENT;
357   }
358 
359   HandleSignalsState state = GetHandleSignalsStateNoLock();
360 
361   DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint "
362            << endpoint_ << " [awakable=" << awakable << "; port="
363            << port_.name() << "; signals=" << signals << "; satisfied="
364            << state.satisfied_signals << "; satisfiable="
365            << state.satisfiable_signals << "]";
366 
367   if (state.satisfies(signals)) {
368     if (signals_state)
369       *signals_state = state;
370     DVLOG(2) << "Signals already set for " << port_.name();
371     return MOJO_RESULT_ALREADY_EXISTS;
372   }
373   if (!state.can_satisfy(signals)) {
374     if (signals_state)
375       *signals_state = state;
376     DVLOG(2) << "Signals impossible to satisfy for " << port_.name();
377     return MOJO_RESULT_FAILED_PRECONDITION;
378   }
379 
380   DVLOG(2) << "Adding awakable to pipe " << pipe_id_ << " endpoint "
381            << endpoint_ << " [awakable=" << awakable << "; port="
382            << port_.name() << "; signals=" << signals << "]";
383 
384   awakables_.Add(awakable, signals, context);
385   return MOJO_RESULT_OK;
386 }
387 
RemoveAwakable(Awakable * awakable,HandleSignalsState * signals_state)388 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable,
389                                            HandleSignalsState* signals_state) {
390   base::AutoLock lock(signal_lock_);
391   if (port_closed_ || in_transit_) {
392     if (signals_state)
393       *signals_state = HandleSignalsState();
394   } else if (signals_state) {
395     *signals_state = GetHandleSignalsStateNoLock();
396   }
397 
398   DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint "
399            << endpoint_ << " [awakable=" << awakable << "; port="
400            << port_.name() << "]";
401 
402   awakables_.Remove(awakable);
403 }
404 
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)405 void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes,
406                                            uint32_t* num_ports,
407                                            uint32_t* num_handles) {
408   *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
409   *num_ports = 1;
410   *num_handles = 0;
411 }
412 
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * handles)413 bool MessagePipeDispatcher::EndSerialize(void* destination,
414                                          ports::PortName* ports,
415                                          PlatformHandle* handles) {
416   SerializedState* state = static_cast<SerializedState*>(destination);
417   state->pipe_id = pipe_id_;
418   state->endpoint = static_cast<int8_t>(endpoint_);
419   memset(state->padding, 0, sizeof(state->padding));
420   ports[0] = port_.name();
421   return true;
422 }
423 
BeginTransit()424 bool MessagePipeDispatcher::BeginTransit() {
425   base::AutoLock lock(signal_lock_);
426   if (in_transit_ || port_closed_)
427     return false;
428   in_transit_.Set(true);
429   return in_transit_;
430 }
431 
CompleteTransitAndClose()432 void MessagePipeDispatcher::CompleteTransitAndClose() {
433   node_controller_->SetPortObserver(port_, nullptr);
434 
435   base::AutoLock lock(signal_lock_);
436   port_transferred_ = true;
437   in_transit_.Set(false);
438   CloseNoLock();
439 }
440 
CancelTransit()441 void MessagePipeDispatcher::CancelTransit() {
442   base::AutoLock lock(signal_lock_);
443   in_transit_.Set(false);
444 
445   // Something may have happened while we were waiting for potential transit.
446   awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
447 }
448 
449 // static
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)450 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
451     const void* data,
452     size_t num_bytes,
453     const ports::PortName* ports,
454     size_t num_ports,
455     PlatformHandle* handles,
456     size_t num_handles) {
457   if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState))
458     return nullptr;
459 
460   const SerializedState* state = static_cast<const SerializedState*>(data);
461 
462   ports::PortRef port;
463   CHECK_EQ(
464       ports::OK,
465       internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port));
466 
467   return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port,
468                                    state->pipe_id, state->endpoint);
469 }
470 
~MessagePipeDispatcher()471 MessagePipeDispatcher::~MessagePipeDispatcher() {
472   DCHECK(port_closed_ && !in_transit_);
473 }
474 
CloseNoLock()475 MojoResult MessagePipeDispatcher::CloseNoLock() {
476   signal_lock_.AssertAcquired();
477   if (port_closed_ || in_transit_)
478     return MOJO_RESULT_INVALID_ARGUMENT;
479 
480   port_closed_.Set(true);
481   awakables_.CancelAll();
482 
483   if (!port_transferred_) {
484     base::AutoUnlock unlock(signal_lock_);
485     node_controller_->ClosePort(port_);
486   }
487 
488   return MOJO_RESULT_OK;
489 }
490 
GetHandleSignalsStateNoLock() const491 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
492   HandleSignalsState rv;
493 
494   ports::PortStatus port_status;
495   if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
496     CHECK(in_transit_ || port_transferred_ || port_closed_);
497     return HandleSignalsState();
498   }
499 
500   if (port_status.has_messages) {
501     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
502     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
503   }
504   if (port_status.receiving_messages)
505     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
506   if (!port_status.peer_closed) {
507     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
508     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
509     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
510   } else {
511     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
512   }
513   rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
514   return rv;
515 }
516 
OnPortStatusChanged()517 void MessagePipeDispatcher::OnPortStatusChanged() {
518   DCHECK(RequestContext::current());
519 
520   base::AutoLock lock(signal_lock_);
521 
522   // We stop observing our port as soon as it's transferred, but this can race
523   // with events which are raised right before that happens. This is fine to
524   // ignore.
525   if (port_transferred_)
526     return;
527 
528 #if DCHECK_IS_ON()
529   ports::PortStatus port_status;
530   if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) {
531     if (port_status.has_messages) {
532       ports::ScopedMessage unused;
533       size_t message_size = 0;
534       node_controller_->node()->GetMessageIf(
535           port_, [&message_size](const ports::Message& message) {
536             message_size = message.num_payload_bytes();
537             return false;
538           }, &unused);
539       DVLOG(2) << "New message detected on message pipe " << pipe_id_
540                << " endpoint " << endpoint_ << " [port=" << port_.name()
541                << "; size=" << message_size << "]";
542     }
543     if (port_status.peer_closed) {
544       DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
545                << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
546     }
547   }
548 #endif
549 
550   awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
551 }
552 
553 }  // namespace edk
554 }  // namespace mojo
555