1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/edk/system/node_channel.h"
6 
7 #include <cstring>
8 #include <limits>
9 #include <sstream>
10 
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "mojo/edk/system/channel.h"
15 #include "mojo/edk/system/request_context.h"
16 
17 #if defined(OS_MACOSX) && !defined(OS_IOS)
18 #include "mojo/edk/system/mach_port_relay.h"
19 #endif
20 
21 namespace mojo {
22 namespace edk {
23 
24 namespace {
25 
26 template <typename T>
Align(T t)27 T Align(T t) {
28   const auto k = kChannelMessageAlignment;
29   return t + (k - (t % k)) % k;
30 }
31 
// Identifies the kind of message carried after a Header. The value is written
// into Header::type by CreateMessage() and switched on in
// NodeChannel::OnChannelMessage().
//
// NOTE: Please ONLY append messages to the end of this enum.
//
// Enumerators are numbered implicitly, so the entries guarded by platform
// checks shift the numeric value of every enumerator that follows them.
enum class MessageType : uint32_t {
  ACCEPT_CHILD,
  ACCEPT_PARENT,
  ADD_BROKER_CLIENT,
  BROKER_CLIENT_ADDED,
  ACCEPT_BROKER_CLIENT,
  PORTS_MESSAGE,
  REQUEST_PORT_MERGE,
  REQUEST_INTRODUCTION,
  INTRODUCE,
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
  RELAY_PORTS_MESSAGE,
#endif
  BROADCAST,
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
  PORTS_MESSAGE_FROM_RELAY,
#endif
};
51 
// Fixed-size prefix placed at the start of every message payload built by
// CreateMessage(). Message-specific data begins immediately after it.
struct Header {
  // Selects how the bytes following the header are interpreted.
  MessageType type;
  // Always written as 0 by CreateMessage().
  uint32_t padding;
};

// The data following the header starts at sizeof(Header), so the header size
// must be a multiple of the channel message alignment.
static_assert(sizeof(Header) % kChannelMessageAlignment == 0,
    "Invalid header size.");
59 
// Payload of an ACCEPT_CHILD message (see NodeChannel::AcceptChild()).
struct AcceptChildData {
  ports::NodeName parent_name;
  ports::NodeName token;
};
64 
// Payload of an ACCEPT_PARENT message (see NodeChannel::AcceptParent()).
struct AcceptParentData {
  ports::NodeName token;
  ports::NodeName child_name;
};
69 
// Payload of an ADD_BROKER_CLIENT message. On non-Windows platforms the
// client's process handle is carried inline in this struct; on Windows it is
// attached to the message as a platform handle instead (see
// NodeChannel::AddBrokerClient()).
struct AddBrokerClientData {
  ports::NodeName client_name;
#if !defined(OS_WIN)
  uint32_t process_handle;
  // Always written as 0 by NodeChannel::AddBrokerClient().
  uint32_t padding;
#endif
};

#if !defined(OS_WIN)
// |process_handle| above is stored in a uint32_t, so the platform's
// base::ProcessHandle must be exactly that wide.
static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t),
              "Unexpected pid size");
static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0,
              "Invalid AddBrokerClientData size.");
#endif
85 
// Payload of a BROKER_CLIENT_ADDED message.
//
// This data is followed by a platform channel handle to the broker.
struct BrokerClientAddedData {
  ports::NodeName client_name;
};
90 
// Payload of an ACCEPT_BROKER_CLIENT message.
//
// This data may be followed by a platform channel handle to the broker. If not,
// then the parent is the broker and its channel should be used as such.
struct AcceptBrokerClientData {
  ports::NodeName broker_name;
};
96 
// Payload of a REQUEST_PORT_MERGE message.
//
// This is followed by arbitrary payload data which is interpreted as a token
// string for port location. The receiver rejects messages whose token is
// empty.
struct RequestPortMergeData {
  ports::PortName connector_port_name;
};
102 
// Used for both REQUEST_INTRODUCTION and INTRODUCE.
//
// For INTRODUCE the message also includes a valid platform handle for a channel
// the receiver may use to communicate with the named node directly, or an
// invalid platform handle if the node is unknown to the sender or otherwise
// cannot be introduced.
struct IntroductionData {
  // Name of the node being requested or introduced.
  ports::NodeName name;
};
112 
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
// Payload prefix of a RELAY_PORTS_MESSAGE message.
//
// This struct is followed by the full payload of a message to be relayed.
struct RelayPortsMessageData {
  ports::NodeName destination;
};

// Payload prefix of a PORTS_MESSAGE_FROM_RELAY message.
//
// This struct is followed by the full payload of a relayed message.
struct PortsMessageFromRelayData {
  ports::NodeName source;
};
#endif
124 
125 template <typename DataType>
CreateMessage(MessageType type,size_t payload_size,size_t num_handles,DataType ** out_data)126 Channel::MessagePtr CreateMessage(MessageType type,
127                                   size_t payload_size,
128                                   size_t num_handles,
129                                   DataType** out_data) {
130   Channel::MessagePtr message(
131       new Channel::Message(sizeof(Header) + payload_size, num_handles));
132   Header* header = reinterpret_cast<Header*>(message->mutable_payload());
133   header->type = type;
134   header->padding = 0;
135   *out_data = reinterpret_cast<DataType*>(&header[1]);
136   return message;
137 }
138 
139 template <typename DataType>
GetMessagePayload(const void * bytes,size_t num_bytes,DataType ** out_data)140 bool GetMessagePayload(const void* bytes,
141                        size_t num_bytes,
142                        DataType** out_data) {
143   static_assert(sizeof(DataType) > 0, "DataType must have non-zero size.");
144   if (num_bytes < sizeof(Header) + sizeof(DataType))
145     return false;
146   *out_data = reinterpret_cast<const DataType*>(
147       static_cast<const char*>(bytes) + sizeof(Header));
148   return true;
149 }
150 
151 }  // namespace
152 
153 // static
Create(Delegate * delegate,ScopedPlatformHandle platform_handle,scoped_refptr<base::TaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)154 scoped_refptr<NodeChannel> NodeChannel::Create(
155     Delegate* delegate,
156     ScopedPlatformHandle platform_handle,
157     scoped_refptr<base::TaskRunner> io_task_runner,
158     const ProcessErrorCallback& process_error_callback) {
159 #if defined(OS_NACL_SFI)
160   LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI";
161   return nullptr;
162 #else
163   return new NodeChannel(delegate, std::move(platform_handle), io_task_runner,
164                          process_error_callback);
165 #endif
166 }
167 
168 // static
CreatePortsMessage(size_t payload_size,void ** payload,size_t num_handles)169 Channel::MessagePtr NodeChannel::CreatePortsMessage(size_t payload_size,
170                                                     void** payload,
171                                                     size_t num_handles) {
172   return CreateMessage(MessageType::PORTS_MESSAGE, payload_size, num_handles,
173                        payload);
174 }
175 
176 // static
GetPortsMessageData(Channel::Message * message,void ** data,size_t * num_data_bytes)177 void NodeChannel::GetPortsMessageData(Channel::Message* message,
178                                       void** data,
179                                       size_t* num_data_bytes) {
180   *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1;
181   *num_data_bytes = message->payload_size() - sizeof(Header);
182 }
183 
Start()184 void NodeChannel::Start() {
185 #if defined(OS_MACOSX) && !defined(OS_IOS)
186   MachPortRelay* relay = delegate_->GetMachPortRelay();
187   if (relay)
188     relay->AddObserver(this);
189 #endif
190 
191   base::AutoLock lock(channel_lock_);
192   // ShutDown() may have already been called, in which case |channel_| is null.
193   if (channel_)
194     channel_->Start();
195 }
196 
ShutDown()197 void NodeChannel::ShutDown() {
198 #if defined(OS_MACOSX) && !defined(OS_IOS)
199   MachPortRelay* relay = delegate_->GetMachPortRelay();
200   if (relay)
201     relay->RemoveObserver(this);
202 #endif
203 
204   base::AutoLock lock(channel_lock_);
205   if (channel_) {
206     channel_->ShutDown();
207     channel_ = nullptr;
208   }
209 }
210 
LeakHandleOnShutdown()211 void NodeChannel::LeakHandleOnShutdown() {
212   base::AutoLock lock(channel_lock_);
213   if (channel_) {
214     channel_->LeakHandle();
215   }
216 }
217 
NotifyBadMessage(const std::string & error)218 void NodeChannel::NotifyBadMessage(const std::string& error) {
219   if (!process_error_callback_.is_null())
220     process_error_callback_.Run("Received bad user message: " + error);
221 }
222 
SetRemoteProcessHandle(base::ProcessHandle process_handle)223 void NodeChannel::SetRemoteProcessHandle(base::ProcessHandle process_handle) {
224   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
225   base::AutoLock lock(remote_process_handle_lock_);
226   DCHECK_EQ(base::kNullProcessHandle, remote_process_handle_);
227   CHECK_NE(remote_process_handle_, base::GetCurrentProcessHandle());
228   remote_process_handle_ = process_handle;
229 #if defined(OS_WIN)
230   DCHECK(!scoped_remote_process_handle_.is_valid());
231   scoped_remote_process_handle_.reset(PlatformHandle(process_handle));
232 #endif
233 }
234 
HasRemoteProcessHandle()235 bool NodeChannel::HasRemoteProcessHandle() {
236   base::AutoLock lock(remote_process_handle_lock_);
237   return remote_process_handle_ != base::kNullProcessHandle;
238 }
239 
CopyRemoteProcessHandle()240 base::ProcessHandle NodeChannel::CopyRemoteProcessHandle() {
241   base::AutoLock lock(remote_process_handle_lock_);
242 #if defined(OS_WIN)
243   if (remote_process_handle_ != base::kNullProcessHandle) {
244     // Privileged nodes use this to pass their childrens' process handles to the
245     // broker on launch.
246     HANDLE handle = remote_process_handle_;
247     BOOL result = DuplicateHandle(
248         base::GetCurrentProcessHandle(), remote_process_handle_,
249         base::GetCurrentProcessHandle(), &handle, 0, FALSE,
250         DUPLICATE_SAME_ACCESS);
251     DPCHECK(result);
252     return handle;
253   }
254   return base::kNullProcessHandle;
255 #else
256   return remote_process_handle_;
257 #endif
258 }
259 
// Records |name| as the name of the node at the other end of this channel.
// Must be called on the IO task runner.
void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) {
  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
  remote_node_name_ = name;
}
264 
AcceptChild(const ports::NodeName & parent_name,const ports::NodeName & token)265 void NodeChannel::AcceptChild(const ports::NodeName& parent_name,
266                               const ports::NodeName& token) {
267   AcceptChildData* data;
268   Channel::MessagePtr message = CreateMessage(
269       MessageType::ACCEPT_CHILD, sizeof(AcceptChildData), 0, &data);
270   data->parent_name = parent_name;
271   data->token = token;
272   WriteChannelMessage(std::move(message));
273 }
274 
AcceptParent(const ports::NodeName & token,const ports::NodeName & child_name)275 void NodeChannel::AcceptParent(const ports::NodeName& token,
276                                const ports::NodeName& child_name) {
277   AcceptParentData* data;
278   Channel::MessagePtr message = CreateMessage(
279       MessageType::ACCEPT_PARENT, sizeof(AcceptParentData), 0, &data);
280   data->token = token;
281   data->child_name = child_name;
282   WriteChannelMessage(std::move(message));
283 }
284 
AddBrokerClient(const ports::NodeName & client_name,base::ProcessHandle process_handle)285 void NodeChannel::AddBrokerClient(const ports::NodeName& client_name,
286                                   base::ProcessHandle process_handle) {
287   AddBrokerClientData* data;
288   ScopedPlatformHandleVectorPtr handles(new PlatformHandleVector());
289 #if defined(OS_WIN)
290   handles->push_back(PlatformHandle(process_handle));
291 #endif
292   Channel::MessagePtr message = CreateMessage(
293       MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData),
294       handles->size(), &data);
295   message->SetHandles(std::move(handles));
296   data->client_name = client_name;
297 #if !defined(OS_WIN)
298   data->process_handle = process_handle;
299   data->padding = 0;
300 #endif
301   WriteChannelMessage(std::move(message));
302 }
303 
BrokerClientAdded(const ports::NodeName & client_name,ScopedPlatformHandle broker_channel)304 void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name,
305                                     ScopedPlatformHandle broker_channel) {
306   BrokerClientAddedData* data;
307   ScopedPlatformHandleVectorPtr handles(new PlatformHandleVector());
308   if (broker_channel.is_valid())
309     handles->push_back(broker_channel.release());
310   Channel::MessagePtr message = CreateMessage(
311       MessageType::BROKER_CLIENT_ADDED, sizeof(BrokerClientAddedData),
312       handles->size(), &data);
313   message->SetHandles(std::move(handles));
314   data->client_name = client_name;
315   WriteChannelMessage(std::move(message));
316 }
317 
AcceptBrokerClient(const ports::NodeName & broker_name,ScopedPlatformHandle broker_channel)318 void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name,
319                                      ScopedPlatformHandle broker_channel) {
320   AcceptBrokerClientData* data;
321   ScopedPlatformHandleVectorPtr handles(new PlatformHandleVector());
322   if (broker_channel.is_valid())
323     handles->push_back(broker_channel.release());
324   Channel::MessagePtr message = CreateMessage(
325       MessageType::ACCEPT_BROKER_CLIENT, sizeof(AcceptBrokerClientData),
326       handles->size(), &data);
327   message->SetHandles(std::move(handles));
328   data->broker_name = broker_name;
329   WriteChannelMessage(std::move(message));
330 }
331 
// Sends an already-built message as is by handing it to
// WriteChannelMessage(). The message is expected to have been created with
// CreatePortsMessage() — NOTE(review): not enforced here; confirm with callers.
void NodeChannel::PortsMessage(Channel::MessagePtr message) {
  WriteChannelMessage(std::move(message));
}
335 
RequestPortMerge(const ports::PortName & connector_port_name,const std::string & token)336 void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
337                                    const std::string& token) {
338   RequestPortMergeData* data;
339   Channel::MessagePtr message = CreateMessage(
340       MessageType::REQUEST_PORT_MERGE,
341       sizeof(RequestPortMergeData) + token.size(), 0, &data);
342   data->connector_port_name = connector_port_name;
343   memcpy(data + 1, token.data(), token.size());
344   WriteChannelMessage(std::move(message));
345 }
346 
RequestIntroduction(const ports::NodeName & name)347 void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
348   IntroductionData* data;
349   Channel::MessagePtr message = CreateMessage(
350       MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data);
351   data->name = name;
352   WriteChannelMessage(std::move(message));
353 }
354 
Introduce(const ports::NodeName & name,ScopedPlatformHandle channel_handle)355 void NodeChannel::Introduce(const ports::NodeName& name,
356                             ScopedPlatformHandle channel_handle) {
357   IntroductionData* data;
358   ScopedPlatformHandleVectorPtr handles(new PlatformHandleVector());
359   if (channel_handle.is_valid())
360     handles->push_back(channel_handle.release());
361   Channel::MessagePtr message = CreateMessage(
362       MessageType::INTRODUCE, sizeof(IntroductionData), handles->size(), &data);
363   message->SetHandles(std::move(handles));
364   data->name = name;
365   WriteChannelMessage(std::move(message));
366 }
367 
Broadcast(Channel::MessagePtr message)368 void NodeChannel::Broadcast(Channel::MessagePtr message) {
369   DCHECK(!message->has_handles());
370   void* data;
371   Channel::MessagePtr broadcast_message = CreateMessage(
372       MessageType::BROADCAST, message->data_num_bytes(), 0, &data);
373   memcpy(data, message->data(), message->data_num_bytes());
374   WriteChannelMessage(std::move(broadcast_message));
375 }
376 
377 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
RelayPortsMessage(const ports::NodeName & destination,Channel::MessagePtr message)378 void NodeChannel::RelayPortsMessage(const ports::NodeName& destination,
379                                     Channel::MessagePtr message) {
380 #if defined(OS_WIN)
381   DCHECK(message->has_handles());
382 
383   // Note that this is only used on Windows, and on Windows all platform
384   // handles are included in the message data. We blindly copy all the data
385   // here and the relay node (the parent) will duplicate handles as needed.
386   size_t num_bytes = sizeof(RelayPortsMessageData) + message->data_num_bytes();
387   RelayPortsMessageData* data;
388   Channel::MessagePtr relay_message = CreateMessage(
389       MessageType::RELAY_PORTS_MESSAGE, num_bytes, 0, &data);
390   data->destination = destination;
391   memcpy(data + 1, message->data(), message->data_num_bytes());
392 
393   // When the handles are duplicated in the parent, the source handles will
394   // be closed. If the parent never receives this message then these handles
395   // will leak, but that means something else has probably broken and the
396   // sending process won't likely be around much longer.
397   ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
398   handles->clear();
399 
400 #else
401   DCHECK(message->has_mach_ports());
402 
403   // On OSX, the handles are extracted from the relayed message and attached to
404   // the wrapper. The broker then takes the handles attached to the wrapper and
405   // moves them back to the relayed message. This is necessary because the
406   // message may contain fds which need to be attached to the outer message so
407   // that they can be transferred to the broker.
408   ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
409   size_t num_bytes = sizeof(RelayPortsMessageData) + message->data_num_bytes();
410   RelayPortsMessageData* data;
411   Channel::MessagePtr relay_message = CreateMessage(
412       MessageType::RELAY_PORTS_MESSAGE, num_bytes, handles->size(), &data);
413   data->destination = destination;
414   memcpy(data + 1, message->data(), message->data_num_bytes());
415   relay_message->SetHandles(std::move(handles));
416 #endif  // defined(OS_WIN)
417 
418   WriteChannelMessage(std::move(relay_message));
419 }
420 
PortsMessageFromRelay(const ports::NodeName & source,Channel::MessagePtr message)421 void NodeChannel::PortsMessageFromRelay(const ports::NodeName& source,
422                                         Channel::MessagePtr message) {
423   size_t num_bytes = sizeof(PortsMessageFromRelayData) +
424       message->payload_size();
425   PortsMessageFromRelayData* data;
426   Channel::MessagePtr relayed_message = CreateMessage(
427       MessageType::PORTS_MESSAGE_FROM_RELAY, num_bytes, message->num_handles(),
428       &data);
429   data->source = source;
430   if (message->payload_size())
431     memcpy(data + 1, message->payload(), message->payload_size());
432   relayed_message->SetHandles(message->TakeHandles());
433   WriteChannelMessage(std::move(relayed_message));
434 }
435 #endif  // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
436 
// Stores the delegate, IO task runner and process error callback. Except on
// NaCl-SFI builds, also creates the underlying Channel over |platform_handle|
// with |this| as its delegate. The channel is not started here; see Start().
NodeChannel::NodeChannel(Delegate* delegate,
                         ScopedPlatformHandle platform_handle,
                         scoped_refptr<base::TaskRunner> io_task_runner,
                         const ProcessErrorCallback& process_error_callback)
    : delegate_(delegate),
      io_task_runner_(io_task_runner),
      process_error_callback_(process_error_callback)
#if !defined(OS_NACL_SFI)
      // Uses the |io_task_runner_| member, so this relies on it being
      // initialized before |channel_|.
      , channel_(
          Channel::Create(this, std::move(platform_handle), io_task_runner_))
#endif
      {
}
450 
// Shuts the channel down on destruction; ShutDown() skips the channel if it
// has already been released.
NodeChannel::~NodeChannel() {
  ShutDown();
}
454 
OnChannelMessage(const void * payload,size_t payload_size,ScopedPlatformHandleVectorPtr handles)455 void NodeChannel::OnChannelMessage(const void* payload,
456                                    size_t payload_size,
457                                    ScopedPlatformHandleVectorPtr handles) {
458   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
459 
460   RequestContext request_context(RequestContext::Source::SYSTEM);
461 
462   // Ensure this NodeChannel stays alive through the extent of this method. The
463   // delegate may have the only other reference to this object and it may choose
464   // to drop it here in response to, e.g., a malformed message.
465   scoped_refptr<NodeChannel> keepalive = this;
466 
467 #if defined(OS_WIN)
468   // If we receive handles from a known process, rewrite them to our own
469   // process. This can occur when a privileged node receives handles directly
470   // from a privileged descendant.
471   {
472     base::AutoLock lock(remote_process_handle_lock_);
473     if (handles && remote_process_handle_ != base::kNullProcessHandle) {
474       // Note that we explicitly mark the handles as being owned by the sending
475       // process before rewriting them, in order to accommodate RewriteHandles'
476       // internal sanity checks.
477       for (auto& handle : *handles)
478         handle.owning_process = remote_process_handle_;
479       if (!Channel::Message::RewriteHandles(remote_process_handle_,
480                                             base::GetCurrentProcessHandle(),
481                                             handles.get())) {
482         DLOG(ERROR) << "Received one or more invalid handles.";
483       }
484     } else if (handles) {
485       // Handles received by an unknown process must already be owned by us.
486       for (auto& handle : *handles)
487         handle.owning_process = base::GetCurrentProcessHandle();
488     }
489   }
490 #elif defined(OS_MACOSX) && !defined(OS_IOS)
491   // If we're not the root, receive any mach ports from the message. If we're
492   // the root, the only message containing mach ports should be a
493   // RELAY_PORTS_MESSAGE.
494   {
495     MachPortRelay* relay = delegate_->GetMachPortRelay();
496     if (handles && !relay) {
497       if (!MachPortRelay::ReceivePorts(handles.get())) {
498         LOG(ERROR) << "Error receiving mach ports.";
499       }
500     }
501   }
502 #endif  // defined(OS_WIN)
503 
504 
505   if (payload_size <= sizeof(Header)) {
506     delegate_->OnChannelError(remote_node_name_, this);
507     return;
508   }
509 
510   const Header* header = static_cast<const Header*>(payload);
511   switch (header->type) {
512     case MessageType::ACCEPT_CHILD: {
513       const AcceptChildData* data;
514       if (GetMessagePayload(payload, payload_size, &data)) {
515         delegate_->OnAcceptChild(remote_node_name_, data->parent_name,
516                                  data->token);
517         return;
518       }
519       break;
520     }
521 
522     case MessageType::ACCEPT_PARENT: {
523       const AcceptParentData* data;
524       if (GetMessagePayload(payload, payload_size, &data)) {
525         delegate_->OnAcceptParent(remote_node_name_, data->token,
526                                   data->child_name);
527         return;
528       }
529       break;
530     }
531 
532     case MessageType::ADD_BROKER_CLIENT: {
533       const AddBrokerClientData* data;
534       if (GetMessagePayload(payload, payload_size, &data)) {
535         ScopedPlatformHandle process_handle;
536 #if defined(OS_WIN)
537         if (!handles || handles->size() != 1) {
538           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
539           break;
540         }
541         process_handle = ScopedPlatformHandle(handles->at(0));
542         handles->clear();
543         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
544                                      process_handle.release().handle);
545 #else
546         if (handles && handles->size() != 0) {
547           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
548           break;
549         }
550         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
551                                      data->process_handle);
552 #endif
553         return;
554       }
555       break;
556     }
557 
558     case MessageType::BROKER_CLIENT_ADDED: {
559       const BrokerClientAddedData* data;
560       if (GetMessagePayload(payload, payload_size, &data)) {
561         ScopedPlatformHandle broker_channel;
562         if (!handles || handles->size() != 1) {
563           DLOG(ERROR) << "Dropping invalid BrokerClientAdded message.";
564           break;
565         }
566         broker_channel = ScopedPlatformHandle(handles->at(0));
567         handles->clear();
568         delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name,
569                                        std::move(broker_channel));
570         return;
571       }
572       break;
573     }
574 
575     case MessageType::ACCEPT_BROKER_CLIENT: {
576       const AcceptBrokerClientData* data;
577       if (GetMessagePayload(payload, payload_size, &data)) {
578         ScopedPlatformHandle broker_channel;
579         if (handles && handles->size() > 1) {
580           DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message.";
581           break;
582         }
583         if (handles && handles->size() == 1) {
584           broker_channel = ScopedPlatformHandle(handles->at(0));
585           handles->clear();
586         }
587         delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name,
588                                         std::move(broker_channel));
589         return;
590       }
591       break;
592     }
593 
594     case MessageType::PORTS_MESSAGE: {
595       size_t num_handles = handles ? handles->size() : 0;
596       Channel::MessagePtr message(
597           new Channel::Message(payload_size, num_handles));
598       message->SetHandles(std::move(handles));
599       memcpy(message->mutable_payload(), payload, payload_size);
600       delegate_->OnPortsMessage(remote_node_name_, std::move(message));
601       return;
602     }
603 
604     case MessageType::REQUEST_PORT_MERGE: {
605       const RequestPortMergeData* data;
606       if (GetMessagePayload(payload, payload_size, &data)) {
607         // Don't accept an empty token.
608         size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
609         if (token_size == 0)
610           break;
611         std::string token(reinterpret_cast<const char*>(data + 1), token_size);
612         delegate_->OnRequestPortMerge(remote_node_name_,
613                                       data->connector_port_name, token);
614         return;
615       }
616       break;
617     }
618 
619     case MessageType::REQUEST_INTRODUCTION: {
620       const IntroductionData* data;
621       if (GetMessagePayload(payload, payload_size, &data)) {
622         delegate_->OnRequestIntroduction(remote_node_name_, data->name);
623         return;
624       }
625       break;
626     }
627 
628     case MessageType::INTRODUCE: {
629       const IntroductionData* data;
630       if (GetMessagePayload(payload, payload_size, &data)) {
631         if (handles && handles->size() > 1) {
632           DLOG(ERROR) << "Dropping invalid introduction message.";
633           break;
634         }
635         ScopedPlatformHandle channel_handle;
636         if (handles && handles->size() == 1) {
637           channel_handle = ScopedPlatformHandle(handles->at(0));
638           handles->clear();
639         }
640         delegate_->OnIntroduce(remote_node_name_, data->name,
641                                std::move(channel_handle));
642         return;
643       }
644       break;
645     }
646 
647 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
648     case MessageType::RELAY_PORTS_MESSAGE: {
649       base::ProcessHandle from_process;
650       {
651         base::AutoLock lock(remote_process_handle_lock_);
652         from_process = remote_process_handle_;
653       }
654       const RelayPortsMessageData* data;
655       if (GetMessagePayload(payload, payload_size, &data)) {
656         // Don't try to relay an empty message.
657         if (payload_size <= sizeof(Header) + sizeof(RelayPortsMessageData))
658           break;
659 
660         const void* message_start = data + 1;
661         Channel::MessagePtr message = Channel::Message::Deserialize(
662             message_start, payload_size - sizeof(Header) - sizeof(*data));
663         if (!message) {
664           DLOG(ERROR) << "Dropping invalid relay message.";
665           break;
666         }
667   #if defined(OS_MACOSX) && !defined(OS_IOS)
668         message->SetHandles(std::move(handles));
669         MachPortRelay* relay = delegate_->GetMachPortRelay();
670         if (!relay) {
671           LOG(ERROR) << "Receiving mach ports without a port relay from "
672                      << remote_node_name_ << ". Dropping message.";
673           break;
674         }
675         {
676           base::AutoLock lock(pending_mach_messages_lock_);
677           if (relay->port_provider()->TaskForPid(from_process) ==
678               MACH_PORT_NULL) {
679             pending_relay_messages_.push(
680                 std::make_pair(data->destination, std::move(message)));
681             break;
682           }
683         }
684   #endif
685         delegate_->OnRelayPortsMessage(remote_node_name_, from_process,
686                                        data->destination, std::move(message));
687         return;
688       }
689       break;
690     }
691 #endif
692 
693     case MessageType::BROADCAST: {
694       if (payload_size <= sizeof(Header))
695         break;
696       const void* data = static_cast<const void*>(
697           reinterpret_cast<const Header*>(payload) + 1);
698       Channel::MessagePtr message =
699           Channel::Message::Deserialize(data, payload_size - sizeof(Header));
700       if (!message || message->has_handles()) {
701         DLOG(ERROR) << "Dropping invalid broadcast message.";
702         break;
703       }
704       delegate_->OnBroadcast(remote_node_name_, std::move(message));
705       return;
706     }
707 
708 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
709     case MessageType::PORTS_MESSAGE_FROM_RELAY:
710       const PortsMessageFromRelayData* data;
711       if (GetMessagePayload(payload, payload_size, &data)) {
712         size_t num_bytes = payload_size - sizeof(*data);
713         if (num_bytes < sizeof(Header))
714           break;
715         num_bytes -= sizeof(Header);
716 
717         size_t num_handles = handles ? handles->size() : 0;
718         Channel::MessagePtr message(
719             new Channel::Message(num_bytes, num_handles));
720         message->SetHandles(std::move(handles));
721         if (num_bytes)
722           memcpy(message->mutable_payload(), data + 1, num_bytes);
723         delegate_->OnPortsMessageFromRelay(
724             remote_node_name_, data->source, std::move(message));
725         return;
726       }
727       break;
728 
729 #endif  // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
730 
731     default:
732       break;
733   }
734 
735   DLOG(ERROR) << "Received invalid message. Closing channel.";
736   delegate_->OnChannelError(remote_node_name_, this);
737 }
738 
OnChannelError()739 void NodeChannel::OnChannelError() {
740   DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
741 
742   RequestContext request_context(RequestContext::Source::SYSTEM);
743 
744   ShutDown();
745   // |OnChannelError()| may cause |this| to be destroyed, but still need access
746   // to the name name after that destruction. So may a copy of
747   // |remote_node_name_| so it can be used if |this| becomes destroyed.
748   ports::NodeName node_name = remote_node_name_;
749   delegate_->OnChannelError(node_name, this);
750 }
751 
752 #if defined(OS_MACOSX) && !defined(OS_IOS)
OnProcessReady(base::ProcessHandle process)753 void NodeChannel::OnProcessReady(base::ProcessHandle process) {
754   io_task_runner_->PostTask(FROM_HERE, base::Bind(
755       &NodeChannel::ProcessPendingMessagesWithMachPorts, this));
756 }
757 
ProcessPendingMessagesWithMachPorts()758 void NodeChannel::ProcessPendingMessagesWithMachPorts() {
759   MachPortRelay* relay = delegate_->GetMachPortRelay();
760   DCHECK(relay);
761 
762   base::ProcessHandle remote_process_handle;
763   {
764     base::AutoLock lock(remote_process_handle_lock_);
765     remote_process_handle = remote_process_handle_;
766   }
767   PendingMessageQueue pending_writes;
768   PendingRelayMessageQueue pending_relays;
769   {
770     base::AutoLock lock(pending_mach_messages_lock_);
771     pending_writes.swap(pending_write_messages_);
772     pending_relays.swap(pending_relay_messages_);
773   }
774   DCHECK(pending_writes.empty() && pending_relays.empty());
775 
776   while (!pending_writes.empty()) {
777     Channel::MessagePtr message = std::move(pending_writes.front());
778     pending_writes.pop();
779     if (!relay->SendPortsToProcess(message.get(), remote_process_handle)) {
780       LOG(ERROR) << "Error on sending mach ports. Remote process is likely "
781                  << "gone. Dropping message.";
782       return;
783     }
784 
785     base::AutoLock lock(channel_lock_);
786     if (!channel_) {
787       DLOG(ERROR) << "Dropping message on closed channel.";
788       break;
789     } else {
790       channel_->Write(std::move(message));
791     }
792   }
793 
794   // Ensure this NodeChannel stays alive while flushing relay messages.
795   scoped_refptr<NodeChannel> keepalive = this;
796 
797   while (!pending_relays.empty()) {
798     ports::NodeName destination = pending_relays.front().first;
799     Channel::MessagePtr message = std::move(pending_relays.front().second);
800     pending_relays.pop();
801     delegate_->OnRelayPortsMessage(remote_node_name_, remote_process_handle,
802                                    destination, std::move(message));
803   }
804 }
805 #endif
806 
WriteChannelMessage(Channel::MessagePtr message)807 void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) {
808 #if defined(OS_WIN)
809   // Map handles to the destination process. Note: only messages from a
810   // privileged node should contain handles on Windows. If an unprivileged
811   // node needs to send handles, it should do so via RelayPortsMessage which
812   // stashes the handles in the message in such a way that they go undetected
813   // here (they'll be unpacked and duplicated by a privileged parent.)
814 
815   if (message->has_handles()) {
816     base::ProcessHandle remote_process_handle;
817     {
818       base::AutoLock lock(remote_process_handle_lock_);
819       remote_process_handle = remote_process_handle_;
820     }
821 
822     // Rewrite outgoing handles if we have a handle to the destination process.
823     if (remote_process_handle != base::kNullProcessHandle) {
824       ScopedPlatformHandleVectorPtr handles = message->TakeHandles();
825       if (!Channel::Message::RewriteHandles(base::GetCurrentProcessHandle(),
826                                             remote_process_handle,
827                                             handles.get())) {
828         DLOG(ERROR) << "Failed to duplicate one or more outgoing handles.";
829       }
830       message->SetHandles(std::move(handles));
831     }
832   }
833 #elif defined(OS_MACOSX) && !defined(OS_IOS)
834   // On OSX, we need to transfer mach ports to the destination process before
835   // transferring the message itself.
836   if (message->has_mach_ports()) {
837     MachPortRelay* relay = delegate_->GetMachPortRelay();
838     if (relay) {
839       base::ProcessHandle remote_process_handle;
840       {
841         base::AutoLock lock(remote_process_handle_lock_);
842         // Expect that the receiving node is a child.
843         DCHECK(remote_process_handle_ != base::kNullProcessHandle);
844         remote_process_handle = remote_process_handle_;
845       }
846       {
847         base::AutoLock lock(pending_mach_messages_lock_);
848         if (relay->port_provider()->TaskForPid(remote_process_handle) ==
849             MACH_PORT_NULL) {
850           // It is also possible for TaskForPid() to return MACH_PORT_NULL when
851           // the process has started, then died. In that case, the queued
852           // message will never be processed. But that's fine since we're about
853           // to die anyway.
854           pending_write_messages_.push(std::move(message));
855           return;
856         }
857       }
858 
859       if (!relay->SendPortsToProcess(message.get(), remote_process_handle)) {
860         LOG(ERROR) << "Error on sending mach ports. Remote process is likely "
861                    << "gone. Dropping message.";
862         return;
863       }
864     }
865   }
866 #endif
867 
868   base::AutoLock lock(channel_lock_);
869   if (!channel_)
870     DLOG(ERROR) << "Dropping message on closed channel.";
871   else
872     channel_->Write(std::move(message));
873 }
874 
875 }  // namespace edk
876 }  // namespace mojo
877