1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import unittest.mock
7import queue as pyqueue
8import time
9import io
10import itertools
11import sys
12import os
13import gc
14import errno
15import signal
16import array
17import socket
18import random
19import logging
20import subprocess
21import struct
22import operator
23import pickle
24import weakref
25import warnings
26import test.support
27import test.support.script_helper
28from test import support
29from test.support import hashlib_helper
30from test.support import socket_helper
31
32
33# Skip tests if _multiprocessing wasn't built.
34_multiprocessing = test.support.import_module('_multiprocessing')
35# Skip tests if sem_open implementation is broken.
36support.skip_if_broken_multiprocessing_synchronize()
37import threading
38
39import multiprocessing.connection
40import multiprocessing.dummy
41import multiprocessing.heap
42import multiprocessing.managers
43import multiprocessing.pool
44import multiprocessing.queues
45
46from multiprocessing import util
47
48try:
49    from multiprocessing import reduction
50    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
51except ImportError:
52    HAS_REDUCTION = False
53
54try:
55    from multiprocessing.sharedctypes import Value, copy
56    HAS_SHAREDCTYPES = True
57except ImportError:
58    HAS_SHAREDCTYPES = False
59
60try:
61    from multiprocessing import shared_memory
62    HAS_SHMEM = True
63except ImportError:
64    HAS_SHMEM = False
65
66try:
67    import msvcrt
68except ImportError:
69    msvcrt = None
70
71
def latin(s):
    """Return the text *s* encoded to bytes using the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
74
75
def close_queue(queue):
    """Close *queue* and join its thread if it is a multiprocessing Queue.

    Objects that are not instances of multiprocessing.queues.Queue are
    left untouched.
    """
    if not isinstance(queue, multiprocessing.queues.Queue):
        return
    queue.close()
    queue.join_thread()
80
81
def join_process(process):
    """Join *process* through the thread-joining helper from test.support."""
    # multiprocessing.Process exposes the same join() and is_alive() API as
    # threading.Thread, so the helper written for threads can be reused.
    support.join_thread(process)
86
87
if os.name == "posix":
    from multiprocessing import resource_tracker

    def _resource_unlink(name, rtype):
        """Call the resource tracker's cleanup function for *rtype* on *name*."""
        cleanup = resource_tracker._CLEANUP_FUNCS[rtype]
        cleanup(name)
93
94
95#
96# Constants
97#
98
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1

# Setting CHECK_TIMINGS to true makes the tests take a lot longer and can
# sometimes cause some non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False

# Timeouts used by the blocking-call tests; real timings are only used when
# timing checks are enabled.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when _multiprocessing reports a broken sem_getvalue().
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = sys.platform == "win32"
116
117from multiprocessing.connection import wait
118
def wait_for_handle(handle, timeout):
    """Wait on a single *handle* and return the result of wait().

    A negative *timeout* is treated as "no timeout" (None).
    """
    is_negative = timeout is not None and timeout < 0.0
    return wait([handle], None if is_negative else timeout)
123
# Upper bound on file descriptors, falling back to 256 when the limit cannot
# be queried.  Only the failures os.sysconf() is expected to produce are
# caught (missing function, unknown name, unsupported value) so that
# KeyboardInterrupt/SystemExit are not swallowed.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256

# To speed up tests when using the forkserver, we can preload these:
PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
131
132#
133# Some tests require ctypes
134#
135
136try:
137    from ctypes import Structure, c_int, c_double, c_longlong
138except ImportError:
139    Structure = object
140    c_int = c_double = c_longlong = None
141
142
def check_enough_semaphores():
    """Raise unittest.SkipTest if the OS reports too few semaphores.

    Returns None when the limit cannot be queried, is reported as -1, or is
    at least the POSIX minimum of 256.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
156
157
158#
159# Creates a wrapper for a function which records the time it takes to finish
160#
161
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After a call, ``elapsed`` holds the duration in seconds measured with
    time.monotonic(); it is updated even when *func* raises.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.monotonic()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the duration whether or not the call succeeded.
            self.elapsed = time.monotonic() - started
        return result
174
175#
176# Base class for test cases
177#
178
class BaseTestCase(object):
    # Shared helpers for the test case classes in this file.

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when timing checks are switched on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, unless func reports that it is
        # not implemented, in which case nothing is checked.
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
201
202#
203# Return the value of a semaphore
204#
205
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order, the get_value() method, the ``_Semaphore__value``
    attribute and the ``_value`` attribute; raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError as no_method:
        method_error = no_method
    try:
        return self._Semaphore__value
    except AttributeError as no_mangled:
        mangled_error = no_mangled
    try:
        return self._value
    except AttributeError:
        # Reproduce the implicit exception chain of the nested handlers.
        mangled_error.__context__ = method_error
        raise NotImplementedError
217
218#
219# Testcases
220#
221
class DummyCallable:
    # Callable used as a Process target: it checks that its second argument
    # is a DummyCallable and then puts 5 on the queue it was given.

    def __call__(self, q, c):
        is_dummy = isinstance(c, DummyCallable)
        assert is_dummy
        q.put(5)
226
227
class _TestProcess(BaseTestCase):
    # Tests for Process objects: attributes, start/join/terminate/kill/close,
    # parent/child relationships and behaviour with broken standard streams.
    # Tests that are not meaningful for the 'threads' flavour skip themselves
    # by checking self.TYPE.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Check the attributes of the object returned by current_process().
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        # Check the 'daemon' constructor argument and its default.
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child entry point for test_process: report the received arguments
        # and the child's identity through the queue q.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_parent_process_attributes(self):
        # The child must see this process's pid and name as its parent's.
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        self.assertIsNone(self.parent_process())

        rconn, wconn = self.Pipe(duplex=False)
        p = self.Process(target=self._test_send_parent_process, args=(wconn,))
        p.start()
        p.join()
        parent_pid, parent_name = rconn.recv()
        self.assertEqual(parent_pid, self.current_process().pid)
        self.assertEqual(parent_pid, os.getpid())
        self.assertEqual(parent_name, self.current_process().name)

    @classmethod
    def _test_send_parent_process(cls, wconn):
        # Child entry point: send [pid, name] of parent_process() to wconn.
        from multiprocessing.process import parent_process
        wconn.send([parent_process().pid, parent_process().name])

    def test_parent_process(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # Launch a child process. Make it launch a grandchild process. Kill the
        # child process and make sure that the grandchild notices the death of
        # its parent (a.k.a the child process).
        rconn, wconn = self.Pipe(duplex=False)
        p = self.Process(
            target=self._test_create_grandchild_process, args=(wconn, ))
        p.start()

        if not rconn.poll(timeout=support.LONG_TIMEOUT):
            raise AssertionError("Could not communicate with child process")
        parent_process_status = rconn.recv()
        self.assertEqual(parent_process_status, "alive")

        p.terminate()
        p.join()

        if not rconn.poll(timeout=support.LONG_TIMEOUT):
            raise AssertionError("Could not communicate with child process")
        parent_process_status = rconn.recv()
        self.assertEqual(parent_process_status, "not alive")

    @classmethod
    def _test_create_grandchild_process(cls, wconn):
        # Child entry point: start the grandchild, then sleep until the
        # parent test terminates this process.
        p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
        p.start()
        time.sleep(300)

    @classmethod
    def _test_report_parent_status(cls, wconn):
        # Grandchild entry point: report the parent's liveness before and
        # after waiting for it with join().
        from multiprocessing.process import parent_process
        wconn.send("alive" if parent_process().is_alive() else "not alive")
        parent_process().join(timeout=support.SHORT_TIMEOUT)
        wconn.send("alive" if parent_process().is_alive() else "not alive")

    def test_process(self):
        # Exercise a full process lifecycle and check its attributes before
        # start(), while running, and after join().
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # _test() receives q as its first argument and reports the rest.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())
        close_queue(q)

    @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id")
    def test_process_mainthread_native_id(self):
        # The child's main thread must have a different native id than ours.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current_mainthread_native_id = threading.main_thread().native_id

        q = self.Queue(1)
        p = self.Process(target=self._test_process_mainthread_native_id, args=(q,))
        p.start()

        child_mainthread_native_id = q.get()
        p.join()
        close_queue(q)

        self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id)

    @classmethod
    def _test_process_mainthread_native_id(cls, q):
        # Child entry point: report the main thread's native id through q.
        mainthread_native_id = threading.main_thread().native_id
        q.put(mainthread_native_id)

    @classmethod
    def _sleep_some(cls):
        # Child entry point that sleeps long enough to be killed by the test.
        time.sleep(100)

    @classmethod
    def _test_sleep(cls, delay):
        # Child entry point that sleeps for delay seconds.
        time.sleep(delay)

    def _kill_process(self, meth):
        # Start a sleeping child, stop it by calling meth(p), check that it
        # gets reaped, and return its exit code.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._sleep_some)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        meth(p)

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        return p.exitcode

    def test_terminate(self):
        exitcode = self._kill_process(multiprocessing.Process.terminate)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGTERM)

    def test_kill(self):
        exitcode = self._kill_process(multiprocessing.Process.kill)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGKILL)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        # A process is listed by active_children() only between start() and
        # join().
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send id, then (for ids shorter than 2) run two children one after
        # the other, each with id extended by its index.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        # Processes started from child processes must all report their ids,
        # in depth-first order.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        # Child entry point: wait (up to 10 seconds) for the event.
        event.wait(10.0)

    def test_sentinel(self):
        # The sentinel is unavailable before start(), not ready while the
        # child runs, and ready once the child has exited.
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))

    @classmethod
    def _test_close(cls, rc=0, q=None):
        # Child entry point: optionally wait for an item on q, then exit
        # with status rc.
        if q is not None:
            q.get()
        sys.exit(rc)

    def test_close(self):
        # close() must fail while the child is alive; after close() the
        # other methods raise ValueError and the object can be collected.
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        q = self.Queue()
        p = self.Process(target=self._test_close, kwargs={'q': q})
        p.daemon = True
        p.start()
        self.assertEqual(p.is_alive(), True)
        # Child is still alive, cannot close
        with self.assertRaises(ValueError):
            p.close()

        q.put(None)
        p.join()
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.exitcode, 0)
        p.close()
        with self.assertRaises(ValueError):
            p.is_alive()
        with self.assertRaises(ValueError):
            p.join()
        with self.assertRaises(ValueError):
            p.terminate()
        # Closing a second time must not raise.
        p.close()

        wr = weakref.ref(p)
        del p
        gc.collect()
        self.assertIs(wr(), None)

        close_queue(q)

    def test_many_processes(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        N = 5 if sm == 'spawn' else 100

        # Try to overwhelm the forkserver loop with events
        procs = [self.Process(target=self._test_sleep, args=(0.01,))
                 for i in range(N)]
        for p in procs:
            p.start()
        for p in procs:
            join_process(p)
        for p in procs:
            self.assertEqual(p.exitcode, 0)

        procs = [self.Process(target=self._sleep_some)
                 for i in range(N)]
        for p in procs:
            p.start()
        time.sleep(0.001)  # let the children start...
        for p in procs:
            p.terminate()
        for p in procs:
            join_process(p)
        if os.name != 'nt':
            exitcodes = [-signal.SIGTERM]
            if sys.platform == 'darwin':
                # bpo-31510: On macOS, killing a freshly started process with
                # SIGTERM sometimes kills the process with SIGKILL.
                exitcodes.append(-signal.SIGKILL)
            for p in procs:
                self.assertIn(p.exitcode, exitcodes)

    def test_lose_target_ref(self):
        # After the process has been joined, no reference to its target
        # callable may remain.
        c = DummyCallable()
        wr = weakref.ref(c)
        q = self.Queue()
        p = self.Process(target=c, args=(q, c))
        del c
        p.start()
        p.join()
        self.assertIs(wr(), None)
        self.assertEqual(q.get(), 5)
        close_queue(q)

    @classmethod
    def _test_child_fd_inflation(cls, evt, q):
        # Child entry point: report this process's fd count, then stay alive
        # until evt is set.
        q.put(test.support.fd_count())
        evt.wait()

    def test_child_fd_inflation(self):
        # Number of fds in child processes should not grow with the
        # number of running children.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        if sm == 'fork':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        N = 5
        evt = self.Event()
        q = self.Queue()

        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
                 for i in range(N)]
        for p in procs:
            p.start()

        try:
            fd_counts = [q.get() for i in range(N)]
            self.assertEqual(len(set(fd_counts)), 1, fd_counts)

        finally:
            evt.set()
            for p in procs:
                p.join()
            close_queue(q)

    @classmethod
    def _test_wait_for_threads(cls, evt):
        # Child entry point: start a non-daemon thread that sets evt after
        # 0.5 seconds and a daemon thread that would clear it after 20.
        def func1():
            time.sleep(0.5)
            evt.set()

        def func2():
            time.sleep(20)
            evt.clear()

        threading.Thread(target=func1).start()
        threading.Thread(target=func2, daemon=True).start()

    def test_wait_for_threads(self):
        # A child process should wait for non-daemonic threads to end
        # before exiting
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        evt = self.Event()
        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
        proc.start()
        proc.join()
        self.assertTrue(evt.is_set())

    @classmethod
    def _test_error_on_stdio_flush(cls, evt, break_std_streams=None):
        # Child entry point: break the requested standard streams, then set
        # evt.  break_std_streams maps a sys attribute name ('stdout' or
        # 'stderr') to 'close' (install a closed StringIO) or 'remove'
        # (install None).
        for stream_name, action in (break_std_streams or {}).items():
            if action == 'close':
                stream = io.StringIO()
                stream.close()
            else:
                assert action == 'remove'
                stream = None
            # Install the stream that was prepared above, so that the
            # 'close' action really exercises a closed stream.
            setattr(sys, stream_name, stream)
        evt.set()

    def test_error_on_stdio_flush_1(self):
        # Check that Process works with broken standard streams
        streams = [io.StringIO(), None]
        streams[0].close()
        for stream_name in ('stdout', 'stderr'):
            for stream in streams:
                old_stream = getattr(sys, stream_name)
                setattr(sys, stream_name, stream)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt,))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    def test_error_on_stdio_flush_2(self):
        # Same as test_error_on_stdio_flush_1(), but standard streams are
        # broken by the child process
        for stream_name in ('stdout', 'stderr'):
            for action in ('close', 'remove'):
                old_stream = getattr(sys, stream_name)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt, {stream_name: action}))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    @classmethod
    def _sleep_and_set_event(cls, evt, delay=0.0):
        # Child entry point: sleep for delay seconds, then set evt.
        time.sleep(delay)
        evt.set()

    def check_forkserver_death(self, signum):
        # bpo-31308: if the forkserver process has died, we should still
        # be able to create and run new Process instances (the forkserver
        # is implicitly restarted).
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        sm = multiprocessing.get_start_method()
        if sm != 'forkserver':
            # There is no forkserver process to kill with the other start
            # methods.
            self.skipTest('test not appropriate for {}'.format(sm))

        from multiprocessing.forkserver import _forkserver
        _forkserver.ensure_running()

        # First process sleeps 500 ms
        delay = 0.5

        evt = self.Event()
        proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
        proc.start()

        pid = _forkserver._forkserver_pid
        os.kill(pid, signum)
        # give time to the fork server to die and time to proc to complete
        time.sleep(delay * 2.0)

        evt2 = self.Event()
        proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
        proc2.start()
        proc2.join()
        self.assertTrue(evt2.is_set())
        self.assertEqual(proc2.exitcode, 0)

        proc.join()
        self.assertTrue(evt.is_set())
        self.assertIn(proc.exitcode, (0, 255))

    def test_forkserver_sigint(self):
        # Catchable signal
        self.check_forkserver_death(signal.SIGINT)

    def test_forkserver_sigkill(self):
        # Uncatchable signal
        if os.name != 'nt':
            self.check_forkserver_death(signal.SIGKILL)
774
775
776#
777#
778#
779
class _UpperCaser(multiprocessing.Process):
    # Process subclass that upper-cases strings: the parent sends text
    # through parent_conn and the child answers on child_conn.

    def __init__(self):
        multiprocessing.Process.__init__(self)
        pipe_ends = multiprocessing.Pipe()
        self.child_conn, self.parent_conn = pipe_ends

    def run(self):
        # Runs in the child: drop the parent's end, then answer every
        # request until the None sentinel arrives.
        self.parent_conn.close()
        requests = iter(self.child_conn.recv, None)
        for text in requests:
            reply = text.upper()
            self.child_conn.send(reply)
        self.child_conn.close()

    def submit(self, s):
        # Send s to the child and return the child's reply.
        assert type(s) is str
        self.parent_conn.send(s)
        answer = self.parent_conn.recv()
        return answer

    def stop(self):
        # Tell the child to finish, then close both ends on this side.
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
801
class _TestSubclassingProcess(BaseTestCase):
    # Tests for Process subclasses and for how a child's exit is reported.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Round-trip two strings through a daemonic _UpperCaser child.
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)
        child = self.Process(target=self._test_stderr_flush, args=(path,))
        child.start()
        child.join()
        with open(path, 'r') as stream:
            captured = stream.read()
            # The whole traceback was printed
            for fragment in ("ZeroDivisionError",
                             "test_multiprocessing.py",
                             "1/0 # MARKER"):
                self.assertIn(fragment, captured)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # Child entry point: redirect sys.stderr to a new file, then fail.
        # The failing line below is matched textually by test_stderr_flush.
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        sys.stderr = open(os.open(testfn, flags), 'w', closefd=False)
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Child entry point: redirect sys.stderr to a new file, then exit
        # with the given reason.
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        sys.stderr = open(os.open(testfn, flags), 'w', closefd=False)
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)

        # Non-integer exit reasons: exit code 1 and the reason on stderr.
        for reason in ([1, 2, 3], 'ignore this'):
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, path))
            child.daemon = True
            child.start()
            join_process(child)
            self.assertEqual(child.exitcode, 1)

            with open(path, 'r') as stream:
                written = stream.read()
            self.assertEqual(written.rstrip(), str(reason))

            # The child opens the file with O_EXCL, so remove it before the
            # next iteration.
            os.unlink(path)

        # (arguments passed to sys.exit, expected exit code)
        expectations = (
            ((True,), 1),
            ((False,), 0),
            ((8,), 8),
            ((None,), 0),
            ((), 0),
        )

        for args, expected in expectations:
            with self.subTest(args=args):
                child = self.Process(target=sys.exit, args=args)
                child.daemon = True
                child.start()
                join_process(child)
                self.assertEqual(child.exitcode, expected)
884
885#
886#
887#
888
def queue_empty(q):
    """Return whether *q* is empty, falling back to qsize() without empty()."""
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
894
def queue_full(q, maxsize):
    """Return whether *q* is full, falling back to qsize() without full()."""
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
900
901
902class _TestQueue(BaseTestCase):
903
904
905    @classmethod
906    def _test_put(cls, queue, child_can_start, parent_can_continue):
907        child_can_start.wait()
908        for i in range(6):
909            queue.get()
910        parent_can_continue.set()
911
    def test_put(self):
        # Fill a bounded queue through every put() calling convention, check
        # that further puts raise Full, then let the child drain the queue.
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Six successful puts, one per supported argument combination.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must raise Full; the elapsed time is compared
        # with the expected blocking time (0 for the non-blocking calls).
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Release the child and wait until it signals back.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()
        close_queue(queue)
970
971    @classmethod
972    def _test_get(cls, queue, child_can_start, parent_can_continue):
973        child_can_start.wait()
974        #queue.put(1)
975        queue.put(2)
976        queue.put(3)
977        queue.put(4)
978        queue.put(5)
979        parent_can_continue.set()
980
    def test_get(self):
        """Check get()/get_nowait() on a populated and on an empty queue.

        A child started through self.Process runs _test_get() to fill the
        queue.  The parent drains it with several call signatures, then
        verifies that each flavour of get() raises pyqueue.Empty once the
        queue is empty and compares the elapsed time recorded by the
        TimingWrapper objects with the requested timeout.
        """
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        # The child is still blocked on child_can_start, so nothing has
        # been put on the queue yet.
        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        # NOTE(review): presumably the pause lets items that are still
        # buffered on the child side reach the queue -- confirm.
        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # Drain the four items using the different call signatures.
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        # From here on the queue is empty: every get must raise Empty.
        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking variants are expected to fail without delay ...
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... a blocking get is expected to take the full timeout ...
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        # ... and a timeout passed together with block=False is expected
        # to have no effect on the elapsed time.
        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()
        close_queue(queue)
1033
1034    @classmethod
1035    def _test_fork(cls, queue):
1036        for i in range(10, 20):
1037            queue.put(i)
1038        # note that at this point the items may only be buffered, so the
1039        # process cannot shutdown until the feeder thread has finished
1040        # pushing items onto the pipe.
1041
1042    def test_fork(self):
1043        # Old versions of Queue would fail to create a new feeder
1044        # thread for a forked process if the original process had its
1045        # own feeder thread.  This test checks that this no longer
1046        # happens.
1047
1048        queue = self.Queue()
1049
1050        # put items on queue so that main process starts a feeder thread
1051        for i in range(10):
1052            queue.put(i)
1053
1054        # wait to make sure thread starts before we fork a new process
1055        time.sleep(DELTA)
1056
1057        # fork process
1058        p = self.Process(target=self._test_fork, args=(queue,))
1059        p.daemon = True
1060        p.start()
1061
1062        # check that all expected items are in the queue
1063        for i in range(20):
1064            self.assertEqual(queue.get(), i)
1065        self.assertRaises(pyqueue.Empty, queue.get, False)
1066
1067        p.join()
1068        close_queue(queue)
1069
1070    def test_qsize(self):
1071        q = self.Queue()
1072        try:
1073            self.assertEqual(q.qsize(), 0)
1074        except NotImplementedError:
1075            self.skipTest('qsize method not implemented')
1076        q.put(1)
1077        self.assertEqual(q.qsize(), 1)
1078        q.put(5)
1079        self.assertEqual(q.qsize(), 2)
1080        q.get()
1081        self.assertEqual(q.qsize(), 1)
1082        q.get()
1083        self.assertEqual(q.qsize(), 0)
1084        close_queue(q)
1085
1086    @classmethod
1087    def _test_task_done(cls, q):
1088        for obj in iter(q.get, None):
1089            time.sleep(DELTA)
1090            q.task_done()
1091
1092    def test_task_done(self):
1093        queue = self.JoinableQueue()
1094
1095        workers = [self.Process(target=self._test_task_done, args=(queue,))
1096                   for i in range(4)]
1097
1098        for p in workers:
1099            p.daemon = True
1100            p.start()
1101
1102        for i in range(10):
1103            queue.put(i)
1104
1105        queue.join()
1106
1107        for p in workers:
1108            queue.put(None)
1109
1110        for p in workers:
1111            p.join()
1112        close_queue(queue)
1113
1114    def test_no_import_lock_contention(self):
1115        with test.support.temp_cwd():
1116            module_name = 'imported_by_an_imported_module'
1117            with open(module_name + '.py', 'w') as f:
1118                f.write("""if 1:
1119                    import multiprocessing
1120
1121                    q = multiprocessing.Queue()
1122                    q.put('knock knock')
1123                    q.get(timeout=3)
1124                    q.close()
1125                    del q
1126                """)
1127
1128            with test.support.DirsOnSysPath(os.getcwd()):
1129                try:
1130                    __import__(module_name)
1131                except pyqueue.Empty:
1132                    self.fail("Probable regression on import lock contention;"
1133                              " see Issue #22853")
1134
1135    def test_timeout(self):
1136        q = multiprocessing.Queue()
1137        start = time.monotonic()
1138        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
1139        delta = time.monotonic() - start
1140        # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
1141        # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
1142        # failed because the delta was only 135.8 ms.
1143        self.assertGreaterEqual(delta, 0.100)
1144        close_queue(q)
1145
1146    def test_queue_feeder_donot_stop_onexc(self):
1147        # bpo-30414: verify feeder handles exceptions correctly
1148        if self.TYPE != 'processes':
1149            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1150
1151        class NotSerializable(object):
1152            def __reduce__(self):
1153                raise AttributeError
1154        with test.support.captured_stderr():
1155            q = self.Queue()
1156            q.put(NotSerializable())
1157            q.put(True)
1158            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
1159            close_queue(q)
1160
1161        with test.support.captured_stderr():
1162            # bpo-33078: verify that the queue size is correctly handled
1163            # on errors.
1164            q = self.Queue(maxsize=1)
1165            q.put(NotSerializable())
1166            q.put(True)
1167            try:
1168                self.assertEqual(q.qsize(), 1)
1169            except NotImplementedError:
1170                # qsize is not available on all platform as it
1171                # relies on sem_getvalue
1172                pass
1173            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
1174            # Check that the size of the queue is correct
1175            self.assertTrue(q.empty())
1176            close_queue(q)
1177
1178    def test_queue_feeder_on_queue_feeder_error(self):
1179        # bpo-30006: verify feeder handles exceptions using the
1180        # _on_queue_feeder_error hook.
1181        if self.TYPE != 'processes':
1182            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1183
1184        class NotSerializable(object):
1185            """Mock unserializable object"""
1186            def __init__(self):
1187                self.reduce_was_called = False
1188                self.on_queue_feeder_error_was_called = False
1189
1190            def __reduce__(self):
1191                self.reduce_was_called = True
1192                raise AttributeError
1193
1194        class SafeQueue(multiprocessing.queues.Queue):
1195            """Queue with overloaded _on_queue_feeder_error hook"""
1196            @staticmethod
1197            def _on_queue_feeder_error(e, obj):
1198                if (isinstance(e, AttributeError) and
1199                        isinstance(obj, NotSerializable)):
1200                    obj.on_queue_feeder_error_was_called = True
1201
1202        not_serializable_obj = NotSerializable()
1203        # The captured_stderr reduces the noise in the test report
1204        with test.support.captured_stderr():
1205            q = SafeQueue(ctx=multiprocessing.get_context())
1206            q.put(not_serializable_obj)
1207
1208            # Verify that q is still functioning correctly
1209            q.put(True)
1210            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
1211
1212        # Assert that the serialization and the hook have been called correctly
1213        self.assertTrue(not_serializable_obj.reduce_was_called)
1214        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
1215
1216    def test_closed_queue_put_get_exceptions(self):
1217        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
1218            q.close()
1219            with self.assertRaisesRegex(ValueError, 'is closed'):
1220                q.put('foo')
1221            with self.assertRaisesRegex(ValueError, 'is closed'):
1222                q.get()
1223#
1224#
1225#
1226
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects."""

    def test_lock(self):
        """A held lock refuses a non-blocking acquire and a double release."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # Already held: the non-blocking attempt must report failure.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            lock.release()

    def test_rlock(self):
        """An RLock can be acquired repeatedly and released as often."""
        rlock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlock.release(), None)
        # One release too many.
        with self.assertRaises((AssertionError, RuntimeError)):
            rlock.release()

    def test_lock_context(self):
        """A lock can be used as a context manager."""
        lock = self.Lock()
        with lock:
            pass
1249
1250
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects."""

    def _test_semaphore(self, sem):
        """Take *sem* from 2 down to 0 and back up, checking each value."""
        self.assertReturnsIfImplemented(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            self.assertReturnsIfImplemented(remaining, get_value, sem)
        # Exhausted: a non-blocking acquire fails and changes nothing.
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        for available in (1, 2):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(available, get_value, sem)

    def test_semaphore(self):
        """An unbounded semaphore may be released past its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """Run the common acquire/release sequence on a BoundedSemaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """Check how long acquire() takes on a semaphore that stays at 0."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        scenarios = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwargs, expected_elapsed in scenarios:
            self.assertEqual(acquire(*args, **kwargs), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
1303
1304
class _TestCondition(BaseTestCase):
    """Tests for Condition objects: notify variants, timeouts, wait_for."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker body: signal *sleeping*, wait on *cond*, signal *woken*.

        *sleeping* is released while *cond* is held, just before waiting;
        *woken* is released when the wait returns (notified or timed out).
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def assertReachesEventually(self, func, value):
        """Poll func() until it returns *value*, then assert that it does.

        Polls at most 10 times, DELTA seconds apart, and stops early if
        func() raises NotImplementedError.  After one further DELTA pause
        the final check is done by assertReturnsIfImplemented().
        """
        for i in range(10):
            try:
                if func() == value:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(value, func)

    def check_invariant(self, cond):
        """Check the internal counters of *cond* when nobody is sleeping.

        Only done for TYPE 'processes'; silently skipped when get_value()
        raises NotImplementedError.
        """
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """Check that each notify() wakes exactly one of two sleepers."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        # Use a separate name for the thread so that the process stays
        # reachable and can be joined explicitly at the end.
        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()
        self.addCleanup(t.join)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up; poll rather than rely on
        # a single fixed sleep being long enough on a slow machine
        self.assertReachesEventually(lambda: get_value(woken), 1)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        self.assertReachesEventually(lambda: get_value(woken), 2)

        # check state is not mucked up
        self.check_invariant(cond)
        t.join()
        p.join()

    def test_notify_all(self):
        """Check wait() timeouts and that notify_all() wakes every sleeper."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        self.assertReachesEventually(lambda: get_value(woken), 6)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_notify_n(self):
        """Check that notify(n=...) wakes at most n of six sleepers."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake some of them up
        cond.acquire()
        cond.notify(n=2)
        cond.release()

        # check 2 have woken
        self.assertReachesEventually(lambda: get_value(woken), 2)

        # wake the rest of them
        cond.acquire()
        cond.notify(n=4)
        cond.release()

        self.assertReachesEventually(lambda: get_value(woken), 6)

        # doesn't do anything more
        cond.acquire()
        cond.notify(n=3)
        cond.release()

        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait(TIMEOUT1) without a notify returns False after TIMEOUT1."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        """Child side of test_waitfor().

        Sets state.value to 0, notifies, then waits for the value to
        become 4; exits with status 1 if that wait does not succeed.
        """
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda : state.value==4)
            if not result or state.value != 4:
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        """Check wait_for() with a predicate that eventually becomes true."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        # Wait until the child has reset the state to 0.
        with cond:
            result = cond.wait_for(lambda : state.value==0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        # Step the state from 0 up to the value the child waits for (4).
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        """Child side of test_waitfor_timeout().

        Releases *sem*, then waits up to 0.1 s for state.value == 4.
        *success* is set to True only if the wait failed and took between
        0.6 and 10 times the requested timeout.
        """
        sem.release()
        with cond:
            expected = 0.1
            dt = time.monotonic()
            result = cond.wait_for(lambda : state.value==4, timeout=expected)
            dt = time.monotonic() - dt
            # borrow logic in assertTimeout() from test/lock_tests.py
            if not result and expected * 0.6 < dt < expected * 10.0:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        """Check wait_for() with a predicate that never becomes true."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        # The child releases sem as its first action.
        self.assertTrue(sem.acquire(timeout=support.LONG_TIMEOUT))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        """Child side of test_wait_result().

        Notifies *c*, sleeps one second and, if *pid* is not None, sends
        SIGINT to that process.
        """
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        """Check the return value of wait() and its reaction to SIGINT."""
        # The SIGINT part only applies to process-based runs off Windows.
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            # Nobody notifies yet: both waits time out.
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            # First wait is ended by the child's notify() ...
            self.assertTrue(c.wait(60))
            # ... the second one by the SIGINT the child sends afterwards.
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 60)

            p.join()
1599
1600
class _TestEvent(BaseTestCase):
    """Tests for Event objects."""

    @classmethod
    def _test_event(cls, event):
        """Set *event* after sleeping for TIMEOUT2 seconds."""
        delay = TIMEOUT2
        time.sleep(delay)
        event.set()

    def test_event(self):
        """Check is_set() and the result/duration of wait() around set()."""
        event = self.Event()
        timed_wait = TimingWrapper(event.wait)

        # A new event starts out unset ...
        self.assertEqual(event.is_set(), False)

        # ... so wait() reports False: at once for a zero timeout, after
        # the whole timeout otherwise.
        for timeout in (0.0, TIMEOUT1):
            self.assertEqual(timed_wait(timeout), False)
            self.assertTimingAlmostEqual(timed_wait.elapsed, timeout)

        event.set()

        # Once set, wait() returns True without blocking, with or without
        # a timeout.
        self.assertEqual(event.is_set(), True)
        for wait_args in ((), (TIMEOUT1,)):
            self.assertEqual(timed_wait(*wait_args), True)
            self.assertTimingAlmostEqual(timed_wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # The helper sets the event again after TIMEOUT2 seconds.
        setter = self.Process(target=self._test_event, args=(event,))
        setter.daemon = True
        setter.start()
        self.assertEqual(timed_wait(), True)
        setter.join()
1642
1643#
1644# Tests for Barrier - adapted from tests in test/lock_tests.py
1645#
1646
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  The class _DummyList below
# serves the same purpose.
1651
1652class _DummyList(object):
1653
1654    def __init__(self):
1655        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1656        lock = multiprocessing.Lock()
1657        self.__setstate__((wrapper, lock))
1658        self._lengthbuf[0] = 0
1659
1660    def __setstate__(self, state):
1661        (self._wrapper, self._lock) = state
1662        self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1663
1664    def __getstate__(self):
1665        return (self._wrapper, self._lock)
1666
1667    def append(self, _):
1668        with self._lock:
1669            self._lengthbuf[0] += 1
1670
1671    def __len__(self):
1672        with self._lock:
1673            return self._lengthbuf[0]
1674
1675def _wait():
1676    # A crude wait/yield function not relying on synchronization primitives.
1677    time.sleep(0.01)
1678
1679
class Bunch(object):
    """
    A group of workers that all run the same function.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Start `n` workers, created with `namespace.Process`, that each
        call `f(*args)`.  If `wait_before_exit` is True, the workers
        won't terminate until do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()

        workers = []
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        # The callback must not hold a reference to self.
        self._finalizer = weakref.finalize(self, self._join_all, workers)

    @staticmethod
    def _join_all(workers):
        """Join every worker in *workers*."""
        for worker in workers:
            worker.join()

    def task(self):
        """Body of each worker: record start, run f, record finish."""
        pid = os.getpid()
        self.started.append(pid)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(pid)
            # Stay alive until allowed to exit (30 second limit).
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        """Block until all n workers have recorded their start."""
        while True:
            if len(self.started) >= self.n:
                return
            _wait()

    def wait_for_finished(self):
        """Block until all n workers have recorded their finish."""
        while True:
            if len(self.finished) >= self.n:
                return
            _wait()

    def do_finish(self):
        """Allow the workers to exit."""
        self._can_exit.set()

    def close(self):
        """Join all workers by running the finalizer."""
        self._finalizer()
1735
1736
class AppendTrue(object):
    """Callable that appends True to the wrapped object on every call."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1742
1743
1744class _TestBarrier(BaseTestCase):
1745    """
1746    Tests for Barrier objects.
1747    """
1748    N = 5
1749    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout
1750
1751    def setUp(self):
1752        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1753
1754    def tearDown(self):
1755        self.barrier.abort()
1756        self.barrier = None
1757
1758    def DummyList(self):
1759        if self.TYPE == 'threads':
1760            return []
1761        elif self.TYPE == 'manager':
1762            return self.manager.list()
1763        else:
1764            return _DummyList()
1765
1766    def run_threads(self, f, args):
1767        b = Bunch(self, f, args, self.N-1)
1768        try:
1769            f(*args)
1770            b.wait_for_finished()
1771        finally:
1772            b.close()
1773
1774    @classmethod
1775    def multipass(cls, barrier, results, n):
1776        m = barrier.parties
1777        assert m == cls.N
1778        for i in range(n):
1779            results[0].append(True)
1780            assert len(results[1]) == i * m
1781            barrier.wait()
1782            results[1].append(True)
1783            assert len(results[0]) == (i + 1) * m
1784            barrier.wait()
1785        try:
1786            assert barrier.n_waiting == 0
1787        except NotImplementedError:
1788            pass
1789        assert not barrier.broken
1790
1791    def test_barrier(self, passes=1):
1792        """
1793        Test that a barrier is passed in lockstep
1794        """
1795        results = [self.DummyList(), self.DummyList()]
1796        self.run_threads(self.multipass, (self.barrier, results, passes))
1797
1798    def test_barrier_10(self):
1799        """
1800        Test that a barrier works for 10 consecutive runs
1801        """
1802        return self.test_barrier(10)
1803
1804    @classmethod
1805    def _test_wait_return_f(cls, barrier, queue):
1806        res = barrier.wait()
1807        queue.put(res)
1808
1809    def test_wait_return(self):
1810        """
1811        test the return value from barrier.wait
1812        """
1813        queue = self.Queue()
1814        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1815        results = [queue.get() for i in range(self.N)]
1816        self.assertEqual(results.count(0), 1)
1817        close_queue(queue)
1818
1819    @classmethod
1820    def _test_action_f(cls, barrier, results):
1821        barrier.wait()
1822        if len(results) != 1:
1823            raise RuntimeError
1824
1825    def test_action(self):
1826        """
1827        Test the 'action' callback
1828        """
1829        results = self.DummyList()
1830        barrier = self.Barrier(self.N, action=AppendTrue(results))
1831        self.run_threads(self._test_action_f, (barrier, results))
1832        self.assertEqual(len(results), 1)
1833
1834    @classmethod
1835    def _test_abort_f(cls, barrier, results1, results2):
1836        try:
1837            i = barrier.wait()
1838            if i == cls.N//2:
1839                raise RuntimeError
1840            barrier.wait()
1841            results1.append(True)
1842        except threading.BrokenBarrierError:
1843            results2.append(True)
1844        except RuntimeError:
1845            barrier.abort()
1846
1847    def test_abort(self):
1848        """
1849        Test that an abort will put the barrier in a broken state
1850        """
1851        results1 = self.DummyList()
1852        results2 = self.DummyList()
1853        self.run_threads(self._test_abort_f,
1854                         (self.barrier, results1, results2))
1855        self.assertEqual(len(results1), 0)
1856        self.assertEqual(len(results2), self.N-1)
1857        self.assertTrue(self.barrier.broken)
1858
1859    @classmethod
1860    def _test_reset_f(cls, barrier, results1, results2, results3):
1861        i = barrier.wait()
1862        if i == cls.N//2:
1863            # Wait until the other threads are all in the barrier.
1864            while barrier.n_waiting < cls.N-1:
1865                time.sleep(0.001)
1866            barrier.reset()
1867        else:
1868            try:
1869                barrier.wait()
1870                results1.append(True)
1871            except threading.BrokenBarrierError:
1872                results2.append(True)
1873        # Now, pass the barrier again
1874        barrier.wait()
1875        results3.append(True)
1876
1877    def test_reset(self):
1878        """
1879        Test that a 'reset' on a barrier frees the waiting threads
1880        """
1881        results1 = self.DummyList()
1882        results2 = self.DummyList()
1883        results3 = self.DummyList()
1884        self.run_threads(self._test_reset_f,
1885                         (self.barrier, results1, results2, results3))
1886        self.assertEqual(len(results1), 0)
1887        self.assertEqual(len(results2), self.N-1)
1888        self.assertEqual(len(results3), self.N)
1889
1890    @classmethod
1891    def _test_abort_and_reset_f(cls, barrier, barrier2,
1892                                results1, results2, results3):
1893        try:
1894            i = barrier.wait()
1895            if i == cls.N//2:
1896                raise RuntimeError
1897            barrier.wait()
1898            results1.append(True)
1899        except threading.BrokenBarrierError:
1900            results2.append(True)
1901        except RuntimeError:
1902            barrier.abort()
1903        # Synchronize and reset the barrier.  Must synchronize first so
1904        # that everyone has left it when we reset, and after so that no
1905        # one enters it before the reset.
1906        if barrier2.wait() == cls.N//2:
1907            barrier.reset()
1908        barrier2.wait()
1909        barrier.wait()
1910        results3.append(True)
1911
1912    def test_abort_and_reset(self):
1913        """
1914        Test that a barrier can be reset after being broken.
1915        """
1916        results1 = self.DummyList()
1917        results2 = self.DummyList()
1918        results3 = self.DummyList()
1919        barrier2 = self.Barrier(self.N)
1920
1921        self.run_threads(self._test_abort_and_reset_f,
1922                         (self.barrier, barrier2, results1, results2, results3))
1923        self.assertEqual(len(results1), 0)
1924        self.assertEqual(len(results2), self.N-1)
1925        self.assertEqual(len(results3), self.N)
1926
1927    @classmethod
1928    def _test_timeout_f(cls, barrier, results):
1929        i = barrier.wait()
1930        if i == cls.N//2:
1931            # One thread is late!
1932            time.sleep(1.0)
1933        try:
1934            barrier.wait(0.5)
1935        except threading.BrokenBarrierError:
1936            results.append(True)
1937
1938    def test_timeout(self):
1939        """
1940        Test wait(timeout)
1941        """
1942        results = self.DummyList()
1943        self.run_threads(self._test_timeout_f, (self.barrier, results))
1944        self.assertEqual(len(results), self.barrier.parties)
1945
1946    @classmethod
1947    def _test_default_timeout_f(cls, barrier, results):
1948        i = barrier.wait(cls.defaultTimeout)
1949        if i == cls.N//2:
1950            # One thread is later than the default timeout
1951            time.sleep(1.0)
1952        try:
1953            barrier.wait()
1954        except threading.BrokenBarrierError:
1955            results.append(True)
1956
1957    def test_default_timeout(self):
1958        """
1959        Test the barrier's default timeout
1960        """
1961        barrier = self.Barrier(self.N, timeout=0.5)
1962        results = self.DummyList()
1963        self.run_threads(self._test_default_timeout_f, (barrier, results))
1964        self.assertEqual(len(results), barrier.parties)
1965
1966    def test_single_thread(self):
1967        b = self.Barrier(1)
1968        b.wait()
1969        b.wait()
1970
1971    @classmethod
1972    def _test_thousand_f(cls, barrier, passes, conn, lock):
1973        for i in range(passes):
1974            barrier.wait()
1975            with lock:
1976                conn.send(i)
1977
1978    def test_thousand(self):
1979        if self.TYPE == 'manager':
1980            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1981        passes = 1000
1982        lock = self.Lock()
1983        conn, child_conn = self.Pipe(False)
1984        for j in range(self.N):
1985            p = self.Process(target=self._test_thousand_f,
1986                           args=(self.barrier, passes, child_conn, lock))
1987            p.start()
1988            self.addCleanup(p.join)
1989
1990        for i in range(passes):
1991            for j in range(self.N):
1992                self.assertEqual(conn.recv(), i)
1993
1994#
1995#
1996#
1997
class _TestValue(BaseTestCase):
    """Tests for the shared ``Value`` and ``RawValue`` objects."""

    ALLOWED_TYPES = ('processes',)

    # Each entry is (typecode, initial value, value stored by the child).
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('q', 2 ** 33, 2 ** 34),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        # Every test in this class needs multiprocessing.sharedctypes.
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child-process target: store each entry's third field."""
        for shared, (_code, _initial, final) in zip(values, cls.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        # Values must start out with the initial value and show the
        # child's writes once the child has been joined.
        make_value = self.RawValue if raw else self.Value
        values = [make_value(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, (_code, initial, _final) in zip(values,
                                                    self.codes_values):
            self.assertEqual(shared.value, initial)

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, (_code, _initial, final) in zip(values,
                                                    self.codes_values):
            self.assertEqual(shared.value, final)

    def test_rawvalue(self):
        # Same scenario as test_value, with RawValue objects.
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default locking: both accessors can be called.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None: both accessors can still be called.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicitly supplied lock is what get_lock() hands back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: the accessors are absent.
        val4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(val4, accessor))

        # A lock argument that is not a lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        val5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(val5, accessor))
2066
2067
class _TestArray(BaseTestCase):
    """Tests for the shared ``Array`` and ``RawArray`` objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place by its running sums."""
        for pos in range(1, len(seq)):
            seq[pos] += seq[pos-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        # Indexing, slicing, slice assignment, and visibility of a child
        # process's writes.
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        arr = self.RawArray('i', seq) if raw else self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and to the
        # reference list.
        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # Running sums locally on the list ...
        self.f(seq)

        # ... and in a child process on the shared array.
        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # Repeating the allocation raises the odds that the 2nd and 3rd
        # arrays land on memory that previously held non-zero data.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            zeros = [0] * size
            self.assertEqual(list(arr), zeros)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        # Same scenario as test_array, with a RawArray.
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default locking: both accessors can be called.
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None: both accessors can still be called.
        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock is what get_lock() hands back.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: the accessors are absent.
        arr4 = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
        # A lock argument that is not a lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no accessors either.
        arr5 = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
2146
2147#
2148#
2149#
2150
class _TestContainers(BaseTestCase):
    """Tests for the manager-backed list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        # Construction, slicing, extend, in-place repeat and concatenation.
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        # A shared list built from proxies gives access to their contents.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            [element[:] for element in e],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A change made through `a` is visible through the nested entry.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])

    def test_list_iter(self):
        a = self.list(list(range(10)))
        it = iter(a)
        self.assertEqual(list(it), list(range(10)))
        self.assertEqual(list(it), [])  # exhausted
        # list modified during iteration
        it = iter(a)
        a[0] = 100
        self.assertEqual(next(it), 100)

    def test_list_proxy_in_list(self):
        # Shared lists stored inside a shared list stay independent.
        a = self.list([self.list(range(3)) for _i in range(3)])
        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)

        a[0][-1] = 55
        self.assertEqual(a[0][:], [0, 1, 55])
        for i in range(1, 3):
            self.assertEqual(a[i][:], [0, 1, 2])

        self.assertEqual(a[1].pop(), 2)
        self.assertEqual(len(a[1]), 2)
        for i in range(0, 3, 2):
            self.assertEqual(len(a[i]), 3)

        del a

        # A shared list that contains itself can be created and dropped.
        b = self.list()
        b.append(b)
        del b

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_dict_iter(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        it = iter(d)
        self.assertEqual(list(it), indices)
        self.assertEqual(list(it), [])  # exhausted
        # dictionary changed size during iteration
        it = iter(d)
        d.clear()
        self.assertRaises(RuntimeError, next, it)

    def test_dict_proxy_nested(self):
        pets = self.dict(ferrets=2, hamsters=4)
        supplies = self.dict(water=10, feed=3)
        d = self.dict(pets=pets, supplies=supplies)

        self.assertEqual(supplies['water'], 10)
        self.assertEqual(d['supplies']['water'], 10)

        # Writes through the outer dict are visible through the inner one.
        d['supplies']['blankets'] = 5
        self.assertEqual(supplies['blankets'], 5)
        self.assertEqual(d['supplies']['blankets'], 5)

        d['supplies']['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # The nested dicts remain reachable after the local names are gone.
        del pets
        del supplies
        self.assertEqual(d['pets']['ferrets'], 2)
        d['supplies']['blankets'] = 11
        self.assertEqual(d['supplies']['blankets'], 11)

        pets = d['pets']
        supplies = d['supplies']
        supplies['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Clearing the outer dict leaves the inner dicts' contents intact.
        d.clear()
        self.assertEqual(len(d), 0)
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(pets['hamsters'], 4)

        l = self.list([pets, supplies])
        l[0]['marmots'] = 1
        self.assertEqual(pets['marmots'], 1)
        self.assertEqual(l[0]['marmots'], 1)

        del pets
        del supplies
        self.assertEqual(l[0]['marmots'], 1)

        outer = self.list([[88, 99], l])
        self.assertIsInstance(outer[0], list)  # Not a ListProxy
        self.assertEqual(outer[-1][-1]['feed'], 3)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Per the expected string, `_hidden` is left out of str(n).
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
2295
2296#
2297#
2298#
2299
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
2303
def mul(x, y):
    """Return the product ``x * y``."""
    product = x * y
    return product
2306
def raise_large_valuerror(wait):
    """Sleep for *wait* seconds, then raise a ValueError whose message is
    ``1024**2`` characters long."""
    time.sleep(wait)
    message = "x" * 1024**2
    raise ValueError(message)
2310
def identity(x):
    """Return the argument unchanged."""
    return x
2313
class CountedObject(object):
    """Object whose class keeps a count of its instances.

    The count goes up in ``__new__`` and down in ``__del__``.
    """

    # Incremented on creation, decremented on finalization.
    n_instances = 0

    def __new__(cls):
        cls.n_instances += 1
        instance = object.__new__(cls)
        return instance

    def __del__(self):
        owner = type(self)
        owner.n_instances -= 1
2323
class SayWhenError(ValueError):
    """Raised by exception_throwing_generator()."""
2325
def exception_throwing_generator(total, when):
    """Yield 0, 1, ... below *total*, raising SayWhenError at index *when*.

    With ``when == -1`` the error is raised before anything is yielded.
    If *when* is never reached, all of ``range(total)`` is yielded.
    """
    if when == -1:
        raise SayWhenError("Somebody said when")
    for value in range(total):
        if value != when:
            yield value
            continue
        raise SayWhenError("Somebody said when")
2333
2334
class _TestPool(BaseTestCase):
    """Tests for the Pool API.

    A pool of 4 workers is created once per class and used by most tests
    as ``self.pool``; tests that need a pool of their own create one with
    ``self.Pool(...)``.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.pool = cls.Pool(4)

    @classmethod
    def tearDownClass(cls):
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()

    def test_apply(self):
        # apply() with positional and with keyword arguments.
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        # map() with the default and with an explicit chunksize.
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        # starmap() must agree with itertools.starmap().
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, list(range(10)))))

    def test_map_async_callbacks(self):
        # Exactly one of callback / error_callback is expected to fire per
        # map_async() call; both append to the same list.
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must not stall.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_map_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
        # again, make sure it's reentrant
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)

        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(10, 3), 1)

        # An iterable that reports a length but raises on the first item.
        class SpecialIterable:
            def __iter__(self):
                return self
            def __next__(self):
                raise SayWhenError
            def __len__(self):
                return 1
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, SpecialIterable(), 1)
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, SpecialIterable(), 1)

    def test_async(self):
        # get() is expected to take about as long as the task sleeps.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
        self.assertRaises(SayWhenError, it.__next__)
        # again, make sure it's reentrant
        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
        self.assertRaises(SayWhenError, it.__next__)

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

    def test_imap_unordered(self):
        # Results may arrive in any order, so compare sorted.
        it = self.pool.imap_unordered(sqr, list(range(10)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(1, -1),
                                      1)
        self.assertRaises(SayWhenError, it.__next__)
        # again, make sure it's reentrant
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(1, -1),
                                      1)
        self.assertRaises(SayWhenError, it.__next__)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = list(map(sqr, list(range(10))))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = list(map(sqr, list(range(20))))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        # Non-positive worker counts are rejected; for the manager type
        # the error is expected as a RemoteError.
        expected_error = (RemoteError if self.TYPE == 'manager'
                          else ValueError)

        self.assertRaises(expected_error, self.Pool, -1)
        self.assertRaises(expected_error, self.Pool, 0)

        if self.TYPE != 'manager':
            p = self.Pool(3)
            try:
                self.assertEqual(3, len(p._pool))
            finally:
                p.close()
                p.join()

    def test_terminate(self):
        # NOTE: this terminates the class-level pool created in
        # setUpClass(), not a private one.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # Sanity check the pool didn't wait for all tasks to finish
        self.assertLess(join.elapsed, 2.0)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            # Shut the private pool down even when an assertion fails, so
            # a failing test does not leave it running.
            p.close()
            p.join()

    def test_context(self):
        # After the `with` block, submitting more work raises ValueError.
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with self.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            p.join()
            self.assertRaises(ValueError, p.map_async, sqr, L)

    @classmethod
    def _test_traceback(cls):
        # The exact text of this line, trailing comment included, is
        # searched for by test_traceback().
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        if self.TYPE == 'processes':
            with self.Pool(1) as p:
                try:
                    p.apply(self._test_traceback)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected RuntimeError')
            p.join()
            self.assertIs(type(exc), RuntimeError)
            self.assertEqual(exc.args, (123,))
            cause = exc.__cause__
            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

            # The remote traceback must also show up in excepthook output.
            with test.support.captured_stderr() as f1:
                try:
                    raise exc
                except RuntimeError:
                    sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())
            # _helper_reraises_exception should not make the error
            # a remote exception
            with self.Pool(1) as p:
                try:
                    p.map(sqr, exception_throwing_generator(1, -1), 1)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected SayWhenError')
                self.assertIs(type(exc), SayWhenError)
                self.assertIs(exc.__cause__, None)
            p.join()

    @classmethod
    def _test_wrapped_exception(cls):
        raise RuntimeError('foo')

    def test_wrapped_exception(self):
        # Issue #20980: Should not wrap exception when using thread pool
        with self.Pool(1) as p:
            with self.assertRaises(RuntimeError):
                p.apply(self._test_wrapped_exception)
        p.join()

    def test_map_no_failfast(self):
        # Issue #23992: the fail-fast behaviour when an exception is raised
        # during map() would make Pool.join() deadlock, because a worker
        # process would fill the result queue (after the result handler thread
        # terminated, hence not draining it anymore).

        t_start = time.monotonic()

        with self.assertRaises(ValueError):
            with self.Pool(2) as p:
                try:
                    p.map(raise_large_valuerror, [0, 1])
                finally:
                    time.sleep(0.5)
                    p.close()
                    p.join()

        # check that we indeed waited for all jobs
        self.assertGreater(time.monotonic() - t_start, 0.9)

    def test_release_task_refs(self):
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        objs = [CountedObject() for i in range(10)]
        refs = [weakref.ref(o) for o in objs]
        self.pool.map(identity, objs)

        del objs
        time.sleep(DELTA)  # let threaded cleanup code run
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)

    def test_enter(self):
        if self.TYPE == 'manager':
            self.skipTest("test not applicable to manager")

        pool = self.Pool(1)
        with pool:
            pass
            # call pool.terminate()
        # pool is no longer running

        with self.assertRaises(ValueError):
            # bpo-35477: pool.__enter__() fails if the pool is not running
            with pool:
                pass
        pool.join()

    def test_resource_warning(self):
        if self.TYPE == 'manager':
            self.skipTest("test not applicable to manager")

        pool = self.Pool(1)
        pool.terminate()
        pool.join()

        # force state to RUN to emit ResourceWarning in __del__()
        pool._state = multiprocessing.pool.RUN

        with support.check_warnings(('unclosed running multiprocessing pool',
                                     ResourceWarning)):
            pool = None
            support.gc_collect()
2693
def raising():
    """Raise ``KeyError("key")`` unconditionally."""
    error = KeyError("key")
    raise error
2696
def unpickleable_result():
    """Return a locally defined lambda that evaluates to 42."""
    answer = lambda: 42
    return answer
2699
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for how a process pool reports errors raised by tasks."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        # An exception raised by the task must reach both the error
        # callback and the caller of get().
        p = multiprocessing.Pool(2)
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Shut the pool down even when an assertion fails, so a
            # failing test does not leave worker processes behind.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            # Shut the pool down even when an assertion fails, so a
            # failing test does not leave worker processes behind.
            p.close()
            p.join()
2739
class _TestPoolWorkerLifetime(BaseTestCase):
    # Tests of pools whose workers are replaced after a fixed number of
    # tasks (maxtasksperchild), and of pool finalization at interpreter exit.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Shut the pool down even when an assertion above fails, so
            # worker processes are not leaked into later tests.
            p.close()
            p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        # close()/join() are the operations under test here, so they are
        # deliberately called while tasks are still outstanding.
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))

    def test_worker_finalization_via_atexit_handler_of_multiprocessing(self):
        # tests cases against bpo-38744 and bpo-39360
        # The script keeps a Pool alive in a module global and exits without
        # closing it; the child interpreter must still terminate cleanly.
        cmd = '''if 1:
            from multiprocessing import Pool
            problem = None
            class A:
                def __init__(self):
                    self.pool = Pool(processes=1)
            def test():
                global problem
                problem = A()
                problem.pool.map(float, tuple(range(10)))
            if __name__ == "__main__":
                test()
        '''
        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
        self.assertEqual(rc, 0)
2802
2803#
2804# Test of creating a customized manager class
2805#
2806
2807from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2808
class FooBar:
    """Plain object served through MyManager in the tests below."""

    def f(self):
        """Public method returning a fixed marker string."""
        return 'f()'

    def g(self):
        """Public method that always raises ValueError."""
        raise ValueError()

    def _h(self):
        """Underscore-prefixed method returning a fixed marker string."""
        return '_h()'
2816
def baz():
    """Generate the squares of 0 through 9, in increasing order."""
    yield from (n * n for n in range(10))
2820
class IteratorProxy(BaseProxy):
    """Proxy that can be iterated; each step is fetched from the referent."""

    # Only __next__ is forwarded through _callmethod(); __iter__ is
    # answered locally by returning the proxy itself.
    _exposed_ = ('__next__',)

    def __next__(self):
        return self._callmethod('__next__')

    def __iter__(self):
        return self
2827
class MyManager(BaseManager):
    """Customized manager; its type ids are registered just below."""


# 'Foo' is registered without an explicit method list, 'Bar' lists its
# exposed methods (including the underscore-prefixed _h), and 'baz' is
# handed out through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2834
2835
class _TestMyManager(BaseTestCase):
    # Exercises MyManager and its registered types, started explicitly
    # as well as through the context-manager protocol.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            self.common(manager)
        finally:
            # Stop the server process even if common() fails, so that it
            # is not leaked into later tests.
            manager.shutdown()

        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
        # to the manager process if it takes longer than 1 second to stop,
        # which happens on slow buildbots.
        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

    def test_mymanager_context(self):
        with MyManager() as manager:
            self.common(manager)
        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
        # to the manager process if it takes longer than 1 second to stop,
        # which happens on slow buildbots.
        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

    def test_mymanager_context_prestarted(self):
        manager = MyManager()
        manager.start()
        with manager:
            self.common(manager)
        # bpo-30356: leaving the "with" block shuts the manager down the
        # same way as in the tests above, so a slow buildbot may see the
        # process terminated by SIGTERM here as well.
        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

    def common(self, manager):
        # Shared checks run against an already started manager.
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        # Which of FooBar's methods are reachable as proxy attributes
        # depends on how the type id was registered.
        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # _h is not exposed on Foo, so calling it by name is refused.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
2888
2889
2890#
2891# Test of connecting to a remote server and using xmlrpclib for serialization
2892#
2893
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (always the same object)."""
    return _queue
2897
class QueueManager(BaseManager):
    """Manager class used by the server process."""


# The server side supplies the callable that produces the shared queue.
QueueManager.register('get_queue', callable=get_queue)
2901
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered by name only, without a callable.
QueueManager2.register('get_queue')
2905
2906
# Serializer name passed to the QueueManager/QueueManager2 constructors in
# the remote-manager tests below.
SERIALIZER = 'xmlrpclib'
2908
# Connections to the manager are authenticated with an authkey, exactly as
# in _TestManagerRestart, so the same hash-digest guard is applied here.
@hashlib_helper.requires_hashdigest('md5')
class _TestRemoteManager(BaseTestCase):
    # A child process and the test itself both connect, as clients, to a
    # manager server that uses the SERIALIZER serializer.

    ALLOWED_TYPES = ('manager',)
    values = ['hello world', None, True, 2.25,
              'hall\xe5 v\xe4rlden',
              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
              b'hall\xe5 v\xe4rlden',
             ]
    # What the test expects to read back: a list, although a tuple is sent.
    result = values[:]

    @classmethod
    def _putter(cls, address, authkey):
        # Child process: connect to the running server and enqueue the
        # values for the parent to read.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        # Reap the child once the test is over.  Cleanups run in reverse
        # order, so this happens before the manager is shut down.
        self.addCleanup(p.join)

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
2956
2957
@hashlib_helper.requires_hashdigest('md5')
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started on an address which was in use
    # only a moment earlier.

    @classmethod
    def _putter(cls, address, authkey):
        # Child process: connect to the running manager and enqueue a
        # single message for the parent to read.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        try:
            srvr = manager.get_server()
            addr = srvr.address
            # Close the connection.Listener socket which gets opened as a part
            # of manager.get_server(). It's not needed for the test.
            srvr.listener.close()
            manager.start()

            p = self.Process(target=self._putter, args=(manager.address, authkey))
            p.start()
            p.join()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            del queue
        finally:
            # "shutdown" only exists once start() has succeeded.
            if hasattr(manager, "shutdown"):
                manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
            self.addCleanup(manager.shutdown)
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # Actually perform the retry: without start() the new manager
            # would never be exercised and would have nothing to shut down.
            manager.start()
            self.addCleanup(manager.shutdown)
3006
3007#
3008#
3009#
3010
# Empty bytes message; _TestConnection._echo stops echoing when it reads it.
SENTINEL = latin('')
3012
class _TestConnection(BaseTestCase):
    # Tests of the connection objects returned by self.Pipe(): object and
    # bytes transfer, polling, closing, and passing file descriptors.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child side: send every received bytes message straight back until
        # SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A 40-byte buffer is too small for longmsg; the exception is
            # expected to carry the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() reports False, waiting only when given
        # a positive timeout.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the child time to echo the message back.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # The other end is gone, so reading now reports end of file.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # A one-way pipe: the first end only reads, the second only writes.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # send_bytes(buf, offset, size): valid slices are delivered, and
        # out-of-range or negative arguments raise ValueError.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if fd refers to an open descriptor and False if
        # os.fstat() rejects it with EBADF; other errors propagate.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child side: receive a handle over conn, write data to it and
        # close it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 before receiving
            # (used by test_large_fd_transfer; see issue #11657).
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            # With msvcrt available the handle is first turned into a file
            # descriptor.
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child wrote through the transferred handle.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Pick the first unassigned descriptor number at or above 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child side: write one raw byte to the connection's descriptor,
        # with no handle attached.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        # Connections work as context managers and are closed on exit.
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
3277
class _TestListener(BaseTestCase):
    # Listener behaviour: address reuse, the context-manager protocol and
    # abstract socket addresses.

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # A second Listener on an address that is already bound must fail.
        for family in self.connection.families:
            first = self.connection.Listener(family=family)
            self.addCleanup(first.close)
            with self.assertRaises(OSError):
                self.connection.Listener(first.address, family)

    def test_context(self):
        # Listener, client and accepted connection all work as context
        # managers; they are exited in reverse order of entry.
        with self.connection.Listener() as server, \
                self.connection.Client(server.address) as client, \
                server.accept() as accepted:
            client.send(1729)
            self.assertEqual(accepted.recv(), 1729)

        if self.TYPE == 'processes':
            # The listener was closed on exit, so accepting must now fail.
            self.assertRaises(OSError, server.accept)

    @unittest.skipUnless(util.abstract_sockets_supported,
                         "test needs abstract socket support")
    def test_abstract_socket(self):
        # Same round trip as test_context, on an address starting with NUL.
        with self.connection.Listener("\0something") as server, \
                self.connection.Client(server.address) as peer, \
                server.accept() as accepted:
            peer.send(1729)
            self.assertEqual(accepted.recv(), 1729)

        if self.TYPE == 'processes':
            self.assertRaises(OSError, server.accept)
3310
3311
class _TestListenerClient(BaseTestCase):
    # Tests of a Listener accepting connections made with Client().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect to address, send one greeting and disconnect.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection explicitly, as test_issue14725
            # does, instead of leaving it to garbage collection.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        # poll() on the client must report data sent by the accepted side.
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
3359
class _TestPoll(BaseTestCase):
    # Tests of Connection.poll(): empty messages, message boundaries and
    # keeping consecutive messages separate.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty message counts as pending data, and polling twice shows
        # that poll() does not consume it.
        receiver, sender = self.Pipe()
        self.assertEqual(receiver.poll(), False)
        sender.send_bytes(b'')
        self.assertEqual(receiver.poll(), True)
        self.assertEqual(receiver.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Child side: send each payload after a short pause, then close.
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        receiver, sender = self.Pipe()
        child = self.Process(target=self._child_strings,
                             args=(sender, strings))
        child.start()

        for expected in strings:
            # Poll in short slices (at most 200 of them) until data shows up.
            for _ in range(200):
                if receiver.poll(0.01):
                    break
            received = receiver.recv_bytes()
            self.assertEqual(expected, received)

        child.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        reader, writer = self.Pipe(False)
        child = self.Process(target=self._child_boundaries, args=(reader,))
        child.start()
        # Let the child get into its poll() call before anything is sent.
        time.sleep(2)
        messages = [b"first", b"second"]
        for message in messages:
            writer.send_bytes(message)
        writer.close()
        child.join()
        # Whatever the child's poll did, the parent must read a whole message.
        self.assertIn(reader.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        # Child side: three back-to-back messages of different lengths.
        b.send_bytes(b'a')
        b.send_bytes(b'b')
        b.send_bytes(b'cd')

    def test_dont_merge(self):
        receiver, sender = self.Pipe()
        self.assertEqual(receiver.poll(0.0), False)
        self.assertEqual(receiver.poll(0.1), False)

        child = self.Process(target=self._child_dont_merge, args=(sender,))
        child.start()

        # Repeated polling between reads must not join or drop messages.
        self.assertEqual(receiver.recv_bytes(), b'a')
        self.assertEqual(receiver.poll(1.0), True)
        self.assertEqual(receiver.poll(1.0), True)
        self.assertEqual(receiver.recv_bytes(), b'b')
        self.assertEqual(receiver.poll(1.0), True)
        self.assertEqual(receiver.poll(1.0), True)
        self.assertEqual(receiver.poll(0.0), True)
        self.assertEqual(receiver.recv_bytes(), b'cd')

        child.join()
3437
3438#
3439# Test of sending connection and socket objects between processes
3440#
3441
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@hashlib_helper.requires_hashdigest('md5')
class _TestPicklingConnections(BaseTestCase):
    # Tests that connection and socket objects can be sent from one process
    # to another through a pipe.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        # Stop the resource sharer once all tests of this class have run.
        from multiprocessing import resource_sharer
        resource_sharer.stop(timeout=support.LONG_TIMEOUT)

    @classmethod
    def _listener(cls, conn, families):
        # Child side ("listener" process).  For every family: open a
        # Listener, report its address over conn, accept one connection and
        # send that accepted connection object itself back over conn.
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        # Same exchange once more, this time with a plain socket.
        l = socket.create_server((socket_helper.HOST, 0))
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        # Block until the parent sends its final message.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        # Child side ("remote" process).  Each (address, msg) pair received
        # before the None marker is answered by connecting to address and
        # sending msg upper-cased.
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        # One more pair follows the marker; it is served with a plain socket.
        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        # lconn talks to the listener process, rconn to the remote process.
        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            # Relay the listener's address to the remote process, then read
            # the remote's reply from the connection object that the
            # listener process accepted and sent over.
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        # Marker ending the Listener/Client rounds in _remote().
        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        # new_conn is a socket here: read until the peer closes it.
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        # Releases the listener process from its final conn.recv().
        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        # Child side of test_access: first receive a connection and write
        # to it, then receive another one, read a message from it and
        # report that message, doubled, over conn.
        w = conn.recv()
        w.send('all is well')
        w.close()

        r = conn.recv()
        msg = r.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        # Hand the write end of a one-way pipe to the child.
        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        # Hand the read end of a second one-way pipe to the child.
        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)

        p.join()
3573
3574#
3575#
3576#
3577
class _TestHeap(BaseTestCase):
    # Tests for the block allocator behind multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        super().setUp()
        # Give each test a brand-new Heap; remember the previous one so that
        # tearDown() can reinstate it.
        wrapper_cls = multiprocessing.heap.BufferWrapper
        self.old_heap = wrapper_cls._heap
        wrapper_cls._heap = multiprocessing.heap.Heap()

    def tearDown(self):
        # Reinstate the heap that setUp() swapped out.
        multiprocessing.heap.BufferWrapper._heap = self.old_heap
        super().tearDown()

    def test_heap(self):
        # Allocate and release many randomly sized blocks, then verify that
        # the heap's free/allocated bookkeeping is consistent.
        rounds = 5000
        max_live = 50
        live = []

        # the heap installed by setUp()
        heap = multiprocessing.heap.BufferWrapper._heap
        heap._DISCARD_FREE_SPACE_LARGER_THAN = 0

        # churn: one new block per round, and once the population exceeds
        # max_live a randomly chosen older block is released
        for _ in range(rounds):
            nbytes = int(random.lognormvariate(0, 1) * 1000)
            wrapper = multiprocessing.heap.BufferWrapper(nbytes)
            live.append(wrapper)
            if len(live) > max_live:
                victim = random.randrange(max_live)
                del live[victim]
            # drop the local reference to the new block
            del wrapper

        # inspect the heap while holding its lock
        with heap._lock:
            layout = []
            free_total = 0
            used_total = 0
            for free_blocks in list(heap._len_to_seq.values()):
                # record every free block of every arena
                for arena, start, stop in free_blocks:
                    length = stop - start
                    layout.append((heap._arenas.index(arena), start, stop,
                                   length, 'free'))
                    free_total += length
            for arena, used_blocks in heap._allocated_blocks.items():
                # record every allocated block of every arena
                for start, stop in used_blocks:
                    length = stop - start
                    layout.append((heap._arenas.index(arena), start, stop,
                                   length, 'occupied'))
                    used_total += length

            # free and occupied bytes together must account for all arenas
            arena_total = sum(arena.size for arena in heap._arenas)
            self.assertEqual(free_total + used_total, arena_total)

            layout.sort()

            # walk neighbouring entries of the sorted layout
            for entry, successor in zip(layout, layout[1:]):
                arena_idx, _start, stop = entry[:3]
                next_arena_idx, next_start, _next_stop = successor[:3]
                if arena_idx == next_arena_idx:
                    # Same arena: the two blocks must be adjacent
                    self.assertEqual(stop, next_start)
                else:
                    # Arena boundary: the first entry ends its arena and the
                    # second one starts the next arena at offset 0
                    self.assertEqual(stop, heap._arenas[arena_idx].size)
                    self.assertEqual(next_start, 0)

        # release every remaining block, in random order
        random.shuffle(live)
        while live:
            live.pop()

        self.assertEqual(heap._n_frees, heap._n_mallocs)
        self.assertEqual(len(heap._pending_free_blocks), 0)
        self.assertEqual(len(heap._arenas), 0)
        self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
        self.assertEqual(len(heap._len_to_seq), 0)

    def test_free_from_gc(self):
        # Blocks released by the garbage collector must not deadlock the
        # heap (issue #12352).
        # The GC is switched on if necessary and given a low threshold so
        # that collections are frequent, which makes a deadlock more likely
        # to show up.  The previous GC state is restored by cleanups.
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # Allocate many pairs of blocks that reference each other, so that
        # they can only be reclaimed asynchronously by the gc.
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
3677
3678#
3679#
3680#
3681
class _Foo(Structure):
    # ctypes structure used by the sharedctypes tests below: an int, a
    # double and a long long, in that order.
    _fields_ = [('x', c_int), ('y', c_double), ('z', c_longlong)]
3688
class _TestSharedCTypes(BaseTestCase):
    # Tests for ctypes objects created through multiprocessing.sharedctypes.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Nothing here can run if sharedctypes failed to import.
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, z, foo, arr, string):
        # Process target: double, in place, the three scalar values, the
        # x and y fields of foo (z is left alone), the byte string and every
        # element of arr.
        for scalar in (x, y, z):
            scalar.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        # Create one shared object of each flavour, let a child process
        # double them, and check that the parent sees the doubled values.
        shared_int = Value('i', 7, lock=lock)
        shared_double = Value(c_double, 1.0/3.0, lock=lock)
        shared_longlong = Value(c_longlong, 2 ** 33, lock=lock)
        shared_struct = Value(_Foo, 3, 2, lock=lock)
        shared_array = self.Array('d', list(range(10)), lock=lock)
        shared_bytes = self.Array('c', 20, lock=lock)
        shared_bytes.value = latin('hello')

        worker = self.Process(
            target=self._double,
            args=(shared_int, shared_double, shared_longlong,
                  shared_struct, shared_array, shared_bytes),
        )
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(shared_int.value, 14)
        self.assertAlmostEqual(shared_double.value, 2.0/3.0)
        self.assertEqual(shared_longlong.value, 2 ** 34)
        self.assertEqual(shared_struct.x, 6)
        self.assertAlmostEqual(shared_struct.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(shared_array[index], index*2)
        self.assertEqual(shared_bytes.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario as test_sharedctypes, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # copy() must return an independent object: zeroing the source
        # afterwards leaves the duplicate untouched.
        source = _Foo(2, 5.0, 2 ** 33)
        duplicate = copy(source)
        source.x = 0
        source.y = 0
        source.z = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
        self.assertEqual(duplicate.z, 2 ** 33)
3743
3744
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
@hashlib_helper.requires_hashdigest('md5')
class _TestSharedMemory(BaseTestCase):
    # Tests for multiprocessing.shared_memory (SharedMemory, ShareableList)
    # and for multiprocessing.managers.SharedMemoryManager.

    ALLOWED_TYPES = ('processes',)

    @staticmethod
    def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data):
        # Process target: copy binary_data to the start of a segment.
        # shmem_name_or_obj is either a str naming an existing segment
        # (which is attached to here) or a SharedMemory object used as-is.
        # The handle used for writing is closed before returning.
        if isinstance(shmem_name_or_obj, str):
            local_sms = shared_memory.SharedMemory(shmem_name_or_obj)
        else:
            local_sms = shmem_name_or_obj
        local_sms.buf[:len(binary_data)] = binary_data
        local_sms.close()

    def _new_shm_name(self, prefix):
        # Return prefix with the current PID appended.  Explicitly named
        # segments would otherwise clash when these tests are run by
        # several processes in parallel (the concern behind bpo-40135).
        return prefix + str(os.getpid())

    def test_shared_memory_basics(self):
        name_tsmb = self._new_shm_name('test01_tsmb')
        sms = shared_memory.SharedMemory(name_tsmb, create=True, size=512)
        self.addCleanup(sms.unlink)

        # Verify attributes are readable.
        self.assertEqual(sms.name, name_tsmb)
        self.assertGreaterEqual(sms.size, 512)
        self.assertGreaterEqual(len(sms.buf), sms.size)

        # Modify contents of shared memory segment through memoryview.
        sms.buf[0] = 42
        self.assertEqual(sms.buf[0], 42)

        # Attach to existing shared memory segment.
        also_sms = shared_memory.SharedMemory(name_tsmb)
        self.assertEqual(also_sms.buf[0], 42)
        also_sms.close()

        # Attach to existing shared memory segment but specify a new size.
        same_sms = shared_memory.SharedMemory(name_tsmb, size=20*sms.size)
        self.assertLess(same_sms.size, 20*sms.size)  # Size was ignored.
        same_sms.close()

        # Creating a shared memory segment with a negative size
        with self.assertRaises(ValueError):
            shared_memory.SharedMemory(create=True, size=-2)

        # Attaching to a shared memory segment without a name
        with self.assertRaises(ValueError):
            shared_memory.SharedMemory(create=False)

        # Test if shared memory segment is created properly,
        # when _make_filename returns an existing shared memory segment name
        with unittest.mock.patch(
            'multiprocessing.shared_memory._make_filename') as mock_make_filename:

            NAME_PREFIX = shared_memory._SHM_NAME_PREFIX
            names = [self._new_shm_name('test01_fn'),
                     self._new_shm_name('test02_fn')]
            # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary
            # because some POSIX compliant systems require name to start with /
            names = [NAME_PREFIX + name for name in names]

            mock_make_filename.side_effect = names
            shm1 = shared_memory.SharedMemory(create=True, size=1)
            self.addCleanup(shm1.unlink)
            self.assertEqual(shm1._name, names[0])

            # Offer the same names again: the first one is now taken, so
            # the second one must end up being used.
            mock_make_filename.side_effect = names
            shm2 = shared_memory.SharedMemory(create=True, size=1)
            self.addCleanup(shm2.unlink)
            self.assertEqual(shm2._name, names[1])

        if shared_memory._USE_POSIX:
            # Posix Shared Memory can only be unlinked once.  Here we
            # test an implementation detail that is not observed across
            # all supported platforms (since WindowsNamedSharedMemory
            # manages unlinking on its own and unlink() does nothing).
            # True release of shared memory segment does not necessarily
            # happen until process exits, depending on the OS platform.
            name_dblunlink = self._new_shm_name('test01_dblunlink')
            with self.assertRaises(FileNotFoundError):
                sms_uno = shared_memory.SharedMemory(
                    name_dblunlink,
                    create=True,
                    size=5000
                )

                try:
                    self.assertGreaterEqual(sms_uno.size, 5000)

                    sms_duo = shared_memory.SharedMemory(name_dblunlink)
                    sms_duo.unlink()  # First shm_unlink() call.
                    sms_duo.close()
                    sms_uno.close()

                finally:
                    sms_uno.unlink()  # A second shm_unlink() call is bad.

        with self.assertRaises(FileExistsError):
            # Attempting to create a new shared memory segment with a
            # name that is already in use triggers an exception.
            there_can_only_be_one_sms = shared_memory.SharedMemory(
                name_tsmb,
                create=True,
                size=512
            )

        if shared_memory._USE_POSIX:
            # Requesting creation of a shared memory segment with the option
            # to attach to an existing segment, if that name is currently in
            # use, should not trigger an exception.
            # Note:  Using a smaller size could possibly cause truncation of
            # the existing segment but is OS platform dependent.  In the
            # case of MacOS/darwin, requesting a smaller size is disallowed.
            class OptionalAttachSharedMemory(shared_memory.SharedMemory):
                _flags = os.O_CREAT | os.O_RDWR
            ok_if_exists_sms = OptionalAttachSharedMemory(name_tsmb)
            self.assertEqual(ok_if_exists_sms.size, sms.size)
            ok_if_exists_sms.close()

        # Attempting to attach to an existing shared memory segment when
        # no segment exists with the supplied name triggers an exception.
        with self.assertRaises(FileNotFoundError):
            nonexisting_sms = shared_memory.SharedMemory(
                self._new_shm_name('test01_notthere'))
            nonexisting_sms.unlink()  # Error should occur on prior line.

        sms.close()

        # Test creating a shared memory segment with negative size
        with self.assertRaises(ValueError):
            sms_invalid = shared_memory.SharedMemory(create=True, size=-1)

        # Test creating a shared memory segment with size 0
        with self.assertRaises(ValueError):
            sms_invalid = shared_memory.SharedMemory(create=True, size=0)

        # Test creating a shared memory segment without size argument
        with self.assertRaises(ValueError):
            sms_invalid = shared_memory.SharedMemory(create=True)

    def test_shared_memory_across_processes(self):
        # bpo-40135: don't define shared memory block's name in case of
        # the failure when we run multiprocessing tests in parallel.
        sms = shared_memory.SharedMemory(create=True, size=512)
        self.addCleanup(sms.unlink)

        # Verify remote attachment to existing block by name is working.
        p = self.Process(
            target=self._attach_existing_shmem_then_write,
            args=(sms.name, b'howdy')
        )
        p.daemon = True
        p.start()
        p.join()
        self.assertEqual(bytes(sms.buf[:5]), b'howdy')

        # Verify pickling of SharedMemory instance also works.
        p = self.Process(
            target=self._attach_existing_shmem_then_write,
            args=(sms, b'HELLO')
        )
        p.daemon = True
        p.start()
        p.join()
        self.assertEqual(bytes(sms.buf[:5]), b'HELLO')

        sms.close()

    @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms")
    def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
        # bpo-36368: protect SharedMemoryManager server process from
        # KeyboardInterrupt signals.
        smm = multiprocessing.managers.SharedMemoryManager()
        smm.start()
        try:
            # make sure the manager works properly at the beginning
            sl = smm.ShareableList(range(10))

            # the manager's server should ignore KeyboardInterrupt signals,
            # maintain its connection with the current process, and succeed
            # when asked to deliver memory segments.
            os.kill(smm._process.pid, signal.SIGINT)

            sl2 = smm.ShareableList(range(10))

            # test that the custom signal handler registered in the Manager
            # does not affect signal handling in the parent process.
            with self.assertRaises(KeyboardInterrupt):
                os.kill(os.getpid(), signal.SIGINT)
        finally:
            # Stop the manager process even when a step above failed.
            smm.shutdown()

    @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
    def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
        # bpo-36867: test that a SharedMemoryManager uses the
        # same resource_tracker process as its parent.
        cmd = '''if 1:
            from multiprocessing.managers import SharedMemoryManager


            smm = SharedMemoryManager()
            smm.start()
            sl = smm.ShareableList(range(10))
            smm.shutdown()
        '''
        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)

        # Before bpo-36867 was fixed, a SharedMemoryManager not using the same
        # resource_tracker process as its parent would make the parent's
        # tracker complain about sl being leaked even though smm.shutdown()
        # properly released sl.
        self.assertFalse(err)

    def test_shared_memory_SharedMemoryManager_basics(self):
        smm1 = multiprocessing.managers.SharedMemoryManager()
        with self.assertRaises(ValueError):
            smm1.SharedMemory(size=9)  # Fails if SharedMemoryServer not started
        smm1.start()
        try:
            lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ]
            lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ]
            doppleganger_list0 = shared_memory.ShareableList(
                name=lol[0].shm.name)
            self.assertEqual(len(doppleganger_list0), 5)
            doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
            self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
            held_name = lom[0].name
        finally:
            # Stop the manager process even when a check above failed.
            smm1.shutdown()
        if sys.platform != "win32":
            # Calls to unlink() have no effect on Windows platform; shared
            # memory will only be released once final process exits.
            with self.assertRaises(FileNotFoundError):
                # No longer there to be attached to again.
                absent_shm = shared_memory.SharedMemory(name=held_name)

        with multiprocessing.managers.SharedMemoryManager() as smm2:
            sl = smm2.ShareableList("howdy")
            shm = smm2.SharedMemory(size=128)
            held_name = sl.shm.name
        if sys.platform != "win32":
            with self.assertRaises(FileNotFoundError):
                # No longer there to be attached to again.
                absent_sl = shared_memory.ShareableList(name=held_name)


    def test_shared_memory_ShareableList_basics(self):
        sl = shared_memory.ShareableList(
            ['howdy', b'HoWdY', -273.154, 100, None, True, 42]
        )
        self.addCleanup(sl.shm.unlink)

        # Verify attributes are readable.
        self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')

        # Exercise len().
        self.assertEqual(len(sl), 7)

        # Exercise index().
        with warnings.catch_warnings():
            # Suppress BytesWarning when comparing against b'HoWdY'.
            warnings.simplefilter('ignore')
            with self.assertRaises(ValueError):
                sl.index('100')
            self.assertEqual(sl.index(100), 3)

        # Exercise retrieving individual values.
        self.assertEqual(sl[0], 'howdy')
        self.assertEqual(sl[-2], True)

        # Exercise iterability.
        self.assertEqual(
            tuple(sl),
            ('howdy', b'HoWdY', -273.154, 100, None, True, 42)
        )

        # Exercise modifying individual values.
        sl[3] = 42
        self.assertEqual(sl[3], 42)
        sl[4] = 'some'  # Change type at a given position.
        self.assertEqual(sl[4], 'some')
        self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q')
        with self.assertRaisesRegex(ValueError,
                                    "exceeds available storage"):
            sl[4] = 'far too many'
        self.assertEqual(sl[4], 'some')
        sl[0] = 'encodés'  # Exactly 8 bytes of UTF-8 data
        self.assertEqual(sl[0], 'encodés')
        self.assertEqual(sl[1], b'HoWdY')  # no spillage
        with self.assertRaisesRegex(ValueError,
                                    "exceeds available storage"):
            sl[0] = 'encodées'  # Exactly 9 bytes of UTF-8 data
        self.assertEqual(sl[1], b'HoWdY')
        with self.assertRaisesRegex(ValueError,
                                    "exceeds available storage"):
            sl[1] = b'123456789'
        self.assertEqual(sl[1], b'HoWdY')

        # Exercise count().
        with warnings.catch_warnings():
            # Suppress BytesWarning when comparing against b'HoWdY'.
            warnings.simplefilter('ignore')
            self.assertEqual(sl.count(42), 2)
            self.assertEqual(sl.count(b'HoWdY'), 1)
            self.assertEqual(sl.count(b'adios'), 0)

        # Exercise creating a duplicate.
        name_duplicate = self._new_shm_name('test03_duplicate')
        sl_copy = shared_memory.ShareableList(sl, name=name_duplicate)
        try:
            self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
            self.assertEqual(name_duplicate, sl_copy.shm.name)
            self.assertEqual(list(sl), list(sl_copy))
            self.assertEqual(sl.format, sl_copy.format)
            sl_copy[-1] = 77
            self.assertEqual(sl_copy[-1], 77)
            self.assertNotEqual(sl[-1], 77)
            sl_copy.shm.close()
        finally:
            sl_copy.shm.unlink()

        # Obtain a second handle on the same ShareableList.
        sl_tethered = shared_memory.ShareableList(name=sl.shm.name)
        self.assertEqual(sl.shm.name, sl_tethered.shm.name)
        sl_tethered[-1] = 880
        self.assertEqual(sl[-1], 880)
        sl_tethered.shm.close()

        sl.shm.close()

        # Exercise creating an empty ShareableList.
        empty_sl = shared_memory.ShareableList()
        try:
            self.assertEqual(len(empty_sl), 0)
            self.assertEqual(empty_sl.format, '')
            self.assertEqual(empty_sl.count('any'), 0)
            with self.assertRaises(ValueError):
                empty_sl.index(None)
            empty_sl.shm.close()
        finally:
            empty_sl.shm.unlink()

    def test_shared_memory_ShareableList_pickling(self):
        sl = shared_memory.ShareableList(range(10))
        self.addCleanup(sl.shm.unlink)

        serialized_sl = pickle.dumps(sl)
        deserialized_sl = pickle.loads(serialized_sl)
        self.assertIsInstance(deserialized_sl, shared_memory.ShareableList)
        # Compare the value itself: assertTrue(value, 9) would treat 9 as
        # the failure message and pass for any truthy value.
        self.assertEqual(deserialized_sl[-1], 9)
        self.assertIsNot(sl, deserialized_sl)
        # Both objects are backed by the same segment.
        deserialized_sl[4] = "changed"
        self.assertEqual(sl[4], "changed")

        # Verify data is not being put into the pickled representation:
        # a much longer list must pickle to the same number of bytes.
        larger_sl = shared_memory.ShareableList(range(400))
        self.addCleanup(larger_sl.shm.unlink)
        serialized_larger_sl = pickle.dumps(larger_sl)
        self.assertEqual(len(serialized_sl), len(serialized_larger_sl))
        larger_sl.shm.close()

        deserialized_sl.shm.close()
        sl.shm.close()

    def test_shared_memory_cleaned_after_process_termination(self):
        # A segment created by a subprocess that is then terminated must
        # not remain available indefinitely.
        cmd = '''if 1:
            import os, time, sys
            from multiprocessing import shared_memory

            # Create a shared_memory segment, and send the segment name
            sm = shared_memory.SharedMemory(create=True, size=10)
            sys.stdout.write(sm.name + '\\n')
            sys.stdout.flush()
            time.sleep(100)
        '''
        with subprocess.Popen([sys.executable, '-E', '-c', cmd],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE) as p:
            name = p.stdout.readline().strip().decode()

            # killing abruptly processes holding reference to a shared memory
            # segment should not leak the given memory segment.
            p.terminate()
            p.wait()

            # Poll with an increasing delay (capped at 5 seconds) until the
            # segment can no longer be attached to, or the deadline passes.
            deadline = time.monotonic() + support.LONG_TIMEOUT
            t = 0.1
            while time.monotonic() < deadline:
                time.sleep(t)
                t = min(t*2, 5)
                try:
                    smm = shared_memory.SharedMemory(name, create=False)
                except FileNotFoundError:
                    break
            else:
                raise AssertionError("A SharedMemory segment was leaked after"
                                     " a process was abruptly terminated.")

            if os.name == 'posix':
                # A warning was emitted by the subprocess' own
                # resource_tracker (on Windows, shared memory segments
                # are released automatically by the OS).
                err = p.stderr.read().decode()
                self.assertIn(
                    "resource_tracker: There appear to be 1 leaked "
                    "shared_memory objects to clean up at shutdown", err)
4144
4145#
4146#
4147#
4148
class _TestFinalize(BaseTestCase):
    # Tests for util.Finalize and the module-level finalizer registry.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Run each test against an empty registry; the previous entries are
        # saved so that tearDown() can put them back.
        self.registry_backup = util._finalizer_registry.copy()
        util._finalizer_registry.clear()

    def tearDown(self):
        try:
            # A test must not leave any finalizer registered.
            self.assertFalse(util._finalizer_registry)
        finally:
            # Put the saved entries back even when the check above fails;
            # otherwise the finalizers registered before this test would
            # be missing from the registry for the rest of the run.
            util._finalizer_registry.update(self.registry_backup)

    @classmethod
    def _test_finalize(cls, conn):
        # Process target: register a series of finalizers that each send a
        # tag over conn, run the exit function, then leave the process with
        # os._exit() so that the remaining locals are never collected.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # no exitpriority is given for c; test_finalize expects no 'c'
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read tags until the 'STOP' sentinel sent by the last finalizer.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

    def test_thread_safety(self):
        # bpo-24484: _run_finalizers() should be thread-safe
        def cb():
            pass

        class Foo(object):
            def __init__(self):
                self.ref = self  # create reference cycle
                # insert finalizer at random key
                util.Finalize(self, cb, exitpriority=random.randint(1, 100))

        # Shared with the two worker threads below: `finish` tells them to
        # stop, `exc` carries an exception raised in either of them.
        finish = False
        exc = None

        def run_finalizers():
            nonlocal exc
            while not finish:
                time.sleep(random.random() * 1e-1)
                try:
                    # A GC run will eventually happen during this,
                    # collecting stale Foo's and mutating the registry
                    util._run_finalizers()
                except Exception as e:
                    exc = e

        def make_finalizers():
            nonlocal exc
            d = {}
            while not finish:
                try:
                    # Old Foo's get gradually replaced and later
                    # collected by the GC (because of the cyclic ref)
                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
                except Exception as e:
                    exc = e
                    d.clear()

        old_interval = sys.getswitchinterval()
        old_threshold = gc.get_threshold()
        try:
            # Switch threads and run the GC very often to provoke the race.
            sys.setswitchinterval(1e-6)
            gc.set_threshold(5, 5, 5)
            threads = [threading.Thread(target=run_finalizers),
                       threading.Thread(target=make_finalizers)]
            with test.support.start_threads(threads):
                time.sleep(4.0)  # Wait a bit to trigger race condition
                finish = True
            if exc is not None:
                raise exc
        finally:
            sys.setswitchinterval(old_interval)
            gc.set_threshold(*old_threshold)
            gc.collect()  # Collect remaining Foo's
4263
4264
4265#
4266# Test that from ... import * works for each module
4267#
4268
class _TestImportStar(unittest.TestCase):
    # Every multiprocessing module must define __all__, and every name
    # listed there must exist, so that "from ... import *" works.

    def get_module_names(self):
        # Return the dotted names of the modules found as *.py files in the
        # multiprocessing package directory, followed by the package itself.
        import glob
        package_dir = os.path.dirname(multiprocessing.__file__)
        py_pattern = os.path.join(glob.escape(package_dir), '*.py')
        names = []
        for path in glob.glob(py_pattern):
            stem = os.path.splitext(os.path.basename(path))[0]
            names.append('multiprocessing.' + stem)
        # __init__.py is listed under the package's own name instead
        names.remove('multiprocessing.__init__')
        names.append('multiprocessing')
        return names

    def test_import(self):
        modules = self.get_module_names()

        # Collect the modules that must be left out here, in the order in
        # which they are dropped from the list.
        if sys.platform == 'win32':
            unavailable = ['multiprocessing.popen_fork',
                           'multiprocessing.popen_forkserver',
                           'multiprocessing.popen_spawn_posix']
        else:
            unavailable = ['multiprocessing.popen_spawn_win32']
            if not HAS_REDUCTION:
                unavailable.append('multiprocessing.popen_forkserver')
        if c_int is None:
            # This module requires _ctypes
            unavailable.append('multiprocessing.sharedctypes')
        for excluded in unavailable:
            modules.remove(excluded)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            self.assertTrue(hasattr(mod, '__all__'), name)

            for attr in mod.__all__:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
4307
4308#
4309# Quick test that logging works -- does not test logging output
4310#
4311
class _TestLogging(BaseTestCase):
    # Quick checks that the multiprocessing logger can be obtained and that
    # its effective level is what a child process observes.  Logging output
    # itself is not examined.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before using it.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Always put the level back to the one used by this test suite.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Process target: report the logger's effective level over conn.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set directly on the multiprocessing logger is the
            # one the child reports.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()
            p.close()

            # With the logger back at NOTSET, the child reports the level
            # of the root logger instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
            p.close()
        finally:
            # Restore both loggers and release the pipe even when one of
            # the checks above failed, so later tests are not affected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
4356
4357
4358# class _TestLoggingProcessName(BaseTestCase):
4359#
4360#     def handle(self, record):
4361#         assert record.processName == multiprocessing.current_process().name
4362#         self.__handled = True
4363#
4364#     def test_logging(self):
4365#         handler = logging.Handler()
4366#         handler.handle = self.handle
4367#         self.__handled = False
4368#         # Bypass getLogger() and side-effects
4369#         logger = logging.getLoggerClass()(
4370#                 'multiprocessing.test.TestLoggingProcessName')
4371#         logger.addHandler(handler)
4372#         logger.propagate = False
4373#
4374#         logger.warn('foo')
4375#         assert self.__handled
4376
4377#
4378# Check that Process.join() retries if os.waitpid() fails with EINTR
4379#
4380
class _TestPollEintr(BaseTestCase):
    # Process.join() has to keep waiting when a signal is delivered to the
    # parent while it is blocked.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Process target: pause briefly, then send SIGUSR1 to pid.
        time.sleep(0.1)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        signal_seen = False

        def on_sigusr1(*args):
            # Note that the signal reached this process.
            nonlocal signal_seen
            signal_seen = True

        own_pid = os.getpid()
        previous_handler = signal.signal(signal.SIGUSR1, on_sigusr1)
        try:
            killer = self.Process(target=self._killer, args=(own_pid,))
            killer.start()
            try:
                # The signal arrives while this join() is in progress.
                sleeper = self.Process(target=time.sleep, args=(2,))
                sleeper.start()
                sleeper.join()
            finally:
                killer.join()
            self.assertTrue(signal_seen)
            self.assertEqual(sleeper.exitcode, 0)
        finally:
            # Reinstall whatever handler was active before the test.
            signal.signal(signal.SIGUSR1, previous_handler)
4410
4411#
4412# Test to verify handle verification, see issue 3321
4413#
4414
class TestInvalidHandle(unittest.TestCase):
    """Connection objects wrapping bogus handles must fail cleanly."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        bogus = multiprocessing.connection.Connection(44977608)
        # poll() on an arbitrary handle number may raise, but must not crash
        try:
            bogus.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in Connection.__del__
            bogus._handle = None
        # A negative handle has to be refused at construction time
        with self.assertRaises((ValueError, OSError)):
            multiprocessing.connection.Connection(-1)
4431
4432
4433
@hashlib_helper.requires_hashdigest('md5')
class OtherTest(unittest.TestCase):
    """Authentication failures in the connection challenge handshake."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        class _BogusPeer(object):
            # Swallows whatever is sent and always answers with junk.
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(_BogusPeer(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _ScriptedPeer(object):
            # Replies with the challenge marker first, junk second and an
            # empty bytes object on every later read.
            def __init__(self):
                self._script = [multiprocessing.connection.CHALLENGE,
                                b'something bogus']

            def recv_bytes(self, size):
                if self._script:
                    return self._script.pop(0)
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(_ScriptedPeer(), b'abc')
4463
4464#
4465# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
4466#
4467
def initializer(ns):
    """Increase the ``test`` attribute stored on *ns* by one."""
    current = ns.test
    ns.test = current + 1
4470
@hashlib_helper.requires_hashdigest('md5')
class TestInitializers(unittest.TestCase):
    """Exercise the initializer argument of SyncManager.start() and Pool()."""

    def setUp(self):
        # A shared namespace whose counter the initializer increments
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # Something that is not callable must be rejected
        with self.assertRaises(TypeError):
            manager.start(1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()
        manager.join()

    def test_pool_initializer(self):
        # Something that is not callable must be rejected
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
4496
4497#
4498# Issue 5155, 5313, 5331: Test process in processes
4499# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
4500#
4501
4502def _this_sub_process(q):
4503    try:
4504        item = q.get(block=False)
4505    except pyqueue.Empty:
4506        pass
4507
def _test_process():
    """Start a daemonic child that polls a fresh Queue, then wait for it."""
    work_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process,
                                    args=(work_queue,),
                                    daemon=True)
    child.start()
    child.join()
4514
4515def _afunc(x):
4516    return x*x
4517
def pool_in_process():
    """Run a small map() on a four-worker Pool, then close and join it."""
    workers = multiprocessing.Pool(processes=4)
    # The mapped results are not needed; only the round trip matters.
    workers.map(_afunc, list(range(1, 8)))
    workers.close()
    workers.join()
4523
4524class _file_like(object):
4525    def __init__(self, delegate):
4526        self._delegate = delegate
4527        self._pid = None
4528
4529    @property
4530    def cache(self):
4531        pid = os.getpid()
4532        # There are no race conditions since fork keeps only the running thread
4533        if pid != self._pid:
4534            self._pid = pid
4535            self._cache = []
4536        return self._cache
4537
4538    def write(self, data):
4539        self.cache.append(data)
4540
4541    def flush(self):
4542        self._delegate.write(''.join(self.cache))
4543        self._cache = []
4544
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Queues, pools and buffered writers used from inside a child process."""

    def test_queue_in_process(self):
        # The child itself creates a Queue and a grandchild process.
        proc = multiprocessing.Process(target=_test_process)
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child itself creates and uses a Pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is only constructed, never started;
        # kept as in the original test - confirm whether that is intended.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A unittest assertion (unlike a bare assert) still runs under
        # "python -O" and reports both values when it fails.
        self.assertEqual(sio.getvalue(), 'foo')
4564
4565
class TestWait(unittest.TestCase):
    """Tests for multiprocessing.connection.wait().

    Pipes and sockets created by the tests are registered with
    addCleanup() so they are released even when an assertion fails.
    """

    @classmethod
    def _child_test_wait(cls, w, slow):
        # Child side of test_wait(): send ten (index, pid) tuples through
        # w, optionally pausing a random short time before each one.
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            # Release the read end even if the test fails before the loop
            # below closes it; closing it a second time is harmless.
            self.addCleanup(r.close)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        # Drain every reader until its child closes the write end.
        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        # Child side of test_wait_socket(): connect to address and send the
        # numbers 0..9, one per line, optionally pausing before each.
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        l = socket.create_server((socket_helper.HOST, 0))
        # Do not leak the listening socket if the test fails early.
        self.addCleanup(l.close)
        addr = l.getsockname()
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            # Do not leak accepted sockets if the read loop fails.
            self.addCleanup(r.close)
            readers.append(r)
            dic[r] = []
        l.close()

        # Collect data per socket until each peer closes its connection.
        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()
        # Both pipe ends used to be left open by this test.
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        # Nothing is readable: wait() should return [] after about
        # `expected` seconds.
        start = time.monotonic()
        res = wait([a, b], expected)
        delta = time.monotonic() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # Data sent from b makes a readable, so wait() returns quickly.
        start = time.monotonic()
        res = wait([a, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        # Child side of test_wait_integer(): tell the parent we are running,
        # then stay alive for `period` seconds.
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocessing.connection import wait

        expected = 3
        # Order results by identity so lists can be compared regardless of
        # the order in which wait() reports them.
        sorted_ = lambda l: sorted(l, key=id)
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        # Both pipe ends used to be left open by this test.
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        # Reap the child even if an assertion below fails.
        self.addCleanup(p.join)
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

        # Only the sentinel becomes ready, once the child exits after
        # roughly `expected` seconds.
        start = time.monotonic()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

        p.terminate()
        p.join()

    def test_neg_timeout(self):
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        # Close both ends even if an assertion below fails.
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        # A negative timeout must return promptly with nothing ready.
        t = time.monotonic()
        res = wait([a], timeout=-1)
        t = time.monotonic() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
4738
4739#
4740# Issue 14151: Test invalid family on invalid environment
4741#
4742
class TestInvalidFamily(unittest.TestCase):
    """Listener must reject an address whose form does not fit the platform."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # An address in the \\.\ form, used off Windows
        foreign_address = r'\\.\test'
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          foreign_address)

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A filesystem-path address, used on Windows
        foreign_address = '/var/test.pipe'
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          foreign_address)
4754
4755#
4756# Issue 12098: check sys.flags of child matches that for parent
4757#
4758
class TestFlags(unittest.TestCase):
    """sys.flags of a multiprocessing child must equal those of its parent."""

    @classmethod
    def run_in_grandchild(cls, conn):
        # Report this process's sys.flags over conn.
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        # Entry point of the interpreter launched by test_flags(): start a
        # process, collect its flags and print them next to our own as JSON.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        received = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        print(json.dumps((tuple(sys.flags), received)))

    def test_flags(self):
        import json
        # start child process using unusual flags
        prog = ('from test._test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        argv = [sys.executable, '-E', '-S', '-O', '-c', prog]
        output = subprocess.check_output(argv)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
4786
4787#
4788# Test interaction with socket timeouts - see Issue #6056
4789#
4790
class TestTimeouts(unittest.TestCase):
    """Connections must keep working with a short default socket timeout."""

    @classmethod
    def _test_timeout(cls, child, address):
        # Child: wait a full second (longer than the parent's default socket
        # timeout), answer over the pipe, then connect to the listener.
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent_end, child_end = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            proc = multiprocessing.Process(target=self._test_timeout,
                                           args=(child_end, listener.address))
            proc.start()
            child_end.close()
            # Both receives take longer than the 0.1s default timeout.
            self.assertEqual(parent_end.recv(), 123)
            parent_end.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            join_process(proc)
        finally:
            # The default timeout is process-wide: always restore it.
            socket.setdefaulttimeout(saved_timeout)
4820
4821#
4822# Test what happens with no "if __name__ == '__main__'"
4823#
4824
class TestNoForkBomb(unittest.TestCase):
    """Run mp_fork_bomb.py, a script without an ``if __name__`` guard."""

    def test_noforkbomb(self):
        method = multiprocessing.get_start_method()
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        helper = test.support.script_helper
        if method == 'fork':
            # With fork the script is expected to succeed and print 123
            rc, out, err = helper.assert_python_ok(script, method)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')
        else:
            # Otherwise it must fail with a RuntimeError and print nothing
            rc, out, err = helper.assert_python_failure(script, method)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
4837
4838#
4839# Issue #17555: ForkAwareThreadLock
4840#
4841
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        # Generation n starts generation n-1; the last generation reports
        # the size of its after-fork registry through conn.
        if n <= 1:
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            conn.close()
            join_process(descendant)
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # NOTE(review): the lock is only created and kept referenced for the
        # duration of the test; presumably creating it adds an entry to
        # util._afterfork_registry - confirm against multiprocessing.util.
        l = util.ForkAwareThreadLock()
        size_before = len(util._afterfork_registry)
        first = multiprocessing.Process(target=self.child, args=(5, writer))
        first.start()
        writer.close()
        size_after = reader.recv()
        join_process(first)
        # The registry must not have grown across the generations
        self.assertLessEqual(size_after, size_before)
4868
4869#
4870# Check that non-forked child processes do not inherit unneeded fds/handles
4871#
4872
class TestCloseFds(unittest.TestCase):
    """Non-forked children must not inherit unrelated fds/handles."""

    def get_high_socket_fd(self):
        # Return a detached socket fd/handle; the caller releases it with
        # self.close().
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()
        else:
            # We want to produce a socket with an fd high enough that a
            # freshly created child process will not have any fds as high.
            fd = socket.socket().detach()
            to_close = []
            try:
                while fd < 50:
                    to_close.append(fd)
                    fd = os.dup(fd)
            finally:
                # Release the low-numbered duplicates even if os.dup() fails
                for x in to_close:
                    os.close(x)
            return fd

    def close(self, fd):
        # Close an fd/handle obtained from get_high_socket_fd().
        if WIN32:
            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
        else:
            os.close(fd)

    @classmethod
    def _test_closefds(cls, conn, fd):
        # Child: try to wrap fd as a socket and send back either the
        # exception raised or None on success.
        try:
            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            conn.send(e)
        else:
            s.close()
            conn.send(None)

    def test_closefd(self):
        if not HAS_REDUCTION:
            self.skipTest('requires fd pickling')

        reader, writer = multiprocessing.Pipe()
        fd = self.get_high_socket_fd()
        try:
            p = multiprocessing.Process(target=self._test_closefds,
                                        args=(writer, fd))
            p.start()
            writer.close()
            e = reader.recv()
            join_process(p)
        finally:
            self.close(fd)
            writer.close()
            reader.close()

        if multiprocessing.get_start_method() == 'fork':
            # A forked child shares our fds, so wrapping fd succeeds there
            self.assertIs(e, None)
        else:
            WSAENOTSOCK = 10038
            self.assertIsInstance(e, OSError)
            # OSError only has a winerror attribute on Windows; read it
            # defensively so an unexpected errno elsewhere is reported as an
            # assertion failure instead of an AttributeError.
            self.assertTrue(e.errno == errno.EBADF or
                            getattr(e, 'winerror', None) == WSAENOTSOCK, e)
4934
4935#
4936# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
4937#
4938
class TestIgnoreEINTR(unittest.TestCase):
    """Connection and Listener calls must carry on after a handled signal."""

    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)

    @classmethod
    def _test_ignore(cls, conn):
        # Child: install a do-nothing SIGUSR1 handler, announce readiness,
        # echo one object back and finally send a payload of CONN_MAX_SIZE
        # bytes.
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        conn.send('ready')
        echoed = conn.recv()
        conn.send(echoed)
        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        parent_end, child_end = multiprocessing.Pipe()
        try:
            worker = multiprocessing.Process(target=self._test_ignore,
                                             args=(child_end,),
                                             daemon=True)
            worker.start()
            child_end.close()
            self.assertEqual(parent_end.recv(), 'ready')
            # First signal: sent after giving the child time to reach recv()
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            time.sleep(0.1)
            parent_end.send(1234)
            self.assertEqual(parent_end.recv(), 1234)
            # Second signal: sent after giving the child time to start its
            # large send_bytes()
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            self.assertEqual(parent_end.recv_bytes(),
                             b'x' * self.CONN_MAX_SIZE)
            time.sleep(0.1)
            worker.join()
        finally:
            parent_end.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Child: install a do-nothing SIGUSR1 handler, publish the address
        # of a new Listener and greet the first client that connects.
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        with multiprocessing.connection.Listener() as listener:
            conn.send(listener.address)
            accepted = listener.accept()
            accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        parent_end, child_end = multiprocessing.Pipe()
        try:
            worker = multiprocessing.Process(target=self._test_ignore_listener,
                                             args=(child_end,),
                                             daemon=True)
            worker.start()
            child_end.close()
            address = parent_end.recv()
            # Signal the child after giving it time to reach accept()
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            worker.join()
        finally:
            parent_end.close()
5005
class TestStartMethod(unittest.TestCase):
    """Selecting and querying the multiprocessing start method."""

    @classmethod
    def _check_context(cls, conn):
        # Child: report the start method as seen from inside the child.
        conn.send(multiprocessing.get_start_method())

    def check_context(self, ctx):
        # A child started through ctx must report ctx's own start method.
        reader, writer = ctx.Pipe(duplex=False)
        child = ctx.Process(target=self._check_context, args=(writer,))
        child.start()
        writer.close()
        reported = reader.recv()
        reader.close()
        child.join()
        self.assertEqual(reported, ctx.get_start_method())

    def test_context(self):
        for method in ('fork', 'spawn', 'forkserver'):
            try:
                ctx = multiprocessing.get_context(method)
            except ValueError:
                # No context can be obtained for this method here
                continue
            self.assertEqual(ctx.get_start_method(), method)
            self.assertIs(ctx.get_context(), ctx)
            # The start method of a concrete context cannot be changed
            for rejected in ('spawn', None):
                self.assertRaises(ValueError, ctx.set_start_method, rejected)
            self.check_context(ctx)

    def test_set_get(self):
        multiprocessing.set_forkserver_preload(PRELOAD)
        exercised = 0
        saved_method = multiprocessing.get_start_method()
        try:
            for method in ('fork', 'spawn', 'forkserver'):
                try:
                    multiprocessing.set_start_method(method, force=True)
                except ValueError:
                    continue
                self.assertEqual(multiprocessing.get_start_method(), method)
                ctx = multiprocessing.get_context()
                self.assertEqual(ctx.get_start_method(), method)
                # Context and Process class names both begin with the
                # method name (compared case-insensitively)
                context_name = type(ctx).__name__.lower()
                self.assertTrue(context_name.startswith(method))
                process_name = ctx.Process.__name__.lower()
                self.assertTrue(process_name.startswith(method))
                self.check_context(multiprocessing)
                exercised += 1
        finally:
            # Restore the global start method for the remaining tests
            multiprocessing.set_start_method(saved_method, force=True)
        self.assertGreaterEqual(exercised, 1)

    def test_get_all(self):
        methods = multiprocessing.get_all_start_methods()
        if sys.platform == 'win32':
            self.assertEqual(methods, ['spawn'])
            return
        # Either of fork/spawn may come first; forkserver is optional
        acceptable = (
            ['fork', 'spawn'],
            ['spawn', 'fork'],
            ['fork', 'spawn', 'forkserver'],
            ['spawn', 'fork', 'forkserver'],
        )
        self.assertTrue(methods in acceptable)

    def test_preload_resources(self):
        if multiprocessing.get_start_method() != 'forkserver':
            self.skipTest("test only relevant for 'forkserver' method")
        script = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
        rc, out, err = test.support.script_helper.assert_python_ok(script)
        out = out.decode()
        err = err.decode()
        if not (out.rstrip() == 'ok' and err == ''):
            # Show what the script produced before failing
            print(out)
            print(err)
            self.fail("failed spawning forkserver or grandchild")
5076
5077
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
class TestResourceTracker(unittest.TestCase):
    """Tests for the multiprocessing resource tracker process."""

    def test_resource_tracker(self):
        #
        # Check that killing a process does not leak the named resources
        # (one run per resource type known to the tracker) it created
        #
        # Script run in a separate interpreter: it creates two resources of
        # the requested type, writes their names (one per line) to the
        # inherited fd {w} and then sleeps so the parent can terminate it.
        cmd = '''if 1:
            import time, os, tempfile
            import multiprocessing as mp
            from multiprocessing import resource_tracker
            from multiprocessing.shared_memory import SharedMemory

            mp.set_start_method("spawn")
            rand = tempfile._RandomNameSequence()


            def create_and_register_resource(rtype):
                if rtype == "semaphore":
                    lock = mp.Lock()
                    return lock, lock._semlock.name
                elif rtype == "shared_memory":
                    sm = SharedMemory(create=True, size=10)
                    return sm, sm._name
                else:
                    raise ValueError(
                        "Resource type {{}} not understood".format(rtype))


            resource1, rname1 = create_and_register_resource("{rtype}")
            resource2, rname2 = create_and_register_resource("{rtype}")

            os.write({w}, rname1.encode("ascii") + b"\\n")
            os.write({w}, rname2.encode("ascii") + b"\\n")

            time.sleep(10)
        '''
        for rtype in resource_tracker._CLEANUP_FUNCS:
            with self.subTest(rtype=rtype):
                if rtype == "noop":
                    # Artefact resource type used by the resource_tracker
                    continue
                r, w = os.pipe()
                p = subprocess.Popen([sys.executable,
                                     '-E', '-c', cmd.format(w=w, rtype=rtype)],
                                     pass_fds=[w],
                                     stderr=subprocess.PIPE)
                # Our copy of the write end is no longer needed
                os.close(w)
                with open(r, 'rb', closefd=True) as f:
                    name1 = f.readline().rstrip().decode('ascii')
                    name2 = f.readline().rstrip().decode('ascii')
                # Remove the first resource ourselves, then terminate the
                # child while it still holds the second one
                _resource_unlink(name1, rtype)
                p.terminate()
                p.wait()

                # Poll until unlinking the second resource fails, which shows
                # that it has already been removed by something else; give
                # up once the deadline has passed
                deadline = time.monotonic() + support.LONG_TIMEOUT
                while time.monotonic() < deadline:
                    time.sleep(.5)
                    try:
                        _resource_unlink(name2, rtype)
                    except OSError as e:
                        # docs say it should be ENOENT, but OSX seems to give
                        # EINVAL
                        self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
                        break
                else:
                    # The loop ran out of time without ever seeing an OSError
                    raise AssertionError(
                        f"A {rtype} resource was leaked after a process was "
                        f"abruptly terminated.")
                # The child's stderr must carry the leak warning and an error
                # line mentioning the resource we unlinked ourselves
                err = p.stderr.read().decode('utf-8')
                p.stderr.close()
                expected = ('resource_tracker: There appear to be 2 leaked {} '
                            'objects'.format(
                            rtype))
                self.assertRegex(err, expected)
                self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)

    def check_resource_tracker_death(self, signum, should_die):
        # bpo-31310: if the semaphore tracker process has died, it should
        # be restarted implicitly.
        #
        # signum is sent to a freshly started tracker; should_die states
        # whether a "process died" warning is expected afterwards.
        from multiprocessing.resource_tracker import _resource_tracker
        pid = _resource_tracker._pid
        if pid is not None:
            # Get rid of any tracker that is already running
            os.kill(pid, signal.SIGKILL)
            support.wait_process(pid, exitcode=-signal.SIGKILL)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            _resource_tracker.ensure_running()
        pid = _resource_tracker._pid

        os.kill(pid, signum)
        time.sleep(1.0)  # give it time to die

        ctx = multiprocessing.get_context("spawn")
        with warnings.catch_warnings(record=True) as all_warn:
            warnings.simplefilter("always")
            sem = ctx.Semaphore()
            sem.acquire()
            sem.release()
            wr = weakref.ref(sem)
            # ensure `sem` gets collected, which triggers communication with
            # the semaphore tracker
            del sem
            gc.collect()
            self.assertIsNone(wr())
            if should_die:
                # Exactly one UserWarning reporting the dead tracker
                self.assertEqual(len(all_warn), 1)
                the_warn = all_warn[0]
                self.assertTrue(issubclass(the_warn.category, UserWarning))
                self.assertTrue("resource_tracker: process died"
                                in str(the_warn.message))
            else:
                self.assertEqual(len(all_warn), 0)

    def test_resource_tracker_sigint(self):
        # Catchable signal (ignored by semaphore tracker)
        self.check_resource_tracker_death(signal.SIGINT, False)

    def test_resource_tracker_sigterm(self):
        # Catchable signal (ignored by semaphore tracker)
        self.check_resource_tracker_death(signal.SIGTERM, False)

    def test_resource_tracker_sigkill(self):
        # Uncatchable signal.
        self.check_resource_tracker_death(signal.SIGKILL, True)

    @staticmethod
    def _is_resource_tracker_reused(conn, pid):
        # Child: send True through conn if the tracker visible here is the
        # parent's one (whose pid is given) and it is alive.
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker.ensure_running()
        # The pid should be None in the child process, except for the fork
        # context. It should not be a new value.
        reused = _resource_tracker._pid in (None, pid)
        reused &= _resource_tracker._check_alive()
        conn.send(reused)

    def test_resource_tracker_reused(self):
        # A child process must reuse the parent's tracker, not start its own
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker.ensure_running()
        pid = _resource_tracker._pid

        r, w = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=self._is_resource_tracker_reused,
                                    args=(w, pid))
        p.start()
        is_resource_tracker_reused = r.recv()

        # Clean up
        p.join()
        w.close()
        r.close()

        self.assertTrue(is_resource_tracker_reused)
5232
5233
class TestSimpleQueue(unittest.TestCase):
    """Tests for multiprocessing.SimpleQueue."""

    @classmethod
    def _test_empty(cls, queue, child_can_start, parent_can_continue):
        # Child: once allowed to start, put the result of queue.empty() on
        # the queue twice, then let the parent carry on in any case.
        child_can_start.wait()
        # issue 30301, could fail under spawn and forkserver
        try:
            for _ in range(2):
                queue.put(queue.empty())
        finally:
            parent_can_continue.set()

    def test_empty(self):
        simple_queue = multiprocessing.SimpleQueue()
        child_can_start = multiprocessing.Event()
        parent_can_continue = multiprocessing.Event()

        worker = multiprocessing.Process(
            target=self._test_empty,
            args=(simple_queue, child_can_start, parent_can_continue),
            daemon=True,
        )
        worker.start()

        # Nothing has been put yet: the child is still waiting
        self.assertTrue(simple_queue.empty())

        child_can_start.set()
        parent_can_continue.wait()

        # The child saw an empty queue first and a non-empty one second
        self.assertFalse(simple_queue.empty())
        self.assertEqual(simple_queue.get(), True)
        self.assertEqual(simple_queue.get(), False)
        self.assertTrue(simple_queue.empty())

        worker.join()

    def test_close(self):
        simple_queue = multiprocessing.SimpleQueue()
        # closing a queue twice should not fail
        for _ in range(2):
            simple_queue.close()

    # Test specific to CPython since it tests private attributes
    @test.support.cpython_only
    def test_closed(self):
        simple_queue = multiprocessing.SimpleQueue()
        simple_queue.close()
        # Both underlying connections must report themselves closed
        self.assertTrue(simple_queue._reader.closed)
        self.assertTrue(simple_queue._writer.closed)
5283
5284
class TestPoolNotLeakOnFailure(unittest.TestCase):
    """Pool creation must clean up its workers when a start() call fails."""

    def test_release_unused_processes(self):
        # Issue #19675: During pool creation, if we can't create a process,
        # don't leak already created ones.
        starts_allowed = 3
        created = []

        class FailingForkProcess:
            """Process stand-in whose start() raises once the budget is used."""

            def __init__(self, **kwargs):
                self.name = 'Fake Process'
                self.exitcode = None
                self.state = None
                created.append(self)

            def start(self):
                nonlocal starts_allowed
                if starts_allowed > 0:
                    starts_allowed -= 1
                    self.state = 'started'
                    return
                raise OSError("Manually induced OSError")

            def terminate(self):
                self.state = 'stopping'

            def join(self):
                # Only a terminated process can reach the 'stopped' state.
                if self.state == 'stopping':
                    self.state = 'stopped'

            def is_alive(self):
                return self.state in ('started', 'stopping')

        fake_context = unittest.mock.MagicMock(Process=FailingForkProcess)
        with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
            p = multiprocessing.pool.Pool(5, context=fake_context)
            p.close()
            p.join()

        still_alive = [proc for proc in created if proc.is_alive()]
        self.assertFalse(still_alive)
5324
5325
@hashlib_helper.requires_hashdigest('md5')
class TestSyncManagerTypes(unittest.TestCase):
    """Test all the types which can be shared between a parent and a
    child process by using a manager which acts as an intermediary
    between them.

    In the following unit-tests the base type is created in the parent
    process, the @classmethod represents the worker process and the
    shared object is readable and editable between the two.

    # The child.
    @classmethod
    def _test_list(cls, obj):
        assert obj[0] == 5
        obj.append(6)

    # The parent.
    def test_list(self):
        o = self.manager.list()
        o.append(5)
        self.run_worker(self._test_list, o)
        self.assertEqual(o[1], 6)

    Worker-side checks are plain ``assert`` statements: the worker has no
    TestCase instance, and run_worker() verifies that the worker exited
    with exit code 0.  Parent-side checks use the unittest assertion
    methods.
    """
    manager_class = multiprocessing.managers.SyncManager

    def setUp(self):
        """Start a fresh manager for each test; no worker exists yet."""
        self.manager = self.manager_class()
        self.manager.start()
        self.proc = None

    def tearDown(self):
        """Stop a still-running worker, then shut the manager down."""
        if self.proc is not None and self.proc.is_alive():
            self.proc.terminate()
            self.proc.join()
        self.manager.shutdown()
        self.manager = None
        self.proc = None

    @classmethod
    def setUpClass(cls):
        support.reap_children()

    # Reap children both before and after the tests of this class.
    tearDownClass = setUpClass

    def wait_proc_exit(self):
        """Join the worker and wait for stray children to go away."""
        # Only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395).
        join_process(self.proc)
        start_time = time.monotonic()
        t = 0.01
        while len(multiprocessing.active_children()) > 1:
            time.sleep(t)
            t *= 2  # exponential back-off between polls
            dt = time.monotonic() - start_time
            if dt >= 5.0:
                test.support.environment_altered = True
                support.print_warning(f"multiprocessing.Manager still has "
                                      f"{multiprocessing.active_children()} "
                                      f"active children after {dt} seconds")
                break

    def run_worker(self, worker, obj):
        """Run worker(obj) in a daemon process and require exit code 0."""
        self.proc = multiprocessing.Process(target=worker, args=(obj, ))
        self.proc.daemon = True
        self.proc.start()
        self.wait_proc_exit()
        self.assertEqual(self.proc.exitcode, 0)

    @classmethod
    def _test_event(cls, obj):
        assert obj.is_set()
        obj.wait()
        obj.clear()
        obj.wait(0.001)

    def test_event(self):
        o = self.manager.Event()
        o.set()
        self.run_worker(self._test_event, o)
        # The worker cleared the event.
        self.assertFalse(o.is_set())
        o.wait(0.001)

    @classmethod
    def _test_lock(cls, obj):
        obj.acquire()

    def test_lock(self, lname="Lock"):
        o = getattr(self.manager, lname)()
        self.run_worker(self._test_lock, o)
        # The worker left the lock acquired; release it from the parent.
        o.release()
        self.assertRaises(RuntimeError, o.release)  # already released

    @classmethod
    def _test_rlock(cls, obj):
        obj.acquire()
        obj.release()

    def test_rlock(self, lname="RLock"):
        # The default used to be "Lock", so RLock proxies were never
        # actually exercised by this test.
        o = getattr(self.manager, lname)()
        self.run_worker(self._test_rlock, o)

    @classmethod
    def _test_semaphore(cls, obj):
        obj.acquire()

    def test_semaphore(self, sname="Semaphore"):
        o = getattr(self.manager, sname)()
        self.run_worker(self._test_semaphore, o)
        o.release()

    def test_bounded_semaphore(self):
        self.test_semaphore(sname="BoundedSemaphore")

    @classmethod
    def _test_condition(cls, obj):
        obj.acquire()
        obj.release()

    def test_condition(self):
        o = self.manager.Condition()
        self.run_worker(self._test_condition, o)

    @classmethod
    def _test_barrier(cls, obj):
        assert obj.parties == 5
        obj.reset()

    def test_barrier(self):
        o = self.manager.Barrier(5)
        self.run_worker(self._test_barrier, o)

    @classmethod
    def _test_pool(cls, obj):
        # TODO: fix https://bugs.python.org/issue35919
        with obj:
            pass

    def test_pool(self):
        o = self.manager.Pool(processes=4)
        self.run_worker(self._test_pool, o)

    @classmethod
    def _test_queue(cls, obj):
        assert obj.qsize() == 2
        assert obj.full()
        assert not obj.empty()
        assert obj.get() == 5
        assert not obj.empty()
        assert obj.get() == 6
        assert obj.empty()

    def test_queue(self, qname="Queue"):
        # A queue of size 2 holding two items is full until the worker
        # drains it.
        o = getattr(self.manager, qname)(2)
        o.put(5)
        o.put(6)
        self.run_worker(self._test_queue, o)
        self.assertTrue(o.empty())
        self.assertFalse(o.full())

    def test_joinable_queue(self):
        self.test_queue("JoinableQueue")

    @classmethod
    def _test_list(cls, obj):
        assert obj[0] == 5
        assert obj.count(5) == 1
        assert obj.index(5) == 0
        obj.sort()
        obj.reverse()
        for x in obj:
            pass
        assert len(obj) == 1
        assert obj.pop(0) == 5

    def test_list(self):
        o = self.manager.list()
        o.append(5)
        self.run_worker(self._test_list, o)
        # The worker popped the only element.
        self.assertFalse(o)
        self.assertEqual(len(o), 0)

    @classmethod
    def _test_dict(cls, obj):
        assert len(obj) == 1
        assert obj['foo'] == 5
        assert obj.get('foo') == 5
        assert list(obj.items()) == [('foo', 5)]
        assert list(obj.keys()) == ['foo']
        assert list(obj.values()) == [5]
        assert obj.copy() == {'foo': 5}
        assert obj.popitem() == ('foo', 5)

    def test_dict(self):
        o = self.manager.dict()
        o['foo'] = 5
        self.run_worker(self._test_dict, o)
        # The worker popped the only item.
        self.assertFalse(o)
        self.assertEqual(len(o), 0)

    @classmethod
    def _test_value(cls, obj):
        assert obj.value == 1
        assert obj.get() == 1
        obj.set(2)

    def test_value(self):
        o = self.manager.Value('i', 1)
        self.run_worker(self._test_value, o)
        self.assertEqual(o.value, 2)
        self.assertEqual(o.get(), 2)

    @classmethod
    def _test_array(cls, obj):
        assert obj[0] == 0
        assert obj[1] == 1
        assert len(obj) == 2
        assert list(obj) == [0, 1]

    def test_array(self):
        o = self.manager.Array('i', [0, 1])
        self.run_worker(self._test_array, o)

    @classmethod
    def _test_namespace(cls, obj):
        assert obj.x == 0
        assert obj.y == 1

    def test_namespace(self):
        o = self.manager.Namespace()
        o.x = 0
        o.y = 1
        self.run_worker(self._test_namespace, o)
5559
5560
class MiscTestCase(unittest.TestCase):
    """Checks on the public namespace of the multiprocessing package."""

    def test__all__(self):
        # Just make sure names in blacklist are excluded
        excluded_names = ['SUBDEBUG', 'SUBWARNING']
        support.check__all__(
            self,
            multiprocessing,
            extra=multiprocessing.__all__,
            blacklist=excluded_names,
        )
5566#
5567# Mixins
5568#
5569
class BaseMixin(object):
    """Class-level fixture that warns about leaked processes and threads."""

    @classmethod
    def setUpClass(cls):
        # Snapshot what is tracked right now so that tearDownClass() only
        # reports objects that appeared afterwards.
        known_processes = multiprocessing.process._dangling.copy()
        known_threads = threading._dangling.copy()
        cls.dangling = (known_processes, known_threads)

    @classmethod
    def tearDownClass(cls):
        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        known_processes, known_threads = cls.dangling

        leaked = set(multiprocessing.process._dangling) - set(known_processes)
        if leaked:
            test.support.environment_altered = True
            support.print_warning(f'Dangling processes: {leaked}')
        # Drop the strong references held by the temporary set.
        leaked = None

        leaked = set(threading._dangling) - set(known_threads)
        if leaked:
            test.support.environment_altered = True
            support.print_warning(f'Dangling threads: {leaked}')
        leaked = None
5593
5594
class ProcessesMixin(BaseMixin):
    """Map the names used by the test cases onto ``multiprocessing`` itself.

    Callables are wrapped in staticmethod() so that looking them up on a
    test instance does not bind them as methods.
    """
    TYPE = 'processes'

    # Class and module references, used as plain attributes.
    Process = multiprocessing.Process
    connection = multiprocessing.connection

    # Process introspection.
    active_children = staticmethod(multiprocessing.active_children)
    current_process = staticmethod(multiprocessing.current_process)
    parent_process = staticmethod(multiprocessing.parent_process)

    # Pools, pipes and queues.
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)

    # Synchronization primitives.
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)

    # Shared values and arrays.
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
5617
5618
class ManagerMixin(BaseMixin):
    """Map the names used by the test cases onto a shared manager.

    The manager is created in setUpClass(); each factory name is a property
    that looks the attribute up on ``self.manager`` when it is accessed.
    """
    TYPE = 'manager'
    Process = multiprocessing.Process

    # Queues.
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))

    # Synchronization primitives.
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))

    # Shared data.
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        """Create a pool through the class-wide manager."""
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        # Take the dangling snapshot first, then start the manager.
        super().setUpClass()
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        began = time.monotonic()
        pause = 0.01
        while True:
            if len(multiprocessing.active_children()) <= 1:
                break
            time.sleep(pause)
            pause *= 2
            elapsed = time.monotonic() - began
            if elapsed < 5.0:
                continue
            # Waited long enough: report the stragglers and move on.
            test.support.environment_altered = True
            support.print_warning(f"multiprocessing.Manager still has "
                                  f"{multiprocessing.active_children()} "
                                  f"active children after {elapsed} seconds")
            break

        gc.collect()                       # do garbage collection
        if cls.manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            test.support.environment_altered = True
            support.print_warning('Shared objects which still exist '
                                  'at manager shutdown:')
            support.print_warning(cls.manager._debug_info())
        cls.manager.shutdown()
        cls.manager.join()
        # Drop the manager before the base class looks for dangling objects.
        cls.manager = None

        super().tearDownClass()
5678
5679
class ThreadsMixin(BaseMixin):
    """Map the names used by the test cases onto ``multiprocessing.dummy``.

    Callables are wrapped in staticmethod() so that looking them up on a
    test instance does not bind them as methods.
    """
    TYPE = 'threads'

    # Class and module references, used as plain attributes.
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection

    # Process introspection.
    active_children = staticmethod(multiprocessing.dummy.active_children)
    current_process = staticmethod(multiprocessing.dummy.current_process)

    # Pools, pipes and queues.
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)

    # Synchronization primitives.
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)

    # Shared values and arrays.
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
5699
5700#
5701# Functions used to create test cases from the base ones in this module
5702#
5703
def install_tests_in_module_dict(remote_globs, start_method):
    """Fill *remote_globs* with concrete test classes and module fixtures.

    Every class found in this module's globals is handled as follows:

    * a BaseTestCase subclass yields one class per entry of its
      ALLOWED_TYPES, named 'With' + <Type> + <name without its first
      character>, combining it with the matching '<Type>Mixin' and
      unittest.TestCase (the 'manager' flavour additionally requires md5);
    * any other unittest.TestCase subclass is re-exported under its own
      name through a trivial subclass.

    The generated classes report remote_globs['__name__'] as their module.
    setUpModule()/tearDownModule() functions that switch to *start_method*
    and restore the previous one are installed as well.
    """
    target_module = remote_globs['__name__']
    module_globals = globals()
    known_types = {'processes', 'threads', 'manager'}

    for name, base in module_globals.items():
        if not isinstance(base, type):
            continue
        if not issubclass(base, BaseTestCase):
            # Plain test cases are re-exported as they are.
            if issubclass(base, unittest.TestCase):
                class Temp(base, object):
                    pass
                Temp.__name__ = Temp.__qualname__ = name
                Temp.__module__ = target_module
                remote_globs[name] = Temp
            continue
        if base is BaseTestCase:
            continue
        assert set(base.ALLOWED_TYPES) <= known_types, base.ALLOWED_TYPES
        for type_ in base.ALLOWED_TYPES:
            flavour = type_.capitalize()
            newname = 'With' + flavour + name[1:]
            Mixin = module_globals[flavour + 'Mixin']
            class Temp(base, Mixin, unittest.TestCase):
                pass
            if type_ == 'manager':
                Temp = hashlib_helper.requires_hashdigest('md5')(Temp)
            Temp.__name__ = Temp.__qualname__ = newname
            Temp.__module__ = target_module
            remote_globs[newname] = Temp

    # State shared between setUpModule() and tearDownModule().
    saved_processes = None
    saved_threads = None
    previous_start_method = None

    def setUpModule():
        nonlocal saved_processes, saved_threads, previous_start_method

        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        saved_processes = multiprocessing.process._dangling.copy()
        saved_threads = threading._dangling.copy()
        previous_start_method = multiprocessing.get_start_method(
            allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            # Creating an RLock is used as a probe here; the object itself
            # is not needed afterwards.
            try:
                probe = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raises on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        multiprocessing.set_start_method(previous_start_method, force=True)
        # pause a bit so we don't get warning about dangling threads/processes
        leftovers = set(multiprocessing.process._dangling) - set(saved_processes)
        processes_found = bool(leftovers)
        if processes_found:
            test.support.environment_altered = True
            support.print_warning(f'Dangling processes: {leftovers}')
        leftovers = None

        leftovers = set(threading._dangling) - set(saved_threads)
        threads_found = bool(leftovers)
        if threads_found:
            test.support.environment_altered = True
            support.print_warning(f'Dangling threads: {leftovers}')
        leftovers = None

        # Sleep 500 ms to give time to child processes to complete.
        if processes_found or threads_found:
            time.sleep(0.5)

        multiprocessing.util._cleanup_tests()

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule
5789