1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "mojo/public/cpp/bindings/connector.h"
6 
7 #include <stdint.h>
8 #include <utility>
9 
10 #include "base/bind.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/synchronization/lock.h"
15 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
16 
17 namespace mojo {
18 
19 namespace {
20 
21 // Similar to base::AutoLock, except that it does nothing if |lock| passed into
22 // the constructor is null.
23 class MayAutoLock {
24  public:
MayAutoLock(base::Lock * lock)25   explicit MayAutoLock(base::Lock* lock) : lock_(lock) {
26     if (lock_)
27       lock_->Acquire();
28   }
29 
~MayAutoLock()30   ~MayAutoLock() {
31     if (lock_) {
32       lock_->AssertAcquired();
33       lock_->Release();
34     }
35   }
36 
37  private:
38   base::Lock* lock_;
39   DISALLOW_COPY_AND_ASSIGN(MayAutoLock);
40 };
41 
42 }  // namespace
43 
44 // ----------------------------------------------------------------------------
45 
Connector(ScopedMessagePipeHandle message_pipe,ConnectorConfig config,scoped_refptr<base::SingleThreadTaskRunner> runner)46 Connector::Connector(ScopedMessagePipeHandle message_pipe,
47                      ConnectorConfig config,
48                      scoped_refptr<base::SingleThreadTaskRunner> runner)
49     : message_pipe_(std::move(message_pipe)),
50       incoming_receiver_(nullptr),
51       task_runner_(std::move(runner)),
52       handle_watcher_(task_runner_),
53       error_(false),
54       drop_writes_(false),
55       enforce_errors_from_incoming_receiver_(true),
56       paused_(false),
57       lock_(config == MULTI_THREADED_SEND ? new base::Lock : nullptr),
58       allow_woken_up_by_others_(false),
59       sync_handle_watcher_callback_count_(0),
60       weak_factory_(this) {
61   weak_self_ = weak_factory_.GetWeakPtr();
62   // Even though we don't have an incoming receiver, we still want to monitor
63   // the message pipe to know if is closed or encounters an error.
64   WaitToReadMore();
65 }
66 
~Connector()67 Connector::~Connector() {
68   DCHECK(thread_checker_.CalledOnValidThread());
69 
70   CancelWait();
71 }
72 
CloseMessagePipe()73 void Connector::CloseMessagePipe() {
74   DCHECK(thread_checker_.CalledOnValidThread());
75 
76   CancelWait();
77   MayAutoLock locker(lock_.get());
78   message_pipe_.reset();
79 }
80 
PassMessagePipe()81 ScopedMessagePipeHandle Connector::PassMessagePipe() {
82   DCHECK(thread_checker_.CalledOnValidThread());
83 
84   CancelWait();
85   MayAutoLock locker(lock_.get());
86   return std::move(message_pipe_);
87 }
88 
RaiseError()89 void Connector::RaiseError() {
90   DCHECK(thread_checker_.CalledOnValidThread());
91 
92   HandleError(true, true);
93 }
94 
WaitForIncomingMessage(MojoDeadline deadline)95 bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
96   DCHECK(thread_checker_.CalledOnValidThread());
97 
98   if (error_)
99     return false;
100 
101   ResumeIncomingMethodCallProcessing();
102 
103   MojoResult rv =
104       Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, deadline, nullptr);
105   if (rv == MOJO_RESULT_SHOULD_WAIT || rv == MOJO_RESULT_DEADLINE_EXCEEDED)
106     return false;
107   if (rv != MOJO_RESULT_OK) {
108     // Users that call WaitForIncomingMessage() should expect their code to be
109     // re-entered, so we call the error handler synchronously.
110     HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
111     return false;
112   }
113   ignore_result(ReadSingleMessage(&rv));
114   return (rv == MOJO_RESULT_OK);
115 }
116 
PauseIncomingMethodCallProcessing()117 void Connector::PauseIncomingMethodCallProcessing() {
118   DCHECK(thread_checker_.CalledOnValidThread());
119 
120   if (paused_)
121     return;
122 
123   paused_ = true;
124   CancelWait();
125 }
126 
ResumeIncomingMethodCallProcessing()127 void Connector::ResumeIncomingMethodCallProcessing() {
128   DCHECK(thread_checker_.CalledOnValidThread());
129 
130   if (!paused_)
131     return;
132 
133   paused_ = false;
134   WaitToReadMore();
135 }
136 
Accept(Message * message)137 bool Connector::Accept(Message* message) {
138   DCHECK(lock_ || thread_checker_.CalledOnValidThread());
139 
140   // It shouldn't hurt even if |error_| may be changed by a different thread at
141   // the same time. The outcome is that we may write into |message_pipe_| after
142   // encountering an error, which should be fine.
143   if (error_)
144     return false;
145 
146   MayAutoLock locker(lock_.get());
147 
148   if (!message_pipe_.is_valid() || drop_writes_)
149     return true;
150 
151   MojoResult rv =
152       WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
153                       MOJO_WRITE_MESSAGE_FLAG_NONE);
154 
155   switch (rv) {
156     case MOJO_RESULT_OK:
157       break;
158     case MOJO_RESULT_FAILED_PRECONDITION:
159       // There's no point in continuing to write to this pipe since the other
160       // end is gone. Avoid writing any future messages. Hide write failures
161       // from the caller since we'd like them to continue consuming any backlog
162       // of incoming messages before regarding the message pipe as closed.
163       drop_writes_ = true;
164       break;
165     case MOJO_RESULT_BUSY:
166       // We'd get a "busy" result if one of the message's handles is:
167       //   - |message_pipe_|'s own handle;
168       //   - simultaneously being used on another thread; or
169       //   - in a "busy" state that prohibits it from being transferred (e.g.,
170       //     a data pipe handle in the middle of a two-phase read/write,
171       //     regardless of which thread that two-phase read/write is happening
172       //     on).
173       // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
174       // crbug.com/389666, etc. are resolved, this will make tests fail quickly
175       // rather than hanging.)
176       CHECK(false) << "Race condition or other bug detected";
177       return false;
178     default:
179       // This particular write was rejected, presumably because of bad input.
180       // The pipe is not necessarily in a bad state.
181       return false;
182   }
183   return true;
184 }
185 
AllowWokenUpBySyncWatchOnSameThread()186 void Connector::AllowWokenUpBySyncWatchOnSameThread() {
187   DCHECK(thread_checker_.CalledOnValidThread());
188 
189   allow_woken_up_by_others_ = true;
190 
191   EnsureSyncWatcherExists();
192   sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
193 }
194 
SyncWatch(const bool * should_stop)195 bool Connector::SyncWatch(const bool* should_stop) {
196   DCHECK(thread_checker_.CalledOnValidThread());
197 
198   if (error_)
199     return false;
200 
201   ResumeIncomingMethodCallProcessing();
202 
203   EnsureSyncWatcherExists();
204   return sync_watcher_->SyncWatch(should_stop);
205 }
206 
OnWatcherHandleReady(MojoResult result)207 void Connector::OnWatcherHandleReady(MojoResult result) {
208   OnHandleReadyInternal(result);
209 }
210 
OnSyncHandleWatcherHandleReady(MojoResult result)211 void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
212   base::WeakPtr<Connector> weak_self(weak_self_);
213 
214   sync_handle_watcher_callback_count_++;
215   OnHandleReadyInternal(result);
216   // At this point, this object might have been deleted.
217   if (weak_self)
218     sync_handle_watcher_callback_count_--;
219 }
220 
OnHandleReadyInternal(MojoResult result)221 void Connector::OnHandleReadyInternal(MojoResult result) {
222   DCHECK(thread_checker_.CalledOnValidThread());
223 
224   if (result != MOJO_RESULT_OK) {
225     HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
226     return;
227   }
228   ReadAllAvailableMessages();
229   // At this point, this object might have been deleted. Return.
230 }
231 
WaitToReadMore()232 void Connector::WaitToReadMore() {
233   CHECK(!paused_);
234   DCHECK(!handle_watcher_.IsWatching());
235 
236   MojoResult rv = handle_watcher_.Start(
237       message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
238       base::Bind(&Connector::OnWatcherHandleReady,
239                  base::Unretained(this)));
240 
241   if (rv != MOJO_RESULT_OK) {
242     // If the watch failed because the handle is invalid or its conditions can
243     // no longer be met, we signal the error asynchronously to avoid reentry.
244     task_runner_->PostTask(
245         FROM_HERE,
246         base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
247   }
248 
249   if (allow_woken_up_by_others_) {
250     EnsureSyncWatcherExists();
251     sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
252   }
253 }
254 
ReadSingleMessage(MojoResult * read_result)255 bool Connector::ReadSingleMessage(MojoResult* read_result) {
256   CHECK(!paused_);
257 
258   bool receiver_result = false;
259 
260   // Detect if |this| was destroyed during message dispatch. Allow for the
261   // possibility of re-entering ReadMore() through message dispatch.
262   base::WeakPtr<Connector> weak_self = weak_self_;
263 
264   Message message;
265   const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
266   *read_result = rv;
267 
268   if (rv == MOJO_RESULT_OK) {
269     receiver_result =
270         incoming_receiver_ && incoming_receiver_->Accept(&message);
271   }
272 
273   if (!weak_self)
274     return false;
275 
276   if (rv == MOJO_RESULT_SHOULD_WAIT)
277     return true;
278 
279   if (rv != MOJO_RESULT_OK) {
280     HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
281     return false;
282   }
283 
284   if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
285     HandleError(true, false);
286     return false;
287   }
288   return true;
289 }
290 
ReadAllAvailableMessages()291 void Connector::ReadAllAvailableMessages() {
292   while (!error_) {
293     MojoResult rv;
294 
295     // Return immediately if |this| was destroyed. Do not touch any members!
296     if (!ReadSingleMessage(&rv))
297       return;
298 
299     if (paused_)
300       return;
301 
302     if (rv == MOJO_RESULT_SHOULD_WAIT)
303       break;
304   }
305 }
306 
CancelWait()307 void Connector::CancelWait() {
308   handle_watcher_.Cancel();
309   sync_watcher_.reset();
310 }
311 
HandleError(bool force_pipe_reset,bool force_async_handler)312 void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
313   if (error_ || !message_pipe_.is_valid())
314     return;
315 
316   if (paused_) {
317     // Enforce calling the error handler asynchronously if the user has paused
318     // receiving messages. We need to wait until the user starts receiving
319     // messages again.
320     force_async_handler = true;
321   }
322 
323   if (!force_pipe_reset && force_async_handler)
324     force_pipe_reset = true;
325 
326   if (force_pipe_reset) {
327     CancelWait();
328     MayAutoLock locker(lock_.get());
329     message_pipe_.reset();
330     MessagePipe dummy_pipe;
331     message_pipe_ = std::move(dummy_pipe.handle0);
332   } else {
333     CancelWait();
334   }
335 
336   if (force_async_handler) {
337     if (!paused_)
338       WaitToReadMore();
339   } else {
340     error_ = true;
341     if (!connection_error_handler_.is_null())
342       connection_error_handler_.Run();
343   }
344 }
345 
EnsureSyncWatcherExists()346 void Connector::EnsureSyncWatcherExists() {
347   if (sync_watcher_)
348     return;
349   sync_watcher_.reset(new SyncHandleWatcher(
350       message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
351       base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
352                  base::Unretained(this))));
353 }
354 
355 }  // namespace mojo
356