1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/connector.h"
6 
7 #include <stdint.h>
8 #include <utility>
9 
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/synchronization/lock.h"
15 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
16 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
17 
18 namespace mojo {
19 
Connector(ScopedMessagePipeHandle message_pipe,ConnectorConfig config,scoped_refptr<base::SingleThreadTaskRunner> runner)20 Connector::Connector(ScopedMessagePipeHandle message_pipe,
21                      ConnectorConfig config,
22                      scoped_refptr<base::SingleThreadTaskRunner> runner)
23     : message_pipe_(std::move(message_pipe)),
24       task_runner_(std::move(runner)),
25       weak_factory_(this) {
26   if (config == MULTI_THREADED_SEND)
27     lock_.emplace();
28 
29   weak_self_ = weak_factory_.GetWeakPtr();
30   // Even though we don't have an incoming receiver, we still want to monitor
31   // the message pipe to know if is closed or encounters an error.
32   WaitToReadMore();
33 }
34 
~Connector()35 Connector::~Connector() {
36   {
37     // Allow for quick destruction on any thread if the pipe is already closed.
38     base::AutoLock lock(connected_lock_);
39     if (!connected_)
40       return;
41   }
42 
43   DCHECK(thread_checker_.CalledOnValidThread());
44   CancelWait();
45 }
46 
CloseMessagePipe()47 void Connector::CloseMessagePipe() {
48   // Throw away the returned message pipe.
49   PassMessagePipe();
50 }
51 
PassMessagePipe()52 ScopedMessagePipeHandle Connector::PassMessagePipe() {
53   DCHECK(thread_checker_.CalledOnValidThread());
54 
55   CancelWait();
56   internal::MayAutoLock locker(&lock_);
57   ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
58   weak_factory_.InvalidateWeakPtrs();
59   sync_handle_watcher_callback_count_ = 0;
60 
61   base::AutoLock lock(connected_lock_);
62   connected_ = false;
63   return message_pipe;
64 }
65 
RaiseError()66 void Connector::RaiseError() {
67   DCHECK(thread_checker_.CalledOnValidThread());
68 
69   HandleError(true, true);
70 }
71 
WaitForIncomingMessage(MojoDeadline deadline)72 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
73   DCHECK(thread_checker_.CalledOnValidThread());
74 
75   if (error_)
76     return false;
77 
78   ResumeIncomingMethodCallProcessing();
79 
80   MojoResult rv =
81       Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
82   if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED)
83     return false;
84   if (rv != MOJO_RESULT_OK) {
85     // Users that call WaitForIncomingMessage() should expect their code to be
86     // re-entered, so we call the error handler synchronously.
87     HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
88     return false;
89   }
90   ignore_result(ReadSingleMessage(&rv));
91   return (rv == MOJO_RESULT_OK);
92 }
93 
PauseIncomingMethodCallProcessing()94 void Connector::PauseIncomingMethodCallProcessing() {
95   DCHECK(thread_checker_.CalledOnValidThread());
96 
97   if (paused_)
98     return;
99 
100   paused_ = true;
101   CancelWait();
102 }
103 
ResumeIncomingMethodCallProcessing()104 void Connector::ResumeIncomingMethodCallProcessing() {
105   DCHECK(thread_checker_.CalledOnValidThread());
106 
107   if (!paused_)
108     return;
109 
110   paused_ = false;
111   WaitToReadMore();
112 }
113 
Accept(Message * message)114 bool Connector::Accept(Message* message) {
115   DCHECK(lock_ || thread_checker_.CalledOnValidThread());
116 
117   // It shouldn't hurt even if |error_| may be changed by a different thread at
118   // the same time. The outcome is that we may write into |message_pipe_| after
119   // encountering an error, which should be fine.
120   if (error_)
121     return false;
122 
123   internal::MayAutoLock locker(&lock_);
124 
125   if (!message_pipe_.is_valid() || drop_writes_)
126     return true;
127 
128   MojoResult rv =
129       WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
130                       MOJO_WRITE_MESSAGE_FLAG_NONE);
131 
132   switch (rv) {
133     case MOJO_RESULT_OK:
134       break;
135     case MOJO_RESULT_FAILED_PRECONDITION:
136       // There's no point in continuing to write to this pipe since the other
137       // end is gone. Avoid writing any future messages. Hide write failures
138       // from the caller since we'd like them to continue consuming any backlog
139       // of incoming messages before regarding the message pipe as closed.
140       drop_writes_ = true;
141       break;
142     case MOJO_RESULT_BUSY:
143       // We'd get a "busy" result if one of the message's handles is:
144       //   - |message_pipe_|'s own handle;
145       //   - simultaneously being used on another thread; or
146       //   - in a "busy" state that prohibits it from being transferred (e.g.,
147       //     a data pipe handle in the middle of a two-phase read/write,
148       //     regardless of which thread that two-phase read/write is happening
149       //     on).
150       // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
151       // crbug.com/389666, etc. are resolved, this will make tests fail quickly
152       // rather than hanging.)
153       CHECK(false) << "Race condition or other bug detected";
154       return false;
155     default:
156       // This particular write was rejected, presumably because of bad input.
157       // The pipe is not necessarily in a bad state.
158       return false;
159   }
160   return true;
161 }
162 
AllowWokenUpBySyncWatchOnSameThread()163 void Connector::AllowWokenUpBySyncWatchOnSameThread() {
164   DCHECK(thread_checker_.CalledOnValidThread());
165 
166   allow_woken_up_by_others_ = true;
167 
168   EnsureSyncWatcherExists();
169   sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
170 }
171 
SyncWatch(const bool * should_stop)172 bool Connector::SyncWatch(const bool* should_stop) {
173   DCHECK(thread_checker_.CalledOnValidThread());
174 
175   if (error_)
176     return false;
177 
178   ResumeIncomingMethodCallProcessing();
179 
180   EnsureSyncWatcherExists();
181   return sync_watcher_->SyncWatch(should_stop);
182 }
183 
SetWatcherHeapProfilerTag(const char * tag)184 void Connector::SetWatcherHeapProfilerTag(const char* tag) {
185   heap_profiler_tag_ = tag;
186   if (handle_watcher_) {
187     handle_watcher_->set_heap_profiler_tag(tag);
188   }
189 }
190 
OnWatcherHandleReady(MojoResult result)191 void Connector::OnWatcherHandleReady(MojoResult result) {
192   OnHandleReadyInternal(result);
193 }
194 
OnSyncHandleWatcherHandleReady(MojoResult result)195 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
196   base::WeakPtr<Connector> weak_self(weak_self_);
197 
198   sync_handle_watcher_callback_count_++;
199   OnHandleReadyInternal(result);
200   // At this point, this object might have been deleted.
201   if (weak_self) {
202     DCHECK_LT(0u, sync_handle_watcher_callback_count_);
203     sync_handle_watcher_callback_count_--;
204   }
205 }
206 
OnHandleReadyInternal(MojoResult result)207 void Connector::OnHandleReadyInternal(MojoResult result) {
208   DCHECK(thread_checker_.CalledOnValidThread());
209 
210   if (result != MOJO_RESULT_OK) {
211     HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
212     return;
213   }
214   ReadAllAvailableMessages();
215   // At this point, this object might have been deleted. Return.
216 }
217 
WaitToReadMore()218 void Connector::WaitToReadMore() {
219   CHECK(!paused_);
220   DCHECK(!handle_watcher_);
221 
222   handle_watcher_.reset(new Watcher(FROM_HERE, task_runner_));
223   if (heap_profiler_tag_)
224     handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
225   MojoResult rv = handle_watcher_->Start(
226       message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
227       base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
228 
229   if (rv != MOJO_RESULT_OK) {
230     // If the watch failed because the handle is invalid or its conditions can
231     // no longer be met, we signal the error asynchronously to avoid reentry.
232     task_runner_->PostTask(
233         FROM_HERE,
234         base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
235   }
236 
237   if (allow_woken_up_by_others_) {
238     EnsureSyncWatcherExists();
239     sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
240   }
241 }
242 
ReadSingleMessage(MojoResult * read_result)243 bool Connector::ReadSingleMessage(MojoResult* read_result) {
244   CHECK(!paused_);
245 
246   bool receiver_result = false;
247 
248   // Detect if |this| was destroyed or the message pipe was closed/transferred
249   // during message dispatch.
250   base::WeakPtr<Connector> weak_self = weak_self_;
251 
252   Message message;
253   const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
254   *read_result = rv;
255 
256   if (rv == MOJO_RESULT_OK) {
257     receiver_result =
258         incoming_receiver_ && incoming_receiver_->Accept(&message);
259   }
260 
261   if (!weak_self)
262     return false;
263 
264   if (rv == MOJO_RESULT_SHOULD_WAIT)
265     return true;
266 
267   if (rv != MOJO_RESULT_OK) {
268     HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
269     return false;
270   }
271 
272   if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
273     HandleError(true, false);
274     return false;
275   }
276   return true;
277 }
278 
ReadAllAvailableMessages()279 void Connector::ReadAllAvailableMessages() {
280   while (!error_) {
281     MojoResult rv;
282 
283     if (!ReadSingleMessage(&rv)) {
284       // Return immediately without touching any members. |this| may have been
285       // destroyed.
286       return;
287     }
288 
289     if (paused_)
290       return;
291 
292     if (rv == MOJO_RESULT_SHOULD_WAIT)
293       break;
294   }
295 }
296 
CancelWait()297 void Connector::CancelWait() {
298   handle_watcher_.reset();
299   sync_watcher_.reset();
300 }
301 
HandleError(bool force_pipe_reset,bool force_async_handler)302 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
303   if (error_ || !message_pipe_.is_valid())
304     return;
305 
306   if (paused_) {
307     // Enforce calling the error handler asynchronously if the user has paused
308     // receiving messages. We need to wait until the user starts receiving
309     // messages again.
310     force_async_handler = true;
311   }
312 
313   if (!force_pipe_reset && force_async_handler)
314     force_pipe_reset = true;
315 
316   if (force_pipe_reset) {
317     CancelWait();
318     internal::MayAutoLock locker(&lock_);
319     message_pipe_.reset();
320     MessagePipe dummy_pipe;
321     message_pipe_ = std::move(dummy_pipe.handle0);
322   } else {
323     CancelWait();
324   }
325 
326   if (force_async_handler) {
327     if (!paused_)
328       WaitToReadMore();
329   } else {
330     error_ = true;
331     if (!connection_error_handler_.is_null())
332       connection_error_handler_.Run();
333   }
334 }
335 
EnsureSyncWatcherExists()336 void Connector::EnsureSyncWatcherExists() {
337   if (sync_watcher_)
338     return;
339   sync_watcher_.reset(new SyncHandleWatcher(
340       message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
341       base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
342                  base::Unretained(this))));
343 }
344 
345 }  // namespace mojo
346