1"""Tests for blueberry.pbap.bluetooth_pbap."""
3import os
4import random
5import time
7from mobly import asserts
8from mobly import test_runner
9from mobly import signals
10from mobly import utils
12from mobly.controllers import android_device
14from blueberry.utils import blueberry_ui_base_test
15from blueberry.utils import bt_constants
16from blueberry.utils import bt_test_utils
18# The path is used to place the created vcf files.
19STORAGE_PATH = '/storage/emulated/0'
21# URI for contacts database.
22CONTACTS_URI = 'content://com.android.contacts/data/phones'
24# Number of seconds to wait for contacts and call logs update.
27# Number of contacts and call logs to be tested.
30# Permissions for Contacts app.
32    'android.permission.READ_CONTACTS',
33    'android.permission.WRITE_CONTACTS',
37class BluetoothPbapTest(blueberry_ui_base_test.BlueberryUiBaseTest):
38  """Test Class for Bluetooth PBAP Test."""
40  def __init__(self, configs):
41    super().__init__(configs)
42    self.derived_bt_device = None
43    self.pri_phone = None
44    self.pse_mac_address = None
46  def setup_class(self):
47    """Standard Mobly setup class."""
48    super(BluetoothPbapTest, self).setup_class()
50    # Bluetooth carkit which role is Phone Book Client Equipment (PCE).
51    self.derived_bt_device = self.derived_bt_devices[0]
53    # Primary phone which role is Phone Book Server Equipment (PSE).
54    self.pri_phone = self.android_devices[0]
55    self.pri_phone.init_setup()
56    self.pri_phone.sl4a_setup()
57    self.derived_bt_device.add_sec_ad_device(self.pri_phone)
59    # Grant the permissions to Contacts app.
60    for device in [self.pri_phone, self.derived_bt_device]:
61      required_permissions = PERMISSION_LIST
62      # App requires READ_EXTERNAL_STORAGE to read contacts if SDK < 30.
63      if int(device.build_info['build_version_sdk']) < 30:
64        required_permissions.append('android.permission.READ_EXTERNAL_STORAGE')
65      for permission in required_permissions:
66        device.adb.shell('pm grant com.google.android.contacts %s' % permission)
67    self.pse_mac_address = self.pri_phone.get_bluetooth_mac_address()
68    mac_address = self.derived_bt_device.get_bluetooth_mac_address()
69    self.derived_bt_device.activate_pairing_mode()
70    self.pri_phone.pair_and_connect_bluetooth(mac_address)
71    # Sleep until the connection stabilizes.
72    time.sleep(5)
74    # Allow permission access for PBAP profile.
75    self.pri_phone.sl4a.bluetoothChangeProfileAccessPermission(
76        mac_address,
77        bt_constants.BluetoothProfile.PBAP.value,
78        bt_constants.BluetoothAccessLevel.ACCESS_ALLOWED.value)
80  def setup_test(self):
81    super(BluetoothPbapTest, self).setup_test()
82    # Make sure PBAP is not connected before running tests.
83    self._terminate_pbap_connection()
85  def _import_vcf_to_pse(self, file_name, expected_contact_count):
86    """Imports the vcf file to PSE."""
87    # Open ImportVcardActivity and click "OK" in the pop-up dialog, then
88    # PickActivity will be launched and browses the existing vcf files.
89    self.pri_phone.adb.shell(
90        'am start com.google.android.contacts/'
91        'com.google.android.apps.contacts.vcard.ImportVCardActivity')
92    self.pri_phone.aud(text='OK').click()
94    # Check if the vcf file appears in the PickActivity.
95    if not self.pri_phone.aud(text=file_name).exists():
96      raise android_device.DeviceError(
97          self.pri_phone,
98          'No file name matches "%s" in PickActivity.' % file_name)
100    # TODO(user): Remove the check of code name for S build.
101    if (self.pri_phone.build_info['build_version_codename'] != 'S' and
102        int(self.pri_phone.build_info['build_version_sdk']) <= 30):
103      # Since `adb shell input tap` cannot work in PickActivity before R build,
104      # send TAB and ENETER Key events to select and import the vcf file.
105      if self.pri_phone.aud(content_desc='Grid view').exists():
106        # Switch Grid mode since ENTER Key event cannot work in List mode on
107        # git_rvc-d2-release branch.
108        self.pri_phone.aud(content_desc='Grid view').click()
109      self.pri_phone.aud.send_key_code('KEYCODE_TAB')
110      self.pri_phone.aud.send_key_code('KEYCODE_ENTER')
111    else:
112      self.pri_phone.aud(text=file_name).click()
113    self.pri_phone.log.info('Importing "%s"...' % file_name)
114    current_count = self._wait_and_get_contact_count(
115        self.pri_phone, expected_contact_count, WAITING_TIMEOUT_SEC)
116    if current_count != expected_contact_count:
117      raise android_device.DeviceError(
118          self.pri_phone,
119          'Failed to import %d contact(s) within %ds. Actual count: %d' %
120          (expected_contact_count, WAITING_TIMEOUT_SEC, current_count))
121    self.pri_phone.log.info(
122        'Successfully added %d contact(s).' % current_count)
124  def _generate_contacts_on_pse(self,
125                                num_of_contacts,
126                                first_name=None,
127                                last_name=None,
128                                phone_number=None):
129    """Generates contacts to be tested on PSE."""
130    vcf_file = bt_test_utils.create_vcf_from_vcard(
131        output_path=self.pri_phone.log_path,
132        num_of_contacts=num_of_contacts,
133        first_name=first_name,
134        last_name=last_name,
135        phone_number=phone_number)
136    self.pri_phone.adb.push([vcf_file, STORAGE_PATH])
137    # For R build, since the pushed vcf file probably not found when importing
138    # contacts, do a media scan to recognize the file.
139    if int(self.pri_phone.build_info['build_version_sdk']) > 29:
140      self.pri_phone.adb.shell('content call --uri content://media/ --method '
141                               'scan_volume --arg external_primary')
142    file_name = vcf_file.split('/')[-1]
143    self._import_vcf_to_pse(file_name, num_of_contacts)
144    self.pri_phone.adb.shell(
145        'rm -rf %s' % os.path.join(STORAGE_PATH, file_name))
147  def _generate_call_logs_on_pse(self, call_log_type, num_of_call_logs):
148    """Generates call logs to be tested on PSE."""
149    self.pri_phone.log.info('Putting %d call log(s) which type are "%s"...' %
150                            (num_of_call_logs, call_log_type))
151    for _ in range(num_of_call_logs):
152      self.pri_phone.sl4a.callLogsPut(dict(
153          type=call_log_type,
154          number='8809%d' % random.randrange(int(10e8)),
155          time=int(1000 * float(self.pri_phone.adb.shell('date +%s.%N')))))
156    current_count = self._wait_and_get_call_log_count(
157        self.pri_phone,
158        call_log_type,
159        num_of_call_logs,
161    if current_count != num_of_call_logs:
162      raise android_device.DeviceError(
163          self.pri_phone,
164          'Failed to generate %d call log(s) within %ds. '
165          'Actual count: %d, Call log type: %s' %
166          (num_of_call_logs, WAITING_TIMEOUT_SEC, current_count, call_log_type))
167    self.pri_phone.log.info(
168        'Successfully added %d call log(s).' % current_count)
170  def _wait_and_get_contact_count(self,
171                                  device,
172                                  expected_contact_count,
173                                  timeout_sec):
174    """Waits for contact update for a period time and returns contact count.
176    This method should be used when a device imports some new contacts. It can
177    wait some time for contact update until expectation or timeout and then
178    return contact count.
180    Args:
181      device: AndroidDevice, Mobly Android controller class.
182      expected_contact_count: Int, Number of contacts as expected.
183      timeout_sec: Int, Number of seconds to wait for contact update.
185    Returns:
186      current_count: Int, number of the existing contacts on the device.
187    """
188    start_time = time.time()
189    end_time = start_time + timeout_sec
190    current_count = 0
191    while time.time() < end_time:
192      current_count = device.sl4a.contactsGetCount()
193      if current_count == expected_contact_count:
194        break
195      # Interval between attempts to get contacts.
196      time.sleep(1)
197    if current_count != expected_contact_count:
198      device.log.warning(
199          'Failed to get expected contact count: %d. '
200          'Actual contact count: %d.' %
201          (expected_contact_count, current_count))
202    return current_count
204  def _wait_and_get_call_log_count(self,
205                                   device,
206                                   call_log_type,
207                                   expected_call_log_count,
208                                   timeout_sec):
209    """Waits for call log update for a period time and returns call log count.
211    This method should be used when a device adds some new call logs. It can
212    wait some time for call log update until expectation or timeout and then
213    return call log count.
215    Args:
216      device: AndroidDevice, Mobly Android controller class.
217      call_log_type: String, Type of the call logs.
218      expected_call_log_count: Int, Number of call logs as expected.
219      timeout_sec: Int, Number of seconds to wait for call log update.
221    Returns:
222      current_count: Int, number of the existing call logs on the device.
223    """
224    start_time = time.time()
225    end_time = start_time + timeout_sec
226    current_count = 0
227    while time.time() < end_time:
228      current_count = len(device.sl4a.callLogsGet(call_log_type))
229      if current_count == expected_call_log_count:
230        break
231      # Interval between attempts to get call logs.
232      time.sleep(1)
233    if current_count != expected_call_log_count:
234      device.log.warning(
235          'Failed to get expected call log count: %d. '
236          'Actual call log count: %d.' %
237          (expected_call_log_count, current_count))
238    return current_count
240  def _terminate_pbap_connection(self):
241    status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus(
242        self.pse_mac_address)
243    if status == bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED:
244      return
245    self.derived_bt_device.log.info('Disconnecting PBAP...')
246    self.derived_bt_device.sl4a.bluetoothPbapClientDisconnect(
247        self.pse_mac_address)
248    # Buffer for the connection status check.
249    time.sleep(3)
250    status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus(
251        self.pse_mac_address)
252    if status != bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED:
253      raise signals.TestError('PBAP connection failed to be terminated.')
254    self.derived_bt_device.log.info('Successfully disconnected PBAP.')
256  def test_download_contacts(self):
257    """Test for the feature of downloading contacts.
259    Tests that PCE can download contacts from PSE.
260    """
261    # Make sure no any contacts exist on the devices.
262    for device in [self.pri_phone, self.derived_bt_device]:
263      device.sl4a.contactsEraseAll()
265    # Add contacts to PSE.
266    self._generate_contacts_on_pse(TEST_DATA_COUNT)
268    # When PCE is connected to PSE, it will download PSE's contacts.
269    self.derived_bt_device.pbap_connect()
270    self.derived_bt_device.log.info('Downloading contacts from PSE...')
271    current_count = self._wait_and_get_contact_count(
272        self.derived_bt_device, TEST_DATA_COUNT, WAITING_TIMEOUT_SEC)
273    self.derived_bt_device.log.info(
274        'Successfully downloaded %d contact(s).' % current_count)
276    asserts.assert_true(
277        current_count == TEST_DATA_COUNT,
278        'PCE failed to download %d contact(s) within %ds, '
279        'actually downloaded %d contact(s).' %
280        (TEST_DATA_COUNT, WAITING_TIMEOUT_SEC, current_count))
282  def test_download_call_logs(self):
283    """Test for the feature of downloading call logs.
285    Tests that PCE can download incoming/outgoing/missed call logs from PSE.
286    """
287    # Make sure no any call logs exist on the devices.
288    for device in [self.pri_phone, self.derived_bt_device]:
289      device.sl4a.callLogsEraseAll()
291    call_log_types = [
292        bt_constants.INCOMING_CALL_LOG_TYPE,
293        bt_constants.OUTGOING_CALL_LOG_TYPE,
294        bt_constants.MISSED_CALL_LOG_TYPE,
295    ]
296    for call_log_type in call_log_types:
297      # Add call logs to PSE.
298      self._generate_call_logs_on_pse(call_log_type, TEST_DATA_COUNT)
300    # When PCE is connected to PSE, it will download PSE's contacts.
301    self.derived_bt_device.pbap_connect()
302    self.derived_bt_device.log.info('Downloading call logs...')
304    for call_log_type in call_log_types:
305      current_count = self._wait_and_get_call_log_count(
306          self.derived_bt_device,
307          call_log_type,
308          TEST_DATA_COUNT,
310      self.derived_bt_device.log.info(
311          'Successfully downloaded %d call log(s) which type are "%s".' %
312          (current_count, call_log_type))
314      asserts.assert_true(
315          current_count == TEST_DATA_COUNT,
316          'PCE failed to download %d call log(s) which type are "%s" within %ds'
317          ', actually downloaded %d call log(s).' %
318          (TEST_DATA_COUNT, call_log_type, WAITING_TIMEOUT_SEC, current_count))
320  def test_show_caller_name(self):
321    """Test for caller name of the incoming phone call is correct on PCE.
323    Tests that caller name matches contact name which is downloaded via PBAP.
324    """
325    # Checks if two android devices exist.
326    if len(self.android_devices) < 2:
327      raise signals.TestError('This test requires two Android devices.')
328    primary_phone = self.pri_phone
329    secondary_phone = self.android_devices[1]
330    secondary_phone.init_setup()
331    for phone in [primary_phone, secondary_phone]:
332      # Checks if SIM state is loaded for every devices.
333      if not phone.is_sim_state_loaded():
334        raise signals.TestError(f'Please insert a SIM Card to the phone '
335                                f'"{phone.serial}".')
336      # Checks if phone_number is provided in the support dimensions.
337      phone.phone_number = phone.dimensions.get('phone_number')
338      if not phone.phone_number:
339        raise signals.TestError(f'Please add "phone_number" to support '
340                                f'dimensions of the phone "{phone.serial}".')
341    # Make sure no any contacts exist on the devices.
342    for device in [primary_phone, self.derived_bt_device]:
343      device.sl4a.contactsEraseAll()
344    # Generate a contact name randomly.
345    first_name = utils.rand_ascii_str(4)
346    last_name = utils.rand_ascii_str(4)
347    full_name = f'{first_name} {last_name}'
348    primary_phone.log.info('Creating a contact "%s"...', full_name)
349    self._generate_contacts_on_pse(
350        num_of_contacts=1,
351        first_name=first_name,
352        last_name=last_name,
353        phone_number=secondary_phone.phone_number)
354    self.derived_bt_device.log.info('Connecting to PSE...')
355    self.derived_bt_device.pbap_connect()
356    self.derived_bt_device.log.info('Downloading contacts from PSE...')
357    current_count = self._wait_and_get_contact_count(
358        device=self.derived_bt_device,
359        expected_contact_count=1,
360        timeout_sec=WAITING_TIMEOUT_SEC)
361    self.derived_bt_device.log.info('Successfully downloaded %d contact(s).',
362                                    current_count)
363    asserts.assert_equal(
364        first=current_count,
365        second=1,
366        msg=f'Failed to download the contact "{full_name}".')
367    secondary_phone.sl4a.telecomCallNumber(primary_phone.phone_number)
368    secondary_phone.log.info('Made a phone call to device "%s".',
369                             primary_phone.serial)
370    primary_phone.log.info('Waiting for the incoming call from device "%s"...',
371                           secondary_phone.serial)
372    is_ringing = primary_phone.wait_for_call_state(
373        bt_constants.CALL_STATE_RINGING,
374        bt_constants.CALL_STATE_TIMEOUT_SEC)
375    if not is_ringing:
376      raise signals.TestError(
377          f'Timed out after {bt_constants.CALL_STATE_TIMEOUT_SEC}s waiting for '
378          f'the incoming call from device "{secondary_phone.serial}".')
379    try:
380      self.derived_bt_device.aud.open_notification()
381      hfp_address = primary_phone.get_bluetooth_mac_address()
382      if not self.derived_bt_device.aud(
383          text=f'Incoming call via HFP {hfp_address}').exists():
384        raise signals.TestError('The incoming call was not received from '
385                                'the Handsfree device side.')
386      # Asserts that caller name of the incoming phone call is correct in the
387      # notification bar.
388      asserts.assert_true(
389          self.derived_bt_device.aud(text=full_name).exists(),
390          f'Caller name is incorrect. Expectation: "{full_name}"')
391    finally:
392      # Takes a screenshot for debugging.
393      self.derived_bt_device.take_screenshot(self.derived_bt_device.log_path)
394      # Recovery actions.
395      self.derived_bt_device.aud.close_notification()
396      secondary_phone.sl4a.telecomEndCall()
399if __name__ == '__main__':
400  test_runner.main()