1#
2# Copyright (C) 2016 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15import io
16import logging
17import os
18import shutil
19import time
20import zipfile
21
22from google.protobuf.internal.containers import RepeatedCompositeFieldContainer
23
24from vts.runners.host import keys
25from vts.utils.python.archive import archive_parser
26from vts.utils.python.build.api import artifact_fetcher
27from vts.utils.python.coverage import coverage_report
28from vts.utils.python.coverage import gcda_parser
29from vts.utils.python.coverage import gcno_parser
30from vts.utils.python.coverage.parser import FileFormatError
31from vts.utils.python.web import feature_utils
32
# Locations of coverage data: on the device under test and on the host.
TARGET_COVERAGE_PATH = "/data/misc/gcov/"
LOCAL_COVERAGE_PATH = "/tmp/vts-test-coverage"

# File-name suffixes of the coverage artifacts handled by this module.
GCNO_SUFFIX = ".gcno"
GCDA_SUFFIX = ".gcda"
COVERAGE_SUFFIX = ".gcnodir"

# Keys read from user-specified coverage module entries.
MODULE_NAME = "module_name"
GIT_PROJECT = "git_project"
NAME = "name"
PATH = "path"

_BRANCH = "master"  # TODO: make this a runtime parameter

# Names of the attributes under which downloaded artifacts are cached on the
# feature object.
_COVERAGE_ZIP = "coverage_zip"
_REVISION_DICT = "revision_dict"
_CHECKSUM_GCNO_DICT = "checksum_gcno_dict"
48
49
class CoverageFeature(feature_utils.Feature):
    """Feature object for coverage functionality.

    Attributes:
        enabled: boolean, True if coverage is enabled, False otherwise
        web: (optional) WebFeature, object storing web feature util for test run
    """

    # Parameter name passed to ParseParameters as the feature toggle.
    _TOGGLE_PARAM = keys.ConfigKeys.IKEY_ENABLE_COVERAGE
    # Parameter names passed to ParseParameters as required. Each key is
    # listed once (the device key used to appear twice).
    _REQUIRED_PARAMS = [
        keys.ConfigKeys.IKEY_ANDROID_DEVICE,
        keys.ConfigKeys.IKEY_SERVICE_JSON_PATH,
    ]
    # Parameter names passed to ParseParameters as optional.
    _OPTIONAL_PARAMS = [keys.ConfigKeys.IKEY_MODULES]
65
66    def __init__(self, user_params, web=None):
67        """Initializes the coverage feature.
68
69        Args:
70            user_params: A dictionary from parameter name (String) to parameter value.
71            web: (optional) WebFeature, object storing web feature util for test run
72        """
73        self.ParseParameters(self._TOGGLE_PARAM, self._REQUIRED_PARAMS, self._OPTIONAL_PARAMS,
74                             user_params)
75        self.web = web
76        logging.info("Coverage enabled: %s", self.enabled)
77
78    def _ExtractSourceName(self, gcno_summary, file_name):
79        """Gets the source name from the GCNO summary object.
80
81        Gets the original source file name from the FileSummary object describing
82        a gcno file using the base filename of the gcno/gcda file.
83
84        Args:
85            gcno_summary: a FileSummary object describing a gcno file
86            file_name: the base filename (without extensions) of the gcno or gcda file
87
88        Returns:
89            The relative path to the original source file corresponding to the
90            provided gcno summary. The path is relative to the root of the build.
91        """
92        src_file_path = None
93        for key in gcno_summary.functions:
94            src_file_path = gcno_summary.functions[key].src_file_name
95            src_parts = src_file_path.rsplit(".", 1)
96            src_file_name = src_parts[0]
97            src_extension = src_parts[1] if len(src_parts) > 1 else None
98            if src_extension not in ["c", "cpp", "cc"]:
99                logging.warn("Found unsupported file type: %s", src_file_path)
100                continue
101            if src_file_name.endswith(file_name):
102                logging.info("Coverage source file: %s", src_file_path)
103                break
104        return src_file_path
105
106    def _GetChecksumGcnoDict(self, cov_zip):
107        """Generates a dictionary from gcno checksum to GCNOParser object.
108
109        Processes the gcnodir files in the zip file to produce a mapping from gcno
110        checksum to the GCNOParser object wrapping the gcno content.
111
112        Args:
113            cov_zip: the zip file containing gcnodir files from the device build
114
115        Returns:
116            the dictionary of gcno checksums to GCNOParser objects
117        """
118        checksum_gcno_dict = dict()
119        fnames = cov_zip.namelist()
120        instrumented_modules = [f for f in fnames if f.endswith(COVERAGE_SUFFIX)]
121        for instrumented_module in instrumented_modules:
122            # Read the gcnodir file
123            archive = archive_parser.Archive(cov_zip.open(instrumented_module).read())
124            try:
125                archive.Parse()
126            except ValueError:
127                logging.error("Archive could not be parsed: %s", name)
128                continue
129
130            for gcno_file_path in archive.files:
131                file_name_path = gcno_file_path.rsplit(".", 1)[0]
132                file_name = os.path.basename(file_name_path)
133                gcno_stream = io.BytesIO(archive.files[gcno_file_path])
134                gcno_file_parser = gcno_parser.GCNOParser(gcno_stream)
135                checksum_gcno_dict[gcno_file_parser.checksum] = gcno_file_parser
136        return checksum_gcno_dict
137
138    def InitializeDeviceCoverage(self, dut):
139        """Initializes the device for coverage before tests run.
140
141        Finds and removes all gcda files under TARGET_COVERAGE_PATH before tests
142        run.
143
144        Args:
145            dut: the device under test.
146        """
147        logging.info("Removing existing gcda files.")
148        gcda_files = dut.adb.shell("find %s -name \"*.gcda\" -type f -delete" %
149                                   TARGET_COVERAGE_PATH)
150
151    def GetGcdaDict(self, dut, local_coverage_path=None):
152        """Retrieves GCDA files from device and creates a dictionary of files.
153
154        Find all GCDA files on the target device, copy them to the host using
155        adb, then return a dictionary mapping from the gcda basename to the
156        temp location on the host.
157
158        Args:
159            dut: the device under test.
160            local_coverage_path: the host path (string) in which to copy gcda files
161
162        Returns:
163            A dictionary with gcda basenames as keys and contents as the values.
164        """
165        logging.info("Creating gcda dictionary")
166        gcda_dict = {}
167        if not local_coverage_path:
168            timestamp = str(int(time.time() * 1000000))
169            local_coverage_path = os.path.join(LOCAL_COVERAGE_PATH, timestamp)
170        if os.path.exists(local_coverage_path):
171            shutil.rmtree(local_coverage_path)
172        os.makedirs(local_coverage_path)
173        logging.info("Storing gcda tmp files to: %s", local_coverage_path)
174        gcda_files = dut.adb.shell("find %s -name \"*.gcda\"" %
175                                   TARGET_COVERAGE_PATH).split("\n")
176        for gcda in gcda_files:
177            if gcda:
178                basename = os.path.basename(gcda.strip())
179                file_name = os.path.join(local_coverage_path,
180                                         basename)
181                dut.adb.pull("%s %s" % (gcda, file_name))
182                gcda_content = open(file_name, "rb").read()
183                gcda_dict[basename] = gcda_content
184        return gcda_dict
185
186    def _AutoProcess(self, gcda_dict, isGlobal):
187        """Process coverage data and appends coverage reports to the report message.
188
189        Matches gcno files with gcda files and processes them into a coverage report
190        with references to the original source code used to build the system image.
191        Coverage information is appended as a CoverageReportMessage to the provided
192        report message.
193
194        Git project information is automatically extracted from the build info and
195        the source file name enclosed in each gcno file. Git project names must
196        resemble paths and may differ from the paths to their project root by at
197        most one. If no match is found, then coverage information will not be
198        be processed.
199
200        e.g. if the project path is test/vts, then its project name may be
201             test/vts or <some folder>/test/vts in order to be recognized.
202
203        Args:
204            gcda_dict: the dictionary of gcda basenames to gcda content (binary string)
205            isGlobal: boolean, True if the coverage data is for the entire test, False if only for
206                      the current test case.
207        """
208        revision_dict = getattr(self, _REVISION_DICT, None)
209        checksum_gcno_dict = getattr(self, _CHECKSUM_GCNO_DICT, None)
210        for gcda_name in gcda_dict:
211            gcda_stream = io.BytesIO(gcda_dict[gcda_name])
212            gcda_file_parser = gcda_parser.GCDAParser(gcda_stream)
213
214            if not gcda_file_parser.checksum in checksum_gcno_dict:
215                logging.info("No matching gcno file for gcda: %s", gcda_name)
216                continue
217            gcno_file_parser = checksum_gcno_dict[gcda_file_parser.checksum]
218
219            try:
220                gcno_summary = gcno_file_parser.Parse()
221            except FileFormatError:
222                logging.error("Error parsing gcno for gcda %s", gcda_name)
223                continue
224
225            file_name = gcda_name.rsplit(".", 1)[0]
226            src_file_path = self._ExtractSourceName(gcno_summary, file_name)
227
228            if not src_file_path:
229                logging.error("No source file found for gcda %s.", gcda_name)
230                continue
231
232            # Process and merge gcno/gcda data
233            try:
234                gcda_file_parser.Parse(gcno_summary)
235            except FileFormatError:
236                logging.error("Error parsing gcda file %s", gcda_name)
237                continue
238
239            # Get the git project information
240            # Assumes that the project name and path to the project root are similar
241            revision = None
242            for project_name in revision_dict:
243                # Matches cases when source file root and project name are the same
244                if src_file_path.startswith(str(project_name)):
245                    git_project_name = str(project_name)
246                    git_project_path = str(project_name)
247                    revision = str(revision_dict[project_name])
248                    logging.info("Source file '%s' matched with project '%s'",
249                                 src_file_path, git_project_name)
250                    break
251
252                parts = os.path.normpath(str(project_name)).split(os.sep, 1)
253                # Matches when project name has an additional prefix before the
254                # project path root.
255                if len(parts) > 1 and src_file_path.startswith(parts[-1]):
256                    git_project_name = str(project_name)
257                    git_project_path = parts[-1]
258                    revision = str(revision_dict[project_name])
259                    logging.info("Source file '%s' matched with project '%s'",
260                                 src_file_path, git_project_name)
261
262            if not revision:
263                logging.info("Could not find git info for %s", src_file_path)
264                continue
265
266            if self.web and self.web.enabled:
267                coverage_vec = coverage_report.GenerateLineCoverageVector(
268                    src_file_path, gcno_summary)
269                total_count, covered_count = coverage_report.GetCoverageStats(coverage_vec)
270                self.web.AddCoverageReport(
271                    coverage_vec, src_file_path, git_project_name, git_project_path,
272                    revision, covered_count, total_count, isGlobal)
273
274    def _ManualProcess(self, gcda_dict, isGlobal):
275        """Process coverage data and appends coverage reports to the report message.
276
277        Opens the gcno files in the cov_zip for the specified modules and matches
278        gcno/gcda files. Then, coverage vectors are generated for each set of matching
279        gcno/gcda files and appended as a CoverageReportMessage to the provided
280        report message. Unlike AutoProcess, coverage information is only processed
281        for the modules explicitly defined in 'modules'.
282
283        Args:
284            gcda_dict: the dictionary of gcda basenames to gcda content (binary string)
285            isGlobal: boolean, True if the coverage data is for the entire test, False if only for
286                      the current test case.
287        """
288        cov_zip = getattr(self, _COVERAGE_ZIP, None)
289        revision_dict = getattr(self, _REVISION_DICT, None)
290        modules = getattr(self, keys.ConfigKeys.IKEY_MODULES, None)
291        covered_modules = set(cov_zip.namelist())
292        for module in modules:
293            if MODULE_NAME not in module or GIT_PROJECT not in module:
294                logging.error("Coverage module must specify name and git project: %s",
295                              module)
296                continue
297            project = module[GIT_PROJECT]
298            if PATH not in project or NAME not in project:
299                logging.error("Project name and path not specified: %s", project)
300                continue
301
302            name = str(module[MODULE_NAME]) + COVERAGE_SUFFIX
303            git_project = str(project[NAME])
304            git_project_path = str(project[PATH])
305
306            if name not in covered_modules:
307                logging.error("No coverage information for module %s", name)
308                continue
309            if git_project not in revision_dict:
310                logging.error("Git project not present in device revision dict: %s",
311                              git_project)
312                continue
313
314            revision = str(revision_dict[git_project])
315            archive = archive_parser.Archive(cov_zip.open(name).read())
316            try:
317                archive.Parse()
318            except ValueError:
319                logging.error("Archive could not be parsed: %s", name)
320                continue
321
322            for gcno_file_path in archive.files:
323                file_name_path = gcno_file_path.rsplit(".", 1)[0]
324                file_name = os.path.basename(file_name_path)
325                gcno_content = archive.files[gcno_file_path]
326                gcno_stream = io.BytesIO(gcno_content)
327                try:
328                    gcno_summary = gcno_parser.GCNOParser(gcno_stream).Parse()
329                except FileFormatError:
330                    logging.error("Error parsing gcno file %s", gcno_file_path)
331                    continue
332                src_file_path = None
333
334                # Match gcno file with gcda file
335                gcda_name = file_name + GCDA_SUFFIX
336                if gcda_name not in gcda_dict:
337                    logging.error("No gcda file found %s.", gcda_name)
338                    continue
339
340                src_file_path = self._ExtractSourceName(gcno_summary, file_name)
341
342                if not src_file_path:
343                    logging.error("No source file found for %s.", gcno_file_path)
344                    continue
345
346                # Process and merge gcno/gcda data
347                gcda_content = gcda_dict[gcda_name]
348                gcda_stream = io.BytesIO(gcda_content)
349                try:
350                    gcda_parser.GCDAParser(gcda_stream).Parse(gcno_summary)
351                except FileFormatError:
352                    logging.error("Error parsing gcda file %s", gcda_content)
353                    continue
354
355                if self.web and self.web.enabled:
356                    coverage_vec = coverage_report.GenerateLineCoverageVector(
357                        src_file_path, gcno_summary)
358                    total_count, covered_count = coverage_report.GetCoverageStats(coverage_vec)
359                    self.web.AddCoverageReport(
360                        coverage_vec, src_file_path, git_project, git_project_path,
361                        revision, covered_count, total_count, isGlobal)
362
363    def LoadArtifacts(self):
364        """Initializes the test for coverage instrumentation.
365
366        Downloads build artifacts from the build server
367        (gcno zip file and git revision dictionary) and prepares for coverage
368        measurement.
369
370        Requires coverage feature enabled; no-op otherwise.
371        """
372        if not self.enabled:
373            return
374
375        self.enabled = False
376
377        # Use first device info to get product, flavor, and ID
378        # TODO: support multi-device builds
379        android_devices = getattr(self, keys.ConfigKeys.IKEY_ANDROID_DEVICE)
380        if not isinstance(android_devices, list):
381            logging.warn("android device information not available")
382            return
383
384        device_spec = android_devices[0]
385        build_flavor = device_spec.get(keys.ConfigKeys.IKEY_BUILD_FLAVOR)
386        device_build_id = device_spec.get(keys.ConfigKeys.IKEY_BUILD_ID)
387
388        if not build_flavor or not device_build_id:
389            logging.error("Could not read device information.")
390            return
391
392        build_flavor = str(build_flavor)
393        if not "coverage" in build_flavor:
394            build_flavor = "{0}_coverage".format(build_flavor)
395        product = build_flavor.split("-", 1)[0]
396        build_id = str(device_build_id)
397
398        # Get service json path
399        service_json_path = getattr(self, keys.ConfigKeys.IKEY_SERVICE_JSON_PATH)
400
401        # Instantiate build client
402        try:
403            build_client = artifact_fetcher.AndroidBuildClient(service_json_path)
404        except Exception as e:
405            logging.exception('Failed to instantiate build client: %s', e)
406            return
407
408        # Fetch repo dictionary
409        try:
410            revision_dict = build_client.GetRepoDictionary(_BRANCH, build_flavor, device_build_id)
411            setattr(self, _REVISION_DICT, revision_dict)
412        except Exception as e:
413            logging.exception('Failed to fetch repo dictionary: %s', e)
414            logging.info('Coverage disabled')
415            return
416
417        # Fetch coverage zip
418        try:
419            cov_zip = io.BytesIO(
420                build_client.GetCoverage(_BRANCH, build_flavor, device_build_id, product))
421            cov_zip = zipfile.ZipFile(cov_zip)
422            setattr(self, _COVERAGE_ZIP, cov_zip)
423        except Exception as e:
424            logging.exception('Failed to fetch coverage zip: %s', e)
425            logging.info('Coverage disabled')
426            return
427
428        if not hasattr(self, keys.ConfigKeys.IKEY_MODULES):
429            checksum_gcno_dict = self._GetChecksumGcnoDict(cov_zip)
430            setattr(self, _CHECKSUM_GCNO_DICT, checksum_gcno_dict)
431
432        self.enabled = True
433
434    def SetCoverageData(self, coverage_data=None, isGlobal=False, dut=None):
435        """Sets and processes coverage data.
436
437        Organizes coverage data and processes it into a coverage report in the
438        current test case
439
440        Requires feature to be enabled; no-op otherwise.
441
442        Args:
443            coverage_data may be either:
444                          (1) a dict where gcda name is the key and binary
445                              content is the value, or
446                          (2) a list of NativeCodeCoverageRawDataMessage objects
447                          (3) None if the data will be pulled from dut
448            isGlobal: True if the coverage data is for the entire test, False if
449                      if the coverage data is just for the current test case.
450            dut: (optional) the device object for which to pull coverage data
451        """
452        if not self.enabled:
453            return
454
455        if not coverage_data and dut:
456            coverage_data = self.GetGcdaDict(dut)
457
458        if not coverage_data:
459            logging.info("SetCoverageData: empty coverage data")
460            return
461
462        if isinstance(coverage_data, RepeatedCompositeFieldContainer):
463            gcda_dict = {}
464            for coverage_msg in coverage_data:
465                gcda_dict[coverage_msg.file_path] = coverage_msg.gcda
466        elif isinstance(coverage_data, dict):
467            gcda_dict = coverage_data
468        else:
469            logging.error("SetCoverageData: unexpected coverage_data type: %s",
470                          str(type(coverage_data)))
471            return
472        logging.info("coverage file paths %s", str([fp for fp in gcda_dict]))
473
474        if not hasattr(self, keys.ConfigKeys.IKEY_MODULES):
475            # auto-process coverage data
476            self._AutoProcess(gcda_dict, isGlobal)
477        else:
478            # explicitly process coverage data for the specified modules
479            self._ManualProcess(gcda_dict, isGlobal)
480