#
# We use a background thread for sharing fds on Unix, and for sharing sockets on
# Windows.
#
# A client which wants to pickle a resource registers it with the resource
# sharer and gets an identifier in return.  The unpickling process will connect
# to the resource sharer, send the identifier and its pid, and then receive
# the resource.
#

import os
import signal
import socket
import sys
import threading

from . import process
from .context import reduction
from . import util

__all__ = ['stop']


if sys.platform == 'win32':
    __all__ += ['DupSocket']

    class DupSocket(object):
        '''Picklable wrapper for a socket.'''

        def __init__(self, sock):
            '''Duplicate *sock* and register the duplicate with the sharer.'''
            new_sock = sock.dup()

            def send(conn, pid):
                # Produce the share data for process *pid* and ship it over
                # the connection as raw bytes.
                share = new_sock.share(pid)
                conn.send_bytes(share)

            # The duplicate is closed by the sharer once it has been served
            # (or when the sharer is stopped).
            self._id = _resource_sharer.register(send, new_sock.close)

        def detach(self):
            '''Get the socket.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                share = conn.recv_bytes()
                return socket.fromshare(share)

else:
    __all__ += ['DupFd']

    class DupFd(object):
        '''Wrapper for fd which can be used at any time.'''

        def __init__(self, fd):
            '''Duplicate *fd* and register the duplicate with the sharer.'''
            new_fd = os.dup(fd)

            def send(conn, pid):
                reduction.send_handle(conn, new_fd, pid)

            def close():
                os.close(new_fd)

            self._id = _resource_sharer.register(send, close)

        def detach(self):
            '''Get the fd.  This should only be called once.'''
            with _resource_sharer.get_connection(self._id) as conn:
                return reduction.recv_handle(conn)


class _ResourceSharer(object):
    '''Manager for resources using background thread.'''

    def __init__(self):
        # Last key handed out by register(); incremented for every resource.
        self._key = 0
        # Maps key -> (send, close) callbacks for resources not yet served.
        self._cache = {}
        self._lock = threading.Lock()
        # Listener, its address and the serving thread are created lazily by
        # _start() on the first register() call.
        self._listener = None
        self._address = None
        self._thread = None
        util.register_after_fork(self, _ResourceSharer._afterfork)

    def register(self, send, close):
        '''Register resource, returning an identifier.

        *send* is called as ``send(conn, pid)`` when a process asks for the
        resource; *close* is called with no arguments once the resource has
        been served or discarded.  The returned identifier is the tuple
        ``(listener_address, key)`` expected by get_connection().
        '''
        with self._lock:
            if self._address is None:
                self._start()
            self._key += 1
            self._cache[self._key] = (send, close)
            return (self._address, self._key)

    @staticmethod
    def get_connection(ident):
        '''Return connection from which to receive identified resource.

        *ident* is an ``(address, key)`` tuple as returned by register().
        The request ``(key, pid_of_this_process)`` is sent before the
        connection is handed back to the caller.
        '''
        from .connection import Client
        address, key = ident
        c = Client(address, authkey=process.current_process().authkey)
        try:
            c.send((key, os.getpid()))
        except BaseException:
            # Do not leak the connection if the request could not be sent.
            c.close()
            raise
        return c

    def stop(self, timeout=None):
        '''Stop the background thread and clear registered resources.

        *timeout* is passed to the thread's join(); if the thread is still
        alive afterwards a warning is logged and cleanup proceeds anyway.
        Does nothing if the sharer was never started.
        '''
        from .connection import Client
        with self._lock:
            if self._address is not None:
                # A message of None tells _serve() to leave its loop.
                with Client(self._address,
                            authkey=process.current_process().authkey) as c:
                    c.send(None)
                self._thread.join(timeout)
                if self._thread.is_alive():
                    util.sub_warning('_ResourceSharer thread did '
                                     'not stop when asked')
                self._listener.close()
                self._thread = None
                self._address = None
                self._listener = None
                # Release every resource that was registered but never
                # requested.
                for _send, close in self._cache.values():
                    close()
                self._cache.clear()

    def _afterfork(self):
        '''Reset state; registered with util.register_after_fork().

        Closes and forgets all cached resources, reinitialises the lock,
        closes the listener if there is one, and clears the listener,
        address and thread references.
        '''
        for _send, close in self._cache.values():
            close()
        self._cache.clear()
        self._lock._at_fork_reinit()
        if self._listener is not None:
            self._listener.close()
        self._listener = None
        self._address = None
        self._thread = None

    def _start(self):
        '''Create the listener and start the daemon serving thread.

        Called by register() with the lock held.
        '''
        from .connection import Listener
        assert self._listener is None, "Already have Listener"
        util.debug('starting listener and thread for sending handles')
        # Listener's default backlog is 1; use a deeper queue so that several
        # processes requesting resources at the same moment are queued rather
        # than refused while the serving thread is busy.
        self._listener = Listener(authkey=process.current_process().authkey,
                                  backlog=128)
        self._address = self._listener.address
        t = threading.Thread(target=self._serve)
        t.daemon = True
        t.start()
        self._thread = t

    def _serve(self):
        '''Serving loop run in the background thread.

        Each accepted connection carries either None (stop request) or a
        ``(key, destination_pid)`` tuple.  The matching resource is removed
        from the cache, sent, and then closed, so every registered resource
        is served at most once.
        '''
        # Block all signals in this thread where the platform allows it.
        if hasattr(signal, 'pthread_sigmask'):
            signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
        while True:
            try:
                with self._listener.accept() as conn:
                    msg = conn.recv()
                    if msg is None:
                        break
                    key, destination_pid = msg
                    send, close = self._cache.pop(key)
                    try:
                        send(conn, destination_pid)
                    finally:
                        close()
            except:
                # Deliberately broad: one failed request must not kill the
                # serving thread.  Report it unless the process is exiting.
                if not util.is_exiting():
                    sys.excepthook(*sys.exc_info())


_resource_sharer = _ResourceSharer()
stop = _resource_sharer.stop