1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #ifndef MOJO_CORE_NODE_CHANNEL_H_
6 #define MOJO_CORE_NODE_CHANNEL_H_
7 
#include <string>
#include <utility>
#include <vector>

#include "base/callback.h"
#include "base/containers/queue.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/process/process_handle.h"
#include "base/synchronization/lock.h"
#include "base/task_runner.h"
#include "build/build_config.h"
#include "mojo/core/channel.h"
#include "mojo/core/connection_params.h"
#include "mojo/core/embedder/process_error_callback.h"
#include "mojo/core/ports/name.h"
#include "mojo/core/scoped_process_handle.h"
24 
25 namespace mojo {
26 namespace core {
27 
28 // Wraps a Channel to send and receive Node control messages.
29 class NodeChannel : public base::RefCountedThreadSafe<NodeChannel>,
30                     public Channel::Delegate {
31  public:
32   class Delegate {
33    public:
~Delegate()34     virtual ~Delegate() {}
35     virtual void OnAcceptInvitee(const ports::NodeName& from_node,
36                                  const ports::NodeName& inviter_name,
37                                  const ports::NodeName& token) = 0;
38     virtual void OnAcceptInvitation(const ports::NodeName& from_node,
39                                     const ports::NodeName& token,
40                                     const ports::NodeName& invitee_name) = 0;
41     virtual void OnAddBrokerClient(const ports::NodeName& from_node,
42                                    const ports::NodeName& client_name,
43                                    base::ProcessHandle process_handle) = 0;
44     virtual void OnBrokerClientAdded(const ports::NodeName& from_node,
45                                      const ports::NodeName& client_name,
46                                      PlatformHandle broker_channel) = 0;
47     virtual void OnAcceptBrokerClient(const ports::NodeName& from_node,
48                                       const ports::NodeName& broker_name,
49                                       PlatformHandle broker_channel) = 0;
50     virtual void OnEventMessage(const ports::NodeName& from_node,
51                                 Channel::MessagePtr message) = 0;
52     virtual void OnRequestPortMerge(const ports::NodeName& from_node,
53                                     const ports::PortName& connector_port_name,
54                                     const std::string& token) = 0;
55     virtual void OnRequestIntroduction(const ports::NodeName& from_node,
56                                        const ports::NodeName& name) = 0;
57     virtual void OnIntroduce(const ports::NodeName& from_node,
58                              const ports::NodeName& name,
59                              PlatformHandle channel_handle) = 0;
60     virtual void OnBroadcast(const ports::NodeName& from_node,
61                              Channel::MessagePtr message) = 0;
62 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
63     virtual void OnRelayEventMessage(const ports::NodeName& from_node,
64                                      base::ProcessHandle from_process,
65                                      const ports::NodeName& destination,
66                                      Channel::MessagePtr message) = 0;
67     virtual void OnEventMessageFromRelay(const ports::NodeName& from_node,
68                                          const ports::NodeName& source_node,
69                                          Channel::MessagePtr message) = 0;
70 #endif
71     virtual void OnAcceptPeer(const ports::NodeName& from_node,
72                               const ports::NodeName& token,
73                               const ports::NodeName& peer_name,
74                               const ports::PortName& port_name) = 0;
75     virtual void OnChannelError(const ports::NodeName& node,
76                                 NodeChannel* channel) = 0;
77   };
78 
79   static scoped_refptr<NodeChannel> Create(
80       Delegate* delegate,
81       ConnectionParams connection_params,
82       scoped_refptr<base::TaskRunner> io_task_runner,
83       const ProcessErrorCallback& process_error_callback);
84 
85   static Channel::MessagePtr CreateEventMessage(size_t capacity,
86                                                 size_t payload_size,
87                                                 void** payload,
88                                                 size_t num_handles);
89 
90   static void GetEventMessageData(Channel::Message* message,
91                                   void** data,
92                                   size_t* num_data_bytes);
93 
94   // Start receiving messages.
95   void Start();
96 
97   // Permanently stop the channel from sending or receiving messages.
98   void ShutDown();
99 
100   // Leaks the pipe handle instead of closing it on shutdown.
101   void LeakHandleOnShutdown();
102 
103   // Invokes the bad message callback for this channel, if any.
104   void NotifyBadMessage(const std::string& error);
105 
106   void SetRemoteProcessHandle(ScopedProcessHandle process_handle);
107   bool HasRemoteProcessHandle();
108   ScopedProcessHandle CloneRemoteProcessHandle();
109 
110   // Used for context in Delegate calls (via |from_node| arguments.)
111   void SetRemoteNodeName(const ports::NodeName& name);
112 
113   void AcceptInvitee(const ports::NodeName& inviter_name,
114                      const ports::NodeName& token);
115   void AcceptInvitation(const ports::NodeName& token,
116                         const ports::NodeName& invitee_name);
117   void AcceptPeer(const ports::NodeName& sender_name,
118                   const ports::NodeName& token,
119                   const ports::PortName& port_name);
120   void AddBrokerClient(const ports::NodeName& client_name,
121                        ScopedProcessHandle process_handle);
122   void BrokerClientAdded(const ports::NodeName& client_name,
123                          PlatformHandle broker_channel);
124   void AcceptBrokerClient(const ports::NodeName& broker_name,
125                           PlatformHandle broker_channel);
126   void RequestPortMerge(const ports::PortName& connector_port_name,
127                         const std::string& token);
128   void RequestIntroduction(const ports::NodeName& name);
129   void Introduce(const ports::NodeName& name, PlatformHandle channel_handle);
130   void SendChannelMessage(Channel::MessagePtr message);
131   void Broadcast(Channel::MessagePtr message);
132 
133 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
134   // Relay the message to the specified node via this channel.  This is used to
135   // pass windows handles between two processes that do not have permission to
136   // duplicate handles into the other's address space. The relay process is
137   // assumed to have that permission.
138   void RelayEventMessage(const ports::NodeName& destination,
139                          Channel::MessagePtr message);
140 
141   // Sends a message to its destination from a relay. This is interpreted by the
142   // receiver similarly to EventMessage, but the original source node is
143   // provided as additional message metadata from the (trusted) relay node.
144   void EventMessageFromRelay(const ports::NodeName& source,
145                              Channel::MessagePtr message);
146 #endif
147 
148  private:
149   friend class base::RefCountedThreadSafe<NodeChannel>;
150 
151   using PendingMessageQueue = base::queue<Channel::MessagePtr>;
152   using PendingRelayMessageQueue =
153       base::queue<std::pair<ports::NodeName, Channel::MessagePtr>>;
154 
155   NodeChannel(Delegate* delegate,
156               ConnectionParams connection_params,
157               scoped_refptr<base::TaskRunner> io_task_runner,
158               const ProcessErrorCallback& process_error_callback);
159   ~NodeChannel() override;
160 
161   // Channel::Delegate:
162   void OnChannelMessage(const void* payload,
163                         size_t payload_size,
164                         std::vector<PlatformHandle> handles) override;
165   void OnChannelError(Channel::Error error) override;
166 
167   void WriteChannelMessage(Channel::MessagePtr message);
168 
169   Delegate* const delegate_;
170   const scoped_refptr<base::TaskRunner> io_task_runner_;
171   const ProcessErrorCallback process_error_callback_;
172 
173   base::Lock channel_lock_;
174   scoped_refptr<Channel> channel_;
175 
176   // Must only be accessed from |io_task_runner_|'s thread.
177   ports::NodeName remote_node_name_;
178 
179   base::Lock remote_process_handle_lock_;
180   ScopedProcessHandle remote_process_handle_;
181 
182   DISALLOW_COPY_AND_ASSIGN(NodeChannel);
183 };
184 
185 }  // namespace core
186 }  // namespace mojo
187 
188 #endif  // MOJO_CORE_NODE_CHANNEL_H_
189