1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #define TRACE_TAG USB 18 19 #include "sysdeps.h" 20 21 #include <errno.h> 22 #include <inttypes.h> 23 #include <stdio.h> 24 #include <stdlib.h> 25 #include <string.h> 26 #include <sys/ioctl.h> 27 #include <sys/types.h> 28 #include <unistd.h> 29 30 #include <linux/usb/functionfs.h> 31 #include <sys/eventfd.h> 32 33 #include <algorithm> 34 #include <array> 35 #include <future> 36 #include <memory> 37 #include <mutex> 38 #include <optional> 39 #include <vector> 40 41 #include <asyncio/AsyncIO.h> 42 43 #include <android-base/logging.h> 44 #include <android-base/macros.h> 45 #include <android-base/parsebool.h> 46 #include <android-base/properties.h> 47 #include <android-base/thread_annotations.h> 48 49 #include "adb_unique_fd.h" 50 #include "adb_utils.h" 51 #include "daemon/property_monitor.h" 52 #include "daemon/usb_ffs.h" 53 #include "sysdeps/chrono.h" 54 #include "transfer_id.h" 55 #include "transport.h" 56 #include "types.h" 57 58 using android::base::StringPrintf; 59 60 // Not all USB controllers support operations larger than 16k, so don't go above that. 61 // Also, each submitted operation does an allocation in the kernel of that size, so we want to 62 // minimize our queue depth while still maintaining a deep enough queue to keep the USB stack fed. 63 static constexpr size_t kUsbReadQueueDepth = 8; 64 static constexpr size_t kUsbReadSize = 4 * PAGE_SIZE; 65 66 static constexpr size_t kUsbWriteQueueDepth = 8; 67 static constexpr size_t kUsbWriteSize = 4 * PAGE_SIZE; 68 69 static const char* to_string(enum usb_functionfs_event_type type) { 70 switch (type) { 71 case FUNCTIONFS_BIND: 72 return "FUNCTIONFS_BIND"; 73 case FUNCTIONFS_UNBIND: 74 return "FUNCTIONFS_UNBIND"; 75 case FUNCTIONFS_ENABLE: 76 return "FUNCTIONFS_ENABLE"; 77 case FUNCTIONFS_DISABLE: 78 return "FUNCTIONFS_DISABLE"; 79 case FUNCTIONFS_SETUP: 80 return "FUNCTIONFS_SETUP"; 81 case FUNCTIONFS_SUSPEND: 82 return "FUNCTIONFS_SUSPEND"; 83 case FUNCTIONFS_RESUME: 84 return "FUNCTIONFS_RESUME"; 85 } 86 } 87 88 template <class Payload> 89 struct IoBlock { 90 bool pending = false; 91 struct iocb control = {}; 92 Payload payload; 93 94 TransferId id() const { return TransferId::from_value(control.aio_data); } 95 }; 96 97 using IoReadBlock = IoBlock<Block>; 98 using IoWriteBlock = IoBlock<std::shared_ptr<Block>>; 99 100 struct ScopedAioContext { 101 ScopedAioContext() = default; 102 ~ScopedAioContext() { reset(); } 103 104 ScopedAioContext(ScopedAioContext&& move) { reset(move.release()); } 105 ScopedAioContext(const ScopedAioContext& copy) = delete; 106 107 ScopedAioContext& operator=(ScopedAioContext&& move) { 108 reset(move.release()); 109 return *this; 110 } 111 ScopedAioContext& operator=(const ScopedAioContext& copy) = delete; 112 113 static ScopedAioContext Create(size_t max_events) { 114 aio_context_t ctx = 0; 115 if (io_setup(max_events, &ctx) != 0) { 116 PLOG(FATAL) << "failed to create aio_context_t"; 117 } 118 ScopedAioContext result; 119 result.reset(ctx); 120 return result; 121 } 122 123 aio_context_t release() { 124 aio_context_t result = context_; 125 context_ = 0; 126 return result; 127 } 128 129 void reset(aio_context_t new_context = 0) { 130 if (context_ != 0) { 131 io_destroy(context_); 132 } 133 134 context_ = new_context; 135 } 136 137 aio_context_t get() { return context_; } 138 139 private: 140 aio_context_t context_ = 0; 141 }; 142 143 struct UsbFfsConnection : public Connection { 144 UsbFfsConnection(unique_fd control, unique_fd read, unique_fd write, 145 std::promise<void> destruction_notifier) 146 : worker_started_(false), 147 stopped_(false), 148 destruction_notifier_(std::move(destruction_notifier)), 149 control_fd_(std::move(control)), 150 read_fd_(std::move(read)), 151 write_fd_(std::move(write)) { 152 LOG(INFO) << "UsbFfsConnection constructed"; 153 worker_event_fd_.reset(eventfd(0, EFD_CLOEXEC)); 154 if (worker_event_fd_ == -1) { 155 PLOG(FATAL) << "failed to create eventfd"; 156 } 157 158 monitor_event_fd_.reset(eventfd(0, EFD_CLOEXEC)); 159 if (monitor_event_fd_ == -1) { 160 PLOG(FATAL) << "failed to create eventfd"; 161 } 162 163 aio_context_ = ScopedAioContext::Create(kUsbReadQueueDepth + kUsbWriteQueueDepth); 164 } 165 166 ~UsbFfsConnection() { 167 LOG(INFO) << "UsbFfsConnection being destroyed"; 168 Stop(); 169 monitor_thread_.join(); 170 171 // We need to explicitly close our file descriptors before we notify our destruction, 172 // because the thread listening on the future will immediately try to reopen the endpoint. 173 aio_context_.reset(); 174 control_fd_.reset(); 175 read_fd_.reset(); 176 write_fd_.reset(); 177 178 destruction_notifier_.set_value(); 179 } 180 181 virtual bool Write(std::unique_ptr<apacket> packet) override final { 182 LOG(DEBUG) << "USB write: " << dump_header(&packet->msg); 183 auto header = std::make_shared<Block>(sizeof(packet->msg)); 184 memcpy(header->data(), &packet->msg, sizeof(packet->msg)); 185 186 std::lock_guard<std::mutex> lock(write_mutex_); 187 write_requests_.push_back( 188 CreateWriteBlock(std::move(header), 0, sizeof(packet->msg), next_write_id_++)); 189 if (!packet->payload.empty()) { 190 // The kernel attempts to allocate a contiguous block of memory for each write, 191 // which can fail if the write is large and the kernel heap is fragmented. 192 // Split large writes into smaller chunks to avoid this. 193 auto payload = std::make_shared<Block>(std::move(packet->payload)); 194 size_t offset = 0; 195 size_t len = payload->size(); 196 197 while (len > 0) { 198 size_t write_size = std::min(kUsbWriteSize, len); 199 write_requests_.push_back( 200 CreateWriteBlock(payload, offset, write_size, next_write_id_++)); 201 len -= write_size; 202 offset += write_size; 203 } 204 } 205 206 // Wake up the worker thread to submit writes. 207 uint64_t notify = 1; 208 ssize_t rc = adb_write(worker_event_fd_.get(), ¬ify, sizeof(notify)); 209 if (rc < 0) { 210 PLOG(FATAL) << "failed to notify worker eventfd to submit writes"; 211 } 212 213 return true; 214 } 215 216 virtual void Start() override final { StartMonitor(); } 217 218 virtual void Stop() override final { 219 if (stopped_.exchange(true)) { 220 return; 221 } 222 stopped_ = true; 223 uint64_t notify = 1; 224 ssize_t rc = adb_write(worker_event_fd_.get(), ¬ify, sizeof(notify)); 225 if (rc < 0) { 226 PLOG(FATAL) << "failed to notify worker eventfd to stop UsbFfsConnection"; 227 } 228 CHECK_EQ(static_cast<size_t>(rc), sizeof(notify)); 229 230 rc = adb_write(monitor_event_fd_.get(), ¬ify, sizeof(notify)); 231 if (rc < 0) { 232 PLOG(FATAL) << "failed to notify monitor eventfd to stop UsbFfsConnection"; 233 } 234 235 CHECK_EQ(static_cast<size_t>(rc), sizeof(notify)); 236 } 237 238 virtual bool DoTlsHandshake(RSA* key, std::string* auth_key) override final { 239 // TODO: support TLS for usb connections. 240 LOG(FATAL) << "Not supported yet."; 241 return false; 242 } 243 244 private: 245 void StartMonitor() { 246 // This is a bit of a mess. 247 // It's possible for io_submit to end up blocking, if we call it as the endpoint 248 // becomes disabled. Work around this by having a monitor thread to listen for functionfs 249 // lifecycle events. If we notice an error condition (either we've become disabled, or we 250 // were never enabled in the first place), we send interruption signals to the worker thread 251 // until it dies, and then report failure to the transport via HandleError, which will 252 // eventually result in the transport being destroyed, which will result in UsbFfsConnection 253 // being destroyed, which unblocks the open thread and restarts this entire process. 254 static std::once_flag handler_once; 255 std::call_once(handler_once, []() { signal(kInterruptionSignal, [](int) {}); }); 256 257 monitor_thread_ = std::thread([this]() { 258 adb_thread_setname("UsbFfs-monitor"); 259 LOG(INFO) << "UsbFfs-monitor thread spawned"; 260 261 bool bound = false; 262 bool enabled = false; 263 bool running = true; 264 while (running) { 265 adb_pollfd pfd[2] = { 266 { .fd = control_fd_.get(), .events = POLLIN, .revents = 0 }, 267 { .fd = monitor_event_fd_.get(), .events = POLLIN, .revents = 0 }, 268 }; 269 270 // If we don't see our first bind within a second, try again. 271 int timeout_ms = bound ? -1 : 1000; 272 273 int rc = TEMP_FAILURE_RETRY(adb_poll(pfd, 2, timeout_ms)); 274 if (rc == -1) { 275 PLOG(FATAL) << "poll on USB control fd failed"; 276 } else if (rc == 0) { 277 LOG(WARNING) << "timed out while waiting for FUNCTIONFS_BIND, trying again"; 278 break; 279 } 280 281 if (pfd[1].revents) { 282 // We were told to die. 283 break; 284 } 285 286 struct usb_functionfs_event event; 287 rc = TEMP_FAILURE_RETRY(adb_read(control_fd_.get(), &event, sizeof(event))); 288 if (rc == -1) { 289 PLOG(FATAL) << "failed to read functionfs event"; 290 } else if (rc == 0) { 291 LOG(WARNING) << "hit EOF on functionfs control fd"; 292 break; 293 } else if (rc != sizeof(event)) { 294 LOG(FATAL) << "read functionfs event of unexpected size, expected " 295 << sizeof(event) << ", got " << rc; 296 } 297 298 LOG(INFO) << "USB event: " 299 << to_string(static_cast<usb_functionfs_event_type>(event.type)); 300 301 switch (event.type) { 302 case FUNCTIONFS_BIND: 303 if (bound) { 304 LOG(WARNING) << "received FUNCTIONFS_BIND while already bound?"; 305 running = false; 306 break; 307 } 308 309 if (enabled) { 310 LOG(WARNING) << "received FUNCTIONFS_BIND while already enabled?"; 311 running = false; 312 break; 313 } 314 315 bound = true; 316 break; 317 318 case FUNCTIONFS_ENABLE: 319 if (!bound) { 320 LOG(WARNING) << "received FUNCTIONFS_ENABLE while not bound?"; 321 running = false; 322 break; 323 } 324 325 if (enabled) { 326 LOG(WARNING) << "received FUNCTIONFS_ENABLE while already enabled?"; 327 running = false; 328 break; 329 } 330 331 enabled = true; 332 StartWorker(); 333 break; 334 335 case FUNCTIONFS_DISABLE: 336 if (!bound) { 337 LOG(WARNING) << "received FUNCTIONFS_DISABLE while not bound?"; 338 } 339 340 if (!enabled) { 341 LOG(WARNING) << "received FUNCTIONFS_DISABLE while not enabled?"; 342 } 343 344 enabled = false; 345 running = false; 346 break; 347 348 case FUNCTIONFS_UNBIND: 349 if (enabled) { 350 LOG(WARNING) << "received FUNCTIONFS_UNBIND while still enabled?"; 351 } 352 353 if (!bound) { 354 LOG(WARNING) << "received FUNCTIONFS_UNBIND when not bound?"; 355 } 356 357 bound = false; 358 running = false; 359 break; 360 361 case FUNCTIONFS_SETUP: { 362 LOG(INFO) << "received FUNCTIONFS_SETUP control transfer: bRequestType = " 363 << static_cast<int>(event.u.setup.bRequestType) 364 << ", bRequest = " << static_cast<int>(event.u.setup.bRequest) 365 << ", wValue = " << static_cast<int>(event.u.setup.wValue) 366 << ", wIndex = " << static_cast<int>(event.u.setup.wIndex) 367 << ", wLength = " << static_cast<int>(event.u.setup.wLength); 368 369 if ((event.u.setup.bRequestType & USB_DIR_IN)) { 370 LOG(INFO) << "acking device-to-host control transfer"; 371 ssize_t rc = adb_write(control_fd_.get(), "", 0); 372 if (rc != 0) { 373 PLOG(ERROR) << "failed to write empty packet to host"; 374 break; 375 } 376 } else { 377 std::string buf; 378 buf.resize(event.u.setup.wLength + 1); 379 380 ssize_t rc = adb_read(control_fd_.get(), buf.data(), buf.size()); 381 if (rc != event.u.setup.wLength) { 382 LOG(ERROR) 383 << "read " << rc 384 << " bytes when trying to read control request, expected " 385 << event.u.setup.wLength; 386 } 387 388 LOG(INFO) << "control request contents: " << buf; 389 break; 390 } 391 } 392 } 393 } 394 395 StopWorker(); 396 HandleError("monitor thread finished"); 397 }); 398 } 399 400 void StartWorker() { 401 CHECK(!worker_started_); 402 worker_started_ = true; 403 worker_thread_ = std::thread([this]() { 404 adb_thread_setname("UsbFfs-worker"); 405 LOG(INFO) << "UsbFfs-worker thread spawned"; 406 407 for (size_t i = 0; i < kUsbReadQueueDepth; ++i) { 408 read_requests_[i] = CreateReadBlock(next_read_id_++); 409 if (!SubmitRead(&read_requests_[i])) { 410 return; 411 } 412 } 413 414 while (!stopped_) { 415 uint64_t dummy; 416 ssize_t rc = adb_read(worker_event_fd_.get(), &dummy, sizeof(dummy)); 417 if (rc == -1) { 418 PLOG(FATAL) << "failed to read from eventfd"; 419 } else if (rc == 0) { 420 LOG(FATAL) << "hit EOF on eventfd"; 421 } 422 423 ReadEvents(); 424 425 std::lock_guard<std::mutex> lock(write_mutex_); 426 SubmitWrites(); 427 } 428 }); 429 } 430 431 void StopWorker() { 432 if (!worker_started_) { 433 return; 434 } 435 436 pthread_t worker_thread_handle = worker_thread_.native_handle(); 437 while (true) { 438 int rc = pthread_kill(worker_thread_handle, kInterruptionSignal); 439 if (rc != 0) { 440 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc); 441 break; 442 } 443 444 std::this_thread::sleep_for(100ms); 445 446 rc = pthread_kill(worker_thread_handle, 0); 447 if (rc == 0) { 448 continue; 449 } else if (rc == ESRCH) { 450 break; 451 } else { 452 LOG(ERROR) << "failed to send interruption signal to worker: " << strerror(rc); 453 } 454 } 455 456 worker_thread_.join(); 457 } 458 459 void PrepareReadBlock(IoReadBlock* block, uint64_t id) { 460 block->pending = false; 461 if (block->payload.capacity() >= kUsbReadSize) { 462 block->payload.resize(kUsbReadSize); 463 } else { 464 block->payload = Block(kUsbReadSize); 465 } 466 block->control.aio_data = static_cast<uint64_t>(TransferId::read(id)); 467 block->control.aio_buf = reinterpret_cast<uintptr_t>(block->payload.data()); 468 block->control.aio_nbytes = block->payload.size(); 469 } 470 471 IoReadBlock CreateReadBlock(uint64_t id) { 472 IoReadBlock block; 473 PrepareReadBlock(&block, id); 474 block.control.aio_rw_flags = 0; 475 block.control.aio_lio_opcode = IOCB_CMD_PREAD; 476 block.control.aio_reqprio = 0; 477 block.control.aio_fildes = read_fd_.get(); 478 block.control.aio_offset = 0; 479 block.control.aio_flags = IOCB_FLAG_RESFD; 480 block.control.aio_resfd = worker_event_fd_.get(); 481 return block; 482 } 483 484 void ReadEvents() { 485 static constexpr size_t kMaxEvents = kUsbReadQueueDepth + kUsbWriteQueueDepth; 486 struct io_event events[kMaxEvents]; 487 struct timespec timeout = {.tv_sec = 0, .tv_nsec = 0}; 488 int rc = io_getevents(aio_context_.get(), 0, kMaxEvents, events, &timeout); 489 if (rc == -1) { 490 HandleError(StringPrintf("io_getevents failed while reading: %s", strerror(errno))); 491 return; 492 } 493 494 for (int event_idx = 0; event_idx < rc; ++event_idx) { 495 auto& event = events[event_idx]; 496 TransferId id = TransferId::from_value(event.data); 497 498 if (event.res < 0) { 499 // On initial connection, some clients will send a ClearFeature(HALT) to 500 // attempt to resynchronize host and device after the adb server is killed. 501 // On newer device kernels, the reads we've already dispatched will be cancelled. 502 // Instead of treating this as a failure, which will tear down the interface and 503 // lead to the client doing the same thing again, just resubmit if this happens 504 // before we've actually read anything. 505 if (!connection_started_ && event.res == -EPIPE && 506 id.direction == TransferDirection::READ) { 507 uint64_t read_idx = id.id % kUsbReadQueueDepth; 508 SubmitRead(&read_requests_[read_idx]); 509 continue; 510 } else { 511 std::string error = 512 StringPrintf("%s %" PRIu64 " failed with error %s", 513 id.direction == TransferDirection::READ ? "read" : "write", 514 id.id, strerror(-event.res)); 515 HandleError(error); 516 return; 517 } 518 } 519 520 if (id.direction == TransferDirection::READ) { 521 connection_started_ = true; 522 if (!HandleRead(id, event.res)) { 523 return; 524 } 525 } else { 526 HandleWrite(id); 527 } 528 } 529 } 530 531 bool HandleRead(TransferId id, int64_t size) { 532 uint64_t read_idx = id.id % kUsbReadQueueDepth; 533 IoReadBlock* block = &read_requests_[read_idx]; 534 block->pending = false; 535 block->payload.resize(size); 536 537 // Notification for completed reads can be received out of order. 538 if (block->id().id != needed_read_id_) { 539 LOG(VERBOSE) << "read " << block->id().id << " completed while waiting for " 540 << needed_read_id_; 541 return true; 542 } 543 544 for (uint64_t id = needed_read_id_;; ++id) { 545 size_t read_idx = id % kUsbReadQueueDepth; 546 IoReadBlock* current_block = &read_requests_[read_idx]; 547 if (current_block->pending) { 548 break; 549 } 550 if (!ProcessRead(current_block)) { 551 return false; 552 } 553 ++needed_read_id_; 554 } 555 556 return true; 557 } 558 559 bool ProcessRead(IoReadBlock* block) { 560 if (!block->payload.empty()) { 561 if (!incoming_header_.has_value()) { 562 if (block->payload.size() != sizeof(amessage)) { 563 HandleError("received packet of unexpected length while reading header"); 564 return false; 565 } 566 amessage& msg = incoming_header_.emplace(); 567 memcpy(&msg, block->payload.data(), sizeof(msg)); 568 LOG(DEBUG) << "USB read:" << dump_header(&msg); 569 incoming_header_ = msg; 570 } else { 571 size_t bytes_left = incoming_header_->data_length - incoming_payload_.size(); 572 if (block->payload.size() > bytes_left) { 573 HandleError("received too many bytes while waiting for payload"); 574 return false; 575 } 576 incoming_payload_.append(std::move(block->payload)); 577 } 578 579 if (incoming_header_->data_length == incoming_payload_.size()) { 580 auto packet = std::make_unique<apacket>(); 581 packet->msg = *incoming_header_; 582 583 // TODO: Make apacket contain an IOVector so we don't have to coalesce. 584 packet->payload = std::move(incoming_payload_).coalesce(); 585 read_callback_(this, std::move(packet)); 586 587 incoming_header_.reset(); 588 // reuse the capacity of the incoming payload while we can. 589 auto free_block = incoming_payload_.clear(); 590 if (block->payload.capacity() == 0) { 591 block->payload = std::move(free_block); 592 } 593 } 594 } 595 596 PrepareReadBlock(block, block->id().id + kUsbReadQueueDepth); 597 SubmitRead(block); 598 return true; 599 } 600 601 bool SubmitRead(IoReadBlock* block) { 602 block->pending = true; 603 struct iocb* iocb = &block->control; 604 if (io_submit(aio_context_.get(), 1, &iocb) != 1) { 605 HandleError(StringPrintf("failed to submit read: %s", strerror(errno))); 606 return false; 607 } 608 609 return true; 610 } 611 612 void HandleWrite(TransferId id) { 613 std::lock_guard<std::mutex> lock(write_mutex_); 614 auto it = 615 std::find_if(write_requests_.begin(), write_requests_.end(), [id](const auto& req) { 616 return static_cast<uint64_t>(req.id()) == static_cast<uint64_t>(id); 617 }); 618 CHECK(it != write_requests_.end()); 619 620 write_requests_.erase(it); 621 size_t outstanding_writes = --writes_submitted_; 622 LOG(DEBUG) << "USB write: reaped, down to " << outstanding_writes; 623 } 624 625 IoWriteBlock CreateWriteBlock(std::shared_ptr<Block> payload, size_t offset, size_t len, 626 uint64_t id) { 627 auto block = IoWriteBlock(); 628 block.payload = std::move(payload); 629 block.control.aio_data = static_cast<uint64_t>(TransferId::write(id)); 630 block.control.aio_rw_flags = 0; 631 block.control.aio_lio_opcode = IOCB_CMD_PWRITE; 632 block.control.aio_reqprio = 0; 633 block.control.aio_fildes = write_fd_.get(); 634 block.control.aio_buf = reinterpret_cast<uintptr_t>(block.payload->data() + offset); 635 block.control.aio_nbytes = len; 636 block.control.aio_offset = 0; 637 block.control.aio_flags = IOCB_FLAG_RESFD; 638 block.control.aio_resfd = worker_event_fd_.get(); 639 return block; 640 } 641 642 IoWriteBlock CreateWriteBlock(Block&& payload, uint64_t id) { 643 size_t len = payload.size(); 644 return CreateWriteBlock(std::make_shared<Block>(std::move(payload)), 0, len, id); 645 } 646 647 void SubmitWrites() REQUIRES(write_mutex_) { 648 if (writes_submitted_ == kUsbWriteQueueDepth) { 649 return; 650 } 651 652 ssize_t writes_to_submit = std::min(kUsbWriteQueueDepth - writes_submitted_, 653 write_requests_.size() - writes_submitted_); 654 CHECK_GE(writes_to_submit, 0); 655 if (writes_to_submit == 0) { 656 return; 657 } 658 659 struct iocb* iocbs[kUsbWriteQueueDepth]; 660 for (int i = 0; i < writes_to_submit; ++i) { 661 CHECK(!write_requests_[writes_submitted_ + i].pending); 662 write_requests_[writes_submitted_ + i].pending = true; 663 iocbs[i] = &write_requests_[writes_submitted_ + i].control; 664 LOG(VERBOSE) << "submitting write_request " << static_cast<void*>(iocbs[i]); 665 } 666 667 writes_submitted_ += writes_to_submit; 668 669 int rc = io_submit(aio_context_.get(), writes_to_submit, iocbs); 670 if (rc == -1) { 671 HandleError(StringPrintf("failed to submit write requests: %s", strerror(errno))); 672 return; 673 } else if (rc != writes_to_submit) { 674 LOG(FATAL) << "failed to submit all writes: wanted to submit " << writes_to_submit 675 << ", actually submitted " << rc; 676 } 677 } 678 679 void HandleError(const std::string& error) { 680 std::call_once(error_flag_, [&]() { 681 error_callback_(this, error); 682 if (!stopped_) { 683 Stop(); 684 } 685 }); 686 } 687 688 std::thread monitor_thread_; 689 690 bool worker_started_; 691 std::thread worker_thread_; 692 693 std::atomic<bool> stopped_; 694 std::promise<void> destruction_notifier_; 695 std::once_flag error_flag_; 696 697 unique_fd worker_event_fd_; 698 unique_fd monitor_event_fd_; 699 700 ScopedAioContext aio_context_; 701 unique_fd control_fd_; 702 unique_fd read_fd_; 703 unique_fd write_fd_; 704 705 bool connection_started_ = false; 706 std::optional<amessage> incoming_header_; 707 IOVector incoming_payload_; 708 709 std::array<IoReadBlock, kUsbReadQueueDepth> read_requests_; 710 IOVector read_data_; 711 712 // ID of the next request that we're going to send out. 713 size_t next_read_id_ = 0; 714 715 // ID of the next packet we're waiting for. 716 size_t needed_read_id_ = 0; 717 718 std::mutex write_mutex_; 719 std::deque<IoWriteBlock> write_requests_ GUARDED_BY(write_mutex_); 720 size_t next_write_id_ GUARDED_BY(write_mutex_) = 0; 721 size_t writes_submitted_ GUARDED_BY(write_mutex_) = 0; 722 723 static constexpr int kInterruptionSignal = SIGUSR1; 724 }; 725 726 static void usb_ffs_open_thread() { 727 adb_thread_setname("usb ffs open"); 728 729 // When the device is acting as a USB host, we'll be unable to bind to the USB gadget on kernels 730 // that don't carry a downstream patch to enable that behavior. 731 // 732 // This property is copied from vendor.sys.usb.adb.disabled by an init.rc script. 733 // 734 // Note that this property only disables rebinding the USB gadget: setting it while an interface 735 // is already bound will do nothing. 736 static const char* kPropertyUsbDisabled = "sys.usb.adb.disabled"; 737 PropertyMonitor prop_mon; 738 prop_mon.Add(kPropertyUsbDisabled, [](std::string value) { 739 // Return false (i.e. break out of PropertyMonitor::Run) when the property != 1. 740 return android::base::ParseBool(value) == android::base::ParseBoolResult::kTrue; 741 }); 742 743 while (true) { 744 unique_fd control; 745 unique_fd bulk_out; 746 unique_fd bulk_in; 747 if (!open_functionfs(&control, &bulk_out, &bulk_in)) { 748 std::this_thread::sleep_for(1s); 749 continue; 750 } 751 752 if (android::base::GetBoolProperty(kPropertyUsbDisabled, false)) { 753 LOG(INFO) << "pausing USB due to " << kPropertyUsbDisabled; 754 prop_mon.Run(); 755 LOG(INFO) << "resuming USB"; 756 } 757 758 atransport* transport = new atransport(); 759 transport->serial = "UsbFfs"; 760 std::promise<void> destruction_notifier; 761 std::future<void> future = destruction_notifier.get_future(); 762 transport->SetConnection(std::make_unique<UsbFfsConnection>( 763 std::move(control), std::move(bulk_out), std::move(bulk_in), 764 std::move(destruction_notifier))); 765 register_transport(transport); 766 future.wait(); 767 } 768 } 769 770 void usb_init() { 771 std::thread(usb_ffs_open_thread).detach(); 772 } 773