# Lint as: python2, python3
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import re
import logging
from six.moves import range
import time

from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.servo import pd_console


class PDDevice(object):
    """Base class for all PD devices

    This class provides a set of APIs for expected Type C PD required actions
    in TypeC FAFT tests. The base class is specific for Type C devices that
    do not have any console access.

    Every method except is_connected() raises NotImplementedError and must be
    provided by a derived class.
    """

    def is_src(self, state=None):
        """Checks if the port is connected as a source

        @param state: the state to check (None to get current state)
        """
        raise NotImplementedError(
                'is_src should be implemented in derived class')

    def is_snk(self, state=None):
        """Checks if the port is connected as a sink

        @param state: the state to check (None to get current state)
        """
        raise NotImplementedError(
                'is_snk should be implemented in derived class')

    def is_connected(self, state=None):
        """Checks if the port is connected

        @param state: the state to check (None to get current state)

        @returns True if in a connected state, False otherwise
        """
        return self.is_src(state) or self.is_snk(state)

    def is_disconnected(self, state=None):
        """Checks if the port is disconnected

        @param state: the state to check (None to get current state)
        """
        raise NotImplementedError(
                'is_disconnected should be implemented in derived class')

    def is_ufp(self):
        """Checks if data role is UFP

        """
        raise NotImplementedError(
                'is_ufp should be implemented in derived class')

    def is_dfp(self):
        """Checks if data role is DFP

        """
        raise NotImplementedError(
                'is_dfp should be implemented in derived class')

    def is_drp(self):
        """Checks if dual role mode is supported

        """
        raise NotImplementedError(
                'is_drp should be implemented in derived class')

    def dr_swap(self):
        """Attempts a data role swap

        """
        raise NotImplementedError(
                'dr_swap should be implemented in derived class')

    def pr_swap(self):
        """Attempts a power role swap

        """
        raise NotImplementedError(
                'pr_swap should be implemented in derived class')

    def vbus_request(self, voltage):
        """Requests a specific VBUS voltage from SRC

        @param voltage: requested voltage level (5, 12, 20) in volts
        """
        raise NotImplementedError(
                'vbus_request should be implemented in derived class')

    def soft_reset(self):
        """Initiates a PD soft reset sequence

        """
        raise NotImplementedError(
                'soft_reset should be implemented in derived class')

    def hard_reset(self):
        """Initiates a PD hard reset sequence

        """
        raise NotImplementedError(
                'hard_reset should be implemented in derived class')

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)
        """
        raise NotImplementedError(
                'drp_set should be implemented in derived class')

    def drp_get(self):
        """Gets dualrole mode

        @returns one of the modes (on, off, snk, src)
        """
        # The message previously named drp_set (copy/paste error).
        raise NotImplementedError(
                'drp_get should be implemented in derived class')

    def drp_disconnect_connect(self, disc_time_sec):
        """Force PD disconnect/connect via drp settings

        @param disc_time_sec: Time in seconds between disconnect and reconnect
        """
        raise NotImplementedError(
                'drp_disconnect_connect should be implemented in derived class'
        )

    def cc_disconnect_connect(self, disc_time_sec):
        """Force PD disconnect/connect

        @param disc_time_sec: Time in seconds between disconnect and reconnect
        """
        raise NotImplementedError(
                'cc_disconnect_connect should be implemented in derived class')

    def get_connected_state_after_cc_reconnect(self, disc_time_sec):
        """Get the connected state after disconnect/reconnect

        @param disc_time_sec: Time in seconds for disconnect period.
        @returns: The connected PD state.
        """
        # The two literals are concatenated, so the separating space has to
        # be part of one of them.
        raise NotImplementedError(
                'get_connected_state_after_cc_reconnect should be implemented '
                'in derived class')


class PDConsoleDevice(PDDevice):
    """Class for PD devices that have console access

    This class contains methods for common PD actions for any PD device which
    has UART console access. It inherits the PD device base class. In addition,
    it stores both the UART console and port for the PD device.
    """

    def __init__(self, console, port, utils):
        """Initialization method

        @param console: UART console object
        @param port: USB PD port number
        @param utils: PD console utilities object used to issue commands and
                      interpret PD state for this device
        """
        # Save UART console
        self.console = console
        # PD utilities used by methods in this class
        self.utils = utils
        # Save the PD port number for this device
        self.port = port
        # Not a PDTester device
        self.is_pdtester = False

    def get_pd_state(self):
        """Get the state of the PD port"""
        return self.utils.get_pd_state(self.port)

    def get_pd_role(self):
        """Get the current PD power role (source or sink)

        @returns: whatever utils.get_pd_role() reports for this port
        """
        return self.utils.get_pd_role(self.port)

    def is_pd_flag_set(self, key):
        """Test a bit in PD protocol state flags

        The flag word contains various PD protocol state information.
        This method allows for a specific flag to be tested.

        @param key: dict key to retrieve the flag bit mapping

        @returns True if the bit to be tested is set
        """
        return self.utils.is_pd_flag_set(self.port, key)

    def is_src(self, state=None):
        """Checks if the port is connected as a source.

        The "state" argument allows the caller to get_pd_state() once, and then
        evaluate multiple conditions without re-getting the state.

        @param state: the state to check (None to get current state)
        @returns True if connected as SRC, False otherwise
        """
        return self.utils.is_src_connected(self.port, state)

    def is_snk(self, state=None):
        """Checks if the port is connected as a sink

        The "state" argument allows the caller to get_pd_state() once, and then
        evaluate multiple conditions without re-getting the state.

        @param state: the state to check (None to get current state)
        @returns True if connected as SNK, False otherwise
        """
        return self.utils.is_snk_connected(self.port, state)

    def is_connected(self, state=None):
        """Checks if the port is connected

        The "state" argument allows the caller to get_pd_state() once, and then
        evaluate multiple conditions without re-getting the state.

        @param state: the state to check (None to get current state)
        @returns True if in a connected state, False otherwise
        """
        return self.is_snk(state) or self.is_src(state)

    def is_disconnected(self, state=None):
        """Checks if the port is disconnected

        @param state: the state to check (None to get current state)
        @returns True if in a disconnected state, False otherwise
        """
        return self.utils.is_disconnected(self.port, state)

    def __repr__(self):
        """String representation of the object"""
        return "<%s %r port %s>" % (
                self.__class__.__name__, self.console.name, self.port)

    def is_drp(self):
        """Checks if dual role mode is supported

        @returns True if dual role mode is 'on', False otherwise
        """
        return self.utils.is_pd_dual_role_enabled(self.port)

    def drp_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using drp mode settings

        A PD console device doesn't have an explicit connect/disconnect
        command. Instead, the dualrole mode setting is used to force
        disconnects in devices which support this feature. To disconnect,
        force the dualrole mode to be the opposite role of the current
        connected state.

        @param disc_time_sec: time in seconds to wait to reconnect

        @returns True if device disconnects, then returns to a connected
                 state. False if either step fails. When no disconnect is
                 observed but the power role has flipped, the result of the
                 restoring pr_swap() is returned instead.
        """
        # Dualrole mode must be supported
        if not self.is_drp():
            logging.warning('Device not DRP capable, '
                            'unabled to force disconnect')
            return False
        # Force state will be the opposite of current connect state
        if self.is_src():
            drp_mode = 'snk'
            swap_state = self.utils.get_snk_connect_states()
        else:
            drp_mode = 'src'
            swap_state = self.utils.get_src_connect_states()
        # Force disconnect
        self.drp_set(drp_mode)
        # Wait for disconnect time
        time.sleep(disc_time_sec)
        # Verify that the device is disconnected
        disconnect = self.is_disconnected()

        # If the other device is dualrole, then forcing dualrole mode will
        # only cause the disconnect to appear momentarily and reconnect
        # in the power role forced by the drp_set() call. For this case,
        # the role swap verifies that a disconnect/connect sequence occurred.
        if not disconnect:
            time.sleep(self.utils.CONNECT_TIME)
            # Connected, verify if power role swap has occurred
            if self.utils.get_pd_state(self.port) in swap_state:
                # Restore default dualrole mode
                self.drp_set('on')
                # Restore original power role
                connect = self.pr_swap()
                if not connect:
                    logging.warning(
                            'DRP on both devices, 2nd power swap failed')
                return connect

        # Restore default dualrole mode
        self.drp_set('on')
        # Allow enough time for protocol state machine
        time.sleep(self.utils.CONNECT_TIME)
        # Check if connected
        connect = self.is_connected()
        logging.info('Disconnect = %r, Connect = %r', disconnect, connect)
        return bool(disconnect and connect)

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @returns True if set was successful, False otherwise
        """
        # Set desired dualrole mode
        self.utils.set_pd_dualrole(self.port, mode)
        # Read the setting back and verify that it took effect
        current = self.utils.get_pd_dualrole(self.port)
        return bool(mode == current)

    def drp_get(self):
        """Gets dualrole mode

        @returns one of the modes (on, off, snk, src)
        """
        return self.utils.get_pd_dualrole(self.port)

    def try_src(self, enable):
        """Enables/Disables Try.SRC PD protocol setting

        @param enable: True to enable, False to disable

        @returns True if setting was successful, False if feature not
                 supported by the device, or not set as desired.
        """
        # Create Try.SRC pd command
        cmd = 'pd trysrc %d' % int(enable)
        # TCPMv1 indicates Try.SRC is on by returning 'on'
        # TCPMv2 indicates Try.SRC is on by returning 'Forced ON'
        on_vals = ('on', 'Forced ON')
        # TCPMv1 indicates Try.SRC is off by returning 'off'
        # TCPMv2 indicates Try.SRC is off by returning 'Forced OFF'
        off_vals = ('off', 'Forced OFF')

        # Try.SRC on/off is output, if supported feature. The alternative
        # 'Parameter' match covers the reply when it is not.
        all_vals = '|'.join(on_vals + off_vals)
        regex = [r'Try\.SRC\s(%s)|(Parameter)' % all_vals]
        m = self.utils.send_pd_command_get_output(cmd, regex)

        # Determine if Try.SRC feature is supported
        if 'Try.SRC' not in m[0][0]:
            logging.warning('Try.SRC not supported on this PD device')
            return False

        # TrySRC is supported on this PD device, verify setting.
        trysrc_val = m[0][1]
        logging.info('Try.SRC mode = %s', trysrc_val)
        vals = on_vals if enable else off_vals
        return trysrc_val in vals

    def soft_reset(self):
        """Initiates a PD soft reset sequence

        To verify that a soft reset sequence was initiated, the
        reply message is checked to verify that the reset command
        was acknowledged by its port pair. The connect state should
        be same as it was prior to issuing the reset command.

        @returns True if the port pair acknowledges the reset message
                 and if following the command, the device returns to the same
                 connected state. False otherwise.
        """
        RESET_DELAY = 0.5
        cmd = 'pd %d soft' % self.port
        state_before = self.utils.get_pd_state(self.port)
        reply = self.utils.send_pd_command_get_reply_msg(cmd)
        if reply != self.utils.PD_CONTROL_MSG_DICT['Accept']:
            return False
        time.sleep(RESET_DELAY)
        state_after = self.utils.get_pd_state(self.port)
        return state_before == state_after

    def hard_reset(self):
        """Initiates a PD hard reset sequence

        To verify that a hard reset sequence was initiated, the
        console output is scanned for HARD RST TX (TCPMv1) or one of the
        PE_*_Hard_Reset state names (TCPMv2). In addition, the connect
        state should be same as it was prior to issuing the reset command.

        @returns True if the port pair acknowledges that hard reset was
                 initiated and if following the command, the device returns
                 to the same connected state. False otherwise.
        """
        RESET_DELAY = 1.0
        cmd = 'pd %d hard' % self.port
        state_before = self.utils.get_pd_state(self.port)
        self.utils.enable_pd_console_debug()
        try:
            tcpmv1_pattern = r'.*(HARD\sRST\sTX)'
            tcpmv2_pattern = r'.*(PE_SNK_Hard_Reset)|.*(PE_SRC_Hard_Reset)'
            pattern = '|'.join((tcpmv1_pattern, tcpmv2_pattern))
            self.utils.send_pd_command_get_output(cmd, [pattern])
        except error.TestFail:
            logging.warning('HARD RST TX not found')
            return False
        finally:
            # Console debug is switched back off on both success and failure.
            self.utils.disable_pd_console_debug()

        time.sleep(RESET_DELAY)
        state_after = self.utils.get_pd_state(self.port)
        return state_before == state_after

    def pr_swap(self):
        """Attempts a power role swap

        In order to attempt a power role swap the device must be
        connected and support dualrole mode. Once these two criteria
        are checked a power role command is issued. Following a delay
        to allow for a reconnection the new power role is checked
        against the power role prior to issuing the command.

        @returns True if the device has swapped power roles, False otherwise.
        """
        # Get starting state
        if not self.is_drp():
            logging.warning('Dualrole Mode not enabled!')
            return False
        if not self.is_connected():
            logging.warning('PD contract not established!')
            return False
        current_pr = self.utils.get_pd_state(self.port)
        swap_cmd = 'pd %d swap power' % self.port
        self.utils.send_pd_command(swap_cmd)
        time.sleep(self.utils.CONNECT_TIME)
        new_pr = self.utils.get_pd_state(self.port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if not self.is_connected():
            logging.warning('Device not connected following PR swap attempt.')
            return False
        return current_pr != new_pr


class PDTesterDevice(PDConsoleDevice):
    """Class for PDTester devices

    This class contains methods for PD functions which are unique to the
    PDTester board, e.g. Plankton or Servo v4. It inherits all the methods
    for PD console devices.
    """

    def __init__(self, console, port, utils):
        """Initialization method

        @param console: UART console for this device
        @param port: USB PD port number
        @param utils: PD console utilities object for this device
        """
        # Initialize the PD console device state
        super(PDTesterDevice, self).__init__(console, port, utils)
        # Indicate this is PDTester device
        self.is_pdtester = True

    def _toggle_pdtester_drp(self):
        """Issue 'usbc_action drp' PDTester command

        @returns value of drp_enable in PDTester FW, as an int
        """
        drp_cmd = 'usbc_action drp'
        drp_re = [r'DRP\s=\s(\d)']
        # Send DRP toggle command to PDTester and get value of 'drp_enable'
        m = self.utils.send_pd_command_get_output(drp_cmd, drp_re)
        return int(m[0][1])

    def _enable_pdtester_drp(self):
        """Enable DRP mode on PDTester

        DRP mode can only be toggled and is not able to be explicitly
        enabled/disabled via the console. Therefore, this method will
        toggle DRP mode until the console reply indicates that this
        mode is enabled. The toggle happens a maximum of two times
        in case this is called when it's already enabled.

        @returns True when DRP mode is enabled, False if not successful
        """
        for _ in range(2):
            if self._toggle_pdtester_drp() == 1:
                logging.info('PDTester DRP mode enabled')
                return True
        logging.error('PDTester DRP mode set failure')
        return False

    def _verify_state_sequence(self, states_list, console_log):
        """Compare PD state transitions to expected values

        Each expected state is searched for independently in the log, so
        this confirms that every state appears; it does not check the
        order in which they appear.

        @param states_list: list of expected PD state transitions
        @param console_log: console output which contains state names

        @returns True if every expected state is found, False otherwise
        """
        # For each state in the expected state transition table, build
        # the regexp and search for it in the state transition log.
        return all(
                re.search(r'C{0}\s+[\w]+:?\s({1})'.format(self.port, state),
                          console_log) is not None
                for state in states_list)

    def _fakedisconnect_cmd(self, disc_time_sec):
        """Build the PDTester 'fakedisconnect' console command

        @param disc_time_sec: Time in seconds for disconnect period.

        @returns the command string
        """
        # First argument passed to 'fakedisconnect'.
        # NOTE(review): presumably a delay in ms before the disconnect
        # starts -- confirm against the PDTester firmware.
        DISC_DELAY = 100
        return 'fakedisconnect %d %d' % (DISC_DELAY, disc_time_sec * 1000)

    def _pick_reset_states(self, src_states, snk_states):
        """Select the expected reset state list for the current power role

        @param src_states: expected states when connected as source
        @param snk_states: expected states when connected as sink

        @returns src_states or snk_states

        @raises error.TestFail if the port is neither SRC nor SNK connected
        """
        if self.is_src():
            return src_states
        if self.is_snk():
            return snk_states
        raise error.TestFail('Port Pair not in a connected state')

    def cc_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using PDTester

        PDTester supports a feature which simulates a USB Type C disconnect
        and reconnect.

        @param disc_time_sec: Time in seconds for disconnect period.
        """
        self.utils.send_pd_command(self._fakedisconnect_cmd(disc_time_sec))

    def get_connected_state_after_cc_reconnect(self, disc_time_sec):
        """Get the connected state after disconnect/reconnect using PDTester

        PDTester supports a feature which simulates a USB Type C disconnect
        and reconnect. It returns the first connected state (either source or
        sink) after reconnect.

        @param disc_time_sec: Time in seconds for disconnect period.
        @returns: The connected PD state.
        """
        disc_cmd = self._fakedisconnect_cmd(disc_time_sec)
        src_connected_tuple = self.utils.get_src_connect_states()
        snk_connected_tuple = self.utils.get_snk_connect_states()
        connected_exp = '|'.join(src_connected_tuple + snk_connected_tuple)
        reply_exp = [r'(.*)(C%d)\s+[\w]+:?\s(%s)' % (self.port,
                                                     connected_exp)]
        m = self.utils.send_pd_command_get_output(disc_cmd, reply_exp)
        # Group 3 of the match is the connected state name.
        return m[0][3]

    def drp_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using PDTester

        Utilize PDTester disconnect/connect utility and verify
        that both disconnect and reconnect actions were successful.

        @param disc_time_sec: Time in seconds for disconnect period.

        @returns True if device disconnects, then returns to a connected
                 state. False if either step fails.
        """
        self.cc_disconnect_connect(disc_time_sec)
        # Sample the state halfway through the disconnect window ...
        time.sleep(disc_time_sec / 2)
        disconnect = self.is_disconnected()
        # ... then wait out the rest of it plus the reconnect time.
        time.sleep(disc_time_sec / 2 + self.utils.CONNECT_TIME)
        connect = self.is_connected()
        return bool(disconnect and connect)

    def drp_set(self, mode):
        """Sets dualrole mode

        @param mode: desired dual role setting (on, off, snk, src)

        @returns a truthy value if dualrole mode matches the requested value
                 or is successfully set to that value, a falsy value
                 otherwise. Depending on the path taken this is a bool, the
                 int returned by the DRP toggle, or the result of
                 utils.set_pd_dualrole().
        """
        # Get current value of dualrole
        drp = self.utils.get_pd_dualrole(self.port)
        if drp == mode:
            return True

        if mode == 'on':
            # Setting drp_enable on PDTester will set dualrole mode to on
            return self._enable_pdtester_drp()

        # If desired setting is other than 'on', need to ensure that
        # drp mode on PDTester is disabled.
        if drp == 'on':
            # This will turn off drp_enable flag and set dualmode to 'off'
            # NOTE(review): this returns the new drp_enable value without
            # applying `mode` afterwards, so a request for 'snk'/'src'
            # while in 'on' appears to stop at 'off' and report a falsy
            # result -- confirm whether that is intended.
            return self._toggle_pdtester_drp()
        # With drp_enable flag off, can set to desired setting
        return self.utils.set_pd_dualrole(self.port, mode)

    def _reset(self, cmd, states_list):
        """Initiates a PD reset sequence

        PDTester device has state names available on the console. When
        a reset is issued the console log is extracted and then
        compared against the expected state transitions.

        @param cmd: reset console command to send (soft or hard reset)
        @param states_list: list of expected PD state transitions

        @returns True if state transitions match, False otherwise
        """
        # Want to grab all output until either SRC_READY or SNK_READY
        reply_exp = [r'(.*)(C%d)\s+[\w]+:?\s([\w]+_READY)' % self.port]
        m = self.utils.send_pd_command_get_output(cmd, reply_exp)
        return self._verify_state_sequence(states_list, m[0][0])

    def soft_reset(self):
        """Initiates a PD soft reset sequence

        @returns True if state transitions match, False otherwise

        @raises error.TestFail if the port is not in a connected state
        """
        snk_reset_states = [
                'SOFT_RESET',
                'SNK_DISCOVERY',
                'SNK_REQUESTED',
                'SNK_TRANSITION',
                'SNK_READY'
        ]

        src_reset_states = [
                'SOFT_RESET',
                'SRC_DISCOVERY',
                'SRC_NEGOCIATE',
                'SRC_ACCEPTED',
                'SRC_POWERED',
                'SRC_TRANSITION',
                'SRC_READY'
        ]

        states_list = self._pick_reset_states(src_reset_states,
                                              snk_reset_states)
        cmd = 'pd %d soft' % self.port
        return self._reset(cmd, states_list)

    def hard_reset(self):
        """Initiates a PD hard reset sequence

        @returns True if state transitions match, False otherwise

        @raises error.TestFail if the port is not in a connected state
        """
        snk_reset_states = [
                'HARD_RESET_SEND',
                'HARD_RESET_EXECUTE',
                'SNK_HARD_RESET_RECOVER',
                'SNK_DISCOVERY',
                'SNK_REQUESTED',
                'SNK_TRANSITION',
                'SNK_READY'
        ]

        src_reset_states = [
                'HARD_RESET_SEND',
                'HARD_RESET_EXECUTE',
                'SRC_HARD_RESET_RECOVER',
                'SRC_DISCOVERY',
                'SRC_NEGOCIATE',
                'SRC_ACCEPTED',
                'SRC_POWERED',
                'SRC_TRANSITION',
                'SRC_READY'
        ]

        states_list = self._pick_reset_states(src_reset_states,
                                              snk_reset_states)
        cmd = 'pd %d hard' % self.port
        return self._reset(cmd, states_list)


class PDPortPartner(object):
    """Methods used to instantiate PD device objects

    This class is initialized with a list of servo consoles. It
    contains methods to determine if USB PD devices are accessible
    via the consoles and attempts to determine USB PD port partners.
    A PD device is USB PD port specific, a single console may access
    multiple PD devices.

    """

    def __init__(self, consoles):
        """Initialization method

        @param consoles: list of servo consoles
        """
        self.consoles = consoles

    def __repr__(self):
        """String representation of the object"""
        return "<%s %r>" % (self.__class__.__name__, self.consoles)

    def _send_pd_state(self, port, console):
        """Tests if PD device exists on a given port number

        @param port: USB PD port number to try
        @param console: servo UART console

        @returns True if 'pd <port> state' command gives a valid
                 response, False otherwise
        """
        cmd = 'pd %d state' % port
        regex = r'(Port C\d)|(Parameter)'
        m = console.send_command_get_output(cmd, [regex])
        # If PD port exists, then output will be Port C0 or C1
        regex = r'Port C{0}'.format(port)
        return re.search(regex, m[0][0]) is not None

    def _find_num_pd_ports(self, console):
        """Determine number of PD ports for a given console

        @param console: uart console accessed via servo

        @returns: number of PD ports accessible via console
        """
        MAX_PORTS = 2
        # NOTE(review): this counts responding ports; callers then index
        # ports 0..count-1, which assumes port 0 responds whenever port 1
        # does -- confirm.
        return sum(1 for port in range(MAX_PORTS)
                   if self._send_pd_state(port, console))

    def _is_pd_console(self, console):
        """Check if pd option exists in console

        @param console: uart console accessed via servo

        @returns: True if 'pd' is found, False otherwise
        """
        try:
            console.send_command_get_output('help', [r'(pd)\s+'])
        except error.TestFail:
            return False
        return True

    def _is_pdtester_console(self, console):
        """Check for PDTester console

        This method looks for a console command option 'usbc_action' which
        is unique to PDTester PD devices.

        @param console: uart console accessed via servo

        @returns True if usbc_action command is present, False otherwise
        """
        try:
            console.send_command_get_output('help', [r'(usbc_action)'])
        except error.TestFail:
            return False
        return True

    def _check_port_pair(self, port1, port2):
        """Check if two PD devices could be connected

        If two USB PD devices are connected, then one should be in a
        SRC connected state and the other in a SNK connected state.
        In addition, they must be on different servo consoles.

        @param port1: first candidate PD device
        @param port2: second candidate PD device

        @returns True if not the same console and both PD devices
                 are a plausible pair based only on their PD states.
        """
        # Don't test if on the same servo console
        if port1.console == port2.console:
            logging.info("PD Devices are on same platform -> can't be a pair")
            return False

        # Read each state once and evaluate both roles against it.
        state1 = port1.get_pd_state()
        port1_is_snk = port1.is_snk(state1)
        port1_is_src = port1.is_src(state1)

        state2 = port2.get_pd_state()
        port2_is_snk = port2.is_snk(state2)
        port2_is_src = port2.is_src(state2)

        # Must be SRC <--> SNK or SNK <--> SRC
        if (port1_is_src and port2_is_snk) or (port1_is_snk and port2_is_src):
            logging.debug("SRC+SNK pair: %s (%s) <--> (%s) %s",
                          port1, state1, state2, port2)
            return True

        logging.debug("Not a SRC+SNK pair: %s (%s) <--> (%s) %s",
                      port1, state1, state2, port2)
        return False

    def _verify_pdtester_connection(self, tester_port, dut_port):
        """Verify DUT to PDTester PD connection

        This method checks for a PDTester PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a PDTester feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to PDTester connection is verified.

        @param tester_port: PDTester device used to force the disconnect
        @param dut_port: candidate DUT PD device

        @returns True if DUT to PDTester pd connection is verified
        """
        # All values are passed to time.sleep() / cc_disconnect_connect(),
        # i.e. seconds.
        DISC_CHECK_TIME = 10
        DISC_WAIT_TIME = 20
        CONNECT_TIME = 4
        # Time left in the disconnect window after the mid-window check,
        # plus time for the link to come back up.
        reconnect_wait = DISC_WAIT_TIME - DISC_CHECK_TIME + CONNECT_TIME

        logging.info("Check: %s <--> %s", tester_port, dut_port)

        if not self._check_port_pair(tester_port, dut_port):
            return False

        # Force PD disconnect
        logging.debug('Disconnecting to check if devices are partners')
        tester_port.cc_disconnect_connect(DISC_WAIT_TIME)
        time.sleep(DISC_CHECK_TIME)

        # Verify that both devices are now disconnected
        tester_state = tester_port.get_pd_state()
        dut_state = dut_port.get_pd_state()
        logging.debug("Recheck: %s (%s) <--> (%s) %s",
                      tester_port, tester_state, dut_state, dut_port)

        if not (tester_port.is_disconnected(tester_state) and
                dut_port.is_disconnected(dut_state)):
            logging.info("Ports did not disconnect at the same time, so"
                         " they aren't considered a pair.")
            # Delay to allow non-pair devices to reconnect
            time.sleep(reconnect_wait)
            return False

        logging.debug('Pair disconnected.  Waiting for reconnect...')

        # Allow enough time for reconnection
        time.sleep(reconnect_wait)
        if self._check_port_pair(tester_port, dut_port):
            # Have verified a pd disconnect/reconnect sequence
            logging.info('PDTester <--> DUT pair found')
            return True

        logging.info("Ports did not reconnect at the same time, so"
                     " they aren't considered a pair.")
        return False

    def identify_pd_devices(self):
        """Instantiate PD devices present in test setup

        @return: list of 2 PD devices, [tester, dut], if a
                 DUT <-> PDTester pair is found.
                 If not found, then returns an empty list.
        """
        tester_devports = []
        dut_devports = []

        # For each possible uart console, check to see if a PD console
        # is present and determine the number of PD ports.
        for console in self.consoles:
            if not self._is_pd_console(console):
                continue
            is_tester = self._is_pdtester_console(console)
            num_ports = self._find_num_pd_ports(console)
            # For each PD port that can be accessed via the console,
            # instantiate either PDConsole or PDTester device.
            for port in range(num_ports):
                if is_tester:
                    logging.info('PDTesterDevice on %s port %d',
                                 console.name, port)
                    tester_utils = pd_console.create_pd_console_utils(
                            console)
                    tester_devports.append(
                            PDTesterDevice(console, port, tester_utils))
                else:
                    logging.info('PDConsoleDevice on %s port %d',
                                 console.name, port)
                    dut_utils = pd_console.create_pd_console_utils(console)
                    dut_devports.append(
                            PDConsoleDevice(console, port, dut_utils))

        if not tester_devports:
            logging.error('The specified consoles did not include any'
                          ' PD testers: %s', self.consoles)

        if not dut_devports:
            logging.error('The specified consoles did not contain any'
                          ' DUTs: %s', self.consoles)

        # Determine PD port partners in the list of PD devices. Note that
        # there can be PD devices which are not accessible via a uart console,
        # but are connected to a PD port which is accessible.
        for tester in reversed(tester_devports):
            for dut in dut_devports:
                if tester.console == dut.console:
                    # PD Devices are on same servo console -> can't be a pair
                    continue
                if self._verify_pdtester_connection(tester, dut):
                    return [tester, dut]

        return []