1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11 #include "rtc_base/task_queue_libevent.h"
12
13 #include <errno.h>
14 #include <fcntl.h>
15 #include <pthread.h>
16 #include <signal.h>
17 #include <stdint.h>
18 #include <time.h>
19 #include <unistd.h>
20
21 #include <list>
22 #include <memory>
23 #include <type_traits>
24 #include <utility>
25
26 #include "absl/container/inlined_vector.h"
27 #include "absl/strings/string_view.h"
28 #include "api/task_queue/queued_task.h"
29 #include "api/task_queue/task_queue_base.h"
30 #include "base/third_party/libevent/event.h"
31 #include "rtc_base/checks.h"
32 #include "rtc_base/logging.h"
33 #include "rtc_base/numerics/safe_conversions.h"
34 #include "rtc_base/platform_thread.h"
35 #include "rtc_base/platform_thread_types.h"
36 #include "rtc_base/synchronization/mutex.h"
37 #include "rtc_base/thread_annotations.h"
38 #include "rtc_base/time_utils.h"
39
40 namespace webrtc {
41 namespace {
// One-byte control messages sent through the wakeup pipe to the task queue
// thread. They are read and dispatched by TaskQueueLibevent::OnWakeup().
//
// kQuit is written by TaskQueueLibevent::Delete(); it makes OnWakeup() clear
// |is_active_| and break out of the event loop.
constexpr char kQuit = 1;
// kRunTasks is written by TaskQueueLibevent::PostTask(); it makes OnWakeup()
// take and run everything that is currently queued in |pending_|.
constexpr char kRunTasks = 2;

// Shorthand for the task queue priority enum used by the factory below.
using Priority = TaskQueueFactory::Priority;
46
47 // This ignores the SIGPIPE signal on the calling thread.
48 // This signal can be fired when trying to write() to a pipe that's being
49 // closed or while closing a pipe that's being written to.
50 // We can run into that situation so we ignore this signal and continue as
51 // normal.
52 // As a side note for this implementation, it would be great if we could safely
53 // restore the sigmask, but unfortunately the operation of restoring it, can
54 // itself actually cause SIGPIPE to be signaled :-| (e.g. on MacOS)
55 // The SIGPIPE signal by default causes the process to be terminated, so we
56 // don't want to risk that.
57 // An alternative to this approach is to ignore the signal for the whole
58 // process:
59 // signal(SIGPIPE, SIG_IGN);
void IgnoreSigPipeSignalOnCurrentThread() {
  // Build a signal set that contains SIGPIPE and nothing else.
  sigset_t blocked_signals;
  sigemptyset(&blocked_signals);
  sigaddset(&blocked_signals, SIGPIPE);

  // Add the set to the calling thread's blocked signals. The previous mask is
  // deliberately not requested (null out-parameter) and is never restored.
  pthread_sigmask(SIG_BLOCK, &blocked_signals, nullptr);
}
66
SetNonBlocking(int fd)67 bool SetNonBlocking(int fd) {
68 const int flags = fcntl(fd, F_GETFL);
69 RTC_CHECK(flags != -1);
70 return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
71 }
72
73 // TODO(tommi): This is a hack to support two versions of libevent that we're
74 // compatible with. The method we really want to call is event_assign(),
75 // since event_set() has been marked as deprecated (and doesn't accept
76 // passing event_base__ as a parameter). However, the version of libevent
77 // that we have in Chromium, doesn't have event_assign(), so we need to call
78 // event_set() there.
// Initializes |ev| so that |callback| is invoked with |arg| when |events|
// occur on |fd|, and associates the event with |base|.
// Any failure reported by libevent (non-zero return) is fatal.
void EventAssign(struct event* ev,
                 struct event_base* base,
                 int fd,
                 short events,
                 void (*callback)(int, short, void*),
                 void* arg) {
// NOTE(review): _EVENT2_EVENT_H_ is presumably the include guard of libevent
// 2.x's event2/event.h, i.e. this selects the API by libevent version --
// confirm against the bundled headers.
#if defined(_EVENT2_EVENT_H_)
  // event_assign() takes the event base directly.
  RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
#else
  // event_set() has no base parameter, so the base is attached in a second
  // step.
  event_set(ev, fd, events, callback, arg);
  RTC_CHECK_EQ(0, event_base_set(base, ev));
#endif
}
92
TaskQueuePriorityToThreadPriority(Priority priority)93 rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
94 switch (priority) {
95 case Priority::HIGH:
96 return rtc::kRealtimePriority;
97 case Priority::LOW:
98 return rtc::kLowPriority;
99 case Priority::NORMAL:
100 return rtc::kNormalPriority;
101 default:
102 RTC_NOTREACHED();
103 break;
104 }
105 return rtc::kNormalPriority;
106 }
107
// TaskQueueBase implementation that runs tasks on a dedicated thread whose
// main loop is a libevent event loop. Other threads hand work over by
// appending to |pending_| and writing a control byte (kRunTasks / kQuit) to a
// wakeup pipe that the loop is watching.
class TaskQueueLibevent final : public TaskQueueBase {
 public:
  // Creates the event base and the wakeup pipe, registers the wakeup event
  // and starts the worker thread.
  TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);

  // Asks the worker thread to quit, stops it, releases the pipe and event
  // base and finally deletes |this|. Must not be called on the queue itself.
  void Delete() override;
  // Queues |task| and wakes the worker thread if the queue was empty.
  void PostTask(std::unique_ptr<QueuedTask> task) override;
  // Schedules |task| to run after |milliseconds|. When called from another
  // thread the request is forwarded to the queue thread via a SetTimerTask.
  void PostDelayedTask(std::unique_ptr<QueuedTask> task,
                       uint32_t milliseconds) override;

 private:
  // Helper task that re-posts a delayed task from the queue thread.
  class SetTimerTask;
  // Bundles a libevent timer with the task it should run.
  struct TimerEvent;

  // Private: instances are destroyed through Delete(), which ends with
  // `delete this`.
  ~TaskQueueLibevent() override = default;

  // Thread entry point; |context| is the TaskQueueLibevent instance.
  static void ThreadMain(void* context);
  // libevent callback for the wakeup pipe; |context| is the
  // TaskQueueLibevent instance.
  static void OnWakeup(int socket, short flags, void* context);  // NOLINT
  // libevent callback for delayed tasks; |context| is the TimerEvent.
  static void RunTimer(int fd, short flags, void* context);  // NOLINT

  // ThreadMain() keeps running the event loop while this is true; OnWakeup()
  // clears it when it receives kQuit.
  bool is_active_ = true;
  // Write end of the wakeup pipe (fds[1] of pipe()).
  int wakeup_pipe_in_ = -1;
  // Read end of the wakeup pipe (fds[0] of pipe()); watched by
  // |wakeup_event_|.
  int wakeup_pipe_out_ = -1;
  event_base* event_base_;
  event wakeup_event_;
  rtc::PlatformThread thread_;
  Mutex pending_lock_;
  // Tasks posted via PostTask() that OnWakeup() has not yet picked up.
  absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_
      RTC_GUARDED_BY(pending_lock_);
  // Timers created by PostDelayedTask() that have not fired yet. RunTimer()
  // removes a timer when it fires; whatever is left is deleted by ThreadMain()
  // after the event loop exits.
  std::list<TimerEvent*> pending_timers_;
};
139
140 struct TaskQueueLibevent::TimerEvent {
TimerEventwebrtc::__anond79c7f5c0111::TaskQueueLibevent::TimerEvent141 TimerEvent(TaskQueueLibevent* task_queue, std::unique_ptr<QueuedTask> task)
142 : task_queue(task_queue), task(std::move(task)) {}
~TimerEventwebrtc::__anond79c7f5c0111::TaskQueueLibevent::TimerEvent143 ~TimerEvent() { event_del(&ev); }
144
145 event ev;
146 TaskQueueLibevent* task_queue;
147 std::unique_ptr<QueuedTask> task;
148 };
149
150 class TaskQueueLibevent::SetTimerTask : public QueuedTask {
151 public:
SetTimerTask(std::unique_ptr<QueuedTask> task,uint32_t milliseconds)152 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
153 : task_(std::move(task)),
154 milliseconds_(milliseconds),
155 posted_(rtc::Time32()) {}
156
157 private:
Run()158 bool Run() override {
159 // Compensate for the time that has passed since construction
160 // and until we got here.
161 uint32_t post_time = rtc::Time32() - posted_;
162 TaskQueueLibevent::Current()->PostDelayedTask(
163 std::move(task_),
164 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
165 return true;
166 }
167
168 std::unique_ptr<QueuedTask> task_;
169 const uint32_t milliseconds_;
170 const uint32_t posted_;
171 };
172
TaskQueueLibevent(absl::string_view queue_name,rtc::ThreadPriority priority)173 TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
174 rtc::ThreadPriority priority)
175 : event_base_(event_base_new()),
176 thread_(&TaskQueueLibevent::ThreadMain, this, queue_name, priority) {
177 int fds[2];
178 RTC_CHECK(pipe(fds) == 0);
179 SetNonBlocking(fds[0]);
180 SetNonBlocking(fds[1]);
181 wakeup_pipe_out_ = fds[0];
182 wakeup_pipe_in_ = fds[1];
183
184 EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
185 EV_READ | EV_PERSIST, OnWakeup, this);
186 event_add(&wakeup_event_, 0);
187 thread_.Start();
188 }
189
Delete()190 void TaskQueueLibevent::Delete() {
191 RTC_DCHECK(!IsCurrent());
192 struct timespec ts;
193 char message = kQuit;
194 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
195 // The queue is full, so we have no choice but to wait and retry.
196 RTC_CHECK_EQ(EAGAIN, errno);
197 ts.tv_sec = 0;
198 ts.tv_nsec = 1000000;
199 nanosleep(&ts, nullptr);
200 }
201
202 thread_.Stop();
203
204 event_del(&wakeup_event_);
205
206 IgnoreSigPipeSignalOnCurrentThread();
207
208 close(wakeup_pipe_in_);
209 close(wakeup_pipe_out_);
210 wakeup_pipe_in_ = -1;
211 wakeup_pipe_out_ = -1;
212
213 event_base_free(event_base_);
214 delete this;
215 }
216
PostTask(std::unique_ptr<QueuedTask> task)217 void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
218 {
219 MutexLock lock(&pending_lock_);
220 bool had_pending_tasks = !pending_.empty();
221 pending_.push_back(std::move(task));
222
223 // Only write to the pipe if there were no pending tasks before this one
224 // since the thread could be sleeping. If there were already pending tasks
225 // then we know there's either a pending write in the pipe or the thread has
226 // not yet processed the pending tasks. In either case, the thread will
227 // eventually wake up and process all pending tasks including this one.
228 if (had_pending_tasks) {
229 return;
230 }
231 }
232
233 // Note: This behvior outlined above ensures we never fill up the pipe write
234 // buffer since there will only ever be 1 byte pending.
235 char message = kRunTasks;
236 RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
237 sizeof(message));
238 }
239
PostDelayedTask(std::unique_ptr<QueuedTask> task,uint32_t milliseconds)240 void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
241 uint32_t milliseconds) {
242 if (IsCurrent()) {
243 TimerEvent* timer = new TimerEvent(this, std::move(task));
244 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
245 timer);
246 pending_timers_.push_back(timer);
247 timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
248 rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
249 event_add(&timer->ev, &tv);
250 } else {
251 PostTask(std::make_unique<SetTimerTask>(std::move(task), milliseconds));
252 }
253 }
254
255 // static
ThreadMain(void * context)256 void TaskQueueLibevent::ThreadMain(void* context) {
257 TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
258
259 {
260 CurrentTaskQueueSetter set_current(me);
261 while (me->is_active_)
262 event_base_loop(me->event_base_, 0);
263 }
264
265 for (TimerEvent* timer : me->pending_timers_)
266 delete timer;
267 }
268
269 // static
OnWakeup(int socket,short flags,void * context)270 void TaskQueueLibevent::OnWakeup(int socket,
271 short flags, // NOLINT
272 void* context) {
273 TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
274 RTC_DCHECK(me->wakeup_pipe_out_ == socket);
275 char buf;
276 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
277 switch (buf) {
278 case kQuit:
279 me->is_active_ = false;
280 event_base_loopbreak(me->event_base_);
281 break;
282 case kRunTasks: {
283 absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;
284 {
285 MutexLock lock(&me->pending_lock_);
286 tasks.swap(me->pending_);
287 }
288 RTC_DCHECK(!tasks.empty());
289 for (auto& task : tasks) {
290 if (task->Run()) {
291 task.reset();
292 } else {
293 // |false| means the task should *not* be deleted.
294 task.release();
295 }
296 }
297 break;
298 }
299 default:
300 RTC_NOTREACHED();
301 break;
302 }
303 }
304
305 // static
RunTimer(int fd,short flags,void * context)306 void TaskQueueLibevent::RunTimer(int fd,
307 short flags, // NOLINT
308 void* context) {
309 TimerEvent* timer = static_cast<TimerEvent*>(context);
310 if (!timer->task->Run())
311 timer->task.release();
312 timer->task_queue->pending_timers_.remove(timer);
313 delete timer;
314 }
315
316 class TaskQueueLibeventFactory final : public TaskQueueFactory {
317 public:
CreateTaskQueue(absl::string_view name,Priority priority) const318 std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
319 absl::string_view name,
320 Priority priority) const override {
321 return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
322 new TaskQueueLibevent(name,
323 TaskQueuePriorityToThreadPriority(priority)));
324 }
325 };
326
327 } // namespace
328
CreateTaskQueueLibeventFactory()329 std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
330 return std::make_unique<TaskQueueLibeventFactory>();
331 }
332
333 } // namespace webrtc
334