# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Client class to access the Floss scanner interface."""
import copy
import logging
import uuid as uuid_module

from floss.pandora.floss import floss_enums
from floss.pandora.floss import observer_base
from floss.pandora.floss import utils
from gi.repository import GLib


class BluetoothScannerCallbacks:
    """Callbacks for the scanner interface.

    Implement this to observe these callbacks when exporting callbacks via
    register_callback. Every method here is a no-op; subclasses override the
    ones they care about.
    """

    def on_scanner_registered(self, uuid, scanner_id, status):
        """Called when scanner registered.

        Args:
            uuid: The specific uuid to register it.
            scanner_id: Scanner id of scanning set.
            status: floss_enums.GattStatus.
        """
        pass

    def on_scan_result(self, scan_result):
        """Called with a scan result (e.g. after start_scan() was executed).

        Args:
            scan_result: The struct of ScanResult.
        """
        pass

    def on_advertisement_found(self, scanner_id, scan_result):
        """Called when advertisement found.

        Args:
            scanner_id: The scanner ID for scanner.
            scan_result: The struct of ScanResult.
        """
        pass

    def on_advertisement_lost(self, scanner_id, scan_result):
        """Called when advertisement lost.

        Args:
            scanner_id: The scanner ID for scanner.
            scan_result: The struct of ScanResult.
        """
        pass

    def on_suspend_mode_change(self, suspend_mode):
        """Called when suspend mode change.

        Args:
            suspend_mode: The suspend mode of Bluetooth.
        """
        pass


class ScannerObj:
    """The scanner object for Advertisement Monitor Tests.

    This class creates instances of multiple scanners. Each instance keeps
    per-scanner counters for the 'DeviceFound' and 'DeviceLost' events and the
    list of target devices those counters apply to.
    """

    def __init__(self, scanner_id, uuid, status):
        """Construction of a scanner object.

        Args:
            scanner_id: Scanner ID of scanning set.
            uuid: The specific UUID for scanner.
            status: GATT status.
        """
        self.scanner_id = scanner_id
        self.uuid = uuid
        self.status = status
        self.events = {
            'DeviceFound': 0,
            'DeviceLost': 0,
        }
        self.target_devices = []

    def get_event_count(self, event):
        """Reads the event count.

        Args:
            event: Name of the specific event or 'All' for all events.

        Returns:
            Count of a specific event (None if the event name is unknown), or
            the dict of counts of all events. Note that for 'All' the internal
            dict itself is returned, not a copy.
        """
        if event == 'All':
            return self.events

        return self.events.get(event)

    def add_event_count(self, event):
        """Increase the event count by one.

        Args:
            event: Name of the event as a string. Must be an existing key of
                self.events; an unknown name raises KeyError.
        """
        self.events[event] += 1

    def reset_event_count(self, event):
        """Resets the event count.

        Args:
            event: Name of a specific event or 'All' for all events.

        Returns:
            True if success, False otherwise (unknown event name).
        """
        if event == 'All':
            # Reset in place so the dict object handed out by
            # get_event_count('All') stays the same object.
            for event_key in self.events:
                self.events[event_key] = 0
            return True

        if event in self.events:
            self.events[event] = 0
            return True

        return False

    def set_target_devices(self, devices):
        """Sets the target devices to the given scanner.

        DeviceFound and DeviceLost will only be counted if it is triggered by a
        target device.

        Args:
            devices: A list of devices in dbus object path.
        """
        # Deep copy so later changes to the caller's list do not affect us.
        self.target_devices = copy.deepcopy(devices)


class FlossScannerClient(BluetoothScannerCallbacks):
    """Handles method calls to and callbacks from the scanner interface."""

    SCANNER_SERVICE = 'org.chromium.bluetooth'
    SCANNER_INTERFACE = 'org.chromium.bluetooth.BluetoothGatt'
    SCANNER_OBJECT_PATTERN = '/org/chromium/bluetooth/hci{}/gatt'

    SCANNER_CB_INTF = 'org.chromium.bluetooth.ScannerCallback'
    SCANNER_CB_OBJ_NAME = 'test_scanner_client'
    FLOSS_RESPONSE_LATENCY_SECS = 3

    # NOTE(review): the docstring of the class below is D-Bus introspection
    # XML describing the exported callback interface; presumably it is what
    # bus.register_object() consumes when no node info is passed. Keep it as
    # pure XML and do not add prose to it.
    class ExportedScannerCallbacks(observer_base.ObserverBase):
        """
        <node>
            <interface name="org.chromium.bluetooth.ScannerCallback">
                <method name="OnScannerRegistered">
                    <arg type="ay" name="uuid" direction="in" />
                    <arg type="y" name="scanner_id" direction="in" />
                    <arg type="u" name="status" direction="in" />
                </method>
                <method name="OnScanResult">
                    <arg type="a{sv}" name="scan_result" direction="in" />
                </method>
                <method name="OnAdvertisementFound">
                    <arg type="y" name="scanner_id" direction="in" />
                    <arg type="a{sv}" name="scan_result" direction="in" />
                </method>
                <method name="OnAdvertisementLost">
                    <arg type="y" name="scanner_id" direction="in" />
                    <arg type="a{sv}" name="scan_result" direction="in" />
                </method>
                <method name="OnSuspendModeChange">
                    <arg type="u" name="suspend_mode" direction="in" />
                </method>
            </interface>
        </node>
        """

        def __init__(self):
            """Constructs exported callbacks object."""
            observer_base.ObserverBase.__init__(self)

        def OnScannerRegistered(self, uuid, scanner_id, status):
            """Handles scanner registered callback.

            Forwards the call to on_scanner_registered of every observer.

            Args:
                uuid: The specific uuid to register it.
                scanner_id: Scanner id of scanning set.
                status: floss_enums.GattStatus.
            """
            for observer in self.observers.values():
                observer.on_scanner_registered(uuid, scanner_id, status)

        def OnScanResult(self, scan_result):
            """Handles scan result callback.

            Forwards the call to on_scan_result of every observer.

            Args:
                scan_result: The struct of ScanResult.
            """
            for observer in self.observers.values():
                observer.on_scan_result(scan_result)

        def OnAdvertisementFound(self, scanner_id, scan_result):
            """Handles advertisement found callback.

            Forwards the call to on_advertisement_found of every observer.

            Args:
                scanner_id: The scanner ID for scanner.
                scan_result: The struct of ScanResult.
            """
            for observer in self.observers.values():
                observer.on_advertisement_found(scanner_id, scan_result)

        def OnAdvertisementLost(self, scanner_id, scan_result):
            """Handles advertisement lost callback.

            Forwards the call to on_advertisement_lost of every observer.

            Args:
                scanner_id: The scanner ID for scanner.
                scan_result: The struct of ScanResult.
            """
            for observer in self.observers.values():
                observer.on_advertisement_lost(scanner_id, scan_result)

        def OnSuspendModeChange(self, suspend_mode):
            """Handles suspend mode change callback.

            Forwards the call to on_suspend_mode_change of every observer.

            Args:
                suspend_mode: The suspend mode of Bluetooth.
            """
            for observer in self.observers.values():
                observer.on_suspend_mode_change(suspend_mode)

    def __init__(self, bus, hci):
        """Constructs the client.

        Args:
            bus: D-Bus bus over which we'll establish connections.
            hci: HCI adapter index. Get this value from `get_default_adapter`
                on FlossManagerClient.
        """
        self.bus = bus
        self.hci = hci
        self.objpath = self.SCANNER_OBJECT_PATTERN.format(hci)

        # We don't register callbacks by default.
        self.callbacks = None
        self.callback_id = None
        # Maps uuid.UUID -> (scanner_id, status) reported by
        # on_scanner_registered and not yet consumed by
        # wait_for_on_scanner_registered.
        self.register_scanner_results = {}
        # Maps scanner_id -> ScannerObj for successfully registered scanners.
        self.scanners = {}

    def __del__(self):
        """Destructor."""
        del self.callbacks

    @utils.glib_callback()
    def on_scanner_registered(self, uuid, scanner_id, status):
        """Handles scanner registered callback.

        Records the (scanner_id, status) result under the UUID and, when the
        status is SUCCESS, creates the ScannerObj for the new scanner.

        Args:
            uuid: The specific uuid to register it.
            scanner_id: Scanner id of scanning set.
            status: floss_enums.GattStatus.
        """
        logging.debug('on_scanner_registered: uuid: %s, scanner_id: %s status: %s', uuid, scanner_id, status)

        # The uuid is returned as a list of bytes (128-bit UUID) so
        # we need convert it to uuid object in order to store it in the
        # dictionary as a key.
        uuid_object = uuid_module.UUID(bytes=bytes(uuid))
        self.register_scanner_results[uuid_object] = (scanner_id, status)

        if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
            return

        # Creates a scanner object every time a new scanner registered.
        self.scanners[scanner_id] = ScannerObj(scanner_id, uuid_object, status)

    @utils.glib_callback()
    def on_scan_result(self, scan_result):
        """Handles scan result callback.

        Args:
            scan_result: The struct of ScanResult.
        """
        logging.debug('on_scan_result: scan_result: %s', scan_result)

    def _count_target_event(self, scanner_id, scan_result, event):
        """Bumps `event` on the scanner if the result comes from a target device.

        Args:
            scanner_id: The scanner ID for scanner.
            scan_result: The struct of ScanResult; its 'address' entry is
                compared against the scanner's target_devices.
            event: Name of the counter to increase ('DeviceFound' or
                'DeviceLost').
        """
        scanner = self.scanners.get(scanner_id)
        if scanner is None:
            # No local record for this id (e.g. it was already removed by
            # unregister_scanner); nothing to count.
            logging.debug('Ignoring %s for unknown scanner_id: %s', event, scanner_id)
            return

        if scan_result['address'] in scanner.target_devices:
            scanner.add_event_count(event)

    @utils.glib_callback()
    def on_advertisement_found(self, scanner_id, scan_result):
        """Handles advertisement found callback.

        Args:
            scanner_id: The scanner ID for scanner.
            scan_result: The struct of ScanResult.
        """
        logging.debug('on_advertisement_found: scanner_id: %s, scan_result: %s', scanner_id, scan_result)

        # Update DeviceFound if the received address device exists in the
        # target_device list.
        self._count_target_event(scanner_id, scan_result, 'DeviceFound')

    @utils.glib_callback()
    def on_advertisement_lost(self, scanner_id, scan_result):
        """Handles advertisement lost callback.

        Args:
            scanner_id: The scanner ID for scanner.
            scan_result: The struct of ScanResult.
        """
        logging.debug('on_advertisement_lost: scanner_id: %s, scan_result: %s', scanner_id, scan_result)

        # Update DeviceLost if the received address device exists in the
        # target_device list.
        self._count_target_event(scanner_id, scan_result, 'DeviceLost')

    @utils.glib_callback()
    def on_suspend_mode_change(self, suspend_mode):
        """Handles suspend mode change callback.

        Args:
            suspend_mode: The suspend mode of Bluetooth.
        """
        logging.debug('on_suspend_mode_change: suspend_mode: %s', suspend_mode)

    def make_dbus_scan_filter_pattern(self, start_position, ad_type, content):
        """Makes struct for scan filter pattern D-Bus.

        Args:
            start_position: The start position of pattern.
            ad_type: The type of pattern.
            content: The content of pattern.

        Returns:
            Dictionary of scan filter pattern.
        """
        return {
            'start_position': GLib.Variant('y', start_position),
            'ad_type': GLib.Variant('y', ad_type),
            'content': GLib.Variant('ay', content)
        }

    def make_dbus_scan_filter_condition(self, patterns):
        """Makes struct for scan filter condition D-Bus.

        Args:
            patterns: The list of patterns used for conditions.

        Returns:
            Dictionary of scan filter condition.
        """
        return {'patterns': GLib.Variant('aa{sv}', patterns)}

    def make_dbus_scan_filter(self, rssi_high_threshold, rssi_low_threshold, rssi_low_timeout, rssi_sampling_period,
                              condition):
        """Makes struct for scan filter D-Bus.

        Args:
            rssi_high_threshold: RSSI high threshold value.
            rssi_low_threshold: RSSI low threshold value.
            rssi_low_timeout: RSSI low timeout value.
            rssi_sampling_period: The sampling interval in milliseconds.
            condition: Iterable of pattern dicts, each providing the
                'start_position', 'ad_type' and 'content' entries.

        Returns:
            Dictionary of scan filter.
        """
        patterns = [
            self.make_dbus_scan_filter_pattern(c['start_position'], c['ad_type'], c['content']) for c in condition
        ]
        return {
            'rssi_high_threshold': GLib.Variant('y', rssi_high_threshold),
            'rssi_low_threshold': GLib.Variant('y', rssi_low_threshold),
            'rssi_low_timeout': GLib.Variant('y', rssi_low_timeout),
            'rssi_sampling_period': GLib.Variant('y', rssi_sampling_period),
            'condition': GLib.Variant('a{sv}', self.make_dbus_scan_filter_condition(patterns))
        }

    def make_dbus_scan_settings(self, interval, window, scan_type):
        """Makes struct for scan settings D-Bus.

        Args:
            interval: The interval value to setting scan.
            window: The window value to setting scan.
            scan_type: The type of scan.

        Returns:
            Dictionary of scan settings.
        """
        return {
            'interval': GLib.Variant('i', interval),
            'window': GLib.Variant('i', window),
            'scan_type': GLib.Variant('u', scan_type)
        }

    @utils.glib_call(False)
    def has_proxy(self):
        """Checks whether scanner proxy can be acquired."""
        return bool(self.proxy())

    def proxy(self):
        """Gets proxy object to scanner interface for method calls."""
        return self.bus.get(self.SCANNER_SERVICE, self.objpath)[self.SCANNER_INTERFACE]

    @utils.glib_call(False)
    def register_scanner_callback(self):
        """Registers scanner callbacks if it doesn't exist.

        Returns:
            True on success, False otherwise.
        """
        if self.callbacks:
            return True

        # Create and publish callbacks
        self.callbacks = self.ExportedScannerCallbacks()
        self.callbacks.add_observer('scanner_client', self)
        objpath = utils.generate_dbus_cb_objpath(self.SCANNER_CB_OBJ_NAME, self.hci)
        self.bus.register_object(objpath, self.callbacks, None)

        # Register published callbacks with scanner daemon
        self.callback_id = self.proxy().RegisterScannerCallback(objpath)
        return True

    def register_callback_observer(self, name, observer):
        """Add an observer for all callbacks.

        self.callbacks is None until register_scanner_callback has created it,
        so that must be called before adding observers. Observers that are not
        BluetoothScannerCallbacks instances are silently ignored.

        Args:
            name:
                Name of the observer.
            observer:
                Observer that implements all callback classes.
        """
        if isinstance(observer, BluetoothScannerCallbacks):
            self.callbacks.add_observer(name, observer)

    def unregister_callback_observer(self, name, observer):
        """Remove an observer for all callbacks.

        self.callbacks is None until register_scanner_callback has created it,
        so that must be called before removing observers. Observers that are
        not BluetoothScannerCallbacks instances are silently ignored.

        Args:
            name:
                Name of the observer.
            observer:
                Observer that implements all callback classes.
        """
        if isinstance(observer, BluetoothScannerCallbacks):
            self.callbacks.remove_observer(name, observer)

    def wait_for_on_scanner_registered(self, uuid):
        """Waits for register scanner.

        Polls for up to FLOSS_RESPONSE_LATENCY_SECS until
        on_scanner_registered has recorded a result for the uuid.

        Args:
            uuid: The specific uuid for scanner.

        Returns:
            scanner_id, status for specific uuid on success,
            (None, None) otherwise.
        """
        try:
            utils.poll_for_condition(condition=(lambda: uuid in self.register_scanner_results),
                                     timeout=self.FLOSS_RESPONSE_LATENCY_SECS)
        except TimeoutError:
            logging.error('on_scanner_registered not called')
            return None, None

        # Consume the result here because we have no straightforward timing
        # to drop the info. We can't drop it in unregister_scanner because
        # if the scanner registration failed then it makes no sense for the
        # user to call unregister_scanner.
        scanner_id, status = self.register_scanner_results.pop(uuid)
        return scanner_id, status

    @utils.glib_call(False)
    def unregister_scanner_callback(self):
        """Unregisters scanner callback for this client.

        Returns:
            True on success, False otherwise.
        """
        return self.proxy().UnregisterScannerCallback(self.callback_id)

    @utils.glib_call(None)
    def register_scanner(self):
        """Registers scanner for the callback id.

        Returns:
            UUID of the registered scanner on success, None otherwise.
        """
        return uuid_module.UUID(bytes=bytes(self.proxy().RegisterScanner(self.callback_id)))

    def register_scanner_sync(self):
        """Registers scanner for the callback id.

        Unlike register_scanner, this waits for the on_scanner_registered
        callback and checks its status.

        Returns:
            scanner_id of the registered scanner on success, None otherwise.
        """
        uuid = self.register_scanner()

        # Failed if we have issue in D-bus (None).
        if uuid is None:
            logging.error('Failed to register the scanner')
            return None

        scanner_id, status = self.wait_for_on_scanner_registered(uuid)
        if status is None:
            return None

        if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS:
            logging.error('Failed to register the scanner with id: %s, status = %s', scanner_id, status)
            return None
        return scanner_id

    @utils.glib_call(False)
    def unregister_scanner(self, scanner_id):
        """Unregisters scanner set using scanner id of set.

        Args:
            scanner_id: Scanner id of set scanning.

        Returns:
            True on success, False otherwise.
        """
        # Drop the local record if there is one. pop() with a default is used
        # so that an id without a local ScannerObj (on_scanner_registered only
        # stores one for a SUCCESS status) still reaches the daemon call below
        # instead of raising KeyError first.
        self.scanners.pop(scanner_id, None)
        return self.proxy().UnregisterScanner(scanner_id)

    @utils.glib_call(False)
    def start_scan(self, scanner_id, settings, scan_filter):
        """Starts scan.

        Args:
            scanner_id: Scanner id of set scanning.
            settings: ScanSettings structure.
            scan_filter: ScanFilter structure.

        Returns:
            True on success, False otherwise.
        """
        status = self.proxy().StartScan(scanner_id, settings, scan_filter)

        if floss_enums.BtStatus(status) != floss_enums.BtStatus.SUCCESS:
            logging.error('Failed to start the scanner with id: %s, status = %s', scanner_id, status)
            return False
        return True

    @utils.glib_call(None)
    def stop_scan(self, scanner_id):
        """Stops scan set using scanner_id.

        Args:
            scanner_id: Scanner id of set scanning.

        Returns:
            floss_enums.BtStatus as int on success, None otherwise.
        """
        return self.proxy().StopScan(scanner_id)

    @utils.glib_call(None)
    def get_scan_suspend_mode(self):
        """Gets scan suspend mode.

        Returns:
            SuspendMode as int on success, None otherwise.
        """
        return self.proxy().GetScanSuspendMode()

    @utils.glib_call(None)
    def is_msft_supported(self):
        """Checks if MSFT supported.

        Returns:
            MSFT capability as boolean on success, None otherwise.
        """
        return self.proxy().IsMsftSupported()

    def get_event_count(self, scanner_id, event):
        """Reads the count of a particular event on the given scanner.

        Args:
            scanner_id: The scanner ID.
            event: Name of the specific event or 'All' for all events.

        Returns:
            Count of the specific event or dict of counts of all events.
            None if the scanner ID is unknown.
        """
        if scanner_id not in self.scanners:
            return None

        return self.scanners[scanner_id].get_event_count(event)

    def reset_event_count(self, scanner_id, event):
        """Resets the count of a particular event on the given scanner.

        Args:
            scanner_id: The scanner ID.
            event: Name of the specific event or 'All' for all events.

        Returns:
            True on success, False otherwise.
        """
        if scanner_id not in self.scanners:
            return False

        return self.scanners[scanner_id].reset_event_count(event)

    def set_target_devices(self, scanner_id, devices):
        """Sets target devices to the given scanner.

        DeviceFound and DeviceLost will only be counted if it is triggered
        by a target device.

        Args:
            scanner_id: The scanner ID.
            devices: A list of devices in dbus object path.

        Returns:
            True on success, False otherwise.
        """
        if scanner_id not in self.scanners:
            return False

        self.scanners[scanner_id].set_target_devices(devices)
        return True

    def remove_monitor(self, scanner_id):
        """Removes the Advertisement Monitor object.

        Stops the scan and unregisters the scanner. The unregister is always
        attempted, even when stopping the scan did not succeed.

        Args:
            scanner_id: The scanner ID.

        Returns:
            True on success, False otherwise.
        """
        stop_status = self.stop_scan(scanner_id)
        unregistered = self.unregister_scanner(scanner_id)

        # stop_scan yields None on failure, which also compares unequal here.
        if stop_status != floss_enums.BtStatus.SUCCESS:
            return False
        return bool(unregistered)