1#!/usr/bin/env python
2#
3# Copyright (C) 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the 'License');
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an 'AS IS' BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import logging
19import os
20import shutil
21
22from vts.runners.host import asserts
23from vts.runners.host import base_test
24from vts.runners.host import const
25from vts.runners.host import keys
26from vts.runners.host import test_runner
27from vts.utils.python.controllers import adb
28from vts.utils.python.controllers import android_device
29from vts.utils.python.common import list_utils
30
31from vts.testcases.fuzz.template.libfuzzer_test import libfuzzer_test_config as config
32from vts.testcases.fuzz.template.libfuzzer_test.libfuzzer_test_case import LibFuzzerTestCase
33
34
class LibFuzzerTest(base_test.BaseTestClass):
    """Runs LLVM libfuzzer tests on target.

    Attributes:
        _dut: AndroidDevice, the device under test (first entry of
              self.android_devices).
        data_file_path: host-side base path that fuzzer binaries are pushed
                        from; expected to be populated by getUserParams().
        binary_test_source: iterable of fuzzer test sources, one test case is
                            generated per entry; expected to be populated by
                            getUserParams().
    """

    # NOTE(review): self._temp_dir and self._corpus_manager are read by
    # several methods below but are never assigned anywhere in this class.
    # Presumably they are provided by a base class or by the test
    # configuration -- confirm, otherwise those methods raise AttributeError.

    def setUpClass(self):
        """Reads required parameters and prepares the device for fuzzing.

        Fetches the required user parameters, picks the first Android device
        as the device under test, calls stop() on it and creates the fuzzer
        working directory (config.FUZZER_TEST_DIR) on the device.
        """
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
            keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
        ]
        self.getUserParams(required_params)

        logging.info('%s: %s', keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                     self.data_file_path)
        logging.info('%s: %s', keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
                     self.binary_test_source)

        self._dut = self.android_devices[0]
        self._dut.stop()
        self._dut.adb.shell('mkdir %s -p' % config.FUZZER_TEST_DIR)

    def tearDownClass(self):
        """Cleans up the device and the host-side temporary directory.

        Removes config.FUZZER_TEST_DIR from the device and calls start() on
        the device (the counterpart of the stop() issued in setUpClass), then
        deletes the temporary directory used for corpus management.

        The class previously defined tearDownClass twice; the later
        definition replaced the earlier one, so the device-side cleanup never
        ran. Both responsibilities now live in this single method.
        """
        try:
            self._dut.adb.shell('rm -rf %s' % config.FUZZER_TEST_DIR)
            self._dut.start()
        finally:
            # Host-side cleanup must happen even if the device is
            # unreachable and the adb commands above raise.
            logging.debug('Temporary directory %s is being deleted',
                          self._temp_dir)
            try:
                shutil.rmtree(self._temp_dir)
            except OSError as e:
                logging.exception(e)

    def PushFiles(self, src):
        """adb pushes test case file to target.

        Args:
            src: string, path of the file to push; it is joined onto
                 self.data_file_path.

        Returns:
            string, the destination directory on the device
            (config.FUZZER_TEST_DIR).
        """
        push_src = os.path.join(self.data_file_path, src)
        push_dst = config.FUZZER_TEST_DIR
        # no_except=True: presumably push failures are reported in the return
        # value instead of being raised -- the return value is ignored here.
        self._dut.adb.push('%s %s' % (push_src, push_dst), no_except=True)
        logging.info('Adb pushed: %s \nto: %s', push_src, push_dst)
        return push_dst

    def CreateTestCases(self):
        """Creates LibFuzzerTestCase instances.

        One test case is created per entry of self.binary_test_source, each
        with config.FUZZER_DEFAULT_PARAMS and an empty dict as the third
        constructor argument.

        Returns:
            LibFuzzerTestCase list.
        """
        # A list comprehension yields a real list on both Python 2 and 3,
        # matching the documented return type.
        return [
            LibFuzzerTestCase(source, config.FUZZER_DEFAULT_PARAMS, {})
            for source in self.binary_test_source
        ]

    def CreateCorpusOut(self, test_case):
        """Creates corpus output directory on the target.

        Args:
            test_case: LibFuzzerTestCase object, current test case.

        Throws:
            throws an AdbError when there is an error in adb operations.
        """
        corpus_out = test_case.GetCorpusOutDir()
        self._dut.adb.shell('mkdir %s -p' % corpus_out)

    def RetrieveCorpusSeed(self, test_case):
        """Fetches the corpus seed through the corpus manager to the target.

        The seed is fetched into the host temporary directory. If a non-empty
        '<test_name>_corpus_seed' directory results, it is pushed to
        config.FUZZER_TEST_DIR on the device; otherwise an empty seed
        directory is created on the device.

        Args:
            test_case: LibFuzzerTestCase object, current test case.

        Throws:
            throws an AdbError when there is an error in adb operations.

        Returns:
            The value returned by the corpus manager's FetchCorpusSeed().
            RunTestcase treats 'locked' and 'directory' as special values and
            EvaluateTestcase treats None as "no seed to move".
        """
        inuse_seed = self._corpus_manager.FetchCorpusSeed(
            test_case._test_name, self._temp_dir)
        local_corpus_seed_dir = os.path.join(
            self._temp_dir, '%s_corpus_seed' % test_case._test_name)
        if os.path.exists(local_corpus_seed_dir) and os.listdir(
                local_corpus_seed_dir):
            self._dut.adb.push(local_corpus_seed_dir, config.FUZZER_TEST_DIR)
        else:
            corpus_seed = test_case.GetCorpusSeedDir()
            self._dut.adb.shell('mkdir %s -p' % corpus_seed)
        return inuse_seed

    def AnalyzeGeneratedCorpus(self, test_case):
        """Analyzes the generated corpus body.

        Looks for the corpus output directory that was pulled into the host
        temporary directory and counts its entries.

        Args:
            test_case: LibFuzzerTestCase object.

        Returns:
            number of entries in the pulled corpus out directory, if it
            exists on the host.
            0, otherwise.
        """
        logging.info('temporary directory for this test: %s', self._temp_dir)
        pulled_corpus_out_dir = os.path.join(
            self._temp_dir, os.path.basename(test_case.GetCorpusOutDir()))
        if not os.path.exists(pulled_corpus_out_dir):
            logging.error('corput out directory does not exist on the host.')
            return 0

        logging.info('corpus out directory pulled from target: %s',
                     pulled_corpus_out_dir)
        pulled_corpus = os.listdir(pulled_corpus_out_dir)
        logging.debug(pulled_corpus)
        logging.info('generated corpus size: %d', len(pulled_corpus))
        return len(pulled_corpus)

    def EvaluateTestcase(self, test_case, result, inuse_seed):
        """Evaluates the test result and moves the used seed accordingly.

        The seed is moved to 'corpus_complete' when the fuzzer exit code is
        config.ExitCode.FUZZER_TEST_PASS, to 'corpus_crash' when it is
        config.ExitCode.FUZZER_TEST_FAIL and to 'corpus_error' for any other
        (or missing) exit code. This method only logs and moves the seed; it
        does not fail the test (see AssertTestResult for that).

        Args:
            test_case: LibFuzzerTestCase object.
            result: a result dict object returned by the adb shell command.
            inuse_seed: the seed used as input to this test case; nothing is
                        moved when it is None.
        """
        # Read the exit code through the same key AssertTestResult uses.
        exit_code = result.get(const.EXIT_CODE, None)
        if exit_code == config.ExitCode.FUZZER_TEST_PASS:
            destination = 'corpus_complete'
        elif exit_code == config.ExitCode.FUZZER_TEST_FAIL:
            destination = 'corpus_crash'
        else:
            destination = 'corpus_error'

        if destination == 'corpus_error':
            logging.error('adb shell fuzzing command exited abnormally.')
        else:
            logging.info(
                'adb shell fuzzing command exited normally with exitcode %d.',
                exit_code)

        if inuse_seed is not None:
            self._corpus_manager.InuseToDest(test_case._test_name, inuse_seed,
                                             destination)

    def RunTestcase(self, test_case):
        """Runs the given test case and asserts the result.

        Pushes the fuzzer binary, prepares corpus directories, runs the
        fuzzer on the device, pulls the crash report and the generated corpus
        back to the host, uploads the corpus through the corpus manager and
        finally evaluates and asserts the result.

        Args:
            test_case: LibFuzzerTestCase object.
        """
        self.PushFiles(test_case.bin_host_path)
        self.CreateCorpusOut(test_case)
        inuse_seed = self.RetrieveCorpusSeed(test_case)
        if inuse_seed == 'locked':
            # skip this test case
            logging.warning('test case locked, skipping testcase %s.',
                            test_case.test_name)
            return

        fuzz_cmd = '"%s"' % test_case.GetRunCommand()

        # Stays empty if the shell command raises; the consumers below read
        # it with .get() so that case is reported as an abnormal failure.
        result = {}
        try:
            result = self._dut.adb.shell(fuzz_cmd, no_except=True)
        except adb.AdbError as e:
            logging.exception(e)

        corpus_trigger_dir = os.path.join(self._temp_dir,
                                          test_case.GetCorpusTriggerDir())
        # os.makedirs raises OSError when the directory already exists.
        if not os.path.isdir(corpus_trigger_dir):
            os.makedirs(corpus_trigger_dir)
        try:
            self._dut.adb.pull(config.FUZZER_TEST_CRASH_REPORT,
                               corpus_trigger_dir)
        except adb.AdbError as e:
            logging.exception(e)
            logging.error('crash report was not created during test run.')

        try:
            self._dut.adb.pull(test_case.GetCorpusOutDir(), self._temp_dir)
            self.AnalyzeGeneratedCorpus(test_case)
            self._corpus_manager.UploadCorpusOutDir(test_case._test_name,
                                                    self._temp_dir)
        except adb.AdbError as e:
            logging.exception(e)
            logging.error('Device failed. Removing lock from GCS.')
            self._corpus_manager.remove_lock(test_case._test_name)

        # Compare by value: identity comparison against a string literal
        # depends on interning and is not a reliable equality test.
        if inuse_seed != 'directory':
            self.EvaluateTestcase(test_case, result, inuse_seed)
        self.AssertTestResult(test_case, result)

    def LogCrashReport(self, test_case):
        """Logs crash-causing fuzzer input.

        Reads the crash report file and logs the contents in format:
        '\\x01\\x23\\x45\\x67\\x89\\xab\\xcd\\xef'

        Args:
            test_case: LibFuzzerTestCase object
        """
        # touch first so the hexdump below has a file to read even when no
        # crash report was written.
        touch_cmd = 'touch %s' % config.FUZZER_TEST_CRASH_REPORT
        self._dut.adb.shell(touch_cmd)

        # output is string of a hexdump from crash report file.
        # From the example above, output would be '0123456789abcdef'.
        xxd_cmd = 'xxd -p %s' % config.FUZZER_TEST_CRASH_REPORT
        output = self._dut.adb.shell(xxd_cmd)
        for char in ('\r', '\t', '\n', ' '):
            output = output.replace(char, '')

        # output is expected to be even in length since it is a hexdump;
        # each pair of hex digits becomes one '\xNN' escape.
        crash_report = ''.join(
            '\\x%s' % output[offset:offset + 2]
            for offset in range(0, len(output), 2))

        logging.info('FUZZER_TEST_CRASH_REPORT for %s: "%s"',
                     test_case.test_name, crash_report)

    # TODO(trong): differentiate between crashes and sanitizer rule violations.
    def AssertTestResult(self, test_case, result):
        """Asserts that test case finished as expected.

        Checks that device is in responsive state. If not, waits for boot
        then reports test as failure. If it is, fails the test unless the
        exit code equals config.ExitCode.FUZZER_TEST_PASS; for
        config.ExitCode.FUZZER_TEST_FAIL the crash report is logged first.

        Args:
            test_case: LibFuzzerTestCase object
            result: dict(str, str, int), command results from shell. May be
                    empty when the shell command itself raised; a missing
                    exit code is reported as an abnormal failure.
        """
        stdout = result.get(const.STDOUT)
        stderr = result.get(const.STDERR)
        exit_code = result.get(const.EXIT_CODE)

        logging.info('Test case results.')
        logging.info('stdout: %s', stdout)
        logging.info('stderr: %s', stderr)
        logging.info('exit code: %s', exit_code)
        if not self._dut.hasBooted():
            self._dut.waitForBootCompletion()
            asserts.fail('%s left the device in unresponsive state.' %
                         test_case.test_name)

        if exit_code == config.ExitCode.FUZZER_TEST_FAIL:
            #TODO(b/64123979): once normal fail happens, examine.
            self.LogCrashReport(test_case)
            asserts.fail('%s failed normally.' % test_case.test_name)
        elif exit_code != config.ExitCode.FUZZER_TEST_PASS:
            asserts.fail('%s failed abnormally.' % test_case.test_name)

    def generateFuzzerTests(self):
        """Runs fuzzer tests.

        Generates one test per LibFuzzerTestCase from CreateTestCases(), each
        executed by RunTestcase and named after the test case's test_name.
        """
        self.runGeneratedTests(
            test_func=self.RunTestcase,
            settings=self.CreateTestCases(),
            name_func=lambda x: x.test_name)
296
# Entry point when this file is executed as a script: hand control to the
# imported test_runner module's main().
if __name__ == '__main__':
    test_runner.main()
299