1#!/usr/bin/python
2# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import atexit
7import itertools
8import logging
9import os
10import pipes
11import pwd
12import select
13import subprocess
14import threading
15
16from autotest_lib.client.common_lib.utils import TEE_TO_LOGS
17
# Held by popen() around the subprocess.Popen() call.
_popen_lock = threading.Lock()
# Lazily created _LoggingService singleton; set by create_logger().
_logging_service = None
# Source of the per-command ids that popen() uses for its log prefix.
_command_serial_number = itertools.count(1)

# Maximum number of bytes fetched from a pipe by a single os.read().
_LOG_BUFSIZE = 4096
# Value stored in place of a pipe's file descriptor once it is closed.
_PIPE_CLOSED = -1
24
class _LoggerProxy(object):
    """File-like handle for the writing side of a pipe logger.

    The proxy offers fileno(), which is what popen() in this module
    relies on when it hands the proxy to subprocess.Popen as stdout or
    stderr.  The wrapped logger is closed when the proxy is destroyed.
    """

    def __init__(self, logger):
        """Wraps a logger.

        @param logger: object providing a two element '_pipe' list and
                       a close() method
        """
        self._logger = logger

    def fileno(self):
        """Returns the fileno of the logger pipe."""
        pipe_fds = self._logger._pipe
        # Index 1 is the write end of the (read, write) pair.
        return pipe_fds[1]

    def __del__(self):
        # Dropping the last reference to the proxy closes the logger.
        wrapped = self._logger
        wrapped.close()
36
37
class _PipeLogger(object):
    """A pipe together with the log level and prefix for its output."""

    def __init__(self, level, prefix):
        """Creates the pipe and records how its data is to be logged.

        @param level: the logging level for data read from the pipe
        @param prefix: the prefix to add to each log entry
        """
        read_fd, write_fd = os.pipe()
        # Kept as a list so that close() can overwrite the write end.
        self._pipe = [read_fd, write_fd]
        self._level = level
        self._prefix = prefix

    def close(self):
        """Closes the logger."""
        write_fd = self._pipe[1]
        if write_fd == _PIPE_CLOSED:
            # Already closed; nothing left to do.
            return
        os.close(write_fd)
        self._pipe[1] = _PIPE_CLOSED
50
51
class _LoggingService(object):
    """Forwards data written to pipe loggers to the logging module.

    A daemon thread select()s on the read end of every registered
    logger and on an internal control pipe.  Writing a byte to the
    control pipe wakes the thread up so that it rebuilds its list of
    file descriptors; closing the write end of the control pipe makes
    the thread leave its loop.

    Data is logged one complete line at a time: an unterminated
    trailing fragment is held back until the rest of the line arrives,
    the pipe reaches end-of-file, or the service shuts down.
    """

    # Longest unterminated fragment (in bytes) that is held back while
    # waiting for its newline.  Longer fragments are logged as they are
    # so that newline-free output cannot make the buffer grow forever.
    _MAX_PARTIAL_LINE = 4096

    def __init__(self):
        # Python's list is thread safe
        self._loggers = []

        # Unterminated trailing fragment received so far from each
        # logger, keyed by the read fd of the logger's pipe.  It must
        # exist before the service thread is started below.
        self._pending = {}

        # Change tuple to list so that we can change the value when
        # closing the pipe.
        self._pipe = list(os.pipe())
        self._thread = threading.Thread(target=self._service_run)
        self._thread.daemon = True
        self._thread.start()


    def _service_run(self):
        """Main loop of the service thread."""
        terminate_loop = False
        while not terminate_loop:
            # Rebuilt on every iteration so that loggers registered or
            # removed since the last select() are taken into account.
            rlist = [l._pipe[0] for l in self._loggers]
            rlist.append(self._pipe[0])
            for r in select.select(rlist, [], [])[0]:
                data = os.read(r, _LOG_BUFSIZE)
                if r != self._pipe[0]:
                    self._output_logger_message(r, data)
                elif len(data) == 0:
                    # End-of-file on the control pipe: shutdown() ran.
                    terminate_loop = True
        # Release resources.
        os.close(self._pipe[0])
        for logger in self._loggers:
            # Do not lose a fragment that was still waiting for its
            # newline.
            leftover = self._pending.pop(logger._pipe[0], b'')
            if leftover:
                self._log_line(logger, leftover)
            os.close(logger._pipe[0])


    @staticmethod
    def _log_line(logger, line):
        """Logs one line with the level and prefix of its logger.

        @param logger: the logger the line was read from
        @param line: the line, without its trailing newline
        """
        if not isinstance(line, str):
            # On Python 3 os.read() returns bytes; decode them so the
            # log shows the text and not a bytes repr.
            line = line.decode('utf-8', 'replace')
        logging.log(logger._level, '%s%s', logger._prefix, line)


    def _output_logger_message(self, r, data):
        """Logs the data read from a logger pipe.

        @param r: the read fd the data came from
        @param data: the data returned by os.read(); empty on
                     end-of-file
        """
        logger = next(l for l in self._loggers if l._pipe[0] == r)
        pending = self._pending.pop(r, b'')

        if len(data) == 0:
            # End-of-file: emit what is left, then drop the logger.
            if pending:
                self._log_line(logger, pending)
            os.close(logger._pipe[0])
            self._loggers.remove(logger)
            return

        lines = (pending + data).split(b'\n')
        # The last element is whatever follows the final newline: empty
        # when the data ended with a newline, otherwise the beginning
        # of a line that is not complete yet.
        tail = lines.pop()
        if len(tail) >= self._MAX_PARTIAL_LINE:
            lines.append(tail)
        elif tail:
            self._pending[r] = tail

        for line in lines:
            self._log_line(logger, line)


    def create_logger(self, level=logging.DEBUG, prefix=''):
        """Creates a new logger.

        @param level: the desired logging level
        @param prefix: the prefix to add to each log entry
        """
        logger = _PipeLogger(level=level, prefix=prefix)
        self._loggers.append(logger)
        # Wake up the service thread so that it starts watching the new
        # pipe.  A bytes literal is accepted by os.write() on both
        # Python 2 and Python 3.
        os.write(self._pipe[1], b'\0')
        return _LoggerProxy(logger)


    def shutdown(self):
        """Shuts down the logger."""
        if self._pipe[1] != _PIPE_CLOSED:
            # Closing the write end is seen by the service thread as
            # end-of-file on the control pipe.
            os.close(self._pipe[1])
            self._pipe[1] = _PIPE_CLOSED
            self._thread.join()
113
114
def create_logger(level=logging.DEBUG, prefix=''):
    """Returns a new logger from the module-wide logging service.

    The service is created on first use and its shutdown() is
    registered to run at interpreter exit.

    @param level: the desired logging level
    @param prefix: the prefix to add to each log entry
    """
    global _logging_service
    service = _logging_service
    if service is None:
        # First call: bring up the service and arrange for its cleanup.
        _logging_service = service = _LoggingService()
        atexit.register(service.shutdown)
    return service.create_logger(level=level, prefix=prefix)
126
127
def kill_or_log_returncode(*popens):
    """Kills the Popens still running and logs the others' return code.

    A failure to kill a process is logged as a warning and does not
    stop the remaining Popens from being handled.

    @param popens: The Popens to be killed.
    """
    for proc in popens:
        if proc.poll() is not None:
            # Already finished: only report how it ended.
            logging.warning('command exit (pid=%d, rc=%d): %s',
                            proc.pid, proc.returncode, proc.command)
            continue

        try:
            proc.kill()
        except Exception as err:
            logging.warning('failed to kill %d, %s', proc.pid, err)
142
143
def wait_and_check_returncode(*popens):
    """Waits for every Popen and checks that each one returned 0.

    All the Popens are waited for, and every failure is logged as an
    error.  If at least one of them failed, a RuntimeError carrying the
    message of the last failure is raised afterwards.

    @param popens: The Popens to be checked.
    """
    last_failure = None
    for proc in popens:
        if proc.wait() == 0:
            continue
        last_failure = 'Command failed(%d, %d): %s' % (
                proc.pid, proc.returncode, proc.command)
        logging.error(last_failure)

    if last_failure is not None:
        raise RuntimeError(last_failure)
159
160
def execute(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS,
            run_as=None):
    """Runs a child command and waits for it to finish.

    Returns what the command wrote to its standard output when 'stdout'
    is subprocess.PIPE, and None otherwise.
    Raises RuntimeError if the return code of the child command is not 0.

    @param args: the command to be executed
    @param stdin: the executed program's standard input
    @param stdout: the executed program's standard output
    @param stderr: the executed program's standard error
    @param run_as: if not None, run the command as the given user
    """
    ps = popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
               run_as=run_as)
    if stdout == subprocess.PIPE:
        # Only a piped stdout has output to collect here.
        out = ps.communicate()[0]
    else:
        out = None
    wait_and_check_returncode(ps)
    return out
179
180
def _run_as(user):
    """Switches the running process to the given user.

    The gid, the supplementary groups and the uid of the process are
    all set from the user's password database entry.

    Don't call this function directly, instead wrap it in a lambda and
    pass that to the preexec_fn argument of subprocess.Popen.

    Example usage:
    subprocess.Popen(..., preexec_fn=lambda: _run_as('chronos'))

    @param user: the user to run as
    """
    entry = pwd.getpwnam(user)
    gid = entry.pw_gid
    uid = entry.pw_uid
    # The uid is switched last, after both group settings.
    os.setgid(gid)
    os.initgroups(user, gid)
    os.setuid(uid)
197
198
def popen(args, stdin=None, stdout=TEE_TO_LOGS, stderr=TEE_TO_LOGS, env=None,
          run_as=None):
    """Returns a Popen object just as subprocess.Popen does but with the
    executed command stored in Popen.command.

    Every command gets a serial number, stored in Popen.command_id and
    used as the '[NNNN] ' prefix of the log entries about it.  When
    'stdout' or 'stderr' is TEE_TO_LOGS, that stream is sent to the
    logs: stdout at DEBUG level and stderr at ERROR level.

    @param args: the command to be executed
    @param stdin: the executed program's standard input
    @param stdout: the executed program's standard output
    @param stderr: the executed program's standard error
    @param env: the executed program's environment
    @param run_as: if not None, run the command as the given user

    @return: the subprocess.Popen object of the started command
    """
    # The next() builtin works on Python 2.6+ and on Python 3, whereas
    # the iterator's .next() method only exists on Python 2.
    command_id = next(_command_serial_number)
    prefix = '[%04d] ' % command_id

    if stdout is TEE_TO_LOGS:
        stdout = create_logger(level=logging.DEBUG, prefix=prefix)
    if stderr is TEE_TO_LOGS:
        stderr = create_logger(level=logging.ERROR, prefix=prefix)

    # Shell-quoted form of the command, used for logging only.
    command = ' '.join(pipes.quote(x) for x in args)
    logging.info('%sRunning: %s', prefix, command)

    preexec_fn = None
    if run_as is not None:
        # Runs in the child process before the command is executed.
        preexec_fn = lambda: _run_as(run_as)

    # The lock is required for http://crbug.com/323843.
    with _popen_lock:
        ps = subprocess.Popen(args, stdin=stdin, stdout=stdout, stderr=stderr,
                              env=env, preexec_fn=preexec_fn)
    logging.info('%spid is %d', prefix, ps.pid)
    ps.command_id = command_id
    ps.command = command
    return ps
234