1#!/usr/bin/python
2# Copyright 2016 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import argparse
7import logging
8import os
9import re
10import sys
11
12if __name__ == '__main__':
13  sys.path.append(
14      os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
15
16from devil.utils import cmd_helper
17from devil.utils import usb_hubs
18from devil.utils import lsusb
19
20logger = logging.getLogger(__name__)
21
22# Note: In the documentation below, "virtual port" refers to the port number
23# as observed by the system (e.g. by usb-devices) and "physical port" refers
24# to the physical numerical label on the physical port e.g. on a USB hub.
25# The mapping between virtual and physical ports is not always the identity
26# (e.g. the port labeled "1" on a USB hub does not always show up as "port 1"
27# when you plug something into it) but, as far as we are aware, the mapping
28# between virtual and physical ports is always the same for a given
29# model of USB hub. When "port number" is referenced without specifying, it
30# means the virtual port number.
31
32
33# Wrapper functions for system commands to get output. These are in wrapper
34# functions so that they can be more easily mocked-out for tests.
35def _GetParsedLSUSBOutput():
36  return lsusb.lsusb()
37
38
39def _GetUSBDevicesOutput():
40  return cmd_helper.GetCmdOutput(['usb-devices'])
41
42
43def _GetTtyUSBInfo(tty_string):
44  cmd = ['udevadm', 'info', '--name=/dev/' + tty_string, '--attribute-walk']
45  return cmd_helper.GetCmdOutput(cmd)
46
47
48def _GetCommList():
49  return cmd_helper.GetCmdOutput('ls /dev', shell=True)
50
51
52def GetTTYList():
53  return [x for x in _GetCommList().splitlines() if 'ttyUSB' in x]
54
55
56# Class to identify nodes in the USB topology. USB topology is organized as
57# a tree.
58class USBNode(object):
59  def __init__(self):
60    self._port_to_node = {}
61
62  @property
63  def desc(self):
64    raise NotImplementedError
65
66  @property
67  def info(self):
68    raise NotImplementedError
69
70  @property
71  def device_num(self):
72    raise NotImplementedError
73
74  @property
75  def bus_num(self):
76    raise NotImplementedError
77
78  def HasPort(self, port):
79    """Determines if this device has a device connected to the given port."""
80    return port in self._port_to_node
81
82  def PortToDevice(self, port):
83    """Gets the device connected to the given port on this device."""
84    return self._port_to_node[port]
85
86  def Display(self, port_chain='', info=False):
87    """Displays information about this node and its descendants.
88
89    Output format is, e.g. 1:3:3:Device 42 (ID 1234:5678 Some Device)
90    meaning that from the bus, if you look at the device connected
91    to port 1, then the device connected to port 3 of that,
92    then the device connected to port 3 of that, you get the device
93    assigned device number 42, which is Some Device. Note that device
94    numbers will be reassigned whenever a connected device is powercycled
95    or reinserted, but port numbers stay the same as long as the device
96    is reinserted back into the same physical port.
97
98    Args:
99      port_chain: [string] Chain of ports from bus to this node (e.g. '2:4:')
100      info: [bool] Whether to display detailed info as well.
101    """
102    raise NotImplementedError
103
104  def AddChild(self, port, device):
105    """Adds child to the device tree.
106
107    Args:
108      port: [int] Port number of the device.
109      device: [USBDeviceNode] Device to add.
110
111    Raises:
112      ValueError: If device already has a child at the given port.
113    """
114    if self.HasPort(port):
115      raise ValueError('Duplicate port number')
116    else:
117      self._port_to_node[port] = device
118
119  def AllNodes(self):
120    """Generator that yields this node and all of its descendants.
121
122    Yields:
123      [USBNode] First this node, then each of its descendants (recursively)
124    """
125    yield self
126    for child_node in self._port_to_node.values():
127      for descendant_node in child_node.AllNodes():
128        yield descendant_node
129
130  def FindDeviceNumber(self, findnum):
131    """Find device with given number in tree
132
133    Searches the portion of the device tree rooted at this node for
134    a device with the given device number.
135
136    Args:
137      findnum: [int] Device number to search for.
138
139    Returns:
140      [USBDeviceNode] Node that is found.
141    """
142    for node in self.AllNodes():
143      if node.device_num == findnum:
144        return node
145    return None
146
147
148class USBDeviceNode(USBNode):
149  def __init__(self, bus_num=0, device_num=0, serial=None, info=None):
150    """Class that represents a device in USB tree.
151
152    Args:
153      bus_num: [int] Bus number that this node is attached to.
154      device_num: [int] Device number of this device (or 0, if this is a bus)
155      serial: [string] Serial number.
156      info: [dict] Map giving detailed device info.
157    """
158    super(USBDeviceNode, self).__init__()
159    self._bus_num = bus_num
160    self._device_num = device_num
161    self._serial = serial
162    self._info = {} if info is None else info
163
164  #override
165  @property
166  def desc(self):
167    return self._info.get('desc')
168
169  #override
170  @property
171  def info(self):
172    return self._info
173
174  #override
175  @property
176  def device_num(self):
177    return self._device_num
178
179  #override
180  @property
181  def bus_num(self):
182    return self._bus_num
183
184  @property
185  def serial(self):
186    return self._serial
187
188  @serial.setter
189  def serial(self, serial):
190    self._serial = serial
191
192  #override
193  def Display(self, port_chain='', info=False):
194    logger.info('%s Device %d (%s)', port_chain, self.device_num, self.desc)
195    if info:
196      logger.info('%s', self.info)
197    for (port, device) in self._port_to_node.iteritems():
198      device.Display('%s%d:' % (port_chain, port), info=info)
199
200
201class USBBusNode(USBNode):
202  def __init__(self, bus_num=0):
203    """Class that represents a node (either a bus or device) in USB tree.
204
205    Args:
206      is_bus: [bool] If true, node is bus; if not, node is device.
207      bus_num: [int] Bus number that this node is attached to.
208      device_num: [int] Device number of this device (or 0, if this is a bus)
209      desc: [string] Short description of device.
210      serial: [string] Serial number.
211      info: [dict] Map giving detailed device info.
212      port_to_dev: [dict(int:USBDeviceNode)]
213          Maps port # to device connected to port.
214    """
215    super(USBBusNode, self).__init__()
216    self._bus_num = bus_num
217
218  #override
219  @property
220  def desc(self):
221    return 'BUS %d' % self._bus_num
222
223  #override
224  @property
225  def info(self):
226    return {}
227
228  #override
229  @property
230  def device_num(self):
231    return -1
232
233  #override
234  @property
235  def bus_num(self):
236    return self._bus_num
237
238  #override
239  def Display(self, port_chain='', info=False):
240    logger.info('=== %s ===', self.desc)
241    for (port, device) in self._port_to_node.iteritems():
242      device.Display('%s%d:' % (port_chain, port), info=info)
243
244
245_T_LINE_REGEX = re.compile(r'T:  Bus=(?P<bus>\d{2}) Lev=(?P<lev>\d{2}) '
246                           r'Prnt=(?P<prnt>\d{2,3}) Port=(?P<port>\d{2}) '
247                           r'Cnt=(?P<cnt>\d{2}) Dev#=(?P<dev>.{3}) .*')
248
249_S_LINE_REGEX = re.compile(r'S:  SerialNumber=(?P<serial>.*)')
250_LSUSB_BUS_DEVICE_RE = re.compile(r'^Bus (\d{3}) Device (\d{3}): (.*)')
251
252
253def GetBusNumberToDeviceTreeMap(fast=True):
254  """Gets devices currently attached.
255
256  Args:
257    fast [bool]: whether to do it fast (only get description, not
258    the whole dictionary, from lsusb)
259
260  Returns:
261    map of {bus number: bus object}
262    where the bus object has all the devices attached to it in a tree.
263  """
264  if fast:
265    info_map = {}
266    for line in lsusb.raw_lsusb().splitlines():
267      match = _LSUSB_BUS_DEVICE_RE.match(line)
268      if match:
269        info_map[(int(match.group(1)), int(match.group(2)))] = ({
270            'desc': match.group(3)
271        })
272  else:
273    info_map = {((int(line['bus']), int(line['device']))): line
274                for line in _GetParsedLSUSBOutput()}
275
276  tree = {}
277  bus_num = -1
278  for line in _GetUSBDevicesOutput().splitlines():
279    match = _T_LINE_REGEX.match(line)
280    if match:
281      bus_num = int(match.group('bus'))
282      parent_num = int(match.group('prnt'))
283      # usb-devices starts counting ports from 0, so add 1
284      port_num = int(match.group('port')) + 1
285      device_num = int(match.group('dev'))
286
287      # create new bus if necessary
288      if bus_num not in tree:
289        tree[bus_num] = USBBusNode(bus_num=bus_num)
290
291      # create the new device
292      new_device = USBDeviceNode(
293          bus_num=bus_num,
294          device_num=device_num,
295          info=info_map.get((bus_num, device_num), {'desc': 'NOT AVAILABLE'}))
296
297      # add device to bus
298      if parent_num != 0:
299        tree[bus_num].FindDeviceNumber(parent_num).AddChild(
300            port_num, new_device)
301      else:
302        tree[bus_num].AddChild(port_num, new_device)
303
304    match = _S_LINE_REGEX.match(line)
305    if match:
306      if bus_num == -1:
307        raise ValueError('S line appears before T line in input file')
308      # put the serial number in the device
309      tree[bus_num].FindDeviceNumber(device_num).serial = match.group('serial')
310
311  return tree
312
313
314def GetHubsOnBus(bus, hub_types):
315  """Scans for all hubs on a bus of given hub types.
316
317  Args:
318    bus: [USBNode] Bus object.
319    hub_types: [iterable(usb_hubs.HubType)] Possible types of hubs.
320
321  Yields:
322    Sequence of tuples representing (hub, type of hub)
323  """
324  for device in bus.AllNodes():
325    for hub_type in hub_types:
326      if hub_type.IsType(device):
327        yield (device, hub_type)
328
329
330def GetPhysicalPortToNodeMap(hub, hub_type):
331  """Gets physical-port:node mapping for a given hub.
332  Args:
333    hub: [USBNode] Hub to get map for.
334    hub_type: [usb_hubs.HubType] Which type of hub it is.
335
336  Returns:
337    Dict of {physical port: node}
338  """
339  port_device = hub_type.GetPhysicalPortToNodeTuples(hub)
340  return {port: device for (port, device) in port_device}
341
342
343def GetPhysicalPortToBusDeviceMap(hub, hub_type):
344  """Gets physical-port:(bus#, device#) mapping for a given hub.
345  Args:
346    hub: [USBNode] Hub to get map for.
347    hub_type: [usb_hubs.HubType] Which type of hub it is.
348
349  Returns:
350    Dict of {physical port: (bus number, device number)}
351  """
352  port_device = hub_type.GetPhysicalPortToNodeTuples(hub)
353  return {
354      port: (device.bus_num, device.device_num)
355      for (port, device) in port_device
356  }
357
358
359def GetPhysicalPortToSerialMap(hub, hub_type):
360  """Gets physical-port:serial# mapping for a given hub.
361
362  Args:
363    hub: [USBNode] Hub to get map for.
364    hub_type: [usb_hubs.HubType] Which type of hub it is.
365
366  Returns:
367    Dict of {physical port: serial number)}
368  """
369  port_device = hub_type.GetPhysicalPortToNodeTuples(hub)
370  return {
371      port: device.serial
372      for (port, device) in port_device if device.serial
373  }
374
375
376def GetPhysicalPortToTTYMap(device, hub_type):
377  """Gets physical-port:tty-string mapping for a given hub.
378  Args:
379    hub: [USBNode] Hub to get map for.
380    hub_type: [usb_hubs.HubType] Which type of hub it is.
381
382  Returns:
383    Dict of {physical port: tty-string)}
384  """
385  port_device = hub_type.GetPhysicalPortToNodeTuples(device)
386  bus_device_to_tty = GetBusDeviceToTTYMap()
387  return {
388      port: bus_device_to_tty[(device.bus_num, device.device_num)]
389      for (port, device) in port_device
390      if (device.bus_num, device.device_num) in bus_device_to_tty
391  }
392
393
394def CollectHubMaps(hub_types, map_func, device_tree_map=None, fast=False):
395  """Runs a function on all hubs in the system and collects their output.
396
397  Args:
398    hub_types: [usb_hubs.HubType] List of possible hub types.
399    map_func: [string] Function to run on each hub.
400    device_tree: Previously constructed device tree map, if any.
401    fast: Whether to construct device tree fast, if not already provided
402
403  Yields:
404    Sequence of dicts of {physical port: device} where the type of
405    device depends on the ident keyword. Each dict is a separate hub.
406  """
407  if device_tree_map is None:
408    device_tree_map = GetBusNumberToDeviceTreeMap(fast=fast)
409  for bus in device_tree_map.values():
410    for (hub, hub_type) in GetHubsOnBus(bus, hub_types):
411      yield map_func(hub, hub_type)
412
413
414def GetAllPhysicalPortToNodeMaps(hub_types, **kwargs):
415  return CollectHubMaps(hub_types, GetPhysicalPortToNodeMap, **kwargs)
416
417
418def GetAllPhysicalPortToBusDeviceMaps(hub_types, **kwargs):
419  return CollectHubMaps(hub_types, GetPhysicalPortToBusDeviceMap, **kwargs)
420
421
422def GetAllPhysicalPortToSerialMaps(hub_types, **kwargs):
423  return CollectHubMaps(hub_types, GetPhysicalPortToSerialMap, **kwargs)
424
425
426def GetAllPhysicalPortToTTYMaps(hub_types, **kwargs):
427  return CollectHubMaps(hub_types, GetPhysicalPortToTTYMap, **kwargs)
428
429
430_BUS_NUM_REGEX = re.compile(r'.*ATTRS{busnum}=="(\d*)".*')
431_DEVICE_NUM_REGEX = re.compile(r'.*ATTRS{devnum}=="(\d*)".*')
432
433
434def GetBusDeviceFromTTY(tty_string):
435  """Gets bus and device number connected to a ttyUSB port.
436
437  Args:
438    tty_string: [String] Identifier for ttyUSB (e.g. 'ttyUSB0')
439
440  Returns:
441    Tuple (bus, device) giving device connected to that ttyUSB.
442
443  Raises:
444    ValueError: If bus and device information could not be found.
445  """
446  bus_num = None
447  device_num = None
448  # Expected output of GetCmdOutput should be something like:
449  # looking at device /devices/something/.../.../...
450  # KERNELS="ttyUSB0"
451  # SUBSYSTEMS=...
452  # DRIVERS=...
453  # ATTRS{foo}=...
454  # ATTRS{bar}=...
455  # ...
456  for line in _GetTtyUSBInfo(tty_string).splitlines():
457    bus_match = _BUS_NUM_REGEX.match(line)
458    device_match = _DEVICE_NUM_REGEX.match(line)
459    if bus_match and bus_num is None:
460      bus_num = int(bus_match.group(1))
461    if device_match and device_num is None:
462      device_num = int(device_match.group(1))
463  if bus_num is None or device_num is None:
464    raise ValueError('Info not found')
465  return (bus_num, device_num)
466
467
468def GetBusDeviceToTTYMap():
469  """Gets all mappings from (bus, device) to ttyUSB string.
470
471  Gets mapping from (bus, device) to ttyUSB string (e.g. 'ttyUSB0'),
472  for all ttyUSB strings currently active.
473
474  Returns:
475    [dict] Dict that maps (bus, device) to ttyUSB string
476  """
477  result = {}
478  for tty in GetTTYList():
479    result[GetBusDeviceFromTTY(tty)] = tty
480  return result
481
482
483# This dictionary described the mapping between physical and
484# virtual ports on a Plugable 7-Port Hub (model USB2-HUB7BC).
485# Keys are the virtual ports, values are the physical port.
486# The entry 4:{1:4, 2:3, 3:2, 4:1} indicates that virtual port
487# 4 connects to another 'virtual' hub that itself has the
488# virtual-to-physical port mapping {1:4, 2:3, 3:2, 4:1}.
489
490
491def TestUSBTopologyScript():
492  """Test display and hub identification."""
493  # The following makes logger.info behave pretty much like print
494  # during this test script.
495  logging.basicConfig(format='%(message)s', stream=sys.stdout)
496  logger.setLevel(logging.INFO)
497
498  # Identification criteria for Plugable 7-Port Hub
499  logger.info('==== USB TOPOLOGY SCRIPT TEST ====')
500  logger.info('')
501
502  # Display devices
503  logger.info('==== DEVICE DISPLAY ====')
504  device_trees = GetBusNumberToDeviceTreeMap()
505  for device_tree in device_trees.values():
506    device_tree.Display()
507  logger.info('')
508
509  # Display TTY information about devices plugged into hubs.
510  logger.info('==== TTY INFORMATION ====')
511  for port_map in GetAllPhysicalPortToTTYMaps(
512      usb_hubs.ALL_HUBS, device_tree_map=device_trees):
513    logger.info('%s', port_map)
514  logger.info('')
515
516  # Display serial number information about devices plugged into hubs.
517  logger.info('==== SERIAL NUMBER INFORMATION ====')
518  for port_map in GetAllPhysicalPortToSerialMaps(
519      usb_hubs.ALL_HUBS, device_tree_map=device_trees):
520    logger.info('%s', port_map)
521
522  return 0
523
524
525def parse_options(argv):
526  """Parses and checks the command-line options.
527
528  Returns:
529    A tuple containing the options structure and a list of categories to
530    be traced.
531  """
532  USAGE = '''./find_usb_devices [--help]
533    This script shows the mapping between USB devices and port numbers.
534    Clients are not intended to call this script from the command line.
535    Clients are intended to call the functions in this script directly.
536    For instance, GetAllPhysicalPortToSerialMaps(...)
537    Running this script with --help will display this message.
538    Running this script without --help will display information about
539    devices attached, TTY mapping, and serial number mapping,
540    for testing purposes. See design document for API documentation.
541  '''
542  parser = argparse.ArgumentParser(usage=USAGE)
543  return parser.parse_args(argv[1:])
544
545
546def main():
547  parse_options(sys.argv)
548  TestUSBTopologyScript()
549
550
551if __name__ == "__main__":
552  sys.exit(main())
553