1# Lint as: python3 2# Copyright 2020 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import ast 7import collections 8import glob 9import logging 10import os 11import pdb 12from typing import Generator, List, Optional, Tuple 13 14 15class ControlFileError(Exception): 16 """Generic error from this package.""" 17 18 19Control = collections.namedtuple('Control', 20 'path, category, name, suites, main_package') 21 22 23def load_all() -> List[Control]: 24 controls = [] 25 for path in _enumerate_files(): 26 logging.debug('Processing %s', path) 27 control = _load_one(path) 28 logging.debug(' -> %s', control) 29 controls.append(control) 30 return controls 31 32 33_ROOT_DIR = os.path.realpath( 34 os.path.join(os.path.realpath(__file__), "../../..")) 35 36 37def _enumerate_files() -> Generator[str, None, None]: 38 for ttype in ['client', 'server']: 39 tpath = os.path.join(_ROOT_DIR, ttype) 40 for path in glob.iglob(tpath + '/site_tests/*/control*'): 41 # There are some text files with names like control_sequence.txt 42 _, ext = os.path.splitext(path) 43 if ext: 44 continue 45 yield path 46 47 48def _load_one(path: str) -> Control: 49 with open(path) as r: 50 text = r.read() 51 module = ast.parse(text) 52 name = _extract_name(module) 53 category, name = _categorize_name(name) 54 return Control( 55 path=path, 56 category=category, 57 name=name, 58 suites=_extract_suites(module), 59 main_package=_extract_main_package(path, module) or '', 60 ) 61 62 63def _extract_name(module: ast.Module) -> Optional[str]: 64 stmt = _find_last_global_assignment(module.body, 'NAME') 65 if stmt is None: 66 raise ControlFileError('No global NAME assignment') 67 name = _extract_str_value(stmt) 68 if not name: 69 raise ControlFileError('Empty value') 70 return name 71 72 73def _find_last_global_assignment(stmts: List[ast.Assign], 74 name: str) -> Optional[ast.Assign]: 75 found = _find_global_assignments(stmts, name) 76 if len(found) > 0: 77 return found[-1] 78 return None 79 80 81def _find_global_assignments(stmts: List[ast.Assign], 82 name: str) -> List[ast.Assign]: 83 found = [] 84 for stmt in stmts: 85 if isinstance(stmt, ast.Assign) and _contains_name(stmt.targets, name): 86 found.append(stmt) 87 return found 88 89 90def _contains_name(targets: List[ast.Expr], want: str) -> bool: 91 for target in targets: 92 if not isinstance(target, ast.Name): 93 # We do not support complex lvalues. 94 # In particular, multi-valued assignments are not handled properly. 95 continue 96 name: ast.Name = target 97 if name.id == want: 98 return True 99 return False 100 101 102def _extract_str_value(stmt: ast.Assign) -> str: 103 if not isinstance(stmt.value, ast.Constant): 104 raise ControlFileError( 105 'Name assignment value is of type %s, want ast.Constant' % 106 type(stmt.value)) 107 v = str(stmt.value.value) 108 return v 109 110 111def _categorize_name(name: str) -> Tuple[str, str]: 112 parts = name.split('_', 1) 113 if len(parts) == 2: 114 category, rest = parts[0], parts[1] 115 else: 116 category, rest = '', parts[0] 117 return category, rest 118 119 120_SUITE_PREFIX_LEN = len('suite:') 121 122 123def _extract_suites(module: ast.Module) -> List[str]: 124 stmt = _find_last_global_assignment(module.body, 'ATTRIBUTES') 125 if stmt is None: 126 return [] 127 v = _extract_str_value(stmt) 128 suites = [] 129 for attr in v.split(','): 130 attr = attr.strip() 131 if attr.startswith('suite:'): 132 suites.append(attr[_SUITE_PREFIX_LEN:]) 133 return suites 134 135 136def _extract_main_package(path: str, module: ast.Module) -> Optional[str]: 137 fname = _extract_main_file(path, module) 138 if fname is None: 139 return None 140 relpath = os.path.relpath(os.path.dirname(path), _ROOT_DIR) 141 assert '.' not in relpath 142 return 'autotest_lib.%s.%s' % (relpath.replace('/', '.'), fname) 143 144 145def _extract_main_file(path: str, module: ast.Module) -> Optional[str]: 146 calls = _find_run_test_calls(module) 147 if not calls: 148 logging.warning('Found no job.run_test() calls in %s', path) 149 return None 150 if len(calls) > 1: 151 logging.warning('Found %d job.run_test() calls in %s, want 1', len(calls), 152 path) 153 return None 154 return _extract_run_test_target(path, calls[0]) 155 156 157def _find_run_test_calls(module: ast.Module) -> List[ast.Call]: 158 calls = [] 159 for stmt in module.body: 160 if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call): 161 call = stmt.value 162 func = call.func 163 if (isinstance(func, ast.Attribute) and func.attr == 'run_test' and 164 isinstance(func.value, ast.Name) and func.value.id == 'job'): 165 calls.append(call) 166 return calls 167 168 169def _extract_run_test_target(path: str, call: ast.Call) -> Optional[str]: 170 if len(call.args) != 1: 171 logging.warning('job.run_test() has %d arguments in %s, want 1', 172 len(call.args), path) 173 return None 174 arg = call.args[0] 175 if not isinstance(arg, ast.Constant): 176 logging.warning('job.run_test() has a non constant argument in %s', path) 177 return None 178 return arg.value 179