1#!/usr/bin/env python
3# Copyright (C) 2017 The Android Open Source Project
5# Licensed under the Apache License, Version 2.0 (the 'License');
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
9#      http://www.apache.org/licenses/LICENSE-2.0
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an 'AS IS' BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
18import logging
19import os
20import shutil
22from vts.runners.host import asserts
23from vts.runners.host import base_test
24from vts.runners.host import const
25from vts.runners.host import keys
26from vts.runners.host import test_runner
27from vts.utils.python.controllers import adb
28from vts.utils.python.controllers import android_device
29from vts.utils.python.common import list_utils
31from vts.testcases.fuzz.template.libfuzzer_test import libfuzzer_test_config as config
32from vts.testcases.fuzz.template.libfuzzer_test.libfuzzer_test_case import LibFuzzerTestCase
35class LibFuzzerTest(base_test.BaseTestClass):
36    """Runs LLVM libfuzzer tests on target.
38    Attributes:
39        _dut: AndroidDevice, the device under test as config.
40    """
42    def setUpClass(self):
43        """Creates a remote shell instance, and copies data files."""
44        required_params = [
45            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
46            keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
47        ]
48        self.getUserParams(required_params)
50        logging.info('%s: %s', keys.ConfigKeys.IKEY_DATA_FILE_PATH,
51                     self.data_file_path)
52        logging.info('%s: %s', keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
53                     self.binary_test_source)
55        self._dut = self.android_devices[0]
56        self._dut.stop()
57        self._dut.adb.shell('mkdir %s -p' % config.FUZZER_TEST_DIR)
59    def tearDownClass(self):
60        """Deletes all copied data."""
61        self._dut.adb.shell('rm -rf %s' % config.FUZZER_TEST_DIR)
62        self._dut.start()
64    def PushFiles(self, src):
65        """adb pushes test case file to target."""
66        push_src = os.path.join(self.data_file_path, src)
67        push_dst = config.FUZZER_TEST_DIR
68        self._dut.adb.push('%s %s' % (push_src, push_dst), no_except=True)
69        logging.info('Adb pushed: %s \nto: %s', push_src, push_dst)
70        return push_dst
72    def CreateTestCases(self):
73        """Creates LibFuzzerTestCase instances.
75        Returns:
76            LibFuzzerTestCase list.
77        """
78        test_cases = map(
79            lambda x: LibFuzzerTestCase(x, config.FUZZER_DEFAULT_PARAMS, {}),
80            self.binary_test_source)
81        return test_cases
83    def CreateCorpusOut(self, test_case):
84        """Creates corpus output directory on the target.
86        Args:
87            test_case: LibFuzzerTestCase object, current test case.
89        Throws:
90            throws an AdbError when there is an error in adb operations.
91        """
92        corpus_out = test_case.GetCorpusOutDir()
93        self._dut.adb.shell('mkdir %s -p' % corpus_out)
95    def RetrieveCorpusSeed(self, test_case):
96        """Retrieves corpus seed directory from GCS to the target.
98        Args:
99            test_case: LibFuzzerTestCase object, current test case.
101        Throws:
102            throws an AdbError when there is an error in adb operations.
104        Returns:
105            inuse_seed, the file path of the inuse seed in GCS, if fetch succeeded.
106            None, otherwise.
107        """
108        inuse_seed = self._corpus_manager.FetchCorpusSeed(
109            test_case._test_name, self._temp_dir)
110        local_corpus_seed_dir = os.path.join(
111            self._temp_dir, '%s_corpus_seed' % test_case._test_name)
112        if os.path.exists(local_corpus_seed_dir) and os.listdir(
113                local_corpus_seed_dir):
114            self._dut.adb.push(local_corpus_seed_dir, config.FUZZER_TEST_DIR)
115        else:
116            corpus_seed = test_case.GetCorpusSeedDir()
117            self._dut.adb.shell('mkdir %s -p' % corpus_seed)
118        return inuse_seed
120    def AnalyzeGeneratedCorpus(self, test_case):
121        """Analyzes the generated corpus body.
123        Args:
124            test_case: LibFuzzerTestCase object.
126        Returns:
127            number of newly generated corpus strings, if the out directory exists.
128            0, otherwise.
129        """
130        logging.info('temporary directory for this test: %s', self._temp_dir)
131        pulled_corpus_out_dir = os.path.join(
132            self._temp_dir, os.path.basename(test_case.GetCorpusOutDir()))
133        if os.path.exists(pulled_corpus_out_dir):
134            logging.info('corpus out directory pulled from target: %s',
135                         pulled_corpus_out_dir)
136            pulled_corpus = os.listdir(pulled_corpus_out_dir)
137            logging.debug(pulled_corpus)
138            logging.info('generated corpus size: %d', len(pulled_corpus))
139            return len(pulled_corpus)
140        else:
141            logging.error('corput out directory does not exist on the host.')
142            return 0
144    def EvaluateTestcase(self, test_case, result, inuse_seed):
145        """Evaluates the test result and moves the used seed accordingly.
147        Args:
148            test_case: LibFuzzerTestCase object.
149            result: a result dict object returned by the adb shell command.
150            inuse_seed: the seed used as input to this test case.
152        Raises:
153            signals.TestFailure when the testcase failed.
154        """
155        return_codes = result.get('return_codes', None)
156        if return_codes == config.ExitCode.FUZZER_TEST_PASS:
157            logging.info(
158                'adb shell fuzzing command exited normally with exitcode %d.',
159                result['return_codes'])
160            if inuse_seed is not None:
161                self._corpus_manager.InuseToDest(test_case._test_name,
162                                                 inuse_seed, 'corpus_complete')
163        elif return_codes == config.ExitCode.FUZZER_TEST_FAIL:
164            logging.info(
165                'adb shell fuzzing command exited normally with exitcode %d.',
166                result['return_codes'])
167            if inuse_seed is not None:
168                self._corpus_manager.InuseToDest(test_case._test_name,
169                                                 inuse_seed, 'corpus_crash')
170        else:
171            logging.error('adb shell fuzzing command exited abnormally.')
172            if inuse_seed is not None:
173                self._corpus_manager.InuseToDest(test_case._test_name,
174                                                 inuse_seed, 'corpus_error')
176    def RunTestcase(self, test_case):
177        """Runs the given test case and asserts the result.
179        Args:
180            test_case: LibFuzzerTestCase object.
181        """
182        self.PushFiles(test_case.bin_host_path)
183        self.CreateCorpusOut(test_case)
184        inuse_seed = self.RetrieveCorpusSeed(test_case)
185        if inuse_seed == 'locked':
186            # skip this test case
187            logging.warning('test case locked, skipping testcase %s.',
188                            test_case.test_name)
189            return
191        fuzz_cmd = '"%s"' % test_case.GetRunCommand()
193        result = {}
194        try:
195            result = self._dut.adb.shell(fuzz_cmd, no_except=True)
196        except adb.AdbError as e:
197            logging.exception(e)
199        corpus_trigger_dir = os.path.join(self._temp_dir,
200                                          test_case.GetCorpusTriggerDir())
201        os.makedirs(corpus_trigger_dir)
202        try:
203            self._dut.adb.pull(config.FUZZER_TEST_CRASH_REPORT,
204                               corpus_trigger_dir)
205        except adb.AdbError as e:
206            logging.exception(e)
207            logging.error('crash report was not created during test run.')
209        try:
210            self._dut.adb.pull(test_case.GetCorpusOutDir(), self._temp_dir)
211            self.AnalyzeGeneratedCorpus(test_case)
212            self._corpus_manager.UploadCorpusOutDir(test_case._test_name,
213                                                    self._temp_dir)
214        except adb.AdbError as e:
215            logging.exception(e)
216            logging.error('Device failed. Removing lock from GCS.')
217            self._corpus_manager.remove_lock(test_case._test_name)
219        if inuse_seed is not 'directory':
220            self.EvaluateTestcase(test_case, result, inuse_seed)
221        self.AssertTestResult(test_case, result)
223    def LogCrashReport(self, test_case):
224        """Logs crash-causing fuzzer input.
226        Reads the crash report file and logs the contents in format:
227        '\x01\x23\x45\x67\x89\xab\xcd\xef'
229        Args:
230            test_case: LibFuzzerTestCase object
231        """
232        touch_cmd = 'touch %s' % config.FUZZER_TEST_CRASH_REPORT
233        self._dut.adb.shell(touch_cmd)
235        # output is string of a hexdump from crash report file.
236        # From the example above, output would be '0123456789abcdef'.
237        xxd_cmd = 'xxd -p %s' % config.FUZZER_TEST_CRASH_REPORT
238        output = self._dut.adb.shell(xxd_cmd)
239        remove_chars = ['\r', '\t', '\n', ' ']
240        for char in remove_chars:
241            output = output.replace(char, '')
243        crash_report = ''
244        # output is guaranteed to be even in length since its a hexdump.
245        for offset in xrange(0, len(output), 2):
246            crash_report += '\\x%s' % output[offset:offset + 2]
248        logging.info('FUZZER_TEST_CRASH_REPORT for %s: "%s"',
249                     test_case.test_name, crash_report)
251    # TODO(trong): differentiate between crashes and sanitizer rule violations.
252    def AssertTestResult(self, test_case, result):
253        """Asserts that test case finished as expected.
255        Checks that device is in responsive state. If not, waits for boot
256        then reports test as failure. If it is, asserts that all test commands
257        returned exit code 0.
259        Args:
260            test_case: LibFuzzerTestCase object
261            result: dict(str, str, int), command results from shell.
262        """
263        logging.info('Test case results.')
264        logging.info('stdout: %s' % result[const.STDOUT])
265        logging.info('stderr: %s' % result[const.STDERR])
266        logging.info('exit code: %s' % result[const.EXIT_CODE])
267        if not self._dut.hasBooted():
268            self._dut.waitForBootCompletion()
269            asserts.fail('%s left the device in unresponsive state.' %
270                         test_case.test_name)
272        exit_code = result[const.EXIT_CODE]
273        if exit_code == config.ExitCode.FUZZER_TEST_FAIL:
274            #TODO(b/64123979): once normal fail happens, examine.
275            self.LogCrashReport(test_case)
276            asserts.fail('%s failed normally.' % test_case.test_name)
277        elif exit_code != config.ExitCode.FUZZER_TEST_PASS:
278            asserts.fail('%s failed abnormally.' % test_case.test_name)
280    def tearDownClass(self):
281        """Removes the temporary directory used for corpus management."""
282        logging.debug('Temporary directory %s is being deleted',
283                      self._temp_dir)
284        try:
285            shutil.rmtree(self._temp_dir)
286        except OSError as e:
287            logging.exception(e)
289    def generateFuzzerTests(self):
290        """Runs fuzzer tests."""
291        self.runGeneratedTests(
292            test_func=self.RunTestcase,
293            settings=self.CreateTestCases(),
294            name_func=lambda x: x.test_name)
297if __name__ == '__main__':
298    test_runner.main()