1# Copyright 2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Client class to access the Floss adapter interface.""" 15 16import logging 17import uuid 18 19from floss.pandora.floss import observer_base 20from floss.pandora.floss import utils 21from floss.pandora.floss import floss_enums 22from gi.repository import GLib 23 24 25class BluetoothAdvertisingCallbacks: 26 """Callbacks for the advertising interface. 27 28 Implement this to observe these callbacks when exporting callbacks via 29 register_callback. 30 """ 31 32 def on_advertising_set_started(self, reg_id, advertiser_id, tx_power, status): 33 """Called when advertising set started. 34 35 Args: 36 reg_id: 37 Reg_id of advertising set. 38 advertiser_id: 39 Advertiser id of advertising set. 40 tx_power: 41 Tx-power value get from advertising set registered. 42 status: 43 GattStatus. 44 """ 45 pass 46 47 def on_own_address_read(self, advertiser_id, address_type, address): 48 """Called when own address read. 49 50 Args: 51 advertiser_id: 52 Advertiser id of advertising set. 53 address_type: 54 Public or private address. 55 address: 56 Own address. 57 """ 58 pass 59 60 def on_advertising_set_stopped(self, advertiser_id): 61 """Called when advertising set stopped. 62 63 Args: 64 advertiser_id: Advertiser id of advertising set. 65 """ 66 pass 67 68 def on_advertising_enabled(self, advertiser_id, enable, status): 69 """Called when advertising enabled. 70 71 Args: 72 advertiser_id: 73 Advertiser id of advertising set. 74 enable: 75 Enable advertising set flag. 76 status: 77 GattStatus. 78 """ 79 pass 80 81 def on_advertising_data_set(self, advertiser_id, status): 82 """Called when advertising data set. 83 84 Args: 85 advertiser_id: 86 Advertiser id of advertising set. 87 status: 88 GattStatus. 89 """ 90 pass 91 92 def on_scan_response_data_set(self, advertiser_id, status): 93 """Called when scan response data set. 94 95 Args: 96 advertiser_id: 97 Advertiser id of advertising set. 98 status: 99 GattStatus. 100 """ 101 pass 102 103 def on_advertising_parameters_updated(self, advertiser_id, tx_power, status): 104 """Called when advertising parameters updated. 105 106 Args: 107 advertiser_id: 108 Advertiser id of advertising set. 109 tx_power: 110 Tx-power value get from advertising set registered. 111 status: 112 GattStatus. 113 """ 114 pass 115 116 def on_periodic_advertising_parameters_updated(self, advertiser_id, status): 117 """Called when periodic advertising parameters updated. 118 119 Args: 120 advertiser_id: 121 Advertiser id of advertising set. 122 status: 123 GattStatus. 124 """ 125 pass 126 127 def on_periodic_advertising_data_set(self, advertiser_id, status): 128 """Called when periodic advertising data set. 129 130 Args: 131 advertiser_id: 132 Advertiser id of advertising set. 133 status: 134 GattStatus. 135 """ 136 pass 137 138 def on_periodic_advertising_enabled(self, advertiser_id, enable, status): 139 """Called when periodic advertising parameters enabled. 140 141 Args: 142 advertiser_id: 143 Advertiser id of advertising set. 144 enable: 145 Enable advertising set flag. 146 status: 147 GattStatus. 148 """ 149 pass 150 151 152class FlossAdvertisingClient(BluetoothAdvertisingCallbacks): 153 """Handles method calls to and callbacks from the advertising interface.""" 154 155 ADAPTER_SERVICE = 'org.chromium.bluetooth' 156 ADVERTISING_INTERFACE = 'org.chromium.bluetooth.BluetoothGatt' 157 ADVERTISING_OBJECT_PATTERN = '/org/chromium/bluetooth/hci{}/gatt' 158 159 ADVERTISING_CB_INTF = 'org.chromium.bluetooth.AdvertisingSetCallback' 160 ADVERTISING_CB_OBJ_NAME = 'test_advertising_client' 161 162 FLOSS_RESPONSE_LATENCY_SECS = 3 163 164 class ExportedAdvertisingCallbacks(observer_base.ObserverBase): 165 """ 166 <node> 167 <interface name="org.chromium.bluetooth.AdvertisingSetCallback"> 168 <method name="OnAdvertisingSetStarted"> 169 <arg type="i" name="reg_id" direction="in" /> 170 <arg type="i" name="advertiser_id" direction="in" /> 171 <arg type="i" name="tx_power" direction="in" /> 172 <arg type="u" name="status" direction="in" /> 173 </method> 174 <method name="OnOwnAddressRead"> 175 <arg type="i" name="advertiser_id" direction="in" /> 176 <arg type="i" name="address_type" direction="in" /> 177 <arg type="s" name="address" direction="in" /> 178 </method> 179 <method name="OnAdvertisingSetStopped"> 180 <arg type="i" name="advertiser_id" direction="in" /> 181 </method> 182 <method name="OnAdvertisingEnabled"> 183 <arg type="i" name="advertiser_id" direction="in" /> 184 <arg type="b" name="enable" direction="in" /> 185 <arg type="u" name="status" direction="in" /> 186 </method> 187 <method name="OnAdvertisingDataSet"> 188 <arg type="i" name="advertiser_id" direction="in" /> 189 <arg type="u" name="status" direction="in" /> 190 </method> 191 <method name="OnScanResponseDataSet"> 192 <arg type="i" name="advertiser_id" direction="in" /> 193 <arg type="u" name="status" direction="in" /> 194 </method> 195 <method name="OnAdvertisingParametersUpdated"> 196 <arg type="i" name="advertiser_id" direction="in" /> 197 <arg type="i" name="tx_power" direction="in" /> 198 <arg type="u" name="status" direction="in" /> 199 </method> 200 <method name="OnPeriodicAdvertisingParametersUpdated"> 201 <arg type="i" name="advertiser_id" direction="in" /> 202 <arg type="u" name="status" direction="in" /> 203 </method> 204 <method name="OnPeriodicAdvertisingDataSet"> 205 <arg type="i" name="advertiser_id" direction="in" /> 206 <arg type="u" name="status" direction="in" /> 207 </method> 208 <method name="OnPeriodicAdvertisingEnabled"> 209 <arg type="i" name="advertiser_id" direction="in" /> 210 <arg type="b" name="enable" direction="in" /> 211 <arg type="u" name="status" direction="in" /> 212 </method> 213 </interface> 214 </node> 215 """ 216 217 def __init__(self): 218 """Construct exported callbacks object.""" 219 observer_base.ObserverBase.__init__(self) 220 221 def OnAdvertisingSetStarted(self, reg_id, advertiser_id, tx_power, status): 222 """Handle advertising set started callback.""" 223 for observer in self.observers.values(): 224 observer.on_advertising_set_started(reg_id, advertiser_id, tx_power, status) 225 226 def OnOwnAddressRead(self, advertiser_id, address_type, address): 227 """Handle own address read callback.""" 228 for observer in self.observers.values(): 229 observer.on_own_address_read(advertiser_id, address_type, address) 230 231 def OnAdvertisingSetStopped(self, advertiser_id): 232 """Handle advertising set stopped callback.""" 233 for observer in self.observers.values(): 234 observer.on_advertising_set_stopped(advertiser_id) 235 236 def OnAdvertisingEnabled(self, advertiser_id, enable, status): 237 """Handle advertising enabled callback.""" 238 for observer in self.observers.values(): 239 observer.on_advertising_enabled(advertiser_id, enable, status) 240 241 def OnAdvertisingDataSet(self, advertiser_id, status): 242 """Handle advertising data set callback.""" 243 for observer in self.observers.values(): 244 observer.on_advertising_data_set(advertiser_id, status) 245 246 def OnScanResponseDataSet(self, advertiser_id, status): 247 """Handle scan response data set callback.""" 248 for observer in self.observers.values(): 249 observer.on_scan_response_data_set(advertiser_id, status) 250 251 def OnAdvertisingParametersUpdated(self, advertiser_id, tx_power, status): 252 """Handle advertising parameters updated callback.""" 253 for observer in self.observers.values(): 254 observer.on_advertising_parameters_updated(advertiser_id, tx_power, status) 255 256 def OnPeriodicAdvertisingParametersUpdated(self, advertiser_id, status): 257 """Handle periodic advertising parameters updated callback.""" 258 for observer in self.observers.values(): 259 observer.on_periodic_advertising_parameters_updated(advertiser_id, status) 260 261 def OnPeriodicAdvertisingDataSet(self, advertiser_id, status): 262 """Handle periodic advertising data set callback.""" 263 for observer in self.observers.values(): 264 observer.on_periodic_advertising_data_set(advertiser_id, status) 265 266 def OnPeriodicAdvertisingEnabled(self, advertiser_id, enable, status): 267 """Handle periodic advertising enabled callback.""" 268 for observer in self.observers.values(): 269 observer.on_periodic_advertising_enabled(advertiser_id, enable, status) 270 271 def __init__(self, bus, hci): 272 """Construct the client. 273 274 Args: 275 bus: 276 DBus bus over which we'll establish connections. 277 hci: 278 HCI adapter index. Get this value from `get_default_adapter` on FlossAdvertisingClient. 279 """ 280 self.bus = bus 281 self.hci = hci 282 self.objpath = self.ADVERTISING_OBJECT_PATTERN.format(hci) 283 284 # We don't register callbacks by default. 285 self.callbacks = None 286 self.callback_id = None 287 288 # A dict of advertiser_id as key and tx power as value. 289 self.active_advs = {} 290 291 # A dict of reg_id as key and tuple of (advertiser_id, status) as value. 292 self.start_adv_results = {} 293 294 def __del__(self): 295 """Destructor.""" 296 del self.callbacks 297 298 @utils.glib_callback() 299 def on_advertising_set_started(self, reg_id, advertiser_id, tx_power, status): 300 """Handle advertising set started callback.""" 301 logging.debug('on_advertising_set_started: reg_id: %s, advertiser_id: %s, ' 302 'tx_power: %s, status: %s', reg_id, advertiser_id, tx_power, status) 303 self.start_adv_results[reg_id] = (advertiser_id, status) 304 if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: 305 return 306 307 if advertiser_id in self.active_advs: 308 logging.warning('The set of advertiser_id: %s, is already registered.', advertiser_id) 309 else: 310 self.active_advs[advertiser_id] = tx_power 311 312 @utils.glib_callback() 313 def on_own_address_read(self, advertiser_id, address_type, address): 314 """Handle own address read callback.""" 315 logging.debug('on_own_address_read: advertiser_id: %s, address_type: %s, ' 316 'address: %s', advertiser_id, address_type, address) 317 318 @utils.glib_callback() 319 def on_advertising_set_stopped(self, advertiser_id): 320 """Handle advertising set stopped callback.""" 321 logging.debug('on_advertising_set_stopped: advertiser_id: %s', advertiser_id) 322 if advertiser_id in self.active_advs: 323 self.active_advs.pop(advertiser_id) 324 else: 325 logging.warning('The set of advertiser_id: %s, not registered yet.', advertiser_id) 326 327 @utils.glib_callback() 328 def on_advertising_enabled(self, advertiser_id, enable, status): 329 """Handle advertising enable callback.""" 330 logging.debug('on_advertising_enabled: advertiser_id: %s, enable: %s status: %s', advertiser_id, enable, status) 331 332 @utils.glib_callback() 333 def on_advertising_data_set(self, advertiser_id, status): 334 """Handle advertising data set callback.""" 335 logging.debug('on_advertising_data_set: advertiser_id: %s, status: %s', advertiser_id, status) 336 337 @utils.glib_callback() 338 def on_scan_response_data_set(self, advertiser_id, status): 339 """Handle scan response data set callback.""" 340 logging.debug('on_scan_response_data_set: advertiser_id: %s, status: %s', advertiser_id, status) 341 342 @utils.glib_callback() 343 def on_advertising_parameters_updated(self, advertiser_id, tx_power, status): 344 """Handle advertising parameters update callback.""" 345 logging.debug('on_advertising_parameters_updated: advertiser_id: %s, ' 346 'tx_power: %s, status: %s', advertiser_id, tx_power, status) 347 348 @utils.glib_callback() 349 def on_periodic_advertising_parameters_updated(self, advertiser_id, status): 350 """Handle periodic advertising parameters updated callback.""" 351 logging.debug('on_periodic_advertising_parameters_updated: advertiser_id: ' 352 '%s, status: %s', advertiser_id, status) 353 354 @utils.glib_callback() 355 def on_periodic_advertising_data_set(self, advertiser_id, status): 356 """Handle periodic advertising data set callback.""" 357 logging.debug('on_periodic_advertising_data_set: advertiser_id: %s status: %s', advertiser_id, status) 358 359 @utils.glib_callback() 360 def on_periodic_advertising_enabled(self, advertiser_id, enable, status): 361 """Handle on periodic advertising enabled callback.""" 362 logging.debug('on_periodic_advertising_enabled: advertiser_id: %s, enable: ' 363 '%s, status: %s', advertiser_id, enable, status) 364 365 def make_dbus_periodic_advertising_parameters(self, adv_periodic_parameters): 366 """Makes a struct for periodic advertising parameters D-Bus. 367 368 Args: 369 adv_periodic_parameters: A dictionary of periodic advertising 370 parameters. 371 372 Returns: 373 An empty dictionary if adv_periodic_parameters is None or some 374 periodic parameters are missing from it, else returns a 375 dictionary with periodic advertising parameters. 376 """ 377 if not adv_periodic_parameters: 378 return {} 379 380 missing_periodic_parameters = {'include_tx_power', 'interval'} - set(adv_periodic_parameters.keys()) 381 382 if missing_periodic_parameters: 383 logging.error('Missing periodic advertisement parameters data with ' 384 'keys: %s', ','.join(missing_periodic_parameters)) 385 return {} 386 387 return { 388 'include_tx_power': GLib.Variant('b', adv_periodic_parameters['include_tx_power']), 389 'interval': GLib.Variant('i', adv_periodic_parameters['interval']) 390 } 391 392 def make_dbus_advertising_set_parameters(self, adv_set_parameters): 393 """Makes a struct for advertising set parameters D-Bus. 394 395 Args: 396 adv_set_parameters: A dictionary of advertising set parameters. 397 398 Returns: 399 An empty dictionary if adv_set_parameters is None or some 400 parameters are missing from it, else returns a dictionary with 401 advertising set parameters. 402 """ 403 if not adv_set_parameters: 404 return {} 405 406 missing_parameters = { 407 'connectable', 'scannable', 'is_legacy', 'is_anonymous', 'include_tx_power', 'primary_phy', 'secondary_phy', 408 'interval', 'tx_power_level', 'own_address_type' 409 } - set(adv_set_parameters.keys()) 410 411 if missing_parameters: 412 logging.error('Missing advertisement parameters with keys: %s', ','.join(missing_parameters)) 413 return {} 414 415 return { 416 'connectable': GLib.Variant('b', adv_set_parameters['connectable']), 417 'scannable': GLib.Variant('b', adv_set_parameters['scannable']), 418 'is_legacy': GLib.Variant('b', adv_set_parameters['is_legacy']), 419 'is_anonymous': GLib.Variant('b', adv_set_parameters['is_anonymous']), 420 'include_tx_power': GLib.Variant('b', adv_set_parameters['include_tx_power']), 421 'primary_phy': GLib.Variant('u', adv_set_parameters['primary_phy']), 422 'secondary_phy': GLib.Variant('u', adv_set_parameters['secondary_phy']), 423 'interval': GLib.Variant('i', adv_set_parameters['interval']), 424 'tx_power_level': GLib.Variant('i', adv_set_parameters['tx_power_level']), 425 'own_address_type': GLib.Variant('i', adv_set_parameters['own_address_type']) 426 } 427 428 def make_dbus_advertise_data(self, adv_data): 429 """Makes a struct for advertising data D-Bus. 430 431 Args: 432 adv_data: A dictionary of advertising data. 433 434 Returns: 435 An empty dictionary if adv_data is None or some data are 436 missing from it, else returns a dictionary with advertising 437 data. 438 """ 439 if not adv_data: 440 return {} 441 442 missing_data = { 443 'service_uuids', 'solicit_uuids', 'transport_discovery_data', 'manufacturer_data', 'service_data', 444 'include_tx_power_level', 'include_device_name' 445 } - set(adv_data.keys()) 446 447 if missing_data: 448 logging.error('Missing advertisement data with keys: %s', ','.join(missing_data)) 449 return {} 450 451 return { 452 'service_uuids': 453 GLib.Variant('aay', self.convert_uuids_to_bytearray(adv_data['service_uuids'])), 454 'solicit_uuids': 455 GLib.Variant('aay', self.convert_uuids_to_bytearray(adv_data['solicit_uuids'])), 456 'transport_discovery_data': 457 GLib.Variant('aay', adv_data['transport_discovery_data']), 458 'manufacturer_data': 459 GLib.Variant('a{qay}', self.convert_manufacturer_data_to_bytearray(adv_data['manufacturer_data'])), 460 'service_data': 461 GLib.Variant('a{say}', self.convert_service_data_to_bytearray(adv_data['service_data'])), 462 'include_tx_power_level': 463 GLib.Variant('b', adv_data['include_tx_power_level']), 464 'include_device_name': 465 GLib.Variant('b', adv_data['include_device_name']) 466 } 467 468 @utils.glib_call(False) 469 def has_proxy(self): 470 """Checks whether Gatt proxy can be acquired.""" 471 return bool(self.proxy()) 472 473 def proxy(self): 474 """Gets proxy object to Gatt interface for method calls.""" 475 return self.bus.get(self.ADAPTER_SERVICE, self.objpath)[self.ADVERTISING_INTERFACE] 476 477 @utils.glib_call(False) 478 def register_advertiser_callback(self): 479 """Registers advertising callbacks for this client if one doesn't already exist.""" 480 481 if self.callbacks: 482 return True 483 484 # Create and publish callbacks 485 self.callbacks = self.ExportedAdvertisingCallbacks() 486 self.callbacks.add_observer('advertising_client', self) 487 objpath = utils.generate_dbus_cb_objpath(self.ADVERTISING_CB_OBJ_NAME, self.hci) 488 self.bus.register_object(objpath, self.callbacks, None) 489 490 # Register published callbacks with manager daemon 491 self.callback_id = self.proxy().RegisterAdvertiserCallback(objpath) 492 return True 493 494 @utils.glib_call(False) 495 def unregister_advertiser_callback(self): 496 """Unregisters advertising callbacks for this client. 497 498 Returns: 499 True on success, False otherwise. 500 """ 501 self.proxy().UnregisterAdvertiserCallback(self.callback_id) 502 return True 503 504 def register_callback_observer(self, name, observer): 505 """Add an observer for all callbacks. 506 507 Args: 508 name: 509 Name of the observer. 510 observer: 511 Observer that implements all callback classes. 512 """ 513 if isinstance(observer, BluetoothAdvertisingCallbacks): 514 self.callbacks.add_observer(name, observer) 515 516 def unregister_callback_observer(self, name, observer): 517 """Remove an observer for all callbacks. 518 519 Args: 520 name: 521 Name of the observer. 522 observer: 523 Observer that implements all callback classes. 524 """ 525 if isinstance(observer, BluetoothAdvertisingCallbacks): 526 self.callbacks.remove_observer(name, observer) 527 528 @utils.glib_call(None) 529 def start_advertising_set(self, parameters, advertise_data, scan_response, periodic_parameters, periodic_data, 530 duration, max_ext_adv_events): 531 """Starts advertising set. 532 533 Args: 534 parameters: 535 AdvertisingSetParameters structure. 536 advertise_data: 537 AdvertiseData structure. 538 scan_response: 539 Scan response data(optional). 540 periodic_parameters: 541 PeriodicAdvertisingParameters structure (optional). 542 periodic_data: 543 AdvertiseData structure(optional). 544 duration: 545 Time to start advertising set. 546 max_ext_adv_events: 547 Maximum of extended advertising events. 548 549 Returns: 550 The reg_id for the advertising set on success, 551 None otherwise. 552 """ 553 return self.proxy().StartAdvertisingSet(parameters, advertise_data, scan_response, periodic_parameters, 554 periodic_data, duration, max_ext_adv_events, self.callback_id) 555 556 @utils.glib_call(False) 557 def stop_advertising_set(self, advertiser_id): 558 """Stops advertising set using advertiser id of set. 559 560 Args: 561 advertiser_id: Advertiser id of set advertising. 562 563 Returns: 564 True on success, False otherwise. 565 """ 566 self.proxy().StopAdvertisingSet(advertiser_id) 567 return True 568 569 @utils.glib_call(False) 570 def enable_advertising_set(self, advertiser_id, enable, duration, max_ext_adv_events): 571 """Enables advertising set using advertiser_id. 572 573 Args: 574 advertiser_id: 575 Advertiser id of set advertising. 576 enable: 577 Enable advertising set flag. 578 duration: 579 Time to send the advertising set. 580 max_ext_adv_events: 581 Number of max extend adv events. 582 583 Returns: 584 True on success, False otherwise. 585 """ 586 self.proxy().EnableAdvertisingSet(advertiser_id, enable, duration, max_ext_adv_events) 587 return True 588 589 @utils.glib_call(False) 590 def set_advertising_data(self, advertiser_id, data): 591 """Sets advertising data using advertiser_id. 592 593 Args: 594 advertiser_id: 595 Advertiser id of set advertising. 596 data: 597 AdvertiseData structure. 598 599 Returns: 600 True on success, False otherwise. 601 """ 602 self.proxy().SetAdvertisingData(advertiser_id, data) 603 return True 604 605 @utils.glib_call(False) 606 def set_scan_response_data(self, advertiser_id, data): 607 """Sets scan response data using advertiser id. 608 609 Args: 610 advertiser_id: 611 Advertiser id of set advertising. 612 data: 613 AdvertiseData structure. 614 615 Returns: 616 True on success, False otherwise. 617 """ 618 self.proxy().SetScanResponseData(advertiser_id, data) 619 return True 620 621 @utils.glib_call(False) 622 def set_advertising_parameters(self, advertiser_id, parameters): 623 """Sets advertising parameters using advertiser_id. 624 625 Args: 626 advertiser_id: 627 Advertiser id of set advertising. 628 parameters: 629 AdvertisingSetParameters structure. 630 631 Returns: 632 True on success, False otherwise. 633 """ 634 self.proxy().SetAdvertisingParameters(advertiser_id, parameters) 635 return True 636 637 @utils.glib_call(False) 638 def set_periodic_advertising_parameters(self, advertiser_id, parameters): 639 """Sets periodic advertising parameters using advertiser_id. 640 641 Args: 642 advertiser_id: 643 Advertiser id of set advertising. 644 parameters: 645 AdvertisingSetParameters structure. 646 647 Returns: 648 True on success, False otherwise. 649 """ 650 self.proxy().SetPeriodicAdvertisingParameters(advertiser_id, parameters) 651 return True 652 653 @utils.glib_call(False) 654 def set_periodic_advertising_data(self, advertiser_id, data): 655 """Sets periodic advertising data using advertiser_id. 656 657 Args: 658 advertiser_id: 659 Advertiser id of set advertising. 660 data: 661 AdvertiseData structure. 662 663 Returns: 664 True on success, False otherwise. 665 """ 666 self.proxy().SetPeriodicAdvertisingData(advertiser_id, data) 667 return True 668 669 @utils.glib_call(False) 670 def set_periodic_advertising_enable(self, advertiser_id, enable): 671 """Sets periodic advertising enable using advertiser_id. 672 673 Args: 674 advertiser_id: 675 Advertiser id of set advertising. 676 enable: 677 Enable advertising set flag. 678 679 Returns: 680 True on success, False otherwise. 681 """ 682 self.proxy().SetPeriodicAdvertisingEnable(advertiser_id, enable) 683 return True 684 685 @utils.glib_call(False) 686 def get_own_address(self, advertiser_id): 687 """Gets own address using advertiser_id. 688 689 Args: 690 advertiser_id: Advertiser id of set advertising. 691 692 Returns: 693 True on success, False otherwise. 694 """ 695 self.proxy().GetOwnAddress(advertiser_id) 696 return True 697 698 @staticmethod 699 def convert_service_data_to_bytearray(service_data): 700 """Converts values in service data dict to bytearray. 701 702 Args: 703 service_data: A dict of UUID as key and service data for specific UUID as value. 704 705 Returns: 706 Dictionary of the data converted. 707 """ 708 return {k: bytearray(v) for k, v in service_data.items()} 709 710 @staticmethod 711 def convert_manufacturer_data_to_bytearray(manufacturer_data): 712 """Converts values in manufacturer data dict to bytearray. 713 714 It also converts the hex keys to integers. 715 716 Args: 717 manufacturer_data: A dict of manufacturer id as key and manufacturer data for specific id as value. 718 719 Returns: 720 Dictionary of the data converted. 721 """ 722 return {int(k, 16): bytearray(v) for k, v in manufacturer_data.items()} 723 724 @staticmethod 725 def convert_uuids_to_bytearray(uuids): 726 """Converts values in uuids list to bytearray. 727 728 Args: 729 uuids: A list of UUID128bit. 730 731 Returns: 732 List of the data converted. 733 """ 734 return [uuid.UUID(i).bytes for i in uuids] 735 736 def wait_for_adv_started(self, reg_id): 737 """Waits for advertising started. 738 739 Args: 740 reg_id: The reg_id for advertising set. 741 742 Returns: 743 Advertiser_id, status for specific reg_id on success, 744 (None, None) otherwise. 745 """ 746 try: 747 utils.poll_for_condition(condition=(lambda: reg_id in self.start_adv_results), 748 timeout=self.FLOSS_RESPONSE_LATENCY_SECS) 749 750 except TimeoutError: 751 logging.error('on_advertising_set_started not called') 752 return (None, None) 753 754 advertise_id, status = self.start_adv_results[reg_id] 755 756 # Consume the result here because we have no straightforward timing 757 # to drop the info. We can't drop it in wait_for_adv_stopped because 758 # if the advertising failed to start then it makes no sense for the 759 # user to call wait_for_adv_stopped. 760 del self.start_adv_results[reg_id] 761 762 return advertise_id, status 763 764 def wait_for_adv_stopped(self, advertiser_id): 765 """Waits for advertising stopped. 766 767 Args: 768 advertiser_id: The advertiser_id for advertising set. 769 770 Returns: 771 True on success, False otherwise. 772 """ 773 try: 774 utils.poll_for_condition(condition=(lambda: advertiser_id not in self.active_advs), 775 timeout=self.FLOSS_RESPONSE_LATENCY_SECS) 776 777 return True 778 except TimeoutError: 779 logging.error('on_advertising_set_stopped not called') 780 return False 781 782 def start_advertising_set_sync(self, parameters, advertise_data, scan_response, periodic_parameters, periodic_data, 783 duration, max_ext_adv_events): 784 """Starts advertising set sync. 785 786 Args: 787 parameters: 788 AdvertisingSetParameters structure. 789 advertise_data: 790 AdvertiseData structure. 791 scan_response: 792 Scan response data(optional). 793 periodic_parameters: 794 PeriodicAdvertisingParameters structure (optional). 795 periodic_data: 796 AdvertiseData structure(optional). 797 duration: 798 Time to start advertising set. 799 max_ext_adv_events: 800 Maximum of extended advertising events. 801 802 Returns: 803 Advertiser_id for specific reg_id on success, None otherwise. 804 """ 805 806 reg_id = self.start_advertising_set(parameters, advertise_data, scan_response, periodic_parameters, 807 periodic_data, duration, max_ext_adv_events) 808 if reg_id is None: 809 logging.error('Failed to start advertisement set') 810 return None 811 812 advertise_id, status = self.wait_for_adv_started(reg_id) 813 if status is None: 814 return None 815 816 if floss_enums.GattStatus(status) != floss_enums.GattStatus.SUCCESS: 817 logging.error('Failed to start advertisement with id: %s, status = %s', advertise_id, status) 818 return None 819 return advertise_id 820 821 def stop_advertising_set_sync(self, advertiser_id): 822 """Stops advertising set sync. 823 824 Args: 825 advertiser_id: Advertiser_id for set of advertising. 826 827 Returns: 828 True on success, False otherwise. 829 """ 830 if not self.stop_advertising_set(advertiser_id): 831 return False 832 return self.wait_for_adv_stopped(advertiser_id) 833 834 def stop_all_advertising_sets(self): 835 """Stops all advertising sets. 836 837 Returns: 838 True on success, False otherwise. 839 """ 840 failed_adv_ids = [] 841 adv_ids = [i for i in self.active_advs] 842 for i in adv_ids: 843 if not self.stop_advertising_set_sync(i): 844 failed_adv_ids.append(i) 845 846 if failed_adv_ids: 847 logging.error('Failed to reset advertisement sets with ids: %s', ','.join(failed_adv_ids)) 848 return False 849 return True 850 851 def get_tx_power(self, advertiser_id): 852 """Gets tx power value for specific advertiser id. 853 854 Args: 855 advertiser_id: Advertiser_id for set of advertising. 856 857 Returns: 858 Advertiser_id on success, None otherwise. 859 """ 860 return self.active_advs.get(advertiser_id) 861