1# Copyright 2013 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Utility functions to form an ItsSession and perform various camera actions. 15""" 16 17 18import collections 19import fnmatch 20import glob 21import json 22import logging 23import math 24import os 25import socket 26import subprocess 27import sys 28import time 29import types 30import unicodedata 31 32from mobly.controllers.android_device_lib import adb 33import numpy 34 35import camera_properties_utils 36import capture_request_utils 37import error_util 38import image_processing_utils 39import its_device_utils 40import opencv_processing_utils 41import ui_interaction_utils 42 43ANDROID13_API_LEVEL = 33 44ANDROID14_API_LEVEL = 34 45ANDROID15_API_LEVEL = 35 46CHART_DISTANCE_NO_SCALING = 0 47IMAGE_FORMAT_JPEG = 256 48IMAGE_FORMAT_YUV_420_888 = 35 49JCA_CAPTURE_PATH_TAG = 'JCA_CAPTURE_PATH' 50JCA_CAPTURE_STATUS_TAG = 'JCA_CAPTURE_STATUS' 51LOAD_SCENE_DELAY_SEC = 3 52PREVIEW_MAX_TESTED_AREA = 1920 * 1440 53PREVIEW_MIN_TESTED_AREA = 320 * 240 54PRIVATE_FORMAT = 'priv' 55JPEG_R_FMT_STR = 'jpeg_r' 56SCALING_TO_FILE_ATOL = 0.01 57SINGLE_CAPTURE_NCAP = 1 58SUB_CAMERA_SEPARATOR = '.' 
59# pylint: disable=line-too-long 60# Allowed tablets as listed on https://source.android.com/docs/compatibility/cts/camera-its-box#tablet-requirements 61# List entries must be entered in lowercase 62TABLET_ALLOWLIST = ( 63 'dragon', # Google Pixel C 64 'hnhey-q', # Honor Pad 8 65 'hwcmr09', # Huawei MediaPad M5 66 'x306f', # Lenovo Tab M10 HD (Gen 2) 67 'x606f', # Lenovo Tab M10 Plus 68 'j606f', # Lenovo Tab P11 69 'tb350fu', # Lenovo Tab P11 (Gen 2) 70 'agta', # Nokia T21 71 'gta4lwifi', # Samsung Galaxy Tab A7 72 'gta8wifi', # Samsung Galaxy Tab A8 73 'gta8', # Samsung Galaxy Tab A8 LTE 74 'gta9pwifi', # Samsung Galaxy Tab A9+ 75 'dpd2221', # Vivo Pad2 76 'nabu', # Xiaomi Pad 5 77 'xun', # Xiaomi Redmi Pad SE 78 'yunluo', # Xiaomi Redmi Pad 79) 80TABLET_DEFAULT_BRIGHTNESS = 192 # 8-bit tablet 75% brightness 81TABLET_LEGACY_BRIGHTNESS = 96 82TABLET_LEGACY_NAME = 'dragon' 83# List entries must be entered in lowercase 84TABLET_OS_VERSION = types.MappingProxyType({ 85 'nabu': ANDROID13_API_LEVEL, 86 'yunluo': ANDROID14_API_LEVEL 87 }) 88TABLET_REQUIREMENTS_URL = 'https://source.android.com/docs/compatibility/cts/camera-its-box#tablet-allowlist' 89TABLET_BRIGHTNESS_ERROR_MSG = ('Tablet brightness not set as per ' 90 f'{TABLET_REQUIREMENTS_URL} in the config file') 91TABLET_NOT_ALLOWED_ERROR_MSG = ('Tablet model or tablet Android version is ' 92 'not on our allowlist, please refer to ' 93 f'{TABLET_REQUIREMENTS_URL}') 94USE_CASE_CROPPED_RAW = 6 95VIDEO_SCENES = ('scene_video',) 96NOT_YET_MANDATED_MESSAGE = 'Not yet mandated test' 97RESULT_OK_STATUS = '-1' 98 99_VALIDATE_LIGHTING_PATCH_H = 0.05 100_VALIDATE_LIGHTING_PATCH_W = 0.05 101_VALIDATE_LIGHTING_REGIONS = { 102 'top-left': (0, 0), 103 'top-right': (0, 1-_VALIDATE_LIGHTING_PATCH_H), 104 'bottom-left': (1-_VALIDATE_LIGHTING_PATCH_W, 0), 105 'bottom-right': (1-_VALIDATE_LIGHTING_PATCH_W, 106 1-_VALIDATE_LIGHTING_PATCH_H), 107} 108_MODULAR_MACRO_OFFSET = 0.35 # Determined empirically from modular rig testing 
109_VALIDATE_LIGHTING_REGIONS_MODULAR_UW = { 110 'top-left': (_MODULAR_MACRO_OFFSET, _MODULAR_MACRO_OFFSET), 111 'bottom-left': (_MODULAR_MACRO_OFFSET, 112 1-_MODULAR_MACRO_OFFSET-_VALIDATE_LIGHTING_PATCH_H), 113 'top-right': (1-_MODULAR_MACRO_OFFSET-_VALIDATE_LIGHTING_PATCH_W, 114 _MODULAR_MACRO_OFFSET), 115 'bottom-right': (1-_MODULAR_MACRO_OFFSET-_VALIDATE_LIGHTING_PATCH_W, 116 1-_MODULAR_MACRO_OFFSET-_VALIDATE_LIGHTING_PATCH_H), 117} 118_VALIDATE_LIGHTING_MACRO_FOV_THRESH = 110 119_VALIDATE_LIGHTING_THRESH = 0.05 # Determined empirically from scene[1:6] tests 120_VALIDATE_LIGHTING_THRESH_DARK = 0.3 # Determined empirically for night test 121_CMD_NAME_STR = 'cmdName' 122_OBJ_VALUE_STR = 'objValue' 123_STR_VALUE_STR = 'strValue' 124_TAG_STR = 'tag' 125_CAMERA_ID_STR = 'cameraId' 126_EXTRA_TIMEOUT_FACTOR = 10 127_COPY_SCENE_DELAY_SEC = 1 128_DST_SCENE_DIR = '/sdcard/Download/' 129_BIT_HLG10 = 0x01 # bit 1 for feature mask 130_BIT_STABILIZATION = 0x02 # bit 2 for feature mask 131 132 133def validate_tablet(tablet_name, brightness, device_id): 134 """Ensures tablet brightness is set according to documentation. 135 136 https://source.android.com/docs/compatibility/cts/camera-its-box#tablet-allowlist 137 Args: 138 tablet_name: tablet product name specified by `ro.product.device`. 139 brightness: brightness specified by config file. 140 device_id: str; ID of the device. 
def check_apk_installed(device_id, package_name):
  """Verifies that an APK is installed on a given device.

  Lists the packages on the device with `pm list packages` and checks that
  package_name appears somewhere in the listing.

  Args:
    device_id: str; ID of the device.
    package_name: str; name of the package that should be installed.

  Raises:
    AssertionError: if package_name does not appear in the package listing.
    subprocess.CalledProcessError: if the adb command exits with an error.
  """
  # Arguments are passed as a list (no shell), so device_id and package_name
  # are never interpreted by a shell. The filtering is done in Python instead
  # of piping to `grep`: grep exits non-zero when nothing matches, which made
  # check_output raise CalledProcessError before the AssertionError below
  # could ever be reached.
  list_packages_cmd = [
      'adb', '-s', str(device_id), 'shell', 'pm', 'list', 'packages'
  ]
  bytes_output = subprocess.check_output(
      list_packages_cmd, stderr=subprocess.STDOUT
  )
  output = bytes_output.decode('utf-8').strip()
  if package_name not in output:
    raise AssertionError(
        f'{package_name} not installed on device {device_id}!'
    )
194 """ 195 196 # Open a connection to localhost:<host_port>, forwarded to port 6000 on the 197 # device. <host_port> is determined at run-time to support multiple 198 # connected devices. 199 IPADDR = '127.0.0.1' 200 REMOTE_PORT = 6000 201 BUFFER_SIZE = 4096 202 203 # LOCK_PORT is used as a mutex lock to protect the list of forwarded ports 204 # among all processes. The script assumes LOCK_PORT is available and will 205 # try to use ports between CLIENT_PORT_START and 206 # CLIENT_PORT_START+MAX_NUM_PORTS-1 on host for ITS sessions. 207 CLIENT_PORT_START = 6000 208 MAX_NUM_PORTS = 100 209 LOCK_PORT = CLIENT_PORT_START + MAX_NUM_PORTS 210 211 # Seconds timeout on each socket operation. 212 SOCK_TIMEOUT = 20.0 213 # Seconds timeout on performance measurement socket operation 214 SOCK_TIMEOUT_FOR_PERF_MEASURE = 40.0 215 # Seconds timeout on preview recording socket operation. 216 SOCK_TIMEOUT_PREVIEW = 30.0 # test_imu_drift is 30s 217 218 # Additional timeout in seconds when ITS service is doing more complicated 219 # operations, for example: issuing warmup requests before actual capture. 220 EXTRA_SOCK_TIMEOUT = 5.0 221 222 PACKAGE = 'com.android.cts.verifier.camera.its' 223 INTENT_START = 'com.android.cts.verifier.camera.its.START' 224 225 # This string must be in sync with ItsService. Updated when interface 226 # between script and ItsService is changed. 227 ITS_SERVICE_VERSION = '1.0' 228 229 SEC_TO_NSEC = 1000*1000*1000.0 230 adb = 'adb -d' 231 232 # Predefine camera props. Save props extracted from the function, 233 # "get_camera_properties". 
def __init_socket_port(self):
  """Initialize the socket port for the host to forward requests to the device.

  This method assumes localhost's LOCK_PORT is available and will try to
  use ports between CLIENT_PORT_START and CLIENT_PORT_START+MAX_NUM_PORTS-1.
  A socket bound to LOCK_PORT acts as a mutex while the list of forwarded
  ports is inspected and (if needed) a new forward is created. On success
  self.sock is connected to the chosen host port with SOCK_TIMEOUT set.

  Raises:
    error_util.CameraItsError: if the lock port cannot be bound after all
      retries, or if no host port is available.
  """
  num_retries = 100
  retry_wait_time_sec = 0.05

  # Bind a socket to use as mutex lock.
  socket_lock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    for i in range(num_retries):
      try:
        socket_lock.bind((ItsSession.IPADDR, ItsSession.LOCK_PORT))
        break
      except (socket.error, socket.timeout) as socket_issue:
        if i == num_retries - 1:
          raise error_util.CameraItsError(
              self._device_id, 'socket lock returns error') from socket_issue
        time.sleep(retry_wait_time_sec)

    # Check if a port is already assigned to the device.
    output = subprocess.run(
        ['adb', 'forward', '--list'], stdout=subprocess.PIPE, check=False
    ).stdout
    port = None
    used_ports = []
    for line in output.decode('utf-8').splitlines():
      # each line should be formatted as:
      # "<device_id> tcp:<host_port> tcp:<remote_port>"
      forward_info = line.split()
      is_tcp_forward = (
          len(forward_info) >= 3
          and len(forward_info[1]) > 4
          and forward_info[1].startswith('tcp:')
          and len(forward_info[2]) > 4
          and forward_info[2].startswith('tcp:'))
      if not is_tcp_forward:
        continue
      local_p = int(forward_info[1][4:])
      remote_p = int(forward_info[2][4:])
      if (forward_info[0] == self._device_id
          and remote_p == ItsSession.REMOTE_PORT):
        port = local_p
        break
      used_ports.append(local_p)

    # Find the first available port if no port is assigned to the device.
    if port is None:
      for p in range(ItsSession.CLIENT_PORT_START,
                     ItsSession.CLIENT_PORT_START + ItsSession.MAX_NUM_PORTS):
        if self.check_port_availability(p, used_ports):
          port = p
          break

    if port is None:
      raise error_util.CameraItsError(self._device_id,
                                      ' cannot find an available port')
  finally:
    # Release the socket as mutex unlock. Doing this in `finally` guarantees
    # the lock port is freed even when one of the errors above is raised.
    socket_lock.close()

  # Connect to the socket; close it again if the connection cannot be made so
  # the file descriptor is not leaked.
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  try:
    sock.connect((self.IPADDR, port))
  except OSError:
    sock.close()
    raise
  sock.settimeout(self.SOCK_TIMEOUT)
  self.sock = sock
def __wait_for_service(self):
  """Wait for ItsService to be ready and reboot the device if needed.

  This also includes the optional reboot handling: if the user
  provides a "reboot" or "reboot=N" arg, then reboot the device,
  waiting for N seconds (default 30) before returning.

  After the optional reboot, logcat is cleared, the ITS package is
  force-stopped and restarted, and logcat is followed until a line containing
  'ItsService ready' is seen.

  Raises:
    error_util.CameraItsError: if logcat ends before 'ItsService ready' is
      seen.
  """

  for s in sys.argv[1:]:
    if s[:6] == 'reboot':
      duration = 30
      if len(s) > 7 and s[6] == '=':
        duration = int(s[7:])
      logging.debug('Rebooting device')
      its_device_utils.run(f'{self.adb} reboot')
      its_device_utils.run(f'{self.adb} wait-for-device')
      time.sleep(duration)
      logging.debug('Reboot complete')

  # Flush logcat so following code won't be misled by previous
  # 'ItsService ready' log.
  its_device_utils.run(f'{self.adb} logcat -c')
  time.sleep(1)

  its_device_utils.run(
      f'{self.adb} shell am force-stop --user 0 {self.PACKAGE}')
  its_device_utils.run(
      f'{self.adb} shell am start-foreground-service --user 0 '
      f'-t text/plain -a {self.INTENT_START}'
  )

  # Wait until the socket is ready to accept a connection.
  proc = subprocess.Popen(
      self.adb.split() + ['logcat'], stdout=subprocess.PIPE)
  try:
    # readline() returns b'' once logcat's stdout hits EOF (e.g. adb died or
    # the device disconnected); iter() with that sentinel ends the loop there
    # instead of spinning forever on empty reads.
    for line in iter(proc.stdout.readline, b''):
      if b'ItsService ready' in line:
        break
    else:
      raise error_util.CameraItsError(
          self._device_id, 'logcat exited before ItsService was ready')
  finally:
    # Always stop and reap the logcat process, including on errors.
    proc.kill()
    proc.communicate()
def get_camera_properties(self):
  """Query the device for the camera properties and cache them.

  Sends a 'getCameraProperties' command over the socket, stores the returned
  properties in self.props and returns them.

  Returns:
    The Python dictionary object for the CameraProperties object.

  Raises:
    error_util.CameraItsError: if the response tag is not 'cameraProperties'.
  """
  request = {_CMD_NAME_STR: 'getCameraProperties'}
  self.sock.send((json.dumps(request) + '\n').encode())
  response, _ = self.__read_response_from_socket()
  if response[_TAG_STR] != 'cameraProperties':
    raise error_util.CameraItsError('Invalid command response')
  camera_props = response[_OBJ_VALUE_STR]['cameraProperties']
  self.props = camera_props
  return camera_props
def __read_response_from_socket(self):
  """Reads a line (newline-terminated) string serialization of JSON object.

  If the JSON object has a 'bufValueSize' key, that many raw bytes are read
  from the socket right after the line and returned as a uint8 numpy array.

  Returns:
    Tuple (jobj, buf): the deserialized json obj, and the binary buffer as a
    numpy.uint8 array, or None when the response carries no buffer.

  Raises:
    error_util.CameraItsError: if the socket is closed before a complete
      response has been received.
  """
  # Read byte-by-byte so nothing past the newline (i.e. the optional binary
  # payload) is consumed. The bytes are decoded only once the whole line is
  # available: decoding each byte separately fails on multi-byte UTF-8
  # characters.
  line_bytes = bytearray()
  while not line_bytes.endswith(b'\n'):
    ch = self.sock.recv(1)
    if not ch:
      # Socket was probably closed; otherwise don't get empty strings
      raise error_util.CameraItsError('Problem with socket on device side')
    line_bytes += ch
  jobj = json.loads(line_bytes.decode('utf-8'))
  # Optionally read a binary buffer of a fixed size.
  buf = None
  if 'bufValueSize' in jobj:
    n = jobj['bufValueSize']
    buf = bytearray(n)
    view = memoryview(buf)
    while n > 0:
      nbytes = self.sock.recv_into(view, n)
      if nbytes == 0:
        # recv_into returns 0 when the peer closed the connection; without
        # this check the loop would never terminate.
        raise error_util.CameraItsError('Problem with socket on device side')
      view = view[nbytes:]
      n -= nbytes
    buf = numpy.frombuffer(buf, dtype=numpy.uint8)
  return jobj, buf
def zoom_ratio_within_range(self, zoom_ratio):
  """Check a zoom ratio against the cached 'android.control.zoomRatioRange'.

  Args:
    zoom_ratio: float; zoom ratio requested
  Returns:
    Boolean: True if zoom_ratio lies between the first and second entries of
    the range (both ends inclusive). False otherwise.
  """
  zoom_range = self.props['android.control.zoomRatioRange']
  lower_bound = zoom_range[0]
  upper_bound = zoom_range[1]
  return lower_bound <= zoom_ratio <= upper_bound
def start_sensor_events(self):
  """Ask the device to begin collecting sensor events.

  Sends a 'startSensorEvents' command over the socket and checks that the
  response is tagged 'sensorEventsStarted'. See get_sensor_events for more
  info.

  Returns:
    Nothing.

  Raises:
    error_util.CameraItsError: if the response tag is not
      'sensorEventsStarted'.
  """
  request = {_CMD_NAME_STR: 'startSensorEvents'}
  self.sock.send((json.dumps(request) + '\n').encode())
  response, _ = self.__read_response_from_socket()
  if response[_TAG_STR] == 'sensorEventsStarted':
    return
  raise error_util.CameraItsError('Invalid response for command: %s' %
                                  request[_CMD_NAME_STR])
def get_camera_name(self):
  """Build the camera name from the session's camera ids.

  Returns:
    '<camera id>.<hidden physical id>' when a hidden physical id is set,
    otherwise the camera id unchanged.
  """
  if not self._hidden_physical_id:
    return self._camera_id
  return f'{self._camera_id}.{self._hidden_physical_id}'
def is_hlg10_recording_supported_for_profile(self, profile_id):
  """Ask the device whether HLG10 video recording works for a profile.

  Sends an 'isHLG10SupportedForProfile' command for this session's camera id
  and the given profile id.

  Args:
    profile_id: int; profile id corresponding to the quality level.
  Returns:
    Boolean: True if the response string value is 'true', False in
    all other cases.

  Raises:
    error_util.CameraItsError: if the response tag is not 'hlg10Response'.
  """
  request = {
      _CMD_NAME_STR: 'isHLG10SupportedForProfile',
      _CAMERA_ID_STR: self._camera_id,
      'profileId': profile_id,
  }
  self.sock.send((json.dumps(request) + '\n').encode())

  response, _ = self.__read_response_from_socket()
  if response[_TAG_STR] != 'hlg10Response':
    raise error_util.CameraItsError('Failed to query HLG10 support')
  is_supported = response[_STR_VALUE_STR] == 'true'
  return is_supported
def is_p3_capture_supported(self):
  """Ask the device whether P3 image capture is supported.

  Sends an 'isP3Supported' command for this session's camera id.

  Returns:
    Boolean: True if the response string value is 'true', False in
    all other cases.

  Raises:
    error_util.CameraItsError: if the response tag is not 'p3Response'.
  """
  request = {
      _CMD_NAME_STR: 'isP3Supported',
      _CAMERA_ID_STR: self._camera_id,
  }
  self.sock.send((json.dumps(request) + '\n').encode())

  response, _ = self.__read_response_from_socket()
  if response[_TAG_STR] != 'p3Response':
    raise error_util.CameraItsError('Failed to query P3 support')
  is_supported = response[_STR_VALUE_STR] == 'true'
  return is_supported
def do_basic_recording(self, profile_id, quality, duration,
                       video_stabilization_mode=0, hlg10_enabled=False,
                       zoom_ratio=None, ae_target_fps_min=None,
                       ae_target_fps_max=None):
  """Issue a recording request and read back the video recording object.

  The recording will be done with the format specified in quality. These
  quality levels correspond to the profiles listed in CamcorderProfile.
  The duration is the time in seconds for which the video will be recorded.
  The recorded object consists of a path on the device at which the
  recorded video is saved.

  Args:
    profile_id: int; profile id corresponding to the quality level.
    quality: Video recording quality such as High, Low, VGA.
    duration: The time in seconds for which the video will be recorded.
    video_stabilization_mode: Video stabilization mode ON/OFF. Value can be
      0: 'OFF', 1: 'ON', 2: 'PREVIEW'
    hlg10_enabled: boolean: True Enable 10-bit HLG video recording, False
      record using the regular SDR profile
    zoom_ratio: float; zoom ratio. None if default zoom
    ae_target_fps_min: int; CONTROL_AE_TARGET_FPS_RANGE min. Only sent when
      both min and max are set.
    ae_target_fps_max: int; CONTROL_AE_TARGET_FPS_RANGE max. Only sent when
      both min and max are set.
  Returns:
    video_recorded_object: The recorded object returned from ItsService which
    contains path at which the recording is saved on the device, quality of
    the recorded video, video size of the recorded video, video frame rate
    and 'hlg10' if 'hlg10_enabled' is set to True.
    Ex:
    VideoRecordingObject: {
      'tag': 'recordingResponse',
      'objValue': {
        'recordedOutputPath':
          '/storage/emulated/0/Android/data/com.android.cts.verifier'
          '/files/VideoITS/VID_20220324_080414_0_CIF_352x288.mp4',
        'quality': 'CIF',
        'videoFrameRate': 30,
        'videoSize': '352x288'
      }
    }

  Raises:
    AssertionError: if zoom_ratio is set but outside the device zoom range.
    error_util.CameraItsError: if the response tag is not
      'recordingResponse'.
  """
  cmd = {_CMD_NAME_STR: 'doBasicRecording', _CAMERA_ID_STR: self._camera_id,
         'profileId': profile_id, 'quality': quality,
         'recordingDuration': duration,
         'videoStabilizationMode': video_stabilization_mode,
         'hlg10Enabled': hlg10_enabled}
  if zoom_ratio:
    if self.zoom_ratio_within_range(zoom_ratio):
      cmd['zoomRatio'] = zoom_ratio
    else:
      raise AssertionError(f'Zoom ratio {zoom_ratio} out of range')
  if ae_target_fps_min and ae_target_fps_max:
    cmd['aeTargetFpsMin'] = ae_target_fps_min
    cmd['aeTargetFpsMax'] = ae_target_fps_max
  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
  # The response only arrives once the recording has finished, so the read
  # timeout has to cover the recording duration on top of the usual socket
  # allowance; a fixed timeout makes long recordings time out.
  timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT + duration
  self.sock.settimeout(timeout)
  data, _ = self.__read_response_from_socket()
  if data[_TAG_STR] != 'recordingResponse':
    raise error_util.CameraItsError(
        f'Invalid response for command: {cmd[_CMD_NAME_STR]}')
  return data[_OBJ_VALUE_STR]
899 900 Args: 901 cmd: dict; Mapping from command key to corresponding value 902 Returns: 903 video_recorded_object: The recorded object returned from ItsService which 904 contains path at which the recording is saved on the device, quality of 905 the recorded video which is always set to "preview", video size of the 906 recorded video, video frame rate. 907 Ex: 908 VideoRecordingObject: { 909 'tag': 'recordingResponse', 910 'objValue': { 911 'recordedOutputPath': '/storage/emulated/0/Android/data/' 912 'com.android.cts.verifier/files/VideoITS/' 913 'VID_20220324_080414_0_CIF_352x288.mp4', 914 'quality': 'preview', 915 'videoSize': '352x288' 916 } 917 } 918 """ 919 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 920 timeout = (self.SOCK_TIMEOUT_PREVIEW + 921 self.EXTRA_SOCK_TIMEOUT * _EXTRA_TIMEOUT_FACTOR) 922 self.sock.settimeout(timeout) 923 924 data, _ = self.__read_response_from_socket() 925 logging.debug('VideoRecordingObject: %s', str(data)) 926 if data[_TAG_STR] != 'recordingResponse': 927 raise error_util.CameraItsError( 928 f'Invalid response from command{cmd[_CMD_NAME_STR]}') 929 return data[_OBJ_VALUE_STR] 930 931 def do_preview_recording_multiple_surfaces( 932 self, output_surfaces, duration, stabilize, ois=False, 933 zoom_ratio=None, ae_target_fps_min=None, ae_target_fps_max=None): 934 """Issue a preview request and read back the preview recording object. 935 936 The resolution of the preview and its recording will be determined by 937 video_size. The duration is the time in seconds for which the preview will 938 be recorded. The recorded object consists of a path on the device at 939 which the recorded video is saved. 940 941 Args: 942 output_surfaces: list; The list of output surfaces used for creating 943 preview recording session. The first surface 944 is used for recording. 945 duration: int; The time in seconds for which the video will be recorded. 
946 stabilize: boolean; Whether the preview should be stabilized or not 947 ois: boolean; Whether the preview should be optically stabilized or not 948 zoom_ratio: float; static zoom ratio. None if default zoom 949 ae_target_fps_min: int; CONTROL_AE_TARGET_FPS_RANGE min. Set if not None 950 ae_target_fps_max: int; CONTROL_AE_TARGET_FPS_RANGE max. Set if not None 951 Returns: 952 video_recorded_object: The recorded object returned from ItsService 953 """ 954 cam_id = self._camera_id 955 if 'physicalCamera' in output_surfaces[0]: 956 cam_id = output_surfaces[0]['physicalCamera'] 957 cmd = { 958 _CMD_NAME_STR: 'doStaticPreviewRecording', 959 _CAMERA_ID_STR: cam_id, 960 'outputSurfaces': output_surfaces, 961 'recordingDuration': duration, 962 'stabilize': stabilize, 963 'ois': ois, 964 } 965 if zoom_ratio: 966 if self.zoom_ratio_within_range(zoom_ratio): 967 cmd['zoomRatio'] = zoom_ratio 968 else: 969 raise AssertionError(f'Zoom ratio {zoom_ratio} out of range') 970 if ae_target_fps_min and ae_target_fps_max: 971 cmd['aeTargetFpsMin'] = ae_target_fps_min 972 cmd['aeTargetFpsMax'] = ae_target_fps_max 973 return self._execute_preview_recording(cmd) 974 975 def do_preview_recording(self, video_size, duration, stabilize, ois=False, 976 zoom_ratio=None, ae_target_fps_min=None, 977 ae_target_fps_max=None, hlg10_enabled=False): 978 """Issue a preview request and read back the preview recording object. 979 980 The resolution of the preview and its recording will be determined by 981 video_size. The duration is the time in seconds for which the preview will 982 be recorded. The recorded object consists of a path on the device at 983 which the recorded video is saved. 984 985 Args: 986 video_size: str; Preview resolution at which to record. ex. "1920x1080" 987 duration: int; The time in seconds for which the video will be recorded. 
988 stabilize: boolean; Whether the preview should be stabilized or not 989 ois: boolean; Whether the preview should be optically stabilized or not 990 zoom_ratio: float; static zoom ratio. None if default zoom 991 ae_target_fps_min: int; CONTROL_AE_TARGET_FPS_RANGE min. Set if not None 992 ae_target_fps_max: int; CONTROL_AE_TARGET_FPS_RANGE max. Set if not None 993 hlg10_enabled: boolean; True Eanable 10-bit HLG video recording, False 994 record using the regular SDK profile. 995 Returns: 996 video_recorded_object: The recorded object returned from ItsService 997 """ 998 output_surfaces = self.preview_surface(video_size, hlg10_enabled) 999 return self.do_preview_recording_multiple_surfaces( 1000 output_surfaces, duration, stabilize, ois, zoom_ratio, 1001 ae_target_fps_min, ae_target_fps_max) 1002 1003 def do_preview_recording_with_dynamic_zoom(self, video_size, stabilize, 1004 sweep_zoom, 1005 ae_target_fps_min=None, 1006 ae_target_fps_max=None, 1007 padded_frames=False): 1008 """Issue a preview request with dynamic zoom and read back output object. 1009 1010 The resolution of the preview and its recording will be determined by 1011 video_size. The duration will be determined by the duration at each zoom 1012 ratio and the total number of zoom ratios. The recorded object consists 1013 of a path on the device at which the recorded video is saved. 1014 1015 Args: 1016 video_size: str; Preview resolution at which to record. ex. "1920x1080" 1017 stabilize: boolean; Whether the preview should be stabilized or not 1018 sweep_zoom: tuple of (zoom_start, zoom_end, step_size, step_duration). 1019 Used to control zoom ratio during recording. 1020 zoom_start (float) is the starting zoom ratio during recording 1021 zoom_end (float) is the ending zoom ratio during recording 1022 step_size (float) is the step for zoom ratio during recording 1023 step_duration (float) sleep in ms between zoom ratios 1024 ae_target_fps_min: int; CONTROL_AE_TARGET_FPS_RANGE min. 
Set if not None 1025 ae_target_fps_max: int; CONTROL_AE_TARGET_FPS_RANGE max. Set if not None 1026 padded_frames: boolean; Whether to add additional frames at the beginning 1027 and end of recording to workaround issue with MediaRecorder. 1028 Returns: 1029 video_recorded_object: The recorded object returned from ItsService 1030 """ 1031 output_surface = self.preview_surface(video_size) 1032 cmd = { 1033 _CMD_NAME_STR: 'doDynamicZoomPreviewRecording', 1034 _CAMERA_ID_STR: self._camera_id, 1035 'outputSurfaces': output_surface, 1036 'stabilize': stabilize, 1037 'ois': False 1038 } 1039 zoom_start, zoom_end, step_size, step_duration = sweep_zoom 1040 if (not self.zoom_ratio_within_range(zoom_start) or 1041 not self.zoom_ratio_within_range(zoom_end)): 1042 raise AssertionError( 1043 f'Starting zoom ratio {zoom_start} or ' 1044 f'ending zoom ratio {zoom_end} out of range' 1045 ) 1046 if zoom_start > zoom_end or step_size < 0: 1047 raise NotImplementedError('Only increasing zoom ratios are supported') 1048 cmd['zoomStart'] = zoom_start 1049 cmd['zoomEnd'] = zoom_end 1050 cmd['stepSize'] = step_size 1051 cmd['stepDuration'] = step_duration 1052 cmd['hlg10Enabled'] = False 1053 cmd['paddedFrames'] = padded_frames 1054 if ae_target_fps_min and ae_target_fps_max: 1055 cmd['aeTargetFpsMin'] = ae_target_fps_min 1056 cmd['aeTargetFpsMax'] = ae_target_fps_max 1057 return self._execute_preview_recording(cmd) 1058 1059 def do_preview_recording_with_dynamic_ae_awb_region( 1060 self, video_size, ae_awb_regions, ae_awb_region_duration, stabilize=False, 1061 ae_target_fps_min=None, ae_target_fps_max=None): 1062 """Issue a preview request with dynamic 3A region and read back output object. 1063 1064 The resolution of the preview and its recording will be determined by 1065 video_size. The recorded object consists of a path on the device at which 1066 the recorded video is saved. 1067 1068 Args: 1069 video_size: str; Preview resolution at which to record. ex. 
"1920x1080" 1070 ae_awb_regions: dictionary of (aeAwbRegionOne/Two/Three/Four). 1071 Used to control 3A region during recording. 1072 aeAwbRegionOne (metering rectangle) first ae/awb region of recording. 1073 aeAwbRegionTwo (metering rectangle) second ae/awb region of recording. 1074 aeAwbRegionThree (metering rectangle) third ae/awb region of recording. 1075 aeAwbRegionFour (metering rectangle) fourth ae/awb region of recording. 1076 ae_awb_region_duration: float; sleep in ms between 3A regions. 1077 stabilize: boolean; Whether the preview should be stabilized. 1078 ae_target_fps_min: int; If not none, set CONTROL_AE_TARGET_FPS_RANGE min. 1079 ae_target_fps_max: int; If not none, set CONTROL_AE_TARGET_FPS_RANGE max. 1080 Returns: 1081 video_recorded_object: The recorded object returned from ItsService. 1082 """ 1083 output_surface = self.preview_surface(video_size) 1084 cmd = { 1085 _CMD_NAME_STR: 'doDynamicMeteringRegionPreviewRecording', 1086 _CAMERA_ID_STR: self._camera_id, 1087 'outputSurfaces': output_surface, 1088 'stabilize': stabilize, 1089 'ois': False, 1090 'aeAwbRegionDuration': ae_awb_region_duration 1091 } 1092 1093 cmd['aeAwbRegionOne'] = ae_awb_regions['aeAwbRegionOne'] 1094 cmd['aeAwbRegionTwo'] = ae_awb_regions['aeAwbRegionTwo'] 1095 cmd['aeAwbRegionThree'] = ae_awb_regions['aeAwbRegionThree'] 1096 cmd['aeAwbRegionFour'] = ae_awb_regions['aeAwbRegionFour'] 1097 cmd['hlg10Enabled'] = False 1098 if ae_target_fps_min and ae_target_fps_max: 1099 cmd['aeTargetFpsMin'] = ae_target_fps_min 1100 cmd['aeTargetFpsMax'] = ae_target_fps_max 1101 return self._execute_preview_recording(cmd) 1102 1103 def get_supported_video_qualities(self, camera_id): 1104 """Get all supported video qualities for this camera device. 1105 1106 ie. ['480:4', '1080:6', '2160:8', '720:5', 'CIF:3', 'HIGH:1', 'LOW:0', 1107 'QCIF:2', 'QVGA:7'] 1108 1109 Args: 1110 camera_id: device id 1111 Returns: 1112 List of all supported video qualities and corresponding profileIds. 
1113 """ 1114 cmd = {} 1115 cmd[_CMD_NAME_STR] = 'getSupportedVideoQualities' 1116 cmd[_CAMERA_ID_STR] = camera_id 1117 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1118 data, _ = self.__read_response_from_socket() 1119 if data[_TAG_STR] != 'supportedVideoQualities': 1120 raise error_util.CameraItsError('Invalid command response') 1121 return data[_STR_VALUE_STR].split(';')[:-1] # remove the last appended ';' 1122 1123 def get_all_supported_preview_sizes(self, camera_id): 1124 """Get all supported preview resolutions for this camera device. 1125 1126 ie. ['640x480', '800x600', '1280x720', '1440x1080', '1920x1080'] 1127 1128 Note: resolutions are sorted by width x height in ascending order 1129 1130 Args: 1131 camera_id: int; device id 1132 1133 Returns: 1134 List of all supported preview resolutions in ascending order. 1135 """ 1136 cmd = { 1137 _CMD_NAME_STR: 'getSupportedPreviewSizes', 1138 _CAMERA_ID_STR: camera_id 1139 } 1140 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1141 timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT 1142 self.sock.settimeout(timeout) 1143 data, _ = self.__read_response_from_socket() 1144 if data[_TAG_STR] != 'supportedPreviewSizes': 1145 raise error_util.CameraItsError('Invalid command response') 1146 if not data[_STR_VALUE_STR]: 1147 raise error_util.CameraItsError('No supported preview sizes') 1148 supported_preview_sizes = data[_STR_VALUE_STR].split(';') 1149 logging.debug('Supported preview sizes: %s', supported_preview_sizes) 1150 return supported_preview_sizes 1151 1152 def get_supported_preview_sizes(self, camera_id): 1153 """Get supported preview resolutions for this camera device. 1154 1155 ie. ['640x480', '800x600', '1280x720', '1440x1080', '1920x1080'] 1156 1157 Note: resolutions are sorted by width x height in ascending order 1158 Note: max resolution is capped at 1440x1920. 1159 Note: min resolution is capped at 320x240. 
1160 1161 Args: 1162 camera_id: int; device id 1163 1164 Returns: 1165 List of all supported preview resolutions with floor & ceiling set 1166 by _CONSTANTS in ascending order. 1167 """ 1168 supported_preview_sizes = self.get_all_supported_preview_sizes(camera_id) 1169 resolution_to_area = lambda s: int(s.split('x')[0])*int(s.split('x')[1]) 1170 supported_preview_sizes = [size for size in supported_preview_sizes 1171 if (resolution_to_area(size) 1172 <= PREVIEW_MAX_TESTED_AREA 1173 and resolution_to_area(size) 1174 >= PREVIEW_MIN_TESTED_AREA)] 1175 logging.debug( 1176 'Supported preview sizes (MIN: %d, MAX: %d area in pixels): %s', 1177 PREVIEW_MIN_TESTED_AREA, PREVIEW_MAX_TESTED_AREA, 1178 supported_preview_sizes 1179 ) 1180 return supported_preview_sizes 1181 1182 def get_supported_extension_preview_sizes(self, camera_id, extension): 1183 """Get all supported preview resolutions for the extension mode. 1184 1185 ie. ['640x480', '800x600', '1280x720', '1440x1080', '1920x1080'] 1186 1187 Note: resolutions are sorted by width x height in ascending order 1188 1189 Args: 1190 camera_id: int; device id 1191 extension: int; camera extension mode 1192 1193 Returns: 1194 List of all supported camera extension preview resolutions in 1195 ascending order. 
1196 """ 1197 cmd = { 1198 _CMD_NAME_STR: 'getSupportedExtensionPreviewSizes', 1199 _CAMERA_ID_STR: camera_id, 1200 "extension": extension 1201 } 1202 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1203 timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT 1204 self.sock.settimeout(timeout) 1205 data, _ = self.__read_response_from_socket() 1206 if data[_TAG_STR] != 'supportedExtensionPreviewSizes': 1207 raise error_util.CameraItsError('Invalid command response') 1208 if not data[_STR_VALUE_STR]: 1209 raise error_util.CameraItsError('No supported extension preview sizes') 1210 supported_preview_sizes = data[_STR_VALUE_STR].split(';') 1211 logging.debug('Supported extension preview sizes: %s', supported_preview_sizes) 1212 return supported_preview_sizes 1213 1214 def get_queryable_stream_combinations(self): 1215 """Get all queryable stream combinations for this camera device. 1216 1217 This function parses the queryable stream combinations string 1218 returned from ItsService. The return value includes both the 1219 string and the parsed result. 
1220 1221 One example of the queryable stream combination string is: 1222 1223 'priv:1920x1080+jpeg:4032x2268;priv:1280x720+priv:1280x720' 1224 1225 which can be parsed to: 1226 1227 [ 1228 { 1229 "name": "priv:1920x1080+jpeg:4032x2268", 1230 "combination": [ 1231 { 1232 "format": "priv", 1233 "size": "1920x1080" 1234 } 1235 { 1236 "format": "jpeg", 1237 "size": "4032x2268" 1238 } 1239 ] 1240 } 1241 { 1242 "name": "priv:1280x720+priv:1280x720", 1243 "combination": [ 1244 { 1245 "format": "priv", 1246 "size": "1280x720" 1247 }, 1248 { 1249 "format": "priv", 1250 "size": "1280x720" 1251 } 1252 ] 1253 } 1254 ] 1255 1256 Returns: 1257 Tuple of: 1258 - queryable stream combination string, and 1259 - parsed stream combinations 1260 """ 1261 cmd = { 1262 _CMD_NAME_STR: 'getQueryableStreamCombinations', 1263 } 1264 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1265 timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT 1266 self.sock.settimeout(timeout) 1267 data, _ = self.__read_response_from_socket() 1268 if data[_TAG_STR] != 'queryableStreamCombinations': 1269 raise error_util.CameraItsError('Invalid command response') 1270 if not data[_STR_VALUE_STR]: 1271 raise error_util.CameraItsError('No queryable stream combinations') 1272 1273 # Parse the stream combination string 1274 combinations = [{ 1275 'name': c, 'combination': [ 1276 {'format': s.split(':')[0], 1277 'size': s.split(':')[1]} for s in c.split('+')]} 1278 for c in data[_STR_VALUE_STR].split(';')] 1279 1280 return data[_STR_VALUE_STR], combinations 1281 1282 def get_supported_extensions(self, camera_id): 1283 """Get all supported camera extensions for this camera device. 1284 1285 ie. [EXTENSION_AUTOMATIC, EXTENSION_BOKEH, 1286 EXTENSION_FACE_RETOUCH, EXTENSION_HDR, EXTENSION_NIGHT] 1287 where EXTENSION_AUTOMATIC is 0, EXTENSION_BOKEH is 1, etc. 1288 1289 Args: 1290 camera_id: int; device ID 1291 Returns: 1292 List of all supported extensions (as int) in ascending order. 
1293 """ 1294 cmd = { 1295 'cmdName': 'getSupportedExtensions', 1296 'cameraId': camera_id 1297 } 1298 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1299 timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT 1300 self.sock.settimeout(timeout) 1301 data, _ = self.__read_response_from_socket() 1302 if data['tag'] != 'supportedExtensions': 1303 raise error_util.CameraItsError('Invalid command response') 1304 if not data['strValue']: 1305 raise error_util.CameraItsError('No supported extensions') 1306 return [int(x) for x in str(data['strValue'][1:-1]).split(', ') if x] 1307 1308 def get_supported_extension_sizes(self, camera_id, extension, image_format): 1309 """Get all supported camera sizes for this camera, extension, and format. 1310 1311 Sorts in ascending order according to area, i.e. 1312 ['640x480', '800x600', '1280x720', '1440x1080', '1920x1080'] 1313 1314 Args: 1315 camera_id: int; device ID 1316 extension: int; the integer value of the extension. 1317 image_format: int; the integer value of the format. 1318 Returns: 1319 List of sizes supported for this camera, extension, and format. 1320 """ 1321 cmd = { 1322 'cmdName': 'getSupportedExtensionSizes', 1323 'cameraId': camera_id, 1324 'extension': extension, 1325 'format': image_format 1326 } 1327 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1328 timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT 1329 self.sock.settimeout(timeout) 1330 data, _ = self.__read_response_from_socket() 1331 if data[_TAG_STR] != 'supportedExtensionSizes': 1332 raise error_util.CameraItsError('Invalid command response') 1333 if not data[_STR_VALUE_STR]: 1334 logging.debug('No supported extension sizes') 1335 return '' 1336 return data[_STR_VALUE_STR].split(';') 1337 1338 def get_display_size(self): 1339 """Get the display size of the screen. 1340 1341 Returns: 1342 The size of the display resolution in pixels. 
1343 """ 1344 cmd = { 1345 'cmdName': 'getDisplaySize' 1346 } 1347 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1348 timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT 1349 self.sock.settimeout(timeout) 1350 data, _ = self.__read_response_from_socket() 1351 if data['tag'] != 'displaySize': 1352 raise error_util.CameraItsError('Invalid command response') 1353 if not data['strValue']: 1354 raise error_util.CameraItsError('No display size') 1355 return data['strValue'].split('x') 1356 1357 def get_max_camcorder_profile_size(self, camera_id): 1358 """Get the maximum camcorder profile size for this camera device. 1359 1360 Args: 1361 camera_id: int; device id 1362 Returns: 1363 The maximum size among all camcorder profiles supported by this camera. 1364 """ 1365 cmd = { 1366 'cmdName': 'getMaxCamcorderProfileSize', 1367 'cameraId': camera_id 1368 } 1369 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1370 timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT 1371 self.sock.settimeout(timeout) 1372 data, _ = self.__read_response_from_socket() 1373 if data['tag'] != 'maxCamcorderProfileSize': 1374 raise error_util.CameraItsError('Invalid command response') 1375 if not data['strValue']: 1376 raise error_util.CameraItsError('No max camcorder profile size') 1377 return data['strValue'].split('x') 1378 1379 def do_simple_capture(self, cmd, out_surface): 1380 """Issue single capture request via command and read back image/metadata. 1381 1382 Args: 1383 cmd: Dictionary specifying command name, requests, and output surface. 1384 out_surface: Dictionary describing output surface. 1385 Returns: 1386 An object which contains following fields: 1387 * data: the image data as a numpy array of bytes. 1388 * width: the width of the captured image. 1389 * height: the height of the captured image. 
1390 * format: image format 1391 * metadata: the capture result object 1392 """ 1393 fmt = out_surface['format'] if 'format' in out_surface else 'yuv' 1394 if fmt == 'jpg': fmt = 'jpeg' 1395 1396 # we only have 1 capture request and 1 surface by definition. 1397 ncap = SINGLE_CAPTURE_NCAP 1398 1399 cam_id = None 1400 bufs = {} 1401 yuv_bufs = {} 1402 if self._hidden_physical_id: 1403 out_surface['physicalCamera'] = self._hidden_physical_id 1404 1405 if 'physicalCamera' in out_surface: 1406 cam_id = out_surface['physicalCamera'] 1407 else: 1408 cam_id = self._camera_id 1409 1410 bufs[cam_id] = { 1411 'raw': [], 1412 'raw10': [], 1413 'raw12': [], 1414 'rawStats': [], 1415 'dng': [], 1416 'jpeg': [], 1417 'y8': [], 1418 'rawQuadBayer': [], 1419 'rawQuadBayerStats': [], 1420 'raw10Stats': [], 1421 'raw10QuadBayerStats': [], 1422 'raw10QuadBayer': [], 1423 } 1424 1425 # Only allow yuv output to multiple targets 1426 yuv_surface = None 1427 if cam_id == self._camera_id: 1428 if 'physicalCamera' not in out_surface: 1429 if out_surface['format'] == 'yuv': 1430 yuv_surface = out_surface 1431 else: 1432 if ('physicalCamera' in out_surface and 1433 out_surface['physicalCamera'] == cam_id): 1434 if out_surface['format'] == 'yuv': 1435 yuv_surface = out_surface 1436 1437 # Compute the buffer size of YUV targets 1438 yuv_maxsize_1d = 0 1439 if yuv_surface is not None: 1440 if ('width' not in yuv_surface and 'height' not in yuv_surface): 1441 if self.props is None: 1442 raise error_util.CameraItsError('Camera props are unavailable') 1443 yuv_maxsize_2d = capture_request_utils.get_available_output_sizes( 1444 'yuv', self.props)[0] 1445 # YUV420 size = 1.5 bytes per pixel 1446 yuv_maxsize_1d = (yuv_maxsize_2d[0] * yuv_maxsize_2d[1] * 3) // 2 1447 if 'width' in yuv_surface and 'height' in yuv_surface: 1448 yuv_size = (yuv_surface['width'] * yuv_surface['height'] * 3) // 2 1449 else: 1450 yuv_size = yuv_maxsize_1d 1451 1452 yuv_bufs[cam_id] = {yuv_size: []} 1453 1454 cam_ids = 
self._camera_id 1455 self.sock.settimeout(self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT) 1456 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1457 1458 nbufs = 0 1459 md = None 1460 physical_md = None 1461 width = None 1462 height = None 1463 capture_results_returned = False 1464 while (nbufs < ncap) or (not capture_results_returned): 1465 json_obj, buf = self.__read_response_from_socket() 1466 if (json_obj[_TAG_STR] in ItsSession.IMAGE_FORMAT_LIST_1 and 1467 buf is not None): 1468 fmt = json_obj[_TAG_STR][:-5] 1469 bufs[self._camera_id][fmt].append(buf) 1470 nbufs += 1 1471 elif json_obj[_TAG_STR] == 'yuvImage': 1472 buf_size = numpy.product(buf.shape) 1473 yuv_bufs[self._camera_id][buf_size].append(buf) 1474 nbufs += 1 1475 elif json_obj[_TAG_STR] == 'captureResults': 1476 capture_results_returned = True 1477 md = json_obj[_OBJ_VALUE_STR]['captureResult'] 1478 physical_md = json_obj[_OBJ_VALUE_STR]['physicalResults'] 1479 outputs = json_obj[_OBJ_VALUE_STR]['outputs'] 1480 returned_fmt = outputs[0]['format'] 1481 if fmt != returned_fmt: 1482 raise AssertionError( 1483 f'Incorrect format. Requested: {fmt}, ' 1484 f'Received: {returned_fmt}') 1485 width = outputs[0]['width'] 1486 height = outputs[0]['height'] 1487 requested_width = out_surface['width'] 1488 requested_height = out_surface['height'] 1489 if requested_width != width or requested_height != height: 1490 raise AssertionError( 1491 'Incorrect size. 
' 1492 f'Requested: {requested_width}x{requested_height}, ' 1493 f'Received: {width}x{height}') 1494 else: 1495 tag_string = unicodedata.normalize('NFKD', json_obj[_TAG_STR]).encode( 1496 'ascii', 'ignore') 1497 for x in ItsSession.IMAGE_FORMAT_LIST_2: 1498 x = bytes(x, encoding='utf-8') 1499 if tag_string.startswith(x): 1500 if x == b'yuvImage': 1501 physical_id = json_obj[_TAG_STR][len(x):] 1502 if physical_id in cam_ids: 1503 buf_size = numpy.product(buf.shape) 1504 yuv_bufs[physical_id][buf_size].append(buf) 1505 nbufs += 1 1506 else: 1507 physical_id = json_obj[_TAG_STR][len(x):] 1508 if physical_id in cam_ids: 1509 fmt = x[:-5].decode('UTF-8') 1510 bufs[physical_id][fmt].append(buf) 1511 nbufs += 1 1512 1513 if 'physicalCamera' in out_surface: 1514 cam_id = out_surface['physicalCamera'] 1515 else: 1516 cam_id = self._camera_id 1517 ret = {'width': width, 'height': height, 'format': fmt} 1518 if cam_id == self._camera_id: 1519 ret['metadata'] = md 1520 else: 1521 if cam_id in physical_md: 1522 ret['metadata'] = physical_md[cam_id] 1523 1524 if fmt == 'yuv': 1525 buf_size = (width * height * 3) // 2 1526 ret['data'] = yuv_bufs[cam_id][buf_size][0] 1527 else: 1528 ret['data'] = bufs[cam_id][fmt][0] 1529 1530 return ret 1531 1532 def do_jca_capture(self, dut, log_path, flash, facing): 1533 """Take a capture using JCA, modifying capture settings using the UI. 1534 1535 Selects UI elements to modify settings, and presses the capture button. 1536 Reads response from socket containing the capture path, and 1537 pulls the image from the DUT. 1538 1539 This method is included here because an ITS session is needed to retrieve 1540 the capture path from the device. 1541 1542 Args: 1543 dut: An Android controller device object. 1544 log_path: str; log path to save screenshots. 1545 flash: str; constant describing the desired flash mode. 1546 Acceptable values: 'OFF' and 'AUTO'. 1547 facing: str; constant describing the direction the camera lens faces. 
1548 Acceptable values: camera_properties_utils.LENS_FACING[BACK, FRONT] 1549 Returns: 1550 The host-side path of the capture. 1551 """ 1552 ui_interaction_utils.open_jca_viewfinder(dut, log_path) 1553 ui_interaction_utils.switch_jca_camera(dut, log_path, facing) 1554 # Bring up settings, switch flash mode, and close settings 1555 dut.ui(res=ui_interaction_utils.QUICK_SETTINGS_RESOURCE_ID).click() 1556 if flash not in ui_interaction_utils.FLASH_MODE_TO_CLICKS: 1557 raise ValueError(f'Flash mode {flash} not supported') 1558 for _ in range(ui_interaction_utils.FLASH_MODE_TO_CLICKS[flash]): 1559 dut.ui(res=ui_interaction_utils.QUICK_SET_FLASH_RESOURCE_ID).click() 1560 dut.take_screenshot(log_path, prefix='flash_mode_set') 1561 dut.ui(res=ui_interaction_utils.QUICK_SETTINGS_RESOURCE_ID).click() 1562 # Take capture 1563 dut.ui(res=ui_interaction_utils.CAPTURE_BUTTON_RESOURCE_ID).click() 1564 return self.get_and_pull_jca_capture(dut, log_path) 1565 1566 def get_and_pull_jca_capture(self, dut, log_path): 1567 """Retrieves a capture path from the socket and pulls capture to host. 1568 1569 Args: 1570 dut: An Android controller device object. 1571 log_path: str; log path to save screenshots. 1572 Returns: 1573 The host-side path of the capture. 1574 Raises: 1575 CameraItsError: If unexpected data is retrieved from the socket. 1576 """ 1577 capture_path, capture_status = None, None 1578 while not capture_path or not capture_status: 1579 data, _ = self.__read_response_from_socket() 1580 if data[_TAG_STR] == JCA_CAPTURE_PATH_TAG: 1581 capture_path = data[_STR_VALUE_STR] 1582 elif data[_TAG_STR] == JCA_CAPTURE_STATUS_TAG: 1583 capture_status = data[_STR_VALUE_STR] 1584 else: 1585 raise error_util.CameraItsError( 1586 f'Invalid response {data[_TAG_STR]} for JCA capture') 1587 if capture_status != RESULT_OK_STATUS: 1588 logging.error('Capture failed! 
Expected status %d, received %d', 1589 RESULT_OK_STATUS, capture_status) 1590 logging.debug('capture path: %s', capture_path) 1591 _, capture_name = os.path.split(capture_path) 1592 its_device_utils.run(f'adb -s {dut.serial} pull {capture_path} {log_path}') 1593 return os.path.join(log_path, capture_name) 1594 1595 def do_capture_with_flash(self, 1596 preview_request_start, 1597 preview_request_idle, 1598 still_capture_req, 1599 out_surface): 1600 """Issue capture request with flash and read back the image and metadata. 1601 1602 Captures a single image with still_capture_req as capture request 1603 with flash. It triggers the precapture sequence with preview request 1604 preview_request_start with capture intent preview by setting aePrecapture 1605 trigger to Start. This is followed by repeated preview requests 1606 preview_request_idle with aePrecaptureTrigger set to IDLE. 1607 Once the AE is converged, a single image is captured still_capture_req 1608 during which the flash must be fired. 1609 Note: The part where we read output data from socket is cloned from 1610 do_capture and will be consolidated in U. 1611 1612 Args: 1613 preview_request_start: Preview request with aePrecaptureTrigger set to 1614 Start 1615 preview_request_idle: Preview request with aePrecaptureTrigger set to Idle 1616 still_capture_req: Single still capture request. 1617 out_surface: Specifications of the output image formats and 1618 sizes to use for capture. Supports yuv and jpeg. 1619 Returns: 1620 An object which contains following fields: 1621 * data: the image data as a numpy array of bytes. 1622 * width: the width of the captured image. 1623 * height: the height of the captured image. 
def do_capture_with_extensions(self,
                               cap_request,
                               extension,
                               out_surface):
  """Issue extension capture request(s), and read back image(s) and metadata.

  Args:
    cap_request: The Python dict/list specifying the capture(s), which will be
      converted to JSON and sent to the device.
    extension: The extension to be requested.
    out_surface: specifications of the output image format and
      size to use for the capture.

  Returns:
    An object, list of objects, or list of lists of objects, where each
    object contains the following fields:
    * data: the image data as a numpy array of bytes.
    * width: the width of the captured image.
    * height: the height of the captured image.
    * format: the image format, in [
                      "yuv","jpeg","raw","raw10","raw12","rawStats","dng"].
    * metadata: the capture result object (Python dictionary).
  """
  cmd = {}
  cmd[_CMD_NAME_STR] = 'doCaptureWithExtensions'
  cmd['repeatRequests'] = []
  cmd['captureRequests'] = [cap_request]
  cmd['extension'] = extension
  cmd['outputSurfaces'] = [out_surface]

  logging.debug('Capturing image with EXTENSIONS.')
  return self.do_simple_capture(cmd, out_surface)

def do_capture(self,
               cap_request,
               out_surfaces=None,
               reprocess_format=None,
               repeat_request=None,
               reuse_session=False,
               first_surface_for_3a=False):
  """Issue capture request(s), and read back the image(s) and metadata.

  The main top-level function for capturing one or more images using the
  device. Captures a single image if cap_request is a single object, and
  captures a burst if it is a list of objects.

  The optional repeat_request field can be used to assign a repeating
  request list run in background for 3 seconds to warm up the capturing
  pipeline before start capturing. The repeat_requests will be run on a
  640x480 YUV surface without sending any data back. The caller needs to
  make sure the stream configuration defined by out_surfaces and
  repeat_request are valid or do_capture may fail because device does not
  support such stream configuration.

  The out_surfaces field can specify the width(s), height(s), and
  format(s) of the captured image. The formats may be "yuv", "jpeg",
  "dng", "raw", "raw10", "raw12", "rawStats" or "y8". The default is a
  YUV420 frame ("yuv") corresponding to a full sensor frame.

  1. Optionally the out_surfaces field can specify physical camera id(s) if
  the current camera device is a logical multi-camera. The physical camera
  id must refer to a physical camera backing this logical camera device.
  2. Optionally the out_surfaces field can also specify the use case(s) if
  the current camera device has STREAM_USE_CASE capability.

  Note that one or more surfaces can be specified, allowing a capture to
  request images back in multiple formats (e.g.) raw+yuv, raw+jpeg,
  yuv+jpeg, raw+yuv+jpeg. If the size is omitted for a surface, the
  default is the largest resolution available for the format of that
  surface. At most one output surface can be specified for a given format,
  and raw+dng, raw10+dng, and raw+raw10 are not supported as combinations.

  If reprocess_format is not None, for each request, an intermediate
  buffer of the given reprocess_format will be captured from camera and
  the intermediate buffer will be reprocessed to the output surfaces. The
  following settings will be turned off when capturing the intermediate
  buffer and will be applied when reprocessing the intermediate buffer.
  1. android.noiseReduction.mode
  2. android.edge.mode
  3. android.reprocess.effectiveExposureFactor

  Supported reprocess format are "yuv" and "private". Supported output
  surface formats when reprocessing is enabled are "yuv" and "jpeg".

  Example of a single capture request:

  {
   "android.sensor.exposureTime": 100*1000*1000,
   "android.sensor.sensitivity": 100
  }

  Example of a list of capture requests:
  [
   {
     "android.sensor.exposureTime": 100*1000*1000,
     "android.sensor.sensitivity": 100
   },
   {
     "android.sensor.exposureTime": 100*1000*1000,
     "android.sensor.sensitivity": 200
   }
  ]

  Example of output surface specifications:
  {
   "width": 640,
   "height": 480,
   "format": "yuv"
  }
  [
   {
     "format": "jpeg"
   },
   {
     "format": "raw"
   }
  ]

  The following variables defined in this class are shortcuts for
  specifying one or more formats where each output is the full size for
  that format; they can be used as values for the out_surfaces arguments:

  CAP_RAW
  CAP_DNG
  CAP_YUV
  CAP_JPEG
  CAP_RAW_YUV
  CAP_DNG_YUV
  CAP_RAW_JPEG
  CAP_DNG_JPEG
  CAP_YUV_JPEG
  CAP_RAW_YUV_JPEG
  CAP_DNG_YUV_JPEG

  If multiple formats are specified, then this function returns multiple
  capture objects, one for each requested format. If multiple formats and
  multiple captures (i.e. a burst) are specified, then this function
  returns multiple lists of capture objects. In both cases, the order of
  the returned objects matches the order of the requested formats in the
  out_surfaces parameter. For example:

  yuv_cap = do_capture(req1)
  yuv_cap = do_capture(req1,yuv_fmt)
  yuv_cap, raw_cap = do_capture(req1, [yuv_fmt,raw_fmt])
  yuv_caps = do_capture([req1,req2], yuv_fmt)
  yuv_caps, raw_caps = do_capture([req1,req2], [yuv_fmt,raw_fmt])

  The "rawStats" format processes the raw image and returns a new image
  of statistics from the raw image. The format takes additional keys,
  "gridWidth" and "gridHeight" which are size of grid cells in a 2D grid
  of the raw image. For each grid cell, the mean and variance of each raw
  channel is computed, and the do_capture call returns two 4-element float
  images of dimensions (rawWidth / gridWidth, rawHeight / gridHeight),
  concatenated back-to-back, where the first image contains the 4-channel
  means and the second contains the 4-channel variances. Note that only
  pixels in the active array crop region are used; pixels outside this
  region (for example optical black rows) are cropped out before the
  gridding and statistics computation is performed.

  For the rawStats format, if the gridWidth is not provided then the raw
  image width is used as the default, and similarly for gridHeight. With
  this, the following is an example of a output description that computes
  the mean and variance across each image row:
  {
    "gridHeight": 1,
    "format": "rawStats"
  }

  Args:
    cap_request: The Python dict/list specifying the capture(s), which will be
      converted to JSON and sent to the device.
    out_surfaces: (Optional) specifications of the output image formats and
      sizes to use for each capture. Note that the surface dicts are
      modified in place when a hidden physical camera ID is set.
    reprocess_format: (Optional) The reprocessing format. If not
      None, reprocessing will be enabled.
    repeat_request: Repeating request list.
    reuse_session: True if ItsService.java should try to use
      the existing CameraCaptureSession.
    first_surface_for_3a: Use first surface in out_surfaces for 3A, not capture
      Only applicable if out_surfaces contains at least 1 surface.

  Returns:
    An object, list of objects, or list of lists of objects, where each
    object contains the following fields:
    * data: the image data as a numpy array of bytes.
    * width: the width of the captured image.
    * height: the height of the captured image.
    * format: the image format, in [
                      "yuv","jpeg","raw","raw10","raw12","rawStats","dng"].
    * metadata: the capture result object (Python dictionary).

  Raises:
    error_util.CameraItsError: if a repeating request is combined with
      reprocessing, if the requested surface combination is unsupported
      (duplicate formats, yuv outputs of the same buffer size, more than one
      raw format), or if camera props are needed but unavailable.
  """
  cmd = {}
  if reprocess_format is not None:
    if repeat_request is not None:
      raise error_util.CameraItsError(
          'repeating request + reprocessing is not supported')
    cmd[_CMD_NAME_STR] = 'doReprocessCapture'
    cmd['reprocessFormat'] = reprocess_format
  else:
    cmd[_CMD_NAME_STR] = 'doCapture'

  if repeat_request is None:
    cmd['repeatRequests'] = []
  elif not isinstance(repeat_request, list):
    cmd['repeatRequests'] = [repeat_request]
  else:
    cmd['repeatRequests'] = repeat_request

  if not isinstance(cap_request, list):
    cmd['captureRequests'] = [cap_request]
  else:
    cmd['captureRequests'] = cap_request

  if out_surfaces:
    if isinstance(out_surfaces, list):
      cmd['outputSurfaces'] = out_surfaces
    else:
      cmd['outputSurfaces'] = [out_surfaces]
    # A surface without an explicit format is treated as yuv.
    formats = [c.get('format', 'yuv') for c in cmd['outputSurfaces']]
    formats = [s if s != 'jpg' else 'jpeg' for s in formats]
  else:
    max_yuv_size = capture_request_utils.get_available_output_sizes(
        'yuv', self.props)[0]
    formats = ['yuv']
    cmd['outputSurfaces'] = [{
        'format': 'yuv',
        'width': max_yuv_size[0],
        'height': max_yuv_size[1]
    }]

  cmd['reuseSession'] = reuse_session
  cmd['firstSurfaceFor3A'] = first_surface_for_3a

  # The first surface is only used for 3A, so no image is expected for it.
  requested_surfaces = cmd['outputSurfaces'][:]
  if first_surface_for_3a:
    formats.pop(0)
    requested_surfaces.pop(0)

  ncap = len(cmd['captureRequests'])
  nsurf = len(formats)

  cam_ids = []
  bufs = {}
  yuv_bufs = {}
  for s in cmd['outputSurfaces']:
    if self._hidden_physical_id:
      s['physicalCamera'] = self._hidden_physical_id

    if 'physicalCamera' in s:
      cam_id = s['physicalCamera']
    else:
      cam_id = self._camera_id

    if cam_id not in cam_ids:
      cam_ids.append(cam_id)
      bufs[cam_id] = {
          'raw': [],
          'raw10': [],
          'raw12': [],
          'rawStats': [],
          'dng': [],
          'jpeg': [],
          'jpeg_r': [],
          'y8': [],
          'rawQuadBayer': [],
          'rawQuadBayerStats': [],
          'raw10Stats': [],
          'raw10QuadBayerStats': [],
          'raw10QuadBayer': [],
      }

  for cam_id in cam_ids:
    # Only allow yuv output to multiple targets
    if cam_id == self._camera_id:
      surfaces_for_id = [
          s for s in requested_surfaces if 'physicalCamera' not in s
      ]
    else:
      surfaces_for_id = [
          s for s in requested_surfaces
          if 'physicalCamera' in s and s['physicalCamera'] == cam_id
      ]
    # Same default as used for `formats` above: missing format means yuv.
    formats_for_id = [s.get('format', 'yuv') for s in surfaces_for_id]
    yuv_surfaces = [
        s for s in surfaces_for_id if s.get('format', 'yuv') == 'yuv'
    ]

    n_yuv = len(yuv_surfaces)
    # Compute the buffer size of YUV targets
    yuv_maxsize_1d = 0
    for s in yuv_surfaces:
      if ('width' not in s and 'height' not in s):
        if self.props is None:
          raise error_util.CameraItsError('Camera props are unavailable')
        yuv_maxsize_2d = capture_request_utils.get_available_output_sizes(
            'yuv', self.props)[0]
        # YUV420 size = 1.5 bytes per pixel
        yuv_maxsize_1d = (yuv_maxsize_2d[0] * yuv_maxsize_2d[1] * 3) // 2
        break
    yuv_sizes = [
        (c['width'] * c['height'] * 3) // 2
        if 'width' in c and 'height' in c else yuv_maxsize_1d
        for c in yuv_surfaces
    ]
    # Currently we don't pass enough metadata from ItsService to distinguish
    # different yuv stream of same buffer size
    if len(yuv_sizes) != len(set(yuv_sizes)):
      raise error_util.CameraItsError(
          'ITS does not support yuv outputs of same buffer size')
    if len(formats_for_id) > len(set(formats_for_id)):
      if n_yuv != len(formats_for_id) - len(set(formats_for_id)) + 1:
        raise error_util.CameraItsError('Duplicate format requested')

    yuv_bufs[cam_id] = {size: [] for size in yuv_sizes}

  logging.debug('yuv bufs: %s', yuv_bufs)
  # Count how many distinct raw-type formats were requested.
  raw_format_names = (
      'dng', 'raw', 'raw10', 'raw12', 'rawStats', 'rawQuadBayer',
      'rawQuadBayerStats', 'raw10Stats', 'raw10QuadBayer',
      'raw10QuadBayerStats',
  )
  raw_formats = sum(1 for name in raw_format_names if name in formats)

  if raw_formats > 1:
    raise error_util.CameraItsError('Different raw formats not supported')

  # Detect long exposure time and set timeout accordingly
  longest_exp_time = 0
  for req in cmd['captureRequests']:
    if 'android.sensor.exposureTime' in req and req[
        'android.sensor.exposureTime'] > longest_exp_time:
      longest_exp_time = req['android.sensor.exposureTime']

  extended_timeout = longest_exp_time // self.SEC_TO_NSEC + self.SOCK_TIMEOUT
  if repeat_request:
    extended_timeout += self.EXTRA_SOCK_TIMEOUT
  self.sock.settimeout(extended_timeout)

  nbufs = 0
  mds = []
  physical_mds = []
  widths = None
  heights = None
  camera_id = (
      self._camera_id
      if not self._hidden_physical_id
      else self._hidden_physical_id
  )
  try:
    logging.debug('Capturing %d frame%s with %d format%s [%s]', ncap,
                  's' if ncap > 1 else '', nsurf, 's' if nsurf > 1 else '',
                  ','.join(formats))
    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())

    # Wait for ncap*nsurf images and ncap metadata responses.
    # Assume that captures come out in the same order as requested in
    # the burst, however individual images of different formats can come
    # out in any order for that capture.
    logging.debug('Using camera_id %s to store buffers', camera_id)
    while nbufs < ncap * nsurf or len(mds) < ncap:
      json_obj, buf = self.__read_response_from_socket()
      if (json_obj[_TAG_STR] in ItsSession.IMAGE_FORMAT_LIST_1 and
          buf is not None):
        # Tag is '<format>Image'; strip the 'Image' suffix to get the format.
        fmt = json_obj[_TAG_STR][:-5]
        bufs[camera_id][fmt].append(buf)
        nbufs += 1
      # Physical camera is appended to the tag string of a private capture
      elif json_obj[_TAG_STR].startswith('privImage'):
        # The private image format buffers are opaque to camera clients
        # and cannot be accessed.
        nbufs += 1
      elif json_obj[_TAG_STR] == 'yuvImage':
        # numpy.product was removed in NumPy 2.0; numpy.prod is equivalent.
        buf_size = numpy.prod(buf.shape)
        yuv_bufs[camera_id][buf_size].append(buf)
        nbufs += 1
      elif json_obj[_TAG_STR] == 'captureResults':
        mds.append(json_obj[_OBJ_VALUE_STR]['captureResult'])
        physical_mds.append(json_obj[_OBJ_VALUE_STR]['physicalResults'])
        outputs = json_obj[_OBJ_VALUE_STR]['outputs']
        widths = [out['width'] for out in outputs]
        heights = [out['height'] for out in outputs]
      else:
        # Remaining tags are '<format>Image<physicalCameraId>'.
        tag_string = unicodedata.normalize('NFKD', json_obj[_TAG_STR]).encode(
            'ascii', 'ignore')
        for x in ItsSession.IMAGE_FORMAT_LIST_2:
          x = bytes(x, encoding='utf-8')
          if tag_string.startswith(x):
            physical_id = json_obj[_TAG_STR][len(x):]
            if physical_id in cam_ids:
              if x == b'yuvImage':
                buf_size = numpy.prod(buf.shape)
                yuv_bufs[physical_id][buf_size].append(buf)
              else:
                fmt = x[:-5].decode('UTF-8')
                bufs[physical_id][fmt].append(buf)
              nbufs += 1
  finally:
    # Restore the default timeout even if the capture raised or timed out,
    # so later commands on this session are not affected.
    self.sock.settimeout(self.SOCK_TIMEOUT)

  rets = []
  for j, fmt in enumerate(formats):
    objs = []
    if 'physicalCamera' in requested_surfaces[j]:
      cam_id = requested_surfaces[j]['physicalCamera']
    else:
      cam_id = self._camera_id

    for i in range(ncap):
      obj = {}
      obj['width'] = widths[j]
      obj['height'] = heights[j]
      obj['format'] = fmt
      if cam_id == self._camera_id:
        obj['metadata'] = mds[i]
      else:
        for physical_md in physical_mds[i]:
          if cam_id in physical_md:
            obj['metadata'] = physical_md[cam_id]
            break

      if fmt == 'yuv':
        # yuv buffers are keyed by their 1D size (1.5 bytes per pixel).
        buf_size = (widths[j] * heights[j] * 3) // 2
        obj['data'] = yuv_bufs[cam_id][buf_size][i]
      elif fmt != 'priv':
        obj['data'] = bufs[cam_id][fmt][i]
      objs.append(obj)
    rets.append(objs if ncap > 1 else objs[0])
  if len(rets) > 1 or (isinstance(rets[0], dict) and
                       isinstance(cap_request, list)):
    return rets
  else:
    return rets[0]

def do_vibrate(self, pattern):
  """Cause the device to vibrate to a specific pattern.

  Args:
    pattern: Durations (ms) for which to turn on or off the vibrator.
      The first value indicates the number of milliseconds to wait
      before turning the vibrator on. The next value indicates the
      number of milliseconds for which to keep the vibrator on
      before turning it off. Subsequent values alternate between
      durations in milliseconds to turn the vibrator off or to turn
      the vibrator on.

  Returns:
    Nothing.

  Raises:
    error_util.CameraItsError: if the response tag is not 'vibrationStarted'.
  """
  cmd = {}
  cmd[_CMD_NAME_STR] = 'doVibrate'
  cmd['pattern'] = pattern
  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
  data, _ = self.__read_response_from_socket()
  if data[_TAG_STR] != 'vibrationStarted':
    raise error_util.CameraItsError('Invalid response for command: %s' %
                                    cmd[_CMD_NAME_STR])

def set_audio_restriction(self, mode):
  """Set the audio restriction mode for this camera device.

  Args:
   mode: int; the audio restriction mode. See CameraDevice.java for valid
         value.
  Returns:
   Nothing.

  Raises:
    error_util.CameraItsError: if the response tag is not
      'audioRestrictionSet'.
  """
  cmd = {}
  cmd[_CMD_NAME_STR] = 'setAudioRestriction'
  cmd['mode'] = mode
  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
  data, _ = self.__read_response_from_socket()
  if data[_TAG_STR] != 'audioRestrictionSet':
    raise error_util.CameraItsError('Invalid response for command: %s' %
                                    cmd[_CMD_NAME_STR])

# pylint: disable=dangerous-default-value
def do_3a(self,
          regions_ae=[[0, 0, 1, 1, 1]],
          regions_awb=[[0, 0, 1, 1, 1]],
          regions_af=[[0, 0, 1, 1, 1]],
          do_awb=True,
          do_af=True,
          lock_ae=False,
          lock_awb=False,
          get_results=False,
          ev_comp=0,
          auto_flash=False,
          mono_camera=False,
          zoom_ratio=None,
          out_surfaces=None,
          repeat_request=None,
          first_surface_for_3a=False):
  """Perform a 3A operation on the device.

  Triggers some or all of AE, AWB, and AF, and returns once they have
  converged. Uses the vendor 3A that is implemented inside the HAL.
  Note: AE is always run. AWB results are always read back; the do_awb flag
  only controls whether missing AWB gains count as a convergence failure.

  Args:
    regions_ae: List of weighted AE regions.
    regions_awb: List of weighted AWB regions.
    regions_af: List of weighted AF regions.
    do_awb: Wait for AWB to converge.
    do_af: Trigger AF and wait for it to converge.
    lock_ae: Request AE lock after convergence, and wait for it.
    lock_awb: Request AWB lock after convergence, and wait for it.
    get_results: Return the 3A results from this function.
    ev_comp: An EV compensation value to use when running AE.
    auto_flash: AE control boolean to enable auto flash.
    mono_camera: Boolean for monochrome camera.
    zoom_ratio: Zoom ratio. None if default zoom
    out_surfaces: dict; see do_capture() for specifications on out_surfaces.
      CameraCaptureSession will only be reused if out_surfaces is specified.
    repeat_request: repeating request list.
      See do_capture() for specifications on repeat_request.
    first_surface_for_3a: Use first surface in output_surfaces for 3A.
      Only applicable if out_surfaces contains at least 1 surface.

    Region format in args:
       Arguments are lists of weighted regions; each weighted region is a
       list of 5 values, [x, y, w, h, wgt], and each argument is a list of
       these 5-value lists. The coordinates are given as normalized
       rectangles (x, y, w, h) specifying the region. For example:
       [[0.0, 0.0, 1.0, 0.5, 5], [0.0, 0.5, 1.0, 0.5, 10]].
       Weights are non-negative integers.

  Returns:
    Five values are returned if get_results is true:
    * AE sensitivity;
    * AE exposure time;
    * AWB gains (list);
    * AWB transform (list);
    * AF focus position; None if do_af is false
    Otherwise, it returns five None values.

  Raises:
    AssertionError: if zoom_ratio is outside the supported range.
    error_util.CameraItsError: if an unexpected response is received or 3A
      fails to converge.
  """
  logging.debug('Running vendor 3A on device')
  cmd = {}
  cmd[_CMD_NAME_STR] = 'do3A'
  reuse_session = False
  if out_surfaces:
    reuse_session = True
    if isinstance(out_surfaces, list):
      cmd['outputSurfaces'] = out_surfaces
    else:
      cmd['outputSurfaces'] = [out_surfaces]
  if repeat_request is None:
    cmd['repeatRequests'] = []
  elif not isinstance(repeat_request, list):
    cmd['repeatRequests'] = [repeat_request]
  else:
    cmd['repeatRequests'] = repeat_request

  # Flatten each list of [x, y, w, h, wgt] regions into one flat list.
  # sum() builds new lists, so the default arguments are never mutated.
  cmd['regions'] = {
      'ae': sum(regions_ae, []),
      'awb': sum(regions_awb, []),
      'af': sum(regions_af, [])
  }
  do_ae = True  # Always run AE
  cmd['triggers'] = {'ae': do_ae, 'af': do_af}
  if lock_ae:
    cmd['aeLock'] = True
  if lock_awb:
    cmd['awbLock'] = True
  if ev_comp != 0:
    cmd['evComp'] = ev_comp
  if auto_flash:
    cmd['autoFlash'] = True
  if self._hidden_physical_id:
    cmd['physicalId'] = self._hidden_physical_id
  if zoom_ratio:
    if self.zoom_ratio_within_range(zoom_ratio):
      cmd['zoomRatio'] = zoom_ratio
    else:
      raise AssertionError(f'Zoom ratio {zoom_ratio} out of range')
  cmd['reuseSession'] = reuse_session
  cmd['firstSurfaceFor3A'] = first_surface_for_3a
  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())

  # Wait for each specified 3A to converge.
  ae_sens = None
  ae_exp = None
  awb_gains = None
  awb_transform = None
  af_dist = None
  converged = False
  # Read responses until the device reports '3aDone'.
  while True:
    data, _ = self.__read_response_from_socket()
    vals = data[_STR_VALUE_STR].split()
    if data[_TAG_STR] == 'aeResult':
      if do_ae:
        ae_sens, ae_exp = [int(i) for i in vals]
    elif data[_TAG_STR] == 'afResult':
      if do_af:
        af_dist = float(vals[0])
    elif data[_TAG_STR] == 'awbResult':
      # First 4 values are the gains, the rest is the transform.
      awb_gains = [float(f) for f in vals[:4]]
      awb_transform = [float(f) for f in vals[4:]]
    elif data[_TAG_STR] == '3aConverged':
      converged = True
    elif data[_TAG_STR] == '3aDone':
      break
    else:
      raise error_util.CameraItsError('Invalid command response')
  if converged and not get_results:
    return None, None, None, None, None
  if (do_ae and ae_sens is None or
      (not mono_camera and do_awb and awb_gains is None) or
      do_af and af_dist is None or not converged):
    raise error_util.CameraItsError('3A failed to converge')
  return ae_sens, ae_exp, awb_gains, awb_transform, af_dist

def calc_camera_fov(self, props):
  """Determine the camera field of view from internal params.

  If the camera reports more than one available focal length, a capture is
  taken and the focal length is read from its metadata; otherwise the single
  reported focal length is used.

  Args:
    props: Camera properties object.

  Returns:
    camera_fov: string; diagonal field of view in degrees, rounded to 2
      decimals. '0' if the FoV cannot be computed (e.g. zero focal length).
  """

  focal_ls = props['android.lens.info.availableFocalLengths']
  if len(focal_ls) > 1:
    logging.debug('Doing capture to determine logical camera focal length')
    cap = self.do_capture(capture_request_utils.auto_capture_request())
    focal_l = cap['metadata']['android.lens.focalLength']
  else:
    focal_l = focal_ls[0]

  sensor_size = props['android.sensor.info.physicalSize']
  diag = math.sqrt(sensor_size['height']**2 + sensor_size['width']**2)
  try:
    fov = str(round(2 * math.degrees(math.atan(diag / (2 * focal_l))), 2))
  except (ValueError, ZeroDivisionError):
    # A zero focal length makes the division fail; fall back to 0.
    fov = str(0)
  logging.debug('Calculated FoV: %s', fov)
  return fov

def get_file_name_to_load(self, chart_distance, camera_fov, scene):
  """Get the image to load on the tablet depending on fov and chart_distance.

  Args:
    chart_distance: float; distance in cm from camera of displayed chart
    camera_fov: camera field of view, passed through to
      opencv_processing_utils.calc_chart_scaling().
    scene: String; Scene to be used in the test.

  Returns:
    file_name: file name to display on the tablet. A scaled file name is
      returned when the computed chart scaling is within SCALING_TO_FILE_ATOL
      of a known rig scale; otherwise the unscaled '<scene>.png'.
  """
  chart_scaling = opencv_processing_utils.calc_chart_scaling(
      chart_distance, camera_fov)
  # Checked in order; the first scale that matches wins.
  known_scales = (
      opencv_processing_utils.SCALE_WIDE_IN_22CM_RIG,
      opencv_processing_utils.SCALE_TELE_IN_22CM_RIG,
      opencv_processing_utils.SCALE_TELE25_IN_31CM_RIG,
      opencv_processing_utils.SCALE_TELE40_IN_31CM_RIG,
      opencv_processing_utils.SCALE_TELE_IN_31CM_RIG,
  )
  file_name = f'{scene}.png'
  for scale in known_scales:
    if math.isclose(chart_scaling, scale, abs_tol=SCALING_TO_FILE_ATOL):
      file_name = f'{scene}_{scale}x_scaled.png'
      break
  logging.debug('Scene to load: %s', file_name)
  return file_name

def is_stream_combination_supported(self, out_surfaces, settings=None):
  """Query whether out_surfaces combination and settings are supported by the camera device.

  This function hooks up to the isSessionConfigurationSupported()/
  isSessionConfigurationWithSettingsSupported() camera API
  to query whether a particular stream combination and settings are supported.

  Args:
    out_surfaces: dict or list of dicts; see do_capture() for specifications
      on out_surfaces. The dicts are modified in place when a hidden physical
      camera ID is set.
    settings: dict; optional capture request settings metadata.

  Returns:
    Boolean

  Raises:
    error_util.CameraItsError: if the response tag is not
      'streamCombinationSupport'.
  """
  cmd = {}
  cmd[_CMD_NAME_STR] = 'isStreamCombinationSupported'
  cmd[_CAMERA_ID_STR] = self._camera_id

  if isinstance(out_surfaces, list):
    surfaces = out_surfaces
  else:
    surfaces = [out_surfaces]
  cmd['outputSurfaces'] = surfaces
  if self._hidden_physical_id:
    for out_surface in surfaces:
      out_surface['physicalCamera'] = self._hidden_physical_id

  if settings:
    cmd['settings'] = settings

  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())

  data, _ = self.__read_response_from_socket()
  if data[_TAG_STR] != 'streamCombinationSupport':
    raise error_util.CameraItsError('Failed to query stream combination')

  return data[_STR_VALUE_STR] == 'supportedCombination'

def is_camera_privacy_mode_supported(self):
  """Query whether the mobile device supports camera privacy mode.

  This function checks whether the mobile device has FEATURE_CAMERA_TOGGLE
  feature support, which indicates the camera device can run in privacy mode.

  Returns:
    Boolean

  Raises:
    error_util.CameraItsError: if the response tag is not
      'cameraPrivacyModeSupport'.
  """
  cmd = {}
  cmd[_CMD_NAME_STR] = 'isCameraPrivacyModeSupported'
  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())

  data, _ = self.__read_response_from_socket()
  if data[_TAG_STR] != 'cameraPrivacyModeSupport':
    raise error_util.CameraItsError('Failed to query camera privacy mode'
                                    ' support')
  return data[_STR_VALUE_STR] == 'true'

def is_primary_camera(self):
  """Query whether the camera device is a primary rear/front camera.

  A primary rear/front facing camera is a camera device with the lowest
  camera Id for that facing.

  Returns:
    Boolean

  Raises:
    error_util.CameraItsError: if the response tag is not 'primaryCamera'.
  """
  cmd = {}
  cmd[_CMD_NAME_STR] = 'isPrimaryCamera'
  cmd[_CAMERA_ID_STR] = self._camera_id
  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())

  data, _ = self.__read_response_from_socket()
  if data[_TAG_STR] != 'primaryCamera':
    raise error_util.CameraItsError('Failed to query primary camera')
  return data[_STR_VALUE_STR] == 'true'

def is_performance_class(self):
  """Query whether the mobile device is an R or S performance class device.

  Returns:
    Boolean

  Raises:
    error_util.CameraItsError: if the response tag is not 'performanceClass'.
  """
  cmd = {}
  cmd[_CMD_NAME_STR] = 'isPerformanceClass'
  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())

  data, _ = self.__read_response_from_socket()
  if data[_TAG_STR] != 'performanceClass':
    raise error_util.CameraItsError('Failed to query performance class')
  return data[_STR_VALUE_STR] == 'true'

def is_vic_performance_class(self):
  """Return whether the mobile device is VIC performance class device.

  Returns:
    Boolean

  Raises:
    error_util.CameraItsError: if the response tag is not
      'vicPerformanceClass'.
  """
  cmd = {}
  cmd[_CMD_NAME_STR] = 'isVicPerformanceClass'
  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())

  data, _ = self.__read_response_from_socket()
  if data[_TAG_STR] != 'vicPerformanceClass':
    raise error_util.CameraItsError('Failed to query VIC performance class')
  return data[_STR_VALUE_STR] == 'true'

def measure_camera_launch_ms(self):
  """Measure camera launch latency in millisecond, from open to first frame.

  Returns:
    Camera launch latency from camera open to receipt of first frame

  Raises:
    error_util.CameraItsError: if the response tag is not 'cameraLaunchMs'.
  """
  cmd = {}
  cmd[_CMD_NAME_STR] = 'measureCameraLaunchMs'
  cmd[_CAMERA_ID_STR] = self._camera_id
  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())

  timeout = self.SOCK_TIMEOUT_FOR_PERF_MEASURE
  self.sock.settimeout(timeout)
  try:
    data, _ = self.__read_response_from_socket()
  finally:
    # Restore the default timeout even if reading the response failed.
    self.sock.settimeout(self.SOCK_TIMEOUT)

  if data[_TAG_STR] != 'cameraLaunchMs':
    raise error_util.CameraItsError('Failed to measure camera launch latency')
  return float(data[_STR_VALUE_STR])

def measure_camera_1080p_jpeg_capture_ms(self):
  """Measure camera 1080P jpeg capture latency in milliseconds.

  Returns:
    Camera jpeg capture latency in milliseconds

  Raises:
    error_util.CameraItsError: if the response tag is not
      'camera1080pJpegCaptureMs'.
  """
  cmd = {}
  cmd[_CMD_NAME_STR] = 'measureCamera1080pJpegCaptureMs'
  cmd[_CAMERA_ID_STR] = self._camera_id
  self.sock.send(json.dumps(cmd).encode() + '\n'.encode())

  timeout = self.SOCK_TIMEOUT_FOR_PERF_MEASURE
  self.sock.settimeout(timeout)
  try:
    data, _ = self.__read_response_from_socket()
  finally:
    # Restore the default timeout even if reading the response failed.
    self.sock.settimeout(self.SOCK_TIMEOUT)

  if data[_TAG_STR] != 'camera1080pJpegCaptureMs':
    raise error_util.CameraItsError(
        'Failed to measure camera 1080p jpeg capture latency')
  return float(data[_STR_VALUE_STR])

def _camera_id_to_props(self):
  """Return the properties of each camera ID.

  Returns:
    dict mapping each unparsed camera ID string to its camera properties.
    For an ID with a sub-camera part, the properties of the sub ID are used.

  Raises:
    AssertionError: if no camera IDs were found.
  """
  unparsed_ids = self.get_camera_ids().get('cameraIdArray', [])
  parsed_ids = parse_camera_ids(unparsed_ids)
  id_to_props = {}
  for unparsed_id, id_combo in zip(unparsed_ids, parsed_ids):
    if id_combo.sub_id is None:
      props = self.get_camera_properties_by_id(id_combo.id)
    else:
      props = self.get_camera_properties_by_id(id_combo.sub_id)
    id_to_props[unparsed_id] = props
  if not id_to_props:
    raise AssertionError('No camera IDs were found.')
  return id_to_props
def has_ultrawide_camera(self, facing): 2496 """Return if device has an ultrawide camera facing the same direction. 2497 2498 Args: 2499 facing: constant describing the direction the camera device lens faces. 2500 2501 Returns: 2502 True if the device has an ultrawide camera facing in that direction. 2503 """ 2504 camera_ids = self.get_camera_ids() 2505 primary_rear_camera_id = camera_ids.get('primaryRearCameraId', '') 2506 primary_front_camera_id = camera_ids.get('primaryFrontCameraId', '') 2507 if facing == camera_properties_utils.LENS_FACING['BACK']: 2508 primary_camera_id = primary_rear_camera_id 2509 elif facing == camera_properties_utils.LENS_FACING['FRONT']: 2510 primary_camera_id = primary_front_camera_id 2511 else: 2512 raise NotImplementedError('Cameras not facing either front or back ' 2513 'are currently unsupported.') 2514 id_to_props = self._camera_id_to_props() 2515 fov_and_facing = collections.namedtuple('FovAndFacing', ['fov', 'facing']) 2516 id_to_fov_facing = { 2517 unparsed_id: fov_and_facing( 2518 self.calc_camera_fov(props), props['android.lens.facing'] 2519 ) 2520 for unparsed_id, props in id_to_props.items() 2521 } 2522 logging.debug('IDs to (FOVs, facing): %s', id_to_fov_facing) 2523 primary_camera_fov, primary_camera_facing = id_to_fov_facing[ 2524 primary_camera_id] 2525 for unparsed_id, fov_facing_combo in id_to_fov_facing.items(): 2526 if (float(fov_facing_combo.fov) > float(primary_camera_fov) and 2527 fov_facing_combo.facing == primary_camera_facing and 2528 unparsed_id != primary_camera_id): 2529 logging.debug('Ultrawide camera found with ID %s and FoV %.3f. 
' 2530 'Primary camera has ID %s and FoV: %.3f.', 2531 unparsed_id, float(fov_facing_combo.fov), 2532 primary_camera_id, float(primary_camera_fov)) 2533 return True 2534 return False 2535 2536 def get_facing_to_ids(self): 2537 """Returns mapping from lens facing to list of corresponding camera IDs.""" 2538 id_to_props = self._camera_id_to_props() 2539 facing_to_ids = collections.defaultdict(list) 2540 for unparsed_id, props in id_to_props.items(): 2541 facing_to_ids[props['android.lens.facing']].append(unparsed_id) 2542 for ids in facing_to_ids.values(): 2543 ids.sort() 2544 logging.debug('Facing to camera IDs: %s', facing_to_ids) 2545 return facing_to_ids 2546 2547 def is_low_light_boost_available(self, camera_id, extension=-1): 2548 """Checks if low light boost is available for camera id and extension. 2549 2550 If the extension is not provided (or -1) then low light boost support is 2551 checked for a camera2 session. 2552 2553 Args: 2554 camera_id: int; device ID 2555 extension: int; extension type 2556 Returns: 2557 True if low light boost is available and false otherwise. 2558 """ 2559 cmd = { 2560 'cmdName': 'isLowLightBoostAvailable', 2561 'cameraId': camera_id, 2562 'extension': extension 2563 } 2564 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 2565 timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT 2566 self.sock.settimeout(timeout) 2567 data, _ = self.__read_response_from_socket() 2568 if data['tag'] != 'isLowLightBoostAvailable': 2569 raise error_util.CameraItsError('Invalid command response') 2570 return data[_STR_VALUE_STR] == 'true' 2571 2572 def do_capture_preview_frame(self, 2573 camera_id, 2574 preview_size, 2575 frame_num=0, 2576 extension=-1, 2577 cap_request={}): 2578 """Captures the nth preview frame from the preview stream. 2579 2580 By default the 0th frame is the first frame. The extension type can also be 2581 provided or -1 to use Camera2 which is the default. 
2582 2583 Args: 2584 camera_id: int; device ID 2585 preview_size: int; preview size 2586 frame_num: int; frame number to capture 2587 extension: int; extension type 2588 cap_request: dict; python dict specifying the key/value pair of capture 2589 request keys, which will be converted to JSON and sent to the device. 2590 Returns: 2591 Single JPEG frame capture as numpy array of bytes 2592 """ 2593 cmd = { 2594 'cmdName': 'doCapturePreviewFrame', 2595 'cameraId': camera_id, 2596 'previewSize': preview_size, 2597 'frameNum': frame_num, 2598 'extension': extension, 2599 'captureRequest': cap_request, 2600 } 2601 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 2602 timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT 2603 self.sock.settimeout(timeout) 2604 data, buf = self.__read_response_from_socket() 2605 if data[_TAG_STR] != 'jpegImage': 2606 raise error_util.CameraItsError('Invalid command response') 2607 return buf 2608 2609 def preview_surface(self, size, hlg10_enabled=False): 2610 """Create a surface dictionary based on size and hdr-ness. 2611 2612 Args: 2613 size: str, Resolution of an output surface. ex. "1920x1080" 2614 hlg10_enabled: boolean; Whether the output is hlg10 or not. 2615 2616 Returns: 2617 a dictionary object containing format, size, and hdr-ness. 2618 """ 2619 surface = { 2620 'format': 'priv', 2621 'width': int(size.split('x')[0]), 2622 'height': int(size.split('x')[1]), 2623 'hlg10': hlg10_enabled 2624 } 2625 if self._hidden_physical_id: 2626 surface['physicalCamera'] = self._hidden_physical_id 2627 return [surface] 2628 2629 2630def parse_camera_ids(ids): 2631 """Parse the string of camera IDs into array of CameraIdCombo tuples. 2632 2633 Args: 2634 ids: List of camera ids. 
2635 2636 Returns: 2637 Array of CameraIdCombo 2638 """ 2639 camera_id_combo = collections.namedtuple('CameraIdCombo', ['id', 'sub_id']) 2640 id_combos = [] 2641 for one_id in ids: 2642 one_combo = one_id.split(SUB_CAMERA_SEPARATOR) 2643 if len(one_combo) == 1: 2644 id_combos.append(camera_id_combo(one_combo[0], None)) 2645 elif len(one_combo) == 2: 2646 id_combos.append(camera_id_combo(one_combo[0], one_combo[1])) 2647 else: 2648 raise AssertionError('Camera id parameters must be either ID or ' 2649 f'ID{SUB_CAMERA_SEPARATOR}SUB_ID') 2650 return id_combos 2651 2652 2653def do_capture_with_latency(cam, req, sync_latency, fmt=None): 2654 """Helper function to take enough frames to allow sync latency. 2655 2656 Args: 2657 cam: camera object 2658 req: request for camera 2659 sync_latency: integer number of frames 2660 fmt: format for the capture 2661 Returns: 2662 single capture with the unsettled frames discarded 2663 """ 2664 caps = cam.do_capture([req]*(sync_latency+1), fmt) 2665 return caps[-1] 2666 2667 2668def load_scene(cam, props, scene, tablet, chart_distance, lighting_check=True, 2669 log_path=None): 2670 """Load the scene for the camera based on the FOV. 2671 2672 Args: 2673 cam: camera object 2674 props: camera properties 2675 scene: scene to be loaded 2676 tablet: tablet to load scene on 2677 chart_distance: distance to tablet 2678 lighting_check: Boolean for lighting check enabled 2679 log_path: [Optional] path to store artifacts 2680 """ 2681 if not tablet: 2682 logging.info('Manual run: no tablet to load scene on.') 2683 return 2684 # Calculate camera_fov, which determines the image/video to load on tablet. 
2685 camera_fov = cam.calc_camera_fov(props) 2686 file_name = cam.get_file_name_to_load(chart_distance, camera_fov, scene) 2687 if 'scene' not in file_name: 2688 file_name = f'scene{file_name}' 2689 if scene in VIDEO_SCENES: 2690 root_file_name, _ = os.path.splitext(file_name) 2691 file_name = root_file_name + '.mp4' 2692 logging.debug('Displaying %s on the tablet', file_name) 2693 2694 # Display the image/video on the tablet using the default media player. 2695 view_file_type = 'image/png' if scene not in VIDEO_SCENES else 'video/mp4' 2696 uri_prefix = 'file://mnt' if scene not in VIDEO_SCENES else '' 2697 tablet.adb.shell( 2698 f'am start -a android.intent.action.VIEW -t {view_file_type} ' 2699 f'-d {uri_prefix}/sdcard/Download/{file_name}') 2700 time.sleep(LOAD_SCENE_DELAY_SEC) 2701 rfov_camera_in_rfov_box = ( 2702 math.isclose( 2703 chart_distance, 2704 opencv_processing_utils.CHART_DISTANCE_31CM, rel_tol=0.1) and 2705 opencv_processing_utils.FOV_THRESH_TELE <= float(camera_fov) 2706 <= opencv_processing_utils.FOV_THRESH_UW) 2707 wfov_camera_in_wfov_box = ( 2708 math.isclose( 2709 chart_distance, 2710 opencv_processing_utils.CHART_DISTANCE_22CM, rel_tol=0.1) and 2711 float(camera_fov) > opencv_processing_utils.FOV_THRESH_UW) 2712 if (rfov_camera_in_rfov_box or wfov_camera_in_wfov_box) and lighting_check: 2713 cam.do_3a() 2714 cap = cam.do_capture( 2715 capture_request_utils.auto_capture_request(), cam.CAP_YUV) 2716 y_plane, _, _ = image_processing_utils.convert_capture_to_planes(cap) 2717 validate_lighting(y_plane, scene, log_path=log_path, fov=float(camera_fov)) 2718 2719 2720def copy_scenes_to_tablet(scene, tablet_id): 2721 """Copies scenes onto the tablet before running the tests. 2722 2723 Args: 2724 scene: Name of the scene to copy image files. 
2725 tablet_id: device id of tablet 2726 """ 2727 logging.info('Copying files to tablet: %s', tablet_id) 2728 scene_path = os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests', scene) 2729 scene_dir = os.listdir(scene_path) 2730 for file_name in scene_dir: 2731 if file_name.endswith('.png') or file_name.endswith('.mp4'): 2732 src_scene_file = os.path.join(scene_path, file_name) 2733 cmd = f'adb -s {tablet_id} push {src_scene_file} {_DST_SCENE_DIR}' 2734 subprocess.Popen(cmd.split()) 2735 time.sleep(_COPY_SCENE_DELAY_SEC) 2736 logging.info('Finished copying files to tablet.') 2737 2738 2739def validate_lighting(y_plane, scene, state='ON', log_path=None, 2740 tablet_state='ON', fov=None): 2741 """Validates the lighting level in scene corners based on empirical values. 2742 2743 Args: 2744 y_plane: Y plane of YUV image 2745 scene: scene name 2746 state: string 'ON' or 'OFF' 2747 log_path: [Optional] path to store artifacts 2748 tablet_state: string 'ON' or 'OFF' 2749 fov: [Optional] float, calculated camera FoV 2750 2751 Returns: 2752 boolean True if lighting validated, else raise AssertionError 2753 """ 2754 logging.debug('Validating lighting levels.') 2755 file_name = f'validate_lighting_{scene}.jpg' 2756 if log_path: 2757 file_name = os.path.join(log_path, f'validate_lighting_{scene}.jpg') 2758 2759 if tablet_state == 'OFF': 2760 validate_lighting_thresh = _VALIDATE_LIGHTING_THRESH_DARK 2761 else: 2762 validate_lighting_thresh = _VALIDATE_LIGHTING_THRESH 2763 2764 validate_lighting_regions = _VALIDATE_LIGHTING_REGIONS 2765 if fov and fov > _VALIDATE_LIGHTING_MACRO_FOV_THRESH: 2766 validate_lighting_regions = _VALIDATE_LIGHTING_REGIONS_MODULAR_UW 2767 2768 # Test patches from each corner. 
2769 for location, coordinates in validate_lighting_regions.items(): 2770 patch = image_processing_utils.get_image_patch( 2771 y_plane, coordinates[0], coordinates[1], 2772 _VALIDATE_LIGHTING_PATCH_W, _VALIDATE_LIGHTING_PATCH_H) 2773 y_mean = image_processing_utils.compute_image_means(patch)[0] 2774 logging.debug('%s corner Y mean: %.3f', location, y_mean) 2775 if state == 'ON': 2776 if y_mean > validate_lighting_thresh: 2777 logging.debug('Lights ON in test rig.') 2778 return True 2779 else: 2780 image_processing_utils.write_image(y_plane, file_name) 2781 raise AssertionError('Lights OFF in test rig. Turn ON and retry.') 2782 elif state == 'OFF': 2783 if y_mean < validate_lighting_thresh: 2784 logging.debug('Lights OFF in test rig.') 2785 return True 2786 else: 2787 image_processing_utils.write_image(y_plane, file_name) 2788 raise AssertionError('Lights ON in test rig. Turn OFF and retry.') 2789 else: 2790 raise AssertionError('Invalid lighting state string. ' 2791 "Valid strings: 'ON', 'OFF'.") 2792 2793 2794def get_build_sdk_version(device_id): 2795 """Return the int build version of the device.""" 2796 cmd = f'adb -s {device_id} shell getprop ro.build.version.sdk' 2797 try: 2798 build_sdk_version = int(subprocess.check_output(cmd.split()).rstrip()) 2799 logging.debug('Build SDK version: %d', build_sdk_version) 2800 except (subprocess.CalledProcessError, ValueError) as exp_errors: 2801 raise AssertionError('No build_sdk_version.') from exp_errors 2802 return build_sdk_version 2803 2804 2805def get_first_api_level(device_id): 2806 """Return the int value for the first API level of the device.""" 2807 cmd = f'adb -s {device_id} shell getprop ro.product.first_api_level' 2808 try: 2809 first_api_level = int(subprocess.check_output(cmd.split()).rstrip()) 2810 logging.debug('First API level: %d', first_api_level) 2811 except (subprocess.CalledProcessError, ValueError): 2812 logging.error('No first_api_level. 
Setting to build version.') 2813 first_api_level = get_build_sdk_version(device_id) 2814 return first_api_level 2815 2816 2817def get_vendor_api_level(device_id): 2818 """Return the int value for the vendor API level of the device.""" 2819 cmd = f'adb -s {device_id} shell getprop ro.vendor.api_level' 2820 try: 2821 vendor_api_level = int(subprocess.check_output(cmd.split()).rstrip()) 2822 logging.debug('First vendor API level: %d', vendor_api_level) 2823 except (subprocess.CalledProcessError, ValueError): 2824 logging.error('No vendor_api_level. Setting to build version.') 2825 vendor_api_level = get_build_sdk_version(device_id) 2826 return vendor_api_level 2827 2828 2829def get_media_performance_class(device_id): 2830 """Return the int value for the media performance class of the device.""" 2831 cmd = (f'adb -s {device_id} shell ' 2832 'getprop ro.odm.build.media_performance_class') 2833 try: 2834 media_performance_class = int( 2835 subprocess.check_output(cmd.split()).rstrip()) 2836 logging.debug('Media performance class: %d', media_performance_class) 2837 except (subprocess.CalledProcessError, ValueError): 2838 logging.debug('No media performance class. Setting to 0.') 2839 media_performance_class = 0 2840 return media_performance_class 2841 2842 2843def raise_mpc_assertion_error(required_mpc, test_name, found_mpc): 2844 raise AssertionError(f'With MPC >= {required_mpc}, {test_name} must be run. ' 2845 f'Found MPC: {found_mpc}') 2846 2847 2848def stop_video_playback(tablet): 2849 """Force-stop activities used for video playback on the tablet. 2850 2851 Args: 2852 tablet: a controller object for the ITS tablet. 2853 """ 2854 try: 2855 activities_unencoded = tablet.adb.shell( 2856 ['dumpsys', 'activity', 'recents', '|', 2857 'grep', '"baseIntent=Intent.*act=android.intent.action"'] 2858 ) 2859 except adb.AdbError as e: 2860 logging.warning('ADB error when finding intent activities: %s. 
' 2861 'Please close the default video player manually.', e) 2862 return 2863 activity_lines = ( 2864 str(activities_unencoded.decode('utf-8')).strip().splitlines() 2865 ) 2866 for activity_line in activity_lines: 2867 activity = activity_line.split('cmp=')[-1].split('/')[0] 2868 try: 2869 tablet.adb.shell(['am', 'force-stop', activity]) 2870 except adb.AdbError as e: 2871 logging.warning('ADB error when killing intent activity %s: %s. ' 2872 'Please close the default video player manually.', 2873 activity, e) 2874 2875 2876def raise_not_yet_mandated_error(message, api_level, mandated_api_level): 2877 if api_level >= mandated_api_level: 2878 raise AssertionError( 2879 f'Test is mandated for API level {mandated_api_level} or above. ' 2880 f'Found API level {api_level}.\n\n{message}' 2881 ) 2882 else: 2883 raise AssertionError(f'{NOT_YET_MANDATED_MESSAGE}\n\n{message}') 2884 2885 2886def pull_file_from_dut(dut, dut_path, log_folder): 2887 """Pulls and returns file from dut and return file name. 2888 2889 Args: 2890 dut: device under test 2891 dut_path: pull file from this path 2892 log_folder: store pulled file to this folder 2893 2894 Returns: 2895 filename of file pulled from dut 2896 """ 2897 dut.adb.pull([dut_path, log_folder]) 2898 file_name = (dut_path.split('/')[-1]) 2899 logging.debug('%s pulled from dut', file_name) 2900 return file_name 2901 2902 2903def remove_tmp_files(log_path, match_pattern): 2904 """Remove temp file with given directory path. 
2905 2906 Args: 2907 log_path: path-like object, path of directory 2908 match_pattern: string, pattern to be matched and removed 2909 2910 Returns: 2911 List of error messages if encountering error while removing files 2912 """ 2913 temp_files = [] 2914 try: 2915 temp_files = os.listdir(log_path) 2916 except FileNotFoundError: 2917 logging.debug('/tmp directory: %s not found', log_path) 2918 for file in temp_files: 2919 if fnmatch.fnmatch(file, match_pattern): 2920 file_to_remove = os.path.join(log_path, file) 2921 try: 2922 os.remove(file_to_remove) 2923 except FileNotFoundError: 2924 logging.debug('File not found: %s', str(file)) 2925 2926 2927def remove_frame_files(dir_name, save_files_list=None): 2928 """Removes the generated frame files from test dir. 2929 2930 Args: 2931 dir_name: test directory name. 2932 save_files_list: list of files not to be removed. Default is empty list. 2933 """ 2934 if os.path.exists(dir_name): 2935 for image in glob.glob('%s/*.png' % dir_name): 2936 if save_files_list is None or image not in save_files_list: 2937 os.remove(image) 2938 2939 2940def remove_file(file_name_with_path): 2941 """Removes file at given path. 2942 2943 Args: 2944 file_name_with_path: string, filename with path. 2945 """ 2946 remove_mp4_file(file_name_with_path) 2947 2948 2949def remove_mp4_file(file_name_with_path): 2950 """Removes the mp4 file at given path. 2951 2952 Args: 2953 file_name_with_path: string, path to mp4 recording. 2954 """ 2955 try: 2956 os.remove(file_name_with_path) 2957 except FileNotFoundError: 2958 logging.debug('File not found: %s', file_name_with_path) 2959 2960 2961def check_and_update_features_tested( 2962 features_tested, hlg10, is_stabilized): 2963 """Check if the [hlg10, is_stabilized] combination is already tested. 
2964 2965 Args: 2966 features_tested: The list of feature combinations already tested 2967 hlg10: boolean; Whether HLG10 is enabled 2968 is_stabilized: boolean; Whether preview stabilizatoin is enabled 2969 2970 Returns: 2971 Whether the [hlg10, is_stabilized] is already tested. 2972 """ 2973 feature_mask = 0 2974 if hlg10: feature_mask |= _BIT_HLG10 2975 if is_stabilized: feature_mask |= _BIT_STABILIZATION 2976 tested = False 2977 for tested_feature in features_tested: 2978 # Only test a combination if they aren't already a subset 2979 # of another tested combination. 2980 if (tested_feature | feature_mask) == tested_feature: 2981 tested = True 2982 break 2983 2984 if not tested: 2985 features_tested.append(feature_mask) 2986 2987 return tested 2988