1# Lint as: python2, python3
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import re
11import logging
12from six.moves import range
13import time
14
15from autotest_lib.client.common_lib import error
16from autotest_lib.server.cros.servo import pd_console
17
18
class PDDevice(object):
    """Base class for all PD devices.

    This class provides a set of APIs for expected Type C PD required actions
    in TypeC FAFT tests. The base class is specific for Type C devices that
    do not have any console access.

    Apart from is_connected(), every method here raises NotImplementedError;
    derived classes override the ones they can support.
    """

    @staticmethod
    def _unimplemented(method_name):
        """Build the error raised by a method that has no base implementation.

        Keeping the message in one place guarantees it always names the
        method that was actually called.

        @param method_name: name of the method lacking an override
        @returns NotImplementedError instance naming that method
        """
        return NotImplementedError(
                '%s should be implemented in derived class' % method_name)

    def is_src(self, state=None):
        """Checks if the port is connected as a source.

        @param state: PD state to evaluate (unused by the base class)
        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('is_src')

    def is_snk(self, state=None):
        """Checks if the port is connected as a sink.

        @param state: PD state to evaluate (unused by the base class)
        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('is_snk')

    def is_connected(self, state=None):
        """Checks if the port is connected.

        @param state: PD state forwarded to is_src() and is_snk()
        @returns True if in a connected state, False otherwise
        """
        return self.is_src(state) or self.is_snk(state)

    def is_disconnected(self, state=None):
        """Checks if the port is disconnected.

        @param state: PD state to evaluate (unused by the base class)
        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('is_disconnected')

    def is_ufp(self):
        """Checks if data role is UFP.

        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('is_ufp')

    def is_dfp(self):
        """Checks if data role is DFP.

        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('is_dfp')

    def is_drp(self):
        """Checks if dual role mode is supported.

        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('is_drp')

    def dr_swap(self):
        """Attempts a data role swap.

        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('dr_swap')

    def pr_swap(self):
        """Attempts a power role swap.

        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('pr_swap')

    def vbus_request(self, voltage):
        """Requests a specific VBUS voltage from SRC.

        @param voltage: requested voltage level (5, 12, 20) in volts
        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('vbus_request')

    def soft_reset(self):
        """Initiates a PD soft reset sequence.

        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('soft_reset')

    def hard_reset(self):
        """Initiates a PD hard reset sequence.

        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('hard_reset')

    def drp_set(self, mode):
        """Sets dualrole mode.

        @param mode: desired dual role setting (on, off, snk, src)
        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('drp_set')

    def drp_get(self):
        """Gets dualrole mode.

        @returns one of the modes (on, off, snk, src)
        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('drp_get')

    def drp_disconnect_connect(self, disc_time_sec):
        """Force PD disconnect/connect via drp settings.

        @param disc_time_sec: Time in seconds between disconnect and reconnect
        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('drp_disconnect_connect')

    def cc_disconnect_connect(self, disc_time_sec):
        """Force PD disconnect/connect.

        @param disc_time_sec: Time in seconds between disconnect and reconnect
        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('cc_disconnect_connect')

    def get_connected_state_after_cc_reconnect(self, disc_time_sec):
        """Get the connected state after disconnect/reconnect.

        @param disc_time_sec: Time in seconds for disconnect period.
        @returns: The connected PD state.
        @raises NotImplementedError: always, in the base class
        """
        raise self._unimplemented('get_connected_state_after_cc_reconnect')
156
157
class PDConsoleDevice(PDDevice):
    """Class for PD devices that have console access.

    This class contains methods for common PD actions for any PD device which
    has UART console access. It inherits the PD device base class. In addition,
    it stores the UART console, the PD utilities object and the port number
    for the PD device.
    """

    def __init__(self, console, port, utils):
        """Initialization method.

        @param console: UART console object
        @param port: USB PD port number
        @param utils: PD console utilities object used to query and drive
                      the port
        """
        # Save UART console
        self.console = console
        # Save the PD utilities used by methods in this class
        self.utils = utils
        # Save the PD port number for this device
        self.port = port
        # Not a PDTester device
        self.is_pdtester = False

    def get_pd_state(self):
        """Get the state of the PD port.

        @returns: whatever utils.get_pd_state() reports for this port
        """
        return self.utils.get_pd_state(self.port)

    def get_pd_role(self):
        """Get the current PD power role (source or sink).

        @returns: whatever utils.get_pd_role() reports for this port
        """
        return self.utils.get_pd_role(self.port)

    def is_pd_flag_set(self, key):
        """Test a bit in PD protocol state flags.

        The flag word contains various PD protocol state information.
        This method allows for a specific flag to be tested.

        @param key: dict key to retrieve the flag bit mapping

        @returns True if the bit to be tested is set
        """
        return self.utils.is_pd_flag_set(self.port, key)

    def is_src(self, state=None):
        """Checks if the port is connected as a source.

        The "state" argument allows the caller to get_pd_state() once, and then
        evaluate multiple conditions without re-getting the state.

        @param state: the state to check (None to get current state)
        @returns True if connected as SRC, False otherwise
        """
        return self.utils.is_src_connected(self.port, state)

    def is_snk(self, state=None):
        """Checks if the port is connected as a sink.

        The "state" argument allows the caller to get_pd_state() once, and then
        evaluate multiple conditions without re-getting the state.

        @param state: the state to check (None to get current state)
        @returns True if connected as SNK, False otherwise
        """
        return self.utils.is_snk_connected(self.port, state)

    def is_connected(self, state=None):
        """Checks if the port is connected.

        The "state" argument allows the caller to get_pd_state() once, and then
        evaluate multiple conditions without re-getting the state.

        @param state: the state to check (None to get current state)
        @returns True if in a connected state, False otherwise
        """
        return self.is_snk(state) or self.is_src(state)

    def is_disconnected(self, state=None):
        """Checks if the port is disconnected.

        @param state: the state to check (None to get current state)
        @returns True if in a disconnected state, False otherwise
        """
        return self.utils.is_disconnected(self.port, state)

    def __repr__(self):
        """String representation of the object."""
        return "<%s %r port %s>" % (
            self.__class__.__name__, self.console.name, self.port)

    def is_drp(self):
        """Checks if dual role mode is supported.

        @returns True if dual role mode is 'on', False otherwise
        """
        return self.utils.is_pd_dual_role_enabled(self.port)

    def drp_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using drp mode settings.

        A PD console device doesn't have an explicit connect/disconnect
        command. Instead, the dualrole mode setting is used to force
        disconnects in devices which support this feature. To disconnect,
        force the dualrole mode to be the opposite role of the current
        connected state.

        @param disc_time_sec: time in seconds to wait to reconnect

        @returns True if device disconnects, then returns to a connected
        state. False if either step fails.
        """
        # Dualrole mode must be supported. Any falsy answer (including None)
        # is treated as "not supported", matching the check in pr_swap().
        if not self.is_drp():
            logging.warning(
                    'Device not DRP capable, unabled to force disconnect')
            return False
        # Force state will be the opposite of current connect state
        if self.is_src():
            drp_mode = 'snk'
            swap_state = self.utils.get_snk_connect_states()
        else:
            drp_mode = 'src'
            swap_state = self.utils.get_src_connect_states()
        # Force disconnect
        self.drp_set(drp_mode)
        # Wait for disconnect time
        time.sleep(disc_time_sec)
        # Verify that the device is disconnected
        disconnect = self.is_disconnected()

        # If the other device is dualrole, then forcing dualrole mode will
        # only cause the disconnect to appear momentarily and reconnect
        # in the power role forced by the drp_set() call. For this case,
        # the role swap verifies that a disconnect/connect sequence occurred.
        if not disconnect:
            time.sleep(self.utils.CONNECT_TIME)
            # Connected, verify if power role swap has occurred
            if self.utils.get_pd_state(self.port) in swap_state:
                # Restore default dualrole mode
                self.drp_set('on')
                # Restore original power role
                connect = self.pr_swap()
                if not connect:
                    logging.warning(
                            'DRP on both devices, 2nd power swap failed')
                return connect

        # Restore default dualrole mode
        self.drp_set('on')
        # Allow enough time for protocol state machine
        time.sleep(self.utils.CONNECT_TIME)
        # Check if connected
        connect = self.is_connected()
        logging.info('Disconnect = %r, Connect = %r', disconnect, connect)
        return bool(disconnect and connect)

    def drp_set(self, mode):
        """Sets dualrole mode.

        @param mode: desired dual role setting (on, off, snk, src)

        @returns True if set was successful, False otherwise
        """
        # Set desired dualrole mode
        self.utils.set_pd_dualrole(self.port, mode)
        # Read the setting back and verify that it took effect
        current = self.utils.get_pd_dualrole(self.port)
        return bool(mode == current)

    def drp_get(self):
        """Gets dualrole mode.

        @returns one of the modes (on, off, snk, src)
        """
        return self.utils.get_pd_dualrole(self.port)

    def try_src(self, enable):
        """Enables/Disables Try.SRC PD protocol setting.

        @param enable: True to enable, False to disable

        @returns True if setting was successful, False if feature not
        supported by the device, or not set as desired.
        """
        # Create Try.SRC pd command
        cmd = 'pd trysrc %d' % int(enable)
        # TCPMv1 indicates Try.SRC is on by returning 'on'
        # TCPMv2 indicates Try.SRC is on by returning 'Forced ON'
        on_vals = ('on', 'Forced ON')
        # TCPMv1 indicates Try.SRC is off by returning 'off'
        # TCPMv2 indicates Try.SRC is off by returning 'Forced OFF'
        off_vals = ('off', 'Forced OFF')

        # Try.SRC on/off is output, if supported feature. Otherwise the
        # alternative 'Parameter' branch of the pattern is what matches.
        regex = [r'Try\.SRC\s(%s)|(Parameter)' %
                 ('|'.join(on_vals + off_vals))]
        m = self.utils.send_pd_command_get_output(cmd, regex)

        # Determine if Try.SRC feature is supported
        if 'Try.SRC' not in m[0][0]:
            logging.warning('Try.SRC not supported on this PD device')
            return False

        # TrySRC is supported on this PD device, verify setting.
        trysrc_val = m[0][1]
        logging.info('Try.SRC mode = %s', trysrc_val)
        expected_vals = on_vals if enable else off_vals
        return trysrc_val in expected_vals

    def soft_reset(self):
        """Initiates a PD soft reset sequence.

        To verify that a soft reset sequence was initiated, the
        reply message is checked to verify that the reset command
        was acknowledged by its port pair. The connect state should
        be same as it was prior to issuing the reset command.

        @returns True if the port pair acknowledges the reset message
        and if following the command, the device returns to the same
        connected state. False otherwise.
        """
        RESET_DELAY = 0.5
        cmd = 'pd %d soft' % self.port
        state_before = self.utils.get_pd_state(self.port)
        reply = self.utils.send_pd_command_get_reply_msg(cmd)
        if reply != self.utils.PD_CONTROL_MSG_DICT['Accept']:
            return False
        # Give the port time to settle before sampling the state again
        time.sleep(RESET_DELAY)
        state_after = self.utils.get_pd_state(self.port)
        return state_before == state_after

    def hard_reset(self):
        """Initiates a PD hard reset sequence.

        To verify that a hard reset sequence was initiated, the
        console output is scanned for HARD RST TX (TCPMv1) or one of the
        PE_*_Hard_Reset state names (TCPMv2). In addition, the connect
        state should be same as it was prior to issuing the reset command.

        @returns True if the console shows that hard reset was
        initiated and if following the command, the device returns to the same
        connected state. False otherwise.
        """
        RESET_DELAY = 1.0
        cmd = 'pd %d hard' % self.port
        state_before = self.utils.get_pd_state(self.port)
        self.utils.enable_pd_console_debug()
        try:
            tcpmv1_pattern = r'.*(HARD\sRST\sTX)'
            tcpmv2_pattern = r'.*(PE_SNK_Hard_Reset)|.*(PE_SRC_Hard_Reset)'
            pattern = '|'.join((tcpmv1_pattern, tcpmv2_pattern))
            self.utils.send_pd_command_get_output(cmd, [pattern])
        except error.TestFail:
            logging.warning('HARD RST TX not found')
            return False
        finally:
            # Console debug is switched back off on every exit path
            self.utils.disable_pd_console_debug()

        time.sleep(RESET_DELAY)
        state_after = self.utils.get_pd_state(self.port)
        return state_before == state_after

    def pr_swap(self):
        """Attempts a power role swap.

        In order to attempt a power role swap the device must be
        connected and support dualrole mode. Once these two criteria
        are checked a power role command is issued. Following a delay
        to allow for a reconnection the new power role is checked
        against the power role prior to issuing the command.

        @returns True if the device has swapped power roles, False otherwise.
        """
        # Check preconditions before touching the port
        if not self.is_drp():
            logging.warning('Dualrole Mode not enabled!')
            return False
        if not self.is_connected():
            logging.warning('PD contract not established!')
            return False
        current_pr = self.utils.get_pd_state(self.port)
        swap_cmd = 'pd %d swap power' % self.port
        self.utils.send_pd_command(swap_cmd)
        time.sleep(self.utils.CONNECT_TIME)
        new_pr = self.utils.get_pd_state(self.port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if not self.is_connected():
            logging.warning('Device not connected following PR swap attempt.')
            return False
        # A changed PD state is taken as evidence that the role swapped
        return current_pr != new_pr
450
451
class PDTesterDevice(PDConsoleDevice):
    """Class for PDTester devices.

    This class contains methods for PD functions which are unique to the
    PDTester board, e.g. Plankton or Servo v4. It inherits all the methods
    for PD console devices.
    """

    def __init__(self, console, port, utils):
        """Initialization method.

        @param console: UART console for this device
        @param port: USB PD port number
        @param utils: PD console utilities object for this console
        """
        # Initialize the PD console device state
        super(PDTesterDevice, self).__init__(console, port, utils)
        # Indicate this is PDTester device
        self.is_pdtester = True

    def _toggle_pdtester_drp(self):
        """Issue 'usbc_action drp' PDTester command.

        @returns value of drp_enable in PDTester FW, as an int
        """
        drp_cmd = 'usbc_action drp'
        drp_re = [r'DRP\s=\s(\d)']
        # Send DRP toggle command to PDTester and get value of 'drp_enable'
        m = self.utils.send_pd_command_get_output(drp_cmd, drp_re)
        return int(m[0][1])

    def _enable_pdtester_drp(self):
        """Enable DRP mode on PDTester.

        DRP mode can only be toggled and is not able to be explicitly
        enabled/disabled via the console. Therefore, this method will
        toggle DRP mode until the console reply indicates that this
        mode is enabled. The toggle happens a maximum of two times
        in case this is called when it's already enabled.

        @returns True when DRP mode is enabled, False if not successful
        """
        for _ in range(2):
            if self._toggle_pdtester_drp() == 1:
                logging.info('PDTester DRP mode enabled')
                return True
        logging.error('PDTester DRP mode set failure')
        return False

    def _verify_state_sequence(self, states_list, console_log):
        """Compare PD state transitions to expected values.

        Each expected state is searched for independently anywhere in the
        log, so this confirms that every state appeared but does not check
        the order in which they appeared.

        @param states_list: list of expected PD state transitions
        @param console_log: console output which contains state names

        @returns True if every expected state is found, False otherwise
        """
        # For each state in the expected state transition table, build
        # the regexp and search for it in the state transition log.
        for state in states_list:
            state_regx = r'C{0}\s+[\w]+:?\s({1})'.format(self.port, state)
            if re.search(state_regx, console_log) is None:
                return False
        return True

    def cc_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using PDTester.

        PDTester supports a feature which simulates a USB Type C disconnect
        and reconnect.

        @param disc_time_sec: Time in seconds for disconnect period.
        """
        DISC_DELAY = 100
        # The console command takes the disconnect period in milliseconds
        disc_cmd = 'fakedisconnect %d  %d' % (DISC_DELAY,
                                              disc_time_sec * 1000)
        self.utils.send_pd_command(disc_cmd)

    def get_connected_state_after_cc_reconnect(self, disc_time_sec):
        """Get the connected state after disconnect/reconnect using PDTester.

        PDTester supports a feature which simulates a USB Type C disconnect
        and reconnect. It returns the first connected state (either source or
        sink) after reconnect.

        @param disc_time_sec: Time in seconds for disconnect period.
        @returns: The connected PD state.
        """
        DISC_DELAY = 100
        disc_cmd = 'fakedisconnect %d %d' % (DISC_DELAY, disc_time_sec * 1000)
        src_connected_tuple = self.utils.get_src_connect_states()
        snk_connected_tuple = self.utils.get_snk_connect_states()
        connected_exp = '|'.join(src_connected_tuple + snk_connected_tuple)
        reply_exp = [r'(.*)(C%d)\s+[\w]+:?\s(%s)' %
                     (self.port, connected_exp)]
        m = self.utils.send_pd_command_get_output(disc_cmd, reply_exp)
        # Group 3 of the pattern is the connected state name
        return m[0][3]

    def drp_disconnect_connect(self, disc_time_sec):
        """Disconnect/reconnect using PDTester.

        Utilize PDTester disconnect/connect utility and verify
        that both disconnect and reconnect actions were successful.

        @param disc_time_sec: Time in seconds for disconnect period.

        @returns True if device disconnects, then returns to a connected
        state. False if either step fails.
        """
        self.cc_disconnect_connect(disc_time_sec)
        # Sample the state half way through the disconnect period
        time.sleep(disc_time_sec / 2)
        disconnect = self.is_disconnected()
        # Wait out the rest of the period plus time to re-establish contract
        time.sleep(disc_time_sec / 2 + self.utils.CONNECT_TIME)
        connect = self.is_connected()
        return bool(disconnect and connect)

    def drp_set(self, mode):
        """Sets dualrole mode.

        @param mode: desired dual role setting (on, off, snk, src)

        @returns True if dualrole mode matches the requested value or
        is successfully set to that value. False, otherwise.
        """
        # Get current value of dualrole
        current = self.utils.get_pd_dualrole(self.port)
        if current == mode:
            return True

        if mode == 'on':
            # Setting drp_enable on PDTester will set dualrole mode to on
            return self._enable_pdtester_drp()

        # If desired setting is other than 'on', need to ensure that
        # drp mode on PDTester is disabled first.
        if current == 'on':
            # This is expected to turn off the drp_enable flag and leave
            # dualrole at 'off'; re-read rather than assume.
            self._toggle_pdtester_drp()
            current = self.utils.get_pd_dualrole(self.port)

        # With drp_enable flag off, can set to desired setting
        if current != mode:
            self.utils.set_pd_dualrole(self.port, mode)
            current = self.utils.get_pd_dualrole(self.port)
        # Report success only if the read-back matches the request
        return current == mode

    def _reset(self, cmd, states_list):
        """Initiates a PD reset sequence.

        PDTester device has state names available on the console. When
        a reset is issued the console log is extracted and then
        compared against the expected state transitions.

        @param cmd: console command which triggers the soft or hard reset
        @param states_list: list of expected PD state transitions

        @returns True if state transitions match, False otherwise
        """
        # Want to grab all output until either SRC_READY or SNK_READY
        reply_exp = [r'(.*)(C%d)\s+[\w]+:?\s([\w]+_READY)' % self.port]
        m = self.utils.send_pd_command_get_output(cmd, reply_exp)
        return self._verify_state_sequence(states_list, m[0][0])

    def _states_for_current_role(self, src_states, snk_states):
        """Pick the expected state list matching the current power role.

        @param src_states: expected states when connected as source
        @param snk_states: expected states when connected as sink

        @returns src_states or snk_states
        @raises error.TestFail: if the port is neither source nor sink
        """
        if self.is_src():
            return src_states
        if self.is_snk():
            return snk_states
        raise error.TestFail('Port Pair not in a connected state')

    def soft_reset(self):
        """Initiates a PD soft reset sequence.

        @returns True if state transitions match, False otherwise
        @raises error.TestFail: if the port is not connected
        """
        snk_reset_states = [
            'SOFT_RESET',
            'SNK_DISCOVERY',
            'SNK_REQUESTED',
            'SNK_TRANSITION',
            'SNK_READY'
        ]

        src_reset_states = [
            'SOFT_RESET',
            'SRC_DISCOVERY',
            'SRC_NEGOCIATE',
            'SRC_ACCEPTED',
            'SRC_POWERED',
            'SRC_TRANSITION',
            'SRC_READY'
        ]

        states_list = self._states_for_current_role(src_reset_states,
                                                    snk_reset_states)
        cmd = 'pd %d soft' % self.port
        return self._reset(cmd, states_list)

    def hard_reset(self):
        """Initiates a PD hard reset sequence.

        @returns True if state transitions match, False otherwise
        @raises error.TestFail: if the port is not connected
        """
        snk_reset_states = [
            'HARD_RESET_SEND',
            'HARD_RESET_EXECUTE',
            'SNK_HARD_RESET_RECOVER',
            'SNK_DISCOVERY',
            'SNK_REQUESTED',
            'SNK_TRANSITION',
            'SNK_READY'
        ]

        src_reset_states = [
            'HARD_RESET_SEND',
            'HARD_RESET_EXECUTE',
            'SRC_HARD_RESET_RECOVER',
            'SRC_DISCOVERY',
            'SRC_NEGOCIATE',
            'SRC_ACCEPTED',
            'SRC_POWERED',
            'SRC_TRANSITION',
            'SRC_READY'
        ]

        states_list = self._states_for_current_role(src_reset_states,
                                                    snk_reset_states)
        cmd = 'pd %d hard' % self.port
        return self._reset(cmd, states_list)
678
679
class PDPortPartner(object):
    """Methods used to instantiate PD device objects.

    This class is initialized with a list of servo consoles. It
    contains methods to determine if USB PD devices are accessible
    via the consoles and attempts to determine USB PD port partners.
    A PD device is USB PD port specific, a single console may access
    multiple PD devices.

    """

    def __init__(self, consoles):
        """Initialization method.

        @param consoles: list of servo consoles
        """
        self.consoles = consoles

    def __repr__(self):
        """String representation of the object."""
        return "<%s %r>" % (self.__class__.__name__, self.consoles)

    def _send_pd_state(self, port, console):
        """Tests if PD device exists on a given port number.

        @param port: USB PD port number to try
        @param console: servo UART console

        @returns True if 'pd <port> state' command gives a valid
        response, False otherwise
        """
        cmd = 'pd %d state' % port
        regex = r'(Port C\d)|(Parameter)'
        m = console.send_command_get_output(cmd, [regex])
        # If PD port exists, then output will be Port C0 or C1
        regex = r'Port C{0}'.format(port)
        return re.search(regex, m[0][0]) is not None

    def _find_pd_ports(self, console):
        """Determine which PD port numbers respond on a given console.

        @param console: uart console accessed via servo

        @returns: list of PD port numbers accessible via console
        """
        MAX_PORTS = 2
        return [port for port in range(MAX_PORTS)
                if self._send_pd_state(port, console)]

    def _find_num_pd_ports(self, console):
        """Determine number of PD ports for a given console.

        @param console: uart console accessed via servo

        @returns: number of PD ports accessible via console
        """
        return len(self._find_pd_ports(console))

    def _is_pd_console(self, console):
        """Check if pd option exists in console.

        @param console: uart console accessed via servo

        @returns: True if 'pd' is found, False otherwise
        """
        try:
            console.send_command_get_output('help', [r'(pd)\s+'])
            return True
        except error.TestFail:
            return False

    def _is_pdtester_console(self, console):
        """Check for PDTester console.

        This method looks for a console command option 'usbc_action' which
        is unique to PDTester PD devices.

        @param console: uart console accessed via servo

        @returns True if usbc_action command is present, False otherwise
        """
        try:
            console.send_command_get_output('help', [r'(usbc_action)'])
            return True
        except error.TestFail:
            return False

    def _check_port_pair(self, port1, port2):
        """Check if two PD devices could be connected.

        If two USB PD devices are connected, then they should be in
        either the SRC_READY or SNK_READY states and have opposite
        power roles. In addition, they must be on different servo
        consoles.

        @param port1: first candidate PD device
        @param port2: second candidate PD device

        @returns True if not the same console and both PD devices
        are a plausible pair based only on their PD states.
        """
        # Don't test if on the same servo console
        if port1.console == port2.console:
            logging.info("PD Devices are on same platform -> can't be a pair")
            return False

        # Read each state once and evaluate both roles against that snapshot
        state1 = port1.get_pd_state()
        port1_is_snk = port1.is_snk(state1)
        port1_is_src = port1.is_src(state1)

        state2 = port2.get_pd_state()
        port2_is_snk = port2.is_snk(state2)
        port2_is_src = port2.is_src(state2)

        # Must be SRC <--> SNK or SNK <--> SRC
        if (port1_is_src and port2_is_snk) or (port1_is_snk and port2_is_src):
            logging.debug("SRC+SNK pair: %s (%s) <--> (%s) %s",
                          port1, state1, state2, port2)
            return True

        logging.debug("Not a SRC+SNK pair: %s (%s) <--> (%s) %s",
                      port1, state1, state2, port2)
        return False

    def _verify_pdtester_connection(self, tester_port, dut_port):
        """Verify DUT to PDTester PD connection.

        This method checks for a PDTester PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a PDTester feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to PDTester connection is verified.

        @param tester_port: PDTester PD device used to force the disconnect
        @param dut_port: candidate DUT PD device

        @returns True if DUT to PDTester pd connection is verified
        """
        DISC_CHECK_TIME = 10
        DISC_WAIT_TIME = 20
        CONNECT_TIME = 4
        # Time left in the disconnect period after the mid-point check,
        # plus time for the contract to be re-established.
        reconnect_wait = DISC_WAIT_TIME - DISC_CHECK_TIME + CONNECT_TIME

        logging.info("Check: %s <--> %s", tester_port, dut_port)

        if not self._check_port_pair(tester_port, dut_port):
            return False

        # Force PD disconnect
        logging.debug('Disconnecting to check if devices are partners')
        tester_port.cc_disconnect_connect(DISC_WAIT_TIME)
        time.sleep(DISC_CHECK_TIME)

        # Verify that both devices are now disconnected
        tester_state = tester_port.get_pd_state()
        dut_state = dut_port.get_pd_state()
        logging.debug("Recheck: %s (%s) <--> (%s) %s",
                      tester_port, tester_state, dut_state, dut_port)

        if not (tester_port.is_disconnected(tester_state) and
                dut_port.is_disconnected(dut_state)):
            logging.info("Ports did not disconnect at the same time, so"
                         " they aren't considered a pair.")
            # Delay to allow non-pair devices to reconnect
            time.sleep(reconnect_wait)
            return False

        logging.debug('Pair disconnected.  Waiting for reconnect...')

        # Allow enough time for reconnection
        time.sleep(reconnect_wait)
        if self._check_port_pair(tester_port, dut_port):
            # Have verified a pd disconnect/reconnect sequence
            logging.info('PDTester <--> DUT pair found')
            return True

        logging.info("Ports did not reconnect at the same time, so"
                     " they aren't considered a pair.")
        return False

    def identify_pd_devices(self):
        """Instantiate PD devices present in test setup.

        @return: list of 2 PD devices if a DUT <-> PDTester found.
                 If not found, then returns an empty list.
        """
        tester_devports = []
        dut_devports = []

        # For each possible uart console, check to see if a PD console
        # is present and determine which PD ports respond.
        for console in self.consoles:
            if not self._is_pd_console(console):
                continue
            is_tester = self._is_pdtester_console(console)
            # For each PD port that can be accessed via the console,
            # instantiate either PDConsole or PDTester device. Iterating the
            # responding port numbers (rather than range(count)) keeps the
            # port number right even if a lower-numbered port is absent.
            for port in self._find_pd_ports(console):
                if is_tester:
                    logging.info('PDTesterDevice on %s port %d',
                                 console.name, port)
                    tester_utils = pd_console.create_pd_console_utils(
                            console)
                    tester_devports.append(
                            PDTesterDevice(console, port, tester_utils))
                else:
                    logging.info('PDConsoleDevice on %s port %d',
                                 console.name, port)
                    dut_utils = pd_console.create_pd_console_utils(console)
                    dut_devports.append(
                            PDConsoleDevice(console, port, dut_utils))

        if not tester_devports:
            logging.error('The specified consoles did not include any'
                          ' PD testers: %s', self.consoles)

        if not dut_devports:
            logging.error('The specified consoles did not contain any'
                          ' DUTs: %s', self.consoles)

        # Determine PD port partners in the list of PD devices. Note that
        # there can be PD devices which are not accessible via a uart console,
        # but are connected to a PD port which is accessible.
        for tester in reversed(tester_devports):
            for dut in dut_devports:
                if tester.console == dut.console:
                    # PD Devices are on same servo console -> can't be a pair
                    continue
                if self._verify_pdtester_connection(tester, dut):
                    return [tester, dut]

        return []
907