import os
import threading
import time
import traceback
try:
    import Queue as queue
except ImportError:
    import queue

try:
    import win32api
except ImportError:
    win32api = None

try:
    import multiprocessing
except ImportError:
    multiprocessing = None

import lit.Test

###
# Test Execution Implementation

class LockedValue(object):
    """
    Holder for a single value whose reads and writes are serialized by a
    threading.Lock. Access the value through the `value` property.
    """

    def __init__(self, value):
        self.lock = threading.Lock()
        self._value = value

    def _get_value(self):
        self.lock.acquire()
        try:
            return self._value
        finally:
            self.lock.release()

    def _set_value(self, value):
        self.lock.acquire()
        try:
            self._value = value
        finally:
            self.lock.release()

    value = property(_get_value, _set_value)

class TestProvider(object):
    """
    Hands out test indices to the worker tasks through a shared queue.

    The queue is filled with one index per test, followed by one None
    sentinel per job so that every worker eventually receives a None and
    stops.
    """

    def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
        # canceled_flag must expose a readable/writable `value` attribute
        # (see cancel() and get()).
        self.canceled_flag = canceled_flag

        # Create a shared queue to provide the test indices.
        self.queue = queue_impl()
        for i in range(len(tests)):
            self.queue.put(i)
        # One end-of-work sentinel per job.
        for i in range(num_jobs):
            self.queue.put(None)

    def cancel(self):
        """Request that no further tests are handed out."""
        self.canceled_flag.value = 1

    def get(self):
        """
        Return the next test index, or None when the run was canceled or
        the end-of-work sentinel was reached.
        """
        # Check if we are canceled.
        if self.canceled_flag.value:
            return None

        # Otherwise take the next test.
        return self.queue.get()

class Tester(object):
    """
    A single worker: pulls test indices from the provider, executes the
    corresponding tests and reports each one to the consumer.
    """

    def __init__(self, run_instance, provider, consumer):
        self.run_instance = run_instance
        self.provider = provider
        self.consumer = consumer

    def run(self):
        """
        Execute tests until the provider returns None, then tell the
        consumer that this task is finished.
        """
        while True:
            item = self.provider.get()
            if item is None:
                break
            self.run_test(item)
        self.consumer.task_finished()

    def run_test(self, test_index):
        """Execute the test at test_index and pass it to the consumer."""
        test = self.run_instance.tests[test_index]
        try:
            self.run_instance.execute_test(test)
        except KeyboardInterrupt:
            # This is a sad hack. Unfortunately subprocess goes
            # bonkers with ctrl-c and we start forking merrily.
            print('\nCtrl-C detected, goodbye.')
            # With POSIX kill semantics, pid 0 addresses every process in
            # the caller's process group, this process included.
            os.kill(0,9)
        self.consumer.update(test_index, test)

class ThreadResultsConsumer(object):
    """
    Results consumer for thread-based (or in-line) execution: forwards each
    finished test straight to the display while holding a lock.
    """

    def __init__(self, display):
        self.display = display
        self.lock = threading.Lock()

    def update(self, test_index, test):
        # Serialize display updates coming from different worker threads.
        self.lock.acquire()
        try:
            self.display.update(test)
        finally:
            self.lock.release()

    def task_finished(self):
        # Nothing to signal: results were already delivered by update().
        pass

    def handle_results(self):
        # Nothing to do: results were already delivered by update().
        pass

class MultiprocessResultsConsumer(object):
    """
    Results consumer for process-based execution: results are sent back to
    the parent over a multiprocessing.Queue and displayed there.
    """

    def __init__(self, run, display, num_jobs):
        self.run = run
        self.display = display
        self.num_jobs = num_jobs
        self.queue = multiprocessing.Queue()

    def update(self, test_index, test):
        # This method is called in the child processes, and communicates the
        # results to the actual display implementation via an output queue.
        self.queue.put((test_index, test.result))

    def task_finished(self):
        # This method is called in the child processes, and communicates that
        # individual tasks are complete.
        self.queue.put(None)

    def handle_results(self):
        # This method is called in the parent, and consumes the results from the
        # output queue and dispatches to the actual display. The method will
        # complete after each of num_jobs tasks has signalled completion.
        completed = 0
        while completed != self.num_jobs:
            # Wait for a result item.
            item = self.queue.get()
            if item is None:
                completed += 1
                continue

            # Update the test result in the parent process.
            index,result = item
            test = self.run.tests[index]
            test.result = result

            self.display.update(test)

def run_one_tester(run, provider, display):
    """
    Entry point for one worker task: build a Tester and run it to
    completion. Note that `display` is used as the Tester's results
    consumer.
    """
    tester = Tester(run, provider, display)
    tester.run()

###

class Run(object):
    """
    This class represents a concrete, configured testing run.
    """

    def __init__(self, lit_config, tests):
        self.lit_config = lit_config
        self.tests = tests

    def execute_test(self, test):
        """
        Execute a single test via its configured test format and store the
        result (including the elapsed wall-clock time) on the test.

        KeyboardInterrupt is always propagated. Any other exception is
        re-raised when lit_config.debug is set; otherwise it is converted
        into an UNRESOLVED result carrying the traceback.
        """
        result = None
        start_time = time.time()
        try:
            result = test.config.test_format.execute(test, self.lit_config)

            # Support deprecated result from execute() which returned the result
            # code and additional output as a tuple.
            if isinstance(result, tuple):
                code, output = result
                result = lit.Test.Result(code, output)
            elif not isinstance(result, lit.Test.Result):
                raise ValueError("unexpected result from test execution")
        except KeyboardInterrupt:
            raise
        except:
            # Deliberate catch-all: whatever the test format raised is
            # reported as an UNRESOLVED test instead of aborting the run.
            if self.lit_config.debug:
                raise
            output = 'Exception during script execution:\n'
            output += traceback.format_exc()
            output += '\n'
            result = lit.Test.Result(lit.Test.UNRESOLVED, output)
        result.elapsed = time.time() - start_time

        test.setResult(result)

    def execute_tests(self, display, jobs, max_time=None,
                      use_processes=False):
        """
        execute_tests(display, jobs, [max_time], [use_processes])

        Execute each of the tests in the run, using up to jobs number of
        parallel tasks, and inform the display of each individual result. The
        provided tests should be a subset of the tests available in this run
        object.

        If max_time is non-None, it should be a time in seconds after which to
        stop executing tests.

        If use_processes is true, jobs is not 1 and the multiprocessing
        module is available, the tasks are run as separate processes;
        otherwise (or if multiprocessing fails to initialize) threads are
        used.

        The display object will have its update method called with each test as
        it is completed. The calls are guaranteed to be locked with respect to
        one another, but are *not* guaranteed to be called on the same thread as
        this method was invoked on.

        Upon completion, each test in the run will have its result
        computed. Tests which were not actually executed (for any reason) will
        be given an UNRESOLVED result.
        """

        # Choose the appropriate parallel execution implementation.
        consumer = None
        if jobs != 1 and use_processes and multiprocessing:
            try:
                task_impl = multiprocessing.Process
                queue_impl = multiprocessing.Queue
                canceled_flag = multiprocessing.Value('i', 0)
                consumer = MultiprocessResultsConsumer(self, display, jobs)
            except Exception:
                # multiprocessing fails to initialize with certain OpenBSD and
                # FreeBSD Python versions: http://bugs.python.org/issue3770
                # Unfortunately the error raised also varies by platform.
                # (KeyboardInterrupt/SystemExit are intentionally not
                # swallowed here.)
                self.lit_config.note('failed to initialize multiprocessing')
                consumer = None
        if consumer is None:
            task_impl = threading.Thread
            queue_impl = queue.Queue
            canceled_flag = LockedValue(0)
            consumer = ThreadResultsConsumer(display)

        # Create the test provider.
        provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)

        # Install a console-control signal handler on Windows.
        if win32api is not None:
            def console_ctrl_handler(ctrl_type):
                provider.cancel()
                return True
            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)

        # Install a timeout handler, if requested.
        timeout_timer = None
        if max_time is not None:
            timeout_timer = threading.Timer(max_time, provider.cancel)
            timeout_timer.start()

        try:
            # If not using multiple tasks, just run the tests directly.
            if jobs == 1:
                run_one_tester(self, provider, consumer)
            else:
                # Otherwise, execute the tests in parallel
                self._execute_tests_in_parallel(task_impl, provider, consumer,
                                                jobs)
        finally:
            # Cancel the timeout handler. This must also happen when the run
            # is aborted by an exception: a pending Timer is a live thread
            # that would otherwise delay interpreter exit until max_time
            # elapses.
            if timeout_timer is not None:
                timeout_timer.cancel()

        # Update results for any tests which weren't run.
        for test in self.tests:
            if test.result is None:
                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))

    def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
        """
        Start `jobs` tasks of type task_impl, each running run_one_tester,
        let the consumer process results, and wait for all tasks to finish.
        """
        # Start all of the tasks.
        tasks = [task_impl(target=run_one_tester,
                           args=(self, provider, consumer))
                 for _ in range(jobs)]
        for t in tasks:
            t.start()

        # Allow the consumer to handle results, if necessary.
        consumer.handle_results()

        # Wait for all the tasks to complete.
        for t in tasks:
            t.join()