1import cStringIO
2
3from autotest_lib.client.common_lib.cros import textfsm
4from autotest_lib.client.common_lib.cros.cfm.usb import usb_device
5
6
7class UsbDeviceCollector(object):
8    """Utility class for obtaining info about connected USB devices."""
9
10    USB_DEVICES_TEMPLATE = (
11        'Value Required Vendor ([0-9a-fA-F]+)\n'
12        'Value Required ProdID ([0-9A-Fa-f]+)\n'
13        'Value Required prev ([0-9a-fA-Z.]+)\n'
14        'Value Required Bus ([0-9.]+)\n'
15        'Value Required Port ([0-9.]+)\n'
16        'Value Required Lev ([0-9.]+)\n'
17        'Value Required Dev ([0-9.]+)\n'
18        'Value Required Prnt ([0-9.]+)\n'
19        'Value Manufacturer (.+)\n'
20        'Value Product (.+)\n'
21        'Value serialnumber ([0-9a-fA-Z\:\-]+)\n'
22        'Value cinterfaces (\d)\n'
23        'Value List intindex ([0-9])\n'
24        'Value List intdriver ([A-Za-z-\(\)]+)\n\n'
25        'Start\n'
26        '  ^USB-Device -> Continue.Record\n'
27        '  ^T:\s+Bus=${Bus}\s+Lev=${Lev}\s+Prnt=${Prnt}'
28        '\s+Port=${Port}.*Dev#=\s*${Dev}.*\n'
29        '  ^P:\s+Vendor=${Vendor}\s+ProdID=${ProdID}\sRev=${prev}\n'
30        '  ^S:\s+Manufacturer=${Manufacturer}\n'
31        '  ^S:\s+Product=${Product}\n'
32        '  ^S:\s+SerialNumber=${serialnumber}\n'
33        '  ^C:\s+\#Ifs=\s+${cinterfaces}\n'
34        '  ^I:\s+If\#=\s+${intindex}.*Driver=${intdriver}\n'
35    )
36
37    def __init__(self, host):
38        """
39        Constructor
40        @param host the DUT.
41        """
42        self._host = host
43
44    def _extract_usb_data(self, rawdata):
45      """
46      Populate usb data into a list of dictionaries.
47      @param rawdata The output of "usb-devices" on CfM.
48      @returns list of dictionary, example dictionary:
49      {'Manufacturer': 'USBest Technology',
50      'Product': 'SiS HID Touch Controller',
51      'Vendor': '266e',
52      'intindex': ['0'],
53      'tport': '00',
54      'tcnt': '01',
55      'serialnumber': '',
56      'tlev': '03',
57      'tdev': '18',
58      'dver': '',
59      'intdriver': ['usbhid'],
60      'tbus': '01',
61      'prev': '03.00',
62      'cinterfaces': '1',
63      'ProdID': '0110',
64      'tprnt': '14'}
65      """
66      usbdata = []
67      rawdata += '\n'
68      re_table = textfsm.TextFSM(cStringIO.StringIO(self.USB_DEVICES_TEMPLATE))
69      fsm_results = re_table.ParseText(rawdata)
70      usbdata = [dict(zip(re_table.header, row)) for row in fsm_results]
71      return usbdata
72
73    def _collect_usb_device_data(self):
74        """Collecting usb device data."""
75        usb_devices = (self._host.run('usb-devices', ignore_status=True).
76                       stdout.strip().split('\n\n'))
77        return self._extract_usb_data(
78            '\nUSB-Device\n'+'\nUSB-Device\n'.join(usb_devices))
79
80
81    def _create_usb_device(self, usbdata):
82        return usb_device.UsbDevice(
83            vid=usbdata['Vendor'],
84            pid=usbdata['ProdID'],
85            product=usbdata.get('Product', 'Not available'),
86            interfaces=usbdata['intdriver'],
87            bus=int(usbdata['Bus']),
88            level=int(usbdata['Lev']),
89            # We increment here by 1 because usb-devices reports 0-indexed port
90            # numbers where as lsusb reports 1-indexed. We opted to follow the
91            # the lsusb standard.
92            port=int(usbdata['Port']) + 1)
93
94    def get_usb_devices(self):
95        """
96        Returns the list of UsbDevices connected to the DUT.
97        @returns A list of UsbDevice instances.
98        """
99        usbdata = self._collect_usb_device_data()
100        data_and_devices = []
101        for data in usbdata:
102            usb_device = self._create_usb_device(data)
103            data_and_devices.append((data, usb_device))
104        # Make a pass to populate parents of the UsbDevices.
105        # We need parent ID and Device ID from the raw data since we do not
106        # care about storing those in a UsbDevice. That's why we bother
107        # iterating through the (data,UsbDevice) pairs.
108        for data, usb_device in data_and_devices:
109            parent_id = int(data['Prnt'])
110            bus = usb_device.bus
111            # Device IDs are not unique across busses. When finding a device's
112            # parent we look for a device with the parent ID on the same bus.
113            usb_device.parent = self._find_device_on_same_bus(
114                    data_and_devices, parent_id, bus)
115        return [x[1] for x in data_and_devices]
116
117    def _find_device_on_same_bus(self, data_and_devices, device_id, bus):
118        for data, usb_device in data_and_devices:
119            if int(data['Dev']) == device_id and usb_device.bus == bus:
120                return usb_device
121        return None
122
123    def get_devices_by_spec(self, *specs):
124        """
125        Returns all UsbDevices that match the any of the given specs.
126        @param specs instances of UsbDeviceSpec.
127        @returns a list UsbDevice instances.
128        """
129        spec_vid_pids = [spec.vid_pid for spec in specs]
130        return [d for d in self.get_usb_devices()
131                if d.vid_pid in spec_vid_pids]
132