"""BLE tests."""

import time

from mobly import asserts
from mobly import test_runner
from mobly import utils
from utils import android_base_test

# Number of seconds for the target to stay BLE advertising.
ADVERTISING_TIME = 120
# The number of seconds to wait for receiving scan results.
SCAN_TIMEOUT = 5
# The number of seconds to wait for connection established.
CONNECTION_TIMEOUT = 20
# The number of seconds to wait before cancel connection.
CANCEL_CONNECTION_WAIT_TIME = 0.1
# UUID for test service.
TEST_BLE_SERVICE_UUID = '0000fe23-0000-1000-8000-00805f9b34fb'
# UUID for write characteristic.
TEST_WRITE_UUID = '0000e632-0000-1000-8000-00805f9b34fb'
# UUID for second write characteristic.
TEST_SECOND_WRITE_UUID = '0000e633-0000-1000-8000-00805f9b34fb'
# UUID for read test.
TEST_READ_UUID = '0000e631-0000-1000-8000-00805f9b34fb'
# UUID for second read characteristic.
TEST_SECOND_READ_UUID = '0000e634-0000-1000-8000-00805f9b34fb'
# UUID for third read characteristic.
TEST_THIRD_READ_UUID = '0000e635-0000-1000-8000-00805f9b34fb'
# UUID for scan response.
TEST_SCAN_RESPONSE_UUID = '0000e639-0000-1000-8000-00805f9b34fb'
# Advertise settings in json format for Ble Advertise.
# 'Timeout' is ADVERTISING_TIME (seconds) scaled by 1000 -- presumably the
# snippet expects milliseconds; confirm against the snippet implementation.
ADVERTISE_SETTINGS = {
    'AdvertiseMode': 'ADVERTISE_MODE_LOW_LATENCY',
    'Timeout': ADVERTISING_TIME * 1000,
    'Connectable': True,
    'TxPowerLevel': 'ADVERTISE_TX_POWER_ULTRA_LOW',
}
# Random data to represent device stored in advertise data.
DATA = utils.rand_ascii_str(16)
# Random data for scan response.
SCAN_RESPONSE_DATA = utils.rand_ascii_str(16)
# Random data for read operation.
READ_DATA = utils.rand_ascii_str(8)
# Random data for second read operation.
SECOND_READ_DATA = utils.rand_ascii_str(8)
# Random data for third read operation.
THIRD_READ_DATA = utils.rand_ascii_str(8)
# Random data for write operation.
WRITE_DATA = utils.rand_ascii_str(8)
# Random data for second write operation.
SECOND_WRITE_DATA = utils.rand_ascii_str(8)
# Advertise data in json format for BLE advertise.
ADVERTISE_DATA = {
    'IncludeDeviceName': False,
    'ServiceData': [{
        'UUID': TEST_BLE_SERVICE_UUID,
        'Data': DATA,
    }],
}
# Advertise data in json format representing scan response for BLE advertise.
# The key is spelled 'IncludeDeviceName', matching ADVERTISE_DATA, so the
# setting is sent under the same name in both payloads.
SCAN_RESPONSE = {
    'IncludeDeviceName': False,
    'ServiceData': [{
        'UUID': TEST_SCAN_RESPONSE_UUID,
        'Data': SCAN_RESPONSE_DATA,
    }],
}
# Scan filter in json format for BLE scan.
SCAN_FILTER = {'ServiceUuid': TEST_BLE_SERVICE_UUID}
# Scan settings in json format for BLE scan.
SCAN_SETTINGS = {'ScanMode': 'SCAN_MODE_LOW_LATENCY'}
# Characteristics for write in json format.
WRITE_CHARACTERISTIC = {
    'UUID': TEST_WRITE_UUID,
    'Property': 'PROPERTY_WRITE',
    'Permission': 'PERMISSION_WRITE',
}
# Second characteristic for write in json format.
SECOND_WRITE_CHARACTERISTIC = {
    'UUID': TEST_SECOND_WRITE_UUID,
    'Property': 'PROPERTY_WRITE',
    'Permission': 'PERMISSION_WRITE',
}
# Characteristics for read in json format.
READ_CHARACTERISTIC = {
    'UUID': TEST_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': READ_DATA,
}
# Second characteristic for read in json format.
SECOND_READ_CHARACTERISTIC = {
    'UUID': TEST_SECOND_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': SECOND_READ_DATA,
}
# Third characteristic for read in json format.
THIRD_READ_CHARACTERISTIC = {
    'UUID': TEST_THIRD_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': THIRD_READ_DATA,
}
# Service data in json format for Ble Server.
SERVICE = {
    'UUID': TEST_BLE_SERVICE_UUID,
    'Type': 'SERVICE_TYPE_PRIMARY',
    'Characteristics': [
        WRITE_CHARACTERISTIC,
        SECOND_WRITE_CHARACTERISTIC,
        READ_CHARACTERISTIC,
        SECOND_READ_CHARACTERISTIC,
        THIRD_READ_CHARACTERISTIC,
    ],
}
# Macros for literal string.
UUID = 'UUID'
GATT_SUCCESS = 'GATT_SUCCESS'
STATE = 'newState'
STATUS = 'status'


def IsRequiredScanResult(scan_result):
  """Tells whether a scan event carries the advertiser's service data.

  Args:
    scan_result: An 'onScanResult' event whose data['result'] holds the
      scan record.

  Returns:
    True if any service entry in the scan record has the test service UUID
    together with the advertised DATA payload, False otherwise.
  """
  services = scan_result.data['result']['ScanRecord']['Services']
  return any(
      entry[UUID] == TEST_BLE_SERVICE_UUID and entry['Data'] == DATA
      for entry in services)


def Discover(scanner, advertiser):
  """Runs BLE advertising on one device and scanning on the other.

  Args:
    scanner: AndroidDevice. The device that scans for the target.
    advertiser: AndroidDevice. The device that advertises so that the
      scanner can find it.

  Steps:
    1. Advertiser starts advertising and gets a startSuccess callback.
    2. Scanner starts scanning and finds advertiser from scan results.

  Verifies:
    The scanner receives a scan result holding both the advertised service
    data and the scan response data. On success the advertiser's address is
    stored in scanner.connect_to_address.
  """
  # Both operations are kicked off before waiting on either callback.
  advertiser.advertise_callback = advertiser.android.bleStartAdvertising(
      ADVERTISE_SETTINGS, ADVERTISE_DATA, SCAN_RESPONSE)
  scanner.scan_callback = scanner.android.bleStartScan(
      [SCAN_FILTER], SCAN_SETTINGS)
  advertiser.advertise_callback.waitAndGet('onStartSuccess', 30)
  advertiser.log.info('BLE advertising started')

  # Give the scanner time to collect results before querying the callback.
  time.sleep(SCAN_TIMEOUT)
  matching_event = scanner.scan_callback.waitForEvent(
      'onScanResult', IsRequiredScanResult, SCAN_TIMEOUT)
  scan_data = matching_event.data['result']

  advertiser_found = False
  response_found = False
  for entry in scan_data['ScanRecord']['Services']:
    entry_uuid = entry[UUID]
    if entry_uuid == TEST_BLE_SERVICE_UUID and entry['Data'] == DATA:
      # Remember where to connect for the GATT steps that follow.
      scanner.connect_to_address = scan_data['Device']['Address']
      advertiser_found = True
    if (entry_uuid == TEST_SCAN_RESPONSE_UUID and
        entry['Data'] == SCAN_RESPONSE_DATA):
      response_found = True

  asserts.assert_true(
      advertiser_found,
      'Advertiser is not found inside %d seconds' % SCAN_TIMEOUT)
  asserts.assert_true(response_found, 'Scan response is not found')
  scanner.log.info('Advertiser is found')


def StopDiscover(scanner, advertiser):
  """Logic for stopping BLE scan and advertise.

  Args:
    scanner: AndroidDevice. The device that starts ble scan to find target.
    advertiser: AndroidDevice. The device that keeps advertising so other
      devices acknowledge it.

  Steps:
    1. Scanner stops scanning.
    2. Advertiser stops advertising.
  """
  scanner.android.bleStopScan(scanner.scan_callback.callback_id)
  scanner.log.info('BLE scanning stopped')
  advertiser.android.bleStopAdvertising(
      advertiser.advertise_callback.callback_id)
  advertiser.log.info('BLE advertising stopped')


def _AssertServiceCharacteristics(characteristics):
  """Asserts every characteristic UUID declared in SERVICE is present.

  Args:
    characteristics: list of dicts, each holding a 'UUID' entry, as reported
      by the device for the test service.
  """
  reported_uuids = [characteristic[UUID] for characteristic in characteristics]
  for expected in SERVICE['Characteristics']:
    uuid = expected[UUID]
    asserts.assert_true(uuid in reported_uuids,
                        'Failed to find uuid %s.' % uuid)


def _StartServer(server):
  """Starts the GATT server with SERVICE and verifies it was added.

  Stores the server callback handler in server.server_callback.

  Args:
    server: AndroidDevice. The device that behaves as GATT server.
  """
  server.server_callback = server.android.bleStartServer([SERVICE])
  start_server_result = server.server_callback.waitAndGet('onServiceAdded', 30)
  asserts.assert_equal(start_server_result.data[STATUS], GATT_SUCCESS)
  _AssertServiceCharacteristics(
      start_server_result.data['Service']['Characteristics'])
  server.log.info('BLE server started')


def _ConnectGatt(client, wait_seconds):
  """Starts a GATT connection and collects the resulting state changes.

  Stores the new client callback handler in client.client_callback.

  Args:
    client: AndroidDevice. The device that behaves as GATT client; its
      connect_to_address attribute must already be set.
    wait_seconds: number of seconds to sleep before collecting events.

  Returns:
    All 'onConnectionStateChange' events received after the wait.
  """
  client.client_callback = client.android.bleConnectGatt(
      client.connect_to_address)
  # A fixed sleep (rather than waiting for the first event) lets the caller
  # detect duplicate callbacks.
  time.sleep(wait_seconds)
  return client.client_callback.getAll('onConnectionStateChange')


def _AssertConnectedExactlyOnce(client, connection_events):
  """Asserts a single successful STATE_CONNECTED event was received.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    connection_events: list of 'onConnectionStateChange' events.
  """
  asserts.assert_equal(len(connection_events), 1)
  for connection_event in connection_events:
    asserts.assert_equal(connection_event.data[STATUS], GATT_SUCCESS)
    asserts.assert_equal(connection_event.data[STATE], 'STATE_CONNECTED')
  client.log.info('BLE client connected')


def Connect(client, server):
  """Logic for starting BLE client and server.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Server starts and service added properly.
    2. Client connects to server via Gatt, connection completes with
    GATT_SUCCESS within some TIMEOUT, onConnectionStateChange/STATE_CONNECTED
    called EXACTLY once.

  Verifies:
    Client gets corresponding callback.
  """
  _StartServer(server)
  _AssertConnectedExactlyOnce(client, _ConnectGatt(client, CONNECTION_TIMEOUT))


def CancelOpen(client, server):
  """Logic for BLE client cancel open and reconnect.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Server starts and service added properly.
    2. Client starts to connect to server via Gatt, but the connection has not
    been established.
    3. Client calls disconnect to cancel the connection.
    4. Client connects to server, connection completes with GATT_SUCCESS within
    some TIMEOUT, onConnectionStateChange/STATE_CONNECTED called EXACTLY once.

  Verifies:
    Client gets corresponding callback.
  """
  _StartServer(server)
  start_client_results = _ConnectGatt(client, CANCEL_CONNECTION_WAIT_TIME)
  if not start_client_results:
    # The connection is still pending: cancel it and connect again. If it
    # already completed within the short wait there is nothing to cancel, and
    # the events collected so far are verified as-is.
    client.android.bleDisconnect()
    client.log.info('BLE client cancel open')
    time.sleep(CANCEL_CONNECTION_WAIT_TIME)
    start_client_results = _ConnectGatt(client, CONNECTION_TIMEOUT)
  _AssertConnectedExactlyOnce(client, start_client_results)


def Disconnect(client, server):
  """Logic for stopping BLE client and server.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Client calls disconnect, gets a callback with STATE_DISCONNECTED
    and GATT_SUCCESS.
    2. Server closes.

  Verifies:
    Client gets corresponding callback.
  """
  client.android.bleDisconnect()
  stop_client_result = client.client_callback.waitAndGet(
      'onConnectionStateChange', 30)
  asserts.assert_equal(stop_client_result.data[STATUS], GATT_SUCCESS)
  asserts.assert_equal(stop_client_result.data[STATE], 'STATE_DISCONNECTED')
  client.log.info('BLE client disconnected')
  server.android.bleStopServer()
  server.log.info('BLE server stopped')


def DiscoverServices(client):
  """Logic for BLE services discovery.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.

  Steps:
    1. Client successfully completes service discovery & gets
    onServicesDiscovered callback within some TIMEOUT, onServicesDiscovered/
    GATT_SUCCESS called EXACTLY once.
    2. Client discovers the readable and writable characteristics.

  Verifies:
    Client gets corresponding callback.
  """
  client.android.bleDiscoverServices()
  # Sleep for the full timeout so that duplicate callbacks would be noticed.
  time.sleep(CONNECTION_TIMEOUT)
  discover_services_results = client.client_callback.getAll(
      'onServiceDiscovered')
  asserts.assert_equal(len(discover_services_results), 1)
  discovery_data = discover_services_results[0].data
  asserts.assert_equal(discovery_data[STATUS], GATT_SUCCESS)
  service_discovered = False
  for service in discovery_data['Services']:
    # Only the test service is inspected; other services are ignored.
    if service[UUID] == TEST_BLE_SERVICE_UUID:
      service_discovered = True
      _AssertServiceCharacteristics(service['Characteristics'])
  asserts.assert_true(service_discovered,
                      'Failed to discover the customize service')
  client.log.info('BLE discover services finished')


def ReadCharacteristic(client):
  """Logic for BLE characteristic read.

  Reads the three readable test characteristics one after another.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.

  Steps:
    1. Client reads a characteristic from server & gets true.
    2. Server calls sendResponse & client gets onCharacteristicRead.

  Verifies:
    Client gets corresponding callback.
  """
  read_cases = (
      (TEST_READ_UUID, READ_DATA, 'Read operation finished'),
      (TEST_SECOND_READ_UUID, SECOND_READ_DATA,
       'Second read operation finished'),
      (TEST_THIRD_READ_UUID, THIRD_READ_DATA, 'Third read operation finished'),
  )
  for characteristic_uuid, expected_data, finished_message in read_cases:
    read_started = client.android.bleReadOperation(TEST_BLE_SERVICE_UUID,
                                                   characteristic_uuid)
    asserts.assert_true(read_started, 'BLE read operation failed to start')
    read_operation_result = client.client_callback.waitAndGet(
        'onCharacteristicRead', 30)
    asserts.assert_equal(read_operation_result.data[STATUS], GATT_SUCCESS)
    asserts.assert_equal(read_operation_result.data['Data'], expected_data)
    client.log.info(finished_message)


def WriteCharacteristic(client, server):
  """Logic for BLE characteristic write.

  Writes the two writable test characteristics one after another.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Client writes a characteristic to server & gets true.
    2. Server calls sendResponse & client gets onCharacteristicWrite.

  Verifies:
    Client gets corresponding callback.
  """
  write_cases = (
      (TEST_WRITE_UUID, WRITE_DATA, 'Write operation finished'),
      (TEST_SECOND_WRITE_UUID, SECOND_WRITE_DATA,
       'Second write operation finished'),
  )
  for characteristic_uuid, payload, finished_message in write_cases:
    write_started = client.android.bleWriteOperation(
        TEST_BLE_SERVICE_UUID, characteristic_uuid, payload)
    asserts.assert_true(write_started, 'BLE write operation failed to start')
    # The server must have received exactly the payload the client sent.
    server_write_operation_result = server.server_callback.waitAndGet(
        'onCharacteristicWriteRequest', 30)
    asserts.assert_equal(server_write_operation_result.data['Data'], payload)
    client.client_callback.waitAndGet('onCharacteristicWrite', 30)
    client.log.info(finished_message)


def ReliableWrite(client, server):
  """Logic for BLE reliable write.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Client calls beginReliableWrite & gets true.
    2. Client writes a characteristic to server & gets true.
    3. Server calls sendResponse & client gets onCharacteristicWrite.
    4. Client calls executeReliableWrite & gets true
    5. Server calls sendResponse & client gets onReliableWriteCompleted.

  Verifies:
    Client get corresponding callbacks.
  """
  begin_reliable_write_result = client.android.bleBeginReliableWrite()
  asserts.assert_true(begin_reliable_write_result,
                      'BLE reliable write failed to start')
  client.log.info('BLE reliable write started')
  WriteCharacteristic(client, server)
  execute_reliable_write_result = client.android.bleExecuteReliableWrite()
  asserts.assert_true(execute_reliable_write_result,
                      'BLE reliable write failed to execute')
  client.log.info('BLE reliable write execute started')
  client.client_callback.waitAndGet('onReliableWriteCompleted', 30)
  client.log.info('BLE reliable write finished')


class BleTest(android_base_test.AndroidBaseTest):
  """BLE tests."""

  def setup_class(self):
    super(BleTest, self).setup_class()
    self.initiator = self.dut_a
    self.receiver = self.dut_b
    # Sets the tag that represents this device in logs.
    # The device used to scan BLE devices and behave as a GATT client.
    self.initiator.debug_tag = 'initiator'
    # The device used to BLE advertise and behave as a GATT server.
    self.receiver.debug_tag = 'receiver'

  def setup_test(self):
    # Make sure bluetooth is on.
    self.initiator.android.btEnable()
    self.receiver.android.btEnable()

  def test_basic_ble_process(self):
    """Test for basic BLE process flow.

    Steps:
      1. Initiator discovers receiver.
      2. Initiator connects to receiver.
      3. Initiator discovers the BLE service receiver provided.
      4. Initiator reads a message from receiver.
      5. Initiator sends a message to receiver.
      6. Initiator disconnects from receiver.
      7. BLE scan and advertise stopped.

    Verifies:
      In each step, initiator and receiver get corresponding callbacks.
    """
    Discover(self.initiator, self.receiver)
    Connect(self.initiator, self.receiver)
    DiscoverServices(self.initiator)
    ReadCharacteristic(self.initiator)
    WriteCharacteristic(self.initiator, self.receiver)
    Disconnect(self.initiator, self.receiver)
    StopDiscover(self.initiator, self.receiver)

  def test_cancel_open_ble_process(self):
    """Test for BLE process flow involving cancel open.

    Steps:
      1. Initiator discovers receiver.
      2. Initiator initiates connection to receiver, cancels connection and
      reconnects.
      3. Initiator discovers the BLE service receiver provided.
      4. Initiator disconnects from receiver.
      5. BLE scan and advertise stopped.

    Verifies:
      In each step, initiator and receiver get corresponding callbacks.
    """
    Discover(self.initiator, self.receiver)
    CancelOpen(self.initiator, self.receiver)
    DiscoverServices(self.initiator)
    Disconnect(self.initiator, self.receiver)
    StopDiscover(self.initiator, self.receiver)

  def test_reliable_write_ble_process(self):
    """Test for BLE process flow involving reliable write.

    Steps:
      1. Initiator discovers receiver.
      2. Initiator connects to receiver.
      3. Initiator discovers the BLE service receiver provided.
      4. Initiator starts reliable write to receiver and finishes successfully.
      5. Initiator disconnects from receiver.
      6. BLE scan and advertise stopped.

    Verifies:
      In each step, initiator and receiver get corresponding callbacks.
    """
    Discover(self.initiator, self.receiver)
    Connect(self.initiator, self.receiver)
    DiscoverServices(self.initiator)
    ReliableWrite(self.initiator, self.receiver)
    Disconnect(self.initiator, self.receiver)
    StopDiscover(self.initiator, self.receiver)

  def teardown_test(self):
    # Turn Bluetooth off on both devices after test finishes.
    self.initiator.android.btDisable()
    self.receiver.android.btDisable()


if __name__ == '__main__':
  test_runner.main()