1"""Selector and proactor event loops for Windows."""
2
3import _overlapped
4import _winapi
5import errno
6import math
7import msvcrt
8import socket
9import struct
10import time
11import weakref
12
13from . import events
14from . import base_subprocess
15from . import futures
16from . import proactor_events
17from . import selector_events
18from . import tasks
19from . import windows_utils
20from .log import logger
21
22
# Names exported by a star-import of this module.
__all__ = (
    'SelectorEventLoop',
    'ProactorEventLoop',
    'IocpProactor',
    'DefaultEventLoopPolicy',
    'WindowsSelectorEventLoopPolicy',
    'WindowsProactorEventLoopPolicy',
)
28
29
# Null value handed to the _overlapped calls in this module (for example
# Overlapped(NULL)).
NULL = 0
# Timeout in milliseconds that IocpProactor._poll() uses when no timeout is
# given.
INFINITE = 0xffff_ffff
# Numeric codes for the two named connection errors; not referenced by the
# code visible in this module.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Starting value, in seconds, of the retry delay used by connect_pipe()
CONNECT_PIPE_INIT_DELAY = 1e-3

# Upper bound, in seconds, of the retry delay used by connect_pipe()
CONNECT_PIPE_MAX_DELAY = 0.1
40
41
class _OverlappedFuture(futures.Future):
    """Future bound to one overlapped operation.

    Cancelling the future cancels the overlapped operation right away.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        # Drop the last entry of the recorded source traceback, if any
        # (presumably the frame of this constructor).
        source_tb = self._source_traceback
        if source_tb:
            del source_tb[-1]
        self._ov = ov

    def _repr_info(self):
        """Extend the base repr info with the state of the overlapped."""
        info = super()._repr_info()
        if self._ov is None:
            return info
        if self._ov.pending:
            state = 'pending'
        else:
            state = 'completed'
        info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation (if any) and forget it.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        if self._ov is not None:
            try:
                self._ov.cancel()
            except OSError as exc:
                context = {'message': 'Cancelling an overlapped future failed',
                           'exception': exc,
                           'future': self}
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            self._ov = None

    def cancel(self):
        # Cancel the operation first, then the future itself
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        # Record the failure first, then cancel the operation
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        # The operation is finished: only the reference has to be dropped
        super().set_result(result)
        self._ov = None
88
89
class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    The future holds the Overlapped object, the waited handle and the wait
    handle of the registration.  The wait is unregistered as soon as the
    future is cancelled or receives a result or an exception.
    """

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signalled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Extend the base repr info with the handle and wait handle."""
        info = super()._repr_info()
        # Only format (and poll) the handle when there is one: the ':#x'
        # format specifier raises TypeError for None.
        if self._handle is not None:
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object, so drop the reference kept by __init__()
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait; only the first call does anything.

        A failure other than ERROR_IO_PENDING is reported to the loop's
        exception handler and _unregister_wait_cb() is not called.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self):
        self._unregister_wait()
        return super().cancel()

    def set_exception(self, exception):
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        self._unregister_wait()
        super().set_result(result)
162
163
class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Future waiting, through an event, for the cancellation of a
    _WaitHandleFuture to complete.

    It cannot be cancelled itself.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Optional callable, called with this future by set_result() and
        # set_exception()
        self._done_callback = None

    def cancel(self):
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        super().set_result(result)
        callback = self._done_callback
        if callback is not None:
            callback(self)

    def set_exception(self, exception):
        super().set_exception(exception)
        callback = self._done_callback
        if callback is not None:
            callback(self)
186
187
class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Wait-handle future created by IocpProactor.wait_for_handle().

    Unregistering the wait goes through UnregisterWaitEx() with a private
    event; the proactor then waits for that event before the final cleanup
    in _unregister_wait_cb() runs.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        # Event passed to UnregisterWaitEx() by _unregister_wait()
        self._event = _overlapped.CreateEvent(None, True, False, None)
        # Future returned by the proactor's _wait_cancel() for that event
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        event = self._event
        if event is not None:
            _winapi.CloseHandle(event)
            self._event = None
            self._event_fut = None

        # A cancelled wait may never be signalled, so it has to be
        # unregistered explicitly. Otherwise, IocpProactor.close() would
        # wait forever for an event which will never come.
        #
        # If the IocpProactor already received the event, calling
        # _unregister() is still safe: the Overlapped object, which serves as
        # the unique key, is still referenced here.
        proactor = self._proactor
        proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        if not self._registered:
            return
        self._registered = False

        wait_handle, self._wait_handle = self._wait_handle, None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as exc:
            unregister_pending = (
                exc.winerror == _overlapped.ERROR_IO_PENDING)
            if not unregister_pending:
                context = {'message': 'Failed to unregister the wait handle',
                           'exception': exc,
                           'future': self}
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        # Let the proactor wait for the event; _unregister_wait_cb() is the
        # callback attached to that wait.
        self._event_fut = self._proactor._wait_cancel(
            self._event, self._unregister_wait_cb)
238
239
class PipeServer(object):
    """Server side of a named pipe.

    Comparable to a bound, listening socket.
    """
    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # _server_pipe_handle() can raise and the destructor is close(), so
        # the attributes read by close() must exist before it is called.
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current pipe instance after creating its successor.

        Until the server is closed there is therefore always at least one
        pipe handle for the address, so a client attempting to connect
        does not fail with FileNotFoundError.
        """
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Return a wrapper for a new pipe handle, or None once closed."""
        if self.closed():
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        handle = _winapi.CreateNamedPipe(
            self._address,
            open_mode,
            pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE,
            windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER,
            _winapi.NULL)
        pipe = windows_utils.PipeHandle(handle)
        # Tracked so that close() can close it if no client connects
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        """Return True once close() has cleared the address."""
        return self._address is None

    def close(self):
        """Cancel the pending accept and close the unconnected instances."""
        pending_accept = self._accept_pipe_future
        if pending_accept is not None:
            pending_accept.cancel()
            self._accept_pipe_future = None
        if self._address is None:
            return
        # Close all instances which have not been connected to by a client.
        for pipe in self._free_instances:
            pipe.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
297
298
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop for Windows.

    Adds nothing to BaseSelectorEventLoop.
    """
301
302
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, built on IOCP."""

    def __init__(self, proactor=None):
        # Default to a fresh IocpProactor when none is supplied
        super().__init__(IocpProactor() if proactor is None else proactor)

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*.

        Return a (transport, protocol) pair; the protocol comes from
        protocol_factory().
        """
        pipe = await self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting pipe clients at *address*.

        Return a one-element list holding the PipeServer.  Accepting is
        scheduled with call_soon() and re-arms itself after every client.
        """
        server = PipeServer(address)

        def loop_accept_pipe(fut=None):
            # Called first without argument, then as the done callback of
            # each accept future.
            pipe = None
            try:
                if fut:
                    pipe = fut.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # The client connected before the server was
                        # closed: drop it (close the pipe) and stop.
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                fut = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                pipe_is_open = pipe and pipe.fileno() != -1
                if pipe_is_open:
                    context = {
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    }
                    self.call_exception_handler(context)
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                # Remember the pending accept so that PipeServer.close() can
                # cancel it, and come back here when it completes.
                server._accept_pipe_future = fut
                fut.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done.

        If waiting fails, the transport is closed and waited for before the
        exception is re-raised.
        """
        waiter = self.create_future()
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell,
            stdin, stdout, stderr, bufsize,
            waiter=waiter, extra=extra,
            **kwargs)
        try:
            await waiter
        except Exception:
            transport.close()
            await transport._wait()
            raise

        return transport
381
382
class IocpProactor:
    """Proactor implementation using IOCP.

    Each operation method starts an overlapped operation and returns a
    future; the futures are completed by _poll(), which drains the
    completion port.
    """

    def __init__(self, concurrency=0xffffffff):
        # Set first so that close() -- also called by __del__() -- is a no-op
        # if creating the completion port below raises.
        self._iocp = None
        self._loop = None
        # Futures completed by _poll() and not yet returned by select()
        self._results = []
        # Maps the address of an Overlapped object to a
        # (future, overlapped, object, callback) tuple
        self._cache = {}
        # Objects already associated with the completion port
        self._registered = weakref.WeakSet()
        # Overlapped objects to remove from the cache at the end of _poll()
        self._unregistered = []
        self._stopped_serving = weakref.WeakSet()
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)

    def _check_closed(self):
        """Raise RuntimeError if the proactor has been closed."""
        if self._iocp is None:
            raise RuntimeError('IocpProactor is closed')

    def __repr__(self):
        info = ['overlapped#=%s' % len(self._cache),
                'result#=%s' % len(self._results)]
        if self._iocp is None:
            info.append('closed')
        return '<%s %s>' % (self.__class__.__name__, " ".join(info))

    def set_loop(self, loop):
        """Set the event loop used to create futures and report errors."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of completed futures, polling if it is empty."""
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    def _result(self, value):
        """Return a future which is already completed with *value*."""
        fut = self._loop.create_future()
        fut.set_result(value)
        return fut

    @staticmethod
    def _finish_transfer(trans, key, ov):
        """Completion callback shared by recv(), recv_into(), send() and
        sendfile().

        Return the result of the overlapped operation.  An OSError whose
        winerror is ERROR_NETNAME_DELETED or ERROR_OPERATION_ABORTED is
        re-raised as ConnectionResetError.
        """
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                _overlapped.ERROR_OPERATION_ABORTED):
                raise ConnectionResetError(*exc.args)
            raise

    def recv(self, conn, nbytes, flags=0):
        """Start receiving up to *nbytes* from a socket or a pipe.

        Return a future.  A BrokenPipeError raised when starting the
        operation yields a future already completed with b''.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        return self._register(ov, conn, self._finish_transfer)

    def recv_into(self, conn, buf, flags=0):
        """Start receiving into *buf* from a socket or a pipe.

        Return a future.  A BrokenPipeError raised when starting the
        operation yields a future already completed with 0.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecvInto(conn.fileno(), buf, flags)
            else:
                ov.ReadFileInto(conn.fileno(), buf)
        except BrokenPipeError:
            # Nothing was written to buf.  The result of a recv_into()
            # operation is a number of bytes, so report 0 rather than the
            # b'' used by recv().
            return self._result(0)

        return self._register(ov, conn, self._finish_transfer)

    def send(self, conn, buf, flags=0):
        """Start sending *buf* to a socket or a pipe and return a future."""
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        return self._register(ov, conn, self._finish_transfer)

    def accept(self, listener):
        """Start accepting a connection on the *listener* socket.

        Return a future whose result is a (conn, peer address) pair.  The
        pre-created accept socket is closed if the future is cancelled.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        async def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                await future
            except futures.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start connecting the socket *conn* to *address*.

        Return a future whose result is *conn*.
        """
        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def sendfile(self, sock, file, offset, count):
        """Start sending *count* bytes of *file*, from *offset*, to *sock*.

        Return a future.
        """
        self._register_with_iocp(sock)
        ov = _overlapped.Overlapped(NULL)
        # TransmitFile() takes the offset as two 32-bit halves
        offset_low = offset & 0xffff_ffff
        offset_high = (offset >> 32) & 0xffff_ffff
        ov.TransmitFile(sock.fileno(),
                        msvcrt.get_osfhandle(file.fileno()),
                        offset_low, offset_high,
                        count, 0, 0)

        return self._register(ov, sock, self._finish_transfer)

    def accept_pipe(self, pipe):
        """Start waiting for a client to connect to *pipe*.

        Return a future whose result is *pipe*.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    async def connect_pipe(self, address):
        """Connect to the pipe at *address* and return a PipeHandle.

        While the connection fails with ERROR_PIPE_BUSY it is retried after
        a sleep; the delay doubles on each retry, up to
        CONNECT_PIPE_MAX_DELAY.  Any other OSError is propagated.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to
            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY.
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
            await tasks.sleep(delay, loop=self._loop)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for *event* without timeout and return a _WaitCancelFuture.

        *done_callback* is called with the future when it completes.
        """
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on *handle* and return the matching future.

        *timeout* is in seconds (None: no timeout).  The future is a
        _WaitCancelFuture if *_is_cancel* is true, else a _WaitHandleFuture.
        """
        self._check_closed()

        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        """Associate *obj* with the completion port, once per object."""
        # To get notifications of finished ops on this objects sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        """Return a future for the overlapped operation *ov*.

        The future's value is the value returned by
        callback(transferred, key, ov).  *obj* is only stored to prevent it
        from being garbage collected too early.
        """
        self._check_closed()

        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # GetQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._check_closed()
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new socket of *family* with a timeout of 0."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Process completion events and complete the matching futures.

        Wait up to *timeout* seconds (None: no limit) for the first event,
        then handle the events which are already queued without waiting.
        Raise ValueError if *timeout* is negative or too big.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            # Only the first call may wait
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                else:
                    f.set_result(value)
                self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

    def _stop_serving(self, obj):
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel the pending operations and close the completion port.

        Block until the cache of registered operations is empty.  Calling
        close() again afterwards does nothing.
        """
        if self._iocp is None:
            # already closed
            return

        # Cancel remaining registered operations.
        for fut, ov, obj, callback in list(self._cache.values()):
            # Nothing to do with cancelled futures, and a _WaitCancelFuture
            # must not be cancelled
            if fut.cancelled() or isinstance(fut, _WaitCancelFuture):
                continue
            try:
                fut.cancel()
            except OSError as exc:
                if self._loop is not None:
                    context = {
                        'message': 'Cancelling a future failed',
                        'exception': exc,
                        'future': fut,
                    }
                    if fut._source_traceback:
                        context['source_traceback'] = fut._source_traceback
                    self._loop.call_exception_handler(context)

        # Wait until all cancelled overlapped complete: don't exit with running
        # overlapped to prevent a crash. Display progress every second if the
        # loop is still running.
        msg_update = 1.0
        start_time = time.monotonic()
        next_msg = start_time + msg_update
        while self._cache:
            if next_msg <= time.monotonic():
                logger.debug('%r is running after closing for %.1f seconds',
                             self, time.monotonic() - start_time)
                next_msg = time.monotonic() + msg_update

            # handle a few events, or timeout
            self._poll(msg_update)

        self._results = []

        _winapi.CloseHandle(self._iocp)
        self._iocp = None

    def __del__(self):
        self.close()
812
813
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport which uses the loop's proactor to detect the
    exit of the child process.
    """

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        self._proc = windows_utils.Popen(
            args,
            shell=shell,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            bufsize=bufsize,
            **kwargs)

        def on_process_exit(fut):
            # The wait on the process handle is done: report the return code
            self._process_exited(self._proc.poll())

        process_handle = int(self._proc._handle)
        wait_fut = self._loop._proactor.wait_for_handle(process_handle)
        wait_fut.add_done_callback(on_process_exit)
827
828
# Public name of the selector event loop defined in this module.
SelectorEventLoop = _WindowsSelectorEventLoop
830
831
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is SelectorEventLoop."""

    _loop_factory = SelectorEventLoop
834
835
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is ProactorEventLoop."""

    _loop_factory = ProactorEventLoop
838
839
# The default policy of this module is the selector-based one.
DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy
841