1#
2# Copyright (C) 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import fcntl
18import logging
19import os
20
21from vts.utils.python.common import cmd_utils
22
23# _IO('U', 20)
24USBDEVFS_RESET = ord("U") << 8 | 20
25
26
27def GetDevicesUSBFilePath():
28    """Sweeps through connected USB device info and maps serial number to them.
29
30    Returns:
31        A dict, serial numbers of the devices as the key, device file path
32        corresponding to the serial number as the value.
33    """
34    ret = {}
35
36    sh_stdout, sh_stderr, ret_code = cmd_utils.ExecuteOneShellCommand("which udevadm")
37    if ret_code != 0:
38        logging.error("`udevadm` doesn't exist on the host; "
39                      "please install 'udev' pkg before retrying.")
40        return ret
41
42    sh_stdout, _, _ = cmd_utils.ExecuteOneShellCommand(
43        "find /sys/bus/usb/devices/usb*/ -name dev")
44    lines = sh_stdout.split()
45    for line in lines:
46        syspath = os.path.dirname(line)
47        devname, _, _ = cmd_utils.ExecuteOneShellCommand(
48            "udevadm info -q name -p %s" % syspath)
49        if devname.startswith("bus/"):
50            serial, _, _ = cmd_utils.ExecuteOneShellCommand(
51                "udevadm info -q property --export -p %s | grep ID_SERIAL_SHORT"
52                % syspath)
53            if serial:
54                device_serial = serial.split("=")[1].strip().strip("'")
55                ret[device_serial] = os.path.join("/dev", devname.strip())
56
57    return ret
58
59
60def ResetDeviceUsb(dev_file_path):
61    """Invokes ioctl that resets the USB device on the given file path.
62
63    Args:
64        dev_file_path: string, abs path to the device file
65
66    Returns:
67        True if successful, False otherwise.
68    """
69    try:
70        with open(dev_file_path, "wb") as usb_fd:
71            fcntl.ioctl(usb_fd, USBDEVFS_RESET, 0)
72    except IOError as e:
73        logging.exception(e)
74        return False
75
76    return True
77
78
79def ResetUsbDeviceOfSerial(serial):
80    """Resets a USB device file corresponding to the given serial.
81
82    Args:
83        serial: string, serial number of the device whose USB device file
84                will reset.
85
86    Returns:
87        True if successful, False otherwise.
88    """
89    device_file_path = GetDevicesUSBFilePath()
90    if serial in device_file_path:
91        logging.error(
92            "Device %s not responding. Resetting device file %s.", serial,
93            device_file_path[serial])
94        return ResetDeviceUsb(device_file_path[serial])
95    return False
96
97
98def ResetUsbDeviceOfSerial_Callback(*args):
99    """Wrapper of the ResetUsbDeviceOfSerial(), for handling the *args.
100
101    Args:
102        args: tuple of arguments, expected to have the serial number of
103              the devices as the first element.
104
105    Returns:
106        True if successful, False otherwise.
107    """
108    try:
109        return ResetUsbDeviceOfSerial(args[0])
110    except IndexError as e:
111        logging.exception(e)
112        return False