1"Test posix functions"
2
3from test import support
4from test.support.script_helper import assert_python_ok
5
6# Skip these tests if there is no posix module.
7posix = support.import_module('posix')
8
9import errno
10import sys
11import signal
12import time
13import os
14import platform
15import pwd
16import stat
17import tempfile
18import unittest
19import warnings
20import textwrap
21
22_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
23                              support.TESTFN + '-dummy-symlink')
24
25requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
26        'test is only meaningful on 32-bit builds')
27
28def _supports_sched():
29    if not hasattr(posix, 'sched_getscheduler'):
30        return False
31    try:
32        posix.sched_getscheduler(0)
33    except OSError as e:
34        if e.errno == errno.ENOSYS:
35            return False
36    return True
37
38requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API')
39
40
41class PosixTester(unittest.TestCase):
42
43    def setUp(self):
44        # create empty file
45        fp = open(support.TESTFN, 'w+')
46        fp.close()
47        self.teardown_files = [ support.TESTFN ]
48        self._warnings_manager = support.check_warnings()
49        self._warnings_manager.__enter__()
50        warnings.filterwarnings('ignore', '.* potential security risk .*',
51                                RuntimeWarning)
52
53    def tearDown(self):
54        for teardown_file in self.teardown_files:
55            support.unlink(teardown_file)
56        self._warnings_manager.__exit__(None, None, None)
57
58    def testNoArgFunctions(self):
59        # test posix functions which take no arguments and have
60        # no side-effects which we need to cleanup (e.g., fork, wait, abort)
61        NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
62                             "times", "getloadavg",
63                             "getegid", "geteuid", "getgid", "getgroups",
64                             "getpid", "getpgrp", "getppid", "getuid", "sync",
65                           ]
66
67        for name in NO_ARG_FUNCTIONS:
68            posix_func = getattr(posix, name, None)
69            if posix_func is not None:
70                posix_func()
71                self.assertRaises(TypeError, posix_func, 1)
72
73    @unittest.skipUnless(hasattr(posix, 'getresuid'),
74                         'test needs posix.getresuid()')
75    def test_getresuid(self):
76        user_ids = posix.getresuid()
77        self.assertEqual(len(user_ids), 3)
78        for val in user_ids:
79            self.assertGreaterEqual(val, 0)
80
81    @unittest.skipUnless(hasattr(posix, 'getresgid'),
82                         'test needs posix.getresgid()')
83    def test_getresgid(self):
84        group_ids = posix.getresgid()
85        self.assertEqual(len(group_ids), 3)
86        for val in group_ids:
87            self.assertGreaterEqual(val, 0)
88
89    @unittest.skipUnless(hasattr(posix, 'setresuid'),
90                         'test needs posix.setresuid()')
91    def test_setresuid(self):
92        current_user_ids = posix.getresuid()
93        self.assertIsNone(posix.setresuid(*current_user_ids))
94        # -1 means don't change that value.
95        self.assertIsNone(posix.setresuid(-1, -1, -1))
96
97    @unittest.skipUnless(hasattr(posix, 'setresuid'),
98                         'test needs posix.setresuid()')
99    def test_setresuid_exception(self):
100        # Don't do this test if someone is silly enough to run us as root.
101        current_user_ids = posix.getresuid()
102        if 0 not in current_user_ids:
103            new_user_ids = (current_user_ids[0]+1, -1, -1)
104            self.assertRaises(OSError, posix.setresuid, *new_user_ids)
105
106    @unittest.skipUnless(hasattr(posix, 'setresgid'),
107                         'test needs posix.setresgid()')
108    def test_setresgid(self):
109        current_group_ids = posix.getresgid()
110        self.assertIsNone(posix.setresgid(*current_group_ids))
111        # -1 means don't change that value.
112        self.assertIsNone(posix.setresgid(-1, -1, -1))
113
114    @unittest.skipUnless(hasattr(posix, 'setresgid'),
115                         'test needs posix.setresgid()')
116    def test_setresgid_exception(self):
117        # Don't do this test if someone is silly enough to run us as root.
118        current_group_ids = posix.getresgid()
119        if 0 not in current_group_ids:
120            new_group_ids = (current_group_ids[0]+1, -1, -1)
121            self.assertRaises(OSError, posix.setresgid, *new_group_ids)
122
123    @unittest.skipUnless(hasattr(posix, 'initgroups'),
124                         "test needs os.initgroups()")
125    def test_initgroups(self):
126        # It takes a string and an integer; check that it raises a TypeError
127        # for other argument lists.
128        self.assertRaises(TypeError, posix.initgroups)
129        self.assertRaises(TypeError, posix.initgroups, None)
130        self.assertRaises(TypeError, posix.initgroups, 3, "foo")
131        self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())
132
133        # If a non-privileged user invokes it, it should fail with OSError
134        # EPERM.
135        if os.getuid() != 0:
136            try:
137                name = pwd.getpwuid(posix.getuid()).pw_name
138            except KeyError:
139                # the current UID may not have a pwd entry
140                raise unittest.SkipTest("need a pwd entry")
141            try:
142                posix.initgroups(name, 13)
143            except OSError as e:
144                self.assertEqual(e.errno, errno.EPERM)
145            else:
146                self.fail("Expected OSError to be raised by initgroups")
147
148    @unittest.skipUnless(hasattr(posix, 'statvfs'),
149                         'test needs posix.statvfs()')
150    def test_statvfs(self):
151        self.assertTrue(posix.statvfs(os.curdir))
152
153    @unittest.skipUnless(hasattr(posix, 'fstatvfs'),
154                         'test needs posix.fstatvfs()')
155    def test_fstatvfs(self):
156        fp = open(support.TESTFN)
157        try:
158            self.assertTrue(posix.fstatvfs(fp.fileno()))
159            self.assertTrue(posix.statvfs(fp.fileno()))
160        finally:
161            fp.close()
162
163    @unittest.skipUnless(hasattr(posix, 'ftruncate'),
164                         'test needs posix.ftruncate()')
165    def test_ftruncate(self):
166        fp = open(support.TESTFN, 'w+')
167        try:
168            # we need to have some data to truncate
169            fp.write('test')
170            fp.flush()
171            posix.ftruncate(fp.fileno(), 0)
172        finally:
173            fp.close()
174
175    @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
176    def test_truncate(self):
177        with open(support.TESTFN, 'w') as fp:
178            fp.write('test')
179            fp.flush()
180        posix.truncate(support.TESTFN, 0)
181
182    @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")
183    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
184    def test_fexecve(self):
185        fp = os.open(sys.executable, os.O_RDONLY)
186        try:
187            pid = os.fork()
188            if pid == 0:
189                os.chdir(os.path.split(sys.executable)[0])
190                posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)
191            else:
192                support.wait_process(pid, exitcode=0)
193        finally:
194            os.close(fp)
195
196
197    @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
198    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
199    def test_waitid(self):
200        pid = os.fork()
201        if pid == 0:
202            os.chdir(os.path.split(sys.executable)[0])
203            posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
204        else:
205            res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
206            self.assertEqual(pid, res.si_pid)
207
208    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
209    def test_register_at_fork(self):
210        with self.assertRaises(TypeError, msg="Positional args not allowed"):
211            os.register_at_fork(lambda: None)
212        with self.assertRaises(TypeError, msg="Args must be callable"):
213            os.register_at_fork(before=2)
214        with self.assertRaises(TypeError, msg="Args must be callable"):
215            os.register_at_fork(after_in_child="three")
216        with self.assertRaises(TypeError, msg="Args must be callable"):
217            os.register_at_fork(after_in_parent=b"Five")
218        with self.assertRaises(TypeError, msg="Args must not be None"):
219            os.register_at_fork(before=None)
220        with self.assertRaises(TypeError, msg="Args must not be None"):
221            os.register_at_fork(after_in_child=None)
222        with self.assertRaises(TypeError, msg="Args must not be None"):
223            os.register_at_fork(after_in_parent=None)
224        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
225            # Ensure a combination of valid and invalid is an error.
226            os.register_at_fork(before=None, after_in_parent=lambda: 3)
227        with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
228            # Ensure a combination of valid and invalid is an error.
229            os.register_at_fork(before=lambda: None, after_in_child='')
230        # We test actual registrations in their own process so as not to
231        # pollute this one.  There is no way to unregister for cleanup.
232        code = """if 1:
233            import os
234
235            r, w = os.pipe()
236            fin_r, fin_w = os.pipe()
237
238            os.register_at_fork(before=lambda: os.write(w, b'A'))
239            os.register_at_fork(after_in_parent=lambda: os.write(w, b'C'))
240            os.register_at_fork(after_in_child=lambda: os.write(w, b'E'))
241            os.register_at_fork(before=lambda: os.write(w, b'B'),
242                                after_in_parent=lambda: os.write(w, b'D'),
243                                after_in_child=lambda: os.write(w, b'F'))
244
245            pid = os.fork()
246            if pid == 0:
247                # At this point, after-forkers have already been executed
248                os.close(w)
249                # Wait for parent to tell us to exit
250                os.read(fin_r, 1)
251                os._exit(0)
252            else:
253                try:
254                    os.close(w)
255                    with open(r, "rb") as f:
256                        data = f.read()
257                        assert len(data) == 6, data
258                        # Check before-fork callbacks
259                        assert data[:2] == b'BA', data
260                        # Check after-fork callbacks
261                        assert sorted(data[2:]) == list(b'CDEF'), data
262                        assert data.index(b'C') < data.index(b'D'), data
263                        assert data.index(b'E') < data.index(b'F'), data
264                finally:
265                    os.write(fin_w, b'!')
266            """
267        assert_python_ok('-c', code)
268
269    @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
270    def test_lockf(self):
271        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
272        try:
273            os.write(fd, b'test')
274            os.lseek(fd, 0, os.SEEK_SET)
275            posix.lockf(fd, posix.F_LOCK, 4)
276            # section is locked
277            posix.lockf(fd, posix.F_ULOCK, 4)
278        finally:
279            os.close(fd)
280
281    @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
282    def test_pread(self):
283        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
284        try:
285            os.write(fd, b'test')
286            os.lseek(fd, 0, os.SEEK_SET)
287            self.assertEqual(b'es', posix.pread(fd, 2, 1))
288            # the first pread() shouldn't disturb the file offset
289            self.assertEqual(b'te', posix.read(fd, 2))
290        finally:
291            os.close(fd)
292
293    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
294    def test_preadv(self):
295        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
296        try:
297            os.write(fd, b'test1tt2t3t5t6t6t8')
298            buf = [bytearray(i) for i in [5, 3, 2]]
299            self.assertEqual(posix.preadv(fd, buf, 3), 10)
300            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
301        finally:
302            os.close(fd)
303
304    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
305    @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI")
306    def test_preadv_flags(self):
307        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
308        try:
309            os.write(fd, b'test1tt2t3t5t6t6t8')
310            buf = [bytearray(i) for i in [5, 3, 2]]
311            self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10)
312            self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf))
313        except NotImplementedError:
314            self.skipTest("preadv2 not available")
315        except OSError as inst:
316            # Is possible that the macro RWF_HIPRI was defined at compilation time
317            # but the option is not supported by the kernel or the runtime libc shared
318            # library.
319            if inst.errno in {errno.EINVAL, errno.ENOTSUP}:
320                raise unittest.SkipTest("RWF_HIPRI is not supported by the current system")
321            else:
322                raise
323        finally:
324            os.close(fd)
325
326    @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
327    @requires_32b
328    def test_preadv_overflow_32bits(self):
329        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
330        try:
331            buf = [bytearray(2**16)] * 2**15
332            with self.assertRaises(OSError) as cm:
333                os.preadv(fd, buf, 0)
334            self.assertEqual(cm.exception.errno, errno.EINVAL)
335            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
336        finally:
337            os.close(fd)
338
339    @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
340    def test_pwrite(self):
341        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
342        try:
343            os.write(fd, b'test')
344            os.lseek(fd, 0, os.SEEK_SET)
345            posix.pwrite(fd, b'xx', 1)
346            self.assertEqual(b'txxt', posix.read(fd, 4))
347        finally:
348            os.close(fd)
349
350    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
351    def test_pwritev(self):
352        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
353        try:
354            os.write(fd, b"xx")
355            os.lseek(fd, 0, os.SEEK_SET)
356            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2)
357            self.assertEqual(n, 10)
358
359            os.lseek(fd, 0, os.SEEK_SET)
360            self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100))
361        finally:
362            os.close(fd)
363
364    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
365    @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC")
366    def test_pwritev_flags(self):
367        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
368        try:
369            os.write(fd,b"xx")
370            os.lseek(fd, 0, os.SEEK_SET)
371            n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC)
372            self.assertEqual(n, 10)
373
374            os.lseek(fd, 0, os.SEEK_SET)
375            self.assertEqual(b'xxtest1tt2', posix.read(fd, 100))
376        finally:
377            os.close(fd)
378
379    @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()")
380    @requires_32b
381    def test_pwritev_overflow_32bits(self):
382        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
383        try:
384            with self.assertRaises(OSError) as cm:
385                os.pwritev(fd, [b"x" * 2**16] * 2**15, 0)
386            self.assertEqual(cm.exception.errno, errno.EINVAL)
387        finally:
388            os.close(fd)
389
390    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
391        "test needs posix.posix_fallocate()")
392    def test_posix_fallocate(self):
393        fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
394        try:
395            posix.posix_fallocate(fd, 0, 10)
396        except OSError as inst:
397            # issue10812, ZFS doesn't appear to support posix_fallocate,
398            # so skip Solaris-based since they are likely to have ZFS.
399            # issue33655: Also ignore EINVAL on *BSD since ZFS is also
400            # often used there.
401            if inst.errno == errno.EINVAL and sys.platform.startswith(
402                ('sunos', 'freebsd', 'netbsd', 'openbsd', 'gnukfreebsd')):
403                raise unittest.SkipTest("test may fail on ZFS filesystems")
404            else:
405                raise
406        finally:
407            os.close(fd)
408
409    # issue31106 - posix_fallocate() does not set error in errno.
410    @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
411        "test needs posix.posix_fallocate()")
412    def test_posix_fallocate_errno(self):
413        try:
414            posix.posix_fallocate(-42, 0, 10)
415        except OSError as inst:
416            if inst.errno != errno.EBADF:
417                raise
418
419    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
420        "test needs posix.posix_fadvise()")
421    def test_posix_fadvise(self):
422        fd = os.open(support.TESTFN, os.O_RDONLY)
423        try:
424            posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
425        finally:
426            os.close(fd)
427
428    @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
429        "test needs posix.posix_fadvise()")
430    def test_posix_fadvise_errno(self):
431        try:
432            posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED)
433        except OSError as inst:
434            if inst.errno != errno.EBADF:
435                raise
436
437    @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime")
438    def test_utime_with_fd(self):
439        now = time.time()
440        fd = os.open(support.TESTFN, os.O_RDONLY)
441        try:
442            posix.utime(fd)
443            posix.utime(fd, None)
444            self.assertRaises(TypeError, posix.utime, fd, (None, None))
445            self.assertRaises(TypeError, posix.utime, fd, (now, None))
446            self.assertRaises(TypeError, posix.utime, fd, (None, now))
447            posix.utime(fd, (int(now), int(now)))
448            posix.utime(fd, (now, now))
449            self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now))
450            self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None))
451            self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0))
452            posix.utime(fd, (int(now), int((now - int(now)) * 1e9)))
453            posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9)))
454
455        finally:
456            os.close(fd)
457
458    @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime")
459    def test_utime_nofollow_symlinks(self):
460        now = time.time()
461        posix.utime(support.TESTFN, None, follow_symlinks=False)
462        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False)
463        self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False)
464        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False)
465        posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False)
466        posix.utime(support.TESTFN, (now, now), follow_symlinks=False)
467        posix.utime(support.TESTFN, follow_symlinks=False)
468
469    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
470    def test_writev(self):
471        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
472        try:
473            n = os.writev(fd, (b'test1', b'tt2', b't3'))
474            self.assertEqual(n, 10)
475
476            os.lseek(fd, 0, os.SEEK_SET)
477            self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
478
479            # Issue #20113: empty list of buffers should not crash
480            try:
481                size = posix.writev(fd, [])
482            except OSError:
483                # writev(fd, []) raises OSError(22, "Invalid argument")
484                # on OpenIndiana
485                pass
486            else:
487                self.assertEqual(size, 0)
488        finally:
489            os.close(fd)
490
491    @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
492    @requires_32b
493    def test_writev_overflow_32bits(self):
494        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
495        try:
496            with self.assertRaises(OSError) as cm:
497                os.writev(fd, [b"x" * 2**16] * 2**15)
498            self.assertEqual(cm.exception.errno, errno.EINVAL)
499        finally:
500            os.close(fd)
501
502    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
503    def test_readv(self):
504        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
505        try:
506            os.write(fd, b'test1tt2t3')
507            os.lseek(fd, 0, os.SEEK_SET)
508            buf = [bytearray(i) for i in [5, 3, 2]]
509            self.assertEqual(posix.readv(fd, buf), 10)
510            self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
511
512            # Issue #20113: empty list of buffers should not crash
513            try:
514                size = posix.readv(fd, [])
515            except OSError:
516                # readv(fd, []) raises OSError(22, "Invalid argument")
517                # on OpenIndiana
518                pass
519            else:
520                self.assertEqual(size, 0)
521        finally:
522            os.close(fd)
523
524    @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
525    @requires_32b
526    def test_readv_overflow_32bits(self):
527        fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
528        try:
529            buf = [bytearray(2**16)] * 2**15
530            with self.assertRaises(OSError) as cm:
531                os.readv(fd, buf)
532            self.assertEqual(cm.exception.errno, errno.EINVAL)
533            self.assertEqual(bytes(buf[0]), b'\0'* 2**16)
534        finally:
535            os.close(fd)
536
537    @unittest.skipUnless(hasattr(posix, 'dup'),
538                         'test needs posix.dup()')
539    def test_dup(self):
540        fp = open(support.TESTFN)
541        try:
542            fd = posix.dup(fp.fileno())
543            self.assertIsInstance(fd, int)
544            os.close(fd)
545        finally:
546            fp.close()
547
548    @unittest.skipUnless(hasattr(posix, 'confstr'),
549                         'test needs posix.confstr()')
550    def test_confstr(self):
551        self.assertRaises(ValueError, posix.confstr, "CS_garbage")
552        self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
553
554    @unittest.skipUnless(hasattr(posix, 'dup2'),
555                         'test needs posix.dup2()')
556    def test_dup2(self):
557        fp1 = open(support.TESTFN)
558        fp2 = open(support.TESTFN)
559        try:
560            posix.dup2(fp1.fileno(), fp2.fileno())
561        finally:
562            fp1.close()
563            fp2.close()
564
565    @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
566    @support.requires_linux_version(2, 6, 23)
567    def test_oscloexec(self):
568        fd = os.open(support.TESTFN, os.O_RDONLY|os.O_CLOEXEC)
569        self.addCleanup(os.close, fd)
570        self.assertFalse(os.get_inheritable(fd))
571
572    @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),
573                         'test needs posix.O_EXLOCK')
574    def test_osexlock(self):
575        fd = os.open(support.TESTFN,
576                     os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
577        self.assertRaises(OSError, os.open, support.TESTFN,
578                          os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
579        os.close(fd)
580
581        if hasattr(posix, "O_SHLOCK"):
582            fd = os.open(support.TESTFN,
583                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
584            self.assertRaises(OSError, os.open, support.TESTFN,
585                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
586            os.close(fd)
587
588    @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),
589                         'test needs posix.O_SHLOCK')
590    def test_osshlock(self):
591        fd1 = os.open(support.TESTFN,
592                     os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
593        fd2 = os.open(support.TESTFN,
594                      os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
595        os.close(fd2)
596        os.close(fd1)
597
598        if hasattr(posix, "O_EXLOCK"):
599            fd = os.open(support.TESTFN,
600                         os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
601            self.assertRaises(OSError, os.open, support.TESTFN,
602                              os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
603            os.close(fd)
604
605    @unittest.skipUnless(hasattr(posix, 'fstat'),
606                         'test needs posix.fstat()')
607    def test_fstat(self):
608        fp = open(support.TESTFN)
609        try:
610            self.assertTrue(posix.fstat(fp.fileno()))
611            self.assertTrue(posix.stat(fp.fileno()))
612
613            self.assertRaisesRegex(TypeError,
614                    'should be string, bytes, os.PathLike or integer, not',
615                    posix.stat, float(fp.fileno()))
616        finally:
617            fp.close()
618
619    def test_stat(self):
620        self.assertTrue(posix.stat(support.TESTFN))
621        self.assertTrue(posix.stat(os.fsencode(support.TESTFN)))
622
623        self.assertWarnsRegex(DeprecationWarning,
624                'should be string, bytes, os.PathLike or integer, not',
625                posix.stat, bytearray(os.fsencode(support.TESTFN)))
626        self.assertRaisesRegex(TypeError,
627                'should be string, bytes, os.PathLike or integer, not',
628                posix.stat, None)
629        self.assertRaisesRegex(TypeError,
630                'should be string, bytes, os.PathLike or integer, not',
631                posix.stat, list(support.TESTFN))
632        self.assertRaisesRegex(TypeError,
633                'should be string, bytes, os.PathLike or integer, not',
634                posix.stat, list(os.fsencode(support.TESTFN)))
635
636    @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
637    def test_mkfifo(self):
638        support.unlink(support.TESTFN)
639        try:
640            posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR)
641        except PermissionError as e:
642            self.skipTest('posix.mkfifo(): %s' % e)
643        self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
644
645    @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),
646                         "don't have mknod()/S_IFIFO")
647    def test_mknod(self):
648        # Test using mknod() to create a FIFO (the only use specified
649        # by POSIX).
650        support.unlink(support.TESTFN)
651        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
652        try:
653            posix.mknod(support.TESTFN, mode, 0)
654        except OSError as e:
655            # Some old systems don't allow unprivileged users to use
656            # mknod(), or only support creating device nodes.
657            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
658        else:
659            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
660
661        # Keyword arguments are also supported
662        support.unlink(support.TESTFN)
663        try:
664            posix.mknod(path=support.TESTFN, mode=mode, device=0,
665                dir_fd=None)
666        except OSError as e:
667            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
668
669    @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()')
670    def test_makedev(self):
671        st = posix.stat(support.TESTFN)
672        dev = st.st_dev
673        self.assertIsInstance(dev, int)
674        self.assertGreaterEqual(dev, 0)
675
676        major = posix.major(dev)
677        self.assertIsInstance(major, int)
678        self.assertGreaterEqual(major, 0)
679        self.assertEqual(posix.major(dev), major)
680        self.assertRaises(TypeError, posix.major, float(dev))
681        self.assertRaises(TypeError, posix.major)
682        self.assertRaises((ValueError, OverflowError), posix.major, -1)
683
684        minor = posix.minor(dev)
685        self.assertIsInstance(minor, int)
686        self.assertGreaterEqual(minor, 0)
687        self.assertEqual(posix.minor(dev), minor)
688        self.assertRaises(TypeError, posix.minor, float(dev))
689        self.assertRaises(TypeError, posix.minor)
690        self.assertRaises((ValueError, OverflowError), posix.minor, -1)
691
692        self.assertEqual(posix.makedev(major, minor), dev)
693        self.assertRaises(TypeError, posix.makedev, float(major), minor)
694        self.assertRaises(TypeError, posix.makedev, major, float(minor))
695        self.assertRaises(TypeError, posix.makedev, major)
696        self.assertRaises(TypeError, posix.makedev)
697
698    def _test_all_chown_common(self, chown_func, first_param, stat_func):
699        """Common code for chown, fchown and lchown tests."""
700        def check_stat(uid, gid):
701            if stat_func is not None:
702                stat = stat_func(first_param)
703                self.assertEqual(stat.st_uid, uid)
704                self.assertEqual(stat.st_gid, gid)
705        uid = os.getuid()
706        gid = os.getgid()
707        # test a successful chown call
708        chown_func(first_param, uid, gid)
709        check_stat(uid, gid)
710        chown_func(first_param, -1, gid)
711        check_stat(uid, gid)
712        chown_func(first_param, uid, -1)
713        check_stat(uid, gid)
714
715        if uid == 0:
716            # Try an amusingly large uid/gid to make sure we handle
717            # large unsigned values.  (chown lets you use any
718            # uid/gid you like, even if they aren't defined.)
719            #
720            # This problem keeps coming up:
721            #   http://bugs.python.org/issue1747858
722            #   http://bugs.python.org/issue4591
723            #   http://bugs.python.org/issue15301
724            # Hopefully the fix in 4591 fixes it for good!
725            #
726            # This part of the test only runs when run as root.
727            # Only scary people run their tests as root.
728
729            big_value = 2**31
730            chown_func(first_param, big_value, big_value)
731            check_stat(big_value, big_value)
732            chown_func(first_param, -1, -1)
733            check_stat(big_value, big_value)
734            chown_func(first_param, uid, gid)
735            check_stat(uid, gid)
736        elif platform.system() in ('HP-UX', 'SunOS'):
737            # HP-UX and Solaris can allow a non-root user to chown() to root
738            # (issue #5113)
739            raise unittest.SkipTest("Skipping because of non-standard chown() "
740                                    "behavior")
741        else:
742            # non-root cannot chown to root, raises OSError
743            self.assertRaises(OSError, chown_func, first_param, 0, 0)
744            check_stat(uid, gid)
745            self.assertRaises(OSError, chown_func, first_param, 0, -1)
746            check_stat(uid, gid)
747            if 0 not in os.getgroups():
748                self.assertRaises(OSError, chown_func, first_param, -1, 0)
749                check_stat(uid, gid)
750        # test illegal types
751        for t in str, float:
752            self.assertRaises(TypeError, chown_func, first_param, t(uid), gid)
753            check_stat(uid, gid)
754            self.assertRaises(TypeError, chown_func, first_param, uid, t(gid))
755            check_stat(uid, gid)
756
757    @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
758    def test_chown(self):
759        # raise an OSError if the file does not exist
760        os.unlink(support.TESTFN)
761        self.assertRaises(OSError, posix.chown, support.TESTFN, -1, -1)
762
763        # re-create the file
764        support.create_empty_file(support.TESTFN)
765        self._test_all_chown_common(posix.chown, support.TESTFN, posix.stat)
766
767    @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
768    def test_fchown(self):
769        os.unlink(support.TESTFN)
770
771        # re-create the file
772        test_file = open(support.TESTFN, 'w')
773        try:
774            fd = test_file.fileno()
775            self._test_all_chown_common(posix.fchown, fd,
776                                        getattr(posix, 'fstat', None))
777        finally:
778            test_file.close()
779
780    @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
781    def test_lchown(self):
782        os.unlink(support.TESTFN)
783        # create a symlink
784        os.symlink(_DUMMY_SYMLINK, support.TESTFN)
785        self._test_all_chown_common(posix.lchown, support.TESTFN,
786                                    getattr(posix, 'lstat', None))
787
788    @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()')
789    def test_chdir(self):
790        posix.chdir(os.curdir)
791        self.assertRaises(OSError, posix.chdir, support.TESTFN)
792
793    def test_listdir(self):
794        self.assertIn(support.TESTFN, posix.listdir(os.curdir))
795
796    def test_listdir_default(self):
797        # When listdir is called without argument,
798        # it's the same as listdir(os.curdir).
799        self.assertIn(support.TESTFN, posix.listdir())
800
801    def test_listdir_bytes(self):
802        # When listdir is called with a bytes object,
803        # the returned strings are of type bytes.
804        self.assertIn(os.fsencode(support.TESTFN), posix.listdir(b'.'))
805
806    def test_listdir_bytes_like(self):
807        for cls in bytearray, memoryview:
808            with self.assertWarns(DeprecationWarning):
809                names = posix.listdir(cls(b'.'))
810            self.assertIn(os.fsencode(support.TESTFN), names)
811            for name in names:
812                self.assertIs(type(name), bytes)
813
814    @unittest.skipUnless(posix.listdir in os.supports_fd,
815                         "test needs fd support for posix.listdir()")
816    def test_listdir_fd(self):
817        f = posix.open(posix.getcwd(), posix.O_RDONLY)
818        self.addCleanup(posix.close, f)
819        self.assertEqual(
820            sorted(posix.listdir('.')),
821            sorted(posix.listdir(f))
822            )
823        # Check that the fd offset was reset (issue #13739)
824        self.assertEqual(
825            sorted(posix.listdir('.')),
826            sorted(posix.listdir(f))
827            )
828
829    @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()')
830    def test_access(self):
831        self.assertTrue(posix.access(support.TESTFN, os.R_OK))
832
833    @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()')
834    def test_umask(self):
835        old_mask = posix.umask(0)
836        self.assertIsInstance(old_mask, int)
837        posix.umask(old_mask)
838
839    @unittest.skipUnless(hasattr(posix, 'strerror'),
840                         'test needs posix.strerror()')
841    def test_strerror(self):
842        self.assertTrue(posix.strerror(0))
843
844    @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()')
845    def test_pipe(self):
846        reader, writer = posix.pipe()
847        os.close(reader)
848        os.close(writer)
849
850    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
851    @support.requires_linux_version(2, 6, 27)
852    def test_pipe2(self):
853        self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
854        self.assertRaises(TypeError, os.pipe2, 0, 0)
855
856        # try calling with flags = 0, like os.pipe()
857        r, w = os.pipe2(0)
858        os.close(r)
859        os.close(w)
860
861        # test flags
862        r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
863        self.addCleanup(os.close, r)
864        self.addCleanup(os.close, w)
865        self.assertFalse(os.get_inheritable(r))
866        self.assertFalse(os.get_inheritable(w))
867        self.assertFalse(os.get_blocking(r))
868        self.assertFalse(os.get_blocking(w))
869        # try reading from an empty pipe: this should fail, not block
870        self.assertRaises(OSError, os.read, r, 1)
871        # try a write big enough to fill-up the pipe: this should either
872        # fail or perform a partial write, not block
873        try:
874            os.write(w, b'x' * support.PIPE_MAX_SIZE)
875        except OSError:
876            pass
877
878    @support.cpython_only
879    @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
880    @support.requires_linux_version(2, 6, 27)
881    def test_pipe2_c_limits(self):
882        # Issue 15989
883        import _testcapi
884        self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)
885        self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)
886
887    @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')
888    def test_utime(self):
889        now = time.time()
890        posix.utime(support.TESTFN, None)
891        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None))
892        self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None))
893        self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now))
894        posix.utime(support.TESTFN, (int(now), int(now)))
895        posix.utime(support.TESTFN, (now, now))
896
897    def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):
898        st = os.stat(target_file)
899        self.assertTrue(hasattr(st, 'st_flags'))
900
901        # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
902        flags = st.st_flags | stat.UF_IMMUTABLE
903        try:
904            chflags_func(target_file, flags, **kwargs)
905        except OSError as err:
906            if err.errno != errno.EOPNOTSUPP:
907                raise
908            msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
909            self.skipTest(msg)
910
911        try:
912            new_st = os.stat(target_file)
913            self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
914            try:
915                fd = open(target_file, 'w+')
916            except OSError as e:
917                self.assertEqual(e.errno, errno.EPERM)
918        finally:
919            posix.chflags(target_file, st.st_flags)
920
921    @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
922    def test_chflags(self):
923        self._test_chflags_regular_file(posix.chflags, support.TESTFN)
924
925    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
926    def test_lchflags_regular_file(self):
927        self._test_chflags_regular_file(posix.lchflags, support.TESTFN)
928        self._test_chflags_regular_file(posix.chflags, support.TESTFN, follow_symlinks=False)
929
930    @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
931    def test_lchflags_symlink(self):
932        testfn_st = os.stat(support.TESTFN)
933
934        self.assertTrue(hasattr(testfn_st, 'st_flags'))
935
936        os.symlink(support.TESTFN, _DUMMY_SYMLINK)
937        self.teardown_files.append(_DUMMY_SYMLINK)
938        dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
939
940        def chflags_nofollow(path, flags):
941            return posix.chflags(path, flags, follow_symlinks=False)
942
943        for fn in (posix.lchflags, chflags_nofollow):
944            # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.
945            flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE
946            try:
947                fn(_DUMMY_SYMLINK, flags)
948            except OSError as err:
949                if err.errno != errno.EOPNOTSUPP:
950                    raise
951                msg = 'chflag UF_IMMUTABLE not supported by underlying fs'
952                self.skipTest(msg)
953            try:
954                new_testfn_st = os.stat(support.TESTFN)
955                new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
956
957                self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
958                self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
959                                 new_dummy_symlink_st.st_flags)
960            finally:
961                fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
962
963    def test_environ(self):
964        if os.name == "nt":
965            item_type = str
966        else:
967            item_type = bytes
968        for k, v in posix.environ.items():
969            self.assertEqual(type(k), item_type)
970            self.assertEqual(type(v), item_type)
971
972    def test_putenv(self):
973        with self.assertRaises(ValueError):
974            os.putenv('FRUIT\0VEGETABLE', 'cabbage')
975        with self.assertRaises(ValueError):
976            os.putenv(b'FRUIT\0VEGETABLE', b'cabbage')
977        with self.assertRaises(ValueError):
978            os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage')
979        with self.assertRaises(ValueError):
980            os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage')
981        with self.assertRaises(ValueError):
982            os.putenv('FRUIT=ORANGE', 'lemon')
983        with self.assertRaises(ValueError):
984            os.putenv(b'FRUIT=ORANGE', b'lemon')
985
986    @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()')
987    def test_getcwd_long_pathnames(self):
988        dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
989        curdir = os.getcwd()
990        base_path = os.path.abspath(support.TESTFN) + '.getcwd'
991
992        try:
993            os.mkdir(base_path)
994            os.chdir(base_path)
995        except:
996            #  Just returning nothing instead of the SkipTest exception, because
997            #  the test results in Error in that case.  Is that ok?
998            #  raise unittest.SkipTest("cannot create directory for testing")
999            return
1000
1001            def _create_and_do_getcwd(dirname, current_path_length = 0):
1002                try:
1003                    os.mkdir(dirname)
1004                except:
1005                    raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")
1006
1007                os.chdir(dirname)
1008                try:
1009                    os.getcwd()
1010                    if current_path_length < 1027:
1011                        _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
1012                finally:
1013                    os.chdir('..')
1014                    os.rmdir(dirname)
1015
1016            _create_and_do_getcwd(dirname)
1017
1018        finally:
1019            os.chdir(curdir)
1020            support.rmtree(base_path)
1021
1022    @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
1023    @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
1024    @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
1025    def test_getgrouplist(self):
1026        user = pwd.getpwuid(os.getuid())[0]
1027        group = pwd.getpwuid(os.getuid())[3]
1028        self.assertIn(group, posix.getgrouplist(user, group))
1029
1030
1031    @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
1032    def test_getgroups(self):
1033        with os.popen('id -G 2>/dev/null') as idg:
1034            groups = idg.read().strip()
1035            ret = idg.close()
1036
1037        try:
1038            idg_groups = set(int(g) for g in groups.split())
1039        except ValueError:
1040            idg_groups = set()
1041        if ret is not None or not idg_groups:
1042            raise unittest.SkipTest("need working 'id -G'")
1043
1044        # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()
1045        if sys.platform == 'darwin':
1046            import sysconfig
1047            dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0'
1048            if tuple(int(n) for n in str(dt).split('.')[0:2]) < (10, 6):
1049                raise unittest.SkipTest("getgroups(2) is broken prior to 10.6")
1050
1051        # 'id -G' and 'os.getgroups()' should return the same
1052        # groups, ignoring order, duplicates, and the effective gid.
1053        # #10822/#26944 - It is implementation defined whether
1054        # posix.getgroups() includes the effective gid.
1055        symdiff = idg_groups.symmetric_difference(posix.getgroups())
1056        self.assertTrue(not symdiff or symdiff == {posix.getegid()})
1057
1058    # tests for the posix *at functions follow
1059
1060    @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
1061    def test_access_dir_fd(self):
1062        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1063        try:
1064            self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f))
1065        finally:
1066            posix.close(f)
1067
1068    @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
1069    def test_chmod_dir_fd(self):
1070        os.chmod(support.TESTFN, stat.S_IRUSR)
1071
1072        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1073        try:
1074            posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
1075
1076            s = posix.stat(support.TESTFN)
1077            self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR)
1078        finally:
1079            posix.close(f)
1080
1081    @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()")
1082    def test_chown_dir_fd(self):
1083        support.unlink(support.TESTFN)
1084        support.create_empty_file(support.TESTFN)
1085
1086        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1087        try:
1088            posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f)
1089        finally:
1090            posix.close(f)
1091
1092    @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
1093    def test_stat_dir_fd(self):
1094        support.unlink(support.TESTFN)
1095        with open(support.TESTFN, 'w') as outfile:
1096            outfile.write("testline\n")
1097
1098        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1099        try:
1100            s1 = posix.stat(support.TESTFN)
1101            s2 = posix.stat(support.TESTFN, dir_fd=f)
1102            self.assertEqual(s1, s2)
1103            s2 = posix.stat(support.TESTFN, dir_fd=None)
1104            self.assertEqual(s1, s2)
1105            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1106                    posix.stat, support.TESTFN, dir_fd=posix.getcwd())
1107            self.assertRaisesRegex(TypeError, 'should be integer or None, not',
1108                    posix.stat, support.TESTFN, dir_fd=float(f))
1109            self.assertRaises(OverflowError,
1110                    posix.stat, support.TESTFN, dir_fd=10**20)
1111        finally:
1112            posix.close(f)
1113
1114    @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
1115    def test_utime_dir_fd(self):
1116        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1117        try:
1118            now = time.time()
1119            posix.utime(support.TESTFN, None, dir_fd=f)
1120            posix.utime(support.TESTFN, dir_fd=f)
1121            self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f)
1122            self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f)
1123            self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f)
1124            self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f)
1125            self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f)
1126            posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f)
1127            posix.utime(support.TESTFN, (now, now), dir_fd=f)
1128            posix.utime(support.TESTFN,
1129                    (int(now), int((now - int(now)) * 1e9)), dir_fd=f)
1130            posix.utime(support.TESTFN, dir_fd=f,
1131                            times=(int(now), int((now - int(now)) * 1e9)))
1132
1133            # try dir_fd and follow_symlinks together
1134            if os.utime in os.supports_follow_symlinks:
1135                try:
1136                    posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f)
1137                except ValueError:
1138                    # whoops!  using both together not supported on this platform.
1139                    pass
1140
1141        finally:
1142            posix.close(f)
1143
1144    @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")
1145    def test_link_dir_fd(self):
1146        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1147        try:
1148            posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f)
1149        except PermissionError as e:
1150            self.skipTest('posix.link(): %s' % e)
1151        else:
1152            # should have same inodes
1153            self.assertEqual(posix.stat(support.TESTFN)[1],
1154                posix.stat(support.TESTFN + 'link')[1])
1155        finally:
1156            posix.close(f)
1157            support.unlink(support.TESTFN + 'link')
1158
1159    @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
1160    def test_mkdir_dir_fd(self):
1161        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1162        try:
1163            posix.mkdir(support.TESTFN + 'dir', dir_fd=f)
1164            posix.stat(support.TESTFN + 'dir') # should not raise exception
1165        finally:
1166            posix.close(f)
1167            support.rmtree(support.TESTFN + 'dir')
1168
1169    @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'),
1170                         "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
1171    def test_mknod_dir_fd(self):
1172        # Test using mknodat() to create a FIFO (the only use specified
1173        # by POSIX).
1174        support.unlink(support.TESTFN)
1175        mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
1176        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1177        try:
1178            posix.mknod(support.TESTFN, mode, 0, dir_fd=f)
1179        except OSError as e:
1180            # Some old systems don't allow unprivileged users to use
1181            # mknod(), or only support creating device nodes.
1182            self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES))
1183        else:
1184            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
1185        finally:
1186            posix.close(f)
1187
1188    @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
1189    def test_open_dir_fd(self):
1190        support.unlink(support.TESTFN)
1191        with open(support.TESTFN, 'w') as outfile:
1192            outfile.write("testline\n")
1193        a = posix.open(posix.getcwd(), posix.O_RDONLY)
1194        b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a)
1195        try:
1196            res = posix.read(b, 9).decode(encoding="utf-8")
1197            self.assertEqual("testline\n", res)
1198        finally:
1199            posix.close(a)
1200            posix.close(b)
1201
1202    @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()")
1203    def test_readlink_dir_fd(self):
1204        os.symlink(support.TESTFN, support.TESTFN + 'link')
1205        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1206        try:
1207            self.assertEqual(posix.readlink(support.TESTFN + 'link'),
1208                posix.readlink(support.TESTFN + 'link', dir_fd=f))
1209        finally:
1210            support.unlink(support.TESTFN + 'link')
1211            posix.close(f)
1212
1213    @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
1214    def test_rename_dir_fd(self):
1215        support.unlink(support.TESTFN)
1216        support.create_empty_file(support.TESTFN + 'ren')
1217        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1218        try:
1219            posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f)
1220        except:
1221            posix.rename(support.TESTFN + 'ren', support.TESTFN)
1222            raise
1223        else:
1224            posix.stat(support.TESTFN) # should not raise exception
1225        finally:
1226            posix.close(f)
1227
1228    @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal')
1229    @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result")
1230    def test_cld_xxxx_constants(self):
1231        os.CLD_EXITED
1232        os.CLD_KILLED
1233        os.CLD_DUMPED
1234        os.CLD_TRAPPED
1235        os.CLD_STOPPED
1236        os.CLD_CONTINUED
1237
1238    @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
1239    def test_symlink_dir_fd(self):
1240        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1241        try:
1242            posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f)
1243            self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN)
1244        finally:
1245            posix.close(f)
1246            support.unlink(support.TESTFN + 'link')
1247
1248    @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
1249    def test_unlink_dir_fd(self):
1250        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1251        support.create_empty_file(support.TESTFN + 'del')
1252        posix.stat(support.TESTFN + 'del') # should not raise exception
1253        try:
1254            posix.unlink(support.TESTFN + 'del', dir_fd=f)
1255        except:
1256            support.unlink(support.TESTFN + 'del')
1257            raise
1258        else:
1259            self.assertRaises(OSError, posix.stat, support.TESTFN + 'link')
1260        finally:
1261            posix.close(f)
1262
1263    @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
1264    def test_mkfifo_dir_fd(self):
1265        support.unlink(support.TESTFN)
1266        f = posix.open(posix.getcwd(), posix.O_RDONLY)
1267        try:
1268            try:
1269                posix.mkfifo(support.TESTFN,
1270                             stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
1271            except PermissionError as e:
1272                self.skipTest('posix.mkfifo(): %s' % e)
1273            self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
1274        finally:
1275            posix.close(f)
1276
1277    requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
1278                                           "don't have scheduling support")
1279    requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
1280                                                  "don't have sched affinity support")
1281
1282    @requires_sched_h
1283    def test_sched_yield(self):
1284        # This has no error conditions (at least on Linux).
1285        posix.sched_yield()
1286
1287    @requires_sched_h
1288    @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
1289                         "requires sched_get_priority_max()")
1290    def test_sched_priority(self):
1291        # Round-robin usually has interesting priorities.
1292        pol = posix.SCHED_RR
1293        lo = posix.sched_get_priority_min(pol)
1294        hi = posix.sched_get_priority_max(pol)
1295        self.assertIsInstance(lo, int)
1296        self.assertIsInstance(hi, int)
1297        self.assertGreaterEqual(hi, lo)
1298        # OSX evidently just returns 15 without checking the argument.
1299        if sys.platform != "darwin":
1300            self.assertRaises(OSError, posix.sched_get_priority_min, -23)
1301            self.assertRaises(OSError, posix.sched_get_priority_max, -23)
1302
1303    @requires_sched
1304    def test_get_and_set_scheduler_and_param(self):
1305        possible_schedulers = [sched for name, sched in posix.__dict__.items()
1306                               if name.startswith("SCHED_")]
1307        mine = posix.sched_getscheduler(0)
1308        self.assertIn(mine, possible_schedulers)
1309        try:
1310            parent = posix.sched_getscheduler(os.getppid())
1311        except OSError as e:
1312            if e.errno != errno.EPERM:
1313                raise
1314        else:
1315            self.assertIn(parent, possible_schedulers)
1316        self.assertRaises(OSError, posix.sched_getscheduler, -1)
1317        self.assertRaises(OSError, posix.sched_getparam, -1)
1318        param = posix.sched_getparam(0)
1319        self.assertIsInstance(param.sched_priority, int)
1320
1321        # POSIX states that calling sched_setparam() or sched_setscheduler() on
1322        # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR
1323        # is implementation-defined: NetBSD and FreeBSD can return EINVAL.
1324        if not sys.platform.startswith(('freebsd', 'netbsd')):
1325            try:
1326                posix.sched_setscheduler(0, mine, param)
1327                posix.sched_setparam(0, param)
1328            except OSError as e:
1329                if e.errno != errno.EPERM:
1330                    raise
1331            self.assertRaises(OSError, posix.sched_setparam, -1, param)
1332
1333        self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
1334        self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
1335        self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
1336        param = posix.sched_param(None)
1337        self.assertRaises(TypeError, posix.sched_setparam, 0, param)
1338        large = 214748364700
1339        param = posix.sched_param(large)
1340        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1341        param = posix.sched_param(sched_priority=-large)
1342        self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
1343
1344    @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
1345    def test_sched_rr_get_interval(self):
1346        try:
1347            interval = posix.sched_rr_get_interval(0)
1348        except OSError as e:
1349            # This likely means that sched_rr_get_interval is only valid for
1350            # processes with the SCHED_RR scheduler in effect.
1351            if e.errno != errno.EINVAL:
1352                raise
1353            self.skipTest("only works on SCHED_RR processes")
1354        self.assertIsInstance(interval, float)
1355        # Reasonable constraints, I think.
1356        self.assertGreaterEqual(interval, 0.)
1357        self.assertLess(interval, 1.)
1358
1359    @requires_sched_affinity
1360    def test_sched_getaffinity(self):
1361        mask = posix.sched_getaffinity(0)
1362        self.assertIsInstance(mask, set)
1363        self.assertGreaterEqual(len(mask), 1)
1364        self.assertRaises(OSError, posix.sched_getaffinity, -1)
1365        for cpu in mask:
1366            self.assertIsInstance(cpu, int)
1367            self.assertGreaterEqual(cpu, 0)
1368            self.assertLess(cpu, 1 << 32)
1369
1370    @requires_sched_affinity
1371    def test_sched_setaffinity(self):
1372        mask = posix.sched_getaffinity(0)
1373        if len(mask) > 1:
1374            # Empty masks are forbidden
1375            mask.pop()
1376        posix.sched_setaffinity(0, mask)
1377        self.assertEqual(posix.sched_getaffinity(0), mask)
1378        self.assertRaises(OSError, posix.sched_setaffinity, 0, [])
1379        self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])
1380        self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X"))
1381        self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])
1382        self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
1383
1384    def test_rtld_constants(self):
1385        # check presence of major RTLD_* constants
1386        posix.RTLD_LAZY
1387        posix.RTLD_NOW
1388        posix.RTLD_GLOBAL
1389        posix.RTLD_LOCAL
1390
1391    @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),
1392                         "test needs an OS that reports file holes")
1393    def test_fs_holes(self):
1394        # Even if the filesystem doesn't report holes,
1395        # if the OS supports it the SEEK_* constants
1396        # will be defined and will have a consistent
1397        # behaviour:
1398        # os.SEEK_DATA = current position
1399        # os.SEEK_HOLE = end of file position
1400        with open(support.TESTFN, 'r+b') as fp:
1401            fp.write(b"hello")
1402            fp.flush()
1403            size = fp.tell()
1404            fno = fp.fileno()
1405            try :
1406                for i in range(size):
1407                    self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))
1408                    self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))
1409                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)
1410                self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)
1411            except OSError :
1412                # Some OSs claim to support SEEK_HOLE/SEEK_DATA
1413                # but it is not true.
1414                # For instance:
1415                # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
1416                raise unittest.SkipTest("OSError raised!")
1417
1418    def test_path_error2(self):
1419        """
1420        Test functions that call path_error2(), providing two filenames in their exceptions.
1421        """
1422        for name in ("rename", "replace", "link"):
1423            function = getattr(os, name, None)
1424            if function is None:
1425                continue
1426
1427            for dst in ("noodly2", support.TESTFN):
1428                try:
1429                    function('doesnotexistfilename', dst)
1430                except OSError as e:
1431                    self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e))
1432                    break
1433            else:
1434                self.fail("No valid path_error2() test for os." + name)
1435
1436    def test_path_with_null_character(self):
1437        fn = support.TESTFN
1438        fn_with_NUL = fn + '\0'
1439        self.addCleanup(support.unlink, fn)
1440        support.unlink(fn)
1441        fd = None
1442        try:
1443            with self.assertRaises(ValueError):
1444                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1445        finally:
1446            if fd is not None:
1447                os.close(fd)
1448        self.assertFalse(os.path.exists(fn))
1449        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1450        self.assertFalse(os.path.exists(fn))
1451        open(fn, 'wb').close()
1452        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1453
1454    def test_path_with_null_byte(self):
1455        fn = os.fsencode(support.TESTFN)
1456        fn_with_NUL = fn + b'\0'
1457        self.addCleanup(support.unlink, fn)
1458        support.unlink(fn)
1459        fd = None
1460        try:
1461            with self.assertRaises(ValueError):
1462                fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises
1463        finally:
1464            if fd is not None:
1465                os.close(fd)
1466        self.assertFalse(os.path.exists(fn))
1467        self.assertRaises(ValueError, os.mkdir, fn_with_NUL)
1468        self.assertFalse(os.path.exists(fn))
1469        open(fn, 'wb').close()
1470        self.assertRaises(ValueError, os.stat, fn_with_NUL)
1471
1472    @unittest.skipUnless(hasattr(os, "pidfd_open"), "pidfd_open unavailable")
1473    def test_pidfd_open(self):
1474        with self.assertRaises(OSError) as cm:
1475            os.pidfd_open(-1)
1476        if cm.exception.errno == errno.ENOSYS:
1477            self.skipTest("system does not support pidfd_open")
1478        if isinstance(cm.exception, PermissionError):
1479            self.skipTest(f"pidfd_open syscall blocked: {cm.exception!r}")
1480        self.assertEqual(cm.exception.errno, errno.EINVAL)
1481        os.close(os.pidfd_open(os.getpid(), 0))
1482
1483class PosixGroupsTester(unittest.TestCase):
1484
1485    def setUp(self):
1486        if posix.getuid() != 0:
1487            raise unittest.SkipTest("not enough privileges")
1488        if not hasattr(posix, 'getgroups'):
1489            raise unittest.SkipTest("need posix.getgroups")
1490        if sys.platform == 'darwin':
1491            raise unittest.SkipTest("getgroups(2) is broken on OSX")
1492        self.saved_groups = posix.getgroups()
1493
1494    def tearDown(self):
1495        if hasattr(posix, 'setgroups'):
1496            posix.setgroups(self.saved_groups)
1497        elif hasattr(posix, 'initgroups'):
1498            name = pwd.getpwuid(posix.getuid()).pw_name
1499            posix.initgroups(name, self.saved_groups[0])
1500
1501    @unittest.skipUnless(hasattr(posix, 'initgroups'),
1502                         "test needs posix.initgroups()")
1503    def test_initgroups(self):
1504        # find missing group
1505
1506        g = max(self.saved_groups or [0]) + 1
1507        name = pwd.getpwuid(posix.getuid()).pw_name
1508        posix.initgroups(name, g)
1509        self.assertIn(g, posix.getgroups())
1510
1511    @unittest.skipUnless(hasattr(posix, 'setgroups'),
1512                         "test needs posix.setgroups()")
1513    def test_setgroups(self):
1514        for groups in [[0], list(range(16))]:
1515            posix.setgroups(groups)
1516            self.assertListEqual(groups, posix.getgroups())
1517
1518
1519class _PosixSpawnMixin:
1520    # Program which does nothing and exits with status 0 (success)
1521    NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass')
1522    spawn_func = None
1523
1524    def python_args(self, *args):
1525        # Disable site module to avoid side effects. For example,
1526        # on Fedora 28, if the HOME environment variable is not set,
1527        # site._getuserbase() calls pwd.getpwuid() which opens
1528        # /var/lib/sss/mc/passwd but then leaves the file open which makes
1529        # test_close_file() to fail.
1530        return (sys.executable, '-I', '-S', *args)
1531
1532    def test_returns_pid(self):
1533        pidfile = support.TESTFN
1534        self.addCleanup(support.unlink, pidfile)
1535        script = f"""if 1:
1536            import os
1537            with open({pidfile!r}, "w") as pidfile:
1538                pidfile.write(str(os.getpid()))
1539            """
1540        args = self.python_args('-c', script)
1541        pid = self.spawn_func(args[0], args, os.environ)
1542        support.wait_process(pid, exitcode=0)
1543        with open(pidfile) as f:
1544            self.assertEqual(f.read(), str(pid))
1545
1546    def test_no_such_executable(self):
1547        no_such_executable = 'no_such_executable'
1548        try:
1549            pid = self.spawn_func(no_such_executable,
1550                                  [no_such_executable],
1551                                  os.environ)
1552        # bpo-35794: PermissionError can be raised if there are
1553        # directories in the $PATH that are not accessible.
1554        except (FileNotFoundError, PermissionError) as exc:
1555            self.assertEqual(exc.filename, no_such_executable)
1556        else:
1557            pid2, status = os.waitpid(pid, 0)
1558            self.assertEqual(pid2, pid)
1559            self.assertNotEqual(status, 0)
1560
1561    def test_specify_environment(self):
1562        envfile = support.TESTFN
1563        self.addCleanup(support.unlink, envfile)
1564        script = f"""if 1:
1565            import os
1566            with open({envfile!r}, "w") as envfile:
1567                envfile.write(os.environ['foo'])
1568        """
1569        args = self.python_args('-c', script)
1570        pid = self.spawn_func(args[0], args,
1571                              {**os.environ, 'foo': 'bar'})
1572        support.wait_process(pid, exitcode=0)
1573        with open(envfile) as f:
1574            self.assertEqual(f.read(), 'bar')
1575
1576    def test_none_file_actions(self):
1577        pid = self.spawn_func(
1578            self.NOOP_PROGRAM[0],
1579            self.NOOP_PROGRAM,
1580            os.environ,
1581            file_actions=None
1582        )
1583        support.wait_process(pid, exitcode=0)
1584
1585    def test_empty_file_actions(self):
1586        pid = self.spawn_func(
1587            self.NOOP_PROGRAM[0],
1588            self.NOOP_PROGRAM,
1589            os.environ,
1590            file_actions=[]
1591        )
1592        support.wait_process(pid, exitcode=0)
1593
1594    def test_resetids_explicit_default(self):
1595        pid = self.spawn_func(
1596            sys.executable,
1597            [sys.executable, '-c', 'pass'],
1598            os.environ,
1599            resetids=False
1600        )
1601        support.wait_process(pid, exitcode=0)
1602
1603    def test_resetids(self):
1604        pid = self.spawn_func(
1605            sys.executable,
1606            [sys.executable, '-c', 'pass'],
1607            os.environ,
1608            resetids=True
1609        )
1610        support.wait_process(pid, exitcode=0)
1611
1612    def test_resetids_wrong_type(self):
1613        with self.assertRaises(TypeError):
1614            self.spawn_func(sys.executable,
1615                            [sys.executable, "-c", "pass"],
1616                            os.environ, resetids=None)
1617
1618    def test_setpgroup(self):
1619        pid = self.spawn_func(
1620            sys.executable,
1621            [sys.executable, '-c', 'pass'],
1622            os.environ,
1623            setpgroup=os.getpgrp()
1624        )
1625        support.wait_process(pid, exitcode=0)
1626
1627    def test_setpgroup_wrong_type(self):
1628        with self.assertRaises(TypeError):
1629            self.spawn_func(sys.executable,
1630                            [sys.executable, "-c", "pass"],
1631                            os.environ, setpgroup="023")
1632
1633    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1634                           'need signal.pthread_sigmask()')
1635    def test_setsigmask(self):
1636        code = textwrap.dedent("""\
1637            import signal
1638            signal.raise_signal(signal.SIGUSR1)""")
1639
1640        pid = self.spawn_func(
1641            sys.executable,
1642            [sys.executable, '-c', code],
1643            os.environ,
1644            setsigmask=[signal.SIGUSR1]
1645        )
1646        support.wait_process(pid, exitcode=0)
1647
1648    def test_setsigmask_wrong_type(self):
1649        with self.assertRaises(TypeError):
1650            self.spawn_func(sys.executable,
1651                            [sys.executable, "-c", "pass"],
1652                            os.environ, setsigmask=34)
1653        with self.assertRaises(TypeError):
1654            self.spawn_func(sys.executable,
1655                            [sys.executable, "-c", "pass"],
1656                            os.environ, setsigmask=["j"])
1657        with self.assertRaises(ValueError):
1658            self.spawn_func(sys.executable,
1659                            [sys.executable, "-c", "pass"],
1660                            os.environ, setsigmask=[signal.NSIG,
1661                                                    signal.NSIG+1])
1662
1663    def test_setsid(self):
1664        rfd, wfd = os.pipe()
1665        self.addCleanup(os.close, rfd)
1666        try:
1667            os.set_inheritable(wfd, True)
1668
1669            code = textwrap.dedent(f"""
1670                import os
1671                fd = {wfd}
1672                sid = os.getsid(0)
1673                os.write(fd, str(sid).encode())
1674            """)
1675
1676            try:
1677                pid = self.spawn_func(sys.executable,
1678                                      [sys.executable, "-c", code],
1679                                      os.environ, setsid=True)
1680            except NotImplementedError as exc:
1681                self.skipTest(f"setsid is not supported: {exc!r}")
1682            except PermissionError as exc:
1683                self.skipTest(f"setsid failed with: {exc!r}")
1684        finally:
1685            os.close(wfd)
1686
1687        support.wait_process(pid, exitcode=0)
1688
1689        output = os.read(rfd, 100)
1690        child_sid = int(output)
1691        parent_sid = os.getsid(os.getpid())
1692        self.assertNotEqual(parent_sid, child_sid)
1693
1694    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1695                         'need signal.pthread_sigmask()')
1696    def test_setsigdef(self):
1697        original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN)
1698        code = textwrap.dedent("""\
1699            import signal
1700            signal.raise_signal(signal.SIGUSR1)""")
1701        try:
1702            pid = self.spawn_func(
1703                sys.executable,
1704                [sys.executable, '-c', code],
1705                os.environ,
1706                setsigdef=[signal.SIGUSR1]
1707            )
1708        finally:
1709            signal.signal(signal.SIGUSR1, original_handler)
1710
1711        support.wait_process(pid, exitcode=-signal.SIGUSR1)
1712
1713    def test_setsigdef_wrong_type(self):
1714        with self.assertRaises(TypeError):
1715            self.spawn_func(sys.executable,
1716                            [sys.executable, "-c", "pass"],
1717                            os.environ, setsigdef=34)
1718        with self.assertRaises(TypeError):
1719            self.spawn_func(sys.executable,
1720                            [sys.executable, "-c", "pass"],
1721                            os.environ, setsigdef=["j"])
1722        with self.assertRaises(ValueError):
1723            self.spawn_func(sys.executable,
1724                            [sys.executable, "-c", "pass"],
1725                            os.environ, setsigdef=[signal.NSIG, signal.NSIG+1])
1726
1727    @requires_sched
1728    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
1729                     "bpo-34685: test can fail on BSD")
1730    def test_setscheduler_only_param(self):
1731        policy = os.sched_getscheduler(0)
1732        priority = os.sched_get_priority_min(policy)
1733        code = textwrap.dedent(f"""\
1734            import os, sys
1735            if os.sched_getscheduler(0) != {policy}:
1736                sys.exit(101)
1737            if os.sched_getparam(0).sched_priority != {priority}:
1738                sys.exit(102)""")
1739        pid = self.spawn_func(
1740            sys.executable,
1741            [sys.executable, '-c', code],
1742            os.environ,
1743            scheduler=(None, os.sched_param(priority))
1744        )
1745        support.wait_process(pid, exitcode=0)
1746
1747    @requires_sched
1748    @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')),
1749                     "bpo-34685: test can fail on BSD")
1750    def test_setscheduler_with_policy(self):
1751        policy = os.sched_getscheduler(0)
1752        priority = os.sched_get_priority_min(policy)
1753        code = textwrap.dedent(f"""\
1754            import os, sys
1755            if os.sched_getscheduler(0) != {policy}:
1756                sys.exit(101)
1757            if os.sched_getparam(0).sched_priority != {priority}:
1758                sys.exit(102)""")
1759        pid = self.spawn_func(
1760            sys.executable,
1761            [sys.executable, '-c', code],
1762            os.environ,
1763            scheduler=(policy, os.sched_param(priority))
1764        )
1765        support.wait_process(pid, exitcode=0)
1766
1767    def test_multiple_file_actions(self):
1768        file_actions = [
1769            (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0),
1770            (os.POSIX_SPAWN_CLOSE, 0),
1771            (os.POSIX_SPAWN_DUP2, 1, 4),
1772        ]
1773        pid = self.spawn_func(self.NOOP_PROGRAM[0],
1774                              self.NOOP_PROGRAM,
1775                              os.environ,
1776                              file_actions=file_actions)
1777        support.wait_process(pid, exitcode=0)
1778
1779    def test_bad_file_actions(self):
1780        args = self.NOOP_PROGRAM
1781        with self.assertRaises(TypeError):
1782            self.spawn_func(args[0], args, os.environ,
1783                            file_actions=[None])
1784        with self.assertRaises(TypeError):
1785            self.spawn_func(args[0], args, os.environ,
1786                            file_actions=[()])
1787        with self.assertRaises(TypeError):
1788            self.spawn_func(args[0], args, os.environ,
1789                            file_actions=[(None,)])
1790        with self.assertRaises(TypeError):
1791            self.spawn_func(args[0], args, os.environ,
1792                            file_actions=[(12345,)])
1793        with self.assertRaises(TypeError):
1794            self.spawn_func(args[0], args, os.environ,
1795                            file_actions=[(os.POSIX_SPAWN_CLOSE,)])
1796        with self.assertRaises(TypeError):
1797            self.spawn_func(args[0], args, os.environ,
1798                            file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)])
1799        with self.assertRaises(TypeError):
1800            self.spawn_func(args[0], args, os.environ,
1801                            file_actions=[(os.POSIX_SPAWN_CLOSE, None)])
1802        with self.assertRaises(ValueError):
1803            self.spawn_func(args[0], args, os.environ,
1804                            file_actions=[(os.POSIX_SPAWN_OPEN,
1805                                           3, __file__ + '\0',
1806                                           os.O_RDONLY, 0)])
1807
1808    def test_open_file(self):
1809        outfile = support.TESTFN
1810        self.addCleanup(support.unlink, outfile)
1811        script = """if 1:
1812            import sys
1813            sys.stdout.write("hello")
1814            """
1815        file_actions = [
1816            (os.POSIX_SPAWN_OPEN, 1, outfile,
1817                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
1818                stat.S_IRUSR | stat.S_IWUSR),
1819        ]
1820        args = self.python_args('-c', script)
1821        pid = self.spawn_func(args[0], args, os.environ,
1822                              file_actions=file_actions)
1823
1824        support.wait_process(pid, exitcode=0)
1825        with open(outfile) as f:
1826            self.assertEqual(f.read(), 'hello')
1827
1828    def test_close_file(self):
1829        closefile = support.TESTFN
1830        self.addCleanup(support.unlink, closefile)
1831        script = f"""if 1:
1832            import os
1833            try:
1834                os.fstat(0)
1835            except OSError as e:
1836                with open({closefile!r}, 'w') as closefile:
1837                    closefile.write('is closed %d' % e.errno)
1838            """
1839        args = self.python_args('-c', script)
1840        pid = self.spawn_func(args[0], args, os.environ,
1841                              file_actions=[(os.POSIX_SPAWN_CLOSE, 0)])
1842
1843        support.wait_process(pid, exitcode=0)
1844        with open(closefile) as f:
1845            self.assertEqual(f.read(), 'is closed %d' % errno.EBADF)
1846
1847    def test_dup2(self):
1848        dupfile = support.TESTFN
1849        self.addCleanup(support.unlink, dupfile)
1850        script = """if 1:
1851            import sys
1852            sys.stdout.write("hello")
1853            """
1854        with open(dupfile, "wb") as childfile:
1855            file_actions = [
1856                (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1),
1857            ]
1858            args = self.python_args('-c', script)
1859            pid = self.spawn_func(args[0], args, os.environ,
1860                                  file_actions=file_actions)
1861            support.wait_process(pid, exitcode=0)
1862        with open(dupfile) as f:
1863            self.assertEqual(f.read(), 'hello')
1864
1865
1866@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn")
1867class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin):
1868    spawn_func = getattr(posix, 'posix_spawn', None)
1869
1870
1871@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp")
1872class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin):
1873    spawn_func = getattr(posix, 'posix_spawnp', None)
1874
1875    @support.skip_unless_symlink
1876    def test_posix_spawnp(self):
1877        # Use a symlink to create a program in its own temporary directory
1878        temp_dir = tempfile.mkdtemp()
1879        self.addCleanup(support.rmtree, temp_dir)
1880
1881        program = 'posix_spawnp_test_program.exe'
1882        program_fullpath = os.path.join(temp_dir, program)
1883        os.symlink(sys.executable, program_fullpath)
1884
1885        try:
1886            path = os.pathsep.join((temp_dir, os.environ['PATH']))
1887        except KeyError:
1888            path = temp_dir   # PATH is not set
1889
1890        spawn_args = (program, '-I', '-S', '-c', 'pass')
1891        code = textwrap.dedent("""
1892            import os
1893            from test import support
1894
1895            args = %a
1896            pid = os.posix_spawnp(args[0], args, os.environ)
1897
1898            support.wait_process(pid, exitcode=0)
1899        """ % (spawn_args,))
1900
1901        # Use a subprocess to test os.posix_spawnp() with a modified PATH
1902        # environment variable: posix_spawnp() uses the current environment
1903        # to locate the program, not its environment argument.
1904        args = ('-c', code)
1905        assert_python_ok(*args, PATH=path)
1906
1907
1908@unittest.skipUnless(sys.platform == "darwin", "test weak linking on macOS")
1909class TestPosixWeaklinking(unittest.TestCase):
1910    # These test cases verify that weak linking support on macOS works
1911    # as expected. These cases only test new behaviour introduced by weak linking,
1912    # regular behaviour is tested by the normal test cases.
1913    #
1914    # See the section on Weak Linking in Mac/README.txt for more information.
1915    def setUp(self):
1916        import sysconfig
1917        import platform
1918
1919        config_vars = sysconfig.get_config_vars()
1920        self.available = { nm for nm in config_vars if nm.startswith("HAVE_") and config_vars[nm] }
1921        self.mac_ver = tuple(int(part) for part in platform.mac_ver()[0].split("."))
1922
1923    def _verify_available(self, name):
1924        if name not in self.available:
1925            raise unittest.SkipTest(f"{name} not weak-linked")
1926
1927    def test_pwritev(self):
1928        self._verify_available("HAVE_PWRITEV")
1929        if self.mac_ver >= (10, 16):
1930            self.assertTrue(hasattr(os, "pwritev"), "os.pwritev is not available")
1931            self.assertTrue(hasattr(os, "preadv"), "os.readv is not available")
1932
1933        else:
1934            self.assertFalse(hasattr(os, "pwritev"), "os.pwritev is available")
1935            self.assertFalse(hasattr(os, "preadv"), "os.readv is available")
1936
1937    def test_stat(self):
1938        self._verify_available("HAVE_FSTATAT")
1939        if self.mac_ver >= (10, 10):
1940            self.assertIn("HAVE_FSTATAT", posix._have_functions)
1941
1942        else:
1943            self.assertNotIn("HAVE_FSTATAT", posix._have_functions)
1944
1945            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
1946                os.stat("file", dir_fd=0)
1947
1948    def test_access(self):
1949        self._verify_available("HAVE_FACCESSAT")
1950        if self.mac_ver >= (10, 10):
1951            self.assertIn("HAVE_FACCESSAT", posix._have_functions)
1952
1953        else:
1954            self.assertNotIn("HAVE_FACCESSAT", posix._have_functions)
1955
1956            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
1957                os.access("file", os.R_OK, dir_fd=0)
1958
1959            with self.assertRaisesRegex(NotImplementedError, "follow_symlinks unavailable"):
1960                os.access("file", os.R_OK, follow_symlinks=False)
1961
1962            with self.assertRaisesRegex(NotImplementedError, "effective_ids unavailable"):
1963                os.access("file", os.R_OK, effective_ids=True)
1964
1965    def test_chmod(self):
1966        self._verify_available("HAVE_FCHMODAT")
1967        if self.mac_ver >= (10, 10):
1968            self.assertIn("HAVE_FCHMODAT", posix._have_functions)
1969
1970        else:
1971            self.assertNotIn("HAVE_FCHMODAT", posix._have_functions)
1972            self.assertIn("HAVE_LCHMOD", posix._have_functions)
1973
1974            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
1975                os.chmod("file", 0o644, dir_fd=0)
1976
1977    def test_chown(self):
1978        self._verify_available("HAVE_FCHOWNAT")
1979        if self.mac_ver >= (10, 10):
1980            self.assertIn("HAVE_FCHOWNAT", posix._have_functions)
1981
1982        else:
1983            self.assertNotIn("HAVE_FCHOWNAT", posix._have_functions)
1984            self.assertIn("HAVE_LCHOWN", posix._have_functions)
1985
1986            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
1987                os.chown("file", 0, 0, dir_fd=0)
1988
1989    def test_link(self):
1990        self._verify_available("HAVE_LINKAT")
1991        if self.mac_ver >= (10, 10):
1992            self.assertIn("HAVE_LINKAT", posix._have_functions)
1993
1994        else:
1995            self.assertNotIn("HAVE_LINKAT", posix._have_functions)
1996
1997            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
1998                os.link("source", "target",  src_dir_fd=0)
1999
2000            with self.assertRaisesRegex(NotImplementedError, "dst_dir_fd unavailable"):
2001                os.link("source", "target",  dst_dir_fd=0)
2002
2003            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"):
2004                os.link("source", "target",  src_dir_fd=0, dst_dir_fd=0)
2005
2006            # issue 41355: !HAVE_LINKAT code path ignores the follow_symlinks flag
2007            with support.temp_dir() as base_path:
2008                link_path = os.path.join(base_path, "link")
2009                target_path = os.path.join(base_path, "target")
2010                source_path = os.path.join(base_path, "source")
2011
2012                with open(source_path, "w") as fp:
2013                    fp.write("data")
2014
2015                os.symlink("target", link_path)
2016
2017                # Calling os.link should fail in the link(2) call, and
2018                # should not reject *follow_symlinks* (to match the
2019                # behaviour you'd get when building on a platform without
2020                # linkat)
2021                with self.assertRaises(FileExistsError):
2022                    os.link(source_path, link_path, follow_symlinks=True)
2023
2024                with self.assertRaises(FileExistsError):
2025                    os.link(source_path, link_path, follow_symlinks=False)
2026
2027
2028    def test_listdir_scandir(self):
2029        self._verify_available("HAVE_FDOPENDIR")
2030        if self.mac_ver >= (10, 10):
2031            self.assertIn("HAVE_FDOPENDIR", posix._have_functions)
2032
2033        else:
2034            self.assertNotIn("HAVE_FDOPENDIR", posix._have_functions)
2035
2036            with self.assertRaisesRegex(TypeError, "listdir: path should be string, bytes, os.PathLike or None, not int"):
2037                os.listdir(0)
2038
2039            with self.assertRaisesRegex(TypeError, "scandir: path should be string, bytes, os.PathLike or None, not int"):
2040                os.scandir(0)
2041
2042    def test_mkdir(self):
2043        self._verify_available("HAVE_MKDIRAT")
2044        if self.mac_ver >= (10, 10):
2045            self.assertIn("HAVE_MKDIRAT", posix._have_functions)
2046
2047        else:
2048            self.assertNotIn("HAVE_MKDIRAT", posix._have_functions)
2049
2050            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2051                os.mkdir("dir", dir_fd=0)
2052
2053    def test_rename_replace(self):
2054        self._verify_available("HAVE_RENAMEAT")
2055        if self.mac_ver >= (10, 10):
2056            self.assertIn("HAVE_RENAMEAT", posix._have_functions)
2057
2058        else:
2059            self.assertNotIn("HAVE_RENAMEAT", posix._have_functions)
2060
2061            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2062                os.rename("a", "b", src_dir_fd=0)
2063
2064            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2065                os.rename("a", "b", dst_dir_fd=0)
2066
2067            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2068                os.replace("a", "b", src_dir_fd=0)
2069
2070            with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"):
2071                os.replace("a", "b", dst_dir_fd=0)
2072
2073    def test_unlink_rmdir(self):
2074        self._verify_available("HAVE_UNLINKAT")
2075        if self.mac_ver >= (10, 10):
2076            self.assertIn("HAVE_UNLINKAT", posix._have_functions)
2077
2078        else:
2079            self.assertNotIn("HAVE_UNLINKAT", posix._have_functions)
2080
2081            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2082                os.unlink("path", dir_fd=0)
2083
2084            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2085                os.rmdir("path", dir_fd=0)
2086
2087    def test_open(self):
2088        self._verify_available("HAVE_OPENAT")
2089        if self.mac_ver >= (10, 10):
2090            self.assertIn("HAVE_OPENAT", posix._have_functions)
2091
2092        else:
2093            self.assertNotIn("HAVE_OPENAT", posix._have_functions)
2094
2095            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2096                os.open("path", os.O_RDONLY, dir_fd=0)
2097
2098    def test_readlink(self):
2099        self._verify_available("HAVE_READLINKAT")
2100        if self.mac_ver >= (10, 10):
2101            self.assertIn("HAVE_READLINKAT", posix._have_functions)
2102
2103        else:
2104            self.assertNotIn("HAVE_READLINKAT", posix._have_functions)
2105
2106            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2107                os.readlink("path",  dir_fd=0)
2108
2109    def test_symlink(self):
2110        self._verify_available("HAVE_SYMLINKAT")
2111        if self.mac_ver >= (10, 10):
2112            self.assertIn("HAVE_SYMLINKAT", posix._have_functions)
2113
2114        else:
2115            self.assertNotIn("HAVE_SYMLINKAT", posix._have_functions)
2116
2117            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2118                os.symlink("a", "b",  dir_fd=0)
2119
2120    def test_utime(self):
2121        self._verify_available("HAVE_FUTIMENS")
2122        self._verify_available("HAVE_UTIMENSAT")
2123        if self.mac_ver >= (10, 13):
2124            self.assertIn("HAVE_FUTIMENS", posix._have_functions)
2125            self.assertIn("HAVE_UTIMENSAT", posix._have_functions)
2126
2127        else:
2128            self.assertNotIn("HAVE_FUTIMENS", posix._have_functions)
2129            self.assertNotIn("HAVE_UTIMENSAT", posix._have_functions)
2130
2131            with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"):
2132                os.utime("path", dir_fd=0)
2133
2134
2135def test_main():
2136    try:
2137        support.run_unittest(
2138            PosixTester,
2139            PosixGroupsTester,
2140            TestPosixSpawn,
2141            TestPosixSpawnP,
2142            TestPosixWeaklinking
2143        )
2144    finally:
2145        support.reap_children()
2146
2147if __name__ == '__main__':
2148    test_main()
2149