1import unittest 2from test import test_support 3from contextlib import closing 4import gc 5import pickle 6import select 7import signal 8import subprocess 9import traceback 10import sys, os, time, errno 11 12if sys.platform in ('os2', 'riscos'): 13 raise unittest.SkipTest("Can't test signal on %s" % sys.platform) 14 15 16class HandlerBCalled(Exception): 17 pass 18 19 20def exit_subprocess(): 21 """Use os._exit(0) to exit the current subprocess. 22 23 Otherwise, the test catches the SystemExit and continues executing 24 in parallel with the original test, so you wind up with an 25 exponential number of tests running concurrently. 26 """ 27 os._exit(0) 28 29 30def ignoring_eintr(__func, *args, **kwargs): 31 try: 32 return __func(*args, **kwargs) 33 except EnvironmentError as e: 34 if e.errno != errno.EINTR: 35 raise 36 return None 37 38 39@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 40class InterProcessSignalTests(unittest.TestCase): 41 MAX_DURATION = 20 # Entire test should last at most 20 sec. 42 43 def setUp(self): 44 self.using_gc = gc.isenabled() 45 gc.disable() 46 47 def tearDown(self): 48 if self.using_gc: 49 gc.enable() 50 51 def format_frame(self, frame, limit=None): 52 return ''.join(traceback.format_stack(frame, limit=limit)) 53 54 def handlerA(self, signum, frame): 55 self.a_called = True 56 if test_support.verbose: 57 print "handlerA invoked from signal %s at:\n%s" % ( 58 signum, self.format_frame(frame, limit=1)) 59 60 def handlerB(self, signum, frame): 61 self.b_called = True 62 if test_support.verbose: 63 print "handlerB invoked from signal %s at:\n%s" % ( 64 signum, self.format_frame(frame, limit=1)) 65 raise HandlerBCalled(signum, self.format_frame(frame)) 66 67 def wait(self, child): 68 """Wait for child to finish, ignoring EINTR.""" 69 while True: 70 try: 71 child.wait() 72 return 73 except OSError as e: 74 if e.errno != errno.EINTR: 75 raise 76 77 def run_test(self): 78 # Install handlers. This function runs in a sub-process, so we 79 # don't worry about re-setting the default handlers. 80 signal.signal(signal.SIGHUP, self.handlerA) 81 signal.signal(signal.SIGUSR1, self.handlerB) 82 signal.signal(signal.SIGUSR2, signal.SIG_IGN) 83 signal.signal(signal.SIGALRM, signal.default_int_handler) 84 85 # Variables the signals will modify: 86 self.a_called = False 87 self.b_called = False 88 89 # Let the sub-processes know who to send signals to. 90 pid = os.getpid() 91 if test_support.verbose: 92 print "test runner's pid is", pid 93 94 child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)]) 95 if child: 96 self.wait(child) 97 if not self.a_called: 98 time.sleep(1) # Give the signal time to be delivered. 99 self.assertTrue(self.a_called) 100 self.assertFalse(self.b_called) 101 self.a_called = False 102 103 # Make sure the signal isn't delivered while the previous 104 # Popen object is being destroyed, because __del__ swallows 105 # exceptions. 106 del child 107 try: 108 child = subprocess.Popen(['kill', '-USR1', str(pid)]) 109 # This wait should be interrupted by the signal's exception. 110 self.wait(child) 111 time.sleep(1) # Give the signal time to be delivered. 112 self.fail('HandlerBCalled exception not raised') 113 except HandlerBCalled: 114 self.assertTrue(self.b_called) 115 self.assertFalse(self.a_called) 116 if test_support.verbose: 117 print "HandlerBCalled exception caught" 118 119 child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)]) 120 if child: 121 self.wait(child) # Nothing should happen. 122 123 try: 124 signal.alarm(1) 125 # The race condition in pause doesn't matter in this case, 126 # since alarm is going to raise a KeyboardException, which 127 # will skip the call. 128 signal.pause() 129 # But if another signal arrives before the alarm, pause 130 # may return early. 131 time.sleep(1) 132 except KeyboardInterrupt: 133 if test_support.verbose: 134 print "KeyboardInterrupt (the alarm() went off)" 135 except: 136 self.fail("Some other exception woke us from pause: %s" % 137 traceback.format_exc()) 138 else: 139 self.fail("pause returned of its own accord, and the signal" 140 " didn't arrive after another second.") 141 finally: 142 signal.alarm(0) 143 144 # Issue 3864. Unknown if this affects earlier versions of freebsd also. 145 @unittest.skipIf(sys.platform=='freebsd6', 146 'inter process signals not reliable (do not mix well with threading) ' 147 'on freebsd6') 148 def test_main(self): 149 # This function spawns a child process to insulate the main 150 # test-running process from all the signals. It then 151 # communicates with that child process over a pipe and 152 # re-raises information about any exceptions the child 153 # raises. The real work happens in self.run_test(). 154 os_done_r, os_done_w = os.pipe() 155 with closing(os.fdopen(os_done_r)) as done_r, \ 156 closing(os.fdopen(os_done_w, 'w')) as done_w: 157 child = os.fork() 158 if child == 0: 159 # In the child process; run the test and report results 160 # through the pipe. 161 try: 162 done_r.close() 163 # Have to close done_w again here because 164 # exit_subprocess() will skip the enclosing with block. 165 with closing(done_w): 166 try: 167 self.run_test() 168 except: 169 pickle.dump(traceback.format_exc(), done_w) 170 else: 171 pickle.dump(None, done_w) 172 except: 173 print 'Uh oh, raised from pickle.' 174 traceback.print_exc() 175 finally: 176 exit_subprocess() 177 178 done_w.close() 179 # Block for up to MAX_DURATION seconds for the test to finish. 180 r, w, x = select.select([done_r], [], [], self.MAX_DURATION) 181 if done_r in r: 182 tb = pickle.load(done_r) 183 if tb: 184 self.fail(tb) 185 else: 186 os.kill(child, signal.SIGKILL) 187 self.fail('Test deadlocked after %d seconds.' % 188 self.MAX_DURATION) 189 190 # read the exit status to not leak a zombie process 191 os.waitpid(child, 0) 192 193 194@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 195class BasicSignalTests(unittest.TestCase): 196 def trivial_signal_handler(self, *args): 197 pass 198 199 def test_out_of_range_signal_number_raises_error(self): 200 self.assertRaises(ValueError, signal.getsignal, 4242) 201 202 self.assertRaises(ValueError, signal.signal, 4242, 203 self.trivial_signal_handler) 204 205 def test_setting_signal_handler_to_none_raises_error(self): 206 self.assertRaises(TypeError, signal.signal, 207 signal.SIGUSR1, None) 208 209 def test_getsignal(self): 210 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) 211 self.assertEqual(signal.getsignal(signal.SIGHUP), 212 self.trivial_signal_handler) 213 signal.signal(signal.SIGHUP, hup) 214 self.assertEqual(signal.getsignal(signal.SIGHUP), hup) 215 216 217@unittest.skipUnless(sys.platform == "win32", "Windows specific") 218class WindowsSignalTests(unittest.TestCase): 219 def test_issue9324(self): 220 # Updated for issue #10003, adding SIGBREAK 221 handler = lambda x, y: None 222 for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, 223 signal.SIGILL, signal.SIGINT, signal.SIGSEGV, 224 signal.SIGTERM): 225 # Set and then reset a handler for signals that work on windows 226 signal.signal(sig, signal.signal(sig, handler)) 227 228 with self.assertRaises(ValueError): 229 signal.signal(-1, handler) 230 231 with self.assertRaises(ValueError): 232 signal.signal(7, handler) 233 234 235class WakeupFDTests(unittest.TestCase): 236 237 def test_invalid_fd(self): 238 fd = test_support.make_bad_fd() 239 self.assertRaises(ValueError, signal.set_wakeup_fd, fd) 240 241 242@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 243class WakeupSignalTests(unittest.TestCase): 244 TIMEOUT_FULL = 10 245 TIMEOUT_HALF = 5 246 247 def test_wakeup_fd_early(self): 248 import select 249 250 signal.alarm(1) 251 try: 252 before_time = time.time() 253 # We attempt to get a signal during the sleep, 254 # before select is called 255 time.sleep(self.TIMEOUT_FULL) 256 mid_time = time.time() 257 finally: 258 signal.alarm(0) 259 260 self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF) 261 select.select([self.read], [], [], self.TIMEOUT_FULL) 262 after_time = time.time() 263 self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF) 264 265 def test_wakeup_fd_during(self): 266 import select 267 268 signal.alarm(1) 269 try: 270 before_time = time.time() 271 # We attempt to get a signal during the select call 272 self.assertRaises(select.error, select.select, 273 [self.read], [], [], self.TIMEOUT_FULL) 274 after_time = time.time() 275 finally: 276 signal.alarm(0) 277 278 self.assertTrue(after_time - before_time < self.TIMEOUT_HALF) 279 280 def setUp(self): 281 import fcntl 282 283 self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None) 284 self.read, self.write = os.pipe() 285 flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0) 286 flags = flags | os.O_NONBLOCK 287 fcntl.fcntl(self.write, fcntl.F_SETFL, flags) 288 self.old_wakeup = signal.set_wakeup_fd(self.write) 289 290 def tearDown(self): 291 signal.set_wakeup_fd(self.old_wakeup) 292 os.close(self.read) 293 os.close(self.write) 294 signal.signal(signal.SIGALRM, self.alrm) 295 296@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 297class SiginterruptTest(unittest.TestCase): 298 299 def setUp(self): 300 """Install a no-op signal handler that can be set to allow 301 interrupts or not, and arrange for the original signal handler to be 302 re-installed when the test is finished. 303 """ 304 self.signum = signal.SIGUSR1 305 oldhandler = signal.signal(self.signum, lambda x,y: None) 306 self.addCleanup(signal.signal, self.signum, oldhandler) 307 308 def readpipe_interrupted(self): 309 """Perform a read during which a signal will arrive. Return True if the 310 read is interrupted by the signal and raises an exception. Return False 311 if it returns normally. 312 """ 313 # Create a pipe that can be used for the read. Also clean it up 314 # when the test is over, since nothing else will (but see below for 315 # the write end). 316 r, w = os.pipe() 317 self.addCleanup(os.close, r) 318 319 # Create another process which can send a signal to this one to try 320 # to interrupt the read. 321 ppid = os.getpid() 322 pid = os.fork() 323 324 if pid == 0: 325 # Child code: sleep to give the parent enough time to enter the 326 # read() call (there's a race here, but it's really tricky to 327 # eliminate it); then signal the parent process. Also, sleep 328 # again to make it likely that the signal is delivered to the 329 # parent process before the child exits. If the child exits 330 # first, the write end of the pipe will be closed and the test 331 # is invalid. 332 try: 333 time.sleep(0.2) 334 os.kill(ppid, self.signum) 335 time.sleep(0.2) 336 finally: 337 # No matter what, just exit as fast as possible now. 338 exit_subprocess() 339 else: 340 # Parent code. 341 # Make sure the child is eventually reaped, else it'll be a 342 # zombie for the rest of the test suite run. 343 self.addCleanup(os.waitpid, pid, 0) 344 345 # Close the write end of the pipe. The child has a copy, so 346 # it's not really closed until the child exits. We need it to 347 # close when the child exits so that in the non-interrupt case 348 # the read eventually completes, otherwise we could just close 349 # it *after* the test. 350 os.close(w) 351 352 # Try the read and report whether it is interrupted or not to 353 # the caller. 354 try: 355 d = os.read(r, 1) 356 return False 357 except OSError, err: 358 if err.errno != errno.EINTR: 359 raise 360 return True 361 362 def test_without_siginterrupt(self): 363 """If a signal handler is installed and siginterrupt is not called 364 at all, when that signal arrives, it interrupts a syscall that's in 365 progress. 366 """ 367 i = self.readpipe_interrupted() 368 self.assertTrue(i) 369 # Arrival of the signal shouldn't have changed anything. 370 i = self.readpipe_interrupted() 371 self.assertTrue(i) 372 373 def test_siginterrupt_on(self): 374 """If a signal handler is installed and siginterrupt is called with 375 a true value for the second argument, when that signal arrives, it 376 interrupts a syscall that's in progress. 377 """ 378 signal.siginterrupt(self.signum, 1) 379 i = self.readpipe_interrupted() 380 self.assertTrue(i) 381 # Arrival of the signal shouldn't have changed anything. 382 i = self.readpipe_interrupted() 383 self.assertTrue(i) 384 385 def test_siginterrupt_off(self): 386 """If a signal handler is installed and siginterrupt is called with 387 a false value for the second argument, when that signal arrives, it 388 does not interrupt a syscall that's in progress. 389 """ 390 signal.siginterrupt(self.signum, 0) 391 i = self.readpipe_interrupted() 392 self.assertFalse(i) 393 # Arrival of the signal shouldn't have changed anything. 394 i = self.readpipe_interrupted() 395 self.assertFalse(i) 396 397 398@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 399class ItimerTest(unittest.TestCase): 400 def setUp(self): 401 self.hndl_called = False 402 self.hndl_count = 0 403 self.itimer = None 404 self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) 405 406 def tearDown(self): 407 signal.signal(signal.SIGALRM, self.old_alarm) 408 if self.itimer is not None: # test_itimer_exc doesn't change this attr 409 # just ensure that itimer is stopped 410 signal.setitimer(self.itimer, 0) 411 412 def sig_alrm(self, *args): 413 self.hndl_called = True 414 if test_support.verbose: 415 print("SIGALRM handler invoked", args) 416 417 def sig_vtalrm(self, *args): 418 self.hndl_called = True 419 420 if self.hndl_count > 3: 421 # it shouldn't be here, because it should have been disabled. 422 raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " 423 "timer.") 424 elif self.hndl_count == 3: 425 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore 426 signal.setitimer(signal.ITIMER_VIRTUAL, 0) 427 if test_support.verbose: 428 print("last SIGVTALRM handler call") 429 430 self.hndl_count += 1 431 432 if test_support.verbose: 433 print("SIGVTALRM handler invoked", args) 434 435 def sig_prof(self, *args): 436 self.hndl_called = True 437 signal.setitimer(signal.ITIMER_PROF, 0) 438 439 if test_support.verbose: 440 print("SIGPROF handler invoked", args) 441 442 def test_itimer_exc(self): 443 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform 444 # defines it ? 445 self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) 446 # Negative times are treated as zero on some platforms. 447 if 0: 448 self.assertRaises(signal.ItimerError, 449 signal.setitimer, signal.ITIMER_REAL, -1) 450 451 def test_itimer_real(self): 452 self.itimer = signal.ITIMER_REAL 453 signal.setitimer(self.itimer, 1.0) 454 if test_support.verbose: 455 print("\ncall pause()...") 456 signal.pause() 457 458 self.assertEqual(self.hndl_called, True) 459 460 # Issue 3864. Unknown if this affects earlier versions of freebsd also. 461 @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'), 462 'itimer not reliable (does not mix well with threading) on some BSDs.') 463 def test_itimer_virtual(self): 464 self.itimer = signal.ITIMER_VIRTUAL 465 signal.signal(signal.SIGVTALRM, self.sig_vtalrm) 466 signal.setitimer(self.itimer, 0.3, 0.2) 467 468 start_time = time.time() 469 while time.time() - start_time < 60.0: 470 # use up some virtual time by doing real work 471 _ = pow(12345, 67890, 10000019) 472 if signal.getitimer(self.itimer) == (0.0, 0.0): 473 break # sig_vtalrm handler stopped this itimer 474 else: # Issue 8424 475 self.skipTest("timeout: likely cause: machine too slow or load too " 476 "high") 477 478 # virtual itimer should be (0.0, 0.0) now 479 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 480 # and the handler should have been called 481 self.assertEqual(self.hndl_called, True) 482 483 # Issue 3864. Unknown if this affects earlier versions of freebsd also. 484 @unittest.skipIf(sys.platform=='freebsd6', 485 'itimer not reliable (does not mix well with threading) on freebsd6') 486 def test_itimer_prof(self): 487 self.itimer = signal.ITIMER_PROF 488 signal.signal(signal.SIGPROF, self.sig_prof) 489 signal.setitimer(self.itimer, 0.2, 0.2) 490 491 start_time = time.time() 492 while time.time() - start_time < 60.0: 493 # do some work 494 _ = pow(12345, 67890, 10000019) 495 if signal.getitimer(self.itimer) == (0.0, 0.0): 496 break # sig_prof handler stopped this itimer 497 else: # Issue 8424 498 self.skipTest("timeout: likely cause: machine too slow or load too " 499 "high") 500 501 # profiling itimer should be (0.0, 0.0) now 502 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 503 # and the handler should have been called 504 self.assertEqual(self.hndl_called, True) 505 506 def test_setitimer_tiny(self): 507 # bpo-30807: C setitimer() takes a microsecond-resolution interval. 508 # Check that float -> timeval conversion doesn't round 509 # the interval down to zero, which would disable the timer. 510 self.itimer = signal.ITIMER_REAL 511 signal.setitimer(self.itimer, 1e-6) 512 time.sleep(1) 513 self.assertEqual(self.hndl_called, True) 514 515 516def test_main(): 517 test_support.run_unittest(BasicSignalTests, InterProcessSignalTests, 518 WakeupFDTests, WakeupSignalTests, 519 SiginterruptTest, ItimerTest, 520 WindowsSignalTests) 521 522 523if __name__ == "__main__": 524 test_main() 525