1# Copyright (c) 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import collections
6import glob
7import logging
8import re
9import time
10
11from autotest_lib.client.bin import test
12from autotest_lib.client.common_lib import error, utils
13from autotest_lib.client.cros import ec as cros_ec, cros_logging
14
15
16class usbpd_GFU(test.test):
17    """Integration test for USB-PD Google Firmware Update (GFU).
18
19    Test should:
20    - interrogate what firmware's are available for each device and for each:
21      1. Use ectool's flashpd to write RW with that to mimic old hw
22         - Validate that kernel driver successfully updates to latest RW.
23      2. Erase RW and see update as well.
24
25    TODO:
26      3. Check that update is checked after S2R.
27    """
28
29    version = 1
30
31    FW_PATH = '/lib/firmware/cros-pd'
32    # <device>_v<major>.<minor>.<build>-<commit SHA>
33    FW_NAME_RE = r'%s/(\w+)_v(\d+)\.(\d+)\.(\d+)-([0-9a-f]+).*' % (FW_PATH)
34    GOOGLE_VID = '0x18d1'
35    MAX_UPDATE_SECS = 80
36    FW_UP_DNAME = 'cros_ec_pd_update'
37    # TODO(tbroch) This will be change once cros_ec_pd_update is abstracted from
38    # ACPI driver.  Will need to fix this once it happens.
39    FW_UP_DISABLE_PATH = '/sys/devices/LNXSYSTM:00/device:00/PNP0A08:00/device:1e/PNP0C09:00/GOOG0003:00/disable'
40
41    # TODO(tbroch) find better way to build this or we'll have to edit test for
42    # each new PD peripheral.
43    DEV_MAJOR = dict(zinger=1, minimuffin=2, dingdong=3, hoho=4)
44
45    def _index_firmware_avail(self):
46        """Index the various USB-PD firmwares in the rootfs.
47
48        TODO(crosbug.com/434522) This method will need reworked after we've come
49        up with a better method for firmware release.
50
51        @returns: dictionary of firmwares (key == name, value == list of
52          firmware paths)
53        """
54        fw_dict = collections.defaultdict(list)
55        for fw in glob.glob('%s/*_v[1-9].*.bin' % (self.FW_PATH)):
56            mat = re.match(self.FW_NAME_RE, fw)
57            if not mat:
58                continue
59
60            name = mat.group(1)
61            fw_dict[name].append(fw)
62
63        return fw_dict
64
65    def _is_gfu(self, port):
66        """Is it in GFU?
67
68        @param port: EC_USBPD object for port.
69
70        @returns: True if GFU enterd, False otherwise.
71        """
72        return port.is_amode_supported(self.GOOGLE_VID)
73
74    def _is_in_rw(self, port):
75        """Is PD device in RW firmware?
76
77        @param port: EC_USBPD object for port.
78
79        @returns: True if in RW, False otherwise.
80        """
81        flash_info = port.get_flash_info()
82        logging.debug('flash_info = %s', flash_info)
83        return flash_info['image_status'] == 'RW'
84
85    def _set_kernel_fw_update(self, disable=0):
86        """Disable the FW update driver.
87
88        @param disable: 1 for disable, 0 for enable.
89        """
90        utils.write_one_line(self.FW_UP_DISABLE_PATH, disable)
91        if not disable:
92            # Allow kernel driver time quiesce
93            time.sleep(2)
94
95    def _modify_rw(self, port, rw=None, tries=3):
96        """Modify RW of USB-PD device in <port>.
97
98        @param port: EC_USBPD object for port.
99        @param rw: Path to RW FW to write using ectool.  If None then uses
100          /dev/null to invalidate the RW.
101        @param tries: Number of tries to update RW via flashpd
102
103        @returns: True if success, False otherwise.
104        """
105        timeout = self.MAX_UPDATE_SECS
106
107        if not rw:
108            rw = '/dev/null'
109            tries = 1
110
111        self._set_kernel_fw_update(disable=1)
112
113        while (tries):
114            try:
115                # Note in flashpd <dev_major> <port> <file> the dev_major is
116                # unnecessary in all cases so its just been set to 0
117                port.ec_command('flashpd 0 %d %s' % (port.index, rw),
118                                ignore_status=True, timeout=timeout)
119
120            except error.CmdTimeoutError:
121                # TODO(tbroch) could remove try/except if ec_command used run
122                # instead of system_output + ignore_timeout=True
123                tries -= 1
124                continue
125
126            if rw != '/dev/null' and not self._is_in_rw(port):
127                logging.warn('Port%d: not in RW after flashpd ... retrying',
128                             port.index)
129                tries -= 1
130            else:
131                break
132
133        self._set_kernel_fw_update()
134
135        msg = self._reader.get_last_msg([r'%s.*is in RO' % port.index,
136                                         self.FW_UP_DNAME],
137                                        retries=5, sleep_seconds=2)
138        if not msg:
139            logging.warn('Port%d: Driver does NOT see dev in not in RO',
140                         port.index)
141            return False
142        logging.info('Port%d: Driver sees device in RO', port.index)
143        return True
144
145    def _test_update(self, port, rw=None, tries=3):
146        """Test RW update.
147
148        Method tests the kernel's RW update process by first modifying the
149        existing RW (either invalidating or rolling it back) via ectool.  It
150        then querys the syslog to validate kernel sees the need for update and
151        is successful.
152
153        @param port: EC_USBPD object for port.
154        @param rw: path to RW firmware to write via ectool to test upgrade.
155        @param tries: integer number of attempts to write RW.  Necessary as
156          update is not robust (design decision).
157        """
158        if not tries:
159            raise error.TestError('Retries must be > 0')
160
161        if not self._is_in_rw(port):
162            raise error.TestError('Port%d: Device is not in RW' % port.index)
163
164        fw_up_re = r'%s.*Port%d FW update completed' % (self.FW_UP_DNAME,
165                                                        port.index)
166
167        while tries:
168            self._reader.set_start_by_current()
169            rsp = self._modify_rw(port, rw)
170
171            if not rsp:
172                rsp_str = 'Port%d: RW modified with RW=%s failed' % \
173                          (port.index, rw)
174                if tries:
175                    logging.warn('%s ... retrying.', rsp_str)
176                    tries -= 1
177                else:
178                    raise error.TestError(rsp_str)
179
180            self._reader.set_start_by_current()
181            msg = self._reader.get_last_msg([fw_up_re],
182                                            retries=(self.MAX_UPDATE_SECS / 2),
183                                            sleep_seconds=2)
184
185            if not msg:
186                rsp_str = 'Port%d: driver did NOT update FW' % port.index
187                if tries:
188                    logging.warn('%s ... retrying.', rsp_str)
189                    tries -= 1
190                    continue
191                else:
192                    raise error.TestError(rsp_str)
193
194            logging.info('Port%d: Driver completed RW update', port.index)
195
196            # Allow adequate reboot time after RW write completes and device is
197            # rebooted.
198            time.sleep(3)
199
200            if not self._is_in_rw(port):
201                rsp_str = 'Port%d: Device is not in RW' % port.index
202                if tries:
203                    logging.warn('%s ... retrying.', rsp_str)
204                    tries -= 1
205                    continue
206                else:
207                    raise error.TestError(rsp_str)
208
209            break # success #
210
211    def _test_rw_rollback(self, port, fw_dict):
212        """Test rolling back RW firmware.
213
214        @param port: EC_USBPD object for port.
215        @param fw_dict: dictionary of firmwares.
216        """
217        self._set_kernel_fw_update()
218
219        # test old RW update
220        flash_info = port.get_flash_info()
221        for dev_name in fw_dict.keys():
222            if flash_info['dev_major'] == self.DEV_MAJOR[dev_name]:
223                for old_rw in sorted(fw_dict[dev_name], reverse=True)[1:]:
224                    logging.info('Port%d: Rollback test %s to %s',
225                                 port.index, dev_name, old_rw)
226                    self._test_update(port, rw=old_rw)
227                break
228
229    def _test_ro_only(self, port, ro_reps):
230        """Test FW update on device with RO only.
231
232        @param port: EC_USBPD object for port.
233        @param ro_reps: Number of times to repeat test.
234        """
235        # test update in RO ro_reps times
236        for i in xrange(ro_reps):
237            logging.info('RO Loop%d', i)
238            self._test_update(port)
239
240    def run_once(self, ro_reps=1):
241
242        fw_dict = self._index_firmware_avail()
243
244        self._usbpd = cros_ec.EC_USBPD()
245        self._reader = cros_logging.LogReader()
246
247        for port in self._usbpd.ports:
248            if not port.is_dfp():
249                continue
250
251            logging.info('Port%d: is a DFP', port.index)
252
253            if not self._is_gfu(port):
254                continue
255
256            logging.info('Port%d: supports GFU', port.index)
257
258            self._test_rw_rollback(port, fw_dict)
259            self._test_ro_only(port, ro_reps)
260
261    def cleanup(self):
262        self._set_kernel_fw_update()
263