1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4#
5# Expects to be run in an environment with sudo and no interactive password
6# prompt, such as within the Chromium OS development chroot.
7
8import ast
9import logging
10import os
11import re
12import time
13import xmlrpclib
14
15from autotest_lib.client.common_lib import error
16from autotest_lib.server import utils as server_utils
17from autotest_lib.server.cros.servo import firmware_programmer
18
19# Time to wait when probing for a usb device, it takes on avg 17 seconds
20# to do a full probe.
21_USB_PROBE_TIMEOUT = 40
22
23
def _extract_image_from_tarball(tarball, dest_dir, image_candidates):
    """Extract the first available image candidate from the tarball.

    The candidates are tried in order with `tar xf`; the search stops at
    the first one for which tar exits with status 0.

    @param tarball: The path of the tarball.
    @param dest_dir: The directory the image is extracted into.
    @param image_candidates: A tuple of the paths of image candidates.

    @return: The first path from the image candidates, which succeeds, or None
             if all the image candidates fail.
    """
    for candidate in image_candidates:
        command = 'tar xf %s -C %s %s' % (tarball, dest_dir, candidate)
        exit_code = server_utils.system(command, timeout=60,
                                        ignore_status=True)
        if exit_code != 0:
            # This candidate is not in the tarball (or tar failed); try next.
            continue
        return candidate
    return None
41
42
class _PowerStateController(object):

    """Board-independent power operations built on servo controls.

    Every operation is carried out by writing the servo's `power_state`
    control (or, for warm reset, the `warm_reset` control), so callers
    get "power on", "power off" and reset operations without having to
    know the sequence a particular board type requires or the state
    the board is currently in.

    """

    # Values accepted for the `rec_mode` parameter of power_on().
    #
    # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
    #   SD card.
    # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.
    # REC_ON_FORCE_MRC:  Passed to servod as 'rec_force_mrc'.
    #   NOTE(review): presumably recovery mode with forced memory
    #   retraining -- confirm against servod.

    REC_ON = 'rec'
    REC_OFF = 'on'
    REC_ON_FORCE_MRC = 'rec_force_mrc'

    # Seconds to keep warm reset asserted before de-asserting it.
    _RESET_HOLD_TIME = 0.5

    def __init__(self, servo):
        """Remember the servo used to drive the power controls.

        @param servo Servo object providing the underlying `set_nocheck`
                     and `set_get_all` methods for the target controls.

        """
        self._servo = servo

    def reset(self):
        """Force the DUT to reset.

        The DUT is guaranteed to be on at the end of this call,
        regardless of its previous state, provided that there is
        working OS software. This also guarantees that the EC has
        been restarted.

        """
        servo = self._servo
        servo.set_nocheck('power_state', 'reset')

    def warm_reset(self):
        """Pulse the 'warm_reset' signal on the DUT.

        The signal is asserted, held for `_RESET_HOLD_TIME` seconds,
        then de-asserted, all in one `set_get_all` request.  Generally,
        this causes the board to restart.

        """
        hold = 'sleep:%.4f' % self._RESET_HOLD_TIME
        sequence = ['warm_reset:on', hold, 'warm_reset:off']
        self._servo.set_get_all(sequence)

    def power_off(self):
        """Force the DUT to power off.

        The DUT is guaranteed to be off at the end of this call,
        regardless of its previous state, provided that there is
        working EC and boot firmware.  There is no requirement for
        working OS software.

        """
        servo = self._servo
        servo.set_nocheck('power_state', 'off')

    def power_on(self, rec_mode=REC_OFF):
        """Force the DUT to power on.

        Prior to calling this function, the DUT must be powered off,
        e.g. with a call to `power_off()`.

        At power on, recovery mode is set as specified by the
        corresponding argument.  When booting with recovery mode on, it
        is the caller's responsibility to unplug/plug in a bootable
        external storage device.

        If the DUT requires a delay after powering on but before
        processing inputs such as USB stick insertion, the delay is
        handled by this method; the caller is not responsible for such
        delays.

        @param rec_mode Setting of recovery mode to be applied at
                        power on; one of the REC_* constants.
                        default: REC_OFF aka 'on'

        """
        servo = self._servo
        servo.set_nocheck('power_state', rec_mode)
133
134
class _Uart(object):
    """Class to capture CPU/EC UART streams.

    Capture is switched on/off through the servo's `*_uart_capture`
    controls, and the captured text is read back from the matching
    `*_uart_stream` controls and appended to log files in `logs_dir`.
    """

    def __init__(self, servo):
        """Create an idle capturer.

        @param servo  Servo object providing the `set` and `get` methods
                      for the UART controls.
        """
        self._servo = servo
        # List of (stream control name, log file name) being captured.
        self._streams = []
        self._logs_dir = None

    def _register_stream(self, stream, logfile):
        """Remember a stream to dump, without registering it twice."""
        entry = (stream, logfile)
        if entry not in self._streams:
            self._streams.append(entry)

    def start_capture(self):
        """Start capturing Uart streams.

        CPU UART capture is mandatory (errors propagate); EC UART capture
        is best-effort because older servod versions lack the control.
        Calling this more than once does not register a stream twice.
        """
        logging.debug('Start capturing CPU/EC UART.')
        self._servo.set('cpu_uart_capture', 'on')
        self._register_stream('cpu_uart_stream', 'cpu_uart.log')
        try:
            self._servo.set('ec_uart_capture', 'on')
        except error.TestFail as err:
            if 'No control named' in str(err):
                logging.debug('The servod is too old that ec_uart_capture not '
                              'supported.')
            else:
                # Still best-effort, but leave a trace of what went wrong.
                logging.warning('Failed to start UART capture for %s: %s',
                                'ec_uart_capture', err)
        else:
            self._register_stream('ec_uart_stream', 'ec_uart.log')

    def dump(self):
        """Dump UART streams to log files accordingly.

        Does nothing until `logs_dir` has been set.  A stream that cannot
        be read is skipped with a warning so the others are still dumped.
        """
        if not self._logs_dir:
            return

        for stream, logfile in self._streams:
            logfile_fullname = os.path.join(self._logs_dir, logfile)
            try:
                content = self._servo.get(stream)
            except Exception as err:
                logging.warning('Failed to get UART log for %s: %s',
                                stream, err)
                continue

            # The UART stream may contain non-printable characters, and servo
            # returns it in string representation. We use `ast.literal_eval`
            # to revert it back.
            try:
                text = ast.literal_eval(content)
            except (ValueError, SyntaxError) as err:
                # Keep the data rather than losing it (and the remaining
                # streams) because one representation was malformed.
                logging.warning('Failed to decode UART log for %s, writing '
                                'it unmodified: %s', stream, err)
                text = content
            with open(logfile_fullname, 'a') as fd:
                fd.write(text)

    def stop_capture(self):
        """Stop capturing UART streams.

        Best-effort: failures are logged and never raised, so that every
        capture control gets a chance to be switched off.
        """
        logging.debug('Stop capturing CPU/EC UART.')
        for uart in ('cpu_uart_capture', 'ec_uart_capture'):
            try:
                self._servo.set(uart, 'off')
            except error.TestFail as err:
                if 'No control named' in str(err):
                    logging.debug('The servod is too old that %s not '
                                  'supported.', uart)
                else:
                    logging.warning('Failed to stop UART logging for %s: %s',
                                    uart, err)
            except Exception as err:
                logging.warning('Failed to stop UART logging for %s: %s',
                                uart, err)

    @property
    def logs_dir(self):
        """Return the directory to save UART logs."""
        return self._logs_dir

    @logs_dir.setter
    def logs_dir(self, a_dir):
        """Set directory to save UART logs.

        @param a_dir  String of logs directory name."""
        self._logs_dir = a_dir
199
200
201class Servo(object):
202
203    """Manages control of a Servo board.
204
    Servo is a board developed by hardware group to aid in the debug and
    control of various partner devices. Servo's features include the simulation
    of pressing the power button, closing the lid, and pressing Ctrl-d. This
    class manages setting up and communicating with a servo daemon (servod)
    process. It provides both high-level functions for common servo tasks and
    low-level functions for directly setting and reading gpios.
211
212    """
213
    # Power button press delays in seconds.
    #
    # The EC specification says that 8.0 seconds should be enough
    # for the long power press.  However, some platforms need a bit
    # more time.  Empirical testing has found these requirements:
    #   Alex: 8.2 seconds
    #   ZGB:  8.5 seconds
    # NOTE: this class defines no long-press duration; the power_*_press()
    # methods pass no duration and leave the press length to servod.
    #
    # TODO(jrbarnette) Being generous is the right thing to do for
    # existing platforms, but if this code is to be used for
    # qualification of new hardware, we should be less generous.
    #
    # SHORT_DELAY is the interval, in seconds, between re-reads of a
    # control while waiting for a new value to take effect.
226    SHORT_DELAY = 0.1
227
228    # Maximum number of times to re-read power button on release.
229    GET_RETRY_MAX = 10
230
231    # Delays to deal with DUT state transitions.
232    SLEEP_DELAY = 6
233    BOOT_DELAY = 10
234
235    # Default minimum time interval between 'press' and 'release'
236    # keyboard events.
237    SERVO_KEY_PRESS_DELAY = 0.1
238
239    # Time to toggle recovery switch on and off.
240    REC_TOGGLE_DELAY = 0.1
241
242    # Time to toggle development switch on and off.
243    DEV_TOGGLE_DELAY = 0.1
244
245    # Time between an usb disk plugged-in and detected in the system.
246    USB_DETECTION_DELAY = 10
247    # Time to keep USB power off before and after USB mux direction is changed
248    USB_POWEROFF_DELAY = 2
249
250    # Time to wait before timing out on servo initialization.
251    INIT_TIMEOUT_SECS = 10
252
253
254    def __init__(self, servo_host, servo_serial=None):
255        """Sets up the servo communication infrastructure.
256
257        @param servo_host: A ServoHost object representing
258                           the host running servod.
259        @param servo_serial: Serial number of the servo board.
260        """
261        # TODO(fdeng): crbug.com/298379
262        # We should move servo_host object out of servo object
263        # to minimize the dependencies on the rest of Autotest.
264        self._servo_host = servo_host
265        self._servo_serial = servo_serial
266        self._server = servo_host.get_servod_server_proxy()
267        self._power_state = _PowerStateController(self)
268        self._uart = _Uart(self)
269        self._usb_state = None
270        self._programmer = None
271
272
273    @property
274    def servo_serial(self):
275        """Returns the serial number of the servo board."""
276        return self._servo_serial
277
278
279    def get_power_state_controller(self):
280        """Return the power state controller for this Servo.
281
282        The power state controller provides board-independent
283        interfaces for reset, power-on, power-off operations.
284
285        """
286        return self._power_state
287
288
289    def initialize_dut(self, cold_reset=False):
290        """Initializes a dut for testing purposes.
291
292        This sets various servo signals back to default values
293        appropriate for the target board.  By default, if the DUT
294        is already on, it stays on.  If the DUT is powered off
295        before initialization, its state afterward is unspecified.
296
297        Rationale:  Basic initialization of servo sets the lid open,
298        when there is a lid.  This operation won't affect powered on
299        units; however, setting the lid open may power on a unit
300        that's off, depending on the board type and previous state
301        of the device.
302
303        If `cold_reset` is a true value, the DUT and its EC will be
304        reset, and the DUT rebooted in normal mode.
305
306        @param cold_reset If True, cold reset the device after
307                          initialization.
308        """
309        self._server.hwinit()
310        self.set('usb_mux_oe1', 'on')
311        self._usb_state = None
312        self.switch_usbkey('off')
313        self._uart.start_capture()
314        if cold_reset:
315            self._power_state.reset()
316        logging.debug('Servo initialized, version is %s',
317                      self._server.get_version())
318
319
320    def is_localhost(self):
321        """Is the servod hosted locally?
322
323        Returns:
324          True if local hosted; otherwise, False.
325        """
326        return self._servo_host.is_localhost()
327
328
329    def power_long_press(self):
330        """Simulate a long power button press."""
331        # After a long power press, the EC may ignore the next power
332        # button press (at least on Alex).  To guarantee that this
333        # won't happen, we need to allow the EC one second to
334        # collect itself.
335        self._server.power_long_press()
336
337
338    def power_normal_press(self):
339        """Simulate a normal power button press."""
340        self._server.power_normal_press()
341
342
343    def power_short_press(self):
344        """Simulate a short power button press."""
345        self._server.power_short_press()
346
347
348    def power_key(self, press_secs=''):
349        """Simulate a power button press.
350
351        @param press_secs : Str. Time to press key.
352        """
353        self._server.power_key(press_secs)
354
355
356    def lid_open(self):
357        """Simulate opening the lid and raise exception if all attempts fail"""
358        self.set('lid_open', 'yes')
359
360
361    def lid_close(self):
362        """Simulate closing the lid and raise exception if all attempts fail
363
364        Waits 6 seconds to ensure the device is fully asleep before returning.
365        """
366        self.set('lid_open', 'no')
367        time.sleep(Servo.SLEEP_DELAY)
368
369    def volume_up(self, timeout=300):
370        """Simulate pushing the volume down button.
371
372        @param timeout: Timeout for setting the volume.
373        """
374        self.set_get_all(['volume_up:yes',
375                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
376                          'volume_up:no'])
377        # we need to wait for commands to take effect before moving on
378        time_left = float(timeout)
379        while time_left > 0.0:
380            value = self.get('volume_up')
381            if value == 'no':
382                return
383            time.sleep(self.SHORT_DELAY)
384            time_left = time_left - self.SHORT_DELAY
385        raise error.TestFail("Failed setting volume_up to no")
386
387    def volume_down(self, timeout=300):
388        """Simulate pushing the volume down button.
389
390        @param timeout: Timeout for setting the volume.
391        """
392        self.set_get_all(['volume_down:yes',
393                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
394                          'volume_down:no'])
395        # we need to wait for commands to take effect before moving on
396        time_left = float(timeout)
397        while time_left > 0.0:
398            value = self.get('volume_down')
399            if value == 'no':
400                return
401            time.sleep(self.SHORT_DELAY)
402            time_left = time_left - self.SHORT_DELAY
403        raise error.TestFail("Failed setting volume_down to no")
404
405    def ctrl_d(self, press_secs=''):
406        """Simulate Ctrl-d simultaneous button presses.
407
408        @param press_secs : Str. Time to press key.
409        """
410        self._server.ctrl_d(press_secs)
411
412
413    def ctrl_u(self):
414        """Simulate Ctrl-u simultaneous button presses.
415
416        @param press_secs : Str. Time to press key.
417        """
418        self._server.ctrl_u()
419
420
421    def ctrl_enter(self, press_secs=''):
422        """Simulate Ctrl-enter simultaneous button presses.
423
424        @param press_secs : Str. Time to press key.
425        """
426        self._server.ctrl_enter(press_secs)
427
428
429    def d_key(self, press_secs=''):
430        """Simulate Enter key button press.
431
432        @param press_secs : Str. Time to press key.
433        """
434        self._server.d_key(press_secs)
435
436
437    def ctrl_key(self, press_secs=''):
438        """Simulate Enter key button press.
439
440        @param press_secs : Str. Time to press key.
441        """
442        self._server.ctrl_key(press_secs)
443
444
445    def enter_key(self, press_secs=''):
446        """Simulate Enter key button press.
447
448        @param press_secs : Str. Time to press key.
449        """
450        self._server.enter_key(press_secs)
451
452
453    def refresh_key(self, press_secs=''):
454        """Simulate Refresh key (F3) button press.
455
456        @param press_secs : Str. Time to press key.
457        """
458        self._server.refresh_key(press_secs)
459
460
461    def ctrl_refresh_key(self, press_secs=''):
462        """Simulate Ctrl and Refresh (F3) simultaneous press.
463
464        This key combination is an alternative of Space key.
465
466        @param press_secs : Str. Time to press key.
467        """
468        self._server.ctrl_refresh_key(press_secs)
469
470
471    def imaginary_key(self, press_secs=''):
472        """Simulate imaginary key button press.
473
474        Maps to a key that doesn't physically exist.
475
476        @param press_secs : Str. Time to press key.
477        """
478        self._server.imaginary_key(press_secs)
479
480
481    def sysrq_x(self, press_secs=''):
482        """Simulate Alt VolumeUp X simulataneous press.
483
484        This key combination is the kernel system request (sysrq) X.
485
486        @param press_secs : Str. Time to press key.
487        """
488        self._server.sysrq_x(press_secs)
489
490
491    def toggle_recovery_switch(self):
492        """Toggle recovery switch on and off."""
493        self.enable_recovery_mode()
494        time.sleep(self.REC_TOGGLE_DELAY)
495        self.disable_recovery_mode()
496
497
498    def enable_recovery_mode(self):
499        """Enable recovery mode on device."""
500        self.set('rec_mode', 'on')
501
502
503    def disable_recovery_mode(self):
504        """Disable recovery mode on device."""
505        self.set('rec_mode', 'off')
506
507
508    def toggle_development_switch(self):
509        """Toggle development switch on and off."""
510        self.enable_development_mode()
511        time.sleep(self.DEV_TOGGLE_DELAY)
512        self.disable_development_mode()
513
514
515    def enable_development_mode(self):
516        """Enable development mode on device."""
517        self.set('dev_mode', 'on')
518
519
520    def disable_development_mode(self):
521        """Disable development mode on device."""
522        self.set('dev_mode', 'off')
523
524    def boot_devmode(self):
525        """Boot a dev-mode device that is powered off."""
526        self.power_short_press()
527        self.pass_devmode()
528
529
530    def pass_devmode(self):
531        """Pass through boot screens in dev-mode."""
532        time.sleep(Servo.BOOT_DELAY)
533        self.ctrl_d()
534        time.sleep(Servo.BOOT_DELAY)
535
536
537    def get_board(self):
538        """Get the board connected to servod."""
539        return self._server.get_board()
540
541
542    def get_base_board(self):
543        """Get the board of the base connected to servod."""
544        try:
545            return self._server.get_base_board()
546        except  xmlrpclib.Fault as e:
547            # TODO(waihong): Remove the following compatibility check when
548            # the new versions of hdctools are deployed.
549            if 'not supported' in str(e):
550                logging.warning('The servod is too old that get_base_board '
551                        'not supported.')
552                return ''
553            raise
554
555
556    def get_ec_active_copy(self):
557        """Get the active copy of the EC image."""
558        return self.get('ec_active_copy')
559
560
561    def _get_xmlrpclib_exception(self, xmlexc):
562        """Get meaningful exception string from xmlrpc.
563
564        Args:
565            xmlexc: xmlrpclib.Fault object
566
567        xmlrpclib.Fault.faultString has the following format:
568
569        <type 'exception type'>:'actual error message'
570
571        Parse and return the real exception from the servod side instead of the
572        less meaningful one like,
573           <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
574           attribute 'hw_driver'">
575
576        Returns:
577            string of underlying exception raised in servod.
578        """
579        return re.sub('^.*>:', '', xmlexc.faultString)
580
581
582    def get(self, gpio_name):
583        """Get the value of a gpio from Servod.
584
585        @param gpio_name Name of the gpio.
586        """
587        assert gpio_name
588        try:
589            return self._server.get(gpio_name)
590        except  xmlrpclib.Fault as e:
591            err_msg = "Getting '%s' :: %s" % \
592                (gpio_name, self._get_xmlrpclib_exception(e))
593            raise error.TestFail(err_msg)
594
595
596    def set(self, gpio_name, gpio_value):
597        """Set and check the value of a gpio using Servod.
598
599        @param gpio_name Name of the gpio.
600        @param gpio_value New setting for the gpio.
601        """
602        self.set_nocheck(gpio_name, gpio_value)
603        retry_count = Servo.GET_RETRY_MAX
604        while gpio_value != self.get(gpio_name) and retry_count:
605            logging.warning("%s != %s, retry %d", gpio_name, gpio_value,
606                         retry_count)
607            retry_count -= 1
608            time.sleep(Servo.SHORT_DELAY)
609        if not retry_count:
610            assert gpio_value == self.get(gpio_name), \
611                'Servo failed to set %s to %s' % (gpio_name, gpio_value)
612
613
614    def set_nocheck(self, gpio_name, gpio_value):
615        """Set the value of a gpio using Servod.
616
617        @param gpio_name Name of the gpio.
618        @param gpio_value New setting for the gpio.
619        """
620        assert gpio_name and gpio_value
621        logging.info('Setting %s to %r', gpio_name, gpio_value)
622        try:
623            self._server.set(gpio_name, gpio_value)
624        except  xmlrpclib.Fault as e:
625            err_msg = "Setting '%s' to %r :: %s" % \
626                (gpio_name, gpio_value, self._get_xmlrpclib_exception(e))
627            raise error.TestFail(err_msg)
628
629
630    def set_get_all(self, controls):
631        """Set &| get one or more control values.
632
633        @param controls: list of strings, controls to set &| get.
634
635        @raise: error.TestError in case error occurs setting/getting values.
636        """
637        rv = []
638        try:
639            logging.info('Set/get all: %s', str(controls))
640            rv = self._server.set_get_all(controls)
641        except xmlrpclib.Fault as e:
642            # TODO(waihong): Remove the following backward compatibility when
643            # the new versions of hdctools are deployed.
644            if 'not supported' in str(e):
645                logging.warning('The servod is too old that set_get_all '
646                        'not supported. Use set and get instead.')
647                for control in controls:
648                    if ':' in control:
649                        (name, value) = control.split(':')
650                        if name == 'sleep':
651                            time.sleep(float(value))
652                        else:
653                            self.set_nocheck(name, value)
654                        rv.append(True)
655                    else:
656                        rv.append(self.get(name))
657            else:
658                err_msg = "Problem with '%s' :: %s" % \
659                    (controls, self._get_xmlrpclib_exception(e))
660                raise error.TestFail(err_msg)
661        return rv
662
663
664    # TODO(waihong) It may fail if multiple servo's are connected to the same
665    # host. Should look for a better way, like the USB serial name, to identify
666    # the USB device.
667    # TODO(sbasi) Remove this code from autoserv once firmware tests have been
668    # updated.
669    def probe_host_usb_dev(self, timeout=_USB_PROBE_TIMEOUT):
670        """Probe the USB disk device plugged-in the servo from the host side.
671
672        It uses servod to discover if there is a usb device attached to it.
673
674        @param timeout The timeout period when probing for the usb host device.
675
676        @return: String of USB disk path (e.g. '/dev/sdb') or None.
677        """
678        # Set up Servo's usb mux.
679        self.switch_usbkey('host')
680        return self._server.probe_host_usb_dev(timeout) or None
681
682
683    def image_to_servo_usb(self, image_path=None,
684                           make_image_noninteractive=False):
685        """Install an image to the USB key plugged into the servo.
686
687        This method may copy any image to the servo USB key including a
688        recovery image or a test image.  These images are frequently used
689        for test purposes such as restoring a corrupted image or conducting
690        an upgrade of ec/fw/kernel as part of a test of a specific image part.
691
692        @param image_path Path on the host to the recovery image.
693        @param make_image_noninteractive Make the recovery image
694                                   noninteractive, therefore the DUT
695                                   will reboot automatically after
696                                   installation.
697        """
698        # We're about to start plugging/unplugging the USB key.  We
699        # don't know the state of the DUT, or what it might choose
700        # to do to the device after hotplug.  To avoid surprises,
701        # force the DUT to be off.
702        self._server.hwinit()
703        self._power_state.power_off()
704
705        # Set up Servo's usb mux.
706        self.switch_usbkey('host')
707        if image_path:
708            logging.info('Searching for usb device and copying image to it. '
709                         'Please wait a few minutes...')
710            if not self._server.download_image_to_usb(image_path):
711                logging.error('Failed to transfer requested image to USB. '
712                              'Please take a look at Servo Logs.')
713                raise error.AutotestError('Download image to usb failed.')
714            if make_image_noninteractive:
715                logging.info('Making image noninteractive')
716                if not self._server.make_image_noninteractive():
717                    logging.error('Failed to make image noninteractive. '
718                                  'Please take a look at Servo Logs.')
719
720    def boot_in_recovery_mode(self):
721        """Boot host DUT in recovery mode."""
722        self._power_state.power_on(rec_mode=self._power_state.REC_ON)
723        self.switch_usbkey('dut')
724
725
726    def install_recovery_image(self, image_path=None,
727                               make_image_noninteractive=False):
728        """Install the recovery image specified by the path onto the DUT.
729
730        This method uses google recovery mode to install a recovery image
731        onto a DUT through the use of a USB stick that is mounted on a servo
732        board specified by the usb_dev.  If no image path is specified
733        we use the recovery image already on the usb image.
734
735        @param image_path: Path on the host to the recovery image.
736        @param make_image_noninteractive: Make the recovery image
737                noninteractive, therefore the DUT will reboot automatically
738                after installation.
739        """
740        self.image_to_servo_usb(image_path, make_image_noninteractive)
741        self.boot_in_recovery_mode()
742
743
744    def _scp_image(self, image_path):
745        """Copy image to the servo host.
746
747        When programming a firmware image on the DUT, the image must be
748        located on the host to which the servo device is connected. Sometimes
749        servo is controlled by a remote host, in this case the image needs to
750        be transferred to the remote host.
751
752        @param image_path: a string, name of the firmware image file to be
753               transferred.
754        @return: a string, full path name of the copied file on the remote.
755        """
756
757        dest_path = os.path.join('/tmp', os.path.basename(image_path))
758        self._servo_host.send_file(image_path, dest_path)
759        return dest_path
760
761
762    def system(self, command, timeout=3600):
763        """Execute the passed in command on the servod host.
764
765        @param command Command to be executed.
766        @param timeout Maximum number of seconds of runtime allowed. Default to
767                       1 hour.
768        """
769        logging.info('Will execute on servo host: %s', command)
770        self._servo_host.run(command, timeout=timeout)
771
772
773    def system_output(self, command, timeout=3600,
774                      ignore_status=False, args=()):
775        """Execute the passed in command on the servod host, return stdout.
776
777        @param command a string, the command to execute
778        @param timeout an int, max number of seconds to wait til command
779               execution completes. Default to 1 hour.
780        @param ignore_status a Boolean, if true - ignore command's nonzero exit
781               status, otherwise an exception will be thrown
782        @param args a tuple of strings, each becoming a separate command line
783               parameter for the command
784        @return command's stdout as a string.
785        """
786        return self._servo_host.run(command, timeout=timeout,
787                                    ignore_status=ignore_status,
788                                    args=args).stdout.strip()
789
790
791    def get_servo_version(self):
792        """Get the version of the servo, e.g., servo_v2 or servo_v3.
793
794        @return: The version of the servo.
795
796        """
797        return self._server.get_version()
798
799
800    def _initialize_programmer(self, rw_only=False):
801        """Initialize the firmware programmer.
802
803        @param rw_only: True to initialize a programmer which only
804                        programs the RW portions.
805        """
806        if self._programmer:
807            return
808        # Initialize firmware programmer
809        servo_version = self.get_servo_version()
810        if servo_version.startswith('servo_v2'):
811            self._programmer = firmware_programmer.ProgrammerV2(self)
812            self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
813        # Both servo v3 and v4 use the same programming methods so just leverage
814        # ProgrammerV3 for servo v4 as well.
815        elif (servo_version.startswith('servo_v3') or
816              servo_version.startswith('servo_v4')):
817            self._programmer = firmware_programmer.ProgrammerV3(self)
818            self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
819        else:
820            raise error.TestError(
821                    'No firmware programmer for servo version: %s' %
822                    servo_version)
823
824
825    def program_bios(self, image, rw_only=False):
826        """Program bios on DUT with given image.
827
828        @param image: a string, file name of the BIOS image to program
829                      on the DUT.
830        @param rw_only: True to only program the RW portion of BIOS.
831
832        """
833        self._initialize_programmer()
834        if not self.is_localhost():
835            image = self._scp_image(image)
836        if rw_only:
837            self._programmer_rw.program_bios(image)
838        else:
839            self._programmer.program_bios(image)
840
841
842    def program_ec(self, image, rw_only=False):
843        """Program ec on DUT with given image.
844
845        @param image: a string, file name of the EC image to program
846                      on the DUT.
847        @param rw_only: True to only program the RW portion of EC.
848
849        """
850        self._initialize_programmer()
851        if not self.is_localhost():
852            image = self._scp_image(image)
853        if rw_only:
854            self._programmer_rw.program_ec(image)
855        else:
856            self._programmer.program_ec(image)
857
858
859    def _reprogram(self, tarball_path, firmware_name, image_candidates,
860                   rw_only):
861        """Helper function to reprogram firmware for EC or BIOS.
862
863        @param tarball_path: The path of the downloaded build tarball.
864        @param: firmware_name: either 'EC' or 'BIOS'.
865        @param image_candidates: A tuple of the paths of image candidates.
866        @param rw_only: True to only install firmware to its RW portions. Keep
867                the RO portions unchanged.
868
869        @raise: TestError if cannot extract firmware from the tarball.
870        """
871        dest_dir = os.path.dirname(tarball_path)
872        image = _extract_image_from_tarball(tarball_path, dest_dir,
873                                            image_candidates)
874        if not image:
875            if firmware_name == 'EC':
876                logging.info('Not a Chrome EC, ignore re-programming it')
877                return
878            else:
879                raise error.TestError('Failed to extract the %s image from '
880                                      'tarball' % firmware_name)
881
882        logging.info('Will re-program %s %snow', firmware_name,
883                     'RW ' if rw_only else '')
884
885        if firmware_name == 'EC':
886            self.program_ec(os.path.join(dest_dir, image), rw_only)
887        else:
888            self.program_bios(os.path.join(dest_dir, image), rw_only)
889
890
891    def program_firmware(self, model, tarball_path, rw_only=False):
892        """Program firmware (EC, if applied, and BIOS) of the DUT.
893
894        @param model: The DUT model name.
895        @param tarball_path: The path of the downloaded build tarball.
896        @param rw_only: True to only install firmware to its RW portions. Keep
897                the RO portions unchanged.
898        """
899        ap_image_candidates = ('image.bin', 'image-%s.bin' % model)
900        ec_image_candidates = ('ec.bin', '%s/ec.bin' % model)
901
902        self._reprogram(tarball_path, 'EC', ec_image_candidates, rw_only)
903        self._reprogram(tarball_path, 'BIOS', ap_image_candidates, rw_only)
904
905        self.get_power_state_controller().reset()
906        time.sleep(Servo.BOOT_DELAY)
907
908
909    def _switch_usbkey_power(self, power_state, detection_delay=False):
910        """Switch usbkey power.
911
912        This function switches usbkey power by setting the value of
913        'prtctl4_pwren'. If the power is already in the
914        requested state, this function simply returns.
915
916        @param power_state: A string, 'on' or 'off'.
917        @param detection_delay: A boolean value, if True, sleep
918                                for |USB_DETECTION_DELAY| after switching
919                                the power on.
920        """
921        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
922        # handle beaglebones that haven't yet updated and have the
923        # safe_switch_usbkey_power RPC.  I'll remove this once all beaglebones
924        # have been updated and also think about a better way to handle
925        # situations like this.
926        try:
927            self._server.safe_switch_usbkey_power(power_state)
928        except Exception:
929            self.set('prtctl4_pwren', power_state)
930        if power_state == 'off':
931            time.sleep(self.USB_POWEROFF_DELAY)
932        elif detection_delay:
933            time.sleep(self.USB_DETECTION_DELAY)
934
935
936    def switch_usbkey(self, usb_state):
937        """Connect USB flash stick to either host or DUT, or turn USB port off.
938
939        This function switches the servo multiplexer to provide electrical
940        connection between the USB port J3 and either host or DUT side. It
941        can also be used to turn the USB port off.
942
943        Switching to 'dut' or 'host' is accompanied by powercycling
944        of the USB stick, because it sometimes gets wedged if the mux
945        is switched while the stick power is on.
946
947        @param usb_state: A string, one of 'dut', 'host', or 'off'.
948                          'dut' and 'host' indicate which side the
949                          USB flash device is required to be connected to.
950                          'off' indicates turning the USB port off.
951
952        @raise: error.TestError in case the parameter is not 'dut'
953                'host', or 'off'.
954        """
955        if self.get_usbkey_direction() == usb_state:
956            return
957
958        if usb_state == 'off':
959            self._switch_usbkey_power('off')
960            self._usb_state = usb_state
961            return
962        elif usb_state == 'host':
963            mux_direction = 'servo_sees_usbkey'
964        elif usb_state == 'dut':
965            mux_direction = 'dut_sees_usbkey'
966        else:
967            raise error.TestError('Unknown USB state request: %s' % usb_state)
968
969        self._switch_usbkey_power('off')
970        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
971        # handle beaglebones that haven't yet updated and have the
972        # safe_switch_usbkey RPC.  I'll remove this once all beaglebones have
973        # been updated and also think about a better way to handle situations
974        # like this.
975        try:
976            self._server.safe_switch_usbkey(mux_direction)
977        except Exception:
978            self.set('usb_mux_sel1', mux_direction)
979        time.sleep(self.USB_POWEROFF_DELAY)
980        self._switch_usbkey_power('on', usb_state == 'host')
981        self._usb_state = usb_state
982
983
984    def get_usbkey_direction(self):
985        """Get which side USB is connected to or 'off' if usb power is off.
986
987        @return: A string, one of 'dut', 'host', or 'off'.
988        """
989        if not self._usb_state:
990            if self.get('prtctl4_pwren') == 'off':
991                self._usb_state = 'off'
992            elif self.get('usb_mux_sel1').startswith('dut'):
993                self._usb_state = 'dut'
994            else:
995                self._usb_state = 'host'
996        return self._usb_state
997
998
999    def set_servo_v4_role(self, role):
1000        """Set the power role of servo v4, either 'src' or 'snk'.
1001
1002        It does nothing if not a servo v4.
1003
1004        @param role: Power role for DUT port on servo v4, either 'src' or 'snk'.
1005        """
1006        servo_version = self.get_servo_version()
1007        if servo_version.startswith('servo_v4'):
1008            value = self.get('servo_v4_role')
1009            if value != role:
1010                self.set_nocheck('servo_v4_role', role)
1011            else:
1012                logging.debug('Already in the role: %s.', role)
1013        else:
1014            logging.debug('Not a servo v4, unable to set role to %s.', role)
1015
1016
1017    @property
1018    def uart_logs_dir(self):
1019        """Return the directory to save UART logs."""
1020        return self._uart.logs_dir if self._uart else ""
1021
1022
1023    @uart_logs_dir.setter
1024    def uart_logs_dir(self, logs_dir):
1025        """Set directory to save UART logs.
1026
1027        @param logs_dir  String of directory name."""
1028        if self._uart:
1029            self._uart.logs_dir = logs_dir
1030
1031
1032    def close(self):
1033        """Close the servo object."""
1034        if self._uart:
1035            self._uart.stop_capture()
1036            self._uart.dump()
1037            self._uart = None
1038