# Copyright 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility functions for verifying preview stabilization.
"""

import fnmatch
import logging
import os
import threading
import time

import cv2
import numpy as np

import its_session_utils
import image_processing_utils
import sensor_fusion_utils
import video_processing_utils

_AREA_720P_VIDEO = 1280 * 720
_ASPECT_RATIO_16_9 = 16/9  # determine if preview fmt > 16:9
_ASPECT_TOL = 0.01
_GREEN_TOL = 235
_RED_BLUE_TOL = 15
_HIGH_RES_SIZE = '3840x2160'  # Resolution for 4K quality
_IMG_FORMAT = 'png'
_MIN_PHONE_MOVEMENT_ANGLE = 5  # degrees
_NATURAL_ORIENTATION_PORTRAIT = (90, 270)  # orientation in "normal position"
_NUM_ROTATIONS = 24
_PREVIEW_MAX_TESTED_AREA = 1920 * 1440
_PREVIEW_MIN_TESTED_AREA = 320 * 240
_PREVIEW_STABILIZATION_FACTOR = 0.7  # 70% of gyro movement allowed
_SKIP_INITIAL_FRAMES = 15
_START_FRAME = 30  # give 3A some frames to warm up
_VIDEO_DELAY_TIME = 5.5  # seconds
_VIDEO_DURATION = 5.5  # seconds
_PREVIEW_DURATION = 400  # milliseconds


def _size_to_area(size_str):
  """Returns width * height for a 'WIDTHxHEIGHT' resolution string."""
  width, height = size_str.split('x')[:2]
  return int(width) * int(height)


def get_720p_or_above_size(supported_preview_sizes):
  """Returns the smallest size above or equal to 720p in preview and video.

  If the largest preview size is under 720P, returns the largest value.

  Args:
    supported_preview_sizes: list; preview sizes.
      e.g. ['1920x960', '1600x1200', '1920x1080']
  Returns:
    smallest size >= 720p video format, or the largest supported size when
    no size reaches 720p. Empty string if the list is empty.
  """
  smallest_area = float('inf')
  smallest_720p_or_above_size = ''
  largest_area = 0
  largest_supported_preview_size = ''
  for size in supported_preview_sizes:
    area = _size_to_area(size)
    if smallest_area > area >= _AREA_720P_VIDEO:
      smallest_area = area
      smallest_720p_or_above_size = size
    # Track the largest size independently of the 720p candidate so the
    # fallback is correct even when a size was also recorded as the candidate.
    if area > largest_area:
      largest_area = area
      largest_supported_preview_size = size

  if smallest_720p_or_above_size:
    logging.debug('Smallest 720p or above size: %s',
                  smallest_720p_or_above_size)
    return smallest_720p_or_above_size

  logging.debug('Largest supported preview size: %s',
                largest_supported_preview_size)
  return largest_supported_preview_size


def collect_data(cam, tablet_device, preview_size, stabilize, rot_rig,
                 zoom_ratio=None, fps_range=None, hlg10=False, ois=False):
  """Capture a new set of data from the device.

  Captures camera preview frames while the user is moving the device in
  the prescribed manner.

  Args:
    cam: camera object.
    tablet_device: boolean; based on config file.
    preview_size: str; preview stream resolution. ex. '1920x1080'
    stabilize: boolean; whether preview stabilization is ON.
    rot_rig: dict with 'cntl' and 'ch' defined.
    zoom_ratio: float; static zoom ratio. None if default zoom.
    fps_range: list; target fps range.
    hlg10: boolean; whether to capture hlg10 output.
    ois: boolean; whether optical image stabilization is ON.
  Returns:
    recording object; a dictionary containing output path, video size, etc.
  """
  output_surfaces = cam.preview_surface(preview_size, hlg10)
  return collect_data_with_surfaces(cam, tablet_device, output_surfaces,
                                    stabilize, rot_rig, zoom_ratio,
                                    fps_range, ois)


def collect_data_with_surfaces(cam, tablet_device, output_surfaces,
                               stabilize, rot_rig, zoom_ratio=None,
                               fps_range=None, ois=False):
  """Capture a new set of data from the device.

  Captures camera preview frames while the user is moving the device in
  the prescribed manner.

  Args:
    cam: camera object.
    tablet_device: boolean; based on config file.
    output_surfaces: list of dict; The list of output surfaces configured for
      the recording. Only the first surface is used for recording; the rest are
      configured, but not requested.
    stabilize: boolean; whether preview stabilization is ON.
    rot_rig: dict with 'cntl' and 'ch' defined.
    zoom_ratio: float; static zoom ratio. None if default zoom.
    fps_range: list; target fps range.
    ois: boolean; whether optical image stabilization is ON.
  Returns:
    recording object; a dictionary containing output path, video size, etc.
  """
  logging.debug('Starting sensor event collection')
  serial_port = None
  if rot_rig['cntl'].lower() == sensor_fusion_utils.ARDUINO_STRING.lower():
    # identify port
    serial_port = sensor_fusion_utils.serial_port_def(
        sensor_fusion_utils.ARDUINO_STRING)
    # send test cmd to Arduino until cmd returns properly
    sensor_fusion_utils.establish_serial_comm(serial_port)

  # Start camera vibration
  if tablet_device:
    servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION_TABLET
  else:
    servo_speed = sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION
  p = threading.Thread(
      target=sensor_fusion_utils.rotation_rig,
      args=(
          rot_rig['cntl'],
          rot_rig['ch'],
          _NUM_ROTATIONS,
          sensor_fusion_utils.ARDUINO_ANGLES_STABILIZATION,
          servo_speed,
          sensor_fusion_utils.ARDUINO_MOVE_TIME_STABILIZATION,
          serial_port,
      ),
  )
  p.start()

  try:
    cam.start_sensor_events()
    # Allow time for rig to start moving
    time.sleep(_VIDEO_DELAY_TIME)

    # Record video and return recording object
    min_fps = fps_range[0] if (fps_range is not None) else None
    max_fps = fps_range[1] if (fps_range is not None) else None
    recording_obj = cam.do_preview_recording_multiple_surfaces(
        output_surfaces, _VIDEO_DURATION, stabilize, ois,
        zoom_ratio=zoom_ratio,
        ae_target_fps_min=min_fps, ae_target_fps_max=max_fps)

    logging.debug('Recorded output path: %s',
                  recording_obj['recordedOutputPath'])
    logging.debug('Tested quality: %s', recording_obj['quality'])
  finally:
    # Wait for vibration to stop, even if the recording failed, so the rig
    # thread is never left running behind a propagating exception.
    p.join()

  return recording_obj


def verify_preview_stabilization(recording_obj, gyro_events,
                                 test_name, log_path, facing, zoom_ratio=None):
  """Verify the returned recording is properly stabilized.

  Args:
    recording_obj: Camcorder recording object.
    gyro_events: Gyroscope events collected while recording.
    test_name: Name of the test.
    log_path: Path for the log file.
    facing: Facing of the camera device.
    zoom_ratio: Static zoom ratio. None if default zoom.

  Returns:
    A dictionary containing the maximum gyro angle, the maximum camera angle,
    and a failure message if the recorded video isn't properly stabilized.

  Raises:
    AssertionError: if the maximum gyro angle is below
      _MIN_PHONE_MOVEMENT_ANGLE.
  """
  file_name = recording_obj['recordedOutputPath'].split('/')[-1]
  logging.debug('recorded file name: %s', file_name)
  video_size = recording_obj['videoSize']
  logging.debug('video size: %s', video_size)

  # Get all frames from the video
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, file_name, _IMG_FORMAT
  )
  logging.debug('Number of frames %d', len(file_list))
  frames = []
  for frame_file in file_list:
    img = image_processing_utils.convert_image_to_numpy_array(
        os.path.join(log_path, frame_file)
    )
    frames.append(img / 255)
  frame_h, frame_w, _ = frames[0].shape
  logging.debug('Frame size %d x %d', frame_w, frame_h)

  # Extract camera rotations
  if zoom_ratio:
    zoom_ratio_suffix = f'{zoom_ratio:.1f}'
  else:
    zoom_ratio_suffix = '1'
  file_name_stem = (
      f'{os.path.join(log_path, test_name)}_{video_size}_{zoom_ratio_suffix}x')
  cam_rots = sensor_fusion_utils.get_cam_rotations(
      frames[_START_FRAME:],
      facing,
      frame_h,
      file_name_stem,
      _START_FRAME,
      stabilized_video=True
  )
  sensor_fusion_utils.plot_camera_rotations(cam_rots, _START_FRAME,
                                            video_size, file_name_stem)
  max_camera_angle = sensor_fusion_utils.calc_max_rotation_angle(
      cam_rots, 'Camera')

  # Extract gyro rotations
  sensor_fusion_utils.plot_gyro_events(
      gyro_events, f'{test_name}_{video_size}_{zoom_ratio_suffix}x',
      log_path)
  gyro_rots = sensor_fusion_utils.conv_acceleration_to_movement(
      gyro_events, _VIDEO_DELAY_TIME)
  max_gyro_angle = sensor_fusion_utils.calc_max_rotation_angle(
      gyro_rots, 'Gyro')

  # Assert phone is moved enough during test. This runs before the
  # camera/gyro ratio is computed so that a zero gyro angle is reported as
  # insufficient movement instead of a division by zero.
  if max_gyro_angle < _MIN_PHONE_MOVEMENT_ANGLE:
    raise AssertionError(
        f'Phone not moved enough! Movement: {max_gyro_angle}, '
        f'THRESH: {_MIN_PHONE_MOVEMENT_ANGLE} degrees')

  angle_ratio = max_camera_angle / max_gyro_angle
  logging.debug(
      'Max deflection (degrees) %s: video: %.3f, gyro: %.3f ratio: %.4f',
      video_size, max_camera_angle, max_gyro_angle, angle_ratio)

  # Formats wider than 16:9 get a 10% larger allowance.
  w_x_h = video_size.split('x')
  if int(w_x_h[0])/int(w_x_h[1]) > _ASPECT_RATIO_16_9:
    preview_stabilization_factor = _PREVIEW_STABILIZATION_FACTOR * 1.1
  else:
    preview_stabilization_factor = _PREVIEW_STABILIZATION_FACTOR

  failure_msg = None
  if max_camera_angle >= max_gyro_angle * preview_stabilization_factor:
    failure_msg = (
        f'{video_size} preview not stabilized enough! '
        f'Max preview angle:  {max_camera_angle:.3f}, '
        f'Max gyro angle: {max_gyro_angle:.3f}, '
        f'ratio: {angle_ratio:.3f} '
        f'THRESH: {preview_stabilization_factor}.')
  # Delete saved frames if the format is a PASS
  else:
    try:
      tmpdir = os.listdir(log_path)
    except FileNotFoundError:
      logging.debug('Tmp directory: %s not found', log_path)
      tmpdir = []  # nothing to clean up
    for frame_file in tmpdir:
      if fnmatch.fnmatch(frame_file, f'*_{video_size}_stabilized_frame_*'):
        file_to_remove = os.path.join(log_path, frame_file)
        try:
          os.remove(file_to_remove)
        except FileNotFoundError:
          logging.debug('File Not Found: %s', str(frame_file))
    logging.debug('Format %s passes, frame images removed', video_size)

  return {'gyro': max_gyro_angle, 'cam': max_camera_angle,
          'failure': failure_msg}


def collect_preview_data_with_zoom(cam, preview_size, zoom_start,
                                   zoom_end, step_size, recording_duration_ms,
                                   padded_frames=False):
  """Captures a preview video from the device.

  Captures camera preview frames from the passed device.

  Args:
    cam: camera object.
    preview_size: str; preview resolution. ex. '1920x1080'.
    zoom_start: (float) is the starting zoom ratio during recording.
    zoom_end: (float) is the ending zoom ratio during recording.
    step_size: (float) is the step for zoom ratio during recording.
    recording_duration_ms: preview recording duration in ms.
    padded_frames: boolean; Whether to add additional frames at the beginning
      and end of recording to workaround issue with MediaRecorder.

  Returns:
    recording object as described by cam.do_preview_recording_with_dynamic_zoom.
  """
  recording_obj = cam.do_preview_recording_with_dynamic_zoom(
      preview_size,
      stabilize=False,
      sweep_zoom=(zoom_start, zoom_end, step_size, recording_duration_ms),
      padded_frames=padded_frames
  )
  logging.debug('Recorded output path: %s', recording_obj['recordedOutputPath'])
  logging.debug('Tested quality: %s', recording_obj['quality'])
  return recording_obj


def is_aspect_ratio_match(size_str, target_ratio):
  """Checks if a resolution string matches the target aspect ratio.

  Args:
    size_str: str; resolution formatted as 'WIDTHxHEIGHT'.
    target_ratio: number; width / height ratio to compare against.

  Returns:
    True if width / height is within _ASPECT_TOL of target_ratio.
  """
  width, height = map(int, size_str.split('x'))
  return abs(width / height - target_ratio) < _ASPECT_TOL


def _select_preview_test_size(supported_preview_sizes):
  """Picks the preview size to test from a list of candidate sizes.

  Prefers _HIGH_RES_SIZE when it is in the list; otherwise returns the last
  candidate whose area lies within
  [_PREVIEW_MIN_TESTED_AREA, _PREVIEW_MAX_TESTED_AREA].

  Args:
    supported_preview_sizes: list of str; 'WIDTHxHEIGHT' candidates.

  Returns:
    str; the selected 'WIDTHxHEIGHT' size.

  Raises:
    IndexError: if no candidate falls within the tested area range.
  """
  if _HIGH_RES_SIZE in supported_preview_sizes:
    return _HIGH_RES_SIZE

  capped_supported_preview_sizes = [
      size for size in supported_preview_sizes
      if (_PREVIEW_MIN_TESTED_AREA
          <= _size_to_area(size)
          <= _PREVIEW_MAX_TESTED_AREA)
  ]
  if not capped_supported_preview_sizes:
    raise IndexError(
        'No supported preview size with area between '
        f'{_PREVIEW_MIN_TESTED_AREA} and {_PREVIEW_MAX_TESTED_AREA}: '
        f'{supported_preview_sizes}')
  # NOTE(review): taking the last entry presumably relies on the camera
  # reporting sizes in increasing order -- confirm against the cam API.
  return capped_supported_preview_sizes[-1]


def get_max_preview_test_size(cam, camera_id, aspect_ratio=None):
  """Finds the max preview size to be tested.

  If the device supports the _HIGH_RES_SIZE preview size then
  it uses that for testing, otherwise uses the max supported
  preview size capped at _PREVIEW_MAX_TESTED_AREA.

  Args:
    cam: camera object
    camera_id: str; camera device id under test
    aspect_ratio: number; preferred aspect ratio as width / height,
      e.g. 4/3. None to accept any aspect ratio.

  Returns:
    preview_test_size: str; wxh resolution of the size to be tested

  Raises:
    IndexError: if no supported size qualifies for testing.
  """
  supported_preview_sizes = [
      size for size in cam.get_all_supported_preview_sizes(camera_id)
      if _size_to_area(size) >= video_processing_utils.LOWEST_RES_TESTED_AREA
      and (aspect_ratio is None or is_aspect_ratio_match(size, aspect_ratio))
  ]
  logging.debug('Supported preview resolutions: %s', supported_preview_sizes)

  preview_test_size = _select_preview_test_size(supported_preview_sizes)
  logging.debug('Selected preview resolution: %s', preview_test_size)

  return preview_test_size


def get_max_extension_preview_test_size(cam, camera_id, extension):
  """Finds the max preview size for an extension to be tested.

  If the device supports the _HIGH_RES_SIZE preview size then
  it uses that for testing, otherwise uses the max supported
  preview size capped at _PREVIEW_MAX_TESTED_AREA.

  Args:
    cam: camera object
    camera_id: str; camera device id under test
    extension: int; camera extension mode under test

  Returns:
    preview_test_size: str; wxh resolution of the size to be tested

  Raises:
    IndexError: if no supported size qualifies for testing.
  """
  supported_preview_sizes = [
      size
      for size in cam.get_supported_extension_preview_sizes(
          camera_id, extension)
      if _size_to_area(size) >= video_processing_utils.LOWEST_RES_TESTED_AREA
  ]
  logging.debug('Supported preview resolutions for extension %d: %s',
                extension, supported_preview_sizes)

  preview_test_size = _select_preview_test_size(supported_preview_sizes)
  logging.debug('Selected preview resolution: %s', preview_test_size)

  return preview_test_size


def mirror_preview_image_by_sensor_orientation(
    sensor_orientation, input_preview_img):
  """If testing front camera, mirror preview image to match camera capture.

  Preview are flipped on device's natural orientation, so for sensor
  orientation 90 or 270, it is up or down. Sensor orientation 0 or 180
  is left or right.

  Args:
    sensor_orientation: integer; display orientation in natural position.
    input_preview_img: numpy array; image extracted from preview recording.
  Returns:
    output_preview_img: numpy array; flipped according to natural orientation.
  """
  # Opencv expects a numpy array but np.flip generates a 'view' which
  # doesn't work with opencv. copy() forces a copy instead of a view.
  if sensor_orientation in _NATURAL_ORIENTATION_PORTRAIT:
    output_preview_img = np.flipud(input_preview_img).copy()
    logging.debug(
        'Found sensor orientation %d, flipping up down', sensor_orientation)
  else:
    output_preview_img = np.fliplr(input_preview_img).copy()
    logging.debug(
        'Found sensor orientation %d, flipping left right', sensor_orientation)

  return output_preview_img


def is_image_green(image_path):
  """Checks if an image is mostly green.

  Checks if an image is mostly green by ensuring green is dominant
  and red/blue values are low.

  Args:
    image_path: str; The path to the image file.

  Returns:
    bool: True if mostly green, False otherwise.

  Raises:
    ValueError: if the image at image_path cannot be read.
  """
  image = cv2.imread(image_path)
  if image is None:
    # cv2.imread signals an unreadable/missing file by returning None.
    raise ValueError(f'Unable to read image file: {image_path}')

  # cv2.imread returns channels in blue, green, red order.
  blue_value, green_value, red_value = np.mean(image, axis=(0, 1))[:3]

  # Check if green is dominant and red/blue are below the threshold
  return bool(green_value > _GREEN_TOL and
              red_value < _RED_BLUE_TOL and
              blue_value < _RED_BLUE_TOL)


def preview_over_zoom_range(dut, cam, preview_size, z_min, z_max, z_step_size,
                            log_path):
  """Captures a preview video from the device over zoom range.

  Captures camera preview frames at various zoom level in zoom range.

  Args:
    dut: device under test
    cam: camera object
    preview_size: str; preview resolution. ex. '1920x1080'
    z_min: minimum zoom for preview capture
    z_max: maximum zoom for preview capture
    z_step_size: zoom step size from min to max
    log_path: str; path for video file directory

  Returns:
    capture_results: total capture results of each frame
    file_list: file name for each frame

  Raises:
    AssertionError: if the number of capture results differs from the number
      of camera frames left after removing the green padding frames.
  """
  logging.debug('z_min : %.2f, z_max = %.2f, z_step_size = %.2f',
                z_min, z_max, z_step_size)

  # Converge 3A
  cam.do_3a()

  # recording preview
  # TODO: b/350821827 - encode time stamps in camera frames instead of
  #                     padded green frames
  # MediaRecorder on some devices drop last few frames. To solve this issue
  # add green frames as padding at the end of recorded camera frames. This way
  # green buffer frames would be dropped by MediaRecorder instead of actual
  # frames. Later these green padded frames are removed.
  preview_rec_obj = collect_preview_data_with_zoom(
      cam, preview_size, z_min, z_max, z_step_size,
      _PREVIEW_DURATION, padded_frames=True)

  preview_file_name = its_session_utils.pull_file_from_dut(
      dut, preview_rec_obj['recordedOutputPath'], log_path)

  logging.debug('recorded video size : %s',
                str(preview_rec_obj['videoSize']))

  # Extract frames as png from mp4 preview recording
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, preview_file_name, _IMG_FORMAT
  )

  # Find index of the first non-green frame, deleting leading green padding
  first_camera_frame_idx = None
  for idx, file_name in enumerate(file_list):
    file_path = os.path.join(log_path, file_name)
    if is_image_green(file_path):
      its_session_utils.remove_file(file_path)
      logging.debug('Removed green file %s', file_name)
    else:
      logging.debug('First camera frame: %s', file_name)
      first_camera_frame_idx = idx
      break

  if first_camera_frame_idx is None:
    # Every extracted frame was padding (or nothing was extracted). All green
    # files are already deleted, so do not scan them again from the end.
    logging.debug('No camera frames found in %s', preview_file_name)
    file_list = []
  else:
    # Find index of last non-green frame, deleting trailing green padding.
    # The scan stops above the first camera frame, which is known non-green.
    last_camera_frame_idx = first_camera_frame_idx
    for idx in range(len(file_list) - 1, first_camera_frame_idx, -1):
      file_name = file_list[idx]
      file_path = os.path.join(log_path, file_name)
      if is_image_green(file_path):
        its_session_utils.remove_file(file_path)
        logging.debug('Removed green file %s', file_name)
      else:
        last_camera_frame_idx = idx
        break
    logging.debug('Last camera frame: %s', file_list[last_camera_frame_idx])

    logging.debug('start idx = %d -- end idx = %d', first_camera_frame_idx,
                  last_camera_frame_idx)
    file_list = file_list[first_camera_frame_idx:last_camera_frame_idx+1]

  # Raise error if capture result and frame count doesn't match
  capture_results = preview_rec_obj['captureMetadata']
  extra_capture_result_count = len(capture_results) - len(file_list)
  logging.debug('Number of frames %d', len(file_list))
  if extra_capture_result_count != 0:
    its_session_utils.remove_frame_files(log_path)
    e_msg = (f'Number of CaptureResult ({len(capture_results)}) '
             f'vs number of Frames ({len(file_list)}) count mismatch.'
             ' Retry Test.')
    raise AssertionError(e_msg)

  # skip frames which might not have 3A converged
  capture_results = capture_results[_SKIP_INITIAL_FRAMES:]
  skipped_files = file_list[:_SKIP_INITIAL_FRAMES]
  file_list = file_list[_SKIP_INITIAL_FRAMES:]

  # delete skipped files
  for file_name in skipped_files:
    its_session_utils.remove_file(os.path.join(log_path, file_name))

  return capture_results, file_list