1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
13import socket
14import random
15import logging
16import errno
17import weakref
18import test.script_helper
19from test import support
20from StringIO import StringIO
21_multiprocessing = support.import_module('_multiprocessing')
22# import threading after _multiprocessing to raise a more relevant error
23# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
26
27# Work around broken sem_open implementations
28support.import_module('multiprocessing.synchronize')
29
30import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
34import multiprocessing.pool
35
36from multiprocessing import util
37
38try:
39    from multiprocessing import reduction
40    HAS_REDUCTION = True
41except ImportError:
42    HAS_REDUCTION = False
43
44try:
45    from multiprocessing.sharedctypes import Value, copy
46    HAS_SHAREDCTYPES = True
47except ImportError:
48    HAS_SHAREDCTYPES = False
49
50try:
51    import msvcrt
52except ImportError:
53    msvcrt = None
54
55#
56#
57#
58
59latin = str
60
61#
62# Constants
63#
64
65LOG_LEVEL = util.SUBWARNING
66#LOG_LEVEL = logging.DEBUG
67
68DELTA = 0.1
69CHECK_TIMINGS = False     # making true makes tests take a lot longer
70                          # and can sometimes cause some non-serious
71                          # failures because some calls block a bit
72                          # longer than expected
73if CHECK_TIMINGS:
74    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
75else:
76    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
77
78HAVE_GETVALUE = not getattr(_multiprocessing,
79                            'HAVE_BROKEN_SEM_GETVALUE', False)
80
81WIN32 = (sys.platform == "win32")
82
83try:
84    MAXFD = os.sysconf("SC_OPEN_MAX")
85except:
86    MAXFD = 256
87
88#
89# Some tests require ctypes
90#
91
92try:
93    from ctypes import Structure, c_int, c_double
94except ImportError:
95    Structure = object
96    c_int = c_double = None
97
98
99def check_enough_semaphores():
100    """Check that the system supports enough semaphores to run the test."""
101    # minimum number of semaphores available according to POSIX
102    nsems_min = 256
103    try:
104        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
105    except (AttributeError, ValueError):
106        # sysconf not available or setting not available
107        return
108    if nsems == -1 or nsems >= nsems_min:
109        return
110    raise unittest.SkipTest("The OS doesn't support enough semaphores "
111                            "to run the test (required: %d)." % nsems_min)
112
113
114#
115# Creates a wrapper for a function which records the time it takes to finish
116#
117
118class TimingWrapper(object):
119
120    def __init__(self, func):
121        self.func = func
122        self.elapsed = None
123
124    def __call__(self, *args, **kwds):
125        t = time.time()
126        try:
127            return self.func(*args, **kwds)
128        finally:
129            self.elapsed = time.time() - t
130
131#
132# Base class for test cases
133#
134
135class BaseTestCase(object):
136
137    ALLOWED_TYPES = ('processes', 'manager', 'threads')
138
139    def assertTimingAlmostEqual(self, a, b):
140        if CHECK_TIMINGS:
141            self.assertAlmostEqual(a, b, 1)
142
143    def assertReturnsIfImplemented(self, value, func, *args):
144        try:
145            res = func(*args)
146        except NotImplementedError:
147            pass
148        else:
149            return self.assertEqual(value, res)
150
151    # For the sanity of Windows users, rather than crashing or freezing in
152    # multiple ways.
153    def __reduce__(self, *args):
154        raise NotImplementedError("shouldn't try to pickle a test case")
155
156    __reduce_ex__ = __reduce__
157
158#
159# Return the value of a semaphore
160#
161
162def get_value(self):
163    try:
164        return self.get_value()
165    except AttributeError:
166        try:
167            return self._Semaphore__value
168        except AttributeError:
169            try:
170                return self._value
171            except AttributeError:
172                raise NotImplementedError
173
174#
175# Testcases
176#
177
178class DummyCallable(object):
179    def __call__(self, q, c):
180        assert isinstance(c, DummyCallable)
181        q.put(5)
182
183
184class _TestProcess(BaseTestCase):
185
186    ALLOWED_TYPES = ('processes', 'threads')
187
188    def test_current(self):
189        if self.TYPE == 'threads':
190            self.skipTest('test not appropriate for {}'.format(self.TYPE))
191
192        current = self.current_process()
193        authkey = current.authkey
194
195        self.assertTrue(current.is_alive())
196        self.assertTrue(not current.daemon)
197        self.assertIsInstance(authkey, bytes)
198        self.assertTrue(len(authkey) > 0)
199        self.assertEqual(current.ident, os.getpid())
200        self.assertEqual(current.exitcode, None)
201
202    @classmethod
203    def _test(cls, q, *args, **kwds):
204        current = cls.current_process()
205        q.put(args)
206        q.put(kwds)
207        q.put(current.name)
208        if cls.TYPE != 'threads':
209            q.put(bytes(current.authkey))
210            q.put(current.pid)
211
212    def test_process(self):
213        q = self.Queue(1)
214        e = self.Event()
215        args = (q, 1, 2)
216        kwargs = {'hello':23, 'bye':2.54}
217        name = 'SomeProcess'
218        p = self.Process(
219            target=self._test, args=args, kwargs=kwargs, name=name
220            )
221        p.daemon = True
222        current = self.current_process()
223
224        if self.TYPE != 'threads':
225            self.assertEqual(p.authkey, current.authkey)
226        self.assertEqual(p.is_alive(), False)
227        self.assertEqual(p.daemon, True)
228        self.assertNotIn(p, self.active_children())
229        self.assertTrue(type(self.active_children()) is list)
230        self.assertEqual(p.exitcode, None)
231
232        p.start()
233
234        self.assertEqual(p.exitcode, None)
235        self.assertEqual(p.is_alive(), True)
236        self.assertIn(p, self.active_children())
237
238        self.assertEqual(q.get(), args[1:])
239        self.assertEqual(q.get(), kwargs)
240        self.assertEqual(q.get(), p.name)
241        if self.TYPE != 'threads':
242            self.assertEqual(q.get(), current.authkey)
243            self.assertEqual(q.get(), p.pid)
244
245        p.join()
246
247        self.assertEqual(p.exitcode, 0)
248        self.assertEqual(p.is_alive(), False)
249        self.assertNotIn(p, self.active_children())
250
251    @classmethod
252    def _test_terminate(cls):
253        time.sleep(1000)
254
255    def test_terminate(self):
256        if self.TYPE == 'threads':
257            self.skipTest('test not appropriate for {}'.format(self.TYPE))
258
259        p = self.Process(target=self._test_terminate)
260        p.daemon = True
261        p.start()
262
263        self.assertEqual(p.is_alive(), True)
264        self.assertIn(p, self.active_children())
265        self.assertEqual(p.exitcode, None)
266
267        p.terminate()
268
269        join = TimingWrapper(p.join)
270        self.assertEqual(join(), None)
271        self.assertTimingAlmostEqual(join.elapsed, 0.0)
272
273        self.assertEqual(p.is_alive(), False)
274        self.assertNotIn(p, self.active_children())
275
276        p.join()
277
278        # XXX sometimes get p.exitcode == 0 on Windows ...
279        #self.assertEqual(p.exitcode, -signal.SIGTERM)
280
281    def test_cpu_count(self):
282        try:
283            cpus = multiprocessing.cpu_count()
284        except NotImplementedError:
285            cpus = 1
286        self.assertTrue(type(cpus) is int)
287        self.assertTrue(cpus >= 1)
288
289    def test_active_children(self):
290        self.assertEqual(type(self.active_children()), list)
291
292        p = self.Process(target=time.sleep, args=(DELTA,))
293        self.assertNotIn(p, self.active_children())
294
295        p.daemon = True
296        p.start()
297        self.assertIn(p, self.active_children())
298
299        p.join()
300        self.assertNotIn(p, self.active_children())
301
302    @classmethod
303    def _test_recursion(cls, wconn, id):
304        from multiprocessing import forking
305        wconn.send(id)
306        if len(id) < 2:
307            for i in range(2):
308                p = cls.Process(
309                    target=cls._test_recursion, args=(wconn, id+[i])
310                    )
311                p.start()
312                p.join()
313
314    def test_recursion(self):
315        rconn, wconn = self.Pipe(duplex=False)
316        self._test_recursion(wconn, [])
317
318        time.sleep(DELTA)
319        result = []
320        while rconn.poll():
321            result.append(rconn.recv())
322
323        expected = [
324            [],
325              [0],
326                [0, 0],
327                [0, 1],
328              [1],
329                [1, 0],
330                [1, 1]
331            ]
332        self.assertEqual(result, expected)
333
334    @classmethod
335    def _test_sys_exit(cls, reason, testfn):
336        sys.stderr = open(testfn, 'w')
337        sys.exit(reason)
338
339    def test_sys_exit(self):
340        # See Issue 13854
341        if self.TYPE == 'threads':
342            self.skipTest('test not appropriate for {}'.format(self.TYPE))
343
344        testfn = support.TESTFN
345        self.addCleanup(support.unlink, testfn)
346
347        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
348            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
349            p.daemon = True
350            p.start()
351            p.join(5)
352            self.assertEqual(p.exitcode, code)
353
354            with open(testfn, 'r') as f:
355                self.assertEqual(f.read().rstrip(), str(reason))
356
357        for reason in (True, False, 8):
358            p = self.Process(target=sys.exit, args=(reason,))
359            p.daemon = True
360            p.start()
361            p.join(5)
362            self.assertEqual(p.exitcode, reason)
363
364    def test_lose_target_ref(self):
365        c = DummyCallable()
366        wr = weakref.ref(c)
367        q = self.Queue()
368        p = self.Process(target=c, args=(q, c))
369        del c
370        p.start()
371        p.join()
372        self.assertIs(wr(), None)
373        self.assertEqual(q.get(), 5)
374
375
376#
377#
378#
379
380class _UpperCaser(multiprocessing.Process):
381
382    def __init__(self):
383        multiprocessing.Process.__init__(self)
384        self.child_conn, self.parent_conn = multiprocessing.Pipe()
385
386    def run(self):
387        self.parent_conn.close()
388        for s in iter(self.child_conn.recv, None):
389            self.child_conn.send(s.upper())
390        self.child_conn.close()
391
392    def submit(self, s):
393        assert type(s) is str
394        self.parent_conn.send(s)
395        return self.parent_conn.recv()
396
397    def stop(self):
398        self.parent_conn.send(None)
399        self.parent_conn.close()
400        self.child_conn.close()
401
402class _TestSubclassingProcess(BaseTestCase):
403
404    ALLOWED_TYPES = ('processes',)
405
406    def test_subclassing(self):
407        uppercaser = _UpperCaser()
408        uppercaser.daemon = True
409        uppercaser.start()
410        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
411        self.assertEqual(uppercaser.submit('world'), 'WORLD')
412        uppercaser.stop()
413        uppercaser.join()
414
415#
416#
417#
418
419def queue_empty(q):
420    if hasattr(q, 'empty'):
421        return q.empty()
422    else:
423        return q.qsize() == 0
424
425def queue_full(q, maxsize):
426    if hasattr(q, 'full'):
427        return q.full()
428    else:
429        return q.qsize() == maxsize
430
431
432class _TestQueue(BaseTestCase):
433
434
435    @classmethod
436    def _test_put(cls, queue, child_can_start, parent_can_continue):
437        child_can_start.wait()
438        for i in range(6):
439            queue.get()
440        parent_can_continue.set()
441
442    def test_put(self):
443        MAXSIZE = 6
444        queue = self.Queue(maxsize=MAXSIZE)
445        child_can_start = self.Event()
446        parent_can_continue = self.Event()
447
448        proc = self.Process(
449            target=self._test_put,
450            args=(queue, child_can_start, parent_can_continue)
451            )
452        proc.daemon = True
453        proc.start()
454
455        self.assertEqual(queue_empty(queue), True)
456        self.assertEqual(queue_full(queue, MAXSIZE), False)
457
458        queue.put(1)
459        queue.put(2, True)
460        queue.put(3, True, None)
461        queue.put(4, False)
462        queue.put(5, False, None)
463        queue.put_nowait(6)
464
465        # the values may be in buffer but not yet in pipe so sleep a bit
466        time.sleep(DELTA)
467
468        self.assertEqual(queue_empty(queue), False)
469        self.assertEqual(queue_full(queue, MAXSIZE), True)
470
471        put = TimingWrapper(queue.put)
472        put_nowait = TimingWrapper(queue.put_nowait)
473
474        self.assertRaises(Queue.Full, put, 7, False)
475        self.assertTimingAlmostEqual(put.elapsed, 0)
476
477        self.assertRaises(Queue.Full, put, 7, False, None)
478        self.assertTimingAlmostEqual(put.elapsed, 0)
479
480        self.assertRaises(Queue.Full, put_nowait, 7)
481        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
482
483        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
484        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
485
486        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
487        self.assertTimingAlmostEqual(put.elapsed, 0)
488
489        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
490        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
491
492        child_can_start.set()
493        parent_can_continue.wait()
494
495        self.assertEqual(queue_empty(queue), True)
496        self.assertEqual(queue_full(queue, MAXSIZE), False)
497
498        proc.join()
499
500    @classmethod
501    def _test_get(cls, queue, child_can_start, parent_can_continue):
502        child_can_start.wait()
503        #queue.put(1)
504        queue.put(2)
505        queue.put(3)
506        queue.put(4)
507        queue.put(5)
508        parent_can_continue.set()
509
510    def test_get(self):
511        queue = self.Queue()
512        child_can_start = self.Event()
513        parent_can_continue = self.Event()
514
515        proc = self.Process(
516            target=self._test_get,
517            args=(queue, child_can_start, parent_can_continue)
518            )
519        proc.daemon = True
520        proc.start()
521
522        self.assertEqual(queue_empty(queue), True)
523
524        child_can_start.set()
525        parent_can_continue.wait()
526
527        time.sleep(DELTA)
528        self.assertEqual(queue_empty(queue), False)
529
530        # Hangs unexpectedly, remove for now
531        #self.assertEqual(queue.get(), 1)
532        self.assertEqual(queue.get(True, None), 2)
533        self.assertEqual(queue.get(True), 3)
534        self.assertEqual(queue.get(timeout=1), 4)
535        self.assertEqual(queue.get_nowait(), 5)
536
537        self.assertEqual(queue_empty(queue), True)
538
539        get = TimingWrapper(queue.get)
540        get_nowait = TimingWrapper(queue.get_nowait)
541
542        self.assertRaises(Queue.Empty, get, False)
543        self.assertTimingAlmostEqual(get.elapsed, 0)
544
545        self.assertRaises(Queue.Empty, get, False, None)
546        self.assertTimingAlmostEqual(get.elapsed, 0)
547
548        self.assertRaises(Queue.Empty, get_nowait)
549        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
550
551        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
552        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
553
554        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
555        self.assertTimingAlmostEqual(get.elapsed, 0)
556
557        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
558        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
559
560        proc.join()
561
562    @classmethod
563    def _test_fork(cls, queue):
564        for i in range(10, 20):
565            queue.put(i)
566        # note that at this point the items may only be buffered, so the
567        # process cannot shutdown until the feeder thread has finished
568        # pushing items onto the pipe.
569
570    def test_fork(self):
571        # Old versions of Queue would fail to create a new feeder
572        # thread for a forked process if the original process had its
573        # own feeder thread.  This test checks that this no longer
574        # happens.
575
576        queue = self.Queue()
577
578        # put items on queue so that main process starts a feeder thread
579        for i in range(10):
580            queue.put(i)
581
582        # wait to make sure thread starts before we fork a new process
583        time.sleep(DELTA)
584
585        # fork process
586        p = self.Process(target=self._test_fork, args=(queue,))
587        p.daemon = True
588        p.start()
589
590        # check that all expected items are in the queue
591        for i in range(20):
592            self.assertEqual(queue.get(), i)
593        self.assertRaises(Queue.Empty, queue.get, False)
594
595        p.join()
596
597    def test_qsize(self):
598        q = self.Queue()
599        try:
600            self.assertEqual(q.qsize(), 0)
601        except NotImplementedError:
602            self.skipTest('qsize method not implemented')
603        q.put(1)
604        self.assertEqual(q.qsize(), 1)
605        q.put(5)
606        self.assertEqual(q.qsize(), 2)
607        q.get()
608        self.assertEqual(q.qsize(), 1)
609        q.get()
610        self.assertEqual(q.qsize(), 0)
611
612    @classmethod
613    def _test_task_done(cls, q):
614        for obj in iter(q.get, None):
615            time.sleep(DELTA)
616            q.task_done()
617
618    def test_task_done(self):
619        queue = self.JoinableQueue()
620
621        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
622            self.skipTest("requires 'queue.task_done()' method")
623
624        workers = [self.Process(target=self._test_task_done, args=(queue,))
625                   for i in xrange(4)]
626
627        for p in workers:
628            p.daemon = True
629            p.start()
630
631        for i in xrange(10):
632            queue.put(i)
633
634        queue.join()
635
636        for p in workers:
637            queue.put(None)
638
639        for p in workers:
640            p.join()
641
642    def test_no_import_lock_contention(self):
643        with support.temp_cwd():
644            module_name = 'imported_by_an_imported_module'
645            with open(module_name + '.py', 'w') as f:
646                f.write("""if 1:
647                    import multiprocessing
648
649                    q = multiprocessing.Queue()
650                    q.put('knock knock')
651                    q.get(timeout=3)
652                    q.close()
653                """)
654
655            with support.DirsOnSysPath(os.getcwd()):
656                try:
657                    __import__(module_name)
658                except Queue.Empty:
659                    self.fail("Probable regression on import lock contention;"
660                              " see Issue #22853")
661
662    def test_queue_feeder_donot_stop_onexc(self):
663        # bpo-30414: verify feeder handles exceptions correctly
664        if self.TYPE != 'processes':
665            self.skipTest('test not appropriate for {}'.format(self.TYPE))
666
667        class NotSerializable(object):
668            def __reduce__(self):
669                raise AttributeError
670        with test.support.captured_stderr():
671            q = self.Queue()
672            q.put(NotSerializable())
673            q.put(True)
674            # bpo-30595: use a timeout of 1 second for slow buildbots
675            self.assertTrue(q.get(timeout=1.0))
676
677
678#
679#
680#
681
682class _TestLock(BaseTestCase):
683
684    def test_lock(self):
685        lock = self.Lock()
686        self.assertEqual(lock.acquire(), True)
687        self.assertEqual(lock.acquire(False), False)
688        self.assertEqual(lock.release(), None)
689        self.assertRaises((ValueError, threading.ThreadError), lock.release)
690
691    def test_rlock(self):
692        lock = self.RLock()
693        self.assertEqual(lock.acquire(), True)
694        self.assertEqual(lock.acquire(), True)
695        self.assertEqual(lock.acquire(), True)
696        self.assertEqual(lock.release(), None)
697        self.assertEqual(lock.release(), None)
698        self.assertEqual(lock.release(), None)
699        self.assertRaises((AssertionError, RuntimeError), lock.release)
700
701    def test_lock_context(self):
702        with self.Lock():
703            pass
704
705
706class _TestSemaphore(BaseTestCase):
707
708    def _test_semaphore(self, sem):
709        self.assertReturnsIfImplemented(2, get_value, sem)
710        self.assertEqual(sem.acquire(), True)
711        self.assertReturnsIfImplemented(1, get_value, sem)
712        self.assertEqual(sem.acquire(), True)
713        self.assertReturnsIfImplemented(0, get_value, sem)
714        self.assertEqual(sem.acquire(False), False)
715        self.assertReturnsIfImplemented(0, get_value, sem)
716        self.assertEqual(sem.release(), None)
717        self.assertReturnsIfImplemented(1, get_value, sem)
718        self.assertEqual(sem.release(), None)
719        self.assertReturnsIfImplemented(2, get_value, sem)
720
721    def test_semaphore(self):
722        sem = self.Semaphore(2)
723        self._test_semaphore(sem)
724        self.assertEqual(sem.release(), None)
725        self.assertReturnsIfImplemented(3, get_value, sem)
726        self.assertEqual(sem.release(), None)
727        self.assertReturnsIfImplemented(4, get_value, sem)
728
729    def test_bounded_semaphore(self):
730        sem = self.BoundedSemaphore(2)
731        self._test_semaphore(sem)
732        # Currently fails on OS/X
733        #if HAVE_GETVALUE:
734        #    self.assertRaises(ValueError, sem.release)
735        #    self.assertReturnsIfImplemented(2, get_value, sem)
736
737    def test_timeout(self):
738        if self.TYPE != 'processes':
739            self.skipTest('test not appropriate for {}'.format(self.TYPE))
740
741        sem = self.Semaphore(0)
742        acquire = TimingWrapper(sem.acquire)
743
744        self.assertEqual(acquire(False), False)
745        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
746
747        self.assertEqual(acquire(False, None), False)
748        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
749
750        self.assertEqual(acquire(False, TIMEOUT1), False)
751        self.assertTimingAlmostEqual(acquire.elapsed, 0)
752
753        self.assertEqual(acquire(True, TIMEOUT2), False)
754        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
755
756        self.assertEqual(acquire(timeout=TIMEOUT3), False)
757        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
758
759
760class _TestCondition(BaseTestCase):
761
762    @classmethod
763    def f(cls, cond, sleeping, woken, timeout=None):
764        cond.acquire()
765        sleeping.release()
766        cond.wait(timeout)
767        woken.release()
768        cond.release()
769
770    def check_invariant(self, cond):
771        # this is only supposed to succeed when there are no sleepers
772        if self.TYPE == 'processes':
773            try:
774                sleepers = (cond._sleeping_count.get_value() -
775                            cond._woken_count.get_value())
776                self.assertEqual(sleepers, 0)
777                self.assertEqual(cond._wait_semaphore.get_value(), 0)
778            except NotImplementedError:
779                pass
780
781    def test_notify(self):
782        cond = self.Condition()
783        sleeping = self.Semaphore(0)
784        woken = self.Semaphore(0)
785
786        p = self.Process(target=self.f, args=(cond, sleeping, woken))
787        p.daemon = True
788        p.start()
789
790        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
791        p.daemon = True
792        p.start()
793
794        # wait for both children to start sleeping
795        sleeping.acquire()
796        sleeping.acquire()
797
798        # check no process/thread has woken up
799        time.sleep(DELTA)
800        self.assertReturnsIfImplemented(0, get_value, woken)
801
802        # wake up one process/thread
803        cond.acquire()
804        cond.notify()
805        cond.release()
806
807        # check one process/thread has woken up
808        time.sleep(DELTA)
809        self.assertReturnsIfImplemented(1, get_value, woken)
810
811        # wake up another
812        cond.acquire()
813        cond.notify()
814        cond.release()
815
816        # check other has woken up
817        time.sleep(DELTA)
818        self.assertReturnsIfImplemented(2, get_value, woken)
819
820        # check state is not mucked up
821        self.check_invariant(cond)
822        p.join()
823
824    def test_notify_all(self):
825        cond = self.Condition()
826        sleeping = self.Semaphore(0)
827        woken = self.Semaphore(0)
828
829        # start some threads/processes which will timeout
830        for i in range(3):
831            p = self.Process(target=self.f,
832                             args=(cond, sleeping, woken, TIMEOUT1))
833            p.daemon = True
834            p.start()
835
836            t = threading.Thread(target=self.f,
837                                 args=(cond, sleeping, woken, TIMEOUT1))
838            t.daemon = True
839            t.start()
840
841        # wait for them all to sleep
842        for i in xrange(6):
843            sleeping.acquire()
844
845        # check they have all timed out
846        for i in xrange(6):
847            woken.acquire()
848        self.assertReturnsIfImplemented(0, get_value, woken)
849
850        # check state is not mucked up
851        self.check_invariant(cond)
852
853        # start some more threads/processes
854        for i in range(3):
855            p = self.Process(target=self.f, args=(cond, sleeping, woken))
856            p.daemon = True
857            p.start()
858
859            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
860            t.daemon = True
861            t.start()
862
863        # wait for them to all sleep
864        for i in xrange(6):
865            sleeping.acquire()
866
867        # check no process/thread has woken up
868        time.sleep(DELTA)
869        self.assertReturnsIfImplemented(0, get_value, woken)
870
871        # wake them all up
872        cond.acquire()
873        cond.notify_all()
874        cond.release()
875
876        # check they have all woken
877        for i in range(10):
878            try:
879                if get_value(woken) == 6:
880                    break
881            except NotImplementedError:
882                break
883            time.sleep(DELTA)
884        self.assertReturnsIfImplemented(6, get_value, woken)
885
886        # check state is not mucked up
887        self.check_invariant(cond)
888
889    def test_timeout(self):
890        cond = self.Condition()
891        wait = TimingWrapper(cond.wait)
892        cond.acquire()
893        res = wait(TIMEOUT1)
894        cond.release()
895        self.assertEqual(res, None)
896        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
897
898
899class _TestEvent(BaseTestCase):
900
901    @classmethod
902    def _test_event(cls, event):
903        time.sleep(TIMEOUT2)
904        event.set()
905
906    def test_event(self):
907        event = self.Event()
908        wait = TimingWrapper(event.wait)
909
910        # Removed temporarily, due to API shear, this does not
911        # work with threading._Event objects. is_set == isSet
912        self.assertEqual(event.is_set(), False)
913
914        # Removed, threading.Event.wait() will return the value of the __flag
915        # instead of None. API Shear with the semaphore backed mp.Event
916        self.assertEqual(wait(0.0), False)
917        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
918        self.assertEqual(wait(TIMEOUT1), False)
919        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
920
921        event.set()
922
923        # See note above on the API differences
924        self.assertEqual(event.is_set(), True)
925        self.assertEqual(wait(), True)
926        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
927        self.assertEqual(wait(TIMEOUT1), True)
928        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
929        # self.assertEqual(event.is_set(), True)
930
931        event.clear()
932
933        #self.assertEqual(event.is_set(), False)
934
935        p = self.Process(target=self._test_event, args=(event,))
936        p.daemon = True
937        p.start()
938        self.assertEqual(wait(), True)
939
940#
941#
942#
943
944class _TestValue(BaseTestCase):
945
946    ALLOWED_TYPES = ('processes',)
947
948    codes_values = [
949        ('i', 4343, 24234),
950        ('d', 3.625, -4.25),
951        ('h', -232, 234),
952        ('c', latin('x'), latin('y'))
953        ]
954
955    def setUp(self):
956        if not HAS_SHAREDCTYPES:
957            self.skipTest("requires multiprocessing.sharedctypes")
958
959    @classmethod
960    def _test(cls, values):
961        for sv, cv in zip(values, cls.codes_values):
962            sv.value = cv[2]
963
964
965    def test_value(self, raw=False):
966        if raw:
967            values = [self.RawValue(code, value)
968                      for code, value, _ in self.codes_values]
969        else:
970            values = [self.Value(code, value)
971                      for code, value, _ in self.codes_values]
972
973        for sv, cv in zip(values, self.codes_values):
974            self.assertEqual(sv.value, cv[1])
975
976        proc = self.Process(target=self._test, args=(values,))
977        proc.daemon = True
978        proc.start()
979        proc.join()
980
981        for sv, cv in zip(values, self.codes_values):
982            self.assertEqual(sv.value, cv[2])
983
984    def test_rawvalue(self):
985        self.test_value(raw=True)
986
987    def test_getobj_getlock(self):
988        val1 = self.Value('i', 5)
989        lock1 = val1.get_lock()
990        obj1 = val1.get_obj()
991
992        val2 = self.Value('i', 5, lock=None)
993        lock2 = val2.get_lock()
994        obj2 = val2.get_obj()
995
996        lock = self.Lock()
997        val3 = self.Value('i', 5, lock=lock)
998        lock3 = val3.get_lock()
999        obj3 = val3.get_obj()
1000        self.assertEqual(lock, lock3)
1001
1002        arr4 = self.Value('i', 5, lock=False)
1003        self.assertFalse(hasattr(arr4, 'get_lock'))
1004        self.assertFalse(hasattr(arr4, 'get_obj'))
1005
1006        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1007
1008        arr5 = self.RawValue('i', 5)
1009        self.assertFalse(hasattr(arr5, 'get_lock'))
1010        self.assertFalse(hasattr(arr5, 'get_obj'))
1011
1012
1013class _TestArray(BaseTestCase):
1014
1015    ALLOWED_TYPES = ('processes',)
1016
1017    @classmethod
1018    def f(cls, seq):
1019        for i in range(1, len(seq)):
1020            seq[i] += seq[i-1]
1021
1022    @unittest.skipIf(c_int is None, "requires _ctypes")
1023    def test_array(self, raw=False):
1024        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1025        if raw:
1026            arr = self.RawArray('i', seq)
1027        else:
1028            arr = self.Array('i', seq)
1029
1030        self.assertEqual(len(arr), len(seq))
1031        self.assertEqual(arr[3], seq[3])
1032        self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1033
1034        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1035
1036        self.assertEqual(list(arr[:]), seq)
1037
1038        self.f(seq)
1039
1040        p = self.Process(target=self.f, args=(arr,))
1041        p.daemon = True
1042        p.start()
1043        p.join()
1044
1045        self.assertEqual(list(arr[:]), seq)
1046
1047    @unittest.skipIf(c_int is None, "requires _ctypes")
1048    def test_array_from_size(self):
1049        size = 10
1050        # Test for zeroing (see issue #11675).
1051        # The repetition below strengthens the test by increasing the chances
1052        # of previously allocated non-zero memory being used for the new array
1053        # on the 2nd and 3rd loops.
1054        for _ in range(3):
1055            arr = self.Array('i', size)
1056            self.assertEqual(len(arr), size)
1057            self.assertEqual(list(arr), [0] * size)
1058            arr[:] = range(10)
1059            self.assertEqual(list(arr), range(10))
1060            del arr
1061
1062    @unittest.skipIf(c_int is None, "requires _ctypes")
1063    def test_rawarray(self):
1064        self.test_array(raw=True)
1065
1066    @unittest.skipIf(c_int is None, "requires _ctypes")
1067    def test_array_accepts_long(self):
1068        arr = self.Array('i', 10L)
1069        self.assertEqual(len(arr), 10)
1070        raw_arr = self.RawArray('i', 10L)
1071        self.assertEqual(len(raw_arr), 10)
1072
1073    @unittest.skipIf(c_int is None, "requires _ctypes")
1074    def test_getobj_getlock_obj(self):
1075        arr1 = self.Array('i', range(10))
1076        lock1 = arr1.get_lock()
1077        obj1 = arr1.get_obj()
1078
1079        arr2 = self.Array('i', range(10), lock=None)
1080        lock2 = arr2.get_lock()
1081        obj2 = arr2.get_obj()
1082
1083        lock = self.Lock()
1084        arr3 = self.Array('i', range(10), lock=lock)
1085        lock3 = arr3.get_lock()
1086        obj3 = arr3.get_obj()
1087        self.assertEqual(lock, lock3)
1088
1089        arr4 = self.Array('i', range(10), lock=False)
1090        self.assertFalse(hasattr(arr4, 'get_lock'))
1091        self.assertFalse(hasattr(arr4, 'get_obj'))
1092        self.assertRaises(AttributeError,
1093                          self.Array, 'i', range(10), lock='notalock')
1094
1095        arr5 = self.RawArray('i', range(10))
1096        self.assertFalse(hasattr(arr5, 'get_lock'))
1097        self.assertFalse(hasattr(arr5, 'get_obj'))
1098
1099#
1100#
1101#
1102
1103class _TestContainers(BaseTestCase):
1104
1105    ALLOWED_TYPES = ('manager',)
1106
1107    def test_list(self):
1108        a = self.list(range(10))
1109        self.assertEqual(a[:], range(10))
1110
1111        b = self.list()
1112        self.assertEqual(b[:], [])
1113
1114        b.extend(range(5))
1115        self.assertEqual(b[:], range(5))
1116
1117        self.assertEqual(b[2], 2)
1118        self.assertEqual(b[2:10], [2,3,4])
1119
1120        b *= 2
1121        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1122
1123        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1124
1125        self.assertEqual(a[:], range(10))
1126
1127        d = [a, b]
1128        e = self.list(d)
1129        self.assertEqual(
1130            e[:],
1131            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1132            )
1133
1134        f = self.list([a])
1135        a.append('hello')
1136        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1137
1138    def test_dict(self):
1139        d = self.dict()
1140        indices = range(65, 70)
1141        for i in indices:
1142            d[i] = chr(i)
1143        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1144        self.assertEqual(sorted(d.keys()), indices)
1145        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1146        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1147
1148    def test_namespace(self):
1149        n = self.Namespace()
1150        n.name = 'Bob'
1151        n.job = 'Builder'
1152        n._hidden = 'hidden'
1153        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1154        del n.job
1155        self.assertEqual(str(n), "Namespace(name='Bob')")
1156        self.assertTrue(hasattr(n, 'name'))
1157        self.assertTrue(not hasattr(n, 'job'))
1158
1159#
1160#
1161#
1162
1163def sqr(x, wait=0.0):
1164    time.sleep(wait)
1165    return x*x
1166
1167def identity(x):
1168    return x
1169
1170class CountedObject(object):
1171    n_instances = 0
1172
1173    def __new__(cls):
1174        cls.n_instances += 1
1175        return object.__new__(cls)
1176
1177    def __del__(self):
1178        type(self).n_instances -= 1
1179
1180class SayWhenError(ValueError): pass
1181
1182def exception_throwing_generator(total, when):
1183    for i in range(total):
1184        if i == when:
1185            raise SayWhenError("Somebody said when")
1186        yield i
1187
1188class _TestPool(BaseTestCase):
1189
1190    def test_apply(self):
1191        papply = self.pool.apply
1192        self.assertEqual(papply(sqr, (5,)), sqr(5))
1193        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1194
1195    def test_map(self):
1196        pmap = self.pool.map
1197        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1198        self.assertEqual(pmap(sqr, range(100), chunksize=20),
1199                         map(sqr, range(100)))
1200
1201    def test_map_unplicklable(self):
1202        # Issue #19425 -- failure to pickle should not cause a hang
1203        if self.TYPE == 'threads':
1204            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1205        class A(object):
1206            def __reduce__(self):
1207                raise RuntimeError('cannot pickle')
1208        with self.assertRaises(RuntimeError):
1209            self.pool.map(sqr, [A()]*10)
1210
1211    def test_map_chunksize(self):
1212        try:
1213            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1214        except multiprocessing.TimeoutError:
1215            self.fail("pool.map_async with chunksize stalled on null list")
1216
1217    def test_async(self):
1218        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1219        get = TimingWrapper(res.get)
1220        self.assertEqual(get(), 49)
1221        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1222
1223    def test_async_timeout(self):
1224        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
1225        get = TimingWrapper(res.get)
1226        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1227        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1228
1229    def test_imap(self):
1230        it = self.pool.imap(sqr, range(10))
1231        self.assertEqual(list(it), map(sqr, range(10)))
1232
1233        it = self.pool.imap(sqr, range(10))
1234        for i in range(10):
1235            self.assertEqual(it.next(), i*i)
1236        self.assertRaises(StopIteration, it.next)
1237
1238        it = self.pool.imap(sqr, range(1000), chunksize=100)
1239        for i in range(1000):
1240            self.assertEqual(it.next(), i*i)
1241        self.assertRaises(StopIteration, it.next)
1242
1243    def test_imap_handle_iterable_exception(self):
1244        if self.TYPE == 'manager':
1245            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1246
1247        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
1248        for i in range(3):
1249            self.assertEqual(next(it), i*i)
1250        self.assertRaises(SayWhenError, it.next)
1251
1252        # SayWhenError seen at start of problematic chunk's results
1253        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
1254        for i in range(6):
1255            self.assertEqual(next(it), i*i)
1256        self.assertRaises(SayWhenError, it.next)
1257        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
1258        for i in range(4):
1259            self.assertEqual(next(it), i*i)
1260        self.assertRaises(SayWhenError, it.next)
1261
1262    def test_imap_unordered(self):
1263        it = self.pool.imap_unordered(sqr, range(100))
1264        self.assertEqual(sorted(it), map(sqr, range(100)))
1265
1266        it = self.pool.imap_unordered(sqr, range(1000), chunksize=100)
1267        self.assertEqual(sorted(it), map(sqr, range(1000)))
1268
1269    def test_imap_unordered_handle_iterable_exception(self):
1270        if self.TYPE == 'manager':
1271            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1272
1273        it = self.pool.imap_unordered(sqr,
1274                                      exception_throwing_generator(10, 3),
1275                                      1)
1276        expected_values = map(sqr, range(10))
1277        with self.assertRaises(SayWhenError):
1278            # imap_unordered makes it difficult to anticipate the SayWhenError
1279            for i in range(10):
1280                value = next(it)
1281                self.assertIn(value, expected_values)
1282                expected_values.remove(value)
1283
1284        it = self.pool.imap_unordered(sqr,
1285                                      exception_throwing_generator(20, 7),
1286                                      2)
1287        expected_values = map(sqr, range(20))
1288        with self.assertRaises(SayWhenError):
1289            for i in range(20):
1290                value = next(it)
1291                self.assertIn(value, expected_values)
1292                expected_values.remove(value)
1293
1294    def test_make_pool(self):
1295        self.assertRaises(ValueError, multiprocessing.Pool, -1)
1296        self.assertRaises(ValueError, multiprocessing.Pool, 0)
1297
1298        p = multiprocessing.Pool(3)
1299        self.assertEqual(3, len(p._pool))
1300        p.close()
1301        p.join()
1302
1303    def test_terminate(self):
1304        p = self.Pool(4)
1305        result = p.map_async(
1306            time.sleep, [0.1 for i in range(10000)], chunksize=1
1307            )
1308        p.terminate()
1309        join = TimingWrapper(p.join)
1310        join()
1311        self.assertTrue(join.elapsed < 0.2)
1312
1313    def test_empty_iterable(self):
1314        # See Issue 12157
1315        p = self.Pool(1)
1316
1317        self.assertEqual(p.map(sqr, []), [])
1318        self.assertEqual(list(p.imap(sqr, [])), [])
1319        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1320        self.assertEqual(p.map_async(sqr, []).get(), [])
1321
1322        p.close()
1323        p.join()
1324
1325    def test_release_task_refs(self):
1326        # Issue #29861: task arguments and results should not be kept
1327        # alive after we are done with them.
1328        objs = list(CountedObject() for i in range(10))
1329        refs = list(weakref.ref(o) for o in objs)
1330        self.pool.map(identity, objs)
1331
1332        del objs
1333        time.sleep(DELTA)  # let threaded cleanup code run
1334        self.assertEqual(set(wr() for wr in refs), {None})
1335        # With a process pool, copies of the objects are returned, check
1336        # they were released too.
1337        self.assertEqual(CountedObject.n_instances, 0)
1338
1339
1340def unpickleable_result():
1341    return lambda: 42
1342
1343class _TestPoolWorkerErrors(BaseTestCase):
1344    ALLOWED_TYPES = ('processes', )
1345
1346    def test_unpickleable_result(self):
1347        from multiprocessing.pool import MaybeEncodingError
1348        p = multiprocessing.Pool(2)
1349
1350        # Make sure we don't lose pool processes because of encoding errors.
1351        for iteration in range(20):
1352            res = p.apply_async(unpickleable_result)
1353            self.assertRaises(MaybeEncodingError, res.get)
1354
1355        p.close()
1356        p.join()
1357
1358class _TestPoolWorkerLifetime(BaseTestCase):
1359
1360    ALLOWED_TYPES = ('processes', )
1361    def test_pool_worker_lifetime(self):
1362        p = multiprocessing.Pool(3, maxtasksperchild=10)
1363        self.assertEqual(3, len(p._pool))
1364        origworkerpids = [w.pid for w in p._pool]
1365        # Run many tasks so each worker gets replaced (hopefully)
1366        results = []
1367        for i in range(100):
1368            results.append(p.apply_async(sqr, (i, )))
1369        # Fetch the results and verify we got the right answers,
1370        # also ensuring all the tasks have completed.
1371        for (j, res) in enumerate(results):
1372            self.assertEqual(res.get(), sqr(j))
1373        # Refill the pool
1374        p._repopulate_pool()
1375        # Wait until all workers are alive
1376        # (countdown * DELTA = 5 seconds max startup process time)
1377        countdown = 50
1378        while countdown and not all(w.is_alive() for w in p._pool):
1379            countdown -= 1
1380            time.sleep(DELTA)
1381        finalworkerpids = [w.pid for w in p._pool]
1382        # All pids should be assigned.  See issue #7805.
1383        self.assertNotIn(None, origworkerpids)
1384        self.assertNotIn(None, finalworkerpids)
1385        # Finally, check that the worker pids have changed
1386        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1387        p.close()
1388        p.join()
1389
1390    def test_pool_worker_lifetime_early_close(self):
1391        # Issue #10332: closing a pool whose workers have limited lifetimes
1392        # before all the tasks completed would make join() hang.
1393        p = multiprocessing.Pool(3, maxtasksperchild=1)
1394        results = []
1395        for i in range(6):
1396            results.append(p.apply_async(sqr, (i, 0.3)))
1397        p.close()
1398        p.join()
1399        # check the results
1400        for (j, res) in enumerate(results):
1401            self.assertEqual(res.get(), sqr(j))
1402
1403
1404#
1405# Test that manager has expected number of shared objects left
1406#
1407
1408class _TestZZZNumberOfObjects(BaseTestCase):
1409    # Because test cases are sorted alphabetically, this one will get
1410    # run after all the other tests for the manager.  It tests that
1411    # there have been no "reference leaks" for the manager's shared
1412    # objects.  Note the comment in _TestPool.test_terminate().
1413    ALLOWED_TYPES = ('manager',)
1414
1415    def test_number_of_objects(self):
1416        EXPECTED_NUMBER = 1                # the pool object is still alive
1417        multiprocessing.active_children()  # discard dead process objs
1418        gc.collect()                       # do garbage collection
1419        refs = self.manager._number_of_objects()
1420        debug_info = self.manager._debug_info()
1421        if refs != EXPECTED_NUMBER:
1422            print self.manager._debug_info()
1423            print debug_info
1424
1425        self.assertEqual(refs, EXPECTED_NUMBER)
1426
1427#
1428# Test of creating a customized manager class
1429#
1430
1431from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1432
1433class FooBar(object):
1434    def f(self):
1435        return 'f()'
1436    def g(self):
1437        raise ValueError
1438    def _h(self):
1439        return '_h()'
1440
1441def baz():
1442    for i in xrange(10):
1443        yield i*i
1444
1445class IteratorProxy(BaseProxy):
1446    _exposed_ = ('next', '__next__')
1447    def __iter__(self):
1448        return self
1449    def next(self):
1450        return self._callmethod('next')
1451    def __next__(self):
1452        return self._callmethod('__next__')
1453
1454class MyManager(BaseManager):
1455    pass
1456
1457MyManager.register('Foo', callable=FooBar)
1458MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1459MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1460
1461
1462class _TestMyManager(BaseTestCase):
1463
1464    ALLOWED_TYPES = ('manager',)
1465
1466    def test_mymanager(self):
1467        manager = MyManager()
1468        manager.start()
1469
1470        foo = manager.Foo()
1471        bar = manager.Bar()
1472        baz = manager.baz()
1473
1474        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1475        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1476
1477        self.assertEqual(foo_methods, ['f', 'g'])
1478        self.assertEqual(bar_methods, ['f', '_h'])
1479
1480        self.assertEqual(foo.f(), 'f()')
1481        self.assertRaises(ValueError, foo.g)
1482        self.assertEqual(foo._callmethod('f'), 'f()')
1483        self.assertRaises(RemoteError, foo._callmethod, '_h')
1484
1485        self.assertEqual(bar.f(), 'f()')
1486        self.assertEqual(bar._h(), '_h()')
1487        self.assertEqual(bar._callmethod('f'), 'f()')
1488        self.assertEqual(bar._callmethod('_h'), '_h()')
1489
1490        self.assertEqual(list(baz), [i*i for i in range(10)])
1491
1492        manager.shutdown()
1493
1494#
1495# Test of connecting to a remote server and using xmlrpclib for serialization
1496#
1497
1498_queue = Queue.Queue()
1499def get_queue():
1500    return _queue
1501
1502class QueueManager(BaseManager):
1503    '''manager class used by server process'''
1504QueueManager.register('get_queue', callable=get_queue)
1505
1506class QueueManager2(BaseManager):
1507    '''manager class which specifies the same interface as QueueManager'''
1508QueueManager2.register('get_queue')
1509
1510
1511SERIALIZER = 'xmlrpclib'
1512
1513class _TestRemoteManager(BaseTestCase):
1514
1515    ALLOWED_TYPES = ('manager',)
1516    values = ['hello world', None, True, 2.25,
1517              #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
1518              ]
1519    result = values[:]
1520    if support.have_unicode:
1521        #result[-1] = u'hall\xe5 v\xe4rlden'
1522        uvalue = support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
1523                           r'\u0441\u0432\u0456\u0442')
1524        values.append(uvalue)
1525        result.append(uvalue)
1526
1527    @classmethod
1528    def _putter(cls, address, authkey):
1529        manager = QueueManager2(
1530            address=address, authkey=authkey, serializer=SERIALIZER
1531            )
1532        manager.connect()
1533        queue = manager.get_queue()
1534        # Note that xmlrpclib will deserialize object as a list not a tuple
1535        queue.put(tuple(cls.values))
1536
1537    def test_remote(self):
1538        authkey = os.urandom(32)
1539
1540        manager = QueueManager(
1541            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
1542            )
1543        manager.start()
1544
1545        p = self.Process(target=self._putter, args=(manager.address, authkey))
1546        p.daemon = True
1547        p.start()
1548
1549        manager2 = QueueManager2(
1550            address=manager.address, authkey=authkey, serializer=SERIALIZER
1551            )
1552        manager2.connect()
1553        queue = manager2.get_queue()
1554
1555        self.assertEqual(queue.get(), self.result)
1556
1557        # Because we are using xmlrpclib for serialization instead of
1558        # pickle this will cause a serialization error.
1559        self.assertRaises(Exception, queue.put, time.sleep)
1560
1561        # Make queue finalizer run before the server is stopped
1562        del queue
1563        manager.shutdown()
1564
1565class _TestManagerRestart(BaseTestCase):
1566
1567    @classmethod
1568    def _putter(cls, address, authkey):
1569        manager = QueueManager(
1570            address=address, authkey=authkey, serializer=SERIALIZER)
1571        manager.connect()
1572        queue = manager.get_queue()
1573        queue.put('hello world')
1574
1575    def test_rapid_restart(self):
1576        authkey = os.urandom(32)
1577        manager = QueueManager(
1578            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
1579        srvr = manager.get_server()
1580        addr = srvr.address
1581        # Close the connection.Listener socket which gets opened as a part
1582        # of manager.get_server(). It's not needed for the test.
1583        srvr.listener.close()
1584        manager.start()
1585
1586        p = self.Process(target=self._putter, args=(manager.address, authkey))
1587        p.start()
1588        p.join()
1589        queue = manager.get_queue()
1590        self.assertEqual(queue.get(), 'hello world')
1591        del queue
1592        manager.shutdown()
1593
1594        manager = QueueManager(
1595            address=addr, authkey=authkey, serializer=SERIALIZER)
1596        manager.start()
1597        manager.shutdown()
1598
1599#
1600#
1601#
1602
1603SENTINEL = latin('')
1604
1605class _TestConnection(BaseTestCase):
1606
1607    ALLOWED_TYPES = ('processes', 'threads')
1608
1609    @classmethod
1610    def _echo(cls, conn):
1611        for msg in iter(conn.recv_bytes, SENTINEL):
1612            conn.send_bytes(msg)
1613        conn.close()
1614
1615    def test_connection(self):
1616        conn, child_conn = self.Pipe()
1617
1618        p = self.Process(target=self._echo, args=(child_conn,))
1619        p.daemon = True
1620        p.start()
1621
1622        seq = [1, 2.25, None]
1623        msg = latin('hello world')
1624        longmsg = msg * 10
1625        arr = array.array('i', range(4))
1626
1627        if self.TYPE == 'processes':
1628            self.assertEqual(type(conn.fileno()), int)
1629
1630        self.assertEqual(conn.send(seq), None)
1631        self.assertEqual(conn.recv(), seq)
1632
1633        self.assertEqual(conn.send_bytes(msg), None)
1634        self.assertEqual(conn.recv_bytes(), msg)
1635
1636        if self.TYPE == 'processes':
1637            buffer = array.array('i', [0]*10)
1638            expected = list(arr) + [0] * (10 - len(arr))
1639            self.assertEqual(conn.send_bytes(arr), None)
1640            self.assertEqual(conn.recv_bytes_into(buffer),
1641                             len(arr) * buffer.itemsize)
1642            self.assertEqual(list(buffer), expected)
1643
1644            buffer = array.array('i', [0]*10)
1645            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1646            self.assertEqual(conn.send_bytes(arr), None)
1647            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1648                             len(arr) * buffer.itemsize)
1649            self.assertEqual(list(buffer), expected)
1650
1651            buffer = bytearray(latin(' ' * 40))
1652            self.assertEqual(conn.send_bytes(longmsg), None)
1653            try:
1654                res = conn.recv_bytes_into(buffer)
1655            except multiprocessing.BufferTooShort, e:
1656                self.assertEqual(e.args, (longmsg,))
1657            else:
1658                self.fail('expected BufferTooShort, got %s' % res)
1659
1660        poll = TimingWrapper(conn.poll)
1661
1662        self.assertEqual(poll(), False)
1663        self.assertTimingAlmostEqual(poll.elapsed, 0)
1664
1665        self.assertEqual(poll(TIMEOUT1), False)
1666        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1667
1668        conn.send(None)
1669        time.sleep(.1)
1670
1671        self.assertEqual(poll(TIMEOUT1), True)
1672        self.assertTimingAlmostEqual(poll.elapsed, 0)
1673
1674        self.assertEqual(conn.recv(), None)
1675
1676        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
1677        conn.send_bytes(really_big_msg)
1678        self.assertEqual(conn.recv_bytes(), really_big_msg)
1679
1680        conn.send_bytes(SENTINEL)                          # tell child to quit
1681        child_conn.close()
1682
1683        if self.TYPE == 'processes':
1684            self.assertEqual(conn.readable, True)
1685            self.assertEqual(conn.writable, True)
1686            self.assertRaises(EOFError, conn.recv)
1687            self.assertRaises(EOFError, conn.recv_bytes)
1688
1689        p.join()
1690
1691    def test_duplex_false(self):
1692        reader, writer = self.Pipe(duplex=False)
1693        self.assertEqual(writer.send(1), None)
1694        self.assertEqual(reader.recv(), 1)
1695        if self.TYPE == 'processes':
1696            self.assertEqual(reader.readable, True)
1697            self.assertEqual(reader.writable, False)
1698            self.assertEqual(writer.readable, False)
1699            self.assertEqual(writer.writable, True)
1700            self.assertRaises(IOError, reader.send, 2)
1701            self.assertRaises(IOError, writer.recv)
1702            self.assertRaises(IOError, writer.poll)
1703
1704    def test_spawn_close(self):
1705        # We test that a pipe connection can be closed by parent
1706        # process immediately after child is spawned.  On Windows this
1707        # would have sometimes failed on old versions because
1708        # child_conn would be closed before the child got a chance to
1709        # duplicate it.
1710        conn, child_conn = self.Pipe()
1711
1712        p = self.Process(target=self._echo, args=(child_conn,))
1713        p.daemon = True
1714        p.start()
1715        child_conn.close()    # this might complete before child initializes
1716
1717        msg = latin('hello')
1718        conn.send_bytes(msg)
1719        self.assertEqual(conn.recv_bytes(), msg)
1720
1721        conn.send_bytes(SENTINEL)
1722        conn.close()
1723        p.join()
1724
1725    def test_sendbytes(self):
1726        if self.TYPE != 'processes':
1727            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1728
1729        msg = latin('abcdefghijklmnopqrstuvwxyz')
1730        a, b = self.Pipe()
1731
1732        a.send_bytes(msg)
1733        self.assertEqual(b.recv_bytes(), msg)
1734
1735        a.send_bytes(msg, 5)
1736        self.assertEqual(b.recv_bytes(), msg[5:])
1737
1738        a.send_bytes(msg, 7, 8)
1739        self.assertEqual(b.recv_bytes(), msg[7:7+8])
1740
1741        a.send_bytes(msg, 26)
1742        self.assertEqual(b.recv_bytes(), latin(''))
1743
1744        a.send_bytes(msg, 26, 0)
1745        self.assertEqual(b.recv_bytes(), latin(''))
1746
1747        self.assertRaises(ValueError, a.send_bytes, msg, 27)
1748
1749        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1750
1751        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1752
1753        self.assertRaises(ValueError, a.send_bytes, msg, -1)
1754
1755        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1756
1757    @classmethod
1758    def _is_fd_assigned(cls, fd):
1759        try:
1760            os.fstat(fd)
1761        except OSError as e:
1762            if e.errno == errno.EBADF:
1763                return False
1764            raise
1765        else:
1766            return True
1767
1768    @classmethod
1769    def _writefd(cls, conn, data, create_dummy_fds=False):
1770        if create_dummy_fds:
1771            for i in range(0, 256):
1772                if not cls._is_fd_assigned(i):
1773                    os.dup2(conn.fileno(), i)
1774        fd = reduction.recv_handle(conn)
1775        if msvcrt:
1776            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1777        os.write(fd, data)
1778        os.close(fd)
1779
1780    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1781    def test_fd_transfer(self):
1782        if self.TYPE != 'processes':
1783            self.skipTest("only makes sense with processes")
1784        conn, child_conn = self.Pipe(duplex=True)
1785
1786        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
1787        p.daemon = True
1788        p.start()
1789        with open(support.TESTFN, "wb") as f:
1790            fd = f.fileno()
1791            if msvcrt:
1792                fd = msvcrt.get_osfhandle(fd)
1793            reduction.send_handle(conn, fd, p.pid)
1794        p.join()
1795        with open(support.TESTFN, "rb") as f:
1796            self.assertEqual(f.read(), b"foo")
1797
1798    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1799    @unittest.skipIf(sys.platform == "win32",
1800                     "test semantics don't make sense on Windows")
1801    @unittest.skipIf(MAXFD <= 256,
1802                     "largest assignable fd number is too small")
1803    @unittest.skipUnless(hasattr(os, "dup2"),
1804                         "test needs os.dup2()")
1805    def test_large_fd_transfer(self):
1806        # With fd > 256 (issue #11657)
1807        if self.TYPE != 'processes':
1808            self.skipTest("only makes sense with processes")
1809        conn, child_conn = self.Pipe(duplex=True)
1810
1811        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
1812        p.daemon = True
1813        p.start()
1814        with open(support.TESTFN, "wb") as f:
1815            fd = f.fileno()
1816            for newfd in range(256, MAXFD):
1817                if not self._is_fd_assigned(newfd):
1818                    break
1819            else:
1820                self.fail("could not find an unassigned large file descriptor")
1821            os.dup2(fd, newfd)
1822            try:
1823                reduction.send_handle(conn, newfd, p.pid)
1824            finally:
1825                os.close(newfd)
1826        p.join()
1827        with open(support.TESTFN, "rb") as f:
1828            self.assertEqual(f.read(), b"bar")
1829
1830    @classmethod
1831    def _send_data_without_fd(self, conn):
1832        os.write(conn.fileno(), b"\0")
1833
1834    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
1835    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1836    def test_missing_fd_transfer(self):
1837        # Check that exception is raised when received data is not
1838        # accompanied by a file descriptor in ancillary data.
1839        if self.TYPE != 'processes':
1840            self.skipTest("only makes sense with processes")
1841        conn, child_conn = self.Pipe(duplex=True)
1842
1843        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1844        p.daemon = True
1845        p.start()
1846        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1847        p.join()
1848
1849class _TestListenerClient(BaseTestCase):
1850
1851    ALLOWED_TYPES = ('processes', 'threads')
1852
1853    @classmethod
1854    def _test(cls, address):
1855        conn = cls.connection.Client(address)
1856        conn.send('hello')
1857        conn.close()
1858
1859    def test_listener_client(self):
1860        for family in self.connection.families:
1861            l = self.connection.Listener(family=family)
1862            p = self.Process(target=self._test, args=(l.address,))
1863            p.daemon = True
1864            p.start()
1865            conn = l.accept()
1866            self.assertEqual(conn.recv(), 'hello')
1867            p.join()
1868            l.close()
1869
1870    def test_issue14725(self):
1871        l = self.connection.Listener()
1872        p = self.Process(target=self._test, args=(l.address,))
1873        p.daemon = True
1874        p.start()
1875        time.sleep(1)
1876        # On Windows the client process should by now have connected,
1877        # written data and closed the pipe handle by now.  This causes
1878        # ConnectNamdedPipe() to fail with ERROR_NO_DATA.  See Issue
1879        # 14725.
1880        conn = l.accept()
1881        self.assertEqual(conn.recv(), 'hello')
1882        conn.close()
1883        p.join()
1884        l.close()
1885
1886#
1887# Test of sending connection and socket objects between processes
1888#
1889"""
1890class _TestPicklingConnections(BaseTestCase):
1891
1892    ALLOWED_TYPES = ('processes',)
1893
1894    def _listener(self, conn, families):
1895        for fam in families:
1896            l = self.connection.Listener(family=fam)
1897            conn.send(l.address)
1898            new_conn = l.accept()
1899            conn.send(new_conn)
1900
1901        if self.TYPE == 'processes':
1902            l = socket.socket()
1903            l.bind(('localhost', 0))
1904            conn.send(l.getsockname())
1905            l.listen(1)
1906            new_conn, addr = l.accept()
1907            conn.send(new_conn)
1908
1909        conn.recv()
1910
1911    def _remote(self, conn):
1912        for (address, msg) in iter(conn.recv, None):
1913            client = self.connection.Client(address)
1914            client.send(msg.upper())
1915            client.close()
1916
1917        if self.TYPE == 'processes':
1918            address, msg = conn.recv()
1919            client = socket.socket()
1920            client.connect(address)
1921            client.sendall(msg.upper())
1922            client.close()
1923
1924        conn.close()
1925
1926    def test_pickling(self):
1927        try:
1928            multiprocessing.allow_connection_pickling()
1929        except ImportError:
1930            return
1931
1932        families = self.connection.families
1933
1934        lconn, lconn0 = self.Pipe()
1935        lp = self.Process(target=self._listener, args=(lconn0, families))
1936        lp.daemon = True
1937        lp.start()
1938        lconn0.close()
1939
1940        rconn, rconn0 = self.Pipe()
1941        rp = self.Process(target=self._remote, args=(rconn0,))
1942        rp.daemon = True
1943        rp.start()
1944        rconn0.close()
1945
1946        for fam in families:
1947            msg = ('This connection uses family %s' % fam).encode('ascii')
1948            address = lconn.recv()
1949            rconn.send((address, msg))
1950            new_conn = lconn.recv()
1951            self.assertEqual(new_conn.recv(), msg.upper())
1952
1953        rconn.send(None)
1954
1955        if self.TYPE == 'processes':
1956            msg = latin('This connection uses a normal socket')
1957            address = lconn.recv()
1958            rconn.send((address, msg))
1959            if hasattr(socket, 'fromfd'):
1960                new_conn = lconn.recv()
1961                self.assertEqual(new_conn.recv(100), msg.upper())
1962            else:
1963                # XXX On Windows with Py2.6 need to backport fromfd()
1964                discard = lconn.recv_bytes()
1965
1966        lconn.send(None)
1967
1968        rconn.close()
1969        lconn.close()
1970
1971        lp.join()
1972        rp.join()
1973"""
1974#
1975#
1976#
1977
1978class _TestHeap(BaseTestCase):
1979
1980    ALLOWED_TYPES = ('processes',)
1981
1982    def test_heap(self):
1983        iterations = 5000
1984        maxblocks = 50
1985        blocks = []
1986
1987        # create and destroy lots of blocks of different sizes
1988        for i in xrange(iterations):
1989            size = int(random.lognormvariate(0, 1) * 1000)
1990            b = multiprocessing.heap.BufferWrapper(size)
1991            blocks.append(b)
1992            if len(blocks) > maxblocks:
1993                i = random.randrange(maxblocks)
1994                del blocks[i]
1995
1996        # get the heap object
1997        heap = multiprocessing.heap.BufferWrapper._heap
1998
1999        # verify the state of the heap
2000        all = []
2001        occupied = 0
2002        heap._lock.acquire()
2003        self.addCleanup(heap._lock.release)
2004        for L in heap._len_to_seq.values():
2005            for arena, start, stop in L:
2006                all.append((heap._arenas.index(arena), start, stop,
2007                            stop-start, 'free'))
2008        for arena, start, stop in heap._allocated_blocks:
2009            all.append((heap._arenas.index(arena), start, stop,
2010                        stop-start, 'occupied'))
2011            occupied += (stop-start)
2012
2013        all.sort()
2014
2015        for i in range(len(all)-1):
2016            (arena, start, stop) = all[i][:3]
2017            (narena, nstart, nstop) = all[i+1][:3]
2018            self.assertTrue((arena != narena and nstart == 0) or
2019                            (stop == nstart))
2020
2021    def test_free_from_gc(self):
2022        # Check that freeing of blocks by the garbage collector doesn't deadlock
2023        # (issue #12352).
2024        # Make sure the GC is enabled, and set lower collection thresholds to
2025        # make collections more frequent (and increase the probability of
2026        # deadlock).
2027        if not gc.isenabled():
2028            gc.enable()
2029            self.addCleanup(gc.disable)
2030        thresholds = gc.get_threshold()
2031        self.addCleanup(gc.set_threshold, *thresholds)
2032        gc.set_threshold(10)
2033
2034        # perform numerous block allocations, with cyclic references to make
2035        # sure objects are collected asynchronously by the gc
2036        for i in range(5000):
2037            a = multiprocessing.heap.BufferWrapper(1)
2038            b = multiprocessing.heap.BufferWrapper(1)
2039            # circular references
2040            a.buddy = b
2041            b.buddy = a
2042
2043#
2044#
2045#
2046
2047class _Foo(Structure):
2048    _fields_ = [
2049        ('x', c_int),
2050        ('y', c_double)
2051        ]
2052
2053class _TestSharedCTypes(BaseTestCase):
2054
2055    ALLOWED_TYPES = ('processes',)
2056
2057    def setUp(self):
2058        if not HAS_SHAREDCTYPES:
2059            self.skipTest("requires multiprocessing.sharedctypes")
2060
2061    @classmethod
2062    def _double(cls, x, y, foo, arr, string):
2063        x.value *= 2
2064        y.value *= 2
2065        foo.x *= 2
2066        foo.y *= 2
2067        string.value *= 2
2068        for i in range(len(arr)):
2069            arr[i] *= 2
2070
2071    def test_sharedctypes(self, lock=False):
2072        x = Value('i', 7, lock=lock)
2073        y = Value(c_double, 1.0/3.0, lock=lock)
2074        foo = Value(_Foo, 3, 2, lock=lock)
2075        arr = self.Array('d', range(10), lock=lock)
2076        string = self.Array('c', 20, lock=lock)
2077        string.value = latin('hello')
2078
2079        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
2080        p.daemon = True
2081        p.start()
2082        p.join()
2083
2084        self.assertEqual(x.value, 14)
2085        self.assertAlmostEqual(y.value, 2.0/3.0)
2086        self.assertEqual(foo.x, 6)
2087        self.assertAlmostEqual(foo.y, 4.0)
2088        for i in range(10):
2089            self.assertAlmostEqual(arr[i], i*2)
2090        self.assertEqual(string.value, latin('hellohello'))
2091
2092    def test_synchronize(self):
2093        self.test_sharedctypes(lock=True)
2094
2095    def test_copy(self):
2096        foo = _Foo(2, 5.0)
2097        bar = copy(foo)
2098        foo.x = 0
2099        foo.y = 0
2100        self.assertEqual(bar.x, 2)
2101        self.assertAlmostEqual(bar.y, 5.0)
2102
2103#
2104#
2105#
2106
2107class _TestFinalize(BaseTestCase):
2108
2109    ALLOWED_TYPES = ('processes',)
2110
2111    def setUp(self):
2112        self.registry_backup = util._finalizer_registry.copy()
2113        util._finalizer_registry.clear()
2114
2115    def tearDown(self):
2116        self.assertFalse(util._finalizer_registry)
2117        util._finalizer_registry.update(self.registry_backup)
2118
2119    @classmethod
2120    def _test_finalize(cls, conn):
2121        class Foo(object):
2122            pass
2123
2124        a = Foo()
2125        util.Finalize(a, conn.send, args=('a',))
2126        del a           # triggers callback for a
2127
2128        b = Foo()
2129        close_b = util.Finalize(b, conn.send, args=('b',))
2130        close_b()       # triggers callback for b
2131        close_b()       # does nothing because callback has already been called
2132        del b           # does nothing because callback has already been called
2133
2134        c = Foo()
2135        util.Finalize(c, conn.send, args=('c',))
2136
2137        d10 = Foo()
2138        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2139
2140        d01 = Foo()
2141        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2142        d02 = Foo()
2143        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2144        d03 = Foo()
2145        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2146
2147        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2148
2149        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2150
2151        # call multiprocessing's cleanup function then exit process without
2152        # garbage collecting locals
2153        util._exit_function()
2154        conn.close()
2155        os._exit(0)
2156
2157    def test_finalize(self):
2158        conn, child_conn = self.Pipe()
2159
2160        p = self.Process(target=self._test_finalize, args=(child_conn,))
2161        p.daemon = True
2162        p.start()
2163        p.join()
2164
2165        result = [obj for obj in iter(conn.recv, 'STOP')]
2166        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2167
2168    def test_thread_safety(self):
2169        # bpo-24484: _run_finalizers() should be thread-safe
2170        def cb():
2171            pass
2172
2173        class Foo(object):
2174            def __init__(self):
2175                self.ref = self  # create reference cycle
2176                # insert finalizer at random key
2177                util.Finalize(self, cb, exitpriority=random.randint(1, 100))
2178
2179        finish = False
2180        exc = []
2181
2182        def run_finalizers():
2183            while not finish:
2184                time.sleep(random.random() * 1e-1)
2185                try:
2186                    # A GC run will eventually happen during this,
2187                    # collecting stale Foo's and mutating the registry
2188                    util._run_finalizers()
2189                except Exception as e:
2190                    exc.append(e)
2191
2192        def make_finalizers():
2193            d = {}
2194            while not finish:
2195                try:
2196                    # Old Foo's get gradually replaced and later
2197                    # collected by the GC (because of the cyclic ref)
2198                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
2199                except Exception as e:
2200                    exc.append(e)
2201                    d.clear()
2202
2203        old_interval = sys.getcheckinterval()
2204        old_threshold = gc.get_threshold()
2205        try:
2206            sys.setcheckinterval(10)
2207            gc.set_threshold(5, 5, 5)
2208            threads = [threading.Thread(target=run_finalizers),
2209                       threading.Thread(target=make_finalizers)]
2210            with support.start_threads(threads):
2211                time.sleep(4.0)  # Wait a bit to trigger race condition
2212                finish = True
2213            if exc:
2214                raise exc[0]
2215        finally:
2216            sys.setcheckinterval(old_interval)
2217            gc.set_threshold(*old_threshold)
2218            gc.collect()  # Collect remaining Foo's
2219
2220
2221#
2222# Test that from ... import * works for each module
2223#
2224
2225class _TestImportStar(BaseTestCase):
2226
2227    ALLOWED_TYPES = ('processes',)
2228
2229    def test_import(self):
2230        modules = [
2231            'multiprocessing', 'multiprocessing.connection',
2232            'multiprocessing.heap', 'multiprocessing.managers',
2233            'multiprocessing.pool', 'multiprocessing.process',
2234            'multiprocessing.synchronize', 'multiprocessing.util'
2235            ]
2236
2237        if HAS_REDUCTION:
2238            modules.append('multiprocessing.reduction')
2239
2240        if c_int is not None:
2241            # This module requires _ctypes
2242            modules.append('multiprocessing.sharedctypes')
2243
2244        for name in modules:
2245            __import__(name)
2246            mod = sys.modules[name]
2247
2248            for attr in getattr(mod, '__all__', ()):
2249                self.assertTrue(
2250                    hasattr(mod, attr),
2251                    '%r does not have attribute %r' % (mod, attr)
2252                    )
2253
2254#
2255# Quick test that logging works -- does not test logging output
2256#
2257
2258class _TestLogging(BaseTestCase):
2259
2260    ALLOWED_TYPES = ('processes',)
2261
2262    def test_enable_logging(self):
2263        logger = multiprocessing.get_logger()
2264        logger.setLevel(util.SUBWARNING)
2265        self.assertTrue(logger is not None)
2266        logger.debug('this will not be printed')
2267        logger.info('nor will this')
2268        logger.setLevel(LOG_LEVEL)
2269
2270    @classmethod
2271    def _test_level(cls, conn):
2272        logger = multiprocessing.get_logger()
2273        conn.send(logger.getEffectiveLevel())
2274
2275    def test_level(self):
2276        LEVEL1 = 32
2277        LEVEL2 = 37
2278
2279        logger = multiprocessing.get_logger()
2280        root_logger = logging.getLogger()
2281        root_level = root_logger.level
2282
2283        reader, writer = multiprocessing.Pipe(duplex=False)
2284
2285        logger.setLevel(LEVEL1)
2286        p = self.Process(target=self._test_level, args=(writer,))
2287        p.daemon = True
2288        p.start()
2289        self.assertEqual(LEVEL1, reader.recv())
2290
2291        logger.setLevel(logging.NOTSET)
2292        root_logger.setLevel(LEVEL2)
2293        p = self.Process(target=self._test_level, args=(writer,))
2294        p.daemon = True
2295        p.start()
2296        self.assertEqual(LEVEL2, reader.recv())
2297
2298        root_logger.setLevel(root_level)
2299        logger.setLevel(level=LOG_LEVEL)
2300
2301
2302# class _TestLoggingProcessName(BaseTestCase):
2303#
2304#     def handle(self, record):
2305#         assert record.processName == multiprocessing.current_process().name
2306#         self.__handled = True
2307#
2308#     def test_logging(self):
2309#         handler = logging.Handler()
2310#         handler.handle = self.handle
2311#         self.__handled = False
2312#         # Bypass getLogger() and side-effects
2313#         logger = logging.getLoggerClass()(
2314#                 'multiprocessing.test.TestLoggingProcessName')
2315#         logger.addHandler(handler)
2316#         logger.propagate = False
2317#
2318#         logger.warn('foo')
2319#         assert self.__handled
2320
2321#
2322# Check that Process.join() retries if os.waitpid() fails with EINTR
2323#
2324
2325class _TestPollEintr(BaseTestCase):
2326
2327    ALLOWED_TYPES = ('processes',)
2328
2329    @classmethod
2330    def _killer(cls, pid):
2331        time.sleep(0.5)
2332        os.kill(pid, signal.SIGUSR1)
2333
2334    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2335    def test_poll_eintr(self):
2336        got_signal = [False]
2337        def record(*args):
2338            got_signal[0] = True
2339        pid = os.getpid()
2340        oldhandler = signal.signal(signal.SIGUSR1, record)
2341        try:
2342            killer = self.Process(target=self._killer, args=(pid,))
2343            killer.start()
2344            p = self.Process(target=time.sleep, args=(1,))
2345            p.start()
2346            p.join()
2347            self.assertTrue(got_signal[0])
2348            self.assertEqual(p.exitcode, 0)
2349            killer.join()
2350        finally:
2351            signal.signal(signal.SIGUSR1, oldhandler)
2352
2353#
2354# Test to verify handle verification, see issue 3321
2355#
2356
2357class TestInvalidHandle(unittest.TestCase):
2358
2359    @unittest.skipIf(WIN32, "skipped on Windows")
2360    def test_invalid_handles(self):
2361        conn = _multiprocessing.Connection(44977608)
2362        self.assertRaises(IOError, conn.poll)
2363        self.assertRaises(IOError, _multiprocessing.Connection, -1)
2364
2365#
2366# Functions used to create test cases from the base ones in this module
2367#
2368
2369def get_attributes(Source, names):
2370    d = {}
2371    for name in names:
2372        obj = getattr(Source, name)
2373        if type(obj) == type(get_attributes):
2374            obj = staticmethod(obj)
2375        d[name] = obj
2376    return d
2377
2378def create_test_cases(Mixin, type):
2379    result = {}
2380    glob = globals()
2381    Type = type.capitalize()
2382
2383    for name in glob.keys():
2384        if name.startswith('_Test'):
2385            base = glob[name]
2386            if type in base.ALLOWED_TYPES:
2387                newname = 'With' + Type + name[1:]
2388                class Temp(base, unittest.TestCase, Mixin):
2389                    pass
2390                result[newname] = Temp
2391                Temp.__name__ = newname
2392                Temp.__module__ = Mixin.__module__
2393    return result
2394
2395#
2396# Create test cases
2397#
2398
2399class ProcessesMixin(object):
2400    TYPE = 'processes'
2401    Process = multiprocessing.Process
2402    locals().update(get_attributes(multiprocessing, (
2403        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2404        'Condition', 'Event', 'Value', 'Array', 'RawValue',
2405        'RawArray', 'current_process', 'active_children', 'Pipe',
2406        'connection', 'JoinableQueue', 'Pool'
2407        )))
2408
2409testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2410globals().update(testcases_processes)
2411
2412
2413class ManagerMixin(object):
2414    TYPE = 'manager'
2415    Process = multiprocessing.Process
2416    manager = object.__new__(multiprocessing.managers.SyncManager)
2417    locals().update(get_attributes(manager, (
2418        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2419       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2420        'Namespace', 'JoinableQueue', 'Pool'
2421        )))
2422
2423testcases_manager = create_test_cases(ManagerMixin, type='manager')
2424globals().update(testcases_manager)
2425
2426
2427class ThreadsMixin(object):
2428    TYPE = 'threads'
2429    Process = multiprocessing.dummy.Process
2430    locals().update(get_attributes(multiprocessing.dummy, (
2431        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2432        'Condition', 'Event', 'Value', 'Array', 'current_process',
2433        'active_children', 'Pipe', 'connection', 'dict', 'list',
2434        'Namespace', 'JoinableQueue', 'Pool'
2435        )))
2436
2437testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2438globals().update(testcases_threads)
2439
2440class OtherTest(unittest.TestCase):
2441    # TODO: add more tests for deliver/answer challenge.
2442    def test_deliver_challenge_auth_failure(self):
2443        class _FakeConnection(object):
2444            def recv_bytes(self, size):
2445                return b'something bogus'
2446            def send_bytes(self, data):
2447                pass
2448        self.assertRaises(multiprocessing.AuthenticationError,
2449                          multiprocessing.connection.deliver_challenge,
2450                          _FakeConnection(), b'abc')
2451
2452    def test_answer_challenge_auth_failure(self):
2453        class _FakeConnection(object):
2454            def __init__(self):
2455                self.count = 0
2456            def recv_bytes(self, size):
2457                self.count += 1
2458                if self.count == 1:
2459                    return multiprocessing.connection.CHALLENGE
2460                elif self.count == 2:
2461                    return b'something bogus'
2462                return b''
2463            def send_bytes(self, data):
2464                pass
2465        self.assertRaises(multiprocessing.AuthenticationError,
2466                          multiprocessing.connection.answer_challenge,
2467                          _FakeConnection(), b'abc')
2468
2469#
2470# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2471#
2472
2473def initializer(ns):
2474    ns.test += 1
2475
2476class TestInitializers(unittest.TestCase):
2477    def setUp(self):
2478        self.mgr = multiprocessing.Manager()
2479        self.ns = self.mgr.Namespace()
2480        self.ns.test = 0
2481
2482    def tearDown(self):
2483        self.mgr.shutdown()
2484
2485    def test_manager_initializer(self):
2486        m = multiprocessing.managers.SyncManager()
2487        self.assertRaises(TypeError, m.start, 1)
2488        m.start(initializer, (self.ns,))
2489        self.assertEqual(self.ns.test, 1)
2490        m.shutdown()
2491
2492    def test_pool_initializer(self):
2493        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2494        p = multiprocessing.Pool(1, initializer, (self.ns,))
2495        p.close()
2496        p.join()
2497        self.assertEqual(self.ns.test, 1)
2498
2499#
2500# Issue 5155, 5313, 5331: Test process in processes
2501# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2502#
2503
2504def _this_sub_process(q):
2505    try:
2506        item = q.get(block=False)
2507    except Queue.Empty:
2508        pass
2509
2510def _test_process(q):
2511    queue = multiprocessing.Queue()
2512    subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
2513    subProc.daemon = True
2514    subProc.start()
2515    subProc.join()
2516
2517def _afunc(x):
2518    return x*x
2519
2520def pool_in_process():
2521    pool = multiprocessing.Pool(processes=4)
2522    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2523
2524class _file_like(object):
2525    def __init__(self, delegate):
2526        self._delegate = delegate
2527        self._pid = None
2528
2529    @property
2530    def cache(self):
2531        pid = os.getpid()
2532        # There are no race conditions since fork keeps only the running thread
2533        if pid != self._pid:
2534            self._pid = pid
2535            self._cache = []
2536        return self._cache
2537
2538    def write(self, data):
2539        self.cache.append(data)
2540
2541    def flush(self):
2542        self._delegate.write(''.join(self.cache))
2543        self._cache = []
2544
2545class TestStdinBadfiledescriptor(unittest.TestCase):
2546
2547    def test_queue_in_process(self):
2548        queue = multiprocessing.Queue()
2549        proc = multiprocessing.Process(target=_test_process, args=(queue,))
2550        proc.start()
2551        proc.join()
2552
2553    def test_pool_in_process(self):
2554        p = multiprocessing.Process(target=pool_in_process)
2555        p.start()
2556        p.join()
2557
2558    def test_flushing(self):
2559        sio = StringIO()
2560        flike = _file_like(sio)
2561        flike.write('foo')
2562        proc = multiprocessing.Process(target=lambda: flike.flush())
2563        flike.flush()
2564        assert sio.getvalue() == 'foo'
2565
2566#
2567# Test interaction with socket timeouts - see Issue #6056
2568#
2569
2570class TestTimeouts(unittest.TestCase):
2571    @classmethod
2572    def _test_timeout(cls, child, address):
2573        time.sleep(1)
2574        child.send(123)
2575        child.close()
2576        conn = multiprocessing.connection.Client(address)
2577        conn.send(456)
2578        conn.close()
2579
2580    def test_timeout(self):
2581        old_timeout = socket.getdefaulttimeout()
2582        try:
2583            socket.setdefaulttimeout(0.1)
2584            parent, child = multiprocessing.Pipe(duplex=True)
2585            l = multiprocessing.connection.Listener(family='AF_INET')
2586            p = multiprocessing.Process(target=self._test_timeout,
2587                                        args=(child, l.address))
2588            p.start()
2589            child.close()
2590            self.assertEqual(parent.recv(), 123)
2591            parent.close()
2592            conn = l.accept()
2593            self.assertEqual(conn.recv(), 456)
2594            conn.close()
2595            l.close()
2596            p.join(10)
2597        finally:
2598            socket.setdefaulttimeout(old_timeout)
2599
2600#
2601# Test what happens with no "if __name__ == '__main__'"
2602#
2603
2604class TestNoForkBomb(unittest.TestCase):
2605    def test_noforkbomb(self):
2606        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2607        if WIN32:
2608            rc, out, err = test.script_helper.assert_python_failure(name)
2609            self.assertEqual(out, '')
2610            self.assertIn('RuntimeError', err)
2611        else:
2612            rc, out, err = test.script_helper.assert_python_ok(name)
2613            self.assertEqual(out.rstrip(), '123')
2614            self.assertEqual(err, '')
2615
2616#
2617# Issue 12098: check sys.flags of child matches that for parent
2618#
2619
2620class TestFlags(unittest.TestCase):
2621    @classmethod
2622    def run_in_grandchild(cls, conn):
2623        conn.send(tuple(sys.flags))
2624
2625    @classmethod
2626    def run_in_child(cls):
2627        import json
2628        r, w = multiprocessing.Pipe(duplex=False)
2629        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2630        p.start()
2631        grandchild_flags = r.recv()
2632        p.join()
2633        r.close()
2634        w.close()
2635        flags = (tuple(sys.flags), grandchild_flags)
2636        print(json.dumps(flags))
2637
2638    @support.requires_unicode  # XXX json needs unicode support
2639    def test_flags(self):
2640        import json, subprocess
2641        # start child process using unusual flags
2642        prog = ('from test.test_multiprocessing import TestFlags; ' +
2643                'TestFlags.run_in_child()')
2644        data = subprocess.check_output(
2645            [sys.executable, '-E', '-B', '-O', '-c', prog])
2646        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2647        self.assertEqual(child_flags, grandchild_flags)
2648
2649#
2650# Issue #17555: ForkAwareThreadLock
2651#
2652
2653class TestForkAwareThreadLock(unittest.TestCase):
2654    # We recurisvely start processes.  Issue #17555 meant that the
2655    # after fork registry would get duplicate entries for the same
2656    # lock.  The size of the registry at generation n was ~2**n.
2657
2658    @classmethod
2659    def child(cls, n, conn):
2660        if n > 1:
2661            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
2662            p.start()
2663            p.join()
2664        else:
2665            conn.send(len(util._afterfork_registry))
2666        conn.close()
2667
2668    def test_lock(self):
2669        r, w = multiprocessing.Pipe(False)
2670        l = util.ForkAwareThreadLock()
2671        old_size = len(util._afterfork_registry)
2672        p = multiprocessing.Process(target=self.child, args=(5, w))
2673        p.start()
2674        new_size = r.recv()
2675        p.join()
2676        self.assertLessEqual(new_size, old_size)
2677
2678#
2679# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2680#
2681
2682class TestIgnoreEINTR(unittest.TestCase):
2683
2684    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
2685    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
2686
2687    @classmethod
2688    def _test_ignore(cls, conn):
2689        def handler(signum, frame):
2690            pass
2691        signal.signal(signal.SIGUSR1, handler)
2692        conn.send('ready')
2693        x = conn.recv()
2694        conn.send(x)
2695        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
2696
2697    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2698    def test_ignore(self):
2699        conn, child_conn = multiprocessing.Pipe()
2700        try:
2701            p = multiprocessing.Process(target=self._test_ignore,
2702                                        args=(child_conn,))
2703            p.daemon = True
2704            p.start()
2705            child_conn.close()
2706            self.assertEqual(conn.recv(), 'ready')
2707            time.sleep(0.1)
2708            os.kill(p.pid, signal.SIGUSR1)
2709            time.sleep(0.1)
2710            conn.send(1234)
2711            self.assertEqual(conn.recv(), 1234)
2712            time.sleep(0.1)
2713            os.kill(p.pid, signal.SIGUSR1)
2714            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
2715            time.sleep(0.1)
2716            p.join()
2717        finally:
2718            conn.close()
2719
2720    @classmethod
2721    def _test_ignore_listener(cls, conn):
2722        def handler(signum, frame):
2723            pass
2724        signal.signal(signal.SIGUSR1, handler)
2725        l = multiprocessing.connection.Listener()
2726        conn.send(l.address)
2727        a = l.accept()
2728        a.send('welcome')
2729
2730    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2731    def test_ignore_listener(self):
2732        conn, child_conn = multiprocessing.Pipe()
2733        try:
2734            p = multiprocessing.Process(target=self._test_ignore_listener,
2735                                        args=(child_conn,))
2736            p.daemon = True
2737            p.start()
2738            child_conn.close()
2739            address = conn.recv()
2740            time.sleep(0.1)
2741            os.kill(p.pid, signal.SIGUSR1)
2742            time.sleep(0.1)
2743            client = multiprocessing.connection.Client(address)
2744            self.assertEqual(client.recv(), 'welcome')
2745            p.join()
2746        finally:
2747            conn.close()
2748
2749#
2750#
2751#
2752
2753testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2754                   TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
2755                   TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
2756
2757#
2758#
2759#
2760
2761def test_main(run=None):
2762    if sys.platform.startswith("linux"):
2763        try:
2764            lock = multiprocessing.RLock()
2765        except OSError:
2766            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
2767
2768    check_enough_semaphores()
2769
2770    if run is None:
2771        from test.support import run_unittest as run
2772
2773    util.get_temp_dir()     # creates temp directory for use by all processes
2774
2775    multiprocessing.get_logger().setLevel(LOG_LEVEL)
2776
2777    ProcessesMixin.pool = multiprocessing.Pool(4)
2778    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2779    ManagerMixin.manager.__init__()
2780    ManagerMixin.manager.start()
2781    ManagerMixin.pool = ManagerMixin.manager.Pool(4)
2782
2783    testcases = (
2784        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2785        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
2786        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2787        testcases_other
2788        )
2789
2790    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2791    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
2792    # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2793    # module during these tests is at least platform dependent and possibly
2794    # non-deterministic on any given platform. So we don't mind if the listed
2795    # warnings aren't actually raised.
2796    with support.check_py3k_warnings(
2797            (".+__(get|set)slice__ has been removed", DeprecationWarning),
2798            (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2799            quiet=True):
2800        run(suite)
2801
2802    ThreadsMixin.pool.terminate()
2803    ProcessesMixin.pool.terminate()
2804    ManagerMixin.pool.terminate()
2805    ManagerMixin.manager.shutdown()
2806
2807    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
2808
2809def main():
2810    test_main(unittest.TextTestRunner(verbosity=2).run)
2811
2812if __name__ == '__main__':
2813    main()
2814