# Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import glob
import logging
import re
import time

from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.cros import ec as cros_ec, cros_logging


class usbpd_GFU(test.test):
    """Integration test for USB-PD Google Firmware Update (GFU).

    Test should:
      - interrogate what firmwares are available for each device and for each:
        1. Use ectool's flashpd to write RW with that to mimic old hw
           - Validate that kernel driver successfully updates to latest RW.
        2. Erase RW and see update as well.

    TODO:
        3. Check that update is checked after S2R.
    """

    version = 1

    FW_PATH = '/lib/firmware/cros-pd'
    # <device>_v<major>.<minor>.<build>-<commit SHA>
    FW_NAME_RE = r'%s/(\w+)_v(\d+)\.(\d+)\.(\d+)-([0-9a-f]+).*' % (FW_PATH)
    GOOGLE_VID = '0x18d1'
    MAX_UPDATE_SECS = 80
    FW_UP_DNAME = 'cros_ec_pd_update'
    # TODO(tbroch) This will be changed once cros_ec_pd_update is abstracted
    # from the ACPI driver.  Will need to fix this once it happens.
    FW_UP_DISABLE_PATH = '/sys/devices/LNXSYSTM:00/device:00/PNP0A08:00/device:1e/PNP0C09:00/GOOG0003:00/disable'

    # TODO(tbroch) find better way to build this or we'll have to edit test for
    # each new PD peripheral.
    DEV_MAJOR = dict(zinger=1, minimuffin=2, dingdong=3, hoho=4)

    def _index_firmware_avail(self):
        """Index the various USB-PD firmwares in the rootfs.

        TODO(crosbug.com/434522) This method will need to be reworked after
        we've come up with a better method for firmware release.

        @returns: dictionary of firmwares (key == name, value == list of
          firmware paths)
        """
        fw_dict = collections.defaultdict(list)
        for fw in glob.glob('%s/*_v[1-9].*.bin' % (self.FW_PATH)):
            mat = re.match(self.FW_NAME_RE, fw)
            if not mat:
                # Matched the glob but not the full naming scheme; ignore it.
                continue

            name = mat.group(1)
            fw_dict[name].append(fw)

        return fw_dict

    def _fw_version_key(self, fw_path):
        """Build a sort key that orders firmware paths by numeric version.

        A plain string sort misorders multi-digit components (for example
        build 999 would sort above build 1280), so the version fields are
        compared as integers.

        @param fw_path: path to a firmware file, expected to match FW_NAME_RE.

        @returns: tuple ((major, minor, build), fw_path).  Paths that do not
          match FW_NAME_RE get (-1, -1, -1) so they sort as the oldest.
        """
        mat = re.match(self.FW_NAME_RE, fw_path)
        if not mat:
            return ((-1, -1, -1), fw_path)
        return (tuple(int(num) for num in mat.group(2, 3, 4)), fw_path)

    def _is_gfu(self, port):
        """Is it in GFU?

        @param port: EC_USBPD object for port.

        @returns: True if the port reports the Google VID alternate mode as
          supported, False otherwise.
        """
        return port.is_amode_supported(self.GOOGLE_VID)

    def _is_in_rw(self, port):
        """Is PD device in RW firmware?

        @param port: EC_USBPD object for port.

        @returns: True if in RW, False otherwise.
        """
        flash_info = port.get_flash_info()
        logging.debug('flash_info = %s', flash_info)
        return flash_info['image_status'] == 'RW'

    def _set_kernel_fw_update(self, disable=0):
        """Enable or disable the kernel FW update driver.

        Writes <disable> to the driver's sysfs 'disable' node.

        @param disable: 1 for disable, 0 for enable.
        """
        utils.write_one_line(self.FW_UP_DISABLE_PATH, disable)
        if not disable:
            # Allow kernel driver time to quiesce
            time.sleep(2)

    def _modify_rw(self, port, rw=None, tries=3):
        """Modify RW of USB-PD device in <port>.

        The kernel update driver is disabled while flashing and re-enabled
        afterwards; the syslog is then checked for the driver noticing the
        device.

        @param port: EC_USBPD object for port.
        @param rw: Path to RW FW to write using ectool.  If None then uses
          /dev/null to invalidate the RW.
        @param tries: Number of tries to update RW via flashpd

        @returns: True if success, False otherwise.
        """
        timeout = self.MAX_UPDATE_SECS

        if not rw:
            # Invalidating the RW is only attempted once.
            rw = '/dev/null'
            tries = 1

        self._set_kernel_fw_update(disable=1)

        while tries:
            try:
                # Note in flashpd <dev_major> <port> <file> the dev_major is
                # unnecessary in all cases so its just been set to 0
                port.ec_command('flashpd 0 %d %s' % (port.index, rw),
                                ignore_status=True, timeout=timeout)

            except error.CmdTimeoutError:
                # TODO(tbroch) could remove try/except if ec_command used run
                # instead of system_output + ignore_timeout=True
                tries -= 1
                continue

            if rw != '/dev/null' and not self._is_in_rw(port):
                logging.warning('Port%d: not in RW after flashpd ... retrying',
                                port.index)
                tries -= 1
            else:
                break

        self._set_kernel_fw_update()

        # NOTE(review): two patterns are passed here; whether get_last_msg
        # requires any or all of them to match is not visible in this file --
        # confirm against cros_logging.LogReader.
        msg = self._reader.get_last_msg([r'%s.*is in RO' % port.index,
                                         self.FW_UP_DNAME],
                                        retries=5, sleep_seconds=2)
        if not msg:
            logging.warning('Port%d: Driver does NOT see device in RO',
                            port.index)
            return False
        logging.info('Port%d: Driver sees device in RO', port.index)
        return True

    def _attempt_update(self, port, rw, fw_up_re):
        """Run a single modify-RW / wait-for-driver-update cycle.

        @param port: EC_USBPD object for port.
        @param rw: path to RW firmware to write via ectool, or None to
          invalidate the RW.
        @param fw_up_re: regex of the driver's 'update completed' log message.

        @returns: None on success, otherwise a string describing the failure.
        """
        self._reader.set_start_by_current()
        if not self._modify_rw(port, rw):
            return 'Port%d: RW modified with RW=%s failed' % (port.index, rw)

        self._reader.set_start_by_current()
        # Poll every 2 seconds, so MAX_UPDATE_SECS // 2 polls cover the whole
        # update window.  Floor division keeps the retry count an integer.
        msg = self._reader.get_last_msg([fw_up_re],
                                        retries=(self.MAX_UPDATE_SECS // 2),
                                        sleep_seconds=2)
        if not msg:
            return 'Port%d: driver did NOT update FW' % port.index

        logging.info('Port%d: Driver completed RW update', port.index)

        # Allow adequate reboot time after RW write completes and device is
        # rebooted.
        time.sleep(3)

        if not self._is_in_rw(port):
            return 'Port%d: Device is not in RW' % port.index

        return None

    def _test_update(self, port, rw=None, tries=3):
        """Test RW update.

        Method tests the kernel's RW update process by first modifying the
        existing RW (either invalidating or rolling it back) via ectool.  It
        then queries the syslog to validate kernel sees the need for update and
        is successful.

        @param port: EC_USBPD object for port.
        @param rw: path to RW firmware to write via ectool to test upgrade.
        @param tries: integer number of attempts to write RW.  Necessary as
          update is not robust (design decision).

        @raises error.TestError: if <tries> is not a positive number, if the
          device is not in RW before the test starts, or if every attempt
          fails.
        """
        if not tries or tries < 0:
            raise error.TestError('Retries must be > 0')

        if not self._is_in_rw(port):
            raise error.TestError('Port%d: Device is not in RW' % port.index)

        fw_up_re = r'%s.*Port%d FW update completed' % (self.FW_UP_DNAME,
                                                        port.index)

        # Count down the attempts that remain *after* the current one so the
        # final failure raises instead of silently leaving the loop.
        for attempts_left in range(tries - 1, -1, -1):
            failure = self._attempt_update(port, rw, fw_up_re)
            if failure is None:
                return  # success
            if not attempts_left:
                raise error.TestError(failure)
            logging.warning('%s ... retrying.', failure)

    def _test_rw_rollback(self, port, fw_dict):
        """Test rolling back RW firmware.

        For the device attached to <port>, every firmware except the newest
        one is written as RW and the kernel driver is expected to update it
        again.

        @param port: EC_USBPD object for port.
        @param fw_dict: dictionary of firmwares.
        """
        self._set_kernel_fw_update()

        # test old RW update
        flash_info = port.get_flash_info()
        for dev_name, fw_paths in fw_dict.items():
            if dev_name not in self.DEV_MAJOR:
                # Firmware for a peripheral this test does not know about.
                logging.debug('Port%d: no DEV_MAJOR entry for %s, skipping',
                              port.index, dev_name)
                continue
            if flash_info['dev_major'] != self.DEV_MAJOR[dev_name]:
                continue

            # Newest first; the first entry is the latest release, which is
            # not a rollback target.
            newest_first = sorted(fw_paths, key=self._fw_version_key,
                                  reverse=True)
            for old_rw in newest_first[1:]:
                logging.info('Port%d: Rollback test %s to %s',
                             port.index, dev_name, old_rw)
                self._test_update(port, rw=old_rw)
            break

    def _test_ro_only(self, port, ro_reps):
        """Test FW update on device with RO only.

        @param port: EC_USBPD object for port.
        @param ro_reps: Number of times to repeat test.
        """
        # test update in RO ro_reps times
        for i in range(ro_reps):
            logging.info('RO Loop%d', i)
            self._test_update(port)

    def run_once(self, ro_reps=1):
        """Run the rollback and RO-only update tests on each eligible port.

        Ports that are not a DFP, or that do not support GFU, are skipped.

        @param ro_reps: Number of times to repeat the RO-only update test.
        """
        fw_dict = self._index_firmware_avail()

        self._usbpd = cros_ec.EC_USBPD()
        self._reader = cros_logging.LogReader()

        for port in self._usbpd.ports:
            if not port.is_dfp():
                continue

            logging.info('Port%d: is a DFP', port.index)

            if not self._is_gfu(port):
                continue

            logging.info('Port%d: supports GFU', port.index)

            self._test_rw_rollback(port, fw_dict)
            self._test_ro_only(port, ro_reps)

    def cleanup(self):
        """Re-enable the kernel FW update driver when the test finishes."""
        self._set_kernel_fw_update()