1# Copyright 2014 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import copy
16import math
17import os
18import os.path
19import re
20import subprocess
21import sys
22import tempfile
23import time
24
25import its.caps
26import its.cv2image
27import its.device
28from its.device import ItsSession
29import its.image
30
31import numpy as np
32
# Sleep used by main() when auto scene switching is on but this run does not
# load the scene onto the chart device itself.
CHART_DELAY = 1  # seconds
# Default chart distance; can be overridden with the "dist=" argument.
CHART_DISTANCE = 30.0  # cm
# Nominal chart height handed to its.cv2image.Chart for scene3.
CHART_HEIGHT = 13.5  # cm
# Start/stop/step values handed to its.cv2image.Chart for scene3
# (presumably the range of chart scales it searches -- confirm in cv2image).
CHART_SCALE_START = 0.65
CHART_SCALE_STOP = 1.35
CHART_SCALE_STEP = 0.025
# Value of props['android.lens.facing'] that makes sensor_fusion be skipped.
FACING_EXTERNAL = 2
# Maximum attempts per test; a retry only happens after a socket failure.
NUM_TRYS = 2
# Chart image used for scene3. NOTE: evaluated at import time, so the
# CAMERA_ITS_TOP environment variable must be set or the import raises KeyError.
SCENE3_FILE = os.path.join(os.environ["CAMERA_ITS_TOP"], "pymodules", "its",
                           "test_images", "ISO12233.png")
SKIP_RET_CODE = 101  # note this must be same as tests/scene*/test_*
# Not referenced in the visible part of this file.
VGA_HEIGHT = 480
VGA_WIDTH = 640
46
# Not yet mandated tests, keyed by scene name.
# A failure of a test listed here is reported by main() as "FAIL*" and counted
# separately, so it does not make the scene's overall result FAIL.
NOT_YET_MANDATED = {
        "scene0": [
                "test_jitter",
                "test_burst_capture",
                "test_test_patterns"
        ],
        "scene1": [
                "test_ae_af",
                "test_ae_precapture_trigger",
                "test_crop_region_raw",
                "test_ev_compensation_advanced",
                "test_ev_compensation_basic",
                "test_yuv_plus_jpeg"
        ],
        "scene2": [
                "test_num_faces"
        ],
        "scene3": [
                "test_flip_mirror",
                "test_lens_movement_reporting",
                "test_lens_position"
        ],
        "scene4": [
                "test_multi_camera_alignment"
        ],
        "scene5": [],
        "sensor_fusion": []
}
76
77
78def calc_camera_fov():
79    """Determine the camera field of view from internal params."""
80    with ItsSession() as cam:
81        props = cam.get_camera_properties()
82    try:
83        focal_l = props['android.lens.info.availableFocalLengths'][0]
84        sensor_size = props['android.sensor.info.physicalSize']
85        diag = math.sqrt(sensor_size['height'] ** 2 +
86                         sensor_size['width'] ** 2)
87        fov = str(round(2 * math.degrees(math.atan(diag / (2 * focal_l))), 2))
88    except ValueError:
89        fov = str(0)
90    print 'Calculated FoV: %s' % fov
91    return fov
92
93
def evaluate_socket_failure(err_file_path):
    """Report whether a captured stderr log shows a socket-related failure.

    Args:
        err_file_path: path of the stderr capture file to scan.

    Returns:
        True if any line contains one of the socket failure markers,
        False otherwise.
    """
    markers = ('socket.error', 'socket.timeout', 'Problem with socket')
    with open(err_file_path, 'r') as err_log:
        return any(marker in text for text in err_log for marker in markers)
104
105
def skip_sensor_fusion():
    """Determine if sensor fusion test is skipped for this camera.

    The test is run only when its.caps reports both sensor_fusion and
    manual_sensor for the camera properties and the lens facing is not
    FACING_EXTERNAL.

    Returns:
        None if the sensor fusion test should run, SKIP_RET_CODE otherwise.
    """

    skip_code = SKIP_RET_CODE
    with ItsSession() as cam:
        props = cam.get_camera_properties()
        # Compare by value: identity ("is not") on integers only works by
        # accident of the interpreter's small-int caching.
        if (its.caps.sensor_fusion(props) and its.caps.manual_sensor(props) and
                props['android.lens.facing'] != FACING_EXTERNAL):
            skip_code = None
    return skip_code
116
117
118def main():
119    """Run all the automated tests, saving intermediate files, and producing
120    a summary/report of the results.
121
122    Script should be run from the top-level CameraITS directory.
123
124    Command line arguments:
125        camera:  the camera(s) to be tested. Use comma to separate multiple
126                 camera Ids. Ex: "camera=0,1" or "camera=1"
127        device:  device id for adb
128        scenes:  the test scene(s) to be executed. Use comma to separate
129                 multiple scenes. Ex: "scenes=scene0,scene1" or
130                 "scenes=0,1,sensor_fusion" (sceneX can be abbreviated by X
131                 where X is a integer)
132        chart:   [Experimental] another android device served as test chart
133                 display. When this argument presents, change of test scene
134                 will be handled automatically. Note that this argument
135                 requires special physical/hardware setup to work and may not
136                 work on all android devices.
137        result:  Device ID to forward results to (in addition to the device
138                 that the tests are running on).
139        rot_rig: [Experimental] ID of the rotation rig being used (formatted as
140                 "<vendor ID>:<product ID>:<channel #>" or "default")
141        tmp_dir: location of temp directory for output files
142        skip_scene_validation: force skip scene validation. Used when test scene
143                 is setup up front and don't require tester validation.
144        dist:    [Experimental] chart distance in cm.
145    """
146
147    all_scenes = ["scene0", "scene1", "scene2", "scene3", "scene4", "scene5",
148                  "sensor_fusion"]
149
150    auto_scenes = ["scene0", "scene1", "scene2", "scene3", "scene4"]
151
152    scene_req = {
153        "scene0": None,
154        "scene1": "A grey card covering at least the middle 30% of the scene",
155        "scene2": "A picture containing human faces",
156        "scene3": "The ISO 12233 chart",
157        "scene4": "A specific test page of a circle covering at least the "
158                  "middle 50% of the scene. See CameraITS.pdf section 2.3.4 "
159                  "for more details",
160        "scene5": "Capture images with a diffuser attached to the camera. See "
161                  "CameraITS.pdf section 2.3.4 for more details",
162        "sensor_fusion": "Rotating checkboard pattern. See "
163                         "sensor_fusion/SensorFusion.pdf for detailed "
164                         "instructions.\nNote that this test will be skipped "
165                         "on devices not supporting REALTIME camera timestamp."
166    }
167    scene_extra_args = {
168        "scene5": ["doAF=False"]
169    }
170
171    camera_ids = []
172    scenes = []
173    chart_host_id = None
174    result_device_id = None
175    rot_rig_id = None
176    tmp_dir = None
177    skip_scene_validation = False
178    chart_distance = CHART_DISTANCE
179    chart_height = CHART_HEIGHT
180
181    for s in sys.argv[1:]:
182        if s[:7] == "camera=" and len(s) > 7:
183            camera_ids = s[7:].split(',')
184        elif s[:7] == "scenes=" and len(s) > 7:
185            scenes = s[7:].split(',')
186        elif s[:6] == 'chart=' and len(s) > 6:
187            chart_host_id = s[6:]
188        elif s[:7] == 'result=' and len(s) > 7:
189            result_device_id = s[7:]
190        elif s[:8] == 'rot_rig=' and len(s) > 8:
191            rot_rig_id = s[8:]  # valid values: 'default' or '$VID:$PID:$CH'
192            # The default '$VID:$PID:$CH' is '04d8:fc73:1'
193        elif s[:8] == 'tmp_dir=' and len(s) > 8:
194            tmp_dir = s[8:]
195        elif s == 'skip_scene_validation':
196            skip_scene_validation = True
197        elif s[:5] == 'dist=' and len(s) > 5:
198            chart_distance = float(re.sub('cm', '', s[5:]))
199
200    chart_dist_arg = 'dist= ' + str(chart_distance)
201    auto_scene_switch = chart_host_id is not None
202    merge_result_switch = result_device_id is not None
203
204    # Run through all scenes if user does not supply one
205    possible_scenes = auto_scenes if auto_scene_switch else all_scenes
206    if not scenes:
207        scenes = possible_scenes
208    else:
209        # Validate user input scene names
210        valid_scenes = True
211        temp_scenes = []
212        for s in scenes:
213            if s in possible_scenes:
214                temp_scenes.append(s)
215            else:
216                try:
217                    # Try replace "X" to "sceneX"
218                    scene_str = "scene" + s
219                    if scene_str not in possible_scenes:
220                        valid_scenes = False
221                        break
222                    temp_scenes.append(scene_str)
223                except ValueError:
224                    valid_scenes = False
225                    break
226
227        if not valid_scenes:
228            print 'Unknown scene specified:', s
229            assert False
230        scenes = temp_scenes
231
232    # Initialize test results
233    results = {}
234    result_key = ItsSession.RESULT_KEY
235    for s in all_scenes:
236        results[s] = {result_key: ItsSession.RESULT_NOT_EXECUTED}
237
238    # Make output directories to hold the generated files.
239    topdir = tempfile.mkdtemp(dir=tmp_dir)
240    subprocess.call(['chmod', 'g+rx', topdir])
241    print "Saving output files to:", topdir, "\n"
242
243    device_id = its.device.get_device_id()
244    device_id_arg = "device=" + device_id
245    print "Testing device " + device_id
246
247    # Sanity Check for devices
248    device_bfp = its.device.get_device_fingerprint(device_id)
249    assert device_bfp is not None
250
251    if auto_scene_switch:
252        chart_host_bfp = its.device.get_device_fingerprint(chart_host_id)
253        assert chart_host_bfp is not None
254
255    if merge_result_switch:
256        result_device_bfp = its.device.get_device_fingerprint(result_device_id)
257        assert_err_msg = ('Cannot merge result to a different build, from '
258                          '%s to %s' % (device_bfp, result_device_bfp))
259        assert device_bfp == result_device_bfp, assert_err_msg
260
261    # user doesn't specify camera id, run through all cameras
262    if not camera_ids:
263        with its.device.ItsSession() as cam:
264            camera_ids = cam.get_camera_ids()
265
266    print "Running ITS on camera: %s, scene %s" % (camera_ids, scenes)
267
268    if auto_scene_switch:
269        # merge_result only supports run_parallel_tests
270        if merge_result_switch and camera_ids[0] == '1':
271            print 'Skip chart screen'
272            time.sleep(1)
273        else:
274            print 'Waking up chart screen: ', chart_host_id
275            screen_id_arg = ('screen=%s' % chart_host_id)
276            cmd = ['python', os.path.join(os.environ['CAMERA_ITS_TOP'], 'tools',
277                                          'wake_up_screen.py'), screen_id_arg]
278            wake_code = subprocess.call(cmd)
279            assert wake_code == 0
280
281    for camera_id in camera_ids:
282        camera_fov = calc_camera_fov()
283        # Loop capturing images until user confirm test scene is correct
284        camera_id_arg = "camera=" + camera_id
285        print "Preparing to run ITS on camera", camera_id
286
287        os.mkdir(os.path.join(topdir, camera_id))
288        for d in scenes:
289            os.mkdir(os.path.join(topdir, camera_id, d))
290
291        for scene in scenes:
292            skip_code = None
293            tests = [(s[:-3], os.path.join("tests", scene, s))
294                     for s in os.listdir(os.path.join("tests", scene))
295                     if s[-3:] == ".py" and s[:4] == "test"]
296            tests.sort()
297
298            summary = "Cam" + camera_id + " " + scene + "\n"
299            numpass = 0
300            numskip = 0
301            num_not_mandated_fail = 0
302            numfail = 0
303            validate_switch = True
304            if scene_req[scene] is not None:
305                out_path = os.path.join(topdir, camera_id, scene+".jpg")
306                out_arg = "out=" + out_path
307                if scene == 'sensor_fusion':
308                    skip_code = skip_sensor_fusion()
309                    if rot_rig_id or skip_code == SKIP_RET_CODE:
310                        validate_switch = False
311                if skip_scene_validation:
312                    validate_switch = False
313                cmd = None
314                if auto_scene_switch:
315                    if (not merge_result_switch or
316                            (merge_result_switch and camera_ids[0] == '0')):
317                        scene_arg = 'scene=' + scene
318                        fov_arg = 'fov=' + camera_fov
319                        cmd = ['python',
320                               os.path.join(os.getcwd(), 'tools/load_scene.py'),
321                               scene_arg, chart_dist_arg, fov_arg, screen_id_arg]
322                    else:
323                        time.sleep(CHART_DELAY)
324                else:
325                    # Skip scene validation under certain conditions
326                    if validate_switch and not merge_result_switch:
327                        scene_arg = 'scene=' + scene_req[scene]
328                        extra_args = scene_extra_args.get(scene, [])
329                        cmd = ['python',
330                               os.path.join(os.getcwd(),
331                                            'tools/validate_scene.py'),
332                               camera_id_arg, out_arg,
333                               scene_arg, device_id_arg] + extra_args
334                if cmd is not None:
335                    valid_scene_code = subprocess.call(cmd, cwd=topdir)
336                    assert valid_scene_code == 0
337            print "Start running ITS on camera %s, %s" % (camera_id, scene)
338            # Extract chart from scene for scene3 once up front
339            chart_loc_arg = ''
340            if scene == 'scene3':
341                if float(camera_fov) < 90 and np.isclose(chart_distance, 20,
342                                                         rtol=0.1):
343                    chart_height *= 0.67
344                chart = its.cv2image.Chart(SCENE3_FILE, chart_height,
345                                           chart_distance, CHART_SCALE_START,
346                                           CHART_SCALE_STOP, CHART_SCALE_STEP)
347                chart_loc_arg = 'chart_loc=%.2f,%.2f,%.2f,%.2f,%.3f' % (
348                        chart.xnorm, chart.ynorm, chart.wnorm, chart.hnorm,
349                        chart.scale)
350            # Run each test, capturing stdout and stderr.
351            for (testname, testpath) in tests:
352                if auto_scene_switch:
353                    if merge_result_switch and camera_ids[0] == '0':
354                        # Send an input event to keep the screen not dimmed.
355                        # Since we are not using camera of chart screen, FOCUS event
356                        # should do nothing but keep the screen from dimming.
357                        # The "sleep after x minutes of inactivity" display setting
358                        # determines how long this command can keep screen bright.
359                        # Setting it to something like 30 minutes should be enough.
360                        cmd = ('adb -s %s shell input keyevent FOCUS'
361                               % chart_host_id)
362                        subprocess.call(cmd.split())
363                t0 = time.time()
364                for num_try in range(NUM_TRYS):
365                    outdir = os.path.join(topdir, camera_id, scene)
366                    outpath = os.path.join(outdir, testname+'_stdout.txt')
367                    errpath = os.path.join(outdir, testname+'_stderr.txt')
368                    if scene == 'sensor_fusion':
369                        if skip_code is not SKIP_RET_CODE:
370                            if rot_rig_id:
371                                print 'Rotating phone w/ rig %s' % rot_rig_id
372                                rig = ('python tools/rotation_rig.py rotator=%s' %
373                                       rot_rig_id)
374                                subprocess.Popen(rig.split())
375                            else:
376                                print 'Rotate phone 15s as shown in SensorFusion.pdf'
377                        else:
378                            test_code = skip_code
379                    if skip_code is not SKIP_RET_CODE:
380                        cmd = ['python', os.path.join(os.getcwd(), testpath)]
381                        cmd += sys.argv[1:] + [camera_id_arg] + [chart_loc_arg]
382                        cmd += [chart_dist_arg]
383                        with open(outpath, 'w') as fout, open(errpath, 'w') as ferr:
384                            test_code = subprocess.call(
385                                cmd, stderr=ferr, stdout=fout, cwd=outdir)
386                    if test_code == 0 or test_code == SKIP_RET_CODE:
387                        break
388                    else:
389                        socket_fail = evaluate_socket_failure(errpath)
390                        if socket_fail:
391                            if num_try != NUM_TRYS-1:
392                                print ' Retry %s/%s' % (scene, testname)
393                            else:
394                                break
395                        else:
396                            break
397                t1 = time.time()
398
399                test_failed = False
400                if test_code == 0:
401                    retstr = "PASS "
402                    numpass += 1
403                elif test_code == SKIP_RET_CODE:
404                    retstr = "SKIP "
405                    numskip += 1
406                elif test_code != 0 and testname in NOT_YET_MANDATED[scene]:
407                    retstr = "FAIL*"
408                    num_not_mandated_fail += 1
409                else:
410                    retstr = "FAIL "
411                    numfail += 1
412                    test_failed = True
413
414                msg = "%s %s/%s [%.1fs]" % (retstr, scene, testname, t1-t0)
415                print msg
416                its.device.adb_log(device_id, msg)
417                msg_short = "%s %s [%.1fs]" % (retstr, testname, t1-t0)
418                if test_failed:
419                    summary += msg_short + "\n"
420
421            if numskip > 0:
422                skipstr = ", %d test%s skipped" % (
423                    numskip, "s" if numskip > 1 else "")
424            else:
425                skipstr = ""
426
427            test_result = "\n%d / %d tests passed (%.1f%%)%s" % (
428                numpass + num_not_mandated_fail, len(tests) - numskip,
429                100.0 * float(numpass + num_not_mandated_fail) /
430                (len(tests) - numskip)
431                if len(tests) != numskip else 100.0, skipstr)
432            print test_result
433
434            if num_not_mandated_fail > 0:
435                msg = "(*) tests are not yet mandated"
436                print msg
437
438            summary_path = os.path.join(topdir, camera_id, scene, "summary.txt")
439            with open(summary_path, "w") as f:
440                f.write(summary)
441
442            passed = numfail == 0
443            results[scene][result_key] = (ItsSession.RESULT_PASS if passed
444                                          else ItsSession.RESULT_FAIL)
445            results[scene][ItsSession.SUMMARY_KEY] = summary_path
446
447        msg = "Reporting ITS result to CtsVerifier"
448        print msg
449        its.device.adb_log(device_id, msg)
450        if merge_result_switch:
451            # results are modified by report_result
452            results_backup = copy.deepcopy(results)
453            its.device.report_result(result_device_id, camera_id, results_backup)
454
455        its.device.report_result(device_id, camera_id, results)
456
457    if auto_scene_switch:
458        if merge_result_switch:
459            print 'Skip shutting down chart screen'
460        else:
461            print 'Shutting down chart screen: ', chart_host_id
462            screen_id_arg = ('screen=%s' % chart_host_id)
463            cmd = ['python', os.path.join(os.environ['CAMERA_ITS_TOP'], 'tools',
464                                          'turn_off_screen.py'), screen_id_arg]
465            screen_off_code = subprocess.call(cmd)
466            assert screen_off_code == 0
467
468            print 'Shutting down DUT screen: ', device_id
469            screen_id_arg = ('screen=%s' % device_id)
470            cmd = ['python', os.path.join(os.environ['CAMERA_ITS_TOP'], 'tools',
471                                          'turn_off_screen.py'), screen_id_arg]
472            screen_off_code = subprocess.call(cmd)
473            assert screen_off_code == 0
474
475    print "ITS tests finished. Please go back to CtsVerifier and proceed"
476
# Run the full test suite only when executed as a script, not on import.
if __name__ == '__main__':
    main()
479