1"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer.  For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
7__all__ = 'BaseSelectorEventLoop',
8
9import collections
10import errno
11import functools
12import selectors
13import socket
14import warnings
15import weakref
16try:
17    import ssl
18except ImportError:  # pragma: no cover
19    ssl = None
20
21from . import base_events
22from . import constants
23from . import events
24from . import futures
25from . import protocols
26from . import sslproto
27from . import transports
28from .log import logger
29
30
def _test_selector_event(selector, fd, event):
    """Tell whether *selector* is watching *fd* for *event*.

    Returns True when the events registered for *fd* share at least one bit
    with *event*, and False when they do not or when *fd* is not registered
    with the selector at all.
    """
    try:
        watched = selector.get_key(fd).events
    except KeyError:
        # get_key() raises KeyError for a file descriptor it does not know.
        return False
    return (watched & event) != 0
40
41
class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Initialize the loop around *selector*.

        When *selector* is None a selectors.DefaultSelector() is created.
        """
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # Registry of file descriptor -> transport.  Values are held weakly.
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create a _SelectorSocketTransport for *sock* and return it."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Create an SSL transport on top of *rawsock*.

        An sslproto.SSLProtocol wraps *protocol*; a plain socket transport is
        created with that SSLProtocol as its protocol, and the SSLProtocol's
        application-side transport is returned.
        """
        ssl_protocol = sslproto.SSLProtocol(
                self, protocol, sslcontext, waiter,
                server_side, server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout)
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create a _SelectorDatagramTransport for *sock* and return it."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running.  Closing an already
        closed loop is a no-op.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking socket pair used to wake up the loop."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Handle bytes read from the self-pipe.

        This implementation discards them.
        """
        pass

    def _read_from_self(self):
        """Drain the read end of the self-pipe.

        Reads until the socket would block or reports end-of-stream, handing
        every chunk to _process_self_data().
        """
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        """Write one null byte to the self-pipe; failures are only logged."""
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is not None:
            try:
                csock.send(b'\0')
            except OSError:
                if self._debug:
                    logger.debug("Fail to write a null byte into the "
                                 "self-pipe socket",
                                 exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Register the listening socket *sock* so connections get accepted."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog,
                         ssl_handshake_timeout)

    def _accept_connection(
            self, protocol_factory, sock,
            sslcontext=None, server=None, backlog=100,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Accept up to *backlog* pending connections on *sock*.

        Every accepted connection is handed to _accept_connection2() in a
        new task.  When accept() fails for lack of system resources the
        reader is removed, a retry is scheduled after
        constants.ACCEPT_RETRY_DELAY and this call stops.  Any other OSError
        is re-raised.
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
                # Early exit because the socket accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': sock,
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog, ssl_handshake_timeout)
                    # Stop here: the reader is gone until the retry fires, and
                    # going round the loop again would repeat the failing
                    # accept(), the report and the scheduled retry up to
                    # `backlog` times in a single tick.
                    return None
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(
                    protocol_factory, conn, extra, sslcontext, server,
                    ssl_handshake_timeout)
                self.create_task(accept)

    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Build the protocol and transport for the accepted socket *conn*.

        Waits until the transport signals that it is set up.  If the wait
        fails the transport is closed.  Exceptions derived from Exception are
        not propagated; they are passed to the loop's exception handler, but
        only when the loop is in debug mode.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except:
                # Bare except on purpose: the transport is closed for any
                # failure (cancellation included) and the error re-raised.
                transport.close()
                raise

            # It's now up to the protocol to handle the connection.
        except Exception as exc:
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Refuse file descriptors that belong to a live transport.

        *fd* is an integer or an object with a fileno() method.  Raises
        ValueError when it is neither, and RuntimeError when a registered
        transport that is not closing uses that file descriptor.
        """
        fileno = fd
        if not isinstance(fileno, int):
            try:
                fileno = int(fileno.fileno())
            except (AttributeError, TypeError, ValueError):
                # This code matches selectors._fileobj_to_fd function.
                raise ValueError(f"Invalid file object: {fd!r}") from None
        try:
            transport = self._transports[fileno]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    f'File descriptor {fd!r} is used by transport '
                    f'{transport!r}')

    def _add_reader(self, fd, callback, *args):
        """Register *callback* for read readiness of *fd*.

        A reader handle already registered for *fd* is replaced and
        cancelled; a registered writer is kept.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()

    def _remove_reader(self, fd):
        """Stop watching *fd* for reads.

        Returns True if a reader handle was registered (it is cancelled),
        False otherwise, including when the loop is closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                # Nothing left to watch for this fd.
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Register *callback* for write readiness of *fd*.

        A writer handle already registered for *fd* is replaced and
        cancelled; a registered reader is kept.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()

    def _remove_writer(self, fd):
        """Stop watching *fd* for writes.

        Returns True if a writer handle was registered (it is cancelled),
        False otherwise, including when the loop is closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Drop the write interest; a registered reader stays in place.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    async def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_recv(fut, None, sock, n)
        return await fut

    def _sock_recv(self, fut, registered_fd, sock, n):
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_reader(registered_fd)
        if fut.cancelled():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            fd = sock.fileno()
            self.add_reader(fd, self._sock_recv, fut, fd, sock, n)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    async def sock_recv_into(self, sock, buf):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is the number of bytes written.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_recv_into(fut, None, sock, buf)
        return await fut

    def _sock_recv_into(self, fut, registered_fd, sock, buf):
        # _sock_recv_into() can add itself as an I/O callback if the operation
        # can't be done immediately. Don't use it directly, call
        # sock_recv_into().
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the FD is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_reader(registered_fd)
        if fut.cancelled():
            return
        try:
            nbytes = sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            fd = sock.fileno()
            self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(nbytes)

    async def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        if data:
            self._sock_sendall(fut, None, sock, data)
        else:
            fut.set_result(None)
        return await fut

    def _sock_sendall(self, fut, registered_fd, sock, data):
        # _sock_sendall() re-registers itself as a writer callback until all
        # of *data* has been sent. Don't use it directly, call sock_sendall().
        if registered_fd is not None:
            self.remove_writer(registered_fd)
        if fut.cancelled():
            return

        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0
        except Exception as exc:
            fut.set_exception(exc)
            return

        if n == len(data):
            fut.set_result(None)
        else:
            if n:
                # Keep only the part that has not been sent yet.
                data = data[n:]
            fd = sock.fileno()
            self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)

    async def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        # Every family except AF_UNIX has its address resolved first.
        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
            resolved = await self._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            _, _, _, _, address = resolved[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return await fut

    def _sock_connect(self, fut, sock, address):
        # Start the connect; if it cannot finish immediately, wait for the
        # socket to become writable. Don't use it directly, call
        # sock_connect().
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            fut.add_done_callback(
                functools.partial(self._sock_connect_done, fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_connect_done(self, fd, fut):
        # Done callback of the connect future: drop the writer registration.
        self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        # Writer callback: read SO_ERROR to learn how the connect ended.
        if fut.cancelled():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, f'Connect call failed {address}')
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    async def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.
        """
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, False, sock)
        return await fut

    def _sock_accept(self, fut, registered, sock):
        # _sock_accept() can add itself as an I/O callback if no connection
        # is pending. Don't use it directly, call sock_accept().
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* through the socket of transport *transp*.

        The transport is taken out of the fd registry and its reading is
        paused while sock_sendfile() runs; both are restored afterwards.
        """
        # NOTE(review): presumably the registry entry is dropped so that the
        # fd check done by add_reader()/add_writer() accepts this socket.
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp

    def _process_events(self, event_list):
        """Queue the callbacks for the (key, mask) pairs in *event_list*.

        A handle found to be cancelled is unregistered instead of queued.
        """
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Unregister the listening socket *sock* and close it."""
        self._remove_reader(sock.fileno())
        sock.close()
564
565
class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Common base of the selector-based transports wrapping one socket.

    Holds the socket, the protocol, the outgoing buffer and the closing
    state, and implements the close/abort sequence that ends with
    protocol.connection_lost().
    """

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Wrap *sock* and register the transport in loop._transports.

        'socket', 'sockname' and (unless already supplied in *extra*)
        'peername' are recorded as extra info.  An address the socket cannot
        report is stored as None.
        """
        super().__init__(extra, loop)
        self._extra['socket'] = sock
        try:
            self._extra['sockname'] = sock.getsockname()
        except OSError:
            # Treated like a getpeername() failure below: not fatal.
            self._extra['sockname'] = None
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except OSError:
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()

        self._protocol_connected = False
        self.set_protocol(protocol)

        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._sock_fd}')
        # test if the transport was closed
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append(f'write=<{state}, bufsize={bufsize}>')
        return '<{}>'.format(' '.join(info))

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Set *protocol* as the current protocol and mark it connected."""
        self._protocol = protocol
        self._protocol_connected = True

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or a forced close has started."""
        return self._closing

    def close(self):
        """Start closing the transport.

        Reading stops at once.  If the write buffer is empty,
        connection_lost(None) is scheduled right away; otherwise nothing
        more is done here and the buffer is left in place.
        """
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._loop.call_soon(self._call_connection_lost, None)

    def __del__(self):
        # A socket still set here means the transport was never closed.
        if self._sock is not None:
            warnings.warn(f"unclosed transport {self!r}", ResourceWarning,
                          source=self)
            self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report *exc* and force the transport closed.

        Exceptions listed in base_events._FATAL_ERROR_IGNORE are only logged
        (and only in debug mode); anything else goes to the loop's exception
        handler.
        """
        # Should be called from exception handler only.
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data, stop all I/O and schedule connection_lost(exc).

        Does nothing if the loss of the connection was already recorded.
        """
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the socket and all references."""
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            # Runs even if connection_lost() raises.
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the number of bytes currently held in the write buffer."""
        return len(self._buffer)

    def _add_reader(self, fd, callback, *args):
        """Register a reader on the loop unless the transport is closing."""
        if self._closing:
            return

        self._loop._add_reader(fd, callback, *args)
706
707
708class _SelectorSocketTransport(_SelectorTransport):
709
710    _start_tls_compatible = True
711    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
712
    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Set up the transport and schedule its start-up callbacks.

        Three callbacks are queued on the loop, in this order:
        protocol.connection_made(self), registration of the read-ready
        handler, and (when *waiter* is given) completion of *waiter* with
        None unless it was cancelled.
        """

        # Must exist before the base constructor runs: it calls
        # set_protocol(), which assigns the real handler.
        self._read_ready_cb = None
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False
        self._empty_waiter = None

        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        base_events._set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
735
736    def set_protocol(self, protocol):
737        if isinstance(protocol, protocols.BufferedProtocol):
738            self._read_ready_cb = self._read_ready__get_buffer
739        else:
740            self._read_ready_cb = self._read_ready__data_received
741
742        super().set_protocol(protocol)
743
744    def is_reading(self):
745        return not self._paused and not self._closing
746
747    def pause_reading(self):
748        if self._closing or self._paused:
749            return
750        self._paused = True
751        self._loop._remove_reader(self._sock_fd)
752        if self._loop.get_debug():
753            logger.debug("%r pauses reading", self)
754
755    def resume_reading(self):
756        if self._closing or not self._paused:
757            return
758        self._paused = False
759        self._add_reader(self._sock_fd, self._read_ready)
760        if self._loop.get_debug():
761            logger.debug("%r resumes reading", self)
762
763    def _read_ready(self):
764        self._read_ready_cb()
765
766    def _read_ready__get_buffer(self):
767        if self._conn_lost:
768            return
769
770        try:
771            buf = self._protocol.get_buffer(-1)
772            if not len(buf):
773                raise RuntimeError('get_buffer() returned an empty buffer')
774        except Exception as exc:
775            self._fatal_error(
776                exc, 'Fatal error: protocol.get_buffer() call failed.')
777            return
778
779        try:
780            nbytes = self._sock.recv_into(buf)
781        except (BlockingIOError, InterruptedError):
782            return
783        except Exception as exc:
784            self._fatal_error(exc, 'Fatal read error on socket transport')
785            return
786
787        if not nbytes:
788            self._read_ready__on_eof()
789            return
790
791        try:
792            self._protocol.buffer_updated(nbytes)
793        except Exception as exc:
794            self._fatal_error(
795                exc, 'Fatal error: protocol.buffer_updated() call failed.')
796
797    def _read_ready__data_received(self):
798        if self._conn_lost:
799            return
800        try:
801            data = self._sock.recv(self.max_size)
802        except (BlockingIOError, InterruptedError):
803            return
804        except Exception as exc:
805            self._fatal_error(exc, 'Fatal read error on socket transport')
806            return
807
808        if not data:
809            self._read_ready__on_eof()
810            return
811
812        try:
813            self._protocol.data_received(data)
814        except Exception as exc:
815            self._fatal_error(
816                exc, 'Fatal error: protocol.data_received() call failed.')
817
818    def _read_ready__on_eof(self):
819        if self._loop.get_debug():
820            logger.debug("%r received EOF", self)
821
822        try:
823            keep_open = self._protocol.eof_received()
824        except Exception as exc:
825            self._fatal_error(
826                exc, 'Fatal error: protocol.eof_received() call failed.')
827            return
828
829        if keep_open:
830            # We're keeping the connection open so the
831            # protocol can write more, but we still can't
832            # receive more, so remove the reader callback.
833            self._loop._remove_reader(self._sock_fd)
834        else:
835            self.close()
836
837    def write(self, data):
838        if not isinstance(data, (bytes, bytearray, memoryview)):
839            raise TypeError(f'data argument must be a bytes-like object, '
840                            f'not {type(data).__name__!r}')
841        if self._eof:
842            raise RuntimeError('Cannot call write() after write_eof()')
843        if self._empty_waiter is not None:
844            raise RuntimeError('unable to write; sendfile is in progress')
845        if not data:
846            return
847
848        if self._conn_lost:
849            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
850                logger.warning('socket.send() raised exception.')
851            self._conn_lost += 1
852            return
853
854        if not self._buffer:
855            # Optimization: try to send now.
856            try:
857                n = self._sock.send(data)
858            except (BlockingIOError, InterruptedError):
859                pass
860            except Exception as exc:
861                self._fatal_error(exc, 'Fatal write error on socket transport')
862                return
863            else:
864                data = data[n:]
865                if not data:
866                    return
867            # Not all was written; register write handler.
868            self._loop._add_writer(self._sock_fd, self._write_ready)
869
870        # Add it to the buffer.
871        self._buffer.extend(data)
872        self._maybe_pause_protocol()
873
874    def _write_ready(self):
875        assert self._buffer, 'Data should not be empty'
876
877        if self._conn_lost:
878            return
879        try:
880            n = self._sock.send(self._buffer)
881        except (BlockingIOError, InterruptedError):
882            pass
883        except Exception as exc:
884            self._loop._remove_writer(self._sock_fd)
885            self._buffer.clear()
886            self._fatal_error(exc, 'Fatal write error on socket transport')
887            if self._empty_waiter is not None:
888                self._empty_waiter.set_exception(exc)
889        else:
890            if n:
891                del self._buffer[:n]
892            self._maybe_resume_protocol()  # May append to buffer.
893            if not self._buffer:
894                self._loop._remove_writer(self._sock_fd)
895                if self._empty_waiter is not None:
896                    self._empty_waiter.set_result(None)
897                if self._closing:
898                    self._call_connection_lost(None)
899                elif self._eof:
900                    self._sock.shutdown(socket.SHUT_WR)
901
902    def write_eof(self):
903        if self._closing or self._eof:
904            return
905        self._eof = True
906        if not self._buffer:
907            self._sock.shutdown(socket.SHUT_WR)
908
909    def can_write_eof(self):
910        return True
911
912    def _call_connection_lost(self, exc):
913        super()._call_connection_lost(exc)
914        if self._empty_waiter is not None:
915            self._empty_waiter.set_exception(
916                ConnectionError("Connection is closed by peer"))
917
918    def _make_empty_waiter(self):
919        if self._empty_waiter is not None:
920            raise RuntimeError("Empty waiter is already set")
921        self._empty_waiter = self._loop.create_future()
922        if not self._buffer:
923            self._empty_waiter.set_result(None)
924        return self._empty_waiter
925
926    def _reset_empty_waiter(self):
927        self._empty_waiter = None
928
929
class _SelectorDatagramTransport(_SelectorTransport):
    """Selector-based transport for datagram sockets.

    Datagrams that cannot be sent immediately are appended to
    ``self._buffer`` as ``(data, addr)`` pairs and flushed by
    ``_sendto_ready()`` when the socket becomes writable.
    """

    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Set up the transport and schedule its start-up callbacks.

        *address* is stored as ``self._address``; when it is truthy the
        transport sends with ``send()`` instead of ``sendto()``.  If
        *waiter* is given, it is resolved after ``connection_made()`` has
        been scheduled.
        """
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        schedule = self._loop.call_soon
        schedule(self._protocol.connection_made, self)
        # Reading must not begin before connection_made() has been called.
        schedule(self._add_reader, self._sock_fd, self._read_ready)
        if waiter is None:
            return
        # Likewise, the waiter is woken only after connection_made().
        schedule(futures._set_result_unless_cancelled, waiter, None)

    def get_write_buffer_size(self):
        """Return the combined length of all queued datagram payloads."""
        total = 0
        for payload, _dest in self._buffer:
            total += len(payload)
        return total

    def _read_ready(self):
        """Reader callback: receive one datagram and deliver it.

        A would-block/interrupted read is ignored, an ``OSError`` is
        reported to the protocol via ``error_received()``, and any other
        exception goes to ``_fatal_error()``.  Does nothing once the
        connection is lost.
        """
        if self._conn_lost:
            return

        try:
            payload, sender = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as err:
            self._protocol.error_received(err)
            return
        except Exception as err:
            self._fatal_error(err, 'Fatal read error on datagram transport')
            return

        self._protocol.datagram_received(payload, sender)

    def sendto(self, data, addr=None):
        """Send *data* as one datagram, queueing it if the socket is busy.

        With an empty queue an immediate send is attempted; when that
        would block, a writer callback is registered and the datagram is
        queued.  With a non-empty queue the datagram is simply queued.
        Empty *data* is ignored.  An ``OSError`` from the immediate send is
        reported through ``error_received()`` and the datagram is dropped.

        Raises:
            TypeError: *data* is not bytes, bytearray or memoryview.
            ValueError: the transport has a stored address and *addr* is
                neither None nor that address.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        if self._address:
            if addr not in (None, self._address):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            if self._conn_lost:
                # Connection already lost: drop the datagram and count it.
                if (self._conn_lost >=
                        constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES):
                    logger.warning('socket.send() raised exception.')
                self._conn_lost += 1
                return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._address:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as err:
                self._protocol.error_received(err)
                return
            except Exception as err:
                self._fatal_error(
                    err, 'Fatal write error on datagram transport')
                return
            else:
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Writer callback: send queued datagrams until the socket blocks.

        A datagram whose send would block is put back at the front of the
        queue.  An ``OSError`` is reported through ``error_received()`` and
        any other exception through ``_fatal_error()``; both stop
        processing immediately.  Once the queue is empty the writer is
        unregistered and a pending close is completed.
        """
        while self._buffer:
            payload, dest = self._buffer.popleft()
            try:
                if self._address:
                    self._sock.send(payload)
                else:
                    self._sock.sendto(payload, dest)
            except (BlockingIOError, InterruptedError):
                # Try again later, keeping the original ordering.
                self._buffer.appendleft((payload, dest))
                break
            except OSError as err:
                self._protocol.error_received(err)
                return
            except Exception as err:
                self._fatal_error(
                    err, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if self._buffer:
            return

        self._loop._remove_writer(self._sock_fd)
        if self._closing:
            self._call_connection_lost(None)
1027