1# Copyright 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Verify frames are not dropped during preview recording."""
15
16
17import logging
18import math
19import os
20import time
21
22from mobly import test_runner
23import numpy as np
24
25import its_base_test
26import camera_properties_utils
27import its_session_utils
28import video_processing_utils
29
30
# Relative tolerance used when comparing the recording's average frame rate
# against the requested FPS (0.1 -> 10%).
_FPS_RTOL = 0.1  # Recording FPS must be within 10% of requested FPS
# Consecutive frame deltas whose windowed average is >1.5x the ideal frame
# delta (1 / requested FPS) -> FAIL
_FRAME_DELTA_MAXIMUM_FACTOR = 1.5
# Number of consecutive frame deltas averaged per window when searching for
# the local maximum.
_FRAME_DELTA_WINDOW_SIZE = 30  # 0.5 second of 60FPS video -> 30 frames
# This file's base name without extension; used as the prefix of the printed
# metric line.
_NAME = os.path.splitext(os.path.basename(__file__))[0]
# Sleep applied after the scene is loaded, before recording starts.
_SCENE_DISPLAY_WAIT_TIME = 5  # seconds
# Duration passed to the preview recording request.
_VIDEO_DURATION = 10  # seconds
38
39
40def _get_local_maximum(values, window_size=1):
41  output = min(values)
42  for i in range(len(values)):
43    if i + window_size <= len(values):
44      output = max(output, np.average(values[i:i+window_size]))
45  return output
46
47
class PreviewFrameDropTest(its_base_test.ItsBaseTest):
  """Tests if frames are dropped during preview recording.

  Takes a preview recording of a video scene, with circles moving
  at different simulated frame rates. Verifies that the overall frame rate of
  the recording matches the requested frame rate, and that there are no
  significant groups of elevated frame deltas.
  """

  def test_preview_frame_drop(self):
    """Records a preview video and checks its frame rate and frame deltas.

    The recording is requested with AE target FPS min and max both set to the
    largest minimum found among the camera's AE target FPS ranges. Failed
    checks are collected and reported together in one AssertionError.

    Raises:
      AssertionError: if no FPS ranges are found, if no frame deltas remain
        after discarding the first one, if the recording's average frame rate
        is not close to the requested FPS (math.isclose with
        rel_tol=_FPS_RTOL), or if the largest windowed average of frame
        deltas exceeds _FRAME_DELTA_MAXIMUM_FACTOR / requested FPS.
    """
    log_path = self.log_path

    with its_session_utils.ItsSession(
        device_id=self.dut.serial,
        camera_id=self.camera_id,
        hidden_physical_id=self.hidden_physical_id) as cam:
      props = cam.get_camera_properties()
      props = cam.override_with_hidden_physical_camera_props(props)
      first_api_level = its_session_utils.get_first_api_level(self.dut.serial)

      # Only run on devices whose first API level is Android 15 or newer.
      camera_properties_utils.skip_unless(
          first_api_level >= its_session_utils.ANDROID15_API_LEVEL)

      its_session_utils.load_scene(
          cam, props, self.scene, self.tablet, self.chart_distance,
          lighting_check=False, log_path=log_path)
      time.sleep(_SCENE_DISPLAY_WAIT_TIME)

      # Log ffmpeg version being used
      video_processing_utils.log_ffmpeg_version()

      # Find largest minimum AE target FPS
      fps_ranges = camera_properties_utils.get_ae_target_fps_ranges(props)
      logging.debug('FPS ranges: %s', fps_ranges)
      if not fps_ranges:
        raise AssertionError('No FPS ranges found.')
      video_fps = max(fps_ranges, key=lambda r: r[0])[0]
      logging.debug('Recording FPS: %s', video_fps)

      # Record preview at largest supported size
      # NOTE(review): assumes both size lists are ordered so that the last
      # entry is the largest -- TODO confirm against the session utilities.
      supported_preview_sizes = cam.get_supported_preview_sizes(self.camera_id)
      supported_video_sizes = cam.get_supported_video_sizes_capped(
          self.camera_id)
      max_video_size = supported_video_sizes[-1]
      logging.debug('Camera supported video sizes: %s',
                    supported_video_sizes)

      # Change preview size depending on video size support
      # NOTE(review): this relies on the native ordering of the size entries;
      # if they are 'WxH' strings the comparison is lexicographic rather than
      # by resolution -- confirm.
      preview_size = supported_preview_sizes[-1]
      if preview_size <= max_video_size:
        logging.debug('preview_size is supported by video encoder')
      else:
        preview_size = max_video_size

      recording_obj = cam.do_preview_recording(
          preview_size, _VIDEO_DURATION, False,
          ae_target_fps_min=video_fps, ae_target_fps_max=video_fps)

      logging.debug('Recorded output path: %s',
                    recording_obj['recordedOutputPath'])
      logging.debug('Tested quality: %s', recording_obj['quality'])

      # Grab the video from the saved location on DUT
      self.dut.adb.pull([recording_obj['recordedOutputPath'], log_path])
      file_name = recording_obj['recordedOutputPath'].split('/')[-1]
      logging.debug('Recorded file name: %s', file_name)

      # Calculate average frame rate of recording
      failure_messages = []
      file_name_with_path = os.path.join(log_path, file_name)
      reported_frame_rate = video_processing_utils.get_average_frame_rate(
          file_name_with_path)
      if not math.isclose(video_fps, reported_frame_rate, rel_tol=_FPS_RTOL):
        failure_messages.append(
            f'Requested FPS {video_fps} does not match '
            f'recording FPS {reported_frame_rate}, RTOL: {_FPS_RTOL}'
        )
      else:
        logging.debug('Reported preview frame rate: %s', reported_frame_rate)

      # Calculate frame deltas, discarding first value
      frame_deltas = np.array(video_processing_utils.get_frame_deltas(
          file_name_with_path))[1:]
      if frame_deltas.size == 0:
        # The numpy reductions below raise ValueError on an empty array, which
        # would discard the failure messages collected so far; fail explicitly
        # with everything known instead.
        failure_messages.append(
            'No frame deltas available after discarding the first value '
            f'for {file_name_with_path}.'
        )
        raise AssertionError('\n'.join(failure_messages))
      frame_delta_max = np.max(frame_deltas)
      frame_delta_min = np.min(frame_deltas)
      frame_delta_avg = np.average(frame_deltas)
      frame_delta_var = np.var(frame_deltas)
      logging.debug('Frame delta max: %.4f, min: %.4f, avg: %.4f, var: %.4f',
                    frame_delta_max, frame_delta_min,
                    frame_delta_avg, frame_delta_var)
      frame_delta_local_max = _get_local_maximum(
          frame_deltas, window_size=_FRAME_DELTA_WINDOW_SIZE)
      logging.debug('Frame delta local maximum: %.4f', frame_delta_local_max)
      # Below print statements are for metrics logging purpose.
      # Do not replace with logging.debug().
      print(f'{_NAME}_max_delta: {frame_delta_local_max:.4f}')
      # Ideal frame delta is 1 / video_fps; allow up to the configured factor.
      maximum_tolerable_frame_delta = _FRAME_DELTA_MAXIMUM_FACTOR / video_fps
      if frame_delta_local_max > maximum_tolerable_frame_delta:
        failure_messages.append(
            f'Local maximum of frame deltas {frame_delta_local_max} was '
            'greater than maximum tolerable '
            f'frame delta {maximum_tolerable_frame_delta}. '
            f'Window for local maximum: {_FRAME_DELTA_WINDOW_SIZE}. '
        )

      # Report every failed check at once.
      if failure_messages:
        raise AssertionError('\n'.join(failure_messages))

  def teardown_test(self):
    """Calls its_session_utils.stop_video_playback on the test tablet."""
    its_session_utils.stop_video_playback(self.tablet)
159
160
161if __name__ == '__main__':
162  test_runner.main()
163