1#
2# Copyright (C) 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import logging
18import os
19import shutil
20import tempfile
21
22import environment_variables as env
23import syzkaller_test_case
24
25from vts.runners.host import asserts
26from vts.runners.host import base_test
27from vts.runners.host import config_parser
28from vts.runners.host import keys
29from vts.runners.host import test_runner
30from vts.utils.python.common import cmd_utils
31from vts.utils.python.controllers import adb
32from vts.utils.python.controllers import android_device
33from vts.utils.python.gcs import gcs_api_utils
34
35
class SyzkallerTest(base_test.BaseTestClass):
    """Runs Syzkaller tests on target.

    Attributes:
        start_vts_agents: bool, class-level flag set to False for this test.
        _env: dict, a mapping from key to environment path of this test.
        _gcs_api_utils: a GcsApiUtils object used for uploading logs.
        _dut: an android device controller object used for getting device info.
    """

    start_vts_agents = False

    def setUpClass(self):
        """Loads required parameters and prepares the test environment.

        Creates a temporary directory, records the Syzkaller paths located
        under it in self._env, builds the GCS helper used for uploading
        reports, and selects the first android device as the device under
        test.

        Attributes:
            _env: dict, a mapping from key to environment path of this test.
            _gcs_api_utils: a GcsApiUtils object used for uploading logs.
            _dut: an android device controller object used for getting device info.
        """
        required_params = [
            keys.ConfigKeys.IKEY_SERVICE_JSON_PATH,
            keys.ConfigKeys.IKEY_FUZZING_GCS_BUCKET_NAME,
            keys.ConfigKeys.IKEY_SYZKALLER_PACKAGES_PATH,
            keys.ConfigKeys.IKEY_SYZKALLER_TEMPLATE_PATH
        ]
        # NOTE(review): the attributes read below (service_key_json_path,
        # fuzzing_gcs_bucket_name, ...) are presumably populated by this
        # call -- confirm against base_test.getUserParams.
        self.getUserParams(required_params)

        temp_dir = tempfile.mkdtemp()
        self._env = {
            'temp_dir': temp_dir,
            'syzkaller_dir': os.path.join(temp_dir, env.syzkaller_dir),
            'syzkaller_bin_dir': os.path.join(temp_dir,
                                              env.syzkaller_bin_dir),
            'syzkaller_work_dir': os.path.join(temp_dir,
                                               env.syzkaller_work_dir),
            'template_cfg': os.path.join(temp_dir, env.template_cfg),
        }

        self._gcs_api_utils = gcs_api_utils.GcsApiUtils(
            self.service_key_json_path, self.fuzzing_gcs_bucket_name)
        self._dut = self.android_devices[0]

    def FetchSyzkaller(self):
        """Copies Syzkaller into the temp dir and makes its binaries executable.

        The Syzkaller package tree is copied to self._env['syzkaller_dir'] and
        the template configuration is copied into self._env['temp_dir']. If
        either copy fails, the error is logged, all tests are skipped and no
        permissions are changed.
        """
        try:
            logging.info('Fetching Syzkaller program.')
            shutil.copytree(self.syzkaller_packages_path,
                            self._env['syzkaller_dir'])
            logging.info('Fetching Syzkaller template configuration.')
            shutil.copy(self.syzkaller_template_path, self._env['temp_dir'])
        except (IOError, OSError, shutil.Error) as e:
            # copytree reports a missing source or an existing destination as
            # OSError and per-file failures as shutil.Error, so IOError alone
            # would let those escape instead of skipping the tests.
            logging.exception(e)
            self.skipAllTests(
                'Syzkaller program is not available. Skipping all tests.')
            return

        for root, _, files in os.walk(self._env['syzkaller_bin_dir']):
            for filename in files:
                # 0o755 is accepted by Python 2.6+ and Python 3 alike.
                os.chmod(os.path.join(root, filename), 0o755)

    def CreateTestCases(self):
        """Create syzkaller test cases.

        Returns:
            test_cases, list, the list of test_cases for this test.
        """
        test_cases = [
            syzkaller_test_case.SyzkallerTestCase(self._env, 'linux/arm64',
                                                  self._dut.serial, 'adb',
                                                  'false', 12)
        ]
        return test_cases

    def RunTestCase(self, test_case):
        """Run a syzkaller test case.

        Executes the command returned by the test case, logs stderr when the
        command exits with a non-zero code (stdout otherwise), and then
        reports the test case.

        Args:
            test_case: SyzkallerTestCase object, the test case to run.
        """
        test_command = test_case.GetRunCommand()
        stdout, stderr, err_code = cmd_utils.ExecuteOneShellCommand(
            test_command, timeout=18000)
        if err_code:
            logging.error(stderr)
        else:
            logging.info(stdout)
        self.ReportTestCase(test_case)

    def ReportTestCase(self, test_case):
        """Asserts the result of the test case and uploads report to GCS.

        The work directory is uploaded even when the assertion fails, because
        a failing assertion is exactly the case in which crash reports exist.

        Args:
            test_case: SyzkallerTestCase object, the test case to report.
        """
        try:
            self.AssertTestCaseResult(test_case)
        finally:
            self._gcs_api_utils.UploadDir(test_case._work_dir_path,
                                          'kernelfuzz_reports')

    def AssertTestCaseResult(self, test_case):
        """Asserts that test case finished as expected.

        If crash reports were generated during the test, reports test as failure.
        If crash reports were not generated during the test, reports test as success.

        Args:
            test_case: SyzkallerTestCase object, the test case to assert
                       as failure or success.
        """
        logging.info('Test case results.')
        crash_dir = os.path.join(test_case._work_dir_path, 'crashes')
        if not os.listdir(crash_dir):
            logging.info('%s did not cause crash in our device.',
                         test_case._test_name)
        else:
            # Format the message here: unlike the logging calls, asserts.fail
            # is not expected to apply %-style arguments to its message.
            asserts.fail('%s caused crash in our device.' %
                         test_case._test_name)

    def tearDownClass(self):
        """Removes the temporary directory used for Syzkaller."""
        # _env is missing when setUpClass failed before creating it; there is
        # nothing to clean up in that case.
        temp_dir = getattr(self, '_env', {}).get('temp_dir')
        if not temp_dir:
            return
        logging.debug('Temporary directory %s is being deleted', temp_dir)
        try:
            shutil.rmtree(temp_dir)
        except OSError as e:
            logging.exception(e)

    def generateKernelFuzzerTests(self):
        """Runs kernel fuzzer tests."""
        self.FetchSyzkaller()
        self.runGeneratedTests(
            test_func=self.RunTestCase,
            settings=self.CreateTestCases(),
            name_func=lambda x: x._test_name)
162
163
# Script entry point: when this module is executed directly (not imported),
# hand control over to the VTS test runner.
if __name__ == '__main__':
    test_runner.main()
166