#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from Queue import Empty
from contextlib import contextmanager
from multiprocessing import Process, Queue
import os
import signal
import traceback

from . import command


def setup_testing():
  """For testing only: Use threading under the hood instead of multiprocessing
  to make coverage work.
  """
  global Queue
  global Process
  del Queue
  del Process
  from Queue import Queue
  from threading import Thread as Process
  # Monkeypatch threading Queue to look like multiprocessing Queue.
  Queue.cancel_join_thread = lambda self: None
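
# Hypothetical usage sketch, not part of the original module: unit tests are
# expected to call setup_testing() before constructing a Pool, so that
# workers run as threads and coverage can observe them, e.g.:
#
#   setup_testing()
#   pool = Pool(num_workers=2)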


class NormalResult():
  def __init__(self, result):
    self.result = result
    self.exception = None


class ExceptionResult():
  def __init__(self, exception):
    self.exception = exception


class MaybeResult():
  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    return MaybeResult(False, value)
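
# Hedged sketch of the consumer-side pattern; pool, fn, gen and handle are
# assumed names. Each value yielded by Pool.imap_unordered is a MaybeResult,
# so callers distinguish heartbeats from real results:
#
#   for maybe_result in pool.imap_unordered(fn, gen):
#     if maybe_result.heartbeat:
#       continue  # Still waiting; the runner is alive.
#     handle(maybe_result.value)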


def Worker(fn, work_queue, done_queue,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.
  The worker stops when the poison pill "STOP" is received.
  """
  try:
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except command.AbortException:
        # SIGINT, SIGTERM or internal hard timeout.
        break
      except Exception as e:
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult(e))
    # On normal tear down, all items have already been pulled from the
    # done_queue at this point, so this call has no effect. On fast abort, a
    # fast worker might have left items on the done_queue that will never be
    # pulled. This call purges those to avoid a deadlock.
    done_queue.cancel_join_thread()
  except KeyboardInterrupt:
    assert False, 'Unreachable'
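
# Illustrative sketch of the queue protocol Worker expects; the values are
# hypothetical. Each work item is a list of positional arguments for fn, and
# the string "STOP" is the poison pill that shuts a worker down:
#
#   work_queue.put(['test-262/foo', 0])  # becomes fn('test-262/foo', 0)
#   work_queue.put("STOP")               # one pill per worker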


@contextmanager
def without_sig():
  int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
  term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
  try:
    yield
  finally:
    signal.signal(signal.SIGINT, int_handler)
    signal.signal(signal.SIGTERM, term_handler)


class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""
  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to avoid overflowing the queue's pipe if a keyboard interrupt
  # happens.
  BUFFER_FACTOR = 4
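  # For example, with num_workers=4 at most 4 * BUFFER_FACTOR = 16 items are
  # queued or in flight at any time (see _advance_more below).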

  def __init__(self, num_workers, heartbeat_timeout=1):
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False
    self.abort_now = False

    # Invariant: processing_count >= #work_queue + #done_queue. It is greater
    # while a worker has taken an item from the work_queue but has not yet
    # submitted the result to the done_queue. It is equal when no worker is
    # working, e.g. when all workers have finished and no results are being
    # processed. The count is only accessed by the parent process. Only the
    # parent process is allowed to remove items from the done_queue and to
    # add items to the work_queue.
    self.processing_count = 0
    self.heartbeat_timeout = heartbeat_timeout

    # Disable sigint and sigterm to prevent subprocesses from capturing the
    # signals.
    with without_sig():
      self.work_queue = Queue()
      self.done_queue = Queue()

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. A result value of type
    MaybeResult either signals a heartbeat, i.e. that the runner is still
    waiting for the result to be computed, or it wraps the real result. See
    the usage sketch after this method.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as an additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent across
          the process boundary.
    """
    if self.terminated:
      return
    try:
      internal_error = False
      gen = iter(gen)
      self.advance = self._advance_more

      # Disable sigint and sigterm to prevent subprocesses from capturing the
      # signals.
      with without_sig():
        for w in xrange(self.num_workers):
          p = Process(target=Worker,
                      args=(fn, self.work_queue, self.done_queue,
                            process_context_fn, process_context_args))
          p.start()
          self.processes.append(p)

      self.advance(gen)
      while self.processing_count > 0:
        while True:
          try:
            # Read from the result queue in a responsive fashion. If
            # available, this will return a normal result immediately or a
            # heartbeat on heartbeat timeout (default 1 second).
            result = self._get_result_from_queue()
          except:
            # TODO(machenbach): Handle a few known types of internal errors
            # gracefully, e.g. missing test files.
            internal_error = True
            continue

          if self.abort_now:
            # SIGINT, SIGTERM or internal hard timeout.
            return

          yield result
          break

        self.advance(gen)
    except KeyboardInterrupt:
      assert False, 'Unreachable'
    except Exception as e:
      traceback.print_exc()
      print(">>> EXCEPTION: %s" % e)
    finally:
      self._terminate()

    if internal_error:
      raise Exception("Internal error in a worker process.")
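
  # Hedged usage sketch for the process-context hooks above; run_test,
  # make_context, tests and handle are hypothetical names:
  #
  #   pool = Pool(num_workers=4)
  #   results = pool.imap_unordered(
  #       run_test,                            # called as run_test(*item,
  #       ([t] for t in tests),                #   process_context=<context>)
  #       process_context_fn=make_context,     # runs once per worker
  #       process_context_args=['--verbose'])  # pickled to each worker
  #   for maybe_result in results:
  #     if not maybe_result.heartbeat:
  #       handle(maybe_result.value)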

  def _advance_more(self, gen):
    while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(gen.next())
        self.processing_count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    assert not self.terminated

    self.work_queue.put(args)
    self.processing_count += 1
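
  # Minimal sketch of dynamic work addition; the rerun logic and names are
  # hypothetical. Since results are consumed in the parent process, it is
  # safe to add follow-up work from inside the results loop:
  #
  #   for maybe_result in pool.imap_unordered(run_test, initial_tests):
  #     if not maybe_result.heartbeat and maybe_result.value.is_flaky:
  #       pool.add([maybe_result.value.test_id])  # schedule a rerun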

  def abort(self):
    """Schedules abort on next queue read.

    This is safe to call when handling SIGINT, SIGTERM or when an internal
    hard timeout is reached.
    """
    self.abort_now = True
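
  # Hedged sketch of wiring abort() to a signal handler in the parent
  # process; the handler and the pool variable are assumptions:
  #
  #   def handle_sigterm(signum, frame):
  #     pool.abort()
  #   signal.signal(signal.SIGTERM, handle_sigterm)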

  def _terminate(self):
    """Terminates execution and cleans up the queues.

    If abort() was called before termination, this also terminates the
    subprocesses and doesn't wait for ongoing tests.
    """
    if self.terminated:
      return
    self.terminated = True

    # Drain remaining tests from the work queue.
    try:
      while True:
        self.work_queue.get(True, 0.1)
    except Empty:
      pass

    # Make sure all processes stop.
    for _ in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    if self.abort_now:
      for p in self.processes:
        os.kill(p.pid, signal.SIGTERM)

    for p in self.processes:
      p.join()

    # Drain the queues to prevent stderr chatter when queues are garbage
    # collected.
    try:
      while True:
        self.work_queue.get(False)
    except:
      pass
    try:
      while True:
        self.done_queue.get(False)
    except:
      pass

  def _get_result_from_queue(self):
    """Attempts to get the next result from the queue.

    Returns: A wrapped result if one was available within the heartbeat
        timeout, a heartbeat result otherwise.
    Raises:
        Exception: If an exception occurred when processing the task on the
            worker side, it is reraised here.
    """
    while True:
      try:
        result = self.done_queue.get(timeout=self.heartbeat_timeout)
        self.processing_count -= 1
        if result.exception:
          raise result.exception
        return MaybeResult.create_result(result.result)
      except Empty:
        return MaybeResult.create_heartbeat()
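

def _example_task(n):
  # Module-level helper for the hypothetical demo below; it must live at
  # module level so it can be pickled and sent to the worker processes. Not
  # part of the original test runner.
  return n * n


if __name__ == '__main__':
  # Hedged smoke test, not part of the original module. Because of the
  # relative import of "command" above, this only works when executed as part
  # of its package (e.g. via python -m), and it assumes fork-based
  # multiprocessing as on Linux: square the numbers 0..9 on two workers and
  # print results as they arrive.
  pool = Pool(num_workers=2)
  gen = ([n] for n in range(10))
  for maybe_result in pool.imap_unordered(_example_task, gen):
    if maybe_result.heartbeat:
      continue  # No result ready yet; the runner is still alive.
    print(maybe_result.value)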