1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Script that preprocesses a Python command then runs it.
15
16This script evaluates expressions in the Python command's arguments then invokes
17the command.
18"""
19
20import argparse
21from dataclasses import dataclass
22import enum
23import logging
24import os
25from pathlib import Path
26import re
27import shlex
28import subprocess
29import sys
30from typing import Callable, Dict, Iterable, Iterator, List, NamedTuple
31from typing import Optional, Tuple
32
# Module-level logger used for this script's error and debug messages.
_LOG = logging.getLogger(__name__)
34
35
def _parse_args() -> argparse.Namespace:
    """Builds the argument parser and parses this script's command line.

    The options describe the GN context and how the command should be run;
    every remaining argument is collected in ``original_cmd``.

    Returns:
        The parsed arguments as an argparse.Namespace.
    """
    arg_parser = argparse.ArgumentParser(description=__doc__)
    add = arg_parser.add_argument

    # GN context (all required).
    add(
        '--gn-root',
        type=Path,
        required=True,
        help=('Path to the root of the GN tree; '
              'value of rebase_path("//")'),
    )
    add(
        '--current-path',
        type=Path,
        required=True,
        help='Value of rebase_path(".")',
    )
    add(
        '--default-toolchain',
        required=True,
        help='Value of default_toolchain',
    )
    add(
        '--current-toolchain',
        required=True,
        help='Value of current_toolchain',
    )

    # Optional settings that control how the command is executed.
    add(
        '--directory',
        type=Path,
        help='Execute the command from this directory',
    )
    add('--module', help='Run this module instead of a script')
    add(
        '--env',
        action='append',
        help='Environment variables to set as NAME=VALUE',
    )
    add(
        '--touch',
        type=Path,
        help='File to touch after the command is run',
    )
    add(
        '--capture-output',
        action='store_true',
        help='Capture subcommand output; display only on error',
    )

    # Everything left over is the command itself.
    add(
        'original_cmd',
        nargs=argparse.REMAINDER,
        help='Python script with arguments to run',
    )

    return arg_parser.parse_args()
78
79
class GnPaths(NamedTuple):
    """Filesystem locations used to turn GN paths into filesystem paths."""
    # Directory that GN paths starting with "//" are resolved against.
    root: Path
    # Build directory; used when locating generated Ninja files and outputs.
    build: Path
    # Directory that all other GN paths are resolved against.
    cwd: Path

    # Toolchain label or '' if using the default toolchain
    toolchain: str

    def resolve(self, gn_path: str) -> Path:
        """Converts a single GN path into a resolved filesystem path.

        Paths beginning with "//" are joined to ``root`` after their leading
        slashes are removed; any other path is joined to ``cwd``.
        """
        if not gn_path.startswith('//'):
            return self.cwd.joinpath(gn_path).resolve()

        relative_to_root = gn_path.lstrip('/')
        return self.root.joinpath(relative_to_root).resolve()

    def resolve_paths(self, gn_paths: str, sep: str = ';') -> str:
        """Resolves each GN path in a ``sep``-delimited string.

        Returns the resolved paths joined with the same separator.
        """
        resolved = [str(self.resolve(entry)) for entry in gn_paths.split(sep)]
        return sep.join(resolved)
100
101
@dataclass(frozen=True)
class Label:
    """A GN label resolved against a set of GnPaths.

    The label string may name a target with ":name" and may carry an explicit
    toolchain in trailing parentheses. When no toolchain is given, the
    toolchain from the provided paths is used.
    """
    name: str
    dir: Path
    relative_dir: Path
    toolchain: Optional['Label']
    out_dir: Path
    gen_dir: Path

    def __init__(self, paths: GnPaths, label: str):
        # The dataclass is frozen, so fields have to be assigned through
        # object.__setattr__ instead of normal attribute assignment.
        def assign(field: str, value) -> None:
            object.__setattr__(self, field, value)

        if label.endswith(')'):
            # The toolchain was given explicitly as "label(toolchain)".
            label, toolchain_label = label[:-1].rsplit('(', 1)
        elif paths.toolchain == label:
            # This label is the toolchain itself; leave its toolchain unset so
            # that constructing it does not recurse forever.
            toolchain_label = ''
        else:
            toolchain_label = paths.toolchain

        if toolchain_label:
            assign('toolchain', Label(paths, toolchain_label))
        else:
            assign('toolchain', None)

        # Use the text after the last ':' as the target name. Without a ':',
        # the name is the final component of the path.
        directory, colon, target_name = label.rpartition(':')
        if not colon:
            directory = label
            target_name = label.rsplit('/', 1)[-1]

        assign('name', target_name)

        # Absolute directory of the label, and the same directory expressed
        # relative to the GN root.
        assign('dir', paths.resolve(directory))
        assign('relative_dir', self.dir.relative_to(paths.root.resolve()))

        # Output directories live under <build>/<toolchain name>/{obj,gen}.
        toolchain_build_dir = paths.build / self.toolchain_name()
        assign('out_dir', toolchain_build_dir / 'obj' / self.relative_dir)
        assign('gen_dir', toolchain_build_dir / 'gen' / self.relative_dir)

    def gn_label(self) -> str:
        """Returns the label as //dir:name, with (toolchain) if one is set."""
        base = f'//{self.relative_dir.as_posix()}:{self.name}'
        if self.toolchain:
            return f'{base}({self.toolchain!r})'
        return base

    def toolchain_name(self) -> str:
        """Returns the name of the toolchain label, or '' if there is none."""
        if self.toolchain:
            return self.toolchain.name
        return ''

    def __repr__(self) -> str:
        return self.gn_label()
153
154
class _Artifact(NamedTuple):
    """An output of a Ninja build statement along with its build variables."""
    # Output path, formed by joining the build directory and the output name.
    path: Path
    # Variables parsed from the indented lines that follow the build statement.
    variables: Dict[str, str]
158
159
# Matches a non-phony build statement: a line that starts with "build ", with
# the text before the colon (the outputs) captured in group 1, and whose rule
# after the colon is not "phony".
_GN_NINJA_BUILD_STATEMENT = re.compile(r'^build (.+):[ \n](?!phony\b)')

# Extensions used for compilation artifacts. The empty string matches outputs
# that have no extension.
_MAIN_ARTIFACTS = '', '.elf', '.a', '.so', '.dylib', '.exe', '.lib', '.dll'
165
166
def _get_artifact(build_dir: Path, entries: List[str]) -> _Artifact:
    """Picks the single artifact to use from a build statement's outputs.

    A lone output is used as is. When there are several, only those whose
    extension is listed in _MAIN_ARTIFACTS are considered, and exactly one
    must remain. This will not work if a toolchain creates multiple
    compilation artifacts from one command (e.g. .a and .elf).

    Raises:
        ExpressionError: the outputs could not be narrowed down to one.
    """
    assert entries, "There should be at least one entry here!"

    candidates = entries
    if len(entries) != 1:
        candidates = [
            entry for entry in entries
            if Path(entry).suffix in _MAIN_ARTIFACTS
        ]

    if len(candidates) != 1:
        raise ExpressionError(
            f'Expected 1, but found {len(candidates)} artifacts, after filtering for '
            f'extensions {", ".join(repr(e) for e in _MAIN_ARTIFACTS)}: {entries}')

    return _Artifact(build_dir / candidates[0], {})
186
187
def _parse_build_artifacts(build_dir: Path, fd) -> Iterator[_Artifact]:
    """Yields an artifact for each non-phony build statement in a Ninja file.

    Only the build statement line and the indented variable lines directly
    after it are interpreted; all other lines are skipped.
    """
    # The artifact whose variables are currently being collected, if any.
    current: Optional[_Artifact] = None

    for line in fd:
        if current is not None:
            if line.startswith('  '):  # build variable statements are indented
                key, value = (part.strip() for part in line.split('=', 1))
                current.variables[key] = value
                continue

            # A non-indented line ends the current build statement. The same
            # line is then checked below, since it may start another one.
            yield current
            current = None

        found = _GN_NINJA_BUILD_STATEMENT.match(line)
        if found:
            current = _get_artifact(build_dir, found.group(1).split())

    # The file may end while a build statement is still open.
    if current is not None:
        yield current
221
222
def _search_target_ninja(ninja_file: Path, paths: GnPaths,
                         target: Label) -> Tuple[Optional[Path], List[Path]]:
    """Extracts the main output and object files from <target>.ninja.

    Build outputs that have variables are treated as the target's main
    artifact; outputs without variables are collected as object files.

    Returns:
        A tuple of the main artifact (or None) and the list of object files.
    """
    main_output: Optional[Path] = None
    object_files: List[Path] = []

    _LOG.debug('Parsing target Ninja file %s for %s', ninja_file, target)

    with ninja_file.open() as ninja:
        for output in _parse_build_artifacts(paths.build, ninja):
            # Older GN used .stamp files when there is no build artifact.
            if output.path.suffix == '.stamp':
                continue

            if not output.variables:
                object_files.append(output.path)
                continue

            assert not main_output, f'Multiple artifacts for {target}!'
            main_output = output.path

    return main_output, object_files
245
246
def _search_toolchain_ninja(ninja_file: Path, paths: GnPaths,
                            target: Label) -> Optional[Path]:
    """Looks in toolchain.ninja for the single output file of a target.

    Files created by an action appear in toolchain.ninja instead of in their
    own <target>.ninja. The target's completion statement (a phony target, or
    a .stamp file in older GN) lists those files.

    Returns:
        The path of the output file if a completion statement for the target
        lists exactly one file; otherwise None.
    """
    _LOG.debug('Searching toolchain Ninja file %s for %s', ninja_file, target)

    # Older versions of GN used a .stamp file to signal completion of a target.
    stamp_dir = target.out_dir.relative_to(paths.build).as_posix()
    stamp_tool = f'{target.toolchain_name()}_stamp'
    stamp_statement = f'build {stamp_dir}/{target.name}.stamp: {stamp_tool} '

    # Newer GN uses a phony Ninja target to signal completion of a target.
    phony_dir = Path(target.toolchain_name(), 'phony',
                     target.relative_dir).as_posix()
    phony_statement = f'build {phony_dir}/{target.name}: phony '

    # The phony form is checked before the stamp form.
    prefixes = (phony_statement, stamp_statement)

    with ninja_file.open() as ninja:
        for line in ninja:
            matched = next(
                (prefix for prefix in prefixes if line.startswith(prefix)),
                None)
            if matched is None:
                continue

            # Whatever follows the statement prefix is the list of files.
            outputs = line[len(matched):].strip().split()
            if len(outputs) == 1:
                return paths.build / outputs[0]

    return None
279
280
def _search_ninja_files(
        paths: GnPaths,
        target: Label) -> Tuple[bool, Optional[Path], List[Path]]:
    """Finds a target's outputs in its own or its toolchain's Ninja file.

    The target's <name>.ninja file is preferred; toolchain.ninja is used only
    when that file does not exist.

    Returns:
        A tuple of (whether a Ninja file was found, the main artifact or
        None, the list of object files).
    """
    target_ninja = target.out_dir / f'{target.name}.ninja'
    if target_ninja.exists():
        artifact, objects = _search_target_ninja(target_ninja, paths, target)
        return True, artifact, objects

    toolchain_ninja = paths.build / target.toolchain_name() / 'toolchain.ninja'
    if not toolchain_ninja.exists():
        return False, None, []

    return True, _search_toolchain_ninja(toolchain_ninja, paths, target), []
293
294
@dataclass(frozen=True)
class TargetInfo:
    """Provides information about a target parsed from a .ninja file.

    Constructing a TargetInfo resolves the target string to a Label and
    searches the Ninja files in the build directory for the target's outputs.
    """

    label: Label
    generated: bool  # True if the Ninja files for this target were generated.
    # Main output file of the target, or None if none was found.
    artifact: Optional[Path]
    # Object files of the target; may hold any number of paths, so this is a
    # variable-length tuple rather than a 1-tuple.
    object_files: Tuple[Path, ...]

    def __init__(self, paths: GnPaths, target: str):
        """Looks up the target's Ninja information.

        Args:
            paths: Paths used to resolve the label and locate Ninja files.
            target: GN label of the target, as a string.
        """
        # The dataclass is frozen, so fields are assigned through
        # object.__setattr__ instead of normal attribute assignment.
        object.__setattr__(self, 'label', Label(paths, target))

        generated, artifact, objects = _search_ninja_files(paths, self.label)

        object.__setattr__(self, 'generated', generated)
        object.__setattr__(self, 'artifact', artifact)
        object.__setattr__(self, 'object_files', tuple(objects))

    def __repr__(self) -> str:
        return repr(self.label)
315
316
class ExpressionError(Exception):
    """An error occurred while parsing an expression.

    Raised when an expression in a command argument cannot be parsed or
    expanded; main() logs the message and exits with a non-zero status.
    """
319
320
class _ArgAction(enum.Enum):
    """How a piece of expanded text is applied to the argument being built."""
    # Add the text to the current argument.
    APPEND = 0
    # Drop the entire argument.
    OMIT = 1
    # Add the text to the current argument, then start a new argument.
    EMIT_NEW = 2
325
326
class _Expression:
    """A single <FUNCTION(...)> expression located inside an argument."""
    def __init__(self, match: re.Match, ending: int):
        # match covers the opening of the expression; ending is the index at
        # which the closing delimiter begins.
        self._match = match
        self._ending = ending

    @property
    def string(self):
        """The complete string in which the expression was found."""
        return self._match.string

    @property
    def end(self) -> int:
        """Index just past the expression's closing delimiter."""
        return len(_ENDING) + self._ending

    def contents(self) -> str:
        """Returns the text between the opening and closing delimiters."""
        first = self._match.end()
        return self.string[first:self._ending]

    def expression(self) -> str:
        """Returns the whole expression, including both delimiters."""
        first = self._match.start()
        return self.string[first:self.end]
345
346
# Stream of (action, text) pairs produced while expanding a single argument.
_Actions = Iterator[Tuple[_ArgAction, str]]
348
349
def _target_file(paths: GnPaths, expr: _Expression) -> _Actions:
    """Expands TARGET_FILE to the path of the target's output file.

    Raises:
        ExpressionError: the target was not generated or has no output file.
    """
    target = TargetInfo(paths, expr.contents())
    output = target.artifact

    if not target.generated:
        raise ExpressionError(f'Target {target} has not been generated by GN!')

    if output is None:
        raise ExpressionError(f'Target {target} has no output file!')

    yield _ArgAction.APPEND, str(output)
360
361
def _target_file_if_exists(paths: GnPaths, expr: _Expression) -> _Actions:
    """Expands TARGET_FILE_IF_EXISTS to the output file, if it is on disk.

    The argument is omitted when the target was not generated or its output
    file does not exist.

    Raises:
        ExpressionError: the target was generated but has no output file.
    """
    target = TargetInfo(paths, expr.contents())

    if not target.generated:
        yield _ArgAction.OMIT, ''
        return

    output = target.artifact
    if output is None:
        raise ExpressionError(f'Target {target} has no output file!')

    if Path(output).exists():
        yield _ArgAction.APPEND, str(output)
    else:
        yield _ArgAction.OMIT, ''
374
375
def _target_objects(paths: GnPaths, expr: _Expression) -> _Actions:
    """Expands TARGET_OBJECTS to one argument per object file of the target.

    Raises:
        ExpressionError: the expression is not the entire argument, or the
            target was not generated.
    """
    # Each object file becomes its own argument, so the expression has to be
    # the only thing in the argument.
    is_whole_argument = expr.expression() == expr.string
    if not is_whole_argument:
        raise ExpressionError(
            f'The expression "{expr.expression()}" in "{expr.string}" may '
            'expand to multiple arguments, so it cannot be used alongside '
            'other text or expressions')

    target = TargetInfo(paths, expr.contents())
    if not target.generated:
        raise ExpressionError(f'Target {target} has not been generated by GN!')

    yield from ((_ArgAction.EMIT_NEW, str(obj))
                for obj in target.object_files)
389
390
# TODO(pwbug/347): Replace expressions with native GN features when possible.
# Maps the name of each supported expression function to the generator that
# expands it.
_FUNCTIONS: Dict['str', Callable[[GnPaths, _Expression], _Actions]] = {
    'TARGET_FILE': _target_file,
    'TARGET_FILE_IF_EXISTS': _target_file_if_exists,
    'TARGET_OBJECTS': _target_objects,
}

# Matches the opening "<FUNCTION(" of an expression; group 1 is the function
# name, which must be one of the keys of _FUNCTIONS.
_START_EXPRESSION = re.compile(fr'<({"|".join(_FUNCTIONS)})\(')
# Text that terminates an expression.
_ENDING = ')>'
400
401
def _expand_arguments(paths: GnPaths, string: str) -> _Actions:
    """Splits an argument into literal text and expanded expressions.

    Text outside of expressions is yielded as APPEND actions; each expression
    is handed to its function from _FUNCTIONS, whose actions are yielded in
    place.

    Raises:
        ExpressionError: an expression has no terminating delimiter.
    """
    # Index of the first character that has not been emitted yet.
    cursor = 0

    for match in _START_EXPRESSION.finditer(string):
        expression_start = match.start()
        if cursor != expression_start:
            yield _ArgAction.APPEND, string[cursor:expression_start]

        close_index = string.find(_ENDING, match.end())
        if close_index == -1:
            raise ExpressionError(f'Parse error: no terminating "{_ENDING}" '
                                  f'was found for "{string[match.start():]}"')

        found = _Expression(match, close_index)
        expand = _FUNCTIONS[match.group(1)]
        yield from expand(paths, found)

        cursor = found.end

    # Emit any literal text that follows the last expression.
    remainder = string[cursor:]
    if remainder:
        yield _ArgAction.APPEND, remainder
421
422
def expand_expressions(paths: GnPaths, arg: str) -> Iterable[str]:
    """Expands <FUNCTION(...)> expressions; yields zero or more arguments.

    An empty argument is passed through unchanged. If any expression asks for
    the argument to be omitted, no arguments are produced at all.
    """
    if arg == '':
        return ['']

    # Each inner list holds the text pieces of one resulting argument.
    groups: List[List[str]] = [[]]

    for action, text in _expand_arguments(paths, arg):
        if action is _ArgAction.OMIT:
            return []

        current = groups[-1]
        current.append(text)

        if action is _ArgAction.EMIT_NEW:
            groups.append([])

    # Groups that received no text produce no argument.
    return (''.join(pieces) for pieces in groups if pieces)
439
440
def main(
    gn_root: Path,
    current_path: Path,
    directory: Optional[Path],
    original_cmd: List[str],
    default_toolchain: str,
    current_toolchain: str,
    module: Optional[str],
    env: Optional[List[str]],
    capture_output: bool,
    touch: Optional[Path],
) -> int:
    """Script entry point.

    Expands the expressions in the command's arguments, runs the command with
    the current Python interpreter, and optionally touches a stamp file.

    Args:
        gn_root: Path to the root of the GN tree.
        current_path: Directory that relative GN paths are resolved against.
        directory: Directory to run the command from, or None.
        original_cmd: The command to run; its first element must be '--'.
        default_toolchain: Value of GN's default_toolchain.
        current_toolchain: Value of GN's current_toolchain.
        module: Module to run with "-m" instead of a script, or None.
        env: Environment variables to set, each as NAME=VALUE, or None.
        capture_output: Capture the command's output and show it only if the
            command fails.
        touch: File to touch after the command succeeds, or None.

    Returns:
        1 if the arguments are invalid or an expression cannot be expanded;
        otherwise the exit code of the command.
    """

    if not original_cmd or original_cmd[0] != '--':
        _LOG.error('%s requires a command to run', sys.argv[0])
        return 1

    # GN build scripts are executed from the root build directory.
    root_build_dir = Path.cwd().resolve()

    # GnPaths uses '' to mean the default toolchain.
    tool = current_toolchain if current_toolchain != default_toolchain else ''
    paths = GnPaths(root=gn_root.resolve(),
                    build=root_build_dir,
                    cwd=current_path.resolve(),
                    toolchain=tool)

    command = [sys.executable]

    if module is not None:
        command += ['-m', module]

    run_args: dict = dict(cwd=directory)

    if env is not None:
        environment = os.environ.copy()
        for assignment in env:
            name, separator, value = assignment.partition('=')
            if not separator:
                # Report a malformed value instead of failing with an
                # unhandled ValueError.
                _LOG.error('%s: invalid --env value %r; expected NAME=VALUE',
                           sys.argv[0], assignment)
                return 1
            environment[name] = value
        run_args['env'] = environment

    if capture_output:
        # Combine stdout and stderr so that error messages are correctly
        # interleaved with the rest of the output.
        run_args['stdout'] = subprocess.PIPE
        run_args['stderr'] = subprocess.STDOUT

    try:
        # Skip the leading '--'; every other argument may expand to zero or
        # more command arguments.
        for arg in original_cmd[1:]:
            command += expand_expressions(paths, arg)
    except ExpressionError as err:
        _LOG.error('%s: %s', sys.argv[0], err)
        return 1

    _LOG.debug('RUN %s', ' '.join(shlex.quote(arg) for arg in command))

    completed_process = subprocess.run(command, **run_args)

    if completed_process.returncode != 0:
        _LOG.debug('Command failed; exit code: %d',
                   completed_process.returncode)
        if capture_output:
            # Captured output is only shown when the command fails.
            sys.stdout.buffer.write(completed_process.stdout)
    elif touch:
        # If a stamp file is provided and the command executed successfully,
        # touch the stamp file to indicate a successful run of the command.
        _LOG.debug('TOUCH %s', touch)
        touch.touch()

    return completed_process.returncode
509
510
# The parsed argument names match main()'s parameters, so the namespace can be
# passed directly as keyword arguments. main()'s return value is the exit code.
if __name__ == '__main__':
    sys.exit(main(**vars(_parse_args())))
513