#!/usr/bin/env python3
# Copyright 2019 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""This script flashes and runs unit tests on stm32f429i-disc1 boards."""

import argparse
import logging
import os
import subprocess
import sys
import threading
from typing import List, Optional, Sequence

import coloredlogs  # type: ignore
import serial  # type: ignore
from stm32f429i_disc1_utils import stm32f429i_detector

# Path used to access non-python resources in this python module.
_DIR = os.path.dirname(__file__)

# Path to default openocd configuration file.
_OPENOCD_CONFIG = os.path.join(_DIR, 'openocd_stm32f4xx.cfg')

# Path to scripts provided by openocd.
_OPENOCD_SCRIPTS_DIR = os.path.join(
    os.getenv('PW_PIGWEED_CIPD_INSTALL_DIR', ''), 'share', 'openocd',
    'scripts')

_LOG = logging.getLogger('unit_test_runner')

# Verification of test pass/failure depends on these strings. If the formatting
# or output of the simple_printing_event_handler changes, this may need to be
# updated.
_TESTS_STARTING_STRING = b'[==========] Running all tests.'
_TESTS_DONE_STRING = b'[==========] Done running all tests.'
_TEST_FAILURE_STRING = b'[  FAILED  ]'

# How long to wait (in seconds) for the first byte of a test to be emitted.
# This is kept separate from the user-configurable test timeout because there
# is a delay while the device is flashed before any output can appear.
_FLASH_TIMEOUT = 5.0


class TestingFailure(Exception):
    """A simple exception to be raised when a testing step fails."""


class DeviceNotFound(Exception):
    """A simple exception to be raised when unable to connect to a device."""


def parse_args() -> argparse.Namespace:
    """Parses command-line arguments."""

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('binary', help='The target test binary to run')
    parser.add_argument('--openocd-config',
                        default=_OPENOCD_CONFIG,
                        help='Path to openocd configuration file')
    parser.add_argument('--stlink-serial',
                        default=None,
                        help='The serial number of the stlink to use when '
                        'flashing the target device')
    parser.add_argument('--port',
                        default=None,
                        help='The name of the serial port to connect to when '
                        'running tests')
    parser.add_argument('--baud',
                        type=int,
                        default=115200,
                        help='Target baud rate to use for serial communication'
                        ' with target device')
    parser.add_argument('--test-timeout',
                        type=float,
                        default=5.0,
                        help='Maximum communication delay in seconds before a '
                        'test is considered unresponsive and aborted')
    parser.add_argument('--verbose',
                        '-v',
                        dest='verbose',
                        action="store_true",
                        help='Output additional logs as the script runs')

    return parser.parse_args()


def log_subprocess_output(level: int, output: bytes) -> None:
    """Logs subprocess output line-by-line.

    Args:
      level: logging level (e.g. logging.DEBUG) used for every line.
      output: raw captured output; decoded as UTF-8 with undecodable bytes
        replaced rather than raising.
    """

    for line in output.decode('utf-8', errors='replace').splitlines():
        _LOG.log(level, line)


def _run_openocd(openocd_args: Sequence[str], openocd_config: str,
                 stlink_serial: Optional[str], failure_message: str) -> None:
    """Runs openocd with the given trailing arguments and logs its output.

    Args:
      openocd_args: arguments appended after the script-dir and config flags.
      openocd_config: path to the openocd configuration file.
      stlink_serial: if truthy, exported as PW_STLINK_SERIAL for the child
        process.
      failure_message: message of the TestingFailure raised on failure.

    Raises:
      TestingFailure: openocd could not be started or exited non-zero.
    """

    # Name/path of openocd; OPENOCD_PATH overrides the default lookup.
    flash_tool = os.getenv('OPENOCD_PATH', 'openocd')
    cmd = [
        flash_tool, '-s', _OPENOCD_SCRIPTS_DIR, '-f', openocd_config,
        *openocd_args
    ]

    env = os.environ.copy()
    if stlink_serial:
        env['PW_STLINK_SERIAL'] = stlink_serial

    # Disable GDB port to support multi-device testing.
    env['PW_GDB_PORT'] = 'disabled'

    try:
        process = subprocess.run(cmd,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 env=env)
    except OSError as err:
        # E.g. openocd is not installed / not executable. Report this as a
        # regular testing failure instead of crashing with a traceback.
        _LOG.error('Unable to run %s: %s', flash_tool, err)
        raise TestingFailure(failure_message) from err

    if process.returncode:
        log_subprocess_output(logging.ERROR, process.stdout)
        raise TestingFailure(failure_message)

    log_subprocess_output(logging.DEBUG, process.stdout)


def reset_device(openocd_config, stlink_serial):
    """Uses openocd to reset the attached device.

    Raises:
      TestingFailure: openocd could not be run or reported an error.
    """

    _LOG.debug('Resetting device')
    _run_openocd(['-c', 'init', '-c', 'reset run', '-c', 'exit'],
                 openocd_config, stlink_serial,
                 'Failed to reset target device')
    _LOG.debug('Successfully reset device')


def read_serial(port, baud_rate, test_timeout) -> bytes:
    """Reads lines from a serial port until a line read times out.

    The first byte is awaited for up to _FLASH_TIMEOUT seconds; every
    subsequent line read uses test_timeout. Carriage returns are stripped and
    the result is trimmed to start at the most recent test-start marker, if
    one is present.

    Returns bytes object containing the read serial data.

    Raises:
      TestingFailure: the port is not open, or no byte arrived before the
        initial timeout.
    """

    serial_data = bytearray()

    # The context manager guarantees the port is closed on every exit path.
    with serial.Serial(baudrate=baud_rate, port=port,
                       timeout=_FLASH_TIMEOUT) as device:
        if not device.is_open:
            raise TestingFailure('Failed to open device')

        # Discard any stale input so only output produced from now on is
        # captured.
        device.reset_input_buffer()

        # Block and wait for the first byte.
        serial_data += device.read()
        if not serial_data:
            raise TestingFailure('Device not producing output')

        device.timeout = test_timeout

        # Read with a reasonable timeout until we stop getting characters.
        while True:
            bytes_read = device.readline()
            if not bytes_read:
                break
            serial_data += bytes_read
            if _TESTS_DONE_STRING in serial_data:
                # Set to much more aggressive timeout since the last one or
                # two lines should print out immediately. (one line if all
                # fails or all passes, two lines if mixed.)
                device.timeout = 0.01

    # Remove carriage returns.
    serial_data = serial_data.replace(b'\r', b'')

    # Try to trim captured results to only contain most recent test run.
    test_start_index = serial_data.rfind(_TESTS_STARTING_STRING)
    if test_start_index != -1:
        serial_data = serial_data[test_start_index:]
    return bytes(serial_data)


def flash_device(binary, openocd_config, stlink_serial):
    """Flash binary to a connected device using the provided configuration.

    Raises:
      TestingFailure: openocd could not be run or reported an error.
    """

    # NOTE(review): the binary path is inserted verbatim into the openocd
    # command string; paths containing whitespace may need quoting - confirm.
    openocd_command = ' '.join(['program', binary, 'reset', 'exit'])

    _LOG.info('Flashing firmware to device')
    _run_openocd(['-c', openocd_command], openocd_config, stlink_serial,
                 'Failed to flash target device')
    _LOG.debug('Successfully flashed firmware to device')


def handle_test_results(test_output):
    """Parses test output to determine whether tests passed or failed.

    Raises:
      TestingFailure: the start or done marker is missing, or a failure
        marker is present in the output.
    """

    if _TESTS_STARTING_STRING not in test_output:
        raise TestingFailure('Failed to find test start')

    if _TESTS_DONE_STRING not in test_output:
        log_subprocess_output(logging.INFO, test_output)
        raise TestingFailure('Tests did not complete')

    if _TEST_FAILURE_STRING in test_output:
        log_subprocess_output(logging.INFO, test_output)
        raise TestingFailure('Test suite had one or more failures')

    log_subprocess_output(logging.DEBUG, test_output)

    _LOG.info('Test passed!')


def _threaded_test_reader(dest, port, baud_rate, test_timeout):
    """Parses test output to the mutable "dest" passed to this function.

    On failure nothing is appended to dest; the error is logged here because
    an exception raised in a worker thread never reaches the thread that
    started it.
    """
    try:
        dest.append(read_serial(port, baud_rate, test_timeout))
    except (TestingFailure, serial.SerialException) as err:
        _LOG.error('Failed to read test output: %s', err)


def run_device_test(binary,
                    test_timeout,
                    openocd_config,
                    baud,
                    stlink_serial=None,
                    port=None) -> bool:
    """Flashes, runs, and checks an on-device test binary.

    If neither stlink_serial nor port is given, the first detected board is
    used.

    Returns true on test pass.

    Raises:
      DeviceNotFound: auto-detection was requested but found no board.
    """

    if stlink_serial is None and port is None:
        _LOG.debug('Attempting to automatically detect dev board')
        boards = stm32f429i_detector.detect_boards()
        if not boards:
            error = 'Could not find an attached device'
            _LOG.error(error)
            raise DeviceNotFound(error)
        stlink_serial = boards[0].serial_number
        port = boards[0].dev_name

    _LOG.debug('Launching test binary %s', binary)
    try:
        # Begin capturing test output via another thread BEFORE flashing the
        # device since the test will automatically run after the image is
        # flashed. This reduces flake since there isn't a need to time a reset
        # correctly relative to the start of capturing device output.
        result: List[bytes] = []
        threaded_reader_args = (result, port, baud, test_timeout)
        read_thread = threading.Thread(target=_threaded_test_reader,
                                       args=threaded_reader_args)
        read_thread.start()
        try:
            _LOG.info('Running test')
            flash_device(binary, openocd_config, stlink_serial)
        finally:
            # Always wait for the reader so it is not left running in the
            # background when flashing fails.
            read_thread.join()

        # An empty result means the reader thread failed. That must count as
        # a failure rather than silently falling through to "pass".
        if not result:
            raise TestingFailure('Failed to capture test output from device')
        handle_test_results(result[0])
    except TestingFailure as err:
        _LOG.error(err)
        return False

    return True


def main():
    """Set up runner, and then flash/run device test."""
    args = parse_args()

    # Try to use pw_cli logs, else default to something reasonable.
    try:
        import pw_cli.log  # pylint: disable=import-outside-toplevel
        log_level = logging.DEBUG if args.verbose else logging.INFO
        pw_cli.log.install(level=log_level)
    except ImportError:
        coloredlogs.install(level='DEBUG' if args.verbose else 'INFO',
                            level_styles={
                                'debug': {
                                    'color': 244
                                },
                                'error': {
                                    'color': 'red'
                                }
                            },
                            fmt='%(asctime)s %(levelname)s | %(message)s')

    passed = run_device_test(args.binary, args.test_timeout,
                             args.openocd_config, args.baud,
                             args.stlink_serial, args.port)
    sys.exit(0 if passed else 1)


if __name__ == '__main__':
    main()