1 /*
2  *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "rtc_base/task_queue_libevent.h"
12 
13 #include <errno.h>
14 #include <fcntl.h>
15 #include <pthread.h>
16 #include <signal.h>
17 #include <stdint.h>
18 #include <time.h>
19 #include <unistd.h>
20 
21 #include <list>
22 #include <memory>
23 #include <type_traits>
24 #include <utility>
25 
26 #include "absl/container/inlined_vector.h"
27 #include "absl/strings/string_view.h"
28 #include "api/task_queue/queued_task.h"
29 #include "api/task_queue/task_queue_base.h"
30 #include "base/third_party/libevent/event.h"
31 #include "rtc_base/checks.h"
32 #include "rtc_base/logging.h"
33 #include "rtc_base/numerics/safe_conversions.h"
34 #include "rtc_base/platform_thread.h"
35 #include "rtc_base/platform_thread_types.h"
36 #include "rtc_base/synchronization/mutex.h"
37 #include "rtc_base/thread_annotations.h"
38 #include "rtc_base/time_utils.h"
39 
40 namespace webrtc {
41 namespace {
42 constexpr char kQuit = 1;
43 constexpr char kRunTasks = 2;
44 
45 using Priority = TaskQueueFactory::Priority;
46 
// Blocks delivery of SIGPIPE to the calling thread by adding it to the
// thread's signal mask.
// SIGPIPE can be raised when write() is called on a pipe that is being closed,
// or when a pipe that is being written to gets closed. We can end up in that
// situation, and since the default action for SIGPIPE is to terminate the
// process, the signal is masked so that execution continues as normal.
// The previous signal mask is intentionally never restored: the act of
// restoring it can itself cause SIGPIPE to be signaled (e.g. on MacOS).
// A process-wide alternative to this approach would be:
//   signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  sigset_t blocked_signals;
  sigemptyset(&blocked_signals);
  sigaddset(&blocked_signals, SIGPIPE);
  // The old mask is not requested since it is never restored (see above).
  pthread_sigmask(SIG_BLOCK, &blocked_signals, /*oldset=*/nullptr);
}
66 
SetNonBlocking(int fd)67 bool SetNonBlocking(int fd) {
68   const int flags = fcntl(fd, F_GETFL);
69   RTC_CHECK(flags != -1);
70   return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
71 }
72 
73 // TODO(tommi): This is a hack to support two versions of libevent that we're
74 // compatible with.  The method we really want to call is event_assign(),
75 // since event_set() has been marked as deprecated (and doesn't accept
76 // passing event_base__ as a parameter).  However, the version of libevent
77 // that we have in Chromium, doesn't have event_assign(), so we need to call
78 // event_set() there.
EventAssign(struct event * ev,struct event_base * base,int fd,short events,void (* callback)(int,short,void *),void * arg)79 void EventAssign(struct event* ev,
80                  struct event_base* base,
81                  int fd,
82                  short events,
83                  void (*callback)(int, short, void*),
84                  void* arg) {
85 #if defined(_EVENT2_EVENT_H_)
86   RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
87 #else
88   event_set(ev, fd, events, callback, arg);
89   RTC_CHECK_EQ(0, event_base_set(base, ev));
90 #endif
91 }
92 
TaskQueuePriorityToThreadPriority(Priority priority)93 rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
94   switch (priority) {
95     case Priority::HIGH:
96       return rtc::kRealtimePriority;
97     case Priority::LOW:
98       return rtc::kLowPriority;
99     case Priority::NORMAL:
100       return rtc::kNormalPriority;
101     default:
102       RTC_NOTREACHED();
103       break;
104   }
105   return rtc::kNormalPriority;
106 }
107 
108 class TaskQueueLibevent final : public TaskQueueBase {
109  public:
110   TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
111 
112   void Delete() override;
113   void PostTask(std::unique_ptr<QueuedTask> task) override;
114   void PostDelayedTask(std::unique_ptr<QueuedTask> task,
115                        uint32_t milliseconds) override;
116 
117  private:
118   class SetTimerTask;
119   struct TimerEvent;
120 
121   ~TaskQueueLibevent() override = default;
122 
123   static void ThreadMain(void* context);
124   static void OnWakeup(int socket, short flags, void* context);  // NOLINT
125   static void RunTimer(int fd, short flags, void* context);      // NOLINT
126 
127   bool is_active_ = true;
128   int wakeup_pipe_in_ = -1;
129   int wakeup_pipe_out_ = -1;
130   event_base* event_base_;
131   event wakeup_event_;
132   rtc::PlatformThread thread_;
133   Mutex pending_lock_;
134   absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_
135       RTC_GUARDED_BY(pending_lock_);
136   // Holds a list of events pending timers for cleanup when the loop exits.
137   std::list<TimerEvent*> pending_timers_;
138 };
139 
140 struct TaskQueueLibevent::TimerEvent {
TimerEventwebrtc::__anond79c7f5c0111::TaskQueueLibevent::TimerEvent141   TimerEvent(TaskQueueLibevent* task_queue, std::unique_ptr<QueuedTask> task)
142       : task_queue(task_queue), task(std::move(task)) {}
~TimerEventwebrtc::__anond79c7f5c0111::TaskQueueLibevent::TimerEvent143   ~TimerEvent() { event_del(&ev); }
144 
145   event ev;
146   TaskQueueLibevent* task_queue;
147   std::unique_ptr<QueuedTask> task;
148 };
149 
150 class TaskQueueLibevent::SetTimerTask : public QueuedTask {
151  public:
SetTimerTask(std::unique_ptr<QueuedTask> task,uint32_t milliseconds)152   SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
153       : task_(std::move(task)),
154         milliseconds_(milliseconds),
155         posted_(rtc::Time32()) {}
156 
157  private:
Run()158   bool Run() override {
159     // Compensate for the time that has passed since construction
160     // and until we got here.
161     uint32_t post_time = rtc::Time32() - posted_;
162     TaskQueueLibevent::Current()->PostDelayedTask(
163         std::move(task_),
164         post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
165     return true;
166   }
167 
168   std::unique_ptr<QueuedTask> task_;
169   const uint32_t milliseconds_;
170   const uint32_t posted_;
171 };
172 
TaskQueueLibevent(absl::string_view queue_name,rtc::ThreadPriority priority)173 TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
174                                      rtc::ThreadPriority priority)
175     : event_base_(event_base_new()),
176       thread_(&TaskQueueLibevent::ThreadMain, this, queue_name, priority) {
177   int fds[2];
178   RTC_CHECK(pipe(fds) == 0);
179   SetNonBlocking(fds[0]);
180   SetNonBlocking(fds[1]);
181   wakeup_pipe_out_ = fds[0];
182   wakeup_pipe_in_ = fds[1];
183 
184   EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
185               EV_READ | EV_PERSIST, OnWakeup, this);
186   event_add(&wakeup_event_, 0);
187   thread_.Start();
188 }
189 
Delete()190 void TaskQueueLibevent::Delete() {
191   RTC_DCHECK(!IsCurrent());
192   struct timespec ts;
193   char message = kQuit;
194   while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
195     // The queue is full, so we have no choice but to wait and retry.
196     RTC_CHECK_EQ(EAGAIN, errno);
197     ts.tv_sec = 0;
198     ts.tv_nsec = 1000000;
199     nanosleep(&ts, nullptr);
200   }
201 
202   thread_.Stop();
203 
204   event_del(&wakeup_event_);
205 
206   IgnoreSigPipeSignalOnCurrentThread();
207 
208   close(wakeup_pipe_in_);
209   close(wakeup_pipe_out_);
210   wakeup_pipe_in_ = -1;
211   wakeup_pipe_out_ = -1;
212 
213   event_base_free(event_base_);
214   delete this;
215 }
216 
PostTask(std::unique_ptr<QueuedTask> task)217 void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
218   {
219     MutexLock lock(&pending_lock_);
220     bool had_pending_tasks = !pending_.empty();
221     pending_.push_back(std::move(task));
222 
223     // Only write to the pipe if there were no pending tasks before this one
224     // since the thread could be sleeping. If there were already pending tasks
225     // then we know there's either a pending write in the pipe or the thread has
226     // not yet processed the pending tasks. In either case, the thread will
227     // eventually wake up and process all pending tasks including this one.
228     if (had_pending_tasks) {
229       return;
230     }
231   }
232 
233   // Note: This behvior outlined above ensures we never fill up the pipe write
234   // buffer since there will only ever be 1 byte pending.
235   char message = kRunTasks;
236   RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
237                sizeof(message));
238 }
239 
PostDelayedTask(std::unique_ptr<QueuedTask> task,uint32_t milliseconds)240 void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
241                                         uint32_t milliseconds) {
242   if (IsCurrent()) {
243     TimerEvent* timer = new TimerEvent(this, std::move(task));
244     EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
245                 timer);
246     pending_timers_.push_back(timer);
247     timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
248                   rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
249     event_add(&timer->ev, &tv);
250   } else {
251     PostTask(std::make_unique<SetTimerTask>(std::move(task), milliseconds));
252   }
253 }
254 
255 // static
ThreadMain(void * context)256 void TaskQueueLibevent::ThreadMain(void* context) {
257   TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
258 
259   {
260     CurrentTaskQueueSetter set_current(me);
261     while (me->is_active_)
262       event_base_loop(me->event_base_, 0);
263   }
264 
265   for (TimerEvent* timer : me->pending_timers_)
266     delete timer;
267 }
268 
269 // static
OnWakeup(int socket,short flags,void * context)270 void TaskQueueLibevent::OnWakeup(int socket,
271                                  short flags,  // NOLINT
272                                  void* context) {
273   TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
274   RTC_DCHECK(me->wakeup_pipe_out_ == socket);
275   char buf;
276   RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
277   switch (buf) {
278     case kQuit:
279       me->is_active_ = false;
280       event_base_loopbreak(me->event_base_);
281       break;
282     case kRunTasks: {
283       absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;
284       {
285         MutexLock lock(&me->pending_lock_);
286         tasks.swap(me->pending_);
287       }
288       RTC_DCHECK(!tasks.empty());
289       for (auto& task : tasks) {
290         if (task->Run()) {
291           task.reset();
292         } else {
293           // |false| means the task should *not* be deleted.
294           task.release();
295         }
296       }
297       break;
298     }
299     default:
300       RTC_NOTREACHED();
301       break;
302   }
303 }
304 
305 // static
RunTimer(int fd,short flags,void * context)306 void TaskQueueLibevent::RunTimer(int fd,
307                                  short flags,  // NOLINT
308                                  void* context) {
309   TimerEvent* timer = static_cast<TimerEvent*>(context);
310   if (!timer->task->Run())
311     timer->task.release();
312   timer->task_queue->pending_timers_.remove(timer);
313   delete timer;
314 }
315 
316 class TaskQueueLibeventFactory final : public TaskQueueFactory {
317  public:
CreateTaskQueue(absl::string_view name,Priority priority) const318   std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
319       absl::string_view name,
320       Priority priority) const override {
321     return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
322         new TaskQueueLibevent(name,
323                               TaskQueuePriorityToThreadPriority(priority)));
324   }
325 };
326 
327 }  // namespace
328 
CreateTaskQueueLibeventFactory()329 std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
330   return std::make_unique<TaskQueueLibeventFactory>();
331 }
332 
333 }  // namespace webrtc
334