1# tempfile.py unit tests. 2import tempfile 3import errno 4import io 5import os 6import signal 7import sys 8import re 9import warnings 10import contextlib 11import weakref 12from unittest import mock 13 14import unittest 15from test import support 16from test.support import script_helper 17 18 19if hasattr(os, 'stat'): 20 import stat 21 has_stat = 1 22else: 23 has_stat = 0 24 25has_textmode = (tempfile._text_openflags != tempfile._bin_openflags) 26has_spawnl = hasattr(os, 'spawnl') 27 28# TEST_FILES may need to be tweaked for systems depending on the maximum 29# number of files that can be opened at one time (see ulimit -n) 30if sys.platform.startswith('openbsd'): 31 TEST_FILES = 48 32else: 33 TEST_FILES = 100 34 35# This is organized as one test for each chunk of code in tempfile.py, 36# in order of their appearance in the file. Testing which requires 37# threads is not done here. 38 39class TestLowLevelInternals(unittest.TestCase): 40 def test_infer_return_type_singles(self): 41 self.assertIs(str, tempfile._infer_return_type('')) 42 self.assertIs(bytes, tempfile._infer_return_type(b'')) 43 self.assertIs(str, tempfile._infer_return_type(None)) 44 45 def test_infer_return_type_multiples(self): 46 self.assertIs(str, tempfile._infer_return_type('', '')) 47 self.assertIs(bytes, tempfile._infer_return_type(b'', b'')) 48 with self.assertRaises(TypeError): 49 tempfile._infer_return_type('', b'') 50 with self.assertRaises(TypeError): 51 tempfile._infer_return_type(b'', '') 52 53 def test_infer_return_type_multiples_and_none(self): 54 self.assertIs(str, tempfile._infer_return_type(None, '')) 55 self.assertIs(str, tempfile._infer_return_type('', None)) 56 self.assertIs(str, tempfile._infer_return_type(None, None)) 57 self.assertIs(bytes, tempfile._infer_return_type(b'', None)) 58 self.assertIs(bytes, tempfile._infer_return_type(None, b'')) 59 with self.assertRaises(TypeError): 60 tempfile._infer_return_type('', None, b'') 61 with self.assertRaises(TypeError): 62 tempfile._infer_return_type(b'', None, '') 63 64 65# Common functionality. 66 67class BaseTestCase(unittest.TestCase): 68 69 str_check = re.compile(r"^[a-z0-9_-]{8}$") 70 b_check = re.compile(br"^[a-z0-9_-]{8}$") 71 72 def setUp(self): 73 self._warnings_manager = support.check_warnings() 74 self._warnings_manager.__enter__() 75 warnings.filterwarnings("ignore", category=RuntimeWarning, 76 message="mktemp", module=__name__) 77 78 def tearDown(self): 79 self._warnings_manager.__exit__(None, None, None) 80 81 def nameCheck(self, name, dir, pre, suf): 82 (ndir, nbase) = os.path.split(name) 83 npre = nbase[:len(pre)] 84 nsuf = nbase[len(nbase)-len(suf):] 85 86 if dir is not None: 87 self.assertIs(type(name), str if type(dir) is str else bytes, 88 "unexpected return type") 89 if pre is not None: 90 self.assertIs(type(name), str if type(pre) is str else bytes, 91 "unexpected return type") 92 if suf is not None: 93 self.assertIs(type(name), str if type(suf) is str else bytes, 94 "unexpected return type") 95 if (dir, pre, suf) == (None, None, None): 96 self.assertIs(type(name), str, "default return type must be str") 97 98 # check for equality of the absolute paths! 99 self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir), 100 "file %r not in directory %r" % (name, dir)) 101 self.assertEqual(npre, pre, 102 "file %r does not begin with %r" % (nbase, pre)) 103 self.assertEqual(nsuf, suf, 104 "file %r does not end with %r" % (nbase, suf)) 105 106 nbase = nbase[len(pre):len(nbase)-len(suf)] 107 check = self.str_check if isinstance(nbase, str) else self.b_check 108 self.assertTrue(check.match(nbase), 109 "random characters %r do not match %r" 110 % (nbase, check.pattern)) 111 112 113class TestExports(BaseTestCase): 114 def test_exports(self): 115 # There are no surprising symbols in the tempfile module 116 dict = tempfile.__dict__ 117 118 expected = { 119 "NamedTemporaryFile" : 1, 120 "TemporaryFile" : 1, 121 "mkstemp" : 1, 122 "mkdtemp" : 1, 123 "mktemp" : 1, 124 "TMP_MAX" : 1, 125 "gettempprefix" : 1, 126 "gettempprefixb" : 1, 127 "gettempdir" : 1, 128 "gettempdirb" : 1, 129 "tempdir" : 1, 130 "template" : 1, 131 "SpooledTemporaryFile" : 1, 132 "TemporaryDirectory" : 1, 133 } 134 135 unexp = [] 136 for key in dict: 137 if key[0] != '_' and key not in expected: 138 unexp.append(key) 139 self.assertTrue(len(unexp) == 0, 140 "unexpected keys: %s" % unexp) 141 142 143class TestRandomNameSequence(BaseTestCase): 144 """Test the internal iterator object _RandomNameSequence.""" 145 146 def setUp(self): 147 self.r = tempfile._RandomNameSequence() 148 super().setUp() 149 150 def test_get_six_char_str(self): 151 # _RandomNameSequence returns a six-character string 152 s = next(self.r) 153 self.nameCheck(s, '', '', '') 154 155 def test_many(self): 156 # _RandomNameSequence returns no duplicate strings (stochastic) 157 158 dict = {} 159 r = self.r 160 for i in range(TEST_FILES): 161 s = next(r) 162 self.nameCheck(s, '', '', '') 163 self.assertNotIn(s, dict) 164 dict[s] = 1 165 166 def supports_iter(self): 167 # _RandomNameSequence supports the iterator protocol 168 169 i = 0 170 r = self.r 171 for s in r: 172 i += 1 173 if i == 20: 174 break 175 176 @unittest.skipUnless(hasattr(os, 'fork'), 177 "os.fork is required for this test") 178 def test_process_awareness(self): 179 # ensure that the random source differs between 180 # child and parent. 181 read_fd, write_fd = os.pipe() 182 pid = None 183 try: 184 pid = os.fork() 185 if not pid: 186 # child process 187 os.close(read_fd) 188 os.write(write_fd, next(self.r).encode("ascii")) 189 os.close(write_fd) 190 # bypass the normal exit handlers- leave those to 191 # the parent. 192 os._exit(0) 193 194 # parent process 195 parent_value = next(self.r) 196 child_value = os.read(read_fd, len(parent_value)).decode("ascii") 197 finally: 198 if pid: 199 # best effort to ensure the process can't bleed out 200 # via any bugs above 201 try: 202 os.kill(pid, signal.SIGKILL) 203 except OSError: 204 pass 205 206 # Read the process exit status to avoid zombie process 207 os.waitpid(pid, 0) 208 209 os.close(read_fd) 210 os.close(write_fd) 211 self.assertNotEqual(child_value, parent_value) 212 213 214 215class TestCandidateTempdirList(BaseTestCase): 216 """Test the internal function _candidate_tempdir_list.""" 217 218 def test_nonempty_list(self): 219 # _candidate_tempdir_list returns a nonempty list of strings 220 221 cand = tempfile._candidate_tempdir_list() 222 223 self.assertFalse(len(cand) == 0) 224 for c in cand: 225 self.assertIsInstance(c, str) 226 227 def test_wanted_dirs(self): 228 # _candidate_tempdir_list contains the expected directories 229 230 # Make sure the interesting environment variables are all set. 231 with support.EnvironmentVarGuard() as env: 232 for envname in 'TMPDIR', 'TEMP', 'TMP': 233 dirname = os.getenv(envname) 234 if not dirname: 235 env[envname] = os.path.abspath(envname) 236 237 cand = tempfile._candidate_tempdir_list() 238 239 for envname in 'TMPDIR', 'TEMP', 'TMP': 240 dirname = os.getenv(envname) 241 if not dirname: raise ValueError 242 self.assertIn(dirname, cand) 243 244 try: 245 dirname = os.getcwd() 246 except (AttributeError, OSError): 247 dirname = os.curdir 248 249 self.assertIn(dirname, cand) 250 251 # Not practical to try to verify the presence of OS-specific 252 # paths in this list. 253 254 255# We test _get_default_tempdir some more by testing gettempdir. 256 257class TestGetDefaultTempdir(BaseTestCase): 258 """Test _get_default_tempdir().""" 259 260 def test_no_files_left_behind(self): 261 # use a private empty directory 262 with tempfile.TemporaryDirectory() as our_temp_directory: 263 # force _get_default_tempdir() to consider our empty directory 264 def our_candidate_list(): 265 return [our_temp_directory] 266 267 with support.swap_attr(tempfile, "_candidate_tempdir_list", 268 our_candidate_list): 269 # verify our directory is empty after _get_default_tempdir() 270 tempfile._get_default_tempdir() 271 self.assertEqual(os.listdir(our_temp_directory), []) 272 273 def raise_OSError(*args, **kwargs): 274 raise OSError() 275 276 with support.swap_attr(io, "open", raise_OSError): 277 # test again with failing io.open() 278 with self.assertRaises(FileNotFoundError): 279 tempfile._get_default_tempdir() 280 self.assertEqual(os.listdir(our_temp_directory), []) 281 282 def bad_writer(*args, **kwargs): 283 fp = orig_open(*args, **kwargs) 284 fp.write = raise_OSError 285 return fp 286 287 with support.swap_attr(io, "open", bad_writer) as orig_open: 288 # test again with failing write() 289 with self.assertRaises(FileNotFoundError): 290 tempfile._get_default_tempdir() 291 self.assertEqual(os.listdir(our_temp_directory), []) 292 293 294class TestGetCandidateNames(BaseTestCase): 295 """Test the internal function _get_candidate_names.""" 296 297 def test_retval(self): 298 # _get_candidate_names returns a _RandomNameSequence object 299 obj = tempfile._get_candidate_names() 300 self.assertIsInstance(obj, tempfile._RandomNameSequence) 301 302 def test_same_thing(self): 303 # _get_candidate_names always returns the same object 304 a = tempfile._get_candidate_names() 305 b = tempfile._get_candidate_names() 306 307 self.assertTrue(a is b) 308 309 310@contextlib.contextmanager 311def _inside_empty_temp_dir(): 312 dir = tempfile.mkdtemp() 313 try: 314 with support.swap_attr(tempfile, 'tempdir', dir): 315 yield 316 finally: 317 support.rmtree(dir) 318 319 320def _mock_candidate_names(*names): 321 return support.swap_attr(tempfile, 322 '_get_candidate_names', 323 lambda: iter(names)) 324 325 326class TestBadTempdir: 327 328 def test_read_only_directory(self): 329 with _inside_empty_temp_dir(): 330 oldmode = mode = os.stat(tempfile.tempdir).st_mode 331 mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH) 332 os.chmod(tempfile.tempdir, mode) 333 try: 334 if os.access(tempfile.tempdir, os.W_OK): 335 self.skipTest("can't set the directory read-only") 336 with self.assertRaises(PermissionError): 337 self.make_temp() 338 self.assertEqual(os.listdir(tempfile.tempdir), []) 339 finally: 340 os.chmod(tempfile.tempdir, oldmode) 341 342 def test_nonexisting_directory(self): 343 with _inside_empty_temp_dir(): 344 tempdir = os.path.join(tempfile.tempdir, 'nonexistent') 345 with support.swap_attr(tempfile, 'tempdir', tempdir): 346 with self.assertRaises(FileNotFoundError): 347 self.make_temp() 348 349 def test_non_directory(self): 350 with _inside_empty_temp_dir(): 351 tempdir = os.path.join(tempfile.tempdir, 'file') 352 open(tempdir, 'wb').close() 353 with support.swap_attr(tempfile, 'tempdir', tempdir): 354 with self.assertRaises((NotADirectoryError, FileNotFoundError)): 355 self.make_temp() 356 357 358class TestMkstempInner(TestBadTempdir, BaseTestCase): 359 """Test the internal function _mkstemp_inner.""" 360 361 class mkstemped: 362 _bflags = tempfile._bin_openflags 363 _tflags = tempfile._text_openflags 364 _close = os.close 365 _unlink = os.unlink 366 367 def __init__(self, dir, pre, suf, bin): 368 if bin: flags = self._bflags 369 else: flags = self._tflags 370 371 output_type = tempfile._infer_return_type(dir, pre, suf) 372 (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags, output_type) 373 374 def write(self, str): 375 os.write(self.fd, str) 376 377 def __del__(self): 378 self._close(self.fd) 379 self._unlink(self.name) 380 381 def do_create(self, dir=None, pre=None, suf=None, bin=1): 382 output_type = tempfile._infer_return_type(dir, pre, suf) 383 if dir is None: 384 if output_type is str: 385 dir = tempfile.gettempdir() 386 else: 387 dir = tempfile.gettempdirb() 388 if pre is None: 389 pre = output_type() 390 if suf is None: 391 suf = output_type() 392 file = self.mkstemped(dir, pre, suf, bin) 393 394 self.nameCheck(file.name, dir, pre, suf) 395 return file 396 397 def test_basic(self): 398 # _mkstemp_inner can create files 399 self.do_create().write(b"blat") 400 self.do_create(pre="a").write(b"blat") 401 self.do_create(suf="b").write(b"blat") 402 self.do_create(pre="a", suf="b").write(b"blat") 403 self.do_create(pre="aa", suf=".txt").write(b"blat") 404 405 def test_basic_with_bytes_names(self): 406 # _mkstemp_inner can create files when given name parts all 407 # specified as bytes. 408 dir_b = tempfile.gettempdirb() 409 self.do_create(dir=dir_b, suf=b"").write(b"blat") 410 self.do_create(dir=dir_b, pre=b"a").write(b"blat") 411 self.do_create(dir=dir_b, suf=b"b").write(b"blat") 412 self.do_create(dir=dir_b, pre=b"a", suf=b"b").write(b"blat") 413 self.do_create(dir=dir_b, pre=b"aa", suf=b".txt").write(b"blat") 414 # Can't mix str & binary types in the args. 415 with self.assertRaises(TypeError): 416 self.do_create(dir="", suf=b"").write(b"blat") 417 with self.assertRaises(TypeError): 418 self.do_create(dir=dir_b, pre="").write(b"blat") 419 with self.assertRaises(TypeError): 420 self.do_create(dir=dir_b, pre=b"", suf="").write(b"blat") 421 422 def test_basic_many(self): 423 # _mkstemp_inner can create many files (stochastic) 424 extant = list(range(TEST_FILES)) 425 for i in extant: 426 extant[i] = self.do_create(pre="aa") 427 428 def test_choose_directory(self): 429 # _mkstemp_inner can create files in a user-selected directory 430 dir = tempfile.mkdtemp() 431 try: 432 self.do_create(dir=dir).write(b"blat") 433 finally: 434 os.rmdir(dir) 435 436 @unittest.skipUnless(has_stat, 'os.stat not available') 437 def test_file_mode(self): 438 # _mkstemp_inner creates files with the proper mode 439 440 file = self.do_create() 441 mode = stat.S_IMODE(os.stat(file.name).st_mode) 442 expected = 0o600 443 if sys.platform == 'win32': 444 # There's no distinction among 'user', 'group' and 'world'; 445 # replicate the 'user' bits. 446 user = expected >> 6 447 expected = user * (1 + 8 + 64) 448 self.assertEqual(mode, expected) 449 450 @unittest.skipUnless(has_spawnl, 'os.spawnl not available') 451 def test_noinherit(self): 452 # _mkstemp_inner file handles are not inherited by child processes 453 454 if support.verbose: 455 v="v" 456 else: 457 v="q" 458 459 file = self.do_create() 460 self.assertEqual(os.get_inheritable(file.fd), False) 461 fd = "%d" % file.fd 462 463 try: 464 me = __file__ 465 except NameError: 466 me = sys.argv[0] 467 468 # We have to exec something, so that FD_CLOEXEC will take 469 # effect. The core of this test is therefore in 470 # tf_inherit_check.py, which see. 471 tester = os.path.join(os.path.dirname(os.path.abspath(me)), 472 "tf_inherit_check.py") 473 474 # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted, 475 # but an arg with embedded spaces should be decorated with double 476 # quotes on each end 477 if sys.platform == 'win32': 478 decorated = '"%s"' % sys.executable 479 tester = '"%s"' % tester 480 else: 481 decorated = sys.executable 482 483 retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd) 484 self.assertFalse(retval < 0, 485 "child process caught fatal signal %d" % -retval) 486 self.assertFalse(retval > 0, "child process reports failure %d"%retval) 487 488 @unittest.skipUnless(has_textmode, "text mode not available") 489 def test_textmode(self): 490 # _mkstemp_inner can create files in text mode 491 492 # A text file is truncated at the first Ctrl+Z byte 493 f = self.do_create(bin=0) 494 f.write(b"blat\x1a") 495 f.write(b"extra\n") 496 os.lseek(f.fd, 0, os.SEEK_SET) 497 self.assertEqual(os.read(f.fd, 20), b"blat") 498 499 def make_temp(self): 500 return tempfile._mkstemp_inner(tempfile.gettempdir(), 501 tempfile.gettempprefix(), 502 '', 503 tempfile._bin_openflags, 504 str) 505 506 def test_collision_with_existing_file(self): 507 # _mkstemp_inner tries another name when a file with 508 # the chosen name already exists 509 with _inside_empty_temp_dir(), \ 510 _mock_candidate_names('aaa', 'aaa', 'bbb'): 511 (fd1, name1) = self.make_temp() 512 os.close(fd1) 513 self.assertTrue(name1.endswith('aaa')) 514 515 (fd2, name2) = self.make_temp() 516 os.close(fd2) 517 self.assertTrue(name2.endswith('bbb')) 518 519 def test_collision_with_existing_directory(self): 520 # _mkstemp_inner tries another name when a directory with 521 # the chosen name already exists 522 with _inside_empty_temp_dir(), \ 523 _mock_candidate_names('aaa', 'aaa', 'bbb'): 524 dir = tempfile.mkdtemp() 525 self.assertTrue(dir.endswith('aaa')) 526 527 (fd, name) = self.make_temp() 528 os.close(fd) 529 self.assertTrue(name.endswith('bbb')) 530 531 532class TestGetTempPrefix(BaseTestCase): 533 """Test gettempprefix().""" 534 535 def test_sane_template(self): 536 # gettempprefix returns a nonempty prefix string 537 p = tempfile.gettempprefix() 538 539 self.assertIsInstance(p, str) 540 self.assertGreater(len(p), 0) 541 542 pb = tempfile.gettempprefixb() 543 544 self.assertIsInstance(pb, bytes) 545 self.assertGreater(len(pb), 0) 546 547 def test_usable_template(self): 548 # gettempprefix returns a usable prefix string 549 550 # Create a temp directory, avoiding use of the prefix. 551 # Then attempt to create a file whose name is 552 # prefix + 'xxxxxx.xxx' in that directory. 553 p = tempfile.gettempprefix() + "xxxxxx.xxx" 554 d = tempfile.mkdtemp(prefix="") 555 try: 556 p = os.path.join(d, p) 557 fd = os.open(p, os.O_RDWR | os.O_CREAT) 558 os.close(fd) 559 os.unlink(p) 560 finally: 561 os.rmdir(d) 562 563 564class TestGetTempDir(BaseTestCase): 565 """Test gettempdir().""" 566 567 def test_directory_exists(self): 568 # gettempdir returns a directory which exists 569 570 for d in (tempfile.gettempdir(), tempfile.gettempdirb()): 571 self.assertTrue(os.path.isabs(d) or d == os.curdir, 572 "%r is not an absolute path" % d) 573 self.assertTrue(os.path.isdir(d), 574 "%r is not a directory" % d) 575 576 def test_directory_writable(self): 577 # gettempdir returns a directory writable by the user 578 579 # sneaky: just instantiate a NamedTemporaryFile, which 580 # defaults to writing into the directory returned by 581 # gettempdir. 582 file = tempfile.NamedTemporaryFile() 583 file.write(b"blat") 584 file.close() 585 586 def test_same_thing(self): 587 # gettempdir always returns the same object 588 a = tempfile.gettempdir() 589 b = tempfile.gettempdir() 590 c = tempfile.gettempdirb() 591 592 self.assertTrue(a is b) 593 self.assertNotEqual(type(a), type(c)) 594 self.assertEqual(a, os.fsdecode(c)) 595 596 def test_case_sensitive(self): 597 # gettempdir should not flatten its case 598 # even on a case-insensitive file system 599 case_sensitive_tempdir = tempfile.mkdtemp("-Temp") 600 _tempdir, tempfile.tempdir = tempfile.tempdir, None 601 try: 602 with support.EnvironmentVarGuard() as env: 603 # Fake the first env var which is checked as a candidate 604 env["TMPDIR"] = case_sensitive_tempdir 605 self.assertEqual(tempfile.gettempdir(), case_sensitive_tempdir) 606 finally: 607 tempfile.tempdir = _tempdir 608 support.rmdir(case_sensitive_tempdir) 609 610 611class TestMkstemp(BaseTestCase): 612 """Test mkstemp().""" 613 614 def do_create(self, dir=None, pre=None, suf=None): 615 output_type = tempfile._infer_return_type(dir, pre, suf) 616 if dir is None: 617 if output_type is str: 618 dir = tempfile.gettempdir() 619 else: 620 dir = tempfile.gettempdirb() 621 if pre is None: 622 pre = output_type() 623 if suf is None: 624 suf = output_type() 625 (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf) 626 (ndir, nbase) = os.path.split(name) 627 adir = os.path.abspath(dir) 628 self.assertEqual(adir, ndir, 629 "Directory '%s' incorrectly returned as '%s'" % (adir, ndir)) 630 631 try: 632 self.nameCheck(name, dir, pre, suf) 633 finally: 634 os.close(fd) 635 os.unlink(name) 636 637 def test_basic(self): 638 # mkstemp can create files 639 self.do_create() 640 self.do_create(pre="a") 641 self.do_create(suf="b") 642 self.do_create(pre="a", suf="b") 643 self.do_create(pre="aa", suf=".txt") 644 self.do_create(dir=".") 645 646 def test_basic_with_bytes_names(self): 647 # mkstemp can create files when given name parts all 648 # specified as bytes. 649 d = tempfile.gettempdirb() 650 self.do_create(dir=d, suf=b"") 651 self.do_create(dir=d, pre=b"a") 652 self.do_create(dir=d, suf=b"b") 653 self.do_create(dir=d, pre=b"a", suf=b"b") 654 self.do_create(dir=d, pre=b"aa", suf=b".txt") 655 self.do_create(dir=b".") 656 with self.assertRaises(TypeError): 657 self.do_create(dir=".", pre=b"aa", suf=b".txt") 658 with self.assertRaises(TypeError): 659 self.do_create(dir=b".", pre="aa", suf=b".txt") 660 with self.assertRaises(TypeError): 661 self.do_create(dir=b".", pre=b"aa", suf=".txt") 662 663 664 def test_choose_directory(self): 665 # mkstemp can create directories in a user-selected directory 666 dir = tempfile.mkdtemp() 667 try: 668 self.do_create(dir=dir) 669 finally: 670 os.rmdir(dir) 671 672 673class TestMkdtemp(TestBadTempdir, BaseTestCase): 674 """Test mkdtemp().""" 675 676 def make_temp(self): 677 return tempfile.mkdtemp() 678 679 def do_create(self, dir=None, pre=None, suf=None): 680 output_type = tempfile._infer_return_type(dir, pre, suf) 681 if dir is None: 682 if output_type is str: 683 dir = tempfile.gettempdir() 684 else: 685 dir = tempfile.gettempdirb() 686 if pre is None: 687 pre = output_type() 688 if suf is None: 689 suf = output_type() 690 name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf) 691 692 try: 693 self.nameCheck(name, dir, pre, suf) 694 return name 695 except: 696 os.rmdir(name) 697 raise 698 699 def test_basic(self): 700 # mkdtemp can create directories 701 os.rmdir(self.do_create()) 702 os.rmdir(self.do_create(pre="a")) 703 os.rmdir(self.do_create(suf="b")) 704 os.rmdir(self.do_create(pre="a", suf="b")) 705 os.rmdir(self.do_create(pre="aa", suf=".txt")) 706 707 def test_basic_with_bytes_names(self): 708 # mkdtemp can create directories when given all binary parts 709 d = tempfile.gettempdirb() 710 os.rmdir(self.do_create(dir=d)) 711 os.rmdir(self.do_create(dir=d, pre=b"a")) 712 os.rmdir(self.do_create(dir=d, suf=b"b")) 713 os.rmdir(self.do_create(dir=d, pre=b"a", suf=b"b")) 714 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=b".txt")) 715 with self.assertRaises(TypeError): 716 os.rmdir(self.do_create(dir=d, pre="aa", suf=b".txt")) 717 with self.assertRaises(TypeError): 718 os.rmdir(self.do_create(dir=d, pre=b"aa", suf=".txt")) 719 with self.assertRaises(TypeError): 720 os.rmdir(self.do_create(dir="", pre=b"aa", suf=b".txt")) 721 722 def test_basic_many(self): 723 # mkdtemp can create many directories (stochastic) 724 extant = list(range(TEST_FILES)) 725 try: 726 for i in extant: 727 extant[i] = self.do_create(pre="aa") 728 finally: 729 for i in extant: 730 if(isinstance(i, str)): 731 os.rmdir(i) 732 733 def test_choose_directory(self): 734 # mkdtemp can create directories in a user-selected directory 735 dir = tempfile.mkdtemp() 736 try: 737 os.rmdir(self.do_create(dir=dir)) 738 finally: 739 os.rmdir(dir) 740 741 @unittest.skipUnless(has_stat, 'os.stat not available') 742 def test_mode(self): 743 # mkdtemp creates directories with the proper mode 744 745 dir = self.do_create() 746 try: 747 mode = stat.S_IMODE(os.stat(dir).st_mode) 748 mode &= 0o777 # Mask off sticky bits inherited from /tmp 749 expected = 0o700 750 if sys.platform == 'win32': 751 # There's no distinction among 'user', 'group' and 'world'; 752 # replicate the 'user' bits. 753 user = expected >> 6 754 expected = user * (1 + 8 + 64) 755 self.assertEqual(mode, expected) 756 finally: 757 os.rmdir(dir) 758 759 def test_collision_with_existing_file(self): 760 # mkdtemp tries another name when a file with 761 # the chosen name already exists 762 with _inside_empty_temp_dir(), \ 763 _mock_candidate_names('aaa', 'aaa', 'bbb'): 764 file = tempfile.NamedTemporaryFile(delete=False) 765 file.close() 766 self.assertTrue(file.name.endswith('aaa')) 767 dir = tempfile.mkdtemp() 768 self.assertTrue(dir.endswith('bbb')) 769 770 def test_collision_with_existing_directory(self): 771 # mkdtemp tries another name when a directory with 772 # the chosen name already exists 773 with _inside_empty_temp_dir(), \ 774 _mock_candidate_names('aaa', 'aaa', 'bbb'): 775 dir1 = tempfile.mkdtemp() 776 self.assertTrue(dir1.endswith('aaa')) 777 dir2 = tempfile.mkdtemp() 778 self.assertTrue(dir2.endswith('bbb')) 779 780 781class TestMktemp(BaseTestCase): 782 """Test mktemp().""" 783 784 # For safety, all use of mktemp must occur in a private directory. 785 # We must also suppress the RuntimeWarning it generates. 786 def setUp(self): 787 self.dir = tempfile.mkdtemp() 788 super().setUp() 789 790 def tearDown(self): 791 if self.dir: 792 os.rmdir(self.dir) 793 self.dir = None 794 super().tearDown() 795 796 class mktemped: 797 _unlink = os.unlink 798 _bflags = tempfile._bin_openflags 799 800 def __init__(self, dir, pre, suf): 801 self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf) 802 # Create the file. This will raise an exception if it's 803 # mysteriously appeared in the meanwhile. 804 os.close(os.open(self.name, self._bflags, 0o600)) 805 806 def __del__(self): 807 self._unlink(self.name) 808 809 def do_create(self, pre="", suf=""): 810 file = self.mktemped(self.dir, pre, suf) 811 812 self.nameCheck(file.name, self.dir, pre, suf) 813 return file 814 815 def test_basic(self): 816 # mktemp can choose usable file names 817 self.do_create() 818 self.do_create(pre="a") 819 self.do_create(suf="b") 820 self.do_create(pre="a", suf="b") 821 self.do_create(pre="aa", suf=".txt") 822 823 def test_many(self): 824 # mktemp can choose many usable file names (stochastic) 825 extant = list(range(TEST_FILES)) 826 for i in extant: 827 extant[i] = self.do_create(pre="aa") 828 829## def test_warning(self): 830## # mktemp issues a warning when used 831## warnings.filterwarnings("error", 832## category=RuntimeWarning, 833## message="mktemp") 834## self.assertRaises(RuntimeWarning, 835## tempfile.mktemp, dir=self.dir) 836 837 838# We test _TemporaryFileWrapper by testing NamedTemporaryFile. 839 840 841class TestNamedTemporaryFile(BaseTestCase): 842 """Test NamedTemporaryFile().""" 843 844 def do_create(self, dir=None, pre="", suf="", delete=True): 845 if dir is None: 846 dir = tempfile.gettempdir() 847 file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf, 848 delete=delete) 849 850 self.nameCheck(file.name, dir, pre, suf) 851 return file 852 853 854 def test_basic(self): 855 # NamedTemporaryFile can create files 856 self.do_create() 857 self.do_create(pre="a") 858 self.do_create(suf="b") 859 self.do_create(pre="a", suf="b") 860 self.do_create(pre="aa", suf=".txt") 861 862 def test_method_lookup(self): 863 # Issue #18879: Looking up a temporary file method should keep it 864 # alive long enough. 865 f = self.do_create() 866 wr = weakref.ref(f) 867 write = f.write 868 write2 = f.write 869 del f 870 write(b'foo') 871 del write 872 write2(b'bar') 873 del write2 874 if support.check_impl_detail(cpython=True): 875 # No reference cycle was created. 876 self.assertIsNone(wr()) 877 878 def test_iter(self): 879 # Issue #23700: getting iterator from a temporary file should keep 880 # it alive as long as it's being iterated over 881 lines = [b'spam\n', b'eggs\n', b'beans\n'] 882 def make_file(): 883 f = tempfile.NamedTemporaryFile(mode='w+b') 884 f.write(b''.join(lines)) 885 f.seek(0) 886 return f 887 for i, l in enumerate(make_file()): 888 self.assertEqual(l, lines[i]) 889 self.assertEqual(i, len(lines) - 1) 890 891 def test_creates_named(self): 892 # NamedTemporaryFile creates files with names 893 f = tempfile.NamedTemporaryFile() 894 self.assertTrue(os.path.exists(f.name), 895 "NamedTemporaryFile %s does not exist" % f.name) 896 897 def test_del_on_close(self): 898 # A NamedTemporaryFile is deleted when closed 899 dir = tempfile.mkdtemp() 900 try: 901 f = tempfile.NamedTemporaryFile(dir=dir) 902 f.write(b'blat') 903 f.close() 904 self.assertFalse(os.path.exists(f.name), 905 "NamedTemporaryFile %s exists after close" % f.name) 906 finally: 907 os.rmdir(dir) 908 909 def test_dis_del_on_close(self): 910 # Tests that delete-on-close can be disabled 911 dir = tempfile.mkdtemp() 912 tmp = None 913 try: 914 f = tempfile.NamedTemporaryFile(dir=dir, delete=False) 915 tmp = f.name 916 f.write(b'blat') 917 f.close() 918 self.assertTrue(os.path.exists(f.name), 919 "NamedTemporaryFile %s missing after close" % f.name) 920 finally: 921 if tmp is not None: 922 os.unlink(tmp) 923 os.rmdir(dir) 924 925 def test_multiple_close(self): 926 # A NamedTemporaryFile can be closed many times without error 927 f = tempfile.NamedTemporaryFile() 928 f.write(b'abc\n') 929 f.close() 930 f.close() 931 f.close() 932 933 def test_context_manager(self): 934 # A NamedTemporaryFile can be used as a context manager 935 with tempfile.NamedTemporaryFile() as f: 936 self.assertTrue(os.path.exists(f.name)) 937 self.assertFalse(os.path.exists(f.name)) 938 def use_closed(): 939 with f: 940 pass 941 self.assertRaises(ValueError, use_closed) 942 943 def test_no_leak_fd(self): 944 # Issue #21058: don't leak file descriptor when io.open() fails 945 closed = [] 946 os_close = os.close 947 def close(fd): 948 closed.append(fd) 949 os_close(fd) 950 951 with mock.patch('os.close', side_effect=close): 952 with mock.patch('io.open', side_effect=ValueError): 953 self.assertRaises(ValueError, tempfile.NamedTemporaryFile) 954 self.assertEqual(len(closed), 1) 955 956 def test_bad_mode(self): 957 dir = tempfile.mkdtemp() 958 self.addCleanup(support.rmtree, dir) 959 with self.assertRaises(ValueError): 960 tempfile.NamedTemporaryFile(mode='wr', dir=dir) 961 with self.assertRaises(TypeError): 962 tempfile.NamedTemporaryFile(mode=2, dir=dir) 963 self.assertEqual(os.listdir(dir), []) 964 965 # How to test the mode and bufsize parameters? 966 967class TestSpooledTemporaryFile(BaseTestCase): 968 """Test SpooledTemporaryFile().""" 969 970 def do_create(self, max_size=0, dir=None, pre="", suf=""): 971 if dir is None: 972 dir = tempfile.gettempdir() 973 file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf) 974 975 return file 976 977 978 def test_basic(self): 979 # SpooledTemporaryFile can create files 980 f = self.do_create() 981 self.assertFalse(f._rolled) 982 f = self.do_create(max_size=100, pre="a", suf=".txt") 983 self.assertFalse(f._rolled) 984 985 def test_del_on_close(self): 986 # A SpooledTemporaryFile is deleted when closed 987 dir = tempfile.mkdtemp() 988 try: 989 f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir) 990 self.assertFalse(f._rolled) 991 f.write(b'blat ' * 5) 992 self.assertTrue(f._rolled) 993 filename = f.name 994 f.close() 995 self.assertFalse(isinstance(filename, str) and os.path.exists(filename), 996 "SpooledTemporaryFile %s exists after close" % filename) 997 finally: 998 os.rmdir(dir) 999 1000 def test_rewrite_small(self): 1001 # A SpooledTemporaryFile can be written to multiple within the max_size 1002 f = self.do_create(max_size=30) 1003 self.assertFalse(f._rolled) 1004 for i in range(5): 1005 f.seek(0, 0) 1006 f.write(b'x' * 20) 1007 self.assertFalse(f._rolled) 1008 1009 def test_write_sequential(self): 1010 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1011 # over afterward 1012 f = self.do_create(max_size=30) 1013 self.assertFalse(f._rolled) 1014 f.write(b'x' * 20) 1015 self.assertFalse(f._rolled) 1016 f.write(b'x' * 10) 1017 self.assertFalse(f._rolled) 1018 f.write(b'x') 1019 self.assertTrue(f._rolled) 1020 1021 def test_writelines(self): 1022 # Verify writelines with a SpooledTemporaryFile 1023 f = self.do_create() 1024 f.writelines((b'x', b'y', b'z')) 1025 f.seek(0) 1026 buf = f.read() 1027 self.assertEqual(buf, b'xyz') 1028 1029 def test_writelines_sequential(self): 1030 # A SpooledTemporaryFile should hold exactly max_size bytes, and roll 1031 # over afterward 1032 f = self.do_create(max_size=35) 1033 f.writelines((b'x' * 20, b'x' * 10, b'x' * 5)) 1034 self.assertFalse(f._rolled) 1035 f.write(b'x') 1036 self.assertTrue(f._rolled) 1037 1038 def test_sparse(self): 1039 # A SpooledTemporaryFile that is written late in the file will extend 1040 # when that occurs 1041 f = self.do_create(max_size=30) 1042 self.assertFalse(f._rolled) 1043 f.seek(100, 0) 1044 self.assertFalse(f._rolled) 1045 f.write(b'x') 1046 self.assertTrue(f._rolled) 1047 1048 def test_fileno(self): 1049 # A SpooledTemporaryFile should roll over to a real file on fileno() 1050 f = self.do_create(max_size=30) 1051 self.assertFalse(f._rolled) 1052 self.assertTrue(f.fileno() > 0) 1053 self.assertTrue(f._rolled) 1054 1055 def test_multiple_close_before_rollover(self): 1056 # A SpooledTemporaryFile can be closed many times without error 1057 f = tempfile.SpooledTemporaryFile() 1058 f.write(b'abc\n') 1059 self.assertFalse(f._rolled) 1060 f.close() 1061 f.close() 1062 f.close() 1063 1064 def test_multiple_close_after_rollover(self): 1065 # A SpooledTemporaryFile can be closed many times without error 1066 f = tempfile.SpooledTemporaryFile(max_size=1) 1067 f.write(b'abc\n') 1068 self.assertTrue(f._rolled) 1069 f.close() 1070 f.close() 1071 f.close() 1072 1073 def test_bound_methods(self): 1074 # It should be OK to steal a bound method from a SpooledTemporaryFile 1075 # and use it independently; when the file rolls over, those bound 1076 # methods should continue to function 1077 f = self.do_create(max_size=30) 1078 read = f.read 1079 write = f.write 1080 seek = f.seek 1081 1082 write(b"a" * 35) 1083 write(b"b" * 35) 1084 seek(0, 0) 1085 self.assertEqual(read(70), b'a'*35 + b'b'*35) 1086 1087 def test_properties(self): 1088 f = tempfile.SpooledTemporaryFile(max_size=10) 1089 f.write(b'x' * 10) 1090 self.assertFalse(f._rolled) 1091 self.assertEqual(f.mode, 'w+b') 1092 self.assertIsNone(f.name) 1093 with self.assertRaises(AttributeError): 1094 f.newlines 1095 with self.assertRaises(AttributeError): 1096 f.encoding 1097 1098 f.write(b'x') 1099 self.assertTrue(f._rolled) 1100 self.assertEqual(f.mode, 'rb+') 1101 self.assertIsNotNone(f.name) 1102 with self.assertRaises(AttributeError): 1103 f.newlines 1104 with self.assertRaises(AttributeError): 1105 f.encoding 1106 1107 def test_text_mode(self): 1108 # Creating a SpooledTemporaryFile with a text mode should produce 1109 # a file object reading and writing (Unicode) text strings. 1110 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10) 1111 f.write("abc\n") 1112 f.seek(0) 1113 self.assertEqual(f.read(), "abc\n") 1114 f.write("def\n") 1115 f.seek(0) 1116 self.assertEqual(f.read(), "abc\ndef\n") 1117 self.assertFalse(f._rolled) 1118 self.assertEqual(f.mode, 'w+') 1119 self.assertIsNone(f.name) 1120 self.assertIsNone(f.newlines) 1121 self.assertIsNone(f.encoding) 1122 1123 f.write("xyzzy\n") 1124 f.seek(0) 1125 self.assertEqual(f.read(), "abc\ndef\nxyzzy\n") 1126 # Check that Ctrl+Z doesn't truncate the file 1127 f.write("foo\x1abar\n") 1128 f.seek(0) 1129 self.assertEqual(f.read(), "abc\ndef\nxyzzy\nfoo\x1abar\n") 1130 self.assertTrue(f._rolled) 1131 self.assertEqual(f.mode, 'w+') 1132 self.assertIsNotNone(f.name) 1133 self.assertEqual(f.newlines, os.linesep) 1134 self.assertIsNotNone(f.encoding) 1135 1136 def test_text_newline_and_encoding(self): 1137 f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, 1138 newline='', encoding='utf-8') 1139 f.write("\u039B\r\n") 1140 f.seek(0) 1141 self.assertEqual(f.read(), "\u039B\r\n") 1142 self.assertFalse(f._rolled) 1143 self.assertEqual(f.mode, 'w+') 1144 self.assertIsNone(f.name) 1145 self.assertIsNone(f.newlines) 1146 self.assertIsNone(f.encoding) 1147 1148 f.write("\u039B" * 20 + "\r\n") 1149 f.seek(0) 1150 self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n") 1151 self.assertTrue(f._rolled) 1152 self.assertEqual(f.mode, 'w+') 1153 self.assertIsNotNone(f.name) 1154 self.assertIsNotNone(f.newlines) 1155 self.assertEqual(f.encoding, 'utf-8') 1156 1157 def test_context_manager_before_rollover(self): 1158 # A SpooledTemporaryFile can be used as a context manager 1159 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1160 self.assertFalse(f._rolled) 1161 self.assertFalse(f.closed) 1162 self.assertTrue(f.closed) 1163 def use_closed(): 1164 with f: 1165 pass 1166 self.assertRaises(ValueError, use_closed) 1167 1168 def test_context_manager_during_rollover(self): 1169 # A SpooledTemporaryFile can be used as a context manager 1170 with tempfile.SpooledTemporaryFile(max_size=1) as f: 1171 self.assertFalse(f._rolled) 1172 f.write(b'abc\n') 1173 f.flush() 1174 self.assertTrue(f._rolled) 1175 self.assertFalse(f.closed) 1176 self.assertTrue(f.closed) 1177 def use_closed(): 1178 with f: 1179 pass 1180 self.assertRaises(ValueError, use_closed) 1181 1182 def test_context_manager_after_rollover(self): 1183 # A SpooledTemporaryFile can be used as a context manager 1184 f = tempfile.SpooledTemporaryFile(max_size=1) 1185 f.write(b'abc\n') 1186 f.flush() 1187 self.assertTrue(f._rolled) 1188 with f: 1189 self.assertFalse(f.closed) 1190 self.assertTrue(f.closed) 1191 def use_closed(): 1192 with f: 1193 pass 1194 self.assertRaises(ValueError, use_closed) 1195 1196 def test_truncate_with_size_parameter(self): 1197 # A SpooledTemporaryFile can be truncated to zero size 1198 f = tempfile.SpooledTemporaryFile(max_size=10) 1199 f.write(b'abcdefg\n') 1200 f.seek(0) 1201 f.truncate() 1202 self.assertFalse(f._rolled) 1203 self.assertEqual(f._file.getvalue(), b'') 1204 # A SpooledTemporaryFile can be truncated to a specific size 1205 f = tempfile.SpooledTemporaryFile(max_size=10) 1206 f.write(b'abcdefg\n') 1207 f.truncate(4) 1208 self.assertFalse(f._rolled) 1209 self.assertEqual(f._file.getvalue(), b'abcd') 1210 # A SpooledTemporaryFile rolls over if truncated to large size 1211 f = tempfile.SpooledTemporaryFile(max_size=10) 1212 f.write(b'abcdefg\n') 1213 f.truncate(20) 1214 self.assertTrue(f._rolled) 1215 if has_stat: 1216 self.assertEqual(os.fstat(f.fileno()).st_size, 20) 1217 1218 1219if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile: 1220 1221 class TestTemporaryFile(BaseTestCase): 1222 """Test TemporaryFile().""" 1223 1224 def test_basic(self): 1225 # TemporaryFile can create files 1226 # No point in testing the name params - the file has no name. 1227 tempfile.TemporaryFile() 1228 1229 def test_has_no_name(self): 1230 # TemporaryFile creates files with no names (on this system) 1231 dir = tempfile.mkdtemp() 1232 f = tempfile.TemporaryFile(dir=dir) 1233 f.write(b'blat') 1234 1235 # Sneaky: because this file has no name, it should not prevent 1236 # us from removing the directory it was created in. 1237 try: 1238 os.rmdir(dir) 1239 except: 1240 # cleanup 1241 f.close() 1242 os.rmdir(dir) 1243 raise 1244 1245 def test_multiple_close(self): 1246 # A TemporaryFile can be closed many times without error 1247 f = tempfile.TemporaryFile() 1248 f.write(b'abc\n') 1249 f.close() 1250 f.close() 1251 f.close() 1252 1253 # How to test the mode and bufsize parameters? 1254 def test_mode_and_encoding(self): 1255 1256 def roundtrip(input, *args, **kwargs): 1257 with tempfile.TemporaryFile(*args, **kwargs) as fileobj: 1258 fileobj.write(input) 1259 fileobj.seek(0) 1260 self.assertEqual(input, fileobj.read()) 1261 1262 roundtrip(b"1234", "w+b") 1263 roundtrip("abdc\n", "w+") 1264 roundtrip("\u039B", "w+", encoding="utf-16") 1265 roundtrip("foo\r\n", "w+", newline="") 1266 1267 def test_no_leak_fd(self): 1268 # Issue #21058: don't leak file descriptor when io.open() fails 1269 closed = [] 1270 os_close = os.close 1271 def close(fd): 1272 closed.append(fd) 1273 os_close(fd) 1274 1275 with mock.patch('os.close', side_effect=close): 1276 with mock.patch('io.open', side_effect=ValueError): 1277 self.assertRaises(ValueError, tempfile.TemporaryFile) 1278 self.assertEqual(len(closed), 1) 1279 1280 1281 1282# Helper for test_del_on_shutdown 1283class NulledModules: 1284 def __init__(self, *modules): 1285 self.refs = [mod.__dict__ for mod in modules] 1286 self.contents = [ref.copy() for ref in self.refs] 1287 1288 def __enter__(self): 1289 for d in self.refs: 1290 for key in d: 1291 d[key] = None 1292 1293 def __exit__(self, *exc_info): 1294 for d, c in zip(self.refs, self.contents): 1295 d.clear() 1296 d.update(c) 1297 1298class TestTemporaryDirectory(BaseTestCase): 1299 """Test TemporaryDirectory().""" 1300 1301 def do_create(self, dir=None, pre="", suf="", recurse=1): 1302 if dir is None: 1303 dir = tempfile.gettempdir() 1304 tmp = tempfile.TemporaryDirectory(dir=dir, prefix=pre, suffix=suf) 1305 self.nameCheck(tmp.name, dir, pre, suf) 1306 # Create a subdirectory and some files 1307 if recurse: 1308 d1 = self.do_create(tmp.name, pre, suf, recurse-1) 1309 d1.name = None 1310 with open(os.path.join(tmp.name, "test.txt"), "wb") as f: 1311 f.write(b"Hello world!") 1312 return tmp 1313 1314 def test_mkdtemp_failure(self): 1315 # Check no additional exception if mkdtemp fails 1316 # Previously would raise AttributeError instead 1317 # (noted as part of Issue #10188) 1318 with tempfile.TemporaryDirectory() as nonexistent: 1319 pass 1320 with self.assertRaises(FileNotFoundError) as cm: 1321 tempfile.TemporaryDirectory(dir=nonexistent) 1322 self.assertEqual(cm.exception.errno, errno.ENOENT) 1323 1324 def test_explicit_cleanup(self): 1325 # A TemporaryDirectory is deleted when cleaned up 1326 dir = tempfile.mkdtemp() 1327 try: 1328 d = self.do_create(dir=dir) 1329 self.assertTrue(os.path.exists(d.name), 1330 "TemporaryDirectory %s does not exist" % d.name) 1331 d.cleanup() 1332 self.assertFalse(os.path.exists(d.name), 1333 "TemporaryDirectory %s exists after cleanup" % d.name) 1334 finally: 1335 os.rmdir(dir) 1336 1337 @support.skip_unless_symlink 1338 def test_cleanup_with_symlink_to_a_directory(self): 1339 # cleanup() should not follow symlinks to directories (issue #12464) 1340 d1 = self.do_create() 1341 d2 = self.do_create(recurse=0) 1342 1343 # Symlink d1/foo -> d2 1344 os.symlink(d2.name, os.path.join(d1.name, "foo")) 1345 1346 # This call to cleanup() should not follow the "foo" symlink 1347 d1.cleanup() 1348 1349 self.assertFalse(os.path.exists(d1.name), 1350 "TemporaryDirectory %s exists after cleanup" % d1.name) 1351 self.assertTrue(os.path.exists(d2.name), 1352 "Directory pointed to by a symlink was deleted") 1353 self.assertEqual(os.listdir(d2.name), ['test.txt'], 1354 "Contents of the directory pointed to by a symlink " 1355 "were deleted") 1356 d2.cleanup() 1357 1358 @support.cpython_only 1359 def test_del_on_collection(self): 1360 # A TemporaryDirectory is deleted when garbage collected 1361 dir = tempfile.mkdtemp() 1362 try: 1363 d = self.do_create(dir=dir) 1364 name = d.name 1365 del d # Rely on refcounting to invoke __del__ 1366 self.assertFalse(os.path.exists(name), 1367 "TemporaryDirectory %s exists after __del__" % name) 1368 finally: 1369 os.rmdir(dir) 1370 1371 def test_del_on_shutdown(self): 1372 # A TemporaryDirectory may be cleaned up during shutdown 1373 with self.do_create() as dir: 1374 for mod in ('builtins', 'os', 'shutil', 'sys', 'tempfile', 'warnings'): 1375 code = """if True: 1376 import builtins 1377 import os 1378 import shutil 1379 import sys 1380 import tempfile 1381 import warnings 1382 1383 tmp = tempfile.TemporaryDirectory(dir={dir!r}) 1384 sys.stdout.buffer.write(tmp.name.encode()) 1385 1386 tmp2 = os.path.join(tmp.name, 'test_dir') 1387 os.mkdir(tmp2) 1388 with open(os.path.join(tmp2, "test.txt"), "w") as f: 1389 f.write("Hello world!") 1390 1391 {mod}.tmp = tmp 1392 1393 warnings.filterwarnings("always", category=ResourceWarning) 1394 """.format(dir=dir, mod=mod) 1395 rc, out, err = script_helper.assert_python_ok("-c", code) 1396 tmp_name = out.decode().strip() 1397 self.assertFalse(os.path.exists(tmp_name), 1398 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1399 err = err.decode('utf-8', 'backslashreplace') 1400 self.assertNotIn("Exception ", err) 1401 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1402 1403 def test_exit_on_shutdown(self): 1404 # Issue #22427 1405 with self.do_create() as dir: 1406 code = """if True: 1407 import sys 1408 import tempfile 1409 import warnings 1410 1411 def generator(): 1412 with tempfile.TemporaryDirectory(dir={dir!r}) as tmp: 1413 yield tmp 1414 g = generator() 1415 sys.stdout.buffer.write(next(g).encode()) 1416 1417 warnings.filterwarnings("always", category=ResourceWarning) 1418 """.format(dir=dir) 1419 rc, out, err = script_helper.assert_python_ok("-c", code) 1420 tmp_name = out.decode().strip() 1421 self.assertFalse(os.path.exists(tmp_name), 1422 "TemporaryDirectory %s exists after cleanup" % tmp_name) 1423 err = err.decode('utf-8', 'backslashreplace') 1424 self.assertNotIn("Exception ", err) 1425 self.assertIn("ResourceWarning: Implicitly cleaning up", err) 1426 1427 def test_warnings_on_cleanup(self): 1428 # ResourceWarning will be triggered by __del__ 1429 with self.do_create() as dir: 1430 d = self.do_create(dir=dir, recurse=3) 1431 name = d.name 1432 1433 # Check for the resource warning 1434 with support.check_warnings(('Implicitly', ResourceWarning), quiet=False): 1435 warnings.filterwarnings("always", category=ResourceWarning) 1436 del d 1437 support.gc_collect() 1438 self.assertFalse(os.path.exists(name), 1439 "TemporaryDirectory %s exists after __del__" % name) 1440 1441 def test_multiple_close(self): 1442 # Can be cleaned-up many times without error 1443 d = self.do_create() 1444 d.cleanup() 1445 d.cleanup() 1446 d.cleanup() 1447 1448 def test_context_manager(self): 1449 # Can be used as a context manager 1450 d = self.do_create() 1451 with d as name: 1452 self.assertTrue(os.path.exists(name)) 1453 self.assertEqual(name, d.name) 1454 self.assertFalse(os.path.exists(name)) 1455 1456 1457if __name__ == "__main__": 1458 unittest.main() 1459