# Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import time import sys from multiprocessing import Process from autotest_lib.client.bin import utils from autotest_lib.client.common_lib import error from autotest_lib.client.cros.faft.utils import shell_wrapper class ConnectionError(Exception): """Raised on an error of connecting DUT.""" pass class _BaseFwBypasser(object): """Base class that controls bypass logic for firmware screens.""" def __init__(self, faft_framework): self.servo = faft_framework.servo self.faft_config = faft_framework.faft_config self.client_host = faft_framework._client def bypass_dev_mode(self): """Bypass the dev mode firmware logic to boot internal image.""" raise NotImplementedError def bypass_dev_boot_usb(self): """Bypass the dev mode firmware logic to boot USB.""" raise NotImplementedError def bypass_rec_mode(self): """Bypass the rec mode firmware logic to boot USB.""" raise NotImplementedError def trigger_dev_to_rec(self): """Trigger to the rec mode from the dev screen.""" raise NotImplementedError def trigger_rec_to_dev(self): """Trigger to the dev mode from the rec screen.""" raise NotImplementedError def trigger_dev_to_normal(self): """Trigger to the normal mode from the dev screen.""" raise NotImplementedError class _CtrlDBypasser(_BaseFwBypasser): """Controls bypass logic via Ctrl-D combo.""" def bypass_dev_mode(self): """Bypass the dev mode firmware logic to boot internal image.""" time.sleep(self.faft_config.firmware_screen) self.servo.ctrl_d() def bypass_dev_boot_usb(self): """Bypass the dev mode firmware logic to boot USB.""" time.sleep(self.faft_config.firmware_screen) self.servo.ctrl_u() def bypass_rec_mode(self): """Bypass the rec mode firmware logic to boot USB.""" self.servo.switch_usbkey('host') time.sleep(self.faft_config.usb_plug) self.servo.switch_usbkey('dut') if not self.client_host.ping_wait_up( timeout=self.faft_config.delay_reboot_to_ping): psc = self.servo.get_power_state_controller() psc.power_on(psc.REC_ON) def trigger_dev_to_rec(self): """Trigger to the rec mode from the dev screen.""" time.sleep(self.faft_config.firmware_screen) # Pressing Enter for too long triggers a second key press. # Let's press it without delay self.servo.enter_key(press_secs=0) # For Alex/ZGB, there is a dev warning screen in text mode. # Skip it by pressing Ctrl-D. if self.faft_config.need_dev_transition: time.sleep(self.faft_config.legacy_text_screen) self.servo.ctrl_d() def trigger_rec_to_dev(self): """Trigger to the dev mode from the rec screen.""" time.sleep(self.faft_config.firmware_screen) self.servo.ctrl_d() time.sleep(self.faft_config.confirm_screen) if self.faft_config.rec_button_dev_switch: logging.info('RECOVERY button pressed to switch to dev mode') self.servo.toggle_recovery_switch() else: logging.info('ENTER pressed to switch to dev mode') self.servo.enter_key() def trigger_dev_to_normal(self): """Trigger to the normal mode from the dev screen.""" time.sleep(self.faft_config.firmware_screen) self.servo.enter_key() time.sleep(self.faft_config.confirm_screen) self.servo.enter_key() class _JetstreamBypasser(_BaseFwBypasser): """Controls bypass logic of Jetstream devices.""" def bypass_dev_mode(self): """Bypass the dev mode firmware logic to boot internal image.""" # Jetstream does nothing to bypass. pass def bypass_dev_boot_usb(self): """Bypass the dev mode firmware logic to boot USB.""" self.servo.switch_usbkey('dut') time.sleep(self.faft_config.firmware_screen) self.servo.toggle_development_switch() def bypass_rec_mode(self): """Bypass the rec mode firmware logic to boot USB.""" self.servo.switch_usbkey('host') time.sleep(self.faft_config.usb_plug) self.servo.switch_usbkey('dut') if not self.client_host.ping_wait_up( timeout=self.faft_config.delay_reboot_to_ping): psc = self.servo.get_power_state_controller() psc.power_on(psc.REC_ON) def trigger_dev_to_rec(self): """Trigger to the rec mode from the dev screen.""" # Jetstream does not have this triggering logic. raise NotImplementedError def trigger_rec_to_dev(self): """Trigger to the dev mode from the rec screen.""" self.servo.disable_development_mode() time.sleep(self.faft_config.firmware_screen) self.servo.toggle_development_switch() def trigger_dev_to_normal(self): """Trigger to the normal mode from the dev screen.""" # Jetstream does not have this triggering logic. raise NotImplementedError def _create_fw_bypasser(faft_framework): """Creates a proper firmware bypasser. @param faft_framework: The main FAFT framework object. """ bypasser_type = faft_framework.faft_config.fw_bypasser_type if bypasser_type == 'ctrl_d_bypasser': logging.info('Create a CtrlDBypasser') return _CtrlDBypasser(faft_framework) elif bypasser_type == 'jetstream_bypasser': logging.info('Create a JetstreamBypasser') return _JetstreamBypasser(faft_framework) elif bypasser_type == 'ryu_bypasser': # FIXME Create an RyuBypasser logging.info('Create a CtrlDBypasser') return _CtrlDBypasser(faft_framework) else: raise NotImplementedError('Not supported fw_bypasser_type: %s', bypasser_type) class _BaseModeSwitcher(object): """Base class that controls firmware mode switching.""" def __init__(self, faft_framework): self.faft_framework = faft_framework self.client_host = faft_framework._client self.faft_client = faft_framework.faft_client self.servo = faft_framework.servo self.faft_config = faft_framework.faft_config self.checkers = faft_framework.checkers self.bypasser = _create_fw_bypasser(faft_framework) self._backup_mode = None def setup_mode(self, mode): """Setup for the requested mode. It makes sure the system in the requested mode. If not, it tries to do so. @param mode: A string of mode, one of 'normal', 'dev', or 'rec'. """ if not self.checkers.mode_checker(mode): logging.info('System not in expected %s mode. Reboot into it.', mode) if self._backup_mode is None: # Only resume to normal/dev mode after test, not recovery. self._backup_mode = 'dev' if mode == 'normal' else 'normal' self.reboot_to_mode(mode) def restore_mode(self): """Restores original dev mode status if it has changed.""" if self._backup_mode is not None: self.reboot_to_mode(self._backup_mode) def reboot_to_mode(self, to_mode, from_mode=None, sync_before_boot=True, wait_for_dut_up=True): """Reboot and execute the mode switching sequence. @param to_mode: The target mode, one of 'normal', 'dev', or 'rec'. @param from_mode: The original mode, optional, one of 'normal, 'dev', or 'rec'. @param sync_before_boot: True to sync to disk before booting. @param wait_for_dut_up: True to wait DUT online again. False to do the reboot and mode switching sequence only and may need more operations to pass the firmware screen. """ logging.info('-[ModeSwitcher]-[ start reboot_to_mode(%r, %r, %r) ]-', to_mode, from_mode, wait_for_dut_up) if sync_before_boot: self.faft_framework.blocking_sync() if to_mode == 'rec': self._enable_rec_mode_and_reboot(usb_state='dut') if wait_for_dut_up: self.wait_for_client() elif to_mode == 'dev': self._enable_dev_mode_and_reboot() if wait_for_dut_up: self.bypass_dev_mode() self.wait_for_client() elif to_mode == 'normal': self._enable_normal_mode_and_reboot() if wait_for_dut_up: self.wait_for_client() else: raise NotImplementedError( 'Not supported mode switching from %s to %s' % (str(from_mode), to_mode)) logging.info('-[ModeSwitcher]-[ end reboot_to_mode(%r, %r, %r) ]-', to_mode, from_mode, wait_for_dut_up) def simple_reboot(self, reboot_type='warm', sync_before_boot=True): """Simple reboot method Just reboot the DUT using either cold or warm reset. Does not wait for DUT to come back online. Will wait for test to handle this. @param reboot_type: A string of reboot type, 'warm' or 'cold'. If reboot_type != warm/cold, raise exception. @param sync_before_boot: True to sync to disk before booting. If sync_before_boot=False, DUT offline before calling mode_aware_reboot. """ if reboot_type == 'warm': reboot_method = self.servo.get_power_state_controller().warm_reset elif reboot_type == 'cold': reboot_method = self.servo.get_power_state_controller().reset else: raise NotImplementedError('Not supported reboot_type: %s', reboot_type) if sync_before_boot: boot_id = self.faft_framework.get_bootid() self.faft_framework.blocking_sync() logging.info("-[ModeSwitcher]-[ start simple_reboot(%r) ]-", reboot_type) reboot_method() if sync_before_boot: self.wait_for_client_offline(orig_boot_id=boot_id) logging.info("-[ModeSwitcher]-[ end simple_reboot(%r) ]-", reboot_type) def mode_aware_reboot(self, reboot_type=None, reboot_method=None, sync_before_boot=True, wait_for_dut_up=True): """Uses a mode-aware way to reboot DUT. For example, if DUT is in dev mode, it requires pressing Ctrl-D to bypass the developer screen. @param reboot_type: A string of reboot type, one of 'warm', 'cold', or 'custom'. Default is a warm reboot. @param reboot_method: A custom method to do the reboot. Only use it if reboot_type='custom'. @param sync_before_boot: True to sync to disk before booting. If sync_before_boot=False, DUT offline before calling mode_aware_reboot. @param wait_for_dut_up: True to wait DUT online again. False to do the reboot only. """ if reboot_type is None or reboot_type == 'warm': reboot_method = self.servo.get_power_state_controller().warm_reset elif reboot_type == 'cold': reboot_method = self.servo.get_power_state_controller().reset elif reboot_type != 'custom': raise NotImplementedError('Not supported reboot_type: %s', reboot_type) logging.info("-[ModeSwitcher]-[ start mode_aware_reboot(%r, %s, ..) ]-", reboot_type, reboot_method.__name__) is_dev = False if sync_before_boot: is_dev = self.checkers.mode_checker('dev') boot_id = self.faft_framework.get_bootid() self.faft_framework.blocking_sync() logging.info("-[mode_aware_reboot]-[ is_dev=%s ]-", is_dev); reboot_method() if sync_before_boot: self.wait_for_client_offline(orig_boot_id=boot_id) # Encapsulating the behavior of skipping dev firmware screen, # hitting ctrl-D # Note that if booting from recovery mode, will not # call bypass_dev_mode because can't determine prior to # reboot if we're going to boot up in dev or normal mode. if is_dev: self.bypass_dev_mode() if wait_for_dut_up: self.wait_for_client() logging.info("-[ModeSwitcher]-[ end mode_aware_reboot(%r, %s, ..) ]-", reboot_type, reboot_method.__name__) def _enable_rec_mode_and_reboot(self, usb_state=None): """Switch to rec mode and reboot. This method emulates the behavior of the old physical recovery switch, i.e. switch ON + reboot + switch OFF, and the new keyboard controlled recovery mode, i.e. just press Power + Esc + Refresh. @param usb_state: A string, one of 'dut', 'host', or 'off'. """ psc = self.servo.get_power_state_controller() psc.power_off() if usb_state: self.servo.switch_usbkey(usb_state) psc.power_on(psc.REC_ON) def _disable_rec_mode_and_reboot(self, usb_state=None): """Disable the rec mode and reboot. It is achieved by calling power state controller to do a normal power on. """ psc = self.servo.get_power_state_controller() psc.power_off() time.sleep(self.faft_config.ec_boot_to_pwr_button) psc.power_on(psc.REC_OFF) def _enable_dev_mode_and_reboot(self): """Switch to developer mode and reboot.""" raise NotImplementedError def _enable_normal_mode_and_reboot(self): """Switch to normal mode and reboot.""" raise NotImplementedError # Redirects the following methods to FwBypasser def bypass_dev_mode(self): """Bypass the dev mode firmware logic to boot internal image.""" logging.info("-[bypass_dev_mode]-") self.bypasser.bypass_dev_mode() def bypass_dev_boot_usb(self): """Bypass the dev mode firmware logic to boot USB.""" logging.info("-[bypass_dev_boot_usb]-") self.bypasser.bypass_dev_boot_usb() def bypass_rec_mode(self): """Bypass the rec mode firmware logic to boot USB.""" logging.info("-[bypass_rec_mode]-") self.bypasser.bypass_rec_mode() def trigger_dev_to_rec(self): """Trigger to the rec mode from the dev screen.""" self.bypasser.trigger_dev_to_rec() def trigger_rec_to_dev(self): """Trigger to the dev mode from the rec screen.""" self.bypasser.trigger_rec_to_dev() def trigger_dev_to_normal(self): """Trigger to the normal mode from the dev screen.""" self.bypasser.trigger_dev_to_normal() def wait_for_client(self, timeout=180): """Wait for the client to come back online. New remote processes will be launched if their used flags are enabled. @param timeout: Time in seconds to wait for the client SSH daemon to come up. @raise ConnectionError: Failed to connect DUT. """ logging.info("-[FAFT]-[ start wait_for_client ]---") # Wait for the system to respond to ping before attempting ssh if not self.client_host.ping_wait_up(timeout): logging.warning("-[FAFT]-[ system did not respond to ping ]") if self.client_host.wait_up(timeout): # Check the FAFT client is avaiable. self.faft_client.system.is_available() # Stop update-engine as it may change firmware/kernel. self.faft_framework._stop_service('update-engine') else: logging.error('wait_for_client() timed out.') raise ConnectionError() logging.info("-[FAFT]-[ end wait_for_client ]-----") def wait_for_client_offline(self, timeout=60, orig_boot_id=None): """Wait for the client to come offline. @param timeout: Time in seconds to wait the client to come offline. @param orig_boot_id: A string containing the original boot id. @raise ConnectionError: Failed to wait DUT offline. """ # When running against panther, we see that sometimes # ping_wait_down() does not work correctly. There needs to # be some investigation to the root cause. # If we sleep for 120s before running get_boot_id(), it # does succeed. But if we change this to ping_wait_down() # there are implications on the wait time when running # commands at the fw screens. if not self.client_host.ping_wait_down(timeout): if orig_boot_id and self.client_host.get_boot_id() != orig_boot_id: logging.warn('Reboot done very quickly.') return raise ConnectionError() class _PhysicalButtonSwitcher(_BaseModeSwitcher): """Class that switches firmware mode via physical button.""" def _enable_dev_mode_and_reboot(self): """Switch to developer mode and reboot.""" self.servo.enable_development_mode() self.faft_client.system.run_shell_command( 'chromeos-firmwareupdate --mode todev && reboot') def _enable_normal_mode_and_reboot(self): """Switch to normal mode and reboot.""" self.servo.disable_development_mode() self.faft_client.system.run_shell_command( 'chromeos-firmwareupdate --mode tonormal && reboot') class _KeyboardDevSwitcher(_BaseModeSwitcher): """Class that switches firmware mode via keyboard combo.""" def _enable_dev_mode_and_reboot(self): """Switch to developer mode and reboot.""" logging.info("Enabling keyboard controlled developer mode") # Rebooting EC with rec mode on. Should power on AP. # Plug out USB disk for preventing recovery boot without warning self._enable_rec_mode_and_reboot(usb_state='host') self.wait_for_client_offline() self.bypasser.trigger_rec_to_dev() def _enable_normal_mode_and_reboot(self): """Switch to normal mode and reboot.""" logging.info("Disabling keyboard controlled developer mode") self._disable_rec_mode_and_reboot() self.wait_for_client_offline() self.bypasser.trigger_dev_to_normal() class _JetstreamSwitcher(_BaseModeSwitcher): """Class that switches firmware mode in Jetstream devices.""" def _enable_dev_mode_and_reboot(self): """Switch to developer mode and reboot.""" logging.info("Enabling Jetstream developer mode") self._enable_rec_mode_and_reboot(usb_state='host') self.wait_for_client_offline() self.bypasser.trigger_rec_to_dev() def _enable_normal_mode_and_reboot(self): """Switch to normal mode and reboot.""" logging.info("Disabling Jetstream developer mode") self.servo.disable_development_mode() self._enable_rec_mode_and_reboot(usb_state='host') time.sleep(self.faft_config.firmware_screen) self._disable_rec_mode_and_reboot(usb_state='host') class _RyuSwitcher(_BaseModeSwitcher): """Class that switches firmware mode via physical button.""" FASTBOOT_OEM_DELAY = 10 RECOVERY_TIMEOUT = 2400 RECOVERY_SETUP = 60 ANDROID_BOOTUP = 600 FWTOOL_STARTUP_DELAY = 30 def wait_for_client(self, timeout=180): """Wait for the client to come back online. New remote processes will be launched if their used flags are enabled. @param timeout: Time in seconds to wait for the client SSH daemon to come up. @raise ConnectionError: Failed to connect DUT. """ if not self.faft_client.system.wait_for_client(timeout): raise ConnectionError() # there's a conflict between fwtool and crossystem trying to access # the nvram after the OS boots up. Temporarily put a hard wait of # 30 seconds to try to wait for fwtool to finish up. time.sleep(self.FWTOOL_STARTUP_DELAY) def wait_for_client_offline(self, timeout=60, orig_boot_id=None): """Wait for the client to come offline. @param timeout: Time in seconds to wait the client to come offline. @param orig_boot_id: A string containing the original boot id. @raise ConnectionError: Failed to wait DUT offline. """ # TODO: Add a way to check orig_boot_id if not self.faft_client.system.wait_for_client_offline(timeout): raise ConnectionError() def print_recovery_warning(self): """Print recovery warning""" logging.info("***") logging.info("*** Entering recovery mode. This may take awhile ***") logging.info("***") # wait a minute for DUT to get settled into wipe stage time.sleep(self.RECOVERY_SETUP) def is_fastboot_mode(self): """Return True if DUT in fastboot mode, False otherwise""" result = self.faft_client.host.run_shell_command_get_output( 'fastboot devices') if not result: return False else: return True def wait_for_client_fastboot(self, timeout=30): """Wait for the client to come online in fastboot mode @param timeout: Time in seconds to wait the client @raise ConnectionError: Failed to wait DUT offline. """ utils.wait_for_value(self.is_fastboot_mode, True, timeout_sec=timeout) def _run_cmd(self, args): """Wrapper for run_shell_command For Process creation """ return self.faft_client.host.run_shell_command(args) def _enable_dev_mode_and_reboot(self): """Switch to developer mode and reboot.""" logging.info("Entering RyuSwitcher: _enable_dev_mode_and_reboot") try: self.faft_client.system.run_shell_command('reboot bootloader') self.wait_for_client_fastboot() process = Process( target=self._run_cmd, args=('fastboot oem unlock',)) process.start() # need a slight delay to give the ap time to get into valid state time.sleep(self.FASTBOOT_OEM_DELAY) self.servo.power_key(self.faft_config.hold_pwr_button_poweron) process.join() self.print_recovery_warning() self.wait_for_client_fastboot(self.RECOVERY_TIMEOUT) self.faft_client.host.run_shell_command('fastboot continue') self.wait_for_client(self.ANDROID_BOOTUP) # need to reset DUT into clean state except shell_wrapper.ShellError: raise error.TestError('Error executing shell command') except ConnectionError: raise error.TestError('Timed out waiting for DUT to exit recovery') except: raise error.TestError('Unexpected Exception: %s' % sys.exc_info()[0]) logging.info("Exiting RyuSwitcher: _enable_dev_mode_and_reboot") def _enable_normal_mode_and_reboot(self): """Switch to normal mode and reboot.""" try: self.faft_client.system.run_shell_command('reboot bootloader') self.wait_for_client_fastboot() process = Process( target=self._run_cmd, args=('fastboot oem lock',)) process.start() # need a slight delay to give the ap time to get into valid state time.sleep(self.FASTBOOT_OEM_DELAY) self.servo.power_key(self.faft_config.hold_pwr_button_poweron) process.join() self.print_recovery_warning() self.wait_for_client_fastboot(self.RECOVERY_TIMEOUT) self.faft_client.host.run_shell_command('fastboot continue') self.wait_for_client(self.ANDROID_BOOTUP) # need to reset DUT into clean state except shell_wrapper.ShellError: raise error.TestError('Error executing shell command') except ConnectionError: raise error.TestError('Timed out waiting for DUT to exit recovery') except: raise error.TestError('Unexpected Exception: %s' % sys.exc_info()[0]) logging.info("Exiting RyuSwitcher: _enable_normal_mode_and_reboot") def create_mode_switcher(faft_framework): """Creates a proper mode switcher. @param faft_framework: The main FAFT framework object. """ switcher_type = faft_framework.faft_config.mode_switcher_type if switcher_type == 'physical_button_switcher': logging.info('Create a PhysicalButtonSwitcher') return _PhysicalButtonSwitcher(faft_framework) elif switcher_type == 'keyboard_dev_switcher': logging.info('Create a KeyboardDevSwitcher') return _KeyboardDevSwitcher(faft_framework) elif switcher_type == 'jetstream_switcher': logging.info('Create a JetstreamSwitcher') return _JetstreamSwitcher(faft_framework) elif switcher_type == 'ryu_switcher': logging.info('Create a RyuSwitcher') return _RyuSwitcher(faft_framework) else: raise NotImplementedError('Not supported mode_switcher_type: %s', switcher_type)