# Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module for running fuzzers.""" import enum import logging import os import shutil import sys import time import clusterfuzz_deployment import fuzz_target import stack_parser # pylint: disable=wrong-import-position,import-error sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import utils class RunFuzzersResult(enum.Enum): """Enum result from running fuzzers.""" ERROR = 0 BUG_FOUND = 1 NO_BUG_FOUND = 2 class BaseFuzzTargetRunner: """Base class for fuzzer runners.""" def __init__(self, config): self.config = config self.clusterfuzz_deployment = ( clusterfuzz_deployment.get_clusterfuzz_deployment(self.config)) # Set by the initialize method. self.out_dir = None self.fuzz_target_paths = None self.artifacts_dir = None def initialize(self): """Initialization method. Must be called before calling run_fuzz_targets. Returns True on success.""" # Use a seperate initialization function so we can return False on failure # instead of exceptioning like we need to do if this were done in the # __init__ method. logging.info('Using %s sanitizer.', self.config.sanitizer) # TODO(metzman) Add a check to ensure we aren't over time limit. if not self.config.fuzz_seconds or self.config.fuzz_seconds < 1: logging.error( 'Fuzz_seconds argument must be greater than 1, but was: %s.', self.config.fuzz_seconds) return False self.out_dir = os.path.join(self.config.workspace, 'out') if not os.path.exists(self.out_dir): logging.error('Out directory: %s does not exist.', self.out_dir) return False self.artifacts_dir = os.path.join(self.out_dir, 'artifacts') if not os.path.exists(self.artifacts_dir): os.mkdir(self.artifacts_dir) elif (not os.path.isdir(self.artifacts_dir) or os.listdir(self.artifacts_dir)): logging.error('Artifacts path: %s exists and is not an empty directory.', self.artifacts_dir) return False self.fuzz_target_paths = utils.get_fuzz_targets(self.out_dir) logging.info('Fuzz targets: %s', self.fuzz_target_paths) if not self.fuzz_target_paths: logging.error('No fuzz targets were found in out directory: %s.', self.out_dir) return False return True def run_fuzz_target(self, fuzz_target_obj): # pylint: disable=no-self-use """Fuzzes with |fuzz_target_obj| and returns the result.""" # TODO(metzman): Make children implement this so that the batch runner can # do things differently. result = fuzz_target_obj.fuzz() fuzz_target_obj.free_disk_if_needed() return result @property def quit_on_bug_found(self): """Property that is checked to determine if fuzzing should quit after first bug is found.""" raise NotImplementedError('Child class must implement method') def get_fuzz_target_artifact(self, target, artifact_name): """Returns the path of a fuzzing artifact named |artifact_name| for |fuzz_target|.""" artifact_name = '{target_name}-{sanitizer}-{artifact_name}'.format( target_name=target.target_name, sanitizer=self.config.sanitizer, artifact_name=artifact_name) return os.path.join(self.artifacts_dir, artifact_name) def create_fuzz_target_obj(self, target_path, run_seconds): """Returns a fuzz target object.""" return fuzz_target.FuzzTarget(target_path, run_seconds, self.out_dir, self.clusterfuzz_deployment, self.config) def run_fuzz_targets(self): """Runs fuzz targets. Returns True if a bug was found.""" fuzzers_left_to_run = len(self.fuzz_target_paths) # Make a copy since we will mutate it. fuzz_seconds = self.config.fuzz_seconds min_seconds_per_fuzzer = fuzz_seconds // fuzzers_left_to_run bug_found = False for target_path in self.fuzz_target_paths: # By doing this, we can ensure that every fuzz target runs for at least # min_seconds_per_fuzzer, but that other fuzzers will have longer to run # if one ends early. run_seconds = max(fuzz_seconds // fuzzers_left_to_run, min_seconds_per_fuzzer) target = self.create_fuzz_target_obj(target_path, run_seconds) start_time = time.time() result = self.run_fuzz_target(target) # It's OK if this goes negative since we take max when determining # run_seconds. fuzz_seconds -= time.time() - start_time fuzzers_left_to_run -= 1 if not result.testcase or not result.stacktrace: logging.info('Fuzzer %s finished running without crashes.', target.target_name) continue # TODO(metzman): Do this with filestore. testcase_artifact_path = self.get_fuzz_target_artifact( target, os.path.basename(result.testcase)) shutil.move(result.testcase, testcase_artifact_path) bug_summary_artifact_path = self.get_fuzz_target_artifact( target, 'bug-summary.txt') stack_parser.parse_fuzzer_output(result.stacktrace, bug_summary_artifact_path) bug_found = True if self.quit_on_bug_found: logging.info('Bug found. Stopping fuzzing.') return bug_found return bug_found class CiFuzzTargetRunner(BaseFuzzTargetRunner): """Runner for fuzz targets used in CI (patch-fuzzing) context.""" @property def quit_on_bug_found(self): return True class BatchFuzzTargetRunner(BaseFuzzTargetRunner): """Runner for fuzz targets used in batch fuzzing context.""" @property def quit_on_bug_found(self): return False def get_fuzz_target_runner(config): """Returns a fuzz target runner object based on the run_fuzzers_mode of |config|.""" logging.info('RUN_FUZZERS_MODE is: %s', config.run_fuzzers_mode) if config.run_fuzzers_mode == 'batch': return BatchFuzzTargetRunner(config) return CiFuzzTargetRunner(config) def run_fuzzers(config): # pylint: disable=too-many-locals """Runs fuzzers for a specific OSS-Fuzz project. Args: config: A RunFuzzTargetsConfig. Returns: A RunFuzzersResult enum value indicating what happened during fuzzing. """ fuzz_target_runner = get_fuzz_target_runner(config) if not fuzz_target_runner.initialize(): # We didn't fuzz at all because of internal (CIFuzz) errors. And we didn't # find any bugs. return RunFuzzersResult.ERROR if not fuzz_target_runner.run_fuzz_targets(): # We fuzzed successfully, but didn't find any bugs (in the fuzz target). return RunFuzzersResult.NO_BUG_FOUND # We fuzzed successfully and found bug(s) in the fuzz targets. return RunFuzzersResult.BUG_FOUND