"""Tests for asyncore: helper functions, dispatchers, file wrappers and the
socket-level dispatcher API (run over IPv4, IPv6 and Unix sockets, with both
select() and poll())."""

import asyncore
import unittest
import select
import os
import socket
import sys
import time
import errno
import struct
import threading

from test import support
from io import BytesIO

if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")


# Timeout (seconds) used when joining the helper threads started by the tests.
TIMEOUT = 3
HAS_UNIX_SOCKETS = hasattr(socket, 'AF_UNIX')


class dummysocket:
    """Socket stand-in that only records whether close() was called."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def fileno(self):
        return 42


class dummychannel:
    """Channel stand-in owning a dummysocket; close() closes that socket."""

    def __init__(self):
        self.socket = dummysocket()

    def close(self):
        self.socket.close()


class exitingdummy:
    """Handler whose every event method raises asyncore.ExitNow."""

    def __init__(self):
        pass

    def handle_read_event(self):
        raise asyncore.ExitNow()

    handle_write_event = handle_read_event
    handle_close = handle_read_event
    handle_expt_event = handle_read_event


class crashingdummy:
    """Handler whose event methods raise Exception.

    handle_error() records that it was invoked in ``error_handled``.
    """

    def __init__(self):
        self.error_handled = False

    def handle_read_event(self):
        raise Exception()

    handle_write_event = handle_read_event
    handle_close = handle_read_event
    handle_expt_event = handle_read_event

    def handle_error(self):
        self.error_handled = True


# used when testing senders; just collects what it gets until newline is sent
def capture_server(evt, buf, serv):
    """Accept one connection on *serv* and copy received bytes into *buf*.

    Reading stops at the first newline (which is not stored), after 200
    successful reads, or after 3 seconds.  *serv* is always closed and
    *evt* is always set before returning, so the waiting test never hangs.
    """
    try:
        serv.listen()
        conn, addr = serv.accept()
    except socket.timeout:
        pass
    else:
        # The context manager closes the connection even if select()/recv()
        # raises, so a failing run does not leak the accepted socket.
        with conn:
            n = 200
            start = time.monotonic()
            while n > 0 and time.monotonic() - start < 3.0:
                r, w, e = select.select([conn], [], [], 0.1)
                if r:
                    n -= 1
                    data = conn.recv(10)
                    # keep everything except for the newline terminator
                    buf.write(data.replace(b'\n', b''))
                    if b'\n' in data:
                        break
                time.sleep(0.01)
    finally:
        serv.close()
        evt.set()


def bind_af_aware(sock, addr):
    """Helper function to bind a socket according to its family."""
    if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
        # Make sure the path doesn't exist.
        support.unlink(addr)
        support.bind_unix_socket(sock, addr)
    else:
        sock.bind(addr)


class HelperFunctionTests(unittest.TestCase):
    """Tests for the module-level helpers of asyncore."""

    def test_readwriteexc(self):
        # Check exception handling behavior of read, write and _exception

        # check that ExitNow exceptions in the object handler method
        # bubbles all the way up through asyncore read/write/_exception calls
        tr1 = exitingdummy()
        self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
        self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
        self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)

        # check that an exception other than ExitNow in the object handler
        # method causes the handle_error method to get called
        tr2 = crashingdummy()
        asyncore.read(tr2)
        self.assertEqual(tr2.error_handled, True)

        tr2 = crashingdummy()
        asyncore.write(tr2)
        self.assertEqual(tr2.error_handled, True)

        tr2 = crashingdummy()
        asyncore._exception(tr2)
        self.assertEqual(tr2.error_handled, True)

    # asyncore.readwrite uses constants in the select module that
    # are not present in Windows systems (see this thread:
    # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
    # These constants should be present as long as poll is available

    @unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
    def test_readwrite(self):
        # Check that correct methods are called by readwrite()

        attributes = ('read', 'expt', 'write', 'closed', 'error_handled')

        expected = (
            (select.POLLIN, 'read'),
            (select.POLLPRI, 'expt'),
            (select.POLLOUT, 'write'),
            (select.POLLERR, 'closed'),
            (select.POLLHUP, 'closed'),
            (select.POLLNVAL, 'closed'),
            )

        class testobj:
            def __init__(self):
                self.read = False
                self.write = False
                self.closed = False
                self.expt = False
                self.error_handled = False

            def handle_read_event(self):
                self.read = True

            def handle_write_event(self):
                self.write = True

            def handle_close(self):
                self.closed = True

            def handle_expt_event(self):
                self.expt = True

            def handle_error(self):
                self.error_handled = True

        for flag, expectedattr in expected:
            tobj = testobj()
            self.assertEqual(getattr(tobj, expectedattr), False)
            asyncore.readwrite(tobj, flag)

            # Only the attribute modified by the routine we expect to be
            # called should be True.
            for attr in attributes:
                self.assertEqual(getattr(tobj, attr), attr==expectedattr)

            # check that ExitNow exceptions in the object handler method
            # bubbles all the way up through asyncore readwrite call
            tr1 = exitingdummy()
            self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)

            # check that an exception other than ExitNow in the object handler
            # method causes the handle_error method to get called
            tr2 = crashingdummy()
            self.assertEqual(tr2.error_handled, False)
            asyncore.readwrite(tr2, flag)
            self.assertEqual(tr2.error_handled, True)

    def test_closeall(self):
        self.closeall_check(False)

    def test_closeall_default(self):
        self.closeall_check(True)

    def closeall_check(self, usedefault):
        # Check that close_all() closes everything in a given map

        l = []
        testmap = {}
        for i in range(10):
            c = dummychannel()
            l.append(c)
            self.assertEqual(c.socket.closed, False)
            testmap[i] = c

        if usedefault:
            # Temporarily install testmap as the global map and restore the
            # real one afterwards, whatever close_all() does.
            socketmap = asyncore.socket_map
            try:
                asyncore.socket_map = testmap
                asyncore.close_all()
            finally:
                testmap, asyncore.socket_map = asyncore.socket_map, socketmap
        else:
            asyncore.close_all(testmap)

        self.assertEqual(len(testmap), 0)

        for c in l:
            self.assertEqual(c.socket.closed, True)

    def test_compact_traceback(self):
        try:
            raise Exception("I don't like spam!")
        except Exception:
            # compact_traceback() must be called while the exception is
            # still being handled.
            real_t, real_v, real_tb = sys.exc_info()
            r = asyncore.compact_traceback()
        else:
            self.fail("Expected exception")

        (f, function, line), t, v, info = r
        self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
        self.assertEqual(function, 'test_compact_traceback')
        self.assertEqual(t, real_t)
        self.assertEqual(v, real_v)
        self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))


class DispatcherTests(unittest.TestCase):
    """Tests for asyncore.dispatcher that do not need a real connection."""

    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    def test_basic(self):
        d = asyncore.dispatcher()
        self.assertEqual(d.readable(), True)
        self.assertEqual(d.writable(), True)

    def test_repr(self):
        d = asyncore.dispatcher()
        self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))

    def test_log(self):
        d = asyncore.dispatcher()

        # capture output of dispatcher.log() (to stderr)
        l1 = "Lovely spam! Wonderful spam!"
        l2 = "I don't like spam!"
        with support.captured_stderr() as stderr:
            d.log(l1)
            d.log(l2)

        lines = stderr.getvalue().splitlines()
        self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])

    def test_log_info(self):
        d = asyncore.dispatcher()

        # capture output of dispatcher.log_info() (to stdout via print)
        l1 = "Have you got anything without spam?"
        l2 = "Why can't she have egg bacon spam and sausage?"
        l3 = "THAT'S got spam in it!"
        with support.captured_stdout() as stdout:
            d.log_info(l1, 'EGGS')
            d.log_info(l2)
            d.log_info(l3, 'SPAM')

        lines = stdout.getvalue().splitlines()
        expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
        self.assertEqual(lines, expected)

    def test_unhandled(self):
        d = asyncore.dispatcher()
        d.ignore_log_types = ()

        # capture output of dispatcher.log_info() (to stdout via print)
        with support.captured_stdout() as stdout:
            d.handle_expt()
            d.handle_read()
            d.handle_write()
            d.handle_connect()

        lines = stdout.getvalue().splitlines()
        expected = ['warning: unhandled incoming priority event',
                    'warning: unhandled read event',
                    'warning: unhandled write event',
                    'warning: unhandled connect event']
        self.assertEqual(lines, expected)

    def test_strerror(self):
        # refers to bug #8573
        err = asyncore._strerror(errno.EPERM)
        if hasattr(os, 'strerror'):
            self.assertEqual(err, os.strerror(errno.EPERM))
        err = asyncore._strerror(-1)
        self.assertNotEqual(err, "")


class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
    """dispatcher_with_send that is never readable and ignores connects."""

    def readable(self):
        return False

    def handle_connect(self):
        pass


class DispatcherWithSendTests(unittest.TestCase):
    """Tests for asyncore.dispatcher_with_send against capture_server()."""

    def setUp(self):
        pass

    def tearDown(self):
        asyncore.close_all()

    @support.reap_threads
    def test_send(self):
        evt = threading.Event()
        sock = socket.socket()
        sock.settimeout(3)
        port = support.bind_port(sock)

        cap = BytesIO()
        args = (evt, cap, sock)
        t = threading.Thread(target=capture_server, args=args)
        t.start()
        try:
            # wait a little longer for the server to initialize (it sometimes
            # refuses connections on slow machines without this wait)
            time.sleep(0.2)

            data = b"Suppose there isn't a 16-ton weight?"
            d = dispatcherwithsend_noread()
            d.create_socket()
            d.connect((support.HOST, port))

            # give time for socket to connect
            time.sleep(0.1)

            d.send(data)
            d.send(data)
            d.send(b'\n')

            # Poll until the outgoing buffer is drained (bounded number of
            # iterations so a stuck buffer cannot hang the test).
            n = 1000
            while d.out_buffer and n > 0:
                asyncore.poll()
                n -= 1

            evt.wait()

            self.assertEqual(cap.getvalue(), data*2)
        finally:
            support.join_thread(t, timeout=TIMEOUT)


@unittest.skipUnless(hasattr(asyncore, 'file_wrapper'),
                     'asyncore.file_wrapper required')
class FileWrapperTest(unittest.TestCase):
    """Tests for asyncore.file_wrapper / file_dispatcher on a temp file."""

    def setUp(self):
        self.d = b"It's not dead, it's sleeping!"
        with open(support.TESTFN, 'wb') as file:
            file.write(self.d)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def test_recv(self):
        fd = os.open(support.TESTFN, os.O_RDONLY)
        w = asyncore.file_wrapper(fd)
        os.close(fd)

        self.assertNotEqual(w.fd, fd)
        self.assertNotEqual(w.fileno(), fd)
        self.assertEqual(w.recv(13), b"It's not dead")
        self.assertEqual(w.read(6), b", it's")
        w.close()
        self.assertRaises(OSError, w.read, 1)

    def test_send(self):
        d1 = b"Come again?"
        d2 = b"I want to buy some cheese."
        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND)
        w = asyncore.file_wrapper(fd)
        os.close(fd)

        w.write(d1)
        w.send(d2)
        w.close()
        with open(support.TESTFN, 'rb') as file:
            self.assertEqual(file.read(), self.d + d1 + d2)

    @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
                         'asyncore.file_dispatcher required')
    def test_dispatcher(self):
        fd = os.open(support.TESTFN, os.O_RDONLY)
        data = []
        class FileDispatcher(asyncore.file_dispatcher):
            def handle_read(self):
                data.append(self.recv(29))
        s = FileDispatcher(fd)
        os.close(fd)
        asyncore.loop(timeout=0.01, use_poll=True, count=2)
        self.assertEqual(b"".join(data), self.d)

    def test_resource_warning(self):
        # Issue #11453
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = asyncore.file_wrapper(fd)

        os.close(fd)
        with support.check_warnings(('', ResourceWarning)):
            # Dropping the last reference to an unclosed wrapper is expected
            # to emit a ResourceWarning.
            f = None
            support.gc_collect()

    def test_close_twice(self):
        fd = os.open(support.TESTFN, os.O_RDONLY)
        f = asyncore.file_wrapper(fd)
        os.close(fd)

        os.close(f.fd)  # file_wrapper dupped fd
        with self.assertRaises(OSError):
            f.close()

        self.assertEqual(f.fd, -1)
        # calling close twice should not fail
        f.close()


class BaseTestHandler(asyncore.dispatcher):
    """Dispatcher whose accept/connect/expt/close handlers all raise.

    Tests subclass it and override only the events they expect, so any
    unexpected event surfaces as an exception.  ``flag`` starts False and is
    set by the overriding handlers.
    """

    def __init__(self, sock=None):
        asyncore.dispatcher.__init__(self, sock)
        self.flag = False

    def handle_accept(self):
        raise Exception("handle_accept not supposed to be called")

    def handle_accepted(self):
        raise Exception("handle_accepted not supposed to be called")

    def handle_connect(self):
        raise Exception("handle_connect not supposed to be called")

    def handle_expt(self):
        raise Exception("handle_expt not supposed to be called")

    def handle_close(self):
        raise Exception("handle_close not supposed to be called")

    def handle_error(self):
        # Re-raise the exception currently being handled instead of
        # logging it.
        raise


class BaseServer(asyncore.dispatcher):
    """A server which listens on an address and dispatches the
    connection to a handler.
    """

    def __init__(self, family, addr, handler=BaseTestHandler):
        asyncore.dispatcher.__init__(self)
        self.create_socket(family)
        self.set_reuse_addr()
        bind_af_aware(self.socket, addr)
        self.listen(5)
        self.handler = handler

    @property
    def address(self):
        return self.socket.getsockname()

    def handle_accepted(self, sock, addr):
        self.handler(sock)

    def handle_error(self):
        # Re-raise the exception currently being handled instead of
        # logging it.
        raise


class BaseClient(BaseTestHandler):
    """Test handler that connects to *address* on creation."""

    def __init__(self, family, address):
        BaseTestHandler.__init__(self)
        self.create_socket(family)
        self.connect(address)

    def handle_connect(self):
        pass


class BaseTestAPI:
    """Socket API tests; mixed with TestCase by the concrete classes below,
    which supply ``family``, ``addr`` and ``use_poll``."""

    def tearDown(self):
        asyncore.close_all(ignore_all=True)

    def loop_waiting_for_flag(self, instance, timeout=5):
        # Run the asyncore loop one iteration at a time (at most 100
        # iterations, sleeping timeout/100 seconds between them) until
        # instance.flag becomes true; fail the test otherwise.
        timeout = float(timeout) / 100
        count = 100
        while asyncore.socket_map and count > 0:
            asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
            if instance.flag:
                return
            count -= 1
            time.sleep(timeout)
        self.fail("flag not set")

    def test_handle_connect(self):
        # make sure handle_connect is called on connect()

        class TestClient(BaseClient):
            def handle_connect(self):
                self.flag = True

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_accept(self):
        # make sure handle_accept() is called when a client connects

        class TestListener(BaseTestHandler):

            def __init__(self, family, addr):
                BaseTestHandler.__init__(self)
                self.create_socket(family)
                bind_af_aware(self.socket, addr)
                self.listen(5)
                self.address = self.socket.getsockname()

            def handle_accept(self):
                self.flag = True

        server = TestListener(self.family, self.addr)
        client = BaseClient(self.family, server.address)
        self.loop_waiting_for_flag(server)

    def test_handle_accepted(self):
        # make sure handle_accepted() is called when a client connects

        class TestListener(BaseTestHandler):

            def __init__(self, family, addr):
                BaseTestHandler.__init__(self)
                self.create_socket(family)
                bind_af_aware(self.socket, addr)
                self.listen(5)
                self.address = self.socket.getsockname()

            def handle_accept(self):
                asyncore.dispatcher.handle_accept(self)

            def handle_accepted(self, sock, addr):
                sock.close()
                self.flag = True

        server = TestListener(self.family, self.addr)
        client = BaseClient(self.family, server.address)
        self.loop_waiting_for_flag(server)


    def test_handle_read(self):
        # make sure handle_read is called on data received

        class TestClient(BaseClient):
            def handle_read(self):
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.send(b'x' * 1024)

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_write(self):
        # make sure handle_write is called

        class TestClient(BaseClient):
            def handle_write(self):
                self.flag = True

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_close(self):
        # make sure handle_close is called when the other end closes
        # the connection

        class TestClient(BaseClient):

            def handle_read(self):
                # in order to make handle_close be called we are supposed
                # to make at least one recv() call
                self.recv(1024)

            def handle_close(self):
                self.flag = True
                self.close()

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.close()

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_close_after_conn_broken(self):
        # Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
        # #11265).

        data = b'\0' * 128

        class TestClient(BaseClient):

            def handle_write(self):
                self.send(data)

            def handle_close(self):
                self.flag = True
                self.close()

            def handle_expt(self):
                self.flag = True
                self.close()

        class TestHandler(BaseTestHandler):

            def handle_read(self):
                self.recv(len(data))
                self.close()

            def writable(self):
                return False

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    @unittest.skipIf(sys.platform.startswith("sunos"),
                     "OOB support is broken on Solaris")
    def test_handle_expt(self):
        # Make sure handle_expt is called on OOB data received.
        # Note: this might fail on some platforms as OOB data is
        # tenuously supported and rarely used.
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

        if sys.platform == "darwin" and self.use_poll:
            self.skipTest("poll may fail on macOS; see issue #28087")

        class TestClient(BaseClient):
            def handle_expt(self):
                self.socket.recv(1024, socket.MSG_OOB)
                self.flag = True

        class TestHandler(BaseTestHandler):
            def __init__(self, conn):
                BaseTestHandler.__init__(self, conn)
                self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)

        server = BaseServer(self.family, self.addr, TestHandler)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_handle_error(self):

        class TestClient(BaseClient):
            def handle_write(self):
                1.0 / 0
            def handle_error(self):
                self.flag = True
                # Re-raise the active exception to check that it is the
                # ZeroDivisionError raised by handle_write().
                try:
                    raise
                except ZeroDivisionError:
                    pass
                else:
                    raise Exception("exception not raised")

        server = BaseServer(self.family, self.addr)
        client = TestClient(self.family, server.address)
        self.loop_waiting_for_flag(client)

    def test_connection_attributes(self):
        server = BaseServer(self.family, self.addr)
        client = BaseClient(self.family, server.address)

        # we start disconnected
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        # this can't be taken for granted across all platforms
        #self.assertFalse(client.connected)
        self.assertFalse(client.accepting)

        # execute some loops so that client connects to server
        asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        self.assertTrue(client.connected)
        self.assertFalse(client.accepting)

        # disconnect the client
        client.close()
        self.assertFalse(server.connected)
        self.assertTrue(server.accepting)
        self.assertFalse(client.connected)
        self.assertFalse(client.accepting)

        # stop serving
        server.close()
        self.assertFalse(server.connected)
        self.assertFalse(server.accepting)

    def test_create_socket(self):
        s = asyncore.dispatcher()
        s.create_socket(self.family)
        self.assertEqual(s.socket.type, socket.SOCK_STREAM)
        self.assertEqual(s.socket.family, self.family)
        self.assertEqual(s.socket.gettimeout(), 0)
        self.assertFalse(s.socket.get_inheritable())

    def test_bind(self):
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")
        s1 = asyncore.dispatcher()
        s1.create_socket(self.family)
        s1.bind(self.addr)
        s1.listen(5)
        port = s1.socket.getsockname()[1]

        s2 = asyncore.dispatcher()
        s2.create_socket(self.family)
        # EADDRINUSE indicates the socket was correctly bound
        self.assertRaises(OSError, s2.bind, (self.addr[0], port))

    def test_set_reuse_addr(self):
        if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
            self.skipTest("Not applicable to AF_UNIX sockets.")

        with socket.socket(self.family) as sock:
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            except OSError:
                # skipTest() raises SkipTest so the test is reported as
                # skipped; unittest.skip() only builds a decorator and
                # would let the test pass silently.
                self.skipTest("SO_REUSEADDR not supported on this platform")
            else:
                # if SO_REUSEADDR succeeded for sock we expect asyncore
                # to do the same
                s = asyncore.dispatcher(socket.socket(self.family))
                self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
                                                     socket.SO_REUSEADDR))
                s.socket.close()
                s.create_socket(self.family)
                s.set_reuse_addr()
                self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
                                                     socket.SO_REUSEADDR))

    @support.reap_threads
    def test_quick_connect(self):
        # see: http://bugs.python.org/issue10340
        if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
            self.skipTest("test specific to AF_INET and AF_INET6")

        server = BaseServer(self.family, self.addr)
        # run the thread 500 ms: the socket should be connected in 200 ms
        t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
                                                          count=5))
        t.start()
        try:
            with socket.socket(self.family, socket.SOCK_STREAM) as s:
                s.settimeout(.2)
                s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                             struct.pack('ii', 1, 0))

                try:
                    s.connect(server.address)
                except OSError:
                    pass
        finally:
            support.join_thread(t, timeout=TIMEOUT)


class TestAPI_UseIPv4Sockets(BaseTestAPI):
    family = socket.AF_INET
    addr = (support.HOST, 0)


@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required')
class TestAPI_UseIPv6Sockets(BaseTestAPI):
    family = socket.AF_INET6
    addr = (support.HOSTv6, 0)


@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
class TestAPI_UseUnixSockets(BaseTestAPI):
    if HAS_UNIX_SOCKETS:
        family = socket.AF_UNIX
    addr = support.TESTFN

    def tearDown(self):
        support.unlink(self.addr)
        BaseTestAPI.tearDown(self)


class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):
    use_poll = False


@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseIPv4Poll(TestAPI_UseIPv4Sockets, unittest.TestCase):
    use_poll = True


class TestAPI_UseIPv6Select(TestAPI_UseIPv6Sockets, unittest.TestCase):
    use_poll = False


@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseIPv6Poll(TestAPI_UseIPv6Sockets, unittest.TestCase):
    use_poll = True


class TestAPI_UseUnixSocketsSelect(TestAPI_UseUnixSockets, unittest.TestCase):
    use_poll = False


@unittest.skipUnless(hasattr(select, 'poll'), 'select.poll required')
class TestAPI_UseUnixSocketsPoll(TestAPI_UseUnixSockets, unittest.TestCase):
    use_poll = True


if __name__ == "__main__":
    unittest.main()