1#!/usr/bin/env python3
2#
3# Copyright (C) 2021 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""test_utils.py: utils for testing.
18"""
19
20import logging
21from multiprocessing.connection import Connection
22import os
23from pathlib import Path
24import shutil
25import sys
26import subprocess
27import time
28from typing import List, Optional
29import unittest
30
31from simpleperf_utils import remove, get_script_dir, AdbHelper, is_windows, bytes_to_str
32
33INFERNO_SCRIPT = str(Path(__file__).parents[1] / ('inferno.bat' if is_windows() else 'inferno.sh'))
34
35
36class TestHelper:
37    """ Keep global test options. """
38
39    @classmethod
40    def init(
41            cls, test_dir: str, testdata_dir: str, use_browser: bool, ndk_path: Optional[str],
42            device_serial_number: Optional[str],
43            progress_conn: Optional[Connection]):
44        """
45            When device_serial_number is None, no Android device is used.
46            When device_serial_number is '', use the default Android device.
47            When device_serial_number is not empty, select Android device by serial number.
48        """
49        cls.script_dir = Path(__file__).resolve().parents[1]
50        cls.test_base_dir = Path(test_dir).resolve()
51        cls.test_base_dir.mkdir(parents=True, exist_ok=True)
52        cls.testdata_dir = Path(testdata_dir).resolve()
53        cls.browser_option = [] if use_browser else ['--no_browser']
54        cls.ndk_path = ndk_path
55        cls.progress_conn = progress_conn
56
57        # Logs can come from multiple processes. So use append mode to avoid overwrite.
58        cls.log_fh = open(cls.test_base_dir / 'test.log', 'a')
59        logging.getLogger().handlers.clear()
60        logging.getLogger().addHandler(logging.StreamHandler(cls.log_fh))
61        os.close(sys.stderr.fileno())
62        os.dup2(cls.log_fh.fileno(), sys.stderr.fileno())
63
64        if device_serial_number is not None:
65            if device_serial_number:
66                os.environ['ANDROID_SERIAL'] = device_serial_number
67            cls.adb = AdbHelper(enable_switch_to_root=True)
68            cls.android_version = cls.adb.get_android_version()
69            cls.device_features = None
70
71    @classmethod
72    def log(cls, s: str):
73        cls.log_fh.write(s + '\n')
74        # Child processes can also write to log file, so flush it immediately to keep the order.
75        cls.log_fh.flush()
76
77    @classmethod
78    def testdata_path(cls, testdata_name: str) -> str:
79        """ Return the path of a test data. """
80        return str(cls.testdata_dir / testdata_name)
81
82    @classmethod
83    def get_test_dir(cls, test_name: str) -> Path:
84        """ Return the dir to run a test. """
85        return cls.test_base_dir / test_name
86
87    @classmethod
88    def script_path(cls, script_name: str) -> str:
89        """ Return the dir of python scripts. """
90        return str(cls.script_dir / script_name)
91
92    @classmethod
93    def get_device_features(cls):
94        if cls.device_features is None:
95            args = [sys.executable, cls.script_path(
96                'run_simpleperf_on_device.py'), 'list', '--show-features']
97            output = subprocess.check_output(args, stderr=TestHelper.log_fh)
98            output = bytes_to_str(output)
99            cls.device_features = output.split()
100        return cls.device_features
101
102    @classmethod
103    def is_trace_offcpu_supported(cls):
104        return 'trace-offcpu' in cls.get_device_features()
105
106    @classmethod
107    def get_32bit_abi(cls):
108        return cls.adb.get_property('ro.product.cpu.abilist32').strip().split(',')[0]
109
110    @classmethod
111    def write_progress(cls, progress: str):
112        if cls.progress_conn:
113            cls.progress_conn.send(progress)
114
115
116class TestBase(unittest.TestCase):
117    def setUp(self):
118        """ Run each test in a separate dir. """
119        self.test_dir = TestHelper.get_test_dir(
120            '%s.%s' % (self.__class__.__name__, self._testMethodName))
121        self.test_dir.mkdir()
122        os.chdir(self.test_dir)
123        TestHelper.log('begin test %s.%s' % (self.__class__.__name__, self._testMethodName))
124
125    def run(self, result=None):
126        start_time = time.time()
127        ret = super(TestBase, self).run(result)
128        if result.errors and result.errors[-1][0] == self:
129            status = 'FAILED'
130            err_info = result.errors[-1][1]
131        elif result.failures and result.failures[-1][0] == self:
132            status = 'FAILED'
133            err_info = result.failures[-1][1]
134        else:
135            status = 'OK'
136
137        time_taken = time.time() - start_time
138        TestHelper.log(
139            'end test %s.%s %s (%.3fs)' %
140            (self.__class__.__name__, self._testMethodName, status, time_taken))
141        if status != 'OK':
142            TestHelper.log(err_info)
143
144        # Remove test data for passed tests to save space.
145        if status == 'OK':
146            remove(self.test_dir)
147        TestHelper.write_progress(
148            '%s.%s  %s' % (self.__class__.__name__, self._testMethodName, status))
149        return ret
150
151    def run_cmd(self, args: List[str], return_output=False, drop_output=True) -> str:
152        if args[0] == 'report_html.py' or args[0] == INFERNO_SCRIPT:
153            args += TestHelper.browser_option
154        if TestHelper.ndk_path:
155            if args[0] in ['app_profiler.py', 'binary_cache_builder.py', 'pprof_proto_generator.py',
156                           'report_html.py']:
157                args += ['--ndk_path', TestHelper.ndk_path]
158        if args[0].endswith('.py'):
159            args = [sys.executable, TestHelper.script_path(args[0])] + args[1:]
160        use_shell = args[0].endswith('.bat')
161        try:
162            if return_output:
163                stdout_fd = subprocess.PIPE
164                drop_output = False
165            elif drop_output:
166                stdout_fd = subprocess.DEVNULL
167            else:
168                stdout_fd = None
169
170            subproc = subprocess.Popen(args, stdout=stdout_fd,
171                                       stderr=TestHelper.log_fh, shell=use_shell)
172            stdout_data, _ = subproc.communicate()
173            output_data = bytes_to_str(stdout_data)
174            returncode = subproc.returncode
175
176        except OSError:
177            returncode = None
178        self.assertEqual(returncode, 0, msg="failed to run cmd: %s" % args)
179        if return_output:
180            return output_data
181        return ''
182
183    def check_strings_in_file(self, filename, strings):
184        self.check_exist(filename=filename)
185        with open(filename, 'r') as fh:
186            self.check_strings_in_content(fh.read(), strings)
187
188    def check_exist(self, filename=None, dirname=None):
189        if filename:
190            self.assertTrue(os.path.isfile(filename), filename)
191        if dirname:
192            self.assertTrue(os.path.isdir(dirname), dirname)
193
194    def check_strings_in_content(self, content, strings):
195        fulfilled = [content.find(s) != -1 for s in strings]
196        self.check_fulfilled_entries(fulfilled, strings)
197
198    def check_fulfilled_entries(self, fulfilled, entries):
199        failed_entries = []
200        for ok, entry in zip(fulfilled, entries):
201            if not ok:
202                failed_entries.append(entry)
203
204        if failed_entries:
205            self.fail('failed in below entries: %s' % (failed_entries,))
206