1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "perfetto/base/unix_task_runner.h"
18 
19 #include "perfetto/base/build_config.h"
20 
21 #include <errno.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 
25 #include <limits>
26 
27 namespace perfetto {
28 namespace base {
29 
UnixTaskRunner()30 UnixTaskRunner::UnixTaskRunner() {
31   AddFileDescriptorWatch(event_.fd(), [] {
32     // Not reached -- see PostFileDescriptorWatches().
33     PERFETTO_DFATAL("Should be unreachable.");
34   });
35 }
36 
37 UnixTaskRunner::~UnixTaskRunner() = default;
38 
WakeUp()39 void UnixTaskRunner::WakeUp() {
40   event_.Notify();
41 }
42 
Run()43 void UnixTaskRunner::Run() {
44   PERFETTO_DCHECK_THREAD(thread_checker_);
45   created_thread_id_ = GetThreadId();
46   quit_ = false;
47   for (;;) {
48     int poll_timeout_ms;
49     {
50       std::lock_guard<std::mutex> lock(lock_);
51       if (quit_)
52         return;
53       poll_timeout_ms = GetDelayMsToNextTaskLocked();
54       UpdateWatchTasksLocked();
55     }
56     int ret = PERFETTO_EINTR(poll(
57         &poll_fds_[0], static_cast<nfds_t>(poll_fds_.size()), poll_timeout_ms));
58     PERFETTO_CHECK(ret >= 0);
59 
60     // To avoid starvation we always interleave all types of tasks -- immediate,
61     // delayed and file descriptor watches.
62     PostFileDescriptorWatches();
63     RunImmediateAndDelayedTask();
64   }
65 }
66 
Quit()67 void UnixTaskRunner::Quit() {
68   std::lock_guard<std::mutex> lock(lock_);
69   quit_ = true;
70   WakeUp();
71 }
72 
QuitCalled()73 bool UnixTaskRunner::QuitCalled() {
74   std::lock_guard<std::mutex> lock(lock_);
75   return quit_;
76 }
77 
IsIdleForTesting()78 bool UnixTaskRunner::IsIdleForTesting() {
79   std::lock_guard<std::mutex> lock(lock_);
80   return immediate_tasks_.empty();
81 }
82 
UpdateWatchTasksLocked()83 void UnixTaskRunner::UpdateWatchTasksLocked() {
84   PERFETTO_DCHECK_THREAD(thread_checker_);
85   if (!watch_tasks_changed_)
86     return;
87   watch_tasks_changed_ = false;
88   poll_fds_.clear();
89   for (auto& it : watch_tasks_) {
90     it.second.poll_fd_index = poll_fds_.size();
91     poll_fds_.push_back({it.first, POLLIN | POLLHUP, 0});
92   }
93 }
94 
RunImmediateAndDelayedTask()95 void UnixTaskRunner::RunImmediateAndDelayedTask() {
96   // If locking overhead becomes an issue, add a separate work queue.
97   std::function<void()> immediate_task;
98   std::function<void()> delayed_task;
99   TimeMillis now = GetWallTimeMs();
100   {
101     std::lock_guard<std::mutex> lock(lock_);
102     if (!immediate_tasks_.empty()) {
103       immediate_task = std::move(immediate_tasks_.front());
104       immediate_tasks_.pop_front();
105     }
106     if (!delayed_tasks_.empty()) {
107       auto it = delayed_tasks_.begin();
108       if (now >= it->first) {
109         delayed_task = std::move(it->second);
110         delayed_tasks_.erase(it);
111       }
112     }
113   }
114 
115   errno = 0;
116   if (immediate_task)
117     RunTask(immediate_task);
118   errno = 0;
119   if (delayed_task)
120     RunTask(delayed_task);
121 }
122 
PostFileDescriptorWatches()123 void UnixTaskRunner::PostFileDescriptorWatches() {
124   PERFETTO_DCHECK_THREAD(thread_checker_);
125   for (size_t i = 0; i < poll_fds_.size(); i++) {
126     if (!(poll_fds_[i].revents & (POLLIN | POLLHUP)))
127       continue;
128     poll_fds_[i].revents = 0;
129 
130     // The wake-up event is handled inline to avoid an infinite recursion of
131     // posted tasks.
132     if (poll_fds_[i].fd == event_.fd()) {
133       event_.Clear();
134       continue;
135     }
136 
137     // Binding to |this| is safe since we are the only object executing the
138     // task.
139     PostTask(std::bind(&UnixTaskRunner::RunFileDescriptorWatch, this,
140                        poll_fds_[i].fd));
141 
142     // Make the fd negative while a posted task is pending. This makes poll(2)
143     // ignore the fd.
144     PERFETTO_DCHECK(poll_fds_[i].fd >= 0);
145     poll_fds_[i].fd = -poll_fds_[i].fd;
146   }
147 }
148 
RunFileDescriptorWatch(int fd)149 void UnixTaskRunner::RunFileDescriptorWatch(int fd) {
150   std::function<void()> task;
151   {
152     std::lock_guard<std::mutex> lock(lock_);
153     auto it = watch_tasks_.find(fd);
154     if (it == watch_tasks_.end())
155       return;
156     // Make poll(2) pay attention to the fd again. Since another thread may have
157     // updated this watch we need to refresh the set first.
158     UpdateWatchTasksLocked();
159     size_t fd_index = it->second.poll_fd_index;
160     PERFETTO_DCHECK(fd_index < poll_fds_.size());
161     PERFETTO_DCHECK(::abs(poll_fds_[fd_index].fd) == fd);
162     poll_fds_[fd_index].fd = fd;
163     task = it->second.callback;
164   }
165   errno = 0;
166   RunTask(task);
167 }
168 
GetDelayMsToNextTaskLocked() const169 int UnixTaskRunner::GetDelayMsToNextTaskLocked() const {
170   PERFETTO_DCHECK_THREAD(thread_checker_);
171   if (!immediate_tasks_.empty())
172     return 0;
173   if (!delayed_tasks_.empty()) {
174     TimeMillis diff = delayed_tasks_.begin()->first - GetWallTimeMs();
175     return std::max(0, static_cast<int>(diff.count()));
176   }
177   return -1;
178 }
179 
PostTask(std::function<void ()> task)180 void UnixTaskRunner::PostTask(std::function<void()> task) {
181   bool was_empty;
182   {
183     std::lock_guard<std::mutex> lock(lock_);
184     was_empty = immediate_tasks_.empty();
185     immediate_tasks_.push_back(std::move(task));
186   }
187   if (was_empty)
188     WakeUp();
189 }
190 
PostDelayedTask(std::function<void ()> task,uint32_t delay_ms)191 void UnixTaskRunner::PostDelayedTask(std::function<void()> task,
192                                      uint32_t delay_ms) {
193   TimeMillis runtime = GetWallTimeMs() + TimeMillis(delay_ms);
194   {
195     std::lock_guard<std::mutex> lock(lock_);
196     delayed_tasks_.insert(std::make_pair(runtime, std::move(task)));
197   }
198   WakeUp();
199 }
200 
AddFileDescriptorWatch(int fd,std::function<void ()> task)201 void UnixTaskRunner::AddFileDescriptorWatch(int fd,
202                                             std::function<void()> task) {
203   PERFETTO_DCHECK(fd >= 0);
204   {
205     std::lock_guard<std::mutex> lock(lock_);
206     PERFETTO_DCHECK(!watch_tasks_.count(fd));
207     watch_tasks_[fd] = {std::move(task), SIZE_MAX};
208     watch_tasks_changed_ = true;
209   }
210   WakeUp();
211 }
212 
RemoveFileDescriptorWatch(int fd)213 void UnixTaskRunner::RemoveFileDescriptorWatch(int fd) {
214   PERFETTO_DCHECK(fd >= 0);
215   {
216     std::lock_guard<std::mutex> lock(lock_);
217     PERFETTO_DCHECK(watch_tasks_.count(fd));
218     watch_tasks_.erase(fd);
219     watch_tasks_changed_ = true;
220   }
221   // No need to schedule a wake-up for this.
222 }
223 
RunsTasksOnCurrentThread() const224 bool UnixTaskRunner::RunsTasksOnCurrentThread() const {
225   return GetThreadId() == created_thread_id_;
226 }
227 
228 }  // namespace base
229 }  // namespace perfetto
230