1 /*
2  * Copyright 2016 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "async_manager.h"
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <condition_variable>
22 #include <fcntl.h>
23 #include <mutex>
24 #include <sys/select.h>
25 #include <thread>
26 #include <unistd.h>
27 #include <vector>
28 
29 #include "log.h"
30 
#ifndef TEMP_FAILURE_RETRY
/*
 * Fallback definition for platforms whose libc does not provide the macro.
 * Evaluates `exp` repeatedly for as long as it yields -1 with errno set to
 * EINTR, and yields the value of the last evaluation. Note that `exp` is
 * re-evaluated on every retry, and that the macro relies on the GNU
 * statement-expression and __typeof__ extensions.
 */
#define TEMP_FAILURE_RETRY(exp) ({         \
    __typeof__(exp) _rc;                   \
    do {                                   \
        _rc = (exp);                       \
    } while (_rc == -1 && errno == EINTR); \
    _rc; })
#endif  // TEMP_FAILURE_RETRY
40 
41 namespace rootcanal {
// Implementation of AsyncManager is divided between two classes, three if
// AsyncManager itself is taken into account, but its only responsibility
// besides being a proxy for the other two classes is to provide a global
// synchronization mechanism for callbacks and client code to use.
46 
// The watching of file descriptors is done through AsyncFdWatcher. Several
// objects of this class may coexist simultaneously as they share no state.
// After construction of these objects nothing happens beyond some very simple
// member initialization. When the first FD is set up for watching the object
// starts a new thread which watches the given (and later provided) FDs using
// select() inside a loop. A special FD (a pipe) is also watched which is
// used to notify the thread of internal changes on the object state (like
// the addition of new FDs to watch on). Every access to internal state is
// synchronized using a single internal mutex. The thread is only stopped on
// destruction of the object, by modifying a flag, which is the only member
// variable accessed without acquiring the lock (because the notification to
// the thread is done later by writing to a pipe which means the thread will
// be notified regardless of what phase of the loop it is in at that moment).
60 
// The scheduling of asynchronous tasks, periodic or not, is handled by the
// AsyncTaskManager class. Like the one for FDs, this class shares no internal
// state between different instances so it is safe to use several objects of
// this class. Also, nothing interesting happens upon construction, but only
// after a Task has been scheduled, and access to internal state is
// synchronized using a single internal mutex. When the first task is
// scheduled a thread is started which monitors a queue of tasks. The queue is
// peeked to see when the next task should be carried out and then the thread
// performs a (absolute) timed wait on a condition variable. The wait ends
// because of a time out or a notify on the cond var; the former means a task
// is due for execution while the latter means there has been a change in
// internal state, like a task has been scheduled/canceled or the flag to stop
// has been set. Setting and querying the stop flag or modifying the task queue
// and subsequent notification on the cond var is done atomically (i.e. while
// holding the lock on the internal mutex) to ensure that the thread never
// misses the notification, since notifying a cond var is not persistent as
// writing on a pipe (if not done this way, the thread could query the
// stopping flag and be put aside by the OS scheduler right after, then the
// 'stop thread' procedure could run, setting the flag, notifying a cond
// var that no one is waiting on and joining the thread, the thread then
// resumes execution believing that it needs to continue and waits on the
// cond var possibly forever if there are no tasks scheduled, effectively
// causing a deadlock).
84 
// Largest task id that can be handed out: 2^16 - 1. Permissible ids are
// {1..2^16-1}, so this value is also the maximum number of tasks that can be
// scheduled at any given time.
static constexpr uint16_t kMaxTaskId = 0xFFFF;
// Returns the id that follows |id|. Ids wrap around from kMaxTaskId back to 1,
// so 0 is never produced by this function.
static inline AsyncTaskId NextAsyncTaskId(const AsyncTaskId id) {
  if (id != kMaxTaskId) {
    return id + 1;
  }
  return 1;
}
// Size of the scratch buffer used to drain the notification pipe.
//
// A notification is a single byte, so normally only one byte is pending. The
// thread may occasionally be notified more than once before it wakes up, which
// is why the buffer is a little larger and why the pipe is drained in a loop
// that keeps reading for as long as a read fills the whole buffer. For the
// thread routine one notification is equivalent to a hundred, so the bytes are
// simply discarded. If a read is cut short and leaves data behind, the next
// iteration of the thread loop returns from select() almost immediately and
// drains the rest, so that case needs no special treatment.
static constexpr int kNotificationBufferSize{10};
103 
104 // Async File Descriptor Watcher Implementation:
105 class AsyncManager::AsyncFdWatcher {
106  public:
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)107   int WatchFdForNonBlockingReads(
108       int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
109     // add file descriptor and callback
110     {
111       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
112       watched_shared_fds_[file_descriptor] = on_read_fd_ready_callback;
113     }
114 
115     // start the thread if not started yet
116     int started = tryStartThread();
117     if (started != 0) {
118       ERROR("{}: Unable to start thread", __func__);
119       return started;
120     }
121 
122     // notify the thread so that it knows of the new FD
123     notifyThread();
124 
125     return 0;
126   }
127 
StopWatchingFileDescriptor(int file_descriptor)128   void StopWatchingFileDescriptor(int file_descriptor) {
129     std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
130     watched_shared_fds_.erase(file_descriptor);
131   }
132 
133   AsyncFdWatcher() = default;
134   AsyncFdWatcher(const AsyncFdWatcher&) = delete;
135   AsyncFdWatcher& operator=(const AsyncFdWatcher&) = delete;
136 
137   ~AsyncFdWatcher() = default;
138 
stopThread()139   int stopThread() {
140     if (!std::atomic_exchange(&running_, false)) {
141       return 0;  // if not running already
142     }
143 
144     notifyThread();
145 
146     if (std::this_thread::get_id() != thread_.get_id()) {
147       thread_.join();
148     } else {
149       WARNING("{}: Starting thread stop from inside the reading thread itself",
150               __func__);
151     }
152 
153     {
154       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
155       watched_shared_fds_.clear();
156     }
157 
158     return 0;
159   }
160 
161  private:
162   // Make sure to call this with at least one file descriptor ready to be
163   // watched upon or the thread routine will return immediately
tryStartThread()164   int tryStartThread() {
165     if (std::atomic_exchange(&running_, true)) {
166       return 0;  // if already running
167     }
168     // set up the communication channel
169     int pipe_fds[2];
170     if (pipe(pipe_fds)) {
171       ERROR(
172           "{}: Unable to establish a communication channel to the reading "
173           "thread",
174           __func__);
175       return -1;
176     }
177     // configure the fds as non blocking.
178     if (fcntl(pipe_fds[0], F_SETFL, O_NONBLOCK) ||
179         fcntl(pipe_fds[1], F_SETFL, O_NONBLOCK)) {
180       ERROR(
181           "{}: Unable to configure the communication channel to the reading "
182           "thread",
183           __func__);
184       return -1;
185     }
186 
187     notification_listen_fd_ = pipe_fds[0];
188     notification_write_fd_ = pipe_fds[1];
189 
190     thread_ = std::thread([this]() { ThreadRoutine(); });
191     if (!thread_.joinable()) {
192       ERROR("{}: Unable to start reading thread", __func__);
193       return -1;
194     }
195     return 0;
196   }
197 
notifyThread() const198   int notifyThread() const {
199     char buffer = '0';
200     if (TEMP_FAILURE_RETRY(write(notification_write_fd_, &buffer, 1)) < 0) {
201       ERROR("{}: Unable to send message to reading thread", __func__);
202       return -1;
203     }
204     return 0;
205   }
206 
setUpFileDescriptorSet(fd_set & read_fds)207   int setUpFileDescriptorSet(fd_set& read_fds) {
208     // add comm channel to the set
209     FD_SET(notification_listen_fd_, &read_fds);
210     int nfds = notification_listen_fd_;
211 
212     // add watched FDs to the set
213     {
214       std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
215       for (auto& fdp : watched_shared_fds_) {
216         FD_SET(fdp.first, &read_fds);
217         nfds = std::max(fdp.first, nfds);
218       }
219     }
220     return nfds;
221   }
222 
223   // check the comm channel and read everything there
consumeThreadNotifications(fd_set & read_fds) const224   bool consumeThreadNotifications(fd_set& read_fds) const {
225     if (FD_ISSET(notification_listen_fd_, &read_fds)) {
226       char buffer[kNotificationBufferSize];
227       while (TEMP_FAILURE_RETRY(read(notification_listen_fd_, buffer,
228                                      kNotificationBufferSize)) ==
229              kNotificationBufferSize) {
230       }
231       return true;
232     }
233     return false;
234   }
235 
236   // check all file descriptors and call callbacks if necesary
runAppropriateCallbacks(fd_set & read_fds)237   void runAppropriateCallbacks(fd_set& read_fds) {
238     std::vector<decltype(watched_shared_fds_)::value_type> fds;
239     std::unique_lock<std::recursive_mutex> guard(internal_mutex_);
240     for (auto& fdc : watched_shared_fds_) {
241       if (FD_ISSET(fdc.first, &read_fds)) {
242         fds.push_back(fdc);
243       }
244     }
245     for (auto& p : fds) {
246       p.second(p.first);
247     }
248   }
249 
ThreadRoutine()250   void ThreadRoutine() {
251     while (running_) {
252       fd_set read_fds;
253       FD_ZERO(&read_fds);
254       int nfds = setUpFileDescriptorSet(read_fds);
255 
256       // wait until there is data available to read on some FD
257       int retval = select(nfds + 1, &read_fds, NULL, NULL, NULL);
258       if (retval <= 0) {  // there was some error or a timeout
259         ERROR(
260             "{}: There was an error while waiting for data on the file "
261             "descriptors: {}",
262             __func__, strerror(errno));
263         continue;
264       }
265 
266       consumeThreadNotifications(read_fds);
267 
268       // Do not read if there was a call to stop running
269       if (!running_) {
270         break;
271       }
272 
273       runAppropriateCallbacks(read_fds);
274     }
275   }
276 
277   std::atomic_bool running_{false};
278   std::thread thread_;
279   std::recursive_mutex internal_mutex_;
280 
281   std::map<int, ReadCallback> watched_shared_fds_;
282 
283   // A pair of FD to send information to the reading thread
284   int notification_listen_fd_{};
285   int notification_write_fd_{};
286 };
287 
288 // Async task manager implementation
289 class AsyncManager::AsyncTaskManager {
290  public:
GetNextUserId()291   AsyncUserId GetNextUserId() { return lastUserId_++; }
292 
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)293   AsyncTaskId ExecAsync(AsyncUserId user_id, std::chrono::milliseconds delay,
294                         const TaskCallback& callback) {
295     return scheduleTask(std::make_shared<Task>(
296         std::chrono::steady_clock::now() + delay, callback, user_id));
297   }
298 
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)299   AsyncTaskId ExecAsyncPeriodically(AsyncUserId user_id,
300                                     std::chrono::milliseconds delay,
301                                     std::chrono::milliseconds period,
302                                     const TaskCallback& callback) {
303     return scheduleTask(std::make_shared<Task>(
304         std::chrono::steady_clock::now() + delay, period, callback, user_id));
305   }
306 
CancelAsyncTask(AsyncTaskId async_task_id)307   bool CancelAsyncTask(AsyncTaskId async_task_id) {
308     // remove task from queue (and task id association) while holding lock
309     std::unique_lock<std::mutex> guard(internal_mutex_);
310     return cancel_task_with_lock_held(async_task_id);
311   }
312 
CancelAsyncTasksFromUser(AsyncUserId user_id)313   bool CancelAsyncTasksFromUser(AsyncUserId user_id) {
314     // remove task from queue (and task id association) while holding lock
315     std::unique_lock<std::mutex> guard(internal_mutex_);
316     if (tasks_by_user_id_.count(user_id) == 0) {
317       return false;
318     }
319     for (auto task : tasks_by_user_id_[user_id]) {
320       cancel_task_with_lock_held(task);
321     }
322     tasks_by_user_id_.erase(user_id);
323     return true;
324   }
325 
Synchronize(const CriticalCallback & critical)326   void Synchronize(const CriticalCallback& critical) {
327     std::unique_lock<std::mutex> guard(synchronization_mutex_);
328     critical();
329   }
330 
331   AsyncTaskManager() = default;
332   AsyncTaskManager(const AsyncTaskManager&) = delete;
333   AsyncTaskManager& operator=(const AsyncTaskManager&) = delete;
334 
335   ~AsyncTaskManager() = default;
336 
stopThread()337   int stopThread() {
338     {
339       std::unique_lock<std::mutex> guard(internal_mutex_);
340       tasks_by_id_.clear();
341       task_queue_.clear();
342       if (!running_) {
343         return 0;
344       }
345       running_ = false;
346       // notify the thread
347       internal_cond_var_.notify_one();
348     }  // release the lock before joining a thread that is likely waiting for it
349     if (std::this_thread::get_id() != thread_.get_id()) {
350       thread_.join();
351     } else {
352       WARNING("{}: Starting thread stop from inside the task thread itself",
353               __func__);
354     }
355     return 0;
356   }
357 
358  private:
359   // Holds the data for each task
360   class Task {
361    public:
Task(std::chrono::steady_clock::time_point time,std::chrono::milliseconds period,const TaskCallback & callback,AsyncUserId user)362     Task(std::chrono::steady_clock::time_point time,
363          std::chrono::milliseconds period, const TaskCallback& callback,
364          AsyncUserId user)
365         : time(time),
366           periodic(true),
367           period(period),
368           callback(callback),
369           task_id(kInvalidTaskId),
370           user_id(user) {}
Task(std::chrono::steady_clock::time_point time,const TaskCallback & callback,AsyncUserId user)371     Task(std::chrono::steady_clock::time_point time,
372          const TaskCallback& callback, AsyncUserId user)
373         : time(time),
374           periodic(false),
375           callback(callback),
376           task_id(kInvalidTaskId),
377           user_id(user) {}
378 
379     // Operators needed to be in a collection
operator <(const Task & another) const380     bool operator<(const Task& another) const {
381       return std::make_pair(time, task_id) <
382              std::make_pair(another.time, another.task_id);
383     }
384 
isPeriodic() const385     bool isPeriodic() const { return periodic; }
386 
387     // These fields should no longer be public if the class ever becomes
388     // public or gets more complex
389     std::chrono::steady_clock::time_point time;
390     bool periodic;
391     std::chrono::milliseconds period{};
392     std::mutex in_callback; // Taken when the callback is active
393     TaskCallback callback;
394     AsyncTaskId task_id;
395     AsyncUserId user_id;
396   };
397 
398   // A comparator class to put shared pointers to tasks in an ordered set
399   struct task_p_comparator {
operator ()rootcanal::AsyncManager::AsyncTaskManager::task_p_comparator400     bool operator()(const std::shared_ptr<Task>& t1,
401                     const std::shared_ptr<Task>& t2) const {
402       return *t1 < *t2;
403     }
404   };
405 
cancel_task_with_lock_held(AsyncTaskId async_task_id)406   bool cancel_task_with_lock_held(AsyncTaskId async_task_id) {
407     if (tasks_by_id_.count(async_task_id) == 0) {
408       return false;
409     }
410 
411     // Now make sure we are not running this task.
412     // 2 cases:
413     // - This is called from thread_, this means a running
414     //   scheduled task is actually unregistering. All bets are off.
415     // - Another thread is calling us, let's make sure the task is not active.
416     if (thread_.get_id() != std::this_thread::get_id()) {
417       auto task = tasks_by_id_[async_task_id];
418       const std::lock_guard<std::mutex> lock(task->in_callback);
419       task_queue_.erase(task);
420       tasks_by_id_.erase(async_task_id);
421     } else {
422       task_queue_.erase(tasks_by_id_[async_task_id]);
423       tasks_by_id_.erase(async_task_id);
424     }
425 
426     return true;
427   }
428 
scheduleTask(const std::shared_ptr<Task> & task)429   AsyncTaskId scheduleTask(const std::shared_ptr<Task>& task) {
430     {
431       std::unique_lock<std::mutex> guard(internal_mutex_);
432       // no more room for new tasks, we need a larger type for IDs
433       if (tasks_by_id_.size() == kMaxTaskId) {  // TODO potentially type unsafe
434         return kInvalidTaskId;
435       }
436       do {
437         lastTaskId_ = NextAsyncTaskId(lastTaskId_);
438       } while (isTaskIdInUse(lastTaskId_));
439       task->task_id = lastTaskId_;
440       // add task to the queue and map
441       tasks_by_id_[lastTaskId_] = task;
442       tasks_by_user_id_[task->user_id].insert(task->task_id);
443       task_queue_.insert(task);
444     }
445     // start thread if necessary
446     int started = tryStartThread();
447     if (started != 0) {
448       ERROR("{}: Unable to start thread", __func__);
449       return kInvalidTaskId;
450     }
451     // notify the thread so that it knows of the new task
452     internal_cond_var_.notify_one();
453     // return task id
454     return task->task_id;
455   }
456 
isTaskIdInUse(const AsyncTaskId & task_id) const457   bool isTaskIdInUse(const AsyncTaskId& task_id) const {
458     return tasks_by_id_.count(task_id) != 0;
459   }
460 
tryStartThread()461   int tryStartThread() {
462     // need the lock because of the running flag and the cond var
463     std::unique_lock<std::mutex> guard(internal_mutex_);
464     // check that the thread is not yet running
465     if (running_) {
466       return 0;
467     }
468     // start the thread
469     running_ = true;
470     thread_ = std::thread([this]() { ThreadRoutine(); });
471     if (!thread_.joinable()) {
472       ERROR("{}: Unable to start task thread", __func__);
473       return -1;
474     }
475     return 0;
476   }
477 
ThreadRoutine()478   void ThreadRoutine() {
479     while (running_) {
480       TaskCallback callback;
481       std::shared_ptr<Task> task_p;
482       bool run_it = false;
483       {
484         std::unique_lock<std::mutex> guard(internal_mutex_);
485         if (!task_queue_.empty()) {
486           task_p = *(task_queue_.begin());
487           if (task_p->time < std::chrono::steady_clock::now()) {
488             run_it = true;
489             callback = task_p->callback;
490             task_queue_.erase(task_p);  // need to remove and add again if
491                                         // periodic to update order
492             if (task_p->isPeriodic()) {
493               task_p->time += task_p->period;
494               task_queue_.insert(task_p);
495             } else {
496               tasks_by_user_id_[task_p->user_id].erase(task_p->task_id);
497               tasks_by_id_.erase(task_p->task_id);
498             }
499           }
500         }
501       }
502       if (run_it) {
503         const std::lock_guard<std::mutex> lock(task_p->in_callback);
504         Synchronize(callback);
505       }
506       {
507         std::unique_lock<std::mutex> guard(internal_mutex_);
508         // check for termination right before waiting
509         if (!running_) {
510           break;
511         }
512         // wait until time for the next task (if any)
513         if (!task_queue_.empty()) {
514           // Make a copy of the time_point because wait_until takes a reference
515           // to it and may read it after waiting, by which time the task may
516           // have been freed (e.g. via CancelAsyncTask).
517           std::chrono::steady_clock::time_point time =
518               (*task_queue_.begin())->time;
519           internal_cond_var_.wait_until(guard, time);
520         } else {
521           internal_cond_var_.wait(guard);
522         }
523       }
524     }
525   }
526 
527   bool running_ = false;
528   std::thread thread_;
529   std::mutex internal_mutex_;
530   std::mutex synchronization_mutex_;
531   std::condition_variable internal_cond_var_;
532 
533   AsyncTaskId lastTaskId_ = kInvalidTaskId;
534   AsyncUserId lastUserId_{1};
535   std::map<AsyncTaskId, std::shared_ptr<Task>> tasks_by_id_;
536   std::map<AsyncUserId, std::set<AsyncTaskId>> tasks_by_user_id_;
537   std::set<std::shared_ptr<Task>, task_p_comparator> task_queue_;
538 };
539 
540 // Async Manager Implementation:
AsyncManager()541 AsyncManager::AsyncManager()
542     : fdWatcher_p_(new AsyncFdWatcher()),
543       taskManager_p_(new AsyncTaskManager()) {}
544 
~AsyncManager()545 AsyncManager::~AsyncManager() {
546   // Make sure the threads are stopped before destroying the object.
547   // The threads need to be stopped here and not in each internal class'
548   // destructor because unique_ptr's reset() first assigns nullptr to the
549   // pointer and only then calls the destructor, so any callback running
550   // on these threads would dereference a null pointer if they called a member
551   // function of this class.
552   fdWatcher_p_->stopThread();
553   taskManager_p_->stopThread();
554 }
555 
WatchFdForNonBlockingReads(int file_descriptor,const ReadCallback & on_read_fd_ready_callback)556 int AsyncManager::WatchFdForNonBlockingReads(
557     int file_descriptor, const ReadCallback& on_read_fd_ready_callback) {
558   return fdWatcher_p_->WatchFdForNonBlockingReads(file_descriptor,
559                                                   on_read_fd_ready_callback);
560 }
561 
StopWatchingFileDescriptor(int file_descriptor)562 void AsyncManager::StopWatchingFileDescriptor(int file_descriptor) {
563   fdWatcher_p_->StopWatchingFileDescriptor(file_descriptor);
564 }
565 
GetNextUserId()566 AsyncUserId AsyncManager::GetNextUserId() {
567   return taskManager_p_->GetNextUserId();
568 }
569 
ExecAsync(AsyncUserId user_id,std::chrono::milliseconds delay,const TaskCallback & callback)570 AsyncTaskId AsyncManager::ExecAsync(AsyncUserId user_id,
571                                     std::chrono::milliseconds delay,
572                                     const TaskCallback& callback) {
573   return taskManager_p_->ExecAsync(user_id, delay, callback);
574 }
575 
ExecAsyncPeriodically(AsyncUserId user_id,std::chrono::milliseconds delay,std::chrono::milliseconds period,const TaskCallback & callback)576 AsyncTaskId AsyncManager::ExecAsyncPeriodically(
577     AsyncUserId user_id, std::chrono::milliseconds delay,
578     std::chrono::milliseconds period, const TaskCallback& callback) {
579   return taskManager_p_->ExecAsyncPeriodically(user_id, delay, period,
580                                                callback);
581 }
582 
CancelAsyncTask(AsyncTaskId async_task_id)583 bool AsyncManager::CancelAsyncTask(AsyncTaskId async_task_id) {
584   return taskManager_p_->CancelAsyncTask(async_task_id);
585 }
586 
CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id)587 bool AsyncManager::CancelAsyncTasksFromUser(rootcanal::AsyncUserId user_id) {
588   return taskManager_p_->CancelAsyncTasksFromUser(user_id);
589 }
590 
Synchronize(const CriticalCallback & critical)591 void AsyncManager::Synchronize(const CriticalCallback& critical) {
592   taskManager_p_->Synchronize(critical);
593 }
594 }  // namespace rootcanal
595