1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, multiprocessing, os, time
6import numpy
7from autotest_lib.client.bin import test
8from autotest_lib.client.cros.camera import camera_utils
9from autotest_lib.client.cros.power import sys_power
10from autotest_lib.client.common_lib import error
11
12try:
13    # HACK: We need to succeed if OpenCV is missing to allow "emerge
14    # autotest-tests" to succeed, as OpenCV is not available in the build
15    # environment. It is available on the target where the test actually runs.
16    import cv2
17except ImportError:
18    pass
19
20
def async_suspend():
    """Suspend the system after a short delay.

    Sleeps for 5 seconds and then calls sys_power.kernel_suspend(10). Any
    exception is logged with its traceback here and then re-raised
    unchanged.
    """
    capture_warmup_secs = 5  # allow some time to start capturing
    # NOTE(review): presumably the suspend duration in seconds -- confirm
    # against sys_power.kernel_suspend.
    kernel_suspend_arg = 10
    try:
        time.sleep(capture_warmup_secs)
        sys_power.kernel_suspend(kernel_suspend_arg)
    except:
        # Whatever escapes from here is re-raised in the main process, but
        # the stack trace reported there will be wrong. Record the correct
        # one now, before propagating.
        logging.exception('suspend raised exception')
        raise
30
31
class power_CameraSuspend(test.test):
    """Test camera before & after suspend.

    Frames are read from the camera in a loop while a subprocess suspends
    the system. The test fails if a frame cannot be read or if two
    consecutive frames are identical.
    """

    version = 1

    # Frames that must be captured after the suspend task has finished
    # before the capture loop stops.
    _FRAMES_AFTER_RESUME = 10
    # Only the first this-many frames are written to disk when save_images
    # is set.
    _MAX_SAVED_IMAGES = 200

    def run_once(self, save_images=False):
        """Capture from the camera while the system suspends and resumes.

        Args:
            save_images: if true, write the first 200 captured frames to
                self.outputdir as '<NNN>.jpg'.

        Raises:
            error.TestError: no camera was found.
            error.TestFail: a frame could not be read, or two consecutive
                frames were identical.
            Any exception raised by the suspend subprocess is re-raised
            here as well.
        """
        # open the camera via opencv
        cam_name, cam_index = camera_utils.find_camera()
        if cam_index is None:
            raise error.TestError('no camera found')
        cam = cv2.VideoCapture(cam_index)

        try:
            # kick off async suspend
            logging.info('starting subprocess to suspend system')
            pool = multiprocessing.Pool(processes=1)
            try:
                # TODO(spang): Move async suspend to library.
                result = pool.apply_async(async_suspend)
                self._capture_across_suspend(cam, cam_name, result,
                                             save_images)
            finally:
                # Always tear the worker down. If capturing failed before
                # the suspend fired, this keeps a stray suspend from
                # happening after the test has already finished.
                pool.terminate()
                pool.join()
        finally:
            # Give the device back even when the test fails.
            cam.release()

    def _capture_across_suspend(self, cam, cam_name, result, save_images):
        """Read frames until enough have been captured after the suspend.

        Args:
            cam: opened cv2.VideoCapture to read frames from.
            cam_name: camera name, used only in the failure message.
            result: AsyncResult of the suspend task.
            save_images: if true, write the first frames to self.outputdir.

        Raises:
            error.TestFail: a read failed or two consecutive frames were
                identical.
        """
        # capture images concurrently with suspend
        capture_start = time.time()
        logging.info('start capturing at %d', capture_start)
        image_count = 0
        resume_count = None  # image_count when the suspend task finished
        last_image = None

        while True:
            # terminate if we've captured a few frames after resume
            if result.ready() and resume_count is None:
                result.get() # reraise exception, if any
                resume_count = image_count
                logging.info('suspend task finished')
            if (resume_count is not None and
                    image_count - resume_count >= self._FRAMES_AFTER_RESUME):
                break

            # capture one frame
            image_ok, image = cam.read()
            image_count += 1
            if not image_ok:
                logging.error('failed capture at image %d', image_count)
                # Format the message explicitly: extra positional arguments
                # to the exception are not interpolated into it.
                raise error.TestFail('image capture failed from %s' %
                                     cam_name)

            # write image to disk, if requested
            if save_images and image_count <= self._MAX_SAVED_IMAGES:
                path = os.path.join(self.outputdir, '%03d.jpg' % image_count)
                cv2.imwrite(path, image)

            # verify camera produces a unique image on each capture
            if (last_image is not None and
                    numpy.array_equal(image, last_image)):
                raise error.TestFail('camera produced two identical images')
            last_image = image

        capture_end = time.time()
        logging.info('done capturing at %d', capture_end)

        logging.info('captured %d frames in %d seconds',
                     image_count, capture_end - capture_start)
88