1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import dbus 6import time 7 8from autotest_lib.client.bin import test 9from autotest_lib.client.common_lib import error 10from autotest_lib.client.common_lib.cros.tendo import peerd_config 11from autotest_lib.client.cros import dbus_util 12 13 14SERVICE_ID = 'test-service' 15 16class peerd_MonitorsDBusConnections(test.test): 17 """Test that peerd removes services when processes disconnect from DBus.""" 18 19 version = 1 20 21 22 def _check_has_test_service(self, expect_service=True): 23 services = dbus_util.get_objects_with_interface( 24 peerd_config.SERVICE_NAME, 25 peerd_config.OBJECT_MANAGER_PATH, 26 peerd_config.DBUS_INTERFACE_SERVICE, 27 path_prefix=peerd_config.DBUS_PATH_SELF, 28 bus=self._bus) 29 found_service = False 30 # services is a map of object path to dicts of DBus interface to 31 # properties exposed by that interface. 32 for path, interfaces in services.iteritems(): 33 for interface, properties in interfaces.iteritems(): 34 if interface != peerd_config.DBUS_INTERFACE_SERVICE: 35 continue 36 if (properties[peerd_config.SERVICE_PROPERTY_SERVICE_ID] 37 != SERVICE_ID): 38 continue 39 if found_service: 40 raise error.TestFail('Found multiple test service ' 41 'instances?') 42 found_service = True 43 44 if expect_service != found_service: 45 raise error.TestFail('Expected to see test service, but did not.') 46 47 48 def run_once(self): 49 self._bus = dbus.SystemBus() 50 config = peerd_config.PeerdConfig(verbosity_level=5) 51 config.restart_with_config() 52 self._check_has_test_service(expect_service=False) 53 self._manager = dbus.Interface( 54 self._bus.get_object(peerd_config.SERVICE_NAME, 55 peerd_config.DBUS_PATH_MANAGER), 56 peerd_config.DBUS_INTERFACE_MANAGER) 57 self._manager.ExposeService(SERVICE_ID, 58 dbus.Dictionary(signature='ss'), 59 dbus.Dictionary(signature='sv')) 60 # Python keeps the DBus connection sitting around unless we 61 # explicitly close it. The service should still be there. 62 time.sleep(1) # Peerd might take some time to publish the service. 63 self._check_has_test_service() 64 # Close our previous connection, open a new one. 65 self._bus.close() 66 self._bus = dbus.SystemBus() 67 time.sleep(1) # Peerd might take some time to remove the service. 68 self._check_has_test_service(expect_service=False) 69