1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/core/channel.h"
6 
7 #include <errno.h>
8 #include <sys/socket.h>
9 
10 #include <algorithm>
11 #include <limits>
12 #include <memory>
13 
14 #include "base/bind.h"
15 #include "base/containers/queue.h"
16 #include "base/location.h"
17 #include "base/macros.h"
18 #include "base/memory/ref_counted.h"
19 #include "base/message_loop/message_loop_current.h"
20 #include "base/message_loop/message_pump_for_io.h"
21 #include "base/synchronization/lock.h"
22 #include "base/task_runner.h"
23 #include "build/build_config.h"
24 #include "mojo/core/core.h"
25 #include "mojo/public/cpp/platform/socket_utils_posix.h"
26 
27 #if !defined(OS_NACL)
28 #include <sys/uio.h>
29 #endif
30 
31 #if defined(OS_MACOSX) && !defined(OS_IOS)
32 #include "mojo/core/mach_port_relay.h"
33 #endif
34 
35 namespace mojo {
36 namespace core {
37 
38 namespace {
39 
40 const size_t kMaxBatchReadCapacity = 256 * 1024;
41 
42 // A view over a Channel::Message object. The write queue uses these since
43 // large messages may need to be sent in chunks.
44 class MessageView {
45  public:
46   // Owns |message|. |offset| indexes the first unsent byte in the message.
MessageView(Channel::MessagePtr message,size_t offset)47   MessageView(Channel::MessagePtr message, size_t offset)
48       : message_(std::move(message)),
49         offset_(offset),
50         handles_(message_->TakeHandlesForTransport()) {
51     DCHECK_GT(message_->data_num_bytes(), offset_);
52   }
53 
MessageView(MessageView && other)54   MessageView(MessageView&& other) { *this = std::move(other); }
55 
operator =(MessageView && other)56   MessageView& operator=(MessageView&& other) {
57     message_ = std::move(other.message_);
58     offset_ = other.offset_;
59     handles_ = std::move(other.handles_);
60     return *this;
61   }
62 
~MessageView()63   ~MessageView() {}
64 
data() const65   const void* data() const {
66     return static_cast<const char*>(message_->data()) + offset_;
67   }
68 
data_num_bytes() const69   size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
70 
data_offset() const71   size_t data_offset() const { return offset_; }
advance_data_offset(size_t num_bytes)72   void advance_data_offset(size_t num_bytes) {
73     DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
74     offset_ += num_bytes;
75   }
76 
TakeHandles()77   std::vector<PlatformHandleInTransit> TakeHandles() {
78     return std::move(handles_);
79   }
TakeMessage()80   Channel::MessagePtr TakeMessage() { return std::move(message_); }
81 
SetHandles(std::vector<PlatformHandleInTransit> handles)82   void SetHandles(std::vector<PlatformHandleInTransit> handles) {
83     handles_ = std::move(handles);
84   }
85 
86  private:
87   Channel::MessagePtr message_;
88   size_t offset_;
89   std::vector<PlatformHandleInTransit> handles_;
90 
91   DISALLOW_COPY_AND_ASSIGN(MessageView);
92 };
93 
94 class ChannelPosix : public Channel,
95 #if defined(OS_MACOSX) && !defined(OS_IOS)
96                      public MachPortRelay::Observer,
97 #endif
98                      public base::MessageLoopCurrent::DestructionObserver,
99                      public base::MessagePumpForIO::FdWatcher {
100  public:
ChannelPosix(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)101   ChannelPosix(Delegate* delegate,
102                ConnectionParams connection_params,
103                scoped_refptr<base::TaskRunner> io_task_runner)
104       : Channel(delegate), self_(this), io_task_runner_(io_task_runner) {
105     if (connection_params.server_endpoint().is_valid())
106       server_ = connection_params.TakeServerEndpoint();
107     else
108       socket_ = connection_params.TakeEndpoint().TakePlatformHandle().TakeFD();
109 
110     CHECK(server_.is_valid() || socket_.is_valid());
111   }
112 
Start()113   void Start() override {
114 #if defined(OS_MACOSX) && !defined(OS_IOS)
115     auto* relay = Core::Get()->GetMachPortRelay();
116     if (relay) {
117       // We should only have a relay if we know the remote process handle,
118       // because that means we're in the broker process.
119       relay->AddObserver(this);
120     }
121 #endif
122 
123     if (io_task_runner_->RunsTasksInCurrentSequence()) {
124       StartOnIOThread();
125     } else {
126       io_task_runner_->PostTask(
127           FROM_HERE, base::BindOnce(&ChannelPosix::StartOnIOThread, this));
128     }
129   }
130 
ShutDownImpl()131   void ShutDownImpl() override {
132     // Always shut down asynchronously when called through the public interface.
133     io_task_runner_->PostTask(
134         FROM_HERE, base::BindOnce(&ChannelPosix::ShutDownOnIOThread, this));
135   }
136 
Write(MessagePtr message)137   void Write(MessagePtr message) override {
138 #if defined(OS_MACOSX) && !defined(OS_IOS)
139     // If this message has Mach ports and we have a MachPortRelay, use the relay
140     // to rewrite the ports as receive rights from which the send right can be
141     // read. See |MachPortRelay::SendPortsToProcess()|.
142     //
143     // Note that if we don't have a relay, the receiving process must, and they
144     // must also have the ability to extract a send right from the ports that
145     // are already attached.
146     MachPortRelay* relay = Core::Get()->GetMachPortRelay();
147     if (relay && remote_process().is_valid() && message->has_mach_ports()) {
148       if (relay->port_provider()->TaskForPid(remote_process().get()) ==
149           MACH_PORT_NULL) {
150         // We also need to have a task port for the remote process before we can
151         // send it any other ports. If we don't have one yet, queue the message
152         // until OnProcessReady() is invoked.
153         base::AutoLock lock(task_port_wait_lock_);
154         pending_outgoing_with_mach_ports_.emplace_back(std::move(message));
155         return;
156       }
157 
158       relay->SendPortsToProcess(message.get(), remote_process().get());
159     }
160 #endif
161 
162     bool write_error = false;
163     {
164       base::AutoLock lock(write_lock_);
165       if (reject_writes_)
166         return;
167       if (outgoing_messages_.empty()) {
168         if (!WriteNoLock(MessageView(std::move(message), 0)))
169           reject_writes_ = write_error = true;
170       } else {
171         outgoing_messages_.emplace_back(std::move(message), 0);
172       }
173     }
174     if (write_error) {
175       // Invoke OnWriteError() asynchronously on the IO thread, in case Write()
176       // was called by the delegate, in which case we should not re-enter it.
177       io_task_runner_->PostTask(
178           FROM_HERE, base::BindOnce(&ChannelPosix::OnWriteError, this,
179                                     Error::kDisconnected));
180     }
181   }
182 
LeakHandle()183   void LeakHandle() override {
184     DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
185     leak_handle_ = true;
186   }
187 
GetReadPlatformHandles(const void * payload,size_t payload_size,size_t num_handles,const void * extra_header,size_t extra_header_size,std::vector<PlatformHandle> * handles,bool * deferred)188   bool GetReadPlatformHandles(const void* payload,
189                               size_t payload_size,
190                               size_t num_handles,
191                               const void* extra_header,
192                               size_t extra_header_size,
193                               std::vector<PlatformHandle>* handles,
194                               bool* deferred) override {
195     if (num_handles > std::numeric_limits<uint16_t>::max())
196       return false;
197 #if defined(OS_MACOSX) && !defined(OS_IOS)
198     // On OSX, we can have mach ports which are located in the extra header
199     // section.
200     using MachPortsEntry = Channel::Message::MachPortsEntry;
201     using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader;
202     if (extra_header_size <
203         sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry)) {
204       return false;
205     }
206     const MachPortsExtraHeader* mach_ports_header =
207         reinterpret_cast<const MachPortsExtraHeader*>(extra_header);
208     size_t num_mach_ports = mach_ports_header->num_ports;
209     if (num_mach_ports > num_handles)
210       return false;
211     if (incoming_fds_.size() + num_mach_ports < num_handles)
212       return true;
213 
214     std::vector<PlatformHandleInTransit> handles_in_transit(num_handles);
215     const MachPortsEntry* mach_ports = mach_ports_header->entries;
216 
217     // If we know the remote process handle, we assume all incoming Mach ports
218     // are send right references owned by the remote process. Otherwise they're
219     // receive ports we can use to read a send right.
220     const bool extract_send_rights = remote_process().is_valid();
221     for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) {
222       if (mach_port_index < num_mach_ports &&
223           mach_ports[mach_port_index].index == i) {
224         mach_port_t port_name =
225             static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port);
226         if (extract_send_rights) {
227           handles_in_transit[i] =
228               PlatformHandleInTransit::CreateForMachPortName(port_name);
229         } else {
230           handles_in_transit[i] = PlatformHandleInTransit(
231               PlatformHandle(MachPortRelay::ReceiveSendRight(
232                   base::mac::ScopedMachReceiveRight(port_name))));
233         }
234         mach_port_index++;
235       } else {
236         if (incoming_fds_.empty())
237           return false;
238         handles_in_transit[i] = PlatformHandleInTransit(
239             PlatformHandle(std::move(incoming_fds_.front())));
240         incoming_fds_.pop_front();
241       }
242     }
243     if (extract_send_rights && num_mach_ports) {
244       MachPortRelay* relay = Core::Get()->GetMachPortRelay();
245       DCHECK(relay);
246       // Extracting send rights requires that we have a task port for the
247       // remote process, which we may not yet have.
248       if (relay->port_provider()->TaskForPid(remote_process().get()) !=
249           MACH_PORT_NULL) {
250         // We do have a task port, so extract the send rights immediately.
251         for (auto& handle : handles_in_transit) {
252           if (handle.is_mach_port_name()) {
253             handle = PlatformHandleInTransit(PlatformHandle(relay->ExtractPort(
254                 handle.mach_port_name(), remote_process().get())));
255           }
256         }
257       } else {
258         // No task port, we have to defer this message.
259         *deferred = true;
260         base::AutoLock lock(task_port_wait_lock_);
261         std::vector<uint8_t> data(payload_size);
262         memcpy(data.data(), payload, payload_size);
263         pending_incoming_with_mach_ports_.emplace_back(
264             std::move(data), std::move(handles_in_transit));
265         return true;
266       }
267     }
268 
269     handles->resize(handles_in_transit.size());
270     for (size_t i = 0; i < handles->size(); ++i)
271       handles->at(i) = handles_in_transit[i].TakeHandle();
272 #else
273     if (incoming_fds_.size() < num_handles)
274       return true;
275 
276     handles->resize(num_handles);
277     for (size_t i = 0; i < num_handles; ++i) {
278       handles->at(i) = PlatformHandle(std::move(incoming_fds_.front()));
279       incoming_fds_.pop_front();
280     }
281 #endif
282 
283     return true;
284   }
285 
286  private:
~ChannelPosix()287   ~ChannelPosix() override {
288     DCHECK(!read_watcher_);
289     DCHECK(!write_watcher_);
290   }
291 
StartOnIOThread()292   void StartOnIOThread() {
293     DCHECK(!read_watcher_);
294     DCHECK(!write_watcher_);
295     read_watcher_.reset(
296         new base::MessagePumpForIO::FdWatchController(FROM_HERE));
297     base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
298     if (server_.is_valid()) {
299       base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
300           server_.platform_handle().GetFD().get(), false /* persistent */,
301           base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this);
302     } else {
303       write_watcher_.reset(
304           new base::MessagePumpForIO::FdWatchController(FROM_HERE));
305       base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
306           socket_.get(), true /* persistent */,
307           base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this);
308       base::AutoLock lock(write_lock_);
309       FlushOutgoingMessagesNoLock();
310     }
311   }
312 
WaitForWriteOnIOThread()313   void WaitForWriteOnIOThread() {
314     base::AutoLock lock(write_lock_);
315     WaitForWriteOnIOThreadNoLock();
316   }
317 
WaitForWriteOnIOThreadNoLock()318   void WaitForWriteOnIOThreadNoLock() {
319     if (pending_write_)
320       return;
321     if (!write_watcher_)
322       return;
323     if (io_task_runner_->RunsTasksInCurrentSequence()) {
324       pending_write_ = true;
325       base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor(
326           socket_.get(), false /* persistent */,
327           base::MessagePumpForIO::WATCH_WRITE, write_watcher_.get(), this);
328     } else {
329       io_task_runner_->PostTask(
330           FROM_HERE,
331           base::BindOnce(&ChannelPosix::WaitForWriteOnIOThread, this));
332     }
333   }
334 
ShutDownOnIOThread()335   void ShutDownOnIOThread() {
336     base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
337 
338     read_watcher_.reset();
339     write_watcher_.reset();
340     if (leak_handle_) {
341       ignore_result(socket_.release());
342       server_.TakePlatformHandle().release();
343     } else {
344       socket_.reset();
345       ignore_result(server_.TakePlatformHandle());
346     }
347 #if defined(OS_MACOSX)
348     fds_to_close_.clear();
349 #endif
350 
351 #if defined(OS_MACOSX) && !defined(OS_IOS)
352     auto* relay = Core::Get()->GetMachPortRelay();
353     if (relay)
354       relay->RemoveObserver(this);
355 #endif
356 
357     // May destroy the |this| if it was the last reference.
358     self_ = nullptr;
359   }
360 
361 #if defined(OS_MACOSX) && !defined(OS_IOS)
362   // MachPortRelay::Observer:
OnProcessReady(base::ProcessHandle process)363   void OnProcessReady(base::ProcessHandle process) override {
364     if (process != remote_process().get())
365       return;
366 
367     io_task_runner_->PostTask(
368         FROM_HERE,
369         base::BindOnce(
370             &ChannelPosix::FlushPendingMessagesWithMachPortsOnIOThread, this));
371   }
372 
FlushPendingMessagesWithMachPortsOnIOThread()373   void FlushPendingMessagesWithMachPortsOnIOThread() {
374     // We have a task port for the remote process. Now we can send or accept
375     // any pending messages with Mach ports.
376     std::vector<RawIncomingMessage> incoming;
377     std::vector<MessagePtr> outgoing;
378     {
379       base::AutoLock lock(task_port_wait_lock_);
380       if (reject_incoming_messages_with_mach_ports_)
381         return;
382       std::swap(pending_incoming_with_mach_ports_, incoming);
383       std::swap(pending_outgoing_with_mach_ports_, outgoing);
384     }
385 
386     DCHECK(remote_process().is_valid());
387     base::ProcessHandle process = remote_process().get();
388     MachPortRelay* relay = Core::Get()->GetMachPortRelay();
389     DCHECK(relay);
390     for (auto& message : incoming) {
391       Channel::Delegate* d = delegate();
392       if (!d)
393         break;
394       std::vector<PlatformHandle> handles(message.handles.size());
395       for (size_t i = 0; i < message.handles.size(); ++i) {
396         if (message.handles[i].is_mach_port_name()) {
397           handles[i] = PlatformHandle(
398               relay->ExtractPort(message.handles[i].mach_port_name(), process));
399         } else {
400           DCHECK(!message.handles[i].owning_process().is_valid());
401           handles[i] = message.handles[i].TakeHandle();
402         }
403       }
404       d->OnChannelMessage(message.data.data(), message.data.size(),
405                           std::move(handles));
406     }
407 
408     for (auto& message : outgoing) {
409       relay->SendPortsToProcess(message.get(), process);
410       Write(std::move(message));
411     }
412   }
413 #endif
414 
415   // base::MessageLoopCurrent::DestructionObserver:
WillDestroyCurrentMessageLoop()416   void WillDestroyCurrentMessageLoop() override {
417     DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
418     if (self_)
419       ShutDownOnIOThread();
420   }
421 
422   // base::MessagePumpForIO::FdWatcher:
OnFileCanReadWithoutBlocking(int fd)423   void OnFileCanReadWithoutBlocking(int fd) override {
424     if (server_.is_valid()) {
425       CHECK_EQ(fd, server_.platform_handle().GetFD().get());
426 #if !defined(OS_NACL)
427       read_watcher_.reset();
428       base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
429 
430       AcceptSocketConnection(server_.platform_handle().GetFD().get(), &socket_);
431       ignore_result(server_.TakePlatformHandle());
432       if (!socket_.is_valid()) {
433         OnError(Error::kConnectionFailed);
434         return;
435       }
436       StartOnIOThread();
437 #else
438       NOTREACHED();
439 #endif
440       return;
441     }
442     CHECK_EQ(fd, socket_.get());
443 
444     bool validation_error = false;
445     bool read_error = false;
446     size_t next_read_size = 0;
447     size_t buffer_capacity = 0;
448     size_t total_bytes_read = 0;
449     size_t bytes_read = 0;
450     do {
451       buffer_capacity = next_read_size;
452       char* buffer = GetReadBuffer(&buffer_capacity);
453       DCHECK_GT(buffer_capacity, 0u);
454 
455       std::vector<base::ScopedFD> incoming_fds;
456       ssize_t read_result =
457           SocketRecvmsg(socket_.get(), buffer, buffer_capacity, &incoming_fds);
458       for (auto& fd : incoming_fds)
459         incoming_fds_.emplace_back(std::move(fd));
460 
461       if (read_result > 0) {
462         bytes_read = static_cast<size_t>(read_result);
463         total_bytes_read += bytes_read;
464         if (!OnReadComplete(bytes_read, &next_read_size)) {
465           read_error = true;
466           validation_error = true;
467           break;
468         }
469       } else if (read_result == 0 ||
470                  (errno != EAGAIN && errno != EWOULDBLOCK)) {
471         read_error = true;
472         break;
473       }
474     } while (bytes_read == buffer_capacity &&
475              total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0);
476     if (read_error) {
477       // Stop receiving read notifications.
478       read_watcher_.reset();
479       if (validation_error)
480         OnError(Error::kReceivedMalformedData);
481       else
482         OnError(Error::kDisconnected);
483     }
484   }
485 
OnFileCanWriteWithoutBlocking(int fd)486   void OnFileCanWriteWithoutBlocking(int fd) override {
487     bool write_error = false;
488     {
489       base::AutoLock lock(write_lock_);
490       pending_write_ = false;
491       if (!FlushOutgoingMessagesNoLock())
492         reject_writes_ = write_error = true;
493     }
494     if (write_error)
495       OnWriteError(Error::kDisconnected);
496   }
497 
498   // Attempts to write a message directly to the channel. If the full message
499   // cannot be written, it's queued and a wait is initiated to write the message
500   // ASAP on the I/O thread.
WriteNoLock(MessageView message_view)501   bool WriteNoLock(MessageView message_view) {
502     if (server_.is_valid()) {
503       outgoing_messages_.emplace_front(std::move(message_view));
504       return true;
505     }
506     size_t bytes_written = 0;
507     do {
508       message_view.advance_data_offset(bytes_written);
509 
510       ssize_t result;
511       std::vector<PlatformHandleInTransit> handles = message_view.TakeHandles();
512       if (!handles.empty()) {
513         iovec iov = {const_cast<void*>(message_view.data()),
514                      message_view.data_num_bytes()};
515         std::vector<base::ScopedFD> fds(handles.size());
516         for (size_t i = 0; i < handles.size(); ++i)
517           fds[i] = handles[i].TakeHandle().TakeFD();
518         // TODO: Handle lots of handles.
519         result = SendmsgWithHandles(socket_.get(), &iov, 1, fds);
520         if (result >= 0) {
521 #if defined(OS_MACOSX)
522           // There is a bug on OSX which makes it dangerous to close
523           // a file descriptor while it is in transit. So instead we
524           // store the file descriptor in a set and send a message to
525           // the recipient, which is queued AFTER the message that
526           // sent the FD. The recipient will reply to the message,
527           // letting us know that it is now safe to close the file
528           // descriptor. For more information, see:
529           // http://crbug.com/298276
530           MessagePtr fds_message(
531               new Channel::Message(sizeof(fds[0]) * fds.size(), 0,
532                                    Message::MessageType::HANDLES_SENT));
533           memcpy(fds_message->mutable_payload(), fds.data(),
534                  sizeof(fds[0]) * fds.size());
535           outgoing_messages_.emplace_back(std::move(fds_message), 0);
536           {
537             base::AutoLock l(fds_to_close_lock_);
538             for (auto& fd : fds)
539               fds_to_close_.emplace_back(std::move(fd));
540           }
541 #endif  // defined(OS_MACOSX)
542         } else {
543           // Message transmission failed, so pull the FDs back into |handles|
544           // so they can be held by the Message again.
545           for (size_t i = 0; i < fds.size(); ++i) {
546             handles[i] =
547                 PlatformHandleInTransit(PlatformHandle(std::move(fds[i])));
548           }
549         }
550       } else {
551         result = SocketWrite(socket_.get(), message_view.data(),
552                              message_view.data_num_bytes());
553       }
554 
555       if (result < 0) {
556         if (errno != EAGAIN &&
557             errno != EWOULDBLOCK
558 #if defined(OS_MACOSX)
559             // On OS X if sendmsg() is trying to send fds between processes and
560             // there isn't enough room in the output buffer to send the fd
561             // structure over atomically then EMSGSIZE is returned.
562             //
563             // EMSGSIZE presents a problem since the system APIs can only call
564             // us when there's room in the socket buffer and not when there is
565             // "enough" room.
566             //
567             // The current behavior is to return to the event loop when EMSGSIZE
568             // is received and hopefull service another FD.  This is however
569             // still technically a busy wait since the event loop will call us
570             // right back until the receiver has read enough data to allow
571             // passing the FD over atomically.
572             && errno != EMSGSIZE
573 #endif
574             ) {
575           return false;
576         }
577         message_view.SetHandles(std::move(handles));
578         outgoing_messages_.emplace_front(std::move(message_view));
579         WaitForWriteOnIOThreadNoLock();
580         return true;
581       }
582 
583       bytes_written = static_cast<size_t>(result);
584     } while (bytes_written < message_view.data_num_bytes());
585 
586     return FlushOutgoingMessagesNoLock();
587   }
588 
FlushOutgoingMessagesNoLock()589   bool FlushOutgoingMessagesNoLock() {
590     base::circular_deque<MessageView> messages;
591     std::swap(outgoing_messages_, messages);
592 
593     while (!messages.empty()) {
594       if (!WriteNoLock(std::move(messages.front())))
595         return false;
596 
597       messages.pop_front();
598       if (!outgoing_messages_.empty()) {
599         // The message was requeued by WriteNoLock(), so we have to wait for
600         // pipe to become writable again. Repopulate the message queue and exit.
601         // If sending the message triggered any control messages, they may be
602         // in |outgoing_messages_| in addition to or instead of the message
603         // being sent.
604         std::swap(messages, outgoing_messages_);
605         while (!messages.empty()) {
606           outgoing_messages_.push_front(std::move(messages.back()));
607           messages.pop_back();
608         }
609         return true;
610       }
611     }
612 
613     return true;
614   }
615 
616 #if defined(OS_MACOSX)
OnControlMessage(Message::MessageType message_type,const void * payload,size_t payload_size,std::vector<PlatformHandle> handles)617   bool OnControlMessage(Message::MessageType message_type,
618                         const void* payload,
619                         size_t payload_size,
620                         std::vector<PlatformHandle> handles) override {
621     switch (message_type) {
622       case Message::MessageType::HANDLES_SENT: {
623         if (payload_size == 0)
624           break;
625         MessagePtr message(new Channel::Message(
626             payload_size, 0, Message::MessageType::HANDLES_SENT_ACK));
627         memcpy(message->mutable_payload(), payload, payload_size);
628         Write(std::move(message));
629         return true;
630       }
631 
632       case Message::MessageType::HANDLES_SENT_ACK: {
633         size_t num_fds = payload_size / sizeof(int);
634         if (num_fds == 0 || payload_size % sizeof(int) != 0)
635           break;
636 
637         const int* fds = reinterpret_cast<const int*>(payload);
638         if (!CloseHandles(fds, num_fds))
639           break;
640         return true;
641       }
642 
643       default:
644         break;
645     }
646 
647     return false;
648   }
649 
650   // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if
651   // |fds| does not match a sequence of handles in |fds_to_close_|.
CloseHandles(const int * fds,size_t num_fds)652   bool CloseHandles(const int* fds, size_t num_fds) {
653     base::AutoLock l(fds_to_close_lock_);
654     if (!num_fds)
655       return false;
656 
657     auto start = std::find_if(
658         fds_to_close_.begin(), fds_to_close_.end(),
659         [&fds](const base::ScopedFD& fd) { return fd.get() == fds[0]; });
660     if (start == fds_to_close_.end())
661       return false;
662 
663     auto it = start;
664     size_t i = 0;
665     // The FDs in the message should match a sequence of handles in
666     // |fds_to_close_|.
667     // TODO(wez): Consider making |fds_to_close_| a circular_deque<>
668     // for greater efficiency? Or assign a unique Id to each FD-containing
669     // message, and map that to a vector of FDs to close, to avoid the
670     // need for this traversal? Id could even be the first FD in the message.
671     for (; i < num_fds && it != fds_to_close_.end(); i++, ++it) {
672       if (it->get() != fds[i])
673         return false;
674     }
675     if (i != num_fds)
676       return false;
677 
678     // Close the FDs by erase()ing their ScopedFDs.
679     fds_to_close_.erase(start, it);
680     return true;
681   }
682 #endif  // defined(OS_MACOSX)
683 
OnWriteError(Error error)684   void OnWriteError(Error error) {
685     DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
686     DCHECK(reject_writes_);
687 
688     if (error == Error::kDisconnected) {
689       // If we can't write because the pipe is disconnected then continue
690       // reading to fetch any in-flight messages, relying on end-of-stream to
691       // signal the actual disconnection.
692       if (read_watcher_) {
693         write_watcher_.reset();
694         return;
695       }
696     }
697 
698     OnError(error);
699   }
700 
701   // Keeps the Channel alive at least until explicit shutdown on the IO thread.
702   scoped_refptr<Channel> self_;
703 
704   // We may be initialized with a server socket, in which case this will be
705   // valid until it accepts an incoming connection.
706   PlatformChannelServerEndpoint server_;
707 
708   // The socket over which to communicate. May be passed in at construction time
709   // or accepted over |server_|.
710   base::ScopedFD socket_;
711 
712   scoped_refptr<base::TaskRunner> io_task_runner_;
713 
714   // These watchers must only be accessed on the IO thread.
715   std::unique_ptr<base::MessagePumpForIO::FdWatchController> read_watcher_;
716   std::unique_ptr<base::MessagePumpForIO::FdWatchController> write_watcher_;
717 
718   base::circular_deque<base::ScopedFD> incoming_fds_;
719 
720   // Protects |pending_write_| and |outgoing_messages_|.
721   base::Lock write_lock_;
722   bool pending_write_ = false;
723   bool reject_writes_ = false;
724   base::circular_deque<MessageView> outgoing_messages_;
725 
726   bool leak_handle_ = false;
727 
728 #if defined(OS_MACOSX)
729   base::Lock fds_to_close_lock_;
730   std::vector<base::ScopedFD> fds_to_close_;
731 #if !defined(OS_IOS)
732   // Guards access to the send/receive queues below. These are messages that
733   // can't be fully accepted from or dispatched to the Channel user yet because
734   // we're still waiting on a task port for the remote process.
735   struct RawIncomingMessage {
RawIncomingMessagemojo::core::__anonbe4d50500111::ChannelPosix::RawIncomingMessage736     RawIncomingMessage(std::vector<uint8_t> data,
737                        std::vector<PlatformHandleInTransit> handles)
738         : data(std::move(data)), handles(std::move(handles)) {}
739     RawIncomingMessage(RawIncomingMessage&&) = default;
740     ~RawIncomingMessage() = default;
741 
742     std::vector<uint8_t> data;
743     std::vector<PlatformHandleInTransit> handles;
744   };
745 
746   base::Lock task_port_wait_lock_;
747   bool reject_incoming_messages_with_mach_ports_ = false;
748   std::vector<MessagePtr> pending_outgoing_with_mach_ports_;
749   std::vector<RawIncomingMessage> pending_incoming_with_mach_ports_;
750 #endif  // !defined(OS_IOS)
751 #endif  // defined(OS_MACOSX)
752 
753   DISALLOW_COPY_AND_ASSIGN(ChannelPosix);
754 };
755 
756 }  // namespace
757 
758 // static
Create(Delegate * delegate,ConnectionParams connection_params,scoped_refptr<base::TaskRunner> io_task_runner)759 scoped_refptr<Channel> Channel::Create(
760     Delegate* delegate,
761     ConnectionParams connection_params,
762     scoped_refptr<base::TaskRunner> io_task_runner) {
763   return new ChannelPosix(delegate, std::move(connection_params),
764                           io_task_runner);
765 }
766 
767 }  // namespace core
768 }  // namespace mojo
769