1import unittest
2import unittest.mock
3from test.support import (verbose, refcount_test, run_unittest,
4                          cpython_only, start_threads,
5                          temp_dir, TESTFN, unlink,
6                          import_module)
7from test.support.script_helper import assert_python_ok, make_script
8
9import gc
10import sys
11import sysconfig
12import textwrap
13import threading
14import time
15import weakref
16
17try:
18    from _testcapi import with_tp_del
19except ImportError:
20    def with_tp_del(cls):
21        class C(object):
22            def __new__(cls, *args, **kwargs):
23                raise TypeError('requires _testcapi.with_tp_del')
24        return C
25
26try:
27    from _testcapi import ContainerNoGC
28except ImportError:
29    ContainerNoGC = None
30
31### Support code
32###############################################################################
33
34# Bug 1055820 has several tests of longstanding bugs involving weakrefs and
35# cyclic gc.
36
37# An instance of C1055820 has a self-loop, so becomes cyclic trash when
38# unreachable.
39class C1055820(object):
40    def __init__(self, i):
41        self.i = i
42        self.loop = self
43
44class GC_Detector(object):
45    # Create an instance I.  Then gc hasn't happened again so long as
46    # I.gc_happened is false.
47
48    def __init__(self):
49        self.gc_happened = False
50
51        def it_happened(ignored):
52            self.gc_happened = True
53
54        # Create a piece of cyclic trash that triggers it_happened when
55        # gc collects it.
56        self.wr = weakref.ref(C1055820(666), it_happened)
57
58@with_tp_del
59class Uncollectable(object):
60    """Create a reference cycle with multiple __del__ methods.
61
62    An object in a reference cycle will never have zero references,
63    and so must be garbage collected.  If one or more objects in the
64    cycle have __del__ methods, the gc refuses to guess an order,
65    and leaves the cycle uncollected."""
66    def __init__(self, partner=None):
67        if partner is None:
68            self.partner = Uncollectable(partner=self)
69        else:
70            self.partner = partner
71    def __tp_del__(self):
72        pass
73
74if sysconfig.get_config_vars().get('PY_CFLAGS', ''):
75    BUILD_WITH_NDEBUG = ('-DNDEBUG' in sysconfig.get_config_vars()['PY_CFLAGS'])
76else:
77    # Usually, sys.gettotalrefcount() is only present if Python has been
78    # compiled in debug mode. If it's missing, expect that Python has
79    # been released in release mode: with NDEBUG defined.
80    BUILD_WITH_NDEBUG = (not hasattr(sys, 'gettotalrefcount'))
81
82### Tests
83###############################################################################
84
85class GCTests(unittest.TestCase):
86    def test_list(self):
87        l = []
88        l.append(l)
89        gc.collect()
90        del l
91        self.assertEqual(gc.collect(), 1)
92
93    def test_dict(self):
94        d = {}
95        d[1] = d
96        gc.collect()
97        del d
98        self.assertEqual(gc.collect(), 1)
99
100    def test_tuple(self):
101        # since tuples are immutable we close the loop with a list
102        l = []
103        t = (l,)
104        l.append(t)
105        gc.collect()
106        del t
107        del l
108        self.assertEqual(gc.collect(), 2)
109
110    def test_class(self):
111        class A:
112            pass
113        A.a = A
114        gc.collect()
115        del A
116        self.assertNotEqual(gc.collect(), 0)
117
118    def test_newstyleclass(self):
119        class A(object):
120            pass
121        gc.collect()
122        del A
123        self.assertNotEqual(gc.collect(), 0)
124
125    def test_instance(self):
126        class A:
127            pass
128        a = A()
129        a.a = a
130        gc.collect()
131        del a
132        self.assertNotEqual(gc.collect(), 0)
133
134    def test_newinstance(self):
135        class A(object):
136            pass
137        a = A()
138        a.a = a
139        gc.collect()
140        del a
141        self.assertNotEqual(gc.collect(), 0)
142        class B(list):
143            pass
144        class C(B, A):
145            pass
146        a = C()
147        a.a = a
148        gc.collect()
149        del a
150        self.assertNotEqual(gc.collect(), 0)
151        del B, C
152        self.assertNotEqual(gc.collect(), 0)
153        A.a = A()
154        del A
155        self.assertNotEqual(gc.collect(), 0)
156        self.assertEqual(gc.collect(), 0)
157
158    def test_method(self):
159        # Tricky: self.__init__ is a bound method, it references the instance.
160        class A:
161            def __init__(self):
162                self.init = self.__init__
163        a = A()
164        gc.collect()
165        del a
166        self.assertNotEqual(gc.collect(), 0)
167
168    @cpython_only
169    def test_legacy_finalizer(self):
170        # A() is uncollectable if it is part of a cycle, make sure it shows up
171        # in gc.garbage.
172        @with_tp_del
173        class A:
174            def __tp_del__(self): pass
175        class B:
176            pass
177        a = A()
178        a.a = a
179        id_a = id(a)
180        b = B()
181        b.b = b
182        gc.collect()
183        del a
184        del b
185        self.assertNotEqual(gc.collect(), 0)
186        for obj in gc.garbage:
187            if id(obj) == id_a:
188                del obj.a
189                break
190        else:
191            self.fail("didn't find obj in garbage (finalizer)")
192        gc.garbage.remove(obj)
193
194    @cpython_only
195    def test_legacy_finalizer_newclass(self):
196        # A() is uncollectable if it is part of a cycle, make sure it shows up
197        # in gc.garbage.
198        @with_tp_del
199        class A(object):
200            def __tp_del__(self): pass
201        class B(object):
202            pass
203        a = A()
204        a.a = a
205        id_a = id(a)
206        b = B()
207        b.b = b
208        gc.collect()
209        del a
210        del b
211        self.assertNotEqual(gc.collect(), 0)
212        for obj in gc.garbage:
213            if id(obj) == id_a:
214                del obj.a
215                break
216        else:
217            self.fail("didn't find obj in garbage (finalizer)")
218        gc.garbage.remove(obj)
219
220    def test_function(self):
221        # Tricky: f -> d -> f, code should call d.clear() after the exec to
222        # break the cycle.
223        d = {}
224        exec("def f(): pass\n", d)
225        gc.collect()
226        del d
227        self.assertEqual(gc.collect(), 2)
228
229    @refcount_test
230    def test_frame(self):
231        def f():
232            frame = sys._getframe()
233        gc.collect()
234        f()
235        self.assertEqual(gc.collect(), 1)
236
237    def test_saveall(self):
238        # Verify that cyclic garbage like lists show up in gc.garbage if the
239        # SAVEALL option is enabled.
240
241        # First make sure we don't save away other stuff that just happens to
242        # be waiting for collection.
243        gc.collect()
244        # if this fails, someone else created immortal trash
245        self.assertEqual(gc.garbage, [])
246
247        L = []
248        L.append(L)
249        id_L = id(L)
250
251        debug = gc.get_debug()
252        gc.set_debug(debug | gc.DEBUG_SAVEALL)
253        del L
254        gc.collect()
255        gc.set_debug(debug)
256
257        self.assertEqual(len(gc.garbage), 1)
258        obj = gc.garbage.pop()
259        self.assertEqual(id(obj), id_L)
260
261    def test_del(self):
262        # __del__ methods can trigger collection, make this to happen
263        thresholds = gc.get_threshold()
264        gc.enable()
265        gc.set_threshold(1)
266
267        class A:
268            def __del__(self):
269                dir(self)
270        a = A()
271        del a
272
273        gc.disable()
274        gc.set_threshold(*thresholds)
275
276    def test_del_newclass(self):
277        # __del__ methods can trigger collection, make this to happen
278        thresholds = gc.get_threshold()
279        gc.enable()
280        gc.set_threshold(1)
281
282        class A(object):
283            def __del__(self):
284                dir(self)
285        a = A()
286        del a
287
288        gc.disable()
289        gc.set_threshold(*thresholds)
290
291    # The following two tests are fragile:
292    # They precisely count the number of allocations,
293    # which is highly implementation-dependent.
294    # For example, disposed tuples are not freed, but reused.
295    # To minimize variations, though, we first store the get_count() results
296    # and check them at the end.
297    @refcount_test
298    def test_get_count(self):
299        gc.collect()
300        a, b, c = gc.get_count()
301        x = []
302        d, e, f = gc.get_count()
303        self.assertEqual((b, c), (0, 0))
304        self.assertEqual((e, f), (0, 0))
305        # This is less fragile than asserting that a equals 0.
306        self.assertLess(a, 5)
307        # Between the two calls to get_count(), at least one object was
308        # created (the list).
309        self.assertGreater(d, a)
310
311    @refcount_test
312    def test_collect_generations(self):
313        gc.collect()
314        # This object will "trickle" into generation N + 1 after
315        # each call to collect(N)
316        x = []
317        gc.collect(0)
318        # x is now in gen 1
319        a, b, c = gc.get_count()
320        gc.collect(1)
321        # x is now in gen 2
322        d, e, f = gc.get_count()
323        gc.collect(2)
324        # x is now in gen 3
325        g, h, i = gc.get_count()
326        # We don't check a, d, g since their exact values depends on
327        # internal implementation details of the interpreter.
328        self.assertEqual((b, c), (1, 0))
329        self.assertEqual((e, f), (0, 1))
330        self.assertEqual((h, i), (0, 0))
331
332    def test_trashcan(self):
333        class Ouch:
334            n = 0
335            def __del__(self):
336                Ouch.n = Ouch.n + 1
337                if Ouch.n % 17 == 0:
338                    gc.collect()
339
340        # "trashcan" is a hack to prevent stack overflow when deallocating
341        # very deeply nested tuples etc.  It works in part by abusing the
342        # type pointer and refcount fields, and that can yield horrible
343        # problems when gc tries to traverse the structures.
344        # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
345        # most likely die via segfault.
346
347        # Note:  In 2.3 the possibility for compiling without cyclic gc was
348        # removed, and that in turn allows the trashcan mechanism to work
349        # via much simpler means (e.g., it never abuses the type pointer or
350        # refcount fields anymore).  Since it's much less likely to cause a
351        # problem now, the various constants in this expensive (we force a lot
352        # of full collections) test are cut back from the 2.2 version.
353        gc.enable()
354        N = 150
355        for count in range(2):
356            t = []
357            for i in range(N):
358                t = [t, Ouch()]
359            u = []
360            for i in range(N):
361                u = [u, Ouch()]
362            v = {}
363            for i in range(N):
364                v = {1: v, 2: Ouch()}
365        gc.disable()
366
367    def test_trashcan_threads(self):
368        # Issue #13992: trashcan mechanism should be thread-safe
369        NESTING = 60
370        N_THREADS = 2
371
372        def sleeper_gen():
373            """A generator that releases the GIL when closed or dealloc'ed."""
374            try:
375                yield
376            finally:
377                time.sleep(0.000001)
378
379        class C(list):
380            # Appending to a list is atomic, which avoids the use of a lock.
381            inits = []
382            dels = []
383            def __init__(self, alist):
384                self[:] = alist
385                C.inits.append(None)
386            def __del__(self):
387                # This __del__ is called by subtype_dealloc().
388                C.dels.append(None)
389                # `g` will release the GIL when garbage-collected.  This
390                # helps assert subtype_dealloc's behaviour when threads
391                # switch in the middle of it.
392                g = sleeper_gen()
393                next(g)
394                # Now that __del__ is finished, subtype_dealloc will proceed
395                # to call list_dealloc, which also uses the trashcan mechanism.
396
397        def make_nested():
398            """Create a sufficiently nested container object so that the
399            trashcan mechanism is invoked when deallocating it."""
400            x = C([])
401            for i in range(NESTING):
402                x = [C([x])]
403            del x
404
405        def run_thread():
406            """Exercise make_nested() in a loop."""
407            while not exit:
408                make_nested()
409
410        old_switchinterval = sys.getswitchinterval()
411        sys.setswitchinterval(1e-5)
412        try:
413            exit = []
414            threads = []
415            for i in range(N_THREADS):
416                t = threading.Thread(target=run_thread)
417                threads.append(t)
418            with start_threads(threads, lambda: exit.append(1)):
419                time.sleep(1.0)
420        finally:
421            sys.setswitchinterval(old_switchinterval)
422        gc.collect()
423        self.assertEqual(len(C.inits), len(C.dels))
424
425    def test_boom(self):
426        class Boom:
427            def __getattr__(self, someattribute):
428                del self.attr
429                raise AttributeError
430
431        a = Boom()
432        b = Boom()
433        a.attr = b
434        b.attr = a
435
436        gc.collect()
437        garbagelen = len(gc.garbage)
438        del a, b
439        # a<->b are in a trash cycle now.  Collection will invoke
440        # Boom.__getattr__ (to see whether a and b have __del__ methods), and
441        # __getattr__ deletes the internal "attr" attributes as a side effect.
442        # That causes the trash cycle to get reclaimed via refcounts falling to
443        # 0, thus mutating the trash graph as a side effect of merely asking
444        # whether __del__ exists.  This used to (before 2.3b1) crash Python.
445        # Now __getattr__ isn't called.
446        self.assertEqual(gc.collect(), 4)
447        self.assertEqual(len(gc.garbage), garbagelen)
448
449    def test_boom2(self):
450        class Boom2:
451            def __init__(self):
452                self.x = 0
453
454            def __getattr__(self, someattribute):
455                self.x += 1
456                if self.x > 1:
457                    del self.attr
458                raise AttributeError
459
460        a = Boom2()
461        b = Boom2()
462        a.attr = b
463        b.attr = a
464
465        gc.collect()
466        garbagelen = len(gc.garbage)
467        del a, b
468        # Much like test_boom(), except that __getattr__ doesn't break the
469        # cycle until the second time gc checks for __del__.  As of 2.3b1,
470        # there isn't a second time, so this simply cleans up the trash cycle.
471        # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get
472        # reclaimed this way.
473        self.assertEqual(gc.collect(), 4)
474        self.assertEqual(len(gc.garbage), garbagelen)
475
476    def test_boom_new(self):
477        # boom__new and boom2_new are exactly like boom and boom2, except use
478        # new-style classes.
479
480        class Boom_New(object):
481            def __getattr__(self, someattribute):
482                del self.attr
483                raise AttributeError
484
485        a = Boom_New()
486        b = Boom_New()
487        a.attr = b
488        b.attr = a
489
490        gc.collect()
491        garbagelen = len(gc.garbage)
492        del a, b
493        self.assertEqual(gc.collect(), 4)
494        self.assertEqual(len(gc.garbage), garbagelen)
495
496    def test_boom2_new(self):
497        class Boom2_New(object):
498            def __init__(self):
499                self.x = 0
500
501            def __getattr__(self, someattribute):
502                self.x += 1
503                if self.x > 1:
504                    del self.attr
505                raise AttributeError
506
507        a = Boom2_New()
508        b = Boom2_New()
509        a.attr = b
510        b.attr = a
511
512        gc.collect()
513        garbagelen = len(gc.garbage)
514        del a, b
515        self.assertEqual(gc.collect(), 4)
516        self.assertEqual(len(gc.garbage), garbagelen)
517
518    def test_get_referents(self):
519        alist = [1, 3, 5]
520        got = gc.get_referents(alist)
521        got.sort()
522        self.assertEqual(got, alist)
523
524        atuple = tuple(alist)
525        got = gc.get_referents(atuple)
526        got.sort()
527        self.assertEqual(got, alist)
528
529        adict = {1: 3, 5: 7}
530        expected = [1, 3, 5, 7]
531        got = gc.get_referents(adict)
532        got.sort()
533        self.assertEqual(got, expected)
534
535        got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
536        got.sort()
537        self.assertEqual(got, [0, 0] + list(range(5)))
538
539        self.assertEqual(gc.get_referents(1, 'a', 4j), [])
540
541    def test_is_tracked(self):
542        # Atomic built-in types are not tracked, user-defined objects and
543        # mutable containers are.
544        # NOTE: types with special optimizations (e.g. tuple) have tests
545        # in their own test files instead.
546        self.assertFalse(gc.is_tracked(None))
547        self.assertFalse(gc.is_tracked(1))
548        self.assertFalse(gc.is_tracked(1.0))
549        self.assertFalse(gc.is_tracked(1.0 + 5.0j))
550        self.assertFalse(gc.is_tracked(True))
551        self.assertFalse(gc.is_tracked(False))
552        self.assertFalse(gc.is_tracked(b"a"))
553        self.assertFalse(gc.is_tracked("a"))
554        self.assertFalse(gc.is_tracked(bytearray(b"a")))
555        self.assertFalse(gc.is_tracked(type))
556        self.assertFalse(gc.is_tracked(int))
557        self.assertFalse(gc.is_tracked(object))
558        self.assertFalse(gc.is_tracked(object()))
559
560        class UserClass:
561            pass
562
563        class UserInt(int):
564            pass
565
566        # Base class is object; no extra fields.
567        class UserClassSlots:
568            __slots__ = ()
569
570        # Base class is fixed size larger than object; no extra fields.
571        class UserFloatSlots(float):
572            __slots__ = ()
573
574        # Base class is variable size; no extra fields.
575        class UserIntSlots(int):
576            __slots__ = ()
577
578        self.assertTrue(gc.is_tracked(gc))
579        self.assertTrue(gc.is_tracked(UserClass))
580        self.assertTrue(gc.is_tracked(UserClass()))
581        self.assertTrue(gc.is_tracked(UserInt()))
582        self.assertTrue(gc.is_tracked([]))
583        self.assertTrue(gc.is_tracked(set()))
584        self.assertTrue(gc.is_tracked(UserClassSlots()))
585        self.assertTrue(gc.is_tracked(UserFloatSlots()))
586        self.assertTrue(gc.is_tracked(UserIntSlots()))
587
588    def test_is_finalized(self):
589        # Objects not tracked by the always gc return false
590        self.assertFalse(gc.is_finalized(3))
591
592        storage = []
593        class Lazarus:
594            def __del__(self):
595                storage.append(self)
596
597        lazarus = Lazarus()
598        self.assertFalse(gc.is_finalized(lazarus))
599
600        del lazarus
601        gc.collect()
602
603        lazarus = storage.pop()
604        self.assertTrue(gc.is_finalized(lazarus))
605
606    def test_bug1055820b(self):
607        # Corresponds to temp2b.py in the bug report.
608
609        ouch = []
610        def callback(ignored):
611            ouch[:] = [wr() for wr in WRs]
612
613        Cs = [C1055820(i) for i in range(2)]
614        WRs = [weakref.ref(c, callback) for c in Cs]
615        c = None
616
617        gc.collect()
618        self.assertEqual(len(ouch), 0)
619        # Make the two instances trash, and collect again.  The bug was that
620        # the callback materialized a strong reference to an instance, but gc
621        # cleared the instance's dict anyway.
622        Cs = None
623        gc.collect()
624        self.assertEqual(len(ouch), 2)  # else the callbacks didn't run
625        for x in ouch:
626            # If the callback resurrected one of these guys, the instance
627            # would be damaged, with an empty __dict__.
628            self.assertEqual(x, None)
629
630    def test_bug21435(self):
631        # This is a poor test - its only virtue is that it happened to
632        # segfault on Tim's Windows box before the patch for 21435 was
633        # applied.  That's a nasty bug relying on specific pieces of cyclic
634        # trash appearing in exactly the right order in finalize_garbage()'s
635        # input list.
636        # But there's no reliable way to force that order from Python code,
637        # so over time chances are good this test won't really be testing much
638        # of anything anymore.  Still, if it blows up, there's _some_
639        # problem ;-)
640        gc.collect()
641
642        class A:
643            pass
644
645        class B:
646            def __init__(self, x):
647                self.x = x
648
649            def __del__(self):
650                self.attr = None
651
652        def do_work():
653            a = A()
654            b = B(A())
655
656            a.attr = b
657            b.attr = a
658
659        do_work()
660        gc.collect() # this blows up (bad C pointer) when it fails
661
662    @cpython_only
663    def test_garbage_at_shutdown(self):
664        import subprocess
665        code = """if 1:
666            import gc
667            import _testcapi
668            @_testcapi.with_tp_del
669            class X:
670                def __init__(self, name):
671                    self.name = name
672                def __repr__(self):
673                    return "<X %%r>" %% self.name
674                def __tp_del__(self):
675                    pass
676
677            x = X('first')
678            x.x = x
679            x.y = X('second')
680            del x
681            gc.set_debug(%s)
682        """
683        def run_command(code):
684            p = subprocess.Popen([sys.executable, "-Wd", "-c", code],
685                stdout=subprocess.PIPE,
686                stderr=subprocess.PIPE)
687            stdout, stderr = p.communicate()
688            p.stdout.close()
689            p.stderr.close()
690            self.assertEqual(p.returncode, 0)
691            self.assertEqual(stdout, b"")
692            return stderr
693
694        stderr = run_command(code % "0")
695        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
696                      b"shutdown; use", stderr)
697        self.assertNotIn(b"<X 'first'>", stderr)
698        # With DEBUG_UNCOLLECTABLE, the garbage list gets printed
699        stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
700        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
701                      b"shutdown", stderr)
702        self.assertTrue(
703            (b"[<X 'first'>, <X 'second'>]" in stderr) or
704            (b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
705        # With DEBUG_SAVEALL, no additional message should get printed
706        # (because gc.garbage also contains normally reclaimable cyclic
707        # references, and its elements get printed at runtime anyway).
708        stderr = run_command(code % "gc.DEBUG_SAVEALL")
709        self.assertNotIn(b"uncollectable objects at shutdown", stderr)
710
711    def test_gc_main_module_at_shutdown(self):
712        # Create a reference cycle through the __main__ module and check
713        # it gets collected at interpreter shutdown.
714        code = """if 1:
715            class C:
716                def __del__(self):
717                    print('__del__ called')
718            l = [C()]
719            l.append(l)
720            """
721        rc, out, err = assert_python_ok('-c', code)
722        self.assertEqual(out.strip(), b'__del__ called')
723
724    def test_gc_ordinary_module_at_shutdown(self):
725        # Same as above, but with a non-__main__ module.
726        with temp_dir() as script_dir:
727            module = """if 1:
728                class C:
729                    def __del__(self):
730                        print('__del__ called')
731                l = [C()]
732                l.append(l)
733                """
734            code = """if 1:
735                import sys
736                sys.path.insert(0, %r)
737                import gctest
738                """ % (script_dir,)
739            make_script(script_dir, 'gctest', module)
740            rc, out, err = assert_python_ok('-c', code)
741            self.assertEqual(out.strip(), b'__del__ called')
742
743    def test_global_del_SystemExit(self):
744        code = """if 1:
745            class ClassWithDel:
746                def __del__(self):
747                    print('__del__ called')
748            a = ClassWithDel()
749            a.link = a
750            raise SystemExit(0)"""
751        self.addCleanup(unlink, TESTFN)
752        with open(TESTFN, 'w') as script:
753            script.write(code)
754        rc, out, err = assert_python_ok(TESTFN)
755        self.assertEqual(out.strip(), b'__del__ called')
756
757    def test_get_stats(self):
758        stats = gc.get_stats()
759        self.assertEqual(len(stats), 3)
760        for st in stats:
761            self.assertIsInstance(st, dict)
762            self.assertEqual(set(st),
763                             {"collected", "collections", "uncollectable"})
764            self.assertGreaterEqual(st["collected"], 0)
765            self.assertGreaterEqual(st["collections"], 0)
766            self.assertGreaterEqual(st["uncollectable"], 0)
767        # Check that collection counts are incremented correctly
768        if gc.isenabled():
769            self.addCleanup(gc.enable)
770            gc.disable()
771        old = gc.get_stats()
772        gc.collect(0)
773        new = gc.get_stats()
774        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
775        self.assertEqual(new[1]["collections"], old[1]["collections"])
776        self.assertEqual(new[2]["collections"], old[2]["collections"])
777        gc.collect(2)
778        new = gc.get_stats()
779        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
780        self.assertEqual(new[1]["collections"], old[1]["collections"])
781        self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
782
783    def test_freeze(self):
784        gc.freeze()
785        self.assertGreater(gc.get_freeze_count(), 0)
786        gc.unfreeze()
787        self.assertEqual(gc.get_freeze_count(), 0)
788
789    def test_get_objects(self):
790        gc.collect()
791        l = []
792        l.append(l)
793        self.assertTrue(
794                any(l is element for element in gc.get_objects(generation=0))
795        )
796        self.assertFalse(
797                any(l is element for element in  gc.get_objects(generation=1))
798        )
799        self.assertFalse(
800                any(l is element for element in gc.get_objects(generation=2))
801        )
802        gc.collect(generation=0)
803        self.assertFalse(
804                any(l is element for element in gc.get_objects(generation=0))
805        )
806        self.assertTrue(
807                any(l is element for element in  gc.get_objects(generation=1))
808        )
809        self.assertFalse(
810                any(l is element for element in gc.get_objects(generation=2))
811        )
812        gc.collect(generation=1)
813        self.assertFalse(
814                any(l is element for element in gc.get_objects(generation=0))
815        )
816        self.assertFalse(
817                any(l is element for element in  gc.get_objects(generation=1))
818        )
819        self.assertTrue(
820                any(l is element for element in gc.get_objects(generation=2))
821        )
822        gc.collect(generation=2)
823        self.assertFalse(
824                any(l is element for element in gc.get_objects(generation=0))
825        )
826        self.assertFalse(
827                any(l is element for element in  gc.get_objects(generation=1))
828        )
829        self.assertTrue(
830                any(l is element for element in gc.get_objects(generation=2))
831        )
832        del l
833        gc.collect()
834
835    def test_get_objects_arguments(self):
836        gc.collect()
837        self.assertEqual(len(gc.get_objects()),
838                         len(gc.get_objects(generation=None)))
839
840        self.assertRaises(ValueError, gc.get_objects, 1000)
841        self.assertRaises(ValueError, gc.get_objects, -1000)
842        self.assertRaises(TypeError, gc.get_objects, "1")
843        self.assertRaises(TypeError, gc.get_objects, 1.234)
844
845    def test_resurrection_only_happens_once_per_object(self):
846        class A:  # simple self-loop
847            def __init__(self):
848                self.me = self
849
850        class Lazarus(A):
851            resurrected = 0
852            resurrected_instances = []
853
854            def __del__(self):
855                Lazarus.resurrected += 1
856                Lazarus.resurrected_instances.append(self)
857
858        gc.collect()
859        gc.disable()
860
861        # We start with 0 resurrections
862        laz = Lazarus()
863        self.assertEqual(Lazarus.resurrected, 0)
864
865        # Deleting the instance and triggering a collection
866        # resurrects the object
867        del laz
868        gc.collect()
869        self.assertEqual(Lazarus.resurrected, 1)
870        self.assertEqual(len(Lazarus.resurrected_instances), 1)
871
872        # Clearing the references and forcing a collection
873        # should not resurrect the object again.
874        Lazarus.resurrected_instances.clear()
875        self.assertEqual(Lazarus.resurrected, 1)
876        gc.collect()
877        self.assertEqual(Lazarus.resurrected, 1)
878
879        gc.enable()
880
881    def test_resurrection_is_transitive(self):
882        class Cargo:
883            def __init__(self):
884                self.me = self
885
886        class Lazarus:
887            resurrected_instances = []
888
889            def __del__(self):
890                Lazarus.resurrected_instances.append(self)
891
892        gc.collect()
893        gc.disable()
894
895        laz = Lazarus()
896        cargo = Cargo()
897        cargo_id = id(cargo)
898
899        # Create a cycle between cargo and laz
900        laz.cargo = cargo
901        cargo.laz = laz
902
903        # Drop the references, force a collection and check that
904        # everything was resurrected.
905        del laz, cargo
906        gc.collect()
907        self.assertEqual(len(Lazarus.resurrected_instances), 1)
908        instance = Lazarus.resurrected_instances.pop()
909        self.assertTrue(hasattr(instance, "cargo"))
910        self.assertEqual(id(instance.cargo), cargo_id)
911
912        gc.collect()
913        gc.enable()
914
915    def test_resurrection_does_not_block_cleanup_of_other_objects(self):
916
917        # When a finalizer resurrects objects, stats were reporting them as
918        # having been collected.  This affected both collect()'s return
919        # value and the dicts returned by get_stats().
920        N = 100
921
922        class A:  # simple self-loop
923            def __init__(self):
924                self.me = self
925
926        class Z(A):  # resurrecting __del__
927            def __del__(self):
928                zs.append(self)
929
930        zs = []
931
932        def getstats():
933            d = gc.get_stats()[-1]
934            return d['collected'], d['uncollectable']
935
936        gc.collect()
937        gc.disable()
938
939        # No problems if just collecting A() instances.
940        oldc, oldnc = getstats()
941        for i in range(N):
942            A()
943        t = gc.collect()
944        c, nc = getstats()
945        self.assertEqual(t, 2*N) # instance object & its dict
946        self.assertEqual(c - oldc, 2*N)
947        self.assertEqual(nc - oldnc, 0)
948
949        # But Z() is not actually collected.
950        oldc, oldnc = c, nc
951        Z()
952        # Nothing is collected - Z() is merely resurrected.
953        t = gc.collect()
954        c, nc = getstats()
955        self.assertEqual(t, 0)
956        self.assertEqual(c - oldc, 0)
957        self.assertEqual(nc - oldnc, 0)
958
959        # Z() should not prevent anything else from being collected.
960        oldc, oldnc = c, nc
961        for i in range(N):
962            A()
963        Z()
964        t = gc.collect()
965        c, nc = getstats()
966        self.assertEqual(t, 2*N)
967        self.assertEqual(c - oldc, 2*N)
968        self.assertEqual(nc - oldnc, 0)
969
970        # The A() trash should have been reclaimed already but the
971        # 2 copies of Z are still in zs (and the associated dicts).
972        oldc, oldnc = c, nc
973        zs.clear()
974        t = gc.collect()
975        c, nc = getstats()
976        self.assertEqual(t, 4)
977        self.assertEqual(c - oldc, 4)
978        self.assertEqual(nc - oldnc, 0)
979
980        gc.enable()
981
982    @unittest.skipIf(ContainerNoGC is None,
983                     'requires ContainerNoGC extension type')
984    def test_trash_weakref_clear(self):
985        # Test that trash weakrefs are properly cleared (bpo-38006).
986        #
987        # Structure we are creating:
988        #
989        #   Z <- Y <- A--+--> WZ -> C
990        #             ^  |
991        #             +--+
992        # where:
993        #   WZ is a weakref to Z with callback C
994        #   Y doesn't implement tp_traverse
995        #   A contains a reference to itself, Y and WZ
996        #
997        # A, Y, Z, WZ are all trash.  The GC doesn't know that Z is trash
998        # because Y does not implement tp_traverse.  To show the bug, WZ needs
999        # to live long enough so that Z is deallocated before it.  Then, if
1000        # gcmodule is buggy, when Z is being deallocated, C will run.
1001        #
1002        # To ensure WZ lives long enough, we put it in a second reference
1003        # cycle.  That trick only works due to the ordering of the GC prev/next
1004        # linked lists.  So, this test is a bit fragile.
1005        #
1006        # The bug reported in bpo-38006 is caused because the GC did not
1007        # clear WZ before starting the process of calling tp_clear on the
1008        # trash.  Normally, handle_weakrefs() would find the weakref via Z and
1009        # clear it.  However, since the GC cannot find Z, WR is not cleared and
1010        # it can execute during delete_garbage().  That can lead to disaster
1011        # since the callback might tinker with objects that have already had
1012        # tp_clear called on them (leaving them in possibly invalid states).
1013
1014        callback = unittest.mock.Mock()
1015
1016        class A:
1017            __slots__ = ['a', 'y', 'wz']
1018
1019        class Z:
1020            pass
1021
1022        # setup required object graph, as described above
1023        a = A()
1024        a.a = a
1025        a.y = ContainerNoGC(Z())
1026        a.wz = weakref.ref(a.y.value, callback)
1027        # create second cycle to keep WZ alive longer
1028        wr_cycle = [a.wz]
1029        wr_cycle.append(wr_cycle)
1030        # ensure trash unrelated to this test is gone
1031        gc.collect()
1032        gc.disable()
1033        # release references and create trash
1034        del a, wr_cycle
1035        gc.collect()
1036        # if called, it means there is a bug in the GC.  The weakref should be
1037        # cleared before Z dies.
1038        callback.assert_not_called()
1039        gc.enable()
1040
1041
1042class GCCallbackTests(unittest.TestCase):
1043    def setUp(self):
1044        # Save gc state and disable it.
1045        self.enabled = gc.isenabled()
1046        gc.disable()
1047        self.debug = gc.get_debug()
1048        gc.set_debug(0)
1049        gc.callbacks.append(self.cb1)
1050        gc.callbacks.append(self.cb2)
1051        self.othergarbage = []
1052
1053    def tearDown(self):
1054        # Restore gc state
1055        del self.visit
1056        gc.callbacks.remove(self.cb1)
1057        gc.callbacks.remove(self.cb2)
1058        gc.set_debug(self.debug)
1059        if self.enabled:
1060            gc.enable()
1061        # destroy any uncollectables
1062        gc.collect()
1063        for obj in gc.garbage:
1064            if isinstance(obj, Uncollectable):
1065                obj.partner = None
1066        del gc.garbage[:]
1067        del self.othergarbage
1068        gc.collect()
1069
1070    def preclean(self):
1071        # Remove all fluff from the system.  Invoke this function
1072        # manually rather than through self.setUp() for maximum
1073        # safety.
1074        self.visit = []
1075        gc.collect()
1076        garbage, gc.garbage[:] = gc.garbage[:], []
1077        self.othergarbage.append(garbage)
1078        self.visit = []
1079
1080    def cb1(self, phase, info):
1081        self.visit.append((1, phase, dict(info)))
1082
1083    def cb2(self, phase, info):
1084        self.visit.append((2, phase, dict(info)))
1085        if phase == "stop" and hasattr(self, "cleanup"):
1086            # Clean Uncollectable from garbage
1087            uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
1088            gc.garbage[:] = [e for e in gc.garbage
1089                             if not isinstance(e, Uncollectable)]
1090            for e in uc:
1091                e.partner = None
1092
1093    def test_collect(self):
1094        self.preclean()
1095        gc.collect()
1096        # Algorithmically verify the contents of self.visit
1097        # because it is long and tortuous.
1098
1099        # Count the number of visits to each callback
1100        n = [v[0] for v in self.visit]
1101        n1 = [i for i in n if i == 1]
1102        n2 = [i for i in n if i == 2]
1103        self.assertEqual(n1, [1]*2)
1104        self.assertEqual(n2, [2]*2)
1105
1106        # Count that we got the right number of start and stop callbacks.
1107        n = [v[1] for v in self.visit]
1108        n1 = [i for i in n if i == "start"]
1109        n2 = [i for i in n if i == "stop"]
1110        self.assertEqual(n1, ["start"]*2)
1111        self.assertEqual(n2, ["stop"]*2)
1112
1113        # Check that we got the right info dict for all callbacks
1114        for v in self.visit:
1115            info = v[2]
1116            self.assertTrue("generation" in info)
1117            self.assertTrue("collected" in info)
1118            self.assertTrue("uncollectable" in info)
1119
1120    def test_collect_generation(self):
1121        self.preclean()
1122        gc.collect(2)
1123        for v in self.visit:
1124            info = v[2]
1125            self.assertEqual(info["generation"], 2)
1126
1127    @cpython_only
1128    def test_collect_garbage(self):
1129        self.preclean()
1130        # Each of these cause four objects to be garbage: Two
1131        # Uncollectables and their instance dicts.
1132        Uncollectable()
1133        Uncollectable()
1134        C1055820(666)
1135        gc.collect()
1136        for v in self.visit:
1137            if v[1] != "stop":
1138                continue
1139            info = v[2]
1140            self.assertEqual(info["collected"], 2)
1141            self.assertEqual(info["uncollectable"], 8)
1142
1143        # We should now have the Uncollectables in gc.garbage
1144        self.assertEqual(len(gc.garbage), 4)
1145        for e in gc.garbage:
1146            self.assertIsInstance(e, Uncollectable)
1147
1148        # Now, let our callback handle the Uncollectable instances
1149        self.cleanup=True
1150        self.visit = []
1151        gc.garbage[:] = []
1152        gc.collect()
1153        for v in self.visit:
1154            if v[1] != "stop":
1155                continue
1156            info = v[2]
1157            self.assertEqual(info["collected"], 0)
1158            self.assertEqual(info["uncollectable"], 4)
1159
1160        # Uncollectables should be gone
1161        self.assertEqual(len(gc.garbage), 0)
1162
1163
1164    @unittest.skipIf(BUILD_WITH_NDEBUG,
1165                     'built with -NDEBUG')
1166    def test_refcount_errors(self):
1167        self.preclean()
1168        # Verify the "handling" of objects with broken refcounts
1169
1170        # Skip the test if ctypes is not available
1171        import_module("ctypes")
1172
1173        import subprocess
1174        code = textwrap.dedent('''
1175            from test.support import gc_collect, SuppressCrashReport
1176
1177            a = [1, 2, 3]
1178            b = [a]
1179
1180            # Avoid coredump when Py_FatalError() calls abort()
1181            SuppressCrashReport().__enter__()
1182
1183            # Simulate the refcount of "a" being too low (compared to the
1184            # references held on it by live data), but keeping it above zero
1185            # (to avoid deallocating it):
1186            import ctypes
1187            ctypes.pythonapi.Py_DecRef(ctypes.py_object(a))
1188
1189            # The garbage collector should now have a fatal error
1190            # when it reaches the broken object
1191            gc_collect()
1192        ''')
1193        p = subprocess.Popen([sys.executable, "-c", code],
1194                             stdout=subprocess.PIPE,
1195                             stderr=subprocess.PIPE)
1196        stdout, stderr = p.communicate()
1197        p.stdout.close()
1198        p.stderr.close()
1199        # Verify that stderr has a useful error message:
1200        self.assertRegex(stderr,
1201            br'gcmodule\.c:[0-9]+: gc_decref: Assertion "gc_get_refs\(g\) > 0" failed.')
1202        self.assertRegex(stderr,
1203            br'refcount is too small')
1204        # "address : 0x7fb5062efc18"
1205        # "address : 7FB5062EFC18"
1206        address_regex = br'[0-9a-fA-Fx]+'
1207        self.assertRegex(stderr,
1208            br'object address  : ' + address_regex)
1209        self.assertRegex(stderr,
1210            br'object refcount : 1')
1211        self.assertRegex(stderr,
1212            br'object type     : ' + address_regex)
1213        self.assertRegex(stderr,
1214            br'object type name: list')
1215        self.assertRegex(stderr,
1216            br'object repr     : \[1, 2, 3\]')
1217
1218
1219class GCTogglingTests(unittest.TestCase):
1220    def setUp(self):
1221        gc.enable()
1222
1223    def tearDown(self):
1224        gc.disable()
1225
1226    def test_bug1055820c(self):
1227        # Corresponds to temp2c.py in the bug report.  This is pretty
1228        # elaborate.
1229
1230        c0 = C1055820(0)
1231        # Move c0 into generation 2.
1232        gc.collect()
1233
1234        c1 = C1055820(1)
1235        c1.keep_c0_alive = c0
1236        del c0.loop # now only c1 keeps c0 alive
1237
1238        c2 = C1055820(2)
1239        c2wr = weakref.ref(c2) # no callback!
1240
1241        ouch = []
1242        def callback(ignored):
1243            ouch[:] = [c2wr()]
1244
1245        # The callback gets associated with a wr on an object in generation 2.
1246        c0wr = weakref.ref(c0, callback)
1247
1248        c0 = c1 = c2 = None
1249
1250        # What we've set up:  c0, c1, and c2 are all trash now.  c0 is in
1251        # generation 2.  The only thing keeping it alive is that c1 points to
1252        # it. c1 and c2 are in generation 0, and are in self-loops.  There's a
1253        # global weakref to c2 (c2wr), but that weakref has no callback.
1254        # There's also a global weakref to c0 (c0wr), and that does have a
1255        # callback, and that callback references c2 via c2wr().
1256        #
1257        #               c0 has a wr with callback, which references c2wr
1258        #               ^
1259        #               |
1260        #               |     Generation 2 above dots
1261        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
1262        #               |     Generation 0 below dots
1263        #               |
1264        #               |
1265        #            ^->c1   ^->c2 has a wr but no callback
1266        #            |  |    |  |
1267        #            <--v    <--v
1268        #
1269        # So this is the nightmare:  when generation 0 gets collected, we see
1270        # that c2 has a callback-free weakref, and c1 doesn't even have a
1271        # weakref.  Collecting generation 0 doesn't see c0 at all, and c0 is
1272        # the only object that has a weakref with a callback.  gc clears c1
1273        # and c2.  Clearing c1 has the side effect of dropping the refcount on
1274        # c0 to 0, so c0 goes away (despite that it's in an older generation)
1275        # and c0's wr callback triggers.  That in turn materializes a reference
1276        # to c2 via c2wr(), but c2 gets cleared anyway by gc.
1277
1278        # We want to let gc happen "naturally", to preserve the distinction
1279        # between generations.
1280        junk = []
1281        i = 0
1282        detector = GC_Detector()
1283        while not detector.gc_happened:
1284            i += 1
1285            if i > 10000:
1286                self.fail("gc didn't happen after 10000 iterations")
1287            self.assertEqual(len(ouch), 0)
1288            junk.append([])  # this will eventually trigger gc
1289
1290        self.assertEqual(len(ouch), 1)  # else the callback wasn't invoked
1291        for x in ouch:
1292            # If the callback resurrected c2, the instance would be damaged,
1293            # with an empty __dict__.
1294            self.assertEqual(x, None)
1295
1296    def test_bug1055820d(self):
1297        # Corresponds to temp2d.py in the bug report.  This is very much like
1298        # test_bug1055820c, but uses a __del__ method instead of a weakref
1299        # callback to sneak in a resurrection of cyclic trash.
1300
1301        ouch = []
1302        class D(C1055820):
1303            def __del__(self):
1304                ouch[:] = [c2wr()]
1305
1306        d0 = D(0)
1307        # Move all the above into generation 2.
1308        gc.collect()
1309
1310        c1 = C1055820(1)
1311        c1.keep_d0_alive = d0
1312        del d0.loop # now only c1 keeps d0 alive
1313
1314        c2 = C1055820(2)
1315        c2wr = weakref.ref(c2) # no callback!
1316
1317        d0 = c1 = c2 = None
1318
1319        # What we've set up:  d0, c1, and c2 are all trash now.  d0 is in
1320        # generation 2.  The only thing keeping it alive is that c1 points to
1321        # it.  c1 and c2 are in generation 0, and are in self-loops.  There's
1322        # a global weakref to c2 (c2wr), but that weakref has no callback.
1323        # There are no other weakrefs.
1324        #
1325        #               d0 has a __del__ method that references c2wr
1326        #               ^
1327        #               |
1328        #               |     Generation 2 above dots
1329        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
1330        #               |     Generation 0 below dots
1331        #               |
1332        #               |
1333        #            ^->c1   ^->c2 has a wr but no callback
1334        #            |  |    |  |
1335        #            <--v    <--v
1336        #
1337        # So this is the nightmare:  when generation 0 gets collected, we see
1338        # that c2 has a callback-free weakref, and c1 doesn't even have a
1339        # weakref.  Collecting generation 0 doesn't see d0 at all.  gc clears
1340        # c1 and c2.  Clearing c1 has the side effect of dropping the refcount
1341        # on d0 to 0, so d0 goes away (despite that it's in an older
1342        # generation) and d0's __del__ triggers.  That in turn materializes
1343        # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.
1344
1345        # We want to let gc happen "naturally", to preserve the distinction
1346        # between generations.
1347        detector = GC_Detector()
1348        junk = []
1349        i = 0
1350        while not detector.gc_happened:
1351            i += 1
1352            if i > 10000:
1353                self.fail("gc didn't happen after 10000 iterations")
1354            self.assertEqual(len(ouch), 0)
1355            junk.append([])  # this will eventually trigger gc
1356
1357        self.assertEqual(len(ouch), 1)  # else __del__ wasn't invoked
1358        for x in ouch:
1359            # If __del__ resurrected c2, the instance would be damaged, with an
1360            # empty __dict__.
1361            self.assertEqual(x, None)
1362
1363def test_main():
1364    enabled = gc.isenabled()
1365    gc.disable()
1366    assert not gc.isenabled()
1367    debug = gc.get_debug()
1368    gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak
1369
1370    try:
1371        gc.collect() # Delete 2nd generation garbage
1372        run_unittest(GCTests, GCTogglingTests, GCCallbackTests)
1373    finally:
1374        gc.set_debug(debug)
1375        # test gc.enable() even if GC is disabled by default
1376        if verbose:
1377            print("restoring automatic collection")
1378        # make sure to always test gc.enable()
1379        gc.enable()
1380        assert gc.isenabled()
1381        if not enabled:
1382            gc.disable()
1383
1384if __name__ == "__main__":
1385    test_main()
1386