# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client class to access the Floss adapter interface."""

import logging
import uuid as uuid_module

from floss.pandora.floss import floss_enums
from floss.pandora.floss import observer_base
from floss.pandora.floss import utils
from gi.repository import GLib


class BluetoothCallbacks:
    """Callbacks for the Adapter Interface.

    Implement this to observe these callbacks when exporting callbacks via
    register_callback.
    """

    def on_address_changed(self, addr):
        """Adapter address changed.

        Args:
            addr: New address of the adapter.
        """
        pass

    def on_device_found(self, remote_device):
        """Device found via discovery.

        Args:
            remote_device: Remote device found during discovery session.
        """
        pass

    def on_discovering_changed(self, discovering):
        """Discovering state has changed.

        Args:
            discovering: Whether discovery enabled or disabled.
        """
        pass

    def on_ssp_request(self, remote_device, class_of_device, variant, passkey):
        """Simple secure pairing request for agent to reply.

        Args:
            remote_device:
                Remote device that is being paired.
            class_of_device:
                Class of device as described in HCI spec.
            variant:
                SSP variant (0-3). [Confirmation, Entry, Consent, Notification]
            passkey:
                Passkey to display (so user can confirm or type it).
        """
        pass

    def on_pin_request(self, remote_device, cod, min_16_digit):
        """When there is a pin request to display the event to client.

        Args:
            remote_device:
                Remote device that is being paired.
            cod:
                Class of device as described in HCI spec.
            min_16_digit:
                True if the pin is 16 digit, False otherwise.
        """
        pass

    def on_pin_display(self, remote_device, pincode):
        """When there is an auto-generated pin to display the event to client.

        Args:
            remote_device:
                Remote device that is being paired.
            pincode:
                PIN code to display.
        """
        pass

    def on_bond_state_changed(self, status, address, state):
        """Bonding/Pairing state has changed for a device.

        Args:
            status:
                Success (0) or failure reason for bonding.
            address:
                This notification is for this BDADDR.
            state:
                Bonding state. 0 = Not bonded, 1 = Bonding, 2 = Bonded.
        """
        pass

    def on_device_properties_changed(self, remote_device, props):
        """Device properties changed for a remote device.

        Args:
            remote_device:
                Remote device whose properties changed.
            props:
                Remote device properties.
        """
        pass


class BluetoothConnectionCallbacks:
    """Callbacks for the Device Connection interface.

    Implement this to observe these callbacks when exporting callbacks via
    register_connection_callback
    """

    def on_device_connected(self, remote_device):
        """Notification that a device has completed HCI connection.

        Args:
            remote_device: Remote device that completed HCI connection.
        """
        pass

    def on_device_disconnected(self, remote_device):
        """Notification that a device has completed HCI disconnection.

        Args:
            remote_device: Remote device that completed HCI disconnection.
        """
        pass


class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks):
    """Handles method calls to and callbacks from the Adapter interface."""

    ADAPTER_SERVICE = 'org.chromium.bluetooth'
    ADAPTER_INTERFACE = 'org.chromium.bluetooth.Bluetooth'
    ADAPTER_OBJECT_PATTERN = '/org/chromium/bluetooth/hci{}/adapter'
    ADAPTER_CB_INTF = 'org.chromium.bluetooth.BluetoothCallback'
    ADAPTER_CB_OBJ_NAME = 'test_adapter_client'
    ADAPTER_CONN_CB_INTF = 'org.chromium.bluetooth.BluetoothConnectionCallback'
    ADAPTER_CONN_CB_OBJ_NAME = 'test_connection_client'
    QA_INTERFACE = 'org.chromium.bluetooth.BluetoothQA'
    QA_LEGACY_INTERFACE = 'org.chromium.bluetooth.BluetoothQALegacy'

    # Passed as the timeout to utils.poll_for_condition when waiting for a
    # disconnection (presumably seconds -- confirm against utils).
    DISCONNECTION_TIMEOUT = 5

    @staticmethod
    def parse_dbus_device(remote_device_dbus):
        """Parse a dbus variant dict as a remote device.

        Args:
            remote_device_dbus: Variant dict with signature a{sv}.

        Returns:
            (True, (address, name)) when both keys are present, otherwise
            (False, None).
        """
        if 'address' in remote_device_dbus and 'name' in remote_device_dbus:
            return True, (str(remote_device_dbus['address']), str(remote_device_dbus['name']))

        return False, None

    # NOTE(review): the docstring of the class below is XML interface data; it
    # is presumably consumed by the D-Bus library when the object is registered
    # (see register_callbacks), so it must stay valid XML with no extra text.
    class ExportedAdapterCallbacks(observer_base.ObserverBase):
        """
        <node>
            <interface name="org.chromium.bluetooth.BluetoothCallback">
                <method name="OnAddressChanged">
                    <arg type="s" name="addr" direction="in" />
                </method>
                <method name="OnDeviceFound">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                </method>
                <method name="OnDiscoveringChanged">
                    <arg type="b" name="discovering" direction="in" />
                </method>
                <method name="OnSspRequest">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                    <arg type="u" name="class_of_device" direction="in" />
                    <arg type="u" name="variant" direction="in" />
                    <arg type="u" name="passkey" direction="in" />
                </method>
                <method name="OnPinRequest">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                    <arg type="u" name="cod" direction="in" />
                    <arg type="b" name="min_16_digit" direction="in" />
                </method>
                <method name="OnPinDisplay">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                    <arg type="s" name="pincode" direction="in" />
                </method>
                <method name="OnBondStateChanged">
                    <arg type="u" name="status" direction="in" />
                    <arg type="s" name="address" direction="in" />
                    <arg type="u" name="state" direction="in" />
                </method>
                <method name="OnDevicePropertiesChanged">
                    <arg type="a{sv}" name="remote_device" direction="in" />
                    <arg type="au" name="props" direction="in" />
                </method>
            </interface>
        </node>
        """

        def __init__(self):
            """Construct exported callbacks object."""
            observer_base.ObserverBase.__init__(self)

        def OnAddressChanged(self, addr):
            """Handle address changed callbacks."""
            for observer in self.observers.values():
                observer.on_address_changed(addr)

        def OnDeviceFound(self, remote_device_dbus):
            """Handle device found from discovery."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(remote_device_dbus)
            if not parsed:
                logging.debug('OnDeviceFound parse error: %s', remote_device_dbus)
                return

            for observer in self.observers.values():
                observer.on_device_found(remote_device)

        def OnDiscoveringChanged(self, discovering):
            """Handle discovering state changed."""
            for observer in self.observers.values():
                observer.on_discovering_changed(bool(discovering))

        def OnSspRequest(self, remote_device_dbus, class_of_device, variant, passkey):
            """Handle pairing/bonding request to agent."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(remote_device_dbus)
            if not parsed:
                logging.error('OnSspRequest parse error: %s', remote_device_dbus)
                return

            for observer in self.observers.values():
                observer.on_ssp_request(remote_device, class_of_device, variant, passkey)

        def OnPinRequest(self, remote_device_dbus, cod, min_16_digit):
            """Handle PIN request callback."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(remote_device_dbus)
            if not parsed:
                logging.error('OnPinRequest parse error: %s', remote_device_dbus)
                return

            for observer in self.observers.values():
                observer.on_pin_request(remote_device, cod, min_16_digit)

        def OnPinDisplay(self, remote_device_dbus, pincode):
            """Handle PIN display callback."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(remote_device_dbus)
            if not parsed:
                logging.error('OnPinDisplay parse error: %s', remote_device_dbus)
                return

            for observer in self.observers.values():
                observer.on_pin_display(remote_device, pincode)

        def OnBondStateChanged(self, status, address, state):
            """Handle bond state changed callbacks."""
            for observer in self.observers.values():
                observer.on_bond_state_changed(status, address, state)

        def OnDevicePropertiesChanged(self, remote_device, props):
            """Handle device properties changed callbacks.

            Unlike the other handlers, remote_device is forwarded unparsed.
            """
            for observer in self.observers.values():
                observer.on_device_properties_changed(remote_device, props)

    # NOTE(review): as above, the docstring below is XML interface data and
    # must not gain any non-XML text.
    class ExportedConnectionCallbacks(observer_base.ObserverBase):
        """
        <node>
            <interface name="org.chromium.bluetooth.BluetoothConnectionCallback">
                <method name="OnDeviceConnected">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                </method>
                <method name="OnDeviceDisconnected">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                </method>
            </interface>
        </node>
        """

        def __init__(self, bus, object_path):
            """Construct exported connection callbacks object.

            Args:
                bus: Accepted for interface compatibility; not used here.
                object_path: Accepted for interface compatibility; not used here.
            """
            observer_base.ObserverBase.__init__(self)

        def OnDeviceConnected(self, remote_device_dbus):
            """Handle device connected."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(remote_device_dbus)
            if not parsed:
                logging.debug('OnDeviceConnected parse error: %s', remote_device_dbus)
                return

            for observer in self.observers.values():
                observer.on_device_connected(remote_device)

        def OnDeviceDisconnected(self, remote_device_dbus):
            """Handle device disconnected."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(remote_device_dbus)
            if not parsed:
                logging.debug('OnDeviceDisconnected parse error: %s', remote_device_dbus)
                return

            for observer in self.observers.values():
                observer.on_device_disconnected(remote_device)

    def __init__(self, bus, hci):
        """Construct the client.

        Args:
            bus:
                DBus bus over which we'll establish connections.
            hci:
                HCI adapter index. Get this value from `get_default_adapter`
                on FlossManagerClient.
        """
        self.bus = bus
        self.hci = hci
        self.objpath = self.ADAPTER_OBJECT_PATTERN.format(hci)

        # We don't register callbacks by default.
        self.callbacks = None
        self.connection_callbacks = None

        # Locally cached values
        self.known_devices = {}
        self.discovering = False

        # Initialize properties when registering callbacks (we know proxy is
        # valid at this point).
        self.properties = None
        self.remote_properties = None

    def __del__(self):
        """Destructor."""
        del self.callbacks
        del self.connection_callbacks

    def _make_device(self, address, name, bond_state=None, connected=None):
        """Make a device dict for the local known-devices cache.

        A value of None for bond_state/connected means "not yet known".
        """
        return {
            'address': address,
            'name': name,
            'bond_state': bond_state,
            'connected': connected,
        }

    @utils.glib_callback()
    def on_device_found(self, remote_device):
        """Remote device was found as part of discovery.

        Args:
            remote_device: (address, name) tuple of the found device.
        """
        address, name = remote_device

        # Cache a new device
        if address not in self.known_devices:
            self.known_devices[address] = self._make_device(address, name)
        # Update name if previous cached value didn't have a name (e.g. the
        # entry was created by on_bond_state_changed with an empty name).
        elif not self.known_devices[address]['name']:
            self.known_devices[address]['name'] = name

    @utils.glib_callback()
    def on_discovering_changed(self, discovering):
        """Discovering state has changed."""
        # Ignore a no-op
        if self.discovering == discovering:
            return

        # Cache the value
        self.discovering = discovering

        # If we are freshly starting discovery, clear all locally cached known
        # devices (that are not bonded or connected)
        if discovering:
            # Filter known devices to currently bonded or connected devices
            self.known_devices = {
                key: value
                for key, value in self.known_devices.items()
                if value.get('bond_state', 0) or value.get('connected', False)
            }

    @utils.glib_callback()
    def on_bond_state_changed(self, status, address, state):
        """Bond state has changed."""
        # You can bond unknown devices if it was previously bonded
        if address not in self.known_devices:
            self.known_devices[address] = self._make_device(address, '', bond_state=state)
        else:
            self.known_devices[address]['bond_state'] = state

    @utils.glib_callback()
    def on_device_connected(self, remote_device):
        """Remote device connected hci."""
        address, name = remote_device
        if address not in self.known_devices:
            self.known_devices[address] = self._make_device(address, name, connected=True)
        else:
            self.known_devices[address]['connected'] = True

    @utils.glib_callback()
    def on_device_disconnected(self, remote_device):
        """Remote device disconnected hci."""
        address, name = remote_device
        if address not in self.known_devices:
            self.known_devices[address] = self._make_device(address, name, connected=False)
        else:
            self.known_devices[address]['connected'] = False

    @utils.glib_callback()
    def on_device_properties_changed(self, remote_device, props):
        """Device properties changed for a remote device.

        Nothing is cached for this event; the override is a no-op.

        Args:
            remote_device:
                Remote device whose properties changed.
            props:
                Remote device properties.
        """
        pass

    def _make_dbus_device(self, address, name):
        """Build the a{sv}-style dict describing a remote device."""
        return {'address': GLib.Variant('s', address), 'name': GLib.Variant('s', name)}

    def _make_known_dbus_device(self, address, default_name):
        """Build a dbus device for address, using the cached name if known.

        Args:
            address: Address of the remote device.
            default_name: Name to use when the address is not in the cache.
        """
        name = self.known_devices.get(address, {}).get('name', default_name)
        return self._make_dbus_device(address, name)

    @utils.glib_call(False)
    def has_proxy(self):
        """Checks whether adapter proxy can be acquired."""
        return bool(self.proxy())

    def proxy(self):
        """Gets proxy object to adapter interface for method calls."""
        return self.bus.get(self.ADAPTER_SERVICE, self.objpath)[self.ADAPTER_INTERFACE]

    def qa_proxy(self):
        """Gets proxy object to QA interface for method calls."""
        return self.bus.get(self.ADAPTER_SERVICE, self.objpath)[self.QA_INTERFACE]

    # TODO(b/227405934): Not sure we want GetRemoteRssi on adapter api since
    #                    it's unlikely to be accurate over time. Use a mock for
    #                    testing for now.
    def get_mock_remote_rssi(self, device):
        """Gets mock value for remote device rssi (always -50)."""
        return -50

    def register_properties(self):
        """Registers the adapter and remote-device property sets.

        Each entry maps a property name to a (getter, setter) pair; a setter
        of None marks a property this client does not write.
        """
        # One proxy lookup serves every bound method below.
        proxy = self.proxy()

        self.properties = utils.PropertySet({
            'Address': (proxy.GetAddress, None),
            'Name': (proxy.GetName, proxy.SetName),
            'Alias': (self._get_alias, None),
            'Modalias': (self._get_modalias, None),
            'Class': (proxy.GetBluetoothClass, proxy.SetBluetoothClass),
            'Uuids': (self._get_uuids, None),
            'Discoverable': (proxy.GetDiscoverable, proxy.SetDiscoverable),
            'DiscoverableTimeout': (proxy.GetDiscoverableTimeout, None),
            'IsMultiAdvertisementSupported': (proxy.IsMultiAdvertisementSupported, None),
            'IsLeExtendedAdvertisingSupported': (proxy.IsLeExtendedAdvertisingSupported, None),
        })

        self.remote_properties = utils.PropertySet({
            'Name': (proxy.GetRemoteName, None),
            'Type': (proxy.GetRemoteType, None),
            'Alias': (proxy.GetRemoteAlias, None),
            'Class': (proxy.GetRemoteClass, None),
            'WakeAllowed': (proxy.GetRemoteWakeAllowed, None),
            'Uuids': (proxy.GetRemoteUuids, None),
            'RSSI': (self.get_mock_remote_rssi, None),
        })

    def _get_alias(self):
        """Gets the adapter's alias name via the BluetoothQA interface.

        NOTE(review): no fallback to QA_LEGACY_INTERFACE is implemented here.

        Returns:
            Alias name of the adapter.
        """
        return self.qa_proxy().GetAlias()

    def _get_modalias(self):
        """Gets the adapter modalias name via the BluetoothQA interface.

        NOTE(review): no fallback to QA_LEGACY_INTERFACE is implemented here.

        Returns:
            Modalias name of the adapter.
        """
        return self.qa_proxy().GetModalias()

    def _get_uuids(self):
        """Gets the UUIDs from the D-Bus.

        If D-Bus returns UUID as list of integers, converts the value to UUID
        string.

        Returns:
            List of UUIDs in string representation; an empty list when the
            daemon returns no UUIDs or a non-subscriptable value.
        """
        uuids = self.proxy().GetUuids()

        # uuids should be subscriptable (TypeError otherwise) and may be empty
        # (IndexError); both cases mean there is nothing to report.
        try:
            first_uuid = uuids[0]
        except (TypeError, IndexError):
            return []

        if isinstance(first_uuid, str):
            return uuids

        # Each UUID is a sequence of byte values; render it as canonical text.
        uuid_list = []
        for uuid in uuids:
            uuid_hex = ''.join('{:02x}'.format(m) for m in uuid)
            uuid_list.append(str(uuid_module.UUID(uuid_hex)))
        return uuid_list

    @utils.glib_call(False)
    def register_callbacks(self):
        """Registers callbacks for this client.

        This will also initialize properties and populate the list of bonded
        devices since this should be the first thing that gets called after we
        know that the adapter client has a valid proxy object.

        Returns:
            True.
        """
        # Make sure properties are registered
        if not self.properties:
            self.register_properties()

        # Prevent callback registration multiple times
        if self.callbacks and self.connection_callbacks:
            return True

        # Reset known devices
        self.known_devices.clear()

        if not self.callbacks:
            # Create and publish callbacks
            self.callbacks = self.ExportedAdapterCallbacks()
            self.callbacks.add_observer('adapter_client', self)
            objpath = utils.generate_dbus_cb_objpath(self.ADAPTER_CB_OBJ_NAME, self.hci)
            self.bus.register_object(objpath, self.callbacks, None)

            # Register published callback with adapter daemon
            self.proxy().RegisterCallback(objpath)

        if not self.connection_callbacks:
            # Compute the connection callback path first so it is always
            # defined, even when the adapter callbacks already existed.
            conn_objpath = utils.generate_dbus_cb_objpath(self.ADAPTER_CONN_CB_OBJ_NAME, self.hci)
            self.connection_callbacks = self.ExportedConnectionCallbacks(self.bus, conn_objpath)
            self.connection_callbacks.add_observer('adapter_client', self)
            self.bus.register_object(conn_objpath, self.connection_callbacks, None)

            self.proxy().RegisterConnectionCallback(conn_objpath)

        # Add bonded devices as known devices and set their initial connection
        # state
        bonded_devices = self.proxy().GetBondedDevices()
        for device in bonded_devices:
            (success, devtuple) = FlossAdapterClient.parse_dbus_device(device)
            if success:
                (address, name) = devtuple
                dev = self.known_devices.get(address,
                                             self._make_device(address, name, bond_state=floss_enums.BondState.BONDED))
                if dev['bond_state'] is None:
                    dev['bond_state'] = floss_enums.BondState.BONDED
                    logging.info('[%s:%s] initially bonded.', address, name)

                if dev['connected'] is None:
                    cstate = self.proxy().GetConnectionState(self._make_dbus_device(address, name))
                    dev['connected'] = bool(cstate > 0)
                    logging.info('[%s:%s] initially connection state: %d.', address, name, cstate)

                self.known_devices[address] = dev

        return True

    def register_callback_observer(self, name, observer):
        """Add an observer for all callbacks.

        Callbacks must already be registered (see register_callbacks).

        Args:
            name:
                Name of the observer.
            observer:
                Observer that implements all callback classes.
        """
        if isinstance(observer, BluetoothCallbacks):
            self.callbacks.add_observer(name, observer)

        if isinstance(observer, BluetoothConnectionCallbacks):
            self.connection_callbacks.add_observer(name, observer)

    def unregister_callback_observer(self, name, observer):
        """Remove an observer for all callbacks.

        Args:
            name:
                Name of the observer.
            observer:
                Observer that implements all callback classes.
        """
        if isinstance(observer, BluetoothCallbacks):
            self.callbacks.remove_observer(name, observer)

        if isinstance(observer, BluetoothConnectionCallbacks):
            self.connection_callbacks.remove_observer(name, observer)

    @utils.glib_call('')
    def get_address(self):
        """Gets the adapter's current address."""
        return str(self.proxy().GetAddress())

    @utils.glib_call('')
    def get_name(self):
        """Gets the adapter's name."""
        return str(self.proxy().GetName())

    @utils.glib_call(None)
    def get_property(self, prop_name):
        """Gets property by name."""
        return self.properties.get(prop_name)

    def get_properties(self):
        """Gets all adapter properties.

        Returns:
            A dict of adapter's property names and properties.
        """
        return {p: self.get_property(p) for p in self.properties.get_property_names()}

    def get_discoverable_timeout(self):
        """Gets the adapter's discoverable timeout."""
        return self.proxy().GetDiscoverableTimeout()

    @utils.glib_call(None)
    def get_remote_property(self, address, prop_name):
        """Gets remote device property by name."""
        remote_device = self._make_known_dbus_device(address, 'Test device')
        return self.remote_properties.get(prop_name, remote_device)

    @utils.glib_call(None)
    def set_property(self, prop_name, *args):
        """Sets property by name."""
        return self.properties.set(prop_name, *args)

    @utils.glib_call(None)
    def set_remote_property(self, address, prop_name, *args):
        """Sets remote property by name.

        The write goes through the remote-device property set, mirroring
        get_remote_property.
        """
        remote_device = self._make_known_dbus_device(address, 'Test device')
        return self.remote_properties.set(prop_name, remote_device, *args)

    @utils.glib_call(None)
    def is_le_extended_advertising_supported(self):
        """Is LE extended advertising supported?

        Returns:
            True if supported, False otherwise; presumably None on DBus error
            (the value given to glib_call) -- confirm against utils.
        """
        return bool(self.proxy().IsLeExtendedAdvertisingSupported())

    @utils.glib_call(None)
    def is_multi_advertisement_supported(self):
        """Checks if multiple advertisements are supported.

        Returns:
            True if supported, False otherwise; presumably None on DBus error
            (the value given to glib_call) -- confirm against utils.
        """
        return bool(self.proxy().IsMultiAdvertisementSupported())

    @utils.glib_call(False)
    def start_discovery(self):
        """Starts discovery session.

        Returns:
            True on success, False on failure (presumably also on DBus error,
            given glib_call(False) -- confirm against utils).
        """
        return bool(self.proxy().StartDiscovery())

    @utils.glib_call(False)
    def stop_discovery(self):
        """Stops discovery session.

        Returns:
            True on success, False on failure (presumably also on DBus error,
            given glib_call(False) -- confirm against utils).
        """
        return bool(self.proxy().CancelDiscovery())

    @utils.glib_call(False)
    def is_wbs_supported(self):
        """Is WBS supported?

        Returns:
            True if supported, False otherwise (presumably also on DBus error,
            given glib_call(False) -- confirm against utils).
        """
        return bool(self.proxy().IsWbsSupported())

    @utils.glib_call(False)
    def is_discovering(self):
        """Is adapter discovering? Uses the locally cached state."""
        return bool(self.discovering)

    @utils.glib_call(False)
    def has_device(self, address):
        """Checks to see if device with address is known."""
        return address in self.known_devices

    def is_bonded(self, address):
        """Checks if the given address is currently fully bonded."""
        device = self.known_devices.get(address)
        if device is None:
            return False
        return device.get('bond_state', floss_enums.BondState.NOT_BONDED) == floss_enums.BondState.BONDED

    @utils.glib_call(False)
    def create_bond(self, address, transport):
        """Creates bond with target address.

        Args:
            address: Device to bond with.
            transport: Transport to bond over; converted with int().

        Returns:
            Result of |CreateBond| as a bool.
        """
        remote_device = self._make_known_dbus_device(address, 'Test bond')
        return bool(self.proxy().CreateBond(remote_device, int(transport)))

    @utils.glib_call(False)
    def cancel_bond(self, address):
        """Call cancel bond with no additional checks. Prefer |forget_device|.

        Args:
            address: Device to cancel bond.

        Returns:
            Result of |CancelBondProcess|.
        """
        remote_device = self._make_known_dbus_device(address, 'Test bond')
        # Same daemon method that forget_device uses for a bonding device.
        return bool(self.proxy().CancelBondProcess(remote_device))

    @utils.glib_call(False)
    def remove_bond(self, address):
        """Call remove bond with no additional checks. Prefer |forget_device|.

        Args:
            address: Device to remove bond.

        Returns:
            Result of |RemoveBond|.
        """
        remote_device = self._make_known_dbus_device(address, 'Test bond')
        return bool(self.proxy().RemoveBond(remote_device))

    @utils.glib_call(None)
    def get_bond_state(self, address):
        """Gets remote device bond state.

        Args:
            address: Device to get bond status.

        Returns:
            The daemon's |GetBondState| result converted to bool, so any
            non-zero state yields True; presumably None on DBus error (the
            value given to glib_call) -- confirm against utils.
        """
        remote_device = self._make_known_dbus_device(address, 'Test bond')
        return bool(self.proxy().GetBondState(remote_device))

    @utils.glib_call(None)
    def fetch_remote_uuids(self, address):
        """Asks the daemon to fetch the remote device's service uuids.

        Args:
            address: Device whose service uuids should be fetched.

        Returns:
            Result of |FetchRemoteUuids|, unmodified.
        """
        remote_device = self._make_known_dbus_device(address, 'Test bond')
        return self.proxy().FetchRemoteUuids(remote_device)

    @utils.glib_call(None)
    def get_bonded_devices(self):
        """Get all bonded devices.

        Returns:
            Result of |GetBondedDevices|, unmodified. register_callbacks
            parses each entry as a dbus device dict with 'address' and 'name'.
        """
        return self.proxy().GetBondedDevices()

    @utils.glib_call(False)
    def forget_device(self, address):
        """Forgets device from local cache and removes bonding.

        If a device is currently bonding or bonded, it will cancel or remove the
        bond to totally remove this device.

        Args:
            address: Device address to forget.

        Returns:
            True if device was known and was removed.
            False if device was unknown or removal failed.
        """
        if address not in self.known_devices:
            return False

        # Remove the device from known devices first
        device = self.known_devices.pop(address)

        remote_device = self._make_dbus_device(device['address'], device['name'])

        # Extra actions if bond state is not NOT_BONDED
        if device['bond_state'] == floss_enums.BondState.BONDING:
            return bool(self.proxy().CancelBondProcess(remote_device))
        elif device['bond_state'] == floss_enums.BondState.BONDED:
            return bool(self.proxy().RemoveBond(remote_device))

        return True

    @utils.glib_call(False)
    def set_pin(self, address, accept, pin_code):
        """Set pin on bonding device.

        Args:
            address: Device address to reply.
            accept: True to accept the pin request, False to reject the pin request.
            pin_code: PIN code to reply. The PIN code is a list of up to 16
                      integers.

        Returns:
            Result of |SetPin| as a bool; False if the device is unknown.
        """
        if address not in self.known_devices:
            logging.debug('[%s] Unknown device in set_pin', address)
            return False

        device = self.known_devices[address]
        remote_device = self._make_dbus_device(address, device['name'])

        return bool(self.proxy().SetPin(remote_device, accept, pin_code))

    @utils.glib_call(False)
    def set_pairing_confirmation(self, address, accept):
        """Confirm that a pairing should be completed on a bonding device.

        Returns:
            Result of |SetPairingConfirmation| as a bool; False if the device
            is unknown.
        """
        # Device should be known or already `Bonding`
        if address not in self.known_devices:
            logging.debug('[%s] Unknown device in set_pairing_confirmation', address)
            return False

        device = self.known_devices[address]
        remote_device = self._make_dbus_device(address, device['name'])

        return bool(self.proxy().SetPairingConfirmation(remote_device, accept))

    def get_connected_devices_count(self):
        """Gets the number of known, connected devices."""
        return sum(1 for x in self.known_devices.values() if x.get('connected', False))

    def is_connected(self, address):
        """Checks whether a device is connected (per the local cache)."""
        # A cached value of None means "unknown" and is reported as False.
        return bool(self.known_devices.get(address, {}).get('connected', False))

    @utils.glib_call(False)
    def connect_all_enabled_profiles(self, address):
        """Connect all enabled profiles for target address."""
        device = self._make_known_dbus_device(address, 'Test device')
        return bool(self.proxy().ConnectAllEnabledProfiles(device))

    @utils.glib_call(False)
    def disconnect_all_enabled_profiles(self, address):
        """Disconnect all enabled profiles for target address."""
        device = self._make_known_dbus_device(address, 'Test device')
        return bool(self.proxy().DisconnectAllEnabledProfiles(device))

    @utils.glib_call(None)
    def get_connection_state(self, address):
        """Gets connection state as reported by |GetConnectionState|."""
        device = self._make_known_dbus_device(address, 'Test device')
        return self.proxy().GetConnectionState(device)

    def wait_for_device_disconnected(self, address):
        """Waits for the device to become disconnected.

        Returns:
            True once the cache shows the device as not connected, False if
            polling raised TimeoutError first.
        """

        def device_disconnected():
            # An address missing from the cache counts as still connected.
            return not self.known_devices.get(address, {}).get('connected', True)

        try:
            utils.poll_for_condition(condition=device_disconnected, timeout=self.DISCONNECTION_TIMEOUT)
            return True
        except TimeoutError:
            logging.error('on_device_disconnected not called')
            return False

    def disconnect_device(self, address):
        """Disconnect a specific address and wait for the disconnection."""
        return self.disconnect_all_enabled_profiles(address) and self.wait_for_device_disconnected(address)