#!/usr/bin/python3
#
# Copyright (C) 2023 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
"""Convert single cts report into information files.

Given a cts report, which could be a zip file or test_result.xml, this script
turns it into three files: info.json, result.csv, and summary.csv.
"""

import argparse
import csv
import json
import os
import shutil
import tempfile
import xml.etree.ElementTree as ET
import zipfile
import constant


# TODO(b/293809772): Logging.


class CtsReport:
  """Class to record the test result of a cts report.

  Attributes:
    info: dict of report metadata; is_compatible() reads its
      'build_fingerprint' entry.
    selected_abis: collection of ABI names; results for other ABIs are
      skipped while loading.
    result_tree: nested dicts mapping
      module_name -> abi -> class_name -> test_name -> status.
    module_summaries: nested dicts mapping module_name -> abi ->
      ModuleSummary.
  """

  # Statuses ordered from best to worst. When the same test is reported more
  # than once, set_test_status() keeps the status with the lowest index.
  STATUS_ORDER = [
      'pass',
      'IGNORED',
      'ASSUMPTION_FAILURE',
      'fail',
      'TEST_ERROR',
      'TEST_STATUS_UNSPECIFIED',
  ]

  # Every status at or after this index is counted as a failure by is_fail().
  FAIL_INDEX = STATUS_ORDER.index('fail')

  def __init__(self, info, selected_abis=constant.ALL_TEST_ABIS):
    self.info = info
    self.selected_abis = selected_abis
    self.result_tree = {}
    self.module_summaries = {}

  @staticmethod
  def is_fail(status):
    """Return True if status is 'fail' or a later entry of STATUS_ORDER.

    constant.NO_DATA is never a failure. Any other status that is not in
    STATUS_ORDER raises ValueError.
    """
    if status == constant.NO_DATA:
      return False
    return CtsReport.STATUS_ORDER.index(status) >= CtsReport.FAIL_INDEX

  def gen_keys_list(self):
    """Generate a 2D-list of keys.

    Returns:
      A list with one [module_name, abi, class_name, test_name] entry for
      every test stored in the result tree.
    """
    return [
        [module_name, abi, class_name, test_name]
        for module_name, abis in self.result_tree.items()
        for abi, test_classes in abis.items()
        for class_name, tests in test_classes.items()
        for test_name in tests
    ]

  def is_compatible(self, info):
    """Return True if info has the same build fingerprint as this report."""
    return self.info['build_fingerprint'] == info['build_fingerprint']

  def get_test_status(self, module_name, abi, class_name, test_name):
    """Get test status from the CtsReport object.

    Returns:
      The stored status, or constant.NO_DATA when any level of the
      (module, abi, class, test) path is missing from the result tree.
    """
    node = self.result_tree
    for key in (module_name, abi, class_name, test_name):
      if key not in node:
        return constant.NO_DATA
      node = node[key]
    return node

  def set_test_status(
      self, module_name, abi, class_name, test_name, test_status
  ):
    """Set test status to the CtsReport object.

    A test that has no status yet is recorded and counted in its
    (module, abi) summary. A test that already has a status is only
    overwritten when the new status comes earlier in STATUS_ORDER; the
    summary counters are moved accordingly.
    """
    previous = self.get_test_status(module_name, abi, class_name, test_name)

    abis = self.result_tree.setdefault(module_name, {})
    test_classes = abis.setdefault(abi, {})
    tests = test_classes.setdefault(class_name, {})

    if previous == constant.NO_DATA:
      tests[test_name] = test_status

      module_summary = self.module_summaries.setdefault(module_name, {})
      # Create the summary lazily instead of building a throwaway
      # ModuleSummary on every call just to feed setdefault().
      summary = module_summary.get(abi)
      if summary is None:
        summary = module_summary[abi] = self.ModuleSummary()
      summary.counter[test_status] += 1

    elif (CtsReport.STATUS_ORDER.index(test_status)
          < CtsReport.STATUS_ORDER.index(previous)):
      summary = self.module_summaries[module_name][abi]

      tests[test_name] = test_status

      summary.counter[previous] -= 1
      summary.counter[test_status] += 1

  def read_test_result_xml(self, test_result_path, ignore_abi=False):
    """Read the result from test_result.xml into a CtsReport object.

    Args:
      test_result_path: path to the xml file to parse
      ignore_abi: if specified, store the results under
        constant.ABI_IGNORED instead of the real ABI name. Modules whose ABI
        is not in selected_abis are skipped either way.
    """
    tree = ET.parse(test_result_path)
    root = tree.getroot()

    for module in root.iter('Module'):
      module_name = module.attrib['name']
      abi = module.attrib['abi']
      if abi not in self.selected_abis:
        continue
      if ignore_abi:
        abi = constant.ABI_IGNORED

      for testcase in module.iter('TestCase'):
        class_name = testcase.attrib['name']

        for test in testcase.iter('Test'):
          self.set_test_status(
              module_name,
              abi,
              class_name,
              test.attrib['name'],
              test.attrib['result'],
          )

  def load_from_csv(self, result_csvfile, ignore_abi=False):
    """Read the information of the report from the csv files.

    Args:
      result_csvfile: open text file object of result.csv (it is handed to
        csv.reader and its name attribute is used in the empty-file message)
      ignore_abi: if specified, load the test ABI name as constant.ABI_IGNORED
    """
    result_reader = csv.reader(result_csvfile)

    try:
      next(result_reader)  # skip the header of csv file
    except StopIteration:
      print(f'Empty file: {result_csvfile.name}')
      return

    for row in result_reader:
      if not row:
        # csv.reader yields [] for blank lines (e.g. a trailing newline);
        # unpacking such a row would raise ValueError.
        continue
      module_name, abi, class_name, test_name, result = row
      if abi not in self.selected_abis:
        continue
      if ignore_abi:
        abi = constant.ABI_IGNORED
      self.set_test_status(module_name, abi, class_name, test_name, result)

  def write_to_csv(self, result_csvfile, summary_csvfile):
    """Write the information of the report to the csv files.

    Args:
      result_csvfile: writable text file object that receives one row per
        test
      summary_csvfile: writable text file object that receives one row per
        (module, abi) pair
    """
    summary_writer = csv.writer(summary_csvfile)
    summary_writer.writerow(['module_name', 'abi'] + CtsReport.STATUS_ORDER)

    result_writer = csv.writer(result_csvfile)
    result_writer.writerow(
        ['module_name', 'abi', 'class_name', 'test_name', 'result']
    )

    for module_name, abis in self.result_tree.items():
      for abi, test_classes in abis.items():
        module_summary = self.module_summaries[module_name][abi]
        summary_writer.writerow(
            [module_name, abi] + module_summary.summary_list()
        )

        for class_name, tests in test_classes.items():
          for test_name, result in tests.items():
            result_writer.writerow(
                [module_name, abi, class_name, test_name, result]
            )

  def output_files(self, output_dir):
    """Produce output files into the directory.

    Args:
      output_dir: directory that receives info.json, result.csv and
        summary.csv

    Returns:
      The list of the three written paths (info, result, summary).

    Raises:
      FileExistsError: if any of the three output files already exists;
        nothing is written in that case.
    """
    parsed_info_path = os.path.join(output_dir, 'info.json')
    parsed_result_path = os.path.join(output_dir, 'result.csv')
    parsed_summary_path = os.path.join(output_dir, 'summary.csv')

    files = [parsed_info_path, parsed_result_path, parsed_summary_path]

    # Check all targets up front so that nothing is overwritten and no
    # partial output is produced.
    for f in files:
      if os.path.exists(f):
        raise FileExistsError(f'Output file {f} already exists.')

    with open(parsed_info_path, 'w') as info_file:
      info_file.write(json.dumps(self.info, indent=2))

    # The csv module requires newline='' on files it writes to; otherwise
    # platforms that translate '\n' emit '\r\r\n' row terminators.
    with (
        open(parsed_result_path, 'w', newline='') as result_csvfile,
        open(parsed_summary_path, 'w', newline='') as summary_csvfile,
    ):
      self.write_to_csv(result_csvfile, summary_csvfile)

    for f in files:
      print(f'Parsed output {f}')

    return files

  class ModuleSummary:
    """Record the result summary of each (module, abi) pair."""

    def __init__(self):
      # Number of tests per status, keyed by every entry of STATUS_ORDER.
      self.counter = dict.fromkeys(CtsReport.STATUS_ORDER, 0)

    @property
    def tested_items(self):
      """All tested items."""
      return sum(self.counter[status] for status in CtsReport.STATUS_ORDER)

    @property
    def pass_rate(self):
      """Pass rate of the module.

      Every status for which CtsReport.is_fail() is False counts as passing.
      Returns 0.0 when nothing has been tested.
      """
      total = self.tested_items
      if total == 0:
        return 0.0
      pass_category = sum(
          self.counter[status]
          for status in CtsReport.STATUS_ORDER
          if not CtsReport.is_fail(status)
      )
      return pass_category / total

    def print_summary(self):
      """Print one 'status: count' line per status, then a blank line."""
      for key in CtsReport.STATUS_ORDER:
        print(f'{key}: {self.counter[key]}')
      print()

    def summary_list(self):
      """Return the counters as a list in STATUS_ORDER order."""
      return [self.counter[key] for key in CtsReport.STATUS_ORDER]


# Attributes copied from the xml into the test info. Each entry has the form
# '<Tag>[::<Tag>...].<attribute>'; see parse_attrib_path().
ATTRS_TO_SHOW = [
    'Result::Build.build_model',
    'Result::Build.build_id',
    'Result::Build.build_fingerprint',
    'Result::Build.build_device',
    'Result::Build.build_version_sdk',
    'Result::Build.build_version_security_patch',
    'Result::Build.build_board',
    'Result::Build.build_type',
    'Result::Build.build_version_release',
    'Result.suite_name',
    'Result.suite_version',
    'Result.suite_plan',
    'Result.suite_build_number',
]


def parse_attrib_path(attrib_path):
  """Parse the path into xml tag and attribute name.

  Args:
    attrib_path: string such as 'Result::Build.build_id'

  Returns:
    A (tags, attr_name) tuple: the '::'-separated tags before the first dot
    as a list, and everything after the first dot as the attribute name.

  Raises:
    ValueError: if attrib_path contains no dot.
  """
  first_dot = attrib_path.index('.')
  tags = attrib_path[:first_dot].split('::')
  attr_name = attrib_path[first_dot + 1:]
  return tags, attr_name


def get_test_info_xml(test_result_path):
  """Get test info from xml file.

  Args:
    test_result_path: path to the xml file to parse

  Returns:
    A dict holding 'tool_version', 'source_path' and one entry per attribute
    listed in ATTRS_TO_SHOW, keyed by the attribute name.
  """
  tree = ET.parse(test_result_path)
  root = tree.getroot()

  test_info = {
      'tool_version': constant.VERSION,
      'source_path': test_result_path,
  }

  for attrib_path in ATTRS_TO_SHOW:
    tags, attr_name = parse_attrib_path(attrib_path)
    node = root

    # The first tag is skipped without being checked (presumably it names
    # the root element); the remaining tags are resolved as nested children.
    for tag in tags[1:]:
      node = node.find(tag)

    test_info[attr_name] = node.attrib[attr_name]

  return test_info


def print_test_info(info):
  """Print test information of the result in table format.

  Args:
    info: dict of test information; values are formatted with str(), so
      non-string values and an empty dict are both accepted.
  """
  # default=0 keeps an empty dict from raising in max(); str() keeps
  # non-string values from raising in len().
  max_key_len = max((len(key) for key in info), default=0)
  max_value_len = max((len(str(value)) for value in info.values()), default=0)
  table_len = max_key_len + 2 + max_value_len

  print('=' * table_len)

  for key, value in info.items():
    print(f'{key:<{max_key_len}} {value}')

  print('=' * table_len)
  print()


def extract_test_result_from_zip(zip_file_path, dest_dir):
  """Extract test_result.xml from the zip file.

  Args:
    zip_file_path: path to the zip archive
    dest_dir: existing directory that receives the extracted file

  Returns:
    Path of the extracted file, i.e. dest_dir/test_result.xml.

  Raises:
    RuntimeError: if the archive does not contain exactly one member whose
      name contains 'test_result.xml'.
  """
  result_name = 'test_result.xml'
  extracted = os.path.join(dest_dir, result_name)
  with zipfile.ZipFile(zip_file_path) as myzip:
    # Substring match: the member may live in any sub-directory of the
    # archive.
    result_list = [f for f in myzip.namelist() if result_name in f]
    if len(result_list) != 1:
      raise RuntimeError(f'Cannot extract {result_name} from {zip_file_path}, '
                         f'matched files: {" ".join(result_list)}')
    with myzip.open(result_list[0]) as source, open(extracted, 'wb') as target:
      shutil.copyfileobj(source, target)
  return extracted


def parse_report_file(report_file,
                      selected_abis=constant.ALL_TEST_ABIS,
                      ignore_abi=False):
  """Turn one cts report into a CtsReport object.

  Args:
    report_file: path to a zip archive or to an xml file
    selected_abis: ABIs whose results are loaded
    ignore_abi: if specified, results are stored under constant.ABI_IGNORED

  Returns:
    The populated CtsReport.
  """
  with tempfile.TemporaryDirectory() as temp_dir:
    if zipfile.is_zipfile(report_file):
      xml_path = extract_test_result_from_zip(report_file, temp_dir)
    else:
      xml_path = report_file

    # NOTE(review): for zip input, test_info['source_path'] is the path of
    # the temporary extracted xml, which is removed when this block exits.
    test_info = get_test_info_xml(xml_path)
    print(f'Parsing {selected_abis} test results from: ')
    print_test_info(test_info)

    report = CtsReport(test_info, selected_abis)
    report.read_test_result_xml(xml_path, ignore_abi)

  return report


def main():
  """Parse the command line, parse the report and write the output files."""
  parser = argparse.ArgumentParser()

  parser.add_argument(
      '-r',
      '--report',
      required=True,
      help=(
          'Path to a cts report, where a cts report could '
          'be a zip archive or a xml file.'
      ),
  )
  parser.add_argument(
      '-d',
      '--output-dir',
      required=True,
      help='Path to the directory to store output files.',
  )
  parser.add_argument(
      '--abi',
      choices=constant.ALL_TEST_ABIS,
      nargs='*',
      default=constant.ALL_TEST_ABIS,
      help='Selected test ABIs to be parsed.',
  )

  args = parser.parse_args()

  report_file = args.report
  output_dir = args.output_dir

  if not os.path.exists(output_dir):
    raise FileNotFoundError(f'Output directory {output_dir} does not exist.')
  if not os.path.isdir(output_dir):
    # Fail before the (potentially slow) parsing instead of when the first
    # output file is opened.
    raise NotADirectoryError(f'Output path {output_dir} is not a directory.')

  report = parse_report_file(report_file, args.abi)

  report.output_files(output_dir)


if __name__ == '__main__':
  main()