1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import re 6import logging 7import time 8 9from autotest_lib.client.common_lib import error 10 11class PDConsoleUtils(object): 12 """ Provides a set of methods common to USB PD FAFT tests 13 14 Each instance of this class is associated with a particular 15 servo UART console. USB PD tests will typically use the console 16 command 'pd' and its subcommands to control/monitor Type C PD 17 connections. The servo object used for UART operations is 18 passed in and stored when this object is created. 19 20 """ 21 22 SRC_CONNECT = 'SRC_READY' 23 SNK_CONNECT = 'SNK_READY' 24 SRC_DISC = 'SRC_DISCONNECTED' 25 SNK_DISC = 'SNK_DISCONNECTED' 26 PD_MAX_PORTS = 2 27 CONNECT_TIME = 4 28 29 # dualrole input/ouput values 30 DUALROLE_QUERY_DELAY = 0.25 31 dual_index = {'on': 0, 'off': 1, 'snk': 2, 'src': 3} 32 dualrole_cmd = ['on', 'off', 'sink', 'source'] 33 dualrole_resp = ['on', 'off', 'force sink', 'force source'] 34 35 # Dictionary for 'pd 0/1 state' parsing 36 PD_STATE_DICT = { 37 'port': 'Port\s+([\w]+)', 38 'role': 'Role:\s+([\w]+-[\w]+)', 39 'pd_state': 'State:\s+([\w]+_[\w]+)', 40 'flags': 'Flags:\s+([\w]+)', 41 'polarity': '(CC\d)' 42 } 43 44 # Dictionary for PD control message types 45 PD_CONTROL_MSG_MASK = 0x1f 46 PD_CONTROL_MSG_DICT = { 47 'GoodCRC': 1, 48 'GotoMin': 2, 49 'Accept': 3, 50 'Reject': 4, 51 'Ping': 5, 52 'PS_RDY': 6, 53 'Get_Source_Cap': 7, 54 'Get_Sink_Cap': 8, 55 'DR_Swap': 9, 56 'PR_Swap': 10, 57 'VCONN_Swap': 11, 58 'Wait': 12, 59 'Soft_Reset': 13 60 } 61 62 # Dictionary for PD firmware state flags 63 PD_STATE_FLAGS_DICT = { 64 'power_swap': 1 << 1, 65 'data_swap': 1 << 2, 66 'data_swap_active': 1 << 3, 67 'vconn_on': 1 << 12 68 } 69 70 def __init__(self, console): 71 """ Console can be either usbpd, ec, or plankton_ec UART 72 This object with then be used by the class which creates 73 the PDConsoleUtils class to send/receive commands to UART 74 """ 75 # save console for UART access functions 76 self.console = console 77 78 def send_pd_command(self, cmd): 79 """Send command to PD console UART 80 81 @param cmd: pd command string 82 """ 83 self.console.send_command(cmd) 84 85 def send_pd_command_get_output(self, cmd, regexp): 86 """Send command to PD console, wait for response 87 88 @param cmd: pd command string 89 @param regexp: regular expression for desired output 90 """ 91 return self.console.send_command_get_output(cmd, regexp) 92 93 def verify_pd_console(self): 94 """Verify that PD commands exist on UART console 95 96 Send 'help' command to UART console 97 @returns: True if 'pd' is found, False if not 98 """ 99 100 l = self.console.send_command_get_output('help', ['(pd)\s+([\w]+)']) 101 if l[0][1] == 'pd': 102 return True 103 else: 104 return False 105 106 def execute_pd_state_cmd(self, port): 107 """Get PD state for specified channel 108 109 pd 0/1 state command gives produces 5 fields. The full response 110 line is captured and then parsed to extract each field to fill 111 the dict containing port, polarity, role, pd_state, and flags. 112 113 @param port: Type C PD port 0 or 1 114 115 @returns: A dict with the 5 fields listed above 116 """ 117 cmd = 'pd' 118 subcmd = 'state' 119 pd_cmd = cmd +" " + str(port) + " " + subcmd 120 # Two FW versions for this command, get full line. 121 m = self.send_pd_command_get_output(pd_cmd, 122 ['(Port.*) - (Role:.*)\r']) 123 124 # Extract desired values from result string 125 state_result = {} 126 for key, regexp in self.PD_STATE_DICT.iteritems(): 127 value = re.search(regexp, m[0][0]) 128 if value: 129 state_result[key] = value.group(1) 130 else: 131 raise error.TestFail('pd 0/1 state: %r value not found' % (key)) 132 133 return state_result 134 135 def get_pd_state(self, port): 136 """Get the current PD state 137 138 @param port: Type C PD port 0/1 139 @returns: current pd state 140 """ 141 142 pd_dict = self.execute_pd_state_cmd(port) 143 return pd_dict['pd_state'] 144 145 def get_pd_port(self, port): 146 """Get the current PD port 147 148 @param port: Type C PD port 0/1 149 @returns: current pd state 150 """ 151 pd_dict = self.execute_pd_state_cmd(port) 152 return pd_dict['port'] 153 154 def get_pd_role(self, port): 155 """Get the current PD power role (source or sink) 156 157 @param port: Type C PD port 0/1 158 @returns: current pd state 159 """ 160 pd_dict = self.execute_pd_state_cmd(port) 161 return pd_dict['role'] 162 163 def get_pd_flags(self, port): 164 """Get the current PD flags 165 166 @param port: Type C PD port 0/1 167 @returns: current pd state 168 """ 169 pd_dict = self.execute_pd_state_cmd(port) 170 return pd_dict['flags'] 171 172 def get_pd_dualrole(self): 173 """Get the current PD dualrole setting 174 175 @returns: current PD dualrole setting 176 """ 177 cmd = 'pd dualrole' 178 dual_list = self.send_pd_command_get_output(cmd, 179 ['dual-role toggling:\s+([\w ]+)']) 180 return dual_list[0][1] 181 182 def set_pd_dualrole(self, value): 183 """Set pd dualrole 184 185 It can be set to either: 186 1. on 187 2. off 188 3. snk (force sink mode) 189 4. src (force source mode) 190 After setting, the current value is read to confirm that it 191 was set properly. 192 193 @param value: One of the 4 options listed 194 """ 195 # Get string required for console command 196 dual_index = self.dual_index[value] 197 # Create console command 198 cmd = 'pd dualrole ' + self.dualrole_cmd[dual_index] 199 self.console.send_command(cmd) 200 time.sleep(self.DUALROLE_QUERY_DELAY) 201 # Get current setting to verify that command was successful 202 dual = self.get_pd_dualrole() 203 # If it doesn't match, then raise error 204 if dual != self.dualrole_resp[dual_index]: 205 raise error.TestFail("dualrole error: " + 206 self.dualrole_resp[dual_index] + " != "+dual) 207 208 def query_pd_connection(self): 209 """Determine if PD connection is present 210 211 Try the 'pd 0/1 state' command and see if it's in either 212 expected state of a conneciton. Record the port number 213 that has an active connection 214 215 @returns: dict with params port, connect, and state 216 """ 217 status = {} 218 port = 0; 219 status['connect'] = False 220 status['port'] = port 221 state = self.get_pd_state(port) 222 # Check port 0 first 223 if state == self.SRC_CONNECT or state == self.SNK_CONNECT: 224 status['connect'] = True 225 status['role'] = state 226 else: 227 port = 1 228 status['port'] = port 229 state = self.get_pd_state(port) 230 # Check port 1 231 if state == self.SRC_CONNECT or state == self.SNK_CONNECT: 232 status['connect'] = True 233 status['role'] = state 234 235 return status 236 237 def disable_pd_console_debug(self): 238 """Turn off PD console debug 239 240 """ 241 cmd = 'pd dump 0' 242 self.send_pd_command(cmd) 243 244 def enable_pd_console_debug(self): 245 """Enable PD console debug level 1 246 247 """ 248 cmd = 'pd dump 1' 249 self.send_pd_command(cmd) 250 251 def is_pd_flag_set(self, port, key): 252 """Test a bit in PD protocol state flags 253 254 The flag word contains various PD protocol state information. 255 This method allows for a specific flag to be tested. 256 257 @param port: Port which has the active PD connection 258 @param key: dict key to retrieve the flag bit mapping 259 260 @returns True if the bit to be tested is set 261 """ 262 pd_flags = self.get_pd_flags(port) 263 return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16)) 264 265 def is_pd_connected(self, port): 266 """Check if a PD connection is active 267 268 @param port: port to be used for pd console commands 269 270 @returns True if port is in connected state 271 """ 272 state = self.get_pd_state(port) 273 return bool(state == self.SRC_CONNECT or state == self.SNK_CONNECT) 274 275 def is_pd_dual_role_enabled(self): 276 """Check if a PD device is in dualrole mode 277 278 @returns True is dualrole mode is active, false otherwise 279 """ 280 drp = self.get_pd_dualrole() 281 return bool(drp == self.dualrole_resp[self.dual_index['on']]) 282 283 284class PDConnectionUtils(PDConsoleUtils): 285 """Provides a set of methods common to USB PD FAFT tests 286 287 This Class is used for PD utility methods that require access 288 to both Plankton and DUT PD consoles. 289 290 """ 291 292 def __init__(self, dut_console, plankton_console): 293 """ 294 @param dut_console: PD console object for DUT 295 @param plankton_console: PD console object for Plankton 296 """ 297 # save console for DUT PD UART access functions 298 self.dut_console = dut_console 299 # save console for Plankton UART access functions 300 self.plankton_console = plankton_console 301 super(PDConnectionUtils, self).__init__(dut_console) 302 303 def _verify_plankton_connection(self, port): 304 """Verify DUT to Plankton PD connection 305 306 This method checks for a Plankton PD connection for the 307 given port by first verifying if a PD connection is present. 308 If found, then it uses a Plankton feature to force a PD disconnect. 309 If the port is no longer in the connected state, and following 310 a delay, is found to be back in the connected state, then 311 a DUT pd to Plankton connection is verified. 312 313 @param port: DUT pd port to test 314 315 @returns True if DUT to Plankton pd connection is verified 316 """ 317 DISCONNECT_CHECK_TIME = 0.5 318 DISCONNECT_TIME_SEC = 2 319 # plankton console command to force PD disconnect 320 disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000) 321 # Only check for Plankton if DUT has active PD connection 322 if self.dut_console.is_pd_connected(port): 323 # Attempt to force PD disconnection 324 self.plankton_console.send_pd_command(disc_cmd) 325 time.sleep(DISCONNECT_CHECK_TIME) 326 # Verify that DUT PD port is no longer connected 327 if self.dut_console.is_pd_connected(port) == False: 328 # Wait for disconnect timer and give time to reconnect 329 time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC) 330 if self.dut_console.is_pd_connected(port): 331 logging.info('Plankton connection verfied on port %d', port) 332 return True 333 else: 334 # Could have disconnected other port, allow it to reconnect 335 # before exiting. 336 time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC) 337 return False 338 339 def find_dut_to_plankton_connection(self): 340 """Find the PD port which is connected to Plankton 341 342 @returns DUT pd port number if found, None otherwise 343 """ 344 for port in xrange(self.dut_console.PD_MAX_PORTS): 345 # Check for DUT to Plankton connection on port 346 if self._verify_plankton_connection(port): 347 # Plankton PD connection found so exit 348 return port 349 return None 350