1# Test case for the os.poll() function 2 3import os 4import random 5import select 6try: 7 import threading 8except ImportError: 9 threading = None 10import time 11import unittest 12from test.test_support import TESTFN, run_unittest, reap_threads, cpython_only 13 14try: 15 select.poll 16except AttributeError: 17 raise unittest.SkipTest, "select.poll not defined -- skipping test_poll" 18 19 20def find_ready_matching(ready, flag): 21 match = [] 22 for fd, mode in ready: 23 if mode & flag: 24 match.append(fd) 25 return match 26 27class PollTests(unittest.TestCase): 28 29 def test_poll1(self): 30 # Basic functional test of poll object 31 # Create a bunch of pipe and test that poll works with them. 32 33 p = select.poll() 34 35 NUM_PIPES = 12 36 MSG = " This is a test." 37 MSG_LEN = len(MSG) 38 readers = [] 39 writers = [] 40 r2w = {} 41 w2r = {} 42 43 for i in range(NUM_PIPES): 44 rd, wr = os.pipe() 45 p.register(rd) 46 p.modify(rd, select.POLLIN) 47 p.register(wr, select.POLLOUT) 48 readers.append(rd) 49 writers.append(wr) 50 r2w[rd] = wr 51 w2r[wr] = rd 52 53 bufs = [] 54 55 while writers: 56 ready = p.poll() 57 ready_writers = find_ready_matching(ready, select.POLLOUT) 58 if not ready_writers: 59 raise RuntimeError, "no pipes ready for writing" 60 wr = random.choice(ready_writers) 61 os.write(wr, MSG) 62 63 ready = p.poll() 64 ready_readers = find_ready_matching(ready, select.POLLIN) 65 if not ready_readers: 66 raise RuntimeError, "no pipes ready for reading" 67 rd = random.choice(ready_readers) 68 buf = os.read(rd, MSG_LEN) 69 self.assertEqual(len(buf), MSG_LEN) 70 bufs.append(buf) 71 os.close(r2w[rd]) ; os.close( rd ) 72 p.unregister( r2w[rd] ) 73 p.unregister( rd ) 74 writers.remove(r2w[rd]) 75 76 self.assertEqual(bufs, [MSG] * NUM_PIPES) 77 78 def poll_unit_tests(self): 79 # returns NVAL for invalid file descriptor 80 FD = 42 81 try: 82 os.close(FD) 83 except OSError: 84 pass 85 p = select.poll() 86 p.register(FD) 87 r = p.poll() 88 self.assertEqual(r[0], (FD, select.POLLNVAL)) 89 90 f = open(TESTFN, 'w') 91 fd = f.fileno() 92 p = select.poll() 93 p.register(f) 94 r = p.poll() 95 self.assertEqual(r[0][0], fd) 96 f.close() 97 r = p.poll() 98 self.assertEqual(r[0], (fd, select.POLLNVAL)) 99 os.unlink(TESTFN) 100 101 # type error for invalid arguments 102 p = select.poll() 103 self.assertRaises(TypeError, p.register, p) 104 self.assertRaises(TypeError, p.unregister, p) 105 106 # can't unregister non-existent object 107 p = select.poll() 108 self.assertRaises(KeyError, p.unregister, 3) 109 110 # Test error cases 111 pollster = select.poll() 112 class Nope: 113 pass 114 115 class Almost: 116 def fileno(self): 117 return 'fileno' 118 119 self.assertRaises(TypeError, pollster.register, Nope(), 0) 120 self.assertRaises(TypeError, pollster.register, Almost(), 0) 121 122 # Another test case for poll(). This is copied from the test case for 123 # select(), modified to use poll() instead. 124 125 def test_poll2(self): 126 cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' 127 p = os.popen(cmd, 'r') 128 pollster = select.poll() 129 pollster.register( p, select.POLLIN ) 130 for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10: 131 fdlist = pollster.poll(tout) 132 if (fdlist == []): 133 continue 134 fd, flags = fdlist[0] 135 if flags & select.POLLHUP: 136 line = p.readline() 137 if line != "": 138 self.fail('error: pipe seems to be closed, but still returns data') 139 continue 140 141 elif flags & select.POLLIN: 142 line = p.readline() 143 if not line: 144 break 145 continue 146 else: 147 self.fail('Unexpected return value from select.poll: %s' % fdlist) 148 p.close() 149 150 def test_poll3(self): 151 # test int overflow 152 pollster = select.poll() 153 pollster.register(1) 154 155 self.assertRaises(OverflowError, pollster.poll, 1L << 64) 156 157 x = 2 + 3 158 if x != 5: 159 self.fail('Overflow must have occurred') 160 161 # Issues #15989, #17919 162 self.assertRaises(OverflowError, pollster.register, 0, -1) 163 self.assertRaises(OverflowError, pollster.register, 0, 1 << 64) 164 self.assertRaises(OverflowError, pollster.modify, 1, -1) 165 self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64) 166 167 @cpython_only 168 def test_poll_c_limits(self): 169 from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX 170 pollster = select.poll() 171 pollster.register(1) 172 173 # Issues #15989, #17919 174 self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1) 175 self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1) 176 self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1) 177 self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1) 178 179 @unittest.skipUnless(threading, 'Threading required for this test.') 180 @reap_threads 181 def test_threaded_poll(self): 182 r, w = os.pipe() 183 self.addCleanup(os.close, r) 184 self.addCleanup(os.close, w) 185 rfds = [] 186 for i in range(10): 187 fd = os.dup(r) 188 self.addCleanup(os.close, fd) 189 rfds.append(fd) 190 pollster = select.poll() 191 for fd in rfds: 192 pollster.register(fd, select.POLLIN) 193 194 t = threading.Thread(target=pollster.poll) 195 t.start() 196 try: 197 time.sleep(0.5) 198 # trigger ufds array reallocation 199 for fd in rfds: 200 pollster.unregister(fd) 201 pollster.register(w, select.POLLOUT) 202 self.assertRaises(RuntimeError, pollster.poll) 203 finally: 204 # and make the call to poll() from the thread return 205 os.write(w, b'spam') 206 t.join() 207 208 @unittest.skipUnless(threading, 'Threading required for this test.') 209 @reap_threads 210 def test_poll_blocks_with_negative_ms(self): 211 for timeout_ms in [None, -1000, -1, -1.0]: 212 # Create two file descriptors. This will be used to unlock 213 # the blocking call to poll.poll inside the thread 214 r, w = os.pipe() 215 pollster = select.poll() 216 pollster.register(r, select.POLLIN) 217 218 poll_thread = threading.Thread(target=pollster.poll, args=(timeout_ms,)) 219 poll_thread.start() 220 poll_thread.join(timeout=0.1) 221 self.assertTrue(poll_thread.is_alive()) 222 223 # Write to the pipe so pollster.poll unblocks and the thread ends. 224 os.write(w, b'spam') 225 poll_thread.join() 226 self.assertFalse(poll_thread.is_alive()) 227 os.close(r) 228 os.close(w) 229 230 231def test_main(): 232 run_unittest(PollTests) 233 234if __name__ == '__main__': 235 test_main() 236