1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import re
6import logging
7import time
8
9from autotest_lib.client.common_lib import error
10
class PDConsoleUtils(object):
    """Provides a set of methods common to USB PD FAFT tests

    Each instance of this class is associated with a particular
    servo UART console. USB PD tests will typically use the console
    command 'pd' and its subcommands to control/monitor Type C PD
    connections. The console object used for UART operations is
    passed in and stored when this object is created; it must provide
    send_command(cmd) and send_command_get_output(cmd, regexp_list).

    """

    # PD state names reported by the 'pd <port> state' console command
    SRC_CONNECT = 'SRC_READY'
    SNK_CONNECT = 'SNK_READY'
    SRC_DISC = 'SRC_DISCONNECTED'
    SNK_DISC = 'SNK_DISCONNECTED'
    # Number of PD ports probed by the connection helpers
    PD_MAX_PORTS = 2
    # Seconds to wait for a PD connection to (re)establish
    CONNECT_TIME = 4

    # dualrole input/output values
    # Seconds to wait between setting dualrole and reading it back
    DUALROLE_QUERY_DELAY = 0.25
    # Maps the caller-facing value to an index into the two lists below
    dual_index = {'on': 0, 'off': 1, 'snk': 2, 'src': 3}
    # Argument passed to the 'pd dualrole' console command
    dualrole_cmd = ['on', 'off', 'sink', 'source']
    # Text the console reports back for each setting
    dualrole_resp = ['on', 'off', 'force sink', 'force source']

    # Dictionary for 'pd 0/1 state' parsing. Each regexp has exactly one
    # capture group holding the value of the field.
    PD_STATE_DICT = {
        'port': r'Port\s+([\w]+)',
        'role': r'Role:\s+([\w]+-[\w]+)',
        'pd_state': r'State:\s+([\w]+_[\w]+)',
        'flags': r'Flags:\s+([\w]+)',
        'polarity': r'(CC\d)'
    }

    # Dictionary for PD control message types
    PD_CONTROL_MSG_MASK = 0x1f
    PD_CONTROL_MSG_DICT = {
        'GoodCRC': 1,
        'GotoMin': 2,
        'Accept': 3,
        'Reject': 4,
        'Ping': 5,
        'PS_RDY': 6,
        'Get_Source_Cap': 7,
        'Get_Sink_Cap': 8,
        'DR_Swap': 9,
        'PR_Swap': 10,
        'VCONN_Swap': 11,
        'Wait': 12,
        'Soft_Reset': 13
    }

    # Dictionary for PD firmware state flags (bit masks in the flags word)
    PD_STATE_FLAGS_DICT = {
        'power_swap': 1 << 1,
        'data_swap': 1 << 2,
        'data_swap_active': 1 << 3,
        'vconn_on': 1 << 12
    }

    def __init__(self, console):
        """ Console can be either usbpd, ec, or plankton_ec UART
        This object will then be used by the class which creates
        the PDConsoleUtils class to send/receive commands to UART

        @param console: UART console object used for all commands
        """
        # save console for UART access functions
        self.console = console

    def send_pd_command(self, cmd):
        """Send command to PD console UART

        @param cmd: pd command string
        """
        self.console.send_command(cmd)

    def send_pd_command_get_output(self, cmd, regexp):
        """Send command to PD console, wait for response

        @param cmd: pd command string
        @param regexp: list of regular expressions for desired output

        @returns: whatever the console's send_command_get_output returns.
                  The callers in this class index it as result[i][j], where
                  j == 0 is the full match and j >= 1 are capture groups.
        """
        return self.console.send_command_get_output(cmd, regexp)

    def send_pd_command_get_reply_msg(self, cmd):
        """Send PD protocol msg, get PD control msg reply

        The PD console debug mode is enabled prior to sending
        a pd protocol message. This allows the
        control message reply to be extracted. The debug mode
        is always disabled prior to exiting, even when the command
        or the parsing of its reply raises.

        @param cmd: pd command to issue to the UART console

        @returns: PD control message type, i.e. the received header
                  masked with PD_CONTROL_MSG_MASK
        """
        # Enable PD console debug mode to show control messages
        self.enable_pd_console_debug()
        try:
            reply = self.send_pd_command_get_output(cmd, [r'RECV\s([\w]+)'])
            # Captured header is a hex string; low bits hold the msg type
            ctrl_msg = int(reply[0][1], 16) & self.PD_CONTROL_MSG_MASK
        finally:
            # Never leave the console in debug mode, it would pollute the
            # output parsed by subsequent commands.
            self.disable_pd_console_debug()
        return ctrl_msg

    def verify_pd_console(self):
        """Verify that PD commands exist on UART console

        Send 'help' command to UART console and look for the 'pd' command.
        NOTE(review): the first capture group is the literal 'pd', so any
        successful match yields True. A missing 'pd' entry is presumably
        reported by the console call itself (e.g. by raising) rather than
        by a False return - confirm against the console implementation.

        @returns: True if 'pd' is found, False if not
        """
        help_match = self.send_pd_command_get_output('help',
                                                     [r'(pd)\s+([\w]+)'])
        return help_match[0][1] == 'pd'

    def execute_pd_state_cmd(self, port):
        """Get PD state for specified channel

        The 'pd 0/1 state' command produces 5 fields. The full response
        line is captured and then parsed to extract each field to fill
        the dict containing port, polarity, role, pd_state, and flags.

        @param port: Type C PD port 0 or 1

        @returns: A dict with the 5 fields listed above
        @raises error.TestFail: if any of the fields is not found
        """
        pd_cmd = ' '.join(('pd', str(port), 'state'))
        # Two FW versions for this command, get full line.
        m = self.send_pd_command_get_output(pd_cmd,
                                            ['(Port.*) - (Role:.*)\n'])
        state_line = m[0][0]

        # Extract desired values from result string
        state_result = {}
        # items() rather than iteritems() so this runs on Python 2 and 3
        for key, regexp in self.PD_STATE_DICT.items():
            value = re.search(regexp, state_line)
            if not value:
                raise error.TestFail('pd 0/1 state: %r value not found' % (key))
            state_result[key] = value.group(1)

        return state_result

    def get_pd_state(self, port):
        """Get the current PD state

        @param port: Type C PD port 0/1
        @returns: current pd state
        """
        return self.execute_pd_state_cmd(port)['pd_state']

    def get_pd_port(self, port):
        """Get the current PD port

        @param port: Type C PD port 0/1
        @returns: port field reported by the 'pd <port> state' command
        """
        return self.execute_pd_state_cmd(port)['port']

    def get_pd_role(self, port):
        """Get the current PD power role (source or sink)

        @param port: Type C PD port 0/1
        @returns: role field reported by the 'pd <port> state' command
        """
        return self.execute_pd_state_cmd(port)['role']

    def get_pd_flags(self, port):
        """Get the current PD flags

        @param port: Type C PD port 0/1
        @returns: flags field (string) reported by 'pd <port> state'
        """
        return self.execute_pd_state_cmd(port)['flags']

    def get_pd_dualrole(self):
        """Get the current PD dualrole setting

        @returns: current PD dualrole setting as reported by the console
        """
        dual_list = self.send_pd_command_get_output(
                'pd dualrole', [r'dual-role toggling:\s+([\w ]+)'])
        return dual_list[0][1]

    def set_pd_dualrole(self, value):
        """Set pd dualrole

        It can be set to either:
        1. on
        2. off
        3. snk (force sink mode)
        4. src (force source mode)
        After setting, the current value is read to confirm that it
        was set properly.

        @param value: One of the 4 options listed
        @raises KeyError: if value is not one of the 4 options
        @raises error.TestFail: if the read back setting does not match
        """
        # Get index of the console strings for the requested setting
        index = self.dual_index[value]
        expected = self.dualrole_resp[index]
        # Create and send console command
        self.send_pd_command('pd dualrole ' + self.dualrole_cmd[index])
        time.sleep(self.DUALROLE_QUERY_DELAY)
        # Get current setting to verify that command was successful
        dual = self.get_pd_dualrole()
        # If it doesn't match, then raise error
        if dual != expected:
            raise error.TestFail('dualrole error: %s != %s' % (expected, dual))

    def query_pd_connection(self):
        """Determine if PD connection is present

        Try the 'pd <port> state' command on each port in turn and see
        if it's in either expected state of a connection. Record the port
        number that has an active connection. Ports after the first
        connected one are not queried.

        @returns: dict with keys 'connect' (bool), 'port' (last port
                  queried) and, only when connected, 'role' (the PD state
                  string of the connected port)
        """
        status = {'connect': False, 'port': 0}
        for port in range(self.PD_MAX_PORTS):
            status['port'] = port
            state = self.get_pd_state(port)
            if state in (self.SRC_CONNECT, self.SNK_CONNECT):
                status['connect'] = True
                status['role'] = state
                break

        return status

    def swap_power_role(self, port):
        """Attempt a power role swap

        This method attempts to execute a power role swap. A check
        is made to ensure that dualrole mode is enabled and that
        a PD contract is currently established. If both checks pass,
        then the power role swap command is issued. After a delay,
        if a PD contract is established and the current state does
        not equal the starting state, then it was successful.

        @param port: pd port number

        @returns: True if power swap is successful, False otherwise.
        """
        # Preconditions for a swap
        if not self.is_pd_dual_role_enabled():
            logging.info('Dualrole Mode not enabled!')
            return False
        if not self.is_pd_connected(port):
            logging.info('PD contract not established!')
            return False
        # Get starting state
        current_pr = self.get_pd_state(port)
        self.send_pd_command('pd %d swap power' % port)
        # Give the port time to renegotiate before sampling the new state
        time.sleep(self.CONNECT_TIME)
        new_pr = self.get_pd_state(port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if not self.is_pd_connected(port):
            return False
        return current_pr != new_pr

    def disable_pd_console_debug(self):
        """Turn off PD console debug

        """
        self.send_pd_command('pd dump 0')

    def enable_pd_console_debug(self):
        """Enable PD console debug level 1

        """
        self.send_pd_command('pd dump 1')

    def is_pd_flag_set(self, port, key):
        """Test a bit in PD protocol state flags

        The flag word contains various PD protocol state information.
        This method allows for a specific flag to be tested.

        @param port: Port which has the active PD connection
        @param key: dict key to retrieve the flag bit mapping
                    (see PD_STATE_FLAGS_DICT)

        @returns True if the bit to be tested is set
        @raises KeyError: if key is not in PD_STATE_FLAGS_DICT
        """
        # Flags are reported as a hex string
        pd_flags = int(self.get_pd_flags(port), 16)
        return bool(self.PD_STATE_FLAGS_DICT[key] & pd_flags)

    def is_pd_connected(self, port):
        """Check if a PD connection is active

        @param port: port to be used for pd console commands

        @returns True if port is in connected state
        """
        return self.get_pd_state(port) in (self.SRC_CONNECT, self.SNK_CONNECT)

    def is_pd_dual_role_enabled(self):
        """Check if a PD device is in dualrole mode

        @returns True if dualrole mode is active, False otherwise
        """
        return self.get_pd_dualrole() == self.dualrole_resp[
                self.dual_index['on']]
332
333
class PDConnectionUtils(PDConsoleUtils):
    """Provides a set of methods common to USB PD FAFT tests

    This Class is used for PD utility methods that require access
    to both Plankton and DUT PD consoles.

    """

    def __init__(self, dut_console, plankton_console):
        """
        Both arguments are used through the PDConsoleUtils interface:
        the methods of this class call is_pd_connected() and read
        CONNECT_TIME / PD_MAX_PORTS on dut_console, and call
        send_pd_command() on plankton_console.

        @param dut_console: PD console object for DUT
        @param plankton_console: PD console object for Plankton
        """
        # save console for DUT PD UART access functions
        self.dut_console = dut_console
        # save console for Plankton UART access functions
        self.plankton_console = plankton_console
        super(PDConnectionUtils, self).__init__(dut_console)

    def _verify_plankton_connection(self, port):
        """Verify DUT to Plankton PD connection

        This method checks for a Plankton PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a Plankton feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to Plankton connection is verified.

        @param port: DUT pd port to test

        @returns True if DUT to Plankton pd connection is verified
        """
        DISCONNECT_CHECK_TIME = 0.5
        DISCONNECT_TIME_SEC = 2
        # plankton console command to force PD disconnect
        disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
        # Only check for Plankton if DUT has active PD connection
        if not self.dut_console.is_pd_connected(port):
            return False

        # Attempt to force PD disconnection
        self.plankton_console.send_pd_command(disc_cmd)
        time.sleep(DISCONNECT_CHECK_TIME)
        # Check whether the DUT PD port is no longer connected
        port_dropped = not self.dut_console.is_pd_connected(port)
        # Wait for disconnect timer and give time to reconnect. This wait
        # is needed even if this port stayed connected: the fake disconnect
        # could have disconnected the other port, allow it to reconnect
        # before exiting.
        time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
        if port_dropped and self.dut_console.is_pd_connected(port):
            logging.info('Plankton connection verified on port %d',
                         port)
            return True
        return False

    def find_dut_to_plankton_connection(self):
        """Find the PD port which is connected to Plankton

        Ports are tested in ascending order and the search stops at the
        first port for which the Plankton connection is verified.

        @returns DUT pd port number if found, None otherwise
        """
        # range() rather than xrange() so this runs on Python 2 and 3
        for port in range(self.dut_console.PD_MAX_PORTS):
            # Check for DUT to Plankton connection on port
            if self._verify_plankton_connection(port):
                # Plankton PD connection found so exit
                return port
        return None
401