1import gc 2import os 3import sys 4import signal 5import weakref 6 7from cStringIO import StringIO 8 9 10import unittest 11 12 13@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 14@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 15@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " 16 "if threads have been used") 17class TestBreak(unittest.TestCase): 18 int_handler = None 19 20 def setUp(self): 21 self._default_handler = signal.getsignal(signal.SIGINT) 22 if self.int_handler is not None: 23 signal.signal(signal.SIGINT, self.int_handler) 24 25 def tearDown(self): 26 signal.signal(signal.SIGINT, self._default_handler) 27 unittest.signals._results = weakref.WeakKeyDictionary() 28 unittest.signals._interrupt_handler = None 29 30 31 def testInstallHandler(self): 32 default_handler = signal.getsignal(signal.SIGINT) 33 unittest.installHandler() 34 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 35 36 try: 37 pid = os.getpid() 38 os.kill(pid, signal.SIGINT) 39 except KeyboardInterrupt: 40 self.fail("KeyboardInterrupt not handled") 41 42 self.assertTrue(unittest.signals._interrupt_handler.called) 43 44 def testRegisterResult(self): 45 result = unittest.TestResult() 46 unittest.registerResult(result) 47 48 for ref in unittest.signals._results: 49 if ref is result: 50 break 51 elif ref is not result: 52 self.fail("odd object in result set") 53 else: 54 self.fail("result not found") 55 56 57 def testInterruptCaught(self): 58 default_handler = signal.getsignal(signal.SIGINT) 59 60 result = unittest.TestResult() 61 unittest.installHandler() 62 unittest.registerResult(result) 63 64 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 65 66 def test(result): 67 pid = os.getpid() 68 os.kill(pid, signal.SIGINT) 69 result.breakCaught = True 70 self.assertTrue(result.shouldStop) 71 72 try: 73 test(result) 74 except KeyboardInterrupt: 75 self.fail("KeyboardInterrupt not handled") 76 self.assertTrue(result.breakCaught) 77 78 79 def testSecondInterrupt(self): 80 # Can't use skipIf decorator because the signal handler may have 81 # been changed after defining this method. 82 if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: 83 self.skipTest("test requires SIGINT to not be ignored") 84 result = unittest.TestResult() 85 unittest.installHandler() 86 unittest.registerResult(result) 87 88 def test(result): 89 pid = os.getpid() 90 os.kill(pid, signal.SIGINT) 91 result.breakCaught = True 92 self.assertTrue(result.shouldStop) 93 os.kill(pid, signal.SIGINT) 94 self.fail("Second KeyboardInterrupt not raised") 95 96 try: 97 test(result) 98 except KeyboardInterrupt: 99 pass 100 else: 101 self.fail("Second KeyboardInterrupt not raised") 102 self.assertTrue(result.breakCaught) 103 104 105 def testTwoResults(self): 106 unittest.installHandler() 107 108 result = unittest.TestResult() 109 unittest.registerResult(result) 110 new_handler = signal.getsignal(signal.SIGINT) 111 112 result2 = unittest.TestResult() 113 unittest.registerResult(result2) 114 self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) 115 116 result3 = unittest.TestResult() 117 118 def test(result): 119 pid = os.getpid() 120 os.kill(pid, signal.SIGINT) 121 122 try: 123 test(result) 124 except KeyboardInterrupt: 125 self.fail("KeyboardInterrupt not handled") 126 127 self.assertTrue(result.shouldStop) 128 self.assertTrue(result2.shouldStop) 129 self.assertFalse(result3.shouldStop) 130 131 132 def testHandlerReplacedButCalled(self): 133 # Can't use skipIf decorator because the signal handler may have 134 # been changed after defining this method. 135 if signal.getsignal(signal.SIGINT) == signal.SIG_IGN: 136 self.skipTest("test requires SIGINT to not be ignored") 137 # If our handler has been replaced (is no longer installed) but is 138 # called by the *new* handler, then it isn't safe to delay the 139 # SIGINT and we should immediately delegate to the default handler 140 unittest.installHandler() 141 142 handler = signal.getsignal(signal.SIGINT) 143 def new_handler(frame, signum): 144 handler(frame, signum) 145 signal.signal(signal.SIGINT, new_handler) 146 147 try: 148 pid = os.getpid() 149 os.kill(pid, signal.SIGINT) 150 except KeyboardInterrupt: 151 pass 152 else: 153 self.fail("replaced but delegated handler doesn't raise interrupt") 154 155 def testRunner(self): 156 # Creating a TextTestRunner with the appropriate argument should 157 # register the TextTestResult it creates 158 runner = unittest.TextTestRunner(stream=StringIO()) 159 160 result = runner.run(unittest.TestSuite()) 161 self.assertIn(result, unittest.signals._results) 162 163 def testWeakReferences(self): 164 # Calling registerResult on a result should not keep it alive 165 result = unittest.TestResult() 166 unittest.registerResult(result) 167 168 ref = weakref.ref(result) 169 del result 170 171 # For non-reference counting implementations 172 gc.collect();gc.collect() 173 self.assertIsNone(ref()) 174 175 176 def testRemoveResult(self): 177 result = unittest.TestResult() 178 unittest.registerResult(result) 179 180 unittest.installHandler() 181 self.assertTrue(unittest.removeResult(result)) 182 183 # Should this raise an error instead? 184 self.assertFalse(unittest.removeResult(unittest.TestResult())) 185 186 try: 187 pid = os.getpid() 188 os.kill(pid, signal.SIGINT) 189 except KeyboardInterrupt: 190 pass 191 192 self.assertFalse(result.shouldStop) 193 194 def testMainInstallsHandler(self): 195 failfast = object() 196 test = object() 197 verbosity = object() 198 result = object() 199 default_handler = signal.getsignal(signal.SIGINT) 200 201 class FakeRunner(object): 202 initArgs = [] 203 runArgs = [] 204 def __init__(self, *args, **kwargs): 205 self.initArgs.append((args, kwargs)) 206 def run(self, test): 207 self.runArgs.append(test) 208 return result 209 210 class Program(unittest.TestProgram): 211 def __init__(self, catchbreak): 212 self.exit = False 213 self.verbosity = verbosity 214 self.failfast = failfast 215 self.catchbreak = catchbreak 216 self.testRunner = FakeRunner 217 self.test = test 218 self.result = None 219 220 p = Program(False) 221 p.runTests() 222 223 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, 224 'verbosity': verbosity, 225 'failfast': failfast})]) 226 self.assertEqual(FakeRunner.runArgs, [test]) 227 self.assertEqual(p.result, result) 228 229 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 230 231 FakeRunner.initArgs = [] 232 FakeRunner.runArgs = [] 233 p = Program(True) 234 p.runTests() 235 236 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, 237 'verbosity': verbosity, 238 'failfast': failfast})]) 239 self.assertEqual(FakeRunner.runArgs, [test]) 240 self.assertEqual(p.result, result) 241 242 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 243 244 def testRemoveHandler(self): 245 default_handler = signal.getsignal(signal.SIGINT) 246 unittest.installHandler() 247 unittest.removeHandler() 248 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 249 250 # check that calling removeHandler multiple times has no ill-effect 251 unittest.removeHandler() 252 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 253 254 def testRemoveHandlerAsDecorator(self): 255 default_handler = signal.getsignal(signal.SIGINT) 256 unittest.installHandler() 257 258 @unittest.removeHandler 259 def test(): 260 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 261 262 test() 263 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 264 265@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 266@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 267@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " 268 "if threads have been used") 269class TestBreakDefaultIntHandler(TestBreak): 270 int_handler = signal.default_int_handler 271 272@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 273@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 274@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " 275 "if threads have been used") 276class TestBreakSignalIgnored(TestBreak): 277 int_handler = signal.SIG_IGN 278 279@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 280@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 281@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " 282 "if threads have been used") 283class TestBreakSignalDefault(TestBreak): 284 int_handler = signal.SIG_DFL 285