• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys
2import os
3import unittest
4import itertools
5import select
6import signal
7import stat
8import subprocess
9import time
10from array import array
11from weakref import proxy
12try:
13    import threading
14except ImportError:
15    threading = None
16
17from test import test_support
18from test.test_support import TESTFN, run_unittest, requires
19from UserList import UserList
20
class AutoFileTests(unittest.TestCase):
    # file tests for which a test file is automatically set up
    #
    # setUp() opens TESTFN for binary writing and stores the file object in
    # self.f.  Tests may close or rebind self.f; tearDown() closes whatever
    # self.f refers to (unless a test set it to None) and removes TESTFN.

    def setUp(self):
        self.f = open(TESTFN, 'wb')

    def tearDown(self):
        if self.f:
            self.f.close()
        os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write('teststring')
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the strong reference held by the test case so that the
        # proxy is expected to be dead afterwards.
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f
        with test_support.check_py3k_warnings():
            softspace = f.softspace
        f.name     # merely shouldn't blow up
        f.mode     # ditto
        f.closed   # ditto

        with test_support.check_py3k_warnings():
            # verify softspace is writable
            f.softspace = softspace    # merely shouldn't blow up

        # verify the others aren't
        for attr in 'name', 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError), setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write('12')
        self.f.close()
        a = array('c', 'x'*10)
        self.f = open(TESTFN, 'rb')
        n = self.f.readinto(a)
        self.assertEqual('12', a.tostring()[:n])

    def testWritelinesUserList(self):
        # verify writelines with instance sequence
        l = UserList(['1', '2'])
        self.f.writelines(l)
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, '12')

    def testWritelinesIntegers(self):
        # verify writelines with integers
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])

    def testWritelinesIntegersUserList(self):
        # verify writelines with integers in UserList
        l = UserList([1,2,3])
        self.assertRaises(TypeError, self.f.writelines, l)

    def testWritelinesNonString(self):
        # verify writelines with non-string object
        class NonString:
            pass

        self.assertRaises(TypeError, self.f.writelines,
                          [NonString(), NonString()])

    def testWritelinesBuffer(self):
        # verify writelines with an array (a non-str item) in the sequence
        self.f.writelines([array('c', 'abc')])
        self.f.close()
        self.f = open(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, 'abc')

    def testRepr(self):
        # verify repr works
        self.assertTrue(repr(self.f).startswith("<open file '" + TESTFN))
        # see issue #14161
        # Windows doesn't like \r\n\t" in the file name, but ' is ok
        fname = 'xx\rxx\nxx\'xx"xx' if sys.platform != "win32" else "xx'xx"
        with open(fname, 'w') as f:
            self.addCleanup(os.remove, fname)
            self.assertTrue(repr(f).startswith(
                    "<open file %r, mode 'w' at" % fname))

    def testErrors(self):
        # verify basic state of a file reopened for reading, and that
        # readinto() rejects a str argument
        self.f.close()
        self.f = open(TESTFN, 'rb')
        f = self.f
        self.assertEqual(f.name, TESTFN)
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)

        self.assertRaises(TypeError, f.readinto, "")
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        # verify that methods raise ValueError on a closed file and that
        # __exit__ closes the file and returns None
        methods = ['fileno', 'flush', 'isatty', 'next', 'read', 'readinto',
                   'readline', 'readlines', 'seek', 'tell', 'truncate',
                   'write', '__iter__']
        deprecated_methods = ['xreadlines']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        # __exit__ should close the file
        self.f.__exit__(None, None, None)
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)
        with test_support.check_py3k_warnings():
            for methodname in deprecated_methods:
                method = getattr(self.f, methodname)
                self.assertRaises(ValueError, method)
        self.assertRaises(ValueError, self.f.writelines, [])

        # file is closed, __exit__ shouldn't do anything
        self.assertIsNone(self.f.__exit__(None, None, None))
        # it must also return None if an exception was given
        try:
            1 // 0
        except ZeroDivisionError:
            self.assertIsNone(self.f.__exit__(*sys.exc_info()))

    def testReadWhenWriting(self):
        # reading from a file opened for writing must fail
        self.assertRaises(IOError, self.f.read)

    def testNastyWritelinesGenerator(self):
        # the generator closes the file while writelines() is consuming it
        def nasty():
            for i in range(5):
                if i == 3:
                    self.f.close()
                yield str(i)
        self.assertRaises(ValueError, self.f.writelines, nasty())

    def testIssue5677(self):
        # Remark: Do not perform more than one test per open file,
        # since that does NOT catch the readline error on Windows.
        data = 'xxx'
        # Close the handle opened by setUp() explicitly before rebinding
        # self.f, instead of relying on garbage collection to do it.
        self.f.close()
        for mode in ['w', 'wb', 'a', 'ab']:
            for attr in ['read', 'readline', 'readlines']:
                self.f = open(TESTFN, mode)
                self.f.write(data)
                self.assertRaises(IOError, getattr(self.f, attr))
                self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, lambda: [line for line in self.f])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.f.write(data)
            self.assertRaises(IOError, self.f.readinto, bytearray(len(data)))
            self.f.close()

        for mode in ['r', 'rb', 'U', 'Ub', 'Ur', 'rU', 'rbU', 'rUb']:
            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.write, data)
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.writelines, [data, data])
            self.f.close()

            self.f = open(TESTFN, mode)
            self.assertRaises(IOError, self.f.truncate)
            self.f.close()
197
class OtherFileTests(unittest.TestCase):
    # file tests that manage TESTFN (or other paths) themselves; there is
    # no shared setUp()/tearDown() here.

    def testOpenDir(self):
        # opening a directory must raise IOError carrying the directory name
        this_dir = os.path.dirname(__file__) or os.curdir
        for mode in (None, "w"):
            try:
                if mode:
                    f = open(this_dir, mode)
                else:
                    f = open(this_dir)
            except IOError as e:
                self.assertEqual(e.filename, this_dir)
            else:
                f.close()
                self.fail("opening a directory didn't raise an IOError")

    def testModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+"):
            try:
                f = open(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

        # Some invalid modes fail on Windows, but pass on Unix
        # Issue3965: avoid a crash on Windows when filename is unicode
        for name in (TESTFN, unicode(TESTFN), unicode(TESTFN + '\t')):
            try:
                f = open(name, "rr")
            except (IOError, ValueError):
                pass
            else:
                f.close()

    def testStdinSeek(self):
        if sys.platform == 'osf1V5':
            # This causes the interpreter to exit on OSF1 v5.1.
            self.skipTest('Skipping sys.stdin.seek(-1), it may crash '
                          'the interpreter. Test manually.')

        if not sys.stdin.isatty():
            # Issue #23168: if stdin is redirected to a file, stdin becomes
            # seekable
            self.skipTest('stdin must be a TTY in this test')

        self.assertRaises(IOError, sys.stdin.seek, -1)

    def testStdinTruncate(self):
        # truncating stdin must fail
        self.assertRaises(IOError, sys.stdin.truncate)

    def testUnicodeOpen(self):
        # verify repr works for unicode too
        f = open(unicode(TESTFN), "w")
        try:
            self.assertTrue(repr(f).startswith("<open file u'" + TESTFN))
        finally:
            # Close and remove the file even when the assertion fails.
            f.close()
            os.unlink(TESTFN)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = open(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may
            # be no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testSetBufferSize(self):
        # make sure that explicitly setting the buffer size doesn't cause
        # misbehaviour especially with repeated close() calls
        # Don't leave TESTFN behind for whichever test happens to run next.
        self.addCleanup(test_support.unlink, TESTFN)
        for s in (-1, 0, 1, 512):
            try:
                f = open(TESTFN, 'w', s)
                f.write(str(s))
                f.close()
                f.close()
                f = open(TESTFN, 'r', s)
                d = int(f.read())
                f.close()
                f.close()
            except IOError as msg:
                self.fail('error setting buffer size %d: %s' % (s, str(msg)))
            self.assertEqual(d, s)

    def testTruncateOnWindows(self):
        # SF bug <http://www.python.org/sf/801631>
        # "file.truncate fault on windows"
        # Start from a clean slate without requiring that TESTFN exists, so
        # that the test does not depend on leftovers from another test.
        test_support.unlink(TESTFN)
        self.addCleanup(test_support.unlink, TESTFN)

        with open(TESTFN, 'wb') as f:
            f.write('12345678901')   # 11 bytes

        with open(TESTFN, 'rb+') as f:
            data = f.read(5)
            if data != '12345':
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

        size = os.path.getsize(TESTFN)
        if size != 5:
            self.fail("File size after ftruncate wrong %d" % size)

    def testIteration(self):
        # Test the complex interaction when mixing file-iteration and the
        # various read* methods. Ostensibly, the mixture could just be tested
        # to work when it should work according to the Python language,
        # instead of fail when it should fail according to the current CPython
        # implementation.  People don't always program Python the way they
        # should, though, and the implementation might change in subtle ways,
        # so we explicitly test for errors, too; the test will just have to
        # be updated when the implementation changes.
        dataoffset = 16384
        filler = "ham\n"
        assert not dataoffset % len(filler), \
            "dataoffset must be multiple of len(filler)"
        nchunks = dataoffset // len(filler)
        testlines = [
            "spam, spam and eggs\n",
            "eggs, spam, ham and spam\n",
            "saussages, spam, spam and eggs\n",
            "spam, ham, spam and eggs\n",
            "spam, spam, spam, spam, spam, ham, spam\n",
            "wonderful spaaaaaam.\n"
        ]
        methods = [("readline", ()), ("read", ()), ("readlines", ()),
                   ("readinto", (array("c", " "*100),))]

        try:
            # Prepare the testfile
            with open(TESTFN, "w") as bag:
                bag.write(filler * nchunks)
                bag.writelines(testlines)
            # Test for appropriate errors mixing read* and iteration
            for methodname, args in methods:
                f = open(TESTFN)
                try:
                    if f.next() != filler:
                        self.fail("Broken testfile")
                    meth = getattr(f, methodname)
                    try:
                        meth(*args)
                    except ValueError:
                        pass
                    else:
                        self.fail("%s%r after next() didn't raise ValueError" %
                                         (methodname, args))
                finally:
                    f.close()

            # Test to see if harmless (by accident) mixing of read* and
            # iteration still works. This depends on the size of the internal
            # iteration buffer (currently 8192,) but we can test it in a
            # flexible manner.  Each line in the bag o' ham is 4 bytes
            # ("h", "a", "m", "\n"), so 4096 lines of that should get us
            # exactly on the buffer boundary for any power-of-2 buffersize
            # between 4 and 16384 (inclusive).
            f = open(TESTFN)
            try:
                for i in range(nchunks):
                    f.next()
                testline = testlines.pop(0)
                try:
                    line = f.readline()
                except ValueError:
                    self.fail("readline() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("readline() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                testline = testlines.pop(0)
                buf = array("c", "\x00" * len(testline))
                try:
                    f.readinto(buf)
                except ValueError:
                    self.fail("readinto() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                line = buf.tostring()
                if line != testline:
                    self.fail("readinto() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))

                testline = testlines.pop(0)
                try:
                    line = f.read(len(testline))
                except ValueError:
                    self.fail("read() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if line != testline:
                    self.fail("read() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (line, testline))
                try:
                    lines = f.readlines()
                except ValueError:
                    self.fail("readlines() after next() with supposedly empty "
                              "iteration-buffer failed anyway")
                if lines != testlines:
                    # Report the values that were actually compared.
                    self.fail("readlines() after next() with empty buffer "
                              "failed. Got %r, expected %r" % (lines, testlines))
            finally:
                f.close()
            # Reading after iteration hit EOF shouldn't hurt either
            f = open(TESTFN)
            try:
                for line in f:
                    pass
                try:
                    f.readline()
                    f.readinto(buf)
                    f.read()
                    f.readlines()
                except ValueError:
                    self.fail("read* failed after next() consumed file")
            finally:
                f.close()
        finally:
            os.unlink(TESTFN)

    @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
    def test_write_full(self):
        # writes to /dev/full must raise IOError, both line-buffered
        # (buffering=1) and unbuffered (buffering=0)
        devfull = '/dev/full'
        if not (os.path.exists(devfull) and
                stat.S_ISCHR(os.stat(devfull).st_mode)):
            # Issue #21934: OpenBSD does not have a /dev/full character device
            self.skipTest('requires %r' % devfull)
        with open(devfull, 'wb', 1) as f:
            with self.assertRaises(IOError):
                f.write('hello\n')
        with open(devfull, 'wb', 1) as f:
            with self.assertRaises(IOError):
                # Issue #17976
                f.write('hello')
                f.write('\n')
        with open(devfull, 'wb', 0) as f:
            with self.assertRaises(IOError):
                f.write('h')

    @unittest.skipUnless(sys.maxsize > 2**31, "requires 64-bit system")
    @test_support.precisionbigmemtest(2**31, 2.5, dry_run=False)
    def test_very_long_line(self, size):
        # Issue #22526
        requires('largefile')
        # The file written below is `size` bytes long; make sure it is
        # removed once the test is over.
        self.addCleanup(test_support.unlink, TESTFN)
        with open(TESTFN, "wb") as fp:
            fp.seek(size - 1)
            fp.write("\0")
        with open(TESTFN, "rb") as fp:
            for line in fp:
                pass
        self.assertEqual(len(line), size)
        self.assertEqual(line.count("\0"), size)
        # Drop the reference to the huge string as early as possible.
        line = None
465
class FileSubclassTests(unittest.TestCase):
    # tests for subclasses of the built-in file type

    def testExit(self):
        # test that exiting with context calls subclass' close
        class C(file):
            def __init__(self, *args):
                self.subclass_closed = False
                file.__init__(self, *args)
            def close(self):
                self.subclass_closed = True
                file.close(self)

        # Opening TESTFN in 'w' mode creates it; remove it afterwards so the
        # test does not leave a stray file behind.
        self.addCleanup(test_support.unlink, TESTFN)
        with C(TESTFN, 'w') as f:
            pass
        self.assertTrue(f.subclass_closed)
481
482
483@unittest.skipUnless(threading, 'Threading required for this test.')
484class FileThreadingTests(unittest.TestCase):
485    # These tests check the ability to call various methods of file objects
486    # (including close()) concurrently without crashing the Python interpreter.
487    # See #815646, #595601
488
489    def setUp(self):
490        self._threads = test_support.threading_setup()
491        self.f = None
492        self.filename = TESTFN
493        with open(self.filename, "w") as f:
494            f.write("\n".join("0123456789"))
495        self._count_lock = threading.Lock()
496        self.close_count = 0
497        self.close_success_count = 0
498        self.use_buffering = False
499
500    def tearDown(self):
501        if self.f:
502            try:
503                self.f.close()
504            except (EnvironmentError, ValueError):
505                pass
506        try:
507            os.remove(self.filename)
508        except EnvironmentError:
509            pass
510        test_support.threading_cleanup(*self._threads)
511
512    def _create_file(self):
513        if self.use_buffering:
514            self.f = open(self.filename, "w+", buffering=1024*16)
515        else:
516            self.f = open(self.filename, "w+")
517
518    def _close_file(self):
519        with self._count_lock:
520            self.close_count += 1
521        self.f.close()
522        with self._count_lock:
523            self.close_success_count += 1
524
525    def _close_and_reopen_file(self):
526        self._close_file()
527        # if close raises an exception thats fine, self.f remains valid so
528        # we don't need to reopen.
529        self._create_file()
530
531    def _run_workers(self, func, nb_workers, duration=0.2):
532        with self._count_lock:
533            self.close_count = 0
534            self.close_success_count = 0
535        self.do_continue = True
536        threads = []
537        try:
538            for i in range(nb_workers):
539                t = threading.Thread(target=func)
540                t.start()
541                threads.append(t)
542            for _ in xrange(100):
543                time.sleep(duration/100)
544                with self._count_lock:
545                    if self.close_count-self.close_success_count > nb_workers+1:
546                        if test_support.verbose:
547                            print 'Q',
548                        break
549            time.sleep(duration)
550        finally:
551            self.do_continue = False
552            for t in threads:
553                t.join()
554
555    def _test_close_open_io(self, io_func, nb_workers=5):
556        def worker():
557            self._create_file()
558            funcs = itertools.cycle((
559                lambda: io_func(),
560                lambda: self._close_and_reopen_file(),
561            ))
562            for f in funcs:
563                if not self.do_continue:
564                    break
565                try:
566                    f()
567                except (IOError, ValueError):
568                    pass
569        self._run_workers(worker, nb_workers)
570        if test_support.verbose:
571            # Useful verbose statistics when tuning this test to take
572            # less time to run but still ensuring that its still useful.
573            #
574            # the percent of close calls that raised an error
575            percent = 100. - 100.*self.close_success_count/self.close_count
576            print self.close_count, ('%.4f ' % percent),
577
578    def test_close_open(self):
579        def io_func():
580            pass
581        self._test_close_open_io(io_func)
582
583    def test_close_open_flush(self):
584        def io_func():
585            self.f.flush()
586        self._test_close_open_io(io_func)
587
588    def test_close_open_iter(self):
589        def io_func():
590            list(iter(self.f))
591        self._test_close_open_io(io_func)
592
593    def test_close_open_isatty(self):
594        def io_func():
595            self.f.isatty()
596        self._test_close_open_io(io_func)
597
598    def test_close_open_print(self):
599        def io_func():
600            print >> self.f, ''
601        self._test_close_open_io(io_func)
602
603    def test_close_open_print_buffered(self):
604        self.use_buffering = True
605        def io_func():
606            print >> self.f, ''
607        self._test_close_open_io(io_func)
608
609    def test_close_open_read(self):
610        def io_func():
611            self.f.read(0)
612        self._test_close_open_io(io_func)
613
614    def test_close_open_readinto(self):
615        def io_func():
616            a = array('c', 'xxxxx')
617            self.f.readinto(a)
618        self._test_close_open_io(io_func)
619
620    def test_close_open_readline(self):
621        def io_func():
622            self.f.readline()
623        self._test_close_open_io(io_func)
624
625    def test_close_open_readlines(self):
626        def io_func():
627            self.f.readlines()
628        self._test_close_open_io(io_func)
629
630    def test_close_open_seek(self):
631        def io_func():
632            self.f.seek(0, 0)
633        self._test_close_open_io(io_func)
634
635    def test_close_open_tell(self):
636        def io_func():
637            self.f.tell()
638        self._test_close_open_io(io_func)
639
640    def test_close_open_truncate(self):
641        def io_func():
642            self.f.truncate()
643        self._test_close_open_io(io_func)
644
645    def test_close_open_write(self):
646        def io_func():
647            self.f.write('')
648        self._test_close_open_io(io_func)
649
650    def test_close_open_writelines(self):
651        def io_func():
652            self.f.writelines('')
653        self._test_close_open_io(io_func)
654
655    def test_iteration_torture(self):
656        # bpo-31530
657        with open(self.filename, "wb") as fp:
658            for i in xrange(2**20):
659                fp.write(b"0"*50 + b"\n")
660        with open(self.filename, "rb") as f:
661            def it():
662                for l in f:
663                    pass
664            self._run_workers(it, 10)
665
666    def test_iteration_seek(self):
667        # bpo-31530: Crash when concurrently seek and iterate over a file.
668        with open(self.filename, "wb") as fp:
669            for i in xrange(10000):
670                fp.write(b"0"*50 + b"\n")
671        with open(self.filename, "rb") as f:
672            it = iter([1] + [0]*10)  # one thread reads, others seek
673            def iterate():
674                if next(it):
675                    for l in f:
676                        pass
677                else:
678                    for i in xrange(100):
679                        f.seek(i*100, 0)
680            self._run_workers(iterate, 10)
681
682
@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
class TestFileSignalEINTR(unittest.TestCase):
    # Each test runs a child interpreter that reads from a pipe while the
    # parent repeatedly sends it SIGINT, then checks that the child handled
    # the signal and still read all of the data.

    def _test_reading(self, data_to_write, read_and_verify_code, method_name,
                      universal_newlines=False):
        """Generic buffered read method test harness to verify EINTR behavior.

        Also validates that Python signal handlers are run during the read.

        Args:
            data_to_write: String to write to the child process for reading
                before sending it a signal, confirming the signal was handled,
                writing a final newline char and closing the infile pipe.
            read_and_verify_code: Single "line" of code to read from a file
                object named 'infile' and validate the result.  This will be
                executed as part of a python subprocess fed data_to_write.
            method_name: The name of the read method being tested, for use in
                an error message on failure.
            universal_newlines: If True, infile will be opened in universal
                newline mode in the child process.
        """
        if universal_newlines:
            # Test the \r\n -> \n conversion while we're at it.
            data_to_write = data_to_write.replace('\n', '\r\n')
            infile_setup_code = 'infile = os.fdopen(sys.stdin.fileno(), "rU")'
        else:
            infile_setup_code = 'infile = sys.stdin'
        # Total pipe IO in this function is smaller than the minimum posix OS
        # pipe buffer size of 512 bytes.  No writer should block.
        assert len(data_to_write) < 512, 'data_to_write must fit in pipe buf.'

        child_code = (
             'import os, signal, sys ;'
             'signal.signal('
                     'signal.SIGINT, lambda s, f: sys.stderr.write("$\\n")) ;'
             + infile_setup_code + ' ;' +
             'assert isinstance(infile, file) ;'
             'sys.stderr.write("Go.\\n") ;'
             + read_and_verify_code)
        reader_process = subprocess.Popen(
                [sys.executable, '-c', child_code],
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Wait for the signal handler to be installed.
        go = reader_process.stderr.read(4)
        if go != 'Go.\n':
            reader_process.kill()
            remaining = reader_process.stderr.read()
            # Reap the killed child so it does not linger as a zombie.
            reader_process.wait()
            self.fail('Error from %s process while awaiting "Go":\n%s' % (
                    method_name, go+remaining))
        reader_process.stdin.write(data_to_write)
        signals_sent = 0
        rlist = []
        # We don't know when the read_and_verify_code in our child is actually
        # executing within the read system call we want to interrupt.  This
        # loop waits for a bit before sending the first signal to increase
        # the likelihood of that.  Implementations without correct EINTR
        # and signal handling usually fail this test.
        while not rlist:
            rlist, _, _ = select.select([reader_process.stderr], (), (), 0.05)
            reader_process.send_signal(signal.SIGINT)
            # Give the subprocess time to handle it before we loop around and
            # send another one.  On OSX the second signal happening close to
            # immediately after the first was causing the subprocess to crash
            # via the OS's default SIGINT handler.
            time.sleep(0.1)
            signals_sent += 1
            if signals_sent > 200:
                reader_process.kill()
                # Reap the killed child so it does not linger as a zombie.
                reader_process.wait()
                self.fail("failed to handle signal during %s." % method_name)
        # This assumes anything unexpected that writes to stderr will also
        # write a newline.  That is true of the traceback printing code.
        signal_line = reader_process.stderr.readline()
        if signal_line != '$\n':
            reader_process.kill()
            remaining = reader_process.stderr.read()
            # Reap the killed child so it does not linger as a zombie.
            reader_process.wait()
            self.fail('Error from %s process while awaiting signal:\n%s' % (
                    method_name, signal_line+remaining))
        # We append a newline to our input so that a readline call can
        # end on its own before the EOF is seen.
        stdout, stderr = reader_process.communicate(input='\n')
        if reader_process.returncode != 0:
            self.fail('%s() process exited rc=%d.\nSTDOUT:\n%s\nSTDERR:\n%s' % (
                    method_name, reader_process.returncode, stdout, stderr))

    def test_readline(self, universal_newlines=False):
        """file.readline must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'line = infile.readline() ;'
                        'expected_line = "hello, world!\\n" ;'
                        'assert line == expected_line, ('
                        '"read %r expected %r" % (line, expected_line))'
                ),
                method_name='readline',
                universal_newlines=universal_newlines)

    def test_readline_with_universal_newlines(self):
        self.test_readline(universal_newlines=True)

    def test_readlines(self, universal_newlines=False):
        """file.readlines must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello\nworld!',
                read_and_verify_code=(
                        'lines = infile.readlines() ;'
                        'expected_lines = ["hello\\n", "world!\\n"] ;'
                        'assert lines == expected_lines, ('
                        '"readlines returned wrong data.\\n" '
                        '"got lines %r\\nexpected  %r" '
                        '% (lines, expected_lines))'
                ),
                method_name='readlines',
                universal_newlines=universal_newlines)

    def test_readlines_with_universal_newlines(self):
        self.test_readlines(universal_newlines=True)

    def test_readall(self):
        """Unbounded file.read() must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!abcdefghijklm',
                read_and_verify_code=(
                        'data = infile.read() ;'
                        'expected_data = "hello, world!abcdefghijklm\\n";'
                        'assert data == expected_data, ('
                        '"read %r expected %r" % (data, expected_data))'
                ),
                method_name='unbounded read')

    def test_readinto(self):
        """file.readinto must handle signals and not lose data."""
        self._test_reading(
                data_to_write='hello, world!',
                read_and_verify_code=(
                        'data = bytearray(50) ;'
                        'num_read = infile.readinto(data) ;'
                        'expected_data = "hello, world!\\n";'
                        'assert data[:num_read] == expected_data, ('
                        '"read %r expected %r" % (data, expected_data))'
                ),
                method_name='readinto')
823
824
825class StdoutTests(unittest.TestCase):
826
827    def test_move_stdout_on_write(self):
828        # Issue 3242: sys.stdout can be replaced (and freed) during a
829        # print statement; prevent a segfault in this case
830        save_stdout = sys.stdout
831
832        class File:
833            def write(self, data):
834                if '\n' in data:
835                    sys.stdout = save_stdout
836
837        try:
838            sys.stdout = File()
839            print "some text"
840        finally:
841            sys.stdout = save_stdout
842
843    def test_del_stdout_before_print(self):
844        # Issue 4597: 'print' with no argument wasn't reporting when
845        # sys.stdout was deleted.
846        save_stdout = sys.stdout
847        del sys.stdout
848        try:
849            print
850        except RuntimeError as e:
851            self.assertEqual(str(e), "lost sys.stdout")
852        else:
853            self.fail("Expected RuntimeError")
854        finally:
855            sys.stdout = save_stdout
856
857    def test_unicode(self):
858        import subprocess
859
860        def get_message(encoding, *code):
861            code = '\n'.join(code)
862            env = os.environ.copy()
863            env['PYTHONIOENCODING'] = encoding
864            process = subprocess.Popen([sys.executable, "-c", code],
865                                       stdout=subprocess.PIPE, env=env)
866            stdout, stderr = process.communicate()
867            self.assertEqual(process.returncode, 0)
868            return stdout
869
870        def check_message(text, encoding, expected):
871            stdout = get_message(encoding,
872                "import sys",
873                "sys.stdout.write(%r)" % text,
874                "sys.stdout.flush()")
875            self.assertEqual(stdout, expected)
876
877        # test the encoding
878        check_message(u'15\u20ac', "iso-8859-15", "15\xa4")
879        check_message(u'15\u20ac', "utf-8", '15\xe2\x82\xac')
880        check_message(u'15\u20ac', "utf-16-le", '1\x005\x00\xac\x20')
881
882        # test the error handler
883        check_message(u'15\u20ac', "iso-8859-1:ignore", "15")
884        check_message(u'15\u20ac', "iso-8859-1:replace", "15?")
885        check_message(u'15\u20ac', "iso-8859-1:backslashreplace", "15\\u20ac")
886
887        # test the buffer API
888        for objtype in ('buffer', 'bytearray'):
889            stdout = get_message('ascii',
890                'import sys',
891                r'sys.stdout.write(%s("\xe9"))' % objtype,
892                'sys.stdout.flush()')
893            self.assertEqual(stdout, "\xe9")
894
895
def test_main():
    """Run every test case class in this module, then remove TESTFN."""
    try:
        test_classes = (
            AutoFileTests,
            OtherFileTests,
            FileSubclassTests,
            FileThreadingTests,
            TestFileSignalEINTR,
            StdoutTests,
        )
        run_unittest(*test_classes)
    finally:
        # These tests have historically been sloppy about cleaning up
        # TESTFN, so remove it here regardless of how the run went.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)
905
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
908