1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus
6import time
7
8from autotest_lib.client.bin import test
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.common_lib.cros.tendo import peerd_config
11from autotest_lib.client.cros import dbus_util
12
13
14SERVICE_ID = 'test-service'
15
16class peerd_MonitorsDBusConnections(test.test):
17    """Test that peerd removes services when processes disconnect from DBus."""
18
19    version = 1
20
21
22    def _check_has_test_service(self, expect_service=True):
23        services = dbus_util.get_objects_with_interface(
24                peerd_config.SERVICE_NAME,
25                peerd_config.OBJECT_MANAGER_PATH,
26                peerd_config.DBUS_INTERFACE_SERVICE,
27                path_prefix=peerd_config.DBUS_PATH_SELF,
28                bus=self._bus)
29        found_service = False
30        # services is a map of object path to dicts of DBus interface to
31        # properties exposed by that interface.
32        for path, interfaces in services.iteritems():
33            for interface, properties in interfaces.iteritems():
34                if interface != peerd_config.DBUS_INTERFACE_SERVICE:
35                    continue
36                if (properties[peerd_config.SERVICE_PROPERTY_SERVICE_ID]
37                        != SERVICE_ID):
38                    continue
39                if found_service:
40                    raise error.TestFail('Found multiple test service '
41                                         'instances?')
42                found_service = True
43
44        if expect_service != found_service:
45            raise error.TestFail('Expected to see test service, but did not.')
46
47
48    def run_once(self):
49        self._bus = dbus.SystemBus()
50        config = peerd_config.PeerdConfig(verbosity_level=5)
51        config.restart_with_config()
52        self._check_has_test_service(expect_service=False)
53        self._manager = dbus.Interface(
54                self._bus.get_object(peerd_config.SERVICE_NAME,
55                                     peerd_config.DBUS_PATH_MANAGER),
56                peerd_config.DBUS_INTERFACE_MANAGER)
57        self._manager.ExposeService(SERVICE_ID,
58                                    dbus.Dictionary(signature='ss'),
59                                    dbus.Dictionary(signature='sv'))
60        # Python keeps the DBus connection sitting around unless we
61        # explicitly close it.  The service should still be there.
62        time.sleep(1)  # Peerd might take some time to publish the service.
63        self._check_has_test_service()
64        # Close our previous connection, open a new one.
65        self._bus.close()
66        self._bus = dbus.SystemBus()
67        time.sleep(1)  # Peerd might take some time to remove the service.
68        self._check_has_test_service(expect_service=False)
69