1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/ports/node.h"
6 
7 #include <string.h>
8 
9 #include <utility>
10 
11 #include "base/logging.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/synchronization/lock.h"
14 #include "mojo/edk/system/ports/node_delegate.h"
15 
16 namespace mojo {
17 namespace edk {
18 namespace ports {
19 
20 namespace {
21 
// Reports an internal error that is never expected to happen. |message| is
// streamed into a failing CHECK, prefixed with "Oops: ". |error_code| is
// returned unchanged so a call can be used directly as a return expression.
int DebugError(const char* message, int error_code) {
  CHECK(false) << "Oops: " << message;
  return error_code;
}

// Expands to a DebugError() call that uses the stringified expression |x| as
// the message and the value of |x| as the error code.
#define OOPS(x) DebugError(#x, x)
28 
CanAcceptMoreMessages(const Port * port)29 bool CanAcceptMoreMessages(const Port* port) {
30   // Have we already doled out the last message (i.e., do we expect to NOT
31   // receive further messages)?
32   uint64_t next_sequence_num = port->message_queue.next_sequence_num();
33   if (port->state == Port::kClosed)
34     return false;
35   if (port->peer_closed || port->remove_proxy_on_last_message) {
36     if (port->last_sequence_num_to_receive == next_sequence_num - 1)
37       return false;
38   }
39   return true;
40 }
41 
42 }  // namespace
43 
44 class Node::LockedPort {
45  public:
LockedPort(Port * port)46   explicit LockedPort(Port* port) : port_(port) {
47     port_->lock.AssertAcquired();
48   }
49 
get() const50   Port* get() const { return port_; }
operator ->() const51   Port* operator->() const { return port_; }
52 
53  private:
54   Port* const port_;
55 };
56 
Node(const NodeName & name,NodeDelegate * delegate)57 Node::Node(const NodeName& name, NodeDelegate* delegate)
58     : name_(name),
59       delegate_(delegate) {
60 }
61 
~Node()62 Node::~Node() {
63   if (!ports_.empty())
64     DLOG(WARNING) << "Unclean shutdown for node " << name_;
65 }
66 
CanShutdownCleanly(bool allow_local_ports)67 bool Node::CanShutdownCleanly(bool allow_local_ports) {
68   base::AutoLock ports_lock(ports_lock_);
69 
70   if (!allow_local_ports) {
71 #if DCHECK_IS_ON()
72     for (auto entry : ports_) {
73       DVLOG(2) << "Port " << entry.first << " referencing node "
74                << entry.second->peer_node_name << " is blocking shutdown of "
75                << "node " << name_ << " (state=" << entry.second->state << ")";
76     }
77 #endif
78     return ports_.empty();
79   }
80 
81   // NOTE: This is not efficient, though it probably doesn't need to be since
82   // relatively few ports should be open during shutdown and shutdown doesn't
83   // need to be blazingly fast.
84   bool can_shutdown = true;
85   for (auto entry : ports_) {
86     base::AutoLock lock(entry.second->lock);
87     if (entry.second->peer_node_name != name_ &&
88         entry.second->state != Port::kReceiving) {
89       can_shutdown = false;
90 #if DCHECK_IS_ON()
91       DVLOG(2) << "Port " << entry.first << " referencing node "
92                << entry.second->peer_node_name << " is blocking shutdown of "
93                << "node " << name_ << " (state=" << entry.second->state << ")";
94 #else
95       // Exit early when not debugging.
96       break;
97 #endif
98     }
99   }
100 
101   return can_shutdown;
102 }
103 
GetPort(const PortName & port_name,PortRef * port_ref)104 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
105   scoped_refptr<Port> port = GetPort(port_name);
106   if (!port)
107     return ERROR_PORT_UNKNOWN;
108 
109   *port_ref = PortRef(port_name, std::move(port));
110   return OK;
111 }
112 
CreateUninitializedPort(PortRef * port_ref)113 int Node::CreateUninitializedPort(PortRef* port_ref) {
114   PortName port_name;
115   delegate_->GenerateRandomPortName(&port_name);
116 
117   scoped_refptr<Port> port = make_scoped_refptr(new Port(kInitialSequenceNum,
118                                                          kInitialSequenceNum));
119   int rv = AddPortWithName(port_name, port);
120   if (rv != OK)
121     return rv;
122 
123   *port_ref = PortRef(port_name, std::move(port));
124   return OK;
125 }
126 
InitializePort(const PortRef & port_ref,const NodeName & peer_node_name,const PortName & peer_port_name)127 int Node::InitializePort(const PortRef& port_ref,
128                          const NodeName& peer_node_name,
129                          const PortName& peer_port_name) {
130   Port* port = port_ref.port();
131 
132   {
133     base::AutoLock lock(port->lock);
134     if (port->state != Port::kUninitialized)
135       return ERROR_PORT_STATE_UNEXPECTED;
136 
137     port->state = Port::kReceiving;
138     port->peer_node_name = peer_node_name;
139     port->peer_port_name = peer_port_name;
140   }
141 
142   delegate_->PortStatusChanged(port_ref);
143 
144   return OK;
145 }
146 
CreatePortPair(PortRef * port0_ref,PortRef * port1_ref)147 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
148   int rv;
149 
150   rv = CreateUninitializedPort(port0_ref);
151   if (rv != OK)
152     return rv;
153 
154   rv = CreateUninitializedPort(port1_ref);
155   if (rv != OK)
156     return rv;
157 
158   rv = InitializePort(*port0_ref, name_, port1_ref->name());
159   if (rv != OK)
160     return rv;
161 
162   rv = InitializePort(*port1_ref, name_, port0_ref->name());
163   if (rv != OK)
164     return rv;
165 
166   return OK;
167 }
168 
SetUserData(const PortRef & port_ref,const scoped_refptr<UserData> & user_data)169 int Node::SetUserData(const PortRef& port_ref,
170                       const scoped_refptr<UserData>& user_data) {
171   Port* port = port_ref.port();
172 
173   base::AutoLock lock(port->lock);
174   if (port->state == Port::kClosed)
175     return ERROR_PORT_STATE_UNEXPECTED;
176 
177   port->user_data = std::move(user_data);
178 
179   return OK;
180 }
181 
GetUserData(const PortRef & port_ref,scoped_refptr<UserData> * user_data)182 int Node::GetUserData(const PortRef& port_ref,
183                       scoped_refptr<UserData>* user_data) {
184   Port* port = port_ref.port();
185 
186   base::AutoLock lock(port->lock);
187   if (port->state == Port::kClosed)
188     return ERROR_PORT_STATE_UNEXPECTED;
189 
190   *user_data = port->user_data;
191 
192   return OK;
193 }
194 
// Closes the port referenced by |port_ref| and removes it from this node.
//
// An uninitialized port is simply erased. A receiving port is marked closed,
// erased, and an ObserveClosure event carrying the sequence number of the last
// message it sent is forwarded to its peer. Any ports referenced by messages
// still sitting unread in the queue are then closed as well (recursively).
//
// Returns ERROR_PORT_STATE_UNEXPECTED if the port is neither uninitialized nor
// receiving; OK otherwise.
int Node::ClosePort(const PortRef& port_ref) {
  std::deque<PortName> referenced_port_names;

  ObserveClosureEventData data;

  NodeName peer_node_name;
  PortName peer_port_name;
  Port* port = port_ref.port();
  {
    // We may need to erase the port, which requires ports_lock_ to be held,
    // but ports_lock_ must be acquired before any individual port locks.
    base::AutoLock ports_lock(ports_lock_);

    base::AutoLock lock(port->lock);
    if (port->state == Port::kUninitialized) {
      // If the port was not yet initialized, there's nothing interesting to do.
      ErasePort_Locked(port_ref.name());
      return OK;
    }

    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    port->state = Port::kClosed;

    // We pass along the sequence number of the last message sent from this
    // port to allow the peer to have the opportunity to consume all inbound
    // messages before notifying the embedder that this port is closed.
    data.last_sequence_num = port->next_sequence_num_to_send - 1;

    // Snapshot the peer address so it can be used after the locks are
    // released.
    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;

    // If the port being closed still has unread messages, then we need to take
    // care to close those ports so as to avoid leaking memory.
    port->message_queue.GetReferencedPorts(&referenced_port_names);

    ErasePort_Locked(port_ref.name());
  }

  DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@" << name_
           << " to " << peer_port_name << "@" << peer_node_name;

  // Notify the peer with no locks held.
  delegate_->ForwardMessage(
      peer_node_name,
      NewInternalMessage(peer_port_name, EventType::kObserveClosure, data));

  // Close every port that was only reachable through the unread messages.
  // Ports that can no longer be found are skipped.
  for (const auto& name : referenced_port_names) {
    PortRef ref;
    if (GetPort(name, &ref) == OK)
      ClosePort(ref);
  }
  return OK;
}
249 
GetStatus(const PortRef & port_ref,PortStatus * port_status)250 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
251   Port* port = port_ref.port();
252 
253   base::AutoLock lock(port->lock);
254 
255   if (port->state != Port::kReceiving)
256     return ERROR_PORT_STATE_UNEXPECTED;
257 
258   port_status->has_messages = port->message_queue.HasNextMessage();
259   port_status->receiving_messages = CanAcceptMoreMessages(port);
260   port_status->peer_closed = port->peer_closed;
261   return OK;
262 }
263 
GetMessage(const PortRef & port_ref,ScopedMessage * message)264 int Node::GetMessage(const PortRef& port_ref, ScopedMessage* message) {
265   return GetMessageIf(port_ref, nullptr, message);
266 }
267 
// Attempts to dequeue the next message from the port referenced by |port_ref|,
// handing |selector| to the message queue's GetNextMessageIf().
//
// |*message| is always reset to null first, and stays null if nothing was
// dequeued. Returns:
//   ERROR_PORT_STATE_UNEXPECTED if the port is not in the receiving state,
//   ERROR_PORT_PEER_CLOSED      if the port cannot accept more messages,
//   OK                          otherwise (with or without a message).
//
// For every port carried by a dequeued message, that port's message queue is
// marked signalable.
int Node::GetMessageIf(const PortRef& port_ref,
                       std::function<bool(const Message&)> selector,
                       ScopedMessage* message) {
  *message = nullptr;

  DVLOG(2) << "GetMessageIf for " << port_ref.name() << "@" << name_;

  Port* port = port_ref.port();
  {
    base::AutoLock lock(port->lock);

    // This could also be treated like the port being unknown since the
    // embedder should no longer be referring to a port that has been sent.
    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    // Let the embedder get messages until there are no more before reporting
    // that the peer closed its end.
    if (!CanAcceptMoreMessages(port))
      return ERROR_PORT_PEER_CLOSED;

    port->message_queue.GetNextMessageIf(std::move(selector), message);
  }

  // Allow referenced ports to trigger PortStatusChanged calls.
  if (*message) {
    for (size_t i = 0; i < (*message)->num_ports(); ++i) {
      const PortName& new_port_name = (*message)->ports()[i];
      scoped_refptr<Port> new_port = GetPort(new_port_name);

      // NOTE(review): only a DCHECK guards this lookup; in builds without
      // DCHECKs a missing port is dereferenced on the next line.
      DCHECK(new_port) << "Port " << new_port_name << "@" << name_
                       << " does not exist!";

      base::AutoLock lock(new_port->lock);

      DCHECK(new_port->state == Port::kReceiving);
      new_port->message_queue.set_signalable(true);
    }
  }

  return OK;
}
310 
SendMessage(const PortRef & port_ref,ScopedMessage message)311 int Node::SendMessage(const PortRef& port_ref, ScopedMessage message) {
312   int rv = SendMessageInternal(port_ref, &message);
313   if (rv != OK) {
314     // If send failed, close all carried ports. Note that we're careful not to
315     // close the sending port itself if it happened to be one of the encoded
316     // ports (an invalid but possible condition.)
317     for (size_t i = 0; i < message->num_ports(); ++i) {
318       if (message->ports()[i] == port_ref.name())
319         continue;
320 
321       PortRef port;
322       if (GetPort(message->ports()[i], &port) == OK)
323         ClosePort(port);
324     }
325   }
326   return rv;
327 }
328 
AcceptMessage(ScopedMessage message)329 int Node::AcceptMessage(ScopedMessage message) {
330   const EventHeader* header = GetEventHeader(*message);
331   switch (header->type) {
332     case EventType::kUser:
333       return OnUserMessage(std::move(message));
334     case EventType::kPortAccepted:
335       return OnPortAccepted(header->port_name);
336     case EventType::kObserveProxy:
337       return OnObserveProxy(
338           header->port_name,
339           *GetEventData<ObserveProxyEventData>(*message));
340     case EventType::kObserveProxyAck:
341       return OnObserveProxyAck(
342           header->port_name,
343           GetEventData<ObserveProxyAckEventData>(*message)->last_sequence_num);
344     case EventType::kObserveClosure:
345       return OnObserveClosure(
346           header->port_name,
347           GetEventData<ObserveClosureEventData>(*message)->last_sequence_num);
348     case EventType::kMergePort:
349       return OnMergePort(header->port_name,
350                          *GetEventData<MergePortEventData>(*message));
351   }
352   return OOPS(ERROR_NOT_IMPLEMENTED);
353 }
354 
// Asks |destination_node_name| to merge the port referenced by |port_ref| with
// its port |destination_port_name|.
//
// Under the port's lock, WillSendPort() prepares the port for transfer and
// fills in the descriptor (and possibly a new name) placed in the event data.
// A MergePort event carrying that data is then forwarded to the destination
// node with the lock released. Always returns OK.
int Node::MergePorts(const PortRef& port_ref,
                     const NodeName& destination_node_name,
                     const PortName& destination_port_name) {
  Port* port = port_ref.port();
  MergePortEventData data;
  {
    base::AutoLock lock(port->lock);

    DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
             << " to " << destination_port_name << "@" << destination_node_name;

    // Send the port-to-merge over to the destination node so it can be merged
    // into the port cycle atomically there.
    data.new_port_name = port_ref.name();
    WillSendPort(LockedPort(port), destination_node_name, &data.new_port_name,
                 &data.new_port_descriptor);
  }
  delegate_->ForwardMessage(
      destination_node_name,
      NewInternalMessage(destination_port_name,
                         EventType::kMergePort, data));
  return OK;
}
378 
// Merges two ports that both live on this node.
//
// Both ports must be in the receiving state; otherwise the result is
// ERROR_PORT_STATE_UNEXPECTED. The actual merge is delegated to
// MergePorts_Locked() while |ports_lock_| and both port locks are held. If
// the result is anything other than OK, both ports are closed after the locks
// are released. Returns the merge result.
int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
  Port* port0 = port0_ref.port();
  Port* port1 = port1_ref.port();
  int rv;
  {
    // |ports_lock_| must be held when acquiring overlapping port locks.
    // NOTE(review): both port locks are taken unconditionally, so this
    // presumably requires |port0_ref| and |port1_ref| to name different
    // ports -- confirm with callers.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock port0_lock(port0->lock);
    base::AutoLock port1_lock(port1->lock);

    DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
             << " and " << port1_ref.name() << "@" << name_;

    if (port0->state != Port::kReceiving || port1->state != Port::kReceiving)
      rv = ERROR_PORT_STATE_UNEXPECTED;
    else
      rv = MergePorts_Locked(port0_ref, port1_ref);
  }

  // ClosePort() acquires the locks itself, so it must run after the scope
  // above has released them.
  if (rv != OK) {
    ClosePort(port0_ref);
    ClosePort(port1_ref);
  }

  return rv;
}
405 
// Handles the loss of the connection to |node_name| by calling
// DestroyAllPortsWithPeer() for that node with kInvalidPortName as the port
// argument. Always returns OK.
int Node::LostConnectionToNode(const NodeName& node_name) {
  // We can no longer send events to the given node. We also can't expect any
  // PortAccepted events.

  DVLOG(1) << "Observing lost connection from node " << name_
           << " to node " << node_name;

  DestroyAllPortsWithPeer(node_name, kInvalidPortName);
  return OK;
}
416 
// Handles a user message addressed to one of this node's ports.
//
// Steps:
//   1. Every port carried by the message is accepted (bound to this node) via
//      AcceptPort(); the first failure is returned immediately.
//   2. If the target port exists and can still accept messages, the message is
//      queued on it. A proxying port then forwards its queued messages and may
//      be removed as a proxy.
//   3. If the message was not accepted, all of its carried ports are closed.
//      Otherwise, if the queue reported a next message and the port is neither
//      buffering nor proxying, the delegate is notified of a status change.
//
// Returns OK unless accepting a carried port or forwarding messages fails, in
// which case that error is returned.
int Node::OnUserMessage(ScopedMessage message) {
  PortName port_name = GetEventHeader(*message)->port_name;
  const auto* event = GetEventData<UserEventData>(*message);

#if DCHECK_IS_ON()
  // Build a comma-separated list of the carried port names for logging.
  std::ostringstream ports_buf;
  for (size_t i = 0; i < message->num_ports(); ++i) {
    if (i > 0)
      ports_buf << ",";
    ports_buf << message->ports()[i];
  }

  DVLOG(2) << "AcceptMessage " << event->sequence_num
             << " [ports=" << ports_buf.str() << "] at "
             << port_name << "@" << name_;
#endif

  scoped_refptr<Port> port = GetPort(port_name);

  // Even if this port does not exist, cannot receive any more messages or is
  // buffering or proxying messages, we still need these ports to be bound to
  // this node. When the message is forwarded, these ports will get transferred
  // following the usual method. If the message cannot be accepted, then the
  // newly bound ports will simply be closed.

  for (size_t i = 0; i < message->num_ports(); ++i) {
    int rv = AcceptPort(message->ports()[i], GetPortDescriptors(event)[i]);
    if (rv != OK)
      return rv;
  }

  bool has_next_message = false;
  bool message_accepted = false;

  if (port) {
    // We may want to forward messages once the port lock is held, so we must
    // acquire |ports_lock_| first.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    // Reject spurious messages if we've already received the last expected
    // message.
    if (CanAcceptMoreMessages(port.get())) {
      message_accepted = true;
      // |message| is moved into the queue here and is not used again on this
      // path.
      port->message_queue.AcceptMessage(std::move(message), &has_next_message);

      if (port->state == Port::kBuffering) {
        // Suppress the delegate notification below for a buffering port.
        has_next_message = false;
      } else if (port->state == Port::kProxying) {
        has_next_message = false;

        // Forward messages. We forward messages in sequential order here so
        // that we maintain the message queue's notion of next sequence number.
        // That's useful for the proxy removal process as we can tell when this
        // port has seen all of the messages it is expected to see.
        int rv = ForwardMessages_Locked(LockedPort(port.get()), port_name);
        if (rv != OK)
          return rv;

        MaybeRemoveProxy_Locked(LockedPort(port.get()), port_name);
      }
    }
  }

  if (!message_accepted) {
    DVLOG(2) << "Message not accepted!\n";
    // Close all newly accepted ports as they are effectively orphaned.
    // |message| is still valid here because it was never moved into a queue.
    for (size_t i = 0; i < message->num_ports(); ++i) {
      PortRef port_ref;
      if (GetPort(message->ports()[i], &port_ref) == OK) {
        ClosePort(port_ref);
      } else {
        DLOG(WARNING) << "Cannot close non-existent port!\n";
      }
    }
  } else if (has_next_message) {
    // Notify the delegate with all locks released.
    PortRef port_ref(port_name, port);
    delegate_->PortStatusChanged(port_ref);
  }

  return OK;
}
499 
OnPortAccepted(const PortName & port_name)500 int Node::OnPortAccepted(const PortName& port_name) {
501   scoped_refptr<Port> port = GetPort(port_name);
502   if (!port)
503     return ERROR_PORT_UNKNOWN;
504 
505   DVLOG(2) << "PortAccepted at " << port_name << "@" << name_
506            << " pointing to "
507            << port->peer_port_name << "@" << port->peer_node_name;
508 
509   return BeginProxying(PortRef(port_name, port));
510 }
511 
// Handles an ObserveProxy event, which announces that the port
// |event.proxy_port_name|@|event.proxy_node_name| is a proxy pointing to
// |event.proxy_to_port_name|@|event.proxy_to_node_name|.
//
// Cases, in order:
//   * |port_name| is kInvalidPortName: broadcast form; all ports whose peer is
//     the named proxy are destroyed.
//   * |port_name| is unknown here: the event is re-sent to the proxy itself,
//     unless |port_name| is the proxy or the proxy's target.
//   * The port's peer is the proxy and the port is receiving: the port is
//     re-pointed at the proxy's target and an ObserveProxyAck carrying the
//     last sent sequence number goes back to the proxy.
//   * The port's peer is the proxy but the port is not receiving: an
//     ObserveProxyAck asking for a re-send is stored in
//     |send_on_proxy_removal|.
//   * Otherwise the event is forwarded on to the port's own peer.
//
// Always returns OK.
int Node::OnObserveProxy(const PortName& port_name,
                         const ObserveProxyEventData& event) {
  if (port_name == kInvalidPortName) {
    // An ObserveProxy with an invalid target port name is a broadcast used to
    // inform ports when their peer (which was itself a proxy) has become
    // defunct due to unexpected node disconnection.
    //
    // Receiving ports affected by this treat it as equivalent to peer closure.
    // Proxies affected by this can be removed and will in turn broadcast their
    // own death with a similar message.
    CHECK_EQ(event.proxy_to_node_name, kInvalidNodeName);
    CHECK_EQ(event.proxy_to_port_name, kInvalidPortName);
    DestroyAllPortsWithPeer(event.proxy_node_name, event.proxy_port_name);
    return OK;
  }

  // The port may have already been closed locally, in which case the
  // ObserveClosure message will contain the last_sequence_num field.
  // We can then silently ignore this message.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port) {
    DVLOG(1) << "ObserveProxy: " << port_name << "@" << name_ << " not found";

    if (port_name != event.proxy_port_name &&
        port_name != event.proxy_to_port_name) {
      // The receiving port may have been removed while this message was in
      // transit.  In this case, we restart the ObserveProxy circulation from
      // the referenced proxy port to avoid leaking the proxy.
      delegate_->ForwardMessage(
          event.proxy_node_name,
          NewInternalMessage(
              event.proxy_port_name, EventType::kObserveProxy, event));
    }
    return OK;
  }

  DVLOG(2) << "ObserveProxy at " << port_name << "@" << name_ << ", proxy at "
           << event.proxy_port_name << "@"
           << event.proxy_node_name << " pointing to "
           << event.proxy_to_port_name << "@"
           << event.proxy_to_node_name;

  {
    base::AutoLock lock(port->lock);

    if (port->peer_node_name == event.proxy_node_name &&
        port->peer_port_name == event.proxy_port_name) {
      if (port->state == Port::kReceiving) {
        // Bypass the proxy: from now on this port talks to the proxy's target.
        port->peer_node_name = event.proxy_to_node_name;
        port->peer_port_name = event.proxy_to_port_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = port->next_sequence_num_to_send - 1;

        delegate_->ForwardMessage(
            event.proxy_node_name,
            NewInternalMessage(event.proxy_port_name,
                               EventType::kObserveProxyAck,
                               ack));
      } else {
        // As a proxy ourselves, we don't know how to honor the ObserveProxy
        // event or to populate the last_sequence_num field of ObserveProxyAck.
        // After all, another port could be sending messages to our peer now
        // that we've sent out our own ObserveProxy event.  Instead, we will
        // send an ObserveProxyAck indicating that the ObserveProxy event
        // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
        // However, this has to be done after we are removed as a proxy.
        // Otherwise, we might just find ourselves back here again, which
        // would be akin to a busy loop.

        DVLOG(2) << "Delaying ObserveProxyAck to "
                 << event.proxy_port_name << "@" << event.proxy_node_name;

        ObserveProxyAckEventData ack;
        ack.last_sequence_num = kInvalidSequenceNum;

        // Replaces any message previously stored for sending on removal.
        port->send_on_proxy_removal.reset(
            new std::pair<NodeName, ScopedMessage>(
                event.proxy_node_name,
                NewInternalMessage(event.proxy_port_name,
                                   EventType::kObserveProxyAck,
                                   ack)));
      }
    } else {
      // Forward this event along to our peer. Eventually, it should find the
      // port referring to the proxy.
      delegate_->ForwardMessage(
          port->peer_node_name,
          NewInternalMessage(port->peer_port_name,
                             EventType::kObserveProxy,
                             event));
    }
  }
  return OK;
}
607 
OnObserveProxyAck(const PortName & port_name,uint64_t last_sequence_num)608 int Node::OnObserveProxyAck(const PortName& port_name,
609                             uint64_t last_sequence_num) {
610   DVLOG(2) << "ObserveProxyAck at " << port_name << "@" << name_
611            << " (last_sequence_num=" << last_sequence_num << ")";
612 
613   scoped_refptr<Port> port = GetPort(port_name);
614   if (!port)
615     return ERROR_PORT_UNKNOWN;  // The port may have observed closure first, so
616                                 // this is not an "Oops".
617 
618   {
619     base::AutoLock lock(port->lock);
620 
621     if (port->state != Port::kProxying)
622       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
623 
624     if (last_sequence_num == kInvalidSequenceNum) {
625       // Send again.
626       InitiateProxyRemoval(LockedPort(port.get()), port_name);
627       return OK;
628     }
629 
630     // We can now remove this port once we have received and forwarded the last
631     // message addressed to this port.
632     port->remove_proxy_on_last_message = true;
633     port->last_sequence_num_to_receive = last_sequence_num;
634   }
635   TryRemoveProxy(PortRef(port_name, port));
636   return OK;
637 }
638 
// Handles an ObserveClosure event for |port_name|, which says that no messages
// beyond |last_sequence_num| should be expected.
//
// The port is marked as having a closed peer and the event is always forwarded
// on to the port's own peer. A receiving port forwards the sequence number of
// its own last sent message and the delegate is notified of the status change.
// Any other port forwards |last_sequence_num| as-is and is flagged for removal
// on its last message; if it is proxying, removal is attempted right away.
//
// Always returns OK, including when the port does not exist.
int Node::OnObserveClosure(const PortName& port_name,
                           uint64_t last_sequence_num) {
  // OK if the port doesn't exist, as it may have been closed already.
  scoped_refptr<Port> port = GetPort(port_name);
  if (!port)
    return OK;

  // This message tells the port that it should no longer expect more messages
  // beyond last_sequence_num. This message is forwarded along until we reach
  // the receiving end, and this message serves as an equivalent to
  // ObserveProxyAck.

  bool notify_delegate = false;
  ObserveClosureEventData forwarded_data;
  NodeName peer_node_name;
  PortName peer_port_name;
  bool try_remove_proxy = false;
  {
    base::AutoLock lock(port->lock);

    port->peer_closed = true;
    port->last_sequence_num_to_receive = last_sequence_num;

    DVLOG(2) << "ObserveClosure at " << port_name << "@" << name_
             << " (state=" << port->state << ") pointing to "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << last_sequence_num << ")";

    // We always forward ObserveClosure, even beyond the receiving port which
    // cares about it. This ensures that any dead-end proxies beyond that port
    // are notified to remove themselves.

    if (port->state == Port::kReceiving) {
      notify_delegate = true;

      // When forwarding along the other half of the port cycle, this will only
      // reach dead-end proxies. Tell them we've sent our last message so they
      // can go away.
      //
      // TODO: Repurposing ObserveClosure for this has the desired result but
      // may be semantically confusing since the forwarding port is not actually
      // closed. Consider replacing this with a new event type.
      forwarded_data.last_sequence_num = port->next_sequence_num_to_send - 1;
    } else {
      // We haven't yet reached the receiving peer of the closed port, so
      // forward the message along as-is.
      forwarded_data.last_sequence_num = last_sequence_num;

      // See about removing the port if it is a proxy as our peer won't be able
      // to participate in proxy removal.
      port->remove_proxy_on_last_message = true;
      if (port->state == Port::kProxying)
        try_remove_proxy = true;
    }

    DVLOG(2) << "Forwarding ObserveClosure from "
             << port_name << "@" << name_ << " to peer "
             << port->peer_port_name << "@" << port->peer_node_name
             << " (last_sequence_num=" << forwarded_data.last_sequence_num
             << ")";

    // Snapshot the peer address for use after the lock is released.
    peer_node_name = port->peer_node_name;
    peer_port_name = port->peer_port_name;
  }
  // Everything below runs with the port lock released.
  if (try_remove_proxy)
    TryRemoveProxy(PortRef(port_name, port));

  delegate_->ForwardMessage(
      peer_node_name,
      NewInternalMessage(peer_port_name, EventType::kObserveClosure,
                         forwarded_data));

  if (notify_delegate) {
    PortRef port_ref(port_name, port);
    delegate_->PortStatusChanged(port_ref);
  }
  return OK;
}
717 
OnMergePort(const PortName & port_name,const MergePortEventData & event)718 int Node::OnMergePort(const PortName& port_name,
719                       const MergePortEventData& event) {
720   scoped_refptr<Port> port = GetPort(port_name);
721 
722   DVLOG(1) << "MergePort at " << port_name << "@" << name_ << " (state="
723            << (port ? port->state : -1) << ") merging with proxy "
724            << event.new_port_name
725            << "@" << name_ << " pointing to "
726            << event.new_port_descriptor.peer_port_name << "@"
727            << event.new_port_descriptor.peer_node_name << " referred by "
728            << event.new_port_descriptor.referring_port_name << "@"
729            << event.new_port_descriptor.referring_node_name;
730 
731   bool close_target_port = false;
732   bool close_new_port = false;
733 
734   // Accept the new port. This is now the receiving end of the other port cycle
735   // to be merged with ours.
736   int rv = AcceptPort(event.new_port_name, event.new_port_descriptor);
737   if (rv != OK) {
738     close_target_port = true;
739   } else if (port) {
740     // BeginProxying_Locked may call MaybeRemoveProxy_Locked, which in turn
741     // needs to hold |ports_lock_|. We also acquire multiple port locks within.
742     base::AutoLock ports_lock(ports_lock_);
743     base::AutoLock lock(port->lock);
744 
745     if (port->state != Port::kReceiving) {
746       close_new_port = true;
747     } else {
748       scoped_refptr<Port> new_port = GetPort_Locked(event.new_port_name);
749       base::AutoLock new_port_lock(new_port->lock);
750       DCHECK(new_port->state == Port::kReceiving);
751 
752       // Both ports are locked. Now all we have to do is swap their peer
753       // information and set them up as proxies.
754 
755       PortRef port0_ref(port_name, port);
756       PortRef port1_ref(event.new_port_name, new_port);
757       int rv = MergePorts_Locked(port0_ref, port1_ref);
758       if (rv == OK)
759         return rv;
760 
761       close_new_port = true;
762       close_target_port = true;
763     }
764   } else {
765     close_new_port = true;
766   }
767 
768   if (close_target_port) {
769     PortRef target_port;
770     rv = GetPort(port_name, &target_port);
771     DCHECK(rv == OK);
772 
773     ClosePort(target_port);
774   }
775 
776   if (close_new_port) {
777     PortRef new_port;
778     rv = GetPort(event.new_port_name, &new_port);
779     DCHECK(rv == OK);
780 
781     ClosePort(new_port);
782   }
783 
784   return ERROR_PORT_STATE_UNEXPECTED;
785 }
786 
AddPortWithName(const PortName & port_name,const scoped_refptr<Port> & port)787 int Node::AddPortWithName(const PortName& port_name,
788                           const scoped_refptr<Port>& port) {
789   base::AutoLock lock(ports_lock_);
790 
791   if (!ports_.insert(std::make_pair(port_name, port)).second)
792     return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
793 
794   DVLOG(2) << "Created port " << port_name << "@" << name_;
795   return OK;
796 }
797 
// Removes |port_name| from this node's port map. Acquires |ports_lock_|
// itself and delegates to ErasePort_Locked().
void Node::ErasePort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  ErasePort_Locked(port_name);
}
802 
// Removes |port_name| from this node's port map. The caller must already hold
// |ports_lock_|. Erasing a name that is not in the map has no effect beyond
// the log line.
void Node::ErasePort_Locked(const PortName& port_name) {
  ports_lock_.AssertAcquired();
  ports_.erase(port_name);
  DVLOG(2) << "Deleted port " << port_name << "@" << name_;
}
808 
// Returns the port registered under |port_name|, or null if there is none.
// Acquires |ports_lock_| itself and delegates to GetPort_Locked().
scoped_refptr<Port> Node::GetPort(const PortName& port_name) {
  base::AutoLock lock(ports_lock_);
  return GetPort_Locked(port_name);
}
813 
GetPort_Locked(const PortName & port_name)814 scoped_refptr<Port> Node::GetPort_Locked(const PortName& port_name) {
815   ports_lock_.AssertAcquired();
816   auto iter = ports_.find(port_name);
817   if (iter == ports_.end())
818     return nullptr;
819 
820   return iter->second;
821 }
822 
// Performs the actual work of sending |*message| from the port referenced by
// |port_ref|.
//
// Returns, before the message has been consumed:
//   ERROR_PORT_CANNOT_SEND_SELF  if the message carries the sending port,
//   ERROR_PORT_STATE_UNEXPECTED  if the sending port is not receiving,
//   ERROR_PORT_PEER_CLOSED       if the peer has been observed closed,
//   any error from WillSendMessage_Locked().
// In all of those cases |*message| is left intact for the caller.
//
// Once those checks pass, the message is moved out of |*message| and either
// forwarded to the peer's node through the delegate or, when the peer lives on
// this node, accepted locally. OK is returned from that point on, even if
// local acceptance fails.
int Node::SendMessageInternal(const PortRef& port_ref, ScopedMessage* message) {
  ScopedMessage& m = *message;
  // A port cannot be transferred inside a message sent from itself.
  for (size_t i = 0; i < m->num_ports(); ++i) {
    if (m->ports()[i] == port_ref.name())
      return ERROR_PORT_CANNOT_SEND_SELF;
  }

  Port* port = port_ref.port();
  NodeName peer_node_name;
  {
    // We must acquire |ports_lock_| before grabbing any port locks, because
    // WillSendMessage_Locked may need to lock multiple ports out of order.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kReceiving)
      return ERROR_PORT_STATE_UNEXPECTED;

    if (port->peer_closed)
      return ERROR_PORT_PEER_CLOSED;

    int rv = WillSendMessage_Locked(LockedPort(port), port_ref.name(), m.get());
    if (rv != OK)
      return rv;

    // Beyond this point there's no sense in returning anything but OK. Even if
    // message forwarding or acceptance fails, there's nothing the embedder can
    // do to recover. Assume that failure beyond this point must be treated as a
    // transport failure.

    // Snapshot the destination for use after the locks are released.
    peer_node_name = port->peer_node_name;
  }

  // Remote peer: hand the message to the delegate for delivery.
  if (peer_node_name != name_) {
    delegate_->ForwardMessage(peer_node_name, std::move(m));
    return OK;
  }

  // Local peer: deliver directly on this node.
  int rv = AcceptMessage(std::move(m));
  if (rv != OK) {
    // See comment above for why we don't return an error in this case.
    DVLOG(2) << "AcceptMessage failed: " << rv;
  }

  return OK;
}
869 
// Merges two receiving ports by swapping their peer information and turning
// both of them into proxies.
//
// The caller must hold |ports_lock_| as well as the locks of both ports (all
// three are asserted). Both ports must be in the kReceiving state (CHECKed).
//
// Returns OK if both ports were switched to proxying. Returns
// ERROR_PORT_STATE_UNEXPECTED if the two ports are each other's peer on this
// node, if either port has already sent a message, or if either port failed
// to begin proxying; in the last case the peer swap is undone and both ports
// are put back into kReceiving.
int Node::MergePorts_Locked(const PortRef& port0_ref,
                            const PortRef& port1_ref) {
  Port* port0 = port0_ref.port();
  Port* port1 = port1_ref.port();

  ports_lock_.AssertAcquired();
  port0->lock.AssertAcquired();
  port1->lock.AssertAcquired();

  CHECK(port0->state == Port::kReceiving);
  CHECK(port1->state == Port::kReceiving);

  // Ports cannot be merged with their own receiving peer!
  if (port0->peer_node_name == name_ &&
      port0->peer_port_name == port1_ref.name())
    return ERROR_PORT_STATE_UNEXPECTED;

  if (port1->peer_node_name == name_ &&
      port1->peer_port_name == port0_ref.name())
    return ERROR_PORT_STATE_UNEXPECTED;

  // Only merge if both ports have never sent a message.
  if (port0->next_sequence_num_to_send == kInitialSequenceNum &&
      port1->next_sequence_num_to_send == kInitialSequenceNum) {
    // Swap the ports' peer information and switch them both into buffering
    // (eventually proxying) mode.

    std::swap(port0->peer_node_name, port1->peer_node_name);
    std::swap(port0->peer_port_name, port1->peer_port_name);

    // |peer_closed| is not swapped: each port keeps the flag that describes
    // the peer it had before the swap.
    port0->state = Port::kBuffering;
    if (port0->peer_closed)
      port0->remove_proxy_on_last_message = true;

    port1->state = Port::kBuffering;
    if (port1->peer_closed)
      port1->remove_proxy_on_last_message = true;

    // Both calls are always made, even if the first one fails; the results
    // are only inspected afterwards.
    int rv1 = BeginProxying_Locked(LockedPort(port0), port0_ref.name());
    int rv2 = BeginProxying_Locked(LockedPort(port1), port1_ref.name());

    if (rv1 == OK && rv2 == OK) {
      // If either merged port had a closed peer, its new peer needs to be
      // informed of this.
      if (port1->peer_closed) {
        ObserveClosureEventData data;
        data.last_sequence_num = port0->last_sequence_num_to_receive;
        delegate_->ForwardMessage(
            port0->peer_node_name,
            NewInternalMessage(port0->peer_port_name,
                               EventType::kObserveClosure, data));
      }

      if (port0->peer_closed) {
        ObserveClosureEventData data;
        data.last_sequence_num = port1->last_sequence_num_to_receive;
        delegate_->ForwardMessage(
            port1->peer_node_name,
            NewInternalMessage(port1->peer_port_name,
                               EventType::kObserveClosure, data));
      }

      return OK;
    }

    // If either proxy failed to initialize (e.g. had undeliverable messages
    // or ended up in a bad state somehow), we keep the system in a consistent
    // state by undoing the peer swap.
    std::swap(port0->peer_node_name, port1->peer_node_name);
    std::swap(port0->peer_port_name, port1->peer_port_name);
    port0->remove_proxy_on_last_message = false;
    port1->remove_proxy_on_last_message = false;
    port0->state = Port::kReceiving;
    port1->state = Port::kReceiving;
  }

  // Reached when a port has already sent a message or when the merge had to
  // be rolled back.
  return ERROR_PORT_STATE_UNEXPECTED;
}
948 
WillSendPort(const LockedPort & port,const NodeName & to_node_name,PortName * port_name,PortDescriptor * port_descriptor)949 void Node::WillSendPort(const LockedPort& port,
950                         const NodeName& to_node_name,
951                         PortName* port_name,
952                         PortDescriptor* port_descriptor) {
953   port->lock.AssertAcquired();
954 
955   PortName local_port_name = *port_name;
956 
957   PortName new_port_name;
958   delegate_->GenerateRandomPortName(&new_port_name);
959 
960   // Make sure we don't send messages to the new peer until after we know it
961   // exists. In the meantime, just buffer messages locally.
962   DCHECK(port->state == Port::kReceiving);
963   port->state = Port::kBuffering;
964 
965   // If we already know our peer is closed, we already know this proxy can
966   // be removed once it receives and forwards its last expected message.
967   if (port->peer_closed)
968     port->remove_proxy_on_last_message = true;
969 
970   *port_name = new_port_name;
971 
972   port_descriptor->peer_node_name = port->peer_node_name;
973   port_descriptor->peer_port_name = port->peer_port_name;
974   port_descriptor->referring_node_name = name_;
975   port_descriptor->referring_port_name = local_port_name;
976   port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
977   port_descriptor->next_sequence_num_to_receive =
978       port->message_queue.next_sequence_num();
979   port_descriptor->last_sequence_num_to_receive =
980       port->last_sequence_num_to_receive;
981   port_descriptor->peer_closed = port->peer_closed;
982   memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
983 
984   // Configure the local port to point to the new port.
985   port->peer_node_name = to_node_name;
986   port->peer_port_name = new_port_name;
987 }
988 
AcceptPort(const PortName & port_name,const PortDescriptor & port_descriptor)989 int Node::AcceptPort(const PortName& port_name,
990                      const PortDescriptor& port_descriptor) {
991   scoped_refptr<Port> port = make_scoped_refptr(
992       new Port(port_descriptor.next_sequence_num_to_send,
993                port_descriptor.next_sequence_num_to_receive));
994   port->state = Port::kReceiving;
995   port->peer_node_name = port_descriptor.peer_node_name;
996   port->peer_port_name = port_descriptor.peer_port_name;
997   port->last_sequence_num_to_receive =
998       port_descriptor.last_sequence_num_to_receive;
999   port->peer_closed = port_descriptor.peer_closed;
1000 
1001   DVLOG(2) << "Accepting port " << port_name << " [peer_closed="
1002            << port->peer_closed << "; last_sequence_num_to_receive="
1003            << port->last_sequence_num_to_receive << "]";
1004 
1005   // A newly accepted port is not signalable until the message referencing the
1006   // new port finds its way to the consumer (see GetMessageIf).
1007   port->message_queue.set_signalable(false);
1008 
1009   int rv = AddPortWithName(port_name, port);
1010   if (rv != OK)
1011     return rv;
1012 
1013   // Allow referring port to forward messages.
1014   delegate_->ForwardMessage(
1015       port_descriptor.referring_node_name,
1016       NewInternalMessage(port_descriptor.referring_port_name,
1017                          EventType::kPortAccepted));
1018   return OK;
1019 }
1020 
WillSendMessage_Locked(const LockedPort & port,const PortName & port_name,Message * message)1021 int Node::WillSendMessage_Locked(const LockedPort& port,
1022                                  const PortName& port_name,
1023                                  Message* message) {
1024   ports_lock_.AssertAcquired();
1025   port->lock.AssertAcquired();
1026 
1027   DCHECK(message);
1028 
1029   // Messages may already have a sequence number if they're being forwarded
1030   // by a proxy. Otherwise, use the next outgoing sequence number.
1031   uint64_t* sequence_num =
1032       &GetMutableEventData<UserEventData>(message)->sequence_num;
1033   if (*sequence_num == 0)
1034     *sequence_num = port->next_sequence_num_to_send++;
1035 
1036 #if DCHECK_IS_ON()
1037   std::ostringstream ports_buf;
1038   for (size_t i = 0; i < message->num_ports(); ++i) {
1039     if (i > 0)
1040       ports_buf << ",";
1041     ports_buf << message->ports()[i];
1042   }
1043 #endif
1044 
1045   if (message->num_ports() > 0) {
1046     // Note: Another thread could be trying to send the same ports, so we need
1047     // to ensure that they are ours to send before we mutate their state.
1048 
1049     std::vector<scoped_refptr<Port>> ports;
1050     ports.resize(message->num_ports());
1051 
1052     {
1053       for (size_t i = 0; i < message->num_ports(); ++i) {
1054         ports[i] = GetPort_Locked(message->ports()[i]);
1055         DCHECK(ports[i]);
1056 
1057         ports[i]->lock.Acquire();
1058         int error = OK;
1059         if (ports[i]->state != Port::kReceiving)
1060           error = ERROR_PORT_STATE_UNEXPECTED;
1061         else if (message->ports()[i] == port->peer_port_name)
1062           error = ERROR_PORT_CANNOT_SEND_PEER;
1063 
1064         if (error != OK) {
1065           // Oops, we cannot send this port.
1066           for (size_t j = 0; j <= i; ++j)
1067             ports[i]->lock.Release();
1068           // Backpedal on the sequence number.
1069           port->next_sequence_num_to_send--;
1070           return error;
1071         }
1072       }
1073     }
1074 
1075     PortDescriptor* port_descriptors =
1076         GetMutablePortDescriptors(GetMutableEventData<UserEventData>(message));
1077 
1078     for (size_t i = 0; i < message->num_ports(); ++i) {
1079       WillSendPort(LockedPort(ports[i].get()),
1080                    port->peer_node_name,
1081                    message->mutable_ports() + i,
1082                    port_descriptors + i);
1083     }
1084 
1085     for (size_t i = 0; i < message->num_ports(); ++i)
1086       ports[i]->lock.Release();
1087   }
1088 
1089 #if DCHECK_IS_ON()
1090   DVLOG(2) << "Sending message "
1091            << GetEventData<UserEventData>(*message)->sequence_num
1092            << " [ports=" << ports_buf.str() << "]"
1093            << " from " << port_name << "@" << name_
1094            << " to " << port->peer_port_name << "@" << port->peer_node_name;
1095 #endif
1096 
1097   GetMutableEventHeader(message)->port_name = port->peer_port_name;
1098   return OK;
1099 }
1100 
BeginProxying_Locked(const LockedPort & port,const PortName & port_name)1101 int Node::BeginProxying_Locked(const LockedPort& port,
1102                                const PortName& port_name) {
1103   ports_lock_.AssertAcquired();
1104   port->lock.AssertAcquired();
1105 
1106   if (port->state != Port::kBuffering)
1107     return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1108 
1109   port->state = Port::kProxying;
1110 
1111   int rv = ForwardMessages_Locked(LockedPort(port), port_name);
1112   if (rv != OK)
1113     return rv;
1114 
1115   // We may have observed closure while buffering. In that case, we can advance
1116   // to removing the proxy without sending out an ObserveProxy message. We
1117   // already know the last expected message, etc.
1118 
1119   if (port->remove_proxy_on_last_message) {
1120     MaybeRemoveProxy_Locked(LockedPort(port), port_name);
1121 
1122     // Make sure we propagate closure to our current peer.
1123     ObserveClosureEventData data;
1124     data.last_sequence_num = port->last_sequence_num_to_receive;
1125     delegate_->ForwardMessage(
1126         port->peer_node_name,
1127         NewInternalMessage(port->peer_port_name,
1128                            EventType::kObserveClosure, data));
1129   } else {
1130     InitiateProxyRemoval(LockedPort(port), port_name);
1131   }
1132 
1133   return OK;
1134 }
1135 
// Moves the buffering port referenced by |port_ref| into the proxying state
// and forwards everything it has queued. Unlike BeginProxying_Locked(), this
// variant takes the required locks itself.
//
// Returns OK on success, the failure of ForwardMessages_Locked() if queued
// messages could not be forwarded, or goes through
// OOPS(ERROR_PORT_STATE_UNEXPECTED) if the port is in an unexpected state.
int Node::BeginProxying(PortRef port_ref) {
  Port* port = port_ref.port();
  {
    // Phase 1: with |ports_lock_| and the port lock held, switch the state
    // and flush the queued messages.
    base::AutoLock ports_lock(ports_lock_);
    base::AutoLock lock(port->lock);

    if (port->state != Port::kBuffering)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    port->state = Port::kProxying;

    int rv = ForwardMessages_Locked(LockedPort(port), port_ref.name());
    if (rv != OK)
      return rv;
  }

  bool should_remove;
  NodeName peer_node_name;
  ScopedMessage closure_message;
  {
    // Phase 2: only the port lock is held. The state is re-checked because
    // the lock was dropped in between.
    base::AutoLock lock(port->lock);
    if (port->state != Port::kProxying)
      return OOPS(ERROR_PORT_STATE_UNEXPECTED);

    should_remove = port->remove_proxy_on_last_message;
    if (should_remove) {
      // Make sure we propagate closure to our current peer.
      // The message is only built here; it is sent after the lock is dropped.
      ObserveClosureEventData data;
      data.last_sequence_num = port->last_sequence_num_to_receive;
      peer_node_name = port->peer_node_name;
      closure_message = NewInternalMessage(port->peer_port_name,
                                           EventType::kObserveClosure, data);
    } else {
      InitiateProxyRemoval(LockedPort(port), port_ref.name());
    }
  }

  if (should_remove) {
    // TryRemoveProxy() acquires the port lock itself, so it has to be called
    // without that lock held.
    TryRemoveProxy(port_ref);
    delegate_->ForwardMessage(peer_node_name, std::move(closure_message));
  }

  return OK;
}
1180 
ForwardMessages_Locked(const LockedPort & port,const PortName & port_name)1181 int Node::ForwardMessages_Locked(const LockedPort& port,
1182                                  const PortName &port_name) {
1183   ports_lock_.AssertAcquired();
1184   port->lock.AssertAcquired();
1185 
1186   for (;;) {
1187     ScopedMessage message;
1188     port->message_queue.GetNextMessageIf(nullptr, &message);
1189     if (!message)
1190       break;
1191 
1192     int rv = WillSendMessage_Locked(LockedPort(port), port_name, message.get());
1193     if (rv != OK)
1194       return rv;
1195 
1196     delegate_->ForwardMessage(port->peer_node_name, std::move(message));
1197   }
1198   return OK;
1199 }
1200 
InitiateProxyRemoval(const LockedPort & port,const PortName & port_name)1201 void Node::InitiateProxyRemoval(const LockedPort& port,
1202                                 const PortName& port_name) {
1203   port->lock.AssertAcquired();
1204 
1205   // To remove this node, we start by notifying the connected graph that we are
1206   // a proxy. This allows whatever port is referencing this node to skip it.
1207   // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
1208   // the peer was closed in the meantime).
1209 
1210   ObserveProxyEventData data;
1211   data.proxy_node_name = name_;
1212   data.proxy_port_name = port_name;
1213   data.proxy_to_node_name = port->peer_node_name;
1214   data.proxy_to_port_name = port->peer_port_name;
1215 
1216   delegate_->ForwardMessage(
1217       port->peer_node_name,
1218       NewInternalMessage(port->peer_port_name, EventType::kObserveProxy, data));
1219 }
1220 
MaybeRemoveProxy_Locked(const LockedPort & port,const PortName & port_name)1221 void Node::MaybeRemoveProxy_Locked(const LockedPort& port,
1222                                    const PortName& port_name) {
1223   // |ports_lock_| must be held so we can potentilaly ErasePort_Locked().
1224   ports_lock_.AssertAcquired();
1225   port->lock.AssertAcquired();
1226 
1227   DCHECK(port->state == Port::kProxying);
1228 
1229   // Make sure we have seen ObserveProxyAck before removing the port.
1230   if (!port->remove_proxy_on_last_message)
1231     return;
1232 
1233   if (!CanAcceptMoreMessages(port.get())) {
1234     // This proxy port is done. We can now remove it!
1235     ErasePort_Locked(port_name);
1236 
1237     if (port->send_on_proxy_removal) {
1238       NodeName to_node = port->send_on_proxy_removal->first;
1239       ScopedMessage& message = port->send_on_proxy_removal->second;
1240 
1241       delegate_->ForwardMessage(to_node, std::move(message));
1242       port->send_on_proxy_removal.reset();
1243     }
1244   } else {
1245     DVLOG(2) << "Cannot remove port " << port_name << "@" << name_
1246              << " now; waiting for more messages";
1247   }
1248 }
1249 
TryRemoveProxy(PortRef port_ref)1250 void Node::TryRemoveProxy(PortRef port_ref) {
1251   Port* port = port_ref.port();
1252   bool should_erase = false;
1253   ScopedMessage msg;
1254   NodeName to_node;
1255   {
1256     base::AutoLock lock(port->lock);
1257 
1258     // Port already removed. Nothing to do.
1259     if (port->state == Port::kClosed)
1260       return;
1261 
1262     DCHECK(port->state == Port::kProxying);
1263 
1264     // Make sure we have seen ObserveProxyAck before removing the port.
1265     if (!port->remove_proxy_on_last_message)
1266       return;
1267 
1268     if (!CanAcceptMoreMessages(port)) {
1269       // This proxy port is done. We can now remove it!
1270       should_erase = true;
1271 
1272       if (port->send_on_proxy_removal) {
1273         to_node = port->send_on_proxy_removal->first;
1274         msg = std::move(port->send_on_proxy_removal->second);
1275         port->send_on_proxy_removal.reset();
1276       }
1277     } else {
1278       DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
1279                << " now; waiting for more messages";
1280     }
1281   }
1282 
1283   if (should_erase)
1284     ErasePort(port_ref.name());
1285 
1286   if (msg)
1287     delegate_->ForwardMessage(to_node, std::move(msg));
1288 }
1289 
DestroyAllPortsWithPeer(const NodeName & node_name,const PortName & port_name)1290 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
1291                                    const PortName& port_name) {
1292   // Wipes out all ports whose peer node matches |node_name| and whose peer port
1293   // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
1294   // node is matched.
1295 
1296   std::vector<PortRef> ports_to_notify;
1297   std::vector<PortName> dead_proxies_to_broadcast;
1298   std::deque<PortName> referenced_port_names;
1299 
1300   {
1301     base::AutoLock ports_lock(ports_lock_);
1302 
1303     for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
1304       Port* port = iter->second.get();
1305       {
1306         base::AutoLock port_lock(port->lock);
1307 
1308         if (port->peer_node_name == node_name &&
1309               (port_name == kInvalidPortName ||
1310                     port->peer_port_name == port_name)) {
1311           if (!port->peer_closed) {
1312             // Treat this as immediate peer closure. It's an exceptional
1313             // condition akin to a broken pipe, so we don't care about losing
1314             // messages.
1315 
1316             port->peer_closed = true;
1317             port->last_sequence_num_to_receive =
1318                 port->message_queue.next_sequence_num() - 1;
1319 
1320             if (port->state == Port::kReceiving)
1321               ports_to_notify.push_back(PortRef(iter->first, port));
1322           }
1323 
1324           // We don't expect to forward any further messages, and we don't
1325           // expect to receive a Port{Accepted,Rejected} event. Because we're
1326           // a proxy with no active peer, we cannot use the normal proxy removal
1327           // procedure of forward-propagating an ObserveProxy. Instead we
1328           // broadcast our own death so it can be back-propagated. This is
1329           // inefficient but rare.
1330           if (port->state != Port::kReceiving) {
1331             dead_proxies_to_broadcast.push_back(iter->first);
1332             iter->second->message_queue.GetReferencedPorts(
1333                 &referenced_port_names);
1334           }
1335         }
1336       }
1337     }
1338 
1339     for (const auto& proxy_name : dead_proxies_to_broadcast) {
1340       ports_.erase(proxy_name);
1341       DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
1342     }
1343   }
1344 
1345   // Wake up any receiving ports who have just observed simulated peer closure.
1346   for (const auto& port : ports_to_notify)
1347     delegate_->PortStatusChanged(port);
1348 
1349   for (const auto& proxy_name : dead_proxies_to_broadcast) {
1350     // Broadcast an event signifying that this proxy is no longer functioning.
1351     ObserveProxyEventData event;
1352     event.proxy_node_name = name_;
1353     event.proxy_port_name = proxy_name;
1354     event.proxy_to_node_name = kInvalidNodeName;
1355     event.proxy_to_port_name = kInvalidPortName;
1356     delegate_->BroadcastMessage(NewInternalMessage(
1357         kInvalidPortName, EventType::kObserveProxy, event));
1358 
1359     // Also process death locally since the port that points this closed one
1360     // could be on the current node.
1361     // Note: Although this is recursive, only a single port is involved which
1362     // limits the expected branching to 1.
1363     DestroyAllPortsWithPeer(name_, proxy_name);
1364   }
1365 
1366   // Close any ports referenced by the closed proxies.
1367   for (const auto& name : referenced_port_names) {
1368     PortRef ref;
1369     if (GetPort(name, &ref) == OK)
1370       ClosePort(ref);
1371   }
1372 }
1373 
NewInternalMessage_Helper(const PortName & port_name,const EventType & type,const void * data,size_t num_data_bytes)1374 ScopedMessage Node::NewInternalMessage_Helper(const PortName& port_name,
1375                                               const EventType& type,
1376                                               const void* data,
1377                                               size_t num_data_bytes) {
1378   ScopedMessage message;
1379   delegate_->AllocMessage(sizeof(EventHeader) + num_data_bytes, &message);
1380 
1381   EventHeader* header = GetMutableEventHeader(message.get());
1382   header->port_name = port_name;
1383   header->type = type;
1384   header->padding = 0;
1385 
1386   if (num_data_bytes)
1387     memcpy(header + 1, data, num_data_bytes);
1388 
1389   return message;
1390 }
1391 
1392 }  // namespace ports
1393 }  // namespace edk
1394 }  // namespace mojo
1395