1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, threading, time
6
7from autotest_lib.server import autotest, test
8from autotest_lib.client.common_lib import error
9
# Seconds slept (via time.sleep) after toggling the servo USB hub and before
# re-reading lsusb once the DUT has resumed.
_WAIT_DELAY = 10
# Timeout handed to the suspend/resume waits and to the peripheral
# detection loop.
_LONG_TIMEOUT = 60
# Argument passed to servo.power_key() to wake a DUT that has no lid.
_WAKE_PRESS_IN_SEC = 0.2
13
class platform_ExternalUSBStress(test.test):
    """Uses servo to repeatedly connect/remove USB devices."""
    version = 1

    def run_once(self, host, client_autotest, repeat, network_debug):
        """Stress USB hotplug and suspend/resume using the servo USB hub.

        The external devices are discovered by diffing the lsusb output with
        the servo hub powered off and powered on. The client test is then
        started and the hotplug and suspend/resume scenarios are repeated.

        @param host: Host object of the DUT; its servo is used to drive the
                lid, the power key and the USB hub.
        @param client_autotest: Client test handed to Autotest.run_test().
        @param repeat: Number of stress iterations to run.
        @param network_debug: Not used by this method.

        @raise error.TestError: if the servo lid control misbehaves or no
                external devices are detected.
        @raise error.TestFail: if the USB peripherals do not match
                expectations during an iteration.

        """
        # Check if DUT has lid.
        self.has_lid = host.servo.get('lid_open') != 'not_applicable'
        if self.has_lid:
            # Check that the lid control really reports an open lid after
            # asking servo to open it.
            host.servo.lid_open()
            if host.servo.get('lid_open') != 'yes':
                raise error.TestError('SERVO has a bad lid_open control')

        autotest_client = autotest.Autotest(host)
        # Both collections are filled in below, before any of the nested
        # helpers that read them is called.
        diff_list = set()
        off_list = []
        # The servo hubs come up as diffs in connected components. These
        # should be ignored for this test.
        servo_hardware_prefix = 'Standard Microsystems Corp.'
        self.is_suspended = False

        def strip_lsusb_output(lsusb_output):
            """Finds the external USB devices plugged

            @param lsusb_output: lsusb command output to parse

            @returns plugged_list: List of plugged usb devices names

            """
            named_list = []
            unnamed_device_count = 0
            for item in lsusb_output.split('\n'):
                # A blank line (e.g. empty lsusb output) is not a device and
                # must not be reported as an unnamed one.
                if not item.strip():
                    continue
                # The device name is whatever follows the first six
                # space-separated fields of the line.
                name = ' '.join(item.split(' ')[6:]).strip()
                if not name:
                    logging.debug('Unnamed device located, adding generic name.')
                    name = 'Unnamed device %d' % unnamed_device_count
                    unnamed_device_count += 1
                if not name.startswith(servo_hardware_prefix):
                    named_list.append(name)
            return named_list


        def set_hub_power(on=True):
            """Turns on or off the USB hub (dut_hub1_rst1).

            Sets the hub reset control and then sleeps _WAIT_DELAY seconds.
            Nothing is returned.

            @param on: To power on the servo-usb hub or not

            """
            # The control is a reset line: holding reset 'on' powers the hub
            # off, releasing it ('off') powers the hub on.
            reset = 'off' if on else 'on'
            host.servo.set('dut_hub1_rst1', reset)
            time.sleep(_WAIT_DELAY)


        def wait_to_detect(timeout=_LONG_TIMEOUT):
            """Waits till timeout for set of peripherals in lsusb output.

            Polls lsusb once per second until every device in diff_list is
            listed.

            @param timeout: timeout in seconds

            @raise error.TestFail: if timeout is reached

            """
            start_time = int(time.time())
            while True:
                connected = strip_lsusb_output(host.run('lsusb').stdout.strip())
                if diff_list.issubset(connected):
                    return
                if int(time.time()) - start_time > timeout:
                    raise error.TestFail('USB peripherals not detected: %s' %
                                          str(diff_list.difference(connected)))
                time.sleep(1)


        def test_suspend(plugged_before_suspended=False,
                         plugged_before_resume=False):
            """Close and open lid while different USB plug status.

            On a DUT without a lid, host.suspend is started in a thread to
            suspend and the servo power key is pressed to resume.

            @param plugged_before_suspended: USB hub powered before suspend
            @param plugged_before_resume: USB hub powered before resume

            @raise error.TestFail: if USB peripherals do not match expectations.

            """
            set_hub_power(plugged_before_suspended)

            # Suspend
            boot_id = host.get_boot_id()
            if self.has_lid:
                host.servo.lid_close()
            else:
                # Run the suspend request in the background so this thread
                # can go on to wait for the DUT to go to sleep.
                thread = threading.Thread(target=host.suspend)
                thread.start()
            host.test_wait_for_sleep(_LONG_TIMEOUT)
            logging.debug(' --DUT suspended')
            self.is_suspended = True

            # Only toggle the hub while suspended when the plug state is
            # supposed to change across the suspend.
            if plugged_before_resume != plugged_before_suspended:
                set_hub_power(plugged_before_resume)

            # Resume
            if self.has_lid:
                host.servo.lid_open()
            else:
                host.servo.power_key(_WAKE_PRESS_IN_SEC)
            host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
            logging.debug(' --DUT resumed')
            self.is_suspended = False

            if plugged_before_resume:
                wait_to_detect(_LONG_TIMEOUT)
            else:
                # With the hub off, lsusb must match the baseline that was
                # recorded with the hub powered off.
                time.sleep(_WAIT_DELAY)
                connected = strip_lsusb_output(host.run('lsusb').stdout.strip())
                if connected != off_list:
                    raise error.TestFail('Devices were not removed on wake.')


        def test_hotplug():
            """Testing unplug-plug and check for expected peripherals.

            @raise error.TestFail: if USB peripherals do not match expectations.

            """
            set_hub_power(False)
            set_hub_power(True)
            wait_to_detect(_LONG_TIMEOUT)


        def stress_external_usb():
            """Test procedures in one iteration."""

            # Unplug/plug
            test_hotplug()

            # Suspend/resume as unplugged
            test_suspend()

            # Plug/close_lid/unplug/open_lid
            test_suspend(plugged_before_suspended=True)

            # Unplug/close_lid/plug/open_lid
            test_suspend(plugged_before_resume=True)

            # Suspend/resume as plugged
            test_suspend(plugged_before_suspended=True,
                         plugged_before_resume=True)


        host.servo.switch_usbkey('dut')

        # There are some mice that need the data and power connection to both
        # be removed, otherwise they won't come back up.  This means that the
        # external devices should only use the usb connections labeled:
        # USB_KEY and DUT_HUB1_USB.
        set_hub_power(False)
        time.sleep(_WAIT_DELAY)
        # Baseline: what lsusb lists while the servo hub is powered off.
        off_list = strip_lsusb_output(host.run('lsusb').stdout.strip())
        set_hub_power(True)
        time.sleep(_WAIT_DELAY * 2)
        connected = strip_lsusb_output(host.run('lsusb').stdout.strip())
        # Devices that only show up with the hub powered are the ones under
        # test.
        diff_list = set(connected).difference(set(off_list))
        if not diff_list:
            raise error.TestError('No connected devices were detected.  Make '
                                  'sure the devices are connected to USB_KEY '
                                  'and DUT_HUB1_USB on the servo board.')
        logging.debug('Connected devices list: %s', diff_list)

        autotest_client.run_test(client_autotest,
                                 exit_without_logout=True)
        for iteration in range(1, repeat + 1):
            logging.debug('---Iteration %d/%d', iteration, repeat)
            stress_external_usb()
195