1#!/usr/bin/env python 2# Copyright 2014 the V8 project authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from Queue import Empty 7from contextlib import contextmanager 8from multiprocessing import Process, Queue 9import os 10import signal 11import time 12import traceback 13 14from . import command 15 16 17def setup_testing(): 18 """For testing only: Use threading under the hood instead of multiprocessing 19 to make coverage work. 20 """ 21 global Queue 22 global Process 23 del Queue 24 del Process 25 from Queue import Queue 26 from threading import Thread as Process 27 # Monkeypatch threading Queue to look like multiprocessing Queue. 28 Queue.cancel_join_thread = lambda self: None 29 30 31class NormalResult(): 32 def __init__(self, result): 33 self.result = result 34 self.exception = None 35 36class ExceptionResult(): 37 def __init__(self, exception): 38 self.exception = exception 39 40 41class MaybeResult(): 42 def __init__(self, heartbeat, value): 43 self.heartbeat = heartbeat 44 self.value = value 45 46 @staticmethod 47 def create_heartbeat(): 48 return MaybeResult(True, None) 49 50 @staticmethod 51 def create_result(value): 52 return MaybeResult(False, value) 53 54 55def Worker(fn, work_queue, done_queue, 56 process_context_fn=None, process_context_args=None): 57 """Worker to be run in a child process. 58 The worker stops when the poison pill "STOP" is reached. 59 """ 60 try: 61 kwargs = {} 62 if process_context_fn and process_context_args is not None: 63 kwargs.update(process_context=process_context_fn(*process_context_args)) 64 for args in iter(work_queue.get, "STOP"): 65 try: 66 done_queue.put(NormalResult(fn(*args, **kwargs))) 67 except command.AbortException: 68 # SIGINT, SIGTERM or internal hard timeout. 69 break 70 except Exception, e: 71 traceback.print_exc() 72 print(">>> EXCEPTION: %s" % e) 73 done_queue.put(ExceptionResult(e)) 74 # When we reach here on normal tear down, all items have been pulled from 75 # the done_queue before and this should have no effect. On fast abort, it's 76 # possible that a fast worker left items on the done_queue in memory, which 77 # will never be pulled. This call purges those to avoid a deadlock. 78 done_queue.cancel_join_thread() 79 except KeyboardInterrupt: 80 assert False, 'Unreachable' 81 82 83@contextmanager 84def without_sig(): 85 int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) 86 term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN) 87 try: 88 yield 89 finally: 90 signal.signal(signal.SIGINT, int_handler) 91 signal.signal(signal.SIGTERM, term_handler) 92 93 94class Pool(): 95 """Distributes tasks to a number of worker processes. 96 New tasks can be added dynamically even after the workers have been started. 97 Requirement: Tasks can only be added from the parent process, e.g. while 98 consuming the results generator.""" 99 100 # Factor to calculate the maximum number of items in the work/done queue. 101 # Necessary to not overflow the queue's pipe if a keyboard interrupt happens. 102 BUFFER_FACTOR = 4 103 104 def __init__(self, num_workers, heartbeat_timeout=1): 105 self.num_workers = num_workers 106 self.processes = [] 107 self.terminated = False 108 self.abort_now = False 109 110 # Invariant: processing_count >= #work_queue + #done_queue. It is greater 111 # when a worker takes an item from the work_queue and before the result is 112 # submitted to the done_queue. It is equal when no worker is working, 113 # e.g. when all workers have finished, and when no results are processed. 114 # Count is only accessed by the parent process. Only the parent process is 115 # allowed to remove items from the done_queue and to add items to the 116 # work_queue. 117 self.processing_count = 0 118 self.heartbeat_timeout = heartbeat_timeout 119 120 # Disable sigint and sigterm to prevent subprocesses from capturing the 121 # signals. 122 with without_sig(): 123 self.work_queue = Queue() 124 self.done_queue = Queue() 125 126 def imap_unordered(self, fn, gen, 127 process_context_fn=None, process_context_args=None): 128 """Maps function "fn" to items in generator "gen" on the worker processes 129 in an arbitrary order. The items are expected to be lists of arguments to 130 the function. Returns a results iterator. A result value of type 131 MaybeResult either indicates a heartbeat of the runner, i.e. indicating 132 that the runner is still waiting for the result to be computed, or it wraps 133 the real result. 134 135 Args: 136 process_context_fn: Function executed once by each worker. Expected to 137 return a process-context object. If present, this object is passed 138 as additional argument to each call to fn. 139 process_context_args: List of arguments for the invocation of 140 process_context_fn. All arguments will be pickled and sent beyond the 141 process boundary. 142 """ 143 if self.terminated: 144 return 145 try: 146 internal_error = False 147 gen = iter(gen) 148 self.advance = self._advance_more 149 150 # Disable sigint and sigterm to prevent subprocesses from capturing the 151 # signals. 152 with without_sig(): 153 for w in xrange(self.num_workers): 154 p = Process(target=Worker, args=(fn, 155 self.work_queue, 156 self.done_queue, 157 process_context_fn, 158 process_context_args)) 159 p.start() 160 self.processes.append(p) 161 162 self.advance(gen) 163 while self.processing_count > 0: 164 while True: 165 try: 166 # Read from result queue in a responsive fashion. If available, 167 # this will return a normal result immediately or a heartbeat on 168 # heartbeat timeout (default 1 second). 169 result = self._get_result_from_queue() 170 except: 171 # TODO(machenbach): Handle a few known types of internal errors 172 # gracefully, e.g. missing test files. 173 internal_error = True 174 continue 175 176 if self.abort_now: 177 # SIGINT, SIGTERM or internal hard timeout. 178 return 179 180 yield result 181 break 182 183 self.advance(gen) 184 except KeyboardInterrupt: 185 assert False, 'Unreachable' 186 except Exception as e: 187 traceback.print_exc() 188 print(">>> EXCEPTION: %s" % e) 189 finally: 190 self._terminate() 191 192 if internal_error: 193 raise Exception("Internal error in a worker process.") 194 195 def _advance_more(self, gen): 196 while self.processing_count < self.num_workers * self.BUFFER_FACTOR: 197 try: 198 self.work_queue.put(gen.next()) 199 self.processing_count += 1 200 except StopIteration: 201 self.advance = self._advance_empty 202 break 203 204 def _advance_empty(self, gen): 205 pass 206 207 def add(self, args): 208 """Adds an item to the work queue. Can be called dynamically while 209 processing the results from imap_unordered.""" 210 assert not self.terminated 211 212 self.work_queue.put(args) 213 self.processing_count += 1 214 215 def abort(self): 216 """Schedules abort on next queue read. 217 218 This is safe to call when handling SIGINT, SIGTERM or when an internal 219 hard timeout is reached. 220 """ 221 self.abort_now = True 222 223 def _terminate(self): 224 """Terminates execution and cleans up the queues. 225 226 If abort() was called before termination, this also terminates the 227 subprocesses and doesn't wait for ongoing tests. 228 """ 229 if self.terminated: 230 return 231 self.terminated = True 232 233 # Drain out work queue from tests 234 try: 235 while True: 236 self.work_queue.get(True, 0.1) 237 except Empty: 238 pass 239 240 # Make sure all processes stop 241 for _ in self.processes: 242 # During normal tear down the workers block on get(). Feed a poison pill 243 # per worker to make them stop. 244 self.work_queue.put("STOP") 245 246 if self.abort_now: 247 for p in self.processes: 248 os.kill(p.pid, signal.SIGTERM) 249 250 for p in self.processes: 251 p.join() 252 253 # Drain the queues to prevent stderr chatter when queues are garbage 254 # collected. 255 try: 256 while True: self.work_queue.get(False) 257 except: 258 pass 259 try: 260 while True: self.done_queue.get(False) 261 except: 262 pass 263 264 def _get_result_from_queue(self): 265 """Attempts to get the next result from the queue. 266 267 Returns: A wrapped result if one was available within heartbeat timeout, 268 a heartbeat result otherwise. 269 Raises: 270 Exception: If an exception occured when processing the task on the 271 worker side, it is reraised here. 272 """ 273 while True: 274 try: 275 result = self.done_queue.get(timeout=self.heartbeat_timeout) 276 self.processing_count -= 1 277 if result.exception: 278 raise result.exception 279 return MaybeResult.create_result(result.result) 280 except Empty: 281 return MaybeResult.create_heartbeat() 282