"""
Tests for kqueue wrapper.
"""
import socket
import errno
import time
import select
import sys
import unittest

from test import test_support
if not hasattr(select, "kqueue"):
    raise unittest.SkipTest("test works only on BSD")


class TestKQueue(unittest.TestCase):
    # The test methods carry comments rather than docstrings so that
    # unittest's verbose output keeps printing the method names.

    def _check_event(self, ev, other, ident, filter_, flags,
                     fflags=0, data=0, udata=0):
        # Assert all six fields of the kevent ``ev``, then that it compares
        # equal to itself and unequal to the unrelated kevent ``other``.
        self.assertEqual(ev.ident, ident)
        self.assertEqual(ev.filter, filter_)
        self.assertEqual(ev.flags, flags)
        self.assertEqual(ev.fflags, fflags)
        self.assertEqual(ev.data, data)
        self.assertEqual(ev.udata, udata)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

    @staticmethod
    def _ident_filter_pairs(events):
        # Reduce a list of kevents to a set of (ident, filter) pairs so the
        # result can be compared without depending on delivery order.
        return set((ev.ident, ev.filter) for ev in events)

    def test_create_queue(self):
        # A fresh kqueue exposes a descriptor and is open; after close() it
        # reports closed and fileno() raises ValueError.
        kq = select.kqueue()
        # Safety net: release the descriptor even if an assertion below
        # fails before the explicit close() is reached.
        self.addCleanup(kq.close)
        self.assertGreater(kq.fileno(), 0)
        self.assertFalse(kq.closed)
        kq.close()
        self.assertTrue(kq.closed)
        self.assertRaises(ValueError, kq.fileno)

    def test_create_event(self):
        # Check kevent construction defaults, explicit field values,
        # large field values, and comparison behaviour.
        fd = sys.stderr.fileno()
        ev = select.kevent(fd)
        other = select.kevent(1000)
        # With only an ident given, the filter is KQ_FILTER_READ, the flags
        # are KQ_EV_ADD and the remaining fields are 0.
        self._check_event(ev, other, fd,
                          select.KQ_FILTER_READ, select.KQ_EV_ADD)
        self.assertEqual(cmp(ev, other), -1)
        self.assertTrue(ev < other)
        self.assertTrue(other >= ev)
        # Comparison against non-kevent objects must never report equality
        # and must be antisymmetric.
        for unrelated in (None, 1, "ev"):
            self.assertNotEqual(cmp(ev, unrelated), 0)
            self.assertEqual(cmp(ev, unrelated), -cmp(unrelated, ev))

        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
        self._check_event(ev, other, fd,
                          select.KQ_FILTER_WRITE, select.KQ_EV_ADD)

        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
        self._check_event(ev, other, fd,
                          select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)

        ev = select.kevent(1, 2, 3, 4, 5, 6)
        self._check_event(ev, other, 1, 2, 3, 4, 5, 6)

        bignum = 0x7fff
        ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
        self._check_event(ev, other, bignum, 1, 2, 3, bignum - 1, bignum)

        # Issue 11973
        bignum = 0xffff
        ev = select.kevent(0, 1, bignum)
        self._check_event(ev, other, 0, 1, bignum)

        # Issue 11973
        bignum = 0xffffffff
        ev = select.kevent(0, 1, 2, bignum)
        self._check_event(ev, other, 0, 1, 2, bignum)

    def test_queue_event(self):
        # Register read/write filters for both ends of a TCP connection and
        # check which events are reported before and after data is sent, and
        # after some registrations are deleted.
        serverSocket = socket.socket()
        self.addCleanup(serverSocket.close)
        serverSocket.bind(('127.0.0.1', 0))
        serverSocket.listen(1)
        client = socket.socket()
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
        except socket.error as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        # No ``else: fail``: FreeBSD doesn't raise an exception here, so a
        # connect() that returns normally is not an error.
        server, _ = serverSocket.accept()
        self.addCleanup(server.close)

        kq = select.kqueue()
        # kq2 is built from kq's own descriptor: the client filters are
        # registered through kq2 below but read back through kq.  Neither
        # object is closed explicitly; presumably closing both would close
        # the same descriptor twice.
        kq2 = select.kqueue.fromfd(kq.fileno())

        add_enabled = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        registrations = (
            (kq, server, select.KQ_FILTER_WRITE),
            (kq, server, select.KQ_FILTER_READ),
            (kq2, client, select.KQ_FILTER_WRITE),
            (kq2, client, select.KQ_FILTER_READ),
        )
        for queue, sock, filter_ in registrations:
            ev = select.kevent(sock.fileno(), filter_, add_enabled)
            queue.control([ev], 0)

        # Nothing has been sent yet, so only the write filters are expected.
        events = self._ident_filter_pairs(kq.control(None, 4, 1))
        self.assertEqual(events, set([
            (client.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_WRITE)]))

        client.send(b"Hello!")
        server.send(b"world!!!")

        # We may need to call it several times
        for _ in range(10):
            events = kq.control(None, 4, 1)
            if len(events) == 4:
                break
            time.sleep(1.0)
        else:
            self.fail('timeout waiting for event notifications')

        events = self._ident_filter_pairs(events)
        self.assertEqual(events, set([
            (client.fileno(), select.KQ_FILTER_WRITE),
            (client.fileno(), select.KQ_FILTER_READ),
            (server.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_READ)]))

        # Remove completely client, and server read part
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        # This call also passes an explicit zero timeout.
        kq.control([ev], 0, 0)

        # Only the server's write filter is still registered.
        events = self._ident_filter_pairs(kq.control([], 4, 0.99))
        self.assertEqual(events, set([
            (server.fileno(), select.KQ_FILTER_WRITE)]))

    def testPair(self):
        # kevent accepts socket objects as idents; after writing to one end
        # of a socket pair, the reported read event's ``data`` is used as the
        # number of bytes to receive on the other end.
        kq = select.kqueue()
        self.addCleanup(kq.close)
        a, b = socket.socketpair()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        a.send(b'foo')
        add_enabled = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        event1 = select.kevent(a, select.KQ_FILTER_READ, add_enabled)
        event2 = select.kevent(b, select.KQ_FILTER_READ, add_enabled)
        r = kq.control([event1, event2], 1, 1)
        self.assertTrue(r)
        self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
        self.assertEqual(b.recv(r[0].data), b'foo')

    def test_issue30058(self):
        # changelist must be an iterable
        kq = select.kqueue()
        self.addCleanup(kq.close)
        a, b = socket.socketpair()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        ev = select.kevent(a, select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)

        kq.control([ev], 0)
        # not a list
        kq.control((ev,), 0)

        # __len__ is not consistent with __iter__
        class BadList:
            def __len__(self):
                return 0

            def __iter__(self):
                for i in range(100):
                    yield ev
        kq.control(BadList(), 0)
        # doesn't have __len__
        kq.control(iter([ev]), 0)


def test_main():
    """Run the kqueue tests through the regression-test support runner."""
    test_support.run_unittest(TestKQueue)


if __name__ == "__main__":
    test_main()