1#!/usr/bin/env python3 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17This test script exercises different GATT connection tests. 18 19Original location: 20 tools/test/connectivity/acts_tests/tests/google/ble/gatt/GattConnectTest.py 21""" 22 23import logging 24import time 25from queue import Empty 26 27from blueberry.tests.gd.cert.test_decorators import test_tracker_info 28from blueberry.tests.gd_sl4a.lib.bt_constants import scan_result 29from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test 30from blueberry.utils.ble_scan_adv_constants import BleAdvertiseSettingsMode 31from blueberry.utils.ble_scan_adv_constants import BleScanSettingsMatchNums 32from blueberry.utils.ble_scan_adv_constants import BleScanSettingsModes 33from blueberry.utils.bt_constants import BluetoothProfile 34from blueberry.utils.bt_gatt_constants import GattCallbackError 35from blueberry.utils.bt_gatt_constants import GattCallbackString 36from blueberry.utils.bt_gatt_constants import GattCharacteristic 37from blueberry.utils.bt_gatt_constants import GattConnectionState 38from blueberry.utils.bt_gatt_constants import GattMtuSize 39from blueberry.utils.bt_gatt_constants import GattPhyMask 40from blueberry.utils.bt_gatt_constants import GattServiceType 41from blueberry.utils.bt_gatt_constants import GattTransport 42from blueberry.utils.bt_gatt_utils import GattTestUtilsError 43from blueberry.utils.bt_gatt_utils import close_gatt_client 44from blueberry.utils.bt_gatt_utils import disconnect_gatt_connection 45from blueberry.utils.bt_gatt_utils import get_mac_address_of_generic_advertisement 46from blueberry.utils.bt_gatt_utils import log_gatt_server_uuids 47from blueberry.utils.bt_gatt_utils import orchestrate_gatt_connection 48from blueberry.utils.bt_gatt_utils import setup_gatt_connection 49from blueberry.utils.bt_gatt_utils import setup_multiple_services 50from blueberry.utils.bt_gatt_utils import wait_for_gatt_disconnect_event 51from blueberry.utils.bt_test_utils import clear_bonded_devices 52from blueberry.tests.gd.cert.truth import assertThat 53from mobly import asserts 54from mobly import test_runner 55 56PHYSICAL_DISCONNECT_TIMEOUT = 5 57 58 59class GattConnectTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass): 60 adv_instances = [] 61 bluetooth_gatt_list = [] 62 gatt_server_list = [] 63 default_timeout = 10 64 default_discovery_timeout = 3 65 66 ADDR_TYPE_PUBLIC = 0 67 ADDR_TYPE_RPA = 1 68 ADDR_TYPE_NRPA = 2 69 70 def setup_class(self): 71 super().setup_class() 72 self.central = self.dut 73 self.peripheral = self.cert 74 75 def setup_test(self): 76 super().setup_test() 77 bluetooth_gatt_list = [] 78 self.gatt_server_list = [] 79 self.adv_instances = [] 80 # Ensure there is ample time for a physical disconnect in between 81 # testcases. 82 logging.info("Waiting for {} seconds for physical GATT disconnections".format(PHYSICAL_DISCONNECT_TIMEOUT)) 83 time.sleep(PHYSICAL_DISCONNECT_TIMEOUT) 84 85 def teardown_test(self): 86 for bluetooth_gatt in self.bluetooth_gatt_list: 87 self.central.sl4a.gattClientClose(bluetooth_gatt) 88 for gatt_server in self.gatt_server_list: 89 self.peripheral.sl4a.gattServerClose(gatt_server) 90 for adv in self.adv_instances: 91 self.peripheral.sl4a.bleStopBleAdvertising(adv) 92 super().teardown_test() 93 return True 94 95 def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback): 96 logging.info("Disconnecting from peripheral device.") 97 try: 98 disconnect_gatt_connection(self.central, bluetooth_gatt, gatt_callback) 99 logging.info("Disconnected GATT, closing GATT client.") 100 close_gatt_client(self.central, bluetooth_gatt) 101 logging.info("Closed GATT client, removing it from local tracker.") 102 if bluetooth_gatt in self.bluetooth_gatt_list: 103 self.bluetooth_gatt_list.remove(bluetooth_gatt) 104 except GattTestUtilsError as err: 105 logging.error(err) 106 return False 107 return True 108 109 def _find_service_added_event(self, gatt_server_cb, uuid): 110 expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_cb) 111 try: 112 event = self.peripheral.ed.pop_event(expected_event, self.default_timeout) 113 except Empty: 114 logging.error(GattCallbackError.SERV_ADDED_ERR.format(expected_event)) 115 return False 116 if event['data']['serviceUuid'].lower() != uuid.lower(): 117 logging.error("Uuid mismatch. Found: {}, Expected {}.".format(event['data']['serviceUuid'], uuid)) 118 return False 119 return True 120 121 def _verify_mtu_changed_on_client_and_server(self, expected_mtu, gatt_callback, gatt_server_callback): 122 expected_event = GattCallbackString.MTU_CHANGED.format(gatt_callback) 123 try: 124 mtu_event = self.central.ed.pop_event(expected_event, self.default_timeout) 125 mtu_size_found = mtu_event['data']['MTU'] 126 if mtu_size_found != expected_mtu: 127 logging.error("MTU size found: {}, expected: {}".format(mtu_size_found, expected_mtu)) 128 return False 129 except Empty: 130 logging.error(GattCallbackError.MTU_CHANGED_ERR.format(expected_event)) 131 return False 132 133 expected_event = GattCallbackString.MTU_SERV_CHANGED.format(gatt_server_callback) 134 try: 135 mtu_event = self.peripheral.ed.pop_event(expected_event, self.default_timeout) 136 mtu_size_found = mtu_event['data']['MTU'] 137 if mtu_size_found != expected_mtu: 138 logging.error("MTU size found: {}, expected: {}".format(mtu_size_found, expected_mtu)) 139 return False 140 except Empty: 141 logging.error(GattCallbackError.MTU_SERV_CHANGED_ERR.format(expected_event)) 142 return False 143 return True 144 145 @test_tracker_info(uuid='8a3530a3-c8bb-466b-9710-99e694c38618') 146 def test_gatt_connect(self): 147 """Test GATT connection over LE. 148 149 Test establishing a gatt connection between a GATT server and GATT 150 client. 151 152 Steps: 153 1. Start a generic advertisement. 154 2. Start a generic scanner. 155 3. Find the advertisement and extract the mac address. 156 4. Stop the first scanner. 157 5. Create a GATT connection between the scanner and advertiser. 158 6. Disconnect the GATT connection. 159 160 Expected Result: 161 Verify that a connection was established and then disconnected 162 successfully. 163 164 Returns: 165 Pass if True 166 Fail if False 167 168 TAGS: LE, Advertising, Filtering, Scanning, GATT 169 Priority: 0 170 """ 171 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 172 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 173 self.gatt_server_list.append(gatt_server) 174 try: 175 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 176 self.bluetooth_gatt_list.append(bluetooth_gatt) 177 except GattTestUtilsError as err: 178 logging.error(err) 179 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 180 return 181 self.adv_instances.append(adv_callback) 182 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 183 184 @test_tracker_info(uuid='a839b505-03ac-4783-be7e-1d43129a1948') 185 def test_gatt_connect_stop_advertising(self): 186 """Test GATT connection over LE then stop advertising 187 188 A test case that verifies the GATT connection doesn't 189 disconnect when LE advertisement is stopped. 190 191 Steps: 192 1. Start a generic advertisement. 193 2. Start a generic scanner. 194 3. Find the advertisement and extract the mac address. 195 4. Stop the first scanner. 196 5. Create a GATT connection between the scanner and advertiser. 197 6. Stop the advertiser. 198 7. Verify no connection state changed happened. 199 8. Disconnect the GATT connection. 200 201 Expected Result: 202 Verify that a connection was established and not disconnected 203 when advertisement stops. 204 205 Returns: 206 Pass if True 207 Fail if False 208 209 TAGS: LE, Advertising, Filtering, Scanning, GATT 210 Priority: 0 211 """ 212 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 213 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 214 self.gatt_server_list.append(gatt_server) 215 try: 216 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 217 self.bluetooth_gatt_list.append(bluetooth_gatt) 218 except GattTestUtilsError as err: 219 logging.error(err) 220 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 221 return 222 self.peripheral.sl4a.bleStopBleAdvertising(adv_callback) 223 try: 224 event = self.central.ed.pop_event( 225 GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback, self.default_timeout)) 226 logging.error("Connection event found when not expected: {}".format(event)) 227 asserts.fail("Connection event found when not expected: {}".format(event)) 228 return 229 except Empty: 230 logging.info("No connection state change as expected") 231 try: 232 self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback) 233 except Exception as err: 234 logging.info("Failed to orchestrate disconnect: {}".format(err)) 235 asserts.fail("Failed to orchestrate disconnect: {}".format(err)) 236 return 237 238 @test_tracker_info(uuid='b82f91a8-54bb-4779-a117-73dc7fdb28cc') 239 def test_gatt_connect_autoconnect(self): 240 """Test GATT connection over LE. 241 242 Test re-establishing a gatt connection using autoconnect 243 set to True in order to test connection allowlist. 244 245 Steps: 246 1. Start a generic advertisement. 247 2. Start a generic scanner. 248 3. Find the advertisement and extract the mac address. 249 4. Stop the first scanner. 250 5. Create a GATT connection between the scanner and advertiser. 251 6. Disconnect the GATT connection. 252 7. Create a GATT connection with autoconnect set to True 253 8. Disconnect the GATT connection. 254 255 Expected Result: 256 Verify that a connection was re-established and then disconnected 257 successfully. 258 259 Returns: 260 Pass if True 261 Fail if False 262 263 TAGS: LE, Advertising, Filtering, Scanning, GATT 264 Priority: 0 265 """ 266 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 267 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 268 self.peripheral.log.info("Opened GATT server on CERT, scanning it from DUT") 269 self.gatt_server_list.append(gatt_server) 270 autoconnect = False 271 mac_address, adv_callback, scan_callback = (get_mac_address_of_generic_advertisement( 272 self.central, self.peripheral, self.ADDR_TYPE_PUBLIC)) 273 self.adv_instances.append(adv_callback) 274 self.central.log.info("Discovered BLE advertisement, connecting GATT with autoConnect={}".format(autoconnect)) 275 try: 276 bluetooth_gatt, gatt_callback = setup_gatt_connection(self.central, mac_address, autoconnect) 277 self.central.log.info("GATT connected, stopping BLE scanning") 278 self.central.sl4a.bleStopBleScan(scan_callback) 279 self.central.log.info("Stopped BLE scanning") 280 self.bluetooth_gatt_list.append(bluetooth_gatt) 281 except GattTestUtilsError as err: 282 logging.error(err) 283 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 284 return 285 self.central.log.info("Disconnecting GATT") 286 try: 287 disconnect_gatt_connection(self.central, bluetooth_gatt, gatt_callback) 288 self.central.log.info("GATT disconnected, closing GATT client") 289 close_gatt_client(self.central, bluetooth_gatt) 290 self.central.log.info("GATT client closed, removing it from in-memory tracker") 291 if bluetooth_gatt in self.bluetooth_gatt_list: 292 self.bluetooth_gatt_list.remove(bluetooth_gatt) 293 except GattTestUtilsError as err: 294 logging.error(err) 295 asserts.fail("Failed to disconnect GATT, error: {}".format(err)) 296 return 297 autoconnect = True 298 self.central.log.info("Connecting GATT with autoConnect={}".format(autoconnect)) 299 bluetooth_gatt = self.central.sl4a.gattClientConnectGatt(gatt_callback, mac_address, autoconnect, 300 GattTransport.TRANSPORT_LE, False, 301 GattPhyMask.PHY_LE_1M_MASK) 302 self.central.log.info("Waiting for GATt to become connected") 303 self.bluetooth_gatt_list.append(bluetooth_gatt) 304 expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback) 305 try: 306 event = self.central.ed.pop_event(expected_event, self.default_timeout) 307 self.central.log.info("Received event={}".format(event)) 308 except Empty: 309 logging.error(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event)) 310 asserts.fail(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event)) 311 return 312 found_state = event['data']['State'] 313 expected_state = GattConnectionState.STATE_CONNECTED 314 assertThat(found_state).isEqualTo(expected_state) 315 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 316 317 @test_tracker_info(uuid='e506fa50-7cd9-4bd8-938a-6b85dcfe6bc6') 318 def test_gatt_connect_opportunistic(self): 319 """Test opportunistic GATT connection over LE. 320 321 Test establishing a gatt connection between a GATT server and GATT 322 client in opportunistic mode. 323 324 Steps: 325 1. Start a generic advertisement. 326 2. Start a generic scanner. 327 3. Find the advertisement and extract the mac address. 328 4. Stop the first scanner. 329 5. Create GATT connection 1 between the scanner and advertiser normally 330 6. Create GATT connection 2 between the scanner and advertiser using 331 opportunistic mode 332 7. Disconnect GATT connection 1 333 334 Expected Result: 335 Verify GATT connection 2 automatically disconnects when GATT connection 336 1 disconnect 337 338 Returns: 339 Pass if True 340 Fail if False 341 342 TAGS: LE, Advertising, Filtering, Scanning, GATT 343 Priority: 0 344 """ 345 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 346 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 347 self.gatt_server_list.append(gatt_server) 348 mac_address, adv_callback, scan_callback = (get_mac_address_of_generic_advertisement( 349 self.central, self.peripheral)) 350 # Make GATT connection 1 351 try: 352 bluetooth_gatt_1, gatt_callback_1 = setup_gatt_connection(self.central, 353 mac_address, 354 False, 355 transport=GattTransport.TRANSPORT_AUTO, 356 opportunistic=False) 357 self.central.sl4a.bleStopBleScan(scan_callback) 358 self.adv_instances.append(adv_callback) 359 self.bluetooth_gatt_list.append(bluetooth_gatt_1) 360 except GattTestUtilsError as err: 361 logging.error(err) 362 asserts.fail("Failed to connect to GATT 1, error: {}".format(err)) 363 return 364 # Make GATT connection 2 365 try: 366 bluetooth_gatt_2, gatt_callback_2 = setup_gatt_connection(self.central, 367 mac_address, 368 False, 369 transport=GattTransport.TRANSPORT_AUTO, 370 opportunistic=True) 371 self.bluetooth_gatt_list.append(bluetooth_gatt_2) 372 except GattTestUtilsError as err: 373 logging.error(err) 374 asserts.fail("Failed to connect to GATT 2, error: {}".format(err)) 375 return 376 # Disconnect GATT connection 1 377 try: 378 disconnect_gatt_connection(self.central, bluetooth_gatt_1, gatt_callback_1) 379 close_gatt_client(self.central, bluetooth_gatt_1) 380 if bluetooth_gatt_1 in self.bluetooth_gatt_list: 381 self.bluetooth_gatt_list.remove(bluetooth_gatt_1) 382 except GattTestUtilsError as err: 383 logging.error(err) 384 asserts.fail("Failed to disconnect GATT 1, error: {}".format(err)) 385 return 386 # Confirm that GATT connection 2 also disconnects 387 wait_for_gatt_disconnect_event(self.central, gatt_callback_2) 388 close_gatt_client(self.central, bluetooth_gatt_2) 389 if bluetooth_gatt_2 in self.bluetooth_gatt_list: 390 self.bluetooth_gatt_list.remove(bluetooth_gatt_2) 391 392 @test_tracker_info(uuid='4416d483-dec3-46cb-8038-4d82620f873a') 393 def test_gatt_request_out_of_bounds_mtu(self): 394 """Test GATT connection over LE and exercise an out of bound MTU size. 395 396 Test establishing a gatt connection between a GATT server and GATT 397 client. Request an MTU size that is the MIN value minus 1. 398 399 Steps: 400 1. Start a generic advertisement. 401 2. Start a generic scanner. 402 3. Find the advertisement and extract the mac address. 403 4. Stop the first scanner. 404 5. Create a GATT connection between the scanner and advertiser. 405 6. From the scanner (client) request MTU size change to the 406 minimum value minus one. 407 7. Find the MTU changed event on the client. 408 8. Disconnect the GATT connection. 409 410 Expected Result: 411 Verify that an MTU changed event was not discovered and that 412 it didn't cause an exception when requesting an out of bounds 413 MTU. 414 415 Returns: 416 Pass if True 417 Fail if False 418 419 TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU 420 Priority: 0 421 """ 422 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 423 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 424 self.gatt_server_list.append(gatt_server) 425 try: 426 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 427 self.bluetooth_gatt_list.append(bluetooth_gatt) 428 except GattTestUtilsError as err: 429 logging.error(err) 430 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 431 return 432 self.adv_instances.append(adv_callback) 433 unexpected_mtu = GattMtuSize.MIN - 1 434 self.central.sl4a.gattClientRequestMtu(bluetooth_gatt, unexpected_mtu) 435 assertThat(self._verify_mtu_changed_on_client_and_server(unexpected_mtu, gatt_callback, 436 gatt_server_cb)).isFalse() 437 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 438 439 @test_tracker_info(uuid='31ffb9ca-cc75-43fb-9802-c19f1c5856b6') 440 def test_gatt_connect_trigger_on_read_rssi(self): 441 """Test GATT connection over LE read RSSI. 442 443 Test establishing a gatt connection between a GATT server and GATT 444 client then read the RSSI. 445 446 Steps: 447 1. Start a generic advertisement. 448 2. Start a generic scanner. 449 3. Find the advertisement and extract the mac address. 450 4. Stop the first scanner. 451 5. Create a GATT connection between the scanner and advertiser. 452 6. From the scanner, request to read the RSSI of the advertiser. 453 7. Disconnect the GATT connection. 454 455 Expected Result: 456 Verify that a connection was established and then disconnected 457 successfully. Verify that the RSSI was ready correctly. 458 459 Returns: 460 Pass if True 461 Fail if False 462 463 TAGS: LE, Advertising, Filtering, Scanning, GATT, RSSI 464 Priority: 1 465 """ 466 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 467 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 468 self.gatt_server_list.append(gatt_server) 469 try: 470 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 471 self.bluetooth_gatt_list.append(bluetooth_gatt) 472 except GattTestUtilsError as err: 473 logging.error(err) 474 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 475 return 476 self.adv_instances.append(adv_callback) 477 expected_event = GattCallbackString.RD_REMOTE_RSSI.format(gatt_callback) 478 if self.central.sl4a.gattClientReadRSSI(bluetooth_gatt): 479 try: 480 self.central.ed.pop_event(expected_event, self.default_timeout) 481 except Empty: 482 logging.error(GattCallbackError.RD_REMOTE_RSSI_ERR.format(expected_event)) 483 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 484 485 @test_tracker_info(uuid='dee9ef28-b872-428a-821b-cc62f27ba936') 486 def test_gatt_connect_trigger_on_services_discovered(self): 487 """Test GATT connection and discover services of peripheral. 488 489 Test establishing a gatt connection between a GATT server and GATT 490 client the discover all services from the connected device. 491 492 Steps: 493 1. Start a generic advertisement. 494 2. Start a generic scanner. 495 3. Find the advertisement and extract the mac address. 496 4. Stop the first scanner. 497 5. Create a GATT connection between the scanner and advertiser. 498 6. From the scanner (central device), discover services. 499 7. Disconnect the GATT connection. 500 501 Expected Result: 502 Verify that a connection was established and then disconnected 503 successfully. Verify that the service were discovered. 504 505 Returns: 506 Pass if True 507 Fail if False 508 509 TAGS: LE, Advertising, Filtering, Scanning, GATT, Services 510 Priority: 1 511 """ 512 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 513 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 514 self.gatt_server_list.append(gatt_server) 515 try: 516 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 517 self.bluetooth_gatt_list.append(bluetooth_gatt) 518 except GattTestUtilsError as err: 519 logging.error(err) 520 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 521 return 522 self.adv_instances.append(adv_callback) 523 if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt): 524 expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback) 525 try: 526 event = self.central.ed.pop_event(expected_event, self.default_timeout) 527 except Empty: 528 logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 529 asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 530 return 531 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 532 533 @test_tracker_info(uuid='01883bdd-0cf8-48fb-bf15-467bbd4f065b') 534 def test_gatt_connect_trigger_on_services_discovered_iterate_attributes(self): 535 """Test GATT connection and iterate peripherals attributes. 536 537 Test establishing a gatt connection between a GATT server and GATT 538 client and iterate over all the characteristics and descriptors of the 539 discovered services. 540 541 Steps: 542 1. Start a generic advertisement. 543 2. Start a generic scanner. 544 3. Find the advertisement and extract the mac address. 545 4. Stop the first scanner. 546 5. Create a GATT connection between the scanner and advertiser. 547 6. From the scanner (central device), discover services. 548 7. Iterate over all the characteristics and descriptors of the 549 discovered features. 550 8. Disconnect the GATT connection. 551 552 Expected Result: 553 Verify that a connection was established and then disconnected 554 successfully. Verify that the services, characteristics, and descriptors 555 were discovered. 556 557 Returns: 558 Pass if True 559 Fail if False 560 561 TAGS: LE, Advertising, Filtering, Scanning, GATT, Services 562 Characteristics, Descriptors 563 Priority: 1 564 """ 565 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 566 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 567 self.gatt_server_list.append(gatt_server) 568 try: 569 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 570 self.bluetooth_gatt_list.append(bluetooth_gatt) 571 except GattTestUtilsError as err: 572 logging.error(err) 573 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 574 return 575 self.adv_instances.append(adv_callback) 576 if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt): 577 expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback) 578 try: 579 event = self.central.ed.pop_event(expected_event, self.default_timeout) 580 discovered_services_index = event['data']['ServicesIndex'] 581 except Empty: 582 logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 583 asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 584 return 585 log_gatt_server_uuids(self.central, discovered_services_index) 586 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 587 588 @test_tracker_info(uuid='d4277bee-da99-4f48-8a4d-f81b5389da18') 589 def test_gatt_connect_with_service_uuid_variations(self): 590 """Test GATT connection with multiple service uuids. 591 592 Test establishing a gatt connection between a GATT server and GATT 593 client with multiple service uuid variations. 594 595 Steps: 596 1. Start a generic advertisement. 597 2. Start a generic scanner. 598 3. Find the advertisement and extract the mac address. 599 4. Stop the first scanner. 600 5. Create a GATT connection between the scanner and advertiser. 601 6. From the scanner (central device), discover services. 602 7. Verify that all the service uuid variations are found. 603 8. Disconnect the GATT connection. 604 605 Expected Result: 606 Verify that a connection was established and then disconnected 607 successfully. Verify that the service uuid variations are found. 608 609 Returns: 610 Pass if True 611 Fail if False 612 613 TAGS: LE, Advertising, Filtering, Scanning, GATT, Services 614 Priority: 2 615 """ 616 try: 617 gatt_server_cb, gatt_server = setup_multiple_services(self.peripheral) 618 self.gatt_server_list.append(gatt_server) 619 except GattTestUtilsError as err: 620 logging.error(err) 621 asserts.fail("Failed to setup GATT service, error: {}".format(err)) 622 return 623 try: 624 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 625 self.bluetooth_gatt_list.append(bluetooth_gatt) 626 except GattTestUtilsError as err: 627 logging.error(err) 628 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 629 return 630 self.adv_instances.append(adv_callback) 631 if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt): 632 expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback) 633 try: 634 event = self.central.ed.pop_event(expected_event, self.default_timeout) 635 except Empty: 636 logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 637 asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 638 return 639 discovered_services_index = event['data']['ServicesIndex'] 640 log_gatt_server_uuids(self.central, discovered_services_index) 641 642 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 643 644 @test_tracker_info(uuid='7d3442c5-f71f-44ae-bd35-f2569f01b3b8') 645 def test_gatt_connect_in_quick_succession(self): 646 """Test GATT connections multiple times. 647 648 Test establishing a gatt connection between a GATT server and GATT 649 client with multiple iterations. 650 651 Steps: 652 1. Start a generic advertisement. 653 2. Start a generic scanner. 654 3. Find the advertisement and extract the mac address. 655 4. Stop the first scanner. 656 5. Create a GATT connection between the scanner and advertiser. 657 6. Disconnect the GATT connection. 658 7. Repeat steps 5 and 6 twenty times. 659 660 Expected Result: 661 Verify that a connection was established and then disconnected 662 successfully twenty times. 663 664 Returns: 665 Pass if True 666 Fail if False 667 668 TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress 669 Priority: 1 670 """ 671 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 672 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 673 self.gatt_server_list.append(gatt_server) 674 mac_address, adv_callback, scan_callback = get_mac_address_of_generic_advertisement( 675 self.central, self.peripheral) 676 autoconnect = False 677 for i in range(100): 678 logging.info("Starting connection iteration {}".format(i + 1)) 679 try: 680 bluetooth_gatt, gatt_callback = setup_gatt_connection(self.central, mac_address, autoconnect) 681 self.central.sl4a.bleStopBleScan(scan_callback) 682 except GattTestUtilsError as err: 683 logging.error(err) 684 asserts.fail("Failed to connect to GATT at iteration {}, error: {}".format(i + 1, err)) 685 return 686 test_result = self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback) 687 if not test_result: 688 logging.info("Failed to disconnect from peripheral device.") 689 asserts.fail("Failed to disconnect from peripheral device.") 690 return 691 self.adv_instances.append(adv_callback) 692 693 @test_tracker_info(uuid='148469d9-7ab0-4c08-b2e9-7e49e88da1fc') 694 def test_gatt_connect_on_path_attack(self): 695 """Test GATT connection with permission write encrypted with on-path attacker prevention 696 697 Test establishing a gatt connection between a GATT server and GATT 698 client while the GATT server's characteristic includes the property 699 write value and the permission write encrypted on-path attacker prevention 700 value. This will prompt LE pairing and then the devices will create a bond. 701 702 Steps: 703 1. Create a GATT server and server callback on the peripheral device. 704 2. Create a unique service and characteristic uuid on the peripheral. 705 3. Create a characteristic on the peripheral with these properties: 706 GattCharacteristic.PROPERTY_WRITE, 707 GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM 708 4. Create a GATT service on the peripheral. 709 5. Add the characteristic to the GATT service. 710 6. Create a GATT connection between your central and peripheral device. 711 7. From the central device, discover the peripheral's services. 712 8. Iterate the services found until you find the unique characteristic 713 created in step 3. 714 9. Once found, write a random but valid value to the characteristic. 715 10. Start pairing helpers on both devices immediately after attempting 716 to write to the characteristic. 717 11. Within 10 seconds of writing the characteristic, there should be 718 a prompt to bond the device from the peripheral. The helpers will 719 handle the UI interaction automatically. (see 720 BluetoothConnectionFacade.java bluetoothStartPairingHelper). 721 12. Verify that the two devices are bonded. 722 723 Expected Result: 724 Verify that a connection was established and the devices are bonded. 725 726 Returns: 727 Pass if True 728 Fail if False 729 730 TAGS: LE, Advertising, Filtering, Scanning, GATT, Characteristic, OnPathAttacker 731 Priority: 1 732 """ 733 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 734 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 735 self.gatt_server_list.append(gatt_server) 736 service_uuid = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B" 737 test_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" 738 bonded = False 739 characteristic = self.peripheral.sl4a.gattServerCreateBluetoothGattCharacteristic( 740 test_uuid, GattCharacteristic.PROPERTY_WRITE, GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM) 741 gatt_service = self.peripheral.sl4a.gattServerCreateService(service_uuid, GattServiceType.SERVICE_TYPE_PRIMARY) 742 self.peripheral.sl4a.gattServerAddCharacteristicToService(gatt_service, characteristic) 743 self.peripheral.sl4a.gattServerAddService(gatt_server, gatt_service) 744 assertThat(self._find_service_added_event(gatt_server_cb, service_uuid)).isTrue() 745 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 746 self.bluetooth_gatt_list.append(bluetooth_gatt) 747 self.adv_instances.append(adv_callback) 748 if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt): 749 expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback) 750 try: 751 event = self.central.ed.pop_event(expected_event, self.default_timeout) 752 except Empty: 753 logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 754 asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 755 return 756 discovered_services_index = event['data']['ServicesIndex'] 757 else: 758 logging.info("Failed to discover services.") 759 asserts.fail("Failed to discover services.") 760 return 761 test_value = [1, 2, 3, 4, 5, 6, 7] 762 services_count = self.central.sl4a.gattClientGetDiscoveredServicesCount(discovered_services_index) 763 for i in range(services_count): 764 characteristic_uuids = (self.central.sl4a.gattClientGetDiscoveredCharacteristicUuids( 765 discovered_services_index, i)) 766 for characteristic_uuid in characteristic_uuids: 767 if characteristic_uuid == test_uuid: 768 self.central.sl4a.bluetoothStartPairingHelper() 769 self.peripheral.sl4a.bluetoothStartPairingHelper() 770 self.central.sl4a.gattClientCharacteristicSetValue(bluetooth_gatt, discovered_services_index, i, 771 characteristic_uuid, test_value) 772 self.central.sl4a.gattClientWriteCharacteristic(bluetooth_gatt, discovered_services_index, i, 773 characteristic_uuid) 774 start_time = time.time() + self.default_timeout 775 target_name = self.peripheral.sl4a.bluetoothGetLocalName() 776 while time.time() < start_time and bonded == False: 777 bonded_devices = \ 778 self.central.sl4a.bluetoothGetBondedDevices() 779 for device in bonded_devices: 780 if ('name' in device.keys() and device['name'] == target_name): 781 bonded = True 782 break 783 bonded = False 784 target_name = self.central.sl4a.bluetoothGetLocalName() 785 while time.time() < start_time and bonded == False: 786 bonded_devices = \ 787 self.peripheral.sl4a.bluetoothGetBondedDevices() 788 for device in bonded_devices: 789 if ('name' in device.keys() and device['name'] == target_name): 790 bonded = True 791 break 792 793 # Dual mode devices will establish connection over the classic transport, 794 # in order to establish bond over both transports, and do SDP. Starting 795 # disconnection before all this is finished is not safe, might lead to 796 # race conditions, i.e. bond over classic tranport shows up after LE 797 # bond is already removed. 798 time.sleep(4) 799 800 for ad in [self.central, self.peripheral]: 801 assertThat(clear_bonded_devices(ad)).isTrue() 802 803 # Necessary sleep time for entries to update unbonded state 804 time.sleep(2) 805 806 for ad in [self.central, self.peripheral]: 807 bonded_devices = ad.sl4a.bluetoothGetBondedDevices() 808 if len(bonded_devices) > 0: 809 logging.error("Failed to unbond devices: {}".format(bonded_devices)) 810 asserts.fail("Failed to unbond devices: {}".format(bonded_devices)) 811 return 812 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 813 814 @test_tracker_info(uuid='cc3fc361-7bf1-4ee2-9e46-4a27c88ce6a8') 815 def test_gatt_connect_get_connected_devices(self): 816 """Test GATT connections show up in getConnectedDevices 817 818 Test establishing a gatt connection between a GATT server and GATT 819 client. Verify that active connections show up using 820 BluetoothManager.getConnectedDevices API. 821 822 Steps: 823 1. Start a generic advertisement. 824 2. Start a generic scanner. 825 3. Find the advertisement and extract the mac address. 826 4. Stop the first scanner. 827 5. Create a GATT connection between the scanner and advertiser. 828 7. Verify the GATT Client has an open connection to the GATT Server. 829 8. Verify the GATT Server has an open connection to the GATT Client. 830 9. Disconnect the GATT connection. 831 832 Expected Result: 833 Verify that a connection was established, connected devices are found 834 on both the central and peripheral devices, and then disconnected 835 successfully. 836 837 Returns: 838 Pass if True 839 Fail if False 840 841 TAGS: LE, Advertising, Scanning, GATT 842 Priority: 2 843 """ 844 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 845 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 846 self.gatt_server_list.append(gatt_server) 847 try: 848 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 849 self.bluetooth_gatt_list.append(bluetooth_gatt) 850 except GattTestUtilsError as err: 851 logging.error(err) 852 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 853 return 854 conn_cen_devices = self.central.sl4a.bluetoothGetConnectedLeDevices(BluetoothProfile.GATT) 855 conn_per_devices = self.peripheral.sl4a.bluetoothGetConnectedLeDevices(BluetoothProfile.GATT_SERVER) 856 target_name = self.peripheral.sl4a.bluetoothGetLocalName() 857 error_message = ("Connected device {} not found in list of connected " 858 "devices {}") 859 if not any(d['name'] == target_name for d in conn_cen_devices): 860 logging.error(error_message.format(target_name, conn_cen_devices)) 861 asserts.fail(error_message.format(target_name, conn_cen_devices)) 862 return 863 # For the GATT server only check the size of the list since 864 # it may or may not include the device name. 865 target_name = self.central.sl4a.bluetoothGetLocalName() 866 if not conn_per_devices: 867 logging.error(error_message.format(target_name, conn_per_devices)) 868 asserts.fail(error_message.format(target_name, conn_per_devices)) 869 return 870 self.adv_instances.append(adv_callback) 871 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 872 873 @test_tracker_info(uuid='a0a37ca6-9fa8-4d35-9fdb-0e25b4b8a363') 874 def test_gatt_connect_second_adv_after_canceling_first_adv(self): 875 """Test GATT connection to peripherals second advertising address. 876 877 Test the ability of cancelling GATT connections and trying to reconnect 878 to the same device via a different address. 879 880 Steps: 881 1. A starts advertising 882 2. B starts scanning and finds A's mac address 883 3. Stop advertisement from step 1. Start a new advertisement on A and 884 find the new new mac address, B knows of both old and new address. 885 4. B1 sends connect request to old address of A 886 5. B1 cancel connect attempt after 10 seconds 887 6. B1 sends connect request to new address of A 888 7. Verify B1 establish connection to A in less than 10 seconds 889 890 Expected Result: 891 Verify that a connection was established only on the second 892 advertisement's mac address. 893 894 Returns: 895 Pass if True 896 Fail if False 897 898 TAGS: LE, Advertising, Scanning, GATT 899 Priority: 3 900 """ 901 autoconnect = False 902 transport = GattTransport.TRANSPORT_AUTO 903 opportunistic = False 904 # Setup a basic Gatt server on the peripheral 905 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 906 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 907 908 # Set advertisement settings to include local name in advertisement 909 # and set the advertising mode to low_latency. 910 self.peripheral.sl4a.bleSetAdvertiseSettingsIsConnectable(True) 911 self.peripheral.sl4a.bleSetAdvertiseDataIncludeDeviceName(True) 912 self.peripheral.sl4a.bleSetAdvertiseSettingsAdvertiseMode(BleAdvertiseSettingsMode.LOW_LATENCY) 913 914 # Setup necessary advertisement objects. 915 advertise_data = self.peripheral.sl4a.bleBuildAdvertiseData() 916 advertise_settings = self.peripheral.sl4a.bleBuildAdvertiseSettings() 917 advertise_callback = self.peripheral.sl4a.bleGenBleAdvertiseCallback() 918 919 # Step 1: Start advertisement 920 self.peripheral.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) 921 922 # Setup scan settings for low_latency scanning and to include the local name 923 # of the advertisement started in step 1. 924 filter_list = self.central.sl4a.bleGenFilterList() 925 self.central.sl4a.bleSetScanSettingsNumOfMatches(BleScanSettingsMatchNums.ONE) 926 self.central.sl4a.bleSetScanFilterDeviceName(self.peripheral.sl4a.bluetoothGetLocalName()) 927 self.central.sl4a.bleBuildScanFilter(filter_list) 928 self.central.sl4a.bleSetScanSettingsScanMode(BleScanSettingsModes.LOW_LATENCY) 929 930 # Setup necessary scan objects. 931 scan_settings = self.central.sl4a.bleBuildScanSetting() 932 scan_callback = self.central.sl4a.bleGenScanCallback() 933 934 # Step 2: Start scanning on central Android device and find peripheral 935 # address. 936 self.central.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 937 expected_event_name = scan_result.format(scan_callback) 938 try: 939 mac_address_pre_restart = self.central.ed.pop_event( 940 expected_event_name, self.default_timeout)['data']['Result']['deviceInfo']['address'] 941 logging.info("Peripheral advertisement found with mac address: {}".format(mac_address_pre_restart)) 942 except Empty: 943 logging.info("Peripheral advertisement not found") 944 asserts.fail("Peripheral advertisement not found") 945 return 946 finally: 947 self.peripheral.sl4a.bleStopBleAdvertising(advertise_callback) 948 949 # Step 3: Restart peripheral advertising such that a new mac address is 950 # created. 951 self.peripheral.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) 952 953 mac_address_post_restart = mac_address_pre_restart 954 955 while True: 956 try: 957 mac_address_post_restart = self.central.ed.pop_event( 958 expected_event_name, self.default_timeout)['data']['Result']['deviceInfo']['address'] 959 logging.info("Peripheral advertisement found with mac address: {}".format(mac_address_post_restart)) 960 except Empty: 961 logging.info("Peripheral advertisement not found") 962 asserts.fail("Peripheral advertisement not found") 963 return 964 965 if mac_address_pre_restart != mac_address_post_restart: 966 break 967 968 969if __name__ == '__main__': 970 test_runner.main() 971