1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
6 #define MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
7 
8 #include <memory>
9 #include <queue>
10 #include <unordered_map>
11 #include <unordered_set>
12 #include <utility>
13 #include <vector>
14 
15 #include "base/callback.h"
16 #include "base/containers/hash_tables.h"
17 #include "base/macros.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/task_runner.h"
20 #include "mojo/edk/embedder/platform_handle_vector.h"
21 #include "mojo/edk/embedder/platform_shared_buffer.h"
22 #include "mojo/edk/embedder/scoped_platform_handle.h"
23 #include "mojo/edk/system/atomic_flag.h"
24 #include "mojo/edk/system/node_channel.h"
25 #include "mojo/edk/system/ports/name.h"
26 #include "mojo/edk/system/ports/node.h"
27 #include "mojo/edk/system/ports/node_delegate.h"
28 
29 namespace base {
30 class PortProvider;
31 }
32 
33 namespace mojo {
34 namespace edk {
35 
36 class Broker;
37 class Core;
38 class MachPortRelay;
39 class PortsMessage;
40 
41 // The owner of ports::Node which facilitates core EDK implementation. All
42 // public interface methods are safe to call from any thread.
43 class NodeController : public ports::NodeDelegate,
44                        public NodeChannel::Delegate {
45  public:
46   class PortObserver : public ports::UserData {
47    public:
48     virtual void OnPortStatusChanged() = 0;
49 
50    protected:
~PortObserver()51     ~PortObserver() override {}
52   };
53 
54   // |core| owns and out-lives us.
55   explicit NodeController(Core* core);
56   ~NodeController() override;
57 
name()58   const ports::NodeName& name() const { return name_; }
core()59   Core* core() const { return core_; }
node()60   ports::Node* node() const { return node_.get(); }
io_task_runner()61   scoped_refptr<base::TaskRunner> io_task_runner() const {
62     return io_task_runner_;
63   }
64 
65 #if defined(OS_MACOSX) && !defined(OS_IOS)
66   // Create the relay used to transfer mach ports between processes.
67   void CreateMachPortRelay(base::PortProvider* port_provider);
68 #endif
69 
70   // Called exactly once, shortly after construction, and before any other
71   // methods are called on this object.
72   void SetIOTaskRunner(scoped_refptr<base::TaskRunner> io_task_runner);
73 
74   // Connects this node to a child node. This node will initiate a handshake.
75   void ConnectToChild(base::ProcessHandle process_handle,
76                       ScopedPlatformHandle platform_handle,
77                       const std::string& child_token,
78                       const ProcessErrorCallback& process_error_callback);
79 
80   // Closes all reserved ports which associated with the child process
81   // |child_token|.
82   void CloseChildPorts(const std::string& child_token);
83 
84   // Connects this node to a parent node. The parent node will initiate a
85   // handshake.
86   void ConnectToParent(ScopedPlatformHandle platform_handle);
87 
88   // Sets a port's observer. If |observer| is null the port's current observer
89   // is removed.
90   void SetPortObserver(const ports::PortRef& port,
91                        const scoped_refptr<PortObserver>& observer);
92 
93   // Closes a port. Use this in lieu of calling Node::ClosePort() directly, as
94   // it ensures the port's observer has also been removed.
95   void ClosePort(const ports::PortRef& port);
96 
97   // Sends a message on a port to its peer.
98   int SendMessage(const ports::PortRef& port_ref,
99                   std::unique_ptr<PortsMessage> message);
100 
101   // Reserves a local port |port| associated with |token|. A peer holding a copy
102   // of |token| can merge one of its own ports into this one.
103   void ReservePort(const std::string& token, const ports::PortRef& port,
104                    const std::string& child_token);
105 
106   // Merges a local port |port| into a port reserved by |token| in the parent.
107   void MergePortIntoParent(const std::string& token,
108                            const ports::PortRef& port);
109 
110   // Merges two local ports together.
111   int MergeLocalPorts(const ports::PortRef& port0, const ports::PortRef& port1);
112 
113   // Creates a new shared buffer for use in the current process.
114   scoped_refptr<PlatformSharedBuffer> CreateSharedBuffer(size_t num_bytes);
115 
116   // Request that the Node be shut down cleanly. This may take an arbitrarily
117   // long time to complete, at which point |callback| will be called.
118   //
119   // Note that while it is safe to continue using the NodeController's public
120   // interface after requesting shutdown, you do so at your own risk and there
121   // is NO guarantee that new messages will be sent or ports will complete
122   // transfer.
123   void RequestShutdown(const base::Closure& callback);
124 
125   // Notifies the NodeController that we received a bad message from the given
126   // node.
127   void NotifyBadMessageFrom(const ports::NodeName& source_node,
128                             const std::string& error);
129 
130  private:
131   friend Core;
132 
133   using NodeMap = std::unordered_map<ports::NodeName,
134                                      scoped_refptr<NodeChannel>>;
135   using OutgoingMessageQueue = std::queue<Channel::MessagePtr>;
136 
137   struct ReservedPort {
138     ports::PortRef port;
139     const std::string child_token;
140   };
141 
142   void ConnectToChildOnIOThread(
143       base::ProcessHandle process_handle,
144       ScopedPlatformHandle platform_handle,
145       ports::NodeName token,
146       const ProcessErrorCallback& process_error_callback);
147   void ConnectToParentOnIOThread(ScopedPlatformHandle platform_handle);
148 
149   scoped_refptr<NodeChannel> GetPeerChannel(const ports::NodeName& name);
150   scoped_refptr<NodeChannel> GetParentChannel();
151   scoped_refptr<NodeChannel> GetBrokerChannel();
152 
153   void AddPeer(const ports::NodeName& name,
154                scoped_refptr<NodeChannel> channel,
155                bool start_channel);
156   void DropPeer(const ports::NodeName& name, NodeChannel* channel);
157   void SendPeerMessage(const ports::NodeName& name,
158                        ports::ScopedMessage message);
159   void AcceptIncomingMessages();
160   void ProcessIncomingMessages();
161   void DropAllPeers();
162 
163   // ports::NodeDelegate:
164   void GenerateRandomPortName(ports::PortName* port_name) override;
165   void AllocMessage(size_t num_header_bytes,
166                     ports::ScopedMessage* message) override;
167   void ForwardMessage(const ports::NodeName& node,
168                       ports::ScopedMessage message) override;
169   void BroadcastMessage(ports::ScopedMessage message) override;
170   void PortStatusChanged(const ports::PortRef& port) override;
171 
172   // NodeChannel::Delegate:
173   void OnAcceptChild(const ports::NodeName& from_node,
174                      const ports::NodeName& parent_name,
175                      const ports::NodeName& token) override;
176   void OnAcceptParent(const ports::NodeName& from_node,
177                       const ports::NodeName& token,
178                       const ports::NodeName& child_name) override;
179   void OnAddBrokerClient(const ports::NodeName& from_node,
180                          const ports::NodeName& client_name,
181                          base::ProcessHandle process_handle) override;
182   void OnBrokerClientAdded(const ports::NodeName& from_node,
183                            const ports::NodeName& client_name,
184                            ScopedPlatformHandle broker_channel) override;
185   void OnAcceptBrokerClient(const ports::NodeName& from_node,
186                             const ports::NodeName& broker_name,
187                             ScopedPlatformHandle broker_channel) override;
188   void OnPortsMessage(const ports::NodeName& from_node,
189                       Channel::MessagePtr message) override;
190   void OnRequestPortMerge(const ports::NodeName& from_node,
191                           const ports::PortName& connector_port_name,
192                           const std::string& token) override;
193   void OnRequestIntroduction(const ports::NodeName& from_node,
194                              const ports::NodeName& name) override;
195   void OnIntroduce(const ports::NodeName& from_node,
196                    const ports::NodeName& name,
197                    ScopedPlatformHandle channel_handle) override;
198   void OnBroadcast(const ports::NodeName& from_node,
199                    Channel::MessagePtr message) override;
200 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
201   void OnRelayPortsMessage(const ports::NodeName& from_node,
202                            base::ProcessHandle from_process,
203                            const ports::NodeName& destination,
204                            Channel::MessagePtr message) override;
205   void OnPortsMessageFromRelay(const ports::NodeName& from_node,
206                                const ports::NodeName& source_node,
207                                Channel::MessagePtr message) override;
208 #endif
209   void OnChannelError(const ports::NodeName& from_node,
210                       NodeChannel* channel) override;
211 #if defined(OS_MACOSX) && !defined(OS_IOS)
212   MachPortRelay* GetMachPortRelay() override;
213 #endif
214 
215   // Marks this NodeController for destruction when the IO thread shuts down.
216   // This is used in case Core is torn down before the IO thread. Must only be
217   // called on the IO thread.
218   void DestroyOnIOThreadShutdown();
219 
220   // If there is a registered shutdown callback (meaning shutdown has been
221   // requested, this checks the Node's status to see if clean shutdown is
222   // possible. If so, shutdown is performed and the shutdown callback is run.
223   void AttemptShutdownIfRequested();
224 
225   // These are safe to access from any thread as long as the Node is alive.
226   Core* const core_;
227   const ports::NodeName name_;
228   const std::unique_ptr<ports::Node> node_;
229   scoped_refptr<base::TaskRunner> io_task_runner_;
230 
231   // Guards |peers_| and |pending_peer_messages_|.
232   base::Lock peers_lock_;
233 
234   // Channels to known peers, including parent and children, if any.
235   NodeMap peers_;
236 
237   // Outgoing message queues for peers we've heard of but can't yet talk to.
238   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
239       pending_peer_messages_;
240 
241   // Guards |reserved_ports_| and |pending_child_tokens_|.
242   base::Lock reserved_ports_lock_;
243 
244   // Ports reserved by token. Key is the port token.
245   base::hash_map<std::string, ReservedPort> reserved_ports_;
246   // TODO(amistry): This _really_ needs to be a bimap. Unfortunately, we don't
247   // have one yet :(
248   std::unordered_map<ports::NodeName, std::string> pending_child_tokens_;
249 
250   // Guards |pending_port_merges_| and |reject_pending_merges_|.
251   base::Lock pending_port_merges_lock_;
252 
253   // A set of port merge requests awaiting parent connection.
254   std::vector<std::pair<std::string, ports::PortRef>> pending_port_merges_;
255 
256   // Indicates that new merge requests should be rejected because the parent has
257   // disconnected.
258   bool reject_pending_merges_ = false;
259 
260   // Guards |parent_name_| and |bootstrap_parent_channel_|.
261   base::Lock parent_lock_;
262 
263   // The name of our parent node, if any.
264   ports::NodeName parent_name_;
265 
266   // A temporary reference to the parent channel before we know their name.
267   scoped_refptr<NodeChannel> bootstrap_parent_channel_;
268 
269   // Guards |broker_name_|, |pending_broker_clients_|, and
270   // |pending_relay_messages_|.
271   base::Lock broker_lock_;
272 
273   // The name of our broker node, if any.
274   ports::NodeName broker_name_;
275 
276   // A queue of pending child names waiting to be connected to a broker.
277   std::queue<ports::NodeName> pending_broker_clients_;
278 
279   // Messages waiting to be relayed by the broker once it's known.
280   std::unordered_map<ports::NodeName, OutgoingMessageQueue>
281       pending_relay_messages_;
282 
283   // Guards |incoming_messages_| and |incoming_messages_task_posted_|.
284   base::Lock messages_lock_;
285   std::queue<ports::ScopedMessage> incoming_messages_;
286   // Ensures that there is only one incoming messages task posted to the IO
287   // thread.
288   bool incoming_messages_task_posted_ = false;
289 
290   // Guards |shutdown_callback_|.
291   base::Lock shutdown_lock_;
292 
293   // Set by RequestShutdown(). If this is non-null, the controller will
294   // begin polling the Node to see if clean shutdown is possible any time the
295   // Node's state is modified by the controller.
296   base::Closure shutdown_callback_;
297   // Flag to fast-path checking |shutdown_callback_|.
298   AtomicFlag shutdown_callback_flag_;
299 
300   // All other fields below must only be accessed on the I/O thread, i.e., the
301   // thread on which core_->io_task_runner() runs tasks.
302 
303   // Channels to children during handshake.
304   NodeMap pending_children_;
305 
306   // Indicates whether this object should delete itself on IO thread shutdown.
307   // Must only be accessed from the IO thread.
308   bool destroy_on_io_thread_shutdown_ = false;
309 
310 #if defined(OS_POSIX) && !defined(OS_MACOSX) && !defined(OS_NACL_SFI)
311   // Broker for sync shared buffer creation (non-Mac posix-only) in children.
312   std::unique_ptr<Broker> broker_;
313 #endif
314 
315 #if defined(OS_MACOSX) && !defined(OS_IOS)
316   base::Lock mach_port_relay_lock_;
317   // Relay for transferring mach ports to/from children.
318   std::unique_ptr<MachPortRelay> mach_port_relay_;
319 #endif
320 
321   DISALLOW_COPY_AND_ASSIGN(NodeController);
322 };
323 
324 }  // namespace edk
325 }  // namespace mojo
326 
327 #endif  // MOJO_EDK_SYSTEM_NODE_CONTROLLER_H_
328