1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import time
7import re
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector
11from autotest_lib.server.cros.cfm import cfm_base_test
12
13class enterprise_CFM_PTZStress(cfm_base_test.CfmBaseTest):
14    """
15    Executes the following tests on CFM devices:
16
17       1. Enroll the device and join a meeting.
18       2. During meeting PTZ the camera according to the control file.
19    Verify the following functionalities:
20
21       1. Camera is enumerated.
22       2. Verify PTZ signal is sent to the camera.
23    """
24    version = 1
25
26    def check_camera_enumeration(self, camera_name):
27        """
28        Checks if there is any camera connected to the DUT.
29            If so, return the USB bus number the camera is on.
30        @param camera_name the camera's name under test
31        @returns The USB bus number the camera is on, if there is only
32            1 camera connected, false otherwise.
33        """
34        collector = usb_device_collector.UsbDeviceCollector(self._host)
35        camera = collector.get_devices_by_spec(camera_name)
36        if len(camera) == 1:
37            bus_number = camera[0].bus
38            logging.info('Camera enumerated: {} on bus {}'.
39                format(camera_name,bus_number))
40            return bus_number
41        raise error.TestFail('Camera failed to enumerate')
42
43
44    def dump_usbmon_traffic(self, bus, usb_trace_path):
45        """
46        Start usbmon with specified bus and dump the traffic to file
47        @param bus bus number the camera is on
48        @param usb_trace_path the USB traces file path
49        """
50        cmd = ('cat /sys/kernel/debug/usb/usbmon/{}u > {} &'.
51            format(bus, usb_trace_path))
52        try:
53            self._host.run(cmd, ignore_status = True)
54        except Exception as e:
55            logging.info('Fail to run cmd {}. Error: {}'.
56                format(cmd, str(e)))
57        logging.info('Usbmon traffic dumped to {}'.format(usb_trace_path))
58
59
60    def check_usbmon_traffic(self, usb_trace_path):
61        """
62        Check traces
63        @param usb_trace_path the USB traces file path
64        """
65        cmd = ('cat {} & '.format(usb_trace_path))
66        try:
67            traces = self._host.run_output(cmd, ignore_status = True)
68            if re.search('C Ii', traces) and re.search('S Ii', traces):
69                logging.info('PTZ signal verified')
70            else:
71                raise error.TestFail('PTZ signal did not go through')
72        except Exception as e:
73            logging.info('Fail to run cmd {}. Error: {}'.format(cmd, str(e)))
74
75
76    def clean_usb_traces_file(self, usb_trace_path):
77        """
78        Clean traces file
79        @param usb_trace_path the USB traces file path
80        """
81        cmd = ('rm {}'.format(usb_trace_path))
82        try:
83            self._host.run(cmd, ignore_status = True)
84        except Exception as e:
85            raise error.TestFail('Fail to run cmd {}. Error: {}'.format(cmd, str(e)))
86        logging.info('Cleaned up traces in {}'.format(usb_trace_path))
87
88
89    def run_once(self, host, test_config, ptz_motion_sequence):
90        """Runs the test."""
91        self.cfm_facade.wait_for_telemetry_commands()
92        for loop_no in xrange(1, test_config['repeat'] + 1):
93            logging.info('Test Loop : {}'.format(loop_no))
94            bus = self.check_camera_enumeration(test_config['camera'])
95            self.cfm_facade.start_meeting_session()
96            self.dump_usbmon_traffic(bus, test_config['usb_trace_path'])
97            for motion in ptz_motion_sequence:
98                self.cfm_facade.move_camera(motion)
99                time.sleep(test_config['motion_duration'])
100            self.check_usbmon_traffic(test_config['usb_trace_path'])
101            self.cfm_facade.end_meeting_session()
102            self.clean_usb_traces_file(test_config['usb_trace_path'])
103