# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for running child processes with their output sent to logging."""

import atexit
import itertools
import logging
import os
import pwd
import select
import subprocess
import threading

try:
    # The 'pipes' module was removed in Python 3.13; shlex.quote is the
    # supported equivalent.
    from shlex import quote as _shell_quote
except ImportError:  # Python 2: shlex.quote does not exist there.
    from pipes import quote as _shell_quote

# Sentinel for stdout/stderr: forward the child's stream to the logging module.
TEE_TO_LOGS = object()
_popen_lock = threading.Lock()
_logging_service = None
_command_serial_number = itertools.count(1)

_LOG_BUFSIZE = 4096
_PIPE_CLOSED = -1


class _LoggerProxy(object):
    """File-like handle around a _PipeLogger that can be passed to Popen.

    The write end of the underlying pipe is closed when the proxy is
    garbage collected.
    """

    def __init__(self, logger):
        self._logger = logger

    def fileno(self):
        """Returns the fileno of the logger pipe."""
        return self._logger._pipe[1]

    def __del__(self):
        self._logger.close()


class _PipeLogger(object):
    """A pipe plus the logging level and prefix used for data read from it."""

    def __init__(self, level, prefix):
        # Stored as a list (not a tuple) so the write end can be replaced
        # with _PIPE_CLOSED once it has been closed.
        self._pipe = list(os.pipe())
        self._level = level
        self._prefix = prefix

    def close(self):
        """Closes the logger."""
        if self._pipe[1] != _PIPE_CLOSED:
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED


class _LoggingService(object):
    """Background thread that reads logger pipes and emits log records."""

    def __init__(self):
        # Python's list is thread safe
        self._loggers = []

        # Change tuple to list so that we can change the value when
        # closing the pipe.
        self._pipe = list(os.pipe())
        self._thread = threading.Thread(target=self._service_run)
        self._thread.daemon = True
        self._thread.start()

    def _service_run(self):
        """Thread body: multiplexes all logger pipes until shutdown."""
        terminate_loop = False
        while not terminate_loop:
            # The list is rebuilt on every iteration so that loggers added
            # since the last select() call are picked up.
            rlist = [l._pipe[0] for l in self._loggers]
            rlist.append(self._pipe[0])
            for r in select.select(rlist, [], [])[0]:
                data = os.read(r, _LOG_BUFSIZE)
                if r != self._pipe[0]:
                    self._output_logger_message(r, data)
                elif not data:
                    # EOF on the control pipe: shutdown() closed its write end.
                    terminate_loop = True
                # Non-empty data on the control pipe is only a wake-up byte
                # from create_logger() and is discarded.
        # Release resources.
        os.close(self._pipe[0])
        for logger in self._loggers:
            os.close(logger._pipe[0])

    def _output_logger_message(self, r, data):
        """Logs one chunk read from a logger pipe, or retires it on EOF.

        Args:
            r: The read file descriptor the data came from.
            data: The bytes returned by os.read(); empty means EOF.
        """
        logger = next(l for l in self._loggers if l._pipe[0] == r)

        if not data:
            os.close(logger._pipe[0])
            self._loggers.remove(logger)
            return

        # os.read() returns bytes; decode before splitting so this works on
        # Python 3 (where bytes.split('\n') raises TypeError). Undecodable
        # bytes are replaced rather than raising in the service thread.
        text = data.decode('utf-8', 'replace')
        for line in text.split('\n'):
            logging.log(logger._level, '%s%s', logger._prefix, line)

    def create_logger(self, level=logging.DEBUG, prefix=''):
        """Creates a new logger.

        Args:
            level: The desired logging level.
            prefix: The prefix to add to each log entry.

        Returns:
            A _LoggerProxy whose fileno() is the write end of the new pipe.
        """
        logger = _PipeLogger(level=level, prefix=prefix)
        self._loggers.append(logger)
        # Wake the service thread so it starts watching the new pipe.
        # os.write() requires a bytes-like object on Python 3.
        os.write(self._pipe[1], b'\0')
        return _LoggerProxy(logger)

    def shutdown(self):
        """Shuts down the logger."""
        if self._pipe[1] != _PIPE_CLOSED:
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED
            self._thread.join()


def create_logger(level=logging.DEBUG, prefix=''):
    """Creates a new logger.

    The shared logging service is started on first use and registered to be
    shut down at interpreter exit.

    Args:
        level: The desired logging level.
        prefix: The prefix to add to each log entry.

    Returns:
        An object with a fileno() method, usable as a Popen stdout/stderr.
    """
    global _logging_service
    if _logging_service is None:
        _logging_service = _LoggingService()
        atexit.register(_logging_service.shutdown)
    return _logging_service.create_logger(level=level, prefix=prefix)


def wait_and_check_returncode(*popens):
    """Waits for all the Popens and check the return code is 0.

    Every Popen is waited for and every failure is logged; the raised error
    carries the message of the last failure only.

    Args:
        popens: The Popens to be checked. Each must have a 'command'
            attribute, as set by popen() in this module.

    Raises:
        RuntimeError if the return code is not 0.
    """
    error_message = None
    for p in popens:
        if p.wait() != 0:
            error_message = ('Command failed(%d, %d): %s' %
                             (p.pid, p.returncode, p.command))
            logging.error(error_message)
    if error_message:
        raise RuntimeError(error_message)


def execute(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS,
            run_as=None):
    """Executes a child command and wait for it.

    Args:
        args: The command to be executed.
        stdin: The executed program's standard input.
        stdout: The executed program's standard output.
        stderr: The executed program's standard error.
        run_as: If not None, run the command as the given user.

    Returns:
        The output from standard output if 'stdout' is subprocess.PIPE,
        otherwise None.

    Raises:
        RuntimeError if the return code of the child command is not 0.
    """
    ps = popen(args, stdin=stdin, stdout=stdout, stderr=stderr, run_as=run_as)
    out = ps.communicate()[0] if stdout == subprocess.PIPE else None
    wait_and_check_returncode(ps)
    return out


def _run_as(user):
    """Changes the uid and gid of the running process to be that of the
    given user and configures its supplementary groups.

    Don't call this function directly, instead wrap it in a lambda and
    pass that to the preexec_fn argument of subprocess.Popen.

    Example usage:
        subprocess.Popen(..., preexec_fn=lambda: _run_as('chronos'))

    Args:
        user: The user to run as.
    """
    pw = pwd.getpwnam(user)
    # setuid() is called last, after the gid and groups have been set.
    os.setgid(pw.pw_gid)
    os.initgroups(user, pw.pw_gid)
    os.setuid(pw.pw_uid)


def popen(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, env=None,
          run_as=None):
    """Returns a Popen object just as subprocess.Popen does but with the
    executed command stored in Popen.command.

    The returned object also carries the serial number of the command in
    Popen.command_id.

    Args:
        args: The command to be executed.
        stdin: The executed program's standard input.
        stdout: The executed program's standard output.
        stderr: The executed program's standard error.
        env: The executed program's environment.
        run_as: If not None, run the command as the given user.
    """
    command_id = next(_command_serial_number)
    prefix = '[%04d] ' % command_id

    if stdout is TEE_TO_LOGS:
        stdout = create_logger(level=logging.DEBUG, prefix=prefix)
    if stderr is TEE_TO_LOGS:
        stderr = create_logger(level=logging.ERROR, prefix=prefix)

    command = ' '.join(_shell_quote(x) for x in args)
    logging.info('%sRunning: %s', prefix, command)

    preexec_fn = None
    if run_as is not None:
        preexec_fn = lambda: _run_as(run_as)

    # The lock is required for http://crbug.com/323843.
    with _popen_lock:
        ps = subprocess.Popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
                              env=env, preexec_fn=preexec_fn)

    logging.info('%spid is %d', prefix, ps.pid)
    ps.command_id = command_id
    ps.command = command
    return ps