1# Copyright 2013 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Utility functions to form an ItsSession and perform various camera actions.
15"""
16
17
18import collections
19import json
20import logging
21import math
22import os
23import socket
24import subprocess
25import sys
26import time
27import unicodedata
28import unittest
29
30import numpy
31
32import camera_properties_utils
33import capture_request_utils
34import error_util
35import image_processing_utils
36import opencv_processing_utils
37
# Seconds to wait after requesting a scene load before continuing.
LOAD_SCENE_DELAY_SEC = 3
# Separator between a logical camera id and its physical sub-camera id,
# e.g. '0.2' targets physical camera '2' behind logical camera '0'.
SUB_CAMERA_SEPARATOR = '.'
# Corner-patch dimensions, as fractions of image height/width, used by the
# lighting-validation check.
_VALIDATE_LIGHTING_PATCH_H = 0.05
_VALIDATE_LIGHTING_PATCH_W = 0.05
# Anchor coordinates (fractions of the image) for the four corner patches.
# NOTE(review): presumably (y, x) top-left corners of each patch — confirm
# against the validate_lighting implementation.
_VALIDATE_LIGHTING_REGIONS = {
    'top-left': (0, 0),
    'top-right': (0, 1-_VALIDATE_LIGHTING_PATCH_H),
    'bottom-left': (1-_VALIDATE_LIGHTING_PATCH_W, 0),
    'bottom-right': (1-_VALIDATE_LIGHTING_PATCH_W,
                     1-_VALIDATE_LIGHTING_PATCH_H),
}
_VALIDATE_LIGHTING_THRESH = 0.05  # Determined empirically from scene[1:6] tests
50
51
class ItsSession(object):
  """Controls a device over adb to run ITS scripts.

    The script importing this module (on the host machine) prepares JSON
    objects encoding CaptureRequests, specifying sets of parameters to use
    when capturing an image using the Camera2 APIs. This class encapsulates
    sending the requests to the device, monitoring the device's progress, and
    copying the resultant captures back to the host machine when done. TCP
    forwarded over adb is the transport mechanism used.

    The device must have CtsVerifier.apk installed.

    Attributes:
        sock: The open socket.
  """

  # Open a connection to localhost:<host_port>, forwarded to port 6000 on the
  # device. <host_port> is determined at run-time to support multiple
  # connected devices.
  IPADDR = '127.0.0.1'
  REMOTE_PORT = 6000
  BUFFER_SIZE = 4096

  # LOCK_PORT is used as a mutex lock to protect the list of forwarded ports
  # among all processes. The script assumes LOCK_PORT is available and will
  # try to use ports between CLIENT_PORT_START and
  # CLIENT_PORT_START+MAX_NUM_PORTS-1 on host for ITS sessions.
  CLIENT_PORT_START = 6000
  MAX_NUM_PORTS = 100
  LOCK_PORT = CLIENT_PORT_START + MAX_NUM_PORTS

  # Seconds timeout on each socket operation.
  SOCK_TIMEOUT = 20.0
  # Additional timeout in seconds when ITS service is doing more complicated
  # operations, for example: issuing warmup requests before actual capture.
  EXTRA_SOCK_TIMEOUT = 5.0

  # On-device package and intent used to launch the ItsService.
  PACKAGE = 'com.android.cts.verifier.camera.its'
  INTENT_START = 'com.android.cts.verifier.camera.its.START'

  # This string must be in sync with ItsService. Updated when interface
  # between script and ItsService is changed.
  ITS_SERVICE_VERSION = '1.0'

  # Conversion factor: seconds to nanoseconds (exposure times are in ns).
  SEC_TO_NSEC = 1000*1000*1000.0
  # Default adb invocation; __init__ replaces it with 'adb -s <device_id>'.
  adb = 'adb -d'

  # Predefine camera props. Save props extracted from the function,
  # "get_camera_properties".
  props = None

  # Response tags for single-camera (non-yuv) image payloads; the 'Image'
  # suffix is stripped to recover the format name.
  IMAGE_FORMAT_LIST_1 = [
      'jpegImage', 'rawImage', 'raw10Image', 'raw12Image', 'rawStatsImage',
      'dngImage', 'y8Image'
  ]

  # Response tag prefixes used for physical sub-camera image payloads.
  IMAGE_FORMAT_LIST_2 = [
      'jpegImage', 'rawImage', 'raw10Image', 'raw12Image', 'rawStatsImage',
      'yuvImage'
  ]

  # Shortcut output-surface specs usable as do_capture's out_surfaces arg.
  CAP_JPEG = {'format': 'jpeg'}
  CAP_RAW = {'format': 'raw'}
  CAP_YUV = {'format': 'yuv'}
  CAP_RAW_YUV = [{'format': 'raw'}, {'format': 'yuv'}]
  def __init_socket_port(self):
    """Initialize the socket port for the host to forward requests to the device.

    This method assumes localhost's LOCK_PORT is available and will try to
    use ports between CLIENT_PORT_START and CLIENT_PORT_START+MAX_NUM_PORTS-1

    Raises:
      error_util.CameraItsError: If the lock port cannot be acquired or no
        forwarding port is available.
    """
    num_retries = 100
    retry_wait_time_sec = 0.05

    # Bind a socket to use as mutex lock: only one process at a time can
    # bind LOCK_PORT, serializing access to the adb forward table.
    socket_lock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    for i in range(num_retries):
      try:
        socket_lock.bind((ItsSession.IPADDR, ItsSession.LOCK_PORT))
        break
      except (socket.error, socket.timeout):
        # Another process holds the lock; retry until it is released.
        if i == num_retries - 1:
          raise error_util.CameraItsError(self._device_id,
                                          'socket lock returns error')
        else:
          time.sleep(retry_wait_time_sec)

    # Check if a port is already assigned to the device.
    command = 'adb forward --list'
    proc = subprocess.Popen(command.split(), stdout=subprocess.PIPE)
    # pylint: disable=unused-variable
    output, error = proc.communicate()
    port = None
    used_ports = []
    for line  in output.decode('utf-8').split(os.linesep):
      # each line should be formatted as:
      # "<device_id> tcp:<host_port> tcp:<remote_port>"
      forward_info = line.split()
      if len(forward_info) >= 3 and len(
          forward_info[1]) > 4 and forward_info[1][:4] == 'tcp:' and len(
              forward_info[2]) > 4 and forward_info[2][:4] == 'tcp:':
        local_p = int(forward_info[1][4:])
        remote_p = int(forward_info[2][4:])
        if forward_info[
            0] == self._device_id and remote_p == ItsSession.REMOTE_PORT:
          # Reuse the forward already set up for this device.
          port = local_p
          break
        else:
          used_ports.append(local_p)

    # Find the first available port if no port is assigned to the device.
    if port is None:
      for p in range(ItsSession.CLIENT_PORT_START,
                     ItsSession.CLIENT_PORT_START + ItsSession.MAX_NUM_PORTS):
        if self.check_port_availability(p, used_ports):
          port = p
          break

    if port is None:
      raise error_util.CameraItsError(self._device_id,
                                      ' cannot find an available ' + 'port')

    # Release the socket as mutex unlock
    socket_lock.close()

    # Connect to the socket
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.connect((self.IPADDR, port))
    self.sock.settimeout(self.SOCK_TIMEOUT)
182
183  def check_port_availability(self, check_port, used_ports):
184    """Check if the port is available or not.
185
186    Args:
187      check_port: Port to check for availability
188      used_ports: List of used ports
189
190    Returns:
191     True if the given port is available and can be assigned to the device.
192    """
193    if check_port not in used_ports:
194      # Try to run "adb forward" with the port
195      command = '%s forward tcp:%d tcp:%d' % \
196                       (self.adb, check_port, self.REMOTE_PORT)
197      proc = subprocess.Popen(
198          command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
199      error = proc.communicate()[1]
200
201      # Check if there is no error
202      if error is None or error.find('error'.encode()) < 0:
203        return True
204      else:
205        return False
206
  def __wait_for_service(self):
    """Wait for ItsService to be ready and reboot the device if needed.

    This also includes the optional reboot handling: if the user
    provides a "reboot" or "reboot=N" arg, then reboot the device,
    waiting for N seconds (default 30) before returning.
    """

    for s in sys.argv[1:]:
      if s[:6] == 'reboot':
        duration = 30
        if len(s) > 7 and s[6] == '=':
          # "reboot=N": wait N seconds instead of the default 30.
          duration = int(s[7:])
        logging.debug('Rebooting device')
        _run('%s reboot' % (self.adb))
        _run('%s wait-for-device' % (self.adb))
        time.sleep(duration)
        logging.debug('Reboot complete')

    # Flush logcat so following code won't be misled by previous
    # 'ItsService ready' log.
    _run('%s logcat -c' % (self.adb))
    time.sleep(1)

    # Force-stop then restart ItsService so it comes up in a clean state.
    _run('%s shell am force-stop --user 0 %s' % (self.adb, self.PACKAGE))
    _run(('%s shell am start-foreground-service --user 0 -t text/plain '
          '-a %s') % (self.adb, self.INTENT_START))

    # Wait until the socket is ready to accept a connection.
    proc = subprocess.Popen(
        self.adb.split() + ['logcat'], stdout=subprocess.PIPE)
    logcat = proc.stdout
    # Block until the service logs readiness; then stop tailing logcat.
    while True:
      line = logcat.readline().strip()
      if line.find(b'ItsService ready') >= 0:
        break
    proc.kill()
    proc.communicate()
245
246  def __init__(self, device_id=None, camera_id=None, hidden_physical_id=None):
247    self._camera_id = camera_id
248    self._device_id = device_id
249    self._hidden_physical_id = hidden_physical_id
250
251    # Initialize device id and adb command.
252    self.adb = 'adb -s ' + self._device_id
253    self.__wait_for_service()
254    self.__init_socket_port()
255
  def __enter__(self):
    # Close then (re)open the camera so every `with ItsSession(...)` block
    # starts from a known, freshly-opened camera state.
    self.__close_camera()
    self.__open_camera()
    return self
260
261  def __exit__(self, exec_type, exec_value, exec_traceback):
262    if hasattr(self, 'sock') and self.sock:
263      self.__close_camera()
264      self.sock.close()
265    return False
266
267  def override_with_hidden_physical_camera_props(self, props):
268    """Check that it is a valid sub-camera backing the logical camera.
269
270    If current session is for a hidden physical camera, check that it is a valid
271    sub-camera backing the logical camera, override self.props, and return the
272    characteristics of sub-camera. Otherwise, return "props" directly.
273
274    Args:
275     props: Camera properties object.
276
277    Returns:
278     The properties of the hidden physical camera if possible.
279    """
280    if self._hidden_physical_id:
281      if not camera_properties_utils.logical_multi_camera(props):
282        raise AssertionError(f'{self._camera_id} is not a logical multi-camera')
283      physical_ids = camera_properties_utils.logical_multi_camera_physical_ids(
284          props)
285      if self._hidden_physical_id not in physical_ids:
286        raise AssertionError(f'{self._hidden_physical_id} is not a hidden '
287                             f'sub-camera of {self._camera_id}')
288      props = self.get_camera_properties_by_id(self._hidden_physical_id)
289      self.props = props
290    return props
291
292  def get_camera_properties(self):
293    """Get the camera properties object for the device.
294
295    Returns:
296     The Python dictionary object for the CameraProperties object.
297    """
298    cmd = {}
299    cmd['cmdName'] = 'getCameraProperties'
300    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
301    data, _ = self.__read_response_from_socket()
302    if data['tag'] != 'cameraProperties':
303      raise error_util.CameraItsError('Invalid command response')
304    self.props = data['objValue']['cameraProperties']
305    return data['objValue']['cameraProperties']
306
307  def get_camera_properties_by_id(self, camera_id):
308    """Get the camera properties object for device with camera_id.
309
310    Args:
311     camera_id: The ID string of the camera
312
313    Returns:
314     The Python dictionary object for the CameraProperties object. Empty
315     if no such device exists.
316    """
317    cmd = {}
318    cmd['cmdName'] = 'getCameraPropertiesById'
319    cmd['cameraId'] = camera_id
320    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
321    data, _ = self.__read_response_from_socket()
322    if data['tag'] != 'cameraProperties':
323      raise error_util.CameraItsError('Invalid command response')
324    return data['objValue']['cameraProperties']
325
326  def __read_response_from_socket(self):
327    """Reads a line (newline-terminated) string serialization of JSON object.
328
329    Returns:
330     Deserialized json obj.
331    """
332    chars = []
333    while not chars or chars[-1] != '\n':
334      ch = self.sock.recv(1).decode('utf-8')
335      if not ch:
336        # Socket was probably closed; otherwise don't get empty strings
337        raise error_util.CameraItsError('Problem with socket on device side')
338      chars.append(ch)
339    line = ''.join(chars)
340    jobj = json.loads(line)
341    # Optionally read a binary buffer of a fixed size.
342    buf = None
343    if 'bufValueSize' in jobj:
344      n = jobj['bufValueSize']
345      buf = bytearray(n)
346      view = memoryview(buf)
347      while n > 0:
348        nbytes = self.sock.recv_into(view, n)
349        view = view[nbytes:]
350        n -= nbytes
351      buf = numpy.frombuffer(buf, dtype=numpy.uint8)
352    return jobj, buf
353
354  def __open_camera(self):
355    """Get the camera ID to open if it is an argument as a single camera.
356
357    This allows passing camera=# to individual tests at command line
358    and camera=#,#,# or an no camera argv with tools/run_all_tests.py.
359    In case the camera is a logical multi-camera, to run ITS on the
360    hidden physical sub-camera, pass camera=[logical ID]:[physical ID]
361    to an individual test at the command line, and same applies to multiple
362    camera IDs for tools/run_all_tests.py: camera=#,#:#,#:#,#
363    """
364    if not self._camera_id:
365      self._camera_id = 0
366      for s in sys.argv[1:]:
367        if s[:7] == 'camera=' and len(s) > 7:
368          camera_ids = s[7:].split(',')
369          camera_id_combos = parse_camera_ids(camera_ids)
370          if len(camera_id_combos) == 1:
371            self._camera_id = camera_id_combos[0].id
372            self._hidden_physical_id = camera_id_combos[0].sub_id
373
374    logging.debug('Opening camera: %s', self._camera_id)
375    cmd = {'cmdName': 'open', 'cameraId': self._camera_id}
376    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
377    data, _ = self.__read_response_from_socket()
378    if data['tag'] != 'cameraOpened':
379      raise error_util.CameraItsError('Invalid command response')
380
381  def __close_camera(self):
382    cmd = {'cmdName': 'close'}
383    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
384    data, _ = self.__read_response_from_socket()
385    if data['tag'] != 'cameraClosed':
386      raise error_util.CameraItsError('Invalid command response')
387
388  def get_sensors(self):
389    """Get all sensors on the device.
390
391    Returns:
392       A Python dictionary that returns keys and booleans for each sensor.
393    """
394    cmd = {}
395    cmd['cmdName'] = 'checkSensorExistence'
396    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
397    data, _ = self.__read_response_from_socket()
398    if data['tag'] != 'sensorExistence':
399      raise error_util.CameraItsError('Invalid response for command: %s' %
400                                      cmd['cmdName'])
401    return data['objValue']
402
403  def start_sensor_events(self):
404    """Start collecting sensor events on the device.
405
406    See get_sensor_events for more info.
407
408    Returns:
409       Nothing.
410    """
411    cmd = {}
412    cmd['cmdName'] = 'startSensorEvents'
413    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
414    data, _ = self.__read_response_from_socket()
415    if data['tag'] != 'sensorEventsStarted':
416      raise error_util.CameraItsError('Invalid response for command: %s' %
417                                      cmd['cmdName'])
418
419  def get_sensor_events(self):
420    """Get a trace of all sensor events on the device.
421
422        The trace starts when the start_sensor_events function is called. If
423        the test runs for a long time after this call, then the device's
424        internal memory can fill up. Calling get_sensor_events gets all events
425        from the device, and then stops the device from collecting events and
426        clears the internal buffer; to start again, the start_sensor_events
427        call must be used again.
428
429        Events from the accelerometer, compass, and gyro are returned; each
430        has a timestamp and x,y,z values.
431
432        Note that sensor events are only produced if the device isn't in its
433        standby mode (i.e.) if the screen is on.
434
435    Returns:
436            A Python dictionary with three keys ("accel", "mag", "gyro") each
437            of which maps to a list of objects containing "time","x","y","z"
438            keys.
439    """
440    cmd = {}
441    cmd['cmdName'] = 'getSensorEvents'
442    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
443    timeout = self.SOCK_TIMEOUT + self.EXTRA_SOCK_TIMEOUT
444    self.sock.settimeout(timeout)
445    data, _ = self.__read_response_from_socket()
446    if data['tag'] != 'sensorEvents':
447      raise error_util.CameraItsError('Invalid response for command: %s ' %
448                                      cmd['cmdName'])
449    self.sock.settimeout(self.SOCK_TIMEOUT)
450    return data['objValue']
451
452  def do_capture(self,
453                 cap_request,
454                 out_surfaces=None,
455                 reprocess_format=None,
456                 repeat_request=None):
457    """Issue capture request(s), and read back the image(s) and metadata.
458
459    The main top-level function for capturing one or more images using the
460    device. Captures a single image if cap_request is a single object, and
461    captures a burst if it is a list of objects.
462
463    The optional repeat_request field can be used to assign a repeating
464    request list ran in background for 3 seconds to warm up the capturing
465    pipeline before start capturing. The repeat_requests will be ran on a
466    640x480 YUV surface without sending any data back. The caller needs to
467    make sure the stream configuration defined by out_surfaces and
468    repeat_request are valid or do_capture may fail because device does not
469    support such stream configuration.
470
471    The out_surfaces field can specify the width(s), height(s), and
472    format(s) of the captured image. The formats may be "yuv", "jpeg",
473    "dng", "raw", "raw10", "raw12", "rawStats" or "y8". The default is a
474    YUV420 frame ("yuv") corresponding to a full sensor frame.
475
476    Optionally the out_surfaces field can specify physical camera id(s) if
477    the current camera device is a logical multi-camera. The physical camera
478    id must refer to a physical camera backing this logical camera device.
479
480    Note that one or more surfaces can be specified, allowing a capture to
481    request images back in multiple formats (e.g.) raw+yuv, raw+jpeg,
482    yuv+jpeg, raw+yuv+jpeg. If the size is omitted for a surface, the
483    default is the largest resolution available for the format of that
484    surface. At most one output surface can be specified for a given format,
485    and raw+dng, raw10+dng, and raw+raw10 are not supported as combinations.
486
487    If reprocess_format is not None, for each request, an intermediate
488    buffer of the given reprocess_format will be captured from camera and
489    the intermediate buffer will be reprocessed to the output surfaces. The
490    following settings will be turned off when capturing the intermediate
491    buffer and will be applied when reprocessing the intermediate buffer.
492    1. android.noiseReduction.mode
493    2. android.edge.mode
494    3. android.reprocess.effectiveExposureFactor
495
496    Supported reprocess format are "yuv" and "private". Supported output
497    surface formats when reprocessing is enabled are "yuv" and "jpeg".
498
499    Example of a single capture request:
500
501    {
502     "android.sensor.exposureTime": 100*1000*1000,
503     "android.sensor.sensitivity": 100
504    }
505
506    Example of a list of capture requests:
507    [
508     {
509       "android.sensor.exposureTime": 100*1000*1000,
510       "android.sensor.sensitivity": 100
511     },
512    {
513      "android.sensor.exposureTime": 100*1000*1000,
514       "android.sensor.sensitivity": 200
515     }
516    ]
517
518    Example of output surface specifications:
519    {
520     "width": 640,
521     "height": 480,
522     "format": "yuv"
523    }
524    [
525     {
526       "format": "jpeg"
527     },
528     {
529       "format": "raw"
530     }
531    ]
532
533    The following variables defined in this class are shortcuts for
534    specifying one or more formats where each output is the full size for
535    that format; they can be used as values for the out_surfaces arguments:
536
537    CAP_RAW
538    CAP_DNG
539    CAP_YUV
540    CAP_JPEG
541    CAP_RAW_YUV
542    CAP_DNG_YUV
543    CAP_RAW_JPEG
544    CAP_DNG_JPEG
545    CAP_YUV_JPEG
546    CAP_RAW_YUV_JPEG
547    CAP_DNG_YUV_JPEG
548
549    If multiple formats are specified, then this function returns multiple
550    capture objects, one for each requested format. If multiple formats and
551    multiple captures (i.e. a burst) are specified, then this function
552    returns multiple lists of capture objects. In both cases, the order of
553    the returned objects matches the order of the requested formats in the
554    out_surfaces parameter. For example:
555
556    yuv_cap = do_capture(req1)
557    yuv_cap = do_capture(req1,yuv_fmt)
558    yuv_cap, raw_cap = do_capture(req1, [yuv_fmt,raw_fmt])
559    yuv_caps = do_capture([req1,req2], yuv_fmt)
560    yuv_caps, raw_caps = do_capture([req1,req2], [yuv_fmt,raw_fmt])
561
562    The "rawStats" format processes the raw image and returns a new image
563    of statistics from the raw image. The format takes additional keys,
564    "gridWidth" and "gridHeight" which are size of grid cells in a 2D grid
565    of the raw image. For each grid cell, the mean and variance of each raw
566    channel is computed, and the do_capture call returns two 4-element float
567    images of dimensions (rawWidth / gridWidth, rawHeight / gridHeight),
568    concatenated back-to-back, where the first image contains the 4-channel
569    means and the second contains the 4-channel variances. Note that only
570    pixels in the active array crop region are used; pixels outside this
571    region (for example optical black rows) are cropped out before the
572    gridding and statistics computation is performed.
573
574    For the rawStats format, if the gridWidth is not provided then the raw
575    image width is used as the default, and similarly for gridHeight. With
576    this, the following is an example of a output description that computes
577    the mean and variance across each image row:
578    {
579      "gridHeight": 1,
580      "format": "rawStats"
581    }
582
583    Args:
584      cap_request: The Python dict/list specifying the capture(s), which will be
585        converted to JSON and sent to the device.
586      out_surfaces: (Optional) specifications of the output image formats and
587        sizes to use for each capture.
588      reprocess_format: (Optional) The reprocessing format. If not
589        None,reprocessing will be enabled.
590      repeat_request: Repeating request list.
591
592    Returns:
593      An object, list of objects, or list of lists of objects, where each
594      object contains the following fields:
595      * data: the image data as a numpy array of bytes.
596      * width: the width of the captured image.
597      * height: the height of the captured image.
598      * format: image the format, in [
599                        "yuv","jpeg","raw","raw10","raw12","rawStats","dng"].
600      * metadata: the capture result object (Python dictionary).
601    """
602    cmd = {}
603    if reprocess_format is not None:
604      if repeat_request is not None:
605        raise error_util.CameraItsError(
606            'repeating request + reprocessing is not supported')
607      cmd['cmdName'] = 'doReprocessCapture'
608      cmd['reprocessFormat'] = reprocess_format
609    else:
610      cmd['cmdName'] = 'doCapture'
611
612    if repeat_request is None:
613      cmd['repeatRequests'] = []
614    elif not isinstance(repeat_request, list):
615      cmd['repeatRequests'] = [repeat_request]
616    else:
617      cmd['repeatRequests'] = repeat_request
618
619    if not isinstance(cap_request, list):
620      cmd['captureRequests'] = [cap_request]
621    else:
622      cmd['captureRequests'] = cap_request
623
624    if out_surfaces is not None:
625      if not isinstance(out_surfaces, list):
626        cmd['outputSurfaces'] = [out_surfaces]
627      else:
628        cmd['outputSurfaces'] = out_surfaces
629      formats = [
630          c['format'] if 'format' in c else 'yuv' for c in cmd['outputSurfaces']
631      ]
632      formats = [s if s != 'jpg' else 'jpeg' for s in formats]
633    else:
634      max_yuv_size = capture_request_utils.get_available_output_sizes(
635          'yuv', self.props)[0]
636      formats = ['yuv']
637      cmd['outputSurfaces'] = [{
638          'format': 'yuv',
639          'width': max_yuv_size[0],
640          'height': max_yuv_size[1]
641      }]
642
643    ncap = len(cmd['captureRequests'])
644    nsurf = 1 if out_surfaces is None else len(cmd['outputSurfaces'])
645
646    cam_ids = []
647    bufs = {}
648    yuv_bufs = {}
649    for i, s in enumerate(cmd['outputSurfaces']):
650      if self._hidden_physical_id:
651        s['physicalCamera'] = self._hidden_physical_id
652
653      if 'physicalCamera' in s:
654        cam_id = s['physicalCamera']
655      else:
656        cam_id = self._camera_id
657
658      if cam_id not in cam_ids:
659        cam_ids.append(cam_id)
660        bufs[cam_id] = {
661            'raw': [],
662            'raw10': [],
663            'raw12': [],
664            'rawStats': [],
665            'dng': [],
666            'jpeg': [],
667            'y8': []
668        }
669
670    for cam_id in cam_ids:
671       # Only allow yuv output to multiple targets
672      if cam_id == self._camera_id:
673        yuv_surfaces = [
674            s for s in cmd['outputSurfaces']
675            if s['format'] == 'yuv' and 'physicalCamera' not in s
676        ]
677        formats_for_id = [
678            s['format']
679            for s in cmd['outputSurfaces']
680            if 'physicalCamera' not in s
681        ]
682      else:
683        yuv_surfaces = [
684            s for s in cmd['outputSurfaces'] if s['format'] == 'yuv' and
685            'physicalCamera' in s and s['physicalCamera'] == cam_id
686        ]
687        formats_for_id = [
688            s['format']
689            for s in cmd['outputSurfaces']
690            if 'physicalCamera' in s and s['physicalCamera'] == cam_id
691        ]
692
693      n_yuv = len(yuv_surfaces)
694      # Compute the buffer size of YUV targets
695      yuv_maxsize_1d = 0
696      for s in yuv_surfaces:
697        if ('width' not in s and 'height' not in s):
698          if self.props is None:
699            raise error_util.CameraItsError('Camera props are unavailable')
700          yuv_maxsize_2d = capture_request_utils.get_available_output_sizes(
701              'yuv', self.props)[0]
702          # YUV420 size = 1.5 bytes per pixel
703          yuv_maxsize_1d = (yuv_maxsize_2d[0] * yuv_maxsize_2d[1] * 3) // 2
704          break
705      yuv_sizes = [
706          (c['width'] * c['height'] * 3) // 2
707          if 'width' in c and 'height' in c else yuv_maxsize_1d
708          for c in yuv_surfaces
709      ]
710      # Currently we don't pass enough metadta from ItsService to distinguish
711      # different yuv stream of same buffer size
712      if len(yuv_sizes) != len(set(yuv_sizes)):
713        raise error_util.CameraItsError(
714            'ITS does not support yuv outputs of same buffer size')
715      if len(formats_for_id) > len(set(formats_for_id)):
716        if n_yuv != len(formats_for_id) - len(set(formats_for_id)) + 1:
717          raise error_util.CameraItsError('Duplicate format requested')
718
719      yuv_bufs[cam_id] = {size: [] for size in yuv_sizes}
720
721    raw_formats = 0
722    raw_formats += 1 if 'dng' in formats else 0
723    raw_formats += 1 if 'raw' in formats else 0
724    raw_formats += 1 if 'raw10' in formats else 0
725    raw_formats += 1 if 'raw12' in formats else 0
726    raw_formats += 1 if 'rawStats' in formats else 0
727    if raw_formats > 1:
728      raise error_util.CameraItsError('Different raw formats not supported')
729
730    # Detect long exposure time and set timeout accordingly
731    longest_exp_time = 0
732    for req in cmd['captureRequests']:
733      if 'android.sensor.exposureTime' in req and req[
734          'android.sensor.exposureTime'] > longest_exp_time:
735        longest_exp_time = req['android.sensor.exposureTime']
736
737    extended_timeout = longest_exp_time // self.SEC_TO_NSEC + self.SOCK_TIMEOUT
738    if repeat_request:
739      extended_timeout += self.EXTRA_SOCK_TIMEOUT
740    self.sock.settimeout(extended_timeout)
741
742    logging.debug('Capturing %d frame%s with %d format%s [%s]', ncap,
743                  's' if ncap > 1 else '', nsurf, 's' if nsurf > 1 else '',
744                  ','.join(formats))
745    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
746
747    # Wait for ncap*nsurf images and ncap metadata responses.
748    # Assume that captures come out in the same order as requested in
749    # the burst, however individual images of different formats can come
750    # out in any order for that capture.
751    nbufs = 0
752    mds = []
753    physical_mds = []
754    widths = None
755    heights = None
756    while nbufs < ncap * nsurf or len(mds) < ncap:
757      json_obj, buf = self.__read_response_from_socket()
758      if json_obj['tag'] in ItsSession.IMAGE_FORMAT_LIST_1 and buf is not None:
759        fmt = json_obj['tag'][:-5]
760        bufs[self._camera_id][fmt].append(buf)
761        nbufs += 1
762      elif json_obj['tag'] == 'yuvImage':
763        buf_size = numpy.product(buf.shape)
764        yuv_bufs[self._camera_id][buf_size].append(buf)
765        nbufs += 1
766      elif json_obj['tag'] == 'captureResults':
767        mds.append(json_obj['objValue']['captureResult'])
768        physical_mds.append(json_obj['objValue']['physicalResults'])
769        outputs = json_obj['objValue']['outputs']
770        widths = [out['width'] for out in outputs]
771        heights = [out['height'] for out in outputs]
772      else:
773        tag_string = unicodedata.normalize('NFKD', json_obj['tag']).encode(
774            'ascii', 'ignore')
775        for x in ItsSession.IMAGE_FORMAT_LIST_2:
776          x = bytes(x, encoding='utf-8')
777          if tag_string.startswith(x):
778            if x == b'yuvImage':
779              physical_id = json_obj['tag'][len(x):]
780              if physical_id in cam_ids:
781                buf_size = numpy.product(buf.shape)
782                yuv_bufs[physical_id][buf_size].append(buf)
783                nbufs += 1
784            else:
785              physical_id = json_obj['tag'][len(x):]
786              if physical_id in cam_ids:
787                fmt = x[:-5].decode('UTF-8')
788                bufs[physical_id][fmt].append(buf)
789                nbufs += 1
790    rets = []
791    for j, fmt in enumerate(formats):
792      objs = []
793      if 'physicalCamera' in cmd['outputSurfaces'][j]:
794        cam_id = cmd['outputSurfaces'][j]['physicalCamera']
795      else:
796        cam_id = self._camera_id
797
798      for i in range(ncap):
799        obj = {}
800        obj['width'] = widths[j]
801        obj['height'] = heights[j]
802        obj['format'] = fmt
803        if cam_id == self._camera_id:
804          obj['metadata'] = mds[i]
805        else:
806          for physical_md in physical_mds[i]:
807            if cam_id in physical_md:
808              obj['metadata'] = physical_md[cam_id]
809              break
810
811        if fmt == 'yuv':
812          buf_size = (widths[j] * heights[j] * 3) // 2
813          obj['data'] = yuv_bufs[cam_id][buf_size][i]
814        else:
815          obj['data'] = bufs[cam_id][fmt][i]
816        objs.append(obj)
817      rets.append(objs if ncap > 1 else objs[0])
818    self.sock.settimeout(self.SOCK_TIMEOUT)
819    if len(rets) > 1 or (isinstance(rets[0], dict) and
820                         isinstance(cap_request, list)):
821      return rets
822    else:
823      return rets[0]
824
825  def do_vibrate(self, pattern):
826    """Cause the device to vibrate to a specific pattern.
827
828    Args:
829      pattern: Durations (ms) for which to turn on or off the vibrator.
830      The first value indicates the number of milliseconds to wait
831      before turning the vibrator on. The next value indicates the
832      number of milliseconds for which to keep the vibrator on
833      before turning it off. Subsequent values alternate between
834      durations in milliseconds to turn the vibrator off or to turn
835      the vibrator on.
836
837    Returns:
838      Nothing.
839    """
840    cmd = {}
841    cmd['cmdName'] = 'doVibrate'
842    cmd['pattern'] = pattern
843    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
844    data, _ = self.__read_response_from_socket()
845    if data['tag'] != 'vibrationStarted':
846      raise error_util.CameraItsError('Invalid response for command: %s' %
847                                      cmd['cmdName'])
848
849  def set_audio_restriction(self, mode):
850    """Set the audio restriction mode for this camera device.
851
852    Args:
853     mode: int; the audio restriction mode. See CameraDevice.java for valid
854     value.
855    Returns:
856     Nothing.
857    """
858    cmd = {}
859    cmd['cmdName'] = 'setAudioRestriction'
860    cmd['mode'] = mode
861    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
862    data, _ = self.__read_response_from_socket()
863    if data['tag'] != 'audioRestrictionSet':
864      raise error_util.CameraItsError('Invalid response for command: %s' %
865                                      cmd['cmdName'])
866
867  # pylint: disable=dangerous-default-value
868  def do_3a(self,
869            regions_ae=[[0, 0, 1, 1, 1]],
870            regions_awb=[[0, 0, 1, 1, 1]],
871            regions_af=[[0, 0, 1, 1, 1]],
872            do_ae=True,
873            do_awb=True,
874            do_af=True,
875            lock_ae=False,
876            lock_awb=False,
877            get_results=False,
878            ev_comp=0,
879            mono_camera=False):
880    """Perform a 3A operation on the device.
881
882    Triggers some or all of AE, AWB, and AF, and returns once they have
883    converged. Uses the vendor 3A that is implemented inside the HAL.
884    Note: do_awb is always enabled regardless of do_awb flag
885
886    Throws an assertion if 3A fails to converge.
887
888    Args:
889      regions_ae: List of weighted AE regions.
890      regions_awb: List of weighted AWB regions.
891      regions_af: List of weighted AF regions.
892      do_ae: Trigger AE and wait for it to converge.
893      do_awb: Wait for AWB to converge.
894      do_af: Trigger AF and wait for it to converge.
895      lock_ae: Request AE lock after convergence, and wait for it.
896      lock_awb: Request AWB lock after convergence, and wait for it.
897      get_results: Return the 3A results from this function.
898      ev_comp: An EV compensation value to use when running AE.
899      mono_camera: Boolean for monochrome camera.
900
901      Region format in args:
902         Arguments are lists of weighted regions; each weighted region is a
903         list of 5 values, [x, y, w, h, wgt], and each argument is a list of
904         these 5-value lists. The coordinates are given as normalized
905         rectangles (x, y, w, h) specifying the region. For example:
906         [[0.0, 0.0, 1.0, 0.5, 5], [0.0, 0.5, 1.0, 0.5, 10]].
907         Weights are non-negative integers.
908
909    Returns:
910      Five values are returned if get_results is true:
911      * AE sensitivity; None if do_ae is False
912      * AE exposure time; None if do_ae is False
913      * AWB gains (list);
914      * AWB transform (list);
915      * AF focus position; None if do_af is false
916      Otherwise, it returns five None values.
917    """
918    logging.debug('Running vendor 3A on device')
919    cmd = {}
920    cmd['cmdName'] = 'do3A'
921    cmd['regions'] = {
922        'ae': sum(regions_ae, []),
923        'awb': sum(regions_awb, []),
924        'af': sum(regions_af, [])
925    }
926    cmd['triggers'] = {'ae': do_ae, 'af': do_af}
927    if lock_ae:
928      cmd['aeLock'] = True
929    if lock_awb:
930      cmd['awbLock'] = True
931    if ev_comp != 0:
932      cmd['evComp'] = ev_comp
933    if self._hidden_physical_id:
934      cmd['physicalId'] = self._hidden_physical_id
935    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
936
937    # Wait for each specified 3A to converge.
938    ae_sens = None
939    ae_exp = None
940    awb_gains = None
941    awb_transform = None
942    af_dist = None
943    converged = False
944    while True:
945      data, _ = self.__read_response_from_socket()
946      vals = data['strValue'].split()
947      if data['tag'] == 'aeResult':
948        if do_ae:
949          ae_sens, ae_exp = [int(i) for i in vals]
950      elif data['tag'] == 'afResult':
951        if do_af:
952          af_dist = float(vals[0])
953      elif data['tag'] == 'awbResult':
954        awb_gains = [float(f) for f in vals[:4]]
955        awb_transform = [float(f) for f in vals[4:]]
956      elif data['tag'] == '3aConverged':
957        converged = True
958      elif data['tag'] == '3aDone':
959        break
960      else:
961        raise error_util.CameraItsError('Invalid command response')
962    if converged and not get_results:
963      return None, None, None, None, None
964    if (do_ae and ae_sens is None or
965        (not mono_camera and do_awb and awb_gains is None) or
966        do_af and af_dist is None or not converged):
967      raise error_util.CameraItsError('3A failed to converge')
968    return ae_sens, ae_exp, awb_gains, awb_transform, af_dist
969
970  def calc_camera_fov(self, props):
971    """Determine the camera field of view from internal params.
972
973    Args:
974      props: Camera properties object.
975
976    Returns:
977      camera_fov: string; field of view for camera.
978    """
979
980    focal_ls = props['android.lens.info.availableFocalLengths']
981    if len(focal_ls) > 1:
982      logging.debug('Doing capture to determine logical camera focal length')
983      cap = self.do_capture(capture_request_utils.auto_capture_request())
984      focal_l = cap['metadata']['android.lens.focalLength']
985    else:
986      focal_l = focal_ls[0]
987
988    sensor_size = props['android.sensor.info.physicalSize']
989    diag = math.sqrt(sensor_size['height']**2 + sensor_size['width']**2)
990    try:
991      fov = str(round(2 * math.degrees(math.atan(diag / (2 * focal_l))), 2))
992    except ValueError:
993      fov = str(0)
994    logging.debug('Calculated FoV: %s', fov)
995    return fov
996
997  def get_file_name_to_load(self, chart_distance, camera_fov, scene):
998    """Get the image to load on the tablet depending on fov and chart_distance.
999
1000    Args:
1001     chart_distance: float; distance in cm from camera of displayed chart
1002     camera_fov: float; camera field of view.
1003     scene: String; Scene to be used in the test.
1004
1005    Returns:
1006     file_name: file name to display on the tablet.
1007
1008    """
1009    chart_scaling = opencv_processing_utils.calc_chart_scaling(
1010        chart_distance, camera_fov)
1011    if numpy.isclose(
1012        chart_scaling,
1013        opencv_processing_utils.SCALE_RFOV_IN_WFOV_BOX,
1014        atol=0.01):
1015      file_name = '%s_%sx_scaled.png' % (
1016          scene, str(opencv_processing_utils.SCALE_RFOV_IN_WFOV_BOX))
1017    elif numpy.isclose(
1018        chart_scaling,
1019        opencv_processing_utils.SCALE_TELE_IN_WFOV_BOX,
1020        atol=0.01):
1021      file_name = '%s_%sx_scaled.png' % (
1022          scene, str(opencv_processing_utils.SCALE_TELE_IN_WFOV_BOX))
1023    elif numpy.isclose(
1024        chart_scaling,
1025        opencv_processing_utils.SCALE_TELE25_IN_RFOV_BOX,
1026        atol=0.01):
1027      file_name = '%s_%sx_scaled.png' % (
1028          scene, str(opencv_processing_utils.SCALE_TELE25_IN_RFOV_BOX))
1029    elif numpy.isclose(
1030        chart_scaling,
1031        opencv_processing_utils.SCALE_TELE40_IN_RFOV_BOX,
1032        atol=0.01):
1033      file_name = '%s_%sx_scaled.png' % (
1034          scene, str(opencv_processing_utils.SCALE_TELE40_IN_RFOV_BOX))
1035    elif numpy.isclose(
1036        chart_scaling,
1037        opencv_processing_utils.SCALE_TELE_IN_RFOV_BOX,
1038        atol=0.01):
1039      file_name = '%s_%sx_scaled.png' % (
1040          scene, str(opencv_processing_utils.SCALE_TELE_IN_RFOV_BOX))
1041    else:
1042      file_name = '%s.png' % scene
1043    logging.debug('Scene to load: %s', file_name)
1044    return file_name
1045
1046  def is_stream_combination_supported(self, out_surfaces):
1047    """Query whether out_surfaces combination is supported by the camera device.
1048
1049    This function hooks up to the isSessionConfigurationSupported() camera API
1050    to query whether a particular stream combination is supported.
1051
1052    Args:
1053      out_surfaces: dict; see do_capture() for specifications on out_surfaces
1054
1055    Returns:
1056      Boolean
1057    """
1058    cmd = {}
1059    cmd['cmdName'] = 'isStreamCombinationSupported'
1060
1061    if not isinstance(out_surfaces, list):
1062      cmd['outputSurfaces'] = [out_surfaces]
1063    else:
1064      cmd['outputSurfaces'] = out_surfaces
1065    formats = [c['format'] if 'format' in c else 'yuv'
1066               for c in cmd['outputSurfaces']]
1067    formats = [s if s != 'jpg' else 'jpeg' for s in formats]
1068
1069    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
1070
1071    data, _ = self.__read_response_from_socket()
1072    if data['tag'] != 'streamCombinationSupport':
1073      raise error_util.CameraItsError('Failed to query stream combination')
1074
1075    return data['strValue'] == 'supportedCombination'
1076
1077  def is_camera_privacy_mode_supported(self):
1078    """Query whether the mobile device supports camera privacy mode.
1079
1080    This function checks whether the mobile device has FEATURE_CAMERA_TOGGLE
1081    feature support, which indicates the camera device can run in privacy mode.
1082
1083    Returns:
1084      Boolean
1085    """
1086    cmd = {}
1087    cmd['cmdName'] = 'isCameraPrivacyModeSupported'
1088    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
1089
1090    data, _ = self.__read_response_from_socket()
1091    if data['tag'] != 'cameraPrivacyModeSupport':
1092      raise error_util.CameraItsError('Failed to query camera privacy mode'
1093                                      ' support')
1094    return data['strValue'] == 'true'
1095
1096  def is_performance_class_primary_camera(self):
1097    """Query whether the camera device is an R or S performance class primary camera.
1098
1099    A primary rear/front facing camera is a camera device with the lowest
1100    camera Id for that facing.
1101
1102    Returns:
1103      Boolean
1104    """
1105    cmd = {}
1106    cmd['cmdName'] = 'isPerformanceClassPrimaryCamera'
1107    cmd['cameraId'] = self._camera_id
1108    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
1109
1110    data, _ = self.__read_response_from_socket()
1111    if data['tag'] != 'performanceClassPrimaryCamera':
1112      raise error_util.CameraItsError('Failed to query performance class '
1113                                      'primary camera')
1114    return data['strValue'] == 'true'
1115
1116  def measure_camera_launch_ms(self):
1117    """Measure camera launch latency in millisecond, from open to first frame.
1118
1119    Returns:
1120      Camera launch latency from camera open to receipt of first frame
1121    """
1122    cmd = {}
1123    cmd['cmdName'] = 'measureCameraLaunchMs'
1124    cmd['cameraId'] = self._camera_id
1125    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
1126
1127    data, _ = self.__read_response_from_socket()
1128    if data['tag'] != 'cameraLaunchMs':
1129      raise error_util.CameraItsError('Failed to measure camera launch latency')
1130    return float(data['strValue'])
1131
1132  def measure_camera_1080p_jpeg_capture_ms(self):
1133    """Measure camera 1080P jpeg capture latency in milliseconds.
1134
1135    Returns:
1136      Camera jpeg capture latency in milliseconds
1137    """
1138    cmd = {}
1139    cmd['cmdName'] = 'measureCamera1080pJpegCaptureMs'
1140    cmd['cameraId'] = self._camera_id
1141    self.sock.send(json.dumps(cmd).encode() + '\n'.encode())
1142
1143    data, _ = self.__read_response_from_socket()
1144    if data['tag'] != 'camera1080pJpegCaptureMs':
1145      raise error_util.CameraItsError(
1146          'Failed to measure camera 1080p jpeg capture latency')
1147    return float(data['strValue'])
1148
1149
def parse_camera_ids(ids):
  """Parse the string of camera IDs into array of CameraIdCombo tuples.

  Args:
   ids: List of camera ids.

  Returns:
   Array of CameraIdCombo
  """
  camera_id_combo = collections.namedtuple('CameraIdCombo', ['id', 'sub_id'])
  combos = []
  for full_id in ids:
    parts = full_id.split(SUB_CAMERA_SEPARATOR)
    # Guard clause: anything beyond ID or ID.SUB_ID is malformed.
    if len(parts) > 2:
      raise AssertionError('Camera id parameters must be either ID or '
                           f'ID{SUB_CAMERA_SEPARATOR}SUB_ID')
    sub_id = parts[1] if len(parts) == 2 else None
    combos.append(camera_id_combo(parts[0], sub_id))
  return combos
1171
1172
1173def _run(cmd):
1174  """Replacement for os.system, with hiding of stdout+stderr messages.
1175
1176  Args:
1177    cmd: Command to be executed in string format.
1178  """
1179  with open(os.devnull, 'wb') as devnull:
1180    subprocess.check_call(cmd.split(), stdout=devnull, stderr=subprocess.STDOUT)
1181
1182
def do_capture_with_latency(cam, req, sync_latency, fmt=None):
  """Helper function to take enough frames to allow sync latency.

  Args:
    cam: camera object
    req: request for camera
    sync_latency: integer number of frames
    fmt: format for the capture
  Returns:
    single capture with the unsettled frames discarded
  """
  # Capture sync_latency extra frames, then keep only the last (settled) one.
  num_frames = sync_latency + 1
  caps = cam.do_capture([req] * num_frames, fmt)
  return caps[-1]
1196
1197
def load_scene(cam, props, scene, tablet, chart_distance):
  """Load the scene for the camera based on the FOV.

  Args:
    cam: camera object
    props: camera properties
    scene: scene to be loaded
    tablet: tablet to load scene on
    chart_distance: distance to tablet
  """
  if not tablet:
    logging.info('Manual run: no tablet to load scene on.')
    return
  # The camera FoV decides which (possibly scaled) scene image to display.
  camera_fov = cam.calc_camera_fov(props)
  file_name = cam.get_file_name_to_load(chart_distance, camera_fov, scene)
  logging.debug('Displaying %s on the tablet', file_name)
  # Ask the tablet to show the scene image, then give it time to render.
  tablet.adb.shell(
      'am start -a android.intent.action.VIEW -t image/png '
      f'-d file://mnt/sdcard/Download/{file_name}')
  time.sleep(LOAD_SCENE_DELAY_SEC)
  fov = float(camera_fov)
  rfov_camera_in_rfov_box = (
      numpy.isclose(chart_distance,
                    opencv_processing_utils.CHART_DISTANCE_RFOV,
                    rtol=0.1) and
      opencv_processing_utils.FOV_THRESH_TELE <= fov
      <= opencv_processing_utils.FOV_THRESH_WFOV)
  wfov_camera_in_wfov_box = (
      numpy.isclose(chart_distance,
                    opencv_processing_utils.CHART_DISTANCE_WFOV,
                    rtol=0.1) and
      fov > opencv_processing_utils.FOV_THRESH_WFOV)
  if rfov_camera_in_rfov_box or wfov_camera_in_wfov_box:
    # Take a quick 3A + YUV capture to check the rig lighting is on.
    cam.do_3a()
    cap = cam.do_capture(
        capture_request_utils.auto_capture_request(), cam.CAP_YUV)
    y_plane, _, _ = image_processing_utils.convert_capture_to_planes(cap)
    validate_lighting(y_plane, scene)
1237
1238
def validate_lighting(y_plane, scene):
  """Validates the lighting level in scene corners based on empirical values.

  Args:
    y_plane: Y plane of YUV image
    scene: scene name
  Returns:
    boolean True if lighting validated, else raise AssertionError
  """
  logging.debug('Validating lighting levels.')

  # Sample a small patch at each corner; a single corner whose mean exceeds
  # the empirical threshold is taken as evidence the rig lights are on.
  for location, coords in _VALIDATE_LIGHTING_REGIONS.items():
    corner_patch = image_processing_utils.get_image_patch(
        y_plane, coords[0], coords[1],
        _VALIDATE_LIGHTING_PATCH_W, _VALIDATE_LIGHTING_PATCH_H)
    patch_mean = image_processing_utils.compute_image_means(corner_patch)[0]
    logging.debug('%s corner Y mean: %.3f', location, patch_mean)
    if patch_mean > _VALIDATE_LIGHTING_THRESH:
      logging.debug('Lights ON in test rig.')
      return True
  # Every corner was dark: save the frame for debugging, then fail.
  image_processing_utils.write_image(y_plane, f'validate_lighting_{scene}.jpg')
  raise AssertionError('Lights OFF in test rig. Please turn ON and retry.')
1262
1263
def get_build_sdk_version(device_id):
  """Return the int build version of the device.

  Args:
    device_id: str; device serial passed to 'adb -s'.

  Returns:
    int; value of the ro.build.version.sdk property.

  Raises:
    AssertionError: if the property cannot be read or parsed as an int.
  """
  cmd = 'adb -s %s shell getprop ro.build.version.sdk' % device_id
  try:
    build_sdk_version = int(subprocess.check_output(cmd.split()).rstrip())
    logging.debug('Build SDK version: %d', build_sdk_version)
  except (subprocess.CalledProcessError, ValueError) as e:
    # Chain the original failure so the adb/parse error stays visible.
    raise AssertionError('No build_sdk_version.') from e
  return build_sdk_version
1273
1274
def get_first_api_level(device_id):
  """Return the int value for the first API level of the device."""
  cmd = 'adb -s %s shell getprop ro.product.first_api_level' % device_id
  try:
    first_api_level = int(subprocess.check_output(cmd.split()).rstrip())
  except (subprocess.CalledProcessError, ValueError):
    # Property missing or unparseable: fall back to the build SDK version.
    logging.error('No first_api_level. Setting to build version.')
    return get_build_sdk_version(device_id)
  logging.debug('First API level: %d', first_api_level)
  return first_api_level
1285
1286
class ItsSessionUtilsTests(unittest.TestCase):
  """Run a suite of unit tests on this module."""

  # Brightness values straddling the lighting threshold, plus the extremes.
  _BRIGHTNESS_CHECKS = (0.0,
                        _VALIDATE_LIGHTING_THRESH-0.01,
                        _VALIDATE_LIGHTING_THRESH,
                        _VALIDATE_LIGHTING_THRESH+0.01,
                        1.0)
  _TEST_IMG_W = 640
  _TEST_IMG_H = 480

  def _generate_test_image(self, brightness):
    """Creates a Y plane array with pixel values of brightness.

    Args:
      brightness: float between [0.0, 1.0]

    Returns:
      Y plane array with elements of value brightness
    """
    return numpy.full((self._TEST_IMG_W, self._TEST_IMG_H, 1), brightness,
                      dtype=float)

  def test_validate_lighting(self):
    """Tests validate_lighting() works correctly."""
    # Run with different brightnesses to validate.
    for brightness in self._BRIGHTNESS_CHECKS:
      logging.debug('Testing validate_lighting with brightness %.1f',
                    brightness)
      test_image = self._generate_test_image(brightness)
      print(f'Testing brightness: {brightness}')
      if brightness > _VALIDATE_LIGHTING_THRESH:
        self.assertTrue(validate_lighting(test_image, 'unittest'),
                        f'image value {brightness} should PASS')
      else:
        self.assertRaises(
            AssertionError, validate_lighting, test_image, 'unittest')
1326
1327
if __name__ == '__main__':
  # Run this module's self-tests (ItsSessionUtilsTests) when executed directly.
  unittest.main()
1330