1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Python script for wrappers to various libraries.
18
Class CmdInput inherits from the cmd library.
20
21Functions that start with "do_" have a method
22signature that doesn't match the actual command
23line command and that is intended. This is so the
24"help" command knows what to display (in this case
25the documentation of the command itself).
26
27For example:
28Looking at the function "do_tool_set_target_device_name"
29has the inputs self and line which is expected of this type
30of method signature. When the "help" command is done on the
31method name you get the function documentation as such:
32
33(Cmd) help tool_set_target_device_name
34
35        Description: Reset the target device name.
36        Input(s):
37            device_name: Required. The advertising name to connect to.
38        Usage: tool_set_target_device_name new_target_device name
39          Examples:
40            tool_set_target_device_name le_watch
41
42This is all to say this documentation pattern is expected.
43
44"""
45
46from acts.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device
47from acts.test_utils.bt.bt_constants import bt_attribute_values
48from acts.test_utils.bt.bt_constants import sig_uuid_constants
49
50import acts.test_utils.bt.gatt_test_database as gatt_test_database
51
52import cmd
53import pprint
54import time
55"""Various Global Strings"""
# Template for reporting a command result: (command description, result).
# NOTE(review): not referenced in the visible part of this file.
CMD_LOG = "CMD {} result: {}"
# Template used by the command wrappers to log a failure:
# (command description, exception).
FAILURE = "CMD {} threw exception: {}"
# Device name placed in the advertising data by the generic BLE
# advertisement commands.
BASIC_ADV_NAME = "fs_test"
59
60
class CmdInput(cmd.Cmd):
    """Interactive command shell wrapping Bluetooth test-device library calls.

    Every ``do_<name>`` method is exposed by ``cmd.Cmd`` as the shell command
    ``<name>``; the method's docstring is what ``help <name>`` prints.
    """
    # Interval handed to bleStartBleAdvertising by the advertise commands
    # (presumably milliseconds -- TODO confirm against ble_lib).
    ble_advertise_interval = 1000
    # Ids, names and full records of the devices reported during a bt_control
    # discovery.
    # NOTE(review): these are class-level mutable lists, shared by every
    # instance unless rebound on the instance.
    bt_control_ids = []
    bt_control_names = []
    bt_control_devices = []
    # Seconds slept between polls of the scan/discovery results.
    bt_scan_poll_timer = 0.5
    # Text that must appear in a device's name for it to be the target.
    target_device_name = ""
    # Candidate ids offered by tab completion of gattc_connect_by_id.
    le_ids = []
    # Id of the device matched by the last scan; None until one is found.
    unique_mac_addr_id = None
70
71    def setup_vars(self, dut, target_device_name, log):
72        self.pri_dut = dut
73        # Note: test_dut is the start of a slow conversion from a Fuchsia specific
74        # Tool to an abstract_device tool. Only commands that use test_dut will work
75        # Otherwise this tool is primarially targeted at Fuchsia devices.
76        self.test_dut = create_bluetooth_device(self.pri_dut)
77        self.test_dut.initialize_bluetooth_controller()
78        self.target_device_name = target_device_name
79        self.log = log
80
81    def emptyline(self):
82        pass
83
84    def do_EOF(self, line):
85        "End Script"
86        return True
87
88    """ Useful Helper functions and cmd line tooling """
89    def str_to_bool(self, s):
90        if s.lower() == 'true':
91            return True
92        elif s.lower() == 'false':
93            return False
94
95    def _find_unique_id_over_le(self):
96        scan_filter = {"name_substring": self.target_device_name}
97        self.unique_mac_addr_id = None
98        self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
99        tries = 10
100        for i in range(tries):
101            time.sleep(self.bt_scan_poll_timer)
102            scan_res = self.pri_dut.gattc_lib.bleGetDiscoveredDevices(
103            )['result']
104            for device in scan_res:
105                name, did, connectable = device["name"], device["id"], device[
106                    "connectable"]
107                if (self.target_device_name in name):
108                    self.unique_mac_addr_id = did
109                    self.log.info(
110                        "Successfully found device: name, id: {}, {}".format(
111                            name, did))
112                    break
113            if self.unique_mac_addr_id:
114                break
115        self.pri_dut.gattc_lib.bleStopBleScan()
116
117    def _find_unique_id_over_bt_control(self):
118        self.unique_mac_addr_id = None
119        self.bt_control_devices = []
120        self.pri_dut.btc_lib.requestDiscovery(True)
121        tries = 10
122        for i in range(tries):
123            if self.unique_mac_addr_id:
124                break
125            time.sleep(self.bt_scan_poll_timer)
126            device_list = self.pri_dut.btc_lib.getKnownRemoteDevices(
127            )['result']
128            for id_dict in device_list:
129                device = device_list[id_dict]
130                self.bt_control_devices.append(device)
131                name = None
132                if device['name'] is not None:
133                    name = device['name']
134                did, address = device['id'], device['address']
135
136                self.bt_control_ids.append(did)
137                if name is not None:
138                    self.bt_control_names.append(name)
139                    if self.target_device_name in name:
140                        self.unique_mac_addr_id = did
141                        self.log.info(
142                            "Successfully found device: name, id, address: {}, {}, {}"
143                            .format(name, did, address))
144                        break
145        self.pri_dut.btc_lib.requestDiscovery(False)
146
147    def do_tool_take_bt_snoop_log(self, custom_name):
148        """
149        Description: Takes the bt snoop log from the Fuchsia device.
150        Logs will show up in your config files' logpath directory.
151
152        Input(s):
153            custom_name: Optional. Override the default pcap file name.
154
155        Usage: tool_set_target_device_name new_target_device name
156          Examples:
157            tool_take_bt_snoop_log connection_error
158            tool_take_bt_snoop_log
159        """
160        self.pri_dut.take_bt_snoop_log(custom_name)
161
162    def do_tool_refresh_unique_id(self, line):
163        """
164        Description: Refresh command line tool mac unique id.
165        Usage:
166          Examples:
167            tool_refresh_unique_id
168        """
169        try:
170            self._find_unique_id_over_le()
171        except Exception as err:
172            self.log.error(
173                "Failed to scan or find scan result: {}".format(err))
174
175    def do_tool_refresh_unique_id_using_bt_control(self, line):
176        """
177        Description: Refresh command line tool mac unique id.
178        Usage:
179          Examples:
180            tool_refresh_unique_id_using_bt_control
181        """
182        try:
183            self._find_unique_id_over_bt_control()
184        except Exception as err:
185            self.log.error(
186                "Failed to scan or find scan result: {}".format(err))
187
188    def do_tool_set_target_device_name(self, line):
189        """
190        Description: Reset the target device name.
191        Input(s):
192            device_name: Required. The advertising name to connect to.
193        Usage: tool_set_target_device_name new_target_device name
194          Examples:
195            tool_set_target_device_name le_watch
196        """
197        self.log.info("Setting target_device_name to: {}".format(line))
198        self.target_device_name = line
199
200    """Begin BLE advertise wrappers"""
201    def do_ble_start_generic_connectable_advertisement(self, line):
202        """
203        Description: Start a connectable LE advertisement
204        Usage: ble_start_generic_connectable_advertisement
205        """
206        cmd = "Start a connectable LE advertisement"
207        try:
208            adv_data = {"name": BASIC_ADV_NAME}
209            self.pri_dut.ble_lib.bleStartBleAdvertising(
210                adv_data, self.ble_advertise_interval)
211        except Exception as err:
212            self.log.error(FAILURE.format(cmd, err))
213
214    def do_ble_start_generic_nonconnectable_advertisement(self, line):
215        """
216        Description: Start a non-connectable LE advertisement
217        Usage: ble_start_generic_nonconnectable_advertisement
218        """
219        cmd = "Start a nonconnectable LE advertisement"
220        try:
221            adv_data = {"name": BASIC_ADV_NAME}
222            self.pri_dut.ble_lib.bleStartBleAdvertising(
223                adv_data, self.ble_advertise_interval, False)
224        except Exception as err:
225            self.log.error(FAILURE.format(cmd, err))
226
227    def do_ble_stop_advertisement(self, line):
228        """
229        Description: Stop a BLE advertisement.
230        Usage: ble_stop_advertisement
231        """
232        cmd = "Stop a connectable LE advertisement"
233        try:
234            self.pri_dut.ble_lib.bleStopBleAdvertising()
235        except Exception as err:
236            self.log.error(FAILURE.format(cmd, err))
237
238    """End BLE advertise wrappers"""
239    """Begin GATT client wrappers"""
240    def complete_gattc_connect_by_id(self, text, line, begidx, endidx):
241        if not text:
242            completions = list(self.le_ids)[:]
243        else:
244            completions = [s for s in self.le_ids if s.startswith(text)]
245        return completions
246
247    def do_gattc_connect_by_id(self, line):
248        """
249        Description: Connect to a LE peripheral.
250        Input(s):
251            device_id: Required. The unique device ID from Fuchsia
252                discovered devices.
253        Usage:
254          Examples:
255            gattc_connect device_id
256        """
257        cmd = "Connect to a LE peripheral by input ID."
258        try:
259
260            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
261                line)
262            self.log.info("Connection status: {}".format(
263                pprint.pformat(connection_status)))
264        except Exception as err:
265            self.log.error(FAILURE.format(cmd, err))
266
267    def do_gattc_connect(self, line):
268        """
269        Description: Connect to a LE peripheral.
270        Optional input: device_name
271        Input(s):
272            device_name: Optional. The peripheral ID to connect to.
273        Usage:
274          Examples:
275            gattc_connect
276            gattc_connect eddystone_123
277        """
278        cmd = "Connect to a LE peripheral."
279        try:
280            if len(line) > 0:
281                self.target_device_name = line
282                self.unique_mac_addr_id = None
283            if not self.unique_mac_addr_id:
284                try:
285                    self._find_unique_id()
286                except Exception as err:
287                    self.log.info("Failed to scan or find device.")
288                    return
289            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
290                self.unique_mac_addr_id)
291            self.log.info("Connection status: {}".format(
292                pprint.pformat(connection_status)))
293        except Exception as err:
294            self.log.error(FAILURE.format(cmd, err))
295
296    def do_gattc_connect_disconnect_iterations(self, line):
297        """
298        Description: Connect then disconnect to a LE peripheral multiple times.
299        Input(s):
300            iterations: Required. The number of iterations to run.
301        Usage:
302          Examples:
303            gattc_connect_disconnect_iterations 10
304        """
305        cmd = "Connect to a LE peripheral."
306        try:
307            if not self.unique_mac_addr_id:
308                try:
309                    self._find_unique_id()
310                except Exception as err:
311                    self.log.info("Failed to scan or find device.")
312                    return
313            for i in range(int(line)):
314                self.log.info("Running iteration {}".format(i + 1))
315                connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
316                    self.unique_mac_addr_id)
317                self.log.info("Connection status: {}".format(
318                    pprint.pformat(connection_status)))
319                time.sleep(4)
320                disc_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
321                    self.unique_mac_addr_id)
322                self.log.info("Disconnect status: {}".format(disc_status))
323                time.sleep(3)
324        except Exception as err:
325            self.log.error(FAILURE.format(cmd, err))
326
327    def do_gattc_disconnect(self, line):
328        """
329        Description: Disconnect from LE peripheral.
330        Assumptions: Already connected to a peripheral.
331        Usage:
332          Examples:
333            gattc_disconnect
334        """
335        cmd = "Disconenct from LE peripheral."
336        try:
337            disconnect_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
338                self.unique_mac_addr_id)
339            self.log.info("Disconnect status: {}".format(disconnect_status))
340        except Exception as err:
341            self.log.error(FAILURE.format(cmd, err))
342
343    def do_gattc_list_services(self, line):
344        """
345        Description: List services from LE peripheral.
346        Assumptions: Already connected to a peripheral.
347        Usage:
348          Examples:
349            gattc_list_services
350        """
351        cmd = "List services from LE peripheral."
352        try:
353            services = self.pri_dut.gattc_lib.listServices(
354                self.unique_mac_addr_id)
355            self.log.info("Discovered Services: \n{}".format(
356                pprint.pformat(services)))
357        except Exception as err:
358            self.log.error(FAILURE.format(cmd, err))
359
360    def do_gattc_connect_to_service(self, line):
361        """
362        Description: Connect to Peripheral GATT server service.
363        Assumptions: Already connected to peripheral.
364        Input(s):
365            service_id: Required. The service id reference on the GATT server.
366        Usage:
367          Examples:
368            gattc_connect_to_service service_id
369        """
370        cmd = "GATT client connect to GATT server service."
371        try:
372            self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id,
373                                                    int(line))
374        except Exception as err:
375            self.log.error(FAILURE.format(cmd, err))
376
377    def do_gattc_discover_characteristics(self, line):
378        """
379        Description: Discover characteristics from a connected service.
380        Assumptions: Already connected to a GATT server service.
381        Usage:
382          Examples:
383            gattc_discover_characteristics
384        """
385        cmd = "Discover and list characteristics from a GATT server."
386        try:
387            chars = self.pri_dut.gattc_lib.discoverCharacteristics()
388            self.log.info("Discovered chars:\n{}".format(
389                pprint.pformat(chars)))
390        except Exception as err:
391            self.log.error(FAILURE.format(cmd, err))
392
393    def do_gattc_notify_all_chars(self, line):
394        """
395        Description: Enable all notifications on all Characteristics on
396            a GATT server.
397        Assumptions: Basic GATT connection made.
398        Usage:
399          Examples:
400            gattc_notify_all_chars
401        """
402        cmd = "Read all characteristics from the GATT service."
403        try:
404            services = self.pri_dut.gattc_lib.listServices(
405                self.unique_mac_addr_id)
406            for service in services['result']:
407                service_id = service['id']
408                service_uuid = service['uuid_type']
409                self.pri_dut.gattc_lib.connectToService(
410                    self.unique_mac_addr_id, service_id)
411                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
412                print("Reading chars in service uuid: {}".format(service_uuid))
413
414                for char in chars['result']:
415                    char_id = char['id']
416                    char_uuid = char['uuid_type']
417                    # quick char filter for apple-4 test... remove later
418                    print("found uuid {}".format(char_uuid))
419                    try:
420                        self.pri_dut.gattc_lib.enableNotifyCharacteristic(
421                            char_id)
422                    except Exception as err:
423                        print("error enabling notification")
424        except Exception as err:
425            self.log.error(FAILURE.format(cmd, err))
426
427    def do_gattc_read_all_chars(self, line):
428        """
429        Description: Read all Characteristic values from a GATT server across
430            all services.
431        Assumptions: Basic GATT connection made.
432        Usage:
433          Examples:
434            gattc_read_all_chars
435        """
436        cmd = "Read all characteristics from the GATT service."
437        try:
438            services = self.pri_dut.gattc_lib.listServices(
439                self.unique_mac_addr_id)
440            for service in services['result']:
441                service_id = service['id']
442                service_uuid = service['uuid_type']
443                self.pri_dut.gattc_lib.connectToService(
444                    self.unique_mac_addr_id, service_id)
445                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
446                print("Reading chars in service uuid: {}".format(service_uuid))
447
448                for char in chars['result']:
449                    char_id = char['id']
450                    char_uuid = char['uuid_type']
451                    try:
452                        read_val =  \
453                            self.pri_dut.gattc_lib.readCharacteristicById(
454                                char_id)
455                        print("  Characteristic uuid / Value: {} / {}".format(
456                            char_uuid, read_val['result']))
457                        str_value = ""
458                        for val in read_val['result']:
459                            str_value += chr(val)
460                        print("    str val: {}".format(str_value))
461                    except Exception as err:
462                        print(err)
463                        pass
464        except Exception as err:
465            self.log.error(FAILURE.format(cmd, err))
466
467    def do_gattc_read_all_desc(self, line):
468        """
469        Description: Read all Descriptors values from a GATT server across
470            all services.
471        Assumptions: Basic GATT connection made.
472        Usage:
473          Examples:
474            gattc_read_all_chars
475        """
476        cmd = "Read all descriptors from the GATT service."
477        try:
478            services = self.pri_dut.gattc_lib.listServices(
479                self.unique_mac_addr_id)
480            for service in services['result']:
481                service_id = service['id']
482                service_uuid = service['uuid_type']
483                self.pri_dut.gattc_lib.connectToService(
484                    self.unique_mac_addr_id, service_id)
485                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
486                print("Reading descs in service uuid: {}".format(service_uuid))
487
488                for char in chars['result']:
489                    char_id = char['id']
490                    char_uuid = char['uuid_type']
491                    descriptors = char['descriptors']
492                    print("  Reading descs in char uuid: {}".format(char_uuid))
493                    for desc in descriptors:
494                        desc_id = desc["id"]
495                        desc_uuid = desc["uuid_type"]
496                    try:
497                        read_val = self.pri_dut.gattc_lib.readDescriptorById(
498                            desc_id)
499                        print("    Descriptor uuid / Value: {} / {}".format(
500                            desc_uuid, read_val['result']))
501                    except Exception as err:
502                        pass
503        except Exception as err:
504            self.log.error(FAILURE.format(cmd, err))
505
506    def do_gattc_write_all_desc(self, line):
507        """
508        Description: Write a value to all Descriptors on the GATT server.
509        Assumptions: Basic GATT connection made.
510        Input(s):
511            offset: Required. The offset to start writing to.
512            size: Required. The size of bytes to write (value will be generated).
513                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
514        Usage:
515          Examples:
516            gattc_write_all_desc 0 100
517            gattc_write_all_desc 10 2
518        """
519        cmd = "Read all descriptors from the GATT service."
520        try:
521            args = line.split()
522            if len(args) != 2:
523                self.log.info("2 Arguments required: [Offset] [Size]")
524                return
525            offset = int(args[0])
526            size = args[1]
527            write_value = []
528            for i in range(int(size)):
529                write_value.append(i % 256)
530            services = self.pri_dut.gattc_lib.listServices(
531                self.unique_mac_addr_id)
532            for service in services['result']:
533                service_id = service['id']
534                service_uuid = service['uuid_type']
535                self.pri_dut.gattc_lib.connectToService(
536                    self.unique_mac_addr_id, service_id)
537                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
538                print("Writing descs in service uuid: {}".format(service_uuid))
539
540                for char in chars['result']:
541                    char_id = char['id']
542                    char_uuid = char['uuid_type']
543                    descriptors = char['descriptors']
544                    print("  Reading descs in char uuid: {}".format(char_uuid))
545                    for desc in descriptors:
546                        desc_id = desc["id"]
547                        desc_uuid = desc["uuid_type"]
548                    try:
549                        write_val = self.pri_dut.gattc_lib.writeDescriptorById(
550                            desc_id, offset, write_value)
551                        print("    Descriptor uuid / Result: {} / {}".format(
552                            desc_uuid, write_val['result']))
553                    except Exception as err:
554                        pass
555        except Exception as err:
556            self.log.error(FAILURE.format(cmd, err))
557
558    def do_gattc_read_all_long_desc(self, line):
559        """
560        Description: Read all long Characteristic Descriptors
561        Assumptions: Basic GATT connection made.
562        Input(s):
563            offset: Required. The offset to start reading from.
564            max_bytes: Required. The max size of bytes to return.
565        Usage:
566          Examples:
567            gattc_read_all_long_desc 0 100
568            gattc_read_all_long_desc 10 20
569        """
570        cmd = "Read all long descriptors from the GATT service."
571        try:
572            args = line.split()
573            if len(args) != 2:
574                self.log.info("2 Arguments required: [Offset] [Size]")
575                return
576            offset = int(args[0])
577            max_bytes = int(args[1])
578            services = self.pri_dut.ble_lib.bleListServices(
579                self.unique_mac_addr_id)
580            for service in services['result']:
581                service_id = service['id']
582                service_uuid = service['uuid_type']
583                self.pri_dut.gattc_lib.connectToService(
584                    self.unique_mac_addr_id, service_id)
585                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
586                print("Reading descs in service uuid: {}".format(service_uuid))
587
588                for char in chars['result']:
589                    char_id = char['id']
590                    char_uuid = char['uuid_type']
591                    descriptors = char['descriptors']
592                    print("  Reading descs in char uuid: {}".format(char_uuid))
593                    for desc in descriptors:
594                        desc_id = desc["id"]
595                        desc_uuid = desc["uuid_type"]
596                    try:
597                        read_val = self.pri_dut.gattc_lib.readLongDescriptorById(
598                            desc_id, offset, max_bytes)
599                        print("    Descriptor uuid / Result: {} / {}".format(
600                            desc_uuid, read_val['result']))
601                    except Exception as err:
602                        pass
603        except Exception as err:
604            self.log.error(FAILURE.format(cmd, err))
605
606    def do_gattc_read_all_long_char(self, line):
607        """
608        Description: Read all long Characteristic
609        Assumptions: Basic GATT connection made.
610        Input(s):
611            offset: Required. The offset to start reading from.
612            max_bytes: Required. The max size of bytes to return.
613        Usage:
614          Examples:
615            gattc_read_all_long_char 0 100
616            gattc_read_all_long_char 10 20
617        """
618        cmd = "Read all long Characteristics from the GATT service."
619        try:
620            args = line.split()
621            if len(args) != 2:
622                self.log.info("2 Arguments required: [Offset] [Size]")
623                return
624            offset = int(args[0])
625            max_bytes = int(args[1])
626            services = self.pri_dut.ble_lib.bleListServices(
627                self.unique_mac_addr_id)
628            for service in services['result']:
629                service_id = service['id']
630                service_uuid = service['uuid_type']
631                self.pri_dut.gattc_lib.connectToService(
632                    self.unique_mac_addr_id, service_id)
633                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
634                print("Reading chars in service uuid: {}".format(service_uuid))
635
636                for char in chars['result']:
637                    char_id = char['id']
638                    char_uuid = char['uuid_type']
639                    try:
640                        read_val = self.pri_dut.gattc_lib.readLongCharacteristicById(
641                            char_id, offset, max_bytes)
642                        print("    Char uuid / Result: {} / {}".format(
643                            char_uuid, read_val['result']))
644                    except Exception as err:
645                        pass
646        except Exception as err:
647            self.log.error(FAILURE.format(cmd, err))
648
649    def do_gattc_write_all_chars(self, line):
650        """
651        Description: Write all characteristic values from a GATT server across
652            all services.
653        Assumptions: Basic GATT connection made.
654        Input(s):
655            offset: Required. The offset to start writing on.
656            size: The write value size (value will be generated)
657                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
658        Usage:
659          Examples:
660            gattc_write_all_chars 0 10
661            gattc_write_all_chars 10 1
662        """
663        cmd = "Read all characteristics from the GATT service."
664        try:
665            args = line.split()
666            if len(args) != 2:
667                self.log.info("2 Arguments required: [Offset] [Size]")
668                return
669            offset = int(args[0])
670            size = int(args[1])
671            write_value = []
672            for i in range(size):
673                write_value.append(i % 256)
674            services = self.pri_dut.gattc_lib.listServices(
675                self.unique_mac_addr_id)
676            for service in services['result']:
677                service_id = service['id']
678                service_uuid = service['uuid_type']
679                self.pri_dut.gattc_lib.connectToService(
680                    self.unique_mac_addr_id, service_id)
681                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
682                print("Writing chars in service uuid: {}".format(service_uuid))
683
684                for char in chars['result']:
685                    char_id = char['id']
686                    char_uuid = char['uuid_type']
687                    try:
688                        write_result = self.pri_dut.gattc_lib.writeCharById(
689                            char_id, offset, write_value)
690                        print("  Characteristic uuid write result: {} / {}".
691                              format(char_uuid, write_result['result']))
692                    except Exception as err:
693                        print("error writing char {}".format(err))
694                        pass
695        except Exception as err:
696            self.log.error(FAILURE.format(cmd, err))
697
698    def do_gattc_write_all_chars_without_response(self, line):
699        """
700        Description: Write all characteristic values from a GATT server across
701            all services.
702        Assumptions: Basic GATT connection made.
703        Input(s):
704            size: The write value size (value will be generated).
705                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
706        Usage:
707          Examples:
708            gattc_write_all_chars_without_response 100
709        """
710        cmd = "Read all characteristics from the GATT service."
711        try:
712            args = line.split()
713            if len(args) != 1:
714                self.log.info("1 Arguments required: [Size]")
715                return
716            size = int(args[0])
717            write_value = []
718            for i in range(size):
719                write_value.append(i % 256)
720            services = self.pri_dut.gattc_lib.listServices(
721                self.unique_mac_addr_id)
722            for service in services['result']:
723                service_id = service['id']
724                service_uuid = service['uuid_type']
725                self.pri_dut.gattc_lib.connectToService(
726                    self.unique_mac_addr_id, service_id)
727                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
728                print("Reading chars in service uuid: {}".format(service_uuid))
729
730                for char in chars['result']:
731                    char_id = char['id']
732                    char_uuid = char['uuid_type']
733                    try:
734                        write_result = \
735                            self.pri_dut.gattc_lib.writeCharByIdWithoutResponse(
736                                char_id, write_value)
737                        print("  Characteristic uuid write result: {} / {}".
738                              format(char_uuid, write_result['result']))
739                    except Exception as err:
740                        pass
741        except Exception as err:
742            self.log.error(FAILURE.format(cmd, err))
743
744    def do_gattc_write_char_by_id(self, line):
745        """
746        Description: Write char by characteristic id reference.
747        Assumptions: Already connected to a GATT server service.
748        Input(s):
749            characteristic_id: The characteristic id reference on the GATT
750                service
751            offset: The offset value to use
752            size: Function will generate random bytes by input size.
753                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
754        Usage:
755          Examples:
756            gattc_write_char_by_id char_id 0 5
757            gattc_write_char_by_id char_id 20 1
758        """
759        cmd = "Write to GATT server characteristic ."
760        try:
761            args = line.split()
762            if len(args) != 3:
763                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
764                return
765            id = int(args[0], 16)
766            offset = int(args[1])
767            size = int(args[2])
768            write_value = []
769            for i in range(size):
770                write_value.append(i % 256)
771            self.test_dut.gatt_client_write_characteristic_by_handle(
772                self.unique_mac_addr_id, id, offset, write_value)
773        except Exception as err:
774            self.log.error(FAILURE.format(cmd, err))
775
776    def do_gattc_write_char_by_id_without_response(self, line):
777        """
778        Description: Write char by characteristic id reference without response.
779        Assumptions: Already connected to a GATT server service.
780        Input(s):
781            characteristic_id: The characteristic id reference on the GATT
782                service
783            size: Function will generate random bytes by input size.
784                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
785        Usage:
786          Examples:
787            gattc_write_char_by_id_without_response char_id 5
788        """
789        cmd = "Write characteristic by id without response."
790        try:
791            args = line.split()
792            if len(args) != 2:
793                self.log.info("2 Arguments required: [Id] [Size]")
794                return
795            id = int(args[0], 16)
796            size = args[1]
797            write_value = []
798            for i in range(int(size)):
799                write_value.append(i % 256)
800            self.test_dut.gatt_client_write_characteristic_without_response_by_handle(
801                self.unique_mac_addr_id, id, write_value)
802        except Exception as err:
803            self.log.error(FAILURE.format(cmd, err))
804
805    def do_gattc_enable_notify_char_by_id(self, line):
806        """
807        Description: Enable Characteristic notification on Characteristic ID.
808        Assumptions: Already connected to a GATT server service.
809        Input(s):
810            characteristic_id: The characteristic id reference on the GATT
811                service
812        Usage:
813          Examples:
814            gattc_enable_notify_char_by_id char_id
815        """
816        cmd = "Enable notifications by Characteristic id."
817        try:
818            id = int(line, 16)
819            self.test_dut.gatt_client_enable_notifiy_characteristic_by_handle(
820                self.unique_mac_addr_id, id)
821        except Exception as err:
822            self.log.error(FAILURE.format(cmd, err))
823
824    def do_gattc_disable_notify_char_by_id(self, line):
825        """
826        Description: Disable Characteristic notification on Characteristic ID.
827        Assumptions: Already connected to a GATT server service.
828        Input(s):
829            characteristic_id: The characteristic id reference on the GATT
830                service
831        Usage:
832          Examples:
833            gattc_disable_notify_char_by_id char_id
834        """
835        cmd = "Disable notify Characteristic by id."
836        try:
837            id = int(line, 16)
838            self.test_dut.gatt_client_disable_notifiy_characteristic_by_handle(
839                self.unique_mac_addr_id, id)
840        except Exception as err:
841            self.log.error(FAILURE.format(cmd, err))
842
843    def do_gattc_read_char_by_id(self, line):
844        """
845        Description: Read Characteristic by ID.
846        Assumptions: Already connected to a GATT server service.
847        Input(s):
848            characteristic_id: The characteristic id reference on the GATT
849                service
850        Usage:
851          Examples:
852            gattc_read_char_by_id char_id
853        """
854        cmd = "Read Characteristic value by ID."
855        try:
856            id = int(line, 16)
857            read_val = self.test_dut.gatt_client_read_characteristic_by_handle(
858                self.unique_mac_addr_id, id)
859            self.log.info("Characteristic Value with id {}: {}".format(
860                id, read_val))
861        except Exception as err:
862            self.log.error(FAILURE.format(cmd, err))
863
864    def do_gattc_write_desc_by_id(self, line):
865        """
866        Description: Write Descriptor by characteristic id reference.
867        Assumptions: Already connected to a GATT server service.
868        Input(s):
869            descriptor_id: The Descriptor id reference on the GATT service
870            offset: The offset value to use
871            size: Function will generate random bytes by input size.
872                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
873        Usage:
874          Examples:
875            gattc_write_desc_by_id desc_id 0 5
876            gattc_write_desc_by_id desc_id 20 1
877        """
878        cmd = "Write Descriptor by id."
879        try:
880            args = line.split()
881            id = int(args[0], 16)
882            offset = int(args[1])
883            size = args[2]
884            write_value = []
885            for i in range(int(size)):
886                write_value.append(i % 256)
887            write_result = self.test_dut.gatt_client_write_descriptor_by_handle(
888                self.unique_mac_addr_id, id, offset, write_value)
889            self.log.info("Descriptor Write result {}: {}".format(
890                id, write_result))
891        except Exception as err:
892            self.log.error(FAILURE.format(cmd, err))
893
894    def do_gattc_read_desc_by_id(self, line):
895        """
896        Description: Read Descriptor by ID.
897        Assumptions: Already connected to a GATT server service.
898        Input(s):
899            descriptor_id: The Descriptor id reference on the GATT service
900        Usage:
901          Examples:
902            gattc_read_desc_by_id desc_id
903        """
904        cmd = "Read Descriptor by ID."
905        try:
906            id = int(line, 16)
907            read_val = self.test_dut.gatt_client_read_descriptor_by_handle(
908                self.unique_mac_addr_id, id)
909            self.log.info("Descriptor Value with id {}: {}".format(
910                id, read_val))
911        except Exception as err:
912            self.log.error(FAILURE.format(cmd, err))
913
914    def do_gattc_read_long_char_by_id(self, line):
915        """
916        Description: Read long Characteristic value by id.
917        Assumptions: Already connected to a GATT server service.
918        Input(s):
919            characteristic_id: The characteristic id reference on the GATT
920                service
921            offset: The offset value to use.
922            max_bytes: The max bytes size to return.
923        Usage:
924          Examples:
925            gattc_read_long_char_by_id char_id 0 10
926            gattc_read_long_char_by_id char_id 20 1
927        """
928        cmd = "Read long Characteristic value by id."
929        try:
930            args = line.split()
931            if len(args) != 3:
932                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
933                return
934            id = int(args[0], 16)
935            offset = int(args[1])
936            max_bytes = int(args[2])
937            read_val = self.pri_dut.gattc_lib.readLongCharacteristicById(
938                id, offset, max_bytes)
939            self.log.info("Characteristic Value with id {}: {}".format(
940                id, read_val['result']))
941
942        except Exception as err:
943            self.log.error(FAILURE.format(cmd, err))
944
945    """End GATT client wrappers"""
946    """Begin LE scan wrappers"""
947    def _update_scan_results(self, scan_results):
948        self.le_ids = []
949        for scan in scan_results['result']:
950            self.le_ids.append(scan['id'])
951
952    def do_ble_start_scan(self, line):
953        """
954        Description: Perform a BLE scan.
955        Default filter name: ""
956        Optional input: filter_device_name
957        Usage:
958          Examples:
959            ble_start_scan
960            ble_start_scan eddystone
961        """
962        cmd = "Perform a BLE scan and list discovered devices."
963        try:
964            scan_filter = {"name_substring": ""}
965            if line:
966                scan_filter = {"name_substring": line}
967            self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
968        except Exception as err:
969            self.log.error(FAILURE.format(cmd, err))
970
971    def do_ble_stop_scan(self, line):
972        """
973        Description: Stops a BLE scan and returns discovered devices.
974        Usage:
975          Examples:
976            ble_stop_scan
977        """
978        cmd = "Stops a BLE scan and returns discovered devices."
979        try:
980            scan_results = self.pri_dut.gattc_lib.bleStopBleScan()
981            self._update_scan_results(scan_results)
982            self.log.info(pprint.pformat(scan_results))
983        except Exception as err:
984            self.log.error(FAILURE.format(cmd, err))
985
986    def do_ble_get_discovered_devices(self, line):
987        """
988        Description: Get discovered LE devices of an active scan.
989        Usage:
990          Examples:
991            ble_stop_scan
992        """
993        cmd = "Get discovered LE devices of an active scan."
994        try:
995            scan_results = self.pri_dut.gattc_lib.bleGetDiscoveredDevices()
996            self._update_scan_results(scan_results)
997            self.log.info(pprint.pformat(scan_results))
998        except Exception as err:
999            self.log.error(FAILURE.format(cmd, err))
1000
1001    """End LE scan wrappers"""
1002    """Begin GATT Server wrappers"""
1003    def do_gatts_close(self, line):
1004        """
1005        Description: Close active GATT server.
1006
1007        Usage:
1008          Examples:
1009            gatts_close
1010        """
1011        cmd = "Close active GATT server."
1012        try:
1013            result = self.pri_dut.gatts_lib.closeServer()
1014            self.log.info(result)
1015        except Exception as err:
1016            self.log.error(FAILURE.format(cmd, err))
1017
1018    def complete_gatts_setup_database(self, text, line, begidx, endidx):
1019        if not text:
1020            completions = list(
1021                gatt_test_database.GATT_SERVER_DB_MAPPING.keys())
1022        else:
1023            completions = [
1024                s for s in gatt_test_database.GATT_SERVER_DB_MAPPING.keys()
1025                if s.startswith(text)
1026            ]
1027        return completions
1028
1029    def do_gatts_setup_database(self, line):
1030        """
1031        Description: Setup a Gatt server database based on pre-defined inputs.
1032            Supports Tab Autocomplete.
1033        Input(s):
1034            descriptor_db_name: The descriptor db name that matches one in
1035                acts.test_utils.bt.gatt_test_database
1036        Usage:
1037          Examples:
1038            gatts_setup_database LARGE_DB_1
1039        """
1040        cmd = "Setup GATT Server Database Based of pre-defined dictionaries"
1041        try:
1042            scan_results = self.pri_dut.gatts_lib.publishServer(
1043                gatt_test_database.GATT_SERVER_DB_MAPPING.get(line))
1044            self.log.info(scan_results)
1045        except Exception as err:
1046            self.log.error(FAILURE.format(cmd, err))
1047
1048    """End GATT Server wrappers"""
1049    """Begin Bluetooth Controller wrappers"""
1050    def complete_btc_pair(self, text, line, begidx, endidx):
1051        """ Provides auto-complete for btc_pair cmd.
1052
1053        See Cmd module for full description.
1054        """
1055        arg_completion = len(line.split(" ")) - 1
1056        pairing_security_level_options = ['ENCRYPTED', 'AUTHENTICATED', 'NONE']
1057        non_bondable_options = ['BONDABLE', 'NON_BONDABLE', 'NONE']
1058        transport_options = ['BREDR', 'LE']
1059        if arg_completion == 1:
1060            if not text:
1061                completions = pairing_security_level_options
1062            else:
1063                completions = [
1064                    s for s in pairing_security_level_options
1065                    if s.startswith(text)
1066                ]
1067            return completions
1068        if arg_completion == 2:
1069            if not text:
1070                completions = non_bondable_options
1071            else:
1072                completions = [
1073                    s for s in non_bondable_options if s.startswith(text)
1074                ]
1075            return completions
1076        if arg_completion == 3:
1077            if not text:
1078                completions = transport_options
1079            else:
1080                completions = [
1081                    s for s in transport_options if s.startswith(text)
1082                ]
1083            return completions
1084
1085    def do_btc_pair(self, line):
1086        """
1087        Description: Sends an outgoing pairing request.
1088
1089        Input(s):
1090            pairing security level: ENCRYPTED, AUTHENTICATED, or NONE
1091            non_bondable: BONDABLE, NON_BONDABLE, or NONE
1092            transport: BREDR or LE
1093
1094        Usage:
1095          Examples:
1096            btc_pair NONE NONE BREDR
1097            btc_pair ENCRYPTED NONE LE
1098            btc_pair AUTHENTICATED NONE LE
1099            btc_pair NONE NON_BONDABLE BREDR
1100        """
1101        cmd = "Send an outgoing pairing request."
1102        pairing_security_level_mapping = {
1103            "ENCRYPTED": 1,
1104            "AUTHENTICATED": 2,
1105            "NONE": None,
1106        }
1107
1108        non_bondable_mapping = {
1109            "BONDABLE": False,  # Note: Reversed on purpose
1110            "NON_BONDABLE": True,  # Note: Reversed on purpose
1111            "NONE": None,
1112        }
1113
1114        transport_mapping = {
1115            "BREDR": 1,
1116            "LE": 2,
1117        }
1118
1119        try:
1120            options = line.split(" ")
1121            result = self.test_dut.init_pair(
1122                self.unique_mac_addr_id,
1123                pairing_security_level_mapping.get(options[0]),
1124                non_bondable_mapping.get(options[1]),
1125                transport_mapping.get(options[2]),
1126            )
1127            self.log.info(result)
1128        except Exception as err:
1129            self.log.error(FAILURE.format(cmd, err))
1130
1131    def do_btc_accept_pairing(self, line):
1132        """
1133        Description: Accept all incoming pairing requests.
1134
1135        Usage:
1136          Examples:
1137            btc_accept_pairing
1138        """
1139        cmd = "Accept incoming pairing requests"
1140        try:
1141            result = self.pri_dut.btc_lib.acceptPairing()
1142            self.log.info(result)
1143        except Exception as err:
1144            self.log.error(FAILURE.format(cmd, err))
1145
1146    def do_btc_forget_device(self, line):
1147        """
1148        Description: Forget pairing of the current device under test.
1149            Current device under test is the device found by
1150            tool_refresh_unique_id from custom user param. This function
1151            will also perform a clean disconnect if actively connected.
1152
1153        Usage:
1154          Examples:
1155            btc_forget_device
1156        """
1157        cmd = "For pairing of the current device under test."
1158        try:
1159            self.log.info("Forgetting device id: {}".format(
1160                self.unique_mac_addr_id))
1161            result = self.pri_dut.btc_lib.forgetDevice(self.unique_mac_addr_id)
1162            self.log.info(result)
1163        except Exception as err:
1164            self.log.error(FAILURE.format(cmd, err))
1165
1166    def do_btc_set_discoverable(self, discoverable):
1167        """
1168        Description: Change Bluetooth Controller discoverablility.
1169        Input(s):
1170            discoverable: true to set discoverable
1171                          false to set non-discoverable
1172        Usage:
1173          Examples:
1174            btc_set_discoverable true
1175            btc_set_discoverable false
1176        """
1177        cmd = "Change Bluetooth Controller discoverablility."
1178        try:
1179            result = self.test_dut.set_discoverable(
1180                self.str_to_bool(discoverable))
1181            self.log.info(result)
1182        except Exception as err:
1183            self.log.error(FAILURE.format(cmd, err))
1184
1185    def do_btc_set_name(self, name):
1186        """
1187        Description: Change Bluetooth Controller local name.
1188        Input(s):
1189            name: The name to set the Bluetooth Controller name to.
1190
1191        Usage:
1192          Examples:
1193            btc_set_name fs_test
1194        """
1195        cmd = "Change Bluetooth Controller local name."
1196        try:
1197            result = self.test_dut.set_bluetooth_local_name(name)
1198            self.log.info(result)
1199        except Exception as err:
1200            self.log.error(FAILURE.format(cmd, err))
1201
1202    def do_btc_request_discovery(self, discover):
1203        """
1204        Description: Change whether the Bluetooth Controller is in active.
1205            discovery or not.
1206        Input(s):
1207            discover: true to start discovery
1208                      false to end discovery
1209        Usage:
1210          Examples:
1211            btc_request_discovery true
1212            btc_request_discovery false
1213        """
1214        cmd = "Change whether the Bluetooth Controller is in active."
1215        try:
1216            result = self.pri_dut.btc_lib.requestDiscovery(
1217                self.str_to_bool(discover))
1218            self.log.info(result)
1219        except Exception as err:
1220            self.log.error(FAILURE.format(cmd, err))
1221
1222    def do_btc_get_known_remote_devices(self, line):
1223        """
1224        Description: Get a list of known devices.
1225
1226        Usage:
1227          Examples:
1228            btc_get_known_remote_devices
1229        """
1230        cmd = "Get a list of known devices."
1231        self.bt_control_devices = []
1232        try:
1233            device_list = self.pri_dut.btc_lib.getKnownRemoteDevices(
1234            )['result']
1235            for id_dict in device_list:
1236                device = device_list[id_dict]
1237                self.bt_control_devices.append(device)
1238                self.log.info("Device found {}".format(device))
1239
1240        except Exception as err:
1241            self.log.error(FAILURE.format(cmd, err))
1242
1243    def do_btc_forget_all_known_devices(self, line):
1244        """
1245        Description: Forget all known devices.
1246
1247        Usage:
1248          Examples:
1249            btc_forget_all_known_devices
1250        """
1251        cmd = "Forget all known devices."
1252        try:
1253            device_list = self.pri_dut.btc_lib.getKnownRemoteDevices(
1254            )['result']
1255            for device in device_list:
1256                d = device_list[device]
1257                if d['bonded'] or d['connected']:
1258                    self.log.info("Unbonding deivce: {}".format(d))
1259                    self.log.info(
1260                        self.pri_dut.btc_lib.forgetDevice(d['id'])['result'])
1261        except Exception as err:
1262            self.log.error(FAILURE.format(cmd, err))
1263
1264    def do_btc_connect_device(self, line):
1265        """
1266        Description: Connect to device under test.
1267            Device under test is specified by either user params
1268            or
1269                tool_set_target_device_name <name>
1270                do_tool_refresh_unique_id_using_bt_control
1271
1272        Usage:
1273          Examples:
1274            btc_connect_device
1275        """
1276        cmd = "Connect to device under test."
1277        try:
1278            result = self.pri_dut.btc_lib.connectDevice(
1279                self.unique_mac_addr_id)
1280            self.log.info(result)
1281        except Exception as err:
1282            self.log.error(FAILURE.format(cmd, err))
1283
1284    def complete_btc_connect_device_by_id(self, text, line, begidx, endidx):
1285        if not text:
1286            completions = list(self.bt_control_ids)[:]
1287        else:
1288            completions = [
1289                s for s in self.bt_control_ids if s.startswith(text)
1290            ]
1291        return completions
1292
1293    def do_btc_connect_device_by_id(self, device_id):
1294        """
1295        Description: Connect to device id based on pre-defined inputs.
1296            Supports Tab Autocomplete.
1297        Input(s):
1298            device_id: The device id to connect to.
1299
1300        Usage:
1301          Examples:
1302            btc_connect_device_by_id <device_id>
1303        """
1304        cmd = "Connect to device id based on pre-defined inputs."
1305        try:
1306            result = self.pri_dut.btc_lib.connectDevice(device_id)
1307            self.log.info(result)
1308        except Exception as err:
1309            self.log.error(FAILURE.format(cmd, err))
1310
1311    def complete_btc_connect_device_by_name(self, text, line, begidx, endidx):
1312        if not text:
1313            completions = list(self.bt_control_names)[:]
1314        else:
1315            completions = [
1316                s for s in self.bt_control_names if s.startswith(text)
1317            ]
1318        return completions
1319
1320    def do_btc_connect_device_by_name(self, device_name):
1321        """
1322        Description: Connect to device id based on pre-defined inputs.
1323            Supports Tab Autocomplete.
1324        Input(s):
1325            device_id: The device id to connect to.
1326
1327        Usage:
1328          Examples:
1329            btc_connect_device_by_name <device_id>
1330        """
1331        cmd = "Connect to device name based on pre-defined inputs."
1332        try:
1333            for device in self.bt_control_devices:
1334                if device_name is device['name']:
1335
1336                    result = self.pri_dut.btc_lib.connectDevice(device['id'])
1337                    self.log.info(result)
1338        except Exception as err:
1339            self.log.error(FAILURE.format(cmd, err))
1340
1341    def do_btc_disconnect_device(self, line):
1342        """
1343        Description: Disconnect to device under test.
1344            Device under test is specified by either user params
1345            or
1346                tool_set_target_device_name <name>
1347                do_tool_refresh_unique_id_using_bt_control
1348
1349        Usage:
1350          Examples:
1351            btc_disconnect_device
1352        """
1353        cmd = "Disconnect to device under test."
1354        try:
1355            result = self.pri_dut.btc_lib.disconnectDevice(
1356                self.unique_mac_addr_id)
1357            self.log.info(result)
1358        except Exception as err:
1359            self.log.error(FAILURE.format(cmd, err))
1360
1361    def do_btc_init_bluetooth_control(self, line):
1362        """
1363        Description: Initialize the Bluetooth Controller.
1364
1365        Usage:
1366          Examples:
1367            btc_init_bluetooth_control
1368        """
1369        cmd = "Initialize the Bluetooth Controller."
1370        try:
1371            result = self.test_dut.initialize_bluetooth_controller()
1372            self.log.info(result)
1373        except Exception as err:
1374            self.log.error(FAILURE.format(cmd, err))
1375
1376    def do_btc_get_local_address(self, line):
1377        """
1378        Description: Get the local BR/EDR address of the Bluetooth Controller.
1379
1380        Usage:
1381          Examples:
1382            btc_get_local_address
1383        """
1384        cmd = "Get the local BR/EDR address of the Bluetooth Controller."
1385        try:
1386            result = self.test_dut.get_local_bluetooth_address()
1387            self.log.info(result)
1388        except Exception as err:
1389            self.log.error(FAILURE.format(cmd, err))
1390
1391    def do_btc_input_pairing_pin(self, line):
1392        """
1393        Description: Sends a pairing pin to SL4F's Bluetooth Control's
1394        Pairing Delegate.
1395
1396        Usage:
1397          Examples:
1398            btc_input_pairing_pin 123456
1399        """
1400        cmd = "Input pairing pin to the Fuchsia device."
1401        try:
1402            result = self.pri_dut.btc_lib.inputPairingPin(line)['result']
1403            self.log.info(result)
1404        except Exception as err:
1405            self.log.error(FAILURE.format(cmd, err))
1406
1407    def do_btc_get_pairing_pin(self, line):
1408        """
1409        Description: Gets the pairing pin from SL4F's Bluetooth Control's
1410        Pairing Delegate.
1411
1412        Usage:
1413          Examples:
1414            btc_get_pairing_pin
1415        """
1416        cmd = "Get the pairing pin from the Fuchsia device."
1417        try:
1418            result = self.pri_dut.btc_lib.getPairingPin()['result']
1419            self.log.info(result)
1420        except Exception as err:
1421            self.log.error(FAILURE.format(cmd, err))
1422
1423    """End Bluetooth Control wrappers"""
1424    """Begin Profile Server wrappers"""
1425    def do_sdp_pts_example(self, num_of_records):
1426        """
1427        Description: An example of how to setup a generic SDP record
1428            and SDP search capabilities. This example will pass a few
1429            SDP tests.
1430
1431        Input(s):
1432            num_of_records: The number of records to add.
1433
1434        Usage:
1435          Examples:
1436            sdp_pts_example 1
1437            sdp pts_example 10
1438        """
1439        cmd = "Setup SDP for PTS testing."
1440        record = {
1441            'service_class_uuids': ["0001"],
1442            'protocol_descriptors': [
1443                {
1444                    'protocol':
1445                    int(sig_uuid_constants['AVDTP'], 16),
1446                    'params': [
1447                        {
1448                            'data': 0x0103  # to indicate 1.3
1449                        },
1450                        {
1451                            'data': 0x0105  # to indicate 1.5
1452                        }
1453                    ]
1454                },
1455                {
1456                    'protocol': int(sig_uuid_constants['SDP'], 16),
1457                    'params': [{
1458                        'data': int(sig_uuid_constants['AVDTP'], 16),
1459                    }]
1460                }
1461            ],
1462            'profile_descriptors': [{
1463                'profile_id':
1464                int(sig_uuid_constants['AdvancedAudioDistribution'], 16),
1465                'major_version':
1466                1,
1467                'minor_version':
1468                3,
1469            }],
1470            'additional_protocol_descriptors': [{
1471                'protocol':
1472                int(sig_uuid_constants['L2CAP'], 16),
1473                'params': [
1474                    {
1475                        'data': int(sig_uuid_constants['AVDTP'], 16),
1476                    },
1477                    {
1478                        'data': int(sig_uuid_constants['AVCTP'], 16),
1479                    },
1480                    {
1481                        'data': int(sig_uuid_constants['GenericAudio'], 16),
1482                    },
1483                ]
1484            }],
1485            'information': [{
1486                'language': "en",
1487                'name': "A2DP",
1488                'description': "Advanced Audio Distribution Profile",
1489                'provider': "Fuchsia"
1490            }],
1491            'additional_attributes': [
1492                {
1493                    'id': 0x0200,
1494                    'element': {
1495                        'data': int(sig_uuid_constants['AVDTP'], 16)
1496                    }
1497                },
1498                {
1499                    'id': 0x0201,
1500                    'element': {
1501                        'data': int(sig_uuid_constants['AVDTP'], 16)
1502                    }
1503                },
1504            ]
1505        }
1506
1507        attributes = [
1508            bt_attribute_values['ATTR_PROTOCOL_DESCRIPTOR_LIST'],
1509            bt_attribute_values['ATTR_SERVICE_CLASS_ID_LIST'],
1510            bt_attribute_values['ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST'],
1511            bt_attribute_values['ATTR_A2DP_SUPPORTED_FEATURES'],
1512            bt_attribute_values['ATTR_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST'],
1513            bt_attribute_values['ATTR_SERVICE_RECORD_HANDLE'],
1514        ]
1515
1516        try:
1517            self.pri_dut.sdp_lib.addSearch(
1518                attributes, int(sig_uuid_constants['AudioSource'], 16))
1519            self.pri_dut.sdp_lib.addSearch(
1520                attributes,
1521                int(sig_uuid_constants['AdvancedAudioDistribution'], 16))
1522            for _ in range(int(num_of_records)):
1523                result = self.pri_dut.sdp_lib.addService(record)
1524                self.log.info(result)
1525        except Exception as err:
1526            self.log.error(FAILURE.format(cmd, err))
1527
1528    def do_sdp_cleanup(self, line):
1529        """
1530        Description: Cleanup any existing SDP records
1531
1532        Usage:
1533          Examples:
1534            sdp_cleanup
1535        """
1536        cmd = "Cleanup SDP objects."
1537        try:
1538            result = self.pri_dut.sdp_lib.cleanUp()
1539            self.log.info(result)
1540        except Exception as err:
1541            self.log.error(FAILURE.format(cmd, err))
1542
1543    def do_sdp_init(self, line):
1544        """
1545        Description: Init the profile proxy for setting up SDP records
1546
1547        Usage:
1548          Examples:
1549            sdp_init
1550        """
1551        cmd = "Initialize profile proxy objects for adding SDP records"
1552        try:
1553            result = self.pri_dut.sdp_lib.init()
1554            self.log.info(result)
1555        except Exception as err:
1556            self.log.error(FAILURE.format(cmd, err))
1557
1558    def do_sdp_connect_l2cap(self, psm):
1559        """
1560        Description: Send an l2cap connection request over an input psm value.
1561
1562        Note: Must be already connected to a peer.
1563
1564        Input(s):
1565            psm: The int hex value to connect over. Available PSMs:
1566                SDP 0x0001  See Bluetooth Service Discovery Protocol (SDP)
1567                RFCOMM  0x0003  See RFCOMM with TS 07.10
1568                TCS-BIN 0x0005  See Bluetooth Telephony Control Specification /
1569                    TCS Binary
1570                TCS-BIN-CORDLESS    0x0007  See Bluetooth Telephony Control
1571                    Specification / TCS Binary
1572                BNEP    0x000F  See Bluetooth Network Encapsulation Protocol
1573                HID_Control 0x0011  See Human Interface Device
1574                HID_Interrupt   0x0013  See Human Interface Device
1575                UPnP    0x0015  See [ESDP]
1576                AVCTP   0x0017  See Audio/Video Control Transport Protocol
1577                AVDTP   0x0019  See Audio/Video Distribution Transport Protocol
1578                AVCTP_Browsing  0x001B  See Audio/Video Remote Control Profile
1579                UDI_C-Plane 0x001D  See the Unrestricted Digital Information
1580                    Profile [UDI]
1581                ATT 0x001F  See Bluetooth Core Specification​
1582                ​3DSP   0x0021​ ​​See 3D Synchronization Profile.
1583                ​LE_PSM_IPSP    ​0x0023 ​See Internet Protocol Support Profile
1584                    (IPSP)
1585                OTS 0x0025  See Object Transfer Service (OTS)
1586                EATT    0x0027  See Bluetooth Core Specification
1587
1588        Usage:
1589          Examples:
1590            sdp_connect_l2cap 0001
1591            sdp_connect_l2cap 0015
1592        """
1593        cmd = "Connect l2cap"
1594        try:
1595            result = self.pri_dut.sdp_lib.connectL2cap(self.unique_mac_addr_id,
1596                                                       int(psm, 16))
1597            self.log.info(result)
1598        except Exception as err:
1599            self.log.error(FAILURE.format(cmd, err))
1600
1601    """End Profile Server wrappers"""
1602    """Begin AVDTP wrappers"""
1603    def complete_avdtp_init(self, text, line, begidx, endidx):
1604        roles = ["sink", "source"]
1605        if not text:
1606            completions = roles
1607        else:
1608            completions = [s for s in roles if s.startswith(text)]
1609        return completions
1610
1611    def do_avdtp_init(self, role):
1612        """
1613        Description: Init the AVDTP and A2DP service corresponding to the input
1614        role.
1615
1616        Input(s):
1617            role: The specified role. Either 'source' or 'sink'.
1618
1619        Usage:
1620          Examples:
1621            avdtp_init source
1622            avdtp_init sink
1623        """
1624        cmd = "Initialize AVDTP proxy"
1625        try:
1626            result = self.pri_dut.avdtp_lib.init(role)
1627            self.log.info(result)
1628        except Exception as err:
1629            self.log.error(FAILURE.format(cmd, err))
1630
1631    def do_avdtp_kill_a2dp_sink(self, line):
1632        """
1633        Description: Quickly kill any A2DP sink service currently running on the
1634        device.
1635
1636        Usage:
1637          Examples:
1638            avdtp_kill_a2dp_sink
1639        """
1640        cmd = "Killing A2DP sink"
1641        try:
1642            result = self.pri_dut.control_daemon("bt-a2dp-sink.cmx", "stop")
1643            self.log.info(result)
1644        except Exception as err:
1645            self.log.error(FAILURE.format(cmd, err))
1646
1647    def do_avdtp_kill_a2dp_source(self, line):
1648        """
1649        Description: Quickly kill any A2DP source service currently running on
1650        the device.
1651
1652        Usage:
1653          Examples:
1654            avdtp_kill_a2dp_source
1655        """
1656        cmd = "Killing A2DP source"
1657        try:
1658            result = self.pri_dut.control_daemon("bt-a2dp-source.cmx", "stop")
1659            self.log.info(result)
1660        except Exception as err:
1661            self.log.error(FAILURE.format(cmd, err))
1662
1663    def do_avdtp_get_connected_peers(self, line):
1664        """
1665        Description: Get the connected peers for the AVDTP service
1666
1667        Usage:
1668          Examples:
1669            avdtp_get_connected_peers
1670        """
1671        cmd = "AVDTP get connected peers"
1672        try:
1673            result = self.pri_dut.avdtp_lib.getConnectedPeers()
1674            self.log.info(result)
1675        except Exception as err:
1676            self.log.error(FAILURE.format(cmd, err))
1677
1678    def do_avdtp_set_configuration(self, peer_id):
1679        """
1680        Description: Send AVDTP command to connected peer: set configuration
1681
1682        Input(s):
1683            peer_id: The specified peer_id.
1684
1685        Usage:
1686          Examples:
1687            avdtp_set_configuration <peer_id>
1688        """
1689        cmd = "Send AVDTP set configuration to connected peer"
1690        try:
1691            result = self.pri_dut.avdtp_lib.setConfiguration(int(peer_id))
1692            self.log.info(result)
1693        except Exception as err:
1694            self.log.error(FAILURE.format(cmd, err))
1695
1696    def do_avdtp_get_configuration(self, peer_id):
1697        """
1698        Description: Send AVDTP command to connected peer: get configuration
1699
1700        Input(s):
1701            peer_id: The specified peer_id.
1702
1703        Usage:
1704          Examples:
1705            avdtp_get_configuration <peer_id>
1706        """
1707        cmd = "Send AVDTP get configuration to connected peer"
1708        try:
1709            result = self.pri_dut.avdtp_lib.getConfiguration(int(peer_id))
1710            self.log.info(result)
1711        except Exception as err:
1712            self.log.error(FAILURE.format(cmd, err))
1713
1714    def do_avdtp_get_capabilities(self, peer_id):
1715        """
1716        Description: Send AVDTP command to connected peer: get capabilities
1717
1718        Input(s):
1719            peer_id: The specified peer_id.
1720
1721        Usage:
1722          Examples:
1723            avdtp_get_capabilities <peer_id>
1724        """
1725        cmd = "Send AVDTP get capabilities to connected peer"
1726        try:
1727            result = self.pri_dut.avdtp_lib.getCapabilities(int(peer_id))
1728            self.log.info(result)
1729        except Exception as err:
1730            self.log.error(FAILURE.format(cmd, err))
1731
1732    def do_avdtp_get_all_capabilities(self, peer_id):
1733        """
1734        Description: Send AVDTP command to connected peer: get all capabilities
1735
1736        Input(s):
1737            peer_id: The specified peer_id.
1738
1739        Usage:
1740          Examples:
1741            avdtp_get_all_capabilities <peer_id>
1742        """
1743        cmd = "Send AVDTP get all capabilities to connected peer"
1744        try:
1745            result = self.pri_dut.avdtp_lib.getAllCapabilities(int(peer_id))
1746            self.log.info(result)
1747        except Exception as err:
1748            self.log.error(FAILURE.format(cmd, err))
1749
1750    def do_avdtp_reconfigure_stream(self, peer_id):
1751        """
1752        Description: Send AVDTP command to connected peer: reconfigure stream
1753
1754        Input(s):
1755            peer_id: The specified peer_id.
1756
1757        Usage:
1758          Examples:
1759            avdtp_reconfigure_stream <peer_id>
1760        """
1761        cmd = "Send AVDTP reconfigure stream to connected peer"
1762        try:
1763            result = self.pri_dut.avdtp_lib.reconfigureStream(int(peer_id))
1764            self.log.info(result)
1765        except Exception as err:
1766            self.log.error(FAILURE.format(cmd, err))
1767
1768    def do_avdtp_suspend_stream(self, peer_id):
1769        """
1770        Description: Send AVDTP command to connected peer: suspend stream
1771
1772        Input(s):
1773            peer_id: The specified peer_id.
1774
1775        Usage:
1776          Examples:
1777            avdtp_suspend_stream <peer_id>
1778        """
1779        cmd = "Send AVDTP suspend stream to connected peer"
1780        try:
1781            result = self.pri_dut.avdtp_lib.suspendStream(int(peer_id))
1782            self.log.info(result)
1783        except Exception as err:
1784            self.log.error(FAILURE.format(cmd, err))
1785
1786    def do_avdtp_suspend_reconfigure(self, peer_id):
1787        """
1788        Description: Send AVDTP command to connected peer: suspend reconfigure
1789
1790        Input(s):
1791            peer_id: The specified peer_id.
1792
1793        Usage:
1794          Examples:
1795            avdtp_suspend_reconfigure <peer_id>
1796        """
1797        cmd = "Send AVDTP suspend reconfigure to connected peer"
1798        try:
1799            result = self.pri_dut.avdtp_lib.suspendAndReconfigure(int(peer_id))
1800            self.log.info(result)
1801        except Exception as err:
1802            self.log.error(FAILURE.format(cmd, err))
1803
1804    def do_avdtp_release_stream(self, peer_id):
1805        """
1806        Description: Send AVDTP command to connected peer: release stream
1807
1808        Input(s):
1809            peer_id: The specified peer_id.
1810
1811        Usage:
1812          Examples:
1813            avdtp_release_stream <peer_id>
1814        """
1815        cmd = "Send AVDTP release stream to connected peer"
1816        try:
1817            result = self.pri_dut.avdtp_lib.releaseStream(int(peer_id))
1818            self.log.info(result)
1819        except Exception as err:
1820            self.log.error(FAILURE.format(cmd, err))
1821
1822    def do_avdtp_establish_stream(self, peer_id):
1823        """
1824        Description: Send AVDTP command to connected peer: establish stream
1825
1826        Input(s):
1827            peer_id: The specified peer_id.
1828
1829        Usage:
1830          Examples:
1831            avdtp_establish_stream <peer_id>
1832        """
1833        cmd = "Send AVDTP establish stream to connected peer"
1834        try:
1835            result = self.pri_dut.avdtp_lib.establishStream(int(peer_id))
1836            self.log.info(result)
1837        except Exception as err:
1838            self.log.error(FAILURE.format(cmd, err))
1839
1840    def do_avdtp_start_stream(self, peer_id):
1841        """
1842        Description: Send AVDTP command to connected peer: start stream
1843
1844        Input(s):
1845            peer_id: The specified peer_id.
1846
1847        Usage:
1848          Examples:
1849            avdtp_start_stream <peer_id>
1850        """
1851        cmd = "Send AVDTP start stream to connected peer"
1852        try:
1853            result = self.pri_dut.avdtp_lib.startStream(int(peer_id))
1854            self.log.info(result)
1855        except Exception as err:
1856            self.log.error(FAILURE.format(cmd, err))
1857
1858    def do_avdtp_abort_stream(self, peer_id):
1859        """
1860        Description: Send AVDTP command to connected peer: abort stream
1861
1862        Input(s):
1863            peer_id: The specified peer_id.
1864
1865        Usage:
1866          Examples:
1867            avdtp_abort_stream <peer_id>
1868        """
1869        cmd = "Send AVDTP abort stream to connected peer"
1870        try:
1871            result = self.pri_dut.avdtp_lib.abortStream(int(peer_id))
1872            self.log.info(result)
1873        except Exception as err:
1874            self.log.error(FAILURE.format(cmd, err))
1875
1876    def do_avdtp_remove_service(self, line):
1877        """
1878        Description: Removes the AVDTP service in use.
1879
1880        Usage:
1881          Examples:
1882            avdtp_establish_stream <peer_id>
1883        """
1884        cmd = "Remove AVDTP service"
1885        try:
1886            result = self.pri_dut.avdtp_lib.removeService()
1887            self.log.info(result)
1888        except Exception as err:
1889            self.log.error(FAILURE.format(cmd, err))
1890
1891    """End AVDTP wrappers"""
1892