#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.controllers import adb

from vts.utils.python.common import list_utils
from vts.utils.python.os import path_utils

from vts.testcases.template.llvmfuzzer_test import llvmfuzzer_test_config as config


class LLVMFuzzerTest(base_test.BaseTestClass):
    """Runs fuzzer tests on target.

    Attributes:
        _dut: AndroidDevice, the device under test (the first entry of
              self.android_devices).
        _testcases: string list, list of testcases to run
        start_vts_agents: whether to start vts agents when registering new
                          android devices.
    """
    start_vts_agents = False

    def setUpClass(self):
        """Reads user params, selects the device and creates the test dir."""
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
            config.ConfigKeys.FUZZER_CONFIGS
        ]
        self.getUserParams(required_params)

        # A real list (not a lazy map object) so that it can be logged and
        # iterated more than once on both Python 2 and Python 3.
        self._testcases = [str(name) for name in self.fuzzer_configs.keys()]

        logging.debug("Testcases: %s", self._testcases)
        logging.debug("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                      self.data_file_path)
        logging.debug("%s: %s", config.ConfigKeys.FUZZER_CONFIGS,
                      self.fuzzer_configs)

        self._dut = self.android_devices[0]
        self._dut.adb.shell("mkdir %s -p" % config.FUZZER_TEST_DIR)

    def tearDownClass(self):
        """Deletes all copied data."""
        self._dut.adb.shell("rm -rf %s" % config.FUZZER_TEST_DIR)

    def PushFiles(self, testcase):
        """adb pushes testcase file to target.

        Args:
            testcase: string, path to executable fuzzer.
        """
        push_src = os.path.join(self.data_file_path, config.FUZZER_SRC_DIR,
                                testcase)
        self._dut.adb.push("%s %s" % (push_src, config.FUZZER_TEST_DIR))
        logging.debug("Adb pushed: %s", testcase)

    def CreateFuzzerFlags(self, fuzzer_config):
        """Creates flags for the fuzzer executable.

        The result has the form "<vts flags> -- <llvm libfuzzer flags>".

        Args:
            fuzzer_config: dict, contains configuration for the fuzzer.

        Returns:
            string, command line flags for fuzzer executable.
        """

        def _SerializeVTSFuzzerParams(params):
            """Creates VTS command line flags for fuzzer executable.

            Args:
                params: dict, contains flags and their values.

            Returns:
                string, of form "--<flag0>=<val0> --<flag1>=<val1> ... "
            """
            VTS_SPEC_FILES = "vts_spec_files"
            VTS_EXEC_SIZE = "vts_exec_size"
            DELIMITER = ":"

            # vts_spec_files is a string list, will be serialized like this:
            # [a, b, c] -> "a:b:c"
            vts_spec_files = params.get(VTS_SPEC_FILES, [])
            target_vts_spec_files = DELIMITER.join(
                path_utils.JoinTargetPath(config.FUZZER_SPEC_DIR, spec_file)
                for spec_file in vts_spec_files)
            flags = "--%s=\"%s\" " % (VTS_SPEC_FILES, target_vts_spec_files)

            # NOTE(review): when vts_exec_size is absent this emits the
            # literal "--vts_exec_size={}"; kept as-is because the fuzzer
            # binary's handling of a missing flag is not visible here.
            vts_exec_size = params.get(VTS_EXEC_SIZE, {})
            flags += "--%s=%s" % (VTS_EXEC_SIZE, vts_exec_size)
            return flags

        def _SerializeLLVMFuzzerParams(params):
            """Creates LLVM libfuzzer command line flags for fuzzer executable.

            Args:
                params: dict, contains flags and their values.

            Returns:
                string, of form "-<flag0>=<val0> -<flag1>=<val1> ..."
            """
            return " ".join(["-%s=%s" % (k, v) for k, v in params.items()])

        vts_fuzzer_params = fuzzer_config.get("vts_fuzzer_params", {})

        # Start from a copy of the defaults so the shared config dict is
        # never mutated, then let the per-fuzzer values override them.
        llvmfuzzer_params = config.FUZZER_PARAMS.copy()
        llvmfuzzer_params.update(fuzzer_config.get("llvmfuzzer_params", {}))

        vts_fuzzer_flags = _SerializeVTSFuzzerParams(vts_fuzzer_params)
        llvmfuzzer_flags = _SerializeLLVMFuzzerParams(llvmfuzzer_params)

        return vts_fuzzer_flags + " -- " + llvmfuzzer_flags

    def CreateCorpus(self, fuzzer, fuzzer_config):
        """Creates a corpus directory on target.

        Each entry of the "corpus" list in fuzzer_config is written to its
        own file named "input<index>" inside the corpus directory.

        Args:
            fuzzer: string, name of the fuzzer executable.
            fuzzer_config: dict, contains configuration for the fuzzer.

        Returns:
            string, path to corpus directory on the target.
        """
        corpus = fuzzer_config.get("corpus", [])
        corpus_dir = path_utils.JoinTargetPath(config.FUZZER_TEST_DIR,
                                               "%s_corpus" % fuzzer)

        self._dut.adb.shell("mkdir %s -p" % corpus_dir)
        for idx, corpus_entry in enumerate(corpus):
            # Every "x" becomes "\x" so that `echo -e` interprets the entry
            # as hex escapes (presumably entries look like "x01x02" --
            # confirm against the test configs).
            corpus_entry = corpus_entry.replace("x", "\\x")
            corpus_entry_file = path_utils.JoinTargetPath(
                corpus_dir, "input%s" % idx)
            cmd = "echo -ne '%s' > %s" % (str(corpus_entry), corpus_entry_file)
            # Vts shell drive doesn't play nicely with escape characters,
            # so we use adb shell.
            self._dut.adb.shell("\"%s\"" % cmd)

        return corpus_dir

    def RunTestcase(self, fuzzer):
        """Runs the given testcase and asserts the result.

        Pushes the fuzzer to the device, prepares its flags and corpus,
        makes it executable, runs it through adb shell and hands the
        collected result to AssertTestResult.

        Args:
            fuzzer: string, name of fuzzer executable.
        """
        self.PushFiles(fuzzer)

        fuzzer_config = self.fuzzer_configs.get(fuzzer, {})
        test_flags = self.CreateFuzzerFlags(fuzzer_config)
        corpus_dir = self.CreateCorpus(fuzzer, fuzzer_config)

        chmod_cmd = "chmod -R 755 %s" % path_utils.JoinTargetPath(
            config.FUZZER_TEST_DIR, fuzzer)
        self._dut.adb.shell(chmod_cmd)

        cd_cmd = "cd %s" % config.FUZZER_TEST_DIR
        ld_path = "LD_LIBRARY_PATH=/data/local/tmp/64:/data/local/tmp/32:$LD_LIBRARY_PATH"
        test_cmd = "./%s" % fuzzer

        fuzz_cmd = "%s && %s %s %s %s > /dev/null" % (cd_cmd, ld_path,
                                                      test_cmd, corpus_dir,
                                                      test_flags)
        logging.debug("Executing: %s", fuzz_cmd)
        # TODO(trong): vts shell doesn't handle timeouts properly, change this after it does.
        try:
            stdout = self._dut.adb.shell("'%s'" % fuzz_cmd)
            result = {
                const.STDOUT: stdout,
                const.STDERR: "",
                const.EXIT_CODE: 0
            }
        except adb.AdbError as e:
            # A failed shell command surfaces as AdbError; translate it into
            # the same result shape used for the success path.
            result = {
                const.STDOUT: e.stdout,
                const.STDERR: e.stderr,
                const.EXIT_CODE: e.ret_code
            }
        self.AssertTestResult(fuzzer, result)

    def LogCrashReport(self, fuzzer):
        """Logs crash-causing fuzzer input.

        Reads the crash report file and logs the contents in format:
        "\\x01\\x23\\x45\\x67\\x89\\xab\\xcd\\xef"

        Args:
            fuzzer: string, name of fuzzer executable.
        """
        cmd = "xxd -p %s" % config.FUZZER_TEST_CRASH_REPORT

        # output is string of a hexdump from crash report file.
        # From the example above, output would be "0123456789abcdef".
        output = self._dut.adb.shell(cmd)
        for char in ("\r", "\t", "\n", " "):
            output = output.replace(char, "")

        # output is guaranteed to be even in length since it's a hexdump;
        # prefix every two-character byte with "\x".
        crash_report = "".join(
            "\\x%s" % output[offset:offset + 2]
            for offset in range(0, len(output), 2))

        logging.debug('FUZZER_TEST_CRASH_REPORT for %s: "%s"', fuzzer,
                      crash_report)

    # TODO(trong): differentiate between crashes and sanitizer rule violations.
    def AssertTestResult(self, fuzzer, result):
        """Asserts that testcase finished as expected.

        Checks that device is in responsive state. If not, waits for boot
        then reports test as failure. If it is, asserts that all test commands
        returned exit code 0.

        Args:
            fuzzer: string, name of fuzzer executable.
            result: dict(str, str, int), command results from shell.
        """
        logging.debug("Test result: %s", result)
        if not self._dut.hasBooted():
            self._dut.waitForBootCompletion()
            asserts.fail("%s left the device in unresponsive state." % fuzzer)

        exit_code = result[const.EXIT_CODE]
        if exit_code == config.ExitCode.FUZZER_TEST_FAIL:
            # The fuzzer itself reported a failure: log the offending input
            # before failing the test.
            self.LogCrashReport(fuzzer)
            asserts.fail("%s failed normally." % fuzzer)
        elif exit_code != config.ExitCode.FUZZER_TEST_PASS:
            asserts.fail("%s failed abnormally." % fuzzer)

    def generateFuzzerTests(self):
        """Runs fuzzer tests, one generated test per configured fuzzer."""
        self.runGeneratedTests(
            test_func=self.RunTestcase,
            settings=self._testcases,
            name_func=lambda x: x.split("/")[-1])


if __name__ == "__main__":
    test_runner.main()