1#
2# Copyright (C) 2016 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import logging
18import os
19
20from vts.runners.host import asserts
21from vts.runners.host import base_test
22from vts.runners.host import const
23from vts.runners.host import keys
24from vts.runners.host import test_runner
25from vts.utils.python.controllers import adb
26
27from vts.utils.python.common import list_utils
28from vts.utils.python.os import path_utils
29
30from vts.testcases.template.llvmfuzzer_test import llvmfuzzer_test_config as config
31
32
33class LLVMFuzzerTest(base_test.BaseTestClass):
34    """Runs fuzzer tests on target.
35
36    Attributes:
37        _dut: AndroidDevice, the device under test as config
38        _testcases: string list, list of testcases to run
39        start_vts_agents: whether to start vts agents when registering new
40                          android devices.
41    """
42    start_vts_agents = False
43
44    def setUpClass(self):
45        """Creates a remote shell instance, and copies data files."""
46        required_params = [
47            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
48            config.ConfigKeys.FUZZER_CONFIGS
49        ]
50        self.getUserParams(required_params)
51
52        self._testcases = map(lambda x: str(x), self.fuzzer_configs.keys())
53
54        logging.debug("Testcases: %s", self._testcases)
55        logging.debug("%s: %s", keys.ConfigKeys.IKEY_DATA_FILE_PATH,
56                     self.data_file_path)
57        logging.debug("%s: %s", config.ConfigKeys.FUZZER_CONFIGS,
58                     self.fuzzer_configs)
59
60        self._dut = self.android_devices[0]
61        self._dut.adb.shell("mkdir %s -p" % config.FUZZER_TEST_DIR)
62
63    def tearDownClass(self):
64        """Deletes all copied data."""
65        self._dut.adb.shell("rm -rf %s" % config.FUZZER_TEST_DIR)
66
67    def PushFiles(self, testcase):
68        """adb pushes testcase file to target.
69
70        Args:
71            testcase: string, path to executable fuzzer.
72        """
73        push_src = os.path.join(self.data_file_path, config.FUZZER_SRC_DIR,
74                                testcase)
75        self._dut.adb.push("%s %s" % (push_src, config.FUZZER_TEST_DIR))
76        logging.debug("Adb pushed: %s", testcase)
77
78    def CreateFuzzerFlags(self, fuzzer_config):
79        """Creates flags for the fuzzer executable.
80
81        Args:
82            fuzzer_config: dict, contains configuration for the fuzzer.
83
84        Returns:
85            string, command line flags for fuzzer executable.
86        """
87
88        def _SerializeVTSFuzzerParams(params):
89            """Creates VTS command line flags for fuzzer executable.
90
91            Args:
92                params: dict, contains flags and their values.
93
94            Returns:
95                string, of form "--<flag0>=<val0> --<flag1>=<val1> ... "
96            """
97            VTS_SPEC_FILES = "vts_spec_files"
98            VTS_EXEC_SIZE = "vts_exec_size"
99            DELIMITER = ":"
100
101            # vts_spec_files is a string list, will be serialized like this:
102            # [a, b, c] -> "a:b:c"
103            vts_spec_files = params.get(VTS_SPEC_FILES, {})
104            target_vts_spec_files = DELIMITER.join(map(
105                lambda x: path_utils.JoinTargetPath(config.FUZZER_SPEC_DIR, x),
106                vts_spec_files))
107            flags = "--%s=\"%s\" " % (VTS_SPEC_FILES, target_vts_spec_files)
108
109            vts_exec_size = params.get(VTS_EXEC_SIZE, {})
110            flags += "--%s=%s" % (VTS_EXEC_SIZE, vts_exec_size)
111            return flags
112
113        def _SerializeLLVMFuzzerParams(params):
114            """Creates LLVM libfuzzer command line flags for fuzzer executable.
115
116            Args:
117                params: dict, contains flags and their values.
118
119            Returns:
120                string, of form "--<flag0>=<val0> --<flag1>=<val1> ... "
121            """
122            return " ".join(["-%s=%s" % (k, v) for k, v in params.items()])
123
124        vts_fuzzer_params = fuzzer_config.get("vts_fuzzer_params", {})
125
126        llvmfuzzer_params = config.FUZZER_PARAMS.copy()
127        llvmfuzzer_params.update(fuzzer_config.get("llvmfuzzer_params", {}))
128
129        vts_fuzzer_flags = _SerializeVTSFuzzerParams(vts_fuzzer_params)
130        llvmfuzzer_flags = _SerializeLLVMFuzzerParams(llvmfuzzer_params)
131
132        return vts_fuzzer_flags + " -- " + llvmfuzzer_flags
133
134    def CreateCorpus(self, fuzzer, fuzzer_config):
135        """Creates a corpus directory on target.
136
137        Args:
138            fuzzer: string, name of the fuzzer executable.
139            fuzzer_config: dict, contains configuration for the fuzzer.
140
141        Returns:
142            string, path to corpus directory on the target.
143        """
144        corpus = fuzzer_config.get("corpus", [])
145        corpus_dir = path_utils.JoinTargetPath(config.FUZZER_TEST_DIR,
146                                               "%s_corpus" % fuzzer)
147
148        self._dut.adb.shell("mkdir %s -p" % corpus_dir)
149        for idx, corpus_entry in enumerate(corpus):
150            corpus_entry = corpus_entry.replace("x", "\\x")
151            corpus_entry_file = path_utils.JoinTargetPath(
152                corpus_dir, "input%s" % idx)
153            cmd = "echo -ne '%s' > %s" % (str(corpus_entry), corpus_entry_file)
154            # Vts shell drive doesn't play nicely with escape characters,
155            # so we use adb shell.
156            self._dut.adb.shell("\"%s\"" % cmd)
157
158        return corpus_dir
159
160    def RunTestcase(self, fuzzer):
161        """Runs the given testcase and asserts the result.
162
163        Args:
164            fuzzer: string, name of fuzzer executable.
165        """
166        self.PushFiles(fuzzer)
167
168        fuzzer_config = self.fuzzer_configs.get(fuzzer, {})
169        test_flags = self.CreateFuzzerFlags(fuzzer_config)
170        corpus_dir = self.CreateCorpus(fuzzer, fuzzer_config)
171
172        chmod_cmd = "chmod -R 755 %s" % path_utils.JoinTargetPath(
173            config.FUZZER_TEST_DIR, fuzzer)
174        self._dut.adb.shell(chmod_cmd)
175
176        cd_cmd = "cd %s" % config.FUZZER_TEST_DIR
177        ld_path = "LD_LIBRARY_PATH=/data/local/tmp/64:/data/local/tmp/32:$LD_LIBRARY_PATH"
178        test_cmd = "./%s" % fuzzer
179
180        fuzz_cmd = "%s && %s %s %s %s > /dev/null" % (cd_cmd, ld_path,
181                                                      test_cmd, corpus_dir,
182                                                      test_flags)
183        logging.debug("Executing: %s", fuzz_cmd)
184        # TODO(trong): vts shell doesn't handle timeouts properly, change this after it does.
185        try:
186            stdout = self._dut.adb.shell("'%s'" % fuzz_cmd)
187            result = {
188                const.STDOUT: stdout,
189                const.STDERR: "",
190                const.EXIT_CODE: 0
191            }
192        except adb.AdbError as e:
193            result = {
194                const.STDOUT: e.stdout,
195                const.STDERR: e.stderr,
196                const.EXIT_CODE: e.ret_code
197            }
198        self.AssertTestResult(fuzzer, result)
199
200    def LogCrashReport(self, fuzzer):
201        """Logs crash-causing fuzzer input.
202
203        Reads the crash report file and logs the contents in format:
204        "\x01\x23\x45\x67\x89\xab\xcd\xef"
205
206        Args:
207            fuzzer: string, name of fuzzer executable.
208        """
209        cmd = "xxd -p %s" % config.FUZZER_TEST_CRASH_REPORT
210
211        # output is string of a hexdump from crash report file.
212        # From the example above, output would be "0123456789abcdef".
213        output = self._dut.adb.shell(cmd)
214        remove_chars = ["\r", "\t", "\n", " "]
215        for char in remove_chars:
216            output = output.replace(char, "")
217
218        crash_report = ""
219        # output is guaranteed to be even in length since its a hexdump.
220        for offset in xrange(0, len(output), 2):
221            crash_report += "\\x%s" % output[offset:offset + 2]
222
223        logging.debug('FUZZER_TEST_CRASH_REPORT for %s: "%s"', fuzzer,
224                     crash_report)
225
226    # TODO(trong): differentiate between crashes and sanitizer rule violations.
227    def AssertTestResult(self, fuzzer, result):
228        """Asserts that testcase finished as expected.
229
230        Checks that device is in responsive state. If not, waits for boot
231        then reports test as failure. If it is, asserts that all test commands
232        returned exit code 0.
233
234        Args:
235            fuzzer: string, name of fuzzer executable.
236            result: dict(str, str, int), command results from shell.
237        """
238        logging.debug("Test result: %s" % result)
239        if not self._dut.hasBooted():
240            self._dut.waitForBootCompletion()
241            asserts.fail("%s left the device in unresponsive state." % fuzzer)
242
243        exit_code = result[const.EXIT_CODE]
244        if exit_code == config.ExitCode.FUZZER_TEST_FAIL:
245            self.LogCrashReport(fuzzer)
246            asserts.fail("%s failed normally." % fuzzer)
247        elif exit_code != config.ExitCode.FUZZER_TEST_PASS:
248            asserts.fail("%s failed abnormally." % fuzzer)
249
250    def generateFuzzerTests(self):
251        """Runs fuzzer tests."""
252        self.runGeneratedTests(
253            test_func=self.RunTestcase,
254            settings=self._testcases,
255            name_func=lambda x: x.split("/")[-1])
256
257
258if __name__ == "__main__":
259    test_runner.main()
260