1# Test case for the os.poll() function 2 3import os 4import subprocess 5import random 6import select 7import threading 8import time 9import unittest 10from test.support import TESTFN, run_unittest, reap_threads, cpython_only 11 12try: 13 select.poll 14except AttributeError: 15 raise unittest.SkipTest("select.poll not defined") 16 17 18def find_ready_matching(ready, flag): 19 match = [] 20 for fd, mode in ready: 21 if mode & flag: 22 match.append(fd) 23 return match 24 25class PollTests(unittest.TestCase): 26 27 def test_poll1(self): 28 # Basic functional test of poll object 29 # Create a bunch of pipe and test that poll works with them. 30 31 p = select.poll() 32 33 NUM_PIPES = 12 34 MSG = b" This is a test." 35 MSG_LEN = len(MSG) 36 readers = [] 37 writers = [] 38 r2w = {} 39 w2r = {} 40 41 for i in range(NUM_PIPES): 42 rd, wr = os.pipe() 43 p.register(rd) 44 p.modify(rd, select.POLLIN) 45 p.register(wr, select.POLLOUT) 46 readers.append(rd) 47 writers.append(wr) 48 r2w[rd] = wr 49 w2r[wr] = rd 50 51 bufs = [] 52 53 while writers: 54 ready = p.poll() 55 ready_writers = find_ready_matching(ready, select.POLLOUT) 56 if not ready_writers: 57 raise RuntimeError("no pipes ready for writing") 58 wr = random.choice(ready_writers) 59 os.write(wr, MSG) 60 61 ready = p.poll() 62 ready_readers = find_ready_matching(ready, select.POLLIN) 63 if not ready_readers: 64 raise RuntimeError("no pipes ready for reading") 65 rd = random.choice(ready_readers) 66 buf = os.read(rd, MSG_LEN) 67 self.assertEqual(len(buf), MSG_LEN) 68 bufs.append(buf) 69 os.close(r2w[rd]) ; os.close( rd ) 70 p.unregister( r2w[rd] ) 71 p.unregister( rd ) 72 writers.remove(r2w[rd]) 73 74 self.assertEqual(bufs, [MSG] * NUM_PIPES) 75 76 def test_poll_unit_tests(self): 77 # returns NVAL for invalid file descriptor 78 FD, w = os.pipe() 79 os.close(FD) 80 os.close(w) 81 p = select.poll() 82 p.register(FD) 83 r = p.poll() 84 self.assertEqual(r[0], (FD, select.POLLNVAL)) 85 86 with open(TESTFN, 'w') as f: 87 fd = f.fileno() 88 p = select.poll() 89 p.register(f) 90 r = p.poll() 91 self.assertEqual(r[0][0], fd) 92 r = p.poll() 93 self.assertEqual(r[0], (fd, select.POLLNVAL)) 94 os.unlink(TESTFN) 95 96 # type error for invalid arguments 97 p = select.poll() 98 self.assertRaises(TypeError, p.register, p) 99 self.assertRaises(TypeError, p.unregister, p) 100 101 # can't unregister non-existent object 102 p = select.poll() 103 self.assertRaises(KeyError, p.unregister, 3) 104 105 # Test error cases 106 pollster = select.poll() 107 class Nope: 108 pass 109 110 class Almost: 111 def fileno(self): 112 return 'fileno' 113 114 self.assertRaises(TypeError, pollster.register, Nope(), 0) 115 self.assertRaises(TypeError, pollster.register, Almost(), 0) 116 117 # Another test case for poll(). This is copied from the test case for 118 # select(), modified to use poll() instead. 119 120 def test_poll2(self): 121 cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' 122 proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, 123 bufsize=0) 124 proc.__enter__() 125 self.addCleanup(proc.__exit__, None, None, None) 126 p = proc.stdout 127 pollster = select.poll() 128 pollster.register( p, select.POLLIN ) 129 for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10: 130 fdlist = pollster.poll(tout) 131 if (fdlist == []): 132 continue 133 fd, flags = fdlist[0] 134 if flags & select.POLLHUP: 135 line = p.readline() 136 if line != b"": 137 self.fail('error: pipe seems to be closed, but still returns data') 138 continue 139 140 elif flags & select.POLLIN: 141 line = p.readline() 142 if not line: 143 break 144 self.assertEqual(line, b'testing...\n') 145 continue 146 else: 147 self.fail('Unexpected return value from select.poll: %s' % fdlist) 148 149 def test_poll3(self): 150 # test int overflow 151 pollster = select.poll() 152 pollster.register(1) 153 154 self.assertRaises(OverflowError, pollster.poll, 1 << 64) 155 156 x = 2 + 3 157 if x != 5: 158 self.fail('Overflow must have occurred') 159 160 # Issues #15989, #17919 161 self.assertRaises(ValueError, pollster.register, 0, -1) 162 self.assertRaises(OverflowError, pollster.register, 0, 1 << 64) 163 self.assertRaises(ValueError, pollster.modify, 1, -1) 164 self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64) 165 166 @cpython_only 167 def test_poll_c_limits(self): 168 from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX 169 pollster = select.poll() 170 pollster.register(1) 171 172 # Issues #15989, #17919 173 self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1) 174 self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1) 175 self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1) 176 self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1) 177 178 @reap_threads 179 def test_threaded_poll(self): 180 r, w = os.pipe() 181 self.addCleanup(os.close, r) 182 self.addCleanup(os.close, w) 183 rfds = [] 184 for i in range(10): 185 fd = os.dup(r) 186 self.addCleanup(os.close, fd) 187 rfds.append(fd) 188 pollster = select.poll() 189 for fd in rfds: 190 pollster.register(fd, select.POLLIN) 191 192 t = threading.Thread(target=pollster.poll) 193 t.start() 194 try: 195 time.sleep(0.5) 196 # trigger ufds array reallocation 197 for fd in rfds: 198 pollster.unregister(fd) 199 pollster.register(w, select.POLLOUT) 200 self.assertRaises(RuntimeError, pollster.poll) 201 finally: 202 # and make the call to poll() from the thread return 203 os.write(w, b'spam') 204 t.join() 205 206 @unittest.skipUnless(threading, 'Threading required for this test.') 207 @reap_threads 208 def test_poll_blocks_with_negative_ms(self): 209 for timeout_ms in [None, -1000, -1, -1.0, -0.1, -1e-100]: 210 # Create two file descriptors. This will be used to unlock 211 # the blocking call to poll.poll inside the thread 212 r, w = os.pipe() 213 pollster = select.poll() 214 pollster.register(r, select.POLLIN) 215 216 poll_thread = threading.Thread(target=pollster.poll, args=(timeout_ms,)) 217 poll_thread.start() 218 poll_thread.join(timeout=0.1) 219 self.assertTrue(poll_thread.is_alive()) 220 221 # Write to the pipe so pollster.poll unblocks and the thread ends. 222 os.write(w, b'spam') 223 poll_thread.join() 224 self.assertFalse(poll_thread.is_alive()) 225 os.close(r) 226 os.close(w) 227 228 229def test_main(): 230 run_unittest(PollTests) 231 232if __name__ == '__main__': 233 test_main() 234