1#/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import random 18import statistics 19import string 20import time 21from acts import asserts 22from acts.base_test import BaseTestClass 23from acts.signals import TestPass 24from acts.test_decorators import test_tracker_info 25from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 26from acts.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection 27from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test 28from acts.test_utils.bt.bt_test_utils import verify_server_and_client_connected 29from acts.test_utils.bt.bt_test_utils import write_read_verify_data 30from acts.test_utils.bt.loggers.bluetooth_metric_logger import BluetoothMetricLogger 31from acts.test_utils.bt.loggers.protos import bluetooth_metric_pb2 as proto_module 32from acts.utils import set_location_service 33 34 35class BluetoothLatencyTest(BaseTestClass): 36 """Connects two Android phones and tests the RFCOMM latency. 37 38 Attributes: 39 client_device: An Android device object that will be sending data 40 server_device: An Android device object that will be receiving data 41 bt_logger: The proxy logger instance for each test case 42 data_transfer_type: Data transfer protocol used for the test 43 """ 44 45 def setup_class(self): 46 super().setup_class() 47 48 # Sanity check of the devices under test 49 # TODO(b/119051823): Investigate using a config validator to replace this. 50 if len(self.android_devices) < 2: 51 raise ValueError( 52 'Not enough android phones detected (need at least two)') 53 54 # Data will be sent from the client_device to the server_device 55 self.client_device = self.android_devices[0] 56 self.server_device = self.android_devices[1] 57 self.bt_logger = BluetoothMetricLogger.for_test_case() 58 self.data_transfer_type = proto_module.BluetoothDataTestResult.RFCOMM 59 self.log.info('Successfully found required devices.') 60 61 def setup_test(self): 62 setup_multiple_devices_for_bt_test(self.android_devices) 63 self._connect_rfcomm() 64 65 def teardown_test(self): 66 if verify_server_and_client_connected( 67 self.client_device, self.server_device, log=False): 68 self.client_device.droid.bluetoothSocketConnStop() 69 self.server_device.droid.bluetoothSocketConnStop() 70 71 def _connect_rfcomm(self): 72 """Establishes an RFCOMM connection between two phones. 73 74 Connects the client device to the server device given the hardware 75 address of the server device. 76 """ 77 78 set_location_service(self.client_device, True) 79 set_location_service(self.server_device, True) 80 server_address = self.server_device.droid.bluetoothGetLocalAddress() 81 self.log.info('Pairing and connecting devices') 82 asserts.assert_true(self.client_device.droid 83 .bluetoothDiscoverAndBond(server_address), 84 'Failed to pair and connect devices') 85 86 # Create RFCOMM connection 87 asserts.assert_true(orchestrate_rfcomm_connection 88 (self.client_device, self.server_device), 89 'Failed to establish RFCOMM connection') 90 91 def _measure_latency(self): 92 """Measures the latency of data transfer over RFCOMM. 93 94 Sends data from the client device that is read by the server device. 95 Calculates the latency of the transfer. 96 97 Returns: 98 The latency of the transfer milliseconds. 99 """ 100 101 # Generates a random message to transfer 102 message = (''.join(random.choice(string.ascii_letters + string.digits) 103 for _ in range(6))) 104 105 start_time = time.perf_counter() 106 write_read_successful = write_read_verify_data(self.client_device, 107 self.server_device, 108 message, 109 False) 110 end_time = time.perf_counter() 111 asserts.assert_true(write_read_successful, 112 'Failed to send/receive message') 113 return (end_time - start_time) * 1000 114 115 @BluetoothBaseTest.bt_test_wrap 116 @test_tracker_info(uuid='7748295d-204e-4ad0-adf5-7591380b940a') 117 def test_bluetooth_latency(self): 118 """Tests the latency for a data transfer over RFCOMM""" 119 120 metrics = {} 121 latency_list = [] 122 123 for _ in range(300): 124 latency_list.append(self._measure_latency()) 125 126 metrics['data_transfer_protocol'] = self.data_transfer_type 127 metrics['data_latency_min_millis'] = int(min(latency_list)) 128 metrics['data_latency_max_millis'] = int(max(latency_list)) 129 metrics['data_latency_avg_millis'] = int(statistics.mean(latency_list)) 130 self.log.info('Latency: {}'.format(metrics)) 131 132 proto = self.bt_logger.get_results(metrics, 133 self.__class__.__name__, 134 self.server_device, 135 self.client_device) 136 137 asserts.assert_true(metrics['data_latency_min_millis'] > 0, 138 'Minimum latency must be greater than 0!', 139 extras=proto) 140 141 raise TestPass('Latency test completed successfully', extras=proto) 142