/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define TRACE_TAG USB

#include "sysdeps.h"

#include <errno.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <unistd.h>

#include <linux/usb/functionfs.h>
#include <sys/eventfd.h>

#include <algorithm>
#include <array>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <vector>

#include <asyncio/AsyncIO.h>

#include <android-base/logging.h>
#include <android-base/macros.h>
#include <android-base/parsebool.h>
#include <android-base/properties.h>
#include <android-base/thread_annotations.h>

#include "adb_unique_fd.h"
#include "adb_utils.h"
#include "daemon/property_monitor.h"
#include "daemon/usb_ffs.h"
#include "sysdeps/chrono.h"
#include "transfer_id.h"
#include "transport.h"
#include "types.h"

using android::base::StringPrintf;

// Not all USB controllers support operations larger than 16k, so don't go above that.
// Also, each submitted operation does an allocation in the kernel of that size, so we want to
// minimize our queue depth while still maintaining a deep enough queue to keep the USB stack fed.
static constexpr size_t kUsbReadQueueDepth = 8;
static constexpr size_t kUsbReadSize = 16384;

static constexpr size_t kUsbWriteQueueDepth = 8;
static constexpr size_t kUsbWriteSize = 16384;
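// With the queue depths and sizes above, each direction keeps at most 8 * 16384 bytes (128KiB)
// of such kernel allocations in flight at once.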

static const char* to_string(enum usb_functionfs_event_type type) {
    switch (type) {
        case FUNCTIONFS_BIND:
            return "FUNCTIONFS_BIND";
        case FUNCTIONFS_UNBIND:
            return "FUNCTIONFS_UNBIND";
        case FUNCTIONFS_ENABLE:
            return "FUNCTIONFS_ENABLE";
        case FUNCTIONFS_DISABLE:
            return "FUNCTIONFS_DISABLE";
        case FUNCTIONFS_SETUP:
            return "FUNCTIONFS_SETUP";
        case FUNCTIONFS_SUSPEND:
            return "FUNCTIONFS_SUSPEND";
        case FUNCTIONFS_RESUME:
            return "FUNCTIONFS_RESUME";
    }
}

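// An IoBlock pairs a kernel AIO control block with the buffer it reads into or writes from.
// The TransferId stored in control.aio_data is what lets a completion event (io_event::data)
// be matched back to the request that produced it.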
template <class Payload>
struct IoBlock {
    bool pending = false;
    struct iocb control = {};
    Payload payload;

    TransferId id() const { return TransferId::from_value(control.aio_data); }
};

using IoReadBlock = IoBlock<Block>;
using IoWriteBlock = IoBlock<std::shared_ptr<Block>>;

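// Move-only RAII wrapper around a kernel AIO context (io_setup/io_destroy).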
struct ScopedAioContext {
    ScopedAioContext() = default;
    ~ScopedAioContext() { reset(); }

    ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); }
    ScopedAioContext(const ScopedAioContext& copy) = delete;

    ScopedAioContext& operator=(ScopedAioContext&& move) {
        reset(move.release());
        return *this;
    }
    ScopedAioContext& operator=(const ScopedAioContext& copy) = delete;

    static ScopedAioContext Create(size_t max_events) {
        aio_context_t ctx = 0;
        if (io_setup(max_events, &ctx) != 0) {
            PLOG(FATAL) << "failed to create aio_context_t";
        }
        ScopedAioContext result;
        result.reset(ctx);
        return result;
    }

    aio_context_t release() {
        aio_context_t result = context_;
        context_ = 0;
        return result;
    }

    void reset(aio_context_t new_context = 0) {
        if (context_ != 0) {
            io_destroy(context_);
        }

        context_ = new_context;
    }

    aio_context_t get() { return context_; }

  private:
    aio_context_t context_ = 0;
};

struct UsbFfsConnection : public Connection {
    UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write,
                     std::promise<void> destruction_notifier)
        : worker_started_(false),
          stopped_(false),
          destruction_notifier_(std::move(destruction_notifier)),
          control_fd_(std::move(control)),
          read_fd_(std::move(read)),
          write_fd_(std::move(write)) {
        LOG(INFO) << "UsbFfsConnection constructed";
        worker_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
        if (worker_event_fd_ == -1) {
            PLOG(FATAL) << "failed to create eventfd";
        }

        monitor_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
        if (monitor_event_fd_ == -1) {
            PLOG(FATAL) << "failed to create eventfd";
        }

        aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth);
    }

    ~UsbFfsConnection() {
        LOG(INFO) << "UsbFfsConnection being destroyed";
        Stop();
        monitor_thread_.join();

        // We need to explicitly close our file descriptors before we notify our destruction,
        // because the thread listening on the future will immediately try to reopen the endpoint.
        aio_context_.reset();
        control_fd_.reset();
        read_fd_.reset();
        write_fd_.reset();

        destruction_notifier_.set_value();
    }

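    // Queue the packet header (and its payload, split into kUsbWriteSize chunks) for the worker
    // thread, then wake it via worker_event_fd_; the actual io_submit happens in SubmitWrites().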
    virtual bool Write(std::unique_ptr<apacket> packet) override final {
        LOG(DEBUG) << "USB write: " << dump_header(&packet->msg);
        auto header = std::make_shared<Block>(sizeof(packet->msg));
        memcpy(header->data(), &packet->msg, sizeof(packet->msg));

        std::lock_guard<std::mutex> lock(write_mutex_);
        write_requests_.push_back(
                CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++));
        if (!packet->payload.empty()) {
            // The kernel attempts to allocate a contiguous block of memory for each write,
            // which can fail if the write is large and the kernel heap is fragmented.
            // Split large writes into smaller chunks to avoid this.
            auto payload = std::make_shared<Block>(std::move(packet->payload));
            size_t offset = 0;
            size_t len = payload->size();

            while (len > 0) {
                size_t write_size = std::min(kUsbWriteSize, len);
                write_requests_.push_back(
                        CreateWriteBlock(payload, offset, write_size, next_write_id_++));
                len -= write_size;
                offset += write_size;
            }
        }

        // Wake up the worker thread to submit writes.
        uint64_t notify = 1;
        ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
        if (rc < 0) {
            PLOG(FATAL) << "failed to notify worker eventfd to submit writes";
        }

        return true;
    }

    virtual void Start() override final { StartMonitor(); }

    virtual void Stop() override final {
        if (stopped_.exchange(true)) {
            return;
        }
        uint64_t notify = 1;
        ssize_t rc = adb_write(worker_event_fd_.get(), &notify, sizeof(notify));
        if (rc < 0) {
            PLOG(FATAL) << "failed to notify worker eventfd to stop UsbFfsConnection";
        }
        CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));

        rc = adb_write(monitor_event_fd_.get(), &notify, sizeof(notify));
        if (rc < 0) {
            PLOG(FATAL) << "failed to notify monitor eventfd to stop UsbFfsConnection";
        }

        CHECK_EQ(static_cast<size_t>(rc), sizeof(notify));
    }

    virtual bool DoTlsHandshake(RSA* key, std::string* auth_key) override final {
        // TODO: support TLS for usb connections.
        LOG(FATAL) << "Not supported yet.";
        return false;
    }

  private:
    void StartMonitor() {
        // This is a bit of a mess.
        // It's possible for io_submit to end up blocking, if we call it as the endpoint
        // becomes disabled. Work around this by having a monitor thread to listen for functionfs
        // lifecycle events. If we notice an error condition (either we've become disabled, or we
        // were never enabled in the first place), we send interruption signals to the worker thread
        // until it dies, and then report failure to the transport via HandleError, which will
        // eventually result in the transport being destroyed, which will result in UsbFfsConnection
        // being destroyed, which unblocks the open thread and restarts this entire process.
        static std::once_flag handler_once;
        std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); });

        monitor_thread_ = std::thread([this]() {
            adb_thread_setname("UsbFfs-monitor");
            LOG(INFO) << "UsbFfs-monitor thread spawned";

            bool bound = false;
            bool enabled = false;
            bool running = true;
            while (running) {
                adb_pollfd pfd[2] = {
                  { .fd = control_fd_.get(), .events = POLLIN, .revents = 0 },
                  { .fd = monitor_event_fd_.get(), .events = POLLIN, .revents = 0 },
                };

                // If we don't see our first bind within a second, try again.
                int timeout_ms = bound ? -1 : 1000;

                int rc = TEMP_FAILURE_RETRY(adb_poll(pfd, 2, timeout_ms));
                if (rc == -1) {
                    PLOG(FATAL) << "poll on USB control fd failed";
                } else if (rc == 0) {
                    LOG(WARNING) << "timed out while waiting for FUNCTIONFS_BIND, trying again";
                    break;
                }

                if (pfd[1].revents) {
                    // We were told to die.
                    break;
                }

                struct usb_functionfs_event event;
                rc = TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event)));
                if (rc == -1) {
                    PLOG(FATAL) << "failed to read functionfs event";
                } else if (rc == 0) {
                    LOG(WARNING) << "hit EOF on functionfs control fd";
                    break;
                } else if (rc != sizeof(event)) {
                    LOG(FATAL) << "read functionfs event of unexpected size, expected "
                               << sizeof(event) << ", got " << rc;
                }

                LOG(INFO) << "USB event: "
                          << to_string(static_cast<usb_functionfs_event_type>(event.type));

                switch (event.type) {
                    case FUNCTIONFS_BIND:
                        if (bound) {
                            LOG(WARNING) << "received FUNCTIONFS_BIND while already bound?";
                            running = false;
                            break;
                        }

                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_BIND while already enabled?";
                            running = false;
                            break;
                        }

                        bound = true;
                        break;

                    case FUNCTIONFS_ENABLE:
                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_ENABLE while not bound?";
                            running = false;
                            break;
                        }

                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_ENABLE while already enabled?";
                            running = false;
                            break;
                        }

                        enabled = true;
                        StartWorker();
                        break;

                    case FUNCTIONFS_DISABLE:
                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_DISABLE while not bound?";
                        }

                        if (!enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_DISABLE while not enabled?";
                        }

                        enabled = false;
                        running = false;
                        break;

                    case FUNCTIONFS_UNBIND:
                        if (enabled) {
                            LOG(WARNING) << "received FUNCTIONFS_UNBIND while still enabled?";
                        }

                        if (!bound) {
                            LOG(WARNING) << "received FUNCTIONFS_UNBIND when not bound?";
                        }

                        bound = false;
                        running = false;
                        break;

                    case FUNCTIONFS_SETUP: {
                        LOG(INFO) << "received FUNCTIONFS_SETUP control transfer: bRequestType = "
                                  << static_cast<int>(event.u.setup.bRequestType)
                                  << ", bRequest = " << static_cast<int>(event.u.setup.bRequest)
                                  << ", wValue = " << static_cast<int>(event.u.setup.wValue)
                                  << ", wIndex = " << static_cast<int>(event.u.setup.wIndex)
                                  << ", wLength = " << static_cast<int>(event.u.setup.wLength);

                        if ((event.u.setup.bRequestType & USB_DIR_IN)) {
                            LOG(INFO) << "acking device-to-host control transfer";
                            ssize_t rc = adb_write(control_fd_.get(), "", 0);
                            if (rc != 0) {
                                PLOG(ERROR) << "failed to write empty packet to host";
                                break;
                            }
                        } else {
                            std::string buf;
                            buf.resize(event.u.setup.wLength + 1);

                            ssize_t rc = adb_read(control_fd_.get(), buf.data(), buf.size());
                            if (rc != event.u.setup.wLength) {
                                LOG(ERROR)
                                        << "read " << rc
                                        << " bytes when trying to read control request, expected "
                                        << event.u.setup.wLength;
                            }

                            LOG(INFO) << "control request contents: " << buf;
                            break;
                        }
                    }
                }
            }

            StopWorker();
            HandleError("monitor thread finished");
        });
    }

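    // Spawn the worker thread: prime kUsbReadQueueDepth reads, then loop on worker_event_fd_,
    // reaping AIO completions and submitting any queued writes until stopped_ is set.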
    void StartWorker() {
        CHECK(!worker_started_);
        worker_started_ = true;
        worker_thread_ = std::thread([this]() {
            adb_thread_setname("UsbFfs-worker");
            LOG(INFO) << "UsbFfs-worker thread spawned";

            for (size_t i = 0; i < kUsbReadQueueDepth; ++i) {
                read_requests_[i] = CreateReadBlock(next_read_id_++);
                if (!SubmitRead(&read_requests_[i])) {
                    return;
                }
            }

            while (!stopped_) {
                uint64_t dummy;
                ssize_t rc = adb_read(worker_event_fd_.get(), &dummy, sizeof(dummy));
                if (rc == -1) {
                    PLOG(FATAL) << "failed to read from eventfd";
                } else if (rc == 0) {
                    LOG(FATAL) << "hit EOF on eventfd";
                }

                ReadEvents();

                std::lock_guard<std::mutex> lock(write_mutex_);
                SubmitWrites();
            }
        });
    }

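    // Interrupt the worker with kInterruptionSignal until it exits. The no-op handler installed
    // in StartMonitor() is there so these signals can knock the worker out of blocking syscalls
    // (e.g. a stuck io_submit) with EINTR.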
    void StopWorker() {
        if (!worker_started_) {
            return;
        }

        pthread_t worker_thread_handle = worker_thread_.native_handle();
        while (true) {
            int rc = pthread_kill(worker_thread_handle, kInterruptionSignal);
            if (rc != 0) {
                LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
                break;
            }

            std::this_thread::sleep_for(100ms);

            rc = pthread_kill(worker_thread_handle, 0);
            if (rc == 0) {
                continue;
            } else if (rc == ESRCH) {
                break;
            } else {
                LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc);
            }
        }

        worker_thread_.join();
    }

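    // Reset a read block for reuse: mark it not pending, make sure its buffer is kUsbReadSize
    // bytes, and stamp the next read TransferId into aio_data.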
    void PrepareReadBlock(IoReadBlock* block, uint64_t id) {
        block->pending = false;
        if (block->payload.capacity() >= kUsbReadSize) {
            block->payload.resize(kUsbReadSize);
        } else {
            block->payload = Block(kUsbReadSize);
        }
        block->control.aio_data = static_cast<uint64_t>(TransferId::read(id));
        block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data());
        block->control.aio_nbytes = block->payload.size();
    }

    IoReadBlock CreateReadBlock(uint64_t id) {
        IoReadBlock block;
        PrepareReadBlock(&block, id);
        block.control.aio_rw_flags = 0;
        block.control.aio_lio_opcode = IOCB_CMD_PREAD;
        block.control.aio_reqprio = 0;
        block.control.aio_fildes = read_fd_.get();
        block.control.aio_offset = 0;
        block.control.aio_flags = IOCB_FLAG_RESFD;
        block.control.aio_resfd = worker_event_fd_.get();
        return block;
    }

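    // Reap completed AIO events without blocking (zero timeout) and dispatch each one to
    // HandleRead or HandleWrite based on the direction encoded in its TransferId.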
    void ReadEvents() {
        static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth;
        struct io_event events[kMaxEvents];
        struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0};
        int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout);
        if (rc == -1) {
            HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno)));
            return;
        }

        for (int event_idx = 0; event_idx < rc; ++event_idx) {
            auto& event = events[event_idx];
            TransferId id = TransferId::from_value(event.data);

            if (event.res < 0) {
                // On initial connection, some clients will send a ClearFeature(HALT) to
                // attempt to resynchronize host and device after the adb server is killed.
                // On newer device kernels, the reads we've already dispatched will be cancelled.
                // Instead of treating this as a failure, which will tear down the interface and
                // lead to the client doing the same thing again, just resubmit if this happens
                // before we've actually read anything.
                if (!connection_started_ && event.res == -EPIPE &&
                    id.direction == TransferDirection::READ) {
                    uint64_t read_idx = id.id % kUsbReadQueueDepth;
                    SubmitRead(&read_requests_[read_idx]);
                    continue;
                } else {
                    std::string error =
                            StringPrintf("%s %" PRIu64 " failed with error %s",
                                         id.direction == TransferDirection::READ ? "read" : "write",
                                         id.id, strerror(-event.res));
                    HandleError(error);
                    return;
                }
            }

            if (id.direction == TransferDirection::READ) {
                connection_started_ = true;
                if (!HandleRead(id, event.res)) {
                    return;
                }
            } else {
                HandleWrite(id);
            }
        }
    }

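    // Read completions can arrive out of order; record this one, then process consecutive
    // completed reads starting from needed_read_id_ so packets are delivered in order.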
    bool HandleRead(TransferId id, int64_t size) {
        uint64_t read_idx = id.id % kUsbReadQueueDepth;
        IoReadBlock* block = &read_requests_[read_idx];
        block->pending = false;
        block->payload.resize(size);

        // Notification for completed reads can be received out of order.
        if (block->id().id != needed_read_id_) {
            LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for "
                         << needed_read_id_;
            return true;
        }

        for (uint64_t id = needed_read_id_;; ++id) {
            size_t read_idx = id % kUsbReadQueueDepth;
            IoReadBlock* current_block = &read_requests_[read_idx];
            if (current_block->pending) {
                break;
            }
            if (!ProcessRead(current_block)) {
                return false;
            }
            ++needed_read_id_;
        }

        return true;
    }

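    // Interpret a completed read as either an amessage header or payload bytes. Once the
    // payload length announced in the header has arrived, hand the assembled packet to the
    // transport, then recycle the block and resubmit it.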
    bool ProcessRead(IoReadBlock* block) {
        if (!block->payload.empty()) {
            if (!incoming_header_.has_value()) {
                if (block->payload.size() != sizeof(amessage)) {
                    HandleError("received packet of unexpected length while reading header");
                    return false;
                }
                amessage& msg = incoming_header_.emplace();
                memcpy(&msg, block->payload.data(), sizeof(msg));
                LOG(DEBUG) << "USB read: " << dump_header(&msg);

                if (msg.command == A_CNXN) {
                    CancelWrites();
                }
            } else {
                size_t bytes_left = incoming_header_->data_length - incoming_payload_.size();
                if (block->payload.size() > bytes_left) {
                    HandleError("received too many bytes while waiting for payload");
                    return false;
                }
                incoming_payload_.append(std::move(block->payload));
            }

            if (incoming_header_->data_length == incoming_payload_.size()) {
                auto packet = std::make_unique<apacket>();
                packet->msg = *incoming_header_;

                // TODO: Make apacket contain an IOVector so we don't have to coalesce.
                packet->payload = std::move(incoming_payload_).coalesce();
                transport_->HandleRead(std::move(packet));

                incoming_header_.reset();
                // reuse the capacity of the incoming payload while we can.
                auto free_block = incoming_payload_.clear();
                if (block->payload.capacity() == 0) {
                    block->payload = std::move(free_block);
                }
            }
        }

        PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth);
        SubmitRead(block);
        return true;
    }

    bool SubmitRead(IoReadBlock* block) {
        block->pending = true;
        struct iocb* iocb = &block->control;
        if (io_submit(aio_context_.get(), 1, &iocb) != 1) {
            HandleError(StringPrintf("failed to submit read: %s", strerror(errno)));
            return false;
        }

        return true;
    }

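    // A write completed: drop the finished request so SubmitWrites() can put more in flight.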
    void HandleWrite(TransferId id) {
        std::lock_guard<std::mutex> lock(write_mutex_);
        auto it =
                std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) {
                    return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id);
                });
        CHECK(it != write_requests_.end());

        write_requests_.erase(it);
        size_t outstanding_writes = --writes_submitted_;
        LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes;
    }

    IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len,
                                  uint64_t id) {
        auto block = IoWriteBlock();
        block.payload = std::move(payload);
        block.control.aio_data = static_cast<uint64_t>(TransferId::write(id));
        block.control.aio_rw_flags = 0;
        block.control.aio_lio_opcode = IOCB_CMD_PWRITE;
        block.control.aio_reqprio = 0;
        block.control.aio_fildes = write_fd_.get();
        block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset);
        block.control.aio_nbytes = len;
        block.control.aio_offset = 0;
        block.control.aio_flags = IOCB_FLAG_RESFD;
        block.control.aio_resfd = worker_event_fd_.get();
        return block;
    }

    IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) {
        size_t len = payload.size();
        return CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id);
    }

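    // Submit queued writes in order, keeping at most kUsbWriteQueueDepth requests in flight.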
    void SubmitWrites() REQUIRES(write_mutex_) {
        if (writes_submitted_ == kUsbWriteQueueDepth) {
            return;
        }

        ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_,
                                            write_requests_.size() - writes_submitted_);
        CHECK_GE(writes_to_submit, 0);
        if (writes_to_submit == 0) {
            return;
        }

        struct iocb* iocbs[kUsbWriteQueueDepth];
        for (int i = 0; i < writes_to_submit; ++i) {
            CHECK(!write_requests_[writes_submitted_ + i].pending);
            write_requests_[writes_submitted_ + i].pending = true;
            iocbs[i] = &write_requests_[writes_submitted_ + i].control;
            LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]);
        }

        writes_submitted_ += writes_to_submit;

        int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs);
        if (rc == -1) {
            HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno)));
            return;
        } else if (rc != writes_to_submit) {
            LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit
                       << ", actually submitted " << rc;
        }
    }

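    // Called from ProcessRead() when an A_CNXN arrives: best-effort io_cancel of writes that
    // are still pending, presumably left over from before the reconnect.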
    void CancelWrites() {
        std::lock_guard<std::mutex> lock(write_mutex_);
        for (size_t i = 0; i < writes_submitted_; ++i) {
            struct io_event res;
            if (write_requests_[i].pending == true) {
                LOG(INFO) << "cancelling pending write# " << i;
                io_cancel(aio_context_.get(), &write_requests_[i].control, &res);
            }
        }
    }

    void HandleError(const std::string& error) {
        std::call_once(error_flag_, [&]() {
            if (transport_) {
                transport_->HandleError(error);
            }

            if (!stopped_) {
                Stop();
            }
        });
    }

    std::thread monitor_thread_;

    bool worker_started_;
    std::thread worker_thread_;

    std::atomic<bool> stopped_;
    std::promise<void> destruction_notifier_;
    std::once_flag error_flag_;

    unique_fd worker_event_fd_;
    unique_fd monitor_event_fd_;

    ScopedAioContext aio_context_;
    unique_fd control_fd_;
    unique_fd read_fd_;
    unique_fd write_fd_;

    bool connection_started_ = false;
    std::optional<amessage> incoming_header_;
    IOVector incoming_payload_;

    std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_;
    IOVector read_data_;

    // ID of the next request that we're going to send out.
    size_t next_read_id_ = 0;

    // ID of the next packet we're waiting for.
    size_t needed_read_id_ = 0;

    std::mutex write_mutex_;
    std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_);
    size_t next_write_id_ GUARDED_BY(write_mutex_) = 0;
    size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0;

    static constexpr int kInterruptionSignal = SIGUSR1;
};

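// Repeatedly (re)open the functionfs endpoints, hand them to a new UsbFfsConnection, and block
// until that connection is destroyed before starting over.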
static void usb_ffs_open_thread() {
    adb_thread_setname("usb ffs open");

    // When the device is acting as a USB host, we'll be unable to bind to the USB gadget on kernels
    // that don't carry a downstream patch to enable that behavior.
    //
    // This property is copied from vendor.sys.usb.adb.disabled by an init.rc script.
    //
    // Note that this property only disables rebinding the USB gadget: setting it while an interface
    // is already bound will do nothing.
    static const char* kPropertyUsbDisabled = "sys.usb.adb.disabled";
    PropertyMonitor prop_mon;
    prop_mon.Add(kPropertyUsbDisabled, [](std::string value) {
        // Return false (i.e. break out of PropertyMonitor::Run) when the property != 1.
        return android::base::ParseBool(value) == android::base::ParseBoolResult::kTrue;
    });

    while (true) {
        unique_fd control;
        unique_fd bulk_out;
        unique_fd bulk_in;
        if (!open_functionfs(&control, &bulk_out, &bulk_in)) {
            std::this_thread::sleep_for(1s);
            continue;
        }

        if (android::base::GetBoolProperty(kPropertyUsbDisabled, false)) {
            LOG(INFO) << "pausing USB due to " << kPropertyUsbDisabled;
            prop_mon.Run();
            LOG(INFO) << "resuming USB";
        }

        atransport* transport = new atransport();
        transport->serial = "UsbFfs";
        std::promise<void> destruction_notifier;
        std::future<void> future = destruction_notifier.get_future();
        transport->SetConnection(std::make_unique<UsbFfsConnection>(
                std::move(control), std::move(bulk_out), std::move(bulk_in),
                std::move(destruction_notifier)));
        register_transport(transport);
        future.wait();
    }
}

void usb_init() {
    std::thread(usb_ffs_open_thread).detach();
}