1# Copyright 2024 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the 'License');
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an 'AS IS' BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import atexit
16import itertools
17import logging
18import os
19import pipes
20import pwd
21import select
22import subprocess
23import threading
24
25TEE_TO_LOGS = object()
26_popen_lock = threading.Lock()
27_logging_service = None
28_command_serial_number = itertools.count(1)
29
30_LOG_BUFSIZE = 4096
31_PIPE_CLOSED = -1
32
33
34class _LoggerProxy(object):
35
36    def __init__(self, logger):
37        self._logger = logger
38
39    def fileno(self):
40        """Returns the fileno of the logger pipe."""
41        return self._logger._pipe[1]
42
43    def __del__(self):
44        self._logger.close()
45
46
47class _PipeLogger(object):
48
49    def __init__(self, level, prefix):
50        self._pipe = list(os.pipe())
51        self._level = level
52        self._prefix = prefix
53
54    def close(self):
55        """Closes the logger."""
56        if self._pipe[1] != _PIPE_CLOSED:
57            os.close(self._pipe[1])
58            self._pipe[1] = _PIPE_CLOSED
59
60
61class _LoggingService(object):
62
63    def __init__(self):
64        # Python's list is thread safe
65        self._loggers = []
66
67        # Change tuple to list so that we can change the value when
68        # closing the pipe.
69        self._pipe = list(os.pipe())
70        self._thread = threading.Thread(target=self._service_run)
71        self._thread.daemon = True
72        self._thread.start()
73
74    def _service_run(self):
75        terminate_loop = False
76        while not terminate_loop:
77            rlist = [l._pipe[0] for l in self._loggers]
78            rlist.append(self._pipe[0])
79            for r in select.select(rlist, [], [])[0]:
80                data = os.read(r, _LOG_BUFSIZE)
81                if r != self._pipe[0]:
82                    self._output_logger_message(r, data)
83                elif len(data) == 0:
84                    terminate_loop = True
85        # Release resources.
86        os.close(self._pipe[0])
87        for logger in self._loggers:
88            os.close(logger._pipe[0])
89
90    def _output_logger_message(self, r, data):
91        logger = next(l for l in self._loggers if l._pipe[0] == r)
92
93        if len(data) == 0:
94            os.close(logger._pipe[0])
95            self._loggers.remove(logger)
96            return
97
98        for line in data.split('\n'):
99            logging.log(logger._level, '%s%s', logger._prefix, line)
100
101    def create_logger(self, level=logging.DEBUG, prefix=''):
102        """Creates a new logger.
103
104        Args:
105            level: The desired logging level.
106            prefix: The prefix to add to each log entry.
107        """
108        logger = _PipeLogger(level=level, prefix=prefix)
109        self._loggers.append(logger)
110        os.write(self._pipe[1], '\0')
111        return _LoggerProxy(logger)
112
113    def shutdown(self):
114        """Shuts down the logger."""
115        if self._pipe[1] != _PIPE_CLOSED:
116            os.close(self._pipe[1])
117            self._pipe[1] = _PIPE_CLOSED
118            self._thread.join()
119
120
121def create_logger(level=logging.DEBUG, prefix=''):
122    """Creates a new logger.
123
124    Args:
125        level: The desired logging level.
126        prefix: The prefix to add to each log entry.
127    """
128    global _logging_service
129    if _logging_service is None:
130        _logging_service = _LoggingService()
131        atexit.register(_logging_service.shutdown)
132    return _logging_service.create_logger(level=level, prefix=prefix)
133
134
135def wait_and_check_returncode(*popens):
136    """Waits for all the Popens and check the return code is 0.
137
138    Args:
139        popens: The Popens to be checked.
140
141    Raises:
142        RuntimeError if the return code is not 0.
143    """
144    error_message = None
145    for p in popens:
146        if p.wait() != 0:
147            error_message = ('Command failed(%d, %d): %s' % (p.pid, p.returncode, p.command))
148            logging.error(error_message)
149    if error_message:
150        raise RuntimeError(error_message)
151
152
153def execute(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, run_as=None):
154    """Executes a child command and wait for it.
155
156    Args:
157        args: The command to be executed.
158        stdin: The executed program's standard input.
159        stdout: The executed program's standard output.
160        stderr: The executed program's standard error.
161        run_as: If not None, run the command as the given user.
162
163    Returns:
164        The output from standard output if 'stdout' is subprocess.PIPE.
165
166    Raises:
167        RuntimeException if the return code of the child command is not 0.
168    """
169    ps = popen(args, stdin=stdin, stdout=stdout, stderr=stderr, run_as=run_as)
170    out = ps.communicate()[0] if stdout == subprocess.PIPE else None
171    wait_and_check_returncode(ps)
172    return out
173
174
175def _run_as(user):
176    """Changes the uid and gid of the running process to be that of the
177    given user and configures its supplementary groups.
178
179    Don't call this function directly, instead wrap it in a lambda and
180    pass that to the preexec_fn argument of subprocess.Popen.
181
182    Example usage:
183    subprocess.Popen(..., preexec_fn=lambda: _run_as('chronos'))
184
185    Args:
186        user: The user to run as.
187    """
188    pw = pwd.getpwnam(user)
189    os.setgid(pw.pw_gid)
190    os.initgroups(user, pw.pw_gid)
191    os.setuid(pw.pw_uid)
192
193
194def popen(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, env=None, run_as=None):
195    """Returns a Popen object just as subprocess.Popen does but with the
196    executed command stored in Popen.command.
197
198    Args:
199        args: The command to be executed.
200        stdin: The executed program's standard input.
201        stdout: The executed program's standard output.
202        stderr: The executed program's standard error.
203        env: The executed program's environment.
204        run_as: If not None, run the command as the given user.
205    """
206    command_id = next(_command_serial_number)
207    prefix = '[%04d] ' % command_id
208
209    if stdout is TEE_TO_LOGS:
210        stdout = create_logger(level=logging.DEBUG, prefix=prefix)
211    if stderr is TEE_TO_LOGS:
212        stderr = create_logger(level=logging.ERROR, prefix=prefix)
213
214    command = ' '.join(pipes.quote(x) for x in args)
215    logging.info('%sRunning: %s', prefix, command)
216
217    preexec_fn = None
218    if run_as is not None:
219        preexec_fn = lambda: _run_as(run_as)
220
221    # The lock is required for http://crbug.com/323843.
222    with _popen_lock:
223        ps = subprocess.Popen(args, stdin=stdin, stdout=stdout, stderr=stderr, env=env, preexec_fn=preexec_fn)
224
225    logging.info('%spid is %d', prefix, ps.pid)
226    ps.command_id = command_id
227    ps.command = command
228    return ps
229