import io
import os

from .context import reduction, set_spawning_popen
if not reduction.HAVE_SEND_HANDLE:
    raise ImportError('No support for sending fds between processes')
from . import forkserver
from . import popen_fork
from . import spawn
from . import util


__all__ = ['Popen']


#
# Placeholder standing in for an fd while a process is being launched
#

class _DupFd:
    """Refer to an inherited file descriptor by its position in a list."""

    def __init__(self, ind):
        # Position of the descriptor within the list of inherited fds.
        self.ind = ind

    def detach(self):
        """Return entry ``ind`` of ``forkserver.get_inherited_fds()``."""
        inherited = forkserver.get_inherited_fds()
        return inherited[self.ind]


#
# Launch the child process through the fork server
#

class Popen(popen_fork.Popen):
    """Popen variant whose start method is ``'forkserver'``."""

    method = 'forkserver'
    DupFd = _DupFd

    def __init__(self, process_obj):
        # The fd list has to exist before the base initializer runs.
        # NOTE(review): presumably the base class calls _launch() from its
        # __init__ -- confirm against popen_fork.Popen.
        self._fds = []
        super().__init__(process_obj)

    def duplicate_for_child(self, fd):
        """Record *fd* for the child and return its index in the fd list."""
        index = len(self._fds)
        self._fds.append(fd)
        return index

    def _launch(self, process_obj):
        """Serialize *process_obj* and hand it to a newly connected process.

        The preparation data and the process object are dumped into an
        in-memory buffer while this instance is registered through
        ``set_spawning_popen``.  The collected fds are then passed to
        ``forkserver.connect_to_new_process``, the buffer is written to the
        returned write descriptor, and ``self.pid`` is read back from the
        sentinel descriptor.
        """
        preparation = spawn.get_preparation_data(process_obj._name)
        payload = io.BytesIO()

        set_spawning_popen(self)
        try:
            for item in (preparation, process_obj):
                reduction.dump(item, payload)
        finally:
            # Always unregister, even when serialization fails.
            set_spawning_popen(None)

        sentinel, write_fd = forkserver.connect_to_new_process(self._fds)
        self.sentinel = sentinel
        # Arrange for the sentinel descriptor to be closed via os.close.
        util.Finalize(self, os.close, (sentinel,))
        with open(write_fd, 'wb', closefd=True) as stream:
            stream.write(payload.getbuffer())
        self.pid = forkserver.read_unsigned(sentinel)

    def poll(self, flag=os.WNOHANG):
        """Return the child's return code, or None if it is not available.

        With ``flag == os.WNOHANG`` (the default) the sentinel is checked
        with a timeout of 0; any other value waits without a timeout.
        """
        if self.returncode is not None:
            return self.returncode

        from multiprocessing.connection import wait

        timeout = None if flag != os.WNOHANG else 0
        ready = wait([self.sentinel], timeout)
        if not ready:
            return None

        try:
            code = forkserver.read_unsigned(self.sentinel)
        except (OSError, EOFError):
            # The process ended abnormally, perhaps because of a signal.
            code = 255
        self.returncode = code
        return self.returncode