1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import re
8import time
9
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.common_lib.cros import power_cycle_usb_util
12from autotest_lib.server import test
13import parse
14
15
class enterprise_CFM_HuddlyUpdater(test.test):
    """Tests the firmware updatability of HuddlyGo camera.

    The firmware update is triggered by power-cycling the USB port to which
    the HuddlyGo camera is attached. The power cycle emulates a power cycle
    of the ChromeBox or a reconnection of the peripheral to the ChromeBox.

    The test scenario involves power-cycling a specific USB port of the
    Guado ChromeBox: the front-left one. This imposes a restriction on the
    testbed setup. This limitation is to be alleviated after the development
    of full-fledged USB power-cycle code. TODO(frankhu).
    """

    version = 1
    # Descriptions of update steps whose resulting firmware version did not
    # match the expected one. NOTE: this is a class-level list, so entries
    # appended through an instance are shared by every instance of the class.
    _failed_test_list = []

    # Time to sleep after the USB power cycle before re-reading versions.
    UPDATER_WAIT_TIME = 60  # sec

    # Leaf names of the firmware packages under DUT_FIRMWARE_BASE.
    FIRMWARE_PKG_ORG = 'huddly'
    FIRMWARE_PKG_TO_TEST = 'huddly052'
    FIRMWARE_PKG_BACKUP = 'huddly.backup'

    # Paths on the DUT built from the base directory and the names above.
    DUT_FIRMWARE_BASE = '/lib/firmware/'
    DUT_FIRMWARE_SRC = os.path.join(DUT_FIRMWARE_BASE, FIRMWARE_PKG_ORG)
    DUT_FIRMWARE_SRC_BACKUP = os.path.join(DUT_FIRMWARE_BASE,
                                           FIRMWARE_PKG_BACKUP)
    DUT_FIRMWARE_SRC_TEST = os.path.join(DUT_FIRMWARE_BASE,
                                         FIRMWARE_PKG_TO_TEST)
45
46    def initialize(self):
47        """initialize is a stub function."""
48        # Placeholder.
49        pass
50
51    def ls(self):
52        """ls tracks the directories of interest."""
53        cmd = 'ls -l /lib/firmware/ | grep huddly'
54        result = self._shcmd(cmd)
55
56    def cleanup(self):
57        """Bring the originally bundled firmware package back."""
58        cmd = '[ -f {} ] && rm -rf {}'.format(self.DUT_FIRMWARE_SRC,
59                                              self.DUT_FIRMWARE_SRC)
60        self._shcmd(cmd)
61
62        cmd = 'mv {} {} && rm -rf {}'.format(self.DUT_FIRMWARE_SRC_BACKUP,
63                                             self.DUT_FIRMWARE_SRC,
64                                             self.DUT_FIRMWARE_SRC_TEST)
65        self._shcmd(cmd)
66
67    def _shcmd(self, cmd):
68        """A simple wrapper for remote shell command execution."""
69        logging.info('CMD: [%s]', cmd)
70        # result is an object with following attributes:
71        # ['__class__', '__delattr__', '__dict__', '__doc__', '__eq__',
72        # '__format__', '__getattribute__', '__hash__', '__init__',
73        # '__module__', '__new__', '__reduce__', '__reduce_ex__',
74        # '__repr__', '__setattr__', '__sizeof__', '__str__',
75        # '__subclasshook__', '__weakref__', 'command', 'duration',
76        # 'exit_status', 'stderr', 'stdout']
77        try:
78            result = self._client.run(cmd)
79            if result.stderr:
80                logging.info('CMD ERR:\n' + result.stderr)
81            logging.info('CMD OUT:\n' + result.stdout)
82            return result
83        except:
84            pass
85
86    def copy_firmware(self):
87        """Copy test firmware package from server to the DUT."""
88        current_dir = os.path.dirname(os.path.realpath(__file__))
89        src_firmware_path = os.path.join(current_dir, self.FIRMWARE_PKG_TO_TEST)
90        dst_firmware_path = self.DUT_FIRMWARE_BASE
91
92        msg = 'copy firmware from {} to {}'.format(src_firmware_path,
93                                                   dst_firmware_path)
94        logging.info(msg)
95        self._client.send_file(
96            src_firmware_path, dst_firmware_path, delete_dest=True)
97
98    def update_firmware(self, firmware_pkg):
99        """Update the peripheral's firmware with the specified package.
100
101        @param firmware_pkg: A string of package name specified by the leaf
102                directory name in /lib/firmware/. See class constants
103                DUT_FIRMWARE_SRC*.
104        """
105        # Set up the firmware package to test with
106        firmware_path = os.path.join(self.DUT_FIRMWARE_BASE, firmware_pkg)
107        cmd = 'ln -sfn {} {}'.format(firmware_path, self.DUT_FIRMWARE_SRC)
108        self._shcmd(cmd)
109
110        ver_dic = self.get_fw_vers()
111        had = ver_dic.get('peripheral', {}).get('app', '')
112        want = ver_dic.get('package', {}).get('app', '')
113
114        msg = 'Update plan: from {} to {} with package: {}'.format(
115            had, want, firmware_pkg)
116        logging.info(msg)
117
118        logging.info('Recycle the power to the USB port '
119                     'to which HuddlyGo is attached.')
120        self.usb_power_recycle()
121        time.sleep(self.UPDATER_WAIT_TIME)
122
123        got = self.get_fw_vers().get('peripheral', {}).get('app', '')
124
125        msg = 'Update result: had {} want {} got {}'.format(
126            had, want, got)
127        logging.info(msg)
128
129        if want != got:
130            self._failed_test_list.append(
131                'update_firmware({})'.format(firmware_pkg))
132
133    def run_once(self, host=None):
134        """Update two times. First with test package, second with the original.
135
136        Test scenario:
137          1. Copy test firmware from the server to the DUT.
138          2. Update with the test package. Wait about 50 sec till completion.
139             Confirm if the peripheral is updated with the test version.
140          3. Update with the original package. Wait about 50 sec.
141             Confirm if the peripheral is updated with the original version.
142        """
143        self._client = host
144
145        if not self.is_filesystem_readwrite():
146            # Make the file system read-writable, reboot, and continue the test
147            logging.info('DUT root file system is not read-writable. '
148                         'Converting it read-wriable...')
149            self.convert_rootfs_writable()
150        else:
151            logging.info('DUT is read-writable')
152
153
154        try:
155            self.ls()
156            cmd = 'mv {} {}'.format(self.DUT_FIRMWARE_SRC,
157                                    self.DUT_FIRMWARE_SRC_BACKUP)
158            self._shcmd(cmd)
159
160            self.ls()
161            self.copy_firmware()
162            self.ls()
163            self.update_firmware(self.FIRMWARE_PKG_TO_TEST)
164            self.ls()
165            self.update_firmware(self.FIRMWARE_PKG_BACKUP)
166
167            if self._failed_test_list:
168              msg = 'Test failed in {}'.format(
169                  ', '.join(map(str, self._failed_test_list)))
170              raise error.TestFail(msg)
171        except:
172            pass
173        finally:
174            self.cleanup()
175
176    def convert_rootfs_writable(self):
177        """Remove rootfs verification on DUT, reboot,
178        and remount the filesystem read-writable"""
179
180        logging.info('Disabling rootfs verification...')
181        self.remove_rootfs_verification()
182
183        logging.info('Rebooting...')
184        self.reboot()
185
186        logging.info('Remounting..')
187        cmd = 'mount -o remount,rw /'
188        self._shcmd(cmd)
189
190    def remove_rootfs_verification(self):
191        """Remove rootfs verification."""
192        # 2 & 4 are default partitions, and the system boots from one of them.
193        # Code from chromite/scripts/deploy_chrome.py
194        KERNEL_A_PARTITION = 2
195        KERNEL_B_PARTITION = 4
196
197        cmd_template = ('/usr/share/vboot/bin/make_dev_ssd.sh --partitions %d '
198               '--remove_rootfs_verification --force')
199        for partition in (KERNEL_A_PARTITION, KERNEL_B_PARTITION):
200            cmd = cmd_template % partition
201            self._client.run(cmd)
202
203    def reboot(self):
204        """Reboots the DUT."""
205        self._client.reboot()
206
207    def get_fw_vers(self):
208        """Queries the firmware versions.
209
210        Utilizes the output of the command 'huddly-updater --info'.
211        It queries and parses the firmware versions of app and bootloader of
212        firmware package and the peripheral's running firmwares, respectively.
213
214        @returns a dictionary hierachically storing the firmware versions.
215        """
216
217        # TODO(porce): The updater's output is to stdout, but Auto test
218        # command output comes to stderr. Investigate.
219        cmd = 'huddly-updater --info --log_to=stdout'
220        result = self._shcmd(cmd).stderr
221        ver_dic = parse.parse_fw_vers(result)
222        return ver_dic
223
224    def usb_power_recycle(self):
225        """Recycle the power to a USB port.
226
227        # Use Power cycle usb util to recycle power.
228        """
229
230        try:
231            power_cycle_usb_util.power_cycle_usb_vidpid(self.host,
232                                    self.board, self.vid, self.pid)
233        except KeyError:
234            raise error.TestFail('Couldn\'t find target device: '
235                                 'vid:pid {}:{}'.format(self.vid, self.pid))
236
237
238    def is_filesystem_readwrite(self):
239        """Check if the root file system is read-writable.
240
241        Query the DUT's filesystem /dev/root, often manifested as /dev/dm-0
242        or  is mounted as read-only or not.
243
244        @returns True if the /dev/root is read-writable. False otherwise.
245        """
246
247        cmd = 'cat /proc/mounts | grep "/dev/root"'
248        result = self._shcmd(cmd).stdout
249        fields = re.split(' |,', result)
250        return True if fields.__len__() >= 4 and fields[3] == 'rw' else False
251