// Copyright (C) 2019 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include "prefetcher/minijail.h" #include "common/cmd_utils.h" #include "prefetcher/prefetcher_daemon.h" #include "prefetcher/session_manager.h" #include "prefetcher/session.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace iorap::prefetcher { // Gate super-spammy IPC logging behind a property. // This is beyond merely annoying, enabling this logging causes prefetching to be about 1000x slower. static bool LogVerboseIpc() { static bool initialized = false; static bool verbose_ipc; if (initialized == false) { initialized = true; verbose_ipc = ::android::base::GetBoolProperty("iorapd.readahead.verbose_ipc", /*default*/false); } return verbose_ipc; } static const bool kInstallMiniJail = ::android::base::GetBoolProperty("iorapd.readahead.minijail", /*default*/true); static constexpr const char kCommandFileName[] = "/system/bin/iorap.prefetcherd"; static constexpr size_t kPipeBufferSize = 1024 * 1024; // matches /proc/sys/fs/pipe-max-size using ArgString = const char*; std::ostream& operator<<(std::ostream& os, ReadAheadKind ps) { switch (ps) { case ReadAheadKind::kFadvise: os << "fadvise"; break; case ReadAheadKind::kMmapLocked: os << "mmap"; break; case ReadAheadKind::kMlock: os << "mlock"; break; default: os << ""; } return os; } std::ostream& operator<<(std::ostream& os, CommandChoice choice) { switch (choice) { case CommandChoice::kRegisterFilePath: os << "kRegisterFilePath"; break; case CommandChoice::kUnregisterFilePath: os << "kUnregisterFilePath"; break; case CommandChoice::kReadAhead: os << "kReadAhead"; break; case CommandChoice::kExit: os << "kExit"; break; case CommandChoice::kCreateSession: os << "kCreateSession"; break; case CommandChoice::kDestroySession: os << "kDestroySession"; break; case CommandChoice::kDumpSession: os << "kDumpSession"; break; case CommandChoice::kDumpEverything: os << "kDumpEverything"; break; case CommandChoice::kCreateFdSession: os << "kCreateFdSession"; break; default: CHECK(false) << "forgot to handle this choice"; break; } return os; } std::ostream& operator<<(std::ostream& os, const Command& command) { os << "Command{"; os << "choice=" << command.choice << ","; bool has_session_id = true; bool has_id = true; switch (command.choice) { case CommandChoice::kDumpEverything: case CommandChoice::kExit: has_session_id = false; FALLTHROUGH_INTENDED; case CommandChoice::kCreateFdSession: case CommandChoice::kCreateSession: case CommandChoice::kDestroySession: case CommandChoice::kDumpSession: has_id = false; break; default: break; } if (has_session_id) { os << "sid=" << command.session_id << ","; } if (has_id) { os << "id=" << command.id << ","; } switch (command.choice) { case CommandChoice::kRegisterFilePath: os << "file_path="; if (command.file_path) { os << *(command.file_path); } else { os << "(nullopt)"; } break; case CommandChoice::kUnregisterFilePath: break; case CommandChoice::kReadAhead: os << "read_ahead_kind=" << command.read_ahead_kind << ","; os << "length=" << command.length << ","; os << "offset=" << command.offset << ","; break; case CommandChoice::kExit: break; case CommandChoice::kCreateFdSession: os << "fd="; if (command.fd.has_value()) { os << command.fd.value(); } else { os << "(nullopt)"; } os << ","; FALLTHROUGH_INTENDED; case CommandChoice::kCreateSession: os << "description="; if (command.file_path) { os << "'" << *(command.file_path) << "'"; } else { os << "(nullopt)"; } break; case CommandChoice::kDestroySession: break; case CommandChoice::kDumpSession: break; case CommandChoice::kDumpEverything: break; default: CHECK(false) << "forgot to handle this choice"; break; } os << "}"; return os; } template struct ParseResult { T value; char* next_token; size_t stream_size; ParseResult() : value{}, next_token{nullptr}, stream_size{} { } constexpr operator bool() const { return next_token != nullptr; } }; // Very spammy: Keep it off by default. Set to true if changing this code. static constexpr bool kDebugParsingRead = false; #define DEBUG_PREAD if (kDebugParsingRead) LOG(VERBOSE) << "ParsingRead " // Parse a strong type T from a buffer stream. // If there's insufficient space left to parse the value, an empty ParseResult is returned. template ParseResult ParsingRead(char* stream, size_t stream_size) { if (stream == nullptr) { DEBUG_PREAD << "stream was null"; return {}; } if constexpr (std::is_same_v) { ParseResult length = ParsingRead(stream, stream_size); if (!length) { DEBUG_PREAD << "could not find length"; // Not enough bytes left? return {}; } ParseResult string_result; string_result.value.reserve(length); stream = length.next_token; stream_size = length.stream_size; for (size_t i = 0; i < length.value; ++i) { ParseResult char_result = ParsingRead(stream, stream_size); stream = char_result.next_token; stream_size = char_result.stream_size; if (!char_result) { DEBUG_PREAD << "too few chars in stream, expected length: " << length.value; // Not enough bytes left? return {}; } string_result.value += char_result.value; DEBUG_PREAD << "string preliminary is : " << string_result.value; } DEBUG_PREAD << "parsed string to: " << string_result.value; string_result.next_token = stream; return string_result; } else { if (sizeof(T) > stream_size) { return {}; } ParseResult result; result.next_token = stream + sizeof(T); result.stream_size = stream_size - sizeof(T); memcpy(&result.value, stream, sizeof(T)); return result; } } // Convenience overload to chain multiple ParsingRead together. template ParseResult ParsingRead(ParseResult result) { return ParsingRead(result.next_token, result.stream_size); } class CommandParser { public: CommandParser(PrefetcherForkParameters params) { params_ = params; } std::vector ParseSocketCommands(bool& eof) { eof = false; std::vector commands_vec; std::vector buf_vector; buf_vector.resize(1024*1024); // 1MB. char* buf = &buf_vector[0]; // Binary only parsing. The higher level code can parse text // with ifstream if it really wants to. char* stream = &buf[0]; size_t stream_size = buf_vector.size(); while (true) { if (stream_size == 0) { // TODO: reply with an overflow command. LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands."; stream = &buf[0]; stream_size = buf_vector.size(); memset(&buf[0], /*c*/0, buf_vector.size()); } if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon block recvmsg for commands (fd=" << params_.input_fd << ")"; } ssize_t count; struct msghdr hdr; memset(&hdr, 0, sizeof(hdr)); { union { struct cmsghdr cmh; char control[CMSG_SPACE(sizeof(int))]; } control_un; memset(&control_un, 0, sizeof(control_un)); /* Set 'control_un' to describe ancillary data that we want to receive */ control_un.cmh.cmsg_len = CMSG_LEN(sizeof(int)); /* fd is sizeof(int) */ control_un.cmh.cmsg_level = SOL_SOCKET; control_un.cmh.cmsg_type = SCM_CREDENTIALS; // the regular message data will be read into stream struct iovec iov; memset(&iov, 0, sizeof(iov)); iov.iov_base = stream; iov.iov_len = stream_size; /* Set hdr fields to describe 'control_un' */ hdr.msg_control = control_un.control; hdr.msg_controllen = sizeof(control_un.control); hdr.msg_iov = &iov; hdr.msg_iovlen = 1; hdr.msg_name = nullptr; /* no peer address */ hdr.msg_namelen = 0; count = TEMP_FAILURE_RETRY(recvmsg(params_.input_fd, &hdr, /*flags*/0)); } if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon recvmsg " << count << " for stream size:" << stream_size; } if (count < 0) { PLOG(ERROR) << "failed to recvmsg from input fd"; break; // TODO: let the daemon be restarted by higher level code? } else if (count == 0) { LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating"; eof = true; break; // TODO: let the daemon be restarted by higher level code? } { /* Extract fd from ancillary data if present */ struct cmsghdr* hp; hp = CMSG_FIRSTHDR(&hdr); if (hp && // FIXME: hp->cmsg_len returns an absurdly large value. is it overflowing? // (hp->cmsg_len == CMSG_LEN(sizeof(int))) && (hp->cmsg_level == SOL_SOCKET) && (hp->cmsg_type == SCM_RIGHTS)) { int passed_fd = *(int*) CMSG_DATA(hp); if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon received FD " << passed_fd; } // tack the FD into our dequeue. // we assume the FDs are sent in-order same as the regular iov are sent in-order. longbuf_fds_.insert(longbuf_fds_.end(), passed_fd); } else if (hp != nullptr) { if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon::read got CMSG but it wasn't matching SCM_RIGHTS," << "cmsg_len=" << hp->cmsg_len << "," << "cmsg_level=" << hp->cmsg_level << "," << "cmsg_type=" << hp->cmsg_type; } } } longbuf_.insert(longbuf_.end(), stream, stream + count); if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size(); } // reconstruct a stream of [iov_Command chdr_fd?]* back into [Command]* { if (longbuf_.size() == 0) { break; } std::vector v(longbuf_.begin(), longbuf_.end()); std::vector v_fds{longbuf_fds_.begin(), longbuf_fds_.end()}; if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size(); if (WOULD_LOG(VERBOSE)) { std::stringstream dump; dump << std::hex << std::setfill('0'); for (size_t i = 0; i < v.size(); ++i) { dump << std::setw(2) << static_cast(v[i]); } LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str(); } LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ size: " << v_fds.size(); if (WOULD_LOG(VERBOSE)) { std::stringstream dump; for (size_t i = 0; i < v_fds.size(); ++i) { dump << v_fds[i] << ", "; } LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ dump: " << dump.str(); } } size_t v_fds_off = 0; size_t consumed_fds_total = 0; size_t v_off = 0; size_t consumed_bytes = std::numeric_limits::max(); size_t consumed_total = 0; while (true) { std::optional maybe_command; maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes); consumed_total += consumed_bytes; // Normal every time we get to the end of a buffer. if (!maybe_command) { if (LogVerboseIpc()) { LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size(); } break; } if (maybe_command->RequiresFd()) { if (v_fds_off < v_fds.size()) { maybe_command->fd = v_fds[v_fds_off++]; consumed_fds_total++; if (LogVerboseIpc()) { LOG(VERBOSE) << "Append the FD to " << *maybe_command; } } else { LOG(WARNING) << "Failed to acquire FD for " << *maybe_command; } } // in the next pass ignore what we already consumed. v_off += consumed_bytes; // true as long we don't hit the 'break' above. DCHECK_EQ(v_off, consumed_total); if (LogVerboseIpc()) { LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size() << "," << *maybe_command; // Pretty-print a single command for debugging/testing. LOG(VERBOSE) << *maybe_command; } // add to the commands we parsed. commands_vec.push_back(*maybe_command); } // erase however many were consumed longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total); // erase however many FDs were consumed. longbuf_fds_.erase(longbuf_fds_.begin(), longbuf_fds_.begin() + consumed_fds_total); } break; } return commands_vec; } std::vector ParseCommands(bool& eof) { eof = false; std::vector commands_vec; std::vector buf_vector; buf_vector.resize(kPipeBufferSize); char* buf = &buf_vector[0]; // Binary only parsing. The higher level code can parse text // with ifstream if it really wants to. char* stream = &buf[0]; size_t stream_size = buf_vector.size(); while (true) { if (stream_size == 0) { // TODO: reply with an overflow command. LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands."; stream = &buf[0]; stream_size = buf_vector.size(); memset(&buf[0], /*c*/0, buf_vector.size()); } if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon block read for commands (fd=" << params_.input_fd << ")"; } ssize_t count = TEMP_FAILURE_RETRY(read(params_.input_fd, stream, stream_size)); if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon::read " << count << " for stream size:" << stream_size; } if (count < 0) { PLOG(ERROR) << "failed to read from input fd"; break; // TODO: let the daemon be restarted by higher level code? } else if (count == 0) { LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating"; eof = true; break; // TODO: let the daemon be restarted by higher level code? } longbuf_.insert(longbuf_.end(), stream, stream + count); if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size(); } std::optional maybe_command; { if (longbuf_.size() == 0) { break; } std::vector v(longbuf_.begin(), longbuf_.end()); if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size(); if (WOULD_LOG(VERBOSE)) { std::stringstream dump; dump << std::hex << std::setfill('0'); for (size_t i = 0; i < v.size(); ++i) { dump << std::setw(2) << static_cast(v[i]); } LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str(); } } size_t v_off = 0; size_t consumed_bytes = std::numeric_limits::max(); size_t consumed_total = 0; while (true) { maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes); consumed_total += consumed_bytes; // Normal every time we get to the end of a buffer. if (!maybe_command) { if (LogVerboseIpc()) { LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size(); } break; } // in the next pass ignore what we already consumed. v_off += consumed_bytes; // true as long we don't hit the 'break' above. DCHECK_EQ(v_off, consumed_total); if (LogVerboseIpc()) { LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size() << "," << *maybe_command; // Pretty-print a single command for debugging/testing. LOG(VERBOSE) << *maybe_command; } // add to the commands we parsed. commands_vec.push_back(*maybe_command); } // erase however many were consumed longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total); } break; } return commands_vec; } private: bool IsTextMode() const { return params_.format_text; } PrefetcherForkParameters params_; // A buffer long enough to contain a lot of buffers. // This handles reads that only contain a partial command. std::deque longbuf_; // File descriptor buffers. std::deque longbuf_fds_; }; static constexpr bool kDebugCommandRead = true; #define DEBUG_READ if (kDebugCommandRead) LOG(VERBOSE) << "Command::Read " std::optional Command::Read(char* buf, size_t buf_size, /*out*/size_t* consumed_bytes) { *consumed_bytes = 0; if (buf == nullptr) { return std::nullopt; } Command cmd{}; // zero-initialize any unused fields ParseResult parsed_choice = ParsingRead(buf, buf_size); cmd.choice = parsed_choice.value; if (!parsed_choice) { DEBUG_READ << "no choice"; return std::nullopt; } switch (parsed_choice.value) { case CommandChoice::kRegisterFilePath: { ParseResult parsed_session_id = ParsingRead(parsed_choice); if (!parsed_session_id) { DEBUG_READ << "no parsed session id"; return std::nullopt; } ParseResult parsed_id = ParsingRead(parsed_session_id); if (!parsed_id) { DEBUG_READ << "no parsed id"; return std::nullopt; } ParseResult parsed_file_path = ParsingRead(parsed_id); if (!parsed_file_path) { DEBUG_READ << "no file path"; return std::nullopt; } *consumed_bytes = parsed_file_path.next_token - buf; cmd.session_id = parsed_session_id.value; cmd.id = parsed_id.value; cmd.file_path = parsed_file_path.value; break; } case CommandChoice::kUnregisterFilePath: { ParseResult parsed_session_id = ParsingRead(parsed_choice); if (!parsed_session_id) { DEBUG_READ << "no parsed session id"; return std::nullopt; } ParseResult parsed_id = ParsingRead(parsed_session_id); if (!parsed_id) { DEBUG_READ << "no parsed id"; return std::nullopt; } *consumed_bytes = parsed_id.next_token - buf; cmd.session_id = parsed_session_id.value; cmd.id = parsed_id.value; break; } case CommandChoice::kReadAhead: { ParseResult parsed_session_id = ParsingRead(parsed_choice); if (!parsed_session_id) { DEBUG_READ << "no parsed session id"; return std::nullopt; } ParseResult parsed_id = ParsingRead(parsed_session_id); if (!parsed_id) { DEBUG_READ << "no parsed id"; return std::nullopt; } ParseResult parsed_kind = ParsingRead(parsed_id); if (!parsed_kind) { DEBUG_READ << "no parsed kind"; return std::nullopt; } ParseResult parsed_length = ParsingRead(parsed_kind); if (!parsed_length) { DEBUG_READ << "no parsed length"; return std::nullopt; } ParseResult parsed_offset = ParsingRead(parsed_length); if (!parsed_offset) { DEBUG_READ << "no parsed offset"; return std::nullopt; } *consumed_bytes = parsed_offset.next_token - buf; cmd.session_id = parsed_session_id.value; cmd.id = parsed_id.value; cmd.read_ahead_kind = parsed_kind.value; cmd.length = parsed_length.value; cmd.offset = parsed_offset.value; break; } case CommandChoice::kCreateSession: case CommandChoice::kCreateFdSession: { ParseResult parsed_session_id = ParsingRead(parsed_choice); if (!parsed_session_id) { DEBUG_READ << "no parsed session id"; return std::nullopt; } ParseResult parsed_description = ParsingRead(parsed_session_id); if (!parsed_description) { DEBUG_READ << "no description"; return std::nullopt; } *consumed_bytes = parsed_description.next_token - buf; cmd.session_id = parsed_session_id.value; cmd.file_path = parsed_description.value; break; } case CommandChoice::kDestroySession: case CommandChoice::kDumpSession: { ParseResult parsed_session_id = ParsingRead(parsed_choice); if (!parsed_session_id) { DEBUG_READ << "no parsed session id"; return std::nullopt; } *consumed_bytes = parsed_session_id.next_token - buf; cmd.session_id = parsed_session_id.value; break; } case CommandChoice::kExit: case CommandChoice::kDumpEverything: *consumed_bytes = parsed_choice.next_token - buf; // Only need to parse the choice. break; default: LOG(FATAL) << "unrecognized command number " << static_cast(parsed_choice.value); break; } return cmd; } bool Command::Write(char* buf, size_t buf_size, /*out*/size_t* produced_bytes) const { *produced_bytes = 0; if (buf == nullptr) { LOG(WARNING) << "null buf, is this expected?"; return false; } bool has_enough_space = false; size_t space_requirement = std::numeric_limits::max(); space_requirement = sizeof(choice); switch (choice) { case CommandChoice::kRegisterFilePath: space_requirement += sizeof(session_id); space_requirement += sizeof(id); space_requirement += sizeof(uint32_t); // string length if (!file_path) { LOG(WARNING) << "Missing file path for kRegisterFilePath"; return false; } space_requirement += file_path->size(); // string contents break; case CommandChoice::kUnregisterFilePath: space_requirement += sizeof(session_id); space_requirement += sizeof(id); break; case CommandChoice::kReadAhead: space_requirement += sizeof(session_id); space_requirement += sizeof(id); space_requirement += sizeof(read_ahead_kind); space_requirement += sizeof(length); space_requirement += sizeof(offset); break; case CommandChoice::kCreateSession: case CommandChoice::kCreateFdSession: space_requirement += sizeof(session_id); space_requirement += sizeof(uint32_t); // string length if (!file_path) { LOG(WARNING) << "Missing file path for kCreateSession"; return false; } space_requirement += file_path->size(); // string contents break; case CommandChoice::kDestroySession: case CommandChoice::kDumpSession: space_requirement += sizeof(session_id); break; case CommandChoice::kExit: case CommandChoice::kDumpEverything: // Only need space for the choice. break; default: LOG(FATAL) << "unrecognized command number " << static_cast(choice); break; } if (buf_size < space_requirement) { return false; } *produced_bytes = space_requirement; // Always write out the choice. size_t buf_offset = 0; memcpy(&buf[buf_offset], &choice, sizeof(choice)); buf_offset += sizeof(choice); switch (choice) { case CommandChoice::kRegisterFilePath: memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); buf_offset += sizeof(session_id); memcpy(&buf[buf_offset], &id, sizeof(id)); buf_offset += sizeof(id); { uint32_t string_length = static_cast(file_path->size()); memcpy(&buf[buf_offset], &string_length, sizeof(string_length)); buf_offset += sizeof(string_length); } DCHECK(file_path.has_value()); memcpy(&buf[buf_offset], file_path->c_str(), file_path->size()); buf_offset += file_path->size(); break; case CommandChoice::kUnregisterFilePath: memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); buf_offset += sizeof(session_id); memcpy(&buf[buf_offset], &id, sizeof(id)); buf_offset += sizeof(id); break; case CommandChoice::kReadAhead: memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); buf_offset += sizeof(session_id); memcpy(&buf[buf_offset], &id, sizeof(id)); buf_offset += sizeof(id); memcpy(&buf[buf_offset], &read_ahead_kind, sizeof(read_ahead_kind)); buf_offset += sizeof(read_ahead_kind); memcpy(&buf[buf_offset], &length, sizeof(length)); buf_offset += sizeof(length); memcpy(&buf[buf_offset], &offset, sizeof(offset)); buf_offset += sizeof(offset); break; case CommandChoice::kCreateSession: case CommandChoice::kCreateFdSession: memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); buf_offset += sizeof(session_id); { uint32_t string_length = static_cast(file_path->size()); memcpy(&buf[buf_offset], &string_length, sizeof(string_length)); buf_offset += sizeof(string_length); } DCHECK(file_path.has_value()); memcpy(&buf[buf_offset], file_path->c_str(), file_path->size()); buf_offset += file_path->size(); DCHECK_EQ(buf_offset, space_requirement) << *this << ",file_path_size:" << file_path->size(); DCHECK_EQ(buf_offset, *produced_bytes) << *this; break; case CommandChoice::kDestroySession: case CommandChoice::kDumpSession: memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); buf_offset += sizeof(session_id); break; case CommandChoice::kExit: case CommandChoice::kDumpEverything: // Only need to write out the choice. break; default: LOG(FATAL) << "should have fallen out in the above switch" << static_cast(choice); break; } DCHECK_EQ(buf_offset, space_requirement) << *this; DCHECK_EQ(buf_offset, *produced_bytes) << *this; return true; } class PrefetcherDaemon::Impl { public: std::optional StartPipesViaFork() { int pipefds[2]; if (pipe(&pipefds[0]) != 0) { PLOG(FATAL) << "Failed to create read/write pipes"; } if (WOULD_LOG(VERBOSE)) { long pipe_size = static_cast(fcntl(pipefds[0], F_GETPIPE_SZ)); if (pipe_size < 0) { PLOG(ERROR) << "Failed to F_GETPIPE_SZ:"; } LOG(VERBOSE) << "StartPipesViaFork: default pipe size: " << pipe_size; } for (int i = 0; i < 2; ++i) { // Default pipe size is usually 64KB. // Increase to 1MB so that iorapd has to rarely run during prefetching. if (fcntl(pipefds[i], F_SETPIPE_SZ, kPipeBufferSize) < 0) { PLOG(FATAL) << "Failed to increase pipe size to max"; } } pipefd_read_ = pipefds[0]; pipefd_write_ = pipefds[1]; PrefetcherForkParameters params; params.input_fd = pipefd_read_; params.output_fd = pipefd_write_; params.format_text = false; params.use_sockets = false; bool res = StartViaFork(params); if (res) { return params; } else { return std::nullopt; } } std::optional StartSocketViaFork() { int socket_fds[2]; if (socketpair(AF_UNIX, SOCK_STREAM, /*protocol*/0, &socket_fds[0]) != 0) { PLOG(FATAL) << "Failed to create read/write socketpair"; } pipefd_read_ = socket_fds[0]; // iorapd writer, iorap.prefetcherd reader pipefd_write_ = socket_fds[1]; // iorapd reader, iorap.prefetcherd writer PrefetcherForkParameters params; params.input_fd = pipefd_read_; params.output_fd = pipefd_write_; params.format_text = false; params.use_sockets = true; bool res = StartViaFork(params); if (res) { return params; } else { return std::nullopt; } } bool StartViaFork(PrefetcherForkParameters params) { params_ = params; forked_ = true; child_ = fork(); if (child_ == -1) { LOG(FATAL) << "Failed to fork PrefetcherDaemon"; } else if (child_ > 0) { // we are the caller of this function LOG(DEBUG) << "forked into iorap.prefetcherd, pid = " << child_; return true; } else { // we are the child that was forked. std::stringstream argv; // for logging std::vector argv_vec; { std::stringstream s; s << "--input-fd"; argv_vec.push_back(s.str()); std::stringstream s2; s2 << params.input_fd; argv_vec.push_back(s2.str()); argv << " --input-fd" << " " << params.input_fd; } { std::stringstream s; s << "--output-fd"; argv_vec.push_back(s.str()); std::stringstream s2; s2 << params.output_fd; argv_vec.push_back(s2.str()); argv << " --output-fd" << " " << params.output_fd; } if (params.use_sockets) { std::stringstream s; s << "--use-sockets"; argv_vec.push_back(s.str()); argv << " --use-sockets"; } if (WOULD_LOG(VERBOSE)) { std::stringstream s; s << "--verbose"; argv_vec.push_back(s.str()); argv << " --verbose"; } std::unique_ptr argv_ptr = common::VecToArgv(kCommandFileName, argv_vec); LOG(DEBUG) << "fork+exec: " << kCommandFileName << " " << argv.str(); execve(kCommandFileName, (char **)argv_ptr.get(), /*envp*/nullptr); // This should never return. _exit(EXIT_FAILURE); } DCHECK(false); return false; } // TODO: Not very useful since this can never return 'true' // -> in the child we would've already execd which loses all this code. bool IsDaemon() { // In the child the pid is always 0. return child_ > 0; } bool Main(PrefetcherForkParameters params) { LOG(VERBOSE) << "PrefetcherDaemon::Main " << params; CommandParser command_parser{params}; Command next_command{}; std::vector many_commands; // Ensure alogd is pre-initialized before installing minijail. LOG(DEBUG) << "Installing minijail"; // Install seccomp filter using libminijail. if (kInstallMiniJail) { MiniJail(); } while (true) { bool eof = false; if (params.use_sockets) { // use recvmsg(2). supports receiving FDs. many_commands = command_parser.ParseSocketCommands(/*out*/eof); } else { // use read(2). does not support receiving FDs. many_commands = command_parser.ParseCommands(/*out*/eof); } if (eof) { LOG(WARNING) << "PrefetcherDaemon got EOF, terminating"; return true; } for (auto& command : many_commands) { if (LogVerboseIpc()) { LOG(VERBOSE) << "PrefetcherDaemon got command: " << command; } if (command.choice == CommandChoice::kExit) { LOG(DEBUG) << "PrefetcherDaemon got kExit command, terminating"; return true; } if (!ReceiveCommand(command)) { // LOG(WARNING) << "PrefetcherDaemon command processing failure: " << command; } // ReceiveCommand should dup to keep the FD. Avoid leaks. if (command.fd.has_value()) { close(*command.fd); } } } LOG(VERBOSE) << "PrefetcherDaemon::Main got exit, terminating"; return true; // Terminate. } Impl(PrefetcherDaemon* daemon) { session_manager_ = SessionManager::CreateManager(SessionKind::kInProcessDirect); DCHECK(session_manager_ != nullptr); }; ~Impl() { // Don't do anything if we never called 'StartViaFork' if (forked_) { if (!IsDaemon()) { int status; waitpid(child_, /*out*/&status, /*options*/0); } else { LOG(WARNING) << "execve should have avoided this path"; // DCHECK(false) << "not possible because the execve would avoid this path"; } } } bool SendCommand(const Command& command) { // Only parent is the sender. DCHECK(forked_); //DCHECK(!IsDaemon()); char buf[1024]; size_t stream_size; if (!command.Write(buf, sizeof(buf), /*out*/&stream_size)) { PLOG(ERROR) << "Failed to serialize command: " << command; return false; } if (LogVerboseIpc()) { LOG(VERBOSE) << "pre-write(fd=" << pipefd_write_ << ", buf=" << buf << ", size=" << stream_size<< ")"; } if (params_.use_sockets) { /* iov contains the normal message (Command) */ struct iovec iov; memset(&iov, 0, sizeof(iov)); iov.iov_base = &buf[0]; iov.iov_len = stream_size; struct msghdr msg; memset(&msg, 0, sizeof(msg)); /* point to iov to transmit */ msg.msg_iov = &iov; msg.msg_iovlen = 1; /* no dest address; socket is connected */ msg.msg_name = nullptr; msg.msg_namelen = 0; // append a CMSG with SCM_RIGHTS if we have an FD. if (command.fd.has_value()) { union { struct cmsghdr cmh; char control[CMSG_SPACE(sizeof(int))]; /* sized to hold an fd (int) */ } control_un; memset(&control_un, 0, sizeof(control_un)); msg.msg_control = &control_un.control[0]; msg.msg_controllen = sizeof(control_un.control); struct cmsghdr *hp; hp = CMSG_FIRSTHDR(&msg); hp->cmsg_len = CMSG_LEN(sizeof(int)); hp->cmsg_level = SOL_SOCKET; hp->cmsg_type = SCM_RIGHTS; *((int *) CMSG_DATA(hp)) = *(command.fd); DCHECK(command.RequiresFd()) << command; if (LogVerboseIpc()) { LOG(VERBOSE) << "append FD to sendmsg: " << *(command.fd); } } // TODO: add CMSG for the FD passage. if (TEMP_FAILURE_RETRY(sendmsg(pipefd_write_, &msg, /*flags*/0)) < 0) { PLOG(ERROR) << "Failed to sendmsg command: " << command; return false; } } else { if (TEMP_FAILURE_RETRY(write(pipefd_write_, buf, stream_size)) < 0) { PLOG(ERROR) << "Failed to write command: " << command; return false; } } if (LogVerboseIpc()) { LOG(VERBOSE) << "write(fd=" << pipefd_write_ << ", buf=" << buf << ", size=" << stream_size<< ")"; } // TODO: also read the reply? return true; } bool ReceiveCommand(const Command& command) { // Only child is the command receiver. // DCHECK(IsDaemon()); switch (command.choice) { case CommandChoice::kRegisterFilePath: { std::shared_ptr session = session_manager_->FindSession(command.session_id); if (!session) { LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; return false; } CHECK(command.file_path.has_value()) << command; return session->RegisterFilePath(command.id, *command.file_path); } case CommandChoice::kUnregisterFilePath: { std::shared_ptr session = session_manager_->FindSession(command.session_id); if (!session) { LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; return false; } return session->UnregisterFilePath(command.id); } case CommandChoice::kReadAhead: { std::shared_ptr session = session_manager_->FindSession(command.session_id); if (!session) { LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; return false; } return session->ReadAhead(command.id, command.read_ahead_kind, command.length, command.offset); } // TODO: unreadahead case CommandChoice::kExit: { LOG(WARNING) << "kExit should be handled earlier."; return true; } case CommandChoice::kCreateSession: { std::shared_ptr session = session_manager_->FindSession(command.session_id); if (session != nullptr) { LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command; return false; } CHECK(command.file_path.has_value()) << command; if (session_manager_->CreateSession(command.session_id, /*description*/*command.file_path) == nullptr) { LOG(ERROR) << "ReceiveCommand: Failure to kCreateSession: " << command; return false; } return true; } case CommandChoice::kDestroySession: { if (!session_manager_->DestroySession(command.session_id)) { LOG(ERROR) << "ReceiveCommand: Failure to kDestroySession: " << command; return false; } return true; } case CommandChoice::kDumpSession: { std::shared_ptr session = session_manager_->FindSession(command.session_id); if (!session) { LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; return false; } // TODO: Consider doing dumpsys support somehow? session->Dump(LOG_STREAM(DEBUG), /*multiline*/true); return true; } case CommandChoice::kDumpEverything: { session_manager_->Dump(LOG_STREAM(DEBUG), /*multiline*/true); break; } case CommandChoice::kCreateFdSession: { std::shared_ptr session = session_manager_->FindSession(command.session_id); if (session != nullptr) { LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command; return false; } CHECK(command.file_path.has_value()) << command; CHECK(command.fd.has_value()) << command; LOG(VERBOSE) << "ReceiveCommand: kCreateFdSession fd=" << *(command.fd); // TODO: Maybe use CreateFdSession instead? session = session_manager_->CreateSession(command.session_id, /*description*/*command.file_path, command.fd.value()); if (session == nullptr) { LOG(ERROR) << "ReceiveCommand: Failure to kCreateFdSession: " << command; return false; } return session->ProcessFd(*command.fd); } } return true; } pid_t child_; bool forked_; int pipefd_read_; int pipefd_write_; PrefetcherForkParameters params_; // do not ever use an indirect session manager here, as it would cause a lifetime cycle. std::unique_ptr session_manager_; // direct only. }; PrefetcherDaemon::PrefetcherDaemon() : impl_{new Impl{this}} { LOG(VERBOSE) << "PrefetcherDaemon() constructor"; } bool PrefetcherDaemon::StartViaFork(PrefetcherForkParameters params) { return impl_->StartViaFork(std::move(params)); } std::optional PrefetcherDaemon::StartPipesViaFork() { return impl_->StartPipesViaFork(); } std::optional PrefetcherDaemon::StartSocketViaFork() { return impl_->StartSocketViaFork(); } bool PrefetcherDaemon::Main(PrefetcherForkParameters params) { return impl_->Main(params); } bool PrefetcherDaemon::SendCommand(const Command& command) { return impl_->SendCommand(command); } PrefetcherDaemon::~PrefetcherDaemon() { // required for unique_ptr for incomplete types. } } // namespace iorap::prefetcher