#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import shutil
import tempfile

import environment_variables as env
import syzkaller_test_case

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import config_parser
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.common import cmd_utils
from vts.utils.python.controllers import adb
from vts.utils.python.controllers import android_device
from vts.utils.python.gcs import gcs_api_utils


class SyzkallerTest(base_test.BaseTestClass):
    """Runs Syzkaller tests on target."""

    start_vts_agents = False

    def setUpClass(self):
        """Reads the required parameters and prepares the working environment.

        Creates a fresh temporary directory, derives the Syzkaller paths that
        live inside it, builds the GCS helper and selects the first device.

        Attributes:
            _env: dict, a mapping from key to environment path of this test.
            _gcs_api_utils: a GcsApiUtils object used for uploading logs.
            _dut: an android device controller object used for getting device info.
        """

        required_params = [
            keys.ConfigKeys.IKEY_SERVICE_JSON_PATH,
            keys.ConfigKeys.IKEY_FUZZING_GCS_BUCKET_NAME,
            keys.ConfigKeys.IKEY_SYZKALLER_PACKAGES_PATH,
            keys.ConfigKeys.IKEY_SYZKALLER_TEMPLATE_PATH
        ]
        # NOTE(review): presumably this is what populates the
        # self.service_key_json_path / self.syzkaller_*_path attributes used
        # below -- confirm against BaseTestClass.getUserParams.
        self.getUserParams(required_params)

        # Everything the test produces lives under one temporary directory so
        # that tearDownClass can remove it in a single call.
        _temp_dir = tempfile.mkdtemp()
        self._env = dict()
        self._env['temp_dir'] = _temp_dir
        self._env['syzkaller_dir'] = os.path.join(_temp_dir, env.syzkaller_dir)
        self._env['syzkaller_bin_dir'] = os.path.join(_temp_dir,
                                                      env.syzkaller_bin_dir)
        self._env['syzkaller_work_dir'] = os.path.join(_temp_dir,
                                                       env.syzkaller_work_dir)
        self._env['template_cfg'] = os.path.join(_temp_dir, env.template_cfg)

        self._gcs_api_utils = gcs_api_utils.GcsApiUtils(
            self.service_key_json_path, self.fuzzing_gcs_bucket_name)
        self._dut = self.android_devices[0]

    def FetchSyzkaller(self):
        """Copies Syzkaller and its template config into the temp directory.

        The package tree at self.syzkaller_packages_path is copied to
        _env['syzkaller_dir'] and the template at self.syzkaller_template_path
        is copied into _env['temp_dir']. If either copy fails, the error is
        logged and all tests are marked to be skipped. Afterwards every file
        under _env['syzkaller_bin_dir'] is given mode 0755.
        """
        try:
            logging.info('Fetching Syzkaller program.')
            shutil.copytree(self.syzkaller_packages_path,
                            self._env['syzkaller_dir'])
            logging.info('Fetching Syzkaller template configuration.')
            shutil.copy(self.syzkaller_template_path, self._env['temp_dir'])
        except (IOError, OSError, shutil.Error) as e:
            # A missing or unreadable source surfaces as OSError or
            # shutil.Error on Python 2 (they are distinct from IOError there),
            # so all three are treated as "program not available".
            logging.exception(e)
            self.skipAllTests(
                'Syzkaller program is not available. Skipping all tests.')

        # os.walk yields nothing for a directory that does not exist, so this
        # loop is a no-op when the copy above failed.
        for root, dirs, files in os.walk(self._env['syzkaller_bin_dir']):
            for filepath in files:
                os.chmod(os.path.join(root, filepath), 0o755)

    def CreateTestCases(self):
        """Create syzkaller test cases.

        Returns:
            test_cases, list, the list of test_cases for this test.
        """
        test_cases = []
        test_cases.append(
            syzkaller_test_case.SyzkallerTestCase(self._env, 'linux/arm64',
                                                  self._dut.serial, 'adb',
                                                  'false', 12))
        return test_cases

    def RunTestCase(self, test_case):
        """Run a syzkaller test case.

        Executes the command returned by the test case, logs stderr when the
        command reports a non-zero exit code and stdout otherwise, then
        reports the result.

        Args:
            test_case: SyzkallerTestCase object, the test case to run.
        """
        test_command = test_case.GetRunCommand()
        # NOTE(review): timeout is presumably in seconds (18000 s = 5 hours);
        # confirm against cmd_utils.ExecuteOneShellCommand.
        stdout, stderr, err_code = cmd_utils.ExecuteOneShellCommand(
            test_command, timeout=18000)
        if err_code:
            logging.error(stderr)
        else:
            logging.info(stdout)
        self.ReportTestCase(test_case)

    def ReportTestCase(self, test_case):
        """Asserts the result of the test case and uploads report to GCS.

        Args:
            test_case: SyzkallerTestCase object, the test case to report.
        """
        try:
            self.AssertTestCaseResult(test_case)
        finally:
            # Upload even when the assertion fails: a failing run is the one
            # that produced crash reports, and the failure would otherwise
            # propagate before the upload is reached.
            self._gcs_api_utils.UploadDir(test_case._work_dir_path,
                                          'kernelfuzz_reports')

    def AssertTestCaseResult(self, test_case):
        """Asserts that test case finished as expected.

        If crash reports were generated during the test, reports test as failure.
        If crash reports were not generated during the test, reports test as success.

        Args:
            test_case: SyzkallerTestCase object, the test case to assert
                       as failure or success.
        """
        logging.info('Test case results.')
        crash_dir = os.path.join(test_case._work_dir_path, 'crashes')
        if not os.listdir(crash_dir):
            logging.info('%s did not cause crash in our device.',
                         test_case._test_name)
        else:
            # Format the message before the call; unlike the logging
            # functions, asserts.fail is not given format arguments here.
            asserts.fail(
                '%s caused crash in our device.' % test_case._test_name)

    def tearDownClass(self):
        """Removes the temporary directory used for Syzkaller."""
        # setUpClass may have aborted before _env was assigned; in that case
        # there is nothing to clean up.
        temp_dir = getattr(self, '_env', {}).get('temp_dir')
        if temp_dir is None:
            return
        logging.debug('Temporary directory %s is being deleted', temp_dir)
        try:
            shutil.rmtree(temp_dir)
        except OSError as e:
            logging.exception(e)

    def generateKernelFuzzerTests(self):
        """Runs kernel fuzzer tests."""
        self.FetchSyzkaller()
        self.runGeneratedTests(
            test_func=self.RunTestCase,
            settings=self.CreateTestCases(),
            name_func=lambda x: x._test_name)


if __name__ == '__main__':
    test_runner.main()