1"""BLE tests."""
2
3import time
4
5
6
7from mobly import asserts
8from mobly import test_runner
9from mobly import utils
10from utils import android_base_test
11
# Number of seconds for the target to stay BLE advertising.
ADVERTISING_TIME = 120
# The number of seconds to wait for receiving scan results.
SCAN_TIMEOUT = 5
# The number of seconds to wait for a connection to be established.
CONNECTION_TIMEOUT = 20
# The number of seconds to wait before cancelling a connection attempt.
CANCEL_CONNECTION_WAIT_TIME = 0.1
# UUID for test service.
TEST_BLE_SERVICE_UUID = '0000fe23-0000-1000-8000-00805f9b34fb'
# UUID for write characteristic.
TEST_WRITE_UUID = '0000e632-0000-1000-8000-00805f9b34fb'
# UUID for second write characteristic.
TEST_SECOND_WRITE_UUID = '0000e633-0000-1000-8000-00805f9b34fb'
# UUID for read characteristic.
TEST_READ_UUID = '0000e631-0000-1000-8000-00805f9b34fb'
# UUID for second read characteristic.
TEST_SECOND_READ_UUID = '0000e634-0000-1000-8000-00805f9b34fb'
# UUID for third read characteristic.
TEST_THIRD_READ_UUID = '0000e635-0000-1000-8000-00805f9b34fb'
# UUID for scan response.
TEST_SCAN_RESPONSE_UUID = '0000e639-0000-1000-8000-00805f9b34fb'
# Advertise settings in json format for BLE advertise.
# NOTE(review): 'Timeout' is ADVERTISING_TIME * 1000, presumably
# milliseconds -- confirm against the snippet that consumes it.
ADVERTISE_SETTINGS = {
    'AdvertiseMode': 'ADVERTISE_MODE_LOW_LATENCY',
    'Timeout': ADVERTISING_TIME * 1000,
    'Connectable': True,
    'TxPowerLevel': 'ADVERTISE_TX_POWER_ULTRA_LOW'
}
# Random data to represent the device, stored in the advertise data.
DATA = utils.rand_ascii_str(16)
# Random data for scan response.
SCAN_RESPONSE_DATA = utils.rand_ascii_str(16)
# Random data for read operation.
READ_DATA = utils.rand_ascii_str(8)
# Random data for second read operation.
SECOND_READ_DATA = utils.rand_ascii_str(8)
# Random data for third read operation.
THIRD_READ_DATA = utils.rand_ascii_str(8)
# Random data for write operation.
WRITE_DATA = utils.rand_ascii_str(8)
# Random data for second write operation.
SECOND_WRITE_DATA = utils.rand_ascii_str(8)
# Advertise data in json format for BLE advertise. The service data entry
# carries DATA under the test service UUID.
ADVERTISE_DATA = {
    'IncludeDeviceName': False,
    'ServiceData': [{
        'UUID': TEST_BLE_SERVICE_UUID,
        'Data': DATA
    }]
}
# Advertise data in json format representing scan response for BLE advertise.
# It uses the same keys as ADVERTISE_DATA; the service data entry carries
# SCAN_RESPONSE_DATA under the dedicated scan response UUID.
SCAN_RESPONSE = {
    'IncludeDeviceName': False,
    'ServiceData': [{
        'UUID': TEST_SCAN_RESPONSE_UUID,
        'Data': SCAN_RESPONSE_DATA
    }]
}
# Scan filter in json format for BLE scan. Only the test service UUID is
# filtered on.
SCAN_FILTER = {'ServiceUuid': TEST_BLE_SERVICE_UUID}
# Scan settings in json format for BLE scan.
SCAN_SETTINGS = {'ScanMode': 'SCAN_MODE_LOW_LATENCY'}
# Characteristics for write in json format.
WRITE_CHARACTERISTIC = {
    'UUID': TEST_WRITE_UUID,
    'Property': 'PROPERTY_WRITE',
    'Permission': 'PERMISSION_WRITE'
}
SECOND_WRITE_CHARACTERISTIC = {
    'UUID': TEST_SECOND_WRITE_UUID,
    'Property': 'PROPERTY_WRITE',
    'Permission': 'PERMISSION_WRITE'
}
# Characteristics for read in json format. Each one carries the data that a
# read of that characteristic is expected to return.
READ_CHARACTERISTIC = {
    'UUID': TEST_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': READ_DATA
}
SECOND_READ_CHARACTERISTIC = {
    'UUID': TEST_SECOND_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': SECOND_READ_DATA
}
THIRD_READ_CHARACTERISTIC = {
    'UUID': TEST_THIRD_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': THIRD_READ_DATA
}
# Service data in json format for the BLE server: one primary service that
# exposes all of the write and read characteristics above.
SERVICE = {
    'UUID':
        TEST_BLE_SERVICE_UUID,
    'Type':
        'SERVICE_TYPE_PRIMARY',
    'Characteristics': [
        WRITE_CHARACTERISTIC, SECOND_WRITE_CHARACTERISTIC, READ_CHARACTERISTIC,
        SECOND_READ_CHARACTERISTIC, THIRD_READ_CHARACTERISTIC
    ]
}
# Literal strings used below as event/json keys (UUID, STATE, STATUS) and as
# the expected status value (GATT_SUCCESS).
UUID = 'UUID'
GATT_SUCCESS = 'GATT_SUCCESS'
STATE = 'newState'
STATUS = 'status'
122
123
def IsRequiredScanResult(scan_result):
  """Tells whether a scan result event belongs to the test advertiser.

  Args:
    scan_result: Event whose data['result']['ScanRecord']['Services'] is an
      iterable of service entries keyed by 'UUID' and 'Data'.

  Returns:
    True if any service entry has the test service UUID together with the
    advertised DATA payload, False otherwise.
  """
  advertised_services = scan_result.data['result']['ScanRecord']['Services']
  return any(
      entry[UUID] == TEST_BLE_SERVICE_UUID and entry['Data'] == DATA
      for entry in advertised_services)
130
131
def Discover(scanner, advertiser):
  """Runs BLE advertising on one device and scanning on the other.

  The address of the discovered advertiser is stored on
  scanner.connect_to_address; the callback handlers are stored on
  advertiser.advertise_callback and scanner.scan_callback.

  Args:
    scanner: AndroidDevice. The device that starts ble scan to find target.
    advertiser: AndroidDevice. The device that keeps advertising so other
      devices acknowledge it.

  Steps:
    1. Advertiser starts advertising and scanner starts scanning.
    2. Advertiser gets an onStartSuccess callback.
    3. After sleeping SCAN_TIMEOUT seconds, scanner waits up to SCAN_TIMEOUT
       more seconds for an onScanResult accepted by IsRequiredScanResult.

  Verifies:
    The matching scan result contains both the advertised service data and
    the scan response service data.
  """
  advertiser.advertise_callback = advertiser.android.bleStartAdvertising(
      ADVERTISE_SETTINGS, ADVERTISE_DATA, SCAN_RESPONSE)
  scanner.scan_callback = scanner.android.bleStartScan([SCAN_FILTER],
                                                       SCAN_SETTINGS)
  advertiser.advertise_callback.waitAndGet('onStartSuccess', 30)
  advertiser.log.info('BLE advertising started')
  time.sleep(SCAN_TIMEOUT)
  matched_event = scanner.scan_callback.waitForEvent(
      'onScanResult', IsRequiredScanResult, SCAN_TIMEOUT)
  record = matched_event.data['result']
  advertiser_seen = False
  response_seen = False
  for entry in record['ScanRecord']['Services']:
    if entry[UUID] == TEST_BLE_SERVICE_UUID:
      if entry['Data'] == DATA:
        # Remember where to connect to in the later GATT steps.
        scanner.connect_to_address = record['Device']['Address']
        advertiser_seen = True
    if entry[UUID] == TEST_SCAN_RESPONSE_UUID:
      if entry['Data'] == SCAN_RESPONSE_DATA:
        response_seen = True
  asserts.assert_true(
      advertiser_seen,
      'Advertiser is not found inside %d seconds' % SCAN_TIMEOUT)
  asserts.assert_true(response_seen, 'Scan response is not found')
  scanner.log.info('Advertiser is found')
170
171
def StopDiscover(scanner, advertiser):
  """Stops the BLE scan and the BLE advertising started earlier.

  Args:
    scanner: AndroidDevice. The scanning device; its scan_callback attribute
      supplies the callback id of the scan to stop.
    advertiser: AndroidDevice. The advertising device; its advertise_callback
      attribute supplies the callback id of the advertising to stop.

  Steps:
    1. Scanner stops scanning.
    2. Advertiser stops advertising.
  """
  scan_id = scanner.scan_callback.callback_id
  scanner.android.bleStopScan(scan_id)
  scanner.log.info('BLE scanning stopped')

  advertise_id = advertiser.advertise_callback.callback_id
  advertiser.android.bleStopAdvertising(advertise_id)
  advertiser.log.info('BLE advertising stopped')
189
190
def Connect(client, server):
  """Starts the GATT server and connects the GATT client to it.

  The callback handlers are stored on server.server_callback and
  client.client_callback for use by later steps.

  Args:
    client: AndroidDevice. The device that behaves as GATT client. Its
      connect_to_address attribute must already hold the server address.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Server starts and reports onServiceAdded with GATT_SUCCESS and every
       characteristic of SERVICE.
    2. Client connects to server via Gatt; after CONNECTION_TIMEOUT seconds
       exactly one onConnectionStateChange event must have arrived, with
       GATT_SUCCESS and STATE_CONNECTED.

  Verifies:
    Client gets corresponding callback.
  """
  server.server_callback = server.android.bleStartServer([SERVICE])
  service_added = server.server_callback.waitAndGet('onServiceAdded', 30)
  asserts.assert_equal(service_added.data[STATUS], GATT_SUCCESS)
  registered_uuids = [
      entry[UUID] for entry in service_added.data['Service']['Characteristics']
  ]
  for expected in SERVICE['Characteristics']:
    expected_uuid = expected[UUID]
    asserts.assert_true(expected_uuid in registered_uuids,
                        'Failed to find uuid %s.' % expected_uuid)
  server.log.info('BLE server started')

  client.client_callback = client.android.bleConnectGatt(
      client.connect_to_address)
  # Sleep for the whole timeout rather than waiting for the first event, so
  # that any duplicate state change events are collected and counted too.
  time.sleep(CONNECTION_TIMEOUT)
  connection_events = client.client_callback.getAll('onConnectionStateChange')
  asserts.assert_equal(len(connection_events), 1)
  for connection_event in connection_events:
    asserts.assert_equal(connection_event.data[STATUS], GATT_SUCCESS)
    asserts.assert_equal(connection_event.data[STATE], 'STATE_CONNECTED')
  client.log.info('BLE client connected')
230
231
def CancelOpen(client, server):
  """Logic for BLE client cancel open and reconnect.

  The callback handlers are stored on server.server_callback and
  client.client_callback for use by later steps.

  Args:
    client: AndroidDevice. The device that behaves as GATT client. Its
      connect_to_address attribute must already hold the server address.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Server starts and service added properly.
    2. Client starts to connect to server via Gatt, but the connection has
       not been established yet.
    3. Client calls disconnect to cancel the connection.
    4. Client connects to server, connection completes with GATT_SUCCESS
       within some TIMEOUT, onConnectionStateChange/STATE_CONNECTED called
       EXACTLY once.

  If the first connection attempt already produced a connection state change
  before it could be cancelled, steps 3 and 4 are skipped and the events of
  that first attempt are verified instead, so the function never returns
  without checking the connection.

  Verifies:
    Client gets corresponding callback.
  """
  server.server_callback = server.android.bleStartServer([SERVICE])
  start_server_result = server.server_callback.waitAndGet('onServiceAdded', 30)
  asserts.assert_equal(start_server_result.data[STATUS], GATT_SUCCESS)
  uuids = [
      characteristic[UUID]
      for characteristic in start_server_result.data['Service'][
          'Characteristics']
  ]
  for characteristic in SERVICE['Characteristics']:
    uuid = characteristic[UUID]
    asserts.assert_true(uuid in uuids, 'Failed to find uuid %s.' % uuid)
  server.log.info('BLE server started')

  client.client_callback = client.android.bleConnectGatt(
      client.connect_to_address)
  time.sleep(CANCEL_CONNECTION_WAIT_TIME)
  start_client_results = client.client_callback.getAll(
      'onConnectionStateChange')
  if start_client_results:
    # getAll has consumed these events, so they must be verified here or
    # they are never checked at all.
    client.log.info(
        'BLE client connection state changed before cancel open; '
        'cancel open was not exercised')
  else:
    client.android.bleDisconnect()
    client.log.info('BLE client cancel open')
    time.sleep(CANCEL_CONNECTION_WAIT_TIME)
    client.client_callback = client.android.bleConnectGatt(
        client.connect_to_address)
    # Sleep for the whole timeout so duplicate events are counted as well.
    time.sleep(CONNECTION_TIMEOUT)
    start_client_results = client.client_callback.getAll(
        'onConnectionStateChange')
  asserts.assert_equal(len(start_client_results), 1)
  for start_client_result in start_client_results:
    asserts.assert_equal(start_client_result.data[STATUS], GATT_SUCCESS)
    asserts.assert_equal(start_client_result.data[STATE], 'STATE_CONNECTED')
  client.log.info('BLE client connected')
282
283
def Disconnect(client, server):
  """Disconnects the GATT client and then stops the GATT server.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Client calls disconnect and gets an onConnectionStateChange callback
       with STATE_DISCONNECTED and GATT_SUCCESS.
    2. Server closes.

  Verifies:
    Client gets corresponding callback.
  """
  client.android.bleDisconnect()
  disconnect_event = client.client_callback.waitAndGet(
      'onConnectionStateChange', 30)
  event_data = disconnect_event.data
  asserts.assert_equal(event_data[STATUS], GATT_SUCCESS)
  asserts.assert_equal(event_data[STATE], 'STATE_DISCONNECTED')
  client.log.info('BLE client disconnected')

  server.android.bleStopServer()
  server.log.info('BLE server stopped')
307
308
def DiscoverServices(client):
  """Runs GATT service discovery on the client and checks the result.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.

  Steps:
    1. Client starts service discovery; after CONNECTION_TIMEOUT seconds
       exactly one 'onServiceDiscovered' event must have arrived, with
       GATT_SUCCESS.
    2. The discovered services include the test service, with every
       characteristic listed in SERVICE.

  Verifies:
    Client gets corresponding callback.
  """
  client.android.bleDiscoverServices()
  # Sleep for the whole timeout so duplicate events are counted as well.
  time.sleep(CONNECTION_TIMEOUT)
  discovery_events = client.client_callback.getAll('onServiceDiscovered')
  asserts.assert_equal(len(discovery_events), 1)
  discovery = discovery_events[0].data
  asserts.assert_equal(discovery[STATUS], GATT_SUCCESS)

  expected_uuids = [entry[UUID] for entry in SERVICE['Characteristics']]
  service_discovered = False
  for service in discovery['Services']:
    if service['UUID'] != TEST_BLE_SERVICE_UUID:
      continue
    service_discovered = True
    found_uuids = [entry[UUID] for entry in service['Characteristics']]
    for expected_uuid in expected_uuids:
      asserts.assert_true(expected_uuid in found_uuids,
                          'Failed to find uuid %s.' % expected_uuid)
  asserts.assert_true(service_discovered,
                      'Failed to discover the customize service')
  client.log.info('BLE discover services finished')
347
348
def ReadCharacteristic(client):
  """Reads the three readable test characteristics one after another.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.

  Steps (repeated for the first, second and third read characteristic):
    1. Client reads the characteristic from server & gets true.
    2. Client gets onCharacteristicRead with GATT_SUCCESS and the data
       configured for that characteristic.

  Verifies:
    Client gets corresponding callback.
  """
  # (characteristic UUID, expected data, log message), in execution order.
  read_plan = (
      (TEST_READ_UUID, READ_DATA, 'Read operation finished'),
      (TEST_SECOND_READ_UUID, SECOND_READ_DATA,
       'Second read operation finished'),
      (TEST_THIRD_READ_UUID, THIRD_READ_DATA,
       'Third read operation finished'),
  )
  for characteristic_uuid, expected_data, done_message in read_plan:
    read_started = client.android.bleReadOperation(TEST_BLE_SERVICE_UUID,
                                                   characteristic_uuid)
    asserts.assert_true(read_started, 'BLE read operation failed to start')
    read_event = client.client_callback.waitAndGet('onCharacteristicRead', 30)
    asserts.assert_equal(read_event.data[STATUS], GATT_SUCCESS)
    asserts.assert_equal(read_event.data['Data'], expected_data)
    client.log.info(done_message)
389
390
def WriteCharacteristic(client, server):
  """Writes the two writable test characteristics one after another.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps (repeated for the first and second write characteristic):
    1. Client writes the characteristic to server & gets true.
    2. Server gets onCharacteristicWriteRequest carrying the written data.
    3. Client gets onCharacteristicWrite.

  Verifies:
    Client and server get corresponding callbacks.
  """
  # (characteristic UUID, data to write, log message), in execution order.
  write_plan = (
      (TEST_WRITE_UUID, WRITE_DATA, 'Write operation finished'),
      (TEST_SECOND_WRITE_UUID, SECOND_WRITE_DATA,
       'Second write operation finished'),
  )
  for characteristic_uuid, payload, done_message in write_plan:
    write_started = client.android.bleWriteOperation(
        TEST_BLE_SERVICE_UUID, characteristic_uuid, payload)
    asserts.assert_true(write_started, 'BLE write operation failed to start')
    write_request = server.server_callback.waitAndGet(
        'onCharacteristicWriteRequest', 30)
    asserts.assert_equal(write_request.data['Data'], payload)
    client.client_callback.waitAndGet('onCharacteristicWrite', 30)
    client.log.info(done_message)
424
425
def ReliableWrite(client, server):
  """Performs the characteristic writes inside a BLE reliable write.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.

  Steps:
    1. Client calls beginReliableWrite & gets true.
    2. Client writes the characteristics to server via WriteCharacteristic.
    3. Client calls executeReliableWrite & gets true.
    4. Client gets onReliableWriteCompleted.

  Verifies:
    Client gets corresponding callbacks.
  """
  asserts.assert_true(client.android.bleBeginReliableWrite(),
                      'BLE reliable write failed to start')
  client.log.info('BLE reliable write started')

  WriteCharacteristic(client, server)

  asserts.assert_true(client.android.bleExecuteReliableWrite(),
                      'BLE reliable write failed to execute')
  client.log.info('BLE reliable write execute started')
  client.client_callback.waitAndGet('onReliableWriteCompleted', 30)
  client.log.info('BLE reliable write finished')
454
455
class BleTest(android_base_test.AndroidBaseTest):
  """BLE tests run between two Android devices.

  The initiator (dut_a) scans for BLE devices and behaves as a GATT client;
  the receiver (dut_b) advertises over BLE and behaves as a GATT server.
  """

  def setup_class(self):
    """Assigns the two device roles and tags each device for the logs."""
    super(BleTest, self).setup_class()
    self.initiator, self.receiver = self.dut_a, self.dut_b
    # The debug tag is what represents each device in logs.
    self.initiator.debug_tag = 'initiator'
    self.receiver.debug_tag = 'receiver'

  def setup_test(self):
    """Makes sure Bluetooth is on for both devices before each test."""
    for device in (self.initiator, self.receiver):
      device.android.btEnable()

  def test_basic_ble_process(self):
    """Test for basic BLE process flow.

    Steps:
      1. Initiator discovers receiver.
      2. Initiator connects to receiver.
      3. Initiator discovers the BLE service receiver provided.
      4. Initiator reads messages from receiver.
      5. Initiator sends messages to receiver.
      6. Initiator disconnects from receiver.
      7. BLE scan and advertise stopped.

    Verifies:
      In each step, initiator and receiver get corresponding callbacks.
    """
    initiator, receiver = self.initiator, self.receiver
    Discover(initiator, receiver)
    Connect(initiator, receiver)
    DiscoverServices(initiator)
    ReadCharacteristic(initiator)
    WriteCharacteristic(initiator, receiver)
    Disconnect(initiator, receiver)
    StopDiscover(initiator, receiver)

  def test_cancel_open_ble_process(self):
    """Test for BLE process flow involving cancel open.

    Steps:
      1. Initiator discovers receiver.
      2. Initiator initiates connection to receiver, cancels connection and
         reconnects.
      3. Initiator discovers the BLE service receiver provided.
      4. Initiator disconnects from receiver.
      5. BLE scan and advertise stopped.

    Verifies:
      In each step, initiator and receiver get corresponding callbacks.
    """
    initiator, receiver = self.initiator, self.receiver
    Discover(initiator, receiver)
    CancelOpen(initiator, receiver)
    DiscoverServices(initiator)
    Disconnect(initiator, receiver)
    StopDiscover(initiator, receiver)

  def test_reliable_write_ble_process(self):
    """Test for BLE process flow involving reliable write.

    Steps:
      1. Initiator discovers receiver.
      2. Initiator connects to receiver.
      3. Initiator discovers the BLE service receiver provided.
      4. Initiator starts reliable write to receiver and finishes
         successfully.
      5. Initiator disconnects from receiver.
      6. BLE scan and advertise stopped.

    Verifies:
      In each step, initiator and receiver get corresponding callbacks.
    """
    initiator, receiver = self.initiator, self.receiver
    Discover(initiator, receiver)
    Connect(initiator, receiver)
    DiscoverServices(initiator)
    ReliableWrite(initiator, receiver)
    Disconnect(initiator, receiver)
    StopDiscover(initiator, receiver)

  def teardown_test(self):
    """Turns Bluetooth off on both devices after each test finishes."""
    for device in (self.initiator, self.receiver):
      device.android.btDisable()
542
543
# Hand control to the Mobly test runner when this file is run as a script.
if __name__ == '__main__':
  test_runner.main()
546