1"""Tests for blueberry.pbap.bluetooth_pbap."""
2
3import os
4import random
5import time
6
7from mobly import asserts
8from mobly import test_runner
9from mobly import signals
10from mobly import utils
11
12from mobly.controllers import android_device
13
14from blueberry.utils import blueberry_ui_base_test
15from blueberry.utils import bt_constants
16from blueberry.utils import bt_test_utils
17
# Directory on the device where the generated vcf files are pushed before
# being imported by the Contacts app.
STORAGE_PATH = '/storage/emulated/0'

# URI for contacts database.
CONTACTS_URI = 'content://com.android.contacts/data/phones'

# Number of seconds to wait for contacts and call logs to reach the expected
# count, both when generating them and when downloading them.
WAITING_TIMEOUT_SEC = 60

# Number of contacts and call logs (per call log type) to be tested.
TEST_DATA_COUNT = 1000

# Base permissions granted to the Contacts app (com.google.android.contacts)
# on every device; setup_class adds READ_EXTERNAL_STORAGE for SDK < 30.
PERMISSION_LIST = [
    'android.permission.READ_CONTACTS',
    'android.permission.WRITE_CONTACTS',
]
35
36
class BluetoothPbapTest(blueberry_ui_base_test.BlueberryUiBaseTest):
  """Test class for Bluetooth PBAP.

  The tests use a derived Bluetooth device acting as the Phone Book Client
  Equipment (PCE) and a primary Android phone acting as the Phone Book Server
  Equipment (PSE).

  Attributes:
    derived_bt_device: The PCE, taken from `self.derived_bt_devices[0]`.
    pri_phone: The PSE, taken from `self.android_devices[0]`.
    pse_mac_address: Bluetooth MAC address reported by the PSE.
  """

  def __init__(self, configs):
    super().__init__(configs)
    self.derived_bt_device = None
    self.pri_phone = None
    self.pse_mac_address = None

  def setup_class(self):
    """Standard Mobly setup class.

    Sets up both devices, grants the Contacts app its permissions, pairs the
    PSE with the PCE and allows PBAP access for the PCE on the PSE.
    """
    super().setup_class()

    # Bluetooth carkit whose role is Phone Book Client Equipment (PCE).
    self.derived_bt_device = self.derived_bt_devices[0]

    # Primary phone whose role is Phone Book Server Equipment (PSE).
    self.pri_phone = self.android_devices[0]
    self.pri_phone.init_setup()
    self.pri_phone.sl4a_setup()
    self.derived_bt_device.add_sec_ad_device(self.pri_phone)

    # Grant the permissions to Contacts app.
    for device in [self.pri_phone, self.derived_bt_device]:
      # Work on a copy: appending to PERMISSION_LIST itself would mutate the
      # module-level constant and leak the extra permission to other devices.
      required_permissions = list(PERMISSION_LIST)
      # App requires READ_EXTERNAL_STORAGE to read contacts if SDK < 30.
      if int(device.build_info['build_version_sdk']) < 30:
        required_permissions.append('android.permission.READ_EXTERNAL_STORAGE')
      for permission in required_permissions:
        device.adb.shell('pm grant com.google.android.contacts %s' % permission)
    self.pse_mac_address = self.pri_phone.get_bluetooth_mac_address()
    mac_address = self.derived_bt_device.get_bluetooth_mac_address()
    self.derived_bt_device.activate_pairing_mode()
    self.pri_phone.pair_and_connect_bluetooth(mac_address)
    # Sleep until the connection stabilizes.
    time.sleep(5)

    # Allow permission access for PBAP profile.
    self.pri_phone.sl4a.bluetoothChangeProfileAccessPermission(
        mac_address,
        bt_constants.BluetoothProfile.PBAP.value,
        bt_constants.BluetoothAccessLevel.ACCESS_ALLOWED.value)

  def setup_test(self):
    """Standard Mobly setup test; starts every test with PBAP disconnected."""
    super().setup_test()
    # Make sure PBAP is not connected before running tests.
    self._terminate_pbap_connection()

  def _import_vcf_to_pse(self, file_name, expected_contact_count):
    """Imports the vcf file to PSE through the Contacts app UI.

    Args:
      file_name: String, name of the vcf file to select in PickActivity.
      expected_contact_count: Int, number of contacts the PSE should hold once
        the import has finished.

    Raises:
      android_device.DeviceError: If the file is not listed in PickActivity or
        the contact count does not reach `expected_contact_count` within
        WAITING_TIMEOUT_SEC seconds.
    """
    # Open ImportVcardActivity and click "OK" in the pop-up dialog, then
    # PickActivity will be launched and browses the existing vcf files.
    self.pri_phone.adb.shell(
        'am start com.google.android.contacts/'
        'com.google.android.apps.contacts.vcard.ImportVCardActivity')
    self.pri_phone.aud(text='OK').click()

    # Check if the vcf file appears in the PickActivity.
    if not self.pri_phone.aud(text=file_name).exists():
      raise android_device.DeviceError(
          self.pri_phone,
          'No file name matches "%s" in PickActivity.' % file_name)

    # TODO(user): Remove the check of code name for S build.
    if (self.pri_phone.build_info['build_version_codename'] != 'S' and
        int(self.pri_phone.build_info['build_version_sdk']) <= 30):
      # Since `adb shell input tap` cannot work in PickActivity before R build,
      # send TAB and ENTER Key events to select and import the vcf file.
      if self.pri_phone.aud(content_desc='Grid view').exists():
        # Switch Grid mode since ENTER Key event cannot work in List mode on
        # git_rvc-d2-release branch.
        self.pri_phone.aud(content_desc='Grid view').click()
      self.pri_phone.aud.send_key_code('KEYCODE_TAB')
      self.pri_phone.aud.send_key_code('KEYCODE_ENTER')
    else:
      self.pri_phone.aud(text=file_name).click()
    self.pri_phone.log.info('Importing "%s"...', file_name)
    current_count = self._wait_and_get_contact_count(
        self.pri_phone, expected_contact_count, WAITING_TIMEOUT_SEC)
    if current_count != expected_contact_count:
      raise android_device.DeviceError(
          self.pri_phone,
          'Failed to import %d contact(s) within %ds. Actual count: %d' %
          (expected_contact_count, WAITING_TIMEOUT_SEC, current_count))
    self.pri_phone.log.info('Successfully added %d contact(s).', current_count)

  def _generate_contacts_on_pse(self,
                                num_of_contacts,
                                first_name=None,
                                last_name=None,
                                phone_number=None):
    """Generates contacts to be tested on PSE.

    Creates a vcf file on the host, pushes it to STORAGE_PATH on the PSE,
    imports it through the Contacts app and removes the pushed file again.

    Args:
      num_of_contacts: Int, number of contacts to create and import.
      first_name: Optional first name forwarded to the vcf generator.
      last_name: Optional last name forwarded to the vcf generator.
      phone_number: Optional phone number forwarded to the vcf generator.

    Raises:
      android_device.DeviceError: If the import does not complete (raised by
        `_import_vcf_to_pse`).
    """
    vcf_file = bt_test_utils.create_vcf_from_vcard(
        output_path=self.pri_phone.log_path,
        num_of_contacts=num_of_contacts,
        first_name=first_name,
        last_name=last_name,
        phone_number=phone_number)
    self.pri_phone.adb.push([vcf_file, STORAGE_PATH])
    file_name = os.path.basename(vcf_file)
    try:
      # For R build, since the pushed vcf file probably not found when
      # importing contacts, do a media scan to recognize the file.
      if int(self.pri_phone.build_info['build_version_sdk']) > 29:
        self.pri_phone.adb.shell('content call --uri content://media/ --method '
                                 'scan_volume --arg external_primary')
      self._import_vcf_to_pse(file_name, num_of_contacts)
    finally:
      # Remove the pushed file even when the import fails, so that a stale vcf
      # file cannot show up in PickActivity during a later import.
      self.pri_phone.adb.shell(
          'rm -rf %s' % os.path.join(STORAGE_PATH, file_name))

  def _generate_call_logs_on_pse(self, call_log_type, num_of_call_logs):
    """Generates call logs to be tested on PSE.

    Args:
      call_log_type: String, type of the call logs to add.
      num_of_call_logs: Int, number of call logs to add.

    Raises:
      android_device.DeviceError: If the PSE does not report exactly
        `num_of_call_logs` call logs of the given type within
        WAITING_TIMEOUT_SEC seconds.
    """
    self.pri_phone.log.info('Putting %d call log(s) which type are "%s"...',
                            num_of_call_logs, call_log_type)
    for _ in range(num_of_call_logs):
      # The timestamp is read from the device clock, in milliseconds.
      timestamp_ms = int(
          1000 * float(self.pri_phone.adb.shell('date +%s.%N')))
      self.pri_phone.sl4a.callLogsPut(dict(
          type=call_log_type,
          number='8809%d' % random.randrange(10**9),
          time=timestamp_ms))
    current_count = self._wait_and_get_call_log_count(
        self.pri_phone,
        call_log_type,
        num_of_call_logs,
        WAITING_TIMEOUT_SEC)
    if current_count != num_of_call_logs:
      raise android_device.DeviceError(
          self.pri_phone,
          'Failed to generate %d call log(s) within %ds. '
          'Actual count: %d, Call log type: %s' %
          (num_of_call_logs, WAITING_TIMEOUT_SEC, current_count, call_log_type))
    self.pri_phone.log.info(
        'Successfully added %d call log(s).', current_count)

  def _wait_for_count(self, get_count, expected_count, timeout_sec):
    """Polls a counter once per second until it matches or time runs out.

    Args:
      get_count: Callable taking no arguments and returning the current count.
      expected_count: Int, count at which polling stops early.
      timeout_sec: Int, maximum number of seconds to keep polling.

    Returns:
      Int, the last count read, or 0 if the timeout elapsed before any read.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_sec
    current_count = 0
    while time.monotonic() < deadline:
      current_count = get_count()
      if current_count == expected_count:
        break
      # Interval between attempts.
      time.sleep(1)
    return current_count

  def _wait_and_get_contact_count(self,
                                  device,
                                  expected_contact_count,
                                  timeout_sec):
    """Waits for contact update for a period time and returns contact count.

    This method should be used when a device imports some new contacts. It can
    wait some time for contact update until expectation or timeout and then
    return contact count. A warning is logged on the device when the expected
    count is not reached; no exception is raised.

    Args:
      device: AndroidDevice, Mobly Android controller class.
      expected_contact_count: Int, Number of contacts as expected.
      timeout_sec: Int, Number of seconds to wait for contact update.

    Returns:
      current_count: Int, number of the existing contacts on the device.
    """
    current_count = self._wait_for_count(
        lambda: device.sl4a.contactsGetCount(),
        expected_contact_count,
        timeout_sec)
    if current_count != expected_contact_count:
      device.log.warning(
          'Failed to get expected contact count: %d. '
          'Actual contact count: %d.',
          expected_contact_count, current_count)
    return current_count

  def _wait_and_get_call_log_count(self,
                                   device,
                                   call_log_type,
                                   expected_call_log_count,
                                   timeout_sec):
    """Waits for call log update for a period time and returns call log count.

    This method should be used when a device adds some new call logs. It can
    wait some time for call log update until expectation or timeout and then
    return call log count. A warning is logged on the device when the expected
    count is not reached; no exception is raised.

    Args:
      device: AndroidDevice, Mobly Android controller class.
      call_log_type: String, Type of the call logs.
      expected_call_log_count: Int, Number of call logs as expected.
      timeout_sec: Int, Number of seconds to wait for call log update.

    Returns:
      current_count: Int, number of the existing call logs on the device.
    """
    current_count = self._wait_for_count(
        lambda: len(device.sl4a.callLogsGet(call_log_type)),
        expected_call_log_count,
        timeout_sec)
    if current_count != expected_call_log_count:
      device.log.warning(
          'Failed to get expected call log count: %d. '
          'Actual call log count: %d.',
          expected_call_log_count, current_count)
    return current_count

  def _terminate_pbap_connection(self):
    """Disconnects PBAP on the PCE if it is not already disconnected.

    Raises:
      signals.TestError: If the PCE still does not report the disconnected
        state 3 seconds after the disconnect request.
    """
    disconnected = bt_constants.BluetoothConnectionStatus.STATE_DISCONNECTED
    status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus(
        self.pse_mac_address)
    if status == disconnected:
      return
    self.derived_bt_device.log.info('Disconnecting PBAP...')
    self.derived_bt_device.sl4a.bluetoothPbapClientDisconnect(
        self.pse_mac_address)
    # Buffer for the connection status check.
    time.sleep(3)
    status = self.derived_bt_device.sl4a.bluetoothPbapClientGetConnectionStatus(
        self.pse_mac_address)
    if status != disconnected:
      raise signals.TestError('PBAP connection failed to be terminated.')
    self.derived_bt_device.log.info('Successfully disconnected PBAP.')

  def test_download_contacts(self):
    """Test for the feature of downloading contacts.

    Tests that PCE can download contacts from PSE.
    """
    # Make sure no contacts exist on the devices.
    for device in [self.pri_phone, self.derived_bt_device]:
      device.sl4a.contactsEraseAll()

    # Add contacts to PSE.
    self._generate_contacts_on_pse(TEST_DATA_COUNT)

    # When PCE is connected to PSE, it will download PSE's contacts.
    self.derived_bt_device.pbap_connect()
    self.derived_bt_device.log.info('Downloading contacts from PSE...')
    current_count = self._wait_and_get_contact_count(
        self.derived_bt_device, TEST_DATA_COUNT, WAITING_TIMEOUT_SEC)

    asserts.assert_true(
        current_count == TEST_DATA_COUNT,
        'PCE failed to download %d contact(s) within %ds, '
        'actually downloaded %d contact(s).' %
        (TEST_DATA_COUNT, WAITING_TIMEOUT_SEC, current_count))
    # Logged only after the assertion so a failed run never reports success.
    self.derived_bt_device.log.info(
        'Successfully downloaded %d contact(s).', current_count)

  def test_download_call_logs(self):
    """Test for the feature of downloading call logs.

    Tests that PCE can download incoming/outgoing/missed call logs from PSE.
    """
    # Make sure no call logs exist on the devices.
    for device in [self.pri_phone, self.derived_bt_device]:
      device.sl4a.callLogsEraseAll()

    call_log_types = [
        bt_constants.INCOMING_CALL_LOG_TYPE,
        bt_constants.OUTGOING_CALL_LOG_TYPE,
        bt_constants.MISSED_CALL_LOG_TYPE,
    ]
    for call_log_type in call_log_types:
      # Add call logs to PSE.
      self._generate_call_logs_on_pse(call_log_type, TEST_DATA_COUNT)

    # When PCE is connected to PSE, it will download PSE's call logs.
    self.derived_bt_device.pbap_connect()
    self.derived_bt_device.log.info('Downloading call logs...')

    for call_log_type in call_log_types:
      current_count = self._wait_and_get_call_log_count(
          self.derived_bt_device,
          call_log_type,
          TEST_DATA_COUNT,
          WAITING_TIMEOUT_SEC)

      asserts.assert_true(
          current_count == TEST_DATA_COUNT,
          'PCE failed to download %d call log(s) which type are "%s" within %ds'
          ', actually downloaded %d call log(s).' %
          (TEST_DATA_COUNT, call_log_type, WAITING_TIMEOUT_SEC, current_count))
      # Logged only after the assertion so a failed run never reports success.
      self.derived_bt_device.log.info(
          'Successfully downloaded %d call log(s) which type are "%s".',
          current_count, call_log_type)

  def test_show_caller_name(self):
    """Test for caller name of the incoming phone call is correct on PCE.

    Tests that caller name matches contact name which is downloaded via PBAP.

    Raises:
      signals.TestError: If fewer than two Android devices are available, a
        phone has no loaded SIM or no "phone_number" dimension, the incoming
        call does not ring in time, or the PCE does not show the HFP incoming
        call notification.
    """
    # Checks if two android devices exist.
    if len(self.android_devices) < 2:
      raise signals.TestError('This test requires two Android devices.')
    primary_phone = self.pri_phone
    secondary_phone = self.android_devices[1]
    secondary_phone.init_setup()
    for phone in [primary_phone, secondary_phone]:
      # Checks if SIM state is loaded for every devices.
      if not phone.is_sim_state_loaded():
        raise signals.TestError(f'Please insert a SIM Card to the phone '
                                f'"{phone.serial}".')
      # Checks if phone_number is provided in the support dimensions.
      phone.phone_number = phone.dimensions.get('phone_number')
      if not phone.phone_number:
        raise signals.TestError(f'Please add "phone_number" to support '
                                f'dimensions of the phone "{phone.serial}".')
    # Make sure no contacts exist on the devices.
    for device in [primary_phone, self.derived_bt_device]:
      device.sl4a.contactsEraseAll()
    # Generate a contact name randomly.
    first_name = utils.rand_ascii_str(4)
    last_name = utils.rand_ascii_str(4)
    full_name = f'{first_name} {last_name}'
    primary_phone.log.info('Creating a contact "%s"...', full_name)
    self._generate_contacts_on_pse(
        num_of_contacts=1,
        first_name=first_name,
        last_name=last_name,
        phone_number=secondary_phone.phone_number)
    self.derived_bt_device.log.info('Connecting to PSE...')
    self.derived_bt_device.pbap_connect()
    self.derived_bt_device.log.info('Downloading contacts from PSE...')
    current_count = self._wait_and_get_contact_count(
        device=self.derived_bt_device,
        expected_contact_count=1,
        timeout_sec=WAITING_TIMEOUT_SEC)
    asserts.assert_equal(
        first=current_count,
        second=1,
        msg=f'Failed to download the contact "{full_name}".')
    self.derived_bt_device.log.info('Successfully downloaded %d contact(s).',
                                    current_count)
    secondary_phone.sl4a.telecomCallNumber(primary_phone.phone_number)
    try:
      secondary_phone.log.info('Made a phone call to device "%s".',
                               primary_phone.serial)
      primary_phone.log.info(
          'Waiting for the incoming call from device "%s"...',
          secondary_phone.serial)
      is_ringing = primary_phone.wait_for_call_state(
          bt_constants.CALL_STATE_RINGING,
          bt_constants.CALL_STATE_TIMEOUT_SEC)
      if not is_ringing:
        raise signals.TestError(
            f'Timed out after {bt_constants.CALL_STATE_TIMEOUT_SEC}s waiting '
            f'for the incoming call from device "{secondary_phone.serial}".')
      try:
        self.derived_bt_device.aud.open_notification()
        hfp_address = primary_phone.get_bluetooth_mac_address()
        if not self.derived_bt_device.aud(
            text=f'Incoming call via HFP {hfp_address}').exists():
          raise signals.TestError('The incoming call was not received from '
                                  'the Handsfree device side.')
        # Asserts that caller name of the incoming phone call is correct in
        # the notification bar.
        asserts.assert_true(
            self.derived_bt_device.aud(text=full_name).exists(),
            f'Caller name is incorrect. Expectation: "{full_name}"')
      finally:
        # Takes a screenshot for debugging.
        self.derived_bt_device.take_screenshot(self.derived_bt_device.log_path)
        # Recovery actions.
        self.derived_bt_device.aud.close_notification()
    finally:
      # Always hang up once the call has been placed, including when the
      # primary phone never rings, so the secondary phone is not left dialing.
      secondary_phone.sl4a.telecomEndCall()
398
# Hand control to Mobly's test runner when this module is run as a script.
if __name__ == '__main__':
  test_runner.main()
401