#!/usr/bin/env python
#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os
import shutil

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.controllers import adb
from vts.utils.python.controllers import android_device
from vts.utils.python.common import list_utils

from vts.testcases.fuzz.template.libfuzzer_test import libfuzzer_test_config as config
from vts.testcases.fuzz.template.libfuzzer_test.libfuzzer_test_case import LibFuzzerTestCase


class LibFuzzerTest(base_test.BaseTestClass):
    """Runs LLVM libfuzzer tests on target.

    Attributes:
        _dut: AndroidDevice, the device under test as config.
        _temp_dir: string, host-side scratch directory used for corpus files.
        _corpus_manager: object used to fetch/upload corpus seeds.

    NOTE(review): _temp_dir and _corpus_manager are read by several methods
    below but are never assigned anywhere in this file. They presumably have
    to be set up elsewhere (e.g. by a subclass or a missing part of
    setUpClass) -- confirm before relying on the corpus related methods.
    """

    def setUpClass(self):
        """Reads the required user parameters and prepares the target.

        Stops the device under test and creates the fuzzer working directory
        on it.
        """
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
            keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
        ]
        # Presumably populates self.data_file_path and
        # self.binary_test_source, which are read right below.
        self.getUserParams(required_params)

        logging.info('%s: %s', keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                     self.data_file_path)
        logging.info('%s: %s', keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
                     self.binary_test_source)

        self._dut = self.android_devices[0]
        self._dut.stop()
        self._dut.adb.shell('mkdir %s -p' % config.FUZZER_TEST_DIR)

    def tearDownClass(self):
        """Deletes all copied data, restarts the device, removes temp files.

        This class used to define tearDownClass twice; the second definition
        silently replaced the first one, so the target was never cleaned up
        nor started again. Both responsibilities are handled here, and the
        host-side cleanup runs even when the device-side cleanup fails.
        """
        try:
            self._dut.adb.shell('rm -rf %s' % config.FUZZER_TEST_DIR)
        finally:
            try:
                # Counterpart of the self._dut.stop() call in setUpClass.
                self._dut.start()
            finally:
                self._RemoveTempDir()

    def _RemoveTempDir(self):
        """Removes the temporary directory used for corpus management."""
        temp_dir = getattr(self, '_temp_dir', None)
        if temp_dir is None:
            # Nothing was created on the host, nothing to delete.
            return
        logging.debug('Temporary directory %s is being deleted', temp_dir)
        try:
            shutil.rmtree(temp_dir)
        except OSError as e:
            logging.exception(e)

    def PushFiles(self, src):
        """adb pushes test case file to target.

        Args:
            src: string, path of the file relative to self.data_file_path.

        Returns:
            string, the destination directory on the target.
        """
        push_src = os.path.join(self.data_file_path, src)
        push_dst = config.FUZZER_TEST_DIR
        # no_except=True: a failed push does not raise here.
        self._dut.adb.push('%s %s' % (push_src, push_dst), no_except=True)
        logging.info('Adb pushed: %s \nto: %s', push_src, push_dst)
        return push_dst

    def CreateTestCases(self):
        """Creates LibFuzzerTestCase instances.

        Returns:
            LibFuzzerTestCase list, one entry per item of
            self.binary_test_source.
        """
        # A list comprehension (rather than map) guarantees a real list on
        # both Python 2 and Python 3, as documented above.
        return [
            LibFuzzerTestCase(source, config.FUZZER_DEFAULT_PARAMS, {})
            for source in self.binary_test_source
        ]

    def CreateCorpusOut(self, test_case):
        """Creates corpus output directory on the target.

        Args:
            test_case: LibFuzzerTestCase object, current test case.

        Raises:
            AdbError when there is an error in adb operations.
        """
        corpus_out = test_case.GetCorpusOutDir()
        self._dut.adb.shell('mkdir %s -p' % corpus_out)

    def RetrieveCorpusSeed(self, test_case):
        """Retrieves corpus seed directory from GCS to the target.

        If a non-empty seed directory was fetched to the host it is pushed
        to the target; otherwise an empty seed directory is created on the
        target.

        Args:
            test_case: LibFuzzerTestCase object, current test case.

        Raises:
            AdbError when there is an error in adb operations.

        Returns:
            inuse_seed, the file path of the inuse seed in GCS, if fetch
            succeeded. None, otherwise.
        """
        inuse_seed = self._corpus_manager.FetchCorpusSeed(
            test_case._test_name, self._temp_dir)
        local_corpus_seed_dir = os.path.join(
            self._temp_dir, '%s_corpus_seed' % test_case._test_name)
        if os.path.exists(local_corpus_seed_dir) and os.listdir(
                local_corpus_seed_dir):
            self._dut.adb.push(local_corpus_seed_dir, config.FUZZER_TEST_DIR)
        else:
            corpus_seed = test_case.GetCorpusSeedDir()
            self._dut.adb.shell('mkdir %s -p' % corpus_seed)
        return inuse_seed

    def AnalyzeGeneratedCorpus(self, test_case):
        """Analyzes the generated corpus body.

        Args:
            test_case: LibFuzzerTestCase object.

        Returns:
            number of entries in the pulled corpus out directory, if that
            directory exists on the host. 0, otherwise.
        """
        logging.info('temporary directory for this test: %s', self._temp_dir)
        pulled_corpus_out_dir = os.path.join(
            self._temp_dir, os.path.basename(test_case.GetCorpusOutDir()))
        if not os.path.exists(pulled_corpus_out_dir):
            logging.error('corput out directory does not exist on the host.')
            return 0

        logging.info('corpus out directory pulled from target: %s',
                     pulled_corpus_out_dir)
        pulled_corpus = os.listdir(pulled_corpus_out_dir)
        logging.debug(pulled_corpus)
        logging.info('generated corpus size: %d', len(pulled_corpus))
        return len(pulled_corpus)

    def EvaluateTestcase(self, test_case, result, inuse_seed):
        """Evaluates the test result and moves the used seed accordingly.

        The seed is moved to 'corpus_complete' when the fuzzer passed, to
        'corpus_crash' when it failed, and to 'corpus_error' for any other
        (or missing) exit code. Nothing is moved when inuse_seed is None.
        This method only classifies the result; it does not fail the test.

        Args:
            test_case: LibFuzzerTestCase object.
            result: a result dict object returned by the adb shell command.
            inuse_seed: the seed used as input to this test case.
        """
        # NOTE(review): AssertTestResult reads the exit code through
        # const.EXIT_CODE while this method uses the literal key below;
        # presumably both name the same entry -- confirm.
        return_codes = result.get('return_codes', None)
        if return_codes == config.ExitCode.FUZZER_TEST_PASS:
            logging.info(
                'adb shell fuzzing command exited normally with exitcode %d.',
                return_codes)
            destination = 'corpus_complete'
        elif return_codes == config.ExitCode.FUZZER_TEST_FAIL:
            logging.info(
                'adb shell fuzzing command exited normally with exitcode %d.',
                return_codes)
            destination = 'corpus_crash'
        else:
            logging.error('adb shell fuzzing command exited abnormally.')
            destination = 'corpus_error'

        if inuse_seed is not None:
            self._corpus_manager.InuseToDest(test_case._test_name,
                                             inuse_seed, destination)

    def RunTestcase(self, test_case):
        """Runs the given test case and asserts the result.

        Pushes the fuzzer binary, prepares the corpus directories, runs the
        fuzzer on the target, pulls the crash report and generated corpus
        back to the host and finally evaluates and asserts the result.

        Args:
            test_case: LibFuzzerTestCase object.
        """
        self.PushFiles(test_case.bin_host_path)
        self.CreateCorpusOut(test_case)
        inuse_seed = self.RetrieveCorpusSeed(test_case)
        if inuse_seed == 'locked':
            # skip this test case
            logging.warning('test case locked, skipping testcase %s.',
                            test_case.test_name)
            return

        fuzz_cmd = '"%s"' % test_case.GetRunCommand()

        # Stays empty when the shell call raises; the consumers below
        # tolerate missing keys.
        result = {}
        try:
            result = self._dut.adb.shell(fuzz_cmd, no_except=True)
        except adb.AdbError as e:
            logging.exception(e)

        corpus_trigger_dir = os.path.join(self._temp_dir,
                                          test_case.GetCorpusTriggerDir())
        # os.makedirs raises if the directory is already there.
        if not os.path.isdir(corpus_trigger_dir):
            os.makedirs(corpus_trigger_dir)
        try:
            self._dut.adb.pull(config.FUZZER_TEST_CRASH_REPORT,
                               corpus_trigger_dir)
        except adb.AdbError as e:
            logging.exception(e)
            logging.error('crash report was not created during test run.')

        try:
            self._dut.adb.pull(test_case.GetCorpusOutDir(), self._temp_dir)
            self.AnalyzeGeneratedCorpus(test_case)
            self._corpus_manager.UploadCorpusOutDir(test_case._test_name,
                                                    self._temp_dir)
        except adb.AdbError as e:
            logging.exception(e)
            logging.error('Device failed. Removing lock from GCS.')
            self._corpus_manager.remove_lock(test_case._test_name)

        # Compare by value: identity comparison against a string literal
        # only works by accident of interning.
        if inuse_seed != 'directory':
            self.EvaluateTestcase(test_case, result, inuse_seed)
        self.AssertTestResult(test_case, result)

    def LogCrashReport(self, test_case):
        r"""Logs crash-causing fuzzer input.

        Reads the crash report file and logs the contents in format:
        '\x01\x23\x45\x67\x89\xab\xcd\xef'

        Args:
            test_case: LibFuzzerTestCase object
        """
        # Make sure the file exists so that the hexdump below does not fail.
        touch_cmd = 'touch %s' % config.FUZZER_TEST_CRASH_REPORT
        self._dut.adb.shell(touch_cmd)

        # output is string of a hexdump from crash report file.
        # From the example above, output would be '0123456789abcdef'.
        xxd_cmd = 'xxd -p %s' % config.FUZZER_TEST_CRASH_REPORT
        output = self._dut.adb.shell(xxd_cmd)
        for char in ('\r', '\t', '\n', ' '):
            output = output.replace(char, '')

        # output is guaranteed to be even in length since its a hexdump.
        crash_report = ''.join(
            '\\x%s' % output[offset:offset + 2]
            for offset in range(0, len(output), 2))

        logging.info('FUZZER_TEST_CRASH_REPORT for %s: "%s"',
                     test_case.test_name, crash_report)

    # TODO(trong): differentiate between crashes and sanitizer rule violations.
    def AssertTestResult(self, test_case, result):
        """Asserts that test case finished as expected.

        Checks that device is in responsive state. If not, waits for boot
        then reports test as failure. If it is, asserts that all test commands
        returned exit code 0.

        A result without an exit code (e.g. the empty dict left behind when
        the shell command raised) is reported as an abnormal failure.

        Args:
            test_case: LibFuzzerTestCase object
            result: dict(str, str, int), command results from shell.
        """
        exit_code = result.get(const.EXIT_CODE)
        logging.info('Test case results.')
        logging.info('stdout: %s', result.get(const.STDOUT))
        logging.info('stderr: %s', result.get(const.STDERR))
        logging.info('exit code: %s', exit_code)
        if not self._dut.hasBooted():
            self._dut.waitForBootCompletion()
            asserts.fail('%s left the device in unresponsive state.' %
                         test_case.test_name)

        if exit_code == config.ExitCode.FUZZER_TEST_FAIL:
            #TODO(b/64123979): once normal fail happens, examine.
            self.LogCrashReport(test_case)
            asserts.fail('%s failed normally.' % test_case.test_name)
        elif exit_code != config.ExitCode.FUZZER_TEST_PASS:
            asserts.fail('%s failed abnormally.' % test_case.test_name)

    def generateFuzzerTests(self):
        """Runs fuzzer tests."""
        self.runGeneratedTests(
            test_func=self.RunTestcase,
            settings=self.CreateTestCases(),
            name_func=lambda x: x.test_name)


if __name__ == '__main__':
    test_runner.main()