1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Tools for running presubmit checks in a Git repository.
15
Presubmit checks are defined as a function or other callable. The function must
accept one required argument: a PresubmitContext, which provides the paths on
which to run. Presubmit checks communicate failure by raising any exception.
19
20For example, either of these functions may be used as presubmit checks:
21
22  @pw_presubmit.filter_paths(endswith='.py')
23  def file_contains_ni(ctx: PresubmitContext):
24      for path in ctx.paths:
25          with open(path) as file:
26              contents = file.read()
27              if 'ni' not in contents and 'nee' not in contents:
                  raise PresubmitFailure('Files must say "ni"!', path=path)
29
  def run_the_build(_):
31      subprocess.run(['make', 'release'], check=True)
32
33Presubmit checks that accept a list of paths may use the filter_paths decorator
34to automatically filter the paths list for file types they care about. See the
35pragma_once function for an example.
36
See pigweed_presubmit.py for an example of how to define presubmit checks.
38"""
39
40import collections
41import contextlib
42import dataclasses
43import enum
44from inspect import Parameter, signature
45import itertools
46import logging
47import os
48from pathlib import Path
49import re
50import subprocess
51import time
52from typing import (Callable, Collection, Dict, Iterable, Iterator, List,
53                    NamedTuple, Optional, Pattern, Sequence, Set, Tuple, Union)
54
55from pw_presubmit import git_repo, tools
56from pw_presubmit.tools import plural
57
# Module-level logger. Presubmit._context temporarily attaches a per-check
# file handler to it while each check runs.
_LOG: logging.Logger = logging.getLogger(__name__)

# Text colorizers: each is called with a string and returns the string to
# display (see _Result.colorized). The numeric arguments look like ANSI SGR
# codes (e.g. 31 = red foreground, 1 = bold) -- TODO confirm against
# tools.make_color.
color_red = tools.make_color(31)
color_bold_red = tools.make_color(31, 1)
color_black_on_red = tools.make_color(30, 41)
color_yellow = tools.make_color(33, 1)
color_green = tools.make_color(32)
color_black_on_green = tools.make_color(30, 42)
color_aqua = tools.make_color(36)
color_bold_white = tools.make_color(37, 1)

# Box styles. Each string is expanded character by character (*style) into the
# positional fields of a template returned by tools.make_box.
_SUMMARY_BOX = '══╦╗ ║║══╩╝'
_CHECK_UPPER = '━━━┓       '
_CHECK_LOWER = '       ━━━┛'

# Used by _title and _box to size the boxes they draw.
WIDTH = 80

# Widths passed by _box for the left and right sections of a box.
_LEFT = 7
_RIGHT = 11
77
78
def _title(msg, style=_SUMMARY_BOX) -> str:
    """Renders msg as a centered title inside a single-section box.

    Args:
        msg: text to display; it is wrapped in one space on each side and
            centered in a field of WIDTH - 2 characters
        style: box-drawing characters passed positionally to the box template

    Returns:
        the formatted box
    """
    centered = f' {msg} '.center(WIDTH - 2)
    template = tools.make_box('^')
    return template.format(*style, section1=centered, width1=len(centered))
82
83
def _format_time(time_s: float) -> str:
    """Formats a duration for display.

    Args:
        time_s: duration in seconds

    Returns:
        ' M:SS.s' (with a leading space) for durations under one hour, or
        'H:MM:SS' for longer durations
    """
    # Round to the displayed precision (tenths of a second) BEFORE splitting
    # into fields. Otherwise a value such as 59.96 is split into 0 minutes and
    # 59.96 seconds, and the seconds field is then rendered as '60.0'.
    minutes, seconds = divmod(round(time_s, 1), 60)
    if minutes < 60:
        return f' {int(minutes)}:{seconds:04.1f}'
    hours, minutes = divmod(minutes, 60)
    return f'{int(hours):d}:{int(minutes):02}:{int(seconds):02}'
90
91
def _box(style, left, middle, right, box=tools.make_box('><>')) -> str:
    """Formats one three-section row of presubmit UI output.

    Args:
        style: box-drawing characters passed positionally to the template
        left: text for the first section (width _LEFT); a trailing space is
            appended unless the text already ends with one
        middle: text for the second section; prefixed with a space
        right: text for the third section (width _RIGHT); suffixed with a
            space
        box: template to fill in; the default is created once, when this
            function is defined

    Returns:
        the formatted text
    """
    padded_left = left if left.endswith(' ') else left + ' '
    middle_width = WIDTH - _LEFT - _RIGHT - 4

    return box.format(
        *style,
        section1=padded_left,
        width1=_LEFT,
        section2=' ' + middle,
        width2=middle_width,
        section3=right + ' ',
        width3=_RIGHT,
    )
100
101
class PresubmitFailure(Exception):
    """Optional exception type that checks may raise to report a failure."""
    def __init__(self, description: str = '', path=None):
        """Builds the exception message.

        Args:
            description: explanation of the failure; may be empty
            path: optional path the failure relates to; when truthy it is
                prepended to the description as 'path: description'
        """
        message = description
        if path:
            message = f'{path}: {description}'
        super().__init__(message)
106
107
class _Result(enum.Enum):
    """Outcome of a presubmit check; the value is the text shown to the user."""

    PASS = 'PASSED'  # Check completed successfully.
    FAIL = 'FAILED'  # Check failed.
    CANCEL = 'CANCEL'  # Check didn't complete.

    def colorized(self, width: int, invert: bool = False) -> str:
        """Returns the result text, colored and padded equally on both sides.

        Args:
            width: target width; (width - len(text)) // 2 spaces are added on
                each side of the text
            invert: if true, use the black-on-color variant for PASS and FAIL

        Returns:
            the padded, colored text
        """
        # (normal, inverted) colorizers for each result.
        styles = {
            _Result.PASS: (color_green, color_black_on_green),
            _Result.FAIL: (color_red, color_black_on_red),
            _Result.CANCEL: (color_yellow, color_yellow),
        }
        normal, inverted = styles.get(self, (None, None))
        paint = inverted if invert else normal

        pad = ' ' * ((width - len(self.value)) // 2)
        # Fall back to the uncolored text if a result has no style entry.
        text = self.value if paint is None else paint(self.value)
        return ''.join((pad, text, pad))
126
127
class Program(collections.abc.Sequence):
    """A named, read-only sequence of presubmit checks."""
    def __init__(self, name: str, steps: Iterable[Callable]):
        """Stores the name and the flattened steps.

        Args:
            name: name of the program; may be empty
            steps: presubmit checks; passed through tools.flatten before
                being stored as a tuple
        """
        self.name = name
        self._steps = tuple(tools.flatten(steps))

    def __getitem__(self, i):
        """Returns the step(s) at index or slice i."""
        return self._steps[i]

    def __len__(self):
        """Returns the number of steps."""
        return len(self._steps)

    def __str__(self):
        """Returns the program's name."""
        return self.name

    def title(self):
        """Returns '<name> presubmit checks', or 'presubmit checks' if unnamed."""
        prefix = self.name or ''
        return f'{prefix} presubmit checks'.strip()
145
146
class Programs(collections.abc.Mapping):
    """A read-only mapping from program name to presubmit check Program.

    Use is optional. Helpful when managing multiple presubmit check programs.
    """
    def __init__(self, **programs: Sequence):
        """Initializes a name: program mapping from the provided keyword args.

        A program is a sequence of presubmit check functions. The sequence may
        contain nested sequences, which are flattened.
        """
        self._programs: Dict[str, Program] = {}
        for name, checks in programs.items():
            self._programs[name] = Program(name, checks)

    def all_steps(self) -> Dict[str, Callable]:
        """Returns every step of every program, keyed by the step's __name__.

        If several steps share a name, the one encountered last is kept.
        """
        steps: Dict[str, Callable] = {}
        for program in self.values():
            for step in program:
                steps[step.__name__] = step
        return steps

    def __getitem__(self, item: str) -> Program:
        """Returns the program registered under the given name."""
        return self._programs[item]

    def __iter__(self) -> Iterator[str]:
        """Iterates over the program names."""
        return iter(self._programs)

    def __len__(self) -> int:
        """Returns the number of programs."""
        return len(self._programs)
174
175
@dataclasses.dataclass(frozen=True)
class PresubmitContext:
    """Immutable bundle of information handed to each presubmit check."""
    # Root path of the project.
    root: Path
    # Roots of the Git repositories being checked.
    repos: Tuple[Path, ...]
    # Directory in which the check may place its output files.
    output_dir: Path
    # The paths the check should run on.
    paths: Tuple[Path, ...]
    # Directory for package files.
    package_root: Path

    def relative_paths(self, start: Optional[Path] = None) -> Tuple[Path, ...]:
        """Returns self.paths made relative via tools.relative_paths.

        Args:
            start: path to make the paths relative to; self.root is used when
                start is None (or otherwise falsy)
        """
        base = start or self.root
        return tuple(tools.relative_paths(self.paths, base))

    def paths_by_repo(self) -> Dict[Path, List[Path]]:
        """Groups self.paths by the result of git_repo.root() for each path."""
        grouped = collections.defaultdict(list)

        for path in self.paths:
            repo_root = git_repo.root(path)
            grouped[repo_root].append(path)

        return grouped
196
197
class _Filter(NamedTuple):
    """Path filter: accepted path endings plus exclusion patterns."""
    # Path endings to accept; the default '' is a suffix of every path.
    endswith: Tuple[str, ...] = ('', )
    # Compiled regular expressions; a path matched by any of them is rejected.
    exclude: Tuple[Pattern[str], ...] = ()

    def matches(self, path: str) -> bool:
        """Returns True if path has an accepted ending and is not excluded."""
        if not any(map(path.endswith, self.endswith)):
            return False

        for pattern in self.exclude:
            if pattern.search(path):
                return False

        return True
205
206
def _print_ui(*args) -> None:
    """Prints to stdout and flushes to stay in sync with logs on stderr.

    Args:
        *args: values to print; forwarded unchanged to print()
    """
    print(*args, flush=True)
210
211
class Presubmit:
    """Runs a series of presubmit checks on a list of files."""
    def __init__(self, root: Path, repos: Sequence[Path],
                 output_directory: Path, paths: Sequence[Path],
                 package_root: Path):
        """Stores the configuration shared by every check in a run.

        Args:
            root: root path of the project; resolved before being stored
            repos: roots of the Git repositories being checked
            output_directory: directory under which each check gets its own
                output subdirectory; resolved before being stored
            paths: the files to run the checks on
            package_root: passed to checks as ctx.package_root; resolved
                before being stored
        """
        self._root = root.resolve()
        self._repos = tuple(repos)
        self._output_directory = output_directory.resolve()
        self._paths = tuple(paths)
        self._relative_paths = tuple(
            tools.relative_paths(self._paths, self._root))
        self._package_root = package_root.resolve()

    def run(self, program: Program, keep_going: bool = False) -> bool:
        """Executes a series of presubmit checks on the paths.

        Args:
            program: the checks to run
            keep_going: if True, keep running the remaining checks after one
                fails instead of stopping at the first failure

        Returns:
            True if no check failed and no applicable check was left unrun
        """

        checks = self._apply_filters(program)

        _LOG.debug('Running %s for %s', program.title(), self._root.name)
        _print_ui(_title(f'{self._root.name}: {program.title()}'))

        _LOG.info('%d of %d checks apply to %s in %s', len(checks),
                  len(program), plural(self._paths, 'file'), self._root)

        _print_ui()
        for line in tools.file_summary(self._relative_paths):
            _print_ui(line)
        _print_ui()

        if not self._paths:
            _print_ui(color_yellow('No files are being checked!'))

        _LOG.debug('Checks:\n%s', '\n'.join(c.name for c, _ in checks))

        start_time: float = time.time()
        passed, failed, skipped = self._execute_checks(checks, keep_going)
        self._log_summary(time.time() - start_time, passed, failed, skipped)

        return not failed and not skipped

    def _apply_filters(
            self, program: Sequence[Callable]
    ) -> List[Tuple['_Check', Sequence[Path]]]:
        """Returns list of (check, paths) for checks that should run.

        Plain callables are wrapped in _Check first. The returned list keeps
        the order of the checks in program.
        """
        checks = [c if isinstance(c, _Check) else _Check(c) for c in program]
        filter_to_checks: Dict[_Filter,
                               List[_Check]] = collections.defaultdict(list)

        # Group checks by filter so each distinct filter is applied to the
        # path list only once.
        for check in checks:
            filter_to_checks[check.filter].append(check)

        check_to_paths = self._map_checks_to_paths(filter_to_checks)
        return [(c, check_to_paths[c]) for c in checks if c in check_to_paths]

    def _map_checks_to_paths(
        self, filter_to_checks: Dict[_Filter, List['_Check']]
    ) -> Dict['_Check', Sequence[Path]]:
        """Maps each check that should run to the paths accepted by its filter.

        Checks whose filter accepts no paths are omitted unless they have
        always_run set.
        """
        checks_to_paths: Dict[_Check, Sequence[Path]] = {}

        # Filters are matched against the POSIX form of the path relative to
        # the root, but checks receive the corresponding entry of self._paths.
        posix_paths = tuple(p.as_posix() for p in self._relative_paths)

        for filt, checks in filter_to_checks.items():
            filtered_paths = tuple(
                path for path, filter_path in zip(self._paths, posix_paths)
                if filt.matches(filter_path))

            for check in checks:
                if filtered_paths or check.always_run:
                    checks_to_paths[check] = filtered_paths
                else:
                    _LOG.debug('Skipping "%s": no relevant files', check.name)

        return checks_to_paths

    def _log_summary(self, time_s: float, passed: int, failed: int,
                     skipped: int) -> None:
        """Logs and prints the final summary box for a run.

        Args:
            time_s: total duration of the run in seconds
            passed: number of checks that passed
            failed: number of checks that failed
            skipped: number of checks that were not run
        """
        summary_items = []
        if passed:
            summary_items.append(f'{passed} passed')
        if failed:
            summary_items.append(f'{failed} failed')
        if skipped:
            summary_items.append(f'{skipped} not run')
        summary = ', '.join(summary_items) or 'nothing was done'

        # Checks that were not run count against the overall result.
        result = _Result.FAIL if failed or skipped else _Result.PASS
        total = passed + failed + skipped

        _LOG.debug('Finished running %d checks on %s in %.1f s', total,
                   plural(self._paths, 'file'), time_s)
        _LOG.debug('Presubmit checks %s: %s', result.value, summary)

        _print_ui(
            _box(
                _SUMMARY_BOX, result.colorized(_LEFT, invert=True),
                f'{total} checks on {plural(self._paths, "file")}: {summary}',
                _format_time(time_s)))

    @contextlib.contextmanager
    def _context(self, name: str, paths: Tuple[Path, ...]):
        """Yields the PresubmitContext for one check.

        Creates the check's output directory and, for the duration of the
        with block, sends this module's log records to step.log inside it.

        Args:
            name: name of the check; used to derive the directory name
            paths: the paths the check should run on
        """
        # There are many characters banned from filenames on Windows. To
        # simplify things, replace every run of characters that are not
        # letters or digits with a single underscore.
        sanitized_name = re.sub(r'[\W_]+', '_', name).lower()
        output_directory = self._output_directory.joinpath(sanitized_name)
        os.makedirs(output_directory, exist_ok=True)

        handler = logging.FileHandler(output_directory.joinpath('step.log'),
                                      mode='w')
        handler.setLevel(logging.DEBUG)

        try:
            _LOG.addHandler(handler)

            yield PresubmitContext(
                root=self._root,
                repos=self._repos,
                output_dir=output_directory,
                paths=paths,
                package_root=self._package_root,
            )

        finally:
            _LOG.removeHandler(handler)
            # removeHandler() only detaches the handler; close it as well so
            # the step.log file handle is released after every check.
            handler.close()

    def _execute_checks(self, program,
                        keep_going: bool) -> Tuple[int, int, int]:
        """Runs presubmit checks; returns (passed, failed, skipped) counts.

        Args:
            program: list of (check, paths) pairs, as returned by
                _apply_filters
            keep_going: if True, continue after a failed check

        Returns:
            the number of checks that passed, that failed, and that were not
            run to completion (cancelled or never started)
        """
        passed = failed = 0

        for i, (check, paths) in enumerate(program, 1):
            with self._context(check.name, paths) as ctx:
                result = check.run(ctx, i, len(program))

            if result is _Result.PASS:
                passed += 1
            elif result is _Result.CANCEL:
                # A cancelled check is counted as skipped, along with every
                # check after it.
                break
            else:
                failed += 1
                if not keep_going:
                    break

        return passed, failed, len(program) - passed - failed
356
357
def _process_pathspecs(repos: Iterable[Path],
                       pathspecs: Iterable[str]) -> Dict[Path, List[str]]:
    """Assigns each pathspec to the repositories it should be applied to.

    Pathspecs that name an existing path are made relative to, and used only
    for, the repository containing them. All other pathspecs (e.g. '*.h') are
    used for every repository. If at least one existing path was given, only
    the repositories containing such paths are kept in the result.

    Args:
        repos: roots of the known Git repositories
        pathspecs: Git pathspecs

    Returns:
        a dict mapping repository root to its list of pathspecs

    Raises:
        ValueError: if an existing path is not in one of the known repos
    """
    by_repo: Dict[Path, List[str]] = {repo: [] for repo in repos}
    explicit_repos: Set[Path] = set()

    for spec in pathspecs:
        if not os.path.exists(spec):
            # Not a path on disk, so treat it as a pattern for every repo.
            for patterns in by_repo.values():
                patterns.append(spec)
            continue

        # The spec is an existing path: it applies only to its own repo,
        # which must be one of the known repos.
        repo = git_repo.within_repo(spec)
        if repo not in by_repo:
            raise ValueError(
                f'{spec} is not in a Git repository in this presubmit')

        # Make the path relative to the repo's root.
        by_repo[repo].append(os.path.relpath(spec, repo))
        explicit_repos.add(repo)

    if not explicit_repos:
        return by_repo

    # Paths were specified, so only search the repos that contain them.
    return {
        repo: specs
        for repo, specs in by_repo.items() if repo in explicit_repos
    }
387
388
def run(program: Sequence[Callable],
        root: Path,
        repos: Collection[Path] = (),
        base: Optional[str] = None,
        paths: Sequence[str] = (),
        exclude: Sequence[Pattern] = (),
        output_directory: Optional[Path] = None,
        package_root: Optional[Path] = None,
        keep_going: bool = False) -> bool:
    """Lists files in the given Git repos and runs a Presubmit with them.

    The paths argument contains Git pathspecs. If no pathspecs are provided, all
    paths in all repos are included. If paths to files or directories are
    provided, only files within those repositories are searched. Patterns are
    searched across all repositories. For example, if the pathspecs "my_module/"
    and "*.h", paths under "my_module/" in the containing repo and paths in all
    repos matching "*.h" will be included in the presubmit.

    Args:
        program: list of presubmit check functions to run
        root: root path of the project
        repos: paths to the roots of Git repositories to check
        base: optional base Git commit to list files against
        paths: optional list of Git pathspecs to run the checks against
        exclude: regular expressions for Posix-style paths to exclude
        output_directory: where to place output files; defaults to
            root / '.presubmit'
        package_root: where to place package files; defaults to
            output_directory / 'packages'
        keep_going: whether to continue running checks if an error occurs

    Returns:
        True if all presubmit checks succeeded

    Raises:
        ValueError: if an entry in repos is not the root of a Git repository,
            or if an existing path in paths is not inside one of the repos
    """
    repos = [repo.resolve() for repo in repos]

    for repo in repos:
        if git_repo.root(repo) != repo:
            raise ValueError(f'{repo} is not the root of a Git repo; '
                             'presubmit checks must be run from a Git repo')

    pathspecs_by_repo = _process_pathspecs(repos, paths)

    files: List[Path] = []

    for repo, pathspecs in pathspecs_by_repo.items():
        files.extend(
            tools.exclude_paths(exclude,
                                git_repo.list_files(base, pathspecs, repo),
                                root))

        _LOG.info(
            'Checking %s',
            git_repo.describe_files(repo, repo, base, pathspecs, exclude))

    if output_directory is None:
        output_directory = root / '.presubmit'

    if package_root is None:
        package_root = output_directory / 'packages'

    presubmit = Presubmit(
        root=root,
        repos=repos,
        output_directory=output_directory,
        paths=files,
        package_root=package_root,
    )

    # Accept a plain sequence of checks by wrapping it in an unnamed Program.
    if not isinstance(program, Program):
        program = Program('', program)

    return presubmit.run(program, keep_going)
462
463
class _Check:
    """Wraps a presubmit check function.

    The wrapper handles running the function, reporting its progress and
    result, and carries the path filter that selects which paths it receives.
    """
    def __init__(self,
                 check_function: Callable,
                 path_filter: _Filter = _Filter(),
                 always_run: bool = True):
        """Validates and stores the check function and its settings.

        Args:
            check_function: the presubmit check to wrap
            path_filter: filter selecting the paths passed to the check
            always_run: whether the check should run even when the filter
                accepts no paths

        Raises:
            TypeError: if check_function is not a valid presubmit check
        """
        _ensure_is_valid_presubmit_check_function(check_function)

        self._check: Callable = check_function
        self.filter: _Filter = path_filter
        self.always_run: bool = always_run

        # Since _Check wraps a presubmit function, adopt that function's name.
        self.__name__ = self._check.__name__

    @property
    def name(self):
        """The name of the wrapped check function."""
        return self.__name__

    def run(self, ctx: PresubmitContext, count: int, total: int) -> _Result:
        """Runs the presubmit check on the provided paths.

        Args:
            ctx: context for this check, including the paths to check
            count: 1-based position of this check in the run
            total: total number of checks in the run

        Returns:
            the result of the check
        """
        header = _box(_CHECK_UPPER, f'{count}/{total}', self.name,
                      plural(ctx.paths, "file"))
        _print_ui(header)

        _LOG.debug('[%d/%d] Running %s on %s', count, total, self.name,
                   plural(ctx.paths, "file"))

        started_s = time.time()
        outcome = self._call_function(ctx)
        elapsed = _format_time(time.time() - started_s)
        _LOG.debug('%s %s', self.name, outcome.value)

        footer = _box(_CHECK_LOWER, outcome.colorized(_LEFT), self.name,
                      elapsed)
        _print_ui(footer)
        _LOG.debug('%s duration:%s', self.name, elapsed)

        return outcome

    def _call_function(self, ctx: PresubmitContext) -> _Result:
        """Calls the check and translates how it finished into a _Result.

        Returns:
            PASS if the check returned normally, CANCEL if it was interrupted
            with KeyboardInterrupt, and FAIL if it raised any other Exception
        """
        try:
            self._check(ctx)
        except KeyboardInterrupt:
            _print_ui()
            return _Result.CANCEL
        except PresubmitFailure as failure:
            # Only log failures that carry a message.
            if str(failure):
                _LOG.warning('%s', failure)
            return _Result.FAIL
        except Exception:  # pylint: disable=broad-except
            _LOG.exception('Presubmit check %s failed!', self.name)
            return _Result.FAIL
        else:
            return _Result.PASS

    def __call__(self, ctx: PresubmitContext, *args, **kwargs):
        """Calling a _Check calls its underlying function directly.

        This makes it possible to call functions wrapped by @filter_paths. The
        prior filters are ignored, so new filters may be applied.
        """
        return self._check(ctx, *args, **kwargs)
531
532
def _required_args(function: Callable) -> Iterable[Parameter]:
    """Yields the parameters of function that a caller must provide.

    A parameter is required if it has no default value and is neither *args
    nor **kwargs.
    """
    variadic_kinds = (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD)
    parameters = signature(function).parameters.values()

    yield from (p for p in parameters
                if p.kind not in variadic_kinds and p.default is p.empty)
540
541
def _ensure_is_valid_presubmit_check_function(check: Callable) -> None:
    """Checks if a Callable can be used as a presubmit check.

    A valid check is a callable with exactly one required argument (the
    PresubmitContext).

    Args:
        check: the object to validate

    Raises:
        TypeError: if check is not callable, its signature cannot be
            determined, or it does not have exactly one required argument
    """
    try:
        required_args = tuple(_required_args(check))
    except (TypeError, ValueError) as error:
        raise TypeError('Presubmit checks must be callable, but '
                        f'{check!r} is a {type(check).__name__}') from error

    if len(required_args) != 1:
        # Not every callable has a __name__ (e.g. functools.partial objects or
        # instances defining __call__); fall back to repr() so this error is
        # reported instead of an AttributeError.
        name = getattr(check, '__name__', repr(check))
        raise TypeError(
            f'Presubmit check functions must have exactly one required '
            f'positional argument (the PresubmitContext), but '
            f'{name} has {len(required_args)} required arguments' +
            (f' ({", ".join(a.name for a in required_args)})'
             if required_args else ''))
557
558
def _make_str_tuple(value: Iterable[str]) -> Tuple[str, ...]:
    """Returns value as a tuple, treating a lone str as a single item."""
    if isinstance(value, str):
        return (value, )
    return tuple(value)
561
562
def filter_paths(endswith: Iterable[str] = (''),
                 exclude: Iterable[Union[Pattern[str], str]] = (),
                 always_run: bool = False) -> Callable[[Callable], _Check]:
    """Decorator for filtering the paths list for a presubmit check function.

    Path filters only apply when the function is used as a presubmit check.
    Filters are ignored when the functions are called directly. This makes it
    possible to reuse functions wrapped in @filter_paths in other presubmit
    checks, potentially with different path filtering rules.

    Args:
        endswith: str or iterable of path endings to include
        exclude: regular expression, or iterable of regular expressions, of
            paths to exclude; each may be a str or a compiled pattern
        always_run: whether to run the check even when no paths match the
            filter

    Returns:
        a wrapped version of the presubmit function
    """
    # A lone pattern would otherwise be iterated character by character, just
    # as endswith accepts a lone str.
    if isinstance(exclude, (str, re.Pattern)):
        exclude = (exclude, )

    # Build the filter once, up front, so that one-shot iterables are consumed
    # a single time even if the returned decorator is applied repeatedly.
    path_filter = _Filter(_make_str_tuple(endswith),
                          tuple(re.compile(e) for e in exclude))

    def filter_paths_for_function(function: Callable):
        return _Check(function, path_filter, always_run=always_run)

    return filter_paths_for_function
587
588
@filter_paths(endswith='.h', exclude=(r'\.pb\.h$', ))
def pragma_once(ctx: PresubmitContext) -> None:
    """Presubmit check that ensures all header files contain '#pragma once'.

    Raises:
        PresubmitFailure: for the first file found in which no line starts
            with '#pragma once'
    """
    for header in ctx.paths:
        with open(header) as file:
            has_pragma = any(
                line.startswith('#pragma once') for line in file)

        if not has_pragma:
            raise PresubmitFailure('#pragma once is missing!', path=header)
600
601
def call(*args, **kwargs) -> None:
    """Optional subprocess wrapper that causes a PresubmitFailure on errors.

    Runs the command with stderr merged into stdout, and logs the command, its
    return code, and its output. The messages are logged as warnings if the
    command failed and as debug messages otherwise.

    Args:
        *args: the command and its arguments, passed to subprocess.run as the
            command sequence
        **kwargs: additional keyword arguments for subprocess.run; stdout and
            stderr are set by this function and must not be provided

    Raises:
        PresubmitFailure: if the command exits with a nonzero return code
    """
    attributes, command = tools.format_command(args, kwargs)
    _LOG.debug('[RUN] %s\n%s', attributes, command)

    process = subprocess.run(args,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             **kwargs)
    logfunc = _LOG.warning if process.returncode else _LOG.debug

    logfunc('[FINISHED]\n%s', command)
    logfunc('[RESULT] %s with return code %d',
            'Failed' if process.returncode else 'Passed', process.returncode)

    # The output is already a str if the caller requested text mode (text,
    # universal_newlines, encoding or errors); only bytes need decoding.
    output = process.stdout
    if isinstance(output, bytes):
        output = output.decode(errors='backslashreplace')
    if output:
        logfunc('[OUTPUT]\n%s', output)

    if process.returncode:
        raise PresubmitFailure
623