1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 
7 #include <stdint.h>
8 
9 #include <utility>
10 
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/sequenced_task_runner.h"
16 #include "base/stl_util.h"
17 #include "base/synchronization/waitable_event.h"
18 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
20 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
21 #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h"
22 
23 namespace mojo {
24 namespace internal {
25 
26 // InterfaceEndpoint stores the information of an interface endpoint registered
27 // with the router.
28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
29 // this object.
30 class MultiplexRouter::InterfaceEndpoint
31     : public base::RefCountedThreadSafe<InterfaceEndpoint>,
32       public InterfaceEndpointController {
33  public:
InterfaceEndpoint(MultiplexRouter * router,InterfaceId id)34   InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
35       : router_(router),
36         id_(id),
37         closed_(false),
38         peer_closed_(false),
39         handle_created_(false),
40         client_(nullptr) {}
41 
42   // ---------------------------------------------------------------------------
43   // The following public methods are safe to call from any sequence without
44   // locking.
45 
id() const46   InterfaceId id() const { return id_; }
47 
48   // ---------------------------------------------------------------------------
49   // The following public methods are called under the router's lock.
50 
closed() const51   bool closed() const { return closed_; }
set_closed()52   void set_closed() {
53     router_->AssertLockAcquired();
54     closed_ = true;
55   }
56 
peer_closed() const57   bool peer_closed() const { return peer_closed_; }
set_peer_closed()58   void set_peer_closed() {
59     router_->AssertLockAcquired();
60     peer_closed_ = true;
61   }
62 
handle_created() const63   bool handle_created() const { return handle_created_; }
set_handle_created()64   void set_handle_created() {
65     router_->AssertLockAcquired();
66     handle_created_ = true;
67   }
68 
disconnect_reason() const69   const base::Optional<DisconnectReason>& disconnect_reason() const {
70     return disconnect_reason_;
71   }
set_disconnect_reason(const base::Optional<DisconnectReason> & disconnect_reason)72   void set_disconnect_reason(
73       const base::Optional<DisconnectReason>& disconnect_reason) {
74     router_->AssertLockAcquired();
75     disconnect_reason_ = disconnect_reason;
76   }
77 
task_runner() const78   base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); }
79 
client() const80   InterfaceEndpointClient* client() const { return client_; }
81 
AttachClient(InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)82   void AttachClient(InterfaceEndpointClient* client,
83                     scoped_refptr<base::SequencedTaskRunner> runner) {
84     router_->AssertLockAcquired();
85     DCHECK(!client_);
86     DCHECK(!closed_);
87     DCHECK(runner->RunsTasksInCurrentSequence());
88 
89     task_runner_ = std::move(runner);
90     client_ = client;
91   }
92 
93   // This method must be called on the same sequence as the corresponding
94   // AttachClient() call.
DetachClient()95   void DetachClient() {
96     router_->AssertLockAcquired();
97     DCHECK(client_);
98     DCHECK(task_runner_->RunsTasksInCurrentSequence());
99     DCHECK(!closed_);
100 
101     task_runner_ = nullptr;
102     client_ = nullptr;
103     sync_watcher_.reset();
104   }
105 
SignalSyncMessageEvent()106   void SignalSyncMessageEvent() {
107     router_->AssertLockAcquired();
108     if (sync_message_event_signaled_)
109       return;
110     sync_message_event_signaled_ = true;
111     if (sync_watcher_)
112       sync_watcher_->SignalEvent();
113   }
114 
ResetSyncMessageSignal()115   void ResetSyncMessageSignal() {
116     router_->AssertLockAcquired();
117     if (!sync_message_event_signaled_)
118       return;
119     sync_message_event_signaled_ = false;
120     if (sync_watcher_)
121       sync_watcher_->ResetEvent();
122   }
123 
124   // ---------------------------------------------------------------------------
125   // The following public methods (i.e., InterfaceEndpointController
126   // implementation) are called by the client on the same sequence as the
127   // AttachClient() call. They are called outside of the router's lock.
128 
SendMessage(Message * message)129   bool SendMessage(Message* message) override {
130     DCHECK(task_runner_->RunsTasksInCurrentSequence());
131     message->set_interface_id(id_);
132     return router_->connector_.Accept(message);
133   }
134 
AllowWokenUpBySyncWatchOnSameThread()135   void AllowWokenUpBySyncWatchOnSameThread() override {
136     DCHECK(task_runner_->RunsTasksInCurrentSequence());
137 
138     EnsureSyncWatcherExists();
139     sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
140   }
141 
SyncWatch(const bool * should_stop)142   bool SyncWatch(const bool* should_stop) override {
143     DCHECK(task_runner_->RunsTasksInCurrentSequence());
144 
145     EnsureSyncWatcherExists();
146     return sync_watcher_->SyncWatch(should_stop);
147   }
148 
149  private:
150   friend class base::RefCountedThreadSafe<InterfaceEndpoint>;
151 
~InterfaceEndpoint()152   ~InterfaceEndpoint() override {
153     router_->AssertLockAcquired();
154 
155     DCHECK(!client_);
156   }
157 
OnSyncEventSignaled()158   void OnSyncEventSignaled() {
159     DCHECK(task_runner_->RunsTasksInCurrentSequence());
160     scoped_refptr<MultiplexRouter> router_protector(router_);
161 
162     MayAutoLock locker(&router_->lock_);
163     scoped_refptr<InterfaceEndpoint> self_protector(this);
164 
165     bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
166 
167     if (!more_to_process)
168       ResetSyncMessageSignal();
169 
170     // Currently there are no queued sync messages and the peer has closed so
171     // there won't be incoming sync messages in the future.
172     if (!more_to_process && peer_closed_) {
173       // If a SyncWatch() call (or multiple ones) of this interface endpoint is
174       // on the call stack, resetting the sync watcher will allow it to exit
175       // when the call stack unwinds to that frame.
176       sync_watcher_.reset();
177     }
178   }
179 
EnsureSyncWatcherExists()180   void EnsureSyncWatcherExists() {
181     DCHECK(task_runner_->RunsTasksInCurrentSequence());
182     if (sync_watcher_)
183       return;
184 
185     MayAutoLock locker(&router_->lock_);
186     sync_watcher_ =
187         std::make_unique<SequenceLocalSyncEventWatcher>(base::BindRepeating(
188             &InterfaceEndpoint::OnSyncEventSignaled, base::Unretained(this)));
189     if (sync_message_event_signaled_)
190       sync_watcher_->SignalEvent();
191   }
192 
193   // ---------------------------------------------------------------------------
194   // The following members are safe to access from any sequence.
195 
196   MultiplexRouter* const router_;
197   const InterfaceId id_;
198 
199   // ---------------------------------------------------------------------------
200   // The following members are accessed under the router's lock.
201 
202   // Whether the endpoint has been closed.
203   bool closed_;
204   // Whether the peer endpoint has been closed.
205   bool peer_closed_;
206 
207   // Whether there is already a ScopedInterfaceEndpointHandle created for this
208   // endpoint.
209   bool handle_created_;
210 
211   base::Optional<DisconnectReason> disconnect_reason_;
212 
213   // The task runner on which |client_|'s methods can be called.
214   scoped_refptr<base::SequencedTaskRunner> task_runner_;
215   // Not owned. It is null if no client is attached to this endpoint.
216   InterfaceEndpointClient* client_;
217 
218   // Indicates whether the sync watcher should be signaled for this endpoint.
219   bool sync_message_event_signaled_ = false;
220 
221   // Guarded by the router's lock. Used to synchronously wait on replies.
222   std::unique_ptr<SequenceLocalSyncEventWatcher> sync_watcher_;
223 
224   DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
225 };
226 
227 // MessageWrapper objects are always destroyed under the router's lock. On
228 // destruction, if the message it wrappers contains interface IDs, the wrapper
229 // closes the corresponding endpoints.
230 class MultiplexRouter::MessageWrapper {
231  public:
232   MessageWrapper() = default;
233 
MessageWrapper(MultiplexRouter * router,Message message)234   MessageWrapper(MultiplexRouter* router, Message message)
235       : router_(router), value_(std::move(message)) {}
236 
MessageWrapper(MessageWrapper && other)237   MessageWrapper(MessageWrapper&& other)
238       : router_(other.router_), value_(std::move(other.value_)) {}
239 
~MessageWrapper()240   ~MessageWrapper() {
241     if (!router_ || value_.IsNull())
242       return;
243 
244     router_->AssertLockAcquired();
245     // Don't try to close the endpoints if at this point the router is already
246     // half-destructed.
247     if (!router_->being_destructed_)
248       router_->CloseEndpointsForMessage(value_);
249   }
250 
operator =(MessageWrapper && other)251   MessageWrapper& operator=(MessageWrapper&& other) {
252     router_ = other.router_;
253     value_ = std::move(other.value_);
254     return *this;
255   }
256 
value() const257   const Message& value() const { return value_; }
258 
259   // Must be called outside of the router's lock.
260   // Returns a null message if it fails to deseralize the associated endpoint
261   // handles.
DeserializeEndpointHandlesAndTake()262   Message DeserializeEndpointHandlesAndTake() {
263     if (!value_.DeserializeAssociatedEndpointHandles(router_)) {
264       // The previous call may have deserialized part of the associated
265       // interface endpoint handles. They must be destroyed outside of the
266       // router's lock, so we cannot wait until destruction of MessageWrapper.
267       value_.Reset();
268       return Message();
269     }
270     return std::move(value_);
271   }
272 
273  private:
274   MultiplexRouter* router_ = nullptr;
275   Message value_;
276 
277   DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
278 };
279 
280 struct MultiplexRouter::Task {
281  public:
282   // Doesn't take ownership of |message| but takes its contents.
CreateMessageTaskmojo::internal::MultiplexRouter::Task283   static std::unique_ptr<Task> CreateMessageTask(
284       MessageWrapper message_wrapper) {
285     Task* task = new Task(MESSAGE);
286     task->message_wrapper = std::move(message_wrapper);
287     return base::WrapUnique(task);
288   }
CreateNotifyErrorTaskmojo::internal::MultiplexRouter::Task289   static std::unique_ptr<Task> CreateNotifyErrorTask(
290       InterfaceEndpoint* endpoint) {
291     Task* task = new Task(NOTIFY_ERROR);
292     task->endpoint_to_notify = endpoint;
293     return base::WrapUnique(task);
294   }
295 
~Taskmojo::internal::MultiplexRouter::Task296   ~Task() {}
297 
IsMessageTaskmojo::internal::MultiplexRouter::Task298   bool IsMessageTask() const { return type == MESSAGE; }
IsNotifyErrorTaskmojo::internal::MultiplexRouter::Task299   bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
300 
301   MessageWrapper message_wrapper;
302   scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
303 
304   enum Type { MESSAGE, NOTIFY_ERROR };
305   Type type;
306 
307  private:
Taskmojo::internal::MultiplexRouter::Task308   explicit Task(Type in_type) : type(in_type) {}
309 
310   DISALLOW_COPY_AND_ASSIGN(Task);
311 };
312 
MultiplexRouter(ScopedMessagePipeHandle message_pipe,Config config,bool set_interface_id_namesapce_bit,scoped_refptr<base::SequencedTaskRunner> runner)313 MultiplexRouter::MultiplexRouter(
314     ScopedMessagePipeHandle message_pipe,
315     Config config,
316     bool set_interface_id_namesapce_bit,
317     scoped_refptr<base::SequencedTaskRunner> runner)
318     : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
319       task_runner_(runner),
320       filters_(this),
321       connector_(std::move(message_pipe),
322                  config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND
323                                            : Connector::SINGLE_THREADED_SEND,
324                  std::move(runner)),
325       control_message_handler_(this),
326       control_message_proxy_(&connector_) {
327   DCHECK(task_runner_->RunsTasksInCurrentSequence());
328 
329   if (config == MULTI_INTERFACE)
330     lock_.emplace();
331 
332   if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS ||
333       config == MULTI_INTERFACE) {
334     // Always participate in sync handle watching in multi-interface mode,
335     // because even if it doesn't expect sync requests during sync handle
336     // watching, it may still need to dispatch messages to associated endpoints
337     // on a different sequence.
338     connector_.AllowWokenUpBySyncWatchOnSameThread();
339   }
340   connector_.set_incoming_receiver(&filters_);
341   connector_.set_connection_error_handler(base::Bind(
342       &MultiplexRouter::OnPipeConnectionError, base::Unretained(this)));
343 
344   std::unique_ptr<MessageHeaderValidator> header_validator =
345       std::make_unique<MessageHeaderValidator>();
346   header_validator_ = header_validator.get();
347   filters_.Append(std::move(header_validator));
348 }
349 
~MultiplexRouter()350 MultiplexRouter::~MultiplexRouter() {
351   MayAutoLock locker(&lock_);
352 
353   being_destructed_ = true;
354 
355   sync_message_tasks_.clear();
356   tasks_.clear();
357   endpoints_.clear();
358 }
359 
AddIncomingMessageFilter(std::unique_ptr<MessageReceiver> filter)360 void MultiplexRouter::AddIncomingMessageFilter(
361     std::unique_ptr<MessageReceiver> filter) {
362   filters_.Append(std::move(filter));
363 }
364 
SetMasterInterfaceName(const char * name)365 void MultiplexRouter::SetMasterInterfaceName(const char* name) {
366   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
367   header_validator_->SetDescription(std::string(name) +
368                                     " [master] MessageHeaderValidator");
369   control_message_handler_.SetDescription(
370       std::string(name) + " [master] PipeControlMessageHandler");
371   connector_.SetWatcherHeapProfilerTag(name);
372 }
373 
AssociateInterface(ScopedInterfaceEndpointHandle handle_to_send)374 InterfaceId MultiplexRouter::AssociateInterface(
375     ScopedInterfaceEndpointHandle handle_to_send) {
376   if (!handle_to_send.pending_association())
377     return kInvalidInterfaceId;
378 
379   uint32_t id = 0;
380   {
381     MayAutoLock locker(&lock_);
382     do {
383       if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
384         next_interface_id_value_ = 1;
385       id = next_interface_id_value_++;
386       if (set_interface_id_namespace_bit_)
387         id |= kInterfaceIdNamespaceMask;
388     } while (base::ContainsKey(endpoints_, id));
389 
390     InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
391     endpoints_[id] = endpoint;
392     if (encountered_error_)
393       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
394     endpoint->set_handle_created();
395   }
396 
397   if (!NotifyAssociation(&handle_to_send, id)) {
398     // The peer handle of |handle_to_send|, which is supposed to join this
399     // associated group, has been closed.
400     {
401       MayAutoLock locker(&lock_);
402       InterfaceEndpoint* endpoint = FindEndpoint(id);
403       if (endpoint)
404         UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
405     }
406 
407     control_message_proxy_.NotifyPeerEndpointClosed(
408         id, handle_to_send.disconnect_reason());
409   }
410   return id;
411 }
412 
CreateLocalEndpointHandle(InterfaceId id)413 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
414     InterfaceId id) {
415   if (!IsValidInterfaceId(id))
416     return ScopedInterfaceEndpointHandle();
417 
418   MayAutoLock locker(&lock_);
419   bool inserted = false;
420   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
421   if (inserted) {
422     if (encountered_error_)
423       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
424   } else {
425     if (endpoint->handle_created() || endpoint->closed())
426       return ScopedInterfaceEndpointHandle();
427   }
428 
429   endpoint->set_handle_created();
430   return CreateScopedInterfaceEndpointHandle(id);
431 }
432 
CloseEndpointHandle(InterfaceId id,const base::Optional<DisconnectReason> & reason)433 void MultiplexRouter::CloseEndpointHandle(
434     InterfaceId id,
435     const base::Optional<DisconnectReason>& reason) {
436   if (!IsValidInterfaceId(id))
437     return;
438 
439   MayAutoLock locker(&lock_);
440   DCHECK(base::ContainsKey(endpoints_, id));
441   InterfaceEndpoint* endpoint = endpoints_[id].get();
442   DCHECK(!endpoint->client());
443   DCHECK(!endpoint->closed());
444   UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
445 
446   if (!IsMasterInterfaceId(id) || reason) {
447     MayAutoUnlock unlocker(&lock_);
448     control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
449   }
450 
451   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
452 }
453 
AttachEndpointClient(const ScopedInterfaceEndpointHandle & handle,InterfaceEndpointClient * client,scoped_refptr<base::SequencedTaskRunner> runner)454 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
455     const ScopedInterfaceEndpointHandle& handle,
456     InterfaceEndpointClient* client,
457     scoped_refptr<base::SequencedTaskRunner> runner) {
458   const InterfaceId id = handle.id();
459 
460   DCHECK(IsValidInterfaceId(id));
461   DCHECK(client);
462 
463   MayAutoLock locker(&lock_);
464   DCHECK(base::ContainsKey(endpoints_, id));
465 
466   InterfaceEndpoint* endpoint = endpoints_[id].get();
467   endpoint->AttachClient(client, std::move(runner));
468 
469   if (endpoint->peer_closed())
470     tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
471   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
472 
473   return endpoint;
474 }
475 
DetachEndpointClient(const ScopedInterfaceEndpointHandle & handle)476 void MultiplexRouter::DetachEndpointClient(
477     const ScopedInterfaceEndpointHandle& handle) {
478   const InterfaceId id = handle.id();
479 
480   DCHECK(IsValidInterfaceId(id));
481 
482   MayAutoLock locker(&lock_);
483   DCHECK(base::ContainsKey(endpoints_, id));
484 
485   InterfaceEndpoint* endpoint = endpoints_[id].get();
486   endpoint->DetachClient();
487 }
488 
RaiseError()489 void MultiplexRouter::RaiseError() {
490   if (task_runner_->RunsTasksInCurrentSequence()) {
491     connector_.RaiseError();
492   } else {
493     task_runner_->PostTask(FROM_HERE,
494                            base::Bind(&MultiplexRouter::RaiseError, this));
495   }
496 }
497 
PrefersSerializedMessages()498 bool MultiplexRouter::PrefersSerializedMessages() {
499   MayAutoLock locker(&lock_);
500   return connector_.PrefersSerializedMessages();
501 }
502 
CloseMessagePipe()503 void MultiplexRouter::CloseMessagePipe() {
504   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
505   connector_.CloseMessagePipe();
506   // CloseMessagePipe() above won't trigger connection error handler.
507   // Explicitly call OnPipeConnectionError() so that associated endpoints will
508   // get notified.
509   OnPipeConnectionError();
510 }
511 
PauseIncomingMethodCallProcessing()512 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
513   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
514   connector_.PauseIncomingMethodCallProcessing();
515 
516   MayAutoLock locker(&lock_);
517   paused_ = true;
518 
519   for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
520     iter->second->ResetSyncMessageSignal();
521 }
522 
ResumeIncomingMethodCallProcessing()523 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
524   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
525   connector_.ResumeIncomingMethodCallProcessing();
526 
527   MayAutoLock locker(&lock_);
528   paused_ = false;
529 
530   for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
531     auto sync_iter = sync_message_tasks_.find(iter->first);
532     if (iter->second->peer_closed() ||
533         (sync_iter != sync_message_tasks_.end() &&
534          !sync_iter->second.empty())) {
535       iter->second->SignalSyncMessageEvent();
536     }
537   }
538 
539   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
540 }
541 
HasAssociatedEndpoints() const542 bool MultiplexRouter::HasAssociatedEndpoints() const {
543   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
544   MayAutoLock locker(&lock_);
545 
546   if (endpoints_.size() > 1)
547     return true;
548   if (endpoints_.size() == 0)
549     return false;
550 
551   return !base::ContainsKey(endpoints_, kMasterInterfaceId);
552 }
553 
EnableTestingMode()554 void MultiplexRouter::EnableTestingMode() {
555   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
556   MayAutoLock locker(&lock_);
557 
558   testing_mode_ = true;
559   connector_.set_enforce_errors_from_incoming_receiver(false);
560 }
561 
Accept(Message * message)562 bool MultiplexRouter::Accept(Message* message) {
563   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
564 
565   // Insert endpoints for the payload interface IDs as soon as the message
566   // arrives, instead of waiting till the message is dispatched. Consider the
567   // following sequence:
568   // 1) Async message msg1 arrives, containing interface ID x. Msg1 is not
569   //    dispatched because a sync call is blocking the thread.
570   // 2) Sync message msg2 arrives targeting interface ID x.
571   //
572   // If we don't insert endpoint for interface ID x, when trying to dispatch
573   // msg2 we don't know whether it is an unexpected message or it is just
574   // because the message containing x hasn't been dispatched.
575   if (!InsertEndpointsForMessage(*message))
576     return false;
577 
578   scoped_refptr<MultiplexRouter> protector(this);
579   MayAutoLock locker(&lock_);
580 
581   DCHECK(!paused_);
582 
583   ClientCallBehavior client_call_behavior =
584       connector_.during_sync_handle_watcher_callback()
585           ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
586           : ALLOW_DIRECT_CLIENT_CALLS;
587 
588   MessageWrapper message_wrapper(this, std::move(*message));
589   bool processed = tasks_.empty() && ProcessIncomingMessage(
590                                          &message_wrapper, client_call_behavior,
591                                          connector_.task_runner());
592 
593   if (!processed) {
594     // Either the task queue is not empty or we cannot process the message
595     // directly. In both cases, there is no need to call ProcessTasks().
596     tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper)));
597     Task* task = tasks_.back().get();
598 
599     if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
600       InterfaceId id = task->message_wrapper.value().interface_id();
601       sync_message_tasks_[id].push_back(task);
602       InterfaceEndpoint* endpoint = FindEndpoint(id);
603       if (endpoint)
604         endpoint->SignalSyncMessageEvent();
605     }
606   } else if (!tasks_.empty()) {
607     // Processing the message may result in new tasks (for error notification)
608     // being added to the queue. In this case, we have to attempt to process the
609     // tasks.
610     ProcessTasks(client_call_behavior, connector_.task_runner());
611   }
612 
613   // Always return true. If we see errors during message processing, we will
614   // explicitly call Connector::RaiseError() to disconnect the message pipe.
615   return true;
616 }
617 
OnPeerAssociatedEndpointClosed(InterfaceId id,const base::Optional<DisconnectReason> & reason)618 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
619     InterfaceId id,
620     const base::Optional<DisconnectReason>& reason) {
621   MayAutoLock locker(&lock_);
622   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
623 
624   if (reason)
625     endpoint->set_disconnect_reason(reason);
626 
627   // It is possible that this endpoint has been set as peer closed. That is
628   // because when the message pipe is closed, all the endpoints are updated with
629   // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
630   // as long as there are refs keeping the router alive. If there is a
631   // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
632   // here and see that the endpoint has been marked as peer closed.
633   if (!endpoint->peer_closed()) {
634     if (endpoint->client())
635       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
636     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
637   }
638 
639   // No need to trigger a ProcessTasks() because it is already on the stack.
640 
641   return true;
642 }
643 
OnPipeConnectionError()644 void MultiplexRouter::OnPipeConnectionError() {
645   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
646 
647   scoped_refptr<MultiplexRouter> protector(this);
648   MayAutoLock locker(&lock_);
649 
650   encountered_error_ = true;
651 
652   // Calling UpdateEndpointStateMayRemove() may remove the corresponding value
653   // from |endpoints_| and invalidate any iterator of |endpoints_|. Therefore,
654   // copy the endpoint pointers to a vector and iterate over it instead.
655   std::vector<scoped_refptr<InterfaceEndpoint>> endpoint_vector;
656   endpoint_vector.reserve(endpoints_.size());
657   for (const auto& pair : endpoints_)
658     endpoint_vector.push_back(pair.second);
659 
660   for (const auto& endpoint : endpoint_vector) {
661     if (endpoint->client())
662       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint.get()));
663 
664     UpdateEndpointStateMayRemove(endpoint.get(), PEER_ENDPOINT_CLOSED);
665   }
666 
667   ProcessTasks(connector_.during_sync_handle_watcher_callback()
668                    ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
669                    : ALLOW_DIRECT_CLIENT_CALLS,
670                connector_.task_runner());
671 }
672 
ProcessTasks(ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)673 void MultiplexRouter::ProcessTasks(
674     ClientCallBehavior client_call_behavior,
675     base::SequencedTaskRunner* current_task_runner) {
676   AssertLockAcquired();
677 
678   if (posted_to_process_tasks_)
679     return;
680 
681   while (!tasks_.empty() && !paused_) {
682     std::unique_ptr<Task> task(std::move(tasks_.front()));
683     tasks_.pop_front();
684 
685     InterfaceId id = kInvalidInterfaceId;
686     bool sync_message =
687         task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
688         task->message_wrapper.value().has_flag(Message::kFlagIsSync);
689     if (sync_message) {
690       id = task->message_wrapper.value().interface_id();
691       auto& sync_message_queue = sync_message_tasks_[id];
692       DCHECK_EQ(task.get(), sync_message_queue.front());
693       sync_message_queue.pop_front();
694     }
695 
696     bool processed =
697         task->IsNotifyErrorTask()
698             ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
699                                      current_task_runner)
700             : ProcessIncomingMessage(&task->message_wrapper,
701                                      client_call_behavior, current_task_runner);
702 
703     if (!processed) {
704       if (sync_message) {
705         auto& sync_message_queue = sync_message_tasks_[id];
706         sync_message_queue.push_front(task.get());
707       }
708       tasks_.push_front(std::move(task));
709       break;
710     } else {
711       if (sync_message) {
712         auto iter = sync_message_tasks_.find(id);
713         if (iter != sync_message_tasks_.end() && iter->second.empty())
714           sync_message_tasks_.erase(iter);
715       }
716     }
717   }
718 }
719 
ProcessFirstSyncMessageForEndpoint(InterfaceId id)720 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
721   AssertLockAcquired();
722 
723   auto iter = sync_message_tasks_.find(id);
724   if (iter == sync_message_tasks_.end())
725     return false;
726 
727   if (paused_)
728     return true;
729 
730   MultiplexRouter::Task* task = iter->second.front();
731   iter->second.pop_front();
732 
733   DCHECK(task->IsMessageTask());
734   MessageWrapper message_wrapper = std::move(task->message_wrapper);
735 
736   // Note: after this call, |task| and |iter| may be invalidated.
737   bool processed = ProcessIncomingMessage(
738       &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
739   DCHECK(processed);
740 
741   iter = sync_message_tasks_.find(id);
742   if (iter == sync_message_tasks_.end())
743     return false;
744 
745   if (iter->second.empty()) {
746     sync_message_tasks_.erase(iter);
747     return false;
748   }
749 
750   return true;
751 }
752 
ProcessNotifyErrorTask(Task * task,ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)753 bool MultiplexRouter::ProcessNotifyErrorTask(
754     Task* task,
755     ClientCallBehavior client_call_behavior,
756     base::SequencedTaskRunner* current_task_runner) {
757   DCHECK(!current_task_runner ||
758          current_task_runner->RunsTasksInCurrentSequence());
759   DCHECK(!paused_);
760 
761   AssertLockAcquired();
762   InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
763   if (!endpoint->client())
764     return true;
765 
766   if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
767       endpoint->task_runner() != current_task_runner) {
768     MaybePostToProcessTasks(endpoint->task_runner());
769     return false;
770   }
771 
772   DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
773 
774   InterfaceEndpointClient* client = endpoint->client();
775   base::Optional<DisconnectReason> disconnect_reason(
776       endpoint->disconnect_reason());
777 
778   {
779     // We must unlock before calling into |client| because it may call this
780     // object within NotifyError(). Holding the lock will lead to deadlock.
781     //
782     // It is safe to call into |client| without the lock. Because |client| is
783     // always accessed on the same sequence, including DetachEndpointClient().
784     MayAutoUnlock unlocker(&lock_);
785     client->NotifyError(disconnect_reason);
786   }
787   return true;
788 }
789 
ProcessIncomingMessage(MessageWrapper * message_wrapper,ClientCallBehavior client_call_behavior,base::SequencedTaskRunner * current_task_runner)790 bool MultiplexRouter::ProcessIncomingMessage(
791     MessageWrapper* message_wrapper,
792     ClientCallBehavior client_call_behavior,
793     base::SequencedTaskRunner* current_task_runner) {
794   DCHECK(!current_task_runner ||
795          current_task_runner->RunsTasksInCurrentSequence());
796   DCHECK(!paused_);
797   DCHECK(message_wrapper);
798   AssertLockAcquired();
799 
800   const Message* message = &message_wrapper->value();
801   if (message->IsNull()) {
802     // This is a sync message and has been processed during sync handle
803     // watching.
804     return true;
805   }
806 
807   if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
808     bool result = false;
809 
810     {
811       MayAutoUnlock unlocker(&lock_);
812       Message tmp_message =
813           message_wrapper->DeserializeEndpointHandlesAndTake();
814       result = !tmp_message.IsNull() &&
815                control_message_handler_.Accept(&tmp_message);
816     }
817 
818     if (!result)
819       RaiseErrorInNonTestingMode();
820 
821     return true;
822   }
823 
824   InterfaceId id = message->interface_id();
825   DCHECK(IsValidInterfaceId(id));
826 
827   InterfaceEndpoint* endpoint = FindEndpoint(id);
828   if (!endpoint || endpoint->closed())
829     return true;
830 
831   if (!endpoint->client()) {
832     // We need to wait until a client is attached in order to dispatch further
833     // messages.
834     return false;
835   }
836 
837   bool can_direct_call;
838   if (message->has_flag(Message::kFlagIsSync)) {
839     can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
840                       endpoint->task_runner()->RunsTasksInCurrentSequence();
841   } else {
842     can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
843                       endpoint->task_runner() == current_task_runner;
844   }
845 
846   if (!can_direct_call) {
847     MaybePostToProcessTasks(endpoint->task_runner());
848     return false;
849   }
850 
851   DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
852 
853   InterfaceEndpointClient* client = endpoint->client();
854   bool result = false;
855   {
856     // We must unlock before calling into |client| because it may call this
857     // object within HandleIncomingMessage(). Holding the lock will lead to
858     // deadlock.
859     //
860     // It is safe to call into |client| without the lock. Because |client| is
861     // always accessed on the same sequence, including DetachEndpointClient().
862     MayAutoUnlock unlocker(&lock_);
863     Message tmp_message = message_wrapper->DeserializeEndpointHandlesAndTake();
864     result =
865         !tmp_message.IsNull() && client->HandleIncomingMessage(&tmp_message);
866   }
867   if (!result)
868     RaiseErrorInNonTestingMode();
869 
870   return true;
871 }
872 
MaybePostToProcessTasks(base::SequencedTaskRunner * task_runner)873 void MultiplexRouter::MaybePostToProcessTasks(
874     base::SequencedTaskRunner* task_runner) {
875   AssertLockAcquired();
876   if (posted_to_process_tasks_)
877     return;
878 
879   posted_to_process_tasks_ = true;
880   posted_to_task_runner_ = task_runner;
881   task_runner->PostTask(
882       FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
883 }
884 
LockAndCallProcessTasks()885 void MultiplexRouter::LockAndCallProcessTasks() {
886   // There is no need to hold a ref to this class in this case because this is
887   // always called using base::Bind(), which holds a ref.
888   MayAutoLock locker(&lock_);
889   posted_to_process_tasks_ = false;
890   scoped_refptr<base::SequencedTaskRunner> runner(
891       std::move(posted_to_task_runner_));
892   ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
893 }
894 
UpdateEndpointStateMayRemove(InterfaceEndpoint * endpoint,EndpointStateUpdateType type)895 void MultiplexRouter::UpdateEndpointStateMayRemove(
896     InterfaceEndpoint* endpoint,
897     EndpointStateUpdateType type) {
898   if (type == ENDPOINT_CLOSED) {
899     endpoint->set_closed();
900   } else {
901     endpoint->set_peer_closed();
902     // If the interface endpoint is performing a sync watch, this makes sure
903     // it is notified and eventually exits the sync watch.
904     endpoint->SignalSyncMessageEvent();
905   }
906   if (endpoint->closed() && endpoint->peer_closed())
907     endpoints_.erase(endpoint->id());
908 }
909 
RaiseErrorInNonTestingMode()910 void MultiplexRouter::RaiseErrorInNonTestingMode() {
911   AssertLockAcquired();
912   if (!testing_mode_)
913     RaiseError();
914 }
915 
FindOrInsertEndpoint(InterfaceId id,bool * inserted)916 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
917     InterfaceId id,
918     bool* inserted) {
919   AssertLockAcquired();
920   // Either |inserted| is nullptr or it points to a boolean initialized as
921   // false.
922   DCHECK(!inserted || !*inserted);
923 
924   InterfaceEndpoint* endpoint = FindEndpoint(id);
925   if (!endpoint) {
926     endpoint = new InterfaceEndpoint(this, id);
927     endpoints_[id] = endpoint;
928     if (inserted)
929       *inserted = true;
930   }
931 
932   return endpoint;
933 }
934 
FindEndpoint(InterfaceId id)935 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint(
936     InterfaceId id) {
937   AssertLockAcquired();
938   auto iter = endpoints_.find(id);
939   return iter != endpoints_.end() ? iter->second.get() : nullptr;
940 }
941 
AssertLockAcquired()942 void MultiplexRouter::AssertLockAcquired() {
943 #if DCHECK_IS_ON()
944   if (lock_)
945     lock_->AssertAcquired();
946 #endif
947 }
948 
InsertEndpointsForMessage(const Message & message)949 bool MultiplexRouter::InsertEndpointsForMessage(const Message& message) {
950   if (!message.is_serialized())
951     return true;
952 
953   uint32_t num_ids = message.payload_num_interface_ids();
954   if (num_ids == 0)
955     return true;
956 
957   const uint32_t* ids = message.payload_interface_ids();
958 
959   MayAutoLock locker(&lock_);
960   for (uint32_t i = 0; i < num_ids; ++i) {
961     // Message header validation already ensures that the IDs are valid and not
962     // the master ID.
963     // The IDs are from the remote side and therefore their namespace bit is
964     // supposed to be different than the value that this router would use.
965     if (set_interface_id_namespace_bit_ ==
966         HasInterfaceIdNamespaceBitSet(ids[i])) {
967       return false;
968     }
969 
970     // It is possible that the endpoint already exists even when the remote side
971     // is well-behaved: it might have notified us that the peer endpoint has
972     // closed.
973     bool inserted = false;
974     InterfaceEndpoint* endpoint = FindOrInsertEndpoint(ids[i], &inserted);
975     if (endpoint->closed() || endpoint->handle_created())
976       return false;
977   }
978 
979   return true;
980 }
981 
CloseEndpointsForMessage(const Message & message)982 void MultiplexRouter::CloseEndpointsForMessage(const Message& message) {
983   AssertLockAcquired();
984 
985   if (!message.is_serialized())
986     return;
987 
988   uint32_t num_ids = message.payload_num_interface_ids();
989   if (num_ids == 0)
990     return;
991 
992   const uint32_t* ids = message.payload_interface_ids();
993   for (uint32_t i = 0; i < num_ids; ++i) {
994     InterfaceEndpoint* endpoint = FindEndpoint(ids[i]);
995     // If the remote side maliciously sends the same interface ID in another
996     // message which has been dispatched, we could get here with no endpoint
997     // for the ID, a closed endpoint, or an endpoint with handle created.
998     if (!endpoint || endpoint->closed() || endpoint->handle_created()) {
999       RaiseErrorInNonTestingMode();
1000       continue;
1001     }
1002 
1003     UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
1004     MayAutoUnlock unlocker(&lock_);
1005     control_message_proxy_.NotifyPeerEndpointClosed(ids[i], base::nullopt);
1006   }
1007 
1008   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
1009 }
1010 
1011 }  // namespace internal
1012 }  // namespace mojo
1013