#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import logging
import os
import shutil
import time
import zipfile

from google.protobuf.internal.containers import RepeatedCompositeFieldContainer

from vts.runners.host import keys
from vts.utils.python.archive import archive_parser
from vts.utils.python.build.api import artifact_fetcher
from vts.utils.python.coverage import coverage_report
from vts.utils.python.coverage import gcda_parser
from vts.utils.python.coverage import gcno_parser
from vts.utils.python.coverage.parser import FileFormatError
from vts.utils.python.web import feature_utils

TARGET_COVERAGE_PATH = "/data/misc/gcov/"
LOCAL_COVERAGE_PATH = "/tmp/vts-test-coverage"

GCNO_SUFFIX = ".gcno"
GCDA_SUFFIX = ".gcda"
COVERAGE_SUFFIX = ".gcnodir"
GIT_PROJECT = "git_project"
MODULE_NAME = "module_name"
NAME = "name"
PATH = "path"

_BRANCH = "master"  # TODO: make this a runtime parameter
_CHECKSUM_GCNO_DICT = "checksum_gcno_dict"
_COVERAGE_ZIP = "coverage_zip"
_REVISION_DICT = "revision_dict"


class CoverageFeature(feature_utils.Feature):
    """Feature object for coverage functionality.

    Attributes:
        enabled: boolean, True if coverage is enabled, False otherwise
        web: (optional) WebFeature, object storing web feature util for test run
    """

    _TOGGLE_PARAM = keys.ConfigKeys.IKEY_ENABLE_COVERAGE
    _REQUIRED_PARAMS = [
        keys.ConfigKeys.IKEY_ANDROID_DEVICE,
        keys.ConfigKeys.IKEY_SERVICE_JSON_PATH,
    ]
    _OPTIONAL_PARAMS = [keys.ConfigKeys.IKEY_MODULES]

    def __init__(self, user_params, web=None):
        """Initializes the coverage feature.

        Args:
            user_params: A dictionary from parameter name (String) to parameter value.
            web: (optional) WebFeature, object storing web feature util for test run
        """
        self.ParseParameters(self._TOGGLE_PARAM, self._REQUIRED_PARAMS, self._OPTIONAL_PARAMS,
                             user_params)
        self.web = web
        logging.info("Coverage enabled: %s", self.enabled)

    def _ExtractSourceName(self, gcno_summary, file_name):
        """Gets the source name from the GCNO summary object.

        Gets the original source file name from the FileSummary object describing
        a gcno file using the base filename of the gcno/gcda file.

        The functions in the summary are scanned in iteration order and the scan
        stops at the first C/C++ source whose extension-less path ends with
        file_name. If no function matches, the source path of the last function
        inspected is returned as a fallback (even if it did not match or had an
        unsupported extension).

        Args:
            gcno_summary: a FileSummary object describing a gcno file
            file_name: the base filename (without extensions) of the gcno or gcda file

        Returns:
            The relative path to the original source file corresponding to the
            provided gcno summary. The path is relative to the root of the build.
            None if the summary contains no functions.
        """
        src_file_path = None
        for key in gcno_summary.functions:
            src_file_path = gcno_summary.functions[key].src_file_name
            src_parts = src_file_path.rsplit(".", 1)
            src_file_name = src_parts[0]
            src_extension = src_parts[1] if len(src_parts) > 1 else None
            if src_extension not in ("c", "cpp", "cc"):
                logging.warning("Found unsupported file type: %s", src_file_path)
                continue
            if src_file_name.endswith(file_name):
                logging.info("Coverage source file: %s", src_file_path)
                break
        return src_file_path

    def _GetChecksumGcnoDict(self, cov_zip):
        """Generates a dictionary from gcno checksum to GCNOParser object.

        Processes the gcnodir files in the zip file to produce a mapping from gcno
        checksum to the GCNOParser object wrapping the gcno content. Archives
        that fail to parse are logged and skipped.

        Args:
            cov_zip: the zip file containing gcnodir files from the device build

        Returns:
            the dictionary of gcno checksums to GCNOParser objects
        """
        checksum_gcno_dict = dict()
        fnames = cov_zip.namelist()
        instrumented_modules = [f for f in fnames if f.endswith(COVERAGE_SUFFIX)]
        for instrumented_module in instrumented_modules:
            # Read the gcnodir file
            archive = archive_parser.Archive(cov_zip.open(instrumented_module).read())
            try:
                archive.Parse()
            except ValueError:
                logging.error("Archive could not be parsed: %s", instrumented_module)
                continue

            for gcno_file_path in archive.files:
                gcno_stream = io.BytesIO(archive.files[gcno_file_path])
                gcno_file_parser = gcno_parser.GCNOParser(gcno_stream)
                checksum_gcno_dict[gcno_file_parser.checksum] = gcno_file_parser
        return checksum_gcno_dict

    def InitializeDeviceCoverage(self, dut):
        """Initializes the device for coverage before tests run.

        Finds and removes all gcda files under TARGET_COVERAGE_PATH before tests
        run.

        Args:
            dut: the device under test.
        """
        logging.info("Removing existing gcda files.")
        dut.adb.shell("find %s -name \"*.gcda\" -type f -delete" %
                      TARGET_COVERAGE_PATH)

    def GetGcdaDict(self, dut, local_coverage_path=None):
        """Retrieves GCDA files from device and creates a dictionary of files.

        Find all GCDA files on the target device, copy them to the host using
        adb, then return a dictionary mapping from the gcda basename to the
        file content read back from the host copy.

        When local_coverage_path is not given, a timestamped directory under
        LOCAL_COVERAGE_PATH is used. Any existing directory at the chosen path
        is removed and recreated before files are pulled.

        Args:
            dut: the device under test.
            local_coverage_path: the host path (string) in which to copy gcda files

        Returns:
            A dictionary with gcda basenames as keys and contents as the values.
        """
        logging.info("Creating gcda dictionary")
        gcda_dict = {}
        if not local_coverage_path:
            timestamp = str(int(time.time() * 1000000))
            local_coverage_path = os.path.join(LOCAL_COVERAGE_PATH, timestamp)
        if os.path.exists(local_coverage_path):
            shutil.rmtree(local_coverage_path)
        os.makedirs(local_coverage_path)
        logging.info("Storing gcda tmp files to: %s", local_coverage_path)
        gcda_files = dut.adb.shell("find %s -name \"*.gcda\"" %
                                   TARGET_COVERAGE_PATH).split("\n")
        for gcda in gcda_files:
            # Strip once so that stray whitespace / carriage returns from the
            # shell output end up in neither the basename nor the pull command.
            gcda_path = gcda.strip()
            if not gcda_path:
                continue
            basename = os.path.basename(gcda_path)
            file_name = os.path.join(local_coverage_path, basename)
            dut.adb.pull("%s %s" % (gcda_path, file_name))
            with open(file_name, "rb") as gcda_file:
                gcda_dict[basename] = gcda_file.read()
        return gcda_dict

    def _AutoProcess(self, gcda_dict, isGlobal):
        """Process coverage data and appends coverage reports to the report message.

        Matches gcno files with gcda files and processes them into a coverage report
        with references to the original source code used to build the system image.
        Coverage information is appended as a CoverageReportMessage to the provided
        report message.

        Git project information is automatically extracted from the build info and
        the source file name enclosed in each gcno file. Git project names must
        resemble paths and may differ from the paths to their project root by at
        most one. If no match is found, then coverage information will not be
        be processed.

        e.g. if the project path is test/vts, then its project name may be
             test/vts or <some folder>/test/vts in order to be recognized.

        Args:
            gcda_dict: the dictionary of gcda basenames to gcda content (binary string)
            isGlobal: boolean, True if the coverage data is for the entire test, False if only for
                      the current test case.
        """
        revision_dict = getattr(self, _REVISION_DICT, None)
        checksum_gcno_dict = getattr(self, _CHECKSUM_GCNO_DICT, None)
        if revision_dict is None or checksum_gcno_dict is None:
            # These attributes are only set by LoadArtifacts.
            logging.error("Coverage artifacts are not loaded; cannot process coverage data.")
            return

        for gcda_name in gcda_dict:
            gcda_stream = io.BytesIO(gcda_dict[gcda_name])
            gcda_file_parser = gcda_parser.GCDAParser(gcda_stream)

            if gcda_file_parser.checksum not in checksum_gcno_dict:
                logging.info("No matching gcno file for gcda: %s", gcda_name)
                continue
            gcno_file_parser = checksum_gcno_dict[gcda_file_parser.checksum]

            try:
                gcno_summary = gcno_file_parser.Parse()
            except FileFormatError:
                logging.error("Error parsing gcno for gcda %s", gcda_name)
                continue

            file_name = gcda_name.rsplit(".", 1)[0]
            src_file_path = self._ExtractSourceName(gcno_summary, file_name)

            if not src_file_path:
                logging.error("No source file found for gcda %s.", gcda_name)
                continue

            # Process and merge gcno/gcda data
            try:
                gcda_file_parser.Parse(gcno_summary)
            except FileFormatError:
                logging.error("Error parsing gcda file %s", gcda_name)
                continue

            # Get the git project information
            # Assumes that the project name and path to the project root are similar
            revision = None
            for project_name in revision_dict:
                # Matches cases when source file root and project name are the same
                if src_file_path.startswith(str(project_name)):
                    git_project_name = str(project_name)
                    git_project_path = str(project_name)
                    revision = str(revision_dict[project_name])
                    logging.info("Source file '%s' matched with project '%s'",
                                 src_file_path, git_project_name)
                    break

                parts = os.path.normpath(str(project_name)).split(os.sep, 1)
                # Matches when project name has an additional prefix before the
                # project path root.
                # There is no break after this branch, so iteration continues:
                # a later exact match overrides it, and otherwise the last
                # prefixed match found is the one used.
                if len(parts) > 1 and src_file_path.startswith(parts[-1]):
                    git_project_name = str(project_name)
                    git_project_path = parts[-1]
                    revision = str(revision_dict[project_name])
                    logging.info("Source file '%s' matched with project '%s'",
                                 src_file_path, git_project_name)

            if not revision:
                logging.info("Could not find git info for %s", src_file_path)
                continue

            if self.web and self.web.enabled:
                coverage_vec = coverage_report.GenerateLineCoverageVector(
                    src_file_path, gcno_summary)
                total_count, covered_count = coverage_report.GetCoverageStats(coverage_vec)
                self.web.AddCoverageReport(
                    coverage_vec, src_file_path, git_project_name, git_project_path,
                    revision, covered_count, total_count, isGlobal)

    def _ManualProcess(self, gcda_dict, isGlobal):
        """Process coverage data and appends coverage reports to the report message.

        Opens the gcno files in the cov_zip for the specified modules and matches
        gcno/gcda files. Then, coverage vectors are generated for each set of matching
        gcno/gcda files and appended as a CoverageReportMessage to the provided
        report message. Unlike AutoProcess, coverage information is only processed
        for the modules explicitly defined in 'modules'.

        Args:
            gcda_dict: the dictionary of gcda basenames to gcda content (binary string)
            isGlobal: boolean, True if the coverage data is for the entire test, False if only for
                      the current test case.
        """
        cov_zip = getattr(self, _COVERAGE_ZIP, None)
        revision_dict = getattr(self, _REVISION_DICT, None)
        modules = getattr(self, keys.ConfigKeys.IKEY_MODULES, None)
        if cov_zip is None or revision_dict is None or modules is None:
            # The zip and revision dict are only set by LoadArtifacts.
            logging.error("Coverage artifacts are not loaded; cannot process coverage data.")
            return

        covered_modules = set(cov_zip.namelist())
        for module in modules:
            if MODULE_NAME not in module or GIT_PROJECT not in module:
                logging.error("Coverage module must specify name and git project: %s",
                              module)
                continue
            project = module[GIT_PROJECT]
            if PATH not in project or NAME not in project:
                logging.error("Project name and path not specified: %s", project)
                continue

            name = str(module[MODULE_NAME]) + COVERAGE_SUFFIX
            git_project = str(project[NAME])
            git_project_path = str(project[PATH])

            if name not in covered_modules:
                logging.error("No coverage information for module %s", name)
                continue
            if git_project not in revision_dict:
                logging.error("Git project not present in device revision dict: %s",
                              git_project)
                continue

            revision = str(revision_dict[git_project])
            archive = archive_parser.Archive(cov_zip.open(name).read())
            try:
                archive.Parse()
            except ValueError:
                logging.error("Archive could not be parsed: %s", name)
                continue

            for gcno_file_path in archive.files:
                file_name_path = gcno_file_path.rsplit(".", 1)[0]
                file_name = os.path.basename(file_name_path)
                gcno_content = archive.files[gcno_file_path]
                gcno_stream = io.BytesIO(gcno_content)
                try:
                    gcno_summary = gcno_parser.GCNOParser(gcno_stream).Parse()
                except FileFormatError:
                    logging.error("Error parsing gcno file %s", gcno_file_path)
                    continue

                # Match gcno file with gcda file
                gcda_name = file_name + GCDA_SUFFIX
                if gcda_name not in gcda_dict:
                    logging.error("No gcda file found %s.", gcda_name)
                    continue

                src_file_path = self._ExtractSourceName(gcno_summary, file_name)

                if not src_file_path:
                    logging.error("No source file found for %s.", gcno_file_path)
                    continue

                # Process and merge gcno/gcda data
                gcda_content = gcda_dict[gcda_name]
                gcda_stream = io.BytesIO(gcda_content)
                try:
                    gcda_parser.GCDAParser(gcda_stream).Parse(gcno_summary)
                except FileFormatError:
                    # Log the file name, not the raw binary content.
                    logging.error("Error parsing gcda file %s", gcda_name)
                    continue

                if self.web and self.web.enabled:
                    coverage_vec = coverage_report.GenerateLineCoverageVector(
                        src_file_path, gcno_summary)
                    total_count, covered_count = coverage_report.GetCoverageStats(coverage_vec)
                    self.web.AddCoverageReport(
                        coverage_vec, src_file_path, git_project, git_project_path,
                        revision, covered_count, total_count, isGlobal)

    def LoadArtifacts(self):
        """Initializes the test for coverage instrumentation.

        Downloads build artifacts from the build server
        (gcno zip file and git revision dictionary) and prepares for coverage
        measurement.

        The feature is marked disabled while loading and is only re-enabled if
        every step succeeds; any failure leaves coverage disabled.

        Requires coverage feature enabled; no-op otherwise.
        """
        if not self.enabled:
            return

        self.enabled = False

        # Use first device info to get product, flavor, and ID
        # TODO: support multi-device builds
        android_devices = getattr(self, keys.ConfigKeys.IKEY_ANDROID_DEVICE)
        if not isinstance(android_devices, list) or not android_devices:
            logging.warning("android device information not available")
            return

        device_spec = android_devices[0]
        build_flavor = device_spec.get(keys.ConfigKeys.IKEY_BUILD_FLAVOR)
        device_build_id = device_spec.get(keys.ConfigKeys.IKEY_BUILD_ID)

        if not build_flavor or not device_build_id:
            logging.error("Could not read device information.")
            return

        build_flavor = str(build_flavor)
        if "coverage" not in build_flavor:
            build_flavor = "{0}_coverage".format(build_flavor)
        product = build_flavor.split("-", 1)[0]

        # Get service json path
        service_json_path = getattr(self, keys.ConfigKeys.IKEY_SERVICE_JSON_PATH)

        # Instantiate build client
        try:
            build_client = artifact_fetcher.AndroidBuildClient(service_json_path)
        except Exception as e:
            logging.exception('Failed to instantiate build client: %s', e)
            return

        # Fetch repo dictionary
        try:
            revision_dict = build_client.GetRepoDictionary(_BRANCH, build_flavor, device_build_id)
            setattr(self, _REVISION_DICT, revision_dict)
        except Exception as e:
            logging.exception('Failed to fetch repo dictionary: %s', e)
            logging.info('Coverage disabled')
            return

        # Fetch coverage zip
        try:
            cov_zip = io.BytesIO(
                build_client.GetCoverage(_BRANCH, build_flavor, device_build_id, product))
            cov_zip = zipfile.ZipFile(cov_zip)
            setattr(self, _COVERAGE_ZIP, cov_zip)
        except Exception as e:
            logging.exception('Failed to fetch coverage zip: %s', e)
            logging.info('Coverage disabled')
            return

        # Without an explicit module list, coverage is auto-processed, which
        # needs the checksum -> gcno parser lookup table.
        if not hasattr(self, keys.ConfigKeys.IKEY_MODULES):
            checksum_gcno_dict = self._GetChecksumGcnoDict(cov_zip)
            setattr(self, _CHECKSUM_GCNO_DICT, checksum_gcno_dict)

        self.enabled = True

    def SetCoverageData(self, coverage_data=None, isGlobal=False, dut=None):
        """Sets and processes coverage data.

        Organizes coverage data and processes it into a coverage report in the
        current test case

        Requires feature to be enabled; no-op otherwise.

        Args:
            coverage_data may be either:
                          (1) a dict where gcda name is the key and binary
                              content is the value, or
                          (2) a list of NativeCodeCoverageRawDataMessage objects
                          (3) None if the data will be pulled from dut
            isGlobal: True if the coverage data is for the entire test, False
                      if the coverage data is just for the current test case.
            dut: (optional) the device object for which to pull coverage data
        """
        if not self.enabled:
            return

        if not coverage_data and dut:
            coverage_data = self.GetGcdaDict(dut)

        if not coverage_data:
            logging.info("SetCoverageData: empty coverage data")
            return

        if isinstance(coverage_data, RepeatedCompositeFieldContainer):
            gcda_dict = {}
            for coverage_msg in coverage_data:
                gcda_dict[coverage_msg.file_path] = coverage_msg.gcda
        elif isinstance(coverage_data, dict):
            gcda_dict = coverage_data
        else:
            logging.error("SetCoverageData: unexpected coverage_data type: %s",
                          str(type(coverage_data)))
            return
        logging.info("coverage file paths %s", str(list(gcda_dict)))

        if not hasattr(self, keys.ConfigKeys.IKEY_MODULES):
            # auto-process coverage data
            self._AutoProcess(gcda_dict, isGlobal)
        else:
            # explicitly process coverage data for the specified modules
            self._ManualProcess(gcda_dict, isGlobal)