"""Selector and proactor event loops for Windows."""

import _overlapped
import _winapi
import errno
import math
import msvcrt
import socket
import struct
import time
import weakref

from . import events
from . import base_subprocess
from . import futures
from . import proactor_events
from . import selector_events
from . import tasks
from . import windows_utils
from .log import logger


__all__ = (
    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
    'WindowsProactorEventLoopPolicy',
)


NULL = 0
INFINITE = 0xffffffff
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100


class _OverlappedFuture(futures.Future):
    """Subclass of Future which represents an overlapped operation.

    Cancelling it will immediately cancel the overlapped operation.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            # Hide this constructor's own frame from the recorded traceback.
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Extend the base repr with the overlapped state and address."""
        info = super()._repr_info()
        if self._ov is not None:
            state = 'pending' if self._ov.pending else 'completed'
            info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation (if any) and drop the reference.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        if self._ov is None:
            return
        try:
            self._ov.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self._ov = None

    def cancel(self):
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        super().set_result(result)
        self._ov = None


class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle."""

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signalled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Extend the base repr with the handle, its state and wait handle."""
        info = super()._repr_info()
        if self._handle is not None:
            # Format the handle only once we know it is not None: the '#x'
            # format specifier is not supported by None.
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle; does nothing after the first call."""
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self):
        self._unregister_wait()
        return super().cancel()

    def set_exception(self, exception):
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        self._unregister_wait()
        super().set_result(result)


class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Subclass of Future which represents a wait for the cancellation of a
    _WaitHandleFuture using an event.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Called directly (not through add_done_callback()) with this future
        # as argument once a result or an exception is set.
        self._done_callback = None

    def cancel(self):
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        super().set_result(result)
        if self._done_callback is not None:
            self._done_callback(self)

    def set_exception(self, exception):
        super().set_exception(exception)
        if self._done_callback is not None:
            self._done_callback(self)


class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Future for a wait on a handle, registered with an IocpProactor.

    Unregistering the wait is done with UnregisterWaitEx() and an event;
    the proactor waits for that event through a _WaitCancelFuture.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        self._event = _overlapped.CreateEvent(None, True, False, None)
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        if self._event is not None:
            _winapi.CloseHandle(self._event)
            self._event = None
            self._event_fut = None

        # If the wait was cancelled, the wait may never be signalled, so
        # it's required to unregister it. Otherwise, IocpProactor.close() will
        # wait forever for an event which will never come.
        #
        # If the IocpProactor already received the event, it's safe to call
        # _unregister() because we kept a reference to the Overlapped object
        # which is used as a unique key.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Unregister the wait handle; does nothing after the first call."""
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        self._event_fut = self._proactor._wait_cancel(self._event,
                                                      self._unregister_wait_cb)


class PipeServer(object):
    """Class representing a pipe server.

    This is much like a bound, listening socket.
    """
    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # initialize the pipe attribute before calling _server_pipe_handle()
        # because this function can raise an exception and the destructor calls
        # the close() method
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        # Create new instance and return previous one.  This ensures
        # that (until the server is closed) there is always at least
        # one pipe handle for address.  Therefore if a client attempt
        # to connect it will not fail with FileNotFoundError.
        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
        return tmp

    def _server_pipe_handle(self, first):
        # Return a wrapper for a new pipe handle, or None if the server
        # is closed.
        if self.closed():
            return None
        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        h = _winapi.CreateNamedPipe(
            self._address, flags,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(h)
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        return (self._address is None)

    def close(self):
        if self._accept_pipe_future is not None:
            self._accept_pipe_future.cancel()
            self._accept_pipe_future = None
        # Close all instances which have not been connected to by a client.
        if self._address is not None:
            for pipe in self._free_instances:
                pipe.close()
            self._pipe = None
            self._address = None
            self._free_instances.clear()

    __del__ = close


class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop."""


class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Windows version of proactor event loop using IOCP."""

    def __init__(self, proactor=None):
        if proactor is None:
            proactor = IocpProactor()
        super().__init__(proactor)

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at address; return (transport, protocol)."""
        f = self._proactor.connect_pipe(address)
        pipe = await f
        protocol = protocol_factory()
        trans = self._make_duplex_pipe_transport(pipe, protocol,
                                                 extra={'addr': address})
        return trans, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the pipe at address.

        Return a list containing the PipeServer created for address.
        """
        server = PipeServer(address)

        def loop_accept_pipe(f=None):
            # Called once without argument to start accepting, then as the
            # done callback of each accept future.
            pipe = None
            try:
                if f:
                    pipe = f.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                f = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                server._accept_pipe_future = f
                f.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        waiter = self.create_future()
        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
                                             stdin, stdout, stderr, bufsize,
                                             waiter=waiter, extra=extra,
                                             **kwargs)
        try:
            await waiter
        except Exception:
            # Close the transport and wait for it before re-raising.
            transp.close()
            await transp._wait()
            raise

        return transp


class IocpProactor:
    """Proactor implementation using IOCP."""

    def __init__(self, concurrency=0xffffffff):
        self._loop = None
        self._results = []
        # Define _iocp before creating the completion port: if the creation
        # raises, __del__() calls close() which reads this attribute.
        self._iocp = None
        # Maps ov.address to a (future, ov, obj, callback) tuple.
        self._cache = {}
        self._registered = weakref.WeakSet()
        self._unregistered = []
        self._stopped_serving = weakref.WeakSet()
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)

    def _check_closed(self):
        if self._iocp is None:
            raise RuntimeError('IocpProactor is closed')

    def __repr__(self):
        info = ['overlapped#=%s' % len(self._cache),
                'result#=%s' % len(self._results)]
        if self._iocp is None:
            info.append('closed')
        return '<%s %s>' % (self.__class__.__name__, " ".join(info))

    def set_loop(self, loop):
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of completed futures, polling if there is none."""
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    def _result(self, value):
        """Return a new future of the loop already resolved to value."""
        fut = self._loop.create_future()
        fut.set_result(value)
        return fut

    @staticmethod
    def _finish_socket_func(trans, key, ov):
        """Completion callback shared by recv, recv_into, send and sendfile.

        Return the result of the overlapped operation.  An OSError whose
        winerror is ERROR_NETNAME_DELETED or ERROR_OPERATION_ABORTED is
        converted to ConnectionResetError; other errors propagate unchanged.
        """
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                _overlapped.ERROR_OPERATION_ABORTED):
                raise ConnectionResetError(*exc.args)
            else:
                raise

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped read of up to nbytes; return a future.

        The future is resolved to b'' if the read fails immediately with
        BrokenPipeError.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        return self._register(ov, conn, self._finish_socket_func)

    def recv_into(self, conn, buf, flags=0):
        """Start an overlapped read into buf; return a future.

        The future is resolved to 0 if the read fails immediately with
        BrokenPipeError.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecvInto(conn.fileno(), buf, flags)
            else:
                ov.ReadFileInto(conn.fileno(), buf)
        except BrokenPipeError:
            # The result of a "recv into" is a byte count, not a bytes
            # object: report the end of stream as 0 bytes received.
            return self._result(0)

        return self._register(ov, conn, self._finish_socket_func)

    def send(self, conn, buf, flags=0):
        """Start an overlapped write of buf; return a future."""
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        return self._register(ov, conn, self._finish_socket_func)

    def accept(self, listener):
        """Start an overlapped accept on listener; return a future.

        The result of the future is a (conn, peer address) tuple.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        async def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                await future
            except futures.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start an overlapped connect of conn to address; return a future."""
        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def sendfile(self, sock, file, offset, count):
        """Start an overlapped TransmitFile() on sock; return a future."""
        self._register_with_iocp(sock)
        ov = _overlapped.Overlapped(NULL)
        # The offset is passed as two 32-bit halves.
        offset_low = offset & 0xffff_ffff
        offset_high = (offset >> 32) & 0xffff_ffff
        ov.TransmitFile(sock.fileno(),
                        msvcrt.get_osfhandle(file.fileno()),
                        offset_low, offset_high,
                        count, 0, 0)

        return self._register(ov, sock, self._finish_socket_func)

    def accept_pipe(self, pipe):
        """Wait for a client to connect to pipe; return a future.

        The result of the future is pipe.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    async def connect_pipe(self, address):
        """Connect to the pipe at address; return a PipeHandle."""
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to
            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY.
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
            await tasks.sleep(delay, loop=self._loop)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        self._check_closed()

        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        # To get notifications of finished ops on this objects sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        self._check_closed()

        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # GetQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._check_closed()
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Dequeue completion events and resolve the matching futures.

        Wait up to timeout seconds (forever if None) for the first event,
        then drain the events which are already queued without waiting.
        Futures resolved here are appended to self._results.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            # Only the first call may block.
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

    def _stop_serving(self, obj):
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel the registered operations and close the completion port.

        Block until the cache of registered operations is empty.  Calling
        close() again afterwards does nothing.
        """
        if self._iocp is None:
            # already closed
            return

        # Cancel remaining registered operations.
        for fut, ov, obj, callback in list(self._cache.values()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        # Wait until all cancelled overlapped complete: don't exit with running
        # overlapped to prevent a crash. Display progress every second if the
        # loop is still running.
        msg_update = 1.0
        start_time = time.monotonic()
        next_msg = start_time + msg_update
        while self._cache:
            if next_msg <= time.monotonic():
                logger.debug('%r is running after closing for %.1f seconds',
                             self, time.monotonic() - start_time)
                next_msg = time.monotonic() + msg_update

            # handle a few events, or timeout
            self._poll(msg_update)

        self._results = []

        _winapi.CloseHandle(self._iocp)
        self._iocp = None

    def __del__(self):
        self.close()


class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport watching the process handle with the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def callback(f):
            returncode = self._proc.poll()
            self._process_exited(returncode)

        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
        f.add_done_callback(callback)


SelectorEventLoop = _WindowsSelectorEventLoop


class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    _loop_factory = SelectorEventLoop


class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    _loop_factory = ProactorEventLoop


DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy