1import os
2import threading
3import time
4import traceback
5try:
6    import Queue as queue
7except ImportError:
8    import queue
9
10try:
11    import win32api
12except ImportError:
13    win32api = None
14
15try:
16    import multiprocessing
17except ImportError:
18    multiprocessing = None
19
20import lit.Test
21
22###
23# Test Execution Implementation
24
class LockedValue(object):
    """A single value whose reads and writes are guarded by a lock.

    Access goes through the ``value`` property; every get and set holds
    ``self.lock`` for the duration of the access.
    """

    def __init__(self, value):
        self.lock = threading.Lock()
        self._value = value

    def _get_value(self):
        # The lock is released automatically when the block exits.
        with self.lock:
            return self._value

    def _set_value(self, value):
        with self.lock:
            self._value = value

    value = property(_get_value, _set_value)
45
class TestProvider(object):
    """Hand out test indices to workers through a shared queue.

    queue_impl is called with no arguments to create the queue, and
    canceled_flag is an object with a readable and writable ``value``
    attribute used to signal cancellation.
    """

    def __init__(self, queue_impl, canceled_flag):
        self.canceled_flag = canceled_flag

        # The shared queue which provides the test indices.
        self.queue = queue_impl()

    def queue_tests(self, tests, num_jobs):
        """Enqueue every test index, then one None sentinel per job."""
        put = self.queue.put
        for index in range(len(tests)):
            put(index)
        # Each worker stops when it receives a None item.
        for _ in range(num_jobs):
            put(None)

    def cancel(self):
        """Mark the provider as canceled."""
        self.canceled_flag.value = 1

    def get(self):
        """Return the next queued item, or None once canceled."""
        if not self.canceled_flag.value:
            # Not canceled, so take the next test.
            return self.queue.get()
        return None
69
class Tester(object):
    """Worker loop: pull test indices from a provider and execute them.

    Each executed test is reported to the consumer, and the consumer is
    told when this worker has no more items to process.
    """

    def __init__(self, run_instance, provider, consumer):
        self.run_instance = run_instance
        self.provider = provider
        self.consumer = consumer

    def run(self):
        """Process items until the provider returns None."""
        next_index = self.provider.get()
        while next_index is not None:
            self.run_test(next_index)
            next_index = self.provider.get()
        self.consumer.task_finished()

    def run_test(self, test_index):
        """Execute the test at test_index and report it to the consumer."""
        current = self.run_instance.tests[test_index]
        try:
            self.run_instance.execute_test(current)
        except KeyboardInterrupt:
            # This is a sad hack. Unfortunately subprocess goes
            # bonkers with ctrl-c and we start forking merrily.
            print('\nCtrl-C detected, goodbye.')
            os.kill(0, 9)
        self.consumer.update(test_index, current)
94
class ThreadResultsConsumer(object):
    """Results consumer which forwards each test straight to the display.

    Calls to the display are serialized with a lock.
    """

    def __init__(self, display):
        self.display = display
        self.lock = threading.Lock()

    def update(self, test_index, test):
        """Pass test to the display while holding the lock."""
        with self.lock:
            self.display.update(test)

    def task_finished(self):
        """Nothing to do when a task finishes."""
        return None

    def handle_results(self):
        """Nothing to do; update() already delivered the results."""
        return None
112
class MultiprocessResultsConsumer(object):
    """Results consumer which relays results through a multiprocessing queue.

    update() and task_finished() put items on the queue, and
    handle_results() drains it and applies the results to the tests in
    ``run`` before updating the display.
    """

    def __init__(self, run, display, num_jobs):
        self.run = run
        self.display = display
        self.num_jobs = num_jobs
        self.queue = multiprocessing.Queue()

    def update(self, test_index, test):
        # Called in the child processes: send the result to the actual
        # display implementation via the output queue.
        message = (test_index, test.result)
        self.queue.put(message)

    def task_finished(self):
        # Called in the child processes: a None item signals that one
        # individual task is complete.
        self.queue.put(None)

    def handle_results(self):
        # Called in the parent: consume items from the output queue and
        # dispatch them to the actual display. Returns once each of the
        # num_jobs tasks has signalled completion.
        finished = 0
        while finished != self.num_jobs:
            # Block until the next item arrives.
            message = self.queue.get()
            if message is not None:
                # Update the test result in the parent process.
                index, result = message
                test = self.run.tests[index]
                test.result = result
                self.display.update(test)
            else:
                finished += 1
148
def run_one_tester(run, provider, display):
    """Create a Tester for the given run and provider and run it.

    The third argument is handed to Tester as its results consumer; the
    callers in this file pass a consumer object here, despite the
    parameter name.
    """
    Tester(run, provider, display).run()
152
153###
154
class Run(object):
    """
    This class represents a concrete, configured testing run.

    It holds the lit configuration and the list of tests; tests are
    referred to elsewhere in this file by their index in ``tests``.
    """

    def __init__(self, lit_config, tests):
        self.lit_config = lit_config
        self.tests = tests

    def execute_test(self, test):
        """
        execute_test(test)

        Execute a single test using its configured test format, record the
        elapsed wall-clock time on the result, and store the result with
        test.setResult().

        KeyboardInterrupt is always propagated. Any other exception is
        re-raised when lit_config.debug is set; otherwise it is converted
        into an UNRESOLVED result carrying the traceback.
        """
        result = None
        start_time = time.time()
        try:
            result = test.config.test_format.execute(test, self.lit_config)

            # Support deprecated result from execute() which returned the result
            # code and additional output as a tuple.
            if isinstance(result, tuple):
                code, output = result
                result = lit.Test.Result(code, output)
            elif not isinstance(result, lit.Test.Result):
                raise ValueError("unexpected result from test execution")
        except KeyboardInterrupt:
            raise
        except:
            # The bare except is kept on purpose: every failure other than
            # an interrupt is reported as an UNRESOLVED result, so the
            # caller still receives a result for this test.
            if self.lit_config.debug:
                raise
            output = 'Exception during script execution:\n'
            output += traceback.format_exc()
            output += '\n'
            result = lit.Test.Result(lit.Test.UNRESOLVED, output)
        result.elapsed = time.time() - start_time

        test.setResult(result)

    def execute_tests(self, display, jobs, max_time=None,
                      use_processes=False):
        """
        execute_tests(display, jobs, [max_time], [use_processes])

        Execute each of the tests in the run, using up to jobs number of
        parallel tasks, and inform the display of each individual result.

        If max_time is non-None, it should be a time in seconds after which to
        stop executing tests.

        If use_processes is true, jobs is not 1, and multiprocessing is
        available and initializes successfully, the tasks are run as
        processes; otherwise threads are used.

        The display object will have its update method called with each test as
        it is completed. The calls are guaranteed to be locked with respect to
        one another, but are *not* guaranteed to be called on the same thread as
        this method was invoked on.

        Upon completion, each test in the run will have its result
        computed. Tests which were not actually executed (for any reason) will
        be given an UNRESOLVED result.
        """

        # Choose the appropriate parallel execution implementation.
        consumer = None
        if jobs != 1 and use_processes and multiprocessing:
            try:
                task_impl = multiprocessing.Process
                queue_impl = multiprocessing.Queue
                canceled_flag = multiprocessing.Value('i', 0)
                consumer = MultiprocessResultsConsumer(self, display, jobs)
            except:
                # multiprocessing fails to initialize with certain OpenBSD and
                # FreeBSD Python versions: http://bugs.python.org/issue3770
                # Unfortunately the error raised also varies by platform.
                self.lit_config.note('failed to initialize multiprocessing')
                consumer = None
        if not consumer:
            task_impl = threading.Thread
            queue_impl = queue.Queue
            canceled_flag = LockedValue(0)
            consumer = ThreadResultsConsumer(display)

        # Create the test provider.
        provider = TestProvider(queue_impl, canceled_flag)

        # Queue the tests outside the main thread because we can't guarantee
        # that we can put() all the tests without blocking:
        # https://docs.python.org/2/library/multiprocessing.html
        # e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
        # without taking any out.
        queuer = task_impl(target=provider.queue_tests, args=(self.tests, jobs))
        queuer.start()

        # Install a console-control signal handler on Windows.
        if win32api is not None:
            def console_ctrl_handler(ctrl_type):
                provider.cancel()
                return True
            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)

        # Install a timeout handler, if requested.
        timeout_timer = None
        if max_time is not None:
            def timeout_handler():
                provider.cancel()
            timeout_timer = threading.Timer(max_time, timeout_handler)
            timeout_timer.start()

        try:
            # If not using multiple tasks, just run the tests directly.
            if jobs == 1:
                run_one_tester(self, provider, consumer)
            else:
                # Otherwise, execute the tests in parallel
                self._execute_tests_in_parallel(task_impl, provider, consumer,
                                                jobs)

            queuer.join()
        finally:
            # Cancel the timeout handler, even when the run raised (e.g. a
            # test exception re-raised in debug mode), so the timer thread
            # does not outlive this call.
            if timeout_timer is not None:
                timeout_timer.cancel()

        # Update results for any tests which weren't run.
        for test in self.tests:
            if test.result is None:
                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))

    def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
        """
        Run jobs tasks, each executing run_one_tester, and wait for them.

        task_impl is the task class to instantiate (a thread or process
        type); it is constructed with target/args and must provide start()
        and join().
        """
        # Start all of the tasks.
        tasks = [task_impl(target=run_one_tester,
                           args=(self, provider, consumer))
                 for i in range(jobs)]
        for t in tasks:
            t.start()

        # Allow the consumer to handle results, if necessary.
        consumer.handle_results()

        # Wait for all the tasks to complete.
        for t in tasks:
            t.join()
290