1# Copyright 2011 Google Inc. All Rights Reserved.
2#
3"""Classes that help running commands in a subshell.
4
Commands can be run locally, or remotely over an SSH connection.  The output of
a command can be logged to a terminal, a file, or any other destination.
7"""
8
9__author__ = 'kbaclawski@google.com (Krystian Baclawski)'
10
import fcntl
import logging
import os
import select
import subprocess
import sys
import time

from automation.common import logger
19
20
class CommandExecuter(object):
  """Runs a shell command locally or, through ssh, on a remote machine.

  While the command runs, whatever it writes to stdout / stderr is read in
  chunks and passed to DataReceivedOnOutput / DataReceivedOnError.  Subclasses
  override those two methods to redirect the output.

  Attributes:
    DRY_RUN: class-wide default for dry-run mode (see Configure).
  """

  DRY_RUN = False

  def __init__(self, dry_run=False):
    """Initializes the executer.

    Args:
      dry_run: if true (or if the class-wide DRY_RUN is true at construction
        time), RunCommand does not execute anything and returns 0.
    """
    self._logger = logging.getLogger(self.__class__.__name__)
    self._dry_run = dry_run or self.DRY_RUN

  @classmethod
  def Configure(cls, dry_run):
    """Sets the class-wide dry-run default picked up by new instances."""
    cls.DRY_RUN = dry_run

  def RunCommand(self,
                 cmd,
                 machine=None,
                 username=None,
                 command_terminator=None,
                 command_timeout=None):
    """Runs a command through the shell and returns its exit status.

    Args:
      cmd: the command; it is converted with str() and run with shell=True.
      machine: if set, the command is wrapped into an ssh invocation targeting
        this host.
      username: optional user name for the ssh login.
      command_terminator: object providing IsTerminated() and Terminate(); a
        fresh CommandTerminator is created when none is given.
      command_timeout: optional number of seconds after which the command is
        terminated.

    Returns:
      0 in dry-run mode, 1 if the terminator was already triggered before the
      command was started, otherwise the return code of the child process.
    """
    cmd = str(cmd)

    if self._dry_run:
      return 0

    if not command_terminator:
      command_terminator = CommandTerminator()

    if command_terminator.IsTerminated():
      self._logger.warning('Command has been already terminated!')
      return 1

    # Rewrite command for remote execution.
    if machine:
      if username:
        login = '%s@%s' % (username, machine)
      else:
        login = machine

      self._logger.debug("Executing '%s' on %s.", cmd, login)

      # NOTE(review): cmd is embedded between single quotes without escaping,
      # so a command that itself contains a single quote is mangled.  Left as
      # is because callers may already pre-escape their commands.
      # FIXME(asharif): Remove this after crosbug.com/33007 is fixed.
      cmd = "ssh -t -t %s -- '%s'" % (login, cmd)
    else:
      self._logger.debug("Executing: '%s'.", cmd)

    child = self._SpawnProcess(cmd, command_terminator, command_timeout)

    self._logger.debug('{PID: %d} Finished with %d code.', child.pid,
                       child.returncode)

    return child.returncode

  def _Terminate(self, child, command_timeout, wait_timeout=10):
    """Gracefully shuts the child down by sending SIGTERM.

    Args:
      child: the subprocess.Popen object to terminate.
      command_timeout: the timeout that expired, or a false value when the
        termination was not caused by a timeout.  Only used for logging.
      wait_timeout: how many seconds to wait for the child to exit.

    Returns:
      The return code of the child, or None if it is still running after
      wait_timeout seconds.
    """
    if command_timeout:
      self._logger.warning('{PID: %d} Timeout of %s seconds reached since '
                           'process started.', child.pid, command_timeout)

    self._logger.warning('{PID: %d} Terminating child.', child.pid)

    try:
      child.terminate()
    except OSError:
      # The process is already gone; poll() below picks up its status.
      pass

    deadline = time.time() + wait_timeout

    # poll() yields None only while the child is running.  A return code of 0
    # is a finished child too, so compare against None instead of testing
    # truthiness.
    while child.poll() is None and time.time() < deadline:
      time.sleep(0.1)

    return child.poll()

  def _Kill(self, child):
    """Kills the child with immediate result and reaps it."""
    self._logger.warning('{PID: %d} Process still alive.', child.pid)
    self._logger.warning('{PID: %d} Killing child.', child.pid)
    try:
      child.kill()
    except OSError:
      # The child exited between the last poll() and the kill request.
      pass
    child.wait()

  def _SpawnProcess(self, cmd, command_terminator, command_timeout):
    """Starts cmd in a subshell and pumps its output until it finishes.

    Args:
      cmd: shell command line to execute.
      command_terminator: object providing IsTerminated() and Terminate().
      command_timeout: optional number of seconds after which the terminator
        is triggered and the child is terminated.

    Returns:
      The subprocess.Popen object of the finished child (returncode is set).
    """
    # Create a child process executing provided command.
    child = subprocess.Popen(cmd,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             stdin=subprocess.PIPE,
                             shell=True)

    # Close stdin so the child won't be able to block on read.
    child.stdin.close()

    started_time = time.time()

    # Watch for data on process stdout, stderr.
    pipes = [child.stdout, child.stderr]

    # Put pipes into non-blocking mode.
    for pipe in pipes:
      fd = pipe.fileno()
      fd_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
      fcntl.fcntl(fd, fcntl.F_SETFL, fd_flags | os.O_NONBLOCK)

    already_terminated = False

    try:
      while pipes:
        # Maybe timeout reached?
        timed_out = (not already_terminated and bool(command_timeout) and
                     time.time() - started_time > command_timeout)
        if timed_out:
          command_terminator.Terminate()

        # Check if terminate request was received.
        if command_terminator.IsTerminated() and not already_terminated:
          # Mention the timeout in the log only if it is the actual reason.
          reported_timeout = command_timeout if timed_out else None
          if self._Terminate(child, reported_timeout) is None:
            self._Kill(child)
          # Don't exit the loop immediately. Firstly try to read everything
          # that was left on stdout and stderr.
          already_terminated = True

        # Wait for pipes to become ready.
        ready_pipes, _, _ = select.select(pipes, [], [], 0.1)

        if already_terminated and not ready_pipes:
          # The child is dead and nothing is left to read.  A descendant may
          # still hold the pipes open, so waiting for EOF could block forever.
          break

        # Handle file descriptors ready to be read.
        for pipe in ready_pipes:
          fd = pipe.fileno()

          data = os.read(fd, 4096)

          # check for end-of-file
          if not data:
            pipes.remove(pipe)
            continue

          # read all data that's available
          while data:
            if pipe is child.stdout:
              self.DataReceivedOnOutput(data)
            elif pipe is child.stderr:
              self.DataReceivedOnError(data)

            try:
              data = os.read(fd, 4096)
            except OSError:
              # terminate loop if EWOULDBLOCK (EAGAIN) is received
              data = ''

      if not already_terminated:
        self._logger.debug('Waiting for command to finish.')
        child.wait()
    finally:
      # Release the pipe descriptors instead of leaving that to the garbage
      # collector.
      child.stdout.close()
      child.stderr.close()

    return child

  @staticmethod
  def _ToText(data):
    """Returns data as a native string, decoding bytes on Python 3."""
    if isinstance(data, bytes) and not isinstance(data, str):
      return data.decode('utf-8', 'replace')
    return data

  def DataReceivedOnOutput(self, data):
    """Invoked when the child process wrote data to stdout."""
    sys.stdout.write(self._ToText(data))

  def DataReceivedOnError(self, data):
    """Invoked when the child process wrote data to stderr."""
    sys.stderr.write(self._ToText(data))
178
179
class LoggingCommandExecuter(CommandExecuter):
  """CommandExecuter that sends the command's output to a dedicated logger.

  Every line written by the command becomes one log record on the
  '<executer logger name>.Output' logger: stdout lines at INFO level with the
  'STDOUT' prefix, stderr lines at WARNING level with the 'STDERR' prefix.
  """

  def __init__(self, *args, **kwargs):
    """Initializes the base executer and the logger for command output."""
    super(LoggingCommandExecuter, self).__init__(*args, **kwargs)

    # Create a logger for command's stdout/stderr streams.
    self._output = logging.getLogger('%s.%s' % (self._logger.name, 'Output'))

  def OpenLog(self, log_path):
    """Starts saving command output to a compressed log file.

    Args:
      log_path: path handed to logger.CompressedFileHandler.
    """
    formatter = logging.Formatter('%(asctime)s %(prefix)s: %(message)s',
                                  '%Y-%m-%d %H:%M:%S')
    handler = logger.CompressedFileHandler(log_path, delay=True)
    handler.setFormatter(formatter)
    self._output.addHandler(handler)

    # Set a flag to prevent log records from being propagated up the logger
    # hierarchy tree.  We don't want for command output messages to appear in
    # the main log.
    self._output.propagate = 0

  def CloseLog(self):
    """Remove handlers and reattach the logger to its parent."""
    # Iterate over a copy, removeHandler() mutates the handlers list.
    for handler in list(self._output.handlers):
      self._output.removeHandler(handler)
      handler.flush()
      handler.close()

    self._output.propagate = 1

  def _LogLines(self, log_method, data, prefix):
    """Emits one log record per line of data, tagged with prefix.

    Args:
      log_method: bound logging method used to emit each line.
      data: chunk read from the child; may hold several lines.
      prefix: value for the 'prefix' field required by the log format.
    """
    extra = {'prefix': prefix}
    for line in data.splitlines():
      # On Python 3 the chunks read from the pipes are bytes; decode them so
      # the log holds the text itself instead of its b'...' representation.
      if isinstance(line, bytes) and not isinstance(line, str):
        line = line.decode('utf-8', 'replace')
      log_method(line, extra=extra)

  def DataReceivedOnOutput(self, data):
    """Invoked when the child process wrote data to stdout."""
    self._LogLines(self._output.info, data, 'STDOUT')

  def DataReceivedOnError(self, data):
    """Invoked when the child process wrote data to stderr."""
    self._LogLines(self._output.warning, data, 'STDERR')
219
220
class CommandTerminator(object):
  """Flag object used to request termination of a running command.

  Attributes:
    terminated: False until Terminate() is called, True afterwards.
  """

  def __init__(self):
    """Creates a terminator that has not been triggered yet."""
    self.terminated = False

  def Terminate(self):
    """Records the termination request."""
    self.terminated = True

  def IsTerminated(self):
    """Returns the current value of the termination flag."""
    return self.terminated
231