1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import ast
6import ctypes
7import logging
8import os
9import pprint
10import re
11import time
12import uuid
13
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import error
16from autotest_lib.server import test
17from autotest_lib.server.cros import vboot_constants as vboot
18from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig
19from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
20from autotest_lib.server.cros.faft.utils import mode_switcher
21from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
22from autotest_lib.server.cros.servo import chrome_ec
23
24ConnectionError = mode_switcher.ConnectionError
25
26
27class FAFTBase(test.test):
28    """The base class of FAFT classes.
29
30    It launches the FAFTClient on DUT, such that the test can access its
31    firmware functions and interfaces. It also provides some methods to
32    handle the reboot mechanism, in order to ensure FAFTClient is still
33    connected after reboot.
34    """
35    def initialize(self, host):
36        """Create a FAFTClient object and install the dependency."""
37        self.servo = host.servo
38        self.servo.initialize_dut()
39        self._client = host
40        self.faft_client = RPCProxy(host)
41        self.lockfile = '/var/tmp/faft/lock'
42
43
44class FirmwareTest(FAFTBase):
45    """
46    Base class that sets up helper objects/functions for firmware tests.
47
48    TODO: add documentaion as the FAFT rework progresses.
49    """
50    version = 1
51
52    # Mapping of partition number of kernel and rootfs.
53    KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
54    ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
55    OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
56    OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}
57
58    CHROMEOS_MAGIC = "CHROMEOS"
59    CORRUPTED_MAGIC = "CORRUPTD"
60
61    # Delay for waiting client to return before EC suspend
62    EC_SUSPEND_DELAY = 5
63
64    # Delay between EC suspend and wake
65    WAKE_DELAY = 10
66
67    # Delay between closing and opening lid
68    LID_DELAY = 1
69
70    _SERVOD_LOG = '/var/log/servod.log'
71
72    _ROOTFS_PARTITION_NUMBER = 3
73
74    _backup_firmware_sha = ()
75    _backup_kernel_sha = dict()
76    _backup_cgpt_attr = dict()
77    _backup_gbb_flags = None
78    _backup_dev_mode = None
79
80    # Class level variable, keep track the states of one time setup.
81    # This variable is preserved across tests which inherit this class.
82    _global_setup_done = {
83        'gbb_flags': False,
84        'reimage': False,
85        'usb_check': False,
86    }
87
88    @classmethod
89    def check_setup_done(cls, label):
90        """Check if the given setup is done.
91
92        @param label: The label of the setup.
93        """
94        return cls._global_setup_done[label]
95
96    @classmethod
97    def mark_setup_done(cls, label):
98        """Mark the given setup done.
99
100        @param label: The label of the setup.
101        """
102        cls._global_setup_done[label] = True
103
104    @classmethod
105    def unmark_setup_done(cls, label):
106        """Mark the given setup not done.
107
108        @param label: The label of the setup.
109        """
110        cls._global_setup_done[label] = False
111
112    def initialize(self, host, cmdline_args, ec_wp=None):
113        super(FirmwareTest, self).initialize(host)
114        self.run_id = str(uuid.uuid4())
115        logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
116        # Parse arguments from command line
117        args = {}
118        self.power_control = host.POWER_CONTROL_RPM
119        for arg in cmdline_args:
120            match = re.search("^(\w+)=(.+)", arg)
121            if match:
122                args[match.group(1)] = match.group(2)
123        if 'power_control' in args:
124            self.power_control = args['power_control']
125            if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
126                raise error.TestError('Valid values for --args=power_control '
127                                      'are %s. But you entered wrong argument '
128                                      'as "%s".'
129                                       % (host.POWER_CONTROL_VALID_ARGS,
130                                       self.power_control))
131
132        self.faft_config = FAFTConfig(
133                self.faft_client.system.get_platform_name())
134        self.checkers = FAFTCheckers(self)
135        self.switcher = mode_switcher.create_mode_switcher(self)
136
137        if self.faft_config.chrome_ec:
138            self.ec = chrome_ec.ChromeEC(self.servo)
139        # Check for presence of a USBPD console
140        if self.faft_config.chrome_usbpd:
141            self.usbpd = chrome_ec.ChromeUSBPD(self.servo)
142        elif self.faft_config.chrome_ec:
143            # If no separate USBPD console, then PD exists on EC console
144            self.usbpd = self.ec
145        # Get plankton console
146        self.plankton = host.plankton
147        self.plankton_host = host._plankton_host
148
149        self._setup_uart_capture()
150        self._setup_servo_log()
151        self._record_system_info()
152        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
153        logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
154        if self.fw_vboot2:
155            self.faft_client.system.set_fw_try_next('A')
156            if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B':
157                logging.info('mainfw_act is B. rebooting to set it A')
158                self.switcher.mode_aware_reboot()
159        self._setup_gbb_flags()
160        self._stop_service('update-engine')
161        self._create_faft_lockfile()
162        self._setup_ec_write_protect(ec_wp)
163        # See chromium:239034 regarding needing this sync.
164        self.blocking_sync()
165        logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
166
167    def cleanup(self):
168        """Autotest cleanup function."""
169        # Unset state checker in case it's set by subclass
170        logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
171        try:
172            self.faft_client.system.is_available()
173        except:
174            # Remote is not responding. Revive DUT so that subsequent tests
175            # don't fail.
176            self._restore_routine_from_timeout()
177        self.switcher.restore_mode()
178        self._restore_ec_write_protect()
179        self._restore_gbb_flags()
180        self._start_service('update-engine')
181        self._remove_faft_lockfile()
182        self._record_servo_log()
183        self._record_faft_client_log()
184        self._cleanup_uart_capture()
185        super(FirmwareTest, self).cleanup()
186        logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
187
188    def _record_system_info(self):
189        """Record some critical system info to the attr keyval.
190
191        This info is used by generate_test_report later.
192        """
193        system_info = {
194            'hwid': self.faft_client.system.get_crossystem_value('hwid'),
195            'ec_version': self.faft_client.ec.get_version(),
196            'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'),
197            'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'),
198            'servod_version': self._client._servo_host.run(
199                'servod --version').stdout.strip(),
200        }
201        logging.info('System info:\n' + pprint.pformat(system_info))
202        self.write_attr_keyval(system_info)
203
204    def invalidate_firmware_setup(self):
205        """Invalidate all firmware related setup state.
206
207        This method is called when the firmware is re-flashed. It resets all
208        firmware related setup states so that the next test setup properly
209        again.
210        """
211        self.unmark_setup_done('gbb_flags')
212
213    def _retrieve_recovery_reason_from_trap(self):
214        """Try to retrieve the recovery reason from a trapped recovery screen.
215
216        @return: The recovery_reason, 0 if any error.
217        """
218        recovery_reason = 0
219        logging.info('Try to retrieve recovery reason...')
220        if self.servo.get_usbkey_direction() == 'dut':
221            self.switcher.bypass_rec_mode()
222        else:
223            self.servo.switch_usbkey('dut')
224
225        try:
226            self.switcher.wait_for_client()
227            lines = self.faft_client.system.run_shell_command_get_output(
228                        'crossystem recovery_reason')
229            recovery_reason = int(lines[0])
230            logging.info('Got the recovery reason %d.', recovery_reason)
231        except ConnectionError:
232            logging.error('Failed to get the recovery reason due to connection '
233                          'error.')
234        return recovery_reason
235
236    def _reset_client(self):
237        """Reset client to a workable state.
238
239        This method is called when the client is not responsive. It may be
240        caused by the following cases:
241          - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
242          - corrupted firmware;
243          - corrutped OS image.
244        """
245        # DUT may halt on a firmware screen. Try cold reboot.
246        logging.info('Try cold reboot...')
247        self.switcher.mode_aware_reboot(reboot_type='cold',
248                                        sync_before_boot=False,
249                                        wait_for_dut_up=False)
250        self.switcher.wait_for_client_offline()
251        self.switcher.bypass_dev_mode()
252        try:
253            self.switcher.wait_for_client()
254            return
255        except ConnectionError:
256            logging.warn('Cold reboot doesn\'t help, still connection error.')
257
258        # DUT may be broken by a corrupted firmware. Restore firmware.
259        # We assume the recovery boot still works fine. Since the recovery
260        # code is in RO region and all FAFT tests don't change the RO region
261        # except GBB.
262        if self.is_firmware_saved():
263            self._ensure_client_in_recovery()
264            logging.info('Try restore the original firmware...')
265            if self.is_firmware_changed():
266                try:
267                    self.restore_firmware()
268                    return
269                except ConnectionError:
270                    logging.warn('Restoring firmware doesn\'t help, still '
271                                 'connection error.')
272
273        # Perhaps it's kernel that's broken. Let's try restoring it.
274        if self.is_kernel_saved():
275            self._ensure_client_in_recovery()
276            logging.info('Try restore the original kernel...')
277            if self.is_kernel_changed():
278                try:
279                    self.restore_kernel()
280                    return
281                except ConnectionError:
282                    logging.warn('Restoring kernel doesn\'t help, still '
283                                 'connection error.')
284
285        # DUT may be broken by a corrupted OS image. Restore OS image.
286        self._ensure_client_in_recovery()
287        logging.info('Try restore the OS image...')
288        self.faft_client.system.run_shell_command('chromeos-install --yes')
289        self.switcher.mode_aware_reboot(wait_for_dut_up=False)
290        self.switcher.wait_for_client_offline()
291        self.switcher.bypass_dev_mode()
292        try:
293            self.switcher.wait_for_client()
294            logging.info('Successfully restore OS image.')
295            return
296        except ConnectionError:
297            logging.warn('Restoring OS image doesn\'t help, still connection '
298                         'error.')
299
300    def _ensure_client_in_recovery(self):
301        """Ensure client in recovery boot; reboot into it if necessary.
302
303        @raise TestError: if failed to boot the USB image.
304        """
305        logging.info('Try boot into USB image...')
306        self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
307                                     wait_for_dut_up=False)
308        self.servo.switch_usbkey('host')
309        self.switcher.bypass_rec_mode()
310        try:
311            self.switcher.wait_for_client()
312        except ConnectionError:
313            raise error.TestError('Failed to boot the USB image.')
314
315    def _restore_routine_from_timeout(self):
316        """A routine to try to restore the system from a timeout error.
317
318        This method is called when FAFT failed to connect DUT after reboot.
319
320        @raise TestFail: This exception is already raised, with a decription
321                         why it failed.
322        """
323        # DUT is disconnected. Capture the UART output for debug.
324        self._record_uart_capture()
325
326        # TODO(waihong@chromium.org): Implement replugging the Ethernet to
327        # identify if it is a network flaky.
328
329        recovery_reason = self._retrieve_recovery_reason_from_trap()
330
331        # Reset client to a workable state.
332        self._reset_client()
333
334        # Raise the proper TestFail exception.
335        if recovery_reason:
336            raise error.TestFail('Trapped in the recovery screen (reason: %d) '
337                                 'and timed out' % recovery_reason)
338        else:
339            raise error.TestFail('Timed out waiting for DUT reboot')
340
341    def assert_test_image_in_usb_disk(self, usb_dev=None):
342        """Assert an USB disk plugged-in on servo and a test image inside.
343
344        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
345                        If None, it is detected automatically.
346        @raise TestError: if USB disk not detected or not a test image.
347        """
348        if self.check_setup_done('usb_check'):
349            return
350        if usb_dev:
351            assert self.servo.get_usbkey_direction() == 'host'
352        else:
353            self.servo.switch_usbkey('host')
354            usb_dev = self.servo.probe_host_usb_dev()
355            if not usb_dev:
356                raise error.TestError(
357                        'An USB disk should be plugged in the servo board.')
358
359        rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
360        logging.info('usb dev is %s', usb_dev)
361        tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
362        self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
363
364        try:
365            usb_lsb = self.servo.system_output('cat %s' %
366                os.path.join(tmpd, 'etc/lsb-release'))
367            logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
368            dut_lsb = '\n'.join(self.faft_client.system.
369                run_shell_command_get_output('cat /etc/lsb-release'))
370            logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
371            if not re.search(r'RELEASE_TRACK=.*test', usb_lsb):
372                raise error.TestError('USB stick in servo is no test image')
373            usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1)
374            dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1)
375            if usb_board != dut_board:
376                raise error.TestError('USB stick in servo contains a %s '
377                    'image, but DUT is a %s' % (usb_board, dut_board))
378        finally:
379            for cmd in ('umount %s' % rootfs, 'sync', 'rm -rf %s' % tmpd):
380                self.servo.system(cmd)
381
382        self.mark_setup_done('usb_check')
383
384    def setup_usbkey(self, usbkey, host=None):
385        """Setup the USB disk for the test.
386
387        It checks the setup of USB disk and a valid ChromeOS test image inside.
388        It also muxes the USB disk to either the host or DUT by request.
389
390        @param usbkey: True if the USB disk is required for the test, False if
391                       not required.
392        @param host: Optional, True to mux the USB disk to host, False to mux it
393                    to DUT, default to do nothing.
394        """
395        if usbkey:
396            self.assert_test_image_in_usb_disk()
397        elif host is None:
398            # USB disk is not required for the test. Better to mux it to host.
399            host = True
400
401        if host is True:
402            self.servo.switch_usbkey('host')
403        elif host is False:
404            self.servo.switch_usbkey('dut')
405
406    def get_usbdisk_path_on_dut(self):
407        """Get the path of the USB disk device plugged-in the servo on DUT.
408
409        Returns:
410          A string representing USB disk path, like '/dev/sdb', or None if
411          no USB disk is found.
412        """
413        cmd = 'ls -d /dev/s*[a-z]'
414        original_value = self.servo.get_usbkey_direction()
415
416        # Make the dut unable to see the USB disk.
417        self.servo.switch_usbkey('off')
418        no_usb_set = set(
419            self.faft_client.system.run_shell_command_get_output(cmd))
420
421        # Make the dut able to see the USB disk.
422        self.servo.switch_usbkey('dut')
423        time.sleep(self.faft_config.usb_plug)
424        has_usb_set = set(
425            self.faft_client.system.run_shell_command_get_output(cmd))
426
427        # Back to its original value.
428        if original_value != self.servo.get_usbkey_direction():
429            self.servo.switch_usbkey(original_value)
430
431        diff_set = has_usb_set - no_usb_set
432        if len(diff_set) == 1:
433            return diff_set.pop()
434        else:
435            return None
436
437    def _create_faft_lockfile(self):
438        """Creates the FAFT lockfile."""
439        logging.info('Creating FAFT lockfile...')
440        command = 'touch %s' % (self.lockfile)
441        self.faft_client.system.run_shell_command(command)
442
443    def _remove_faft_lockfile(self):
444        """Removes the FAFT lockfile."""
445        logging.info('Removing FAFT lockfile...')
446        command = 'rm -f %s' % (self.lockfile)
447        self.faft_client.system.run_shell_command(command)
448
449    def _stop_service(self, service):
450        """Stops a upstart service on the client.
451
452        @param service: The name of the upstart service.
453        """
454        logging.info('Stopping %s...', service)
455        command = 'status %s | grep stop || stop %s' % (service, service)
456        self.faft_client.system.run_shell_command(command)
457
458    def _start_service(self, service):
459        """Starts a upstart service on the client.
460
461        @param service: The name of the upstart service.
462        """
463        logging.info('Starting %s...', service)
464        command = 'status %s | grep start || start %s' % (service, service)
465        self.faft_client.system.run_shell_command(command)
466
467    def clear_set_gbb_flags(self, clear_mask, set_mask):
468        """Clear and set the GBB flags in the current flashrom.
469
470        @param clear_mask: A mask of flags to be cleared.
471        @param set_mask: A mask of flags to be set.
472        """
473        gbb_flags = self.faft_client.bios.get_gbb_flags()
474        new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
475        if new_flags != gbb_flags:
476            self._backup_gbb_flags = gbb_flags
477            logging.info('Changing GBB flags from 0x%x to 0x%x.',
478                         gbb_flags, new_flags)
479            self.faft_client.bios.set_gbb_flags(new_flags)
480            # If changing FORCE_DEV_SWITCH_ON flag, reboot to get a clear state
481            if ((gbb_flags ^ new_flags) & vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON):
482                self.switcher.mode_aware_reboot()
483        else:
484            logging.info('Current GBB flags look good for test: 0x%x.',
485                         gbb_flags)
486
487    def check_ec_capability(self, required_cap=None, suppress_warning=False):
488        """Check if current platform has required EC capabilities.
489
490        @param required_cap: A list containing required EC capabilities. Pass in
491                             None to only check for presence of Chrome EC.
492        @param suppress_warning: True to suppress any warning messages.
493        @return: True if requirements are met. Otherwise, False.
494        """
495        if not self.faft_config.chrome_ec:
496            if not suppress_warning:
497                logging.warn('Requires Chrome EC to run this test.')
498            return False
499
500        if not required_cap:
501            return True
502
503        for cap in required_cap:
504            if cap not in self.faft_config.ec_capability:
505                if not suppress_warning:
506                    logging.warn('Requires EC capability "%s" to run this '
507                                 'test.', cap)
508                return False
509
510        return True
511
512    def check_root_part_on_non_recovery(self, part):
513        """Check the partition number of root device and on normal/dev boot.
514
515        @param part: A string of partition number, e.g.'3'.
516        @return: True if the root device matched and on normal/dev boot;
517                 otherwise, False.
518        """
519        return self.checkers.root_part_checker(part) and \
520                self.checkers.crossystem_checker({
521                    'mainfw_type': ('normal', 'developer'),
522                })
523
524    def _join_part(self, dev, part):
525        """Return a concatenated string of device and partition number.
526
527        @param dev: A string of device, e.g.'/dev/sda'.
528        @param part: A string of partition number, e.g.'3'.
529        @return: A concatenated string of device and partition number,
530                 e.g.'/dev/sda3'.
531
532        >>> seq = FirmwareTest()
533        >>> seq._join_part('/dev/sda', '3')
534        '/dev/sda3'
535        >>> seq._join_part('/dev/mmcblk0', '2')
536        '/dev/mmcblk0p2'
537        """
538        if 'mmcblk' in dev:
539            return dev + 'p' + part
540        else:
541            return dev + part
542
543    def copy_kernel_and_rootfs(self, from_part, to_part):
544        """Copy kernel and rootfs from from_part to to_part.
545
546        @param from_part: A string of partition number to be copied from.
547        @param to_part: A string of partition number to be copied to.
548        """
549        root_dev = self.faft_client.system.get_root_dev()
550        logging.info('Copying kernel from %s to %s. Please wait...',
551                     from_part, to_part)
552        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
553                (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
554                 self._join_part(root_dev, self.KERNEL_MAP[to_part])))
555        logging.info('Copying rootfs from %s to %s. Please wait...',
556                     from_part, to_part)
557        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
558                (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
559                 self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
560
561    def ensure_kernel_boot(self, part):
562        """Ensure the request kernel boot.
563
564        If not, it duplicates the current kernel to the requested kernel
565        and sets the requested higher priority to ensure it boot.
566
567        @param part: A string of kernel partition number or 'a'/'b'.
568        """
569        if not self.checkers.root_part_checker(part):
570            if self.faft_client.kernel.diff_a_b():
571                self.copy_kernel_and_rootfs(
572                        from_part=self.OTHER_KERNEL_MAP[part],
573                        to_part=part)
574            self.reset_and_prioritize_kernel(part)
575            self.switcher.mode_aware_reboot()
576
577    def set_hardware_write_protect(self, enable):
578        """Set hardware write protect pin.
579
580        @param enable: True if asserting write protect pin. Otherwise, False.
581        """
582        try:
583            self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
584        except:
585            # TODO(waihong): Remove this fallback when all servos have the
586            # above new fw_wp_state control.
587            self.servo.set('fw_wp_vref', self.faft_config.wp_voltage)
588            self.servo.set('fw_wp_en', 'on')
589            self.servo.set('fw_wp', 'on' if enable else 'off')
590
591    def set_ec_write_protect_and_reboot(self, enable):
592        """Set EC write protect status and reboot to take effect.
593
594        The write protect state is only activated if both hardware write
595        protect pin is asserted and software write protect flag is set.
596        This method asserts/deasserts hardware write protect pin first, and
597        set corresponding EC software write protect flag.
598
599        If the device uses non-Chrome EC, set the software write protect via
600        flashrom.
601
602        If the device uses Chrome EC, a reboot is required for write protect
603        to take effect. Since the software write protect flag cannot be unset
604        if hardware write protect pin is asserted, we need to deasserted the
605        pin first if we are deactivating write protect. Similarly, a reboot
606        is required before we can modify the software flag.
607
608        @param enable: True if activating EC write protect. Otherwise, False.
609        """
610        self.set_hardware_write_protect(enable)
611        if self.faft_config.chrome_ec:
612            self.set_chrome_ec_write_protect_and_reboot(enable)
613        else:
614            self.faft_client.ec.set_write_protect(enable)
615            self.switcher.mode_aware_reboot()
616
617    def set_chrome_ec_write_protect_and_reboot(self, enable):
618        """Set Chrome EC write protect status and reboot to take effect.
619
620        @param enable: True if activating EC write protect. Otherwise, False.
621        """
622        if enable:
623            # Set write protect flag and reboot to take effect.
624            self.ec.set_flash_write_protect(enable)
625            self.sync_and_ec_reboot()
626        else:
627            # Reboot after deasserting hardware write protect pin to deactivate
628            # write protect. And then remove software write protect flag.
629            self.sync_and_ec_reboot()
630            self.ec.set_flash_write_protect(enable)
631
632    def _setup_ec_write_protect(self, ec_wp):
633        """Setup for EC write-protection.
634
635        It makes sure the EC in the requested write-protection state. If not, it
636        flips the state. Flipping the write-protection requires DUT reboot.
637
638        @param ec_wp: True to request EC write-protected; False to request EC
639                      not write-protected; None to do nothing.
640        """
641        if ec_wp is None:
642            self._old_ec_wp = None
643            return
644        self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'})
645        if ec_wp != self._old_ec_wp:
646            logging.info('The test required EC is %swrite-protected. Reboot '
647                         'and flip the state.', '' if ec_wp else 'not ')
648            self.switcher.mode_aware_reboot(
649                    'custom',
650                     lambda:self.set_ec_write_protect_and_reboot(ec_wp))
651
652    def _restore_ec_write_protect(self):
653        """Restore the original EC write-protection."""
654        if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None):
655            return
656        if not self.checkers.crossystem_checker(
657                {'wpsw_boot': '1' if self._old_ec_wp else '0'}):
658            logging.info('Restore original EC write protection and reboot.')
659            self.switcher.mode_aware_reboot(
660                    'custom',
661                    lambda:self.set_ec_write_protect_and_reboot(
662                            self._old_ec_wp))
663
664    def _setup_uart_capture(self):
665        """Setup the CPU/EC/PD UART capture."""
666        self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt')
667        self.servo.set('cpu_uart_capture', 'on')
668        self.cr50_console_file = None
669        self.ec_uart_file = None
670        self.usbpd_uart_file = None
671        try:
672            self.servo.set('cr50_console_capture', 'on')
673            self.cr50_console_file = os.path.join(self.resultsdir,
674                                                  'cr50_console.txt')
675            self.cr50 = chrome_ec.ChromeCr50(self.servo)
676        except error.TestFail as e:
677            if 'No control named' in str(e):
678                logging.warn('cr50 console not supported.')
679        if self.faft_config.chrome_ec:
680            try:
681                self.servo.set('ec_uart_capture', 'on')
682                self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt')
683            except error.TestFail as e:
684                if 'No control named' in str(e):
685                    logging.warn('The servod is too old that ec_uart_capture '
686                                 'not supported.')
687            # Log separate PD console if supported
688            if self.check_ec_capability(['usbpd_uart'], suppress_warning=True):
689                try:
690                    self.servo.set('usbpd_uart_capture', 'on')
691                    self.usbpd_uart_file = os.path.join(self.resultsdir,
692                                                        'usbpd_uart.txt')
693                except error.TestFail as e:
694                    if 'No control named' in str(e):
695                        logging.warn('The servod is too old that '
696                                     'usbpd_uart_capture is not supported.')
697        else:
698            logging.info('Not a Google EC, cannot capture ec console output.')
699
700    def _record_uart_capture(self):
701        """Record the CPU/EC/PD UART output stream to files."""
702        if self.cpu_uart_file:
703            with open(self.cpu_uart_file, 'a') as f:
704                f.write(ast.literal_eval(self.servo.get('cpu_uart_stream')))
705        if self.cr50_console_file:
706            with open(self.cr50_console_file, 'a') as f:
707                f.write(ast.literal_eval(self.servo.get('cr50_console_stream')))
708        if self.ec_uart_file and self.faft_config.chrome_ec:
709            with open(self.ec_uart_file, 'a') as f:
710                f.write(ast.literal_eval(self.servo.get('ec_uart_stream')))
711        if (self.usbpd_uart_file and self.faft_config.chrome_ec and
712            self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
713            with open(self.usbpd_uart_file, 'a') as f:
714                f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream')))
715
716    def _cleanup_uart_capture(self):
717        """Cleanup the CPU/EC/PD UART capture."""
718        # Flush the remaining UART output.
719        self._record_uart_capture()
720        self.servo.set('cpu_uart_capture', 'off')
721        if self.cr50_console_file:
722            self.servo.set('cr50_console_capture', 'off')
723        if self.ec_uart_file and self.faft_config.chrome_ec:
724            self.servo.set('ec_uart_capture', 'off')
725        if (self.usbpd_uart_file and self.faft_config.chrome_ec and
726            self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
727            self.servo.set('usbpd_uart_capture', 'off')
728
729    def _get_power_state(self, power_state):
730        """
731        Return the current power state of the AP
732        """
733        return self.ec.send_command_get_output("powerinfo", [power_state])
734
735    def wait_power_state(self, power_state, retries):
736        """
737        Wait for certain power state.
738
739        @param power_state: power state you are expecting
740        @param retries: retries.  This is necessary if AP is powering down
741        and transitioning through different states.
742        """
743        logging.info('Checking power state "%s" maximum %d times.',
744                     power_state, retries)
745        while retries > 0:
746            logging.info("try count: %d", retries)
747            try:
748                retries = retries - 1
749                ret = self._get_power_state(power_state)
750                return True
751            except error.TestFail:
752                pass
753        return False
754
755    def suspend(self):
756        """Suspends the DUT."""
757        cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY
758        self.faft_client.system.run_shell_command(cmd)
759        time.sleep(self.EC_SUSPEND_DELAY)
760
761    def _fetch_servo_log(self):
762        """Fetch the servo log."""
763        cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2)
764        servo_log = self.servo.system_output(cmd)
765        return None if servo_log == 'NOTFOUND' else servo_log
766
767    def _setup_servo_log(self):
768        """Setup the servo log capturing."""
769        self.servo_log_original_len = -1
770        if self.servo.is_localhost():
771            # No servo log recorded when servod runs locally.
772            return
773
774        servo_log = self._fetch_servo_log()
775        if servo_log:
776            self.servo_log_original_len = len(servo_log)
777        else:
778            logging.warn('Servo log file not found.')
779
780    def _record_servo_log(self):
781        """Record the servo log to the results directory."""
782        if self.servo_log_original_len != -1:
783            servo_log = self._fetch_servo_log()
784            servo_log_file = os.path.join(self.resultsdir, 'servod.log')
785            with open(servo_log_file, 'a') as f:
786                f.write(servo_log[self.servo_log_original_len:])
787
788    def _record_faft_client_log(self):
789        """Record the faft client log to the results directory."""
790        client_log = self.faft_client.system.dump_log(True)
791        client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
792        with open(client_log_file, 'w') as f:
793            f.write(client_log)
794
795    def _setup_gbb_flags(self):
796        """Setup the GBB flags for FAFT test."""
797        if self.faft_config.gbb_version < 1.1:
798            logging.info('Skip modifying GBB on versions older than 1.1.')
799            return
800
801        if self.check_setup_done('gbb_flags'):
802            return
803
804        logging.info('Set proper GBB flags for test.')
805        self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY |
806                                 vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
807                                 vboot.GBB_FLAG_FORCE_DEV_BOOT_USB |
808                                 vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK |
809                                 vboot.GBB_FLAG_FORCE_DEV_BOOT_FASTBOOT_FULL_CAP,
810                                 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM |
811                                 vboot.GBB_FLAG_FAFT_KEY_OVERIDE)
812        self.mark_setup_done('gbb_flags')
813
814    def drop_backup_gbb_flags(self):
815        """Drops the backup GBB flags.
816
817        This can be used when a test intends to permanently change GBB flags.
818        """
819        self._backup_gbb_flags = None
820
821    def _restore_gbb_flags(self):
822        """Restore GBB flags to their original state."""
823        if self._backup_gbb_flags is None:
824            return
825        # Setting up and restoring the GBB flags take a lot of time. For
826        # speed-up purpose, don't restore it.
827        logging.info('***')
828        logging.info('*** Please manually restore the original GBB flags to: '
829                     '0x%x ***', self._backup_gbb_flags)
830        logging.info('***')
831        self.unmark_setup_done('gbb_flags')
832
833    def setup_tried_fwb(self, tried_fwb):
834        """Setup for fw B tried state.
835
836        It makes sure the system in the requested fw B tried state. If not, it
837        tries to do so.
838
839        @param tried_fwb: True if requested in tried_fwb=1;
840                          False if tried_fwb=0.
841        """
842        if tried_fwb:
843            if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
844                logging.info(
845                    'Firmware is not booted with tried_fwb. Reboot into it.')
846                self.faft_client.system.set_try_fw_b()
847        else:
848            if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
849                logging.info(
850                    'Firmware is booted with tried_fwb. Reboot to clear.')
851
852    def power_on(self):
853        """Switch DUT AC power on."""
854        self._client.power_on(self.power_control)
855
856    def power_off(self):
857        """Switch DUT AC power off."""
858        self._client.power_off(self.power_control)
859
860    def power_cycle(self):
861        """Power cycle DUT AC power."""
862        self._client.power_cycle(self.power_control)
863
864    def setup_rw_boot(self, section='a'):
865        """Make sure firmware is in RW-boot mode.
866
867        If the given firmware section is in RO-boot mode, turn off the RO-boot
868        flag and reboot DUT into RW-boot mode.
869
870        @param section: A firmware section, either 'a' or 'b'.
871        """
872        flags = self.faft_client.bios.get_preamble_flags(section)
873        if flags & vboot.PREAMBLE_USE_RO_NORMAL:
874            flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
875            self.faft_client.bios.set_preamble_flags(section, flags)
876            self.switcher.mode_aware_reboot()
877
878    def setup_kernel(self, part):
879        """Setup for kernel test.
880
881        It makes sure both kernel A and B bootable and the current boot is
882        the requested kernel part.
883
884        @param part: A string of kernel partition number or 'a'/'b'.
885        """
886        self.ensure_kernel_boot(part)
887        logging.info('Checking the integrity of kernel B and rootfs B...')
888        if (self.faft_client.kernel.diff_a_b() or
889                not self.faft_client.rootfs.verify_rootfs('B')):
890            logging.info('Copying kernel and rootfs from A to B...')
891            self.copy_kernel_and_rootfs(from_part=part,
892                                        to_part=self.OTHER_KERNEL_MAP[part])
893        self.reset_and_prioritize_kernel(part)
894
895    def reset_and_prioritize_kernel(self, part):
896        """Make the requested partition highest priority.
897
898        This function also reset kerenl A and B to bootable.
899
900        @param part: A string of partition number to be prioritized.
901        """
902        root_dev = self.faft_client.system.get_root_dev()
903        # Reset kernel A and B to bootable.
904        self.faft_client.system.run_shell_command(
905            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev))
906        self.faft_client.system.run_shell_command(
907            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev))
908        # Set kernel part highest priority.
909        self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
910                (self.KERNEL_MAP[part], root_dev))
911
912    def blocking_sync(self):
913        """Run a blocking sync command."""
914        # The double calls to sync fakes a blocking call
915        # since the first call returns before the flush
916        # is complete, but the second will wait for the
917        # first to finish.
918        self.faft_client.system.run_shell_command('sync')
919        self.faft_client.system.run_shell_command('sync')
920
921        # sync only sends SYNCHRONIZE_CACHE but doesn't
922        # check the status. For mmc devices, use `mmc
923        # status get` command to send an empty command to
924        # wait for the disk to be available again.  For
925        # other devices, hdparm sends TUR to check if
926        # a device is ready for transfer operation.
927        root_dev = self.faft_client.system.get_root_dev()
928        if 'mmcblk' in root_dev:
929            self.faft_client.system.run_shell_command('mmc status get %s' %
930                                                      root_dev)
931        else:
932            self.faft_client.system.run_shell_command('hdparm -f %s' % root_dev)
933
934    def sync_and_ec_reboot(self, flags=''):
935        """Request the client sync and do a EC triggered reboot.
936
937        @param flags: Optional, a space-separated string of flags passed to EC
938                      reboot command, including:
939                          default: EC soft reboot;
940                          'hard': EC cold/hard reboot.
941        """
942        self.blocking_sync()
943        self.ec.reboot(flags)
944        time.sleep(self.faft_config.ec_boot_to_console)
945        self.check_lid_and_power_on()
946
947    def reboot_and_reset_tpm(self):
948        """Reboot into recovery mode, reset TPM, then reboot back to disk."""
949        self.switcher.reboot_to_mode(to_mode='rec')
950        self.faft_client.system.run_shell_command('chromeos-tpm-recovery')
951        self.switcher.mode_aware_reboot()
952
953    def full_power_off_and_on(self):
954        """Shutdown the device by pressing power button and power on again."""
955        boot_id = self.get_bootid()
956        # Press power button to trigger Chrome OS normal shutdown process.
957        # We use a customized delay since the normal-press 1.2s is not enough.
958        self.servo.power_key(self.faft_config.hold_pwr_button_poweroff)
959        # device can take 44-51 seconds to restart,
960        # add buffer from the default timeout of 60 seconds.
961        self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id)
962        time.sleep(self.faft_config.shutdown)
963        # Short press power button to boot DUT again.
964        self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
965
966    def check_lid_and_power_on(self):
967        """
968        On devices with EC software sync, system powers on after EC reboots if
969        lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
970        This method checks lid switch state and presses power button if
971        necessary.
972        """
973        if self.servo.get("lid_open") == "no":
974            time.sleep(self.faft_config.software_sync)
975            self.servo.power_short_press()
976
977    def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
978        """Modify the kernel header magic in USB stick.
979
980        The kernel header magic is the first 8-byte of kernel partition.
981        We modify it to make it fail on kernel verification check.
982
983        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
984        @param from_magic: A string of magic which we change it from.
985        @param to_magic: A string of magic which we change it to.
986        @raise TestError: if failed to change magic.
987        """
988        assert len(from_magic) == 8
989        assert len(to_magic) == 8
990        # USB image only contains one kernel.
991        kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
992        read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
993        current_magic = self.servo.system_output(read_cmd)
994        if current_magic == to_magic:
995            logging.info("The kernel magic is already %s.", current_magic)
996            return
997        if current_magic != from_magic:
998            raise error.TestError("Invalid kernel image on USB: wrong magic.")
999
1000        logging.info('Modify the kernel magic in USB, from %s to %s.',
1001                     from_magic, to_magic)
1002        write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
1003                     " 2>/dev/null" % (to_magic, kernel_part))
1004        self.servo.system(write_cmd)
1005
1006        if self.servo.system_output(read_cmd) != to_magic:
1007            raise error.TestError("Failed to write new magic.")
1008
1009    def corrupt_usb_kernel(self, usb_dev):
1010        """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
1011
1012        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1013        """
1014        self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
1015                                self.CORRUPTED_MAGIC)
1016
1017    def restore_usb_kernel(self, usb_dev):
1018        """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
1019
1020        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1021        """
1022        self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
1023                                self.CHROMEOS_MAGIC)
1024
1025    def _call_action(self, action_tuple, check_status=False):
1026        """Call the action function with/without arguments.
1027
1028        @param action_tuple: A function, or a tuple (function, args, error_msg),
1029                             in which, args and error_msg are optional. args is
1030                             either a value or a tuple if multiple arguments.
1031                             This can also be a list containing multiple
1032                             function or tuple. In this case, these actions are
1033                             called in sequence.
1034        @param check_status: Check the return value of action function. If not
1035                             succeed, raises a TestFail exception.
1036        @return: The result value of the action function.
1037        @raise TestError: An error when the action function is not callable.
1038        @raise TestFail: When check_status=True, action function not succeed.
1039        """
1040        if isinstance(action_tuple, list):
1041            return all([self._call_action(action, check_status=check_status)
1042                        for action in action_tuple])
1043
1044        action = action_tuple
1045        args = ()
1046        error_msg = 'Not succeed'
1047        if isinstance(action_tuple, tuple):
1048            action = action_tuple[0]
1049            if len(action_tuple) >= 2:
1050                args = action_tuple[1]
1051                if not isinstance(args, tuple):
1052                    args = (args,)
1053            if len(action_tuple) >= 3:
1054                error_msg = action_tuple[2]
1055
1056        if action is None:
1057            return
1058
1059        if not callable(action):
1060            raise error.TestError('action is not callable!')
1061
1062        info_msg = 'calling %s' % action.__name__
1063        if args:
1064            info_msg += ' with args %s' % str(args)
1065        logging.info(info_msg)
1066        ret = action(*args)
1067
1068        if check_status and not ret:
1069            raise error.TestFail('%s: %s returning %s' %
1070                                 (error_msg, info_msg, str(ret)))
1071        return ret
1072
1073    def run_shutdown_process(self, shutdown_action, pre_power_action=None,
1074                             run_power_action=True, post_power_action=None,
1075                             shutdown_timeout=None):
1076        """Run shutdown_action(), which makes DUT shutdown, and power it on.
1077
1078        @param shutdown_action: function which makes DUT shutdown, like
1079                                pressing power key.
1080        @param pre_power_action: function which is called before next power on.
1081        @param run_power_action: power_key press by default, set to None to skip.
1082        @param post_power_action: function which is called after next power on.
1083        @param shutdown_timeout: a timeout to confirm DUT shutdown.
1084        @raise TestFail: if the shutdown_action() failed to turn DUT off.
1085        """
1086        self._call_action(shutdown_action)
1087        logging.info('Wait to ensure DUT shut down...')
1088        try:
1089            if shutdown_timeout is None:
1090                shutdown_timeout = self.faft_config.shutdown_timeout
1091            self.switcher.wait_for_client(timeout=shutdown_timeout)
1092            raise error.TestFail(
1093                    'Should shut the device down after calling %s.' %
1094                    shutdown_action.__name__)
1095        except ConnectionError:
1096            logging.info(
1097                'DUT is surely shutdown. We are going to power it on again...')
1098
1099        if pre_power_action:
1100            self._call_action(pre_power_action)
1101        if run_power_action:
1102            self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1103        if post_power_action:
1104            self._call_action(post_power_action)
1105
1106    def get_bootid(self, retry=3):
1107        """
1108        Return the bootid.
1109        """
1110        boot_id = None
1111        while retry:
1112            try:
1113                boot_id = self._client.get_boot_id()
1114                break
1115            except error.AutoservRunError:
1116                retry -= 1
1117                if retry:
1118                    logging.info('Retry to get boot_id...')
1119                else:
1120                    logging.warning('Failed to get boot_id.')
1121        logging.info('boot_id: %s', boot_id)
1122        return boot_id
1123
1124    def check_state(self, func):
1125        """
1126        Wrapper around _call_action with check_status set to True. This is a
1127        helper function to be used by tests and is currently implemented by
1128        calling _call_action with check_status=True.
1129
1130        TODO: This function's arguments need to be made more stringent. And
1131        its functionality should be moved over to check functions directly in
1132        the future.
1133
1134        @param func: A function, or a tuple (function, args, error_msg),
1135                             in which, args and error_msg are optional. args is
1136                             either a value or a tuple if multiple arguments.
1137                             This can also be a list containing multiple
1138                             function or tuple. In this case, these actions are
1139                             called in sequence.
1140        @return: The result value of the action function.
1141        @raise TestFail: If the function does notsucceed.
1142        """
1143        logging.info("-[FAFT]-[ start stepstate_checker ]----------")
1144        self._call_action(func, check_status=True)
1145        logging.info("-[FAFT]-[ end state_checker ]----------------")
1146
1147    def get_current_firmware_sha(self):
1148        """Get current firmware sha of body and vblock.
1149
1150        @return: Current firmware sha follows the order (
1151                 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha)
1152        """
1153        current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'),
1154                                self.faft_client.bios.get_body_sha('a'),
1155                                self.faft_client.bios.get_sig_sha('b'),
1156                                self.faft_client.bios.get_body_sha('b'))
1157        if not all(current_firmware_sha):
1158            raise error.TestError('Failed to get firmware sha.')
1159        return current_firmware_sha
1160
1161    def is_firmware_changed(self):
1162        """Check if the current firmware changed, by comparing its SHA.
1163
1164        @return: True if it is changed, otherwise Flase.
1165        """
1166        # Device may not be rebooted after test.
1167        self.faft_client.bios.reload()
1168
1169        current_sha = self.get_current_firmware_sha()
1170
1171        if current_sha == self._backup_firmware_sha:
1172            return False
1173        else:
1174            corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0])
1175            corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1])
1176            corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2])
1177            corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3])
1178            logging.info("Firmware changed:")
1179            logging.info('VBOOTA is changed: %s', corrupt_VBOOTA)
1180            logging.info('VBOOTB is changed: %s', corrupt_VBOOTB)
1181            logging.info('FVMAIN is changed: %s', corrupt_FVMAIN)
1182            logging.info('FVMAINB is changed: %s', corrupt_FVMAINB)
1183            return True
1184
1185    def backup_firmware(self, suffix='.original'):
1186        """Backup firmware to file, and then send it to host.
1187
1188        @param suffix: a string appended to backup file name
1189        """
1190        remote_temp_dir = self.faft_client.system.create_temp_dir()
1191        remote_bios_path = os.path.join(remote_temp_dir, 'bios')
1192        self.faft_client.bios.dump_whole(remote_bios_path)
1193        self._client.get_file(remote_bios_path,
1194                              os.path.join(self.resultsdir, 'bios' + suffix))
1195        self._client.run('rm -rf %s' % remote_temp_dir)
1196        logging.info('Backup firmware stored in %s with suffix %s',
1197            self.resultsdir, suffix)
1198
1199        self._backup_firmware_sha = self.get_current_firmware_sha()
1200
1201    def is_firmware_saved(self):
1202        """Check if a firmware saved (called backup_firmware before).
1203
1204        @return: True if the firmware is backuped; otherwise False.
1205        """
1206        return self._backup_firmware_sha != ()
1207
1208    def clear_saved_firmware(self):
1209        """Clear the firmware saved by the method backup_firmware."""
1210        self._backup_firmware_sha = ()
1211
1212    def restore_firmware(self, suffix='.original'):
1213        """Restore firmware from host in resultsdir.
1214
1215        @param suffix: a string appended to backup file name
1216        """
1217        if not self.is_firmware_changed():
1218            return
1219
1220        # Backup current corrupted firmware.
1221        self.backup_firmware(suffix='.corrupt')
1222
1223        # Restore firmware.
1224        remote_temp_dir = self.faft_client.system.create_temp_dir()
1225        self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix),
1226                               os.path.join(remote_temp_dir, 'bios'))
1227
1228        self.faft_client.bios.write_whole(
1229            os.path.join(remote_temp_dir, 'bios'))
1230        self.switcher.mode_aware_reboot()
1231        logging.info('Successfully restore firmware.')
1232
1233    def setup_firmwareupdate_shellball(self, shellball=None):
1234        """Setup a shellball to use in firmware update test.
1235
1236        Check if there is a given shellball, and it is a shell script. Then,
1237        send it to the remote host. Otherwise, use the
1238        /usr/sbin/chromeos-firmwareupdate in the image and replace its inside
1239        BIOS and EC images with the active firmware images.
1240
1241        @param shellball: path of a shellball or default to None.
1242        """
1243        if shellball:
1244            # Determine the firmware file is a shellball or a raw binary.
1245            is_shellball = (utils.system_output("file %s" % shellball).find(
1246                    "shell script") != -1)
1247            if is_shellball:
1248                logging.info('Device will update firmware with shellball %s',
1249                             shellball)
1250                temp_path = self.faft_client.updater.get_temp_path()
1251                working_shellball = os.path.join(temp_path,
1252                                                 'chromeos-firmwareupdate')
1253                self._client.send_file(shellball, working_shellball)
1254                self.faft_client.updater.extract_shellball()
1255            else:
1256                raise error.TestFail(
1257                    'The given shellball is not a shell script.')
1258        else:
1259            logging.info('No shellball given, use the original shellball and '
1260                         'replace its BIOS and EC images.')
1261            work_path = self.faft_client.updater.get_work_path()
1262            bios_in_work_path = os.path.join(work_path, 'bios.bin')
1263            ec_in_work_path = os.path.join(work_path, 'ec.bin')
1264            self.faft_client.bios.dump_whole(bios_in_work_path)
1265            if self.faft_config.chrome_ec:
1266                self.faft_client.ec.dump_firmware(ec_in_work_path)
1267            self.faft_client.updater.repack_shellball()
1268
1269    def is_kernel_changed(self):
1270        """Check if the current kernel is changed, by comparing its SHA1 hash.
1271
1272        @return: True if it is changed; otherwise, False.
1273        """
1274        changed = False
1275        for p in ('A', 'B'):
1276            backup_sha = self._backup_kernel_sha.get(p, None)
1277            current_sha = self.faft_client.kernel.get_sha(p)
1278            if backup_sha != current_sha:
1279                changed = True
1280                logging.info('Kernel %s is changed', p)
1281        return changed
1282
1283    def backup_kernel(self, suffix='.original'):
1284        """Backup kernel to files, and the send them to host.
1285
1286        @param suffix: a string appended to backup file name.
1287        """
1288        remote_temp_dir = self.faft_client.system.create_temp_dir()
1289        for p in ('A', 'B'):
1290            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1291            self.faft_client.kernel.dump(p, remote_path)
1292            self._client.get_file(
1293                    remote_path,
1294                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
1295            self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
1296        logging.info('Backup kernel stored in %s with suffix %s',
1297            self.resultsdir, suffix)
1298
1299    def is_kernel_saved(self):
1300        """Check if kernel images are saved (backup_kernel called before).
1301
1302        @return: True if the kernel is saved; otherwise, False.
1303        """
1304        return len(self._backup_kernel_sha) != 0
1305
1306    def clear_saved_kernel(self):
1307        """Clear the kernel saved by backup_kernel()."""
1308        self._backup_kernel_sha = dict()
1309
1310    def restore_kernel(self, suffix='.original'):
1311        """Restore kernel from host in resultsdir.
1312
1313        @param suffix: a string appended to backup file name.
1314        """
1315        if not self.is_kernel_changed():
1316            return
1317
1318        # Backup current corrupted kernel.
1319        self.backup_kernel(suffix='.corrupt')
1320
1321        # Restore kernel.
1322        remote_temp_dir = self.faft_client.system.create_temp_dir()
1323        for p in ('A', 'B'):
1324            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1325            self._client.send_file(
1326                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
1327                    remote_path)
1328            self.faft_client.kernel.write(p, remote_path)
1329
1330        self.switcher.mode_aware_reboot()
1331        logging.info('Successfully restored kernel.')
1332
1333    def backup_cgpt_attributes(self):
1334        """Backup CGPT partition table attributes."""
1335        self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
1336
1337    def restore_cgpt_attributes(self):
1338        """Restore CGPT partition table attributes."""
1339        current_table = self.faft_client.cgpt.get_attributes()
1340        if current_table == self._backup_cgpt_attr:
1341            return
1342        logging.info('CGPT table is changed. Original: %r. Current: %r.',
1343                     self._backup_cgpt_attr,
1344                     current_table)
1345        self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr)
1346
1347        self.switcher.mode_aware_reboot()
1348        logging.info('Successfully restored CGPT table.')
1349
1350    def try_fwb(self, count=0):
1351        """set to try booting FWB count # times
1352
1353        Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
1354        vboot2
1355
1356        @param count: an integer specifying value to program into
1357                      fwb_tries(vb1)/fw_try_next(vb2)
1358        """
1359        if self.fw_vboot2:
1360            self.faft_client.system.set_fw_try_next('B', count)
1361        else:
1362            # vboot1: we need to boot into fwb at least once
1363            if not count:
1364                count = count + 1
1365            self.faft_client.system.set_try_fw_b(count)
1366
1367