1#!/usr/bin/python3
2#
3# Copyright (C) 2023 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16#
17"""Convert single cts report into information files.
18
Given a cts report, which could be a zip file or test_result.xml, this script
turns it into three files: info.json, result.csv, and summary.csv.
21"""
22
23import argparse
24import csv
25import json
26import os
27import shutil
28import tempfile
29import xml.etree.ElementTree as ET
30import zipfile
31import constant
32
33
34# TODO(b/293809772): Logging.
35
36
class CtsReport:
  """Record the per-test results and per-module summaries of a cts report.

  Attributes:
    info: dict of report metadata; is_compatible() reads its
      'build_fingerprint' entry.
    selected_abis: collection of ABI names whose results are recorded.
    result_tree: nested dict, module_name -> abi -> class_name -> test_name
      -> status string.
    module_summaries: nested dict, module_name -> abi -> ModuleSummary.
  """

  # When the same test is reported more than once, the status that appears
  # earlier in this list wins (see set_test_status). Statuses from 'fail'
  # onwards are counted as failures (see is_fail).
  STATUS_ORDER = [
      'pass',
      'IGNORED',
      'ASSUMPTION_FAILURE',
      'fail',
      'TEST_ERROR',
      'TEST_STATUS_UNSPECIFIED',
  ]

  FAIL_INDEX = STATUS_ORDER.index('fail')

  def __init__(self, info, selected_abis=constant.ALL_TEST_ABIS):
    self.info = info
    self.selected_abis = selected_abis
    self.result_tree = {}
    self.module_summaries = {}

  @staticmethod
  def is_fail(status):
    """Return True if the status counts as a failure.

    constant.NO_DATA is never a failure. Any other status must be listed in
    STATUS_ORDER, otherwise ValueError is raised.
    """
    if status == constant.NO_DATA:
      return False
    return CtsReport.STATUS_ORDER.index(status) >= CtsReport.FAIL_INDEX

  def gen_keys_list(self):
    """Generate a 2D-list of keys.

    Returns:
      A list with one [module_name, abi, class_name, test_name] entry per
      recorded test.
    """
    return [
        [module_name, abi, class_name, test_name]
        for module_name, abis in self.result_tree.items()
        for abi, test_classes in abis.items()
        for class_name, tests in test_classes.items()
        for test_name in tests
    ]

  def is_compatible(self, info):
    """Return True if info has the same build fingerprint as this report."""
    return self.info['build_fingerprint'] == info['build_fingerprint']

  def get_test_status(self, module_name, abi, class_name, test_name):
    """Get test status from the CtsReport object.

    Returns:
      The recorded status, or constant.NO_DATA if the test is not recorded.
    """
    node = self.result_tree
    for key in (module_name, abi, class_name, test_name):
      if key not in node:
        return constant.NO_DATA
      node = node[key]
    return node

  def set_test_status(
      self, module_name, abi, class_name, test_name, test_status
  ):
    """Set test status to the CtsReport object.

    A test that is not recorded yet takes test_status. A test that is
    already recorded is only updated when test_status appears earlier in
    STATUS_ORDER than the recorded status. The (module, abi) summary counters
    are kept in sync with the stored status.
    """
    previous = self.get_test_status(module_name, abi, class_name, test_name)

    abis = self.result_tree.setdefault(module_name, {})
    test_classes = abis.setdefault(abi, {})
    tests = test_classes.setdefault(class_name, {})

    if previous == constant.NO_DATA:
      tests[test_name] = test_status

      module_summary = self.module_summaries.setdefault(module_name, {})
      summary = module_summary.get(abi)
      if summary is None:
        # Only build a ModuleSummary the first time a (module, abi) pair is
        # seen instead of creating a throwaway one for every test.
        summary = module_summary[abi] = self.ModuleSummary()
      summary.counter[test_status] += 1
      return

    order = CtsReport.STATUS_ORDER
    if order.index(test_status) < order.index(previous):
      summary = self.module_summaries[module_name][abi]

      tests[test_name] = test_status

      summary.counter[previous] -= 1
      summary.counter[test_status] += 1

  def read_test_result_xml(self, test_result_path, ignore_abi=False):
    """Read the result from test_result.xml into a CtsReport object.

    Args:
      test_result_path: path to the test_result.xml file.
      ignore_abi: if True, record every test under constant.ABI_IGNORED
        instead of its own ABI name. Modules are still filtered by their
        real ABI against selected_abis first.
    """
    root = ET.parse(test_result_path).getroot()

    for module in root.iter('Module'):
      module_name = module.attrib['name']
      abi = module.attrib['abi']
      if abi not in self.selected_abis:
        continue
      if ignore_abi:
        abi = constant.ABI_IGNORED

      for testcase in module.iter('TestCase'):
        class_name = testcase.attrib['name']

        for test in testcase.iter('Test'):
          self.set_test_status(
              module_name,
              abi,
              class_name,
              test.attrib['name'],
              test.attrib['result'],
          )

  def load_from_csv(self, result_csvfile, ignore_abi=False):
    """Read the test results of a report from an opened result.csv.

    Args:
      result_csvfile: readable text file object of result.csv. The first row
        is skipped as a header; every other row holds module_name, abi,
        class_name, test_name and result.
      ignore_abi: if specified, load the test ABI name as constant.ABI_IGNORED
    """
    result_reader = csv.reader(result_csvfile)

    try:
      next(result_reader)  # skip the header of csv file
    except StopIteration:
      print(f'Empty file: {result_csvfile.name}')
      return

    for row in result_reader:
      if not row:
        # csv.reader yields an empty list for a blank line (e.g. in files
        # written with doubled line endings); there is nothing to record.
        continue
      module_name, abi, class_name, test_name, result = row
      if abi not in self.selected_abis:
        continue
      if ignore_abi:
        abi = constant.ABI_IGNORED
      self.set_test_status(module_name, abi, class_name, test_name, result)

  def write_to_csv(self, result_csvfile, summary_csvfile):
    """Write the information of the report to the csv files.

    Args:
      result_csvfile: writable text file object for result.csv; it receives
        one row per recorded test.
      summary_csvfile: writable text file object for summary.csv; it receives
        one row of status counts per (module, abi) pair.
    """
    summary_writer = csv.writer(summary_csvfile)
    summary_writer.writerow(['module_name', 'abi'] + CtsReport.STATUS_ORDER)

    result_writer = csv.writer(result_csvfile)
    result_writer.writerow(
        ['module_name', 'abi', 'class_name', 'test_name', 'result']
    )

    for module_name, abis in self.result_tree.items():
      for abi, test_classes in abis.items():
        module_summary = self.module_summaries[module_name][abi]
        summary_writer.writerow(
            [module_name, abi] + module_summary.summary_list()
        )

        for class_name, tests in test_classes.items():
          for test_name, result in tests.items():
            result_writer.writerow(
                [module_name, abi, class_name, test_name, result]
            )

  def output_files(self, output_dir):
    """Produce output files into the directory.

    Args:
      output_dir: directory that receives info.json, result.csv and
        summary.csv.

    Returns:
      The list of written paths: info.json, result.csv, summary.csv.

    Raises:
      FileExistsError: if any of the output files already exists; nothing is
        written in that case.
    """
    parsed_info_path = os.path.join(output_dir, 'info.json')
    parsed_result_path = os.path.join(output_dir, 'result.csv')
    parsed_summary_path = os.path.join(output_dir, 'summary.csv')

    files = [parsed_info_path, parsed_result_path, parsed_summary_path]

    for f in files:
      if os.path.exists(f):
        raise FileExistsError(f'Output file {f} already exists.')

    with open(parsed_info_path, 'w') as info_file:
      json.dump(self.info, info_file, indent=2)

    # The csv module requires newline='' so that the '\r\n' row terminator it
    # writes is not translated again (which gives '\r\r\n' on Windows).
    with (
        open(parsed_result_path, 'w', newline='') as result_csvfile,
        open(parsed_summary_path, 'w', newline='') as summary_csvfile,
    ):
      self.write_to_csv(result_csvfile, summary_csvfile)

    for f in files:
      print(f'Parsed output {f}')

    return files

  class ModuleSummary:
    """Record the result summary of each (module, abi) pair.

    Attributes:
      counter: dict mapping every status in CtsReport.STATUS_ORDER to the
        number of tests currently recorded with that status.
    """

    def __init__(self):
      self.counter = dict.fromkeys(CtsReport.STATUS_ORDER, 0)

    @property
    def tested_items(self):
      """All tested items."""
      return sum(self.counter[status] for status in CtsReport.STATUS_ORDER)

    @property
    def pass_rate(self):
      """Pass rate of the module; 0.0 when no item was tested."""
      total = self.tested_items
      if total == 0:
        return 0.0
      passed = sum(
          self.counter[status]
          for status in CtsReport.STATUS_ORDER
          if not CtsReport.is_fail(status)
      )
      return passed / total

    def print_summary(self):
      """Print the count of every status, each followed by a blank line."""
      for key in CtsReport.STATUS_ORDER:
        print(f'{key}: {self.counter[key]}')
        print()

    def summary_list(self):
      """Return the status counts in CtsReport.STATUS_ORDER order."""
      return [self.counter[key] for key in CtsReport.STATUS_ORDER]
269
270
# Attributes of test_result.xml that are copied into the report info.
# Each entry has the form '<Tag>[::<Tag>...].<attribute>': the '::'-separated
# tags before the first '.' form the element path and the rest is the
# attribute name (see parse_attrib_path). get_test_info_xml starts at the xml
# root and skips the first tag when walking the path.
ATTRS_TO_SHOW = [
    'Result::Build.build_model',
    'Result::Build.build_id',
    'Result::Build.build_fingerprint',
    'Result::Build.build_device',
    'Result::Build.build_version_sdk',
    'Result::Build.build_version_security_patch',
    'Result::Build.build_board',
    'Result::Build.build_type',
    'Result::Build.build_version_release',
    'Result.suite_name',
    'Result.suite_version',
    'Result.suite_plan',
    'Result.suite_build_number',
]
286
287
def parse_attrib_path(attrib_path):
  """Split an attribute path into its xml tags and its attribute name.

  Args:
    attrib_path: string such as 'Result::Build.build_id'.

  Returns:
    A (tags, attr_name) tuple: the list of '::'-separated tags found before
    the first '.', and everything after that '.'.

  Raises:
    ValueError: if attrib_path contains no '.'.
  """
  dot_pos = attrib_path.index('.')
  tag_part, attr_name = attrib_path[:dot_pos], attrib_path[dot_pos + 1:]
  return tag_part.split('::'), attr_name
294
295
def get_test_info_xml(test_result_path):
  """Collect the report information stored in an xml result file.

  Args:
    test_result_path: path to the xml file to read.

  Returns:
    A dict holding 'tool_version', 'source_path' and, for every path in
    ATTRS_TO_SHOW, the attribute value keyed by the attribute name.
  """
  root = ET.parse(test_result_path).getroot()

  test_info = {
      'tool_version': constant.VERSION,
      'source_path': test_result_path,
  }

  for attrib_path in ATTRS_TO_SHOW:
    tags, attr_name = parse_attrib_path(attrib_path)

    # The walk starts at the root element, so the first tag is skipped and
    # only the remaining tags are looked up one level at a time.
    element = root
    for tag in tags[1:]:
      element = element.find(tag)

    test_info[attr_name] = element.attrib[attr_name]

  return test_info
321
322
323def print_test_info(info):
324  """Print test information of the result in table format."""
325
326  max_key_len = max([len(k) for k in info])
327  max_value_len = max([len(info[k]) for k in info])
328  table_len = max_key_len + 2 + max_value_len
329
330  print('=' * table_len)
331
332  for key in info:
333    print(f'{key:<{max_key_len}}  {info[key]}')
334
335  print('=' * table_len)
336  print()
337
338
def extract_test_result_from_zip(zip_file_path, dest_dir):
  """Extract test_result.xml from the zip file.

  Args:
    zip_file_path: path to the zip archive of a cts report.
    dest_dir: existing directory that receives the extracted file.

  Returns:
    The path of the extracted file, i.e. dest_dir joined with
    'test_result.xml'.

  Raises:
    RuntimeError: if the archive does not contain exactly one entry named
      test_result.xml.
  """
  result_name = 'test_result.xml'
  extracted = os.path.join(dest_dir, result_name)
  with zipfile.ZipFile(zip_file_path) as myzip:
    # Compare the last path component instead of doing a substring search, so
    # that entries such as 'test_result.xml.md5' are not mistaken for the
    # report itself.
    result_list = [
        name for name in myzip.namelist()
        if name.rsplit('/', 1)[-1] == result_name
    ]
    if len(result_list) != 1:
      raise RuntimeError(f'Cannot extract {result_name} from {zip_file_path}, '
                         f'matched files: {" ".join(result_list)}')
    with myzip.open(result_list[0]) as source, open(extracted, 'wb') as target:
      shutil.copyfileobj(source, target)
  return extracted
352
353
def parse_report_file(report_file,
                      selected_abis=constant.ALL_TEST_ABIS,
                      ignore_abi=False):
  """Turn one cts report into a CtsReport object.

  Args:
    report_file: path to a cts report, either a zip archive containing
      test_result.xml or the xml file itself.
    selected_abis: ABI names whose test results are parsed.
    ignore_abi: passed on to CtsReport.read_test_result_xml.

  Returns:
    The CtsReport filled with the information and test results of the report.
  """

  with tempfile.TemporaryDirectory() as temp_dir:
    from_zip = zipfile.is_zipfile(report_file)
    if from_zip:
      xml_path = extract_test_result_from_zip(report_file, temp_dir)
    else:
      xml_path = report_file

    test_info = get_test_info_xml(xml_path)
    if from_zip:
      # get_test_info_xml records the path it was given, which here points
      # into temp_dir and is deleted when this block exits. Record the
      # archive the data really came from instead.
      test_info['source_path'] = str(report_file)

    print(f'Parsing {selected_abis} test results from: ')
    print_test_info(test_info)

    report = CtsReport(test_info, selected_abis)
    report.read_test_result_xml(xml_path, ignore_abi)

  return report
374
375
def main():
  """Parse the command line, convert the report and write the output files.

  Raises:
    FileNotFoundError: if the requested output directory does not exist.
  """
  arg_parser = argparse.ArgumentParser()

  arg_parser.add_argument(
      '-r',
      '--report',
      required=True,
      help=(
          'Path to a cts report, where a cts report could '
          'be a zip archive or a xml file.'
      ),
  )
  arg_parser.add_argument(
      '-d',
      '--output-dir',
      required=True,
      help='Path to the directory to store output files.',
  )
  arg_parser.add_argument(
      '--abi',
      choices=constant.ALL_TEST_ABIS,
      nargs='*',
      default=constant.ALL_TEST_ABIS,
      help='Selected test ABIs to be parsed.',
  )

  options = arg_parser.parse_args()
  output_dir = options.output_dir

  # Fail before any parsing work is done when there is nowhere to write to.
  if not os.path.exists(output_dir):
    raise FileNotFoundError(f'Output directory {output_dir} does not exist.')

  parse_report_file(options.report, options.abi).output_files(output_dir)
413
414
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
  main()
417