1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import pprint
8import StringIO
9import subprocess
10import time
11
12from autotest_lib.client.bin import utils
13from autotest_lib.client.common_lib import error, utils
14from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils
15from autotest_lib.server.cros import debugd_dev_tools, gsutil_wrapper
16from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
17
18
19class Cr50Test(FirmwareTest):
20    """Base class that sets up helper objects/functions for cr50 tests."""
    version = 1

    # Google storage directory holding the cr50 images. The %s is filled in
    # with the image type ('release' or 'debug'), or '*' to search both.
    CR50_GS_URL = 'gs://chromeos-localmirror-private/distfiles/chromeos-cr50-%s/'
    # Name pattern of a node locked debug image. The %s is filled in with the
    # devid, with the space between the two devid words replaced by '_'.
    CR50_DEBUG_FILE = '*/cr50_dbg_%s.bin'
    # Name pattern of a release image. The %s is filled in with the rw version.
    CR50_PROD_FILE = 'cr50.%s.bin.prod'
    # The values below are bit flags that get or'ed into self._saved_state to
    # track how much of the original device state was saved.
    # Nothing has been saved yet.
    NONE = 0
    # Saved the original device state during init.
    INITIAL_STATE = 1 << 0
    # Saved the original image, the device image, and the debug image. These
    # images are needed to be able to restore the original image and board id.
    IMAGES = 1 << 1
    # Seconds to sleep between the quick power button presses done at the start
    # of ccd open.
    PP_SHORT_INTERVAL = 3
    # NOTE(review): the two CLEARED_FWMP values are not used in the visible part
    # of this file. Presumably they describe the result of reading the FWMP
    # after it has been cleared -- confirm against clear_fwmp.
    CLEARED_FWMP_EXIT_STATUS = 1
    CLEARED_FWMP_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS'
                              '_INVALID')
    # Cr50 may have flash operation errors during the test. Here's an example
    # of one error message.
    # do_flash_op:245 errors 20 fsh_pe_control 40720004
    # The stuff after the ':' may change, but all flash operation errors
    # contain do_flash_op. do_flash_op is only ever printed if there is an
    # error during the flash operation. Just search for do_flash_op to simplify
    # the search string and make it applicable to all flash op errors.
    CR50_FLASH_OP_ERROR_MSG = 'do_flash_op'
44
45    def initialize(self, host, cmdline_args, full_args,
46            restore_cr50_state=False, cr50_dev_path='', provision_update=False):
47        self._saved_state = self.NONE
48        self._raise_error_on_mismatch = not restore_cr50_state
49        self._provision_update = provision_update
50        super(Cr50Test, self).initialize(host, cmdline_args)
51
52        if not hasattr(self, 'cr50'):
53            raise error.TestNAError('Test can only be run on devices with '
54                                    'access to the Cr50 console')
55
56        logging.info('Test Args: %r', full_args)
57
58        self.can_set_ccd_level = (not self.cr50.using_ccd() or
59            self.cr50.testlab_is_on())
60        self.original_ccd_level = self.cr50.get_ccd_level()
61        self.original_ccd_settings = self.cr50.get_cap_dict(
62                info=self.cr50.CAP_SETTING)
63
64        self.host = host
65        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
66        # Clear the FWMP, so it can't disable CCD.
67        self.clear_fwmp()
68
69        if self.can_set_ccd_level:
70            # Lock cr50 so the console will be restricted
71            self.cr50.set_ccd_level('lock')
72        elif self.original_ccd_level != 'lock':
73            raise error.TestNAError('Lock the console before running cr50 test')
74
75        self._save_original_state()
76
77        # Verify cr50 is still running the correct version
78        cr50_qual_version = full_args.get('cr50_qual_version', '').strip()
79        if cr50_qual_version:
80            _, running_rw, running_bid = self.get_saved_cr50_original_version()
81            expected_rw, expected_bid_sym = cr50_qual_version.split('/')
82            expected_bid = cr50_utils.GetBoardIdInfoString(expected_bid_sym,
83                                                           symbolic=False)
84            logging.debug('Running %s %s Expect %s %s', running_rw, running_bid,
85                          expected_rw, expected_bid)
86            if running_rw != expected_rw or expected_bid != running_bid:
87                raise error.TestError('Not running %s' % cr50_qual_version)
88
89        # We successfully saved the device state
90        self._saved_state |= self.INITIAL_STATE
91        try:
92            self._save_node_locked_dev_image(cr50_dev_path)
93            self._save_original_images(full_args.get('release_path', ''))
94            # We successfully saved the device images
95            self._saved_state |= self.IMAGES
96        except:
97            if restore_cr50_state:
98                raise
99
100
101    def after_run_once(self):
102        """Log which iteration just ran"""
103        logging.info('successfully ran iteration %d', self.iteration)
104
105
106    def _save_node_locked_dev_image(self, cr50_dev_path):
107        """Save or download the node locked dev image.
108
109        Args:
110            cr50_dev_path: The path to the node locked cr50 image.
111        """
112        if os.path.isfile(cr50_dev_path):
113            self._node_locked_cr50_image = cr50_dev_path
114        else:
115            devid = self.servo.get('cr50_devid')
116            self._node_locked_cr50_image = self.download_cr50_debug_image(
117                devid)[0]
118
119
120    def _save_original_images(self, release_path):
121        """Use the saved state to find all of the device images.
122
123        This will download running cr50 image and the device image.
124
125        Args:
126            release_path: The release path given by test args
127        """
128        # Copy the prod and prepvt images from the DUT
129        _, prod_rw, prod_bid = self._original_state['device_prod_ver']
130        filename = 'prod_device_image_' + prod_rw
131        self._device_prod_image = os.path.join(self.resultsdir,
132                filename)
133        self.host.get_file(cr50_utils.CR50_PROD,
134                self._device_prod_image)
135
136        if cr50_utils.HasPrepvtImage(self.host):
137            _, prepvt_rw, prepvt_bid = self._original_state['device_prepvt_ver']
138            filename = 'prepvt_device_image_' + prepvt_rw
139            self._device_prepvt_image = os.path.join(self.resultsdir,
140                    filename)
141            self.host.get_file(cr50_utils.CR50_PREPVT,
142                    self._device_prepvt_image)
143            prepvt_bid = cr50_utils.GetBoardIdInfoString(prepvt_bid)
144        else:
145            self._device_prepvt_image = None
146            prepvt_rw = None
147            prepvt_bid = None
148
149        if os.path.isfile(release_path):
150            self._original_cr50_image = release_path
151            logging.info('using supplied image')
152            return
153        # If the running cr50 image version matches the image on the DUT use
154        # the DUT image as the original image. If the versions don't match
155        # download the image from google storage
156        _, running_rw, running_bid = self.get_saved_cr50_original_version()
157
158        # Make sure prod_bid and running_bid are in the same format
159        prod_bid = cr50_utils.GetBoardIdInfoString(prod_bid)
160        running_bid = cr50_utils.GetBoardIdInfoString(running_bid)
161        if running_rw == prod_rw and running_bid == prod_bid:
162            logging.info('Using device cr50 prod image %s %s', prod_rw,
163                    prod_bid)
164            self._original_cr50_image = self._device_prod_image
165        elif running_rw == prepvt_rw and running_bid == prepvt_bid:
166            logging.info('Using device cr50 prepvt image %s %s', prepvt_rw,
167                    prepvt_bid)
168            self._original_cr50_image = self._device_prepvt_image
169        else:
170            logging.info('Downloading cr50 image %s %s', running_rw,
171                    running_bid)
172            self._original_cr50_image = self.download_cr50_release_image(
173                running_rw, running_bid)[0]
174
175
176    def _save_original_state(self):
177        """Save the cr50 related state.
178
179        Save the device's current cr50 version, cr50 board id, rlz, and image
180        at /opt/google/cr50/firmware/cr50.bin.prod. These will be used to
181        restore the state during cleanup.
182        """
183        self._original_state = self.get_cr50_device_state()
184
185
186    def get_saved_cr50_original_version(self):
187        """Return (ro ver, rw ver, bid)."""
188        if ('running_ver' not in self._original_state or 'cr50_image_bid' not in
189            self._original_state):
190            raise error.TestError('No record of original cr50 image version')
191        return (self._original_state['running_ver'][0],
192                self._original_state['running_ver'][1],
193                self._original_state['cr50_image_bid'])
194
195
196    def get_saved_cr50_original_path(self):
197        """Return the local path for the original cr50 image."""
198        if not hasattr(self, '_original_cr50_image'):
199            raise error.TestError('No record of original image')
200        return self._original_cr50_image
201
202
203    def has_saved_cr50_dev_path(self):
204        """Returns true if we saved the node locked debug image."""
205        return hasattr(self, '_node_locked_cr50_image')
206
207
208    def get_saved_cr50_dev_path(self):
209        """Return the local path for the cr50 dev image."""
210        if not self.has_saved_cr50_dev_path():
211            raise error.TestError('No record of debug image')
212        return self._node_locked_cr50_image
213
214
215    def _restore_original_image(self, chip_bid, chip_flags):
216        """Restore the cr50 image and erase the state.
217
218        Make 3 attempts to update to the original image. Use a rollback from
219        the DBG image to erase the state that can only be erased by a DBG image.
220        Set the chip board id during rollback
221
222        Args:
223            chip_bid: the integer representation of chip board id or None if the
224                      board id should be erased
225            chip_flags: the integer representation of chip board id flags or
226                        None if the board id should be erased
227        """
228        for i in range(3):
229            try:
230                # Update to the node-locked DBG image so we can erase all of
231                # the state we are trying to reset
232                self.cr50_update(self._node_locked_cr50_image)
233
234                # Rollback to the original cr50 image.
235                self.cr50_update(self._original_cr50_image, rollback=True,
236                                 chip_bid=chip_bid, chip_flags=chip_flags)
237                break
238            except Exception, e:
239                logging.warning('Failed to restore original image attempt %d: '
240                                '%r', i, e)
241
242
243    def rootfs_verification_disable(self):
244        """Remove rootfs verification."""
245        if not self._rootfs_verification_is_disabled():
246            logging.debug('Removing rootfs verification.')
247            self.rootfs_tool.enable()
248
249
250    def _rootfs_verification_is_disabled(self):
251        """Returns true if rootfs verification is enabled."""
252        # Clear the TPM owner before trying to check rootfs verification
253        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
254        self.rootfs_tool = debugd_dev_tools.RootfsVerificationTool()
255        self.rootfs_tool.initialize(self.host)
256        # rootfs_tool.is_enabled is True, that means rootfs verification is
257        # disabled.
258        return self.rootfs_tool.is_enabled()
259
260
261    def _restore_original_state(self):
262        """Restore the original cr50 related device state."""
263        if not (self._saved_state & self.IMAGES):
264            logging.warning('Did not save the original images. Cannot restore '
265                            'state')
266            return
267        # Remove the prepvt image if the test installed one.
268        if (not self._original_state['has_prepvt'] and
269            cr50_utils.HasPrepvtImage(self.host)):
270            self.host.run('rm %s' % cr50_utils.CR50_PREPVT)
271        # If rootfs verification has been disabled, copy the cr50 device image
272        # back onto the DUT.
273        if self._rootfs_verification_is_disabled():
274            cr50_utils.InstallImage(self.host, self._device_prod_image,
275                    cr50_utils.CR50_PROD)
276            # Install the prepvt image if there was one.
277            if self._device_prepvt_image:
278                cr50_utils.InstallImage(self.host, self._device_prepvt_image,
279                        cr50_utils.CR50_PREPVT)
280
281        chip_bid_info = self._original_state['chip_bid']
282        bid_is_erased = chip_bid_info == cr50_utils.ERASED_CHIP_BID
283        chip_bid = None if bid_is_erased else chip_bid_info[0]
284        chip_flags = None if bid_is_erased else chip_bid_info[2]
285        # Update to the original image and erase the board id
286        self._restore_original_image(chip_bid, chip_flags)
287
288        # Set the RLZ code
289        cr50_utils.SetRLZ(self.host, self._original_state['rlz'])
290
291        # Verify everything is still the same
292        mismatch = self._check_original_state()
293        if mismatch:
294            raise error.TestError('Could not restore state: %s' % mismatch)
295
296        logging.info('Successfully restored the original cr50 state')
297
298
299    def get_cr50_device_state(self):
300        """Get a dict with the current device cr50 information.
301
302        The state dict will include the platform brand, rlz code, chip board id,
303        the running cr50 image version, the running cr50 image board id, and the
304        device cr50 image version.
305        """
306        state = {}
307        state['mosys platform brand'] = self.host.run('mosys platform brand',
308            ignore_status=True).stdout.strip()
309        state['device_prod_ver'] = cr50_utils.GetBinVersion(self.host,
310                cr50_utils.CR50_PROD)
311        state['has_prepvt'] = cr50_utils.HasPrepvtImage(self.host)
312        if state['has_prepvt']:
313            state['device_prepvt_ver'] = cr50_utils.GetBinVersion(self.host,
314                    cr50_utils.CR50_PREPVT)
315        else:
316            state['device_prepvt_ver'] = None
317        state['rlz'] = cr50_utils.GetRLZ(self.host)
318        state['chip_bid'] = cr50_utils.GetChipBoardId(self.host)
319        state['chip_bid_str'] = '%08x:%08x:%08x' % state['chip_bid']
320        state['running_ver'] = cr50_utils.GetRunningVersion(self.host)
321        state['cr50_image_bid'] = self.cr50.get_active_board_id_str()
322
323        logging.debug('Current Cr50 state:\n%s', pprint.pformat(state))
324        return state
325
326
327    def _check_original_state(self):
328        """Compare the current cr50 state to the original state.
329
330        Returns:
331            A dictionary with the state that is wrong as the key and
332            the new and old state as the value
333        """
334        if not (self._saved_state & self.INITIAL_STATE):
335            logging.warning('Did not save the original state. Cannot verify it '
336                            'matches')
337            return
338        # Make sure the /var/cache/cr50* state is up to date.
339        cr50_utils.ClearUpdateStateAndReboot(self.host)
340
341        mismatch = {}
342        new_state = self.get_cr50_device_state()
343
344        for k, new_val in new_state.iteritems():
345            original_val = self._original_state[k]
346            if new_val != original_val:
347                mismatch[k] = 'old: %s, new: %s' % (original_val, new_val)
348
349        if mismatch:
350            logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch))
351        else:
352            logging.info('The device is in the original state')
353        return mismatch
354
355
356    def _reset_ccd_settings(self):
357        """Reset the ccd lock and capability states."""
358        # Clear the password if one was set.
359        if self.cr50.get_ccd_info()['Password'] != 'none':
360            self.servo.set_nocheck('cr50_testlab', 'open')
361            self.cr50.send_command('ccd reset')
362            if self.cr50.get_ccd_info()['Password'] != 'none':
363                raise error.TestFail('Could not clear password')
364
365        current_settings = self.cr50.get_cap_dict(info=self.cr50.CAP_SETTING)
366        if self.original_ccd_settings != current_settings:
367            if not self.can_set_ccd_level:
368                raise error.TestError("CCD state has changed, but we can't "
369                        "restore it")
370            self.fast_open(True)
371            self.cr50.set_caps(self.original_ccd_settings)
372
373        # First try using testlab open to open the device
374        if self.cr50.testlab_is_on() and self.original_ccd_level == 'open':
375            self.servo.set_nocheck('cr50_testlab', 'open')
376        if self.original_ccd_level != self.cr50.get_ccd_level():
377            self.cr50.set_ccd_level(self.original_ccd_level)
378
379
380
381    def cleanup(self):
382        """Attempt to cleanup the cr50 state. Then run firmware cleanup"""
383        try:
384            self._restore_cr50_state()
385        finally:
386            super(Cr50Test, self).cleanup()
387
388        # Check the logs captured during firmware_test cleanup for cr50 errors.
389        self._get_cr50_stats_from_uart_capture()
390
391
392    def _get_cr50_stats_from_uart_capture(self):
393        """Check cr50 uart output for errors.
394
395        Use the logs captured during firmware_test cleanup to check for cr50
396        errors. Flash operation issues aren't obvious unless you check the logs.
397        All flash op errors print do_flash_op and it isn't printed during normal
398        operation. Open the cr50 uart file and count the number of times this is
399        printed. Log the number of errors.
400        """
401        if not hasattr(self, 'cr50_uart_file'):
402            logging.info('There is not a cr50 uart file')
403            return
404
405        flash_error_count = 0
406        with open(self.cr50_uart_file, 'r') as f:
407            for line in f:
408                if self.CR50_FLASH_OP_ERROR_MSG in line:
409                    flash_error_count += 1
410
411        # Log any flash operation errors.
412        logging.info('do_flash_op count: %d', flash_error_count)
413
414
415    def _restore_cr50_state(self):
416        """Restore cr50 state, so the device can be used for further testing"""
417        state_mismatch = self._check_original_state()
418        if state_mismatch and not self._provision_update:
419            self._restore_original_state()
420            if self._raise_error_on_mismatch:
421                raise error.TestError('Unexpected state mismatch during '
422                                      'cleanup %s' % state_mismatch)
423
424        # Try to open cr50 and enable testlab mode if it isn't enabled.
425        try:
426            self.fast_open(True)
427        except:
428            # Even if we can't open cr50, do our best to reset the rest of the
429            # system state. Log a warning here.
430            logging.warning('Unable to Open cr50', exc_info=True)
431        # Reset the password as the first thing in cleanup. It is important that
432        # if some other part of cleanup fails, the password has at least been
433        # reset.
434        self.cr50.send_command('rddkeepalive disable')
435        self.cr50.send_command('ccd reset')
436        self.cr50.send_command('wp follow_batt_pres atboot')
437
438        # Reboot cr50 if the console is accessible. This will reset most state.
439        if self.cr50.get_cap('GscFullConsole')[self.cr50.CAP_IS_ACCESSIBLE]:
440            self.cr50.reboot()
441
442        # Reenable servo v4 CCD
443        self.cr50.ccd_enable()
444
445        # reboot to normal mode if the device is in dev mode.
446        self.enter_mode_after_checking_tpm_state('normal')
447
448        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
449        self.clear_fwmp()
450
451        # Restore the ccd privilege level
452        if hasattr(self, 'original_ccd_level'):
453            self._reset_ccd_settings()
454
455
456    def find_cr50_gs_image(self, filename, image_type=None):
457        """Find the cr50 gs image name.
458
459        Args:
460            filename: the cr50 filename to match to
461            image_type: release or debug. If it is not specified we will search
462                        both the release and debug directories
463        Returns:
464            a tuple of the gsutil bucket, filename
465        """
466        gs_url = self.CR50_GS_URL % (image_type if image_type else '*')
467        gs_filename = os.path.join(gs_url, filename)
468        bucket, gs_filename = utils.gs_ls(gs_filename)[0].rsplit('/', 1)
469        return bucket, gs_filename
470
471
472    def download_cr50_gs_image(self, filename, image_bid='', bucket=None,
473                               image_type=None):
474        """Get the image from gs and save it in the autotest dir.
475
476        Args:
477            filename: The cr50 image basename
478            image_bid: the board id info list or string. It will be added to the
479                       filename.
480            bucket: The gs bucket name
481            image_type: 'debug' or 'release'. This will be used to determine
482                        the bucket if the bucket is not given.
483        Returns:
484            A tuple with the local path and version
485        """
486        # Add the image bid string to the filename
487        if image_bid:
488            bid_str = cr50_utils.GetBoardIdInfoString(image_bid,
489                                                       symbolic=True)
490            filename += '.' + bid_str.replace(':', '_')
491
492        if not bucket:
493            bucket, filename = self.find_cr50_gs_image(filename, image_type)
494
495        remote_temp_dir = '/tmp/'
496        src = os.path.join(remote_temp_dir, filename)
497        dest = os.path.join(self.resultsdir, filename)
498
499        # Copy the image to the dut
500        gsutil_wrapper.copy_private_bucket(host=self.host,
501                                           bucket=bucket,
502                                           filename=filename,
503                                           destination=remote_temp_dir)
504
505        self.host.get_file(src, dest)
506        ver = cr50_utils.GetBinVersion(self.host, src)
507
508        # Compare the image board id to the downloaded image to make sure we got
509        # the right file
510        downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True)
511        if image_bid and bid_str != downloaded_bid:
512            raise error.TestError('Could not download image with matching '
513                                  'board id wanted %s got %s' % (bid_str,
514                                  downloaded_bid))
515        return dest, ver
516
517
518    def download_cr50_debug_image(self, devid, image_bid=''):
519        """download the cr50 debug file.
520
521        Get the file with the matching devid and image board id info
522
523        Args:
524            devid: the cr50_devid string '${DEVID0} ${DEVID1}'
525            image_bid: the image board id info string or list
526        Returns:
527            A tuple with the debug image local path and version
528        """
529        # Debug images are node locked with the devid. Add the devid to the
530        # filename
531        filename = self.CR50_DEBUG_FILE % (devid.replace(' ', '_'))
532
533        # Download the image
534        dest, ver = self.download_cr50_gs_image(filename, image_bid=image_bid,
535                                                image_type='debug')
536
537        return dest, ver
538
539
540    def download_cr50_release_image(self, image_rw, image_bid=''):
541        """download the cr50 release file.
542
543        Get the file with the matching version and image board id info
544
545        Args:
546            image_rw: the rw version string
547            image_bid: the image board id info string or list
548        Returns:
549            A tuple with the release image local path and version
550        """
551        # Release images can be found using the rw version
552        filename = self.CR50_PROD_FILE % image_rw
553
554        # Download the image
555        dest, ver = self.download_cr50_gs_image(filename, image_bid=image_bid,
556                                                image_type='release')
557
558        # Compare the rw version and board id info to make sure the right image
559        # was found
560        if image_rw != ver[1]:
561            raise error.TestError('Could not download image with matching '
562                                  'rw version')
563        return dest, ver
564
565
566    def _cr50_verify_update(self, expected_rw, expect_rollback):
567        """Verify the expected version is running on cr50.
568
569        Args:
570            expected_rw: The RW version string we expect to be running
571            expect_rollback: True if cr50 should have rolled back during the
572                             update
573
574        Raises:
575            TestFail if there is any unexpected update state
576        """
577        errors = []
578        running_rw = self.cr50.get_version()
579        if expected_rw != running_rw:
580            errors.append('running %s not %s' % (running_rw, expected_rw))
581
582        if expect_rollback != self.cr50.rolledback():
583            errors.append('%srollback detected' %
584                          'no ' if expect_rollback else '')
585        if len(errors):
586            raise error.TestFail('cr50_update failed: %s' % ', '.join(errors))
587        logging.info('RUNNING %s after %s', expected_rw,
588                     'rollback' if expect_rollback else 'update')
589
590
591    def _cr50_run_update(self, path):
592        """Install the image at path onto cr50.
593
594        Args:
595            path: the location of the image to update to
596
597        Returns:
598            the rw version of the image
599        """
600        tmp_dest = '/tmp/' + os.path.basename(path)
601
602        dest, image_ver = cr50_utils.InstallImage(self.host, path, tmp_dest)
603        cr50_utils.GSCTool(self.host, ['-a', dest])
604        return image_ver[1]
605
606
607    def cr50_update(self, path, rollback=False, erase_nvmem=False,
608                    expect_rollback=False, chip_bid=None, chip_flags=None):
609        """Attempt to update to the given image.
610
611        If rollback is True, we assume that cr50 is already running an image
612        that can rollback.
613
614        Args:
615            path: the location of the update image
616            rollback: True if we need to force cr50 to rollback to update to
617                      the given image
618            erase_nvmem: True if we need to erase nvmem during rollback
619            expect_rollback: True if cr50 should rollback on its own
620            chip_bid: the integer representation of chip board id or None if the
621                      board id should be erased during rollback
622            chip_flags: the integer representation of chip board id flags or
623                        None if the board id should be erased during rollback
624
625        Raises:
626            TestFail if the update failed
627        """
628        original_rw = self.cr50.get_version()
629
630        # Cr50 is going to reject an update if it hasn't been up for more than
631        # 60 seconds. Wait until that passes before trying to run the update.
632        self.cr50.wait_until_update_is_allowed()
633
634        image_rw = self._cr50_run_update(path)
635
636        # Running the update may cause cr50 to reboot. Wait for that before
637        # sending more commands. The reboot should happen quickly. Wait a
638        # maximum of 10 seconds.
639        self.cr50.wait_for_reboot(timeout=10)
640
641        if erase_nvmem and rollback:
642            self.cr50.erase_nvmem()
643
644        if rollback:
645            self.cr50.rollback(chip_bid=chip_bid, chip_flags=chip_flags)
646
647        expected_rw = original_rw if expect_rollback else image_rw
648        # If we expect a rollback, the version should remain unchanged
649        self._cr50_verify_update(expected_rw, rollback or expect_rollback)
650
651
652    def ccd_open_from_ap(self):
653        """Start the open process and press the power button."""
654        self._ccd_open_last_len = 0
655
656        self._ccd_open_stdout = StringIO.StringIO()
657
658        ccd_open_cmd = utils.sh_escape('gsctool -a -o')
659        full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'),
660            ccd_open_cmd)
661        # Start running the Cr50 Open process in the background.
662        self._ccd_open_job = utils.BgJob(full_ssh_cmd,
663                                         nickname='ccd_open',
664                                         stdout_tee=self._ccd_open_stdout,
665                                         stderr_tee=utils.TEE_TO_LOGS)
666
667        if self._ccd_open_job == None:
668            raise error.TestFail('could not start ccd open')
669
670        try:
671            # Wait for the first gsctool power button prompt before starting the
672            # open process.
673            logging.info(self._get_ccd_open_output())
674            # Cr50 starts out by requesting 5 quick presses then 4 longer
675            # power button presses. Run the quick presses without looking at the
676            # command output, because getting the output can take some time. For
677            # the presses that require a 1 minute wait check the output between
678            # presses, so we can catch errors
679            #
680            # run quick presses for 30 seconds. It may take a couple of seconds
681            # for open to start. 10 seconds should be enough. 30 is just used
682            # because it will definitely be enough, and this process takes 300
683            # seconds, so doing quick presses for 30 seconds won't matter.
684            end_time = time.time() + 30
685            while time.time() < end_time:
686                self.servo.power_short_press()
687                logging.info('short int power button press')
688                time.sleep(self.PP_SHORT_INTERVAL)
689            # Poll the output and press the power button for the longer presses.
690            utils.wait_for_value(self._check_open_and_press_power_button,
691                expected_value=True, timeout_sec=self.cr50.PP_LONG)
692        except Exception, e:
693            logging.info(e)
694            raise
695        finally:
696            self._close_ccd_open_job()
697        logging.info(self.cr50.get_ccd_info())
698
699
700    def _check_open_and_press_power_button(self):
701        """Check stdout and press the power button if prompted.
702
703        Returns:
704            True if the process is still running.
705        """
706        logging.info(self._get_ccd_open_output())
707        self.servo.power_short_press()
708        logging.info('long int power button press')
709        # Give cr50 some time to complete the open process. After the last
710        # power button press cr50 erases nvmem and resets the dut before setting
711        # the state to open. Wait a bit so we don't check the ccd state in the
712        # middle of this reset process. Power button requests happen once a
713        # minute, so waiting 10 seconds isn't a big deal.
714        time.sleep(10)
715        return (self._ccd_open_job.sp.poll() is not None or 'Open' in
716                self.cr50.get_ccd_info()['State'])
717
718
719    def _get_ccd_open_output(self):
720        """Read the new output."""
721        self._ccd_open_job.process_output()
722        self._ccd_open_stdout.seek(self._ccd_open_last_len)
723        output = self._ccd_open_stdout.read()
724        self._ccd_open_last_len = self._ccd_open_stdout.len
725        return output
726
727
728    def _close_ccd_open_job(self):
729        """Terminate the process and check the results."""
730        exit_status = utils.nuke_subprocess(self._ccd_open_job.sp)
731        stdout = self._ccd_open_stdout.getvalue().strip()
732        delattr(self, '_ccd_open_job')
733        if stdout:
734            logging.info('stdout of ccd open:\n%s', stdout)
735        if exit_status:
736            logging.info('exit status: %d', exit_status)
737        if 'Error' in stdout:
738            raise error.TestFail('ccd open Error %s' %
739                                 stdout.split('Error')[-1])
740        if self.cr50.OPEN != self.cr50.get_ccd_level():
741            raise error.TestFail('unable to open cr50: %s' % stdout)
742        else:
743            logging.info('Opened Cr50')
744
745
746    def fast_open(self, enable_testlab=False):
747        """Try to use testlab open. If that fails, do regular ap open.
748
749        Args:
750            enable_testlab: If True, enable testlab mode after cr50 is open.
751        """
752        # Try to use testlab open first, so we don't have to wait for the
753        # physical presence check.
754        self.cr50.send_command('ccd testlab open')
755        if self.cr50.get_ccd_level() == 'open':
756            return
757
758        # Use the console to open cr50 without entering dev mode if possible. It
759        # takes longer and relies on more systems to enter dev mode and ssh into
760        # the AP. Skip the steps that aren't required.
761        if not self.cr50.get_cap('OpenNoDevMode')[self.cr50.CAP_IS_ACCESSIBLE]:
762            self.enter_mode_after_checking_tpm_state('dev')
763
764        if self.cr50.get_cap('OpenFromUSB')[self.cr50.CAP_IS_ACCESSIBLE]:
765            self.cr50.set_ccd_level(self.cr50.OPEN)
766        else:
767            self.ccd_open_from_ap()
768
769        if enable_testlab:
770            self.cr50.set_ccd_testlab('on')
771
772        # Make sure the device is in normal mode. After opening cr50, the TPM
773        # should be cleared and the device should automatically reset to normal
774        # mode. Just check to be consistent. It's possible capabilitiy settings
775        # are set to skip wiping the TPM.
776        self.enter_mode_after_checking_tpm_state('normal')
777
778
779    def run_gsctool_cmd_with_password(self, password, cmd, name, expect_error):
780        """Run a gsctool command and input the password
781
782        Args:
783            password: The cr50 password string
784            cmd: The gsctool command
785            name: The name to give the job
786            expect_error: True if the command should fail
787        """
788        set_pwd_cmd = utils.sh_escape(cmd)
789        full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'),
790            set_pwd_cmd)
791        stdout = StringIO.StringIO()
792        # Start running the gsctool Command in the background.
793        gsctool_job = utils.BgJob(full_ssh_command,
794                                  nickname='%s_with_password' % name,
795                                  stdout_tee=stdout,
796                                  stderr_tee=utils.TEE_TO_LOGS,
797                                  stdin=subprocess.PIPE)
798        if gsctool_job == None:
799            raise error.TestFail('could not start gsctool command %r', cmd)
800
801        try:
802            # Wait for enter prompt
803            gsctool_job.process_output()
804            logging.info(stdout.getvalue().strip())
805            # Enter the password
806            gsctool_job.sp.stdin.write(password + '\n')
807
808            # Wait for re-enter prompt
809            gsctool_job.process_output()
810            logging.info(stdout.getvalue().strip())
811            # Re-enter the password
812            gsctool_job.sp.stdin.write(password + '\n')
813            time.sleep(self.cr50.CONSERVATIVE_CCD_WAIT)
814            gsctool_job.process_output()
815        finally:
816            exit_status = utils.nuke_subprocess(gsctool_job.sp)
817            output = stdout.getvalue().strip()
818            logging.info('%s stdout: %s', name, output)
819            logging.info('%s exit status: %s', name, exit_status)
820            if exit_status:
821                message = ('gsctool %s failed using %r: %s %s' %
822                           (name, password, exit_status, output))
823                if expect_error:
824                    logging.info(message)
825                else:
826                    raise error.TestFail(message)
827            elif expect_error:
828                raise error.TestFail('%s with %r did not fail when expected' %
829                                     (name, password))
830
831
832    def set_ccd_password(self, password, expect_error=False):
833        """Set the ccd password"""
834        # If for some reason the test sets a password and is interrupted before
835        # we can clear it, we want testlab mode to be enabled, so it's possible
836        # to clear the password without knowing it.
837        if not self.cr50.testlab_is_on():
838            raise error.TestError('Will not set password unless testlab mode '
839                                  'is enabled.')
840        self.run_gsctool_cmd_with_password(
841                password, 'gsctool -a -P', 'set_password', expect_error)
842
843
844    def ccd_unlock_from_ap(self, password=None, expect_error=False):
845        """Unlock cr50"""
846        if not password:
847            self.host.run('gsctool -a -U')
848            return
849        self.run_gsctool_cmd_with_password(
850                password, 'gsctool -a -U', 'unlock', expect_error)
851
852
853    def enter_mode_after_checking_tpm_state(self, mode):
854        """Reboot to mode if cr50 doesn't already match the state"""
855        # If the device is already in the correct mode, don't do anything
856        if (mode == 'dev') == self.cr50.in_dev_mode():
857            logging.info('already in %r mode', mode)
858            return
859
860        self.switcher.reboot_to_mode(to_mode=mode)
861
862        if (mode == 'dev') != self.cr50.in_dev_mode():
863            raise error.TestError('Unable to enter %r mode' % mode)
864
865
866    def _fwmp_is_cleared(self):
867        """Return True if the FWMP has been created"""
868        res = self.host.run('cryptohome '
869                            '--action=get_firmware_management_parameters',
870                            ignore_status=True)
871        if res.exit_status and res.exit_status != self.CLEARED_FWMP_EXIT_STATUS:
872            raise error.TestError('Could not run cryptohome command %r' % res)
873        return self.CLEARED_FWMP_ERROR_MSG in res.stdout
874
875
876    def clear_fwmp(self):
877        """Clear the FWMP"""
878        if self._fwmp_is_cleared():
879            return
880        status = self.host.run('cryptohome --action=tpm_status').stdout
881        logging.debug(status)
882        if 'TPM Owned: true' not in status:
883            self.host.run('cryptohome --action=tpm_take_ownership')
884            self.host.run('cryptohome --action=tpm_wait_ownership')
885        self.host.run('cryptohome '
886                      '--action=remove_firmware_management_parameters')
887
888    def tpm_is_responsive(self):
889        """Check TPM responsiveness by running tpm_version."""
890        result = self.host.run('tpm_version', ignore_status=True)
891        logging.debug(result.stdout.strip())
892        return not result.exit_status
893