1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import unittest.mock 7import queue as pyqueue 8import time 9import io 10import itertools 11import sys 12import os 13import gc 14import errno 15import signal 16import array 17import socket 18import random 19import logging 20import subprocess 21import struct 22import operator 23import pickle 24import weakref 25import warnings 26import test.support 27import test.support.script_helper 28from test import support 29from test.support import hashlib_helper 30from test.support import socket_helper 31 32 33# Skip tests if _multiprocessing wasn't built. 34_multiprocessing = test.support.import_module('_multiprocessing') 35# Skip tests if sem_open implementation is broken. 36support.skip_if_broken_multiprocessing_synchronize() 37import threading 38 39import multiprocessing.connection 40import multiprocessing.dummy 41import multiprocessing.heap 42import multiprocessing.managers 43import multiprocessing.pool 44import multiprocessing.queues 45 46from multiprocessing import util 47 48try: 49 from multiprocessing import reduction 50 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 51except ImportError: 52 HAS_REDUCTION = False 53 54try: 55 from multiprocessing.sharedctypes import Value, copy 56 HAS_SHAREDCTYPES = True 57except ImportError: 58 HAS_SHAREDCTYPES = False 59 60try: 61 from multiprocessing import shared_memory 62 HAS_SHMEM = True 63except ImportError: 64 HAS_SHMEM = False 65 66try: 67 import msvcrt 68except ImportError: 69 msvcrt = None 70 71 72def latin(s): 73 return s.encode('latin') 74 75 76def close_queue(queue): 77 if isinstance(queue, multiprocessing.queues.Queue): 78 queue.close() 79 queue.join_thread() 80 81 82def join_process(process): 83 # Since multiprocessing.Process has the same API than threading.Thread 84 # (join() and is_alive(), the support function can be reused 85 support.join_thread(process) 86 87 88if os.name == "posix": 89 from multiprocessing import resource_tracker 90 91 def 
_resource_unlink(name, rtype): 92 resource_tracker._CLEANUP_FUNCS[rtype](name) 93 94 95# 96# Constants 97# 98 99LOG_LEVEL = util.SUBWARNING 100#LOG_LEVEL = logging.DEBUG 101 102DELTA = 0.1 103CHECK_TIMINGS = False # making true makes tests take a lot longer 104 # and can sometimes cause some non-serious 105 # failures because some calls block a bit 106 # longer than expected 107if CHECK_TIMINGS: 108 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 109else: 110 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 111 112HAVE_GETVALUE = not getattr(_multiprocessing, 113 'HAVE_BROKEN_SEM_GETVALUE', False) 114 115WIN32 = (sys.platform == "win32") 116 117from multiprocessing.connection import wait 118 119def wait_for_handle(handle, timeout): 120 if timeout is not None and timeout < 0.0: 121 timeout = None 122 return wait([handle], timeout) 123 124try: 125 MAXFD = os.sysconf("SC_OPEN_MAX") 126except: 127 MAXFD = 256 128 129# To speed up tests when using the forkserver, we can preload these: 130PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] 131 132# 133# Some tests require ctypes 134# 135 136try: 137 from ctypes import Structure, c_int, c_double, c_longlong 138except ImportError: 139 Structure = object 140 c_int = c_double = c_longlong = None 141 142 143def check_enough_semaphores(): 144 """Check that the system supports enough semaphores to run the test.""" 145 # minimum number of semaphores available according to POSIX 146 nsems_min = 256 147 try: 148 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 149 except (AttributeError, ValueError): 150 # sysconf not available or setting not available 151 return 152 if nsems == -1 or nsems >= nsems_min: 153 return 154 raise unittest.SkipTest("The OS doesn't support enough semaphores " 155 "to run the test (required: %d)." 
% nsems_min) 156 157 158# 159# Creates a wrapper for a function which records the time it takes to finish 160# 161 162class TimingWrapper(object): 163 164 def __init__(self, func): 165 self.func = func 166 self.elapsed = None 167 168 def __call__(self, *args, **kwds): 169 t = time.monotonic() 170 try: 171 return self.func(*args, **kwds) 172 finally: 173 self.elapsed = time.monotonic() - t 174 175# 176# Base class for test cases 177# 178 179class BaseTestCase(object): 180 181 ALLOWED_TYPES = ('processes', 'manager', 'threads') 182 183 def assertTimingAlmostEqual(self, a, b): 184 if CHECK_TIMINGS: 185 self.assertAlmostEqual(a, b, 1) 186 187 def assertReturnsIfImplemented(self, value, func, *args): 188 try: 189 res = func(*args) 190 except NotImplementedError: 191 pass 192 else: 193 return self.assertEqual(value, res) 194 195 # For the sanity of Windows users, rather than crashing or freezing in 196 # multiple ways. 197 def __reduce__(self, *args): 198 raise NotImplementedError("shouldn't try to pickle a test case") 199 200 __reduce_ex__ = __reduce__ 201 202# 203# Return the value of a semaphore 204# 205 206def get_value(self): 207 try: 208 return self.get_value() 209 except AttributeError: 210 try: 211 return self._Semaphore__value 212 except AttributeError: 213 try: 214 return self._value 215 except AttributeError: 216 raise NotImplementedError 217 218# 219# Testcases 220# 221 222class DummyCallable: 223 def __call__(self, q, c): 224 assert isinstance(c, DummyCallable) 225 q.put(5) 226 227 228class _TestProcess(BaseTestCase): 229 230 ALLOWED_TYPES = ('processes', 'threads') 231 232 def test_current(self): 233 if self.TYPE == 'threads': 234 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 235 236 current = self.current_process() 237 authkey = current.authkey 238 239 self.assertTrue(current.is_alive()) 240 self.assertTrue(not current.daemon) 241 self.assertIsInstance(authkey, bytes) 242 self.assertTrue(len(authkey) > 0) 243 
self.assertEqual(current.ident, os.getpid()) 244 self.assertEqual(current.exitcode, None) 245 246 def test_daemon_argument(self): 247 if self.TYPE == "threads": 248 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 249 250 # By default uses the current process's daemon flag. 251 proc0 = self.Process(target=self._test) 252 self.assertEqual(proc0.daemon, self.current_process().daemon) 253 proc1 = self.Process(target=self._test, daemon=True) 254 self.assertTrue(proc1.daemon) 255 proc2 = self.Process(target=self._test, daemon=False) 256 self.assertFalse(proc2.daemon) 257 258 @classmethod 259 def _test(cls, q, *args, **kwds): 260 current = cls.current_process() 261 q.put(args) 262 q.put(kwds) 263 q.put(current.name) 264 if cls.TYPE != 'threads': 265 q.put(bytes(current.authkey)) 266 q.put(current.pid) 267 268 def test_parent_process_attributes(self): 269 if self.TYPE == "threads": 270 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 271 272 self.assertIsNone(self.parent_process()) 273 274 rconn, wconn = self.Pipe(duplex=False) 275 p = self.Process(target=self._test_send_parent_process, args=(wconn,)) 276 p.start() 277 p.join() 278 parent_pid, parent_name = rconn.recv() 279 self.assertEqual(parent_pid, self.current_process().pid) 280 self.assertEqual(parent_pid, os.getpid()) 281 self.assertEqual(parent_name, self.current_process().name) 282 283 @classmethod 284 def _test_send_parent_process(cls, wconn): 285 from multiprocessing.process import parent_process 286 wconn.send([parent_process().pid, parent_process().name]) 287 288 def test_parent_process(self): 289 if self.TYPE == "threads": 290 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 291 292 # Launch a child process. Make it launch a grandchild process. Kill the 293 # child process and make sure that the grandchild notices the death of 294 # its parent (a.k.a the child process). 
295 rconn, wconn = self.Pipe(duplex=False) 296 p = self.Process( 297 target=self._test_create_grandchild_process, args=(wconn, )) 298 p.start() 299 300 if not rconn.poll(timeout=support.LONG_TIMEOUT): 301 raise AssertionError("Could not communicate with child process") 302 parent_process_status = rconn.recv() 303 self.assertEqual(parent_process_status, "alive") 304 305 p.terminate() 306 p.join() 307 308 if not rconn.poll(timeout=support.LONG_TIMEOUT): 309 raise AssertionError("Could not communicate with child process") 310 parent_process_status = rconn.recv() 311 self.assertEqual(parent_process_status, "not alive") 312 313 @classmethod 314 def _test_create_grandchild_process(cls, wconn): 315 p = cls.Process(target=cls._test_report_parent_status, args=(wconn, )) 316 p.start() 317 time.sleep(300) 318 319 @classmethod 320 def _test_report_parent_status(cls, wconn): 321 from multiprocessing.process import parent_process 322 wconn.send("alive" if parent_process().is_alive() else "not alive") 323 parent_process().join(timeout=support.SHORT_TIMEOUT) 324 wconn.send("alive" if parent_process().is_alive() else "not alive") 325 326 def test_process(self): 327 q = self.Queue(1) 328 e = self.Event() 329 args = (q, 1, 2) 330 kwargs = {'hello':23, 'bye':2.54} 331 name = 'SomeProcess' 332 p = self.Process( 333 target=self._test, args=args, kwargs=kwargs, name=name 334 ) 335 p.daemon = True 336 current = self.current_process() 337 338 if self.TYPE != 'threads': 339 self.assertEqual(p.authkey, current.authkey) 340 self.assertEqual(p.is_alive(), False) 341 self.assertEqual(p.daemon, True) 342 self.assertNotIn(p, self.active_children()) 343 self.assertTrue(type(self.active_children()) is list) 344 self.assertEqual(p.exitcode, None) 345 346 p.start() 347 348 self.assertEqual(p.exitcode, None) 349 self.assertEqual(p.is_alive(), True) 350 self.assertIn(p, self.active_children()) 351 352 self.assertEqual(q.get(), args[1:]) 353 self.assertEqual(q.get(), kwargs) 354 
self.assertEqual(q.get(), p.name) 355 if self.TYPE != 'threads': 356 self.assertEqual(q.get(), current.authkey) 357 self.assertEqual(q.get(), p.pid) 358 359 p.join() 360 361 self.assertEqual(p.exitcode, 0) 362 self.assertEqual(p.is_alive(), False) 363 self.assertNotIn(p, self.active_children()) 364 close_queue(q) 365 366 @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id") 367 def test_process_mainthread_native_id(self): 368 if self.TYPE == 'threads': 369 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 370 371 current_mainthread_native_id = threading.main_thread().native_id 372 373 q = self.Queue(1) 374 p = self.Process(target=self._test_process_mainthread_native_id, args=(q,)) 375 p.start() 376 377 child_mainthread_native_id = q.get() 378 p.join() 379 close_queue(q) 380 381 self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id) 382 383 @classmethod 384 def _test_process_mainthread_native_id(cls, q): 385 mainthread_native_id = threading.main_thread().native_id 386 q.put(mainthread_native_id) 387 388 @classmethod 389 def _sleep_some(cls): 390 time.sleep(100) 391 392 @classmethod 393 def _test_sleep(cls, delay): 394 time.sleep(delay) 395 396 def _kill_process(self, meth): 397 if self.TYPE == 'threads': 398 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 399 400 p = self.Process(target=self._sleep_some) 401 p.daemon = True 402 p.start() 403 404 self.assertEqual(p.is_alive(), True) 405 self.assertIn(p, self.active_children()) 406 self.assertEqual(p.exitcode, None) 407 408 join = TimingWrapper(p.join) 409 410 self.assertEqual(join(0), None) 411 self.assertTimingAlmostEqual(join.elapsed, 0.0) 412 self.assertEqual(p.is_alive(), True) 413 414 self.assertEqual(join(-1), None) 415 self.assertTimingAlmostEqual(join.elapsed, 0.0) 416 self.assertEqual(p.is_alive(), True) 417 418 # XXX maybe terminating too soon causes the problems on Gentoo... 
419 time.sleep(1) 420 421 meth(p) 422 423 if hasattr(signal, 'alarm'): 424 # On the Gentoo buildbot waitpid() often seems to block forever. 425 # We use alarm() to interrupt it if it blocks for too long. 426 def handler(*args): 427 raise RuntimeError('join took too long: %s' % p) 428 old_handler = signal.signal(signal.SIGALRM, handler) 429 try: 430 signal.alarm(10) 431 self.assertEqual(join(), None) 432 finally: 433 signal.alarm(0) 434 signal.signal(signal.SIGALRM, old_handler) 435 else: 436 self.assertEqual(join(), None) 437 438 self.assertTimingAlmostEqual(join.elapsed, 0.0) 439 440 self.assertEqual(p.is_alive(), False) 441 self.assertNotIn(p, self.active_children()) 442 443 p.join() 444 445 return p.exitcode 446 447 def test_terminate(self): 448 exitcode = self._kill_process(multiprocessing.Process.terminate) 449 if os.name != 'nt': 450 self.assertEqual(exitcode, -signal.SIGTERM) 451 452 def test_kill(self): 453 exitcode = self._kill_process(multiprocessing.Process.kill) 454 if os.name != 'nt': 455 self.assertEqual(exitcode, -signal.SIGKILL) 456 457 def test_cpu_count(self): 458 try: 459 cpus = multiprocessing.cpu_count() 460 except NotImplementedError: 461 cpus = 1 462 self.assertTrue(type(cpus) is int) 463 self.assertTrue(cpus >= 1) 464 465 def test_active_children(self): 466 self.assertEqual(type(self.active_children()), list) 467 468 p = self.Process(target=time.sleep, args=(DELTA,)) 469 self.assertNotIn(p, self.active_children()) 470 471 p.daemon = True 472 p.start() 473 self.assertIn(p, self.active_children()) 474 475 p.join() 476 self.assertNotIn(p, self.active_children()) 477 478 @classmethod 479 def _test_recursion(cls, wconn, id): 480 wconn.send(id) 481 if len(id) < 2: 482 for i in range(2): 483 p = cls.Process( 484 target=cls._test_recursion, args=(wconn, id+[i]) 485 ) 486 p.start() 487 p.join() 488 489 def test_recursion(self): 490 rconn, wconn = self.Pipe(duplex=False) 491 self._test_recursion(wconn, []) 492 493 time.sleep(DELTA) 494 result = [] 495 
while rconn.poll(): 496 result.append(rconn.recv()) 497 498 expected = [ 499 [], 500 [0], 501 [0, 0], 502 [0, 1], 503 [1], 504 [1, 0], 505 [1, 1] 506 ] 507 self.assertEqual(result, expected) 508 509 @classmethod 510 def _test_sentinel(cls, event): 511 event.wait(10.0) 512 513 def test_sentinel(self): 514 if self.TYPE == "threads": 515 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 516 event = self.Event() 517 p = self.Process(target=self._test_sentinel, args=(event,)) 518 with self.assertRaises(ValueError): 519 p.sentinel 520 p.start() 521 self.addCleanup(p.join) 522 sentinel = p.sentinel 523 self.assertIsInstance(sentinel, int) 524 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 525 event.set() 526 p.join() 527 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 528 529 @classmethod 530 def _test_close(cls, rc=0, q=None): 531 if q is not None: 532 q.get() 533 sys.exit(rc) 534 535 def test_close(self): 536 if self.TYPE == "threads": 537 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 538 q = self.Queue() 539 p = self.Process(target=self._test_close, kwargs={'q': q}) 540 p.daemon = True 541 p.start() 542 self.assertEqual(p.is_alive(), True) 543 # Child is still alive, cannot close 544 with self.assertRaises(ValueError): 545 p.close() 546 547 q.put(None) 548 p.join() 549 self.assertEqual(p.is_alive(), False) 550 self.assertEqual(p.exitcode, 0) 551 p.close() 552 with self.assertRaises(ValueError): 553 p.is_alive() 554 with self.assertRaises(ValueError): 555 p.join() 556 with self.assertRaises(ValueError): 557 p.terminate() 558 p.close() 559 560 wr = weakref.ref(p) 561 del p 562 gc.collect() 563 self.assertIs(wr(), None) 564 565 close_queue(q) 566 567 def test_many_processes(self): 568 if self.TYPE == 'threads': 569 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 570 571 sm = multiprocessing.get_start_method() 572 N = 5 if sm == 'spawn' else 100 573 574 # Try to overwhelm the forkserver loop with events 575 procs 
= [self.Process(target=self._test_sleep, args=(0.01,)) 576 for i in range(N)] 577 for p in procs: 578 p.start() 579 for p in procs: 580 join_process(p) 581 for p in procs: 582 self.assertEqual(p.exitcode, 0) 583 584 procs = [self.Process(target=self._sleep_some) 585 for i in range(N)] 586 for p in procs: 587 p.start() 588 time.sleep(0.001) # let the children start... 589 for p in procs: 590 p.terminate() 591 for p in procs: 592 join_process(p) 593 if os.name != 'nt': 594 exitcodes = [-signal.SIGTERM] 595 if sys.platform == 'darwin': 596 # bpo-31510: On macOS, killing a freshly started process with 597 # SIGTERM sometimes kills the process with SIGKILL. 598 exitcodes.append(-signal.SIGKILL) 599 for p in procs: 600 self.assertIn(p.exitcode, exitcodes) 601 602 def test_lose_target_ref(self): 603 c = DummyCallable() 604 wr = weakref.ref(c) 605 q = self.Queue() 606 p = self.Process(target=c, args=(q, c)) 607 del c 608 p.start() 609 p.join() 610 self.assertIs(wr(), None) 611 self.assertEqual(q.get(), 5) 612 close_queue(q) 613 614 @classmethod 615 def _test_child_fd_inflation(self, evt, q): 616 q.put(test.support.fd_count()) 617 evt.wait() 618 619 def test_child_fd_inflation(self): 620 # Number of fds in child processes should not grow with the 621 # number of running children. 
622 if self.TYPE == 'threads': 623 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 624 625 sm = multiprocessing.get_start_method() 626 if sm == 'fork': 627 # The fork method by design inherits all fds from the parent, 628 # trying to go against it is a lost battle 629 self.skipTest('test not appropriate for {}'.format(sm)) 630 631 N = 5 632 evt = self.Event() 633 q = self.Queue() 634 635 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q)) 636 for i in range(N)] 637 for p in procs: 638 p.start() 639 640 try: 641 fd_counts = [q.get() for i in range(N)] 642 self.assertEqual(len(set(fd_counts)), 1, fd_counts) 643 644 finally: 645 evt.set() 646 for p in procs: 647 p.join() 648 close_queue(q) 649 650 @classmethod 651 def _test_wait_for_threads(self, evt): 652 def func1(): 653 time.sleep(0.5) 654 evt.set() 655 656 def func2(): 657 time.sleep(20) 658 evt.clear() 659 660 threading.Thread(target=func1).start() 661 threading.Thread(target=func2, daemon=True).start() 662 663 def test_wait_for_threads(self): 664 # A child process should wait for non-daemonic threads to end 665 # before exiting 666 if self.TYPE == 'threads': 667 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 668 669 evt = self.Event() 670 proc = self.Process(target=self._test_wait_for_threads, args=(evt,)) 671 proc.start() 672 proc.join() 673 self.assertTrue(evt.is_set()) 674 675 @classmethod 676 def _test_error_on_stdio_flush(self, evt, break_std_streams={}): 677 for stream_name, action in break_std_streams.items(): 678 if action == 'close': 679 stream = io.StringIO() 680 stream.close() 681 else: 682 assert action == 'remove' 683 stream = None 684 setattr(sys, stream_name, None) 685 evt.set() 686 687 def test_error_on_stdio_flush_1(self): 688 # Check that Process works with broken standard streams 689 streams = [io.StringIO(), None] 690 streams[0].close() 691 for stream_name in ('stdout', 'stderr'): 692 for stream in streams: 693 old_stream = getattr(sys, 
stream_name) 694 setattr(sys, stream_name, stream) 695 try: 696 evt = self.Event() 697 proc = self.Process(target=self._test_error_on_stdio_flush, 698 args=(evt,)) 699 proc.start() 700 proc.join() 701 self.assertTrue(evt.is_set()) 702 self.assertEqual(proc.exitcode, 0) 703 finally: 704 setattr(sys, stream_name, old_stream) 705 706 def test_error_on_stdio_flush_2(self): 707 # Same as test_error_on_stdio_flush_1(), but standard streams are 708 # broken by the child process 709 for stream_name in ('stdout', 'stderr'): 710 for action in ('close', 'remove'): 711 old_stream = getattr(sys, stream_name) 712 try: 713 evt = self.Event() 714 proc = self.Process(target=self._test_error_on_stdio_flush, 715 args=(evt, {stream_name: action})) 716 proc.start() 717 proc.join() 718 self.assertTrue(evt.is_set()) 719 self.assertEqual(proc.exitcode, 0) 720 finally: 721 setattr(sys, stream_name, old_stream) 722 723 @classmethod 724 def _sleep_and_set_event(self, evt, delay=0.0): 725 time.sleep(delay) 726 evt.set() 727 728 def check_forkserver_death(self, signum): 729 # bpo-31308: if the forkserver process has died, we should still 730 # be able to create and run new Process instances (the forkserver 731 # is implicitly restarted). 
732 if self.TYPE == 'threads': 733 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 734 sm = multiprocessing.get_start_method() 735 if sm != 'forkserver': 736 # The fork method by design inherits all fds from the parent, 737 # trying to go against it is a lost battle 738 self.skipTest('test not appropriate for {}'.format(sm)) 739 740 from multiprocessing.forkserver import _forkserver 741 _forkserver.ensure_running() 742 743 # First process sleeps 500 ms 744 delay = 0.5 745 746 evt = self.Event() 747 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay)) 748 proc.start() 749 750 pid = _forkserver._forkserver_pid 751 os.kill(pid, signum) 752 # give time to the fork server to die and time to proc to complete 753 time.sleep(delay * 2.0) 754 755 evt2 = self.Event() 756 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,)) 757 proc2.start() 758 proc2.join() 759 self.assertTrue(evt2.is_set()) 760 self.assertEqual(proc2.exitcode, 0) 761 762 proc.join() 763 self.assertTrue(evt.is_set()) 764 self.assertIn(proc.exitcode, (0, 255)) 765 766 def test_forkserver_sigint(self): 767 # Catchable signal 768 self.check_forkserver_death(signal.SIGINT) 769 770 def test_forkserver_sigkill(self): 771 # Uncatchable signal 772 if os.name != 'nt': 773 self.check_forkserver_death(signal.SIGKILL) 774 775 776# 777# 778# 779 780class _UpperCaser(multiprocessing.Process): 781 782 def __init__(self): 783 multiprocessing.Process.__init__(self) 784 self.child_conn, self.parent_conn = multiprocessing.Pipe() 785 786 def run(self): 787 self.parent_conn.close() 788 for s in iter(self.child_conn.recv, None): 789 self.child_conn.send(s.upper()) 790 self.child_conn.close() 791 792 def submit(self, s): 793 assert type(s) is str 794 self.parent_conn.send(s) 795 return self.parent_conn.recv() 796 797 def stop(self): 798 self.parent_conn.send(None) 799 self.parent_conn.close() 800 self.child_conn.close() 801 802class _TestSubclassingProcess(BaseTestCase): 803 804 
ALLOWED_TYPES = ('processes',) 805 806 def test_subclassing(self): 807 uppercaser = _UpperCaser() 808 uppercaser.daemon = True 809 uppercaser.start() 810 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 811 self.assertEqual(uppercaser.submit('world'), 'WORLD') 812 uppercaser.stop() 813 uppercaser.join() 814 815 def test_stderr_flush(self): 816 # sys.stderr is flushed at process shutdown (issue #13812) 817 if self.TYPE == "threads": 818 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 819 820 testfn = test.support.TESTFN 821 self.addCleanup(test.support.unlink, testfn) 822 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 823 proc.start() 824 proc.join() 825 with open(testfn, 'r') as f: 826 err = f.read() 827 # The whole traceback was printed 828 self.assertIn("ZeroDivisionError", err) 829 self.assertIn("test_multiprocessing.py", err) 830 self.assertIn("1/0 # MARKER", err) 831 832 @classmethod 833 def _test_stderr_flush(cls, testfn): 834 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 835 sys.stderr = open(fd, 'w', closefd=False) 836 1/0 # MARKER 837 838 839 @classmethod 840 def _test_sys_exit(cls, reason, testfn): 841 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 842 sys.stderr = open(fd, 'w', closefd=False) 843 sys.exit(reason) 844 845 def test_sys_exit(self): 846 # See Issue 13854 847 if self.TYPE == 'threads': 848 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 849 850 testfn = test.support.TESTFN 851 self.addCleanup(test.support.unlink, testfn) 852 853 for reason in ( 854 [1, 2, 3], 855 'ignore this', 856 ): 857 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 858 p.daemon = True 859 p.start() 860 join_process(p) 861 self.assertEqual(p.exitcode, 1) 862 863 with open(testfn, 'r') as f: 864 content = f.read() 865 self.assertEqual(content.rstrip(), str(reason)) 866 867 os.unlink(testfn) 868 869 cases = [ 870 ((True,), 1), 871 ((False,), 0), 872 ((8,), 8), 873 ((None,), 
0), 874 ((), 0), 875 ] 876 877 for args, expected in cases: 878 with self.subTest(args=args): 879 p = self.Process(target=sys.exit, args=args) 880 p.daemon = True 881 p.start() 882 join_process(p) 883 self.assertEqual(p.exitcode, expected) 884 885# 886# 887# 888 889def queue_empty(q): 890 if hasattr(q, 'empty'): 891 return q.empty() 892 else: 893 return q.qsize() == 0 894 895def queue_full(q, maxsize): 896 if hasattr(q, 'full'): 897 return q.full() 898 else: 899 return q.qsize() == maxsize 900 901 902class _TestQueue(BaseTestCase): 903 904 905 @classmethod 906 def _test_put(cls, queue, child_can_start, parent_can_continue): 907 child_can_start.wait() 908 for i in range(6): 909 queue.get() 910 parent_can_continue.set() 911 912 def test_put(self): 913 MAXSIZE = 6 914 queue = self.Queue(maxsize=MAXSIZE) 915 child_can_start = self.Event() 916 parent_can_continue = self.Event() 917 918 proc = self.Process( 919 target=self._test_put, 920 args=(queue, child_can_start, parent_can_continue) 921 ) 922 proc.daemon = True 923 proc.start() 924 925 self.assertEqual(queue_empty(queue), True) 926 self.assertEqual(queue_full(queue, MAXSIZE), False) 927 928 queue.put(1) 929 queue.put(2, True) 930 queue.put(3, True, None) 931 queue.put(4, False) 932 queue.put(5, False, None) 933 queue.put_nowait(6) 934 935 # the values may be in buffer but not yet in pipe so sleep a bit 936 time.sleep(DELTA) 937 938 self.assertEqual(queue_empty(queue), False) 939 self.assertEqual(queue_full(queue, MAXSIZE), True) 940 941 put = TimingWrapper(queue.put) 942 put_nowait = TimingWrapper(queue.put_nowait) 943 944 self.assertRaises(pyqueue.Full, put, 7, False) 945 self.assertTimingAlmostEqual(put.elapsed, 0) 946 947 self.assertRaises(pyqueue.Full, put, 7, False, None) 948 self.assertTimingAlmostEqual(put.elapsed, 0) 949 950 self.assertRaises(pyqueue.Full, put_nowait, 7) 951 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 952 953 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 954 
self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 955 956 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 957 self.assertTimingAlmostEqual(put.elapsed, 0) 958 959 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 960 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 961 962 child_can_start.set() 963 parent_can_continue.wait() 964 965 self.assertEqual(queue_empty(queue), True) 966 self.assertEqual(queue_full(queue, MAXSIZE), False) 967 968 proc.join() 969 close_queue(queue) 970 971 @classmethod 972 def _test_get(cls, queue, child_can_start, parent_can_continue): 973 child_can_start.wait() 974 #queue.put(1) 975 queue.put(2) 976 queue.put(3) 977 queue.put(4) 978 queue.put(5) 979 parent_can_continue.set() 980 981 def test_get(self): 982 queue = self.Queue() 983 child_can_start = self.Event() 984 parent_can_continue = self.Event() 985 986 proc = self.Process( 987 target=self._test_get, 988 args=(queue, child_can_start, parent_can_continue) 989 ) 990 proc.daemon = True 991 proc.start() 992 993 self.assertEqual(queue_empty(queue), True) 994 995 child_can_start.set() 996 parent_can_continue.wait() 997 998 time.sleep(DELTA) 999 self.assertEqual(queue_empty(queue), False) 1000 1001 # Hangs unexpectedly, remove for now 1002 #self.assertEqual(queue.get(), 1) 1003 self.assertEqual(queue.get(True, None), 2) 1004 self.assertEqual(queue.get(True), 3) 1005 self.assertEqual(queue.get(timeout=1), 4) 1006 self.assertEqual(queue.get_nowait(), 5) 1007 1008 self.assertEqual(queue_empty(queue), True) 1009 1010 get = TimingWrapper(queue.get) 1011 get_nowait = TimingWrapper(queue.get_nowait) 1012 1013 self.assertRaises(pyqueue.Empty, get, False) 1014 self.assertTimingAlmostEqual(get.elapsed, 0) 1015 1016 self.assertRaises(pyqueue.Empty, get, False, None) 1017 self.assertTimingAlmostEqual(get.elapsed, 0) 1018 1019 self.assertRaises(pyqueue.Empty, get_nowait) 1020 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 1021 1022 self.assertRaises(pyqueue.Empty, get, 
True, TIMEOUT1) 1023 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1024 1025 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 1026 self.assertTimingAlmostEqual(get.elapsed, 0) 1027 1028 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 1029 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 1030 1031 proc.join() 1032 close_queue(queue) 1033 1034 @classmethod 1035 def _test_fork(cls, queue): 1036 for i in range(10, 20): 1037 queue.put(i) 1038 # note that at this point the items may only be buffered, so the 1039 # process cannot shutdown until the feeder thread has finished 1040 # pushing items onto the pipe. 1041 1042 def test_fork(self): 1043 # Old versions of Queue would fail to create a new feeder 1044 # thread for a forked process if the original process had its 1045 # own feeder thread. This test checks that this no longer 1046 # happens. 1047 1048 queue = self.Queue() 1049 1050 # put items on queue so that main process starts a feeder thread 1051 for i in range(10): 1052 queue.put(i) 1053 1054 # wait to make sure thread starts before we fork a new process 1055 time.sleep(DELTA) 1056 1057 # fork process 1058 p = self.Process(target=self._test_fork, args=(queue,)) 1059 p.daemon = True 1060 p.start() 1061 1062 # check that all expected items are in the queue 1063 for i in range(20): 1064 self.assertEqual(queue.get(), i) 1065 self.assertRaises(pyqueue.Empty, queue.get, False) 1066 1067 p.join() 1068 close_queue(queue) 1069 1070 def test_qsize(self): 1071 q = self.Queue() 1072 try: 1073 self.assertEqual(q.qsize(), 0) 1074 except NotImplementedError: 1075 self.skipTest('qsize method not implemented') 1076 q.put(1) 1077 self.assertEqual(q.qsize(), 1) 1078 q.put(5) 1079 self.assertEqual(q.qsize(), 2) 1080 q.get() 1081 self.assertEqual(q.qsize(), 1) 1082 q.get() 1083 self.assertEqual(q.qsize(), 0) 1084 close_queue(q) 1085 1086 @classmethod 1087 def _test_task_done(cls, q): 1088 for obj in iter(q.get, None): 1089 time.sleep(DELTA) 1090 q.task_done() 
1091 1092 def test_task_done(self): 1093 queue = self.JoinableQueue() 1094 1095 workers = [self.Process(target=self._test_task_done, args=(queue,)) 1096 for i in range(4)] 1097 1098 for p in workers: 1099 p.daemon = True 1100 p.start() 1101 1102 for i in range(10): 1103 queue.put(i) 1104 1105 queue.join() 1106 1107 for p in workers: 1108 queue.put(None) 1109 1110 for p in workers: 1111 p.join() 1112 close_queue(queue) 1113 1114 def test_no_import_lock_contention(self): 1115 with test.support.temp_cwd(): 1116 module_name = 'imported_by_an_imported_module' 1117 with open(module_name + '.py', 'w') as f: 1118 f.write("""if 1: 1119 import multiprocessing 1120 1121 q = multiprocessing.Queue() 1122 q.put('knock knock') 1123 q.get(timeout=3) 1124 q.close() 1125 del q 1126 """) 1127 1128 with test.support.DirsOnSysPath(os.getcwd()): 1129 try: 1130 __import__(module_name) 1131 except pyqueue.Empty: 1132 self.fail("Probable regression on import lock contention;" 1133 " see Issue #22853") 1134 1135 def test_timeout(self): 1136 q = multiprocessing.Queue() 1137 start = time.monotonic() 1138 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 1139 delta = time.monotonic() - start 1140 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock 1141 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once 1142 # failed because the delta was only 135.8 ms. 
def test_queue_feeder_on_queue_feeder_error(self):
    """Check that a Queue subclass can intercept feeder errors.

    bpo-30006: verify feeder handles exceptions using the
    _on_queue_feeder_error hook.  An object whose __reduce__ raises is put
    on the queue; the test then checks that the queue still delivers a
    later item and that both __reduce__ and the overridden hook ran.
    """
    if self.TYPE != 'processes':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    class NotSerializable(object):
        """Mock unserializable object"""
        def __init__(self):
            self.reduce_was_called = False
            self.on_queue_feeder_error_was_called = False

        def __reduce__(self):
            self.reduce_was_called = True
            raise AttributeError

    class SafeQueue(multiprocessing.queues.Queue):
        """Queue with overloaded _on_queue_feeder_error hook"""
        @staticmethod
        def _on_queue_feeder_error(e, obj):
            if (isinstance(e, AttributeError) and
                    isinstance(obj, NotSerializable)):
                obj.on_queue_feeder_error_was_called = True

    not_serializable_obj = NotSerializable()
    # The captured_stderr reduces the noise in the test report
    with test.support.captured_stderr():
        q = SafeQueue(ctx=multiprocessing.get_context())
        try:
            q.put(not_serializable_obj)

            # Verify that q is still functioning correctly
            q.put(True)
            self.assertTrue(q.get(timeout=support.SHORT_TIMEOUT))
        finally:
            # Close the queue and join its feeder thread even when the
            # check above fails, as the sibling feeder test does, so the
            # thread does not outlive this test.
            close_queue(q)

    # Assert that the serialization and the hook have been called correctly
    self.assertTrue(not_serializable_obj.reduce_was_called)
    self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects."""

    def _test_semaphore(self, sem):
        """Drain `sem` from a value of 2 down to 0 and refill it.

        Every acquire/release return value is checked, and the counter is
        checked after each step through assertReturnsIfImplemented()
        (defined outside this view; by its name it tolerates platforms
        where get_value is not implemented).
        """
        expect_value = self.assertReturnsIfImplemented

        expect_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        expect_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        expect_value(0, get_value, sem)
        # Exhausted: a non-blocking acquire must fail and change nothing.
        self.assertEqual(sem.acquire(False), False)
        expect_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        expect_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        expect_value(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released past its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """Run the shared acquire/release sequence on a BoundedSemaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # The over-release check below is disabled:
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """Check result and elapsed time of acquire() on an empty semaphore."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        timed_acquire = TimingWrapper(sem.acquire)

        # (positional arguments, expected elapsed time)
        positional_cases = (
            ((False,), 0.0),
            ((False, None), 0.0),
            ((False, TIMEOUT1), 0),
            ((True, TIMEOUT2), TIMEOUT2),
        )
        for call_args, expected_elapsed in positional_cases:
            self.assertEqual(timed_acquire(*call_args), False)
            self.assertTimingAlmostEqual(timed_acquire.elapsed,
                                         expected_elapsed)

        self.assertEqual(timed_acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT3)
def test_notify_all(self):
    """Check Condition timeouts and notify_all() with six waiters.

    First three process/thread pairs wait with a TIMEOUT1 timeout and are
    expected to time out on their own; then three more pairs wait without
    a timeout and are expected to wake only after notify_all().
    """
    cond = self.Condition()
    sleeping = self.Semaphore(0)
    woken = self.Semaphore(0)

    def launch_waiters(worker_args):
        # Start three pairs, each one self.Process then one thread,
        # all running self.f with the given arguments.
        for _ in range(3):
            for factory in (self.Process, threading.Thread):
                waiter = factory(target=self.f, args=worker_args)
                waiter.daemon = True
                waiter.start()
                self.addCleanup(waiter.join)

    # start some threads/processes which will timeout
    launch_waiters((cond, sleeping, woken, TIMEOUT1))

    # wait for them all to sleep
    for _ in range(6):
        sleeping.acquire()

    # check they have all timed out
    for _ in range(6):
        woken.acquire()
    self.assertReturnsIfImplemented(0, get_value, woken)

    # check state is not mucked up
    self.check_invariant(cond)

    # start some more threads/processes
    launch_waiters((cond, sleeping, woken))

    # wait for them to all sleep
    for _ in range(6):
        sleeping.acquire()

    # check no process/thread has woken up
    time.sleep(DELTA)
    self.assertReturnsIfImplemented(0, get_value, woken)

    # wake them all up
    with cond:
        cond.notify_all()

    # check they have all woken
    self.assertReachesEventually(lambda: get_value(woken), 6)

    # check state is not mucked up
    self.check_invariant(cond)
@classmethod
def _test_waitfor_timeout_f(cls, cond, state, success, sem):
    """Child body for test_waitfor_timeout.

    Releases `sem`, then waits up to 0.1 s on `cond` for state.value to
    become 4.  success.value is set to True only when the wait failed and
    took between 0.6x and 10x the requested timeout.
    """
    sem.release()
    with cond:
        limit = 0.1
        started = time.monotonic()
        reached = cond.wait_for(lambda: state.value == 4, timeout=limit)
        elapsed = time.monotonic() - started
        # borrow logic in assertTimeout() from test/lock_tests.py
        timed_out = not reached
        plausible_duration = limit * 0.6 < elapsed < limit * 10.0
        if timed_out and plausible_duration:
            success.value = True
def test_wait_result(self):
    """Check the return value of Condition.wait().

    With nobody notifying, wait() with a zero or short timeout returns
    False.  After the child started here notifies, wait() returns True.
    When a pid is passed (ProcessesMixin and not win32), the child sends
    SIGINT to this process a second later, so the next wait() must raise
    KeyboardInterrupt.
    """
    can_signal = isinstance(self, ProcessesMixin) and sys.platform != 'win32'
    pid = os.getpid() if can_signal else None

    cond = self.Condition()
    with cond:
        for timeout in (0, 0.1):
            self.assertFalse(cond.wait(timeout))

        child = self.Process(target=self._test_wait_result, args=(cond, pid))
        child.start()

        self.assertTrue(cond.wait(60))
        if pid is not None:
            with self.assertRaises(KeyboardInterrupt):
                cond.wait(60)

    child.join()
class Bunch(object):
    """
    A group of `n` workers that all run the same function.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Start `n` workers created with namespace.Process, each running
        `f(*args)`.  `namespace` must also provide DummyList() and Event().
        If `wait_before_exit` is True, the workers won't terminate until
        do_finish() is called.
        """
        self.f, self.args, self.n = f, args, n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()

        workers = []
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        def join_all(pending):
            # Takes the workers as an argument, so the callback holds no
            # reference to this Bunch.
            for worker in pending:
                worker.join()

        self._finalizer = weakref.finalize(self, join_all, workers)

    def task(self):
        """Worker body: record start, run f, record finish, await exit."""
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        """Poll until all `n` workers have recorded that they started."""
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` workers have recorded that they finished."""
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        """Allow workers blocked at the end of task() to exit."""
        self._can_exit.set()

    def close(self):
        """Run the finalizer now, joining every worker."""
        self._finalizer()
@classmethod
def multipass(cls, barrier, results, n):
    """Pass `barrier` twice per round for `n` rounds, checking lockstep.

    `results` is a pair of counters (anything supporting append() and
    len()).  Before each wait the other counter must show exactly the
    number of appends made by all parties in the completed phases.
    """
    parties = barrier.parties
    assert parties == cls.N
    before, after = results
    for round_no in range(n):
        before.append(True)
        assert len(after) == round_no * parties
        barrier.wait()
        after.append(True)
        assert len(before) == (round_no + 1) * parties
        barrier.wait()
    try:
        assert barrier.n_waiting == 0
    except NotImplementedError:
        # n_waiting may be unavailable; skip the check in that case.
        pass
    assert not barrier.broken
def test_abort(self):
    """
    Test that an abort will put the barrier in a broken state
    """
    passed = self.DummyList()
    saw_broken = self.DummyList()
    worker_args = (self.barrier, passed, saw_broken)
    self.run_threads(self._test_abort_f, worker_args)
    # Nobody gets through the second wait(); every party except the
    # aborting one records a BrokenBarrierError.
    self.assertEqual(len(passed), 0)
    self.assertEqual(len(saw_broken), self.N-1)
    self.assertTrue(self.barrier.broken)
@classmethod
def _test_abort_and_reset_f(cls, barrier, barrier2,
                            results1, results2, results3):
    """Worker body for test_abort_and_reset.

    Phase 1 breaks `barrier`: the party whose first wait() returns N//2
    raises RuntimeError and aborts it.  A party that gets through the
    second wait() appends to `results1`; one that sees
    BrokenBarrierError appends to `results2`.
    Phase 2 uses `barrier2` to rendezvous around a reset() of `barrier`,
    after which every party waits on `barrier` once more and appends to
    `results3`.
    """
    try:
        i = barrier.wait()
        if i == cls.N//2:
            # Exactly one party takes this path; handled below by abort().
            raise RuntimeError
        barrier.wait()
        results1.append(True)
    except threading.BrokenBarrierError:
        results2.append(True)
    except RuntimeError:
        barrier.abort()
    # Synchronize and reset the barrier.  Must synchronize first so
    # that everyone has left it when we reset, and after so that no
    # one enters it before the reset.
    if barrier2.wait() == cls.N//2:
        # A single party, picked by its barrier2 index, does the reset.
        barrier.reset()
    barrier2.wait()
    # The reset barrier must be usable again by all parties.
    barrier.wait()
    results3.append(True)
@classmethod
def _test_default_timeout_f(cls, barrier, results):
    """Worker body for test_default_timeout.

    The first wait() uses cls.defaultTimeout explicitly.  The party that
    receives index N//2 then sleeps for a second before the second wait(),
    which is called without a timeout argument.  Each party that gets
    BrokenBarrierError from that second wait() appends to `results`.
    """
    index = barrier.wait(cls.defaultTimeout)
    is_laggard = index == cls.N//2
    if is_laggard:
        # One thread is later than the default timeout
        time.sleep(1.0)
    try:
        barrier.wait()
    except threading.BrokenBarrierError:
        results.append(True)
def test_value(self, raw=False):
    """Check shared values before and after a child process rewrites them.

    One shared value is built per entry of codes_values, using RawValue
    when `raw` is true and Value otherwise.  Each must start at the
    entry's second field and, after the child has run self._test, hold
    the entry's third field.
    """
    make_value = self.RawValue if raw else self.Value
    shared = [make_value(code, initial)
              for code, initial, _ in self.codes_values]

    for holder, (_, initial, _) in zip(shared, self.codes_values):
        self.assertEqual(holder.value, initial)

    child = self.Process(target=self._test, args=(shared,))
    child.daemon = True
    child.start()
    child.join()

    for holder, (_, _, updated) in zip(shared, self.codes_values):
        self.assertEqual(holder.value, updated)
@unittest.skipIf(c_int is None, "requires _ctypes")
def test_array(self, raw=False):
    """Check indexing, slicing and cross-process mutation of a shared array.

    Uses RawArray when `raw` is true and Array otherwise.  The same
    running-sum function self.f is applied to a local list and, in a
    child process, to the shared array; both must end up equal.
    """
    expected = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
    make_array = self.RawArray if raw else self.Array
    shared = make_array('i', expected)

    self.assertEqual(len(shared), len(expected))
    self.assertEqual(shared[3], expected[3])
    self.assertEqual(list(shared[2:7]), list(expected[2:7]))

    # Slice assignment from an array.array, applied to both sequences.
    patch = array.array('i', [1, 2, 3, 4])
    shared[4:8] = patch
    expected[4:8] = patch

    self.assertEqual(list(shared[:]), expected)

    # Run f locally on the list ...
    self.f(expected)

    # ... and in a child process on the shared array.
    child = self.Process(target=self.f, args=(shared,))
    child.daemon = True
    child.start()
    child.join()

    self.assertEqual(list(shared[:]), expected)
@unittest.skipIf(c_int is None, "requires _ctypes")
def test_getobj_getlock_obj(self):
    """Check which array flavours expose get_lock() and get_obj().

    For the default, lock=None and explicit-lock arrays the accessor calls
    themselves are the check: they must not raise.  Arrays created with
    lock=False and RawArrays must not have the accessors at all.
    """
    accessors = ('get_lock', 'get_obj')

    default_locked = self.Array('i', list(range(10)))
    default_locked.get_lock()
    default_locked.get_obj()

    none_locked = self.Array('i', list(range(10)), lock=None)
    none_locked.get_lock()
    none_locked.get_obj()

    # An explicitly supplied lock must be handed back by get_lock().
    own_lock = self.Lock()
    custom_locked = self.Array('i', list(range(10)), lock=own_lock)
    returned_lock = custom_locked.get_lock()
    custom_locked.get_obj()
    self.assertEqual(own_lock, returned_lock)

    unlocked = self.Array('i', range(10), lock=False)
    for accessor in accessors:
        self.assertFalse(hasattr(unlocked, accessor))
    # Anything that is not a lock, None or a bool is rejected.
    self.assertRaises(AttributeError,
                      self.Array, 'i', range(10), lock='notalock')

    raw = self.RawArray('i', range(10))
    for accessor in accessors:
        self.assertFalse(hasattr(raw, accessor))
def test_dict(self):
    """Check copy(), keys(), values() and items() of the shared dict.

    The dict maps the integers 65..69 to their chr() characters.
    """
    shared = self.dict()
    codes = list(range(65, 70))
    for code in codes:
        shared[code] = chr(code)

    expected = {code: chr(code) for code in codes}
    self.assertEqual(shared.copy(), expected)
    self.assertEqual(sorted(shared.keys()), codes)
    self.assertEqual(sorted(shared.values()), [chr(code) for code in codes])
    self.assertEqual(sorted(shared.items()),
                     [(code, chr(code)) for code in codes])
def sqr(x, wait=0.0):
    """Return `x` multiplied by itself after sleeping `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
type(self).n_instances -= 1 2323 2324class SayWhenError(ValueError): pass 2325 2326def exception_throwing_generator(total, when): 2327 if when == -1: 2328 raise SayWhenError("Somebody said when") 2329 for i in range(total): 2330 if i == when: 2331 raise SayWhenError("Somebody said when") 2332 yield i 2333 2334 2335class _TestPool(BaseTestCase): 2336 2337 @classmethod 2338 def setUpClass(cls): 2339 super().setUpClass() 2340 cls.pool = cls.Pool(4) 2341 2342 @classmethod 2343 def tearDownClass(cls): 2344 cls.pool.terminate() 2345 cls.pool.join() 2346 cls.pool = None 2347 super().tearDownClass() 2348 2349 def test_apply(self): 2350 papply = self.pool.apply 2351 self.assertEqual(papply(sqr, (5,)), sqr(5)) 2352 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 2353 2354 def test_map(self): 2355 pmap = self.pool.map 2356 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 2357 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 2358 list(map(sqr, list(range(100))))) 2359 2360 def test_starmap(self): 2361 psmap = self.pool.starmap 2362 tuples = list(zip(range(10), range(9,-1, -1))) 2363 self.assertEqual(psmap(mul, tuples), 2364 list(itertools.starmap(mul, tuples))) 2365 tuples = list(zip(range(100), range(99,-1, -1))) 2366 self.assertEqual(psmap(mul, tuples, chunksize=20), 2367 list(itertools.starmap(mul, tuples))) 2368 2369 def test_starmap_async(self): 2370 tuples = list(zip(range(100), range(99,-1, -1))) 2371 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 2372 list(itertools.starmap(mul, tuples))) 2373 2374 def test_map_async(self): 2375 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 2376 list(map(sqr, list(range(10))))) 2377 2378 def test_map_async_callbacks(self): 2379 call_args = self.manager.list() if self.TYPE == 'manager' else [] 2380 self.pool.map_async(int, ['1'], 2381 callback=call_args.append, 2382 error_callback=call_args.append).wait() 2383 self.assertEqual(1, len(call_args)) 2384 
self.assertEqual([1], call_args[0]) 2385 self.pool.map_async(int, ['a'], 2386 callback=call_args.append, 2387 error_callback=call_args.append).wait() 2388 self.assertEqual(2, len(call_args)) 2389 self.assertIsInstance(call_args[1], ValueError) 2390 2391 def test_map_unplicklable(self): 2392 # Issue #19425 -- failure to pickle should not cause a hang 2393 if self.TYPE == 'threads': 2394 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2395 class A(object): 2396 def __reduce__(self): 2397 raise RuntimeError('cannot pickle') 2398 with self.assertRaises(RuntimeError): 2399 self.pool.map(sqr, [A()]*10) 2400 2401 def test_map_chunksize(self): 2402 try: 2403 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 2404 except multiprocessing.TimeoutError: 2405 self.fail("pool.map_async with chunksize stalled on null list") 2406 2407 def test_map_handle_iterable_exception(self): 2408 if self.TYPE == 'manager': 2409 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2410 2411 # SayWhenError seen at the very first of the iterable 2412 with self.assertRaises(SayWhenError): 2413 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2414 # again, make sure it's reentrant 2415 with self.assertRaises(SayWhenError): 2416 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2417 2418 with self.assertRaises(SayWhenError): 2419 self.pool.map(sqr, exception_throwing_generator(10, 3), 1) 2420 2421 class SpecialIterable: 2422 def __iter__(self): 2423 return self 2424 def __next__(self): 2425 raise SayWhenError 2426 def __len__(self): 2427 return 1 2428 with self.assertRaises(SayWhenError): 2429 self.pool.map(sqr, SpecialIterable(), 1) 2430 with self.assertRaises(SayWhenError): 2431 self.pool.map(sqr, SpecialIterable(), 1) 2432 2433 def test_async(self): 2434 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 2435 get = TimingWrapper(res.get) 2436 self.assertEqual(get(), 49) 2437 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 2438 2439 def 
test_async_timeout(self): 2440 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 2441 get = TimingWrapper(res.get) 2442 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 2443 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 2444 2445 def test_imap(self): 2446 it = self.pool.imap(sqr, list(range(10))) 2447 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 2448 2449 it = self.pool.imap(sqr, list(range(10))) 2450 for i in range(10): 2451 self.assertEqual(next(it), i*i) 2452 self.assertRaises(StopIteration, it.__next__) 2453 2454 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 2455 for i in range(1000): 2456 self.assertEqual(next(it), i*i) 2457 self.assertRaises(StopIteration, it.__next__) 2458 2459 def test_imap_handle_iterable_exception(self): 2460 if self.TYPE == 'manager': 2461 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2462 2463 # SayWhenError seen at the very first of the iterable 2464 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2465 self.assertRaises(SayWhenError, it.__next__) 2466 # again, make sure it's reentrant 2467 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2468 self.assertRaises(SayWhenError, it.__next__) 2469 2470 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 2471 for i in range(3): 2472 self.assertEqual(next(it), i*i) 2473 self.assertRaises(SayWhenError, it.__next__) 2474 2475 # SayWhenError seen at start of problematic chunk's results 2476 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 2477 for i in range(6): 2478 self.assertEqual(next(it), i*i) 2479 self.assertRaises(SayWhenError, it.__next__) 2480 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 2481 for i in range(4): 2482 self.assertEqual(next(it), i*i) 2483 self.assertRaises(SayWhenError, it.__next__) 2484 2485 def test_imap_unordered(self): 2486 it = self.pool.imap_unordered(sqr, list(range(10))) 2487 self.assertEqual(sorted(it), 
list(map(sqr, list(range(10))))) 2488 2489 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) 2490 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 2491 2492 def test_imap_unordered_handle_iterable_exception(self): 2493 if self.TYPE == 'manager': 2494 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2495 2496 # SayWhenError seen at the very first of the iterable 2497 it = self.pool.imap_unordered(sqr, 2498 exception_throwing_generator(1, -1), 2499 1) 2500 self.assertRaises(SayWhenError, it.__next__) 2501 # again, make sure it's reentrant 2502 it = self.pool.imap_unordered(sqr, 2503 exception_throwing_generator(1, -1), 2504 1) 2505 self.assertRaises(SayWhenError, it.__next__) 2506 2507 it = self.pool.imap_unordered(sqr, 2508 exception_throwing_generator(10, 3), 2509 1) 2510 expected_values = list(map(sqr, list(range(10)))) 2511 with self.assertRaises(SayWhenError): 2512 # imap_unordered makes it difficult to anticipate the SayWhenError 2513 for i in range(10): 2514 value = next(it) 2515 self.assertIn(value, expected_values) 2516 expected_values.remove(value) 2517 2518 it = self.pool.imap_unordered(sqr, 2519 exception_throwing_generator(20, 7), 2520 2) 2521 expected_values = list(map(sqr, list(range(20)))) 2522 with self.assertRaises(SayWhenError): 2523 for i in range(20): 2524 value = next(it) 2525 self.assertIn(value, expected_values) 2526 expected_values.remove(value) 2527 2528 def test_make_pool(self): 2529 expected_error = (RemoteError if self.TYPE == 'manager' 2530 else ValueError) 2531 2532 self.assertRaises(expected_error, self.Pool, -1) 2533 self.assertRaises(expected_error, self.Pool, 0) 2534 2535 if self.TYPE != 'manager': 2536 p = self.Pool(3) 2537 try: 2538 self.assertEqual(3, len(p._pool)) 2539 finally: 2540 p.close() 2541 p.join() 2542 2543 def test_terminate(self): 2544 result = self.pool.map_async( 2545 time.sleep, [0.1 for i in range(10000)], chunksize=1 2546 ) 2547 self.pool.terminate() 2548 join = 
TimingWrapper(self.pool.join) 2549 join() 2550 # Sanity check the pool didn't wait for all tasks to finish 2551 self.assertLess(join.elapsed, 2.0) 2552 2553 def test_empty_iterable(self): 2554 # See Issue 12157 2555 p = self.Pool(1) 2556 2557 self.assertEqual(p.map(sqr, []), []) 2558 self.assertEqual(list(p.imap(sqr, [])), []) 2559 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 2560 self.assertEqual(p.map_async(sqr, []).get(), []) 2561 2562 p.close() 2563 p.join() 2564 2565 def test_context(self): 2566 if self.TYPE == 'processes': 2567 L = list(range(10)) 2568 expected = [sqr(i) for i in L] 2569 with self.Pool(2) as p: 2570 r = p.map_async(sqr, L) 2571 self.assertEqual(r.get(), expected) 2572 p.join() 2573 self.assertRaises(ValueError, p.map_async, sqr, L) 2574 2575 @classmethod 2576 def _test_traceback(cls): 2577 raise RuntimeError(123) # some comment 2578 2579 def test_traceback(self): 2580 # We want ensure that the traceback from the child process is 2581 # contained in the traceback raised in the main process. 
# NOTE: continuation of _TestPool.test_traceback(); the 'def' line and its
# leading comment sit on the preceding source line.
        if self.TYPE == 'processes':
            with self.Pool(1) as p:
                try:
                    p.apply(self._test_traceback)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected RuntimeError')
            p.join()
            # The re-raised exception keeps its type and args, and carries
            # the worker's formatted traceback as __cause__.
            self.assertIs(type(exc), RuntimeError)
            self.assertEqual(exc.args, (123,))
            cause = exc.__cause__
            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

            # The remote source line must also show up when the exception
            # is printed through sys.excepthook.
            with test.support.captured_stderr() as f1:
                try:
                    raise exc
                except RuntimeError:
                    sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())
            # _helper_reraises_exception should not make the error
            # a remote exception
            with self.Pool(1) as p:
                try:
                    p.map(sqr, exception_throwing_generator(1, -1), 1)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected SayWhenError')
                self.assertIs(type(exc), SayWhenError)
                self.assertIs(exc.__cause__, None)
            p.join()

    @classmethod
    def _test_wrapped_exception(cls):
        # Pool task that always fails; used by test_wrapped_exception().
        raise RuntimeError('foo')

    def test_wrapped_exception(self):
        # Issue #20980: Should not wrap exception when using thread pool
        with self.Pool(1) as p:
            with self.assertRaises(RuntimeError):
                p.apply(self._test_wrapped_exception)
        p.join()

    def test_map_no_failfast(self):
        # Issue #23992: the fail-fast behaviour when an exception is raised
        # during map() would make Pool.join() deadlock, because a worker
        # process would fill the result queue (after the result handler thread
        # terminated, hence not draining it anymore).
2633 2634 t_start = time.monotonic() 2635 2636 with self.assertRaises(ValueError): 2637 with self.Pool(2) as p: 2638 try: 2639 p.map(raise_large_valuerror, [0, 1]) 2640 finally: 2641 time.sleep(0.5) 2642 p.close() 2643 p.join() 2644 2645 # check that we indeed waited for all jobs 2646 self.assertGreater(time.monotonic() - t_start, 0.9) 2647 2648 def test_release_task_refs(self): 2649 # Issue #29861: task arguments and results should not be kept 2650 # alive after we are done with them. 2651 objs = [CountedObject() for i in range(10)] 2652 refs = [weakref.ref(o) for o in objs] 2653 self.pool.map(identity, objs) 2654 2655 del objs 2656 time.sleep(DELTA) # let threaded cleanup code run 2657 self.assertEqual(set(wr() for wr in refs), {None}) 2658 # With a process pool, copies of the objects are returned, check 2659 # they were released too. 2660 self.assertEqual(CountedObject.n_instances, 0) 2661 2662 def test_enter(self): 2663 if self.TYPE == 'manager': 2664 self.skipTest("test not applicable to manager") 2665 2666 pool = self.Pool(1) 2667 with pool: 2668 pass 2669 # call pool.terminate() 2670 # pool is no longer running 2671 2672 with self.assertRaises(ValueError): 2673 # bpo-35477: pool.__enter__() fails if the pool is not running 2674 with pool: 2675 pass 2676 pool.join() 2677 2678 def test_resource_warning(self): 2679 if self.TYPE == 'manager': 2680 self.skipTest("test not applicable to manager") 2681 2682 pool = self.Pool(1) 2683 pool.terminate() 2684 pool.join() 2685 2686 # force state to RUN to emit ResourceWarning in __del__() 2687 pool._state = multiprocessing.pool.RUN 2688 2689 with support.check_warnings(('unclosed running multiprocessing pool', 2690 ResourceWarning)): 2691 pool = None 2692 support.gc_collect() 2693 2694def raising(): 2695 raise KeyError("key") 2696 2697def unpickleable_result(): 2698 return lambda: 42 2699 2700class _TestPoolWorkerErrors(BaseTestCase): 2701 ALLOWED_TYPES = ('processes', ) 2702 2703 def test_async_error_callback(self): 2704 
p = multiprocessing.Pool(2) 2705 2706 scratchpad = [None] 2707 def errback(exc): 2708 scratchpad[0] = exc 2709 2710 res = p.apply_async(raising, error_callback=errback) 2711 self.assertRaises(KeyError, res.get) 2712 self.assertTrue(scratchpad[0]) 2713 self.assertIsInstance(scratchpad[0], KeyError) 2714 2715 p.close() 2716 p.join() 2717 2718 def test_unpickleable_result(self): 2719 from multiprocessing.pool import MaybeEncodingError 2720 p = multiprocessing.Pool(2) 2721 2722 # Make sure we don't lose pool processes because of encoding errors. 2723 for iteration in range(20): 2724 2725 scratchpad = [None] 2726 def errback(exc): 2727 scratchpad[0] = exc 2728 2729 res = p.apply_async(unpickleable_result, error_callback=errback) 2730 self.assertRaises(MaybeEncodingError, res.get) 2731 wrapped = scratchpad[0] 2732 self.assertTrue(wrapped) 2733 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 2734 self.assertIsNotNone(wrapped.exc) 2735 self.assertIsNotNone(wrapped.value) 2736 2737 p.close() 2738 p.join() 2739 2740class _TestPoolWorkerLifetime(BaseTestCase): 2741 ALLOWED_TYPES = ('processes', ) 2742 2743 def test_pool_worker_lifetime(self): 2744 p = multiprocessing.Pool(3, maxtasksperchild=10) 2745 self.assertEqual(3, len(p._pool)) 2746 origworkerpids = [w.pid for w in p._pool] 2747 # Run many tasks so each worker gets replaced (hopefully) 2748 results = [] 2749 for i in range(100): 2750 results.append(p.apply_async(sqr, (i, ))) 2751 # Fetch the results and verify we got the right answers, 2752 # also ensuring all the tasks have completed. 
# NOTE: continuation of _TestPoolWorkerLifetime.test_pool_worker_lifetime();
# the method's opening statements sit on the preceding source line.
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        p._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        countdown = 50
        while countdown and not all(w.is_alive() for w in p._pool):
            countdown -= 1
            time.sleep(DELTA)
        finalworkerpids = [w.pid for w in p._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        p.close()
        p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))

    def test_worker_finalization_via_atexit_handler_of_multiprocessing(self):
        # tests cases against bpo-38744 and bpo-39360
        # The script keeps a Pool alive in a module global and exits
        # without closing it; the interpreter must still exit cleanly.
        cmd = '''if 1:
            from multiprocessing import Pool
            problem = None
            class A:
                def __init__(self):
                    self.pool = Pool(processes=1)
            def test():
                global problem
                problem = A()
                problem.pool.map(float, tuple(range(10)))
            if __name__ == "__main__":
                test()
        '''
        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
        self.assertEqual(rc, 0)

#
# Test of creating a customized manager class
#

from multiprocessing.managers import BaseManager, BaseProxy, RemoteError

class FooBar(object):
    # Target class for the 'Foo' and 'Bar' registrations below.
    def f(self):
        return 'f()'
    def g(self):
        raise ValueError
    def _h(self):
        return '_h()'

def baz():
    # Generator yielding the squares of 0..9; registered as 'baz' below.
    for i in range(10):
        yield i*i

class IteratorProxy(BaseProxy):
    # Proxy exposing only __next__, forwarded to the referent.
    _exposed_ = ('__next__',)
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

class MyManager(BaseManager):
    pass

# 'Foo' uses the default exposure; 'Bar' explicitly exposes f and _h only.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)


class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        # Explicit start()/shutdown() life cycle.
        manager = MyManager()
        manager.start()
        self.common(manager)
        manager.shutdown()

        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
        # to the manager process if it takes longer than 1 second to stop,
        # which happens on slow buildbots.
        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

    def test_mymanager_context(self):
        # Same checks with the manager used as a context manager.
        with MyManager() as manager:
            self.common(manager)
        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
        # to the manager process if it takes longer than 1 second to stop,
        # which happens on slow buildbots.
# NOTE: continuation of _TestMyManager.test_mymanager_context(); the
# method's opening statements sit on the preceding source line.
        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))

    def test_mymanager_context_prestarted(self):
        # Entering a manager that was already started must also work.
        manager = MyManager()
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        # Shared assertions run against a started MyManager.
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        # 'Foo' exposes only the public methods; 'Bar' exposes exactly
        # the names passed via exposed=.
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' is not exposed on Foo, so calling it is refused remotely.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])


#
# Test of connecting to a remote server and using xmlrpclib for serialization
#

# Queue served by QueueManager through get_queue().
_queue = pyqueue.Queue()
def get_queue():
    return _queue

class QueueManager(BaseManager):
    '''manager class used by server process'''
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
QueueManager2.register('get_queue')


SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)
    # Payload sent through the remote queue: str, None, bool, float,
    # non-ASCII text and bytes.
    values = ['hello world', None, True, 2.25,
              'hall\xe5 v\xe4rlden',
              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
              b'hall\xe5 v\xe4rlden',
             ]
    result = values[:]

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the server and enqueue the values.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0 is requested; the real address is read back from
        # manager.address below.
        manager = QueueManager(
            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue


@hashlib_helper.requires_hashdigest('md5')
class _TestManagerRestart(BaseTestCase):

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect and enqueue a single string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        # Start a manager, shut it down, then start a second manager on
        # the same address.
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(socket_helper.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        try:
            srvr = manager.get_server()
            addr = srvr.address
            # Close the connection.Listener socket which gets opened as a part
            # of manager.get_server(). It's not needed for the test.
2978 srvr.listener.close() 2979 manager.start() 2980 2981 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2982 p.start() 2983 p.join() 2984 queue = manager.get_queue() 2985 self.assertEqual(queue.get(), 'hello world') 2986 del queue 2987 finally: 2988 if hasattr(manager, "shutdown"): 2989 manager.shutdown() 2990 2991 manager = QueueManager( 2992 address=addr, authkey=authkey, serializer=SERIALIZER) 2993 try: 2994 manager.start() 2995 self.addCleanup(manager.shutdown) 2996 except OSError as e: 2997 if e.errno != errno.EADDRINUSE: 2998 raise 2999 # Retry after some time, in case the old socket was lingering 3000 # (sporadic failure on buildbots) 3001 time.sleep(1.0) 3002 manager = QueueManager( 3003 address=addr, authkey=authkey, serializer=SERIALIZER) 3004 if hasattr(manager, "shutdown"): 3005 self.addCleanup(manager.shutdown) 3006 3007# 3008# 3009# 3010 3011SENTINEL = latin('') 3012 3013class _TestConnection(BaseTestCase): 3014 3015 ALLOWED_TYPES = ('processes', 'threads') 3016 3017 @classmethod 3018 def _echo(cls, conn): 3019 for msg in iter(conn.recv_bytes, SENTINEL): 3020 conn.send_bytes(msg) 3021 conn.close() 3022 3023 def test_connection(self): 3024 conn, child_conn = self.Pipe() 3025 3026 p = self.Process(target=self._echo, args=(child_conn,)) 3027 p.daemon = True 3028 p.start() 3029 3030 seq = [1, 2.25, None] 3031 msg = latin('hello world') 3032 longmsg = msg * 10 3033 arr = array.array('i', list(range(4))) 3034 3035 if self.TYPE == 'processes': 3036 self.assertEqual(type(conn.fileno()), int) 3037 3038 self.assertEqual(conn.send(seq), None) 3039 self.assertEqual(conn.recv(), seq) 3040 3041 self.assertEqual(conn.send_bytes(msg), None) 3042 self.assertEqual(conn.recv_bytes(), msg) 3043 3044 if self.TYPE == 'processes': 3045 buffer = array.array('i', [0]*10) 3046 expected = list(arr) + [0] * (10 - len(arr)) 3047 self.assertEqual(conn.send_bytes(arr), None) 3048 self.assertEqual(conn.recv_bytes_into(buffer), 3049 len(arr) * 
buffer.itemsize) 3050 self.assertEqual(list(buffer), expected) 3051 3052 buffer = array.array('i', [0]*10) 3053 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 3054 self.assertEqual(conn.send_bytes(arr), None) 3055 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 3056 len(arr) * buffer.itemsize) 3057 self.assertEqual(list(buffer), expected) 3058 3059 buffer = bytearray(latin(' ' * 40)) 3060 self.assertEqual(conn.send_bytes(longmsg), None) 3061 try: 3062 res = conn.recv_bytes_into(buffer) 3063 except multiprocessing.BufferTooShort as e: 3064 self.assertEqual(e.args, (longmsg,)) 3065 else: 3066 self.fail('expected BufferTooShort, got %s' % res) 3067 3068 poll = TimingWrapper(conn.poll) 3069 3070 self.assertEqual(poll(), False) 3071 self.assertTimingAlmostEqual(poll.elapsed, 0) 3072 3073 self.assertEqual(poll(-1), False) 3074 self.assertTimingAlmostEqual(poll.elapsed, 0) 3075 3076 self.assertEqual(poll(TIMEOUT1), False) 3077 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 3078 3079 conn.send(None) 3080 time.sleep(.1) 3081 3082 self.assertEqual(poll(TIMEOUT1), True) 3083 self.assertTimingAlmostEqual(poll.elapsed, 0) 3084 3085 self.assertEqual(conn.recv(), None) 3086 3087 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 3088 conn.send_bytes(really_big_msg) 3089 self.assertEqual(conn.recv_bytes(), really_big_msg) 3090 3091 conn.send_bytes(SENTINEL) # tell child to quit 3092 child_conn.close() 3093 3094 if self.TYPE == 'processes': 3095 self.assertEqual(conn.readable, True) 3096 self.assertEqual(conn.writable, True) 3097 self.assertRaises(EOFError, conn.recv) 3098 self.assertRaises(EOFError, conn.recv_bytes) 3099 3100 p.join() 3101 3102 def test_duplex_false(self): 3103 reader, writer = self.Pipe(duplex=False) 3104 self.assertEqual(writer.send(1), None) 3105 self.assertEqual(reader.recv(), 1) 3106 if self.TYPE == 'processes': 3107 self.assertEqual(reader.readable, True) 3108 self.assertEqual(reader.writable, False) 3109 
self.assertEqual(writer.readable, False) 3110 self.assertEqual(writer.writable, True) 3111 self.assertRaises(OSError, reader.send, 2) 3112 self.assertRaises(OSError, writer.recv) 3113 self.assertRaises(OSError, writer.poll) 3114 3115 def test_spawn_close(self): 3116 # We test that a pipe connection can be closed by parent 3117 # process immediately after child is spawned. On Windows this 3118 # would have sometimes failed on old versions because 3119 # child_conn would be closed before the child got a chance to 3120 # duplicate it. 3121 conn, child_conn = self.Pipe() 3122 3123 p = self.Process(target=self._echo, args=(child_conn,)) 3124 p.daemon = True 3125 p.start() 3126 child_conn.close() # this might complete before child initializes 3127 3128 msg = latin('hello') 3129 conn.send_bytes(msg) 3130 self.assertEqual(conn.recv_bytes(), msg) 3131 3132 conn.send_bytes(SENTINEL) 3133 conn.close() 3134 p.join() 3135 3136 def test_sendbytes(self): 3137 if self.TYPE != 'processes': 3138 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 3139 3140 msg = latin('abcdefghijklmnopqrstuvwxyz') 3141 a, b = self.Pipe() 3142 3143 a.send_bytes(msg) 3144 self.assertEqual(b.recv_bytes(), msg) 3145 3146 a.send_bytes(msg, 5) 3147 self.assertEqual(b.recv_bytes(), msg[5:]) 3148 3149 a.send_bytes(msg, 7, 8) 3150 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 3151 3152 a.send_bytes(msg, 26) 3153 self.assertEqual(b.recv_bytes(), latin('')) 3154 3155 a.send_bytes(msg, 26, 0) 3156 self.assertEqual(b.recv_bytes(), latin('')) 3157 3158 self.assertRaises(ValueError, a.send_bytes, msg, 27) 3159 3160 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 3161 3162 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 3163 3164 self.assertRaises(ValueError, a.send_bytes, msg, -1) 3165 3166 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 3167 3168 @classmethod 3169 def _is_fd_assigned(cls, fd): 3170 try: 3171 os.fstat(fd) 3172 except OSError as e: 3173 if e.errno == errno.EBADF: 
3174 return False 3175 raise 3176 else: 3177 return True 3178 3179 @classmethod 3180 def _writefd(cls, conn, data, create_dummy_fds=False): 3181 if create_dummy_fds: 3182 for i in range(0, 256): 3183 if not cls._is_fd_assigned(i): 3184 os.dup2(conn.fileno(), i) 3185 fd = reduction.recv_handle(conn) 3186 if msvcrt: 3187 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 3188 os.write(fd, data) 3189 os.close(fd) 3190 3191 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3192 def test_fd_transfer(self): 3193 if self.TYPE != 'processes': 3194 self.skipTest("only makes sense with processes") 3195 conn, child_conn = self.Pipe(duplex=True) 3196 3197 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 3198 p.daemon = True 3199 p.start() 3200 self.addCleanup(test.support.unlink, test.support.TESTFN) 3201 with open(test.support.TESTFN, "wb") as f: 3202 fd = f.fileno() 3203 if msvcrt: 3204 fd = msvcrt.get_osfhandle(fd) 3205 reduction.send_handle(conn, fd, p.pid) 3206 p.join() 3207 with open(test.support.TESTFN, "rb") as f: 3208 self.assertEqual(f.read(), b"foo") 3209 3210 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3211 @unittest.skipIf(sys.platform == "win32", 3212 "test semantics don't make sense on Windows") 3213 @unittest.skipIf(MAXFD <= 256, 3214 "largest assignable fd number is too small") 3215 @unittest.skipUnless(hasattr(os, "dup2"), 3216 "test needs os.dup2()") 3217 def test_large_fd_transfer(self): 3218 # With fd > 256 (issue #11657) 3219 if self.TYPE != 'processes': 3220 self.skipTest("only makes sense with processes") 3221 conn, child_conn = self.Pipe(duplex=True) 3222 3223 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 3224 p.daemon = True 3225 p.start() 3226 self.addCleanup(test.support.unlink, test.support.TESTFN) 3227 with open(test.support.TESTFN, "wb") as f: 3228 fd = f.fileno() 3229 for newfd in range(256, MAXFD): 3230 if not self._is_fd_assigned(newfd): 3231 break 
3232 else: 3233 self.fail("could not find an unassigned large file descriptor") 3234 os.dup2(fd, newfd) 3235 try: 3236 reduction.send_handle(conn, newfd, p.pid) 3237 finally: 3238 os.close(newfd) 3239 p.join() 3240 with open(test.support.TESTFN, "rb") as f: 3241 self.assertEqual(f.read(), b"bar") 3242 3243 @classmethod 3244 def _send_data_without_fd(self, conn): 3245 os.write(conn.fileno(), b"\0") 3246 3247 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3248 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 3249 def test_missing_fd_transfer(self): 3250 # Check that exception is raised when received data is not 3251 # accompanied by a file descriptor in ancillary data. 3252 if self.TYPE != 'processes': 3253 self.skipTest("only makes sense with processes") 3254 conn, child_conn = self.Pipe(duplex=True) 3255 3256 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 3257 p.daemon = True 3258 p.start() 3259 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 3260 p.join() 3261 3262 def test_context(self): 3263 a, b = self.Pipe() 3264 3265 with a, b: 3266 a.send(1729) 3267 self.assertEqual(b.recv(), 1729) 3268 if self.TYPE == 'processes': 3269 self.assertFalse(a.closed) 3270 self.assertFalse(b.closed) 3271 3272 if self.TYPE == 'processes': 3273 self.assertTrue(a.closed) 3274 self.assertTrue(b.closed) 3275 self.assertRaises(OSError, a.recv) 3276 self.assertRaises(OSError, b.recv) 3277 3278class _TestListener(BaseTestCase): 3279 3280 ALLOWED_TYPES = ('processes',) 3281 3282 def test_multiple_bind(self): 3283 for family in self.connection.families: 3284 l = self.connection.Listener(family=family) 3285 self.addCleanup(l.close) 3286 self.assertRaises(OSError, self.connection.Listener, 3287 l.address, family) 3288 3289 def test_context(self): 3290 with self.connection.Listener() as l: 3291 with self.connection.Client(l.address) as c: 3292 with l.accept() as d: 3293 c.send(1729) 3294 
self.assertEqual(d.recv(), 1729) 3295 3296 if self.TYPE == 'processes': 3297 self.assertRaises(OSError, l.accept) 3298 3299 @unittest.skipUnless(util.abstract_sockets_supported, 3300 "test needs abstract socket support") 3301 def test_abstract_socket(self): 3302 with self.connection.Listener("\0something") as listener: 3303 with self.connection.Client(listener.address) as client: 3304 with listener.accept() as d: 3305 client.send(1729) 3306 self.assertEqual(d.recv(), 1729) 3307 3308 if self.TYPE == 'processes': 3309 self.assertRaises(OSError, listener.accept) 3310 3311 3312class _TestListenerClient(BaseTestCase): 3313 3314 ALLOWED_TYPES = ('processes', 'threads') 3315 3316 @classmethod 3317 def _test(cls, address): 3318 conn = cls.connection.Client(address) 3319 conn.send('hello') 3320 conn.close() 3321 3322 def test_listener_client(self): 3323 for family in self.connection.families: 3324 l = self.connection.Listener(family=family) 3325 p = self.Process(target=self._test, args=(l.address,)) 3326 p.daemon = True 3327 p.start() 3328 conn = l.accept() 3329 self.assertEqual(conn.recv(), 'hello') 3330 p.join() 3331 l.close() 3332 3333 def test_issue14725(self): 3334 l = self.connection.Listener() 3335 p = self.Process(target=self._test, args=(l.address,)) 3336 p.daemon = True 3337 p.start() 3338 time.sleep(1) 3339 # On Windows the client process should by now have connected, 3340 # written data and closed the pipe handle by now. This causes 3341 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 3342 # 14725. 
class _TestPoll(BaseTestCase):
    """Tests for ``poll()`` on connections created with ``self.Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        """A zero-length message must be reported by poll()."""
        receiver, sender = self.Pipe()
        self.assertEqual(receiver.poll(), False)
        sender.send_bytes(b'')
        # Checked twice: the second poll must still see the message.
        for _ in range(2):
            self.assertEqual(receiver.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        """Send every item of *strings* over *conn*, sleeping 0.1s before each."""
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        """Messages (including empty ones) arrive in order after polling."""
        expected = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        receiver, sender = self.Pipe()
        child = self.Process(target=self._child_strings,
                             args=(sender, expected))
        child.start()

        for want in expected:
            # Poll in 10 ms slices, at most 200 times, stopping at the
            # first success; the read happens either way.
            attempts = 0
            while attempts < 200 and not receiver.poll(0.01):
                attempts += 1
            got = receiver.recv_bytes()
            self.assertEqual(want, got)

        child.join()

    @classmethod
    def _child_boundaries(cls, r):
        """Poll *r* for up to five seconds without reading from it."""
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        """A poll in the child must leave whole messages for the parent."""
        reader, writer = self.Pipe(False)
        child = self.Process(target=self._child_boundaries, args=(reader,))
        child.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for message in messages:
            writer.send_bytes(message)
        writer.close()
        child.join()
        self.assertIn(reader.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        """Send three separate messages over *b*: b'a', b'b' and b'cd'."""
        for chunk in (b'a', b'b', b'cd'):
            b.send_bytes(chunk)

    def test_dont_merge(self):
        """Consecutive messages are received one at a time, never merged."""
        receiver, sender = self.Pipe()
        for timeout in (0.0, 0.1):
            self.assertEqual(receiver.poll(timeout), False)

        child = self.Process(target=self._child_dont_merge, args=(sender,))
        child.start()

        self.assertEqual(receiver.recv_bytes(), b'a')
        for _ in range(2):
            self.assertEqual(receiver.poll(1.0), True)
        self.assertEqual(receiver.recv_bytes(), b'b')
        for timeout in (1.0, 1.0, 0.0):
            self.assertEqual(receiver.poll(timeout), True)
        self.assertEqual(receiver.recv_bytes(), b'cd')

        child.join()
def test_pickling(self):
    """Check that connections and sockets can be sent between processes.

    Two children are started: ``_listener`` creates a listener per family
    (and finally a plain socket server), sends back its address and then
    the accepted connection itself; ``_remote`` connects to each address
    it is given and sends the upper-cased message.  The parent relays the
    addresses and checks that every connection object it receives from
    the listener child yields the upper-cased message.
    """
    families = self.connection.families

    lconn, lconn0 = self.Pipe()
    lp = self.Process(target=self._listener, args=(lconn0, families))
    lp.daemon = True
    lp.start()
    lconn0.close()

    rconn, rconn0 = self.Pipe()
    rp = self.Process(target=self._remote, args=(rconn0,))
    rp.daemon = True
    rp.start()
    rconn0.close()

    for fam in families:
        msg = ('This connection uses family %s' % fam).encode('ascii')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        try:
            self.assertEqual(new_conn.recv(), msg.upper())
        finally:
            # Each received connection is a new handle in this process;
            # close it here rather than leaving it to the GC.
            new_conn.close()

    # Tell the remote child that the Connection-based part is over.
    rconn.send(None)

    msg = latin('This connection uses a normal socket')
    address = lconn.recv()
    rconn.send((address, msg))
    new_conn = lconn.recv()
    try:
        # Read until the peer closes the socket (recv() returns b'').
        buf = b''.join(iter(lambda: new_conn.recv(100), b''))
    finally:
        new_conn.close()
    self.assertEqual(buf, msg.upper())

    # Unblock the listener child's final conn.recv().
    lconn.send(None)

    rconn.close()
    lconn.close()

    lp.join()
    rp.join()
def test_heap(self):
    """Stress the heap with random allocations and check its bookkeeping.

    After many allocations and releases the free and occupied blocks must
    add up to the total arena size and tile every arena without gaps;
    after releasing everything the heap must be completely empty.
    """
    iterations = 5000
    maxblocks = 50
    live_blocks = []

    # Work on the heap installed by setUp() and make it discard any
    # free space, whatever its size.
    heap = multiprocessing.heap.BufferWrapper._heap
    heap._DISCARD_FREE_SPACE_LARGER_THAN = 0

    # Allocate blocks of random sizes; once more than `maxblocks` are
    # held, drop one at a random position.
    for _ in range(iterations):
        nbytes = int(random.lognormvariate(0, 1) * 1000)
        live_blocks.append(multiprocessing.heap.BufferWrapper(nbytes))
        if len(live_blocks) > maxblocks:
            del live_blocks[random.randrange(maxblocks)]

    # Inspect the heap's internal state while holding its lock.
    with heap._lock:
        layout = []
        free_bytes = 0
        used_bytes = 0

        # Free blocks, recorded per length in heap._len_to_seq.
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                span = stop - start
                layout.append((heap._arenas.index(arena), start, stop,
                               span, 'free'))
                free_bytes += span

        # Allocated blocks, recorded per arena.
        for arena, arena_blocks in heap._allocated_blocks.items():
            for start, stop in arena_blocks:
                span = stop - start
                layout.append((heap._arenas.index(arena), start, stop,
                               span, 'occupied'))
                used_bytes += span

        self.assertEqual(free_bytes + used_bytes,
                         sum(a.size for a in heap._arenas))

        layout.sort()

        # Walk neighbouring entries; `arena` is now an index into
        # heap._arenas rather than an arena object.
        for current, following in zip(layout, layout[1:]):
            arena, _, stop = current[:3]
            next_arena, next_start, _ = following[:3]
            if arena == next_arena:
                # Same arena: the two blocks must be adjacent.
                self.assertEqual(stop, next_start)
            else:
                # Arena change: `current` is the last block of its arena
                # and `following` the first block of the next one.
                self.assertEqual(stop, heap._arenas[arena].size)
                self.assertEqual(next_start, 0)

    # Release every remaining block, in random order.
    random.shuffle(live_blocks)
    while live_blocks:
        live_blocks.pop()

    self.assertEqual(heap._n_frees, heap._n_mallocs)
    self.assertEqual(len(heap._pending_free_blocks), 0)
    self.assertEqual(len(heap._arenas), 0)
    self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
    self.assertEqual(len(heap._len_to_seq), 0)
def test_sharedctypes(self, lock=False):
    """Check that a child process can modify shared ctypes objects.

    Shared values, a shared structure and two shared arrays are handed to
    a child running ``self._double``; afterwards the parent must see the
    doubled contents.  *lock* is forwarded to every ``Value``/``Array``
    constructor.
    """
    int_value = Value('i', 7, lock=lock)
    double_value = Value(c_double, 1.0/3.0, lock=lock)
    longlong_value = Value(c_longlong, 2 ** 33, lock=lock)
    struct_value = Value(_Foo, 3, 2, lock=lock)
    numbers = self.Array('d', list(range(10)), lock=lock)
    text = self.Array('c', 20, lock=lock)
    text.value = latin('hello')

    shared = (int_value, double_value, longlong_value,
              struct_value, numbers, text)
    child = self.Process(target=self._double, args=shared)
    child.daemon = True
    child.start()
    child.join()

    self.assertEqual(int_value.value, 14)
    self.assertAlmostEqual(double_value.value, 2.0/3.0)
    self.assertEqual(longlong_value.value, 2 ** 34)
    self.assertEqual(struct_value.x, 6)
    self.assertAlmostEqual(struct_value.y, 4.0)
    for index in range(10):
        self.assertAlmostEqual(numbers[index], index*2)
    self.assertEqual(text.value, latin('hellohello'))
def test_shared_memory_basics(self):
    """Exercise creation, attachment and error paths of SharedMemory.

    Covers: readable attributes, writing through ``buf``, attaching by
    name (with and without a size), invalid constructor arguments, name
    collision handling when ``_make_filename`` returns a name already in
    use, double unlink on POSIX, and attaching to a missing segment.
    """
    sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512)
    self.addCleanup(sms.unlink)

    # Verify attributes are readable.
    self.assertEqual(sms.name, 'test01_tsmb')
    self.assertGreaterEqual(sms.size, 512)
    self.assertGreaterEqual(len(sms.buf), sms.size)

    # Modify contents of shared memory segment through memoryview.
    sms.buf[0] = 42
    self.assertEqual(sms.buf[0], 42)

    # Attach to existing shared memory segment.
    also_sms = shared_memory.SharedMemory('test01_tsmb')
    self.assertEqual(also_sms.buf[0], 42)
    also_sms.close()

    # Attach to existing shared memory segment but specify a new size.
    same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size)
    self.assertLess(same_sms.size, 20*sms.size)  # Size was ignored.
    same_sms.close()

    # Creating a shared memory segment with a negative size.
    with self.assertRaises(ValueError):
        shared_memory.SharedMemory(create=True, size=-2)

    # Attaching to a shared memory segment without a name.
    with self.assertRaises(ValueError):
        shared_memory.SharedMemory(create=False)

    # Test if shared memory segment is created properly,
    # when _make_filename returns an existing shared memory segment name
    with unittest.mock.patch(
         'multiprocessing.shared_memory._make_filename') as mock_make_filename:

        NAME_PREFIX = shared_memory._SHM_NAME_PREFIX
        names = ['test01_fn', 'test02_fn']
        # Prepend NAME_PREFIX which can be '/psm_' or 'wnsm_', necessary
        # because some POSIX compliant systems require name to start with /
        names = [NAME_PREFIX + name for name in names]

        mock_make_filename.side_effect = names
        shm1 = shared_memory.SharedMemory(create=True, size=1)
        self.addCleanup(shm1.unlink)
        self.assertEqual(shm1._name, names[0])

        # The same candidate names are offered again: the first is now
        # taken by shm1, so the second one must be used.
        mock_make_filename.side_effect = names
        shm2 = shared_memory.SharedMemory(create=True, size=1)
        self.addCleanup(shm2.unlink)
        self.assertEqual(shm2._name, names[1])

    if shared_memory._USE_POSIX:
        # Posix Shared Memory can only be unlinked once.  Here we
        # test an implementation detail that is not observed across
        # all supported platforms (since WindowsNamedSharedMemory
        # manages unlinking on its own and unlink() does nothing).
        # True release of shared memory segment does not necessarily
        # happen until process exits, depending on the OS platform.
        #
        # The segment is created outside the assertRaises block so that
        # an error while creating it cannot satisfy the expectation.
        sms_uno = shared_memory.SharedMemory(
            'test01_dblunlink',
            create=True,
            size=5000
        )
        with self.assertRaises(FileNotFoundError):
            try:
                self.assertGreaterEqual(sms_uno.size, 5000)

                sms_duo = shared_memory.SharedMemory('test01_dblunlink')
                sms_duo.unlink()  # First shm_unlink() call.
                sms_duo.close()
                sms_uno.close()

            finally:
                sms_uno.unlink()  # A second shm_unlink() call is bad.

    with self.assertRaises(FileExistsError):
        # Attempting to create a new shared memory segment with a
        # name that is already in use triggers an exception.
        shared_memory.SharedMemory(
            'test01_tsmb',
            create=True,
            size=512
        )

    if shared_memory._USE_POSIX:
        # Requesting creation of a shared memory segment with the option
        # to attach to an existing segment, if that name is currently in
        # use, should not trigger an exception.
        # Note:  Using a smaller size could possibly cause truncation of
        # the existing segment but is OS platform dependent.  In the
        # case of MacOS/darwin, requesting a smaller size is disallowed.
        class OptionalAttachSharedMemory(shared_memory.SharedMemory):
            _flags = os.O_CREAT | os.O_RDWR
        ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb')
        self.assertEqual(ok_if_exists_sms.size, sms.size)
        ok_if_exists_sms.close()

    # Attempting to attach to an existing shared memory segment when
    # no segment exists with the supplied name triggers an exception.
    with self.assertRaises(FileNotFoundError):
        nonexisting_sms = shared_memory.SharedMemory('test01_notthere')
        nonexisting_sms.unlink()  # Error should occur on prior line.

    sms.close()

    # Test creating a shared memory segment with negative size
    with self.assertRaises(ValueError):
        shared_memory.SharedMemory(create=True, size=-1)

    # Test creating a shared memory segment with size 0
    with self.assertRaises(ValueError):
        shared_memory.SharedMemory(create=True, size=0)

    # Test creating a shared memory segment without size argument
    with self.assertRaises(ValueError):
        shared_memory.SharedMemory(create=True)
3911 smm = multiprocessing.managers.SharedMemoryManager() 3912 smm.start() 3913 3914 # make sure the manager works properly at the beginning 3915 sl = smm.ShareableList(range(10)) 3916 3917 # the manager's server should ignore KeyboardInterrupt signals, and 3918 # maintain its connection with the current process, and success when 3919 # asked to deliver memory segments. 3920 os.kill(smm._process.pid, signal.SIGINT) 3921 3922 sl2 = smm.ShareableList(range(10)) 3923 3924 # test that the custom signal handler registered in the Manager does 3925 # not affect signal handling in the parent process. 3926 with self.assertRaises(KeyboardInterrupt): 3927 os.kill(os.getpid(), signal.SIGINT) 3928 3929 smm.shutdown() 3930 3931 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 3932 def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): 3933 # bpo-36867: test that a SharedMemoryManager uses the 3934 # same resource_tracker process as its parent. 3935 cmd = '''if 1: 3936 from multiprocessing.managers import SharedMemoryManager 3937 3938 3939 smm = SharedMemoryManager() 3940 smm.start() 3941 sl = smm.ShareableList(range(10)) 3942 smm.shutdown() 3943 ''' 3944 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 3945 3946 # Before bpo-36867 was fixed, a SharedMemoryManager not using the same 3947 # resource_tracker process as its parent would make the parent's 3948 # tracker complain about sl being leaked even though smm.shutdown() 3949 # properly released sl. 
def test_shared_memory_SharedMemoryManager_basics(self):
    """Check that SharedMemoryManager creates and releases segments.

    Segments handed out by a started manager must be attachable by name
    from this process, and (except on Windows) must no longer exist once
    the manager has been shut down, either explicitly or by leaving its
    ``with`` block.
    """
    smm1 = multiprocessing.managers.SharedMemoryManager()
    with self.assertRaises(ValueError):
        smm1.SharedMemory(size=9)  # Fails if SharedMemoryServer not started
    smm1.start()
    try:
        lol = [smm1.ShareableList(range(i)) for i in range(5, 10)]
        lom = [smm1.SharedMemory(size=j) for j in range(32, 128, 16)]

        # Attach a second time by name and release each extra attachment
        # as soon as it has been checked.
        doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name)
        try:
            self.assertEqual(len(doppleganger_list0), 5)
        finally:
            doppleganger_list0.shm.close()

        doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
        try:
            self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
        finally:
            doppleganger_shm0.close()

        held_name = lom[0].name
    finally:
        # Shut the manager down even if an assertion above failed, so no
        # server process outlives the test.
        smm1.shutdown()

    if sys.platform != "win32":
        # Calls to unlink() have no effect on Windows platform; shared
        # memory will only be released once final process exits.
        with self.assertRaises(FileNotFoundError):
            # No longer there to be attached to again.
            shared_memory.SharedMemory(name=held_name)

    with multiprocessing.managers.SharedMemoryManager() as smm2:
        sl = smm2.ShareableList("howdy")
        shm = smm2.SharedMemory(size=128)
        held_name = sl.shm.name
    if sys.platform != "win32":
        with self.assertRaises(FileNotFoundError):
            # No longer there to be attached to again.
            shared_memory.ShareableList(name=held_name)
3997 warnings.simplefilter('ignore') 3998 with self.assertRaises(ValueError): 3999 sl.index('100') 4000 self.assertEqual(sl.index(100), 3) 4001 4002 # Exercise retrieving individual values. 4003 self.assertEqual(sl[0], 'howdy') 4004 self.assertEqual(sl[-2], True) 4005 4006 # Exercise iterability. 4007 self.assertEqual( 4008 tuple(sl), 4009 ('howdy', b'HoWdY', -273.154, 100, None, True, 42) 4010 ) 4011 4012 # Exercise modifying individual values. 4013 sl[3] = 42 4014 self.assertEqual(sl[3], 42) 4015 sl[4] = 'some' # Change type at a given position. 4016 self.assertEqual(sl[4], 'some') 4017 self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q') 4018 with self.assertRaisesRegex(ValueError, 4019 "exceeds available storage"): 4020 sl[4] = 'far too many' 4021 self.assertEqual(sl[4], 'some') 4022 sl[0] = 'encodés' # Exactly 8 bytes of UTF-8 data 4023 self.assertEqual(sl[0], 'encodés') 4024 self.assertEqual(sl[1], b'HoWdY') # no spillage 4025 with self.assertRaisesRegex(ValueError, 4026 "exceeds available storage"): 4027 sl[0] = 'encodées' # Exactly 9 bytes of UTF-8 data 4028 self.assertEqual(sl[1], b'HoWdY') 4029 with self.assertRaisesRegex(ValueError, 4030 "exceeds available storage"): 4031 sl[1] = b'123456789' 4032 self.assertEqual(sl[1], b'HoWdY') 4033 4034 # Exercise count(). 4035 with warnings.catch_warnings(): 4036 # Suppress BytesWarning when comparing against b'HoWdY'. 4037 warnings.simplefilter('ignore') 4038 self.assertEqual(sl.count(42), 2) 4039 self.assertEqual(sl.count(b'HoWdY'), 1) 4040 self.assertEqual(sl.count(b'adios'), 0) 4041 4042 # Exercise creating a duplicate. 
def test_shared_memory_ShareableList_pickling(self):
    """Check that a pickled ShareableList refers to the same segment.

    The unpickled object must be a distinct ShareableList that shares
    storage with the original, and the pickle must not embed the list's
    data (a 400-item list pickles to the same length as a 10-item one).
    """
    sl = shared_memory.ShareableList(range(10))
    self.addCleanup(sl.shm.unlink)

    serialized_sl = pickle.dumps(sl)
    deserialized_sl = pickle.loads(serialized_sl)
    self.assertIsInstance(deserialized_sl, shared_memory.ShareableList)
    # assertEqual, not assertTrue: with assertTrue the expected value
    # would be taken as the failure message and never compared.
    self.assertEqual(deserialized_sl[-1], 9)
    self.assertIsNot(sl, deserialized_sl)
    # A write through the copy must be visible through the original.
    deserialized_sl[4] = "changed"
    self.assertEqual(sl[4], "changed")

    # Verify data is not being put into the pickled representation.
    larger_sl = shared_memory.ShareableList(range(400))
    self.addCleanup(larger_sl.shm.unlink)
    serialized_larger_sl = pickle.dumps(larger_sl)
    self.assertEqual(len(serialized_sl), len(serialized_larger_sl))
    larger_sl.shm.close()

    deserialized_sl.shm.close()
    sl.shm.close()
class _TestFinalize(BaseTestCase):
    """Tests for ``util.Finalize`` callbacks and ``util._run_finalizers``."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        """Save the global finalizer registry and start from an empty one."""
        self.registry_backup = util._finalizer_registry.copy()
        util._finalizer_registry.clear()

    def tearDown(self):
        """Require that the test left the registry empty, then restore it."""
        self.assertFalse(util._finalizer_registry)
        util._finalizer_registry.update(self.registry_backup)

    @classmethod
    def _test_finalize(cls, conn):
        """Child body: register finalizers that report their tag over *conn*.

        Every callback is ``conn.send`` with a one-element tag tuple, so
        the parent can read the order in which callbacks ran.  The order
        asserted by ``test_finalize`` is
        ``['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']`` followed by the
        ``'STOP'`` sentinel.
        """
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority; 'c' does not appear in the
        # result expected by test_finalize.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0; test_finalize expects
        # them after d10 and in reverse registration order.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers not attached to any object (obj is None).
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority used here: serves as the end-of-stream marker
        # that the parent's iter(conn.recv, 'STOP') stops on.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Run ``_test_finalize`` in a child and check the callback order."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read tags until the 'STOP' sentinel sent by the last finalizer.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

    def test_thread_safety(self):
        """Run finalizers in one thread while another keeps registering them.

        Any exception caught in either worker thread is stored in ``exc``
        and re-raised in the test thread after the workers have stopped.
        """
        # bpo-24484: _run_finalizers() should be thread-safe
        def cb():
            pass

        class Foo(object):
            def __init__(self):
                self.ref = self  # create reference cycle
                # insert finalizer at random key
                util.Finalize(self, cb, exitpriority=random.randint(1, 100))

        # Shared with the worker closures below: `finish` tells them to
        # stop, `exc` carries the last exception either of them caught.
        finish = False
        exc = None

        def run_finalizers():
            nonlocal exc
            while not finish:
                time.sleep(random.random() * 1e-1)
                try:
                    # A GC run will eventually happen during this,
                    # collecting stale Foo's and mutating the registry
                    util._run_finalizers()
                except Exception as e:
                    exc = e

        def make_finalizers():
            nonlocal exc
            d = {}
            while not finish:
                try:
                    # Old Foo's get gradually replaced and later
                    # collected by the GC (because of the cyclic ref)
                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
                except Exception as e:
                    exc = e
                    d.clear()

        old_interval = sys.getswitchinterval()
        old_threshold = gc.get_threshold()
        try:
            # Very short switch interval and low GC thresholds, to make
            # thread switches and collections frequent during the test.
            sys.setswitchinterval(1e-6)
            gc.set_threshold(5, 5, 5)
            threads = [threading.Thread(target=run_finalizers),
                       threading.Thread(target=make_finalizers)]
            with test.support.start_threads(threads):
                time.sleep(4.0)  # Wait a bit to trigger race condition
                finish = True
            if exc is not None:
                raise exc
        finally:
            # Restore interpreter-wide settings whatever the outcome.
            sys.setswitchinterval(old_interval)
            gc.set_threshold(*old_threshold)
            gc.collect()  # Collect remaining Foo's
import * works for each module 4267# 4268 4269class _TestImportStar(unittest.TestCase): 4270 4271 def get_module_names(self): 4272 import glob 4273 folder = os.path.dirname(multiprocessing.__file__) 4274 pattern = os.path.join(glob.escape(folder), '*.py') 4275 files = glob.glob(pattern) 4276 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 4277 modules = ['multiprocessing.' + m for m in modules] 4278 modules.remove('multiprocessing.__init__') 4279 modules.append('multiprocessing') 4280 return modules 4281 4282 def test_import(self): 4283 modules = self.get_module_names() 4284 if sys.platform == 'win32': 4285 modules.remove('multiprocessing.popen_fork') 4286 modules.remove('multiprocessing.popen_forkserver') 4287 modules.remove('multiprocessing.popen_spawn_posix') 4288 else: 4289 modules.remove('multiprocessing.popen_spawn_win32') 4290 if not HAS_REDUCTION: 4291 modules.remove('multiprocessing.popen_forkserver') 4292 4293 if c_int is None: 4294 # This module requires _ctypes 4295 modules.remove('multiprocessing.sharedctypes') 4296 4297 for name in modules: 4298 __import__(name) 4299 mod = sys.modules[name] 4300 self.assertTrue(hasattr(mod, '__all__'), name) 4301 4302 for attr in mod.__all__: 4303 self.assertTrue( 4304 hasattr(mod, attr), 4305 '%r does not have attribute %r' % (mod, attr) 4306 ) 4307 4308# 4309# Quick test that logging works -- does not test logging output 4310# 4311 4312class _TestLogging(BaseTestCase): 4313 4314 ALLOWED_TYPES = ('processes',) 4315 4316 def test_enable_logging(self): 4317 logger = multiprocessing.get_logger() 4318 logger.setLevel(util.SUBWARNING) 4319 self.assertTrue(logger is not None) 4320 logger.debug('this will not be printed') 4321 logger.info('nor will this') 4322 logger.setLevel(LOG_LEVEL) 4323 4324 @classmethod 4325 def _test_level(cls, conn): 4326 logger = multiprocessing.get_logger() 4327 conn.send(logger.getEffectiveLevel()) 4328 4329 def test_level(self): 4330 LEVEL1 = 32 4331 LEVEL2 = 37 4332 4333 
logger = multiprocessing.get_logger() 4334 root_logger = logging.getLogger() 4335 root_level = root_logger.level 4336 4337 reader, writer = multiprocessing.Pipe(duplex=False) 4338 4339 logger.setLevel(LEVEL1) 4340 p = self.Process(target=self._test_level, args=(writer,)) 4341 p.start() 4342 self.assertEqual(LEVEL1, reader.recv()) 4343 p.join() 4344 p.close() 4345 4346 logger.setLevel(logging.NOTSET) 4347 root_logger.setLevel(LEVEL2) 4348 p = self.Process(target=self._test_level, args=(writer,)) 4349 p.start() 4350 self.assertEqual(LEVEL2, reader.recv()) 4351 p.join() 4352 p.close() 4353 4354 root_logger.setLevel(root_level) 4355 logger.setLevel(level=LOG_LEVEL) 4356 4357 4358# class _TestLoggingProcessName(BaseTestCase): 4359# 4360# def handle(self, record): 4361# assert record.processName == multiprocessing.current_process().name 4362# self.__handled = True 4363# 4364# def test_logging(self): 4365# handler = logging.Handler() 4366# handler.handle = self.handle 4367# self.__handled = False 4368# # Bypass getLogger() and side-effects 4369# logger = logging.getLoggerClass()( 4370# 'multiprocessing.test.TestLoggingProcessName') 4371# logger.addHandler(handler) 4372# logger.propagate = False 4373# 4374# logger.warn('foo') 4375# assert self.__handled 4376 4377# 4378# Check that Process.join() retries if os.waitpid() fails with EINTR 4379# 4380 4381class _TestPollEintr(BaseTestCase): 4382 4383 ALLOWED_TYPES = ('processes',) 4384 4385 @classmethod 4386 def _killer(cls, pid): 4387 time.sleep(0.1) 4388 os.kill(pid, signal.SIGUSR1) 4389 4390 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4391 def test_poll_eintr(self): 4392 got_signal = [False] 4393 def record(*args): 4394 got_signal[0] = True 4395 pid = os.getpid() 4396 oldhandler = signal.signal(signal.SIGUSR1, record) 4397 try: 4398 killer = self.Process(target=self._killer, args=(pid,)) 4399 killer.start() 4400 try: 4401 p = self.Process(target=time.sleep, args=(2,)) 4402 p.start() 4403 p.join() 
            finally:
                killer.join()
            # The handler must have run and join() must still have seen the
            # child exit normally.
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)

#
# Test to verify handle verification, see issue 3321
#

class TestInvalidHandle(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # Arbitrary large number, used here as a handle that is not valid.
        conn = multiprocessing.connection.Connection(44977608)
        # check that poll() doesn't crash
        try:
            conn.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None
        # A negative handle must be rejected at construction time.
        self.assertRaises((ValueError, OSError),
                          multiprocessing.connection.Connection, -1)



@hashlib_helper.requires_hashdigest('md5')
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # A peer that answers the challenge with junk must be rejected.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # Fake peer: the first read returns the CHALLENGE marker, the
        # second read returns junk, any later read returns b''.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')

#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#

def initializer(ns):
    # Initializer callback used by TestInitializers: bump the counter held
    # in the shared namespace `ns`.
    ns.test += 1

@hashlib_helper.requires_hashdigest('md5')
4472class TestInitializers(unittest.TestCase): 4473 def setUp(self): 4474 self.mgr = multiprocessing.Manager() 4475 self.ns = self.mgr.Namespace() 4476 self.ns.test = 0 4477 4478 def tearDown(self): 4479 self.mgr.shutdown() 4480 self.mgr.join() 4481 4482 def test_manager_initializer(self): 4483 m = multiprocessing.managers.SyncManager() 4484 self.assertRaises(TypeError, m.start, 1) 4485 m.start(initializer, (self.ns,)) 4486 self.assertEqual(self.ns.test, 1) 4487 m.shutdown() 4488 m.join() 4489 4490 def test_pool_initializer(self): 4491 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 4492 p = multiprocessing.Pool(1, initializer, (self.ns,)) 4493 p.close() 4494 p.join() 4495 self.assertEqual(self.ns.test, 1) 4496 4497# 4498# Issue 5155, 5313, 5331: Test process in processes 4499# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 4500# 4501 4502def _this_sub_process(q): 4503 try: 4504 item = q.get(block=False) 4505 except pyqueue.Empty: 4506 pass 4507 4508def _test_process(): 4509 queue = multiprocessing.Queue() 4510 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 4511 subProc.daemon = True 4512 subProc.start() 4513 subProc.join() 4514 4515def _afunc(x): 4516 return x*x 4517 4518def pool_in_process(): 4519 pool = multiprocessing.Pool(processes=4) 4520 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 4521 pool.close() 4522 pool.join() 4523 4524class _file_like(object): 4525 def __init__(self, delegate): 4526 self._delegate = delegate 4527 self._pid = None 4528 4529 @property 4530 def cache(self): 4531 pid = os.getpid() 4532 # There are no race conditions since fork keeps only the running thread 4533 if pid != self._pid: 4534 self._pid = pid 4535 self._cache = [] 4536 return self._cache 4537 4538 def write(self, data): 4539 self.cache.append(data) 4540 4541 def flush(self): 4542 self._delegate.write(''.join(self.cache)) 4543 self._cache = [] 4544 4545class TestStdinBadfiledescriptor(unittest.TestCase): 4546 4547 
def test_queue_in_process(self): 4548 proc = multiprocessing.Process(target=_test_process) 4549 proc.start() 4550 proc.join() 4551 4552 def test_pool_in_process(self): 4553 p = multiprocessing.Process(target=pool_in_process) 4554 p.start() 4555 p.join() 4556 4557 def test_flushing(self): 4558 sio = io.StringIO() 4559 flike = _file_like(sio) 4560 flike.write('foo') 4561 proc = multiprocessing.Process(target=lambda: flike.flush()) 4562 flike.flush() 4563 assert sio.getvalue() == 'foo' 4564 4565 4566class TestWait(unittest.TestCase): 4567 4568 @classmethod 4569 def _child_test_wait(cls, w, slow): 4570 for i in range(10): 4571 if slow: 4572 time.sleep(random.random()*0.1) 4573 w.send((i, os.getpid())) 4574 w.close() 4575 4576 def test_wait(self, slow=False): 4577 from multiprocessing.connection import wait 4578 readers = [] 4579 procs = [] 4580 messages = [] 4581 4582 for i in range(4): 4583 r, w = multiprocessing.Pipe(duplex=False) 4584 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 4585 p.daemon = True 4586 p.start() 4587 w.close() 4588 readers.append(r) 4589 procs.append(p) 4590 self.addCleanup(p.join) 4591 4592 while readers: 4593 for r in wait(readers): 4594 try: 4595 msg = r.recv() 4596 except EOFError: 4597 readers.remove(r) 4598 r.close() 4599 else: 4600 messages.append(msg) 4601 4602 messages.sort() 4603 expected = sorted((i, p.pid) for i in range(10) for p in procs) 4604 self.assertEqual(messages, expected) 4605 4606 @classmethod 4607 def _child_test_wait_socket(cls, address, slow): 4608 s = socket.socket() 4609 s.connect(address) 4610 for i in range(10): 4611 if slow: 4612 time.sleep(random.random()*0.1) 4613 s.sendall(('%s\n' % i).encode('ascii')) 4614 s.close() 4615 4616 def test_wait_socket(self, slow=False): 4617 from multiprocessing.connection import wait 4618 l = socket.create_server((socket_helper.HOST, 0)) 4619 addr = l.getsockname() 4620 readers = [] 4621 procs = [] 4622 dic = {} 4623 4624 for i in range(4): 4625 p = 
multiprocessing.Process(target=self._child_test_wait_socket, 4626 args=(addr, slow)) 4627 p.daemon = True 4628 p.start() 4629 procs.append(p) 4630 self.addCleanup(p.join) 4631 4632 for i in range(4): 4633 r, _ = l.accept() 4634 readers.append(r) 4635 dic[r] = [] 4636 l.close() 4637 4638 while readers: 4639 for r in wait(readers): 4640 msg = r.recv(32) 4641 if not msg: 4642 readers.remove(r) 4643 r.close() 4644 else: 4645 dic[r].append(msg) 4646 4647 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 4648 for v in dic.values(): 4649 self.assertEqual(b''.join(v), expected) 4650 4651 def test_wait_slow(self): 4652 self.test_wait(True) 4653 4654 def test_wait_socket_slow(self): 4655 self.test_wait_socket(True) 4656 4657 def test_wait_timeout(self): 4658 from multiprocessing.connection import wait 4659 4660 expected = 5 4661 a, b = multiprocessing.Pipe() 4662 4663 start = time.monotonic() 4664 res = wait([a, b], expected) 4665 delta = time.monotonic() - start 4666 4667 self.assertEqual(res, []) 4668 self.assertLess(delta, expected * 2) 4669 self.assertGreater(delta, expected * 0.5) 4670 4671 b.send(None) 4672 4673 start = time.monotonic() 4674 res = wait([a, b], 20) 4675 delta = time.monotonic() - start 4676 4677 self.assertEqual(res, [a]) 4678 self.assertLess(delta, 0.4) 4679 4680 @classmethod 4681 def signal_and_sleep(cls, sem, period): 4682 sem.release() 4683 time.sleep(period) 4684 4685 def test_wait_integer(self): 4686 from multiprocessing.connection import wait 4687 4688 expected = 3 4689 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 4690 sem = multiprocessing.Semaphore(0) 4691 a, b = multiprocessing.Pipe() 4692 p = multiprocessing.Process(target=self.signal_and_sleep, 4693 args=(sem, expected)) 4694 4695 p.start() 4696 self.assertIsInstance(p.sentinel, int) 4697 self.assertTrue(sem.acquire(timeout=20)) 4698 4699 start = time.monotonic() 4700 res = wait([a, p.sentinel, b], expected + 20) 4701 delta = time.monotonic() - start 4702 4703 
        # Only the sentinel is ready, about `expected` seconds after the
        # child signalled that it was running.
        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        # Data sent on `a` makes `b` ready in addition to the sentinel.
        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        # Now all three objects are ready.
        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

        p.terminate()
        p.join()

    def test_neg_timeout(self):
        # A negative timeout must return immediately with no ready object.
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        t = time.monotonic()
        res = wait([a], timeout=-1)
        t = time.monotonic() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
        a.close()
        b.close()

#
# Issue 14151: Test invalid family on invalid environment
#

class TestInvalidFamily(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # A Windows named-pipe style address must be rejected off Windows.
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener(r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A filesystem-path style address must be rejected on Windows.
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener('/var/test.pipe')

#
# Issue 12098: check sys.flags of child matches that for parent
#

class TestFlags(unittest.TestCase):
    @classmethod
    def run_in_grandchild(cls, conn):
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        # Runs in the interpreter started by test_flags(): start a
        # grandchild, then print both flag tuples as JSON on stdout.
        import json
        r, w = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
        p.start()
        grandchild_flags = r.recv()
        p.join()
        r.close()
        w.close()
        flags = (tuple(sys.flags), grandchild_flags)
        print(json.dumps(flags))

    def test_flags(self):
        import json
        # start child process using unusual flags
        prog = ('from test._test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-S', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)

#
# Test interaction with socket timeouts - see Issue #6056
#

class TestTimeouts(unittest.TestCase):
    @classmethod
    def _test_timeout(cls, child, address):
        # Child: wait longer than the 0.1s default socket timeout set by
        # the parent, then talk over the pipe and over a new connection.
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            l = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, l.address))
            p.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            conn = l.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            l.close()
            join_process(p)
        finally:
            socket.setdefaulttimeout(old_timeout)

#
# Test what happens with no "if __name__ == '__main__'"
#

class TestNoForkBomb(unittest.TestCase):
    def test_noforkbomb(self):
        sm = multiprocessing.get_start_method()
        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if sm != 'fork':
            # Without 'fork' the script is expected to fail with a
            # RuntimeError and print nothing on stdout.
            rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
        else:
            rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')

#
# Issue #17555: ForkAwareThreadLock
#

class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        # Recurse through n generations of processes; the last generation
        # reports the size of its after-fork registry over `conn`.
        if n > 1:
            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
            p.start()
            conn.close()
            join_process(p)
        else:
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        r, w = multiprocessing.Pipe(False)
        # Creating the lock is the point: the registry size is sampled
        # right after it.
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, w))
        p.start()
        w.close()
        new_size = r.recv()
        join_process(p)
        # The registry must not have grown in the 5th-generation child.
        self.assertLessEqual(new_size, old_size)

#
# Check that non-forked child processes do not inherit unneeded fds/handles
#

class TestCloseFds(unittest.TestCase):

    def get_high_socket_fd(self):
        # Return a detached socket fd/handle for the child to probe.
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()
        else:
            # We want to produce a socket with an fd high enough that a
            # freshly created child process will not have any fds as high.
4884 fd = socket.socket().detach() 4885 to_close = [] 4886 while fd < 50: 4887 to_close.append(fd) 4888 fd = os.dup(fd) 4889 for x in to_close: 4890 os.close(x) 4891 return fd 4892 4893 def close(self, fd): 4894 if WIN32: 4895 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close() 4896 else: 4897 os.close(fd) 4898 4899 @classmethod 4900 def _test_closefds(cls, conn, fd): 4901 try: 4902 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 4903 except Exception as e: 4904 conn.send(e) 4905 else: 4906 s.close() 4907 conn.send(None) 4908 4909 def test_closefd(self): 4910 if not HAS_REDUCTION: 4911 raise unittest.SkipTest('requires fd pickling') 4912 4913 reader, writer = multiprocessing.Pipe() 4914 fd = self.get_high_socket_fd() 4915 try: 4916 p = multiprocessing.Process(target=self._test_closefds, 4917 args=(writer, fd)) 4918 p.start() 4919 writer.close() 4920 e = reader.recv() 4921 join_process(p) 4922 finally: 4923 self.close(fd) 4924 writer.close() 4925 reader.close() 4926 4927 if multiprocessing.get_start_method() == 'fork': 4928 self.assertIs(e, None) 4929 else: 4930 WSAENOTSOCK = 10038 4931 self.assertIsInstance(e, OSError) 4932 self.assertTrue(e.errno == errno.EBADF or 4933 e.winerror == WSAENOTSOCK, e) 4934 4935# 4936# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 4937# 4938 4939class TestIgnoreEINTR(unittest.TestCase): 4940 4941 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block 4942 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE) 4943 4944 @classmethod 4945 def _test_ignore(cls, conn): 4946 def handler(signum, frame): 4947 pass 4948 signal.signal(signal.SIGUSR1, handler) 4949 conn.send('ready') 4950 x = conn.recv() 4951 conn.send(x) 4952 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE) 4953 4954 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4955 def test_ignore(self): 4956 conn, child_conn = multiprocessing.Pipe() 4957 try: 4958 p = 
multiprocessing.Process(target=self._test_ignore, 4959 args=(child_conn,)) 4960 p.daemon = True 4961 p.start() 4962 child_conn.close() 4963 self.assertEqual(conn.recv(), 'ready') 4964 time.sleep(0.1) 4965 os.kill(p.pid, signal.SIGUSR1) 4966 time.sleep(0.1) 4967 conn.send(1234) 4968 self.assertEqual(conn.recv(), 1234) 4969 time.sleep(0.1) 4970 os.kill(p.pid, signal.SIGUSR1) 4971 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE) 4972 time.sleep(0.1) 4973 p.join() 4974 finally: 4975 conn.close() 4976 4977 @classmethod 4978 def _test_ignore_listener(cls, conn): 4979 def handler(signum, frame): 4980 pass 4981 signal.signal(signal.SIGUSR1, handler) 4982 with multiprocessing.connection.Listener() as l: 4983 conn.send(l.address) 4984 a = l.accept() 4985 a.send('welcome') 4986 4987 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4988 def test_ignore_listener(self): 4989 conn, child_conn = multiprocessing.Pipe() 4990 try: 4991 p = multiprocessing.Process(target=self._test_ignore_listener, 4992 args=(child_conn,)) 4993 p.daemon = True 4994 p.start() 4995 child_conn.close() 4996 address = conn.recv() 4997 time.sleep(0.1) 4998 os.kill(p.pid, signal.SIGUSR1) 4999 time.sleep(0.1) 5000 client = multiprocessing.connection.Client(address) 5001 self.assertEqual(client.recv(), 'welcome') 5002 p.join() 5003 finally: 5004 conn.close() 5005 5006class TestStartMethod(unittest.TestCase): 5007 @classmethod 5008 def _check_context(cls, conn): 5009 conn.send(multiprocessing.get_start_method()) 5010 5011 def check_context(self, ctx): 5012 r, w = ctx.Pipe(duplex=False) 5013 p = ctx.Process(target=self._check_context, args=(w,)) 5014 p.start() 5015 w.close() 5016 child_method = r.recv() 5017 r.close() 5018 p.join() 5019 self.assertEqual(child_method, ctx.get_start_method()) 5020 5021 def test_context(self): 5022 for method in ('fork', 'spawn', 'forkserver'): 5023 try: 5024 ctx = multiprocessing.get_context(method) 5025 except ValueError: 5026 continue 5027 
            self.assertEqual(ctx.get_start_method(), method)
            self.assertIs(ctx.get_context(), ctx)
            # A concrete context must refuse to change its start method.
            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
            self.assertRaises(ValueError, ctx.set_start_method, None)
            self.check_context(ctx)

    def test_set_get(self):
        multiprocessing.set_forkserver_preload(PRELOAD)
        # Number of start methods that could actually be exercised.
        count = 0
        old_method = multiprocessing.get_start_method()
        try:
            for method in ('fork', 'spawn', 'forkserver'):
                try:
                    multiprocessing.set_start_method(method, force=True)
                except ValueError:
                    continue
                self.assertEqual(multiprocessing.get_start_method(), method)
                ctx = multiprocessing.get_context()
                self.assertEqual(ctx.get_start_method(), method)
                # Context and Process class names start with the method name.
                self.assertTrue(type(ctx).__name__.lower().startswith(method))
                self.assertTrue(
                    ctx.Process.__name__.lower().startswith(method))
                self.check_context(multiprocessing)
                count += 1
        finally:
            multiprocessing.set_start_method(old_method, force=True)
        self.assertGreaterEqual(count, 1)

    def test_get_all(self):
        methods = multiprocessing.get_all_start_methods()
        if sys.platform == 'win32':
            self.assertEqual(methods, ['spawn'])
        else:
            # Either of 'fork'/'spawn' may come first; 'forkserver' is
            # optional and, when present, comes last.
            self.assertTrue(methods == ['fork', 'spawn'] or
                            methods == ['spawn', 'fork'] or
                            methods == ['fork', 'spawn', 'forkserver'] or
                            methods == ['spawn', 'fork', 'forkserver'])

    def test_preload_resources(self):
        if multiprocessing.get_start_method() != 'forkserver':
            self.skipTest("test only relevant for 'forkserver' method")
        name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
        rc, out, err = test.support.script_helper.assert_python_ok(name)
        out = out.decode()
        err = err.decode()
        if out.rstrip() != 'ok' or err != '':
            # Show the script's output to help diagnose the failure.
            print(out)
            print(err)
            self.fail("failed spawning forkserver or grandchild")


@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
class TestResourceTracker(unittest.TestCase):

    def test_resource_tracker(self):
        #
        # Check that killing process does not leak named semaphores
        #
        # Script run in a subprocess: it creates two resources of the
        # requested type, writes their names to fd {w}, then sleeps so the
        # parent can kill it.
        cmd = '''if 1:
            import time, os, tempfile
            import multiprocessing as mp
            from multiprocessing import resource_tracker
            from multiprocessing.shared_memory import SharedMemory

            mp.set_start_method("spawn")
            rand = tempfile._RandomNameSequence()


            def create_and_register_resource(rtype):
                if rtype == "semaphore":
                    lock = mp.Lock()
                    return lock, lock._semlock.name
                elif rtype == "shared_memory":
                    sm = SharedMemory(create=True, size=10)
                    return sm, sm._name
                else:
                    raise ValueError(
                        "Resource type {{}} not understood".format(rtype))


            resource1, rname1 = create_and_register_resource("{rtype}")
            resource2, rname2 = create_and_register_resource("{rtype}")

            os.write({w}, rname1.encode("ascii") + b"\\n")
            os.write({w}, rname2.encode("ascii") + b"\\n")

            time.sleep(10)
        '''
        for rtype in resource_tracker._CLEANUP_FUNCS:
            with self.subTest(rtype=rtype):
                if rtype == "noop":
                    # Artefact resource type used by the resource_tracker
                    continue
                r, w = os.pipe()
                p = subprocess.Popen([sys.executable,
                                     '-E', '-c', cmd.format(w=w, rtype=rtype)],
                                     pass_fds=[w],
                                     stderr=subprocess.PIPE)
                os.close(w)
                with open(r, 'rb', closefd=True) as f:
                    name1 = f.readline().rstrip().decode('ascii')
                    name2 = f.readline().rstrip().decode('ascii')
                # Remove the first resource ourselves, then kill the
                # subprocess that created both.
                _resource_unlink(name1, rtype)
                p.terminate()
                p.wait()

                # Poll until unlinking the second resource fails, i.e.
                # until something else has already removed it.
                deadline = time.monotonic() + support.LONG_TIMEOUT
                while time.monotonic() < deadline:
                    time.sleep(.5)
                    try:
                        _resource_unlink(name2, rtype)
                    except OSError as e:
                        # docs say it should be ENOENT, but OSX seems to give
                        # EINVAL
                        self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
                        break
                else:
                    # The loop ran out of time without the unlink failing.
                    raise AssertionError(
                        f"A {rtype} resource was leaked after a process was "
                        f"abruptly terminated.")
                err = p.stderr.read().decode('utf-8')
                p.stderr.close()
                # The subprocess' stderr must mention both leaked resources
                # and an error for the one that was unlinked by the test.
                expected = ('resource_tracker: There appear to be 2 leaked {} '
                            'objects'.format(
                            rtype))
                self.assertRegex(err, expected)
                self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)

    def check_resource_tracker_death(self, signum, should_die):
        # bpo-31310: if the semaphore tracker process has died, it should
        # be restarted implicitly.
        #
        # `signum` is sent to a freshly started tracker; `should_die` tells
        # whether a "process died" warning is expected afterwards.
        from multiprocessing.resource_tracker import _resource_tracker
        pid = _resource_tracker._pid
        if pid is not None:
            # Start from a clean state: kill any tracker already running.
            os.kill(pid, signal.SIGKILL)
            support.wait_process(pid, exitcode=-signal.SIGKILL)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            _resource_tracker.ensure_running()
        pid = _resource_tracker._pid

        os.kill(pid, signum)
        time.sleep(1.0)  # give it time to die

        ctx = multiprocessing.get_context("spawn")
        with warnings.catch_warnings(record=True) as all_warn:
            warnings.simplefilter("always")
            sem = ctx.Semaphore()
            sem.acquire()
            sem.release()
            wr = weakref.ref(sem)
            # ensure `sem` gets collected, which triggers communication with
            # the semaphore tracker
            del sem
            gc.collect()
            self.assertIsNone(wr())
            if should_die:
                self.assertEqual(len(all_warn), 1)
                the_warn = all_warn[0]
                self.assertTrue(issubclass(the_warn.category, UserWarning))
                self.assertTrue("resource_tracker: process died"
                                in str(the_warn.message))
            else:
                self.assertEqual(len(all_warn), 0)

    def test_resource_tracker_sigint(self):
        # Catchable signal (ignored by semaphore tracker)
        self.check_resource_tracker_death(signal.SIGINT, False)

    def test_resource_tracker_sigterm(self):
        # Catchable signal (ignored by semaphore tracker)
        self.check_resource_tracker_death(signal.SIGTERM, False)

    def test_resource_tracker_sigkill(self):
        # Uncatchable signal.
        self.check_resource_tracker_death(signal.SIGKILL, True)

    @staticmethod
    def _is_resource_tracker_reused(conn, pid):
        # Child: send True over `conn` if this process did not start a
        # new tracker (`pid` is the parent's tracker pid) and the tracker
        # is alive.
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker.ensure_running()
        # The pid should be None in the child process, except for the fork
        # context. It should not be a new value.
        reused = _resource_tracker._pid in (None, pid)
        reused &= _resource_tracker._check_alive()
        conn.send(reused)

    def test_resource_tracker_reused(self):
        from multiprocessing.resource_tracker import _resource_tracker
        _resource_tracker.ensure_running()
        pid = _resource_tracker._pid

        r, w = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=self._is_resource_tracker_reused,
                                    args=(w, pid))
        p.start()
        is_resource_tracker_reused = r.recv()

        # Clean up
        p.join()
        w.close()
        r.close()

        self.assertTrue(is_resource_tracker_reused)


class TestSimpleQueue(unittest.TestCase):

    @classmethod
    def _test_empty(cls, queue, child_can_start, parent_can_continue):
        # Child: once released by the parent, put two results of
        # queue.empty() on the queue (before and after the first put).
        child_can_start.wait()
        # issue 30301, could fail under spawn and forkserver
        try:
            queue.put(queue.empty())
            queue.put(queue.empty())
        finally:
            # Always release the parent, even if the puts failed.
            parent_can_continue.set()

    def test_empty(self):
        queue = multiprocessing.SimpleQueue()
        child_can_start = multiprocessing.Event()
        parent_can_continue = multiprocessing.Event()

        proc = multiprocessing.Process(
            target=self._test_empty,
            args=(queue, child_can_start, parent_can_continue)
        )
        proc.daemon = True
        proc.start()

        # The child is still held back, so nothing has been queued yet.
        self.assertTrue(queue.empty())

        child_can_start.set()
        parent_can_continue.wait()

        self.assertFalse(queue.empty())
        # The child saw an empty queue before its first put and a
        # non-empty one before its second.
        self.assertEqual(queue.get(), True)
        self.assertEqual(queue.get(), False)
        self.assertTrue(queue.empty())

        proc.join()

    def test_close(self):
        queue = multiprocessing.SimpleQueue()
        queue.close()
        # closing a queue twice should not fail
        queue.close()

    # Test specific to CPython since it tests private attributes
    @test.support.cpython_only
    def test_closed(self):
        queue = multiprocessing.SimpleQueue()
        queue.close()
        self.assertTrue(queue._reader.closed)
        self.assertTrue(queue._writer.closed)


class TestPoolNotLeakOnFailure(unittest.TestCase):

    def test_release_unused_processes(self):
        # Issue #19675: During pool creation, if we can't create a process,
        # don't leak already created ones.
        will_fail_in = 3
        forked_processes = []

        class FailingForkProcess:
            # Stand-in for a Process class: the first `will_fail_in`
            # start() calls succeed, any later one raises OSError.  Each
            # instance tracks a simple life-cycle in `state`.
            def __init__(self, **kwargs):
                self.name = 'Fake Process'
                self.exitcode = None
                self.state = None
                forked_processes.append(self)

            def start(self):
                nonlocal will_fail_in
                if will_fail_in <= 0:
                    raise OSError("Manually induced OSError")
                will_fail_in -= 1
                self.state = 'started'

            def terminate(self):
                self.state = 'stopping'

            def join(self):
                if self.state == 'stopping':
                    self.state = 'stopped'

            def is_alive(self):
                return self.state == 'started' or self.state == 'stopping'

        with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
            p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
                Process=FailingForkProcess))
            p.close()
            p.join()
        # NOTE(review): this check has to sit after the `with` block;
        # inside it, it would never run once Pool() raises.  Confirm
        # against the original indentation.
        self.assertFalse(
            any(process.is_alive() for process in forked_processes))


@hashlib_helper.requires_hashdigest('md5')
class TestSyncManagerTypes(unittest.TestCase):
    """Test all the types which can be shared between a parent and a
    child process by using a manager which acts as an intermediary
    between them.

    In the following unit-tests the base type is created in the parent
    process, the @classmethod represents the worker process and the
    shared object is readable and editable between the two.

    # The child.
    @classmethod
    def _test_list(cls, obj):
        assert obj[0] == 5
        obj.append(6)

    # The parent.
    def test_list(self):
        o = self.manager.list()
        o.append(5)
        self.run_worker(self._test_list, o)
        assert o[1] == 6
    """
    # Manager class under test.
    manager_class = multiprocessing.managers.SyncManager

    def setUp(self):
        self.manager = self.manager_class()
        self.manager.start()
        # Worker process of the current test; set by run_worker().
        self.proc = None

    def tearDown(self):
        # Make sure a worker that is still running does not outlive the
        # test, then stop the manager.
        if self.proc is not None and self.proc.is_alive():
            self.proc.terminate()
            self.proc.join()
        self.manager.shutdown()
        self.manager = None
        self.proc = None

    @classmethod
    def setUpClass(cls):
        support.reap_children()

    # Children are reaped both before and after the class runs.
    tearDownClass = setUpClass

    def wait_proc_exit(self):
        # Only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395).
5374 join_process(self.proc) 5375 start_time = time.monotonic() 5376 t = 0.01 5377 while len(multiprocessing.active_children()) > 1: 5378 time.sleep(t) 5379 t *= 2 5380 dt = time.monotonic() - start_time 5381 if dt >= 5.0: 5382 test.support.environment_altered = True 5383 support.print_warning(f"multiprocessing.Manager still has " 5384 f"{multiprocessing.active_children()} " 5385 f"active children after {dt} seconds") 5386 break 5387 5388 def run_worker(self, worker, obj): 5389 self.proc = multiprocessing.Process(target=worker, args=(obj, )) 5390 self.proc.daemon = True 5391 self.proc.start() 5392 self.wait_proc_exit() 5393 self.assertEqual(self.proc.exitcode, 0) 5394 5395 @classmethod 5396 def _test_event(cls, obj): 5397 assert obj.is_set() 5398 obj.wait() 5399 obj.clear() 5400 obj.wait(0.001) 5401 5402 def test_event(self): 5403 o = self.manager.Event() 5404 o.set() 5405 self.run_worker(self._test_event, o) 5406 assert not o.is_set() 5407 o.wait(0.001) 5408 5409 @classmethod 5410 def _test_lock(cls, obj): 5411 obj.acquire() 5412 5413 def test_lock(self, lname="Lock"): 5414 o = getattr(self.manager, lname)() 5415 self.run_worker(self._test_lock, o) 5416 o.release() 5417 self.assertRaises(RuntimeError, o.release) # already released 5418 5419 @classmethod 5420 def _test_rlock(cls, obj): 5421 obj.acquire() 5422 obj.release() 5423 5424 def test_rlock(self, lname="Lock"): 5425 o = getattr(self.manager, lname)() 5426 self.run_worker(self._test_rlock, o) 5427 5428 @classmethod 5429 def _test_semaphore(cls, obj): 5430 obj.acquire() 5431 5432 def test_semaphore(self, sname="Semaphore"): 5433 o = getattr(self.manager, sname)() 5434 self.run_worker(self._test_semaphore, o) 5435 o.release() 5436 5437 def test_bounded_semaphore(self): 5438 self.test_semaphore(sname="BoundedSemaphore") 5439 5440 @classmethod 5441 def _test_condition(cls, obj): 5442 obj.acquire() 5443 obj.release() 5444 5445 def test_condition(self): 5446 o = self.manager.Condition() 5447 
self.run_worker(self._test_condition, o) 5448 5449 @classmethod 5450 def _test_barrier(cls, obj): 5451 assert obj.parties == 5 5452 obj.reset() 5453 5454 def test_barrier(self): 5455 o = self.manager.Barrier(5) 5456 self.run_worker(self._test_barrier, o) 5457 5458 @classmethod 5459 def _test_pool(cls, obj): 5460 # TODO: fix https://bugs.python.org/issue35919 5461 with obj: 5462 pass 5463 5464 def test_pool(self): 5465 o = self.manager.Pool(processes=4) 5466 self.run_worker(self._test_pool, o) 5467 5468 @classmethod 5469 def _test_queue(cls, obj): 5470 assert obj.qsize() == 2 5471 assert obj.full() 5472 assert not obj.empty() 5473 assert obj.get() == 5 5474 assert not obj.empty() 5475 assert obj.get() == 6 5476 assert obj.empty() 5477 5478 def test_queue(self, qname="Queue"): 5479 o = getattr(self.manager, qname)(2) 5480 o.put(5) 5481 o.put(6) 5482 self.run_worker(self._test_queue, o) 5483 assert o.empty() 5484 assert not o.full() 5485 5486 def test_joinable_queue(self): 5487 self.test_queue("JoinableQueue") 5488 5489 @classmethod 5490 def _test_list(cls, obj): 5491 assert obj[0] == 5 5492 assert obj.count(5) == 1 5493 assert obj.index(5) == 0 5494 obj.sort() 5495 obj.reverse() 5496 for x in obj: 5497 pass 5498 assert len(obj) == 1 5499 assert obj.pop(0) == 5 5500 5501 def test_list(self): 5502 o = self.manager.list() 5503 o.append(5) 5504 self.run_worker(self._test_list, o) 5505 assert not o 5506 self.assertEqual(len(o), 0) 5507 5508 @classmethod 5509 def _test_dict(cls, obj): 5510 assert len(obj) == 1 5511 assert obj['foo'] == 5 5512 assert obj.get('foo') == 5 5513 assert list(obj.items()) == [('foo', 5)] 5514 assert list(obj.keys()) == ['foo'] 5515 assert list(obj.values()) == [5] 5516 assert obj.copy() == {'foo': 5} 5517 assert obj.popitem() == ('foo', 5) 5518 5519 def test_dict(self): 5520 o = self.manager.dict() 5521 o['foo'] = 5 5522 self.run_worker(self._test_dict, o) 5523 assert not o 5524 self.assertEqual(len(o), 0) 5525 5526 @classmethod 5527 def 
_test_value(cls, obj): 5528 assert obj.value == 1 5529 assert obj.get() == 1 5530 obj.set(2) 5531 5532 def test_value(self): 5533 o = self.manager.Value('i', 1) 5534 self.run_worker(self._test_value, o) 5535 self.assertEqual(o.value, 2) 5536 self.assertEqual(o.get(), 2) 5537 5538 @classmethod 5539 def _test_array(cls, obj): 5540 assert obj[0] == 0 5541 assert obj[1] == 1 5542 assert len(obj) == 2 5543 assert list(obj) == [0, 1] 5544 5545 def test_array(self): 5546 o = self.manager.Array('i', [0, 1]) 5547 self.run_worker(self._test_array, o) 5548 5549 @classmethod 5550 def _test_namespace(cls, obj): 5551 assert obj.x == 0 5552 assert obj.y == 1 5553 5554 def test_namespace(self): 5555 o = self.manager.Namespace() 5556 o.x = 0 5557 o.y = 1 5558 self.run_worker(self._test_namespace, o) 5559 5560 5561class MiscTestCase(unittest.TestCase): 5562 def test__all__(self): 5563 # Just make sure names in blacklist are excluded 5564 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__, 5565 blacklist=['SUBDEBUG', 'SUBWARNING']) 5566# 5567# Mixins 5568# 5569 5570class BaseMixin(object): 5571 @classmethod 5572 def setUpClass(cls): 5573 cls.dangling = (multiprocessing.process._dangling.copy(), 5574 threading._dangling.copy()) 5575 5576 @classmethod 5577 def tearDownClass(cls): 5578 # bpo-26762: Some multiprocessing objects like Pool create reference 5579 # cycles. Trigger a garbage collection to break these cycles. 
5580 test.support.gc_collect() 5581 5582 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) 5583 if processes: 5584 test.support.environment_altered = True 5585 support.print_warning(f'Dangling processes: {processes}') 5586 processes = None 5587 5588 threads = set(threading._dangling) - set(cls.dangling[1]) 5589 if threads: 5590 test.support.environment_altered = True 5591 support.print_warning(f'Dangling threads: {threads}') 5592 threads = None 5593 5594 5595class ProcessesMixin(BaseMixin): 5596 TYPE = 'processes' 5597 Process = multiprocessing.Process 5598 connection = multiprocessing.connection 5599 current_process = staticmethod(multiprocessing.current_process) 5600 parent_process = staticmethod(multiprocessing.parent_process) 5601 active_children = staticmethod(multiprocessing.active_children) 5602 Pool = staticmethod(multiprocessing.Pool) 5603 Pipe = staticmethod(multiprocessing.Pipe) 5604 Queue = staticmethod(multiprocessing.Queue) 5605 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 5606 Lock = staticmethod(multiprocessing.Lock) 5607 RLock = staticmethod(multiprocessing.RLock) 5608 Semaphore = staticmethod(multiprocessing.Semaphore) 5609 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 5610 Condition = staticmethod(multiprocessing.Condition) 5611 Event = staticmethod(multiprocessing.Event) 5612 Barrier = staticmethod(multiprocessing.Barrier) 5613 Value = staticmethod(multiprocessing.Value) 5614 Array = staticmethod(multiprocessing.Array) 5615 RawValue = staticmethod(multiprocessing.RawValue) 5616 RawArray = staticmethod(multiprocessing.RawArray) 5617 5618 5619class ManagerMixin(BaseMixin): 5620 TYPE = 'manager' 5621 Process = multiprocessing.Process 5622 Queue = property(operator.attrgetter('manager.Queue')) 5623 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 5624 Lock = property(operator.attrgetter('manager.Lock')) 5625 RLock = property(operator.attrgetter('manager.RLock')) 5626 
Semaphore = property(operator.attrgetter('manager.Semaphore')) 5627 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 5628 Condition = property(operator.attrgetter('manager.Condition')) 5629 Event = property(operator.attrgetter('manager.Event')) 5630 Barrier = property(operator.attrgetter('manager.Barrier')) 5631 Value = property(operator.attrgetter('manager.Value')) 5632 Array = property(operator.attrgetter('manager.Array')) 5633 list = property(operator.attrgetter('manager.list')) 5634 dict = property(operator.attrgetter('manager.dict')) 5635 Namespace = property(operator.attrgetter('manager.Namespace')) 5636 5637 @classmethod 5638 def Pool(cls, *args, **kwds): 5639 return cls.manager.Pool(*args, **kwds) 5640 5641 @classmethod 5642 def setUpClass(cls): 5643 super().setUpClass() 5644 cls.manager = multiprocessing.Manager() 5645 5646 @classmethod 5647 def tearDownClass(cls): 5648 # only the manager process should be returned by active_children() 5649 # but this can take a bit on slow machines, so wait a few seconds 5650 # if there are other children too (see #17395) 5651 start_time = time.monotonic() 5652 t = 0.01 5653 while len(multiprocessing.active_children()) > 1: 5654 time.sleep(t) 5655 t *= 2 5656 dt = time.monotonic() - start_time 5657 if dt >= 5.0: 5658 test.support.environment_altered = True 5659 support.print_warning(f"multiprocessing.Manager still has " 5660 f"{multiprocessing.active_children()} " 5661 f"active children after {dt} seconds") 5662 break 5663 5664 gc.collect() # do garbage collection 5665 if cls.manager._number_of_objects() != 0: 5666 # This is not really an error since some tests do not 5667 # ensure that all processes which hold a reference to a 5668 # managed object have been joined. 
5669 test.support.environment_altered = True 5670 support.print_warning('Shared objects which still exist ' 5671 'at manager shutdown:') 5672 support.print_warning(cls.manager._debug_info()) 5673 cls.manager.shutdown() 5674 cls.manager.join() 5675 cls.manager = None 5676 5677 super().tearDownClass() 5678 5679 5680class ThreadsMixin(BaseMixin): 5681 TYPE = 'threads' 5682 Process = multiprocessing.dummy.Process 5683 connection = multiprocessing.dummy.connection 5684 current_process = staticmethod(multiprocessing.dummy.current_process) 5685 active_children = staticmethod(multiprocessing.dummy.active_children) 5686 Pool = staticmethod(multiprocessing.dummy.Pool) 5687 Pipe = staticmethod(multiprocessing.dummy.Pipe) 5688 Queue = staticmethod(multiprocessing.dummy.Queue) 5689 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 5690 Lock = staticmethod(multiprocessing.dummy.Lock) 5691 RLock = staticmethod(multiprocessing.dummy.RLock) 5692 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 5693 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 5694 Condition = staticmethod(multiprocessing.dummy.Condition) 5695 Event = staticmethod(multiprocessing.dummy.Event) 5696 Barrier = staticmethod(multiprocessing.dummy.Barrier) 5697 Value = staticmethod(multiprocessing.dummy.Value) 5698 Array = staticmethod(multiprocessing.dummy.Array) 5699 5700# 5701# Functions used to create test cases from the base ones in this module 5702# 5703 5704def install_tests_in_module_dict(remote_globs, start_method): 5705 __module__ = remote_globs['__name__'] 5706 local_globs = globals() 5707 ALL_TYPES = {'processes', 'threads', 'manager'} 5708 5709 for name, base in local_globs.items(): 5710 if not isinstance(base, type): 5711 continue 5712 if issubclass(base, BaseTestCase): 5713 if base is BaseTestCase: 5714 continue 5715 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 5716 for type_ in base.ALLOWED_TYPES: 5717 newname = 'With' + 
type_.capitalize() + name[1:] 5718 Mixin = local_globs[type_.capitalize() + 'Mixin'] 5719 class Temp(base, Mixin, unittest.TestCase): 5720 pass 5721 if type_ == 'manager': 5722 Temp = hashlib_helper.requires_hashdigest('md5')(Temp) 5723 Temp.__name__ = Temp.__qualname__ = newname 5724 Temp.__module__ = __module__ 5725 remote_globs[newname] = Temp 5726 elif issubclass(base, unittest.TestCase): 5727 class Temp(base, object): 5728 pass 5729 Temp.__name__ = Temp.__qualname__ = name 5730 Temp.__module__ = __module__ 5731 remote_globs[name] = Temp 5732 5733 dangling = [None, None] 5734 old_start_method = [None] 5735 5736 def setUpModule(): 5737 multiprocessing.set_forkserver_preload(PRELOAD) 5738 multiprocessing.process._cleanup() 5739 dangling[0] = multiprocessing.process._dangling.copy() 5740 dangling[1] = threading._dangling.copy() 5741 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 5742 try: 5743 multiprocessing.set_start_method(start_method, force=True) 5744 except ValueError: 5745 raise unittest.SkipTest(start_method + 5746 ' start method not supported') 5747 5748 if sys.platform.startswith("linux"): 5749 try: 5750 lock = multiprocessing.RLock() 5751 except OSError: 5752 raise unittest.SkipTest("OSError raises on RLock creation, " 5753 "see issue 3111!") 5754 check_enough_semaphores() 5755 util.get_temp_dir() # creates temp directory 5756 multiprocessing.get_logger().setLevel(LOG_LEVEL) 5757 5758 def tearDownModule(): 5759 need_sleep = False 5760 5761 # bpo-26762: Some multiprocessing objects like Pool create reference 5762 # cycles. Trigger a garbage collection to break these cycles. 
5763 test.support.gc_collect() 5764 5765 multiprocessing.set_start_method(old_start_method[0], force=True) 5766 # pause a bit so we don't get warning about dangling threads/processes 5767 processes = set(multiprocessing.process._dangling) - set(dangling[0]) 5768 if processes: 5769 need_sleep = True 5770 test.support.environment_altered = True 5771 support.print_warning(f'Dangling processes: {processes}') 5772 processes = None 5773 5774 threads = set(threading._dangling) - set(dangling[1]) 5775 if threads: 5776 need_sleep = True 5777 test.support.environment_altered = True 5778 support.print_warning(f'Dangling threads: {threads}') 5779 threads = None 5780 5781 # Sleep 500 ms to give time to child processes to complete. 5782 if need_sleep: 5783 time.sleep(0.5) 5784 5785 multiprocessing.util._cleanup_tests() 5786 5787 remote_globs['setUpModule'] = setUpModule 5788 remote_globs['tearDownModule'] = tearDownModule 5789