1# tempfile.py unit tests.
2import tempfile
3import errno
4import io
5import os
6import signal
7import sys
8import re
9import warnings
10import contextlib
11import weakref
12from unittest import mock
13
14import unittest
15from test import support
16from test.support import script_helper
17
18
# Record whether os.stat exists; the file-mode tests below are skipped
# without it, and the stat module is only imported when it can be used.
has_stat = 1 if hasattr(os, 'stat') else 0
if has_stat:
    import stat

# Text mode can only be tested where it uses different open flags.
has_textmode = (tempfile._bin_openflags != tempfile._text_openflags)
has_spawnl = hasattr(os, 'spawnl')

# TEST_FILES may need to be tweaked for systems depending on the maximum
# number of files that can be opened at one time (see ulimit -n)
TEST_FILES = 48 if sys.platform.startswith('openbsd') else 100
34
35# This is organized as one test for each chunk of code in tempfile.py,
36# in order of their appearance in the file.  Testing which requires
37# threads is not done here.
38
class TestLowLevelInternals(unittest.TestCase):
    """Exercise the private helper tempfile._infer_return_type()."""

    def test_infer_return_type_singles(self):
        # A single argument decides the type; a lone None yields str.
        infer = tempfile._infer_return_type
        for expected, arg in ((str, ''), (bytes, b''), (str, None)):
            self.assertIs(expected, infer(arg))

    def test_infer_return_type_multiples(self):
        # Arguments of one kind agree; mixing str and bytes is an error.
        infer = tempfile._infer_return_type
        self.assertIs(str, infer('', ''))
        self.assertIs(bytes, infer(b'', b''))
        for mixed in (('', b''), (b'', '')):
            with self.assertRaises(TypeError):
                infer(*mixed)

    def test_infer_return_type_multiples_and_none(self):
        # None entries do not take part in the decision, but they do not
        # hide a str/bytes conflict among the remaining arguments either.
        infer = tempfile._infer_return_type
        agreeing = (
            (str, (None, '')),
            (str, ('', None)),
            (str, (None, None)),
            (bytes, (b'', None)),
            (bytes, (None, b'')),
        )
        for expected, args in agreeing:
            self.assertIs(expected, infer(*args))
        for conflicting in (('', None, b''), (b'', None, '')):
            with self.assertRaises(TypeError):
                infer(*conflicting)
63
64
65# Common functionality.
66
class BaseTestCase(unittest.TestCase):
    """Shared fixture: warning suppression plus a generated-name validator."""

    # The random part of a generated name: exactly eight characters drawn
    # from lowercase letters, digits, underscore and hyphen.
    str_check = re.compile(r"^[a-z0-9_-]{8}$")
    b_check = re.compile(br"^[a-z0-9_-]{8}$")

    def setUp(self):
        # The guard is entered by hand here and left again in tearDown().
        manager = support.check_warnings()
        self._warnings_manager = manager
        manager.__enter__()
        warnings.filterwarnings("ignore", category=RuntimeWarning,
                                message="mktemp", module=__name__)

    def tearDown(self):
        self._warnings_manager.__exit__(None, None, None)

    def nameCheck(self, name, dir, pre, suf):
        """Assert that *name* is dir/<pre><8 random characters><suf>.

        The type of *name* must match the type (str or bytes) of each of
        *dir*, *pre* and *suf*; directories are compared as absolute paths.
        """
        ndir, nbase = os.path.split(name)
        prefix_end = len(pre)
        suffix_start = len(nbase) - len(suf)
        npre = nbase[:prefix_end]
        nsuf = nbase[suffix_start:]

        for part in (dir, pre, suf):
            if part is not None:
                wanted = str if type(part) is str else bytes
                self.assertIs(type(name), wanted,
                              "unexpected return type")
        if all(part is None for part in (dir, pre, suf)):
            self.assertIs(type(name), str, "default return type must be str")

        # check for equality of the absolute paths!
        self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
                         "file %r not in directory %r" % (name, dir))
        self.assertEqual(npre, pre,
                         "file %r does not begin with %r" % (nbase, pre))
        self.assertEqual(nsuf, suf,
                         "file %r does not end with %r" % (nbase, suf))

        # What is left between prefix and suffix is the random part.
        nbase = nbase[prefix_end:suffix_start]
        if isinstance(nbase, str):
            check = self.str_check
        else:
            check = self.b_check
        self.assertTrue(check.match(nbase),
                        "random characters %r do not match %r"
                        % (nbase, check.pattern))
111
112
class TestExports(BaseTestCase):
    """Guard the set of public names exposed by the tempfile module."""

    def test_exports(self):
        # There are no surprising symbols in the tempfile module
        expected = {
            "NamedTemporaryFile",
            "TemporaryFile",
            "mkstemp",
            "mkdtemp",
            "mktemp",
            "TMP_MAX",
            "gettempprefix",
            "gettempprefixb",
            "gettempdir",
            "gettempdirb",
            "tempdir",
            "template",
            "SpooledTemporaryFile",
            "TemporaryDirectory",
        }

        # Anything not starting with an underscore counts as public.
        unexp = [key for key in tempfile.__dict__
                 if key[0] != '_' and key not in expected]
        self.assertTrue(len(unexp) == 0,
                        "unexpected keys: %s" % unexp)
141
142
class TestRandomNameSequence(BaseTestCase):
    """Test the internal iterator object _RandomNameSequence."""

    def setUp(self):
        self.r = tempfile._RandomNameSequence()
        super().setUp()

    def test_get_six_char_str(self):
        # The method name is historical: nameCheck() requires the random
        # name to be exactly eight characters long.
        s = next(self.r)
        self.nameCheck(s, '', '', '')

    def test_many(self):
        # _RandomNameSequence returns no duplicate strings (stochastic)
        seen = set()
        r = self.r
        for _ in range(TEST_FILES):
            s = next(r)
            self.nameCheck(s, '', '', '')
            self.assertNotIn(s, seen)
            seen.add(s)

    def test_supports_iter(self):
        # _RandomNameSequence supports the iterator protocol.  (This was
        # previously named supports_iter, so unittest never collected it.)
        count = 0
        for count, _ in enumerate(self.r, 1):
            if count == 20:
                break
        # The loop must have been stopped by the break, not by exhaustion.
        self.assertEqual(count, 20)

    @unittest.skipUnless(hasattr(os, 'fork'),
        "os.fork is required for this test")
    def test_process_awareness(self):
        # ensure that the random source differs between
        # child and parent.
        read_fd, write_fd = os.pipe()
        pid = None
        try:
            pid = os.fork()
            if not pid:
                # child process: send one generated name through the pipe
                os.close(read_fd)
                os.write(write_fd, next(self.r).encode("ascii"))
                os.close(write_fd)
                # bypass the normal exit handlers- leave those to
                # the parent.
                os._exit(0)

            # parent process: the child's name has the same length as ours
            parent_value = next(self.r)
            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
        finally:
            if pid:
                # best effort to ensure the process can't bleed out
                # via any bugs above
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError:
                    pass

                # Read the process exit status to avoid zombie process
                os.waitpid(pid, 0)

            os.close(read_fd)
            os.close(write_fd)
        self.assertNotEqual(child_value, parent_value)
212
213
214
class TestCandidateTempdirList(BaseTestCase):
    """Test the internal function _candidate_tempdir_list."""

    def test_nonempty_list(self):
        # _candidate_tempdir_list returns a nonempty list of strings
        candidates = tempfile._candidate_tempdir_list()

        self.assertFalse(len(candidates) == 0)
        for entry in candidates:
            self.assertIsInstance(entry, str)

    def test_wanted_dirs(self):
        # _candidate_tempdir_list contains the expected directories
        env_names = ('TMPDIR', 'TEMP', 'TMP')

        with support.EnvironmentVarGuard() as env:
            # Make sure the interesting environment variables are all set.
            for envname in env_names:
                if not os.getenv(envname):
                    env[envname] = os.path.abspath(envname)

            candidates = tempfile._candidate_tempdir_list()

            for envname in env_names:
                value = os.getenv(envname)
                if not value:
                    raise ValueError
                self.assertIn(value, candidates)

            # The current directory (or os.curdir as a fallback) is
            # expected among the candidates as well.
            try:
                here = os.getcwd()
            except (AttributeError, OSError):
                here = os.curdir
            self.assertIn(here, candidates)

            # Not practical to try to verify the presence of OS-specific
            # paths in this list.
253
254
255# We test _get_default_tempdir some more by testing gettempdir.
256
class TestGetDefaultTempdir(BaseTestCase):
    """Test _get_default_tempdir()."""

    def test_no_files_left_behind(self):
        # _get_default_tempdir() must leave the probed directory empty,
        # whether probing succeeds, open() fails, or write() fails.

        def raise_OSError(*args, **kwargs):
            raise OSError()

        def bad_writer(*args, **kwargs):
            # real_open is bound by the swap_attr() block further down,
            # which is entered before this function can be called.
            fp = real_open(*args, **kwargs)
            fp.write = raise_OSError
            return fp

        # use a private empty directory
        with tempfile.TemporaryDirectory() as scratch:
            # force _get_default_tempdir() to consider our empty directory
            with support.swap_attr(tempfile, "_candidate_tempdir_list",
                                   lambda: [scratch]):
                # verify our directory is empty after _get_default_tempdir()
                tempfile._get_default_tempdir()
                self.assertEqual(os.listdir(scratch), [])

                with support.swap_attr(io, "open", raise_OSError):
                    # test again with failing io.open()
                    with self.assertRaises(FileNotFoundError):
                        tempfile._get_default_tempdir()
                    self.assertEqual(os.listdir(scratch), [])

                with support.swap_attr(io, "open", bad_writer) as real_open:
                    # test again with failing write()
                    with self.assertRaises(FileNotFoundError):
                        tempfile._get_default_tempdir()
                    self.assertEqual(os.listdir(scratch), [])
292
293
class TestGetCandidateNames(BaseTestCase):
    """Test the internal function _get_candidate_names."""

    def test_retval(self):
        # _get_candidate_names returns a _RandomNameSequence object
        self.assertIsInstance(tempfile._get_candidate_names(),
                              tempfile._RandomNameSequence)

    def test_same_thing(self):
        # _get_candidate_names always returns the same object
        first = tempfile._get_candidate_names()
        second = tempfile._get_candidate_names()

        self.assertTrue(first is second)
308
309
@contextlib.contextmanager
def _inside_empty_temp_dir():
    """Point tempfile.tempdir at a fresh directory for the with-block.

    The directory is removed with support.rmtree() on exit, after the
    previous value of tempfile.tempdir has been restored.
    """
    scratch = tempfile.mkdtemp()
    try:
        with support.swap_attr(tempfile, 'tempdir', scratch):
            yield
    finally:
        support.rmtree(scratch)
318
319
def _mock_candidate_names(*names):
    """Return a context manager making tempfile draw names from *names*.

    While active, every call to tempfile._get_candidate_names() yields a
    fresh iterator over the given names, in order.
    """
    def fixed_names():
        return iter(names)

    return support.swap_attr(tempfile, '_get_candidate_names', fixed_names)
324
325
class TestBadTempdir:
    """Mixin of failure-mode tests; the host class supplies make_temp()."""

    def test_read_only_directory(self):
        # Creating an entry in a read-only directory raises PermissionError
        # and leaves nothing behind.
        with _inside_empty_temp_dir():
            original_mode = os.stat(tempfile.tempdir).st_mode
            write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
            os.chmod(tempfile.tempdir, original_mode & ~write_bits)
            try:
                if os.access(tempfile.tempdir, os.W_OK):
                    self.skipTest("can't set the directory read-only")
                with self.assertRaises(PermissionError):
                    self.make_temp()
                self.assertEqual(os.listdir(tempfile.tempdir), [])
            finally:
                # Restore the mode so the directory can be cleaned up.
                os.chmod(tempfile.tempdir, original_mode)

    def test_nonexisting_directory(self):
        # A tempdir that does not exist raises FileNotFoundError.
        with _inside_empty_temp_dir():
            missing = os.path.join(tempfile.tempdir, 'nonexistent')
            with support.swap_attr(tempfile, 'tempdir', missing):
                with self.assertRaises(FileNotFoundError):
                    self.make_temp()

    def test_non_directory(self):
        # A tempdir that is really a regular file is rejected.
        with _inside_empty_temp_dir():
            plain_file = os.path.join(tempfile.tempdir, 'file')
            with open(plain_file, 'wb'):
                pass
            with support.swap_attr(tempfile, 'tempdir', plain_file):
                with self.assertRaises((NotADirectoryError, FileNotFoundError)):
                    self.make_temp()
356
357
class TestMkstempInner(TestBadTempdir, BaseTestCase):
    """Test the internal function _mkstemp_inner."""

    class mkstemped:
        """A file made by _mkstemp_inner(); closed and unlinked on deletion."""

        # Kept as class attributes so __del__ reaches them through self;
        # presumably this keeps cleanup working during interpreter
        # shutdown -- TODO confirm.
        _bflags = tempfile._bin_openflags
        _tflags = tempfile._text_openflags
        _close = os.close
        _unlink = os.unlink

        # Class-level defaults let __del__ recognise an instance whose
        # __init__ raised before any file was created.
        fd = None
        name = None

        def __init__(self, dir, pre, suf, bin):
            flags = self._bflags if bin else self._tflags

            output_type = tempfile._infer_return_type(dir, pre, suf)
            (self.fd, self.name) = tempfile._mkstemp_inner(
                dir, pre, suf, flags, output_type)

        def write(self, str):
            os.write(self.fd, str)

        def __del__(self):
            # Only release what was actually acquired; otherwise a failed
            # __init__ would surface as an ignored AttributeError here.
            if self.fd is not None:
                self._close(self.fd)
            if self.name is not None:
                self._unlink(self.name)

    def do_create(self, dir=None, pre=None, suf=None, bin=1):
        """Create a mkstemped file, validate its name and return it.

        Missing parts default to the temp directory and to empty strings
        of the type inferred from the parts that were given.  Raises
        TypeError if str and bytes parts are mixed.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        file = self.mkstemped(dir, pre, suf, bin)

        self.nameCheck(file.name, dir, pre, suf)
        return file

    def test_basic(self):
        # _mkstemp_inner can create files
        self.do_create().write(b"blat")
        self.do_create(pre="a").write(b"blat")
        self.do_create(suf="b").write(b"blat")
        self.do_create(pre="a", suf="b").write(b"blat")
        self.do_create(pre="aa", suf=".txt").write(b"blat")

    def test_basic_with_bytes_names(self):
        # _mkstemp_inner can create files when given name parts all
        # specified as bytes.
        dir_b = tempfile.gettempdirb()
        self.do_create(dir=dir_b, suf=b"").write(b"blat")
        self.do_create(dir=dir_b, pre=b"a").write(b"blat")
        self.do_create(dir=dir_b, suf=b"b").write(b"blat")
        self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat")
        self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat")
        # Can't mix str & binary types in the args.
        with self.assertRaises(TypeError):
            self.do_create(dir="", suf=b"").write(b"blat")
        with self.assertRaises(TypeError):
            self.do_create(dir=dir_b, pre="").write(b"blat")
        with self.assertRaises(TypeError):
            self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat")

    def test_basic_many(self):
        # _mkstemp_inner can create many files (stochastic)
        # The list keeps every file alive until all have been created.
        extant = [self.do_create(pre="aa") for _ in range(TEST_FILES)]
        self.assertEqual(len(extant), TEST_FILES)

    def test_choose_directory(self):
        # _mkstemp_inner can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir).write(b"blat")
        finally:
            # The file is removed by mkstemped.__del__; force a collection
            # so the directory is empty even without reference counting.
            support.gc_collect()
            os.rmdir(dir)

    @unittest.skipUnless(has_stat, 'os.stat not available')
    def test_file_mode(self):
        # _mkstemp_inner creates files with the proper mode

        file = self.do_create()
        mode = stat.S_IMODE(os.stat(file.name).st_mode)
        expected = 0o600
        if sys.platform == 'win32':
            # There's no distinction among 'user', 'group' and 'world';
            # replicate the 'user' bits.
            user = expected >> 6
            expected = user * (1 + 8 + 64)
        self.assertEqual(mode, expected)

    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
    def test_noinherit(self):
        # _mkstemp_inner file handles are not inherited by child processes

        v = "v" if support.verbose else "q"

        file = self.do_create()
        self.assertEqual(os.get_inheritable(file.fd), False)
        fd = "%d" % file.fd

        try:
            me = __file__
        except NameError:
            me = sys.argv[0]

        # We have to exec something, so that FD_CLOEXEC will take
        # effect.  The core of this test is therefore in
        # tf_inherit_check.py, which see.
        tester = os.path.join(os.path.dirname(os.path.abspath(me)),
                              "tf_inherit_check.py")

        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
        # but an arg with embedded spaces should be decorated with double
        # quotes on each end
        if sys.platform == 'win32':
            decorated = '"%s"' % sys.executable
            tester = '"%s"' % tester
        else:
            decorated = sys.executable

        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
        self.assertFalse(retval < 0,
                    "child process caught fatal signal %d" % -retval)
        self.assertFalse(retval > 0, "child process reports failure %d"%retval)

    @unittest.skipUnless(has_textmode, "text mode not available")
    def test_textmode(self):
        # _mkstemp_inner can create files in text mode

        # A text file is truncated at the first Ctrl+Z byte
        f = self.do_create(bin=0)
        f.write(b"blat\x1a")
        f.write(b"extra\n")
        os.lseek(f.fd, 0, os.SEEK_SET)
        self.assertEqual(os.read(f.fd, 20), b"blat")

    def make_temp(self):
        # Hook used by the TestBadTempdir mixin.
        return tempfile._mkstemp_inner(tempfile.gettempdir(),
                                       tempfile.gettempprefix(),
                                       '',
                                       tempfile._bin_openflags,
                                       str)

    def test_collision_with_existing_file(self):
        # _mkstemp_inner tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            (fd1, name1) = self.make_temp()
            os.close(fd1)
            self.assertTrue(name1.endswith('aaa'))

            (fd2, name2) = self.make_temp()
            os.close(fd2)
            self.assertTrue(name2.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # _mkstemp_inner tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            dir = tempfile.mkdtemp()
            self.assertTrue(dir.endswith('aaa'))

            (fd, name) = self.make_temp()
            os.close(fd)
            self.assertTrue(name.endswith('bbb'))
530
531
class TestGetTempPrefix(BaseTestCase):
    """Test gettempprefix()."""

    def test_sane_template(self):
        # gettempprefix returns a nonempty prefix string, and
        # gettempprefixb a nonempty bytes prefix
        getters = ((tempfile.gettempprefix, str),
                   (tempfile.gettempprefixb, bytes))
        for getter, kind in getters:
            prefix = getter()
            self.assertIsInstance(prefix, kind)
            self.assertGreater(len(prefix), 0)

    def test_usable_template(self):
        # gettempprefix returns a usable prefix string

        # Create a temp directory, avoiding use of the prefix.
        # Then attempt to create a file whose name is
        # prefix + 'xxxxxx.xxx' in that directory.
        basename = tempfile.gettempprefix() + "xxxxxx.xxx"
        d = tempfile.mkdtemp(prefix="")
        try:
            path = os.path.join(d, basename)
            os.close(os.open(path, os.O_RDWR | os.O_CREAT))
            os.unlink(path)
        finally:
            os.rmdir(d)
562
563
class TestGetTempDir(BaseTestCase):
    """Test gettempdir()."""

    def test_directory_exists(self):
        # gettempdir returns a directory which exists

        for d in (tempfile.gettempdir(), tempfile.gettempdirb()):
            # Decode before comparing: a bytes path never equals the str
            # os.curdir, and the mixed comparison warns under -b.
            self.assertTrue(os.path.isabs(d) or os.fsdecode(d) == os.curdir,
                            "%r is not an absolute path" % d)
            self.assertTrue(os.path.isdir(d),
                            "%r is not a directory" % d)

    def test_directory_writable(self):
        # gettempdir returns a directory writable by the user

        # sneaky: just instantiate a NamedTemporaryFile, which
        # defaults to writing into the directory returned by
        # gettempdir.  The with-block closes it even if write() fails.
        with tempfile.NamedTemporaryFile() as file:
            file.write(b"blat")

    def test_same_thing(self):
        # gettempdir always returns the same object
        a = tempfile.gettempdir()
        b = tempfile.gettempdir()
        c = tempfile.gettempdirb()

        self.assertIs(a, b)
        self.assertNotEqual(type(a), type(c))
        self.assertEqual(a, os.fsdecode(c))

    def test_case_sensitive(self):
        # gettempdir should not flatten its case
        # even on a case-insensitive file system
        case_sensitive_tempdir = tempfile.mkdtemp("-Temp")
        # Clear the cached value so gettempdir() recomputes it.
        _tempdir, tempfile.tempdir = tempfile.tempdir, None
        try:
            with support.EnvironmentVarGuard() as env:
                # Fake the first env var which is checked as a candidate
                env["TMPDIR"] = case_sensitive_tempdir
                self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir)
        finally:
            tempfile.tempdir = _tempdir
            support.rmdir(case_sensitive_tempdir)
609
610
class TestMkstemp(BaseTestCase):
    """Test mkstemp()."""

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a file with mkstemp(), validate its name, then remove it.

        Missing parts default to the temp directory and to empty strings
        of the type inferred from the parts that were given.  Raises
        TypeError if str and bytes parts are mixed.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            if output_type is str:
                dir = tempfile.gettempdir()
            else:
                dir = tempfile.gettempdirb()
        if pre is None:
            pre = output_type()
        if suf is None:
            suf = output_type()
        (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)

        # Every check runs inside the try block so that a failing
        # assertion cannot leak the descriptor or leave the file behind.
        try:
            ndir = os.path.dirname(name)
            adir = os.path.abspath(dir)
            self.assertEqual(adir, ndir,
                "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
            self.nameCheck(name, dir, pre, suf)
        finally:
            os.close(fd)
            os.unlink(name)

    def test_basic(self):
        # mkstemp can create files
        self.do_create()
        self.do_create(pre="a")
        self.do_create(suf="b")
        self.do_create(pre="a", suf="b")
        self.do_create(pre="aa", suf=".txt")
        self.do_create(dir=".")

    def test_basic_with_bytes_names(self):
        # mkstemp can create files when given name parts all
        # specified as bytes.
        d = tempfile.gettempdirb()
        self.do_create(dir=d, suf=b"")
        self.do_create(dir=d, pre=b"a")
        self.do_create(dir=d, suf=b"b")
        self.do_create(dir=d, pre=b"a", suf=b"b")
        self.do_create(dir=d, pre=b"aa", suf=b".txt")
        self.do_create(dir=b".")
        # Can't mix str & binary types in the args.
        with self.assertRaises(TypeError):
            self.do_create(dir=".", pre=b"aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre="aa", suf=b".txt")
        with self.assertRaises(TypeError):
            self.do_create(dir=b".", pre=b"aa", suf=".txt")


    def test_choose_directory(self):
        # mkstemp can create files in a user-selected directory
        dir = tempfile.mkdtemp()
        try:
            self.do_create(dir=dir)
        finally:
            os.rmdir(dir)
671
672
class TestMkdtemp(TestBadTempdir, BaseTestCase):
    """Test mkdtemp()."""

    def make_temp(self):
        # Hook used by the TestBadTempdir mixin.
        return tempfile.mkdtemp()

    def do_create(self, dir=None, pre=None, suf=None):
        """Create a directory with mkdtemp(), validate its name, return it.

        The caller removes the directory; it is removed here only when
        the name check fails.
        """
        output_type = tempfile._infer_return_type(dir, pre, suf)
        if dir is None:
            dir = (tempfile.gettempdir() if output_type is str
                   else tempfile.gettempdirb())
        pre = output_type() if pre is None else pre
        suf = output_type() if suf is None else suf
        name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)

        try:
            self.nameCheck(name, dir, pre, suf)
        except BaseException:
            os.rmdir(name)
            raise
        return name

    def test_basic(self):
        # mkdtemp can create directories
        variants = (
            {},
            {"pre": "a"},
            {"suf": "b"},
            {"pre": "a", "suf": "b"},
            {"pre": "aa", "suf": ".txt"},
        )
        for kwargs in variants:
            os.rmdir(self.do_create(**kwargs))

    def test_basic_with_bytes_names(self):
        # mkdtemp can create directories when given all binary parts
        d = tempfile.gettempdirb()
        variants = (
            {},
            {"pre": b"a"},
            {"suf": b"b"},
            {"pre": b"a", "suf": b"b"},
            {"pre": b"aa", "suf": b".txt"},
        )
        for kwargs in variants:
            os.rmdir(self.do_create(dir=d, **kwargs))
        # Mixing str and bytes parts must be rejected.
        mixed = (
            {"dir": d, "pre": "aa", "suf": b".txt"},
            {"dir": d, "pre": b"aa", "suf": ".txt"},
            {"dir": "", "pre": b"aa", "suf": b".txt"},
        )
        for kwargs in mixed:
            with self.assertRaises(TypeError):
                os.rmdir(self.do_create(**kwargs))

    def test_basic_many(self):
        # mkdtemp can create many directories (stochastic)
        created = []
        try:
            for _ in range(TEST_FILES):
                created.append(self.do_create(pre="aa"))
        finally:
            # Remove whatever was created, even after a failure.
            for path in created:
                os.rmdir(path)

    def test_choose_directory(self):
        # mkdtemp can create directories in a user-selected directory
        parent = tempfile.mkdtemp()
        try:
            os.rmdir(self.do_create(dir=parent))
        finally:
            os.rmdir(parent)

    @unittest.skipUnless(has_stat, 'os.stat not available')
    def test_mode(self):
        # mkdtemp creates directories with the proper mode

        path = self.do_create()
        try:
            mode = stat.S_IMODE(os.stat(path).st_mode)
            mode &= 0o777 # Mask off sticky bits inherited from /tmp
            if sys.platform == 'win32':
                # There's no distinction among 'user', 'group' and 'world';
                # replicate the 'user' bits.
                user = 0o700 >> 6
                expected = user * (1 + 8 + 64)
            else:
                expected = 0o700
            self.assertEqual(mode, expected)
        finally:
            os.rmdir(path)

    def test_collision_with_existing_file(self):
        # mkdtemp tries another name when a file with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            blocker = tempfile.NamedTemporaryFile(delete=False)
            blocker.close()
            self.assertTrue(blocker.name.endswith('aaa'))
            made = tempfile.mkdtemp()
            self.assertTrue(made.endswith('bbb'))

    def test_collision_with_existing_directory(self):
        # mkdtemp tries another name when a directory with
        # the chosen name already exists
        with _inside_empty_temp_dir(), \
             _mock_candidate_names('aaa', 'aaa', 'bbb'):
            first = tempfile.mkdtemp()
            self.assertTrue(first.endswith('aaa'))
            second = tempfile.mkdtemp()
            self.assertTrue(second.endswith('bbb'))
779
780
class TestMktemp(BaseTestCase):
    """Test mktemp()."""

    # For safety, all use of mktemp must occur in a private directory.
    # We must also suppress the RuntimeWarning it generates.
    def setUp(self):
        self.dir = tempfile.mkdtemp()
        super().setUp()

    def tearDown(self):
        # The base-class teardown must run even if the directory cannot
        # be removed, or the warnings guard is never exited.
        try:
            if self.dir:
                # Files are unlinked by mktemped.__del__; collect first so
                # the directory is empty even without reference counting.
                support.gc_collect()
                os.rmdir(self.dir)
                self.dir = None
        finally:
            super().tearDown()

    class mktemped:
        """A file created at a name chosen by mktemp(); unlinked on deletion."""

        _unlink = os.unlink
        _bflags = tempfile._bin_openflags

        # Set once the file really exists, so that __del__ does not try
        # to unlink a file that __init__ failed to create.
        _created = False

        def __init__(self, dir, pre, suf):
            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
            # Create the file.  This will raise an exception if it's
            # mysteriously appeared in the meanwhile.
            os.close(os.open(self.name, self._bflags, 0o600))
            self._created = True

        def __del__(self):
            if self._created:
                self._unlink(self.name)

    def do_create(self, pre="", suf=""):
        """Create a mktemped file in self.dir, validate its name, return it."""
        file = self.mktemped(self.dir, pre, suf)

        self.nameCheck(file.name, self.dir, pre, suf)
        return file

    def test_basic(self):
        # mktemp can choose usable file names
        self.do_create()
        self.do_create(pre="a")
        self.do_create(suf="b")
        self.do_create(pre="a", suf="b")
        self.do_create(pre="aa", suf=".txt")

    def test_many(self):
        # mktemp can choose many usable file names (stochastic)
        # The list keeps every file alive until all have been created.
        extant = [self.do_create(pre="aa") for _ in range(TEST_FILES)]
        self.assertEqual(len(extant), TEST_FILES)
828
829##     def test_warning(self):
830##         # mktemp issues a warning when used
831##         warnings.filterwarnings("error",
832##                                 category=RuntimeWarning,
833##                                 message="mktemp")
834##         self.assertRaises(RuntimeWarning,
835##                           tempfile.mktemp, dir=self.dir)
836
837
838# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
839
840
class TestNamedTemporaryFile(BaseTestCase):
    """Test NamedTemporaryFile()."""

    def do_create(self, dir=None, pre="", suf="", delete=True):
        # Create a NamedTemporaryFile (in the default temporary directory
        # unless *dir* is given), validate its name and return it.
        where = tempfile.gettempdir() if dir is None else dir
        tmp = tempfile.NamedTemporaryFile(dir=where, prefix=pre, suffix=suf,
                                          delete=delete)
        self.nameCheck(tmp.name, where, pre, suf)
        return tmp

    def test_basic(self):
        # NamedTemporaryFile can create files
        for prefix, suffix in (("", ""), ("a", ""), ("", "b"),
                               ("a", "b"), ("aa", ".txt")):
            self.do_create(pre=prefix, suf=suffix)

    def test_method_lookup(self):
        # Issue #18879: a method looked up on a temporary file should keep
        # the file alive long enough to be called.
        tmp = self.do_create()
        ref = weakref.ref(tmp)
        first_write = tmp.write
        second_write = tmp.write
        del tmp
        first_write(b'foo')
        del first_write
        second_write(b'bar')
        del second_write
        if support.check_impl_detail(cpython=True):
            # No reference cycle was created.
            self.assertIsNone(ref())

    def test_iter(self):
        # Issue #23700: an iterator obtained from a temporary file should
        # keep the file alive for as long as the iteration lasts.
        expected = [b'spam\n', b'eggs\n', b'beans\n']

        def make_file():
            tmp = tempfile.NamedTemporaryFile(mode='w+b')
            tmp.write(b''.join(expected))
            tmp.seek(0)
            return tmp

        # The file object is deliberately not bound to a name here.
        for index, line in enumerate(make_file()):
            self.assertEqual(line, expected[index])
        self.assertEqual(index, len(expected) - 1)

    def test_creates_named(self):
        # NamedTemporaryFile creates files with names
        tmp = tempfile.NamedTemporaryFile()
        present = os.path.exists(tmp.name)
        self.assertTrue(present,
                        "NamedTemporaryFile %s does not exist" % tmp.name)

    def test_del_on_close(self):
        # A NamedTemporaryFile is deleted when closed
        scratch = tempfile.mkdtemp()
        try:
            tmp = tempfile.NamedTemporaryFile(dir=scratch)
            tmp.write(b'blat')
            tmp.close()
            still_there = os.path.exists(tmp.name)
            self.assertFalse(
                still_there,
                "NamedTemporaryFile %s exists after close" % tmp.name)
        finally:
            os.rmdir(scratch)

    def test_dis_del_on_close(self):
        # Tests that delete-on-close can be disabled
        scratch = tempfile.mkdtemp()
        leftover = None
        try:
            tmp = tempfile.NamedTemporaryFile(dir=scratch, delete=False)
            # Remember the path so the finally clause can remove it.
            leftover = tmp.name
            tmp.write(b'blat')
            tmp.close()
            still_there = os.path.exists(tmp.name)
            self.assertTrue(
                still_there,
                "NamedTemporaryFile %s missing after close" % tmp.name)
        finally:
            if leftover is not None:
                os.unlink(leftover)
            os.rmdir(scratch)

    def test_multiple_close(self):
        # A NamedTemporaryFile can be closed many times without error
        tmp = tempfile.NamedTemporaryFile()
        tmp.write(b'abc\n')
        for _ in range(3):
            tmp.close()

    def test_context_manager(self):
        # A NamedTemporaryFile can be used as a context manager
        with tempfile.NamedTemporaryFile() as tmp:
            self.assertTrue(os.path.exists(tmp.name))
        self.assertFalse(os.path.exists(tmp.name))
        # Entering the already closed file again must raise ValueError.
        with self.assertRaises(ValueError):
            with tmp:
                pass

    def test_no_leak_fd(self):
        # Issue #21058: don't leak file descriptor when io.open() fails
        closed_fds = []
        real_close = os.close

        def recording_close(fd):
            closed_fds.append(fd)
            real_close(fd)

        with mock.patch('os.close', side_effect=recording_close), \
                mock.patch('io.open', side_effect=ValueError):
            self.assertRaises(ValueError, tempfile.NamedTemporaryFile)
            self.assertEqual(len(closed_fds), 1)

    def test_bad_mode(self):
        # Invalid mode arguments raise and leave nothing in the directory
        scratch = tempfile.mkdtemp()
        self.addCleanup(support.rmtree, scratch)
        with self.assertRaises(ValueError):
            tempfile.NamedTemporaryFile(mode='wr', dir=scratch)
        with self.assertRaises(TypeError):
            tempfile.NamedTemporaryFile(mode=2, dir=scratch)
        self.assertEqual(os.listdir(scratch), [])

    # How to test the mode and bufsize parameters?
966
class TestSpooledTemporaryFile(BaseTestCase):
    """Test SpooledTemporaryFile()."""

    def do_create(self, max_size=0, dir=None, pre="", suf=""):
        # Create a SpooledTemporaryFile, using the default temporary
        # directory when *dir* is not given.
        where = tempfile.gettempdir() if dir is None else dir
        return tempfile.SpooledTemporaryFile(max_size=max_size, dir=where,
                                             prefix=pre, suffix=suf)

    def test_basic(self):
        # SpooledTemporaryFile can create files
        for kwargs in ({}, dict(max_size=100, pre="a", suf=".txt")):
            spool = self.do_create(**kwargs)
            self.assertFalse(spool._rolled)

    def test_del_on_close(self):
        # A SpooledTemporaryFile is deleted when closed
        scratch = tempfile.mkdtemp()
        try:
            spool = tempfile.SpooledTemporaryFile(max_size=10, dir=scratch)
            self.assertFalse(spool._rolled)
            spool.write(b'blat ' * 5)
            self.assertTrue(spool._rolled)
            filename = spool.name
            spool.close()
            # Only a str name can be checked for existence on disk.
            lingering = isinstance(filename, str) and os.path.exists(filename)
            self.assertFalse(
                lingering,
                "SpooledTemporaryFile %s exists after close" % filename)
        finally:
            os.rmdir(scratch)

    def test_rewrite_small(self):
        # A SpooledTemporaryFile can be written to multiple times within
        # the max_size without rolling over
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        chunk = b'x' * 20
        for _ in range(5):
            spool.seek(0, 0)
            spool.write(chunk)
        self.assertFalse(spool._rolled)

    def test_write_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        spool.write(b'x' * 20)
        self.assertFalse(spool._rolled)
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        # One byte past max_size triggers the rollover.
        spool.write(b'x')
        self.assertTrue(spool._rolled)

    def test_writelines(self):
        # Verify writelines with a SpooledTemporaryFile
        spool = self.do_create()
        spool.writelines((b'x', b'y', b'z'))
        spool.seek(0)
        self.assertEqual(spool.read(), b'xyz')

    def test_writelines_sequential(self):
        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
        # over afterward
        spool = self.do_create(max_size=35)
        pieces = (b'x' * 20, b'x' * 10, b'x' * 5)
        spool.writelines(pieces)
        self.assertFalse(spool._rolled)
        spool.write(b'x')
        self.assertTrue(spool._rolled)

    def test_sparse(self):
        # A SpooledTemporaryFile that is written late in the file will extend
        # when that occurs
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        # Seeking alone must not roll over; the write that follows does.
        spool.seek(100, 0)
        self.assertFalse(spool._rolled)
        spool.write(b'x')
        self.assertTrue(spool._rolled)

    def test_fileno(self):
        # A SpooledTemporaryFile should roll over to a real file on fileno()
        spool = self.do_create(max_size=30)
        self.assertFalse(spool._rolled)
        descriptor = spool.fileno()
        self.assertTrue(descriptor > 0)
        self.assertTrue(spool._rolled)

    def test_multiple_close_before_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        spool = tempfile.SpooledTemporaryFile()
        spool.write(b'abc\n')
        self.assertFalse(spool._rolled)
        for _ in range(3):
            spool.close()

    def test_multiple_close_after_rollover(self):
        # A SpooledTemporaryFile can be closed many times without error
        spool = tempfile.SpooledTemporaryFile(max_size=1)
        spool.write(b'abc\n')
        self.assertTrue(spool._rolled)
        for _ in range(3):
            spool.close()

    def test_bound_methods(self):
        # It should be OK to steal a bound method from a SpooledTemporaryFile
        # and use it independently; when the file rolls over, those bound
        # methods should continue to function
        spool = self.do_create(max_size=30)
        do_read, do_write, do_seek = spool.read, spool.write, spool.seek

        first, second = b"a" * 35, b"b" * 35
        do_write(first)
        do_write(second)
        do_seek(0, 0)
        self.assertEqual(do_read(70), b'a'*35 + b'b'*35)

    def test_properties(self):
        # File attributes before and after rollover in binary mode
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'x' * 10)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+b')
        self.assertIsNone(spool.name)
        for attr in ('newlines', 'encoding'):
            with self.assertRaises(AttributeError):
                getattr(spool, attr)

        spool.write(b'x')
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'rb+')
        self.assertIsNotNone(spool.name)
        for attr in ('newlines', 'encoding'):
            with self.assertRaises(AttributeError):
                getattr(spool, attr)

    def test_text_mode(self):
        # Creating a SpooledTemporaryFile with a text mode should produce
        # a file object reading and writing (Unicode) text strings.
        spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10)
        spool.write("abc\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\n")
        spool.write("def\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\ndef\n")
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNone(spool.name)
        self.assertIsNone(spool.newlines)
        self.assertIsNone(spool.encoding)

        spool.write("xyzzy\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\ndef\nxyzzy\n")
        # Check that Ctrl+Z doesn't truncate the file
        spool.write("foo\x1abar\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n")
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNotNone(spool.name)
        self.assertEqual(spool.newlines, os.linesep)
        self.assertIsNotNone(spool.encoding)

    def test_text_newline_and_encoding(self):
        # Explicit newline and encoding arguments in text mode
        spool = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
                                              newline='', encoding='utf-8')
        spool.write("\u039B\r\n")
        spool.seek(0)
        self.assertEqual(spool.read(), "\u039B\r\n")
        self.assertFalse(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNone(spool.name)
        self.assertIsNone(spool.newlines)
        self.assertIsNone(spool.encoding)

        spool.write("\u039B" * 20 + "\r\n")
        spool.seek(0)
        self.assertEqual(spool.read(),
                         "\u039B\r\n" + ("\u039B" * 20) + "\r\n")
        self.assertTrue(spool._rolled)
        self.assertEqual(spool.mode, 'w+')
        self.assertIsNotNone(spool.name)
        self.assertIsNotNone(spool.newlines)
        self.assertEqual(spool.encoding, 'utf-8')

    def test_context_manager_before_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as spool:
            self.assertFalse(spool._rolled)
            self.assertFalse(spool.closed)
        self.assertTrue(spool.closed)
        # Entering the closed file again must raise ValueError.
        with self.assertRaises(ValueError):
            with spool:
                pass

    def test_context_manager_during_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        with tempfile.SpooledTemporaryFile(max_size=1) as spool:
            self.assertFalse(spool._rolled)
            spool.write(b'abc\n')
            spool.flush()
            self.assertTrue(spool._rolled)
            self.assertFalse(spool.closed)
        self.assertTrue(spool.closed)
        # Entering the closed file again must raise ValueError.
        with self.assertRaises(ValueError):
            with spool:
                pass

    def test_context_manager_after_rollover(self):
        # A SpooledTemporaryFile can be used as a context manager
        spool = tempfile.SpooledTemporaryFile(max_size=1)
        spool.write(b'abc\n')
        spool.flush()
        self.assertTrue(spool._rolled)
        with spool:
            self.assertFalse(spool.closed)
        self.assertTrue(spool.closed)
        # Entering the closed file again must raise ValueError.
        with self.assertRaises(ValueError):
            with spool:
                pass

    def test_truncate_with_size_parameter(self):
        # A SpooledTemporaryFile can be truncated to zero size
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'abcdefg\n')
        spool.seek(0)
        spool.truncate()
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'')
        # A SpooledTemporaryFile can be truncated to a specific size
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'abcdefg\n')
        spool.truncate(4)
        self.assertFalse(spool._rolled)
        self.assertEqual(spool._file.getvalue(), b'abcd')
        # A SpooledTemporaryFile rolls over if truncated to large size
        spool = tempfile.SpooledTemporaryFile(max_size=10)
        spool.write(b'abcdefg\n')
        spool.truncate(20)
        self.assertTrue(spool._rolled)
        if has_stat:
            on_disk = os.fstat(spool.fileno()).st_size
            self.assertEqual(on_disk, 20)
1217
1218
if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:

    # Only defined when TemporaryFile is a distinct object from
    # NamedTemporaryFile; otherwise there is nothing separate to test.
    class TestTemporaryFile(BaseTestCase):
        """Test TemporaryFile()."""

        def test_basic(self):
            # TemporaryFile can create files
            # No point in testing the name params - the file has no name.
            tempfile.TemporaryFile()

        def test_has_no_name(self):
            # TemporaryFile creates files with no names (on this system)
            scratch = tempfile.mkdtemp()
            tmp = tempfile.TemporaryFile(dir=scratch)
            tmp.write(b'blat')

            # Sneaky: because this file has no name, it should not prevent
            # us from removing the directory it was created in.
            try:
                os.rmdir(scratch)
            except BaseException:
                # Clean up, then let the original failure propagate.
                tmp.close()
                os.rmdir(scratch)
                raise

        def test_multiple_close(self):
            # A TemporaryFile can be closed many times without error
            tmp = tempfile.TemporaryFile()
            tmp.write(b'abc\n')
            for _ in range(3):
                tmp.close()

        # How to test the mode and bufsize parameters?
        def test_mode_and_encoding(self):
            # Data written in a given mode/encoding reads back unchanged

            def roundtrip(payload, *args, **kwargs):
                with tempfile.TemporaryFile(*args, **kwargs) as fileobj:
                    fileobj.write(payload)
                    fileobj.seek(0)
                    self.assertEqual(payload, fileobj.read())

            roundtrip(b"1234", "w+b")
            roundtrip("abdc\n", "w+")
            roundtrip("\u039B", "w+", encoding="utf-16")
            roundtrip("foo\r\n", "w+", newline="")

        def test_no_leak_fd(self):
            # Issue #21058: don't leak file descriptor when io.open() fails
            closed_fds = []
            real_close = os.close

            def recording_close(fd):
                closed_fds.append(fd)
                real_close(fd)

            with mock.patch('os.close', side_effect=recording_close), \
                    mock.patch('io.open', side_effect=ValueError):
                self.assertRaises(ValueError, tempfile.TemporaryFile)
                self.assertEqual(len(closed_fds), 1)
1279
1280
1281
1282# Helper for test_del_on_shutdown
class NulledModules:
    """Context manager that temporarily sets every module global to None.

    On construction the namespace dict of each given module is recorded
    together with a shallow copy of its contents.  Entering the context
    rebinds every existing key in those namespaces to None; leaving it
    restores the copied contents.
    """

    def __init__(self, *modules):
        self.refs = []
        self.contents = []
        for module in modules:
            namespace = module.__dict__
            self.refs.append(namespace)
            self.contents.append(namespace.copy())

    def __enter__(self):
        # dict.fromkeys() maps each existing key to None, so the update
        # overwrites every value without adding or removing keys.
        for namespace in self.refs:
            namespace.update(dict.fromkeys(namespace))

    def __exit__(self, *exc_info):
        # Put back exactly what was saved in __init__.
        for namespace, saved in zip(self.refs, self.contents):
            namespace.clear()
            namespace.update(saved)
1297
class TestTemporaryDirectory(BaseTestCase):
    """Test TemporaryDirectory()."""

    def do_create(self, dir=None, pre="", suf="", recurse=1):
        # Create a TemporaryDirectory (in the default temporary directory
        # unless *dir* is given), validate its name, and populate it.
        where = tempfile.gettempdir() if dir is None else dir
        tmp = tempfile.TemporaryDirectory(dir=where, prefix=pre, suffix=suf)
        self.nameCheck(tmp.name, where, pre, suf)
        # Create a subdirectory and some files
        if recurse:
            nested = self.do_create(tmp.name, pre, suf, recurse-1)
            # NOTE(review): presumably this detaches the nested wrapper
            # from its path so only the outer object is used for cleanup;
            # confirm against TemporaryDirectory's implementation.
            nested.name = None
        payload_path = os.path.join(tmp.name, "test.txt")
        with open(payload_path, "wb") as out:
            out.write(b"Hello world!")
        return tmp

    def test_mkdtemp_failure(self):
        # Check no additional exception if mkdtemp fails
        # Previously would raise AttributeError instead
        # (noted as part of Issue #10188)
        with tempfile.TemporaryDirectory() as nonexistent:
            pass
        # The directory is gone now, so using it as a parent must fail.
        with self.assertRaises(FileNotFoundError) as caught:
            tempfile.TemporaryDirectory(dir=nonexistent)
        self.assertEqual(caught.exception.errno, errno.ENOENT)

    def test_explicit_cleanup(self):
        # A TemporaryDirectory is deleted when cleaned up
        parent = tempfile.mkdtemp()
        try:
            tmp = self.do_create(dir=parent)
            self.assertTrue(
                os.path.exists(tmp.name),
                "TemporaryDirectory %s does not exist" % tmp.name)
            tmp.cleanup()
            self.assertFalse(
                os.path.exists(tmp.name),
                "TemporaryDirectory %s exists after cleanup" % tmp.name)
        finally:
            os.rmdir(parent)

    @support.skip_unless_symlink
    def test_cleanup_with_symlink_to_a_directory(self):
        # cleanup() should not follow symlinks to directories (issue #12464)
        outer = self.do_create()
        target = self.do_create(recurse=0)

        # Symlink outer/foo -> target
        link_path = os.path.join(outer.name, "foo")
        os.symlink(target.name, link_path)

        # This call to cleanup() should not follow the "foo" symlink
        outer.cleanup()

        self.assertFalse(
            os.path.exists(outer.name),
            "TemporaryDirectory %s exists after cleanup" % outer.name)
        self.assertTrue(os.path.exists(target.name),
                        "Directory pointed to by a symlink was deleted")
        self.assertEqual(os.listdir(target.name), ['test.txt'],
                         "Contents of the directory pointed to by a symlink "
                         "were deleted")
        target.cleanup()

    @support.cpython_only
    def test_del_on_collection(self):
        # A TemporaryDirectory is deleted when garbage collected
        parent = tempfile.mkdtemp()
        try:
            tmp = self.do_create(dir=parent)
            path = tmp.name
            del tmp # Rely on refcounting to invoke __del__
            self.assertFalse(
                os.path.exists(path),
                "TemporaryDirectory %s exists after __del__" % path)
        finally:
            os.rmdir(parent)

    def test_del_on_shutdown(self):
        # A TemporaryDirectory may be cleaned up during shutdown
        module_names = ('builtins', 'os', 'shutil', 'sys', 'tempfile',
                        'warnings')
        with self.do_create() as parent:
            # Run one child interpreter per module, each time parking the
            # TemporaryDirectory as an attribute of that module.
            for mod in module_names:
                code = """if True:
                    import builtins
                    import os
                    import shutil
                    import sys
                    import tempfile
                    import warnings

                    tmp = tempfile.TemporaryDirectory(dir={dir!r})
                    sys.stdout.buffer.write(tmp.name.encode())

                    tmp2 = os.path.join(tmp.name, 'test_dir')
                    os.mkdir(tmp2)
                    with open(os.path.join(tmp2, "test.txt"), "w") as f:
                        f.write("Hello world!")

                    {mod}.tmp = tmp

                    warnings.filterwarnings("always", category=ResourceWarning)
                    """.format(dir=parent, mod=mod)
                rc, out, err = script_helper.assert_python_ok("-c", code)
                tmp_name = out.decode().strip()
                self.assertFalse(
                    os.path.exists(tmp_name),
                    "TemporaryDirectory %s exists after cleanup" % tmp_name)
                err = err.decode('utf-8', 'backslashreplace')
                self.assertNotIn("Exception ", err)
                self.assertIn("ResourceWarning: Implicitly cleaning up", err)

    def test_exit_on_shutdown(self):
        # Issue #22427
        with self.do_create() as parent:
            code = """if True:
                import sys
                import tempfile
                import warnings

                def generator():
                    with tempfile.TemporaryDirectory(dir={dir!r}) as tmp:
                        yield tmp
                g = generator()
                sys.stdout.buffer.write(next(g).encode())

                warnings.filterwarnings("always", category=ResourceWarning)
                """.format(dir=parent)
            rc, out, err = script_helper.assert_python_ok("-c", code)
            tmp_name = out.decode().strip()
            self.assertFalse(
                os.path.exists(tmp_name),
                "TemporaryDirectory %s exists after cleanup" % tmp_name)
            err = err.decode('utf-8', 'backslashreplace')
            self.assertNotIn("Exception ", err)
            self.assertIn("ResourceWarning: Implicitly cleaning up", err)

    def test_warnings_on_cleanup(self):
        # ResourceWarning will be triggered by __del__
        with self.do_create() as parent:
            tmp = self.do_create(dir=parent, recurse=3)
            path = tmp.name

            # Check for the resource warning
            expected = ('Implicitly', ResourceWarning)
            with support.check_warnings(expected, quiet=False):
                warnings.filterwarnings("always", category=ResourceWarning)
                del tmp
                support.gc_collect()
            self.assertFalse(
                os.path.exists(path),
                "TemporaryDirectory %s exists after __del__" % path)

    def test_multiple_close(self):
        # Can be cleaned-up many times without error
        tmp = self.do_create()
        for _ in range(3):
            tmp.cleanup()

    def test_context_manager(self):
        # Can be used as a context manager
        tmp = self.do_create()
        with tmp as path:
            self.assertTrue(os.path.exists(path))
            self.assertEqual(path, tmp.name)
        self.assertFalse(os.path.exists(path))
1455
1456
# Run the whole suite through unittest's command-line runner when this
# file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1459