#
# Module which deals with pickling of objects.
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

from abc import ABCMeta
import copyreg
import functools
import io
import os
import pickle
import socket
import sys

from . import context

__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']


# True when handles/fds can be sent to another process: always on Windows,
# otherwise only when the socket module exposes the SCM_RIGHTS machinery.
HAVE_SEND_HANDLE = (sys.platform == 'win32' or
                    (hasattr(socket, 'CMSG_LEN') and
                     hasattr(socket, 'SCM_RIGHTS') and
                     hasattr(socket.socket, 'sendmsg')))

#
# Pickler subclass
#

class ForkingPickler(pickle.Pickler):
    '''Pickler subclass used by multiprocessing.

    Each instance gets its own dispatch table: a copy of copyreg's table
    overlaid with the reducers added through register().
    '''
    # Reducers added through register(); shared by every instance.
    _extra_reducers = {}
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args, **kwds):
        '''Create the pickler.

        Positional and keyword arguments are both forwarded unchanged to
        pickle.Pickler, so e.g. ForkingPickler(file, protocol=2) works
        the same way as the equivalent pickle.Pickler call.
        '''
        super().__init__(*args, **kwds)
        # The table is built here, so reducers registered later only
        # affect picklers created after the registration.
        self.dispatch_table = self._copyreg_dispatch_table.copy()
        self.dispatch_table.update(self._extra_reducers)

    @classmethod
    def register(cls, type, reduce):
        '''Register a reduce function for a type.'''
        cls._extra_reducers[type] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle obj with this class and return the result.

        The return value is the view given by BytesIO.getbuffer(), not a
        bytes object.
        '''
        buf = io.BytesIO()
        cls(buf, protocol).dump(obj)
        return buf.getbuffer()

    loads = pickle.loads

register = ForkingPickler.register

def dump(obj, file, protocol=None):
    '''Replacement for pickle.dump() using ForkingPickler.'''
    ForkingPickler(file, protocol).dump(obj)

#
# Platform specific definitions
#

if sys.platform == 'win32':
    # Windows
    __all__ += ['DupHandle', 'duplicate', 'steal_handle']
    import _winapi

    def duplicate(handle, target_process=None, inheritable=False,
                  *, source_process=None):
        '''Duplicate a handle.  (target_process is a handle not a pid!)

        source_process and target_process both default to the current
        process.  Returns the value of _winapi.DuplicateHandle().
        '''
        current_process = _winapi.GetCurrentProcess()
        if source_process is None:
            source_process = current_process
        if target_process is None:
            target_process = current_process
        return _winapi.DuplicateHandle(
            source_process, handle, target_process,
            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)

    def steal_handle(source_pid, handle):
        '''Steal a handle from process identified by source_pid.'''
        source_process_handle = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, source_pid)
        try:
            # DUPLICATE_CLOSE_SOURCE is what makes this a "steal" rather
            # than a plain duplication.
            return _winapi.DuplicateHandle(
                source_process_handle, handle,
                _winapi.GetCurrentProcess(), 0, False,
                _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
        finally:
            # Always release the process handle opened above.
            _winapi.CloseHandle(source_process_handle)

    def send_handle(conn, handle, destination_pid):
        '''Send a handle over a local connection.'''
        dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
        conn.send(dh)

    def recv_handle(conn):
        '''Receive a handle over a local connection.'''
        return conn.recv().detach()

    class DupHandle(object):
        '''Picklable wrapper for a handle.'''
        def __init__(self, handle, access, pid=None):
            '''Duplicate handle into the process identified by pid.

            When pid is None the handle is duplicated within the current
            process instead.
            '''
            if pid is None:
                # We just duplicate the handle in the current process and
                # let the receiving process steal the handle.
                pid = os.getpid()
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
            try:
                self._handle = _winapi.DuplicateHandle(
                    _winapi.GetCurrentProcess(),
                    handle, proc, access, False, 0)
            finally:
                _winapi.CloseHandle(proc)
            self._access = access
            self._pid = pid

        def detach(self):
            '''Get the handle.  This should only be called once.'''
            # retrieve handle from process which currently owns it
            if self._pid == os.getpid():
                # The handle has already been duplicated for this process.
                return self._handle
            # We must steal the handle from the process whose pid is self._pid.
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                       self._pid)
            try:
                return _winapi.DuplicateHandle(
                    proc, self._handle, _winapi.GetCurrentProcess(),
                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
            finally:
                _winapi.CloseHandle(proc)

else:
    # Unix
    __all__ += ['DupFd', 'sendfds', 'recvfds']
    import array

    # On MacOSX we should acknowledge receipt of fds -- see Issue14669
    ACKNOWLEDGE = sys.platform == 'darwin'

    def sendfds(sock, fds):
        '''Send an array of fds over an AF_UNIX socket.

        A single data byte holding len(fds) % 256 is sent together with
        the fds as SCM_RIGHTS ancillary data.  When ACKNOWLEDGE is set,
        one byte is then read back and RuntimeError is raised unless it
        is b'A'.
        '''
        fds = array.array('i', fds)
        msg = bytes([len(fds) % 256])
        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
        if ACKNOWLEDGE and sock.recv(1) != b'A':
            raise RuntimeError('did not receive acknowledgement of fd')

    def recvfds(sock, size):
        '''Receive an array of fds over an AF_UNIX socket.

        size is the number of fds the ancillary-data buffer is sized for.
        Returns the received fds as a list of ints.

        Raises EOFError when neither data nor ancillary data arrives,
        AssertionError when the fd count disagrees with the count byte,
        and RuntimeError for any other malformed message.
        '''
        a = array.array('i')
        bytes_size = a.itemsize * size
        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
        if not msg and not ancdata:
            raise EOFError
        try:
            if ACKNOWLEDGE:
                sock.send(b'A')
            if len(ancdata) != 1:
                raise RuntimeError('received %d items of ancdata' %
                                   len(ancdata))
            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
            if (cmsg_level == socket.SOL_SOCKET and
                    cmsg_type == socket.SCM_RIGHTS):
                if len(cmsg_data) % a.itemsize != 0:
                    raise ValueError
                a.frombytes(cmsg_data)
                # The data byte carries the sender's fd count modulo 256.
                if len(a) % 256 != msg[0]:
                    raise AssertionError(
                        "Len is {0:n} but msg[0] is {1!r}".format(
                            len(a), msg[0]))
                return list(a)
        except (ValueError, IndexError):
            # Deliberately swallowed: both cases are reported through the
            # generic RuntimeError below.
            pass
        raise RuntimeError('Invalid data received')

    def send_handle(conn, handle, destination_pid):
        '''Send a handle over a local connection.

        destination_pid is not used by this (Unix) implementation.
        '''
        # Temporary AF_UNIX stream socket built from conn's file
        # descriptor; it is closed again when the with block exits.
        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
            sendfds(s, [handle])

    def recv_handle(conn):
        '''Receive a handle over a local connection.'''
        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
            return recvfds(s, 1)[0]

    def DupFd(fd):
        '''Return a wrapper for an fd.

        Uses the spawning Popen object reported by the context module
        when there is one, otherwise falls back to resource_sharer.
        Raises ValueError when HAVE_SEND_HANDLE is false.
        '''
        popen_obj = context.get_spawning_popen()
        if popen_obj is not None:
            return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
        elif HAVE_SEND_HANDLE:
            # Imported lazily, only when this branch is actually taken.
            from . import resource_sharer
            return resource_sharer.DupFd(fd)
        else:
            raise ValueError('SCM_RIGHTS appears not to be available')

#
# Try making some callable types picklable
#

def _reduce_method(m):
    '''Reduce a method object to a getattr() call on its owner.'''
    # NOTE(review): the "__self__ is None" branch looks like a leftover
    # from unbound methods -- confirm whether it is still reachable.
    if m.__self__ is None:
        return getattr, (m.__class__, m.__func__.__name__)
    else:
        return getattr, (m.__self__, m.__func__.__name__)

class _C:
    # Helper whose only purpose is to provide a method object, so that
    # the type of such objects can be registered below.
    def f(self):
        pass

register(type(_C().f), _reduce_method)


def _reduce_method_descriptor(m):
    '''Reduce a method descriptor to a getattr() call on its class.'''
    return getattr, (m.__objclass__, m.__name__)

register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)


def _reduce_partial(p):
    '''Reduce a functools.partial to its func, args and keywords.'''
    return _rebuild_partial, (p.func, p.args, p.keywords or {})

def _rebuild_partial(func, args, keywords):
    '''Rebuild a functools.partial from the parts given by _reduce_partial.'''
    return functools.partial(func, *args, **keywords)

register(functools.partial, _reduce_partial)

#
# Make sockets picklable
#

if sys.platform == 'win32':
    def _reduce_socket(s):
        '''Reduce a socket by wrapping it in a resource_sharer.DupSocket.'''
        from .resource_sharer import DupSocket
        return _rebuild_socket, (DupSocket(s),)

    def _rebuild_socket(ds):
        '''Rebuild a socket from the DupSocket made by _reduce_socket.'''
        return ds.detach()

    register(socket.socket, _reduce_socket)

else:
    def _reduce_socket(s):
        '''Reduce a socket to a DupFd wrapper plus family, type and proto.'''
        df = DupFd(s.fileno())
        return _rebuild_socket, (df, s.family, s.type, s.proto)

    def _rebuild_socket(df, family, type, proto):
        '''Rebuild a socket around the fd detached from df.'''
        fd = df.detach()
        return socket.socket(family, type, proto, fileno=fd)

    register(socket.socket, _reduce_socket)


class AbstractReducer(metaclass=ABCMeta):
    '''Abstract base class for use in implementing a Reduction class
    suitable for use in replacing the standard reduction mechanism
    used in multiprocessing.'''
    ForkingPickler = ForkingPickler
    register = register
    dump = dump
    send_handle = send_handle
    recv_handle = recv_handle

    if sys.platform == 'win32':
        steal_handle = steal_handle
        duplicate = duplicate
        DupHandle = DupHandle
    else:
        sendfds = sendfds
        recvfds = recvfds
        DupFd = DupFd

    _reduce_method = _reduce_method
    _reduce_method_descriptor = _reduce_method_descriptor
    # Exposed like the other reducers; __init__ below registers it too.
    _reduce_partial = _reduce_partial
    _rebuild_partial = _rebuild_partial
    _reduce_socket = _reduce_socket
    _rebuild_socket = _rebuild_socket

    def __init__(self, *args):
        '''Register the module's reducers again.

        The positional arguments are accepted but not used.
        '''
        register(type(_C().f), _reduce_method)
        register(type(list.append), _reduce_method_descriptor)
        register(type(int.__add__), _reduce_method_descriptor)
        register(functools.partial, _reduce_partial)
        register(socket.socket, _reduce_socket)