1# Copyright 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Utility functions for verifying preview stabilization.
15"""
16
17import cv2
18import fnmatch
19import logging
20import os
21import threading
22import time
23
24import numpy as np
25
26import its_session_utils
27import image_processing_utils
28import sensor_fusion_utils
29import video_processing_utils
30
# Pixel area of a 1280x720 frame; cutoff used by get_720p_or_above_size.
_AREA_720P_VIDEO = 1280 * 720
_ASPECT_RATIO_16_9 = 16/9  # determine if preview fmt > 16:9
# Max absolute difference between a size's width/height and a target ratio
# for the two to be treated as matching (see is_aspect_ratio_match).
_ASPECT_TOL = 0.01
# Thresholds on the per-channel image means used by is_image_green: the green
# mean must exceed _GREEN_TOL and the red and blue means must each be below
# _RED_BLUE_TOL.
_GREEN_TOL = 235
_RED_BLUE_TOL = 15
_HIGH_RES_SIZE = '3840x2160'  # Resolution for 4K quality
_IMG_FORMAT = 'png'  # image format requested when extracting video frames
_MIN_PHONE_MOVEMENT_ANGLE = 5  # degrees
_NATURAL_ORIENTATION_PORTRAIT = (90, 270)  # orientation in "normal position"
_NUM_ROTATIONS = 24  # rotation count handed to the rotation rig
# Pixel-area bounds applied to candidate preview sizes when _HIGH_RES_SIZE is
# not supported.
_PREVIEW_MAX_TESTED_AREA = 1920 * 1440
_PREVIEW_MIN_TESTED_AREA = 320 * 240
_PREVIEW_STABILIZATION_FACTOR = 0.7  # 70% of gyro movement allowed
# Number of leading frames (and capture results) dropped by
# preview_over_zoom_range because 3A might not have converged yet.
_SKIP_INITIAL_FRAMES = 15
_START_FRAME = 30  # give 3A some frames to warm up
# Wait between starting the rig and starting the recording; also passed to
# the gyro-to-movement conversion in verify_preview_stabilization.
_VIDEO_DELAY_TIME = 5.5  # seconds
_VIDEO_DURATION = 5.5  # seconds
_PREVIEW_DURATION = 400  # milliseconds
49
50
def get_720p_or_above_size(supported_preview_sizes):
  """Returns the smallest size above or equal to 720p in preview and video.

  If no supported preview size reaches 720p, returns the largest supported
  size instead. The result does not depend on the order of the input list.

  Args:
    supported_preview_sizes: list; preview sizes.
      e.g. ['1920x960', '1600x1200', '1920x1080']
  Returns:
    str; smallest size whose area is >= 720p if one exists, otherwise the
    largest supported size. Empty string if supported_preview_sizes is empty.
  """

  def size_to_area(size):
    dims = size.split('x')
    return int(dims[0]) * int(dims[1])

  smallest_720p_or_above_size = ''
  smallest_720p_or_above_area = float('inf')
  largest_supported_preview_size = ''
  largest_area = 0
  for size in supported_preview_sizes:
    area = size_to_area(size)
    # Track both candidates independently. Coupling them through an else
    # branch makes the answer depend on list order and loses a lone 720p entry.
    if _AREA_720P_VIDEO <= area < smallest_720p_or_above_area:
      smallest_720p_or_above_area = area
      smallest_720p_or_above_size = size
    if area > largest_area:
      largest_area = area
      largest_supported_preview_size = size

  if smallest_720p_or_above_size:
    logging.debug('Smallest 720p or above size: %s',
                  smallest_720p_or_above_size)
    return smallest_720p_or_above_size

  logging.debug('Largest supported preview size: %s',
                largest_supported_preview_size)
  return largest_supported_preview_size
86
87
def collect_data(cam, tablet_device, preview_size, stabilize, rot_rig,
                 zoom_ratio=None, fps_range=None, hlg10=False, ois=False):
  """Records preview data for a single preview size while the rig moves.

  Asks the camera object for the preview output surface(s) matching the
  requested size and hands the capture off to collect_data_with_surfaces.

  Args:
    cam: camera object.
    tablet_device: boolean; based on config file.
    preview_size: str; preview stream resolution. ex. '1920x1080'
    stabilize: boolean; whether preview stabilization is ON.
    rot_rig: dict with 'cntl' and 'ch' defined.
    zoom_ratio: float; static zoom ratio. None if default zoom.
    fps_range: list; target fps range.
    hlg10: boolean; whether to capture hlg10 output.
    ois: boolean; whether optical image stabilization is ON.
  Returns:
    recording object; a dictionary containing output path, video size, etc.
  """

  preview_surfaces = cam.preview_surface(preview_size, hlg10)
  return collect_data_with_surfaces(
      cam, tablet_device, preview_surfaces, stabilize, rot_rig,
      zoom_ratio=zoom_ratio, fps_range=fps_range, ois=ois)
113
114
def collect_data_with_surfaces(cam, tablet_device, output_surfaces,
                               stabilize, rot_rig, zoom_ratio=None,
                               fps_range=None, ois=False):
  """Records preview data on the given surfaces while the rig moves.

  Starts the rotation rig on a background thread, starts sensor event
  collection, waits for the rig to get going, records the preview, and then
  waits for the rig thread to finish.

  Args:
    cam: camera object.
    tablet_device: boolean; based on config file.
    output_surfaces: list of dict; The list of output surfaces configured for
      the recording. Only the first surface is used for recording; the rest are
      configured, but not requested.
    stabilize: boolean; whether preview stabilization is ON.
    rot_rig: dict with 'cntl' and 'ch' defined.
    zoom_ratio: float; static zoom ratio. None if default zoom.
    fps_range: list; target fps range.
    ois: boolean; whether optical image stabilization is ON.
  Returns:
    recording object; a dictionary containing output path, video size, etc.
  """

  logging.debug('Starting sensor event collection')

  # A serial port is only set up when the rig controller is the Arduino.
  arduino_name = sensor_fusion_utils.ARDUINO_STRING
  serial_port = None
  if rot_rig['cntl'].lower() == arduino_name.lower():
    serial_port = sensor_fusion_utils.serial_port_def(arduino_name)
    # send test cmd to Arduino until cmd returns properly
    sensor_fusion_utils.establish_serial_comm(serial_port)

  # Tablets use their own servo speed constant.
  servo_speed = (
      sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION_TABLET
      if tablet_device
      else sensor_fusion_utils.ARDUINO_SERVO_SPEED_STABILIZATION
  )

  # Start camera vibration on a separate thread so recording can overlap it.
  rig_args = (
      rot_rig['cntl'],
      rot_rig['ch'],
      _NUM_ROTATIONS,
      sensor_fusion_utils.ARDUINO_ANGLES_STABILIZATION,
      servo_speed,
      sensor_fusion_utils.ARDUINO_MOVE_TIME_STABILIZATION,
      serial_port,
  )
  rig_thread = threading.Thread(
      target=sensor_fusion_utils.rotation_rig, args=rig_args)
  rig_thread.start()

  cam.start_sensor_events()
  # Allow time for rig to start moving
  time.sleep(_VIDEO_DELAY_TIME)

  # No fps range means no AE target fps limits are requested.
  if fps_range is None:
    min_fps = None
    max_fps = None
  else:
    min_fps = fps_range[0]
    max_fps = fps_range[1]

  recording_obj = cam.do_preview_recording_multiple_surfaces(
      output_surfaces, _VIDEO_DURATION, stabilize, ois,
      zoom_ratio=zoom_ratio,
      ae_target_fps_min=min_fps,
      ae_target_fps_max=max_fps,
  )

  logging.debug('Recorded output path: %s', recording_obj['recordedOutputPath'])
  logging.debug('Tested quality: %s', recording_obj['quality'])

  # Wait for vibration to stop
  rig_thread.join()

  return recording_obj
183
184
def verify_preview_stabilization(recording_obj, gyro_events,
                                 test_name, log_path, facing, zoom_ratio=None):
  """Verify the returned recording is properly stabilized.

  Extracts every frame of the recording, derives the camera rotation from the
  frames and the device rotation from the gyroscope events, and compares the
  maximum camera angle against a fraction of the maximum gyro angle. Saved
  stabilized frame images are deleted when the format passes.

  Args:
    recording_obj: Camcorder recording object; 'recordedOutputPath' and
      'videoSize' are read from it.
    gyro_events: Gyroscope events collected while recording.
    test_name: Name of the test.
    log_path: Path for the log file.
    facing: Facing of the camera device.
    zoom_ratio: Static zoom ratio. None if default zoom.

  Returns:
    A dictionary containing the maximum gyro angle ('gyro'), the maximum
    camera angle ('cam'), and a failure message ('failure') if the recorded
    video isn't properly stabilized, None otherwise.

  Raises:
    AssertionError: if the maximum gyro angle is below
      _MIN_PHONE_MOVEMENT_ANGLE.
  """

  file_name = recording_obj['recordedOutputPath'].split('/')[-1]
  logging.debug('recorded file name: %s', file_name)
  video_size = recording_obj['videoSize']
  logging.debug('video size: %s', video_size)

  # Get all frames from the video
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, file_name, _IMG_FORMAT
  )
  frames = []

  logging.debug('Number of frames %d', len(file_list))
  for file in file_list:
    img = image_processing_utils.convert_image_to_numpy_array(
        os.path.join(log_path, file)
    )
    frames.append(img / 255)
  frame_h, frame_w, _ = frames[0].shape
  logging.debug('Frame size %d x %d', frame_w, frame_h)

  # Extract camera rotations
  if zoom_ratio:
    zoom_ratio_suffix = f'{zoom_ratio:.1f}'
  else:
    zoom_ratio_suffix = '1'
  file_name_stem = (
      f'{os.path.join(log_path, test_name)}_{video_size}_{zoom_ratio_suffix}x')
  cam_rots = sensor_fusion_utils.get_cam_rotations(
      frames[_START_FRAME:],
      facing,
      frame_h,
      file_name_stem,
      _START_FRAME,
      stabilized_video=True
  )
  sensor_fusion_utils.plot_camera_rotations(cam_rots, _START_FRAME,
                                            video_size, file_name_stem)
  max_camera_angle = sensor_fusion_utils.calc_max_rotation_angle(
      cam_rots, 'Camera')

  # Extract gyro rotations
  sensor_fusion_utils.plot_gyro_events(
      gyro_events, f'{test_name}_{video_size}_{zoom_ratio_suffix}x',
      log_path)
  gyro_rots = sensor_fusion_utils.conv_acceleration_to_movement(
      gyro_events, _VIDEO_DELAY_TIME)
  max_gyro_angle = sensor_fusion_utils.calc_max_rotation_angle(
      gyro_rots, 'Gyro')

  # Guard the ratio so a zero gyro angle reaches the explicit "not moved
  # enough" assertion below instead of failing here on the division.
  if max_gyro_angle:
    angle_ratio = max_camera_angle / max_gyro_angle
  else:
    angle_ratio = float('inf')
  logging.debug(
      'Max deflection (degrees) %s: video: %.3f, gyro: %.3f ratio: %.4f',
      video_size, max_camera_angle, max_gyro_angle, angle_ratio)

  # Assert phone is moved enough during test
  if max_gyro_angle < _MIN_PHONE_MOVEMENT_ANGLE:
    raise AssertionError(
        f'Phone not moved enough! Movement: {max_gyro_angle}, '
        f'THRESH: {_MIN_PHONE_MOVEMENT_ANGLE} degrees')

  # Formats wider than 16:9 get a 10% looser threshold.
  w_x_h = video_size.split('x')
  if int(w_x_h[0])/int(w_x_h[1]) > _ASPECT_RATIO_16_9:
    preview_stabilization_factor = _PREVIEW_STABILIZATION_FACTOR * 1.1
  else:
    preview_stabilization_factor = _PREVIEW_STABILIZATION_FACTOR

  failure_msg = None
  if max_camera_angle >= max_gyro_angle * preview_stabilization_factor:
    failure_msg = (
        f'{video_size} preview not stabilized enough! '
        f'Max preview angle:  {max_camera_angle:.3f}, '
        f'Max gyro angle: {max_gyro_angle:.3f}, '
        f'ratio: {angle_ratio:.3f} '
        f'THRESH: {preview_stabilization_factor}.')
  else:
    # Delete saved frames if the format is a PASS
    _remove_stabilized_frames(log_path, video_size)
    logging.debug('Format %s passes, frame images removed', video_size)

  return {'gyro': max_gyro_angle, 'cam': max_camera_angle,
          'failure': failure_msg}


def _remove_stabilized_frames(log_path, video_size):
  """Best-effort removal of the stabilized frame images of one video size.

  Args:
    log_path: str; directory holding the saved frame images.
    video_size: str; video size embedded in the frame file names.
  """
  try:
    saved_files = os.listdir(log_path)
  except FileNotFoundError:
    # Nothing to clean up; return so no unbound listing is iterated.
    logging.debug('Tmp directory: %s not found', log_path)
    return

  for file in saved_files:
    if fnmatch.fnmatch(file, f'*_{video_size}_stabilized_frame_*'):
      try:
        os.remove(os.path.join(log_path, file))
      except FileNotFoundError:
        logging.debug('File Not Found: %s', str(file))
292
293
def collect_preview_data_with_zoom(cam, preview_size, zoom_start,
                                   zoom_end, step_size, recording_duration_ms,
                                   padded_frames=False):
  """Records an unstabilized preview video while sweeping the zoom ratio.

  Args:
    cam: camera object.
    preview_size: str; preview resolution. ex. '1920x1080'.
    zoom_start: (float) is the starting zoom ratio during recording.
    zoom_end: (float) is the ending zoom ratio during recording.
    step_size: (float) is the step for zoom ratio during recording.
    recording_duration_ms: preview recording duration in ms.
    padded_frames: boolean; Whether to add additional frames at the beginning
      and end of recording to workaround issue with MediaRecorder.

  Returns:
    recording object as described by cam.do_preview_recording_with_dynamic_zoom.
  """
  # The sweep is handed to the camera as a single 4-tuple.
  zoom_sweep = (zoom_start, zoom_end, step_size, recording_duration_ms)
  recording = cam.do_preview_recording_with_dynamic_zoom(
      preview_size,
      stabilize=False,
      sweep_zoom=zoom_sweep,
      padded_frames=padded_frames,
  )

  logging.debug('Recorded output path: %s', recording['recordedOutputPath'])
  logging.debug('Tested quality: %s', recording['quality'])
  return recording
323
324
def is_aspect_ratio_match(size_str, target_ratio):
  """Tells whether a 'WxH' size is within _ASPECT_TOL of the target ratio.

  Args:
    size_str: str; resolution formatted as '<width>x<height>'.
    target_ratio: numeric width/height ratio to compare against.

  Returns:
    True if abs(width / height - target_ratio) is below _ASPECT_TOL.
  """
  dims = [int(dim) for dim in size_str.split('x')]
  width, height = dims
  ratio_error = abs(width / height - target_ratio)
  return ratio_error < _ASPECT_TOL
329
330
def get_max_preview_test_size(cam, camera_id, aspect_ratio=None):
  """Finds the max preview size to be tested.

  If the device supports the _HIGH_RES_SIZE preview size then
  it uses that for testing, otherwise uses the max supported
  preview size capped at _PREVIEW_MAX_TESTED_AREA.

  Args:
    cam: camera object
    camera_id: str; camera device id under test
    aspect_ratio: numeric; preferred width/height ratio, e.g. 4/3. It is
      compared arithmetically, so it must be a number rather than a string.
      None accepts any aspect ratio.

  Returns:
    preview_test_size: str; wxh resolution of the size to be tested

  Raises:
    IndexError: if _HIGH_RES_SIZE is not supported and no supported size
      falls within the tested area bounds.
  """
  def resolution_to_area(size):
    dims = size.split('x')
    return int(dims[0]) * int(dims[1])

  lowest_tested_area = video_processing_utils.LOWEST_RES_TESTED_AREA
  supported_preview_sizes = [
      size
      for size in cam.get_all_supported_preview_sizes(camera_id)
      if resolution_to_area(size) >= lowest_tested_area
      and (aspect_ratio is None or is_aspect_ratio_match(size, aspect_ratio))
  ]

  logging.debug('Supported preview resolutions: %s', supported_preview_sizes)

  if _HIGH_RES_SIZE in supported_preview_sizes:
    preview_test_size = _HIGH_RES_SIZE
  else:
    capped_supported_preview_sizes = [
        size
        for size in supported_preview_sizes
        if (_PREVIEW_MIN_TESTED_AREA
            <= resolution_to_area(size)
            <= _PREVIEW_MAX_TESTED_AREA)
    ]
    if not capped_supported_preview_sizes:
      raise IndexError(
          f'No supported preview size for camera {camera_id} with area in '
          f'[{_PREVIEW_MIN_TESTED_AREA}, {_PREVIEW_MAX_TESTED_AREA}] '
          f'(aspect_ratio={aspect_ratio}). '
          f'Candidates: {supported_preview_sizes}')
    # The last entry is taken as the largest. NOTE(review): this presumes the
    # camera reports sizes in ascending order -- confirm against
    # cam.get_all_supported_preview_sizes.
    preview_test_size = capped_supported_preview_sizes[-1]

  logging.debug('Selected preview resolution: %s', preview_test_size)

  return preview_test_size
376
377
def get_max_extension_preview_test_size(cam, camera_id, extension):
  """Finds the max preview size for an extension to be tested.

  If the device supports the _HIGH_RES_SIZE preview size then
  it uses that for testing, otherwise uses the max supported
  preview size capped at _PREVIEW_MAX_TESTED_AREA.

  Args:
    cam: camera object
    camera_id: str; camera device id under test
    extension: int; camera extension mode under test

  Returns:
    preview_test_size: str; wxh resolution of the size to be tested

  Raises:
    IndexError: if _HIGH_RES_SIZE is not supported and no supported size
      falls within the tested area bounds.
  """
  def resolution_to_area(size):
    dims = size.split('x')
    return int(dims[0]) * int(dims[1])

  lowest_tested_area = video_processing_utils.LOWEST_RES_TESTED_AREA
  supported_preview_sizes = [
      size
      for size in cam.get_supported_extension_preview_sizes(
          camera_id, extension)
      if resolution_to_area(size) >= lowest_tested_area
  ]
  logging.debug('Supported preview resolutions for extension %d: %s',
                extension, supported_preview_sizes)

  if _HIGH_RES_SIZE in supported_preview_sizes:
    preview_test_size = _HIGH_RES_SIZE
  else:
    capped_supported_preview_sizes = [
        size
        for size in supported_preview_sizes
        if (_PREVIEW_MIN_TESTED_AREA
            <= resolution_to_area(size)
            <= _PREVIEW_MAX_TESTED_AREA)
    ]
    if not capped_supported_preview_sizes:
      raise IndexError(
          f'No supported preview size for camera {camera_id}, extension '
          f'{extension} with area in '
          f'[{_PREVIEW_MIN_TESTED_AREA}, {_PREVIEW_MAX_TESTED_AREA}]. '
          f'Candidates: {supported_preview_sizes}')
    # The last entry is taken as the largest. NOTE(review): this presumes the
    # camera reports sizes in ascending order -- confirm against
    # cam.get_supported_extension_preview_sizes.
    preview_test_size = capped_supported_preview_sizes[-1]

  logging.debug('Selected preview resolution: %s', preview_test_size)

  return preview_test_size
418
419
def mirror_preview_image_by_sensor_orientation(
    sensor_orientation, input_preview_img):
  """Mirrors a preview image according to the sensor orientation.

  The image is flipped up/down when the sensor orientation is one of
  _NATURAL_ORIENTATION_PORTRAIT (90 or 270) and left/right otherwise.

  Args:
    sensor_orientation: integer; display orientation in natural position.
    input_preview_img: numpy array; image extracted from preview recording.
  Returns:
    output_preview_img: numpy array; flipped according to natural orientation.
  """
  if sensor_orientation in _NATURAL_ORIENTATION_PORTRAIT:
    flip = np.flipud
    log_fmt = 'Found sensor orientation %d, flipping up down'
  else:
    flip = np.fliplr
    log_fmt = 'Found sensor orientation %d, flipping left right'

  # The numpy flip helpers return a view, which doesn't work with opencv;
  # copy() materializes a real array instead.
  output_preview_img = flip(input_preview_img).copy()
  logging.debug(log_fmt, sensor_orientation)

  return output_preview_img
446
447
def is_image_green(image_path):
  """Checks if an image is mostly green.

  Checks if an image is mostly green by ensuring the mean green value is
  above _GREEN_TOL and the mean red/blue values are below _RED_BLUE_TOL.

  Args:
    image_path: str; The path to the image file.

  Returns:
    bool: True if mostly green, False otherwise.

  Raises:
    ValueError: if the image at image_path cannot be read.
  """

  image = cv2.imread(image_path)
  if image is None:
    # cv2.imread reports a missing or unreadable file by returning None
    # instead of raising; fail here with the offending path.
    raise ValueError(f'Unable to read image: {image_path}')

  # Mean over rows and columns leaves one value per channel, in OpenCV's
  # blue, green, red order.
  blue_value, green_value, red_value = np.mean(image, axis=(0, 1))

  # Check if green is dominant and red/blue are below the threshold
  return bool(green_value > _GREEN_TOL and
              red_value < _RED_BLUE_TOL and
              blue_value < _RED_BLUE_TOL)
477
478
def preview_over_zoom_range(dut, cam, preview_size, z_min, z_max, z_step_size,
                            log_path):
  """Captures a preview video from the device over zoom range.

  Captures camera preview frames at various zoom level in zoom range, extracts
  them as image files into log_path, strips the green padding frames from both
  ends, and drops the first _SKIP_INITIAL_FRAMES camera frames.

  Args:
    dut: device under test
    cam: camera object
    preview_size: str; preview resolution. ex. '1920x1080'
    z_min: minimum zoom for preview capture
    z_max: maximum zoom for preview capture
    z_step_size: zoom step size from min to max
    log_path: str; path for video file directory

  Returns:
    capture_results: total capture results of each frame
    file_list: file name for each frame

  Raises:
    AssertionError: if the number of capture results differs from the number
      of camera frames left after removing the green padding frames.
  """
  logging.debug('z_min : %.2f, z_max = %.2f, z_step_size = %.2f',
                z_min, z_max, z_step_size)

  # Converge 3A
  cam.do_3a()

  # recording preview
  # TODO: b/350821827 - encode time stamps in camera frames instead of
  #                     padded green frames
  # MediaRecorder on some devices drops the last few frames. To solve this
  # issue, green frames are added as padding at the end of recorded camera
  # frames. This way green buffer frames would be dropped by MediaRecorder
  # instead of actual frames. Later these green padded frames are removed.
  preview_rec_obj = collect_preview_data_with_zoom(
      cam, preview_size, z_min, z_max, z_step_size,
      _PREVIEW_DURATION, padded_frames=True)

  preview_file_name = its_session_utils.pull_file_from_dut(
      dut, preview_rec_obj['recordedOutputPath'], log_path)

  logging.debug('recorded video size : %s',
                str(preview_rec_obj['videoSize']))

  # Extract frames as png from mp4 preview recording
  file_list = video_processing_utils.extract_all_frames_from_video(
      log_path, preview_file_name, _IMG_FORMAT
  )

  num_frames = len(file_list)

  # Find index of the first non-green frame, deleting the leading green
  # frames. Defaults to num_frames so that a recording with only green frames
  # yields no camera frames.
  first_camera_frame_idx = num_frames
  for idx, file_name in enumerate(file_list):
    file_path = os.path.join(log_path, file_name)
    if is_image_green(file_path):
      its_session_utils.remove_file(file_path)
      logging.debug('Removed green file %s', file_name)
    else:
      logging.debug('First camera frame: %s', file_name)
      first_camera_frame_idx = idx
      break

  # Find index of last non-green frame, deleting the trailing green frames.
  # The scan stops at first_camera_frame_idx: every frame before it has
  # already been deleted from disk and must not be read again.
  last_camera_frame_idx = first_camera_frame_idx - 1
  for idx in range(num_frames - 1, first_camera_frame_idx - 1, -1):
    file_name = file_list[idx]
    file_path = os.path.join(log_path, file_name)
    if is_image_green(file_path):
      its_session_utils.remove_file(file_path)
      logging.debug('Removed green file %s', file_name)
    else:
      logging.debug('Last camera frame: %s', file_name)
      last_camera_frame_idx = idx
      break

  logging.debug('start idx = %d -- end idx = %d', first_camera_frame_idx,
                last_camera_frame_idx)
  file_list = file_list[first_camera_frame_idx:last_camera_frame_idx+1]

  # Raise error if capture result and frame count doesn't match
  capture_results = preview_rec_obj['captureMetadata']
  extra_capture_result_count = len(capture_results) - len(file_list)
  logging.debug('Number of frames %d', len(file_list))
  if extra_capture_result_count != 0:
    its_session_utils.remove_frame_files(log_path)
    e_msg = (f'Number of CaptureResult ({len(capture_results)}) '
             f'vs number of Frames ({len(file_list)}) count mismatch.'
             ' Retry Test.')
    raise AssertionError(e_msg)

  # skip frames which might not have 3A converged
  capture_results = capture_results[_SKIP_INITIAL_FRAMES:]
  skipped_files = file_list[:_SKIP_INITIAL_FRAMES]
  file_list = file_list[_SKIP_INITIAL_FRAMES:]

  # delete skipped files
  for file_name in skipped_files:
    its_session_utils.remove_file(os.path.join(log_path, file_name))

  return capture_results, file_list
576