1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "dbus/bus.h"
6 
7 #include <stddef.h>
8 
9 #include "base/bind.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/stl_util.h"
13 #include "base/strings/stringprintf.h"
14 #include "base/threading/thread.h"
15 #include "base/threading/thread_restrictions.h"
16 #include "base/threading/thread_task_runner_handle.h"
17 #include "base/time/time.h"
18 #include "dbus/exported_object.h"
19 #include "dbus/message.h"
20 #include "dbus/object_manager.h"
21 #include "dbus/object_path.h"
22 #include "dbus/object_proxy.h"
23 #include "dbus/scoped_dbus_error.h"
24 
25 namespace dbus {
26 
27 namespace {
28 
29 // The NameOwnerChanged member in org.freedesktop.DBus
30 const char kNameOwnerChangedSignal[] = "NameOwnerChanged";
31 
32 // The match rule used to filter for changes to a given service name owner.
33 const char kServiceNameOwnerChangeMatchRule[] =
34     "type='signal',interface='org.freedesktop.DBus',"
35     "member='NameOwnerChanged',path='/org/freedesktop/DBus',"
36     "sender='org.freedesktop.DBus',arg0='%s'";
37 
38 // The class is used for watching the file descriptor used for D-Bus
39 // communication.
40 class Watch : public base::MessagePumpLibevent::Watcher {
41  public:
Watch(DBusWatch * watch)42   explicit Watch(DBusWatch* watch)
43       : raw_watch_(watch) {
44     dbus_watch_set_data(raw_watch_, this, NULL);
45   }
46 
~Watch()47   ~Watch() override { dbus_watch_set_data(raw_watch_, NULL, NULL); }
48 
49   // Returns true if the underlying file descriptor is ready to be watched.
IsReadyToBeWatched()50   bool IsReadyToBeWatched() {
51     return dbus_watch_get_enabled(raw_watch_);
52   }
53 
54   // Starts watching the underlying file descriptor.
StartWatching()55   void StartWatching() {
56     const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
57     const int flags = dbus_watch_get_flags(raw_watch_);
58 
59     base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
60     if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
61       mode = base::MessageLoopForIO::WATCH_READ_WRITE;
62     else if (flags & DBUS_WATCH_READABLE)
63       mode = base::MessageLoopForIO::WATCH_READ;
64     else if (flags & DBUS_WATCH_WRITABLE)
65       mode = base::MessageLoopForIO::WATCH_WRITE;
66     else
67       NOTREACHED();
68 
69     const bool persistent = true;  // Watch persistently.
70     const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
71         file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
72     CHECK(success) << "Unable to allocate memory";
73   }
74 
75   // Stops watching the underlying file descriptor.
StopWatching()76   void StopWatching() {
77     file_descriptor_watcher_.StopWatchingFileDescriptor();
78   }
79 
80  private:
81   // Implement MessagePumpLibevent::Watcher.
OnFileCanReadWithoutBlocking(int)82   void OnFileCanReadWithoutBlocking(int /*file_descriptor*/) override {
83     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
84     CHECK(success) << "Unable to allocate memory";
85   }
86 
87   // Implement MessagePumpLibevent::Watcher.
OnFileCanWriteWithoutBlocking(int)88   void OnFileCanWriteWithoutBlocking(int /*file_descriptor*/) override {
89     const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
90     CHECK(success) << "Unable to allocate memory";
91   }
92 
93   DBusWatch* raw_watch_;
94   base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
95 };
96 
97 // The class is used for monitoring the timeout used for D-Bus method
98 // calls.
99 //
100 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
101 // the object is is alive when HandleTimeout() is called. It's unlikely
102 // but it may be possible that HandleTimeout() is called after
103 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
104 // Bus::OnRemoveTimeout().
105 class Timeout : public base::RefCountedThreadSafe<Timeout> {
106  public:
Timeout(DBusTimeout * timeout)107   explicit Timeout(DBusTimeout* timeout)
108       : raw_timeout_(timeout),
109         monitoring_is_active_(false),
110         is_completed(false) {
111     dbus_timeout_set_data(raw_timeout_, this, NULL);
112     AddRef();  // Balanced on Complete().
113   }
114 
115   // Returns true if the timeout is ready to be monitored.
IsReadyToBeMonitored()116   bool IsReadyToBeMonitored() {
117     return dbus_timeout_get_enabled(raw_timeout_);
118   }
119 
120   // Starts monitoring the timeout.
StartMonitoring(Bus * bus)121   void StartMonitoring(Bus* bus) {
122     bus->GetDBusTaskRunner()->PostDelayedTask(
123         FROM_HERE,
124         base::Bind(&Timeout::HandleTimeout, this),
125         GetInterval());
126     monitoring_is_active_ = true;
127   }
128 
129   // Stops monitoring the timeout.
StopMonitoring()130   void StopMonitoring() {
131     // We cannot take back the delayed task we posted in
132     // StartMonitoring(), so we just mark the monitoring is inactive now.
133     monitoring_is_active_ = false;
134   }
135 
136   // Returns the interval.
GetInterval()137   base::TimeDelta GetInterval() {
138     return base::TimeDelta::FromMilliseconds(
139         dbus_timeout_get_interval(raw_timeout_));
140   }
141 
142   // Cleans up the raw_timeout and marks that timeout is completed.
143   // See the class comment above for why we are doing this.
Complete()144   void Complete() {
145     dbus_timeout_set_data(raw_timeout_, NULL, NULL);
146     is_completed = true;
147     Release();
148   }
149 
150  private:
151   friend class base::RefCountedThreadSafe<Timeout>;
~Timeout()152   ~Timeout() {
153   }
154 
155   // Handles the timeout.
HandleTimeout()156   void HandleTimeout() {
157     // If the timeout is marked completed, we should do nothing. This can
158     // occur if this function is called after Bus::OnRemoveTimeout().
159     if (is_completed)
160       return;
161     // Skip if monitoring is canceled.
162     if (!monitoring_is_active_)
163       return;
164 
165     const bool success = dbus_timeout_handle(raw_timeout_);
166     CHECK(success) << "Unable to allocate memory";
167   }
168 
169   DBusTimeout* raw_timeout_;
170   bool monitoring_is_active_;
171   bool is_completed;
172 };
173 
174 }  // namespace
175 
Options()176 Bus::Options::Options()
177   : bus_type(SESSION),
178     connection_type(PRIVATE) {
179 }
180 
~Options()181 Bus::Options::~Options() {
182 }
183 
Bus(const Options & options)184 Bus::Bus(const Options& options)
185     : bus_type_(options.bus_type),
186       connection_type_(options.connection_type),
187       dbus_task_runner_(options.dbus_task_runner),
188       on_shutdown_(base::WaitableEvent::ResetPolicy::AUTOMATIC,
189                    base::WaitableEvent::InitialState::NOT_SIGNALED),
190       connection_(NULL),
191       origin_thread_id_(base::PlatformThread::CurrentId()),
192       async_operations_set_up_(false),
193       shutdown_completed_(false),
194       num_pending_watches_(0),
195       num_pending_timeouts_(0),
196       address_(options.address) {
197   // This is safe to call multiple times.
198   dbus_threads_init_default();
199   // The origin message loop is unnecessary if the client uses synchronous
200   // functions only.
201   if (base::ThreadTaskRunnerHandle::IsSet())
202     origin_task_runner_ = base::ThreadTaskRunnerHandle::Get();
203 }
204 
~Bus()205 Bus::~Bus() {
206   DCHECK(!connection_);
207   DCHECK(owned_service_names_.empty());
208   DCHECK(match_rules_added_.empty());
209   DCHECK(filter_functions_added_.empty());
210   DCHECK(registered_object_paths_.empty());
211   DCHECK_EQ(0, num_pending_watches_);
212   // TODO(satorux): This check fails occasionally in browser_tests for tests
213   // that run very quickly. Perhaps something does not have time to clean up.
214   // Despite the check failing, the tests seem to run fine. crosbug.com/23416
215   // DCHECK_EQ(0, num_pending_timeouts_);
216 }
217 
GetObjectProxy(const std::string & service_name,const ObjectPath & object_path)218 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
219                                  const ObjectPath& object_path) {
220   return GetObjectProxyWithOptions(service_name, object_path,
221                                    ObjectProxy::DEFAULT_OPTIONS);
222 }
223 
GetObjectProxyWithOptions(const std::string & service_name,const ObjectPath & object_path,int options)224 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
225                                             const ObjectPath& object_path,
226                                             int options) {
227   AssertOnOriginThread();
228 
229   // Check if we already have the requested object proxy.
230   const ObjectProxyTable::key_type key(service_name + object_path.value(),
231                                        options);
232   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
233   if (iter != object_proxy_table_.end()) {
234     return iter->second.get();
235   }
236 
237   scoped_refptr<ObjectProxy> object_proxy =
238       new ObjectProxy(this, service_name, object_path, options);
239   object_proxy_table_[key] = object_proxy;
240 
241   return object_proxy.get();
242 }
243 
RemoveObjectProxy(const std::string & service_name,const ObjectPath & object_path,const base::Closure & callback)244 bool Bus::RemoveObjectProxy(const std::string& service_name,
245                             const ObjectPath& object_path,
246                             const base::Closure& callback) {
247   return RemoveObjectProxyWithOptions(service_name, object_path,
248                                       ObjectProxy::DEFAULT_OPTIONS,
249                                       callback);
250 }
251 
RemoveObjectProxyWithOptions(const std::string & service_name,const ObjectPath & object_path,int options,const base::Closure & callback)252 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
253                                        const ObjectPath& object_path,
254                                        int options,
255                                        const base::Closure& callback) {
256   AssertOnOriginThread();
257 
258   // Check if we have the requested object proxy.
259   const ObjectProxyTable::key_type key(service_name + object_path.value(),
260                                        options);
261   ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
262   if (iter != object_proxy_table_.end()) {
263     scoped_refptr<ObjectProxy> object_proxy = iter->second;
264     object_proxy_table_.erase(iter);
265     // Object is present. Remove it now and Detach on the DBus thread.
266     GetDBusTaskRunner()->PostTask(
267         FROM_HERE,
268         base::Bind(&Bus::RemoveObjectProxyInternal,
269                    this, object_proxy, callback));
270     return true;
271   }
272   return false;
273 }
274 
RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,const base::Closure & callback)275 void Bus::RemoveObjectProxyInternal(scoped_refptr<ObjectProxy> object_proxy,
276                                     const base::Closure& callback) {
277   AssertOnDBusThread();
278 
279   object_proxy.get()->Detach();
280 
281   GetOriginTaskRunner()->PostTask(FROM_HERE, callback);
282 }
283 
GetExportedObject(const ObjectPath & object_path)284 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
285   AssertOnOriginThread();
286 
287   // Check if we already have the requested exported object.
288   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
289   if (iter != exported_object_table_.end()) {
290     return iter->second.get();
291   }
292 
293   scoped_refptr<ExportedObject> exported_object =
294       new ExportedObject(this, object_path);
295   exported_object_table_[object_path] = exported_object;
296 
297   return exported_object.get();
298 }
299 
UnregisterExportedObject(const ObjectPath & object_path)300 void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
301   AssertOnOriginThread();
302 
303   // Remove the registered object from the table first, to allow a new
304   // GetExportedObject() call to return a new object, rather than this one.
305   ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
306   if (iter == exported_object_table_.end())
307     return;
308 
309   scoped_refptr<ExportedObject> exported_object = iter->second;
310   exported_object_table_.erase(iter);
311 
312   // Post the task to perform the final unregistration to the D-Bus thread.
313   // Since the registration also happens on the D-Bus thread in
314   // TryRegisterObjectPath(), and the task runner we post to is a
315   // SequencedTaskRunner, there is a guarantee that this will happen before any
316   // future registration call.
317   GetDBusTaskRunner()->PostTask(
318       FROM_HERE,
319       base::Bind(&Bus::UnregisterExportedObjectInternal,
320                  this, exported_object));
321 }
322 
UnregisterExportedObjectInternal(scoped_refptr<ExportedObject> exported_object)323 void Bus::UnregisterExportedObjectInternal(
324     scoped_refptr<ExportedObject> exported_object) {
325   AssertOnDBusThread();
326 
327   exported_object->Unregister();
328 }
329 
GetObjectManager(const std::string & service_name,const ObjectPath & object_path)330 ObjectManager* Bus::GetObjectManager(const std::string& service_name,
331                                      const ObjectPath& object_path) {
332   AssertOnOriginThread();
333 
334   // Check if we already have the requested object manager.
335   const ObjectManagerTable::key_type key(service_name + object_path.value());
336   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
337   if (iter != object_manager_table_.end()) {
338     return iter->second.get();
339   }
340 
341   scoped_refptr<ObjectManager> object_manager =
342       new ObjectManager(this, service_name, object_path);
343   object_manager_table_[key] = object_manager;
344 
345   return object_manager.get();
346 }
347 
RemoveObjectManager(const std::string & service_name,const ObjectPath & object_path,const base::Closure & callback)348 bool Bus::RemoveObjectManager(const std::string& service_name,
349                               const ObjectPath& object_path,
350                               const base::Closure& callback) {
351   AssertOnOriginThread();
352   DCHECK(!callback.is_null());
353 
354   const ObjectManagerTable::key_type key(service_name + object_path.value());
355   ObjectManagerTable::iterator iter = object_manager_table_.find(key);
356   if (iter == object_manager_table_.end())
357     return false;
358 
359   // ObjectManager is present. Remove it now and CleanUp on the DBus thread.
360   scoped_refptr<ObjectManager> object_manager = iter->second;
361   object_manager_table_.erase(iter);
362 
363   GetDBusTaskRunner()->PostTask(
364       FROM_HERE,
365       base::Bind(&Bus::RemoveObjectManagerInternal,
366                  this, object_manager, callback));
367 
368   return true;
369 }
370 
RemoveObjectManagerInternal(scoped_refptr<dbus::ObjectManager> object_manager,const base::Closure & callback)371 void Bus::RemoveObjectManagerInternal(
372       scoped_refptr<dbus::ObjectManager> object_manager,
373       const base::Closure& callback) {
374   AssertOnDBusThread();
375   DCHECK(object_manager.get());
376 
377   object_manager->CleanUp();
378 
379   // The ObjectManager has to be deleted on the origin thread since it was
380   // created there.
381   GetOriginTaskRunner()->PostTask(
382       FROM_HERE,
383       base::Bind(&Bus::RemoveObjectManagerInternalHelper,
384                  this, object_manager, callback));
385 }
386 
RemoveObjectManagerInternalHelper(scoped_refptr<dbus::ObjectManager> object_manager,const base::Closure & callback)387 void Bus::RemoveObjectManagerInternalHelper(
388       scoped_refptr<dbus::ObjectManager> object_manager,
389       const base::Closure& callback) {
390   AssertOnOriginThread();
391   DCHECK(object_manager.get());
392 
393   // Release the object manager and run the callback.
394   object_manager = NULL;
395   callback.Run();
396 }
397 
GetManagedObjects()398 void Bus::GetManagedObjects() {
399   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
400        iter != object_manager_table_.end(); ++iter) {
401     iter->second->GetManagedObjects();
402   }
403 }
404 
Connect()405 bool Bus::Connect() {
406   // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
407   AssertOnDBusThread();
408 
409   // Check if it's already initialized.
410   if (connection_)
411     return true;
412 
413   ScopedDBusError error;
414   if (bus_type_ == CUSTOM_ADDRESS) {
415     if (connection_type_ == PRIVATE) {
416       connection_ = dbus_connection_open_private(address_.c_str(), error.get());
417     } else {
418       connection_ = dbus_connection_open(address_.c_str(), error.get());
419     }
420   } else {
421     const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
422     if (connection_type_ == PRIVATE) {
423       connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
424     } else {
425       connection_ = dbus_bus_get(dbus_bus_type, error.get());
426     }
427   }
428   if (!connection_) {
429     LOG(ERROR) << "Failed to connect to the bus: "
430                << (error.is_set() ? error.message() : "");
431     return false;
432   }
433 
434   if (bus_type_ == CUSTOM_ADDRESS) {
435     // We should call dbus_bus_register here, otherwise unique name can not be
436     // acquired. According to dbus specification, it is responsible to call
437     // org.freedesktop.DBus.Hello method at the beging of bus connection to
438     // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
439     // called internally.
440     if (!dbus_bus_register(connection_, error.get())) {
441       LOG(ERROR) << "Failed to register the bus component: "
442                  << (error.is_set() ? error.message() : "");
443       return false;
444     }
445   }
446 
447   return true;
448 }
449 
ClosePrivateConnection()450 void Bus::ClosePrivateConnection() {
451   // dbus_connection_close is blocking call.
452   AssertOnDBusThread();
453   DCHECK_EQ(PRIVATE, connection_type_)
454       << "non-private connection should not be closed";
455   dbus_connection_close(connection_);
456 }
457 
ShutdownAndBlock()458 void Bus::ShutdownAndBlock() {
459   AssertOnDBusThread();
460 
461   if (shutdown_completed_)
462     return;  // Already shutdowned, just return.
463 
464   // Unregister the exported objects.
465   for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
466        iter != exported_object_table_.end(); ++iter) {
467     iter->second->Unregister();
468   }
469 
470   // Release all service names.
471   for (std::set<std::string>::iterator iter = owned_service_names_.begin();
472        iter != owned_service_names_.end();) {
473     // This is a bit tricky but we should increment the iter here as
474     // ReleaseOwnership() may remove |service_name| from the set.
475     const std::string& service_name = *iter++;
476     ReleaseOwnership(service_name);
477   }
478   if (!owned_service_names_.empty()) {
479     LOG(ERROR) << "Failed to release all service names. # of services left: "
480                << owned_service_names_.size();
481   }
482 
483   // Detach from the remote objects.
484   for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
485        iter != object_proxy_table_.end(); ++iter) {
486     iter->second->Detach();
487   }
488 
489   // Clean up the object managers.
490   for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
491        iter != object_manager_table_.end(); ++iter) {
492     iter->second->CleanUp();
493   }
494 
495   // Release object proxies and exported objects here. We should do this
496   // here rather than in the destructor to avoid memory leaks due to
497   // cyclic references.
498   object_proxy_table_.clear();
499   exported_object_table_.clear();
500 
501   // Private connection should be closed.
502   if (connection_) {
503     // Remove Disconnected watcher.
504     ScopedDBusError error;
505 
506     if (connection_type_ == PRIVATE)
507       ClosePrivateConnection();
508     // dbus_connection_close() won't unref.
509     dbus_connection_unref(connection_);
510   }
511 
512   connection_ = NULL;
513   shutdown_completed_ = true;
514 }
515 
ShutdownOnDBusThreadAndBlock()516 void Bus::ShutdownOnDBusThreadAndBlock() {
517   AssertOnOriginThread();
518   DCHECK(dbus_task_runner_.get());
519 
520   GetDBusTaskRunner()->PostTask(
521       FROM_HERE,
522       base::Bind(&Bus::ShutdownOnDBusThreadAndBlockInternal, this));
523 
524   // http://crbug.com/125222
525   base::ThreadRestrictions::ScopedAllowWait allow_wait;
526 
527   // Wait until the shutdown is complete on the D-Bus thread.
528   // The shutdown should not hang, but set timeout just in case.
529   const int kTimeoutSecs = 3;
530   const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
531   const bool signaled = on_shutdown_.TimedWait(timeout);
532   LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
533 }
534 
RequestOwnership(const std::string & service_name,ServiceOwnershipOptions options,OnOwnershipCallback on_ownership_callback)535 void Bus::RequestOwnership(const std::string& service_name,
536                            ServiceOwnershipOptions options,
537                            OnOwnershipCallback on_ownership_callback) {
538   AssertOnOriginThread();
539 
540   GetDBusTaskRunner()->PostTask(
541       FROM_HERE,
542       base::Bind(&Bus::RequestOwnershipInternal,
543                  this, service_name, options, on_ownership_callback));
544 }
545 
RequestOwnershipInternal(const std::string & service_name,ServiceOwnershipOptions options,OnOwnershipCallback on_ownership_callback)546 void Bus::RequestOwnershipInternal(const std::string& service_name,
547                                    ServiceOwnershipOptions options,
548                                    OnOwnershipCallback on_ownership_callback) {
549   AssertOnDBusThread();
550 
551   bool success = Connect();
552   if (success)
553     success = RequestOwnershipAndBlock(service_name, options);
554 
555   GetOriginTaskRunner()->PostTask(FROM_HERE,
556                                   base::Bind(on_ownership_callback,
557                                              service_name,
558                                              success));
559 }
560 
RequestOwnershipAndBlock(const std::string & service_name,ServiceOwnershipOptions options)561 bool Bus::RequestOwnershipAndBlock(const std::string& service_name,
562                                    ServiceOwnershipOptions options) {
563   DCHECK(connection_);
564   // dbus_bus_request_name() is a blocking call.
565   AssertOnDBusThread();
566 
567   // Check if we already own the service name.
568   if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
569     return true;
570   }
571 
572   ScopedDBusError error;
573   const int result = dbus_bus_request_name(connection_,
574                                            service_name.c_str(),
575                                            options,
576                                            error.get());
577   if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
578     LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
579                << (error.is_set() ? error.message() : "");
580     return false;
581   }
582   owned_service_names_.insert(service_name);
583   return true;
584 }
585 
ReleaseOwnership(const std::string & service_name)586 bool Bus::ReleaseOwnership(const std::string& service_name) {
587   DCHECK(connection_);
588   // dbus_bus_request_name() is a blocking call.
589   AssertOnDBusThread();
590 
591   // Check if we already own the service name.
592   std::set<std::string>::iterator found =
593       owned_service_names_.find(service_name);
594   if (found == owned_service_names_.end()) {
595     LOG(ERROR) << service_name << " is not owned by the bus";
596     return false;
597   }
598 
599   ScopedDBusError error;
600   const int result = dbus_bus_release_name(connection_, service_name.c_str(),
601                                            error.get());
602   if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
603     owned_service_names_.erase(found);
604     return true;
605   } else {
606     LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
607                << (error.is_set() ? error.message() : "")
608                << ", result code: " << result;
609     return false;
610   }
611 }
612 
SetUpAsyncOperations()613 bool Bus::SetUpAsyncOperations() {
614   DCHECK(connection_);
615   AssertOnDBusThread();
616 
617   if (async_operations_set_up_)
618     return true;
619 
620   // Process all the incoming data if any, so that OnDispatchStatus() will
621   // be called when the incoming data is ready.
622   ProcessAllIncomingDataIfAny();
623 
624   bool success = dbus_connection_set_watch_functions(connection_,
625                                                      &Bus::OnAddWatchThunk,
626                                                      &Bus::OnRemoveWatchThunk,
627                                                      &Bus::OnToggleWatchThunk,
628                                                      this,
629                                                      NULL);
630   CHECK(success) << "Unable to allocate memory";
631 
632   success = dbus_connection_set_timeout_functions(connection_,
633                                                   &Bus::OnAddTimeoutThunk,
634                                                   &Bus::OnRemoveTimeoutThunk,
635                                                   &Bus::OnToggleTimeoutThunk,
636                                                   this,
637                                                   NULL);
638   CHECK(success) << "Unable to allocate memory";
639 
640   dbus_connection_set_dispatch_status_function(
641       connection_,
642       &Bus::OnDispatchStatusChangedThunk,
643       this,
644       NULL);
645 
646   async_operations_set_up_ = true;
647 
648   return true;
649 }
650 
SendWithReplyAndBlock(DBusMessage * request,int timeout_ms,DBusError * error)651 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
652                                         int timeout_ms,
653                                         DBusError* error) {
654   DCHECK(connection_);
655   AssertOnDBusThread();
656 
657   return dbus_connection_send_with_reply_and_block(
658       connection_, request, timeout_ms, error);
659 }
660 
SendWithReply(DBusMessage * request,DBusPendingCall ** pending_call,int timeout_ms)661 void Bus::SendWithReply(DBusMessage* request,
662                         DBusPendingCall** pending_call,
663                         int timeout_ms) {
664   DCHECK(connection_);
665   AssertOnDBusThread();
666 
667   const bool success = dbus_connection_send_with_reply(
668       connection_, request, pending_call, timeout_ms);
669   CHECK(success) << "Unable to allocate memory";
670 }
671 
Send(DBusMessage * request,uint32_t * serial)672 void Bus::Send(DBusMessage* request, uint32_t* serial) {
673   DCHECK(connection_);
674   AssertOnDBusThread();
675 
676   const bool success = dbus_connection_send(connection_, request, serial);
677   CHECK(success) << "Unable to allocate memory";
678 }
679 
AddFilterFunction(DBusHandleMessageFunction filter_function,void * user_data)680 void Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
681                             void* user_data) {
682   DCHECK(connection_);
683   AssertOnDBusThread();
684 
685   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
686       std::make_pair(filter_function, user_data);
687   if (filter_functions_added_.find(filter_data_pair) !=
688       filter_functions_added_.end()) {
689     VLOG(1) << "Filter function already exists: " << filter_function
690             << " with associated data: " << user_data;
691     return;
692   }
693 
694   const bool success = dbus_connection_add_filter(
695       connection_, filter_function, user_data, NULL);
696   CHECK(success) << "Unable to allocate memory";
697   filter_functions_added_.insert(filter_data_pair);
698 }
699 
RemoveFilterFunction(DBusHandleMessageFunction filter_function,void * user_data)700 void Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
701                                void* user_data) {
702   DCHECK(connection_);
703   AssertOnDBusThread();
704 
705   std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
706       std::make_pair(filter_function, user_data);
707   if (filter_functions_added_.find(filter_data_pair) ==
708       filter_functions_added_.end()) {
709     VLOG(1) << "Requested to remove an unknown filter function: "
710             << filter_function
711             << " with associated data: " << user_data;
712     return;
713   }
714 
715   dbus_connection_remove_filter(connection_, filter_function, user_data);
716   filter_functions_added_.erase(filter_data_pair);
717 }
718 
AddMatch(const std::string & match_rule,DBusError * error)719 void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
720   DCHECK(connection_);
721   AssertOnDBusThread();
722 
723   std::map<std::string, int>::iterator iter =
724       match_rules_added_.find(match_rule);
725   if (iter != match_rules_added_.end()) {
726     // The already existing rule's counter is incremented.
727     iter->second++;
728 
729     VLOG(1) << "Match rule already exists: " << match_rule;
730     return;
731   }
732 
733   dbus_bus_add_match(connection_, match_rule.c_str(), error);
734   match_rules_added_[match_rule] = 1;
735 }
736 
RemoveMatch(const std::string & match_rule,DBusError * error)737 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
738   DCHECK(connection_);
739   AssertOnDBusThread();
740 
741   std::map<std::string, int>::iterator iter =
742       match_rules_added_.find(match_rule);
743   if (iter == match_rules_added_.end()) {
744     LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
745     return false;
746   }
747 
748   // The rule's counter is decremented and the rule is deleted when reachs 0.
749   iter->second--;
750   if (iter->second == 0) {
751     dbus_bus_remove_match(connection_, match_rule.c_str(), error);
752     match_rules_added_.erase(match_rule);
753   }
754   return true;
755 }
756 
TryRegisterObjectPath(const ObjectPath & object_path,const DBusObjectPathVTable * vtable,void * user_data,DBusError * error)757 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
758                                 const DBusObjectPathVTable* vtable,
759                                 void* user_data,
760                                 DBusError* error) {
761   DCHECK(connection_);
762   AssertOnDBusThread();
763 
764   if (registered_object_paths_.find(object_path) !=
765       registered_object_paths_.end()) {
766     LOG(ERROR) << "Object path already registered: " << object_path.value();
767     return false;
768   }
769 
770   const bool success = dbus_connection_try_register_object_path(
771       connection_,
772       object_path.value().c_str(),
773       vtable,
774       user_data,
775       error);
776   if (success)
777     registered_object_paths_.insert(object_path);
778   return success;
779 }
780 
UnregisterObjectPath(const ObjectPath & object_path)781 void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
782   DCHECK(connection_);
783   AssertOnDBusThread();
784 
785   if (registered_object_paths_.find(object_path) ==
786       registered_object_paths_.end()) {
787     LOG(ERROR) << "Requested to unregister an unknown object path: "
788                << object_path.value();
789     return;
790   }
791 
792   const bool success = dbus_connection_unregister_object_path(
793       connection_,
794       object_path.value().c_str());
795   CHECK(success) << "Unable to allocate memory";
796   registered_object_paths_.erase(object_path);
797 }
798 
ShutdownOnDBusThreadAndBlockInternal()799 void Bus::ShutdownOnDBusThreadAndBlockInternal() {
800   AssertOnDBusThread();
801 
802   ShutdownAndBlock();
803   on_shutdown_.Signal();
804 }
805 
ProcessAllIncomingDataIfAny()806 void Bus::ProcessAllIncomingDataIfAny() {
807   AssertOnDBusThread();
808 
809   // As mentioned at the class comment in .h file, connection_ can be NULL.
810   if (!connection_)
811     return;
812 
813   // It is safe and necessary to call dbus_connection_get_dispatch_status even
814   // if the connection is lost.
815   if (dbus_connection_get_dispatch_status(connection_) ==
816       DBUS_DISPATCH_DATA_REMAINS) {
817     while (dbus_connection_dispatch(connection_) ==
818            DBUS_DISPATCH_DATA_REMAINS) {
819     }
820   }
821 }
822 
GetDBusTaskRunner()823 base::TaskRunner* Bus::GetDBusTaskRunner() {
824   if (dbus_task_runner_.get())
825     return dbus_task_runner_.get();
826   else
827     return GetOriginTaskRunner();
828 }
829 
GetOriginTaskRunner()830 base::TaskRunner* Bus::GetOriginTaskRunner() {
831   DCHECK(origin_task_runner_.get());
832   return origin_task_runner_.get();
833 }
834 
HasDBusThread()835 bool Bus::HasDBusThread() {
836   return dbus_task_runner_.get() != NULL;
837 }
838 
AssertOnOriginThread()839 void Bus::AssertOnOriginThread() {
840   DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
841 }
842 
AssertOnDBusThread()843 void Bus::AssertOnDBusThread() {
844   base::ThreadRestrictions::AssertIOAllowed();
845 
846   if (dbus_task_runner_.get()) {
847     DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
848   } else {
849     AssertOnOriginThread();
850   }
851 }
852 
GetServiceOwnerAndBlock(const std::string & service_name,GetServiceOwnerOption options)853 std::string Bus::GetServiceOwnerAndBlock(const std::string& service_name,
854                                          GetServiceOwnerOption options) {
855   AssertOnDBusThread();
856 
857   MethodCall get_name_owner_call("org.freedesktop.DBus", "GetNameOwner");
858   MessageWriter writer(&get_name_owner_call);
859   writer.AppendString(service_name);
860   VLOG(1) << "Method call: " << get_name_owner_call.ToString();
861 
862   const ObjectPath obj_path("/org/freedesktop/DBus");
863   if (!get_name_owner_call.SetDestination("org.freedesktop.DBus") ||
864       !get_name_owner_call.SetPath(obj_path)) {
865     if (options == REPORT_ERRORS)
866       LOG(ERROR) << "Failed to get name owner.";
867     return "";
868   }
869 
870   ScopedDBusError error;
871   DBusMessage* response_message =
872       SendWithReplyAndBlock(get_name_owner_call.raw_message(),
873                             ObjectProxy::TIMEOUT_USE_DEFAULT,
874                             error.get());
875   if (!response_message) {
876     if (options == REPORT_ERRORS) {
877       LOG(ERROR) << "Failed to get name owner. Got " << error.name() << ": "
878                  << error.message();
879     }
880     return "";
881   }
882 
883   std::unique_ptr<Response> response(
884       Response::FromRawMessage(response_message));
885   MessageReader reader(response.get());
886 
887   std::string service_owner;
888   if (!reader.PopString(&service_owner))
889     service_owner.clear();
890   return service_owner;
891 }
892 
GetServiceOwner(const std::string & service_name,const GetServiceOwnerCallback & callback)893 void Bus::GetServiceOwner(const std::string& service_name,
894                           const GetServiceOwnerCallback& callback) {
895   AssertOnOriginThread();
896 
897   GetDBusTaskRunner()->PostTask(
898       FROM_HERE,
899       base::Bind(&Bus::GetServiceOwnerInternal, this, service_name, callback));
900 }
901 
GetServiceOwnerInternal(const std::string & service_name,const GetServiceOwnerCallback & callback)902 void Bus::GetServiceOwnerInternal(const std::string& service_name,
903                                   const GetServiceOwnerCallback& callback) {
904   AssertOnDBusThread();
905 
906   std::string service_owner;
907   if (Connect())
908     service_owner = GetServiceOwnerAndBlock(service_name, SUPPRESS_ERRORS);
909   GetOriginTaskRunner()->PostTask(FROM_HERE,
910                                   base::Bind(callback, service_owner));
911 }
912 
ListenForServiceOwnerChange(const std::string & service_name,const GetServiceOwnerCallback & callback)913 void Bus::ListenForServiceOwnerChange(
914     const std::string& service_name,
915     const GetServiceOwnerCallback& callback) {
916   AssertOnOriginThread();
917   DCHECK(!service_name.empty());
918   DCHECK(!callback.is_null());
919 
920   GetDBusTaskRunner()->PostTask(
921       FROM_HERE,
922       base::Bind(&Bus::ListenForServiceOwnerChangeInternal,
923                  this, service_name, callback));
924 }
925 
ListenForServiceOwnerChangeInternal(const std::string & service_name,const GetServiceOwnerCallback & callback)926 void Bus::ListenForServiceOwnerChangeInternal(
927     const std::string& service_name,
928     const GetServiceOwnerCallback& callback) {
929   AssertOnDBusThread();
930   DCHECK(!service_name.empty());
931   DCHECK(!callback.is_null());
932 
933   if (!Connect() || !SetUpAsyncOperations())
934     return;
935 
936   if (service_owner_changed_listener_map_.empty())
937     AddFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
938 
939   ServiceOwnerChangedListenerMap::iterator it =
940       service_owner_changed_listener_map_.find(service_name);
941   if (it == service_owner_changed_listener_map_.end()) {
942     // Add a match rule for the new service name.
943     const std::string name_owner_changed_match_rule =
944         base::StringPrintf(kServiceNameOwnerChangeMatchRule,
945                            service_name.c_str());
946     ScopedDBusError error;
947     AddMatch(name_owner_changed_match_rule, error.get());
948     if (error.is_set()) {
949       LOG(ERROR) << "Failed to add match rule for " << service_name
950                  << ". Got " << error.name() << ": " << error.message();
951       return;
952     }
953 
954     service_owner_changed_listener_map_[service_name].push_back(callback);
955     return;
956   }
957 
958   // Check if the callback has already been added.
959   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
960   for (size_t i = 0; i < callbacks.size(); ++i) {
961     if (callbacks[i].Equals(callback))
962       return;
963   }
964   callbacks.push_back(callback);
965 }
966 
UnlistenForServiceOwnerChange(const std::string & service_name,const GetServiceOwnerCallback & callback)967 void Bus::UnlistenForServiceOwnerChange(
968     const std::string& service_name,
969     const GetServiceOwnerCallback& callback) {
970   AssertOnOriginThread();
971   DCHECK(!service_name.empty());
972   DCHECK(!callback.is_null());
973 
974   GetDBusTaskRunner()->PostTask(
975       FROM_HERE,
976       base::Bind(&Bus::UnlistenForServiceOwnerChangeInternal,
977                  this, service_name, callback));
978 }
979 
UnlistenForServiceOwnerChangeInternal(const std::string & service_name,const GetServiceOwnerCallback & callback)980 void Bus::UnlistenForServiceOwnerChangeInternal(
981     const std::string& service_name,
982     const GetServiceOwnerCallback& callback) {
983   AssertOnDBusThread();
984   DCHECK(!service_name.empty());
985   DCHECK(!callback.is_null());
986 
987   ServiceOwnerChangedListenerMap::iterator it =
988       service_owner_changed_listener_map_.find(service_name);
989   if (it == service_owner_changed_listener_map_.end())
990     return;
991 
992   std::vector<GetServiceOwnerCallback>& callbacks = it->second;
993   for (size_t i = 0; i < callbacks.size(); ++i) {
994     if (callbacks[i].Equals(callback)) {
995       callbacks.erase(callbacks.begin() + i);
996       break;  // There can be only one.
997     }
998   }
999   if (!callbacks.empty())
1000     return;
1001 
1002   // Last callback for |service_name| has been removed, remove match rule.
1003   const std::string name_owner_changed_match_rule =
1004       base::StringPrintf(kServiceNameOwnerChangeMatchRule,
1005                          service_name.c_str());
1006   ScopedDBusError error;
1007   RemoveMatch(name_owner_changed_match_rule, error.get());
1008   // And remove |service_owner_changed_listener_map_| entry.
1009   service_owner_changed_listener_map_.erase(it);
1010 
1011   if (service_owner_changed_listener_map_.empty())
1012     RemoveFilterFunction(Bus::OnServiceOwnerChangedFilter, this);
1013 }
1014 
GetConnectionName()1015 std::string Bus::GetConnectionName() {
1016   if (!connection_)
1017     return "";
1018   return dbus_bus_get_unique_name(connection_);
1019 }
1020 
OnAddWatch(DBusWatch * raw_watch)1021 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
1022   AssertOnDBusThread();
1023 
1024   // watch will be deleted when raw_watch is removed in OnRemoveWatch().
1025   Watch* watch = new Watch(raw_watch);
1026   if (watch->IsReadyToBeWatched()) {
1027     watch->StartWatching();
1028   }
1029   ++num_pending_watches_;
1030   return true;
1031 }
1032 
OnRemoveWatch(DBusWatch * raw_watch)1033 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
1034   AssertOnDBusThread();
1035 
1036   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
1037   delete watch;
1038   --num_pending_watches_;
1039 }
1040 
OnToggleWatch(DBusWatch * raw_watch)1041 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
1042   AssertOnDBusThread();
1043 
1044   Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
1045   if (watch->IsReadyToBeWatched()) {
1046     watch->StartWatching();
1047   } else {
1048     // It's safe to call this if StartWatching() wasn't called, per
1049     // message_pump_libevent.h.
1050     watch->StopWatching();
1051   }
1052 }
1053 
OnAddTimeout(DBusTimeout * raw_timeout)1054 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
1055   AssertOnDBusThread();
1056 
1057   // timeout will be deleted when raw_timeout is removed in
1058   // OnRemoveTimeoutThunk().
1059   Timeout* timeout = new Timeout(raw_timeout);
1060   if (timeout->IsReadyToBeMonitored()) {
1061     timeout->StartMonitoring(this);
1062   }
1063   ++num_pending_timeouts_;
1064   return true;
1065 }
1066 
OnRemoveTimeout(DBusTimeout * raw_timeout)1067 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
1068   AssertOnDBusThread();
1069 
1070   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
1071   timeout->Complete();
1072   --num_pending_timeouts_;
1073 }
1074 
OnToggleTimeout(DBusTimeout * raw_timeout)1075 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
1076   AssertOnDBusThread();
1077 
1078   Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
1079   if (timeout->IsReadyToBeMonitored()) {
1080     timeout->StartMonitoring(this);
1081   } else {
1082     timeout->StopMonitoring();
1083   }
1084 }
1085 
OnDispatchStatusChanged(DBusConnection * connection,DBusDispatchStatus)1086 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
1087                                   DBusDispatchStatus /*status*/) {
1088   DCHECK_EQ(connection, connection_);
1089   AssertOnDBusThread();
1090 
1091   // We cannot call ProcessAllIncomingDataIfAny() here, as calling
1092   // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
1093   // prohibited by the D-Bus library. Hence, we post a task here instead.
1094   // See comments for dbus_connection_set_dispatch_status_function().
1095   GetDBusTaskRunner()->PostTask(FROM_HERE,
1096                                 base::Bind(&Bus::ProcessAllIncomingDataIfAny,
1097                                            this));
1098 }
1099 
OnServiceOwnerChanged(DBusMessage * message)1100 void Bus::OnServiceOwnerChanged(DBusMessage* message) {
1101   DCHECK(message);
1102   AssertOnDBusThread();
1103 
1104   // |message| will be unrefed on exit of the function. Increment the
1105   // reference so we can use it in Signal::FromRawMessage() below.
1106   dbus_message_ref(message);
1107   std::unique_ptr<Signal> signal(Signal::FromRawMessage(message));
1108 
1109   // Confirm the validity of the NameOwnerChanged signal.
1110   if (signal->GetMember() != kNameOwnerChangedSignal ||
1111       signal->GetInterface() != DBUS_INTERFACE_DBUS ||
1112       signal->GetSender() != DBUS_SERVICE_DBUS) {
1113     return;
1114   }
1115 
1116   MessageReader reader(signal.get());
1117   std::string service_name;
1118   std::string old_owner;
1119   std::string new_owner;
1120   if (!reader.PopString(&service_name) ||
1121       !reader.PopString(&old_owner) ||
1122       !reader.PopString(&new_owner)) {
1123     return;
1124   }
1125 
1126   ServiceOwnerChangedListenerMap::const_iterator it =
1127       service_owner_changed_listener_map_.find(service_name);
1128   if (it == service_owner_changed_listener_map_.end())
1129     return;
1130 
1131   const std::vector<GetServiceOwnerCallback>& callbacks = it->second;
1132   for (size_t i = 0; i < callbacks.size(); ++i) {
1133     GetOriginTaskRunner()->PostTask(FROM_HERE,
1134                                     base::Bind(callbacks[i], new_owner));
1135   }
1136 }
1137 
1138 // static
OnAddWatchThunk(DBusWatch * raw_watch,void * data)1139 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
1140   Bus* self = static_cast<Bus*>(data);
1141   return self->OnAddWatch(raw_watch);
1142 }
1143 
1144 // static
OnRemoveWatchThunk(DBusWatch * raw_watch,void * data)1145 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
1146   Bus* self = static_cast<Bus*>(data);
1147   self->OnRemoveWatch(raw_watch);
1148 }
1149 
1150 // static
OnToggleWatchThunk(DBusWatch * raw_watch,void * data)1151 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
1152   Bus* self = static_cast<Bus*>(data);
1153   self->OnToggleWatch(raw_watch);
1154 }
1155 
1156 // static
OnAddTimeoutThunk(DBusTimeout * raw_timeout,void * data)1157 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1158   Bus* self = static_cast<Bus*>(data);
1159   return self->OnAddTimeout(raw_timeout);
1160 }
1161 
1162 // static
OnRemoveTimeoutThunk(DBusTimeout * raw_timeout,void * data)1163 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1164   Bus* self = static_cast<Bus*>(data);
1165   self->OnRemoveTimeout(raw_timeout);
1166 }
1167 
1168 // static
OnToggleTimeoutThunk(DBusTimeout * raw_timeout,void * data)1169 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
1170   Bus* self = static_cast<Bus*>(data);
1171   self->OnToggleTimeout(raw_timeout);
1172 }
1173 
1174 // static
OnDispatchStatusChangedThunk(DBusConnection * connection,DBusDispatchStatus status,void * data)1175 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
1176                                        DBusDispatchStatus status,
1177                                        void* data) {
1178   Bus* self = static_cast<Bus*>(data);
1179   self->OnDispatchStatusChanged(connection, status);
1180 }
1181 
1182 // static
OnServiceOwnerChangedFilter(DBusConnection *,DBusMessage * message,void * data)1183 DBusHandlerResult Bus::OnServiceOwnerChangedFilter(
1184     DBusConnection* /*connection*/,
1185     DBusMessage* message,
1186     void* data) {
1187   if (dbus_message_is_signal(message,
1188                              DBUS_INTERFACE_DBUS,
1189                              kNameOwnerChangedSignal)) {
1190     Bus* self = static_cast<Bus*>(data);
1191     self->OnServiceOwnerChanged(message);
1192   }
1193   // Always return unhandled to let others, e.g. ObjectProxies, handle the same
1194   // signal.
1195   return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
1196 }
1197 
1198 }  // namespace dbus
1199