1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
#include "mojo/message_pump/handle_watcher.h"

#include <stddef.h>
#include <stdint.h>

#include <map>
#include <vector>

#include "base/atomic_sequence_num.h"
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/singleton.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread.h"
#include "base/threading/thread_task_runner_handle.h"
#include "base/time/time.h"
#include "mojo/message_pump/message_pump_mojo.h"
#include "mojo/message_pump/message_pump_mojo_handler.h"
#include "mojo/message_pump/time_helper.h"
#include "mojo/public/c/system/message_pipe.h"
29
30 namespace mojo {
31 namespace common {
32
// Identifier handed out for each watch request; used later to stop it.
using WatcherID = int;
34
35 namespace {
36
// Name given to the background thread that watches handles.
constexpr char kWatcherThreadName[] = "handle-watcher-thread";
38
MojoDeadlineToTimeTicks(MojoDeadline deadline)39 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
40 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
41 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
42 }
43
44 // Tracks the data for a single call to Start().
45 struct WatchData {
WatchDatamojo::common::__anon2a64ae720111::WatchData46 WatchData()
47 : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {}
48
49 WatcherID id;
50 Handle handle;
51 MojoHandleSignals handle_signals;
52 base::TimeTicks deadline;
53 base::Callback<void(MojoResult)> callback;
54 scoped_refptr<base::SingleThreadTaskRunner> task_runner;
55 };
56
57 // WatcherBackend --------------------------------------------------------------
58
59 // WatcherBackend is responsible for managing the requests and interacting with
60 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
61 // thread WatcherThreadManager creates.
62 class WatcherBackend : public MessagePumpMojoHandler {
63 public:
64 WatcherBackend();
65 ~WatcherBackend() override;
66
67 void StartWatching(const WatchData& data);
68 void StopWatching(WatcherID watcher_id);
69
70 private:
71 typedef std::map<Handle, WatchData> HandleToWatchDataMap;
72
73 // Invoked when a handle needs to be removed and notified.
74 void RemoveAndNotify(const Handle& handle, MojoResult result);
75
76 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
77 // and sets |handle| to the Handle. Returns false if not a known id.
78 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
79
80 // MessagePumpMojoHandler overrides:
81 void OnHandleReady(const Handle& handle) override;
82 void OnHandleError(const Handle& handle, MojoResult result) override;
83
84 // Maps from assigned id to WatchData.
85 HandleToWatchDataMap handle_to_data_;
86
87 DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
88 };
89
WatcherBackend()90 WatcherBackend::WatcherBackend() {
91 }
92
~WatcherBackend()93 WatcherBackend::~WatcherBackend() {
94 }
95
StartWatching(const WatchData & data)96 void WatcherBackend::StartWatching(const WatchData& data) {
97 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
98
99 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
100
101 handle_to_data_[data.handle] = data;
102 MessagePumpMojo::current()->AddHandler(this, data.handle,
103 data.handle_signals,
104 data.deadline);
105 }
106
StopWatching(WatcherID watcher_id)107 void WatcherBackend::StopWatching(WatcherID watcher_id) {
108 // Because of the thread hop it is entirely possible to get here and not
109 // have a valid handle registered for |watcher_id|.
110 Handle handle;
111 if (!GetMojoHandleByWatcherID(watcher_id, &handle))
112 return;
113
114 handle_to_data_.erase(handle);
115 MessagePumpMojo::current()->RemoveHandler(handle);
116 }
117
RemoveAndNotify(const Handle & handle,MojoResult result)118 void WatcherBackend::RemoveAndNotify(const Handle& handle,
119 MojoResult result) {
120 if (handle_to_data_.count(handle) == 0)
121 return;
122
123 const WatchData data(handle_to_data_[handle]);
124 handle_to_data_.erase(handle);
125 MessagePumpMojo::current()->RemoveHandler(handle);
126
127 data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result));
128 }
129
GetMojoHandleByWatcherID(WatcherID watcher_id,Handle * handle) const130 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
131 Handle* handle) const {
132 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
133 i != handle_to_data_.end(); ++i) {
134 if (i->second.id == watcher_id) {
135 *handle = i->second.handle;
136 return true;
137 }
138 }
139 return false;
140 }
141
OnHandleReady(const Handle & handle)142 void WatcherBackend::OnHandleReady(const Handle& handle) {
143 RemoveAndNotify(handle, MOJO_RESULT_OK);
144 }
145
OnHandleError(const Handle & handle,MojoResult result)146 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
147 RemoveAndNotify(handle, result);
148 }
149
150 // WatcherThreadManager --------------------------------------------------------
151
152 // WatcherThreadManager manages the background thread that listens for handles
153 // to be ready. All requests are handled by WatcherBackend.
154 class WatcherThreadManager {
155 public:
156 ~WatcherThreadManager();
157
158 // Returns the shared instance.
159 static WatcherThreadManager* GetInstance();
160
161 // Starts watching the requested handle. Returns a unique ID that is used to
162 // stop watching the handle. When the handle is ready |callback| is notified
163 // on the thread StartWatching() was invoked on.
164 // This may be invoked on any thread.
165 WatcherID StartWatching(const Handle& handle,
166 MojoHandleSignals handle_signals,
167 base::TimeTicks deadline,
168 const base::Callback<void(MojoResult)>& callback);
169
170 // Stops watching a handle.
171 // This may be invoked on any thread.
172 void StopWatching(WatcherID watcher_id);
173
174 private:
175 enum RequestType {
176 REQUEST_START,
177 REQUEST_STOP,
178 };
179
180 // See description of |requests_| for details.
181 struct RequestData {
RequestDatamojo::common::__anon2a64ae720111::WatcherThreadManager::RequestData182 RequestData() : type(REQUEST_START), stop_id(0) {}
183
184 RequestType type;
185 WatchData start_data;
186 WatcherID stop_id;
187 };
188
189 typedef std::vector<RequestData> Requests;
190
191 friend struct base::DefaultSingletonTraits<WatcherThreadManager>;
192
193 WatcherThreadManager();
194
195 // Schedules a request on the background thread. See |requests_| for details.
196 void AddRequest(const RequestData& data);
197
198 // Processes requests added to |requests_|. This is invoked on the backend
199 // thread.
200 void ProcessRequestsOnBackendThread();
201
202 base::Thread thread_;
203
204 base::AtomicSequenceNumber watcher_id_generator_;
205
206 WatcherBackend backend_;
207
208 // Protects |requests_|.
209 base::Lock lock_;
210
211 // Start/Stop result in adding a RequestData to |requests_| (protected by
212 // |lock_|). When the background thread wakes up it processes the requests.
213 Requests requests_;
214
215 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
216 };
217
~WatcherThreadManager()218 WatcherThreadManager::~WatcherThreadManager() {
219 thread_.Stop();
220 }
221
GetInstance()222 WatcherThreadManager* WatcherThreadManager::GetInstance() {
223 return base::Singleton<WatcherThreadManager>::get();
224 }
225
StartWatching(const Handle & handle,MojoHandleSignals handle_signals,base::TimeTicks deadline,const base::Callback<void (MojoResult)> & callback)226 WatcherID WatcherThreadManager::StartWatching(
227 const Handle& handle,
228 MojoHandleSignals handle_signals,
229 base::TimeTicks deadline,
230 const base::Callback<void(MojoResult)>& callback) {
231 RequestData request_data;
232 request_data.type = REQUEST_START;
233 request_data.start_data.id = watcher_id_generator_.GetNext();
234 request_data.start_data.handle = handle;
235 request_data.start_data.callback = callback;
236 request_data.start_data.handle_signals = handle_signals;
237 request_data.start_data.deadline = deadline;
238 request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get();
239 AddRequest(request_data);
240 return request_data.start_data.id;
241 }
242
StopWatching(WatcherID watcher_id)243 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
244 // Handle the case of StartWatching() followed by StopWatching() before
245 // |thread_| woke up.
246 {
247 base::AutoLock auto_lock(lock_);
248 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
249 if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
250 // Watcher ids are not reused, so if we find it we can stop.
251 requests_.erase(i);
252 return;
253 }
254 }
255 }
256
257 RequestData request_data;
258 request_data.type = REQUEST_STOP;
259 request_data.stop_id = watcher_id;
260 AddRequest(request_data);
261 }
262
AddRequest(const RequestData & data)263 void WatcherThreadManager::AddRequest(const RequestData& data) {
264 {
265 base::AutoLock auto_lock(lock_);
266 const bool was_empty = requests_.empty();
267 requests_.push_back(data);
268 if (!was_empty)
269 return;
270 }
271
272 // We outlive |thread_|, so it's safe to use Unretained() here.
273 thread_.task_runner()->PostTask(
274 FROM_HERE,
275 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
276 base::Unretained(this)));
277 }
278
ProcessRequestsOnBackendThread()279 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
280 DCHECK(thread_.task_runner()->BelongsToCurrentThread());
281
282 Requests requests;
283 {
284 base::AutoLock auto_lock(lock_);
285 requests_.swap(requests);
286 }
287 for (size_t i = 0; i < requests.size(); ++i) {
288 if (requests[i].type == REQUEST_START) {
289 backend_.StartWatching(requests[i].start_data);
290 } else {
291 backend_.StopWatching(requests[i].stop_id);
292 }
293 }
294 }
295
WatcherThreadManager()296 WatcherThreadManager::WatcherThreadManager()
297 : thread_(kWatcherThreadName) {
298 base::Thread::Options thread_options;
299 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
300 thread_.StartWithOptions(thread_options);
301 }
302
303 } // namespace
304
305 // HandleWatcher::StateBase and subclasses -------------------------------------
306
307 // The base class of HandleWatcher's state. Owns the user's callback and
308 // monitors the current thread's MessageLoop to know when to force the callback
309 // to run (with an error) even though the pipe hasn't been signaled yet.
310 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
311 public:
StateBase(HandleWatcher * watcher,const base::Callback<void (MojoResult)> & callback)312 StateBase(HandleWatcher* watcher,
313 const base::Callback<void(MojoResult)>& callback)
314 : watcher_(watcher),
315 callback_(callback),
316 got_ready_(false) {
317 base::MessageLoop::current()->AddDestructionObserver(this);
318 }
319
~StateBase()320 ~StateBase() override {
321 base::MessageLoop::current()->RemoveDestructionObserver(this);
322 }
323
324 protected:
NotifyHandleReady(MojoResult result)325 void NotifyHandleReady(MojoResult result) {
326 got_ready_ = true;
327 NotifyAndDestroy(result);
328 }
329
got_ready() const330 bool got_ready() const { return got_ready_; }
331
332 private:
WillDestroyCurrentMessageLoop()333 void WillDestroyCurrentMessageLoop() override {
334 // The current thread is exiting. Simulate a watch error.
335 NotifyAndDestroy(MOJO_RESULT_ABORTED);
336 }
337
NotifyAndDestroy(MojoResult result)338 void NotifyAndDestroy(MojoResult result) {
339 base::Callback<void(MojoResult)> callback = callback_;
340 watcher_->Stop(); // Destroys |this|.
341
342 callback.Run(result);
343 }
344
345 HandleWatcher* watcher_;
346 base::Callback<void(MojoResult)> callback_;
347
348 // Have we been notified that the handle is ready?
349 bool got_ready_;
350
351 DISALLOW_COPY_AND_ASSIGN(StateBase);
352 };
353
354 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
355 // SameThreadWatchingState is used to directly watch the handle on the same
356 // thread.
357 class HandleWatcher::SameThreadWatchingState : public StateBase,
358 public MessagePumpMojoHandler {
359 public:
SameThreadWatchingState(HandleWatcher * watcher,const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)360 SameThreadWatchingState(HandleWatcher* watcher,
361 const Handle& handle,
362 MojoHandleSignals handle_signals,
363 MojoDeadline deadline,
364 const base::Callback<void(MojoResult)>& callback)
365 : StateBase(watcher, callback),
366 handle_(handle) {
367 DCHECK(MessagePumpMojo::IsCurrent());
368
369 MessagePumpMojo::current()->AddHandler(
370 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
371 }
372
~SameThreadWatchingState()373 ~SameThreadWatchingState() override {
374 if (!got_ready())
375 MessagePumpMojo::current()->RemoveHandler(handle_);
376 }
377
378 private:
379 // MessagePumpMojoHandler overrides:
OnHandleReady(const Handle & handle)380 void OnHandleReady(const Handle& handle) override {
381 StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
382 }
383
OnHandleError(const Handle & handle,MojoResult result)384 void OnHandleError(const Handle& handle, MojoResult result) override {
385 StopWatchingAndNotifyReady(handle, result);
386 }
387
StopWatchingAndNotifyReady(const Handle & handle,MojoResult result)388 void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
389 DCHECK_EQ(handle.value(), handle_.value());
390 MessagePumpMojo::current()->RemoveHandler(handle_);
391 NotifyHandleReady(result);
392 }
393
394 Handle handle_;
395
396 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
397 };
398
399 // If the thread on which HandleWatcher is used runs a message pump different
400 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
401 // handle on the handle watcher thread.
402 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
403 public:
SecondaryThreadWatchingState(HandleWatcher * watcher,const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)404 SecondaryThreadWatchingState(HandleWatcher* watcher,
405 const Handle& handle,
406 MojoHandleSignals handle_signals,
407 MojoDeadline deadline,
408 const base::Callback<void(MojoResult)>& callback)
409 : StateBase(watcher, callback),
410 weak_factory_(this) {
411 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
412 handle,
413 handle_signals,
414 MojoDeadlineToTimeTicks(deadline),
415 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
416 weak_factory_.GetWeakPtr()));
417 }
418
~SecondaryThreadWatchingState()419 ~SecondaryThreadWatchingState() override {
420 // If we've been notified the handle is ready (|got_ready()| is true) then
421 // the watch has been implicitly removed by
422 // WatcherThreadManager/MessagePumpMojo and we don't have to call
423 // StopWatching(). To do so would needlessly entail posting a task and
424 // blocking until the background thread services it.
425 if (!got_ready())
426 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
427 }
428
429 private:
430 WatcherID watcher_id_;
431
432 // Used to weakly bind |this| to the WatcherThreadManager.
433 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
434
435 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
436 };
437
438 // HandleWatcher ---------------------------------------------------------------
439
HandleWatcher()440 HandleWatcher::HandleWatcher() {
441 }
442
~HandleWatcher()443 HandleWatcher::~HandleWatcher() {
444 }
445
Start(const Handle & handle,MojoHandleSignals handle_signals,MojoDeadline deadline,const base::Callback<void (MojoResult)> & callback)446 void HandleWatcher::Start(const Handle& handle,
447 MojoHandleSignals handle_signals,
448 MojoDeadline deadline,
449 const base::Callback<void(MojoResult)>& callback) {
450 DCHECK(handle.is_valid());
451 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
452
453 // Need to clear the state before creating a new one.
454 state_.reset();
455 if (MessagePumpMojo::IsCurrent()) {
456 state_.reset(new SameThreadWatchingState(
457 this, handle, handle_signals, deadline, callback));
458 } else {
459 #if !defined(OFFICIAL_BUILD)
460 // Just for making debugging non-transferable message pipes easier. Since
461 // they can't be sent after they're read/written/listened to,
462 // MessagePipeDispatcher saves the callstack of when it's "bound" to a
463 // pipe id. Triggering a read here, instead of later in the PostTask, means
464 // we have a callstack that is useful to check if the pipe is erronously
465 // attempted to be sent.
466 uint32_t temp = 0;
467 MojoReadMessage(handle.value(), nullptr, &temp, nullptr, nullptr,
468 MOJO_READ_MESSAGE_FLAG_NONE);
469 #endif
470 state_.reset(new SecondaryThreadWatchingState(
471 this, handle, handle_signals, deadline, callback));
472 }
473 }
474
Stop()475 void HandleWatcher::Stop() {
476 state_.reset();
477 }
478
479 } // namespace common
480 } // namespace mojo
481