/* * Copyright 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "os/reactor.h" #include #include #include #include #include #include #include #include #include #include "os/log.h" namespace { // Use at most sizeof(epoll_event) * kEpollMaxEvents kernel memory constexpr int kEpollMaxEvents = 64; constexpr uint64_t kStopReactor = 1 << 0; constexpr uint64_t kWaitForIdle = 1 << 1; } // namespace namespace bluetooth { namespace os { using common::Closure; struct Reactor::Event::impl { impl() { fd_ = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK); log::assert_that(fd_ != -1, "Unable to create nonblocking event file descriptor semaphore"); } ~impl() { log::assert_that(fd_ != -1, "Unable to close a never-opened event file descriptor"); close(fd_); fd_ = -1; } int fd_ = -1; }; Reactor::Event::Event() : pimpl_(new impl()) {} Reactor::Event::~Event() { delete pimpl_; } bool Reactor::Event::Read() { uint64_t val = 0; return eventfd_read(pimpl_->fd_, &val) == 0; } int Reactor::Event::Id() const { return pimpl_->fd_; } void Reactor::Event::Clear() { uint64_t val; while (eventfd_read(pimpl_->fd_, &val) == 0) { } } void Reactor::Event::Close() { int close_status; RUN_NO_INTR(close_status = close(pimpl_->fd_)); log::assert_that(close_status != -1, "assert failed: close_status != -1"); } void Reactor::Event::Notify() { uint64_t val = 1; auto write_result = eventfd_write(pimpl_->fd_, val); log::assert_that(write_result != -1, "assert failed: write_result != -1"); } class Reactor::Reactable { public: Reactable(int fd, Closure on_read_ready, Closure on_write_ready) : fd_(fd), on_read_ready_(std::move(on_read_ready)), on_write_ready_(std::move(on_write_ready)), is_executing_(false), removed_(false) {} const int fd_; Closure on_read_ready_; Closure on_write_ready_; bool is_executing_; bool removed_; std::mutex mutex_; std::unique_ptr> finished_promise_; }; Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) { RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC)); log::assert_that(epoll_fd_ != -1, "could not create epoll fd: {}", strerror(errno)); control_fd_ = eventfd(0, EFD_NONBLOCK); log::assert_that(control_fd_ != -1, "assert failed: control_fd_ != -1"); epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}}; int result; RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event)); log::assert_that(result != -1, "assert failed: result != -1"); } Reactor::~Reactor() { int result; RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr)); log::assert_that(result != -1, "assert failed: result != -1"); RUN_NO_INTR(result = close(control_fd_)); log::assert_that(result != -1, "assert failed: result != -1"); RUN_NO_INTR(result = close(epoll_fd_)); log::assert_that(result != -1, "assert failed: result != -1"); } void Reactor::Run() { bool already_running = is_running_.exchange(true); log::assert_that(!already_running, "assert failed: !already_running"); int timeout_ms = -1; bool waiting_for_idle = false; for (;;) { { std::unique_lock lock(mutex_); invalidation_list_.clear(); } epoll_event events[kEpollMaxEvents]; int count; RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms)); log::assert_that(count != -1, "epoll_wait failed: fd={}, err={}", epoll_fd_, strerror(errno)); if (waiting_for_idle && count == 0) { timeout_ms = -1; waiting_for_idle = false; std::scoped_lock lock(mutex_); idle_promise_->set_value(); idle_promise_ = nullptr; } for (int i = 0; i < count; ++i) { auto event = events[i]; log::assert_that(event.events != 0u, "assert failed: event.events != 0u"); // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered if (event.data.ptr == nullptr) { uint64_t value; eventfd_read(control_fd_, &value); if ((value & kStopReactor) != 0) { is_running_ = false; return; } else if ((value & kWaitForIdle) != 0) { timeout_ms = 30; waiting_for_idle = true; continue; } else { log::error("Unknown control_fd value {:x}", value); continue; } } auto* reactable = static_cast(event.data.ptr); std::unique_lock lock(mutex_); executing_reactable_finished_ = nullptr; // See if this reactable has been removed in the meantime. if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) { continue; } { std::lock_guard reactable_lock(reactable->mutex_); lock.unlock(); reactable->is_executing_ = true; } if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && !reactable->on_read_ready_.is_null()) { reactable->on_read_ready_.Run(); } if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) { reactable->on_write_ready_.Run(); } { std::unique_lock reactable_lock(reactable->mutex_); reactable->is_executing_ = false; if (reactable->removed_) { reactable->finished_promise_->set_value(); reactable_lock.unlock(); delete reactable; } } } } } void Reactor::Stop() { if (!is_running_) { log::warn("not running, will stop once it's started"); } auto control = eventfd_write(control_fd_, kStopReactor); log::assert_that(control != -1, "assert failed: control != -1"); } std::unique_ptr Reactor::NewEvent() const { return std::make_unique(); } Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) { uint32_t poll_event_type = 0; if (!on_read_ready.is_null()) { poll_event_type |= (EPOLLIN | EPOLLRDHUP); } if (!on_write_ready.is_null()) { poll_event_type |= EPOLLOUT; } auto* reactable = new Reactable(fd, on_read_ready, on_write_ready); epoll_event event = { .events = poll_event_type, .data = {.ptr = reactable}, }; int register_fd; RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event)); log::assert_that(register_fd != -1, "assert failed: register_fd != -1"); return reactable; } void Reactor::Unregister(Reactor::Reactable* reactable) { log::assert_that(reactable != nullptr, "assert failed: reactable != nullptr"); { std::lock_guard lock(mutex_); invalidation_list_.push_back(reactable); } bool delaying_delete_until_callback_finished = false; { int result; std::lock_guard reactable_lock(reactable->mutex_); RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr)); if (result == -1 && errno == ENOENT) { log::info("reactable is invalid or unregistered"); } else { log::assert_that(result != -1, "could not unregister epoll fd: {}", strerror(errno)); } // If we are unregistering during the callback event from this reactable, we delete it after the callback is // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe. if (reactable->is_executing_) { reactable->removed_ = true; reactable->finished_promise_ = std::make_unique>(); executing_reactable_finished_ = std::make_shared>(reactable->finished_promise_->get_future()); delaying_delete_until_callback_finished = true; } } // If we are unregistering outside of the callback event from this reactable, we delete it now if (!delaying_delete_until_callback_finished) { delete reactable; } } bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) { std::lock_guard lock(mutex_); if (executing_reactable_finished_ == nullptr) { return true; } auto stop_status = executing_reactable_finished_->wait_for(timeout); if (stop_status != std::future_status::ready) { log::error("Unregister reactable timed out"); } return stop_status == std::future_status::ready; } bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) { auto promise = std::make_shared>(); auto future = std::make_unique>(promise->get_future()); { std::lock_guard lock(mutex_); idle_promise_ = promise; } auto control = eventfd_write(control_fd_, kWaitForIdle); log::assert_that(control != -1, "assert failed: control != -1"); auto idle_status = future->wait_for(timeout); return idle_status == std::future_status::ready; } void Reactor::ModifyRegistration(Reactor::Reactable* reactable, ReactOn react_on) { log::assert_that(reactable != nullptr, "assert failed: reactable != nullptr"); uint32_t poll_event_type = 0; if (react_on == REACT_ON_READ_ONLY || react_on == REACT_ON_READ_WRITE) { poll_event_type |= (EPOLLIN | EPOLLRDHUP); } if (react_on == REACT_ON_WRITE_ONLY || react_on == REACT_ON_READ_WRITE) { poll_event_type |= EPOLLOUT; } epoll_event event = { .events = poll_event_type, .data = {.ptr = reactable}, }; int modify_fd; RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event)); log::assert_that(modify_fd != -1, "assert failed: modify_fd != -1"); } } // namespace os } // namespace bluetooth