1# Copyright 2014 The Android Open Source Project
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
7#      http://www.apache.org/licenses/LICENSE-2.0
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15import json
16import logging
17import os
18import os.path
19import subprocess
20import sys
21import tempfile
22import time
23import yaml
25import capture_request_utils
26import camera_properties_utils
27import image_processing_utils
28import its_session_utils
30import numpy as np
# Top-level CameraITS directory, read from the CAMERA_ITS_TOP environment
# variable. The lookup happens at import time, so the variable must be set.
YAML_FILE_DIR = os.environ['CAMERA_ITS_TOP']
# Testbed configuration file read by get_config_file_contents().
CONFIG_FILE = os.path.join(YAML_FILE_DIR, 'config.yml')
# Substrings searched for in the lower-cased TestBed 'Name' to decide whether
# the testbed uses a tablet and/or is a sensor_fusion testbed.
TEST_KEY_TABLET = 'tablet'
TEST_KEY_SENSOR_FUSION = 'sensor_fusion'
# Pause after the scene image pushes to the tablet have been started.
LOAD_SCENE_DELAY = 1  # seconds
# Pause after starting ItsTestActivity, before results are processed.
ACTIVITY_START_WAIT = 1.5  # seconds
# Keys of the per-scene entries in the results dict sent to the device.
RESULT_KEY = 'result'
SUMMARY_KEY = 'summary'
# Activity, broadcast action and intent extras used by report_result().
ITS_TEST_ACTIVITY = 'com.android.cts.verifier/.camera.its.ItsTestActivity'
ACTION_ITS_RESULT = 'com.android.cts.verifier.camera.its.ACTION_ITS_RESULT'
EXTRA_VERSION = 'camera.its.extra.VERSION'
CURRENT_ITS_VERSION = '1.0'  # version number to sync with CtsVerifier
EXTRA_CAMERA_ID = 'camera.its.extra.CAMERA_ID'
EXTRA_RESULTS = 'camera.its.extra.RESULTS'
# Keys of the per-scene start/end timestamps (recorded in milliseconds).
TIME_KEY_START = 'start'
TIME_KEY_END = 'end'
# Accepted (lower-cased) values of the 'rotator_cntl' test parameter.
VALID_CONTROLLERS = ('arduino', 'canakit')
_INT_STR_DICT = {'11': '1_1', '12': '1_2'}  # recover replaced '_' in scene def
57# All possible scenes
58# Notes on scene names:
59#   scene*_1/2/... are same scene split to load balance run times for scenes
60#   scene*_a/b/... are similar scenes that share one or more tests
62    'scene0', 'scene1_1', 'scene1_2', 'scene2_a', 'scene2_b', 'scene2_c',
63    'scene2_d', 'scene2_e', 'scene3', 'scene4', 'scene5', 'scene6',
64    'sensor_fusion', 'scene_change'
67# Scenes that can be automated through tablet display
69    'scene0', 'scene1_1', 'scene1_2', 'scene2_a', 'scene2_b', 'scene2_c',
70    'scene2_d', 'scene2_e', 'scene3', 'scene4', 'scene6', 'scene_change'
73# Scenes that are logically grouped and can be called as group
75        'scene1': ['scene1_1', 'scene1_2'],
76        'scene2': ['scene2_a', 'scene2_b', 'scene2_c', 'scene2_d', 'scene2_e']
79# Scenes that have to be run manually regardless of configuration
80_MANUAL_SCENES = ['scene5']
82# Scene requirements for manual testing.
83_SCENE_REQ = {
84    'scene0': None,
85    'scene1_1': 'A grey card covering at least the middle 30% of the scene',
86    'scene1_2': 'A grey card covering at least the middle 30% of the scene',
87    'scene2_a': 'The picture with 3 faces in tests/scene2_a/scene2_a.png',
88    'scene2_b': 'The picture with 3 faces in tests/scene2_b/scene2_b.png',
89    'scene2_c': 'The picture with 3 faces in tests/scene2_c/scene2_c.png',
90    'scene2_d': 'The picture with 3 faces in tests/scene2_d/scene2_d.png',
91    'scene2_e': 'The picture with 3 faces in tests/scene2_e/scene2_e.png',
92    'scene3': 'The ISO12233 chart',
93    'scene4': 'A test chart of a circle covering at least the middle 50% of '
94              'the scene. See tests/scene4/scene4.png',
95    'scene5': 'Capture images with a diffuser attached to the camera. '
96              'See CameraITS.pdf section 2.3.4 for more details',
97    'scene6': 'A grid of black circles on a white background. '
98              'See tests/scene6/scene6.png',
99    'sensor_fusion': 'A checkerboard pattern for phone to rotate in front of '
100                     'in tests/sensor_fusion/checkerboard.pdf\n'
101                     'See tests/sensor_fusion/SensorFusion.pdf for detailed '
102                     'instructions.\nNote that this test will be skipped '
103                     'on devices not supporting REALTIME camera timestamp.',
104    'scene_change': 'The picture with 3 faces in tests/scene2_e/scene2_e.png',
109    'scene0': [
110        'test_burst_capture',
111        'test_jitter',
112        'test_metadata',
113        'test_read_write',
114        'test_sensor_events',
115        'test_solid_color_test_pattern',
116        'test_unified_timestamps',
117    ],
118    'scene1_1': [
119        'test_burst_sameness_manual',
120        'test_dng_noise_model',
121        'test_exposure',
122        'test_linearity',
123    ],
124    'scene1_2': [
125        'test_raw_exposure',
126        'test_raw_sensitivity',
127        'test_yuv_plus_raw',
128    ],
129    'scene2_a': [
130        'test_faces',
131        'test_num_faces',
132    ],
133    'scene4': [
134        'test_aspect_ratio_and_crop',
135    ],
136    'sensor_fusion': [
137        'test_sensor_fusion',
138    ],
# Destination directory used when pushing scene image files to the tablet.
_DST_SCENE_DIR = '/mnt/sdcard/Download/'
# File that receives the stdout of each test run; it is written relative to
# the current working directory and parsed to classify the test outcome.
MOBLY_TEST_SUMMARY_TXT_FILE = 'test_mobly_summary.txt'
def run(cmd):
  """Executes a command quietly, as a stand-in for os.system.

  The command string is split on whitespace into an argument list and run
  without a shell. Both stdout and stderr of the child are discarded.

  Args:
    cmd: String with the full command line to execute.

  Raises:
    subprocess.CalledProcessError: if the command exits with non-zero status.
  """
  argv = cmd.split()
  subprocess.check_call(
      argv, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
def report_result(device_id, camera_id, results):
  """Sends a pass/fail result to the device, via an intent.

  Args:
   device_id: The ID string of the device to report the results to.
   camera_id: The ID string of the camera for which to report pass/fail.
   results: a dictionary contains all ITS scenes as key and result/summary of
            current ITS run. See test_report_result unit test for an example.
            Each summary entry is pushed to the device and then overwritten,
            in place, with the on-device path of the pushed file.

  Raises:
    ValueError: if a scene entry has no result, or its result is not one of
                RESULT_VALUES.
  """
  adb = f'adb -s {device_id}'

  # Start ItsTestActivity to receive test results
  cmd = f'{adb} shell am start {ITS_TEST_ACTIVITY} --activity-brought-to-front'
  run(cmd)
  time.sleep(ACTIVITY_START_WAIT)

  # Validate/process results argument
  for scene, scene_result in results.items():
    if RESULT_KEY not in scene_result:
      raise ValueError(f'ITS result not found for {scene}')
    if scene_result[RESULT_KEY] not in RESULT_VALUES:
      # Report the offending per-scene value; indexing the top-level dict with
      # RESULT_KEY would raise KeyError instead of this ValueError.
      raise ValueError(
          f'Unknown ITS result for {scene}: {scene_result[RESULT_KEY]}')
    if SUMMARY_KEY in scene_result:
      device_summary_path = f'/sdcard/its_camera{camera_id}_{scene}.txt'
      run(f'{adb} push {scene_result[SUMMARY_KEY]} {device_summary_path}')
      scene_result[SUMMARY_KEY] = device_summary_path

  json_results = json.dumps(results)
  # The single quotes around the JSON are kept in the command so that they
  # reach the device shell together with the payload.
  cmd = (f"{adb} shell am broadcast -a {ACTION_ITS_RESULT} --es {EXTRA_VERSION}"
         f" {CURRENT_ITS_VERSION} --es {EXTRA_CAMERA_ID} {camera_id} --es "
         f"{EXTRA_RESULTS} '{json_results}'")
  if len(cmd) > 8000:
    logging.info('ITS command string might be too long! len:%s', len(cmd))
  run(cmd)
def load_scenes_on_tablet(scene, tablet_id):
  """Pushes the scene's image files to the tablet ahead of the tests.

  Every '.png' file found in the scene's test directory is pushed with adb.
  The pushes are started without waiting for them to finish; the function
  then sleeps for LOAD_SCENE_DELAY seconds before returning.

  Args:
    scene: Name of the scene whose image files are copied.
    tablet_id: adb id of tablet
  """
  logging.info('Copying files to tablet: %s', tablet_id)
  scene_path = os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', scene)
  png_names = [name for name in os.listdir(scene_path)
               if name.endswith('.png')]
  for png_name in png_names:
    png_path = os.path.join(scene_path, png_name)
    push_cmd = f'adb -s {tablet_id} push {png_path} {_DST_SCENE_DIR}'
    subprocess.Popen(push_cmd.split())
  time.sleep(LOAD_SCENE_DELAY)
  logging.info('Finished copying files to tablet.')
def check_manual_scenes(device_id, camera_id, scene, out_path):
  """Halt run to change scenes.

  Prompts the operator to set up the scene, captures a check image into
  out_path and asks for confirmation. The prompt/capture cycle repeats until
  the operator answers 'y' (case-insensitive).

  Args:
    device_id: id of device
    camera_id: id of camera
    scene: Name of the scene the operator has to set up.
    out_path: output file location
  """
  with its_session_utils.ItsSession(
      device_id=device_id,
      camera_id=camera_id) as cam:
    props = cam.get_camera_properties()
    props = cam.override_with_hidden_physical_camera_props(props)

    while True:
      input(f'\n Press <ENTER> after positioning camera {camera_id} with '
            f'{scene}.\n The scene setup should be: \n  {_SCENE_REQ[scene]}\n')
      # Converge 3A prior to capture
      if scene == 'scene5':
        cam.do_3a(do_af=False, lock_ae=camera_properties_utils.ae_lock(props),
                  lock_awb=camera_properties_utils.awb_lock(props))
      else:
        cam.do_3a()
      req, fmt = capture_request_utils.get_fastest_auto_capture_settings(props)
      logging.info('Capturing an image to check the test scene')
      cap = cam.do_capture(req, fmt)
      img = image_processing_utils.convert_capture_to_rgb_image(cap)
      img_name = os.path.join(out_path, f'test_{scene}.jpg')
      logging.info('Please check scene setup in %s', img_name)
      image_processing_utils.write_image(img, img_name)
      # f-string so the prompt names the scene instead of printing '{scene}'.
      choice = input(f'Is the image okay for ITS {scene}? (Y/N)').lower()
      if choice == 'y':
        break
def get_config_file_contents():
  """Loads CONFIG_FILE and returns its parsed contents.

  Returns:
    The object produced by parsing config.yml with yaml's FullLoader.
  """
  with open(CONFIG_FILE) as config_stream:
    return yaml.load(config_stream, yaml.FullLoader)
def get_test_params(config_file_contents):
  """Extracts the 'TestParams' entry from the config file contents.

  Every entry of every top-level value is visited and the 'TestParams' lookup
  of the last visited entry wins.

  Args:
    config_file_contents: dict read from config.yml file

  Returns:
    The 'TestParams' value of the last entry visited, or None if that entry
    has no 'TestParams' or there are no entries at all.
  """
  found_params = None
  for section in config_file_contents.values():
    for entry in section:
      found_params = entry.get('TestParams')
  return found_params
def get_device_serial_number(device, config_file_contents):
  """Returns the serial number of the device with label from the config file.

  The config file contains TestBeds dictionary which contains Controllers and
  Android Device dicts. The two devices used by the test per box are listed
  here with labels dut and tablet. Parse through the nested TestBeds dict to
  get the Android device details.

  Args:
    device: String device label as specified in config file. 'tablet' selects
      the tablet; any other value selects the dut.
    config_file_contents: dict read from config.yml file

  Returns:
    The 'serial' entry of the matching Android device. If several devices
    carry the label, the last one found wins.

  Raises:
    ValueError: if no Android device with the requested label is configured.
  """
  serials = {}
  for testbeds in config_file_contents.values():
    for testbed in testbeds:
      # Entries without Controllers/AndroidDevice simply contribute nothing.
      controllers = testbed.get('Controllers') or {}
      for device_dict in controllers.get('AndroidDevice') or []:
        for label in ('tablet', 'dut'):
          if label in device_dict.values():
            serials[label] = device_dict.get('serial')

  wanted = 'tablet' if device == 'tablet' else 'dut'
  if wanted not in serials:
    # Fail with a clear message instead of an UnboundLocalError.
    raise ValueError(
        f'No Android device labelled {wanted!r} found in config file')
  return serials[wanted]
def get_updated_yml_file(yml_file_contents):
  """Create a new yml file and write the testbed contents in it.

  This testbed file is per box and contains all the parameters and
  device id used by the mobly tests.

  Args:
   yml_file_contents: Data to write in yml file.

  Returns:
    Base name of the new 'config_*.yml' file created in YAML_FILE_DIR.
  """
  os.chmod(YAML_FILE_DIR, 0o755)
  fd, new_yaml_file = tempfile.mkstemp(
      suffix='.yml', prefix='config_', dir=YAML_FILE_DIR)
  # mkstemp returns an already-open descriptor; wrap it so that it is closed
  # when writing is done instead of being leaked on every call.
  with os.fdopen(fd, 'w') as f:
    yaml.dump(yml_file_contents, stream=f, default_flow_style=False)
  return os.path.basename(new_yaml_file)
def enable_external_storage(device_id):
  """Grants com.android.cts.verifier the MANAGE_EXTERNAL_STORAGE app-op.

  Args:
    device_id: Serial number of the device.
  """
  appops_args = 'set com.android.cts.verifier MANAGE_EXTERNAL_STORAGE allow'
  run(f'adb -s {device_id} shell appops {appops_args}')
def main():
  """Run all the Camera ITS automated tests.

  Script should be run from the top-level CameraITS directory.

  Command line arguments:
    camera:  the camera(s) to be tested. Use comma to separate multiple
             camera Ids. Ex: "camera=0,1" or "camera=1"
    scenes:  the test scene(s) to be executed. Use comma to separate
             multiple scenes. Ex: "scenes=scene0,scene1_1" or
             "scenes=0,1_1,sensor_fusion" (sceneX can be abbreviated by X
             where X is scene name minus 'scene')

  Raises:
    ValueError: if no TestBed in config.yml matches the requested scenes, or
                none of the requested scenes is valid for a camera.
  """
  logging.basicConfig(level=logging.INFO)
  # Make output directories to hold the generated files.
  topdir = tempfile.mkdtemp(prefix='CameraITS_')
  subprocess.call(['chmod', 'g+rx', topdir])
  logging.info('Saving output files to: %s', topdir)

  scenes = []
  camera_id_combos = []
  # Override camera & scenes with cmd line values if available
  for s in sys.argv[1:]:
    if 'scenes=' in s:
      scenes = s.split('=')[1].split(',')
    elif 'camera=' in s:
      camera_id_combos = s.split('=')[1].split(',')

  # Read config file and extract relevant TestBed: sensor_fusion testbeds for
  # a sensor_fusion-only run, every other testbed otherwise.
  config_file_contents = get_config_file_contents()
  want_sensor_fusion = scenes == ['sensor_fusion']
  # Build the filtered list in one pass. Removing entries from the list while
  # iterating over it skips the entry that follows each removal.
  config_file_contents['TestBeds'][:] = [
      testbed for testbed in config_file_contents['TestBeds']
      if (TEST_KEY_SENSOR_FUSION in testbed['Name'].lower()) ==
      want_sensor_fusion
  ]
  if not config_file_contents['TestBeds']:
    raise ValueError('No TestBed in config.yml matches the requested scenes.')

  # Get test parameters from config file
  test_params_content = get_test_params(config_file_contents)
  if not camera_id_combos:
    camera_id_combos = str(test_params_content['camera']).split(',')
  if not scenes:
    scenes = str(test_params_content['scene']).split(',')
    scenes = [_INT_STR_DICT.get(n, n) for n in scenes]  # recover '1_1' & '1_2'

  device_id = get_device_serial_number('dut', config_file_contents)
  # Enable external storage on DUT to send summary report to CtsVerifier.apk
  enable_external_storage(device_id)

  config_file_test_key = config_file_contents['TestBeds'][0]['Name'].lower()
  if TEST_KEY_TABLET in config_file_test_key:
    tablet_id = get_device_serial_number('tablet', config_file_contents)
  else:
    tablet_id = None

  testing_sensor_fusion_with_controller = False
  if TEST_KEY_SENSOR_FUSION in config_file_test_key:
    if test_params_content['rotator_cntl'].lower() in VALID_CONTROLLERS:
      testing_sensor_fusion_with_controller = True

  # Prepend 'scene' if not specified at cmd line
  for i, s in enumerate(scenes):
    if not s.startswith(('scene', 'sensor_fusion', '<scene-name>')):
      scenes[i] = f'scene{s}'

  # Expand GROUPED_SCENES and remove any duplicates
  scenes = [_GROUPED_SCENES[s] if s in _GROUPED_SCENES else s for s in scenes]
  scenes = np.hstack(scenes).tolist()
  scenes = sorted(set(scenes), key=scenes.index)

  logging.info('Running ITS on device: %s, camera(s): %s, scene(s): %s',
               device_id, camera_id_combos, scenes)

  # Determine if manual run
  if tablet_id is not None and not set(scenes).intersection(_MANUAL_SCENES):
    auto_scene_switch = True
  else:
    auto_scene_switch = False
    logging.info('Manual testing: no tablet defined or testing scene5.')

  for camera_id in camera_id_combos:
    test_params_content['camera'] = camera_id
    results = {}

    # Run through all scenes if user does not supply one and config file doesn't
    # have specific scene name listed.
    if its_session_utils.SUB_CAMERA_SEPARATOR in camera_id:
      possible_scenes = list(SUB_CAMERA_TESTS.keys())
      if auto_scene_switch:
        possible_scenes.remove('sensor_fusion')
    else:
      possible_scenes = _AUTO_SCENES if auto_scene_switch else _ALL_SCENES

    if '<scene-name>' in scenes:
      per_camera_scenes = possible_scenes
    else:
      # Validate user input scene names
      per_camera_scenes = [s for s in scenes if s in possible_scenes]
      if not per_camera_scenes:
        raise ValueError('No valid scene specified for this camera.')

    logging.info('camera: %s, scene(s): %s', camera_id, per_camera_scenes)
    for s in _ALL_SCENES:
      results[s] = {RESULT_KEY: RESULT_NOT_EXECUTED}
    # A subdir in topdir will be created for each camera_id. All scene test
    # output logs for each camera id will be stored in this subdir.
    # This output log path is a mobly param : LogPath
    cam_id_string = 'cam_id_%s' % (
        camera_id.replace(its_session_utils.SUB_CAMERA_SEPARATOR, '_'))
    mobly_output_logs_path = os.path.join(topdir, cam_id_string)
    os.mkdir(mobly_output_logs_path)
    tot_pass = 0
    for s in per_camera_scenes:
      test_params_content['scene'] = s
      results[s]['TEST_STATUS'] = []

      # unit is millisecond for execution time record in CtsVerifier
      scene_start_time = int(round(time.time() * 1000))
      scene_test_summary = f'Cam{camera_id} {s}' + '\n'
      mobly_scene_output_logs_path = os.path.join(mobly_output_logs_path, s)

      if auto_scene_switch:
        # Copy scene images onto the tablet
        if s not in ['scene0']:
          load_scenes_on_tablet(s, tablet_id)
      else:
        # Check manual scenes for correctness
        if s not in ['scene0'] and not testing_sensor_fusion_with_controller:
          check_manual_scenes(device_id, camera_id, s, mobly_output_logs_path)

      config_file_contents['TestBeds'][0]['TestParams'] = test_params_content
      # Add the MoblyParams to config.yml file with the path to store camera_id
      # test results. This is a separate dict other than TestBeds.
      mobly_params_dict = {
          'MoblyParams': {
              'LogPath': mobly_scene_output_logs_path
          }
      }
      config_file_contents.update(mobly_params_dict)
      logging.debug('Final config file contents: %s', config_file_contents)
      new_yml_file_name = get_updated_yml_file(config_file_contents)
      logging.info('Using %s as temporary config yml file', new_yml_file_name)

      scene_test_list = []
      if camera_id.rfind(its_session_utils.SUB_CAMERA_SEPARATOR) == -1:
        scene_dir = os.listdir(
            os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', s))
        for file_name in scene_dir:
          if file_name.endswith('.py') and 'test' in file_name:
            scene_test_list.append(file_name)
      elif SUB_CAMERA_TESTS.get(s):  # sub-camera
        scene_test_list = [f'{test}.py' for test in SUB_CAMERA_TESTS[s]]
      scene_test_list.sort()

      # Run tests for scene
      logging.info('Running tests for %s with camera %s', s, camera_id)
      num_pass = 0
      num_skip = 0
      num_not_mandated_fail = 0
      num_fail = 0
      for test in scene_test_list:
        # Handle repeated test
        if 'tests/' in test:
          test_path = os.path.join(os.environ['CAMERA_ITS_TOP'], test)
        else:
          test_path = os.path.join(
              os.environ['CAMERA_ITS_TOP'], 'tests', s, test)
        cmd = ['python3', test_path, '-c', new_yml_file_name]

        for num_try in range(NUM_TRIES):
          # pylint: disable=subprocess-run-check
          with open(MOBLY_TEST_SUMMARY_TXT_FILE, 'w') as fp:
            output = subprocess.run(cmd, stdout=fp)
          # pylint: enable=subprocess-run-check

          # Parse mobly logs to determine SKIP, NOT_YET_MANDATED, and
          # socket FAILs.
          with open(MOBLY_TEST_SUMMARY_TXT_FILE, 'r') as file:
            test_output = file.read()
          test_code = output.returncode

          if 'Test skipped' in test_output:
            return_string = 'SKIP '
            num_skip += 1
            break

          if 'Not yet mandated test' in test_output:
            return_string = 'FAIL*'
            num_not_mandated_fail += 1
            break

          if test_code == 0:
            return_string = 'PASS '
            num_pass += 1
            break

          # Every other exit status counts as a failure, so a crashed test
          # (exit status other than 1) is reported instead of being dropped.
          return_string = 'FAIL '
          if 'Problem with socket' in test_output and num_try != NUM_TRIES - 1:
            logging.info('Retry %s/%s', s, test)
            # Drop the stale output before the retry; the file is closed here.
            os.remove(MOBLY_TEST_SUMMARY_TXT_FILE)
          else:
            num_fail += 1
            break
        logging.info('%s %s/%s', return_string, s, test)
        test_name = test.split('/')[-1].split('.')[0]
        results[s]['TEST_STATUS'].append(
            {'test': test_name, 'status': return_string.strip()})
        msg_short = '%s %s' % (return_string, test)
        scene_test_summary += msg_short + '\n'

      # unit is millisecond for execution time record in CtsVerifier
      scene_end_time = int(round(time.time() * 1000))
      tot_tests = len(scene_test_list)
      skip_string = ''
      if num_skip > 0:
        skip_string = f",{num_skip} test{'s' if num_skip > 1 else ''} skipped"
      # not_yet_mandated failures count as passed; skipped tests are excluded.
      num_ok = num_pass + num_not_mandated_fail
      num_run = tot_tests - num_skip
      pass_pct = 100.0 * float(num_ok) / num_run if num_run else 100.0
      test_result = '%d / %d tests passed (%.1f%%)%s' % (
          num_ok, num_run, pass_pct, skip_string)
      logging.info(test_result)
      if num_not_mandated_fail > 0:
        logging.info('(*) %s not_yet_mandated tests failed',
                     num_not_mandated_fail)

      tot_pass += num_pass
      logging.info('scene tests: %s, Total tests passed: %s', tot_tests,
                   tot_pass)
      if tot_tests > 0:
        logging.info('%s compatibility score: %.f/100\n',
                     s, 100 * num_pass / tot_tests)
        scene_test_summary_path = os.path.join(mobly_scene_output_logs_path,
                                               'scene_test_summary.txt')
        with open(scene_test_summary_path, 'w') as f:
          f.write(scene_test_summary)
        results[s][RESULT_KEY] = (RESULT_PASS if num_fail == 0 else RESULT_FAIL)
        results[s][SUMMARY_KEY] = scene_test_summary_path
        results[s][TIME_KEY_START] = scene_start_time
        results[s][TIME_KEY_END] = scene_end_time
      else:
        logging.info('%s compatibility score: 0/100\n', s)

      # Delete temporary yml file after scene run.
      new_yaml_file_path = os.path.join(YAML_FILE_DIR, new_yml_file_name)
      os.remove(new_yaml_file_path)

    # Log results per camera
    logging.info('Reporting camera %s ITS results to CtsVerifier', camera_id)
    report_result(device_id, camera_id, results)

  logging.info('Test execution completed.')

  # Power down tablet; a run without a tablet has nothing to power down.
  if tablet_id is not None:
    cmd = f'adb -s {tablet_id} shell input keyevent KEYCODE_POWER'
    subprocess.Popen(cmd.split())
# Run the ITS flow only when this file is executed as a script.
if __name__ == '__main__':
  main()