1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import re
7
8from collections import defaultdict
9
10from autotest_lib.client.common_lib import error
11from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
12
class firmware_PDProtocol(FirmwareTest):
    """
    Servo based USB PD protocol test.

    A charger must be connected to the DUT for this test.

    This test checks that when an appropriate zinger charger is connected that
    the PD is properly negotiated in dev mode and when booted from a test image
    through recovery that the PD is not negotiated.

    Example:
    PD Successfully negotiated
    - ectool usbpdpower should output Charger PD

    PD not negotiated
    - ectool usbpdpower should not output Charger PD

    """
    version = 1

    # Regex searched for in the ectool output when PD has been negotiated.
    NEGOTIATED_PATTERN = 'Charger PD'
    # Regex searched for in the ectool output when the EC rejects the command.
    PD_NOT_SUPPORTED_PATTERN = 'INVALID_COMMAND'

    # Maps a board name to the ectool command line used to query PD power.
    # Boards without an explicit entry fall back to the default command.
    ECTOOL_CMD_DICT = defaultdict(lambda: 'ectool usbpdpower',
                                  samus='ectool --dev=1 usbpdpower')

    def initialize(self, host, cmdline_args):
        """Initialize the test.

        Records the board name, skips the test if the DUT does not support
        the PD query, switches to dev mode, sets up the USB key and remembers
        the original dev_boot_usb value so that cleanup() can restore it.

        @param host: Forwarded unchanged to FirmwareTest.initialize()
        @param cmdline_args: Forwarded unchanged to FirmwareTest.initialize()

        @raise error.TestNAError: if check_if_pd_supported() decides that PD
                is not supported on the DUT
        """
        super(firmware_PDProtocol, self).initialize(host, cmdline_args)

        self.current_board = self.servo.get_board()

        self.check_if_pd_supported()
        self.switcher.setup_mode('dev')
        # The USB disk is used for recovery. But this test wants a fine-grained
        # control, i.e. swapping the role just before booting into recovery,
        # not swapping here. So set used_for_recovery=False.
        self.setup_usbkey(usbkey=True, host=False, used_for_recovery=False)

        self.original_dev_boot_usb = self.faft_client.system.get_dev_boot_usb()
        logging.info('Original dev_boot_usb value: %s',
                     str(self.original_dev_boot_usb))

    def cleanup(self):
        """Cleanup the test.

        Restores the dev_boot_usb value recorded by initialize() (when there
        is one) and then always runs the parent class cleanup.
        """
        try:
            # initialize() can abort (e.g. with TestNAError) before the
            # original value has been recorded; there is nothing to restore
            # in that case and touching the attribute would raise.
            if hasattr(self, 'original_dev_boot_usb'):
                self.ensure_dev_internal_boot(self.original_dev_boot_usb)
        finally:
            # Run the parent cleanup even if restoring the boot setting fails.
            super(firmware_PDProtocol, self).cleanup()

    def _run_pd_power_command(self):
        """Runs the board specific ectool PD power query on the DUT.

        @return A list of strings of the command output
        """
        return self.run_command(self.ECTOOL_CMD_DICT[self.current_board])

    def check_if_pd_supported(self):
        """ Checks if the DUT responds to ectool usbpdpower and skips the test
        if it isn't supported on the device.

        @raise error.TestNAError: if the command produced no output or the
                output matches PD_NOT_SUPPORTED_PATTERN
        """
        output = self._run_pd_power_command()

        if (not output or
            self.check_ec_output(output, self.PD_NOT_SUPPORTED_PATTERN)):
            raise error.TestNAError("PD not supported skipping test.")

    def boot_to_recovery(self):
        """Boot device into recovery mode and verify mainfw_type is recovery.
        """
        logging.info('Reboot into Recovery...')
        self.switcher.reboot_to_mode(to_mode='rec')

        self.check_state((self.checkers.crossystem_checker,
                          {'mainfw_type': 'recovery'}))

    def run_command(self, command):
        """Runs the specified command and returns the output
        as a list of strings.

        @param command: The command to run on the DUT
        @return A list of strings of the command output
        """
        logging.info('Command to run: %s', command)

        output = self.faft_client.system.run_shell_command_get_output(command)

        logging.info('Command output: %s', output)

        return output

    def check_ec_output(self, output, pattern):
        """Checks if any line in the output matches the given pattern.

        @param output: A list of strings containing the output to search
        @param pattern: The regex to search the output for

        @return True upon first match found or False
        """
        logging.info('Checking %s for %s.', output, pattern)

        # any() stops at the first matching line, like the explicit loop did.
        return any(re.search(pattern, line) for line in output)

    def run_once(self):
        """Main test logic.

        First checks that PD is reported as negotiated after ensuring the
        dev internal boot state, then switches the servo v4 role to snk,
        boots to recovery and checks that PD is no longer reported as
        negotiated.

        @raise error.TestFail: if either expectation is not met
        """
        self.ensure_dev_internal_boot(self.original_dev_boot_usb)
        output = self._run_pd_power_command()

        if not self.check_ec_output(output, self.NEGOTIATED_PATTERN):
            # Interpolate here: the exception does not apply %-formatting to
            # extra positional arguments the way the logging calls do.
            raise error.TestFail(
                'ectool usbpdpower output %s did not match %s' %
                (output, self.NEGOTIATED_PATTERN))

        self.set_servo_v4_role_to_snk()
        self.boot_to_recovery()
        output = self._run_pd_power_command()

        if self.check_ec_output(output, self.NEGOTIATED_PATTERN):
            raise error.TestFail(
                'ectool usbpdpower output %s matched %s' %
                (output, self.NEGOTIATED_PATTERN))
131
132