1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 
7 #include <stdint.h>
8 
9 #include <utility>
10 
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/single_thread_task_runner.h"
16 #include "base/stl_util.h"
17 #include "base/threading/thread_task_runner_handle.h"
18 #include "mojo/public/cpp/bindings/associated_group.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
21 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
22 
23 namespace mojo {
24 namespace internal {
25 
26 // InterfaceEndpoint stores the information of an interface endpoint registered
27 // with the router.
28 // No one other than the router's |endpoints_| and |tasks_| should hold refs to
29 // this object.
30 class MultiplexRouter::InterfaceEndpoint
31     : public base::RefCounted<InterfaceEndpoint>,
32       public InterfaceEndpointController {
33  public:
InterfaceEndpoint(MultiplexRouter * router,InterfaceId id)34   InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
35       : router_(router),
36         id_(id),
37         closed_(false),
38         peer_closed_(false),
39         client_(nullptr),
40         event_signalled_(false) {}
41 
42   // ---------------------------------------------------------------------------
43   // The following public methods are safe to call from any threads without
44   // locking.
45 
id() const46   InterfaceId id() const { return id_; }
47 
48   // ---------------------------------------------------------------------------
49   // The following public methods are called under the router's lock.
50 
closed() const51   bool closed() const { return closed_; }
set_closed()52   void set_closed() {
53     router_->lock_.AssertAcquired();
54     closed_ = true;
55   }
56 
peer_closed() const57   bool peer_closed() const { return peer_closed_; }
set_peer_closed()58   void set_peer_closed() {
59     router_->lock_.AssertAcquired();
60     peer_closed_ = true;
61   }
62 
task_runner() const63   base::SingleThreadTaskRunner* task_runner() const {
64     return task_runner_.get();
65   }
66 
client() const67   InterfaceEndpointClient* client() const { return client_; }
68 
AttachClient(InterfaceEndpointClient * client,scoped_refptr<base::SingleThreadTaskRunner> runner)69   void AttachClient(InterfaceEndpointClient* client,
70                     scoped_refptr<base::SingleThreadTaskRunner> runner) {
71     router_->lock_.AssertAcquired();
72     DCHECK(!client_);
73     DCHECK(!closed_);
74     DCHECK(runner->BelongsToCurrentThread());
75 
76     task_runner_ = std::move(runner);
77     client_ = client;
78   }
79 
80   // This method must be called on the same thread as the corresponding
81   // AttachClient() call.
DetachClient()82   void DetachClient() {
83     router_->lock_.AssertAcquired();
84     DCHECK(client_);
85     DCHECK(task_runner_->BelongsToCurrentThread());
86     DCHECK(!closed_);
87 
88     task_runner_ = nullptr;
89     client_ = nullptr;
90     sync_watcher_.reset();
91   }
92 
SignalSyncMessageEvent()93   void SignalSyncMessageEvent() {
94     router_->lock_.AssertAcquired();
95     if (event_signalled_)
96       return;
97 
98     EnsureEventMessagePipeExists();
99     event_signalled_ = true;
100     MojoResult result =
101         WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
102                         0, MOJO_WRITE_MESSAGE_FLAG_NONE);
103     DCHECK_EQ(MOJO_RESULT_OK, result);
104   }
105 
106   // ---------------------------------------------------------------------------
107   // The following public methods (i.e., InterfaceEndpointController
108   // implementation) are called by the client on the same thread as the
109   // AttachClient() call. They are called outside of the router's lock.
110 
SendMessage(Message * message)111   bool SendMessage(Message* message) override {
112     DCHECK(task_runner_->BelongsToCurrentThread());
113     message->set_interface_id(id_);
114     return router_->connector_.Accept(message);
115   }
116 
AllowWokenUpBySyncWatchOnSameThread()117   void AllowWokenUpBySyncWatchOnSameThread() override {
118     DCHECK(task_runner_->BelongsToCurrentThread());
119 
120     EnsureSyncWatcherExists();
121     sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
122   }
123 
SyncWatch(const bool * should_stop)124   bool SyncWatch(const bool* should_stop) override {
125     DCHECK(task_runner_->BelongsToCurrentThread());
126 
127     EnsureSyncWatcherExists();
128     return sync_watcher_->SyncWatch(should_stop);
129   }
130 
131  private:
132   friend class base::RefCounted<InterfaceEndpoint>;
133 
~InterfaceEndpoint()134   ~InterfaceEndpoint() override {
135     router_->lock_.AssertAcquired();
136 
137     DCHECK(!client_);
138     DCHECK(closed_);
139     DCHECK(peer_closed_);
140     DCHECK(!sync_watcher_);
141   }
142 
OnHandleReady(MojoResult result)143   void OnHandleReady(MojoResult result) {
144     DCHECK(task_runner_->BelongsToCurrentThread());
145     scoped_refptr<InterfaceEndpoint> self_protector(this);
146     scoped_refptr<MultiplexRouter> router_protector(router_);
147 
148     // Because we never close |sync_message_event_{sender,receiver}_| before
149     // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
150     DCHECK_EQ(MOJO_RESULT_OK, result);
151     bool reset_sync_watcher = false;
152     {
153       base::AutoLock locker(router_->lock_);
154 
155       bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
156 
157       if (!more_to_process)
158         ResetSyncMessageSignal();
159 
160       // Currently there are no queued sync messages and the peer has closed so
161       // there won't be incoming sync messages in the future.
162       reset_sync_watcher = !more_to_process && peer_closed_;
163     }
164     if (reset_sync_watcher) {
165       // If a SyncWatch() call (or multiple ones) of this interface endpoint is
166       // on the call stack, resetting the sync watcher will allow it to exit
167       // when the call stack unwinds to that frame.
168       sync_watcher_.reset();
169     }
170   }
171 
EnsureSyncWatcherExists()172   void EnsureSyncWatcherExists() {
173     DCHECK(task_runner_->BelongsToCurrentThread());
174     if (sync_watcher_)
175       return;
176 
177     {
178       base::AutoLock locker(router_->lock_);
179       EnsureEventMessagePipeExists();
180 
181       auto iter = router_->sync_message_tasks_.find(id_);
182       if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
183         SignalSyncMessageEvent();
184     }
185 
186     sync_watcher_.reset(new SyncHandleWatcher(
187         sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
188         base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
189   }
190 
EnsureEventMessagePipeExists()191   void EnsureEventMessagePipeExists() {
192     router_->lock_.AssertAcquired();
193 
194     if (sync_message_event_receiver_.is_valid())
195       return;
196 
197     MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
198                                           &sync_message_event_receiver_);
199     DCHECK_EQ(MOJO_RESULT_OK, result);
200   }
201 
ResetSyncMessageSignal()202   void ResetSyncMessageSignal() {
203     router_->lock_.AssertAcquired();
204 
205     if (!event_signalled_)
206       return;
207 
208     DCHECK(sync_message_event_receiver_.is_valid());
209     MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(),
210                                        nullptr, nullptr, nullptr, nullptr,
211                                        MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
212     DCHECK_EQ(MOJO_RESULT_OK, result);
213     event_signalled_ = false;
214   }
215 
216   // ---------------------------------------------------------------------------
217   // The following members are safe to access from any threads.
218 
219   MultiplexRouter* const router_;
220   const InterfaceId id_;
221 
222   // ---------------------------------------------------------------------------
223   // The following members are accessed under the router's lock.
224 
225   // Whether the endpoint has been closed.
226   bool closed_;
227   // Whether the peer endpoint has been closed.
228   bool peer_closed_;
229 
230   // The task runner on which |client_|'s methods can be called.
231   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
232   // Not owned. It is null if no client is attached to this endpoint.
233   InterfaceEndpointClient* client_;
234 
235   // A message pipe used as an event to signal that sync messages are available.
236   // The message pipe handles are initialized under the router's lock and remain
237   // unchanged afterwards. They may be accessed outside of the router's lock
238   // later.
239   ScopedMessagePipeHandle sync_message_event_sender_;
240   ScopedMessagePipeHandle sync_message_event_receiver_;
241   bool event_signalled_;
242 
243   // ---------------------------------------------------------------------------
244   // The following members are only valid while a client is attached. They are
245   // used exclusively on the client's thread. They may be accessed outside of
246   // the router's lock.
247 
248   std::unique_ptr<SyncHandleWatcher> sync_watcher_;
249 
250   DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
251 };
252 
253 struct MultiplexRouter::Task {
254  public:
255   // Doesn't take ownership of |message| but takes its contents.
CreateMessageTaskmojo::internal::MultiplexRouter::Task256   static std::unique_ptr<Task> CreateMessageTask(Message* message) {
257     Task* task = new Task(MESSAGE);
258     task->message.reset(new Message);
259     message->MoveTo(task->message.get());
260     return base::WrapUnique(task);
261   }
CreateNotifyErrorTaskmojo::internal::MultiplexRouter::Task262   static std::unique_ptr<Task> CreateNotifyErrorTask(
263       InterfaceEndpoint* endpoint) {
264     Task* task = new Task(NOTIFY_ERROR);
265     task->endpoint_to_notify = endpoint;
266     return base::WrapUnique(task);
267   }
268 
~Taskmojo::internal::MultiplexRouter::Task269   ~Task() {}
270 
IsMessageTaskmojo::internal::MultiplexRouter::Task271   bool IsMessageTask() const { return type == MESSAGE; }
IsNotifyErrorTaskmojo::internal::MultiplexRouter::Task272   bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
273 
274   std::unique_ptr<Message> message;
275   scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
276 
277   enum Type { MESSAGE, NOTIFY_ERROR };
278   Type type;
279 
280  private:
Taskmojo::internal::MultiplexRouter::Task281   explicit Task(Type in_type) : type(in_type) {}
282 };
283 
MultiplexRouter(bool set_interface_id_namesapce_bit,ScopedMessagePipeHandle message_pipe,scoped_refptr<base::SingleThreadTaskRunner> runner)284 MultiplexRouter::MultiplexRouter(
285     bool set_interface_id_namesapce_bit,
286     ScopedMessagePipeHandle message_pipe,
287     scoped_refptr<base::SingleThreadTaskRunner> runner)
288     : AssociatedGroupController(base::ThreadTaskRunnerHandle::Get()),
289       set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
290       header_validator_(this),
291       connector_(std::move(message_pipe),
292                  Connector::MULTI_THREADED_SEND,
293                  std::move(runner)),
294       control_message_handler_(this),
295       control_message_proxy_(&connector_),
296       next_interface_id_value_(1),
297       posted_to_process_tasks_(false),
298       encountered_error_(false),
299       testing_mode_(false) {
300   // Always participate in sync handle watching, because even if it doesn't
301   // expect sync requests during sync handle watching, it may still need to
302   // dispatch messages to associated endpoints on a different thread.
303   connector_.AllowWokenUpBySyncWatchOnSameThread();
304   connector_.set_incoming_receiver(&header_validator_);
305   connector_.set_connection_error_handler(
306       base::Bind(&MultiplexRouter::OnPipeConnectionError,
307                  base::Unretained(this)));
308 }
309 
~MultiplexRouter()310 MultiplexRouter::~MultiplexRouter() {
311   base::AutoLock locker(lock_);
312 
313   sync_message_tasks_.clear();
314   tasks_.clear();
315 
316   for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
317     InterfaceEndpoint* endpoint = iter->second.get();
318     // Increment the iterator before calling UpdateEndpointStateMayRemove()
319     // because it may remove the corresponding value from the map.
320     ++iter;
321 
322     DCHECK(endpoint->closed());
323     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
324   }
325 
326   DCHECK(endpoints_.empty());
327 }
328 
SetMasterInterfaceName(const std::string & name)329 void MultiplexRouter::SetMasterInterfaceName(const std::string& name) {
330   DCHECK(thread_checker_.CalledOnValidThread());
331   header_validator_.SetDescription(name + " [master] MessageHeaderValidator");
332   control_message_handler_.SetDescription(
333       name + " [master] PipeControlMessageHandler");
334 }
335 
CreateEndpointHandlePair(ScopedInterfaceEndpointHandle * local_endpoint,ScopedInterfaceEndpointHandle * remote_endpoint)336 void MultiplexRouter::CreateEndpointHandlePair(
337     ScopedInterfaceEndpointHandle* local_endpoint,
338     ScopedInterfaceEndpointHandle* remote_endpoint) {
339   base::AutoLock locker(lock_);
340   uint32_t id = 0;
341   do {
342     if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
343       next_interface_id_value_ = 1;
344     id = next_interface_id_value_++;
345     if (set_interface_id_namespace_bit_)
346       id |= kInterfaceIdNamespaceMask;
347   } while (ContainsKey(endpoints_, id));
348 
349   InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
350   endpoints_[id] = endpoint;
351   if (encountered_error_)
352     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
353 
354   *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true);
355   *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false);
356 }
357 
CreateLocalEndpointHandle(InterfaceId id)358 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
359     InterfaceId id) {
360   if (!IsValidInterfaceId(id))
361     return ScopedInterfaceEndpointHandle();
362 
363   base::AutoLock locker(lock_);
364   bool inserted = false;
365   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
366   if (inserted) {
367     if (encountered_error_)
368       UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
369   } else {
370     // If the endpoint already exist, it is because we have received a
371     // notification that the peer endpoint has closed.
372     CHECK(!endpoint->closed());
373     CHECK(endpoint->peer_closed());
374   }
375   return CreateScopedInterfaceEndpointHandle(id, true);
376 }
377 
CloseEndpointHandle(InterfaceId id,bool is_local)378 void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
379   if (!IsValidInterfaceId(id))
380     return;
381 
382   base::AutoLock locker(lock_);
383 
384   if (!is_local) {
385     DCHECK(ContainsKey(endpoints_, id));
386     DCHECK(!IsMasterInterfaceId(id));
387 
388     // We will receive a NotifyPeerEndpointClosed message from the other side.
389     control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
390 
391     return;
392   }
393 
394   DCHECK(ContainsKey(endpoints_, id));
395   InterfaceEndpoint* endpoint = endpoints_[id].get();
396   DCHECK(!endpoint->client());
397   DCHECK(!endpoint->closed());
398   UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
399 
400   if (!IsMasterInterfaceId(id))
401     control_message_proxy_.NotifyPeerEndpointClosed(id);
402 
403   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
404 }
405 
AttachEndpointClient(const ScopedInterfaceEndpointHandle & handle,InterfaceEndpointClient * client,scoped_refptr<base::SingleThreadTaskRunner> runner)406 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
407     const ScopedInterfaceEndpointHandle& handle,
408     InterfaceEndpointClient* client,
409     scoped_refptr<base::SingleThreadTaskRunner> runner) {
410   const InterfaceId id = handle.id();
411 
412   DCHECK(IsValidInterfaceId(id));
413   DCHECK(client);
414 
415   base::AutoLock locker(lock_);
416   DCHECK(ContainsKey(endpoints_, id));
417 
418   InterfaceEndpoint* endpoint = endpoints_[id].get();
419   endpoint->AttachClient(client, std::move(runner));
420 
421   if (endpoint->peer_closed())
422     tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
423   ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
424 
425   return endpoint;
426 }
427 
DetachEndpointClient(const ScopedInterfaceEndpointHandle & handle)428 void MultiplexRouter::DetachEndpointClient(
429     const ScopedInterfaceEndpointHandle& handle) {
430   const InterfaceId id = handle.id();
431 
432   DCHECK(IsValidInterfaceId(id));
433 
434   base::AutoLock locker(lock_);
435   DCHECK(ContainsKey(endpoints_, id));
436 
437   InterfaceEndpoint* endpoint = endpoints_[id].get();
438   endpoint->DetachClient();
439 }
440 
RaiseError()441 void MultiplexRouter::RaiseError() {
442   if (task_runner_->BelongsToCurrentThread()) {
443     connector_.RaiseError();
444   } else {
445     task_runner_->PostTask(FROM_HERE,
446                            base::Bind(&MultiplexRouter::RaiseError, this));
447   }
448 }
449 
CloseMessagePipe()450 void MultiplexRouter::CloseMessagePipe() {
451   DCHECK(thread_checker_.CalledOnValidThread());
452   connector_.CloseMessagePipe();
453   // CloseMessagePipe() above won't trigger connection error handler.
454   // Explicitly call OnPipeConnectionError() so that associated endpoints will
455   // get notified.
456   OnPipeConnectionError();
457 }
458 
HasAssociatedEndpoints() const459 bool MultiplexRouter::HasAssociatedEndpoints() const {
460   DCHECK(thread_checker_.CalledOnValidThread());
461   base::AutoLock locker(lock_);
462 
463   if (endpoints_.size() > 1)
464     return true;
465   if (endpoints_.size() == 0)
466     return false;
467 
468   return !ContainsKey(endpoints_, kMasterInterfaceId);
469 }
470 
EnableTestingMode()471 void MultiplexRouter::EnableTestingMode() {
472   DCHECK(thread_checker_.CalledOnValidThread());
473   base::AutoLock locker(lock_);
474 
475   testing_mode_ = true;
476   connector_.set_enforce_errors_from_incoming_receiver(false);
477 }
478 
Accept(Message * message)479 bool MultiplexRouter::Accept(Message* message) {
480   DCHECK(thread_checker_.CalledOnValidThread());
481 
482   scoped_refptr<MultiplexRouter> protector(this);
483   base::AutoLock locker(lock_);
484 
485   ClientCallBehavior client_call_behavior =
486       connector_.during_sync_handle_watcher_callback()
487           ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
488           : ALLOW_DIRECT_CLIENT_CALLS;
489 
490   bool processed =
491       tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
492                                                connector_.task_runner());
493 
494   if (!processed) {
495     // Either the task queue is not empty or we cannot process the message
496     // directly. In both cases, there is no need to call ProcessTasks().
497     tasks_.push_back(Task::CreateMessageTask(message));
498     Task* task = tasks_.back().get();
499 
500     if (task->message->has_flag(Message::kFlagIsSync)) {
501       InterfaceId id = task->message->interface_id();
502       sync_message_tasks_[id].push_back(task);
503       auto iter = endpoints_.find(id);
504       if (iter != endpoints_.end())
505         iter->second->SignalSyncMessageEvent();
506     }
507   } else if (!tasks_.empty()) {
508     // Processing the message may result in new tasks (for error notification)
509     // being added to the queue. In this case, we have to attempt to process the
510     // tasks.
511     ProcessTasks(client_call_behavior, connector_.task_runner());
512   }
513 
514   // Always return true. If we see errors during message processing, we will
515   // explicitly call Connector::RaiseError() to disconnect the message pipe.
516   return true;
517 }
518 
OnPeerAssociatedEndpointClosed(InterfaceId id)519 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
520   lock_.AssertAcquired();
521 
522   if (IsMasterInterfaceId(id))
523     return false;
524 
525   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
526 
527   // It is possible that this endpoint has been set as peer closed. That is
528   // because when the message pipe is closed, all the endpoints are updated with
529   // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
530   // as long as there are refs keeping the router alive. If there is a
531   // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
532   // here and see that the endpoint has been marked as peer closed.
533   if (!endpoint->peer_closed()) {
534     if (endpoint->client())
535       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
536     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
537   }
538 
539   // No need to trigger a ProcessTasks() because it is already on the stack.
540 
541   return true;
542 }
543 
OnAssociatedEndpointClosedBeforeSent(InterfaceId id)544 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
545   lock_.AssertAcquired();
546 
547   if (IsMasterInterfaceId(id))
548     return false;
549 
550   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
551   DCHECK(!endpoint->closed());
552   UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
553 
554   control_message_proxy_.NotifyPeerEndpointClosed(id);
555 
556   return true;
557 }
558 
OnPipeConnectionError()559 void MultiplexRouter::OnPipeConnectionError() {
560   DCHECK(thread_checker_.CalledOnValidThread());
561 
562   scoped_refptr<MultiplexRouter> protector(this);
563   base::AutoLock locker(lock_);
564 
565   encountered_error_ = true;
566 
567   for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
568     InterfaceEndpoint* endpoint = iter->second.get();
569     // Increment the iterator before calling UpdateEndpointStateMayRemove()
570     // because it may remove the corresponding value from the map.
571     ++iter;
572 
573     if (endpoint->client())
574       tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
575 
576     UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
577   }
578 
579   ProcessTasks(connector_.during_sync_handle_watcher_callback()
580                    ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
581                    : ALLOW_DIRECT_CLIENT_CALLS,
582                connector_.task_runner());
583 }
584 
ProcessTasks(ClientCallBehavior client_call_behavior,base::SingleThreadTaskRunner * current_task_runner)585 void MultiplexRouter::ProcessTasks(
586     ClientCallBehavior client_call_behavior,
587     base::SingleThreadTaskRunner* current_task_runner) {
588   lock_.AssertAcquired();
589 
590   if (posted_to_process_tasks_)
591     return;
592 
593   while (!tasks_.empty()) {
594     std::unique_ptr<Task> task(std::move(tasks_.front()));
595     tasks_.pop_front();
596 
597     InterfaceId id = kInvalidInterfaceId;
598     bool sync_message = task->IsMessageTask() && task->message &&
599                         task->message->has_flag(Message::kFlagIsSync);
600     if (sync_message) {
601       id = task->message->interface_id();
602       auto& sync_message_queue = sync_message_tasks_[id];
603       DCHECK_EQ(task.get(), sync_message_queue.front());
604       sync_message_queue.pop_front();
605     }
606 
607     bool processed =
608         task->IsNotifyErrorTask()
609             ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
610                                      current_task_runner)
611             : ProcessIncomingMessage(task->message.get(), client_call_behavior,
612                                      current_task_runner);
613 
614     if (!processed) {
615       if (sync_message) {
616         auto& sync_message_queue = sync_message_tasks_[id];
617         sync_message_queue.push_front(task.get());
618       }
619       tasks_.push_front(std::move(task));
620       break;
621     } else {
622       if (sync_message) {
623         auto iter = sync_message_tasks_.find(id);
624         if (iter != sync_message_tasks_.end() && iter->second.empty())
625           sync_message_tasks_.erase(iter);
626       }
627     }
628   }
629 }
630 
ProcessFirstSyncMessageForEndpoint(InterfaceId id)631 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
632   lock_.AssertAcquired();
633 
634   auto iter = sync_message_tasks_.find(id);
635   if (iter == sync_message_tasks_.end())
636     return false;
637 
638   MultiplexRouter::Task* task = iter->second.front();
639   iter->second.pop_front();
640 
641   DCHECK(task->IsMessageTask());
642   std::unique_ptr<Message> message(std::move(task->message));
643 
644   // Note: after this call, |task| and  |iter| may be invalidated.
645   bool processed = ProcessIncomingMessage(
646       message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
647   DCHECK(processed);
648 
649   iter = sync_message_tasks_.find(id);
650   if (iter == sync_message_tasks_.end())
651     return false;
652 
653   if (iter->second.empty()) {
654     sync_message_tasks_.erase(iter);
655     return false;
656   }
657 
658   return true;
659 }
660 
ProcessNotifyErrorTask(Task * task,ClientCallBehavior client_call_behavior,base::SingleThreadTaskRunner * current_task_runner)661 bool MultiplexRouter::ProcessNotifyErrorTask(
662     Task* task,
663     ClientCallBehavior client_call_behavior,
664     base::SingleThreadTaskRunner* current_task_runner) {
665   DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
666   lock_.AssertAcquired();
667   InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
668   if (!endpoint->client())
669     return true;
670 
671   if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
672       endpoint->task_runner() != current_task_runner) {
673     MaybePostToProcessTasks(endpoint->task_runner());
674     return false;
675   }
676 
677   DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
678 
679   InterfaceEndpointClient* client = endpoint->client();
680   {
681     // We must unlock before calling into |client| because it may call this
682     // object within NotifyError(). Holding the lock will lead to deadlock.
683     //
684     // It is safe to call into |client| without the lock. Because |client| is
685     // always accessed on the same thread, including DetachEndpointClient().
686     base::AutoUnlock unlocker(lock_);
687     client->NotifyError();
688   }
689   return true;
690 }
691 
ProcessIncomingMessage(Message * message,ClientCallBehavior client_call_behavior,base::SingleThreadTaskRunner * current_task_runner)692 bool MultiplexRouter::ProcessIncomingMessage(
693     Message* message,
694     ClientCallBehavior client_call_behavior,
695     base::SingleThreadTaskRunner* current_task_runner) {
696   DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
697   lock_.AssertAcquired();
698 
699   if (!message) {
700     // This is a sync message and has been processed during sync handle
701     // watching.
702     return true;
703   }
704 
705   if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
706     if (!control_message_handler_.Accept(message))
707       RaiseErrorInNonTestingMode();
708     return true;
709   }
710 
711   InterfaceId id = message->interface_id();
712   DCHECK(IsValidInterfaceId(id));
713 
714   bool inserted = false;
715   InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
716   if (inserted) {
717     // Currently, it is legitimate to receive messages for an endpoint
718     // that is not registered. For example, the endpoint is transferred in
719     // a message that is discarded. Once we add support to specify all
720     // enclosing endpoints in message header, we should be able to remove
721     // this.
722     UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
723 
724     // It is also possible that this newly-inserted endpoint is the master
725     // endpoint. When the master InterfacePtr/Binding goes away, the message
726     // pipe is closed and we explicitly trigger a pipe connection error. The
727     // error updates all the endpoints, including the master endpoint, with
728     // PEER_ENDPOINT_CLOSED and removes the master endpoint from the
729     // registration. We continue to process remaining tasks in the queue, as
730     // long as there are refs keeping the router alive. If there are remaining
731     // messages for the master endpoint, we will get here.
732     if (!IsMasterInterfaceId(id))
733       control_message_proxy_.NotifyPeerEndpointClosed(id);
734     return true;
735   }
736 
737   if (endpoint->closed())
738     return true;
739 
740   if (!endpoint->client()) {
741     // We need to wait until a client is attached in order to dispatch further
742     // messages.
743     return false;
744   }
745 
746   bool can_direct_call;
747   if (message->has_flag(Message::kFlagIsSync)) {
748     can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
749                       endpoint->task_runner()->BelongsToCurrentThread();
750   } else {
751     can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
752                       endpoint->task_runner() == current_task_runner;
753   }
754 
755   if (!can_direct_call) {
756     MaybePostToProcessTasks(endpoint->task_runner());
757     return false;
758   }
759 
760   DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
761 
762   InterfaceEndpointClient* client = endpoint->client();
763   bool result = false;
764   {
765     // We must unlock before calling into |client| because it may call this
766     // object within HandleIncomingMessage(). Holding the lock will lead to
767     // deadlock.
768     //
769     // It is safe to call into |client| without the lock. Because |client| is
770     // always accessed on the same thread, including DetachEndpointClient().
771     base::AutoUnlock unlocker(lock_);
772     result = client->HandleIncomingMessage(message);
773   }
774   if (!result)
775     RaiseErrorInNonTestingMode();
776 
777   return true;
778 }
779 
MaybePostToProcessTasks(base::SingleThreadTaskRunner * task_runner)780 void MultiplexRouter::MaybePostToProcessTasks(
781     base::SingleThreadTaskRunner* task_runner) {
782   lock_.AssertAcquired();
783   if (posted_to_process_tasks_)
784     return;
785 
786   posted_to_process_tasks_ = true;
787   posted_to_task_runner_ = task_runner;
788   task_runner->PostTask(
789       FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
790 }
791 
LockAndCallProcessTasks()792 void MultiplexRouter::LockAndCallProcessTasks() {
793   // There is no need to hold a ref to this class in this case because this is
794   // always called using base::Bind(), which holds a ref.
795   base::AutoLock locker(lock_);
796   posted_to_process_tasks_ = false;
797   scoped_refptr<base::SingleThreadTaskRunner> runner(
798       std::move(posted_to_task_runner_));
799   ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
800 }
801 
UpdateEndpointStateMayRemove(InterfaceEndpoint * endpoint,EndpointStateUpdateType type)802 void MultiplexRouter::UpdateEndpointStateMayRemove(
803     InterfaceEndpoint* endpoint,
804     EndpointStateUpdateType type) {
805   switch (type) {
806     case ENDPOINT_CLOSED:
807       endpoint->set_closed();
808       break;
809     case PEER_ENDPOINT_CLOSED:
810       endpoint->set_peer_closed();
811       // If the interface endpoint is performing a sync watch, this makes sure
812       // it is notified and eventually exits the sync watch.
813       endpoint->SignalSyncMessageEvent();
814       break;
815   }
816   if (endpoint->closed() && endpoint->peer_closed())
817     endpoints_.erase(endpoint->id());
818 }
819 
RaiseErrorInNonTestingMode()820 void MultiplexRouter::RaiseErrorInNonTestingMode() {
821   lock_.AssertAcquired();
822   if (!testing_mode_)
823     RaiseError();
824 }
825 
FindOrInsertEndpoint(InterfaceId id,bool * inserted)826 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
827     InterfaceId id,
828     bool* inserted) {
829   lock_.AssertAcquired();
830   // Either |inserted| is nullptr or it points to a boolean initialized as
831   // false.
832   DCHECK(!inserted || !*inserted);
833 
834   auto iter = endpoints_.find(id);
835   InterfaceEndpoint* endpoint;
836   if (iter == endpoints_.end()) {
837     endpoint = new InterfaceEndpoint(this, id);
838     endpoints_[id] = endpoint;
839     if (inserted)
840       *inserted = true;
841   } else {
842     endpoint = iter->second.get();
843   }
844 
845   return endpoint;
846 }
847 
848 }  // namespace internal
849 }  // namespace mojo
850