1# As a test suite for the os module, this is woefully inadequate, but this
2# does add tests for a few functions which have been determined to be more
3# portable than they had been thought to be.
4
5import os
6import errno
7import unittest
8import warnings
9import sys
10import signal
11import subprocess
12import sysconfig
13import textwrap
14import time
15try:
16    import resource
17except ImportError:
18    resource = None
19
20from test import test_support
21from test.script_helper import assert_python_ok
22import mmap
23import uuid
24
# Ignore RuntimeWarnings whose message starts with "tempnam" / "tmpnam" when
# they are attributed to this module (the tests below call os.tempnam() and
# os.tmpnam()).
warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__)
warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, __name__)
27
28# Tests creating TESTFN
class FileTests(unittest.TestCase):
    # Every test begins and ends with no file named TESTFN on disk: the same
    # clean-up routine serves as both setUp and tearDown.
    def setUp(self):
        target = test_support.TESTFN
        if os.path.exists(target):
            os.unlink(target)
    tearDown = setUp

    def test_access(self):
        # Create TESTFN through the low-level API, then check that
        # os.access() reports it as writable.
        fd = os.open(test_support.TESTFN, os.O_CREAT | os.O_RDWR)
        os.close(fd)
        self.assertTrue(os.access(test_support.TESTFN, os.W_OK))

    def test_closerange(self):
        low = os.open(test_support.TESTFN, os.O_CREAT | os.O_RDWR)
        # closerange() is only safe to call on descriptors we own, so we
        # need two adjacent ones; otherwise we could close unrelated
        # descriptors (perhaps even the three standard ones).
        high = os.dup(low)
        try:
            attempts = 0
            while high - low != 1:
                os.close(low)
                attempts += 1
                if attempts > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                low, high = high, os.dup(high)
        finally:
            os.close(high)
        # The range now covers one open descriptor (low) and one that has
        # just been closed (high).
        os.closerange(low, low + 2)
        self.assertRaises(OSError, os.write, low, "a")

    @test_support.cpython_only
    def test_rename(self):
        # The reference count of the path argument must be the same before
        # and after an os.rename() call that fails with TypeError.
        path = unicode(test_support.TESTFN)
        before = sys.getrefcount(path)
        self.assertRaises(TypeError, os.rename, path, 0)
        after = sys.getrefcount(path)
        self.assertEqual(before, after)
68
69
class TemporaryFileTests(unittest.TestCase):
    # Exercises os.tempnam(), os.tmpfile() and os.tmpnam() where the platform
    # provides them.  Files created through check_tempfile() are recorded in
    # self.files and removed again in tearDown().
    def setUp(self):
        self.files = []
        os.mkdir(test_support.TESTFN)

    def tearDown(self):
        for name in self.files:
            os.unlink(name)
        os.rmdir(test_support.TESTFN)

    def check_tempfile(self, name):
        # make sure it doesn't already exist:
        self.assertFalse(os.path.exists(name),
                    "file already exists for temporary file")
        # make sure we can create the file; close it immediately instead of
        # leaving that to garbage collection, so that no handle is still open
        # when tearDown() unlinks it
        with open(name, "w"):
            pass
        self.files.append(name)

    @unittest.skipUnless(hasattr(os, 'tempnam'), 'test needs os.tempnam()')
    def test_tempnam(self):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
                                    r"test_os$")
            warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
            self.check_tempfile(os.tempnam())

            name = os.tempnam(test_support.TESTFN)
            self.check_tempfile(name)

            name = os.tempnam(test_support.TESTFN, "pfx")
            # assertEqual reports both values if the prefix is wrong
            self.assertEqual(os.path.basename(name)[:3], "pfx")
            self.check_tempfile(name)

    @unittest.skipUnless(hasattr(os, 'tmpfile'), 'test needs os.tmpfile()')
    def test_tmpfile(self):
        # As with test_tmpnam() below, the Windows implementation of tmpfile()
        # attempts to create a file in the root directory of the current drive.
        # On Vista and Server 2008, this test will always fail for normal users
        # as writing to the root directory requires elevated privileges.  With
        # XP and below, the semantics of tmpfile() are the same, but the user
        # running the test is more likely to have administrative privileges on
        # their account already.  If that's the case, then os.tmpfile() should
        # work.  In order to make this test as useful as possible, rather than
        # trying to detect Windows versions or whether or not the user has the
        # right permissions, just try and create a file in the root directory
        # and see if it raises a 'Permission denied' OSError.  If it does, then
        # test that a subsequent call to os.tmpfile() raises the same error. If
        # it doesn't, assume we're on XP or below and the user running the test
        # has administrative privileges, and proceed with the test as normal.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)

            if sys.platform == 'win32':
                name = '\\python_test_os_test_tmpfile.txt'
                if os.path.exists(name):
                    os.remove(name)
                try:
                    fp = open(name, 'w')
                except IOError as first:
                    # open() failed, assert tmpfile() fails in the same way.
                    # Although open() raises an IOError and os.tmpfile() raises an
                    # OSError(), 'args' will be (13, 'Permission denied') in both
                    # cases.
                    try:
                        fp = os.tmpfile()
                    except OSError as second:
                        self.assertEqual(first.args, second.args)
                    else:
                        # Don't leak the unexpected file before failing.
                        fp.close()
                        self.fail("expected os.tmpfile() to raise OSError")
                    return
                else:
                    # open() worked, therefore, tmpfile() should work.  Close our
                    # dummy file and proceed with the test as normal.
                    fp.close()
                    os.remove(name)

            fp = os.tmpfile()
            try:
                fp.write("foobar")
                fp.seek(0, 0)
                s = fp.read()
            finally:
                # Close the temporary file even if writing or reading fails.
                fp.close()
            self.assertEqual(s, "foobar")

    @unittest.skipUnless(hasattr(os, 'tmpnam'), 'test needs os.tmpnam()')
    def test_tmpnam(self):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
                                    r"test_os$")
            warnings.filterwarnings("ignore", "tmpnam", DeprecationWarning)

            name = os.tmpnam()
            if sys.platform == "win32":
                # The Windows tmpnam() seems useless.  From the MS docs:
                #
                #     The character string that tmpnam creates consists of
                #     the path prefix, defined by the entry P_tmpdir in the
                #     file STDIO.H, followed by a sequence consisting of the
                #     digit characters '0' through '9'; the numerical value
                #     of this string is in the range 1 - 65,535.  Changing the
                #     definitions of L_tmpnam or P_tmpdir in STDIO.H does not
                #     change the operation of tmpnam.
                #
                # The really bizarre part is that, at least under MSVC6,
                # P_tmpdir is "\\".  That is, the path returned refers to
                # the root of the current drive.  That's a terrible place to
                # put temp files, and, depending on privileges, the user
                # may not even be able to open a file in the root directory.
                self.assertFalse(os.path.exists(name),
                            "file already exists for temporary file")
            else:
                self.check_tempfile(name)
181
182# Test attributes on return values from os.*stat* family.
class StatAttributeTests(unittest.TestCase):
    # Each test works on a three-byte file TESTFN/f1 created in setUp() and
    # removed, together with its directory, in tearDown().
    def setUp(self):
        os.mkdir(test_support.TESTFN)
        self.fname = os.path.join(test_support.TESTFN, "f1")
        with open(self.fname, 'wb') as f:
            f.write(b"ABC")

    def tearDown(self):
        os.unlink(self.fname)
        os.rmdir(test_support.TESTFN)

    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def test_stat_attributes(self):
        import stat
        result = os.stat(self.fname)

        # Make sure direct access works
        self.assertEqual(result[stat.ST_SIZE], 3)
        self.assertEqual(result.st_size, 3)

        # Make sure all the attributes are there
        members = dir(result)
        for name in dir(stat):
            if not name.startswith('ST_'):
                continue
            attr = name.lower()
            value = getattr(result, attr)
            if name.endswith("TIME"):
                # Time attributes are truncated to an int before being
                # compared with the value stored at the tuple index.
                value = int(value)
            self.assertEqual(value, result[getattr(stat, name)])
            self.assertIn(attr, members)

        # Indexing past the end must fail
        with self.assertRaises(IndexError):
            result[200]

        # Make sure that assignment fails
        with self.assertRaises((AttributeError, TypeError)):
            result.st_mode = 1

        with self.assertRaises((AttributeError, TypeError)):
            result.st_rdev = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the stat_result constructor with a too-short tuple.
        self.assertRaises(TypeError, os.stat_result, (10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but, unlike the too-short case, not required.
        try:
            os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass


    @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
    def test_statvfs_attributes(self):
        try:
            result = os.statvfs(self.fname)
        except OSError as e:
            # On AtheOS, glibc always returns ENOSYS
            if e.errno == errno.ENOSYS:
                self.skipTest('glibc always returns ENOSYS on AtheOS')
            # Any other error is a genuine failure.  Re-raise it instead of
            # falling through to a NameError on the unbound 'result'.
            raise

        # Make sure direct access works
        self.assertEqual(result.f_bfree, result[3])

        # Make sure all the attributes are there.
        members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
                    'ffree', 'favail', 'flag', 'namemax')
        for value, member in enumerate(members):
            self.assertEqual(getattr(result, 'f_' + member), result[value])

        # Make sure that assignment really fails
        with self.assertRaises(TypeError):
            result.f_bfree = 1

        with self.assertRaises(AttributeError):
            result.parrot = 1

        # Use the constructor with a too-short tuple.
        self.assertRaises(TypeError, os.statvfs_result, (10,))

        # Use the constructor with a too-long tuple.  A TypeError is
        # tolerated here but not required.
        try:
            os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
        except TypeError:
            pass

    def test_utime_dir(self):
        delta = 1000000
        st = os.stat(test_support.TESTFN)
        # round to int, because some systems may support sub-second
        # time stamps in stat, but not in utime.
        os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
        st2 = os.stat(test_support.TESTFN)
        self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))

    # Restrict tests to Win32, since there is no guarantee other
    # systems support centiseconds
    #
    # Helper called from the skipUnless decorators below while the class body
    # is being executed, hence no 'self'.  On win32 it returns the file system
    # name reported by GetVolumeInformationA for the drive holding 'path';
    # in every other case it falls through and returns None.
    def get_file_system(path):
        if sys.platform == 'win32':
            root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
            import ctypes
            kernel32 = ctypes.windll.kernel32
            buf = ctypes.create_string_buffer("", 100)
            if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):
                return buf.value

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_1565150(self):
        # A fractional timestamp must survive the utime()/stat() round trip.
        t1 = 1159195039.25
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS",
                         "requires NTFS")
    def test_large_time(self):
        t1 = 5000000000 # some day in 2128
        os.utime(self.fname, (t1, t1))
        self.assertEqual(os.stat(self.fname).st_mtime, t1)

    @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
    def test_1686475(self):
        # Verify that an open file can be stat'ed
        try:
            os.stat(r"c:\pagefile.sys")
        except WindowsError as e:
            if e.errno == 2: # file does not exist; cannot run test
                self.skipTest(r'c:\pagefile.sys does not exist')
            self.fail("Could not stat pagefile.sys")
345
346from test import mapping_tests
347
class EnvironTests(mapping_tests.BasicTestMappingProtocol):
    """Check that the os.environ object conforms to the mapping protocol."""
    type2test = None
    def _reference(self):
        return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
    def _empty_mapping(self):
        os.environ.clear()
        return os.environ
    def setUp(self):
        # Snapshot the real environment so tearDown() can put it back.
        self.__save = dict(os.environ)
        os.environ.clear()
    def tearDown(self):
        os.environ.clear()
        os.environ.update(self.__save)

    # Bug 1110478
    # Report a skip when there is no /bin/sh, rather than passing without
    # having checked anything.
    @unittest.skipUnless(os.path.exists("/bin/sh"), "requires /bin/sh")
    def test_update2(self):
        # A variable set through os.environ.update() must be visible to a
        # child process.
        os.environ.update(HELLO="World")
        with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
            value = popen.read().strip()
            self.assertEqual(value, "World")

    # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
    # #13415).
    @unittest.skipIf(sys.platform.startswith(('freebsd', 'darwin')),
                     "due to known OS bug: see issue #13415")
    def test_unset_error(self):
        if sys.platform == "win32":
            # an environment variable is limited to 32,767 characters
            key = 'x' * 50000
            self.assertRaises(ValueError, os.environ.__delitem__, key)
        else:
            # "=" is not allowed in a variable name
            key = 'key='
            self.assertRaises(OSError, os.environ.__delitem__, key)
384
class WalkTests(unittest.TestCase):
    """Tests for os.walk()."""

    def test_traversal(self):
        from os.path import join

        # Build:
        #     TESTFN/
        #       TEST1/              a file kid and two directory kids
        #         tmp1
        #         SUB1/             a file kid and a directory kid
        #           tmp2
        #           SUB11/          no kids
        #         SUB2/             a file kid and a dirsymlink kid
        #           tmp3
        #           link/           a symlink to TESTFN/TEST2
        #       TEST2/
        #         tmp4              a lone file
        walk_path = join(test_support.TESTFN, "TEST1")
        sub1_path = join(walk_path, "SUB1")
        sub11_path = join(sub1_path, "SUB11")
        sub2_path = join(walk_path, "SUB2")
        tmp1_path = join(walk_path, "tmp1")
        tmp2_path = join(sub1_path, "tmp2")
        tmp3_path = join(sub2_path, "tmp3")
        link_path = join(sub2_path, "link")
        t2_path = join(test_support.TESTFN, "TEST2")
        tmp4_path = join(test_support.TESTFN, "TEST2", "tmp4")

        # Create stuff.
        os.makedirs(sub11_path)
        os.makedirs(sub2_path)
        os.makedirs(t2_path)
        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
            # 'with' closes the file even if the write fails.
            with open(path, "w") as f:
                f.write("I'm " + path + " and proud of it.  Blame test_os.\n")
        if hasattr(os, "symlink"):
            os.symlink(os.path.abspath(t2_path), link_path)
            sub2_tree = (sub2_path, ["link"], ["tmp3"])
        else:
            sub2_tree = (sub2_path, [], ["tmp3"])

        # Walk top-down.
        walked = list(os.walk(walk_path))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  TEST1, SUB1, SUB11, SUB2
        #     flipped:  TEST1, SUB2, SUB1, SUB11
        # 'flipped' is a bool that is used as 0/1 in the index arithmetic.
        flipped = walked[0][1][0] != "SUB1"
        walked[0][1].sort()
        self.assertEqual(walked[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 + flipped], (sub11_path, [], []))
        self.assertEqual(walked[3 - 2 * flipped], sub2_tree)

        # Prune the search.
        walked = []
        for root, dirs, files in os.walk(walk_path):
            walked.append((root, dirs, files))
            # Don't descend into SUB1.
            if 'SUB1' in dirs:
                # Note that this also mutates the dirs we appended to walked!
                dirs.remove('SUB1')
        self.assertEqual(len(walked), 2)
        self.assertEqual(walked[0], (walk_path, ["SUB2"], ["tmp1"]))
        self.assertEqual(walked[1], sub2_tree)

        # Walk bottom-up.
        walked = list(os.walk(walk_path, topdown=False))
        self.assertEqual(len(walked), 4)
        # We can't know which order SUB1 and SUB2 will appear in.
        # Not flipped:  SUB11, SUB1, SUB2, TEST1
        #     flipped:  SUB2, SUB11, SUB1, TEST1
        flipped = walked[3][1][0] != "SUB1"
        walked[3][1].sort()
        self.assertEqual(walked[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
        self.assertEqual(walked[flipped], (sub11_path, [], []))
        self.assertEqual(walked[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
        self.assertEqual(walked[2 - 2 * flipped], sub2_tree)

        if hasattr(os, "symlink"):
            # Walk, following symlinks.
            for root, dirs, files in os.walk(walk_path, followlinks=True):
                if root == link_path:
                    self.assertEqual(dirs, [])
                    self.assertEqual(files, ["tmp4"])
                    break
            else:
                self.fail("Didn't follow symlink with followlinks=True")

    def tearDown(self):
        # Tear everything down.  This is a decent use for bottom-up on
        # Windows, which doesn't have a recursive delete command.  The
        # (not so) subtlety is that rmdir will fail unless the dir's
        # kids are removed first, so bottom up is essential.
        for root, dirs, files in os.walk(test_support.TESTFN, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                dirname = os.path.join(root, name)
                # A symlink to a directory is removed as a file, not with
                # rmdir().
                if not os.path.islink(dirname):
                    os.rmdir(dirname)
                else:
                    os.remove(dirname)
        os.rmdir(test_support.TESTFN)
492
class MakedirTests (unittest.TestCase):
    # All directories are created below TESTFN, which setUp() creates and
    # tearDown() removes again via os.removedirs().
    def setUp(self):
        os.mkdir(test_support.TESTFN)

    def test_makedir(self):
        root = test_support.TESTFN
        three_deep = os.path.join(root, 'dir1', 'dir2', 'dir3')
        os.makedirs(three_deep)             # Should work
        # Extending an existing chain by one more level must work too.
        os.makedirs(os.path.join(three_deep, 'dir4'))

        # Try paths with a '.' in them
        self.assertRaises(OSError, os.makedirs, os.curdir)
        trailing_dot = os.path.join(three_deep, 'dir4', 'dir5', os.curdir)
        os.makedirs(trailing_dot)
        embedded_dot = os.path.join(root, 'dir1', os.curdir, 'dir2', 'dir3',
                                    'dir4', 'dir5', 'dir6')
        os.makedirs(embedded_dot)




    def tearDown(self):
        path = os.path.join(test_support.TESTFN, 'dir1', 'dir2', 'dir3',
                            'dir4', 'dir5', 'dir6')
        # If the tests failed, the bottom-most directory ('../dir6')
        # may not have been created, so climb towards TESTFN until we
        # reach a directory that exists.
        while True:
            if os.path.exists(path) or path == test_support.TESTFN:
                break
            path = os.path.dirname(path)

        os.removedirs(path)
525
class DevNullTests (unittest.TestCase):
    def test_devnull(self):
        # Data written to os.devnull must be accepted, and reading from it
        # must return an empty string.  open() replaces the legacy file()
        # constructor and 'with' closes each handle even if an assertion
        # fails.
        with open(os.devnull, 'w') as f:
            f.write('hello')
        with open(os.devnull, 'r') as f:
            self.assertEqual(f.read(), '')
534
535class URandomTests (unittest.TestCase):
536
537    def test_urandom_length(self):
538        self.assertEqual(len(os.urandom(0)), 0)
539        self.assertEqual(len(os.urandom(1)), 1)
540        self.assertEqual(len(os.urandom(10)), 10)
541        self.assertEqual(len(os.urandom(100)), 100)
542        self.assertEqual(len(os.urandom(1000)), 1000)
543
544    def test_urandom_value(self):
545        data1 = os.urandom(16)
546        data2 = os.urandom(16)
547        self.assertNotEqual(data1, data2)
548
549    def get_urandom_subprocess(self, count):
550        # We need to use repr() and eval() to avoid line ending conversions
551        # under Windows.
552        code = '\n'.join((
553            'import os, sys',
554            'data = os.urandom(%s)' % count,
555            'sys.stdout.write(repr(data))',
556            'sys.stdout.flush()',
557            'print >> sys.stderr, (len(data), data)'))
558        cmd_line = [sys.executable, '-c', code]
559        p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
560                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
561        out, err = p.communicate()
562        self.assertEqual(p.wait(), 0, (p.wait(), err))
563        out = eval(out)
564        self.assertEqual(len(out), count, err)
565        return out
566
567    def test_urandom_subprocess(self):
568        data1 = self.get_urandom_subprocess(16)
569        data2 = self.get_urandom_subprocess(16)
570        self.assertNotEqual(data1, data2)
571
572
# True only when the build configuration variable HAVE_GETENTROPY equals 1.
HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)
574
# Skipped as a whole when HAVE_GETENTROPY is true (see the skip reason).
@unittest.skipIf(HAVE_GETENTROPY,
                 "getentropy() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if the
        # file descriptor limit could not be restored after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        #
        # The child script lowers the soft RLIMIT_NOFILE limit to 1 and then
        # expects os.urandom(16) to raise OSError with errno EMFILE.  The
        # leading "if 1:" keeps the indented script body syntactically valid.
        code = """if 1:
            import errno
            import os
            import resource

            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)
600
601
602class ExecTests(unittest.TestCase):
603
604    def test_execvpe_with_bad_arglist(self):
605        self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
606
607    def test_execve_invalid_env(self):
608        args = [sys.executable, '-c', 'pass']
609
610        # null character in the enviroment variable name
611        newenv = os.environ.copy()
612        newenv["FRUIT\0VEGETABLE"] = "cabbage"
613        with self.assertRaises(TypeError):
614            os.execve(args[0], args, newenv)
615
616        # null character in the enviroment variable value
617        newenv = os.environ.copy()
618        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
619        with self.assertRaises(TypeError):
620            os.execve(args[0], args, newenv)
621
622        # equal character in the enviroment variable name
623        newenv = os.environ.copy()
624        newenv["FRUIT=ORANGE"] = "lemon"
625        with self.assertRaises(ValueError):
626            os.execve(args[0], args, newenv)
627
628
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ErrorTests(unittest.TestCase):
    # Each test expects WindowsError from an os call made on TESTFN
    # (presumably TESTFN does not exist when these run, except in
    # test_mkdir, which creates it as a file first).
    def test_rename(self):
        source = test_support.TESTFN
        with self.assertRaises(WindowsError):
            os.rename(source, source+".bak")

    def test_remove(self):
        with self.assertRaises(WindowsError):
            os.remove(test_support.TESTFN)

    def test_chdir(self):
        with self.assertRaises(WindowsError):
            os.chdir(test_support.TESTFN)

    def test_mkdir(self):
        # mkdir() must fail while a file of the same name exists.
        holder = open(test_support.TESTFN, "w")
        try:
            with self.assertRaises(WindowsError):
                os.mkdir(test_support.TESTFN)
        finally:
            holder.close()
            os.unlink(test_support.TESTFN)

    def test_utime(self):
        with self.assertRaises(WindowsError):
            os.utime(test_support.TESTFN, None)

    def test_chmod(self):
        with self.assertRaises(WindowsError):
            os.chmod(test_support.TESTFN, 0)
653
class TestInvalidFD(unittest.TestCase):
    # Calls os functions with the descriptor returned by
    # test_support.make_bad_fd() and expects OSError with errno EBADF
    # (see check()).

    # Names of os functions that are called with the descriptor as their
    # only argument; one test method per name is generated below.
    singles = ["fchdir", "fdopen", "dup", "fdatasync", "fstat",
               "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
    #singles.append("close")
    #We omit close because it doesn't raise an exception on some platforms

    # Factory executed while the class body runs (hence no 'self'): returns
    # a test method that checks os.<f>, or does nothing when os has no
    # attribute of that name.
    def get_single(f):
        def helper(self):
            if  hasattr(os, f):
                self.check(getattr(os, f))
        return helper
    # Install the generated methods into the class namespace as test_<name>.
    for f in singles:
        locals()["test_"+f] = get_single(f)

    def check(self, f, *args):
        # Call f(bad_fd, *args); the call must raise OSError with EBADF.
        try:
            f(test_support.make_bad_fd(), *args)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)
        else:
            self.fail("%r didn't raise an OSError with a bad file descriptor"
                      % f)

    @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
    def test_isatty(self):
        # isatty() is expected to return False here rather than raise.
        self.assertEqual(os.isatty(test_support.make_bad_fd()), False)

    @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
    def test_closerange(self):
        fd = test_support.make_bad_fd()
        # Make sure none of the descriptors we are about to close are
        # currently valid (issue 6542).
        # The loop stops at the first offset i for which fstat() succeeds;
        # if none does, i is left at 9.
        for i in range(10):
            try: os.fstat(fd+i)
            except OSError:
                pass
            else:
                break
        if i < 2:
            raise unittest.SkipTest(
                "Unable to acquire a range of invalid file descriptors")
        self.assertEqual(os.closerange(fd, fd + i-1), None)

    @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
    def test_dup2(self):
        self.check(os.dup2, 20)

    @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
    def test_fchmod(self):
        self.check(os.fchmod, 0)

    @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
    def test_fchown(self):
        self.check(os.fchown, -1, -1)

    @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
    def test_fpathconf(self):
        self.check(os.fpathconf, "PC_NAME_MAX")

    @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
    def test_ftruncate(self):
        self.check(os.ftruncate, 0)

    @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
    def test_lseek(self):
        self.check(os.lseek, 0, 0)

    @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
    def test_read(self):
        self.check(os.read, 1)

    @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
    def test_tcsetpgrpt(self):
        self.check(os.tcsetpgrp, 0)

    @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
    def test_write(self):
        self.check(os.write, " ")
731
@unittest.skipIf(sys.platform == "win32", "Posix specific tests")
class PosixUidGidTests(unittest.TestCase):
    def _check_single_id(self, setter):
        # Shared body for the one-argument setters: when not running as
        # uid 0, setting id 0 must raise os.error; an id of 1<<32 must
        # always raise OverflowError.
        if os.getuid() != 0:
            self.assertRaises(os.error, setter, 0)
        self.assertRaises(OverflowError, setter, 1<<32)

    def _check_id_pair(self, setter):
        # Same as _check_single_id for the two-argument setters; the
        # oversized value is tried in each position.
        if os.getuid() != 0:
            self.assertRaises(os.error, setter, 0, 0)
        too_big = 1<<32
        self.assertRaises(OverflowError, setter, too_big, 0)
        self.assertRaises(OverflowError, setter, 0, too_big)

    @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
    def test_setuid(self):
        self._check_single_id(os.setuid)

    @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
    def test_setgid(self):
        self._check_single_id(os.setgid)

    @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
    def test_seteuid(self):
        self._check_single_id(os.seteuid)

    @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
    def test_setegid(self):
        self._check_single_id(os.setegid)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid(self):
        self._check_id_pair(os.setreuid)

    @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
    def test_setreuid_neg1(self):
        # -1 must be accepted.  The call is made in a child interpreter so
        # that the test runner's own process state is left untouched
        # (issue8045).
        command = [sys.executable, '-c',
                   'import os,sys;os.setreuid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid(self):
        self._check_id_pair(os.setregid)

    @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
    def test_setregid_neg1(self):
        # -1 must be accepted.  The call is made in a child interpreter so
        # that the test runner's own process state is left untouched
        # (issue8045).
        command = [sys.executable, '-c',
                   'import os,sys;os.setregid(-1,-1);sys.exit(0)']
        subprocess.check_call(command)
787
788
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    def _kill(self, sig):
        """Check that os.kill(pid, sig) ends a child with exit code *sig*.

        Start sys.executable as a subprocess and communicate from the
        subprocess to the parent that the interpreter is ready. When it
        becomes ready, send *sig* via os.kill to the subprocess and check
        that the return code is equal to *sig*.
        """
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        # The child announces itself on stdout and then blocks on stdin.
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll for up to 100 * 0.1 seconds; the else clause fires when the
        # loop ends without the message having been seen (attempts exhausted
        # or the child exited early).
        attempts, max_attempts = 0, 100
        while attempts < max_attempts and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value)
                break
            time.sleep(0.1)
            attempts += 1
        else:
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console control *event* to a child and require it to exit.

        *name* is only used to label the failure message.
        """
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        # Release the shared mapping however the test ends.
        self.addCleanup(m.close)
        m[0] = '0'
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        # The byte in the tagged mapping is presumably set to '1' by
        # win_console_handler.py once it is ready -- that script is not
        # part of this file.
        attempts, max_attempts = 0, 20
        while attempts < max_attempts and proc.poll() is None:
            if m[0] == '1':
                break
            time.sleep(0.5)
            attempts += 1
        else:
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # poll() returns None only while the child is still running; any
        # exit status, including 0, means the child has stopped.
        if proc.poll() is None:
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
901
902
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32ListdirTests(unittest.TestCase):
    """Test listdir on Windows."""

    def setUp(self):
        # Populate TESTFN with two sub-directories and two files, and
        # remember their names (sorted) for comparison with os.listdir().
        self.created_paths = []
        for i in range(2):
            dir_name = 'SUB%d' % i
            dir_path = os.path.join(test_support.TESTFN, dir_name)
            file_name = 'FILE%d' % i
            file_path = os.path.join(test_support.TESTFN, file_name)
            os.makedirs(dir_path)
            with open(file_path, 'w') as f:
                f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
            self.created_paths.extend([dir_name, file_name])
        self.created_paths.sort()

    def tearDown(self):
        # Imported here because shutil is only needed by this class.
        import shutil
        shutil.rmtree(test_support.TESTFN)

    def test_listdir_no_extended_path(self):
        """Test when the path is not an "extended" path."""
        # unicode
        fs_encoding = sys.getfilesystemencoding()
        self.assertEqual(
                sorted(os.listdir(test_support.TESTFN.decode(fs_encoding))),
                [name.decode(fs_encoding) for name in self.created_paths])

        # bytes: TESTFN is passed as-is, exactly as the rest of this
        # module hands it to os functions.
        self.assertEqual(
                sorted(os.listdir(test_support.TESTFN)),
                self.created_paths)

    def test_listdir_extended_path(self):
        """Test when the path starts with '\\\\?\\'."""
        # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
        # unicode
        fs_encoding = sys.getfilesystemencoding()
        path = u'\\\\?\\' + os.path.abspath(
                test_support.TESTFN.decode(fs_encoding))
        # The comprehension uses its own variable name so that it cannot
        # rebind *path*.
        self.assertEqual(
                sorted(os.listdir(path)),
                [name.decode(fs_encoding) for name in self.created_paths])

        # bytes
        path = b'\\\\?\\' + os.path.abspath(test_support.TESTFN)
        self.assertEqual(
                sorted(os.listdir(path)),
                self.created_paths)
951
952
class SpawnTests(unittest.TestCase):
    def _spawn_expecting_rejection(self, spawn, args, env, rejection):
        # Spawn *args* with *env*.  Two outcomes are accepted: the call
        # raises *rejection*, or it returns exit status 127.
        try:
            exitcode = spawn(os.P_WAIT, args[0], args, env)
        except rejection:
            return
        self.assertEqual(exitcode, 127)

    def _test_invalid_env(self, spawn):
        # Exercise *spawn* (called as spawn(mode, path, args, env)) with
        # three malformed environments and one well-formed one.
        args = [sys.executable, '-c', 'pass']

        # (variable name, value, exception the spawn call may raise)
        malformed = (
            # null character in the environment variable name
            ("FRUIT\0VEGETABLE", "cabbage", TypeError),
            # null character in the environment variable value
            ("FRUIT", "orange\0VEGETABLE=cabbage", TypeError),
            # equal character in the environment variable name
            ("FRUIT=ORANGE", "lemon", ValueError),
        )
        for key, value, rejection in malformed:
            newenv = os.environ.copy()
            newenv[key] = value
            self._spawn_expecting_rejection(spawn, args, newenv, rejection)

        # equal character in the environment variable value: this one is
        # valid, so the child must see it unchanged and exit with 0.
        script = test_support.TESTFN
        self.addCleanup(test_support.unlink, script)
        with open(script, "w") as fp:
            fp.write('import sys, os\n'
                     'if os.getenv("FRUIT") != "orange=lemon":\n'
                     '    raise AssertionError')
        script_args = [sys.executable, script]
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange=lemon"
        exitcode = spawn(os.P_WAIT, script_args[0], script_args, newenv)
        self.assertEqual(exitcode, 0)

    @unittest.skipUnless(hasattr(os, 'spawnve'), 'test needs os.spawnve()')
    def test_spawnve_invalid_env(self):
        self._test_invalid_env(os.spawnve)

    @unittest.skipUnless(hasattr(os, 'spawnvpe'), 'test needs os.spawnvpe()')
    def test_spawnvpe_invalid_env(self):
        self._test_invalid_env(os.spawnvpe)
1007
1008
def test_main():
    # Hand every listed TestCase class to test_support.run_unittest().
    # NOTE(review): Win32ListdirTests is defined in this module but is not
    # listed here, so this entry point never runs it -- confirm whether
    # that omission is intentional.
    cases = (
        FileTests,
        TemporaryFileTests,
        StatAttributeTests,
        EnvironTests,
        WalkTests,
        MakedirTests,
        DevNullTests,
        URandomTests,
        URandomFDTests,
        ExecTests,
        Win32ErrorTests,
        TestInvalidFD,
        PosixUidGidTests,
        Win32KillTests,
        SpawnTests,
    )
    test_support.run_unittest(*cases)
1027
# Run the suite through test_main() when this file is executed as a script.
if __name__ == "__main__":
    test_main()
1030