1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import re
6import logging
7import time
8
9from autotest_lib.client.common_lib import error
10
11class PDConsoleUtils(object):
12    """ Provides a set of methods common to USB PD FAFT tests
13
14    Each instance of this class is associated with a particular
15    servo UART console. USB PD tests will typically use the console
16    command 'pd' and its subcommands to control/monitor Type C PD
17    connections. The servo object used for UART operations is
18    passed in and stored when this object is created.
19
20    """
21
22    SRC_CONNECT = 'SRC_READY'
23    SNK_CONNECT = 'SNK_READY'
24    SRC_DISC = 'SRC_DISCONNECTED'
25    SNK_DISC = 'SNK_DISCONNECTED'
26    PD_MAX_PORTS = 2
27    CONNECT_TIME = 4
28
29    # dualrole input/ouput values
30    DUALROLE_QUERY_DELAY = 0.25
31    dual_index = {'on': 0, 'off': 1, 'snk': 2, 'src': 3}
32    dualrole_cmd = ['on', 'off', 'sink', 'source']
33    dualrole_resp = ['on', 'off', 'force sink', 'force source']
34
35    # Dictionary for 'pd 0/1 state' parsing
36    PD_STATE_DICT = {
37        'port': 'Port\s+([\w]+)',
38        'role': 'Role:\s+([\w]+-[\w]+)',
39        'pd_state': 'State:\s+([\w]+_[\w]+)',
40        'flags': 'Flags:\s+([\w]+)',
41        'polarity': '(CC\d)'
42    }
43
44    # Dictionary for PD control message types
45    PD_CONTROL_MSG_MASK = 0x1f
46    PD_CONTROL_MSG_DICT = {
47        'GoodCRC': 1,
48        'GotoMin': 2,
49        'Accept': 3,
50        'Reject': 4,
51        'Ping': 5,
52        'PS_RDY': 6,
53        'Get_Source_Cap': 7,
54        'Get_Sink_Cap': 8,
55        'DR_Swap': 9,
56        'PR_Swap': 10,
57        'VCONN_Swap': 11,
58        'Wait': 12,
59        'Soft_Reset': 13
60    }
61
62    # Dictionary for PD firmware state flags
63    PD_STATE_FLAGS_DICT = {
64        'power_swap': 1 << 1,
65        'data_swap': 1 << 2,
66        'data_swap_active': 1 << 3,
67        'vconn_on': 1 << 12
68    }
69
70    def __init__(self, console):
71        """ Console can be either usbpd, ec, or plankton_ec UART
72        This object with then be used by the class which creates
73        the PDConsoleUtils class to send/receive commands to UART
74        """
75        # save console for UART access functions
76        self.console = console
77
78    def send_pd_command(self, cmd):
79        """Send command to PD console UART
80
81        @param cmd: pd command string
82        """
83        self.console.send_command(cmd)
84
85    def send_pd_command_get_output(self, cmd, regexp):
86        """Send command to PD console, wait for response
87
88        @param cmd: pd command string
89        @param regexp: regular expression for desired output
90        """
91        return self.console.send_command_get_output(cmd, regexp)
92
93    def verify_pd_console(self):
94        """Verify that PD commands exist on UART console
95
96        Send 'help' command to UART console
97        @returns: True if 'pd' is found, False if not
98        """
99
100        l = self.console.send_command_get_output('help', ['(pd)\s+([\w]+)'])
101        if l[0][1] == 'pd':
102            return True
103        else:
104            return False
105
106    def execute_pd_state_cmd(self, port):
107        """Get PD state for specified channel
108
109        pd 0/1 state command gives produces 5 fields. The full response
110        line is captured and then parsed to extract each field to fill
111        the dict containing port, polarity, role, pd_state, and flags.
112
113        @param port: Type C PD port 0 or 1
114
115        @returns: A dict with the 5 fields listed above
116        """
117        cmd = 'pd'
118        subcmd = 'state'
119        pd_cmd = cmd +" " + str(port) + " " + subcmd
120        # Two FW versions for this command, get full line.
121        m = self.send_pd_command_get_output(pd_cmd,
122                                            ['(Port.*) - (Role:.*)\r'])
123
124        # Extract desired values from result string
125        state_result = {}
126        for key, regexp in self.PD_STATE_DICT.iteritems():
127            value = re.search(regexp, m[0][0])
128            if value:
129                state_result[key] = value.group(1)
130            else:
131                raise error.TestFail('pd 0/1 state: %r value not found' % (key))
132
133        return state_result
134
135    def get_pd_state(self, port):
136        """Get the current PD state
137
138        @param port: Type C PD port 0/1
139        @returns: current pd state
140        """
141
142        pd_dict = self.execute_pd_state_cmd(port)
143        return pd_dict['pd_state']
144
145    def get_pd_port(self, port):
146        """Get the current PD port
147
148        @param port: Type C PD port 0/1
149        @returns: current pd state
150        """
151        pd_dict = self.execute_pd_state_cmd(port)
152        return pd_dict['port']
153
154    def get_pd_role(self, port):
155        """Get the current PD power role (source or sink)
156
157        @param port: Type C PD port 0/1
158        @returns: current pd state
159        """
160        pd_dict = self.execute_pd_state_cmd(port)
161        return pd_dict['role']
162
163    def get_pd_flags(self, port):
164        """Get the current PD flags
165
166        @param port: Type C PD port 0/1
167        @returns: current pd state
168        """
169        pd_dict = self.execute_pd_state_cmd(port)
170        return pd_dict['flags']
171
172    def get_pd_dualrole(self):
173        """Get the current PD dualrole setting
174
175        @returns: current PD dualrole setting
176        """
177        cmd = 'pd dualrole'
178        dual_list = self.send_pd_command_get_output(cmd,
179                                ['dual-role toggling:\s+([\w ]+)'])
180        return dual_list[0][1]
181
182    def set_pd_dualrole(self, value):
183        """Set pd dualrole
184
185        It can be set to either:
186        1. on
187        2. off
188        3. snk (force sink mode)
189        4. src (force source mode)
190        After setting, the current value is read to confirm that it
191        was set properly.
192
193        @param value: One of the 4 options listed
194        """
195        # Get string required for console command
196        dual_index = self.dual_index[value]
197        # Create console command
198        cmd = 'pd dualrole ' + self.dualrole_cmd[dual_index]
199        self.console.send_command(cmd)
200        time.sleep(self.DUALROLE_QUERY_DELAY)
201        # Get current setting to verify that command was successful
202        dual = self.get_pd_dualrole()
203        # If it doesn't match, then raise error
204        if dual != self.dualrole_resp[dual_index]:
205            raise error.TestFail("dualrole error: " +
206                                 self.dualrole_resp[dual_index] + " != "+dual)
207
208    def query_pd_connection(self):
209        """Determine if PD connection is present
210
211        Try the 'pd 0/1 state' command and see if it's in either
212        expected state of a conneciton. Record the port number
213        that has an active connection
214
215        @returns: dict with params port, connect, and state
216        """
217        status = {}
218        port = 0;
219        status['connect'] = False
220        status['port'] = port
221        state = self.get_pd_state(port)
222        # Check port 0 first
223        if state == self.SRC_CONNECT or state == self.SNK_CONNECT:
224            status['connect'] = True
225            status['role'] = state
226        else:
227            port = 1
228            status['port'] = port
229            state = self.get_pd_state(port)
230            # Check port 1
231            if state == self.SRC_CONNECT or state == self.SNK_CONNECT:
232                status['connect'] = True
233                status['role'] = state
234
235        return status
236
237    def disable_pd_console_debug(self):
238        """Turn off PD console debug
239
240        """
241        cmd = 'pd dump 0'
242        self.send_pd_command(cmd)
243
244    def enable_pd_console_debug(self):
245        """Enable PD console debug level 1
246
247        """
248        cmd = 'pd dump 1'
249        self.send_pd_command(cmd)
250
251    def is_pd_flag_set(self, port, key):
252        """Test a bit in PD protocol state flags
253
254        The flag word contains various PD protocol state information.
255        This method allows for a specific flag to be tested.
256
257        @param port: Port which has the active PD connection
258        @param key: dict key to retrieve the flag bit mapping
259
260        @returns True if the bit to be tested is set
261        """
262        pd_flags = self.get_pd_flags(port)
263        return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16))
264
265    def is_pd_connected(self, port):
266        """Check if a PD connection is active
267
268        @param port: port to be used for pd console commands
269
270        @returns True if port is in connected state
271        """
272        state = self.get_pd_state(port)
273        return bool(state == self.SRC_CONNECT or state == self.SNK_CONNECT)
274
275    def is_pd_dual_role_enabled(self):
276        """Check if a PD device is in dualrole mode
277
278        @returns True is dualrole mode is active, false otherwise
279        """
280        drp = self.get_pd_dualrole()
281        return bool(drp == self.dualrole_resp[self.dual_index['on']])
282
283
284class PDConnectionUtils(PDConsoleUtils):
285    """Provides a set of methods common to USB PD FAFT tests
286
287    This Class is used for PD utility methods that require access
288    to both Plankton and DUT PD consoles.
289
290    """
291
292    def __init__(self, dut_console, plankton_console):
293        """
294        @param dut_console: PD console object for DUT
295        @param plankton_console: PD console object for Plankton
296        """
297        # save console for DUT PD UART access functions
298        self.dut_console = dut_console
299        # save console for Plankton UART access functions
300        self.plankton_console = plankton_console
301        super(PDConnectionUtils, self).__init__(dut_console)
302
303    def _verify_plankton_connection(self, port):
304        """Verify DUT to Plankton PD connection
305
306        This method checks for a Plankton PD connection for the
307        given port by first verifying if a PD connection is present.
308        If found, then it uses a Plankton feature to force a PD disconnect.
309        If the port is no longer in the connected state, and following
310        a delay, is found to be back in the connected state, then
311        a DUT pd to Plankton connection is verified.
312
313        @param port: DUT pd port to test
314
315        @returns True if DUT to Plankton pd connection is verified
316        """
317        DISCONNECT_CHECK_TIME = 0.5
318        DISCONNECT_TIME_SEC = 2
319        # plankton console command to force PD disconnect
320        disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
321        # Only check for Plankton if DUT has active PD connection
322        if self.dut_console.is_pd_connected(port):
323            # Attempt to force PD disconnection
324            self.plankton_console.send_pd_command(disc_cmd)
325            time.sleep(DISCONNECT_CHECK_TIME)
326            # Verify that DUT PD port is no longer connected
327            if self.dut_console.is_pd_connected(port) == False:
328                # Wait for disconnect timer and give time to reconnect
329                time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
330                if self.dut_console.is_pd_connected(port):
331                    logging.info('Plankton connection verfied on port %d', port)
332                    return True
333            else:
334                # Could have disconnected other port, allow it to reconnect
335                # before exiting.
336                time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
337        return False
338
339    def find_dut_to_plankton_connection(self):
340        """Find the PD port which is connected to Plankton
341
342        @returns DUT pd port number if found, None otherwise
343        """
344        for port in xrange(self.dut_console.PD_MAX_PORTS):
345            # Check for DUT to Plankton connection on port
346            if self._verify_plankton_connection(port):
347                # Plankton PD connection found so exit
348                return port
349        return None
350