1 // Copyright 2015 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <brillo/message_loops/base_message_loop.h>
6 
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 #ifndef __ANDROID_HOST__
13 // Used for MISC_MAJOR. Only required for the target and not always available
14 // for the host.
15 #include <linux/major.h>
16 #endif
17 
18 #include <vector>
19 
20 #include <base/bind.h>
21 #include <base/files/file_path.h>
22 #include <base/files/file_util.h>
23 #include <base/run_loop.h>
24 #include <base/strings/string_number_conversions.h>
25 #include <base/strings/string_split.h>
26 
27 #include <brillo/location_logging.h>
28 #include <brillo/strings/string_utils.h>
29 
30 using base::Closure;
31 
namespace {

// procfs file that is read to look up misc device minor numbers; it is parsed
// as one "<minor> <name>" entry per line.
constexpr char kMiscMinorPath[] = "/proc/misc";
// Device name identifying the binder driver entry in |kMiscMinorPath|.
constexpr char kBinderDriverName[] = "binder";

}  // namespace
38 
39 namespace brillo {
40 
41 const int BaseMessageLoop::kInvalidMinor = -1;
42 const int BaseMessageLoop::kUninitializedMinor = -2;
43 
BaseMessageLoop()44 BaseMessageLoop::BaseMessageLoop() {
45   CHECK(!base::MessageLoop::current())
46       << "You can't create a base::MessageLoopForIO when another "
47          "base::MessageLoop is already created for this thread.";
48   owned_base_loop_.reset(new base::MessageLoopForIO);
49   base_loop_ = owned_base_loop_.get();
50 }
51 
BaseMessageLoop(base::MessageLoopForIO * base_loop)52 BaseMessageLoop::BaseMessageLoop(base::MessageLoopForIO* base_loop)
53     : base_loop_(base_loop) {}
54 
~BaseMessageLoop()55 BaseMessageLoop::~BaseMessageLoop() {
56   for (auto& io_task : io_tasks_) {
57     DVLOG_LOC(io_task.second.location(), 1)
58         << "Removing file descriptor watcher task_id " << io_task.first
59         << " leaked on BaseMessageLoop, scheduled from this location.";
60     io_task.second.StopWatching();
61   }
62 
63   // Note all pending canceled delayed tasks when destroying the message loop.
64   size_t lazily_deleted_tasks = 0;
65   for (const auto& delayed_task : delayed_tasks_) {
66     if (delayed_task.second.closure.is_null()) {
67       lazily_deleted_tasks++;
68     } else {
69       DVLOG_LOC(delayed_task.second.location, 1)
70           << "Removing delayed task_id " << delayed_task.first
71           << " leaked on BaseMessageLoop, scheduled from this location.";
72     }
73   }
74   if (lazily_deleted_tasks) {
75     LOG(INFO) << "Leaking " << lazily_deleted_tasks << " canceled tasks.";
76   }
77 }
78 
PostDelayedTask(const tracked_objects::Location & from_here,const Closure & task,base::TimeDelta delay)79 MessageLoop::TaskId BaseMessageLoop::PostDelayedTask(
80     const tracked_objects::Location& from_here,
81     const Closure &task,
82     base::TimeDelta delay) {
83   TaskId task_id =  NextTaskId();
84   bool base_scheduled = base_loop_->task_runner()->PostDelayedTask(
85       from_here,
86       base::Bind(&BaseMessageLoop::OnRanPostedTask,
87                  weak_ptr_factory_.GetWeakPtr(),
88                  task_id),
89       delay);
90   DVLOG_LOC(from_here, 1) << "Scheduling delayed task_id " << task_id
91                           << " to run in " << delay << ".";
92   if (!base_scheduled)
93     return MessageLoop::kTaskIdNull;
94 
95   delayed_tasks_.emplace(task_id,
96                          DelayedTask{from_here, task_id, std::move(task)});
97   return task_id;
98 }
99 
WatchFileDescriptor(const tracked_objects::Location & from_here,int fd,WatchMode mode,bool persistent,const Closure & task)100 MessageLoop::TaskId BaseMessageLoop::WatchFileDescriptor(
101     const tracked_objects::Location& from_here,
102     int fd,
103     WatchMode mode,
104     bool persistent,
105     const Closure &task) {
106   // base::MessageLoopForIO CHECKS that "fd >= 0", so we handle that case here.
107   if (fd < 0)
108     return MessageLoop::kTaskIdNull;
109 
110   base::MessageLoopForIO::Mode base_mode = base::MessageLoopForIO::WATCH_READ;
111   switch (mode) {
112     case MessageLoop::kWatchRead:
113       base_mode = base::MessageLoopForIO::WATCH_READ;
114       break;
115     case MessageLoop::kWatchWrite:
116       base_mode = base::MessageLoopForIO::WATCH_WRITE;
117       break;
118     default:
119       return MessageLoop::kTaskIdNull;
120   }
121 
122   TaskId task_id =  NextTaskId();
123   auto it_bool = io_tasks_.emplace(
124       std::piecewise_construct,
125       std::forward_as_tuple(task_id),
126       std::forward_as_tuple(
127           from_here, this, task_id, fd, base_mode, persistent, task));
128   // This should always insert a new element.
129   DCHECK(it_bool.second);
130   bool scheduled = it_bool.first->second.StartWatching();
131   DVLOG_LOC(from_here, 1)
132       << "Watching fd " << fd << " for "
133       << (mode == MessageLoop::kWatchRead ? "reading" : "writing")
134       << (persistent ? " persistently" : " just once")
135       << " as task_id " << task_id
136       << (scheduled ? " successfully" : " failed.");
137 
138   if (!scheduled) {
139     io_tasks_.erase(task_id);
140     return MessageLoop::kTaskIdNull;
141   }
142 
143 #ifndef __ANDROID_HOST__
144   // Determine if the passed fd is the binder file descriptor. For that, we need
145   // to check that is a special char device and that the major and minor device
146   // numbers match. The binder file descriptor can't be removed and added back
147   // to an epoll group when there's work available to be done by the file
148   // descriptor due to bugs in the binder driver (b/26524111) when used with
149   // epoll. Therefore, we flag the binder fd and never attempt to remove it.
150   // This may cause the binder file descriptor to be attended with higher
151   // priority and cause starvation of other events.
152   struct stat buf;
153   if (fstat(fd, &buf) == 0 &&
154       S_ISCHR(buf.st_mode) &&
155       major(buf.st_rdev) == MISC_MAJOR &&
156       minor(buf.st_rdev) == GetBinderMinor()) {
157     it_bool.first->second.RunImmediately();
158   }
159 #endif
160 
161   return task_id;
162 }
163 
CancelTask(TaskId task_id)164 bool BaseMessageLoop::CancelTask(TaskId task_id) {
165   if (task_id == kTaskIdNull)
166     return false;
167   auto delayed_task_it = delayed_tasks_.find(task_id);
168   if (delayed_task_it == delayed_tasks_.end()) {
169     // This might be an IOTask then.
170     auto io_task_it = io_tasks_.find(task_id);
171     if (io_task_it == io_tasks_.end())
172       return false;
173     return io_task_it->second.CancelTask();
174   }
175   // A DelayedTask was found for this task_id at this point.
176 
177   // Check if the callback was already canceled but we have the entry in
178   // delayed_tasks_ since it didn't fire yet in the message loop.
179   if (delayed_task_it->second.closure.is_null())
180     return false;
181 
182   DVLOG_LOC(delayed_task_it->second.location, 1)
183       << "Removing task_id " << task_id << " scheduled from this location.";
184   // We reset to closure to a null Closure to release all the resources
185   // used by this closure at this point, but we don't remove the task_id from
186   // delayed_tasks_ since we can't tell base::MessageLoopForIO to not run it.
187   delayed_task_it->second.closure = Closure();
188 
189   return true;
190 }
191 
RunOnce(bool may_block)192 bool BaseMessageLoop::RunOnce(bool may_block) {
193   run_once_ = true;
194   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
195   base_run_loop_ = &run_loop;
196   if (!may_block)
197     run_loop.RunUntilIdle();
198   else
199     run_loop.Run();
200   base_run_loop_ = nullptr;
201   // If the flag was reset to false, it means a closure was run.
202   if (!run_once_)
203     return true;
204 
205   run_once_ = false;
206   return false;
207 }
208 
Run()209 void BaseMessageLoop::Run() {
210   base::RunLoop run_loop;  // Uses the base::MessageLoopForIO implicitly.
211   base_run_loop_ = &run_loop;
212   run_loop.Run();
213   base_run_loop_ = nullptr;
214 }
215 
BreakLoop()216 void BaseMessageLoop::BreakLoop() {
217   if (base_run_loop_ == nullptr) {
218     DVLOG(1) << "Message loop not running, ignoring BreakLoop().";
219     return;  // Message loop not running, nothing to do.
220   }
221   base_run_loop_->Quit();
222 }
223 
QuitClosure() const224 Closure BaseMessageLoop::QuitClosure() const {
225   if (base_run_loop_ == nullptr)
226     return base::Bind(&base::DoNothing);
227   return base_run_loop_->QuitClosure();
228 }
229 
NextTaskId()230 MessageLoop::TaskId BaseMessageLoop::NextTaskId() {
231   TaskId res;
232   do {
233     res = ++last_id_;
234     // We would run out of memory before we run out of task ids.
235   } while (!res ||
236            delayed_tasks_.find(res) != delayed_tasks_.end() ||
237            io_tasks_.find(res) != io_tasks_.end());
238   return res;
239 }
240 
OnRanPostedTask(MessageLoop::TaskId task_id)241 void BaseMessageLoop::OnRanPostedTask(MessageLoop::TaskId task_id) {
242   auto task_it = delayed_tasks_.find(task_id);
243   DCHECK(task_it != delayed_tasks_.end());
244   if (!task_it->second.closure.is_null()) {
245     DVLOG_LOC(task_it->second.location, 1)
246         << "Running delayed task_id " << task_id
247         << " scheduled from this location.";
248     // Mark the task as canceled while we are running it so CancelTask returns
249     // false.
250     Closure closure = std::move(task_it->second.closure);
251     task_it->second.closure = Closure();
252     closure.Run();
253 
254     // If the |run_once_| flag is set, it is because we are instructed to run
255     // only once callback.
256     if (run_once_) {
257       run_once_ = false;
258       BreakLoop();
259     }
260   }
261   delayed_tasks_.erase(task_it);
262 }
263 
OnFileReadyPostedTask(MessageLoop::TaskId task_id)264 void BaseMessageLoop::OnFileReadyPostedTask(MessageLoop::TaskId task_id) {
265   auto task_it = io_tasks_.find(task_id);
266   // Even if this task was canceled while we were waiting in the message loop
267   // for this method to run, the entry in io_tasks_ should still be present, but
268   // won't do anything.
269   DCHECK(task_it != io_tasks_.end());
270   task_it->second.OnFileReadyPostedTask();
271 }
272 
ParseBinderMinor(const std::string & file_contents)273 int BaseMessageLoop::ParseBinderMinor(
274     const std::string& file_contents) {
275   int result = kInvalidMinor;
276   // Split along '\n', then along the ' '. Note that base::SplitString trims all
277   // white spaces at the beginning and end after splitting.
278   std::vector<std::string> lines =
279       base::SplitString(file_contents, "\n", base::TRIM_WHITESPACE,
280                         base::SPLIT_WANT_ALL);
281   for (const std::string& line : lines) {
282     if (line.empty())
283       continue;
284     std::string number;
285     std::string name;
286     if (!string_utils::SplitAtFirst(line, " ", &number, &name, false))
287       continue;
288 
289     if (name == kBinderDriverName && base::StringToInt(number, &result))
290       break;
291   }
292   return result;
293 }
294 
GetBinderMinor()295 unsigned int BaseMessageLoop::GetBinderMinor() {
296   if (binder_minor_ != kUninitializedMinor)
297     return binder_minor_;
298 
299   std::string proc_misc;
300   if (!base::ReadFileToString(base::FilePath(kMiscMinorPath), &proc_misc))
301     return binder_minor_;
302   binder_minor_ = ParseBinderMinor(proc_misc);
303   return binder_minor_;
304 }
305 
IOTask(const tracked_objects::Location & location,BaseMessageLoop * loop,MessageLoop::TaskId task_id,int fd,base::MessageLoopForIO::Mode base_mode,bool persistent,const Closure & task)306 BaseMessageLoop::IOTask::IOTask(const tracked_objects::Location& location,
307                                 BaseMessageLoop* loop,
308                                 MessageLoop::TaskId task_id,
309                                 int fd,
310                                 base::MessageLoopForIO::Mode base_mode,
311                                 bool persistent,
312                                 const Closure& task)
313     : location_(location), loop_(loop), task_id_(task_id),
314       fd_(fd), base_mode_(base_mode), persistent_(persistent), closure_(task) {}
315 
StartWatching()316 bool BaseMessageLoop::IOTask::StartWatching() {
317   return loop_->base_loop_->WatchFileDescriptor(
318       fd_, persistent_, base_mode_, &fd_watcher_, this);
319 }
320 
StopWatching()321 void BaseMessageLoop::IOTask::StopWatching() {
322   // This is safe to call even if we are not watching for it.
323   fd_watcher_.StopWatchingFileDescriptor();
324 }
325 
OnFileCanReadWithoutBlocking(int)326 void BaseMessageLoop::IOTask::OnFileCanReadWithoutBlocking(int /* fd */) {
327   OnFileReady();
328 }
329 
OnFileCanWriteWithoutBlocking(int)330 void BaseMessageLoop::IOTask::OnFileCanWriteWithoutBlocking(int /* fd */) {
331   OnFileReady();
332 }
333 
OnFileReady()334 void BaseMessageLoop::IOTask::OnFileReady() {
335   // For file descriptors marked with the immediate_run flag, we don't call
336   // StopWatching() and wait, instead we dispatch the callback immediately.
337   if (immediate_run_) {
338     posted_task_pending_ = true;
339     OnFileReadyPostedTask();
340     return;
341   }
342 
343   // When the file descriptor becomes available we stop watching for it and
344   // schedule a task to run the callback from the main loop. The callback will
345   // run using the same scheduler used to run other delayed tasks, avoiding
346   // starvation of the available posted tasks if there are file descriptors
347   // always available. The new posted task will use the same TaskId as the
348   // current file descriptor watching task an could be canceled in either state,
349   // when waiting for the file descriptor or waiting in the main loop.
350   StopWatching();
351   bool base_scheduled = loop_->base_loop_->task_runner()->PostTask(
352       location_,
353       base::Bind(&BaseMessageLoop::OnFileReadyPostedTask,
354                  loop_->weak_ptr_factory_.GetWeakPtr(),
355                  task_id_));
356   posted_task_pending_ = true;
357   if (base_scheduled) {
358     DVLOG_LOC(location_, 1)
359         << "Dispatching task_id " << task_id_ << " for "
360         << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
361             "reading" : "writing")
362         << " file descriptor " << fd_ << ", scheduled from this location.";
363   } else {
364     // In the rare case that PostTask() fails, we fall back to run it directly.
365     // This would indicate a bigger problem with the message loop setup.
366     LOG(ERROR) << "Error on base::MessageLoopForIO::PostTask().";
367     OnFileReadyPostedTask();
368   }
369 }
370 
OnFileReadyPostedTask()371 void BaseMessageLoop::IOTask::OnFileReadyPostedTask() {
372   // We can't access |this| after running the |closure_| since it could call
373   // CancelTask on its own task_id, so we copy the members we need now.
374   BaseMessageLoop* loop_ptr = loop_;
375   DCHECK(posted_task_pending_ = true);
376   posted_task_pending_ = false;
377 
378   // If this task was already canceled, the closure will be null and there is
379   // nothing else to do here. This execution doesn't count a step for RunOnce()
380   // unless we have a callback to run.
381   if (closure_.is_null()) {
382     loop_->io_tasks_.erase(task_id_);
383     return;
384   }
385 
386   DVLOG_LOC(location_, 1)
387       << "Running task_id " << task_id_ << " for "
388       << (base_mode_ == base::MessageLoopForIO::WATCH_READ ?
389           "reading" : "writing")
390       << " file descriptor " << fd_ << ", scheduled from this location.";
391 
392   if (persistent_) {
393     // In the persistent case we just run the callback. If this callback cancels
394     // the task id, we can't access |this| anymore, so we re-start watching the
395     // file descriptor before running the callback, unless this is a fd where
396     // we didn't stop watching the file descriptor when it became available.
397     if (!immediate_run_)
398       StartWatching();
399     closure_.Run();
400   } else {
401     // This will destroy |this|, the fd_watcher and therefore stop watching this
402     // file descriptor.
403     Closure closure_copy = std::move(closure_);
404     loop_->io_tasks_.erase(task_id_);
405     // Run the closure from the local copy we just made.
406     closure_copy.Run();
407   }
408 
409   if (loop_ptr->run_once_) {
410     loop_ptr->run_once_ = false;
411     loop_ptr->BreakLoop();
412   }
413 }
414 
CancelTask()415 bool BaseMessageLoop::IOTask::CancelTask() {
416   if (closure_.is_null())
417     return false;
418 
419   DVLOG_LOC(location_, 1)
420       << "Removing task_id " << task_id_ << " scheduled from this location.";
421 
422   if (!posted_task_pending_) {
423     // Destroying the FileDescriptorWatcher implicitly stops watching the file
424     // descriptor. This will delete our instance.
425     loop_->io_tasks_.erase(task_id_);
426     return true;
427   }
428   // The IOTask is waiting for the message loop to run its delayed task, so
429   // it is not watching for the file descriptor. We release the closure
430   // resources now but keep the IOTask instance alive while we wait for the
431   // callback to run and delete the IOTask.
432   closure_ = Closure();
433   return true;
434 }
435 
436 }  // namespace brillo
437