# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client class to access the Floss manager interface."""

from floss.pandora.floss import observer_base
from floss.pandora.floss import utils


class ManagerCallbacks:
    """Callbacks for the Manager Interface.

    Implement this to observe these callbacks when exporting callbacks via
    register_callback.
    """

    def on_hci_device_changed(self, hci, present):
        """Hci device presence is updated.

        Args:
            hci: Hci interface number.
            present: Whether this hci interface is appearing or disappearing.
        """
        pass

    def on_hci_enabled_changed(self, hci, enabled):
        """Hci device is being enabled or disabled.

        Args:
            hci: Hci interface number.
            enabled: Whether this hci interface is being enabled or disabled.
        """
        pass


class FlossManagerClient(ManagerCallbacks):
    """Handles method calls to and callbacks from the Manager interface.

    The client keeps a cache, `self.adapters`, mapping hci interface number to
    its last known enabled state. The cache is updated by the manager
    callbacks and rebuilt from scratch by get_available_adapters.
    """

    MGR_SERVICE = 'org.chromium.bluetooth.Manager'
    MGR_INTERFACE = 'org.chromium.bluetooth.Manager'
    MGR_OBJECT = '/org/chromium/bluetooth/Manager'

    # Exported callback interface and objects
    CB_EXPORTED_INTF = 'org.chromium.bluetooth.ManagerCallback'
    CB_EXPORTED_OBJ_NAME = 'test_manager_client'

    class AdaptersNotParseable(Exception):
        """An entry in the result of GetAvailableAdapters was not parseable."""
        pass

    # NOTE(review): the docstring below is presumably consumed at runtime as
    # the D-Bus introspection XML for the exported object (register_object is
    # called with node_info=None) -- keep it verbatim and confirm before
    # editing it.
    class ExportedManagerCallbacks(observer_base.ObserverBase):
        """
        <node>
            <interface name="org.chromium.bluetooth.ManagerCallback">
                <method name="OnHciDeviceChanged">
                    <arg type="i" name="hci" direction="in" />
                    <arg type="b" name="present" direction="in" />
                </method>
                <method name="OnHciEnabledChanged">
                    <arg type="i" name="hci" direction="in" />
                    <arg type="b" name="enabled" direction="in" />
                </method>
            </interface>
        </node>
        """

        def __init__(self):
            """Construct exported callbacks object."""
            observer_base.ObserverBase.__init__(self)

        def OnHciDeviceChanged(self, hci, present):
            """Forward a device presence callback to every observer."""
            for observer in self.observers.values():
                observer.on_hci_device_changed(hci, present)

        def OnHciEnabledChanged(self, hci, enabled):
            """Forward a device enabled callback to every observer."""
            for observer in self.observers.values():
                observer.on_hci_enabled_changed(hci, enabled)

    def __init__(self, bus):
        """Construct the client.

        Args:
            bus: DBus bus over which we'll establish connections.
        """
        self.bus = bus

        # We don't register callbacks by default. The client owner must call
        # register_callbacks to do so.
        self.callbacks = None

        # Initialize hci devices and their power states
        self.adapters = {}

    def __del__(self):
        """Destructor: drop the reference to the exported callbacks object."""
        # __del__ also runs for instances whose __init__ never completed
        # (e.g. a subclass constructor that raised early), in which case the
        # attribute does not exist and an unconditional `del` would raise.
        if hasattr(self, 'callbacks'):
            del self.callbacks

    # NOTE(review): the argument to glib_call looks like the value returned
    # when the wrapped call fails -- confirm against utils.glib_call.
    @utils.glib_call(False)
    def has_proxy(self):
        """Checks whether manager proxy can be acquired."""
        return bool(self.proxy())

    def proxy(self):
        """Gets proxy object to manager interface for method calls."""
        return self.bus.get(self.MGR_SERVICE, self.MGR_OBJECT)[self.MGR_INTERFACE]

    @utils.glib_call(False)
    def register_callbacks(self):
        """Registers manager callbacks for this client if one doesn't already exist.

        Returns:
            True if callbacks were already registered or have just been
            registered successfully.
        """
        # Callbacks already registered
        if self.callbacks:
            return True

        # Create and publish callbacks. Keep the object in a local until the
        # whole sequence has succeeded: caching it on self up front would make
        # a later retry after a failed registration silently report success.
        callbacks = self.ExportedManagerCallbacks()
        callbacks.add_observer('manager_client', self)
        objpath = utils.generate_dbus_cb_objpath(self.CB_EXPORTED_OBJ_NAME)
        self.bus.register_object(objpath, callbacks, None)

        # Register published callbacks with manager daemon
        self.proxy().RegisterCallback(objpath)

        self.callbacks = callbacks
        return True

    @utils.glib_callback()
    def on_hci_device_changed(self, hci, present):
        """Handle device presence change.

        Args:
            hci: Hci interface number.
            present: Whether this hci interface is appearing or disappearing.
        """
        if present:
            # Keep the known enabled state if we already track this adapter;
            # otherwise start tracking it as disabled.
            self.adapters.setdefault(hci, False)
        else:
            self.adapters.pop(hci, None)

    @utils.glib_callback()
    def on_hci_enabled_changed(self, hci, enabled):
        """Handle device enabled change.

        Args:
            hci: Hci interface number.
            enabled: Whether this hci interface is being enabled or disabled.
        """
        self.adapters[hci] = enabled

    def get_default_adapter(self):
        """Get the default adapter in use by the manager."""
        # TODO(abps): The default adapter is hci0 until we support multiple
        # adapters.
        return 0

    def has_default_adapter(self):
        """Checks whether the default adapter exists on this system.

        This only consults the cached `self.adapters`; it does not query the
        manager daemon.
        """
        return self.get_default_adapter() in self.adapters

    @utils.glib_call()
    def start(self, hci):
        """Start a specific adapter.

        Args:
            hci: Hci interface number.
        """
        self.proxy().Start(hci)

    @utils.glib_call()
    def stop(self, hci):
        """Stop a specific adapter.

        Args:
            hci: Hci interface number.
        """
        self.proxy().Stop(hci)

    @utils.glib_call(False)
    def get_adapter_enabled(self, hci):
        """Checks whether a specific adapter is enabled (i.e. started).

        Args:
            hci: Hci interface number.
        """
        return bool(self.proxy().GetAdapterEnabled(hci))

    @utils.glib_call(False)
    def get_floss_enabled(self):
        """Gets whether Floss is enabled."""
        return bool(self.proxy().GetFlossEnabled())

    @utils.glib_call()
    def set_floss_enabled(self, enabled):
        """Sets whether Floss is enabled.

        Args:
            enabled: Value passed to the manager's SetFlossEnabled method.
        """
        self.proxy().SetFlossEnabled(enabled)

    @utils.glib_call([])
    def get_available_adapters(self):
        """Gets a list of currently available adapters and if they are enabled.

        Returns:
            List of (hci, enabled) tuples, one per adapter reported by the
            manager.

        Raises:
            FlossManagerClient.AdaptersNotParseable: An entry is missing the
                'hci_interface' or 'enabled' key.
        """
        all_adapters = []
        dbus_result = self.proxy().GetAvailableAdapters()

        for entry in dbus_result:
            if 'hci_interface' not in entry or 'enabled' not in entry:
                raise FlossManagerClient.AdaptersNotParseable(f'Could not parse: {entry}')
            all_adapters.append((int(entry['hci_interface']), bool(entry['enabled'])))

        # This function call overwrites any existing cached values of
        # self.adapters that we may have gotten from observers. The cache is
        # only replaced once every entry has parsed successfully.
        self.adapters = dict(all_adapters)

        return all_adapters