1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/node_controller.h"
6 
7 #include <algorithm>
8 #include <limits>
9 
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/histogram_macros.h"
16 #include "base/process/process_handle.h"
17 #include "base/rand_util.h"
18 #include "base/time/time.h"
19 #include "base/timer/elapsed_timer.h"
20 #include "mojo/edk/embedder/embedder_internal.h"
21 #include "mojo/edk/embedder/platform_channel_pair.h"
22 #include "mojo/edk/system/broker.h"
23 #include "mojo/edk/system/broker_host.h"
24 #include "mojo/edk/system/core.h"
25 #include "mojo/edk/system/ports_message.h"
26 #include "mojo/edk/system/request_context.h"
27 
28 #if defined(OS_MACOSX) && !defined(OS_IOS)
29 #include "mojo/edk/system/mach_port_relay.h"
30 #endif
31 
32 #if !defined(OS_NACL)
33 #include "crypto/random.h"
34 #endif
35 
36 namespace mojo {
37 namespace edk {
38 
39 namespace {
40 
41 #if defined(OS_NACL)
42 template <typename T>
GenerateRandomName(T * out)43 void GenerateRandomName(T* out) { base::RandBytes(out, sizeof(T)); }
44 #else
45 template <typename T>
46 void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
47 #endif
48 
GetRandomNodeName()49 ports::NodeName GetRandomNodeName() {
50   ports::NodeName name;
51   GenerateRandomName(&name);
52   return name;
53 }
54 
RecordPeerCount(size_t count)55 void RecordPeerCount(size_t count) {
56   DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
57 
58   // 8k is the maximum number of file descriptors allowed in Chrome.
59   UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers",
60                               static_cast<int32_t>(count),
61                               0 /* min */,
62                               8000 /* max */,
63                               50 /* bucket count */);
64 }
65 
RecordPendingChildCount(size_t count)66 void RecordPendingChildCount(size_t count) {
67   DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
68 
69   // 8k is the maximum number of file descriptors allowed in Chrome.
70   UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren",
71                               static_cast<int32_t>(count),
72                               0 /* min */,
73                               8000 /* max */,
74                               50 /* bucket count */);
75 }
76 
ParsePortsMessage(Channel::Message * message,void ** data,size_t * num_data_bytes,size_t * num_header_bytes,size_t * num_payload_bytes,size_t * num_ports_bytes)77 bool ParsePortsMessage(Channel::Message* message,
78                        void** data,
79                        size_t* num_data_bytes,
80                        size_t* num_header_bytes,
81                        size_t* num_payload_bytes,
82                        size_t* num_ports_bytes) {
83   DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes &&
84          num_ports_bytes);
85 
86   NodeChannel::GetPortsMessageData(message, data, num_data_bytes);
87   if (!*num_data_bytes)
88     return false;
89 
90   if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes,
91                              num_payload_bytes, num_ports_bytes)) {
92     return false;
93   }
94 
95   return true;
96 }
97 
98 // Used by NodeController to watch for shutdown. Since no IO can happen once
99 // the IO thread is killed, the NodeController can cleanly drop all its peers
100 // at that time.
101 class ThreadDestructionObserver :
102     public base::MessageLoop::DestructionObserver {
103  public:
Create(scoped_refptr<base::TaskRunner> task_runner,const base::Closure & callback)104   static void Create(scoped_refptr<base::TaskRunner> task_runner,
105                      const base::Closure& callback) {
106     if (task_runner->RunsTasksOnCurrentThread()) {
107       // Owns itself.
108       new ThreadDestructionObserver(callback);
109     } else {
110       task_runner->PostTask(FROM_HERE,
111                             base::Bind(&Create, task_runner, callback));
112     }
113   }
114 
115  private:
ThreadDestructionObserver(const base::Closure & callback)116   explicit ThreadDestructionObserver(const base::Closure& callback)
117       : callback_(callback) {
118     base::MessageLoop::current()->AddDestructionObserver(this);
119   }
120 
~ThreadDestructionObserver()121   ~ThreadDestructionObserver() override {
122     base::MessageLoop::current()->RemoveDestructionObserver(this);
123   }
124 
125   // base::MessageLoop::DestructionObserver:
WillDestroyCurrentMessageLoop()126   void WillDestroyCurrentMessageLoop() override {
127     callback_.Run();
128     delete this;
129   }
130 
131   const base::Closure callback_;
132 
133   DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
134 };
135 
136 }  // namespace
137 
~NodeController()138 NodeController::~NodeController() {}
139 
NodeController(Core * core)140 NodeController::NodeController(Core* core)
141     : core_(core),
142       name_(GetRandomNodeName()),
143       node_(new ports::Node(name_, this)) {
144   DVLOG(1) << "Initializing node " << name_;
145 }
146 
#if defined(OS_MACOSX) && !defined(OS_IOS)
// Creates the MachPortRelay backed by |port_provider|. Must be called at most
// once: a relay must not already exist.
void NodeController::CreateMachPortRelay(base::PortProvider* port_provider) {
  base::AutoLock relay_lock(mach_port_relay_lock_);
  DCHECK(!mach_port_relay_);
  mach_port_relay_.reset(new MachPortRelay(port_provider));
}
#endif
155 
SetIOTaskRunner(scoped_refptr<base::TaskRunner> task_runner)156 void NodeController::SetIOTaskRunner(
157     scoped_refptr<base::TaskRunner> task_runner) {
158   io_task_runner_ = task_runner;
159   ThreadDestructionObserver::Create(
160       io_task_runner_,
161       base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
162 }
163 
ConnectToChild(base::ProcessHandle process_handle,ScopedPlatformHandle platform_handle,const std::string & child_token,const ProcessErrorCallback & process_error_callback)164 void NodeController::ConnectToChild(
165     base::ProcessHandle process_handle,
166     ScopedPlatformHandle platform_handle,
167     const std::string& child_token,
168     const ProcessErrorCallback& process_error_callback) {
169   // Generate the temporary remote node name here so that it can be associated
170   // with the embedder's child_token. If an error occurs in the child process
171   // after it is launched, but before any reserved ports are connected, this can
172   // be used to clean up any dangling ports.
173   ports::NodeName node_name;
174   GenerateRandomName(&node_name);
175 
176   {
177     base::AutoLock lock(reserved_ports_lock_);
178     bool inserted = pending_child_tokens_.insert(
179         std::make_pair(node_name, child_token)).second;
180     DCHECK(inserted);
181   }
182 
183 #if defined(OS_WIN)
184   // On Windows, we need to duplicate the process handle because we have no
185   // control over its lifetime and it may become invalid by the time the posted
186   // task runs.
187   HANDLE dup_handle = INVALID_HANDLE_VALUE;
188   BOOL ok = ::DuplicateHandle(
189       base::GetCurrentProcessHandle(), process_handle,
190       base::GetCurrentProcessHandle(), &dup_handle,
191       0, FALSE, DUPLICATE_SAME_ACCESS);
192   DPCHECK(ok);
193   process_handle = dup_handle;
194 #endif
195 
196   io_task_runner_->PostTask(
197       FROM_HERE,
198       base::Bind(&NodeController::ConnectToChildOnIOThread,
199                  base::Unretained(this),
200                  process_handle,
201                  base::Passed(&platform_handle),
202                  node_name,
203                  process_error_callback));
204 }
205 
CloseChildPorts(const std::string & child_token)206 void NodeController::CloseChildPorts(const std::string& child_token) {
207   std::vector<ports::PortRef> ports_to_close;
208   {
209     std::vector<std::string> port_tokens;
210     base::AutoLock lock(reserved_ports_lock_);
211     for (const auto& port : reserved_ports_) {
212       if (port.second.child_token == child_token) {
213         DVLOG(1) << "Closing reserved port " << port.second.port.name();
214         ports_to_close.push_back(port.second.port);
215         port_tokens.push_back(port.first);
216       }
217     }
218 
219     for (const auto& token : port_tokens)
220       reserved_ports_.erase(token);
221   }
222 
223   for (const auto& port : ports_to_close)
224     node_->ClosePort(port);
225 
226   // Ensure local port closure messages are processed.
227   AcceptIncomingMessages();
228 }
229 
ConnectToParent(ScopedPlatformHandle platform_handle)230 void NodeController::ConnectToParent(ScopedPlatformHandle platform_handle) {
231 // TODO(amistry): Consider the need for a broker on Windows.
232 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
233   // On posix, use the bootstrap channel for the broker and receive the node's
234   // channel synchronously as the first message from the broker.
235   base::ElapsedTimer timer;
236   broker_.reset(new Broker(std::move(platform_handle)));
237   platform_handle = broker_->GetParentPlatformHandle();
238   UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime",
239                       timer.Elapsed());
240 
241   if (!platform_handle.is_valid()) {
242     // Most likely the browser side of the channel has already been closed and
243     // the broker was unable to negotiate a NodeChannel pipe. In this case we
244     // can cancel parent connection.
245     DVLOG(1) << "Cannot connect to invalid parent channel.";
246     return;
247   }
248 #endif
249 
250   io_task_runner_->PostTask(
251       FROM_HERE,
252       base::Bind(&NodeController::ConnectToParentOnIOThread,
253                  base::Unretained(this),
254                  base::Passed(&platform_handle)));
255 }
256 
SetPortObserver(const ports::PortRef & port,const scoped_refptr<PortObserver> & observer)257 void NodeController::SetPortObserver(
258     const ports::PortRef& port,
259     const scoped_refptr<PortObserver>& observer) {
260   node_->SetUserData(port, observer);
261 }
262 
ClosePort(const ports::PortRef & port)263 void NodeController::ClosePort(const ports::PortRef& port) {
264   SetPortObserver(port, nullptr);
265   int rv = node_->ClosePort(port);
266   DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
267 
268   AcceptIncomingMessages();
269 }
270 
SendMessage(const ports::PortRef & port,std::unique_ptr<PortsMessage> message)271 int NodeController::SendMessage(const ports::PortRef& port,
272                                 std::unique_ptr<PortsMessage> message) {
273   ports::ScopedMessage ports_message(message.release());
274   int rv = node_->SendMessage(port, std::move(ports_message));
275 
276   AcceptIncomingMessages();
277   return rv;
278 }
279 
ReservePort(const std::string & token,const ports::PortRef & port,const std::string & child_token)280 void NodeController::ReservePort(const std::string& token,
281                                  const ports::PortRef& port,
282                                  const std::string& child_token) {
283   DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
284            << token;
285 
286   base::AutoLock lock(reserved_ports_lock_);
287   auto result = reserved_ports_.insert(
288       std::make_pair(token, ReservedPort{port, child_token}));
289   DCHECK(result.second);
290 }
291 
// Merges |port| with the port reserved under |token|.
//
// Three outcomes are possible:
//   1. |token| is reserved on this node: the ports are merged locally.
//   2. No parent channel is known yet: the request is queued in
//      |pending_port_merges_| (unless merges are being rejected).
//   3. Otherwise the parent channel is asked to perform the merge.
// If |reject_pending_merges_| is set, |port| is closed instead.
void NodeController::MergePortIntoParent(const std::string& token,
                                         const ports::PortRef& port) {
  bool was_merged = false;
  {
    // This request may be coming from within the process that reserved the
    // "parent" side (e.g. for Chrome single-process mode), so if this token is
    // reserved locally, merge locally instead.
    base::AutoLock lock(reserved_ports_lock_);
    auto it = reserved_ports_.find(token);
    if (it != reserved_ports_.end()) {
      node_->MergePorts(port, name_, it->second.port.name());
      reserved_ports_.erase(it);
      was_merged = true;
    }
  }
  // AcceptIncomingMessages() is called only after |reserved_ports_lock_| has
  // been released above.
  if (was_merged) {
    AcceptIncomingMessages();
    return;
  }

  scoped_refptr<NodeChannel> parent;
  bool reject_merge = false;
  {
    // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise,
    // there is a race where the parent can be set, and |pending_port_merges_|
    // be processed between retrieving |parent| and adding the merge to
    // |pending_port_merges_|.
    base::AutoLock lock(pending_port_merges_lock_);
    parent = GetParentChannel();
    if (reject_pending_merges_) {
      reject_merge = true;
    } else if (!parent) {
      // No parent yet: remember the request for later.
      pending_port_merges_.push_back(std::make_pair(token, port));
      return;
    }
  }
  // The rejection itself is carried out after |pending_port_merges_lock_| has
  // been released.
  if (reject_merge) {
    node_->ClosePort(port);
    DVLOG(2) << "Rejecting port merge for token " << token
             << " due to closed parent channel.";
    AcceptIncomingMessages();
    return;
  }

  parent->RequestPortMerge(port.name(), token);
}
338 
MergeLocalPorts(const ports::PortRef & port0,const ports::PortRef & port1)339 int NodeController::MergeLocalPorts(const ports::PortRef& port0,
340                                     const ports::PortRef& port1) {
341   int rv = node_->MergeLocalPorts(port0, port1);
342   AcceptIncomingMessages();
343   return rv;
344 }
345 
CreateSharedBuffer(size_t num_bytes)346 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
347     size_t num_bytes) {
348 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
349   // Shared buffer creation failure is fatal, so always use the broker when we
350   // have one. This does mean that a non-root process that has children will use
351   // the broker for shared buffer creation even though that process is
352   // privileged.
353   if (broker_) {
354     return broker_->GetSharedBuffer(num_bytes);
355   }
356 #endif
357   return PlatformSharedBuffer::Create(num_bytes);
358 }
359 
RequestShutdown(const base::Closure & callback)360 void NodeController::RequestShutdown(const base::Closure& callback) {
361   {
362     base::AutoLock lock(shutdown_lock_);
363     shutdown_callback_ = callback;
364     shutdown_callback_flag_.Set(true);
365   }
366 
367   AttemptShutdownIfRequested();
368 }
369 
NotifyBadMessageFrom(const ports::NodeName & source_node,const std::string & error)370 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
371                                           const std::string& error) {
372   scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
373   if (peer)
374     peer->NotifyBadMessage(error);
375 }
376 
ConnectToChildOnIOThread(base::ProcessHandle process_handle,ScopedPlatformHandle platform_handle,ports::NodeName token,const ProcessErrorCallback & process_error_callback)377 void NodeController::ConnectToChildOnIOThread(
378     base::ProcessHandle process_handle,
379     ScopedPlatformHandle platform_handle,
380     ports::NodeName token,
381     const ProcessErrorCallback& process_error_callback) {
382   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
383 
384 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL)
385   PlatformChannelPair node_channel;
386   // BrokerHost owns itself.
387   BrokerHost* broker_host = new BrokerHost(std::move(platform_handle));
388   broker_host->SendChannel(node_channel.PassClientHandle());
389   scoped_refptr<NodeChannel> channel = NodeChannel::Create(
390       this, node_channel.PassServerHandle(), io_task_runner_,
391       process_error_callback);
392 #else
393   scoped_refptr<NodeChannel> channel =
394       NodeChannel::Create(this, std::move(platform_handle), io_task_runner_,
395                           process_error_callback);
396 #endif
397 
398   // We set up the child channel with a temporary name so it can be identified
399   // as a pending child if it writes any messages to the channel. We may start
400   // receiving messages from it (though we shouldn't) as soon as Start() is
401   // called below.
402 
403   pending_children_.insert(std::make_pair(token, channel));
404   RecordPendingChildCount(pending_children_.size());
405 
406   channel->SetRemoteNodeName(token);
407   channel->SetRemoteProcessHandle(process_handle);
408   channel->Start();
409 
410   channel->AcceptChild(name_, token);
411 }
412 
ConnectToParentOnIOThread(ScopedPlatformHandle platform_handle)413 void NodeController::ConnectToParentOnIOThread(
414     ScopedPlatformHandle platform_handle) {
415   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
416 
417   {
418     base::AutoLock lock(parent_lock_);
419     DCHECK(parent_name_ == ports::kInvalidNodeName);
420 
421     // At this point we don't know the parent's name, so we can't yet insert it
422     // into our |peers_| map. That will happen as soon as we receive an
423     // AcceptChild message from them.
424     bootstrap_parent_channel_ =
425         NodeChannel::Create(this, std::move(platform_handle), io_task_runner_,
426                             ProcessErrorCallback());
427     // Prevent the parent pipe handle from being closed on shutdown. Pipe
428     // closure is used by the parent to detect the child process has exited.
429     // Relying on message pipes to be closed is not enough because the parent
430     // may see the message pipe closure before the child is dead, causing the
431     // child process to be unexpectedly SIGKILL'd.
432     bootstrap_parent_channel_->LeakHandleOnShutdown();
433   }
434   bootstrap_parent_channel_->Start();
435 }
436 
GetPeerChannel(const ports::NodeName & name)437 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
438     const ports::NodeName& name) {
439   base::AutoLock lock(peers_lock_);
440   auto it = peers_.find(name);
441   if (it == peers_.end())
442     return nullptr;
443   return it->second;
444 }
445 
GetParentChannel()446 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
447   ports::NodeName parent_name;
448   {
449     base::AutoLock lock(parent_lock_);
450     parent_name = parent_name_;
451   }
452   return GetPeerChannel(parent_name);
453 }
454 
GetBrokerChannel()455 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
456   ports::NodeName broker_name;
457   {
458     base::AutoLock lock(broker_lock_);
459     broker_name = broker_name_;
460   }
461   return GetPeerChannel(broker_name);
462 }
463 
AddPeer(const ports::NodeName & name,scoped_refptr<NodeChannel> channel,bool start_channel)464 void NodeController::AddPeer(const ports::NodeName& name,
465                              scoped_refptr<NodeChannel> channel,
466                              bool start_channel) {
467   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
468 
469   DCHECK(name != ports::kInvalidNodeName);
470   DCHECK(channel);
471 
472   channel->SetRemoteNodeName(name);
473 
474   OutgoingMessageQueue pending_messages;
475   {
476     base::AutoLock lock(peers_lock_);
477     if (peers_.find(name) != peers_.end()) {
478       // This can happen normally if two nodes race to be introduced to each
479       // other. The losing pipe will be silently closed and introduction should
480       // not be affected.
481       DVLOG(1) << "Ignoring duplicate peer name " << name;
482       return;
483     }
484 
485     auto result = peers_.insert(std::make_pair(name, channel));
486     DCHECK(result.second);
487 
488     DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
489 
490     RecordPeerCount(peers_.size());
491 
492     auto it = pending_peer_messages_.find(name);
493     if (it != pending_peer_messages_.end()) {
494       std::swap(pending_messages, it->second);
495       pending_peer_messages_.erase(it);
496     }
497   }
498 
499   if (start_channel)
500     channel->Start();
501 
502   // Flush any queued message we need to deliver to this node.
503   while (!pending_messages.empty()) {
504     channel->PortsMessage(std::move(pending_messages.front()));
505     pending_messages.pop();
506   }
507 }
508 
DropPeer(const ports::NodeName & name,NodeChannel * channel)509 void NodeController::DropPeer(const ports::NodeName& name,
510                               NodeChannel* channel) {
511   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
512 
513   {
514     base::AutoLock lock(peers_lock_);
515     auto it = peers_.find(name);
516 
517     if (it != peers_.end()) {
518       ports::NodeName peer = it->first;
519       peers_.erase(it);
520       DVLOG(1) << "Dropped peer " << peer;
521     }
522 
523     pending_peer_messages_.erase(name);
524     pending_children_.erase(name);
525 
526     RecordPeerCount(peers_.size());
527     RecordPendingChildCount(pending_children_.size());
528   }
529 
530   std::vector<ports::PortRef> ports_to_close;
531   {
532     // Clean up any reserved ports.
533     base::AutoLock lock(reserved_ports_lock_);
534     auto it = pending_child_tokens_.find(name);
535     if (it != pending_child_tokens_.end()) {
536       const std::string& child_token = it->second;
537 
538       std::vector<std::string> port_tokens;
539       for (const auto& port : reserved_ports_) {
540         if (port.second.child_token == child_token) {
541           DVLOG(1) << "Closing reserved port: " << port.second.port.name();
542           ports_to_close.push_back(port.second.port);
543           port_tokens.push_back(port.first);
544         }
545       }
546 
547       // We have to erase reserved ports in a two-step manner because the usual
548       // manner of using the returned iterator from map::erase isn't technically
549       // valid in C++11 (although it is in C++14).
550       for (const auto& token : port_tokens)
551         reserved_ports_.erase(token);
552 
553       pending_child_tokens_.erase(it);
554     }
555   }
556 
557   bool is_parent;
558   {
559     base::AutoLock lock(parent_lock_);
560     is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_);
561   }
562   // If the error comes from the parent channel, we also need to cancel any
563   // port merge requests, so that errors can be propagated to the message
564   // pipes.
565   if (is_parent) {
566     base::AutoLock lock(pending_port_merges_lock_);
567     reject_pending_merges_ = true;
568 
569     for (const auto& port : pending_port_merges_)
570       ports_to_close.push_back(port.second);
571     pending_port_merges_.clear();
572   }
573 
574   for (const auto& port : ports_to_close)
575     node_->ClosePort(port);
576 
577   node_->LostConnectionToNode(name);
578 
579   AcceptIncomingMessages();
580 }
581 
// Delivers |message| to the node |name|.
//
// Routing, in order of precedence:
//   - Windows: a message with handles, addressed to a peer that is unknown or
//     whose process handle we do not have, is relayed through the broker (or
//     queued in |pending_relay_messages_| when no broker channel is known).
//   - Mac: a message with Mach ports is relayed through the broker (or queued
//     likewise) whenever this node has, or is bootstrapping, a parent.
//   - If a channel to |name| exists, the message is sent on it directly.
//   - Otherwise the message is queued in |pending_peer_messages_| and, if it
//     is the first one queued for |name|, the broker is asked for an
//     introduction.
void NodeController::SendPeerMessage(const ports::NodeName& name,
                                     ports::ScopedMessage message) {
  // |message| is downcast to PortsMessage so that its underlying channel
  // message can be taken out of it.
  Channel::MessagePtr channel_message =
      static_cast<PortsMessage*>(message.get())->TakeChannelMessage();

  scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
#if defined(OS_WIN)
  if (channel_message->has_handles()) {
    // If we're sending a message with handles we aren't the destination
    // node's parent or broker (i.e. we don't know its process handle), ask
    // the broker to relay for us.
    scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    if (!peer || !peer->HasRemoteProcessHandle()) {
      if (broker) {
        broker->RelayPortsMessage(name, std::move(channel_message));
      } else {
        // No broker channel yet: hold the message in the relay queue.
        base::AutoLock lock(broker_lock_);
        pending_relay_messages_[name].emplace(std::move(channel_message));
      }
      return;
    }
  }
#elif defined(OS_MACOSX) && !defined(OS_IOS)
  if (channel_message->has_mach_ports()) {
    // Messages containing Mach ports are always routed through the broker, even
    // if the broker process is the intended recipient.
    bool use_broker = false;
    {
      base::AutoLock lock(parent_lock_);
      use_broker = (bootstrap_parent_channel_ ||
                    parent_name_ != ports::kInvalidNodeName);
    }
    if (use_broker) {
      scoped_refptr<NodeChannel> broker = GetBrokerChannel();
      if (broker) {
        broker->RelayPortsMessage(name, std::move(channel_message));
      } else {
        // No broker channel yet: hold the message in the relay queue.
        base::AutoLock lock(broker_lock_);
        pending_relay_messages_[name].emplace(std::move(channel_message));
      }
      return;
    }
  }
#endif  // defined(OS_WIN)

  if (peer) {
    peer->PortsMessage(std::move(channel_message));
    return;
  }

  // If we don't know who the peer is, queue the message for delivery. If this
  // is the first message queued for the peer, we also ask the broker to
  // introduce us to them.

  bool needs_introduction = false;
  {
    base::AutoLock lock(peers_lock_);
    // operator[] creates an empty queue for |name| if none exists yet.
    auto& queue = pending_peer_messages_[name];
    needs_introduction = queue.empty();
    queue.emplace(std::move(channel_message));
  }

  if (needs_introduction) {
    scoped_refptr<NodeChannel> broker = GetBrokerChannel();
    if (!broker) {
      // NOTE(review): the message was already appended to
      // |pending_peer_messages_| above and is not removed here, so despite the
      // log text it stays queued; confirm whether that is intended.
      DVLOG(1) << "Dropping message for unknown peer: " << name;
      return;
    }
    broker->RequestIntroduction(name);
  }
}
653 
// Hands every message currently in |incoming_messages_| to |node_|, then
// makes a shutdown attempt.
void NodeController::AcceptIncomingMessages() {
  {
    base::AutoLock lock(messages_lock_);
    if (!incoming_messages_.empty()) {
      // libstdc++'s deque creates an internal buffer on construction, even when
      // the size is 0. So avoid creating it until it is necessary.
      std::queue<ports::ScopedMessage> messages;
      // Take the whole queue in one step, leaving |incoming_messages_| empty.
      std::swap(messages, incoming_messages_);
      // From here to the end of this scope the lock is not held, so the
      // messages are passed to |node_| without |messages_lock_|.
      base::AutoUnlock unlock(messages_lock_);

      while (!messages.empty()) {
        node_->AcceptMessage(std::move(messages.front()));
        messages.pop();
      }
    }
  }

  AttemptShutdownIfRequested();
}
673 
ProcessIncomingMessages()674 void NodeController::ProcessIncomingMessages() {
675   RequestContext request_context(RequestContext::Source::SYSTEM);
676 
677   {
678     base::AutoLock lock(messages_lock_);
679     // Allow a new incoming messages processing task to be posted. This can't be
680     // done after AcceptIncomingMessages() otherwise a message might be missed.
681     // Doing it here may result in at most two tasks existing at the same time;
682     // this running one, and one pending in the task runner.
683     incoming_messages_task_posted_ = false;
684   }
685 
686   AcceptIncomingMessages();
687 }
688 
DropAllPeers()689 void NodeController::DropAllPeers() {
690   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
691 
692   std::vector<scoped_refptr<NodeChannel>> all_peers;
693   {
694     base::AutoLock lock(parent_lock_);
695     if (bootstrap_parent_channel_) {
696       // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its
697       // existence to determine whether or not this is the root node. Once
698       // bootstrap_parent_channel_->ShutDown() has been called,
699       // |bootstrap_parent_channel_| is essentially a dead object and it doesn't
700       // matter if it's deleted now or when |this| is deleted.
701       // Note: |bootstrap_parent_channel_| is only modified on the IO thread.
702       all_peers.push_back(bootstrap_parent_channel_);
703     }
704   }
705 
706   {
707     base::AutoLock lock(peers_lock_);
708     for (const auto& peer : peers_)
709       all_peers.push_back(peer.second);
710     for (const auto& peer : pending_children_)
711       all_peers.push_back(peer.second);
712     peers_.clear();
713     pending_children_.clear();
714     pending_peer_messages_.clear();
715   }
716 
717   for (const auto& peer : all_peers)
718     peer->ShutDown();
719 
720   if (destroy_on_io_thread_shutdown_)
721     delete this;
722 }
723 
GenerateRandomPortName(ports::PortName * port_name)724 void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
725   GenerateRandomName(port_name);
726 }
727 
AllocMessage(size_t num_header_bytes,ports::ScopedMessage * message)728 void NodeController::AllocMessage(size_t num_header_bytes,
729                                   ports::ScopedMessage* message) {
730   message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
731 }
732 
// Routes |message| to |node|. A message addressed to this node is appended to
// |incoming_messages_|; any other message goes through SendPeerMessage().
// When a local message is queued onto an empty queue and no processing task is
// outstanding, a ProcessIncomingMessages() task is posted to the IO thread.
void NodeController::ForwardMessage(const ports::NodeName& node,
                                    ports::ScopedMessage message) {
  DCHECK(message);
  bool schedule_pump_task = false;
  if (node == name_) {
    // NOTE: We need to avoid re-entering the Node instance within
    // ForwardMessage. Because ForwardMessage is only ever called
    // (synchronously) in response to Node's ClosePort, SendMessage, or
    // AcceptMessage, we flush the queue after calling any of those methods.
    base::AutoLock lock(messages_lock_);
    // |io_task_runner_| may be null in tests or processes that don't require
    // multi-process Mojo.
    schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
        !incoming_messages_task_posted_;
    // The flag is set under the same lock that guards the queue, so the
    // decision to post and the record of having posted cannot diverge.
    incoming_messages_task_posted_ |= schedule_pump_task;
    incoming_messages_.emplace(std::move(message));
  } else {
    SendPeerMessage(node, std::move(message));
  }

  // The task is posted after |messages_lock_| has been released.
  if (schedule_pump_task) {
    // Normally, the queue is processed after the action that added the local
    // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
    // possible for a local message to be added as a result of a remote message,
    // and OnChannelMessage() doesn't process this queue (although
    // OnPortsMessage() does). There may also be other code paths, now or added
    // in the future, which cause local messages to be added but don't process
    // this message queue.
    //
    // Instead of adding a call to AcceptIncomingMessages() on every possible
    // code path, post a task to the IO thread to process the queue. If the
    // current call stack processes the queue, this may end up doing nothing.
    io_task_runner_->PostTask(
        FROM_HERE,
        base::Bind(&NodeController::ProcessIncomingMessages,
                   base::Unretained(this)));
  }
}
771 
BroadcastMessage(ports::ScopedMessage message)772 void NodeController::BroadcastMessage(ports::ScopedMessage message) {
773   CHECK_EQ(message->num_ports(), 0u);
774   Channel::MessagePtr channel_message =
775       static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
776   CHECK(!channel_message->has_handles());
777 
778   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
779   if (broker)
780     broker->Broadcast(std::move(channel_message));
781   else
782     OnBroadcast(name_, std::move(channel_message));
783 }
784 
PortStatusChanged(const ports::PortRef & port)785 void NodeController::PortStatusChanged(const ports::PortRef& port) {
786   scoped_refptr<ports::UserData> user_data;
787   node_->GetUserData(port, &user_data);
788 
789   PortObserver* observer = static_cast<PortObserver*>(user_data.get());
790   if (observer) {
791     observer->OnPortStatusChanged();
792   } else {
793     DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
794              << "doesn't have an observer.";
795   }
796 }
797 
OnAcceptChild(const ports::NodeName & from_node,const ports::NodeName & parent_name,const ports::NodeName & token)798 void NodeController::OnAcceptChild(const ports::NodeName& from_node,
799                                    const ports::NodeName& parent_name,
800                                    const ports::NodeName& token) {
801   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
802 
803   scoped_refptr<NodeChannel> parent;
804   {
805     base::AutoLock lock(parent_lock_);
806     if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
807       parent_name_ = parent_name;
808       parent = bootstrap_parent_channel_;
809     }
810   }
811 
812   if (!parent) {
813     DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
814     DropPeer(from_node, nullptr);
815     return;
816   }
817 
818   parent->SetRemoteNodeName(parent_name);
819   parent->AcceptParent(token, name_);
820 
821   // NOTE: The child does not actually add its parent as a peer until
822   // receiving an AcceptBrokerClient message from the broker. The parent
823   // will request that said message be sent upon receiving AcceptParent.
824 
825   DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
826 }
827 
OnAcceptParent(const ports::NodeName & from_node,const ports::NodeName & token,const ports::NodeName & child_name)828 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
829                                     const ports::NodeName& token,
830                                     const ports::NodeName& child_name) {
831   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
832 
833   auto it = pending_children_.find(from_node);
834   if (it == pending_children_.end() || token != from_node) {
835     DLOG(ERROR) << "Received unexpected AcceptParent message from "
836                 << from_node;
837     DropPeer(from_node, nullptr);
838     return;
839   }
840 
841   scoped_refptr<NodeChannel> channel = it->second;
842   pending_children_.erase(it);
843 
844   DCHECK(channel);
845 
846   DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
847 
848   AddPeer(child_name, channel, false /* start_channel */);
849 
850   // TODO(rockot/amistry): We could simplify child initialization if we could
851   // synchronously get a new async broker channel from the broker. For now we do
852   // it asynchronously since it's only used to facilitate handle passing, not
853   // handle creation.
854   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
855   if (broker) {
856     // Inform the broker of this new child.
857     broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle());
858   } else {
859     // If we have no broker, either we need to wait for one, or we *are* the
860     // broker.
861     scoped_refptr<NodeChannel> parent = GetParentChannel();
862     if (!parent) {
863       base::AutoLock lock(parent_lock_);
864       parent = bootstrap_parent_channel_;
865     }
866 
867     if (!parent) {
868       // Yes, we're the broker. We can initialize the child directly.
869       channel->AcceptBrokerClient(name_, ScopedPlatformHandle());
870     } else {
871       // We aren't the broker, so wait for a broker connection.
872       base::AutoLock lock(broker_lock_);
873       pending_broker_clients_.push(child_name);
874     }
875   }
876 }
877 
OnAddBrokerClient(const ports::NodeName & from_node,const ports::NodeName & client_name,base::ProcessHandle process_handle)878 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
879                                        const ports::NodeName& client_name,
880                                        base::ProcessHandle process_handle) {
881 #if defined(OS_WIN)
882   // Scoped handle to avoid leaks on error.
883   ScopedPlatformHandle scoped_process_handle =
884       ScopedPlatformHandle(PlatformHandle(process_handle));
885 #endif
886   scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
887   if (!sender) {
888     DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
889     return;
890   }
891 
892   if (GetPeerChannel(client_name)) {
893     DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
894     DropPeer(from_node, nullptr);
895     return;
896   }
897 
898   PlatformChannelPair broker_channel;
899   scoped_refptr<NodeChannel> client = NodeChannel::Create(
900       this, broker_channel.PassServerHandle(), io_task_runner_,
901       ProcessErrorCallback());
902 
903 #if defined(OS_WIN)
904   // The broker must have a working handle to the client process in order to
905   // properly copy other handles to and from the client.
906   if (!scoped_process_handle.is_valid()) {
907     DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
908     return;
909   }
910   client->SetRemoteProcessHandle(scoped_process_handle.release().handle);
911 #else
912   client->SetRemoteProcessHandle(process_handle);
913 #endif
914 
915   AddPeer(client_name, client, true /* start_channel */);
916 
917   DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
918            << " from peer " << from_node;
919 
920   sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle());
921 }
922 
OnBrokerClientAdded(const ports::NodeName & from_node,const ports::NodeName & client_name,ScopedPlatformHandle broker_channel)923 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
924                                          const ports::NodeName& client_name,
925                                          ScopedPlatformHandle broker_channel) {
926   scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
927   if (!client) {
928     DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name;
929     return;
930   }
931 
932   // This should have come from our own broker.
933   if (GetBrokerChannel() != GetPeerChannel(from_node)) {
934     DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
935     return;
936   }
937 
938   DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node;
939 
940   client->AcceptBrokerClient(from_node, std::move(broker_channel));
941 }
942 
OnAcceptBrokerClient(const ports::NodeName & from_node,const ports::NodeName & broker_name,ScopedPlatformHandle broker_channel)943 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
944                                           const ports::NodeName& broker_name,
945                                           ScopedPlatformHandle broker_channel) {
946   // This node should already have a parent in bootstrap mode.
947   ports::NodeName parent_name;
948   scoped_refptr<NodeChannel> parent;
949   {
950     base::AutoLock lock(parent_lock_);
951     parent_name = parent_name_;
952     parent = bootstrap_parent_channel_;
953     bootstrap_parent_channel_ = nullptr;
954   }
955   DCHECK(parent_name == from_node);
956   DCHECK(parent);
957 
958   std::queue<ports::NodeName> pending_broker_clients;
959   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
960       pending_relay_messages;
961   {
962     base::AutoLock lock(broker_lock_);
963     broker_name_ = broker_name;
964     std::swap(pending_broker_clients, pending_broker_clients_);
965     std::swap(pending_relay_messages, pending_relay_messages_);
966   }
967   DCHECK(broker_name != ports::kInvalidNodeName);
968 
969   // It's now possible to add both the broker and the parent as peers.
970   // Note that the broker and parent may be the same node.
971   scoped_refptr<NodeChannel> broker;
972   if (broker_name == parent_name) {
973     DCHECK(!broker_channel.is_valid());
974     broker = parent;
975   } else {
976     DCHECK(broker_channel.is_valid());
977     broker = NodeChannel::Create(this, std::move(broker_channel),
978                                  io_task_runner_, ProcessErrorCallback());
979     AddPeer(broker_name, broker, true /* start_channel */);
980   }
981 
982   AddPeer(parent_name, parent, false /* start_channel */);
983 
984   {
985     // Complete any port merge requests we have waiting for the parent.
986     base::AutoLock lock(pending_port_merges_lock_);
987     for (const auto& request : pending_port_merges_)
988       parent->RequestPortMerge(request.second.name(), request.first);
989     pending_port_merges_.clear();
990   }
991 
992   // Feed the broker any pending children of our own.
993   while (!pending_broker_clients.empty()) {
994     const ports::NodeName& child_name = pending_broker_clients.front();
995     auto it = pending_children_.find(child_name);
996     DCHECK(it != pending_children_.end());
997     broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle());
998     pending_broker_clients.pop();
999   }
1000 
1001 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
1002   // Have the broker relay any messages we have waiting.
1003   for (auto& entry : pending_relay_messages) {
1004     const ports::NodeName& destination = entry.first;
1005     auto& message_queue = entry.second;
1006     while (!message_queue.empty()) {
1007       broker->RelayPortsMessage(destination, std::move(message_queue.front()));
1008       message_queue.pop();
1009     }
1010   }
1011 #endif
1012 
1013   DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name;
1014 }
1015 
OnPortsMessage(const ports::NodeName & from_node,Channel::MessagePtr channel_message)1016 void NodeController::OnPortsMessage(const ports::NodeName& from_node,
1017                                     Channel::MessagePtr channel_message) {
1018   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1019 
1020   void* data;
1021   size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1022   if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
1023                          &num_header_bytes, &num_payload_bytes,
1024                          &num_ports_bytes)) {
1025     DropPeer(from_node, nullptr);
1026     return;
1027   }
1028 
1029   CHECK(channel_message);
1030   std::unique_ptr<PortsMessage> ports_message(
1031       new PortsMessage(num_header_bytes,
1032                        num_payload_bytes,
1033                        num_ports_bytes,
1034                        std::move(channel_message)));
1035   ports_message->set_source_node(from_node);
1036   node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
1037   AcceptIncomingMessages();
1038 }
1039 
OnRequestPortMerge(const ports::NodeName & from_node,const ports::PortName & connector_port_name,const std::string & token)1040 void NodeController::OnRequestPortMerge(
1041     const ports::NodeName& from_node,
1042     const ports::PortName& connector_port_name,
1043     const std::string& token) {
1044   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1045 
1046   DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
1047            << token << " and port " << connector_port_name << "@" << from_node;
1048 
1049   ports::PortRef local_port;
1050   {
1051     base::AutoLock lock(reserved_ports_lock_);
1052     auto it = reserved_ports_.find(token);
1053     if (it == reserved_ports_.end()) {
1054       DVLOG(1) << "Ignoring request to connect to port for unknown token "
1055                << token;
1056       return;
1057     }
1058     local_port = it->second.port;
1059   }
1060 
1061   int rv = node_->MergePorts(local_port, from_node, connector_port_name);
1062   if (rv != ports::OK)
1063     DLOG(ERROR) << "MergePorts failed: " << rv;
1064 
1065   AcceptIncomingMessages();
1066 }
1067 
OnRequestIntroduction(const ports::NodeName & from_node,const ports::NodeName & name)1068 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
1069                                            const ports::NodeName& name) {
1070   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1071 
1072   scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
1073   if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
1074     DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
1075                 << from_node;
1076     DropPeer(from_node, nullptr);
1077     return;
1078   }
1079 
1080   scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
1081   if (!new_friend) {
1082     // We don't know who they're talking about!
1083     requestor->Introduce(name, ScopedPlatformHandle());
1084   } else {
1085     PlatformChannelPair new_channel;
1086     requestor->Introduce(name, new_channel.PassServerHandle());
1087     new_friend->Introduce(from_node, new_channel.PassClientHandle());
1088   }
1089 }
1090 
OnIntroduce(const ports::NodeName & from_node,const ports::NodeName & name,ScopedPlatformHandle channel_handle)1091 void NodeController::OnIntroduce(const ports::NodeName& from_node,
1092                                  const ports::NodeName& name,
1093                                  ScopedPlatformHandle channel_handle) {
1094   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1095 
1096   if (!channel_handle.is_valid()) {
1097     node_->LostConnectionToNode(name);
1098 
1099     DLOG(ERROR) << "Could not be introduced to peer " << name;
1100     base::AutoLock lock(peers_lock_);
1101     pending_peer_messages_.erase(name);
1102     return;
1103   }
1104 
1105   scoped_refptr<NodeChannel> channel =
1106       NodeChannel::Create(this, std::move(channel_handle), io_task_runner_,
1107                           ProcessErrorCallback());
1108 
1109   DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
1110   AddPeer(name, channel, true /* start_channel */);
1111 }
1112 
OnBroadcast(const ports::NodeName & from_node,Channel::MessagePtr message)1113 void NodeController::OnBroadcast(const ports::NodeName& from_node,
1114                                  Channel::MessagePtr message) {
1115   DCHECK(!message->has_handles());
1116 
1117   void* data;
1118   size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1119   if (!ParsePortsMessage(message.get(), &data, &num_data_bytes,
1120                          &num_header_bytes, &num_payload_bytes,
1121                          &num_ports_bytes)) {
1122     DropPeer(from_node, nullptr);
1123     return;
1124   }
1125 
1126   // Broadcast messages must not contain ports.
1127   if (num_ports_bytes > 0) {
1128     DropPeer(from_node, nullptr);
1129     return;
1130   }
1131 
1132   base::AutoLock lock(peers_lock_);
1133   for (auto& iter : peers_) {
1134     // Copy and send the message to each known peer.
1135     Channel::MessagePtr peer_message(
1136         new Channel::Message(message->payload_size(), 0));
1137     memcpy(peer_message->mutable_payload(), message->payload(),
1138            message->payload_size());
1139     iter.second->PortsMessage(std::move(peer_message));
1140   }
1141 }
1142 
1143 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
OnRelayPortsMessage(const ports::NodeName & from_node,base::ProcessHandle from_process,const ports::NodeName & destination,Channel::MessagePtr message)1144 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
1145                                          base::ProcessHandle from_process,
1146                                          const ports::NodeName& destination,
1147                                          Channel::MessagePtr message) {
1148   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1149 
1150   if (GetBrokerChannel()) {
1151     // Only the broker should be asked to relay a message.
1152     LOG(ERROR) << "Non-broker refusing to relay message.";
1153     DropPeer(from_node, nullptr);
1154     return;
1155   }
1156 
1157   // The parent should always know which process this came from.
1158   DCHECK(from_process != base::kNullProcessHandle);
1159 
1160 #if defined(OS_WIN)
1161   // Rewrite the handles to this (the parent) process. If the message is
1162   // destined for another child process, the handles will be rewritten to that
1163   // process before going out (see NodeChannel::WriteChannelMessage).
1164   //
1165   // TODO: We could avoid double-duplication.
1166   //
1167   // Note that we explicitly mark the handles as being owned by the sending
1168   // process before rewriting them, in order to accommodate RewriteHandles'
1169   // internal sanity checks.
1170   ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
1171   for (size_t i = 0; i < handles->size(); ++i)
1172     (*handles)[i].owning_process = from_process;
1173   if (!Channel::Message::RewriteHandles(from_process,
1174                                         base::GetCurrentProcessHandle(),
1175                                         handles.get())) {
1176     DLOG(ERROR) << "Failed to relay one or more handles.";
1177   }
1178   message->SetHandles(std::move(handles));
1179 #else
1180   MachPortRelay* relay = GetMachPortRelay();
1181   if (!relay) {
1182     LOG(ERROR) << "Receiving Mach ports without a port relay from "
1183                << from_node << ". Dropping message.";
1184     return;
1185   }
1186   if (!relay->ExtractPortRights(message.get(), from_process)) {
1187     // NodeChannel should ensure that MachPortRelay is ready for the remote
1188     // process. At this point, if the port extraction failed, either something
1189     // went wrong in the mach stuff, or the remote process died.
1190     LOG(ERROR) << "Error on receiving Mach ports " << from_node
1191                << ". Dropping message.";
1192     return;
1193   }
1194 #endif  // defined(OS_WIN)
1195 
1196   if (destination == name_) {
1197     // Great, we can deliver this message locally.
1198     OnPortsMessage(from_node, std::move(message));
1199     return;
1200   }
1201 
1202   scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
1203   if (peer)
1204     peer->PortsMessageFromRelay(from_node, std::move(message));
1205   else
1206     DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
1207 }
1208 
OnPortsMessageFromRelay(const ports::NodeName & from_node,const ports::NodeName & source_node,Channel::MessagePtr message)1209 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node,
1210                                              const ports::NodeName& source_node,
1211                                              Channel::MessagePtr message) {
1212   if (GetPeerChannel(from_node) != GetBrokerChannel()) {
1213     LOG(ERROR) << "Refusing relayed message from non-broker node.";
1214     DropPeer(from_node, nullptr);
1215     return;
1216   }
1217 
1218   OnPortsMessage(source_node, std::move(message));
1219 }
1220 #endif
1221 
OnChannelError(const ports::NodeName & from_node,NodeChannel * channel)1222 void NodeController::OnChannelError(const ports::NodeName& from_node,
1223                                     NodeChannel* channel) {
1224   if (io_task_runner_->RunsTasksOnCurrentThread()) {
1225     DropPeer(from_node, channel);
1226     // DropPeer may have caused local port closures, so be sure to process any
1227     // pending local messages.
1228     AcceptIncomingMessages();
1229   } else {
1230     io_task_runner_->PostTask(
1231         FROM_HERE,
1232         base::Bind(&NodeController::OnChannelError, base::Unretained(this),
1233                    from_node, channel));
1234   }
1235 }
1236 
1237 #if defined(OS_MACOSX) && !defined(OS_IOS)
GetMachPortRelay()1238 MachPortRelay* NodeController::GetMachPortRelay() {
1239   {
1240     base::AutoLock lock(parent_lock_);
1241     // Return null if we're not the root.
1242     if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName)
1243       return nullptr;
1244   }
1245 
1246   base::AutoLock lock(mach_port_relay_lock_);
1247   return mach_port_relay_.get();
1248 }
1249 #endif
1250 
// Sets |destroy_on_io_thread_shutdown_|. This function only records the
// request; whatever acts on the flag happens outside this function.
void NodeController::DestroyOnIOThreadShutdown() {
  destroy_on_io_thread_shutdown_ = true;
}
1254 
AttemptShutdownIfRequested()1255 void NodeController::AttemptShutdownIfRequested() {
1256   if (!shutdown_callback_flag_)
1257     return;
1258 
1259   base::Closure callback;
1260   {
1261     base::AutoLock lock(shutdown_lock_);
1262     if (shutdown_callback_.is_null())
1263       return;
1264     if (!node_->CanShutdownCleanly(true /* allow_local_ports */)) {
1265       DVLOG(2) << "Unable to cleanly shut down node " << name_;
1266       return;
1267     }
1268 
1269     callback = shutdown_callback_;
1270     shutdown_callback_.Reset();
1271     shutdown_callback_flag_.Set(false);
1272   }
1273 
1274   DCHECK(!callback.is_null());
1275 
1276   callback.Run();
1277 }
1278 
1279 }  // namespace edk
1280 }  // namespace mojo
1281