1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6
7 #include <limits>
8 #include <memory>
9
10 #include "base/logging.h"
11 #include "base/macros.h"
12 #include "base/memory/ref_counted.h"
13 #include "mojo/edk/embedder/embedder_internal.h"
14 #include "mojo/edk/system/core.h"
15 #include "mojo/edk/system/message_for_transit.h"
16 #include "mojo/edk/system/node_controller.h"
17 #include "mojo/edk/system/ports_message.h"
18 #include "mojo/edk/system/request_context.h"
19
20 namespace mojo {
21 namespace edk {
22
23 namespace {
24
25 using DispatcherHeader = MessageForTransit::DispatcherHeader;
26 using MessageHeader = MessageForTransit::MessageHeader;
27
28 #pragma pack(push, 1)
29
// Fixed-size record describing a serialized MessagePipeDispatcher.
// EndSerialize() fills one in and Deserialize() reads it back.
struct SerializedState {
  // Copy of the dispatcher's |pipe_id_|.
  uint64_t pipe_id;

  // The dispatcher's |endpoint_|, narrowed to 8 bits.
  int8_t endpoint;

  // Explicit filler that brings the record to 16 bytes; zeroed on write.
  char padding[7];
};
35
36 static_assert(sizeof(SerializedState) % 8 == 0,
37 "Invalid SerializedState size.");
38
39 #pragma pack(pop)
40
41 } // namespace
42
43 // A PortObserver which forwards to a MessagePipeDispatcher. This owns a
44 // reference to the MPD to ensure it lives as long as the observed port.
45 class MessagePipeDispatcher::PortObserverThunk
46 : public NodeController::PortObserver {
47 public:
PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)48 explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)
49 : dispatcher_(dispatcher) {}
50
51 private:
~PortObserverThunk()52 ~PortObserverThunk() override {}
53
54 // NodeController::PortObserver:
OnPortStatusChanged()55 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
56
57 scoped_refptr<MessagePipeDispatcher> dispatcher_;
58
59 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
60 };
61
MessagePipeDispatcher(NodeController * node_controller,const ports::PortRef & port,uint64_t pipe_id,int endpoint)62 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
63 const ports::PortRef& port,
64 uint64_t pipe_id,
65 int endpoint)
66 : node_controller_(node_controller),
67 port_(port),
68 pipe_id_(pipe_id),
69 endpoint_(endpoint) {
70 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
71 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
72
73 node_controller_->SetPortObserver(
74 port_,
75 make_scoped_refptr(new PortObserverThunk(this)));
76 }
77
Fuse(MessagePipeDispatcher * other)78 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
79 node_controller_->SetPortObserver(port_, nullptr);
80 node_controller_->SetPortObserver(other->port_, nullptr);
81
82 ports::PortRef port0;
83 {
84 base::AutoLock lock(signal_lock_);
85 port0 = port_;
86 port_closed_.Set(true);
87 awakables_.CancelAll();
88 }
89
90 ports::PortRef port1;
91 {
92 base::AutoLock lock(other->signal_lock_);
93 port1 = other->port_;
94 other->port_closed_.Set(true);
95 other->awakables_.CancelAll();
96 }
97
98 // Both ports are always closed by this call.
99 int rv = node_controller_->MergeLocalPorts(port0, port1);
100 return rv == ports::OK;
101 }
102
GetType() const103 Dispatcher::Type MessagePipeDispatcher::GetType() const {
104 return Type::MESSAGE_PIPE;
105 }
106
Close()107 MojoResult MessagePipeDispatcher::Close() {
108 base::AutoLock lock(signal_lock_);
109 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
110 << " [port=" << port_.name() << "]";
111 return CloseNoLock();
112 }
113
Watch(MojoHandleSignals signals,const Watcher::WatchCallback & callback,uintptr_t context)114 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
115 const Watcher::WatchCallback& callback,
116 uintptr_t context) {
117 base::AutoLock lock(signal_lock_);
118
119 if (port_closed_ || in_transit_)
120 return MOJO_RESULT_INVALID_ARGUMENT;
121
122 return awakables_.AddWatcher(
123 signals, callback, context, GetHandleSignalsStateNoLock());
124 }
125
CancelWatch(uintptr_t context)126 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
127 base::AutoLock lock(signal_lock_);
128
129 if (port_closed_ || in_transit_)
130 return MOJO_RESULT_INVALID_ARGUMENT;
131
132 return awakables_.RemoveWatcher(context);
133 }
134
WriteMessage(std::unique_ptr<MessageForTransit> message,MojoWriteMessageFlags flags)135 MojoResult MessagePipeDispatcher::WriteMessage(
136 std::unique_ptr<MessageForTransit> message,
137 MojoWriteMessageFlags flags) {
138 if (port_closed_ || in_transit_)
139 return MOJO_RESULT_INVALID_ARGUMENT;
140
141 size_t num_bytes = message->num_bytes();
142 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage());
143
144 DVLOG(2) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
145 << " [port=" << port_.name() << "; rv=" << rv
146 << "; num_bytes=" << num_bytes << "]";
147
148 if (rv != ports::OK) {
149 if (rv == ports::ERROR_PORT_UNKNOWN ||
150 rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
151 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
152 return MOJO_RESULT_INVALID_ARGUMENT;
153 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
154 base::AutoLock lock(signal_lock_);
155 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
156 return MOJO_RESULT_FAILED_PRECONDITION;
157 }
158
159 NOTREACHED();
160 return MOJO_RESULT_UNKNOWN;
161 }
162
163 return MOJO_RESULT_OK;
164 }
165
ReadMessage(std::unique_ptr<MessageForTransit> * message,uint32_t * num_bytes,MojoHandle * handles,uint32_t * num_handles,MojoReadMessageFlags flags,bool read_any_size)166 MojoResult MessagePipeDispatcher::ReadMessage(
167 std::unique_ptr<MessageForTransit>* message,
168 uint32_t* num_bytes,
169 MojoHandle* handles,
170 uint32_t* num_handles,
171 MojoReadMessageFlags flags,
172 bool read_any_size) {
173 // We can't read from a port that's closed or in transit!
174 if (port_closed_ || in_transit_)
175 return MOJO_RESULT_INVALID_ARGUMENT;
176
177 bool no_space = false;
178 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
179 bool invalid_message = false;
180
181 // Grab a message if the provided handles buffer is large enough. If the input
182 // |num_bytes| is provided and |read_any_size| is false, we also ensure
183 // that it specifies a size at least as large as the next available payload.
184 //
185 // If |read_any_size| is true, the input value of |*num_bytes| is ignored.
186 // This flag exists to support both new and old API behavior.
187
188 ports::ScopedMessage ports_message;
189 int rv = node_controller_->node()->GetMessageIf(
190 port_,
191 [read_any_size, num_bytes, num_handles, &no_space, &may_discard,
192 &invalid_message](
193 const ports::Message& next_message) {
194 const PortsMessage& message =
195 static_cast<const PortsMessage&>(next_message);
196 if (message.num_payload_bytes() < sizeof(MessageHeader)) {
197 invalid_message = true;
198 return true;
199 }
200
201 const MessageHeader* header =
202 static_cast<const MessageHeader*>(message.payload_bytes());
203 if (header->header_size > message.num_payload_bytes()) {
204 invalid_message = true;
205 return true;
206 }
207
208 uint32_t bytes_to_read = 0;
209 uint32_t bytes_available =
210 static_cast<uint32_t>(message.num_payload_bytes()) -
211 header->header_size;
212 if (num_bytes) {
213 bytes_to_read = std::min(*num_bytes, bytes_available);
214 *num_bytes = bytes_available;
215 }
216
217 uint32_t handles_to_read = 0;
218 uint32_t handles_available = header->num_dispatchers;
219 if (num_handles) {
220 handles_to_read = std::min(*num_handles, handles_available);
221 *num_handles = handles_available;
222 }
223
224 if (handles_to_read < handles_available ||
225 (!read_any_size && bytes_to_read < bytes_available)) {
226 no_space = true;
227 return may_discard;
228 }
229
230 return true;
231 },
232 &ports_message);
233
234 if (invalid_message)
235 return MOJO_RESULT_UNKNOWN;
236
237 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
238 if (rv == ports::ERROR_PORT_UNKNOWN ||
239 rv == ports::ERROR_PORT_STATE_UNEXPECTED)
240 return MOJO_RESULT_INVALID_ARGUMENT;
241
242 NOTREACHED();
243 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here?
244 }
245
246 if (no_space) {
247 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't
248 // sufficient to hold this message's data. The message will still be in
249 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
250 return MOJO_RESULT_RESOURCE_EXHAUSTED;
251 }
252
253 if (!ports_message) {
254 // No message was available in queue.
255
256 if (rv == ports::OK)
257 return MOJO_RESULT_SHOULD_WAIT;
258
259 // Peer is closed and there are no more messages to read.
260 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
261 base::AutoLock lock(signal_lock_);
262 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
263 return MOJO_RESULT_FAILED_PRECONDITION;
264 }
265
266 // Alright! We have a message and the caller has provided sufficient storage
267 // in which to receive it.
268
269 std::unique_ptr<PortsMessage> msg(
270 static_cast<PortsMessage*>(ports_message.release()));
271
272 const MessageHeader* header =
273 static_cast<const MessageHeader*>(msg->payload_bytes());
274 const DispatcherHeader* dispatcher_headers =
275 reinterpret_cast<const DispatcherHeader*>(header + 1);
276
277 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max())
278 return MOJO_RESULT_UNKNOWN;
279
280 // Deserialize dispatchers.
281 if (header->num_dispatchers > 0) {
282 CHECK(handles);
283 std::vector<DispatcherInTransit> dispatchers(header->num_dispatchers);
284 size_t data_payload_index = sizeof(MessageHeader) +
285 header->num_dispatchers * sizeof(DispatcherHeader);
286 if (data_payload_index > header->header_size)
287 return MOJO_RESULT_UNKNOWN;
288 const char* dispatcher_data = reinterpret_cast<const char*>(
289 dispatcher_headers + header->num_dispatchers);
290 size_t port_index = 0;
291 size_t platform_handle_index = 0;
292 ScopedPlatformHandleVectorPtr msg_handles = msg->TakeHandles();
293 const size_t num_msg_handles = msg_handles ? msg_handles->size() : 0;
294 for (size_t i = 0; i < header->num_dispatchers; ++i) {
295 const DispatcherHeader& dh = dispatcher_headers[i];
296 Type type = static_cast<Type>(dh.type);
297
298 size_t next_payload_index = data_payload_index + dh.num_bytes;
299 if (msg->num_payload_bytes() < next_payload_index ||
300 next_payload_index < data_payload_index) {
301 return MOJO_RESULT_UNKNOWN;
302 }
303
304 size_t next_port_index = port_index + dh.num_ports;
305 if (msg->num_ports() < next_port_index || next_port_index < port_index)
306 return MOJO_RESULT_UNKNOWN;
307
308 size_t next_platform_handle_index =
309 platform_handle_index + dh.num_platform_handles;
310 if (num_msg_handles < next_platform_handle_index ||
311 next_platform_handle_index < platform_handle_index) {
312 return MOJO_RESULT_UNKNOWN;
313 }
314
315 PlatformHandle* out_handles =
316 num_msg_handles ? msg_handles->data() + platform_handle_index
317 : nullptr;
318 dispatchers[i].dispatcher = Dispatcher::Deserialize(
319 type, dispatcher_data, dh.num_bytes, msg->ports() + port_index,
320 dh.num_ports, out_handles, dh.num_platform_handles);
321 if (!dispatchers[i].dispatcher)
322 return MOJO_RESULT_UNKNOWN;
323
324 dispatcher_data += dh.num_bytes;
325 data_payload_index = next_payload_index;
326 port_index = next_port_index;
327 platform_handle_index = next_platform_handle_index;
328 }
329
330 if (!node_controller_->core()->AddDispatchersFromTransit(dispatchers,
331 handles))
332 return MOJO_RESULT_UNKNOWN;
333 }
334
335 CHECK(msg);
336 *message = MessageForTransit::WrapPortsMessage(std::move(msg));
337 return MOJO_RESULT_OK;
338 }
339
340 HandleSignalsState
GetHandleSignalsState() const341 MessagePipeDispatcher::GetHandleSignalsState() const {
342 base::AutoLock lock(signal_lock_);
343 return GetHandleSignalsStateNoLock();
344 }
345
AddAwakable(Awakable * awakable,MojoHandleSignals signals,uintptr_t context,HandleSignalsState * signals_state)346 MojoResult MessagePipeDispatcher::AddAwakable(
347 Awakable* awakable,
348 MojoHandleSignals signals,
349 uintptr_t context,
350 HandleSignalsState* signals_state) {
351 base::AutoLock lock(signal_lock_);
352
353 if (port_closed_ || in_transit_) {
354 if (signals_state)
355 *signals_state = HandleSignalsState();
356 return MOJO_RESULT_INVALID_ARGUMENT;
357 }
358
359 HandleSignalsState state = GetHandleSignalsStateNoLock();
360
361 DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint "
362 << endpoint_ << " [awakable=" << awakable << "; port="
363 << port_.name() << "; signals=" << signals << "; satisfied="
364 << state.satisfied_signals << "; satisfiable="
365 << state.satisfiable_signals << "]";
366
367 if (state.satisfies(signals)) {
368 if (signals_state)
369 *signals_state = state;
370 DVLOG(2) << "Signals already set for " << port_.name();
371 return MOJO_RESULT_ALREADY_EXISTS;
372 }
373 if (!state.can_satisfy(signals)) {
374 if (signals_state)
375 *signals_state = state;
376 DVLOG(2) << "Signals impossible to satisfy for " << port_.name();
377 return MOJO_RESULT_FAILED_PRECONDITION;
378 }
379
380 DVLOG(2) << "Adding awakable to pipe " << pipe_id_ << " endpoint "
381 << endpoint_ << " [awakable=" << awakable << "; port="
382 << port_.name() << "; signals=" << signals << "]";
383
384 awakables_.Add(awakable, signals, context);
385 return MOJO_RESULT_OK;
386 }
387
RemoveAwakable(Awakable * awakable,HandleSignalsState * signals_state)388 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable,
389 HandleSignalsState* signals_state) {
390 base::AutoLock lock(signal_lock_);
391 if (port_closed_ || in_transit_) {
392 if (signals_state)
393 *signals_state = HandleSignalsState();
394 } else if (signals_state) {
395 *signals_state = GetHandleSignalsStateNoLock();
396 }
397
398 DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint "
399 << endpoint_ << " [awakable=" << awakable << "; port="
400 << port_.name() << "]";
401
402 awakables_.Remove(awakable);
403 }
404
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)405 void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes,
406 uint32_t* num_ports,
407 uint32_t* num_handles) {
408 *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
409 *num_ports = 1;
410 *num_handles = 0;
411 }
412
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * handles)413 bool MessagePipeDispatcher::EndSerialize(void* destination,
414 ports::PortName* ports,
415 PlatformHandle* handles) {
416 SerializedState* state = static_cast<SerializedState*>(destination);
417 state->pipe_id = pipe_id_;
418 state->endpoint = static_cast<int8_t>(endpoint_);
419 memset(state->padding, 0, sizeof(state->padding));
420 ports[0] = port_.name();
421 return true;
422 }
423
BeginTransit()424 bool MessagePipeDispatcher::BeginTransit() {
425 base::AutoLock lock(signal_lock_);
426 if (in_transit_ || port_closed_)
427 return false;
428 in_transit_.Set(true);
429 return in_transit_;
430 }
431
CompleteTransitAndClose()432 void MessagePipeDispatcher::CompleteTransitAndClose() {
433 node_controller_->SetPortObserver(port_, nullptr);
434
435 base::AutoLock lock(signal_lock_);
436 port_transferred_ = true;
437 in_transit_.Set(false);
438 CloseNoLock();
439 }
440
CancelTransit()441 void MessagePipeDispatcher::CancelTransit() {
442 base::AutoLock lock(signal_lock_);
443 in_transit_.Set(false);
444
445 // Something may have happened while we were waiting for potential transit.
446 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
447 }
448
449 // static
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)450 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
451 const void* data,
452 size_t num_bytes,
453 const ports::PortName* ports,
454 size_t num_ports,
455 PlatformHandle* handles,
456 size_t num_handles) {
457 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState))
458 return nullptr;
459
460 const SerializedState* state = static_cast<const SerializedState*>(data);
461
462 ports::PortRef port;
463 CHECK_EQ(
464 ports::OK,
465 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port));
466
467 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port,
468 state->pipe_id, state->endpoint);
469 }
470
~MessagePipeDispatcher()471 MessagePipeDispatcher::~MessagePipeDispatcher() {
472 DCHECK(port_closed_ && !in_transit_);
473 }
474
CloseNoLock()475 MojoResult MessagePipeDispatcher::CloseNoLock() {
476 signal_lock_.AssertAcquired();
477 if (port_closed_ || in_transit_)
478 return MOJO_RESULT_INVALID_ARGUMENT;
479
480 port_closed_.Set(true);
481 awakables_.CancelAll();
482
483 if (!port_transferred_) {
484 base::AutoUnlock unlock(signal_lock_);
485 node_controller_->ClosePort(port_);
486 }
487
488 return MOJO_RESULT_OK;
489 }
490
GetHandleSignalsStateNoLock() const491 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
492 HandleSignalsState rv;
493
494 ports::PortStatus port_status;
495 if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
496 CHECK(in_transit_ || port_transferred_ || port_closed_);
497 return HandleSignalsState();
498 }
499
500 if (port_status.has_messages) {
501 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
502 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
503 }
504 if (port_status.receiving_messages)
505 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
506 if (!port_status.peer_closed) {
507 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
508 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
509 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
510 } else {
511 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
512 }
513 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
514 return rv;
515 }
516
OnPortStatusChanged()517 void MessagePipeDispatcher::OnPortStatusChanged() {
518 DCHECK(RequestContext::current());
519
520 base::AutoLock lock(signal_lock_);
521
522 // We stop observing our port as soon as it's transferred, but this can race
523 // with events which are raised right before that happens. This is fine to
524 // ignore.
525 if (port_transferred_)
526 return;
527
528 #if DCHECK_IS_ON()
529 ports::PortStatus port_status;
530 if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) {
531 if (port_status.has_messages) {
532 ports::ScopedMessage unused;
533 size_t message_size = 0;
534 node_controller_->node()->GetMessageIf(
535 port_, [&message_size](const ports::Message& message) {
536 message_size = message.num_payload_bytes();
537 return false;
538 }, &unused);
539 DVLOG(2) << "New message detected on message pipe " << pipe_id_
540 << " endpoint " << endpoint_ << " [port=" << port_.name()
541 << "; size=" << message_size << "]";
542 }
543 if (port_status.peer_closed) {
544 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
545 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
546 }
547 }
548 #endif
549
550 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
551 }
552
553 } // namespace edk
554 } // namespace mojo
555