1# Test case for the os.poll() function
2
3import os
4import random
5import select
6try:
7    import threading
8except ImportError:
9    threading = None
10import time
11import unittest
12from test.test_support import TESTFN, run_unittest, reap_threads, cpython_only
13
14try:
15    select.poll
16except AttributeError:
17    raise unittest.SkipTest, "select.poll not defined -- skipping test_poll"
18
19
20def find_ready_matching(ready, flag):
21    match = []
22    for fd, mode in ready:
23        if mode & flag:
24            match.append(fd)
25    return match
26
27class PollTests(unittest.TestCase):
28
29    def test_poll1(self):
30        # Basic functional test of poll object
31        # Create a bunch of pipe and test that poll works with them.
32
33        p = select.poll()
34
35        NUM_PIPES = 12
36        MSG = " This is a test."
37        MSG_LEN = len(MSG)
38        readers = []
39        writers = []
40        r2w = {}
41        w2r = {}
42
43        for i in range(NUM_PIPES):
44            rd, wr = os.pipe()
45            p.register(rd)
46            p.modify(rd, select.POLLIN)
47            p.register(wr, select.POLLOUT)
48            readers.append(rd)
49            writers.append(wr)
50            r2w[rd] = wr
51            w2r[wr] = rd
52
53        bufs = []
54
55        while writers:
56            ready = p.poll()
57            ready_writers = find_ready_matching(ready, select.POLLOUT)
58            if not ready_writers:
59                raise RuntimeError, "no pipes ready for writing"
60            wr = random.choice(ready_writers)
61            os.write(wr, MSG)
62
63            ready = p.poll()
64            ready_readers = find_ready_matching(ready, select.POLLIN)
65            if not ready_readers:
66                raise RuntimeError, "no pipes ready for reading"
67            rd = random.choice(ready_readers)
68            buf = os.read(rd, MSG_LEN)
69            self.assertEqual(len(buf), MSG_LEN)
70            bufs.append(buf)
71            os.close(r2w[rd]) ; os.close( rd )
72            p.unregister( r2w[rd] )
73            p.unregister( rd )
74            writers.remove(r2w[rd])
75
76        self.assertEqual(bufs, [MSG] * NUM_PIPES)
77
78    def poll_unit_tests(self):
79        # returns NVAL for invalid file descriptor
80        FD = 42
81        try:
82            os.close(FD)
83        except OSError:
84            pass
85        p = select.poll()
86        p.register(FD)
87        r = p.poll()
88        self.assertEqual(r[0], (FD, select.POLLNVAL))
89
90        f = open(TESTFN, 'w')
91        fd = f.fileno()
92        p = select.poll()
93        p.register(f)
94        r = p.poll()
95        self.assertEqual(r[0][0], fd)
96        f.close()
97        r = p.poll()
98        self.assertEqual(r[0], (fd, select.POLLNVAL))
99        os.unlink(TESTFN)
100
101        # type error for invalid arguments
102        p = select.poll()
103        self.assertRaises(TypeError, p.register, p)
104        self.assertRaises(TypeError, p.unregister, p)
105
106        # can't unregister non-existent object
107        p = select.poll()
108        self.assertRaises(KeyError, p.unregister, 3)
109
110        # Test error cases
111        pollster = select.poll()
112        class Nope:
113            pass
114
115        class Almost:
116            def fileno(self):
117                return 'fileno'
118
119        self.assertRaises(TypeError, pollster.register, Nope(), 0)
120        self.assertRaises(TypeError, pollster.register, Almost(), 0)
121
122    # Another test case for poll().  This is copied from the test case for
123    # select(), modified to use poll() instead.
124
125    def test_poll2(self):
126        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
127        p = os.popen(cmd, 'r')
128        pollster = select.poll()
129        pollster.register( p, select.POLLIN )
130        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
131            fdlist = pollster.poll(tout)
132            if (fdlist == []):
133                continue
134            fd, flags = fdlist[0]
135            if flags & select.POLLHUP:
136                line = p.readline()
137                if line != "":
138                    self.fail('error: pipe seems to be closed, but still returns data')
139                continue
140
141            elif flags & select.POLLIN:
142                line = p.readline()
143                if not line:
144                    break
145                continue
146            else:
147                self.fail('Unexpected return value from select.poll: %s' % fdlist)
148        p.close()
149
150    def test_poll3(self):
151        # test int overflow
152        pollster = select.poll()
153        pollster.register(1)
154
155        self.assertRaises(OverflowError, pollster.poll, 1L << 64)
156
157        x = 2 + 3
158        if x != 5:
159            self.fail('Overflow must have occurred')
160
161        # Issues #15989, #17919
162        self.assertRaises(OverflowError, pollster.register, 0, -1)
163        self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
164        self.assertRaises(OverflowError, pollster.modify, 1, -1)
165        self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
166
167    @cpython_only
168    def test_poll_c_limits(self):
169        from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
170        pollster = select.poll()
171        pollster.register(1)
172
173        # Issues #15989, #17919
174        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
175        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
176        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
177        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
178
179    @unittest.skipUnless(threading, 'Threading required for this test.')
180    @reap_threads
181    def test_threaded_poll(self):
182        r, w = os.pipe()
183        self.addCleanup(os.close, r)
184        self.addCleanup(os.close, w)
185        rfds = []
186        for i in range(10):
187            fd = os.dup(r)
188            self.addCleanup(os.close, fd)
189            rfds.append(fd)
190        pollster = select.poll()
191        for fd in rfds:
192            pollster.register(fd, select.POLLIN)
193
194        t = threading.Thread(target=pollster.poll)
195        t.start()
196        try:
197            time.sleep(0.5)
198            # trigger ufds array reallocation
199            for fd in rfds:
200                pollster.unregister(fd)
201            pollster.register(w, select.POLLOUT)
202            self.assertRaises(RuntimeError, pollster.poll)
203        finally:
204            # and make the call to poll() from the thread return
205            os.write(w, b'spam')
206            t.join()
207
208    @unittest.skipUnless(threading, 'Threading required for this test.')
209    @reap_threads
210    def test_poll_blocks_with_negative_ms(self):
211        for timeout_ms in [None, -1000, -1, -1.0]:
212            # Create two file descriptors. This will be used to unlock
213            # the blocking call to poll.poll inside the thread
214            r, w = os.pipe()
215            pollster = select.poll()
216            pollster.register(r, select.POLLIN)
217
218            poll_thread = threading.Thread(target=pollster.poll, args=(timeout_ms,))
219            poll_thread.start()
220            poll_thread.join(timeout=0.1)
221            self.assertTrue(poll_thread.is_alive())
222
223            # Write to the pipe so pollster.poll unblocks and the thread ends.
224            os.write(w, b'spam')
225            poll_thread.join()
226            self.assertFalse(poll_thread.is_alive())
227            os.close(r)
228            os.close(w)
229
230
231def test_main():
232    run_unittest(PollTests)
233
234if __name__ == '__main__':
235    test_main()
236