1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import ast, logging, re, time
6
7from autotest_lib.client.common_lib import error
8from autotest_lib.client.cros import ec
9
# Host event bit masks, mirroring the definitions in:
#     ec/include/ec_commands.h
# Every mask below has exactly one bit set; the shift amount is the bit index.
HOSTEVENT_LID_CLOSED        = 1 << 0
HOSTEVENT_LID_OPEN          = 1 << 1
HOSTEVENT_POWER_BUTTON      = 1 << 2
HOSTEVENT_AC_CONNECTED      = 1 << 3
HOSTEVENT_AC_DISCONNECTED   = 1 << 4
HOSTEVENT_BATTERY_LOW       = 1 << 5
HOSTEVENT_BATTERY_CRITICAL  = 1 << 6
HOSTEVENT_BATTERY           = 1 << 7
HOSTEVENT_THERMAL_THRESHOLD = 1 << 8
HOSTEVENT_THERMAL_OVERLOAD  = 1 << 9
HOSTEVENT_THERMAL           = 1 << 10
HOSTEVENT_USB_CHARGER       = 1 << 11
HOSTEVENT_KEY_PRESSED       = 1 << 12
HOSTEVENT_INTERFACE_READY   = 1 << 13
# The keyboard recovery key combination was pressed.
HOSTEVENT_KEYBOARD_RECOVERY = 1 << 14
# The system shut down because of thermal overload.
HOSTEVENT_THERMAL_SHUTDOWN  = 1 << 15
# The system shut down because the battery level was too low.
HOSTEVENT_BATTERY_SHUTDOWN  = 1 << 16
HOSTEVENT_INVALID           = 1 << 31

# Delay, in seconds (it is passed to time.sleep), observed after sending
# keypress commands.
KEYPRESS_RECOVERY_TIME = 0.5
36
37
class ChromeConsole(object):
    """Manages control of a Chrome console.

    We control the Chrome console via the UART of a Servo board. Chrome console
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """

    # Suffixes appended to the console name to build the servo control names.
    CMD = "_cmd"
    REGEXP = "_regexp"
    MULTICMD = "_multicmd"

    def __init__(self, servo, name):
        """Initialize and keep the servo object.

        Args:
          servo: A Servo object.
          name: The console name; it prefixes the servo controls used to send
            commands (<name>_cmd, <name>_multicmd) and to set the response
            regexp (<name>_regexp).
        """
        self.name = name
        self.uart_cmd = self.name + self.CMD
        self.uart_regexp = self.name + self.REGEXP
        self.uart_multicmd = self.name + self.MULTICMD

        self._servo = servo
        # Last value successfully written to the regexp control, used to skip
        # redundant writes.
        self._cached_uart_regexp = None


    def set_uart_regexp(self, regexp):
        """Write the regexp control, skipping the write if it is unchanged.

        Args:
          regexp: The value to write to the <name>_regexp servo control.
        """
        if self._cached_uart_regexp == regexp:
            return
        # Drop the cached value before writing and record the new one only
        # after the servo accepted it. Otherwise a failed write would leave a
        # cache entry that makes later calls with the same value silently skip
        # the write.
        self._cached_uart_regexp = None
        self._servo.set(self.uart_regexp, regexp)
        self._cached_uart_regexp = regexp


    def send_command(self, commands):
        """Send command through UART.

        The regexp control is set to the string 'None' first and no response
        is read back.

        Args:
          commands: The commands to send, either a list or a string. A list is
            joined with ';' and sent through the multi-command control; if the
            servo reports that control as unknown, the commands are sent one
            by one through the single-command control instead.

        Raises:
          error.TestFail: Re-raised when sending a list fails for any reason
            other than the multi-command control being unknown.
        """
        self.set_uart_regexp('None')
        if isinstance(commands, list):
            try:
                self._servo.set_nocheck(self.uart_multicmd, ';'.join(commands))
            except error.TestFail as e:
                if 'No control named' in str(e):
                    logging.warning(
                            'The servod is too old that uart_multicmd '
                            'not supported. Use uart_cmd instead.')
                    for command in commands:
                        self._servo.set_nocheck(self.uart_cmd, command)
                else:
                    raise
        else:
            self._servo.set_nocheck(self.uart_cmd, commands)


    def send_command_get_output(self, command, regexp_list):
        r"""Send command through UART and wait for response.

        This function waits for response message matching regular expressions.

        Args:
          command: The command sent.
          regexp_list: List of regular expressions used to match response
            message. Note, list must be ordered.

        Returns:
          List of tuples, each of which contains the entire matched string and
          all the subgroups of the match. None if not matched.
          For example:
            response of the given command:
              High temp: 37.2
              Low temp: 36.4
            regexp_list:
              ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
            returns:
              [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]

        Raises:
          error.TestError: An error when the given regexp_list is not valid.
        """
        if not isinstance(regexp_list, list):
            raise error.TestError('Argument regexp_list is not a list: %s' %
                                  str(regexp_list))

        # The regexp control takes the string form of the whole list.
        self.set_uart_regexp(str(regexp_list))
        self._servo.set_nocheck(self.uart_cmd, command)
        # The servo hands the matches back as the text of a Python literal;
        # literal_eval parses it without executing arbitrary code.
        return ast.literal_eval(self._servo.get(self.uart_cmd))
131
132
class ChromeEC(ChromeConsole):
    """Console wrapper for a Chrome EC.

    The EC console is driven through the UART controls of a Servo board. The
    methods here wrap individual EC console commands: simulated key events,
    reboot, flash write protect, host events and console channel selection.
    """

    def __init__(self, servo, name="ec_uart"):
        """Bind this object to a servo and an EC console name.

        Args:
          servo: A Servo object.
          name: The console name, "ec_uart" unless overridden.
        """
        super(ChromeEC, self).__init__(servo, name)


    def _matrix_args(self, keyname):
        """Look up the two numeric arguments of 'kbpress' for a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.

        Returns:
          A 2-tuple holding element 1 and then element 0 of the KEYMATRIX
          entry, which is the order the 'kbpress' command is given them in.
        """
        return (ec.KEYMATRIX[keyname][1], ec.KEYMATRIX[keyname][0])


    def key_down(self, keyname):
        """Simulate pressing a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        press = 'kbpress %d %d 1' % self._matrix_args(keyname)
        self.send_command(press)


    def key_up(self, keyname):
        """Simulate releasing a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        release = 'kbpress %d %d 0' % self._matrix_args(keyname)
        self.send_command(release)


    def key_press(self, keyname):
        """Press and then release a key.

        Both events are sent as one command list, followed by a pause of
        KEYPRESS_RECOVERY_TIME seconds.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        matrix_args = self._matrix_args(keyname)
        press_then_release = [
                'kbpress %d %d 1' % matrix_args,
                'kbpress %d %d 0' % matrix_args,
        ]
        self.send_command(press_then_release)
        # Leave the EC console some recovery time between commands rather
        # than sending them as fast as we can.
        time.sleep(KEYPRESS_RECOVERY_TIME)


    def send_key_string_raw(self, string):
        """Send key strokes consisting of only characters.

        Args:
          string: Raw string; each character is pressed as its own key.
        """
        for char in string:
            self.key_press(char)


    def send_key_string(self, string):
        """Send key strokes including special keys.

        Args:
          string: Character string including special keys. An example
            is "this is an<tab>example<enter>". A special key is written
            between '<' and '>' and is pressed as a single key, brackets
            included in its name.
        """
        token_pattern = "(<[^>]+>)|([^<>]+)"
        for token in re.finditer(token_pattern, string):
            special, plain = token.group(1), token.group(2)
            if plain is None:
                # Only the bracketed alternative matched.
                self.key_press(special)
                continue
            self.send_key_string_raw(plain)


    def reboot(self, flags=''):
        """Reboot EC with given flags.

        Args:
          flags: Optional, a space-separated string of flags passed to the
                 reboot command, including:
                   default: EC soft reboot;
                   'hard': EC hard/cold reboot;
                   'ap-off': Leave AP off after EC reboot (by default, EC turns
                             AP on after reboot if lid is open).

        Raises:
          error.TestError: If the string of flags is invalid.
        """
        accepted = ('hard', 'ap-off')
        rejected = [word for word in flags.split() if word not in accepted]
        if rejected:
            # Report the first offending flag only.
            raise error.TestError(
                    'The flag %s of EC reboot command is invalid.' %
                    rejected[0])
        self.send_command("reboot %s" % flags)


    def set_flash_write_protect(self, enable):
        """Set the software write protect of EC flash.

        Args:
          enable: True to enable write protect, False to disable.
        """
        command = "flashwp enable" if enable else "flashwp disable"
        self.send_command(command)


    def set_hostevent(self, codes):
        """Set the EC hostevent codes.

        Args:
          codes: Hostevent codes, HOSTEVENT_*
        """
        command = "hostevent set %#x" % codes
        self.send_command(command)
        # Give the EC enough time to process the input and set the flag.
        # See chromium:371631 for details.
        # FIXME: Stop importing time module if this hack becomes obsolete.
        time.sleep(1)


    def enable_console_channel(self, channel):
        """Find console channel mask and enable that channel only

        Args:
          channel: Console channel name.
        """
        # The 'chan' command returns a list of console channels,
        # their channel masks and channel numbers.
        pattern = r'(\d+)\s+([\w]+)\s+\*?\s+{0}'.format(channel)
        matches = self.send_command_get_output('chan', [pattern])
        # Index 0 of a match is the whole matched text, so index 2 is the
        # second group: the channel mask.
        mask = matches[0][2]
        # Prefix the mask with 0x so it is a proper hex input value, which
        # sets the console to output the desired channel only.
        self.send_command('chan 0x' + mask)
266
267
class ChromeUSBPD(ChromeEC):
    """Manages control of a Chrome USBPD.

    The USBPD console is driven through the "usbpd_uart" controls of a Servo
    board. Everything else is inherited from ChromeEC.
    """

    def __init__(self, servo):
        """Bind this object to the USBPD console of a servo.

        Args:
          servo: A Servo object.
        """
        console_name = "usbpd_uart"
        super(ChromeUSBPD, self).__init__(servo, console_name)
278
279
class ChromeCr50(ChromeConsole):
    """Manages control of a Chrome Cr50.

    We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """
    # Regexp capturing the counter in the output of the 'idle' command. It is
    # a raw string so that '\d' is a regexp escape rather than an invalid
    # string-literal escape sequence.
    IDLE_COUNT = r'count: (\d+)'

    def __init__(self, servo):
        """Bind this object to the "cr50_console" console of a servo.

        Args:
          servo: A Servo object.
        """
        super(ChromeCr50, self).__init__(servo, "cr50_console")


    def get_deep_sleep_count(self):
        """Get the deep sleep count from the idle task

        Returns:
          The count captured from the output of the 'idle' command, as an int.
        """
        result = self.send_command_get_output('idle', [self.IDLE_COUNT])
        # result[0] is (whole match, captured count).
        return int(result[0][1])


    def clear_deep_sleep_count(self):
        """Clear the deep sleep count

        Raises:
          error.TestFail: If the count reported in response to 'idle c' is
            not zero.
        """
        result = self.send_command_get_output('idle c', [self.IDLE_COUNT])
        if int(result[0][1]):
            raise error.TestFail("Could not clear deep sleep count")


    def ccd_disable(self):
        """Change the values of the CC lines to disable CCD"""
        self._servo.set_nocheck('servo_v4_ccd_mode', 'disconnect')
        # TODO: Add a better way to wait until usb is disconnected
        time.sleep(3)


    def ccd_enable(self):
        """Reenable CCD and reset servo interfaces"""
        self._servo.set_nocheck('servo_v4_ccd_mode', 'ccd')
        self._servo.set('sbu_mux_enable', 'on')
        self._servo.set_nocheck('power_state', 'ccd_reset')
        # Fixed pause after requesting the reset.
        time.sleep(2)
319