1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/ports/node.h"
6 
7 #include <string.h>
8 
9 #include <algorithm>
10 #include <utility>
11 
12 #include "base/atomicops.h"
13 #include "base/containers/stack_container.h"
14 #include "base/lazy_instance.h"
15 #include "base/logging.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/optional.h"
18 #include "base/synchronization/lock.h"
19 #include "base/threading/thread_local.h"
20 #include "build/build_config.h"
21 #include "mojo/core/ports/event.h"
22 #include "mojo/core/ports/node_delegate.h"
23 #include "mojo/core/ports/port_locker.h"
24 
25 #if !defined(OS_NACL)
26 #include "crypto/random.h"
27 #else
28 #include "base/rand_util.h"
29 #endif
30 
31 namespace mojo {
32 namespace core {
33 namespace ports {
34 
35 namespace {
36 
37 constexpr size_t kRandomNameCacheSize = 256;
38 
39 // Random port name generator which maintains a cache of random bytes to draw
40 // from. This amortizes the cost of random name generation on platforms where
41 // RandBytes may have significant per-call overhead.
42 //
43 // Note that the use of this cache means one has to be careful about fork()ing
44 // a process once any port names have been generated, as that behavior can lead
45 // to collisions between independently generated names in different processes.
46 class RandomNameGenerator {
47  public:
48   RandomNameGenerator() = default;
49   ~RandomNameGenerator() = default;
50 
GenerateRandomPortName()51   PortName GenerateRandomPortName() {
52     base::AutoLock lock(lock_);
53     if (cache_index_ == kRandomNameCacheSize) {
54 #if defined(OS_NACL)
55       base::RandBytes(cache_, sizeof(PortName) * kRandomNameCacheSize);
56 #else
57       crypto::RandBytes(cache_, sizeof(PortName) * kRandomNameCacheSize);
58 #endif
59       cache_index_ = 0;
60     }
61     return cache_[cache_index_++];
62   }
63 
64  private:
65   base::Lock lock_;
66   PortName cache_[kRandomNameCacheSize];
67   size_t cache_index_ = kRandomNameCacheSize;
68 
69   DISALLOW_COPY_AND_ASSIGN(RandomNameGenerator);
70 };
71 
72 base::LazyInstance<RandomNameGenerator>::Leaky g_name_generator =
73     LAZY_INSTANCE_INITIALIZER;
74 
DebugError(const char * message,int error_code)75 int DebugError(const char* message, int error_code) {
76   NOTREACHED() << "Oops: " << message;
77   return error_code;
78 }
79 
80 #define OOPS(x) DebugError(#x, x)
81 
CanAcceptMoreMessages(const Port * port)82 bool CanAcceptMoreMessages(const Port* port) {
83   // Have we already doled out the last message (i.e., do we expect to NOT
84   // receive further messages)?
85   uint64_t next_sequence_num = port->message_queue.next_sequence_num();
86   if (port->state == Port::kClosed)
87     return false;
88   if (port->peer_closed || port->remove_proxy_on_last_message) {
89     if (port->last_sequence_num_to_receive == next_sequence_num - 1)
90       return false;
91   }
92   return true;
93 }
94 
GenerateRandomPortName(PortName * name)95 void GenerateRandomPortName(PortName* name) {
96   *name = g_name_generator.Get().GenerateRandomPortName();
97 }
98 
99 }  // namespace
100 
Node(const NodeName & name,NodeDelegate * delegate)101 Node::Node(const NodeName& name, NodeDelegate* delegate)
102     : name_(name), delegate_(this, delegate) {}
103 
~Node()104 Node::~Node() {
105   if (!ports_.empty())
106     DLOG(WARNING) << "Unclean shutdown for node " << name_;
107 }
108 
CanShutdownCleanly(ShutdownPolicy policy)109 bool Node::CanShutdownCleanly(ShutdownPolicy policy) {
110   PortLocker::AssertNoPortsLockedOnCurrentThread();
111   base::AutoLock ports_lock(ports_lock_);
112 
113   if (policy == ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS) {
114 #if DCHECK_IS_ON()
115     for (auto& entry : ports_) {
116       DVLOG(2) << "Port " << entry.first << " referencing node "
117                << entry.second->peer_node_name << " is blocking shutdown of "
118                << "node " << name_ << " (state=" << entry.second->state << ")";
119     }
120 #endif
121     return ports_.empty();
122   }
123 
124   DCHECK_EQ(policy, ShutdownPolicy::ALLOW_LOCAL_PORTS);
125 
126   // NOTE: This is not efficient, though it probably doesn't need to be since
127   // relatively few ports should be open during shutdown and shutdown doesn't
128   // need to be blazingly fast.
129   bool can_shutdown = true;
130   for (auto& entry : ports_) {
131     PortRef port_ref(entry.first, entry.second);
132     SinglePortLocker locker(&port_ref);
133     auto* port = locker.port();
134     if (port->peer_node_name != name_ && port->state != Port::kReceiving) {
135       can_shutdown = false;
136 #if DCHECK_IS_ON()
137       DVLOG(2) << "Port " << entry.first << " referencing node "
138                << port->peer_node_name << " is blocking shutdown of "
139                << "node " << name_ << " (state=" << port->state << ")";
140 #else
141       // Exit early when not debugging.
142       break;
143 #endif
144     }
145   }
146 
147   return can_shutdown;
148 }
149 
GetPort(const PortName & port_name,PortRef * port_ref)150 int Node::GetPort(const PortName& port_name, PortRef* port_ref) {
151   PortLocker::AssertNoPortsLockedOnCurrentThread();
152   base::AutoLock lock(ports_lock_);
153   auto iter = ports_.find(port_name);
154   if (iter == ports_.end())
155     return ERROR_PORT_UNKNOWN;
156 
157 #if defined(OS_ANDROID) && defined(ARCH_CPU_ARM64)
158   // Workaround for https://crbug.com/665869.
159   base::subtle::MemoryBarrier();
160 #endif
161 
162   *port_ref = PortRef(port_name, iter->second);
163   return OK;
164 }
165 
CreateUninitializedPort(PortRef * port_ref)166 int Node::CreateUninitializedPort(PortRef* port_ref) {
167   PortName port_name;
168   GenerateRandomPortName(&port_name);
169 
170   scoped_refptr<Port> port(new Port(kInitialSequenceNum, kInitialSequenceNum));
171   int rv = AddPortWithName(port_name, port);
172   if (rv != OK)
173     return rv;
174 
175   *port_ref = PortRef(port_name, std::move(port));
176   return OK;
177 }
178 
InitializePort(const PortRef & port_ref,const NodeName & peer_node_name,const PortName & peer_port_name)179 int Node::InitializePort(const PortRef& port_ref,
180                          const NodeName& peer_node_name,
181                          const PortName& peer_port_name) {
182   {
183     SinglePortLocker locker(&port_ref);
184     auto* port = locker.port();
185     if (port->state != Port::kUninitialized)
186       return ERROR_PORT_STATE_UNEXPECTED;
187 
188     port->state = Port::kReceiving;
189     port->peer_node_name = peer_node_name;
190     port->peer_port_name = peer_port_name;
191   }
192 
193   delegate_->PortStatusChanged(port_ref);
194 
195   return OK;
196 }
197 
CreatePortPair(PortRef * port0_ref,PortRef * port1_ref)198 int Node::CreatePortPair(PortRef* port0_ref, PortRef* port1_ref) {
199   int rv;
200 
201   rv = CreateUninitializedPort(port0_ref);
202   if (rv != OK)
203     return rv;
204 
205   rv = CreateUninitializedPort(port1_ref);
206   if (rv != OK)
207     return rv;
208 
209   rv = InitializePort(*port0_ref, name_, port1_ref->name());
210   if (rv != OK)
211     return rv;
212 
213   rv = InitializePort(*port1_ref, name_, port0_ref->name());
214   if (rv != OK)
215     return rv;
216 
217   return OK;
218 }
219 
SetUserData(const PortRef & port_ref,scoped_refptr<UserData> user_data)220 int Node::SetUserData(const PortRef& port_ref,
221                       scoped_refptr<UserData> user_data) {
222   SinglePortLocker locker(&port_ref);
223   auto* port = locker.port();
224   if (port->state == Port::kClosed)
225     return ERROR_PORT_STATE_UNEXPECTED;
226 
227   port->user_data = std::move(user_data);
228 
229   return OK;
230 }
231 
GetUserData(const PortRef & port_ref,scoped_refptr<UserData> * user_data)232 int Node::GetUserData(const PortRef& port_ref,
233                       scoped_refptr<UserData>* user_data) {
234   SinglePortLocker locker(&port_ref);
235   auto* port = locker.port();
236   if (port->state == Port::kClosed)
237     return ERROR_PORT_STATE_UNEXPECTED;
238 
239   *user_data = port->user_data;
240 
241   return OK;
242 }
243 
ClosePort(const PortRef & port_ref)244 int Node::ClosePort(const PortRef& port_ref) {
245   std::vector<std::unique_ptr<UserMessageEvent>> undelivered_messages;
246   NodeName peer_node_name;
247   PortName peer_port_name;
248   uint64_t last_sequence_num = 0;
249   bool was_initialized = false;
250   {
251     SinglePortLocker locker(&port_ref);
252     auto* port = locker.port();
253     switch (port->state) {
254       case Port::kUninitialized:
255         break;
256 
257       case Port::kReceiving:
258         was_initialized = true;
259         port->state = Port::kClosed;
260 
261         // We pass along the sequence number of the last message sent from this
262         // port to allow the peer to have the opportunity to consume all inbound
263         // messages before notifying the embedder that this port is closed.
264         last_sequence_num = port->next_sequence_num_to_send - 1;
265 
266         peer_node_name = port->peer_node_name;
267         peer_port_name = port->peer_port_name;
268 
269         // If the port being closed still has unread messages, then we need to
270         // take care to close those ports so as to avoid leaking memory.
271         port->message_queue.TakeAllMessages(&undelivered_messages);
272         break;
273 
274       default:
275         return ERROR_PORT_STATE_UNEXPECTED;
276     }
277   }
278 
279   ErasePort(port_ref.name());
280 
281   if (was_initialized) {
282     DVLOG(2) << "Sending ObserveClosure from " << port_ref.name() << "@"
283              << name_ << " to " << peer_port_name << "@" << peer_node_name;
284     delegate_->ForwardEvent(peer_node_name,
285                             std::make_unique<ObserveClosureEvent>(
286                                 peer_port_name, last_sequence_num));
287     for (const auto& message : undelivered_messages) {
288       for (size_t i = 0; i < message->num_ports(); ++i) {
289         PortRef ref;
290         if (GetPort(message->ports()[i], &ref) == OK)
291           ClosePort(ref);
292       }
293     }
294   }
295   return OK;
296 }
297 
GetStatus(const PortRef & port_ref,PortStatus * port_status)298 int Node::GetStatus(const PortRef& port_ref, PortStatus* port_status) {
299   SinglePortLocker locker(&port_ref);
300   auto* port = locker.port();
301   if (port->state != Port::kReceiving)
302     return ERROR_PORT_STATE_UNEXPECTED;
303 
304   port_status->has_messages = port->message_queue.HasNextMessage();
305   port_status->receiving_messages = CanAcceptMoreMessages(port);
306   port_status->peer_closed = port->peer_closed;
307   port_status->peer_remote = port->peer_node_name != name_;
308   port_status->queued_message_count =
309       port->message_queue.queued_message_count();
310   port_status->queued_num_bytes = port->message_queue.queued_num_bytes();
311   return OK;
312 }
313 
GetMessage(const PortRef & port_ref,std::unique_ptr<UserMessageEvent> * message,MessageFilter * filter)314 int Node::GetMessage(const PortRef& port_ref,
315                      std::unique_ptr<UserMessageEvent>* message,
316                      MessageFilter* filter) {
317   *message = nullptr;
318 
319   DVLOG(4) << "GetMessage for " << port_ref.name() << "@" << name_;
320 
321   {
322     SinglePortLocker locker(&port_ref);
323     auto* port = locker.port();
324 
325     // This could also be treated like the port being unknown since the
326     // embedder should no longer be referring to a port that has been sent.
327     if (port->state != Port::kReceiving)
328       return ERROR_PORT_STATE_UNEXPECTED;
329 
330     // Let the embedder get messages until there are no more before reporting
331     // that the peer closed its end.
332     if (!CanAcceptMoreMessages(port))
333       return ERROR_PORT_PEER_CLOSED;
334 
335     port->message_queue.GetNextMessage(message, filter);
336   }
337 
338   // Allow referenced ports to trigger PortStatusChanged calls.
339   if (*message) {
340     for (size_t i = 0; i < (*message)->num_ports(); ++i) {
341       PortRef new_port_ref;
342       int rv = GetPort((*message)->ports()[i], &new_port_ref);
343 
344       DCHECK_EQ(OK, rv) << "Port " << new_port_ref.name() << "@" << name_
345                         << " does not exist!";
346 
347       SinglePortLocker locker(&new_port_ref);
348       DCHECK(locker.port()->state == Port::kReceiving);
349       locker.port()->message_queue.set_signalable(true);
350     }
351 
352     // The user may retransmit this message from another port. We reset the
353     // sequence number so that the message will get a new one if that happens.
354     (*message)->set_sequence_num(0);
355   }
356 
357   return OK;
358 }
359 
SendUserMessage(const PortRef & port_ref,std::unique_ptr<UserMessageEvent> message)360 int Node::SendUserMessage(const PortRef& port_ref,
361                           std::unique_ptr<UserMessageEvent> message) {
362   int rv = SendUserMessageInternal(port_ref, &message);
363   if (rv != OK) {
364     // If send failed, close all carried ports. Note that we're careful not to
365     // close the sending port itself if it happened to be one of the encoded
366     // ports (an invalid but possible condition.)
367     for (size_t i = 0; i < message->num_ports(); ++i) {
368       if (message->ports()[i] == port_ref.name())
369         continue;
370 
371       PortRef port;
372       if (GetPort(message->ports()[i], &port) == OK)
373         ClosePort(port);
374     }
375   }
376   return rv;
377 }
378 
AcceptEvent(ScopedEvent event)379 int Node::AcceptEvent(ScopedEvent event) {
380   switch (event->type()) {
381     case Event::Type::kUserMessage:
382       return OnUserMessage(Event::Cast<UserMessageEvent>(&event));
383     case Event::Type::kPortAccepted:
384       return OnPortAccepted(Event::Cast<PortAcceptedEvent>(&event));
385     case Event::Type::kObserveProxy:
386       return OnObserveProxy(Event::Cast<ObserveProxyEvent>(&event));
387     case Event::Type::kObserveProxyAck:
388       return OnObserveProxyAck(Event::Cast<ObserveProxyAckEvent>(&event));
389     case Event::Type::kObserveClosure:
390       return OnObserveClosure(Event::Cast<ObserveClosureEvent>(&event));
391     case Event::Type::kMergePort:
392       return OnMergePort(Event::Cast<MergePortEvent>(&event));
393   }
394   return OOPS(ERROR_NOT_IMPLEMENTED);
395 }
396 
MergePorts(const PortRef & port_ref,const NodeName & destination_node_name,const PortName & destination_port_name)397 int Node::MergePorts(const PortRef& port_ref,
398                      const NodeName& destination_node_name,
399                      const PortName& destination_port_name) {
400   PortName new_port_name;
401   Event::PortDescriptor new_port_descriptor;
402   {
403     SinglePortLocker locker(&port_ref);
404 
405     DVLOG(1) << "Sending MergePort from " << port_ref.name() << "@" << name_
406              << " to " << destination_port_name << "@" << destination_node_name;
407 
408     // Send the port-to-merge over to the destination node so it can be merged
409     // into the port cycle atomically there.
410     new_port_name = port_ref.name();
411     ConvertToProxy(locker.port(), destination_node_name, &new_port_name,
412                    &new_port_descriptor);
413   }
414 
415   if (new_port_descriptor.peer_node_name == name_ &&
416       destination_node_name != name_) {
417     // Ensure that the locally retained peer of the new proxy gets a status
418     // update so it notices that its peer is now remote.
419     PortRef local_peer;
420     if (GetPort(new_port_descriptor.peer_port_name, &local_peer) == OK)
421       delegate_->PortStatusChanged(local_peer);
422   }
423 
424   delegate_->ForwardEvent(
425       destination_node_name,
426       std::make_unique<MergePortEvent>(destination_port_name, new_port_name,
427                                        new_port_descriptor));
428   return OK;
429 }
430 
MergeLocalPorts(const PortRef & port0_ref,const PortRef & port1_ref)431 int Node::MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref) {
432   DVLOG(1) << "Merging local ports " << port0_ref.name() << "@" << name_
433            << " and " << port1_ref.name() << "@" << name_;
434   return MergePortsInternal(port0_ref, port1_ref,
435                             true /* allow_close_on_bad_state */);
436 }
437 
LostConnectionToNode(const NodeName & node_name)438 int Node::LostConnectionToNode(const NodeName& node_name) {
439   // We can no longer send events to the given node. We also can't expect any
440   // PortAccepted events.
441 
442   DVLOG(1) << "Observing lost connection from node " << name_ << " to node "
443            << node_name;
444 
445   DestroyAllPortsWithPeer(node_name, kInvalidPortName);
446   return OK;
447 }
448 
OnUserMessage(std::unique_ptr<UserMessageEvent> message)449 int Node::OnUserMessage(std::unique_ptr<UserMessageEvent> message) {
450   PortName port_name = message->port_name();
451 
452 #if DCHECK_IS_ON()
453   std::ostringstream ports_buf;
454   for (size_t i = 0; i < message->num_ports(); ++i) {
455     if (i > 0)
456       ports_buf << ",";
457     ports_buf << message->ports()[i];
458   }
459 
460   DVLOG(4) << "OnUserMessage " << message->sequence_num()
461            << " [ports=" << ports_buf.str() << "] at " << port_name << "@"
462            << name_;
463 #endif
464 
465   // Even if this port does not exist, cannot receive anymore messages or is
466   // buffering or proxying messages, we still need these ports to be bound to
467   // this node. When the message is forwarded, these ports will get transferred
468   // following the usual method. If the message cannot be accepted, then the
469   // newly bound ports will simply be closed.
470   for (size_t i = 0; i < message->num_ports(); ++i) {
471     Event::PortDescriptor& descriptor = message->port_descriptors()[i];
472     if (descriptor.referring_node_name == kInvalidNodeName) {
473       // If the referring node name is invalid, this descriptor can be ignored
474       // and the port should already exist locally.
475       PortRef port_ref;
476       if (GetPort(message->ports()[i], &port_ref) != OK)
477         return ERROR_PORT_UNKNOWN;
478     } else {
479       int rv = AcceptPort(message->ports()[i], descriptor);
480       if (rv != OK)
481         return rv;
482 
483       // Ensure that the referring node is wiped out of this descriptor. This
484       // allows the event to be forwarded across multiple local hops without
485       // attempting to accept the port more than once.
486       descriptor.referring_node_name = kInvalidNodeName;
487     }
488   }
489 
490   PortRef port_ref;
491   GetPort(port_name, &port_ref);
492   bool has_next_message = false;
493   bool message_accepted = false;
494   bool should_forward_messages = false;
495   if (port_ref.is_valid()) {
496     SinglePortLocker locker(&port_ref);
497     auto* port = locker.port();
498 
499     // Reject spurious messages if we've already received the last expected
500     // message.
501     if (CanAcceptMoreMessages(port)) {
502       message_accepted = true;
503       port->message_queue.AcceptMessage(std::move(message), &has_next_message);
504 
505       if (port->state == Port::kBuffering) {
506         has_next_message = false;
507       } else if (port->state == Port::kProxying) {
508         has_next_message = false;
509         should_forward_messages = true;
510       }
511     }
512   }
513 
514   if (should_forward_messages) {
515     int rv = ForwardUserMessagesFromProxy(port_ref);
516     if (rv != OK)
517       return rv;
518     TryRemoveProxy(port_ref);
519   }
520 
521   if (!message_accepted) {
522     DVLOG(2) << "Message not accepted!\n";
523     // Close all newly accepted ports as they are effectively orphaned.
524     for (size_t i = 0; i < message->num_ports(); ++i) {
525       PortRef attached_port_ref;
526       if (GetPort(message->ports()[i], &attached_port_ref) == OK) {
527         ClosePort(attached_port_ref);
528       } else {
529         DLOG(WARNING) << "Cannot close non-existent port!\n";
530       }
531     }
532   } else if (has_next_message) {
533     delegate_->PortStatusChanged(port_ref);
534   }
535 
536   return OK;
537 }
538 
OnPortAccepted(std::unique_ptr<PortAcceptedEvent> event)539 int Node::OnPortAccepted(std::unique_ptr<PortAcceptedEvent> event) {
540   PortRef port_ref;
541   if (GetPort(event->port_name(), &port_ref) != OK)
542     return ERROR_PORT_UNKNOWN;
543 
544 #if DCHECK_IS_ON()
545   {
546     SinglePortLocker locker(&port_ref);
547     DVLOG(2) << "PortAccepted at " << port_ref.name() << "@" << name_
548              << " pointing to " << locker.port()->peer_port_name << "@"
549              << locker.port()->peer_node_name;
550   }
551 #endif
552 
553   return BeginProxying(port_ref);
554 }
555 
OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event)556 int Node::OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event) {
557   if (event->port_name() == kInvalidPortName) {
558     // An ObserveProxy with an invalid target port name is a broadcast used to
559     // inform ports when their peer (which was itself a proxy) has become
560     // defunct due to unexpected node disconnection.
561     //
562     // Receiving ports affected by this treat it as equivalent to peer closure.
563     // Proxies affected by this can be removed and will in turn broadcast their
564     // own death with a similar message.
565     DCHECK_EQ(event->proxy_target_node_name(), kInvalidNodeName);
566     DCHECK_EQ(event->proxy_target_port_name(), kInvalidPortName);
567     DestroyAllPortsWithPeer(event->proxy_node_name(), event->proxy_port_name());
568     return OK;
569   }
570 
571   // The port may have already been closed locally, in which case the
572   // ObserveClosure message will contain the last_sequence_num field.
573   // We can then silently ignore this message.
574   PortRef port_ref;
575   if (GetPort(event->port_name(), &port_ref) != OK) {
576     DVLOG(1) << "ObserveProxy: " << event->port_name() << "@" << name_
577              << " not found";
578     return OK;
579   }
580 
581   DVLOG(2) << "ObserveProxy at " << port_ref.name() << "@" << name_
582            << ", proxy at " << event->proxy_port_name() << "@"
583            << event->proxy_node_name() << " pointing to "
584            << event->proxy_target_port_name() << "@"
585            << event->proxy_target_node_name();
586 
587   bool update_status = false;
588   ScopedEvent event_to_forward;
589   NodeName event_target_node;
590   {
591     SinglePortLocker locker(&port_ref);
592     auto* port = locker.port();
593 
594     if (port->peer_node_name == event->proxy_node_name() &&
595         port->peer_port_name == event->proxy_port_name()) {
596       if (port->state == Port::kReceiving) {
597         port->peer_node_name = event->proxy_target_node_name();
598         port->peer_port_name = event->proxy_target_port_name();
599         event_target_node = event->proxy_node_name();
600         event_to_forward = std::make_unique<ObserveProxyAckEvent>(
601             event->proxy_port_name(), port->next_sequence_num_to_send - 1);
602         update_status = true;
603         DVLOG(2) << "Forwarding ObserveProxyAck from " << event->port_name()
604                  << "@" << name_ << " to " << event->proxy_port_name() << "@"
605                  << event_target_node;
606       } else {
607         // As a proxy ourselves, we don't know how to honor the ObserveProxy
608         // event or to populate the last_sequence_num field of ObserveProxyAck.
609         // Afterall, another port could be sending messages to our peer now
610         // that we've sent out our own ObserveProxy event.  Instead, we will
611         // send an ObserveProxyAck indicating that the ObserveProxy event
612         // should be re-sent (last_sequence_num set to kInvalidSequenceNum).
613         // However, this has to be done after we are removed as a proxy.
614         // Otherwise, we might just find ourselves back here again, which
615         // would be akin to a busy loop.
616 
617         DVLOG(2) << "Delaying ObserveProxyAck to " << event->proxy_port_name()
618                  << "@" << event->proxy_node_name();
619 
620         port->send_on_proxy_removal.reset(new std::pair<NodeName, ScopedEvent>(
621             event->proxy_node_name(),
622             std::make_unique<ObserveProxyAckEvent>(event->proxy_port_name(),
623                                                    kInvalidSequenceNum)));
624       }
625     } else {
626       // Forward this event along to our peer. Eventually, it should find the
627       // port referring to the proxy.
628       event_target_node = port->peer_node_name;
629       event->set_port_name(port->peer_port_name);
630       event_to_forward = std::move(event);
631     }
632   }
633 
634   if (event_to_forward)
635     delegate_->ForwardEvent(event_target_node, std::move(event_to_forward));
636 
637   if (update_status)
638     delegate_->PortStatusChanged(port_ref);
639 
640   return OK;
641 }
642 
OnObserveProxyAck(std::unique_ptr<ObserveProxyAckEvent> event)643 int Node::OnObserveProxyAck(std::unique_ptr<ObserveProxyAckEvent> event) {
644   DVLOG(2) << "ObserveProxyAck at " << event->port_name() << "@" << name_
645            << " (last_sequence_num=" << event->last_sequence_num() << ")";
646 
647   PortRef port_ref;
648   if (GetPort(event->port_name(), &port_ref) != OK)
649     return ERROR_PORT_UNKNOWN;  // The port may have observed closure first.
650 
651   bool try_remove_proxy_immediately;
652   {
653     SinglePortLocker locker(&port_ref);
654     auto* port = locker.port();
655     if (port->state != Port::kProxying)
656       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
657 
658     // If the last sequence number is invalid, this is a signal that we need to
659     // retransmit the ObserveProxy event for this port rather than flagging the
660     // the proxy for removal ASAP.
661     try_remove_proxy_immediately =
662         event->last_sequence_num() != kInvalidSequenceNum;
663     if (try_remove_proxy_immediately) {
664       // We can now remove this port once we have received and forwarded the
665       // last message addressed to this port.
666       port->remove_proxy_on_last_message = true;
667       port->last_sequence_num_to_receive = event->last_sequence_num();
668     }
669   }
670 
671   if (try_remove_proxy_immediately)
672     TryRemoveProxy(port_ref);
673   else
674     InitiateProxyRemoval(port_ref);
675 
676   return OK;
677 }
678 
OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event)679 int Node::OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event) {
680   // OK if the port doesn't exist, as it may have been closed already.
681   PortRef port_ref;
682   if (GetPort(event->port_name(), &port_ref) != OK)
683     return OK;
684 
685   // This message tells the port that it should no longer expect more messages
686   // beyond last_sequence_num. This message is forwarded along until we reach
687   // the receiving end, and this message serves as an equivalent to
688   // ObserveProxyAck.
689 
690   bool notify_delegate = false;
691   NodeName peer_node_name;
692   PortName peer_port_name;
693   bool try_remove_proxy = false;
694   {
695     SinglePortLocker locker(&port_ref);
696     auto* port = locker.port();
697 
698     port->peer_closed = true;
699     port->last_sequence_num_to_receive = event->last_sequence_num();
700 
701     DVLOG(2) << "ObserveClosure at " << port_ref.name() << "@" << name_
702              << " (state=" << port->state << ") pointing to "
703              << port->peer_port_name << "@" << port->peer_node_name
704              << " (last_sequence_num=" << event->last_sequence_num() << ")";
705 
706     // We always forward ObserveClosure, even beyond the receiving port which
707     // cares about it. This ensures that any dead-end proxies beyond that port
708     // are notified to remove themselves.
709 
710     if (port->state == Port::kReceiving) {
711       notify_delegate = true;
712 
713       // When forwarding along the other half of the port cycle, this will only
714       // reach dead-end proxies. Tell them we've sent our last message so they
715       // can go away.
716       //
717       // TODO: Repurposing ObserveClosure for this has the desired result but
718       // may be semantically confusing since the forwarding port is not actually
719       // closed. Consider replacing this with a new event type.
720       event->set_last_sequence_num(port->next_sequence_num_to_send - 1);
721     } else {
722       // We haven't yet reached the receiving peer of the closed port, so we'll
723       // forward the message along as-is.
724       // See about removing the port if it is a proxy as our peer won't be able
725       // to participate in proxy removal.
726       port->remove_proxy_on_last_message = true;
727       if (port->state == Port::kProxying)
728         try_remove_proxy = true;
729     }
730 
731     DVLOG(2) << "Forwarding ObserveClosure from " << port_ref.name() << "@"
732              << name_ << " to peer " << port->peer_port_name << "@"
733              << port->peer_node_name
734              << " (last_sequence_num=" << event->last_sequence_num() << ")";
735 
736     peer_node_name = port->peer_node_name;
737     peer_port_name = port->peer_port_name;
738   }
739 
740   if (try_remove_proxy)
741     TryRemoveProxy(port_ref);
742 
743   event->set_port_name(peer_port_name);
744   delegate_->ForwardEvent(peer_node_name, std::move(event));
745 
746   if (notify_delegate)
747     delegate_->PortStatusChanged(port_ref);
748 
749   return OK;
750 }
751 
OnMergePort(std::unique_ptr<MergePortEvent> event)752 int Node::OnMergePort(std::unique_ptr<MergePortEvent> event) {
753   PortRef port_ref;
754   GetPort(event->port_name(), &port_ref);
755 
756   DVLOG(1) << "MergePort at " << port_ref.name() << "@" << name_
757            << " merging with proxy " << event->new_port_name() << "@" << name_
758            << " pointing to " << event->new_port_descriptor().peer_port_name
759            << "@" << event->new_port_descriptor().peer_node_name
760            << " referred by "
761            << event->new_port_descriptor().referring_port_name << "@"
762            << event->new_port_descriptor().referring_node_name;
763 
764   // Accept the new port. This is now the receiving end of the other port cycle
765   // to be merged with ours. Note that we always attempt to accept the new port
766   // first as otherwise its peer receiving port could be left stranded
767   // indefinitely.
768   if (AcceptPort(event->new_port_name(), event->new_port_descriptor()) != OK) {
769     if (port_ref.is_valid())
770       ClosePort(port_ref);
771     return ERROR_PORT_STATE_UNEXPECTED;
772   }
773 
774   PortRef new_port_ref;
775   GetPort(event->new_port_name(), &new_port_ref);
776   if (!port_ref.is_valid() && new_port_ref.is_valid()) {
777     ClosePort(new_port_ref);
778     return ERROR_PORT_UNKNOWN;
779   } else if (port_ref.is_valid() && !new_port_ref.is_valid()) {
780     ClosePort(port_ref);
781     return ERROR_PORT_UNKNOWN;
782   }
783 
784   return MergePortsInternal(port_ref, new_port_ref,
785                             false /* allow_close_on_bad_state */);
786 }
787 
AddPortWithName(const PortName & port_name,scoped_refptr<Port> port)788 int Node::AddPortWithName(const PortName& port_name, scoped_refptr<Port> port) {
789   PortLocker::AssertNoPortsLockedOnCurrentThread();
790   base::AutoLock lock(ports_lock_);
791   if (!ports_.emplace(port_name, std::move(port)).second)
792     return OOPS(ERROR_PORT_EXISTS);  // Suggests a bad UUID generator.
793   DVLOG(2) << "Created port " << port_name << "@" << name_;
794   return OK;
795 }
796 
ErasePort(const PortName & port_name)797 void Node::ErasePort(const PortName& port_name) {
798   PortLocker::AssertNoPortsLockedOnCurrentThread();
799   scoped_refptr<Port> port;
800   {
801     base::AutoLock lock(ports_lock_);
802     auto it = ports_.find(port_name);
803     if (it == ports_.end())
804       return;
805     port = std::move(it->second);
806     ports_.erase(it);
807   }
808   // NOTE: We are careful not to release the port's messages while holding any
809   // locks, since they may run arbitrary user code upon destruction.
810   std::vector<std::unique_ptr<UserMessageEvent>> messages;
811   {
812     PortRef port_ref(port_name, std::move(port));
813     SinglePortLocker locker(&port_ref);
814     locker.port()->message_queue.TakeAllMessages(&messages);
815   }
816   DVLOG(2) << "Deleted port " << port_name << "@" << name_;
817 }
818 
SendUserMessageInternal(const PortRef & port_ref,std::unique_ptr<UserMessageEvent> * message)819 int Node::SendUserMessageInternal(const PortRef& port_ref,
820                                   std::unique_ptr<UserMessageEvent>* message) {
821   std::unique_ptr<UserMessageEvent>& m = *message;
822   for (size_t i = 0; i < m->num_ports(); ++i) {
823     if (m->ports()[i] == port_ref.name())
824       return ERROR_PORT_CANNOT_SEND_SELF;
825   }
826 
827   NodeName target_node;
828   int rv = PrepareToForwardUserMessage(port_ref, Port::kReceiving,
829                                        false /* ignore_closed_peer */, m.get(),
830                                        &target_node);
831   if (rv != OK)
832     return rv;
833 
834   // Beyond this point there's no sense in returning anything but OK. Even if
835   // message forwarding or acceptance fails, there's nothing the embedder can
836   // do to recover. Assume that failure beyond this point must be treated as a
837   // transport failure.
838 
839   DCHECK_NE(kInvalidNodeName, target_node);
840   if (target_node != name_) {
841     delegate_->ForwardEvent(target_node, std::move(m));
842     return OK;
843   }
844 
845   int accept_result = AcceptEvent(std::move(m));
846   if (accept_result != OK) {
847     // See comment above for why we don't return an error in this case.
848     DVLOG(2) << "AcceptEvent failed: " << accept_result;
849   }
850 
851   return OK;
852 }
853 
MergePortsInternal(const PortRef & port0_ref,const PortRef & port1_ref,bool allow_close_on_bad_state)854 int Node::MergePortsInternal(const PortRef& port0_ref,
855                              const PortRef& port1_ref,
856                              bool allow_close_on_bad_state) {
857   const PortRef* port_refs[2] = {&port0_ref, &port1_ref};
858   {
859     base::Optional<PortLocker> locker(base::in_place, port_refs, 2);
860     auto* port0 = locker->GetPort(port0_ref);
861     auto* port1 = locker->GetPort(port1_ref);
862 
863     // There are several conditions which must be met before we'll consider
864     // merging two ports:
865     //
866     // - They must both be in the kReceiving state
867     // - They must not be each other's peer
868     // - They must have never sent a user message
869     //
870     // If any of these criteria are not met, we fail early.
871     if (port0->state != Port::kReceiving || port1->state != Port::kReceiving ||
872         (port0->peer_node_name == name_ &&
873          port0->peer_port_name == port1_ref.name()) ||
874         (port1->peer_node_name == name_ &&
875          port1->peer_port_name == port0_ref.name()) ||
876         port0->next_sequence_num_to_send != kInitialSequenceNum ||
877         port1->next_sequence_num_to_send != kInitialSequenceNum) {
878       // On failure, we only close a port if it was at least properly in the
879       // |kReceiving| state. This avoids getting the system in an inconsistent
880       // state by e.g. closing a proxy abruptly.
881       //
882       // Note that we must release the port locks before closing ports.
883       const bool close_port0 =
884           port0->state == Port::kReceiving || allow_close_on_bad_state;
885       const bool close_port1 =
886           port1->state == Port::kReceiving || allow_close_on_bad_state;
887       locker.reset();
888       if (close_port0)
889         ClosePort(port0_ref);
890       if (close_port1)
891         ClosePort(port1_ref);
892       return ERROR_PORT_STATE_UNEXPECTED;
893     }
894 
895     // Swap the ports' peer information and switch them both to proxying mode.
896     std::swap(port0->peer_node_name, port1->peer_node_name);
897     std::swap(port0->peer_port_name, port1->peer_port_name);
898     port0->state = Port::kProxying;
899     port1->state = Port::kProxying;
900     if (port0->peer_closed)
901       port0->remove_proxy_on_last_message = true;
902     if (port1->peer_closed)
903       port1->remove_proxy_on_last_message = true;
904   }
905 
906   // Flush any queued messages from the new proxies and, if successful, complete
907   // the merge by initiating proxy removals.
908   if (ForwardUserMessagesFromProxy(port0_ref) == OK &&
909       ForwardUserMessagesFromProxy(port1_ref) == OK) {
910     for (size_t i = 0; i < 2; ++i) {
911       bool try_remove_proxy_immediately = false;
912       ScopedEvent closure_event;
913       NodeName closure_event_target_node;
914       {
915         SinglePortLocker locker(port_refs[i]);
916         auto* port = locker.port();
917         DCHECK(port->state == Port::kProxying);
918         try_remove_proxy_immediately = port->remove_proxy_on_last_message;
919         if (try_remove_proxy_immediately || port->peer_closed) {
920           // If either end of the port cycle is closed, we propagate an
921           // ObserveClosure event.
922           closure_event_target_node = port->peer_node_name;
923           closure_event = std::make_unique<ObserveClosureEvent>(
924               port->peer_port_name, port->last_sequence_num_to_receive);
925         }
926       }
927       if (try_remove_proxy_immediately)
928         TryRemoveProxy(*port_refs[i]);
929       else
930         InitiateProxyRemoval(*port_refs[i]);
931 
932       if (closure_event) {
933         delegate_->ForwardEvent(closure_event_target_node,
934                                 std::move(closure_event));
935       }
936     }
937 
938     return OK;
939   }
940 
941   // If we failed to forward proxied messages, we keep the system in a
942   // consistent state by undoing the peer swap and closing the ports.
943   {
944     PortLocker locker(port_refs, 2);
945     auto* port0 = locker.GetPort(port0_ref);
946     auto* port1 = locker.GetPort(port1_ref);
947     std::swap(port0->peer_node_name, port1->peer_node_name);
948     std::swap(port0->peer_port_name, port1->peer_port_name);
949     port0->remove_proxy_on_last_message = false;
950     port1->remove_proxy_on_last_message = false;
951     DCHECK_EQ(Port::kProxying, port0->state);
952     DCHECK_EQ(Port::kProxying, port1->state);
953     port0->state = Port::kReceiving;
954     port1->state = Port::kReceiving;
955   }
956 
957   ClosePort(port0_ref);
958   ClosePort(port1_ref);
959   return ERROR_PORT_STATE_UNEXPECTED;
960 }
961 
ConvertToProxy(Port * port,const NodeName & to_node_name,PortName * port_name,Event::PortDescriptor * port_descriptor)962 void Node::ConvertToProxy(Port* port,
963                           const NodeName& to_node_name,
964                           PortName* port_name,
965                           Event::PortDescriptor* port_descriptor) {
966   port->AssertLockAcquired();
967   PortName local_port_name = *port_name;
968 
969   PortName new_port_name;
970   GenerateRandomPortName(&new_port_name);
971 
972   // Make sure we don't send messages to the new peer until after we know it
973   // exists. In the meantime, just buffer messages locally.
974   DCHECK(port->state == Port::kReceiving);
975   port->state = Port::kBuffering;
976 
977   // If we already know our peer is closed, we already know this proxy can
978   // be removed once it receives and forwards its last expected message.
979   if (port->peer_closed)
980     port->remove_proxy_on_last_message = true;
981 
982   *port_name = new_port_name;
983 
984   port_descriptor->peer_node_name = port->peer_node_name;
985   port_descriptor->peer_port_name = port->peer_port_name;
986   port_descriptor->referring_node_name = name_;
987   port_descriptor->referring_port_name = local_port_name;
988   port_descriptor->next_sequence_num_to_send = port->next_sequence_num_to_send;
989   port_descriptor->next_sequence_num_to_receive =
990       port->message_queue.next_sequence_num();
991   port_descriptor->last_sequence_num_to_receive =
992       port->last_sequence_num_to_receive;
993   port_descriptor->peer_closed = port->peer_closed;
994   memset(port_descriptor->padding, 0, sizeof(port_descriptor->padding));
995 
996   // Configure the local port to point to the new port.
997   port->peer_node_name = to_node_name;
998   port->peer_port_name = new_port_name;
999 }
1000 
AcceptPort(const PortName & port_name,const Event::PortDescriptor & port_descriptor)1001 int Node::AcceptPort(const PortName& port_name,
1002                      const Event::PortDescriptor& port_descriptor) {
1003   scoped_refptr<Port> port =
1004       base::MakeRefCounted<Port>(port_descriptor.next_sequence_num_to_send,
1005                                  port_descriptor.next_sequence_num_to_receive);
1006   port->state = Port::kReceiving;
1007   port->peer_node_name = port_descriptor.peer_node_name;
1008   port->peer_port_name = port_descriptor.peer_port_name;
1009   port->last_sequence_num_to_receive =
1010       port_descriptor.last_sequence_num_to_receive;
1011   port->peer_closed = port_descriptor.peer_closed;
1012 
1013   DVLOG(2) << "Accepting port " << port_name
1014            << " [peer_closed=" << port->peer_closed
1015            << "; last_sequence_num_to_receive="
1016            << port->last_sequence_num_to_receive << "]";
1017 
1018   // A newly accepted port is not signalable until the message referencing the
1019   // new port finds its way to the consumer (see GetMessage).
1020   port->message_queue.set_signalable(false);
1021 
1022   int rv = AddPortWithName(port_name, std::move(port));
1023   if (rv != OK)
1024     return rv;
1025 
1026   // Allow referring port to forward messages.
1027   delegate_->ForwardEvent(
1028       port_descriptor.referring_node_name,
1029       std::make_unique<PortAcceptedEvent>(port_descriptor.referring_port_name));
1030   return OK;
1031 }
1032 
PrepareToForwardUserMessage(const PortRef & forwarding_port_ref,Port::State expected_port_state,bool ignore_closed_peer,UserMessageEvent * message,NodeName * forward_to_node)1033 int Node::PrepareToForwardUserMessage(const PortRef& forwarding_port_ref,
1034                                       Port::State expected_port_state,
1035                                       bool ignore_closed_peer,
1036                                       UserMessageEvent* message,
1037                                       NodeName* forward_to_node) {
1038   bool target_is_remote = false;
1039   for (;;) {
1040     NodeName target_node_name;
1041     {
1042       SinglePortLocker locker(&forwarding_port_ref);
1043       target_node_name = locker.port()->peer_node_name;
1044     }
1045 
1046     // NOTE: This may call out to arbitrary user code, so it's important to call
1047     // it only while no port locks are held on the calling thread.
1048     if (target_node_name != name_) {
1049       if (!message->NotifyWillBeRoutedExternally()) {
1050         LOG(ERROR) << "NotifyWillBeRoutedExternally failed unexpectedly.";
1051         return ERROR_PORT_STATE_UNEXPECTED;
1052       }
1053     }
1054 
1055     // Simultaneously lock the forwarding port as well as all attached ports.
1056     base::StackVector<PortRef, 4> attached_port_refs;
1057     base::StackVector<const PortRef*, 5> ports_to_lock;
1058     attached_port_refs.container().resize(message->num_ports());
1059     ports_to_lock.container().resize(message->num_ports() + 1);
1060     ports_to_lock[0] = &forwarding_port_ref;
1061     for (size_t i = 0; i < message->num_ports(); ++i) {
1062       GetPort(message->ports()[i], &attached_port_refs[i]);
1063       DCHECK(attached_port_refs[i].is_valid());
1064       ports_to_lock[i + 1] = &attached_port_refs[i];
1065     }
1066     PortLocker locker(ports_to_lock.container().data(),
1067                       ports_to_lock.container().size());
1068     auto* forwarding_port = locker.GetPort(forwarding_port_ref);
1069 
1070     if (forwarding_port->peer_node_name != target_node_name) {
1071       // The target node has already changed since we last held the lock.
1072       if (target_node_name == name_) {
1073         // If the target node was previously this local node, we need to restart
1074         // the loop, since that means we may now route the message externally.
1075         continue;
1076       }
1077 
1078       target_node_name = forwarding_port->peer_node_name;
1079     }
1080     target_is_remote = target_node_name != name_;
1081 
1082     if (forwarding_port->state != expected_port_state)
1083       return ERROR_PORT_STATE_UNEXPECTED;
1084     if (forwarding_port->peer_closed && !ignore_closed_peer)
1085       return ERROR_PORT_PEER_CLOSED;
1086 
1087     // Messages may already have a sequence number if they're being forwarded by
1088     // a proxy. Otherwise, use the next outgoing sequence number.
1089     if (message->sequence_num() == 0)
1090       message->set_sequence_num(forwarding_port->next_sequence_num_to_send++);
1091 #if DCHECK_IS_ON()
1092     std::ostringstream ports_buf;
1093     for (size_t i = 0; i < message->num_ports(); ++i) {
1094       if (i > 0)
1095         ports_buf << ",";
1096       ports_buf << message->ports()[i];
1097     }
1098 #endif
1099 
1100     if (message->num_ports() > 0) {
1101       // Sanity check to make sure we can actually send all the attached ports.
1102       // They must all be in the |kReceiving| state and must not be the sender's
1103       // own peer.
1104       DCHECK_EQ(message->num_ports(), attached_port_refs.container().size());
1105       for (size_t i = 0; i < message->num_ports(); ++i) {
1106         auto* attached_port = locker.GetPort(attached_port_refs[i]);
1107         int error = OK;
1108         if (attached_port->state != Port::kReceiving) {
1109           error = ERROR_PORT_STATE_UNEXPECTED;
1110         } else if (attached_port_refs[i].name() ==
1111                    forwarding_port->peer_port_name) {
1112           error = ERROR_PORT_CANNOT_SEND_PEER;
1113         }
1114 
1115         if (error != OK) {
1116           // Not going to send. Backpedal on the sequence number.
1117           forwarding_port->next_sequence_num_to_send--;
1118           return error;
1119         }
1120       }
1121 
1122       if (target_is_remote) {
1123         // We only bother to proxy and rewrite ports in the event if it's
1124         // going to be routed to an external node. This substantially reduces
1125         // the amount of port churn in the system, as many port-carrying
1126         // events are routed at least 1 or 2 intra-node hops before (if ever)
1127         // being routed externally.
1128         Event::PortDescriptor* port_descriptors = message->port_descriptors();
1129         for (size_t i = 0; i < message->num_ports(); ++i) {
1130           ConvertToProxy(locker.GetPort(attached_port_refs[i]),
1131                          target_node_name, message->ports() + i,
1132                          port_descriptors + i);
1133         }
1134       }
1135     }
1136 
1137 #if DCHECK_IS_ON()
1138     DVLOG(4) << "Sending message " << message->sequence_num()
1139              << " [ports=" << ports_buf.str() << "]"
1140              << " from " << forwarding_port_ref.name() << "@" << name_ << " to "
1141              << forwarding_port->peer_port_name << "@" << target_node_name;
1142 #endif
1143 
1144     *forward_to_node = target_node_name;
1145     message->set_port_name(forwarding_port->peer_port_name);
1146     break;
1147   }
1148 
1149   if (target_is_remote) {
1150     for (size_t i = 0; i < message->num_ports(); ++i) {
1151       // For any ports that were converted to proxies above, make sure their
1152       // prior local peer (if applicable) receives a status update so it can be
1153       // made aware of its peer's location.
1154       const Event::PortDescriptor& descriptor = message->port_descriptors()[i];
1155       if (descriptor.peer_node_name == name_) {
1156         PortRef local_peer;
1157         if (GetPort(descriptor.peer_port_name, &local_peer) == OK)
1158           delegate_->PortStatusChanged(local_peer);
1159       }
1160     }
1161   }
1162 
1163   return OK;
1164 }
1165 
BeginProxying(const PortRef & port_ref)1166 int Node::BeginProxying(const PortRef& port_ref) {
1167   {
1168     SinglePortLocker locker(&port_ref);
1169     auto* port = locker.port();
1170     if (port->state != Port::kBuffering)
1171       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1172     port->state = Port::kProxying;
1173   }
1174 
1175   int rv = ForwardUserMessagesFromProxy(port_ref);
1176   if (rv != OK)
1177     return rv;
1178 
1179   bool try_remove_proxy_immediately;
1180   ScopedEvent closure_event;
1181   NodeName closure_target_node;
1182   {
1183     SinglePortLocker locker(&port_ref);
1184     auto* port = locker.port();
1185     if (port->state != Port::kProxying)
1186       return OOPS(ERROR_PORT_STATE_UNEXPECTED);
1187 
1188     try_remove_proxy_immediately = port->remove_proxy_on_last_message;
1189     if (try_remove_proxy_immediately) {
1190       // Make sure we propagate closure to our current peer.
1191       closure_target_node = port->peer_node_name;
1192       closure_event = std::make_unique<ObserveClosureEvent>(
1193           port->peer_port_name, port->last_sequence_num_to_receive);
1194     }
1195   }
1196 
1197   if (try_remove_proxy_immediately) {
1198     TryRemoveProxy(port_ref);
1199     delegate_->ForwardEvent(closure_target_node, std::move(closure_event));
1200   } else {
1201     InitiateProxyRemoval(port_ref);
1202   }
1203 
1204   return OK;
1205 }
1206 
ForwardUserMessagesFromProxy(const PortRef & port_ref)1207 int Node::ForwardUserMessagesFromProxy(const PortRef& port_ref) {
1208   for (;;) {
1209     // NOTE: We forward messages in sequential order here so that we maintain
1210     // the message queue's notion of next sequence number. That's useful for the
1211     // proxy removal process as we can tell when this port has seen all of the
1212     // messages it is expected to see.
1213     std::unique_ptr<UserMessageEvent> message;
1214     {
1215       SinglePortLocker locker(&port_ref);
1216       locker.port()->message_queue.GetNextMessage(&message, nullptr);
1217       if (!message)
1218         break;
1219     }
1220 
1221     NodeName target_node;
1222     int rv = PrepareToForwardUserMessage(port_ref, Port::kProxying,
1223                                          true /* ignore_closed_peer */,
1224                                          message.get(), &target_node);
1225     if (rv != OK)
1226       return rv;
1227 
1228     delegate_->ForwardEvent(target_node, std::move(message));
1229   }
1230   return OK;
1231 }
1232 
InitiateProxyRemoval(const PortRef & port_ref)1233 void Node::InitiateProxyRemoval(const PortRef& port_ref) {
1234   NodeName peer_node_name;
1235   PortName peer_port_name;
1236   {
1237     SinglePortLocker locker(&port_ref);
1238     auto* port = locker.port();
1239     peer_node_name = port->peer_node_name;
1240     peer_port_name = port->peer_port_name;
1241   }
1242 
1243   // To remove this node, we start by notifying the connected graph that we are
1244   // a proxy. This allows whatever port is referencing this node to skip it.
1245   // Eventually, this node will receive ObserveProxyAck (or ObserveClosure if
1246   // the peer was closed in the meantime).
1247   delegate_->ForwardEvent(peer_node_name,
1248                           std::make_unique<ObserveProxyEvent>(
1249                               peer_port_name, name_, port_ref.name(),
1250                               peer_node_name, peer_port_name));
1251 }
1252 
TryRemoveProxy(const PortRef & port_ref)1253 void Node::TryRemoveProxy(const PortRef& port_ref) {
1254   bool should_erase = false;
1255   NodeName removal_target_node;
1256   ScopedEvent removal_event;
1257 
1258   {
1259     SinglePortLocker locker(&port_ref);
1260     auto* port = locker.port();
1261     DCHECK(port->state == Port::kProxying);
1262 
1263     // Make sure we have seen ObserveProxyAck before removing the port.
1264     if (!port->remove_proxy_on_last_message)
1265       return;
1266 
1267     if (!CanAcceptMoreMessages(port)) {
1268       should_erase = true;
1269       if (port->send_on_proxy_removal) {
1270         removal_target_node = port->send_on_proxy_removal->first;
1271         removal_event = std::move(port->send_on_proxy_removal->second);
1272       }
1273     } else {
1274       DVLOG(2) << "Cannot remove port " << port_ref.name() << "@" << name_
1275                << " now; waiting for more messages";
1276     }
1277   }
1278 
1279   if (should_erase)
1280     ErasePort(port_ref.name());
1281 
1282   if (removal_event)
1283     delegate_->ForwardEvent(removal_target_node, std::move(removal_event));
1284 }
1285 
DestroyAllPortsWithPeer(const NodeName & node_name,const PortName & port_name)1286 void Node::DestroyAllPortsWithPeer(const NodeName& node_name,
1287                                    const PortName& port_name) {
1288   // Wipes out all ports whose peer node matches |node_name| and whose peer port
1289   // matches |port_name|. If |port_name| is |kInvalidPortName|, only the peer
1290   // node is matched.
1291 
1292   std::vector<PortRef> ports_to_notify;
1293   std::vector<PortName> dead_proxies_to_broadcast;
1294   std::vector<std::unique_ptr<UserMessageEvent>> undelivered_messages;
1295 
1296   {
1297     PortLocker::AssertNoPortsLockedOnCurrentThread();
1298     base::AutoLock ports_lock(ports_lock_);
1299 
1300     for (auto iter = ports_.begin(); iter != ports_.end(); ++iter) {
1301       PortRef port_ref(iter->first, iter->second);
1302       {
1303         SinglePortLocker locker(&port_ref);
1304         auto* port = locker.port();
1305 
1306         if (port->peer_node_name == node_name &&
1307             (port_name == kInvalidPortName ||
1308              port->peer_port_name == port_name)) {
1309           if (!port->peer_closed) {
1310             // Treat this as immediate peer closure. It's an exceptional
1311             // condition akin to a broken pipe, so we don't care about losing
1312             // messages.
1313 
1314             port->peer_closed = true;
1315             port->last_sequence_num_to_receive =
1316                 port->message_queue.next_sequence_num() - 1;
1317 
1318             if (port->state == Port::kReceiving)
1319               ports_to_notify.push_back(PortRef(iter->first, port));
1320           }
1321 
1322           // We don't expect to forward any further messages, and we don't
1323           // expect to receive a Port{Accepted,Rejected} event. Because we're
1324           // a proxy with no active peer, we cannot use the normal proxy removal
1325           // procedure of forward-propagating an ObserveProxy. Instead we
1326           // broadcast our own death so it can be back-propagated. This is
1327           // inefficient but rare.
1328           if (port->state != Port::kReceiving) {
1329             dead_proxies_to_broadcast.push_back(iter->first);
1330             std::vector<std::unique_ptr<UserMessageEvent>> messages;
1331             iter->second->message_queue.TakeAllMessages(&messages);
1332             for (auto& message : messages)
1333               undelivered_messages.emplace_back(std::move(message));
1334           }
1335         }
1336       }
1337     }
1338   }
1339 
1340   for (const auto& proxy_name : dead_proxies_to_broadcast) {
1341     ErasePort(proxy_name);
1342     DVLOG(2) << "Forcibly deleted port " << proxy_name << "@" << name_;
1343   }
1344 
1345   // Wake up any receiving ports who have just observed simulated peer closure.
1346   for (const auto& port : ports_to_notify)
1347     delegate_->PortStatusChanged(port);
1348 
1349   for (const auto& proxy_name : dead_proxies_to_broadcast) {
1350     // Broadcast an event signifying that this proxy is no longer functioning.
1351     delegate_->BroadcastEvent(std::make_unique<ObserveProxyEvent>(
1352         kInvalidPortName, name_, proxy_name, kInvalidNodeName,
1353         kInvalidPortName));
1354 
1355     // Also process death locally since the port that points this closed one
1356     // could be on the current node.
1357     // Note: Although this is recursive, only a single port is involved which
1358     // limits the expected branching to 1.
1359     DestroyAllPortsWithPeer(name_, proxy_name);
1360   }
1361 
1362   // Close any ports referenced by undelivered messages.
1363   for (const auto& message : undelivered_messages) {
1364     for (size_t i = 0; i < message->num_ports(); ++i) {
1365       PortRef ref;
1366       if (GetPort(message->ports()[i], &ref) == OK)
1367         ClosePort(ref);
1368     }
1369   }
1370 }
1371 
DelegateHolder(Node * node,NodeDelegate * delegate)1372 Node::DelegateHolder::DelegateHolder(Node* node, NodeDelegate* delegate)
1373     : node_(node), delegate_(delegate) {
1374   DCHECK(node_);
1375 }
1376 
~DelegateHolder()1377 Node::DelegateHolder::~DelegateHolder() {}
1378 
1379 #if DCHECK_IS_ON()
EnsureSafeDelegateAccess() const1380 void Node::DelegateHolder::EnsureSafeDelegateAccess() const {
1381   PortLocker::AssertNoPortsLockedOnCurrentThread();
1382   base::AutoLock lock(node_->ports_lock_);
1383 }
1384 #endif
1385 
1386 }  // namespace ports
1387 }  // namespace core
1388 }  // namespace mojo
1389