1# Copyright 2019, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""ATest execution info generator."""
16
17# pylint: disable=line-too-long
18
19from __future__ import print_function
20
21import glob
22import logging
23import json
24import os
25import sys
26
27import atest_utils as au
28import constants
29
30from metrics import metrics_utils
31
# Keys of the JSON document stored in the result file.
_ARGS_KEY = 'args'
_SUMMARY_KEY = 'summary'
_TOTAL_SUMMARY_KEY = 'total_summary'
_TEST_RUNNER_KEY = 'test_runner'
_TEST_NAME_KEY = 'test_name'
_TEST_TIME_KEY = 'test_time'
_TEST_DETAILS_KEY = 'details'
_TEST_RESULT_LINK = 'test_result_link'

# Test statuses that are counted in the per-group and total summaries.
_STATUS_PASSED_KEY = 'PASSED'
_STATUS_FAILED_KEY = 'FAILED'
_STATUS_IGNORED_KEY = 'IGNORED'

# Name of the result file created inside the work directory.
_TEST_RESULT_NAME = 'test_result'

# The exit code is read from this attribute of the __main__ module.
_MAIN_MODULE_KEY = '__main__'
_EXIT_CODE_ATTR = 'EXIT_CODE'

# Column widths of the history table printed by print_test_result().
_UUID_LEN = 30
_RESULT_LEN = 20
_RESULT_URL_LEN = 35
_COMMAND_LEN = 50

# Glob pattern filled with (result directory, test name) to locate the
# logcat-on-failure file of a failed test.
_LOGCAT_FMT = '{}/log/invocation_*/{}*logcat-on-failure*'

# Zeroed counters; always .copy() this before mutating it.
_SUMMARY_MAP_TEMPLATE = dict.fromkeys(
    (_STATUS_PASSED_KEY, _STATUS_FAILED_KEY, _STATUS_IGNORED_KEY), 0)

# Read by preparation_time(). Nothing in this file assigns it; presumably
# another module sets it once preparation has finished -- TODO confirm.
PREPARE_END_TIME = None
57
58
def preparation_time(start_time):
    """Compute how long the preparation phase took.

    Args:
        start_time: The moment preparation started, in a type that can be
                    subtracted from PREPARE_END_TIME.

    Returns:
        PREPARE_END_TIME minus start_time, or None while PREPARE_END_TIME
        is unset (falsy).
    """
    if not PREPARE_END_TIME:
        return None
    return PREPARE_END_TIME - start_time
69
70
def symlink_latest_result(test_result_dir):
    """Point the 'LATEST' symbolic link at the given result directory.

    The link lives directly under constants.ATEST_RESULT_ROOT; whatever
    already occupies that path is removed first.

    Args:
        test_result_dir: A string of the dir path the link should target.
    """
    latest_link = os.path.join(constants.ATEST_RESULT_ROOT, 'LATEST')
    # lexists() is also true for a dangling link, which must be replaced too.
    if os.path.lexists(latest_link):
        os.remove(latest_link)
    os.symlink(test_result_dir, latest_link)
81
82
def print_test_result(root, history_arg):
    """Print the test result history, or one stored result in detail.

    Args:
        root: A string of the test result root path.
        history_arg: A string of an integer or uuid. If it's an integer
                     string, a table with one line per recent test result is
                     printed; else it is treated as a uuid (the name of a
                     result directory) and that result is printed in detail.
    """
    if not history_arg.isdigit():
        # NOTE(review): the uuid lookup is rooted at
        # constants.ATEST_RESULT_ROOT rather than `root`; kept unchanged to
        # preserve behavior -- confirm whether `root` was intended.
        path = os.path.join(constants.ATEST_RESULT_ROOT, history_arg,
                            'test_result')
        print_test_result_by_path(path)
        return

    # Reverse lexicographic order of the directory names; presumably they
    # start with a timestamp so this lists the newest first -- TODO confirm.
    paths = sorted(glob.glob('%s/20*_*_*' % root), reverse=True)

    # has_url_results() walks the whole result tree and parses every result
    # file, so evaluate it once instead of once more for every printed row.
    show_url = has_url_results()
    if show_url:
        print('{:-^{uuid_len}} {:-^{result_len}} {:-^{result_url_len}} {:-^{command_len}}'
              .format('uuid', 'result', 'result_url', 'command',
                      uuid_len=_UUID_LEN,
                      result_len=_RESULT_LEN,
                      result_url_len=_RESULT_URL_LEN,
                      command_len=_COMMAND_LEN))
    else:
        print('{:-^{uuid_len}} {:-^{result_len}} {:-^{command_len}}'
              .format('uuid', 'result', 'command',
                      uuid_len=_UUID_LEN,
                      result_len=_RESULT_LEN,
                      command_len=_COMMAND_LEN))

    # NOTE(review): one directory more than requested is examined;
    # presumably this compensates for the directory of the current
    # invocation, whose result file is still empty -- TODO confirm.
    for path in paths[0: int(history_arg)+1]:
        result_path = os.path.join(path, 'test_result')
        if not os.path.isfile(result_path):
            continue
        try:
            with open(result_path) as json_file:
                result = json.load(json_file)
        except ValueError:
            # Best effort: an empty or malformed result file only costs its
            # own row, never the whole listing.
            logging.debug('Skipping unparsable result file %s', result_path)
            continue
        total_summary = result.get(_TOTAL_SUMMARY_KEY, {})
        # Each status is abbreviated to its first letter, e.g. "P:2, F:1".
        summary_str = ', '.join([k[:1]+':'+str(v)
                                 for k, v in total_summary.items()])
        if show_url:
            print('{:<{uuid_len}} {:<{result_len}} '
                  '{:<{result_url_len}} atest {:<{command_len}}'
                  .format(os.path.basename(path),
                          summary_str,
                          result.get(_TEST_RESULT_LINK, ''),
                          result.get(_ARGS_KEY, ''),
                          uuid_len=_UUID_LEN,
                          result_len=_RESULT_LEN,
                          result_url_len=_RESULT_URL_LEN,
                          command_len=_COMMAND_LEN))
        else:
            print('{:<{uuid_len}} {:<{result_len}} atest {:<{command_len}}'
                  .format(os.path.basename(path),
                          summary_str,
                          result.get(_ARGS_KEY, ''),
                          uuid_len=_UUID_LEN,
                          result_len=_RESULT_LEN,
                          command_len=_COMMAND_LEN))
145
146
def print_test_result_by_path(path):
    """Print one stored test result in detail.

    Prints the saved command line, the result link (if any) and the total
    summary. When the summary reports failures, every failed test is listed
    with its stack trace and, if one is found next to the result file, the
    path of its logcat-on-failure file. Nothing is printed when path is not
    an existing file.

    Args:
        path: A string of test result path.
    """
    if not os.path.isfile(path):
        return
    with open(path) as json_file:
        result = json.load(json_file)

    print("\natest {}".format(result.get(_ARGS_KEY, '')))
    test_result_url = result.get(_TEST_RESULT_LINK, '')
    if test_result_url:
        print('\nTest Result Link: {}'.format(test_result_url))
    print('\nTotal Summary:\n{}'.format(au.delimiter('-')))
    total_summary = result.get(_TOTAL_SUMMARY_KEY, {})
    print(', '.join([(k+':'+str(v))
                     for k, v in total_summary.items()]))

    # A summary without a FAILED counter means "no failures"; comparing the
    # missing value (None) with 0 would raise TypeError on Python 3.
    fail_num = total_summary.get(_STATUS_FAILED_KEY) or 0
    if fail_num <= 0:
        return

    message = '%d test failed' % fail_num
    print('\n')
    print(au.colorize(message, constants.RED))
    print('-' * len(message))
    result_dir = os.path.dirname(path)
    for test_dict in result.get(_TEST_RUNNER_KEY, {}).values():
        for test_details in test_dict.values():
            # Groups whose tests all passed have no FAILED list at all, so
            # default to an empty list instead of iterating over None.
            for fail in test_details.get(_STATUS_FAILED_KEY) or []:
                test_name = fail.get(_TEST_NAME_KEY)
                print(au.colorize('{}'.format(test_name), constants.RED))
                failure_files = glob.glob(
                    _LOGCAT_FMT.format(result_dir, test_name))
                if failure_files:
                    print('{} {}'.format(
                        au.colorize('LOGCAT-ON-FAILURES:',
                                    constants.CYAN),
                        failure_files[0]))
                print('{} {}'.format(
                    au.colorize('STACKTRACE:\n', constants.CYAN),
                    fail.get(_TEST_DETAILS_KEY)))
189
190
def has_non_test_options(args):
    """Check whether any non-test option is set in the args.

    Args:
        args: An argparse.Namespace class instance holding parsed args.

    Returns:
        True, if args has at least one non-test option.
        False, otherwise.
    """
    non_test_options = ('collect_tests_only', 'dry_run', 'help', 'history',
                        'info', 'version', 'latest_result')
    # The generator keeps the left-to-right, short-circuit evaluation of an
    # `or` chain, while any() yields the real bool the docstring promises
    # instead of the raw option value (e.g. the history string).
    return any(getattr(args, option) for option in non_test_options)
210
211
def has_url_results():
    """Tell whether any stored test result carries a result link.

    Every 'test_result' file below constants.ATEST_RESULT_ROOT is parsed;
    files that are not valid JSON are ignored.

    Returns:
        True as soon as one result with a non-empty link is found, False
        otherwise.
    """
    for dir_path, _, file_names in os.walk(constants.ATEST_RESULT_ROOT):
        if 'test_result' not in file_names:
            continue
        with open(os.path.join(dir_path, 'test_result')) as result_file:
            try:
                parsed = json.load(result_file)
                if parsed.get(_TEST_RESULT_LINK, ''):
                    return True
            except ValueError:
                # Not valid JSON (e.g. an empty file): keep looking.
                pass
    return False
228
229
class AtestExecutionInfo:
    """Class that stores the whole test progress information in JSON format.

    Instances are context managers: entering creates the result file in the
    work directory, leaving writes the execution detail into it, refreshes
    the link to the latest result and sends the exit metrics.

    ----
    For example, running command
        atest hello_world_test HelloWorldTest

    will result in storing the execution detail in JSON:
    {
      "args": "hello_world_test HelloWorldTest",
      "test_runner": {
          "AtestTradefedTestRunner": {
              "hello_world_test": {
                  "FAILED": [
                      {"test_time": "(5ms)",
                       "details": "Hello, Wor...",
                       "test_name": "HelloWorldTest#PrintHelloWorld"}
                      ],
                  "summary": {"FAILED": 1, "PASSED": 0, "IGNORED": 0}
              },
              "HelloWorldTests": {
                  "PASSED": [
                      {"test_time": "(27ms)",
                       "details": null,
                       "test_name": "...HelloWorldTest#testHalloWelt"},
                      {"test_time": "(1ms)",
                       "details": null,
                       "test_name": "....HelloWorldTest#testHelloWorld"}
                      ],
                  "summary": {"FAILED": 0, "PASSED": 2, "IGNORED": 0}
              }
          }
      },
      "total_summary": {"FAILED": 1, "PASSED": 2, "IGNORED": 0}
    }
    """

    # Class-level list shared by every instance; it is read when the
    # execution detail is generated. Presumably callers register their
    # result reporters here while tests run -- TODO confirm.
    result_reporters = []

    def __init__(self, args, work_dir, args_ns):
        """Initialise an AtestExecutionInfo instance.

        Args:
            args: Command line parameters; an iterable of strings that is
                  joined with spaces when the detail is saved.
            work_dir: The directory for saving information.
            args_ns: An argparse.Namespace class instance holding parsed args.
        """
        self.args = args
        self.work_dir = work_dir
        self.result_file = None
        self.args_ns = args_ns

    def __enter__(self):
        """Create and return information file object.

        Returns:
            The result file opened for writing, or None when it could not be
            opened (the failure is logged instead of raised).
        """
        full_file_name = os.path.join(self.work_dir, _TEST_RESULT_NAME)
        try:
            self.result_file = open(full_file_name, 'w')
        except IOError:
            logging.error('Cannot open file %s', full_file_name)
        return self.result_file

    def __exit__(self, exit_type, value, traceback):
        """Write execution information and close information file.

        The detail is only written for real test runs, i.e. when no non-test
        option is set. The exit event is sent in every case. Nothing is
        returned, so an exception raised inside the with-block propagates.
        """
        wrote_result = False
        if self.result_file:
            try:
                if not has_non_test_options(self.args_ns):
                    detail = AtestExecutionInfo._generate_execution_detail(
                        self.args)
                    # detail is None when generation failed; write(None)
                    # would raise TypeError and skip the exit metrics.
                    if detail is not None:
                        self.result_file.write(detail)
                        wrote_result = True
            finally:
                # Always release the handle, also for non-test options and
                # when writing fails.
                self.result_file.close()
        if wrote_result:
            symlink_latest_result(self.work_dir)
        main_module = sys.modules.get(_MAIN_MODULE_KEY)
        # Without an exit code on the main module the run counts as failed.
        main_exit_code = getattr(main_module, _EXIT_CODE_ATTR,
                                 constants.EXIT_CODE_ERROR)
        if main_exit_code == constants.EXIT_CODE_SUCCESS:
            metrics_utils.send_exit_event(main_exit_code)
        else:
            metrics_utils.handle_exc_and_send_exit_event(main_exit_code)

    @staticmethod
    def _generate_execution_detail(args):
        """Generate execution detail.

        Args:
            args: Command line parameters that you want to save.

        Returns:
            A json format string, or None when a ValueError is raised while
            arranging or serialising the results (a warning is logged).
        """
        info_dict = {_ARGS_KEY: ' '.join(args)}
        try:
            AtestExecutionInfo._arrange_test_result(
                info_dict,
                AtestExecutionInfo.result_reporters)
            return json.dumps(info_dict)
        except ValueError as err:
            logging.warning('Parsing test result failed due to : %s', err)
            return None

    @staticmethod
    def _arrange_test_result(info_dict, reporters):
        """Append test result information in given dict.

        Arrange test information to below
        "test_runner": {
            "test runner name": {
                "test name": {
                    "FAILED": [
                        {"test time": "",
                         "details": "",
                         "test name": ""}
                    ],
                "summary": {"FAILED": 0, "PASSED": 0, "IGNORED": 0}
                },
            },
        "total_summary": {"FAILED": 0, "PASSED": 0, "IGNORED": 0}

        Args:
            info_dict: A dict you want to add result information in; it is
                       modified in place.
            reporters: A list of result_reporter.

        Returns:
            A dict contains test result information data (the same object as
            info_dict).
        """
        runners = {}
        info_dict[_TEST_RUNNER_KEY] = runners
        for reporter in reporters:
            # When several reporters carry a link, the last one wins.
            if reporter.test_result_link:
                info_dict[_TEST_RESULT_LINK] = reporter.test_result_link
            for test in reporter.all_test_results:
                groups = runners.setdefault(test.runner_name, {})
                group = groups.setdefault(test.group_name, {})
                group.setdefault(test.status, []).append(
                    {_TEST_NAME_KEY: test.test_name,
                     _TEST_TIME_KEY: test.test_time,
                     _TEST_DETAILS_KEY: test.details})

        total_summary = _SUMMARY_MAP_TEMPLATE.copy()
        for groups in runners.values():
            for group in groups.values():
                group_summary = _SUMMARY_MAP_TEMPLATE.copy()
                for status, results in group.items():
                    count = len(results)
                    # Statuses outside the template are stored but not
                    # counted.
                    if status in _SUMMARY_MAP_TEMPLATE:
                        group_summary[status] = count
                        total_summary[status] += count
                # Added after the loop so the dict is not resized while it
                # is being iterated.
                group[_SUMMARY_KEY] = group_summary
        info_dict[_TOTAL_SUMMARY_KEY] = total_summary
        return info_dict
378