1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <brillo/message_loops/base_message_loop.h>
6 
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 #ifndef __ANDROID_HOST__
13 // Used for MISC_MAJOR. Only required for the target and not always available
14 // for the host.
15 #include <linux/major.h>
16 #endif
17 
18 #include <vector>
19 
20 #include <base/bind.h>
21 #include <base/files/file_path.h>
22 #include <base/files/file_util.h>
23 #include <base/run_loop.h>
24 #include <base/strings/string_number_conversions.h>
25 #include <base/strings/string_split.h>
26 
27 #include <brillo/location_logging.h>
28 #include <brillo/strings/string_utils.h>
29 
30 using base::Closure;
31 
namespace {

// Procfs file that is read to look up misc device minors; it is parsed as one
// "<minor> <name>" pair per line.
constexpr char kMiscMinorPath[] = "/proc/misc";
// Device name that identifies the binder driver entry in that file.
constexpr char kBinderDriverName[] = "binder";

}  // namespace
38 
39 namespace brillo {
40 
// Sentinel returned by ParseBinderMinor() when no usable "binder" entry is
// found in the parsed contents.
const int BaseMessageLoop::kInvalidMinor = -1;
// Sentinel that GetBinderMinor() treats as "minor not looked up yet": while
// |binder_minor_| holds this value, the next call tries to read /proc/misc.
const int BaseMessageLoop::kUninitializedMinor = -2;
43 
BaseMessageLoop(base::MessageLoopForIO * base_loop)44 BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop)
45     : base_loop_(base_loop),
46       weak_ptr_factory_(this) {}
47 
~BaseMessageLoop()48 BaseMessageLoop::~BaseMessageLoop() {
49   for (auto& io_task : io_tasks_) {
50     DVLOG_LOC(io_task.second.location(), 1)
51         << "Removing file descriptor watcher task_id " << io_task.first
52         << " leaked on BaseMessageLoop, scheduled from this location.";
53     io_task.second.StopWatching();
54   }
55 
56   // Note all pending canceled delayed tasks when destroying the message loop.
57   size_t lazily_deleted_tasks = 0;
58   for (const auto& delayed_task : delayed_tasks_) {
59     if (delayed_task.second.closure.is_null()) {
60       lazily_deleted_tasks++;
61     } else {
62       DVLOG_LOC(delayed_task.second.location, 1)
63           << "Removing delayed task_id " << delayed_task.first
64           << " leaked on BaseMessageLoop, scheduled from this location.";
65     }
66   }
67   if (lazily_deleted_tasks) {
68     LOG(INFO) << "Leaking " << lazily_deleted_tasks << " canceled tasks.";
69   }
70 }
71 
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,base::TimeDelta delay)72 MessageLoop::TaskId BaseMessageLoop::PostDelayedTask(
73     const tracked_objects::Location& from_here,
74     const Closure &task,
75     base::TimeDelta delay) {
76   TaskId task_id =  NextTaskId();
77   bool base_scheduled = base_loop_->task_runner()->PostDelayedTask(
78       from_here,
79       base::Bind(&BaseMessageLoop::OnRanPostedTask,
80                  weak_ptr_factory_.GetWeakPtr(),
81                  task_id),
82       delay);
83   DVLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << task_id
84                           << " to run in " << delay << ".";
85   if (!base_scheduled)
86     return MessageLoop::kTaskIdNull;
87 
88   delayed_tasks_.emplace(task_id,
89                          DelayedTask{from_here, task_id, std::move(task)});
90   return task_id;
91 }
92 
WatchFileDescriptor(const tracked_objects::Location & from_here,int fd,WatchMode mode,bool persistent,const Closure & task)93 MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
94     const tracked_objects::Location& from_here,
95     int fd,
96     WatchMode mode,
97     bool persistent,
98     const Closure &task) {
99   // base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here.
100   if (fd < 0)
101     return MessageLoop::kTaskIdNull;
102 
103   base::MessageLoopForIO::Mode base_mode = base::MessageLoopForIO::WATCH_READ;
104   switch (mode) {
105     case MessageLoop::kWatchRead:
106       base_mode = base::MessageLoopForIO::WATCH_READ;
107       break;
108     case MessageLoop::kWatchWrite:
109       base_mode = base::MessageLoopForIO::WATCH_WRITE;
110       break;
111     default:
112       return MessageLoop::kTaskIdNull;
113   }
114 
115   TaskId task_id =  NextTaskId();
116   auto it_bool = io_tasks_.emplace(
117       std::piecewise_construct,
118       std::forward_as_tuple(task_id),
119       std::forward_as_tuple(
120           from_here, this, task_id, fd, base_mode, persistent, task));
121   // This should always insert a new element.
122   DCHECK(it_bool.second);
123   bool scheduled = it_bool.first->second.StartWatching();
124   DVLOG_LOC(from_here, 1)
125       << "Watching fd " << fd << " for "
126       << (mode == MessageLoop::kWatchRead ? "reading" : "writing")
127       << (persistent ? " persistently" : " just once")
128       << " as task_id " << task_id
129       << (scheduled ? " successfully" : " failed.");
130 
131   if (!scheduled) {
132     io_tasks_.erase(task_id);
133     return MessageLoop::kTaskIdNull;
134   }
135 
136 #ifndef __ANDROID_HOST__
137   // Determine if the passed fd is the binder file descriptor. For that, we need
138   // to check that is a special char device and that the major and minor device
139   // numbers match. The binder file descriptor can't be removed and added back
140   // to an epoll group when there's work available to be done by the file
141   // descriptor due to bugs in the binder driver (b/26524111) when used with
142   // epoll. Therefore, we flag the binder fd and never attempt to remove it.
143   // This may cause the binder file descriptor to be attended with higher
144   // priority and cause starvation of other events.
145   struct stat buf;
146   if (fstat(fd, &buf) == 0 &&
147       S_ISCHR(buf.st_mode) &&
148       major(buf.st_rdev) == MISC_MAJOR &&
149       minor(buf.st_rdev) == GetBinderMinor()) {
150     it_bool.first->second.RunImmediately();
151   }
152 #endif
153 
154   return task_id;
155 }
156 
CancelTask(TaskId task_id)157 bool BaseMessageLoop::CancelTask(TaskId task_id) {
158   if (task_id == kTaskIdNull)
159     return false;
160   auto delayed_task_it = delayed_tasks_.find(task_id);
161   if (delayed_task_it == delayed_tasks_.end()) {
162     // This might be an IOTask then.
163     auto io_task_it = io_tasks_.find(task_id);
164     if (io_task_it == io_tasks_.end())
165       return false;
166     return io_task_it->second.CancelTask();
167   }
168   // A DelayedTask was found for this task_id at this point.
169 
170   // Check if the callback was already canceled but we have the entry in
171   // delayed_tasks_ since it didn't fire yet in the message loop.
172   if (delayed_task_it->second.closure.is_null())
173     return false;
174 
175   DVLOG_LOC(delayed_task_it->second.location, 1)
176       << "Removing task_id " << task_id << " scheduled from this location.";
177   // We reset to closure to a null Closure to release all the resources
178   // used by this closure at this point, but we don't remove the task_id from
179   // delayed_tasks_ since we can't tell base::MessageLoopForIO to not run it.
180   delayed_task_it->second.closure = Closure();
181 
182   return true;
183 }
184 
RunOnce(bool may_block)185 bool BaseMessageLoop::RunOnce(bool may_block) {
186   run_once_ = true;
187   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
188   base_run_loop_ = &run_loop;
189   if (!may_block)
190     run_loop.RunUntilIdle();
191   else
192     run_loop.Run();
193   base_run_loop_ = nullptr;
194   // If the flag was reset to false, it means a closure was run.
195   if (!run_once_)
196     return true;
197 
198   run_once_ = false;
199   return false;
200 }
201 
Run()202 void BaseMessageLoop::Run() {
203   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
204   base_run_loop_ = &run_loop;
205   run_loop.Run();
206   base_run_loop_ = nullptr;
207 }
208 
BreakLoop()209 void BaseMessageLoop::BreakLoop() {
210   if (base_run_loop_ == nullptr) {
211     DVLOG(1) << "Message loop not running, ignoring BreakLoop().";
212     return;  // Message loop not running, nothing to do.
213   }
214   base_run_loop_->Quit();
215 }
216 
QuitClosure() const217 Closure BaseMessageLoop::QuitClosure() const {
218   if (base_run_loop_ == nullptr)
219     return base::Bind(&base::DoNothing);
220   return base_run_loop_->QuitClosure();
221 }
222 
NextTaskId()223 MessageLoop::TaskId BaseMessageLoop::NextTaskId() {
224   TaskId res;
225   do {
226     res = ++last_id_;
227     // We would run out of memory before we run out of task ids.
228   } while (!res ||
229            delayed_tasks_.find(res) != delayed_tasks_.end() ||
230            io_tasks_.find(res) != io_tasks_.end());
231   return res;
232 }
233 
OnRanPostedTask(MessageLoop::TaskId task_id)234 void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
235   auto task_it = delayed_tasks_.find(task_id);
236   DCHECK(task_it != delayed_tasks_.end());
237   if (!task_it->second.closure.is_null()) {
238     DVLOG_LOC(task_it->second.location, 1)
239         << "Running delayed task_id " << task_id
240         << " scheduled from this location.";
241     // Mark the task as canceled while we are running it so CancelTask returns
242     // false.
243     Closure closure = std::move(task_it->second.closure);
244     task_it->second.closure = Closure();
245     closure.Run();
246 
247     // If the |run_once_| flag is set, it is because we are instructed to run
248     // only once callback.
249     if (run_once_) {
250       run_once_ = false;
251       BreakLoop();
252     }
253   }
254   delayed_tasks_.erase(task_it);
255 }
256 
OnFileReadyPostedTask(MessageLoop::TaskId task_id)257 void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
258   auto task_it = io_tasks_.find(task_id);
259   // Even if this task was canceled while we were waiting in the message loop
260   // for this method to run, the entry in io_tasks_ should still be present, but
261   // won't do anything.
262   DCHECK(task_it != io_tasks_.end());
263   task_it->second.OnFileReadyPostedTask();
264 }
265 
ParseBinderMinor(const std::string & file_contents)266 int BaseMessageLoop::ParseBinderMinor(
267     const std::string& file_contents) {
268   int result = kInvalidMinor;
269   // Split along '\n', then along the ' '. Note that base::SplitString trims all
270   // white spaces at the beginning and end after splitting.
271   std::vector<std::string> lines =
272       base::SplitString(file_contents, "\n", base::TRIM_WHITESPACE,
273                         base::SPLIT_WANT_ALL);
274   for (const std::string& line : lines) {
275     if (line.empty())
276       continue;
277     std::string number;
278     std::string name;
279     if (!string_utils::SplitAtFirst(line, " ", &number, &name, false))
280       continue;
281 
282     if (name == kBinderDriverName && base::StringToInt(number, &result))
283       break;
284   }
285   return result;
286 }
287 
GetBinderMinor()288 unsigned int BaseMessageLoop::GetBinderMinor() {
289   if (binder_minor_ != kUninitializedMinor)
290     return binder_minor_;
291 
292   std::string proc_misc;
293   if (!base::ReadFileToString(base::FilePath(kMiscMinorPath), &proc_misc))
294     return binder_minor_;
295   binder_minor_ = ParseBinderMinor(proc_misc);
296   return binder_minor_;
297 }
298 
IOTask(const tracked_objects::Location & location,BaseMessageLoop * loop,MessageLoop::TaskId task_id,int fd,base::MessageLoopForIO::Mode base_mode,bool persistent,const Closure & task)299 BaseMessageLoop::IOTask::IOTask(const tracked_objects::Location& location,
300                                 BaseMessageLoop* loop,
301                                 MessageLoop::TaskId task_id,
302                                 int fd,
303                                 base::MessageLoopForIO::Mode base_mode,
304                                 bool persistent,
305                                 const Closure& task)
306     : location_(location), loop_(loop), task_id_(task_id),
307       fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task) {}
308 
StartWatching()309 bool BaseMessageLoop::IOTask::StartWatching() {
310   return loop_->base_loop_->WatchFileDescriptor(
311       fd_, persistent_, base_mode_, &fd_watcher_, this);
312 }
313 
StopWatching()314 void BaseMessageLoop::IOTask::StopWatching() {
315   // This is safe to call even if we are not watching for it.
316   fd_watcher_.StopWatchingFileDescriptor();
317 }
318 
OnFileCanReadWithoutBlocking(int)319 void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
320   OnFileReady();
321 }
322 
OnFileCanWriteWithoutBlocking(int)323 void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
324   OnFileReady();
325 }
326 
OnFileReady()327 void BaseMessageLoop::IOTask::OnFileReady() {
328   // For file descriptors marked with the immediate_run flag, we don't call
329   // StopWatching() and wait, instead we dispatch the callback immediately.
330   if (immediate_run_) {
331     posted_task_pending_ = true;
332     OnFileReadyPostedTask();
333     return;
334   }
335 
336   // When the file descriptor becomes available we stop watching for it and
337   // schedule a task to run the callback from the main loop. The callback will
338   // run using the same scheduler used to run other delayed tasks, avoiding
339   // starvation of the available posted tasks if there are file descriptors
340   // always available. The new posted task will use the same TaskId as the
341   // current file descriptor watching task an could be canceled in either state,
342   // when waiting for the file descriptor or waiting in the main loop.
343   StopWatching();
344   bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
345       location_,
346       base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
347                  loop_->weak_ptr_factory_.GetWeakPtr(),
348                  task_id_));
349   posted_task_pending_ = true;
350   if (base_scheduled) {
351     DVLOG_LOC(location_, 1)
352         << "Dispatching task_id " << task_id_ << " for "
353         << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
354             "reading" : "writing")
355         << " file descriptor " << fd_ << ", scheduled from this location.";
356   } else {
357     // In the rare case that PostTask() fails, we fall back to run it directly.
358     // This would indicate a bigger problem with the message loop setup.
359     LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
360     OnFileReadyPostedTask();
361   }
362 }
363 
OnFileReadyPostedTask()364 void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
365   // We can't access |this| after running the |closure_| since it could call
366   // CancelTask on its own task_id, so we copy the members we need now.
367   BaseMessageLoop* loop_ptr = loop_;
368   DCHECK(posted_task_pending_ = true);
369   posted_task_pending_ = false;
370 
371   // If this task was already canceled, the closure will be null and there is
372   // nothing else to do here. This execution doesn't count a step for RunOnce()
373   // unless we have a callback to run.
374   if (closure_.is_null()) {
375     loop_->io_tasks_.erase(task_id_);
376     return;
377   }
378 
379   DVLOG_LOC(location_, 1)
380       << "Running task_id " << task_id_ << " for "
381       << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
382           "reading" : "writing")
383       << " file descriptor " << fd_ << ", scheduled from this location.";
384 
385   if (persistent_) {
386     // In the persistent case we just run the callback. If this callback cancels
387     // the task id, we can't access |this| anymore, so we re-start watching the
388     // file descriptor before running the callback, unless this is a fd where
389     // we didn't stop watching the file descriptor when it became available.
390     if (!immediate_run_)
391       StartWatching();
392     closure_.Run();
393   } else {
394     // This will destroy |this|, the fd_watcher and therefore stop watching this
395     // file descriptor.
396     Closure closure_copy = std::move(closure_);
397     loop_->io_tasks_.erase(task_id_);
398     // Run the closure from the local copy we just made.
399     closure_copy.Run();
400   }
401 
402   if (loop_ptr->run_once_) {
403     loop_ptr->run_once_ = false;
404     loop_ptr->BreakLoop();
405   }
406 }
407 
CancelTask()408 bool BaseMessageLoop::IOTask::CancelTask() {
409   if (closure_.is_null())
410     return false;
411 
412   DVLOG_LOC(location_, 1)
413       << "Removing task_id " << task_id_ << " scheduled from this location.";
414 
415   if (!posted_task_pending_) {
416     // Destroying the FileDescriptorWatcher implicitly stops watching the file
417     // descriptor. This will delete our instance.
418     loop_->io_tasks_.erase(task_id_);
419     return true;
420   }
421   // The IOTask is waiting for the message loop to run its delayed task, so
422   // it is not watching for the file descriptor. We release the closure
423   // resources now but keep the IOTask instance alive while we wait for the
424   // callback to run and delete the IOTask.
425   closure_ = Closure();
426   return true;
427 }
428 
429 }  // namespace brillo
430