1"Test posix functions" 2 3from test import support 4from test.support.script_helper import assert_python_ok 5 6# Skip these tests if there is no posix module. 7posix = support.import_module('posix') 8 9import errno 10import sys 11import signal 12import time 13import os 14import platform 15import pwd 16import stat 17import tempfile 18import unittest 19import warnings 20import textwrap 21 22_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(), 23 support.TESTFN + '-dummy-symlink') 24 25requires_32b = unittest.skipUnless(sys.maxsize < 2**32, 26 'test is only meaningful on 32-bit builds') 27 28def _supports_sched(): 29 if not hasattr(posix, 'sched_getscheduler'): 30 return False 31 try: 32 posix.sched_getscheduler(0) 33 except OSError as e: 34 if e.errno == errno.ENOSYS: 35 return False 36 return True 37 38requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API') 39 40 41class PosixTester(unittest.TestCase): 42 43 def setUp(self): 44 # create empty file 45 fp = open(support.TESTFN, 'w+') 46 fp.close() 47 self.teardown_files = [ support.TESTFN ] 48 self._warnings_manager = support.check_warnings() 49 self._warnings_manager.__enter__() 50 warnings.filterwarnings('ignore', '.* potential security risk .*', 51 RuntimeWarning) 52 53 def tearDown(self): 54 for teardown_file in self.teardown_files: 55 support.unlink(teardown_file) 56 self._warnings_manager.__exit__(None, None, None) 57 58 def testNoArgFunctions(self): 59 # test posix functions which take no arguments and have 60 # no side-effects which we need to cleanup (e.g., fork, wait, abort) 61 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname", 62 "times", "getloadavg", 63 "getegid", "geteuid", "getgid", "getgroups", 64 "getpid", "getpgrp", "getppid", "getuid", "sync", 65 ] 66 67 for name in NO_ARG_FUNCTIONS: 68 posix_func = getattr(posix, name, None) 69 if posix_func is not None: 70 posix_func() 71 self.assertRaises(TypeError, posix_func, 1) 72 73 @unittest.skipUnless(hasattr(posix, 'getresuid'), 74 'test needs posix.getresuid()') 75 def test_getresuid(self): 76 user_ids = posix.getresuid() 77 self.assertEqual(len(user_ids), 3) 78 for val in user_ids: 79 self.assertGreaterEqual(val, 0) 80 81 @unittest.skipUnless(hasattr(posix, 'getresgid'), 82 'test needs posix.getresgid()') 83 def test_getresgid(self): 84 group_ids = posix.getresgid() 85 self.assertEqual(len(group_ids), 3) 86 for val in group_ids: 87 self.assertGreaterEqual(val, 0) 88 89 @unittest.skipUnless(hasattr(posix, 'setresuid'), 90 'test needs posix.setresuid()') 91 def test_setresuid(self): 92 current_user_ids = posix.getresuid() 93 self.assertIsNone(posix.setresuid(*current_user_ids)) 94 # -1 means don't change that value. 95 self.assertIsNone(posix.setresuid(-1, -1, -1)) 96 97 @unittest.skipUnless(hasattr(posix, 'setresuid'), 98 'test needs posix.setresuid()') 99 def test_setresuid_exception(self): 100 # Don't do this test if someone is silly enough to run us as root. 101 current_user_ids = posix.getresuid() 102 if 0 not in current_user_ids: 103 new_user_ids = (current_user_ids[0]+1, -1, -1) 104 self.assertRaises(OSError, posix.setresuid, *new_user_ids) 105 106 @unittest.skipUnless(hasattr(posix, 'setresgid'), 107 'test needs posix.setresgid()') 108 def test_setresgid(self): 109 current_group_ids = posix.getresgid() 110 self.assertIsNone(posix.setresgid(*current_group_ids)) 111 # -1 means don't change that value. 112 self.assertIsNone(posix.setresgid(-1, -1, -1)) 113 114 @unittest.skipUnless(hasattr(posix, 'setresgid'), 115 'test needs posix.setresgid()') 116 def test_setresgid_exception(self): 117 # Don't do this test if someone is silly enough to run us as root. 118 current_group_ids = posix.getresgid() 119 if 0 not in current_group_ids: 120 new_group_ids = (current_group_ids[0]+1, -1, -1) 121 self.assertRaises(OSError, posix.setresgid, *new_group_ids) 122 123 @unittest.skipUnless(hasattr(posix, 'initgroups'), 124 "test needs os.initgroups()") 125 def test_initgroups(self): 126 # It takes a string and an integer; check that it raises a TypeError 127 # for other argument lists. 128 self.assertRaises(TypeError, posix.initgroups) 129 self.assertRaises(TypeError, posix.initgroups, None) 130 self.assertRaises(TypeError, posix.initgroups, 3, "foo") 131 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) 132 133 # If a non-privileged user invokes it, it should fail with OSError 134 # EPERM. 135 if os.getuid() != 0: 136 try: 137 name = pwd.getpwuid(posix.getuid()).pw_name 138 except KeyError: 139 # the current UID may not have a pwd entry 140 raise unittest.SkipTest("need a pwd entry") 141 try: 142 posix.initgroups(name, 13) 143 except OSError as e: 144 self.assertEqual(e.errno, errno.EPERM) 145 else: 146 self.fail("Expected OSError to be raised by initgroups") 147 148 @unittest.skipUnless(hasattr(posix, 'statvfs'), 149 'test needs posix.statvfs()') 150 def test_statvfs(self): 151 self.assertTrue(posix.statvfs(os.curdir)) 152 153 @unittest.skipUnless(hasattr(posix, 'fstatvfs'), 154 'test needs posix.fstatvfs()') 155 def test_fstatvfs(self): 156 fp = open(support.TESTFN) 157 try: 158 self.assertTrue(posix.fstatvfs(fp.fileno())) 159 self.assertTrue(posix.statvfs(fp.fileno())) 160 finally: 161 fp.close() 162 163 @unittest.skipUnless(hasattr(posix, 'ftruncate'), 164 'test needs posix.ftruncate()') 165 def test_ftruncate(self): 166 fp = open(support.TESTFN, 'w+') 167 try: 168 # we need to have some data to truncate 169 fp.write('test') 170 fp.flush() 171 posix.ftruncate(fp.fileno(), 0) 172 finally: 173 fp.close() 174 175 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()") 176 def test_truncate(self): 177 with open(support.TESTFN, 'w') as fp: 178 fp.write('test') 179 fp.flush() 180 posix.truncate(support.TESTFN, 0) 181 182 @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter") 183 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 184 def test_fexecve(self): 185 fp = os.open(sys.executable, os.O_RDONLY) 186 try: 187 pid = os.fork() 188 if pid == 0: 189 os.chdir(os.path.split(sys.executable)[0]) 190 posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ) 191 else: 192 support.wait_process(pid, exitcode=0) 193 finally: 194 os.close(fp) 195 196 197 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()") 198 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 199 def test_waitid(self): 200 pid = os.fork() 201 if pid == 0: 202 os.chdir(os.path.split(sys.executable)[0]) 203 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ) 204 else: 205 res = posix.waitid(posix.P_PID, pid, posix.WEXITED) 206 self.assertEqual(pid, res.si_pid) 207 208 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 209 def test_register_at_fork(self): 210 with self.assertRaises(TypeError, msg="Positional args not allowed"): 211 os.register_at_fork(lambda: None) 212 with self.assertRaises(TypeError, msg="Args must be callable"): 213 os.register_at_fork(before=2) 214 with self.assertRaises(TypeError, msg="Args must be callable"): 215 os.register_at_fork(after_in_child="three") 216 with self.assertRaises(TypeError, msg="Args must be callable"): 217 os.register_at_fork(after_in_parent=b"Five") 218 with self.assertRaises(TypeError, msg="Args must not be None"): 219 os.register_at_fork(before=None) 220 with self.assertRaises(TypeError, msg="Args must not be None"): 221 os.register_at_fork(after_in_child=None) 222 with self.assertRaises(TypeError, msg="Args must not be None"): 223 os.register_at_fork(after_in_parent=None) 224 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 225 # Ensure a combination of valid and invalid is an error. 226 os.register_at_fork(before=None, after_in_parent=lambda: 3) 227 with self.assertRaises(TypeError, msg="Invalid arg was allowed"): 228 # Ensure a combination of valid and invalid is an error. 229 os.register_at_fork(before=lambda: None, after_in_child='') 230 # We test actual registrations in their own process so as not to 231 # pollute this one. There is no way to unregister for cleanup. 232 code = """if 1: 233 import os 234 235 r, w = os.pipe() 236 fin_r, fin_w = os.pipe() 237 238 os.register_at_fork(before=lambda: os.write(w, b'A')) 239 os.register_at_fork(after_in_parent=lambda: os.write(w, b'C')) 240 os.register_at_fork(after_in_child=lambda: os.write(w, b'E')) 241 os.register_at_fork(before=lambda: os.write(w, b'B'), 242 after_in_parent=lambda: os.write(w, b'D'), 243 after_in_child=lambda: os.write(w, b'F')) 244 245 pid = os.fork() 246 if pid == 0: 247 # At this point, after-forkers have already been executed 248 os.close(w) 249 # Wait for parent to tell us to exit 250 os.read(fin_r, 1) 251 os._exit(0) 252 else: 253 try: 254 os.close(w) 255 with open(r, "rb") as f: 256 data = f.read() 257 assert len(data) == 6, data 258 # Check before-fork callbacks 259 assert data[:2] == b'BA', data 260 # Check after-fork callbacks 261 assert sorted(data[2:]) == list(b'CDEF'), data 262 assert data.index(b'C') < data.index(b'D'), data 263 assert data.index(b'E') < data.index(b'F'), data 264 finally: 265 os.write(fin_w, b'!') 266 """ 267 assert_python_ok('-c', code) 268 269 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()") 270 def test_lockf(self): 271 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) 272 try: 273 os.write(fd, b'test') 274 os.lseek(fd, 0, os.SEEK_SET) 275 posix.lockf(fd, posix.F_LOCK, 4) 276 # section is locked 277 posix.lockf(fd, posix.F_ULOCK, 4) 278 finally: 279 os.close(fd) 280 281 @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()") 282 def test_pread(self): 283 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 284 try: 285 os.write(fd, b'test') 286 os.lseek(fd, 0, os.SEEK_SET) 287 self.assertEqual(b'es', posix.pread(fd, 2, 1)) 288 # the first pread() shouldn't disturb the file offset 289 self.assertEqual(b'te', posix.read(fd, 2)) 290 finally: 291 os.close(fd) 292 293 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 294 def test_preadv(self): 295 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 296 try: 297 os.write(fd, b'test1tt2t3t5t6t6t8') 298 buf = [bytearray(i) for i in [5, 3, 2]] 299 self.assertEqual(posix.preadv(fd, buf, 3), 10) 300 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 301 finally: 302 os.close(fd) 303 304 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 305 @unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI") 306 def test_preadv_flags(self): 307 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 308 try: 309 os.write(fd, b'test1tt2t3t5t6t6t8') 310 buf = [bytearray(i) for i in [5, 3, 2]] 311 self.assertEqual(posix.preadv(fd, buf, 3, os.RWF_HIPRI), 10) 312 self.assertEqual([b't1tt2', b't3t', b'5t'], list(buf)) 313 except NotImplementedError: 314 self.skipTest("preadv2 not available") 315 except OSError as inst: 316 # Is possible that the macro RWF_HIPRI was defined at compilation time 317 # but the option is not supported by the kernel or the runtime libc shared 318 # library. 319 if inst.errno in {errno.EINVAL, errno.ENOTSUP}: 320 raise unittest.SkipTest("RWF_HIPRI is not supported by the current system") 321 else: 322 raise 323 finally: 324 os.close(fd) 325 326 @unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()") 327 @requires_32b 328 def test_preadv_overflow_32bits(self): 329 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 330 try: 331 buf = [bytearray(2**16)] * 2**15 332 with self.assertRaises(OSError) as cm: 333 os.preadv(fd, buf, 0) 334 self.assertEqual(cm.exception.errno, errno.EINVAL) 335 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 336 finally: 337 os.close(fd) 338 339 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()") 340 def test_pwrite(self): 341 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 342 try: 343 os.write(fd, b'test') 344 os.lseek(fd, 0, os.SEEK_SET) 345 posix.pwrite(fd, b'xx', 1) 346 self.assertEqual(b'txxt', posix.read(fd, 4)) 347 finally: 348 os.close(fd) 349 350 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 351 def test_pwritev(self): 352 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 353 try: 354 os.write(fd, b"xx") 355 os.lseek(fd, 0, os.SEEK_SET) 356 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2) 357 self.assertEqual(n, 10) 358 359 os.lseek(fd, 0, os.SEEK_SET) 360 self.assertEqual(b'xxtest1tt2t3', posix.read(fd, 100)) 361 finally: 362 os.close(fd) 363 364 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 365 @unittest.skipUnless(hasattr(posix, 'os.RWF_SYNC'), "test needs os.RWF_SYNC") 366 def test_pwritev_flags(self): 367 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 368 try: 369 os.write(fd,b"xx") 370 os.lseek(fd, 0, os.SEEK_SET) 371 n = os.pwritev(fd, [b'test1', b'tt2', b't3'], 2, os.RWF_SYNC) 372 self.assertEqual(n, 10) 373 374 os.lseek(fd, 0, os.SEEK_SET) 375 self.assertEqual(b'xxtest1tt2', posix.read(fd, 100)) 376 finally: 377 os.close(fd) 378 379 @unittest.skipUnless(hasattr(posix, 'pwritev'), "test needs posix.pwritev()") 380 @requires_32b 381 def test_pwritev_overflow_32bits(self): 382 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 383 try: 384 with self.assertRaises(OSError) as cm: 385 os.pwritev(fd, [b"x" * 2**16] * 2**15, 0) 386 self.assertEqual(cm.exception.errno, errno.EINVAL) 387 finally: 388 os.close(fd) 389 390 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 391 "test needs posix.posix_fallocate()") 392 def test_posix_fallocate(self): 393 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT) 394 try: 395 posix.posix_fallocate(fd, 0, 10) 396 except OSError as inst: 397 # issue10812, ZFS doesn't appear to support posix_fallocate, 398 # so skip Solaris-based since they are likely to have ZFS. 399 # issue33655: Also ignore EINVAL on *BSD since ZFS is also 400 # often used there. 401 if inst.errno == errno.EINVAL and sys.platform.startswith( 402 ('sunos', 'freebsd', 'netbsd', 'openbsd', 'gnukfreebsd')): 403 raise unittest.SkipTest("test may fail on ZFS filesystems") 404 else: 405 raise 406 finally: 407 os.close(fd) 408 409 # issue31106 - posix_fallocate() does not set error in errno. 410 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'), 411 "test needs posix.posix_fallocate()") 412 def test_posix_fallocate_errno(self): 413 try: 414 posix.posix_fallocate(-42, 0, 10) 415 except OSError as inst: 416 if inst.errno != errno.EBADF: 417 raise 418 419 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 420 "test needs posix.posix_fadvise()") 421 def test_posix_fadvise(self): 422 fd = os.open(support.TESTFN, os.O_RDONLY) 423 try: 424 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED) 425 finally: 426 os.close(fd) 427 428 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'), 429 "test needs posix.posix_fadvise()") 430 def test_posix_fadvise_errno(self): 431 try: 432 posix.posix_fadvise(-42, 0, 0, posix.POSIX_FADV_WILLNEED) 433 except OSError as inst: 434 if inst.errno != errno.EBADF: 435 raise 436 437 @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime") 438 def test_utime_with_fd(self): 439 now = time.time() 440 fd = os.open(support.TESTFN, os.O_RDONLY) 441 try: 442 posix.utime(fd) 443 posix.utime(fd, None) 444 self.assertRaises(TypeError, posix.utime, fd, (None, None)) 445 self.assertRaises(TypeError, posix.utime, fd, (now, None)) 446 self.assertRaises(TypeError, posix.utime, fd, (None, now)) 447 posix.utime(fd, (int(now), int(now))) 448 posix.utime(fd, (now, now)) 449 self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now)) 450 self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None)) 451 self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0)) 452 posix.utime(fd, (int(now), int((now - int(now)) * 1e9))) 453 posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9))) 454 455 finally: 456 os.close(fd) 457 458 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime") 459 def test_utime_nofollow_symlinks(self): 460 now = time.time() 461 posix.utime(support.TESTFN, None, follow_symlinks=False) 462 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False) 463 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False) 464 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False) 465 posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False) 466 posix.utime(support.TESTFN, (now, now), follow_symlinks=False) 467 posix.utime(support.TESTFN, follow_symlinks=False) 468 469 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 470 def test_writev(self): 471 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 472 try: 473 n = os.writev(fd, (b'test1', b'tt2', b't3')) 474 self.assertEqual(n, 10) 475 476 os.lseek(fd, 0, os.SEEK_SET) 477 self.assertEqual(b'test1tt2t3', posix.read(fd, 10)) 478 479 # Issue #20113: empty list of buffers should not crash 480 try: 481 size = posix.writev(fd, []) 482 except OSError: 483 # writev(fd, []) raises OSError(22, "Invalid argument") 484 # on OpenIndiana 485 pass 486 else: 487 self.assertEqual(size, 0) 488 finally: 489 os.close(fd) 490 491 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()") 492 @requires_32b 493 def test_writev_overflow_32bits(self): 494 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 495 try: 496 with self.assertRaises(OSError) as cm: 497 os.writev(fd, [b"x" * 2**16] * 2**15) 498 self.assertEqual(cm.exception.errno, errno.EINVAL) 499 finally: 500 os.close(fd) 501 502 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 503 def test_readv(self): 504 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 505 try: 506 os.write(fd, b'test1tt2t3') 507 os.lseek(fd, 0, os.SEEK_SET) 508 buf = [bytearray(i) for i in [5, 3, 2]] 509 self.assertEqual(posix.readv(fd, buf), 10) 510 self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf]) 511 512 # Issue #20113: empty list of buffers should not crash 513 try: 514 size = posix.readv(fd, []) 515 except OSError: 516 # readv(fd, []) raises OSError(22, "Invalid argument") 517 # on OpenIndiana 518 pass 519 else: 520 self.assertEqual(size, 0) 521 finally: 522 os.close(fd) 523 524 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()") 525 @requires_32b 526 def test_readv_overflow_32bits(self): 527 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT) 528 try: 529 buf = [bytearray(2**16)] * 2**15 530 with self.assertRaises(OSError) as cm: 531 os.readv(fd, buf) 532 self.assertEqual(cm.exception.errno, errno.EINVAL) 533 self.assertEqual(bytes(buf[0]), b'\0'* 2**16) 534 finally: 535 os.close(fd) 536 537 @unittest.skipUnless(hasattr(posix, 'dup'), 538 'test needs posix.dup()') 539 def test_dup(self): 540 fp = open(support.TESTFN) 541 try: 542 fd = posix.dup(fp.fileno()) 543 self.assertIsInstance(fd, int) 544 os.close(fd) 545 finally: 546 fp.close() 547 548 @unittest.skipUnless(hasattr(posix, 'confstr'), 549 'test needs posix.confstr()') 550 def test_confstr(self): 551 self.assertRaises(ValueError, posix.confstr, "CS_garbage") 552 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) 553 554 @unittest.skipUnless(hasattr(posix, 'dup2'), 555 'test needs posix.dup2()') 556 def test_dup2(self): 557 fp1 = open(support.TESTFN) 558 fp2 = open(support.TESTFN) 559 try: 560 posix.dup2(fp1.fileno(), fp2.fileno()) 561 finally: 562 fp1.close() 563 fp2.close() 564 565 @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC") 566 @support.requires_linux_version(2, 6, 23) 567 def test_oscloexec(self): 568 fd = os.open(support.TESTFN, os.O_RDONLY|os.O_CLOEXEC) 569 self.addCleanup(os.close, fd) 570 self.assertFalse(os.get_inheritable(fd)) 571 572 @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'), 573 'test needs posix.O_EXLOCK') 574 def test_osexlock(self): 575 fd = os.open(support.TESTFN, 576 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) 577 self.assertRaises(OSError, os.open, support.TESTFN, 578 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 579 os.close(fd) 580 581 if hasattr(posix, "O_SHLOCK"): 582 fd = os.open(support.TESTFN, 583 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 584 self.assertRaises(OSError, os.open, support.TESTFN, 585 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) 586 os.close(fd) 587 588 @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'), 589 'test needs posix.O_SHLOCK') 590 def test_osshlock(self): 591 fd1 = os.open(support.TESTFN, 592 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 593 fd2 = os.open(support.TESTFN, 594 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 595 os.close(fd2) 596 os.close(fd1) 597 598 if hasattr(posix, "O_EXLOCK"): 599 fd = os.open(support.TESTFN, 600 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) 601 self.assertRaises(OSError, os.open, support.TESTFN, 602 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) 603 os.close(fd) 604 605 @unittest.skipUnless(hasattr(posix, 'fstat'), 606 'test needs posix.fstat()') 607 def test_fstat(self): 608 fp = open(support.TESTFN) 609 try: 610 self.assertTrue(posix.fstat(fp.fileno())) 611 self.assertTrue(posix.stat(fp.fileno())) 612 613 self.assertRaisesRegex(TypeError, 614 'should be string, bytes, os.PathLike or integer, not', 615 posix.stat, float(fp.fileno())) 616 finally: 617 fp.close() 618 619 def test_stat(self): 620 self.assertTrue(posix.stat(support.TESTFN)) 621 self.assertTrue(posix.stat(os.fsencode(support.TESTFN))) 622 623 self.assertWarnsRegex(DeprecationWarning, 624 'should be string, bytes, os.PathLike or integer, not', 625 posix.stat, bytearray(os.fsencode(support.TESTFN))) 626 self.assertRaisesRegex(TypeError, 627 'should be string, bytes, os.PathLike or integer, not', 628 posix.stat, None) 629 self.assertRaisesRegex(TypeError, 630 'should be string, bytes, os.PathLike or integer, not', 631 posix.stat, list(support.TESTFN)) 632 self.assertRaisesRegex(TypeError, 633 'should be string, bytes, os.PathLike or integer, not', 634 posix.stat, list(os.fsencode(support.TESTFN))) 635 636 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()") 637 def test_mkfifo(self): 638 support.unlink(support.TESTFN) 639 try: 640 posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR) 641 except PermissionError as e: 642 self.skipTest('posix.mkfifo(): %s' % e) 643 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 644 645 @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'), 646 "don't have mknod()/S_IFIFO") 647 def test_mknod(self): 648 # Test using mknod() to create a FIFO (the only use specified 649 # by POSIX). 650 support.unlink(support.TESTFN) 651 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 652 try: 653 posix.mknod(support.TESTFN, mode, 0) 654 except OSError as e: 655 # Some old systems don't allow unprivileged users to use 656 # mknod(), or only support creating device nodes. 657 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 658 else: 659 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 660 661 # Keyword arguments are also supported 662 support.unlink(support.TESTFN) 663 try: 664 posix.mknod(path=support.TESTFN, mode=mode, device=0, 665 dir_fd=None) 666 except OSError as e: 667 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 668 669 @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()') 670 def test_makedev(self): 671 st = posix.stat(support.TESTFN) 672 dev = st.st_dev 673 self.assertIsInstance(dev, int) 674 self.assertGreaterEqual(dev, 0) 675 676 major = posix.major(dev) 677 self.assertIsInstance(major, int) 678 self.assertGreaterEqual(major, 0) 679 self.assertEqual(posix.major(dev), major) 680 self.assertRaises(TypeError, posix.major, float(dev)) 681 self.assertRaises(TypeError, posix.major) 682 self.assertRaises((ValueError, OverflowError), posix.major, -1) 683 684 minor = posix.minor(dev) 685 self.assertIsInstance(minor, int) 686 self.assertGreaterEqual(minor, 0) 687 self.assertEqual(posix.minor(dev), minor) 688 self.assertRaises(TypeError, posix.minor, float(dev)) 689 self.assertRaises(TypeError, posix.minor) 690 self.assertRaises((ValueError, OverflowError), posix.minor, -1) 691 692 self.assertEqual(posix.makedev(major, minor), dev) 693 self.assertRaises(TypeError, posix.makedev, float(major), minor) 694 self.assertRaises(TypeError, posix.makedev, major, float(minor)) 695 self.assertRaises(TypeError, posix.makedev, major) 696 self.assertRaises(TypeError, posix.makedev) 697 698 def _test_all_chown_common(self, chown_func, first_param, stat_func): 699 """Common code for chown, fchown and lchown tests.""" 700 def check_stat(uid, gid): 701 if stat_func is not None: 702 stat = stat_func(first_param) 703 self.assertEqual(stat.st_uid, uid) 704 self.assertEqual(stat.st_gid, gid) 705 uid = os.getuid() 706 gid = os.getgid() 707 # test a successful chown call 708 chown_func(first_param, uid, gid) 709 check_stat(uid, gid) 710 chown_func(first_param, -1, gid) 711 check_stat(uid, gid) 712 chown_func(first_param, uid, -1) 713 check_stat(uid, gid) 714 715 if uid == 0: 716 # Try an amusingly large uid/gid to make sure we handle 717 # large unsigned values. (chown lets you use any 718 # uid/gid you like, even if they aren't defined.) 719 # 720 # This problem keeps coming up: 721 # http://bugs.python.org/issue1747858 722 # http://bugs.python.org/issue4591 723 # http://bugs.python.org/issue15301 724 # Hopefully the fix in 4591 fixes it for good! 725 # 726 # This part of the test only runs when run as root. 727 # Only scary people run their tests as root. 728 729 big_value = 2**31 730 chown_func(first_param, big_value, big_value) 731 check_stat(big_value, big_value) 732 chown_func(first_param, -1, -1) 733 check_stat(big_value, big_value) 734 chown_func(first_param, uid, gid) 735 check_stat(uid, gid) 736 elif platform.system() in ('HP-UX', 'SunOS'): 737 # HP-UX and Solaris can allow a non-root user to chown() to root 738 # (issue #5113) 739 raise unittest.SkipTest("Skipping because of non-standard chown() " 740 "behavior") 741 else: 742 # non-root cannot chown to root, raises OSError 743 self.assertRaises(OSError, chown_func, first_param, 0, 0) 744 check_stat(uid, gid) 745 self.assertRaises(OSError, chown_func, first_param, 0, -1) 746 check_stat(uid, gid) 747 if 0 not in os.getgroups(): 748 self.assertRaises(OSError, chown_func, first_param, -1, 0) 749 check_stat(uid, gid) 750 # test illegal types 751 for t in str, float: 752 self.assertRaises(TypeError, chown_func, first_param, t(uid), gid) 753 check_stat(uid, gid) 754 self.assertRaises(TypeError, chown_func, first_param, uid, t(gid)) 755 check_stat(uid, gid) 756 757 @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()") 758 def test_chown(self): 759 # raise an OSError if the file does not exist 760 os.unlink(support.TESTFN) 761 self.assertRaises(OSError, posix.chown, support.TESTFN, -1, -1) 762 763 # re-create the file 764 support.create_empty_file(support.TESTFN) 765 self._test_all_chown_common(posix.chown, support.TESTFN, posix.stat) 766 767 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") 768 def test_fchown(self): 769 os.unlink(support.TESTFN) 770 771 # re-create the file 772 test_file = open(support.TESTFN, 'w') 773 try: 774 fd = test_file.fileno() 775 self._test_all_chown_common(posix.fchown, fd, 776 getattr(posix, 'fstat', None)) 777 finally: 778 test_file.close() 779 780 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()") 781 def test_lchown(self): 782 os.unlink(support.TESTFN) 783 # create a symlink 784 os.symlink(_DUMMY_SYMLINK, support.TESTFN) 785 self._test_all_chown_common(posix.lchown, support.TESTFN, 786 getattr(posix, 'lstat', None)) 787 788 @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()') 789 def test_chdir(self): 790 posix.chdir(os.curdir) 791 self.assertRaises(OSError, posix.chdir, support.TESTFN) 792 793 def test_listdir(self): 794 self.assertIn(support.TESTFN, posix.listdir(os.curdir)) 795 796 def test_listdir_default(self): 797 # When listdir is called without argument, 798 # it's the same as listdir(os.curdir). 799 self.assertIn(support.TESTFN, posix.listdir()) 800 801 def test_listdir_bytes(self): 802 # When listdir is called with a bytes object, 803 # the returned strings are of type bytes. 804 self.assertIn(os.fsencode(support.TESTFN), posix.listdir(b'.')) 805 806 def test_listdir_bytes_like(self): 807 for cls in bytearray, memoryview: 808 with self.assertWarns(DeprecationWarning): 809 names = posix.listdir(cls(b'.')) 810 self.assertIn(os.fsencode(support.TESTFN), names) 811 for name in names: 812 self.assertIs(type(name), bytes) 813 814 @unittest.skipUnless(posix.listdir in os.supports_fd, 815 "test needs fd support for posix.listdir()") 816 def test_listdir_fd(self): 817 f = posix.open(posix.getcwd(), posix.O_RDONLY) 818 self.addCleanup(posix.close, f) 819 self.assertEqual( 820 sorted(posix.listdir('.')), 821 sorted(posix.listdir(f)) 822 ) 823 # Check that the fd offset was reset (issue #13739) 824 self.assertEqual( 825 sorted(posix.listdir('.')), 826 sorted(posix.listdir(f)) 827 ) 828 829 @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()') 830 def test_access(self): 831 self.assertTrue(posix.access(support.TESTFN, os.R_OK)) 832 833 @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()') 834 def test_umask(self): 835 old_mask = posix.umask(0) 836 self.assertIsInstance(old_mask, int) 837 posix.umask(old_mask) 838 839 @unittest.skipUnless(hasattr(posix, 'strerror'), 840 'test needs posix.strerror()') 841 def test_strerror(self): 842 self.assertTrue(posix.strerror(0)) 843 844 @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()') 845 def test_pipe(self): 846 reader, writer = posix.pipe() 847 os.close(reader) 848 os.close(writer) 849 850 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 851 @support.requires_linux_version(2, 6, 27) 852 def test_pipe2(self): 853 self.assertRaises(TypeError, os.pipe2, 'DEADBEEF') 854 self.assertRaises(TypeError, os.pipe2, 0, 0) 855 856 # try calling with flags = 0, like os.pipe() 857 r, w = os.pipe2(0) 858 os.close(r) 859 os.close(w) 860 861 # test flags 862 r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK) 863 self.addCleanup(os.close, r) 864 self.addCleanup(os.close, w) 865 self.assertFalse(os.get_inheritable(r)) 866 self.assertFalse(os.get_inheritable(w)) 867 self.assertFalse(os.get_blocking(r)) 868 self.assertFalse(os.get_blocking(w)) 869 # try reading from an empty pipe: this should fail, not block 870 self.assertRaises(OSError, os.read, r, 1) 871 # try a write big enough to fill-up the pipe: this should either 872 # fail or perform a partial write, not block 873 try: 874 os.write(w, b'x' * support.PIPE_MAX_SIZE) 875 except OSError: 876 pass 877 878 @support.cpython_only 879 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()") 880 @support.requires_linux_version(2, 6, 27) 881 def test_pipe2_c_limits(self): 882 # Issue 15989 883 import _testcapi 884 self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1) 885 self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1) 886 887 @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()') 888 def test_utime(self): 889 now = time.time() 890 posix.utime(support.TESTFN, None) 891 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None)) 892 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None)) 893 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now)) 894 posix.utime(support.TESTFN, (int(now), int(now))) 895 posix.utime(support.TESTFN, (now, now)) 896 897 def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs): 898 st = os.stat(target_file) 899 self.assertTrue(hasattr(st, 'st_flags')) 900 901 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 902 flags = st.st_flags | stat.UF_IMMUTABLE 903 try: 904 chflags_func(target_file, flags, **kwargs) 905 except OSError as err: 906 if err.errno != errno.EOPNOTSUPP: 907 raise 908 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 909 self.skipTest(msg) 910 911 try: 912 new_st = os.stat(target_file) 913 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags) 914 try: 915 fd = open(target_file, 'w+') 916 except OSError as e: 917 self.assertEqual(e.errno, errno.EPERM) 918 finally: 919 posix.chflags(target_file, st.st_flags) 920 921 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()') 922 def test_chflags(self): 923 self._test_chflags_regular_file(posix.chflags, support.TESTFN) 924 925 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 926 def test_lchflags_regular_file(self): 927 self._test_chflags_regular_file(posix.lchflags, support.TESTFN) 928 self._test_chflags_regular_file(posix.chflags, support.TESTFN, follow_symlinks=False) 929 930 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()') 931 def test_lchflags_symlink(self): 932 testfn_st = os.stat(support.TESTFN) 933 934 self.assertTrue(hasattr(testfn_st, 'st_flags')) 935 936 os.symlink(support.TESTFN, _DUMMY_SYMLINK) 937 self.teardown_files.append(_DUMMY_SYMLINK) 938 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 939 940 def chflags_nofollow(path, flags): 941 return posix.chflags(path, flags, follow_symlinks=False) 942 943 for fn in (posix.lchflags, chflags_nofollow): 944 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE. 945 flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE 946 try: 947 fn(_DUMMY_SYMLINK, flags) 948 except OSError as err: 949 if err.errno != errno.EOPNOTSUPP: 950 raise 951 msg = 'chflag UF_IMMUTABLE not supported by underlying fs' 952 self.skipTest(msg) 953 try: 954 new_testfn_st = os.stat(support.TESTFN) 955 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK) 956 957 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags) 958 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE, 959 new_dummy_symlink_st.st_flags) 960 finally: 961 fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags) 962 963 def test_environ(self): 964 if os.name == "nt": 965 item_type = str 966 else: 967 item_type = bytes 968 for k, v in posix.environ.items(): 969 self.assertEqual(type(k), item_type) 970 self.assertEqual(type(v), item_type) 971 972 def test_putenv(self): 973 with self.assertRaises(ValueError): 974 os.putenv('FRUIT\0VEGETABLE', 'cabbage') 975 with self.assertRaises(ValueError): 976 os.putenv(b'FRUIT\0VEGETABLE', b'cabbage') 977 with self.assertRaises(ValueError): 978 os.putenv('FRUIT', 'orange\0VEGETABLE=cabbage') 979 with self.assertRaises(ValueError): 980 os.putenv(b'FRUIT', b'orange\0VEGETABLE=cabbage') 981 with self.assertRaises(ValueError): 982 os.putenv('FRUIT=ORANGE', 'lemon') 983 with self.assertRaises(ValueError): 984 os.putenv(b'FRUIT=ORANGE', b'lemon') 985 986 @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()') 987 def test_getcwd_long_pathnames(self): 988 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' 989 curdir = os.getcwd() 990 base_path = os.path.abspath(support.TESTFN) + '.getcwd' 991 992 try: 993 os.mkdir(base_path) 994 os.chdir(base_path) 995 except: 996 # Just returning nothing instead of the SkipTest exception, because 997 # the test results in Error in that case. Is that ok? 998 # raise unittest.SkipTest("cannot create directory for testing") 999 return 1000 1001 def _create_and_do_getcwd(dirname, current_path_length = 0): 1002 try: 1003 os.mkdir(dirname) 1004 except: 1005 raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test") 1006 1007 os.chdir(dirname) 1008 try: 1009 os.getcwd() 1010 if current_path_length < 1027: 1011 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) 1012 finally: 1013 os.chdir('..') 1014 os.rmdir(dirname) 1015 1016 _create_and_do_getcwd(dirname) 1017 1018 finally: 1019 os.chdir(curdir) 1020 support.rmtree(base_path) 1021 1022 @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()") 1023 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()") 1024 @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()") 1025 def test_getgrouplist(self): 1026 user = pwd.getpwuid(os.getuid())[0] 1027 group = pwd.getpwuid(os.getuid())[3] 1028 self.assertIn(group, posix.getgrouplist(user, group)) 1029 1030 1031 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") 1032 def test_getgroups(self): 1033 with os.popen('id -G 2>/dev/null') as idg: 1034 groups = idg.read().strip() 1035 ret = idg.close() 1036 1037 try: 1038 idg_groups = set(int(g) for g in groups.split()) 1039 except ValueError: 1040 idg_groups = set() 1041 if ret is not None or not idg_groups: 1042 raise unittest.SkipTest("need working 'id -G'") 1043 1044 # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups() 1045 if sys.platform == 'darwin': 1046 import sysconfig 1047 dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0' 1048 if tuple(int(n) for n in str(dt).split('.')[0:2]) < (10, 6): 1049 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6") 1050 1051 # 'id -G' and 'os.getgroups()' should return the same 1052 # groups, ignoring order, duplicates, and the effective gid. 1053 # #10822/#26944 - It is implementation defined whether 1054 # posix.getgroups() includes the effective gid. 1055 symdiff = idg_groups.symmetric_difference(posix.getgroups()) 1056 self.assertTrue(not symdiff or symdiff == {posix.getegid()}) 1057 1058 # tests for the posix *at functions follow 1059 1060 @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()") 1061 def test_access_dir_fd(self): 1062 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1063 try: 1064 self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f)) 1065 finally: 1066 posix.close(f) 1067 1068 @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()") 1069 def test_chmod_dir_fd(self): 1070 os.chmod(support.TESTFN, stat.S_IRUSR) 1071 1072 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1073 try: 1074 posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) 1075 1076 s = posix.stat(support.TESTFN) 1077 self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR) 1078 finally: 1079 posix.close(f) 1080 1081 @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()") 1082 def test_chown_dir_fd(self): 1083 support.unlink(support.TESTFN) 1084 support.create_empty_file(support.TESTFN) 1085 1086 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1087 try: 1088 posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f) 1089 finally: 1090 posix.close(f) 1091 1092 @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()") 1093 def test_stat_dir_fd(self): 1094 support.unlink(support.TESTFN) 1095 with open(support.TESTFN, 'w') as outfile: 1096 outfile.write("testline\n") 1097 1098 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1099 try: 1100 s1 = posix.stat(support.TESTFN) 1101 s2 = posix.stat(support.TESTFN, dir_fd=f) 1102 self.assertEqual(s1, s2) 1103 s2 = posix.stat(support.TESTFN, dir_fd=None) 1104 self.assertEqual(s1, s2) 1105 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1106 posix.stat, support.TESTFN, dir_fd=posix.getcwd()) 1107 self.assertRaisesRegex(TypeError, 'should be integer or None, not', 1108 posix.stat, support.TESTFN, dir_fd=float(f)) 1109 self.assertRaises(OverflowError, 1110 posix.stat, support.TESTFN, dir_fd=10**20) 1111 finally: 1112 posix.close(f) 1113 1114 @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()") 1115 def test_utime_dir_fd(self): 1116 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1117 try: 1118 now = time.time() 1119 posix.utime(support.TESTFN, None, dir_fd=f) 1120 posix.utime(support.TESTFN, dir_fd=f) 1121 self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f) 1122 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f) 1123 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f) 1124 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f) 1125 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f) 1126 posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f) 1127 posix.utime(support.TESTFN, (now, now), dir_fd=f) 1128 posix.utime(support.TESTFN, 1129 (int(now), int((now - int(now)) * 1e9)), dir_fd=f) 1130 posix.utime(support.TESTFN, dir_fd=f, 1131 times=(int(now), int((now - int(now)) * 1e9))) 1132 1133 # try dir_fd and follow_symlinks together 1134 if os.utime in os.supports_follow_symlinks: 1135 try: 1136 posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f) 1137 except ValueError: 1138 # whoops! using both together not supported on this platform. 1139 pass 1140 1141 finally: 1142 posix.close(f) 1143 1144 @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()") 1145 def test_link_dir_fd(self): 1146 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1147 try: 1148 posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f) 1149 except PermissionError as e: 1150 self.skipTest('posix.link(): %s' % e) 1151 else: 1152 # should have same inodes 1153 self.assertEqual(posix.stat(support.TESTFN)[1], 1154 posix.stat(support.TESTFN + 'link')[1]) 1155 finally: 1156 posix.close(f) 1157 support.unlink(support.TESTFN + 'link') 1158 1159 @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()") 1160 def test_mkdir_dir_fd(self): 1161 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1162 try: 1163 posix.mkdir(support.TESTFN + 'dir', dir_fd=f) 1164 posix.stat(support.TESTFN + 'dir') # should not raise exception 1165 finally: 1166 posix.close(f) 1167 support.rmtree(support.TESTFN + 'dir') 1168 1169 @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'), 1170 "test requires both stat.S_IFIFO and dir_fd support for os.mknod()") 1171 def test_mknod_dir_fd(self): 1172 # Test using mknodat() to create a FIFO (the only use specified 1173 # by POSIX). 1174 support.unlink(support.TESTFN) 1175 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR 1176 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1177 try: 1178 posix.mknod(support.TESTFN, mode, 0, dir_fd=f) 1179 except OSError as e: 1180 # Some old systems don't allow unprivileged users to use 1181 # mknod(), or only support creating device nodes. 1182 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL, errno.EACCES)) 1183 else: 1184 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 1185 finally: 1186 posix.close(f) 1187 1188 @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()") 1189 def test_open_dir_fd(self): 1190 support.unlink(support.TESTFN) 1191 with open(support.TESTFN, 'w') as outfile: 1192 outfile.write("testline\n") 1193 a = posix.open(posix.getcwd(), posix.O_RDONLY) 1194 b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a) 1195 try: 1196 res = posix.read(b, 9).decode(encoding="utf-8") 1197 self.assertEqual("testline\n", res) 1198 finally: 1199 posix.close(a) 1200 posix.close(b) 1201 1202 @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()") 1203 def test_readlink_dir_fd(self): 1204 os.symlink(support.TESTFN, support.TESTFN + 'link') 1205 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1206 try: 1207 self.assertEqual(posix.readlink(support.TESTFN + 'link'), 1208 posix.readlink(support.TESTFN + 'link', dir_fd=f)) 1209 finally: 1210 support.unlink(support.TESTFN + 'link') 1211 posix.close(f) 1212 1213 @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()") 1214 def test_rename_dir_fd(self): 1215 support.unlink(support.TESTFN) 1216 support.create_empty_file(support.TESTFN + 'ren') 1217 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1218 try: 1219 posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f) 1220 except: 1221 posix.rename(support.TESTFN + 'ren', support.TESTFN) 1222 raise 1223 else: 1224 posix.stat(support.TESTFN) # should not raise exception 1225 finally: 1226 posix.close(f) 1227 1228 @unittest.skipUnless(hasattr(signal, 'SIGCHLD'), 'CLD_XXXX be placed in si_code for a SIGCHLD signal') 1229 @unittest.skipUnless(hasattr(os, 'waitid_result'), "test needs os.waitid_result") 1230 def test_cld_xxxx_constants(self): 1231 os.CLD_EXITED 1232 os.CLD_KILLED 1233 os.CLD_DUMPED 1234 os.CLD_TRAPPED 1235 os.CLD_STOPPED 1236 os.CLD_CONTINUED 1237 1238 @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()") 1239 def test_symlink_dir_fd(self): 1240 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1241 try: 1242 posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f) 1243 self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN) 1244 finally: 1245 posix.close(f) 1246 support.unlink(support.TESTFN + 'link') 1247 1248 @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()") 1249 def test_unlink_dir_fd(self): 1250 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1251 support.create_empty_file(support.TESTFN + 'del') 1252 posix.stat(support.TESTFN + 'del') # should not raise exception 1253 try: 1254 posix.unlink(support.TESTFN + 'del', dir_fd=f) 1255 except: 1256 support.unlink(support.TESTFN + 'del') 1257 raise 1258 else: 1259 self.assertRaises(OSError, posix.stat, support.TESTFN + 'link') 1260 finally: 1261 posix.close(f) 1262 1263 @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()") 1264 def test_mkfifo_dir_fd(self): 1265 support.unlink(support.TESTFN) 1266 f = posix.open(posix.getcwd(), posix.O_RDONLY) 1267 try: 1268 try: 1269 posix.mkfifo(support.TESTFN, 1270 stat.S_IRUSR | stat.S_IWUSR, dir_fd=f) 1271 except PermissionError as e: 1272 self.skipTest('posix.mkfifo(): %s' % e) 1273 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode)) 1274 finally: 1275 posix.close(f) 1276 1277 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'), 1278 "don't have scheduling support") 1279 requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'), 1280 "don't have sched affinity support") 1281 1282 @requires_sched_h 1283 def test_sched_yield(self): 1284 # This has no error conditions (at least on Linux). 1285 posix.sched_yield() 1286 1287 @requires_sched_h 1288 @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'), 1289 "requires sched_get_priority_max()") 1290 def test_sched_priority(self): 1291 # Round-robin usually has interesting priorities. 1292 pol = posix.SCHED_RR 1293 lo = posix.sched_get_priority_min(pol) 1294 hi = posix.sched_get_priority_max(pol) 1295 self.assertIsInstance(lo, int) 1296 self.assertIsInstance(hi, int) 1297 self.assertGreaterEqual(hi, lo) 1298 # OSX evidently just returns 15 without checking the argument. 1299 if sys.platform != "darwin": 1300 self.assertRaises(OSError, posix.sched_get_priority_min, -23) 1301 self.assertRaises(OSError, posix.sched_get_priority_max, -23) 1302 1303 @requires_sched 1304 def test_get_and_set_scheduler_and_param(self): 1305 possible_schedulers = [sched for name, sched in posix.__dict__.items() 1306 if name.startswith("SCHED_")] 1307 mine = posix.sched_getscheduler(0) 1308 self.assertIn(mine, possible_schedulers) 1309 try: 1310 parent = posix.sched_getscheduler(os.getppid()) 1311 except OSError as e: 1312 if e.errno != errno.EPERM: 1313 raise 1314 else: 1315 self.assertIn(parent, possible_schedulers) 1316 self.assertRaises(OSError, posix.sched_getscheduler, -1) 1317 self.assertRaises(OSError, posix.sched_getparam, -1) 1318 param = posix.sched_getparam(0) 1319 self.assertIsInstance(param.sched_priority, int) 1320 1321 # POSIX states that calling sched_setparam() or sched_setscheduler() on 1322 # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR 1323 # is implementation-defined: NetBSD and FreeBSD can return EINVAL. 1324 if not sys.platform.startswith(('freebsd', 'netbsd')): 1325 try: 1326 posix.sched_setscheduler(0, mine, param) 1327 posix.sched_setparam(0, param) 1328 except OSError as e: 1329 if e.errno != errno.EPERM: 1330 raise 1331 self.assertRaises(OSError, posix.sched_setparam, -1, param) 1332 1333 self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param) 1334 self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None) 1335 self.assertRaises(TypeError, posix.sched_setparam, 0, 43) 1336 param = posix.sched_param(None) 1337 self.assertRaises(TypeError, posix.sched_setparam, 0, param) 1338 large = 214748364700 1339 param = posix.sched_param(large) 1340 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1341 param = posix.sched_param(sched_priority=-large) 1342 self.assertRaises(OverflowError, posix.sched_setparam, 0, param) 1343 1344 @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function") 1345 def test_sched_rr_get_interval(self): 1346 try: 1347 interval = posix.sched_rr_get_interval(0) 1348 except OSError as e: 1349 # This likely means that sched_rr_get_interval is only valid for 1350 # processes with the SCHED_RR scheduler in effect. 1351 if e.errno != errno.EINVAL: 1352 raise 1353 self.skipTest("only works on SCHED_RR processes") 1354 self.assertIsInstance(interval, float) 1355 # Reasonable constraints, I think. 1356 self.assertGreaterEqual(interval, 0.) 1357 self.assertLess(interval, 1.) 1358 1359 @requires_sched_affinity 1360 def test_sched_getaffinity(self): 1361 mask = posix.sched_getaffinity(0) 1362 self.assertIsInstance(mask, set) 1363 self.assertGreaterEqual(len(mask), 1) 1364 self.assertRaises(OSError, posix.sched_getaffinity, -1) 1365 for cpu in mask: 1366 self.assertIsInstance(cpu, int) 1367 self.assertGreaterEqual(cpu, 0) 1368 self.assertLess(cpu, 1 << 32) 1369 1370 @requires_sched_affinity 1371 def test_sched_setaffinity(self): 1372 mask = posix.sched_getaffinity(0) 1373 if len(mask) > 1: 1374 # Empty masks are forbidden 1375 mask.pop() 1376 posix.sched_setaffinity(0, mask) 1377 self.assertEqual(posix.sched_getaffinity(0), mask) 1378 self.assertRaises(OSError, posix.sched_setaffinity, 0, []) 1379 self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10]) 1380 self.assertRaises(ValueError, posix.sched_setaffinity, 0, map(int, "0X")) 1381 self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128]) 1382 self.assertRaises(OSError, posix.sched_setaffinity, -1, mask) 1383 1384 def test_rtld_constants(self): 1385 # check presence of major RTLD_* constants 1386 posix.RTLD_LAZY 1387 posix.RTLD_NOW 1388 posix.RTLD_GLOBAL 1389 posix.RTLD_LOCAL 1390 1391 @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'), 1392 "test needs an OS that reports file holes") 1393 def test_fs_holes(self): 1394 # Even if the filesystem doesn't report holes, 1395 # if the OS supports it the SEEK_* constants 1396 # will be defined and will have a consistent 1397 # behaviour: 1398 # os.SEEK_DATA = current position 1399 # os.SEEK_HOLE = end of file position 1400 with open(support.TESTFN, 'r+b') as fp: 1401 fp.write(b"hello") 1402 fp.flush() 1403 size = fp.tell() 1404 fno = fp.fileno() 1405 try : 1406 for i in range(size): 1407 self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA)) 1408 self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE)) 1409 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA) 1410 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE) 1411 except OSError : 1412 # Some OSs claim to support SEEK_HOLE/SEEK_DATA 1413 # but it is not true. 1414 # For instance: 1415 # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html 1416 raise unittest.SkipTest("OSError raised!") 1417 1418 def test_path_error2(self): 1419 """ 1420 Test functions that call path_error2(), providing two filenames in their exceptions. 1421 """ 1422 for name in ("rename", "replace", "link"): 1423 function = getattr(os, name, None) 1424 if function is None: 1425 continue 1426 1427 for dst in ("noodly2", support.TESTFN): 1428 try: 1429 function('doesnotexistfilename', dst) 1430 except OSError as e: 1431 self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e)) 1432 break 1433 else: 1434 self.fail("No valid path_error2() test for os." + name) 1435 1436 def test_path_with_null_character(self): 1437 fn = support.TESTFN 1438 fn_with_NUL = fn + '\0' 1439 self.addCleanup(support.unlink, fn) 1440 support.unlink(fn) 1441 fd = None 1442 try: 1443 with self.assertRaises(ValueError): 1444 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1445 finally: 1446 if fd is not None: 1447 os.close(fd) 1448 self.assertFalse(os.path.exists(fn)) 1449 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1450 self.assertFalse(os.path.exists(fn)) 1451 open(fn, 'wb').close() 1452 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1453 1454 def test_path_with_null_byte(self): 1455 fn = os.fsencode(support.TESTFN) 1456 fn_with_NUL = fn + b'\0' 1457 self.addCleanup(support.unlink, fn) 1458 support.unlink(fn) 1459 fd = None 1460 try: 1461 with self.assertRaises(ValueError): 1462 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises 1463 finally: 1464 if fd is not None: 1465 os.close(fd) 1466 self.assertFalse(os.path.exists(fn)) 1467 self.assertRaises(ValueError, os.mkdir, fn_with_NUL) 1468 self.assertFalse(os.path.exists(fn)) 1469 open(fn, 'wb').close() 1470 self.assertRaises(ValueError, os.stat, fn_with_NUL) 1471 1472 @unittest.skipUnless(hasattr(os, "pidfd_open"), "pidfd_open unavailable") 1473 def test_pidfd_open(self): 1474 with self.assertRaises(OSError) as cm: 1475 os.pidfd_open(-1) 1476 if cm.exception.errno == errno.ENOSYS: 1477 self.skipTest("system does not support pidfd_open") 1478 if isinstance(cm.exception, PermissionError): 1479 self.skipTest(f"pidfd_open syscall blocked: {cm.exception!r}") 1480 self.assertEqual(cm.exception.errno, errno.EINVAL) 1481 os.close(os.pidfd_open(os.getpid(), 0)) 1482 1483class PosixGroupsTester(unittest.TestCase): 1484 1485 def setUp(self): 1486 if posix.getuid() != 0: 1487 raise unittest.SkipTest("not enough privileges") 1488 if not hasattr(posix, 'getgroups'): 1489 raise unittest.SkipTest("need posix.getgroups") 1490 if sys.platform == 'darwin': 1491 raise unittest.SkipTest("getgroups(2) is broken on OSX") 1492 self.saved_groups = posix.getgroups() 1493 1494 def tearDown(self): 1495 if hasattr(posix, 'setgroups'): 1496 posix.setgroups(self.saved_groups) 1497 elif hasattr(posix, 'initgroups'): 1498 name = pwd.getpwuid(posix.getuid()).pw_name 1499 posix.initgroups(name, self.saved_groups[0]) 1500 1501 @unittest.skipUnless(hasattr(posix, 'initgroups'), 1502 "test needs posix.initgroups()") 1503 def test_initgroups(self): 1504 # find missing group 1505 1506 g = max(self.saved_groups or [0]) + 1 1507 name = pwd.getpwuid(posix.getuid()).pw_name 1508 posix.initgroups(name, g) 1509 self.assertIn(g, posix.getgroups()) 1510 1511 @unittest.skipUnless(hasattr(posix, 'setgroups'), 1512 "test needs posix.setgroups()") 1513 def test_setgroups(self): 1514 for groups in [[0], list(range(16))]: 1515 posix.setgroups(groups) 1516 self.assertListEqual(groups, posix.getgroups()) 1517 1518 1519class _PosixSpawnMixin: 1520 # Program which does nothing and exits with status 0 (success) 1521 NOOP_PROGRAM = (sys.executable, '-I', '-S', '-c', 'pass') 1522 spawn_func = None 1523 1524 def python_args(self, *args): 1525 # Disable site module to avoid side effects. For example, 1526 # on Fedora 28, if the HOME environment variable is not set, 1527 # site._getuserbase() calls pwd.getpwuid() which opens 1528 # /var/lib/sss/mc/passwd but then leaves the file open which makes 1529 # test_close_file() to fail. 1530 return (sys.executable, '-I', '-S', *args) 1531 1532 def test_returns_pid(self): 1533 pidfile = support.TESTFN 1534 self.addCleanup(support.unlink, pidfile) 1535 script = f"""if 1: 1536 import os 1537 with open({pidfile!r}, "w") as pidfile: 1538 pidfile.write(str(os.getpid())) 1539 """ 1540 args = self.python_args('-c', script) 1541 pid = self.spawn_func(args[0], args, os.environ) 1542 support.wait_process(pid, exitcode=0) 1543 with open(pidfile) as f: 1544 self.assertEqual(f.read(), str(pid)) 1545 1546 def test_no_such_executable(self): 1547 no_such_executable = 'no_such_executable' 1548 try: 1549 pid = self.spawn_func(no_such_executable, 1550 [no_such_executable], 1551 os.environ) 1552 # bpo-35794: PermissionError can be raised if there are 1553 # directories in the $PATH that are not accessible. 1554 except (FileNotFoundError, PermissionError) as exc: 1555 self.assertEqual(exc.filename, no_such_executable) 1556 else: 1557 pid2, status = os.waitpid(pid, 0) 1558 self.assertEqual(pid2, pid) 1559 self.assertNotEqual(status, 0) 1560 1561 def test_specify_environment(self): 1562 envfile = support.TESTFN 1563 self.addCleanup(support.unlink, envfile) 1564 script = f"""if 1: 1565 import os 1566 with open({envfile!r}, "w") as envfile: 1567 envfile.write(os.environ['foo']) 1568 """ 1569 args = self.python_args('-c', script) 1570 pid = self.spawn_func(args[0], args, 1571 {**os.environ, 'foo': 'bar'}) 1572 support.wait_process(pid, exitcode=0) 1573 with open(envfile) as f: 1574 self.assertEqual(f.read(), 'bar') 1575 1576 def test_none_file_actions(self): 1577 pid = self.spawn_func( 1578 self.NOOP_PROGRAM[0], 1579 self.NOOP_PROGRAM, 1580 os.environ, 1581 file_actions=None 1582 ) 1583 support.wait_process(pid, exitcode=0) 1584 1585 def test_empty_file_actions(self): 1586 pid = self.spawn_func( 1587 self.NOOP_PROGRAM[0], 1588 self.NOOP_PROGRAM, 1589 os.environ, 1590 file_actions=[] 1591 ) 1592 support.wait_process(pid, exitcode=0) 1593 1594 def test_resetids_explicit_default(self): 1595 pid = self.spawn_func( 1596 sys.executable, 1597 [sys.executable, '-c', 'pass'], 1598 os.environ, 1599 resetids=False 1600 ) 1601 support.wait_process(pid, exitcode=0) 1602 1603 def test_resetids(self): 1604 pid = self.spawn_func( 1605 sys.executable, 1606 [sys.executable, '-c', 'pass'], 1607 os.environ, 1608 resetids=True 1609 ) 1610 support.wait_process(pid, exitcode=0) 1611 1612 def test_resetids_wrong_type(self): 1613 with self.assertRaises(TypeError): 1614 self.spawn_func(sys.executable, 1615 [sys.executable, "-c", "pass"], 1616 os.environ, resetids=None) 1617 1618 def test_setpgroup(self): 1619 pid = self.spawn_func( 1620 sys.executable, 1621 [sys.executable, '-c', 'pass'], 1622 os.environ, 1623 setpgroup=os.getpgrp() 1624 ) 1625 support.wait_process(pid, exitcode=0) 1626 1627 def test_setpgroup_wrong_type(self): 1628 with self.assertRaises(TypeError): 1629 self.spawn_func(sys.executable, 1630 [sys.executable, "-c", "pass"], 1631 os.environ, setpgroup="023") 1632 1633 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1634 'need signal.pthread_sigmask()') 1635 def test_setsigmask(self): 1636 code = textwrap.dedent("""\ 1637 import signal 1638 signal.raise_signal(signal.SIGUSR1)""") 1639 1640 pid = self.spawn_func( 1641 sys.executable, 1642 [sys.executable, '-c', code], 1643 os.environ, 1644 setsigmask=[signal.SIGUSR1] 1645 ) 1646 support.wait_process(pid, exitcode=0) 1647 1648 def test_setsigmask_wrong_type(self): 1649 with self.assertRaises(TypeError): 1650 self.spawn_func(sys.executable, 1651 [sys.executable, "-c", "pass"], 1652 os.environ, setsigmask=34) 1653 with self.assertRaises(TypeError): 1654 self.spawn_func(sys.executable, 1655 [sys.executable, "-c", "pass"], 1656 os.environ, setsigmask=["j"]) 1657 with self.assertRaises(ValueError): 1658 self.spawn_func(sys.executable, 1659 [sys.executable, "-c", "pass"], 1660 os.environ, setsigmask=[signal.NSIG, 1661 signal.NSIG+1]) 1662 1663 def test_setsid(self): 1664 rfd, wfd = os.pipe() 1665 self.addCleanup(os.close, rfd) 1666 try: 1667 os.set_inheritable(wfd, True) 1668 1669 code = textwrap.dedent(f""" 1670 import os 1671 fd = {wfd} 1672 sid = os.getsid(0) 1673 os.write(fd, str(sid).encode()) 1674 """) 1675 1676 try: 1677 pid = self.spawn_func(sys.executable, 1678 [sys.executable, "-c", code], 1679 os.environ, setsid=True) 1680 except NotImplementedError as exc: 1681 self.skipTest(f"setsid is not supported: {exc!r}") 1682 except PermissionError as exc: 1683 self.skipTest(f"setsid failed with: {exc!r}") 1684 finally: 1685 os.close(wfd) 1686 1687 support.wait_process(pid, exitcode=0) 1688 1689 output = os.read(rfd, 100) 1690 child_sid = int(output) 1691 parent_sid = os.getsid(os.getpid()) 1692 self.assertNotEqual(parent_sid, child_sid) 1693 1694 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1695 'need signal.pthread_sigmask()') 1696 def test_setsigdef(self): 1697 original_handler = signal.signal(signal.SIGUSR1, signal.SIG_IGN) 1698 code = textwrap.dedent("""\ 1699 import signal 1700 signal.raise_signal(signal.SIGUSR1)""") 1701 try: 1702 pid = self.spawn_func( 1703 sys.executable, 1704 [sys.executable, '-c', code], 1705 os.environ, 1706 setsigdef=[signal.SIGUSR1] 1707 ) 1708 finally: 1709 signal.signal(signal.SIGUSR1, original_handler) 1710 1711 support.wait_process(pid, exitcode=-signal.SIGUSR1) 1712 1713 def test_setsigdef_wrong_type(self): 1714 with self.assertRaises(TypeError): 1715 self.spawn_func(sys.executable, 1716 [sys.executable, "-c", "pass"], 1717 os.environ, setsigdef=34) 1718 with self.assertRaises(TypeError): 1719 self.spawn_func(sys.executable, 1720 [sys.executable, "-c", "pass"], 1721 os.environ, setsigdef=["j"]) 1722 with self.assertRaises(ValueError): 1723 self.spawn_func(sys.executable, 1724 [sys.executable, "-c", "pass"], 1725 os.environ, setsigdef=[signal.NSIG, signal.NSIG+1]) 1726 1727 @requires_sched 1728 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1729 "bpo-34685: test can fail on BSD") 1730 def test_setscheduler_only_param(self): 1731 policy = os.sched_getscheduler(0) 1732 priority = os.sched_get_priority_min(policy) 1733 code = textwrap.dedent(f"""\ 1734 import os, sys 1735 if os.sched_getscheduler(0) != {policy}: 1736 sys.exit(101) 1737 if os.sched_getparam(0).sched_priority != {priority}: 1738 sys.exit(102)""") 1739 pid = self.spawn_func( 1740 sys.executable, 1741 [sys.executable, '-c', code], 1742 os.environ, 1743 scheduler=(None, os.sched_param(priority)) 1744 ) 1745 support.wait_process(pid, exitcode=0) 1746 1747 @requires_sched 1748 @unittest.skipIf(sys.platform.startswith(('freebsd', 'netbsd')), 1749 "bpo-34685: test can fail on BSD") 1750 def test_setscheduler_with_policy(self): 1751 policy = os.sched_getscheduler(0) 1752 priority = os.sched_get_priority_min(policy) 1753 code = textwrap.dedent(f"""\ 1754 import os, sys 1755 if os.sched_getscheduler(0) != {policy}: 1756 sys.exit(101) 1757 if os.sched_getparam(0).sched_priority != {priority}: 1758 sys.exit(102)""") 1759 pid = self.spawn_func( 1760 sys.executable, 1761 [sys.executable, '-c', code], 1762 os.environ, 1763 scheduler=(policy, os.sched_param(priority)) 1764 ) 1765 support.wait_process(pid, exitcode=0) 1766 1767 def test_multiple_file_actions(self): 1768 file_actions = [ 1769 (os.POSIX_SPAWN_OPEN, 3, os.path.realpath(__file__), os.O_RDONLY, 0), 1770 (os.POSIX_SPAWN_CLOSE, 0), 1771 (os.POSIX_SPAWN_DUP2, 1, 4), 1772 ] 1773 pid = self.spawn_func(self.NOOP_PROGRAM[0], 1774 self.NOOP_PROGRAM, 1775 os.environ, 1776 file_actions=file_actions) 1777 support.wait_process(pid, exitcode=0) 1778 1779 def test_bad_file_actions(self): 1780 args = self.NOOP_PROGRAM 1781 with self.assertRaises(TypeError): 1782 self.spawn_func(args[0], args, os.environ, 1783 file_actions=[None]) 1784 with self.assertRaises(TypeError): 1785 self.spawn_func(args[0], args, os.environ, 1786 file_actions=[()]) 1787 with self.assertRaises(TypeError): 1788 self.spawn_func(args[0], args, os.environ, 1789 file_actions=[(None,)]) 1790 with self.assertRaises(TypeError): 1791 self.spawn_func(args[0], args, os.environ, 1792 file_actions=[(12345,)]) 1793 with self.assertRaises(TypeError): 1794 self.spawn_func(args[0], args, os.environ, 1795 file_actions=[(os.POSIX_SPAWN_CLOSE,)]) 1796 with self.assertRaises(TypeError): 1797 self.spawn_func(args[0], args, os.environ, 1798 file_actions=[(os.POSIX_SPAWN_CLOSE, 1, 2)]) 1799 with self.assertRaises(TypeError): 1800 self.spawn_func(args[0], args, os.environ, 1801 file_actions=[(os.POSIX_SPAWN_CLOSE, None)]) 1802 with self.assertRaises(ValueError): 1803 self.spawn_func(args[0], args, os.environ, 1804 file_actions=[(os.POSIX_SPAWN_OPEN, 1805 3, __file__ + '\0', 1806 os.O_RDONLY, 0)]) 1807 1808 def test_open_file(self): 1809 outfile = support.TESTFN 1810 self.addCleanup(support.unlink, outfile) 1811 script = """if 1: 1812 import sys 1813 sys.stdout.write("hello") 1814 """ 1815 file_actions = [ 1816 (os.POSIX_SPAWN_OPEN, 1, outfile, 1817 os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 1818 stat.S_IRUSR | stat.S_IWUSR), 1819 ] 1820 args = self.python_args('-c', script) 1821 pid = self.spawn_func(args[0], args, os.environ, 1822 file_actions=file_actions) 1823 1824 support.wait_process(pid, exitcode=0) 1825 with open(outfile) as f: 1826 self.assertEqual(f.read(), 'hello') 1827 1828 def test_close_file(self): 1829 closefile = support.TESTFN 1830 self.addCleanup(support.unlink, closefile) 1831 script = f"""if 1: 1832 import os 1833 try: 1834 os.fstat(0) 1835 except OSError as e: 1836 with open({closefile!r}, 'w') as closefile: 1837 closefile.write('is closed %d' % e.errno) 1838 """ 1839 args = self.python_args('-c', script) 1840 pid = self.spawn_func(args[0], args, os.environ, 1841 file_actions=[(os.POSIX_SPAWN_CLOSE, 0)]) 1842 1843 support.wait_process(pid, exitcode=0) 1844 with open(closefile) as f: 1845 self.assertEqual(f.read(), 'is closed %d' % errno.EBADF) 1846 1847 def test_dup2(self): 1848 dupfile = support.TESTFN 1849 self.addCleanup(support.unlink, dupfile) 1850 script = """if 1: 1851 import sys 1852 sys.stdout.write("hello") 1853 """ 1854 with open(dupfile, "wb") as childfile: 1855 file_actions = [ 1856 (os.POSIX_SPAWN_DUP2, childfile.fileno(), 1), 1857 ] 1858 args = self.python_args('-c', script) 1859 pid = self.spawn_func(args[0], args, os.environ, 1860 file_actions=file_actions) 1861 support.wait_process(pid, exitcode=0) 1862 with open(dupfile) as f: 1863 self.assertEqual(f.read(), 'hello') 1864 1865 1866@unittest.skipUnless(hasattr(os, 'posix_spawn'), "test needs os.posix_spawn") 1867class TestPosixSpawn(unittest.TestCase, _PosixSpawnMixin): 1868 spawn_func = getattr(posix, 'posix_spawn', None) 1869 1870 1871@unittest.skipUnless(hasattr(os, 'posix_spawnp'), "test needs os.posix_spawnp") 1872class TestPosixSpawnP(unittest.TestCase, _PosixSpawnMixin): 1873 spawn_func = getattr(posix, 'posix_spawnp', None) 1874 1875 @support.skip_unless_symlink 1876 def test_posix_spawnp(self): 1877 # Use a symlink to create a program in its own temporary directory 1878 temp_dir = tempfile.mkdtemp() 1879 self.addCleanup(support.rmtree, temp_dir) 1880 1881 program = 'posix_spawnp_test_program.exe' 1882 program_fullpath = os.path.join(temp_dir, program) 1883 os.symlink(sys.executable, program_fullpath) 1884 1885 try: 1886 path = os.pathsep.join((temp_dir, os.environ['PATH'])) 1887 except KeyError: 1888 path = temp_dir # PATH is not set 1889 1890 spawn_args = (program, '-I', '-S', '-c', 'pass') 1891 code = textwrap.dedent(""" 1892 import os 1893 from test import support 1894 1895 args = %a 1896 pid = os.posix_spawnp(args[0], args, os.environ) 1897 1898 support.wait_process(pid, exitcode=0) 1899 """ % (spawn_args,)) 1900 1901 # Use a subprocess to test os.posix_spawnp() with a modified PATH 1902 # environment variable: posix_spawnp() uses the current environment 1903 # to locate the program, not its environment argument. 1904 args = ('-c', code) 1905 assert_python_ok(*args, PATH=path) 1906 1907 1908@unittest.skipUnless(sys.platform == "darwin", "test weak linking on macOS") 1909class TestPosixWeaklinking(unittest.TestCase): 1910 # These test cases verify that weak linking support on macOS works 1911 # as expected. These cases only test new behaviour introduced by weak linking, 1912 # regular behaviour is tested by the normal test cases. 1913 # 1914 # See the section on Weak Linking in Mac/README.txt for more information. 1915 def setUp(self): 1916 import sysconfig 1917 import platform 1918 1919 config_vars = sysconfig.get_config_vars() 1920 self.available = { nm for nm in config_vars if nm.startswith("HAVE_") and config_vars[nm] } 1921 self.mac_ver = tuple(int(part) for part in platform.mac_ver()[0].split(".")) 1922 1923 def _verify_available(self, name): 1924 if name not in self.available: 1925 raise unittest.SkipTest(f"{name} not weak-linked") 1926 1927 def test_pwritev(self): 1928 self._verify_available("HAVE_PWRITEV") 1929 if self.mac_ver >= (10, 16): 1930 self.assertTrue(hasattr(os, "pwritev"), "os.pwritev is not available") 1931 self.assertTrue(hasattr(os, "preadv"), "os.readv is not available") 1932 1933 else: 1934 self.assertFalse(hasattr(os, "pwritev"), "os.pwritev is available") 1935 self.assertFalse(hasattr(os, "preadv"), "os.readv is available") 1936 1937 def test_stat(self): 1938 self._verify_available("HAVE_FSTATAT") 1939 if self.mac_ver >= (10, 10): 1940 self.assertIn("HAVE_FSTATAT", posix._have_functions) 1941 1942 else: 1943 self.assertNotIn("HAVE_FSTATAT", posix._have_functions) 1944 1945 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 1946 os.stat("file", dir_fd=0) 1947 1948 def test_access(self): 1949 self._verify_available("HAVE_FACCESSAT") 1950 if self.mac_ver >= (10, 10): 1951 self.assertIn("HAVE_FACCESSAT", posix._have_functions) 1952 1953 else: 1954 self.assertNotIn("HAVE_FACCESSAT", posix._have_functions) 1955 1956 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 1957 os.access("file", os.R_OK, dir_fd=0) 1958 1959 with self.assertRaisesRegex(NotImplementedError, "follow_symlinks unavailable"): 1960 os.access("file", os.R_OK, follow_symlinks=False) 1961 1962 with self.assertRaisesRegex(NotImplementedError, "effective_ids unavailable"): 1963 os.access("file", os.R_OK, effective_ids=True) 1964 1965 def test_chmod(self): 1966 self._verify_available("HAVE_FCHMODAT") 1967 if self.mac_ver >= (10, 10): 1968 self.assertIn("HAVE_FCHMODAT", posix._have_functions) 1969 1970 else: 1971 self.assertNotIn("HAVE_FCHMODAT", posix._have_functions) 1972 self.assertIn("HAVE_LCHMOD", posix._have_functions) 1973 1974 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 1975 os.chmod("file", 0o644, dir_fd=0) 1976 1977 def test_chown(self): 1978 self._verify_available("HAVE_FCHOWNAT") 1979 if self.mac_ver >= (10, 10): 1980 self.assertIn("HAVE_FCHOWNAT", posix._have_functions) 1981 1982 else: 1983 self.assertNotIn("HAVE_FCHOWNAT", posix._have_functions) 1984 self.assertIn("HAVE_LCHOWN", posix._have_functions) 1985 1986 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 1987 os.chown("file", 0, 0, dir_fd=0) 1988 1989 def test_link(self): 1990 self._verify_available("HAVE_LINKAT") 1991 if self.mac_ver >= (10, 10): 1992 self.assertIn("HAVE_LINKAT", posix._have_functions) 1993 1994 else: 1995 self.assertNotIn("HAVE_LINKAT", posix._have_functions) 1996 1997 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"): 1998 os.link("source", "target", src_dir_fd=0) 1999 2000 with self.assertRaisesRegex(NotImplementedError, "dst_dir_fd unavailable"): 2001 os.link("source", "target", dst_dir_fd=0) 2002 2003 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd unavailable"): 2004 os.link("source", "target", src_dir_fd=0, dst_dir_fd=0) 2005 2006 # issue 41355: !HAVE_LINKAT code path ignores the follow_symlinks flag 2007 with support.temp_dir() as base_path: 2008 link_path = os.path.join(base_path, "link") 2009 target_path = os.path.join(base_path, "target") 2010 source_path = os.path.join(base_path, "source") 2011 2012 with open(source_path, "w") as fp: 2013 fp.write("data") 2014 2015 os.symlink("target", link_path) 2016 2017 # Calling os.link should fail in the link(2) call, and 2018 # should not reject *follow_symlinks* (to match the 2019 # behaviour you'd get when building on a platform without 2020 # linkat) 2021 with self.assertRaises(FileExistsError): 2022 os.link(source_path, link_path, follow_symlinks=True) 2023 2024 with self.assertRaises(FileExistsError): 2025 os.link(source_path, link_path, follow_symlinks=False) 2026 2027 2028 def test_listdir_scandir(self): 2029 self._verify_available("HAVE_FDOPENDIR") 2030 if self.mac_ver >= (10, 10): 2031 self.assertIn("HAVE_FDOPENDIR", posix._have_functions) 2032 2033 else: 2034 self.assertNotIn("HAVE_FDOPENDIR", posix._have_functions) 2035 2036 with self.assertRaisesRegex(TypeError, "listdir: path should be string, bytes, os.PathLike or None, not int"): 2037 os.listdir(0) 2038 2039 with self.assertRaisesRegex(TypeError, "scandir: path should be string, bytes, os.PathLike or None, not int"): 2040 os.scandir(0) 2041 2042 def test_mkdir(self): 2043 self._verify_available("HAVE_MKDIRAT") 2044 if self.mac_ver >= (10, 10): 2045 self.assertIn("HAVE_MKDIRAT", posix._have_functions) 2046 2047 else: 2048 self.assertNotIn("HAVE_MKDIRAT", posix._have_functions) 2049 2050 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2051 os.mkdir("dir", dir_fd=0) 2052 2053 def test_rename_replace(self): 2054 self._verify_available("HAVE_RENAMEAT") 2055 if self.mac_ver >= (10, 10): 2056 self.assertIn("HAVE_RENAMEAT", posix._have_functions) 2057 2058 else: 2059 self.assertNotIn("HAVE_RENAMEAT", posix._have_functions) 2060 2061 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2062 os.rename("a", "b", src_dir_fd=0) 2063 2064 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2065 os.rename("a", "b", dst_dir_fd=0) 2066 2067 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2068 os.replace("a", "b", src_dir_fd=0) 2069 2070 with self.assertRaisesRegex(NotImplementedError, "src_dir_fd and dst_dir_fd unavailable"): 2071 os.replace("a", "b", dst_dir_fd=0) 2072 2073 def test_unlink_rmdir(self): 2074 self._verify_available("HAVE_UNLINKAT") 2075 if self.mac_ver >= (10, 10): 2076 self.assertIn("HAVE_UNLINKAT", posix._have_functions) 2077 2078 else: 2079 self.assertNotIn("HAVE_UNLINKAT", posix._have_functions) 2080 2081 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2082 os.unlink("path", dir_fd=0) 2083 2084 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2085 os.rmdir("path", dir_fd=0) 2086 2087 def test_open(self): 2088 self._verify_available("HAVE_OPENAT") 2089 if self.mac_ver >= (10, 10): 2090 self.assertIn("HAVE_OPENAT", posix._have_functions) 2091 2092 else: 2093 self.assertNotIn("HAVE_OPENAT", posix._have_functions) 2094 2095 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2096 os.open("path", os.O_RDONLY, dir_fd=0) 2097 2098 def test_readlink(self): 2099 self._verify_available("HAVE_READLINKAT") 2100 if self.mac_ver >= (10, 10): 2101 self.assertIn("HAVE_READLINKAT", posix._have_functions) 2102 2103 else: 2104 self.assertNotIn("HAVE_READLINKAT", posix._have_functions) 2105 2106 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2107 os.readlink("path", dir_fd=0) 2108 2109 def test_symlink(self): 2110 self._verify_available("HAVE_SYMLINKAT") 2111 if self.mac_ver >= (10, 10): 2112 self.assertIn("HAVE_SYMLINKAT", posix._have_functions) 2113 2114 else: 2115 self.assertNotIn("HAVE_SYMLINKAT", posix._have_functions) 2116 2117 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2118 os.symlink("a", "b", dir_fd=0) 2119 2120 def test_utime(self): 2121 self._verify_available("HAVE_FUTIMENS") 2122 self._verify_available("HAVE_UTIMENSAT") 2123 if self.mac_ver >= (10, 13): 2124 self.assertIn("HAVE_FUTIMENS", posix._have_functions) 2125 self.assertIn("HAVE_UTIMENSAT", posix._have_functions) 2126 2127 else: 2128 self.assertNotIn("HAVE_FUTIMENS", posix._have_functions) 2129 self.assertNotIn("HAVE_UTIMENSAT", posix._have_functions) 2130 2131 with self.assertRaisesRegex(NotImplementedError, "dir_fd unavailable"): 2132 os.utime("path", dir_fd=0) 2133 2134 2135def test_main(): 2136 try: 2137 support.run_unittest( 2138 PosixTester, 2139 PosixGroupsTester, 2140 TestPosixSpawn, 2141 TestPosixSpawnP, 2142 TestPosixWeaklinking 2143 ) 2144 finally: 2145 support.reap_children() 2146 2147if __name__ == '__main__': 2148 test_main() 2149