1#
2# Module which deals with pickling of objects.
3#
4# multiprocessing/reduction.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# Licensed to PSF under a Contributor Agreement.
8#
9
10from abc import ABCMeta
11import copyreg
12import functools
13import io
14import os
15import pickle
16import socket
17import sys
18
19from . import context
20
# Names exported on every platform; the platform-specific section below
# appends its own additions to this list.
__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
22
23
# True on Windows unconditionally; elsewhere only when the socket module
# exposes CMSG_LEN, SCM_RIGHTS and socket.sendmsg.
HAVE_SEND_HANDLE = sys.platform == 'win32' or all(
    hasattr(owner, attr)
    for owner, attr in ((socket, 'CMSG_LEN'),
                        (socket, 'SCM_RIGHTS'),
                        (socket.socket, 'sendmsg')))
28
29#
30# Pickler subclass
31#
32
class ForkingPickler(pickle.Pickler):
    '''Pickler subclass used by multiprocessing.

    Each instance gets its own dispatch table: a copy of copyreg's table
    overlaid with the reducers added through register().
    '''
    # Reducers added via register().  The dict lives on the class, so it is
    # shared by every pickler unless a subclass defines its own.
    _extra_reducers = {}
    _copyreg_dispatch_table = copyreg.dispatch_table

    def __init__(self, *args):
        super().__init__(*args)
        # Assemble the table first, then attach it; extra reducers take
        # precedence over copyreg entries for the same type.
        table = self._copyreg_dispatch_table.copy()
        table.update(self._extra_reducers)
        self.dispatch_table = table

    @classmethod
    def register(cls, type, reduce):
        '''Register a reduce function for a type.

        Only picklers created after this call see the new reducer, because
        the table is copied when a pickler is constructed.
        '''
        cls._extra_reducers[type] = reduce

    @classmethod
    def dumps(cls, obj, protocol=None):
        '''Pickle obj with this class and return the data as a memoryview.'''
        stream = io.BytesIO()
        pickler = cls(stream, protocol)
        pickler.dump(obj)
        return stream.getbuffer()

    loads = pickle.loads
55
# Module-level shortcut for ForkingPickler.register.
register = ForkingPickler.register
57
def dump(obj, file, protocol=None):
    '''Replacement for pickle.dump() using ForkingPickler.

    Writes the pickled form of obj to file; returns None.
    '''
    pickler = ForkingPickler(file, protocol)
    pickler.dump(obj)
61
62#
63# Platform specific definitions
64#
65
66if sys.platform == 'win32':
67    # Windows
68    __all__ += ['DupHandle', 'duplicate', 'steal_handle']
69    import _winapi
70
71    def duplicate(handle, target_process=None, inheritable=False):
72        '''Duplicate a handle.  (target_process is a handle not a pid!)'''
73        if target_process is None:
74            target_process = _winapi.GetCurrentProcess()
75        return _winapi.DuplicateHandle(
76            _winapi.GetCurrentProcess(), handle, target_process,
77            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
78
79    def steal_handle(source_pid, handle):
80        '''Steal a handle from process identified by source_pid.'''
81        source_process_handle = _winapi.OpenProcess(
82            _winapi.PROCESS_DUP_HANDLE, False, source_pid)
83        try:
84            return _winapi.DuplicateHandle(
85                source_process_handle, handle,
86                _winapi.GetCurrentProcess(), 0, False,
87                _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
88        finally:
89            _winapi.CloseHandle(source_process_handle)
90
91    def send_handle(conn, handle, destination_pid):
92        '''Send a handle over a local connection.'''
93        dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
94        conn.send(dh)
95
96    def recv_handle(conn):
97        '''Receive a handle over a local connection.'''
98        return conn.recv().detach()
99
100    class DupHandle(object):
101        '''Picklable wrapper for a handle.'''
102        def __init__(self, handle, access, pid=None):
103            if pid is None:
104                # We just duplicate the handle in the current process and
105                # let the receiving process steal the handle.
106                pid = os.getpid()
107            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
108            try:
109                self._handle = _winapi.DuplicateHandle(
110                    _winapi.GetCurrentProcess(),
111                    handle, proc, access, False, 0)
112            finally:
113                _winapi.CloseHandle(proc)
114            self._access = access
115            self._pid = pid
116
117        def detach(self):
118            '''Get the handle.  This should only be called once.'''
119            # retrieve handle from process which currently owns it
120            if self._pid == os.getpid():
121                # The handle has already been duplicated for this process.
122                return self._handle
123            # We must steal the handle from the process whose pid is self._pid.
124            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
125                                       self._pid)
126            try:
127                return _winapi.DuplicateHandle(
128                    proc, self._handle, _winapi.GetCurrentProcess(),
129                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
130            finally:
131                _winapi.CloseHandle(proc)
132
133else:
134    # Unix
135    __all__ += ['DupFd', 'sendfds', 'recvfds']
136    import array
137
138    # On MacOSX we should acknowledge receipt of fds -- see Issue14669
139    ACKNOWLEDGE = sys.platform == 'darwin'
140
141    def sendfds(sock, fds):
142        '''Send an array of fds over an AF_UNIX socket.'''
143        fds = array.array('i', fds)
144        msg = bytes([len(fds) % 256])
145        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
146        if ACKNOWLEDGE and sock.recv(1) != b'A':
147            raise RuntimeError('did not receive acknowledgement of fd')
148
149    def recvfds(sock, size):
150        '''Receive an array of fds over an AF_UNIX socket.'''
151        a = array.array('i')
152        bytes_size = a.itemsize * size
153        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_SPACE(bytes_size))
154        if not msg and not ancdata:
155            raise EOFError
156        try:
157            if ACKNOWLEDGE:
158                sock.send(b'A')
159            if len(ancdata) != 1:
160                raise RuntimeError('received %d items of ancdata' %
161                                   len(ancdata))
162            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
163            if (cmsg_level == socket.SOL_SOCKET and
164                cmsg_type == socket.SCM_RIGHTS):
165                if len(cmsg_data) % a.itemsize != 0:
166                    raise ValueError
167                a.frombytes(cmsg_data)
168                if len(a) % 256 != msg[0]:
169                    raise AssertionError(
170                        "Len is {0:n} but msg[0] is {1!r}".format(
171                            len(a), msg[0]))
172                return list(a)
173        except (ValueError, IndexError):
174            pass
175        raise RuntimeError('Invalid data received')
176
177    def send_handle(conn, handle, destination_pid):
178        '''Send a handle over a local connection.'''
179        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
180            sendfds(s, [handle])
181
182    def recv_handle(conn):
183        '''Receive a handle over a local connection.'''
184        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
185            return recvfds(s, 1)[0]
186
187    def DupFd(fd):
188        '''Return a wrapper for an fd.'''
189        popen_obj = context.get_spawning_popen()
190        if popen_obj is not None:
191            return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
192        elif HAVE_SEND_HANDLE:
193            from . import resource_sharer
194            return resource_sharer.DupFd(fd)
195        else:
196            raise ValueError('SCM_RIGHTS appears not to be available')
197
198#
199# Try making some callable types picklable
200#
201
def _reduce_method(m):
    '''Reduce a bound method to a getattr() call on its owner.

    The owner is m.__self__, or m.__class__ when m.__self__ is None.
    '''
    owner = m.__self__
    if owner is None:
        owner = m.__class__
    return getattr, (owner, m.__func__.__name__)
# Throwaway class used only to obtain the type of a bound method.
class _C:
    def f(self):
        pass
# Objects of that bound-method type are reduced with _reduce_method.
register(type(_C().f), _reduce_method)
211
212
def _reduce_method_descriptor(m):
    '''Reduce a method descriptor to getattr(owning class, name).'''
    owner, attr_name = m.__objclass__, m.__name__
    return getattr, (owner, attr_name)
# In CPython these are two distinct descriptor types (a built-in method
# descriptor and a slot wrapper); both use the same reducer.
register(type(list.append), _reduce_method_descriptor)
register(type(int.__add__), _reduce_method_descriptor)
217
218
def _reduce_partial(p):
    '''Reduce a functools.partial to (_rebuild_partial, (func, args, keywords)).'''
    # A falsy keywords value is replaced by a fresh empty dict.
    keywords = p.keywords if p.keywords else {}
    return _rebuild_partial, (p.func, p.args, keywords)
def _rebuild_partial(func, args, keywords):
    '''Recreate a functools.partial from its function, args and keywords.'''
    rebuilt = functools.partial(func, *args, **keywords)
    return rebuilt
# Pickle functools.partial objects through _reduce_partial.
register(functools.partial, _reduce_partial)
224
225#
226# Make sockets picklable
227#
228
if sys.platform == 'win32':
    def _reduce_socket(s):
        '''Reduce a socket by wrapping it in resource_sharer.DupSocket.'''
        from . import resource_sharer
        return _rebuild_socket, (resource_sharer.DupSocket(s),)

    def _rebuild_socket(ds):
        '''Return the result of detaching the wrapper object ds.'''
        return ds.detach()

    register(socket.socket, _reduce_socket)

else:
    def _reduce_socket(s):
        '''Reduce a socket to a DupFd wrapper plus family, type and proto.'''
        wrapped_fd = DupFd(s.fileno())
        return _rebuild_socket, (wrapped_fd, s.family, s.type, s.proto)

    def _rebuild_socket(df, family, type, proto):
        '''Build a socket around the fd obtained from df.detach().'''
        return socket.socket(family, type, proto, fileno=df.detach())

    register(socket.socket, _reduce_socket)
245
246
class AbstractReducer(metaclass=ABCMeta):
    '''Abstract base class for use in implementing a Reduction class
    suitable for use in replacing the standard reduction mechanism
    used in multiprocessing.

    The class attributes below re-export this module's functions and
    classes under the same names.
    '''
    ForkingPickler = ForkingPickler
    register = register
    dump = dump
    send_handle = send_handle
    recv_handle = recv_handle

    # Platform-specific helpers, mirroring what the module defines.
    if sys.platform != 'win32':
        sendfds = sendfds
        recvfds = recvfds
        DupFd = DupFd
    else:
        steal_handle = steal_handle
        duplicate = duplicate
        DupHandle = DupHandle

    _reduce_method = _reduce_method
    _reduce_method_descriptor = _reduce_method_descriptor
    _rebuild_partial = _rebuild_partial
    _reduce_socket = _reduce_socket
    _rebuild_socket = _rebuild_socket

    def __init__(self, *args):
        # Register the module's reducers again, in the same order the
        # module registers them.  The names here resolve to the
        # module-level objects, not to the class attributes above.
        reducers = (
            (type(_C().f), _reduce_method),
            (type(list.append), _reduce_method_descriptor),
            (type(int.__add__), _reduce_method_descriptor),
            (functools.partial, _reduce_partial),
            (socket.socket, _reduce_socket),
        )
        for kind, reducer in reducers:
            register(kind, reducer)
278