1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import re
7import time
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
11from autotest_lib.server.cros.servo import pd_console
12
13
class firmware_PDDataSwap(FirmwareTest):
    """
    Servo based USB PD data role swap test

    Pass criteria is all data role swaps complete, or
    a reject control message is received from the DUT in the
    cases where the swap does not complete.

    """
    version = 1

    # Seconds to sleep after a swap/disconnect request before sampling state
    PD_ROLE_DELAY = 0.5
    # Seconds to sleep while waiting for a PD connection to settle
    PD_CONNECT_DELAY = 4
    PLANKTON_PORT = 0
    DATA_SWAP_ITERATIONS = 10
    # Upward facing port data role
    UFP = 'UFP'
    # Downward facing port data role
    DFP = 'DFP'
    # Plankton initiated data swap request
    PLANKTON_SWAP_REQ = 'pd %d swap data' % PLANKTON_PORT
    # Swap Result Tables, keyed by (direction, starting DUT data role).
    # These class-level tables only act as defaults; run_once() installs
    # fresh per-instance tables so that counts never leak between runs.
    swap_attempt = {
        ('rx', DFP): 0,
        ('rx', UFP): 0,
        ('tx', DFP): 0,
        ('tx', UFP): 0
    }
    swap_failure = {
        ('rx', DFP): 0,
        ('rx', UFP): 0,
        ('tx', DFP): 0,
        ('tx', UFP): 0
    }

    def _reset_swap_tables(self):
        """Install zeroed, per-instance swap result tables

        The class-level tables are shared by every instance and by every
        call of run_once() in the same process. Shadowing them with
        instance attributes keeps the results of one run from being
        counted in the next one.
        """
        keys = [(direction, role)
                for direction in ('rx', 'tx')
                for role in (self.DFP, self.UFP)]
        self.swap_attempt = dict.fromkeys(keys, 0)
        self.swap_failure = dict.fromkeys(keys, 0)

    def _verify_plankton_connection(self, port):
        """Verify if DUT to Plankton PD connection

        This method checks for a Plankton PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a Plankton feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to Plankton connection is verified.

        @param port: DUT pd port to test

        @returns True if DUT to Plankton pd connection is verified
        """
        DISCONNECT_TIME_SEC = 2
        # plankton console command to force PD disconnect
        disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
        # Only check for Plankton if DUT has active PD connection
        if not self.dut_pd_utils.is_pd_connected(port):
            return False
        # Attempt to force PD disconnection
        self.plankton_pd_utils.send_pd_command(disc_cmd)
        time.sleep(self.PD_ROLE_DELAY)
        # Verify that DUT PD port is no longer connected
        if not self.dut_pd_utils.is_pd_connected(port):
            # Wait for disconnect timer and give time to reconnect
            time.sleep(self.PD_CONNECT_DELAY + DISCONNECT_TIME_SEC)
            if self.dut_pd_utils.is_pd_connected(port):
                logging.info('Plankton connection verfied on port %d', port)
                return True
        else:
            # Could have disconnected other port, allow it to reconnect
            # before exiting.
            time.sleep(self.PD_CONNECT_DELAY + DISCONNECT_TIME_SEC)
        return False

    def _find_dut_to_plankton_connection(self):
        """Find the PD port which is connected to Plankton

        @returns DUT pd port number if found, None otherwise
        """
        for port in range(self.dut_pd_utils.PD_MAX_PORTS):
            # Check for DUT to Plankton connection on port
            if self._verify_plankton_connection(port):
                # Plankton PD connection found so exit
                return port
        return None

    def _get_data_role(self, console, port):
        """Get data role of PD connection

        The role string reported by the console is expected to have the
        form '<word>-<word>'; the part after the dash is returned.

        @param console: pd console object for uart access
        @param port: 0/1 pd port of current connection

        @returns: 'DFP' or 'UFP'

        @raises error.TestFail if the role string cannot be parsed
        """
        role = console.get_pd_role(port)
        m = re.search(r'[\w]+-([\w]+)', role)
        if m is None:
            # Fail with the offending text instead of an AttributeError
            raise error.TestFail('Unable to parse PD role: %r' % role)
        return m.group(1)

    def _get_remote_role(self, local_role):
        """Invert data role

        @param local_role: data role to be flipped

        @returns: flipped data role value
        """
        if local_role == self.DFP:
            return self.UFP
        return self.DFP

    def _change_dut_power_role(self, port):
        """Force power role change via Plankton

        @param port: port of DUT PD connection

        @returns True if power role change is successful
        """
        PLANKTON_SRC_VOLTAGE = 5
        PLANKTON_SNK_VOLTAGE = 0
        pd_state = self.dut_pd_utils.get_pd_state(port)
        if pd_state == self.dut_pd_utils.SRC_CONNECT:
            # DUT is currently a SRC, so change to SNK
            # Use Plankton method to ensure power role change
            self.plankton.charge(PLANKTON_SRC_VOLTAGE)
        else:
            # DUT is currently a SNK, so change it to a SRC.
            self.plankton.charge(PLANKTON_SNK_VOLTAGE)
        # Wait for change to take place
        time.sleep(self.PD_CONNECT_DELAY)
        plankton_state = self.plankton_pd_utils.get_pd_state(
                self.PLANKTON_PORT)
        # After the swap, Plankton should be in the state that the DUT
        # was in when this method was called.
        return pd_state == plankton_state

    def _send_data_swap_get_reply(self, console, port):
        """Send data swap request, get PD control msg reply

        The PD console debug mode is enabled prior to sending
        a pd data role swap request message. This allows the
        control message reply to be extracted. The debug mode
        is disabled prior to exiting, even if sending the command
        or parsing its output raises.

        @param console: pd console object for uart access
        @param port: pd port, on that console, used to send the request

        @returns: PD control header message
        """
        # Enable PD console debug mode to show control messages
        console.enable_pd_console_debug()
        try:
            cmd = 'pd %d swap data' % port
            m = console.send_pd_command_get_output(cmd, [r'RECV\s([\w]+)'])
            # The captured header is hex text; keep the control msg bits
            return int(m[0][1], 16) & console.PD_CONTROL_MSG_MASK
        finally:
            console.disable_pd_console_debug()

    def _attempt_data_swap(self, pd_port, direction):
        """Perform a data role swap request

        Data swap requests can be either initiated by the DUT or received
        by the DUT. This direction determines which PD console is used
        to initiate the swap command. The data role before and after
        the swap command are compared to determine if it took place.

        Even if data swap capability is advertised, a PD device is allowed
        to reject the request. Therefore, not swapping isn't itself a
        failure. When Plankton is used to initiate the request, the debug
        mode is enabled which allows the control message from the DUT to
        be analyzed. If the swap does not occur, but the request is rejected
        by the DUT then that is not counted as a failure.

        For DUT initiated (tx) requests no reply is captured, so any tx
        attempt which leaves the data role unchanged counts as a failure.

        @param pd_port: DUT pd port value 0/1
        @param direction: rx or tx from the DUT perspective

        @returns PD control reply message for rx swaps, 0 for tx swaps
        """
        # Get starting DUT data role
        dut_dr = self._get_data_role(self.dut_pd_utils, pd_port)
        self.swap_attempt[(direction, dut_dr)] += 1
        if direction == 'tx':
            # Initiate swap request from the DUT
            self.dut_pd_utils.send_pd_command('pd %d swap data' % pd_port)
            # Not using debug mode, so there is no reply message
            ctrl = 0
        else:
            # Initiate swap request from Plankton
            ctrl = self._send_data_swap_get_reply(self.plankton_pd_utils,
                                                  self.PLANKTON_PORT)

        time.sleep(self.PD_ROLE_DELAY)
        # Get DUT current data role
        swap_dr = self._get_data_role(self.dut_pd_utils, pd_port)
        logging.info('%s swap attempt: prev = %s, new = %s, msg = %s',
                      direction, dut_dr, swap_dr, ctrl)
        reject = self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']
        if dut_dr == swap_dr and ctrl != reject:
            self.swap_failure[(direction, dut_dr)] += 1
        return ctrl

    def _execute_data_role_swap_test(self, pd_port):
        """Execute a series of data role swaps

        Attempt both rx and tx data swaps, from perspective of DUT.
        Even if the DUT advertises support, it can
        reject swap requests when already in the desired data role. For
        example many devices will not swap if already in DFP mode.
        However, Plankton should always accept a request. Therefore,
        when a swap failed on a rx swap, then that is followed by
        a tx swap attempt.

        @param pd_port: port number of DUT PD connection
        """
        reject = self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']
        for attempt in range(self.DATA_SWAP_ITERATIONS):
            # Use the same direction for every 2 loop iterations:
            # bit 1 of the counter gives rx, rx, tx, tx, rx, rx, ...
            direction = 'tx' if attempt & 2 else 'rx'
            ctrl_msg = self._attempt_data_swap(pd_port, direction)
            if direction == 'rx' and ctrl_msg == reject:
                # DUT rejected the Plankton request, so use a DUT
                # initiated swap to change roles
                self._attempt_data_swap(pd_port, 'tx')

    def _test_data_swap_reject(self, pd_port):
        """Verify that data swap request is rejected

        This tests the case where the DUT doesn't advertise support
        for data swaps. A data request is sent by Plankton, and then
        the control message checked to ensure the request was rejected.
        In addition, the data role and connection state are verified
        to remain unchanged.

        @param pd_port: port for DUT pd connection

        @raises error.TestFail if the request is not rejected, or if the
                DUT connection state or data role changes
        """
        # Get current DUT data role
        dut_data_role = self._get_data_role(self.dut_pd_utils, pd_port)
        dut_connect_state = self.dut_pd_utils.get_pd_state(pd_port)
        # Send swap command from Plankton and get reply
        ctrl_msg = self._send_data_swap_get_reply(self.plankton_pd_utils,
                                                  self.PLANKTON_PORT)
        if ctrl_msg != self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']:
            raise error.TestFail('Data Swap Req not rejected, returned %r' %
                                 ctrl_msg)
        # Get DUT current state
        pd_state = self.dut_pd_utils.get_pd_state(pd_port)
        if pd_state != dut_connect_state:
            raise error.TestFail('PD not connected! pd_state = %r' %
                                 pd_state)
        # Since reject message was received, verify data role didn't change
        curr_dr = self._get_data_role(self.dut_pd_utils, pd_port)
        if curr_dr != dut_data_role:
            raise error.TestFail('Unexpected PD data role change')

    def initialize(self, host, cmdline_args):
        """Set up the test: normal mode, then send 'chan 0' to usbpd

        @param host: passed through to the parent class initialize
        @param cmdline_args: passed through to the parent class initialize
        """
        super(firmware_PDDataSwap, self).initialize(host, cmdline_args)
        # Only run in normal mode
        self.switcher.setup_mode('normal')
        self.usbpd.send_command('chan 0')

    def cleanup(self):
        """Send 'chan 0xffffffff' to usbpd, then run the parent cleanup"""
        self.usbpd.send_command('chan 0xffffffff')
        super(firmware_PDDataSwap, self).cleanup()

    def run_once(self):
        """Execute Data Role swap test.

        1. Verify that pd console is accessible
        2. Verify that DUT has a valid PD contract
        3. Determine if DUT advertises support for data swaps
        4. Test DUT initiated and received data swaps
        5. Swap power roles if supported
        6. Repeat DUT received data swap requests

        @raises error.TestFail if the pd console or the Plankton connection
                is not found, or if any swap attempt is counted as a failure
        """
        # Start every run with zeroed result tables owned by this instance
        self._reset_swap_tables()

        # create objects for pd utilities
        self.dut_pd_utils = pd_console.PDConsoleUtils(self.usbpd)
        self.plankton_pd_utils = pd_console.PDConsoleUtils(self.plankton)

        # Make sure PD support exists in the UART console
        if not self.dut_pd_utils.verify_pd_console():
            raise error.TestFail("pd command not present on console!")

        # Type C connection (PD contract) should exist at this point
        # For this test, the DUT must be connected to a Plankton.
        pd_port = self._find_dut_to_plankton_connection()
        if pd_port is None:
            raise error.TestFail("DUT to Plankton PD connection not found")
        dut_connect_state = self.dut_pd_utils.get_pd_state(pd_port)
        logging.info('Initial DUT connect state = %s', dut_connect_state)

        # Determine if DUT supports data role swaps
        dr_swap_allowed = self.plankton_pd_utils.is_pd_flag_set(
                self.PLANKTON_PORT, 'data_swap')
        # Get current DUT data role
        dut_data_role = self._get_data_role(self.dut_pd_utils, pd_port)
        logging.info('Starting DUT Data Role = %r', dut_data_role)

        # If data swaps are not allowed on the DUT, then still
        # attempt a data swap and verify that the request is
        # rejected by the DUT and that it remains connected and
        # in the same role.
        if not dr_swap_allowed:
            logging.info('Data Swap support not advertised by DUT')
            self._test_data_swap_reject(pd_port)
            logging.info('Data Swap request rejected by DUT as expected')
            return

        # Data role swap support advertised, test this feature.
        self._execute_data_role_swap_test(pd_port)

        # If DUT supports Power Role swap then attempt to change roles.
        # This way, data role swaps will be tested in both configurations.
        if self.plankton_pd_utils.is_pd_flag_set(
                self.PLANKTON_PORT, 'power_swap'):
            logging.info('\nDUT advertises Power Swap Support')
            # Attempt to swap power roles
            if self._change_dut_power_role(pd_port):
                self._execute_data_role_swap_test(pd_port)
            else:
                logging.warning('Power swap not successful!')
                logging.warning('Only tested with DUT in %s state',
                                dut_connect_state)
        else:
            logging.info('DUT does not advertise power swap support')

        logging.info('***************** Swap Results ********************')
        total_attempts = 0
        total_failures = 0
        # Sorted so that the summary is logged in a stable order
        for direction, role in sorted(self.swap_attempt):
            attempts = self.swap_attempt[(direction, role)]
            failures = self.swap_failure[(direction, role)]
            logging.info('%s %s swap attempts = %d, failures = %d',
                         direction, role, attempts, failures)
            total_attempts += attempts
            total_failures += failures

        # If any swap attempts were not successful, flag test as failure
        if total_failures:
            raise error.TestFail('Data Swap Fail: Attempt = %d, Failure = %d' %
                                 (total_attempts, total_failures))
352