1# Copyright 2013 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Utility functions to form an ItsSession and perform various camera actions. 15""" 16 17 18import collections 19import json 20import logging 21import math 22import os 23import socket 24import subprocess 25import sys 26import time 27import unicodedata 28import unittest 29 30import numpy 31 32import camera_properties_utils 33import capture_request_utils 34import error_util 35import image_processing_utils 36import opencv_processing_utils 37 38LOAD_SCENE_DELAY_SEC = 3 39SUB_CAMERA_SEPARATOR = '.' 40_VALIDATE_LIGHTING_PATCH_H = 0.05 41_VALIDATE_LIGHTING_PATCH_W = 0.05 42_VALIDATE_LIGHTING_REGIONS = { 43 'top-left': (0, 0), 44 'top-right': (0, 1-_VALIDATE_LIGHTING_PATCH_H), 45 'bottom-left': (1-_VALIDATE_LIGHTING_PATCH_W, 0), 46 'bottom-right': (1-_VALIDATE_LIGHTING_PATCH_W, 47 1-_VALIDATE_LIGHTING_PATCH_H), 48} 49_VALIDATE_LIGHTING_THRESH = 0.05 # Determined empirically from scene[1:6] tests 50 51 52class ItsSession(object): 53 """Controls a device over adb to run ITS scripts. 54 55 The script importing this module (on the host machine) prepares JSON 56 objects encoding CaptureRequests, specifying sets of parameters to use 57 when capturing an image using the Camera2 APIs. This class encapsulates 58 sending the requests to the device, monitoring the device's progress, and 59 copying the resultant captures back to the host machine when done. TCP 60 forwarded over adb is the transport mechanism used. 61 62 The device must have CtsVerifier.apk installed. 63 64 Attributes: 65 sock: The open socket. 66 """ 67 68 # Open a connection to localhost:<host_port>, forwarded to port 6000 on the 69 # device. <host_port> is determined at run-time to support multiple 70 # connected devices. 71 IPADDR = '127.0.0.1' 72 REMOTE_PORT = 6000 73 BUFFER_SIZE = 4096 74 75 # LOCK_PORT is used as a mutex lock to protect the list of forwarded ports 76 # among all processes. The script assumes LOCK_PORT is available and will 77 # try to use ports between CLIENT_PORT_START and 78 # CLIENT_PORT_START+MAX_NUM_PORTS-1 on host for ITS sessions. 79 CLIENT_PORT_START = 6000 80 MAX_NUM_PORTS = 100 81 LOCK_PORT = CLIENT_PORT_START + MAX_NUM_PORTS 82 83 # Seconds timeout on each socket operation. 84 SOCK_TIMEOUT = 20.0 85 # Additional timeout in seconds when ITS service is doing more complicated 86 # operations, for example: issuing warmup requests before actual capture. 87 EXTRA_SOCK_TIMEOUT = 5.0 88 89 PACKAGE = 'com.android.cts.verifier.camera.its' 90 INTENT_START = 'com.android.cts.verifier.camera.its.START' 91 92 # This string must be in sync with ItsService. Updated when interface 93 # between script and ItsService is changed. 94 ITS_SERVICE_VERSION = '1.0' 95 96 SEC_TO_NSEC = 1000*1000*1000.0 97 adb = 'adb -d' 98 99 # Predefine camera props. Save props extracted from the function, 100 # "get_camera_properties". 101 props = None 102 103 IMAGE_FORMAT_LIST_1 = [ 104 'jpegImage', 'rawImage', 'raw10Image', 'raw12Image', 'rawStatsImage', 105 'dngImage', 'y8Image' 106 ] 107 108 IMAGE_FORMAT_LIST_2 = [ 109 'jpegImage', 'rawImage', 'raw10Image', 'raw12Image', 'rawStatsImage', 110 'yuvImage' 111 ] 112 113 CAP_JPEG = {'format': 'jpeg'} 114 CAP_RAW = {'format': 'raw'} 115 CAP_YUV = {'format': 'yuv'} 116 CAP_RAW_YUV = [{'format': 'raw'}, {'format': 'yuv'}] 117 118 def __init_socket_port(self): 119 """Initialize the socket port for the host to forward requests to the device. 120 121 This method assumes localhost's LOCK_PORT is available and will try to 122 use ports between CLIENT_PORT_START and CLIENT_PORT_START+MAX_NUM_PORTS-1 123 """ 124 num_retries = 100 125 retry_wait_time_sec = 0.05 126 127 # Bind a socket to use as mutex lock 128 socket_lock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 129 for i in range(num_retries): 130 try: 131 socket_lock.bind((ItsSession.IPADDR, ItsSession.LOCK_PORT)) 132 break 133 except (socket.error, socket.timeout): 134 if i == num_retries - 1: 135 raise error_util.CameraItsError(self._device_id, 136 'socket lock returns error') 137 else: 138 time.sleep(retry_wait_time_sec) 139 140 # Check if a port is already assigned to the device. 141 command = 'adb forward --list' 142 proc = subprocess.Popen(command.split(), stdout=subprocess.PIPE) 143 # pylint: disable=unused-variable 144 output, error = proc.communicate() 145 port = None 146 used_ports = [] 147 for line in output.decode('utf-8').split(os.linesep): 148 # each line should be formatted as: 149 # "<device_id> tcp:<host_port> tcp:<remote_port>" 150 forward_info = line.split() 151 if len(forward_info) >= 3 and len( 152 forward_info[1]) > 4 and forward_info[1][:4] == 'tcp:' and len( 153 forward_info[2]) > 4 and forward_info[2][:4] == 'tcp:': 154 local_p = int(forward_info[1][4:]) 155 remote_p = int(forward_info[2][4:]) 156 if forward_info[ 157 0] == self._device_id and remote_p == ItsSession.REMOTE_PORT: 158 port = local_p 159 break 160 else: 161 used_ports.append(local_p) 162 163 # Find the first available port if no port is assigned to the device. 164 if port is None: 165 for p in range(ItsSession.CLIENT_PORT_START, 166 ItsSession.CLIENT_PORT_START + ItsSession.MAX_NUM_PORTS): 167 if self.check_port_availability(p, used_ports): 168 port = p 169 break 170 171 if port is None: 172 raise error_util.CameraItsError(self._device_id, 173 ' cannot find an available ' + 'port') 174 175 # Release the socket as mutex unlock 176 socket_lock.close() 177 178 # Connect to the socket 179 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 180 self.sock.connect((self.IPADDR, port)) 181 self.sock.settimeout(self.SOCK_TIMEOUT) 182 183 def check_port_availability(self, check_port, used_ports): 184 """Check if the port is available or not. 185 186 Args: 187 check_port: Port to check for availability 188 used_ports: List of used ports 189 190 Returns: 191 True if the given port is available and can be assigned to the device. 192 """ 193 if check_port not in used_ports: 194 # Try to run "adb forward" with the port 195 command = '%s forward tcp:%d tcp:%d' % \ 196 (self.adb, check_port, self.REMOTE_PORT) 197 proc = subprocess.Popen( 198 command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) 199 error = proc.communicate()[1] 200 201 # Check if there is no error 202 if error is None or error.find('error'.encode()) < 0: 203 return True 204 else: 205 return False 206 207 def __wait_for_service(self): 208 """Wait for ItsService to be ready and reboot the device if needed. 209 210 This also includes the optional reboot handling: if the user 211 provides a "reboot" or "reboot=N" arg, then reboot the device, 212 waiting for N seconds (default 30) before returning. 213 """ 214 215 for s in sys.argv[1:]: 216 if s[:6] == 'reboot': 217 duration = 30 218 if len(s) > 7 and s[6] == '=': 219 duration = int(s[7:]) 220 logging.debug('Rebooting device') 221 _run('%s reboot' % (self.adb)) 222 _run('%s wait-for-device' % (self.adb)) 223 time.sleep(duration) 224 logging.debug('Reboot complete') 225 226 # Flush logcat so following code won't be misled by previous 227 # 'ItsService ready' log. 228 _run('%s logcat -c' % (self.adb)) 229 time.sleep(1) 230 231 _run('%s shell am force-stop --user 0 %s' % (self.adb, self.PACKAGE)) 232 _run(('%s shell am start-foreground-service --user 0 -t text/plain ' 233 '-a %s') % (self.adb, self.INTENT_START)) 234 235 # Wait until the socket is ready to accept a connection. 236 proc = subprocess.Popen( 237 self.adb.split() + ['logcat'], stdout=subprocess.PIPE) 238 logcat = proc.stdout 239 while True: 240 line = logcat.readline().strip() 241 if line.find(b'ItsService ready') >= 0: 242 break 243 proc.kill() 244 proc.communicate() 245 246 def __init__(self, device_id=None, camera_id=None, hidden_physical_id=None): 247 self._camera_id = camera_id 248 self._device_id = device_id 249 self._hidden_physical_id = hidden_physical_id 250 251 # Initialize device id and adb command. 252 self.adb = 'adb -s ' + self._device_id 253 self.__wait_for_service() 254 self.__init_socket_port() 255 256 def __enter__(self): 257 self.__close_camera() 258 self.__open_camera() 259 return self 260 261 def __exit__(self, exec_type, exec_value, exec_traceback): 262 if hasattr(self, 'sock') and self.sock: 263 self.__close_camera() 264 self.sock.close() 265 return False 266 267 def override_with_hidden_physical_camera_props(self, props): 268 """Check that it is a valid sub-camera backing the logical camera. 269 270 If current session is for a hidden physical camera, check that it is a valid 271 sub-camera backing the logical camera, override self.props, and return the 272 characteristics of sub-camera. Otherwise, return "props" directly. 273 274 Args: 275 props: Camera properties object. 276 277 Returns: 278 The properties of the hidden physical camera if possible. 279 """ 280 if self._hidden_physical_id: 281 if not camera_properties_utils.logical_multi_camera(props): 282 raise AssertionError(f'{self._camera_id} is not a logical multi-camera') 283 physical_ids = camera_properties_utils.logical_multi_camera_physical_ids( 284 props) 285 if self._hidden_physical_id not in physical_ids: 286 raise AssertionError(f'{self._hidden_physical_id} is not a hidden ' 287 f'sub-camera of {self._camera_id}') 288 props = self.get_camera_properties_by_id(self._hidden_physical_id) 289 self.props = props 290 return props 291 292 def get_camera_properties(self): 293 """Get the camera properties object for the device. 294 295 Returns: 296 The Python dictionary object for the CameraProperties object. 297 """ 298 cmd = {} 299 cmd['cmdName'] = 'getCameraProperties' 300 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 301 data, _ = self.__read_response_from_socket() 302 if data['tag'] != 'cameraProperties': 303 raise error_util.CameraItsError('Invalid command response') 304 self.props = data['objValue']['cameraProperties'] 305 return data['objValue']['cameraProperties'] 306 307 def get_camera_properties_by_id(self, camera_id): 308 """Get the camera properties object for device with camera_id. 309 310 Args: 311 camera_id: The ID string of the camera 312 313 Returns: 314 The Python dictionary object for the CameraProperties object. Empty 315 if no such device exists. 316 """ 317 cmd = {} 318 cmd['cmdName'] = 'getCameraPropertiesById' 319 cmd['cameraId'] = camera_id 320 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 321 data, _ = self.__read_response_from_socket() 322 if data['tag'] != 'cameraProperties': 323 raise error_util.CameraItsError('Invalid command response') 324 return data['objValue']['cameraProperties'] 325 326 def __read_response_from_socket(self): 327 """Reads a line (newline-terminated) string serialization of JSON object. 328 329 Returns: 330 Deserialized json obj. 331 """ 332 chars = [] 333 while not chars or chars[-1] != '\n': 334 ch = self.sock.recv(1).decode('utf-8') 335 if not ch: 336 # Socket was probably closed; otherwise don't get empty strings 337 raise error_util.CameraItsError('Problem with socket on device side') 338 chars.append(ch) 339 line = ''.join(chars) 340 jobj = json.loads(line) 341 # Optionally read a binary buffer of a fixed size. 342 buf = None 343 if 'bufValueSize' in jobj: 344 n = jobj['bufValueSize'] 345 buf = bytearray(n) 346 view = memoryview(buf) 347 while n > 0: 348 nbytes = self.sock.recv_into(view, n) 349 view = view[nbytes:] 350 n -= nbytes 351 buf = numpy.frombuffer(buf, dtype=numpy.uint8) 352 return jobj, buf 353 354 def __open_camera(self): 355 """Get the camera ID to open if it is an argument as a single camera. 356 357 This allows passing camera=# to individual tests at command line 358 and camera=#,#,# or an no camera argv with tools/run_all_tests.py. 359 In case the camera is a logical multi-camera, to run ITS on the 360 hidden physical sub-camera, pass camera=[logical ID]:[physical ID] 361 to an individual test at the command line, and same applies to multiple 362 camera IDs for tools/run_all_tests.py: camera=#,#:#,#:#,# 363 """ 364 if not self._camera_id: 365 self._camera_id = 0 366 for s in sys.argv[1:]: 367 if s[:7] == 'camera=' and len(s) > 7: 368 camera_ids = s[7:].split(',') 369 camera_id_combos = parse_camera_ids(camera_ids) 370 if len(camera_id_combos) == 1: 371 self._camera_id = camera_id_combos[0].id 372 self._hidden_physical_id = camera_id_combos[0].sub_id 373 374 logging.debug('Opening camera: %s', self._camera_id) 375 cmd = {'cmdName': 'open', 'cameraId': self._camera_id} 376 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 377 data, _ = self.__read_response_from_socket() 378 if data['tag'] != 'cameraOpened': 379 raise error_util.CameraItsError('Invalid command response') 380 381 def __close_camera(self): 382 cmd = {'cmdName': 'close'} 383 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 384 data, _ = self.__read_response_from_socket() 385 if data['tag'] != 'cameraClosed': 386 raise error_util.CameraItsError('Invalid command response') 387 388 def get_sensors(self): 389 """Get all sensors on the device. 390 391 Returns: 392 A Python dictionary that returns keys and booleans for each sensor. 393 """ 394 cmd = {} 395 cmd['cmdName'] = 'checkSensorExistence' 396 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 397 data, _ = self.__read_response_from_socket() 398 if data['tag'] != 'sensorExistence': 399 raise error_util.CameraItsError('Invalid response for command: %s' % 400 cmd['cmdName']) 401 return data['objValue'] 402 403 def start_sensor_events(self): 404 """Start collecting sensor events on the device. 405 406 See get_sensor_events for more info. 407 408 Returns: 409 Nothing. 410 """ 411 cmd = {} 412 cmd['cmdName'] = 'startSensorEvents' 413 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 414 data, _ = self.__read_response_from_socket() 415 if data['tag'] != 'sensorEventsStarted': 416 raise error_util.CameraItsError('Invalid response for command: %s' % 417 cmd['cmdName']) 418 419 def get_sensor_events(self): 420 """Get a trace of all sensor events on the device. 421 422 The trace starts when the start_sensor_events function is called. If 423 the test runs for a long time after this call, then the device's 424 internal memory can fill up. Calling get_sensor_events gets all events 425 from the device, and then stops the device from collecting events and 426 clears the internal buffer; to start again, the start_sensor_events 427 call must be used again. 428 429 Events from the accelerometer, compass, and gyro are returned; each 430 has a timestamp and x,y,z values. 431 432 Note that sensor events are only produced if the device isn't in its 433 standby mode (i.e.) if the screen is on. 434 435 Returns: 436 A Python dictionary with three keys ("accel", "mag", "gyro") each 437 of which maps to a list of objects containing "time","x","y","z" 438 keys. 439 """ 440 cmd = {} 441 cmd['cmdName'] = 'getSensorEvents' 442 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 443 timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT 444 self.sock.settimeout(timeout) 445 data, _ = self.__read_response_from_socket() 446 if data['tag'] != 'sensorEvents': 447 raise error_util.CameraItsError('Invalid response for command: %s ' % 448 cmd['cmdName']) 449 self.sock.settimeout(self.SOCK_TIMEOUT) 450 return data['objValue'] 451 452 def do_capture(self, 453 cap_request, 454 out_surfaces=None, 455 reprocess_format=None, 456 repeat_request=None): 457 """Issue capture request(s), and read back the image(s) and metadata. 458 459 The main top-level function for capturing one or more images using the 460 device. Captures a single image if cap_request is a single object, and 461 captures a burst if it is a list of objects. 462 463 The optional repeat_request field can be used to assign a repeating 464 request list ran in background for 3 seconds to warm up the capturing 465 pipeline before start capturing. The repeat_requests will be ran on a 466 640x480 YUV surface without sending any data back. The caller needs to 467 make sure the stream configuration defined by out_surfaces and 468 repeat_request are valid or do_capture may fail because device does not 469 support such stream configuration. 470 471 The out_surfaces field can specify the width(s), height(s), and 472 format(s) of the captured image. The formats may be "yuv", "jpeg", 473 "dng", "raw", "raw10", "raw12", "rawStats" or "y8". The default is a 474 YUV420 frame ("yuv") corresponding to a full sensor frame. 475 476 Optionally the out_surfaces field can specify physical camera id(s) if 477 the current camera device is a logical multi-camera. The physical camera 478 id must refer to a physical camera backing this logical camera device. 479 480 Note that one or more surfaces can be specified, allowing a capture to 481 request images back in multiple formats (e.g.) raw+yuv, raw+jpeg, 482 yuv+jpeg, raw+yuv+jpeg. If the size is omitted for a surface, the 483 default is the largest resolution available for the format of that 484 surface. At most one output surface can be specified for a given format, 485 and raw+dng, raw10+dng, and raw+raw10 are not supported as combinations. 486 487 If reprocess_format is not None, for each request, an intermediate 488 buffer of the given reprocess_format will be captured from camera and 489 the intermediate buffer will be reprocessed to the output surfaces. The 490 following settings will be turned off when capturing the intermediate 491 buffer and will be applied when reprocessing the intermediate buffer. 492 1. android.noiseReduction.mode 493 2. android.edge.mode 494 3. android.reprocess.effectiveExposureFactor 495 496 Supported reprocess format are "yuv" and "private". Supported output 497 surface formats when reprocessing is enabled are "yuv" and "jpeg". 498 499 Example of a single capture request: 500 501 { 502 "android.sensor.exposureTime": 100*1000*1000, 503 "android.sensor.sensitivity": 100 504 } 505 506 Example of a list of capture requests: 507 [ 508 { 509 "android.sensor.exposureTime": 100*1000*1000, 510 "android.sensor.sensitivity": 100 511 }, 512 { 513 "android.sensor.exposureTime": 100*1000*1000, 514 "android.sensor.sensitivity": 200 515 } 516 ] 517 518 Example of output surface specifications: 519 { 520 "width": 640, 521 "height": 480, 522 "format": "yuv" 523 } 524 [ 525 { 526 "format": "jpeg" 527 }, 528 { 529 "format": "raw" 530 } 531 ] 532 533 The following variables defined in this class are shortcuts for 534 specifying one or more formats where each output is the full size for 535 that format; they can be used as values for the out_surfaces arguments: 536 537 CAP_RAW 538 CAP_DNG 539 CAP_YUV 540 CAP_JPEG 541 CAP_RAW_YUV 542 CAP_DNG_YUV 543 CAP_RAW_JPEG 544 CAP_DNG_JPEG 545 CAP_YUV_JPEG 546 CAP_RAW_YUV_JPEG 547 CAP_DNG_YUV_JPEG 548 549 If multiple formats are specified, then this function returns multiple 550 capture objects, one for each requested format. If multiple formats and 551 multiple captures (i.e. a burst) are specified, then this function 552 returns multiple lists of capture objects. In both cases, the order of 553 the returned objects matches the order of the requested formats in the 554 out_surfaces parameter. For example: 555 556 yuv_cap = do_capture(req1) 557 yuv_cap = do_capture(req1,yuv_fmt) 558 yuv_cap, raw_cap = do_capture(req1, [yuv_fmt,raw_fmt]) 559 yuv_caps = do_capture([req1,req2], yuv_fmt) 560 yuv_caps, raw_caps = do_capture([req1,req2], [yuv_fmt,raw_fmt]) 561 562 The "rawStats" format processes the raw image and returns a new image 563 of statistics from the raw image. The format takes additional keys, 564 "gridWidth" and "gridHeight" which are size of grid cells in a 2D grid 565 of the raw image. For each grid cell, the mean and variance of each raw 566 channel is computed, and the do_capture call returns two 4-element float 567 images of dimensions (rawWidth / gridWidth, rawHeight / gridHeight), 568 concatenated back-to-back, where the first image contains the 4-channel 569 means and the second contains the 4-channel variances. Note that only 570 pixels in the active array crop region are used; pixels outside this 571 region (for example optical black rows) are cropped out before the 572 gridding and statistics computation is performed. 573 574 For the rawStats format, if the gridWidth is not provided then the raw 575 image width is used as the default, and similarly for gridHeight. With 576 this, the following is an example of a output description that computes 577 the mean and variance across each image row: 578 { 579 "gridHeight": 1, 580 "format": "rawStats" 581 } 582 583 Args: 584 cap_request: The Python dict/list specifying the capture(s), which will be 585 converted to JSON and sent to the device. 586 out_surfaces: (Optional) specifications of the output image formats and 587 sizes to use for each capture. 588 reprocess_format: (Optional) The reprocessing format. If not 589 None,reprocessing will be enabled. 590 repeat_request: Repeating request list. 591 592 Returns: 593 An object, list of objects, or list of lists of objects, where each 594 object contains the following fields: 595 * data: the image data as a numpy array of bytes. 596 * width: the width of the captured image. 597 * height: the height of the captured image. 598 * format: image the format, in [ 599 "yuv","jpeg","raw","raw10","raw12","rawStats","dng"]. 600 * metadata: the capture result object (Python dictionary). 601 """ 602 cmd = {} 603 if reprocess_format is not None: 604 if repeat_request is not None: 605 raise error_util.CameraItsError( 606 'repeating request + reprocessing is not supported') 607 cmd['cmdName'] = 'doReprocessCapture' 608 cmd['reprocessFormat'] = reprocess_format 609 else: 610 cmd['cmdName'] = 'doCapture' 611 612 if repeat_request is None: 613 cmd['repeatRequests'] = [] 614 elif not isinstance(repeat_request, list): 615 cmd['repeatRequests'] = [repeat_request] 616 else: 617 cmd['repeatRequests'] = repeat_request 618 619 if not isinstance(cap_request, list): 620 cmd['captureRequests'] = [cap_request] 621 else: 622 cmd['captureRequests'] = cap_request 623 624 if out_surfaces is not None: 625 if not isinstance(out_surfaces, list): 626 cmd['outputSurfaces'] = [out_surfaces] 627 else: 628 cmd['outputSurfaces'] = out_surfaces 629 formats = [ 630 c['format'] if 'format' in c else 'yuv' for c in cmd['outputSurfaces'] 631 ] 632 formats = [s if s != 'jpg' else 'jpeg' for s in formats] 633 else: 634 max_yuv_size = capture_request_utils.get_available_output_sizes( 635 'yuv', self.props)[0] 636 formats = ['yuv'] 637 cmd['outputSurfaces'] = [{ 638 'format': 'yuv', 639 'width': max_yuv_size[0], 640 'height': max_yuv_size[1] 641 }] 642 643 ncap = len(cmd['captureRequests']) 644 nsurf = 1 if out_surfaces is None else len(cmd['outputSurfaces']) 645 646 cam_ids = [] 647 bufs = {} 648 yuv_bufs = {} 649 for i, s in enumerate(cmd['outputSurfaces']): 650 if self._hidden_physical_id: 651 s['physicalCamera'] = self._hidden_physical_id 652 653 if 'physicalCamera' in s: 654 cam_id = s['physicalCamera'] 655 else: 656 cam_id = self._camera_id 657 658 if cam_id not in cam_ids: 659 cam_ids.append(cam_id) 660 bufs[cam_id] = { 661 'raw': [], 662 'raw10': [], 663 'raw12': [], 664 'rawStats': [], 665 'dng': [], 666 'jpeg': [], 667 'y8': [] 668 } 669 670 for cam_id in cam_ids: 671 # Only allow yuv output to multiple targets 672 if cam_id == self._camera_id: 673 yuv_surfaces = [ 674 s for s in cmd['outputSurfaces'] 675 if s['format'] == 'yuv' and 'physicalCamera' not in s 676 ] 677 formats_for_id = [ 678 s['format'] 679 for s in cmd['outputSurfaces'] 680 if 'physicalCamera' not in s 681 ] 682 else: 683 yuv_surfaces = [ 684 s for s in cmd['outputSurfaces'] if s['format'] == 'yuv' and 685 'physicalCamera' in s and s['physicalCamera'] == cam_id 686 ] 687 formats_for_id = [ 688 s['format'] 689 for s in cmd['outputSurfaces'] 690 if 'physicalCamera' in s and s['physicalCamera'] == cam_id 691 ] 692 693 n_yuv = len(yuv_surfaces) 694 # Compute the buffer size of YUV targets 695 yuv_maxsize_1d = 0 696 for s in yuv_surfaces: 697 if ('width' not in s and 'height' not in s): 698 if self.props is None: 699 raise error_util.CameraItsError('Camera props are unavailable') 700 yuv_maxsize_2d = capture_request_utils.get_available_output_sizes( 701 'yuv', self.props)[0] 702 # YUV420 size = 1.5 bytes per pixel 703 yuv_maxsize_1d = (yuv_maxsize_2d[0] * yuv_maxsize_2d[1] * 3) // 2 704 break 705 yuv_sizes = [ 706 (c['width'] * c['height'] * 3) // 2 707 if 'width' in c and 'height' in c else yuv_maxsize_1d 708 for c in yuv_surfaces 709 ] 710 # Currently we don't pass enough metadta from ItsService to distinguish 711 # different yuv stream of same buffer size 712 if len(yuv_sizes) != len(set(yuv_sizes)): 713 raise error_util.CameraItsError( 714 'ITS does not support yuv outputs of same buffer size') 715 if len(formats_for_id) > len(set(formats_for_id)): 716 if n_yuv != len(formats_for_id) - len(set(formats_for_id)) + 1: 717 raise error_util.CameraItsError('Duplicate format requested') 718 719 yuv_bufs[cam_id] = {size: [] for size in yuv_sizes} 720 721 raw_formats = 0 722 raw_formats += 1 if 'dng' in formats else 0 723 raw_formats += 1 if 'raw' in formats else 0 724 raw_formats += 1 if 'raw10' in formats else 0 725 raw_formats += 1 if 'raw12' in formats else 0 726 raw_formats += 1 if 'rawStats' in formats else 0 727 if raw_formats > 1: 728 raise error_util.CameraItsError('Different raw formats not supported') 729 730 # Detect long exposure time and set timeout accordingly 731 longest_exp_time = 0 732 for req in cmd['captureRequests']: 733 if 'android.sensor.exposureTime' in req and req[ 734 'android.sensor.exposureTime'] > longest_exp_time: 735 longest_exp_time = req['android.sensor.exposureTime'] 736 737 extended_timeout = longest_exp_time // self.SEC_TO_NSEC + self.SOCK_TIMEOUT 738 if repeat_request: 739 extended_timeout += self.EXTRA_SOCK_TIMEOUT 740 self.sock.settimeout(extended_timeout) 741 742 logging.debug('Capturing %d frame%s with %d format%s [%s]', ncap, 743 's' if ncap > 1 else '', nsurf, 's' if nsurf > 1 else '', 744 ','.join(formats)) 745 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 746 747 # Wait for ncap*nsurf images and ncap metadata responses. 748 # Assume that captures come out in the same order as requested in 749 # the burst, however individual images of different formats can come 750 # out in any order for that capture. 751 nbufs = 0 752 mds = [] 753 physical_mds = [] 754 widths = None 755 heights = None 756 while nbufs < ncap * nsurf or len(mds) < ncap: 757 json_obj, buf = self.__read_response_from_socket() 758 if json_obj['tag'] in ItsSession.IMAGE_FORMAT_LIST_1 and buf is not None: 759 fmt = json_obj['tag'][:-5] 760 bufs[self._camera_id][fmt].append(buf) 761 nbufs += 1 762 elif json_obj['tag'] == 'yuvImage': 763 buf_size = numpy.product(buf.shape) 764 yuv_bufs[self._camera_id][buf_size].append(buf) 765 nbufs += 1 766 elif json_obj['tag'] == 'captureResults': 767 mds.append(json_obj['objValue']['captureResult']) 768 physical_mds.append(json_obj['objValue']['physicalResults']) 769 outputs = json_obj['objValue']['outputs'] 770 widths = [out['width'] for out in outputs] 771 heights = [out['height'] for out in outputs] 772 else: 773 tag_string = unicodedata.normalize('NFKD', json_obj['tag']).encode( 774 'ascii', 'ignore') 775 for x in ItsSession.IMAGE_FORMAT_LIST_2: 776 x = bytes(x, encoding='utf-8') 777 if tag_string.startswith(x): 778 if x == b'yuvImage': 779 physical_id = json_obj['tag'][len(x):] 780 if physical_id in cam_ids: 781 buf_size = numpy.product(buf.shape) 782 yuv_bufs[physical_id][buf_size].append(buf) 783 nbufs += 1 784 else: 785 physical_id = json_obj['tag'][len(x):] 786 if physical_id in cam_ids: 787 fmt = x[:-5].decode('UTF-8') 788 bufs[physical_id][fmt].append(buf) 789 nbufs += 1 790 rets = [] 791 for j, fmt in enumerate(formats): 792 objs = [] 793 if 'physicalCamera' in cmd['outputSurfaces'][j]: 794 cam_id = cmd['outputSurfaces'][j]['physicalCamera'] 795 else: 796 cam_id = self._camera_id 797 798 for i in range(ncap): 799 obj = {} 800 obj['width'] = widths[j] 801 obj['height'] = heights[j] 802 obj['format'] = fmt 803 if cam_id == self._camera_id: 804 obj['metadata'] = mds[i] 805 else: 806 for physical_md in physical_mds[i]: 807 if cam_id in physical_md: 808 obj['metadata'] = physical_md[cam_id] 809 break 810 811 if fmt == 'yuv': 812 buf_size = (widths[j] * heights[j] * 3) // 2 813 obj['data'] = yuv_bufs[cam_id][buf_size][i] 814 else: 815 obj['data'] = bufs[cam_id][fmt][i] 816 objs.append(obj) 817 rets.append(objs if ncap > 1 else objs[0]) 818 self.sock.settimeout(self.SOCK_TIMEOUT) 819 if len(rets) > 1 or (isinstance(rets[0], dict) and 820 isinstance(cap_request, list)): 821 return rets 822 else: 823 return rets[0] 824 825 def do_vibrate(self, pattern): 826 """Cause the device to vibrate to a specific pattern. 827 828 Args: 829 pattern: Durations (ms) for which to turn on or off the vibrator. 830 The first value indicates the number of milliseconds to wait 831 before turning the vibrator on. The next value indicates the 832 number of milliseconds for which to keep the vibrator on 833 before turning it off. Subsequent values alternate between 834 durations in milliseconds to turn the vibrator off or to turn 835 the vibrator on. 836 837 Returns: 838 Nothing. 839 """ 840 cmd = {} 841 cmd['cmdName'] = 'doVibrate' 842 cmd['pattern'] = pattern 843 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 844 data, _ = self.__read_response_from_socket() 845 if data['tag'] != 'vibrationStarted': 846 raise error_util.CameraItsError('Invalid response for command: %s' % 847 cmd['cmdName']) 848 849 def set_audio_restriction(self, mode): 850 """Set the audio restriction mode for this camera device. 851 852 Args: 853 mode: int; the audio restriction mode. See CameraDevice.java for valid 854 value. 855 Returns: 856 Nothing. 857 """ 858 cmd = {} 859 cmd['cmdName'] = 'setAudioRestriction' 860 cmd['mode'] = mode 861 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 862 data, _ = self.__read_response_from_socket() 863 if data['tag'] != 'audioRestrictionSet': 864 raise error_util.CameraItsError('Invalid response for command: %s' % 865 cmd['cmdName']) 866 867 # pylint: disable=dangerous-default-value 868 def do_3a(self, 869 regions_ae=[[0, 0, 1, 1, 1]], 870 regions_awb=[[0, 0, 1, 1, 1]], 871 regions_af=[[0, 0, 1, 1, 1]], 872 do_ae=True, 873 do_awb=True, 874 do_af=True, 875 lock_ae=False, 876 lock_awb=False, 877 get_results=False, 878 ev_comp=0, 879 mono_camera=False): 880 """Perform a 3A operation on the device. 881 882 Triggers some or all of AE, AWB, and AF, and returns once they have 883 converged. Uses the vendor 3A that is implemented inside the HAL. 884 Note: do_awb is always enabled regardless of do_awb flag 885 886 Throws an assertion if 3A fails to converge. 887 888 Args: 889 regions_ae: List of weighted AE regions. 890 regions_awb: List of weighted AWB regions. 891 regions_af: List of weighted AF regions. 892 do_ae: Trigger AE and wait for it to converge. 893 do_awb: Wait for AWB to converge. 894 do_af: Trigger AF and wait for it to converge. 895 lock_ae: Request AE lock after convergence, and wait for it. 896 lock_awb: Request AWB lock after convergence, and wait for it. 897 get_results: Return the 3A results from this function. 898 ev_comp: An EV compensation value to use when running AE. 899 mono_camera: Boolean for monochrome camera. 900 901 Region format in args: 902 Arguments are lists of weighted regions; each weighted region is a 903 list of 5 values, [x, y, w, h, wgt], and each argument is a list of 904 these 5-value lists. The coordinates are given as normalized 905 rectangles (x, y, w, h) specifying the region. For example: 906 [[0.0, 0.0, 1.0, 0.5, 5], [0.0, 0.5, 1.0, 0.5, 10]]. 907 Weights are non-negative integers. 908 909 Returns: 910 Five values are returned if get_results is true: 911 * AE sensitivity; None if do_ae is False 912 * AE exposure time; None if do_ae is False 913 * AWB gains (list); 914 * AWB transform (list); 915 * AF focus position; None if do_af is false 916 Otherwise, it returns five None values. 917 """ 918 logging.debug('Running vendor 3A on device') 919 cmd = {} 920 cmd['cmdName'] = 'do3A' 921 cmd['regions'] = { 922 'ae': sum(regions_ae, []), 923 'awb': sum(regions_awb, []), 924 'af': sum(regions_af, []) 925 } 926 cmd['triggers'] = {'ae': do_ae, 'af': do_af} 927 if lock_ae: 928 cmd['aeLock'] = True 929 if lock_awb: 930 cmd['awbLock'] = True 931 if ev_comp != 0: 932 cmd['evComp'] = ev_comp 933 if self._hidden_physical_id: 934 cmd['physicalId'] = self._hidden_physical_id 935 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 936 937 # Wait for each specified 3A to converge. 938 ae_sens = None 939 ae_exp = None 940 awb_gains = None 941 awb_transform = None 942 af_dist = None 943 converged = False 944 while True: 945 data, _ = self.__read_response_from_socket() 946 vals = data['strValue'].split() 947 if data['tag'] == 'aeResult': 948 if do_ae: 949 ae_sens, ae_exp = [int(i) for i in vals] 950 elif data['tag'] == 'afResult': 951 if do_af: 952 af_dist = float(vals[0]) 953 elif data['tag'] == 'awbResult': 954 awb_gains = [float(f) for f in vals[:4]] 955 awb_transform = [float(f) for f in vals[4:]] 956 elif data['tag'] == '3aConverged': 957 converged = True 958 elif data['tag'] == '3aDone': 959 break 960 else: 961 raise error_util.CameraItsError('Invalid command response') 962 if converged and not get_results: 963 return None, None, None, None, None 964 if (do_ae and ae_sens is None or 965 (not mono_camera and do_awb and awb_gains is None) or 966 do_af and af_dist is None or not converged): 967 raise error_util.CameraItsError('3A failed to converge') 968 return ae_sens, ae_exp, awb_gains, awb_transform, af_dist 969 970 def calc_camera_fov(self, props): 971 """Determine the camera field of view from internal params. 972 973 Args: 974 props: Camera properties object. 975 976 Returns: 977 camera_fov: string; field of view for camera. 978 """ 979 980 focal_ls = props['android.lens.info.availableFocalLengths'] 981 if len(focal_ls) > 1: 982 logging.debug('Doing capture to determine logical camera focal length') 983 cap = self.do_capture(capture_request_utils.auto_capture_request()) 984 focal_l = cap['metadata']['android.lens.focalLength'] 985 else: 986 focal_l = focal_ls[0] 987 988 sensor_size = props['android.sensor.info.physicalSize'] 989 diag = math.sqrt(sensor_size['height']**2 + sensor_size['width']**2) 990 try: 991 fov = str(round(2 * math.degrees(math.atan(diag / (2 * focal_l))), 2)) 992 except ValueError: 993 fov = str(0) 994 logging.debug('Calculated FoV: %s', fov) 995 return fov 996 997 def get_file_name_to_load(self, chart_distance, camera_fov, scene): 998 """Get the image to load on the tablet depending on fov and chart_distance. 999 1000 Args: 1001 chart_distance: float; distance in cm from camera of displayed chart 1002 camera_fov: float; camera field of view. 1003 scene: String; Scene to be used in the test. 1004 1005 Returns: 1006 file_name: file name to display on the tablet. 1007 1008 """ 1009 chart_scaling = opencv_processing_utils.calc_chart_scaling( 1010 chart_distance, camera_fov) 1011 if numpy.isclose( 1012 chart_scaling, 1013 opencv_processing_utils.SCALE_RFOV_IN_WFOV_BOX, 1014 atol=0.01): 1015 file_name = '%s_%sx_scaled.png' % ( 1016 scene, str(opencv_processing_utils.SCALE_RFOV_IN_WFOV_BOX)) 1017 elif numpy.isclose( 1018 chart_scaling, 1019 opencv_processing_utils.SCALE_TELE_IN_WFOV_BOX, 1020 atol=0.01): 1021 file_name = '%s_%sx_scaled.png' % ( 1022 scene, str(opencv_processing_utils.SCALE_TELE_IN_WFOV_BOX)) 1023 elif numpy.isclose( 1024 chart_scaling, 1025 opencv_processing_utils.SCALE_TELE25_IN_RFOV_BOX, 1026 atol=0.01): 1027 file_name = '%s_%sx_scaled.png' % ( 1028 scene, str(opencv_processing_utils.SCALE_TELE25_IN_RFOV_BOX)) 1029 elif numpy.isclose( 1030 chart_scaling, 1031 opencv_processing_utils.SCALE_TELE40_IN_RFOV_BOX, 1032 atol=0.01): 1033 file_name = '%s_%sx_scaled.png' % ( 1034 scene, str(opencv_processing_utils.SCALE_TELE40_IN_RFOV_BOX)) 1035 elif numpy.isclose( 1036 chart_scaling, 1037 opencv_processing_utils.SCALE_TELE_IN_RFOV_BOX, 1038 atol=0.01): 1039 file_name = '%s_%sx_scaled.png' % ( 1040 scene, str(opencv_processing_utils.SCALE_TELE_IN_RFOV_BOX)) 1041 else: 1042 file_name = '%s.png' % scene 1043 logging.debug('Scene to load: %s', file_name) 1044 return file_name 1045 1046 def is_stream_combination_supported(self, out_surfaces): 1047 """Query whether out_surfaces combination is supported by the camera device. 1048 1049 This function hooks up to the isSessionConfigurationSupported() camera API 1050 to query whether a particular stream combination is supported. 1051 1052 Args: 1053 out_surfaces: dict; see do_capture() for specifications on out_surfaces 1054 1055 Returns: 1056 Boolean 1057 """ 1058 cmd = {} 1059 cmd['cmdName'] = 'isStreamCombinationSupported' 1060 1061 if not isinstance(out_surfaces, list): 1062 cmd['outputSurfaces'] = [out_surfaces] 1063 else: 1064 cmd['outputSurfaces'] = out_surfaces 1065 formats = [c['format'] if 'format' in c else 'yuv' 1066 for c in cmd['outputSurfaces']] 1067 formats = [s if s != 'jpg' else 'jpeg' for s in formats] 1068 1069 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1070 1071 data, _ = self.__read_response_from_socket() 1072 if data['tag'] != 'streamCombinationSupport': 1073 raise error_util.CameraItsError('Failed to query stream combination') 1074 1075 return data['strValue'] == 'supportedCombination' 1076 1077 def is_camera_privacy_mode_supported(self): 1078 """Query whether the mobile device supports camera privacy mode. 1079 1080 This function checks whether the mobile device has FEATURE_CAMERA_TOGGLE 1081 feature support, which indicates the camera device can run in privacy mode. 1082 1083 Returns: 1084 Boolean 1085 """ 1086 cmd = {} 1087 cmd['cmdName'] = 'isCameraPrivacyModeSupported' 1088 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1089 1090 data, _ = self.__read_response_from_socket() 1091 if data['tag'] != 'cameraPrivacyModeSupport': 1092 raise error_util.CameraItsError('Failed to query camera privacy mode' 1093 ' support') 1094 return data['strValue'] == 'true' 1095 1096 def is_performance_class_primary_camera(self): 1097 """Query whether the camera device is an R or S performance class primary camera. 1098 1099 A primary rear/front facing camera is a camera device with the lowest 1100 camera Id for that facing. 1101 1102 Returns: 1103 Boolean 1104 """ 1105 cmd = {} 1106 cmd['cmdName'] = 'isPerformanceClassPrimaryCamera' 1107 cmd['cameraId'] = self._camera_id 1108 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1109 1110 data, _ = self.__read_response_from_socket() 1111 if data['tag'] != 'performanceClassPrimaryCamera': 1112 raise error_util.CameraItsError('Failed to query performance class ' 1113 'primary camera') 1114 return data['strValue'] == 'true' 1115 1116 def measure_camera_launch_ms(self): 1117 """Measure camera launch latency in millisecond, from open to first frame. 1118 1119 Returns: 1120 Camera launch latency from camera open to receipt of first frame 1121 """ 1122 cmd = {} 1123 cmd['cmdName'] = 'measureCameraLaunchMs' 1124 cmd['cameraId'] = self._camera_id 1125 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1126 1127 data, _ = self.__read_response_from_socket() 1128 if data['tag'] != 'cameraLaunchMs': 1129 raise error_util.CameraItsError('Failed to measure camera launch latency') 1130 return float(data['strValue']) 1131 1132 def measure_camera_1080p_jpeg_capture_ms(self): 1133 """Measure camera 1080P jpeg capture latency in milliseconds. 1134 1135 Returns: 1136 Camera jpeg capture latency in milliseconds 1137 """ 1138 cmd = {} 1139 cmd['cmdName'] = 'measureCamera1080pJpegCaptureMs' 1140 cmd['cameraId'] = self._camera_id 1141 self.sock.send(json.dumps(cmd).encode() + '\n'.encode()) 1142 1143 data, _ = self.__read_response_from_socket() 1144 if data['tag'] != 'camera1080pJpegCaptureMs': 1145 raise error_util.CameraItsError( 1146 'Failed to measure camera 1080p jpeg capture latency') 1147 return float(data['strValue']) 1148 1149 1150def parse_camera_ids(ids): 1151 """Parse the string of camera IDs into array of CameraIdCombo tuples. 1152 1153 Args: 1154 ids: List of camera ids. 1155 1156 Returns: 1157 Array of CameraIdCombo 1158 """ 1159 camera_id_combo = collections.namedtuple('CameraIdCombo', ['id', 'sub_id']) 1160 id_combos = [] 1161 for one_id in ids: 1162 one_combo = one_id.split(SUB_CAMERA_SEPARATOR) 1163 if len(one_combo) == 1: 1164 id_combos.append(camera_id_combo(one_combo[0], None)) 1165 elif len(one_combo) == 2: 1166 id_combos.append(camera_id_combo(one_combo[0], one_combo[1])) 1167 else: 1168 raise AssertionError('Camera id parameters must be either ID or ' 1169 f'ID{SUB_CAMERA_SEPARATOR}SUB_ID') 1170 return id_combos 1171 1172 1173def _run(cmd): 1174 """Replacement for os.system, with hiding of stdout+stderr messages. 1175 1176 Args: 1177 cmd: Command to be executed in string format. 1178 """ 1179 with open(os.devnull, 'wb') as devnull: 1180 subprocess.check_call(cmd.split(), stdout=devnull, stderr=subprocess.STDOUT) 1181 1182 1183def do_capture_with_latency(cam, req, sync_latency, fmt=None): 1184 """Helper function to take enough frames to allow sync latency. 1185 1186 Args: 1187 cam: camera object 1188 req: request for camera 1189 sync_latency: integer number of frames 1190 fmt: format for the capture 1191 Returns: 1192 single capture with the unsettled frames discarded 1193 """ 1194 caps = cam.do_capture([req]*(sync_latency+1), fmt) 1195 return caps[-1] 1196 1197 1198def load_scene(cam, props, scene, tablet, chart_distance): 1199 """Load the scene for the camera based on the FOV. 1200 1201 Args: 1202 cam: camera object 1203 props: camera properties 1204 scene: scene to be loaded 1205 tablet: tablet to load scene on 1206 chart_distance: distance to tablet 1207 """ 1208 if not tablet: 1209 logging.info('Manual run: no tablet to load scene on.') 1210 return 1211 # Calculate camera_fov which will determine the image to load on tablet. 1212 camera_fov = cam.calc_camera_fov(props) 1213 file_name = cam.get_file_name_to_load(chart_distance, camera_fov, scene) 1214 logging.debug('Displaying %s on the tablet', file_name) 1215 # Display the scene on the tablet depending on camera_fov 1216 tablet.adb.shell( 1217 'am start -a android.intent.action.VIEW -t image/png ' 1218 f'-d file://mnt/sdcard/Download/{file_name}') 1219 time.sleep(LOAD_SCENE_DELAY_SEC) 1220 rfov_camera_in_rfov_box = ( 1221 numpy.isclose( 1222 chart_distance, 1223 opencv_processing_utils.CHART_DISTANCE_RFOV, rtol=0.1) and 1224 opencv_processing_utils.FOV_THRESH_TELE <= float(camera_fov) 1225 <= opencv_processing_utils.FOV_THRESH_WFOV) 1226 wfov_camera_in_wfov_box = ( 1227 numpy.isclose( 1228 chart_distance, 1229 opencv_processing_utils.CHART_DISTANCE_WFOV, rtol=0.1) and 1230 float(camera_fov) > opencv_processing_utils.FOV_THRESH_WFOV) 1231 if rfov_camera_in_rfov_box or wfov_camera_in_wfov_box: 1232 cam.do_3a() 1233 cap = cam.do_capture( 1234 capture_request_utils.auto_capture_request(), cam.CAP_YUV) 1235 y_plane, _, _ = image_processing_utils.convert_capture_to_planes(cap) 1236 validate_lighting(y_plane, scene) 1237 1238 1239def validate_lighting(y_plane, scene): 1240 """Validates the lighting level in scene corners based on empirical values. 1241 1242 Args: 1243 y_plane: Y plane of YUV image 1244 scene: scene name 1245 Returns: 1246 boolean True if lighting validated, else raise AssertionError 1247 """ 1248 logging.debug('Validating lighting levels.') 1249 1250 # Test patches from each corner. 1251 for location, coordinates in _VALIDATE_LIGHTING_REGIONS.items(): 1252 patch = image_processing_utils.get_image_patch( 1253 y_plane, coordinates[0], coordinates[1], 1254 _VALIDATE_LIGHTING_PATCH_W, _VALIDATE_LIGHTING_PATCH_H) 1255 y_mean = image_processing_utils.compute_image_means(patch)[0] 1256 logging.debug('%s corner Y mean: %.3f', location, y_mean) 1257 if y_mean > _VALIDATE_LIGHTING_THRESH: 1258 logging.debug('Lights ON in test rig.') 1259 return True 1260 image_processing_utils.write_image(y_plane, f'validate_lighting_{scene}.jpg') 1261 raise AssertionError('Lights OFF in test rig. Please turn ON and retry.') 1262 1263 1264def get_build_sdk_version(device_id): 1265 """Return the int build version of the device.""" 1266 cmd = 'adb -s %s shell getprop ro.build.version.sdk' % device_id 1267 try: 1268 build_sdk_version = int(subprocess.check_output(cmd.split()).rstrip()) 1269 logging.debug('Build SDK version: %d', build_sdk_version) 1270 except (subprocess.CalledProcessError, ValueError): 1271 raise AssertionError('No build_sdk_version.') 1272 return build_sdk_version 1273 1274 1275def get_first_api_level(device_id): 1276 """Return the int value for the first API level of the device.""" 1277 cmd = 'adb -s %s shell getprop ro.product.first_api_level' % device_id 1278 try: 1279 first_api_level = int(subprocess.check_output(cmd.split()).rstrip()) 1280 logging.debug('First API level: %d', first_api_level) 1281 except (subprocess.CalledProcessError, ValueError): 1282 logging.error('No first_api_level. Setting to build version.') 1283 first_api_level = get_build_sdk_version(device_id) 1284 return first_api_level 1285 1286 1287class ItsSessionUtilsTests(unittest.TestCase): 1288 """Run a suite of unit tests on this module.""" 1289 1290 _BRIGHTNESS_CHECKS = (0.0, 1291 _VALIDATE_LIGHTING_THRESH-0.01, 1292 _VALIDATE_LIGHTING_THRESH, 1293 _VALIDATE_LIGHTING_THRESH+0.01, 1294 1.0) 1295 _TEST_IMG_W = 640 1296 _TEST_IMG_H = 480 1297 1298 def _generate_test_image(self, brightness): 1299 """Creates a Y plane array with pixel values of brightness. 1300 1301 Args: 1302 brightness: float between [0.0, 1.0] 1303 1304 Returns: 1305 Y plane array with elements of value brightness 1306 """ 1307 test_image = numpy.zeros((self._TEST_IMG_W, self._TEST_IMG_H, 1), 1308 dtype=float) 1309 test_image.fill(brightness) 1310 return test_image 1311 1312 def test_validate_lighting(self): 1313 """Tests validate_lighting() works correctly.""" 1314 # Run with different brightnesses to validate. 1315 for brightness in self._BRIGHTNESS_CHECKS: 1316 logging.debug('Testing validate_lighting with brightness %.1f', 1317 brightness) 1318 test_image = self._generate_test_image(brightness) 1319 print(f'Testing brightness: {brightness}') 1320 if brightness <= _VALIDATE_LIGHTING_THRESH: 1321 self.assertRaises( 1322 AssertionError, validate_lighting, test_image, 'unittest') 1323 else: 1324 self.assertTrue(validate_lighting(test_image, 'unittest'), 1325 f'image value {brightness} should PASS') 1326 1327 1328if __name__ == '__main__': 1329 unittest.main() 1330