1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for running fuzzers."""
15import os
16import sys
17import shutil
18import tempfile
19import unittest
20from unittest import mock
21
22import parameterized
23from pyfakefs import fake_filesystem_unittest
24
25import config_utils
26import fuzz_target
27import run_fuzzers
28
29# pylint: disable=wrong-import-position
30INFRA_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
31sys.path.append(INFRA_DIR)
32
33import test_helpers
34
35# NOTE: This integration test relies on
36# https://github.com/google/oss-fuzz/tree/master/projects/example project.
37EXAMPLE_PROJECT = 'example'
38
39# Location of files used for testing.
40TEST_DATA_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
41                              'test_data')
42
43MEMORY_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'memory')
44MEMORY_FUZZER = 'curl_fuzzer_memory'
45
46UNDEFINED_FUZZER_DIR = os.path.join(TEST_DATA_PATH, 'undefined')
47UNDEFINED_FUZZER = 'curl_fuzzer_undefined'
48
49FUZZ_SECONDS = 10
50
51
52def _create_config(**kwargs):
53  """Creates a config object and then sets every attribute that is a key in
54  |kwargs| to the corresponding value. Asserts that each key in |kwargs| is an
55  attribute of Config."""
56  with mock.patch('os.path.basename', return_value=None), mock.patch(
57      'config_utils.get_project_src_path',
58      return_value=None), mock.patch('config_utils._is_dry_run',
59                                     return_value=True):
60    config = config_utils.RunFuzzersConfig()
61
62  for key, value in kwargs.items():
63    assert hasattr(config, key), 'Config doesn\'t have attribute: ' + key
64    setattr(config, key, value)
65  return config
66
67
68class RunFuzzerIntegrationTestMixin:  # pylint: disable=too-few-public-methods,invalid-name
69  """Mixin for integration test classes that runbuild_fuzzers on builds of a
70  specific sanitizer."""
71  # These must be defined by children.
72  FUZZER_DIR = None
73  FUZZER = None
74
75  def _test_run_with_sanitizer(self, fuzzer_dir, sanitizer):
76    """Calls run_fuzzers on fuzzer_dir and |sanitizer| and asserts
77    the run succeeded and that no bug was found."""
78    with test_helpers.temp_dir_copy(fuzzer_dir) as fuzzer_dir_copy:
79      config = _create_config(fuzz_seconds=FUZZ_SECONDS,
80                              workspace=fuzzer_dir_copy,
81                              project_name='curl',
82                              sanitizer=sanitizer)
83      result = run_fuzzers.run_fuzzers(config)
84    self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)
85
86
87class RunMemoryFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
88                                     unittest.TestCase):
89  """Integration test for build_fuzzers with an MSAN build."""
90  FUZZER_DIR = MEMORY_FUZZER_DIR
91  FUZZER = MEMORY_FUZZER
92
93  @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
94                   'INTEGRATION_TESTS=1 not set')
95  def test_run_with_memory_sanitizer(self):
96    """Tests run_fuzzers with a valid MSAN build."""
97    self._test_run_with_sanitizer(self.FUZZER_DIR, 'memory')
98
99
100class RunUndefinedFuzzerIntegrationTest(RunFuzzerIntegrationTestMixin,
101                                        unittest.TestCase):
102  """Integration test for build_fuzzers with an UBSAN build."""
103  FUZZER_DIR = UNDEFINED_FUZZER_DIR
104  FUZZER = UNDEFINED_FUZZER
105
106  @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
107                   'INTEGRATION_TESTS=1 not set')
108  def test_run_with_undefined_sanitizer(self):
109    """Tests run_fuzzers with a valid UBSAN build."""
110    self._test_run_with_sanitizer(self.FUZZER_DIR, 'undefined')
111
112
113class BaseFuzzTargetRunnerTest(unittest.TestCase):
114  """Tests BaseFuzzTargetRunner."""
115
116  def _create_runner(self, **kwargs):  # pylint: disable=no-self-use
117    defaults = {'fuzz_seconds': FUZZ_SECONDS, 'project_name': EXAMPLE_PROJECT}
118    for default_key, default_value in defaults.items():
119      if default_key not in kwargs:
120        kwargs[default_key] = default_value
121
122    config = _create_config(**kwargs)
123    return run_fuzzers.BaseFuzzTargetRunner(config)
124
125  def _test_initialize_fail(self, expected_error_args, **create_runner_kwargs):
126    with mock.patch('logging.error') as mocked_error:
127      runner = self._create_runner(**create_runner_kwargs)
128      self.assertFalse(runner.initialize())
129      mocked_error.assert_called_with(*expected_error_args)
130
131  @parameterized.parameterized.expand([(0,), (None,), (-1,)])
132  def test_initialize_invalid_fuzz_seconds(self, fuzz_seconds):
133    """Tests initialize fails with an invalid fuzz seconds."""
134    expected_error_args = ('Fuzz_seconds argument must be greater than 1, '
135                           'but was: %s.', fuzz_seconds)
136    with tempfile.TemporaryDirectory() as tmp_dir:
137      out_path = os.path.join(tmp_dir, 'out')
138      os.mkdir(out_path)
139      with mock.patch('utils.get_fuzz_targets') as mocked_get_fuzz_targets:
140        mocked_get_fuzz_targets.return_value = [
141            os.path.join(out_path, 'fuzz_target')
142        ]
143        self._test_initialize_fail(expected_error_args,
144                                   fuzz_seconds=fuzz_seconds,
145                                   workspace=tmp_dir)
146
147  def test_initialize_no_out_dir(self):
148    """Tests initialize fails with no out dir."""
149    with tempfile.TemporaryDirectory() as tmp_dir:
150      out_path = os.path.join(tmp_dir, 'out')
151      expected_error_args = ('Out directory: %s does not exist.', out_path)
152      self._test_initialize_fail(expected_error_args, workspace=tmp_dir)
153
154  def test_initialize_nonempty_artifacts(self):
155    """Tests initialize with a file artifacts path."""
156    with tempfile.TemporaryDirectory() as tmp_dir:
157      out_path = os.path.join(tmp_dir, 'out')
158      os.mkdir(out_path)
159      artifacts_path = os.path.join(out_path, 'artifacts')
160      with open(artifacts_path, 'w') as artifacts_handle:
161        artifacts_handle.write('fake')
162      expected_error_args = (
163          'Artifacts path: %s exists and is not an empty directory.',
164          artifacts_path)
165      self._test_initialize_fail(expected_error_args, workspace=tmp_dir)
166
167  def test_initialize_bad_artifacts(self):
168    """Tests initialize with a non-empty artifacts path."""
169    with tempfile.TemporaryDirectory() as tmp_dir:
170      out_path = os.path.join(tmp_dir, 'out')
171      artifacts_path = os.path.join(out_path, 'artifacts')
172      os.makedirs(artifacts_path)
173      artifact_path = os.path.join(artifacts_path, 'artifact')
174      with open(artifact_path, 'w') as artifact_handle:
175        artifact_handle.write('fake')
176      expected_error_args = (
177          'Artifacts path: %s exists and is not an empty directory.',
178          artifacts_path)
179      self._test_initialize_fail(expected_error_args, workspace=tmp_dir)
180
181  @mock.patch('utils.get_fuzz_targets')
182  @mock.patch('logging.error')
183  def test_initialize_empty_artifacts(self, mocked_log_error,
184                                      mocked_get_fuzz_targets):
185    """Tests initialize with an empty artifacts dir."""
186    mocked_get_fuzz_targets.return_value = ['fuzz-target']
187    with tempfile.TemporaryDirectory() as tmp_dir:
188      out_path = os.path.join(tmp_dir, 'out')
189      artifacts_path = os.path.join(out_path, 'artifacts')
190      os.makedirs(artifacts_path)
191      runner = self._create_runner(workspace=tmp_dir)
192      self.assertTrue(runner.initialize())
193      mocked_log_error.assert_not_called()
194      self.assertTrue(os.path.isdir(artifacts_path))
195
196  @mock.patch('utils.get_fuzz_targets')
197  @mock.patch('logging.error')
198  def test_initialize_no_artifacts(self, mocked_log_error,
199                                   mocked_get_fuzz_targets):
200    """Tests initialize with no artifacts dir (the expected setting)."""
201    mocked_get_fuzz_targets.return_value = ['fuzz-target']
202    with tempfile.TemporaryDirectory() as tmp_dir:
203      out_path = os.path.join(tmp_dir, 'out')
204      os.makedirs(out_path)
205      runner = self._create_runner(workspace=tmp_dir)
206      self.assertTrue(runner.initialize())
207      mocked_log_error.assert_not_called()
208      self.assertTrue(os.path.isdir(os.path.join(out_path, 'artifacts')))
209
210  def test_initialize_no_fuzz_targets(self):
211    """Tests initialize with no fuzz targets."""
212    with tempfile.TemporaryDirectory() as tmp_dir:
213      out_path = os.path.join(tmp_dir, 'out')
214      os.makedirs(out_path)
215      expected_error_args = ('No fuzz targets were found in out directory: %s.',
216                             out_path)
217      self._test_initialize_fail(expected_error_args, workspace=tmp_dir)
218
219  def test_get_fuzz_target_artifact(self):
220    """Tests that get_fuzz_target_artifact works as intended."""
221    runner = self._create_runner()
222    artifacts_dir = 'artifacts-dir'
223    runner.artifacts_dir = artifacts_dir
224    artifact_name = 'artifact-name'
225    target = mock.MagicMock()
226    target_name = 'target_name'
227    target.target_name = target_name
228    fuzz_target_artifact = runner.get_fuzz_target_artifact(
229        target, artifact_name)
230    expected_fuzz_target_artifact = (
231        'artifacts-dir/target_name-address-artifact-name')
232    self.assertEqual(fuzz_target_artifact, expected_fuzz_target_artifact)
233
234
235class CiFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase):
236  """Tests that CiFuzzTargetRunner works as intended."""
237
238  def setUp(self):
239    self.setUpPyfakefs()
240
241  @mock.patch('utils.get_fuzz_targets')
242  @mock.patch('run_fuzzers.CiFuzzTargetRunner.run_fuzz_target')
243  @mock.patch('run_fuzzers.CiFuzzTargetRunner.create_fuzz_target_obj')
244  def test_run_fuzz_targets_quits(self, mocked_create_fuzz_target_obj,
245                                  mocked_run_fuzz_target,
246                                  mocked_get_fuzz_targets):
247    """Tests that run_fuzz_targets quits on the first crash it finds."""
248    workspace = 'workspace'
249    out_path = os.path.join(workspace, 'out')
250    self.fs.create_dir(out_path)
251    config = _create_config(fuzz_seconds=FUZZ_SECONDS,
252                            workspace=workspace,
253                            project_name=EXAMPLE_PROJECT)
254    runner = run_fuzzers.CiFuzzTargetRunner(config)
255
256    mocked_get_fuzz_targets.return_value = ['target1', 'target2']
257    runner.initialize()
258    testcase = os.path.join(workspace, 'testcase')
259    self.fs.create_file(testcase)
260    stacktrace = b'stacktrace'
261    mocked_run_fuzz_target.return_value = fuzz_target.FuzzResult(
262        testcase, stacktrace)
263    magic_mock = mock.MagicMock()
264    magic_mock.target_name = 'target1'
265    mocked_create_fuzz_target_obj.return_value = magic_mock
266    self.assertTrue(runner.run_fuzz_targets())
267    self.assertIn('target1-address-testcase', os.listdir(runner.artifacts_dir))
268    self.assertEqual(mocked_run_fuzz_target.call_count, 1)
269
270
271class BatchFuzzTargetRunnerTest(fake_filesystem_unittest.TestCase):
272  """Tests that CiFuzzTargetRunner works as intended."""
273
274  def setUp(self):
275    self.setUpPyfakefs()
276
277  @mock.patch('utils.get_fuzz_targets')
278  @mock.patch('run_fuzzers.BatchFuzzTargetRunner.run_fuzz_target')
279  @mock.patch('run_fuzzers.BatchFuzzTargetRunner.create_fuzz_target_obj')
280  def test_run_fuzz_targets_quits(self, mocked_create_fuzz_target_obj,
281                                  mocked_run_fuzz_target,
282                                  mocked_get_fuzz_targets):
283    """Tests that run_fuzz_targets doesn't quit on the first crash it finds."""
284    workspace = 'workspace'
285    out_path = os.path.join(workspace, 'out')
286    self.fs.create_dir(out_path)
287    config = _create_config(fuzz_seconds=FUZZ_SECONDS,
288                            workspace=workspace,
289                            project_name=EXAMPLE_PROJECT)
290    runner = run_fuzzers.BatchFuzzTargetRunner(config)
291
292    mocked_get_fuzz_targets.return_value = ['target1', 'target2']
293    runner.initialize()
294    testcase1 = os.path.join(workspace, 'testcase-aaa')
295    testcase2 = os.path.join(workspace, 'testcase-bbb')
296    self.fs.create_file(testcase1)
297    self.fs.create_file(testcase2)
298    stacktrace = b'stacktrace'
299    call_count = 0
300
301    def mock_run_fuzz_target(_):
302      nonlocal call_count
303      if call_count == 0:
304        testcase = testcase1
305      elif call_count == 1:
306        testcase = testcase2
307      assert call_count != 2
308      call_count += 1
309      return fuzz_target.FuzzResult(testcase, stacktrace)
310
311    mocked_run_fuzz_target.side_effect = mock_run_fuzz_target
312    magic_mock = mock.MagicMock()
313    magic_mock.target_name = 'target1'
314    mocked_create_fuzz_target_obj.return_value = magic_mock
315    self.assertTrue(runner.run_fuzz_targets())
316    self.assertIn('target1-address-testcase-aaa',
317                  os.listdir(runner.artifacts_dir))
318    self.assertEqual(mocked_run_fuzz_target.call_count, 2)
319
320
321class RunAddressFuzzersIntegrationTest(RunFuzzerIntegrationTestMixin,
322                                       unittest.TestCase):
323  """Integration tests for build_fuzzers with an ASAN build."""
324
325  BUILD_DIR_NAME = 'cifuzz-latest-build'
326
327  @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
328                   'INTEGRATION_TESTS=1 not set')
329  def test_new_bug_found(self):
330    """Tests run_fuzzers with a valid ASAN build."""
331    # Set the first return value to True, then the second to False to
332    # emulate a bug existing in the current PR but not on the downloaded
333    # OSS-Fuzz build.
334    with mock.patch('fuzz_target.FuzzTarget.is_reproducible',
335                    side_effect=[True, False]):
336      with tempfile.TemporaryDirectory() as tmp_dir:
337        workspace = os.path.join(tmp_dir, 'workspace')
338        shutil.copytree(TEST_DATA_PATH, workspace)
339        config = _create_config(fuzz_seconds=FUZZ_SECONDS,
340                                workspace=workspace,
341                                project_name=EXAMPLE_PROJECT)
342        result = run_fuzzers.run_fuzzers(config)
343        self.assertEqual(result, run_fuzzers.RunFuzzersResult.BUG_FOUND)
344        build_dir = os.path.join(workspace, 'out', self.BUILD_DIR_NAME)
345        self.assertNotEqual(0, len(os.listdir(build_dir)))
346
347  @unittest.skipIf(not os.getenv('INTEGRATION_TESTS'),
348                   'INTEGRATION_TESTS=1 not set')
349  @mock.patch('fuzz_target.FuzzTarget.is_reproducible',
350              side_effect=[True, True])
351  def test_old_bug_found(self, _):
352    """Tests run_fuzzers with a bug found in OSS-Fuzz before."""
353    config = _create_config(fuzz_seconds=FUZZ_SECONDS,
354                            workspace=TEST_DATA_PATH,
355                            project_name=EXAMPLE_PROJECT)
356    with tempfile.TemporaryDirectory() as tmp_dir:
357      workspace = os.path.join(tmp_dir, 'workspace')
358      shutil.copytree(TEST_DATA_PATH, workspace)
359      config = _create_config(fuzz_seconds=FUZZ_SECONDS,
360                              workspace=TEST_DATA_PATH,
361                              project_name=EXAMPLE_PROJECT)
362      result = run_fuzzers.run_fuzzers(config)
363      self.assertEqual(result, run_fuzzers.RunFuzzersResult.NO_BUG_FOUND)
364      build_dir = os.path.join(TEST_DATA_PATH, 'out', self.BUILD_DIR_NAME)
365      self.assertTrue(os.path.exists(build_dir))
366      self.assertNotEqual(0, len(os.listdir(build_dir)))
367
368  def test_invalid_build(self):
369    """Tests run_fuzzers with an invalid ASAN build."""
370    with tempfile.TemporaryDirectory() as tmp_dir:
371      out_path = os.path.join(tmp_dir, 'out')
372      os.mkdir(out_path)
373      config = _create_config(fuzz_seconds=FUZZ_SECONDS,
374                              workspace=tmp_dir,
375                              project_name=EXAMPLE_PROJECT)
376      result = run_fuzzers.run_fuzzers(config)
377    self.assertEqual(result, run_fuzzers.RunFuzzersResult.ERROR)
378
379
380if __name__ == '__main__':
381  unittest.main()
382