1import os
2import threading
3import time
4import traceback
5try:
6    import Queue as queue
7except ImportError:
8    import queue
9
10try:
11    import win32api
12except ImportError:
13    win32api = None
14
15try:
16    import multiprocessing
17except ImportError:
18    multiprocessing = None
19
20import lit.Test
21
22###
23# Test Execution Implementation
24
class LockedValue(object):
    """A single value whose reads and writes are guarded by a lock.

    Access goes through the ``value`` property; the getter and the setter
    each hold ``self.lock`` for the duration of the access.
    """

    def __init__(self, value):
        self._value = value
        self.lock = threading.Lock()

    def _get_value(self):
        # Read the stored value while holding the lock.
        with self.lock:
            return self._value

    def _set_value(self, value):
        # Replace the stored value while holding the lock.
        with self.lock:
            self._value = value

    value = property(fget=_get_value, fset=_set_value)
45
class TestProvider(object):
    """Hands out test indices to workers from a shared queue.

    The queue is pre-filled with one index per test, followed by one ``None``
    sentinel per job, so each worker eventually receives a stop marker.
    """

    def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
        self.canceled_flag = canceled_flag

        # Build the shared queue: every test index first, then the sentinels.
        self.queue = queue_impl()
        enqueue = self.queue.put
        for test_index in range(len(tests)):
            enqueue(test_index)
        for _ in range(num_jobs):
            enqueue(None)

    def cancel(self):
        # Raise the shared flag; later get() calls will report exhaustion.
        self.canceled_flag.value = 1

    def get(self):
        # A canceled provider yields None without touching the queue;
        # otherwise hand out the next queued item.
        return None if self.canceled_flag.value else self.queue.get()
67
class Tester(object):
    """Worker that pulls test indices from a provider and executes them.

    ``run_instance`` supplies ``tests`` (indexable by the provided indices)
    and ``execute_test``; ``provider`` supplies ``get``; ``consumer``
    receives ``update`` after each test and ``task_finished`` when the
    worker stops.
    """

    def __init__(self, run_instance, provider, consumer):
        self.run_instance = run_instance
        self.provider = provider
        self.consumer = consumer

    def run(self):
        """Run tests until the provider returns ``None``.

        ``consumer.task_finished()`` is always called, even when a test
        raises. A consumer that counts completion signals would otherwise
        wait forever for a worker that died. Any exception still
        propagates to the caller.
        """
        try:
            while True:
                item = self.provider.get()
                if item is None:
                    break
                self.run_test(item)
        finally:
            self.consumer.task_finished()

    def run_test(self, test_index):
        """Execute the test at ``test_index`` and report it to the consumer."""
        test = self.run_instance.tests[test_index]
        try:
            self.run_instance.execute_test(test)
        except KeyboardInterrupt:
            # This is a sad hack. Unfortunately subprocess goes
            # bonkers with ctrl-c and we start forking merrily.
            # Signal 9 is sent to pid 0 (on POSIX: the whole process group).
            print('\nCtrl-C detected, goodbye.')
            os.kill(0, 9)
        self.consumer.update(test_index, test)
92
class ThreadResultsConsumer(object):
    """Results consumer that forwards each test straight to the display.

    Display updates are serialized with a lock so that calls arriving from
    different threads never overlap.
    """

    def __init__(self, display):
        self.lock = threading.Lock()
        self.display = display

    def update(self, test_index, test):
        # test_index is part of the consumer interface but is not needed
        # here; only the test itself is passed on.
        with self.lock:
            self.display.update(test)

    def task_finished(self):
        # Nothing to signal for this consumer.
        return None

    def handle_results(self):
        # Results were already delivered by update(); nothing to drain.
        return None
110
class MultiprocessResultsConsumer(object):
    """Results consumer that relays results through an output queue.

    Producers enqueue ``(test_index, result)`` pairs plus a ``None`` marker
    when they finish; ``handle_results`` drains the queue and feeds the
    display.
    """

    def __init__(self, run, display, num_jobs):
        self.run = run
        self.display = display
        self.num_jobs = num_jobs
        self.queue = multiprocessing.Queue()

    def update(self, test_index, test):
        # Called in the child processes: ship the index and the result to
        # the actual display implementation via the output queue.
        message = (test_index, test.result)
        self.queue.put(message)

    def task_finished(self):
        # Called in the child processes: a None message marks one task as
        # complete.
        self.queue.put(None)

    def handle_results(self):
        # Called in the parent: consume messages from the output queue and
        # dispatch them to the display. Returns once num_jobs completion
        # markers have been received.
        finished_tasks = 0
        while finished_tasks != self.num_jobs:
            # Block until the next message arrives.
            message = self.queue.get()
            if message is None:
                finished_tasks += 1
            else:
                # Copy the result onto the parent's test object, then
                # show it.
                test_index, result = message
                finished_test = self.run.tests[test_index]
                finished_test.result = result
                self.display.update(finished_test)
146
def run_one_tester(run, provider, display):
    """Create a Tester and run it to completion.

    The third argument is handed to Tester as its results consumer.
    """
    Tester(run, provider, display).run()
150
151###
152
class Run(object):
    """
    This class represents a concrete, configured testing run.
    """

    def __init__(self, lit_config, tests):
        self.lit_config = lit_config
        self.tests = tests

    def execute_test(self, test):
        """
        Execute a single test and store its result via test.setResult().

        The test's format is asked to execute it. Any exception other than
        KeyboardInterrupt is converted to an UNRESOLVED result carrying the
        traceback, unless lit_config.debug is set, in which case it is
        re-raised. The result's elapsed attribute is set to the wall-clock
        time spent in this method.
        """
        result = None
        start_time = time.time()
        try:
            result = test.config.test_format.execute(test, self.lit_config)

            # Support deprecated result from execute() which returned the result
            # code and additional output as a tuple.
            if isinstance(result, tuple):
                code, output = result
                result = lit.Test.Result(code, output)
            elif not isinstance(result, lit.Test.Result):
                raise ValueError("unexpected result from test execution")
        except KeyboardInterrupt:
            raise
        except:
            # Deliberately broad: whatever the test format raises is
            # reported as an UNRESOLVED test rather than aborting the run.
            if self.lit_config.debug:
                raise
            output = 'Exception during script execution:\n'
            output += traceback.format_exc()
            output += '\n'
            result = lit.Test.Result(lit.Test.UNRESOLVED, output)
        result.elapsed = time.time() - start_time

        test.setResult(result)

    def execute_tests(self, display, jobs, max_time=None,
                      use_processes=False):
        """
        execute_tests(display, jobs, [max_time], [use_processes])

        Execute each of the tests in the run, using up to jobs number of
        parallel tasks, and inform the display of each individual result.

        If max_time is non-None, it should be a time in seconds after which to
        stop executing tests.

        If use_processes is true, jobs is not 1 and multiprocessing is
        available, the tasks are run as separate processes. Otherwise, or if
        multiprocessing fails to initialize, threads are used.

        The display object will have its update method called with each test as
        it is completed. The calls are guaranteed to be locked with respect to
        one another, but are *not* guaranteed to be called on the same thread as
        this method was invoked on.

        Upon completion, each test in the run will have its result
        computed. Tests which were not actually executed (for any reason) will
        be given an UNRESOLVED result.
        """

        # Choose the appropriate parallel execution implementation.
        consumer = None
        if jobs != 1 and use_processes and multiprocessing:
            try:
                task_impl = multiprocessing.Process
                queue_impl = multiprocessing.Queue
                canceled_flag = multiprocessing.Value('i', 0)
                consumer = MultiprocessResultsConsumer(self, display, jobs)
            except Exception:
                # multiprocessing fails to initialize with certain OpenBSD and
                # FreeBSD Python versions: http://bugs.python.org/issue3770
                # Unfortunately the error raised also varies by platform, so
                # catch broadly, but let KeyboardInterrupt and SystemExit
                # through.
                self.lit_config.note('failed to initialize multiprocessing')
                consumer = None
        if not consumer:
            task_impl = threading.Thread
            queue_impl = queue.Queue
            canceled_flag = LockedValue(0)
            consumer = ThreadResultsConsumer(display)

        # Create the test provider.
        provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)

        # Install a console-control signal handler on Windows.
        if win32api is not None:
            def console_ctrl_handler(ctrl_type):
                provider.cancel()
                return True
            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)

        # Install a timeout handler, if requested.
        timeout_timer = None
        if max_time is not None:
            timeout_timer = threading.Timer(max_time, provider.cancel)
            timeout_timer.start()

        try:
            # If not using multiple tasks, just run the tests directly.
            if jobs == 1:
                run_one_tester(self, provider, consumer)
            else:
                # Otherwise, execute the tests in parallel
                self._execute_tests_in_parallel(task_impl, provider,
                                                consumer, jobs)
        finally:
            # Cancel the timeout handler even when execution raised, so a
            # pending timer thread does not outlive this call.
            if timeout_timer is not None:
                timeout_timer.cancel()

        # Update results for any tests which weren't run.
        for test in self.tests:
            if test.result is None:
                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))

    def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
        """
        Run jobs tasks of type task_impl, each executing run_one_tester, and
        wait for all of them to finish.
        """
        # Start all of the tasks.
        tasks = [task_impl(target=run_one_tester,
                           args=(self, provider, consumer))
                 for _ in range(jobs)]
        for t in tasks:
            t.start()

        # Allow the consumer to handle results, if necessary.
        consumer.handle_results()

        # Wait for all the tasks to complete.
        for t in tasks:
            t.join()
278