1import unittest 2from test import test_support 3import subprocess 4import sys 5import platform 6import signal 7import os 8import errno 9import tempfile 10import time 11import re 12import sysconfig 13import textwrap 14 15try: 16 import ctypes 17except ImportError: 18 ctypes = None 19else: 20 import ctypes.util 21 22try: 23 import resource 24except ImportError: 25 resource = None 26try: 27 import threading 28except ImportError: 29 threading = None 30 31try: 32 import _testcapi 33except ImportError: 34 _testcapi = None 35 36mswindows = (sys.platform == "win32") 37 38# 39# Depends on the following external programs: Python 40# 41 42if mswindows: 43 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' 44 'os.O_BINARY);') 45else: 46 SETBINARY = '' 47 48 49class BaseTestCase(unittest.TestCase): 50 def setUp(self): 51 # Try to minimize the number of children we have so this test 52 # doesn't crash on some buildbots (Alphas in particular). 53 test_support.reap_children() 54 55 def tearDown(self): 56 for inst in subprocess._active: 57 inst.wait() 58 subprocess._cleanup() 59 self.assertFalse(subprocess._active, "subprocess._active not empty") 60 self.doCleanups() 61 test_support.reap_children() 62 63 def assertStderrEqual(self, stderr, expected, msg=None): 64 # In a debug build, stuff like "[6580 refs]" is printed to stderr at 65 # shutdown time. That frustrates tests trying to check stderr produced 66 # from a spawned Python process. 67 actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr) 68 self.assertEqual(actual, expected, msg) 69 70 71class PopenTestException(Exception): 72 pass 73 74 75class PopenExecuteChildRaises(subprocess.Popen): 76 """Popen subclass for testing cleanup of subprocess.PIPE filehandles when 77 _execute_child fails. 78 """ 79 def _execute_child(self, *args, **kwargs): 80 raise PopenTestException("Forced Exception for Test") 81 82 83class ProcessTestCase(BaseTestCase): 84 85 def test_call_seq(self): 86 # call() function with sequence argument 87 rc = subprocess.call([sys.executable, "-c", 88 "import sys; sys.exit(47)"]) 89 self.assertEqual(rc, 47) 90 91 def test_check_call_zero(self): 92 # check_call() function with zero return code 93 rc = subprocess.check_call([sys.executable, "-c", 94 "import sys; sys.exit(0)"]) 95 self.assertEqual(rc, 0) 96 97 def test_check_call_nonzero(self): 98 # check_call() function with non-zero return code 99 with self.assertRaises(subprocess.CalledProcessError) as c: 100 subprocess.check_call([sys.executable, "-c", 101 "import sys; sys.exit(47)"]) 102 self.assertEqual(c.exception.returncode, 47) 103 104 def test_check_output(self): 105 # check_output() function with zero return code 106 output = subprocess.check_output( 107 [sys.executable, "-c", "print 'BDFL'"]) 108 self.assertIn('BDFL', output) 109 110 def test_check_output_nonzero(self): 111 # check_call() function with non-zero return code 112 with self.assertRaises(subprocess.CalledProcessError) as c: 113 subprocess.check_output( 114 [sys.executable, "-c", "import sys; sys.exit(5)"]) 115 self.assertEqual(c.exception.returncode, 5) 116 117 def test_check_output_stderr(self): 118 # check_output() function stderr redirected to stdout 119 output = subprocess.check_output( 120 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], 121 stderr=subprocess.STDOUT) 122 self.assertIn('BDFL', output) 123 124 def test_check_output_stdout_arg(self): 125 # check_output() function stderr redirected to stdout 126 with self.assertRaises(ValueError) as c: 127 output = subprocess.check_output( 128 [sys.executable, "-c", "print 'will not be run'"], 129 stdout=sys.stdout) 130 self.fail("Expected ValueError when stdout arg supplied.") 131 self.assertIn('stdout', c.exception.args[0]) 132 133 def test_call_kwargs(self): 134 # call() function with keyword args 135 newenv = os.environ.copy() 136 newenv["FRUIT"] = "banana" 137 rc = subprocess.call([sys.executable, "-c", 138 'import sys, os;' 139 'sys.exit(os.getenv("FRUIT")=="banana")'], 140 env=newenv) 141 self.assertEqual(rc, 1) 142 143 def test_invalid_args(self): 144 # Popen() called with invalid arguments should raise TypeError 145 # but Popen.__del__ should not complain (issue #12085) 146 with test_support.captured_stderr() as s: 147 self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1) 148 argcount = subprocess.Popen.__init__.__code__.co_argcount 149 too_many_args = [0] * (argcount + 1) 150 self.assertRaises(TypeError, subprocess.Popen, *too_many_args) 151 self.assertEqual(s.getvalue(), '') 152 153 def test_stdin_none(self): 154 # .stdin is None when not redirected 155 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], 156 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 157 self.addCleanup(p.stdout.close) 158 self.addCleanup(p.stderr.close) 159 p.wait() 160 self.assertEqual(p.stdin, None) 161 162 def test_stdout_none(self): 163 # .stdout is None when not redirected, and the child's stdout will 164 # be inherited from the parent. In order to test this we run a 165 # subprocess in a subprocess: 166 # this_test 167 # \-- subprocess created by this test (parent) 168 # \-- subprocess created by the parent subprocess (child) 169 # The parent doesn't specify stdout, so the child will use the 170 # parent's stdout. This test checks that the message printed by the 171 # child goes to the parent stdout. The parent also checks that the 172 # child's stdout is None. See #11963. 173 code = ('import sys; from subprocess import Popen, PIPE;' 174 'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],' 175 ' stdin=PIPE, stderr=PIPE);' 176 'p.wait(); assert p.stdout is None;') 177 p = subprocess.Popen([sys.executable, "-c", code], 178 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 179 self.addCleanup(p.stdout.close) 180 self.addCleanup(p.stderr.close) 181 out, err = p.communicate() 182 self.assertEqual(p.returncode, 0, err) 183 self.assertEqual(out.rstrip(), 'test_stdout_none') 184 185 def test_stderr_none(self): 186 # .stderr is None when not redirected 187 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], 188 stdin=subprocess.PIPE, stdout=subprocess.PIPE) 189 self.addCleanup(p.stdout.close) 190 self.addCleanup(p.stdin.close) 191 p.wait() 192 self.assertEqual(p.stderr, None) 193 194 def test_executable_with_cwd(self): 195 python_dir = os.path.dirname(os.path.realpath(sys.executable)) 196 p = subprocess.Popen(["somethingyoudonthave", "-c", 197 "import sys; sys.exit(47)"], 198 executable=sys.executable, cwd=python_dir) 199 p.wait() 200 self.assertEqual(p.returncode, 47) 201 202 @unittest.skipIf(sysconfig.is_python_build(), 203 "need an installed Python. See #7774") 204 def test_executable_without_cwd(self): 205 # For a normal installation, it should work without 'cwd' 206 # argument. For test runs in the build directory, see #7774. 207 p = subprocess.Popen(["somethingyoudonthave", "-c", 208 "import sys; sys.exit(47)"], 209 executable=sys.executable) 210 p.wait() 211 self.assertEqual(p.returncode, 47) 212 213 def test_stdin_pipe(self): 214 # stdin redirection 215 p = subprocess.Popen([sys.executable, "-c", 216 'import sys; sys.exit(sys.stdin.read() == "pear")'], 217 stdin=subprocess.PIPE) 218 p.stdin.write("pear") 219 p.stdin.close() 220 p.wait() 221 self.assertEqual(p.returncode, 1) 222 223 def test_stdin_filedes(self): 224 # stdin is set to open file descriptor 225 tf = tempfile.TemporaryFile() 226 d = tf.fileno() 227 os.write(d, "pear") 228 os.lseek(d, 0, 0) 229 p = subprocess.Popen([sys.executable, "-c", 230 'import sys; sys.exit(sys.stdin.read() == "pear")'], 231 stdin=d) 232 p.wait() 233 self.assertEqual(p.returncode, 1) 234 235 def test_stdin_fileobj(self): 236 # stdin is set to open file object 237 tf = tempfile.TemporaryFile() 238 tf.write("pear") 239 tf.seek(0) 240 p = subprocess.Popen([sys.executable, "-c", 241 'import sys; sys.exit(sys.stdin.read() == "pear")'], 242 stdin=tf) 243 p.wait() 244 self.assertEqual(p.returncode, 1) 245 246 def test_stdout_pipe(self): 247 # stdout redirection 248 p = subprocess.Popen([sys.executable, "-c", 249 'import sys; sys.stdout.write("orange")'], 250 stdout=subprocess.PIPE) 251 self.addCleanup(p.stdout.close) 252 self.assertEqual(p.stdout.read(), "orange") 253 254 def test_stdout_filedes(self): 255 # stdout is set to open file descriptor 256 tf = tempfile.TemporaryFile() 257 d = tf.fileno() 258 p = subprocess.Popen([sys.executable, "-c", 259 'import sys; sys.stdout.write("orange")'], 260 stdout=d) 261 p.wait() 262 os.lseek(d, 0, 0) 263 self.assertEqual(os.read(d, 1024), "orange") 264 265 def test_stdout_fileobj(self): 266 # stdout is set to open file object 267 tf = tempfile.TemporaryFile() 268 p = subprocess.Popen([sys.executable, "-c", 269 'import sys; sys.stdout.write("orange")'], 270 stdout=tf) 271 p.wait() 272 tf.seek(0) 273 self.assertEqual(tf.read(), "orange") 274 275 def test_stderr_pipe(self): 276 # stderr redirection 277 p = subprocess.Popen([sys.executable, "-c", 278 'import sys; sys.stderr.write("strawberry")'], 279 stderr=subprocess.PIPE) 280 self.addCleanup(p.stderr.close) 281 self.assertStderrEqual(p.stderr.read(), "strawberry") 282 283 def test_stderr_filedes(self): 284 # stderr is set to open file descriptor 285 tf = tempfile.TemporaryFile() 286 d = tf.fileno() 287 p = subprocess.Popen([sys.executable, "-c", 288 'import sys; sys.stderr.write("strawberry")'], 289 stderr=d) 290 p.wait() 291 os.lseek(d, 0, 0) 292 self.assertStderrEqual(os.read(d, 1024), "strawberry") 293 294 def test_stderr_fileobj(self): 295 # stderr is set to open file object 296 tf = tempfile.TemporaryFile() 297 p = subprocess.Popen([sys.executable, "-c", 298 'import sys; sys.stderr.write("strawberry")'], 299 stderr=tf) 300 p.wait() 301 tf.seek(0) 302 self.assertStderrEqual(tf.read(), "strawberry") 303 304 def test_stderr_redirect_with_no_stdout_redirect(self): 305 # test stderr=STDOUT while stdout=None (not set) 306 307 # - grandchild prints to stderr 308 # - child redirects grandchild's stderr to its stdout 309 # - the parent should get grandchild's stderr in child's stdout 310 p = subprocess.Popen([sys.executable, "-c", 311 'import sys, subprocess;' 312 'rc = subprocess.call([sys.executable, "-c",' 313 ' "import sys;"' 314 ' "sys.stderr.write(\'42\')"],' 315 ' stderr=subprocess.STDOUT);' 316 'sys.exit(rc)'], 317 stdout=subprocess.PIPE, 318 stderr=subprocess.PIPE) 319 stdout, stderr = p.communicate() 320 #NOTE: stdout should get stderr from grandchild 321 self.assertStderrEqual(stdout, b'42') 322 self.assertStderrEqual(stderr, b'') # should be empty 323 self.assertEqual(p.returncode, 0) 324 325 def test_stdout_stderr_pipe(self): 326 # capture stdout and stderr to the same pipe 327 p = subprocess.Popen([sys.executable, "-c", 328 'import sys;' 329 'sys.stdout.write("apple");' 330 'sys.stdout.flush();' 331 'sys.stderr.write("orange")'], 332 stdout=subprocess.PIPE, 333 stderr=subprocess.STDOUT) 334 self.addCleanup(p.stdout.close) 335 self.assertStderrEqual(p.stdout.read(), "appleorange") 336 337 def test_stdout_stderr_file(self): 338 # capture stdout and stderr to the same open file 339 tf = tempfile.TemporaryFile() 340 p = subprocess.Popen([sys.executable, "-c", 341 'import sys;' 342 'sys.stdout.write("apple");' 343 'sys.stdout.flush();' 344 'sys.stderr.write("orange")'], 345 stdout=tf, 346 stderr=tf) 347 p.wait() 348 tf.seek(0) 349 self.assertStderrEqual(tf.read(), "appleorange") 350 351 def test_stdout_filedes_of_stdout(self): 352 # stdout is set to 1 (#1531862). 353 # To avoid printing the text on stdout, we do something similar to 354 # test_stdout_none (see above). The parent subprocess calls the child 355 # subprocess passing stdout=1, and this test uses stdout=PIPE in 356 # order to capture and check the output of the parent. See #11963. 357 code = ('import sys, subprocess; ' 358 'rc = subprocess.call([sys.executable, "-c", ' 359 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' 360 '\'test with stdout=1\'))"], stdout=1); ' 361 'assert rc == 18') 362 p = subprocess.Popen([sys.executable, "-c", code], 363 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 364 self.addCleanup(p.stdout.close) 365 self.addCleanup(p.stderr.close) 366 out, err = p.communicate() 367 self.assertEqual(p.returncode, 0, err) 368 self.assertEqual(out.rstrip(), 'test with stdout=1') 369 370 def test_cwd(self): 371 tmpdir = tempfile.gettempdir() 372 # We cannot use os.path.realpath to canonicalize the path, 373 # since it doesn't expand Tru64 {memb} strings. See bug 1063571. 374 cwd = os.getcwd() 375 os.chdir(tmpdir) 376 tmpdir = os.getcwd() 377 os.chdir(cwd) 378 p = subprocess.Popen([sys.executable, "-c", 379 'import sys,os;' 380 'sys.stdout.write(os.getcwd())'], 381 stdout=subprocess.PIPE, 382 cwd=tmpdir) 383 self.addCleanup(p.stdout.close) 384 normcase = os.path.normcase 385 self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir)) 386 387 def test_env(self): 388 newenv = os.environ.copy() 389 newenv["FRUIT"] = "orange" 390 p = subprocess.Popen([sys.executable, "-c", 391 'import sys,os;' 392 'sys.stdout.write(os.getenv("FRUIT"))'], 393 stdout=subprocess.PIPE, 394 env=newenv) 395 self.addCleanup(p.stdout.close) 396 self.assertEqual(p.stdout.read(), "orange") 397 398 def test_invalid_cmd(self): 399 # null character in the command name 400 cmd = sys.executable + '\0' 401 with self.assertRaises(TypeError): 402 subprocess.Popen([cmd, "-c", "pass"]) 403 404 # null character in the command argument 405 with self.assertRaises(TypeError): 406 subprocess.Popen([sys.executable, "-c", "pass#\0"]) 407 408 def test_invalid_env(self): 409 # null character in the enviroment variable name 410 newenv = os.environ.copy() 411 newenv["FRUIT\0VEGETABLE"] = "cabbage" 412 with self.assertRaises(TypeError): 413 subprocess.Popen([sys.executable, "-c", "pass"], env=newenv) 414 415 # null character in the enviroment variable value 416 newenv = os.environ.copy() 417 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 418 with self.assertRaises(TypeError): 419 subprocess.Popen([sys.executable, "-c", "pass"], env=newenv) 420 421 # equal character in the enviroment variable name 422 newenv = os.environ.copy() 423 newenv["FRUIT=ORANGE"] = "lemon" 424 with self.assertRaises(ValueError): 425 subprocess.Popen([sys.executable, "-c", "pass"], env=newenv) 426 427 # equal character in the enviroment variable value 428 newenv = os.environ.copy() 429 newenv["FRUIT"] = "orange=lemon" 430 p = subprocess.Popen([sys.executable, "-c", 431 'import sys, os;' 432 'sys.stdout.write(os.getenv("FRUIT"))'], 433 stdout=subprocess.PIPE, 434 env=newenv) 435 stdout, stderr = p.communicate() 436 self.assertEqual(stdout, "orange=lemon") 437 438 def test_communicate_stdin(self): 439 p = subprocess.Popen([sys.executable, "-c", 440 'import sys;' 441 'sys.exit(sys.stdin.read() == "pear")'], 442 stdin=subprocess.PIPE) 443 p.communicate("pear") 444 self.assertEqual(p.returncode, 1) 445 446 def test_communicate_stdout(self): 447 p = subprocess.Popen([sys.executable, "-c", 448 'import sys; sys.stdout.write("pineapple")'], 449 stdout=subprocess.PIPE) 450 (stdout, stderr) = p.communicate() 451 self.assertEqual(stdout, "pineapple") 452 self.assertEqual(stderr, None) 453 454 def test_communicate_stderr(self): 455 p = subprocess.Popen([sys.executable, "-c", 456 'import sys; sys.stderr.write("pineapple")'], 457 stderr=subprocess.PIPE) 458 (stdout, stderr) = p.communicate() 459 self.assertEqual(stdout, None) 460 self.assertStderrEqual(stderr, "pineapple") 461 462 def test_communicate(self): 463 p = subprocess.Popen([sys.executable, "-c", 464 'import sys,os;' 465 'sys.stderr.write("pineapple");' 466 'sys.stdout.write(sys.stdin.read())'], 467 stdin=subprocess.PIPE, 468 stdout=subprocess.PIPE, 469 stderr=subprocess.PIPE) 470 self.addCleanup(p.stdout.close) 471 self.addCleanup(p.stderr.close) 472 self.addCleanup(p.stdin.close) 473 (stdout, stderr) = p.communicate("banana") 474 self.assertEqual(stdout, "banana") 475 self.assertStderrEqual(stderr, "pineapple") 476 477 # This test is Linux specific for simplicity to at least have 478 # some coverage. It is not a platform specific bug. 479 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 480 "Linux specific") 481 # Test for the fd leak reported in http://bugs.python.org/issue2791. 482 def test_communicate_pipe_fd_leak(self): 483 fd_directory = '/proc/%d/fd' % os.getpid() 484 num_fds_before_popen = len(os.listdir(fd_directory)) 485 p = subprocess.Popen([sys.executable, "-c", "print('')"], 486 stdout=subprocess.PIPE) 487 p.communicate() 488 num_fds_after_communicate = len(os.listdir(fd_directory)) 489 del p 490 num_fds_after_destruction = len(os.listdir(fd_directory)) 491 self.assertEqual(num_fds_before_popen, num_fds_after_destruction) 492 self.assertEqual(num_fds_before_popen, num_fds_after_communicate) 493 494 def test_communicate_returns(self): 495 # communicate() should return None if no redirection is active 496 p = subprocess.Popen([sys.executable, "-c", 497 "import sys; sys.exit(47)"]) 498 (stdout, stderr) = p.communicate() 499 self.assertEqual(stdout, None) 500 self.assertEqual(stderr, None) 501 502 def test_communicate_pipe_buf(self): 503 # communicate() with writes larger than pipe_buf 504 # This test will probably deadlock rather than fail, if 505 # communicate() does not work properly. 506 x, y = os.pipe() 507 if mswindows: 508 pipe_buf = 512 509 else: 510 pipe_buf = os.fpathconf(x, "PC_PIPE_BUF") 511 os.close(x) 512 os.close(y) 513 p = subprocess.Popen([sys.executable, "-c", 514 'import sys,os;' 515 'sys.stdout.write(sys.stdin.read(47));' 516 'sys.stderr.write("xyz"*%d);' 517 'sys.stdout.write(sys.stdin.read())' % pipe_buf], 518 stdin=subprocess.PIPE, 519 stdout=subprocess.PIPE, 520 stderr=subprocess.PIPE) 521 self.addCleanup(p.stdout.close) 522 self.addCleanup(p.stderr.close) 523 self.addCleanup(p.stdin.close) 524 string_to_write = "abc"*pipe_buf 525 (stdout, stderr) = p.communicate(string_to_write) 526 self.assertEqual(stdout, string_to_write) 527 528 def test_writes_before_communicate(self): 529 # stdin.write before communicate() 530 p = subprocess.Popen([sys.executable, "-c", 531 'import sys,os;' 532 'sys.stdout.write(sys.stdin.read())'], 533 stdin=subprocess.PIPE, 534 stdout=subprocess.PIPE, 535 stderr=subprocess.PIPE) 536 self.addCleanup(p.stdout.close) 537 self.addCleanup(p.stderr.close) 538 self.addCleanup(p.stdin.close) 539 p.stdin.write("banana") 540 (stdout, stderr) = p.communicate("split") 541 self.assertEqual(stdout, "bananasplit") 542 self.assertStderrEqual(stderr, "") 543 544 def test_universal_newlines(self): 545 p = subprocess.Popen([sys.executable, "-c", 546 'import sys,os;' + SETBINARY + 547 'sys.stdout.write("line1\\n");' 548 'sys.stdout.flush();' 549 'sys.stdout.write("line2\\r");' 550 'sys.stdout.flush();' 551 'sys.stdout.write("line3\\r\\n");' 552 'sys.stdout.flush();' 553 'sys.stdout.write("line4\\r");' 554 'sys.stdout.flush();' 555 'sys.stdout.write("\\nline5");' 556 'sys.stdout.flush();' 557 'sys.stdout.write("\\nline6");'], 558 stdout=subprocess.PIPE, 559 universal_newlines=1) 560 self.addCleanup(p.stdout.close) 561 stdout = p.stdout.read() 562 if hasattr(file, 'newlines'): 563 # Interpreter with universal newline support 564 self.assertEqual(stdout, 565 "line1\nline2\nline3\nline4\nline5\nline6") 566 else: 567 # Interpreter without universal newline support 568 self.assertEqual(stdout, 569 "line1\nline2\rline3\r\nline4\r\nline5\nline6") 570 571 def test_universal_newlines_communicate(self): 572 # universal newlines through communicate() 573 p = subprocess.Popen([sys.executable, "-c", 574 'import sys,os;' + SETBINARY + 575 'sys.stdout.write("line1\\n");' 576 'sys.stdout.flush();' 577 'sys.stdout.write("line2\\r");' 578 'sys.stdout.flush();' 579 'sys.stdout.write("line3\\r\\n");' 580 'sys.stdout.flush();' 581 'sys.stdout.write("line4\\r");' 582 'sys.stdout.flush();' 583 'sys.stdout.write("\\nline5");' 584 'sys.stdout.flush();' 585 'sys.stdout.write("\\nline6");'], 586 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 587 universal_newlines=1) 588 self.addCleanup(p.stdout.close) 589 self.addCleanup(p.stderr.close) 590 (stdout, stderr) = p.communicate() 591 if hasattr(file, 'newlines'): 592 # Interpreter with universal newline support 593 self.assertEqual(stdout, 594 "line1\nline2\nline3\nline4\nline5\nline6") 595 else: 596 # Interpreter without universal newline support 597 self.assertEqual(stdout, 598 "line1\nline2\rline3\r\nline4\r\nline5\nline6") 599 600 def test_no_leaking(self): 601 # Make sure we leak no resources 602 if not mswindows: 603 max_handles = 1026 # too much for most UNIX systems 604 else: 605 max_handles = 2050 # too much for (at least some) Windows setups 606 handles = [] 607 try: 608 for i in range(max_handles): 609 try: 610 handles.append(os.open(test_support.TESTFN, 611 os.O_WRONLY | os.O_CREAT)) 612 except OSError as e: 613 if e.errno != errno.EMFILE: 614 raise 615 break 616 else: 617 self.skipTest("failed to reach the file descriptor limit " 618 "(tried %d)" % max_handles) 619 # Close a couple of them (should be enough for a subprocess) 620 for i in range(10): 621 os.close(handles.pop()) 622 # Loop creating some subprocesses. If one of them leaks some fds, 623 # the next loop iteration will fail by reaching the max fd limit. 624 for i in range(15): 625 p = subprocess.Popen([sys.executable, "-c", 626 "import sys;" 627 "sys.stdout.write(sys.stdin.read())"], 628 stdin=subprocess.PIPE, 629 stdout=subprocess.PIPE, 630 stderr=subprocess.PIPE) 631 data = p.communicate(b"lime")[0] 632 self.assertEqual(data, b"lime") 633 finally: 634 for h in handles: 635 os.close(h) 636 test_support.unlink(test_support.TESTFN) 637 638 def test_list2cmdline(self): 639 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), 640 '"a b c" d e') 641 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), 642 'ab\\"c \\ d') 643 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), 644 'ab\\"c " \\\\" d') 645 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), 646 'a\\\\\\b "de fg" h') 647 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), 648 'a\\\\\\"b c d') 649 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), 650 '"a\\\\b c" d e') 651 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), 652 '"a\\\\b\\ c" d e') 653 self.assertEqual(subprocess.list2cmdline(['ab', '']), 654 'ab ""') 655 656 657 def test_poll(self): 658 p = subprocess.Popen([sys.executable, 659 "-c", "import time; time.sleep(1)"]) 660 count = 0 661 while p.poll() is None: 662 time.sleep(0.1) 663 count += 1 664 # We expect that the poll loop probably went around about 10 times, 665 # but, based on system scheduling we can't control, it's possible 666 # poll() never returned None. It "should be" very rare that it 667 # didn't go around at least twice. 668 self.assertGreaterEqual(count, 2) 669 # Subsequent invocations should just return the returncode 670 self.assertEqual(p.poll(), 0) 671 672 673 def test_wait(self): 674 p = subprocess.Popen([sys.executable, 675 "-c", "import time; time.sleep(2)"]) 676 self.assertEqual(p.wait(), 0) 677 # Subsequent invocations should just return the returncode 678 self.assertEqual(p.wait(), 0) 679 680 681 def test_invalid_bufsize(self): 682 # an invalid type of the bufsize argument should raise 683 # TypeError. 684 with self.assertRaises(TypeError): 685 subprocess.Popen([sys.executable, "-c", "pass"], "orange") 686 687 def test_leaking_fds_on_error(self): 688 # see bug #5179: Popen leaks file descriptors to PIPEs if 689 # the child fails to execute; this will eventually exhaust 690 # the maximum number of open fds. 1024 seems a very common 691 # value for that limit, but Windows has 2048, so we loop 692 # 1024 times (each call leaked two fds). 693 for i in range(1024): 694 # Windows raises IOError. Others raise OSError. 695 with self.assertRaises(EnvironmentError) as c: 696 subprocess.Popen(['nonexisting_i_hope'], 697 stdout=subprocess.PIPE, 698 stderr=subprocess.PIPE) 699 # ignore errors that indicate the command was not found 700 if c.exception.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EACCES): 701 raise c.exception 702 703 @unittest.skipIf(threading is None, "threading required") 704 def test_double_close_on_error(self): 705 # Issue #18851 706 fds = [] 707 def open_fds(): 708 for i in range(20): 709 fds.extend(os.pipe()) 710 time.sleep(0.001) 711 t = threading.Thread(target=open_fds) 712 t.start() 713 try: 714 with self.assertRaises(EnvironmentError): 715 subprocess.Popen(['nonexisting_i_hope'], 716 stdin=subprocess.PIPE, 717 stdout=subprocess.PIPE, 718 stderr=subprocess.PIPE) 719 finally: 720 t.join() 721 exc = None 722 for fd in fds: 723 # If a double close occurred, some of those fds will 724 # already have been closed by mistake, and os.close() 725 # here will raise. 726 try: 727 os.close(fd) 728 except OSError as e: 729 exc = e 730 if exc is not None: 731 raise exc 732 733 def test_handles_closed_on_exception(self): 734 # If CreateProcess exits with an error, ensure the 735 # duplicate output handles are released 736 ifhandle, ifname = tempfile.mkstemp() 737 ofhandle, ofname = tempfile.mkstemp() 738 efhandle, efname = tempfile.mkstemp() 739 try: 740 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 741 stderr=efhandle) 742 except OSError: 743 os.close(ifhandle) 744 os.remove(ifname) 745 os.close(ofhandle) 746 os.remove(ofname) 747 os.close(efhandle) 748 os.remove(efname) 749 self.assertFalse(os.path.exists(ifname)) 750 self.assertFalse(os.path.exists(ofname)) 751 self.assertFalse(os.path.exists(efname)) 752 753 def test_communicate_epipe(self): 754 # Issue 10963: communicate() should hide EPIPE 755 p = subprocess.Popen([sys.executable, "-c", 'pass'], 756 stdin=subprocess.PIPE, 757 stdout=subprocess.PIPE, 758 stderr=subprocess.PIPE) 759 self.addCleanup(p.stdout.close) 760 self.addCleanup(p.stderr.close) 761 self.addCleanup(p.stdin.close) 762 p.communicate("x" * 2**20) 763 764 def test_communicate_epipe_only_stdin(self): 765 # Issue 10963: communicate() should hide EPIPE 766 p = subprocess.Popen([sys.executable, "-c", 'pass'], 767 stdin=subprocess.PIPE) 768 self.addCleanup(p.stdin.close) 769 time.sleep(2) 770 p.communicate("x" * 2**20) 771 772 # This test is Linux-ish specific for simplicity to at least have 773 # some coverage. It is not a platform specific bug. 774 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 775 "Linux specific") 776 def test_failed_child_execute_fd_leak(self): 777 """Test for the fork() failure fd leak reported in issue16327.""" 778 fd_directory = '/proc/%d/fd' % os.getpid() 779 fds_before_popen = os.listdir(fd_directory) 780 with self.assertRaises(PopenTestException): 781 PopenExecuteChildRaises( 782 [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE, 783 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 784 785 # NOTE: This test doesn't verify that the real _execute_child 786 # does not close the file descriptors itself on the way out 787 # during an exception. Code inspection has confirmed that. 788 789 fds_after_exception = os.listdir(fd_directory) 790 self.assertEqual(fds_before_popen, fds_after_exception) 791 792 793# context manager 794class _SuppressCoreFiles(object): 795 """Try to prevent core files from being created.""" 796 old_limit = None 797 798 def __enter__(self): 799 """Try to save previous ulimit, then set it to (0, 0).""" 800 if resource is not None: 801 try: 802 self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) 803 resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) 804 except (ValueError, resource.error): 805 pass 806 807 if sys.platform == 'darwin': 808 # Check if the 'Crash Reporter' on OSX was configured 809 # in 'Developer' mode and warn that it will get triggered 810 # when it is. 811 # 812 # This assumes that this context manager is used in tests 813 # that might trigger the next manager. 814 value = subprocess.Popen(['/usr/bin/defaults', 'read', 815 'com.apple.CrashReporter', 'DialogType'], 816 stdout=subprocess.PIPE).communicate()[0] 817 if value.strip() == b'developer': 818 print "this tests triggers the Crash Reporter, that is intentional" 819 sys.stdout.flush() 820 821 def __exit__(self, *args): 822 """Return core file behavior to default.""" 823 if self.old_limit is None: 824 return 825 if resource is not None: 826 try: 827 resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) 828 except (ValueError, resource.error): 829 pass 830 831 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 832 "Requires signal.SIGALRM") 833 def test_communicate_eintr(self): 834 # Issue #12493: communicate() should handle EINTR 835 def handler(signum, frame): 836 pass 837 old_handler = signal.signal(signal.SIGALRM, handler) 838 self.addCleanup(signal.signal, signal.SIGALRM, old_handler) 839 840 # the process is running for 2 seconds 841 args = [sys.executable, "-c", 'import time; time.sleep(2)'] 842 for stream in ('stdout', 'stderr'): 843 kw = {stream: subprocess.PIPE} 844 with subprocess.Popen(args, **kw) as process: 845 signal.alarm(1) 846 try: 847 # communicate() will be interrupted by SIGALRM 848 process.communicate() 849 finally: 850 signal.alarm(0) 851 852 853@unittest.skipIf(mswindows, "POSIX specific tests") 854class POSIXProcessTestCase(BaseTestCase): 855 856 def test_exceptions(self): 857 # caught & re-raised exceptions 858 with self.assertRaises(OSError) as c: 859 p = subprocess.Popen([sys.executable, "-c", ""], 860 cwd="/this/path/does/not/exist") 861 # The attribute child_traceback should contain "os.chdir" somewhere. 862 self.assertIn("os.chdir", c.exception.child_traceback) 863 864 def test_run_abort(self): 865 # returncode handles signal termination 866 with _SuppressCoreFiles(): 867 p = subprocess.Popen([sys.executable, "-c", 868 "import os; os.abort()"]) 869 p.wait() 870 self.assertEqual(-p.returncode, signal.SIGABRT) 871 872 def test_preexec(self): 873 # preexec function 874 p = subprocess.Popen([sys.executable, "-c", 875 "import sys, os;" 876 "sys.stdout.write(os.getenv('FRUIT'))"], 877 stdout=subprocess.PIPE, 878 preexec_fn=lambda: os.putenv("FRUIT", "apple")) 879 self.addCleanup(p.stdout.close) 880 self.assertEqual(p.stdout.read(), "apple") 881 882 class _TestExecuteChildPopen(subprocess.Popen): 883 """Used to test behavior at the end of _execute_child.""" 884 def __init__(self, testcase, *args, **kwargs): 885 self._testcase = testcase 886 subprocess.Popen.__init__(self, *args, **kwargs) 887 888 def _execute_child( 889 self, args, executable, preexec_fn, close_fds, cwd, env, 890 universal_newlines, startupinfo, creationflags, shell, to_close, 891 p2cread, p2cwrite, 892 c2pread, c2pwrite, 893 errread, errwrite): 894 try: 895 subprocess.Popen._execute_child( 896 self, args, executable, preexec_fn, close_fds, 897 cwd, env, universal_newlines, 898 startupinfo, creationflags, shell, to_close, 899 p2cread, p2cwrite, 900 c2pread, c2pwrite, 901 errread, errwrite) 902 finally: 903 # Open a bunch of file descriptors and verify that 904 # none of them are the same as the ones the Popen 905 # instance is using for stdin/stdout/stderr. 906 devzero_fds = [os.open("/dev/zero", os.O_RDONLY) 907 for _ in range(8)] 908 try: 909 for fd in devzero_fds: 910 self._testcase.assertNotIn( 911 fd, (p2cwrite, c2pread, errread)) 912 finally: 913 for fd in devzero_fds: 914 os.close(fd) 915 916 @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") 917 def test_preexec_errpipe_does_not_double_close_pipes(self): 918 """Issue16140: Don't double close pipes on preexec error.""" 919 920 def raise_it(): 921 raise RuntimeError("force the _execute_child() errpipe_data path.") 922 923 with self.assertRaises(RuntimeError): 924 self._TestExecuteChildPopen( 925 self, [sys.executable, "-c", "pass"], 926 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 927 stderr=subprocess.PIPE, preexec_fn=raise_it) 928 929 def test_args_string(self): 930 # args is a string 931 f, fname = tempfile.mkstemp() 932 os.write(f, "#!/bin/sh\n") 933 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 934 sys.executable) 935 os.close(f) 936 os.chmod(fname, 0o700) 937 p = subprocess.Popen(fname) 938 p.wait() 939 os.remove(fname) 940 self.assertEqual(p.returncode, 47) 941 942 def test_invalid_args(self): 943 # invalid arguments should raise ValueError 944 self.assertRaises(ValueError, subprocess.call, 945 [sys.executable, "-c", 946 "import sys; sys.exit(47)"], 947 startupinfo=47) 948 self.assertRaises(ValueError, subprocess.call, 949 [sys.executable, "-c", 950 "import sys; sys.exit(47)"], 951 creationflags=47) 952 953 def test_shell_sequence(self): 954 # Run command through the shell (sequence) 955 newenv = os.environ.copy() 956 newenv["FRUIT"] = "apple" 957 p = subprocess.Popen(["echo $FRUIT"], shell=1, 958 stdout=subprocess.PIPE, 959 env=newenv) 960 self.addCleanup(p.stdout.close) 961 self.assertEqual(p.stdout.read().strip(), "apple") 962 963 def test_shell_string(self): 964 # Run command through the shell (string) 965 newenv = os.environ.copy() 966 newenv["FRUIT"] = "apple" 967 p = subprocess.Popen("echo $FRUIT", shell=1, 968 stdout=subprocess.PIPE, 969 env=newenv) 970 self.addCleanup(p.stdout.close) 971 self.assertEqual(p.stdout.read().strip(), "apple") 972 973 def test_call_string(self): 974 # call() function with string argument on UNIX 975 f, fname = tempfile.mkstemp() 976 os.write(f, "#!/bin/sh\n") 977 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 978 sys.executable) 979 os.close(f) 980 os.chmod(fname, 0700) 981 rc = subprocess.call(fname) 982 os.remove(fname) 983 self.assertEqual(rc, 47) 984 985 def test_specific_shell(self): 986 # Issue #9265: Incorrect name passed as arg[0]. 987 shells = [] 988 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: 989 for name in ['bash', 'ksh']: 990 sh = os.path.join(prefix, name) 991 if os.path.isfile(sh): 992 shells.append(sh) 993 if not shells: # Will probably work for any shell but csh. 994 self.skipTest("bash or ksh required for this test") 995 sh = '/bin/sh' 996 if os.path.isfile(sh) and not os.path.islink(sh): 997 # Test will fail if /bin/sh is a symlink to csh. 998 shells.append(sh) 999 for sh in shells: 1000 p = subprocess.Popen("echo $0", executable=sh, shell=True, 1001 stdout=subprocess.PIPE) 1002 self.addCleanup(p.stdout.close) 1003 self.assertEqual(p.stdout.read().strip(), sh) 1004 1005 def _kill_process(self, method, *args): 1006 # Do not inherit file handles from the parent. 1007 # It should fix failures on some platforms. 1008 p = subprocess.Popen([sys.executable, "-c", """if 1: 1009 import sys, time 1010 sys.stdout.write('x\\n') 1011 sys.stdout.flush() 1012 time.sleep(30) 1013 """], 1014 close_fds=True, 1015 stdin=subprocess.PIPE, 1016 stdout=subprocess.PIPE, 1017 stderr=subprocess.PIPE) 1018 # Wait for the interpreter to be completely initialized before 1019 # sending any signal. 1020 p.stdout.read(1) 1021 getattr(p, method)(*args) 1022 return p 1023 1024 @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')), 1025 "Due to known OS bug (issue #16762)") 1026 def _kill_dead_process(self, method, *args): 1027 # Do not inherit file handles from the parent. 1028 # It should fix failures on some platforms. 1029 p = subprocess.Popen([sys.executable, "-c", """if 1: 1030 import sys, time 1031 sys.stdout.write('x\\n') 1032 sys.stdout.flush() 1033 """], 1034 close_fds=True, 1035 stdin=subprocess.PIPE, 1036 stdout=subprocess.PIPE, 1037 stderr=subprocess.PIPE) 1038 # Wait for the interpreter to be completely initialized before 1039 # sending any signal. 1040 p.stdout.read(1) 1041 # The process should end after this 1042 time.sleep(1) 1043 # This shouldn't raise even though the child is now dead 1044 getattr(p, method)(*args) 1045 p.communicate() 1046 1047 def test_send_signal(self): 1048 p = self._kill_process('send_signal', signal.SIGINT) 1049 _, stderr = p.communicate() 1050 self.assertIn('KeyboardInterrupt', stderr) 1051 self.assertNotEqual(p.wait(), 0) 1052 1053 def test_kill(self): 1054 p = self._kill_process('kill') 1055 _, stderr = p.communicate() 1056 self.assertStderrEqual(stderr, '') 1057 self.assertEqual(p.wait(), -signal.SIGKILL) 1058 1059 def test_terminate(self): 1060 p = self._kill_process('terminate') 1061 _, stderr = p.communicate() 1062 self.assertStderrEqual(stderr, '') 1063 self.assertEqual(p.wait(), -signal.SIGTERM) 1064 1065 def test_send_signal_dead(self): 1066 # Sending a signal to a dead process 1067 self._kill_dead_process('send_signal', signal.SIGINT) 1068 1069 def test_kill_dead(self): 1070 # Killing a dead process 1071 self._kill_dead_process('kill') 1072 1073 def test_terminate_dead(self): 1074 # Terminating a dead process 1075 self._kill_dead_process('terminate') 1076 1077 def check_close_std_fds(self, fds): 1078 # Issue #9905: test that subprocess pipes still work properly with 1079 # some standard fds closed 1080 stdin = 0 1081 newfds = [] 1082 for a in fds: 1083 b = os.dup(a) 1084 newfds.append(b) 1085 if a == 0: 1086 stdin = b 1087 try: 1088 for fd in fds: 1089 os.close(fd) 1090 out, err = subprocess.Popen([sys.executable, "-c", 1091 'import sys;' 1092 'sys.stdout.write("apple");' 1093 'sys.stdout.flush();' 1094 'sys.stderr.write("orange")'], 1095 stdin=stdin, 1096 stdout=subprocess.PIPE, 1097 stderr=subprocess.PIPE).communicate() 1098 err = test_support.strip_python_stderr(err) 1099 self.assertEqual((out, err), (b'apple', b'orange')) 1100 finally: 1101 for b, a in zip(newfds, fds): 1102 os.dup2(b, a) 1103 for b in newfds: 1104 os.close(b) 1105 1106 def test_close_fd_0(self): 1107 self.check_close_std_fds([0]) 1108 1109 def test_close_fd_1(self): 1110 self.check_close_std_fds([1]) 1111 1112 def test_close_fd_2(self): 1113 self.check_close_std_fds([2]) 1114 1115 def test_close_fds_0_1(self): 1116 self.check_close_std_fds([0, 1]) 1117 1118 def test_close_fds_0_2(self): 1119 self.check_close_std_fds([0, 2]) 1120 1121 def test_close_fds_1_2(self): 1122 self.check_close_std_fds([1, 2]) 1123 1124 def test_close_fds_0_1_2(self): 1125 # Issue #10806: test that subprocess pipes still work properly with 1126 # all standard fds closed. 1127 self.check_close_std_fds([0, 1, 2]) 1128 1129 def check_swap_fds(self, stdin_no, stdout_no, stderr_no): 1130 # open up some temporary files 1131 temps = [tempfile.mkstemp() for i in range(3)] 1132 temp_fds = [fd for fd, fname in temps] 1133 try: 1134 # unlink the files -- we won't need to reopen them 1135 for fd, fname in temps: 1136 os.unlink(fname) 1137 1138 # save a copy of the standard file descriptors 1139 saved_fds = [os.dup(fd) for fd in range(3)] 1140 try: 1141 # duplicate the temp files over the standard fd's 0, 1, 2 1142 for fd, temp_fd in enumerate(temp_fds): 1143 os.dup2(temp_fd, fd) 1144 1145 # write some data to what will become stdin, and rewind 1146 os.write(stdin_no, b"STDIN") 1147 os.lseek(stdin_no, 0, 0) 1148 1149 # now use those files in the given order, so that subprocess 1150 # has to rearrange them in the child 1151 p = subprocess.Popen([sys.executable, "-c", 1152 'import sys; got = sys.stdin.read();' 1153 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 1154 stdin=stdin_no, 1155 stdout=stdout_no, 1156 stderr=stderr_no) 1157 p.wait() 1158 1159 for fd in temp_fds: 1160 os.lseek(fd, 0, 0) 1161 1162 out = os.read(stdout_no, 1024) 1163 err = test_support.strip_python_stderr(os.read(stderr_no, 1024)) 1164 finally: 1165 for std, saved in enumerate(saved_fds): 1166 os.dup2(saved, std) 1167 os.close(saved) 1168 1169 self.assertEqual(out, b"got STDIN") 1170 self.assertEqual(err, b"err") 1171 1172 finally: 1173 for fd in temp_fds: 1174 os.close(fd) 1175 1176 # When duping fds, if there arises a situation where one of the fds is 1177 # either 0, 1 or 2, it is possible that it is overwritten (#12607). 1178 # This tests all combinations of this. 1179 def test_swap_fds(self): 1180 self.check_swap_fds(0, 1, 2) 1181 self.check_swap_fds(0, 2, 1) 1182 self.check_swap_fds(1, 0, 2) 1183 self.check_swap_fds(1, 2, 0) 1184 self.check_swap_fds(2, 0, 1) 1185 self.check_swap_fds(2, 1, 0) 1186 1187 def test_wait_when_sigchild_ignored(self): 1188 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 1189 sigchild_ignore = test_support.findfile("sigchild_ignore.py", 1190 subdir="subprocessdata") 1191 p = subprocess.Popen([sys.executable, sigchild_ignore], 1192 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1193 stdout, stderr = p.communicate() 1194 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 1195 " non-zero with this error:\n%s" % stderr) 1196 1197 def test_zombie_fast_process_del(self): 1198 # Issue #12650: on Unix, if Popen.__del__() was called before the 1199 # process exited, it wouldn't be added to subprocess._active, and would 1200 # remain a zombie. 1201 # spawn a Popen, and delete its reference before it exits 1202 p = subprocess.Popen([sys.executable, "-c", 1203 'import sys, time;' 1204 'time.sleep(0.2)'], 1205 stdout=subprocess.PIPE, 1206 stderr=subprocess.PIPE) 1207 self.addCleanup(p.stdout.close) 1208 self.addCleanup(p.stderr.close) 1209 ident = id(p) 1210 pid = p.pid 1211 del p 1212 # check that p is in the active processes list 1213 self.assertIn(ident, [id(o) for o in subprocess._active]) 1214 1215 def test_leak_fast_process_del_killed(self): 1216 # Issue #12650: on Unix, if Popen.__del__() was called before the 1217 # process exited, and the process got killed by a signal, it would never 1218 # be removed from subprocess._active, which triggered a FD and memory 1219 # leak. 1220 # spawn a Popen, delete its reference and kill it 1221 p = subprocess.Popen([sys.executable, "-c", 1222 'import time;' 1223 'time.sleep(3)'], 1224 stdout=subprocess.PIPE, 1225 stderr=subprocess.PIPE) 1226 self.addCleanup(p.stdout.close) 1227 self.addCleanup(p.stderr.close) 1228 ident = id(p) 1229 pid = p.pid 1230 del p 1231 os.kill(pid, signal.SIGKILL) 1232 # check that p is in the active processes list 1233 self.assertIn(ident, [id(o) for o in subprocess._active]) 1234 1235 # let some time for the process to exit, and create a new Popen: this 1236 # should trigger the wait() of p 1237 time.sleep(0.2) 1238 with self.assertRaises(EnvironmentError) as c: 1239 with subprocess.Popen(['nonexisting_i_hope'], 1240 stdout=subprocess.PIPE, 1241 stderr=subprocess.PIPE) as proc: 1242 pass 1243 # p should have been wait()ed on, and removed from the _active list 1244 self.assertRaises(OSError, os.waitpid, pid, 0) 1245 self.assertNotIn(ident, [id(o) for o in subprocess._active]) 1246 1247 def test_pipe_cloexec(self): 1248 # Issue 12786: check that the communication pipes' FDs are set CLOEXEC, 1249 # and are not inherited by another child process. 1250 p1 = subprocess.Popen([sys.executable, "-c", 1251 'import os;' 1252 'os.read(0, 1)' 1253 ], 1254 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1255 stderr=subprocess.PIPE) 1256 1257 p2 = subprocess.Popen([sys.executable, "-c", """if True: 1258 import os, errno, sys 1259 for fd in %r: 1260 try: 1261 os.close(fd) 1262 except OSError as e: 1263 if e.errno != errno.EBADF: 1264 raise 1265 else: 1266 sys.exit(1) 1267 sys.exit(0) 1268 """ % [f.fileno() for f in (p1.stdin, p1.stdout, 1269 p1.stderr)] 1270 ], 1271 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1272 stderr=subprocess.PIPE, close_fds=False) 1273 p1.communicate('foo') 1274 _, stderr = p2.communicate() 1275 1276 self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr)) 1277 1278 @unittest.skipUnless(_testcapi is not None 1279 and hasattr(_testcapi, 'W_STOPCODE'), 1280 'need _testcapi.W_STOPCODE') 1281 def test_stopped(self): 1282 """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335.""" 1283 args = [sys.executable, '-c', 'pass'] 1284 proc = subprocess.Popen(args) 1285 1286 # Wait until the real process completes to avoid zombie process 1287 pid = proc.pid 1288 pid, status = os.waitpid(pid, 0) 1289 self.assertEqual(status, 0) 1290 1291 status = _testcapi.W_STOPCODE(3) 1292 1293 def mock_waitpid(pid, flags): 1294 return (pid, status) 1295 1296 with test_support.swap_attr(os, 'waitpid', mock_waitpid): 1297 returncode = proc.wait() 1298 1299 self.assertEqual(returncode, -3) 1300 1301 1302@unittest.skipUnless(mswindows, "Windows specific tests") 1303class Win32ProcessTestCase(BaseTestCase): 1304 1305 def test_startupinfo(self): 1306 # startupinfo argument 1307 # We uses hardcoded constants, because we do not want to 1308 # depend on win32all. 1309 STARTF_USESHOWWINDOW = 1 1310 SW_MAXIMIZE = 3 1311 startupinfo = subprocess.STARTUPINFO() 1312 startupinfo.dwFlags = STARTF_USESHOWWINDOW 1313 startupinfo.wShowWindow = SW_MAXIMIZE 1314 # Since Python is a console process, it won't be affected 1315 # by wShowWindow, but the argument should be silently 1316 # ignored 1317 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"], 1318 startupinfo=startupinfo) 1319 1320 def test_creationflags(self): 1321 # creationflags argument 1322 CREATE_NEW_CONSOLE = 16 1323 sys.stderr.write(" a DOS box should flash briefly ...\n") 1324 subprocess.call(sys.executable + 1325 ' -c "import time; time.sleep(0.25)"', 1326 creationflags=CREATE_NEW_CONSOLE) 1327 1328 def test_invalid_args(self): 1329 # invalid arguments should raise ValueError 1330 self.assertRaises(ValueError, subprocess.call, 1331 [sys.executable, "-c", 1332 "import sys; sys.exit(47)"], 1333 preexec_fn=lambda: 1) 1334 self.assertRaises(ValueError, subprocess.call, 1335 [sys.executable, "-c", 1336 "import sys; sys.exit(47)"], 1337 stdout=subprocess.PIPE, 1338 close_fds=True) 1339 1340 def test_close_fds(self): 1341 # close file descriptors 1342 rc = subprocess.call([sys.executable, "-c", 1343 "import sys; sys.exit(47)"], 1344 close_fds=True) 1345 self.assertEqual(rc, 47) 1346 1347 def test_shell_sequence(self): 1348 # Run command through the shell (sequence) 1349 newenv = os.environ.copy() 1350 newenv["FRUIT"] = "physalis" 1351 p = subprocess.Popen(["set"], shell=1, 1352 stdout=subprocess.PIPE, 1353 env=newenv) 1354 self.addCleanup(p.stdout.close) 1355 self.assertIn("physalis", p.stdout.read()) 1356 1357 def test_shell_string(self): 1358 # Run command through the shell (string) 1359 newenv = os.environ.copy() 1360 newenv["FRUIT"] = "physalis" 1361 p = subprocess.Popen("set", shell=1, 1362 stdout=subprocess.PIPE, 1363 env=newenv) 1364 self.addCleanup(p.stdout.close) 1365 self.assertIn("physalis", p.stdout.read()) 1366 1367 def test_call_string(self): 1368 # call() function with string argument on Windows 1369 rc = subprocess.call(sys.executable + 1370 ' -c "import sys; sys.exit(47)"') 1371 self.assertEqual(rc, 47) 1372 1373 def _kill_process(self, method, *args): 1374 # Some win32 buildbot raises EOFError if stdin is inherited 1375 p = subprocess.Popen([sys.executable, "-c", """if 1: 1376 import sys, time 1377 sys.stdout.write('x\\n') 1378 sys.stdout.flush() 1379 time.sleep(30) 1380 """], 1381 stdin=subprocess.PIPE, 1382 stdout=subprocess.PIPE, 1383 stderr=subprocess.PIPE) 1384 self.addCleanup(p.stdout.close) 1385 self.addCleanup(p.stderr.close) 1386 self.addCleanup(p.stdin.close) 1387 # Wait for the interpreter to be completely initialized before 1388 # sending any signal. 1389 p.stdout.read(1) 1390 getattr(p, method)(*args) 1391 _, stderr = p.communicate() 1392 self.assertStderrEqual(stderr, '') 1393 returncode = p.wait() 1394 self.assertNotEqual(returncode, 0) 1395 1396 def _kill_dead_process(self, method, *args): 1397 p = subprocess.Popen([sys.executable, "-c", """if 1: 1398 import sys, time 1399 sys.stdout.write('x\\n') 1400 sys.stdout.flush() 1401 sys.exit(42) 1402 """], 1403 stdin=subprocess.PIPE, 1404 stdout=subprocess.PIPE, 1405 stderr=subprocess.PIPE) 1406 self.addCleanup(p.stdout.close) 1407 self.addCleanup(p.stderr.close) 1408 self.addCleanup(p.stdin.close) 1409 # Wait for the interpreter to be completely initialized before 1410 # sending any signal. 1411 p.stdout.read(1) 1412 # The process should end after this 1413 time.sleep(1) 1414 # This shouldn't raise even though the child is now dead 1415 getattr(p, method)(*args) 1416 _, stderr = p.communicate() 1417 self.assertStderrEqual(stderr, b'') 1418 rc = p.wait() 1419 self.assertEqual(rc, 42) 1420 1421 def test_send_signal(self): 1422 self._kill_process('send_signal', signal.SIGTERM) 1423 1424 def test_kill(self): 1425 self._kill_process('kill') 1426 1427 def test_terminate(self): 1428 self._kill_process('terminate') 1429 1430 def test_send_signal_dead(self): 1431 self._kill_dead_process('send_signal', signal.SIGTERM) 1432 1433 def test_kill_dead(self): 1434 self._kill_dead_process('kill') 1435 1436 def test_terminate_dead(self): 1437 self._kill_dead_process('terminate') 1438 1439 1440@unittest.skipUnless(getattr(subprocess, '_has_poll', False), 1441 "poll system call not supported") 1442class ProcessTestCaseNoPoll(ProcessTestCase): 1443 def setUp(self): 1444 subprocess._has_poll = False 1445 ProcessTestCase.setUp(self) 1446 1447 def tearDown(self): 1448 subprocess._has_poll = True 1449 ProcessTestCase.tearDown(self) 1450 1451 1452class HelperFunctionTests(unittest.TestCase): 1453 @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") 1454 def test_eintr_retry_call(self): 1455 record_calls = [] 1456 def fake_os_func(*args): 1457 record_calls.append(args) 1458 if len(record_calls) == 2: 1459 raise OSError(errno.EINTR, "fake interrupted system call") 1460 return tuple(reversed(args)) 1461 1462 self.assertEqual((999, 256), 1463 subprocess._eintr_retry_call(fake_os_func, 256, 999)) 1464 self.assertEqual([(256, 999)], record_calls) 1465 # This time there will be an EINTR so it will loop once. 1466 self.assertEqual((666,), 1467 subprocess._eintr_retry_call(fake_os_func, 666)) 1468 self.assertEqual([(256, 999), (666,), (666,)], record_calls) 1469 1470@unittest.skipUnless(mswindows, "mswindows only") 1471class CommandsWithSpaces (BaseTestCase): 1472 1473 def setUp(self): 1474 super(CommandsWithSpaces, self).setUp() 1475 f, fname = tempfile.mkstemp(".py", "te st") 1476 self.fname = fname.lower () 1477 os.write(f, b"import sys;" 1478 b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 1479 ) 1480 os.close(f) 1481 1482 def tearDown(self): 1483 os.remove(self.fname) 1484 super(CommandsWithSpaces, self).tearDown() 1485 1486 def with_spaces(self, *args, **kwargs): 1487 kwargs['stdout'] = subprocess.PIPE 1488 p = subprocess.Popen(*args, **kwargs) 1489 self.addCleanup(p.stdout.close) 1490 self.assertEqual( 1491 p.stdout.read ().decode("mbcs"), 1492 "2 [%r, 'ab cd']" % self.fname 1493 ) 1494 1495 def test_shell_string_with_spaces(self): 1496 # call() function with string argument with spaces on Windows 1497 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 1498 "ab cd"), shell=1) 1499 1500 def test_shell_sequence_with_spaces(self): 1501 # call() function with sequence argument with spaces on Windows 1502 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 1503 1504 def test_noshell_string_with_spaces(self): 1505 # call() function with string argument with spaces on Windows 1506 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 1507 "ab cd")) 1508 1509 def test_noshell_sequence_with_spaces(self): 1510 # call() function with sequence argument with spaces on Windows 1511 self.with_spaces([sys.executable, self.fname, "ab cd"]) 1512 1513def test_main(): 1514 unit_tests = (ProcessTestCase, 1515 POSIXProcessTestCase, 1516 Win32ProcessTestCase, 1517 ProcessTestCaseNoPoll, 1518 HelperFunctionTests, 1519 CommandsWithSpaces) 1520 1521 test_support.run_unittest(*unit_tests) 1522 test_support.reap_children() 1523 1524if __name__ == "__main__": 1525 test_main() 1526