1# Copyright 2014 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import copy 16import math 17import os 18import os.path 19import re 20import subprocess 21import sys 22import tempfile 23import time 24 25import its.caps 26import its.cv2image 27import its.device 28from its.device import ItsSession 29import its.image 30 31import numpy as np 32 33CHART_DELAY = 1 # seconds 34CHART_DISTANCE = 30.0 # cm 35CHART_HEIGHT = 13.5 # cm 36CHART_SCALE_START = 0.65 37CHART_SCALE_STOP = 1.35 38CHART_SCALE_STEP = 0.025 39FACING_EXTERNAL = 2 40NUM_TRYS = 2 41SCENE3_FILE = os.path.join(os.environ["CAMERA_ITS_TOP"], "pymodules", "its", 42 "test_images", "ISO12233.png") 43SKIP_RET_CODE = 101 # note this must be same as tests/scene*/test_* 44VGA_HEIGHT = 480 45VGA_WIDTH = 640 46 47# Not yet mandated tests 48NOT_YET_MANDATED = { 49 "scene0": [ 50 "test_jitter", 51 "test_burst_capture", 52 "test_test_patterns" 53 ], 54 "scene1": [ 55 "test_ae_af", 56 "test_ae_precapture_trigger", 57 "test_crop_region_raw", 58 "test_ev_compensation_advanced", 59 "test_ev_compensation_basic", 60 "test_yuv_plus_jpeg" 61 ], 62 "scene2": [ 63 "test_num_faces" 64 ], 65 "scene3": [ 66 "test_flip_mirror", 67 "test_lens_movement_reporting", 68 "test_lens_position" 69 ], 70 "scene4": [ 71 "test_multi_camera_alignment" 72 ], 73 "scene5": [], 74 "sensor_fusion": [] 75} 76 77 78def calc_camera_fov(): 79 """Determine the camera field of view from internal params.""" 80 with ItsSession() as cam: 81 props = cam.get_camera_properties() 82 try: 83 focal_l = props['android.lens.info.availableFocalLengths'][0] 84 sensor_size = props['android.sensor.info.physicalSize'] 85 diag = math.sqrt(sensor_size['height'] ** 2 + 86 sensor_size['width'] ** 2) 87 fov = str(round(2 * math.degrees(math.atan(diag / (2 * focal_l))), 2)) 88 except ValueError: 89 fov = str(0) 90 print 'Calculated FoV: %s' % fov 91 return fov 92 93 94def evaluate_socket_failure(err_file_path): 95 """Determine if test fails due to socket FAIL.""" 96 socket_fail = False 97 with open(err_file_path, 'r') as ferr: 98 for line in ferr: 99 if (line.find('socket.error') != -1 or 100 line.find('socket.timeout') != -1 or 101 line.find('Problem with socket') != -1): 102 socket_fail = True 103 return socket_fail 104 105 106def skip_sensor_fusion(): 107 """Determine if sensor fusion test is skipped for this camera.""" 108 109 skip_code = SKIP_RET_CODE 110 with ItsSession() as cam: 111 props = cam.get_camera_properties() 112 if (its.caps.sensor_fusion(props) and its.caps.manual_sensor(props) and 113 props['android.lens.facing'] is not FACING_EXTERNAL): 114 skip_code = None 115 return skip_code 116 117 118def main(): 119 """Run all the automated tests, saving intermediate files, and producing 120 a summary/report of the results. 121 122 Script should be run from the top-level CameraITS directory. 123 124 Command line arguments: 125 camera: the camera(s) to be tested. Use comma to separate multiple 126 camera Ids. Ex: "camera=0,1" or "camera=1" 127 device: device id for adb 128 scenes: the test scene(s) to be executed. Use comma to separate 129 multiple scenes. Ex: "scenes=scene0,scene1" or 130 "scenes=0,1,sensor_fusion" (sceneX can be abbreviated by X 131 where X is a integer) 132 chart: [Experimental] another android device served as test chart 133 display. When this argument presents, change of test scene 134 will be handled automatically. Note that this argument 135 requires special physical/hardware setup to work and may not 136 work on all android devices. 137 result: Device ID to forward results to (in addition to the device 138 that the tests are running on). 139 rot_rig: [Experimental] ID of the rotation rig being used (formatted as 140 "<vendor ID>:<product ID>:<channel #>" or "default") 141 tmp_dir: location of temp directory for output files 142 skip_scene_validation: force skip scene validation. Used when test scene 143 is setup up front and don't require tester validation. 144 dist: [Experimental] chart distance in cm. 145 """ 146 147 all_scenes = ["scene0", "scene1", "scene2", "scene3", "scene4", "scene5", 148 "sensor_fusion"] 149 150 auto_scenes = ["scene0", "scene1", "scene2", "scene3", "scene4"] 151 152 scene_req = { 153 "scene0": None, 154 "scene1": "A grey card covering at least the middle 30% of the scene", 155 "scene2": "A picture containing human faces", 156 "scene3": "The ISO 12233 chart", 157 "scene4": "A specific test page of a circle covering at least the " 158 "middle 50% of the scene. See CameraITS.pdf section 2.3.4 " 159 "for more details", 160 "scene5": "Capture images with a diffuser attached to the camera. See " 161 "CameraITS.pdf section 2.3.4 for more details", 162 "sensor_fusion": "Rotating checkboard pattern. See " 163 "sensor_fusion/SensorFusion.pdf for detailed " 164 "instructions.\nNote that this test will be skipped " 165 "on devices not supporting REALTIME camera timestamp." 166 } 167 scene_extra_args = { 168 "scene5": ["doAF=False"] 169 } 170 171 camera_ids = [] 172 scenes = [] 173 chart_host_id = None 174 result_device_id = None 175 rot_rig_id = None 176 tmp_dir = None 177 skip_scene_validation = False 178 chart_distance = CHART_DISTANCE 179 chart_height = CHART_HEIGHT 180 181 for s in sys.argv[1:]: 182 if s[:7] == "camera=" and len(s) > 7: 183 camera_ids = s[7:].split(',') 184 elif s[:7] == "scenes=" and len(s) > 7: 185 scenes = s[7:].split(',') 186 elif s[:6] == 'chart=' and len(s) > 6: 187 chart_host_id = s[6:] 188 elif s[:7] == 'result=' and len(s) > 7: 189 result_device_id = s[7:] 190 elif s[:8] == 'rot_rig=' and len(s) > 8: 191 rot_rig_id = s[8:] # valid values: 'default' or '$VID:$PID:$CH' 192 # The default '$VID:$PID:$CH' is '04d8:fc73:1' 193 elif s[:8] == 'tmp_dir=' and len(s) > 8: 194 tmp_dir = s[8:] 195 elif s == 'skip_scene_validation': 196 skip_scene_validation = True 197 elif s[:5] == 'dist=' and len(s) > 5: 198 chart_distance = float(re.sub('cm', '', s[5:])) 199 200 chart_dist_arg = 'dist= ' + str(chart_distance) 201 auto_scene_switch = chart_host_id is not None 202 merge_result_switch = result_device_id is not None 203 204 # Run through all scenes if user does not supply one 205 possible_scenes = auto_scenes if auto_scene_switch else all_scenes 206 if not scenes: 207 scenes = possible_scenes 208 else: 209 # Validate user input scene names 210 valid_scenes = True 211 temp_scenes = [] 212 for s in scenes: 213 if s in possible_scenes: 214 temp_scenes.append(s) 215 else: 216 try: 217 # Try replace "X" to "sceneX" 218 scene_str = "scene" + s 219 if scene_str not in possible_scenes: 220 valid_scenes = False 221 break 222 temp_scenes.append(scene_str) 223 except ValueError: 224 valid_scenes = False 225 break 226 227 if not valid_scenes: 228 print 'Unknown scene specified:', s 229 assert False 230 scenes = temp_scenes 231 232 # Initialize test results 233 results = {} 234 result_key = ItsSession.RESULT_KEY 235 for s in all_scenes: 236 results[s] = {result_key: ItsSession.RESULT_NOT_EXECUTED} 237 238 # Make output directories to hold the generated files. 239 topdir = tempfile.mkdtemp(dir=tmp_dir) 240 subprocess.call(['chmod', 'g+rx', topdir]) 241 print "Saving output files to:", topdir, "\n" 242 243 device_id = its.device.get_device_id() 244 device_id_arg = "device=" + device_id 245 print "Testing device " + device_id 246 247 # Sanity Check for devices 248 device_bfp = its.device.get_device_fingerprint(device_id) 249 assert device_bfp is not None 250 251 if auto_scene_switch: 252 chart_host_bfp = its.device.get_device_fingerprint(chart_host_id) 253 assert chart_host_bfp is not None 254 255 if merge_result_switch: 256 result_device_bfp = its.device.get_device_fingerprint(result_device_id) 257 assert_err_msg = ('Cannot merge result to a different build, from ' 258 '%s to %s' % (device_bfp, result_device_bfp)) 259 assert device_bfp == result_device_bfp, assert_err_msg 260 261 # user doesn't specify camera id, run through all cameras 262 if not camera_ids: 263 with its.device.ItsSession() as cam: 264 camera_ids = cam.get_camera_ids() 265 266 print "Running ITS on camera: %s, scene %s" % (camera_ids, scenes) 267 268 if auto_scene_switch: 269 # merge_result only supports run_parallel_tests 270 if merge_result_switch and camera_ids[0] == '1': 271 print 'Skip chart screen' 272 time.sleep(1) 273 else: 274 print 'Waking up chart screen: ', chart_host_id 275 screen_id_arg = ('screen=%s' % chart_host_id) 276 cmd = ['python', os.path.join(os.environ['CAMERA_ITS_TOP'], 'tools', 277 'wake_up_screen.py'), screen_id_arg] 278 wake_code = subprocess.call(cmd) 279 assert wake_code == 0 280 281 for camera_id in camera_ids: 282 camera_fov = calc_camera_fov() 283 # Loop capturing images until user confirm test scene is correct 284 camera_id_arg = "camera=" + camera_id 285 print "Preparing to run ITS on camera", camera_id 286 287 os.mkdir(os.path.join(topdir, camera_id)) 288 for d in scenes: 289 os.mkdir(os.path.join(topdir, camera_id, d)) 290 291 for scene in scenes: 292 skip_code = None 293 tests = [(s[:-3], os.path.join("tests", scene, s)) 294 for s in os.listdir(os.path.join("tests", scene)) 295 if s[-3:] == ".py" and s[:4] == "test"] 296 tests.sort() 297 298 summary = "Cam" + camera_id + " " + scene + "\n" 299 numpass = 0 300 numskip = 0 301 num_not_mandated_fail = 0 302 numfail = 0 303 validate_switch = True 304 if scene_req[scene] is not None: 305 out_path = os.path.join(topdir, camera_id, scene+".jpg") 306 out_arg = "out=" + out_path 307 if scene == 'sensor_fusion': 308 skip_code = skip_sensor_fusion() 309 if rot_rig_id or skip_code == SKIP_RET_CODE: 310 validate_switch = False 311 if skip_scene_validation: 312 validate_switch = False 313 cmd = None 314 if auto_scene_switch: 315 if (not merge_result_switch or 316 (merge_result_switch and camera_ids[0] == '0')): 317 scene_arg = 'scene=' + scene 318 fov_arg = 'fov=' + camera_fov 319 cmd = ['python', 320 os.path.join(os.getcwd(), 'tools/load_scene.py'), 321 scene_arg, chart_dist_arg, fov_arg, screen_id_arg] 322 else: 323 time.sleep(CHART_DELAY) 324 else: 325 # Skip scene validation under certain conditions 326 if validate_switch and not merge_result_switch: 327 scene_arg = 'scene=' + scene_req[scene] 328 extra_args = scene_extra_args.get(scene, []) 329 cmd = ['python', 330 os.path.join(os.getcwd(), 331 'tools/validate_scene.py'), 332 camera_id_arg, out_arg, 333 scene_arg, device_id_arg] + extra_args 334 if cmd is not None: 335 valid_scene_code = subprocess.call(cmd, cwd=topdir) 336 assert valid_scene_code == 0 337 print "Start running ITS on camera %s, %s" % (camera_id, scene) 338 # Extract chart from scene for scene3 once up front 339 chart_loc_arg = '' 340 if scene == 'scene3': 341 if float(camera_fov) < 90 and np.isclose(chart_distance, 20, 342 rtol=0.1): 343 chart_height *= 0.67 344 chart = its.cv2image.Chart(SCENE3_FILE, chart_height, 345 chart_distance, CHART_SCALE_START, 346 CHART_SCALE_STOP, CHART_SCALE_STEP) 347 chart_loc_arg = 'chart_loc=%.2f,%.2f,%.2f,%.2f,%.3f' % ( 348 chart.xnorm, chart.ynorm, chart.wnorm, chart.hnorm, 349 chart.scale) 350 # Run each test, capturing stdout and stderr. 351 for (testname, testpath) in tests: 352 if auto_scene_switch: 353 if merge_result_switch and camera_ids[0] == '0': 354 # Send an input event to keep the screen not dimmed. 355 # Since we are not using camera of chart screen, FOCUS event 356 # should do nothing but keep the screen from dimming. 357 # The "sleep after x minutes of inactivity" display setting 358 # determines how long this command can keep screen bright. 359 # Setting it to something like 30 minutes should be enough. 360 cmd = ('adb -s %s shell input keyevent FOCUS' 361 % chart_host_id) 362 subprocess.call(cmd.split()) 363 t0 = time.time() 364 for num_try in range(NUM_TRYS): 365 outdir = os.path.join(topdir, camera_id, scene) 366 outpath = os.path.join(outdir, testname+'_stdout.txt') 367 errpath = os.path.join(outdir, testname+'_stderr.txt') 368 if scene == 'sensor_fusion': 369 if skip_code is not SKIP_RET_CODE: 370 if rot_rig_id: 371 print 'Rotating phone w/ rig %s' % rot_rig_id 372 rig = ('python tools/rotation_rig.py rotator=%s' % 373 rot_rig_id) 374 subprocess.Popen(rig.split()) 375 else: 376 print 'Rotate phone 15s as shown in SensorFusion.pdf' 377 else: 378 test_code = skip_code 379 if skip_code is not SKIP_RET_CODE: 380 cmd = ['python', os.path.join(os.getcwd(), testpath)] 381 cmd += sys.argv[1:] + [camera_id_arg] + [chart_loc_arg] 382 cmd += [chart_dist_arg] 383 with open(outpath, 'w') as fout, open(errpath, 'w') as ferr: 384 test_code = subprocess.call( 385 cmd, stderr=ferr, stdout=fout, cwd=outdir) 386 if test_code == 0 or test_code == SKIP_RET_CODE: 387 break 388 else: 389 socket_fail = evaluate_socket_failure(errpath) 390 if socket_fail: 391 if num_try != NUM_TRYS-1: 392 print ' Retry %s/%s' % (scene, testname) 393 else: 394 break 395 else: 396 break 397 t1 = time.time() 398 399 test_failed = False 400 if test_code == 0: 401 retstr = "PASS " 402 numpass += 1 403 elif test_code == SKIP_RET_CODE: 404 retstr = "SKIP " 405 numskip += 1 406 elif test_code != 0 and testname in NOT_YET_MANDATED[scene]: 407 retstr = "FAIL*" 408 num_not_mandated_fail += 1 409 else: 410 retstr = "FAIL " 411 numfail += 1 412 test_failed = True 413 414 msg = "%s %s/%s [%.1fs]" % (retstr, scene, testname, t1-t0) 415 print msg 416 its.device.adb_log(device_id, msg) 417 msg_short = "%s %s [%.1fs]" % (retstr, testname, t1-t0) 418 if test_failed: 419 summary += msg_short + "\n" 420 421 if numskip > 0: 422 skipstr = ", %d test%s skipped" % ( 423 numskip, "s" if numskip > 1 else "") 424 else: 425 skipstr = "" 426 427 test_result = "\n%d / %d tests passed (%.1f%%)%s" % ( 428 numpass + num_not_mandated_fail, len(tests) - numskip, 429 100.0 * float(numpass + num_not_mandated_fail) / 430 (len(tests) - numskip) 431 if len(tests) != numskip else 100.0, skipstr) 432 print test_result 433 434 if num_not_mandated_fail > 0: 435 msg = "(*) tests are not yet mandated" 436 print msg 437 438 summary_path = os.path.join(topdir, camera_id, scene, "summary.txt") 439 with open(summary_path, "w") as f: 440 f.write(summary) 441 442 passed = numfail == 0 443 results[scene][result_key] = (ItsSession.RESULT_PASS if passed 444 else ItsSession.RESULT_FAIL) 445 results[scene][ItsSession.SUMMARY_KEY] = summary_path 446 447 msg = "Reporting ITS result to CtsVerifier" 448 print msg 449 its.device.adb_log(device_id, msg) 450 if merge_result_switch: 451 # results are modified by report_result 452 results_backup = copy.deepcopy(results) 453 its.device.report_result(result_device_id, camera_id, results_backup) 454 455 its.device.report_result(device_id, camera_id, results) 456 457 if auto_scene_switch: 458 if merge_result_switch: 459 print 'Skip shutting down chart screen' 460 else: 461 print 'Shutting down chart screen: ', chart_host_id 462 screen_id_arg = ('screen=%s' % chart_host_id) 463 cmd = ['python', os.path.join(os.environ['CAMERA_ITS_TOP'], 'tools', 464 'turn_off_screen.py'), screen_id_arg] 465 screen_off_code = subprocess.call(cmd) 466 assert screen_off_code == 0 467 468 print 'Shutting down DUT screen: ', device_id 469 screen_id_arg = ('screen=%s' % device_id) 470 cmd = ['python', os.path.join(os.environ['CAMERA_ITS_TOP'], 'tools', 471 'turn_off_screen.py'), screen_id_arg] 472 screen_off_code = subprocess.call(cmd) 473 assert screen_off_code == 0 474 475 print "ITS tests finished. Please go back to CtsVerifier and proceed" 476 477if __name__ == '__main__': 478 main() 479