1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/ports/node.h"
6 
7 #include <string.h>
8 
9 #include <utility>
10 
11 #include "base/atomicops.h"
12 #include "base/logging.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/synchronization/lock.h"
15 #include "mojo/edk/system/ports/node_delegate.h"
16 
17 namespace mojo {
18 namespace edk {
19 namespace ports {
20 
21 namespace {
22 
DebugError(const char * message,int error_code)23 int DebugError(const char* message, int error_code) {
24   CHECK(false) << "Oops: " << message;
25   return error_code;
26 }
27 
28 #define OOPS(x) DebugError(#x, x)
29 
CanAcceptMoreMessages(const Port * port)30 bool CanAcceptMoreMessages(const Port* port) {
31   // Have we already doled out the last message (i.e., do we expect to NOT
32   // receive further messages)?
33   uint64_t next_sequence_num = port->message_queue.next_sequence_num();
34   if (port->state == Port::kClosed)
35     return false;
36   if (port->peer_closed || port->remove_proxy_on_last_message) {
37     if (port->last_sequence_num_to_receive == next_sequence_num - 1)
38       return false;
39   }
40   return true;
41 }
42 
43 }  // namespace
44 
45 class Node::LockedPort {
46  public:
LockedPort(Port * port)47   explicit LockedPort(Port* port) : port_(port) {
48     port_->lock.AssertAcquired();
49   }
50 
get() const51   Port* get() const { return port_; }
operator ->() const52   Port* operator->() const { return port_; }
53 
54  private:
55   Port* const port_;
56 };
57 
Node(const NodeName & name,NodeDelegate * delegate)58 Node::Node(const NodeName& name, NodeDelegate* delegate)
59     : name_(name),
60       delegate_(delegate) {
61 }
62 
~Node()63 Node::~Node() {
64   if (!ports_.empty())
65     DLOG(WARNING) << "Unclean shutdown for node " << name_;
66 }
67 
CanShutdownCleanly(ShutdownPolicy policy)68 bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
69   base::AutoLock ports_lock(ports_lock_);
70 
71   if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
72 #if DCHECK_IS_ON()
73     for (auto entry : ports_) {
74       DVLOG(2) << "Port " << entry.first << " referencing node "
75                << entry.second->peer_node_name << " is blocking shutdown of "
76                << "node " << name_ << " (state=" << entry.second->state << ")";
77     }
78 #endif
79     return ports_.empty();
80   }
81 
82   DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);
83 
84   // NOTE: This is not efficient, though it probably doesn't need to be since
85   // relatively few ports should be open during shutdown and shutdown doesn't
86   // need to be blazingly fast.
87   bool can_shutdown = true;
88   for (auto entry : ports_) {
89     base::AutoLock lock(entry.second->lock);
90     if (entry.second->peer_node_name != name_ &&
91         entry.second->state != Port::kReceiving) {
92       can_shutdown = false;
93 #if DCHECK_IS_ON()
94       DVLOG(2) << "Port " << entry.first << " referencing node "
95                << entry.second->peer_node_name << " is blocking shutdown of "
96                << "node " << name_ << " (state=" << entry.second->state << ")";
97 #else
98       // Exit early when not debugging.
99       break;
100 #endif
101     }
102   }
103 
104   return can_shutdown;
105 }
106 
GetPort(const PortName & port_name,PortRef * port_ref)107 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
108   scoped_refptr<Port> port = GetPort(port_name);
109   if (!port)
110     return ERROR_PORT_UNKNOWN;
111 
112   *port_ref = PortRef(port_name, std::move(port));
113   return OK;
114 }
115 
CreateUninitializedPort(PortRef * port_ref)116 int Node::CreateUninitializedPort(PortRef* port_ref) {
117   PortName port_name;
118   delegate_->GenerateRandomPortName(&port_name);
119 
120   scoped_refptr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
121   int rv = AddPortWithName(port_name, port);
122   if (rv != OK)
123     return rv;
124 
125   *port_ref = PortRef(port_name, std::move(port));
126   return OK;
127 }
128 
InitializePort(const PortRef & port_ref,const NodeName & peer_node_name,const PortName & peer_port_name)129 int Node::InitializePort(const PortRef& port_ref,
130                          const NodeName& peer_node_name,
131                          const PortName& peer_port_name) {
132   Port* port = port_ref.port();
133 
134   {
135     base::AutoLock lock(port->lock);
136     if (port->state != Port::kUninitialized)
137       return ERROR_PORT_STATE_UNEXPECTED;
138 
139     port->state = Port::kReceiving;
140     port->peer_node_name = peer_node_name;
141     port->peer_port_name = peer_port_name;
142   }
143 
144   delegate_->PortStatusChanged(port_ref);
145 
146   return OK;
147 }
148 
CreatePortPair(PortRef * port0_ref,PortRef * port1_ref)149 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
150   int rv;
151 
152   rv = CreateUninitializedPort(port0_ref);
153   if (rv != OK)
154     return rv;
155 
156   rv = CreateUninitializedPort(port1_ref);
157   if (rv != OK)
158     return rv;
159 
160   rv = InitializePort(*port0_ref, name_, port1_ref->name());
161   if (rv != OK)
162     return rv;
163 
164   rv = InitializePort(*port1_ref, name_, port0_ref->name());
165   if (rv != OK)
166     return rv;
167 
168   return OK;
169 }
170 
SetUserData(const PortRef & port_ref,scoped_refptr<UserData> user_data)171 int Node::SetUserData(const PortRef& port_ref,
172                       scoped_refptr<UserData> user_data) {
173   Port* port = port_ref.port();
174 
175   base::AutoLock lock(port->lock);
176   if (port->state == Port::kClosed)
177     return ERROR_PORT_STATE_UNEXPECTED;
178 
179   port->user_data = std::move(user_data);
180 
181   return OK;
182 }
183 
GetUserData(const PortRef & port_ref,scoped_refptr<UserData> * user_data)184 int Node::GetUserData(const PortRef& port_ref,
185                       scoped_refptr<UserData>* user_data) {
186   Port* port = port_ref.port();
187 
188   base::AutoLock lock(port->lock);
189   if (port->state == Port::kClosed)
190     return ERROR_PORT_STATE_UNEXPECTED;
191 
192   *user_data = port->user_data;
193 
194   return OK;
195 }
196 
ClosePort(const PortRef & port_ref)197 int Node::ClosePort(const PortRef& port_ref) {
198   std::deque<PortName> referenced_port_names;
199 
200   ObserveClosureEventData data;
201 
202   NodeName peer_node_name;
203   PortName peer_port_name;
204   Port* port = port_ref.port();
205   {
206     // We may need to erase the port, which requires ports_lock_ to be held,
207     // but ports_lock_ must be acquired before any individual port locks.
208     base::AutoLock ports_lock(ports_lock_);
209 
210     base::AutoLock lock(port->lock);
211     if (port->state == Port::kUninitialized) {
212       // If the port was not yet initialized, there's nothing interesting to do.
213       ErasePort_Locked(port_ref.name());
214       return OK;
215     }
216 
217     if (port->state != Port::kReceiving)
218       return ERROR_PORT_STATE_UNEXPECTED;
219 
220     port->state = Port::kClosed;
221 
222     // We pass along the sequence number of the last message sent from this
223     // port to allow the peer to have the opportunity to consume all inbound
224     // messages before notifying the embedder that this port is closed.
225     data.last_sequence_num = port->next_sequence_num_to_send - 1;
226 
227     peer_node_name = port->peer_node_name;
228     peer_port_name = port->peer_port_name;
229 
230     // If the port being closed still has unread messages, then we need to take
231     // care to close those ports so as to avoid leaking memory.
232     port->message_queue.GetReferencedPorts(&referenced_port_names);
233 
234     ErasePort_Locked(port_ref.name());
235   }
236 
237   DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
238            << " to " << peer_port_name << "@" << peer_node_name;
239 
240   delegate_->ForwardMessage(
241       peer_node_name,
242       NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));
243 
244   for (const auto& name : referenced_port_names) {
245     PortRef ref;
246     if (GetPort(name, &ref) == OK)
247       ClosePort(ref);
248   }
249   return OK;
250 }
251 
GetStatus(const PortRef & port_ref,PortStatus * port_status)252 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
253   Port* port = port_ref.port();
254 
255   base::AutoLock lock(port->lock);
256 
257   if (port->state != Port::kReceiving)
258     return ERROR_PORT_STATE_UNEXPECTED;
259 
260   port_status->has_messages = port->message_queue.HasNextMessage();
261   port_status->receiving_messages = CanAcceptMoreMessages(port);
262   port_status->peer_closed = port->peer_closed;
263   return OK;
264 }
265 
GetMessage(const PortRef & port_ref,ScopedMessage * message,MessageFilter * filter)266 int Node::GetMessage(const PortRef& port_ref,
267                      ScopedMessage* message,
268                      MessageFilter* filter) {
269   *message = nullptr;
270 
271   DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
272 
273   Port* port = port_ref.port();
274   {
275     base::AutoLock lock(port->lock);
276 
277     // This could also be treated like the port being unknown since the
278     // embedder should no longer be referring to a port that has been sent.
279     if (port->state != Port::kReceiving)
280       return ERROR_PORT_STATE_UNEXPECTED;
281 
282     // Let the embedder get messages until there are no more before reporting
283     // that the peer closed its end.
284     if (!CanAcceptMoreMessages(port))
285       return ERROR_PORT_PEER_CLOSED;
286 
287     port->message_queue.GetNextMessage(message, filter);
288   }
289 
290   // Allow referenced ports to trigger PortStatusChanged calls.
291   if (*message) {
292     for (size_t i = 0; i < (*message)->num_ports(); ++i) {
293       const PortName& new_port_name = (*message)->ports()[i];
294       scoped_refptr<Port> new_port = GetPort(new_port_name);
295 
296       DCHECK(new_port) << "Port " << new_port_name << "@" << name_
297                        << " does not exist!";
298 
299       base::AutoLock lock(new_port->lock);
300 
301       DCHECK(new_port->state == Port::kReceiving);
302       new_port->message_queue.set_signalable(true);
303     }
304   }
305 
306   return OK;
307 }
308 
SendMessage(const PortRef & port_ref,ScopedMessage message)309 int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) {
310   int rv = SendMessageInternal(port_ref, &message);
311   if (rv != OK) {
312     // If send failed, close all carried ports. Note that we're careful not to
313     // close the sending port itself if it happened to be one of the encoded
314     // ports (an invalid but possible condition.)
315     for (size_t i = 0; i < message->num_ports(); ++i) {
316       if (message->ports()[i] == port_ref.name())
317         continue;
318 
319       PortRef port;
320       if (GetPort(message->ports()[i], &port) == OK)
321         ClosePort(port);
322     }
323   }
324   return rv;
325 }
326 
AcceptMessage(ScopedMessage message)327 int Node::AcceptMessage(ScopedMessage message) {
328   const EventHeader* header = GetEventHeader(*message);
329   switch (header->type) {
330     case EventType::kUser:
331       return OnUserMessage(std::move(message));
332     case EventType::kPortAccepted:
333       return OnPortAccepted(header->port_name);
334     case EventType::kObserveProxy:
335       return OnObserveProxy(
336           header->port_name,
337           *GetEventData<ObserveProxyEventData>(*message));
338     case EventType::kObserveProxyAck:
339       return OnObserveProxyAck(
340           header->port_name,
341           GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
342     case EventType::kObserveClosure:
343       return OnObserveClosure(
344           header->port_name,
345           GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
346     case EventType::kMergePort:
347       return OnMergePort(header->port_name,
348                          *GetEventData<MergePortEventData>(*message));
349   }
350   return OOPS(ERROR_NOT_IMPLEMENTED);
351 }
352 
MergePorts(const PortRef & port_ref,const NodeName & destination_node_name,const PortName & destination_port_name)353 int Node::MergePorts(const PortRef& port_ref,
354                      const NodeName& destination_node_name,
355                      const PortName& destination_port_name) {
356   Port* port = port_ref.port();
357   MergePortEventData data;
358   {
359     base::AutoLock lock(port->lock);
360 
361     DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
362              << " to " << destination_port_name << "@" << destination_node_name;
363 
364     // Send the port-to-merge over to the destination node so it can be merged
365     // into the port cycle atomically there.
366     data.new_port_name = port_ref.name();
367     WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name,
368                  &data.new_port_descriptor);
369   }
370   delegate_->ForwardMessage(
371       destination_node_name,
372       NewInternalMessage(destination_port_name,
373                          EventType::kMergePort, data));
374   return OK;
375 }
376 
MergeLocalPorts(const PortRef & port0_ref,const PortRef & port1_ref)377 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
378   Port* port0 = port0_ref.port();
379   Port* port1 = port1_ref.port();
380   int rv;
381   {
382     // |ports_lock_| must be held when acquiring overlapping port locks.
383     base::AutoLock ports_lock(ports_lock_);
384     base::AutoLock port0_lock(port0->lock);
385     base::AutoLock port1_lock(port1->lock);
386 
387     DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
388              << " and " << port1_ref.name() << "@" << name_;
389 
390     if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
391       rv = ERROR_PORT_STATE_UNEXPECTED;
392     else
393       rv = MergePorts_Locked(port0_ref, port1_ref);
394   }
395 
396   if (rv != OK) {
397     ClosePort(port0_ref);
398     ClosePort(port1_ref);
399   }
400 
401   return rv;
402 }
403 
LostConnectionToNode(const NodeName & node_name)404 int Node::LostConnectionToNode(const NodeName& node_name) {
405   // We can no longer send events to the given node. We also can't expect any
406   // PortAccepted events.
407 
408   DVLOG(1) << "Observing lost connection from node " << name_
409            << " to node " << node_name;
410 
411   DestroyAllPortsWithPeer(node_name, kInvalidPortName);
412   return OK;
413 }
414 
OnUserMessage(ScopedMessage message)415 int Node::OnUserMessage(ScopedMessage message) {
416   PortName port_name = GetEventHeader(*message)->port_name;
417   const auto* event = GetEventData<UserEventData>(*message);
418 
419 #if DCHECK_IS_ON()
420   std::ostringstream ports_buf;
421   for (size_t i = 0; i < message->num_ports(); ++i) {
422     if (i > 0)
423       ports_buf << ",";
424     ports_buf << message->ports()[i];
425   }
426 
427   DVLOG(4) << "AcceptMessage " << event->sequence_num
428              << " [ports=" << ports_buf.str() << "] at "
429              << port_name << "@" << name_;
430 #endif
431 
432   scoped_refptr<Port> port = GetPort(port_name);
433 
434   // Even if this port does not exist, cannot receive anymore messages or is
435   // buffering or proxying messages, we still need these ports to be bound to
436   // this node. When the message is forwarded, these ports will get transferred
437   // following the usual method. If the message cannot be accepted, then the
438   // newly bound ports will simply be closed.
439 
440   for (size_t i = 0; i < message->num_ports(); ++i) {
441     int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
442     if (rv != OK)
443       return rv;
444   }
445 
446   bool has_next_message = false;
447   bool message_accepted = false;
448 
449   if (port) {
450     // We may want to forward messages once the port lock is held, so we must
451     // acquire |ports_lock_| first.
452     base::AutoLock ports_lock(ports_lock_);
453     base::AutoLock lock(port->lock);
454 
455     // Reject spurious messages if we've already received the last expected
456     // message.
457     if (CanAcceptMoreMessages(port.get())) {
458       message_accepted = true;
459       port->message_queue.AcceptMessage(std::move(message), &has_next_message);
460 
461       if (port->state == Port::kBuffering) {
462         has_next_message = false;
463       } else if (port->state == Port::kProxying) {
464         has_next_message = false;
465 
466         // Forward messages. We forward messages in sequential order here so
467         // that we maintain the message queue's notion of next sequence number.
468         // That's useful for the proxy removal process as we can tell when this
469         // port has seen all of the messages it is expected to see.
470         int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name);
471         if (rv != OK)
472           return rv;
473 
474         MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name);
475       }
476     }
477   }
478 
479   if (!message_accepted) {
480     DVLOG(2) << "Message not accepted!\n";
481     // Close all newly accepted ports as they are effectively orphaned.
482     for (size_t i = 0; i < message->num_ports(); ++i) {
483       PortRef port_ref;
484       if (GetPort(message->ports()[i], &port_ref) == OK) {
485         ClosePort(port_ref);
486       } else {
487         DLOG(WARNING) << "Cannot close non-existent port!\n";
488       }
489     }
490   } else if (has_next_message) {
491     PortRef port_ref(port_name, port);
492     delegate_->PortStatusChanged(port_ref);
493   }
494 
495   return OK;
496 }
497 
OnPortAccepted(const PortName & port_name)498 int Node::OnPortAccepted(const PortName& port_name) {
499   scoped_refptr<Port> port = GetPort(port_name);
500   if (!port)
501     return ERROR_PORT_UNKNOWN;
502 
503   DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
504            << " pointing to "
505            << port->peer_port_name << "@" << port->peer_node_name;
506 
507   return BeginProxying(PortRef(port_name, std::move(port)));
508 }
509 
OnObserveProxy(const PortName & port_name,const ObserveProxyEventData & event)510 int Node::OnObserveProxy(const PortName& port_name,
511                          const ObserveProxyEventData& event) {
512   if (port_name == kInvalidPortName) {
513     // An ObserveProxy with an invalid target port name is a broadcast used to
514     // inform ports when their peer (which was itself a proxy) has become
515     // defunct due to unexpected node disconnection.
516     //
517     // Receiving ports affected by this treat it as equivalent to peer closure.
518     // Proxies affected by this can be removed and will in turn broadcast their
519     // own death with a similar message.
520     CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
521     CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
522     DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
523     return OK;
524   }
525 
526   // The port may have already been closed locally, in which case the
527   // ObserveClosure message will contain the last_sequence_num field.
528   // We can then silently ignore this message.
529   scoped_refptr<Port> port = GetPort(port_name);
530   if (!port) {
531     DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";
532     return OK;
533   }
534 
535   DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
536            << event.proxy_port_name << "@"
537            << event.proxy_node_name << " pointing to "
538            << event.proxy_to_port_name << "@"
539            << event.proxy_to_node_name;
540 
541   {
542     base::AutoLock lock(port->lock);
543 
544     if (port->peer_node_name == event.proxy_node_name &&
545         port->peer_port_name == event.proxy_port_name) {
546       if (port->state == Port::kReceiving) {
547         port->peer_node_name = event.proxy_to_node_name;
548         port->peer_port_name = event.proxy_to_port_name;
549 
550         ObserveProxyAckEventData ack;
551         ack.last_sequence_num = port->next_sequence_num_to_send - 1;
552 
553         delegate_->ForwardMessage(
554             event.proxy_node_name,
555             NewInternalMessage(event.proxy_port_name,
556                                EventType::kObserveProxyAck,
557                                ack));
558       } else {
559         // As a proxy ourselves, we don't know how to honor the ObserveProxy
560         // event or to populate the last_sequence_num field of ObserveProxyAck.
561         // Afterall, another port could be sending messages to our peer now
562         // that we've sent out our own ObserveProxy event.  Instead, we will
563         // send an ObserveProxyAck indicating that the ObserveProxy event
564         // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
565         // However, this has to be done after we are removed as a proxy.
566         // Otherwise, we might just find ourselves back here again, which
567         // would be akin to a busy loop.
568 
569         DVLOG(2) << "Delaying ObserveProxyAck to "
570                  << event.proxy_port_name << "@" << event.proxy_node_name;
571 
572         ObserveProxyAckEventData ack;
573         ack.last_sequence_num = kInvalidSequenceNum;
574 
575         port->send_on_proxy_removal.reset(
576             new std::pair<NodeName, ScopedMessage>(
577                 event.proxy_node_name,
578                 NewInternalMessage(event.proxy_port_name,
579                                    EventType::kObserveProxyAck,
580                                    ack)));
581       }
582     } else {
583       // Forward this event along to our peer. Eventually, it should find the
584       // port referring to the proxy.
585       delegate_->ForwardMessage(
586           port->peer_node_name,
587           NewInternalMessage(port->peer_port_name,
588                              EventType::kObserveProxy,
589                              event));
590     }
591   }
592   return OK;
593 }
594 
OnObserveProxyAck(const PortName & port_name,uint64_t last_sequence_num)595 int Node::OnObserveProxyAck(const PortName& port_name,
596                             uint64_t last_sequence_num) {
597   DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
598            << " (last_sequence_num=" << last_sequence_num << ")";
599 
600   scoped_refptr<Port> port = GetPort(port_name);
601   if (!port)
602     return ERROR_PORT_UNKNOWN;  // The port may have observed closure first, so
603                                 // this is not an "Oops".
604 
605   {
606     base::AutoLock lock(port->lock);
607 
608     if (port->state != Port::kProxying)
609       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
610 
611     if (last_sequence_num == kInvalidSequenceNum) {
612       // Send again.
613       InitiateProxyRemoval(LockedPort(port.get()), port_name);
614       return OK;
615     }
616 
617     // We can now remove this port once we have received and forwarded the last
618     // message addressed to this port.
619     port->remove_proxy_on_last_message = true;
620     port->last_sequence_num_to_receive = last_sequence_num;
621   }
622   TryRemoveProxy(PortRef(port_name, std::move(port)));
623   return OK;
624 }
625 
OnObserveClosure(const PortName & port_name,uint64_t last_sequence_num)626 int Node::OnObserveClosure(const PortName& port_name,
627                            uint64_t last_sequence_num) {
628   // OK if the port doesn't exist, as it may have been closed already.
629   scoped_refptr<Port> port = GetPort(port_name);
630   if (!port)
631     return OK;
632 
633   // This message tells the port that it should no longer expect more messages
634   // beyond last_sequence_num. This message is forwarded along until we reach
635   // the receiving end, and this message serves as an equivalent to
636   // ObserveProxyAck.
637 
638   bool notify_delegate = false;
639   ObserveClosureEventData forwarded_data;
640   NodeName peer_node_name;
641   PortName peer_port_name;
642   bool try_remove_proxy = false;
643   {
644     base::AutoLock lock(port->lock);
645 
646     port->peer_closed = true;
647     port->last_sequence_num_to_receive = last_sequence_num;
648 
649     DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
650              << " (state=" << port->state << ") pointing to "
651              << port->peer_port_name << "@" << port->peer_node_name
652              << " (last_sequence_num=" << last_sequence_num << ")";
653 
654     // We always forward ObserveClosure, even beyond the receiving port which
655     // cares about it. This ensures that any dead-end proxies beyond that port
656     // are notified to remove themselves.
657 
658     if (port->state == Port::kReceiving) {
659       notify_delegate = true;
660 
661       // When forwarding along the other half of the port cycle, this will only
662       // reach dead-end proxies. Tell them we've sent our last message so they
663       // can go away.
664       //
665       // TODO: Repurposing ObserveClosure for this has the desired result but
666       // may be semantically confusing since the forwarding port is not actually
667       // closed. Consider replacing this with a new event type.
668       forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
669     } else {
670       // We haven't yet reached the receiving peer of the closed port, so
671       // forward the message along as-is.
672       forwarded_data.last_sequence_num = last_sequence_num;
673 
674       // See about removing the port if it is a proxy as our peer won't be able
675       // to participate in proxy removal.
676       port->remove_proxy_on_last_message = true;
677       if (port->state == Port::kProxying)
678         try_remove_proxy = true;
679     }
680 
681     DVLOG(2) << "Forwarding ObserveClosure from "
682              << port_name << "@" << name_ << " to peer "
683              << port->peer_port_name << "@" << port->peer_node_name
684              << " (last_sequence_num=" << forwarded_data.last_sequence_num
685              << ")";
686 
687     peer_node_name = port->peer_node_name;
688     peer_port_name = port->peer_port_name;
689   }
690   if (try_remove_proxy)
691     TryRemoveProxy(PortRef(port_name, port));
692 
693   delegate_->ForwardMessage(
694       peer_node_name,
695       NewInternalMessage(peer_port_name, EventType::kObserveClosure,
696                          forwarded_data));
697 
698   if (notify_delegate) {
699     PortRef port_ref(port_name, std::move(port));
700     delegate_->PortStatusChanged(port_ref);
701   }
702   return OK;
703 }
704 
OnMergePort(const PortName & port_name,const MergePortEventData & event)705 int Node::OnMergePort(const PortName& port_name,
706                       const MergePortEventData& event) {
707   scoped_refptr<Port> port = GetPort(port_name);
708 
709   DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
710            << (port ? port->state : -1) << ") merging with proxy "
711            << event.new_port_name
712            << "@" << name_ << " pointing to "
713            << event.new_port_descriptor.peer_port_name << "@"
714            << event.new_port_descriptor.peer_node_name << " referred by "
715            << event.new_port_descriptor.referring_port_name << "@"
716            << event.new_port_descriptor.referring_node_name;
717 
718   bool close_target_port = false;
719   bool close_new_port = false;
720 
721   // Accept the new port. This is now the receiving end of the other port cycle
722   // to be merged with ours.
723   int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
724   if (rv != OK) {
725     close_target_port = true;
726   } else if (port) {
727     // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
728     // needs to hold |ports_lock_|. We also acquire multiple port locks within.
729     base::AutoLock ports_lock(ports_lock_);
730     base::AutoLock lock(port->lock);
731 
732     if (port->state != Port::kReceiving) {
733       close_new_port = true;
734     } else {
735       scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
736       base::AutoLock new_port_lock(new_port->lock);
737       DCHECK(new_port->state == Port::kReceiving);
738 
739       // Both ports are locked. Now all we have to do is swap their peer
740       // information and set them up as proxies.
741 
742       PortRef port0_ref(port_name, port);
743       PortRef port1_ref(event.new_port_name, new_port);
744       int rv = MergePorts_Locked(port0_ref, port1_ref);
745       if (rv == OK)
746         return rv;
747 
748       close_new_port = true;
749       close_target_port = true;
750     }
751   } else {
752     close_new_port = true;
753   }
754 
755   if (close_target_port) {
756     PortRef target_port;
757     rv = GetPort(port_name, &target_port);
758     DCHECK(rv == OK);
759 
760     ClosePort(target_port);
761   }
762 
763   if (close_new_port) {
764     PortRef new_port;
765     rv = GetPort(event.new_port_name, &new_port);
766     DCHECK(rv == OK);
767 
768     ClosePort(new_port);
769   }
770 
771   return ERROR_PORT_STATE_UNEXPECTED;
772 }
773 
AddPortWithName(const PortName & port_name,scoped_refptr<Port> port)774 int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) {
775   base::AutoLock lock(ports_lock_);
776 
777   if (!ports_.insert(std::make_pair(port_name, std::move(port))).second)
778     return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
779 
780   DVLOG(2) << "Created port " << port_name << "@" << name_;
781   return OK;
782 }
783 
ErasePort(const PortName & port_name)784 void Node::ErasePort(const PortName& port_name) {
785   base::AutoLock lock(ports_lock_);
786   ErasePort_Locked(port_name);
787 }
788 
ErasePort_Locked(const PortName & port_name)789 void Node::ErasePort_Locked(const PortName& port_name) {
790   ports_lock_.AssertAcquired();
791   ports_.erase(port_name);
792   DVLOG(2) << "Deleted port " << port_name << "@" << name_;
793 }
794 
GetPort(const PortName & port_name)795 scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
796   base::AutoLock lock(ports_lock_);
797   return GetPort_Locked(port_name);
798 }
799 
GetPort_Locked(const PortName & port_name)800 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
801   ports_lock_.AssertAcquired();
802   auto iter = ports_.find(port_name);
803   if (iter == ports_.end())
804     return nullptr;
805 
806 #if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
807   // Workaround for https://crbug.com/665869.
808   base::subtle::MemoryBarrier();
809 #endif
810 
811   return iter->second;
812 }
813 
SendMessageInternal(const PortRef & port_ref,ScopedMessage * message)814 int Node::SendMessageInternal(const PortRef& port_ref, ScopedMessage* message) {
815   ScopedMessage& m = *message;
816   for (size_t i = 0; i < m->num_ports(); ++i) {
817     if (m->ports()[i] == port_ref.name())
818       return ERROR_PORT_CANNOT_SEND_SELF;
819   }
820 
821   Port* port = port_ref.port();
822   NodeName peer_node_name;
823   {
824     // We must acquire |ports_lock_| before grabbing any port locks, because
825     // WillSendMessage_Locked may need to lock multiple ports out of order.
826     base::AutoLock ports_lock(ports_lock_);
827     base::AutoLock lock(port->lock);
828 
829     if (port->state != Port::kReceiving)
830       return ERROR_PORT_STATE_UNEXPECTED;
831 
832     if (port->peer_closed)
833       return ERROR_PORT_PEER_CLOSED;
834 
835     int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get());
836     if (rv != OK)
837       return rv;
838 
839     // Beyond this point there's no sense in returning anything but OK. Even if
840     // message forwarding or acceptance fails, there's nothing the embedder can
841     // do to recover. Assume that failure beyond this point must be treated as a
842     // transport failure.
843 
844     peer_node_name = port->peer_node_name;
845   }
846 
847   if (peer_node_name != name_) {
848     delegate_->ForwardMessage(peer_node_name, std::move(m));
849     return OK;
850   }
851 
852   int rv = AcceptMessage(std::move(m));
853   if (rv != OK) {
854     // See comment above for why we don't return an error in this case.
855     DVLOG(2) << "AcceptMessage failed: " << rv;
856   }
857 
858   return OK;
859 }
860 
MergePorts_Locked(const PortRef & port0_ref,const PortRef & port1_ref)861 int Node::MergePorts_Locked(const PortRef& port0_ref,
862                             const PortRef& port1_ref) {
863   Port* port0 = port0_ref.port();
864   Port* port1 = port1_ref.port();
865 
866   ports_lock_.AssertAcquired();
867   port0->lock.AssertAcquired();
868   port1->lock.AssertAcquired();
869 
870   CHECK(port0->state == Port::kReceiving);
871   CHECK(port1->state == Port::kReceiving);
872 
873   // Ports cannot be merged with their own receiving peer!
874   if (port0->peer_node_name == name_ &&
875       port0->peer_port_name == port1_ref.name())
876     return ERROR_PORT_STATE_UNEXPECTED;
877 
878   if (port1->peer_node_name == name_ &&
879       port1->peer_port_name == port0_ref.name())
880     return ERROR_PORT_STATE_UNEXPECTED;
881 
882   // Only merge if both ports have never sent a message.
883   if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
884       port1->next_sequence_num_to_send == kInitialSequenceNum) {
885     // Swap the ports' peer information and switch them both into buffering
886     // (eventually proxying) mode.
887 
888     std::swap(port0->peer_node_name, port1->peer_node_name);
889     std::swap(port0->peer_port_name, port1->peer_port_name);
890 
891     port0->state = Port::kBuffering;
892     if (port0->peer_closed)
893       port0->remove_proxy_on_last_message = true;
894 
895     port1->state = Port::kBuffering;
896     if (port1->peer_closed)
897       port1->remove_proxy_on_last_message = true;
898 
899     int rv1 = BeginProxying_Locked(LockedPort(port0), port0_ref.name());
900     int rv2 = BeginProxying_Locked(LockedPort(port1), port1_ref.name());
901 
902     if (rv1 == OK && rv2 == OK) {
903       // If either merged port had a closed peer, its new peer needs to be
904       // informed of this.
905       if (port1->peer_closed) {
906         ObserveClosureEventData data;
907         data.last_sequence_num = port0->last_sequence_num_to_receive;
908         delegate_->ForwardMessage(
909             port0->peer_node_name,
910             NewInternalMessage(port0->peer_port_name,
911                                EventType::kObserveClosure, data));
912       }
913 
914       if (port0->peer_closed) {
915         ObserveClosureEventData data;
916         data.last_sequence_num = port1->last_sequence_num_to_receive;
917         delegate_->ForwardMessage(
918             port1->peer_node_name,
919             NewInternalMessage(port1->peer_port_name,
920                                EventType::kObserveClosure, data));
921       }
922 
923       return OK;
924     }
925 
926     // If either proxy failed to initialize (e.g. had undeliverable messages
927     // or ended up in a bad state somehow), we keep the system in a consistent
928     // state by undoing the peer swap.
929     std::swap(port0->peer_node_name, port1->peer_node_name);
930     std::swap(port0->peer_port_name, port1->peer_port_name);
931     port0->remove_proxy_on_last_message = false;
932     port1->remove_proxy_on_last_message = false;
933     port0->state = Port::kReceiving;
934     port1->state = Port::kReceiving;
935   }
936 
937   return ERROR_PORT_STATE_UNEXPECTED;
938 }
939 
WillSendPort(const LockedPort & port,const NodeName & to_node_name,PortName * port_name,PortDescriptor * port_descriptor)940 void Node::WillSendPort(const LockedPort& port,
941                         const NodeName& to_node_name,
942                         PortName* port_name,
943                         PortDescriptor* port_descriptor) {
944   port->lock.AssertAcquired();
945 
946   PortName local_port_name = *port_name;
947 
948   PortName new_port_name;
949   delegate_->GenerateRandomPortName(&new_port_name);
950 
951   // Make sure we don't send messages to the new peer until after we know it
952   // exists. In the meantime, just buffer messages locally.
953   DCHECK(port->state == Port::kReceiving);
954   port->state = Port::kBuffering;
955 
956   // If we already know our peer is closed, we already know this proxy can
957   // be removed once it receives and forwards its last expected message.
958   if (port->peer_closed)
959     port->remove_proxy_on_last_message = true;
960 
961   *port_name = new_port_name;
962 
963   port_descriptor->peer_node_name = port->peer_node_name;
964   port_descriptor->peer_port_name = port->peer_port_name;
965   port_descriptor->referring_node_name = name_;
966   port_descriptor->referring_port_name = local_port_name;
967   port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
968   port_descriptor->next_sequence_num_to_receive =
969       port->message_queue.next_sequence_num();
970   port_descriptor->last_sequence_num_to_receive =
971       port->last_sequence_num_to_receive;
972   port_descriptor->peer_closed = port->peer_closed;
973   memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
974 
975   // Configure the local port to point to the new port.
976   port->peer_node_name = to_node_name;
977   port->peer_port_name = new_port_name;
978 }
979 
AcceptPort(const PortName & port_name,const PortDescriptor & port_descriptor)980 int Node::AcceptPort(const PortName& port_name,
981                      const PortDescriptor& port_descriptor) {
982   scoped_refptr<Port> port = make_scoped_refptr(
983       new Port(port_descriptor.next_sequence_num_to_send,
984                port_descriptor.next_sequence_num_to_receive));
985   port->state = Port::kReceiving;
986   port->peer_node_name = port_descriptor.peer_node_name;
987   port->peer_port_name = port_descriptor.peer_port_name;
988   port->last_sequence_num_to_receive =
989       port_descriptor.last_sequence_num_to_receive;
990   port->peer_closed = port_descriptor.peer_closed;
991 
992   DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
993            << port->peer_closed << "; last_sequence_num_to_receive="
994            << port->last_sequence_num_to_receive << "]";
995 
996   // A newly accepted port is not signalable until the message referencing the
997   // new port finds its way to the consumer (see GetMessage).
998   port->message_queue.set_signalable(false);
999 
1000   int rv = AddPortWithName(port_name, std::move(port));
1001   if (rv != OK)
1002     return rv;
1003 
1004   // Allow referring port to forward messages.
1005   delegate_->ForwardMessage(
1006       port_descriptor.referring_node_name,
1007       NewInternalMessage(port_descriptor.referring_port_name,
1008                          EventType::kPortAccepted));
1009   return OK;
1010 }
1011 
WillSendMessage_Locked(const LockedPort & port,const PortName & port_name,Message * message)1012 int Node::WillSendMessage_Locked(const LockedPort& port,
1013                                  const PortName& port_name,
1014                                  Message* message) {
1015   ports_lock_.AssertAcquired();
1016   port->lock.AssertAcquired();
1017 
1018   DCHECK(message);
1019 
1020   // Messages may already have a sequence number if they're being forwarded
1021   // by a proxy. Otherwise, use the next outgoing sequence number.
1022   uint64_t* sequence_num =
1023       &GetMutableEventData<UserEventData>(message)->sequence_num;
1024   if (*sequence_num == 0)
1025     *sequence_num = port->next_sequence_num_to_send++;
1026 
1027 #if DCHECK_IS_ON()
1028   std::ostringstream ports_buf;
1029   for (size_t i = 0; i < message->num_ports(); ++i) {
1030     if (i > 0)
1031       ports_buf << ",";
1032     ports_buf << message->ports()[i];
1033   }
1034 #endif
1035 
1036   if (message->num_ports() > 0) {
1037     // Note: Another thread could be trying to send the same ports, so we need
1038     // to ensure that they are ours to send before we mutate their state.
1039 
1040     std::vector<scoped_refptr<Port>> ports;
1041     ports.resize(message->num_ports());
1042 
1043     {
1044       for (size_t i = 0; i < message->num_ports(); ++i) {
1045         ports[i] = GetPort_Locked(message->ports()[i]);
1046         DCHECK(ports[i]);
1047 
1048         ports[i]->lock.Acquire();
1049         int error = OK;
1050         if (ports[i]->state != Port::kReceiving)
1051           error = ERROR_PORT_STATE_UNEXPECTED;
1052         else if (message->ports()[i] == port->peer_port_name)
1053           error = ERROR_PORT_CANNOT_SEND_PEER;
1054 
1055         if (error != OK) {
1056           // Oops, we cannot send this port.
1057           for (size_t j = 0; j <= i; ++j)
1058             ports[i]->lock.Release();
1059           // Backpedal on the sequence number.
1060           port->next_sequence_num_to_send--;
1061           return error;
1062         }
1063       }
1064     }
1065 
1066     PortDescriptor* port_descriptors =
1067         GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));
1068 
1069     for (size_t i = 0; i < message->num_ports(); ++i) {
1070       WillSendPort(LockedPort(ports[i].get()),
1071                    port->peer_node_name,
1072                    message->mutable_ports() + i,
1073                    port_descriptors + i);
1074     }
1075 
1076     for (size_t i = 0; i < message->num_ports(); ++i)
1077       ports[i]->lock.Release();
1078   }
1079 
1080 #if DCHECK_IS_ON()
1081   DVLOG(4) << "Sending message "
1082            << GetEventData<UserEventData>(*message)->sequence_num
1083            << " [ports=" << ports_buf.str() << "]"
1084            << " from " << port_name << "@" << name_
1085            << " to " << port->peer_port_name << "@" << port->peer_node_name;
1086 #endif
1087 
1088   GetMutableEventHeader(message)->port_name = port->peer_port_name;
1089   return OK;
1090 }
1091 
BeginProxying_Locked(const LockedPort & port,const PortName & port_name)1092 int Node::BeginProxying_Locked(const LockedPort& port,
1093                                const PortName& port_name) {
1094   ports_lock_.AssertAcquired();
1095   port->lock.AssertAcquired();
1096 
1097   if (port->state != Port::kBuffering)
1098     return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1099 
1100   port->state = Port::kProxying;
1101 
1102   int rv = ForwardMessages_Locked(LockedPort(port), port_name);
1103   if (rv != OK)
1104     return rv;
1105 
1106   // We may have observed closure while buffering. In that case, we can advance
1107   // to removing the proxy without sending out an ObserveProxy message. We
1108   // already know the last expected message, etc.
1109 
1110   if (port->remove_proxy_on_last_message) {
1111     MaybeRemoveProxy_Locked(LockedPort(port), port_name);
1112 
1113     // Make sure we propagate closure to our current peer.
1114     ObserveClosureEventData data;
1115     data.last_sequence_num = port->last_sequence_num_to_receive;
1116     delegate_->ForwardMessage(
1117         port->peer_node_name,
1118         NewInternalMessage(port->peer_port_name,
1119                            EventType::kObserveClosure, data));
1120   } else {
1121     InitiateProxyRemoval(LockedPort(port), port_name);
1122   }
1123 
1124   return OK;
1125 }
1126 
BeginProxying(PortRef port_ref)1127 int Node::BeginProxying(PortRef port_ref) {
1128   Port* port = port_ref.port();
1129   {
1130     base::AutoLock ports_lock(ports_lock_);
1131     base::AutoLock lock(port->lock);
1132 
1133     if (port->state != Port::kBuffering)
1134       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1135 
1136     port->state = Port::kProxying;
1137 
1138     int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name());
1139     if (rv != OK)
1140       return rv;
1141   }
1142 
1143   bool should_remove;
1144   NodeName peer_node_name;
1145   ScopedMessage closure_message;
1146   {
1147     base::AutoLock lock(port->lock);
1148     if (port->state != Port::kProxying)
1149       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1150 
1151     should_remove = port->remove_proxy_on_last_message;
1152     if (should_remove) {
1153       // Make sure we propagate closure to our current peer.
1154       ObserveClosureEventData data;
1155       data.last_sequence_num = port->last_sequence_num_to_receive;
1156       peer_node_name = port->peer_node_name;
1157       closure_message = NewInternalMessage(port->peer_port_name,
1158                                            EventType::kObserveClosure, data);
1159     } else {
1160       InitiateProxyRemoval(LockedPort(port), port_ref.name());
1161     }
1162   }
1163 
1164   if (should_remove) {
1165     TryRemoveProxy(port_ref);
1166     delegate_->ForwardMessage(peer_node_name, std::move(closure_message));
1167   }
1168 
1169   return OK;
1170 }
1171 
ForwardMessages_Locked(const LockedPort & port,const PortName & port_name)1172 int Node::ForwardMessages_Locked(const LockedPort& port,
1173                                  const PortName &port_name) {
1174   ports_lock_.AssertAcquired();
1175   port->lock.AssertAcquired();
1176 
1177   for (;;) {
1178     ScopedMessage message;
1179     port->message_queue.GetNextMessage(&message, nullptr);
1180     if (!message)
1181       break;
1182 
1183     int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get());
1184     if (rv != OK)
1185       return rv;
1186 
1187     delegate_->ForwardMessage(port->peer_node_name, std::move(message));
1188   }
1189   return OK;
1190 }
1191 
InitiateProxyRemoval(const LockedPort & port,const PortName & port_name)1192 void Node::InitiateProxyRemoval(const LockedPort& port,
1193                                 const PortName& port_name) {
1194   port->lock.AssertAcquired();
1195 
1196   // To remove this node, we start by notifying the connected graph that we are
1197   // a proxy. This allows whatever port is referencing this node to skip it.
1198   // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
1199   // the peer was closed in the meantime).
1200 
1201   ObserveProxyEventData data;
1202   data.proxy_node_name = name_;
1203   data.proxy_port_name = port_name;
1204   data.proxy_to_node_name = port->peer_node_name;
1205   data.proxy_to_port_name = port->peer_port_name;
1206 
1207   delegate_->ForwardMessage(
1208       port->peer_node_name,
1209       NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
1210 }
1211 
MaybeRemoveProxy_Locked(const LockedPort & port,const PortName & port_name)1212 void Node::MaybeRemoveProxy_Locked(const LockedPort& port,
1213                                    const PortName& port_name) {
1214   // |ports_lock_| must be held so we can potentilaly ErasePort_Locked().
1215   ports_lock_.AssertAcquired();
1216   port->lock.AssertAcquired();
1217 
1218   DCHECK(port->state == Port::kProxying);
1219 
1220   // Make sure we have seen ObserveProxyAck before removing the port.
1221   if (!port->remove_proxy_on_last_message)
1222     return;
1223 
1224   if (!CanAcceptMoreMessages(port.get())) {
1225     // This proxy port is done. We can now remove it!
1226     ErasePort_Locked(port_name);
1227 
1228     if (port->send_on_proxy_removal) {
1229       NodeName to_node = port->send_on_proxy_removal->first;
1230       ScopedMessage& message = port->send_on_proxy_removal->second;
1231 
1232       delegate_->ForwardMessage(to_node, std::move(message));
1233       port->send_on_proxy_removal.reset();
1234     }
1235   } else {
1236     DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
1237              << " now; waiting for more messages";
1238   }
1239 }
1240 
TryRemoveProxy(PortRef port_ref)1241 void Node::TryRemoveProxy(PortRef port_ref) {
1242   Port* port = port_ref.port();
1243   bool should_erase = false;
1244   ScopedMessage msg;
1245   NodeName to_node;
1246   {
1247     base::AutoLock lock(port->lock);
1248 
1249     // Port already removed. Nothing to do.
1250     if (port->state == Port::kClosed)
1251       return;
1252 
1253     DCHECK(port->state == Port::kProxying);
1254 
1255     // Make sure we have seen ObserveProxyAck before removing the port.
1256     if (!port->remove_proxy_on_last_message)
1257       return;
1258 
1259     if (!CanAcceptMoreMessages(port)) {
1260       // This proxy port is done. We can now remove it!
1261       should_erase = true;
1262 
1263       if (port->send_on_proxy_removal) {
1264         to_node = port->send_on_proxy_removal->first;
1265         msg = std::move(port->send_on_proxy_removal->second);
1266         port->send_on_proxy_removal.reset();
1267       }
1268     } else {
1269       DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
1270                << " now; waiting for more messages";
1271     }
1272   }
1273 
1274   if (should_erase)
1275     ErasePort(port_ref.name());
1276 
1277   if (msg)
1278     delegate_->ForwardMessage(to_node, std::move(msg));
1279 }
1280 
DestroyAllPortsWithPeer(const NodeName & node_name,const PortName & port_name)1281 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
1282                                    const PortName& port_name) {
1283   // Wipes out all ports whose peer node matches |node_name| and whose peer port
1284   // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
1285   // node is matched.
1286 
1287   std::vector<PortRef> ports_to_notify;
1288   std::vector<PortName> dead_proxies_to_broadcast;
1289   std::deque<PortName> referenced_port_names;
1290 
1291   {
1292     base::AutoLock ports_lock(ports_lock_);
1293 
1294     for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
1295       Port* port = iter->second.get();
1296       {
1297         base::AutoLock port_lock(port->lock);
1298 
1299         if (port->peer_node_name == node_name &&
1300               (port_name == kInvalidPortName ||
1301                     port->peer_port_name == port_name)) {
1302           if (!port->peer_closed) {
1303             // Treat this as immediate peer closure. It's an exceptional
1304             // condition akin to a broken pipe, so we don't care about losing
1305             // messages.
1306 
1307             port->peer_closed = true;
1308             port->last_sequence_num_to_receive =
1309                 port->message_queue.next_sequence_num() - 1;
1310 
1311             if (port->state == Port::kReceiving)
1312               ports_to_notify.push_back(PortRef(iter->first, port));
1313           }
1314 
1315           // We don't expect to forward any further messages, and we don't
1316           // expect to receive a Port{Accepted,Rejected} event. Because we're
1317           // a proxy with no active peer, we cannot use the normal proxy removal
1318           // procedure of forward-propagating an ObserveProxy. Instead we
1319           // broadcast our own death so it can be back-propagated. This is
1320           // inefficient but rare.
1321           if (port->state != Port::kReceiving) {
1322             dead_proxies_to_broadcast.push_back(iter->first);
1323             iter->second->message_queue.GetReferencedPorts(
1324                 &referenced_port_names);
1325           }
1326         }
1327       }
1328     }
1329 
1330     for (const auto& proxy_name : dead_proxies_to_broadcast) {
1331       ports_.erase(proxy_name);
1332       DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
1333     }
1334   }
1335 
1336   // Wake up any receiving ports who have just observed simulated peer closure.
1337   for (const auto& port : ports_to_notify)
1338     delegate_->PortStatusChanged(port);
1339 
1340   for (const auto& proxy_name : dead_proxies_to_broadcast) {
1341     // Broadcast an event signifying that this proxy is no longer functioning.
1342     ObserveProxyEventData event;
1343     event.proxy_node_name = name_;
1344     event.proxy_port_name = proxy_name;
1345     event.proxy_to_node_name = kInvalidNodeName;
1346     event.proxy_to_port_name = kInvalidPortName;
1347     delegate_->BroadcastMessage(NewInternalMessage(
1348         kInvalidPortName, EventType::kObserveProxy, event));
1349 
1350     // Also process death locally since the port that points this closed one
1351     // could be on the current node.
1352     // Note: Although this is recursive, only a single port is involved which
1353     // limits the expected branching to 1.
1354     DestroyAllPortsWithPeer(name_, proxy_name);
1355   }
1356 
1357   // Close any ports referenced by the closed proxies.
1358   for (const auto& name : referenced_port_names) {
1359     PortRef ref;
1360     if (GetPort(name, &ref) == OK)
1361       ClosePort(ref);
1362   }
1363 }
1364 
NewInternalMessage_Helper(const PortName & port_name,const EventType & type,const void * data,size_t num_data_bytes)1365 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
1366                                               const EventType& type,
1367                                               const void* data,
1368                                               size_t num_data_bytes) {
1369   ScopedMessage message;
1370   delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
1371 
1372   EventHeader* header = GetMutableEventHeader(message.get());
1373   header->port_name = port_name;
1374   header->type = type;
1375   header->padding = 0;
1376 
1377   if (num_data_bytes)
1378     memcpy(header + 1, data, num_data_bytes);
1379 
1380   return message;
1381 }
1382 
1383 }  // namespace ports
1384 }  // namespace edk
1385 }  // namespace mojo
1386