1import unittest
2from test import test_support
3from contextlib import closing
4import gc
5import pickle
6import select
7import signal
8import subprocess
9import traceback
10import sys, os, time, errno
11
# Skip the whole module at import time on platforms where these signal
# tests cannot be run.
if sys.platform in ('os2', 'riscos'):
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
14
15
class HandlerBCalled(Exception):
    """Raised by the SIGUSR1 test handler (handlerB) to show it was invoked.

    The handler passes the signal number and a formatted stack as the
    exception arguments.
    """
18
19
def exit_subprocess():
    """Terminate the current (forked) process at once with exit status 0.

    os._exit() is used on purpose: a normal exit raises SystemExit, which
    the test machinery would catch, leaving the child running the
    remaining tests in parallel with its parent -- and every further fork
    would multiply the number of concurrent test runs.
    """
    status = 0
    os._exit(status)
28
29
def ignoring_eintr(__func, *args, **kwargs):
    """Call ``__func(*args, **kwargs)``, treating EINTR as "no result".

    Returns whatever the callable returns.  If it raises an
    EnvironmentError whose errno is EINTR, the error is swallowed and None
    is returned instead; any other EnvironmentError propagates unchanged.
    """
    try:
        outcome = __func(*args, **kwargs)
    except EnvironmentError as exc:
        if exc.errno == errno.EINTR:
            # Interrupted by a signal: report "nothing" rather than fail.
            return None
        raise
    return outcome
37
38
39@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
40class InterProcessSignalTests(unittest.TestCase):
41    MAX_DURATION = 20   # Entire test should last at most 20 sec.
42
43    def setUp(self):
44        self.using_gc = gc.isenabled()
45        gc.disable()
46
47    def tearDown(self):
48        if self.using_gc:
49            gc.enable()
50
51    def format_frame(self, frame, limit=None):
52        return ''.join(traceback.format_stack(frame, limit=limit))
53
54    def handlerA(self, signum, frame):
55        self.a_called = True
56        if test_support.verbose:
57            print "handlerA invoked from signal %s at:\n%s" % (
58                signum, self.format_frame(frame, limit=1))
59
60    def handlerB(self, signum, frame):
61        self.b_called = True
62        if test_support.verbose:
63            print "handlerB invoked from signal %s at:\n%s" % (
64                signum, self.format_frame(frame, limit=1))
65        raise HandlerBCalled(signum, self.format_frame(frame))
66
67    def wait(self, child):
68        """Wait for child to finish, ignoring EINTR."""
69        while True:
70            try:
71                child.wait()
72                return
73            except OSError as e:
74                if e.errno != errno.EINTR:
75                    raise
76
77    def run_test(self):
78        # Install handlers. This function runs in a sub-process, so we
79        # don't worry about re-setting the default handlers.
80        signal.signal(signal.SIGHUP, self.handlerA)
81        signal.signal(signal.SIGUSR1, self.handlerB)
82        signal.signal(signal.SIGUSR2, signal.SIG_IGN)
83        signal.signal(signal.SIGALRM, signal.default_int_handler)
84
85        # Variables the signals will modify:
86        self.a_called = False
87        self.b_called = False
88
89        # Let the sub-processes know who to send signals to.
90        pid = os.getpid()
91        if test_support.verbose:
92            print "test runner's pid is", pid
93
94        child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
95        if child:
96            self.wait(child)
97            if not self.a_called:
98                time.sleep(1)  # Give the signal time to be delivered.
99        self.assertTrue(self.a_called)
100        self.assertFalse(self.b_called)
101        self.a_called = False
102
103        # Make sure the signal isn't delivered while the previous
104        # Popen object is being destroyed, because __del__ swallows
105        # exceptions.
106        del child
107        try:
108            child = subprocess.Popen(['kill', '-USR1', str(pid)])
109            # This wait should be interrupted by the signal's exception.
110            self.wait(child)
111            time.sleep(1)  # Give the signal time to be delivered.
112            self.fail('HandlerBCalled exception not raised')
113        except HandlerBCalled:
114            self.assertTrue(self.b_called)
115            self.assertFalse(self.a_called)
116            if test_support.verbose:
117                print "HandlerBCalled exception caught"
118
119        child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
120        if child:
121            self.wait(child)  # Nothing should happen.
122
123        try:
124            signal.alarm(1)
125            # The race condition in pause doesn't matter in this case,
126            # since alarm is going to raise a KeyboardException, which
127            # will skip the call.
128            signal.pause()
129            # But if another signal arrives before the alarm, pause
130            # may return early.
131            time.sleep(1)
132        except KeyboardInterrupt:
133            if test_support.verbose:
134                print "KeyboardInterrupt (the alarm() went off)"
135        except:
136            self.fail("Some other exception woke us from pause: %s" %
137                      traceback.format_exc())
138        else:
139            self.fail("pause returned of its own accord, and the signal"
140                      " didn't arrive after another second.")
141        finally:
142            signal.alarm(0)
143
144    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
145    @unittest.skipIf(sys.platform=='freebsd6',
146        'inter process signals not reliable (do not mix well with threading) '
147        'on freebsd6')
148    def test_main(self):
149        # This function spawns a child process to insulate the main
150        # test-running process from all the signals. It then
151        # communicates with that child process over a pipe and
152        # re-raises information about any exceptions the child
153        # raises. The real work happens in self.run_test().
154        os_done_r, os_done_w = os.pipe()
155        with closing(os.fdopen(os_done_r)) as done_r, \
156             closing(os.fdopen(os_done_w, 'w')) as done_w:
157            child = os.fork()
158            if child == 0:
159                # In the child process; run the test and report results
160                # through the pipe.
161                try:
162                    done_r.close()
163                    # Have to close done_w again here because
164                    # exit_subprocess() will skip the enclosing with block.
165                    with closing(done_w):
166                        try:
167                            self.run_test()
168                        except:
169                            pickle.dump(traceback.format_exc(), done_w)
170                        else:
171                            pickle.dump(None, done_w)
172                except:
173                    print 'Uh oh, raised from pickle.'
174                    traceback.print_exc()
175                finally:
176                    exit_subprocess()
177
178            done_w.close()
179            # Block for up to MAX_DURATION seconds for the test to finish.
180            r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
181            if done_r in r:
182                tb = pickle.load(done_r)
183                if tb:
184                    self.fail(tb)
185            else:
186                os.kill(child, signal.SIGKILL)
187                self.fail('Test deadlocked after %d seconds.' %
188                          self.MAX_DURATION)
189
190            # read the exit status to not leak a zombie process
191            os.waitpid(child, 0)
192
193
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class BasicSignalTests(unittest.TestCase):
    """Argument validation and get/set round-tripping for signal handlers."""

    def trivial_signal_handler(self, *args):
        """Handler that does nothing; used only as an installable callable."""
        pass

    def test_out_of_range_signal_number_raises_error(self):
        """A signal number outside the valid range is rejected."""
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        """None is not accepted as a handler."""
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        """getsignal() reports the handler most recently installed."""
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        try:
            self.assertEqual(signal.getsignal(signal.SIGHUP),
                             self.trivial_signal_handler)
        finally:
            # Put the previous handler back even if the assertion above
            # fails, so a failure here cannot leave this test's handler
            # installed for the rest of the run.
            signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
215
216
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):
    """Checks for signal.signal() that are only run on Windows."""

    def test_issue9324(self):
        """Handlers can be set for the listed signals; -1 and 7 are refused."""
        def handler(signum, frame):
            return None

        # Updated for issue #10003, adding SIGBREAK
        settable = (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                    signal.SIGTERM)
        for sig in settable:
            # Set and then reset a handler for signals that work on windows
            previous = signal.signal(sig, handler)
            signal.signal(sig, previous)

        for bad_signum in (-1, 7):
            self.assertRaises(ValueError, signal.signal, bad_signum, handler)
233
234
class WakeupFDTests(unittest.TestCase):
    """Argument validation for signal.set_wakeup_fd()."""

    def test_invalid_fd(self):
        """A bad file descriptor is rejected with ValueError."""
        bad_fd = test_support.make_bad_fd()
        with self.assertRaises(ValueError):
            signal.set_wakeup_fd(bad_fd)
240
241
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    """Check that a signal wakes up code watching the wakeup fd.

    setUp registers the non-blocking write end of a pipe with
    signal.set_wakeup_fd(); the tests select() on the read end.
    """
    TIMEOUT_FULL = 10
    TIMEOUT_HALF = 5

    def setUp(self):
        import fcntl

        def ignore_alarm(signum, frame):
            return None

        # Remember the previous SIGALRM handler so tearDown can restore it.
        self.alrm = signal.signal(signal.SIGALRM, ignore_alarm)
        self.read, self.write = os.pipe()
        # The write end is switched to non-blocking mode before being
        # registered as the wakeup fd.
        current = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
        fcntl.fcntl(self.write, fcntl.F_SETFL, current | os.O_NONBLOCK)
        self.old_wakeup = signal.set_wakeup_fd(self.write)

    def tearDown(self):
        # Undo setUp: previous wakeup fd, then the pipe, then the handler.
        signal.set_wakeup_fd(self.old_wakeup)
        os.close(self.read)
        os.close(self.write)
        signal.signal(signal.SIGALRM, self.alrm)

    def test_wakeup_fd_early(self):
        """Signal arrives before select(): select must still return early."""
        import select

        signal.alarm(1)
        try:
            started = time.time()
            # We attempt to get a signal during the sleep,
            # before select is called
            time.sleep(self.TIMEOUT_FULL)
            woke = time.time()
        finally:
            signal.alarm(0)

        slept_for = woke - started
        self.assertTrue(slept_for < self.TIMEOUT_HALF)
        select.select([self.read], [], [], self.TIMEOUT_FULL)
        selected_for = time.time() - woke
        self.assertTrue(selected_for < self.TIMEOUT_HALF)

    def test_wakeup_fd_during(self):
        """Signal arrives during select(): the call is interrupted."""
        import select

        signal.alarm(1)
        try:
            started = time.time()
            # We attempt to get a signal during the select call
            with self.assertRaises(select.error):
                select.select([self.read], [], [], self.TIMEOUT_FULL)
            finished = time.time()
        finally:
            signal.alarm(0)

        elapsed = finished - started
        self.assertTrue(elapsed < self.TIMEOUT_HALF)
295
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
    """Check how signal.siginterrupt() affects a blocking read."""

    def setUp(self):
        """Install a no-op signal handler that can be set to allow
        interrupts or not, and arrange for the original signal handler to be
        re-installed when the test is finished.
        """
        self.signum = signal.SIGUSR1
        oldhandler = signal.signal(self.signum, lambda x,y: None)
        self.addCleanup(signal.signal, self.signum, oldhandler)

    def readpipe_interrupted(self):
        """Perform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.

        Any OSError other than EINTR raised by the read is propagated.
        """
        # Create a pipe that can be used for the read.  Also clean it up
        # when the test is over, since nothing else will (but see below for
        # the write end).
        r, w = os.pipe()
        self.addCleanup(os.close, r)

        # Create another process which can send a signal to this one to try
        # to interrupt the read.
        ppid = os.getpid()
        pid = os.fork()

        if pid == 0:
            # Child code: sleep to give the parent enough time to enter the
            # read() call (there's a race here, but it's really tricky to
            # eliminate it); then signal the parent process.  Also, sleep
            # again to make it likely that the signal is delivered to the
            # parent process before the child exits.  If the child exits
            # first, the write end of the pipe will be closed and the test
            # is invalid.
            try:
                time.sleep(0.2)
                os.kill(ppid, self.signum)
                time.sleep(0.2)
            finally:
                # No matter what, just exit as fast as possible now.
                exit_subprocess()
        else:
            # Parent code.
            # Make sure the child is eventually reaped, else it'll be a
            # zombie for the rest of the test suite run.
            self.addCleanup(os.waitpid, pid, 0)

            # Close the write end of the pipe.  The child has a copy, so
            # it's not really closed until the child exits.  We need it to
            # close when the child exits so that in the non-interrupt case
            # the read eventually completes, otherwise we could just close
            # it *after* the test.
            os.close(w)

            # Try the read and report whether it is interrupted or not to
            # the caller.  The data read is irrelevant, only whether the
            # call completed.
            try:
                os.read(r, 1)
                return False
            except OSError as err:
                if err.errno != errno.EINTR:
                    raise
                return True

    def test_without_siginterrupt(self):
        """If a signal handler is installed and siginterrupt is not called
        at all, when that signal arrives, it interrupts a syscall that's in
        progress.
        """
        i = self.readpipe_interrupted()
        self.assertTrue(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertTrue(i)

    def test_siginterrupt_on(self):
        """If a signal handler is installed and siginterrupt is called with
        a true value for the second argument, when that signal arrives, it
        interrupts a syscall that's in progress.
        """
        signal.siginterrupt(self.signum, 1)
        i = self.readpipe_interrupted()
        self.assertTrue(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertTrue(i)

    def test_siginterrupt_off(self):
        """If a signal handler is installed and siginterrupt is called with
        a false value for the second argument, when that signal arrives, it
        does not interrupt a syscall that's in progress.
        """
        signal.siginterrupt(self.signum, 0)
        i = self.readpipe_interrupted()
        self.assertFalse(i)
        # Arrival of the signal shouldn't have changed anything.
        i = self.readpipe_interrupted()
        self.assertFalse(i)
396
397
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() / signal.getitimer()."""

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        # Which interval timer the test armed; None means "none to stop".
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Disarm the timer *before* restoring the SIGALRM handler, so a
        # timer that is still armed cannot fire into the restored handler.
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            # just ensure that itimer is stopped
            signal.setitimer(self.itimer, 0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def _install_handler(self, signum, handler):
        """Install *handler* for *signum*; restore the old one on cleanup.

        Cleanups run after tearDown, i.e. after the timer has been stopped.
        """
        previous = signal.signal(signum, handler)
        if previous is not None:
            # None means the previous handler was not installed from
            # Python, so it cannot be put back through signal.signal().
            self.addCleanup(signal.signal, signum, previous)

    def sig_alrm(self, *args):
        """SIGALRM handler: record that it was called."""
        self.hndl_called = True
        if test_support.verbose:
            print("SIGALRM handler invoked", args)

    def sig_vtalrm(self, *args):
        """SIGVTALRM handler: count calls and stop the timer on the 4th.

        Raises signal.ItimerError if it is still being called after the
        timer was supposed to have been disabled.
        """
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if test_support.verbose:
                print("last SIGVTALRM handler call")

        self.hndl_count += 1

        if test_support.verbose:
            print("SIGVTALRM handler invoked", args)

    def sig_prof(self, *args):
        """SIGPROF handler: record the call and stop ITIMER_PROF."""
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

        if test_support.verbose:
            print("SIGPROF handler invoked", args)

    def test_itimer_exc(self):
        """An invalid timer id makes setitimer() raise ItimerError."""
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)

    def test_itimer_real(self):
        """ITIMER_REAL wakes pause() and runs the SIGALRM handler."""
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        if test_support.verbose:
            print("\ncall pause()...")
        signal.pause()

        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        """ITIMER_VIRTUAL fires repeatedly until the handler disables it."""
        self.itimer = signal.ITIMER_VIRTUAL
        self._install_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_vtalrm handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    # Issue 3864. Unknown if this affects earlier versions of freebsd also.
    @unittest.skipIf(sys.platform=='freebsd6',
        'itimer not reliable (does not mix well with threading) on freebsd6')
    def test_itimer_prof(self):
        """ITIMER_PROF fires and the handler is able to disable it."""
        self.itimer = signal.ITIMER_PROF
        self._install_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.time()
        while time.time() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    def test_setitimer_tiny(self):
        """A one-microsecond interval still arms the timer."""
        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
        # Check that float -> timeval conversion doesn't round
        # the interval down to zero, which would disable the timer.
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1e-6)
        time.sleep(1)
        self.assertEqual(self.hndl_called, True)
514
515
def test_main():
    """Run all the test case classes of this module via test_support."""
    cases = (
        BasicSignalTests,
        InterProcessSignalTests,
        WakeupFDTests,
        WakeupSignalTests,
        SiginterruptTest,
        ItimerTest,
        WindowsSignalTests,
    )
    test_support.run_unittest(*cases)
521
522
# Run the module's tests when it is executed directly as a script.
if __name__ == "__main__":
    test_main()
525