1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_channel_proxy.h"
6 
7 #include <stddef.h>
8 #include <stdint.h>
9 
10 #include <utility>
11 
12 #include "base/bind.h"
13 #include "base/compiler_specific.h"
14 #include "base/location.h"
15 #include "base/memory/ptr_util.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/single_thread_task_runner.h"
18 #include "base/threading/thread_task_runner_handle.h"
19 #include "build/build_config.h"
20 #include "ipc/ipc_channel_factory.h"
21 #include "ipc/ipc_listener.h"
22 #include "ipc/ipc_logging.h"
23 #include "ipc/ipc_message_macros.h"
24 #include "ipc/message_filter.h"
25 #include "ipc/message_filter_router.h"
26 
27 namespace IPC {
28 
29 //------------------------------------------------------------------------------
30 
Context(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner)31 ChannelProxy::Context::Context(
32     Listener* listener,
33     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
34     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner)
35     : listener_task_runner_(listener_task_runner),
36       listener_(listener),
37       ipc_task_runner_(ipc_task_runner),
38       channel_connected_called_(false),
39       message_filter_router_(new MessageFilterRouter()),
40       peer_pid_(base::kNullProcessId) {
41   DCHECK(ipc_task_runner_.get());
42   // The Listener thread where Messages are handled must be a separate thread
43   // to avoid oversubscribing the IO thread. If you trigger this error, you
44   // need to either:
45   // 1) Create the ChannelProxy on a different thread, or
46   // 2) Just use Channel
47   // Note, we currently make an exception for a NULL listener. That usage
48   // basically works, but is outside the intent of ChannelProxy. This support
49   // will disappear, so please don't rely on it. See crbug.com/364241
50   DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
51 }
52 
53 ChannelProxy::Context::~Context() = default;
54 
ClearIPCTaskRunner()55 void ChannelProxy::Context::ClearIPCTaskRunner() {
56   ipc_task_runner_ = NULL;
57 }
58 
CreateChannel(std::unique_ptr<ChannelFactory> factory)59 void ChannelProxy::Context::CreateChannel(
60     std::unique_ptr<ChannelFactory> factory) {
61   base::AutoLock l(channel_lifetime_lock_);
62   DCHECK(!channel_);
63   DCHECK_EQ(factory->GetIPCTaskRunner(), ipc_task_runner_);
64   channel_ = factory->BuildChannel(this);
65 
66   Channel::AssociatedInterfaceSupport* support =
67       channel_->GetAssociatedInterfaceSupport();
68   if (support) {
69     thread_safe_channel_ = support->CreateThreadSafeChannel();
70 
71     base::AutoLock l(pending_filters_lock_);
72     for (auto& entry : pending_io_thread_interfaces_)
73       support->AddGenericAssociatedInterface(entry.first, entry.second);
74     pending_io_thread_interfaces_.clear();
75   }
76 }
77 
TryFilters(const Message & message)78 bool ChannelProxy::Context::TryFilters(const Message& message) {
79   DCHECK(message_filter_router_);
80 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
81   Logging* logger = Logging::GetInstance();
82   if (logger->Enabled())
83     logger->OnPreDispatchMessage(message);
84 #endif
85 
86   if (message_filter_router_->TryFilters(message)) {
87     if (message.dispatch_error()) {
88       listener_task_runner_->PostTask(
89           FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
90     }
91 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
92     if (logger->Enabled())
93       logger->OnPostDispatchMessage(message);
94 #endif
95     return true;
96   }
97   return false;
98 }
99 
100 // Called on the IPC::Channel thread
PauseChannel()101 void ChannelProxy::Context::PauseChannel() {
102   DCHECK(channel_);
103   channel_->Pause();
104 }
105 
106 // Called on the IPC::Channel thread
UnpauseChannel(bool flush)107 void ChannelProxy::Context::UnpauseChannel(bool flush) {
108   DCHECK(channel_);
109   channel_->Unpause(flush);
110 }
111 
112 // Called on the IPC::Channel thread
FlushChannel()113 void ChannelProxy::Context::FlushChannel() {
114   DCHECK(channel_);
115   channel_->Flush();
116 }
117 
118 // Called on the IPC::Channel thread
OnMessageReceived(const Message & message)119 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
120   // First give a chance to the filters to process this message.
121   if (!TryFilters(message))
122     OnMessageReceivedNoFilter(message);
123   return true;
124 }
125 
126 // Called on the IPC::Channel thread
OnMessageReceivedNoFilter(const Message & message)127 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
128   listener_task_runner_->PostTask(
129       FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
130   return true;
131 }
132 
133 // Called on the IPC::Channel thread
OnChannelConnected(int32_t peer_pid)134 void ChannelProxy::Context::OnChannelConnected(int32_t peer_pid) {
135   // We cache off the peer_pid so it can be safely accessed from both threads.
136   {
137     base::AutoLock l(peer_pid_lock_);
138     peer_pid_ = peer_pid;
139   }
140 
141   // Add any pending filters.  This avoids a race condition where someone
142   // creates a ChannelProxy, calls AddFilter, and then right after starts the
143   // peer process.  The IO thread could receive a message before the task to add
144   // the filter is run on the IO thread.
145   OnAddFilter();
146 
147   // See above comment about using listener_task_runner_ here.
148   listener_task_runner_->PostTask(
149       FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
150 }
151 
152 // Called on the IPC::Channel thread
OnChannelError()153 void ChannelProxy::Context::OnChannelError() {
154   for (size_t i = 0; i < filters_.size(); ++i)
155     filters_[i]->OnChannelError();
156 
157   // See above comment about using listener_task_runner_ here.
158   listener_task_runner_->PostTask(
159       FROM_HERE, base::Bind(&Context::OnDispatchError, this));
160 }
161 
162 // Called on the IPC::Channel thread
OnAssociatedInterfaceRequest(const std::string & interface_name,mojo::ScopedInterfaceEndpointHandle handle)163 void ChannelProxy::Context::OnAssociatedInterfaceRequest(
164     const std::string& interface_name,
165     mojo::ScopedInterfaceEndpointHandle handle) {
166   listener_task_runner_->PostTask(
167       FROM_HERE, base::Bind(&Context::OnDispatchAssociatedInterfaceRequest,
168                             this, interface_name, base::Passed(&handle)));
169 }
170 
171 // Called on the IPC::Channel thread
OnChannelOpened()172 void ChannelProxy::Context::OnChannelOpened() {
173   DCHECK(channel_ != NULL);
174 
175   // Assume a reference to ourselves on behalf of this thread.  This reference
176   // will be released when we are closed.
177   AddRef();
178 
179   if (!channel_->Connect()) {
180     OnChannelError();
181     return;
182   }
183 
184   for (size_t i = 0; i < filters_.size(); ++i)
185     filters_[i]->OnFilterAdded(channel_.get());
186 }
187 
188 // Called on the IPC::Channel thread
OnChannelClosed()189 void ChannelProxy::Context::OnChannelClosed() {
190   // It's okay for IPC::ChannelProxy::Close to be called more than once, which
191   // would result in this branch being taken.
192   if (!channel_)
193     return;
194 
195   for (auto& filter : pending_filters_) {
196     filter->OnChannelClosing();
197     filter->OnFilterRemoved();
198   }
199   for (auto& filter : filters_) {
200     filter->OnChannelClosing();
201     filter->OnFilterRemoved();
202   }
203 
204   // We don't need the filters anymore.
205   message_filter_router_->Clear();
206   filters_.clear();
207   // We don't need the lock, because at this point, the listener thread can't
208   // access it any more.
209   pending_filters_.clear();
210 
211   ClearChannel();
212 
213   // Balance with the reference taken during startup.  This may result in
214   // self-destruction.
215   Release();
216 }
217 
Clear()218 void ChannelProxy::Context::Clear() {
219   listener_ = NULL;
220 }
221 
222 // Called on the IPC::Channel thread
OnSendMessage(std::unique_ptr<Message> message)223 void ChannelProxy::Context::OnSendMessage(std::unique_ptr<Message> message) {
224   if (!channel_) {
225     OnChannelClosed();
226     return;
227   }
228 
229   if (!channel_->Send(message.release()))
230     OnChannelError();
231 }
232 
233 // Called on the IPC::Channel thread
OnAddFilter()234 void ChannelProxy::Context::OnAddFilter() {
235   // Our OnChannelConnected method has not yet been called, so we can't be
236   // sure that channel_ is valid yet. When OnChannelConnected *is* called,
237   // it invokes OnAddFilter, so any pending filter(s) will be added at that
238   // time.
239   // No lock necessary for |peer_pid_| because it is only modified on this
240   // thread.
241   if (peer_pid_ == base::kNullProcessId)
242     return;
243 
244   std::vector<scoped_refptr<MessageFilter> > new_filters;
245   {
246     base::AutoLock auto_lock(pending_filters_lock_);
247     new_filters.swap(pending_filters_);
248   }
249 
250   for (size_t i = 0; i < new_filters.size(); ++i) {
251     filters_.push_back(new_filters[i]);
252 
253     message_filter_router_->AddFilter(new_filters[i].get());
254 
255     // The channel has already been created and connected, so we need to
256     // inform the filters right now.
257     new_filters[i]->OnFilterAdded(channel_.get());
258     new_filters[i]->OnChannelConnected(peer_pid_);
259   }
260 }
261 
262 // Called on the IPC::Channel thread
OnRemoveFilter(MessageFilter * filter)263 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
264   // No lock necessary for |peer_pid_| because it is only modified on this
265   // thread.
266   if (peer_pid_ == base::kNullProcessId) {
267     // The channel is not yet connected, so any filters are still pending.
268     base::AutoLock auto_lock(pending_filters_lock_);
269     for (size_t i = 0; i < pending_filters_.size(); ++i) {
270       if (pending_filters_[i].get() == filter) {
271         filter->OnFilterRemoved();
272         pending_filters_.erase(pending_filters_.begin() + i);
273         return;
274       }
275     }
276     return;
277   }
278   if (!channel_)
279     return;  // The filters have already been deleted.
280 
281   message_filter_router_->RemoveFilter(filter);
282 
283   for (size_t i = 0; i < filters_.size(); ++i) {
284     if (filters_[i].get() == filter) {
285       filter->OnFilterRemoved();
286       filters_.erase(filters_.begin() + i);
287       return;
288     }
289   }
290 
291   NOTREACHED() << "filter to be removed not found";
292 }
293 
294 // Called on the listener's thread
AddFilter(MessageFilter * filter)295 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
296   base::AutoLock auto_lock(pending_filters_lock_);
297   pending_filters_.push_back(base::WrapRefCounted(filter));
298   ipc_task_runner_->PostTask(
299       FROM_HERE, base::Bind(&Context::OnAddFilter, this));
300 }
301 
302 // Called on the listener's thread
OnDispatchMessage(const Message & message)303 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
304   if (!listener_)
305     return;
306 
307   OnDispatchConnected();
308 
309 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
310   Logging* logger = Logging::GetInstance();
311   if (message.type() == IPC_LOGGING_ID) {
312     logger->OnReceivedLoggingMessage(message);
313     return;
314   }
315 
316   if (logger->Enabled())
317     logger->OnPreDispatchMessage(message);
318 #endif
319 
320   listener_->OnMessageReceived(message);
321   if (message.dispatch_error())
322     listener_->OnBadMessageReceived(message);
323 
324 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
325   if (logger->Enabled())
326     logger->OnPostDispatchMessage(message);
327 #endif
328 }
329 
330 // Called on the listener's thread
OnDispatchConnected()331 void ChannelProxy::Context::OnDispatchConnected() {
332   if (channel_connected_called_)
333     return;
334 
335   base::ProcessId peer_pid;
336   {
337     base::AutoLock l(peer_pid_lock_);
338     peer_pid = peer_pid_;
339   }
340   channel_connected_called_ = true;
341   if (listener_)
342     listener_->OnChannelConnected(peer_pid);
343 }
344 
345 // Called on the listener's thread
OnDispatchError()346 void ChannelProxy::Context::OnDispatchError() {
347   if (listener_)
348     listener_->OnChannelError();
349 }
350 
351 // Called on the listener's thread
OnDispatchBadMessage(const Message & message)352 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
353   if (listener_)
354     listener_->OnBadMessageReceived(message);
355 }
356 
357 // Called on the listener's thread
OnDispatchAssociatedInterfaceRequest(const std::string & interface_name,mojo::ScopedInterfaceEndpointHandle handle)358 void ChannelProxy::Context::OnDispatchAssociatedInterfaceRequest(
359     const std::string& interface_name,
360     mojo::ScopedInterfaceEndpointHandle handle) {
361   if (listener_)
362     listener_->OnAssociatedInterfaceRequest(interface_name, std::move(handle));
363 }
364 
ClearChannel()365 void ChannelProxy::Context::ClearChannel() {
366   base::AutoLock l(channel_lifetime_lock_);
367   channel_.reset();
368 }
369 
AddGenericAssociatedInterfaceForIOThread(const std::string & name,const GenericAssociatedInterfaceFactory & factory)370 void ChannelProxy::Context::AddGenericAssociatedInterfaceForIOThread(
371     const std::string& name,
372     const GenericAssociatedInterfaceFactory& factory) {
373   base::AutoLock l(channel_lifetime_lock_);
374   if (!channel_) {
375     base::AutoLock l(pending_filters_lock_);
376     pending_io_thread_interfaces_.emplace_back(name, factory);
377     return;
378   }
379   Channel::AssociatedInterfaceSupport* support =
380       channel_->GetAssociatedInterfaceSupport();
381   if (support)
382     support->AddGenericAssociatedInterface(name, factory);
383 }
384 
Send(Message * message)385 void ChannelProxy::Context::Send(Message* message) {
386   ipc_task_runner()->PostTask(
387       FROM_HERE, base::Bind(&ChannelProxy::Context::OnSendMessage, this,
388                             base::Passed(base::WrapUnique(message))));
389 }
390 
391 //-----------------------------------------------------------------------------
392 
393 // static
Create(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner)394 std::unique_ptr<ChannelProxy> ChannelProxy::Create(
395     const IPC::ChannelHandle& channel_handle,
396     Channel::Mode mode,
397     Listener* listener,
398     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
399     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) {
400   std::unique_ptr<ChannelProxy> channel(
401       new ChannelProxy(listener, ipc_task_runner, listener_task_runner));
402   channel->Init(channel_handle, mode, true);
403   return channel;
404 }
405 
406 // static
Create(std::unique_ptr<ChannelFactory> factory,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner)407 std::unique_ptr<ChannelProxy> ChannelProxy::Create(
408     std::unique_ptr<ChannelFactory> factory,
409     Listener* listener,
410     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
411     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner) {
412   std::unique_ptr<ChannelProxy> channel(
413       new ChannelProxy(listener, ipc_task_runner, listener_task_runner));
414   channel->Init(std::move(factory), true);
415   return channel;
416 }
417 
ChannelProxy(Context * context)418 ChannelProxy::ChannelProxy(Context* context)
419     : context_(context), did_init_(false) {
420 #if defined(ENABLE_IPC_FUZZER)
421   outgoing_message_filter_ = NULL;
422 #endif
423 }
424 
ChannelProxy(Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & listener_task_runner)425 ChannelProxy::ChannelProxy(
426     Listener* listener,
427     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
428     const scoped_refptr<base::SingleThreadTaskRunner>& listener_task_runner)
429     : context_(new Context(listener, ipc_task_runner, listener_task_runner)),
430       did_init_(false) {
431 #if defined(ENABLE_IPC_FUZZER)
432   outgoing_message_filter_ = NULL;
433 #endif
434 }
435 
~ChannelProxy()436 ChannelProxy::~ChannelProxy() {
437   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
438 
439   Close();
440 }
441 
Init(const IPC::ChannelHandle & channel_handle,Channel::Mode mode,bool create_pipe_now)442 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
443                         Channel::Mode mode,
444                         bool create_pipe_now) {
445 #if defined(OS_POSIX) || defined(OS_FUCHSIA)
446   // When we are creating a server on POSIX, we need its file descriptor
447   // to be created immediately so that it can be accessed and passed
448   // to other processes. Forcing it to be created immediately avoids
449   // race conditions that may otherwise arise.
450   if (mode & Channel::MODE_SERVER_FLAG) {
451     create_pipe_now = true;
452   }
453 #endif  // defined(OS_POSIX) || defined(OS_FUCHSIA)
454   Init(
455       ChannelFactory::Create(channel_handle, mode, context_->ipc_task_runner()),
456       create_pipe_now);
457 }
458 
Init(std::unique_ptr<ChannelFactory> factory,bool create_pipe_now)459 void ChannelProxy::Init(std::unique_ptr<ChannelFactory> factory,
460                         bool create_pipe_now) {
461   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
462   DCHECK(!did_init_);
463 
464   if (create_pipe_now) {
465     // Create the channel immediately.  This effectively sets up the
466     // low-level pipe so that the client can connect.  Without creating
467     // the pipe immediately, it is possible for a listener to attempt
468     // to connect and get an error since the pipe doesn't exist yet.
469     context_->CreateChannel(std::move(factory));
470   } else {
471     context_->ipc_task_runner()->PostTask(
472         FROM_HERE, base::Bind(&Context::CreateChannel, context_,
473                               base::Passed(&factory)));
474   }
475 
476   // complete initialization on the background thread
477   context_->ipc_task_runner()->PostTask(
478       FROM_HERE,
479       base::Bind(&Context::OnChannelOpened, context_));
480 
481   did_init_ = true;
482   OnChannelInit();
483 }
484 
Pause()485 void ChannelProxy::Pause() {
486   context_->ipc_task_runner()->PostTask(
487       FROM_HERE, base::Bind(&Context::PauseChannel, context_));
488 }
489 
Unpause(bool flush)490 void ChannelProxy::Unpause(bool flush) {
491   context_->ipc_task_runner()->PostTask(
492       FROM_HERE, base::Bind(&Context::UnpauseChannel, context_, flush));
493 }
494 
Flush()495 void ChannelProxy::Flush() {
496   context_->ipc_task_runner()->PostTask(
497       FROM_HERE, base::Bind(&Context::FlushChannel, context_));
498 }
499 
Close()500 void ChannelProxy::Close() {
501   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
502 
503   // Clear the backpointer to the listener so that any pending calls to
504   // Context::OnDispatchMessage or OnDispatchError will be ignored.  It is
505   // possible that the channel could be closed while it is receiving messages!
506   context_->Clear();
507 
508   if (context_->ipc_task_runner()) {
509     context_->ipc_task_runner()->PostTask(
510         FROM_HERE, base::Bind(&Context::OnChannelClosed, context_));
511   }
512 }
513 
Send(Message * message)514 bool ChannelProxy::Send(Message* message) {
515   DCHECK(!message->is_sync()) << "Need to use IPC::SyncChannel";
516   SendInternal(message);
517   return true;
518 }
519 
SendInternal(Message * message)520 void ChannelProxy::SendInternal(Message* message) {
521   DCHECK(did_init_);
522 
523   // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
524   // tests that call Send() from a wrong thread. See http://crbug.com/163523.
525 
526 #ifdef ENABLE_IPC_FUZZER
527   // In IPC fuzzing builds, it is possible to define a filter to apply to
528   // outgoing messages. It will either rewrite the message and return a new
529   // one, freeing the original, or return the message unchanged.
530   if (outgoing_message_filter())
531     message = outgoing_message_filter()->Rewrite(message);
532 #endif
533 
534 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
535   Logging::GetInstance()->OnSendMessage(message);
536 #endif
537 
538   // See https://crbug.com/766032. This is to ensure that senders of oversized
539   // messages can be caught more easily in the wild.
540   CHECK_LE(message->size(), Channel::kMaximumMessageSize);
541 
542   context_->Send(message);
543 }
544 
AddFilter(MessageFilter * filter)545 void ChannelProxy::AddFilter(MessageFilter* filter) {
546   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
547 
548   context_->AddFilter(filter);
549 }
550 
RemoveFilter(MessageFilter * filter)551 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
552   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
553 
554   context_->ipc_task_runner()->PostTask(
555       FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_,
556                             base::RetainedRef(filter)));
557 }
558 
AddGenericAssociatedInterfaceForIOThread(const std::string & name,const GenericAssociatedInterfaceFactory & factory)559 void ChannelProxy::AddGenericAssociatedInterfaceForIOThread(
560     const std::string& name,
561     const GenericAssociatedInterfaceFactory& factory) {
562   context()->AddGenericAssociatedInterfaceForIOThread(name, factory);
563 }
564 
GetGenericRemoteAssociatedInterface(const std::string & name,mojo::ScopedInterfaceEndpointHandle handle)565 void ChannelProxy::GetGenericRemoteAssociatedInterface(
566     const std::string& name,
567     mojo::ScopedInterfaceEndpointHandle handle) {
568   DCHECK(did_init_);
569   context()->thread_safe_channel().GetAssociatedInterface(
570       name, mojom::GenericInterfaceAssociatedRequest(std::move(handle)));
571 }
572 
ClearIPCTaskRunner()573 void ChannelProxy::ClearIPCTaskRunner() {
574   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
575   context()->ClearIPCTaskRunner();
576 }
577 
OnChannelInit()578 void ChannelProxy::OnChannelInit() {
579 }
580 
581 //-----------------------------------------------------------------------------
582 
583 }  // namespace IPC
584