#!/usr/bin/python

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This is a module to scan /sys/block/ virtual FS, query udev

It provides a list of all removable or USB devices connected to the machine on
which the module is running.
It can be used from command line or from a python script.

To use it as python module it's enough to call the get_all() function.
@see |get_all| documentation for the output format
|get_all()| output is human readable (as opposite to python's data structures)
"""

import logging
import os
import re

# this script can be run at command line on DUT (ie /usr/local/autotest
# contains only the client/ subtree), on a normal autotest
# installation/repository or as a python module used on a client-side test.
import common
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import utils

INFO_PATH = "/sys/block"
UDEV_CMD_FOR_SERIAL_NUMBER = "udevadm info -a -n %s | grep -iE 'ATTRS{" \
                             "serial}' | head -n 1"
LSUSB_CMD = "lsusb -v | grep -iE '^Device Desc|bcdUSB|iSerial'"
DESC_PATTERN = r'Device Descriptor:'
BCDUSB_PATTERN = r'bcdUSB\s+(\d+\.\d+)'
ISERIAL_PATTERN = r'iSerial\s+\d\s(\S*)'
UDEV_SERIAL_PATTERN = r'=="(.*)"'

# /proc/mounts writes special characters in a path as a backslash followed by
# three octal digits (e.g. a space becomes '\040').
_MOUNT_OCTAL_ESCAPE = re.compile(r'\\([0-7]{3})')


def _unescape_mount_path(path):
    """Decode the octal escapes found in a /proc/mounts path field

    @param path: mount point exactly as it appears in /proc/mounts
    @return: the path with every '\\NNN' octal escape replaced by the
             character it stands for
    """
    return _MOUNT_OCTAL_ESCAPE.sub(lambda m: chr(int(m.group(1), 8)), path)


def read_file(path_to_file, host=None):
    """Reads the file and returns the file content

    @param path_to_file: Full path to the file
    @param host: DUT object; when given, the file is read on the DUT through
                 |host.run|, otherwise it is read locally
    @return: Returns the content of file (stripped when read through |host|)
    @raise error.TestError: if the file does not exist
    """
    if host:
        if not host.path_exists(path_to_file):
            raise error.TestError("No such file or directory %s" % path_to_file)
        return host.run('cat %s' % path_to_file).stdout.strip()

    if not os.path.isfile(path_to_file):
        raise error.TestError("No such file or directory %s" % path_to_file)
    return utils.read_file(path_to_file)


def system_output(command, host=None, ignore_status=False):
    """Executes command on client

    @param command: command to execute
    @param host: DUT object; when given, the command is run through
                 |host.run|, otherwise through |utils.system_output|
    @param ignore_status: forwarded unchanged to the underlying runner
    @return: output of command
    """
    if host:
        return host.run(command, ignore_status=ignore_status).stdout.strip()

    return utils.system_output(command, ignore_status=ignore_status)


def get_udev_info(blockdev, method='udev', host=None):
    """Get information about |blockdev|

    @param blockdev: a block device, e.g., /dev/sda1 or /dev/sda
    @param method: either 'udev' (default) or 'blkid'; any other value yields
                   an empty dictionary
    @param host: DUT object

    @return a dictionary holding whichever of the following keys the queried
            tool reported:
            "ID_BUS", "ID_MODEL", "ID_FS_UUID", "ID_FS_TYPE", "ID_FS_LABEL"
    """
    ignore_status = False

    if method == "udev":
        cmd = "udevadm info --name %s --query=property" % blockdev
    elif method == "blkid":
        # this script is run as root in a normal autotest run,
        # so this works: It doesn't have access to the necessary info
        # when run as a non-privileged user
        cmd = "blkid -c /dev/null -o udev %s" % blockdev
        ignore_status = True
    else:
        return {}

    output = system_output(cmd, host, ignore_status=ignore_status)

    udev_keys = ("ID_BUS", "ID_MODEL", "ID_FS_UUID", "ID_FS_TYPE",
                 "ID_FS_LABEL")
    ret = {}
    for line in output.splitlines():
        # Split on the first '=' only: a value may itself contain '=', and
        # lines without any '=' are skipped instead of raising.
        udev_key, separator, udev_val = line.partition('=')
        if separator and udev_key in udev_keys:
            ret[udev_key] = udev_val

    return ret


def get_lsusb_info(host=None):
    """Get lsusb info in list format

    @param host: DUT object
    @return: Returns lsusb output in list format: one dictionary per device
             that reported an iSerial line, with key 'iSerial' and, when
             seen, 'bcdUSB'
    """

    usb_info_list = []
    # Getting the USB type and Serial number info using 'lsusb -v'. Sample
    # output is shown in below
    # Device Descriptor:
    #   bcdUSB               2.00
    #   iSerial                 3 131BC7
    #   bcdUSB               2.00
    # Device Descriptor:
    #   bcdUSB               2.10
    #   iSerial                 3 001A4D5E8634B03169273995

    lsusb_output = system_output(LSUSB_CMD, host)
    # No descriptor seen yet: attribute lines before the first
    # 'Device Descriptor:' cannot be attributed to a device and are ignored.
    usb_info = None
    # we are parsing each line and getting the usb info
    for line in lsusb_output.splitlines():
        if re.search(DESC_PATTERN, line):
            usb_info = {}
            continue
        if usb_info is None:
            continue

        bcdusb_matched = re.search(BCDUSB_PATTERN, line)
        if bcdusb_matched:
            # bcdUSB may appear multiple time. Keep the first one (the device
            # descriptor's own value) and drop the remaining.
            usb_info.setdefault('bcdUSB', bcdusb_matched.group(1))
            continue

        iserial_matched = re.search(ISERIAL_PATTERN, line)
        if iserial_matched and 'iSerial' not in usb_info:
            usb_info['iSerial'] = iserial_matched.group(1)
            usb_info_list.append(usb_info)
    logging.debug('lsusb output is %s', usb_info_list)
    return usb_info_list


def get_usbdevice_type_and_serial(device, lsusb_info, host=None):
    """Get USB device type and Serial number

    @param device: USB device node. Example: /dev/sda or /dev/sdb
    @param lsusb_info: lsusb info, as returned by |get_lsusb_info|
    @param host: DUT object
    @return: Returns a (usb type, serial number) tuple for the device, or
             (None, None) when udev reports no serial or no lsusb entry
             carries the same serial
    """

    # Comparing the lsusb serial number with udev output serial number
    # Both serial numbers should be same. Sample udev command output is
    # shown in below.
    # ATTRS{serial}=="001A4D5E8634B03169273995"
    udev_serial_output = system_output(UDEV_CMD_FOR_SERIAL_NUMBER % device,
                                       host)
    udev_serial_matched = re.search(UDEV_SERIAL_PATTERN, udev_serial_output)
    if udev_serial_matched:
        udev_serial = udev_serial_matched.group(1)
        logging.debug("udev serial number is %s", udev_serial)
        for usb_details in lsusb_info:
            if usb_details.get('iSerial') == udev_serial:
                return usb_details.get('bcdUSB'), udev_serial
    return None, None


def get_partition_info(part_path, bus, model, partid=None, fstype=None,
                       label=None, block_size=0, is_removable=False,
                       lsusb_info=None, host=None):
    """Return information about a device as a list of dictionaries

    Normally a single device described by the passed parameters will match a
    single device on the system, and thus a single element list as return
    value; although it's possible that a single block device is associated with
    several mountpoints, this scenario will lead to a dictionary for each
    mountpoint.

    @param part_path: full partition path under |INFO_PATH|
                      e.g., /sys/block/sda or /sys/block/sda/sda1
    @param bus: bus, e.g., 'usb' or 'ata', according to udev
    @param model: device model, e.g., according to udev
    @param partid: partition id, if present
    @param fstype: filesystem type, if present
    @param label: filesystem label, if present
    @param block_size: filesystem block size
    @param is_removable: whether it is a removable device
    @param lsusb_info: lsusb info, as returned by |get_lsusb_info|; None is
                       treated as an empty list
    @param host: DUT object

    @return a list of dictionaries containing each a partition info.
            An empty list can be returned if no matching device is found or
            if the device is read-only
    """
    if lsusb_info is None:
        lsusb_info = []

    ret = []
    # take the partitioned device name from the /sys/block/ path name
    part = part_path.split('/')[-1]
    device = "/dev/%s" % part

    if not partid:
        info = get_udev_info(device, "blkid", host=host)
        partid = info.get('ID_FS_UUID', None)
        if not fstype:
            fstype = info.get('ID_FS_TYPE', None)
        if not label:
            label = partid

    # Read-only devices are not reported at all.
    readonly = read_file("%s/ro" % part_path, host)
    if int(readonly):
        return ret

    partition_blocks = read_file("%s/size" % part_path, host)
    size = block_size * int(partition_blocks)

    stub = {}
    stub['device'] = device
    stub['bus'] = bus
    stub['model'] = model
    stub['size'] = size

    # look for it among the mounted devices first
    mounts = read_file("/proc/mounts", host).splitlines()
    seen = False
    for line in mounts:
        # Fields are whitespace separated: device, mount point, fs type,
        # comma separated options, then two more fields. Splitting on
        # whitespace keeps the options clean even when 'rw' is the only one.
        fields = line.split()
        if len(fields) < 4:
            continue
        mounted_device, mount, proc_fstype, options = fields[:4]

        if mounted_device != device or 'rw' not in options.split(','):
            continue

        seen = True  # at least one match occurred

        # Information retrieved from /proc/mounts overrides the udev
        # passed ones (e.g., proc_fstype instead of fstype)
        dev = stub.copy()
        dev['fs_uuid'] = partid
        dev['fstype'] = proc_fstype
        dev['is_mounted'] = True
        # When USB device is mounted automatically after login a
        # non-labelled drive is mounted to:
        # '/media/removable/USB Drive'
        # Here an octal escape '\040' is added to the path
        # replacing ' ' (space); decode it back to the real character.
        dev['mountpoint'] = _unescape_mount_path(mount)
        dev['usb_type'], dev['serial'] = \
                get_usbdevice_type_and_serial(dev['device'],
                                              lsusb_info=lsusb_info,
                                              host=host)
        ret.append(dev)

    # If not among mounted devices, it's just attached, print about the
    # same information but suggest a place where the user can mount the
    # device instead
    if not seen:
        # we consider it if it's removable and has a partition id
        # OR it's on the USB bus or ATA bus.
        # Some USB HD do not get announced as removable, but they should be
        # showed.
        # There are good chances that if it's on a USB bus it's removable
        # and thus interesting for us, independently whether it's declared
        # removable
        if (is_removable and partid) or bus in ['usb', 'ata']:
            if not label:
                info = get_udev_info(device, 'blkid', host=host)
                label = info.get('ID_FS_LABEL', partid)

            dev = stub.copy()
            dev['fs_uuid'] = partid
            dev['fstype'] = fstype
            dev['is_mounted'] = False
            dev['mountpoint'] = "/media/removable/%s" % label
            dev['usb_type'], dev['serial'] = \
                    get_usbdevice_type_and_serial(dev['device'],
                                                  lsusb_info=lsusb_info,
                                                  host=host)
            ret.append(dev)
    return ret


def get_device_info(blockdev, lsusb_info, host=None):
    """Retrieve information about |blockdev|

    @see |get_partition_info()| doc for the dictionary format

    @param blockdev: a block device name, e.g., "sda".
    @param lsusb_info: lsusb info
    @param host: DUT object
    @return a list of dictionary, with each item representing a found device
    """
    ret = []

    spath = "%s/%s" % (INFO_PATH, blockdev)
    block_size = int(read_file("%s/queue/physical_block_size" % spath,
                               host))
    is_removable = bool(int(read_file("%s/removable" % spath, host)))

    info = get_udev_info(blockdev, "udev", host=host)
    # udev does not report ID_BUS/ID_MODEL for every device; fall back to
    # None rather than aborting the whole scan with a KeyError.
    dev_bus = info.get('ID_BUS', None)
    dev_model = info.get('ID_MODEL', None)
    dev_fs = info.get('ID_FS_TYPE', None)
    dev_uuid = info.get('ID_FS_UUID', None)
    dev_label = info.get('ID_FS_LABEL', dev_uuid)

    has_partitions = False
    for basename in system_output('ls %s' % spath, host).splitlines():
        partition_path = "%s/%s" % (spath, basename)
        # we want to check if within |spath| there are subdevices with
        # partitions
        # e.g., if within /sys/block/sda sda1 and other partition are present
        if not re.match("%s[0-9]+" % blockdev, basename):
            continue  # ignore what is not a subdevice

        # |blockdev| has subdevices: get info for them
        has_partitions = True
        devs = get_partition_info(partition_path, dev_bus, dev_model,
                                  block_size=block_size,
                                  is_removable=is_removable,
                                  lsusb_info=lsusb_info, host=host)
        ret.extend(devs)

    if not has_partitions:
        # No partition table: describe the whole device, using the filesystem
        # details udev reported for it.
        devs = get_partition_info(spath, dev_bus, dev_model, dev_uuid, dev_fs,
                                  dev_label, block_size=block_size,
                                  is_removable=is_removable,
                                  lsusb_info=lsusb_info, host=host)
        ret.extend(devs)

    return ret


def get_all(host=None):
    """Return all removable or USB storage devices attached

    @param host: DUT object
    @return a list of dictionaries, each list element describing a device
    """
    ret = []
    lsusb_info = get_lsusb_info(host)
    for dev in system_output('ls %s' % INFO_PATH, host).splitlines():
        # Among block devices we need to filter out what are virtual
        if re.match("s[a-z]+", dev):
            # for each of them try to obtain some info
            ret.extend(get_device_info(dev, lsusb_info, host=host))
    return ret


def main():
    """Print one line per device found by |get_all| on the local machine"""
    for device in get_all():
        print ("%(device)s %(bus)s %(model)s %(size)d %(fs_uuid)s %(fstype)s "
               "%(is_mounted)d %(mountpoint)s %(usb_type)s %(serial)s" %
               device)


if __name__ == "__main__":
    main()