1import gc 2import os 3import sys 4import signal 5import weakref 6 7from cStringIO import StringIO 8 9 10import unittest 11 12 13@unittest.skipUnless(hasattr(os, 'kill'), "Test requires os.kill") 14@unittest.skipIf(sys.platform =="win32", "Test cannot run on Windows") 15@unittest.skipIf(sys.platform == 'freebsd6', "Test kills regrtest on freebsd6 " 16 "if threads have been used") 17class TestBreak(unittest.TestCase): 18 19 def setUp(self): 20 self._default_handler = signal.getsignal(signal.SIGINT) 21 22 def tearDown(self): 23 signal.signal(signal.SIGINT, self._default_handler) 24 unittest.signals._results = weakref.WeakKeyDictionary() 25 unittest.signals._interrupt_handler = None 26 27 28 def testInstallHandler(self): 29 default_handler = signal.getsignal(signal.SIGINT) 30 unittest.installHandler() 31 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 32 33 try: 34 pid = os.getpid() 35 os.kill(pid, signal.SIGINT) 36 except KeyboardInterrupt: 37 self.fail("KeyboardInterrupt not handled") 38 39 self.assertTrue(unittest.signals._interrupt_handler.called) 40 41 def testRegisterResult(self): 42 result = unittest.TestResult() 43 unittest.registerResult(result) 44 45 for ref in unittest.signals._results: 46 if ref is result: 47 break 48 elif ref is not result: 49 self.fail("odd object in result set") 50 else: 51 self.fail("result not found") 52 53 54 def testInterruptCaught(self): 55 default_handler = signal.getsignal(signal.SIGINT) 56 57 result = unittest.TestResult() 58 unittest.installHandler() 59 unittest.registerResult(result) 60 61 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 62 63 def test(result): 64 pid = os.getpid() 65 os.kill(pid, signal.SIGINT) 66 result.breakCaught = True 67 self.assertTrue(result.shouldStop) 68 69 try: 70 test(result) 71 except KeyboardInterrupt: 72 self.fail("KeyboardInterrupt not handled") 73 self.assertTrue(result.breakCaught) 74 75 76 def testSecondInterrupt(self): 77 result = unittest.TestResult() 78 unittest.installHandler() 79 unittest.registerResult(result) 80 81 def test(result): 82 pid = os.getpid() 83 os.kill(pid, signal.SIGINT) 84 result.breakCaught = True 85 self.assertTrue(result.shouldStop) 86 os.kill(pid, signal.SIGINT) 87 self.fail("Second KeyboardInterrupt not raised") 88 89 try: 90 test(result) 91 except KeyboardInterrupt: 92 pass 93 else: 94 self.fail("Second KeyboardInterrupt not raised") 95 self.assertTrue(result.breakCaught) 96 97 98 def testTwoResults(self): 99 unittest.installHandler() 100 101 result = unittest.TestResult() 102 unittest.registerResult(result) 103 new_handler = signal.getsignal(signal.SIGINT) 104 105 result2 = unittest.TestResult() 106 unittest.registerResult(result2) 107 self.assertEqual(signal.getsignal(signal.SIGINT), new_handler) 108 109 result3 = unittest.TestResult() 110 111 def test(result): 112 pid = os.getpid() 113 os.kill(pid, signal.SIGINT) 114 115 try: 116 test(result) 117 except KeyboardInterrupt: 118 self.fail("KeyboardInterrupt not handled") 119 120 self.assertTrue(result.shouldStop) 121 self.assertTrue(result2.shouldStop) 122 self.assertFalse(result3.shouldStop) 123 124 125 def testHandlerReplacedButCalled(self): 126 # If our handler has been replaced (is no longer installed) but is 127 # called by the *new* handler, then it isn't safe to delay the 128 # SIGINT and we should immediately delegate to the default handler 129 unittest.installHandler() 130 131 handler = signal.getsignal(signal.SIGINT) 132 def new_handler(frame, signum): 133 handler(frame, signum) 134 signal.signal(signal.SIGINT, new_handler) 135 136 try: 137 pid = os.getpid() 138 os.kill(pid, signal.SIGINT) 139 except KeyboardInterrupt: 140 pass 141 else: 142 self.fail("replaced but delegated handler doesn't raise interrupt") 143 144 def testRunner(self): 145 # Creating a TextTestRunner with the appropriate argument should 146 # register the TextTestResult it creates 147 runner = unittest.TextTestRunner(stream=StringIO()) 148 149 result = runner.run(unittest.TestSuite()) 150 self.assertIn(result, unittest.signals._results) 151 152 def testWeakReferences(self): 153 # Calling registerResult on a result should not keep it alive 154 result = unittest.TestResult() 155 unittest.registerResult(result) 156 157 ref = weakref.ref(result) 158 del result 159 160 # For non-reference counting implementations 161 gc.collect();gc.collect() 162 self.assertIsNone(ref()) 163 164 165 def testRemoveResult(self): 166 result = unittest.TestResult() 167 unittest.registerResult(result) 168 169 unittest.installHandler() 170 self.assertTrue(unittest.removeResult(result)) 171 172 # Should this raise an error instead? 173 self.assertFalse(unittest.removeResult(unittest.TestResult())) 174 175 try: 176 pid = os.getpid() 177 os.kill(pid, signal.SIGINT) 178 except KeyboardInterrupt: 179 pass 180 181 self.assertFalse(result.shouldStop) 182 183 def testMainInstallsHandler(self): 184 failfast = object() 185 test = object() 186 verbosity = object() 187 result = object() 188 default_handler = signal.getsignal(signal.SIGINT) 189 190 class FakeRunner(object): 191 initArgs = [] 192 runArgs = [] 193 def __init__(self, *args, **kwargs): 194 self.initArgs.append((args, kwargs)) 195 def run(self, test): 196 self.runArgs.append(test) 197 return result 198 199 class Program(unittest.TestProgram): 200 def __init__(self, catchbreak): 201 self.exit = False 202 self.verbosity = verbosity 203 self.failfast = failfast 204 self.catchbreak = catchbreak 205 self.testRunner = FakeRunner 206 self.test = test 207 self.result = None 208 209 p = Program(False) 210 p.runTests() 211 212 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, 213 'verbosity': verbosity, 214 'failfast': failfast})]) 215 self.assertEqual(FakeRunner.runArgs, [test]) 216 self.assertEqual(p.result, result) 217 218 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 219 220 FakeRunner.initArgs = [] 221 FakeRunner.runArgs = [] 222 p = Program(True) 223 p.runTests() 224 225 self.assertEqual(FakeRunner.initArgs, [((), {'buffer': None, 226 'verbosity': verbosity, 227 'failfast': failfast})]) 228 self.assertEqual(FakeRunner.runArgs, [test]) 229 self.assertEqual(p.result, result) 230 231 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 232 233 def testRemoveHandler(self): 234 default_handler = signal.getsignal(signal.SIGINT) 235 unittest.installHandler() 236 unittest.removeHandler() 237 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 238 239 # check that calling removeHandler multiple times has no ill-effect 240 unittest.removeHandler() 241 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 242 243 def testRemoveHandlerAsDecorator(self): 244 default_handler = signal.getsignal(signal.SIGINT) 245 unittest.installHandler() 246 247 @unittest.removeHandler 248 def test(): 249 self.assertEqual(signal.getsignal(signal.SIGINT), default_handler) 250 251 test() 252 self.assertNotEqual(signal.getsignal(signal.SIGINT), default_handler) 253