1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/node_channel.h"
6 
7 #include <cstring>
8 #include <limits>
9 #include <sstream>
10 
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/memory/ptr_util.h"
15 #include "mojo/core/channel.h"
16 #include "mojo/core/configuration.h"
17 #include "mojo/core/core.h"
18 #include "mojo/core/request_context.h"
19 
20 namespace mojo {
21 namespace core {
22 
23 namespace {
24 
// Wire identifiers for the control messages exchanged over a NodeChannel. The
// sender stores the value in Header::type and the receiver switches on it in
// NodeChannel::OnChannelMessage().
//
// NOTE: Please ONLY append messages to the end of this enum.
//
// The relay enumerators only exist on Windows and non-iOS macOS, so the
// numeric value of every enumerator declared after them differs between those
// platforms and all others.
enum class MessageType : uint32_t {
  // Payload: AcceptInviteeData.
  ACCEPT_INVITEE,
  // Payload: AcceptInvitationData.
  ACCEPT_INVITATION,
  // Payload: AddBrokerClientData. Carries exactly one platform handle on
  // Windows and none elsewhere.
  ADD_BROKER_CLIENT,
  // Payload: BrokerClientAddedData. The receiver requires exactly one handle.
  BROKER_CLIENT_ADDED,
  // Payload: AcceptBrokerClientData plus at most one platform handle.
  ACCEPT_BROKER_CLIENT,
  // Payload: opaque event bytes following the Header.
  EVENT_MESSAGE,
  // Payload: RequestPortMergeData followed by a non-empty token.
  REQUEST_PORT_MERGE,
  // Payload: IntroductionData.
  REQUEST_INTRODUCTION,
  // Payload: IntroductionData plus at most one platform handle.
  INTRODUCE,
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
  // Payload: RelayEventMessageData followed by a serialized message.
  RELAY_EVENT_MESSAGE,
#endif
  // Payload: a serialized message that must not carry handles.
  BROADCAST_EVENT,
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
  // Payload: EventMessageFromRelayData followed by the relayed payload.
  EVENT_MESSAGE_FROM_RELAY,
#endif
  // Payload: AcceptPeerData.
  ACCEPT_PEER,
};
45 
46 struct Header {
47   MessageType type;
48   uint32_t padding;
49 };
50 
51 static_assert(IsAlignedForChannelMessage(sizeof(Header)),
52               "Invalid header size.");
53 
54 struct AcceptInviteeData {
55   ports::NodeName inviter_name;
56   ports::NodeName token;
57 };
58 
59 struct AcceptInvitationData {
60   ports::NodeName token;
61   ports::NodeName invitee_name;
62 };
63 
64 struct AcceptPeerData {
65   ports::NodeName token;
66   ports::NodeName peer_name;
67   ports::PortName port_name;
68 };
69 
70 // This message may include a process handle on plaforms that require it.
71 struct AddBrokerClientData {
72   ports::NodeName client_name;
73 #if !defined(OS_WIN)
74   uint32_t process_handle;
75   uint32_t padding;
76 #endif
77 };
78 
79 #if !defined(OS_WIN)
80 static_assert(sizeof(base::ProcessHandle) == sizeof(uint32_t),
81               "Unexpected pid size");
82 static_assert(sizeof(AddBrokerClientData) % kChannelMessageAlignment == 0,
83               "Invalid AddBrokerClientData size.");
84 #endif
85 
86 // This data is followed by a platform channel handle to the broker.
87 struct BrokerClientAddedData {
88   ports::NodeName client_name;
89 };
90 
91 // This data may be followed by a platform channel handle to the broker. If not,
92 // then the inviter is the broker and its channel should be used as such.
93 struct AcceptBrokerClientData {
94   ports::NodeName broker_name;
95 };
96 
97 // This is followed by arbitrary payload data which is interpreted as a token
98 // string for port location.
99 struct RequestPortMergeData {
100   ports::PortName connector_port_name;
101 };
102 
103 // Used for both REQUEST_INTRODUCTION and INTRODUCE.
104 //
105 // For INTRODUCE the message also includes a valid platform handle for a channel
106 // the receiver may use to communicate with the named node directly, or an
107 // invalid platform handle if the node is unknown to the sender or otherwise
108 // cannot be introduced.
109 struct IntroductionData {
110   ports::NodeName name;
111 };
112 
#if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
// Payload prefix of RELAY_EVENT_MESSAGE. This struct is followed by the full
// payload of a message to be relayed.
struct RelayEventMessageData {
  // Node the relayed message is ultimately addressed to.
  ports::NodeName destination;
};

// Payload prefix of EVENT_MESSAGE_FROM_RELAY. This struct is followed by the
// full payload of a relayed message.
struct EventMessageFromRelayData {
  // Node the relayed message originally came from.
  ports::NodeName source;
};
#endif
124 
125 template <typename DataType>
CreateMessage(MessageType type,size_t payload_size,size_t num_handles,DataType ** out_data,size_t capacity=0)126 Channel::MessagePtr CreateMessage(MessageType type,
127                                   size_t payload_size,
128                                   size_t num_handles,
129                                   DataType** out_data,
130                                   size_t capacity = 0) {
131   const size_t total_size = payload_size + sizeof(Header);
132   if (capacity == 0)
133     capacity = total_size;
134   else
135     capacity = std::max(total_size, capacity);
136   auto message =
137       std::make_unique<Channel::Message>(capacity, total_size, num_handles);
138   Header* header = reinterpret_cast<Header*>(message->mutable_payload());
139   header->type = type;
140   header->padding = 0;
141   *out_data = reinterpret_cast<DataType*>(&header[1]);
142   return message;
143 }
144 
145 template <typename DataType>
GetMessagePayload(const void * bytes,size_t num_bytes,DataType ** out_data)146 bool GetMessagePayload(const void* bytes,
147                        size_t num_bytes,
148                        DataType** out_data) {
149   static_assert(sizeof(DataType) > 0, "DataType must have non-zero size.");
150   if (num_bytes < sizeof(Header) + sizeof(DataType))
151     return false;
152   *out_data = reinterpret_cast<const DataType*>(
153       static_cast<const char*>(bytes) + sizeof(Header));
154   return true;
155 }
156 
157 }  // namespace
158 
159 // static
Create(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)160 scoped_refptr<NodeChannel> NodeChannel::Create(
161     Delegate* delegate,
162     ConnectionParams connection_params,
163     scoped_refptr<base::TaskRunner> io_task_runner,
164     const ProcessErrorCallback& process_error_callback) {
165 #if defined(OS_NACL_SFI)
166   LOG(FATAL) << "Multi-process not yet supported on NaCl-SFI";
167   return nullptr;
168 #else
169   return new NodeChannel(delegate, std::move(connection_params), io_task_runner,
170                          process_error_callback);
171 #endif
172 }
173 
174 // static
CreateEventMessage(size_t capacity,size_t payload_size,void ** payload,size_t num_handles)175 Channel::MessagePtr NodeChannel::CreateEventMessage(size_t capacity,
176                                                     size_t payload_size,
177                                                     void** payload,
178                                                     size_t num_handles) {
179   return CreateMessage(MessageType::EVENT_MESSAGE, payload_size, num_handles,
180                        payload, capacity);
181 }
182 
183 // static
GetEventMessageData(Channel::Message * message,void ** data,size_t * num_data_bytes)184 void NodeChannel::GetEventMessageData(Channel::Message* message,
185                                       void** data,
186                                       size_t* num_data_bytes) {
187   // NOTE: OnChannelMessage guarantees that we never accept a Channel::Message
188   // with a payload of fewer than |sizeof(Header)| bytes.
189   *data = reinterpret_cast<Header*>(message->mutable_payload()) + 1;
190   *num_data_bytes = message->payload_size() - sizeof(Header);
191 }
192 
Start()193 void NodeChannel::Start() {
194   base::AutoLock lock(channel_lock_);
195   // ShutDown() may have already been called, in which case |channel_| is null.
196   if (channel_)
197     channel_->Start();
198 }
199 
ShutDown()200 void NodeChannel::ShutDown() {
201   base::AutoLock lock(channel_lock_);
202   if (channel_) {
203     channel_->ShutDown();
204     channel_ = nullptr;
205   }
206 }
207 
LeakHandleOnShutdown()208 void NodeChannel::LeakHandleOnShutdown() {
209   base::AutoLock lock(channel_lock_);
210   if (channel_) {
211     channel_->LeakHandle();
212   }
213 }
214 
NotifyBadMessage(const std::string & error)215 void NodeChannel::NotifyBadMessage(const std::string& error) {
216   if (!process_error_callback_.is_null())
217     process_error_callback_.Run("Received bad user message: " + error);
218 }
219 
SetRemoteProcessHandle(ScopedProcessHandle process_handle)220 void NodeChannel::SetRemoteProcessHandle(ScopedProcessHandle process_handle) {
221   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
222   {
223     base::AutoLock lock(channel_lock_);
224     if (channel_)
225       channel_->set_remote_process(process_handle.Clone());
226   }
227   base::AutoLock lock(remote_process_handle_lock_);
228   DCHECK(!remote_process_handle_.is_valid());
229   CHECK_NE(remote_process_handle_.get(), base::GetCurrentProcessHandle());
230   remote_process_handle_ = std::move(process_handle);
231 }
232 
HasRemoteProcessHandle()233 bool NodeChannel::HasRemoteProcessHandle() {
234   base::AutoLock lock(remote_process_handle_lock_);
235   return remote_process_handle_.is_valid();
236 }
237 
CloneRemoteProcessHandle()238 ScopedProcessHandle NodeChannel::CloneRemoteProcessHandle() {
239   base::AutoLock lock(remote_process_handle_lock_);
240   if (!remote_process_handle_.is_valid())
241     return ScopedProcessHandle();
242   return remote_process_handle_.Clone();
243 }
244 
SetRemoteNodeName(const ports::NodeName & name)245 void NodeChannel::SetRemoteNodeName(const ports::NodeName& name) {
246   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
247   remote_node_name_ = name;
248 }
249 
AcceptInvitee(const ports::NodeName & inviter_name,const ports::NodeName & token)250 void NodeChannel::AcceptInvitee(const ports::NodeName& inviter_name,
251                                 const ports::NodeName& token) {
252   AcceptInviteeData* data;
253   Channel::MessagePtr message = CreateMessage(
254       MessageType::ACCEPT_INVITEE, sizeof(AcceptInviteeData), 0, &data);
255   data->inviter_name = inviter_name;
256   data->token = token;
257   WriteChannelMessage(std::move(message));
258 }
259 
AcceptInvitation(const ports::NodeName & token,const ports::NodeName & invitee_name)260 void NodeChannel::AcceptInvitation(const ports::NodeName& token,
261                                    const ports::NodeName& invitee_name) {
262   AcceptInvitationData* data;
263   Channel::MessagePtr message = CreateMessage(
264       MessageType::ACCEPT_INVITATION, sizeof(AcceptInvitationData), 0, &data);
265   data->token = token;
266   data->invitee_name = invitee_name;
267   WriteChannelMessage(std::move(message));
268 }
269 
AcceptPeer(const ports::NodeName & sender_name,const ports::NodeName & token,const ports::PortName & port_name)270 void NodeChannel::AcceptPeer(const ports::NodeName& sender_name,
271                              const ports::NodeName& token,
272                              const ports::PortName& port_name) {
273   AcceptPeerData* data;
274   Channel::MessagePtr message =
275       CreateMessage(MessageType::ACCEPT_PEER, sizeof(AcceptPeerData), 0, &data);
276   data->token = token;
277   data->peer_name = sender_name;
278   data->port_name = port_name;
279   WriteChannelMessage(std::move(message));
280 }
281 
AddBrokerClient(const ports::NodeName & client_name,ScopedProcessHandle process_handle)282 void NodeChannel::AddBrokerClient(const ports::NodeName& client_name,
283                                   ScopedProcessHandle process_handle) {
284   AddBrokerClientData* data;
285   std::vector<PlatformHandle> handles;
286 #if defined(OS_WIN)
287   handles.emplace_back(base::win::ScopedHandle(process_handle.release()));
288 #endif
289   Channel::MessagePtr message =
290       CreateMessage(MessageType::ADD_BROKER_CLIENT, sizeof(AddBrokerClientData),
291                     handles.size(), &data);
292   message->SetHandles(std::move(handles));
293   data->client_name = client_name;
294 #if !defined(OS_WIN)
295   data->process_handle = process_handle.get();
296   data->padding = 0;
297 #endif
298   WriteChannelMessage(std::move(message));
299 }
300 
BrokerClientAdded(const ports::NodeName & client_name,PlatformHandle broker_channel)301 void NodeChannel::BrokerClientAdded(const ports::NodeName& client_name,
302                                     PlatformHandle broker_channel) {
303   BrokerClientAddedData* data;
304   std::vector<PlatformHandle> handles;
305   if (broker_channel.is_valid())
306     handles.emplace_back(std::move(broker_channel));
307   Channel::MessagePtr message =
308       CreateMessage(MessageType::BROKER_CLIENT_ADDED,
309                     sizeof(BrokerClientAddedData), handles.size(), &data);
310   message->SetHandles(std::move(handles));
311   data->client_name = client_name;
312   WriteChannelMessage(std::move(message));
313 }
314 
AcceptBrokerClient(const ports::NodeName & broker_name,PlatformHandle broker_channel)315 void NodeChannel::AcceptBrokerClient(const ports::NodeName& broker_name,
316                                      PlatformHandle broker_channel) {
317   AcceptBrokerClientData* data;
318   std::vector<PlatformHandle> handles;
319   if (broker_channel.is_valid())
320     handles.emplace_back(std::move(broker_channel));
321   Channel::MessagePtr message =
322       CreateMessage(MessageType::ACCEPT_BROKER_CLIENT,
323                     sizeof(AcceptBrokerClientData), handles.size(), &data);
324   message->SetHandles(std::move(handles));
325   data->broker_name = broker_name;
326   WriteChannelMessage(std::move(message));
327 }
328 
RequestPortMerge(const ports::PortName & connector_port_name,const std::string & token)329 void NodeChannel::RequestPortMerge(const ports::PortName& connector_port_name,
330                                    const std::string& token) {
331   RequestPortMergeData* data;
332   Channel::MessagePtr message =
333       CreateMessage(MessageType::REQUEST_PORT_MERGE,
334                     sizeof(RequestPortMergeData) + token.size(), 0, &data);
335   data->connector_port_name = connector_port_name;
336   memcpy(data + 1, token.data(), token.size());
337   WriteChannelMessage(std::move(message));
338 }
339 
RequestIntroduction(const ports::NodeName & name)340 void NodeChannel::RequestIntroduction(const ports::NodeName& name) {
341   IntroductionData* data;
342   Channel::MessagePtr message = CreateMessage(
343       MessageType::REQUEST_INTRODUCTION, sizeof(IntroductionData), 0, &data);
344   data->name = name;
345   WriteChannelMessage(std::move(message));
346 }
347 
Introduce(const ports::NodeName & name,PlatformHandle channel_handle)348 void NodeChannel::Introduce(const ports::NodeName& name,
349                             PlatformHandle channel_handle) {
350   IntroductionData* data;
351   std::vector<PlatformHandle> handles;
352   if (channel_handle.is_valid())
353     handles.emplace_back(std::move(channel_handle));
354   Channel::MessagePtr message = CreateMessage(
355       MessageType::INTRODUCE, sizeof(IntroductionData), handles.size(), &data);
356   message->SetHandles(std::move(handles));
357   data->name = name;
358   WriteChannelMessage(std::move(message));
359 }
360 
SendChannelMessage(Channel::MessagePtr message)361 void NodeChannel::SendChannelMessage(Channel::MessagePtr message) {
362   WriteChannelMessage(std::move(message));
363 }
364 
Broadcast(Channel::MessagePtr message)365 void NodeChannel::Broadcast(Channel::MessagePtr message) {
366   DCHECK(!message->has_handles());
367   void* data;
368   Channel::MessagePtr broadcast_message = CreateMessage(
369       MessageType::BROADCAST_EVENT, message->data_num_bytes(), 0, &data);
370   memcpy(data, message->data(), message->data_num_bytes());
371   WriteChannelMessage(std::move(broadcast_message));
372 }
373 
374 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
RelayEventMessage(const ports::NodeName & destination,Channel::MessagePtr message)375 void NodeChannel::RelayEventMessage(const ports::NodeName& destination,
376                                     Channel::MessagePtr message) {
377 #if defined(OS_WIN)
378   DCHECK(message->has_handles());
379 
380   // Note that this is only used on Windows, and on Windows all platform
381   // handles are included in the message data. We blindly copy all the data
382   // here and the relay node (the broker) will duplicate handles as needed.
383   size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
384   RelayEventMessageData* data;
385   Channel::MessagePtr relay_message =
386       CreateMessage(MessageType::RELAY_EVENT_MESSAGE, num_bytes, 0, &data);
387   data->destination = destination;
388   memcpy(data + 1, message->data(), message->data_num_bytes());
389 
390   // When the handles are duplicated in the broker, the source handles will
391   // be closed. If the broker never receives this message then these handles
392   // will leak, but that means something else has probably broken and the
393   // sending process won't likely be around much longer.
394   //
395   // TODO(https://crbug.com/813112): We would like to be able to violate the
396   // above stated assumption. We should not leak handles in cases where we
397   // outlive the broker, as we may continue existing and eventually accept a new
398   // broker invitation.
399   std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
400   for (auto& handle : handles)
401     handle.TakeHandle().release();
402 
403 #else
404   DCHECK(message->has_mach_ports());
405 
406   // On OSX, the handles are extracted from the relayed message and attached to
407   // the wrapper. The broker then takes the handles attached to the wrapper and
408   // moves them back to the relayed message. This is necessary because the
409   // message may contain fds which need to be attached to the outer message so
410   // that they can be transferred to the broker.
411   std::vector<PlatformHandleInTransit> handles = message->TakeHandles();
412   size_t num_bytes = sizeof(RelayEventMessageData) + message->data_num_bytes();
413   RelayEventMessageData* data;
414   Channel::MessagePtr relay_message = CreateMessage(
415       MessageType::RELAY_EVENT_MESSAGE, num_bytes, handles.size(), &data);
416   data->destination = destination;
417   memcpy(data + 1, message->data(), message->data_num_bytes());
418   relay_message->SetHandles(std::move(handles));
419 #endif  // defined(OS_WIN)
420 
421   WriteChannelMessage(std::move(relay_message));
422 }
423 
EventMessageFromRelay(const ports::NodeName & source,Channel::MessagePtr message)424 void NodeChannel::EventMessageFromRelay(const ports::NodeName& source,
425                                         Channel::MessagePtr message) {
426   size_t num_bytes =
427       sizeof(EventMessageFromRelayData) + message->payload_size();
428   EventMessageFromRelayData* data;
429   Channel::MessagePtr relayed_message =
430       CreateMessage(MessageType::EVENT_MESSAGE_FROM_RELAY, num_bytes,
431                     message->num_handles(), &data);
432   data->source = source;
433   if (message->payload_size())
434     memcpy(data + 1, message->payload(), message->payload_size());
435   relayed_message->SetHandles(message->TakeHandles());
436   WriteChannelMessage(std::move(relayed_message));
437 }
438 #endif  // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
439 
NodeChannel(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner,const ProcessErrorCallback & process_error_callback)440 NodeChannel::NodeChannel(Delegate* delegate,
441                          ConnectionParams connection_params,
442                          scoped_refptr<base::TaskRunner> io_task_runner,
443                          const ProcessErrorCallback& process_error_callback)
444     : delegate_(delegate),
445       io_task_runner_(io_task_runner),
446       process_error_callback_(process_error_callback)
447 #if !defined(OS_NACL_SFI)
448       ,
449       channel_(
450           Channel::Create(this, std::move(connection_params), io_task_runner_))
451 #endif
452 {
453 }
454 
~NodeChannel()455 NodeChannel::~NodeChannel() {
456   ShutDown();
457 }
458 
OnChannelMessage(const void * payload,size_t payload_size,std::vector<PlatformHandle> handles)459 void NodeChannel::OnChannelMessage(const void* payload,
460                                    size_t payload_size,
461                                    std::vector<PlatformHandle> handles) {
462   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
463 
464   RequestContext request_context(RequestContext::Source::SYSTEM);
465 
466   // Ensure this NodeChannel stays alive through the extent of this method. The
467   // delegate may have the only other reference to this object and it may choose
468   // to drop it here in response to, e.g., a malformed message.
469   scoped_refptr<NodeChannel> keepalive = this;
470 
471   if (payload_size <= sizeof(Header)) {
472     delegate_->OnChannelError(remote_node_name_, this);
473     return;
474   }
475 
476   const Header* header = static_cast<const Header*>(payload);
477   switch (header->type) {
478     case MessageType::ACCEPT_INVITEE: {
479       const AcceptInviteeData* data;
480       if (GetMessagePayload(payload, payload_size, &data)) {
481         delegate_->OnAcceptInvitee(remote_node_name_, data->inviter_name,
482                                    data->token);
483         return;
484       }
485       break;
486     }
487 
488     case MessageType::ACCEPT_INVITATION: {
489       const AcceptInvitationData* data;
490       if (GetMessagePayload(payload, payload_size, &data)) {
491         delegate_->OnAcceptInvitation(remote_node_name_, data->token,
492                                       data->invitee_name);
493         return;
494       }
495       break;
496     }
497 
498     case MessageType::ADD_BROKER_CLIENT: {
499       const AddBrokerClientData* data;
500       if (GetMessagePayload(payload, payload_size, &data)) {
501 #if defined(OS_WIN)
502         if (handles.size() != 1) {
503           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
504           break;
505         }
506         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
507                                      handles[0].ReleaseHandle());
508 #else
509         if (!handles.empty()) {
510           DLOG(ERROR) << "Dropping invalid AddBrokerClient message.";
511           break;
512         }
513         delegate_->OnAddBrokerClient(remote_node_name_, data->client_name,
514                                      data->process_handle);
515 #endif
516         return;
517       }
518       break;
519     }
520 
521     case MessageType::BROKER_CLIENT_ADDED: {
522       const BrokerClientAddedData* data;
523       if (GetMessagePayload(payload, payload_size, &data)) {
524         if (handles.size() != 1) {
525           DLOG(ERROR) << "Dropping invalid BrokerClientAdded message.";
526           break;
527         }
528         delegate_->OnBrokerClientAdded(remote_node_name_, data->client_name,
529                                        std::move(handles[0]));
530         return;
531       }
532       break;
533     }
534 
535     case MessageType::ACCEPT_BROKER_CLIENT: {
536       const AcceptBrokerClientData* data;
537       if (GetMessagePayload(payload, payload_size, &data)) {
538         PlatformHandle broker_channel;
539         if (handles.size() > 1) {
540           DLOG(ERROR) << "Dropping invalid AcceptBrokerClient message.";
541           break;
542         }
543         if (handles.size() == 1)
544           broker_channel = std::move(handles[0]);
545 
546         delegate_->OnAcceptBrokerClient(remote_node_name_, data->broker_name,
547                                         std::move(broker_channel));
548         return;
549       }
550       break;
551     }
552 
553     case MessageType::EVENT_MESSAGE: {
554       Channel::MessagePtr message(
555           new Channel::Message(payload_size, handles.size()));
556       message->SetHandles(std::move(handles));
557       memcpy(message->mutable_payload(), payload, payload_size);
558       delegate_->OnEventMessage(remote_node_name_, std::move(message));
559       return;
560     }
561 
562     case MessageType::REQUEST_PORT_MERGE: {
563       const RequestPortMergeData* data;
564       if (GetMessagePayload(payload, payload_size, &data)) {
565         // Don't accept an empty token.
566         size_t token_size = payload_size - sizeof(*data) - sizeof(Header);
567         if (token_size == 0)
568           break;
569         std::string token(reinterpret_cast<const char*>(data + 1), token_size);
570         delegate_->OnRequestPortMerge(remote_node_name_,
571                                       data->connector_port_name, token);
572         return;
573       }
574       break;
575     }
576 
577     case MessageType::REQUEST_INTRODUCTION: {
578       const IntroductionData* data;
579       if (GetMessagePayload(payload, payload_size, &data)) {
580         delegate_->OnRequestIntroduction(remote_node_name_, data->name);
581         return;
582       }
583       break;
584     }
585 
586     case MessageType::INTRODUCE: {
587       const IntroductionData* data;
588       if (GetMessagePayload(payload, payload_size, &data)) {
589         if (handles.size() > 1) {
590           DLOG(ERROR) << "Dropping invalid introduction message.";
591           break;
592         }
593         PlatformHandle channel_handle;
594         if (handles.size() == 1)
595           channel_handle = std::move(handles[0]);
596 
597         delegate_->OnIntroduce(remote_node_name_, data->name,
598                                std::move(channel_handle));
599         return;
600       }
601       break;
602     }
603 
604 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
605     case MessageType::RELAY_EVENT_MESSAGE: {
606       base::ProcessHandle from_process;
607       {
608         base::AutoLock lock(remote_process_handle_lock_);
609         // NOTE: It's safe to retain a weak reference to this process handle
610         // through the extent of this call because |this| is kept alive and
611         // |remote_process_handle_| is never reset once set.
612         from_process = remote_process_handle_.get();
613       }
614       const RelayEventMessageData* data;
615       if (GetMessagePayload(payload, payload_size, &data)) {
616         // Don't try to relay an empty message.
617         if (payload_size <= sizeof(Header) + sizeof(RelayEventMessageData))
618           break;
619 
620         const void* message_start = data + 1;
621         Channel::MessagePtr message = Channel::Message::Deserialize(
622             message_start, payload_size - sizeof(Header) - sizeof(*data),
623             from_process);
624         if (!message) {
625           DLOG(ERROR) << "Dropping invalid relay message.";
626           break;
627         }
628 #if defined(OS_MACOSX) && !defined(OS_IOS)
629         message->SetHandles(std::move(handles));
630 #endif
631         delegate_->OnRelayEventMessage(remote_node_name_, from_process,
632                                        data->destination, std::move(message));
633         return;
634       }
635       break;
636     }
637 #endif
638 
639     case MessageType::BROADCAST_EVENT: {
640       if (payload_size <= sizeof(Header))
641         break;
642       const void* data = static_cast<const void*>(
643           reinterpret_cast<const Header*>(payload) + 1);
644       Channel::MessagePtr message =
645           Channel::Message::Deserialize(data, payload_size - sizeof(Header));
646       if (!message || message->has_handles()) {
647         DLOG(ERROR) << "Dropping invalid broadcast message.";
648         break;
649       }
650       delegate_->OnBroadcast(remote_node_name_, std::move(message));
651       return;
652     }
653 
654 #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
655     case MessageType::EVENT_MESSAGE_FROM_RELAY:
656       const EventMessageFromRelayData* data;
657       if (GetMessagePayload(payload, payload_size, &data)) {
658         size_t num_bytes = payload_size - sizeof(*data);
659         if (num_bytes < sizeof(Header))
660           break;
661         num_bytes -= sizeof(Header);
662 
663         Channel::MessagePtr message(
664             new Channel::Message(num_bytes, handles.size()));
665         message->SetHandles(std::move(handles));
666         if (num_bytes)
667           memcpy(message->mutable_payload(), data + 1, num_bytes);
668         delegate_->OnEventMessageFromRelay(remote_node_name_, data->source,
669                                            std::move(message));
670         return;
671       }
672       break;
673 
674 #endif  // defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS))
675 
676     case MessageType::ACCEPT_PEER: {
677       const AcceptPeerData* data;
678       if (GetMessagePayload(payload, payload_size, &data)) {
679         delegate_->OnAcceptPeer(remote_node_name_, data->token, data->peer_name,
680                                 data->port_name);
681         return;
682       }
683       break;
684     }
685 
686     default:
687       // Ignore unrecognized message types, allowing for future extensibility.
688       return;
689   }
690 
691   DLOG(ERROR) << "Received invalid message. Closing channel.";
692   if (process_error_callback_)
693     process_error_callback_.Run("NodeChannel received a malformed message");
694   delegate_->OnChannelError(remote_node_name_, this);
695 }
696 
OnChannelError(Channel::Error error)697 void NodeChannel::OnChannelError(Channel::Error error) {
698   DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
699 
700   RequestContext request_context(RequestContext::Source::SYSTEM);
701 
702   ShutDown();
703 
704   if (process_error_callback_ &&
705       error == Channel::Error::kReceivedMalformedData) {
706     process_error_callback_.Run("Channel received a malformed message");
707   }
708 
709   // |OnChannelError()| may cause |this| to be destroyed, but still need access
710   // to the name name after that destruction. So may a copy of
711   // |remote_node_name_| so it can be used if |this| becomes destroyed.
712   ports::NodeName node_name = remote_node_name_;
713   delegate_->OnChannelError(node_name, this);
714 }
715 
WriteChannelMessage(Channel::MessagePtr message)716 void NodeChannel::WriteChannelMessage(Channel::MessagePtr message) {
717   // Force a crash if this process attempts to send a message larger than the
718   // maximum allowed size. This is more useful than killing a Channel when we
719   // *receive* an oversized message, as we should consider oversized message
720   // transmission to be a bug and this helps easily identify offending code.
721   CHECK(message->data_num_bytes() < GetConfiguration().max_message_num_bytes);
722 
723   base::AutoLock lock(channel_lock_);
724   if (!channel_)
725     DLOG(ERROR) << "Dropping message on closed channel.";
726   else
727     channel_->Write(std::move(message));
728 }
729 
730 }  // namespace core
731 }  // namespace mojo
732