1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <brillo/message_loops/base_message_loop.h>
6 
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 #ifndef __APPLE__
13 #include <sys/sysmacros.h>
14 #endif
15 
16 #ifndef __ANDROID_HOST__
17 // Used for MISC_MAJOR. Only required for the target and not always available
18 // for the host.
19 #include <linux/major.h>
20 #endif
21 
22 #include <vector>
23 
24 #include <base/bind.h>
25 #include <base/bind_helpers.h>
26 #include <base/files/file_path.h>
27 #include <base/files/file_util.h>
28 #include <base/run_loop.h>
29 #include <base/strings/string_number_conversions.h>
30 #include <base/strings/string_split.h>
31 
32 #include <brillo/location_logging.h>
33 #include <brillo/strings/string_utils.h>
34 
35 using base::Closure;
36 
37 namespace {
38 
39 const char kMiscMinorPath[] = "/proc/misc";
40 const char kBinderDriverName[] = "binder";
41 
42 }  // namespace
43 
44 namespace brillo {
45 
46 const int BaseMessageLoop::kInvalidMinor = -1;
47 const int BaseMessageLoop::kUninitializedMinor = -2;
48 
BaseMessageLoop()49 BaseMessageLoop::BaseMessageLoop() {
50   CHECK(!base::MessageLoop::current())
51       << "You can't create a base::MessageLoopForIO when another "
52          "base::MessageLoop is already created for this thread.";
53   owned_base_loop_.reset(new base::MessageLoopForIO);
54   base_loop_ = owned_base_loop_.get();
55 }
56 
BaseMessageLoop(base::MessageLoopForIO * base_loop)57 BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop)
58     : base_loop_(base_loop) {}
59 
~BaseMessageLoop()60 BaseMessageLoop::~BaseMessageLoop() {
61   for (auto& io_task : io_tasks_) {
62     DVLOG_LOC(io_task.second.location(), 1)
63         << "Removing file descriptor watcher task_id " << io_task.first
64         << " leaked on BaseMessageLoop, scheduled from this location.";
65     io_task.second.StopWatching();
66   }
67 
68   // Note all pending canceled delayed tasks when destroying the message loop.
69   size_t lazily_deleted_tasks = 0;
70   for (const auto& delayed_task : delayed_tasks_) {
71     if (delayed_task.second.closure.is_null()) {
72       lazily_deleted_tasks++;
73     } else {
74       DVLOG_LOC(delayed_task.second.location, 1)
75           << "Removing delayed task_id " << delayed_task.first
76           << " leaked on BaseMessageLoop, scheduled from this location.";
77     }
78   }
79   if (lazily_deleted_tasks) {
80     LOG(INFO) << "Leaking " << lazily_deleted_tasks << " canceled tasks.";
81   }
82 }
83 
PostDelayedTask(const base::Location & from_here,const Closure & task,base::TimeDelta delay)84 MessageLoop::TaskId BaseMessageLoop::PostDelayedTask(
85     const base::Location& from_here,
86     const Closure &task,
87     base::TimeDelta delay) {
88   TaskId task_id =  NextTaskId();
89   bool base_scheduled = base_loop_->task_runner()->PostDelayedTask(
90       from_here,
91       base::Bind(&BaseMessageLoop::OnRanPostedTask,
92                  weak_ptr_factory_.GetWeakPtr(),
93                  task_id),
94       delay);
95   DVLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << task_id
96                           << " to run in " << delay << ".";
97   if (!base_scheduled)
98     return MessageLoop::kTaskIdNull;
99 
100   delayed_tasks_.emplace(task_id,
101                          DelayedTask{from_here, task_id, std::move(task)});
102   return task_id;
103 }
104 
WatchFileDescriptor(const base::Location & from_here,int fd,WatchMode mode,bool persistent,const Closure & task)105 MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
106     const base::Location& from_here,
107     int fd,
108     WatchMode mode,
109     bool persistent,
110     const Closure &task) {
111   // base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here.
112   if (fd < 0)
113     return MessageLoop::kTaskIdNull;
114 
115   base::MessagePumpForIO::Mode base_mode = base::MessagePumpForIO::WATCH_READ;
116   switch (mode) {
117     case MessageLoop::kWatchRead:
118       base_mode = base::MessagePumpForIO::WATCH_READ;
119       break;
120     case MessageLoop::kWatchWrite:
121       base_mode = base::MessagePumpForIO::WATCH_WRITE;
122       break;
123     default:
124       return MessageLoop::kTaskIdNull;
125   }
126 
127   TaskId task_id =  NextTaskId();
128   auto it_bool = io_tasks_.emplace(
129       std::piecewise_construct,
130       std::forward_as_tuple(task_id),
131       std::forward_as_tuple(
132           from_here, this, task_id, fd, base_mode, persistent, task));
133   // This should always insert a new element.
134   DCHECK(it_bool.second);
135   bool scheduled = it_bool.first->second.StartWatching();
136   DVLOG_LOC(from_here, 1)
137       << "Watching fd " << fd << " for "
138       << (mode == MessageLoop::kWatchRead ? "reading" : "writing")
139       << (persistent ? " persistently" : " just once")
140       << " as task_id " << task_id
141       << (scheduled ? " successfully" : " failed.");
142 
143   if (!scheduled) {
144     io_tasks_.erase(task_id);
145     return MessageLoop::kTaskIdNull;
146   }
147 
148 #ifndef __ANDROID_HOST__
149   // Determine if the passed fd is the binder file descriptor. For that, we need
150   // to check that is a special char device and that the major and minor device
151   // numbers match. The binder file descriptor can't be removed and added back
152   // to an epoll group when there's work available to be done by the file
153   // descriptor due to bugs in the binder driver (b/26524111) when used with
154   // epoll. Therefore, we flag the binder fd and never attempt to remove it.
155   // This may cause the binder file descriptor to be attended with higher
156   // priority and cause starvation of other events.
157   struct stat buf;
158   if (fstat(fd, &buf) == 0 &&
159       S_ISCHR(buf.st_mode) &&
160       major(buf.st_rdev) == MISC_MAJOR &&
161       minor(buf.st_rdev) == GetBinderMinor()) {
162     it_bool.first->second.RunImmediately();
163   }
164 #endif
165 
166   return task_id;
167 }
168 
CancelTask(TaskId task_id)169 bool BaseMessageLoop::CancelTask(TaskId task_id) {
170   if (task_id == kTaskIdNull)
171     return false;
172   auto delayed_task_it = delayed_tasks_.find(task_id);
173   if (delayed_task_it == delayed_tasks_.end()) {
174     // This might be an IOTask then.
175     auto io_task_it = io_tasks_.find(task_id);
176     if (io_task_it == io_tasks_.end())
177       return false;
178     return io_task_it->second.CancelTask();
179   }
180   // A DelayedTask was found for this task_id at this point.
181 
182   // Check if the callback was already canceled but we have the entry in
183   // delayed_tasks_ since it didn't fire yet in the message loop.
184   if (delayed_task_it->second.closure.is_null())
185     return false;
186 
187   DVLOG_LOC(delayed_task_it->second.location, 1)
188       << "Removing task_id " << task_id << " scheduled from this location.";
189   // We reset to closure to a null Closure to release all the resources
190   // used by this closure at this point, but we don't remove the task_id from
191   // delayed_tasks_ since we can't tell base::MessageLoopForIO to not run it.
192   delayed_task_it->second.closure = Closure();
193 
194   return true;
195 }
196 
RunOnce(bool may_block)197 bool BaseMessageLoop::RunOnce(bool may_block) {
198   run_once_ = true;
199   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
200   base_run_loop_ = &run_loop;
201   if (!may_block)
202     run_loop.RunUntilIdle();
203   else
204     run_loop.Run();
205   base_run_loop_ = nullptr;
206   // If the flag was reset to false, it means a closure was run.
207   if (!run_once_)
208     return true;
209 
210   run_once_ = false;
211   return false;
212 }
213 
Run()214 void BaseMessageLoop::Run() {
215   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
216   base_run_loop_ = &run_loop;
217   run_loop.Run();
218   base_run_loop_ = nullptr;
219 }
220 
BreakLoop()221 void BaseMessageLoop::BreakLoop() {
222   if (base_run_loop_ == nullptr) {
223     DVLOG(1) << "Message loop not running, ignoring BreakLoop().";
224     return;  // Message loop not running, nothing to do.
225   }
226   base_run_loop_->Quit();
227 }
228 
QuitClosure() const229 Closure BaseMessageLoop::QuitClosure() const {
230   if (base_run_loop_ == nullptr)
231     return base::DoNothing();
232   return base_run_loop_->QuitClosure();
233 }
234 
NextTaskId()235 MessageLoop::TaskId BaseMessageLoop::NextTaskId() {
236   TaskId res;
237   do {
238     res = ++last_id_;
239     // We would run out of memory before we run out of task ids.
240   } while (!res ||
241            delayed_tasks_.find(res) != delayed_tasks_.end() ||
242            io_tasks_.find(res) != io_tasks_.end());
243   return res;
244 }
245 
OnRanPostedTask(MessageLoop::TaskId task_id)246 void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
247   auto task_it = delayed_tasks_.find(task_id);
248   DCHECK(task_it != delayed_tasks_.end());
249   if (!task_it->second.closure.is_null()) {
250     DVLOG_LOC(task_it->second.location, 1)
251         << "Running delayed task_id " << task_id
252         << " scheduled from this location.";
253     // Mark the task as canceled while we are running it so CancelTask returns
254     // false.
255     Closure closure = std::move(task_it->second.closure);
256     task_it->second.closure = Closure();
257     closure.Run();
258 
259     // If the |run_once_| flag is set, it is because we are instructed to run
260     // only once callback.
261     if (run_once_) {
262       run_once_ = false;
263       BreakLoop();
264     }
265   }
266   delayed_tasks_.erase(task_it);
267 }
268 
OnFileReadyPostedTask(MessageLoop::TaskId task_id)269 void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
270   auto task_it = io_tasks_.find(task_id);
271   // Even if this task was canceled while we were waiting in the message loop
272   // for this method to run, the entry in io_tasks_ should still be present, but
273   // won't do anything.
274   DCHECK(task_it != io_tasks_.end());
275   task_it->second.OnFileReadyPostedTask();
276 }
277 
ParseBinderMinor(const std::string & file_contents)278 int BaseMessageLoop::ParseBinderMinor(
279     const std::string& file_contents) {
280   int result = kInvalidMinor;
281   // Split along '\n', then along the ' '. Note that base::SplitString trims all
282   // white spaces at the beginning and end after splitting.
283   std::vector<std::string> lines =
284       base::SplitString(file_contents, "\n", base::TRIM_WHITESPACE,
285                         base::SPLIT_WANT_ALL);
286   for (const std::string& line : lines) {
287     if (line.empty())
288       continue;
289     std::string number;
290     std::string name;
291     if (!string_utils::SplitAtFirst(line, " ", &number, &name, false))
292       continue;
293 
294     if (name == kBinderDriverName && base::StringToInt(number, &result))
295       break;
296   }
297   return result;
298 }
299 
GetBinderMinor()300 unsigned int BaseMessageLoop::GetBinderMinor() {
301   if (binder_minor_ != kUninitializedMinor)
302     return binder_minor_;
303 
304   std::string proc_misc;
305   if (!base::ReadFileToString(base::FilePath(kMiscMinorPath), &proc_misc))
306     return binder_minor_;
307   binder_minor_ = ParseBinderMinor(proc_misc);
308   return binder_minor_;
309 }
310 
IOTask(const base::Location & location,BaseMessageLoop * loop,MessageLoop::TaskId task_id,int fd,base::MessagePumpForIO::Mode base_mode,bool persistent,const Closure & task)311 BaseMessageLoop::IOTask::IOTask(const base::Location& location,
312                                 BaseMessageLoop* loop,
313                                 MessageLoop::TaskId task_id,
314                                 int fd,
315                                 base::MessagePumpForIO::Mode base_mode,
316                                 bool persistent,
317                                 const Closure& task)
318     : location_(location), loop_(loop), task_id_(task_id),
319       fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task),
320       fd_watcher_(FROM_HERE) {}
321 
StartWatching()322 bool BaseMessageLoop::IOTask::StartWatching() {
323   // Please see MessagePumpLibevent for definition.
324   static_assert(std::is_same<base::MessagePumpForIO, base::MessagePumpLibevent>::value,
325                 "MessagePumpForIO::WatchFileDescriptor is not supported "
326                 "when MessagePumpForIO is not a MessagePumpLibevent.");
327 
328   return static_cast<base::MessagePumpLibevent*>(
329       loop_->base_loop_->pump_.get())->WatchFileDescriptor(
330           fd_, persistent_, base_mode_, &fd_watcher_, this);
331 }
332 
StopWatching()333 void BaseMessageLoop::IOTask::StopWatching() {
334   // This is safe to call even if we are not watching for it.
335   fd_watcher_.StopWatchingFileDescriptor();
336 }
337 
OnFileCanReadWithoutBlocking(int)338 void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
339   OnFileReady();
340 }
341 
OnFileCanWriteWithoutBlocking(int)342 void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
343   OnFileReady();
344 }
345 
OnFileReady()346 void BaseMessageLoop::IOTask::OnFileReady() {
347   // For file descriptors marked with the immediate_run flag, we don't call
348   // StopWatching() and wait, instead we dispatch the callback immediately.
349   if (immediate_run_) {
350     posted_task_pending_ = true;
351     OnFileReadyPostedTask();
352     return;
353   }
354 
355   // When the file descriptor becomes available we stop watching for it and
356   // schedule a task to run the callback from the main loop. The callback will
357   // run using the same scheduler used to run other delayed tasks, avoiding
358   // starvation of the available posted tasks if there are file descriptors
359   // always available. The new posted task will use the same TaskId as the
360   // current file descriptor watching task an could be canceled in either state,
361   // when waiting for the file descriptor or waiting in the main loop.
362   StopWatching();
363   bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
364       location_,
365       base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
366                  loop_->weak_ptr_factory_.GetWeakPtr(),
367                  task_id_));
368   posted_task_pending_ = true;
369   if (base_scheduled) {
370     DVLOG_LOC(location_, 1)
371         << "Dispatching task_id " << task_id_ << " for "
372         << (base_mode_ == base::MessagePumpForIO::WATCH_READ ?
373             "reading" : "writing")
374         << " file descriptor " << fd_ << ", scheduled from this location.";
375   } else {
376     // In the rare case that PostTask() fails, we fall back to run it directly.
377     // This would indicate a bigger problem with the message loop setup.
378     LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
379     OnFileReadyPostedTask();
380   }
381 }
382 
OnFileReadyPostedTask()383 void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
384   // We can't access |this| after running the |closure_| since it could call
385   // CancelTask on its own task_id, so we copy the members we need now.
386   BaseMessageLoop* loop_ptr = loop_;
387   DCHECK(posted_task_pending_ = true);
388   posted_task_pending_ = false;
389 
390   // If this task was already canceled, the closure will be null and there is
391   // nothing else to do here. This execution doesn't count a step for RunOnce()
392   // unless we have a callback to run.
393   if (closure_.is_null()) {
394     loop_->io_tasks_.erase(task_id_);
395     return;
396   }
397 
398   DVLOG_LOC(location_, 1)
399       << "Running task_id " << task_id_ << " for "
400       << (base_mode_ == base::MessagePumpForIO::WATCH_READ ?
401           "reading" : "writing")
402       << " file descriptor " << fd_ << ", scheduled from this location.";
403 
404   if (persistent_) {
405     // In the persistent case we just run the callback. If this callback cancels
406     // the task id, we can't access |this| anymore, so we re-start watching the
407     // file descriptor before running the callback, unless this is a fd where
408     // we didn't stop watching the file descriptor when it became available.
409     if (!immediate_run_)
410       StartWatching();
411     closure_.Run();
412   } else {
413     // This will destroy |this|, the fd_watcher and therefore stop watching this
414     // file descriptor.
415     Closure closure_copy = std::move(closure_);
416     loop_->io_tasks_.erase(task_id_);
417     // Run the closure from the local copy we just made.
418     closure_copy.Run();
419   }
420 
421   if (loop_ptr->run_once_) {
422     loop_ptr->run_once_ = false;
423     loop_ptr->BreakLoop();
424   }
425 }
426 
CancelTask()427 bool BaseMessageLoop::IOTask::CancelTask() {
428   if (closure_.is_null())
429     return false;
430 
431   DVLOG_LOC(location_, 1)
432       << "Removing task_id " << task_id_ << " scheduled from this location.";
433 
434   if (!posted_task_pending_) {
435     // Destroying the FileDescriptorWatcher implicitly stops watching the file
436     // descriptor. This will delete our instance.
437     loop_->io_tasks_.erase(task_id_);
438     return true;
439   }
440   // The IOTask is waiting for the message loop to run its delayed task, so
441   // it is not watching for the file descriptor. We release the closure
442   // resources now but keep the IOTask instance alive while we wait for the
443   // callback to run and delete the IOTask.
444   closure_ = Closure();
445   return true;
446 }
447 
448 }  // namespace brillo
449