1 /*
2 * Copyright 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "async_manager.h"
18
19 #include <algorithm>
20 #include <atomic>
21 #include <condition_variable>
22 #include <fcntl.h>
23 #include <mutex>
24 #include <sys/select.h>
25 #include <thread>
26 #include <unistd.h>
27 #include <vector>
28
29 #include "log.h"
30
#ifndef TEMP_FAILURE_RETRY
/* Evaluates |exp| repeatedly for as long as it yields -1 with errno set to
 * EINTR, then produces the last value obtained. Relies on the GNU
 * statement-expression and __typeof__ extensions. */
#define TEMP_FAILURE_RETRY(exp)                          \
  ({                                                     \
    __typeof__(exp) retry_result_;                       \
    do {                                                 \
      retry_result_ = (exp);                             \
    } while (retry_result_ == -1 && errno == EINTR);     \
    retry_result_;                                       \
  })
#endif  // TEMP_FAILURE_RETRY
40
41 namespace rootcanal {
// Implementation of AsyncManager is divided between two classes, three if
// AsyncManager itself is taken into account, but its only responsibility
// besides being a proxy for the other two classes is to provide a global
// synchronization mechanism for callbacks and client code to use.
46
// The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of these objects nothing happens beyond some very simple
// member initialization. When the first FD is set up for watching the object
// starts a new thread which watches the given (and later provided) FDs using
// select() inside a loop. A special FD (a pipe) is also watched which is
// used to notify the thread of internal changes on the object state (like
// the addition of new FDs to watch on). Every access to internal state is
// synchronized using a single internal mutex. The thread is only stopped on
// destruction of the object, by modifying a flag, which is the only member
// variable accessed without acquiring the lock (because the notification to
// the thread is done later by writing to a pipe which means the thread will
// be notified regardless of what phase of the loop it is in at that moment).
60
// The scheduling of asynchronous tasks, periodic or not, is handled by the
// AsyncTaskManager class. Like the one for FDs, this class shares no internal
// state between different instances so it is safe to use several objects of
// this class. Also, nothing interesting happens upon construction, but only
// after a Task has been scheduled, and access to internal state is
// synchronized using a single internal mutex. When the first task is
// scheduled a thread is started which monitors a queue of tasks. The queue is
// peeked to see when the next task should be carried out and then the thread
// performs an (absolute) timed wait on a condition variable. The wait ends
// because of a time out or a notify on the cond var; the former means a task
// is due for execution while the latter means there has been a change in
// internal state, like a task has been scheduled/canceled or the flag to stop
// has been set. Setting and querying the stop flag or modifying the task
// queue and the subsequent notification on the cond var is done atomically
// (i.e. while holding the lock on the internal mutex) to ensure that the
// thread never misses the notification, since notifying a cond var is not
// persistent the way writing on a pipe is (if not done this way, the thread
// could query the stopping flag and be put aside by the OS scheduler right
// after, then the 'stop thread' procedure could run, setting the flag,
// notifying a cond var that no one is waiting on and joining the thread; the
// thread then resumes execution believing that it needs to continue and waits
// on the cond var possibly forever if there are no tasks scheduled,
// effectively causing a deadlock).
84
// Largest task id that can be handed out. Since every scheduled task needs
// its own id, this is also the maximum number of tasks that can be scheduled
// at any given time.
static const uint16_t kMaxTaskId =
    0xFFFF; /* 2^16 - 1, permissible ids are {1..2^16-1} */
// Returns the id that follows |id|, wrapping from kMaxTaskId back to 1.
static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
  if (id == kMaxTaskId) {
    return 1;
  }
  return id + 1;
}
// Size of the buffer used to drain the notification pipe. A single byte is
// written per notification, so more than a handful of pending bytes is
// highly unlikely; 10 is plenty, and the reads are performed in a loop just
// in case it is not. The thread routine does not care whether it was
// notified once or 100 times, it simply tries to consume everything that is
// available. If an interrupt makes read() return before everything was
// consumed, the next iteration of the thread loop gets back to the draining
// code almost immediately, so that case needs no special treatment.
static constexpr int kNotificationBufferSize = 10;
103
104 // Async File Descriptor Watcher Implementation:
105 class AsyncManager::AsyncFdWatcher {
106 public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)107 int WatchFdForNonBlockingReads(
108 int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
109 // add file descriptor and callback
110 {
111 std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
112 watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
113 }
114
115 // start the thread if not started yet
116 int started = tryStartThread();
117 if (started != 0) {
118 ERROR("{}: Unable to start thread", __func__);
119 return started;
120 }
121
122 // notify the thread so that it knows of the new FD
123 notifyThread();
124
125 return 0;
126 }
127
StopWatchingFileDescriptor(int file_descriptor)128 void StopWatchingFileDescriptor(int file_descriptor) {
129 std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
130 watched_shared_fds_.erase(file_descriptor);
131 }
132
133 AsyncFdWatcher() = default;
134 AsyncFdWatcher(const AsyncFdWatcher&) = delete;
135 AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
136
137 ~AsyncFdWatcher() = default;
138
stopThread()139 int stopThread() {
140 if (!std::atomic_exchange(&running_, false)) {
141 return 0; // if not running already
142 }
143
144 notifyThread();
145
146 if (std::this_thread::get_id() != thread_.get_id()) {
147 thread_.join();
148 } else {
149 WARNING("{}: Starting thread stop from inside the reading thread itself",
150 __func__);
151 }
152
153 {
154 std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
155 watched_shared_fds_.clear();
156 }
157
158 return 0;
159 }
160
161 private:
162 // Make sure to call this with at least one file descriptor ready to be
163 // watched upon or the thread routine will return immediately
tryStartThread()164 int tryStartThread() {
165 if (std::atomic_exchange(&running_, true)) {
166 return 0; // if already running
167 }
168 // set up the communication channel
169 int pipe_fds[2];
170 if (pipe(pipe_fds)) {
171 ERROR(
172 "{}: Unable to establish a communication channel to the reading "
173 "thread",
174 __func__);
175 return -1;
176 }
177 // configure the fds as non blocking.
178 if (fcntl(pipe_fds[0], F_SETFL, O_NONBLOCK) ||
179 fcntl(pipe_fds[1], F_SETFL, O_NONBLOCK)) {
180 ERROR(
181 "{}: Unable to configure the communication channel to the reading "
182 "thread",
183 __func__);
184 return -1;
185 }
186
187 notification_listen_fd_ = pipe_fds[0];
188 notification_write_fd_ = pipe_fds[1];
189
190 thread_ = std::thread([this]() { ThreadRoutine(); });
191 if (!thread_.joinable()) {
192 ERROR("{}: Unable to start reading thread", __func__);
193 return -1;
194 }
195 return 0;
196 }
197
notifyThread() const198 int notifyThread() const {
199 char buffer = '0';
200 if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
201 ERROR("{}: Unable to send message to reading thread", __func__);
202 return -1;
203 }
204 return 0;
205 }
206
setUpFileDescriptorSet(fd_set & read_fds)207 int setUpFileDescriptorSet(fd_set& read_fds) {
208 // add comm channel to the set
209 FD_SET(notification_listen_fd_, &read_fds);
210 int nfds = notification_listen_fd_;
211
212 // add watched FDs to the set
213 {
214 std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
215 for (auto& fdp : watched_shared_fds_) {
216 FD_SET(fdp.first, &read_fds);
217 nfds = std::max(fdp.first, nfds);
218 }
219 }
220 return nfds;
221 }
222
223 // check the comm channel and read everything there
consumeThreadNotifications(fd_set & read_fds) const224 bool consumeThreadNotifications(fd_set& read_fds) const {
225 if (FD_ISSET(notification_listen_fd_, &read_fds)) {
226 char buffer[kNotificationBufferSize];
227 while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer,
228 kNotificationBufferSize)) ==
229 kNotificationBufferSize) {
230 }
231 return true;
232 }
233 return false;
234 }
235
236 // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(fd_set & read_fds)237 void runAppropriateCallbacks(fd_set& read_fds) {
238 std::vector<decltype(watched_shared_fds_)::value_type> fds;
239 std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
240 for (auto& fdc : watched_shared_fds_) {
241 if (FD_ISSET(fdc.first, &read_fds)) {
242 fds.push_back(fdc);
243 }
244 }
245 for (auto& p : fds) {
246 p.second(p.first);
247 }
248 }
249
ThreadRoutine()250 void ThreadRoutine() {
251 while (running_) {
252 fd_set read_fds;
253 FD_ZERO(&read_fds);
254 int nfds = setUpFileDescriptorSet(read_fds);
255
256 // wait until there is data available to read on some FD
257 int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
258 if (retval <= 0) { // there was some error or a timeout
259 ERROR(
260 "{}: There was an error while waiting for data on the file "
261 "descriptors: {}",
262 __func__, strerror(errno));
263 continue;
264 }
265
266 consumeThreadNotifications(read_fds);
267
268 // Do not read if there was a call to stop running
269 if (!running_) {
270 break;
271 }
272
273 runAppropriateCallbacks(read_fds);
274 }
275 }
276
277 std::atomic_bool running_{false};
278 std::thread thread_;
279 std::recursive_mutex internal_mutex_;
280
281 std::map<int, ReadCallback> watched_shared_fds_;
282
283 // A pair of FD to send information to the reading thread
284 int notification_listen_fd_{};
285 int notification_write_fd_{};
286 };
287
288 // Async task manager implementation
289 class AsyncManager::AsyncTaskManager {
290 public:
GetNextUserId()291 AsyncUserId GetNextUserId() { return lastUserId_++; }
292
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)293 AsyncTaskId ExecAsync(AsyncUserId user_id, std::chrono::milliseconds delay,
294 const TaskCallback& callback) {
295 return scheduleTask(std::make_shared<Task>(
296 std::chrono::steady_clock::now() + delay, callback, user_id));
297 }
298
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)299 AsyncTaskId ExecAsyncPeriodically(AsyncUserId user_id,
300 std::chrono::milliseconds delay,
301 std::chrono::milliseconds period,
302 const TaskCallback& callback) {
303 return scheduleTask(std::make_shared<Task>(
304 std::chrono::steady_clock::now() + delay, period, callback, user_id));
305 }
306
CancelAsyncTask(AsyncTaskId async_task_id)307 bool CancelAsyncTask(AsyncTaskId async_task_id) {
308 // remove task from queue (and task id association) while holding lock
309 std::unique_lock<std::mutex> guard(internal_mutex_);
310 return cancel_task_with_lock_held(async_task_id);
311 }
312
CancelAsyncTasksFromUser(AsyncUserId user_id)313 bool CancelAsyncTasksFromUser(AsyncUserId user_id) {
314 // remove task from queue (and task id association) while holding lock
315 std::unique_lock<std::mutex> guard(internal_mutex_);
316 if (tasks_by_user_id_.count(user_id) == 0) {
317 return false;
318 }
319 for (auto task : tasks_by_user_id_[user_id]) {
320 cancel_task_with_lock_held(task);
321 }
322 tasks_by_user_id_.erase(user_id);
323 return true;
324 }
325
Synchronize(const CriticalCallback & critical)326 void Synchronize(const CriticalCallback& critical) {
327 std::unique_lock<std::mutex> guard(synchronization_mutex_);
328 critical();
329 }
330
331 AsyncTaskManager() = default;
332 AsyncTaskManager(const AsyncTaskManager&) = delete;
333 AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
334
335 ~AsyncTaskManager() = default;
336
stopThread()337 int stopThread() {
338 {
339 std::unique_lock<std::mutex> guard(internal_mutex_);
340 tasks_by_id_.clear();
341 task_queue_.clear();
342 if (!running_) {
343 return 0;
344 }
345 running_ = false;
346 // notify the thread
347 internal_cond_var_.notify_one();
348 } // release the lock before joining a thread that is likely waiting for it
349 if (std::this_thread::get_id() != thread_.get_id()) {
350 thread_.join();
351 } else {
352 WARNING("{}: Starting thread stop from inside the task thread itself",
353 __func__);
354 }
355 return 0;
356 }
357
358 private:
359 // Holds the data for each task
360 class Task {
361 public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback,AsyncUserId user)362 Task(std::chrono::steady_clock::time_point time,
363 std::chrono::milliseconds period, const TaskCallback& callback,
364 AsyncUserId user)
365 : time(time),
366 periodic(true),
367 period(period),
368 callback(callback),
369 task_id(kInvalidTaskId),
370 user_id(user) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback,AsyncUserId user)371 Task(std::chrono::steady_clock::time_point time,
372 const TaskCallback& callback, AsyncUserId user)
373 : time(time),
374 periodic(false),
375 callback(callback),
376 task_id(kInvalidTaskId),
377 user_id(user) {}
378
379 // Operators needed to be in a collection
operator <(const Task & another) const380 bool operator<(const Task& another) const {
381 return std::make_pair(time, task_id) <
382 std::make_pair(another.time, another.task_id);
383 }
384
isPeriodic() const385 bool isPeriodic() const { return periodic; }
386
387 // These fields should no longer be public if the class ever becomes
388 // public or gets more complex
389 std::chrono::steady_clock::time_point time;
390 bool periodic;
391 std::chrono::milliseconds period{};
392 std::mutex in_callback; // Taken when the callback is active
393 TaskCallback callback;
394 AsyncTaskId task_id;
395 AsyncUserId user_id;
396 };
397
398 // A comparator class to put shared pointers to tasks in an ordered set
399 struct task_p_comparator {
operator ()rootcanal::AsyncManager::AsyncTaskManager::task_p_comparator400 bool operator()(const std::shared_ptr<Task>& t1,
401 const std::shared_ptr<Task>& t2) const {
402 return *t1 < *t2;
403 }
404 };
405
cancel_task_with_lock_held(AsyncTaskId async_task_id)406 bool cancel_task_with_lock_held(AsyncTaskId async_task_id) {
407 if (tasks_by_id_.count(async_task_id) == 0) {
408 return false;
409 }
410
411 // Now make sure we are not running this task.
412 // 2 cases:
413 // - This is called from thread_, this means a running
414 // scheduled task is actually unregistering. All bets are off.
415 // - Another thread is calling us, let's make sure the task is not active.
416 if (thread_.get_id() != std::this_thread::get_id()) {
417 auto task = tasks_by_id_[async_task_id];
418 const std::lock_guard<std::mutex> lock(task->in_callback);
419 task_queue_.erase(task);
420 tasks_by_id_.erase(async_task_id);
421 } else {
422 task_queue_.erase(tasks_by_id_[async_task_id]);
423 tasks_by_id_.erase(async_task_id);
424 }
425
426 return true;
427 }
428
scheduleTask(const std::shared_ptr<Task> & task)429 AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
430 {
431 std::unique_lock<std::mutex> guard(internal_mutex_);
432 // no more room for new tasks, we need a larger type for IDs
433 if (tasks_by_id_.size() == kMaxTaskId) { // TODO potentially type unsafe
434 return kInvalidTaskId;
435 }
436 do {
437 lastTaskId_ = NextAsyncTaskId(lastTaskId_);
438 } while (isTaskIdInUse(lastTaskId_));
439 task->task_id = lastTaskId_;
440 // add task to the queue and map
441 tasks_by_id_[lastTaskId_] = task;
442 tasks_by_user_id_[task->user_id].insert(task->task_id);
443 task_queue_.insert(task);
444 }
445 // start thread if necessary
446 int started = tryStartThread();
447 if (started != 0) {
448 ERROR("{}: Unable to start thread", __func__);
449 return kInvalidTaskId;
450 }
451 // notify the thread so that it knows of the new task
452 internal_cond_var_.notify_one();
453 // return task id
454 return task->task_id;
455 }
456
isTaskIdInUse(const AsyncTaskId & task_id) const457 bool isTaskIdInUse(const AsyncTaskId& task_id) const {
458 return tasks_by_id_.count(task_id) != 0;
459 }
460
tryStartThread()461 int tryStartThread() {
462 // need the lock because of the running flag and the cond var
463 std::unique_lock<std::mutex> guard(internal_mutex_);
464 // check that the thread is not yet running
465 if (running_) {
466 return 0;
467 }
468 // start the thread
469 running_ = true;
470 thread_ = std::thread([this]() { ThreadRoutine(); });
471 if (!thread_.joinable()) {
472 ERROR("{}: Unable to start task thread", __func__);
473 return -1;
474 }
475 return 0;
476 }
477
ThreadRoutine()478 void ThreadRoutine() {
479 while (running_) {
480 TaskCallback callback;
481 std::shared_ptr<Task> task_p;
482 bool run_it = false;
483 {
484 std::unique_lock<std::mutex> guard(internal_mutex_);
485 if (!task_queue_.empty()) {
486 task_p = *(task_queue_.begin());
487 if (task_p->time < std::chrono::steady_clock::now()) {
488 run_it = true;
489 callback = task_p->callback;
490 task_queue_.erase(task_p); // need to remove and add again if
491 // periodic to update order
492 if (task_p->isPeriodic()) {
493 task_p->time += task_p->period;
494 task_queue_.insert(task_p);
495 } else {
496 tasks_by_user_id_[task_p->user_id].erase(task_p->task_id);
497 tasks_by_id_.erase(task_p->task_id);
498 }
499 }
500 }
501 }
502 if (run_it) {
503 const std::lock_guard<std::mutex> lock(task_p->in_callback);
504 Synchronize(callback);
505 }
506 {
507 std::unique_lock<std::mutex> guard(internal_mutex_);
508 // check for termination right before waiting
509 if (!running_) {
510 break;
511 }
512 // wait until time for the next task (if any)
513 if (!task_queue_.empty()) {
514 // Make a copy of the time_point because wait_until takes a reference
515 // to it and may read it after waiting, by which time the task may
516 // have been freed (e.g. via CancelAsyncTask).
517 std::chrono::steady_clock::time_point time =
518 (*task_queue_.begin())->time;
519 internal_cond_var_.wait_until(guard, time);
520 } else {
521 internal_cond_var_.wait(guard);
522 }
523 }
524 }
525 }
526
527 bool running_ = false;
528 std::thread thread_;
529 std::mutex internal_mutex_;
530 std::mutex synchronization_mutex_;
531 std::condition_variable internal_cond_var_;
532
533 AsyncTaskId lastTaskId_ = kInvalidTaskId;
534 AsyncUserId lastUserId_{1};
535 std::map<AsyncTaskId, std::shared_ptr<Task>> tasks_by_id_;
536 std::map<AsyncUserId, std::set<AsyncTaskId>> tasks_by_user_id_;
537 std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
538 };
539
540 // Async Manager Implementation:
AsyncManager()541 AsyncManager::AsyncManager()
542 : fdWatcher_p_(new AsyncFdWatcher()),
543 taskManager_p_(new AsyncTaskManager()) {}
544
~AsyncManager()545 AsyncManager::~AsyncManager() {
546 // Make sure the threads are stopped before destroying the object.
547 // The threads need to be stopped here and not in each internal class'
548 // destructor because unique_ptr's reset() first assigns nullptr to the
549 // pointer and only then calls the destructor, so any callback running
550 // on these threads would dereference a null pointer if they called a member
551 // function of this class.
552 fdWatcher_p_->stopThread();
553 taskManager_p_->stopThread();
554 }
555
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)556 int AsyncManager::WatchFdForNonBlockingReads(
557 int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
558 return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
559 on_read_fd_ready_callback);
560 }
561
StopWatchingFileDescriptor(int file_descriptor)562 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
563 fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
564 }
565
GetNextUserId()566 AsyncUserId AsyncManager::GetNextUserId() {
567 return taskManager_p_->GetNextUserId();
568 }
569
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)570 AsyncTaskId AsyncManager::ExecAsync(AsyncUserId user_id,
571 std::chrono::milliseconds delay,
572 const TaskCallback& callback) {
573 return taskManager_p_->ExecAsync(user_id, delay, callback);
574 }
575
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)576 AsyncTaskId AsyncManager::ExecAsyncPeriodically(
577 AsyncUserId user_id, std::chrono::milliseconds delay,
578 std::chrono::milliseconds period, const TaskCallback& callback) {
579 return taskManager_p_->ExecAsyncPeriodically(user_id, delay, period,
580 callback);
581 }
582
CancelAsyncTask(AsyncTaskId async_task_id)583 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
584 return taskManager_p_->CancelAsyncTask(async_task_id);
585 }
586
CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id)587 bool AsyncManager::CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id) {
588 return taskManager_p_->CancelAsyncTasksFromUser(user_id);
589 }
590
Synchronize(const CriticalCallback & critical)591 void AsyncManager::Synchronize(const CriticalCallback& critical) {
592 taskManager_p_->Synchronize(critical);
593 }
594 } // namespace rootcanal
595