1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/message_pipe_dispatcher.h"
6 
7 #include <limits>
8 #include <memory>
9 
10 #include "base/logging.h"
11 #include "base/macros.h"
12 #include "base/memory/ref_counted.h"
13 #include "mojo/core/core.h"
14 #include "mojo/core/node_controller.h"
15 #include "mojo/core/ports/event.h"
16 #include "mojo/core/ports/message_filter.h"
17 #include "mojo/core/request_context.h"
18 #include "mojo/core/user_message_impl.h"
19 
20 namespace mojo {
21 namespace core {
22 
23 namespace {
24 
25 #pragma pack(push, 1)
26 
27 struct SerializedState {
28   uint64_t pipe_id;
29   int8_t endpoint;
30   char padding[7];
31 };
32 
33 static_assert(sizeof(SerializedState) % 8 == 0,
34               "Invalid SerializedState size.");
35 
36 #pragma pack(pop)
37 
38 }  // namespace
39 
40 // A PortObserver which forwards to a MessagePipeDispatcher. This owns a
41 // reference to the MPD to ensure it lives as long as the observed port.
42 class MessagePipeDispatcher::PortObserverThunk
43     : public NodeController::PortObserver {
44  public:
PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)45   explicit PortObserverThunk(scoped_refptr<MessagePipeDispatcher> dispatcher)
46       : dispatcher_(dispatcher) {}
47 
48  private:
~PortObserverThunk()49   ~PortObserverThunk() override {}
50 
51   // NodeController::PortObserver:
OnPortStatusChanged()52   void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); }
53 
54   scoped_refptr<MessagePipeDispatcher> dispatcher_;
55 
56   DISALLOW_COPY_AND_ASSIGN(PortObserverThunk);
57 };
58 
59 #if DCHECK_IS_ON()
60 
61 // A MessageFilter which never matches a message. Used to peek at the size of
62 // the next available message on a port, for debug logging only.
63 class PeekSizeMessageFilter : public ports::MessageFilter {
64  public:
PeekSizeMessageFilter()65   PeekSizeMessageFilter() {}
~PeekSizeMessageFilter()66   ~PeekSizeMessageFilter() override {}
67 
68   // ports::MessageFilter:
Match(const ports::UserMessageEvent & message_event)69   bool Match(const ports::UserMessageEvent& message_event) override {
70     const auto* message = message_event.GetMessage<UserMessageImpl>();
71     if (message->IsSerialized())
72       message_size_ = message->user_payload_size();
73     return false;
74   }
75 
message_size() const76   size_t message_size() const { return message_size_; }
77 
78  private:
79   size_t message_size_ = 0;
80 
81   DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter);
82 };
83 
84 #endif  // DCHECK_IS_ON()
85 
MessagePipeDispatcher(NodeController * node_controller,const ports::PortRef & port,uint64_t pipe_id,int endpoint)86 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
87                                              const ports::PortRef& port,
88                                              uint64_t pipe_id,
89                                              int endpoint)
90     : node_controller_(node_controller),
91       port_(port),
92       pipe_id_(pipe_id),
93       endpoint_(endpoint),
94       watchers_(this) {
95   DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
96            << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
97 
98   node_controller_->SetPortObserver(
99       port_, base::MakeRefCounted<PortObserverThunk>(this));
100 }
101 
Fuse(MessagePipeDispatcher * other)102 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
103   node_controller_->SetPortObserver(port_, nullptr);
104   node_controller_->SetPortObserver(other->port_, nullptr);
105 
106   ports::PortRef port0;
107   {
108     base::AutoLock lock(signal_lock_);
109     port0 = port_;
110     port_closed_.Set(true);
111     watchers_.NotifyClosed();
112   }
113 
114   ports::PortRef port1;
115   {
116     base::AutoLock lock(other->signal_lock_);
117     port1 = other->port_;
118     other->port_closed_.Set(true);
119     other->watchers_.NotifyClosed();
120   }
121 
122   // Both ports are always closed by this call.
123   int rv = node_controller_->MergeLocalPorts(port0, port1);
124   return rv == ports::OK;
125 }
126 
GetType() const127 Dispatcher::Type MessagePipeDispatcher::GetType() const {
128   return Type::MESSAGE_PIPE;
129 }
130 
Close()131 MojoResult MessagePipeDispatcher::Close() {
132   base::AutoLock lock(signal_lock_);
133   DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
134            << " [port=" << port_.name() << "]";
135   return CloseNoLock();
136 }
137 
WriteMessage(std::unique_ptr<ports::UserMessageEvent> message)138 MojoResult MessagePipeDispatcher::WriteMessage(
139     std::unique_ptr<ports::UserMessageEvent> message) {
140   if (port_closed_ || in_transit_)
141     return MOJO_RESULT_INVALID_ARGUMENT;
142 
143   int rv = node_controller_->SendUserMessage(port_, std::move(message));
144 
145   DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_
146            << " [port=" << port_.name() << "; rv=" << rv << "]";
147 
148   if (rv != ports::OK) {
149     if (rv == ports::ERROR_PORT_UNKNOWN ||
150         rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
151         rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
152       return MOJO_RESULT_INVALID_ARGUMENT;
153     } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
154       return MOJO_RESULT_FAILED_PRECONDITION;
155     }
156 
157     NOTREACHED();
158     return MOJO_RESULT_UNKNOWN;
159   }
160 
161   return MOJO_RESULT_OK;
162 }
163 
ReadMessage(std::unique_ptr<ports::UserMessageEvent> * message)164 MojoResult MessagePipeDispatcher::ReadMessage(
165     std::unique_ptr<ports::UserMessageEvent>* message) {
166   // We can't read from a port that's closed or in transit!
167   if (port_closed_ || in_transit_)
168     return MOJO_RESULT_INVALID_ARGUMENT;
169 
170   int rv = node_controller_->node()->GetMessage(port_, message, nullptr);
171   if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) {
172     if (rv == ports::ERROR_PORT_UNKNOWN ||
173         rv == ports::ERROR_PORT_STATE_UNEXPECTED)
174       return MOJO_RESULT_INVALID_ARGUMENT;
175 
176     NOTREACHED();
177     return MOJO_RESULT_UNKNOWN;
178   }
179 
180   if (!*message) {
181     // No message was available in queue.
182     if (rv == ports::OK)
183       return MOJO_RESULT_SHOULD_WAIT;
184     // Peer is closed and there are no more messages to read.
185     DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
186     return MOJO_RESULT_FAILED_PRECONDITION;
187   }
188 
189   // We may need to update anyone watching our signals in case we just read the
190   // last available message.
191   base::AutoLock lock(signal_lock_);
192   watchers_.NotifyState(GetHandleSignalsStateNoLock());
193   return MOJO_RESULT_OK;
194 }
195 
SetQuota(MojoQuotaType type,uint64_t limit)196 MojoResult MessagePipeDispatcher::SetQuota(MojoQuotaType type, uint64_t limit) {
197   switch (type) {
198     case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_LENGTH:
199       if (limit == MOJO_QUOTA_LIMIT_NONE)
200         receive_queue_length_limit_.reset();
201       else
202         receive_queue_length_limit_ = limit;
203       break;
204 
205     case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_MEMORY_SIZE:
206       if (limit == MOJO_QUOTA_LIMIT_NONE)
207         receive_queue_memory_size_limit_.reset();
208       else
209         receive_queue_memory_size_limit_ = limit;
210       break;
211 
212     default:
213       return MOJO_RESULT_INVALID_ARGUMENT;
214   }
215 
216   return MOJO_RESULT_OK;
217 }
218 
QueryQuota(MojoQuotaType type,uint64_t * limit,uint64_t * usage)219 MojoResult MessagePipeDispatcher::QueryQuota(MojoQuotaType type,
220                                              uint64_t* limit,
221                                              uint64_t* usage) {
222   ports::PortStatus port_status;
223   if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
224     CHECK(in_transit_ || port_transferred_ || port_closed_);
225     return MOJO_RESULT_INVALID_ARGUMENT;
226   }
227 
228   switch (type) {
229     case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_LENGTH:
230       *limit = receive_queue_length_limit_.value_or(MOJO_QUOTA_LIMIT_NONE);
231       *usage = port_status.queued_message_count;
232       break;
233 
234     case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_MEMORY_SIZE:
235       *limit = receive_queue_memory_size_limit_.value_or(MOJO_QUOTA_LIMIT_NONE);
236       *usage = port_status.queued_num_bytes;
237       break;
238 
239     default:
240       return MOJO_RESULT_INVALID_ARGUMENT;
241   }
242 
243   return MOJO_RESULT_OK;
244 }
245 
GetHandleSignalsState() const246 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsState() const {
247   base::AutoLock lock(signal_lock_);
248   return GetHandleSignalsStateNoLock();
249 }
250 
AddWatcherRef(const scoped_refptr<WatcherDispatcher> & watcher,uintptr_t context)251 MojoResult MessagePipeDispatcher::AddWatcherRef(
252     const scoped_refptr<WatcherDispatcher>& watcher,
253     uintptr_t context) {
254   base::AutoLock lock(signal_lock_);
255   if (port_closed_ || in_transit_)
256     return MOJO_RESULT_INVALID_ARGUMENT;
257   return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
258 }
259 
RemoveWatcherRef(WatcherDispatcher * watcher,uintptr_t context)260 MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher,
261                                                    uintptr_t context) {
262   base::AutoLock lock(signal_lock_);
263   if (port_closed_ || in_transit_)
264     return MOJO_RESULT_INVALID_ARGUMENT;
265   return watchers_.Remove(watcher, context);
266 }
267 
StartSerialize(uint32_t * num_bytes,uint32_t * num_ports,uint32_t * num_handles)268 void MessagePipeDispatcher::StartSerialize(uint32_t* num_bytes,
269                                            uint32_t* num_ports,
270                                            uint32_t* num_handles) {
271   *num_bytes = static_cast<uint32_t>(sizeof(SerializedState));
272   *num_ports = 1;
273   *num_handles = 0;
274 }
275 
EndSerialize(void * destination,ports::PortName * ports,PlatformHandle * handles)276 bool MessagePipeDispatcher::EndSerialize(void* destination,
277                                          ports::PortName* ports,
278                                          PlatformHandle* handles) {
279   SerializedState* state = static_cast<SerializedState*>(destination);
280   state->pipe_id = pipe_id_;
281   state->endpoint = static_cast<int8_t>(endpoint_);
282   memset(state->padding, 0, sizeof(state->padding));
283   ports[0] = port_.name();
284   return true;
285 }
286 
BeginTransit()287 bool MessagePipeDispatcher::BeginTransit() {
288   base::AutoLock lock(signal_lock_);
289   if (in_transit_ || port_closed_)
290     return false;
291   in_transit_.Set(true);
292   return in_transit_;
293 }
294 
CompleteTransitAndClose()295 void MessagePipeDispatcher::CompleteTransitAndClose() {
296   node_controller_->SetPortObserver(port_, nullptr);
297 
298   base::AutoLock lock(signal_lock_);
299   port_transferred_ = true;
300   in_transit_.Set(false);
301   CloseNoLock();
302 }
303 
CancelTransit()304 void MessagePipeDispatcher::CancelTransit() {
305   base::AutoLock lock(signal_lock_);
306   in_transit_.Set(false);
307 
308   // Something may have happened while we were waiting for potential transit.
309   watchers_.NotifyState(GetHandleSignalsStateNoLock());
310 }
311 
312 // static
Deserialize(const void * data,size_t num_bytes,const ports::PortName * ports,size_t num_ports,PlatformHandle * handles,size_t num_handles)313 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
314     const void* data,
315     size_t num_bytes,
316     const ports::PortName* ports,
317     size_t num_ports,
318     PlatformHandle* handles,
319     size_t num_handles) {
320   if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState))
321     return nullptr;
322 
323   const SerializedState* state = static_cast<const SerializedState*>(data);
324 
325   ports::Node* node = Core::Get()->GetNodeController()->node();
326   ports::PortRef port;
327   if (node->GetPort(ports[0], &port) != ports::OK)
328     return nullptr;
329 
330   ports::PortStatus status;
331   if (node->GetStatus(port, &status) != ports::OK)
332     return nullptr;
333 
334   return new MessagePipeDispatcher(Core::Get()->GetNodeController(), port,
335                                    state->pipe_id, state->endpoint);
336 }
337 
338 MessagePipeDispatcher::~MessagePipeDispatcher() = default;
339 
CloseNoLock()340 MojoResult MessagePipeDispatcher::CloseNoLock() {
341   signal_lock_.AssertAcquired();
342   if (port_closed_ || in_transit_)
343     return MOJO_RESULT_INVALID_ARGUMENT;
344 
345   port_closed_.Set(true);
346   watchers_.NotifyClosed();
347 
348   if (!port_transferred_) {
349     base::AutoUnlock unlock(signal_lock_);
350     node_controller_->ClosePort(port_);
351   }
352 
353   return MOJO_RESULT_OK;
354 }
355 
GetHandleSignalsStateNoLock() const356 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
357   HandleSignalsState rv;
358 
359   ports::PortStatus port_status;
360   if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
361     CHECK(in_transit_ || port_transferred_ || port_closed_);
362     return HandleSignalsState();
363   }
364 
365   if (port_status.has_messages) {
366     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
367     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
368   }
369   if (port_status.receiving_messages)
370     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
371   if (!port_status.peer_closed) {
372     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
373     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
374     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
375     rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
376     if (port_status.peer_remote)
377       rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_REMOTE;
378   } else {
379     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
380   }
381   if (receive_queue_length_limit_ &&
382       port_status.queued_message_count > *receive_queue_length_limit_) {
383     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;
384   } else if (receive_queue_memory_size_limit_ &&
385              port_status.queued_num_bytes > *receive_queue_memory_size_limit_) {
386     rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;
387   }
388   rv.satisfiable_signals |=
389       MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;
390   return rv;
391 }
392 
OnPortStatusChanged()393 void MessagePipeDispatcher::OnPortStatusChanged() {
394   DCHECK(RequestContext::current());
395 
396   base::AutoLock lock(signal_lock_);
397 
398   // We stop observing our port as soon as it's transferred, but this can race
399   // with events which are raised right before that happens. This is fine to
400   // ignore.
401   if (port_transferred_)
402     return;
403 
404 #if DCHECK_IS_ON()
405   ports::PortStatus port_status;
406   if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) {
407     if (port_status.has_messages) {
408       std::unique_ptr<ports::UserMessageEvent> unused;
409       PeekSizeMessageFilter filter;
410       node_controller_->node()->GetMessage(port_, &unused, &filter);
411       DVLOG(4) << "New message detected on message pipe " << pipe_id_
412                << " endpoint " << endpoint_ << " [port=" << port_.name()
413                << "; size=" << filter.message_size() << "]";
414     }
415     if (port_status.peer_closed) {
416       DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_
417                << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
418     }
419   }
420 #endif
421 
422   watchers_.NotifyState(GetHandleSignalsStateNoLock());
423 }
424 
425 }  // namespace core
426 }  // namespace mojo
427