1 /* 2 * Copyright (C) 2020 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #define TRACE_TAG INCREMENTAL 18 19 #include "incremental_server.h" 20 21 #include <android-base/endian.h> 22 #include <android-base/strings.h> 23 #include <inttypes.h> 24 #include <lz4.h> 25 #include <stdio.h> 26 #include <stdlib.h> 27 #include <string.h> 28 #include <unistd.h> 29 30 #include <array> 31 #include <deque> 32 #include <fstream> 33 #include <thread> 34 #include <type_traits> 35 #include <unordered_set> 36 37 #include "adb.h" 38 #include "adb_io.h" 39 #include "adb_trace.h" 40 #include "adb_unique_fd.h" 41 #include "adb_utils.h" 42 #include "incremental_utils.h" 43 #include "sysdeps.h" 44 45 namespace incremental { 46 47 static constexpr int kHashesPerBlock = kBlockSize / kDigestSize; 48 static constexpr int kCompressedSizeMax = kBlockSize * 0.95; 49 static constexpr int8_t kTypeData = 0; 50 static constexpr int8_t kTypeHash = 1; 51 static constexpr int8_t kCompressionNone = 0; 52 static constexpr int8_t kCompressionLZ4 = 1; 53 static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize)); 54 static constexpr auto kReadBufferSize = 128 * 1024; 55 static constexpr int kPollTimeoutMillis = 300000; // 5 minutes 56 57 using BlockSize = int16_t; 58 using FileId = int16_t; 59 using BlockIdx = int32_t; 60 using NumBlocks = int32_t; 61 using BlockType = int8_t; 62 using CompressionType = int8_t; 63 using RequestType = int16_t; 64 using ChunkHeader = int32_t; 65 using MagicType = uint32_t; 66 67 static constexpr MagicType INCR = 0x494e4352; // LE INCR 68 69 static constexpr RequestType SERVING_COMPLETE = 0; 70 static constexpr RequestType BLOCK_MISSING = 1; 71 static constexpr RequestType PREFETCH = 2; 72 static constexpr RequestType DESTROY = 3; 73 74 static constexpr inline int64_t roundDownToBlockOffset(int64_t val) { 75 return val & ~(kBlockSize - 1); 76 } 77 78 static constexpr inline int64_t roundUpToBlockOffset(int64_t val) { 79 return roundDownToBlockOffset(val + kBlockSize - 1); 80 } 81 82 static constexpr inline NumBlocks numBytesToNumBlocks(int64_t bytes) { 83 return roundUpToBlockOffset(bytes) / kBlockSize; 84 } 85 86 static constexpr inline off64_t blockIndexToOffset(BlockIdx blockIdx) { 87 return static_cast<off64_t>(blockIdx) * kBlockSize; 88 } 89 90 template <typename T> 91 static inline constexpr T toBigEndian(T t) { 92 using unsigned_type = std::make_unsigned_t<T>; 93 if constexpr (std::is_same_v<T, int16_t>) { 94 return htobe16(static_cast<unsigned_type>(t)); 95 } else if constexpr (std::is_same_v<T, int32_t>) { 96 return htobe32(static_cast<unsigned_type>(t)); 97 } else if constexpr (std::is_same_v<T, int64_t>) { 98 return htobe64(static_cast<unsigned_type>(t)); 99 } else { 100 return t; 101 } 102 } 103 104 template <typename T> 105 static inline constexpr T readBigEndian(void* data) { 106 using unsigned_type = std::make_unsigned_t<T>; 107 if constexpr (std::is_same_v<T, int16_t>) { 108 return static_cast<T>(be16toh(*reinterpret_cast<unsigned_type*>(data))); 109 } else if constexpr (std::is_same_v<T, int32_t>) { 110 return static_cast<T>(be32toh(*reinterpret_cast<unsigned_type*>(data))); 111 } else if constexpr (std::is_same_v<T, int64_t>) { 112 return static_cast<T>(be64toh(*reinterpret_cast<unsigned_type*>(data))); 113 } else { 114 return T(); 115 } 116 } 117 118 // Received from device 119 // !Does not include magic! 120 struct RequestCommand { 121 RequestType request_type; // 2 bytes 122 FileId file_id; // 2 bytes 123 union { 124 BlockIdx block_idx; 125 NumBlocks num_blocks; 126 }; // 4 bytes 127 } __attribute__((packed)); 128 129 // Placed before actual data bytes of each block 130 struct ResponseHeader { 131 FileId file_id; // 2 bytes 132 BlockType block_type; // 1 byte 133 CompressionType compression_type; // 1 byte 134 BlockIdx block_idx; // 4 bytes 135 BlockSize block_size; // 2 bytes 136 137 static constexpr size_t responseSizeFor(size_t dataSize) { 138 return dataSize + sizeof(ResponseHeader); 139 } 140 } __attribute__((packed)); 141 142 template <size_t Size = kBlockSize> 143 struct BlockBuffer { 144 ResponseHeader header; 145 char data[Size]; 146 } __attribute__((packed)); 147 148 // Holds streaming state for a file 149 class File { 150 public: 151 // Plain file 152 File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset, 153 unique_fd tree_fd) 154 : File(filepath, id, size, tree_offset) { 155 this->fd_ = std::move(fd); 156 this->tree_fd_ = std::move(tree_fd); 157 priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size); 158 } 159 int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const { 160 int64_t bytes_read = -1; 161 const off64_t offsetStart = blockIndexToOffset(block_idx); 162 bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart); 163 return bytes_read; 164 } 165 int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const { 166 int64_t bytes_read = -1; 167 const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx); 168 bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart); 169 return bytes_read; 170 } 171 172 const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; } 173 174 bool hasTree() const { return tree_fd_.ok(); } 175 176 std::vector<bool> sentBlocks; 177 NumBlocks sentBlocksCount = 0; 178 179 std::vector<bool> sentTreeBlocks; 180 181 const char* const filepath; 182 const FileId id; 183 const int64_t size; 184 185 private: 186 File(const char* filepath, FileId id, int64_t size, int64_t tree_offset) 187 : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) { 188 sentBlocks.resize(numBytesToNumBlocks(size)); 189 sentTreeBlocks.resize(verity_tree_blocks_for_file(size)); 190 } 191 unique_fd fd_; 192 std::vector<BlockIdx> priority_blocks_; 193 194 unique_fd tree_fd_; 195 const int64_t tree_offset_; 196 }; 197 198 class IncrementalServer { 199 public: 200 IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files) 201 : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) { 202 buffer_.reserve(kReadBufferSize); 203 pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize); 204 pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader); 205 } 206 207 bool Serve(); 208 209 private: 210 struct PrefetchState { 211 const File* file; 212 BlockIdx overallIndex = 0; 213 BlockIdx overallEnd = 0; 214 BlockIdx priorityIndex = 0; 215 216 explicit PrefetchState(const File& f, BlockIdx start, int count) 217 : file(&f), 218 overallIndex(start), 219 overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {} 220 221 explicit PrefetchState(const File& f) 222 : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {} 223 224 bool done() const { 225 const bool overallSent = (overallIndex >= overallEnd); 226 if (file->PriorityBlocks().empty()) { 227 return overallSent; 228 } 229 return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size()); 230 } 231 }; 232 233 bool SkipToRequest(void* buffer, size_t* size, bool blocking); 234 std::optional<RequestCommand> ReadRequest(bool blocking); 235 236 void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); } 237 238 enum class SendResult { Sent, Skipped, Error }; 239 SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false); 240 241 bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx); 242 bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx); 243 244 bool SendDone(); 245 void RunPrefetching(); 246 247 void Send(const void* data, size_t size, bool flush); 248 void Flush(); 249 using TimePoint = decltype(std::chrono::high_resolution_clock::now()); 250 bool ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent); 251 252 unique_fd const adb_fd_; 253 unique_fd const output_fd_; 254 std::vector<File> files_; 255 256 // Incoming data buffer. 257 std::vector<char> buffer_; 258 259 std::deque<PrefetchState> prefetches_; 260 int compressed_ = 0, uncompressed_ = 0; 261 long long sentSize_ = 0; 262 263 static constexpr auto kChunkFlushSize = 31 * kBlockSize; 264 265 std::vector<char> pendingBlocksBuffer_; 266 char* pendingBlocks_ = nullptr; 267 268 // True when client notifies that all the data has been received 269 bool servingComplete_ = false; 270 }; 271 272 bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) { 273 while (true) { 274 // Looking for INCR magic. 275 bool magic_found = false; 276 int bcur = 0; 277 int bsize = buffer_.size(); 278 for (bcur = 0; bcur + 4 < bsize; ++bcur) { 279 uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur)); 280 if (magic == INCR) { 281 magic_found = true; 282 break; 283 } 284 } 285 286 if (bcur > 0) { 287 // output the rest. 288 (void)WriteFdExactly(output_fd_, buffer_.data(), bcur); 289 erase_buffer_head(bcur); 290 } 291 292 if (magic_found && buffer_.size() >= *size + sizeof(INCR)) { 293 // fine, return 294 memcpy(buffer, buffer_.data() + sizeof(INCR), *size); 295 erase_buffer_head(*size + sizeof(INCR)); 296 return true; 297 } 298 299 adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0}; 300 auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0); 301 302 if (res != 1) { 303 auto err = errno; 304 (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size()); 305 if (res < 0) { 306 D("Failed to poll: %s", strerror(err)); 307 return false; 308 } 309 if (blocking) { 310 fprintf(stderr, "Timed out waiting for data from device.\n"); 311 } 312 if (blocking && servingComplete_) { 313 // timeout waiting from client. Serving is complete, so quit. 314 return false; 315 } 316 *size = 0; 317 return true; 318 } 319 320 bsize = buffer_.size(); 321 buffer_.resize(kReadBufferSize); 322 int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize); 323 if (r > 0) { 324 buffer_.resize(bsize + r); 325 continue; 326 } 327 328 D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno); 329 break; 330 } 331 // socket is closed. print remaining messages 332 WriteFdExactly(output_fd_, buffer_.data(), buffer_.size()); 333 return false; 334 } 335 336 std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) { 337 uint8_t commandBuf[sizeof(RequestCommand)]; 338 auto size = sizeof(commandBuf); 339 if (!SkipToRequest(&commandBuf, &size, blocking)) { 340 return {{DESTROY}}; 341 } 342 if (size < sizeof(RequestCommand)) { 343 return {}; 344 } 345 RequestCommand request; 346 request.request_type = readBigEndian<RequestType>(&commandBuf[0]); 347 request.file_id = readBigEndian<FileId>(&commandBuf[2]); 348 request.block_idx = readBigEndian<BlockIdx>(&commandBuf[4]); 349 return request; 350 } 351 352 bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) { 353 auto& file = files_[fileId]; 354 if (!file.hasTree()) { 355 return true; 356 } 357 const int32_t data_block_count = numBytesToNumBlocks(file.size); 358 359 const int32_t total_nodes_count(file.sentTreeBlocks.size()); 360 const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock; 361 362 const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count; 363 364 // Leaf level, sending only 1 block. 365 const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock; 366 if (file.sentTreeBlocks[leaf_idx]) { 367 return true; 368 } 369 if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) { 370 return false; 371 } 372 file.sentTreeBlocks[leaf_idx] = true; 373 374 // Non-leaf, sending EVERYTHING. This should be done only once. 375 if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) { 376 return true; 377 } 378 379 for (int32_t i = 0; i < leaf_nodes_offset; ++i) { 380 if (!SendTreeBlock(fileId, blockIdx, i)) { 381 return false; 382 } 383 file.sentTreeBlocks[i] = true; 384 } 385 return true; 386 } 387 388 bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) { 389 const auto& file = files_[fileId]; 390 391 BlockBuffer buffer; 392 const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data); 393 if (bytesRead <= 0) { 394 fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath, 395 blockIdx); 396 return false; 397 } 398 399 buffer.header.compression_type = kCompressionNone; 400 buffer.header.block_type = kTypeHash; 401 buffer.header.file_id = toBigEndian(fileId); 402 buffer.header.block_size = toBigEndian(int16_t(bytesRead)); 403 buffer.header.block_idx = toBigEndian(blockIdx); 404 405 Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false); 406 407 return true; 408 } 409 410 auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult { 411 auto& file = files_[fileId]; 412 if (blockIdx >= static_cast<long>(file.sentBlocks.size())) { 413 // may happen as we schedule some extra blocks for reported page misses 414 D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx); 415 return SendResult::Skipped; 416 } 417 if (file.sentBlocks[blockIdx]) { 418 return SendResult::Skipped; 419 } 420 421 if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) { 422 return SendResult::Error; 423 } 424 425 BlockBuffer raw; 426 bool isZipCompressed = false; 427 const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed); 428 if (bytesRead < 0) { 429 fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx, 430 errno); 431 return SendResult::Error; 432 } 433 434 BlockBuffer<kCompressBound> compressed; 435 int16_t compressedSize = 0; 436 if (!isZipCompressed) { 437 compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound); 438 } 439 int16_t blockSize; 440 ResponseHeader* header; 441 if (compressedSize > 0 && compressedSize < kCompressedSizeMax) { 442 ++compressed_; 443 blockSize = compressedSize; 444 header = &compressed.header; 445 header->compression_type = kCompressionLZ4; 446 } else { 447 ++uncompressed_; 448 blockSize = bytesRead; 449 header = &raw.header; 450 header->compression_type = kCompressionNone; 451 } 452 453 header->block_type = kTypeData; 454 header->file_id = toBigEndian(fileId); 455 header->block_size = toBigEndian(blockSize); 456 header->block_idx = toBigEndian(blockIdx); 457 458 file.sentBlocks[blockIdx] = true; 459 file.sentBlocksCount += 1; 460 Send(header, ResponseHeader::responseSizeFor(blockSize), flush); 461 462 return SendResult::Sent; 463 } 464 465 bool IncrementalServer::SendDone() { 466 ResponseHeader header; 467 header.file_id = -1; 468 header.block_type = 0; 469 header.compression_type = 0; 470 header.block_idx = 0; 471 header.block_size = 0; 472 Send(&header, sizeof(header), true); 473 return true; 474 } 475 476 void IncrementalServer::RunPrefetching() { 477 constexpr auto kPrefetchBlocksPerIteration = 128; 478 479 int blocksToSend = kPrefetchBlocksPerIteration; 480 while (!prefetches_.empty() && blocksToSend > 0) { 481 auto& prefetch = prefetches_.front(); 482 const auto& file = *prefetch.file; 483 const auto& priority_blocks = file.PriorityBlocks(); 484 if (!priority_blocks.empty()) { 485 for (auto& i = prefetch.priorityIndex; 486 blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) { 487 if (auto res = SendDataBlock(file.id, priority_blocks[i]); 488 res == SendResult::Sent) { 489 --blocksToSend; 490 } else if (res == SendResult::Error) { 491 fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i); 492 } 493 } 494 } 495 for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) { 496 if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) { 497 --blocksToSend; 498 } else if (res == SendResult::Error) { 499 fprintf(stderr, "Failed to send block %" PRId32 "\n", i); 500 } 501 } 502 if (prefetch.done()) { 503 prefetches_.pop_front(); 504 } 505 } 506 } 507 508 void IncrementalServer::Send(const void* data, size_t size, bool flush) { 509 pendingBlocks_ = std::copy_n(static_cast<const char*>(data), size, pendingBlocks_); 510 if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) { 511 Flush(); 512 } 513 } 514 515 void IncrementalServer::Flush() { 516 auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader)); 517 if (dataBytes == 0) { 518 return; 519 } 520 521 *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian<int32_t>(dataBytes); 522 auto totalBytes = sizeof(ChunkHeader) + dataBytes; 523 if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) { 524 fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes)); 525 } 526 sentSize_ += totalBytes; 527 pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader); 528 } 529 530 bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount, 531 int missesSent) { 532 servingComplete_ = true; 533 using namespace std::chrono; 534 auto endTime = high_resolution_clock::now(); 535 D("Streaming completed.\n" 536 "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: " 537 "%d, mb: %.3f\n" 538 "Total time taken: %.3fms", 539 missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0, 540 duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0); 541 return true; 542 } 543 544 bool IncrementalServer::Serve() { 545 // Initial handshake to verify connection is still alive 546 if (!SendOkay(adb_fd_)) { 547 fprintf(stderr, "Connection is dead. Abort.\n"); 548 return false; 549 } 550 551 std::unordered_set<FileId> prefetchedFiles; 552 bool doneSent = false; 553 int missesCount = 0; 554 int missesSent = 0; 555 556 using namespace std::chrono; 557 std::optional<TimePoint> startTime; 558 559 while (true) { 560 if (!doneSent && prefetches_.empty() && 561 std::all_of(files_.begin(), files_.end(), [](const File& f) { 562 return f.sentBlocksCount == NumBlocks(f.sentBlocks.size()); 563 })) { 564 fprintf(stderr, "All files should be loaded. Notifying the device.\n"); 565 SendDone(); 566 doneSent = true; 567 } 568 569 const bool blocking = prefetches_.empty(); 570 if (blocking) { 571 // We've no idea how long the blocking call is, so let's flush whatever is still unsent. 572 Flush(); 573 } 574 auto request = ReadRequest(blocking); 575 576 if (!startTime) { 577 startTime = high_resolution_clock::now(); 578 } 579 580 if (request) { 581 FileId fileId = request->file_id; 582 BlockIdx blockIdx = request->block_idx; 583 584 switch (request->request_type) { 585 case DESTROY: { 586 // Stop everything. 587 return true; 588 } 589 case SERVING_COMPLETE: { 590 // Not stopping the server here. 591 ServingComplete(startTime, missesCount, missesSent); 592 break; 593 } 594 case BLOCK_MISSING: { 595 ++missesCount; 596 // Sends one single block ASAP. 597 if (fileId < 0 || fileId >= (FileId)files_.size() || blockIdx < 0 || 598 blockIdx >= (BlockIdx)files_[fileId].sentBlocks.size()) { 599 fprintf(stderr, 600 "Received invalid data request for file_id %" PRId16 601 " block_idx %" PRId32 ".\n", 602 fileId, blockIdx); 603 break; 604 } 605 606 if (VLOG_IS_ON(INCREMENTAL)) { 607 auto& file = files_[fileId]; 608 auto posP = std::find(file.PriorityBlocks().begin(), 609 file.PriorityBlocks().end(), blockIdx); 610 D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)", 611 (int)fileId, (int)blockIdx, 612 posP == file.PriorityBlocks().end() 613 ? -1 614 : int(posP - file.PriorityBlocks().begin()), 615 int(file.PriorityBlocks().size())); 616 } 617 618 if (auto res = SendDataBlock(fileId, blockIdx, true); 619 res == SendResult::Error) { 620 fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx); 621 } else if (res == SendResult::Sent) { 622 ++missesSent; 623 // Make sure we send more pages from this place onward, in case if the OS is 624 // reading a bigger block. 625 prefetches_.emplace_front(files_[fileId], blockIdx + 1, 7); 626 } 627 break; 628 } 629 case PREFETCH: { 630 // Start prefetching for a file 631 if (fileId < 0) { 632 fprintf(stderr, 633 "Received invalid prefetch request for file_id %" PRId16 "\n", 634 fileId); 635 break; 636 } 637 if (!prefetchedFiles.insert(fileId).second) { 638 fprintf(stderr, 639 "Received duplicate prefetch request for file_id %" PRId16 "\n", 640 fileId); 641 break; 642 } 643 D("Received prefetch request for file_id %" PRId16 ".", fileId); 644 prefetches_.emplace_back(files_[fileId]); 645 break; 646 } 647 default: 648 fprintf(stderr, "Invalid request %" PRId16 ",%" PRId16 ",%" PRId32 ".\n", 649 request->request_type, fileId, blockIdx); 650 break; 651 } 652 } 653 654 RunPrefetching(); 655 } 656 } 657 658 static std::pair<unique_fd, int64_t> open_fd(const char* filepath) { 659 struct stat st; 660 if (stat(filepath, &st)) { 661 error_exit("inc-server: failed to stat input file '%s'.", filepath); 662 } 663 664 unique_fd fd(adb_open(filepath, O_RDONLY)); 665 if (fd < 0) { 666 error_exit("inc-server: failed to open file '%s'.", filepath); 667 } 668 669 return {std::move(fd), st.st_size}; 670 } 671 672 static std::pair<unique_fd, int64_t> open_signature(int64_t file_size, const char* filepath) { 673 std::string signature_file(filepath); 674 signature_file += IDSIG; 675 676 unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY)); 677 if (fd < 0) { 678 D("No signature file found for '%s'('%s')", filepath, signature_file.c_str()); 679 return {}; 680 } 681 682 auto [tree_offset, tree_size] = skip_id_sig_headers(fd); 683 if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) { 684 error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n", 685 signature_file.c_str(), (long long)tree_size, (long long)expected); 686 } 687 688 int32_t data_block_count = numBytesToNumBlocks(file_size); 689 int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock; 690 D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leafs)", signature_file.c_str(), 691 int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count)); 692 693 return {std::move(fd), tree_offset}; 694 } 695 696 bool serve(int connection_fd, int output_fd, int argc, const char** argv) { 697 auto connection_ufd = unique_fd(connection_fd); 698 auto output_ufd = unique_fd(output_fd); 699 if (argc <= 0) { 700 error_exit("inc-server: must specify at least one file."); 701 } 702 703 std::vector<File> files; 704 files.reserve(argc); 705 for (int i = 0; i < argc; ++i) { 706 auto filepath = argv[i]; 707 708 auto [file_fd, file_size] = open_fd(filepath); 709 auto [sign_fd, sign_offset] = open_signature(file_size, filepath); 710 711 files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset, 712 std::move(sign_fd)); 713 } 714 715 IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files)); 716 printf("Serving...\n"); 717 fclose(stdin); 718 fclose(stdout); 719 return server.Serve(); 720 } 721 722 } // namespace incremental 723