1import gc
2import os
3import sys
4import signal
5import weakref
6
7from cStringIO import StringIO
8
9
10import unittest
11
12
13@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
14@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
15@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
16    "if threads have been used")
17class TestBreak(unittest.TestCase):
18
19    def setUp(self):
20        self._default_handler = signal.getsignal(signal.SIGINT)
21
22    def tearDown(self):
23        signal.signal(signal.SIGINT, self._default_handler)
24        unittest.signals._results = weakref.WeakKeyDictionary()
25        unittest.signals._interrupt_handler = None
26
27
28    def testInstallHandler(self):
29        default_handler = signal.getsignal(signal.SIGINT)
30        unittest.installHandler()
31        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
32
33        try:
34            pid = os.getpid()
35            os.kill(pid, signal.SIGINT)
36        except KeyboardInterrupt:
37            self.fail("KeyboardInterrupt not handled")
38
39        self.assertTrue(unittest.signals._interrupt_handler.called)
40
41    def testRegisterResult(self):
42        result = unittest.TestResult()
43        unittest.registerResult(result)
44
45        for ref in unittest.signals._results:
46            if ref is result:
47                break
48            elif ref is not result:
49                self.fail("odd object in result set")
50        else:
51            self.fail("result not found")
52
53
54    def testInterruptCaught(self):
55        default_handler = signal.getsignal(signal.SIGINT)
56
57        result = unittest.TestResult()
58        unittest.installHandler()
59        unittest.registerResult(result)
60
61        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
62
63        def test(result):
64            pid = os.getpid()
65            os.kill(pid, signal.SIGINT)
66            result.breakCaught = True
67            self.assertTrue(result.shouldStop)
68
69        try:
70            test(result)
71        except KeyboardInterrupt:
72            self.fail("KeyboardInterrupt not handled")
73        self.assertTrue(result.breakCaught)
74
75
76    def testSecondInterrupt(self):
77        result = unittest.TestResult()
78        unittest.installHandler()
79        unittest.registerResult(result)
80
81        def test(result):
82            pid = os.getpid()
83            os.kill(pid, signal.SIGINT)
84            result.breakCaught = True
85            self.assertTrue(result.shouldStop)
86            os.kill(pid, signal.SIGINT)
87            self.fail("Second KeyboardInterrupt not raised")
88
89        try:
90            test(result)
91        except KeyboardInterrupt:
92            pass
93        else:
94            self.fail("Second KeyboardInterrupt not raised")
95        self.assertTrue(result.breakCaught)
96
97
98    def testTwoResults(self):
99        unittest.installHandler()
100
101        result = unittest.TestResult()
102        unittest.registerResult(result)
103        new_handler = signal.getsignal(signal.SIGINT)
104
105        result2 = unittest.TestResult()
106        unittest.registerResult(result2)
107        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
108
109        result3 = unittest.TestResult()
110
111        def test(result):
112            pid = os.getpid()
113            os.kill(pid, signal.SIGINT)
114
115        try:
116            test(result)
117        except KeyboardInterrupt:
118            self.fail("KeyboardInterrupt not handled")
119
120        self.assertTrue(result.shouldStop)
121        self.assertTrue(result2.shouldStop)
122        self.assertFalse(result3.shouldStop)
123
124
125    def testHandlerReplacedButCalled(self):
126        # If our handler has been replaced (is no longer installed) but is
127        # called by the *new* handler, then it isn't safe to delay the
128        # SIGINT and we should immediately delegate to the default handler
129        unittest.installHandler()
130
131        handler = signal.getsignal(signal.SIGINT)
132        def new_handler(frame, signum):
133            handler(frame, signum)
134        signal.signal(signal.SIGINT, new_handler)
135
136        try:
137            pid = os.getpid()
138            os.kill(pid, signal.SIGINT)
139        except KeyboardInterrupt:
140            pass
141        else:
142            self.fail("replaced but delegated handler doesn't raise interrupt")
143
144    def testRunner(self):
145        # Creating a TextTestRunner with the appropriate argument should
146        # register the TextTestResult it creates
147        runner = unittest.TextTestRunner(stream=StringIO())
148
149        result = runner.run(unittest.TestSuite())
150        self.assertIn(result, unittest.signals._results)
151
152    def testWeakReferences(self):
153        # Calling registerResult on a result should not keep it alive
154        result = unittest.TestResult()
155        unittest.registerResult(result)
156
157        ref = weakref.ref(result)
158        del result
159
160        # For non-reference counting implementations
161        gc.collect();gc.collect()
162        self.assertIsNone(ref())
163
164
165    def testRemoveResult(self):
166        result = unittest.TestResult()
167        unittest.registerResult(result)
168
169        unittest.installHandler()
170        self.assertTrue(unittest.removeResult(result))
171
172        # Should this raise an error instead?
173        self.assertFalse(unittest.removeResult(unittest.TestResult()))
174
175        try:
176            pid = os.getpid()
177            os.kill(pid, signal.SIGINT)
178        except KeyboardInterrupt:
179            pass
180
181        self.assertFalse(result.shouldStop)
182
183    def testMainInstallsHandler(self):
184        failfast = object()
185        test = object()
186        verbosity = object()
187        result = object()
188        default_handler = signal.getsignal(signal.SIGINT)
189
190        class FakeRunner(object):
191            initArgs = []
192            runArgs = []
193            def __init__(self, *args, **kwargs):
194                self.initArgs.append((args, kwargs))
195            def run(self, test):
196                self.runArgs.append(test)
197                return result
198
199        class Program(unittest.TestProgram):
200            def __init__(self, catchbreak):
201                self.exit = False
202                self.verbosity = verbosity
203                self.failfast = failfast
204                self.catchbreak = catchbreak
205                self.testRunner = FakeRunner
206                self.test = test
207                self.result = None
208
209        p = Program(False)
210        p.runTests()
211
212        self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
213                                                     'verbosity': verbosity,
214                                                     'failfast': failfast})])
215        self.assertEqual(FakeRunner.runArgs, [test])
216        self.assertEqual(p.result, result)
217
218        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
219
220        FakeRunner.initArgs = []
221        FakeRunner.runArgs = []
222        p = Program(True)
223        p.runTests()
224
225        self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
226                                                     'verbosity': verbosity,
227                                                     'failfast': failfast})])
228        self.assertEqual(FakeRunner.runArgs, [test])
229        self.assertEqual(p.result, result)
230
231        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
232
233    def testRemoveHandler(self):
234        default_handler = signal.getsignal(signal.SIGINT)
235        unittest.installHandler()
236        unittest.removeHandler()
237        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
238
239        # check that calling removeHandler multiple times has no ill-effect
240        unittest.removeHandler()
241        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
242
243    def testRemoveHandlerAsDecorator(self):
244        default_handler = signal.getsignal(signal.SIGINT)
245        unittest.installHandler()
246
247        @unittest.removeHandler
248        def test():
249            self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
250
251        test()
252        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
253