"""
Tests for object finalization semantics, as outlined in PEP 442.
"""

import contextlib
import gc
import unittest
import weakref

# The two _testcapi helpers are optional.  When one is missing, a stand-in
# decorator is installed so the class definitions below still work, and a
# flag records the absence so that the tests depending on the real helper
# are skipped instead of failing with the stand-in's TypeError.
try:
    from _testcapi import with_tp_del
except ImportError:
    _HAS_WITH_TP_DEL = False

    def with_tp_del(cls):
        # Stand-in: the returned class refuses to be instantiated.
        class C:
            def __new__(cls, *args, **kwargs):
                raise TypeError('requires _testcapi.with_tp_del')
        return C
else:
    _HAS_WITH_TP_DEL = True

try:
    from _testcapi import without_gc
except ImportError:
    _HAS_WITHOUT_GC = False

    def without_gc(cls):
        # Stand-in: the returned class refuses to be instantiated.
        class C:
            def __new__(cls, *args, **kwargs):
                raise TypeError('requires _testcapi.without_gc')
        return C
else:
    _HAS_WITHOUT_GC = True

from test import support


class NonGCSimpleBase:
    """
    The base class for all the objects under test, equipped with various
    testing features.
    """

    # Class-wide bookkeeping.  These lists are only ever mutated in place
    # (append/clear), so every subclass and instance sees the same objects.
    survivors = []      # objects kept alive by a resurrecting side effect
    del_calls = []      # id() of each object whose __del__ ran
    tp_del_calls = []   # id() of each object whose __tp_del__ ran
    errors = []         # exceptions caught inside finalizers

    # Set to True while test() tears down, so that finalizers running
    # during cleanup do not record anything.
    _cleaning = False

    __slots__ = ()

    @classmethod
    def _cleanup(cls):
        """
        Reset the shared bookkeeping and gc.garbage, running a collection
        before the call logs are cleared.
        """
        cls.survivors.clear()
        cls.errors.clear()
        gc.garbage.clear()
        gc.collect()
        cls.del_calls.clear()
        cls.tp_del_calls.clear()

    @classmethod
    @contextlib.contextmanager
    def test(cls):
        """
        A context manager to use around all finalization tests.

        Automatic garbage collection is disabled for the duration of the
        block.  On a normal exit, the first exception recorded by a
        finalizer (if any) is re-raised.  The shared state is always
        cleaned up on the way out.
        """
        with support.disable_gc():
            cls.del_calls.clear()
            cls.tp_del_calls.clear()
            NonGCSimpleBase._cleaning = False
            try:
                yield
                if cls.errors:
                    raise cls.errors[0]
            finally:
                NonGCSimpleBase._cleaning = True
                cls._cleanup()

    def check_sanity(self):
        """
        Check the object is sane (non-broken).
        """

    def __del__(self):
        """
        PEP 442 finalizer.  Record that this was called, check the
        object is in a sane state, and invoke a side effect.
        """
        try:
            if not self._cleaning:
                self.del_calls.append(id(self))
                self.check_sanity()
                self.side_effect()
        except Exception as e:
            # Store the failure; test() re-raises it from the test body.
            self.errors.append(e)

    def side_effect(self):
        """
        A side effect called on destruction.
        """


class SimpleBase(NonGCSimpleBase):
    """
    Base for the objects under test that carry instance state: each
    instance remembers its own id() so check_sanity() can verify it.
    """

    def __init__(self):
        self.id_ = id(self)

    def check_sanity(self):
        assert self.id_ == id(self)


@without_gc
class NonGC(NonGCSimpleBase):
    __slots__ = ()


@without_gc
class NonGCResurrector(NonGCSimpleBase):
    __slots__ = ()

    def side_effect(self):
        """
        Resurrect self by storing self in a class-wide list.
        """
        self.survivors.append(self)


class Simple(SimpleBase):
    pass


# Can't inherit from NonGCResurrector, in case importing without_gc fails.
class SimpleResurrector(SimpleBase):

    def side_effect(self):
        """
        Resurrect self by storing self in a class-wide list.
        """
        self.survivors.append(self)


class TestBase:
    """
    Mixin for the TestCase classes below: isolates gc.garbage around each
    test and provides assertions on the shared bookkeeping lists.
    """

    def setUp(self):
        # Snapshot whatever is already in gc.garbage and start empty.
        self.old_garbage = gc.garbage[:]
        gc.garbage[:] = []

    def tearDown(self):
        # None of the tests here should put anything in gc.garbage
        try:
            self.assertEqual(gc.garbage, [])
        finally:
            # Put back what setUp() set aside, so garbage that existed
            # before the test is not silently discarded.
            gc.garbage[:] = self.old_garbage
            del self.old_garbage
            gc.collect()

    def assert_del_calls(self, ids):
        # Order-insensitive comparison of the recorded __del__ calls.
        self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids))

    def assert_tp_del_calls(self, ids):
        # Order-insensitive comparison of the recorded __tp_del__ calls.
        self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids))

    def assert_survivors(self, ids):
        self.assertEqual(sorted(id(x) for x in SimpleBase.survivors),
                         sorted(ids))

    def assert_garbage(self, ids):
        self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids))

    def clear_survivors(self):
        SimpleBase.survivors.clear()


class SimpleFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization without refcycles.
    """

    def test_simple(self):
        with SimpleBase.test():
            s = Simple()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            # A second collection must not finalize anything again.
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        with SimpleBase.test():
            s = SimpleResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            self.assertIsNot(wr(), None)
            # Dropping the survivor must not trigger __del__ a second time.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
        self.assertIs(wr(), None)

    @support.cpython_only
    @unittest.skipUnless(_HAS_WITHOUT_GC, 'requires _testcapi.without_gc')
    def test_non_gc(self):
        with SimpleBase.test():
            s = NonGC()
            self.assertFalse(gc.is_tracked(s))
            ids = [id(s)]
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    @support.cpython_only
    @unittest.skipUnless(_HAS_WITHOUT_GC, 'requires _testcapi.without_gc')
    def test_non_gc_resurrect(self):
        with SimpleBase.test():
            s = NonGCResurrector()
            self.assertFalse(gc.is_tracked(s))
            ids = [id(s)]
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            # Unlike the GC-enabled case, the expectation here is that the
            # finalizer runs again (and resurrects the object again) once
            # the survivor list is cleared.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids * 2)
            self.assert_survivors(ids)


class SelfCycleBase:
    """
    Mixin giving each instance a reference to itself.
    """

    def __init__(self):
        super().__init__()
        self.ref = self

    def check_sanity(self):
        super().check_sanity()
        assert self.ref is self


class SimpleSelfCycle(SelfCycleBase, Simple):
    pass


class SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
    pass


class SuicidalSelfCycle(SelfCycleBase, Simple):

    def side_effect(self):
        """
        Explicitly break the reference cycle.
        """
        self.ref = None


class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of an object having a single cyclic reference to
    itself.
    """

    def test_simple(self):
        with SimpleBase.test():
            s = SimpleSelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_simple_resurrect(self):
        # Test that __del__ can resurrect the object being finalized.
        with SimpleBase.test():
            s = SelfCycleResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(ids)
            # XXX is this desirable?
            self.assertIs(wr(), None)
            # When trying to destroy the object a second time, __del__
            # isn't called anymore (and the object isn't resurrected).
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)

    def test_simple_suicide(self):
        # Test the GC is able to deal with an object that kills its last
        # reference during __del__.
        with SimpleBase.test():
            s = SuicidalSelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)


class ChainedBase:
    """
    Mixin for nodes of a doubly linked ring: each node refers to its
    ``left`` neighbour and is referred to as that neighbour's ``right``.
    """

    def chain(self, left):
        # Link self after `left`; also initialises the `suicided` flag.
        self.suicided = False
        self.left = left
        left.right = self

    def check_sanity(self):
        super().check_sanity()
        if self.suicided:
            assert self.left is None
            assert self.right is None
        else:
            # A neighbour that already broke its links no longer points
            # back at us; otherwise the back-reference must be intact.
            left = self.left
            if left.suicided:
                assert left.right is None
            else:
                assert left.right is self
            right = self.right
            if right.suicided:
                assert right.left is None
            else:
                assert right.left is self


class SimpleChained(ChainedBase, Simple):
    pass


class ChainedResurrector(ChainedBase, SimpleResurrector):
    pass


class SuicidalChained(ChainedBase, Simple):

    def side_effect(self):
        """
        Explicitly break the reference cycle.
        """
        self.suicided = True
        self.left = None
        self.right = None


class CycleChainFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of a cyclic chain.  These tests are similar in
    spirit to the self-cycle tests above, but the collectable object
    graph isn't trivial anymore.
    """

    def build_chain(self, classes):
        # Instantiate one node per class and link them into a ring: the
        # first node's left neighbour is the last node (index -1).
        nodes = [cls() for cls in classes]
        for i, node in enumerate(nodes):
            node.chain(nodes[i - 1])
        return nodes

    def check_non_resurrecting_chain(self, classes):
        N = len(classes)
        with SimpleBase.test():
            nodes = self.build_chain(classes)
            ids = [id(s) for s in nodes]
            wrs = [weakref.ref(s) for s in nodes]
            del nodes
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])
            self.assertEqual([wr() for wr in wrs], [None] * N)
            gc.collect()
            self.assert_del_calls(ids)

    def check_resurrecting_chain(self, classes):
        with SimpleBase.test():
            nodes = self.build_chain(classes)
            N = len(nodes)
            ids = [id(s) for s in nodes]
            # Only the resurrector nodes store themselves in `survivors`.
            survivor_ids = [id(s) for s in nodes
                            if isinstance(s, SimpleResurrector)]
            wrs = [weakref.ref(s) for s in nodes]
            del nodes
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors(survivor_ids)
            # XXX desirable?
            self.assertEqual([wr() for wr in wrs], [None] * N)
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_survivors([])

    def test_homogenous(self):
        self.check_non_resurrecting_chain([SimpleChained] * 3)

    def test_homogenous_resurrect(self):
        self.check_resurrecting_chain([ChainedResurrector] * 3)

    def test_homogenous_suicidal(self):
        self.check_non_resurrecting_chain([SuicidalChained] * 3)

    def test_heterogenous_suicidal_one(self):
        self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)

    def test_heterogenous_suicidal_two(self):
        self.check_non_resurrecting_chain(
            [SuicidalChained] * 2 + [SimpleChained] * 2)

    def test_heterogenous_resurrect_one(self):
        self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)

    def test_heterogenous_resurrect_two(self):
        self.check_resurrecting_chain(
            [ChainedResurrector, SimpleChained, SuicidalChained] * 2)

    def test_heterogenous_resurrect_three(self):
        self.check_resurrecting_chain(
            [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)


# NOTE: the tp_del slot isn't automatically inherited, so we have to call
# with_tp_del() for each instantiated class.

class LegacyBase(SimpleBase):
    """
    Base for objects that have both a __del__ and a legacy __tp_del__
    finalizer; the side effect is invoked from __tp_del__ only.
    """

    def __del__(self):
        try:
            # Do not invoke side_effect here, since we are now exercising
            # the tp_del slot.
            if not self._cleaning:
                self.del_calls.append(id(self))
                self.check_sanity()
        except Exception as e:
            self.errors.append(e)

    def __tp_del__(self):
        """
        Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.
        """
        try:
            if not self._cleaning:
                self.tp_del_calls.append(id(self))
                self.check_sanity()
                self.side_effect()
        except Exception as e:
            self.errors.append(e)


@with_tp_del
class Legacy(LegacyBase):
    pass


@with_tp_del
class LegacyResurrector(LegacyBase):

    def side_effect(self):
        """
        Resurrect self by storing self in a class-wide list.
        """
        self.survivors.append(self)


@with_tp_del
class LegacySelfCycle(SelfCycleBase, LegacyBase):
    pass


@support.cpython_only
@unittest.skipUnless(_HAS_WITH_TP_DEL, 'requires _testcapi.with_tp_del')
class LegacyFinalizationTest(TestBase, unittest.TestCase):
    """
    Test finalization of objects with a tp_del.
    """

    def tearDown(self):
        # These tests need to clean up a bit more, since they create
        # uncollectable objects.
        gc.garbage.clear()
        gc.collect()
        super().tearDown()

    def test_legacy(self):
        with SimpleBase.test():
            s = Legacy()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)
            self.assert_survivors([])
            self.assertIs(wr(), None)
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)

    def test_legacy_resurrect(self):
        with SimpleBase.test():
            s = LegacyResurrector()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids)
            self.assert_survivors(ids)
            # weakrefs are cleared before tp_del is called.
            self.assertIs(wr(), None)
            # The expectation is that __del__ is not repeated, while the
            # tp_del finalizer runs (and resurrects) a second time.
            self.clear_survivors()
            gc.collect()
            self.assert_del_calls(ids)
            self.assert_tp_del_calls(ids * 2)
            self.assert_survivors(ids)
        self.assertIs(wr(), None)

    def test_legacy_self_cycle(self):
        # Self-cycles with legacy finalizers end up in gc.garbage.
        with SimpleBase.test():
            s = LegacySelfCycle()
            ids = [id(s)]
            wr = weakref.ref(s)
            del s
            gc.collect()
            self.assert_del_calls([])
            self.assert_tp_del_calls([])
            self.assert_survivors([])
            self.assert_garbage(ids)
            self.assertIsNot(wr(), None)
            # Break the cycle to allow collection
            gc.garbage[0].ref = None
        # Leaving the context manager empties gc.garbage, which drops the
        # last reference to the (now cycle-free) object.
        self.assert_garbage([])
        self.assertIs(wr(), None)


if __name__ == "__main__":
    unittest.main()