"""Tests for blueberry.pbap.bluetooth_pbap."""

import os
import random
import time

from mobly import asserts
from mobly import test_runner
from mobly import signals
from mobly import utils

from mobly.controllers import android_device

from blueberry.utils import blueberry_ui_base_test
from blueberry.utils import bt_constants
from blueberry.utils import bt_test_utils

# The path is used to place the created vcf files.
STORAGE_PATH = '/storage/emulated/0'

# URI for contacts database.
CONTACTS_URI = 'content://com.android.contacts/data/phones'

# Number of seconds to wait for contacts and call logs update.
WAITING_TIMEOUT_SEC = 60

# Number of contacts and call logs to be tested.
TEST_DATA_COUNT = 1000

# Base permissions granted to the Contacts app on every device. Treat this
# list as read-only: copy it before adding per-device permissions.
PERMISSION_LIST = [
    'android.permission.READ_CONTACTS',
    'android.permission.WRITE_CONTACTS',
]


class BluetoothPbapTest(blueberry_ui_base_test.BlueberryUiBaseTest):
  """Test Class for Bluetooth PBAP Test."""

  def __init__(self, configs):
    super().__init__(configs)
    # Bluetooth carkit acting as Phone Book Client Equipment (PCE).
    self.derived_bt_device = None
    # Primary phone acting as Phone Book Server Equipment (PSE).
    self.pri_phone = None
    # Bluetooth MAC address of the PSE, filled in by setup_class.
    self.pse_mac_address = None

  def setup_class(self):
    """Standard Mobly setup class.

    Sets up the PSE and PCE, grants the Contacts app permissions on both,
    pairs and connects them, and allows PBAP profile access on the PSE.
    """
    super().setup_class()

    # Bluetooth carkit which role is Phone Book Client Equipment (PCE).
    self.derived_bt_device = self.derived_bt_devices[0]

    # Primary phone which role is Phone Book Server Equipment (PSE).
    self.pri_phone = self.android_devices[0]
    self.pri_phone.init_setup()
    self.pri_phone.sl4a_setup()
    self.derived_bt_device.add_sec_ad_device(self.pri_phone)

    # Grant the permissions to Contacts app.
    for device in [self.pri_phone, self.derived_bt_device]:
      # Work on a copy so the per-device addition below never leaks into the
      # shared module-level PERMISSION_LIST or into the next device's grants.
      required_permissions = list(PERMISSION_LIST)
      # App requires READ_EXTERNAL_STORAGE to read contacts if SDK < 30.
      if int(device.build_info['build_version_sdk']) < 30:
        required_permissions.append('android.permission.READ_EXTERNAL_STORAGE')
      for permission in required_permissions:
        device.adb.shell('pm grant com.google.android.contacts %s' % permission)
    self.pse_mac_address = self.pri_phone.get_bluetooth_mac_address()
    mac_address = self.derived_bt_device.get_bluetooth_mac_address()
    self.derived_bt_device.activate_pairing_mode()
    self.pri_phone.pair_and_connect_bluetooth(mac_address)
    # Sleep until the connection stabilizes.
    time.sleep(5)

    # Allow permission access for PBAP profile.
    self.pri_phone.sl4a.bluetoothChangeProfileAccessPermission(
        mac_address,
        bt_constants.BluetoothProfile.PBAP.value,
        bt_constants.BluetoothAccessLevel.ACCESS_ALLOWED.value)

  def setup_test(self):
    """Standard Mobly setup test; ensures PBAP starts disconnected."""
    super().setup_test()
    # Make sure PBAP is not connected before running tests.
    self._terminate_pbap_connection()

  def _import_vcf_to_pse(self, file_name, expected_contact_count):
    """Imports the vcf file to PSE.

    Args:
      file_name: String, name of the vcf file as shown in PickActivity.
      expected_contact_count: Int, number of contacts expected on the PSE
        once the import has finished.

    Raises:
      android_device.DeviceError: If the file is not listed in PickActivity or
        the contact count does not reach the expectation within
        WAITING_TIMEOUT_SEC seconds.
    """
    # Open ImportVcardActivity and click "OK" in the pop-up dialog, then
    # PickActivity will be launched and browses the existing vcf files.
    self.pri_phone.adb.shell(
        'am start com.google.android.contacts/'
        'com.google.android.apps.contacts.vcard.ImportVCardActivity')
    self.pri_phone.aud(text='OK').click()

    # Check if the vcf file appears in the PickActivity.
    if not self.pri_phone.aud(text=file_name).exists():
      raise android_device.DeviceError(
          self.pri_phone,
          'No file name matches "%s" in PickActivity.' % file_name)

    # TODO(user): Remove the check of code name for S build.
    if (self.pri_phone.build_info['build_version_codename'] != 'S' and
        int(self.pri_phone.build_info['build_version_sdk']) <= 30):
      # Since `adb shell input tap` cannot work in PickActivity on these
      # builds, send TAB and ENTER Key events to select and import the vcf
      # file.
      if self.pri_phone.aud(content_desc='Grid view').exists():
        # Switch Grid mode since ENTER Key event cannot work in List mode on
        # git_rvc-d2-release branch.
        self.pri_phone.aud(content_desc='Grid view').click()
      self.pri_phone.aud.send_key_code('KEYCODE_TAB')
      self.pri_phone.aud.send_key_code('KEYCODE_ENTER')
    else:
      self.pri_phone.aud(text=file_name).click()
    self.pri_phone.log.info('Importing "%s"...', file_name)
    current_count = self._wait_and_get_contact_count(
        self.pri_phone, expected_contact_count, WAITING_TIMEOUT_SEC)
    if current_count != expected_contact_count:
      raise android_device.DeviceError(
          self.pri_phone,
          'Failed to import %d contact(s) within %ds. Actual count: %d' %
          (expected_contact_count, WAITING_TIMEOUT_SEC, current_count))
    self.pri_phone.log.info('Successfully added %d contact(s).', current_count)

  def _generate_contacts_on_pse(self,
                                num_of_contacts,
                                first_name=None,
                                last_name=None,
                                phone_number=None):
    """Generates contacts to be tested on PSE.

    Creates a vcf file on the host, pushes it to STORAGE_PATH on the PSE,
    imports it through the Contacts app and removes the pushed file.

    Args:
      num_of_contacts: Int, number of contacts to generate.
      first_name: Optional first name forwarded to the vcf generator.
      last_name: Optional last name forwarded to the vcf generator.
      phone_number: Optional phone number forwarded to the vcf generator.
    """
    vcf_file = bt_test_utils.create_vcf_from_vcard(
        output_path=self.pri_phone.log_path,
        num_of_contacts=num_of_contacts,
        first_name=first_name,
        last_name=last_name,
        phone_number=phone_number)
    self.pri_phone.adb.push([vcf_file, STORAGE_PATH])
    # For R build, since the pushed vcf file probably not found when importing
    # contacts, do a media scan to recognize the file.
    if int(self.pri_phone.build_info['build_version_sdk']) > 29:
      self.pri_phone.adb.shell('content call --uri content://media/ --method '
                               'scan_volume --arg external_primary')
    file_name = os.path.basename(vcf_file)
    self._import_vcf_to_pse(file_name, num_of_contacts)
    self.pri_phone.adb.shell(
        'rm -rf %s' % os.path.join(STORAGE_PATH, file_name))

  def _generate_call_logs_on_pse(self, call_log_type, num_of_call_logs):
    """Generates call logs to be tested on PSE.

    Args:
      call_log_type: String, type of the call logs to add.
      num_of_call_logs: Int, number of call logs to add.

    Raises:
      android_device.DeviceError: If the call log count of this type does not
        reach num_of_call_logs within WAITING_TIMEOUT_SEC seconds.
    """
    self.pri_phone.log.info('Putting %d call log(s) which type are "%s"...',
                            num_of_call_logs, call_log_type)
    for _ in range(num_of_call_logs):
      # Each entry gets a random number and the device's current time
      # (`date +%s.%N` seconds converted to milliseconds).
      self.pri_phone.sl4a.callLogsPut(dict(
          type=call_log_type,
          number='8809%d' % random.randrange(int(10e8)),
          time=int(1000 * float(self.pri_phone.adb.shell('date +%s.%N')))))
    current_count = self._wait_and_get_call_log_count(
        self.pri_phone,
        call_log_type,
        num_of_call_logs,
        WAITING_TIMEOUT_SEC)
    if current_count != num_of_call_logs:
      raise android_device.DeviceError(
          self.pri_phone,
          'Failed to generate %d call log(s) within %ds. '
          'Actual count: %d, Call log type: %s' %
          (num_of_call_logs, WAITING_TIMEOUT_SEC, current_count, call_log_type))
    self.pri_phone.log.info('Successfully added %d call log(s).',
                            current_count)

  def _wait_and_get_contact_count(self,
                                  device,
                                  expected_contact_count,
                                  timeout_sec):
    """Waits for contact update for a period time and returns contact count.

    This method should be used when a device imports some new contacts. It can
    wait some time for contact update until expectation or timeout and then
    return contact count.

    Args:
      device: AndroidDevice, Mobly Android controller class.
      expected_contact_count: Int, Number of contacts as expected.
      timeout_sec: Int, Number of seconds to wait for contact update.

    Returns:
      current_count: Int, number of the existing contacts on the device. It is
        0 if the timeout elapsed before the first query.
    """
    end_time = time.time() + timeout_sec
    current_count = 0
    while time.time() < end_time:
      current_count = device.sl4a.contactsGetCount()
      if current_count == expected_contact_count:
        break
      # Interval between attempts to get contacts.
      time.sleep(1)
    if current_count != expected_contact_count:
      device.log.warning(
          'Failed to get expected contact count: %d. '
          'Actual contact count: %d.',
          expected_contact_count, current_count)
    return current_count

  def _wait_and_get_call_log_count(self,
                                   device,
                                   call_log_type,
                                   expected_call_log_count,
                                   timeout_sec):
    """Waits for call log update for a period time and returns call log count.

    This method should be used when a device adds some new call logs. It can
    wait some time for call log update until expectation or timeout and then
    return call log count.

    Args:
      device: AndroidDevice, Mobly Android controller class.
      call_log_type: String, Type of the call logs.
      expected_call_log_count: Int, Number of call logs as expected.
      timeout_sec: Int, Number of seconds to wait for call log update.

    Returns:
      current_count: Int, number of the existing call logs on the device. It
        is 0 if the timeout elapsed before the first query.
    """
    end_time = time.time() + timeout_sec
    current_count = 0
    while time.time() < end_time:
      current_count = len(device.sl4a.callLogsGet(call_log_type))
      if current_count == expected_call_log_count:
        break
      # Interval between attempts to get call logs.
      time.sleep(1)
    if current_count != expected_call_log_count:
      device.log.warning(
          'Failed to get expected call log count: %d. '
          'Actual call log count: %d.',
          expected_call_log_count, current_count)
    return current_count

  def _terminate_pbap_connection(self):
    """Disconnects PBAP between PCE and PSE if it is not already disconnected.

    Raises:
      signals.TestError: If the PCE still does not report
        STATE_DISCONNECTED after the disconnect request.
    """
    status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus(
        self.pse_mac_address)
    if status == bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED:
      return
    self.derived_bt_device.log.info('Disconnecting PBAP...')
    self.derived_bt_device.sl4a.bluetoothPbapClientDisconnect(
        self.pse_mac_address)
    # Buffer for the connection status check.
    time.sleep(3)
    status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus(
        self.pse_mac_address)
    if status != bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED:
      raise signals.TestError('PBAP connection failed to be terminated.')
    self.derived_bt_device.log.info('Successfully disconnected PBAP.')

  def test_download_contacts(self):
    """Test for the feature of downloading contacts.

    Tests that PCE can download contacts from PSE.
    """
    # Make sure no contacts exist on the devices.
    for device in [self.pri_phone, self.derived_bt_device]:
      device.sl4a.contactsEraseAll()

    # Add contacts to PSE.
    self._generate_contacts_on_pse(TEST_DATA_COUNT)

    # When PCE is connected to PSE, it will download PSE's contacts.
    self.derived_bt_device.pbap_connect()
    self.derived_bt_device.log.info('Downloading contacts from PSE...')
    current_count = self._wait_and_get_contact_count(
        self.derived_bt_device, TEST_DATA_COUNT, WAITING_TIMEOUT_SEC)

    asserts.assert_true(
        current_count == TEST_DATA_COUNT,
        'PCE failed to download %d contact(s) within %ds, '
        'actually downloaded %d contact(s).' %
        (TEST_DATA_COUNT, WAITING_TIMEOUT_SEC, current_count))
    # Only report success once the assertion above has passed.
    self.derived_bt_device.log.info(
        'Successfully downloaded %d contact(s).', current_count)

  def test_download_call_logs(self):
    """Test for the feature of downloading call logs.

    Tests that PCE can download incoming/outgoing/missed call logs from PSE.
    """
    # Make sure no call logs exist on the devices.
    for device in [self.pri_phone, self.derived_bt_device]:
      device.sl4a.callLogsEraseAll()

    call_log_types = [
        bt_constants.INCOMING_CALL_LOG_TYPE,
        bt_constants.OUTGOING_CALL_LOG_TYPE,
        bt_constants.MISSED_CALL_LOG_TYPE,
    ]
    for call_log_type in call_log_types:
      # Add call logs to PSE.
      self._generate_call_logs_on_pse(call_log_type, TEST_DATA_COUNT)

    # When PCE is connected to PSE, it will download PSE's call logs.
    self.derived_bt_device.pbap_connect()
    self.derived_bt_device.log.info('Downloading call logs...')

    for call_log_type in call_log_types:
      current_count = self._wait_and_get_call_log_count(
          self.derived_bt_device,
          call_log_type,
          TEST_DATA_COUNT,
          WAITING_TIMEOUT_SEC)

      asserts.assert_true(
          current_count == TEST_DATA_COUNT,
          'PCE failed to download %d call log(s) which type are "%s" within %ds'
          ', actually downloaded %d call log(s).' %
          (TEST_DATA_COUNT, call_log_type, WAITING_TIMEOUT_SEC, current_count))
      # Only report success once the assertion above has passed.
      self.derived_bt_device.log.info(
          'Successfully downloaded %d call log(s) which type are "%s".',
          current_count, call_log_type)

  def test_show_caller_name(self):
    """Test for caller name of the incoming phone call is correct on PCE.

    Tests that caller name matches contact name which is downloaded via PBAP.

    Raises:
      signals.TestError: If fewer than two Android devices are available, a
        phone has no loaded SIM or no "phone_number" dimension, the incoming
        call does not ring in time, or the call is not shown as coming via
        HFP on the PCE.
    """
    # Checks if two android devices exist.
    if len(self.android_devices) < 2:
      raise signals.TestError('This test requires two Android devices.')
    primary_phone = self.pri_phone
    secondary_phone = self.android_devices[1]
    # NOTE(review): unlike pri_phone in setup_class, sl4a_setup() is not called
    # here although secondary_phone.sl4a is used below - confirm that
    # init_setup() is sufficient.
    secondary_phone.init_setup()
    for phone in [primary_phone, secondary_phone]:
      # Checks if SIM state is loaded for every devices.
      if not phone.is_sim_state_loaded():
        raise signals.TestError('Please insert a SIM Card to the phone '
                                f'"{phone.serial}".')
      # Checks if phone_number is provided in the support dimensions.
      phone.phone_number = phone.dimensions.get('phone_number')
      if not phone.phone_number:
        raise signals.TestError('Please add "phone_number" to support '
                                f'dimensions of the phone "{phone.serial}".')
    # Make sure no contacts exist on the devices.
    for device in [primary_phone, self.derived_bt_device]:
      device.sl4a.contactsEraseAll()
    # Generate a contact name randomly.
    first_name = utils.rand_ascii_str(4)
    last_name = utils.rand_ascii_str(4)
    full_name = f'{first_name} {last_name}'
    primary_phone.log.info('Creating a contact "%s"...', full_name)
    self._generate_contacts_on_pse(
        num_of_contacts=1,
        first_name=first_name,
        last_name=last_name,
        phone_number=secondary_phone.phone_number)
    self.derived_bt_device.log.info('Connecting to PSE...')
    self.derived_bt_device.pbap_connect()
    self.derived_bt_device.log.info('Downloading contacts from PSE...')
    current_count = self._wait_and_get_contact_count(
        device=self.derived_bt_device,
        expected_contact_count=1,
        timeout_sec=WAITING_TIMEOUT_SEC)
    asserts.assert_equal(
        first=current_count,
        second=1,
        msg=f'Failed to download the contact "{full_name}".')
    # Only report success once the assertion above has passed.
    self.derived_bt_device.log.info('Successfully downloaded %d contact(s).',
                                    current_count)
    secondary_phone.sl4a.telecomCallNumber(primary_phone.phone_number)
    # From here on the outgoing call must always be ended, including when the
    # primary phone never rings.
    try:
      secondary_phone.log.info('Made a phone call to device "%s".',
                               primary_phone.serial)
      primary_phone.log.info(
          'Waiting for the incoming call from device "%s"...',
          secondary_phone.serial)
      is_ringing = primary_phone.wait_for_call_state(
          bt_constants.CALL_STATE_RINGING,
          bt_constants.CALL_STATE_TIMEOUT_SEC)
      if not is_ringing:
        raise signals.TestError(
            f'Timed out after {bt_constants.CALL_STATE_TIMEOUT_SEC}s waiting '
            f'for the incoming call from device "{secondary_phone.serial}".')
      try:
        self.derived_bt_device.aud.open_notification()
        hfp_address = primary_phone.get_bluetooth_mac_address()
        if not self.derived_bt_device.aud(
            text=f'Incoming call via HFP {hfp_address}').exists():
          raise signals.TestError('The incoming call was not received from '
                                  'the Handsfree device side.')
        # Asserts that caller name of the incoming phone call is correct in
        # the notification bar.
        asserts.assert_true(
            self.derived_bt_device.aud(text=full_name).exists(),
            f'Caller name is incorrect. Expectation: "{full_name}"')
      finally:
        # Takes a screenshot for debugging.
        self.derived_bt_device.take_screenshot(self.derived_bt_device.log_path)
        # Recovery actions.
        self.derived_bt_device.aud.close_notification()
    finally:
      secondary_phone.sl4a.telecomEndCall()


if __name__ == '__main__':
  test_runner.main()