1#!/usr/bin/env python3
2# Copyright 2019 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""This script flashes and runs unit tests on stm32f429i-disc1 boards."""
16
17import argparse
18import logging
19import os
20import subprocess
21import sys
22import threading
23from typing import List
24
25import coloredlogs  # type: ignore
26import serial  # type: ignore
27from stm32f429i_disc1_utils import stm32f429i_detector
28
# Directory that holds this module; non-python resources are looked up
# relative to it.
_DIR = os.path.dirname(__file__)

# Default openocd configuration file, expected in the same directory as this
# module.
_OPENOCD_CONFIG = os.path.join(_DIR, 'openocd_stm32f4xx.cfg')

# Directory of the scripts provided by openocd. When
# PW_PIGWEED_CIPD_INSTALL_DIR is not set, this becomes the relative path
# share/openocd/scripts.
_OPENOCD_SCRIPTS_DIR = os.path.join(
    os.environ.get('PW_PIGWEED_CIPD_INSTALL_DIR', ''),
    'share',
    'openocd',
    'scripts',
)

# Logger shared by every function in this script.
_LOG = logging.getLogger('unit_test_runner')

# Markers searched for in the captured serial output to decide whether a test
# run started, finished, and passed. They must match what the
# simple_printing_event_handler prints; update them if its output format
# changes.
_TESTS_STARTING_STRING = b'[==========] Running all tests.'
_TESTS_DONE_STRING = b'[==========] Done running all tests.'
_TEST_FAILURE_STRING = b'[  FAILED  ]'

# Seconds to wait for the first byte of test output. The wait has to cover the
# delay while the device is being flashed.
# NOTE(review): this equals the default --test-timeout (5.0), so it is only
# longer than the user-configurable timeout when a smaller value is passed.
_FLASH_TIMEOUT = 5.0
53
54
class TestingFailure(Exception):
    """Signals that a step of the flash/run/verify test flow failed."""
57
58
class DeviceNotFound(Exception):
    """Signals that no device could be found to connect to."""
61
62
def parse_args():
    """Defines this script's command-line interface and parses sys.argv.

    Returns the argparse.Namespace holding the parsed options.
    """

    arg_parser = argparse.ArgumentParser(description=__doc__)
    add_arg = arg_parser.add_argument

    # The only positional argument: the test image to flash and run.
    add_arg('binary', help='The target test binary to run')

    # Options that control how the device is flashed.
    add_arg('--openocd-config',
            default=_OPENOCD_CONFIG,
            help='Path to openocd configuration file')
    add_arg('--stlink-serial',
            default=None,
            help=('The serial number of the stlink to use when flashing the '
                  'target device'))

    # Options that control how test output is read back over serial.
    add_arg('--port',
            default=None,
            help=('The name of the serial port to connect to when running '
                  'tests'))
    add_arg('--baud',
            type=int,
            default=115200,
            help=('Target baud rate to use for serial communication with '
                  'target device'))
    add_arg('--test-timeout',
            type=float,
            default=5.0,
            help=('Maximum communication delay in seconds before a test is '
                  'considered unresponsive and aborted'))

    add_arg('--verbose',
            '-v',
            dest='verbose',
            action="store_true",
            help='Output additional logs as the script runs')

    return arg_parser.parse_args()
96
97
def log_subprocess_output(level, output):
    """Emits each line of captured output as its own log record.

    Args:
      level: logging level applied to every emitted record.
      output: captured bytes; decoded as UTF-8, with undecodable bytes
        replaced instead of raising.
    """

    decoded = output.decode('utf-8', errors='replace')
    for entry in decoded.splitlines():
        _LOG.log(level, entry)
104
105
def reset_device(openocd_config, stlink_serial):
    """Resets the attached device by running openocd with 'reset run'.

    Args:
      openocd_config: path to the openocd configuration file to use.
      stlink_serial: serial number of the stlink to target. When falsy,
        PW_STLINK_SERIAL is not set for the openocd process.

    Raises:
      TestingFailure: openocd exited with a non-zero return code.
    """

    # OPENOCD_PATH selects a specific openocd; otherwise the bare name
    # 'openocd' is used.
    openocd = os.getenv('OPENOCD_PATH', 'openocd')

    command = [openocd, '-s', _OPENOCD_SCRIPTS_DIR, '-f', openocd_config]
    for openocd_step in ('init', 'reset run', 'exit'):
        command += ['-c', openocd_step]

    _LOG.debug('Resetting device')

    # Start from the current environment. The GDB port is disabled to support
    # multi-device testing.
    child_env = dict(os.environ, PW_GDB_PORT='disabled')
    if stlink_serial:
        child_env['PW_STLINK_SERIAL'] = stlink_serial

    # stderr is folded into stdout so a single stream holds all the output.
    completed = subprocess.run(command,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               env=child_env)
    if completed.returncode != 0:
        log_subprocess_output(logging.ERROR, completed.stdout)
        raise TestingFailure('Failed to reset target device')

    log_subprocess_output(logging.DEBUG, completed.stdout)
    _LOG.debug('Successfully reset device')
136
137
def read_serial(port, baud_rate, test_timeout) -> bytes:
    """Captures test output from a serial port until the device goes quiet.

    Waits up to _FLASH_TIMEOUT seconds for the first byte, then keeps reading
    lines until a read times out after test_timeout seconds. Once the
    end-of-tests banner has been seen, the timeout is shortened so the
    remaining summary lines are collected without a long trailing wait.

    Args:
      port: name of the serial port to read from.
      baud_rate: baud rate used for the serial connection.
      test_timeout: seconds of silence after which reading stops.

    Returns:
      The captured data with carriage returns removed. If the start-of-tests
      banner is present, everything before its last occurrence is dropped.

    Raises:
      TestingFailure: the port is not open, or no byte arrived within
        _FLASH_TIMEOUT seconds.
    """

    serial_data = bytearray()

    # The context manager closes the port on every exit path, including the
    # failures raised below.
    with serial.Serial(baudrate=baud_rate, port=port,
                       timeout=_FLASH_TIMEOUT) as device:
        if not device.is_open:
            raise TestingFailure('Failed to open device')

        # Discard stale input so only output that arrives from now on is
        # captured.
        device.reset_input_buffer()

        # Block and wait for the first byte.
        serial_data += device.read()
        if not serial_data:
            raise TestingFailure('Device not producing output')

        device.timeout = test_timeout

        # Read with a reasonable timeout until we stop getting characters.
        done_seen = False
        while True:
            bytes_read = device.readline()
            if not bytes_read:
                break
            serial_data += bytes_read
            if not done_seen and _TESTS_DONE_STRING in serial_data:
                # Set to much more aggressive timeout since the last one or
                # two lines should print out immediately. (one line if all
                # fails or all passes, two lines if mixed.)
                done_seen = True
                device.timeout = 0.01

    # Remove carriage returns.
    cleaned = serial_data.replace(b'\r', b'')

    # Try to trim captured results to only contain most recent test run.
    test_start_index = cleaned.rfind(_TESTS_STARTING_STRING)
    if test_start_index != -1:
        cleaned = cleaned[test_start_index:]

    # Convert to an immutable bytes object to match the declared return type.
    return bytes(cleaned)
180
181
def flash_device(binary, openocd_config, stlink_serial):
    """Programs a binary onto a connected device using openocd.

    Args:
      binary: path of the image passed to openocd's 'program' command.
      openocd_config: path to the openocd configuration file to use.
      stlink_serial: serial number of the stlink to target. When falsy,
        PW_STLINK_SERIAL is not set for the openocd process.

    Raises:
      TestingFailure: openocd exited with a non-zero return code.
    """

    # OPENOCD_PATH selects a specific openocd; otherwise the bare name
    # 'openocd' is used.
    openocd = os.getenv('OPENOCD_PATH', 'openocd')

    # A single openocd command: program the image, reset the target, exit.
    program_step = ' '.join(('program', binary, 'reset', 'exit'))
    command = [openocd, '-s', _OPENOCD_SCRIPTS_DIR, '-f', openocd_config]
    command += ['-c', program_step]

    _LOG.info('Flashing firmware to device')

    # Start from the current environment. The GDB port is disabled to support
    # multi-device testing.
    child_env = dict(os.environ, PW_GDB_PORT='disabled')
    if stlink_serial:
        child_env['PW_STLINK_SERIAL'] = stlink_serial

    # stderr is folded into stdout so a single stream holds all the output.
    completed = subprocess.run(command,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               env=child_env)
    if completed.returncode != 0:
        log_subprocess_output(logging.ERROR, completed.stdout)
        raise TestingFailure('Failed to flash target device')

    log_subprocess_output(logging.DEBUG, completed.stdout)
    _LOG.debug('Successfully flashed firmware to device')
213
214
def handle_test_results(test_output):
    """Decides from captured output whether the test run passed.

    Args:
      test_output: bytes captured from the device during the test run.

    Raises:
      TestingFailure: the start banner is missing, the done banner is
        missing, or a failure marker is present in the output.
    """

    if _TESTS_STARTING_STRING not in test_output:
        raise TestingFailure('Failed to find test start')

    # Work out why the run failed, if it did. An incomplete run is reported
    # ahead of individual test failures.
    failure_reason = None
    if _TESTS_DONE_STRING not in test_output:
        failure_reason = 'Tests did not complete'
    elif _TEST_FAILURE_STRING in test_output:
        failure_reason = 'Test suite had one or more failures'

    if failure_reason is not None:
        # On failure the device output is logged at INFO level.
        log_subprocess_output(logging.INFO, test_output)
        raise TestingFailure(failure_reason)

    log_subprocess_output(logging.DEBUG, test_output)
    _LOG.info('Test passed!')
232
233
def _threaded_test_reader(dest, port, baud_rate, test_timeout):
    """Captures serial test output and stores it in the mutable "dest".

    A thread target cannot return a value directly, so the data returned by
    read_serial() is appended to the caller-owned list "dest" instead.
    """
    captured = read_serial(port, baud_rate, test_timeout)
    dest.append(captured)
237
238
def run_device_test(binary,
                    test_timeout,
                    openocd_config,
                    baud,
                    stlink_serial=None,
                    port=None) -> bool:
    """Flashes, runs, and checks an on-device test binary.

    Args:
      binary: path of the test image to flash.
      test_timeout: seconds of serial silence before reading stops.
      openocd_config: path to the openocd configuration file to use.
      baud: baud rate used for the serial connection.
      stlink_serial: serial number of the stlink to flash through. If both
        this and port are None, the first detected board is used.
      port: name of the serial port to read test output from.

    Returns:
      True if the test passed. False if flashing failed, no output could be
      captured, or the output shows an incomplete or failing run.

    Raises:
      DeviceNotFound: auto-detection was needed but found no board.
    """

    if stlink_serial is None and port is None:
        _LOG.debug('Attempting to automatically detect dev board')
        boards = stm32f429i_detector.detect_boards()
        if not boards:
            error = 'Could not find an attached device'
            _LOG.error(error)
            raise DeviceNotFound(error)
        stlink_serial = boards[0].serial_number
        port = boards[0].dev_name

    _LOG.debug('Launching test binary %s', binary)
    try:
        # Begin capturing test output via another thread BEFORE flashing the
        # device since the test will automatically run after the image is
        # flashed. This reduces flake since there isn't a need to time a reset
        # correctly relative to the start of capturing device output.
        result: List[bytes] = []
        threaded_reader_args = (result, port, baud, test_timeout)
        read_thread = threading.Thread(target=_threaded_test_reader,
                                       args=threaded_reader_args)
        read_thread.start()
        _LOG.info('Running test')
        try:
            flash_device(binary, openocd_config, stlink_serial)
        finally:
            # Wait for the reader even when flashing fails, so no reader
            # thread is still running after this function returns.
            read_thread.join()

        # An exception raised inside the reader thread does not propagate
        # here; it only leaves "result" empty. That must count as a failure,
        # not as a silent pass.
        if not result:
            raise TestingFailure('Failed to capture test output from device')
        handle_test_results(result[0])
    except TestingFailure as err:
        _LOG.error(err)
        return False

    return True
281
282
def main():
    """Configures logging, runs the device test, and exits with its status.

    The process exits with code 0 when the test passes and 1 otherwise.
    """
    args = parse_args()
    verbose = args.verbose

    # Prefer pw_cli's logging setup; fall back to coloredlogs when pw_cli is
    # not importable.
    try:
        import pw_cli.log  # pylint: disable=import-outside-toplevel
        pw_cli.log.install(level=logging.DEBUG if verbose else logging.INFO)
    except ImportError:
        fallback_styles = {
            'debug': {'color': 244},
            'error': {'color': 'red'},
        }
        coloredlogs.install(level='DEBUG' if verbose else 'INFO',
                            level_styles=fallback_styles,
                            fmt='%(asctime)s %(levelname)s | %(message)s')

    passed = run_device_test(binary=args.binary,
                             test_timeout=args.test_timeout,
                             openocd_config=args.openocd_config,
                             baud=args.baud,
                             stlink_serial=args.stlink_serial,
                             port=args.port)
    sys.exit(0 if passed else 1)
309
310
# Run the test flow only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
313