1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7
8from autotest_lib.client.common_lib import error
9from autotest_lib.client.common_lib.cros import cr50_utils
10from autotest_lib.server.cros import filesystem_util
11from autotest_lib.server.cros.faft.cr50_test import Cr50Test
12
13
class firmware_Cr50Update(Cr50Test):
    """
    Verify a dut can update to the given images.

    Copy the new image onto the device and clear the update state to force
    cr50-update to run. The test will fail if Cr50 does not update or if the
    update script encounters any errors.

    initialize() collects the images in the order old release (optional),
    release, dev and then moves the DUT to the dev image. run_once() walks
    through that same order, updating to each image in turn.
    """
    version = 1
    UPDATE_TIMEOUT = 20
    # Value of the 'test' argument that selects the post-install update path.
    POST_INSTALL = "post_install"

    # Keys used in self.images and self.update_order.
    DEV_NAME = "dev_image"
    OLD_RELEASE_NAME = "old_release_image"
    RELEASE_NAME = "release_image"
    SUCCESS = 0
    UPDATE_OK = 1


    def initialize(self, host, cmdline_args, release_path="", release_ver="",
                   old_release_path="", old_release_ver="", test="",
                   full_args=None):
        """Initialize servo and process the given images.

        @param host: the host object for the DUT.
        @param cmdline_args: command line args handed to Cr50Test.initialize.
        @param release_path: local path of the release image. If it is not a
                             file and release_ver is empty, the saved original
                             cr50 image from the device is used instead.
        @param release_ver: version used to fetch the release image when
                            release_path is not a file.
        @param old_release_path: local path of an older release image. The old
                                 release image is only added to the update
                                 order if this or old_release_ver is given.
        @param old_release_ver: version used to fetch the old release image
                                when old_release_path is not a file.
        @param test: if it is "post_install" (case insensitive) the update is
                     run through the post-install script, otherwise through
                     the startup script.
        @param full_args: dict handed to Cr50Test.initialize. Defaults to an
                          empty dict.
        """
        # A mutable default would be created once and shared between calls,
        # so build a fresh dict for every call that does not supply one.
        if full_args is None:
            full_args = {}
        super(firmware_Cr50Update, self).initialize(host, cmdline_args,
                                                    full_args,
                                                    restore_cr50_image=True)
        self.test_post_install = test.lower() == self.POST_INSTALL

        if not release_ver and not os.path.isfile(release_path):
            release_path = self.get_saved_cr50_original_path()
            logging.info('Using device image as release')

        # Make sure ccd is disabled so it won't interfere with the update
        self.cr50.ccd_disable()

        # Set the host before it is used below instead of relying on the base
        # class having stored it already.
        self.host = host
        filesystem_util.make_rootfs_writable(self.host)

        # A dict used to store relevant information for each image. Each value
        # is a (version, version string, image path) tuple.
        self.images = {}

        # Process the given images in order of oldest to newest. Get the version
        # info and add them to the update order
        self.update_order = []
        if old_release_path or old_release_ver:
            self.add_image_to_update_order(self.OLD_RELEASE_NAME,
                                           old_release_path, old_release_ver)
        self.add_image_to_update_order(self.RELEASE_NAME, release_path,
                                       release_ver)
        self.add_image_to_update_order(self.DEV_NAME,
                                       self.get_saved_dbg_image_path())
        self.verify_update_order()
        logging.info("Update %s", self.update_order)

        self.device_update_path = cr50_utils.GetActiveCr50ImagePath(self.host)
        # Update to the dev image
        self.run_update(self.DEV_NAME)


    def run_update(self, image_name, use_usb_update=False):
        """Copy the image to the DUT and update to it.

        Normal updates will use the cr50-update script to update. If the image
        is older than the running one (a rollback) or use_usb_update is True,
        the image is first flashed with self.cr50_update() before the update
        script is run.

        @param image_name: the key in the images dict that can be used to
                           retrieve the image info.
        @param use_usb_update: True if usb_updater should be used directly
                               instead of running the update script.
        @raises TestError: if the version of the installed image file does not
                           match the version recorded for image_name.
        """
        self.cr50.ccd_disable()
        # Get the current update information
        image_ver, image_ver_str, image_path = self.images[image_name]

        _, ver = cr50_utils.InstallImage(self.host, image_path,
                                         self.device_update_path)
        # Raise explicitly instead of using assert, which is removed when
        # python runs with -O.
        if ver != image_ver:
            raise error.TestError("Install failed: %s has version %s, "
                                  "expected %s" % (image_name, ver, image_ver))
        image_rw = image_ver[1]

        running_ver = cr50_utils.GetRunningVersion(self.host)
        running_ver_str = cr50_utils.GetVersionString(running_ver)

        # If the given image is older than the running one, then we will need
        # to do a rollback to complete the update.
        rollback = (cr50_utils.GetNewestVersion(running_ver[1], image_rw) !=
                    image_rw)
        logging.info("Attempting %s from %s to %s",
                     "rollback" if rollback else "update", running_ver_str,
                     image_ver_str)

        # If a rollback is needed, or if usb_update was requested, flash the
        # image directly and make sure the system still boots normally.
        if use_usb_update or rollback:
            self.cr50_update(image_path, rollback=rollback)
            self.check_state((self.checkers.crossystem_checker,
                              {'mainfw_type': 'normal'}))

        # Cr50 is going to reject an update if it hasn't been up for more than
        # 60 seconds. Wait until that passes before trying to run the update.
        self.cr50.wait_until_update_is_allowed()

        # Running the usb update or rollback will enable ccd. Disable it again.
        self.cr50.ccd_disable()

        # Get the last cr50 update related message from /var/log/messages
        last_message = cr50_utils.CheckForFailures(self.host, '')

        if self.test_post_install:
            self.post_install()
        else:
            self.startup_install()

        # The cr50 updates happen over /dev/tpm0. It takes a while. After
        # cr50-update has finished, cr50 should reboot. Wait until this happens
        # before sending any more commands.
        self.cr50.wait_for_reboot()

        # Verify the system boots normally after the update
        self.check_state((self.checkers.crossystem_checker,
                          {'mainfw_type': 'normal'}))

        # Verify the version has been updated and that there have been no
        # unexpected usb_updater exit codes.
        cr50_utils.VerifyUpdate(self.host, image_ver, last_message)

        logging.info("Successfully updated from %s to %s %s", running_ver_str,
                     image_name, image_ver_str)


    def post_install(self):
        """Run the update using the post-install script, then reboot the DUT"""
        logging.info(self.host.run('/usr/share/cros/cr50-update.sh'))
        self.host.reboot()


    def startup_install(self):
        """Run the update using the startup script"""
        # Clear the update state and reboot, so cr50-update will run again.
        cr50_utils.ClearUpdateStateAndReboot(self.host)


    def fetch_image(self, ver=None):
        """Fetch the image from gs and copy it to the host.

        @param ver: The rw version of the prod image, optionally followed by
                    '/' and a second field that is handed to
                    download_cr50_release_image as its second argument
                    (presumably the board id). If it is not empty the release
                    image is downloaded, otherwise the debug image is
                    downloaded.
        @returns the result of the download helper, which
                 add_image_to_update_order unpacks as (image path, version).
        """
        if ver:
            bid = None
            if '/' in ver:
                ver, bid = ver.split('/', 1)
            return self.download_cr50_release_image(ver, bid)
        return self.download_cr50_debug_image()


    def add_image_to_update_order(self, image_name, image_path, ver=None):
        """Process the image. Add it to the update_order list and images dict.

        Copy the image to the DUT and get version information.

        Store the image information in the images dictionary and add it to the
        update_order list.

        @param image_name: string that is what the image should be called. Used
                           as the key in the images dict.
        @param image_path: the path for the image.
        @param ver: If the image path isn't specified, this will be used to find
                    the cr50 image in gs://chromeos-localmirror/distfiles.
        """
        tmp_file = '/tmp/%s.bin' % image_name

        # An empty or missing path means there is no local image, so fetch one.
        # Checking for it first keeps os.path.isfile from raising on None.
        if not image_path or not os.path.isfile(image_path):
            image_path, ver = self.fetch_image(ver)
        else:
            _, ver = cr50_utils.InstallImage(self.host, image_path, tmp_file)

        ver_str = cr50_utils.GetVersionString(ver)

        self.update_order.append(image_name)
        self.images[image_name] = (ver, ver_str, image_path)
        logging.info("%s stored at %s with version %s", image_name, image_path,
                     ver_str)


    def verify_update_order(self):
        """Verify each image in the update order has a higher version than the
        last.

        The test uses the cr50 update script to update to the next image in the
        update order. If the versions are not in ascending order then the update
        won't work. Cr50 cannot update to an older version using the standard
        update process.

        @raises TestError: if the versions are not in ascending order.
        """
        # Walk over each pair of neighboring images in the update order.
        for last_name, name in zip(self.update_order, self.update_order[1:]):
            last_rw = self.images[last_name][0][1]
            rw = self.images[name][0][1]
            if cr50_utils.GetNewestVersion(last_rw, rw) != rw:
                raise error.TestError("%s is version %s. %s needs to have a "
                                      "higher version, but it has %s" %
                                      (last_name, last_rw, name, rw))


    def after_run_once(self):
        """Add log printing what iteration we just completed"""
        logging.info("Update iteration %s ran successfully", self.iteration)


    def run_once(self):
        """Update to each image in update_order"""
        for name in self.update_order:
            self.run_update(name)
237