1# Lint as: python2, python3
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import functools
11import logging
12import pprint
13import re
14import six
15from six.moves import range
16import time
17
18from autotest_lib.client.bin import utils
19from autotest_lib.client.common_lib import error
20from autotest_lib.client.common_lib.cros import cr50_utils
21from autotest_lib.server.cros.servo import chrome_ec
22from autotest_lib.server.cros.servo import servo
23
24
def dts_control_command(func):
    """Decorate a method so it only runs when dts mode control is supported.

    @param func: the method to guard. Its first argument must be an object
                 with a _servo attribute that provides dts_mode_is_valid().
    @return: the wrapped method. It returns whatever func returns, or None
             without calling func if dts mode control is not supported.
    """
    @functools.wraps(func)
    def wrapper(instance, *args, **kwargs):
        """Skip the wrapped method if dts mode control is not supported."""
        if not instance._servo.dts_mode_is_valid():
            logging.info('Servo setup does not support DTS mode. ignoring %s',
                         func.__name__)
            return None
        return func(instance, *args, **kwargs)
    return wrapper
35
36
37class ChromeCr50(chrome_ec.ChromeConsole):
38    """Manages control of a Chrome Cr50.
39
40    We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
41    provides many interfaces to set and get its behavior via console commands.
42    This class is to abstract these interfaces.
43    """
44    PROD_RW_KEYIDS = ['0x87b73b67', '0xde88588d']
45    PROD_RO_KEYIDS = ['0xaa66150f']
46    OPEN = 'open'
47    UNLOCK = 'unlock'
48    LOCK = 'lock'
49    # The amount of time you need to show physical presence.
50    PP_SHORT = 15
51    PP_LONG = 300
52    CCD_PASSWORD_RATE_LIMIT = 3
53    IDLE_COUNT = 'count: (\d+)\s'
54    SHORT_WAIT = 3
55    # The version has four groups: the partition, the header version, debug
56    # descriptor and then version string.
57    # There are two partitions A and B. The active partition is marked with a
58    # '*'. If it is a debug image '/DBG' is added to the version string. If the
59    # image has been corrupted, the version information will be replaced with
60    # 'Error'.
61    # So the output may look something like this.
62    #   RW_A:    0.0.21/cr50_v1.1.6133-fd788b
63    #   RW_B:  * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d
64    # Or like this if the region was corrupted.
65    #   RW_A:  * 0.0.21/cr50_v1.1.6133-fd788b
66    #   RW_B:    Error
67    VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s'
68    INACTIVE_VERSION = VERSION_FORMAT % ''
69    ACTIVE_VERSION = VERSION_FORMAT % '\*'
70    # Following lines of the version output may print the image board id
71    # information. eg.
72    # BID A:   5a5a4146:ffffffff:00007f00 Yes
73    # BID B:   00000000:00000000:00000000 Yes
74    # Use the first group from ACTIVE_VERSION to match the active board id
75    # partition.
76    BID_ERROR = 'read_board_id: failed'
77    BID_FORMAT = ':\s+[a-f0-9:]{26} '
78    ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT,
79            BID_ERROR)
80    WAKE_CHAR = '\n\n\n\n'
81    WAKE_RESPONSE = ['(>|Console is enabled)']
82    START_UNLOCK_TIMEOUT = 20
83    GETTIME = ['= (\S+)']
84    FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"]
85    FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting']
86    MAX_RETRY_COUNT = 5
87    CCDSTATE_MAX_RETRY_COUNT = 20
88    START_STR = ['((Havn|UART).*Console is enabled;)']
89    REBOOT_DELAY_WITH_CCD = 60
90    REBOOT_DELAY_WITH_FLEX = 3
91    ON_STRINGS = ['enable', 'enabled', 'on']
92    CONSERVATIVE_CCD_WAIT = 10
93    CCD_SHORT_PRESSES = 5
94    CAP_IS_ACCESSIBLE = 0
95    CAP_SETTING = 1
96    CAP_REQ = 2
97    GET_CAP_TRIES = 20
98    # Regex to match the valid capability settings.
99    CAP_STATES = '(Always|Default|IfOpened|UnlessLocked)'
100    # List of all cr50 ccd capabilities. Same order of 'ccd' output
101    CAP_NAMES = [
102        'UartGscRxAPTx', 'UartGscTxAPRx', 'UartGscRxECTx', 'UartGscTxECRx',
103        'FlashAP', 'FlashEC', 'OverrideWP', 'RebootECAP', 'GscFullConsole',
104        'UnlockNoReboot', 'UnlockNoShortPP', 'OpenNoTPMWipe', 'OpenNoLongPP',
105        'BatteryBypassPP', 'UpdateNoTPMWipe', 'I2C', 'FlashRead',
106        'OpenNoDevMode', 'OpenFromUSB', 'OverrideBatt'
107    ]
108    # There are two capability formats. Match both.
109    #  UartGscRxECTx   Y 3=IfOpened
110    #  or
111    #  UartGscRxECTx   Y 0=Default (Always)
112    # Make sure the last word is at the end of the line. The next line will
113    # start with some whitespace, so account for that too.
114    CAP_FORMAT = '\s+(Y|-) \d\=%s( \(%s\))?[\r\n]+\s*' % (CAP_STATES,
115                                                          CAP_STATES)
116    # Be as specific as possible with the 'ccd' output, so the test will notice
117    # missing characters and retry getting the output. Name each group, so the
118    # test can extract the field information into a dictionary.
119    # CCD_FIELDS is used to order the regex when searching for multiple fields
120    CCD_FIELDS = ['State', 'Password', 'Flags', 'Capabilities', 'TPM']
121    # CCD_FORMAT has the field names as keys and the expected output as the
122    # value.
123    CCD_FORMAT = {
124        'State' : '(State: (?P<State>Opened|Locked|Unlocked))',
125        'Password' : '(Password: (?P<Password>set|none))',
126        'Flags' : '(Flags: (?P<Flags>\S*))',
127        'Capabilities' : '(Capabilities:.*(?P<Capabilities>%s))' %
128                         (CAP_FORMAT.join(CAP_NAMES) + CAP_FORMAT),
129        'TPM' : '(TPM:(?P<TPM>[ \S]*)\r)',
130    }
131
    # CR50 Board Properties as defined in platform/ec/board/cr50/scratch-reg1.h
    # Maps each property name to its bit mask in the integer value that
    # get_board_properties() parses from the 'brdprop' output. Most entries are
    # a single bit. BOARD_CCD_STATE_MASK covers two bits (11 and 12) and the
    # BOARD_CCD_REC_LID_PIN_* entries are values of a two bit field that starts
    # at bit 22.
    # NOTE(review): uses_board_property() checks (brdprop & prop) == prop, so
    # BOARD_CCD_REC_LID_PIN_DIOA1 and _DIOA9 also read as set when the field
    # holds the DIOA12 value (0x03) - confirm that is what callers expect.
    BOARD_PROP = {
           'BOARD_SLAVE_CONFIG_SPI'      : 1 << 0,
           'BOARD_SLAVE_CONFIG_I2C'      : 1 << 1,
           'BOARD_NEEDS_SYS_RST_PULL_UP' : 1 << 5,
           'BOARD_USE_PLT_RESET'         : 1 << 6,
           'BOARD_WP_ASSERTED'           : 1 << 8,
           'BOARD_FORCING_WP'            : 1 << 9,
           'BOARD_NO_RO_UART'            : 1 << 10,
           'BOARD_CCD_STATE_MASK'        : 3 << 11,
           'BOARD_DEEP_SLEEP_DISABLED'   : 1 << 13,
           'BOARD_DETECT_AP_WITH_UART'   : 1 << 14,
           'BOARD_ITE_EC_SYNC_NEEDED'    : 1 << 15,
           'BOARD_WP_DISABLE_DELAY'      : 1 << 16,
           'BOARD_CLOSED_SOURCE_SET1'    : 1 << 17,
           'BOARD_CLOSED_LOOP_RESET'     : 1 << 18,
           'BOARD_NO_INA_SUPPORT'        : 1 << 19,
           'BOARD_ALLOW_CHANGE_TPM_MODE' : 1 << 20,
           'BOARD_EC_CR50_COMM_SUPPORT'  : 1 << 21,
           'BOARD_CCD_REC_LID_PIN_DIOA1' : 0x01 << 22,
           'BOARD_CCD_REC_LID_PIN_DIOA9' : 0x02 << 22,
           'BOARD_CCD_REC_LID_PIN_DIOA12': 0x03 << 22,
    }
155
    # CR50 reset flags as defined in platform ec_commands.h. These are only the
    # flags used by cr50.
    # Maps each reset flag name to its bit mask.
    # NOTE(review): RESET_FLAG_RTC_ALARM is 3 << 7, which sets bits 7 and 8 and
    # so overlaps RESET_FLAG_WAKE_PIN (1 << 8). Every other entry is a single
    # bit. Confirm the intended value against ec_commands.h.
    RESET_FLAGS = {
           'RESET_FLAG_OTHER'            : 1 << 0,
           'RESET_FLAG_BROWNOUT'         : 1 << 2,
           'RESET_FLAG_POWER_ON'         : 1 << 3,
           'RESET_FLAG_SOFT'             : 1 << 5,
           'RESET_FLAG_HIBERNATE'        : 1 << 6,
           'RESET_FLAG_RTC_ALARM'        : 3 << 7,
           'RESET_FLAG_WAKE_PIN'         : 1 << 8,
           'RESET_FLAG_HARD'             : 1 << 11,
           'RESET_FLAG_USB_RESUME'       : 1 << 14,
           'RESET_FLAG_RDD'              : 1 << 15,
           'RESET_FLAG_RBOX'             : 1 << 16,
           'RESET_FLAG_SECURITY'         : 1 << 17,
    }
172
173    def __init__(self, servo, faft_config):
174        """Initializes a ChromeCr50 object.
175
176        @param servo: A servo object.
177        @param faft_config: A faft config object.
178        """
179        super(ChromeCr50, self).__init__(servo, 'cr50_uart')
180        self.faft_config = faft_config
181
182
183    def wake_cr50(self):
184        """Wake up cr50 by sending some linebreaks and wait for the response"""
185        for i in range(self.MAX_RETRY_COUNT):
186            try:
187                rv = super(ChromeCr50, self).send_command_get_output(
188                        self.WAKE_CHAR, self.WAKE_RESPONSE)
189                logging.debug('wake result %r', rv)
190                return
191            except servo.ResponsiveConsoleError as e:
192                logging.info("Console responsive, but couldn't match wake "
193                             "response %s", e)
194        raise servo.ResponsiveConsoleError('Unable to wake cr50')
195
196
197    def send_command(self, commands):
198        """Send command through UART.
199
200        Cr50 will drop characters input to the UART when it resumes from sleep.
201        If servo is not using ccd, send some dummy characters before sending the
202        real command to make sure cr50 is awake.
203
204        @param commands: the command string to send to cr50
205        """
206        if self._servo.main_device_is_flex():
207            self.wake_cr50()
208        super(ChromeCr50, self).send_command(commands)
209
210
211    def set_cap(self, cap, setting):
212        """Set the capability to setting
213
214        @param cap: The capability string
215        @param setting: The setting to set the capability to.
216        """
217        self.set_caps({ cap : setting })
218
219
220    def set_caps(self, cap_dict):
221        """Use cap_dict to set all the cap values
222
223        Set all of the capabilities in cap_dict to the correct config.
224
225        @param cap_dict: A dictionary with the capability as key and the desired
226                         setting as values
227        """
228        for cap, config in six.iteritems(cap_dict):
229            self.send_command('ccd set %s %s' % (cap, config))
230        current_cap_settings = self.get_cap_dict(info=self.CAP_SETTING)
231        for cap, config in six.iteritems(cap_dict):
232            if (current_cap_settings[cap].lower() !=
233                config.lower()):
234                raise error.TestFail('Failed to set %s to %s' % (cap, config))
235
236
237    def get_cap_overview(self, cap_dict):
238        """Get a basic overview of the capability dictionary
239
240        If all capabilities are set to Default, ccd has been reset to default.
241        If all capabilities are set to Always, ccd is in factory mode.
242
243        @param cap_dict: A dictionary of the capability settings
244        @return: A tuple of the capability overview (in factory mode, is reset)
245        """
246        in_factory_mode = True
247        is_reset = True
248        for cap, cap_info in six.iteritems(cap_dict):
249            cap_setting = cap_info[self.CAP_SETTING]
250            if cap_setting != 'Always':
251                in_factory_mode = False
252            if cap_setting != 'Default':
253                is_reset = False
254        return in_factory_mode, is_reset
255
256
257    def password_is_reset(self):
258        """Returns True if the password is cleared"""
259        return self.get_ccd_info('Password') == 'none'
260
261
262    def ccd_is_reset(self):
263        """Returns True if the ccd is reset
264
265        The password must be cleared, write protect and battery presence must
266        follow battery presence, and all capabilities must be Always
267        """
268        return (self.password_is_reset() and self.wp_is_reset() and
269                self.batt_pres_is_reset() and
270                self.get_cap_overview(self.get_cap_dict())[1])
271
272
273    def wp_is_reset(self):
274        """Returns True if wp is reset to follow batt pres at all times"""
275        follow_batt_pres, _, follow_batt_pres_atboot, _ = self.get_wp_state()
276        return follow_batt_pres and follow_batt_pres_atboot
277
278
279    def get_wp_state(self):
280        """Get the current write protect and atboot state
281
282        The atboot setting cannot really be determined now if it is set to
283        follow battery presence. It is likely to remain the same after reboot,
284        but who knows. If the third element of the tuple is True, the last
285        element will not be that useful
286
287        @return: a tuple with the current write protect state
288                (True if current state is to follow batt presence,
289                 True if write protect is enabled,
290                 True if current state is to follow batt presence atboot,
291                 True if write protect is enabled atboot)
292        """
293        rv = self.send_command_retry_get_output('wp',
294                ['Flash WP: (forced )?(enabled|disabled).*at boot: (forced )?'
295                 '(follow|enabled|disabled)'], safe=True)[0]
296        _, forced, enabled, _, atboot = rv
297        logging.debug(rv)
298        return (not forced, enabled =='enabled',
299                atboot == 'follow', atboot == 'enabled')
300
301
302    def in_dev_mode(self):
303        """Return True if cr50 thinks the device is in dev mode"""
304        return 'dev_mode' in self.get_ccd_info('TPM')
305
306
307    def get_ccd_info(self, field=None):
308        """Get the current ccd state.
309
310        Take the output of 'ccd' and convert it to a dictionary.
311
312        @param: the ccd info param to get or None to get the full ccd output
313                dictionary.
314        @return: the field value or a dictionary with the ccd field name as the
315                 key and the setting as the value.
316        """
317
318        if field:
319            match_value = self.CCD_FORMAT[field]
320        else:
321            values = [ self.CCD_FORMAT[field] for field in self.CCD_FIELDS ]
322            match_value = '.*'.join(values)
323        matched_output = None
324        original_timeout = float(self._servo.get('cr50_uart_timeout'))
325        # Change the console timeout to 10s, it may take longer than 3s to read
326        # ccd info
327        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
328        for i in range(self.GET_CAP_TRIES):
329            try:
330                # If some ccd output is dropped and the output doesn't match the
331                # expected ccd output format, send_command_get_output will wait the
332                # full CONSERVATIVE_CCD_WAIT even though ccd is done printing. Use
333                # re to search the command output instead of
334                # send_safe_command_get_output, so we don't have to wait the full
335                # timeout if output is dropped.
336                rv = self.send_command_retry_get_output('ccd', ['ccd.*>'],
337                                                        safe=True)[0]
338                matched_output = re.search(match_value, rv, re.DOTALL)
339                if matched_output:
340                    break
341                logging.info('try %d: could not match ccd output %s', i, rv)
342            except Exception as e:
343                logging.info('try %d got error %s', i, str(e))
344
345        self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
346        if not matched_output:
347            raise error.TestFail('Could not get ccd output')
348        matched_dict = matched_output.groupdict()
349        logging.info('Current CCD settings:\n%s', pprint.pformat(matched_dict))
350        if field:
351            return matched_dict.get(field)
352        return matched_dict
353
354
355    def get_cap(self, cap):
356        """Returns the capabilitiy from the capability dictionary"""
357        return self.get_cap_dict()[cap]
358
359
360    def get_cap_dict(self, info=None):
361        """Get the current ccd capability settings.
362
363        The capability may be using the 'Default' setting. That doesn't say much
364        about the ccd state required to use the capability. Return all ccd
365        information in the cap_dict
366        [is accessible, setting, requirement]
367
368        @param info: Only fill the cap_dict with the requested information:
369                     CAP_IS_ACCESSIBLE, CAP_SETTING, or CAP_REQ
370        @return: A dictionary with the capability as the key a list of the
371                 current settings as the value [is_accessible, setting,
372                 requirement]
373        """
374        # Add whitespace at the end, so we can still match the last line.
375        cap_info_str = self.get_ccd_info('Capabilities') + '\r\n'
376        cap_settings = re.findall('(\S+) ' + self.CAP_FORMAT,
377                                  cap_info_str)
378        caps = {}
379        for cap, accessible, setting, _, required in cap_settings:
380            # If there's only 1 value after =, then the setting is the
381            # requirement.
382            if not required:
383                required = setting
384            cap_info = [accessible == 'Y', setting, required]
385            if info is not None:
386                caps[cap] = cap_info[info]
387            else:
388                caps[cap] = cap_info
389        logging.debug(pprint.pformat(caps))
390        return caps
391
392
393    def send_command_get_output(self, command, regexp_list):
394        """Send command through UART and wait for response.
395
396        Cr50 will drop characters input to the UART when it resumes from sleep.
397        If servo is not using ccd, send some dummy characters before sending the
398        real command to make sure cr50 is awake.
399
400        @param command: the command to send
401        @param regexp_list: The list of regular expressions to match in the
402                            command output
403        @return: A list of matched output
404        """
405        if self._servo.main_device_is_flex():
406            self.wake_cr50()
407
408        # We have started prepending '\n' to separate cr50 console junk from
409        # the real command. If someone is just searching for .*>, then they will
410        # only get the output from the first '\n' we added. Raise an error to
411        # change the test to look for something more specific ex command.*>.
412        # cr50 will print the command in the output, so that is an easy way to
413        # modify '.*>' to match the real command output.
414        if '.*>' in regexp_list:
415            raise error.TestError('Send more specific regexp %r %r' % (command,
416                    regexp_list))
417
418        # prepend \n to separate the command from any junk that may have been
419        # sent to the cr50 uart.
420        command = '\n' + command
421        return super(ChromeCr50, self).send_command_get_output(command,
422                                                               regexp_list)
423
424
425    def send_safe_command_get_output(self, command, regexp_list,
426            channel_mask=0x1):
427        """Restrict the console channels while sending console commands.
428
429        @param command: the command to send
430        @param regexp_list: The list of regular expressions to match in the
431                            command output
432        @param channel_mask: The mask to pass to 'chan' prior to running the
433                             command, indicating which channels should remain
434                             enabled (0x1 is command output)
435        @return: A list of matched output
436        """
437        self.send_command('chan save')
438        self.send_command('chan 0x%x' % channel_mask)
439        try:
440            rv = self.send_command_get_output(command, regexp_list)
441        finally:
442            self.send_command('chan restore')
443        return rv
444
445
446    def send_command_retry_get_output(self, command, regexp_list, safe=False,
447                                      compare_output=False, retries=MAX_RETRY_COUNT):
448        """Retry the command 5 times if you get a timeout or drop some output
449
450
451        @param command: the command string
452        @param regexp_list: the regex to search for
453        @param safe: use send_safe_command_get_output if True otherwise use
454                     send_command_get_output
455        @param compare_output: look for reproducible output
456        """
457        send_command = (self.send_safe_command_get_output if safe else
458                        self.send_command_get_output)
459        err = 'no consistent output' if compare_output else 'unknown'
460        past_rv = []
461        for i in range(retries):
462            try:
463                rv = send_command(command, regexp_list)
464                if not compare_output or rv in past_rv:
465                    return rv
466                if past_rv:
467                    logging.debug('%d %s not in %s', i, rv, past_rv)
468                past_rv.append(rv)
469            except Exception as e:
470                err = e
471                logging.info('attempt %d %r: %s %s', i, command, type(e),
472                             str(e))
473        if compare_output:
474            logging.info('No consistent output for %r %s', command,
475                         pprint.pformat(past_rv))
476        raise error.TestError('Issue sending %r command: %r' % (command, err))
477
478
479    def get_deep_sleep_count(self):
480        """Get the deep sleep count from the idle task"""
481        result = self.send_command_retry_get_output('idle', [self.IDLE_COUNT],
482                                                    safe=True)
483        return int(result[0][1])
484
485
486    def clear_deep_sleep_count(self):
487        """Clear the deep sleep count"""
488        self.send_command('idle c')
489        if self.get_deep_sleep_count():
490            raise error.TestFail("Could not clear deep sleep count")
491
492
493    def get_board_properties(self):
494        """Get information from the version command"""
495        rv = self.send_command_retry_get_output('brdprop',
496                ['properties = (\S+)\s'], safe=True)
497        return int(rv[0][1], 16)
498
499
500    def uses_board_property(self, prop_name):
501        """Returns 1 if the given property is set, or 0 otherwise
502
503        @param prop_name: a property name in string type.
504        """
505        brdprop = self.get_board_properties()
506        prop = self.BOARD_PROP[prop_name]
507        return (brdprop & prop) == prop
508
509
510    def has_command(self, cmd):
511        """Returns 1 if cr50 has the command 0 if it doesn't"""
512        try:
513            self.send_safe_command_get_output('help', [cmd])
514        except:
515            logging.info("Image does not include '%s' command", cmd)
516            return 0
517        return 1
518
519
520    def reboot(self):
521        """Reboot Cr50 and wait for cr50 to reset"""
522        self.wait_for_reboot(cmd='reboot', timeout=10)
523
524
525    def _uart_wait_for_reboot(self, cmd='\n', timeout=60):
526        """Use uart to wait for cr50 to reboot.
527
528        If a command is given run it and wait for cr50 to reboot. Monitor
529        the cr50 uart to detect the reset. Wait up to timeout seconds
530        for the reset.
531
532        @param cmd: the command to run to reset cr50.
533        @param timeout: seconds to wait to detect the reboot.
534        """
535        original_timeout = float(self._servo.get('cr50_uart_timeout'))
536        # Change the console timeout to timeout, so we wait at least that long
537        # for cr50 to print the start string.
538        self._servo.set_nocheck('cr50_uart_timeout', timeout)
539        try:
540            self.send_command_get_output(cmd, self.START_STR)
541            logging.debug('Detected cr50 reboot')
542        except error.TestFail as e:
543            logging.debug('Failed to detect cr50 reboot')
544        # Reset the timeout.
545        self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
546
547
548    def wait_for_reboot(self, cmd='\n', timeout=60):
549        """Wait for cr50 to reboot
550
551        Run the cr50 reset command. Wait for cr50 to reset and reenable ccd if
552        necessary.
553
554        @param cmd: the command to run to reset cr50.
555        @param timeout: seconds to wait to detect the reboot.
556        """
557        logging.info('Wait up to %s seconds for reboot (%s)', timeout,
558                     cmd.strip())
559        if self._servo.main_device_is_ccd():
560            self.send_command(cmd)
561            # Cr50 USB is reset when it reboots. Wait for the CCD connection to
562            # go down to detect the reboot.
563            self.wait_for_ccd_disable(timeout, raise_error=False)
564            self.ccd_enable()
565        else:
566            self._uart_wait_for_reboot(cmd, timeout)
567
568        # On most devices, a Cr50 reset will cause an AP reset. Force this to
569        # happen on devices where the AP is left down.
570        if not self.faft_config.ap_up_after_cr50_reboot:
571            # Reset the DUT a few seconds after cr50 reboot.
572            time.sleep(self.SHORT_WAIT)
573            logging.info('Resetting DUT after Cr50 reset')
574            self._servo.get_power_state_controller().reset()
575
576
577    def set_board_id(self, chip_bid, chip_flags):
578        """Set the chip board id type and flags."""
579        self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags))
580
581
582    def get_board_id(self):
583        """Get the chip board id type and flags.
584
585        bid_type_inv will be '' if the bid output doesn't show it. If no board
586        id type inv is shown, then board id is erased will just check the type
587        and flags.
588
589        @returns a tuple (A string of bid_type:bid_type_inv:bid_flags,
590                          True if board id is erased)
591        """
592        bid = self.send_command_retry_get_output('bid',
593                    ['Board ID: (\S{8}):?(|\S{8}), flags (\S{8})\s'],
594                    safe=True)[0][1:]
595        bid_str = ':'.join(bid)
596        bid_is_erased =  set(bid).issubset({'', 'ffffffff'})
597        logging.info('chip board id: %s', bid_str)
598        logging.info('chip board id is erased: %s',
599                     'yes' if bid_is_erased else 'no')
600        return bid_str, bid_is_erased
601
602
603    def eraseflashinfo(self, retries=10):
604        """Run eraseflashinfo.
605
606        @returns True if the board id is erased
607        """
608        for i in range(retries):
609            # The console could drop characters while matching 'eraseflashinfo'.
610            # Retry if the command times out. It's ok to run eraseflashinfo
611            # multiple times.
612            rv = self.send_command_retry_get_output(
613                    'eraseflashinfo', ['eraseflashinfo(.*)>'])[0][1].strip()
614            logging.info('eraseflashinfo output: %r', rv)
615            bid_erased = self.get_board_id()[1]
616            eraseflashinfo_issue = 'Busy' in rv or 'do_flash_op' in rv
617            if not eraseflashinfo_issue and bid_erased:
618                break
619            logging.info('Retrying eraseflashinfo')
620        return bid_erased
621
622
623    def rollback(self):
624        """Set the reset counter high enough to force a rollback and reboot."""
625        if not self.has_command('rollback'):
626            raise error.TestError("need image with 'rollback'")
627
628        inactive_partition = self.get_inactive_version_info()[0]
629
630        self.wait_for_reboot(cmd='rollback', timeout=10)
631
632        running_partition = self.get_active_version_info()[0]
633        if inactive_partition != running_partition:
634            raise error.TestError("Failed to rollback to inactive image")
635
636
637    def rolledback(self):
638        """Returns true if cr50 just rolled back"""
639        return 'Rollback detected' in self.send_command_retry_get_output(
640                'sysinfo', ['sysinfo.*>'], safe=True)[0]
641
642
643    def get_version_info(self, regexp):
644        """Get information from the version command"""
645        return self.send_command_retry_get_output('version', [regexp],
646                                                  safe=True,
647                                                  compare_output=True)[0][1::]
648
649
650    def get_inactive_version_info(self):
651        """Get the active partition, version, and hash"""
652        return self.get_version_info(self.INACTIVE_VERSION)
653
654
655    def get_active_version_info(self):
656        """Get the active partition, version, and hash"""
657        return self.get_version_info(self.ACTIVE_VERSION)
658
659
660    def using_prod_rw_keys(self):
661        """Returns True if the RW keyid is prod"""
662        rv = self.send_command_retry_get_output('sysinfo',
663                ['RW keyid:\s+(0x[0-9a-f]{8})'], safe=True)[0][1]
664        logging.info('RW Keyid: 0x%s', rv)
665        return rv in self.PROD_RW_KEYIDS
666
667
668    def get_active_board_id_str(self):
669        """Get the running image board id.
670
671        @return: The board id string or None if the image does not support board
672                 id or the image is not board id locked.
673        """
674        # Getting the board id from the version console command is only
675        # supported in board id locked images .22 and above. Any image that is
676        # board id locked will have support for getting the image board id.
677        #
678        # If board id is not supported on the device, return None. This is
679        # still expected on all current non board id locked release images.
680        try:
681            version_info = self.get_version_info(self.ACTIVE_BID)
682        except error.TestFail as e:
683            logging.info(str(e))
684            logging.info('Cannot use the version to get the board id')
685            return None
686
687        if self.BID_ERROR in version_info[4]:
688            raise error.TestError(version_info)
689        bid = version_info[4].split()[1]
690        return cr50_utils.GetBoardIdInfoString(bid)
691
692
693    def get_version(self):
694        """Get the RW version"""
695        return self.get_active_version_info()[1].strip()
696
697
698    def get_full_version(self):
699        """Get the complete RW version string."""
700        _, rw_ver, dbg, ver_str = self.get_active_version_info()
701        return  rw_ver + (dbg if dbg else '') + ver_str
702
703
704    def ccd_is_enabled(self):
705        """Return True if ccd is enabled.
706
707        If the test is running through ccd, return the ccd_state value. If
708        a flex cable is being used, use the CCD_MODE_L gpio setting to determine
709        if Cr50 has ccd enabled.
710
711        @return: 'off' or 'on' based on whether the cr50 console is working.
712        """
713        if self._servo.main_device_is_ccd():
714            return self._servo.get('ccd_state') == 'on'
715        else:
716            return not bool(self.gpioget('CCD_MODE_L'))
717
718
719    @dts_control_command
720    def wait_for_stable_ccd_state(self, state, timeout, raise_error):
721        """Wait up to timeout seconds for CCD to be 'on' or 'off'
722
723        Verify ccd is off or on and remains in that state for 3 seconds.
724
725        @param state: a string either 'on' or 'off'.
726        @param timeout: time in seconds to wait
727        @param raise_error: Raise TestFail if the value is state is not reached.
728        @raise TestFail: if ccd never reaches the specified state
729        """
730        wait_for_enable = state == 'on'
731        logging.info("Wait until ccd is %s", 'on' if wait_for_enable else 'off')
732        enabled = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable,
733                                       timeout_sec=timeout)
734        if enabled != wait_for_enable:
735            error_msg = ("timed out before detecting ccd '%s'" %
736                         ('on' if wait_for_enable else 'off'))
737            if raise_error:
738                raise error.TestFail(error_msg)
739            logging.warning(error_msg)
740        else:
741            # Make sure the state doesn't change.
742            enabled = utils.wait_for_value(self.ccd_is_enabled, not enabled,
743                                           timeout_sec=self.SHORT_WAIT)
744            if enabled != wait_for_enable:
745                error_msg = ("CCD switched %r after briefly being %r" %
746                             ('on' if enabled else 'off', state))
747                if raise_error:
748                    raise error.TestFail(error_msg)
749                logging.info(error_msg)
750        logging.info("ccd is %r", 'on' if enabled else 'off')
751
752
753    @dts_control_command
754    def wait_for_ccd_disable(self, timeout=60, raise_error=True):
755        """Wait for the cr50 console to stop working"""
756        self.wait_for_stable_ccd_state('off', timeout, raise_error)
757
758
759    @dts_control_command
760    def wait_for_ccd_enable(self, timeout=60, raise_error=False):
761        """Wait for the cr50 console to start working"""
762        self.wait_for_stable_ccd_state('on', timeout, raise_error)
763
764
765    @dts_control_command
766    def ccd_disable(self, raise_error=True):
767        """Change the values of the CC lines to disable CCD"""
768        logging.info("disable ccd")
769        self._servo.set_dts_mode('off')
770        self.wait_for_ccd_disable(raise_error=raise_error)
771
772
773    @dts_control_command
774    def ccd_enable(self, raise_error=False):
775        """Reenable CCD and reset servo interfaces"""
776        logging.info("reenable ccd")
777        self._servo.set_dts_mode('on')
778        # If the test is actually running with ccd, wait for USB communication
779        # to come up after reset.
780        if self._servo.main_device_is_ccd():
781            time.sleep(self._servo.USB_DETECTION_DELAY)
782        self.wait_for_ccd_enable(raise_error=raise_error)
783
784
785    def _level_change_req_pp(self, level):
786        """Returns True if setting the level will require physical presence"""
787        testlab_pp = level != 'testlab open' and 'testlab' in level
788        # If the level is open and the ccd capabilities say physical presence
789        # is required, then physical presence will be required.
790        open_pp = (level == 'open' and
791                   not self.get_cap('OpenNoLongPP')[self.CAP_IS_ACCESSIBLE])
792        return testlab_pp or open_pp
793
794
795    def _state_to_bool(self, state):
796        """Converts the state string to True or False"""
797        # TODO(mruthven): compare to 'on' once servo is up to date in the lab
798        return state.lower() in self.ON_STRINGS
799
800
801    def testlab_is_on(self):
802        """Returns True of testlab mode is on"""
803        return self._state_to_bool(self._servo.get('cr50_testlab'))
804
805
806    def set_ccd_testlab(self, state):
807        """Set the testlab mode
808
809        @param state: the desired testlab mode string: 'on' or 'off'
810        @raise TestFail: if testlab mode was not changed
811        """
812        if self._servo.main_device_is_ccd():
813            raise error.TestError('Cannot set testlab mode with CCD. Use flex '
814                    'cable instead.')
815        if not self.faft_config.has_powerbutton:
816            raise error.TestError('No power button on device')
817
818        request_on = self._state_to_bool(state)
819        testlab_on = self.testlab_is_on()
820        request_str = 'on' if request_on else 'off'
821
822        if testlab_on == request_on:
823            logging.info('ccd testlab already set to %s', request_str)
824            return
825
826        original_level = self.get_ccd_level()
827
828        # We can only change the testlab mode when the device is open. If
829        # testlab mode is already enabled, we can go directly to open using 'ccd
830        # testlab open'. This will save 5 minutes, because we can skip the
831        # physical presence check.
832        if testlab_on:
833            self.send_command('ccd testlab open')
834        else:
835            self.set_ccd_level('open')
836
837        ap_is_on = self.ap_is_on()
838        # Set testlab mode
839        rv = self.send_command_get_output('ccd testlab %s' % request_str,
840                ['ccd.*>'])[0]
841        if 'Access Denied' in rv:
842            raise error.TestFail("'ccd %s' %s" % (request_str, rv))
843
844        # Press the power button once a second for 15 seconds. If the AP is
845        # currently on, make sure it's on at the end of the open process.
846        self.run_pp(self.PP_SHORT, ensure_ap_on=ap_is_on)
847
848        self.set_ccd_level(original_level)
849        if request_on != self.testlab_is_on():
850            raise error.TestFail('Failed to set ccd testlab to %s' % state)
851
852
853    def get_ccd_level(self):
854        """Returns the current ccd privilege level"""
855        return self.get_ccd_info('State').lower().rstrip('ed')
856
857
858    def set_ccd_level(self, level, password=''):
859        """Set the Cr50 CCD privilege level.
860
861        @param level: a string of the ccd privilege level: 'open', 'lock', or
862                      'unlock'.
863        @param password: send the ccd command with password. This will still
864                         require the same physical presence.
865        @raise TestFail: if the level couldn't be set
866        """
867        # TODO(mruthven): add support for CCD password
868        level = level.lower()
869
870        if level == self.get_ccd_level():
871            logging.info('CCD privilege level is already %s', level)
872            return
873
874        if 'testlab' in level:
875            raise error.TestError("Can't change testlab mode using "
876                "ccd_set_level")
877
878        testlab_on = self._state_to_bool(self._servo.get('cr50_testlab'))
879        batt_is_disconnected = self.get_batt_pres_state()[1]
880        req_pp = self._level_change_req_pp(level)
881        has_pp = not self._servo.main_device_is_ccd()
882        dbg_en = self.get_active_version_info()[2]
883
884        if req_pp and not has_pp:
885            raise error.TestError("Can't change privilege level to '%s' "
886                "without physical presence." % level)
887
888        if not testlab_on and not has_pp:
889            raise error.TestError("Wont change privilege level without "
890                "physical presence or testlab mode enabled")
891
892        original_timeout = float(self._servo.get('cr50_uart_timeout'))
893        # Change the console timeout to CONSERVATIVE_CCD_WAIT, running 'ccd' may
894        # take more than 3 seconds.
895        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
896        # Start the unlock process.
897
898        if level == 'open' or level == 'unlock':
899            logging.info('waiting %d seconds, the minimum time between'
900                         ' ccd password attempts',
901                         self.CCD_PASSWORD_RATE_LIMIT)
902            time.sleep(self.CCD_PASSWORD_RATE_LIMIT)
903
904        ap_is_on = self.ap_is_on()
905        try:
906            cmd = 'ccd %s%s' % (level, (' ' + password) if password else '')
907            # ccd command outputs on the rbox, ccd, and console channels,
908            # respectively. Cr50 uses these channels to print relevant ccd
909            # information.
910            # Restrict all other channels.
911            ccd_output_channels = 0x20000 | 0x8 | 0x1
912            rv = self.send_safe_command_get_output(
913                    cmd, [cmd + '(.*)>'],
914                    channel_mask=ccd_output_channels)[0][1]
915        finally:
916            self._servo.set('cr50_uart_timeout', original_timeout)
917        logging.info(rv)
918        if 'ccd_open denied: fwmp' in rv:
919            raise error.TestFail('FWMP disabled %r: %s' % (cmd, rv))
920        if 'Access Denied' in rv:
921            raise error.TestFail("%r %s" % (cmd, rv))
922        if 'Busy' in rv:
923            raise error.TestFail("cr50 is too busy to run %r: %s" % (cmd, rv))
924
925        # Press the power button once a second, if we need physical presence.
926        if req_pp and batt_is_disconnected:
927            # DBG images have shorter unlock processes. If the AP is currently
928            # on, make sure it's on at the end of the open process.
929            self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG,
930                        ensure_ap_on=ap_is_on)
931
932        if level != self.get_ccd_level():
933            raise error.TestFail('Could not set privilege level to %s' % level)
934
935        logging.info('Successfully set CCD privelege level to %s', level)
936
937
938    def run_pp(self, unlock_timeout, ensure_ap_on=False):
939        """Press the power button a for unlock_timeout seconds.
940
941        This will press the power button many more times than it needs to be
942        pressed. Cr50 doesn't care if you press it too often. It just cares that
943        you press the power button at least once within the detect interval.
944
945        For privilege level changes you need to press the power button 5 times
946        in the short interval and then 4 times within the long interval.
947        Short Interval
948        100msec < power button press < 5 seconds
949        Long Interval
950        60s < power button press < 300s
951
952        For testlab enable/disable you must press the power button 5 times
953        spaced between 100msec and 5 seconds apart.
954
955        @param unlock_timeout: time to press the power button in seconds.
956        @param ensure_ap_on: If true, press the power to turn on the AP.
957        """
958        end_time = time.time() + unlock_timeout
959
960        logging.info('Pressing power button for %ds to unlock the console.',
961                     unlock_timeout)
962        logging.info('The process should end at %s', time.ctime(end_time))
963
964        # Press the power button once a second to unlock the console.
965        while time.time() < end_time:
966            self._servo.power_short_press()
967            time.sleep(1)
968
969        # If the last power button press left the AP powered off, and it was on
970        # before, turn it back on.
971        time.sleep(self.faft_config.shutdown)
972        if ensure_ap_on and not self.ap_is_on():
973            logging.info('AP is off. Pressing the power button to turn it on')
974            self._servo.power_short_press()
975            logging.debug('Pressing PP to turn back on')
976
977
978    def gettime(self):
979        """Get the current cr50 system time"""
980        result = self.send_safe_command_get_output('gettime', [' = (.*) s'])
981        return float(result[0][1])
982
983
984    def servo_dts_mode_is_valid(self):
985        """Returns True if cr50 registers change in servo dts mode."""
986        # This is to test that Cr50 actually recognizes the change in ccd state
987        # We cant do that with tests using ccd, because the cr50 communication
988        # goes down once ccd is enabled.
989        if not self._servo.dts_mode_is_safe():
990            return False
991
992        ccd_start = 'on' if self.ccd_is_enabled() else 'off'
993        dts_start = self._servo.get_dts_mode()
994        try:
995            # Verify both ccd enable and disable
996            self.ccd_disable(raise_error=True)
997            self.ccd_enable(raise_error=True)
998            rv = True
999        except Exception as e:
1000            logging.info(e)
1001            rv = False
1002        self._servo.set_dts_mode(dts_start)
1003        self.wait_for_stable_ccd_state(ccd_start, 60, True)
1004        logging.info('Test setup does%s support servo DTS mode',
1005                '' if rv else 'n\'t')
1006        return rv
1007
1008
1009    def wait_until_update_is_allowed(self):
1010        """Wait until cr50 will be able to accept an update.
1011
1012        Cr50 rejects any attempt to update if it has been less than 60 seconds
1013        since it last recovered from deep sleep or came up from reboot. This
1014        will wait until cr50 gettime shows a time greater than 60.
1015        """
1016        if self.get_active_version_info()[2]:
1017            logging.info("Running DBG image. Don't need to wait for update.")
1018            return
1019        cr50_time = self.gettime()
1020        if cr50_time < 60:
1021            sleep_time = 61 - cr50_time
1022            logging.info('Cr50 has been up for %ds waiting %ds before update',
1023                         cr50_time, sleep_time)
1024            time.sleep(sleep_time)
1025
1026
1027    def tpm_is_enabled(self):
1028        """Query the current TPM mode.
1029
1030        @return:  True if TPM is enabled, False otherwise.
1031        """
1032        result = self.send_command_retry_get_output('sysinfo',
1033                ['(?i)TPM\s+MODE:\s+(enabled|disabled)'], safe=True)[0][1]
1034        logging.debug(result)
1035
1036        return result.lower() == 'enabled'
1037
1038
1039    def get_keyladder_state(self):
1040        """Get the status of H1 Key Ladder.
1041
1042        @return: The keyladder state string. prod or dev both mean enabled.
1043        """
1044        result = self.send_command_retry_get_output('sysinfo',
1045                ['(?i)Key\s+Ladder:\s+(enabled|prod|dev|disabled)'],
1046                safe=True)[0][1]
1047        logging.debug(result)
1048        return result
1049
1050
1051    def keyladder_is_disabled(self):
1052        """Get the status of H1 Key Ladder.
1053
1054        @return: True if H1 Key Ladder is disabled. False otherwise.
1055        """
1056        return self.get_keyladder_state() == 'disabled'
1057
1058
1059    def get_sleepmask(self):
1060        """Returns the sleepmask as an int"""
1061        rv = self.send_command_retry_get_output('sleepmask',
1062                ['sleep mask: (\S{8})\s+'], safe=True)[0][1]
1063        logging.info('sleepmask %s', rv)
1064        return int(rv, 16)
1065
1066
1067    def get_ccdstate(self):
1068        """Return a dictionary of the ccdstate once it's done debouncing"""
1069        for i in range(self.CCDSTATE_MAX_RETRY_COUNT):
1070            rv = self.send_command_retry_get_output('ccdstate',
1071                    ['ccdstate(.*)>'], safe=True, compare_output=True)[0][0]
1072
1073            # Look for a line like 'AP: on' or 'AP: off'. 'debouncing' or
1074            # 'unknown' may appear transiently. 'debouncing' should transition
1075            # to 'on' or 'off' within 1 second, and 'unknown' should do so
1076            # within 20 seconds.
1077            if 'debouncing' not in rv and 'unknown' not in rv:
1078                break
1079            time.sleep(self.SHORT_WAIT)
1080        ccdstate = {}
1081        for line in rv.splitlines():
1082            line = line.strip()
1083            if ':' in line:
1084                k, v = line.split(':', 1)
1085                ccdstate[k.strip()] = v.strip()
1086        logging.info('Current CCD state:\n%s', pprint.pformat(ccdstate))
1087        return ccdstate
1088
1089
1090    def ap_is_on(self):
1091        """Get the power state of the AP.
1092
1093        @return: True if the AP is on; False otherwise.
1094        """
1095        ap_state = self.get_ccdstate()['AP']
1096        if ap_state == 'on':
1097            return True
1098        elif ap_state == 'off':
1099            return False
1100        else:
1101            raise error.TestFail('Read unusable AP state from ccdstate: %r' %
1102                                 ap_state)
1103
1104
1105    def gpioget(self, signal_name):
1106        """Get the current state of the signal
1107
1108        @return an integer 1 or 0 based on the gpioget value
1109        """
1110        result = self.send_command_retry_get_output('gpioget',
1111                    ['(0|1)[ \S]*%s' % signal_name], safe=True)
1112        return int(result[0][1])
1113
1114
1115    def batt_pres_is_reset(self):
1116        """Returns True if batt pres is reset to always follow batt pres"""
1117        follow_bp, _, follow_bp_atboot, _ = self.get_batt_pres_state()
1118        return follow_bp and follow_bp_atboot
1119
1120
1121    def get_batt_pres_state(self):
1122        """Get the current and atboot battery presence state
1123
1124        The atboot setting cannot really be determined now if it is set to
1125        follow battery presence. It is likely to remain the same after reboot,
1126        but who knows. If the third element of the tuple is True, the last
1127        element will not be that useful
1128
1129        @return: a tuple of the current battery presence state
1130                 (True if current state is to follow batt presence,
1131                  True if battery is connected,
1132                  True if current state is to follow batt presence atboot,
1133                  True if battery is connected atboot)
1134        """
1135        # bpforce is added in 4.16. If the image doesn't have the command, cr50
1136        # always follows battery presence. In these images 'gpioget BATT_PRES_L'
1137        # accurately represents the battery presence state, because it can't be
1138        # overidden.
1139        if not self.has_command('bpforce'):
1140            batt_pres = not bool(self.gpioget('BATT_PRES_L'))
1141            return (True, batt_pres, True, batt_pres)
1142
1143        # The bpforce command is very similar to the wp command. It just
1144        # substitutes 'connected' for 'enabled' and 'disconnected' for
1145        # 'disabled'.
1146        rv = self.send_command_retry_get_output('bpforce',
1147                ['batt pres: (forced )?(con|dis).*at boot: (forced )?'
1148                 '(follow|discon|con)'], safe=True)[0]
1149        _, forced, connected, _, atboot = rv
1150        logging.info(rv)
1151        return (not forced, connected == 'con', atboot == 'follow',
1152                atboot == 'con')
1153
1154
1155    def set_batt_pres_state(self, state, atboot):
1156        """Override the battery presence state.
1157
1158        @param state: a string of the battery presence setting: 'connected',
1159                  'disconnected', or 'follow_batt_pres'
1160        @param atboot: True if we're overriding battery presence atboot
1161        """
1162        cmd = 'bpforce %s%s' % (state, ' atboot' if atboot else '')
1163        logging.info('running %r', cmd)
1164        self.send_command(cmd)
1165
1166
1167    def dump_nvmem(self):
1168        """Print nvmem objects."""
1169        rv = self.send_safe_command_get_output('dump_nvmem',
1170                                               ['dump_nvmem(.*)>'])[0][1]
1171        logging.info('NVMEM OUTPUT:\n%s', rv)
1172
1173
1174    def get_reset_cause(self):
1175        """Returns the reset flags for the last reset."""
1176        rv = self.send_command_retry_get_output('sysinfo',
1177                ['Reset flags:\s+0x([0-9a-f]{8})\s'], compare_output=True)[0][1]
1178        logging.info('reset cause: %s', rv)
1179        return int(rv, 16)
1180
1181
1182    def was_reset(self, reset_type):
1183        """Returns 1 if the reset type is found in the reset_cause.
1184
1185        @param reset_type: reset name in string type.
1186        """
1187        reset_cause = self.get_reset_cause()
1188        reset_flag = self.RESET_FLAGS[reset_type]
1189        return bool(reset_cause & reset_flag)
1190
1191
1192    def get_devid(self):
1193        """Returns the cr50 serial number."""
1194        return self.send_command_retry_get_output('sysinfo',
1195                ['DEV_ID:\s+(0x[0-9a-f]{8} 0x[0-9a-f]{8})'])[0][1]
1196
1197
1198    def get_serial(self):
1199        """Returns the cr50 serial number."""
1200        serial = self.get_devid().replace('0x', '').replace(' ', '-').upper()
1201        logging.info('CCD serial: %s', serial)
1202        return serial
1203
1204    def check_boot_mode(self, mode_exp='NORMAL'):
1205        """Query the boot mode to Cr50, and compare it against mode_exp.
1206
1207        Args:
1208            mode_exp: expecting boot mode. It should be either 'NORMAL'
1209                      or 'NO_BOOT'.
1210        Returns:
1211            True if the boot mode matches mode_exp.
1212            False, otherwise.
1213        Raises:
1214            TestError: Input parameter is not valid.
1215        """
1216
1217        if mode_exp not in ['NORMAL', 'NO_BOOT']:
1218            raise error.TestError('parameter, mode_exp is not valid: %s' %
1219                                  mode_exp)
1220        rv = self.send_command_retry_get_output('ec_comm',
1221                ['boot_mode\s*:\s*(NORMAL|NO_BOOT)'], safe=True)
1222        return mode_exp == rv[0][1]
1223
1224    def get_reset_count(self):
1225        """Returns the cr50 reset count"""
1226        return self.send_command_retry_get_output('sysinfo',
1227                                                  ['Reset count: (\d+)'],
1228                                                  safe=True)[0][1]
1229
1230    def check_servo_monitor(self):
1231        """Returns True if cr50 can detect servo connect/disconnect"""
1232        orig_dts = self._servo.get('servo_v4_dts_mode')
1233        # Detach ccd so EC uart won't interfere with servo detection
1234        self._servo.set_dts_mode('off')
1235        self._servo.set('ec_uart_en', 'off')
1236        time.sleep(self.SHORT_WAIT)
1237        if self.get_ccdstate()['Servo'] != 'disconnected':
1238            self._servo.set_dts_mode(orig_dts)
1239            return False
1240
1241        self._servo.set('ec_uart_en', 'on')
1242        time.sleep(self.SHORT_WAIT)
1243        if self.get_ccdstate()['Servo'] != 'connected':
1244            self._servo.set_dts_mode(orig_dts)
1245            return False
1246        self._servo.set_dts_mode(orig_dts)
1247        return True
1248