"""Event loop using a selector and related classes.

A selector is a "notify-when-ready" multiplexer.  For a subclass which
also includes support for signal handling, see the unix_events sub-module.
"""

__all__ = 'BaseSelectorEventLoop',

import collections
import errno
import functools
import selectors
import socket
import warnings
import weakref
try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = None

from . import base_events
from . import constants
from . import events
from . import futures
from . import protocols
from . import sslproto
from . import transports
from .log import logger


def _test_selector_event(selector, fd, event):
    """Return True if *selector* is monitoring *event* for *fd*.

    Returns False when *fd* is not registered with the selector at all.
    """
    try:
        key = selector.get_key(fd)
    except KeyError:
        return False
    else:
        return bool(key.events & event)


class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Create the loop, using *selector* or selectors.DefaultSelector()."""
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # Maps file descriptor -> transport; weak values so that an entry
        # goes away once its transport is no longer referenced.
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create a stream socket transport for *sock*."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Wrap *rawsock* in an SSLProtocol and return its app transport."""
        ssl_protocol = sslproto.SSLProtocol(
                self, protocol, sslcontext, waiter,
                server_side, server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout)
        # The raw transport is not returned; only the application-facing
        # transport exposed by the SSL protocol is handed back.
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create a datagram transport for *sock*."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running; does nothing if the
        loop is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe socket pair."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking socket pair used to wake up the loop."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Hook called with data read from the self-pipe; no-op here."""
        pass

    def _read_from_self(self):
        """Drain the read end of the self-pipe."""
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        """Write a single null byte to the self-pipe."""
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is not None:
            try:
                csock.send(b'\0')
            except OSError:
                if self._debug:
                    logger.debug("Fail to write a null byte into the "
                                 "self-pipe socket",
                                 exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Register the listening *sock* so connections get accepted."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog,
                         ssl_handshake_timeout)

    def _accept_connection(
            self, protocol_factory, sock,
            sslcontext=None, server=None, backlog=100,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Accept up to *backlog* pending connections on *sock*.

        Each accepted connection is handed to _accept_connection2() in a
        new task.  OSErrors other than resource exhaustion are re-raised.
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
                # Early exit because the socket accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': sock,
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog, ssl_handshake_timeout)
                    # Stop here: the reader is gone and a retry is already
                    # scheduled.  Continuing the loop would report the same
                    # error and schedule a duplicate retry on every
                    # remaining iteration.
                    return None
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(
                    protocol_factory, conn, extra, sslcontext, server,
                    ssl_handshake_timeout)
                self.create_task(accept)

    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Build the protocol and transport for an accepted connection.

        Exceptions derived from Exception are swallowed; they are passed
        to the loop's exception handler only when the loop is in debug
        mode.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except:
                # Deliberately bare: the transport is closed on any failure
                # and the exception is re-raised unchanged.
                transport.close()
                raise

            # It's now up to the protocol to handle the connection.
        except Exception as exc:
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Raise RuntimeError if *fd* belongs to a non-closing transport.

        *fd* may be an int or an object with a fileno() method; anything
        else raises ValueError.
        """
        fileno = fd
        if not isinstance(fileno, int):
            try:
                fileno = int(fileno.fileno())
            except (AttributeError, TypeError, ValueError):
                # This code matches selectors._fileobj_to_fd function.
                raise ValueError(f"Invalid file object: {fd!r}") from None
        try:
            transport = self._transports[fileno]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    f'File descriptor {fd!r} is used by transport '
                    f'{transport!r}')

    def _add_reader(self, fd, callback, *args):
        """Register *callback* for read events on *fd*.

        A previously registered reader for *fd* is cancelled and replaced.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()

    def _remove_reader(self, fd):
        """Unregister the reader for *fd*.

        Returns True if a reader handle was removed, False otherwise
        (including when the loop is already closed).
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Register *callback* for write events on *fd*.

        A previously registered writer for *fd* is cancelled and replaced.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()

    def _remove_writer(self, fd):
        """Remove a writer callback.

        Returns True if a writer handle was removed, False otherwise
        (including when the loop is already closed).
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Remove both writer and connector.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    async def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_recv(fut, None, sock, n)
        return await fut

    def _sock_recv(self, fut, registered_fd, sock, n):
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_reader(registered_fd)
        if fut.cancelled():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            fd = sock.fileno()
            self.add_reader(fd, self._sock_recv, fut, fd, sock, n)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    async def sock_recv_into(self, sock, buf):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is the number of bytes written.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_recv_into(fut, None, sock, buf)
        return await fut

    def _sock_recv_into(self, fut, registered_fd, sock, buf):
        # _sock_recv_into() can add itself as an I/O callback if the operation
        # can't be done immediately. Don't use it directly, call
        # sock_recv_into().
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the FD is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_reader(registered_fd)
        if fut.cancelled():
            return
        try:
            nbytes = sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            fd = sock.fileno()
            self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(nbytes)

    async def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        if data:
            self._sock_sendall(fut, None, sock, data)
        else:
            fut.set_result(None)
        return await fut

    def _sock_sendall(self, fut, registered_fd, sock, data):
        # Like _sock_recv(): re-registers itself as a writer callback with
        # the unsent remainder until all of *data* has been sent.
        if registered_fd is not None:
            self.remove_writer(registered_fd)
        if fut.cancelled():
            return

        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0
        except Exception as exc:
            fut.set_exception(exc)
            return

        if n == len(data):
            fut.set_result(None)
        else:
            if n:
                data = data[n:]
            fd = sock.fileno()
            self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)

    async def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        # Resolve the address first, except for AF_UNIX sockets.
        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
            resolved = await self._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            _, _, _, _, address = resolved[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return await fut

    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock*; complete *fut* with the outcome."""
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            fut.add_done_callback(
                functools.partial(self._sock_connect_done, fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_connect_done(self, fd, fut):
        """Done-callback of the connect future: stop watching *fd*."""
        self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback: check SO_ERROR and complete *fut*."""
        if fut.cancelled():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, f'Connect call failed {address}')
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    async def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, False, sock)
        return await fut

    def _sock_accept(self, fut, registered, sock):
        """Try to accept on *sock*; retry as a reader callback if not ready."""
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* over the socket of *transp* using sock_sendfile().

        The transport is taken out of self._transports and its reading is
        paused for the duration of the call; both are restored afterwards,
        whether the call succeeds or fails.
        """
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        try:
            # Waiting for the write buffer to drain is inside the try block
            # so that a failure here (e.g. cancellation) still restores the
            # transport state in the finally clause.
            await transp._make_empty_waiter()
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp

    def _process_events(self, event_list):
        """Schedule the callbacks for the (key, mask) pairs in *event_list*.

        Handles that were cancelled are unregistered instead of scheduled.
        """
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on the listening *sock* and close it."""
        self._remove_reader(sock.fileno())
        sock.close()


class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Base class for the selector-based socket and datagram transports."""

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Record socket details in the extra info and register with *loop*."""
        super().__init__(extra, loop)
        self._extra['socket'] = sock
        self._extra['sockname'] = sock.getsockname()
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except socket.error:
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()

        self._protocol_connected = False
        self.set_protocol(protocol)

        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._sock_fd}')
        # test if the transport was closed
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append(f'write=<{state}, bufsize={bufsize}>')
        return '<{}>'.format(' '.join(info))

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Set the protocol that receives this transport's callbacks."""
        self._protocol = protocol
        self._protocol_connected = True

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or a forced close has been requested."""
        return self._closing

    def close(self):
        """Stop reading; lose the connection once the write buffer is empty.

        If data is still buffered, connection_lost() is not scheduled here;
        the subclasses' write-ready callbacks do it once the buffer drains.
        """
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._loop.call_soon(self._call_connection_lost, None)

    def __del__(self):
        if self._sock is not None:
            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                          source=self)
            self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report *exc* and force-close the transport.

        Exceptions matching base_events._FATAL_ERROR_IGNORE are only logged
        in debug mode; others go to the loop's exception handler.
        """
        # Should be called from exception handler only.
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data, unregister the fd, schedule connection_lost."""
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the socket and drop references."""
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the number of bytes currently held in the write buffer."""
        return len(self._buffer)

    def _add_reader(self, fd, callback, *args):
        """Register a reader with the loop unless the transport is closing."""
        if self._closing:
            return

        self._loop._add_reader(fd, callback, *args)


class _SelectorSocketTransport(_SelectorTransport):
    """Stream transport over a non-blocking socket."""

    _start_tls_compatible = True
    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):

        # Must exist before the base __init__ runs, because that calls
        # set_protocol(), which assigns it.
        self._read_ready_cb = None
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False
        self._empty_waiter = None

        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        base_events._set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def set_protocol(self, protocol):
        """Set the protocol and pick the matching read-ready strategy."""
        if isinstance(protocol, protocols.BufferedProtocol):
            self._read_ready_cb = self._read_ready__get_buffer
        else:
            self._read_ready_cb = self._read_ready__data_received

        super().set_protocol(protocol)

    def is_reading(self):
        """Return True if reading is neither paused nor stopped by closing."""
        return not self._paused and not self._closing

    def pause_reading(self):
        """Stop watching the socket for read events."""
        if self._closing or self._paused:
            return
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume watching the socket for read events."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _read_ready(self):
        """Reader callback: dispatch to the strategy chosen by set_protocol()."""
        self._read_ready_cb()

    def _read_ready__get_buffer(self):
        """Read into a buffer supplied by a BufferedProtocol."""
        if self._conn_lost:
            return

        try:
            buf = self._protocol.get_buffer(-1)
            if not len(buf):
                raise RuntimeError('get_buffer() returned an empty buffer')
        except Exception as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.get_buffer() call failed.')
            return

        try:
            nbytes = self._sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not nbytes:
            self._read_ready__on_eof()
            return

        try:
            self._protocol.buffer_updated(nbytes)
        except Exception as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.buffer_updated() call failed.')

    def _read_ready__data_received(self):
        """Read up to max_size bytes and pass them to data_received()."""
        if self._conn_lost:
            return
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not data:
            self._read_ready__on_eof()
            return

        try:
            self._protocol.data_received(data)
        except Exception as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.data_received() call failed.')

    def _read_ready__on_eof(self):
        """Handle an empty read: call eof_received(), then close or keep open."""
        if self._loop.get_debug():
            logger.debug("%r received EOF", self)

        try:
            keep_open = self._protocol.eof_received()
        except Exception as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.eof_received() call failed.')
            return

        if keep_open:
            # We're keeping the connection open so the
            # protocol can write more, but we still can't
            # receive more, so remove the reader callback.
            self._loop._remove_reader(self._sock_fd)
        else:
            self.close()

    def write(self, data):
        """Send *data*, buffering whatever cannot be sent immediately.

        Raises TypeError for non bytes-like data and RuntimeError after
        write_eof() or while a sendfile is in progress.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if self._empty_waiter is not None:
            raise RuntimeError('unable to write; sendfile is in progress')
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                pass
            except Exception as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop._add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as the socket takes."""
        assert self._buffer, 'Data should not be empty'

        if self._conn_lost:
            return
        try:
            n = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._loop._remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
            # Only complete a waiter that is still pending; completing a
            # done future raises InvalidStateError.
            if (self._empty_waiter is not None
                    and not self._empty_waiter.done()):
                self._empty_waiter.set_exception(exc)
        else:
            if n:
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop._remove_writer(self._sock_fd)
                if (self._empty_waiter is not None
                        and not self._empty_waiter.done()):
                    self._empty_waiter.set_result(None)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)

    def write_eof(self):
        """Shut down the write side once buffered data has been flushed."""
        if self._closing or self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    def _call_connection_lost(self, exc):
        """Finish the close and fail a still-pending empty-buffer waiter."""
        super()._call_connection_lost(exc)
        # The waiter may already be done: _make_empty_waiter() resolves it
        # immediately when the buffer is empty, and _write_ready() sets an
        # exception on it before this callback runs.  Completing it a
        # second time would raise InvalidStateError.
        if (self._empty_waiter is not None
                and not self._empty_waiter.done()):
            self._empty_waiter.set_exception(
                ConnectionError("Connection is closed by peer"))

    def _make_empty_waiter(self):
        """Return a future that completes when the write buffer is empty.

        While the waiter is set, write() raises RuntimeError.  Raises
        RuntimeError if a waiter is already set.
        """
        if self._empty_waiter is not None:
            raise RuntimeError("Empty waiter is already set")
        self._empty_waiter = self._loop.create_future()
        if not self._buffer:
            self._empty_waiter.set_result(None)
        return self._empty_waiter

    def _reset_empty_waiter(self):
        """Forget the empty-buffer waiter, allowing write() again."""
        self._empty_waiter = None


class _SelectorDatagramTransport(_SelectorTransport):
    """Datagram transport over a non-blocking socket."""

    # The buffer holds (data, addr) pairs rather than a flat byte sequence.
    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def get_write_buffer_size(self):
        """Return the total size in bytes of all buffered datagrams."""
        return sum(len(data) for data, _ in self._buffer)

    def _read_ready(self):
        """Reader callback: receive one datagram and pass it to the protocol."""
        if self._conn_lost:
            return
        try:
            data, addr = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
        else:
            self._protocol.datagram_received(data, addr)

    def sendto(self, data, addr=None):
        """Send *data* to *addr*, buffering it if the socket is not ready.

        Raises TypeError for non bytes-like data, and ValueError when the
        transport has a fixed address and *addr* is neither None nor that
        address.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        if self._address and addr not in (None, self._address):
            raise ValueError(
                f'Invalid address: must be None or {self._address}')

        if self._conn_lost and self._address:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._address:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
                return
            except (BlockingIOError, InterruptedError):
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except Exception as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Writer callback: send buffered datagrams until the socket blocks."""
        while self._buffer:
            data, addr = self._buffer.popleft()
            try:
                if self._address:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((data, addr))  # Try again later.
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except Exception as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if not self._buffer:
            self._loop._remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)