1# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module for determining coverage of fuzz targets."""
15import logging
16import os
17import sys
18import json
19import urllib.error
20import urllib.request
21
22# pylint: disable=wrong-import-position,import-error
23sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
24import utils
25
# Path, relative to the GCS base URL, of the directory that holds each
# project's latest coverage report info JSON file (<project_name>.json).
27LATEST_REPORT_INFO_PATH = 'oss-fuzz-coverage/latest_report_info/'
28
29
class OssFuzzCoverageGetter:
  """Gets coverage data for a project from OSS-Fuzz.

  Attributes:
    project_name: The name of the OSS-Fuzz project.
    repo_path: The normalized path of the project's repo, always ending in
      '/'. Covered files are only reported if they live under this prefix.
    fuzzer_stats_url: URL of the directory containing the per-target coverage
      JSON files, or None if it could not be determined.
  """

  def __init__(self, project_name, repo_path):
    """Constructor for OssFuzzCoverageGetter. Callers should check that
    fuzzer_stats_url is initialized.

    Args:
      project_name: The name of the OSS-Fuzz project.
      repo_path: The path of the project's repo (with or without a trailing
        slash).
    """
    self.project_name = project_name
    self.repo_path = _normalize_repo_path(repo_path)
    # Looking up the stats directory fetches the project's latest coverage
    # report info; the result is None when that lookup fails.
    self.fuzzer_stats_url = _get_fuzzer_stats_dir_url(self.project_name)

  def get_target_coverage_report(self, target):
    """Get the coverage report for a specific fuzz target.

    Args:
      target: The name of the fuzz target whose coverage is requested.

    Returns:
      The target's coverage json dict or None on failure.
    """
    if not self.fuzzer_stats_url:
      return None

    target_url = utils.url_join(self.fuzzer_stats_url, target + '.json')
    return get_json_from_url(target_url)

  def get_files_covered_by_target(self, target):
    """Gets a list of source files covered by the specific fuzz target.

    Args:
      target: The name of the fuzz target whose coverage is requested.

    Returns:
      A list of paths, relative to the repo, of the files that the fuzz target
      covers, or None if the coverage report is unavailable or lists no files.
    """
    target_cov = self.get_target_coverage_report(target)
    if not target_cov:
      return None

    coverage_per_file = get_coverage_per_file(target_cov)
    if not coverage_per_file:
      logging.info('No files found in coverage report.')
      return None

    affected_file_list = []
    for file_cov in coverage_per_file:
      # Normalize once and use the normalized path for both the prefix check
      # and the prefix removal. Using the raw filename for the removal would
      # give a wrong relative path whenever the report contains a
      # non-normalized path (e.g. '/src//curl/a.c') that only matches the repo
      # prefix after normalization.
      norm_file_path = os.path.normpath(file_cov['filename'])
      if not norm_file_path.startswith(self.repo_path):
        # Exclude files outside of the main repo.
        continue

      if not is_file_covered(file_cov):
        # Don't consider a file affected if code in it is never executed.
        continue

      affected_file_list.append(
          utils.remove_prefix(norm_file_path, self.repo_path))

    return affected_file_list
90
91
def is_file_covered(file_cov):
  """Returns a truthy value if |file_cov| reports any covered regions.

  The value returned is whatever is stored under
  file_cov['summary']['regions']['covered'] (presumably the covered-region
  count), so callers should rely on its truthiness rather than expect a bool.
  """
  region_summary = file_cov['summary']['regions']
  return region_summary['covered']
95
96
def get_coverage_per_file(target_cov):
  """Returns the per-file coverage entries stored in |target_cov|.

  Only the first element of target_cov['data'] is consulted; its 'files'
  value is returned as is.
  """
  first_data_entry = target_cov['data'][0]
  return first_data_entry['files']
100
101
def _normalize_repo_path(repo_path):
  """Returns |repo_path| normalized and terminated with exactly one '/'.

  This makes inputs such as /src/curl and /src/curl/ produce the same result,
  so the return value can be used as a path prefix.
  """
  normalized = os.path.normpath(repo_path)
  # normpath strips trailing slashes (except for the root path), so add the
  # terminator back only when it is missing.
  return normalized if normalized.endswith('/') else normalized + '/'
109
110
def _get_latest_cov_report_info(project_name):
  """Gets the latest coverage report info for |project_name|.

  Args:
    project_name: The name of the relevant OSS-Fuzz project.

  Returns:
    The deserialized JSON of the project's latest report info file, or None
    (after logging an error) if it could not be retrieved.
  """
  info_url = utils.url_join(utils.GCS_BASE_URL, LATEST_REPORT_INFO_PATH,
                            project_name + '.json')
  report_info = get_json_from_url(info_url)
  if report_info is not None:
    return report_info

  logging.error('Could not get the coverage report json from url: %s.',
                info_url)
  return None
123
124
def _get_fuzzer_stats_dir_url(project_name):
  """Gets the URL of the fuzzer stats directory of an OSS-Fuzz project.

  The directory is read from the 'fuzzer_stats_dir' entry of the project's
  latest coverage report info and converted from a gs:// URL with
  utils.gs_url_to_https.

  Args:
    project_name: The name of the relevant OSS-Fuzz project.

  Returns:
    The converted fuzzer stats directory URL, or None if the latest coverage
    report info is unavailable or has no 'fuzzer_stats_dir' entry.
  """
  report_info = _get_latest_cov_report_info(project_name)
  if not report_info:
    return None

  if 'fuzzer_stats_dir' in report_info:
    return utils.gs_url_to_https(report_info['fuzzer_stats_dir'])

  logging.error('fuzzer_stats_dir not in latest coverage info.')
  return None
146
147
def get_json_from_url(url):
  """Gets a json object from a specified HTTP URL.

  Args:
    url: The url of the json to be downloaded.

  Returns:
    The object deserialized from the JSON body, or None on failure (the URL
    could not be opened, or its body is not valid UTF-8 encoded JSON). Every
    failure is logged.
  """
  try:
    # The context manager guarantees the response (and its underlying
    # connection) is closed even if reading fails.
    with urllib.request.urlopen(url) as response:
      raw_body = response.read()
  except urllib.error.HTTPError:
    logging.error('HTTP error with url %s.', url)
    return None
  except urllib.error.URLError as err:
    # Non-HTTP failures (DNS errors, refused connections, missing local
    # files, ...) are reported as URLError, the base class of HTTPError.
    logging.error('Could not open url %s: %s.', url, str(err))
    return None

  try:
    # read() returns bytes, which are decoded before parsing.
    # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError.
    result_json = json.loads(raw_body.decode())
  except (ValueError, TypeError) as err:
    logging.error('Loading json from url %s failed with: %s.', url, str(err))
    return None
  return result_json
170