1#!/usr/bin/env python3 2# 3# Copyright (C) 2019 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Python script for wrappers to various libraries. 18 19Class CmdInput inherts from the cmd library. 20 21Functions that start with "do_" have a method 22signature that doesn't match the actual command 23line command and that is intended. This is so the 24"help" command knows what to display (in this case 25the documentation of the command itself). 26 27For example: 28Looking at the function "do_tool_set_target_device_name" 29has the inputs self and line which is expected of this type 30of method signature. When the "help" command is done on the 31method name you get the function documentation as such: 32 33(Cmd) help tool_set_target_device_name 34 35 Description: Reset the target device name. 36 Input(s): 37 device_name: Required. The advertising name to connect to. 38 Usage: tool_set_target_device_name new_target_device name 39 Examples: 40 tool_set_target_device_name le_watch 41 42This is all to say this documentation pattern is expected. 
43 44""" 45 46from acts.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device 47from acts.test_utils.bt.bt_constants import bt_attribute_values 48from acts.test_utils.bt.bt_constants import sig_uuid_constants 49 50import acts.test_utils.bt.gatt_test_database as gatt_test_database 51 52import cmd 53import pprint 54import time 55"""Various Global Strings""" 56CMD_LOG = "CMD {} result: {}" 57FAILURE = "CMD {} threw exception: {}" 58BASIC_ADV_NAME = "fs_test" 59 60 61class CmdInput(cmd.Cmd): 62 ble_advertise_interval = 1000 63 bt_control_ids = [] 64 bt_control_names = [] 65 bt_control_devices = [] 66 bt_scan_poll_timer = 0.5 67 target_device_name = "" 68 le_ids = [] 69 unique_mac_addr_id = None 70 71 def setup_vars(self, dut, target_device_name, log): 72 self.pri_dut = dut 73 # Note: test_dut is the start of a slow conversion from a Fuchsia specific 74 # Tool to an abstract_device tool. Only commands that use test_dut will work 75 # Otherwise this tool is primarially targeted at Fuchsia devices. 
76 self.test_dut = create_bluetooth_device(self.pri_dut) 77 self.test_dut.initialize_bluetooth_controller() 78 self.target_device_name = target_device_name 79 self.log = log 80 81 def emptyline(self): 82 pass 83 84 def do_EOF(self, line): 85 "End Script" 86 return True 87 88 """ Useful Helper functions and cmd line tooling """ 89 def str_to_bool(self, s): 90 if s.lower() == 'true': 91 return True 92 elif s.lower() == 'false': 93 return False 94 95 def _find_unique_id_over_le(self): 96 scan_filter = {"name_substring": self.target_device_name} 97 self.unique_mac_addr_id = None 98 self.pri_dut.gattc_lib.bleStartBleScan(scan_filter) 99 tries = 10 100 for i in range(tries): 101 time.sleep(self.bt_scan_poll_timer) 102 scan_res = self.pri_dut.gattc_lib.bleGetDiscoveredDevices( 103 )['result'] 104 for device in scan_res: 105 name, did, connectable = device["name"], device["id"], device[ 106 "connectable"] 107 if (self.target_device_name in name): 108 self.unique_mac_addr_id = did 109 self.log.info( 110 "Successfully found device: name, id: {}, {}".format( 111 name, did)) 112 break 113 if self.unique_mac_addr_id: 114 break 115 self.pri_dut.gattc_lib.bleStopBleScan() 116 117 def _find_unique_id_over_bt_control(self): 118 self.unique_mac_addr_id = None 119 self.bt_control_devices = [] 120 self.pri_dut.btc_lib.requestDiscovery(True) 121 tries = 10 122 for i in range(tries): 123 if self.unique_mac_addr_id: 124 break 125 time.sleep(self.bt_scan_poll_timer) 126 device_list = self.pri_dut.btc_lib.getKnownRemoteDevices( 127 )['result'] 128 for id_dict in device_list: 129 device = device_list[id_dict] 130 self.bt_control_devices.append(device) 131 name = None 132 if device['name'] is not None: 133 name = device['name'] 134 did, address = device['id'], device['address'] 135 136 self.bt_control_ids.append(did) 137 if name is not None: 138 self.bt_control_names.append(name) 139 if self.target_device_name in name: 140 self.unique_mac_addr_id = did 141 self.log.info( 142 "Successfully 
found device: name, id, address: {}, {}, {}" 143 .format(name, did, address)) 144 break 145 self.pri_dut.btc_lib.requestDiscovery(False) 146 147 def do_tool_take_bt_snoop_log(self, custom_name): 148 """ 149 Description: Takes the bt snoop log from the Fuchsia device. 150 Logs will show up in your config files' logpath directory. 151 152 Input(s): 153 custom_name: Optional. Override the default pcap file name. 154 155 Usage: tool_set_target_device_name new_target_device name 156 Examples: 157 tool_take_bt_snoop_log connection_error 158 tool_take_bt_snoop_log 159 """ 160 self.pri_dut.take_bt_snoop_log(custom_name) 161 162 def do_tool_refresh_unique_id(self, line): 163 """ 164 Description: Refresh command line tool mac unique id. 165 Usage: 166 Examples: 167 tool_refresh_unique_id 168 """ 169 try: 170 self._find_unique_id_over_le() 171 except Exception as err: 172 self.log.error( 173 "Failed to scan or find scan result: {}".format(err)) 174 175 def do_tool_refresh_unique_id_using_bt_control(self, line): 176 """ 177 Description: Refresh command line tool mac unique id. 178 Usage: 179 Examples: 180 tool_refresh_unique_id_using_bt_control 181 """ 182 try: 183 self._find_unique_id_over_bt_control() 184 except Exception as err: 185 self.log.error( 186 "Failed to scan or find scan result: {}".format(err)) 187 188 def do_tool_set_target_device_name(self, line): 189 """ 190 Description: Reset the target device name. 191 Input(s): 192 device_name: Required. The advertising name to connect to. 
193 Usage: tool_set_target_device_name new_target_device name 194 Examples: 195 tool_set_target_device_name le_watch 196 """ 197 self.log.info("Setting target_device_name to: {}".format(line)) 198 self.target_device_name = line 199 200 """Begin BLE advertise wrappers""" 201 def do_ble_start_generic_connectable_advertisement(self, line): 202 """ 203 Description: Start a connectable LE advertisement 204 Usage: ble_start_generic_connectable_advertisement 205 """ 206 cmd = "Start a connectable LE advertisement" 207 try: 208 adv_data = {"name": BASIC_ADV_NAME} 209 self.pri_dut.ble_lib.bleStartBleAdvertising( 210 adv_data, self.ble_advertise_interval) 211 except Exception as err: 212 self.log.error(FAILURE.format(cmd, err)) 213 214 def do_ble_start_generic_nonconnectable_advertisement(self, line): 215 """ 216 Description: Start a non-connectable LE advertisement 217 Usage: ble_start_generic_nonconnectable_advertisement 218 """ 219 cmd = "Start a nonconnectable LE advertisement" 220 try: 221 adv_data = {"name": BASIC_ADV_NAME} 222 self.pri_dut.ble_lib.bleStartBleAdvertising( 223 adv_data, self.ble_advertise_interval, False) 224 except Exception as err: 225 self.log.error(FAILURE.format(cmd, err)) 226 227 def do_ble_stop_advertisement(self, line): 228 """ 229 Description: Stop a BLE advertisement. 230 Usage: ble_stop_advertisement 231 """ 232 cmd = "Stop a connectable LE advertisement" 233 try: 234 self.pri_dut.ble_lib.bleStopBleAdvertising() 235 except Exception as err: 236 self.log.error(FAILURE.format(cmd, err)) 237 238 """End BLE advertise wrappers""" 239 """Begin GATT client wrappers""" 240 def complete_gattc_connect_by_id(self, text, line, begidx, endidx): 241 if not text: 242 completions = list(self.le_ids)[:] 243 else: 244 completions = [s for s in self.le_ids if s.startswith(text)] 245 return completions 246 247 def do_gattc_connect_by_id(self, line): 248 """ 249 Description: Connect to a LE peripheral. 250 Input(s): 251 device_id: Required. 
The unique device ID from Fuchsia 252 discovered devices. 253 Usage: 254 Examples: 255 gattc_connect device_id 256 """ 257 cmd = "Connect to a LE peripheral by input ID." 258 try: 259 260 connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral( 261 line) 262 self.log.info("Connection status: {}".format( 263 pprint.pformat(connection_status))) 264 except Exception as err: 265 self.log.error(FAILURE.format(cmd, err)) 266 267 def do_gattc_connect(self, line): 268 """ 269 Description: Connect to a LE peripheral. 270 Optional input: device_name 271 Input(s): 272 device_name: Optional. The peripheral ID to connect to. 273 Usage: 274 Examples: 275 gattc_connect 276 gattc_connect eddystone_123 277 """ 278 cmd = "Connect to a LE peripheral." 279 try: 280 if len(line) > 0: 281 self.target_device_name = line 282 self.unique_mac_addr_id = None 283 if not self.unique_mac_addr_id: 284 try: 285 self._find_unique_id() 286 except Exception as err: 287 self.log.info("Failed to scan or find device.") 288 return 289 connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral( 290 self.unique_mac_addr_id) 291 self.log.info("Connection status: {}".format( 292 pprint.pformat(connection_status))) 293 except Exception as err: 294 self.log.error(FAILURE.format(cmd, err)) 295 296 def do_gattc_connect_disconnect_iterations(self, line): 297 """ 298 Description: Connect then disconnect to a LE peripheral multiple times. 299 Input(s): 300 iterations: Required. The number of iterations to run. 301 Usage: 302 Examples: 303 gattc_connect_disconnect_iterations 10 304 """ 305 cmd = "Connect to a LE peripheral." 
306 try: 307 if not self.unique_mac_addr_id: 308 try: 309 self._find_unique_id() 310 except Exception as err: 311 self.log.info("Failed to scan or find device.") 312 return 313 for i in range(int(line)): 314 self.log.info("Running iteration {}".format(i + 1)) 315 connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral( 316 self.unique_mac_addr_id) 317 self.log.info("Connection status: {}".format( 318 pprint.pformat(connection_status))) 319 time.sleep(4) 320 disc_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral( 321 self.unique_mac_addr_id) 322 self.log.info("Disconnect status: {}".format(disc_status)) 323 time.sleep(3) 324 except Exception as err: 325 self.log.error(FAILURE.format(cmd, err)) 326 327 def do_gattc_disconnect(self, line): 328 """ 329 Description: Disconnect from LE peripheral. 330 Assumptions: Already connected to a peripheral. 331 Usage: 332 Examples: 333 gattc_disconnect 334 """ 335 cmd = "Disconenct from LE peripheral." 336 try: 337 disconnect_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral( 338 self.unique_mac_addr_id) 339 self.log.info("Disconnect status: {}".format(disconnect_status)) 340 except Exception as err: 341 self.log.error(FAILURE.format(cmd, err)) 342 343 def do_gattc_list_services(self, line): 344 """ 345 Description: List services from LE peripheral. 346 Assumptions: Already connected to a peripheral. 347 Usage: 348 Examples: 349 gattc_list_services 350 """ 351 cmd = "List services from LE peripheral." 352 try: 353 services = self.pri_dut.gattc_lib.listServices( 354 self.unique_mac_addr_id) 355 self.log.info("Discovered Services: \n{}".format( 356 pprint.pformat(services))) 357 except Exception as err: 358 self.log.error(FAILURE.format(cmd, err)) 359 360 def do_gattc_connect_to_service(self, line): 361 """ 362 Description: Connect to Peripheral GATT server service. 363 Assumptions: Already connected to peripheral. 364 Input(s): 365 service_id: Required. The service id reference on the GATT server. 
366 Usage: 367 Examples: 368 gattc_connect_to_service service_id 369 """ 370 cmd = "GATT client connect to GATT server service." 371 try: 372 self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id, 373 int(line)) 374 except Exception as err: 375 self.log.error(FAILURE.format(cmd, err)) 376 377 def do_gattc_discover_characteristics(self, line): 378 """ 379 Description: Discover characteristics from a connected service. 380 Assumptions: Already connected to a GATT server service. 381 Usage: 382 Examples: 383 gattc_discover_characteristics 384 """ 385 cmd = "Discover and list characteristics from a GATT server." 386 try: 387 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 388 self.log.info("Discovered chars:\n{}".format( 389 pprint.pformat(chars))) 390 except Exception as err: 391 self.log.error(FAILURE.format(cmd, err)) 392 393 def do_gattc_notify_all_chars(self, line): 394 """ 395 Description: Enable all notifications on all Characteristics on 396 a GATT server. 397 Assumptions: Basic GATT connection made. 398 Usage: 399 Examples: 400 gattc_notify_all_chars 401 """ 402 cmd = "Read all characteristics from the GATT service." 403 try: 404 services = self.pri_dut.gattc_lib.listServices( 405 self.unique_mac_addr_id) 406 for service in services['result']: 407 service_id = service['id'] 408 service_uuid = service['uuid_type'] 409 self.pri_dut.gattc_lib.connectToService( 410 self.unique_mac_addr_id, service_id) 411 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 412 print("Reading chars in service uuid: {}".format(service_uuid)) 413 414 for char in chars['result']: 415 char_id = char['id'] 416 char_uuid = char['uuid_type'] 417 # quick char filter for apple-4 test... 
remove later 418 print("found uuid {}".format(char_uuid)) 419 try: 420 self.pri_dut.gattc_lib.enableNotifyCharacteristic( 421 char_id) 422 except Exception as err: 423 print("error enabling notification") 424 except Exception as err: 425 self.log.error(FAILURE.format(cmd, err)) 426 427 def do_gattc_read_all_chars(self, line): 428 """ 429 Description: Read all Characteristic values from a GATT server across 430 all services. 431 Assumptions: Basic GATT connection made. 432 Usage: 433 Examples: 434 gattc_read_all_chars 435 """ 436 cmd = "Read all characteristics from the GATT service." 437 try: 438 services = self.pri_dut.gattc_lib.listServices( 439 self.unique_mac_addr_id) 440 for service in services['result']: 441 service_id = service['id'] 442 service_uuid = service['uuid_type'] 443 self.pri_dut.gattc_lib.connectToService( 444 self.unique_mac_addr_id, service_id) 445 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 446 print("Reading chars in service uuid: {}".format(service_uuid)) 447 448 for char in chars['result']: 449 char_id = char['id'] 450 char_uuid = char['uuid_type'] 451 try: 452 read_val = \ 453 self.pri_dut.gattc_lib.readCharacteristicById( 454 char_id) 455 print(" Characteristic uuid / Value: {} / {}".format( 456 char_uuid, read_val['result'])) 457 str_value = "" 458 for val in read_val['result']: 459 str_value += chr(val) 460 print(" str val: {}".format(str_value)) 461 except Exception as err: 462 print(err) 463 pass 464 except Exception as err: 465 self.log.error(FAILURE.format(cmd, err)) 466 467 def do_gattc_read_all_desc(self, line): 468 """ 469 Description: Read all Descriptors values from a GATT server across 470 all services. 471 Assumptions: Basic GATT connection made. 472 Usage: 473 Examples: 474 gattc_read_all_chars 475 """ 476 cmd = "Read all descriptors from the GATT service." 
477 try: 478 services = self.pri_dut.gattc_lib.listServices( 479 self.unique_mac_addr_id) 480 for service in services['result']: 481 service_id = service['id'] 482 service_uuid = service['uuid_type'] 483 self.pri_dut.gattc_lib.connectToService( 484 self.unique_mac_addr_id, service_id) 485 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 486 print("Reading descs in service uuid: {}".format(service_uuid)) 487 488 for char in chars['result']: 489 char_id = char['id'] 490 char_uuid = char['uuid_type'] 491 descriptors = char['descriptors'] 492 print(" Reading descs in char uuid: {}".format(char_uuid)) 493 for desc in descriptors: 494 desc_id = desc["id"] 495 desc_uuid = desc["uuid_type"] 496 try: 497 read_val = self.pri_dut.gattc_lib.readDescriptorById( 498 desc_id) 499 print(" Descriptor uuid / Value: {} / {}".format( 500 desc_uuid, read_val['result'])) 501 except Exception as err: 502 pass 503 except Exception as err: 504 self.log.error(FAILURE.format(cmd, err)) 505 506 def do_gattc_write_all_desc(self, line): 507 """ 508 Description: Write a value to all Descriptors on the GATT server. 509 Assumptions: Basic GATT connection made. 510 Input(s): 511 offset: Required. The offset to start writing to. 512 size: Required. The size of bytes to write (value will be generated). 513 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 514 Usage: 515 Examples: 516 gattc_write_all_desc 0 100 517 gattc_write_all_desc 10 2 518 """ 519 cmd = "Read all descriptors from the GATT service." 
520 try: 521 args = line.split() 522 if len(args) != 2: 523 self.log.info("2 Arguments required: [Offset] [Size]") 524 return 525 offset = int(args[0]) 526 size = args[1] 527 write_value = [] 528 for i in range(int(size)): 529 write_value.append(i % 256) 530 services = self.pri_dut.gattc_lib.listServices( 531 self.unique_mac_addr_id) 532 for service in services['result']: 533 service_id = service['id'] 534 service_uuid = service['uuid_type'] 535 self.pri_dut.gattc_lib.connectToService( 536 self.unique_mac_addr_id, service_id) 537 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 538 print("Writing descs in service uuid: {}".format(service_uuid)) 539 540 for char in chars['result']: 541 char_id = char['id'] 542 char_uuid = char['uuid_type'] 543 descriptors = char['descriptors'] 544 print(" Reading descs in char uuid: {}".format(char_uuid)) 545 for desc in descriptors: 546 desc_id = desc["id"] 547 desc_uuid = desc["uuid_type"] 548 try: 549 write_val = self.pri_dut.gattc_lib.writeDescriptorById( 550 desc_id, offset, write_value) 551 print(" Descriptor uuid / Result: {} / {}".format( 552 desc_uuid, write_val['result'])) 553 except Exception as err: 554 pass 555 except Exception as err: 556 self.log.error(FAILURE.format(cmd, err)) 557 558 def do_gattc_read_all_long_desc(self, line): 559 """ 560 Description: Read all long Characteristic Descriptors 561 Assumptions: Basic GATT connection made. 562 Input(s): 563 offset: Required. The offset to start reading from. 564 max_bytes: Required. The max size of bytes to return. 565 Usage: 566 Examples: 567 gattc_read_all_long_desc 0 100 568 gattc_read_all_long_desc 10 20 569 """ 570 cmd = "Read all long descriptors from the GATT service." 
571 try: 572 args = line.split() 573 if len(args) != 2: 574 self.log.info("2 Arguments required: [Offset] [Size]") 575 return 576 offset = int(args[0]) 577 max_bytes = int(args[1]) 578 services = self.pri_dut.ble_lib.bleListServices( 579 self.unique_mac_addr_id) 580 for service in services['result']: 581 service_id = service['id'] 582 service_uuid = service['uuid_type'] 583 self.pri_dut.gattc_lib.connectToService( 584 self.unique_mac_addr_id, service_id) 585 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 586 print("Reading descs in service uuid: {}".format(service_uuid)) 587 588 for char in chars['result']: 589 char_id = char['id'] 590 char_uuid = char['uuid_type'] 591 descriptors = char['descriptors'] 592 print(" Reading descs in char uuid: {}".format(char_uuid)) 593 for desc in descriptors: 594 desc_id = desc["id"] 595 desc_uuid = desc["uuid_type"] 596 try: 597 read_val = self.pri_dut.gattc_lib.readLongDescriptorById( 598 desc_id, offset, max_bytes) 599 print(" Descriptor uuid / Result: {} / {}".format( 600 desc_uuid, read_val['result'])) 601 except Exception as err: 602 pass 603 except Exception as err: 604 self.log.error(FAILURE.format(cmd, err)) 605 606 def do_gattc_read_all_long_char(self, line): 607 """ 608 Description: Read all long Characteristic 609 Assumptions: Basic GATT connection made. 610 Input(s): 611 offset: Required. The offset to start reading from. 612 max_bytes: Required. The max size of bytes to return. 613 Usage: 614 Examples: 615 gattc_read_all_long_char 0 100 616 gattc_read_all_long_char 10 20 617 """ 618 cmd = "Read all long Characteristics from the GATT service." 
619 try: 620 args = line.split() 621 if len(args) != 2: 622 self.log.info("2 Arguments required: [Offset] [Size]") 623 return 624 offset = int(args[0]) 625 max_bytes = int(args[1]) 626 services = self.pri_dut.ble_lib.bleListServices( 627 self.unique_mac_addr_id) 628 for service in services['result']: 629 service_id = service['id'] 630 service_uuid = service['uuid_type'] 631 self.pri_dut.gattc_lib.connectToService( 632 self.unique_mac_addr_id, service_id) 633 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 634 print("Reading chars in service uuid: {}".format(service_uuid)) 635 636 for char in chars['result']: 637 char_id = char['id'] 638 char_uuid = char['uuid_type'] 639 try: 640 read_val = self.pri_dut.gattc_lib.readLongCharacteristicById( 641 char_id, offset, max_bytes) 642 print(" Char uuid / Result: {} / {}".format( 643 char_uuid, read_val['result'])) 644 except Exception as err: 645 pass 646 except Exception as err: 647 self.log.error(FAILURE.format(cmd, err)) 648 649 def do_gattc_write_all_chars(self, line): 650 """ 651 Description: Write all characteristic values from a GATT server across 652 all services. 653 Assumptions: Basic GATT connection made. 654 Input(s): 655 offset: Required. The offset to start writing on. 656 size: The write value size (value will be generated) 657 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 658 Usage: 659 Examples: 660 gattc_write_all_chars 0 10 661 gattc_write_all_chars 10 1 662 """ 663 cmd = "Read all characteristics from the GATT service." 
664 try: 665 args = line.split() 666 if len(args) != 2: 667 self.log.info("2 Arguments required: [Offset] [Size]") 668 return 669 offset = int(args[0]) 670 size = int(args[1]) 671 write_value = [] 672 for i in range(size): 673 write_value.append(i % 256) 674 services = self.pri_dut.gattc_lib.listServices( 675 self.unique_mac_addr_id) 676 for service in services['result']: 677 service_id = service['id'] 678 service_uuid = service['uuid_type'] 679 self.pri_dut.gattc_lib.connectToService( 680 self.unique_mac_addr_id, service_id) 681 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 682 print("Writing chars in service uuid: {}".format(service_uuid)) 683 684 for char in chars['result']: 685 char_id = char['id'] 686 char_uuid = char['uuid_type'] 687 try: 688 write_result = self.pri_dut.gattc_lib.writeCharById( 689 char_id, offset, write_value) 690 print(" Characteristic uuid write result: {} / {}". 691 format(char_uuid, write_result['result'])) 692 except Exception as err: 693 print("error writing char {}".format(err)) 694 pass 695 except Exception as err: 696 self.log.error(FAILURE.format(cmd, err)) 697 698 def do_gattc_write_all_chars_without_response(self, line): 699 """ 700 Description: Write all characteristic values from a GATT server across 701 all services. 702 Assumptions: Basic GATT connection made. 703 Input(s): 704 size: The write value size (value will be generated). 705 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 706 Usage: 707 Examples: 708 gattc_write_all_chars_without_response 100 709 """ 710 cmd = "Read all characteristics from the GATT service." 
711 try: 712 args = line.split() 713 if len(args) != 1: 714 self.log.info("1 Arguments required: [Size]") 715 return 716 size = int(args[0]) 717 write_value = [] 718 for i in range(size): 719 write_value.append(i % 256) 720 services = self.pri_dut.gattc_lib.listServices( 721 self.unique_mac_addr_id) 722 for service in services['result']: 723 service_id = service['id'] 724 service_uuid = service['uuid_type'] 725 self.pri_dut.gattc_lib.connectToService( 726 self.unique_mac_addr_id, service_id) 727 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 728 print("Reading chars in service uuid: {}".format(service_uuid)) 729 730 for char in chars['result']: 731 char_id = char['id'] 732 char_uuid = char['uuid_type'] 733 try: 734 write_result = \ 735 self.pri_dut.gattc_lib.writeCharByIdWithoutResponse( 736 char_id, write_value) 737 print(" Characteristic uuid write result: {} / {}". 738 format(char_uuid, write_result['result'])) 739 except Exception as err: 740 pass 741 except Exception as err: 742 self.log.error(FAILURE.format(cmd, err)) 743 744 def do_gattc_write_char_by_id(self, line): 745 """ 746 Description: Write char by characteristic id reference. 747 Assumptions: Already connected to a GATT server service. 748 Input(s): 749 characteristic_id: The characteristic id reference on the GATT 750 service 751 offset: The offset value to use 752 size: Function will generate random bytes by input size. 753 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 754 Usage: 755 Examples: 756 gattc_write_char_by_id char_id 0 5 757 gattc_write_char_by_id char_id 20 1 758 """ 759 cmd = "Write to GATT server characteristic ." 
760 try: 761 args = line.split() 762 if len(args) != 3: 763 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 764 return 765 id = int(args[0], 16) 766 offset = int(args[1]) 767 size = int(args[2]) 768 write_value = [] 769 for i in range(size): 770 write_value.append(i % 256) 771 self.test_dut.gatt_client_write_characteristic_by_handle( 772 self.unique_mac_addr_id, id, offset, write_value) 773 except Exception as err: 774 self.log.error(FAILURE.format(cmd, err)) 775 776 def do_gattc_write_char_by_id_without_response(self, line): 777 """ 778 Description: Write char by characteristic id reference without response. 779 Assumptions: Already connected to a GATT server service. 780 Input(s): 781 characteristic_id: The characteristic id reference on the GATT 782 service 783 size: Function will generate random bytes by input size. 784 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 785 Usage: 786 Examples: 787 gattc_write_char_by_id_without_response char_id 5 788 """ 789 cmd = "Write characteristic by id without response." 790 try: 791 args = line.split() 792 if len(args) != 2: 793 self.log.info("2 Arguments required: [Id] [Size]") 794 return 795 id = int(args[0], 16) 796 size = args[1] 797 write_value = [] 798 for i in range(int(size)): 799 write_value.append(i % 256) 800 self.test_dut.gatt_client_write_characteristic_without_response_by_handle( 801 self.unique_mac_addr_id, id, write_value) 802 except Exception as err: 803 self.log.error(FAILURE.format(cmd, err)) 804 805 def do_gattc_enable_notify_char_by_id(self, line): 806 """ 807 Description: Enable Characteristic notification on Characteristic ID. 808 Assumptions: Already connected to a GATT server service. 809 Input(s): 810 characteristic_id: The characteristic id reference on the GATT 811 service 812 Usage: 813 Examples: 814 gattc_enable_notify_char_by_id char_id 815 """ 816 cmd = "Enable notifications by Characteristic id." 
817 try: 818 id = int(line, 16) 819 self.test_dut.gatt_client_enable_notifiy_characteristic_by_handle( 820 self.unique_mac_addr_id, id) 821 except Exception as err: 822 self.log.error(FAILURE.format(cmd, err)) 823 824 def do_gattc_disable_notify_char_by_id(self, line): 825 """ 826 Description: Disable Characteristic notification on Characteristic ID. 827 Assumptions: Already connected to a GATT server service. 828 Input(s): 829 characteristic_id: The characteristic id reference on the GATT 830 service 831 Usage: 832 Examples: 833 gattc_disable_notify_char_by_id char_id 834 """ 835 cmd = "Disable notify Characteristic by id." 836 try: 837 id = int(line, 16) 838 self.test_dut.gatt_client_disable_notifiy_characteristic_by_handle( 839 self.unique_mac_addr_id, id) 840 except Exception as err: 841 self.log.error(FAILURE.format(cmd, err)) 842 843 def do_gattc_read_char_by_id(self, line): 844 """ 845 Description: Read Characteristic by ID. 846 Assumptions: Already connected to a GATT server service. 847 Input(s): 848 characteristic_id: The characteristic id reference on the GATT 849 service 850 Usage: 851 Examples: 852 gattc_read_char_by_id char_id 853 """ 854 cmd = "Read Characteristic value by ID." 855 try: 856 id = int(line, 16) 857 read_val = self.test_dut.gatt_client_read_characteristic_by_handle( 858 self.unique_mac_addr_id, id) 859 self.log.info("Characteristic Value with id {}: {}".format( 860 id, read_val)) 861 except Exception as err: 862 self.log.error(FAILURE.format(cmd, err)) 863 864 def do_gattc_write_desc_by_id(self, line): 865 """ 866 Description: Write Descriptor by characteristic id reference. 867 Assumptions: Already connected to a GATT server service. 868 Input(s): 869 descriptor_id: The Descriptor id reference on the GATT service 870 offset: The offset value to use 871 size: Function will generate random bytes by input size. 
872 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 873 Usage: 874 Examples: 875 gattc_write_desc_by_id desc_id 0 5 876 gattc_write_desc_by_id desc_id 20 1 877 """ 878 cmd = "Write Descriptor by id." 879 try: 880 args = line.split() 881 id = int(args[0], 16) 882 offset = int(args[1]) 883 size = args[2] 884 write_value = [] 885 for i in range(int(size)): 886 write_value.append(i % 256) 887 write_result = self.test_dut.gatt_client_write_descriptor_by_handle( 888 self.unique_mac_addr_id, id, offset, write_value) 889 self.log.info("Descriptor Write result {}: {}".format( 890 id, write_result)) 891 except Exception as err: 892 self.log.error(FAILURE.format(cmd, err)) 893 894 def do_gattc_read_desc_by_id(self, line): 895 """ 896 Description: Read Descriptor by ID. 897 Assumptions: Already connected to a GATT server service. 898 Input(s): 899 descriptor_id: The Descriptor id reference on the GATT service 900 Usage: 901 Examples: 902 gattc_read_desc_by_id desc_id 903 """ 904 cmd = "Read Descriptor by ID." 905 try: 906 id = int(line, 16) 907 read_val = self.test_dut.gatt_client_read_descriptor_by_handle( 908 self.unique_mac_addr_id, id) 909 self.log.info("Descriptor Value with id {}: {}".format( 910 id, read_val)) 911 except Exception as err: 912 self.log.error(FAILURE.format(cmd, err)) 913 914 def do_gattc_read_long_char_by_id(self, line): 915 """ 916 Description: Read long Characteristic value by id. 917 Assumptions: Already connected to a GATT server service. 918 Input(s): 919 characteristic_id: The characteristic id reference on the GATT 920 service 921 offset: The offset value to use. 922 max_bytes: The max bytes size to return. 923 Usage: 924 Examples: 925 gattc_read_long_char_by_id char_id 0 10 926 gattc_read_long_char_by_id char_id 20 1 927 """ 928 cmd = "Read long Characteristic value by id." 
929 try: 930 args = line.split() 931 if len(args) != 3: 932 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 933 return 934 id = int(args[0], 16) 935 offset = int(args[1]) 936 max_bytes = int(args[2]) 937 read_val = self.pri_dut.gattc_lib.readLongCharacteristicById( 938 id, offset, max_bytes) 939 self.log.info("Characteristic Value with id {}: {}".format( 940 id, read_val['result'])) 941 942 except Exception as err: 943 self.log.error(FAILURE.format(cmd, err)) 944 945 """End GATT client wrappers""" 946 """Begin LE scan wrappers""" 947 def _update_scan_results(self, scan_results): 948 self.le_ids = [] 949 for scan in scan_results['result']: 950 self.le_ids.append(scan['id']) 951 952 def do_ble_start_scan(self, line): 953 """ 954 Description: Perform a BLE scan. 955 Default filter name: "" 956 Optional input: filter_device_name 957 Usage: 958 Examples: 959 ble_start_scan 960 ble_start_scan eddystone 961 """ 962 cmd = "Perform a BLE scan and list discovered devices." 963 try: 964 scan_filter = {"name_substring": ""} 965 if line: 966 scan_filter = {"name_substring": line} 967 self.pri_dut.gattc_lib.bleStartBleScan(scan_filter) 968 except Exception as err: 969 self.log.error(FAILURE.format(cmd, err)) 970 971 def do_ble_stop_scan(self, line): 972 """ 973 Description: Stops a BLE scan and returns discovered devices. 974 Usage: 975 Examples: 976 ble_stop_scan 977 """ 978 cmd = "Stops a BLE scan and returns discovered devices." 979 try: 980 scan_results = self.pri_dut.gattc_lib.bleStopBleScan() 981 self._update_scan_results(scan_results) 982 self.log.info(pprint.pformat(scan_results)) 983 except Exception as err: 984 self.log.error(FAILURE.format(cmd, err)) 985 986 def do_ble_get_discovered_devices(self, line): 987 """ 988 Description: Get discovered LE devices of an active scan. 989 Usage: 990 Examples: 991 ble_stop_scan 992 """ 993 cmd = "Get discovered LE devices of an active scan." 
def complete_btc_pair(self, text, line, begidx, endidx):
    """ Provides auto-complete for btc_pair cmd.

    See Cmd module for full description.

    Args:
        text: The prefix of the argument currently being completed.
        line: The current input line, starting with the command name.
        begidx: Start index of text within line (unused).
        endidx: End index of text within line (unused).

    Returns:
        A list of the option names valid for the argument position being
        completed that start with text. An empty list is returned when the
        position is not one of the three btc_pair arguments.
    """
    options_by_position = (
        ['ENCRYPTED', 'AUTHENTICATED', 'NONE'],  # pairing security level
        ['BONDABLE', 'NON_BONDABLE', 'NONE'],  # non_bondable
        ['BREDR', 'LE'],  # transport
    )
    # Split on any run of whitespace so repeated spaces between arguments
    # do not shift the argument position.
    tokens = line.split()
    # With a non-empty prefix the last token is the one being completed;
    # otherwise a new token is about to start after the existing ones.
    arg_position = len(tokens) - 1 if text else len(tokens)
    if not 1 <= arg_position <= len(options_by_position):
        # Always hand a list back to Cmd.complete, which indexes the result.
        return []
    return [
        option for option in options_by_position[arg_position - 1]
        if option.startswith(text)
    ]
def do_btc_forget_device(self, line):
    """
    Description: Forget pairing of the current device under test.
        Current device under test is the device found by
        tool_refresh_unique_id from custom user param. This function
        will also perform a clean disconnect if actively connected.

    Usage:
      Examples:
        btc_forget_device
    """
    # Label inserted into the FAILURE log line if the request raises.
    cmd = "Forget pairing of the current device under test."
    try:
        self.log.info("Forgetting device id: {}".format(
            self.unique_mac_addr_id))
        result = self.pri_dut.btc_lib.forgetDevice(self.unique_mac_addr_id)
        self.log.info(result)
    except Exception as err:
        self.log.error(FAILURE.format(cmd, err))
def do_btc_set_name(self, name):
    """
    Description: Change Bluetooth Controller local name.
    Input(s):
        name: The name to set the Bluetooth Controller name to.

    Usage:
      Examples:
        btc_set_name fs_test
    """
    cmd = "Change Bluetooth Controller local name."
    try:
        set_local_name = self.test_dut.set_bluetooth_local_name
        outcome = set_local_name(name)
        self.log.info(outcome)
    except Exception as err:
        self.log.error(FAILURE.format(cmd, err))
def complete_btc_connect_device_by_id(self, text, line, begidx, endidx):
    """ Provides auto-complete for btc_connect_device_by_id cmd.

    See Cmd module for full description.

    Returns a new list holding the entries of self.bt_control_ids that start
    with text, or a copy of every entry when text is empty.
    """
    if text:
        return [
            device_id for device_id in self.bt_control_ids
            if device_id.startswith(text)
        ]
    return list(self.bt_control_ids)
def do_btc_connect_device_by_name(self, device_name):
    """
    Description: Connect to device name based on pre-defined inputs.
        The name is looked up in the devices stored in
        self.bt_control_devices. Supports Tab Autocomplete.
    Input(s):
        device_name: The device name to connect to.

    Usage:
      Examples:
        btc_connect_device_by_name <device_name>
    """
    cmd = "Connect to device name based on pre-defined inputs."
    try:
        matched = False
        for device in self.bt_control_devices:
            # Compare by value: a name typed on the command line is a
            # different string object from the stored one even when equal.
            if device_name == device['name']:
                matched = True
                result = self.pri_dut.btc_lib.connectDevice(device['id'])
                self.log.info(result)
        if not matched:
            # Report the miss instead of returning silently.
            self.log.info(
                "No known device named: {}".format(device_name))
    except Exception as err:
        self.log.error(FAILURE.format(cmd, err))
def do_btc_input_pairing_pin(self, line):
    """
    Description: Sends a pairing pin to SL4F's Bluetooth Control's
        Pairing Delegate.

    Input(s):
        pin: The pin to send; the whole argument string is passed through.

    Usage:
      Examples:
        btc_input_pairing_pin 123456
    """
    cmd = "Input pairing pin to the Fuchsia device."
    try:
        response = self.pri_dut.btc_lib.inputPairingPin(line)
        # Only the 'result' entry of the response is logged.
        pin_result = response['result']
        self.log.info(pin_result)
    except Exception as err:
        self.log.error(FAILURE.format(cmd, err))
1430 1431 Input(s): 1432 num_of_records: The number of records to add. 1433 1434 Usage: 1435 Examples: 1436 sdp_pts_example 1 1437 sdp pts_example 10 1438 """ 1439 cmd = "Setup SDP for PTS testing." 1440 record = { 1441 'service_class_uuids': ["0001"], 1442 'protocol_descriptors': [ 1443 { 1444 'protocol': 1445 int(sig_uuid_constants['AVDTP'], 16), 1446 'params': [ 1447 { 1448 'data': 0x0103 # to indicate 1.3 1449 }, 1450 { 1451 'data': 0x0105 # to indicate 1.5 1452 } 1453 ] 1454 }, 1455 { 1456 'protocol': int(sig_uuid_constants['SDP'], 16), 1457 'params': [{ 1458 'data': int(sig_uuid_constants['AVDTP'], 16), 1459 }] 1460 } 1461 ], 1462 'profile_descriptors': [{ 1463 'profile_id': 1464 int(sig_uuid_constants['AdvancedAudioDistribution'], 16), 1465 'major_version': 1466 1, 1467 'minor_version': 1468 3, 1469 }], 1470 'additional_protocol_descriptors': [{ 1471 'protocol': 1472 int(sig_uuid_constants['L2CAP'], 16), 1473 'params': [ 1474 { 1475 'data': int(sig_uuid_constants['AVDTP'], 16), 1476 }, 1477 { 1478 'data': int(sig_uuid_constants['AVCTP'], 16), 1479 }, 1480 { 1481 'data': int(sig_uuid_constants['GenericAudio'], 16), 1482 }, 1483 ] 1484 }], 1485 'information': [{ 1486 'language': "en", 1487 'name': "A2DP", 1488 'description': "Advanced Audio Distribution Profile", 1489 'provider': "Fuchsia" 1490 }], 1491 'additional_attributes': [ 1492 { 1493 'id': 0x0200, 1494 'element': { 1495 'data': int(sig_uuid_constants['AVDTP'], 16) 1496 } 1497 }, 1498 { 1499 'id': 0x0201, 1500 'element': { 1501 'data': int(sig_uuid_constants['AVDTP'], 16) 1502 } 1503 }, 1504 ] 1505 } 1506 1507 attributes = [ 1508 bt_attribute_values['ATTR_PROTOCOL_DESCRIPTOR_LIST'], 1509 bt_attribute_values['ATTR_SERVICE_CLASS_ID_LIST'], 1510 bt_attribute_values['ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST'], 1511 bt_attribute_values['ATTR_A2DP_SUPPORTED_FEATURES'], 1512 bt_attribute_values['ATTR_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST'], 1513 bt_attribute_values['ATTR_SERVICE_RECORD_HANDLE'], 1514 ] 1515 1516 
def do_sdp_cleanup(self, line):
    """
    Description: Cleanup any existing SDP records

    Usage:
      Examples:
        sdp_cleanup
    """
    cmd = "Cleanup SDP objects."
    try:
        clean_up = self.pri_dut.sdp_lib.cleanUp
        cleanup_result = clean_up()
        self.log.info(cleanup_result)
    except Exception as err:
        self.log.error(FAILURE.format(cmd, err))
def complete_avdtp_init(self, text, line, begidx, endidx):
    """ Provides auto-complete for avdtp_init cmd.

    See Cmd module for full description.

    Returns the roles ("sink", "source") that start with text, or both roles
    when text is empty.
    """
    roles = ["sink", "source"]
    if text:
        return [role for role in roles if role.startswith(text)]
    return roles
def do_avdtp_kill_a2dp_sink(self, line):
    """
    Description: Quickly kill any A2DP sink service currently running on the
        device.

    Usage:
      Examples:
        avdtp_kill_a2dp_sink
    """
    cmd = "Killing A2DP sink"
    try:
        # Ask the device to "stop" the bt-a2dp-sink.cmx daemon.
        control_daemon = self.pri_dut.control_daemon
        stop_result = control_daemon("bt-a2dp-sink.cmx", "stop")
        self.log.info(stop_result)
    except Exception as err:
        self.log.error(FAILURE.format(cmd, err))
def do_avdtp_get_configuration(self, peer_id):
    """
    Description: Send AVDTP command to connected peer: get configuration

    Input(s):
        peer_id: The specified peer_id, parsed as a decimal integer.

    Usage:
      Examples:
        avdtp_get_configuration <peer_id>
    """
    cmd = "Send AVDTP get configuration to connected peer"
    try:
        get_configuration = self.pri_dut.avdtp_lib.getConfiguration
        response = get_configuration(int(peer_id))
        self.log.info(response)
    except Exception as err:
        self.log.error(FAILURE.format(cmd, err))
def do_avdtp_reconfigure_stream(self, peer_id):
    """
    Description: Send AVDTP command to connected peer: reconfigure stream

    Input(s):
        peer_id: The specified peer_id, parsed as a decimal integer.

    Usage:
      Examples:
        avdtp_reconfigure_stream <peer_id>
    """
    cmd = "Send AVDTP reconfigure stream to connected peer"
    try:
        reconfigure_stream = self.pri_dut.avdtp_lib.reconfigureStream
        response = reconfigure_stream(int(peer_id))
        self.log.info(response)
    except Exception as err:
        self.log.error(FAILURE.format(cmd, err))
def do_avdtp_establish_stream(self, peer_id):
    """
    Description: Send AVDTP command to connected peer: establish stream

    Input(s):
        peer_id: The specified peer_id, parsed as a decimal integer.

    Usage:
      Examples:
        avdtp_establish_stream <peer_id>
    """
    cmd = "Send AVDTP establish stream to connected peer"
    try:
        establish_stream = self.pri_dut.avdtp_lib.establishStream
        response = establish_stream(int(peer_id))
        self.log.info(response)
    except Exception as err:
        self.log.error(FAILURE.format(cmd, err))
def do_avdtp_remove_service(self, line):
    """
    Description: Removes the AVDTP service in use.

    Usage:
      Examples:
        avdtp_remove_service
    """
    cmd = "Remove AVDTP service"
    try:
        # The command takes no arguments; line is ignored.
        result = self.pri_dut.avdtp_lib.removeService()
        self.log.info(result)
    except Exception as err:
        self.log.error(FAILURE.format(cmd, err))