1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/node_controller.h"
6 
7 #include <algorithm>
8 #include <limits>
9 
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/metrics/histogram_macros.h"
16 #include "base/process/process_handle.h"
17 #include "base/rand_util.h"
18 #include "base/time/time.h"
19 #include "base/timer/elapsed_timer.h"
20 #include "mojo/edk/embedder/embedder_internal.h"
21 #include "mojo/edk/embedder/named_platform_channel_pair.h"
22 #include "mojo/edk/embedder/named_platform_handle.h"
23 #include "mojo/edk/embedder/platform_channel_pair.h"
24 #include "mojo/edk/system/broker.h"
25 #include "mojo/edk/system/broker_host.h"
26 #include "mojo/edk/system/core.h"
27 #include "mojo/edk/system/ports_message.h"
28 #include "mojo/edk/system/request_context.h"
29 
30 #if defined(OS_MACOSX) && !defined(OS_IOS)
31 #include "mojo/edk/system/mach_port_relay.h"
32 #endif
33 
34 #if !defined(OS_NACL)
35 #include "crypto/random.h"
36 #endif
37 
38 namespace mojo {
39 namespace edk {
40 
41 namespace {
42 
43 #if defined(OS_NACL)
44 template <typename T>
GenerateRandomName(T * out)45 void GenerateRandomName(T* out) { base::RandBytes(out, sizeof(T)); }
46 #else
47 template <typename T>
48 void GenerateRandomName(T* out) { crypto::RandBytes(out, sizeof(T)); }
49 #endif
50 
GetRandomNodeName()51 ports::NodeName GetRandomNodeName() {
52   ports::NodeName name;
53   GenerateRandomName(&name);
54   return name;
55 }
56 
RecordPeerCount(size_t count)57 void RecordPeerCount(size_t count) {
58   DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
59 
60   // 8k is the maximum number of file descriptors allowed in Chrome.
61   UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.ConnectedPeers",
62                               static_cast<int32_t>(count),
63                               1 /* min */,
64                               8000 /* max */,
65                               50 /* bucket count */);
66 }
67 
RecordPendingChildCount(size_t count)68 void RecordPendingChildCount(size_t count) {
69   DCHECK_LE(count, static_cast<size_t>(std::numeric_limits<int32_t>::max()));
70 
71   // 8k is the maximum number of file descriptors allowed in Chrome.
72   UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.Node.PendingChildren",
73                               static_cast<int32_t>(count),
74                               1 /* min */,
75                               8000 /* max */,
76                               50 /* bucket count */);
77 }
78 
ParsePortsMessage(Channel::Message * message,void ** data,size_t * num_data_bytes,size_t * num_header_bytes,size_t * num_payload_bytes,size_t * num_ports_bytes)79 bool ParsePortsMessage(Channel::Message* message,
80                        void** data,
81                        size_t* num_data_bytes,
82                        size_t* num_header_bytes,
83                        size_t* num_payload_bytes,
84                        size_t* num_ports_bytes) {
85   DCHECK(data && num_data_bytes && num_header_bytes && num_payload_bytes &&
86          num_ports_bytes);
87 
88   NodeChannel::GetPortsMessageData(message, data, num_data_bytes);
89   if (!*num_data_bytes)
90     return false;
91 
92   if (!ports::Message::Parse(*data, *num_data_bytes, num_header_bytes,
93                              num_payload_bytes, num_ports_bytes)) {
94     return false;
95   }
96 
97   return true;
98 }
99 
100 // Used by NodeController to watch for shutdown. Since no IO can happen once
101 // the IO thread is killed, the NodeController can cleanly drop all its peers
102 // at that time.
103 class ThreadDestructionObserver :
104     public base::MessageLoop::DestructionObserver {
105  public:
Create(scoped_refptr<base::TaskRunner> task_runner,const base::Closure & callback)106   static void Create(scoped_refptr<base::TaskRunner> task_runner,
107                      const base::Closure& callback) {
108     if (task_runner->RunsTasksOnCurrentThread()) {
109       // Owns itself.
110       new ThreadDestructionObserver(callback);
111     } else {
112       task_runner->PostTask(FROM_HERE,
113                             base::Bind(&Create, task_runner, callback));
114     }
115   }
116 
117  private:
ThreadDestructionObserver(const base::Closure & callback)118   explicit ThreadDestructionObserver(const base::Closure& callback)
119       : callback_(callback) {
120     base::MessageLoop::current()->AddDestructionObserver(this);
121   }
122 
~ThreadDestructionObserver()123   ~ThreadDestructionObserver() override {
124     base::MessageLoop::current()->RemoveDestructionObserver(this);
125   }
126 
127   // base::MessageLoop::DestructionObserver:
WillDestroyCurrentMessageLoop()128   void WillDestroyCurrentMessageLoop() override {
129     callback_.Run();
130     delete this;
131   }
132 
133   const base::Closure callback_;
134 
135   DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver);
136 };
137 
138 }  // namespace
139 
~NodeController()140 NodeController::~NodeController() {}
141 
NodeController(Core * core)142 NodeController::NodeController(Core* core)
143     : core_(core),
144       name_(GetRandomNodeName()),
145       node_(new ports::Node(name_, this)) {
146   DVLOG(1) << "Initializing node " << name_;
147 }
148 
149 #if defined(OS_MACOSX) && !defined(OS_IOS)
CreateMachPortRelay(base::PortProvider * port_provider)150 void NodeController::CreateMachPortRelay(
151     base::PortProvider* port_provider) {
152   base::AutoLock lock(mach_port_relay_lock_);
153   DCHECK(!mach_port_relay_);
154   mach_port_relay_.reset(new MachPortRelay(port_provider));
155 }
156 #endif
157 
SetIOTaskRunner(scoped_refptr<base::TaskRunner> task_runner)158 void NodeController::SetIOTaskRunner(
159     scoped_refptr<base::TaskRunner> task_runner) {
160   io_task_runner_ = task_runner;
161   ThreadDestructionObserver::Create(
162       io_task_runner_,
163       base::Bind(&NodeController::DropAllPeers, base::Unretained(this)));
164 }
165 
ConnectToChild(base::ProcessHandle process_handle,ConnectionParams connection_params,const std::string & child_token,const ProcessErrorCallback & process_error_callback)166 void NodeController::ConnectToChild(
167     base::ProcessHandle process_handle,
168     ConnectionParams connection_params,
169     const std::string& child_token,
170     const ProcessErrorCallback& process_error_callback) {
171   // Generate the temporary remote node name here so that it can be associated
172   // with the embedder's child_token. If an error occurs in the child process
173   // after it is launched, but before any reserved ports are connected, this can
174   // be used to clean up any dangling ports.
175   ports::NodeName node_name;
176   GenerateRandomName(&node_name);
177 
178   {
179     base::AutoLock lock(reserved_ports_lock_);
180     bool inserted = pending_child_tokens_.insert(
181         std::make_pair(node_name, child_token)).second;
182     DCHECK(inserted);
183   }
184 
185 #if defined(OS_WIN)
186   // On Windows, we need to duplicate the process handle because we have no
187   // control over its lifetime and it may become invalid by the time the posted
188   // task runs.
189   HANDLE dup_handle = INVALID_HANDLE_VALUE;
190   BOOL ok = ::DuplicateHandle(
191       base::GetCurrentProcessHandle(), process_handle,
192       base::GetCurrentProcessHandle(), &dup_handle,
193       0, FALSE, DUPLICATE_SAME_ACCESS);
194   DPCHECK(ok);
195   process_handle = dup_handle;
196 #endif
197 
198   io_task_runner_->PostTask(
199       FROM_HERE, base::Bind(&NodeController::ConnectToChildOnIOThread,
200                             base::Unretained(this), process_handle,
201                             base::Passed(&connection_params), node_name,
202                             process_error_callback));
203 }
204 
CloseChildPorts(const std::string & child_token)205 void NodeController::CloseChildPorts(const std::string& child_token) {
206   std::vector<ports::PortRef> ports_to_close;
207   {
208     std::vector<std::string> port_tokens;
209     base::AutoLock lock(reserved_ports_lock_);
210     for (const auto& port : reserved_ports_) {
211       if (port.second.child_token == child_token) {
212         DVLOG(1) << "Closing reserved port " << port.second.port.name();
213         ports_to_close.push_back(port.second.port);
214         port_tokens.push_back(port.first);
215       }
216     }
217 
218     for (const auto& token : port_tokens)
219       reserved_ports_.erase(token);
220   }
221 
222   for (const auto& port : ports_to_close)
223     node_->ClosePort(port);
224 
225   // Ensure local port closure messages are processed.
226   AcceptIncomingMessages();
227 }
228 
ClosePeerConnection(const std::string & peer_token)229 void NodeController::ClosePeerConnection(const std::string& peer_token) {
230   io_task_runner_->PostTask(
231       FROM_HERE, base::Bind(&NodeController::ClosePeerConnectionOnIOThread,
232                             base::Unretained(this), peer_token));
233 }
234 
ConnectToParent(ConnectionParams connection_params)235 void NodeController::ConnectToParent(ConnectionParams connection_params) {
236 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
237   // Use the bootstrap channel for the broker and receive the node's channel
238   // synchronously as the first message from the broker.
239   base::ElapsedTimer timer;
240   broker_.reset(new Broker(connection_params.TakeChannelHandle()));
241   ScopedPlatformHandle platform_handle = broker_->GetParentPlatformHandle();
242   UMA_HISTOGRAM_TIMES("Mojo.System.GetParentPlatformHandleSyncTime",
243                       timer.Elapsed());
244 
245   if (!platform_handle.is_valid()) {
246     // Most likely the browser side of the channel has already been closed and
247     // the broker was unable to negotiate a NodeChannel pipe. In this case we
248     // can cancel parent connection.
249     DVLOG(1) << "Cannot connect to invalid parent channel.";
250     CancelPendingPortMerges();
251     return;
252   }
253   connection_params = ConnectionParams(std::move(platform_handle));
254 #endif
255 
256   io_task_runner_->PostTask(
257       FROM_HERE,
258       base::Bind(&NodeController::ConnectToParentOnIOThread,
259                  base::Unretained(this), base::Passed(&connection_params)));
260 }
261 
ConnectToPeer(ConnectionParams connection_params,const ports::PortRef & port,const std::string & peer_token)262 void NodeController::ConnectToPeer(ConnectionParams connection_params,
263                                    const ports::PortRef& port,
264                                    const std::string& peer_token) {
265   ports::NodeName node_name;
266   GenerateRandomName(&node_name);
267   io_task_runner_->PostTask(
268       FROM_HERE,
269       base::Bind(&NodeController::ConnectToPeerOnIOThread,
270                  base::Unretained(this), base::Passed(&connection_params),
271                  node_name, port, peer_token));
272 }
273 
SetPortObserver(const ports::PortRef & port,scoped_refptr<PortObserver> observer)274 void NodeController::SetPortObserver(const ports::PortRef& port,
275                                      scoped_refptr<PortObserver> observer) {
276   node_->SetUserData(port, std::move(observer));
277 }
278 
ClosePort(const ports::PortRef & port)279 void NodeController::ClosePort(const ports::PortRef& port) {
280   SetPortObserver(port, nullptr);
281   int rv = node_->ClosePort(port);
282   DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name();
283 
284   AcceptIncomingMessages();
285 }
286 
SendMessage(const ports::PortRef & port,std::unique_ptr<PortsMessage> message)287 int NodeController::SendMessage(const ports::PortRef& port,
288                                 std::unique_ptr<PortsMessage> message) {
289   ports::ScopedMessage ports_message(message.release());
290   int rv = node_->SendMessage(port, std::move(ports_message));
291 
292   AcceptIncomingMessages();
293   return rv;
294 }
295 
ReservePort(const std::string & token,const ports::PortRef & port,const std::string & child_token)296 void NodeController::ReservePort(const std::string& token,
297                                  const ports::PortRef& port,
298                                  const std::string& child_token) {
299   DVLOG(2) << "Reserving port " << port.name() << "@" << name_ << " for token "
300            << token;
301 
302   base::AutoLock lock(reserved_ports_lock_);
303   auto result = reserved_ports_.insert(
304       std::make_pair(token, ReservedPort{port, child_token}));
305   DCHECK(result.second);
306 }
307 
MergePortIntoParent(const std::string & token,const ports::PortRef & port)308 void NodeController::MergePortIntoParent(const std::string& token,
309                                          const ports::PortRef& port) {
310   bool was_merged = false;
311   {
312     // This request may be coming from within the process that reserved the
313     // "parent" side (e.g. for Chrome single-process mode), so if this token is
314     // reserved locally, merge locally instead.
315     base::AutoLock lock(reserved_ports_lock_);
316     auto it = reserved_ports_.find(token);
317     if (it != reserved_ports_.end()) {
318       node_->MergePorts(port, name_, it->second.port.name());
319       reserved_ports_.erase(it);
320       was_merged = true;
321     }
322   }
323   if (was_merged) {
324     AcceptIncomingMessages();
325     return;
326   }
327 
328   scoped_refptr<NodeChannel> parent;
329   bool reject_merge = false;
330   {
331     // Hold |pending_port_merges_lock_| while getting |parent|. Otherwise,
332     // there is a race where the parent can be set, and |pending_port_merges_|
333     // be processed between retrieving |parent| and adding the merge to
334     // |pending_port_merges_|.
335     base::AutoLock lock(pending_port_merges_lock_);
336     parent = GetParentChannel();
337     if (reject_pending_merges_) {
338       reject_merge = true;
339     } else if (!parent) {
340       pending_port_merges_.push_back(std::make_pair(token, port));
341       return;
342     }
343   }
344   if (reject_merge) {
345     node_->ClosePort(port);
346     DVLOG(2) << "Rejecting port merge for token " << token
347              << " due to closed parent channel.";
348     AcceptIncomingMessages();
349     return;
350   }
351 
352   parent->RequestPortMerge(port.name(), token);
353 }
354 
MergeLocalPorts(const ports::PortRef & port0,const ports::PortRef & port1)355 int NodeController::MergeLocalPorts(const ports::PortRef& port0,
356                                     const ports::PortRef& port1) {
357   int rv = node_->MergeLocalPorts(port0, port1);
358   AcceptIncomingMessages();
359   return rv;
360 }
361 
CreateSharedBuffer(size_t num_bytes)362 scoped_refptr<PlatformSharedBuffer> NodeController::CreateSharedBuffer(
363     size_t num_bytes) {
364 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
365   // Shared buffer creation failure is fatal, so always use the broker when we
366   // have one. This does mean that a non-root process that has children will use
367   // the broker for shared buffer creation even though that process is
368   // privileged.
369   if (broker_) {
370     return broker_->GetSharedBuffer(num_bytes);
371   }
372 #endif
373   return PlatformSharedBuffer::Create(num_bytes);
374 }
375 
RequestShutdown(const base::Closure & callback)376 void NodeController::RequestShutdown(const base::Closure& callback) {
377   {
378     base::AutoLock lock(shutdown_lock_);
379     shutdown_callback_ = callback;
380     shutdown_callback_flag_.Set(true);
381   }
382 
383   AttemptShutdownIfRequested();
384 }
385 
NotifyBadMessageFrom(const ports::NodeName & source_node,const std::string & error)386 void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node,
387                                           const std::string& error) {
388   scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node);
389   if (peer)
390     peer->NotifyBadMessage(error);
391 }
392 
ConnectToChildOnIOThread(base::ProcessHandle process_handle,ConnectionParams connection_params,ports::NodeName token,const ProcessErrorCallback & process_error_callback)393 void NodeController::ConnectToChildOnIOThread(
394     base::ProcessHandle process_handle,
395     ConnectionParams connection_params,
396     ports::NodeName token,
397     const ProcessErrorCallback& process_error_callback) {
398   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
399 
400 #if !defined(OS_MACOSX) && !defined(OS_NACL)
401   PlatformChannelPair node_channel;
402   ScopedPlatformHandle server_handle = node_channel.PassServerHandle();
403   // BrokerHost owns itself.
404   BrokerHost* broker_host =
405       new BrokerHost(process_handle, connection_params.TakeChannelHandle());
406   bool channel_ok = broker_host->SendChannel(node_channel.PassClientHandle());
407 
408 #if defined(OS_WIN)
409   if (!channel_ok) {
410     // On Windows the above operation may fail if the channel is crossing a
411     // session boundary. In that case we fall back to a named pipe.
412     NamedPlatformChannelPair named_channel;
413     server_handle = named_channel.PassServerHandle();
414     broker_host->SendNamedChannel(named_channel.handle().name);
415   }
416 #else
417   CHECK(channel_ok);
418 #endif  // defined(OS_WIN)
419 
420   scoped_refptr<NodeChannel> channel =
421       NodeChannel::Create(this, ConnectionParams(std::move(server_handle)),
422                           io_task_runner_, process_error_callback);
423 
424 #else  // !defined(OS_MACOSX) && !defined(OS_NACL)
425   scoped_refptr<NodeChannel> channel =
426       NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
427                           process_error_callback);
428 #endif  // !defined(OS_MACOSX) && !defined(OS_NACL)
429 
430   // We set up the child channel with a temporary name so it can be identified
431   // as a pending child if it writes any messages to the channel. We may start
432   // receiving messages from it (though we shouldn't) as soon as Start() is
433   // called below.
434 
435   pending_children_.insert(std::make_pair(token, channel));
436   RecordPendingChildCount(pending_children_.size());
437 
438   channel->SetRemoteNodeName(token);
439   channel->SetRemoteProcessHandle(process_handle);
440   channel->Start();
441 
442   channel->AcceptChild(name_, token);
443 }
444 
ConnectToParentOnIOThread(ConnectionParams connection_params)445 void NodeController::ConnectToParentOnIOThread(
446     ConnectionParams connection_params) {
447   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
448 
449   {
450     base::AutoLock lock(parent_lock_);
451     DCHECK(parent_name_ == ports::kInvalidNodeName);
452 
453     // At this point we don't know the parent's name, so we can't yet insert it
454     // into our |peers_| map. That will happen as soon as we receive an
455     // AcceptChild message from them.
456     bootstrap_parent_channel_ =
457         NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
458                             ProcessErrorCallback());
459     // Prevent the parent pipe handle from being closed on shutdown. Pipe
460     // closure is used by the parent to detect the child process has exited.
461     // Relying on message pipes to be closed is not enough because the parent
462     // may see the message pipe closure before the child is dead, causing the
463     // child process to be unexpectedly SIGKILL'd.
464     bootstrap_parent_channel_->LeakHandleOnShutdown();
465   }
466   bootstrap_parent_channel_->Start();
467 }
468 
ConnectToPeerOnIOThread(ConnectionParams connection_params,ports::NodeName token,ports::PortRef port,const std::string & peer_token)469 void NodeController::ConnectToPeerOnIOThread(ConnectionParams connection_params,
470                                              ports::NodeName token,
471                                              ports::PortRef port,
472                                              const std::string& peer_token) {
473   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
474 
475   scoped_refptr<NodeChannel> channel = NodeChannel::Create(
476       this, std::move(connection_params), io_task_runner_, {});
477   peer_connections_.insert(
478       {token, PeerConnection{channel, port, peer_token}});
479   peers_by_token_.insert({peer_token, token});
480 
481   channel->SetRemoteNodeName(token);
482   channel->Start();
483 
484   channel->AcceptPeer(name_, token, port.name());
485 }
486 
ClosePeerConnectionOnIOThread(const std::string & peer_token)487 void NodeController::ClosePeerConnectionOnIOThread(
488     const std::string& peer_token) {
489   RequestContext request_context(RequestContext::Source::SYSTEM);
490   auto peer = peers_by_token_.find(peer_token);
491   // The connection may already be closed.
492   if (peer == peers_by_token_.end())
493     return;
494 
495   // |peer| may be removed so make a copy of |name|.
496   ports::NodeName name = peer->second;
497   DropPeer(name, nullptr);
498 }
499 
GetPeerChannel(const ports::NodeName & name)500 scoped_refptr<NodeChannel> NodeController::GetPeerChannel(
501     const ports::NodeName& name) {
502   base::AutoLock lock(peers_lock_);
503   auto it = peers_.find(name);
504   if (it == peers_.end())
505     return nullptr;
506   return it->second;
507 }
508 
GetParentChannel()509 scoped_refptr<NodeChannel> NodeController::GetParentChannel() {
510   ports::NodeName parent_name;
511   {
512     base::AutoLock lock(parent_lock_);
513     parent_name = parent_name_;
514   }
515   return GetPeerChannel(parent_name);
516 }
517 
GetBrokerChannel()518 scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() {
519   ports::NodeName broker_name;
520   {
521     base::AutoLock lock(broker_lock_);
522     broker_name = broker_name_;
523   }
524   return GetPeerChannel(broker_name);
525 }
526 
AddPeer(const ports::NodeName & name,scoped_refptr<NodeChannel> channel,bool start_channel)527 void NodeController::AddPeer(const ports::NodeName& name,
528                              scoped_refptr<NodeChannel> channel,
529                              bool start_channel) {
530   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
531 
532   DCHECK(name != ports::kInvalidNodeName);
533   DCHECK(channel);
534 
535   channel->SetRemoteNodeName(name);
536 
537   OutgoingMessageQueue pending_messages;
538   {
539     base::AutoLock lock(peers_lock_);
540     if (peers_.find(name) != peers_.end()) {
541       // This can happen normally if two nodes race to be introduced to each
542       // other. The losing pipe will be silently closed and introduction should
543       // not be affected.
544       DVLOG(1) << "Ignoring duplicate peer name " << name;
545       return;
546     }
547 
548     auto result = peers_.insert(std::make_pair(name, channel));
549     DCHECK(result.second);
550 
551     DVLOG(2) << "Accepting new peer " << name << " on node " << name_;
552 
553     RecordPeerCount(peers_.size());
554 
555     auto it = pending_peer_messages_.find(name);
556     if (it != pending_peer_messages_.end()) {
557       std::swap(pending_messages, it->second);
558       pending_peer_messages_.erase(it);
559     }
560   }
561 
562   if (start_channel)
563     channel->Start();
564 
565   // Flush any queued message we need to deliver to this node.
566   while (!pending_messages.empty()) {
567     channel->PortsMessage(std::move(pending_messages.front()));
568     pending_messages.pop();
569   }
570 }
571 
DropPeer(const ports::NodeName & name,NodeChannel * channel)572 void NodeController::DropPeer(const ports::NodeName& name,
573                               NodeChannel* channel) {
574   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
575 
576   {
577     base::AutoLock lock(peers_lock_);
578     auto it = peers_.find(name);
579 
580     if (it != peers_.end()) {
581       ports::NodeName peer = it->first;
582       peers_.erase(it);
583       DVLOG(1) << "Dropped peer " << peer;
584     }
585 
586     pending_peer_messages_.erase(name);
587     pending_children_.erase(name);
588 
589     RecordPeerCount(peers_.size());
590     RecordPendingChildCount(pending_children_.size());
591   }
592 
593   std::vector<ports::PortRef> ports_to_close;
594   {
595     // Clean up any reserved ports.
596     base::AutoLock lock(reserved_ports_lock_);
597     auto it = pending_child_tokens_.find(name);
598     if (it != pending_child_tokens_.end()) {
599       const std::string& child_token = it->second;
600 
601       std::vector<std::string> port_tokens;
602       for (const auto& port : reserved_ports_) {
603         if (port.second.child_token == child_token) {
604           DVLOG(1) << "Closing reserved port: " << port.second.port.name();
605           ports_to_close.push_back(port.second.port);
606           port_tokens.push_back(port.first);
607         }
608       }
609 
610       // We have to erase reserved ports in a two-step manner because the usual
611       // manner of using the returned iterator from map::erase isn't technically
612       // valid in C++11 (although it is in C++14).
613       for (const auto& token : port_tokens)
614         reserved_ports_.erase(token);
615 
616       pending_child_tokens_.erase(it);
617     }
618   }
619 
620   bool is_parent;
621   {
622     base::AutoLock lock(parent_lock_);
623     is_parent = (name == parent_name_ || channel == bootstrap_parent_channel_);
624   }
625 
626   // If the error comes from the parent channel, we also need to cancel any
627   // port merge requests, so that errors can be propagated to the message
628   // pipes.
629   if (is_parent)
630     CancelPendingPortMerges();
631 
632   auto peer = peer_connections_.find(name);
633   if (peer != peer_connections_.end()) {
634     peers_by_token_.erase(peer->second.peer_token);
635     ports_to_close.push_back(peer->second.local_port);
636     peer_connections_.erase(peer);
637   }
638 
639   for (const auto& port : ports_to_close)
640     node_->ClosePort(port);
641 
642   node_->LostConnectionToNode(name);
643 
644   AcceptIncomingMessages();
645 }
646 
SendPeerMessage(const ports::NodeName & name,ports::ScopedMessage message)647 void NodeController::SendPeerMessage(const ports::NodeName& name,
648                                      ports::ScopedMessage message) {
649   Channel::MessagePtr channel_message =
650       static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
651 
652   scoped_refptr<NodeChannel> peer = GetPeerChannel(name);
653 #if defined(OS_WIN)
654   if (channel_message->has_handles()) {
655     // If we're sending a message with handles we aren't the destination
656     // node's parent or broker (i.e. we don't know its process handle), ask
657     // the broker to relay for us.
658     scoped_refptr<NodeChannel> broker = GetBrokerChannel();
659     if (!peer || !peer->HasRemoteProcessHandle()) {
660       if (broker) {
661         broker->RelayPortsMessage(name, std::move(channel_message));
662       } else {
663         base::AutoLock lock(broker_lock_);
664         pending_relay_messages_[name].emplace(std::move(channel_message));
665       }
666       return;
667     }
668   }
669 #elif defined(OS_MACOSX) && !defined(OS_IOS)
670   if (channel_message->has_mach_ports()) {
671     // Messages containing Mach ports are always routed through the broker, even
672     // if the broker process is the intended recipient.
673     bool use_broker = false;
674     {
675       base::AutoLock lock(parent_lock_);
676       use_broker = (bootstrap_parent_channel_ ||
677                     parent_name_ != ports::kInvalidNodeName);
678     }
679     if (use_broker) {
680       scoped_refptr<NodeChannel> broker = GetBrokerChannel();
681       if (broker) {
682         broker->RelayPortsMessage(name, std::move(channel_message));
683       } else {
684         base::AutoLock lock(broker_lock_);
685         pending_relay_messages_[name].emplace(std::move(channel_message));
686       }
687       return;
688     }
689   }
690 #endif  // defined(OS_WIN)
691 
692   if (peer) {
693     peer->PortsMessage(std::move(channel_message));
694     return;
695   }
696 
697   // If we don't know who the peer is, queue the message for delivery. If this
698   // is the first message queued for the peer, we also ask the broker to
699   // introduce us to them.
700 
701   bool needs_introduction = false;
702   {
703     base::AutoLock lock(peers_lock_);
704     auto& queue = pending_peer_messages_[name];
705     needs_introduction = queue.empty();
706     queue.emplace(std::move(channel_message));
707   }
708 
709   if (needs_introduction) {
710     scoped_refptr<NodeChannel> broker = GetBrokerChannel();
711     if (!broker) {
712       DVLOG(1) << "Dropping message for unknown peer: " << name;
713       return;
714     }
715     broker->RequestIntroduction(name);
716   }
717 }
718 
AcceptIncomingMessages()719 void NodeController::AcceptIncomingMessages() {
720   // This is an impactically large value which should never be reached in
721   // practice. See the CHECK below for usage.
722   constexpr size_t kMaxAcceptedMessages = 1000000;
723 
724   size_t num_messages_accepted = 0;
725   while (incoming_messages_flag_) {
726     // TODO: We may need to be more careful to avoid starving the rest of the
727     // thread here. Revisit this if it turns out to be a problem. One
728     // alternative would be to schedule a task to continue pumping messages
729     // after flushing once.
730 
731     messages_lock_.Acquire();
732     if (incoming_messages_.empty()) {
733       messages_lock_.Release();
734       break;
735     }
736 
737     // libstdc++'s deque creates an internal buffer on construction, even when
738     // the size is 0. So avoid creating it until it is necessary.
739     std::queue<ports::ScopedMessage> messages;
740     std::swap(messages, incoming_messages_);
741     incoming_messages_flag_.Set(false);
742     messages_lock_.Release();
743 
744     num_messages_accepted += messages.size();
745     while (!messages.empty()) {
746       node_->AcceptMessage(std::move(messages.front()));
747       messages.pop();
748     }
749 
750     // This is effectively a safeguard against potential bugs which might lead
751     // to runaway message cycles. If any such cycles arise, we'll start seeing
752     // crash reports from this location.
753     CHECK_LE(num_messages_accepted, kMaxAcceptedMessages);
754   }
755 
756   if (num_messages_accepted >= 4) {
757     // Note: We avoid logging this histogram for the vast majority of cases.
758     // See https://crbug.com/685763 for more context.
759     UMA_HISTOGRAM_CUSTOM_COUNTS("Mojo.System.MessagesAcceptedPerEvent",
760                                 static_cast<int32_t>(num_messages_accepted),
761                                 1 /* min */,
762                                 500 /* max */,
763                                 50 /* bucket count */);
764   }
765 
766   AttemptShutdownIfRequested();
767 }
768 
ProcessIncomingMessages()769 void NodeController::ProcessIncomingMessages() {
770   RequestContext request_context(RequestContext::Source::SYSTEM);
771 
772   {
773     base::AutoLock lock(messages_lock_);
774     // Allow a new incoming messages processing task to be posted. This can't be
775     // done after AcceptIncomingMessages() otherwise a message might be missed.
776     // Doing it here may result in at most two tasks existing at the same time;
777     // this running one, and one pending in the task runner.
778     incoming_messages_task_posted_ = false;
779   }
780 
781   AcceptIncomingMessages();
782 }
783 
DropAllPeers()784 void NodeController::DropAllPeers() {
785   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
786 
787   std::vector<scoped_refptr<NodeChannel>> all_peers;
788   {
789     base::AutoLock lock(parent_lock_);
790     if (bootstrap_parent_channel_) {
791       // |bootstrap_parent_channel_| isn't null'd here becuase we rely on its
792       // existence to determine whether or not this is the root node. Once
793       // bootstrap_parent_channel_->ShutDown() has been called,
794       // |bootstrap_parent_channel_| is essentially a dead object and it doesn't
795       // matter if it's deleted now or when |this| is deleted.
796       // Note: |bootstrap_parent_channel_| is only modified on the IO thread.
797       all_peers.push_back(bootstrap_parent_channel_);
798     }
799   }
800 
801   {
802     base::AutoLock lock(peers_lock_);
803     for (const auto& peer : peers_)
804       all_peers.push_back(peer.second);
805     for (const auto& peer : pending_children_)
806       all_peers.push_back(peer.second);
807     peers_.clear();
808     pending_children_.clear();
809     pending_peer_messages_.clear();
810     peer_connections_.clear();
811   }
812 
813   for (const auto& peer : all_peers)
814     peer->ShutDown();
815 
816   if (destroy_on_io_thread_shutdown_)
817     delete this;
818 }
819 
// Fills |*port_name| with random bytes by delegating to the file-local
// GenerateRandomName() helper.
void NodeController::GenerateRandomPortName(ports::PortName* port_name) {
  GenerateRandomName(port_name);
}
823 
AllocMessage(size_t num_header_bytes,ports::ScopedMessage * message)824 void NodeController::AllocMessage(size_t num_header_bytes,
825                                   ports::ScopedMessage* message) {
826   message->reset(new PortsMessage(num_header_bytes, 0, 0, nullptr));
827 }
828 
ForwardMessage(const ports::NodeName & node,ports::ScopedMessage message)829 void NodeController::ForwardMessage(const ports::NodeName& node,
830                                     ports::ScopedMessage message) {
831   DCHECK(message);
832   bool schedule_pump_task = false;
833   if (node == name_) {
834     // NOTE: We need to avoid re-entering the Node instance within
835     // ForwardMessage. Because ForwardMessage is only ever called
836     // (synchronously) in response to Node's ClosePort, SendMessage, or
837     // AcceptMessage, we flush the queue after calling any of those methods.
838     base::AutoLock lock(messages_lock_);
839     // |io_task_runner_| may be null in tests or processes that don't require
840     // multi-process Mojo.
841     schedule_pump_task = incoming_messages_.empty() && io_task_runner_ &&
842         !incoming_messages_task_posted_;
843     incoming_messages_task_posted_ |= schedule_pump_task;
844     incoming_messages_.emplace(std::move(message));
845     incoming_messages_flag_.Set(true);
846   } else {
847     SendPeerMessage(node, std::move(message));
848   }
849 
850   if (schedule_pump_task) {
851     // Normally, the queue is processed after the action that added the local
852     // message is done (i.e. SendMessage, ClosePort, etc). However, it's also
853     // possible for a local message to be added as a result of a remote message,
854     // and OnChannelMessage() doesn't process this queue (although
855     // OnPortsMessage() does). There may also be other code paths, now or added
856     // in the future, which cause local messages to be added but don't process
857     // this message queue.
858     //
859     // Instead of adding a call to AcceptIncomingMessages() on every possible
860     // code path, post a task to the IO thread to process the queue. If the
861     // current call stack processes the queue, this may end up doing nothing.
862     io_task_runner_->PostTask(
863         FROM_HERE,
864         base::Bind(&NodeController::ProcessIncomingMessages,
865                    base::Unretained(this)));
866   }
867 }
868 
BroadcastMessage(ports::ScopedMessage message)869 void NodeController::BroadcastMessage(ports::ScopedMessage message) {
870   CHECK_EQ(message->num_ports(), 0u);
871   Channel::MessagePtr channel_message =
872       static_cast<PortsMessage*>(message.get())->TakeChannelMessage();
873   CHECK(!channel_message->has_handles());
874 
875   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
876   if (broker)
877     broker->Broadcast(std::move(channel_message));
878   else
879     OnBroadcast(name_, std::move(channel_message));
880 }
881 
PortStatusChanged(const ports::PortRef & port)882 void NodeController::PortStatusChanged(const ports::PortRef& port) {
883   scoped_refptr<ports::UserData> user_data;
884   node_->GetUserData(port, &user_data);
885 
886   PortObserver* observer = static_cast<PortObserver*>(user_data.get());
887   if (observer) {
888     observer->OnPortStatusChanged();
889   } else {
890     DVLOG(2) << "Ignoring status change for " << port.name() << " because it "
891              << "doesn't have an observer.";
892   }
893 }
894 
OnAcceptChild(const ports::NodeName & from_node,const ports::NodeName & parent_name,const ports::NodeName & token)895 void NodeController::OnAcceptChild(const ports::NodeName& from_node,
896                                    const ports::NodeName& parent_name,
897                                    const ports::NodeName& token) {
898   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
899 
900   scoped_refptr<NodeChannel> parent;
901   {
902     base::AutoLock lock(parent_lock_);
903     if (bootstrap_parent_channel_ && parent_name_ == ports::kInvalidNodeName) {
904       parent_name_ = parent_name;
905       parent = bootstrap_parent_channel_;
906     }
907   }
908 
909   if (!parent) {
910     DLOG(ERROR) << "Unexpected AcceptChild message from " << from_node;
911     DropPeer(from_node, nullptr);
912     return;
913   }
914 
915   parent->SetRemoteNodeName(parent_name);
916   parent->AcceptParent(token, name_);
917 
918   // NOTE: The child does not actually add its parent as a peer until
919   // receiving an AcceptBrokerClient message from the broker. The parent
920   // will request that said message be sent upon receiving AcceptParent.
921 
922   DVLOG(1) << "Child " << name_ << " accepting parent " << parent_name;
923 }
924 
OnAcceptParent(const ports::NodeName & from_node,const ports::NodeName & token,const ports::NodeName & child_name)925 void NodeController::OnAcceptParent(const ports::NodeName& from_node,
926                                     const ports::NodeName& token,
927                                     const ports::NodeName& child_name) {
928   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
929 
930   auto it = pending_children_.find(from_node);
931   if (it == pending_children_.end() || token != from_node) {
932     DLOG(ERROR) << "Received unexpected AcceptParent message from "
933                 << from_node;
934     DropPeer(from_node, nullptr);
935     return;
936   }
937 
938   scoped_refptr<NodeChannel> channel = it->second;
939   pending_children_.erase(it);
940 
941   DCHECK(channel);
942 
943   DVLOG(1) << "Parent " << name_ << " accepted child " << child_name;
944 
945   AddPeer(child_name, channel, false /* start_channel */);
946 
947   // TODO(rockot/amistry): We could simplify child initialization if we could
948   // synchronously get a new async broker channel from the broker. For now we do
949   // it asynchronously since it's only used to facilitate handle passing, not
950   // handle creation.
951   scoped_refptr<NodeChannel> broker = GetBrokerChannel();
952   if (broker) {
953     // Inform the broker of this new child.
954     broker->AddBrokerClient(child_name, channel->CopyRemoteProcessHandle());
955   } else {
956     // If we have no broker, either we need to wait for one, or we *are* the
957     // broker.
958     scoped_refptr<NodeChannel> parent = GetParentChannel();
959     if (!parent) {
960       base::AutoLock lock(parent_lock_);
961       parent = bootstrap_parent_channel_;
962     }
963 
964     if (!parent) {
965       // Yes, we're the broker. We can initialize the child directly.
966       channel->AcceptBrokerClient(name_, ScopedPlatformHandle());
967     } else {
968       // We aren't the broker, so wait for a broker connection.
969       base::AutoLock lock(broker_lock_);
970       pending_broker_clients_.push(child_name);
971     }
972   }
973 }
974 
OnAddBrokerClient(const ports::NodeName & from_node,const ports::NodeName & client_name,base::ProcessHandle process_handle)975 void NodeController::OnAddBrokerClient(const ports::NodeName& from_node,
976                                        const ports::NodeName& client_name,
977                                        base::ProcessHandle process_handle) {
978 #if defined(OS_WIN)
979   // Scoped handle to avoid leaks on error.
980   ScopedPlatformHandle scoped_process_handle =
981       ScopedPlatformHandle(PlatformHandle(process_handle));
982 #endif
983   scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node);
984   if (!sender) {
985     DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender.";
986     return;
987   }
988 
989   if (GetPeerChannel(client_name)) {
990     DLOG(ERROR) << "Ignoring AddBrokerClient for known client.";
991     DropPeer(from_node, nullptr);
992     return;
993   }
994 
995   PlatformChannelPair broker_channel;
996   ConnectionParams connection_params(broker_channel.PassServerHandle());
997   scoped_refptr<NodeChannel> client =
998       NodeChannel::Create(this, std::move(connection_params), io_task_runner_,
999                           ProcessErrorCallback());
1000 
1001 #if defined(OS_WIN)
1002   // The broker must have a working handle to the client process in order to
1003   // properly copy other handles to and from the client.
1004   if (!scoped_process_handle.is_valid()) {
1005     DLOG(ERROR) << "Broker rejecting client with invalid process handle.";
1006     return;
1007   }
1008   client->SetRemoteProcessHandle(scoped_process_handle.release().handle);
1009 #else
1010   client->SetRemoteProcessHandle(process_handle);
1011 #endif
1012 
1013   AddPeer(client_name, client, true /* start_channel */);
1014 
1015   DVLOG(1) << "Broker " << name_ << " accepting client " << client_name
1016            << " from peer " << from_node;
1017 
1018   sender->BrokerClientAdded(client_name, broker_channel.PassClientHandle());
1019 }
1020 
OnBrokerClientAdded(const ports::NodeName & from_node,const ports::NodeName & client_name,ScopedPlatformHandle broker_channel)1021 void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node,
1022                                          const ports::NodeName& client_name,
1023                                          ScopedPlatformHandle broker_channel) {
1024   scoped_refptr<NodeChannel> client = GetPeerChannel(client_name);
1025   if (!client) {
1026     DLOG(ERROR) << "BrokerClientAdded for unknown child " << client_name;
1027     return;
1028   }
1029 
1030   // This should have come from our own broker.
1031   if (GetBrokerChannel() != GetPeerChannel(from_node)) {
1032     DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node;
1033     return;
1034   }
1035 
1036   DVLOG(1) << "Child " << client_name << " accepted by broker " << from_node;
1037 
1038   client->AcceptBrokerClient(from_node, std::move(broker_channel));
1039 }
1040 
OnAcceptBrokerClient(const ports::NodeName & from_node,const ports::NodeName & broker_name,ScopedPlatformHandle broker_channel)1041 void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node,
1042                                           const ports::NodeName& broker_name,
1043                                           ScopedPlatformHandle broker_channel) {
1044   // This node should already have a parent in bootstrap mode.
1045   ports::NodeName parent_name;
1046   scoped_refptr<NodeChannel> parent;
1047   {
1048     base::AutoLock lock(parent_lock_);
1049     parent_name = parent_name_;
1050     parent = bootstrap_parent_channel_;
1051     bootstrap_parent_channel_ = nullptr;
1052   }
1053   DCHECK(parent_name == from_node);
1054   DCHECK(parent);
1055 
1056   std::queue<ports::NodeName> pending_broker_clients;
1057   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
1058       pending_relay_messages;
1059   {
1060     base::AutoLock lock(broker_lock_);
1061     broker_name_ = broker_name;
1062     std::swap(pending_broker_clients, pending_broker_clients_);
1063     std::swap(pending_relay_messages, pending_relay_messages_);
1064   }
1065   DCHECK(broker_name != ports::kInvalidNodeName);
1066 
1067   // It's now possible to add both the broker and the parent as peers.
1068   // Note that the broker and parent may be the same node.
1069   scoped_refptr<NodeChannel> broker;
1070   if (broker_name == parent_name) {
1071     DCHECK(!broker_channel.is_valid());
1072     broker = parent;
1073   } else {
1074     DCHECK(broker_channel.is_valid());
1075     broker =
1076         NodeChannel::Create(this, ConnectionParams(std::move(broker_channel)),
1077                             io_task_runner_, ProcessErrorCallback());
1078     AddPeer(broker_name, broker, true /* start_channel */);
1079   }
1080 
1081   AddPeer(parent_name, parent, false /* start_channel */);
1082 
1083   {
1084     // Complete any port merge requests we have waiting for the parent.
1085     base::AutoLock lock(pending_port_merges_lock_);
1086     for (const auto& request : pending_port_merges_)
1087       parent->RequestPortMerge(request.second.name(), request.first);
1088     pending_port_merges_.clear();
1089   }
1090 
1091   // Feed the broker any pending children of our own.
1092   while (!pending_broker_clients.empty()) {
1093     const ports::NodeName& child_name = pending_broker_clients.front();
1094     auto it = pending_children_.find(child_name);
1095     DCHECK(it != pending_children_.end());
1096     broker->AddBrokerClient(child_name, it->second->CopyRemoteProcessHandle());
1097     pending_broker_clients.pop();
1098   }
1099 
1100 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
1101   // Have the broker relay any messages we have waiting.
1102   for (auto& entry : pending_relay_messages) {
1103     const ports::NodeName& destination = entry.first;
1104     auto& message_queue = entry.second;
1105     while (!message_queue.empty()) {
1106       broker->RelayPortsMessage(destination, std::move(message_queue.front()));
1107       message_queue.pop();
1108     }
1109   }
1110 #endif
1111 
1112   DVLOG(1) << "Child " << name_ << " accepted by broker " << broker_name;
1113 }
1114 
OnPortsMessage(const ports::NodeName & from_node,Channel::MessagePtr channel_message)1115 void NodeController::OnPortsMessage(const ports::NodeName& from_node,
1116                                     Channel::MessagePtr channel_message) {
1117   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1118 
1119   void* data;
1120   size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1121   if (!ParsePortsMessage(channel_message.get(), &data, &num_data_bytes,
1122                          &num_header_bytes, &num_payload_bytes,
1123                          &num_ports_bytes)) {
1124     DropPeer(from_node, nullptr);
1125     return;
1126   }
1127 
1128   CHECK(channel_message);
1129   std::unique_ptr<PortsMessage> ports_message(
1130       new PortsMessage(num_header_bytes,
1131                        num_payload_bytes,
1132                        num_ports_bytes,
1133                        std::move(channel_message)));
1134   ports_message->set_source_node(from_node);
1135   node_->AcceptMessage(ports::ScopedMessage(ports_message.release()));
1136   AcceptIncomingMessages();
1137 }
1138 
OnRequestPortMerge(const ports::NodeName & from_node,const ports::PortName & connector_port_name,const std::string & token)1139 void NodeController::OnRequestPortMerge(
1140     const ports::NodeName& from_node,
1141     const ports::PortName& connector_port_name,
1142     const std::string& token) {
1143   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1144 
1145   DVLOG(2) << "Node " << name_ << " received RequestPortMerge for token "
1146            << token << " and port " << connector_port_name << "@" << from_node;
1147 
1148   ports::PortRef local_port;
1149   {
1150     base::AutoLock lock(reserved_ports_lock_);
1151     auto it = reserved_ports_.find(token);
1152     if (it == reserved_ports_.end()) {
1153       DVLOG(1) << "Ignoring request to connect to port for unknown token "
1154                << token;
1155       return;
1156     }
1157     local_port = it->second.port;
1158   }
1159 
1160   int rv = node_->MergePorts(local_port, from_node, connector_port_name);
1161   if (rv != ports::OK)
1162     DLOG(ERROR) << "MergePorts failed: " << rv;
1163 
1164   AcceptIncomingMessages();
1165 }
1166 
OnRequestIntroduction(const ports::NodeName & from_node,const ports::NodeName & name)1167 void NodeController::OnRequestIntroduction(const ports::NodeName& from_node,
1168                                            const ports::NodeName& name) {
1169   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1170 
1171   scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node);
1172   if (from_node == name || name == ports::kInvalidNodeName || !requestor) {
1173     DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from "
1174                 << from_node;
1175     DropPeer(from_node, nullptr);
1176     return;
1177   }
1178 
1179   scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name);
1180   if (!new_friend) {
1181     // We don't know who they're talking about!
1182     requestor->Introduce(name, ScopedPlatformHandle());
1183   } else {
1184     PlatformChannelPair new_channel;
1185     requestor->Introduce(name, new_channel.PassServerHandle());
1186     new_friend->Introduce(from_node, new_channel.PassClientHandle());
1187   }
1188 }
1189 
OnIntroduce(const ports::NodeName & from_node,const ports::NodeName & name,ScopedPlatformHandle channel_handle)1190 void NodeController::OnIntroduce(const ports::NodeName& from_node,
1191                                  const ports::NodeName& name,
1192                                  ScopedPlatformHandle channel_handle) {
1193   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1194 
1195   if (!channel_handle.is_valid()) {
1196     node_->LostConnectionToNode(name);
1197 
1198     DVLOG(1) << "Could not be introduced to peer " << name;
1199     base::AutoLock lock(peers_lock_);
1200     pending_peer_messages_.erase(name);
1201     return;
1202   }
1203 
1204   scoped_refptr<NodeChannel> channel =
1205       NodeChannel::Create(this, ConnectionParams(std::move(channel_handle)),
1206                           io_task_runner_, ProcessErrorCallback());
1207 
1208   DVLOG(1) << "Adding new peer " << name << " via parent introduction.";
1209   AddPeer(name, channel, true /* start_channel */);
1210 }
1211 
OnBroadcast(const ports::NodeName & from_node,Channel::MessagePtr message)1212 void NodeController::OnBroadcast(const ports::NodeName& from_node,
1213                                  Channel::MessagePtr message) {
1214   DCHECK(!message->has_handles());
1215 
1216   void* data;
1217   size_t num_data_bytes, num_header_bytes, num_payload_bytes, num_ports_bytes;
1218   if (!ParsePortsMessage(message.get(), &data, &num_data_bytes,
1219                          &num_header_bytes, &num_payload_bytes,
1220                          &num_ports_bytes)) {
1221     DropPeer(from_node, nullptr);
1222     return;
1223   }
1224 
1225   // Broadcast messages must not contain ports.
1226   if (num_ports_bytes > 0) {
1227     DropPeer(from_node, nullptr);
1228     return;
1229   }
1230 
1231   base::AutoLock lock(peers_lock_);
1232   for (auto& iter : peers_) {
1233     // Copy and send the message to each known peer.
1234     Channel::MessagePtr peer_message(
1235         new Channel::Message(message->payload_size(), 0));
1236     memcpy(peer_message->mutable_payload(), message->payload(),
1237            message->payload_size());
1238     iter.second->PortsMessage(std::move(peer_message));
1239   }
1240 }
1241 
1242 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
OnRelayPortsMessage(const ports::NodeName & from_node,base::ProcessHandle from_process,const ports::NodeName & destination,Channel::MessagePtr message)1243 void NodeController::OnRelayPortsMessage(const ports::NodeName& from_node,
1244                                          base::ProcessHandle from_process,
1245                                          const ports::NodeName& destination,
1246                                          Channel::MessagePtr message) {
1247   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1248 
1249   if (GetBrokerChannel()) {
1250     // Only the broker should be asked to relay a message.
1251     LOG(ERROR) << "Non-broker refusing to relay message.";
1252     DropPeer(from_node, nullptr);
1253     return;
1254   }
1255 
1256   // The parent should always know which process this came from.
1257   DCHECK(from_process != base::kNullProcessHandle);
1258 
1259 #if defined(OS_WIN)
1260   // Rewrite the handles to this (the parent) process. If the message is
1261   // destined for another child process, the handles will be rewritten to that
1262   // process before going out (see NodeChannel::WriteChannelMessage).
1263   //
1264   // TODO: We could avoid double-duplication.
1265   //
1266   // Note that we explicitly mark the handles as being owned by the sending
1267   // process before rewriting them, in order to accommodate RewriteHandles'
1268   // internal sanity checks.
1269   ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
1270   for (size_t i = 0; i < handles->size(); ++i)
1271     (*handles)[i].owning_process = from_process;
1272   if (!Channel::Message::RewriteHandles(from_process,
1273                                         base::GetCurrentProcessHandle(),
1274                                         handles.get())) {
1275     DLOG(ERROR) << "Failed to relay one or more handles.";
1276   }
1277   message->SetHandles(std::move(handles));
1278 #else
1279   MachPortRelay* relay = GetMachPortRelay();
1280   if (!relay) {
1281     LOG(ERROR) << "Receiving Mach ports without a port relay from "
1282                << from_node << ". Dropping message.";
1283     return;
1284   }
1285   if (!relay->ExtractPortRights(message.get(), from_process)) {
1286     // NodeChannel should ensure that MachPortRelay is ready for the remote
1287     // process. At this point, if the port extraction failed, either something
1288     // went wrong in the mach stuff, or the remote process died.
1289     LOG(ERROR) << "Error on receiving Mach ports " << from_node
1290                << ". Dropping message.";
1291     return;
1292   }
1293 #endif  // defined(OS_WIN)
1294 
1295   if (destination == name_) {
1296     // Great, we can deliver this message locally.
1297     OnPortsMessage(from_node, std::move(message));
1298     return;
1299   }
1300 
1301   scoped_refptr<NodeChannel> peer = GetPeerChannel(destination);
1302   if (peer)
1303     peer->PortsMessageFromRelay(from_node, std::move(message));
1304   else
1305     DLOG(ERROR) << "Dropping relay message for unknown node " << destination;
1306 }
1307 
OnPortsMessageFromRelay(const ports::NodeName & from_node,const ports::NodeName & source_node,Channel::MessagePtr message)1308 void NodeController::OnPortsMessageFromRelay(const ports::NodeName& from_node,
1309                                              const ports::NodeName& source_node,
1310                                              Channel::MessagePtr message) {
1311   if (GetPeerChannel(from_node) != GetBrokerChannel()) {
1312     LOG(ERROR) << "Refusing relayed message from non-broker node.";
1313     DropPeer(from_node, nullptr);
1314     return;
1315   }
1316 
1317   OnPortsMessage(source_node, std::move(message));
1318 }
1319 #endif
1320 
OnAcceptPeer(const ports::NodeName & from_node,const ports::NodeName & token,const ports::NodeName & peer_name,const ports::PortName & port_name)1321 void NodeController::OnAcceptPeer(const ports::NodeName& from_node,
1322                                   const ports::NodeName& token,
1323                                   const ports::NodeName& peer_name,
1324                                   const ports::PortName& port_name) {
1325   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1326 
1327   auto it = peer_connections_.find(from_node);
1328   if (it == peer_connections_.end()) {
1329     DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node;
1330     DropPeer(from_node, nullptr);
1331     return;
1332   }
1333 
1334   scoped_refptr<NodeChannel> channel = std::move(it->second.channel);
1335   ports::PortRef local_port = it->second.local_port;
1336   std::string peer_token = std::move(it->second.peer_token);
1337   peer_connections_.erase(it);
1338   DCHECK(channel);
1339 
1340   // If the peer connection is a self connection (which is used in tests),
1341   // drop the channel to it and skip straight to merging the ports.
1342   if (name_ == peer_name) {
1343     peers_by_token_.erase(peer_token);
1344   } else {
1345     peers_by_token_[peer_token] = peer_name;
1346     peer_connections_.insert(
1347         {peer_name, PeerConnection{nullptr, local_port, peer_token}});
1348     DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name;
1349 
1350     AddPeer(peer_name, channel, false /* start_channel */);
1351   }
1352 
1353   // We need to choose one side to initiate the port merge. It doesn't matter
1354   // who does it as long as they don't both try. Simple solution: pick the one
1355   // with the "smaller" port name.
1356   if (local_port.name() < port_name) {
1357     node()->MergePorts(local_port, peer_name, port_name);
1358   }
1359 }
1360 
OnChannelError(const ports::NodeName & from_node,NodeChannel * channel)1361 void NodeController::OnChannelError(const ports::NodeName& from_node,
1362                                     NodeChannel* channel) {
1363   if (io_task_runner_->RunsTasksOnCurrentThread()) {
1364     DropPeer(from_node, channel);
1365     // DropPeer may have caused local port closures, so be sure to process any
1366     // pending local messages.
1367     AcceptIncomingMessages();
1368   } else {
1369     io_task_runner_->PostTask(
1370         FROM_HERE,
1371         base::Bind(&NodeController::OnChannelError, base::Unretained(this),
1372                    from_node, channel));
1373   }
1374 }
1375 
1376 #if defined(OS_MACOSX) && !defined(OS_IOS)
GetMachPortRelay()1377 MachPortRelay* NodeController::GetMachPortRelay() {
1378   {
1379     base::AutoLock lock(parent_lock_);
1380     // Return null if we're not the root.
1381     if (bootstrap_parent_channel_ || parent_name_ != ports::kInvalidNodeName)
1382       return nullptr;
1383   }
1384 
1385   base::AutoLock lock(mach_port_relay_lock_);
1386   return mach_port_relay_.get();
1387 }
1388 #endif
1389 
CancelPendingPortMerges()1390 void NodeController::CancelPendingPortMerges() {
1391   std::vector<ports::PortRef> ports_to_close;
1392 
1393   {
1394     base::AutoLock lock(pending_port_merges_lock_);
1395     reject_pending_merges_ = true;
1396     for (const auto& port : pending_port_merges_)
1397       ports_to_close.push_back(port.second);
1398     pending_port_merges_.clear();
1399   }
1400 
1401   for (const auto& port : ports_to_close)
1402     node_->ClosePort(port);
1403 }
1404 
// Requests that this object delete itself at the end of DropAllPeers(), which
// checks |destroy_on_io_thread_shutdown_| after shutting down all channels.
void NodeController::DestroyOnIOThreadShutdown() {
  destroy_on_io_thread_shutdown_ = true;
}
1408 
AttemptShutdownIfRequested()1409 void NodeController::AttemptShutdownIfRequested() {
1410   if (!shutdown_callback_flag_)
1411     return;
1412 
1413   base::Closure callback;
1414   {
1415     base::AutoLock lock(shutdown_lock_);
1416     if (shutdown_callback_.is_null())
1417       return;
1418     if (!node_->CanShutdownCleanly(
1419           ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) {
1420       DVLOG(2) << "Unable to cleanly shut down node " << name_;
1421       return;
1422     }
1423 
1424     callback = shutdown_callback_;
1425     shutdown_callback_.Reset();
1426     shutdown_callback_flag_.Set(false);
1427   }
1428 
1429   DCHECK(!callback.is_null());
1430 
1431   callback.Run();
1432 }
1433 
// PeerConnection special member functions. All of them are defaulted; they
// are merely defined out-of-line here rather than in the class declaration.

// Default constructor.
NodeController::PeerConnection::PeerConnection() = default;

// Copy constructor.
NodeController::PeerConnection::PeerConnection(
    const PeerConnection& other) = default;

// Move constructor.
NodeController::PeerConnection::PeerConnection(
    PeerConnection&& other) = default;

// Builds a connection record from the channel to the peer, the local port
// associated with the connection, and the peer's token. |channel| may be null
// (OnAcceptPeer() stores such an entry once the channel has become a peer).
NodeController::PeerConnection::PeerConnection(
    scoped_refptr<NodeChannel> channel,
    const ports::PortRef& local_port,
    const std::string& peer_token)
    : channel(std::move(channel)),
      local_port(local_port),
      peer_token(peer_token) {}

// Destructor.
NodeController::PeerConnection::~PeerConnection() = default;

// Copy assignment.
NodeController::PeerConnection& NodeController::PeerConnection::
operator=(const PeerConnection& other) = default;

// Move assignment.
NodeController::PeerConnection& NodeController::PeerConnection::
operator=(PeerConnection&& other) = default;
1457 
1458 }  // namespace edk
1459 }  // namespace mojo
1460