1import unittest
2from test import test_support
3import subprocess
4import sys
5import platform
6import signal
7import os
8import errno
9import tempfile
10import time
11import re
12import sysconfig
13import textwrap
14
15try:
16    import ctypes
17except ImportError:
18    ctypes = None
19else:
20    import ctypes.util
21
22try:
23    import resource
24except ImportError:
25    resource = None
26try:
27    import threading
28except ImportError:
29    threading = None
30
31try:
32    import _testcapi
33except ImportError:
34    _testcapi = None
35
36mswindows = (sys.platform == "win32")
37
38#
39# Depends on the following external programs: Python
40#
41
42if mswindows:
43    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
44                                                'os.O_BINARY);')
45else:
46    SETBINARY = ''
47
48
49class BaseTestCase(unittest.TestCase):
50    def setUp(self):
51        # Try to minimize the number of children we have so this test
52        # doesn't crash on some buildbots (Alphas in particular).
53        test_support.reap_children()
54
55    def tearDown(self):
56        for inst in subprocess._active:
57            inst.wait()
58        subprocess._cleanup()
59        self.assertFalse(subprocess._active, "subprocess._active not empty")
60        self.doCleanups()
61        test_support.reap_children()
62
63    def assertStderrEqual(self, stderr, expected, msg=None):
64        # In a debug build, stuff like "[6580 refs]" is printed to stderr at
65        # shutdown time.  That frustrates tests trying to check stderr produced
66        # from a spawned Python process.
67        actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
68        self.assertEqual(actual, expected, msg)
69
70
71class PopenTestException(Exception):
72    pass
73
74
75class PopenExecuteChildRaises(subprocess.Popen):
76    """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
77    _execute_child fails.
78    """
79    def _execute_child(self, *args, **kwargs):
80        raise PopenTestException("Forced Exception for Test")
81
82
83class ProcessTestCase(BaseTestCase):
84
85    def test_call_seq(self):
86        # call() function with sequence argument
87        rc = subprocess.call([sys.executable, "-c",
88                              "import sys; sys.exit(47)"])
89        self.assertEqual(rc, 47)
90
91    def test_check_call_zero(self):
92        # check_call() function with zero return code
93        rc = subprocess.check_call([sys.executable, "-c",
94                                    "import sys; sys.exit(0)"])
95        self.assertEqual(rc, 0)
96
97    def test_check_call_nonzero(self):
98        # check_call() function with non-zero return code
99        with self.assertRaises(subprocess.CalledProcessError) as c:
100            subprocess.check_call([sys.executable, "-c",
101                                   "import sys; sys.exit(47)"])
102        self.assertEqual(c.exception.returncode, 47)
103
104    def test_check_output(self):
105        # check_output() function with zero return code
106        output = subprocess.check_output(
107                [sys.executable, "-c", "print 'BDFL'"])
108        self.assertIn('BDFL', output)
109
110    def test_check_output_nonzero(self):
111        # check_call() function with non-zero return code
112        with self.assertRaises(subprocess.CalledProcessError) as c:
113            subprocess.check_output(
114                    [sys.executable, "-c", "import sys; sys.exit(5)"])
115        self.assertEqual(c.exception.returncode, 5)
116
117    def test_check_output_stderr(self):
118        # check_output() function stderr redirected to stdout
119        output = subprocess.check_output(
120                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
121                stderr=subprocess.STDOUT)
122        self.assertIn('BDFL', output)
123
124    def test_check_output_stdout_arg(self):
125        # check_output() function stderr redirected to stdout
126        with self.assertRaises(ValueError) as c:
127            output = subprocess.check_output(
128                    [sys.executable, "-c", "print 'will not be run'"],
129                    stdout=sys.stdout)
130            self.fail("Expected ValueError when stdout arg supplied.")
131        self.assertIn('stdout', c.exception.args[0])
132
133    def test_call_kwargs(self):
134        # call() function with keyword args
135        newenv = os.environ.copy()
136        newenv["FRUIT"] = "banana"
137        rc = subprocess.call([sys.executable, "-c",
138                              'import sys, os;'
139                              'sys.exit(os.getenv("FRUIT")=="banana")'],
140                             env=newenv)
141        self.assertEqual(rc, 1)
142
143    def test_invalid_args(self):
144        # Popen() called with invalid arguments should raise TypeError
145        # but Popen.__del__ should not complain (issue #12085)
146        with test_support.captured_stderr() as s:
147            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
148            argcount = subprocess.Popen.__init__.__code__.co_argcount
149            too_many_args = [0] * (argcount + 1)
150            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
151        self.assertEqual(s.getvalue(), '')
152
153    def test_stdin_none(self):
154        # .stdin is None when not redirected
155        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
156                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
157        self.addCleanup(p.stdout.close)
158        self.addCleanup(p.stderr.close)
159        p.wait()
160        self.assertEqual(p.stdin, None)
161
162    def test_stdout_none(self):
163        # .stdout is None when not redirected, and the child's stdout will
164        # be inherited from the parent.  In order to test this we run a
165        # subprocess in a subprocess:
166        # this_test
167        #   \-- subprocess created by this test (parent)
168        #          \-- subprocess created by the parent subprocess (child)
169        # The parent doesn't specify stdout, so the child will use the
170        # parent's stdout.  This test checks that the message printed by the
171        # child goes to the parent stdout.  The parent also checks that the
172        # child's stdout is None.  See #11963.
173        code = ('import sys; from subprocess import Popen, PIPE;'
174                'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],'
175                '          stdin=PIPE, stderr=PIPE);'
176                'p.wait(); assert p.stdout is None;')
177        p = subprocess.Popen([sys.executable, "-c", code],
178                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
179        self.addCleanup(p.stdout.close)
180        self.addCleanup(p.stderr.close)
181        out, err = p.communicate()
182        self.assertEqual(p.returncode, 0, err)
183        self.assertEqual(out.rstrip(), 'test_stdout_none')
184
185    def test_stderr_none(self):
186        # .stderr is None when not redirected
187        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
188                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
189        self.addCleanup(p.stdout.close)
190        self.addCleanup(p.stdin.close)
191        p.wait()
192        self.assertEqual(p.stderr, None)
193
194    def test_executable_with_cwd(self):
195        python_dir = os.path.dirname(os.path.realpath(sys.executable))
196        p = subprocess.Popen(["somethingyoudonthave", "-c",
197                              "import sys; sys.exit(47)"],
198                             executable=sys.executable, cwd=python_dir)
199        p.wait()
200        self.assertEqual(p.returncode, 47)
201
202    @unittest.skipIf(sysconfig.is_python_build(),
203                     "need an installed Python. See #7774")
204    def test_executable_without_cwd(self):
205        # For a normal installation, it should work without 'cwd'
206        # argument.  For test runs in the build directory, see #7774.
207        p = subprocess.Popen(["somethingyoudonthave", "-c",
208                              "import sys; sys.exit(47)"],
209                             executable=sys.executable)
210        p.wait()
211        self.assertEqual(p.returncode, 47)
212
213    def test_stdin_pipe(self):
214        # stdin redirection
215        p = subprocess.Popen([sys.executable, "-c",
216                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
217                        stdin=subprocess.PIPE)
218        p.stdin.write("pear")
219        p.stdin.close()
220        p.wait()
221        self.assertEqual(p.returncode, 1)
222
223    def test_stdin_filedes(self):
224        # stdin is set to open file descriptor
225        tf = tempfile.TemporaryFile()
226        d = tf.fileno()
227        os.write(d, "pear")
228        os.lseek(d, 0, 0)
229        p = subprocess.Popen([sys.executable, "-c",
230                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
231                         stdin=d)
232        p.wait()
233        self.assertEqual(p.returncode, 1)
234
235    def test_stdin_fileobj(self):
236        # stdin is set to open file object
237        tf = tempfile.TemporaryFile()
238        tf.write("pear")
239        tf.seek(0)
240        p = subprocess.Popen([sys.executable, "-c",
241                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
242                         stdin=tf)
243        p.wait()
244        self.assertEqual(p.returncode, 1)
245
246    def test_stdout_pipe(self):
247        # stdout redirection
248        p = subprocess.Popen([sys.executable, "-c",
249                          'import sys; sys.stdout.write("orange")'],
250                         stdout=subprocess.PIPE)
251        self.addCleanup(p.stdout.close)
252        self.assertEqual(p.stdout.read(), "orange")
253
254    def test_stdout_filedes(self):
255        # stdout is set to open file descriptor
256        tf = tempfile.TemporaryFile()
257        d = tf.fileno()
258        p = subprocess.Popen([sys.executable, "-c",
259                          'import sys; sys.stdout.write("orange")'],
260                         stdout=d)
261        p.wait()
262        os.lseek(d, 0, 0)
263        self.assertEqual(os.read(d, 1024), "orange")
264
265    def test_stdout_fileobj(self):
266        # stdout is set to open file object
267        tf = tempfile.TemporaryFile()
268        p = subprocess.Popen([sys.executable, "-c",
269                          'import sys; sys.stdout.write("orange")'],
270                         stdout=tf)
271        p.wait()
272        tf.seek(0)
273        self.assertEqual(tf.read(), "orange")
274
275    def test_stderr_pipe(self):
276        # stderr redirection
277        p = subprocess.Popen([sys.executable, "-c",
278                          'import sys; sys.stderr.write("strawberry")'],
279                         stderr=subprocess.PIPE)
280        self.addCleanup(p.stderr.close)
281        self.assertStderrEqual(p.stderr.read(), "strawberry")
282
283    def test_stderr_filedes(self):
284        # stderr is set to open file descriptor
285        tf = tempfile.TemporaryFile()
286        d = tf.fileno()
287        p = subprocess.Popen([sys.executable, "-c",
288                          'import sys; sys.stderr.write("strawberry")'],
289                         stderr=d)
290        p.wait()
291        os.lseek(d, 0, 0)
292        self.assertStderrEqual(os.read(d, 1024), "strawberry")
293
294    def test_stderr_fileobj(self):
295        # stderr is set to open file object
296        tf = tempfile.TemporaryFile()
297        p = subprocess.Popen([sys.executable, "-c",
298                          'import sys; sys.stderr.write("strawberry")'],
299                         stderr=tf)
300        p.wait()
301        tf.seek(0)
302        self.assertStderrEqual(tf.read(), "strawberry")
303
304    def test_stderr_redirect_with_no_stdout_redirect(self):
305        # test stderr=STDOUT while stdout=None (not set)
306
307        # - grandchild prints to stderr
308        # - child redirects grandchild's stderr to its stdout
309        # - the parent should get grandchild's stderr in child's stdout
310        p = subprocess.Popen([sys.executable, "-c",
311                              'import sys, subprocess;'
312                              'rc = subprocess.call([sys.executable, "-c",'
313                              '    "import sys;"'
314                              '    "sys.stderr.write(\'42\')"],'
315                              '    stderr=subprocess.STDOUT);'
316                              'sys.exit(rc)'],
317                             stdout=subprocess.PIPE,
318                             stderr=subprocess.PIPE)
319        stdout, stderr = p.communicate()
320        #NOTE: stdout should get stderr from grandchild
321        self.assertStderrEqual(stdout, b'42')
322        self.assertStderrEqual(stderr, b'') # should be empty
323        self.assertEqual(p.returncode, 0)
324
325    def test_stdout_stderr_pipe(self):
326        # capture stdout and stderr to the same pipe
327        p = subprocess.Popen([sys.executable, "-c",
328                          'import sys;'
329                          'sys.stdout.write("apple");'
330                          'sys.stdout.flush();'
331                          'sys.stderr.write("orange")'],
332                         stdout=subprocess.PIPE,
333                         stderr=subprocess.STDOUT)
334        self.addCleanup(p.stdout.close)
335        self.assertStderrEqual(p.stdout.read(), "appleorange")
336
337    def test_stdout_stderr_file(self):
338        # capture stdout and stderr to the same open file
339        tf = tempfile.TemporaryFile()
340        p = subprocess.Popen([sys.executable, "-c",
341                          'import sys;'
342                          'sys.stdout.write("apple");'
343                          'sys.stdout.flush();'
344                          'sys.stderr.write("orange")'],
345                         stdout=tf,
346                         stderr=tf)
347        p.wait()
348        tf.seek(0)
349        self.assertStderrEqual(tf.read(), "appleorange")
350
351    def test_stdout_filedes_of_stdout(self):
352        # stdout is set to 1 (#1531862).
353        # To avoid printing the text on stdout, we do something similar to
354        # test_stdout_none (see above).  The parent subprocess calls the child
355        # subprocess passing stdout=1, and this test uses stdout=PIPE in
356        # order to capture and check the output of the parent. See #11963.
357        code = ('import sys, subprocess; '
358                'rc = subprocess.call([sys.executable, "-c", '
359                '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
360                     '\'test with stdout=1\'))"], stdout=1); '
361                'assert rc == 18')
362        p = subprocess.Popen([sys.executable, "-c", code],
363                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
364        self.addCleanup(p.stdout.close)
365        self.addCleanup(p.stderr.close)
366        out, err = p.communicate()
367        self.assertEqual(p.returncode, 0, err)
368        self.assertEqual(out.rstrip(), 'test with stdout=1')
369
370    def test_cwd(self):
371        tmpdir = tempfile.gettempdir()
372        # We cannot use os.path.realpath to canonicalize the path,
373        # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
374        cwd = os.getcwd()
375        os.chdir(tmpdir)
376        tmpdir = os.getcwd()
377        os.chdir(cwd)
378        p = subprocess.Popen([sys.executable, "-c",
379                          'import sys,os;'
380                          'sys.stdout.write(os.getcwd())'],
381                         stdout=subprocess.PIPE,
382                         cwd=tmpdir)
383        self.addCleanup(p.stdout.close)
384        normcase = os.path.normcase
385        self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
386
387    def test_env(self):
388        newenv = os.environ.copy()
389        newenv["FRUIT"] = "orange"
390        p = subprocess.Popen([sys.executable, "-c",
391                          'import sys,os;'
392                          'sys.stdout.write(os.getenv("FRUIT"))'],
393                         stdout=subprocess.PIPE,
394                         env=newenv)
395        self.addCleanup(p.stdout.close)
396        self.assertEqual(p.stdout.read(), "orange")
397
398    def test_invalid_cmd(self):
399        # null character in the command name
400        cmd = sys.executable + '\0'
401        with self.assertRaises(TypeError):
402            subprocess.Popen([cmd, "-c", "pass"])
403
404        # null character in the command argument
405        with self.assertRaises(TypeError):
406            subprocess.Popen([sys.executable, "-c", "pass#\0"])
407
408    def test_invalid_env(self):
409        # null character in the enviroment variable name
410        newenv = os.environ.copy()
411        newenv["FRUIT\0VEGETABLE"] = "cabbage"
412        with self.assertRaises(TypeError):
413            subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
414
415        # null character in the enviroment variable value
416        newenv = os.environ.copy()
417        newenv["FRUIT"] = "orange\0VEGETABLE=cabbage"
418        with self.assertRaises(TypeError):
419            subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
420
421        # equal character in the enviroment variable name
422        newenv = os.environ.copy()
423        newenv["FRUIT=ORANGE"] = "lemon"
424        with self.assertRaises(ValueError):
425            subprocess.Popen([sys.executable, "-c", "pass"], env=newenv)
426
427        # equal character in the enviroment variable value
428        newenv = os.environ.copy()
429        newenv["FRUIT"] = "orange=lemon"
430        p = subprocess.Popen([sys.executable, "-c",
431                              'import sys, os;'
432                              'sys.stdout.write(os.getenv("FRUIT"))'],
433                             stdout=subprocess.PIPE,
434                             env=newenv)
435        stdout, stderr = p.communicate()
436        self.assertEqual(stdout, "orange=lemon")
437
438    def test_communicate_stdin(self):
439        p = subprocess.Popen([sys.executable, "-c",
440                              'import sys;'
441                              'sys.exit(sys.stdin.read() == "pear")'],
442                             stdin=subprocess.PIPE)
443        p.communicate("pear")
444        self.assertEqual(p.returncode, 1)
445
446    def test_communicate_stdout(self):
447        p = subprocess.Popen([sys.executable, "-c",
448                              'import sys; sys.stdout.write("pineapple")'],
449                             stdout=subprocess.PIPE)
450        (stdout, stderr) = p.communicate()
451        self.assertEqual(stdout, "pineapple")
452        self.assertEqual(stderr, None)
453
454    def test_communicate_stderr(self):
455        p = subprocess.Popen([sys.executable, "-c",
456                              'import sys; sys.stderr.write("pineapple")'],
457                             stderr=subprocess.PIPE)
458        (stdout, stderr) = p.communicate()
459        self.assertEqual(stdout, None)
460        self.assertStderrEqual(stderr, "pineapple")
461
462    def test_communicate(self):
463        p = subprocess.Popen([sys.executable, "-c",
464                          'import sys,os;'
465                          'sys.stderr.write("pineapple");'
466                          'sys.stdout.write(sys.stdin.read())'],
467                         stdin=subprocess.PIPE,
468                         stdout=subprocess.PIPE,
469                         stderr=subprocess.PIPE)
470        self.addCleanup(p.stdout.close)
471        self.addCleanup(p.stderr.close)
472        self.addCleanup(p.stdin.close)
473        (stdout, stderr) = p.communicate("banana")
474        self.assertEqual(stdout, "banana")
475        self.assertStderrEqual(stderr, "pineapple")
476
477    # This test is Linux specific for simplicity to at least have
478    # some coverage.  It is not a platform specific bug.
479    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
480                         "Linux specific")
481    # Test for the fd leak reported in http://bugs.python.org/issue2791.
482    def test_communicate_pipe_fd_leak(self):
483        fd_directory = '/proc/%d/fd' % os.getpid()
484        num_fds_before_popen = len(os.listdir(fd_directory))
485        p = subprocess.Popen([sys.executable, "-c", "print('')"],
486                             stdout=subprocess.PIPE)
487        p.communicate()
488        num_fds_after_communicate = len(os.listdir(fd_directory))
489        del p
490        num_fds_after_destruction = len(os.listdir(fd_directory))
491        self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
492        self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
493
494    def test_communicate_returns(self):
495        # communicate() should return None if no redirection is active
496        p = subprocess.Popen([sys.executable, "-c",
497                              "import sys; sys.exit(47)"])
498        (stdout, stderr) = p.communicate()
499        self.assertEqual(stdout, None)
500        self.assertEqual(stderr, None)
501
502    def test_communicate_pipe_buf(self):
503        # communicate() with writes larger than pipe_buf
504        # This test will probably deadlock rather than fail, if
505        # communicate() does not work properly.
506        x, y = os.pipe()
507        if mswindows:
508            pipe_buf = 512
509        else:
510            pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
511        os.close(x)
512        os.close(y)
513        p = subprocess.Popen([sys.executable, "-c",
514                          'import sys,os;'
515                          'sys.stdout.write(sys.stdin.read(47));'
516                          'sys.stderr.write("xyz"*%d);'
517                          'sys.stdout.write(sys.stdin.read())' % pipe_buf],
518                         stdin=subprocess.PIPE,
519                         stdout=subprocess.PIPE,
520                         stderr=subprocess.PIPE)
521        self.addCleanup(p.stdout.close)
522        self.addCleanup(p.stderr.close)
523        self.addCleanup(p.stdin.close)
524        string_to_write = "abc"*pipe_buf
525        (stdout, stderr) = p.communicate(string_to_write)
526        self.assertEqual(stdout, string_to_write)
527
528    def test_writes_before_communicate(self):
529        # stdin.write before communicate()
530        p = subprocess.Popen([sys.executable, "-c",
531                          'import sys,os;'
532                          'sys.stdout.write(sys.stdin.read())'],
533                         stdin=subprocess.PIPE,
534                         stdout=subprocess.PIPE,
535                         stderr=subprocess.PIPE)
536        self.addCleanup(p.stdout.close)
537        self.addCleanup(p.stderr.close)
538        self.addCleanup(p.stdin.close)
539        p.stdin.write("banana")
540        (stdout, stderr) = p.communicate("split")
541        self.assertEqual(stdout, "bananasplit")
542        self.assertStderrEqual(stderr, "")
543
544    def test_universal_newlines(self):
545        p = subprocess.Popen([sys.executable, "-c",
546                          'import sys,os;' + SETBINARY +
547                          'sys.stdout.write("line1\\n");'
548                          'sys.stdout.flush();'
549                          'sys.stdout.write("line2\\r");'
550                          'sys.stdout.flush();'
551                          'sys.stdout.write("line3\\r\\n");'
552                          'sys.stdout.flush();'
553                          'sys.stdout.write("line4\\r");'
554                          'sys.stdout.flush();'
555                          'sys.stdout.write("\\nline5");'
556                          'sys.stdout.flush();'
557                          'sys.stdout.write("\\nline6");'],
558                         stdout=subprocess.PIPE,
559                         universal_newlines=1)
560        self.addCleanup(p.stdout.close)
561        stdout = p.stdout.read()
562        if hasattr(file, 'newlines'):
563            # Interpreter with universal newline support
564            self.assertEqual(stdout,
565                             "line1\nline2\nline3\nline4\nline5\nline6")
566        else:
567            # Interpreter without universal newline support
568            self.assertEqual(stdout,
569                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")
570
571    def test_universal_newlines_communicate(self):
572        # universal newlines through communicate()
573        p = subprocess.Popen([sys.executable, "-c",
574                          'import sys,os;' + SETBINARY +
575                          'sys.stdout.write("line1\\n");'
576                          'sys.stdout.flush();'
577                          'sys.stdout.write("line2\\r");'
578                          'sys.stdout.flush();'
579                          'sys.stdout.write("line3\\r\\n");'
580                          'sys.stdout.flush();'
581                          'sys.stdout.write("line4\\r");'
582                          'sys.stdout.flush();'
583                          'sys.stdout.write("\\nline5");'
584                          'sys.stdout.flush();'
585                          'sys.stdout.write("\\nline6");'],
586                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
587                         universal_newlines=1)
588        self.addCleanup(p.stdout.close)
589        self.addCleanup(p.stderr.close)
590        (stdout, stderr) = p.communicate()
591        if hasattr(file, 'newlines'):
592            # Interpreter with universal newline support
593            self.assertEqual(stdout,
594                             "line1\nline2\nline3\nline4\nline5\nline6")
595        else:
596            # Interpreter without universal newline support
597            self.assertEqual(stdout,
598                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")
599
600    def test_no_leaking(self):
601        # Make sure we leak no resources
602        if not mswindows:
603            max_handles = 1026 # too much for most UNIX systems
604        else:
605            max_handles = 2050 # too much for (at least some) Windows setups
606        handles = []
607        try:
608            for i in range(max_handles):
609                try:
610                    handles.append(os.open(test_support.TESTFN,
611                                           os.O_WRONLY | os.O_CREAT))
612                except OSError as e:
613                    if e.errno != errno.EMFILE:
614                        raise
615                    break
616            else:
617                self.skipTest("failed to reach the file descriptor limit "
618                    "(tried %d)" % max_handles)
619            # Close a couple of them (should be enough for a subprocess)
620            for i in range(10):
621                os.close(handles.pop())
622            # Loop creating some subprocesses. If one of them leaks some fds,
623            # the next loop iteration will fail by reaching the max fd limit.
624            for i in range(15):
625                p = subprocess.Popen([sys.executable, "-c",
626                                      "import sys;"
627                                      "sys.stdout.write(sys.stdin.read())"],
628                                     stdin=subprocess.PIPE,
629                                     stdout=subprocess.PIPE,
630                                     stderr=subprocess.PIPE)
631                data = p.communicate(b"lime")[0]
632                self.assertEqual(data, b"lime")
633        finally:
634            for h in handles:
635                os.close(h)
636            test_support.unlink(test_support.TESTFN)
637
638    def test_list2cmdline(self):
639        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
640                         '"a b c" d e')
641        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
642                         'ab\\"c \\ d')
643        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
644                         'ab\\"c " \\\\" d')
645        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
646                         'a\\\\\\b "de fg" h')
647        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
648                         'a\\\\\\"b c d')
649        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
650                         '"a\\\\b c" d e')
651        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
652                         '"a\\\\b\\ c" d e')
653        self.assertEqual(subprocess.list2cmdline(['ab', '']),
654                         'ab ""')
655
656
657    def test_poll(self):
658        p = subprocess.Popen([sys.executable,
659                          "-c", "import time; time.sleep(1)"])
660        count = 0
661        while p.poll() is None:
662            time.sleep(0.1)
663            count += 1
664        # We expect that the poll loop probably went around about 10 times,
665        # but, based on system scheduling we can't control, it's possible
666        # poll() never returned None.  It "should be" very rare that it
667        # didn't go around at least twice.
668        self.assertGreaterEqual(count, 2)
669        # Subsequent invocations should just return the returncode
670        self.assertEqual(p.poll(), 0)
671
672
673    def test_wait(self):
674        p = subprocess.Popen([sys.executable,
675                          "-c", "import time; time.sleep(2)"])
676        self.assertEqual(p.wait(), 0)
677        # Subsequent invocations should just return the returncode
678        self.assertEqual(p.wait(), 0)
679
680
681    def test_invalid_bufsize(self):
682        # an invalid type of the bufsize argument should raise
683        # TypeError.
684        with self.assertRaises(TypeError):
685            subprocess.Popen([sys.executable, "-c", "pass"], "orange")
686
687    def test_leaking_fds_on_error(self):
688        # see bug #5179: Popen leaks file descriptors to PIPEs if
689        # the child fails to execute; this will eventually exhaust
690        # the maximum number of open fds. 1024 seems a very common
691        # value for that limit, but Windows has 2048, so we loop
692        # 1024 times (each call leaked two fds).
693        for i in range(1024):
694            # Windows raises IOError.  Others raise OSError.
695            with self.assertRaises(EnvironmentError) as c:
696                subprocess.Popen(['nonexisting_i_hope'],
697                                 stdout=subprocess.PIPE,
698                                 stderr=subprocess.PIPE)
699            # ignore errors that indicate the command was not found
700            if c.exception.errno not in (errno.ENOENT, errno.ENOTDIR, errno.EACCES):
701                raise c.exception
702
703    @unittest.skipIf(threading is None, "threading required")
704    def test_double_close_on_error(self):
705        # Issue #18851
706        fds = []
707        def open_fds():
708            for i in range(20):
709                fds.extend(os.pipe())
710                time.sleep(0.001)
711        t = threading.Thread(target=open_fds)
712        t.start()
713        try:
714            with self.assertRaises(EnvironmentError):
715                subprocess.Popen(['nonexisting_i_hope'],
716                                 stdin=subprocess.PIPE,
717                                 stdout=subprocess.PIPE,
718                                 stderr=subprocess.PIPE)
719        finally:
720            t.join()
721            exc = None
722            for fd in fds:
723                # If a double close occurred, some of those fds will
724                # already have been closed by mistake, and os.close()
725                # here will raise.
726                try:
727                    os.close(fd)
728                except OSError as e:
729                    exc = e
730            if exc is not None:
731                raise exc
732
733    def test_handles_closed_on_exception(self):
734        # If CreateProcess exits with an error, ensure the
735        # duplicate output handles are released
736        ifhandle, ifname = tempfile.mkstemp()
737        ofhandle, ofname = tempfile.mkstemp()
738        efhandle, efname = tempfile.mkstemp()
739        try:
740            subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
741              stderr=efhandle)
742        except OSError:
743            os.close(ifhandle)
744            os.remove(ifname)
745            os.close(ofhandle)
746            os.remove(ofname)
747            os.close(efhandle)
748            os.remove(efname)
749        self.assertFalse(os.path.exists(ifname))
750        self.assertFalse(os.path.exists(ofname))
751        self.assertFalse(os.path.exists(efname))
752
753    def test_communicate_epipe(self):
754        # Issue 10963: communicate() should hide EPIPE
755        p = subprocess.Popen([sys.executable, "-c", 'pass'],
756                             stdin=subprocess.PIPE,
757                             stdout=subprocess.PIPE,
758                             stderr=subprocess.PIPE)
759        self.addCleanup(p.stdout.close)
760        self.addCleanup(p.stderr.close)
761        self.addCleanup(p.stdin.close)
762        p.communicate("x" * 2**20)
763
764    def test_communicate_epipe_only_stdin(self):
765        # Issue 10963: communicate() should hide EPIPE
766        p = subprocess.Popen([sys.executable, "-c", 'pass'],
767                             stdin=subprocess.PIPE)
768        self.addCleanup(p.stdin.close)
769        time.sleep(2)
770        p.communicate("x" * 2**20)
771
772    # This test is Linux-ish specific for simplicity to at least have
773    # some coverage.  It is not a platform specific bug.
774    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
775                         "Linux specific")
776    def test_failed_child_execute_fd_leak(self):
777        """Test for the fork() failure fd leak reported in issue16327."""
778        fd_directory = '/proc/%d/fd' % os.getpid()
779        fds_before_popen = os.listdir(fd_directory)
780        with self.assertRaises(PopenTestException):
781            PopenExecuteChildRaises(
782                    [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
783                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
784
785        # NOTE: This test doesn't verify that the real _execute_child
786        # does not close the file descriptors itself on the way out
787        # during an exception.  Code inspection has confirmed that.
788
789        fds_after_exception = os.listdir(fd_directory)
790        self.assertEqual(fds_before_popen, fds_after_exception)
791
792
793# context manager
794class _SuppressCoreFiles(object):
795    """Try to prevent core files from being created."""
796    old_limit = None
797
798    def __enter__(self):
799        """Try to save previous ulimit, then set it to (0, 0)."""
800        if resource is not None:
801            try:
802                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
803                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
804            except (ValueError, resource.error):
805                pass
806
807        if sys.platform == 'darwin':
808            # Check if the 'Crash Reporter' on OSX was configured
809            # in 'Developer' mode and warn that it will get triggered
810            # when it is.
811            #
812            # This assumes that this context manager is used in tests
813            # that might trigger the next manager.
814            value = subprocess.Popen(['/usr/bin/defaults', 'read',
815                    'com.apple.CrashReporter', 'DialogType'],
816                    stdout=subprocess.PIPE).communicate()[0]
817            if value.strip() == b'developer':
818                print "this tests triggers the Crash Reporter, that is intentional"
819                sys.stdout.flush()
820
821    def __exit__(self, *args):
822        """Return core file behavior to default."""
823        if self.old_limit is None:
824            return
825        if resource is not None:
826            try:
827                resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
828            except (ValueError, resource.error):
829                pass
830
831    @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
832                         "Requires signal.SIGALRM")
833    def test_communicate_eintr(self):
834        # Issue #12493: communicate() should handle EINTR
835        def handler(signum, frame):
836            pass
837        old_handler = signal.signal(signal.SIGALRM, handler)
838        self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
839
840        # the process is running for 2 seconds
841        args = [sys.executable, "-c", 'import time; time.sleep(2)']
842        for stream in ('stdout', 'stderr'):
843            kw = {stream: subprocess.PIPE}
844            with subprocess.Popen(args, **kw) as process:
845                signal.alarm(1)
846                try:
847                    # communicate() will be interrupted by SIGALRM
848                    process.communicate()
849                finally:
850                    signal.alarm(0)
851
852
853@unittest.skipIf(mswindows, "POSIX specific tests")
854class POSIXProcessTestCase(BaseTestCase):
855
856    def test_exceptions(self):
857        # caught & re-raised exceptions
858        with self.assertRaises(OSError) as c:
859            p = subprocess.Popen([sys.executable, "-c", ""],
860                                 cwd="/this/path/does/not/exist")
861        # The attribute child_traceback should contain "os.chdir" somewhere.
862        self.assertIn("os.chdir", c.exception.child_traceback)
863
864    def test_run_abort(self):
865        # returncode handles signal termination
866        with _SuppressCoreFiles():
867            p = subprocess.Popen([sys.executable, "-c",
868                                  "import os; os.abort()"])
869            p.wait()
870        self.assertEqual(-p.returncode, signal.SIGABRT)
871
872    def test_preexec(self):
873        # preexec function
874        p = subprocess.Popen([sys.executable, "-c",
875                              "import sys, os;"
876                              "sys.stdout.write(os.getenv('FRUIT'))"],
877                             stdout=subprocess.PIPE,
878                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
879        self.addCleanup(p.stdout.close)
880        self.assertEqual(p.stdout.read(), "apple")
881
882    class _TestExecuteChildPopen(subprocess.Popen):
883        """Used to test behavior at the end of _execute_child."""
884        def __init__(self, testcase, *args, **kwargs):
885            self._testcase = testcase
886            subprocess.Popen.__init__(self, *args, **kwargs)
887
888        def _execute_child(
889                self, args, executable, preexec_fn, close_fds, cwd, env,
890                universal_newlines, startupinfo, creationflags, shell, to_close,
891                p2cread, p2cwrite,
892                c2pread, c2pwrite,
893                errread, errwrite):
894            try:
895                subprocess.Popen._execute_child(
896                        self, args, executable, preexec_fn, close_fds,
897                        cwd, env, universal_newlines,
898                        startupinfo, creationflags, shell, to_close,
899                        p2cread, p2cwrite,
900                        c2pread, c2pwrite,
901                        errread, errwrite)
902            finally:
903                # Open a bunch of file descriptors and verify that
904                # none of them are the same as the ones the Popen
905                # instance is using for stdin/stdout/stderr.
906                devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
907                               for _ in range(8)]
908                try:
909                    for fd in devzero_fds:
910                        self._testcase.assertNotIn(
911                                fd, (p2cwrite, c2pread, errread))
912                finally:
913                    for fd in devzero_fds:
914                        os.close(fd)
915
916    @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
917    def test_preexec_errpipe_does_not_double_close_pipes(self):
918        """Issue16140: Don't double close pipes on preexec error."""
919
920        def raise_it():
921            raise RuntimeError("force the _execute_child() errpipe_data path.")
922
923        with self.assertRaises(RuntimeError):
924            self._TestExecuteChildPopen(
925                    self, [sys.executable, "-c", "pass"],
926                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
927                    stderr=subprocess.PIPE, preexec_fn=raise_it)
928
929    def test_args_string(self):
930        # args is a string
931        f, fname = tempfile.mkstemp()
932        os.write(f, "#!/bin/sh\n")
933        os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
934                    sys.executable)
935        os.close(f)
936        os.chmod(fname, 0o700)
937        p = subprocess.Popen(fname)
938        p.wait()
939        os.remove(fname)
940        self.assertEqual(p.returncode, 47)
941
942    def test_invalid_args(self):
943        # invalid arguments should raise ValueError
944        self.assertRaises(ValueError, subprocess.call,
945                          [sys.executable, "-c",
946                           "import sys; sys.exit(47)"],
947                          startupinfo=47)
948        self.assertRaises(ValueError, subprocess.call,
949                          [sys.executable, "-c",
950                           "import sys; sys.exit(47)"],
951                          creationflags=47)
952
953    def test_shell_sequence(self):
954        # Run command through the shell (sequence)
955        newenv = os.environ.copy()
956        newenv["FRUIT"] = "apple"
957        p = subprocess.Popen(["echo $FRUIT"], shell=1,
958                             stdout=subprocess.PIPE,
959                             env=newenv)
960        self.addCleanup(p.stdout.close)
961        self.assertEqual(p.stdout.read().strip(), "apple")
962
963    def test_shell_string(self):
964        # Run command through the shell (string)
965        newenv = os.environ.copy()
966        newenv["FRUIT"] = "apple"
967        p = subprocess.Popen("echo $FRUIT", shell=1,
968                             stdout=subprocess.PIPE,
969                             env=newenv)
970        self.addCleanup(p.stdout.close)
971        self.assertEqual(p.stdout.read().strip(), "apple")
972
973    def test_call_string(self):
974        # call() function with string argument on UNIX
975        f, fname = tempfile.mkstemp()
976        os.write(f, "#!/bin/sh\n")
977        os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
978                    sys.executable)
979        os.close(f)
980        os.chmod(fname, 0700)
981        rc = subprocess.call(fname)
982        os.remove(fname)
983        self.assertEqual(rc, 47)
984
985    def test_specific_shell(self):
986        # Issue #9265: Incorrect name passed as arg[0].
987        shells = []
988        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
989            for name in ['bash', 'ksh']:
990                sh = os.path.join(prefix, name)
991                if os.path.isfile(sh):
992                    shells.append(sh)
993        if not shells: # Will probably work for any shell but csh.
994            self.skipTest("bash or ksh required for this test")
995        sh = '/bin/sh'
996        if os.path.isfile(sh) and not os.path.islink(sh):
997            # Test will fail if /bin/sh is a symlink to csh.
998            shells.append(sh)
999        for sh in shells:
1000            p = subprocess.Popen("echo $0", executable=sh, shell=True,
1001                                 stdout=subprocess.PIPE)
1002            self.addCleanup(p.stdout.close)
1003            self.assertEqual(p.stdout.read().strip(), sh)
1004
1005    def _kill_process(self, method, *args):
1006        # Do not inherit file handles from the parent.
1007        # It should fix failures on some platforms.
1008        p = subprocess.Popen([sys.executable, "-c", """if 1:
1009                             import sys, time
1010                             sys.stdout.write('x\\n')
1011                             sys.stdout.flush()
1012                             time.sleep(30)
1013                             """],
1014                             close_fds=True,
1015                             stdin=subprocess.PIPE,
1016                             stdout=subprocess.PIPE,
1017                             stderr=subprocess.PIPE)
1018        # Wait for the interpreter to be completely initialized before
1019        # sending any signal.
1020        p.stdout.read(1)
1021        getattr(p, method)(*args)
1022        return p
1023
1024    @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
1025                     "Due to known OS bug (issue #16762)")
1026    def _kill_dead_process(self, method, *args):
1027        # Do not inherit file handles from the parent.
1028        # It should fix failures on some platforms.
1029        p = subprocess.Popen([sys.executable, "-c", """if 1:
1030                             import sys, time
1031                             sys.stdout.write('x\\n')
1032                             sys.stdout.flush()
1033                             """],
1034                             close_fds=True,
1035                             stdin=subprocess.PIPE,
1036                             stdout=subprocess.PIPE,
1037                             stderr=subprocess.PIPE)
1038        # Wait for the interpreter to be completely initialized before
1039        # sending any signal.
1040        p.stdout.read(1)
1041        # The process should end after this
1042        time.sleep(1)
1043        # This shouldn't raise even though the child is now dead
1044        getattr(p, method)(*args)
1045        p.communicate()
1046
1047    def test_send_signal(self):
1048        p = self._kill_process('send_signal', signal.SIGINT)
1049        _, stderr = p.communicate()
1050        self.assertIn('KeyboardInterrupt', stderr)
1051        self.assertNotEqual(p.wait(), 0)
1052
1053    def test_kill(self):
1054        p = self._kill_process('kill')
1055        _, stderr = p.communicate()
1056        self.assertStderrEqual(stderr, '')
1057        self.assertEqual(p.wait(), -signal.SIGKILL)
1058
1059    def test_terminate(self):
1060        p = self._kill_process('terminate')
1061        _, stderr = p.communicate()
1062        self.assertStderrEqual(stderr, '')
1063        self.assertEqual(p.wait(), -signal.SIGTERM)
1064
1065    def test_send_signal_dead(self):
1066        # Sending a signal to a dead process
1067        self._kill_dead_process('send_signal', signal.SIGINT)
1068
1069    def test_kill_dead(self):
1070        # Killing a dead process
1071        self._kill_dead_process('kill')
1072
1073    def test_terminate_dead(self):
1074        # Terminating a dead process
1075        self._kill_dead_process('terminate')
1076
1077    def check_close_std_fds(self, fds):
1078        # Issue #9905: test that subprocess pipes still work properly with
1079        # some standard fds closed
1080        stdin = 0
1081        newfds = []
1082        for a in fds:
1083            b = os.dup(a)
1084            newfds.append(b)
1085            if a == 0:
1086                stdin = b
1087        try:
1088            for fd in fds:
1089                os.close(fd)
1090            out, err = subprocess.Popen([sys.executable, "-c",
1091                              'import sys;'
1092                              'sys.stdout.write("apple");'
1093                              'sys.stdout.flush();'
1094                              'sys.stderr.write("orange")'],
1095                       stdin=stdin,
1096                       stdout=subprocess.PIPE,
1097                       stderr=subprocess.PIPE).communicate()
1098            err = test_support.strip_python_stderr(err)
1099            self.assertEqual((out, err), (b'apple', b'orange'))
1100        finally:
1101            for b, a in zip(newfds, fds):
1102                os.dup2(b, a)
1103            for b in newfds:
1104                os.close(b)
1105
1106    def test_close_fd_0(self):
1107        self.check_close_std_fds([0])
1108
1109    def test_close_fd_1(self):
1110        self.check_close_std_fds([1])
1111
1112    def test_close_fd_2(self):
1113        self.check_close_std_fds([2])
1114
1115    def test_close_fds_0_1(self):
1116        self.check_close_std_fds([0, 1])
1117
1118    def test_close_fds_0_2(self):
1119        self.check_close_std_fds([0, 2])
1120
1121    def test_close_fds_1_2(self):
1122        self.check_close_std_fds([1, 2])
1123
1124    def test_close_fds_0_1_2(self):
1125        # Issue #10806: test that subprocess pipes still work properly with
1126        # all standard fds closed.
1127        self.check_close_std_fds([0, 1, 2])
1128
1129    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
1130        # open up some temporary files
1131        temps = [tempfile.mkstemp() for i in range(3)]
1132        temp_fds = [fd for fd, fname in temps]
1133        try:
1134            # unlink the files -- we won't need to reopen them
1135            for fd, fname in temps:
1136                os.unlink(fname)
1137
1138            # save a copy of the standard file descriptors
1139            saved_fds = [os.dup(fd) for fd in range(3)]
1140            try:
1141                # duplicate the temp files over the standard fd's 0, 1, 2
1142                for fd, temp_fd in enumerate(temp_fds):
1143                    os.dup2(temp_fd, fd)
1144
1145                # write some data to what will become stdin, and rewind
1146                os.write(stdin_no, b"STDIN")
1147                os.lseek(stdin_no, 0, 0)
1148
1149                # now use those files in the given order, so that subprocess
1150                # has to rearrange them in the child
1151                p = subprocess.Popen([sys.executable, "-c",
1152                    'import sys; got = sys.stdin.read();'
1153                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
1154                    stdin=stdin_no,
1155                    stdout=stdout_no,
1156                    stderr=stderr_no)
1157                p.wait()
1158
1159                for fd in temp_fds:
1160                    os.lseek(fd, 0, 0)
1161
1162                out = os.read(stdout_no, 1024)
1163                err = test_support.strip_python_stderr(os.read(stderr_no, 1024))
1164            finally:
1165                for std, saved in enumerate(saved_fds):
1166                    os.dup2(saved, std)
1167                    os.close(saved)
1168
1169            self.assertEqual(out, b"got STDIN")
1170            self.assertEqual(err, b"err")
1171
1172        finally:
1173            for fd in temp_fds:
1174                os.close(fd)
1175
1176    # When duping fds, if there arises a situation where one of the fds is
1177    # either 0, 1 or 2, it is possible that it is overwritten (#12607).
1178    # This tests all combinations of this.
1179    def test_swap_fds(self):
1180        self.check_swap_fds(0, 1, 2)
1181        self.check_swap_fds(0, 2, 1)
1182        self.check_swap_fds(1, 0, 2)
1183        self.check_swap_fds(1, 2, 0)
1184        self.check_swap_fds(2, 0, 1)
1185        self.check_swap_fds(2, 1, 0)
1186
1187    def test_wait_when_sigchild_ignored(self):
1188        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
1189        sigchild_ignore = test_support.findfile("sigchild_ignore.py",
1190                                                subdir="subprocessdata")
1191        p = subprocess.Popen([sys.executable, sigchild_ignore],
1192                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1193        stdout, stderr = p.communicate()
1194        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
1195                         " non-zero with this error:\n%s" % stderr)
1196
1197    def test_zombie_fast_process_del(self):
1198        # Issue #12650: on Unix, if Popen.__del__() was called before the
1199        # process exited, it wouldn't be added to subprocess._active, and would
1200        # remain a zombie.
1201        # spawn a Popen, and delete its reference before it exits
1202        p = subprocess.Popen([sys.executable, "-c",
1203                              'import sys, time;'
1204                              'time.sleep(0.2)'],
1205                             stdout=subprocess.PIPE,
1206                             stderr=subprocess.PIPE)
1207        self.addCleanup(p.stdout.close)
1208        self.addCleanup(p.stderr.close)
1209        ident = id(p)
1210        pid = p.pid
1211        del p
1212        # check that p is in the active processes list
1213        self.assertIn(ident, [id(o) for o in subprocess._active])
1214
1215    def test_leak_fast_process_del_killed(self):
1216        # Issue #12650: on Unix, if Popen.__del__() was called before the
1217        # process exited, and the process got killed by a signal, it would never
1218        # be removed from subprocess._active, which triggered a FD and memory
1219        # leak.
1220        # spawn a Popen, delete its reference and kill it
1221        p = subprocess.Popen([sys.executable, "-c",
1222                              'import time;'
1223                              'time.sleep(3)'],
1224                             stdout=subprocess.PIPE,
1225                             stderr=subprocess.PIPE)
1226        self.addCleanup(p.stdout.close)
1227        self.addCleanup(p.stderr.close)
1228        ident = id(p)
1229        pid = p.pid
1230        del p
1231        os.kill(pid, signal.SIGKILL)
1232        # check that p is in the active processes list
1233        self.assertIn(ident, [id(o) for o in subprocess._active])
1234
1235        # let some time for the process to exit, and create a new Popen: this
1236        # should trigger the wait() of p
1237        time.sleep(0.2)
1238        with self.assertRaises(EnvironmentError) as c:
1239            with subprocess.Popen(['nonexisting_i_hope'],
1240                                  stdout=subprocess.PIPE,
1241                                  stderr=subprocess.PIPE) as proc:
1242                pass
1243        # p should have been wait()ed on, and removed from the _active list
1244        self.assertRaises(OSError, os.waitpid, pid, 0)
1245        self.assertNotIn(ident, [id(o) for o in subprocess._active])
1246
1247    def test_pipe_cloexec(self):
1248        # Issue 12786: check that the communication pipes' FDs are set CLOEXEC,
1249        # and are not inherited by another child process.
1250        p1 = subprocess.Popen([sys.executable, "-c",
1251                               'import os;'
1252                               'os.read(0, 1)'
1253                              ],
1254                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1255                              stderr=subprocess.PIPE)
1256
1257        p2 = subprocess.Popen([sys.executable, "-c", """if True:
1258                               import os, errno, sys
1259                               for fd in %r:
1260                                   try:
1261                                       os.close(fd)
1262                                   except OSError as e:
1263                                       if e.errno != errno.EBADF:
1264                                           raise
1265                                   else:
1266                                       sys.exit(1)
1267                               sys.exit(0)
1268                               """ % [f.fileno() for f in (p1.stdin, p1.stdout,
1269                                                           p1.stderr)]
1270                              ],
1271                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1272                              stderr=subprocess.PIPE, close_fds=False)
1273        p1.communicate('foo')
1274        _, stderr = p2.communicate()
1275
1276        self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr))
1277
1278    @unittest.skipUnless(_testcapi is not None
1279                         and hasattr(_testcapi, 'W_STOPCODE'),
1280                         'need _testcapi.W_STOPCODE')
1281    def test_stopped(self):
1282        """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335."""
1283        args = [sys.executable, '-c', 'pass']
1284        proc = subprocess.Popen(args)
1285
1286        # Wait until the real process completes to avoid zombie process
1287        pid = proc.pid
1288        pid, status = os.waitpid(pid, 0)
1289        self.assertEqual(status, 0)
1290
1291        status = _testcapi.W_STOPCODE(3)
1292
1293        def mock_waitpid(pid, flags):
1294            return (pid, status)
1295
1296        with test_support.swap_attr(os, 'waitpid', mock_waitpid):
1297            returncode = proc.wait()
1298
1299        self.assertEqual(returncode, -3)
1300
1301
1302@unittest.skipUnless(mswindows, "Windows specific tests")
1303class Win32ProcessTestCase(BaseTestCase):
1304
1305    def test_startupinfo(self):
1306        # startupinfo argument
1307        # We uses hardcoded constants, because we do not want to
1308        # depend on win32all.
1309        STARTF_USESHOWWINDOW = 1
1310        SW_MAXIMIZE = 3
1311        startupinfo = subprocess.STARTUPINFO()
1312        startupinfo.dwFlags = STARTF_USESHOWWINDOW
1313        startupinfo.wShowWindow = SW_MAXIMIZE
1314        # Since Python is a console process, it won't be affected
1315        # by wShowWindow, but the argument should be silently
1316        # ignored
1317        subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
1318                        startupinfo=startupinfo)
1319
1320    def test_creationflags(self):
1321        # creationflags argument
1322        CREATE_NEW_CONSOLE = 16
1323        sys.stderr.write("    a DOS box should flash briefly ...\n")
1324        subprocess.call(sys.executable +
1325                        ' -c "import time; time.sleep(0.25)"',
1326                        creationflags=CREATE_NEW_CONSOLE)
1327
1328    def test_invalid_args(self):
1329        # invalid arguments should raise ValueError
1330        self.assertRaises(ValueError, subprocess.call,
1331                          [sys.executable, "-c",
1332                           "import sys; sys.exit(47)"],
1333                          preexec_fn=lambda: 1)
1334        self.assertRaises(ValueError, subprocess.call,
1335                          [sys.executable, "-c",
1336                           "import sys; sys.exit(47)"],
1337                          stdout=subprocess.PIPE,
1338                          close_fds=True)
1339
1340    def test_close_fds(self):
1341        # close file descriptors
1342        rc = subprocess.call([sys.executable, "-c",
1343                              "import sys; sys.exit(47)"],
1344                              close_fds=True)
1345        self.assertEqual(rc, 47)
1346
1347    def test_shell_sequence(self):
1348        # Run command through the shell (sequence)
1349        newenv = os.environ.copy()
1350        newenv["FRUIT"] = "physalis"
1351        p = subprocess.Popen(["set"], shell=1,
1352                             stdout=subprocess.PIPE,
1353                             env=newenv)
1354        self.addCleanup(p.stdout.close)
1355        self.assertIn("physalis", p.stdout.read())
1356
1357    def test_shell_string(self):
1358        # Run command through the shell (string)
1359        newenv = os.environ.copy()
1360        newenv["FRUIT"] = "physalis"
1361        p = subprocess.Popen("set", shell=1,
1362                             stdout=subprocess.PIPE,
1363                             env=newenv)
1364        self.addCleanup(p.stdout.close)
1365        self.assertIn("physalis", p.stdout.read())
1366
1367    def test_call_string(self):
1368        # call() function with string argument on Windows
1369        rc = subprocess.call(sys.executable +
1370                             ' -c "import sys; sys.exit(47)"')
1371        self.assertEqual(rc, 47)
1372
1373    def _kill_process(self, method, *args):
1374        # Some win32 buildbot raises EOFError if stdin is inherited
1375        p = subprocess.Popen([sys.executable, "-c", """if 1:
1376                             import sys, time
1377                             sys.stdout.write('x\\n')
1378                             sys.stdout.flush()
1379                             time.sleep(30)
1380                             """],
1381                             stdin=subprocess.PIPE,
1382                             stdout=subprocess.PIPE,
1383                             stderr=subprocess.PIPE)
1384        self.addCleanup(p.stdout.close)
1385        self.addCleanup(p.stderr.close)
1386        self.addCleanup(p.stdin.close)
1387        # Wait for the interpreter to be completely initialized before
1388        # sending any signal.
1389        p.stdout.read(1)
1390        getattr(p, method)(*args)
1391        _, stderr = p.communicate()
1392        self.assertStderrEqual(stderr, '')
1393        returncode = p.wait()
1394        self.assertNotEqual(returncode, 0)
1395
1396    def _kill_dead_process(self, method, *args):
1397        p = subprocess.Popen([sys.executable, "-c", """if 1:
1398                             import sys, time
1399                             sys.stdout.write('x\\n')
1400                             sys.stdout.flush()
1401                             sys.exit(42)
1402                             """],
1403                             stdin=subprocess.PIPE,
1404                             stdout=subprocess.PIPE,
1405                             stderr=subprocess.PIPE)
1406        self.addCleanup(p.stdout.close)
1407        self.addCleanup(p.stderr.close)
1408        self.addCleanup(p.stdin.close)
1409        # Wait for the interpreter to be completely initialized before
1410        # sending any signal.
1411        p.stdout.read(1)
1412        # The process should end after this
1413        time.sleep(1)
1414        # This shouldn't raise even though the child is now dead
1415        getattr(p, method)(*args)
1416        _, stderr = p.communicate()
1417        self.assertStderrEqual(stderr, b'')
1418        rc = p.wait()
1419        self.assertEqual(rc, 42)
1420
1421    def test_send_signal(self):
1422        self._kill_process('send_signal', signal.SIGTERM)
1423
1424    def test_kill(self):
1425        self._kill_process('kill')
1426
1427    def test_terminate(self):
1428        self._kill_process('terminate')
1429
1430    def test_send_signal_dead(self):
1431        self._kill_dead_process('send_signal', signal.SIGTERM)
1432
1433    def test_kill_dead(self):
1434        self._kill_dead_process('kill')
1435
1436    def test_terminate_dead(self):
1437        self._kill_dead_process('terminate')
1438
1439
1440@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
1441                     "poll system call not supported")
1442class ProcessTestCaseNoPoll(ProcessTestCase):
1443    def setUp(self):
1444        subprocess._has_poll = False
1445        ProcessTestCase.setUp(self)
1446
1447    def tearDown(self):
1448        subprocess._has_poll = True
1449        ProcessTestCase.tearDown(self)
1450
1451
1452class HelperFunctionTests(unittest.TestCase):
1453    @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
1454    def test_eintr_retry_call(self):
1455        record_calls = []
1456        def fake_os_func(*args):
1457            record_calls.append(args)
1458            if len(record_calls) == 2:
1459                raise OSError(errno.EINTR, "fake interrupted system call")
1460            return tuple(reversed(args))
1461
1462        self.assertEqual((999, 256),
1463                         subprocess._eintr_retry_call(fake_os_func, 256, 999))
1464        self.assertEqual([(256, 999)], record_calls)
1465        # This time there will be an EINTR so it will loop once.
1466        self.assertEqual((666,),
1467                         subprocess._eintr_retry_call(fake_os_func, 666))
1468        self.assertEqual([(256, 999), (666,), (666,)], record_calls)
1469
1470@unittest.skipUnless(mswindows, "mswindows only")
1471class CommandsWithSpaces (BaseTestCase):
1472
1473    def setUp(self):
1474        super(CommandsWithSpaces, self).setUp()
1475        f, fname = tempfile.mkstemp(".py", "te st")
1476        self.fname = fname.lower ()
1477        os.write(f, b"import sys;"
1478                    b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
1479        )
1480        os.close(f)
1481
1482    def tearDown(self):
1483        os.remove(self.fname)
1484        super(CommandsWithSpaces, self).tearDown()
1485
1486    def with_spaces(self, *args, **kwargs):
1487        kwargs['stdout'] = subprocess.PIPE
1488        p = subprocess.Popen(*args, **kwargs)
1489        self.addCleanup(p.stdout.close)
1490        self.assertEqual(
1491          p.stdout.read ().decode("mbcs"),
1492          "2 [%r, 'ab cd']" % self.fname
1493        )
1494
1495    def test_shell_string_with_spaces(self):
1496        # call() function with string argument with spaces on Windows
1497        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
1498                                             "ab cd"), shell=1)
1499
1500    def test_shell_sequence_with_spaces(self):
1501        # call() function with sequence argument with spaces on Windows
1502        self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
1503
1504    def test_noshell_string_with_spaces(self):
1505        # call() function with string argument with spaces on Windows
1506        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
1507                             "ab cd"))
1508
1509    def test_noshell_sequence_with_spaces(self):
1510        # call() function with sequence argument with spaces on Windows
1511        self.with_spaces([sys.executable, self.fname, "ab cd"])
1512
1513def test_main():
1514    unit_tests = (ProcessTestCase,
1515                  POSIXProcessTestCase,
1516                  Win32ProcessTestCase,
1517                  ProcessTestCaseNoPoll,
1518                  HelperFunctionTests,
1519                  CommandsWithSpaces)
1520
1521    test_support.run_unittest(*unit_tests)
1522    test_support.reap_children()
1523
1524if __name__ == "__main__":
1525    test_main()
1526