1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import ast 6import ctypes 7import logging 8import os 9import pprint 10import re 11import time 12import uuid 13 14from autotest_lib.client.bin import utils 15from autotest_lib.client.common_lib import error 16from autotest_lib.server import test 17from autotest_lib.server.cros import vboot_constants as vboot 18from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig 19from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy 20from autotest_lib.server.cros.faft.utils import mode_switcher 21from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers 22from autotest_lib.server.cros.servo import chrome_ec 23 24ConnectionError = mode_switcher.ConnectionError 25 26 27class FAFTBase(test.test): 28 """The base class of FAFT classes. 29 30 It launches the FAFTClient on DUT, such that the test can access its 31 firmware functions and interfaces. It also provides some methods to 32 handle the reboot mechanism, in order to ensure FAFTClient is still 33 connected after reboot. 34 """ 35 def initialize(self, host): 36 """Create a FAFTClient object and install the dependency.""" 37 self.servo = host.servo 38 self.servo.initialize_dut() 39 self._client = host 40 self.faft_client = RPCProxy(host) 41 self.lockfile = '/var/tmp/faft/lock' 42 43 44class FirmwareTest(FAFTBase): 45 """ 46 Base class that sets up helper objects/functions for firmware tests. 47 48 TODO: add documentaion as the FAFT rework progresses. 49 """ 50 version = 1 51 52 # Mapping of partition number of kernel and rootfs. 53 KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'} 54 ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'} 55 OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'} 56 OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'} 57 58 CHROMEOS_MAGIC = "CHROMEOS" 59 CORRUPTED_MAGIC = "CORRUPTD" 60 61 # Delay for waiting client to return before EC suspend 62 EC_SUSPEND_DELAY = 5 63 64 # Delay between EC suspend and wake 65 WAKE_DELAY = 10 66 67 # Delay between closing and opening lid 68 LID_DELAY = 1 69 70 _SERVOD_LOG = '/var/log/servod.log' 71 72 _ROOTFS_PARTITION_NUMBER = 3 73 74 _backup_firmware_sha = () 75 _backup_kernel_sha = dict() 76 _backup_cgpt_attr = dict() 77 _backup_gbb_flags = None 78 _backup_dev_mode = None 79 80 # Class level variable, keep track the states of one time setup. 81 # This variable is preserved across tests which inherit this class. 82 _global_setup_done = { 83 'gbb_flags': False, 84 'reimage': False, 85 'usb_check': False, 86 } 87 88 @classmethod 89 def check_setup_done(cls, label): 90 """Check if the given setup is done. 91 92 @param label: The label of the setup. 93 """ 94 return cls._global_setup_done[label] 95 96 @classmethod 97 def mark_setup_done(cls, label): 98 """Mark the given setup done. 99 100 @param label: The label of the setup. 101 """ 102 cls._global_setup_done[label] = True 103 104 @classmethod 105 def unmark_setup_done(cls, label): 106 """Mark the given setup not done. 107 108 @param label: The label of the setup. 109 """ 110 cls._global_setup_done[label] = False 111 112 def initialize(self, host, cmdline_args, ec_wp=None): 113 super(FirmwareTest, self).initialize(host) 114 self.run_id = str(uuid.uuid4()) 115 logging.info('FirmwareTest initialize begin (id=%s)', self.run_id) 116 # Parse arguments from command line 117 args = {} 118 self.power_control = host.POWER_CONTROL_RPM 119 for arg in cmdline_args: 120 match = re.search("^(\w+)=(.+)", arg) 121 if match: 122 args[match.group(1)] = match.group(2) 123 if 'power_control' in args: 124 self.power_control = args['power_control'] 125 if self.power_control not in host.POWER_CONTROL_VALID_ARGS: 126 raise error.TestError('Valid values for --args=power_control ' 127 'are %s. But you entered wrong argument ' 128 'as "%s".' 129 % (host.POWER_CONTROL_VALID_ARGS, 130 self.power_control)) 131 132 self.faft_config = FAFTConfig( 133 self.faft_client.system.get_platform_name()) 134 self.checkers = FAFTCheckers(self) 135 self.switcher = mode_switcher.create_mode_switcher(self) 136 137 if self.faft_config.chrome_ec: 138 self.ec = chrome_ec.ChromeEC(self.servo) 139 # Check for presence of a USBPD console 140 if self.faft_config.chrome_usbpd: 141 self.usbpd = chrome_ec.ChromeUSBPD(self.servo) 142 elif self.faft_config.chrome_ec: 143 # If no separate USBPD console, then PD exists on EC console 144 self.usbpd = self.ec 145 # Get plankton console 146 self.plankton = host.plankton 147 self.plankton_host = host._plankton_host 148 149 self._setup_uart_capture() 150 self._setup_servo_log() 151 self._record_system_info() 152 self.fw_vboot2 = self.faft_client.system.get_fw_vboot2() 153 logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1) 154 if self.fw_vboot2: 155 self.faft_client.system.set_fw_try_next('A') 156 if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B': 157 logging.info('mainfw_act is B. rebooting to set it A') 158 self.switcher.mode_aware_reboot() 159 self._setup_gbb_flags() 160 self._stop_service('update-engine') 161 self._create_faft_lockfile() 162 self._setup_ec_write_protect(ec_wp) 163 # See chromium:239034 regarding needing this sync. 164 self.blocking_sync() 165 logging.info('FirmwareTest initialize done (id=%s)', self.run_id) 166 167 def cleanup(self): 168 """Autotest cleanup function.""" 169 # Unset state checker in case it's set by subclass 170 logging.info('FirmwareTest cleaning up (id=%s)', self.run_id) 171 try: 172 self.faft_client.system.is_available() 173 except: 174 # Remote is not responding. Revive DUT so that subsequent tests 175 # don't fail. 176 self._restore_routine_from_timeout() 177 self.switcher.restore_mode() 178 self._restore_ec_write_protect() 179 self._restore_gbb_flags() 180 self._start_service('update-engine') 181 self._remove_faft_lockfile() 182 self._record_servo_log() 183 self._record_faft_client_log() 184 self._cleanup_uart_capture() 185 super(FirmwareTest, self).cleanup() 186 logging.info('FirmwareTest cleanup done (id=%s)', self.run_id) 187 188 def _record_system_info(self): 189 """Record some critical system info to the attr keyval. 190 191 This info is used by generate_test_report later. 192 """ 193 system_info = { 194 'hwid': self.faft_client.system.get_crossystem_value('hwid'), 195 'ec_version': self.faft_client.ec.get_version(), 196 'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'), 197 'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'), 198 'servod_version': self._client._servo_host.run( 199 'servod --version').stdout.strip(), 200 } 201 logging.info('System info:\n' + pprint.pformat(system_info)) 202 self.write_attr_keyval(system_info) 203 204 def invalidate_firmware_setup(self): 205 """Invalidate all firmware related setup state. 206 207 This method is called when the firmware is re-flashed. It resets all 208 firmware related setup states so that the next test setup properly 209 again. 210 """ 211 self.unmark_setup_done('gbb_flags') 212 213 def _retrieve_recovery_reason_from_trap(self): 214 """Try to retrieve the recovery reason from a trapped recovery screen. 215 216 @return: The recovery_reason, 0 if any error. 217 """ 218 recovery_reason = 0 219 logging.info('Try to retrieve recovery reason...') 220 if self.servo.get_usbkey_direction() == 'dut': 221 self.switcher.bypass_rec_mode() 222 else: 223 self.servo.switch_usbkey('dut') 224 225 try: 226 self.switcher.wait_for_client() 227 lines = self.faft_client.system.run_shell_command_get_output( 228 'crossystem recovery_reason') 229 recovery_reason = int(lines[0]) 230 logging.info('Got the recovery reason %d.', recovery_reason) 231 except ConnectionError: 232 logging.error('Failed to get the recovery reason due to connection ' 233 'error.') 234 return recovery_reason 235 236 def _reset_client(self): 237 """Reset client to a workable state. 238 239 This method is called when the client is not responsive. It may be 240 caused by the following cases: 241 - halt on a firmware screen without timeout, e.g. REC_INSERT screen; 242 - corrupted firmware; 243 - corrutped OS image. 244 """ 245 # DUT may halt on a firmware screen. Try cold reboot. 246 logging.info('Try cold reboot...') 247 self.switcher.mode_aware_reboot(reboot_type='cold', 248 sync_before_boot=False, 249 wait_for_dut_up=False) 250 self.switcher.wait_for_client_offline() 251 self.switcher.bypass_dev_mode() 252 try: 253 self.switcher.wait_for_client() 254 return 255 except ConnectionError: 256 logging.warn('Cold reboot doesn\'t help, still connection error.') 257 258 # DUT may be broken by a corrupted firmware. Restore firmware. 259 # We assume the recovery boot still works fine. Since the recovery 260 # code is in RO region and all FAFT tests don't change the RO region 261 # except GBB. 262 if self.is_firmware_saved(): 263 self._ensure_client_in_recovery() 264 logging.info('Try restore the original firmware...') 265 if self.is_firmware_changed(): 266 try: 267 self.restore_firmware() 268 return 269 except ConnectionError: 270 logging.warn('Restoring firmware doesn\'t help, still ' 271 'connection error.') 272 273 # Perhaps it's kernel that's broken. Let's try restoring it. 274 if self.is_kernel_saved(): 275 self._ensure_client_in_recovery() 276 logging.info('Try restore the original kernel...') 277 if self.is_kernel_changed(): 278 try: 279 self.restore_kernel() 280 return 281 except ConnectionError: 282 logging.warn('Restoring kernel doesn\'t help, still ' 283 'connection error.') 284 285 # DUT may be broken by a corrupted OS image. Restore OS image. 286 self._ensure_client_in_recovery() 287 logging.info('Try restore the OS image...') 288 self.faft_client.system.run_shell_command('chromeos-install --yes') 289 self.switcher.mode_aware_reboot(wait_for_dut_up=False) 290 self.switcher.wait_for_client_offline() 291 self.switcher.bypass_dev_mode() 292 try: 293 self.switcher.wait_for_client() 294 logging.info('Successfully restore OS image.') 295 return 296 except ConnectionError: 297 logging.warn('Restoring OS image doesn\'t help, still connection ' 298 'error.') 299 300 def _ensure_client_in_recovery(self): 301 """Ensure client in recovery boot; reboot into it if necessary. 302 303 @raise TestError: if failed to boot the USB image. 304 """ 305 logging.info('Try boot into USB image...') 306 self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False, 307 wait_for_dut_up=False) 308 self.servo.switch_usbkey('host') 309 self.switcher.bypass_rec_mode() 310 try: 311 self.switcher.wait_for_client() 312 except ConnectionError: 313 raise error.TestError('Failed to boot the USB image.') 314 315 def _restore_routine_from_timeout(self): 316 """A routine to try to restore the system from a timeout error. 317 318 This method is called when FAFT failed to connect DUT after reboot. 319 320 @raise TestFail: This exception is already raised, with a decription 321 why it failed. 322 """ 323 # DUT is disconnected. Capture the UART output for debug. 324 self._record_uart_capture() 325 326 # TODO(waihong@chromium.org): Implement replugging the Ethernet to 327 # identify if it is a network flaky. 328 329 recovery_reason = self._retrieve_recovery_reason_from_trap() 330 331 # Reset client to a workable state. 332 self._reset_client() 333 334 # Raise the proper TestFail exception. 335 if recovery_reason: 336 raise error.TestFail('Trapped in the recovery screen (reason: %d) ' 337 'and timed out' % recovery_reason) 338 else: 339 raise error.TestFail('Timed out waiting for DUT reboot') 340 341 def assert_test_image_in_usb_disk(self, usb_dev=None): 342 """Assert an USB disk plugged-in on servo and a test image inside. 343 344 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 345 If None, it is detected automatically. 346 @raise TestError: if USB disk not detected or not a test image. 347 """ 348 if self.check_setup_done('usb_check'): 349 return 350 if usb_dev: 351 assert self.servo.get_usbkey_direction() == 'host' 352 else: 353 self.servo.switch_usbkey('host') 354 usb_dev = self.servo.probe_host_usb_dev() 355 if not usb_dev: 356 raise error.TestError( 357 'An USB disk should be plugged in the servo board.') 358 359 rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER) 360 logging.info('usb dev is %s', usb_dev) 361 tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX') 362 self.servo.system('mount -o ro %s %s' % (rootfs, tmpd)) 363 364 try: 365 usb_lsb = self.servo.system_output('cat %s' % 366 os.path.join(tmpd, 'etc/lsb-release')) 367 logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb) 368 dut_lsb = '\n'.join(self.faft_client.system. 369 run_shell_command_get_output('cat /etc/lsb-release')) 370 logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb) 371 if not re.search(r'RELEASE_TRACK=.*test', usb_lsb): 372 raise error.TestError('USB stick in servo is no test image') 373 usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1) 374 dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1) 375 if usb_board != dut_board: 376 raise error.TestError('USB stick in servo contains a %s ' 377 'image, but DUT is a %s' % (usb_board, dut_board)) 378 finally: 379 for cmd in ('umount %s' % rootfs, 'sync', 'rm -rf %s' % tmpd): 380 self.servo.system(cmd) 381 382 self.mark_setup_done('usb_check') 383 384 def setup_usbkey(self, usbkey, host=None): 385 """Setup the USB disk for the test. 386 387 It checks the setup of USB disk and a valid ChromeOS test image inside. 388 It also muxes the USB disk to either the host or DUT by request. 389 390 @param usbkey: True if the USB disk is required for the test, False if 391 not required. 392 @param host: Optional, True to mux the USB disk to host, False to mux it 393 to DUT, default to do nothing. 394 """ 395 if usbkey: 396 self.assert_test_image_in_usb_disk() 397 elif host is None: 398 # USB disk is not required for the test. Better to mux it to host. 399 host = True 400 401 if host is True: 402 self.servo.switch_usbkey('host') 403 elif host is False: 404 self.servo.switch_usbkey('dut') 405 406 def get_usbdisk_path_on_dut(self): 407 """Get the path of the USB disk device plugged-in the servo on DUT. 408 409 Returns: 410 A string representing USB disk path, like '/dev/sdb', or None if 411 no USB disk is found. 412 """ 413 cmd = 'ls -d /dev/s*[a-z]' 414 original_value = self.servo.get_usbkey_direction() 415 416 # Make the dut unable to see the USB disk. 417 self.servo.switch_usbkey('off') 418 no_usb_set = set( 419 self.faft_client.system.run_shell_command_get_output(cmd)) 420 421 # Make the dut able to see the USB disk. 422 self.servo.switch_usbkey('dut') 423 time.sleep(self.faft_config.usb_plug) 424 has_usb_set = set( 425 self.faft_client.system.run_shell_command_get_output(cmd)) 426 427 # Back to its original value. 428 if original_value != self.servo.get_usbkey_direction(): 429 self.servo.switch_usbkey(original_value) 430 431 diff_set = has_usb_set - no_usb_set 432 if len(diff_set) == 1: 433 return diff_set.pop() 434 else: 435 return None 436 437 def _create_faft_lockfile(self): 438 """Creates the FAFT lockfile.""" 439 logging.info('Creating FAFT lockfile...') 440 command = 'touch %s' % (self.lockfile) 441 self.faft_client.system.run_shell_command(command) 442 443 def _remove_faft_lockfile(self): 444 """Removes the FAFT lockfile.""" 445 logging.info('Removing FAFT lockfile...') 446 command = 'rm -f %s' % (self.lockfile) 447 self.faft_client.system.run_shell_command(command) 448 449 def _stop_service(self, service): 450 """Stops a upstart service on the client. 451 452 @param service: The name of the upstart service. 453 """ 454 logging.info('Stopping %s...', service) 455 command = 'status %s | grep stop || stop %s' % (service, service) 456 self.faft_client.system.run_shell_command(command) 457 458 def _start_service(self, service): 459 """Starts a upstart service on the client. 460 461 @param service: The name of the upstart service. 462 """ 463 logging.info('Starting %s...', service) 464 command = 'status %s | grep start || start %s' % (service, service) 465 self.faft_client.system.run_shell_command(command) 466 467 def clear_set_gbb_flags(self, clear_mask, set_mask): 468 """Clear and set the GBB flags in the current flashrom. 469 470 @param clear_mask: A mask of flags to be cleared. 471 @param set_mask: A mask of flags to be set. 472 """ 473 gbb_flags = self.faft_client.bios.get_gbb_flags() 474 new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask 475 if new_flags != gbb_flags: 476 self._backup_gbb_flags = gbb_flags 477 logging.info('Changing GBB flags from 0x%x to 0x%x.', 478 gbb_flags, new_flags) 479 self.faft_client.bios.set_gbb_flags(new_flags) 480 # If changing FORCE_DEV_SWITCH_ON flag, reboot to get a clear state 481 if ((gbb_flags ^ new_flags) & vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON): 482 self.switcher.mode_aware_reboot() 483 else: 484 logging.info('Current GBB flags look good for test: 0x%x.', 485 gbb_flags) 486 487 def check_ec_capability(self, required_cap=None, suppress_warning=False): 488 """Check if current platform has required EC capabilities. 489 490 @param required_cap: A list containing required EC capabilities. Pass in 491 None to only check for presence of Chrome EC. 492 @param suppress_warning: True to suppress any warning messages. 493 @return: True if requirements are met. Otherwise, False. 494 """ 495 if not self.faft_config.chrome_ec: 496 if not suppress_warning: 497 logging.warn('Requires Chrome EC to run this test.') 498 return False 499 500 if not required_cap: 501 return True 502 503 for cap in required_cap: 504 if cap not in self.faft_config.ec_capability: 505 if not suppress_warning: 506 logging.warn('Requires EC capability "%s" to run this ' 507 'test.', cap) 508 return False 509 510 return True 511 512 def check_root_part_on_non_recovery(self, part): 513 """Check the partition number of root device and on normal/dev boot. 514 515 @param part: A string of partition number, e.g.'3'. 516 @return: True if the root device matched and on normal/dev boot; 517 otherwise, False. 518 """ 519 return self.checkers.root_part_checker(part) and \ 520 self.checkers.crossystem_checker({ 521 'mainfw_type': ('normal', 'developer'), 522 }) 523 524 def _join_part(self, dev, part): 525 """Return a concatenated string of device and partition number. 526 527 @param dev: A string of device, e.g.'/dev/sda'. 528 @param part: A string of partition number, e.g.'3'. 529 @return: A concatenated string of device and partition number, 530 e.g.'/dev/sda3'. 531 532 >>> seq = FirmwareTest() 533 >>> seq._join_part('/dev/sda', '3') 534 '/dev/sda3' 535 >>> seq._join_part('/dev/mmcblk0', '2') 536 '/dev/mmcblk0p2' 537 """ 538 if 'mmcblk' in dev: 539 return dev + 'p' + part 540 else: 541 return dev + part 542 543 def copy_kernel_and_rootfs(self, from_part, to_part): 544 """Copy kernel and rootfs from from_part to to_part. 545 546 @param from_part: A string of partition number to be copied from. 547 @param to_part: A string of partition number to be copied to. 548 """ 549 root_dev = self.faft_client.system.get_root_dev() 550 logging.info('Copying kernel from %s to %s. Please wait...', 551 from_part, to_part) 552 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 553 (self._join_part(root_dev, self.KERNEL_MAP[from_part]), 554 self._join_part(root_dev, self.KERNEL_MAP[to_part]))) 555 logging.info('Copying rootfs from %s to %s. Please wait...', 556 from_part, to_part) 557 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 558 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]), 559 self._join_part(root_dev, self.ROOTFS_MAP[to_part]))) 560 561 def ensure_kernel_boot(self, part): 562 """Ensure the request kernel boot. 563 564 If not, it duplicates the current kernel to the requested kernel 565 and sets the requested higher priority to ensure it boot. 566 567 @param part: A string of kernel partition number or 'a'/'b'. 568 """ 569 if not self.checkers.root_part_checker(part): 570 if self.faft_client.kernel.diff_a_b(): 571 self.copy_kernel_and_rootfs( 572 from_part=self.OTHER_KERNEL_MAP[part], 573 to_part=part) 574 self.reset_and_prioritize_kernel(part) 575 self.switcher.mode_aware_reboot() 576 577 def set_hardware_write_protect(self, enable): 578 """Set hardware write protect pin. 579 580 @param enable: True if asserting write protect pin. Otherwise, False. 581 """ 582 try: 583 self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off') 584 except: 585 # TODO(waihong): Remove this fallback when all servos have the 586 # above new fw_wp_state control. 587 self.servo.set('fw_wp_vref', self.faft_config.wp_voltage) 588 self.servo.set('fw_wp_en', 'on') 589 self.servo.set('fw_wp', 'on' if enable else 'off') 590 591 def set_ec_write_protect_and_reboot(self, enable): 592 """Set EC write protect status and reboot to take effect. 593 594 The write protect state is only activated if both hardware write 595 protect pin is asserted and software write protect flag is set. 596 This method asserts/deasserts hardware write protect pin first, and 597 set corresponding EC software write protect flag. 598 599 If the device uses non-Chrome EC, set the software write protect via 600 flashrom. 601 602 If the device uses Chrome EC, a reboot is required for write protect 603 to take effect. Since the software write protect flag cannot be unset 604 if hardware write protect pin is asserted, we need to deasserted the 605 pin first if we are deactivating write protect. Similarly, a reboot 606 is required before we can modify the software flag. 607 608 @param enable: True if activating EC write protect. Otherwise, False. 609 """ 610 self.set_hardware_write_protect(enable) 611 if self.faft_config.chrome_ec: 612 self.set_chrome_ec_write_protect_and_reboot(enable) 613 else: 614 self.faft_client.ec.set_write_protect(enable) 615 self.switcher.mode_aware_reboot() 616 617 def set_chrome_ec_write_protect_and_reboot(self, enable): 618 """Set Chrome EC write protect status and reboot to take effect. 619 620 @param enable: True if activating EC write protect. Otherwise, False. 621 """ 622 if enable: 623 # Set write protect flag and reboot to take effect. 624 self.ec.set_flash_write_protect(enable) 625 self.sync_and_ec_reboot() 626 else: 627 # Reboot after deasserting hardware write protect pin to deactivate 628 # write protect. And then remove software write protect flag. 629 self.sync_and_ec_reboot() 630 self.ec.set_flash_write_protect(enable) 631 632 def _setup_ec_write_protect(self, ec_wp): 633 """Setup for EC write-protection. 634 635 It makes sure the EC in the requested write-protection state. If not, it 636 flips the state. Flipping the write-protection requires DUT reboot. 637 638 @param ec_wp: True to request EC write-protected; False to request EC 639 not write-protected; None to do nothing. 640 """ 641 if ec_wp is None: 642 self._old_ec_wp = None 643 return 644 self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'}) 645 if ec_wp != self._old_ec_wp: 646 logging.info('The test required EC is %swrite-protected. Reboot ' 647 'and flip the state.', '' if ec_wp else 'not ') 648 self.switcher.mode_aware_reboot( 649 'custom', 650 lambda:self.set_ec_write_protect_and_reboot(ec_wp)) 651 652 def _restore_ec_write_protect(self): 653 """Restore the original EC write-protection.""" 654 if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None): 655 return 656 if not self.checkers.crossystem_checker( 657 {'wpsw_boot': '1' if self._old_ec_wp else '0'}): 658 logging.info('Restore original EC write protection and reboot.') 659 self.switcher.mode_aware_reboot( 660 'custom', 661 lambda:self.set_ec_write_protect_and_reboot( 662 self._old_ec_wp)) 663 664 def _setup_uart_capture(self): 665 """Setup the CPU/EC/PD UART capture.""" 666 self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt') 667 self.servo.set('cpu_uart_capture', 'on') 668 self.cr50_console_file = None 669 self.ec_uart_file = None 670 self.usbpd_uart_file = None 671 try: 672 self.servo.set('cr50_console_capture', 'on') 673 self.cr50_console_file = os.path.join(self.resultsdir, 674 'cr50_console.txt') 675 self.cr50 = chrome_ec.ChromeCr50(self.servo) 676 except error.TestFail as e: 677 if 'No control named' in str(e): 678 logging.warn('cr50 console not supported.') 679 if self.faft_config.chrome_ec: 680 try: 681 self.servo.set('ec_uart_capture', 'on') 682 self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt') 683 except error.TestFail as e: 684 if 'No control named' in str(e): 685 logging.warn('The servod is too old that ec_uart_capture ' 686 'not supported.') 687 # Log separate PD console if supported 688 if self.check_ec_capability(['usbpd_uart'], suppress_warning=True): 689 try: 690 self.servo.set('usbpd_uart_capture', 'on') 691 self.usbpd_uart_file = os.path.join(self.resultsdir, 692 'usbpd_uart.txt') 693 except error.TestFail as e: 694 if 'No control named' in str(e): 695 logging.warn('The servod is too old that ' 696 'usbpd_uart_capture is not supported.') 697 else: 698 logging.info('Not a Google EC, cannot capture ec console output.') 699 700 def _record_uart_capture(self): 701 """Record the CPU/EC/PD UART output stream to files.""" 702 if self.cpu_uart_file: 703 with open(self.cpu_uart_file, 'a') as f: 704 f.write(ast.literal_eval(self.servo.get('cpu_uart_stream'))) 705 if self.cr50_console_file: 706 with open(self.cr50_console_file, 'a') as f: 707 f.write(ast.literal_eval(self.servo.get('cr50_console_stream'))) 708 if self.ec_uart_file and self.faft_config.chrome_ec: 709 with open(self.ec_uart_file, 'a') as f: 710 f.write(ast.literal_eval(self.servo.get('ec_uart_stream'))) 711 if (self.usbpd_uart_file and self.faft_config.chrome_ec and 712 self.check_ec_capability(['usbpd_uart'], suppress_warning=True)): 713 with open(self.usbpd_uart_file, 'a') as f: 714 f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream'))) 715 716 def _cleanup_uart_capture(self): 717 """Cleanup the CPU/EC/PD UART capture.""" 718 # Flush the remaining UART output. 719 self._record_uart_capture() 720 self.servo.set('cpu_uart_capture', 'off') 721 if self.cr50_console_file: 722 self.servo.set('cr50_console_capture', 'off') 723 if self.ec_uart_file and self.faft_config.chrome_ec: 724 self.servo.set('ec_uart_capture', 'off') 725 if (self.usbpd_uart_file and self.faft_config.chrome_ec and 726 self.check_ec_capability(['usbpd_uart'], suppress_warning=True)): 727 self.servo.set('usbpd_uart_capture', 'off') 728 729 def _get_power_state(self, power_state): 730 """ 731 Return the current power state of the AP 732 """ 733 return self.ec.send_command_get_output("powerinfo", [power_state]) 734 735 def wait_power_state(self, power_state, retries): 736 """ 737 Wait for certain power state. 738 739 @param power_state: power state you are expecting 740 @param retries: retries. This is necessary if AP is powering down 741 and transitioning through different states. 742 """ 743 logging.info('Checking power state "%s" maximum %d times.', 744 power_state, retries) 745 while retries > 0: 746 logging.info("try count: %d", retries) 747 try: 748 retries = retries - 1 749 ret = self._get_power_state(power_state) 750 return True 751 except error.TestFail: 752 pass 753 return False 754 755 def suspend(self): 756 """Suspends the DUT.""" 757 cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY 758 self.faft_client.system.run_shell_command(cmd) 759 time.sleep(self.EC_SUSPEND_DELAY) 760 761 def _fetch_servo_log(self): 762 """Fetch the servo log.""" 763 cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2) 764 servo_log = self.servo.system_output(cmd) 765 return None if servo_log == 'NOTFOUND' else servo_log 766 767 def _setup_servo_log(self): 768 """Setup the servo log capturing.""" 769 self.servo_log_original_len = -1 770 if self.servo.is_localhost(): 771 # No servo log recorded when servod runs locally. 772 return 773 774 servo_log = self._fetch_servo_log() 775 if servo_log: 776 self.servo_log_original_len = len(servo_log) 777 else: 778 logging.warn('Servo log file not found.') 779 780 def _record_servo_log(self): 781 """Record the servo log to the results directory.""" 782 if self.servo_log_original_len != -1: 783 servo_log = self._fetch_servo_log() 784 servo_log_file = os.path.join(self.resultsdir, 'servod.log') 785 with open(servo_log_file, 'a') as f: 786 f.write(servo_log[self.servo_log_original_len:]) 787 788 def _record_faft_client_log(self): 789 """Record the faft client log to the results directory.""" 790 client_log = self.faft_client.system.dump_log(True) 791 client_log_file = os.path.join(self.resultsdir, 'faft_client.log') 792 with open(client_log_file, 'w') as f: 793 f.write(client_log) 794 795 def _setup_gbb_flags(self): 796 """Setup the GBB flags for FAFT test.""" 797 if self.faft_config.gbb_version < 1.1: 798 logging.info('Skip modifying GBB on versions older than 1.1.') 799 return 800 801 if self.check_setup_done('gbb_flags'): 802 return 803 804 logging.info('Set proper GBB flags for test.') 805 self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY | 806 vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON | 807 vboot.GBB_FLAG_FORCE_DEV_BOOT_USB | 808 vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK | 809 vboot.GBB_FLAG_FORCE_DEV_BOOT_FASTBOOT_FULL_CAP, 810 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM | 811 vboot.GBB_FLAG_FAFT_KEY_OVERIDE) 812 self.mark_setup_done('gbb_flags') 813 814 def drop_backup_gbb_flags(self): 815 """Drops the backup GBB flags. 816 817 This can be used when a test intends to permanently change GBB flags. 818 """ 819 self._backup_gbb_flags = None 820 821 def _restore_gbb_flags(self): 822 """Restore GBB flags to their original state.""" 823 if self._backup_gbb_flags is None: 824 return 825 # Setting up and restoring the GBB flags take a lot of time. For 826 # speed-up purpose, don't restore it. 827 logging.info('***') 828 logging.info('*** Please manually restore the original GBB flags to: ' 829 '0x%x ***', self._backup_gbb_flags) 830 logging.info('***') 831 self.unmark_setup_done('gbb_flags') 832 833 def setup_tried_fwb(self, tried_fwb): 834 """Setup for fw B tried state. 835 836 It makes sure the system in the requested fw B tried state. If not, it 837 tries to do so. 838 839 @param tried_fwb: True if requested in tried_fwb=1; 840 False if tried_fwb=0. 841 """ 842 if tried_fwb: 843 if not self.checkers.crossystem_checker({'tried_fwb': '1'}): 844 logging.info( 845 'Firmware is not booted with tried_fwb. Reboot into it.') 846 self.faft_client.system.set_try_fw_b() 847 else: 848 if not self.checkers.crossystem_checker({'tried_fwb': '0'}): 849 logging.info( 850 'Firmware is booted with tried_fwb. Reboot to clear.') 851 852 def power_on(self): 853 """Switch DUT AC power on.""" 854 self._client.power_on(self.power_control) 855 856 def power_off(self): 857 """Switch DUT AC power off.""" 858 self._client.power_off(self.power_control) 859 860 def power_cycle(self): 861 """Power cycle DUT AC power.""" 862 self._client.power_cycle(self.power_control) 863 864 def setup_rw_boot(self, section='a'): 865 """Make sure firmware is in RW-boot mode. 866 867 If the given firmware section is in RO-boot mode, turn off the RO-boot 868 flag and reboot DUT into RW-boot mode. 869 870 @param section: A firmware section, either 'a' or 'b'. 871 """ 872 flags = self.faft_client.bios.get_preamble_flags(section) 873 if flags & vboot.PREAMBLE_USE_RO_NORMAL: 874 flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL 875 self.faft_client.bios.set_preamble_flags(section, flags) 876 self.switcher.mode_aware_reboot() 877 878 def setup_kernel(self, part): 879 """Setup for kernel test. 880 881 It makes sure both kernel A and B bootable and the current boot is 882 the requested kernel part. 883 884 @param part: A string of kernel partition number or 'a'/'b'. 885 """ 886 self.ensure_kernel_boot(part) 887 logging.info('Checking the integrity of kernel B and rootfs B...') 888 if (self.faft_client.kernel.diff_a_b() or 889 not self.faft_client.rootfs.verify_rootfs('B')): 890 logging.info('Copying kernel and rootfs from A to B...') 891 self.copy_kernel_and_rootfs(from_part=part, 892 to_part=self.OTHER_KERNEL_MAP[part]) 893 self.reset_and_prioritize_kernel(part) 894 895 def reset_and_prioritize_kernel(self, part): 896 """Make the requested partition highest priority. 897 898 This function also reset kerenl A and B to bootable. 899 900 @param part: A string of partition number to be prioritized. 901 """ 902 root_dev = self.faft_client.system.get_root_dev() 903 # Reset kernel A and B to bootable. 904 self.faft_client.system.run_shell_command( 905 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev)) 906 self.faft_client.system.run_shell_command( 907 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev)) 908 # Set kernel part highest priority. 909 self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' % 910 (self.KERNEL_MAP[part], root_dev)) 911 912 def blocking_sync(self): 913 """Run a blocking sync command.""" 914 # The double calls to sync fakes a blocking call 915 # since the first call returns before the flush 916 # is complete, but the second will wait for the 917 # first to finish. 918 self.faft_client.system.run_shell_command('sync') 919 self.faft_client.system.run_shell_command('sync') 920 921 # sync only sends SYNCHRONIZE_CACHE but doesn't 922 # check the status. For mmc devices, use `mmc 923 # status get` command to send an empty command to 924 # wait for the disk to be available again. For 925 # other devices, hdparm sends TUR to check if 926 # a device is ready for transfer operation. 927 root_dev = self.faft_client.system.get_root_dev() 928 if 'mmcblk' in root_dev: 929 self.faft_client.system.run_shell_command('mmc status get %s' % 930 root_dev) 931 else: 932 self.faft_client.system.run_shell_command('hdparm -f %s' % root_dev) 933 934 def sync_and_ec_reboot(self, flags=''): 935 """Request the client sync and do a EC triggered reboot. 936 937 @param flags: Optional, a space-separated string of flags passed to EC 938 reboot command, including: 939 default: EC soft reboot; 940 'hard': EC cold/hard reboot. 941 """ 942 self.blocking_sync() 943 self.ec.reboot(flags) 944 time.sleep(self.faft_config.ec_boot_to_console) 945 self.check_lid_and_power_on() 946 947 def reboot_and_reset_tpm(self): 948 """Reboot into recovery mode, reset TPM, then reboot back to disk.""" 949 self.switcher.reboot_to_mode(to_mode='rec') 950 self.faft_client.system.run_shell_command('chromeos-tpm-recovery') 951 self.switcher.mode_aware_reboot() 952 953 def full_power_off_and_on(self): 954 """Shutdown the device by pressing power button and power on again.""" 955 boot_id = self.get_bootid() 956 # Press power button to trigger Chrome OS normal shutdown process. 957 # We use a customized delay since the normal-press 1.2s is not enough. 958 self.servo.power_key(self.faft_config.hold_pwr_button_poweroff) 959 # device can take 44-51 seconds to restart, 960 # add buffer from the default timeout of 60 seconds. 961 self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id) 962 time.sleep(self.faft_config.shutdown) 963 # Short press power button to boot DUT again. 964 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 965 966 def check_lid_and_power_on(self): 967 """ 968 On devices with EC software sync, system powers on after EC reboots if 969 lid is open. Otherwise, the EC shuts down CPU after about 3 seconds. 970 This method checks lid switch state and presses power button if 971 necessary. 972 """ 973 if self.servo.get("lid_open") == "no": 974 time.sleep(self.faft_config.software_sync) 975 self.servo.power_short_press() 976 977 def _modify_usb_kernel(self, usb_dev, from_magic, to_magic): 978 """Modify the kernel header magic in USB stick. 979 980 The kernel header magic is the first 8-byte of kernel partition. 981 We modify it to make it fail on kernel verification check. 982 983 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 984 @param from_magic: A string of magic which we change it from. 985 @param to_magic: A string of magic which we change it to. 986 @raise TestError: if failed to change magic. 987 """ 988 assert len(from_magic) == 8 989 assert len(to_magic) == 8 990 # USB image only contains one kernel. 991 kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a']) 992 read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part 993 current_magic = self.servo.system_output(read_cmd) 994 if current_magic == to_magic: 995 logging.info("The kernel magic is already %s.", current_magic) 996 return 997 if current_magic != from_magic: 998 raise error.TestError("Invalid kernel image on USB: wrong magic.") 999 1000 logging.info('Modify the kernel magic in USB, from %s to %s.', 1001 from_magic, to_magic) 1002 write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc " 1003 " 2>/dev/null" % (to_magic, kernel_part)) 1004 self.servo.system(write_cmd) 1005 1006 if self.servo.system_output(read_cmd) != to_magic: 1007 raise error.TestError("Failed to write new magic.") 1008 1009 def corrupt_usb_kernel(self, usb_dev): 1010 """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD. 1011 1012 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1013 """ 1014 self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC, 1015 self.CORRUPTED_MAGIC) 1016 1017 def restore_usb_kernel(self, usb_dev): 1018 """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS. 1019 1020 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1021 """ 1022 self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC, 1023 self.CHROMEOS_MAGIC) 1024 1025 def _call_action(self, action_tuple, check_status=False): 1026 """Call the action function with/without arguments. 1027 1028 @param action_tuple: A function, or a tuple (function, args, error_msg), 1029 in which, args and error_msg are optional. args is 1030 either a value or a tuple if multiple arguments. 1031 This can also be a list containing multiple 1032 function or tuple. In this case, these actions are 1033 called in sequence. 1034 @param check_status: Check the return value of action function. If not 1035 succeed, raises a TestFail exception. 1036 @return: The result value of the action function. 1037 @raise TestError: An error when the action function is not callable. 1038 @raise TestFail: When check_status=True, action function not succeed. 1039 """ 1040 if isinstance(action_tuple, list): 1041 return all([self._call_action(action, check_status=check_status) 1042 for action in action_tuple]) 1043 1044 action = action_tuple 1045 args = () 1046 error_msg = 'Not succeed' 1047 if isinstance(action_tuple, tuple): 1048 action = action_tuple[0] 1049 if len(action_tuple) >= 2: 1050 args = action_tuple[1] 1051 if not isinstance(args, tuple): 1052 args = (args,) 1053 if len(action_tuple) >= 3: 1054 error_msg = action_tuple[2] 1055 1056 if action is None: 1057 return 1058 1059 if not callable(action): 1060 raise error.TestError('action is not callable!') 1061 1062 info_msg = 'calling %s' % action.__name__ 1063 if args: 1064 info_msg += ' with args %s' % str(args) 1065 logging.info(info_msg) 1066 ret = action(*args) 1067 1068 if check_status and not ret: 1069 raise error.TestFail('%s: %s returning %s' % 1070 (error_msg, info_msg, str(ret))) 1071 return ret 1072 1073 def run_shutdown_process(self, shutdown_action, pre_power_action=None, 1074 run_power_action=True, post_power_action=None, 1075 shutdown_timeout=None): 1076 """Run shutdown_action(), which makes DUT shutdown, and power it on. 1077 1078 @param shutdown_action: function which makes DUT shutdown, like 1079 pressing power key. 1080 @param pre_power_action: function which is called before next power on. 1081 @param run_power_action: power_key press by default, set to None to skip. 1082 @param post_power_action: function which is called after next power on. 1083 @param shutdown_timeout: a timeout to confirm DUT shutdown. 1084 @raise TestFail: if the shutdown_action() failed to turn DUT off. 1085 """ 1086 self._call_action(shutdown_action) 1087 logging.info('Wait to ensure DUT shut down...') 1088 try: 1089 if shutdown_timeout is None: 1090 shutdown_timeout = self.faft_config.shutdown_timeout 1091 self.switcher.wait_for_client(timeout=shutdown_timeout) 1092 raise error.TestFail( 1093 'Should shut the device down after calling %s.' % 1094 shutdown_action.__name__) 1095 except ConnectionError: 1096 logging.info( 1097 'DUT is surely shutdown. We are going to power it on again...') 1098 1099 if pre_power_action: 1100 self._call_action(pre_power_action) 1101 if run_power_action: 1102 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1103 if post_power_action: 1104 self._call_action(post_power_action) 1105 1106 def get_bootid(self, retry=3): 1107 """ 1108 Return the bootid. 1109 """ 1110 boot_id = None 1111 while retry: 1112 try: 1113 boot_id = self._client.get_boot_id() 1114 break 1115 except error.AutoservRunError: 1116 retry -= 1 1117 if retry: 1118 logging.info('Retry to get boot_id...') 1119 else: 1120 logging.warning('Failed to get boot_id.') 1121 logging.info('boot_id: %s', boot_id) 1122 return boot_id 1123 1124 def check_state(self, func): 1125 """ 1126 Wrapper around _call_action with check_status set to True. This is a 1127 helper function to be used by tests and is currently implemented by 1128 calling _call_action with check_status=True. 1129 1130 TODO: This function's arguments need to be made more stringent. And 1131 its functionality should be moved over to check functions directly in 1132 the future. 1133 1134 @param func: A function, or a tuple (function, args, error_msg), 1135 in which, args and error_msg are optional. args is 1136 either a value or a tuple if multiple arguments. 1137 This can also be a list containing multiple 1138 function or tuple. In this case, these actions are 1139 called in sequence. 1140 @return: The result value of the action function. 1141 @raise TestFail: If the function does notsucceed. 1142 """ 1143 logging.info("-[FAFT]-[ start stepstate_checker ]----------") 1144 self._call_action(func, check_status=True) 1145 logging.info("-[FAFT]-[ end state_checker ]----------------") 1146 1147 def get_current_firmware_sha(self): 1148 """Get current firmware sha of body and vblock. 1149 1150 @return: Current firmware sha follows the order ( 1151 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha) 1152 """ 1153 current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'), 1154 self.faft_client.bios.get_body_sha('a'), 1155 self.faft_client.bios.get_sig_sha('b'), 1156 self.faft_client.bios.get_body_sha('b')) 1157 if not all(current_firmware_sha): 1158 raise error.TestError('Failed to get firmware sha.') 1159 return current_firmware_sha 1160 1161 def is_firmware_changed(self): 1162 """Check if the current firmware changed, by comparing its SHA. 1163 1164 @return: True if it is changed, otherwise Flase. 1165 """ 1166 # Device may not be rebooted after test. 1167 self.faft_client.bios.reload() 1168 1169 current_sha = self.get_current_firmware_sha() 1170 1171 if current_sha == self._backup_firmware_sha: 1172 return False 1173 else: 1174 corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0]) 1175 corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1]) 1176 corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2]) 1177 corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3]) 1178 logging.info("Firmware changed:") 1179 logging.info('VBOOTA is changed: %s', corrupt_VBOOTA) 1180 logging.info('VBOOTB is changed: %s', corrupt_VBOOTB) 1181 logging.info('FVMAIN is changed: %s', corrupt_FVMAIN) 1182 logging.info('FVMAINB is changed: %s', corrupt_FVMAINB) 1183 return True 1184 1185 def backup_firmware(self, suffix='.original'): 1186 """Backup firmware to file, and then send it to host. 1187 1188 @param suffix: a string appended to backup file name 1189 """ 1190 remote_temp_dir = self.faft_client.system.create_temp_dir() 1191 remote_bios_path = os.path.join(remote_temp_dir, 'bios') 1192 self.faft_client.bios.dump_whole(remote_bios_path) 1193 self._client.get_file(remote_bios_path, 1194 os.path.join(self.resultsdir, 'bios' + suffix)) 1195 self._client.run('rm -rf %s' % remote_temp_dir) 1196 logging.info('Backup firmware stored in %s with suffix %s', 1197 self.resultsdir, suffix) 1198 1199 self._backup_firmware_sha = self.get_current_firmware_sha() 1200 1201 def is_firmware_saved(self): 1202 """Check if a firmware saved (called backup_firmware before). 1203 1204 @return: True if the firmware is backuped; otherwise False. 1205 """ 1206 return self._backup_firmware_sha != () 1207 1208 def clear_saved_firmware(self): 1209 """Clear the firmware saved by the method backup_firmware.""" 1210 self._backup_firmware_sha = () 1211 1212 def restore_firmware(self, suffix='.original'): 1213 """Restore firmware from host in resultsdir. 1214 1215 @param suffix: a string appended to backup file name 1216 """ 1217 if not self.is_firmware_changed(): 1218 return 1219 1220 # Backup current corrupted firmware. 1221 self.backup_firmware(suffix='.corrupt') 1222 1223 # Restore firmware. 1224 remote_temp_dir = self.faft_client.system.create_temp_dir() 1225 self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix), 1226 os.path.join(remote_temp_dir, 'bios')) 1227 1228 self.faft_client.bios.write_whole( 1229 os.path.join(remote_temp_dir, 'bios')) 1230 self.switcher.mode_aware_reboot() 1231 logging.info('Successfully restore firmware.') 1232 1233 def setup_firmwareupdate_shellball(self, shellball=None): 1234 """Setup a shellball to use in firmware update test. 1235 1236 Check if there is a given shellball, and it is a shell script. Then, 1237 send it to the remote host. Otherwise, use the 1238 /usr/sbin/chromeos-firmwareupdate in the image and replace its inside 1239 BIOS and EC images with the active firmware images. 1240 1241 @param shellball: path of a shellball or default to None. 1242 """ 1243 if shellball: 1244 # Determine the firmware file is a shellball or a raw binary. 1245 is_shellball = (utils.system_output("file %s" % shellball).find( 1246 "shell script") != -1) 1247 if is_shellball: 1248 logging.info('Device will update firmware with shellball %s', 1249 shellball) 1250 temp_path = self.faft_client.updater.get_temp_path() 1251 working_shellball = os.path.join(temp_path, 1252 'chromeos-firmwareupdate') 1253 self._client.send_file(shellball, working_shellball) 1254 self.faft_client.updater.extract_shellball() 1255 else: 1256 raise error.TestFail( 1257 'The given shellball is not a shell script.') 1258 else: 1259 logging.info('No shellball given, use the original shellball and ' 1260 'replace its BIOS and EC images.') 1261 work_path = self.faft_client.updater.get_work_path() 1262 bios_in_work_path = os.path.join(work_path, 'bios.bin') 1263 ec_in_work_path = os.path.join(work_path, 'ec.bin') 1264 self.faft_client.bios.dump_whole(bios_in_work_path) 1265 if self.faft_config.chrome_ec: 1266 self.faft_client.ec.dump_firmware(ec_in_work_path) 1267 self.faft_client.updater.repack_shellball() 1268 1269 def is_kernel_changed(self): 1270 """Check if the current kernel is changed, by comparing its SHA1 hash. 1271 1272 @return: True if it is changed; otherwise, False. 1273 """ 1274 changed = False 1275 for p in ('A', 'B'): 1276 backup_sha = self._backup_kernel_sha.get(p, None) 1277 current_sha = self.faft_client.kernel.get_sha(p) 1278 if backup_sha != current_sha: 1279 changed = True 1280 logging.info('Kernel %s is changed', p) 1281 return changed 1282 1283 def backup_kernel(self, suffix='.original'): 1284 """Backup kernel to files, and the send them to host. 1285 1286 @param suffix: a string appended to backup file name. 1287 """ 1288 remote_temp_dir = self.faft_client.system.create_temp_dir() 1289 for p in ('A', 'B'): 1290 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1291 self.faft_client.kernel.dump(p, remote_path) 1292 self._client.get_file( 1293 remote_path, 1294 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix))) 1295 self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p) 1296 logging.info('Backup kernel stored in %s with suffix %s', 1297 self.resultsdir, suffix) 1298 1299 def is_kernel_saved(self): 1300 """Check if kernel images are saved (backup_kernel called before). 1301 1302 @return: True if the kernel is saved; otherwise, False. 1303 """ 1304 return len(self._backup_kernel_sha) != 0 1305 1306 def clear_saved_kernel(self): 1307 """Clear the kernel saved by backup_kernel().""" 1308 self._backup_kernel_sha = dict() 1309 1310 def restore_kernel(self, suffix='.original'): 1311 """Restore kernel from host in resultsdir. 1312 1313 @param suffix: a string appended to backup file name. 1314 """ 1315 if not self.is_kernel_changed(): 1316 return 1317 1318 # Backup current corrupted kernel. 1319 self.backup_kernel(suffix='.corrupt') 1320 1321 # Restore kernel. 1322 remote_temp_dir = self.faft_client.system.create_temp_dir() 1323 for p in ('A', 'B'): 1324 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1325 self._client.send_file( 1326 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)), 1327 remote_path) 1328 self.faft_client.kernel.write(p, remote_path) 1329 1330 self.switcher.mode_aware_reboot() 1331 logging.info('Successfully restored kernel.') 1332 1333 def backup_cgpt_attributes(self): 1334 """Backup CGPT partition table attributes.""" 1335 self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes() 1336 1337 def restore_cgpt_attributes(self): 1338 """Restore CGPT partition table attributes.""" 1339 current_table = self.faft_client.cgpt.get_attributes() 1340 if current_table == self._backup_cgpt_attr: 1341 return 1342 logging.info('CGPT table is changed. Original: %r. Current: %r.', 1343 self._backup_cgpt_attr, 1344 current_table) 1345 self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr) 1346 1347 self.switcher.mode_aware_reboot() 1348 logging.info('Successfully restored CGPT table.') 1349 1350 def try_fwb(self, count=0): 1351 """set to try booting FWB count # times 1352 1353 Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for 1354 vboot2 1355 1356 @param count: an integer specifying value to program into 1357 fwb_tries(vb1)/fw_try_next(vb2) 1358 """ 1359 if self.fw_vboot2: 1360 self.faft_client.system.set_fw_try_next('B', count) 1361 else: 1362 # vboot1: we need to boot into fwb at least once 1363 if not count: 1364 count = count + 1 1365 self.faft_client.system.set_try_fw_b(count) 1366 1367