# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import httplib import logging import socket import time import xmlrpclib from contextlib import contextmanager from PIL import Image from autotest_lib.client.bin import utils from autotest_lib.client.common_lib import error from autotest_lib.client.cros.chameleon import audio_board from autotest_lib.client.cros.chameleon import edid as edid_lib from autotest_lib.client.cros.chameleon import usb_controller CHAMELEON_PORT = 9992 class ChameleonConnectionError(error.TestError): """Indicates that connecting to Chameleon failed. It is fatal to the test unless caught. """ pass class ChameleonConnection(object): """ChameleonConnection abstracts the network connection to the board. ChameleonBoard and ChameleonPort use it for accessing Chameleon RPC. """ def __init__(self, hostname, port=CHAMELEON_PORT): """Constructs a ChameleonConnection. @param hostname: Hostname the chameleond process is running. @param port: Port number the chameleond process is listening on. @raise ChameleonConnectionError if connection failed. """ self.chameleond_proxy = ChameleonConnection._create_server_proxy( hostname, port) @staticmethod def _create_server_proxy(hostname, port): """Creates the chameleond server proxy. @param hostname: Hostname the chameleond process is running. @param port: Port number the chameleond process is listening on. @return ServerProxy object to chameleond. @raise ChameleonConnectionError if connection failed. """ remote = 'http://%s:%s' % (hostname, port) chameleond_proxy = xmlrpclib.ServerProxy(remote, allow_none=True) # Call a RPC to test. try: chameleond_proxy.GetSupportedPorts() except (socket.error, xmlrpclib.ProtocolError, httplib.BadStatusLine) as e: raise ChameleonConnectionError(e) return chameleond_proxy class ChameleonBoard(object): """ChameleonBoard is an abstraction of a Chameleon board. A Chameleond RPC proxy is passed to the construction such that it can use this proxy to control the Chameleon board. User can use host to access utilities that are not provided by Chameleond XMLRPC server, e.g. send_file and get_file, which are provided by ssh_host.SSHHost, which is the base class of ChameleonHost. """ def __init__(self, chameleon_connection, chameleon_host=None): """Construct a ChameleonBoard. @param chameleon_connection: ChameleonConnection object. @param chameleon_host: ChameleonHost object. None if this ChameleonBoard is not created by a ChameleonHost. """ self.host = chameleon_host self._chameleond_proxy = chameleon_connection.chameleond_proxy self._usb_ctrl = usb_controller.USBController(chameleon_connection) if self._chameleond_proxy.HasAudioBoard(): self._audio_board = audio_board.AudioBoard(chameleon_connection) else: self._audio_board = None logging.info('There is no audio board on this Chameleon.') def reset(self): """Resets Chameleon board.""" self._chameleond_proxy.Reset() def get_all_ports(self): """Gets all the ports on Chameleon board which are connected. @return: A list of ChameleonPort objects. """ ports = self._chameleond_proxy.ProbePorts() return [ChameleonPort(self._chameleond_proxy, port) for port in ports] def get_all_inputs(self): """Gets all the input ports on Chameleon board which are connected. @return: A list of ChameleonPort objects. """ ports = self._chameleond_proxy.ProbeInputs() return [ChameleonPort(self._chameleond_proxy, port) for port in ports] def get_all_outputs(self): """Gets all the output ports on Chameleon board which are connected. @return: A list of ChameleonPort objects. """ ports = self._chameleond_proxy.ProbeOutputs() return [ChameleonPort(self._chameleond_proxy, port) for port in ports] def get_label(self): """Gets the label which indicates the display connection. @return: A string of the label, like 'hdmi', 'dp_hdmi', etc. """ connectors = [] for port in self._chameleond_proxy.ProbeInputs(): if self._chameleond_proxy.HasVideoSupport(port): connector = self._chameleond_proxy.GetConnectorType(port).lower() connectors.append(connector) # Eliminate duplicated ports. It simplifies the labels of dual-port # devices, i.e. dp_dp categorized into dp. return '_'.join(sorted(set(connectors))) def get_audio_board(self): """Gets the audio board on Chameleon. @return: An AudioBoard object. """ return self._audio_board def get_usb_controller(self): """Gets the USB controller on Chameleon. @return: A USBController object. """ return self._usb_ctrl def get_mac_address(self): """Gets the MAC address of Chameleon. @return: A string for MAC address. """ return self._chameleond_proxy.GetMacAddress() class ChameleonPort(object): """ChameleonPort is an abstraction of a general port of a Chameleon board. It only contains some common methods shared with audio and video ports. A Chameleond RPC proxy and an port_id are passed to the construction. The port_id is the unique identity to the port. """ def __init__(self, chameleond_proxy, port_id): """Construct a ChameleonPort. @param chameleond_proxy: Chameleond RPC proxy object. @param port_id: The ID of the input port. """ self.chameleond_proxy = chameleond_proxy self.port_id = port_id def get_connector_id(self): """Returns the connector ID. @return: A number of connector ID. """ return self.port_id def get_connector_type(self): """Returns the human readable string for the connector type. @return: A string, like "VGA", "DVI", "HDMI", or "DP". """ return self.chameleond_proxy.GetConnectorType(self.port_id) def has_audio_support(self): """Returns if the input has audio support. @return: True if the input has audio support; otherwise, False. """ return self.chameleond_proxy.HasAudioSupport(self.port_id) def has_video_support(self): """Returns if the input has video support. @return: True if the input has video support; otherwise, False. """ return self.chameleond_proxy.HasVideoSupport(self.port_id) def plug(self): """Asserts HPD line to high, emulating plug.""" logging.info('Plug Chameleon port %d', self.port_id) self.chameleond_proxy.Plug(self.port_id) def unplug(self): """Deasserts HPD line to low, emulating unplug.""" logging.info('Unplug Chameleon port %d', self.port_id) self.chameleond_proxy.Unplug(self.port_id) def set_plug(self, plug_status): """Sets plug/unplug by plug_status. @param plug_status: True to plug; False to unplug. """ if plug_status: self.plug() else: self.unplug() @property def plugged(self): """ @returns True if this port is plugged to Chameleon, False otherwise. """ return self.chameleond_proxy.IsPlugged(self.port_id) class ChameleonVideoInput(ChameleonPort): """ChameleonVideoInput is an abstraction of a video input port. It contains some special methods to control a video input. """ _DUT_STABILIZE_TIME = 3 _DURATION_UNPLUG_FOR_EDID = 5 _TIMEOUT_VIDEO_STABLE_PROBE = 10 _EDID_ID_DISABLE = -1 def __init__(self, chameleon_port): """Construct a ChameleonVideoInput. @param chameleon_port: A general ChameleonPort object. """ self.chameleond_proxy = chameleon_port.chameleond_proxy self.port_id = chameleon_port.port_id def wait_video_input_stable(self, timeout=None): """Waits the video input stable or timeout. @param timeout: The time period to wait for. @return: True if the video input becomes stable within the timeout period; otherwise, False. """ is_input_stable = self.chameleond_proxy.WaitVideoInputStable( self.port_id, timeout) # If video input of Chameleon has been stable, wait for DUT software # layer to be stable as well to make sure all the configurations have # been propagated before proceeding. if is_input_stable: logging.info('Video input has been stable. Waiting for the DUT' ' to be stable...') time.sleep(self._DUT_STABILIZE_TIME) return is_input_stable def read_edid(self): """Reads the EDID. @return: An Edid object or NO_EDID. """ edid_binary = self.chameleond_proxy.ReadEdid(self.port_id) if edid_binary is None: return edid_lib.NO_EDID # Read EDID without verify. It may be made corrupted as intended # for the test purpose. return edid_lib.Edid(edid_binary.data, skip_verify=True) def apply_edid(self, edid): """Applies the given EDID. @param edid: An Edid object or NO_EDID. """ if edid is edid_lib.NO_EDID: self.chameleond_proxy.ApplyEdid(self.port_id, self._EDID_ID_DISABLE) else: edid_binary = xmlrpclib.Binary(edid.data) edid_id = self.chameleond_proxy.CreateEdid(edid_binary) self.chameleond_proxy.ApplyEdid(self.port_id, edid_id) self.chameleond_proxy.DestroyEdid(edid_id) @contextmanager def use_edid(self, edid): """Uses the given EDID in a with statement. It sets the EDID up in the beginning and restores to the original EDID in the end. This function is expected to be used in a with statement, like the following: with chameleon_port.use_edid(edid): do_some_test_on(chameleon_port) @param edid: An EDID object. """ # Set the EDID up in the beginning. plugged = self.plugged if plugged: self.unplug() original_edid = self.read_edid() logging.info('Apply EDID on port %d', self.port_id) self.apply_edid(edid) if plugged: time.sleep(self._DURATION_UNPLUG_FOR_EDID) self.plug() self.wait_video_input_stable(self._TIMEOUT_VIDEO_STABLE_PROBE) try: # Yeild to execute the with statement. yield finally: # Restore the original EDID in the end. current_edid = self.read_edid() if original_edid.data != current_edid.data: logging.info('Restore the original EDID.') self.apply_edid(original_edid) def use_edid_file(self, filename): """Uses the given EDID file in a with statement. It sets the EDID up in the beginning and restores to the original EDID in the end. This function is expected to be used in a with statement, like the following: with chameleon_port.use_edid_file(filename): do_some_test_on(chameleon_port) @param filename: A path to the EDID file. """ return self.use_edid(edid_lib.Edid.from_file(filename)) def fire_hpd_pulse(self, deassert_interval_usec, assert_interval_usec=None, repeat_count=1, end_level=1): """Fires one or more HPD pulse (low -> high -> low -> ...). @param deassert_interval_usec: The time in microsecond of the deassert pulse. @param assert_interval_usec: The time in microsecond of the assert pulse. If None, then use the same value as deassert_interval_usec. @param repeat_count: The count of HPD pulses to fire. @param end_level: HPD ends with 0 for LOW (unplugged) or 1 for HIGH (plugged). """ self.chameleond_proxy.FireHpdPulse( self.port_id, deassert_interval_usec, assert_interval_usec, repeat_count, int(bool(end_level))) def fire_mixed_hpd_pulses(self, widths): """Fires one or more HPD pulses, starting at low, of mixed widths. One must specify a list of segment widths in the widths argument where widths[0] is the width of the first low segment, widths[1] is that of the first high segment, widths[2] is that of the second low segment... etc. The HPD line stops at low if even number of segment widths are specified; otherwise, it stops at high. @param widths: list of pulse segment widths in usec. """ self.chameleond_proxy.FireMixedHpdPulses(self.port_id, widths) def capture_screen(self): """Captures Chameleon framebuffer. @return An Image object. """ return Image.fromstring( 'RGB', self.get_resolution(), self.chameleond_proxy.DumpPixels(self.port_id).data) def get_resolution(self): """Gets the source resolution. @return: A (width, height) tuple. """ # The return value of RPC is converted to a list. Convert it back to # a tuple. return tuple(self.chameleond_proxy.DetectResolution(self.port_id)) def set_content_protection(self, enable): """Sets the content protection state on the port. @param enable: True to enable; False to disable. """ self.chameleond_proxy.SetContentProtection(self.port_id, enable) def is_content_protection_enabled(self): """Returns True if the content protection is enabled on the port. @return: True if the content protection is enabled; otherwise, False. """ return self.chameleond_proxy.IsContentProtectionEnabled(self.port_id) def is_video_input_encrypted(self): """Returns True if the video input on the port is encrypted. @return: True if the video input is encrypted; otherwise, False. """ return self.chameleond_proxy.IsVideoInputEncrypted(self.port_id) def start_capturing_video(self, box=None): """ Captures video frames. Asynchronous, returns immediately. @param box: int tuple, left, upper, right, lower pixel coordinates. Defines the rectangular boundary within which to capture. """ if box is None: self.chameleond_proxy.StartCapturingVideo(self.port_id) else: self.chameleond_proxy.StartCapturingVideo(self.port_id, *box) def stop_capturing_video(self): """ Stops the ongoing video frame capturing. """ self.chameleond_proxy.StopCapturingVideo(self.port_id) def get_captured_frame_count(self): """ @return: int, the number of frames that have been captured. """ return self.chameleond_proxy.GetCapturedFrameCount() def read_captured_frame(self, index): """ @param index: int, index of the desired captured frame. @return: xmlrpclib.Binary object containing a byte-array of the pixels. """ frame = self.chameleond_proxy.ReadCapturedFrame(index) return Image.fromstring('RGB', self.get_captured_resolution(), frame.data) def get_captured_checksums(self, start_index=0, stop_index=None): """ @param start_index: int, index of the frame to start with. @param stop_index: int, index of the frame (excluded) to stop at. @return: a list of checksums of frames captured. """ return self.chameleond_proxy.GetCapturedChecksums(start_index, stop_index) def get_captured_resolution(self): """ @return: (width, height) tuple, the resolution of captured frames. """ return self.chameleond_proxy.GetCapturedResolution() class ChameleonAudioInput(ChameleonPort): """ChameleonAudioInput is an abstraction of an audio input port. It contains some special methods to control an audio input. """ def __init__(self, chameleon_port): """Construct a ChameleonAudioInput. @param chameleon_port: A general ChameleonPort object. """ self.chameleond_proxy = chameleon_port.chameleond_proxy self.port_id = chameleon_port.port_id def start_capturing_audio(self): """Starts capturing audio.""" return self.chameleond_proxy.StartCapturingAudio(self.port_id) def stop_capturing_audio(self): """Stops capturing audio. Returns: A tuple (remote_path, format). remote_path: The captured file path on Chameleon. format: A dict containing: file_type: 'raw' or 'wav'. sample_format: 'S32_LE' for 32-bit signed integer in little-endian. Refer to aplay manpage for other formats. channel: channel number. rate: sampling rate. """ remote_path, data_format = self.chameleond_proxy.StopCapturingAudio( self.port_id) return remote_path, data_format class ChameleonAudioOutput(ChameleonPort): """ChameleonAudioOutput is an abstraction of an audio output port. It contains some special methods to control an audio output. """ def __init__(self, chameleon_port): """Construct a ChameleonAudioOutput. @param chameleon_port: A general ChameleonPort object. """ self.chameleond_proxy = chameleon_port.chameleond_proxy self.port_id = chameleon_port.port_id def start_playing_audio(self, path, data_format): """Starts playing audio. @param path: The path to the file to play on Chameleon. @param data_format: A dict containing data format. Currently Chameleon only accepts data format: dict(file_type='raw', sample_format='S32_LE', channel=8, rate=48000). """ self.chameleond_proxy.StartPlayingAudio(self.port_id, path, data_format) def stop_playing_audio(self): """Stops capturing audio.""" self.chameleond_proxy.StopPlayingAudio(self.port_id) def make_chameleon_hostname(dut_hostname): """Given a DUT's hostname, returns the hostname of its Chameleon. @param dut_hostname: Hostname of a DUT. @return Hostname of the DUT's Chameleon. """ host_parts = dut_hostname.split('.') host_parts[0] = host_parts[0] + '-chameleon' return '.'.join(host_parts) def create_chameleon_board(dut_hostname, args): """Given either DUT's hostname or argments, creates a ChameleonBoard object. If the DUT's hostname is in the lab zone, it connects to the Chameleon by append the hostname with '-chameleon' suffix. If not, checks if the args contains the key-value pair 'chameleon_host=IP'. @param dut_hostname: Hostname of a DUT. @param args: A string of arguments passed from the command line. @return A ChameleonBoard object. @raise ChameleonConnectionError if unknown hostname. """ connection = None hostname = make_chameleon_hostname(dut_hostname) if utils.host_is_in_lab_zone(hostname): connection = ChameleonConnection(hostname) else: args_dict = utils.args_to_dict(args) hostname = args_dict.get('chameleon_host', None) port = args_dict.get('chameleon_port', CHAMELEON_PORT) if hostname: connection = ChameleonConnection(hostname, port) else: raise ChameleonConnectionError('No chameleon_host is given in args') return ChameleonBoard(connection)