1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/message_pump/message_pump_mojo.h"
6 
7 #include <stdint.h>
8 
9 #include <algorithm>
10 #include <map>
11 #include <vector>
12 
13 #include "base/containers/small_map.h"
14 #include "base/debug/alias.h"
15 #include "base/lazy_instance.h"
16 #include "base/logging.h"
17 #include "base/threading/thread_local.h"
18 #include "base/threading/thread_restrictions.h"
19 #include "base/time/time.h"
20 #include "mojo/message_pump/message_pump_mojo_handler.h"
21 #include "mojo/message_pump/time_helper.h"
22 #include "mojo/public/c/system/wait_set.h"
23 
24 namespace mojo {
25 namespace common {
26 namespace {
27 
// Thread-local slot holding the MessagePumpMojo that belongs to the current
// thread. It is set in the MessagePumpMojo constructor, cleared in the
// destructor and read through MessagePumpMojo::current().
base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo>>::Leaky
    g_tls_current_pump = LAZY_INSTANCE_INITIALIZER;
30 
// Converts the absolute time |time_ticks| into a Mojo deadline measured in
// microseconds from |now|.
//
// Returns MOJO_DEADLINE_INDEFINITE when |time_ticks| is null, 0 when
// |time_ticks| is already in the past, and the remaining microseconds
// otherwise.
MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
                                     base::TimeTicks now) {
  // The is_null() check matches that of HandleWatcher as well as how
  // |delayed_work_time| is used.
  if (time_ticks.is_null())
    return MOJO_DEADLINE_INDEFINITE;

  const int64_t remaining_us = (time_ticks - now).InMicroseconds();
  // Clamp negative deltas to zero before the cast so a deadline that has
  // already passed is never turned into a very long wait.
  if (remaining_us < 0)
    return static_cast<MojoDeadline>(0);
  return static_cast<MojoDeadline>(remaining_us);
}
41 
42 }  // namespace
43 
// State belonging to one Run() invocation. Run() keeps an instance on its
// stack and publishes a pointer to it through |run_state_|.
struct MessagePumpMojo::RunState {
  RunState() = default;

  // Time at which the next delayed work item is due; written by
  // ScheduleDelayedWork() and by the delegate's DoDelayedWork().
  base::TimeTicks delayed_work_time;

  // Set by Quit(); DoRunLoop() leaves its loop once this is true.
  bool should_quit = false;
};
51 
// Registers this pump as the current thread's pump, creates the control
// message pipe used to wake the pump, creates the wait set, and adds the
// control pipe's read end to that wait set. Any Mojo failure during setup is
// fatal (CHECK).
MessagePumpMojo::MessagePumpMojo()
    : run_state_(nullptr),
      next_handler_id_(0),
      event_(base::WaitableEvent::ResetPolicy::AUTOMATIC,
             base::WaitableEvent::InitialState::NOT_SIGNALED) {
  // Only one MessagePumpMojo may exist per thread (debug-only check).
  DCHECK(!current())
      << "There is already a MessagePumpMojo instance on this thread.";
  g_tls_current_pump.Pointer()->Set(this);

  // Control pipe: SignalControlPipe() writes to |write_handle_| and the run
  // loop observes |read_handle_| becoming readable.
  MojoResult result = CreateMessagePipe(nullptr, &read_handle_, &write_handle_);
  CHECK_EQ(result, MOJO_RESULT_OK);
  CHECK(read_handle_.is_valid());
  CHECK(write_handle_.is_valid());

  // The raw handle is only consumed after the CHECK_EQ below confirms that
  // MojoCreateWaitSet() succeeded.
  MojoHandle raw_wait_set;
  result = MojoCreateWaitSet(&raw_wait_set);
  CHECK_EQ(result, MOJO_RESULT_OK);
  wait_set_handle_.reset(Handle(raw_wait_set));
  CHECK(wait_set_handle_.is_valid());

  result =
      MojoAddHandle(wait_set_handle_.get().value(), read_handle_.get().value(),
                    MOJO_HANDLE_SIGNAL_READABLE);
  CHECK_EQ(result, MOJO_RESULT_OK);
}
77 
// Clears the thread-local pointer to this pump. The DCHECK documents that the
// pump is expected to be destroyed on the thread that created it.
MessagePumpMojo::~MessagePumpMojo() {
  DCHECK_EQ(this, current());
  g_tls_current_pump.Pointer()->Set(nullptr);
}
82 
// static
// Factory returning a newly constructed MessagePumpMojo owned by the caller
// through the base::MessagePump interface.
std::unique_ptr<base::MessagePump> MessagePumpMojo::Create() {
  std::unique_ptr<MessagePump> pump(new MessagePumpMojo());
  return pump;
}
87 
// static
// Returns the MessagePumpMojo registered for the calling thread, or null when
// the thread-local slot has not been set.
MessagePumpMojo* MessagePumpMojo::current() {
  auto* const tls_slot = g_tls_current_pump.Pointer();
  return tls_slot->Get();
}
92 
// Registers |handler| to be notified about |handle|.
//
// |wait_signals| is passed to the wait set for |handle|. A non-null
// |deadline| additionally records the handle in |deadline_handles_| so that
// RemoveExpiredHandles() can time it out. Registering a handle that already
// has a handler is fatal.
void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
                                 const Handle& handle,
                                 MojoHandleSignals wait_signals,
                                 base::TimeTicks deadline) {
  CHECK(handler);
  // NOTE(review): handle validity is only a DCHECK; where DCHECKs are compiled
  // out an invalid handle is still recorded in |handlers_| below.
  DCHECK(handle.is_valid());
  // Assume it's an error if someone tries to reregister an existing handle.
  CHECK_EQ(0u, handlers_.count(handle));

  // Each registration gets a fresh id. ProcessReadyHandles() and
  // RemoveExpiredHandles() compare ids to detect a handle that was removed
  // and re-added while they were iterating.
  Handler handler_data;
  handler_data.id = next_handler_id_++;
  handler_data.handler = handler;
  handler_data.wait_signals = wait_signals;
  handler_data.deadline = deadline;
  handlers_[handle] = handler_data;

  const bool has_deadline = !deadline.is_null();
  if (has_deadline) {
    bool inserted = deadline_handles_.insert(handle).second;
    DCHECK(inserted);
  }

  MojoResult result = MojoAddHandle(wait_set_handle_.get().value(),
                                    handle.value(), wait_signals);
  // Because stopping a HandleWatcher is now asynchronous, it's possible for the
  // handle to no longer be open at this point.
  CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_INVALID_ARGUMENT);
}
118 
// Removes |handle| from the wait set and forgets its handler and deadline
// bookkeeping. Removing a handle that is closed or was already removed is
// tolerated; any other Mojo result is fatal.
void MessagePumpMojo::RemoveHandler(const Handle& handle) {
  MojoResult result =
      MojoRemoveHandle(wait_set_handle_.get().value(), handle.value());
  // At this point, it's possible that the handle has been closed, which would
  // cause MojoRemoveHandle() to return MOJO_RESULT_INVALID_ARGUMENT. It's also
  // possible for the handle to have already been removed, so all of the
  // possible error codes are valid here.
  CHECK(result == MOJO_RESULT_OK || result == MOJO_RESULT_NOT_FOUND ||
        result == MOJO_RESULT_INVALID_ARGUMENT);

  // The two containers are kept in lock-step; both erases are no-ops when the
  // handle is not present.
  deadline_handles_.erase(handle);
  handlers_.erase(handle);
}
132 
// Adds |observer| to the list notified by WillSignalHandler() and
// DidSignalHandler().
void MessagePumpMojo::AddObserver(Observer* observer) {
  observers_.AddObserver(observer);
}
136 
// Removes |observer| from the list notified by WillSignalHandler() and
// DidSignalHandler().
void MessagePumpMojo::RemoveObserver(Observer* observer) {
  observers_.RemoveObserver(observer);
}
140 
// Runs the pump loop for |delegate| until Quit() is called.
//
// A RunState on this stack frame is published through |run_state_| for the
// duration of the loop. The previous pointer is saved first and restored on
// the way out, so an outer Run() (if any) gets its own state back.
void MessagePumpMojo::Run(Delegate* delegate) {
  RunState run_state;
  RunState* old_state = nullptr;
  {
    // |run_state_| is only read or written while holding |run_state_lock_|.
    base::AutoLock auto_lock(run_state_lock_);
    old_state = run_state_;
    run_state_ = &run_state;
  }

  DoRunLoop(&run_state, delegate);

  {
    // Restore before |run_state| goes out of scope so |run_state_| never
    // points at a dead stack object.
    base::AutoLock auto_lock(run_state_lock_);
    run_state_ = old_state;
  }
}
155 
// Asks the currently published run state, if there is one, to stop. Does
// nothing when no Run() is active.
void MessagePumpMojo::Quit() {
  base::AutoLock auto_lock(run_state_lock_);
  if (!run_state_)
    return;
  run_state_->should_quit = true;
}
161 
// Wakes the pump by signalling the control pipe.
void MessagePumpMojo::ScheduleWork() {
  SignalControlPipe();
}
165 
// Records |delayed_work_time| in the currently published run state. Does
// nothing when no Run() is active.
void MessagePumpMojo::ScheduleDelayedWork(
    const base::TimeTicks& delayed_work_time) {
  base::AutoLock auto_lock(run_state_lock_);
  if (run_state_)
    run_state_->delayed_work_time = delayed_work_time;
}
173 
// The pump loop. Each iteration services Mojo handles (or, once the control
// pipe's read handle has been released, only the fallback event), then the
// delegate's immediate, delayed and idle work. |run_state->should_quit| is
// re-checked after every step that can run caller code.
void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
  bool more_work_is_plausible = true;
  while (true) {
    // Only block when the previous iteration found nothing to do.
    const bool block = !more_work_is_plausible;
    more_work_is_plausible = read_handle_.is_valid()
                                 ? DoInternalWork(*run_state, block)
                                 : DoNonMojoWork(*run_state, block);
    if (run_state->should_quit)
      break;

    more_work_is_plausible |= delegate->DoWork();
    if (run_state->should_quit)
      break;

    more_work_is_plausible |=
        delegate->DoDelayedWork(&run_state->delayed_work_time);
    if (run_state->should_quit)
      break;

    // Idle work only runs when nothing above reported activity.
    if (!more_work_is_plausible) {
      more_work_is_plausible = delegate->DoIdleWork();
      if (run_state->should_quit)
        break;
    }
  }
}
204 
// Services the wait set once: optionally blocks until something is ready,
// dispatches ready handles, then times out handlers whose deadline passed.
// Returns true when any of those steps reported work.
bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
  // If the wait isn't blocking (deadline == 0), there's no point in waiting.
  // Wait sets do not require a wait operation to be performed in order to
  // retrieve any ready handles. Performing a wait with deadline == 0 is
  // unnecessary work.
  const bool woke_up = block && WaitForReadyHandles(run_state);

  // Both steps always run; their results are evaluated into locals first so
  // that neither call is skipped by short-circuiting.
  const bool processed = ProcessReadyHandles();
  const bool expired = RemoveExpiredHandles();

  return woke_up || processed || expired;
}
220 
// Fallback used once the control pipe's read handle has been released: waits
// on |event_| instead of the wait set, then times out handlers whose deadline
// passed. Returns true when it blocked (with a non-zero deadline) or when a
// handler expired.
bool MessagePumpMojo::DoNonMojoWork(const RunState& run_state, bool block) {
  bool did_work = false;
  if (block) {
    const MojoDeadline deadline = GetDeadlineForWait(run_state);
    // Stolen from base/message_loop/message_pump_default.cc
    base::ThreadRestrictions::ScopedAllowWait allow_wait;
    if (deadline == MOJO_DEADLINE_INDEFINITE) {
      event_.Wait();
      did_work = true;
    } else if (deadline > 0) {
      event_.TimedWait(base::TimeDelta::FromMicroseconds(deadline));
      did_work = true;
    }
    // A deadline of 0 means there is nothing to wait for, so |did_work| stays
    // false in that case.
    //
    // Since event_ is auto-reset, we don't need to do anything special here
    // other than service each delegate method.
  }

  did_work |= RemoveExpiredHandles();

  return did_work;
}
244 
// Blocks on the wait set until it becomes readable or the deadline computed
// by GetDeadlineForWait() passes.
//
// Returns true when the wait succeeded and false when the deadline was
// exceeded. Any other wait result is treated as fatal.
bool MessagePumpMojo::WaitForReadyHandles(const RunState& run_state) const {
  const MojoDeadline deadline = GetDeadlineForWait(run_state);
  const MojoResult wait_result = Wait(
      wait_set_handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);

  // Handles may be ready. Or not since wake-ups can be spurious in certain
  // circumstances.
  if (wait_result == MOJO_RESULT_OK)
    return true;

  if (wait_result == MOJO_RESULT_DEADLINE_EXCEEDED)
    return false;

  // Keep |wait_result| alive so it is visible in the crash dump.
  base::debug::Alias(&wait_result);
  // Unexpected result is likely fatal, crash so we can determine cause.
  CHECK(false);
  return false;
}
262 
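// Retrieves up to |kMaxServiced| ready handles from the wait set and
// dispatches each one: the control pipe is drained (or, on error, released to
// enter the non-Mojo fallback mode), and every other handle has its handler
// notified through SignalHandleReady() or SignalHandleError().
//
// Returns false when the wait set reports nothing ready
// (MOJO_RESULT_SHOULD_WAIT) and true otherwise. Any other result from
// MojoGetReadyHandles(), and any unexpected per-handle result, is fatal.
bool MessagePumpMojo::ProcessReadyHandles() {
  // Maximum number of handles to retrieve and process. Experimentally, the 95th
  // percentile is 1 handle, and the long-term average is 1.1. However, this has
  // been seen to reach >10 under heavy load. 8 is a hand-wavy compromise.
  const uint32_t kMaxServiced = 8;
  uint32_t num_ready_handles = kMaxServiced;
  MojoHandle handles[kMaxServiced];
  MojoResult handle_results[kMaxServiced];

  const MojoResult get_result =
      MojoGetReadyHandles(wait_set_handle_.get().value(), &num_ready_handles,
                          handles, handle_results, nullptr);
  CHECK(get_result == MOJO_RESULT_OK || get_result == MOJO_RESULT_SHOULD_WAIT);
  if (get_result != MOJO_RESULT_OK)
    return false;

  DCHECK(num_ready_handles);
  // |num_ready_handles| is written back by MojoGetReadyHandles() and bounds
  // both loops below, which index the fixed-size stack arrays |handles| and
  // |handle_results|. Enforce the bound with CHECK rather than DCHECK so that
  // a count larger than the arrays stops the process in every build instead
  // of reading past them.
  CHECK_LE(num_ready_handles, kMaxServiced);
  // Do this in two steps, because notifying a handler may remove/add other
  // handles that may have also been woken up.
  // First, enumerate the IDs of the ready handles. Then, iterate over the
  // handles and only take action if the ID hasn't changed.
  // Since the size of this map is bounded by |kMaxServiced|, use a SmallMap to
  // avoid the per-element allocation.
  base::SmallMap<std::map<Handle, int>, kMaxServiced> ready_handles;
  for (uint32_t i = 0; i < num_ready_handles; i++) {
    const Handle handle = Handle(handles[i]);
    // Skip the control handle. It's special.
    if (handle.value() == read_handle_.get().value())
      continue;
    DCHECK(handle.is_valid());
    const auto it = handlers_.find(handle);
    // Skip handles that have been removed. This is possible because
    // RemoveHandler() can be called with a handle that has been closed. Because
    // the handle is closed, the MojoRemoveHandle() call in RemoveHandler()
    // would have failed, but the handle is still in the wait set. Once the
    // handle is retrieved using MojoGetReadyHandles(), it is implicitly removed
    // from the set. The result is either the pending result that existed when
    // the handle was closed, or |MOJO_RESULT_CANCELLED| to indicate that the
    // handle was closed.
    if (it == handlers_.end())
      continue;
    ready_handles[handle] = it->second.id;
  }

  for (uint32_t i = 0; i < num_ready_handles; i++) {
    const Handle handle = Handle(handles[i]);

    // If the handle has been removed, or its ID has changed, skip over it.
    // If the handle's ID has changed, and it still satisfies its signals,
    // then it'll be caught in the next message pump iteration.
    // The lookup is repeated on every iteration because a handler notified
    // earlier in this loop may have added or removed entries in |handlers_|.
    const auto it = handlers_.find(handle);
    if ((handle.value() != read_handle_.get().value()) &&
        (it == handlers_.end() || it->second.id != ready_handles[handle])) {
      continue;
    }

    switch (handle_results[i]) {
      case MOJO_RESULT_CANCELLED:
      case MOJO_RESULT_FAILED_PRECONDITION:
        DVLOG(1) << "Error: " << handle_results[i]
                 << " handle: " << handle.value();
        if (handle.value() == read_handle_.get().value()) {
          // The Mojo EDK is shutting down. We can't just quit the message pump
          // because that may cause the thread to quit, which causes the
          // thread's MessageLoop to be destroyed, which races with any use of
          // |Thread::task_runner()|. So instead, we enter a "dumb" mode which
          // bypasses Mojo and just acts like a trivial message pump. That way,
          // we can wait for the usual thread exiting mechanism to happen.
          // The dumb mode is indicated by releasing the control pipe's read
          // handle.
          read_handle_.reset();
        } else {
          SignalHandleError(handle, handle_results[i]);
        }
        break;
      case MOJO_RESULT_OK:
        if (handle.value() == read_handle_.get().value()) {
          DVLOG(1) << "Signaled control pipe";
          // Control pipe was written to. The message carries no payload; it
          // is read with MAY_DISCARD purely to consume the wake-up.
          ReadMessageRaw(read_handle_.get(), nullptr, nullptr, nullptr, nullptr,
                         MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
        } else {
          DVLOG(1) << "Handle ready: " << handle.value();
          SignalHandleReady(handle);
        }
        break;
      default:
        // Keep the index and result alive so they are visible in the crash
        // dump.
        base::debug::Alias(&i);
        base::debug::Alias(&handle_results[i]);
        // Unexpected result is likely fatal, crash so we can determine cause.
        CHECK(false);
    }
  }
  return true;
}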
359 
// Notifies, through SignalHandleError(MOJO_RESULT_DEADLINE_EXCEEDED), every
// handler whose deadline is earlier than the current time. Returns true when
// at least one handler was notified.
bool MessagePumpMojo::RemoveExpiredHandles() {
  // Notify and remove any handlers whose time has expired. First, iterate over
  // the set of handles that have a deadline, and add the expired handles to a
  // map of <Handle, id>. Then, iterate over those expired handles and remove
  // them. The two-step process is because a handler can add/remove new
  // handlers.
  const base::TimeTicks now(internal::NowTicks());
  std::map<Handle, int> expired_handles;
  for (const Handle handle : deadline_handles_) {
    const auto it = handlers_.find(handle);
    // Expect any handle in |deadline_handles_| to also be in |handlers_| since
    // the two are modified in lock-step.
    // NOTE(review): the dereference below relies on that invariant; the
    // DCHECK does not guard it where DCHECKs are compiled out.
    DCHECK(it != handlers_.end());
    const bool has_deadline = !it->second.deadline.is_null();
    if (has_deadline && it->second.deadline < now)
      expired_handles[handle] = it->second.id;
  }

  bool any_expired = false;
  for (const auto& entry : expired_handles) {
    const auto it = handlers_.find(entry.first);
    // Don't need to check deadline again since it can't change if id hasn't
    // changed.
    if (it == handlers_.end() || it->second.id != entry.second)
      continue;
    SignalHandleError(entry.first, MOJO_RESULT_DEADLINE_EXCEEDED);
    any_expired = true;
  }
  return any_expired;
}
388 
// Wakes the pump by writing an empty message to the control pipe. When the
// write fails with MOJO_RESULT_FAILED_PRECONDITION the fallback |event_| is
// signalled instead; any other failure is fatal.
void MessagePumpMojo::SignalControlPipe() {
  const MojoResult result =
      WriteMessageRaw(write_handle_.get(), nullptr, 0, nullptr, 0,
                      MOJO_WRITE_MESSAGE_FLAG_NONE);
  if (result != MOJO_RESULT_FAILED_PRECONDITION) {
    // If we can't write we likely won't wake up the thread and there is a
    // strong chance we'll deadlock.
    CHECK_EQ(MOJO_RESULT_OK, result);
    return;
  }

  // Mojo EDK is shutting down.
  event_.Signal();
}
403 
// Returns how long the next blocking wait may last: the smallest of the
// deadline derived from |run_state.delayed_work_time| and the deadlines of
// all handles in |deadline_handles_|, each measured from the current time.
MojoDeadline MessagePumpMojo::GetDeadlineForWait(
    const RunState& run_state) const {
  const base::TimeTicks now(internal::NowTicks());
  MojoDeadline earliest =
      TimeTicksToMojoDeadline(run_state.delayed_work_time, now);
  for (const Handle handle : deadline_handles_) {
    auto it = handlers_.find(handle);
    // NOTE(review): the dereference below relies on |deadline_handles_| and
    // |handlers_| staying in lock-step; the DCHECK does not guard it where
    // DCHECKs are compiled out.
    DCHECK(it != handlers_.end());
    const MojoDeadline handler_deadline =
        TimeTicksToMojoDeadline(it->second.deadline, now);
    if (handler_deadline < earliest)
      earliest = handler_deadline;
  }
  return earliest;
}
417 
// Calls OnHandleReady() on the handler registered for |handle|, bracketed by
// the WillSignalHandler()/DidSignalHandler() observer notifications. The
// handle is expected to be present in |handlers_|.
void MessagePumpMojo::SignalHandleReady(Handle handle) {
  const auto it = handlers_.find(handle);
  DCHECK(it != handlers_.end());
  // Take the pointer out of the map before anything else runs; |it| is not
  // used after this line.
  MessagePumpMojoHandler* const handler = it->second.handler;

  WillSignalHandler();
  handler->OnHandleReady(handle);
  DidSignalHandler();
}
427 
// Unregisters |handle| and then calls OnHandleError(|handle|, |result|) on the
// handler that was registered for it, bracketed by the
// WillSignalHandler()/DidSignalHandler() observer notifications. The handle
// is expected to be present in |handlers_|.
void MessagePumpMojo::SignalHandleError(Handle handle, MojoResult result) {
  const auto it = handlers_.find(handle);
  DCHECK(it != handlers_.end());
  // Must be captured before RemoveHandler(), which erases the map entry that
  // |it| refers to.
  MessagePumpMojoHandler* const handler = it->second.handler;

  RemoveHandler(handle);

  WillSignalHandler();
  handler->OnHandleError(handle, result);
  DidSignalHandler();
}
438 
// Tells every registered observer that a handler is about to be invoked.
void MessagePumpMojo::WillSignalHandler() {
  FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
}
442 
// Tells every registered observer that a handler invocation has finished.
void MessagePumpMojo::DidSignalHandler() {
  FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler());
}
446 
447 }  // namespace common
448 }  // namespace mojo
449