1# Copyright 2018 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Auto test for Bizlink firmware updater functionality and udev rule."""
5
6from __future__ import print_function
7import logging
8import os
9import re
10import time
11
12from autotest_lib.client.common_lib import error
13from autotest_lib.server import test
14
15UPDATER_WAIT_TIME = 180     # seconds
16CMD_CHECK_FW_UPDATE_OUTPUT = 'grep bizlink-updater /var/log/messages'
17FW_PATH = '/lib/firmware/bizlink/'
18OLD_FW_NAME = 'megachips-firmware-old.bin'
19NEW_FW_NAME = 'megachips-firmware.bin'
20
21
22class enterprise_CFM_BizlinkUpdater(test.test):
23    """
24    Bizlink dongle firmware updater functionality test in Chrome Box.
25
26    The procedure of the test is:
27    1. flash old version FW to device,
28    2. Reboot the device, which should be able to trigger udev rule and run the
29         updater,
30    3. wait for the updater to finish,
31    4. run fw updater again and verify that the FW in device is consistent with
32         latest FW within system by checking the output.
33    """
34
35    version = 1
36
37
38    def initialize(self, host):
39        self.host = host
40        self.old_fw_path = os.path.join(FW_PATH, OLD_FW_NAME)
41        self.new_fw_path = os.path.join(FW_PATH, NEW_FW_NAME)
42
43    def cleanup(self):
44        cmd = 'rm -f {}'.format(self.old_fw_path)
45        self.host.run(cmd)
46        super(enterprise_CFM_BizlinkUpdater, self).cleanup()
47
48    def check_update_result(self, expected_output=''):
49        """
50        Checks FW update result.
51
52        Queries the syslog and checks if expected_output occurs in it.
53
54        @param expected_output: the string to query syslog for.
55
56        @returns True if expected_output is in syslog. Otherwise return false.
57        """
58        result = self.host.run(CMD_CHECK_FW_UPDATE_OUTPUT)
59        # Only check last 5 logs for the most recent run.
60        messages = result.stdout.strip().split('\n')
61        if len(messages) < 5:
62            return False
63        messages = ''.join(messages[-5:])
64        if expected_output in messages:
65            return True
66        else:
67            return False
68
69    def convert_rootfs_writable(self):
70        """
71        Removes rootfs verification on DUT and reboots.
72        """
73        logging.info('Disabling rootfs verification...')
74        self.remove_rootfs_verification()
75
76        logging.info('Rebooting...')
77        self.host.reboot()
78
79    def remove_rootfs_verification(self):
80        """Removes rootfs verification."""
81        # 2 & 4 are default partitions, and the system boots from one of them.
82        # Code from chromite/scripts/deploy_chrome.py
83        KERNEL_A_PARTITION = 2
84        KERNEL_B_PARTITION = 4
85
86        cmd_template = ('/usr/share/vboot/bin/make_dev_ssd.sh --partitions %d '
87                        '--remove_rootfs_verification --force')
88        for partition in (KERNEL_A_PARTITION, KERNEL_B_PARTITION):
89            cmd = cmd_template % partition
90            self.host.run(cmd)
91
92    def is_filesystem_readwrite(self):
93        """Checks if the root file system is read-writable.
94
95        Queries the DUT's filesystem /dev/root, checks for keyword 'rw'.
96
97        @returns True if /dev/root is read-writable. False otherwise.
98        """
99        cmd = 'cat /proc/mounts | grep "/dev/root"'
100        result = self.host.run(cmd)
101        if result.stderr:
102            output = result.stderr
103        else:
104            output = result.stdout
105        fields = re.split(' |,', output)
106        return True if fields.__len__() >= 4 and fields[3] == 'rw' else False
107
108    def copy_firmware(self):
109        """Copies test firmware from server to DUT."""
110        current_dir = os.path.dirname(os.path.realpath(__file__))
111        src_firmware_path = os.path.join(current_dir, OLD_FW_NAME)
112        dst_firmware_path = FW_PATH
113        logging.info('Copy firmware from {} to {}.'.format(src_firmware_path,
114                                                           dst_firmware_path))
115        self.host.send_file(src_firmware_path, dst_firmware_path,
116                            delete_dest=True)
117
118    def triger_updater(self):
119        """Trigers udev rule to run fw updater."""
120        self.host.reboot()
121
122    def flash_fw(self, fw_path):
123        """
124        Flashes certain firmware to device.
125
126        Runs Bizlink firmware updater on DUT to flashes the firmware given
127        by fw_path to target device.
128
129        @param fw_path: the path to the firmware to flash.
130
131        """
132        cmd_run_updater = ('/usr/sbin/bizlink-updater --update=true '
133                           '--fw_path={}'.format(fw_path))
134        logging.info('executing {}'.format(cmd_run_updater))
135        self.host.run(cmd_run_updater)
136
137    def run_once(self):
138        # Make the DUT filesystem writable.
139        if not self.is_filesystem_readwrite():
140            logging.info('DUT root file system is not read-writable. '
141                         'Converting it read-writable...')
142            self.convert_rootfs_writable()
143        else:
144            logging.info('DUT is read-writable.')
145
146        # Copy old FW to device.
147        self.copy_firmware()
148
149        # Flash old FW to device.
150        self.flash_fw(self.old_fw_path)
151        expect_output = 'FW update succeed.'
152        succeed = self.check_update_result(expected_output=expect_output)
153        if not succeed:
154            raise error.TestFail('Expect \'{}\' in output, '
155                                 'but didn\'t find it.'.format(expect_output))
156
157        self.triger_updater()
158
159        # Wait for fw updater to finish.
160        time.sleep(UPDATER_WAIT_TIME)
161
162        # Try flash the new firmware, should detect same fw version.
163        expect_output = 'Same FW version, no update required.'
164        self.flash_fw(self.new_fw_path)
165        succeed = self.check_update_result(expected_output=expect_output)
166        if not succeed:
167            raise error.TestFail('Expect {} in output '
168                                 'but didn\'t find it.'.format(expect_output))
169
170