1import gc
2import os
3import sys
4import signal
5import weakref
6
7from cStringIO import StringIO
8
9
10import unittest
11
12
13@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
14@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
15@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
16    "if threads have been used")
17class TestBreak(unittest.TestCase):
18    int_handler = None
19
20    def setUp(self):
21        self._default_handler = signal.getsignal(signal.SIGINT)
22        if self.int_handler is not None:
23            signal.signal(signal.SIGINT, self.int_handler)
24
25    def tearDown(self):
26        signal.signal(signal.SIGINT, self._default_handler)
27        unittest.signals._results = weakref.WeakKeyDictionary()
28        unittest.signals._interrupt_handler = None
29
30
31    def testInstallHandler(self):
32        default_handler = signal.getsignal(signal.SIGINT)
33        unittest.installHandler()
34        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
35
36        try:
37            pid = os.getpid()
38            os.kill(pid, signal.SIGINT)
39        except KeyboardInterrupt:
40            self.fail("KeyboardInterrupt not handled")
41
42        self.assertTrue(unittest.signals._interrupt_handler.called)
43
44    def testRegisterResult(self):
45        result = unittest.TestResult()
46        unittest.registerResult(result)
47
48        for ref in unittest.signals._results:
49            if ref is result:
50                break
51            elif ref is not result:
52                self.fail("odd object in result set")
53        else:
54            self.fail("result not found")
55
56
57    def testInterruptCaught(self):
58        default_handler = signal.getsignal(signal.SIGINT)
59
60        result = unittest.TestResult()
61        unittest.installHandler()
62        unittest.registerResult(result)
63
64        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
65
66        def test(result):
67            pid = os.getpid()
68            os.kill(pid, signal.SIGINT)
69            result.breakCaught = True
70            self.assertTrue(result.shouldStop)
71
72        try:
73            test(result)
74        except KeyboardInterrupt:
75            self.fail("KeyboardInterrupt not handled")
76        self.assertTrue(result.breakCaught)
77
78
79    def testSecondInterrupt(self):
80        # Can't use skipIf decorator because the signal handler may have
81        # been changed after defining this method.
82        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
83            self.skipTest("test requires SIGINT to not be ignored")
84        result = unittest.TestResult()
85        unittest.installHandler()
86        unittest.registerResult(result)
87
88        def test(result):
89            pid = os.getpid()
90            os.kill(pid, signal.SIGINT)
91            result.breakCaught = True
92            self.assertTrue(result.shouldStop)
93            os.kill(pid, signal.SIGINT)
94            self.fail("Second KeyboardInterrupt not raised")
95
96        try:
97            test(result)
98        except KeyboardInterrupt:
99            pass
100        else:
101            self.fail("Second KeyboardInterrupt not raised")
102        self.assertTrue(result.breakCaught)
103
104
105    def testTwoResults(self):
106        unittest.installHandler()
107
108        result = unittest.TestResult()
109        unittest.registerResult(result)
110        new_handler = signal.getsignal(signal.SIGINT)
111
112        result2 = unittest.TestResult()
113        unittest.registerResult(result2)
114        self.assertEqual(signal.getsignal(signal.SIGINT), new_handler)
115
116        result3 = unittest.TestResult()
117
118        def test(result):
119            pid = os.getpid()
120            os.kill(pid, signal.SIGINT)
121
122        try:
123            test(result)
124        except KeyboardInterrupt:
125            self.fail("KeyboardInterrupt not handled")
126
127        self.assertTrue(result.shouldStop)
128        self.assertTrue(result2.shouldStop)
129        self.assertFalse(result3.shouldStop)
130
131
132    def testHandlerReplacedButCalled(self):
133        # Can't use skipIf decorator because the signal handler may have
134        # been changed after defining this method.
135        if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
136            self.skipTest("test requires SIGINT to not be ignored")
137        # If our handler has been replaced (is no longer installed) but is
138        # called by the *new* handler, then it isn't safe to delay the
139        # SIGINT and we should immediately delegate to the default handler
140        unittest.installHandler()
141
142        handler = signal.getsignal(signal.SIGINT)
143        def new_handler(frame, signum):
144            handler(frame, signum)
145        signal.signal(signal.SIGINT, new_handler)
146
147        try:
148            pid = os.getpid()
149            os.kill(pid, signal.SIGINT)
150        except KeyboardInterrupt:
151            pass
152        else:
153            self.fail("replaced but delegated handler doesn't raise interrupt")
154
155    def testRunner(self):
156        # Creating a TextTestRunner with the appropriate argument should
157        # register the TextTestResult it creates
158        runner = unittest.TextTestRunner(stream=StringIO())
159
160        result = runner.run(unittest.TestSuite())
161        self.assertIn(result, unittest.signals._results)
162
163    def testWeakReferences(self):
164        # Calling registerResult on a result should not keep it alive
165        result = unittest.TestResult()
166        unittest.registerResult(result)
167
168        ref = weakref.ref(result)
169        del result
170
171        # For non-reference counting implementations
172        gc.collect();gc.collect()
173        self.assertIsNone(ref())
174
175
176    def testRemoveResult(self):
177        result = unittest.TestResult()
178        unittest.registerResult(result)
179
180        unittest.installHandler()
181        self.assertTrue(unittest.removeResult(result))
182
183        # Should this raise an error instead?
184        self.assertFalse(unittest.removeResult(unittest.TestResult()))
185
186        try:
187            pid = os.getpid()
188            os.kill(pid, signal.SIGINT)
189        except KeyboardInterrupt:
190            pass
191
192        self.assertFalse(result.shouldStop)
193
194    def testMainInstallsHandler(self):
195        failfast = object()
196        test = object()
197        verbosity = object()
198        result = object()
199        default_handler = signal.getsignal(signal.SIGINT)
200
201        class FakeRunner(object):
202            initArgs = []
203            runArgs = []
204            def __init__(self, *args, **kwargs):
205                self.initArgs.append((args, kwargs))
206            def run(self, test):
207                self.runArgs.append(test)
208                return result
209
210        class Program(unittest.TestProgram):
211            def __init__(self, catchbreak):
212                self.exit = False
213                self.verbosity = verbosity
214                self.failfast = failfast
215                self.catchbreak = catchbreak
216                self.testRunner = FakeRunner
217                self.test = test
218                self.result = None
219
220        p = Program(False)
221        p.runTests()
222
223        self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
224                                                     'verbosity': verbosity,
225                                                     'failfast': failfast})])
226        self.assertEqual(FakeRunner.runArgs, [test])
227        self.assertEqual(p.result, result)
228
229        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
230
231        FakeRunner.initArgs = []
232        FakeRunner.runArgs = []
233        p = Program(True)
234        p.runTests()
235
236        self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None,
237                                                     'verbosity': verbosity,
238                                                     'failfast': failfast})])
239        self.assertEqual(FakeRunner.runArgs, [test])
240        self.assertEqual(p.result, result)
241
242        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
243
244    def testRemoveHandler(self):
245        default_handler = signal.getsignal(signal.SIGINT)
246        unittest.installHandler()
247        unittest.removeHandler()
248        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
249
250        # check that calling removeHandler multiple times has no ill-effect
251        unittest.removeHandler()
252        self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
253
254    def testRemoveHandlerAsDecorator(self):
255        default_handler = signal.getsignal(signal.SIGINT)
256        unittest.installHandler()
257
258        @unittest.removeHandler
259        def test():
260            self.assertEqual(signal.getsignal(signal.SIGINT), default_handler)
261
262        test()
263        self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler)
264
265@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
266@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
267@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
268    "if threads have been used")
269class TestBreakDefaultIntHandler(TestBreak):
270    int_handler = signal.default_int_handler
271
272@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
273@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
274@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
275    "if threads have been used")
276class TestBreakSignalIgnored(TestBreak):
277    int_handler = signal.SIG_IGN
278
279@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill")
280@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows")
281@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 "
282    "if threads have been used")
283class TestBreakSignalDefault(TestBreak):
284    int_handler = signal.SIG_DFL
285