// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
#define MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_

#include <memory>
#include <queue>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "base/callback.h"
#include "base/containers/hash_tables.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/task_runner.h"
#include "mojo/edk/embedder/platform_handle_vector.h"
#include "mojo/edk/embedder/platform_shared_buffer.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/atomic_flag.h"
#include "mojo/edk/system/node_channel.h"
#include "mojo/edk/system/ports/name.h"
#include "mojo/edk/system/ports/node.h"
#include "mojo/edk/system/ports/node_delegate.h"

// Forward declaration only: this header uses base::PortProvider solely as a
// pointer parameter type (NodeController::CreateMachPortRelay(), declared for
// macOS builds), so the full definition is not needed here.
namespace base {
class PortProvider;
}

33 namespace mojo {
34 namespace edk {
35 
36 class Broker;
37 class Core;
38 class MachPortRelay;
39 class PortsMessage;
40 
41 // The owner of ports::Node which facilitates core EDK implementation. All
42 // public interface methods are safe to call from any thread.
43 class NodeController : public ports::NodeDelegate,
44                        public NodeChannel::Delegate {
45  public:
46   class PortObserver : public ports::UserData {
47    public:
48     virtual void OnPortStatusChanged() = 0;
49 
50    protected:
~PortObserver()51     ~PortObserver() override {}
52   };
53 
54   // |core| owns and out-lives us.
55   explicit NodeController(Core* core);
56   ~NodeController() override;
57 
name()58   const ports::NodeName& name() const { return name_; }
core()59   Core* core() const { return core_; }
node()60   ports::Node* node() const { return node_.get(); }
io_task_runner()61   scoped_refptr<base::TaskRunner> io_task_runner() const {
62     return io_task_runner_;
63   }
64 
65 #if defined(OS_MACOSX) && !defined(OS_IOS)
66   // Create the relay used to transfer mach ports between processes.
67   void CreateMachPortRelay(base::PortProvider* port_provider);
68 #endif
69 
70   // Called exactly once, shortly after construction, and before any other
71   // methods are called on this object.
72   void SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner);
73 
74   // Connects this node to a child node. This node will initiate a handshake.
75   void ConnectToChild(base::ProcessHandle process_handle,
76                       ConnectionParams connection_params,
77                       const std::string& child_token,
78                       const ProcessErrorCallback& process_error_callback);
79 
80   // Closes all reserved ports which associated with the child process
81   // |child_token|.
82   void CloseChildPorts(const std::string& child_token);
83 
84   // Close a connection to a peer associated with |peer_token|.
85   void ClosePeerConnection(const std::string& peer_token);
86 
87   // Connects this node to a parent node. The parent node will initiate a
88   // handshake.
89   void ConnectToParent(ConnectionParams connection_params);
90 
91   // Connects this node to a peer node. On success, |port| will be merged with
92   // the corresponding port in the peer node.
93   void ConnectToPeer(ConnectionParams connection_params,
94                      const ports::PortRef& port,
95                      const std::string& peer_token);
96 
97   // Sets a port's observer. If |observer| is null the port's current observer
98   // is removed.
99   void SetPortObserver(const ports::PortRef& port,
100                        scoped_refptr<PortObserver> observer);
101 
102   // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as
103   // it ensures the port's observer has also been removed.
104   void ClosePort(const ports::PortRef& port);
105 
106   // Sends a message on a port to its peer.
107   int SendMessage(const ports::PortRef& port_ref,
108                   std::unique_ptr<PortsMessage> message);
109 
110   // Reserves a local port |port| associated with |token|. A peer holding a copy
111   // of |token| can merge one of its own ports into this one.
112   void ReservePort(const std::string& token, const ports::PortRef& port,
113                    const std::string& child_token);
114 
115   // Merges a local port |port| into a port reserved by |token| in the parent.
116   void MergePortIntoParent(const std::string& token,
117                            const ports::PortRef& port);
118 
119   // Merges two local ports together.
120   int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1);
121 
122   // Creates a new shared buffer for use in the current process.
123   scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes);
124 
125   // Request that the Node be shut down cleanly. This may take an arbitrarily
126   // long time to complete, at which point |callback| will be called.
127   //
128   // Note that while it is safe to continue using the NodeController's public
129   // interface after requesting shutdown, you do so at your own risk and there
130   // is NO guarantee that new messages will be sent or ports will complete
131   // transfer.
132   void RequestShutdown(const base::Closure& callback);
133 
134   // Notifies the NodeController that we received a bad message from the given
135   // node.
136   void NotifyBadMessageFrom(const ports::NodeName& source_node,
137                             const std::string& error);
138 
139  private:
140   friend Core;
141 
142   using NodeMap = std::unordered_map<ports::NodeName,
143                                      scoped_refptr<NodeChannel>>;
144   using OutgoingMessageQueue = std::queue<Channel::MessagePtr>;
145 
146   struct ReservedPort {
147     ports::PortRef port;
148     const std::string child_token;
149   };
150 
151   struct PeerConnection {
152     PeerConnection();
153     PeerConnection(const PeerConnection& other);
154     PeerConnection(PeerConnection&& other);
155     PeerConnection(scoped_refptr<NodeChannel> channel,
156                    const ports::PortRef& local_port,
157                    const std::string& peer_token);
158     ~PeerConnection();
159 
160     PeerConnection& operator=(const PeerConnection& other);
161     PeerConnection& operator=(PeerConnection&& other);
162 
163 
164     scoped_refptr<NodeChannel> channel;
165     ports::PortRef local_port;
166     std::string peer_token;
167   };
168 
169   void ConnectToChildOnIOThread(
170       base::ProcessHandle process_handle,
171       ConnectionParams connection_params,
172       ports::NodeName token,
173       const ProcessErrorCallback& process_error_callback);
174   void ConnectToParentOnIOThread(ConnectionParams connection_params);
175 
176   void ConnectToPeerOnIOThread(ConnectionParams connection_params,
177                                ports::NodeName token,
178                                ports::PortRef port,
179                                const std::string& peer_token);
180   void ClosePeerConnectionOnIOThread(const std::string& node_name);
181 
182   scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
183   scoped_refptr<NodeChannel> GetParentChannel();
184   scoped_refptr<NodeChannel> GetBrokerChannel();
185 
186   void AddPeer(const ports::NodeName& name,
187                scoped_refptr<NodeChannel> channel,
188                bool start_channel);
189   void DropPeer(const ports::NodeName& name, NodeChannel* channel);
190   void SendPeerMessage(const ports::NodeName& name,
191                        ports::ScopedMessage message);
192   void AcceptIncomingMessages();
193   void ProcessIncomingMessages();
194   void DropAllPeers();
195 
196   // ports::NodeDelegate:
197   void GenerateRandomPortName(ports::PortName* port_name) override;
198   void AllocMessage(size_t num_header_bytes,
199                     ports::ScopedMessage* message) override;
200   void ForwardMessage(const ports::NodeName& node,
201                       ports::ScopedMessage message) override;
202   void BroadcastMessage(ports::ScopedMessage message) override;
203   void PortStatusChanged(const ports::PortRef& port) override;
204 
205   // NodeChannel::Delegate:
206   void OnAcceptChild(const ports::NodeName& from_node,
207                      const ports::NodeName& parent_name,
208                      const ports::NodeName& token) override;
209   void OnAcceptParent(const ports::NodeName& from_node,
210                       const ports::NodeName& token,
211                       const ports::NodeName& child_name) override;
212   void OnAddBrokerClient(const ports::NodeName& from_node,
213                          const ports::NodeName& client_name,
214                          base::ProcessHandle process_handle) override;
215   void OnBrokerClientAdded(const ports::NodeName& from_node,
216                            const ports::NodeName& client_name,
217                            ScopedPlatformHandle broker_channel) override;
218   void OnAcceptBrokerClient(const ports::NodeName& from_node,
219                             const ports::NodeName& broker_name,
220                             ScopedPlatformHandle broker_channel) override;
221   void OnPortsMessage(const ports::NodeName& from_node,
222                       Channel::MessagePtr message) override;
223   void OnRequestPortMerge(const ports::NodeName& from_node,
224                           const ports::PortName& connector_port_name,
225                           const std::string& token) override;
226   void OnRequestIntroduction(const ports::NodeName& from_node,
227                              const ports::NodeName& name) override;
228   void OnIntroduce(const ports::NodeName& from_node,
229                    const ports::NodeName& name,
230                    ScopedPlatformHandle channel_handle) override;
231   void OnBroadcast(const ports::NodeName& from_node,
232                    Channel::MessagePtr message) override;
233 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
234   void OnRelayPortsMessage(const ports::NodeName& from_node,
235                            base::ProcessHandle from_process,
236                            const ports::NodeName& destination,
237                            Channel::MessagePtr message) override;
238   void OnPortsMessageFromRelay(const ports::NodeName& from_node,
239                                const ports::NodeName& source_node,
240                                Channel::MessagePtr message) override;
241 #endif
242   void OnAcceptPeer(const ports::NodeName& from_node,
243                     const ports::NodeName& token,
244                     const ports::NodeName& peer_name,
245                     const ports::PortName& port_name) override;
246   void OnChannelError(const ports::NodeName& from_node,
247                       NodeChannel* channel) override;
248 #if defined(OS_MACOSX) && !defined(OS_IOS)
249   MachPortRelay* GetMachPortRelay() override;
250 #endif
251 
252   // Cancels all pending port merges. These are merges which are supposed to
253   // be requested from the parent ASAP, and they may be cancelled if the
254   // connection to the parent is broken or never established.
255   void CancelPendingPortMerges();
256 
257   // Marks this NodeController for destruction when the IO thread shuts down.
258   // This is used in case Core is torn down before the IO thread. Must only be
259   // called on the IO thread.
260   void DestroyOnIOThreadShutdown();
261 
262   // If there is a registered shutdown callback (meaning shutdown has been
263   // requested, this checks the Node's status to see if clean shutdown is
264   // possible. If so, shutdown is performed and the shutdown callback is run.
265   void AttemptShutdownIfRequested();
266 
267   // These are safe to access from any thread as long as the Node is alive.
268   Core* const core_;
269   const ports::NodeName name_;
270   const std::unique_ptr<ports::Node> node_;
271   scoped_refptr<base::TaskRunner> io_task_runner_;
272 
273   // Guards |peers_| and |pending_peer_messages_|.
274   base::Lock peers_lock_;
275 
276   // Channels to known peers, including parent and children, if any.
277   NodeMap peers_;
278 
279   // Outgoing message queues for peers we've heard of but can't yet talk to.
280   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
281       pending_peer_messages_;
282 
283   // Guards |reserved_ports_| and |pending_child_tokens_|.
284   base::Lock reserved_ports_lock_;
285 
286   // Ports reserved by token. Key is the port token.
287   base::hash_map<std::string, ReservedPort> reserved_ports_;
288   // TODO(amistry): This _really_ needs to be a bimap. Unfortunately, we don't
289   // have one yet :(
290   std::unordered_map<ports::NodeName, std::string> pending_child_tokens_;
291 
292   // Guards |pending_port_merges_| and |reject_pending_merges_|.
293   base::Lock pending_port_merges_lock_;
294 
295   // A set of port merge requests awaiting parent connection.
296   std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_;
297 
298   // Indicates that new merge requests should be rejected because the parent has
299   // disconnected.
300   bool reject_pending_merges_ = false;
301 
302   // Guards |parent_name_| and |bootstrap_parent_channel_|.
303   base::Lock parent_lock_;
304 
305   // The name of our parent node, if any.
306   ports::NodeName parent_name_;
307 
308   // A temporary reference to the parent channel before we know their name.
309   scoped_refptr<NodeChannel> bootstrap_parent_channel_;
310 
311   // Guards |broker_name_|, |pending_broker_clients_|, and
312   // |pending_relay_messages_|.
313   base::Lock broker_lock_;
314 
315   // The name of our broker node, if any.
316   ports::NodeName broker_name_;
317 
318   // A queue of pending child names waiting to be connected to a broker.
319   std::queue<ports::NodeName> pending_broker_clients_;
320 
321   // Messages waiting to be relayed by the broker once it's known.
322   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
323       pending_relay_messages_;
324 
325   // Guards |incoming_messages_| and |incoming_messages_task_posted_|.
326   base::Lock messages_lock_;
327   std::queue<ports::ScopedMessage> incoming_messages_;
328   // Ensures that there is only one incoming messages task posted to the IO
329   // thread.
330   bool incoming_messages_task_posted_ = false;
331   // Flag to fast-path checking |incoming_messages_|.
332   AtomicFlag incoming_messages_flag_;
333 
334   // Guards |shutdown_callback_|.
335   base::Lock shutdown_lock_;
336 
337   // Set by RequestShutdown(). If this is non-null, the controller will
338   // begin polling the Node to see if clean shutdown is possible any time the
339   // Node's state is modified by the controller.
340   base::Closure shutdown_callback_;
341   // Flag to fast-path checking |shutdown_callback_|.
342   AtomicFlag shutdown_callback_flag_;
343 
344   // All other fields below must only be accessed on the I/O thread, i.e., the
345   // thread on which core_->io_task_runner() runs tasks.
346 
347   // Channels to children during handshake.
348   NodeMap pending_children_;
349 
350   using PeerNodeMap =
351       std::unordered_map<ports::NodeName, PeerConnection>;
352   PeerNodeMap peer_connections_;
353 
354   // Maps from peer token to node name, pending or not.
355   std::unordered_map<std::string, ports::NodeName> peers_by_token_;
356 
357   // Indicates whether this object should delete itself on IO thread shutdown.
358   // Must only be accessed from the IO thread.
359   bool destroy_on_io_thread_shutdown_ = false;
360 
361 #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
362   // Broker for sync shared buffer creation in children.
363   std::unique_ptr<Broker> broker_;
364 #endif
365 
366 #if defined(OS_MACOSX) && !defined(OS_IOS)
367   base::Lock mach_port_relay_lock_;
368   // Relay for transferring mach ports to/from children.
369   std::unique_ptr<MachPortRelay> mach_port_relay_;
370 #endif
371 
372   DISALLOW_COPY_AND_ASSIGN(NodeController);
373 };
374 
375 }  // namespace edk
376 }  // namespace mojo
377 
#endif  // MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_