1# Lint as: python3
2# Copyright 2020 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import ast
7import collections
8import glob
9import logging
10import os
11import pdb
12from typing import Generator, List, Optional, Tuple
13
14
class ControlFileError(Exception):
  """Raised when a control file cannot be interpreted by this module."""
17
18
# Metadata extracted from a single control file:
#   path:         location of the control file that was parsed.
#   category:     part of the NAME value before its first underscore
#                 ('' when NAME has no underscore).
#   name:         remainder of the NAME value.
#   suites:       list of suite names taken from 'suite:' ATTRIBUTES entries.
#   main_package: dotted package of the test run by job.run_test(), or ''.
Control = collections.namedtuple(
    'Control',
    ['path', 'category', 'name', 'suites', 'main_package'],
)
21
22
def load_all() -> List[Control]:
  """Parses every control file produced by _enumerate_files().

  Returns:
    One Control per enumerated file, in enumeration order.
  """
  loaded = []
  for control_path in _enumerate_files():
    logging.debug('Processing %s', control_path)
    parsed = _load_one(control_path)
    logging.debug('  -> %s', parsed)
    loaded.append(parsed)
  return loaded
31
32
# Directory two levels above the directory that contains this file; the
# 'client' and 'server' trees are looked up underneath it.
_ROOT_DIR = os.path.realpath(
    os.path.join(os.path.realpath(__file__), os.pardir, os.pardir, os.pardir))
35
36
def _enumerate_files() -> Generator[str, None, None]:
  """Yields the paths of candidate control files under _ROOT_DIR.

  Looks at <_ROOT_DIR>/{client,server}/site_tests/*/control* and yields, in
  glob order, every match whose file name has no extension.
  """
  for side in ['client', 'server']:
    pattern = os.path.join(_ROOT_DIR, side) + '/site_tests/*/control*'
    for candidate in glob.iglob(pattern):
      # Anything with an extension is skipped. This filters out text files
      # such as control_sequence.txt, and also names of the form
      # control.<suffix>.
      if os.path.splitext(candidate)[1]:
        continue
      yield candidate
46
47
def _load_one(path: str) -> Control:
  """Parses one control file into a Control record.

  Args:
    path: Path of the control file to read.

  Returns:
    A Control whose category and name come from the file's NAME assignment,
    suites from its ATTRIBUTES assignment, and main_package from its
    job.run_test() call ('' when that cannot be determined).

  Raises:
    ControlFileError: The file's contents are not supported (for example it
      has no global NAME assignment). The message is prefixed with the path.
    SyntaxError: The file is not valid Python.
  """
  # Read raw bytes and let the parser decode them. This follows Python's own
  # source-encoding rules (UTF-8 default, coding cookie, BOM) instead of the
  # locale-dependent default used by text-mode open().
  with open(path, 'rb') as r:
    source = r.read()
  # Passing the filename makes syntax errors point at the offending file.
  module = ast.parse(source, filename=path)
  try:
    name = _extract_name(module)
    category, name = _categorize_name(name)
    return Control(
        path=path,
        category=category,
        name=name,
        suites=_extract_suites(module),
        main_package=_extract_main_package(path, module) or '',
    )
  except ControlFileError as e:
    # The helpers do not know which file they are looking at; add the path
    # here so a failure while loading many files can be traced.
    raise ControlFileError('%s: %s' % (path, e)) from e
61
62
def _extract_name(module: ast.Module) -> Optional[str]:
  """Returns the value of the last global NAME assignment in module.

  Raises:
    ControlFileError: There is no global NAME assignment, or its value is
      empty.
  """
  assignment = _find_last_global_assignment(module.body, 'NAME')
  if assignment is None:
    raise ControlFileError('No global NAME assignment')
  value = _extract_str_value(assignment)
  if value:
    return value
  raise ControlFileError('Empty value')
71
72
def _find_last_global_assignment(stmts: List[ast.Assign],
                                 name: str) -> Optional[ast.Assign]:
  """Returns the last statement in stmts that assigns to name, or None."""
  matches = _find_global_assignments(stmts, name)
  return matches[-1] if matches else None
79
80
def _find_global_assignments(stmts: List[ast.Assign],
                             name: str) -> List[ast.Assign]:
  """Returns, in order, the assignments in stmts that target name.

  Only the statements passed in are examined; nested statements are not
  visited. Statements that are not ast.Assign are ignored.
  """
  return [
      candidate for candidate in stmts
      if isinstance(candidate, ast.Assign) and
      _contains_name(candidate.targets, name)
  ]
88
89
def _contains_name(targets: List[ast.Expr], want: str) -> bool:
  """Returns True if any target is a plain name equal to want.

  Complex lvalues are not supported: targets that are not ast.Name (tuple
  unpacking, attributes, subscripts, ...) are ignored, so multi-valued
  assignments are not handled properly.
  """
  return any(
      isinstance(target, ast.Name) and target.id == want
      for target in targets)
100
101
def _extract_str_value(stmt: ast.Assign) -> str:
  """Returns the constant assigned by stmt, converted with str().

  Constants that are not strings are accepted and stringified.

  Raises:
    ControlFileError: The assigned value is not an ast.Constant.
  """
  value_node = stmt.value
  if isinstance(value_node, ast.Constant):
    return str(value_node.value)
  raise ControlFileError(
      'Name assignment value is of type %s, want ast.Constant' %
      type(value_node))
109
110
def _categorize_name(name: str) -> Tuple[str, str]:
  """Splits name at its first underscore into (category, rest).

  When name contains no underscore the category is '' and rest is the whole
  name.
  """
  head, underscore, tail = name.partition('_')
  if underscore:
    return head, tail
  return '', name
118
119
# Number of characters to drop from a 'suite:<name>' attribute to get <name>.
_SUITE_PREFIX_LEN = len('suite:')


def _extract_suites(module: ast.Module) -> List[str]:
  """Returns the suite names listed in the module's ATTRIBUTES value.

  ATTRIBUTES is read from the last global assignment and split on commas;
  each entry is stripped of surrounding whitespace, and entries starting
  with 'suite:' contribute the text after that prefix. Returns [] when
  there is no global ATTRIBUTES assignment.
  """
  assignment = _find_last_global_assignment(module.body, 'ATTRIBUTES')
  if assignment is None:
    return []
  entries = (
      raw.strip() for raw in _extract_str_value(assignment).split(','))
  return [
      entry[_SUITE_PREFIX_LEN:] for entry in entries
      if entry.startswith('suite:')
  ]
134
135
136def _extract_main_package(path: str, module: ast.Module) -> Optional[str]:
137  fname = _extract_main_file(path, module)
138  if fname is None:
139    return None
140  relpath = os.path.relpath(os.path.dirname(path), _ROOT_DIR)
141  assert '.' not in relpath
142  return 'autotest_lib.%s.%s' % (relpath.replace('/', '.'), fname)
143
144
def _extract_main_file(path: str, module: ast.Module) -> Optional[str]:
  """Returns the target of the module's single job.run_test() call.

  Args:
    path: Path of the control file; used only in log messages.
    module: Parsed contents of the control file.

  Returns:
    The result of _extract_run_test_target() for the call, or None (after
    logging a warning) when the module does not contain exactly one
    top-level job.run_test() call.
  """
  run_test_calls = _find_run_test_calls(module)
  if len(run_test_calls) == 1:
    return _extract_run_test_target(path, run_test_calls[0])
  if run_test_calls:
    logging.warning('Found %d job.run_test() calls in %s, want 1',
                    len(run_test_calls), path)
  else:
    logging.warning('Found no job.run_test() calls in %s', path)
  return None
155
156
def _find_run_test_calls(module: ast.Module) -> List[ast.Call]:
  """Returns the top-level job.run_test(...) calls in module, in order.

  Only bare expression statements directly in the module body are
  considered; calls nested in other statements or used as part of a larger
  expression are not found.
  """
  found = []
  for stmt in module.body:
    if not isinstance(stmt, ast.Expr):
      continue
    call = stmt.value
    if not isinstance(call, ast.Call):
      continue
    func = call.func
    if not isinstance(func, ast.Attribute) or func.attr != 'run_test':
      continue
    receiver = func.value
    if isinstance(receiver, ast.Name) and receiver.id == 'job':
      found.append(call)
  return found
167
168
def _extract_run_test_target(path: str, call: ast.Call) -> Optional[str]:
  """Returns the test name passed to a job.run_test() call.

  Args:
    path: Path of the control file; used only in log messages.
    call: The job.run_test(...) call node.

  Returns:
    The single positional argument when it is a string literal. None, after
    logging a warning, when the call does not have exactly one positional
    argument or that argument is not a string constant. Keyword arguments
    are ignored.
  """
  if len(call.args) != 1:
    logging.warning('job.run_test() has %d arguments in %s, want 1',
                    len(call.args), path)
    return None
  arg = call.args[0]
  if not isinstance(arg, ast.Constant):
    logging.warning('job.run_test() has a non constant argument in %s', path)
    return None
  # ast.Constant also covers numbers, None, bytes, etc. Only a string can
  # name a test, so anything else is rejected rather than returned as-is.
  if not isinstance(arg.value, str):
    logging.warning('job.run_test() has a non string argument in %s', path)
    return None
  return arg.value
179