#
# Module which deals with pickling of objects.
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

from abc import ABCMeta
import copyreg
import functools
import io
import os
import pickle
import socket
import sys

from . import context

__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']


# True on Windows; elsewhere only when the socket module exposes everything
# that sendfds()/recvfds() need in order to pass fds with SCM_RIGHTS.
HAVE_SEND_HANDLE = (sys.platform == 'win32' or
                    (hasattr(socket, 'CMSG_LEN') and
                     hasattr(socket, 'SCM_RIGHTS') and
                     hasattr(socket.socket, 'sendmsg')))

#
# Pickler subclass
#

class ForkingPickler(pickle.Pickler):
    '''Pickler subclass used by multiprocessing.

    Every instance gets its own dispatch table: a copy of copyreg's table
    updated with the reducers that were added through register().
    '''
    # Class-level mapping of type -> reduce function filled in by register().
    _extra_reducers = {}
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args):
        '''Create the pickler; *args are passed through to pickle.Pickler.'''
        super().__init__(*args)
        # Work on a copy so the update below does not modify copyreg's table.
        self.dispatch_table = self._copyreg_dispatch_table.copy()
        self.dispatch_table.update(self._extra_reducers)

    @classmethod
    def register(cls, type, reduce):
        '''Register a reduce function for a type.'''
        cls._extra_reducers[type] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle obj using this class and return the data.

        The result is the memoryview returned by BytesIO.getbuffer(),
        not a bytes object.
        '''
        buf = io.BytesIO()
        cls(buf, protocol).dump(obj)
        return buf.getbuffer()

    loads = pickle.loads

register = ForkingPickler.register

def dump(obj, file, protocol=None):
    '''Replacement for pickle.dump() using ForkingPickler.'''
    ForkingPickler(file, protocol).dump(obj)

#
# Platform specific definitions
#

if sys.platform == 'win32':
    # Windows
    __all__ += ['DupHandle', 'duplicate', 'steal_handle']
    import _winapi

    def duplicate(handle, target_process=None, inheritable=False,
                  *, source_process=None):
        '''Duplicate a handle.  (target_process is a handle not a pid!)

        handle is interpreted in source_process and the duplicate is
        created in target_process.  Both are process handles and each
        defaults to the current process when None.  The duplicate has
        the same access rights as the original.
        '''
        current_process = _winapi.GetCurrentProcess()
        if source_process is None:
            source_process = current_process
        if target_process is None:
            target_process = current_process
        return _winapi.DuplicateHandle(
            source_process, handle, target_process,
            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)

    def steal_handle(source_pid, handle):
        '''Steal a handle from process identified by source_pid.

        The handle is duplicated into the current process and, because of
        DUPLICATE_CLOSE_SOURCE, closed in the source process.
        '''
        source_process_handle = _winapi.OpenProcess(
            _winapi.PROCESS_DUP_HANDLE, False, source_pid)
        try:
            return _winapi.DuplicateHandle(
                source_process_handle, handle,
                _winapi.GetCurrentProcess(), 0, False,
                _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
        finally:
            # The process handle is only needed for the duplication itself.
            _winapi.CloseHandle(source_process_handle)

    def send_handle(conn, handle, destination_pid):
        '''Send a handle over a local connection.

        The handle is wrapped in a DupHandle targeted at destination_pid
        and that wrapper is sent with conn.send().
        '''
        dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
        conn.send(dh)

    def recv_handle(conn):
        '''Receive a handle over a local connection.'''
        return conn.recv().detach()

    class DupHandle(object):
        '''Picklable wrapper for a handle.'''
        def __init__(self, handle, access, pid=None):
            '''Duplicate handle into the process identified by pid.

            access is passed to DuplicateHandle as the desired access.
            '''
            if pid is None:
                # We just duplicate the handle in the current process and
                # let the receiving process steal the handle.
                pid = os.getpid()
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
            try:
                self._handle = _winapi.DuplicateHandle(
                    _winapi.GetCurrentProcess(),
                    handle, proc, access, False, 0)
            finally:
                _winapi.CloseHandle(proc)
            self._access = access
            self._pid = pid

        def detach(self):
            '''Get the handle.  This should only be called once.'''
            # retrieve handle from process which currently owns it
            if self._pid == os.getpid():
                # The handle has already been duplicated for this process.
                return self._handle
            # We must steal the handle from the process whose pid is self._pid.
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                       self._pid)
            try:
                return _winapi.DuplicateHandle(
                    proc, self._handle, _winapi.GetCurrentProcess(),
                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
            finally:
                _winapi.CloseHandle(proc)

else:
    # Unix
    __all__ += ['DupFd', 'sendfds', 'recvfds']
    import array

    # On MacOSX we should acknowledge receipt of fds -- see Issue14669
    ACKNOWLEDGE = sys.platform == 'darwin'

    def sendfds(sock, fds):
        '''Send an array of fds over an AF_UNIX socket.

        The fds travel as SCM_RIGHTS ancillary data; the single data byte
        holds len(fds) % 256 so that recvfds() can sanity check the count.
        Raises RuntimeError if an expected acknowledgement is not received.
        '''
        fds = array.array('i', fds)
        msg = bytes([len(fds) % 256])
        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
        if ACKNOWLEDGE and sock.recv(1) != b'A':
            raise RuntimeError('did not receive acknowledgement of fd')

    def recvfds(sock, size):
        '''Receive an array of fds over an AF_UNIX socket.

        size is the number of fds to reserve ancillary buffer space for.
        Returns the received fds as a list of ints.  Raises EOFError when
        neither data nor ancillary data arrives, and RuntimeError or
        AssertionError when what arrives is not what sendfds() produces.
        '''
        a = array.array('i')
        bytes_size = a.itemsize * size
        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
        if not msg and not ancdata:
            raise EOFError
        try:
            if ACKNOWLEDGE:
                sock.send(b'A')
            if len(ancdata) != 1:
                raise RuntimeError('received %d items of ancdata' %
                                   len(ancdata))
            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
            if (cmsg_level == socket.SOL_SOCKET and
                cmsg_type == socket.SCM_RIGHTS):
                if len(cmsg_data) % a.itemsize != 0:
                    raise ValueError
                a.frombytes(cmsg_data)
                # The data byte is the sender's fd count modulo 256.
                if len(a) % 256 != msg[0]:
                    raise AssertionError(
                        "Len is {0:n} but msg[0] is {1!r}".format(
                            len(a), msg[0]))
                return list(a)
        except (ValueError, IndexError):
            # Malformed payload (or empty msg): reported by the raise below.
            pass
        raise RuntimeError('Invalid data received')

    def send_handle(conn, handle, destination_pid):
        '''Send a handle over a local connection.

        destination_pid is not used by this implementation.
        '''
        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
            sendfds(s, [handle])

    def recv_handle(conn):
        '''Receive a handle over a local connection.'''
        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
            return recvfds(s, 1)[0]

    def DupFd(fd):
        '''Return a wrapper for an fd.

        Uses the spawning popen object when context reports one, otherwise
        resource_sharer.DupFd if HAVE_SEND_HANDLE is true.  Raises
        ValueError when neither mechanism is available.
        '''
        popen_obj = context.get_spawning_popen()
        if popen_obj is not None:
            return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
        elif HAVE_SEND_HANDLE:
            from . import resource_sharer
            return resource_sharer.DupFd(fd)
        else:
            raise ValueError('SCM_RIGHTS appears not to be available')

#
# Try making some callable types picklable
#

def _reduce_method(m):
    '''Reduce a method to a getattr() call on the object it is bound to.'''
    # NOTE(review): bound methods normally have a non-None __self__ in
    # Python 3, so the first branch looks like a leftover -- confirm
    # before removing it.
    if m.__self__ is None:
        return getattr, (m.__class__, m.__func__.__name__)
    else:
        return getattr, (m.__self__, m.__func__.__name__)


class _C:
    # Exists only to obtain the type of a bound method for registration.
    def f(self):
        pass


register(type(_C().f), _reduce_method)


def _reduce_method_descriptor(m):
    '''Reduce a method/wrapper descriptor to getattr(owner, name).'''
    return getattr, (m.__objclass__, m.__name__)


register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)


def _reduce_partial(p):
    '''Reduce a functools.partial to its func, args and keywords.'''
    return _rebuild_partial, (p.func, p.args, p.keywords or {})


def _rebuild_partial(func, args, keywords):
    '''Recreate a functools.partial from the pieces of _reduce_partial().'''
    return functools.partial(func, *args, **keywords)


register(functools.partial, _reduce_partial)

#
# Make sockets picklable
#

if sys.platform == 'win32':
    def _reduce_socket(s):
        '''Reduce a socket by wrapping it in resource_sharer.DupSocket.'''
        from .resource_sharer import DupSocket
        return _rebuild_socket, (DupSocket(s),)

    def _rebuild_socket(ds):
        '''Recreate a socket by detaching the received wrapper.'''
        return ds.detach()

    register(socket.socket, _reduce_socket)

else:
    def _reduce_socket(s):
        '''Reduce a socket to a DupFd wrapper plus family, type and proto.'''
        df = DupFd(s.fileno())
        return _rebuild_socket, (df, s.family, s.type, s.proto)

    def _rebuild_socket(df, family, type, proto):
        '''Recreate a socket around the fd detached from df.'''
        fd = df.detach()
        return socket.socket(family, type, proto, fileno=fd)

    register(socket.socket, _reduce_socket)


class AbstractReducer(metaclass=ABCMeta):
    '''Abstract base class for use in implementing a Reduction class
    suitable for use in replacing the standard reduction mechanism
    used in multiprocessing.'''
    # Re-export this module's public helpers as class attributes.
    ForkingPickler = ForkingPickler
    register = register
    dump = dump
    send_handle = send_handle
    recv_handle = recv_handle

    # Platform specific helpers, mirroring the module-level definitions.
    if sys.platform == 'win32':
        steal_handle = steal_handle
        duplicate = duplicate
        DupHandle = DupHandle
    else:
        sendfds = sendfds
        recvfds = recvfds
        DupFd = DupFd

    _reduce_method = _reduce_method
    _reduce_method_descriptor = _reduce_method_descriptor
    _rebuild_partial = _rebuild_partial
    _reduce_socket = _reduce_socket
    _rebuild_socket = _rebuild_socket

    def __init__(self, *args):
        '''Register this module's reducers again; *args are ignored.'''
        register(type(_C().f), _reduce_method)
        register(type(list.append), _reduce_method_descriptor)
        register(type(int.__add__), _reduce_method_descriptor)
        register(functools.partial, _reduce_partial)
        register(socket.socket, _reduce_socket)