1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import ast 6import ctypes 7import logging 8import os 9import pprint 10import re 11import time 12import uuid 13 14from threading import Timer 15from autotest_lib.client.bin import utils 16from autotest_lib.client.common_lib import error 17from autotest_lib.server import test 18from autotest_lib.server.cros import vboot_constants as vboot 19from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig 20from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy 21from autotest_lib.server.cros.faft.utils import mode_switcher 22from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers 23from autotest_lib.server.cros.servo import chrome_ec 24from autotest_lib.server.cros.servo import chrome_usbpd 25 26ConnectionError = mode_switcher.ConnectionError 27 28 29class FAFTBase(test.test): 30 """The base class of FAFT classes. 31 32 It launches the FAFTClient on DUT, such that the test can access its 33 firmware functions and interfaces. It also provides some methods to 34 handle the reboot mechanism, in order to ensure FAFTClient is still 35 connected after reboot. 36 """ 37 def initialize(self, host): 38 """Create a FAFTClient object and install the dependency.""" 39 self.servo = host.servo 40 self.servo.initialize_dut() 41 self._client = host 42 self.faft_client = RPCProxy(host) 43 self.lockfile = '/var/tmp/faft/lock' 44 45 46class FirmwareTest(FAFTBase): 47 """ 48 Base class that sets up helper objects/functions for firmware tests. 49 50 TODO: add documentaion as the FAFT rework progresses. 51 """ 52 version = 1 53 54 # Mapping of partition number of kernel and rootfs. 55 KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'} 56 ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'} 57 OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'} 58 OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'} 59 60 CHROMEOS_MAGIC = "CHROMEOS" 61 CORRUPTED_MAGIC = "CORRUPTD" 62 63 # Delay for waiting client to return before EC suspend 64 EC_SUSPEND_DELAY = 5 65 66 # Delay between EC suspend and wake 67 WAKE_DELAY = 10 68 69 # Delay between closing and opening lid 70 LID_DELAY = 1 71 72 _SERVOD_LOG = '/var/log/servod.log' 73 74 _ROOTFS_PARTITION_NUMBER = 3 75 76 _backup_firmware_sha = () 77 _backup_kernel_sha = dict() 78 _backup_cgpt_attr = dict() 79 _backup_gbb_flags = None 80 _backup_dev_mode = None 81 82 # Class level variable, keep track the states of one time setup. 83 # This variable is preserved across tests which inherit this class. 84 _global_setup_done = { 85 'gbb_flags': False, 86 'reimage': False, 87 'usb_check': False, 88 } 89 90 @classmethod 91 def check_setup_done(cls, label): 92 """Check if the given setup is done. 93 94 @param label: The label of the setup. 95 """ 96 return cls._global_setup_done[label] 97 98 @classmethod 99 def mark_setup_done(cls, label): 100 """Mark the given setup done. 101 102 @param label: The label of the setup. 103 """ 104 cls._global_setup_done[label] = True 105 106 @classmethod 107 def unmark_setup_done(cls, label): 108 """Mark the given setup not done. 109 110 @param label: The label of the setup. 111 """ 112 cls._global_setup_done[label] = False 113 114 def initialize(self, host, cmdline_args, ec_wp=None): 115 super(FirmwareTest, self).initialize(host) 116 self.run_id = str(uuid.uuid4()) 117 logging.info('FirmwareTest initialize begin (id=%s)', self.run_id) 118 # Parse arguments from command line 119 args = {} 120 self.power_control = host.POWER_CONTROL_RPM 121 for arg in cmdline_args: 122 match = re.search("^(\w+)=(.+)", arg) 123 if match: 124 args[match.group(1)] = match.group(2) 125 if 'power_control' in args: 126 self.power_control = args['power_control'] 127 if self.power_control not in host.POWER_CONTROL_VALID_ARGS: 128 raise error.TestError('Valid values for --args=power_control ' 129 'are %s. But you entered wrong argument ' 130 'as "%s".' 131 % (host.POWER_CONTROL_VALID_ARGS, 132 self.power_control)) 133 134 self.faft_config = FAFTConfig( 135 self.faft_client.system.get_platform_name()) 136 self.checkers = FAFTCheckers(self) 137 self.switcher = mode_switcher.create_mode_switcher(self) 138 139 if self.faft_config.chrome_ec: 140 self.ec = chrome_ec.ChromeEC(self.servo) 141 # Check for presence of a USBPD console 142 if self.faft_config.chrome_usbpd: 143 self.usbpd = chrome_usbpd.ChromeUSBPD(self.servo) 144 elif self.faft_config.chrome_ec: 145 # If no separate USBPD console, then PD exists on EC console 146 self.usbpd = self.ec 147 # Get plankton console 148 self.plankton = host.plankton 149 self.plankton_host = host._plankton_host 150 151 self._setup_uart_capture() 152 self._setup_servo_log() 153 self._record_system_info() 154 self.fw_vboot2 = self.faft_client.system.get_fw_vboot2() 155 logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1) 156 if self.fw_vboot2: 157 self.faft_client.system.set_fw_try_next('A') 158 if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B': 159 logging.info('mainfw_act is B. rebooting to set it A') 160 self.switcher.mode_aware_reboot() 161 self._setup_gbb_flags() 162 self._stop_service('update-engine') 163 self._create_faft_lockfile() 164 self._setup_ec_write_protect(ec_wp) 165 # See chromium:239034 regarding needing this sync. 166 self.blocking_sync() 167 logging.info('FirmwareTest initialize done (id=%s)', self.run_id) 168 169 def cleanup(self): 170 """Autotest cleanup function.""" 171 # Unset state checker in case it's set by subclass 172 logging.info('FirmwareTest cleaning up (id=%s)', self.run_id) 173 try: 174 self.faft_client.system.is_available() 175 except: 176 # Remote is not responding. Revive DUT so that subsequent tests 177 # don't fail. 178 self._restore_routine_from_timeout() 179 self.switcher.restore_mode() 180 self._restore_ec_write_protect() 181 self._restore_gbb_flags() 182 self._start_service('update-engine') 183 self._remove_faft_lockfile() 184 self._record_servo_log() 185 self._record_faft_client_log() 186 self._cleanup_uart_capture() 187 super(FirmwareTest, self).cleanup() 188 logging.info('FirmwareTest cleanup done (id=%s)', self.run_id) 189 190 def _record_system_info(self): 191 """Record some critical system info to the attr keyval. 192 193 This info is used by generate_test_report later. 194 """ 195 system_info = { 196 'hwid': self.faft_client.system.get_crossystem_value('hwid'), 197 'ec_version': self.faft_client.ec.get_version(), 198 'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'), 199 'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'), 200 'servod_version': self._client._servo_host.run( 201 'servod --version').stdout.strip(), 202 } 203 logging.info('System info:\n' + pprint.pformat(system_info)) 204 self.write_attr_keyval(system_info) 205 206 def invalidate_firmware_setup(self): 207 """Invalidate all firmware related setup state. 208 209 This method is called when the firmware is re-flashed. It resets all 210 firmware related setup states so that the next test setup properly 211 again. 212 """ 213 self.unmark_setup_done('gbb_flags') 214 215 def _retrieve_recovery_reason_from_trap(self): 216 """Try to retrieve the recovery reason from a trapped recovery screen. 217 218 @return: The recovery_reason, 0 if any error. 219 """ 220 recovery_reason = 0 221 logging.info('Try to retrieve recovery reason...') 222 if self.servo.get_usbkey_direction() == 'dut': 223 self.switcher.bypass_rec_mode() 224 else: 225 self.servo.switch_usbkey('dut') 226 227 try: 228 self.switcher.wait_for_client() 229 lines = self.faft_client.system.run_shell_command_get_output( 230 'crossystem recovery_reason') 231 recovery_reason = int(lines[0]) 232 logging.info('Got the recovery reason %d.', recovery_reason) 233 except ConnectionError: 234 logging.error('Failed to get the recovery reason due to connection ' 235 'error.') 236 return recovery_reason 237 238 def _reset_client(self): 239 """Reset client to a workable state. 240 241 This method is called when the client is not responsive. It may be 242 caused by the following cases: 243 - halt on a firmware screen without timeout, e.g. REC_INSERT screen; 244 - corrupted firmware; 245 - corrutped OS image. 246 """ 247 # DUT may halt on a firmware screen. Try cold reboot. 248 logging.info('Try cold reboot...') 249 self.switcher.mode_aware_reboot(reboot_type='cold', 250 sync_before_boot=False, 251 wait_for_dut_up=False) 252 self.switcher.wait_for_client_offline() 253 self.switcher.bypass_dev_mode() 254 try: 255 self.switcher.wait_for_client() 256 return 257 except ConnectionError: 258 logging.warn('Cold reboot doesn\'t help, still connection error.') 259 260 # DUT may be broken by a corrupted firmware. Restore firmware. 261 # We assume the recovery boot still works fine. Since the recovery 262 # code is in RO region and all FAFT tests don't change the RO region 263 # except GBB. 264 if self.is_firmware_saved(): 265 self._ensure_client_in_recovery() 266 logging.info('Try restore the original firmware...') 267 if self.is_firmware_changed(): 268 try: 269 self.restore_firmware() 270 return 271 except ConnectionError: 272 logging.warn('Restoring firmware doesn\'t help, still ' 273 'connection error.') 274 275 # Perhaps it's kernel that's broken. Let's try restoring it. 276 if self.is_kernel_saved(): 277 self._ensure_client_in_recovery() 278 logging.info('Try restore the original kernel...') 279 if self.is_kernel_changed(): 280 try: 281 self.restore_kernel() 282 return 283 except ConnectionError: 284 logging.warn('Restoring kernel doesn\'t help, still ' 285 'connection error.') 286 287 # DUT may be broken by a corrupted OS image. Restore OS image. 288 self._ensure_client_in_recovery() 289 logging.info('Try restore the OS image...') 290 self.faft_client.system.run_shell_command('chromeos-install --yes') 291 self.switcher.mode_aware_reboot(wait_for_dut_up=False) 292 self.switcher.wait_for_client_offline() 293 self.switcher.bypass_dev_mode() 294 try: 295 self.switcher.wait_for_client() 296 logging.info('Successfully restore OS image.') 297 return 298 except ConnectionError: 299 logging.warn('Restoring OS image doesn\'t help, still connection ' 300 'error.') 301 302 def _ensure_client_in_recovery(self): 303 """Ensure client in recovery boot; reboot into it if necessary. 304 305 @raise TestError: if failed to boot the USB image. 306 """ 307 logging.info('Try boot into USB image...') 308 self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False, 309 wait_for_dut_up=False) 310 self.servo.switch_usbkey('host') 311 self.switcher.bypass_rec_mode() 312 try: 313 self.switcher.wait_for_client() 314 except ConnectionError: 315 raise error.TestError('Failed to boot the USB image.') 316 317 def _restore_routine_from_timeout(self): 318 """A routine to try to restore the system from a timeout error. 319 320 This method is called when FAFT failed to connect DUT after reboot. 321 322 @raise TestFail: This exception is already raised, with a decription 323 why it failed. 324 """ 325 # DUT is disconnected. Capture the UART output for debug. 326 self._record_uart_capture() 327 328 # TODO(waihong@chromium.org): Implement replugging the Ethernet to 329 # identify if it is a network flaky. 330 331 recovery_reason = self._retrieve_recovery_reason_from_trap() 332 333 # Reset client to a workable state. 334 self._reset_client() 335 336 # Raise the proper TestFail exception. 337 if recovery_reason: 338 raise error.TestFail('Trapped in the recovery screen (reason: %d) ' 339 'and timed out' % recovery_reason) 340 else: 341 raise error.TestFail('Timed out waiting for DUT reboot') 342 343 def assert_test_image_in_usb_disk(self, usb_dev=None): 344 """Assert an USB disk plugged-in on servo and a test image inside. 345 346 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 347 If None, it is detected automatically. 348 @raise TestError: if USB disk not detected or not a test image. 349 """ 350 if self.check_setup_done('usb_check'): 351 return 352 if usb_dev: 353 assert self.servo.get_usbkey_direction() == 'host' 354 else: 355 self.servo.switch_usbkey('host') 356 usb_dev = self.servo.probe_host_usb_dev() 357 if not usb_dev: 358 raise error.TestError( 359 'An USB disk should be plugged in the servo board.') 360 361 rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER) 362 logging.info('usb dev is %s', usb_dev) 363 tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX') 364 self.servo.system('mount -o ro %s %s' % (rootfs, tmpd)) 365 366 try: 367 usb_lsb = self.servo.system_output('cat %s' % 368 os.path.join(tmpd, 'etc/lsb-release')) 369 logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb) 370 dut_lsb = '\n'.join(self.faft_client.system. 371 run_shell_command_get_output('cat /etc/lsb-release')) 372 logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb) 373 if not re.search(r'RELEASE_TRACK=.*test', usb_lsb): 374 raise error.TestError('USB stick in servo is no test image') 375 usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1) 376 dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1) 377 if usb_board != dut_board: 378 raise error.TestError('USB stick in servo contains a %s ' 379 'image, but DUT is a %s' % (usb_board, dut_board)) 380 finally: 381 for cmd in ('umount %s' % rootfs, 'sync', 'rm -rf %s' % tmpd): 382 self.servo.system(cmd) 383 384 self.mark_setup_done('usb_check') 385 386 def setup_usbkey(self, usbkey, host=None): 387 """Setup the USB disk for the test. 388 389 It checks the setup of USB disk and a valid ChromeOS test image inside. 390 It also muxes the USB disk to either the host or DUT by request. 391 392 @param usbkey: True if the USB disk is required for the test, False if 393 not required. 394 @param host: Optional, True to mux the USB disk to host, False to mux it 395 to DUT, default to do nothing. 396 """ 397 if usbkey: 398 self.assert_test_image_in_usb_disk() 399 elif host is None: 400 # USB disk is not required for the test. Better to mux it to host. 401 host = True 402 403 if host is True: 404 self.servo.switch_usbkey('host') 405 elif host is False: 406 self.servo.switch_usbkey('dut') 407 408 def get_usbdisk_path_on_dut(self): 409 """Get the path of the USB disk device plugged-in the servo on DUT. 410 411 Returns: 412 A string representing USB disk path, like '/dev/sdb', or None if 413 no USB disk is found. 414 """ 415 cmd = 'ls -d /dev/s*[a-z]' 416 original_value = self.servo.get_usbkey_direction() 417 418 # Make the dut unable to see the USB disk. 419 self.servo.switch_usbkey('off') 420 no_usb_set = set( 421 self.faft_client.system.run_shell_command_get_output(cmd)) 422 423 # Make the dut able to see the USB disk. 424 self.servo.switch_usbkey('dut') 425 time.sleep(self.faft_config.usb_plug) 426 has_usb_set = set( 427 self.faft_client.system.run_shell_command_get_output(cmd)) 428 429 # Back to its original value. 430 if original_value != self.servo.get_usbkey_direction(): 431 self.servo.switch_usbkey(original_value) 432 433 diff_set = has_usb_set - no_usb_set 434 if len(diff_set) == 1: 435 return diff_set.pop() 436 else: 437 return None 438 439 def _create_faft_lockfile(self): 440 """Creates the FAFT lockfile.""" 441 logging.info('Creating FAFT lockfile...') 442 command = 'touch %s' % (self.lockfile) 443 self.faft_client.system.run_shell_command(command) 444 445 def _remove_faft_lockfile(self): 446 """Removes the FAFT lockfile.""" 447 logging.info('Removing FAFT lockfile...') 448 command = 'rm -f %s' % (self.lockfile) 449 self.faft_client.system.run_shell_command(command) 450 451 def _stop_service(self, service): 452 """Stops a upstart service on the client. 453 454 @param service: The name of the upstart service. 455 """ 456 logging.info('Stopping %s...', service) 457 command = 'status %s | grep stop || stop %s' % (service, service) 458 self.faft_client.system.run_shell_command(command) 459 460 def _start_service(self, service): 461 """Starts a upstart service on the client. 462 463 @param service: The name of the upstart service. 464 """ 465 logging.info('Starting %s...', service) 466 command = 'status %s | grep start || start %s' % (service, service) 467 self.faft_client.system.run_shell_command(command) 468 469 def clear_set_gbb_flags(self, clear_mask, set_mask): 470 """Clear and set the GBB flags in the current flashrom. 471 472 @param clear_mask: A mask of flags to be cleared. 473 @param set_mask: A mask of flags to be set. 474 """ 475 gbb_flags = self.faft_client.bios.get_gbb_flags() 476 new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask 477 if new_flags != gbb_flags: 478 self._backup_gbb_flags = gbb_flags 479 logging.info('Changing GBB flags from 0x%x to 0x%x.', 480 gbb_flags, new_flags) 481 self.faft_client.bios.set_gbb_flags(new_flags) 482 # If changing FORCE_DEV_SWITCH_ON flag, reboot to get a clear state 483 if ((gbb_flags ^ new_flags) & vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON): 484 self.switcher.mode_aware_reboot() 485 else: 486 logging.info('Current GBB flags look good for test: 0x%x.', 487 gbb_flags) 488 489 def check_ec_capability(self, required_cap=None, suppress_warning=False): 490 """Check if current platform has required EC capabilities. 491 492 @param required_cap: A list containing required EC capabilities. Pass in 493 None to only check for presence of Chrome EC. 494 @param suppress_warning: True to suppress any warning messages. 495 @return: True if requirements are met. Otherwise, False. 496 """ 497 if not self.faft_config.chrome_ec: 498 if not suppress_warning: 499 logging.warn('Requires Chrome EC to run this test.') 500 return False 501 502 if not required_cap: 503 return True 504 505 for cap in required_cap: 506 if cap not in self.faft_config.ec_capability: 507 if not suppress_warning: 508 logging.warn('Requires EC capability "%s" to run this ' 509 'test.', cap) 510 return False 511 512 return True 513 514 def check_root_part_on_non_recovery(self, part): 515 """Check the partition number of root device and on normal/dev boot. 516 517 @param part: A string of partition number, e.g.'3'. 518 @return: True if the root device matched and on normal/dev boot; 519 otherwise, False. 520 """ 521 return self.checkers.root_part_checker(part) and \ 522 self.checkers.crossystem_checker({ 523 'mainfw_type': ('normal', 'developer'), 524 }) 525 526 def _join_part(self, dev, part): 527 """Return a concatenated string of device and partition number. 528 529 @param dev: A string of device, e.g.'/dev/sda'. 530 @param part: A string of partition number, e.g.'3'. 531 @return: A concatenated string of device and partition number, 532 e.g.'/dev/sda3'. 533 534 >>> seq = FirmwareTest() 535 >>> seq._join_part('/dev/sda', '3') 536 '/dev/sda3' 537 >>> seq._join_part('/dev/mmcblk0', '2') 538 '/dev/mmcblk0p2' 539 """ 540 if 'mmcblk' in dev: 541 return dev + 'p' + part 542 else: 543 return dev + part 544 545 def copy_kernel_and_rootfs(self, from_part, to_part): 546 """Copy kernel and rootfs from from_part to to_part. 547 548 @param from_part: A string of partition number to be copied from. 549 @param to_part: A string of partition number to be copied to. 550 """ 551 root_dev = self.faft_client.system.get_root_dev() 552 logging.info('Copying kernel from %s to %s. Please wait...', 553 from_part, to_part) 554 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 555 (self._join_part(root_dev, self.KERNEL_MAP[from_part]), 556 self._join_part(root_dev, self.KERNEL_MAP[to_part]))) 557 logging.info('Copying rootfs from %s to %s. Please wait...', 558 from_part, to_part) 559 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 560 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]), 561 self._join_part(root_dev, self.ROOTFS_MAP[to_part]))) 562 563 def ensure_kernel_boot(self, part): 564 """Ensure the request kernel boot. 565 566 If not, it duplicates the current kernel to the requested kernel 567 and sets the requested higher priority to ensure it boot. 568 569 @param part: A string of kernel partition number or 'a'/'b'. 570 """ 571 if not self.checkers.root_part_checker(part): 572 if self.faft_client.kernel.diff_a_b(): 573 self.copy_kernel_and_rootfs( 574 from_part=self.OTHER_KERNEL_MAP[part], 575 to_part=part) 576 self.reset_and_prioritize_kernel(part) 577 self.switcher.mode_aware_reboot() 578 579 def set_hardware_write_protect(self, enable): 580 """Set hardware write protect pin. 581 582 @param enable: True if asserting write protect pin. Otherwise, False. 583 """ 584 self.servo.set('fw_wp_vref', self.faft_config.wp_voltage) 585 self.servo.set('fw_wp_en', 'on') 586 self.servo.set('fw_wp', 'on' if enable else 'off') 587 588 def set_ec_write_protect_and_reboot(self, enable): 589 """Set EC write protect status and reboot to take effect. 590 591 The write protect state is only activated if both hardware write 592 protect pin is asserted and software write protect flag is set. 593 This method asserts/deasserts hardware write protect pin first, and 594 set corresponding EC software write protect flag. 595 596 If the device uses non-Chrome EC, set the software write protect via 597 flashrom. 598 599 If the device uses Chrome EC, a reboot is required for write protect 600 to take effect. Since the software write protect flag cannot be unset 601 if hardware write protect pin is asserted, we need to deasserted the 602 pin first if we are deactivating write protect. Similarly, a reboot 603 is required before we can modify the software flag. 604 605 @param enable: True if activating EC write protect. Otherwise, False. 606 """ 607 self.set_hardware_write_protect(enable) 608 if self.faft_config.chrome_ec: 609 self.set_chrome_ec_write_protect_and_reboot(enable) 610 else: 611 self.faft_client.ec.set_write_protect(enable) 612 self.switcher.mode_aware_reboot() 613 614 def set_chrome_ec_write_protect_and_reboot(self, enable): 615 """Set Chrome EC write protect status and reboot to take effect. 616 617 @param enable: True if activating EC write protect. Otherwise, False. 618 """ 619 if enable: 620 # Set write protect flag and reboot to take effect. 621 self.ec.set_flash_write_protect(enable) 622 self.sync_and_ec_reboot() 623 else: 624 # Reboot after deasserting hardware write protect pin to deactivate 625 # write protect. And then remove software write protect flag. 626 self.sync_and_ec_reboot() 627 self.ec.set_flash_write_protect(enable) 628 629 def _setup_ec_write_protect(self, ec_wp): 630 """Setup for EC write-protection. 631 632 It makes sure the EC in the requested write-protection state. If not, it 633 flips the state. Flipping the write-protection requires DUT reboot. 634 635 @param ec_wp: True to request EC write-protected; False to request EC 636 not write-protected; None to do nothing. 637 """ 638 if ec_wp is None: 639 self._old_ec_wp = None 640 return 641 self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'}) 642 if ec_wp != self._old_ec_wp: 643 logging.info('The test required EC is %swrite-protected. Reboot ' 644 'and flip the state.', '' if ec_wp else 'not ') 645 self.switcher.mode_aware_reboot( 646 'custom', 647 lambda:self.set_ec_write_protect_and_reboot(ec_wp)) 648 649 def _restore_ec_write_protect(self): 650 """Restore the original EC write-protection.""" 651 if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None): 652 return 653 if not self.checkers.crossystem_checker( 654 {'wpsw_boot': '1' if self._old_ec_wp else '0'}): 655 logging.info('Restore original EC write protection and reboot.') 656 self.switcher.mode_aware_reboot( 657 'custom', 658 lambda:self.set_ec_write_protect_and_reboot( 659 self._old_ec_wp)) 660 661 def _setup_uart_capture(self): 662 """Setup the CPU/EC/PD UART capture.""" 663 self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt') 664 self.servo.set('cpu_uart_capture', 'on') 665 self.ec_uart_file = None 666 self.usbpd_uart_file = None 667 if self.faft_config.chrome_ec: 668 try: 669 self.servo.set('ec_uart_capture', 'on') 670 self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt') 671 except error.TestFail as e: 672 if 'No control named' in str(e): 673 logging.warn('The servod is too old that ec_uart_capture ' 674 'not supported.') 675 # Log separate PD console if supported 676 if self.check_ec_capability(['usbpd_uart'], suppress_warning=True): 677 try: 678 self.servo.set('usbpd_uart_capture', 'on') 679 self.usbpd_uart_file = os.path.join(self.resultsdir, 680 'usbpd_uart.txt') 681 except error.TestFail as e: 682 if 'No control named' in str(e): 683 logging.warn('The servod is too old that ' 684 'usbpd_uart_capture is not supported.') 685 else: 686 logging.info('Not a Google EC, cannot capture ec console output.') 687 688 def _record_uart_capture(self): 689 """Record the CPU/EC/PD UART output stream to files.""" 690 if self.cpu_uart_file: 691 with open(self.cpu_uart_file, 'a') as f: 692 f.write(ast.literal_eval(self.servo.get('cpu_uart_stream'))) 693 if self.ec_uart_file and self.faft_config.chrome_ec: 694 with open(self.ec_uart_file, 'a') as f: 695 f.write(ast.literal_eval(self.servo.get('ec_uart_stream'))) 696 if (self.usbpd_uart_file and self.faft_config.chrome_ec and 697 self.check_ec_capability(['usbpd_uart'], suppress_warning=True)): 698 with open(self.usbpd_uart_file, 'a') as f: 699 f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream'))) 700 701 def _cleanup_uart_capture(self): 702 """Cleanup the CPU/EC/PD UART capture.""" 703 # Flush the remaining UART output. 704 self._record_uart_capture() 705 self.servo.set('cpu_uart_capture', 'off') 706 if self.ec_uart_file and self.faft_config.chrome_ec: 707 self.servo.set('ec_uart_capture', 'off') 708 if (self.usbpd_uart_file and self.faft_config.chrome_ec and 709 self.check_ec_capability(['usbpd_uart'], suppress_warning=True)): 710 self.servo.set('usbpd_uart_capture', 'off') 711 712 def _get_power_state(self, power_state): 713 """ 714 Return the current power state of the AP 715 """ 716 return self.ec.send_command_get_output("powerinfo", [power_state]) 717 718 def wait_power_state(self, power_state, retries): 719 """ 720 Wait for certain power state. 721 722 @param power_state: power state you are expecting 723 @param retries: retries. This is necessary if AP is powering down 724 and transitioning through different states. 725 """ 726 logging.info('Checking power state "%s" maximum %d times.', 727 power_state, retries) 728 while retries > 0: 729 logging.info("try count: %d" % retries) 730 try: 731 retries = retries - 1 732 ret = self._get_power_state(power_state) 733 return True 734 except error.TestFail: 735 pass 736 return False 737 738 def delayed(seconds): 739 def decorator(f): 740 def wrapper(*args, **kargs): 741 t = Timer(seconds, f, args, kargs) 742 t.start() 743 return wrapper 744 return decorator 745 746 @delayed(WAKE_DELAY) 747 def wake_by_power_button(self): 748 """Delay by WAKE_DELAY seconds and then wake DUT with power button.""" 749 self.servo.power_normal_press() 750 751 @delayed(WAKE_DELAY) 752 def wake_by_lid_switch(self): 753 """Delay by WAKE_DELAY seconds and then wake DUT with lid switch.""" 754 self.servo.set('lid_open', 'no') 755 time.sleep(self.LID_DELAY) 756 self.servo.set('lid_open', 'yes') 757 758 def suspend_as_reboot(self, wake_func): 759 """ 760 Suspend DUT and also kill FAFT client so that this acts like a reboot. 761 762 Args: 763 wake_func: A function that is called to wake DUT. Note that this 764 function must delay itself so that we don't wake DUT before 765 suspend_as_reboot returns. 766 """ 767 cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY 768 self.faft_client.system.run_shell_command(cmd) 769 self.faft_client.disconnect() 770 time.sleep(self.EC_SUSPEND_DELAY) 771 logging.info("wake function disabled") 772 wake_func() 773 774 def _fetch_servo_log(self): 775 """Fetch the servo log.""" 776 cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2) 777 servo_log = self.servo.system_output(cmd) 778 return None if servo_log == 'NOTFOUND' else servo_log 779 780 def _setup_servo_log(self): 781 """Setup the servo log capturing.""" 782 self.servo_log_original_len = -1 783 if self.servo.is_localhost(): 784 # No servo log recorded when servod runs locally. 785 return 786 787 servo_log = self._fetch_servo_log() 788 if servo_log: 789 self.servo_log_original_len = len(servo_log) 790 else: 791 logging.warn('Servo log file not found.') 792 793 def _record_servo_log(self): 794 """Record the servo log to the results directory.""" 795 if self.servo_log_original_len != -1: 796 servo_log = self._fetch_servo_log() 797 servo_log_file = os.path.join(self.resultsdir, 'servod.log') 798 with open(servo_log_file, 'a') as f: 799 f.write(servo_log[self.servo_log_original_len:]) 800 801 def _record_faft_client_log(self): 802 """Record the faft client log to the results directory.""" 803 client_log = self.faft_client.system.dump_log(True) 804 client_log_file = os.path.join(self.resultsdir, 'faft_client.log') 805 with open(client_log_file, 'w') as f: 806 f.write(client_log) 807 808 def _setup_gbb_flags(self): 809 """Setup the GBB flags for FAFT test.""" 810 if self.faft_config.gbb_version < 1.1: 811 logging.info('Skip modifying GBB on versions older than 1.1.') 812 return 813 814 if self.check_setup_done('gbb_flags'): 815 return 816 817 logging.info('Set proper GBB flags for test.') 818 self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY | 819 vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON | 820 vboot.GBB_FLAG_FORCE_DEV_BOOT_USB | 821 vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK | 822 vboot.GBB_FLAG_FORCE_DEV_BOOT_FASTBOOT_FULL_CAP, 823 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM | 824 vboot.GBB_FLAG_FAFT_KEY_OVERIDE) 825 self.mark_setup_done('gbb_flags') 826 827 def drop_backup_gbb_flags(self): 828 """Drops the backup GBB flags. 829 830 This can be used when a test intends to permanently change GBB flags. 831 """ 832 self._backup_gbb_flags = None 833 834 def _restore_gbb_flags(self): 835 """Restore GBB flags to their original state.""" 836 if self._backup_gbb_flags is None: 837 return 838 # Setting up and restoring the GBB flags take a lot of time. For 839 # speed-up purpose, don't restore it. 840 logging.info('***') 841 logging.info('*** Please manually restore the original GBB flags to: ' 842 '0x%x ***', self._backup_gbb_flags) 843 logging.info('***') 844 self.unmark_setup_done('gbb_flags') 845 846 def setup_tried_fwb(self, tried_fwb): 847 """Setup for fw B tried state. 848 849 It makes sure the system in the requested fw B tried state. If not, it 850 tries to do so. 851 852 @param tried_fwb: True if requested in tried_fwb=1; 853 False if tried_fwb=0. 854 """ 855 if tried_fwb: 856 if not self.checkers.crossystem_checker({'tried_fwb': '1'}): 857 logging.info( 858 'Firmware is not booted with tried_fwb. Reboot into it.') 859 self.faft_client.system.set_try_fw_b() 860 else: 861 if not self.checkers.crossystem_checker({'tried_fwb': '0'}): 862 logging.info( 863 'Firmware is booted with tried_fwb. Reboot to clear.') 864 865 def power_on(self): 866 """Switch DUT AC power on.""" 867 self._client.power_on(self.power_control) 868 869 def power_off(self): 870 """Switch DUT AC power off.""" 871 self._client.power_off(self.power_control) 872 873 def power_cycle(self): 874 """Power cycle DUT AC power.""" 875 self._client.power_cycle(self.power_control) 876 877 def setup_rw_boot(self, section='a'): 878 """Make sure firmware is in RW-boot mode. 879 880 If the given firmware section is in RO-boot mode, turn off the RO-boot 881 flag and reboot DUT into RW-boot mode. 882 883 @param section: A firmware section, either 'a' or 'b'. 884 """ 885 flags = self.faft_client.bios.get_preamble_flags(section) 886 if flags & vboot.PREAMBLE_USE_RO_NORMAL: 887 flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL 888 self.faft_client.bios.set_preamble_flags(section, flags) 889 self.switcher.mode_aware_reboot() 890 891 def setup_kernel(self, part): 892 """Setup for kernel test. 893 894 It makes sure both kernel A and B bootable and the current boot is 895 the requested kernel part. 896 897 @param part: A string of kernel partition number or 'a'/'b'. 898 """ 899 self.ensure_kernel_boot(part) 900 logging.info('Checking the integrity of kernel B and rootfs B...') 901 if (self.faft_client.kernel.diff_a_b() or 902 not self.faft_client.rootfs.verify_rootfs('B')): 903 logging.info('Copying kernel and rootfs from A to B...') 904 self.copy_kernel_and_rootfs(from_part=part, 905 to_part=self.OTHER_KERNEL_MAP[part]) 906 self.reset_and_prioritize_kernel(part) 907 908 def reset_and_prioritize_kernel(self, part): 909 """Make the requested partition highest priority. 910 911 This function also reset kerenl A and B to bootable. 912 913 @param part: A string of partition number to be prioritized. 914 """ 915 root_dev = self.faft_client.system.get_root_dev() 916 # Reset kernel A and B to bootable. 917 self.faft_client.system.run_shell_command( 918 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev)) 919 self.faft_client.system.run_shell_command( 920 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev)) 921 # Set kernel part highest priority. 922 self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' % 923 (self.KERNEL_MAP[part], root_dev)) 924 925 def blocking_sync(self): 926 """Run a blocking sync command.""" 927 # The double calls to sync fakes a blocking call 928 # since the first call returns before the flush 929 # is complete, but the second will wait for the 930 # first to finish. 931 self.faft_client.system.run_shell_command('sync') 932 self.faft_client.system.run_shell_command('sync') 933 934 # sync only sends SYNCHRONIZE_CACHE but doesn't 935 # check the status. For mmc devices, use `mmc 936 # status get` command to send an empty command to 937 # wait for the disk to be available again. For 938 # other devices, hdparm sends TUR to check if 939 # a device is ready for transfer operation. 940 root_dev = self.faft_client.system.get_root_dev() 941 if 'mmcblk' in root_dev: 942 self.faft_client.system.run_shell_command('mmc status get %s' % 943 root_dev) 944 else: 945 self.faft_client.system.run_shell_command('hdparm -f %s' % root_dev) 946 947 def sync_and_ec_reboot(self, flags=''): 948 """Request the client sync and do a EC triggered reboot. 949 950 @param flags: Optional, a space-separated string of flags passed to EC 951 reboot command, including: 952 default: EC soft reboot; 953 'hard': EC cold/hard reboot. 954 """ 955 self.blocking_sync() 956 self.ec.reboot(flags) 957 time.sleep(self.faft_config.ec_boot_to_console) 958 self.check_lid_and_power_on() 959 960 def reboot_and_reset_tpm(self): 961 """Reboot into recovery mode, reset TPM, then reboot back to disk.""" 962 self.switcher.reboot_to_mode(to_mode='rec') 963 self.faft_client.system.run_shell_command('chromeos-tpm-recovery') 964 self.switcher.mode_aware_reboot() 965 966 def full_power_off_and_on(self): 967 """Shutdown the device by pressing power button and power on again.""" 968 boot_id = self.get_bootid() 969 # Press power button to trigger Chrome OS normal shutdown process. 970 # We use a customized delay since the normal-press 1.2s is not enough. 971 self.servo.power_key(self.faft_config.hold_pwr_button_poweroff) 972 # device can take 44-51 seconds to restart, 973 # add buffer from the default timeout of 60 seconds. 974 self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id) 975 time.sleep(self.faft_config.shutdown) 976 # Short press power button to boot DUT again. 977 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 978 979 def check_lid_and_power_on(self): 980 """ 981 On devices with EC software sync, system powers on after EC reboots if 982 lid is open. Otherwise, the EC shuts down CPU after about 3 seconds. 983 This method checks lid switch state and presses power button if 984 necessary. 985 """ 986 if self.servo.get("lid_open") == "no": 987 time.sleep(self.faft_config.software_sync) 988 self.servo.power_short_press() 989 990 def _modify_usb_kernel(self, usb_dev, from_magic, to_magic): 991 """Modify the kernel header magic in USB stick. 992 993 The kernel header magic is the first 8-byte of kernel partition. 994 We modify it to make it fail on kernel verification check. 995 996 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 997 @param from_magic: A string of magic which we change it from. 998 @param to_magic: A string of magic which we change it to. 999 @raise TestError: if failed to change magic. 1000 """ 1001 assert len(from_magic) == 8 1002 assert len(to_magic) == 8 1003 # USB image only contains one kernel. 1004 kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a']) 1005 read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part 1006 current_magic = self.servo.system_output(read_cmd) 1007 if current_magic == to_magic: 1008 logging.info("The kernel magic is already %s.", current_magic) 1009 return 1010 if current_magic != from_magic: 1011 raise error.TestError("Invalid kernel image on USB: wrong magic.") 1012 1013 logging.info('Modify the kernel magic in USB, from %s to %s.', 1014 from_magic, to_magic) 1015 write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc " 1016 " 2>/dev/null" % (to_magic, kernel_part)) 1017 self.servo.system(write_cmd) 1018 1019 if self.servo.system_output(read_cmd) != to_magic: 1020 raise error.TestError("Failed to write new magic.") 1021 1022 def corrupt_usb_kernel(self, usb_dev): 1023 """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD. 1024 1025 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1026 """ 1027 self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC, 1028 self.CORRUPTED_MAGIC) 1029 1030 def restore_usb_kernel(self, usb_dev): 1031 """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS. 1032 1033 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1034 """ 1035 self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC, 1036 self.CHROMEOS_MAGIC) 1037 1038 def _call_action(self, action_tuple, check_status=False): 1039 """Call the action function with/without arguments. 1040 1041 @param action_tuple: A function, or a tuple (function, args, error_msg), 1042 in which, args and error_msg are optional. args is 1043 either a value or a tuple if multiple arguments. 1044 This can also be a list containing multiple 1045 function or tuple. In this case, these actions are 1046 called in sequence. 1047 @param check_status: Check the return value of action function. If not 1048 succeed, raises a TestFail exception. 1049 @return: The result value of the action function. 1050 @raise TestError: An error when the action function is not callable. 1051 @raise TestFail: When check_status=True, action function not succeed. 1052 """ 1053 if isinstance(action_tuple, list): 1054 return all([self._call_action(action, check_status=check_status) 1055 for action in action_tuple]) 1056 1057 action = action_tuple 1058 args = () 1059 error_msg = 'Not succeed' 1060 if isinstance(action_tuple, tuple): 1061 action = action_tuple[0] 1062 if len(action_tuple) >= 2: 1063 args = action_tuple[1] 1064 if not isinstance(args, tuple): 1065 args = (args,) 1066 if len(action_tuple) >= 3: 1067 error_msg = action_tuple[2] 1068 1069 if action is None: 1070 return 1071 1072 if not callable(action): 1073 raise error.TestError('action is not callable!') 1074 1075 info_msg = 'calling %s' % str(action) 1076 if args: 1077 info_msg += ' with args %s' % str(args) 1078 logging.info(info_msg) 1079 ret = action(*args) 1080 1081 if check_status and not ret: 1082 raise error.TestFail('%s: %s returning %s' % 1083 (error_msg, info_msg, str(ret))) 1084 return ret 1085 1086 def run_shutdown_process(self, shutdown_action, pre_power_action=None, 1087 run_power_action=True, post_power_action=None, shutdown_timeout=None): 1088 """Run shutdown_action(), which makes DUT shutdown, and power it on. 1089 1090 @param shutdown_action: function which makes DUT shutdown, like 1091 pressing power key. 1092 @param pre_power_action: function which is called before next power on. 1093 @param power_action: power_key press by default, set to None to skip. 1094 @param post_power_action: function which is called after next power on. 1095 @param shutdown_timeout: a timeout to confirm DUT shutdown. 1096 @raise TestFail: if the shutdown_action() failed to turn DUT off. 1097 """ 1098 self._call_action(shutdown_action) 1099 logging.info('Wait to ensure DUT shut down...') 1100 try: 1101 if shutdown_timeout is None: 1102 shutdown_timeout = self.faft_config.shutdown_timeout 1103 self.switcher.wait_for_client(timeout=shutdown_timeout) 1104 raise error.TestFail( 1105 'Should shut the device down after calling %s.' % 1106 str(shutdown_action)) 1107 except ConnectionError: 1108 logging.info( 1109 'DUT is surely shutdown. We are going to power it on again...') 1110 1111 if pre_power_action: 1112 self._call_action(pre_power_action) 1113 if run_power_action: 1114 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1115 if post_power_action: 1116 self._call_action(post_power_action) 1117 1118 def get_bootid(self, retry=3): 1119 """ 1120 Return the bootid. 1121 """ 1122 boot_id = None 1123 while retry: 1124 try: 1125 boot_id = self._client.get_boot_id() 1126 break 1127 except error.AutoservRunError: 1128 retry -= 1 1129 if retry: 1130 logging.info('Retry to get boot_id...') 1131 else: 1132 logging.warning('Failed to get boot_id.') 1133 logging.info('boot_id: %s', boot_id) 1134 return boot_id 1135 1136 def check_state(self, func): 1137 """ 1138 Wrapper around _call_action with check_status set to True. This is a 1139 helper function to be used by tests and is currently implemented by 1140 calling _call_action with check_status=True. 1141 1142 TODO: This function's arguments need to be made more stringent. And 1143 its functionality should be moved over to check functions directly in 1144 the future. 1145 1146 @param func: A function, or a tuple (function, args, error_msg), 1147 in which, args and error_msg are optional. args is 1148 either a value or a tuple if multiple arguments. 1149 This can also be a list containing multiple 1150 function or tuple. In this case, these actions are 1151 called in sequence. 1152 @return: The result value of the action function. 1153 @raise TestFail: If the function does notsucceed. 1154 """ 1155 logging.info("-[FAFT]-[ start stepstate_checker ]----------") 1156 self._call_action(func, check_status=True) 1157 logging.info("-[FAFT]-[ end state_checker ]----------------") 1158 1159 def get_current_firmware_sha(self): 1160 """Get current firmware sha of body and vblock. 1161 1162 @return: Current firmware sha follows the order ( 1163 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha) 1164 """ 1165 current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'), 1166 self.faft_client.bios.get_body_sha('a'), 1167 self.faft_client.bios.get_sig_sha('b'), 1168 self.faft_client.bios.get_body_sha('b')) 1169 if not all(current_firmware_sha): 1170 raise error.TestError('Failed to get firmware sha.') 1171 return current_firmware_sha 1172 1173 def is_firmware_changed(self): 1174 """Check if the current firmware changed, by comparing its SHA. 1175 1176 @return: True if it is changed, otherwise Flase. 1177 """ 1178 # Device may not be rebooted after test. 1179 self.faft_client.bios.reload() 1180 1181 current_sha = self.get_current_firmware_sha() 1182 1183 if current_sha == self._backup_firmware_sha: 1184 return False 1185 else: 1186 corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0]) 1187 corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1]) 1188 corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2]) 1189 corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3]) 1190 logging.info("Firmware changed:") 1191 logging.info('VBOOTA is changed: %s', corrupt_VBOOTA) 1192 logging.info('VBOOTB is changed: %s', corrupt_VBOOTB) 1193 logging.info('FVMAIN is changed: %s', corrupt_FVMAIN) 1194 logging.info('FVMAINB is changed: %s', corrupt_FVMAINB) 1195 return True 1196 1197 def backup_firmware(self, suffix='.original'): 1198 """Backup firmware to file, and then send it to host. 1199 1200 @param suffix: a string appended to backup file name 1201 """ 1202 remote_temp_dir = self.faft_client.system.create_temp_dir() 1203 remote_bios_path = os.path.join(remote_temp_dir, 'bios') 1204 self.faft_client.bios.dump_whole(remote_bios_path) 1205 self._client.get_file(remote_bios_path, 1206 os.path.join(self.resultsdir, 'bios' + suffix)) 1207 self._client.run('rm -rf %s' % remote_temp_dir) 1208 logging.info('Backup firmware stored in %s with suffix %s', 1209 self.resultsdir, suffix) 1210 1211 self._backup_firmware_sha = self.get_current_firmware_sha() 1212 1213 def is_firmware_saved(self): 1214 """Check if a firmware saved (called backup_firmware before). 1215 1216 @return: True if the firmware is backuped; otherwise False. 1217 """ 1218 return self._backup_firmware_sha != () 1219 1220 def clear_saved_firmware(self): 1221 """Clear the firmware saved by the method backup_firmware.""" 1222 self._backup_firmware_sha = () 1223 1224 def restore_firmware(self, suffix='.original'): 1225 """Restore firmware from host in resultsdir. 1226 1227 @param suffix: a string appended to backup file name 1228 """ 1229 if not self.is_firmware_changed(): 1230 return 1231 1232 # Backup current corrupted firmware. 1233 self.backup_firmware(suffix='.corrupt') 1234 1235 # Restore firmware. 1236 remote_temp_dir = self.faft_client.system.create_temp_dir() 1237 self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix), 1238 os.path.join(remote_temp_dir, 'bios')) 1239 1240 self.faft_client.bios.write_whole( 1241 os.path.join(remote_temp_dir, 'bios')) 1242 self.switcher.mode_aware_reboot() 1243 logging.info('Successfully restore firmware.') 1244 1245 def setup_firmwareupdate_shellball(self, shellball=None): 1246 """Deside a shellball to use in firmware update test. 1247 1248 Check if there is a given shellball, and it is a shell script. Then, 1249 send it to the remote host. Otherwise, use 1250 /usr/sbin/chromeos-firmwareupdate. 1251 1252 @param shellball: path of a shellball or default to None. 1253 1254 @return: Path of shellball in remote host. If use default shellball, 1255 reutrn None. 1256 """ 1257 updater_path = None 1258 if shellball: 1259 # Determine the firmware file is a shellball or a raw binary. 1260 is_shellball = (utils.system_output("file %s" % shellball).find( 1261 "shell script") != -1) 1262 if is_shellball: 1263 logging.info('Device will update firmware with shellball %s', 1264 shellball) 1265 temp_dir = self.faft_client.system.create_temp_dir( 1266 'shellball_') 1267 temp_shellball = os.path.join(temp_dir, 'updater.sh') 1268 self._client.send_file(shellball, temp_shellball) 1269 updater_path = temp_shellball 1270 else: 1271 raise error.TestFail( 1272 'The given shellball is not a shell script.') 1273 return updater_path 1274 1275 def is_kernel_changed(self): 1276 """Check if the current kernel is changed, by comparing its SHA1 hash. 1277 1278 @return: True if it is changed; otherwise, False. 1279 """ 1280 changed = False 1281 for p in ('A', 'B'): 1282 backup_sha = self._backup_kernel_sha.get(p, None) 1283 current_sha = self.faft_client.kernel.get_sha(p) 1284 if backup_sha != current_sha: 1285 changed = True 1286 logging.info('Kernel %s is changed', p) 1287 return changed 1288 1289 def backup_kernel(self, suffix='.original'): 1290 """Backup kernel to files, and the send them to host. 1291 1292 @param suffix: a string appended to backup file name. 1293 """ 1294 remote_temp_dir = self.faft_client.system.create_temp_dir() 1295 for p in ('A', 'B'): 1296 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1297 self.faft_client.kernel.dump(p, remote_path) 1298 self._client.get_file( 1299 remote_path, 1300 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix))) 1301 self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p) 1302 logging.info('Backup kernel stored in %s with suffix %s', 1303 self.resultsdir, suffix) 1304 1305 def is_kernel_saved(self): 1306 """Check if kernel images are saved (backup_kernel called before). 1307 1308 @return: True if the kernel is saved; otherwise, False. 1309 """ 1310 return len(self._backup_kernel_sha) != 0 1311 1312 def clear_saved_kernel(self): 1313 """Clear the kernel saved by backup_kernel().""" 1314 self._backup_kernel_sha = dict() 1315 1316 def restore_kernel(self, suffix='.original'): 1317 """Restore kernel from host in resultsdir. 1318 1319 @param suffix: a string appended to backup file name. 1320 """ 1321 if not self.is_kernel_changed(): 1322 return 1323 1324 # Backup current corrupted kernel. 1325 self.backup_kernel(suffix='.corrupt') 1326 1327 # Restore kernel. 1328 remote_temp_dir = self.faft_client.system.create_temp_dir() 1329 for p in ('A', 'B'): 1330 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1331 self._client.send_file( 1332 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)), 1333 remote_path) 1334 self.faft_client.kernel.write(p, remote_path) 1335 1336 self.switcher.mode_aware_reboot() 1337 logging.info('Successfully restored kernel.') 1338 1339 def backup_cgpt_attributes(self): 1340 """Backup CGPT partition table attributes.""" 1341 self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes() 1342 1343 def restore_cgpt_attributes(self): 1344 """Restore CGPT partition table attributes.""" 1345 current_table = self.faft_client.cgpt.get_attributes() 1346 if current_table == self._backup_cgpt_attr: 1347 return 1348 logging.info('CGPT table is changed. Original: %r. Current: %r.', 1349 self._backup_cgpt_attr, 1350 current_table) 1351 self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr) 1352 1353 self.switcher.mode_aware_reboot() 1354 logging.info('Successfully restored CGPT table.') 1355 1356 def try_fwb(self, count=0): 1357 """set to try booting FWB count # times 1358 1359 Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for 1360 vboot2 1361 1362 @param count: an integer specifying value to program into 1363 fwb_tries(vb1)/fw_try_next(vb2) 1364 """ 1365 if self.fw_vboot2: 1366 self.faft_client.system.set_fw_try_next('B', count) 1367 else: 1368 # vboot1: we need to boot into fwb at least once 1369 if not count: 1370 count = count + 1 1371 self.faft_client.system.set_try_fw_b(count) 1372 1373