# Copyright 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import os.path
import re
import subprocess
import sys
import tempfile
import time

import its.device
import numpy

SCENE_NAME = 'sensor_fusion'
SKIP_RET_CODE = 101
TEST_NAME = 'test_sensor_fusion'
TEST_DIR = os.path.join(os.getcwd(), 'tests', SCENE_NAME)
W, H = 640, 480

# For finding best correlation shifts from test output logs.
SHIFT_RE = re.compile('^Best correlation of [0-9.]+ at shift of [-0-9.]+ms$')
# For finding lines that indicate socket issues in failed test runs.
SOCKET_FAIL_RE = re.compile(
    r'.*((socket\.(error|timeout))|(Problem with socket)).*')

FPS = 30
TEST_LENGTH = 7  # seconds


def main():
    """Run test_sensor_fusion NUM_RUNS times.

    Save intermediate files and produce a summary/report of the results.

    Script should be run from the top-level CameraITS directory.

    Command line arguments (each given as 'name=value'; arguments with an
    empty value or an unknown name are ignored):
        camera:      Camera to be tested; must be one of the ids reported
                     by the device. Ex: 'camera=1'
        device:      Device id for adb
        fps:         FPS to capture with during the test
        img_size:    Comma-separated dimensions of captured images (defaults
                     to 640x480). Ex: 'img_size=<width>,<height>'
        num_runs:    Number of times to repeat the test
        rotator:     String for rotator id, in the form vid:pid:ch
        test_length: How long the test should run for (in seconds)
        tmp_dir:     Location of temp directory for output files
    """
    camera_id = '0'
    fps = str(FPS)
    img_size = '%s,%s' % (W, H)
    num_runs = 1
    rotator_ids = 'default'
    test_length = str(TEST_LENGTH)
    tmp_dir = None
    for arg in sys.argv[1:]:
        # Split on the first '=' only, so values may themselves contain '='.
        key, sep, value = arg.partition('=')
        if not sep or not value:
            continue
        if key == 'camera':
            camera_id = value
        elif key == 'fps':
            fps = value
        elif key == 'img_size':
            img_size = value
        elif key == 'num_runs':
            num_runs = int(value)
        elif key == 'rotator':
            rotator_ids = value
        elif key == 'test_length':
            test_length = value
        elif key == 'tmp_dir':
            tmp_dir = value

    # Make output directories to hold the generated files.
    tmpdir = tempfile.mkdtemp(dir=tmp_dir)
    print('Saving output files to: %s \n' % tmpdir)

    device_id = its.device.get_device_id()
    device_id_arg = 'device=' + device_id
    print('Testing device ' + device_id)

    # ensure camera_id is valid
    avail_camera_ids = find_avail_camera_ids(device_id_arg, tmpdir)
    if camera_id not in avail_camera_ids:
        print('Need to specify valid camera_id in  %s' % (avail_camera_ids,))
        sys.exit()

    camera_id_arg = 'camera=' + camera_id
    # rotator_ids is never empty here (empty values are skipped while
    # parsing), so the argument is always built; this also guarantees the
    # name is bound when the test command is assembled below.
    rotator_id_arg = 'rotator=' + rotator_ids
    print('Preparing to run sensor_fusion on camera %s' % camera_id)

    img_size_arg = 'img_size=' + img_size
    print('Image dimensions are ' + 'x'.join(img_size.split(',')))

    fps_arg = 'fps=' + fps
    test_length_arg = 'test_length=' + test_length
    print('Capturing at %sfps' % fps)

    camera_dir = os.path.join(tmpdir, camera_id)
    os.mkdir(camera_dir)

    # Run test "num_runs" times, capturing stdout and stderr.
    num_pass = 0
    num_skip = 0
    num_socket_fails = 0
    num_non_socket_fails = 0
    shift_list = []
    test_cmd = ['python', os.path.join(TEST_DIR, TEST_NAME+'.py'),
                device_id_arg, camera_id_arg, rotator_id_arg, img_size_arg,
                fps_arg, test_length_arg]
    # Passed as a list so a rotator id containing whitespace stays one arg.
    rig_cmd = ['python', 'tools/rotation_rig.py', rotator_id_arg]
    for i in range(num_runs):
        outdir = os.path.join(camera_dir, SCENE_NAME+'_'+str(i))
        os.mkdir(outdir)
        # The rotation rig is launched without waiting on it, so it runs
        # while the test below executes.
        subprocess.Popen(rig_cmd)
        outpath = os.path.join(outdir, TEST_NAME+'_stdout.txt')
        errpath = os.path.join(outdir, TEST_NAME+'_stderr.txt')
        t0 = time.time()
        with open(outpath, 'w') as fout, open(errpath, 'w') as ferr:
            retcode = subprocess.call(
                test_cmd, stderr=ferr, stdout=fout, cwd=outdir)
        t1 = time.time()

        time_shift = None
        if retcode == 0:
            retstr = 'PASS '
            num_pass += 1
            time_shift = find_time_shift(outpath)
        elif retcode == SKIP_RET_CODE:
            retstr = 'SKIP '
            num_skip += 1
        else:
            retstr = 'FAIL '
            time_shift = find_time_shift(outpath)
            if time_shift is None:
                # No shift was logged: classify the failure from stderr.
                if is_socket_fail(errpath):
                    num_socket_fails += 1
                else:
                    num_non_socket_fails += 1
        # Only real measurements go into the statistics; a run that logged
        # no shift must not contribute a None entry.
        if time_shift is not None:
            shift_list.append(time_shift)
        print('%s %s/%s [%.1fs]' % (retstr, SCENE_NAME, TEST_NAME, t1-t0))

    # Report on the shifts actually collected (which may include failed runs
    # that still logged a shift), not on the number of passing runs.
    if len(shift_list) == 1:
        print('Best shift is %sms' % shift_list[0])
    elif len(shift_list) > 1:
        shift_arr = numpy.array(shift_list)
        mean, std = numpy.mean(shift_arr), numpy.std(shift_arr)
        print('Best shift mean is %sms with std. dev. of %sms' % (mean, std))

    # Skipped runs count as passed. Guard against num_runs=0.
    num_ok = num_pass + num_skip
    pass_percentage = 100*float(num_ok)/num_runs if num_runs > 0 else 0.0
    print('%d / %d tests passed (%.1f%%)' % (num_ok, num_runs,
                                             pass_percentage))

    if num_socket_fails != 0:
        print('%s failure(s) due to socket issues' % num_socket_fails)
    if num_non_socket_fails != 0:
        print('%s non-socket failure(s)' % num_non_socket_fails)


def is_socket_fail(err_file_path):
    """Search through a test run's stderr log for any mention of socket issues.

    Args:
        err_file_path: File path for stderr logs to search through

    Returns:
        True if some line of the log matches SOCKET_FAIL_RE. Otherwise, False.
    """
    return find_matching_line(err_file_path, SOCKET_FAIL_RE) is not None


def find_time_shift(out_file_path):
    """Search through a test run's stdout log for the best time shift.

    Args:
        out_file_path: File path for stdout logs to search through

    Returns:
        The best time shift (in ms, as a float), if one is found. Otherwise,
        returns None.
    """
    line = find_matching_line(out_file_path, SHIFT_RE)
    if line is None:
        return None
    # SHIFT_RE guarantees the last word is '<number>ms'. split() with no
    # argument drops the line terminator (if any), so only 'ms' is removed
    # whether or not the log's final line ends in a newline.
    last_word = line.split()[-1]
    return float(last_word[:-len('ms')])


def find_matching_line(file_path, regex):
    """Search each line in the file at 'file_path' for a line matching 'regex'.

    Args:
        file_path: File path for file being searched
        regex: Compiled regex; applied with match(), i.e. anchored at the
               start of each line

    Returns:
        The first matching line (including its line terminator, if any). If
        none exists, returns None.
    """
    with open(file_path) as f:
        return next((line for line in f if regex.match(line)), None)


def find_avail_camera_ids(device_id_arg, tmpdir):
    """Find the available camera IDs.

    Args:
        device_id_arg(str): device=### (currently unused)
        tmpdir(str): generated tmp dir for run (currently unused)
    Returns:
        The camera ids reported by ItsSession.get_camera_ids()
    """
    with its.device.ItsSession() as cam:
        avail_camera_ids = cam.get_camera_ids()
    return avail_camera_ids


if __name__ == '__main__':
    main()