1# tempfile.py unit tests.
2import tempfile
3import errno
4import io
5import os
6import signal
7import shutil
8import sys
9import re
10import warnings
11import contextlib
12
13import unittest
14from test import test_support as support
15
16warnings.filterwarnings("ignore",
17                        category=RuntimeWarning,
18                        message="mktemp", module=__name__)
19
20if hasattr(os, 'stat'):
21    import stat
22    has_stat = 1
23else:
24    has_stat = 0
25
26has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
27has_spawnl = hasattr(os, 'spawnl')
28
29# TEST_FILES may need to be tweaked for systems depending on the maximum
30# number of files that can be opened at one time (see ulimit -n)
31if sys.platform in ('openbsd3', 'openbsd4'):
32    TEST_FILES = 48
33else:
34    TEST_FILES = 100
35
36# This is organized as one test for each chunk of code in tempfile.py,
37# in order of their appearance in the file.  Testing which requires
38# threads is not done here.
39
40# Common functionality.
41class TC(unittest.TestCase):
42
43    str_check = re.compile(r"[a-zA-Z0-9_-]{6}$")
44
45    def failOnException(self, what, ei=None):
46        if ei is None:
47            ei = sys.exc_info()
48        self.fail("%s raised %s: %s" % (what, ei[0], ei[1]))
49
50    def nameCheck(self, name, dir, pre, suf):
51        (ndir, nbase) = os.path.split(name)
52        npre  = nbase[:len(pre)]
53        nsuf  = nbase[len(nbase)-len(suf):]
54
55        # check for equality of the absolute paths!
56        self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
57                         "file '%s' not in directory '%s'" % (name, dir))
58        self.assertEqual(npre, pre,
59                         "file '%s' does not begin with '%s'" % (nbase, pre))
60        self.assertEqual(nsuf, suf,
61                         "file '%s' does not end with '%s'" % (nbase, suf))
62
63        nbase = nbase[len(pre):len(nbase)-len(suf)]
64        self.assertTrue(self.str_check.match(nbase),
65                     "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
66                     % nbase)
67
68test_classes = []
69
70class test_exports(TC):
71    def test_exports(self):
72        # There are no surprising symbols in the tempfile module
73        dict = tempfile.__dict__
74
75        expected = {
76            "NamedTemporaryFile" : 1,
77            "TemporaryFile" : 1,
78            "mkstemp" : 1,
79            "mkdtemp" : 1,
80            "mktemp" : 1,
81            "TMP_MAX" : 1,
82            "gettempprefix" : 1,
83            "gettempdir" : 1,
84            "tempdir" : 1,
85            "template" : 1,
86            "SpooledTemporaryFile" : 1
87        }
88
89        unexp = []
90        for key in dict:
91            if key[0] != '_' and key not in expected:
92                unexp.append(key)
93        self.assertTrue(len(unexp) == 0,
94                        "unexpected keys: %s" % unexp)
95
96test_classes.append(test_exports)
97
98
99class test__RandomNameSequence(TC):
100    """Test the internal iterator object _RandomNameSequence."""
101
102    def setUp(self):
103        self.r = tempfile._RandomNameSequence()
104
105    def test_get_six_char_str(self):
106        # _RandomNameSequence returns a six-character string
107        s = self.r.next()
108        self.nameCheck(s, '', '', '')
109
110    def test_many(self):
111        # _RandomNameSequence returns no duplicate strings (stochastic)
112
113        dict = {}
114        r = self.r
115        for i in xrange(TEST_FILES):
116            s = r.next()
117            self.nameCheck(s, '', '', '')
118            self.assertNotIn(s, dict)
119            dict[s] = 1
120
121    def test_supports_iter(self):
122        # _RandomNameSequence supports the iterator protocol
123
124        i = 0
125        r = self.r
126        try:
127            for s in r:
128                i += 1
129                if i == 20:
130                    break
131        except:
132            self.failOnException("iteration")
133
134    @unittest.skipUnless(hasattr(os, 'fork'),
135        "os.fork is required for this test")
136    def test_process_awareness(self):
137        # ensure that the random source differs between
138        # child and parent.
139        read_fd, write_fd = os.pipe()
140        pid = None
141        try:
142            pid = os.fork()
143            if not pid:
144                # child process
145                os.close(read_fd)
146                os.write(write_fd, next(self.r).encode("ascii"))
147                os.close(write_fd)
148                # bypass the normal exit handlers- leave those to
149                # the parent.
150                os._exit(0)
151
152            # parent process
153            parent_value = next(self.r)
154            child_value = os.read(read_fd, len(parent_value)).decode("ascii")
155        finally:
156            if pid:
157                # best effort to ensure the process can't bleed out
158                # via any bugs above
159                try:
160                    os.kill(pid, signal.SIGKILL)
161                except EnvironmentError:
162                    pass
163
164                # Read the process exit status to avoid zombie process
165                os.waitpid(pid, 0)
166
167            os.close(read_fd)
168            os.close(write_fd)
169        self.assertNotEqual(child_value, parent_value)
170
171
172test_classes.append(test__RandomNameSequence)
173
174
175class test__candidate_tempdir_list(TC):
176    """Test the internal function _candidate_tempdir_list."""
177
178    def test_nonempty_list(self):
179        # _candidate_tempdir_list returns a nonempty list of strings
180
181        cand = tempfile._candidate_tempdir_list()
182
183        self.assertFalse(len(cand) == 0)
184        for c in cand:
185            self.assertIsInstance(c, basestring)
186
187    def test_wanted_dirs(self):
188        # _candidate_tempdir_list contains the expected directories
189
190        # Make sure the interesting environment variables are all set.
191        with support.EnvironmentVarGuard() as env:
192            for envname in 'TMPDIR', 'TEMP', 'TMP':
193                dirname = os.getenv(envname)
194                if not dirname:
195                    env[envname] = os.path.abspath(envname)
196
197            cand = tempfile._candidate_tempdir_list()
198
199            for envname in 'TMPDIR', 'TEMP', 'TMP':
200                dirname = os.getenv(envname)
201                if not dirname: raise ValueError
202                self.assertIn(dirname, cand)
203
204            try:
205                dirname = os.getcwd()
206            except (AttributeError, os.error):
207                dirname = os.curdir
208
209            self.assertIn(dirname, cand)
210
211            # Not practical to try to verify the presence of OS-specific
212            # paths in this list.
213
214test_classes.append(test__candidate_tempdir_list)
215
216# We test _get_default_tempdir some more by testing gettempdir.
217
218class TestGetDefaultTempdir(TC):
219    """Test _get_default_tempdir()."""
220
221    def test_no_files_left_behind(self):
222        # use a private empty directory
223        our_temp_directory = tempfile.mkdtemp()
224        try:
225            # force _get_default_tempdir() to consider our empty directory
226            def our_candidate_list():
227                return [our_temp_directory]
228
229            with support.swap_attr(tempfile, "_candidate_tempdir_list",
230                                   our_candidate_list):
231                # verify our directory is empty after _get_default_tempdir()
232                tempfile._get_default_tempdir()
233                self.assertEqual(os.listdir(our_temp_directory), [])
234
235                def raise_OSError(*args, **kwargs):
236                    raise OSError(-1)
237
238                with support.swap_attr(io, "open", raise_OSError):
239                    # test again with failing io.open()
240                    with self.assertRaises(IOError) as cm:
241                        tempfile._get_default_tempdir()
242                    self.assertEqual(cm.exception.errno, errno.ENOENT)
243                    self.assertEqual(os.listdir(our_temp_directory), [])
244
245                def bad_writer(*args, **kwargs):
246                    fp = orig_open(*args, **kwargs)
247                    fp.write = raise_OSError
248                    return fp
249
250                with support.swap_attr(io, "open", bad_writer) as orig_open:
251                    # test again with failing write()
252                    with self.assertRaises(IOError) as cm:
253                        tempfile._get_default_tempdir()
254                    self.assertEqual(cm.exception.errno, errno.ENOENT)
255                    self.assertEqual(os.listdir(our_temp_directory), [])
256        finally:
257            shutil.rmtree(our_temp_directory)
258
259test_classes.append(TestGetDefaultTempdir)
260
261
262class test__get_candidate_names(TC):
263    """Test the internal function _get_candidate_names."""
264
265    def test_retval(self):
266        # _get_candidate_names returns a _RandomNameSequence object
267        obj = tempfile._get_candidate_names()
268        self.assertIsInstance(obj, tempfile._RandomNameSequence)
269
270    def test_same_thing(self):
271        # _get_candidate_names always returns the same object
272        a = tempfile._get_candidate_names()
273        b = tempfile._get_candidate_names()
274
275        self.assertTrue(a is b)
276
277test_classes.append(test__get_candidate_names)
278
279
280@contextlib.contextmanager
281def _inside_empty_temp_dir():
282    dir = tempfile.mkdtemp()
283    try:
284        with support.swap_attr(tempfile, 'tempdir', dir):
285            yield
286    finally:
287        support.rmtree(dir)
288
289
290def _mock_candidate_names(*names):
291    return support.swap_attr(tempfile,
292                             '_get_candidate_names',
293                             lambda: iter(names))
294
295
296class TestBadTempdir:
297
298    def test_read_only_directory(self):
299        with _inside_empty_temp_dir():
300            oldmode = mode = os.stat(tempfile.tempdir).st_mode
301            mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
302            os.chmod(tempfile.tempdir, mode)
303            try:
304                if os.access(tempfile.tempdir, os.W_OK):
305                    self.skipTest("can't set the directory read-only")
306                with self.assertRaises(OSError) as cm:
307                    self.make_temp()
308                self.assertIn(cm.exception.errno, (errno.EPERM, errno.EACCES))
309                self.assertEqual(os.listdir(tempfile.tempdir), [])
310            finally:
311                os.chmod(tempfile.tempdir, oldmode)
312
313    def test_nonexisting_directory(self):
314        with _inside_empty_temp_dir():
315            tempdir = os.path.join(tempfile.tempdir, 'nonexistent')
316            with support.swap_attr(tempfile, 'tempdir', tempdir):
317                with self.assertRaises(OSError) as cm:
318                    self.make_temp()
319                self.assertEqual(cm.exception.errno, errno.ENOENT)
320
321    def test_non_directory(self):
322        with _inside_empty_temp_dir():
323            tempdir = os.path.join(tempfile.tempdir, 'file')
324            open(tempdir, 'wb').close()
325            with support.swap_attr(tempfile, 'tempdir', tempdir):
326                with self.assertRaises(OSError) as cm:
327                    self.make_temp()
328                self.assertIn(cm.exception.errno, (errno.ENOTDIR, errno.ENOENT))
329
330
331class test__mkstemp_inner(TestBadTempdir, TC):
332    """Test the internal function _mkstemp_inner."""
333
334    class mkstemped:
335        _bflags = tempfile._bin_openflags
336        _tflags = tempfile._text_openflags
337        _close = os.close
338        _unlink = os.unlink
339
340        def __init__(self, dir, pre, suf, bin):
341            if bin: flags = self._bflags
342            else:   flags = self._tflags
343
344            (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags)
345
346        def write(self, str):
347            os.write(self.fd, str)
348
349        def __del__(self):
350            self._close(self.fd)
351            self._unlink(self.name)
352
353    def do_create(self, dir=None, pre="", suf="", bin=1):
354        if dir is None:
355            dir = tempfile.gettempdir()
356        try:
357            file = self.mkstemped(dir, pre, suf, bin)
358        except:
359            self.failOnException("_mkstemp_inner")
360
361        self.nameCheck(file.name, dir, pre, suf)
362        return file
363
364    def test_basic(self):
365        # _mkstemp_inner can create files
366        self.do_create().write("blat")
367        self.do_create(pre="a").write("blat")
368        self.do_create(suf="b").write("blat")
369        self.do_create(pre="a", suf="b").write("blat")
370        self.do_create(pre="aa", suf=".txt").write("blat")
371
372    def test_basic_many(self):
373        # _mkstemp_inner can create many files (stochastic)
374        extant = range(TEST_FILES)
375        for i in extant:
376            extant[i] = self.do_create(pre="aa")
377
378    def test_choose_directory(self):
379        # _mkstemp_inner can create files in a user-selected directory
380        dir = tempfile.mkdtemp()
381        try:
382            self.do_create(dir=dir).write("blat")
383        finally:
384            os.rmdir(dir)
385
386    @unittest.skipUnless(has_stat, 'os.stat not available')
387    def test_file_mode(self):
388        # _mkstemp_inner creates files with the proper mode
389
390        file = self.do_create()
391        mode = stat.S_IMODE(os.stat(file.name).st_mode)
392        expected = 0600
393        if sys.platform in ('win32', 'os2emx'):
394            # There's no distinction among 'user', 'group' and 'world';
395            # replicate the 'user' bits.
396            user = expected >> 6
397            expected = user * (1 + 8 + 64)
398        self.assertEqual(mode, expected)
399
400    @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
401    def test_noinherit(self):
402        # _mkstemp_inner file handles are not inherited by child processes
403
404        if support.verbose:
405            v="v"
406        else:
407            v="q"
408
409        file = self.do_create()
410        fd = "%d" % file.fd
411
412        try:
413            me = __file__
414        except NameError:
415            me = sys.argv[0]
416
417        # We have to exec something, so that FD_CLOEXEC will take
418        # effect.  The core of this test is therefore in
419        # tf_inherit_check.py, which see.
420        tester = os.path.join(os.path.dirname(os.path.abspath(me)),
421                              "tf_inherit_check.py")
422
423        # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
424        # but an arg with embedded spaces should be decorated with double
425        # quotes on each end
426        if sys.platform in ('win32',):
427            decorated = '"%s"' % sys.executable
428            tester = '"%s"' % tester
429        else:
430            decorated = sys.executable
431
432        retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
433        self.assertFalse(retval < 0,
434                    "child process caught fatal signal %d" % -retval)
435        self.assertFalse(retval > 0, "child process reports failure %d"%retval)
436
437    @unittest.skipUnless(has_textmode, "text mode not available")
438    def test_textmode(self):
439        # _mkstemp_inner can create files in text mode
440
441        self.do_create(bin=0).write("blat\n")
442        # XXX should test that the file really is a text file
443
444    def make_temp(self):
445        return tempfile._mkstemp_inner(tempfile.gettempdir(),
446                                       tempfile.template,
447                                       '',
448                                       tempfile._bin_openflags)
449
450    def test_collision_with_existing_file(self):
451        # _mkstemp_inner tries another name when a file with
452        # the chosen name already exists
453        with _inside_empty_temp_dir(), \
454             _mock_candidate_names('aaa', 'aaa', 'bbb'):
455            (fd1, name1) = self.make_temp()
456            os.close(fd1)
457            self.assertTrue(name1.endswith('aaa'))
458
459            (fd2, name2) = self.make_temp()
460            os.close(fd2)
461            self.assertTrue(name2.endswith('bbb'))
462
463    def test_collision_with_existing_directory(self):
464        # _mkstemp_inner tries another name when a directory with
465        # the chosen name already exists
466        with _inside_empty_temp_dir(), \
467             _mock_candidate_names('aaa', 'aaa', 'bbb'):
468            dir = tempfile.mkdtemp()
469            self.assertTrue(dir.endswith('aaa'))
470
471            (fd, name) = self.make_temp()
472            os.close(fd)
473            self.assertTrue(name.endswith('bbb'))
474
475test_classes.append(test__mkstemp_inner)
476
477
478class test_gettempprefix(TC):
479    """Test gettempprefix()."""
480
481    def test_sane_template(self):
482        # gettempprefix returns a nonempty prefix string
483        p = tempfile.gettempprefix()
484
485        self.assertIsInstance(p, basestring)
486        self.assertTrue(len(p) > 0)
487
488    def test_usable_template(self):
489        # gettempprefix returns a usable prefix string
490
491        # Create a temp directory, avoiding use of the prefix.
492        # Then attempt to create a file whose name is
493        # prefix + 'xxxxxx.xxx' in that directory.
494        p = tempfile.gettempprefix() + "xxxxxx.xxx"
495        d = tempfile.mkdtemp(prefix="")
496        try:
497            p = os.path.join(d, p)
498            try:
499                fd = os.open(p, os.O_RDWR | os.O_CREAT)
500            except:
501                self.failOnException("os.open")
502            os.close(fd)
503            os.unlink(p)
504        finally:
505            os.rmdir(d)
506
507test_classes.append(test_gettempprefix)
508
509
510class test_gettempdir(TC):
511    """Test gettempdir()."""
512
513    def test_directory_exists(self):
514        # gettempdir returns a directory which exists
515
516        dir = tempfile.gettempdir()
517        self.assertTrue(os.path.isabs(dir) or dir == os.curdir,
518                     "%s is not an absolute path" % dir)
519        self.assertTrue(os.path.isdir(dir),
520                     "%s is not a directory" % dir)
521
522    def test_directory_writable(self):
523        # gettempdir returns a directory writable by the user
524
525        # sneaky: just instantiate a NamedTemporaryFile, which
526        # defaults to writing into the directory returned by
527        # gettempdir.
528        try:
529            file = tempfile.NamedTemporaryFile()
530            file.write("blat")
531            file.close()
532        except:
533            self.failOnException("create file in %s" % tempfile.gettempdir())
534
535    def test_same_thing(self):
536        # gettempdir always returns the same object
537        a = tempfile.gettempdir()
538        b = tempfile.gettempdir()
539
540        self.assertTrue(a is b)
541
542test_classes.append(test_gettempdir)
543
544
545class test_mkstemp(TC):
546    """Test mkstemp()."""
547
548    def do_create(self, dir=None, pre="", suf=""):
549        if dir is None:
550            dir = tempfile.gettempdir()
551        try:
552            (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
553            (ndir, nbase) = os.path.split(name)
554            adir = os.path.abspath(dir)
555            self.assertEqual(adir, ndir,
556                "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
557        except:
558            self.failOnException("mkstemp")
559
560        try:
561            self.nameCheck(name, dir, pre, suf)
562        finally:
563            os.close(fd)
564            os.unlink(name)
565
566    def test_basic(self):
567        # mkstemp can create files
568        self.do_create()
569        self.do_create(pre="a")
570        self.do_create(suf="b")
571        self.do_create(pre="a", suf="b")
572        self.do_create(pre="aa", suf=".txt")
573        self.do_create(dir=".")
574
575    def test_choose_directory(self):
576        # mkstemp can create directories in a user-selected directory
577        dir = tempfile.mkdtemp()
578        try:
579            self.do_create(dir=dir)
580        finally:
581            os.rmdir(dir)
582
583test_classes.append(test_mkstemp)
584
585
586class test_mkdtemp(TestBadTempdir, TC):
587    """Test mkdtemp()."""
588
589    def make_temp(self):
590        return tempfile.mkdtemp()
591
592    def do_create(self, dir=None, pre="", suf=""):
593        if dir is None:
594            dir = tempfile.gettempdir()
595        try:
596            name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
597        except:
598            self.failOnException("mkdtemp")
599
600        try:
601            self.nameCheck(name, dir, pre, suf)
602            return name
603        except:
604            os.rmdir(name)
605            raise
606
607    def test_basic(self):
608        # mkdtemp can create directories
609        os.rmdir(self.do_create())
610        os.rmdir(self.do_create(pre="a"))
611        os.rmdir(self.do_create(suf="b"))
612        os.rmdir(self.do_create(pre="a", suf="b"))
613        os.rmdir(self.do_create(pre="aa", suf=".txt"))
614
615    def test_basic_many(self):
616        # mkdtemp can create many directories (stochastic)
617        extant = range(TEST_FILES)
618        try:
619            for i in extant:
620                extant[i] = self.do_create(pre="aa")
621        finally:
622            for i in extant:
623                if(isinstance(i, basestring)):
624                    os.rmdir(i)
625
626    def test_choose_directory(self):
627        # mkdtemp can create directories in a user-selected directory
628        dir = tempfile.mkdtemp()
629        try:
630            os.rmdir(self.do_create(dir=dir))
631        finally:
632            os.rmdir(dir)
633
634    @unittest.skipUnless(has_stat, 'os.stat not available')
635    def test_mode(self):
636        # mkdtemp creates directories with the proper mode
637
638        dir = self.do_create()
639        try:
640            mode = stat.S_IMODE(os.stat(dir).st_mode)
641            mode &= 0777 # Mask off sticky bits inherited from /tmp
642            expected = 0700
643            if sys.platform in ('win32', 'os2emx'):
644                # There's no distinction among 'user', 'group' and 'world';
645                # replicate the 'user' bits.
646                user = expected >> 6
647                expected = user * (1 + 8 + 64)
648            self.assertEqual(mode, expected)
649        finally:
650            os.rmdir(dir)
651
652    def test_collision_with_existing_file(self):
653        # mkdtemp tries another name when a file with
654        # the chosen name already exists
655        with _inside_empty_temp_dir(), \
656             _mock_candidate_names('aaa', 'aaa', 'bbb'):
657            file = tempfile.NamedTemporaryFile(delete=False)
658            file.close()
659            self.assertTrue(file.name.endswith('aaa'))
660            dir = tempfile.mkdtemp()
661            self.assertTrue(dir.endswith('bbb'))
662
663    def test_collision_with_existing_directory(self):
664        # mkdtemp tries another name when a directory with
665        # the chosen name already exists
666        with _inside_empty_temp_dir(), \
667             _mock_candidate_names('aaa', 'aaa', 'bbb'):
668            dir1 = tempfile.mkdtemp()
669            self.assertTrue(dir1.endswith('aaa'))
670            dir2 = tempfile.mkdtemp()
671            self.assertTrue(dir2.endswith('bbb'))
672
673test_classes.append(test_mkdtemp)
674
675
676class test_mktemp(TC):
677    """Test mktemp()."""
678
679    # For safety, all use of mktemp must occur in a private directory.
680    # We must also suppress the RuntimeWarning it generates.
681    def setUp(self):
682        self.dir = tempfile.mkdtemp()
683
684    def tearDown(self):
685        if self.dir:
686            os.rmdir(self.dir)
687            self.dir = None
688
689    class mktemped:
690        _unlink = os.unlink
691        _bflags = tempfile._bin_openflags
692
693        def __init__(self, dir, pre, suf):
694            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
695            # Create the file.  This will raise an exception if it's
696            # mysteriously appeared in the meanwhile.
697            os.close(os.open(self.name, self._bflags, 0600))
698
699        def __del__(self):
700            self._unlink(self.name)
701
702    def do_create(self, pre="", suf=""):
703        try:
704            file = self.mktemped(self.dir, pre, suf)
705        except:
706            self.failOnException("mktemp")
707
708        self.nameCheck(file.name, self.dir, pre, suf)
709        return file
710
711    def test_basic(self):
712        # mktemp can choose usable file names
713        self.do_create()
714        self.do_create(pre="a")
715        self.do_create(suf="b")
716        self.do_create(pre="a", suf="b")
717        self.do_create(pre="aa", suf=".txt")
718
719    def test_many(self):
720        # mktemp can choose many usable file names (stochastic)
721        extant = range(TEST_FILES)
722        for i in extant:
723            extant[i] = self.do_create(pre="aa")
724
725##     def test_warning(self):
726##         # mktemp issues a warning when used
727##         warnings.filterwarnings("error",
728##                                 category=RuntimeWarning,
729##                                 message="mktemp")
730##         self.assertRaises(RuntimeWarning,
731##                           tempfile.mktemp, dir=self.dir)
732
733test_classes.append(test_mktemp)
734
735
736# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
737
738
739class test_NamedTemporaryFile(TC):
740    """Test NamedTemporaryFile()."""
741
742    def do_create(self, dir=None, pre="", suf="", delete=True):
743        if dir is None:
744            dir = tempfile.gettempdir()
745        try:
746            file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf,
747                                               delete=delete)
748        except:
749            self.failOnException("NamedTemporaryFile")
750
751        self.nameCheck(file.name, dir, pre, suf)
752        return file
753
754
755    def test_basic(self):
756        # NamedTemporaryFile can create files
757        self.do_create()
758        self.do_create(pre="a")
759        self.do_create(suf="b")
760        self.do_create(pre="a", suf="b")
761        self.do_create(pre="aa", suf=".txt")
762
763    def test_creates_named(self):
764        # NamedTemporaryFile creates files with names
765        f = tempfile.NamedTemporaryFile()
766        self.assertTrue(os.path.exists(f.name),
767                        "NamedTemporaryFile %s does not exist" % f.name)
768
769    def test_del_on_close(self):
770        # A NamedTemporaryFile is deleted when closed
771        dir = tempfile.mkdtemp()
772        try:
773            f = tempfile.NamedTemporaryFile(dir=dir)
774            f.write('blat')
775            f.close()
776            self.assertFalse(os.path.exists(f.name),
777                        "NamedTemporaryFile %s exists after close" % f.name)
778        finally:
779            os.rmdir(dir)
780
781    def test_dis_del_on_close(self):
782        # Tests that delete-on-close can be disabled
783        dir = tempfile.mkdtemp()
784        tmp = None
785        try:
786            f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
787            tmp = f.name
788            f.write('blat')
789            f.close()
790            self.assertTrue(os.path.exists(f.name),
791                        "NamedTemporaryFile %s missing after close" % f.name)
792        finally:
793            if tmp is not None:
794                os.unlink(tmp)
795            os.rmdir(dir)
796
797    def test_multiple_close(self):
798        # A NamedTemporaryFile can be closed many times without error
799        f = tempfile.NamedTemporaryFile()
800        f.write('abc\n')
801        f.close()
802        try:
803            f.close()
804            f.close()
805        except:
806            self.failOnException("close")
807
808    def test_context_manager(self):
809        # A NamedTemporaryFile can be used as a context manager
810        with tempfile.NamedTemporaryFile() as f:
811            self.assertTrue(os.path.exists(f.name))
812        self.assertFalse(os.path.exists(f.name))
813        def use_closed():
814            with f:
815                pass
816        self.assertRaises(ValueError, use_closed)
817
818    def test_no_leak_fd(self):
819        # Issue #21058: don't leak file descriptor when fdopen() fails
820        old_close = os.close
821        old_fdopen = os.fdopen
822        closed = []
823        def close(fd):
824            old_close(fd)
825            closed.append(fd)
826        def fdopen(*args):
827            raise ValueError()
828        os.close = close
829        os.fdopen = fdopen
830        try:
831            self.assertRaises(ValueError, tempfile.NamedTemporaryFile)
832            self.assertEqual(len(closed), 1)
833        finally:
834            os.close = old_close
835            os.fdopen = old_fdopen
836
837    def test_bad_mode(self):
838        dir = tempfile.mkdtemp()
839        self.addCleanup(support.rmtree, dir)
840        with self.assertRaises(TypeError):
841            tempfile.NamedTemporaryFile(mode=(), dir=dir)
842        self.assertEqual(os.listdir(dir), [])
843
844    # How to test the mode and bufsize parameters?
845
846test_classes.append(test_NamedTemporaryFile)
847
848class test_SpooledTemporaryFile(TC):
849    """Test SpooledTemporaryFile()."""
850
851    def do_create(self, max_size=0, dir=None, pre="", suf=""):
852        if dir is None:
853            dir = tempfile.gettempdir()
854        try:
855            file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
856        except:
857            self.failOnException("SpooledTemporaryFile")
858
859        return file
860
861
862    def test_basic(self):
863        # SpooledTemporaryFile can create files
864        f = self.do_create()
865        self.assertFalse(f._rolled)
866        f = self.do_create(max_size=100, pre="a", suf=".txt")
867        self.assertFalse(f._rolled)
868
869    def test_del_on_close(self):
870        # A SpooledTemporaryFile is deleted when closed
871        dir = tempfile.mkdtemp()
872        try:
873            f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
874            self.assertFalse(f._rolled)
875            f.write('blat ' * 5)
876            self.assertTrue(f._rolled)
877            filename = f.name
878            f.close()
879            self.assertFalse(os.path.exists(filename),
880                        "SpooledTemporaryFile %s exists after close" % filename)
881        finally:
882            os.rmdir(dir)
883
884    def test_rewrite_small(self):
885        # A SpooledTemporaryFile can be written to multiple within the max_size
886        f = self.do_create(max_size=30)
887        self.assertFalse(f._rolled)
888        for i in range(5):
889            f.seek(0, 0)
890            f.write('x' * 20)
891        self.assertFalse(f._rolled)
892
893    def test_write_sequential(self):
894        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
895        # over afterward
896        f = self.do_create(max_size=30)
897        self.assertFalse(f._rolled)
898        f.write('x' * 20)
899        self.assertFalse(f._rolled)
900        f.write('x' * 10)
901        self.assertFalse(f._rolled)
902        f.write('x')
903        self.assertTrue(f._rolled)
904
905    def test_writelines(self):
906        # Verify writelines with a SpooledTemporaryFile
907        f = self.do_create()
908        f.writelines((b'x', b'y', b'z'))
909        f.seek(0)
910        buf = f.read()
911        self.assertEqual(buf, b'xyz')
912
913    def test_writelines_sequential(self):
914        # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
915        # over afterward
916        f = self.do_create(max_size=35)
917        f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
918        self.assertFalse(f._rolled)
919        f.write(b'x')
920        self.assertTrue(f._rolled)
921
922    def test_xreadlines(self):
923        f = self.do_create(max_size=20)
924        f.write(b'abc\n' * 5)
925        f.seek(0)
926        self.assertFalse(f._rolled)
927        self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5)
928        f.write(b'x\ny')
929        self.assertTrue(f._rolled)
930        f.seek(0)
931        self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5 + [b'x\n', b'y'])
932
933    def test_sparse(self):
934        # A SpooledTemporaryFile that is written late in the file will extend
935        # when that occurs
936        f = self.do_create(max_size=30)
937        self.assertFalse(f._rolled)
938        f.seek(100, 0)
939        self.assertFalse(f._rolled)
940        f.write('x')
941        self.assertTrue(f._rolled)
942
943    def test_fileno(self):
944        # A SpooledTemporaryFile should roll over to a real file on fileno()
945        f = self.do_create(max_size=30)
946        self.assertFalse(f._rolled)
947        self.assertTrue(f.fileno() > 0)
948        self.assertTrue(f._rolled)
949
950    def test_multiple_close_before_rollover(self):
951        # A SpooledTemporaryFile can be closed many times without error
952        f = tempfile.SpooledTemporaryFile()
953        f.write('abc\n')
954        self.assertFalse(f._rolled)
955        f.close()
956        try:
957            f.close()
958            f.close()
959        except:
960            self.failOnException("close")
961
962    def test_multiple_close_after_rollover(self):
963        # A SpooledTemporaryFile can be closed many times without error
964        f = tempfile.SpooledTemporaryFile(max_size=1)
965        f.write('abc\n')
966        self.assertTrue(f._rolled)
967        f.close()
968        try:
969            f.close()
970            f.close()
971        except:
972            self.failOnException("close")
973
974    def test_bound_methods(self):
975        # It should be OK to steal a bound method from a SpooledTemporaryFile
976        # and use it independently; when the file rolls over, those bound
977        # methods should continue to function
978        f = self.do_create(max_size=30)
979        read = f.read
980        write = f.write
981        seek = f.seek
982
983        write("a" * 35)
984        write("b" * 35)
985        seek(0, 0)
986        self.assertTrue(read(70) == 'a'*35 + 'b'*35)
987
988    def test_properties(self):
989        f = tempfile.SpooledTemporaryFile(max_size=10)
990        f.write(b'x' * 10)
991        self.assertFalse(f._rolled)
992        self.assertEqual(f.mode, 'w+b')
993        self.assertIsNone(f.name)
994        with self.assertRaises(AttributeError):
995            f.newlines
996        with self.assertRaises(AttributeError):
997            f.encoding
998
999        f.write(b'x')
1000        self.assertTrue(f._rolled)
1001        self.assertEqual(f.mode, 'w+b')
1002        self.assertIsNotNone(f.name)
1003        with self.assertRaises(AttributeError):
1004            f.newlines
1005        with self.assertRaises(AttributeError):
1006            f.encoding
1007
1008    def test_context_manager_before_rollover(self):
1009        # A SpooledTemporaryFile can be used as a context manager
1010        with tempfile.SpooledTemporaryFile(max_size=1) as f:
1011            self.assertFalse(f._rolled)
1012            self.assertFalse(f.closed)
1013        self.assertTrue(f.closed)
1014        def use_closed():
1015            with f:
1016                pass
1017        self.assertRaises(ValueError, use_closed)
1018
1019    def test_context_manager_during_rollover(self):
1020        # A SpooledTemporaryFile can be used as a context manager
1021        with tempfile.SpooledTemporaryFile(max_size=1) as f:
1022            self.assertFalse(f._rolled)
1023            f.write('abc\n')
1024            f.flush()
1025            self.assertTrue(f._rolled)
1026            self.assertFalse(f.closed)
1027        self.assertTrue(f.closed)
1028        def use_closed():
1029            with f:
1030                pass
1031        self.assertRaises(ValueError, use_closed)
1032
1033    def test_context_manager_after_rollover(self):
1034        # A SpooledTemporaryFile can be used as a context manager
1035        f = tempfile.SpooledTemporaryFile(max_size=1)
1036        f.write('abc\n')
1037        f.flush()
1038        self.assertTrue(f._rolled)
1039        with f:
1040            self.assertFalse(f.closed)
1041        self.assertTrue(f.closed)
1042        def use_closed():
1043            with f:
1044                pass
1045        self.assertRaises(ValueError, use_closed)
1046
1047
1048test_classes.append(test_SpooledTemporaryFile)
1049
1050
1051class test_TemporaryFile(TC):
1052    """Test TemporaryFile()."""
1053
1054    def test_basic(self):
1055        # TemporaryFile can create files
1056        # No point in testing the name params - the file has no name.
1057        try:
1058            tempfile.TemporaryFile()
1059        except:
1060            self.failOnException("TemporaryFile")
1061
1062    def test_has_no_name(self):
1063        # TemporaryFile creates files with no names (on this system)
1064        dir = tempfile.mkdtemp()
1065        f = tempfile.TemporaryFile(dir=dir)
1066        f.write('blat')
1067
1068        # Sneaky: because this file has no name, it should not prevent
1069        # us from removing the directory it was created in.
1070        try:
1071            os.rmdir(dir)
1072        except:
1073            ei = sys.exc_info()
1074            # cleanup
1075            f.close()
1076            os.rmdir(dir)
1077            self.failOnException("rmdir", ei)
1078
1079    def test_multiple_close(self):
1080        # A TemporaryFile can be closed many times without error
1081        f = tempfile.TemporaryFile()
1082        f.write('abc\n')
1083        f.close()
1084        try:
1085            f.close()
1086            f.close()
1087        except:
1088            self.failOnException("close")
1089
1090    # How to test the mode and bufsize parameters?
1091
1092
1093if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
1094    test_classes.append(test_TemporaryFile)
1095
1096def test_main():
1097    support.run_unittest(*test_classes)
1098
1099if __name__ == "__main__":
1100    test_main()
1101