1#
2# Module providing manager classes for dealing
3# with shared objects
4#
5# multiprocessing/managers.py
6#
7# Copyright (c) 2006-2008, R Oudkerk
8# Licensed to PSF under a Contributor Agreement.
9#
10
11__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token',
12            'SharedMemoryManager' ]
13
14#
15# Imports
16#
17
18import sys
19import threading
20import signal
21import array
22import queue
23import time
24import types
25import os
26from os import getpid
27
28from traceback import format_exc
29
30from . import connection
31from .context import reduction, get_spawning_popen, ProcessError
32from . import pool
33from . import process
34from . import util
35from . import get_context
36try:
37    from . import shared_memory
38    HAS_SHMEM = True
39except ImportError:
40    HAS_SHMEM = False
41
42#
43# Register some things for pickling
44#
45
def reduce_array(a):
    '''
    Pickle helper: describe array `a` as (array.array, (typecode, raw bytes))
    '''
    typecode = a.typecode
    raw = a.tobytes()
    return array.array, (typecode, raw)
reduction.register(array.array, reduce_array)

# Types of the objects returned by dict.items(), dict.keys(), dict.values()
view_types = [type({}.items()), type({}.keys()), type({}.values())]
if view_types[0] is not list:
    # These methods return view objects rather than lists here, so register
    # a reducer that pickles each kind of view as a plain list.
    def rebuild_as_list(obj):
        contents = list(obj)
        return list, (contents,)
    for view_type in view_types:
        reduction.register(view_type, rebuild_as_list)
56
57#
58# Type for identifying shared objects
59#
60
class Token(object):
    '''
    Identifies a shared object by type id, manager address and object id
    '''
    __slots__ = ('typeid', 'address', 'id')

    def __init__(self, typeid, address, id):
        self.typeid = typeid
        self.address = address
        self.id = id

    def __getstate__(self):
        # State is an explicit 3-tuple, mirrored by __setstate__()
        state = (self.typeid, self.address, self.id)
        return state

    def __setstate__(self, state):
        # Unpack first so a malformed state leaves the token untouched
        typeid, address, ident = state
        self.typeid = typeid
        self.address = address
        self.id = ident

    def __repr__(self):
        fields = (self.__class__.__name__,
                  self.typeid, self.address, self.id)
        return '%s(typeid=%r, address=%r, id=%r)' % fields
79
80#
81# Function for communication with a manager's server process
82#
83
def dispatch(c, id, methodname, args=(), kwds={}):
    '''
    Send one request over connection `c` and return the manager's reply

    The request is the tuple (id, methodname, args, kwds).  A '#RETURN'
    reply yields its payload; any other reply kind is turned into an
    exception by convert_to_error() and raised.
    '''
    request = (id, methodname, args, kwds)
    c.send(request)
    kind, result = c.recv()
    if kind != '#RETURN':
        raise convert_to_error(kind, result)
    return result
93
def convert_to_error(kind, result):
    '''
    Build the exception object that corresponds to a non-'#RETURN' reply

    '#ERROR' replies already carry an exception, which is returned as is.
    '#TRACEBACK' and '#UNSERIALIZABLE' replies must carry a str (TypeError
    is raised otherwise) and become a RemoteError.  Any other kind yields
    a ValueError.  The exception is returned, not raised.
    '''
    if kind == '#ERROR':
        return result

    if kind not in ('#TRACEBACK', '#UNSERIALIZABLE'):
        return ValueError('Unrecognized message type {!r}'.format(kind))

    if not isinstance(result, str):
        raise TypeError(
            "Result {0!r} (kind '{1}') type is {2}, not str".format(
                result, kind, type(result)))

    if kind == '#UNSERIALIZABLE':
        return RemoteError('Unserializable message: %s\n' % result)
    return RemoteError(result)
108
class RemoteError(Exception):
    '''
    Exception whose first argument is text shown between two rule lines
    '''
    def __str__(self):
        rule = '-'*75
        body = str(self.args[0])
        return '\n' + rule + '\n' + body + rule
112
113#
114# Functions for finding the method names of an object
115#
116
def all_methods(obj):
    '''
    Return the names from dir(obj) whose attribute value is callable
    '''
    return [name for name in dir(obj) if callable(getattr(obj, name))]
127
def public_methods(obj):
    '''
    Return the names of the methods of `obj` that do not begin with '_'
    '''
    candidates = all_methods(obj)
    return [method for method in candidates if method[0] != '_']
133
134#
135# Server which is run in a process controlled by a manager
136#
137
class Server(object):
    '''
    Server class which runs in a process controlled by a manager object

    handle_request() authenticates each new connection and runs the single
    request it carries, which must name one of the methods in `public`.
    A connection that asks for 'accept_connection' is then kept open and
    served by serve_client(), which forwards method calls to the shared
    objects stored in `id_to_obj`.
    '''
    # The only method names handle_request() will run on behalf of a client.
    public = ['shutdown', 'create', 'accept_connection', 'get_methods',
              'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref']

    def __init__(self, registry, address, authkey, serializer):
        '''
        Bind a listener to `address` and set up empty bookkeeping

        `registry` maps each typeid to the tuple
        (callable, exposed, method_to_typeid, proxytype) read by create().
        `serializer` selects the (Listener, Client) pair from
        `listener_client`.  Raises TypeError if `authkey` is not bytes.
        '''
        if not isinstance(authkey, bytes):
            raise TypeError(
                "Authkey {0!r} is type {1!s}, not bytes".format(
                    authkey, type(authkey)))
        self.registry = registry
        self.authkey = process.AuthenticationString(authkey)
        Listener, Client = listener_client[serializer]

        # do authentication later
        self.listener = Listener(address=address, backlog=16)
        self.address = self.listener.address

        # ident -> (obj, exposed method names, method_to_typeid).  The '0'
        # entry is a placeholder and is not counted as a shared object.
        self.id_to_obj = {'0': (None, ())}
        # ident -> number of references currently held on the object
        self.id_to_refcount = {}
        # ident -> id_to_obj entry kept for proxies rebuilt inside this
        # server process (filled in by RebuildProxy)
        self.id_to_local_proxy_obj = {}
        self.mutex = threading.Lock()

    def serve_forever(self):
        '''
        Run the server until the stop event is set, then call sys.exit(0)

        Connections are accepted in a daemon thread; this thread only waits
        for shutdown() to set `stop_event`.
        '''
        self.stop_event = threading.Event()
        # Lets RebuildProxy detect that it is running inside this server
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__: # what about stderr?
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)

    def accepter(self):
        '''
        Accept connections forever, handling each one in a new daemon thread
        '''
        while True:
            try:
                c = self.listener.accept()
            except OSError:
                continue
            t = threading.Thread(target=self.handle_request, args=(c,))
            t.daemon = True
            t.start()

    def handle_request(self, c):
        '''
        Handle a new connection

        Authenticates the peer, reads one request and replies with
        ('#RETURN', value) or ('#TRACEBACK', text).  The connection is
        always closed when this method is left, including when the handler
        exits through SystemExit (serve_client() calls sys.exit()).
        '''
        funcname = result = request = None
        try:
            try:
                connection.deliver_challenge(c, self.authkey)
                connection.answer_challenge(c, self.authkey)
                request = c.recv()
                ignore, funcname, args, kwds = request
                # Explicit check instead of `assert`: this is what restricts
                # remote callers to `public`, so it must survive `python -O`.
                if funcname not in self.public:
                    raise AssertionError('%r unrecognized' % funcname)
                func = getattr(self, funcname)
            except Exception:
                msg = ('#TRACEBACK', format_exc())
            else:
                try:
                    result = func(c, *args, **kwds)
                except Exception:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    msg = ('#RETURN', result)
            try:
                c.send(msg)
            except Exception as e:
                # The reply itself could not be sent; try to report why
                try:
                    c.send(('#TRACEBACK', format_exc()))
                except Exception:
                    pass
                util.info('Failure to send message: %r', msg)
                util.info(' ... request was %r', request)
                util.info(' ... exception was %r', e)
        finally:
            c.close()

    def serve_client(self, conn):
        '''
        Handle requests from the proxies in a particular process/thread

        Each request is (ident, methodname, args, kwds).  The reply is one
        of ('#RETURN', value), ('#PROXY', (exposed, token)), ('#ERROR', exc)
        or ('#TRACEBACK', text).  The thread exits via sys.exit() when the
        peer closes the connection or a reply cannot be sent.
        '''
        util.debug('starting server thread to service %r',
                   threading.current_thread().name)

        recv = conn.recv
        send = conn.send
        id_to_obj = self.id_to_obj

        while not self.stop_event.is_set():

            try:
                methodname = obj = None
                request = recv()
                ident, methodname, args, kwds = request
                try:
                    obj, exposed, gettypeid = id_to_obj[ident]
                except KeyError as ke:
                    # Fall back to objects kept alive only by proxies that
                    # live inside the server process
                    try:
                        obj, exposed, gettypeid = \
                            self.id_to_local_proxy_obj[ident]
                    except KeyError:
                        raise ke

                if methodname not in exposed:
                    raise AttributeError(
                        'method %r of %r object is not in exposed=%r' %
                        (methodname, type(obj), exposed)
                        )

                function = getattr(obj, methodname)

                try:
                    res = function(*args, **kwds)
                except Exception as e:
                    # Exceptions raised by the referent go back as objects
                    msg = ('#ERROR', e)
                else:
                    typeid = gettypeid and gettypeid.get(methodname, None)
                    if typeid:
                        # The result must itself be shared: register it and
                        # reply with a token instead of the value
                        rident, rexposed = self.create(conn, typeid, res)
                        token = Token(typeid, self.address, rident)
                        msg = ('#PROXY', (rexposed, token))
                    else:
                        msg = ('#RETURN', res)

            except AttributeError:
                if methodname is None:
                    msg = ('#TRACEBACK', format_exc())
                else:
                    # Unexposed or missing method: see whether one of the
                    # generic fallbacks covers it
                    try:
                        fallback_func = self.fallback_mapping[methodname]
                        result = fallback_func(
                            self, conn, ident, obj, *args, **kwds
                            )
                        msg = ('#RETURN', result)
                    except Exception:
                        msg = ('#TRACEBACK', format_exc())

            except EOFError:
                util.debug('got EOF -- exiting thread serving %r',
                           threading.current_thread().name)
                sys.exit(0)

            except Exception:
                msg = ('#TRACEBACK', format_exc())

            try:
                try:
                    send(msg)
                except Exception:
                    send(('#UNSERIALIZABLE', format_exc()))
            except Exception as e:
                util.info('exception in thread serving %r',
                        threading.current_thread().name)
                util.info(' ... message was %r', msg)
                util.info(' ... exception was %r', e)
                conn.close()
                sys.exit(1)

    def fallback_getvalue(self, conn, ident, obj):
        '''
        Fallback for '#GETVALUE': return the referent itself
        '''
        return obj

    def fallback_str(self, conn, ident, obj):
        '''
        Fallback for '__str__': return str() of the referent
        '''
        return str(obj)

    def fallback_repr(self, conn, ident, obj):
        '''
        Fallback for '__repr__': return repr() of the referent
        '''
        return repr(obj)

    # Method names serve_client() can answer even when they are not exposed.
    # The values are plain functions, so `self` is passed explicitly.
    fallback_mapping = {
        '__str__':fallback_str,
        '__repr__':fallback_repr,
        '#GETVALUE':fallback_getvalue
        }

    def dummy(self, c):
        '''
        Do nothing; a request for this only exercises the connection
        '''
        pass

    def debug_info(self, c):
        '''
        Return some info --- useful to spot problems with refcounting

        The result is one text entry per refcounted ident, in sorted order,
        giving its refcount and the first 75 characters of str(obj).
        '''
        # Perhaps include debug info about 'c'?
        with self.mutex:
            result = []
            for ident in sorted(self.id_to_refcount):
                if ident != '0':
                    result.append('  %s:       refcount=%s\n    %s' %
                                  (ident, self.id_to_refcount[ident],
                                   str(self.id_to_obj[ident][0])[:75]))
            return '\n'.join(result)

    def number_of_objects(self, c):
        '''
        Number of shared objects
        '''
        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
        return len(self.id_to_refcount)

    def shutdown(self, c):
        '''
        Shutdown this process

        Acknowledges the request on `c` and sets the stop event that
        serve_forever() is waiting on.
        '''
        try:
            util.debug('manager received shutdown message')
            c.send(('#RETURN', None))
        except:
            # Best effort: whatever went wrong while acknowledging, report
            # it and still stop the server in the `finally` clause.
            import traceback
            traceback.print_exc()
        finally:
            self.stop_event.set()

    def create(self, c, typeid, /, *args, **kwds):
        '''
        Create a new shared object and return its id

        Returns (ident, exposed) where `exposed` is a tuple of method names.
        The new object starts with one reference.  Raises ValueError when
        the typeid has no callable and the arguments are not exactly one
        positional object, and TypeError when the registered
        method_to_typeid is not a dict.
        '''
        with self.mutex:
            callable, exposed, method_to_typeid, proxytype = \
                      self.registry[typeid]

            if callable is None:
                # No factory registered: the caller supplies the object
                if kwds or (len(args) != 1):
                    raise ValueError(
                        "Without callable, must have one non-keyword argument")
                obj = args[0]
            else:
                obj = callable(*args, **kwds)

            if exposed is None:
                exposed = public_methods(obj)
            if method_to_typeid is not None:
                if not isinstance(method_to_typeid, dict):
                    raise TypeError(
                        "Method_to_typeid {0!r}: type {1!s}, not dict".format(
                            method_to_typeid, type(method_to_typeid)))
                exposed = list(exposed) + list(method_to_typeid)

            ident = '%x' % id(obj)  # convert to string because xmlrpclib
                                    # only has 32 bit signed integers
            util.debug('%r callable returned object with id %r', typeid, ident)

            self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
            if ident not in self.id_to_refcount:
                self.id_to_refcount[ident] = 0

        # incref() takes the mutex itself, so call it after releasing it
        self.incref(c, ident)
        return ident, tuple(exposed)

    def get_methods(self, c, token):
        '''
        Return the methods of the shared object indicated by token
        '''
        return tuple(self.id_to_obj[token.id][1])

    def accept_connection(self, c, name):
        '''
        Spawn a new thread to serve this connection

        The current thread is renamed to `name`, the request is
        acknowledged and the connection is then served by serve_client().
        '''
        threading.current_thread().name = name
        c.send(('#RETURN', None))
        self.serve_client(c)

    def incref(self, c, ident):
        '''
        Add one reference to the object `ident`

        Raises KeyError if the ident is neither refcounted nor stashed in
        `id_to_local_proxy_obj`.
        '''
        with self.mutex:
            try:
                self.id_to_refcount[ident] += 1
            except KeyError as ke:
                # If no external references exist but an internal (to the
                # manager) still does and a new external reference is created
                # from it, restore the manager's tracking of it from the
                # previously stashed internal ref.
                if ident in self.id_to_local_proxy_obj:
                    self.id_to_refcount[ident] = 1
                    self.id_to_obj[ident] = \
                        self.id_to_local_proxy_obj[ident]
                    util.debug('Server re-enabled tracking & INCREF %r', ident)
                else:
                    raise ke

    def decref(self, c, ident):
        '''
        Drop one reference to the object `ident`; dispose of it at zero

        Raises AssertionError if the stored refcount is not positive.
        '''
        if ident not in self.id_to_refcount and \
            ident in self.id_to_local_proxy_obj:
            util.debug('Server DECREF skipping %r', ident)
            return

        with self.mutex:
            if self.id_to_refcount[ident] <= 0:
                raise AssertionError(
                    "Id {0!s} ({1!r}) has refcount {2:n}, not 1+".format(
                        ident, self.id_to_obj[ident],
                        self.id_to_refcount[ident]))
            self.id_to_refcount[ident] -= 1
            if self.id_to_refcount[ident] == 0:
                del self.id_to_refcount[ident]

        if ident not in self.id_to_refcount:
            # Two-step process in case the object turns out to contain other
            # proxy objects (e.g. a managed list of managed lists).
            # Otherwise, deleting self.id_to_obj[ident] would trigger the
            # deleting of the stored value (another managed object) which would
            # in turn attempt to acquire the mutex that is already held here.
            self.id_to_obj[ident] = (None, (), None)  # thread-safe
            util.debug('disposing of obj with id %r', ident)
            with self.mutex:
                del self.id_to_obj[ident]
458
459
460#
461# Class to represent state of a manager
462#
463
class State(object):
    '''
    Holder for a manager's lifecycle state, kept in the `value` attribute
    '''
    __slots__ = ['value']
    # Values that `value` takes: not started, running, shut down
    INITIAL, STARTED, SHUTDOWN = range(3)
469
470#
471# Mapping from serializer name to Listener and Client types
472#
473
# serializer name -> (Listener type, Client type)
listener_client = dict(
    pickle=(connection.Listener, connection.Client),
    xmlrpclib=(connection.XmlListener, connection.XmlClient),
    )
478
479#
480# Definition of BaseManager
481#
482
class BaseManager(object):
    '''
    Base class for managers

    A manager either spawns its own server process with start() or attaches
    to an existing one with connect().  Shared types are added per class
    with register(), which also creates a method named after the typeid
    that builds a shared object and returns a proxy for it.
    '''
    # typeid -> (callable, exposed, method_to_typeid, proxytype); copied per
    # subclass by register() so registrations do not leak into base classes
    _registry = {}
    # Server class instantiated by get_server() and _run_server()
    _Server = Server
    # No server process until start() spawns one.  Defined here so join()
    # also works on a manager that was never started (e.g. connect() only).
    _process = None

    def __init__(self, address=None, authkey=None, serializer='pickle',
                 ctx=None):
        '''
        Record the connection parameters; nothing is started or connected

        `authkey` defaults to the current process's authkey, `serializer`
        must be a key of `listener_client`, and `ctx` defaults to
        get_context().
        '''
        if authkey is None:
            authkey = process.current_process().authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = process.AuthenticationString(authkey)
        self._state = State()
        self._state.value = State.INITIAL
        self._serializer = serializer
        self._Listener, self._Client = listener_client[serializer]
        self._ctx = ctx or get_context()

    def get_server(self):
        '''
        Return server object with serve_forever() method and address attribute

        The server is an instance of the class's `_Server`, the same class
        _run_server() uses.  Raises ProcessError unless the manager is
        still in its initial state.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self._Server(self._registry, self._address,
                            self._authkey, self._serializer)

    def connect(self):
        '''
        Connect manager object to the server process

        Sends a 'dummy' request to check that the server is reachable, then
        marks the manager as started.
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'dummy')
        finally:
            # The probe connection is not reused, so release it right away
            conn.close()
        self._state.value = State.STARTED

    def start(self, initializer=None, initargs=()):
        '''
        Spawn a server process for this manager object

        If given, `initializer(*initargs)` is called in the server process
        before the server is created.  Raises ProcessError unless the
        manager is still in its initial state, and TypeError if
        `initializer` is neither None nor callable.
        '''
        if self._state.value != State.INITIAL:
            if self._state.value == State.STARTED:
                raise ProcessError("Already started server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        # pipe over which we will retrieve address of server
        reader, writer = connection.Pipe(duplex=False)

        # spawn process which runs a server
        self._process = self._ctx.Process(
            target=type(self)._run_server,
            args=(self._registry, self._address, self._authkey,
                  self._serializer, writer, initializer, initargs),
            )
        ident = ':'.join(str(i) for i in self._process._identity)
        self._process.name = type(self).__name__  + '-' + ident
        self._process.start()

        # get address of server
        writer.close()
        self._address = reader.recv()
        reader.close()

        # register a finalizer
        self._state.value = State.STARTED
        self.shutdown = util.Finalize(
            self, type(self)._finalize_manager,
            args=(self._process, self._address, self._authkey,
                  self._state, self._Client),
            exitpriority=0
            )

    @classmethod
    def _run_server(cls, registry, address, authkey, serializer, writer,
                    initializer=None, initargs=()):
        '''
        Create a server, report its address and run it

        This is the target of the process spawned by start(); the server's
        address is sent back to the parent through `writer`.
        '''
        # bpo-36368: protect server process from KeyboardInterrupt signals
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if initializer is not None:
            initializer(*initargs)

        # create server
        server = cls._Server(registry, address, authkey, serializer)

        # inform parent process of the server's address
        writer.send(server.address)
        writer.close()

        # run the manager
        util.info('manager serving at %r', server.address)
        server.serve_forever()

    def _create(self, typeid, /, *args, **kwds):
        '''
        Create a new shared object; return the token and exposed tuple
        '''
        assert self._state.value == State.STARTED, 'server not yet started'
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
        finally:
            conn.close()
        return Token(typeid, self._address, id), exposed

    def join(self, timeout=None):
        '''
        Join the manager process (if it has been spawned)

        Does nothing when no process was spawned by this manager.
        '''
        if self._process is not None:
            self._process.join(timeout)
            if not self._process.is_alive():
                self._process = None

    def _debug_info(self):
        '''
        Return some info about the servers shared objects and connections
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'debug_info')
        finally:
            conn.close()

    def _number_of_objects(self):
        '''
        Return the number of shared objects
        '''
        conn = self._Client(self._address, authkey=self._authkey)
        try:
            return dispatch(conn, None, 'number_of_objects')
        finally:
            conn.close()

    def __enter__(self):
        '''
        Start the manager if it has not been started and return it

        Raises ProcessError if the manager is not in the started state
        afterwards.
        '''
        if self._state.value == State.INITIAL:
            self.start()
        if self._state.value != State.STARTED:
            if self._state.value == State.INITIAL:
                raise ProcessError("Unable to start server")
            elif self._state.value == State.SHUTDOWN:
                raise ProcessError("Manager has shut down")
            else:
                raise ProcessError(
                    "Unknown state {!r}".format(self._state.value))
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        '''
        Shut the manager down by calling the finalizer set up by start()
        '''
        self.shutdown()

    @staticmethod
    def _finalize_manager(process, address, authkey, state, _Client):
        '''
        Shutdown the manager process; will be registered as a finalizer

        Asks the server to shut down, waits up to one second and then
        falls back to terminate().  The state is marked as shut down and
        the proxy bookkeeping for `address` is dropped in every case.
        '''
        if process.is_alive():
            util.info('sending shutdown message to manager')
            try:
                conn = _Client(address, authkey=authkey)
                try:
                    dispatch(conn, None, 'shutdown')
                finally:
                    conn.close()
            except Exception:
                # Best effort: the process is terminated below if needed
                pass

            process.join(timeout=1.0)
            if process.is_alive():
                util.info('manager still alive')
                if hasattr(process, 'terminate'):
                    util.info('trying to `terminate()` manager process')
                    process.terminate()
                    process.join(timeout=0.1)
                    if process.is_alive():
                        util.info('manager still alive after terminate')

        state.value = State.SHUTDOWN
        try:
            del BaseProxy._address_to_local[address]
        except KeyError:
            pass

    @property
    def address(self):
        '''
        Address used to reach the server; replaced by start() with the
        address reported by the server process
        '''
        return self._address

    @classmethod
    def register(cls, typeid, callable=None, proxytype=None, exposed=None,
                 method_to_typeid=None, create_method=True):
        '''
        Register a typeid with the manager type

        `proxytype` defaults to AutoProxy.  `exposed` and
        `method_to_typeid` default to the proxy type's `_exposed_` and
        `_method_to_typeid_` attributes.  Unless `create_method` is false,
        a method named `typeid` is added to the class; it creates a shared
        object on the server and returns a proxy for it.
        '''
        if '_registry' not in cls.__dict__:
            # First registration on this class: stop sharing the registry
            # inherited from the base class
            cls._registry = cls._registry.copy()

        if proxytype is None:
            proxytype = AutoProxy

        exposed = exposed or getattr(proxytype, '_exposed_', None)

        method_to_typeid = method_to_typeid or \
                           getattr(proxytype, '_method_to_typeid_', None)

        if method_to_typeid:
            for key, value in list(method_to_typeid.items()): # isinstance?
                assert type(key) is str, '%r is not a string' % key
                assert type(value) is str, '%r is not a string' % value

        cls._registry[typeid] = (
            callable, exposed, method_to_typeid, proxytype
            )

        if create_method:
            def temp(self, /, *args, **kwds):
                util.debug('requesting creation of a shared %r object', typeid)
                token, exp = self._create(typeid, *args, **kwds)
                proxy = proxytype(
                    token, self._serializer, manager=self,
                    authkey=self._authkey, exposed=exp
                    )
                # The server counted one reference when it created the
                # object; release it once the proxy above exists.
                conn = self._Client(token.address, authkey=self._authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
                return proxy
            temp.__name__ = typeid
            setattr(cls, typeid, temp)
724
725#
# Subclass of set which gets cleared after a fork
727#
728
class ProcessLocalSet(set):
    '''
    Set that registers an after-fork callback clearing it, and that is
    reduced for pickling to an empty set of the same type
    '''
    def __init__(self):
        def _clear(instance):
            instance.clear()
        util.register_after_fork(self, _clear)

    def __reduce__(self):
        cls = type(self)
        return cls, ()
734
735#
736# Definition of BaseProxy
737#
738
class BaseProxy(object):
    '''
    A base for proxies of shared objects

    A proxy holds the Token of its referent and forwards method calls to
    the manager server over a per-thread connection.  Reference counting
    requests ('incref'/'decref') use their own short-lived connections.
    '''
    # manager address -> (thread-local connection holder, set of idents
    # this process holds references to)
    _address_to_local = {}
    _mutex = util.ForkAwareThreadLock()

    def __init__(self, token, serializer, manager=None,
                 authkey=None, exposed=None, incref=True, manager_owned=False):
        '''
        Create a proxy for the object identified by `token`

        `authkey` defaults to the manager's key and then to the current
        process's key.  Unless `incref` is false, a reference is taken on
        the referent straight away.
        '''
        with BaseProxy._mutex:
            tls_idset = BaseProxy._address_to_local.get(token.address, None)
            if tls_idset is None:
                tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
                BaseProxy._address_to_local[token.address] = tls_idset

        # self._tls is used to record the connection used by this
        # thread to communicate with the manager at token.address
        self._tls = tls_idset[0]

        # self._idset is used to record the identities of all shared
        # objects for which the current process owns references and
        # which are in the manager at token.address
        self._idset = tls_idset[1]

        self._token = token
        self._id = self._token.id
        self._manager = manager
        self._serializer = serializer
        self._Client = listener_client[serializer][1]

        # Should be set to True only when a proxy object is being created
        # on the manager server; primary use case: nested proxy objects.
        # RebuildProxy detects when a proxy is being created on the manager
        # and sets this value appropriately.
        self._owned_by_manager = manager_owned

        if authkey is not None:
            self._authkey = process.AuthenticationString(authkey)
        elif self._manager is not None:
            self._authkey = self._manager._authkey
        else:
            self._authkey = process.current_process().authkey

        if incref:
            self._incref()

        util.register_after_fork(self, BaseProxy._after_fork)

    def _connect(self):
        '''
        Open this thread's long-lived connection to the manager

        The connection is stored on the thread-local holder and stays open
        for later _callmethod() calls.
        '''
        util.debug('making connection to manager')
        name = process.current_process().name
        if threading.current_thread().name != 'MainThread':
            name += '|' + threading.current_thread().name
        conn = self._Client(self._token.address, authkey=self._authkey)
        dispatch(conn, None, 'accept_connection', (name,))
        self._tls.connection = conn

    def _callmethod(self, methodname, args=(), kwds={}):
        '''
        Try to call a method of the referent and return a copy of the result

        A '#PROXY' reply is turned into a new proxy for the returned
        object.  Any other non-'#RETURN' reply is raised as the exception
        built by convert_to_error().
        '''
        try:
            conn = self._tls.connection
        except AttributeError:
            util.debug('thread %r does not own a connection',
                       threading.current_thread().name)
            self._connect()
            conn = self._tls.connection

        conn.send((self._id, methodname, args, kwds))
        kind, result = conn.recv()

        if kind == '#RETURN':
            return result
        elif kind == '#PROXY':
            exposed, token = result
            proxytype = self._manager._registry[token.typeid][-1]
            token.address = self._token.address
            proxy = proxytype(
                token, self._serializer, manager=self._manager,
                authkey=self._authkey, exposed=exposed
                )
            # The server counted one reference when it registered the
            # result; release it once the proxy above exists.  This uses
            # a separate connection that is closed again immediately.
            decref_conn = self._Client(token.address, authkey=self._authkey)
            try:
                dispatch(decref_conn, None, 'decref', (token.id,))
            finally:
                decref_conn.close()
            return proxy
        raise convert_to_error(kind, result)

    def _getvalue(self):
        '''
        Get a copy of the value of the referent
        '''
        return self._callmethod('#GETVALUE')

    def _incref(self):
        '''
        Take a reference on the referent and arrange for its release

        Skipped for proxies owned by the manager itself.  The matching
        decref is registered as a finalizer stored in `self._close`.
        '''
        if self._owned_by_manager:
            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
            return

        conn = self._Client(self._token.address, authkey=self._authkey)
        try:
            dispatch(conn, None, 'incref', (self._id,))
        finally:
            # One request per connection: release it as soon as it is done
            conn.close()
        util.debug('INCREF %r', self._token.id)

        self._idset.add(self._id)

        state = self._manager and self._manager._state

        self._close = util.Finalize(
            self, BaseProxy._decref,
            args=(self._token, self._authkey, state,
                  self._tls, self._idset, self._Client),
            exitpriority=10
            )

    @staticmethod
    def _decref(token, authkey, state, tls, idset, _Client):
        '''
        Release this process's reference on the object named by `token`

        A static method taking everything as arguments, as it is
        registered as the finalizer of a proxy.  Failures to reach the
        manager are logged and ignored.
        '''
        idset.discard(token.id)

        # check whether manager is still alive
        if state is None or state.value == State.STARTED:
            # tell manager this process no longer cares about referent
            try:
                util.debug('DECREF %r', token.id)
                conn = _Client(token.address, authkey=authkey)
                try:
                    dispatch(conn, None, 'decref', (token.id,))
                finally:
                    conn.close()
            except Exception as e:
                util.debug('... decref failed %s', e)

        else:
            util.debug('DECREF %r -- manager already shutdown', token.id)

        # check whether we can close this thread's connection because
        # the process owns no more references to objects for this manager
        if not idset and hasattr(tls, 'connection'):
            util.debug('thread %r has no more proxies so closing conn',
                       threading.current_thread().name)
            tls.connection.close()
            del tls.connection

    def _after_fork(self):
        '''
        Re-take the reference after a fork, dropping the manager link
        '''
        self._manager = None
        try:
            self._incref()
        except Exception as e:
            # the proxy may just be for a manager which has shutdown
            util.info('incref failed: %s', e)

    def __reduce__(self):
        '''
        Pickle as a RebuildProxy call

        The authkey is included only while a child process is being
        spawned.  Auto-generated proxies are rebuilt through AutoProxy
        with their exposed method names.
        '''
        kwds = {}
        if get_spawning_popen() is not None:
            kwds['authkey'] = self._authkey

        if getattr(self, '_isauto', False):
            kwds['exposed'] = self._exposed_
            return (RebuildProxy,
                    (AutoProxy, self._token, self._serializer, kwds))
        else:
            return (RebuildProxy,
                    (type(self), self._token, self._serializer, kwds))

    def __deepcopy__(self, memo):
        '''
        Deep-copying a proxy yields a copy of the referent's value
        '''
        return self._getvalue()

    def __repr__(self):
        return '<%s object, typeid %r at %#x>' % \
               (type(self).__name__, self._token.typeid, id(self))

    def __str__(self):
        '''
        Return representation of the referent (or a fall-back if that fails)
        '''
        try:
            return self._callmethod('__repr__')
        except Exception:
            return repr(self)[:-1] + "; '__str__()' failed>"
913
914#
915# Function used for unpickling
916#
917
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.

    Returns `func(token, serializer, incref=..., **kwds)`.  Note that
    `kwds` is modified in place.
    '''
    this_process = process.current_process()
    server = getattr(this_process, '_manager_server', None)
    if server and server.address == token.address:
        # the current process carries the server that `token` points at
        util.debug('Rebuild a proxy owned by manager, token=%r', token)
        kwds['manager_owned'] = True
        local_objs = server.id_to_local_proxy_obj
        if token.id not in local_objs:
            local_objs[token.id] = server.id_to_obj[token.id]

    # a false 'incref' entry wins; otherwise incref is skipped while the
    # current process is flagged as `_inheriting`
    requested = kwds.pop('incref', True)
    incref = requested and not getattr(this_process, '_inheriting', False)
    return func(token, serializer, incref=incref, **kwds)
934
935#
936# Functions to create proxies and proxy types
937#
938
def MakeProxyType(name, exposed, _cache={}):
    '''
    Return a `BaseProxy` subclass called `name` with one forwarding
    method per entry of `exposed`.

    Types are memoized in `_cache` under `(name, tuple(exposed))`, so a
    repeated request returns the same class object.
    '''
    exposed = tuple(exposed)
    cache_key = (name, exposed)
    if cache_key in _cache:
        return _cache[cache_key]

    # each generated method hands its arguments to `_callmethod`
    method_source = '''def %s(self, /, *args, **kwds):
        return self._callmethod(%r, args, kwds)'''
    namespace = {}
    for meth in exposed:
        exec(method_source % (meth, meth), namespace)

    ProxyType = type(name, (BaseProxy,), namespace)
    ProxyType._exposed_ = exposed
    _cache[cache_key] = ProxyType
    return ProxyType
959
960
def AutoProxy(token, serializer, manager=None, authkey=None,
              exposed=None, incref=True, manager_owned=False):
    '''
    Return an auto-proxy for `token`

    `exposed` lists the method names of the proxy; when it is None the
    names are requested from the server at `token.address` with a
    'get_methods' call.  `authkey` falls back to the manager's key and
    then to the current process's key.  `incref` and `manager_owned` are
    handed to the generated proxy type unchanged.

    `manager_owned` must be accepted here because `RebuildProxy` adds it
    to the keyword arguments of whatever callable it rebuilds with,
    including this one.
    '''
    _Client = listener_client[serializer][1]

    if exposed is None:
        conn = _Client(token.address, authkey=authkey)
        try:
            exposed = dispatch(conn, None, 'get_methods', (token,))
        finally:
            conn.close()

    if authkey is None and manager is not None:
        authkey = manager._authkey
    if authkey is None:
        authkey = process.current_process().authkey

    ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
    proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                      incref=incref, manager_owned=manager_owned)
    # marks the proxy so that pickling goes through AutoProxy again
    proxy._isauto = True
    return proxy
985
986#
987# Types/callables which we will register with SyncManager
988#
989
class Namespace(object):
    '''
    Plain attribute container; keyword arguments become attributes.
    '''
    def __init__(self, /, **kwds):
        self.__dict__.update(kwds)

    def __repr__(self):
        # attributes whose name starts with '_' are left out; the
        # remaining 'name=value' pieces are listed in sorted order
        shown = sorted(
            '%s=%r' % (name, value)
            for name, value in list(self.__dict__.items())
            if not name.startswith('_')
            )
        return '%s(%s)' % (self.__class__.__name__, ', '.join(shown))
1001
class Value(object):
    '''
    Holder for a single value together with its typecode.

    The `lock` argument is accepted but not used.
    '''
    def __init__(self, typecode, value, lock=True):
        self._typecode, self._value = typecode, value

    def get(self):
        return self._value

    def set(self, value):
        self._value = value

    def __repr__(self):
        parts = (type(self).__name__, self._typecode, self._value)
        return '%s(%r, %r)' % parts

    # attribute-style access backed by get()/set()
    value = property(get, set)
1013
def Array(typecode, sequence, lock=True):
    '''
    Return `array.array(typecode, sequence)`; `lock` is accepted but
    not used.
    '''
    new_array = array.array(typecode, sequence)
    return new_array
1016
1017#
1018# Proxy types used by SyncManager
1019#
1020
class IteratorProxy(BaseProxy):
    '''
    Proxy which is its own iterator and forwards `__next__`, `send`,
    `throw` and `close` through `_callmethod`.
    '''
    _exposed_ = ('__next__', 'send', 'throw', 'close')

    def __iter__(self):
        return self

    def __next__(self, *args):
        forward = self._callmethod
        return forward('__next__', args)

    def send(self, *args):
        forward = self._callmethod
        return forward('send', args)

    def throw(self, *args):
        forward = self._callmethod
        return forward('throw', args)

    def close(self, *args):
        forward = self._callmethod
        return forward('close', args)
1033
1034
class AcquirerProxy(BaseProxy):
    '''
    Proxy exposing `acquire` and `release`, usable in a `with` statement.
    '''
    _exposed_ = ('acquire', 'release')

    def acquire(self, blocking=True, timeout=None):
        # the timeout is only forwarded when one was actually given
        if timeout is None:
            args = (blocking,)
        else:
            args = (blocking, timeout)
        return self._callmethod('acquire', args)

    def release(self):
        return self._callmethod('release')

    def __enter__(self):
        # sent without arguments, unlike acquire() above
        return self._callmethod('acquire')

    def __exit__(self, exc_type, exc_val, exc_tb):
        return self._callmethod('release')
1046
1047
class ConditionProxy(AcquirerProxy):
    '''
    Acquirer proxy which additionally exposes `wait`, `notify` and
    `notify_all`, plus a `wait_for` loop run on the proxy side.
    '''
    _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notify_all')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def notify(self, n=1):
        return self._callmethod('notify', (n,))

    def notify_all(self):
        return self._callmethod('notify_all')

    def wait_for(self, predicate, timeout=None):
        '''
        Call `wait()` until `predicate()` returns a true value or, when
        `timeout` is given, until that many seconds have passed.

        `predicate` is called in the calling process.  The last value it
        returned is the result.
        '''
        result = predicate()
        if result:
            return result

        deadline = None if timeout is None else time.monotonic() + timeout
        remaining = None
        while not result:
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
            self.wait(remaining)
            result = predicate()
        return result
1073
1074
class EventProxy(BaseProxy):
    '''
    Proxy forwarding `is_set`, `set`, `clear` and `wait`.
    '''
    _exposed_ = ('is_set', 'set', 'clear', 'wait')

    def is_set(self):
        forward = self._callmethod
        return forward('is_set')

    def set(self):
        forward = self._callmethod
        return forward('set')

    def clear(self):
        forward = self._callmethod
        return forward('clear')

    def wait(self, timeout=None):
        # the timeout is always sent, even when it is None
        forward = self._callmethod
        return forward('wait', (timeout,))
1085
1086
class BarrierProxy(BaseProxy):
    '''
    Proxy forwarding `wait`, `abort` and `reset`; the read-only
    properties are fetched with a `__getattribute__` request.
    '''
    _exposed_ = ('__getattribute__', 'wait', 'abort', 'reset')

    def wait(self, timeout=None):
        return self._callmethod('wait', (timeout,))

    def abort(self):
        return self._callmethod('abort')

    def reset(self):
        return self._callmethod('reset')

    @property
    def parties(self):
        attr = ('parties',)
        return self._callmethod('__getattribute__', attr)

    @property
    def n_waiting(self):
        attr = ('n_waiting',)
        return self._callmethod('__getattribute__', attr)

    @property
    def broken(self):
        attr = ('broken',)
        return self._callmethod('__getattribute__', attr)
1104
1105
class NamespaceProxy(BaseProxy):
    '''
    Proxy which forwards attribute access to the referent.

    Names starting with '_' are handled on the proxy object itself;
    every other name is sent through `_callmethod`.
    '''
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

    def __getattr__(self, key):
        if key[0] != '_':
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__getattribute__', (key,))
        return object.__getattribute__(self, key)

    def __setattr__(self, key, value):
        if key[0] != '_':
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__setattr__', (key, value))
        return object.__setattr__(self, key, value)

    def __delattr__(self, key):
        if key[0] != '_':
            remote = object.__getattribute__(self, '_callmethod')
            return remote('__delattr__', (key,))
        return object.__delattr__(self, key)
1123
1124
class ValueProxy(BaseProxy):
    '''
    Proxy forwarding `get` and `set`, also reachable as the `value`
    property.
    '''
    _exposed_ = ('get', 'set')

    def get(self):
        forward = self._callmethod
        return forward('get')

    def set(self, value):
        forward = self._callmethod
        return forward('set', (value,))

    value = property(get, set)

    # subscripting the class gives a types.GenericAlias
    __class_getitem__ = classmethod(types.GenericAlias)
1134
1135
# Generated proxy type carrying the plain forwarding methods of a list.
BaseListProxy = MakeProxyType(
    'BaseListProxy',
    (
        '__add__', '__contains__', '__delitem__', '__getitem__',
        '__len__', '__mul__', '__reversed__', '__rmul__', '__setitem__',
        'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove',
        'reverse', 'sort', '__imul__',
    ),
)


class ListProxy(BaseListProxy):
    '''
    List proxy whose in-place operators return the proxy itself.
    '''
    def __iadd__(self, value):
        # `+=` is sent as an 'extend' request
        self._callmethod('extend', (value,))
        return self

    def __imul__(self, value):
        # overrides the generated method so that `*=` keeps the proxy
        self._callmethod('__imul__', (value,))
        return self
1149
1150
# Generated proxy type carrying the forwarding methods of a dict.
DictProxy = MakeProxyType(
    'DictProxy',
    (
        '__contains__', '__delitem__', '__getitem__', '__iter__',
        '__len__', '__setitem__', 'clear', 'copy', 'get', 'items',
        'keys', 'pop', 'popitem', 'setdefault', 'update', 'values',
    ),
)
# NOTE(review): presumably makes `__iter__` results come back as a proxy
# of the registered 'Iterator' typeid -- confirm against _callmethod
DictProxy._method_to_typeid_ = {'__iter__': 'Iterator'}
1159
1160
# Generated proxy type: only length and item get/set are forwarded.
ArrayProxy = MakeProxyType(
    'ArrayProxy',
    ('__len__', '__getitem__', '__setitem__'),
)
1164
1165
# Generated proxy type with the forwarding methods of a pool.  Note that
# the generated class is itself named 'PoolProxy'.
BasePoolProxy = MakeProxyType(
    'PoolProxy',
    (
        'apply', 'apply_async', 'close', 'imap', 'imap_unordered',
        'join', 'map', 'map_async', 'starmap', 'starmap_async',
        'terminate',
    ),
)
# method name -> typeid associated with that method's result
BasePoolProxy._method_to_typeid_ = {
    'apply_async': 'AsyncResult',
    'map_async': 'AsyncResult',
    'starmap_async': 'AsyncResult',
    'imap': 'Iterator',
    'imap_unordered': 'Iterator',
}


class PoolProxy(BasePoolProxy):
    '''
    Pool proxy usable in a `with` statement; leaving the block calls
    `terminate()`.
    '''
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()
1182
1183#
1184# Definition of SyncManager
1185#
1186
class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

# Each call below registers a typeid with SyncManager, giving the callable
# registered for it and, where one is passed, the proxy type to use.

# No proxy type is passed for the two queue typeids; both are backed by
# the same queue.Queue class.
SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
# Synchronization primitives come from the threading module; everything
# lock-like shares AcquirerProxy.
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
# Containers and value holders; 'Value', 'Array' and 'Namespace' use the
# module-level Value class, Array function and Namespace class.
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy ('Iterator' is also named by
# DictProxy._method_to_typeid_).  No callable is registered for them.
# NOTE(review): create_method=False presumably means no creation method is
# added to the manager for these typeids -- confirm in BaseManager.register
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)
1218
1219#
1220# Definition of SharedMemoryManager and SharedMemoryServer
1221#
1222
1223if HAS_SHMEM:
1224    class _SharedMemoryTracker:
1225        "Manages one or more shared memory segments."
1226
1227        def __init__(self, name, segment_names=[]):
1228            self.shared_memory_context_name = name
1229            self.segment_names = segment_names
1230
1231        def register_segment(self, segment_name):
1232            "Adds the supplied shared memory block name to tracker."
1233            util.debug(f"Register segment {segment_name!r} in pid {getpid()}")
1234            self.segment_names.append(segment_name)
1235
1236        def destroy_segment(self, segment_name):
1237            """Calls unlink() on the shared memory block with the supplied name
1238            and removes it from the list of blocks being tracked."""
1239            util.debug(f"Destroy segment {segment_name!r} in pid {getpid()}")
1240            self.segment_names.remove(segment_name)
1241            segment = shared_memory.SharedMemory(segment_name)
1242            segment.close()
1243            segment.unlink()
1244
1245        def unlink(self):
1246            "Calls destroy_segment() on all tracked shared memory blocks."
1247            for segment_name in self.segment_names[:]:
1248                self.destroy_segment(segment_name)
1249
1250        def __del__(self):
1251            util.debug(f"Call {self.__class__.__name__}.__del__ in {getpid()}")
1252            self.unlink()
1253
1254        def __getstate__(self):
1255            return (self.shared_memory_context_name, self.segment_names)
1256
1257        def __setstate__(self, state):
1258            self.__init__(*state)
1259
1260
1261    class SharedMemoryServer(Server):
1262
1263        public = Server.public + \
1264                 ['track_segment', 'release_segment', 'list_segments']
1265
1266        def __init__(self, *args, **kwargs):
1267            Server.__init__(self, *args, **kwargs)
1268            address = self.address
1269            # The address of Linux abstract namespaces can be bytes
1270            if isinstance(address, bytes):
1271                address = os.fsdecode(address)
1272            self.shared_memory_context = \
1273                _SharedMemoryTracker(f"shm_{address}_{getpid()}")
1274            util.debug(f"SharedMemoryServer started by pid {getpid()}")
1275
1276        def create(self, c, typeid, /, *args, **kwargs):
1277            """Create a new distributed-shared object (not backed by a shared
1278            memory block) and return its id to be used in a Proxy Object."""
1279            # Unless set up as a shared proxy, don't make shared_memory_context
1280            # a standard part of kwargs.  This makes things easier for supplying
1281            # simple functions.
1282            if hasattr(self.registry[typeid][-1], "_shared_memory_proxy"):
1283                kwargs['shared_memory_context'] = self.shared_memory_context
1284            return Server.create(self, c, typeid, *args, **kwargs)
1285
1286        def shutdown(self, c):
1287            "Call unlink() on all tracked shared memory, terminate the Server."
1288            self.shared_memory_context.unlink()
1289            return Server.shutdown(self, c)
1290
1291        def track_segment(self, c, segment_name):
1292            "Adds the supplied shared memory block name to Server's tracker."
1293            self.shared_memory_context.register_segment(segment_name)
1294
1295        def release_segment(self, c, segment_name):
1296            """Calls unlink() on the shared memory block with the supplied name
1297            and removes it from the tracker instance inside the Server."""
1298            self.shared_memory_context.destroy_segment(segment_name)
1299
1300        def list_segments(self, c):
1301            """Returns a list of names of shared memory blocks that the Server
1302            is currently tracking."""
1303            return self.shared_memory_context.segment_names
1304
1305
1306    class SharedMemoryManager(BaseManager):
1307        """Like SyncManager but uses SharedMemoryServer instead of Server.
1308
1309        It provides methods for creating and returning SharedMemory instances
1310        and for creating a list-like object (ShareableList) backed by shared
1311        memory.  It also provides methods that create and return Proxy Objects
1312        that support synchronization across processes (i.e. multi-process-safe
1313        locks and semaphores).
1314        """
1315
1316        _Server = SharedMemoryServer
1317
1318        def __init__(self, *args, **kwargs):
1319            if os.name == "posix":
1320                # bpo-36867: Ensure the resource_tracker is running before
1321                # launching the manager process, so that concurrent
1322                # shared_memory manipulation both in the manager and in the
1323                # current process does not create two resource_tracker
1324                # processes.
1325                from . import resource_tracker
1326                resource_tracker.ensure_running()
1327            BaseManager.__init__(self, *args, **kwargs)
1328            util.debug(f"{self.__class__.__name__} created by pid {getpid()}")
1329
1330        def __del__(self):
1331            util.debug(f"{self.__class__.__name__}.__del__ by pid {getpid()}")
1332            pass
1333
1334        def get_server(self):
1335            'Better than monkeypatching for now; merge into Server ultimately'
1336            if self._state.value != State.INITIAL:
1337                if self._state.value == State.STARTED:
1338                    raise ProcessError("Already started SharedMemoryServer")
1339                elif self._state.value == State.SHUTDOWN:
1340                    raise ProcessError("SharedMemoryManager has shut down")
1341                else:
1342                    raise ProcessError(
1343                        "Unknown state {!r}".format(self._state.value))
1344            return self._Server(self._registry, self._address,
1345                                self._authkey, self._serializer)
1346
1347        def SharedMemory(self, size):
1348            """Returns a new SharedMemory instance with the specified size in
1349            bytes, to be tracked by the manager."""
1350            with self._Client(self._address, authkey=self._authkey) as conn:
1351                sms = shared_memory.SharedMemory(None, create=True, size=size)
1352                try:
1353                    dispatch(conn, None, 'track_segment', (sms.name,))
1354                except BaseException as e:
1355                    sms.unlink()
1356                    raise e
1357            return sms
1358
1359        def ShareableList(self, sequence):
1360            """Returns a new ShareableList instance populated with the values
1361            from the input sequence, to be tracked by the manager."""
1362            with self._Client(self._address, authkey=self._authkey) as conn:
1363                sl = shared_memory.ShareableList(sequence)
1364                try:
1365                    dispatch(conn, None, 'track_segment', (sl.shm.name,))
1366                except BaseException as e:
1367                    sl.shm.unlink()
1368                    raise e
1369            return sl
1370