1#!/usr/bin/env python
2#
3# Copyright (C) 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the 'License');
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an 'AS IS' BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import logging
19import os
20
21from vts.runners.host import asserts
22from vts.runners.host import base_test
23from vts.runners.host import const
24from vts.runners.host import keys
25from vts.runners.host import test_runner
26from vts.utils.python.controllers import adb
27from vts.utils.python.controllers import android_device
28from vts.utils.python.common import list_utils
29
30from vts.testcases.fuzz.template.libfuzzer_test import libfuzzer_test_config as config
31from vts.testcases.fuzz.template.libfuzzer_test.libfuzzer_test_case import LibFuzzerTestCase
32
33
34class LibFuzzerTest(base_test.BaseTestClass):
35    """Runs LLVM libfuzzer tests on target.
36
37    Attributes:
38        _dut: AndroidDevice, the device under test as config.
39    """
40
41    def setUpClass(self):
42        """Creates a remote shell instance, and copies data files."""
43        required_params = [
44            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
45            keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
46        ]
47        self.getUserParams(required_params)
48
49        logging.info('%s: %s', keys.ConfigKeys.IKEY_DATA_FILE_PATH,
50                     self.data_file_path)
51        logging.info('%s: %s', keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
52                     self.binary_test_source)
53
54        self._dut = self.registerController(android_device, False)[0]
55        self._dut.stop()
56        self._dut.adb.shell('mkdir %s -p' % config.FUZZER_TEST_DIR)
57
58    def tearDownClass(self):
59        """Deletes all copied data."""
60        self._dut.adb.shell('rm -rf %s' % self.data_file_path)
61        self._dut.start()
62
63    def PushFiles(self, src):
64        """adb pushes test case file to target."""
65        push_src = os.path.join(self.data_file_path, src)
66        push_dst = config.FUZZER_TEST_DIR
67        self._dut.adb.push('%s %s' % (push_src, push_dst))
68        logging.info('Adb pushed: %s \nto: %s', push_src, push_dst)
69        return push_dst
70
71    def CreateTestCases(self):
72        """Creates LibFuzzerTestCase instances.
73
74        Returns:
75            LibFuzzerTestCase list.
76        """
77        test_cases = map(
78            lambda x: LibFuzzerTestCase(x, config.FUZZER_DEFAULT_PARAMS, {}),
79            self.binary_test_source)
80        return test_cases
81
82    # TODO: retrieve the corpus.
83    def CreateCorpusDir(self, test_case):
84        """Creates corpus directory on the target."""
85        corpus_dir = test_case.GetCorpusName()
86        self._dut.adb.shell('mkdir %s -p' % corpus_dir)
87
88    def RunTestcase(self, test_case):
89        """Runs the given test case and asserts the result.
90
91        Args:
92            test_case: LibFuzzerTestCase object
93        """
94        self.PushFiles(test_case.bin_host_path)
95        self.CreateCorpusDir(test_case)
96        fuzz_cmd = test_case.GetRunCommand()
97        logging.info('Executing: %s', fuzz_cmd)
98        try:
99            stdout = self._dut.adb.shell('"%s"' % fuzz_cmd)
100            result = {
101                const.STDOUT: stdout,
102                const.STDERR: '',
103                const.EXIT_CODE: 0
104            }
105        except adb.AdbError as e:
106            result = {
107                const.STDOUT: e.stdout,
108                const.STDERR: e.stderr,
109                const.EXIT_CODE: e.ret_code
110            }
111        # TODO: upload the corpus and, possibly, crash log.
112        self.AssertTestResult(test_case, result)
113
114    def LogCrashReport(self, test_case):
115        """Logs crash-causing fuzzer input.
116
117        Reads the crash report file and logs the contents in format:
118        '\x01\x23\x45\x67\x89\xab\xcd\xef'
119
120        Args:
121            test_case: LibFuzzerTestCase object
122        """
123        touch_cmd = 'touch %s' % config.FUZZER_TEST_CRASH_REPORT
124        self._dut.adb.shell(touch_cmd)
125
126        # output is string of a hexdump from crash report file.
127        # From the example above, output would be '0123456789abcdef'.
128        xxd_cmd = 'xxd -p %s' % config.FUZZER_TEST_CRASH_REPORT
129        output = self._dut.adb.shell(xxd_cmd)
130        remove_chars = ['\r', '\t', '\n', ' ']
131        for char in remove_chars:
132            output = output.replace(char, '')
133
134        crash_report = ''
135        # output is guaranteed to be even in length since its a hexdump.
136        for offset in xrange(0, len(output), 2):
137            crash_report += '\\x%s' % output[offset:offset + 2]
138
139        logging.info('FUZZER_TEST_CRASH_REPORT for %s: "%s"',
140                     test_case.test_name, crash_report)
141
142    # TODO(trong): differentiate between crashes and sanitizer rule violations.
143    def AssertTestResult(self, test_case, result):
144        """Asserts that test case finished as expected.
145
146        Checks that device is in responsive state. If not, waits for boot
147        then reports test as failure. If it is, asserts that all test commands
148        returned exit code 0.
149
150        Args:
151            test_case: LibFuzzerTestCase object
152            result: dict(str, str, int), command results from shell.
153        """
154        logging.info('Test case results.')
155        logging.info('stdout: %s' % result[const.STDOUT])
156        logging.info('stderr: %s' % result[const.STDERR])
157        logging.info('exit code: %s' % result[const.EXIT_CODE])
158        if not self._dut.hasBooted():
159            self._dut.waitForBootCompletion()
160            asserts.fail('%s left the device in unresponsive state.' %
161                         test_case.test_name)
162
163        exit_code = result[const.EXIT_CODE]
164        if exit_code == config.ExitCode.FUZZER_TEST_FAIL:
165            self.LogCrashReport(test_case)
166            asserts.fail('%s failed normally.' % test_case.test_name)
167        elif exit_code != config.ExitCode.FUZZER_TEST_PASS:
168            asserts.fail('%s failed abnormally.' % test_case.test_name)
169
170    def generateFuzzerTests(self):
171        """Runs fuzzer tests."""
172        self.runGeneratedTests(
173            test_func=self.RunTestcase,
174            settings=self.CreateTestCases(),
175            name_func=lambda x: x.test_name)
176
177
178if __name__ == '__main__':
179    test_runner.main()
180