/* * Copyright 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "os/reactor.h" #include #include #include #include #include #include #include #include #include "os/log.h" namespace { // Use at most sizeof(epoll_event) * kEpollMaxEvents kernel memory constexpr int kEpollMaxEvents = 64; constexpr uint64_t kStopReactor = 1 << 0; constexpr uint64_t kWaitForIdle = 1 << 1; } // namespace namespace bluetooth { namespace os { using common::Closure; class Reactor::Reactable { public: Reactable(int fd, Closure on_read_ready, Closure on_write_ready) : fd_(fd), on_read_ready_(std::move(on_read_ready)), on_write_ready_(std::move(on_write_ready)), is_executing_(false), removed_(false) {} const int fd_; Closure on_read_ready_; Closure on_write_ready_; bool is_executing_; bool removed_; std::mutex mutex_; std::unique_ptr> finished_promise_; }; Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) { RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC)); ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno)); control_fd_ = eventfd(0, EFD_NONBLOCK); ASSERT(control_fd_ != -1); epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}}; int result; RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event)); ASSERT(result != -1); } Reactor::~Reactor() { int result; RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr)); ASSERT(result != -1); RUN_NO_INTR(result = close(control_fd_)); ASSERT(result != -1); RUN_NO_INTR(result = close(epoll_fd_)); ASSERT(result != -1); } void Reactor::Run() { bool already_running = is_running_.exchange(true); ASSERT(!already_running); int timeout_ms = -1; bool waiting_for_idle = false; for (;;) { { std::unique_lock lock(mutex_); invalidation_list_.clear(); } epoll_event events[kEpollMaxEvents]; int count; RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms)); ASSERT(count != -1); if (waiting_for_idle && count == 0) { timeout_ms = -1; waiting_for_idle = false; idle_promise_->set_value(); idle_promise_ = nullptr; } for (int i = 0; i < count; ++i) { auto event = events[i]; ASSERT(event.events != 0u); // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered if (event.data.ptr == nullptr) { uint64_t value; eventfd_read(control_fd_, &value); if ((value & kStopReactor) != 0) { is_running_ = false; return; } else if ((value & kWaitForIdle) != 0) { timeout_ms = 30; waiting_for_idle = true; continue; } else { LOG_ERROR("Unknown control_fd value %" PRIu64 "x", value); continue; } } auto* reactable = static_cast(event.data.ptr); std::unique_lock lock(mutex_); executing_reactable_finished_ = nullptr; // See if this reactable has been removed in the meantime. if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) { continue; } { std::lock_guard reactable_lock(reactable->mutex_); lock.unlock(); reactable->is_executing_ = true; } if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && !reactable->on_read_ready_.is_null()) { reactable->on_read_ready_.Run(); } if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) { reactable->on_write_ready_.Run(); } { std::unique_lock reactable_lock(reactable->mutex_); reactable->is_executing_ = false; if (reactable->removed_) { reactable->finished_promise_->set_value(); reactable_lock.unlock(); delete reactable; } } } } } void Reactor::Stop() { if (!is_running_) { LOG_WARN("not running, will stop once it's started"); } auto control = eventfd_write(control_fd_, kStopReactor); ASSERT(control != -1); } Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) { uint32_t poll_event_type = 0; if (!on_read_ready.is_null()) { poll_event_type |= (EPOLLIN | EPOLLRDHUP); } if (!on_write_ready.is_null()) { poll_event_type |= EPOLLOUT; } auto* reactable = new Reactable(fd, on_read_ready, on_write_ready); epoll_event event = { .events = poll_event_type, .data = {.ptr = reactable}, }; int register_fd; RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event)); ASSERT(register_fd != -1); return reactable; } void Reactor::Unregister(Reactor::Reactable* reactable) { ASSERT(reactable != nullptr); { std::lock_guard lock(mutex_); invalidation_list_.push_back(reactable); } bool delaying_delete_until_callback_finished = false; { int result; std::lock_guard reactable_lock(reactable->mutex_); RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr)); if (result == -1 && errno == ENOENT) { LOG_INFO("reactable is invalid or unregistered"); } else { ASSERT(result != -1); } // If we are unregistering during the callback event from this reactable, we delete it after the callback is // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe. if (reactable->is_executing_) { reactable->removed_ = true; reactable->finished_promise_ = std::make_unique>(); executing_reactable_finished_ = std::make_shared>(reactable->finished_promise_->get_future()); delaying_delete_until_callback_finished = true; } } // If we are unregistering outside of the callback event from this reactable, we delete it now if (!delaying_delete_until_callback_finished) { delete reactable; } } bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) { std::lock_guard lock(mutex_); if (executing_reactable_finished_ == nullptr) { return true; } auto stop_status = executing_reactable_finished_->wait_for(timeout); if (stop_status != std::future_status::ready) { LOG_ERROR("Unregister reactable timed out"); } return stop_status == std::future_status::ready; } bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) { auto promise = std::make_shared>(); auto future = std::make_unique>(promise->get_future()); { std::lock_guard lock(mutex_); idle_promise_ = promise; } auto control = eventfd_write(control_fd_, kWaitForIdle); ASSERT(control != -1); auto idle_status = future->wait_for(timeout); return idle_status == std::future_status::ready; } void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) { ASSERT(reactable != nullptr); uint32_t poll_event_type = 0; if (!on_read_ready.is_null()) { poll_event_type |= (EPOLLIN | EPOLLRDHUP); } if (!on_write_ready.is_null()) { poll_event_type |= EPOLLOUT; } { std::lock_guard reactable_lock(reactable->mutex_); reactable->on_read_ready_ = std::move(on_read_ready); reactable->on_write_ready_ = std::move(on_write_ready); } epoll_event event = { .events = poll_event_type, .data = {.ptr = reactable}, }; int modify_fd; RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event)); ASSERT(modify_fd != -1); } } // namespace os } // namespace bluetooth