1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "perfetto/base/build_config.h"
18
19 #include "perfetto/ext/base/unix_task_runner.h"
20
21 #include <errno.h>
22 #include <stdlib.h>
23
24 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
25 #include <Windows.h>
26 #include <synchapi.h>
27 #else
28 #include <unistd.h>
29 #endif
30
31 #include <algorithm>
32 #include <limits>
33
34 #include "perfetto/ext/base/watchdog.h"
35
36 namespace perfetto {
37 namespace base {
38
UnixTaskRunner()39 UnixTaskRunner::UnixTaskRunner() {
40 AddFileDescriptorWatch(event_.fd(), [] {
41 // Not reached -- see PostFileDescriptorWatches().
42 PERFETTO_DFATAL("Should be unreachable.");
43 });
44 }
45
46 UnixTaskRunner::~UnixTaskRunner() = default;
47
WakeUp()48 void UnixTaskRunner::WakeUp() {
49 event_.Notify();
50 }
51
Run()52 void UnixTaskRunner::Run() {
53 PERFETTO_DCHECK_THREAD(thread_checker_);
54 created_thread_id_ = GetThreadId();
55 quit_ = false;
56 for (;;) {
57 int poll_timeout_ms;
58 {
59 std::lock_guard<std::mutex> lock(lock_);
60 if (quit_)
61 return;
62 poll_timeout_ms = GetDelayMsToNextTaskLocked();
63 UpdateWatchTasksLocked();
64 }
65
66 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
67 DWORD timeout =
68 poll_timeout_ms >= 0 ? static_cast<DWORD>(poll_timeout_ms) : INFINITE;
69 DWORD ret =
70 WaitForMultipleObjects(static_cast<DWORD>(poll_fds_.size()),
71 &poll_fds_[0], /*bWaitAll=*/false, timeout);
72 // Unlike poll(2), WaitForMultipleObjects() returns only *one* handle in the
73 // set, even when >1 is signalled. In order to avoid starvation,
74 // PostFileDescriptorWatches() will WaitForSingleObject() each other handle
75 // to ensure fairness. |ret| here is passed just to avoid an extra
76 // WaitForSingleObject() for the one handle that WaitForMultipleObject()
77 // returned.
78 PostFileDescriptorWatches(ret);
79 #else
80 int ret = PERFETTO_EINTR(poll(
81 &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
82 PERFETTO_CHECK(ret >= 0);
83 PostFileDescriptorWatches(0 /*ignored*/);
84 #endif
85
86 // To avoid starvation we always interleave all types of tasks -- immediate,
87 // delayed and file descriptor watches.
88 RunImmediateAndDelayedTask();
89 }
90 }
91
Quit()92 void UnixTaskRunner::Quit() {
93 std::lock_guard<std::mutex> lock(lock_);
94 quit_ = true;
95 WakeUp();
96 }
97
QuitCalled()98 bool UnixTaskRunner::QuitCalled() {
99 std::lock_guard<std::mutex> lock(lock_);
100 return quit_;
101 }
102
IsIdleForTesting()103 bool UnixTaskRunner::IsIdleForTesting() {
104 std::lock_guard<std::mutex> lock(lock_);
105 return immediate_tasks_.empty();
106 }
107
UpdateWatchTasksLocked()108 void UnixTaskRunner::UpdateWatchTasksLocked() {
109 PERFETTO_DCHECK_THREAD(thread_checker_);
110 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
111 if (!watch_tasks_changed_)
112 return;
113 watch_tasks_changed_ = false;
114 #endif
115 poll_fds_.clear();
116 for (auto& it : watch_tasks_) {
117 PlatformHandle handle = it.first;
118 WatchTask& watch_task = it.second;
119 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
120 if (!watch_task.pending)
121 poll_fds_.push_back(handle);
122 #else
123 watch_task.poll_fd_index = poll_fds_.size();
124 poll_fds_.push_back({handle, POLLIN | POLLHUP, 0});
125 #endif
126 }
127 }
128
RunImmediateAndDelayedTask()129 void UnixTaskRunner::RunImmediateAndDelayedTask() {
130 // If locking overhead becomes an issue, add a separate work queue.
131 std::function<void()> immediate_task;
132 std::function<void()> delayed_task;
133 TimeMillis now = GetWallTimeMs();
134 {
135 std::lock_guard<std::mutex> lock(lock_);
136 if (!immediate_tasks_.empty()) {
137 immediate_task = std::move(immediate_tasks_.front());
138 immediate_tasks_.pop_front();
139 }
140 if (!delayed_tasks_.empty()) {
141 auto it = delayed_tasks_.begin();
142 if (now >= it->first) {
143 delayed_task = std::move(it->second);
144 delayed_tasks_.erase(it);
145 }
146 }
147 }
148
149 errno = 0;
150 if (immediate_task)
151 RunTaskWithWatchdogGuard(immediate_task);
152 errno = 0;
153 if (delayed_task)
154 RunTaskWithWatchdogGuard(delayed_task);
155 }
156
PostFileDescriptorWatches(uint64_t windows_wait_result)157 void UnixTaskRunner::PostFileDescriptorWatches(uint64_t windows_wait_result) {
158 PERFETTO_DCHECK_THREAD(thread_checker_);
159 for (size_t i = 0; i < poll_fds_.size(); i++) {
160 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
161 const PlatformHandle handle = poll_fds_[i];
162 // |windows_wait_result| is the result of WaitForMultipleObjects() call. If
163 // one of the objects was signalled, it will have a value between
164 // [0, poll_fds_.size()].
165 if (i != windows_wait_result &&
166 WaitForSingleObject(handle, 0) != WAIT_OBJECT_0) {
167 continue;
168 }
169 #else
170 base::ignore_result(windows_wait_result);
171 const PlatformHandle handle = poll_fds_[i].fd;
172 if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
173 continue;
174 poll_fds_[i].revents = 0;
175 #endif
176
177 // The wake-up event is handled inline to avoid an infinite recursion of
178 // posted tasks.
179 if (handle == event_.fd()) {
180 event_.Clear();
181 continue;
182 }
183
184 // Binding to |this| is safe since we are the only object executing the
185 // task.
186 PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this, handle));
187
188 // Flag the task as pending.
189 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
190 // On Windows this is done by marking the WatchTask entry as pending. This
191 // is more expensive than Linux as requires rebuilding the |poll_fds_|
192 // vector on each call. There doesn't seem to be a good alternative though.
193 auto it = watch_tasks_.find(handle);
194 PERFETTO_CHECK(it != watch_tasks_.end());
195 PERFETTO_DCHECK(!it->second.pending);
196 it->second.pending = true;
197 #else
198 // On UNIX systems instead, we just make the fd negative while its task is
199 // pending. This makes poll(2) ignore the fd.
200 PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
201 poll_fds_[i].fd = -poll_fds_[i].fd;
202 #endif
203 }
204 }
205
RunFileDescriptorWatch(PlatformHandle fd)206 void UnixTaskRunner::RunFileDescriptorWatch(PlatformHandle fd) {
207 std::function<void()> task;
208 {
209 std::lock_guard<std::mutex> lock(lock_);
210 auto it = watch_tasks_.find(fd);
211 if (it == watch_tasks_.end())
212 return;
213 WatchTask& watch_task = it->second;
214
215 // Make poll(2) pay attention to the fd again. Since another thread may have
216 // updated this watch we need to refresh the set first.
217 UpdateWatchTasksLocked();
218
219 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
220 // On Windows we manually track the presence of outstanding tasks for the
221 // watch. The UpdateWatchTasksLocked() in the Run() loop will re-add the
222 // task to the |poll_fds_| vector.
223 PERFETTO_DCHECK(watch_task.pending);
224 watch_task.pending = false;
225 #else
226 size_t fd_index = watch_task.poll_fd_index;
227 PERFETTO_DCHECK(fd_index < poll_fds_.size());
228 PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
229 poll_fds_[fd_index].fd = fd;
230 #endif
231 task = watch_task.callback;
232 }
233 errno = 0;
234 RunTaskWithWatchdogGuard(task);
235 }
236
GetDelayMsToNextTaskLocked() const237 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
238 PERFETTO_DCHECK_THREAD(thread_checker_);
239 if (!immediate_tasks_.empty())
240 return 0;
241 if (!delayed_tasks_.empty()) {
242 TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
243 return std::max(0, static_cast<int>(diff.count()));
244 }
245 return -1;
246 }
247
PostTask(std::function<void ()> task)248 void UnixTaskRunner::PostTask(std::function<void()> task) {
249 bool was_empty;
250 {
251 std::lock_guard<std::mutex> lock(lock_);
252 was_empty = immediate_tasks_.empty();
253 immediate_tasks_.push_back(std::move(task));
254 }
255 if (was_empty)
256 WakeUp();
257 }
258
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)259 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
260 uint32_t delay_ms) {
261 TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
262 {
263 std::lock_guard<std::mutex> lock(lock_);
264 delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
265 }
266 WakeUp();
267 }
268
AddFileDescriptorWatch(PlatformHandle fd,std::function<void ()> task)269 void UnixTaskRunner::AddFileDescriptorWatch(PlatformHandle fd,
270 std::function<void()> task) {
271 PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
272 {
273 std::lock_guard<std::mutex> lock(lock_);
274 PERFETTO_DCHECK(!watch_tasks_.count(fd));
275 WatchTask& watch_task = watch_tasks_[fd];
276 watch_task.callback = std::move(task);
277 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
278 watch_task.pending = false;
279 #else
280 watch_task.poll_fd_index = SIZE_MAX;
281 #endif
282 watch_tasks_changed_ = true;
283 }
284 WakeUp();
285 }
286
RemoveFileDescriptorWatch(PlatformHandle fd)287 void UnixTaskRunner::RemoveFileDescriptorWatch(PlatformHandle fd) {
288 PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
289 {
290 std::lock_guard<std::mutex> lock(lock_);
291 PERFETTO_DCHECK(watch_tasks_.count(fd));
292 watch_tasks_.erase(fd);
293 watch_tasks_changed_ = true;
294 }
295 // No need to schedule a wake-up for this.
296 }
297
RunsTasksOnCurrentThread() const298 bool UnixTaskRunner::RunsTasksOnCurrentThread() const {
299 return GetThreadId() == created_thread_id_;
300 }
301
302 } // namespace base
303 } // namespace perfetto
304