1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import ast
6import ctypes
7import logging
8import os
9import pprint
10import re
11import time
12import uuid
13
14from threading import Timer
15from autotest_lib.client.bin import utils
16from autotest_lib.client.common_lib import error
17from autotest_lib.server import test
18from autotest_lib.server.cros import vboot_constants as vboot
19from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig
20from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
21from autotest_lib.server.cros.faft.utils import mode_switcher
22from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
23from autotest_lib.server.cros.servo import chrome_ec
24from autotest_lib.server.cros.servo import chrome_usbpd
25
# Module-level alias of the mode switcher's ConnectionError, so that the
# `except ConnectionError` clauses in this file can use the short name.
# NOTE(review): on Python 3 this name shadows the builtin ConnectionError
# inside this module.
ConnectionError = mode_switcher.ConnectionError
27
28
class FAFTBase(test.test):
    """Common base class of the FAFT test classes.

    Starts the FAFTClient on the DUT so that a test can reach its firmware
    functions and interfaces. It is also the foundation for the reboot
    handling helpers, which keep the FAFTClient connected across reboots.
    """
    def initialize(self, host):
        """Initialize the servo-attached DUT and create the FAFTClient proxy.

        @param host: The host object of the DUT; its servo attribute is used.
        """
        servo = host.servo
        self.servo = servo
        servo.initialize_dut()
        self._client = host
        # RPC proxy used by the tests to call into the FAFT client.
        self.faft_client = RPCProxy(host)
        # Path of the lock file that marks a FAFT run in progress on the DUT.
        self.lockfile = '/var/tmp/faft/lock'
45
class FirmwareTest(FAFTBase):
    """
    Base class that sets up helper objects/functions for firmware tests.

    TODO: add documentation as the FAFT rework progresses.
    """
    # NOTE(review): presumably the autotest test version number -- confirm.
    version = 1

    # Mapping of partition number of kernel and rootfs.
    # Keys are either the slot letter ('a'/'b') or a kernel/rootfs partition
    # number given as a string; values are partition numbers as strings.
    # KERNEL_MAP/ROOTFS_MAP resolve to the same slot as the key, while the
    # OTHER_* maps resolve to the opposite slot.
    KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
    ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
    OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
    OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}

    # NOTE(review): these look like the valid and deliberately corrupted
    # signature strings used by tests -- confirm against the callers.
    CHROMEOS_MAGIC = "CHROMEOS"
    CORRUPTED_MAGIC = "CORRUPTD"

    # Delay for waiting client to return before EC suspend
    EC_SUSPEND_DELAY = 5

    # Delay between EC suspend and wake
    WAKE_DELAY = 10

    # Delay between closing and opening lid
    LID_DELAY = 1

    # Path of the servod log file that _fetch_servo_log() reads.
    _SERVOD_LOG = '/var/log/servod.log'

    # Partition number of the rootfs on the USB stick, appended to the USB
    # device path by assert_test_image_in_usb_disk().
    _ROOTFS_PARTITION_NUMBER = 3

    # Class-level defaults of the backup state.
    # NOTE(review): the two dicts are mutable objects shared by every
    # instance unless a method rebinds them -- confirm that is intended.
    _backup_firmware_sha = ()
    _backup_kernel_sha = dict()
    _backup_cgpt_attr = dict()
    _backup_gbb_flags = None
    _backup_dev_mode = None

    # Class level variable, keep track the states of one time setup.
    # This variable is preserved across tests which inherit this class.
    _global_setup_done = {
        'gbb_flags': False,
        'reimage': False,
        'usb_check': False,
    }
89
90    @classmethod
91    def check_setup_done(cls, label):
92        """Check if the given setup is done.
93
94        @param label: The label of the setup.
95        """
96        return cls._global_setup_done[label]
97
98    @classmethod
99    def mark_setup_done(cls, label):
100        """Mark the given setup done.
101
102        @param label: The label of the setup.
103        """
104        cls._global_setup_done[label] = True
105
106    @classmethod
107    def unmark_setup_done(cls, label):
108        """Mark the given setup not done.
109
110        @param label: The label of the setup.
111        """
112        cls._global_setup_done[label] = False
113
114    def initialize(self, host, cmdline_args, ec_wp=None):
115        super(FirmwareTest, self).initialize(host)
116        self.run_id = str(uuid.uuid4())
117        logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
118        # Parse arguments from command line
119        args = {}
120        self.power_control = host.POWER_CONTROL_RPM
121        for arg in cmdline_args:
122            match = re.search("^(\w+)=(.+)", arg)
123            if match:
124                args[match.group(1)] = match.group(2)
125        if 'power_control' in args:
126            self.power_control = args['power_control']
127            if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
128                raise error.TestError('Valid values for --args=power_control '
129                                      'are %s. But you entered wrong argument '
130                                      'as "%s".'
131                                       % (host.POWER_CONTROL_VALID_ARGS,
132                                       self.power_control))
133
134        self.faft_config = FAFTConfig(
135                self.faft_client.system.get_platform_name())
136        self.checkers = FAFTCheckers(self)
137        self.switcher = mode_switcher.create_mode_switcher(self)
138
139        if self.faft_config.chrome_ec:
140            self.ec = chrome_ec.ChromeEC(self.servo)
141        # Check for presence of a USBPD console
142        if self.faft_config.chrome_usbpd:
143            self.usbpd = chrome_usbpd.ChromeUSBPD(self.servo)
144        elif self.faft_config.chrome_ec:
145            # If no separate USBPD console, then PD exists on EC console
146            self.usbpd = self.ec
147        # Get plankton console
148        self.plankton = host.plankton
149        self.plankton_host = host._plankton_host
150
151        self._setup_uart_capture()
152        self._setup_servo_log()
153        self._record_system_info()
154        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
155        logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
156        if self.fw_vboot2:
157            self.faft_client.system.set_fw_try_next('A')
158            if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B':
159                logging.info('mainfw_act is B. rebooting to set it A')
160                self.switcher.mode_aware_reboot()
161        self._setup_gbb_flags()
162        self._stop_service('update-engine')
163        self._create_faft_lockfile()
164        self._setup_ec_write_protect(ec_wp)
165        # See chromium:239034 regarding needing this sync.
166        self.blocking_sync()
167        logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
168
169    def cleanup(self):
170        """Autotest cleanup function."""
171        # Unset state checker in case it's set by subclass
172        logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
173        try:
174            self.faft_client.system.is_available()
175        except:
176            # Remote is not responding. Revive DUT so that subsequent tests
177            # don't fail.
178            self._restore_routine_from_timeout()
179        self.switcher.restore_mode()
180        self._restore_ec_write_protect()
181        self._restore_gbb_flags()
182        self._start_service('update-engine')
183        self._remove_faft_lockfile()
184        self._record_servo_log()
185        self._record_faft_client_log()
186        self._cleanup_uart_capture()
187        super(FirmwareTest, self).cleanup()
188        logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
189
190    def _record_system_info(self):
191        """Record some critical system info to the attr keyval.
192
193        This info is used by generate_test_report later.
194        """
195        system_info = {
196            'hwid': self.faft_client.system.get_crossystem_value('hwid'),
197            'ec_version': self.faft_client.ec.get_version(),
198            'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'),
199            'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'),
200            'servod_version': self._client._servo_host.run(
201                'servod --version').stdout.strip(),
202        }
203        logging.info('System info:\n' + pprint.pformat(system_info))
204        self.write_attr_keyval(system_info)
205
206    def invalidate_firmware_setup(self):
207        """Invalidate all firmware related setup state.
208
209        This method is called when the firmware is re-flashed. It resets all
210        firmware related setup states so that the next test setup properly
211        again.
212        """
213        self.unmark_setup_done('gbb_flags')
214
215    def _retrieve_recovery_reason_from_trap(self):
216        """Try to retrieve the recovery reason from a trapped recovery screen.
217
218        @return: The recovery_reason, 0 if any error.
219        """
220        recovery_reason = 0
221        logging.info('Try to retrieve recovery reason...')
222        if self.servo.get_usbkey_direction() == 'dut':
223            self.switcher.bypass_rec_mode()
224        else:
225            self.servo.switch_usbkey('dut')
226
227        try:
228            self.switcher.wait_for_client()
229            lines = self.faft_client.system.run_shell_command_get_output(
230                        'crossystem recovery_reason')
231            recovery_reason = int(lines[0])
232            logging.info('Got the recovery reason %d.', recovery_reason)
233        except ConnectionError:
234            logging.error('Failed to get the recovery reason due to connection '
235                          'error.')
236        return recovery_reason
237
238    def _reset_client(self):
239        """Reset client to a workable state.
240
241        This method is called when the client is not responsive. It may be
242        caused by the following cases:
243          - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
244          - corrupted firmware;
245          - corrutped OS image.
246        """
247        # DUT may halt on a firmware screen. Try cold reboot.
248        logging.info('Try cold reboot...')
249        self.switcher.mode_aware_reboot(reboot_type='cold',
250                                        sync_before_boot=False,
251                                        wait_for_dut_up=False)
252        self.switcher.wait_for_client_offline()
253        self.switcher.bypass_dev_mode()
254        try:
255            self.switcher.wait_for_client()
256            return
257        except ConnectionError:
258            logging.warn('Cold reboot doesn\'t help, still connection error.')
259
260        # DUT may be broken by a corrupted firmware. Restore firmware.
261        # We assume the recovery boot still works fine. Since the recovery
262        # code is in RO region and all FAFT tests don't change the RO region
263        # except GBB.
264        if self.is_firmware_saved():
265            self._ensure_client_in_recovery()
266            logging.info('Try restore the original firmware...')
267            if self.is_firmware_changed():
268                try:
269                    self.restore_firmware()
270                    return
271                except ConnectionError:
272                    logging.warn('Restoring firmware doesn\'t help, still '
273                                 'connection error.')
274
275        # Perhaps it's kernel that's broken. Let's try restoring it.
276        if self.is_kernel_saved():
277            self._ensure_client_in_recovery()
278            logging.info('Try restore the original kernel...')
279            if self.is_kernel_changed():
280                try:
281                    self.restore_kernel()
282                    return
283                except ConnectionError:
284                    logging.warn('Restoring kernel doesn\'t help, still '
285                                 'connection error.')
286
287        # DUT may be broken by a corrupted OS image. Restore OS image.
288        self._ensure_client_in_recovery()
289        logging.info('Try restore the OS image...')
290        self.faft_client.system.run_shell_command('chromeos-install --yes')
291        self.switcher.mode_aware_reboot(wait_for_dut_up=False)
292        self.switcher.wait_for_client_offline()
293        self.switcher.bypass_dev_mode()
294        try:
295            self.switcher.wait_for_client()
296            logging.info('Successfully restore OS image.')
297            return
298        except ConnectionError:
299            logging.warn('Restoring OS image doesn\'t help, still connection '
300                         'error.')
301
302    def _ensure_client_in_recovery(self):
303        """Ensure client in recovery boot; reboot into it if necessary.
304
305        @raise TestError: if failed to boot the USB image.
306        """
307        logging.info('Try boot into USB image...')
308        self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
309                                     wait_for_dut_up=False)
310        self.servo.switch_usbkey('host')
311        self.switcher.bypass_rec_mode()
312        try:
313            self.switcher.wait_for_client()
314        except ConnectionError:
315            raise error.TestError('Failed to boot the USB image.')
316
317    def _restore_routine_from_timeout(self):
318        """A routine to try to restore the system from a timeout error.
319
320        This method is called when FAFT failed to connect DUT after reboot.
321
322        @raise TestFail: This exception is already raised, with a decription
323                         why it failed.
324        """
325        # DUT is disconnected. Capture the UART output for debug.
326        self._record_uart_capture()
327
328        # TODO(waihong@chromium.org): Implement replugging the Ethernet to
329        # identify if it is a network flaky.
330
331        recovery_reason = self._retrieve_recovery_reason_from_trap()
332
333        # Reset client to a workable state.
334        self._reset_client()
335
336        # Raise the proper TestFail exception.
337        if recovery_reason:
338            raise error.TestFail('Trapped in the recovery screen (reason: %d) '
339                                 'and timed out' % recovery_reason)
340        else:
341            raise error.TestFail('Timed out waiting for DUT reboot')
342
343    def assert_test_image_in_usb_disk(self, usb_dev=None):
344        """Assert an USB disk plugged-in on servo and a test image inside.
345
346        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
347                        If None, it is detected automatically.
348        @raise TestError: if USB disk not detected or not a test image.
349        """
350        if self.check_setup_done('usb_check'):
351            return
352        if usb_dev:
353            assert self.servo.get_usbkey_direction() == 'host'
354        else:
355            self.servo.switch_usbkey('host')
356            usb_dev = self.servo.probe_host_usb_dev()
357            if not usb_dev:
358                raise error.TestError(
359                        'An USB disk should be plugged in the servo board.')
360
361        rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
362        logging.info('usb dev is %s', usb_dev)
363        tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
364        self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
365
366        try:
367            usb_lsb = self.servo.system_output('cat %s' %
368                os.path.join(tmpd, 'etc/lsb-release'))
369            logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
370            dut_lsb = '\n'.join(self.faft_client.system.
371                run_shell_command_get_output('cat /etc/lsb-release'))
372            logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
373            if not re.search(r'RELEASE_TRACK=.*test', usb_lsb):
374                raise error.TestError('USB stick in servo is no test image')
375            usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1)
376            dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1)
377            if usb_board != dut_board:
378                raise error.TestError('USB stick in servo contains a %s '
379                    'image, but DUT is a %s' % (usb_board, dut_board))
380        finally:
381            for cmd in ('umount %s' % rootfs, 'sync', 'rm -rf %s' % tmpd):
382                self.servo.system(cmd)
383
384        self.mark_setup_done('usb_check')
385
386    def setup_usbkey(self, usbkey, host=None):
387        """Setup the USB disk for the test.
388
389        It checks the setup of USB disk and a valid ChromeOS test image inside.
390        It also muxes the USB disk to either the host or DUT by request.
391
392        @param usbkey: True if the USB disk is required for the test, False if
393                       not required.
394        @param host: Optional, True to mux the USB disk to host, False to mux it
395                    to DUT, default to do nothing.
396        """
397        if usbkey:
398            self.assert_test_image_in_usb_disk()
399        elif host is None:
400            # USB disk is not required for the test. Better to mux it to host.
401            host = True
402
403        if host is True:
404            self.servo.switch_usbkey('host')
405        elif host is False:
406            self.servo.switch_usbkey('dut')
407
408    def get_usbdisk_path_on_dut(self):
409        """Get the path of the USB disk device plugged-in the servo on DUT.
410
411        Returns:
412          A string representing USB disk path, like '/dev/sdb', or None if
413          no USB disk is found.
414        """
415        cmd = 'ls -d /dev/s*[a-z]'
416        original_value = self.servo.get_usbkey_direction()
417
418        # Make the dut unable to see the USB disk.
419        self.servo.switch_usbkey('off')
420        no_usb_set = set(
421            self.faft_client.system.run_shell_command_get_output(cmd))
422
423        # Make the dut able to see the USB disk.
424        self.servo.switch_usbkey('dut')
425        time.sleep(self.faft_config.usb_plug)
426        has_usb_set = set(
427            self.faft_client.system.run_shell_command_get_output(cmd))
428
429        # Back to its original value.
430        if original_value != self.servo.get_usbkey_direction():
431            self.servo.switch_usbkey(original_value)
432
433        diff_set = has_usb_set - no_usb_set
434        if len(diff_set) == 1:
435            return diff_set.pop()
436        else:
437            return None
438
439    def _create_faft_lockfile(self):
440        """Creates the FAFT lockfile."""
441        logging.info('Creating FAFT lockfile...')
442        command = 'touch %s' % (self.lockfile)
443        self.faft_client.system.run_shell_command(command)
444
445    def _remove_faft_lockfile(self):
446        """Removes the FAFT lockfile."""
447        logging.info('Removing FAFT lockfile...')
448        command = 'rm -f %s' % (self.lockfile)
449        self.faft_client.system.run_shell_command(command)
450
451    def _stop_service(self, service):
452        """Stops a upstart service on the client.
453
454        @param service: The name of the upstart service.
455        """
456        logging.info('Stopping %s...', service)
457        command = 'status %s | grep stop || stop %s' % (service, service)
458        self.faft_client.system.run_shell_command(command)
459
460    def _start_service(self, service):
461        """Starts a upstart service on the client.
462
463        @param service: The name of the upstart service.
464        """
465        logging.info('Starting %s...', service)
466        command = 'status %s | grep start || start %s' % (service, service)
467        self.faft_client.system.run_shell_command(command)
468
469    def clear_set_gbb_flags(self, clear_mask, set_mask):
470        """Clear and set the GBB flags in the current flashrom.
471
472        @param clear_mask: A mask of flags to be cleared.
473        @param set_mask: A mask of flags to be set.
474        """
475        gbb_flags = self.faft_client.bios.get_gbb_flags()
476        new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
477        if new_flags != gbb_flags:
478            self._backup_gbb_flags = gbb_flags
479            logging.info('Changing GBB flags from 0x%x to 0x%x.',
480                         gbb_flags, new_flags)
481            self.faft_client.bios.set_gbb_flags(new_flags)
482            # If changing FORCE_DEV_SWITCH_ON flag, reboot to get a clear state
483            if ((gbb_flags ^ new_flags) & vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON):
484                self.switcher.mode_aware_reboot()
485        else:
486            logging.info('Current GBB flags look good for test: 0x%x.',
487                         gbb_flags)
488
489    def check_ec_capability(self, required_cap=None, suppress_warning=False):
490        """Check if current platform has required EC capabilities.
491
492        @param required_cap: A list containing required EC capabilities. Pass in
493                             None to only check for presence of Chrome EC.
494        @param suppress_warning: True to suppress any warning messages.
495        @return: True if requirements are met. Otherwise, False.
496        """
497        if not self.faft_config.chrome_ec:
498            if not suppress_warning:
499                logging.warn('Requires Chrome EC to run this test.')
500            return False
501
502        if not required_cap:
503            return True
504
505        for cap in required_cap:
506            if cap not in self.faft_config.ec_capability:
507                if not suppress_warning:
508                    logging.warn('Requires EC capability "%s" to run this '
509                                 'test.', cap)
510                return False
511
512        return True
513
514    def check_root_part_on_non_recovery(self, part):
515        """Check the partition number of root device and on normal/dev boot.
516
517        @param part: A string of partition number, e.g.'3'.
518        @return: True if the root device matched and on normal/dev boot;
519                 otherwise, False.
520        """
521        return self.checkers.root_part_checker(part) and \
522                self.checkers.crossystem_checker({
523                    'mainfw_type': ('normal', 'developer'),
524                })
525
526    def _join_part(self, dev, part):
527        """Return a concatenated string of device and partition number.
528
529        @param dev: A string of device, e.g.'/dev/sda'.
530        @param part: A string of partition number, e.g.'3'.
531        @return: A concatenated string of device and partition number,
532                 e.g.'/dev/sda3'.
533
534        >>> seq = FirmwareTest()
535        >>> seq._join_part('/dev/sda', '3')
536        '/dev/sda3'
537        >>> seq._join_part('/dev/mmcblk0', '2')
538        '/dev/mmcblk0p2'
539        """
540        if 'mmcblk' in dev:
541            return dev + 'p' + part
542        else:
543            return dev + part
544
545    def copy_kernel_and_rootfs(self, from_part, to_part):
546        """Copy kernel and rootfs from from_part to to_part.
547
548        @param from_part: A string of partition number to be copied from.
549        @param to_part: A string of partition number to be copied to.
550        """
551        root_dev = self.faft_client.system.get_root_dev()
552        logging.info('Copying kernel from %s to %s. Please wait...',
553                     from_part, to_part)
554        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
555                (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
556                 self._join_part(root_dev, self.KERNEL_MAP[to_part])))
557        logging.info('Copying rootfs from %s to %s. Please wait...',
558                     from_part, to_part)
559        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
560                (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
561                 self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
562
563    def ensure_kernel_boot(self, part):
564        """Ensure the request kernel boot.
565
566        If not, it duplicates the current kernel to the requested kernel
567        and sets the requested higher priority to ensure it boot.
568
569        @param part: A string of kernel partition number or 'a'/'b'.
570        """
571        if not self.checkers.root_part_checker(part):
572            if self.faft_client.kernel.diff_a_b():
573                self.copy_kernel_and_rootfs(
574                        from_part=self.OTHER_KERNEL_MAP[part],
575                        to_part=part)
576            self.reset_and_prioritize_kernel(part)
577            self.switcher.mode_aware_reboot()
578
579    def set_hardware_write_protect(self, enable):
580        """Set hardware write protect pin.
581
582        @param enable: True if asserting write protect pin. Otherwise, False.
583        """
584        self.servo.set('fw_wp_vref', self.faft_config.wp_voltage)
585        self.servo.set('fw_wp_en', 'on')
586        self.servo.set('fw_wp', 'on' if enable else 'off')
587
588    def set_ec_write_protect_and_reboot(self, enable):
589        """Set EC write protect status and reboot to take effect.
590
591        The write protect state is only activated if both hardware write
592        protect pin is asserted and software write protect flag is set.
593        This method asserts/deasserts hardware write protect pin first, and
594        set corresponding EC software write protect flag.
595
596        If the device uses non-Chrome EC, set the software write protect via
597        flashrom.
598
599        If the device uses Chrome EC, a reboot is required for write protect
600        to take effect. Since the software write protect flag cannot be unset
601        if hardware write protect pin is asserted, we need to deasserted the
602        pin first if we are deactivating write protect. Similarly, a reboot
603        is required before we can modify the software flag.
604
605        @param enable: True if activating EC write protect. Otherwise, False.
606        """
607        self.set_hardware_write_protect(enable)
608        if self.faft_config.chrome_ec:
609            self.set_chrome_ec_write_protect_and_reboot(enable)
610        else:
611            self.faft_client.ec.set_write_protect(enable)
612            self.switcher.mode_aware_reboot()
613
614    def set_chrome_ec_write_protect_and_reboot(self, enable):
615        """Set Chrome EC write protect status and reboot to take effect.
616
617        @param enable: True if activating EC write protect. Otherwise, False.
618        """
619        if enable:
620            # Set write protect flag and reboot to take effect.
621            self.ec.set_flash_write_protect(enable)
622            self.sync_and_ec_reboot()
623        else:
624            # Reboot after deasserting hardware write protect pin to deactivate
625            # write protect. And then remove software write protect flag.
626            self.sync_and_ec_reboot()
627            self.ec.set_flash_write_protect(enable)
628
629    def _setup_ec_write_protect(self, ec_wp):
630        """Setup for EC write-protection.
631
632        It makes sure the EC in the requested write-protection state. If not, it
633        flips the state. Flipping the write-protection requires DUT reboot.
634
635        @param ec_wp: True to request EC write-protected; False to request EC
636                      not write-protected; None to do nothing.
637        """
638        if ec_wp is None:
639            self._old_ec_wp = None
640            return
641        self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'})
642        if ec_wp != self._old_ec_wp:
643            logging.info('The test required EC is %swrite-protected. Reboot '
644                         'and flip the state.', '' if ec_wp else 'not ')
645            self.switcher.mode_aware_reboot(
646                    'custom',
647                     lambda:self.set_ec_write_protect_and_reboot(ec_wp))
648
649    def _restore_ec_write_protect(self):
650        """Restore the original EC write-protection."""
651        if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None):
652            return
653        if not self.checkers.crossystem_checker(
654                {'wpsw_boot': '1' if self._old_ec_wp else '0'}):
655            logging.info('Restore original EC write protection and reboot.')
656            self.switcher.mode_aware_reboot(
657                    'custom',
658                    lambda:self.set_ec_write_protect_and_reboot(
659                            self._old_ec_wp))
660
661    def _setup_uart_capture(self):
662        """Setup the CPU/EC/PD UART capture."""
663        self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt')
664        self.servo.set('cpu_uart_capture', 'on')
665        self.ec_uart_file = None
666        self.usbpd_uart_file = None
667        if self.faft_config.chrome_ec:
668            try:
669                self.servo.set('ec_uart_capture', 'on')
670                self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt')
671            except error.TestFail as e:
672                if 'No control named' in str(e):
673                    logging.warn('The servod is too old that ec_uart_capture '
674                                 'not supported.')
675            # Log separate PD console if supported
676            if self.check_ec_capability(['usbpd_uart'], suppress_warning=True):
677                try:
678                    self.servo.set('usbpd_uart_capture', 'on')
679                    self.usbpd_uart_file = os.path.join(self.resultsdir,
680                                                        'usbpd_uart.txt')
681                except error.TestFail as e:
682                    if 'No control named' in str(e):
683                        logging.warn('The servod is too old that '
684                                     'usbpd_uart_capture is not supported.')
685        else:
686            logging.info('Not a Google EC, cannot capture ec console output.')
687
688    def _record_uart_capture(self):
689        """Record the CPU/EC/PD UART output stream to files."""
690        if self.cpu_uart_file:
691            with open(self.cpu_uart_file, 'a') as f:
692                f.write(ast.literal_eval(self.servo.get('cpu_uart_stream')))
693        if self.ec_uart_file and self.faft_config.chrome_ec:
694            with open(self.ec_uart_file, 'a') as f:
695                f.write(ast.literal_eval(self.servo.get('ec_uart_stream')))
696        if (self.usbpd_uart_file and self.faft_config.chrome_ec and
697            self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
698            with open(self.usbpd_uart_file, 'a') as f:
699                f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream')))
700
701    def _cleanup_uart_capture(self):
702        """Cleanup the CPU/EC/PD UART capture."""
703        # Flush the remaining UART output.
704        self._record_uart_capture()
705        self.servo.set('cpu_uart_capture', 'off')
706        if self.ec_uart_file and self.faft_config.chrome_ec:
707            self.servo.set('ec_uart_capture', 'off')
708        if (self.usbpd_uart_file and self.faft_config.chrome_ec and
709            self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
710            self.servo.set('usbpd_uart_capture', 'off')
711
712    def _get_power_state(self, power_state):
713        """
714        Return the current power state of the AP
715        """
716        return self.ec.send_command_get_output("powerinfo", [power_state])
717
718    def wait_power_state(self, power_state, retries):
719        """
720        Wait for certain power state.
721
722        @param power_state: power state you are expecting
723        @param retries: retries.  This is necessary if AP is powering down
724        and transitioning through different states.
725        """
726        logging.info('Checking power state "%s" maximum %d times.',
727                     power_state, retries)
728        while retries > 0:
729            logging.info("try count: %d" % retries)
730            try:
731                retries = retries - 1
732                ret = self._get_power_state(power_state)
733                return True
734            except error.TestFail:
735                pass
736        return False
737
738    def delayed(seconds):
739        def decorator(f):
740            def wrapper(*args, **kargs):
741                t = Timer(seconds, f, args, kargs)
742                t.start()
743            return wrapper
744        return decorator
745
746    @delayed(WAKE_DELAY)
747    def wake_by_power_button(self):
748        """Delay by WAKE_DELAY seconds and then wake DUT with power button."""
749        self.servo.power_normal_press()
750
751    @delayed(WAKE_DELAY)
752    def wake_by_lid_switch(self):
753        """Delay by WAKE_DELAY seconds and then wake DUT with lid switch."""
754        self.servo.set('lid_open', 'no')
755        time.sleep(self.LID_DELAY)
756        self.servo.set('lid_open', 'yes')
757
758    def suspend_as_reboot(self, wake_func):
759        """
760        Suspend DUT and also kill FAFT client so that this acts like a reboot.
761
762        Args:
763          wake_func: A function that is called to wake DUT. Note that this
764            function must delay itself so that we don't wake DUT before
765            suspend_as_reboot returns.
766        """
767        cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY
768        self.faft_client.system.run_shell_command(cmd)
769        self.faft_client.disconnect()
770        time.sleep(self.EC_SUSPEND_DELAY)
771        logging.info("wake function disabled")
772        wake_func()
773
774    def _fetch_servo_log(self):
775        """Fetch the servo log."""
776        cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2)
777        servo_log = self.servo.system_output(cmd)
778        return None if servo_log == 'NOTFOUND' else servo_log
779
780    def _setup_servo_log(self):
781        """Setup the servo log capturing."""
782        self.servo_log_original_len = -1
783        if self.servo.is_localhost():
784            # No servo log recorded when servod runs locally.
785            return
786
787        servo_log = self._fetch_servo_log()
788        if servo_log:
789            self.servo_log_original_len = len(servo_log)
790        else:
791            logging.warn('Servo log file not found.')
792
793    def _record_servo_log(self):
794        """Record the servo log to the results directory."""
795        if self.servo_log_original_len != -1:
796            servo_log = self._fetch_servo_log()
797            servo_log_file = os.path.join(self.resultsdir, 'servod.log')
798            with open(servo_log_file, 'a') as f:
799                f.write(servo_log[self.servo_log_original_len:])
800
801    def _record_faft_client_log(self):
802        """Record the faft client log to the results directory."""
803        client_log = self.faft_client.system.dump_log(True)
804        client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
805        with open(client_log_file, 'w') as f:
806            f.write(client_log)
807
808    def _setup_gbb_flags(self):
809        """Setup the GBB flags for FAFT test."""
810        if self.faft_config.gbb_version < 1.1:
811            logging.info('Skip modifying GBB on versions older than 1.1.')
812            return
813
814        if self.check_setup_done('gbb_flags'):
815            return
816
817        logging.info('Set proper GBB flags for test.')
818        self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY |
819                                 vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
820                                 vboot.GBB_FLAG_FORCE_DEV_BOOT_USB |
821                                 vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK |
822                                 vboot.GBB_FLAG_FORCE_DEV_BOOT_FASTBOOT_FULL_CAP,
823                                 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM |
824                                 vboot.GBB_FLAG_FAFT_KEY_OVERIDE)
825        self.mark_setup_done('gbb_flags')
826
827    def drop_backup_gbb_flags(self):
828        """Drops the backup GBB flags.
829
830        This can be used when a test intends to permanently change GBB flags.
831        """
832        self._backup_gbb_flags = None
833
834    def _restore_gbb_flags(self):
835        """Restore GBB flags to their original state."""
836        if self._backup_gbb_flags is None:
837            return
838        # Setting up and restoring the GBB flags take a lot of time. For
839        # speed-up purpose, don't restore it.
840        logging.info('***')
841        logging.info('*** Please manually restore the original GBB flags to: '
842                     '0x%x ***', self._backup_gbb_flags)
843        logging.info('***')
844        self.unmark_setup_done('gbb_flags')
845
846    def setup_tried_fwb(self, tried_fwb):
847        """Setup for fw B tried state.
848
849        It makes sure the system in the requested fw B tried state. If not, it
850        tries to do so.
851
852        @param tried_fwb: True if requested in tried_fwb=1;
853                          False if tried_fwb=0.
854        """
855        if tried_fwb:
856            if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
857                logging.info(
858                    'Firmware is not booted with tried_fwb. Reboot into it.')
859                self.faft_client.system.set_try_fw_b()
860        else:
861            if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
862                logging.info(
863                    'Firmware is booted with tried_fwb. Reboot to clear.')
864
865    def power_on(self):
866        """Switch DUT AC power on."""
867        self._client.power_on(self.power_control)
868
869    def power_off(self):
870        """Switch DUT AC power off."""
871        self._client.power_off(self.power_control)
872
873    def power_cycle(self):
874        """Power cycle DUT AC power."""
875        self._client.power_cycle(self.power_control)
876
877    def setup_rw_boot(self, section='a'):
878        """Make sure firmware is in RW-boot mode.
879
880        If the given firmware section is in RO-boot mode, turn off the RO-boot
881        flag and reboot DUT into RW-boot mode.
882
883        @param section: A firmware section, either 'a' or 'b'.
884        """
885        flags = self.faft_client.bios.get_preamble_flags(section)
886        if flags & vboot.PREAMBLE_USE_RO_NORMAL:
887            flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
888            self.faft_client.bios.set_preamble_flags(section, flags)
889            self.switcher.mode_aware_reboot()
890
891    def setup_kernel(self, part):
892        """Setup for kernel test.
893
894        It makes sure both kernel A and B bootable and the current boot is
895        the requested kernel part.
896
897        @param part: A string of kernel partition number or 'a'/'b'.
898        """
899        self.ensure_kernel_boot(part)
900        logging.info('Checking the integrity of kernel B and rootfs B...')
901        if (self.faft_client.kernel.diff_a_b() or
902                not self.faft_client.rootfs.verify_rootfs('B')):
903            logging.info('Copying kernel and rootfs from A to B...')
904            self.copy_kernel_and_rootfs(from_part=part,
905                                        to_part=self.OTHER_KERNEL_MAP[part])
906        self.reset_and_prioritize_kernel(part)
907
908    def reset_and_prioritize_kernel(self, part):
909        """Make the requested partition highest priority.
910
911        This function also reset kerenl A and B to bootable.
912
913        @param part: A string of partition number to be prioritized.
914        """
915        root_dev = self.faft_client.system.get_root_dev()
916        # Reset kernel A and B to bootable.
917        self.faft_client.system.run_shell_command(
918            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev))
919        self.faft_client.system.run_shell_command(
920            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev))
921        # Set kernel part highest priority.
922        self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
923                (self.KERNEL_MAP[part], root_dev))
924
925    def blocking_sync(self):
926        """Run a blocking sync command."""
927        # The double calls to sync fakes a blocking call
928        # since the first call returns before the flush
929        # is complete, but the second will wait for the
930        # first to finish.
931        self.faft_client.system.run_shell_command('sync')
932        self.faft_client.system.run_shell_command('sync')
933
934        # sync only sends SYNCHRONIZE_CACHE but doesn't
935        # check the status. For mmc devices, use `mmc
936        # status get` command to send an empty command to
937        # wait for the disk to be available again.  For
938        # other devices, hdparm sends TUR to check if
939        # a device is ready for transfer operation.
940        root_dev = self.faft_client.system.get_root_dev()
941        if 'mmcblk' in root_dev:
942            self.faft_client.system.run_shell_command('mmc status get %s' %
943                                                      root_dev)
944        else:
945            self.faft_client.system.run_shell_command('hdparm -f %s' % root_dev)
946
947    def sync_and_ec_reboot(self, flags=''):
948        """Request the client sync and do a EC triggered reboot.
949
950        @param flags: Optional, a space-separated string of flags passed to EC
951                      reboot command, including:
952                          default: EC soft reboot;
953                          'hard': EC cold/hard reboot.
954        """
955        self.blocking_sync()
956        self.ec.reboot(flags)
957        time.sleep(self.faft_config.ec_boot_to_console)
958        self.check_lid_and_power_on()
959
960    def reboot_and_reset_tpm(self):
961        """Reboot into recovery mode, reset TPM, then reboot back to disk."""
962        self.switcher.reboot_to_mode(to_mode='rec')
963        self.faft_client.system.run_shell_command('chromeos-tpm-recovery')
964        self.switcher.mode_aware_reboot()
965
966    def full_power_off_and_on(self):
967        """Shutdown the device by pressing power button and power on again."""
968        boot_id = self.get_bootid()
969        # Press power button to trigger Chrome OS normal shutdown process.
970        # We use a customized delay since the normal-press 1.2s is not enough.
971        self.servo.power_key(self.faft_config.hold_pwr_button_poweroff)
972        # device can take 44-51 seconds to restart,
973        # add buffer from the default timeout of 60 seconds.
974        self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id)
975        time.sleep(self.faft_config.shutdown)
976        # Short press power button to boot DUT again.
977        self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
978
979    def check_lid_and_power_on(self):
980        """
981        On devices with EC software sync, system powers on after EC reboots if
982        lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
983        This method checks lid switch state and presses power button if
984        necessary.
985        """
986        if self.servo.get("lid_open") == "no":
987            time.sleep(self.faft_config.software_sync)
988            self.servo.power_short_press()
989
990    def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
991        """Modify the kernel header magic in USB stick.
992
993        The kernel header magic is the first 8-byte of kernel partition.
994        We modify it to make it fail on kernel verification check.
995
996        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
997        @param from_magic: A string of magic which we change it from.
998        @param to_magic: A string of magic which we change it to.
999        @raise TestError: if failed to change magic.
1000        """
1001        assert len(from_magic) == 8
1002        assert len(to_magic) == 8
1003        # USB image only contains one kernel.
1004        kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
1005        read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
1006        current_magic = self.servo.system_output(read_cmd)
1007        if current_magic == to_magic:
1008            logging.info("The kernel magic is already %s.", current_magic)
1009            return
1010        if current_magic != from_magic:
1011            raise error.TestError("Invalid kernel image on USB: wrong magic.")
1012
1013        logging.info('Modify the kernel magic in USB, from %s to %s.',
1014                     from_magic, to_magic)
1015        write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
1016                     " 2>/dev/null" % (to_magic, kernel_part))
1017        self.servo.system(write_cmd)
1018
1019        if self.servo.system_output(read_cmd) != to_magic:
1020            raise error.TestError("Failed to write new magic.")
1021
1022    def corrupt_usb_kernel(self, usb_dev):
1023        """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
1024
1025        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1026        """
1027        self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
1028                                self.CORRUPTED_MAGIC)
1029
1030    def restore_usb_kernel(self, usb_dev):
1031        """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
1032
1033        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1034        """
1035        self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
1036                                self.CHROMEOS_MAGIC)
1037
1038    def _call_action(self, action_tuple, check_status=False):
1039        """Call the action function with/without arguments.
1040
1041        @param action_tuple: A function, or a tuple (function, args, error_msg),
1042                             in which, args and error_msg are optional. args is
1043                             either a value or a tuple if multiple arguments.
1044                             This can also be a list containing multiple
1045                             function or tuple. In this case, these actions are
1046                             called in sequence.
1047        @param check_status: Check the return value of action function. If not
1048                             succeed, raises a TestFail exception.
1049        @return: The result value of the action function.
1050        @raise TestError: An error when the action function is not callable.
1051        @raise TestFail: When check_status=True, action function not succeed.
1052        """
1053        if isinstance(action_tuple, list):
1054            return all([self._call_action(action, check_status=check_status)
1055                        for action in action_tuple])
1056
1057        action = action_tuple
1058        args = ()
1059        error_msg = 'Not succeed'
1060        if isinstance(action_tuple, tuple):
1061            action = action_tuple[0]
1062            if len(action_tuple) >= 2:
1063                args = action_tuple[1]
1064                if not isinstance(args, tuple):
1065                    args = (args,)
1066            if len(action_tuple) >= 3:
1067                error_msg = action_tuple[2]
1068
1069        if action is None:
1070            return
1071
1072        if not callable(action):
1073            raise error.TestError('action is not callable!')
1074
1075        info_msg = 'calling %s' % str(action)
1076        if args:
1077            info_msg += ' with args %s' % str(args)
1078        logging.info(info_msg)
1079        ret = action(*args)
1080
1081        if check_status and not ret:
1082            raise error.TestFail('%s: %s returning %s' %
1083                                 (error_msg, info_msg, str(ret)))
1084        return ret
1085
1086    def run_shutdown_process(self, shutdown_action, pre_power_action=None,
1087            run_power_action=True, post_power_action=None, shutdown_timeout=None):
1088        """Run shutdown_action(), which makes DUT shutdown, and power it on.
1089
1090        @param shutdown_action: function which makes DUT shutdown, like
1091                                pressing power key.
1092        @param pre_power_action: function which is called before next power on.
1093        @param power_action: power_key press by default, set to None to skip.
1094        @param post_power_action: function which is called after next power on.
1095        @param shutdown_timeout: a timeout to confirm DUT shutdown.
1096        @raise TestFail: if the shutdown_action() failed to turn DUT off.
1097        """
1098        self._call_action(shutdown_action)
1099        logging.info('Wait to ensure DUT shut down...')
1100        try:
1101            if shutdown_timeout is None:
1102                shutdown_timeout = self.faft_config.shutdown_timeout
1103            self.switcher.wait_for_client(timeout=shutdown_timeout)
1104            raise error.TestFail(
1105                    'Should shut the device down after calling %s.' %
1106                    str(shutdown_action))
1107        except ConnectionError:
1108            logging.info(
1109                'DUT is surely shutdown. We are going to power it on again...')
1110
1111        if pre_power_action:
1112            self._call_action(pre_power_action)
1113        if run_power_action:
1114            self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1115        if post_power_action:
1116            self._call_action(post_power_action)
1117
1118    def get_bootid(self, retry=3):
1119        """
1120        Return the bootid.
1121        """
1122        boot_id = None
1123        while retry:
1124            try:
1125                boot_id = self._client.get_boot_id()
1126                break
1127            except error.AutoservRunError:
1128                retry -= 1
1129                if retry:
1130                    logging.info('Retry to get boot_id...')
1131                else:
1132                    logging.warning('Failed to get boot_id.')
1133        logging.info('boot_id: %s', boot_id)
1134        return boot_id
1135
1136    def check_state(self, func):
1137        """
1138        Wrapper around _call_action with check_status set to True. This is a
1139        helper function to be used by tests and is currently implemented by
1140        calling _call_action with check_status=True.
1141
1142        TODO: This function's arguments need to be made more stringent. And
1143        its functionality should be moved over to check functions directly in
1144        the future.
1145
1146        @param func: A function, or a tuple (function, args, error_msg),
1147                             in which, args and error_msg are optional. args is
1148                             either a value or a tuple if multiple arguments.
1149                             This can also be a list containing multiple
1150                             function or tuple. In this case, these actions are
1151                             called in sequence.
1152        @return: The result value of the action function.
1153        @raise TestFail: If the function does notsucceed.
1154        """
1155        logging.info("-[FAFT]-[ start stepstate_checker ]----------")
1156        self._call_action(func, check_status=True)
1157        logging.info("-[FAFT]-[ end state_checker ]----------------")
1158
1159    def get_current_firmware_sha(self):
1160        """Get current firmware sha of body and vblock.
1161
1162        @return: Current firmware sha follows the order (
1163                 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha)
1164        """
1165        current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'),
1166                                self.faft_client.bios.get_body_sha('a'),
1167                                self.faft_client.bios.get_sig_sha('b'),
1168                                self.faft_client.bios.get_body_sha('b'))
1169        if not all(current_firmware_sha):
1170            raise error.TestError('Failed to get firmware sha.')
1171        return current_firmware_sha
1172
1173    def is_firmware_changed(self):
1174        """Check if the current firmware changed, by comparing its SHA.
1175
1176        @return: True if it is changed, otherwise Flase.
1177        """
1178        # Device may not be rebooted after test.
1179        self.faft_client.bios.reload()
1180
1181        current_sha = self.get_current_firmware_sha()
1182
1183        if current_sha == self._backup_firmware_sha:
1184            return False
1185        else:
1186            corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0])
1187            corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1])
1188            corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2])
1189            corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3])
1190            logging.info("Firmware changed:")
1191            logging.info('VBOOTA is changed: %s', corrupt_VBOOTA)
1192            logging.info('VBOOTB is changed: %s', corrupt_VBOOTB)
1193            logging.info('FVMAIN is changed: %s', corrupt_FVMAIN)
1194            logging.info('FVMAINB is changed: %s', corrupt_FVMAINB)
1195            return True
1196
1197    def backup_firmware(self, suffix='.original'):
1198        """Backup firmware to file, and then send it to host.
1199
1200        @param suffix: a string appended to backup file name
1201        """
1202        remote_temp_dir = self.faft_client.system.create_temp_dir()
1203        remote_bios_path = os.path.join(remote_temp_dir, 'bios')
1204        self.faft_client.bios.dump_whole(remote_bios_path)
1205        self._client.get_file(remote_bios_path,
1206                              os.path.join(self.resultsdir, 'bios' + suffix))
1207        self._client.run('rm -rf %s' % remote_temp_dir)
1208        logging.info('Backup firmware stored in %s with suffix %s',
1209            self.resultsdir, suffix)
1210
1211        self._backup_firmware_sha = self.get_current_firmware_sha()
1212
1213    def is_firmware_saved(self):
1214        """Check if a firmware saved (called backup_firmware before).
1215
1216        @return: True if the firmware is backuped; otherwise False.
1217        """
1218        return self._backup_firmware_sha != ()
1219
1220    def clear_saved_firmware(self):
1221        """Clear the firmware saved by the method backup_firmware."""
1222        self._backup_firmware_sha = ()
1223
1224    def restore_firmware(self, suffix='.original'):
1225        """Restore firmware from host in resultsdir.
1226
1227        @param suffix: a string appended to backup file name
1228        """
1229        if not self.is_firmware_changed():
1230            return
1231
1232        # Backup current corrupted firmware.
1233        self.backup_firmware(suffix='.corrupt')
1234
1235        # Restore firmware.
1236        remote_temp_dir = self.faft_client.system.create_temp_dir()
1237        self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix),
1238                               os.path.join(remote_temp_dir, 'bios'))
1239
1240        self.faft_client.bios.write_whole(
1241            os.path.join(remote_temp_dir, 'bios'))
1242        self.switcher.mode_aware_reboot()
1243        logging.info('Successfully restore firmware.')
1244
1245    def setup_firmwareupdate_shellball(self, shellball=None):
1246        """Deside a shellball to use in firmware update test.
1247
1248        Check if there is a given shellball, and it is a shell script. Then,
1249        send it to the remote host. Otherwise, use
1250        /usr/sbin/chromeos-firmwareupdate.
1251
1252        @param shellball: path of a shellball or default to None.
1253
1254        @return: Path of shellball in remote host. If use default shellball,
1255                 reutrn None.
1256        """
1257        updater_path = None
1258        if shellball:
1259            # Determine the firmware file is a shellball or a raw binary.
1260            is_shellball = (utils.system_output("file %s" % shellball).find(
1261                    "shell script") != -1)
1262            if is_shellball:
1263                logging.info('Device will update firmware with shellball %s',
1264                             shellball)
1265                temp_dir = self.faft_client.system.create_temp_dir(
1266                            'shellball_')
1267                temp_shellball = os.path.join(temp_dir, 'updater.sh')
1268                self._client.send_file(shellball, temp_shellball)
1269                updater_path = temp_shellball
1270            else:
1271                raise error.TestFail(
1272                    'The given shellball is not a shell script.')
1273            return updater_path
1274
1275    def is_kernel_changed(self):
1276        """Check if the current kernel is changed, by comparing its SHA1 hash.
1277
1278        @return: True if it is changed; otherwise, False.
1279        """
1280        changed = False
1281        for p in ('A', 'B'):
1282            backup_sha = self._backup_kernel_sha.get(p, None)
1283            current_sha = self.faft_client.kernel.get_sha(p)
1284            if backup_sha != current_sha:
1285                changed = True
1286                logging.info('Kernel %s is changed', p)
1287        return changed
1288
1289    def backup_kernel(self, suffix='.original'):
1290        """Backup kernel to files, and the send them to host.
1291
1292        @param suffix: a string appended to backup file name.
1293        """
1294        remote_temp_dir = self.faft_client.system.create_temp_dir()
1295        for p in ('A', 'B'):
1296            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1297            self.faft_client.kernel.dump(p, remote_path)
1298            self._client.get_file(
1299                    remote_path,
1300                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
1301            self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
1302        logging.info('Backup kernel stored in %s with suffix %s',
1303            self.resultsdir, suffix)
1304
1305    def is_kernel_saved(self):
1306        """Check if kernel images are saved (backup_kernel called before).
1307
1308        @return: True if the kernel is saved; otherwise, False.
1309        """
1310        return len(self._backup_kernel_sha) != 0
1311
1312    def clear_saved_kernel(self):
1313        """Clear the kernel saved by backup_kernel()."""
1314        self._backup_kernel_sha = dict()
1315
1316    def restore_kernel(self, suffix='.original'):
1317        """Restore kernel from host in resultsdir.
1318
1319        @param suffix: a string appended to backup file name.
1320        """
1321        if not self.is_kernel_changed():
1322            return
1323
1324        # Backup current corrupted kernel.
1325        self.backup_kernel(suffix='.corrupt')
1326
1327        # Restore kernel.
1328        remote_temp_dir = self.faft_client.system.create_temp_dir()
1329        for p in ('A', 'B'):
1330            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1331            self._client.send_file(
1332                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
1333                    remote_path)
1334            self.faft_client.kernel.write(p, remote_path)
1335
1336        self.switcher.mode_aware_reboot()
1337        logging.info('Successfully restored kernel.')
1338
1339    def backup_cgpt_attributes(self):
1340        """Backup CGPT partition table attributes."""
1341        self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
1342
1343    def restore_cgpt_attributes(self):
1344        """Restore CGPT partition table attributes."""
1345        current_table = self.faft_client.cgpt.get_attributes()
1346        if current_table == self._backup_cgpt_attr:
1347            return
1348        logging.info('CGPT table is changed. Original: %r. Current: %r.',
1349                     self._backup_cgpt_attr,
1350                     current_table)
1351        self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr)
1352
1353        self.switcher.mode_aware_reboot()
1354        logging.info('Successfully restored CGPT table.')
1355
1356    def try_fwb(self, count=0):
1357        """set to try booting FWB count # times
1358
1359        Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
1360        vboot2
1361
1362        @param count: an integer specifying value to program into
1363                      fwb_tries(vb1)/fw_try_next(vb2)
1364        """
1365        if self.fw_vboot2:
1366            self.faft_client.system.set_fw_try_next('B', count)
1367        else:
1368            # vboot1: we need to boot into fwb at least once
1369            if not count:
1370                count = count + 1
1371            self.faft_client.system.set_try_fw_b(count)
1372
1373