1# Lint as: python2, python3
2# Copyright 2018 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Helper class for managing charging the DUT with Servo v4."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12import logging
13from six.moves import range
14import time
15
16from autotest_lib.client.common_lib import error
17from autotest_lib.client.common_lib.cros import retry
18
# Tuning for the retry loops around Servo role change and PD negotiation.
# Base delay between checks, in seconds.
_DELAY_SEC = 0.1
# Overall time budget for the checks, in minutes.
_TIMEOUT_MIN = 0.3
# Exponential backoff factor applied to the delay.
_BACKOFF = 2
# Number of attempts made to get Servo v4 into the requested role.
_RETRYS = 3
# Pause, in seconds, after resetting the role during a recovery attempt and
# before trying to set the intended role again.
_RECOVERY_WAIT_SEC = 1
# Pause, in seconds, before polling whether the role has been changed
# successfully.
_ROLE_SETTLING_DELAY_SEC = 1
# Time budget, in minutes, for checking AC information over ssh.
# The Ethernet connection through the v4 flickers on role change: the USB
# adapter needs to re-enumerate and the DUT needs to reconnect before the
# information can be queried. This timeout has proven sufficient to overcome
# that in the current implementation.
_ETH_REENUMERATE_TIMEOUT_MIN = 1
38
39
def _invert_role(role):
    """Return the opposite Servo v4 power role.

    @param role: role to invert, normally 'src' or 'snk'.

    @returns:
      'src' when |role| is 'snk'
      'snk' for any other value, including 'src'
    """
    if role == 'snk':
        return 'src'
    return 'snk'
50
class ServoV4ChargeManager(object):
    """A helper class for managing charging the DUT with Servo v4.

    Charging is driven through the 'servo_v4_role' servod control: setting it
    to 'src' connects the DUT to AC power, 'snk' disconnects it.
    """

    def __init__(self, host, servo):
        """Check for correct Servo setup.

        Make sure that Servo is v4 and can manage charging. Make sure that DUT
        responds to Servo charging commands by toggling the role once in each
        direction, ending on the role Servo v4 had originally.

        @param host: CrosHost object representing the DUT or None.
                     If host is None, then the is_ac_connected check on the
                     host object is skipped.
        @param servo: a proxy for servod.

        @raises error.TestNAError: if the servo setup does not support PD
                                   control or reports an unrecognized role.
        @raises error.TestError: if a role change could not be completed
                                 within the allowed number of attempts.
        """
        super(ServoV4ChargeManager, self).__init__()
        self._host = host
        self._servo = servo
        if not self._servo.supports_built_in_pd_control():
            raise error.TestNAError('Servo setup does not support PD control. '
                                    'Check logs for details.')

        # Remember the starting role so restore_original_setting() can put it
        # back later.
        self._original_role = self._servo.get('servo_v4_role')
        if self._original_role == 'snk':
            self.start_charging()
            self.stop_charging()
        elif self._original_role == 'src':
            self.stop_charging()
            self.start_charging()
        else:
            raise error.TestNAError('Unrecognized Servo v4 power role: %s.' %
                                    self._original_role)

    # TODO(b/129882930): once both sides are stable, remove the _retry_wrapper
    # wrappers as they aren't needed anymore. The current motivation for the
    # retry loop in the autotest framework is to have a 'stable' library i.e.
    # retries, but also a mechanism that is an easy-to-remove bridge once the
    # bug is fixed and we don't require the bandaid anymore.

    def _retry_wrapper(self, role, verify):
        """Try up to |_RETRYS| times to set the v4 to |role|.

        After every failed attempt except the last one, the role is cycled
        through the opposite state before retrying.

        @param role: string 'src' or 'snk'. If 'src' connect DUT to AC power; if
                     'snk' disconnect DUT from AC power.
        @param verify: bool to verify that charging started/stopped.

        @returns: number of retries needed for success

        @raises error.TestError: the error from the last attempt, if every
                                 attempt failed.
        """
        for attempt in range(_RETRYS):
            try:
                self._change_role(role, verify)
                return attempt
            except error.TestError as e:
                if attempt == _RETRYS - 1:
                    # Out of attempts. Re-raise from inside the handler:
                    # Python 3 unbinds |e| once the except clause ends, so
                    # raising it after the loop would fail with a NameError.
                    logging.error('Giving up on %s.', role)
                    raise
                logging.warning('Failed to set to %s %d times. %s '
                                'Trying to cycle through %s to '
                                'recover.', role, attempt + 1, str(e),
                                _invert_role(role))
                # Cycle through the other state before retrying. Do not
                # verify as this is strictly a recovery mechanism - sleep
                # instead.
                self._change_role(_invert_role(role), verify=False)
                time.sleep(_RECOVERY_WAIT_SEC)

    def stop_charging(self, verify=True):
        """Cut off AC power supply to DUT with Servo.

        @param verify: whether to verify that charging stopped.

        @returns: number of retries needed for success

        @raises error.TestError: if the role could not be set to 'snk'.
        """
        return self._retry_wrapper('snk', verify)

    def start_charging(self, verify=True):
        """Connect AC power supply to DUT with Servo.

        @param verify: whether to verify that charging started.

        @returns: number of retries needed for success

        @raises error.TestError: if the role could not be set to 'src'.
        """
        return self._retry_wrapper('src', verify)

    def restore_original_setting(self, verify=True):
        """Restore Servo to original charging setting.

        The original setting is the role read from servod when this object
        was constructed.

        @param verify: whether to verify that original role was restored.

        @raises error.TestError: if the original role could not be restored.
        """
        self._retry_wrapper(self._original_role, verify)

    def _change_role(self, role, verify=True):
        """Change Servo PD role and check if DUT responded accordingly.

        With |verify| set, three checks run in order, each retried with
        backoff: the role reported by servod, the EC's view of the charger
        (when servod offers the 'charger_connected' control), and the DUT's
        own view of AC power (when a host was given and is up).

        @param role: string 'src' or 'snk'. If 'src' connect DUT to AC power; if
                     'snk' disconnect DUT from AC power.
        @param verify: bool to verify that charging started/stopped.

        @raises error.TestError: if the role did not change successfully.
        """
        self._servo.set_nocheck('servo_v4_role', role)
        # Sometimes the role reverts quickly. Add a short delay to let the new
        # role stabilize.
        time.sleep(_ROLE_SETTLING_DELAY_SEC)

        if not verify:
            return

        @retry.retry(error.TestError, timeout_min=_TIMEOUT_MIN,
                     delay_sec=_DELAY_SEC, backoff=_BACKOFF)
        def check_servo_role(role):
            """Check if servo role is as expected, if not, retry."""
            if self._servo.get('servo_v4_role') != role:
                raise error.TestError('Servo v4 failed to set its PD role to '
                                      '%s.' % role)
        check_servo_role(role)

        # The DUT should see a charger exactly when Servo is sourcing power.
        connected = role == 'src'

        @retry.retry(error.TestError, timeout_min=_TIMEOUT_MIN,
                     delay_sec=_DELAY_SEC, backoff=_BACKOFF)
        def check_ac_connected(connected):
            """Check if the EC believes an AC charger is connected."""
            if not self._servo.has_control('charger_connected'):
                # TODO(coconutruben): remove this check once labs have the
                # latest hdctools with the required control.
                logging.warning('Could not verify %r control as the '
                                'control is not available on servod.',
                                'charger_connected')
                return
            ec_opinion = self._servo.get('charger_connected')
            if ec_opinion != connected:
                str_lookup = {True: 'connected', False: 'disconnected'}
                msg = ('EC thinks charger is %s but it should be %s.'
                       % (str_lookup[ec_opinion],
                          str_lookup[connected]))
                raise error.TestError(msg)

        check_ac_connected(connected)

        @retry.retry(error.TestError, timeout_min=_ETH_REENUMERATE_TIMEOUT_MIN,
                     delay_sec=_DELAY_SEC, backoff=_BACKOFF)
        def check_host_ac(connected):
            """Check if DUT AC power is as expected, if not, retry."""
            if self._host.is_ac_connected() != connected:
                intent = 'connect' if connected else 'disconnect'
                raise error.TestError('DUT failed to %s AC power.'% intent)

        if self._host and self._host.is_up_fast():
            # If the DUT has been charging in S3/S5/G3, cannot verify.
            check_host_ac(connected)
204