1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17This test script exercises different GATT connection tests.
18
19Original location:
20  tools/test/connectivity/acts_tests/tests/google/ble/gatt/GattConnectTest.py
21"""
22
23import logging
24import time
25from queue import Empty
26
27from blueberry.tests.gd.cert.test_decorators import test_tracker_info
28from blueberry.tests.gd_sl4a.lib.bt_constants import scan_result
29from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test
30from blueberry.utils.ble_scan_adv_constants import BleAdvertiseSettingsMode
31from blueberry.utils.ble_scan_adv_constants import BleScanSettingsMatchNums
32from blueberry.utils.ble_scan_adv_constants import BleScanSettingsModes
33from blueberry.utils.bt_constants import BluetoothProfile
34from blueberry.utils.bt_gatt_constants import GattCallbackError
35from blueberry.utils.bt_gatt_constants import GattCallbackString
36from blueberry.utils.bt_gatt_constants import GattCharacteristic
37from blueberry.utils.bt_gatt_constants import GattConnectionState
38from blueberry.utils.bt_gatt_constants import GattMtuSize
39from blueberry.utils.bt_gatt_constants import GattPhyMask
40from blueberry.utils.bt_gatt_constants import GattServiceType
41from blueberry.utils.bt_gatt_constants import GattTransport
42from blueberry.utils.bt_gatt_utils import GattTestUtilsError
43from blueberry.utils.bt_gatt_utils import close_gatt_client
44from blueberry.utils.bt_gatt_utils import disconnect_gatt_connection
45from blueberry.utils.bt_gatt_utils import get_mac_address_of_generic_advertisement
46from blueberry.utils.bt_gatt_utils import log_gatt_server_uuids
47from blueberry.utils.bt_gatt_utils import orchestrate_gatt_connection
48from blueberry.utils.bt_gatt_utils import setup_gatt_connection
49from blueberry.utils.bt_gatt_utils import setup_multiple_services
50from blueberry.utils.bt_gatt_utils import wait_for_gatt_disconnect_event
51from blueberry.utils.bt_test_utils import clear_bonded_devices
52from blueberry.tests.gd.cert.truth import assertThat
53from mobly import asserts
54from mobly import test_runner
55
56PHYSICAL_DISCONNECT_TIMEOUT = 5
57
58
59class GattConnectTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):
60    adv_instances = []
61    bluetooth_gatt_list = []
62    gatt_server_list = []
63    default_timeout = 10
64    default_discovery_timeout = 3
65
66    ADDR_TYPE_PUBLIC = 0
67    ADDR_TYPE_RPA = 1
68    ADDR_TYPE_NRPA = 2
69
70    def setup_class(self):
71        super().setup_class()
72        self.central = self.dut
73        self.peripheral = self.cert
74
75    def setup_test(self):
76        super().setup_test()
77        bluetooth_gatt_list = []
78        self.gatt_server_list = []
79        self.adv_instances = []
80        # Ensure there is ample time for a physical disconnect in between
81        # testcases.
82        logging.info("Waiting for {} seconds for physical GATT disconnections".format(PHYSICAL_DISCONNECT_TIMEOUT))
83        time.sleep(PHYSICAL_DISCONNECT_TIMEOUT)
84
85    def teardown_test(self):
86        for bluetooth_gatt in self.bluetooth_gatt_list:
87            self.central.sl4a.gattClientClose(bluetooth_gatt)
88        for gatt_server in self.gatt_server_list:
89            self.peripheral.sl4a.gattServerClose(gatt_server)
90        for adv in self.adv_instances:
91            self.peripheral.sl4a.bleStopBleAdvertising(adv)
92        super().teardown_test()
93        return True
94
95    def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback):
96        logging.info("Disconnecting from peripheral device.")
97        try:
98            disconnect_gatt_connection(self.central, bluetooth_gatt, gatt_callback)
99            logging.info("Disconnected GATT, closing GATT client.")
100            close_gatt_client(self.central, bluetooth_gatt)
101            logging.info("Closed GATT client, removing it from local tracker.")
102            if bluetooth_gatt in self.bluetooth_gatt_list:
103                self.bluetooth_gatt_list.remove(bluetooth_gatt)
104        except GattTestUtilsError as err:
105            logging.error(err)
106            return False
107        return True
108
109    def _find_service_added_event(self, gatt_server_cb, uuid):
110        expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_cb)
111        try:
112            event = self.peripheral.ed.pop_event(expected_event, self.default_timeout)
113        except Empty:
114            logging.error(GattCallbackError.SERV_ADDED_ERR.format(expected_event))
115            return False
116        if event['data']['serviceUuid'].lower() != uuid.lower():
117            logging.error("Uuid mismatch. Found: {}, Expected {}.".format(event['data']['serviceUuid'], uuid))
118            return False
119        return True
120
121    def _verify_mtu_changed_on_client_and_server(self, expected_mtu, gatt_callback, gatt_server_callback):
122        expected_event = GattCallbackString.MTU_CHANGED.format(gatt_callback)
123        try:
124            mtu_event = self.central.ed.pop_event(expected_event, self.default_timeout)
125            mtu_size_found = mtu_event['data']['MTU']
126            if mtu_size_found != expected_mtu:
127                logging.error("MTU size found: {}, expected: {}".format(mtu_size_found, expected_mtu))
128                return False
129        except Empty:
130            logging.error(GattCallbackError.MTU_CHANGED_ERR.format(expected_event))
131            return False
132
133        expected_event = GattCallbackString.MTU_SERV_CHANGED.format(gatt_server_callback)
134        try:
135            mtu_event = self.peripheral.ed.pop_event(expected_event, self.default_timeout)
136            mtu_size_found = mtu_event['data']['MTU']
137            if mtu_size_found != expected_mtu:
138                logging.error("MTU size found: {}, expected: {}".format(mtu_size_found, expected_mtu))
139                return False
140        except Empty:
141            logging.error(GattCallbackError.MTU_SERV_CHANGED_ERR.format(expected_event))
142            return False
143        return True
144
145    @test_tracker_info(uuid='8a3530a3-c8bb-466b-9710-99e694c38618')
146    def test_gatt_connect(self):
147        """Test GATT connection over LE.
148
149          Test establishing a gatt connection between a GATT server and GATT
150          client.
151
152          Steps:
153            1. Start a generic advertisement.
154            2. Start a generic scanner.
155            3. Find the advertisement and extract the mac address.
156            4. Stop the first scanner.
157            5. Create a GATT connection between the scanner and advertiser.
158            6. Disconnect the GATT connection.
159
160          Expected Result:
161            Verify that a connection was established and then disconnected
162            successfully.
163
164          Returns:
165            Pass if True
166            Fail if False
167
168          TAGS: LE, Advertising, Filtering, Scanning, GATT
169          Priority: 0
170          """
171        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
172        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
173        self.gatt_server_list.append(gatt_server)
174        try:
175            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
176            self.bluetooth_gatt_list.append(bluetooth_gatt)
177        except GattTestUtilsError as err:
178            logging.error(err)
179            asserts.fail("Failed to connect to GATT, error: {}".format(err))
180            return
181        self.adv_instances.append(adv_callback)
182        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
183
184    @test_tracker_info(uuid='a839b505-03ac-4783-be7e-1d43129a1948')
185    def test_gatt_connect_stop_advertising(self):
186        """Test GATT connection over LE then stop advertising
187
188          A test case that verifies the GATT connection doesn't
189          disconnect when LE advertisement is stopped.
190
191          Steps:
192            1. Start a generic advertisement.
193            2. Start a generic scanner.
194            3. Find the advertisement and extract the mac address.
195            4. Stop the first scanner.
196            5. Create a GATT connection between the scanner and advertiser.
197            6. Stop the advertiser.
198            7. Verify no connection state changed happened.
199            8. Disconnect the GATT connection.
200
201          Expected Result:
202            Verify that a connection was established and not disconnected
203            when advertisement stops.
204
205          Returns:
206            Pass if True
207            Fail if False
208
209          TAGS: LE, Advertising, Filtering, Scanning, GATT
210          Priority: 0
211          """
212        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
213        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
214        self.gatt_server_list.append(gatt_server)
215        try:
216            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
217            self.bluetooth_gatt_list.append(bluetooth_gatt)
218        except GattTestUtilsError as err:
219            logging.error(err)
220            asserts.fail("Failed to connect to GATT, error: {}".format(err))
221            return
222        self.peripheral.sl4a.bleStopBleAdvertising(adv_callback)
223        try:
224            event = self.central.ed.pop_event(
225                GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback, self.default_timeout))
226            logging.error("Connection event found when not expected: {}".format(event))
227            asserts.fail("Connection event found when not expected: {}".format(event))
228            return
229        except Empty:
230            logging.info("No connection state change as expected")
231        try:
232            self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)
233        except Exception as err:
234            logging.info("Failed to orchestrate disconnect: {}".format(err))
235            asserts.fail("Failed to orchestrate disconnect: {}".format(err))
236            return
237
238    @test_tracker_info(uuid='b82f91a8-54bb-4779-a117-73dc7fdb28cc')
239    def test_gatt_connect_autoconnect(self):
240        """Test GATT connection over LE.
241
242          Test re-establishing a gatt connection using autoconnect
243          set to True in order to test connection allowlist.
244
245          Steps:
246            1. Start a generic advertisement.
247            2. Start a generic scanner.
248            3. Find the advertisement and extract the mac address.
249            4. Stop the first scanner.
250            5. Create a GATT connection between the scanner and advertiser.
251            6. Disconnect the GATT connection.
252            7. Create a GATT connection with autoconnect set to True
253            8. Disconnect the GATT connection.
254
255          Expected Result:
256            Verify that a connection was re-established and then disconnected
257            successfully.
258
259          Returns:
260            Pass if True
261            Fail if False
262
263          TAGS: LE, Advertising, Filtering, Scanning, GATT
264          Priority: 0
265          """
266        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
267        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
268        self.peripheral.log.info("Opened GATT server on CERT, scanning it from DUT")
269        self.gatt_server_list.append(gatt_server)
270        autoconnect = False
271        mac_address, adv_callback, scan_callback = (get_mac_address_of_generic_advertisement(
272            self.central, self.peripheral, self.ADDR_TYPE_PUBLIC))
273        self.adv_instances.append(adv_callback)
274        self.central.log.info("Discovered BLE advertisement, connecting GATT with autoConnect={}".format(autoconnect))
275        try:
276            bluetooth_gatt, gatt_callback = setup_gatt_connection(self.central, mac_address, autoconnect)
277            self.central.log.info("GATT connected, stopping BLE scanning")
278            self.central.sl4a.bleStopBleScan(scan_callback)
279            self.central.log.info("Stopped BLE scanning")
280            self.bluetooth_gatt_list.append(bluetooth_gatt)
281        except GattTestUtilsError as err:
282            logging.error(err)
283            asserts.fail("Failed to connect to GATT, error: {}".format(err))
284            return
285        self.central.log.info("Disconnecting GATT")
286        try:
287            disconnect_gatt_connection(self.central, bluetooth_gatt, gatt_callback)
288            self.central.log.info("GATT disconnected, closing GATT client")
289            close_gatt_client(self.central, bluetooth_gatt)
290            self.central.log.info("GATT client closed, removing it from in-memory tracker")
291            if bluetooth_gatt in self.bluetooth_gatt_list:
292                self.bluetooth_gatt_list.remove(bluetooth_gatt)
293        except GattTestUtilsError as err:
294            logging.error(err)
295            asserts.fail("Failed to disconnect GATT, error: {}".format(err))
296            return
297        autoconnect = True
298        self.central.log.info("Connecting GATT with autoConnect={}".format(autoconnect))
299        bluetooth_gatt = self.central.sl4a.gattClientConnectGatt(gatt_callback, mac_address, autoconnect,
300                                                                 GattTransport.TRANSPORT_LE, False,
301                                                                 GattPhyMask.PHY_LE_1M_MASK)
302        self.central.log.info("Waiting for GATt to become connected")
303        self.bluetooth_gatt_list.append(bluetooth_gatt)
304        expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
305        try:
306            event = self.central.ed.pop_event(expected_event, self.default_timeout)
307            self.central.log.info("Received event={}".format(event))
308        except Empty:
309            logging.error(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event))
310            asserts.fail(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event))
311            return
312        found_state = event['data']['State']
313        expected_state = GattConnectionState.STATE_CONNECTED
314        assertThat(found_state).isEqualTo(expected_state)
315        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
316
317    @test_tracker_info(uuid='e506fa50-7cd9-4bd8-938a-6b85dcfe6bc6')
318    def test_gatt_connect_opportunistic(self):
319        """Test opportunistic GATT connection over LE.
320
321          Test establishing a gatt connection between a GATT server and GATT
322          client in opportunistic mode.
323
324          Steps:
325            1. Start a generic advertisement.
326            2. Start a generic scanner.
327            3. Find the advertisement and extract the mac address.
328            4. Stop the first scanner.
329            5. Create GATT connection 1 between the scanner and advertiser normally
330            6. Create GATT connection 2 between the scanner and advertiser using
331               opportunistic mode
332            7. Disconnect GATT connection 1
333
334          Expected Result:
335            Verify GATT connection 2 automatically disconnects when GATT connection
336            1 disconnect
337
338          Returns:
339            Pass if True
340            Fail if False
341
342          TAGS: LE, Advertising, Filtering, Scanning, GATT
343          Priority: 0
344          """
345        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
346        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
347        self.gatt_server_list.append(gatt_server)
348        mac_address, adv_callback, scan_callback = (get_mac_address_of_generic_advertisement(
349            self.central, self.peripheral))
350        # Make GATT connection 1
351        try:
352            bluetooth_gatt_1, gatt_callback_1 = setup_gatt_connection(self.central,
353                                                                      mac_address,
354                                                                      False,
355                                                                      transport=GattTransport.TRANSPORT_AUTO,
356                                                                      opportunistic=False)
357            self.central.sl4a.bleStopBleScan(scan_callback)
358            self.adv_instances.append(adv_callback)
359            self.bluetooth_gatt_list.append(bluetooth_gatt_1)
360        except GattTestUtilsError as err:
361            logging.error(err)
362            asserts.fail("Failed to connect to GATT 1, error: {}".format(err))
363            return
364        # Make GATT connection 2
365        try:
366            bluetooth_gatt_2, gatt_callback_2 = setup_gatt_connection(self.central,
367                                                                      mac_address,
368                                                                      False,
369                                                                      transport=GattTransport.TRANSPORT_AUTO,
370                                                                      opportunistic=True)
371            self.bluetooth_gatt_list.append(bluetooth_gatt_2)
372        except GattTestUtilsError as err:
373            logging.error(err)
374            asserts.fail("Failed to connect to GATT 2, error: {}".format(err))
375            return
376        # Disconnect GATT connection 1
377        try:
378            disconnect_gatt_connection(self.central, bluetooth_gatt_1, gatt_callback_1)
379            close_gatt_client(self.central, bluetooth_gatt_1)
380            if bluetooth_gatt_1 in self.bluetooth_gatt_list:
381                self.bluetooth_gatt_list.remove(bluetooth_gatt_1)
382        except GattTestUtilsError as err:
383            logging.error(err)
384            asserts.fail("Failed to disconnect GATT 1, error: {}".format(err))
385            return
386        # Confirm that GATT connection 2 also disconnects
387        wait_for_gatt_disconnect_event(self.central, gatt_callback_2)
388        close_gatt_client(self.central, bluetooth_gatt_2)
389        if bluetooth_gatt_2 in self.bluetooth_gatt_list:
390            self.bluetooth_gatt_list.remove(bluetooth_gatt_2)
391
392    @test_tracker_info(uuid='4416d483-dec3-46cb-8038-4d82620f873a')
393    def test_gatt_request_out_of_bounds_mtu(self):
394        """Test GATT connection over LE and exercise an out of bound MTU size.
395
396          Test establishing a gatt connection between a GATT server and GATT
397          client. Request an MTU size that is the MIN value minus 1.
398
399          Steps:
400            1. Start a generic advertisement.
401            2. Start a generic scanner.
402            3. Find the advertisement and extract the mac address.
403            4. Stop the first scanner.
404            5. Create a GATT connection between the scanner and advertiser.
405            6. From the scanner (client) request MTU size change to the
406            minimum value minus one.
407            7. Find the MTU changed event on the client.
408            8. Disconnect the GATT connection.
409
410          Expected Result:
411            Verify that an MTU changed event was not discovered and that
412            it didn't cause an exception when requesting an out of bounds
413            MTU.
414
415          Returns:
416            Pass if True
417            Fail if False
418
419          TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU
420          Priority: 0
421          """
422        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
423        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
424        self.gatt_server_list.append(gatt_server)
425        try:
426            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
427            self.bluetooth_gatt_list.append(bluetooth_gatt)
428        except GattTestUtilsError as err:
429            logging.error(err)
430            asserts.fail("Failed to connect to GATT, error: {}".format(err))
431            return
432        self.adv_instances.append(adv_callback)
433        unexpected_mtu = GattMtuSize.MIN - 1
434        self.central.sl4a.gattClientRequestMtu(bluetooth_gatt, unexpected_mtu)
435        assertThat(self._verify_mtu_changed_on_client_and_server(unexpected_mtu, gatt_callback,
436                                                                 gatt_server_cb)).isFalse()
437        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
438
439    @test_tracker_info(uuid='31ffb9ca-cc75-43fb-9802-c19f1c5856b6')
440    def test_gatt_connect_trigger_on_read_rssi(self):
441        """Test GATT connection over LE read RSSI.
442
443        Test establishing a gatt connection between a GATT server and GATT
444        client then read the RSSI.
445
446        Steps:
447          1. Start a generic advertisement.
448          2. Start a generic scanner.
449          3. Find the advertisement and extract the mac address.
450          4. Stop the first scanner.
451          5. Create a GATT connection between the scanner and advertiser.
452          6. From the scanner, request to read the RSSI of the advertiser.
453          7. Disconnect the GATT connection.
454
455        Expected Result:
456          Verify that a connection was established and then disconnected
457          successfully. Verify that the RSSI was ready correctly.
458
459        Returns:
460          Pass if True
461          Fail if False
462
463        TAGS: LE, Advertising, Filtering, Scanning, GATT, RSSI
464        Priority: 1
465        """
466        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
467        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
468        self.gatt_server_list.append(gatt_server)
469        try:
470            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
471            self.bluetooth_gatt_list.append(bluetooth_gatt)
472        except GattTestUtilsError as err:
473            logging.error(err)
474            asserts.fail("Failed to connect to GATT, error: {}".format(err))
475            return
476        self.adv_instances.append(adv_callback)
477        expected_event = GattCallbackString.RD_REMOTE_RSSI.format(gatt_callback)
478        if self.central.sl4a.gattClientReadRSSI(bluetooth_gatt):
479            try:
480                self.central.ed.pop_event(expected_event, self.default_timeout)
481            except Empty:
482                logging.error(GattCallbackError.RD_REMOTE_RSSI_ERR.format(expected_event))
483        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
484
485    @test_tracker_info(uuid='dee9ef28-b872-428a-821b-cc62f27ba936')
486    def test_gatt_connect_trigger_on_services_discovered(self):
487        """Test GATT connection and discover services of peripheral.
488
489          Test establishing a gatt connection between a GATT server and GATT
490          client the discover all services from the connected device.
491
492          Steps:
493            1. Start a generic advertisement.
494            2. Start a generic scanner.
495            3. Find the advertisement and extract the mac address.
496            4. Stop the first scanner.
497            5. Create a GATT connection between the scanner and advertiser.
498            6. From the scanner (central device), discover services.
499            7. Disconnect the GATT connection.
500
501          Expected Result:
502            Verify that a connection was established and then disconnected
503            successfully. Verify that the service were discovered.
504
505          Returns:
506            Pass if True
507            Fail if False
508
509          TAGS: LE, Advertising, Filtering, Scanning, GATT, Services
510          Priority: 1
511          """
512        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
513        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
514        self.gatt_server_list.append(gatt_server)
515        try:
516            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
517            self.bluetooth_gatt_list.append(bluetooth_gatt)
518        except GattTestUtilsError as err:
519            logging.error(err)
520            asserts.fail("Failed to connect to GATT, error: {}".format(err))
521            return
522        self.adv_instances.append(adv_callback)
523        if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt):
524            expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback)
525            try:
526                event = self.central.ed.pop_event(expected_event, self.default_timeout)
527            except Empty:
528                logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
529                asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
530                return
531        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
532
533    @test_tracker_info(uuid='01883bdd-0cf8-48fb-bf15-467bbd4f065b')
534    def test_gatt_connect_trigger_on_services_discovered_iterate_attributes(self):
535        """Test GATT connection and iterate peripherals attributes.
536
537        Test establishing a gatt connection between a GATT server and GATT
538        client and iterate over all the characteristics and descriptors of the
539        discovered services.
540
541        Steps:
542          1. Start a generic advertisement.
543          2. Start a generic scanner.
544          3. Find the advertisement and extract the mac address.
545          4. Stop the first scanner.
546          5. Create a GATT connection between the scanner and advertiser.
547          6. From the scanner (central device), discover services.
548          7. Iterate over all the characteristics and descriptors of the
549          discovered features.
550          8. Disconnect the GATT connection.
551
552        Expected Result:
553          Verify that a connection was established and then disconnected
554          successfully. Verify that the services, characteristics, and descriptors
555          were discovered.
556
557        Returns:
558          Pass if True
559          Fail if False
560
561        TAGS: LE, Advertising, Filtering, Scanning, GATT, Services
562        Characteristics, Descriptors
563        Priority: 1
564        """
565        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
566        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
567        self.gatt_server_list.append(gatt_server)
568        try:
569            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
570            self.bluetooth_gatt_list.append(bluetooth_gatt)
571        except GattTestUtilsError as err:
572            logging.error(err)
573            asserts.fail("Failed to connect to GATT, error: {}".format(err))
574            return
575        self.adv_instances.append(adv_callback)
576        if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt):
577            expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback)
578            try:
579                event = self.central.ed.pop_event(expected_event, self.default_timeout)
580                discovered_services_index = event['data']['ServicesIndex']
581            except Empty:
582                logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
583                asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
584                return
585            log_gatt_server_uuids(self.central, discovered_services_index)
586        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
587
588    @test_tracker_info(uuid='d4277bee-da99-4f48-8a4d-f81b5389da18')
589    def test_gatt_connect_with_service_uuid_variations(self):
590        """Test GATT connection with multiple service uuids.
591
592        Test establishing a gatt connection between a GATT server and GATT
593        client with multiple service uuid variations.
594
595        Steps:
596          1. Start a generic advertisement.
597          2. Start a generic scanner.
598          3. Find the advertisement and extract the mac address.
599          4. Stop the first scanner.
600          5. Create a GATT connection between the scanner and advertiser.
601          6. From the scanner (central device), discover services.
602          7. Verify that all the service uuid variations are found.
603          8. Disconnect the GATT connection.
604
605        Expected Result:
606          Verify that a connection was established and then disconnected
607          successfully. Verify that the service uuid variations are found.
608
609        Returns:
610          Pass if True
611          Fail if False
612
613        TAGS: LE, Advertising, Filtering, Scanning, GATT, Services
614        Priority: 2
615        """
616        try:
617            gatt_server_cb, gatt_server = setup_multiple_services(self.peripheral)
618            self.gatt_server_list.append(gatt_server)
619        except GattTestUtilsError as err:
620            logging.error(err)
621            asserts.fail("Failed to setup GATT service, error: {}".format(err))
622            return
623        try:
624            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
625            self.bluetooth_gatt_list.append(bluetooth_gatt)
626        except GattTestUtilsError as err:
627            logging.error(err)
628            asserts.fail("Failed to connect to GATT, error: {}".format(err))
629            return
630        self.adv_instances.append(adv_callback)
631        if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt):
632            expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback)
633            try:
634                event = self.central.ed.pop_event(expected_event, self.default_timeout)
635            except Empty:
636                logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
637                asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
638                return
639            discovered_services_index = event['data']['ServicesIndex']
640            log_gatt_server_uuids(self.central, discovered_services_index)
641
642        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
643
644    @test_tracker_info(uuid='7d3442c5-f71f-44ae-bd35-f2569f01b3b8')
645    def test_gatt_connect_in_quick_succession(self):
646        """Test GATT connections multiple times.
647
648        Test establishing a gatt connection between a GATT server and GATT
649        client with multiple iterations.
650
651        Steps:
652          1. Start a generic advertisement.
653          2. Start a generic scanner.
654          3. Find the advertisement and extract the mac address.
655          4. Stop the first scanner.
656          5. Create a GATT connection between the scanner and advertiser.
657          6. Disconnect the GATT connection.
658          7. Repeat steps 5 and 6 twenty times.
659
660        Expected Result:
661          Verify that a connection was established and then disconnected
662          successfully twenty times.
663
664        Returns:
665          Pass if True
666          Fail if False
667
668        TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress
669        Priority: 1
670        """
671        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
672        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
673        self.gatt_server_list.append(gatt_server)
674        mac_address, adv_callback, scan_callback = get_mac_address_of_generic_advertisement(
675            self.central, self.peripheral)
676        autoconnect = False
677        for i in range(100):
678            logging.info("Starting connection iteration {}".format(i + 1))
679            try:
680                bluetooth_gatt, gatt_callback = setup_gatt_connection(self.central, mac_address, autoconnect)
681                self.central.sl4a.bleStopBleScan(scan_callback)
682            except GattTestUtilsError as err:
683                logging.error(err)
684                asserts.fail("Failed to connect to GATT at iteration {}, error: {}".format(i + 1, err))
685                return
686            test_result = self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)
687            if not test_result:
688                logging.info("Failed to disconnect from peripheral device.")
689                asserts.fail("Failed to disconnect from peripheral device.")
690                return
691        self.adv_instances.append(adv_callback)
692
693    @test_tracker_info(uuid='148469d9-7ab0-4c08-b2e9-7e49e88da1fc')
694    def test_gatt_connect_on_path_attack(self):
695        """Test GATT connection with permission write encrypted with on-path attacker prevention
696
697        Test establishing a gatt connection between a GATT server and GATT
698        client while the GATT server's characteristic includes the property
699        write value and the permission write encrypted on-path attacker prevention
700        value. This will prompt LE pairing and then the devices will create a bond.
701
702        Steps:
703          1. Create a GATT server and server callback on the peripheral device.
704          2. Create a unique service and characteristic uuid on the peripheral.
705          3. Create a characteristic on the peripheral with these properties:
706              GattCharacteristic.PROPERTY_WRITE,
707              GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM
708          4. Create a GATT service on the peripheral.
709          5. Add the characteristic to the GATT service.
710          6. Create a GATT connection between your central and peripheral device.
711          7. From the central device, discover the peripheral's services.
712          8. Iterate the services found until you find the unique characteristic
713              created in step 3.
714          9. Once found, write a random but valid value to the characteristic.
715          10. Start pairing helpers on both devices immediately after attempting
716              to write to the characteristic.
717          11. Within 10 seconds of writing the characteristic, there should be
718              a prompt to bond the device from the peripheral. The helpers will
719              handle the UI interaction automatically. (see
720              BluetoothConnectionFacade.java bluetoothStartPairingHelper).
721          12. Verify that the two devices are bonded.
722
723        Expected Result:
724          Verify that a connection was established and the devices are bonded.
725
726        Returns:
727          Pass if True
728          Fail if False
729
730        TAGS: LE, Advertising, Filtering, Scanning, GATT, Characteristic, OnPathAttacker
731        Priority: 1
732        """
733        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
734        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
735        self.gatt_server_list.append(gatt_server)
736        service_uuid = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B"
737        test_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
738        bonded = False
739        characteristic = self.peripheral.sl4a.gattServerCreateBluetoothGattCharacteristic(
740            test_uuid, GattCharacteristic.PROPERTY_WRITE, GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM)
741        gatt_service = self.peripheral.sl4a.gattServerCreateService(service_uuid, GattServiceType.SERVICE_TYPE_PRIMARY)
742        self.peripheral.sl4a.gattServerAddCharacteristicToService(gatt_service, characteristic)
743        self.peripheral.sl4a.gattServerAddService(gatt_server, gatt_service)
744        assertThat(self._find_service_added_event(gatt_server_cb, service_uuid)).isTrue()
745        bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
746        self.bluetooth_gatt_list.append(bluetooth_gatt)
747        self.adv_instances.append(adv_callback)
748        if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt):
749            expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback)
750            try:
751                event = self.central.ed.pop_event(expected_event, self.default_timeout)
752            except Empty:
753                logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
754                asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
755                return
756            discovered_services_index = event['data']['ServicesIndex']
757        else:
758            logging.info("Failed to discover services.")
759            asserts.fail("Failed to discover services.")
760            return
761        test_value = [1, 2, 3, 4, 5, 6, 7]
762        services_count = self.central.sl4a.gattClientGetDiscoveredServicesCount(discovered_services_index)
763        for i in range(services_count):
764            characteristic_uuids = (self.central.sl4a.gattClientGetDiscoveredCharacteristicUuids(
765                discovered_services_index, i))
766            for characteristic_uuid in characteristic_uuids:
767                if characteristic_uuid == test_uuid:
768                    self.central.sl4a.bluetoothStartPairingHelper()
769                    self.peripheral.sl4a.bluetoothStartPairingHelper()
770                    self.central.sl4a.gattClientCharacteristicSetValue(bluetooth_gatt, discovered_services_index, i,
771                                                                       characteristic_uuid, test_value)
772                    self.central.sl4a.gattClientWriteCharacteristic(bluetooth_gatt, discovered_services_index, i,
773                                                                    characteristic_uuid)
774                    start_time = time.time() + self.default_timeout
775                    target_name = self.peripheral.sl4a.bluetoothGetLocalName()
776                    while time.time() < start_time and bonded == False:
777                        bonded_devices = \
778                            self.central.sl4a.bluetoothGetBondedDevices()
779                        for device in bonded_devices:
780                            if ('name' in device.keys() and device['name'] == target_name):
781                                bonded = True
782                                break
783                    bonded = False
784                    target_name = self.central.sl4a.bluetoothGetLocalName()
785                    while time.time() < start_time and bonded == False:
786                        bonded_devices = \
787                            self.peripheral.sl4a.bluetoothGetBondedDevices()
788                        for device in bonded_devices:
789                            if ('name' in device.keys() and device['name'] == target_name):
790                                bonded = True
791                                break
792
793        # Dual mode devices will establish connection over the classic transport,
794        # in order to establish bond over both transports, and do SDP. Starting
795        # disconnection before all this is finished is not safe, might lead to
796        # race conditions, i.e. bond over classic tranport shows up after LE
797        # bond is already removed.
798        time.sleep(4)
799
800        for ad in [self.central, self.peripheral]:
801            assertThat(clear_bonded_devices(ad)).isTrue()
802
803        # Necessary sleep time for entries to update unbonded state
804        time.sleep(2)
805
806        for ad in [self.central, self.peripheral]:
807            bonded_devices = ad.sl4a.bluetoothGetBondedDevices()
808            if len(bonded_devices) > 0:
809                logging.error("Failed to unbond devices: {}".format(bonded_devices))
810                asserts.fail("Failed to unbond devices: {}".format(bonded_devices))
811                return
812        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
813
814    @test_tracker_info(uuid='cc3fc361-7bf1-4ee2-9e46-4a27c88ce6a8')
815    def test_gatt_connect_get_connected_devices(self):
816        """Test GATT connections show up in getConnectedDevices
817
818        Test establishing a gatt connection between a GATT server and GATT
819        client. Verify that active connections show up using
820        BluetoothManager.getConnectedDevices API.
821
822        Steps:
823          1. Start a generic advertisement.
824          2. Start a generic scanner.
825          3. Find the advertisement and extract the mac address.
826          4. Stop the first scanner.
827          5. Create a GATT connection between the scanner and advertiser.
828          7. Verify the GATT Client has an open connection to the GATT Server.
829          8. Verify the GATT Server has an open connection to the GATT Client.
830          9. Disconnect the GATT connection.
831
832        Expected Result:
833          Verify that a connection was established, connected devices are found
834          on both the central and peripheral devices, and then disconnected
835          successfully.
836
837        Returns:
838          Pass if True
839          Fail if False
840
841        TAGS: LE, Advertising, Scanning, GATT
842        Priority: 2
843        """
844        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
845        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
846        self.gatt_server_list.append(gatt_server)
847        try:
848            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
849            self.bluetooth_gatt_list.append(bluetooth_gatt)
850        except GattTestUtilsError as err:
851            logging.error(err)
852            asserts.fail("Failed to connect to GATT, error: {}".format(err))
853            return
854        conn_cen_devices = self.central.sl4a.bluetoothGetConnectedLeDevices(BluetoothProfile.GATT)
855        conn_per_devices = self.peripheral.sl4a.bluetoothGetConnectedLeDevices(BluetoothProfile.GATT_SERVER)
856        target_name = self.peripheral.sl4a.bluetoothGetLocalName()
857        error_message = ("Connected device {} not found in list of connected "
858                         "devices {}")
859        if not any(d['name'] == target_name for d in conn_cen_devices):
860            logging.error(error_message.format(target_name, conn_cen_devices))
861            asserts.fail(error_message.format(target_name, conn_cen_devices))
862            return
863        # For the GATT server only check the size of the list since
864        # it may or may not include the device name.
865        target_name = self.central.sl4a.bluetoothGetLocalName()
866        if not conn_per_devices:
867            logging.error(error_message.format(target_name, conn_per_devices))
868            asserts.fail(error_message.format(target_name, conn_per_devices))
869            return
870        self.adv_instances.append(adv_callback)
871        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
872
873    @test_tracker_info(uuid='a0a37ca6-9fa8-4d35-9fdb-0e25b4b8a363')
874    def test_gatt_connect_second_adv_after_canceling_first_adv(self):
875        """Test GATT connection to peripherals second advertising address.
876
877        Test the ability of cancelling GATT connections and trying to reconnect
878        to the same device via a different address.
879
880        Steps:
881          1. A starts advertising
882          2. B starts scanning and finds A's mac address
883          3. Stop advertisement from step 1. Start a new advertisement on A and
884            find the new new mac address, B knows of both old and new address.
885          4. B1 sends connect request to old address of A
886          5. B1 cancel connect attempt after 10 seconds
887          6. B1 sends connect request to new address of A
888          7. Verify B1 establish connection to A in less than 10 seconds
889
890        Expected Result:
891          Verify that a connection was established only on the second
892          advertisement's mac address.
893
894        Returns:
895          Pass if True
896          Fail if False
897
898        TAGS: LE, Advertising, Scanning, GATT
899        Priority: 3
900        """
901        autoconnect = False
902        transport = GattTransport.TRANSPORT_AUTO
903        opportunistic = False
904        # Setup a basic Gatt server on the peripheral
905        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
906        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
907
908        # Set advertisement settings to include local name in advertisement
909        # and set the advertising mode to low_latency.
910        self.peripheral.sl4a.bleSetAdvertiseSettingsIsConnectable(True)
911        self.peripheral.sl4a.bleSetAdvertiseDataIncludeDeviceName(True)
912        self.peripheral.sl4a.bleSetAdvertiseSettingsAdvertiseMode(BleAdvertiseSettingsMode.LOW_LATENCY)
913
914        # Setup necessary advertisement objects.
915        advertise_data = self.peripheral.sl4a.bleBuildAdvertiseData()
916        advertise_settings = self.peripheral.sl4a.bleBuildAdvertiseSettings()
917        advertise_callback = self.peripheral.sl4a.bleGenBleAdvertiseCallback()
918
919        # Step 1: Start advertisement
920        self.peripheral.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings)
921
922        # Setup scan settings for low_latency scanning and to include the local name
923        # of the advertisement started in step 1.
924        filter_list = self.central.sl4a.bleGenFilterList()
925        self.central.sl4a.bleSetScanSettingsNumOfMatches(BleScanSettingsMatchNums.ONE)
926        self.central.sl4a.bleSetScanFilterDeviceName(self.peripheral.sl4a.bluetoothGetLocalName())
927        self.central.sl4a.bleBuildScanFilter(filter_list)
928        self.central.sl4a.bleSetScanSettingsScanMode(BleScanSettingsModes.LOW_LATENCY)
929
930        # Setup necessary scan objects.
931        scan_settings = self.central.sl4a.bleBuildScanSetting()
932        scan_callback = self.central.sl4a.bleGenScanCallback()
933
934        # Step 2: Start scanning on central Android device and find peripheral
935        # address.
936        self.central.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
937        expected_event_name = scan_result.format(scan_callback)
938        try:
939            mac_address_pre_restart = self.central.ed.pop_event(
940                expected_event_name, self.default_timeout)['data']['Result']['deviceInfo']['address']
941            logging.info("Peripheral advertisement found with mac address: {}".format(mac_address_pre_restart))
942        except Empty:
943            logging.info("Peripheral advertisement not found")
944            asserts.fail("Peripheral advertisement not found")
945            return
946        finally:
947            self.peripheral.sl4a.bleStopBleAdvertising(advertise_callback)
948
949        # Step 3: Restart peripheral advertising such that a new mac address is
950        # created.
951        self.peripheral.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings)
952
953        mac_address_post_restart = mac_address_pre_restart
954
955        while True:
956            try:
957                mac_address_post_restart = self.central.ed.pop_event(
958                    expected_event_name, self.default_timeout)['data']['Result']['deviceInfo']['address']
959                logging.info("Peripheral advertisement found with mac address: {}".format(mac_address_post_restart))
960            except Empty:
961                logging.info("Peripheral advertisement not found")
962                asserts.fail("Peripheral advertisement not found")
963                return
964
965            if mac_address_pre_restart != mac_address_post_restart:
966                break
967
968
969if __name__ == '__main__':
970    test_runner.main()
971