# Copyright 2015 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import re import time from autotest_lib.client.bin import utils from autotest_lib.client.common_lib import error # en-US key matrix (from "kb membrane pin matrix.pdf") KEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3), '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9), '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4), 'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6), 'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5), '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3), 'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9), ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2), 'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5), '.': (5, 9), '/': (5, 8), ' ': (5, 11), '': (6, 12), '': (0, 10), '': (6, 11), '': (2, 1), '': (0, 4), '': (7, 7), '': (4, 0), '': (1, 1), '': (1, 11), '': (3, 2), '': (6, 10), '': (2, 0), '': (0, 2), '': (0, 1), '': (2, 2), '': (1, 2), '': (3, 4), '': (2, 4), '': (1, 4), '': (2, 9), '': (1, 9), '': (7, 11), '': (5, 7), '': (4, 11), '': (7, 12)} def has_ectool(): """Determine if ectool shell command is present. Returns: boolean true if avail, false otherwise. """ cmd = 'which ectool' return (utils.system(cmd, ignore_status=True) == 0) class ECError(Exception): """Base class for a failure when communicating with EC.""" pass class EC_Common(object): """Class for EC common. This incredibly brief base class is intended to encapsulate common elements across various CrOS MCUs (ec proper, USB-PD, Sensor Hub). At the moment that includes only the use of ectool. """ def __init__(self, target='cros_ec'): """Constructor. @param target: target name of ec to communicate with. """ if not has_ectool(): ec_info = utils.system_output("mosys ec info", ignore_status=True) logging.warning("Ectool absent on this platform ( %s )", ec_info) raise error.TestNAError("Platform doesn't support ectool") self._target = target def ec_command(self, cmd, **kwargs): """Executes ec command and returns results. @param cmd: string of command to execute. @param kwargs: optional params passed to utils.system_output @returns: string of results from ec command. """ full_cmd = 'ectool --name=%s %s' % (self._target, cmd) logging.debug('Command: %s', full_cmd) result = utils.system_output(full_cmd, **kwargs) logging.debug('Result: %s', result) return result class EC(EC_Common): """Class for CrOS embedded controller (EC).""" HELLO_RE = "EC says hello" GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)" SET_FANSPEED_RE = "Fan target RPM set." TEMP_SENSOR_TEMP_RE = "Reading temperature...([0-9]*)" # : TEMP_SENSOR_INFO_RE = "(\d+):\s+(\d+)\s+([a-zA-Z_0-9]+)" TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on" # For battery, check we can see a non-zero capacity value. BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh" LIGHTBAR_RE = "^ 05\s+3f\s+3f$" def __init__(self): """Constructor.""" super(EC, self).__init__() self._temperature_dict = None def hello(self, **kwargs): """Test EC hello command. @param kwargs: optional params passed to utils.system_output @returns True if success False otherwise. """ response = self.ec_command('hello', **kwargs) return (re.search(self.HELLO_RE, response) is not None) def auto_fan_ctrl(self): """Turns auto fan ctrl on. @returns True if success False otherwise. """ response = self.ec_command('autofanctrl') logging.info('Turned on auto fan control.') return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None) def get_fanspeed(self): """Gets fanspeed. @raises error.TestError if regexp fails to match. @returns integer of fan speed RPM. """ response = self.ec_command('pwmgetfanrpm') match = re.search(self.GET_FANSPEED_RE, response) if not match: raise error.TestError('Unable to read fan speed') rpm = int(match.group(1)) logging.info('Fan speed: %d', rpm) return rpm def set_fanspeed(self, rpm): """Sets fan speed. @param rpm: integer of fan speed RPM to set @returns True if success False otherwise. """ response = self.ec_command('pwmsetfanrpm %d' % rpm) logging.info('Set fan speed: %d', rpm) return (re.search(self.SET_FANSPEED_RE, response) is not None) def _get_temperature_dict(self): """Read EC temperature name and idx into a dict. @returns dict where key=, value = """ # The sensor (name, idx) mapping does not change. if self._temperature_dict: return self._temperature_dict temperature_dict = {} response = self.ec_command('tempsinfo all') for rline in response.split('\n'): match = re.search(self.TEMP_SENSOR_INFO_RE, rline) if match: temperature_dict[match.group(3)] = int(match.group(1)) self._temperature_dict = temperature_dict return temperature_dict def get_temperature(self, idx=None, name=None): """Gets temperature from idx sensor. Reads temperature either directly if idx is provided or by discovering idx using name. @param idx: integer of temp sensor to read. Default=None @param name: string of temp sensor to read. Default=None. For example: Battery, Ambient, Charger, DRAM, eMMC, Gyro @raises ECError if fails to find idx of name. @raises error.TestError if fails to read sensor or fails to identify sensor to read from idx & name param. @returns integer of temperature reading in degrees Kelvin. """ if idx is None: temperature_dict = self._get_temperature_dict() if name in temperature_dict: idx = temperature_dict[name] else: raise ECError('Finding temp idx for name %s' % name) response = self.ec_command('temps %d' % idx) match = re.search(self.TEMP_SENSOR_TEMP_RE, response) if not match: raise error.TestError('Reading temperature idx %d' % idx) return int(match.group(1)) def get_battery(self): """Get battery presence (design capacity found). @returns True if success False otherwise. """ try: response = self.ec_command('battery') except error.CmdError: raise ECError('calling EC battery command') return (re.search(self.BATTERY_RE, response) is not None) def get_lightbar(self): """Test lightbar. @returns True if success False otherwise. """ self.ec_command('lightbar on') self.ec_command('lightbar init') self.ec_command('lightbar 4 255 255 255') response = self.ec_command('lightbar') self.ec_command('lightbar off') return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None) def key_press(self, key): """Emit key down and up signal of the keyboard. @param key: name of a key defined in KEYMATRIX. """ self.key_down(key) self.key_up(key) def _key_action(self, key, action_type): if not key in KEYMATRIX: raise error.TestError('Unknown key: ' + key) row, col = KEYMATRIX[key] self.ec_command('kbpress %d %d %d' % (row, col, action_type)) def key_down(self, key): """Emit key down signal of the keyboard. @param key: name of a key defined in KEYMATRIX. """ self._key_action(key, 1) def key_up(self, key): """Emit key up signal of the keyboard. @param key: name of a key defined in KEYMATRIX. """ self._key_action(key, 0) class EC_USBPD_Port(EC_Common): """Class for CrOS embedded controller for USB-PD Port. Public attributes: index: integer of USB type-C port index. Public Methods: is_dfp: Determine if data role is Downstream Facing Port (DFP). is_amode_supported: Check if alternate mode is supported by port. is_amode_entered: Check if alternate mode is entered. set_amode: Set an alternate mode. Private attributes: _port: integer of USB type-C port id. _port_info: holds usbpd protocol info. _amodes: holds alternate mode info. Private methods: _invalidate_port_data: Remove port data to force re-eval. _get_port_info: Get USB-PD port info. _get_amodes: parse and return port's svid info. """ def __init__(self, index): """Constructor. @param index: integer of USB type-C port index. """ self.index = index # TODO(crosbug.com/p/38133) target= only works for samus super(EC_USBPD_Port, self).__init__(target='cros_pd') # Interrogate port at instantiation. Use invalidate to force re-eval. self._port_info = self._get_port_info() self._amodes = self._get_amodes() def _invalidate_port_data(self): """Remove port data to force re-eval.""" self._port_info = None self._amodes = None def _get_port_info(self): """Get USB-PD port info. ectool command usbpd provides the following information about the port: - Enabled/Disabled - Power & Data Role - Polarity - Protocol State At time of authoring it looks like: Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY @raises error.TestError if ... port info not parseable. @returns dictionary for with keyval pairs: enabled: True | False | None power_role: sink | source | None data_role: UFP | DFP | None is_reversed: True | False | None state: various strings | None """ PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \ 'Polarity:CC(\d+)\s+State:(\w+)' match = re.search(PORT_INFO_RE, self.ec_command("usbpd %s" % (self.index))) if not match or int(match.group(1)) != self.index: error.TestError('Unable to determine port %d info' % self.index) pinfo = dict(enabled=None, power_role=None, data_role=None, is_reversed=None, state=None) pinfo['enabled'] = match.group(2) == 'enabled' pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source' pinfo['data_role'] = match.group(4) pinfo['is_reversed'] = True if match.group(5) == '2' else False pinfo['state'] = match.group(6) logging.debug('port_info = %s', pinfo) return pinfo def _get_amodes(self): """Parse alternate modes from pdgetmode. Looks like ... *SVID:0xff01 *0x00000485 0x00000000 ... SVID:0x18d1 0x00000001 0x00000000 ... @returns dictionary of format: : {active: True|False, configs: , opos:} where: : USB-IF Standard or vendor id as hex string (i.e. 0xff01) : list of uint32_t configs : integer of active object position. Note, this is the config list index + 1 """ SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)' svids = dict() cmd = 'pdgetmode %d' % self.index for line in self.ec_command(cmd, ignore_status=True).split('\n'): if line.strip() == '': continue logging.debug('pdgetmode line: %s', line) match = re.search(SVID_RE, line) if not match: logging.warning("Unable to parse SVID line %s", line) continue active = match.group(1) == '*' svid = match.group(2) configs_str = match.group(3) configs = list() opos = None for i,config in enumerate(configs_str.split(), 1): if config.startswith('*'): opos = i config = config[1:] config = int(config, 16) # ignore unpopulated configs if config == 0: continue configs.append(config) svids[svid] = dict(active=active, configs=configs, opos=opos) logging.debug("Port %d svids = %s", self.index, svids) return svids def is_dfp(self): """Determine if data role is Downstream Facing Port (DFP). @returns True if DFP False otherwise. """ if self._port_info is None: self._port_info = self._get_port_info() return self._port_info['data_role'] == 'DFP' def is_amode_supported(self, svid): """Check if alternate mode is supported by port partner. @param svid: alternate mode SVID hexstring (i.e. 0xff01) """ if self._amodes is None: self._amodes = self._get_amodes() if svid in self._amodes.keys(): return True return False def is_amode_entered(self, svid, opos): """Check if alternate mode is entered. @param svid: alternate mode SVID hexstring (i.e. 0xff01). @param opos: object position of config to act on. @returns True if entered False otherwise """ if self._amodes is None: self._amodes = self._get_amodes() if not self.is_amode_supported(svid): return False if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos: return True return False def set_amode(self, svid, opos, enter, delay_secs=2): """Set alternate mode. @param svid: alternate mode SVID hexstring (i.e. 0xff01). @param opos: object position of config to act on. @param enter: Boolean of whether to enter mode. @raises error.TestError if ... mode not supported. opos is > number of configs. @returns True if successful False otherwise """ if self._amodes is None: self._amodes = self._get_amodes() if svid not in self._amodes.keys(): raise error.TestError("SVID %s not supported", svid) if opos > len(self._amodes[svid]['configs']): raise error.TestError("opos > available configs") cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos, 1 if enter else 0) self.ec_command(cmd, ignore_status=True) self._invalidate_port_data() # allow some time for mode entry/exit time.sleep(delay_secs) return self.is_amode_entered(svid, opos) == enter def get_flash_info(self): mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*' mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*' flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major', 'dev_minor', 'rw_hash', 'image_status']) cmd = 'infopddev %d' % self.index tries = 3 while (tries): res = self.ec_command(cmd, ignore_status=True) if not 'has no discovered device' in res: break tries -= 1 time.sleep(1) for ln in res.split('\n'): mat1 = re.match(mat1_re, ln) if mat1: flash_dict['ptype'] = int(mat1.group(1)) flash_dict['vid'] = mat1.group(2) flash_dict['pid'] = mat1.group(3) continue mat2 = re.match(mat2_re, ln) if mat2: flash_dict['dev_major'] = int(mat2.group(1)) flash_dict['dev_minor'] = int(mat2.group(2)) flash_dict['rw_hash'] = mat2.group(3) flash_dict['image_status'] = mat2.group(4) break return flash_dict class EC_USBPD(EC_Common): """Class for CrOS embedded controller for USB-PD. Public attributes: ports: list EC_USBPD_Port instances Public Methods: get_num_ports: get number of USB-PD ports device has. Private attributes: _num_ports: integer number of USB-PD ports device has. """ def __init__(self, num_ports=None): """Constructor. @param num_ports: total number of USB-PD ports on device. This is an override. If left 'None' will try to determine. """ self._num_ports = num_ports self.ports = list() # TODO(crosbug.com/p/38133) target= only works for samus super(EC_USBPD, self).__init__(target='cros_pd') if (self.get_num_ports() == 0): raise error.TestNAError("Device has no USB-PD ports") for i in xrange(self._num_ports): self.ports.append(EC_USBPD_Port(i)) def get_num_ports(self): """Determine the number of ports for device. Uses ectool's usbpdpower command which in turn makes host command call to EC_CMD_USB_PD_PORTS to determine the number of ports. TODO(tbroch) May want to consider adding separate ectool command to surface the number of ports directly instead of via usbpdpower @returns number of ports. """ if (self._num_ports is not None): return self._num_ports self._num_ports = len(self.ec_command("usbpdpower").split(b'\n')) return self._num_ports