import cStringIO

from autotest_lib.client.common_lib.cros import textfsm
from autotest_lib.client.common_lib.cros.cfm.usb import usb_device


class UsbDeviceCollector(object):
    """Utility class for obtaining info about connected USB devices."""

    # TextFSM template used to parse the output of "usb-devices".
    # Backslashes are written as '\\' so the regex text reaches TextFSM
    # unchanged without relying on Python's handling of unknown escape
    # sequences such as '\s'.
    # NOTE(review): the 'a-fA-Z' character class used for prev and
    # serialnumber looks like a typo (a-fA-F or a-zA-Z?). It is left as is
    # to preserve the current matching behavior -- confirm before changing.
    USB_DEVICES_TEMPLATE = (
        'Value Required Vendor ([0-9a-fA-F]+)\n'
        'Value Required ProdID ([0-9A-Fa-f]+)\n'
        'Value Required prev ([0-9a-fA-Z.]+)\n'
        'Value Required Bus ([0-9.]+)\n'
        'Value Required Port ([0-9.]+)\n'
        'Value Required Lev ([0-9.]+)\n'
        'Value Required Dev ([0-9.]+)\n'
        'Value Required Prnt ([0-9.]+)\n'
        'Value Manufacturer (.+)\n'
        'Value Product (.+)\n'
        'Value serialnumber ([0-9a-fA-Z\\:\\-]+)\n'
        'Value cinterfaces (\\d)\n'
        'Value List intindex ([0-9])\n'
        'Value List intdriver ([A-Za-z-\\(\\)]+)\n\n'
        'Start\n'
        ' ^USB-Device -> Continue.Record\n'
        ' ^T:\\s+Bus=${Bus}\\s+Lev=${Lev}\\s+Prnt=${Prnt}'
        '\\s+Port=${Port}.*Dev#=\\s*${Dev}.*\n'
        ' ^P:\\s+Vendor=${Vendor}\\s+ProdID=${ProdID}\\sRev=${prev}\n'
        ' ^S:\\s+Manufacturer=${Manufacturer}\n'
        ' ^S:\\s+Product=${Product}\n'
        ' ^S:\\s+SerialNumber=${serialnumber}\n'
        ' ^C:\\s+\\#Ifs=\\s+${cinterfaces}\n'
        ' ^I:\\s+If\\#=\\s+${intindex}.*Driver=${intdriver}\n'
    )

    def __init__(self, host):
        """
        Constructor

        @param host the DUT.
        """
        self._host = host

    def _extract_usb_data(self, rawdata):
        """
        Populate usb data into a list of dictionaries.

        The keys of each dictionary are the Value names declared in
        USB_DEVICES_TEMPLATE.

        @param rawdata The output of "usb-devices" on CfM.
        @returns list of dictionary, example dictionary:
                 {'Manufacturer': 'USBest Technology',
                  'Product': 'SiS HID Touch Controller',
                  'Vendor': '266e',
                  'ProdID': '0110',
                  'prev': '03.00',
                  'Bus': '01',
                  'Port': '00',
                  'Lev': '03',
                  'Dev': '18',
                  'Prnt': '14',
                  'serialnumber': '',
                  'cinterfaces': '1',
                  'intindex': ['0'],
                  'intdriver': ['usbhid']}
        """
        template = cStringIO.StringIO(self.USB_DEVICES_TEMPLATE)
        re_table = textfsm.TextFSM(template)
        # A trailing newline is appended so the last line of the raw data is
        # newline-terminated before parsing.
        fsm_results = re_table.ParseText(rawdata + '\n')
        return [dict(zip(re_table.header, row)) for row in fsm_results]

    def _collect_usb_device_data(self):
        """
        Collecting usb device data.

        Runs "usb-devices" on the DUT (ignoring its exit status), splits the
        output into blank-line separated chunks and puts a "USB-Device"
        marker line in front of every chunk. The marker is what the
        '^USB-Device' rule of USB_DEVICES_TEMPLATE matches on.

        @returns list of dictionaries as produced by _extract_usb_data.
        """
        result = self._host.run('usb-devices', ignore_status=True)
        chunks = result.stdout.strip().split('\n\n')
        marker = '\nUSB-Device\n'
        return self._extract_usb_data(marker + marker.join(chunks))

    def _create_usb_device(self, usbdata):
        """
        Creates a UsbDevice from one parsed "usb-devices" record.

        @param usbdata dictionary as returned by _extract_usb_data.
        @returns a UsbDevice instance. Its parent is not populated here.
        """
        return usb_device.UsbDevice(
            vid=usbdata['Vendor'],
            pid=usbdata['ProdID'],
            product=usbdata.get('Product', 'Not available'),
            interfaces=usbdata['intdriver'],
            bus=int(usbdata['Bus']),
            level=int(usbdata['Lev']),
            # We increment here by 1 because usb-devices reports 0-indexed
            # port numbers whereas lsusb reports 1-indexed. We opted to
            # follow the lsusb standard.
            port=int(usbdata['Port']) + 1)

    def get_usb_devices(self):
        """
        Returns the list of UsbDevices connected to the DUT.

        @returns A list of UsbDevice instances.
        """
        # Keep the raw data next to each device: the parent ID and device ID
        # are only available in the raw data since we do not store them in a
        # UsbDevice.
        data_and_devices = [(data, self._create_usb_device(data))
                            for data in self._collect_usb_device_data()]
        # Make a second pass to populate the parents of the UsbDevices.
        for data, device in data_and_devices:
            # Device IDs are not unique across busses. When finding a
            # device's parent we look for a device with the parent ID on the
            # same bus.
            device.parent = self._find_device_on_same_bus(
                data_and_devices, int(data['Prnt']), device.bus)
        return [device for _, device in data_and_devices]

    def _find_device_on_same_bus(self, data_and_devices, device_id, bus):
        """
        Finds the device with the given device ID on the given bus.

        @param data_and_devices list of (raw data dictionary, UsbDevice)
                pairs.
        @param device_id the device ID (the 'Dev' value) to look for.
        @param bus the bus number the device must be on.
        @returns the first matching UsbDevice, or None if there is no match.
        """
        for data, device in data_and_devices:
            if int(data['Dev']) == device_id and device.bus == bus:
                return device
        return None

    def get_devices_by_spec(self, *specs):
        """
        Returns all UsbDevices that match any of the given specs.

        @param specs instances of UsbDeviceSpec.
        @returns a list of UsbDevice instances.
        """
        spec_vid_pids = [spec.vid_pid for spec in specs]
        return [d for d in self.get_usb_devices()
                if d.vid_pid in spec_vid_pids]