1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, os, re, time, random
6
7from autotest_lib.client.bin import utils
8from autotest_lib.server import test
9from autotest_lib.client.common_lib import error
10from autotest_lib.server.cros.usb_mux_controller import USBMuxController
11
# Seconds to sleep after toggling hub power; also used as the timeout when
# polling lsusb for a device to appear or disappear.
_WAIT_DELAY = 5
# Directory on the DUT that is listed to find USB device dirs.
_USB_DIR = '/sys/bus/usb/devices'
# Number of USB mux ports probed during discovery (ports 0 .. MAX_PORTS - 1).
MAX_PORTS = 8
# Ports on which device verification failed, in the order failures occurred.
TMP_FAILED_TEST_LIST = []
# Every port picked for verification, in the order the ports were tested.
PORT_BEING_TESTED = []
17
18class kernel_ExternalUsbPeripheralsDetectionStress(test.test):
19    """Uses USB multiplexer to repeatedly connect and disconnect USB devices."""
20    version = 1
21
22
23    def set_hub_power(self, on=True):
24        """Setting USB hub power status
25
26        @param on: To power on the servo-usb hub or not.
27
28        """
29        reset = 'off'
30        if not on:
31            reset = 'on'
32        self.host.servo.set('dut_hub1_rst1', reset)
33        time.sleep(_WAIT_DELAY)
34
35
36    def check_manufacturer_and_product_info(
37            self, vId, pId, manufacturer, pName):
38        """Check manufacturer and product info from lsusb against dict values.
39
40        @param vId: Vendor id of the connected USB device.
41        @param pId: Product id of the connected USB device.
42        @param manufacturer: Manufacturer name of the connected USB device.
43        @param pName: Product name of the connected USB device
44        @param result: To track test result.
45        @return result value
46
47        """
48        result = True
49        manu_cmd = ('lsusb -v -d ' + vId + ':' +  pId + ' | grep iManufacturer')
50        prod_cmd = ('lsusb -v -d ' + vId + ':' +  pId + ' | grep iProduct')
51
52        manu_cmd_output = (self.host.run(manu_cmd, ignore_status=True).
53                           stdout.strip())
54        prod_cmd_output = (self.host.run(prod_cmd, ignore_status=True).
55                           stdout.strip())
56
57        manu_verify = 'iManufacturer.*' + manufacturer
58        prod_verify = 'iProduct.*' + pName
59
60        match_result_manu = re.search(manu_verify, manu_cmd_output) != None
61        match_result_prod = re.search(prod_verify, prod_cmd_output) != None
62
63        if not match_result_manu or not match_result_prod:
64            logging.debug('Manufacturer or productName do not match.')
65            result = False
66
67        return result
68
69
70    def check_driver_symlink_and_dir(self, devicePath, productName):
71        """Check driver symlink and dir against devicePath value from dict.
72
73        @param devicePath: Device driver path.
74        @param productName: Product name of the connected USB device.
75        @param result: To track test result.
76        @return result value
77
78        """
79        result = True
80        tmp_list = [device_dir for device_dir in
81                    self.host.run('ls -1 %s' % devicePath,
82                    ignore_status=True).stdout.split('\n')
83                    if re.match(r'\d-\d.*:\d\.\d', device_dir)]
84
85        if not tmp_list:
86            logging.debug('No driver created/loaded for %s', productName)
87            result = False
88
89        flag = False
90        for device_dir in tmp_list:
91            driver_path = os.path.join(devicePath,
92                                       '%s/driver' % device_dir)
93            if self._exists_on(driver_path):
94                flag = True
95                link = (self.host.run('ls -l %s | grep ^l'
96                                      '| grep driver'
97                                      % driver_path, ignore_status=True)
98                                      .stdout.strip())
99                logging.info('%s', link)
100
101        if not flag:
102            logging.debug('Device driver not found')
103            result = False
104
105        return result
106
107
108    def check_usb_peripherals_details(self, connected_device_dict):
109        """Checks USB peripheral details against the values in dictionary.
110
111        @param connected_device_dict: Dictionary of device attributes.
112
113        """
114        result = True
115        usbPort = connected_device_dict['usb_port']
116        PORT_BEING_TESTED.append(usbPort)
117        self.usb_mux.enable_port(usbPort)
118
119        vendorId = connected_device_dict['deviceInfo']['vendorId']
120        productId = connected_device_dict['deviceInfo']['productId']
121        manufacturer = connected_device_dict['deviceInfo']['manufacturer']
122        productName = connected_device_dict['deviceInfo']['productName']
123        lsusbOutput = connected_device_dict['deviceInfo']['lsusb']
124        devicePath = connected_device_dict['deviceInfo']['devicePath']
125
126        try:
127            utils.poll_for_condition(
128                    lambda: self.host.path_exists(devicePath),
129                    exception=utils.TimeoutError('Trouble finding USB device '
130                    'on port %d' % usbPort), timeout=15, sleep_interval=1)
131        except utils.TimeoutError:
132            logging.debug('Trouble finding USB device on port %d', usbPort)
133            result = False
134            pass
135
136        if not ((vendorId + ':' + productId in lsusbOutput) and
137                self.check_manufacturer_and_product_info(
138                vendorId, productId, manufacturer, productName) and
139                self.check_driver_symlink_and_dir(devicePath, productName)):
140            result = False
141
142        self.usb_mux.disable_all_ports()
143        try:
144            utils.poll_for_condition(
145                    lambda: not self.host.path_exists(devicePath),
146                    exception=utils.TimeoutError('Device driver path does not '
147                    'disappear after device is disconnected.'), timeout=15,
148                    sleep_interval=1)
149        except utils.TimeoutError:
150            logging.debug('Device driver path is still present after device is '
151                         'disconnected from the DUT.')
152            result = False
153            pass
154
155        if result is False:
156            logging.debug('Test failed on port %s.', usbPort)
157            TMP_FAILED_TEST_LIST.append(usbPort)
158
159
160    def get_usb_device_dirs(self):
161        """Gets the usb device dirs from _USB_DIR path.
162
163        @returns list with number of device dirs else None
164
165        """
166        usb_dir_list = list()
167        cmd = 'ls -1 %s' % _USB_DIR
168        cmd_output = self.host.run(cmd).stdout.strip().split('\n')
169        for d in cmd_output:
170            usb_dir_list.append(os.path.join(_USB_DIR, d))
171        return usb_dir_list
172
173
174    def parse_device_dir_for_info(self, dir_list, usb_device_dict):
175        """Uses vendorId/device path and to get other device attributes.
176
177        @param dir_list: Complete path of directories.
178        @param usb_device_dict: Dictionary to store device attributes.
179        @returns usb_device_dict with device attributes
180
181        """
182        for d_path in dir_list:
183            file_name = os.path.join(d_path, 'idVendor')
184            if self._exists_on(file_name):
185                vendor_id = self.host.run('cat %s' % file_name).stdout.strip()
186                if vendor_id:
187                    usb_device_dict['deviceInfo']['vendorId'] = vendor_id
188                    usb_device_dict['deviceInfo']['devicePath'] = d_path
189                    usb_device_dict['deviceInfo']['productId'] = (
190                            self.get_product_info(d_path, 'idProduct'))
191                    usb_device_dict['deviceInfo']['productName'] = (
192                            self.get_product_info(d_path, 'product'))
193                    usb_device_dict['deviceInfo']['manufacturer'] = (
194                            self.get_product_info(d_path, 'manufacturer'))
195        return usb_device_dict
196
197
198    def get_product_info(self, directory, prod_string):
199        """Gets the product id, name and manufacturer info from device path.
200
201        @param directory: Driver path for the USB device.
202        @param prod_string: Device attribute string.
203        returns the output of the cat command
204
205        """
206        product_file_name = os.path.join(directory, prod_string)
207        if self._exists_on(product_file_name):
208            return self.host.run('cat %s' % product_file_name).stdout.strip()
209        return None
210
211
212    def _exists_on(self, path):
213        """Checks if file exists on host or not.
214
215        @returns True or False
216        """
217        return self.host.run('ls %s' % path,
218                             ignore_status=True).exit_status == 0
219
220
221    def check_lsusb_diff(self, original_lsusb_output):
222        """Compare LSUSB output for before and after connecting each device.
223
224        @param original_lsusb_output: lsusb output prior to connecting device.
225        @returns the difference between new and old lsusb outputs
226
227        """
228        lsusb_output = (self.host.run('lsusb', ignore_status=True).
229                        stdout.strip().split('\n'))
230        lsusb_diff = (list(set(lsusb_output).
231                      difference(set(original_lsusb_output))))
232        return lsusb_diff
233
234
235    def run_once(self, host, loop_count):
236        """Main function to run the autotest.
237
238        @param host: Host object representing the DUT.
239        @param loop_count: Number of iteration cycles.
240        @raise error.TestFail if one or more USB devices are not detected
241
242        """
243        self.host = host
244        self.usb_mux = USBMuxController(self.host)
245
246        # Make sure all USB ports are disabled prior to starting the test.
247        self.usb_mux.disable_all_ports()
248
249        self.host.servo.switch_usbkey('dut')
250        self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
251
252        self.set_hub_power(False)
253        # Collect the USB devices directories before switching on hub
254        usb_list_dir_off = self.get_usb_device_dirs()
255
256        self.set_hub_power(True)
257        # Collect the USB devices directories after switching on hub
258        usb_list_dir_on = self.get_usb_device_dirs()
259
260        lsusb_original_out = (self.host.run('lsusb', ignore_status=True).
261                                 stdout.strip().split('\n'))
262        list_of_usb_device_dictionaries = list()
263        usb_port = 0
264
265        # Extract connected USB device information and store it in a dict.
266        while usb_port < MAX_PORTS:
267            usb_device_dict = {'usb_port':None,'deviceInfo':
268                    {'devicePath':None,'vendorId':None,'productId':None,
269                    'productName':None,'manufacturer':None,'lsusb':None}}
270            usb_device_dir_list = list()
271            self.usb_mux.enable_port(usb_port)
272            try:
273                utils.poll_for_condition(
274                        lambda: self.check_lsusb_diff(lsusb_original_out),
275                        exception=utils.TimeoutError('No USB device on port '
276                        '%d' % usb_port), timeout=_WAIT_DELAY, sleep_interval=1)
277            except utils.TimeoutError:
278                logging.debug('No USB device found on port %d', usb_port)
279                pass
280
281            # Maintain list of associated dirs for each connected USB device
282            for device in self.get_usb_device_dirs():
283                if device not in usb_list_dir_on:
284                    usb_device_dir_list.append(device)
285
286            usb_device_dict = self.parse_device_dir_for_info(
287                    usb_device_dir_list, usb_device_dict)
288
289            lsusb_diff = self.check_lsusb_diff(lsusb_original_out)
290            if lsusb_diff:
291                usb_device_dict['usb_port'] = usb_port
292                usb_device_dict['deviceInfo']['lsusb'] = lsusb_diff[0]
293                list_of_usb_device_dictionaries.append(usb_device_dict)
294
295            self.usb_mux.disable_all_ports()
296            try:
297                utils.poll_for_condition(
298                        lambda: not self.check_lsusb_diff(lsusb_original_out),
299                        exception=utils.TimeoutError('Timed out waiting for '
300                        'USB device to disappear.'), timeout=_WAIT_DELAY,
301                        sleep_interval=1)
302            except utils.TimeoutError:
303                logging.debug('Timed out waiting for USB device to disappear.')
304                pass
305            logging.info('%s', usb_device_dict)
306            usb_port += 1
307
308        if len(list_of_usb_device_dictionaries) == 0:
309            # Fails if no devices detected
310            raise error.TestError('No connected devices were detected. Make '
311                                  'sure the devices are connected to USB_KEY '
312                                  'and DUT_HUB1_USB on the servo board.')
313        logging.info('Connected devices list: %s',
314                      list_of_usb_device_dictionaries)
315
316        # loop_count defines the number of times the USB peripheral details
317        # should be checked and random.choice is used to randomly select one of
318        # the elements from the usb device list specifying the device whose
319        # details should be checked.
320        for i in xrange(loop_count):
321            self.check_usb_peripherals_details(
322                    random.choice(list_of_usb_device_dictionaries))
323
324        logging.info('Sequence of ports tested with random picker: %s',
325                      ', '.join(map(str, PORT_BEING_TESTED)))
326
327        if TMP_FAILED_TEST_LIST:
328            logging.info('Failed to verify devices on following ports: %s',
329                         ', '.join(map(str, TMP_FAILED_TEST_LIST)))
330            raise error.TestFail('Failed to do full device verification on '
331                                 'some ports.')
332