1 /*
2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "os/reactor.h"
18
19 #include <bluetooth/log.h>
20 #include <fcntl.h>
21 #include <sys/epoll.h>
22 #include <sys/eventfd.h>
23 #include <unistd.h>
24
25 #include <algorithm>
26 #include <cerrno>
27 #include <cinttypes>
28 #include <cstring>
29
30 #include "os/log.h"
31
namespace {

// Capacity of the epoll_event buffer handed to each epoll_wait() call, i.e. at most
// sizeof(epoll_event) * kEpollMaxEvents bytes of ready events are fetched per wake-up.
constexpr int kEpollMaxEvents = 64;

// Request values written to the reactor's control eventfd.
constexpr uint64_t kStopReactor = uint64_t{1} << 0;  // Run() should return
constexpr uint64_t kWaitForIdle = uint64_t{1} << 1;  // Run() should report once no events are pending

}  // namespace
40
41 namespace bluetooth {
42 namespace os {
43 using common::Closure;
44
45 struct Reactor::Event::impl {
implbluetooth::os::Reactor::Event::impl46 impl() {
47 fd_ = eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK);
48 log::assert_that(fd_ != -1, "Unable to create nonblocking event file descriptor semaphore");
49 }
~implbluetooth::os::Reactor::Event::impl50 ~impl() {
51 log::assert_that(fd_ != -1, "Unable to close a never-opened event file descriptor");
52 close(fd_);
53 fd_ = -1;
54 }
55 int fd_ = -1;
56 };
57
Event()58 Reactor::Event::Event() : pimpl_(new impl()) {}
~Event()59 Reactor::Event::~Event() {
60 delete pimpl_;
61 }
62
Read()63 bool Reactor::Event::Read() {
64 uint64_t val = 0;
65 return eventfd_read(pimpl_->fd_, &val) == 0;
66 }
Id() const67 int Reactor::Event::Id() const {
68 return pimpl_->fd_;
69 }
Clear()70 void Reactor::Event::Clear() {
71 uint64_t val;
72 while (eventfd_read(pimpl_->fd_, &val) == 0) {
73 }
74 }
Close()75 void Reactor::Event::Close() {
76 int close_status;
77 RUN_NO_INTR(close_status = close(pimpl_->fd_));
78 log::assert_that(close_status != -1, "assert failed: close_status != -1");
79 }
Notify()80 void Reactor::Event::Notify() {
81 uint64_t val = 1;
82 auto write_result = eventfd_write(pimpl_->fd_, val);
83 log::assert_that(write_result != -1, "assert failed: write_result != -1");
84 }
85
86 class Reactor::Reactable {
87 public:
Reactable(int fd,Closure on_read_ready,Closure on_write_ready)88 Reactable(int fd, Closure on_read_ready, Closure on_write_ready)
89 : fd_(fd),
90 on_read_ready_(std::move(on_read_ready)),
91 on_write_ready_(std::move(on_write_ready)),
92 is_executing_(false),
93 removed_(false) {}
94 const int fd_;
95 Closure on_read_ready_;
96 Closure on_write_ready_;
97 bool is_executing_;
98 bool removed_;
99 std::mutex mutex_;
100 std::unique_ptr<std::promise<void>> finished_promise_;
101 };
102
Reactor()103 Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false) {
104 RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC));
105 log::assert_that(epoll_fd_ != -1, "could not create epoll fd: {}", strerror(errno));
106
107 control_fd_ = eventfd(0, EFD_NONBLOCK);
108 log::assert_that(control_fd_ != -1, "assert failed: control_fd_ != -1");
109
110 epoll_event control_epoll_event = {EPOLLIN, {.ptr = nullptr}};
111 int result;
112 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, control_fd_, &control_epoll_event));
113 log::assert_that(result != -1, "assert failed: result != -1");
114 }
115
~Reactor()116 Reactor::~Reactor() {
117 int result;
118 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, control_fd_, nullptr));
119 log::assert_that(result != -1, "assert failed: result != -1");
120
121 RUN_NO_INTR(result = close(control_fd_));
122 log::assert_that(result != -1, "assert failed: result != -1");
123
124 RUN_NO_INTR(result = close(epoll_fd_));
125 log::assert_that(result != -1, "assert failed: result != -1");
126 }
127
Run()128 void Reactor::Run() {
129 bool already_running = is_running_.exchange(true);
130 log::assert_that(!already_running, "assert failed: !already_running");
131
132 int timeout_ms = -1;
133 bool waiting_for_idle = false;
134 for (;;) {
135 {
136 std::unique_lock<std::mutex> lock(mutex_);
137 invalidation_list_.clear();
138 }
139 epoll_event events[kEpollMaxEvents];
140 int count;
141 RUN_NO_INTR(count = epoll_wait(epoll_fd_, events, kEpollMaxEvents, timeout_ms));
142 log::assert_that(count != -1, "epoll_wait failed: fd={}, err={}", epoll_fd_, strerror(errno));
143 if (waiting_for_idle && count == 0) {
144 timeout_ms = -1;
145 waiting_for_idle = false;
146 std::scoped_lock<std::mutex> lock(mutex_);
147 idle_promise_->set_value();
148 idle_promise_ = nullptr;
149 }
150
151 for (int i = 0; i < count; ++i) {
152 auto event = events[i];
153 log::assert_that(event.events != 0u, "assert failed: event.events != 0u");
154
155 // If the ptr stored in epoll_event.data is nullptr, it means the control fd triggered
156 if (event.data.ptr == nullptr) {
157 uint64_t value;
158 eventfd_read(control_fd_, &value);
159 if ((value & kStopReactor) != 0) {
160 is_running_ = false;
161 return;
162 } else if ((value & kWaitForIdle) != 0) {
163 timeout_ms = 30;
164 waiting_for_idle = true;
165 continue;
166 } else {
167 log::error("Unknown control_fd value {:x}", value);
168 continue;
169 }
170 }
171 auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr);
172 std::unique_lock<std::mutex> lock(mutex_);
173 executing_reactable_finished_ = nullptr;
174 // See if this reactable has been removed in the meantime.
175 if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) {
176 continue;
177 }
178
179 {
180 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
181 lock.unlock();
182 reactable->is_executing_ = true;
183 }
184 if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && !reactable->on_read_ready_.is_null()) {
185 reactable->on_read_ready_.Run();
186 }
187 if (event.events & EPOLLOUT && !reactable->on_write_ready_.is_null()) {
188 reactable->on_write_ready_.Run();
189 }
190 {
191 std::unique_lock<std::mutex> reactable_lock(reactable->mutex_);
192 reactable->is_executing_ = false;
193 if (reactable->removed_) {
194 reactable->finished_promise_->set_value();
195 reactable_lock.unlock();
196 delete reactable;
197 }
198 }
199 }
200 }
201 }
202
Stop()203 void Reactor::Stop() {
204 if (!is_running_) {
205 log::warn("not running, will stop once it's started");
206 }
207 auto control = eventfd_write(control_fd_, kStopReactor);
208 log::assert_that(control != -1, "assert failed: control != -1");
209 }
210
NewEvent() const211 std::unique_ptr<Reactor::Event> Reactor::NewEvent() const {
212 return std::make_unique<Reactor::Event>();
213 }
214
Register(int fd,Closure on_read_ready,Closure on_write_ready)215 Reactor::Reactable* Reactor::Register(int fd, Closure on_read_ready, Closure on_write_ready) {
216 uint32_t poll_event_type = 0;
217 if (!on_read_ready.is_null()) {
218 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
219 }
220 if (!on_write_ready.is_null()) {
221 poll_event_type |= EPOLLOUT;
222 }
223 auto* reactable = new Reactable(fd, on_read_ready, on_write_ready);
224 epoll_event event = {
225 .events = poll_event_type,
226 .data = {.ptr = reactable},
227 };
228 int register_fd;
229 RUN_NO_INTR(register_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event));
230 log::assert_that(register_fd != -1, "assert failed: register_fd != -1");
231 return reactable;
232 }
233
Unregister(Reactor::Reactable * reactable)234 void Reactor::Unregister(Reactor::Reactable* reactable) {
235 log::assert_that(reactable != nullptr, "assert failed: reactable != nullptr");
236 {
237 std::lock_guard<std::mutex> lock(mutex_);
238 invalidation_list_.push_back(reactable);
239 }
240 bool delaying_delete_until_callback_finished = false;
241 {
242 int result;
243 std::lock_guard<std::mutex> reactable_lock(reactable->mutex_);
244 RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr));
245 if (result == -1 && errno == ENOENT) {
246 log::info("reactable is invalid or unregistered");
247 } else {
248 log::assert_that(result != -1, "could not unregister epoll fd: {}", strerror(errno));
249 }
250
251 // If we are unregistering during the callback event from this reactable, we delete it after the callback is
252 // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe.
253 if (reactable->is_executing_) {
254 reactable->removed_ = true;
255 reactable->finished_promise_ = std::make_unique<std::promise<void>>();
256 executing_reactable_finished_ = std::make_shared<std::future<void>>(reactable->finished_promise_->get_future());
257 delaying_delete_until_callback_finished = true;
258 }
259 }
260 // If we are unregistering outside of the callback event from this reactable, we delete it now
261 if (!delaying_delete_until_callback_finished) {
262 delete reactable;
263 }
264 }
265
WaitForUnregisteredReactable(std::chrono::milliseconds timeout)266 bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) {
267 std::lock_guard<std::mutex> lock(mutex_);
268 if (executing_reactable_finished_ == nullptr) {
269 return true;
270 }
271 auto stop_status = executing_reactable_finished_->wait_for(timeout);
272 if (stop_status != std::future_status::ready) {
273 log::error("Unregister reactable timed out");
274 }
275 return stop_status == std::future_status::ready;
276 }
277
WaitForIdle(std::chrono::milliseconds timeout)278 bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) {
279 auto promise = std::make_shared<std::promise<void>>();
280 auto future = std::make_unique<std::future<void>>(promise->get_future());
281 {
282 std::lock_guard<std::mutex> lock(mutex_);
283 idle_promise_ = promise;
284 }
285
286 auto control = eventfd_write(control_fd_, kWaitForIdle);
287 log::assert_that(control != -1, "assert failed: control != -1");
288
289 auto idle_status = future->wait_for(timeout);
290 return idle_status == std::future_status::ready;
291 }
292
ModifyRegistration(Reactor::Reactable * reactable,ReactOn react_on)293 void Reactor::ModifyRegistration(Reactor::Reactable* reactable, ReactOn react_on) {
294 log::assert_that(reactable != nullptr, "assert failed: reactable != nullptr");
295
296 uint32_t poll_event_type = 0;
297 if (react_on == REACT_ON_READ_ONLY || react_on == REACT_ON_READ_WRITE) {
298 poll_event_type |= (EPOLLIN | EPOLLRDHUP);
299 }
300 if (react_on == REACT_ON_WRITE_ONLY || react_on == REACT_ON_READ_WRITE) {
301 poll_event_type |= EPOLLOUT;
302 }
303 epoll_event event = {
304 .events = poll_event_type,
305 .data = {.ptr = reactable},
306 };
307 int modify_fd;
308 RUN_NO_INTR(modify_fd = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, reactable->fd_, &event));
309 log::assert_that(modify_fd != -1, "assert failed: modify_fd != -1");
310 }
311
312 } // namespace os
313 } // namespace bluetooth
314