1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/base/build_config.h"
18 
19 #include "perfetto/ext/base/unix_task_runner.h"
20 
21 #include <errno.h>
22 #include <stdlib.h>
23 
24 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
25 #include <Windows.h>
26 #include <synchapi.h>
27 #else
28 #include <unistd.h>
29 #endif
30 
31 #include <algorithm>
32 #include <limits>
33 
34 #include "perfetto/ext/base/watchdog.h"
35 
36 namespace perfetto {
37 namespace base {
38 
UnixTaskRunner()39 UnixTaskRunner::UnixTaskRunner() {
40   AddFileDescriptorWatch(event_.fd(), [] {
41     // Not reached -- see PostFileDescriptorWatches().
42     PERFETTO_DFATAL("Should be unreachable.");
43   });
44 }
45 
46 UnixTaskRunner::~UnixTaskRunner() = default;
47 
WakeUp()48 void UnixTaskRunner::WakeUp() {
49   event_.Notify();
50 }
51 
Run()52 void UnixTaskRunner::Run() {
53   PERFETTO_DCHECK_THREAD(thread_checker_);
54   created_thread_id_ = GetThreadId();
55   quit_ = false;
56   for (;;) {
57     int poll_timeout_ms;
58     {
59       std::lock_guard<std::mutex> lock(lock_);
60       if (quit_)
61         return;
62       poll_timeout_ms = GetDelayMsToNextTaskLocked();
63       UpdateWatchTasksLocked();
64     }
65 
66 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
67     DWORD timeout =
68         poll_timeout_ms >= 0 ? static_cast<DWORD>(poll_timeout_ms) : INFINITE;
69     DWORD ret =
70         WaitForMultipleObjects(static_cast<DWORD>(poll_fds_.size()),
71                                &poll_fds_[0], /*bWaitAll=*/false, timeout);
72     // Unlike poll(2), WaitForMultipleObjects() returns only *one* handle in the
73     // set, even when >1 is signalled. In order to avoid starvation,
74     // PostFileDescriptorWatches() will WaitForSingleObject() each other handle
75     // to ensure fairness. |ret| here is passed just to avoid an extra
76     // WaitForSingleObject() for the one handle that WaitForMultipleObject()
77     // returned.
78     PostFileDescriptorWatches(ret);
79 #else
80     int ret = PERFETTO_EINTR(poll(
81         &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
82     PERFETTO_CHECK(ret >= 0);
83     PostFileDescriptorWatches(0 /*ignored*/);
84 #endif
85 
86     // To avoid starvation we always interleave all types of tasks -- immediate,
87     // delayed and file descriptor watches.
88     RunImmediateAndDelayedTask();
89   }
90 }
91 
Quit()92 void UnixTaskRunner::Quit() {
93   std::lock_guard<std::mutex> lock(lock_);
94   quit_ = true;
95   WakeUp();
96 }
97 
QuitCalled()98 bool UnixTaskRunner::QuitCalled() {
99   std::lock_guard<std::mutex> lock(lock_);
100   return quit_;
101 }
102 
IsIdleForTesting()103 bool UnixTaskRunner::IsIdleForTesting() {
104   std::lock_guard<std::mutex> lock(lock_);
105   return immediate_tasks_.empty();
106 }
107 
UpdateWatchTasksLocked()108 void UnixTaskRunner::UpdateWatchTasksLocked() {
109   PERFETTO_DCHECK_THREAD(thread_checker_);
110 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
111   if (!watch_tasks_changed_)
112     return;
113   watch_tasks_changed_ = false;
114 #endif
115   poll_fds_.clear();
116   for (auto& it : watch_tasks_) {
117     PlatformHandle handle = it.first;
118     WatchTask& watch_task = it.second;
119 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
120     if (!watch_task.pending)
121       poll_fds_.push_back(handle);
122 #else
123     watch_task.poll_fd_index = poll_fds_.size();
124     poll_fds_.push_back({handle, POLLIN | POLLHUP, 0});
125 #endif
126   }
127 }
128 
RunImmediateAndDelayedTask()129 void UnixTaskRunner::RunImmediateAndDelayedTask() {
130   // If locking overhead becomes an issue, add a separate work queue.
131   std::function<void()> immediate_task;
132   std::function<void()> delayed_task;
133   TimeMillis now = GetWallTimeMs();
134   {
135     std::lock_guard<std::mutex> lock(lock_);
136     if (!immediate_tasks_.empty()) {
137       immediate_task = std::move(immediate_tasks_.front());
138       immediate_tasks_.pop_front();
139     }
140     if (!delayed_tasks_.empty()) {
141       auto it = delayed_tasks_.begin();
142       if (now >= it->first) {
143         delayed_task = std::move(it->second);
144         delayed_tasks_.erase(it);
145       }
146     }
147   }
148 
149   errno = 0;
150   if (immediate_task)
151     RunTaskWithWatchdogGuard(immediate_task);
152   errno = 0;
153   if (delayed_task)
154     RunTaskWithWatchdogGuard(delayed_task);
155 }
156 
PostFileDescriptorWatches(uint64_t windows_wait_result)157 void UnixTaskRunner::PostFileDescriptorWatches(uint64_t windows_wait_result) {
158   PERFETTO_DCHECK_THREAD(thread_checker_);
159   for (size_t i = 0; i < poll_fds_.size(); i++) {
160 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
161     const PlatformHandle handle = poll_fds_[i];
162     // |windows_wait_result| is the result of WaitForMultipleObjects() call. If
163     // one of the objects was signalled, it will have a value between
164     // [0, poll_fds_.size()].
165     if (i != windows_wait_result &&
166         WaitForSingleObject(handle, 0) != WAIT_OBJECT_0) {
167       continue;
168     }
169 #else
170     base::ignore_result(windows_wait_result);
171     const PlatformHandle handle = poll_fds_[i].fd;
172     if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
173       continue;
174     poll_fds_[i].revents = 0;
175 #endif
176 
177     // The wake-up event is handled inline to avoid an infinite recursion of
178     // posted tasks.
179     if (handle == event_.fd()) {
180       event_.Clear();
181       continue;
182     }
183 
184     // Binding to |this| is safe since we are the only object executing the
185     // task.
186     PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this, handle));
187 
188     // Flag the task as pending.
189 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
190     // On Windows this is done by marking the WatchTask entry as pending. This
191     // is more expensive than Linux as requires rebuilding the |poll_fds_|
192     // vector on each call. There doesn't seem to be a good alternative though.
193     auto it = watch_tasks_.find(handle);
194     PERFETTO_CHECK(it != watch_tasks_.end());
195     PERFETTO_DCHECK(!it->second.pending);
196     it->second.pending = true;
197 #else
198     // On UNIX systems instead, we just make the fd negative while its task is
199     // pending. This makes poll(2) ignore the fd.
200     PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
201     poll_fds_[i].fd = -poll_fds_[i].fd;
202 #endif
203   }
204 }
205 
RunFileDescriptorWatch(PlatformHandle fd)206 void UnixTaskRunner::RunFileDescriptorWatch(PlatformHandle fd) {
207   std::function<void()> task;
208   {
209     std::lock_guard<std::mutex> lock(lock_);
210     auto it = watch_tasks_.find(fd);
211     if (it == watch_tasks_.end())
212       return;
213     WatchTask& watch_task = it->second;
214 
215     // Make poll(2) pay attention to the fd again. Since another thread may have
216     // updated this watch we need to refresh the set first.
217     UpdateWatchTasksLocked();
218 
219 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
220     // On Windows we manually track the presence of outstanding tasks for the
221     // watch. The UpdateWatchTasksLocked() in the Run() loop will re-add the
222     // task to the |poll_fds_| vector.
223     PERFETTO_DCHECK(watch_task.pending);
224     watch_task.pending = false;
225 #else
226     size_t fd_index = watch_task.poll_fd_index;
227     PERFETTO_DCHECK(fd_index < poll_fds_.size());
228     PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
229     poll_fds_[fd_index].fd = fd;
230 #endif
231     task = watch_task.callback;
232   }
233   errno = 0;
234   RunTaskWithWatchdogGuard(task);
235 }
236 
GetDelayMsToNextTaskLocked() const237 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
238   PERFETTO_DCHECK_THREAD(thread_checker_);
239   if (!immediate_tasks_.empty())
240     return 0;
241   if (!delayed_tasks_.empty()) {
242     TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
243     return std::max(0, static_cast<int>(diff.count()));
244   }
245   return -1;
246 }
247 
PostTask(std::function<void ()> task)248 void UnixTaskRunner::PostTask(std::function<void()> task) {
249   bool was_empty;
250   {
251     std::lock_guard<std::mutex> lock(lock_);
252     was_empty = immediate_tasks_.empty();
253     immediate_tasks_.push_back(std::move(task));
254   }
255   if (was_empty)
256     WakeUp();
257 }
258 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)259 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
260                                      uint32_t delay_ms) {
261   TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
262   {
263     std::lock_guard<std::mutex> lock(lock_);
264     delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
265   }
266   WakeUp();
267 }
268 
AddFileDescriptorWatch(PlatformHandle fd,std::function<void ()> task)269 void UnixTaskRunner::AddFileDescriptorWatch(PlatformHandle fd,
270                                             std::function<void()> task) {
271   PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
272   {
273     std::lock_guard<std::mutex> lock(lock_);
274     PERFETTO_DCHECK(!watch_tasks_.count(fd));
275     WatchTask& watch_task = watch_tasks_[fd];
276     watch_task.callback = std::move(task);
277 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
278     watch_task.pending = false;
279 #else
280     watch_task.poll_fd_index = SIZE_MAX;
281 #endif
282     watch_tasks_changed_ = true;
283   }
284   WakeUp();
285 }
286 
RemoveFileDescriptorWatch(PlatformHandle fd)287 void UnixTaskRunner::RemoveFileDescriptorWatch(PlatformHandle fd) {
288   PERFETTO_DCHECK(PlatformHandleChecker::IsValid(fd));
289   {
290     std::lock_guard<std::mutex> lock(lock_);
291     PERFETTO_DCHECK(watch_tasks_.count(fd));
292     watch_tasks_.erase(fd);
293     watch_tasks_changed_ = true;
294   }
295   // No need to schedule a wake-up for this.
296 }
297 
RunsTasksOnCurrentThread() const298 bool UnixTaskRunner::RunsTasksOnCurrentThread() const {
299   return GetThreadId() == created_thread_id_;
300 }
301 
302 }  // namespace base
303 }  // namespace perfetto
304