"""Unit tests for contextlib.py, and other context managers."""

import io
import os
import sys
import tempfile
import threading
import unittest
from contextlib import *  # Tests __all__
from test import support
import weakref


# Using an object that lacks __enter__/__exit__ as a context manager raises
# AttributeError up to Python 3.10 and TypeError from Python 3.11 onwards.
# The tests below accept either so they pass on both sides of that change.
_NOT_A_CONTEXT_MANAGER_ERRORS = (AttributeError, TypeError)


class TestAbstractContextManager(unittest.TestCase):
    """Tests for the AbstractContextManager abstract base class."""

    def test_enter(self):
        # The inherited __enter__() must return the manager itself.
        class DefaultEnter(AbstractContextManager):
            def __exit__(self, *args):
                super().__exit__(*args)

        manager = DefaultEnter()
        self.assertIs(manager.__enter__(), manager)

    def test_exit_is_abstract(self):
        # A subclass that does not define __exit__() cannot be instantiated.
        class MissingExit(AbstractContextManager):
            pass

        with self.assertRaises(TypeError):
            MissingExit()

    def test_structural_subclassing(self):
        # issubclass() is decided by the presence of __enter__ and __exit__;
        # setting either of them to None opts the class out again.
        class ManagerFromScratch:
            def __enter__(self):
                return self
            def __exit__(self, exc_type, exc_value, traceback):
                return None

        self.assertTrue(issubclass(ManagerFromScratch, AbstractContextManager))

        class DefaultEnter(AbstractContextManager):
            def __exit__(self, *args):
                super().__exit__(*args)

        self.assertTrue(issubclass(DefaultEnter, AbstractContextManager))

        class NoEnter(ManagerFromScratch):
            __enter__ = None

        self.assertFalse(issubclass(NoEnter, AbstractContextManager))

        class NoExit(ManagerFromScratch):
            __exit__ = None

        self.assertFalse(issubclass(NoExit, AbstractContextManager))


class ContextManagerTestCase(unittest.TestCase):
    """Tests for the @contextmanager decorator."""

    def test_contextmanager_plain(self):
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            yield 42
            state.append(999)
        with woohoo() as x:
            self.assertEqual(state, [1])
            self.assertEqual(x, 42)
            state.append(x)
        self.assertEqual(state, [1, 42, 999])

    def test_contextmanager_finally(self):
        # The generator's finally clause runs even when the body raises.
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            try:
                yield 42
            finally:
                state.append(999)
        with self.assertRaises(ZeroDivisionError):
            with woohoo() as x:
                self.assertEqual(state, [1])
                self.assertEqual(x, 42)
                state.append(x)
                raise ZeroDivisionError()
        self.assertEqual(state, [1, 42, 999])

    def test_contextmanager_no_reraise(self):
        @contextmanager
        def whee():
            yield
        ctx = whee()
        ctx.__enter__()
        # Calling __exit__ should not result in an exception
        self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))

    def test_contextmanager_trap_yield_after_throw(self):
        # A generator that yields again after an exception was thrown into
        # it is reported with RuntimeError.
        @contextmanager
        def whoo():
            try:
                yield
            except:
                yield
        ctx = whoo()
        ctx.__enter__()
        self.assertRaises(
            RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
        )

    def test_contextmanager_except(self):
        # An exception handled inside the generator is suppressed.
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            try:
                yield 42
            except ZeroDivisionError as e:
                state.append(e.args[0])
                self.assertEqual(state, [1, 42, 999])
        with woohoo() as x:
            self.assertEqual(state, [1])
            self.assertEqual(x, 42)
            state.append(x)
            raise ZeroDivisionError(999)
        self.assertEqual(state, [1, 42, 999])

    def test_contextmanager_except_stopiter(self):
        stop_exc = StopIteration('spam')
        @contextmanager
        def woohoo():
            yield
        try:
            # The StopIteration propagates through assertWarnsRegex, which
            # only checks for the warning when its body exits normally.
            with self.assertWarnsRegex(DeprecationWarning,
                                       "StopIteration"):
                with woohoo():
                    raise stop_exc
        except Exception as ex:
            self.assertIs(ex, stop_exc)
        else:
            self.fail('StopIteration was suppressed')

    def test_contextmanager_except_pep479(self):
        code = """\
from __future__ import generator_stop
from contextlib import contextmanager
@contextmanager
def woohoo():
    yield
"""
        # Named "namespace" rather than "locals" to avoid shadowing the
        # builtin of that name.
        namespace = {}
        exec(code, namespace, namespace)
        woohoo = namespace['woohoo']

        stop_exc = StopIteration('spam')
        try:
            with woohoo():
                raise stop_exc
        except Exception as ex:
            self.assertIs(ex, stop_exc)
        else:
            self.fail('StopIteration was suppressed')

    def test_contextmanager_do_not_unchain_non_stopiteration_exceptions(self):
        @contextmanager
        def test_issue29692():
            try:
                yield
            except Exception as exc:
                raise RuntimeError('issue29692:Chained') from exc
        try:
            with test_issue29692():
                raise ZeroDivisionError
        except Exception as ex:
            self.assertIs(type(ex), RuntimeError)
            self.assertEqual(ex.args[0], 'issue29692:Chained')
            self.assertIsInstance(ex.__cause__, ZeroDivisionError)
        else:
            # Without this branch a swallowed exception would pass silently.
            self.fail('Expected RuntimeError, but no exception was raised')

        try:
            with test_issue29692():
                raise StopIteration('issue29692:Unchained')
        except Exception as ex:
            self.assertIs(type(ex), StopIteration)
            self.assertEqual(ex.args[0], 'issue29692:Unchained')
            self.assertIsNone(ex.__cause__)
        else:
            self.fail('Expected StopIteration, but no exception was raised')

    def _create_contextmanager_attribs(self):
        """Return a @contextmanager function carrying a docstring and a
        custom ``foo`` attribute, for the metadata propagation tests."""
        def attribs(**kw):
            def decorate(func):
                for k, v in kw.items():
                    setattr(func, k, v)
                return func
            return decorate
        @contextmanager
        @attribs(foo='bar')
        def baz(spam):
            """Whee!"""
        return baz

    def test_contextmanager_attribs(self):
        baz = self._create_contextmanager_attribs()
        self.assertEqual(baz.__name__, 'baz')
        self.assertEqual(baz.foo, 'bar')

    @support.requires_docstrings
    def test_contextmanager_doc_attrib(self):
        baz = self._create_contextmanager_attribs()
        self.assertEqual(baz.__doc__, "Whee!")

    @support.requires_docstrings
    def test_instance_docstring_given_cm_docstring(self):
        baz = self._create_contextmanager_attribs()(None)
        self.assertEqual(baz.__doc__, "Whee!")

    def test_keywords(self):
        # Ensure no keyword arguments are inhibited
        @contextmanager
        def woohoo(self, func, args, kwds):
            yield (self, func, args, kwds)
        with woohoo(self=11, func=22, args=33, kwds=44) as target:
            self.assertEqual(target, (11, 22, 33, 44))

    def test_nokeepref(self):
        # The context manager must not keep its call arguments alive once
        # the generator has started running.
        class A:
            pass

        @contextmanager
        def woohoo(a, b):
            a = weakref.ref(a)
            b = weakref.ref(b)
            self.assertIsNone(a())
            self.assertIsNone(b())
            yield

        with woohoo(A(), b=A()):
            pass

    def test_param_errors(self):
        # Bad call signatures are reported when the manager is created.
        @contextmanager
        def woohoo(a, *, b):
            yield

        with self.assertRaises(TypeError):
            woohoo()
        with self.assertRaises(TypeError):
            woohoo(3, 5)
        with self.assertRaises(TypeError):
            woohoo(b=3)

    def test_recursive(self):
        # A function decorated with a @contextmanager instance can recurse.
        depth = 0
        @contextmanager
        def woohoo():
            nonlocal depth
            before = depth
            depth += 1
            yield
            depth -= 1
            self.assertEqual(depth, before)

        @woohoo()
        def recursive():
            if depth < 10:
                recursive()

        recursive()
        self.assertEqual(depth, 0)


class ClosingTestCase(unittest.TestCase):
    """Tests for contextlib.closing."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = closing.__doc__
        obj = closing(None)
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_closing(self):
        state = []
        class C:
            def close(self):
                state.append(1)
        x = C()
        self.assertEqual(state, [])
        with closing(x) as y:
            self.assertEqual(x, y)
        self.assertEqual(state, [1])

    def test_closing_error(self):
        # close() is still called when the body raises.
        state = []
        class C:
            def close(self):
                state.append(1)
        x = C()
        self.assertEqual(state, [])
        with self.assertRaises(ZeroDivisionError):
            with closing(x) as y:
                self.assertEqual(x, y)
                1 / 0
        self.assertEqual(state, [1])


class NullcontextTestCase(unittest.TestCase):
    """Tests for contextlib.nullcontext."""

    def test_nullcontext(self):
        class C:
            pass
        c = C()
        with nullcontext(c) as c_in:
            self.assertIs(c_in, c)


class FileContextTestCase(unittest.TestCase):
    """Tests for file objects used as context managers."""

    def testWithOpen(self):
        # tempfile.mktemp() is deprecated and race-prone (the name can be
        # claimed by someone else before we create the file), so the file
        # lives in a private temporary directory that cleans itself up.
        with tempfile.TemporaryDirectory() as tmpdir:
            tfn = os.path.join(tmpdir, "booh.txt")
            f = None
            with open(tfn, "w", encoding="utf-8") as f:
                self.assertFalse(f.closed)
                f.write("Booh\n")
            self.assertTrue(f.closed)
            f = None
            with self.assertRaises(ZeroDivisionError):
                with open(tfn, "r", encoding="utf-8") as f:
                    self.assertFalse(f.closed)
                    self.assertEqual(f.read(), "Booh\n")
                    1 / 0
            self.assertTrue(f.closed)


class LockContextTestCase(unittest.TestCase):
    """Tests for threading synchronization objects as context managers."""

    def boilerPlate(self, lock, locked):
        """Check that *lock* is held inside ``with`` and released after it.

        *locked* is a zero-argument callable reporting whether *lock* is
        currently held.  Both a normal exit and an exit caused by an
        exception are exercised.
        """
        self.assertFalse(locked())
        with lock:
            self.assertTrue(locked())
        self.assertFalse(locked())
        with self.assertRaises(ZeroDivisionError):
            with lock:
                self.assertTrue(locked())
                1 / 0
        self.assertFalse(locked())

    def testWithLock(self):
        lock = threading.Lock()
        self.boilerPlate(lock, lock.locked)

    def testWithRLock(self):
        lock = threading.RLock()
        self.boilerPlate(lock, lock._is_owned)

    def testWithCondition(self):
        lock = threading.Condition()
        def locked():
            return lock._is_owned()
        self.boilerPlate(lock, locked)

    def testWithSemaphore(self):
        lock = threading.Semaphore()
        def locked():
            # A non-blocking acquire succeeds only if the semaphore is free;
            # undo it immediately so the probe has no lasting effect.
            if lock.acquire(False):
                lock.release()
                return False
            else:
                return True
        self.boilerPlate(lock, locked)

    def testWithBoundedSemaphore(self):
        lock = threading.BoundedSemaphore()
        def locked():
            # Same non-blocking probe as in testWithSemaphore.
            if lock.acquire(False):
                lock.release()
                return False
            else:
                return True
        self.boilerPlate(lock, locked)


class mycontext(ContextDecorator):
    """Example decoration-compatible context manager for testing"""
    started = False  # set to True by __enter__()
    exc = None       # the exception details tuple passed to __exit__()
    catch = False    # returned from __exit__(); True suppresses exceptions

    def __enter__(self):
        self.started = True
        return self

    def __exit__(self, *exc):
        self.exc = exc
        return self.catch


class TestContextDecorator(unittest.TestCase):
    """Tests for ContextDecorator and @contextmanager used as decorators."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = mycontext.__doc__
        obj = mycontext()
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_contextdecorator(self):
        context = mycontext()
        with context as result:
            self.assertIs(result, context)
            self.assertTrue(context.started)

        self.assertEqual(context.exc, (None, None, None))

    def test_contextdecorator_with_exception(self):
        context = mycontext()

        with self.assertRaisesRegex(NameError, 'foo'):
            with context:
                raise NameError('foo')
        self.assertIsNotNone(context.exc)
        self.assertIs(context.exc[0], NameError)

        # With catch=True the exception is suppressed by __exit__().
        context = mycontext()
        context.catch = True
        with context:
            raise NameError('foo')
        self.assertIsNotNone(context.exc)
        self.assertIs(context.exc[0], NameError)

    def test_decorator(self):
        context = mycontext()

        @context
        def test():
            self.assertIsNone(context.exc)
            self.assertTrue(context.started)
        test()
        self.assertEqual(context.exc, (None, None, None))

    def test_decorator_with_exception(self):
        context = mycontext()

        @context
        def test():
            self.assertIsNone(context.exc)
            self.assertTrue(context.started)
            raise NameError('foo')

        with self.assertRaisesRegex(NameError, 'foo'):
            test()
        self.assertIsNotNone(context.exc)
        self.assertIs(context.exc[0], NameError)

    def test_decorating_method(self):
        context = mycontext()

        class Test(object):

            @context
            def method(self, a, b, c=None):
                self.a = a
                self.b = b
                self.c = c

        # these tests are for argument passing when used as a decorator
        test = Test()
        test.method(1, 2)
        self.assertEqual(test.a, 1)
        self.assertEqual(test.b, 2)
        self.assertEqual(test.c, None)

        test = Test()
        test.method('a', 'b', 'c')
        self.assertEqual(test.a, 'a')
        self.assertEqual(test.b, 'b')
        self.assertEqual(test.c, 'c')

        test = Test()
        test.method(a=1, b=2)
        self.assertEqual(test.a, 1)
        self.assertEqual(test.b, 2)

    def test_typo_enter(self):
        class mycontext(ContextDecorator):
            def __unter__(self):
                pass
            def __exit__(self, *exc):
                pass

        # AttributeError before Python 3.11, TypeError from 3.11 onwards.
        with self.assertRaises(_NOT_A_CONTEXT_MANAGER_ERRORS):
            with mycontext():
                pass

    def test_typo_exit(self):
        class mycontext(ContextDecorator):
            def __enter__(self):
                pass
            def __uxit__(self, *exc):
                pass

        # AttributeError before Python 3.11, TypeError from 3.11 onwards.
        with self.assertRaises(_NOT_A_CONTEXT_MANAGER_ERRORS):
            with mycontext():
                pass

    def test_contextdecorator_as_mixin(self):
        class somecontext(object):
            started = False
            exc = None

            def __enter__(self):
                self.started = True
                return self

            def __exit__(self, *exc):
                self.exc = exc

        class mycontext(somecontext, ContextDecorator):
            pass

        context = mycontext()
        @context
        def test():
            self.assertIsNone(context.exc)
            self.assertTrue(context.started)
        test()
        self.assertEqual(context.exc, (None, None, None))

    def test_contextmanager_as_decorator(self):
        @contextmanager
        def woohoo(y):
            state.append(y)
            yield
            state.append(999)

        state = []
        @woohoo(1)
        def test(x):
            self.assertEqual(state, [1])
            state.append(x)
        test('something')
        self.assertEqual(state, [1, 'something', 999])

        # Issue #11647: Ensure the decorated function is 'reusable'
        state = []
        test('something else')
        self.assertEqual(state, [1, 'something else', 999])


class TestBaseExitStack:
    """Mixin holding the ExitStack tests.

    Concrete subclasses combine it with unittest.TestCase and set
    ``exit_stack`` to the class under test.
    """
    exit_stack = None

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = self.exit_stack.__doc__
        obj = self.exit_stack()
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_no_resources(self):
        with self.exit_stack():
            pass

    def test_callback(self):
        expected = [
            ((), {}),
            ((1,), {}),
            ((1,2), {}),
            ((), dict(example=1)),
            ((1,), dict(example=1)),
            ((1,2), dict(example=1)),
            ((1,2), dict(self=3, callback=4)),
        ]
        result = []
        def _exit(*args, **kwds):
            """Test metadata propagation"""
            result.append((args, kwds))
        with self.exit_stack() as stack:
            # Callbacks run in LIFO order, so register them reversed.
            for args, kwds in reversed(expected):
                if args and kwds:
                    f = stack.callback(_exit, *args, **kwds)
                elif args:
                    f = stack.callback(_exit, *args)
                elif kwds:
                    f = stack.callback(_exit, **kwds)
                else:
                    f = stack.callback(_exit)
                self.assertIs(f, _exit)
            for wrapper in stack._exit_callbacks:
                self.assertIs(wrapper[1].__wrapped__, _exit)
                self.assertNotEqual(wrapper[1].__name__, _exit.__name__)
                # The wrapper does not inherit the callback's docstring.
                self.assertIsNone(wrapper[1].__doc__)
        self.assertEqual(result, expected)

        result = []
        with self.exit_stack() as stack:
            with self.assertRaises(TypeError):
                stack.callback(arg=1)
            with self.assertRaises(TypeError):
                self.exit_stack.callback(arg=2)
            with self.assertRaises(TypeError):
                stack.callback(callback=_exit, arg=3)
        self.assertEqual(result, [])

    def test_push(self):
        exc_raised = ZeroDivisionError
        def _expect_exc(exc_type, exc, exc_tb):
            self.assertIs(exc_type, exc_raised)
        def _suppress_exc(*exc_details):
            return True
        def _expect_ok(exc_type, exc, exc_tb):
            self.assertIsNone(exc_type)
            self.assertIsNone(exc)
            self.assertIsNone(exc_tb)
        class ExitCM(object):
            def __init__(self, check_exc):
                self.check_exc = check_exc
            def __enter__(self):
                # ExitCM is not a TestCase and has no fail() method, so
                # raise the assertion failure directly.
                raise AssertionError("Should not be called!")
            def __exit__(self, *exc_details):
                self.check_exc(*exc_details)
        with self.exit_stack() as stack:
            # Exits run in reverse order: everything pushed after
            # _suppress_exc sees the ZeroDivisionError, everything pushed
            # before it sees a clean exit.
            stack.push(_expect_ok)
            self.assertIs(stack._exit_callbacks[-1][1], _expect_ok)
            cm = ExitCM(_expect_ok)
            stack.push(cm)
            self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
            stack.push(_suppress_exc)
            self.assertIs(stack._exit_callbacks[-1][1], _suppress_exc)
            cm = ExitCM(_expect_exc)
            stack.push(cm)
            self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
            stack.push(_expect_exc)
            self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
            stack.push(_expect_exc)
            self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
            1/0

    def test_enter_context(self):
        class TestCM(object):
            def __enter__(self):
                result.append(1)
            def __exit__(self, *exc_details):
                result.append(3)

        result = []
        cm = TestCM()
        with self.exit_stack() as stack:
            @stack.callback  # Registered first => cleaned up last
            def _exit():
                result.append(4)
            self.assertIsNotNone(_exit)
            stack.enter_context(cm)
            self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
            result.append(2)
        self.assertEqual(result, [1, 2, 3, 4])

    def test_close(self):
        result = []
        with self.exit_stack() as stack:
            @stack.callback
            def _exit():
                result.append(1)
            self.assertIsNotNone(_exit)
            stack.close()
            result.append(2)
        self.assertEqual(result, [1, 2])

    def test_pop_all(self):
        # pop_all() moves the callbacks to a new stack, so leaving the
        # original with block must not run them.
        result = []
        with self.exit_stack() as stack:
            @stack.callback
            def _exit():
                result.append(3)
            self.assertIsNotNone(_exit)
            new_stack = stack.pop_all()
            result.append(1)
        result.append(2)
        new_stack.close()
        self.assertEqual(result, [1, 2, 3])

    def test_exit_raise(self):
        with self.assertRaises(ZeroDivisionError):
            with self.exit_stack() as stack:
                stack.push(lambda *exc: False)
                1/0

    def test_exit_suppress(self):
        with self.exit_stack() as stack:
            stack.push(lambda *exc: True)
            1/0

    def test_exit_exception_chaining_reference(self):
        # Sanity check to make sure that ExitStack chaining matches
        # actual nested with statements
        class RaiseExc:
            def __init__(self, exc):
                self.exc = exc
            def __enter__(self):
                return self
            def __exit__(self, *exc_details):
                raise self.exc

        class RaiseExcWithContext:
            def __init__(self, outer, inner):
                self.outer = outer
                self.inner = inner
            def __enter__(self):
                return self
            def __exit__(self, *exc_details):
                try:
                    raise self.inner
                except:
                    raise self.outer

        class SuppressExc:
            def __enter__(self):
                return self
            def __exit__(self, *exc_details):
                type(self).saved_details = exc_details
                return True

        try:
            with RaiseExc(IndexError):
                with RaiseExcWithContext(KeyError, AttributeError):
                    with SuppressExc():
                        with RaiseExc(ValueError):
                            1 / 0
        except IndexError as exc:
            self.assertIsInstance(exc.__context__, KeyError)
            self.assertIsInstance(exc.__context__.__context__, AttributeError)
            # Inner exceptions were suppressed
            self.assertIsNone(exc.__context__.__context__.__context__)
        else:
            self.fail("Expected IndexError, but no exception was raised")
        # Check the inner exceptions
        inner_exc = SuppressExc.saved_details[1]
        self.assertIsInstance(inner_exc, ValueError)
        self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)

    def test_exit_exception_chaining(self):
        # Ensure exception chaining matches the reference behaviour
        def raise_exc(exc):
            raise exc

        saved_details = None
        def suppress_exc(*exc_details):
            nonlocal saved_details
            saved_details = exc_details
            return True

        try:
            with self.exit_stack() as stack:
                stack.callback(raise_exc, IndexError)
                stack.callback(raise_exc, KeyError)
                stack.callback(raise_exc, AttributeError)
                stack.push(suppress_exc)
                stack.callback(raise_exc, ValueError)
                1 / 0
        except IndexError as exc:
            self.assertIsInstance(exc.__context__, KeyError)
            self.assertIsInstance(exc.__context__.__context__, AttributeError)
            # Inner exceptions were suppressed
            self.assertIsNone(exc.__context__.__context__.__context__)
        else:
            self.fail("Expected IndexError, but no exception was raised")
        # Check the inner exceptions
        inner_exc = saved_details[1]
        self.assertIsInstance(inner_exc, ValueError)
        self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)

    def test_exit_exception_non_suppressing(self):
        # http://bugs.python.org/issue19092
        def raise_exc(exc):
            raise exc

        def suppress_exc(*exc_details):
            return True

        try:
            with self.exit_stack() as stack:
                stack.callback(lambda: None)
                stack.callback(raise_exc, IndexError)
        except Exception as exc:
            self.assertIsInstance(exc, IndexError)
        else:
            self.fail("Expected IndexError, but no exception was raised")

        try:
            with self.exit_stack() as stack:
                stack.callback(raise_exc, KeyError)
                stack.push(suppress_exc)
                stack.callback(raise_exc, IndexError)
        except Exception as exc:
            self.assertIsInstance(exc, KeyError)
        else:
            self.fail("Expected KeyError, but no exception was raised")

    def test_exit_exception_with_correct_context(self):
        # http://bugs.python.org/issue20317
        @contextmanager
        def gets_the_context_right(exc):
            try:
                yield
            finally:
                raise exc

        exc1 = Exception(1)
        exc2 = Exception(2)
        exc3 = Exception(3)
        exc4 = Exception(4)

        # The contextmanager already fixes the context, so prior to the
        # fix, ExitStack would try to fix it *again* and get into an
        # infinite self-referential loop
        try:
            with self.exit_stack() as stack:
                stack.enter_context(gets_the_context_right(exc4))
                stack.enter_context(gets_the_context_right(exc3))
                stack.enter_context(gets_the_context_right(exc2))
                raise exc1
        except Exception as exc:
            self.assertIs(exc, exc4)
            self.assertIs(exc.__context__, exc3)
            self.assertIs(exc.__context__.__context__, exc2)
            self.assertIs(exc.__context__.__context__.__context__, exc1)
            self.assertIsNone(
                       exc.__context__.__context__.__context__.__context__)
        else:
            # Without this branch a swallowed exception would pass silently.
            self.fail("Expected exc4, but no exception was raised")

    def test_exit_exception_with_existing_context(self):
        # Addresses a lack of test coverage discovered after checking in a
        # fix for issue 20317 that still contained debugging code.
        def raise_nested(inner_exc, outer_exc):
            try:
                raise inner_exc
            finally:
                raise outer_exc
        exc1 = Exception(1)
        exc2 = Exception(2)
        exc3 = Exception(3)
        exc4 = Exception(4)
        exc5 = Exception(5)
        try:
            with self.exit_stack() as stack:
                stack.callback(raise_nested, exc4, exc5)
                stack.callback(raise_nested, exc2, exc3)
                raise exc1
        except Exception as exc:
            self.assertIs(exc, exc5)
            self.assertIs(exc.__context__, exc4)
            self.assertIs(exc.__context__.__context__, exc3)
            self.assertIs(exc.__context__.__context__.__context__, exc2)
            self.assertIs(
                 exc.__context__.__context__.__context__.__context__, exc1)
            self.assertIsNone(
                exc.__context__.__context__.__context__.__context__.__context__)
        else:
            # Without this branch a swallowed exception would pass silently.
            self.fail("Expected exc5, but no exception was raised")

    def test_body_exception_suppress(self):
        def suppress_exc(*exc_details):
            return True
        try:
            with self.exit_stack() as stack:
                stack.push(suppress_exc)
                1/0
        except ZeroDivisionError:
            # The body raises ZeroDivisionError, so that is the exception
            # which would leak if suppression did not work.
            self.fail("Expected no exception, got ZeroDivisionError")

    def test_exit_exception_chaining_suppress(self):
        with self.exit_stack() as stack:
            stack.push(lambda *exc: True)
            stack.push(lambda *exc: 1/0)
            stack.push(lambda *exc: {}[1])

    def test_excessive_nesting(self):
        # The original implementation would die with RecursionError here
        with self.exit_stack() as stack:
            for i in range(10000):
                stack.callback(int)

    def test_instance_bypass(self):
        # __exit__ set on the instance (not the type) must be ignored by
        # enter_context(), while push() treats the object as a plain callback.
        class Example(object): pass
        cm = Example()
        cm.__exit__ = object()
        stack = self.exit_stack()
        # AttributeError before Python 3.11, TypeError from 3.11 onwards.
        self.assertRaises(_NOT_A_CONTEXT_MANAGER_ERRORS,
                          stack.enter_context, cm)
        stack.push(cm)
        self.assertIs(stack._exit_callbacks[-1][1], cm)

    def test_dont_reraise_RuntimeError(self):
        # https://bugs.python.org/issue27122
        class UniqueException(Exception): pass
        class UniqueRuntimeError(RuntimeError): pass

        @contextmanager
        def second():
            try:
                yield 1
            except Exception as exc:
                raise UniqueException("new exception") from exc

        @contextmanager
        def first():
            try:
                yield 1
            except Exception as exc:
                raise exc

        # The UniqueRuntimeError should be caught by second()'s exception
        # handler which chain raised a new UniqueException.
        with self.assertRaises(UniqueException) as err_ctx:
            with self.exit_stack() as es_ctx:
                es_ctx.enter_context(second())
                es_ctx.enter_context(first())
                raise UniqueRuntimeError("please no infinite loop.")

        exc = err_ctx.exception
        self.assertIsInstance(exc, UniqueException)
        self.assertIsInstance(exc.__context__, UniqueRuntimeError)
        self.assertIsNone(exc.__context__.__context__)
        self.assertIsNone(exc.__context__.__cause__)
        self.assertIs(exc.__cause__, exc.__context__)


class TestExitStack(TestBaseExitStack, unittest.TestCase):
    """Run the shared ExitStack tests against contextlib.ExitStack."""
    exit_stack = ExitStack


class TestRedirectStream:
    """Mixin holding the tests shared by redirect_stdout/redirect_stderr.

    Concrete subclasses set ``redirect_stream`` to the context manager
    under test and ``orig_stream`` to the name of the matching ``sys``
    attribute.
    """

    redirect_stream = None
    orig_stream = None

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = self.redirect_stream.__doc__
        obj = self.redirect_stream(None)
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_no_redirect_in_init(self):
        orig_stdout = getattr(sys, self.orig_stream)
        self.redirect_stream(None)
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)

    def test_redirect_to_string_io(self):
        f = io.StringIO()
        msg = "Consider an API like help(), which prints directly to stdout"
        orig_stdout = getattr(sys, self.orig_stream)
        with self.redirect_stream(f):
            print(msg, file=getattr(sys, self.orig_stream))
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
        s = f.getvalue().strip()
        self.assertEqual(s, msg)

    def test_enter_result_is_target(self):
        f = io.StringIO()
        with self.redirect_stream(f) as enter_result:
            self.assertIs(enter_result, f)

    def test_cm_is_reusable(self):
        # The same manager instance is entered twice, one after the other.
        f = io.StringIO()
        write_to_f = self.redirect_stream(f)
        orig_stdout = getattr(sys, self.orig_stream)
        with write_to_f:
            print("Hello", end=" ", file=getattr(sys, self.orig_stream))
        with write_to_f:
            print("World!", file=getattr(sys, self.orig_stream))
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
        s = f.getvalue()
        self.assertEqual(s, "Hello World!\n")

    def test_cm_is_reentrant(self):
        # The same manager instance is entered again while already active.
        f = io.StringIO()
        write_to_f = self.redirect_stream(f)
        orig_stdout = getattr(sys, self.orig_stream)
        with write_to_f:
            print("Hello", end=" ", file=getattr(sys, self.orig_stream))
            with write_to_f:
                print("World!", file=getattr(sys, self.orig_stream))
        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
        s = f.getvalue()
        self.assertEqual(s, "Hello World!\n")


class TestRedirectStdout(TestRedirectStream, unittest.TestCase):
    """Run the shared redirection tests against redirect_stdout."""

    redirect_stream = redirect_stdout
    orig_stream = "stdout"


class TestRedirectStderr(TestRedirectStream, unittest.TestCase):
    """Run the shared redirection tests against redirect_stderr."""

    redirect_stream = redirect_stderr
    orig_stream = "stderr"


class TestSuppress(unittest.TestCase):
    """Tests for contextlib.suppress."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = suppress.__doc__
        obj = suppress()
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_no_result_from_enter(self):
        with suppress(ValueError) as enter_result:
            self.assertIsNone(enter_result)

    def test_no_exception(self):
        with suppress(ValueError):
            self.assertEqual(pow(2, 5), 32)

    def test_exact_exception(self):
        with suppress(TypeError):
            len(5)

    def test_exception_hierarchy(self):
        # IndexError is a subclass of the suppressed LookupError.
        with suppress(LookupError):
            'Hello'[50]

    def test_other_exception(self):
        with self.assertRaises(ZeroDivisionError):
            with suppress(TypeError):
                1/0

    def test_no_args(self):
        # suppress() without arguments suppresses nothing.
        with self.assertRaises(ZeroDivisionError):
            with suppress():
                1/0

    def test_multiple_exception_args(self):
        with suppress(ZeroDivisionError, TypeError):
            1/0
        with suppress(ZeroDivisionError, TypeError):
            len(5)

    def test_cm_is_reentrant(self):
        ignore_exceptions = suppress(Exception)
        with ignore_exceptions:
            pass
        with ignore_exceptions:
            len(5)
        with ignore_exceptions:
            with ignore_exceptions:  # Check nested usage
                len(5)
            outer_continued = True
            1/0
        self.assertTrue(outer_continued)


if __name__ == "__main__":
    unittest.main()