1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22import abc
23import array
24import errno
25import locale
26import os
27import pickle
28import random
29import signal
30import sys
31import sysconfig
32import textwrap
33import threading
34import time
35import unittest
36import warnings
37import weakref
38from collections import deque, UserList
39from itertools import cycle, count
40from test import support
41from test.support.script_helper import (
42    assert_python_ok, assert_python_failure, run_python_until_end)
43from test.support import FakePath
44
45import codecs
46import io  # C implementation of io
47import _pyio as pyio # Python implementation of io
48
try:
    import ctypes
except ImportError:
    # No ctypes available: fall back to a signed-byte array.
    def byteslike(*pos, **kw):
        """Create a bytes-like object from bytes() constructor arguments"""
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure used as a resizable raw buffer"""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the structure's buffer to the payload size, then copy it in.
        ctypes.resize(holder, len(payload))
        memoryview(holder).cast("B")[:] = payload
        return holder
64
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
# True when either the recorded compiler flags or the recorded configure
# arguments mention the memory sanitizer.
MEMORY_SANITIZER = any((
    '-fsanitize=memory' in _cflags,
    '--with-memory-sanitizer' in _config_args,
))

# Does io.IOBase finalizer log the exception if the close() method fails?
# The exception is ignored silently by default in release build.
IOBASE_EMITS_UNRAISABLE = (
    True if hasattr(sys, "gettotalrefcount") else sys.flags.dev_mode
)
75
76
def _default_chunk_size():
    """Return the _CHUNK_SIZE of a freshly opened text file object"""
    with open(__file__, "r", encoding="latin-1") as text_file:
        chunk_size = text_file._CHUNK_SIZE
    return chunk_size
81
82
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto()."""

    def __init__(self, read_stack=()):
        # Chunks still to be served by readinto(); a None entry makes
        # readinto() return None once.
        self._read_stack = list(read_stack)
        # Every chunk passed to write(), converted to bytes.
        self._write_stack = []
        # Bookkeeping: total readinto() calls and calls made after the
        # read stack ran dry.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report len(b) as written."""
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # Wrong, but something has to be returned.
        return 0

    def tell(self):
        # Wrong as well, for the same reason as seek().
        return 0

    def readinto(self, buf):
        """Copy the next queued chunk (or as much as fits) into *buf*.

        Returns the number of bytes copied, None when the next queue entry
        is None, or 0 when the queue is empty.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Serve what fits and keep the remainder queued.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
138
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead layered over the C RawIOBase."""

class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead layered over the pure-Python RawIOBase."""
144
145
class MockRawIO(MockRawIOWithoutRead):
    """A mock raw stream whose read() hands back the queued chunks as-is."""

    def read(self, n=None):
        """Pop and return the next queued chunk; the size *n* is ignored.

        When the queue is empty, b"" is returned and the call is counted as
        an extraneous read.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue means EOF.  A bare "except:" would also
            # swallow KeyboardInterrupt and genuine programming errors.
            self._extraneous_reads += 1
            return b""
155
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered over the C RawIOBase."""

class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered over the pure-Python RawIOBase."""
161
162
class MisbehavedRawIO(MockRawIO):
    """A raw stream returning bogus sizes and positions from every call."""

    def write(self, b):
        # Report twice what the parent reports.
        accepted = super().write(b)
        return accepted * 2

    def read(self, n=None):
        # Return the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position, whatever was asked for.
        return -123

    def tell(self):
        # A negative position here too.
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its size.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
179
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered over the C RawIOBase."""

class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered over the pure-Python RawIOBase."""
185
186
class SlowFlushRawIO(MockRawIO):
    """A raw stream whose flush() sets an event and then sleeps."""

    def __init__(self):
        super().__init__()
        # Set once flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        entered = self.in_flush
        entered.set()
        time.sleep(0.25)
195
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO layered over the C RawIOBase."""

class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO layered over the pure-Python RawIOBase."""
201
202
class CloseFailureIO(MockRawIO):
    """A raw stream whose first close() raises OSError; later calls are
    no-ops."""

    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark the stream closed before failing, so that a second close()
        # does not raise again.
        self.closed = 1
        raise OSError
210
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered over the C RawIOBase."""

class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered over the pure-Python RawIOBase."""
216
217
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result in
    read_history."""

    def __init__(self, data):
        # One entry per read call: a byte count, or None.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
233
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over the C BytesIO."""

class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over the pure-Python BytesIO."""
239
240
class MockUnseekableIO:
    """Mixin turning a stream into one that refuses every positioning call.

    The concrete subclass supplies the UnsupportedOperation exception type.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation
        raise error("not seekable")

    def tell(self, *args):
        error = self.UnsupportedOperation
        raise error("not seekable")

    def truncate(self, *args):
        error = self.UnsupportedOperation
        raise error("not seekable")
253
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO layered over the C BytesIO."""
    UnsupportedOperation = io.UnsupportedOperation

class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO layered over the pure-Python BytesIO."""
    UnsupportedOperation = pyio.UnsupportedOperation
259
260
class MockNonBlockWriterIO:
    """A writer that can be told to stop short at a chosen byte sequence."""

    def __init__(self):
        # Chunks accepted so far and not yet collected by pop_written().
        self._write_stack = []
        # Byte sequence at which write() stops; None disables blocking.
        self._blocker_char = None

    def pop_written(self):
        """Return everything written since the last call and forget it."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def writable(self):
        return True

    def write(self, b):
        """Accept *b* up to the first blocker, if one is armed.

        Returns the number of bytes accepted, or None when the data starts
        with the blocker; in that case the blocker is disarmed.
        """
        payload = bytes(b)
        blocker = self._blocker_char
        stop = payload.find(blocker) if blocker else -1
        if stop < 0:
            # No blocker armed, or none present in this payload.
            self._write_stack.append(payload)
            return len(payload)
        if stop == 0:
            # cancel blocker and indicate would block
            self._blocker_char = None
            return None
        # write data up to the first blocker
        self._write_stack.append(payload[:stop])
        return stop
308
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO layered over the C RawIOBase."""
    BlockingIOError = io.BlockingIOError

class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO layered over the pure-Python RawIOBase."""
    BlockingIOError = pyio.BlockingIOError
314
315
316class IOTest(unittest.TestCase):
317
318    def setUp(self):
319        support.unlink(support.TESTFN)
320
321    def tearDown(self):
322        support.unlink(support.TESTFN)
323
    def write_ops(self, f):
        """Run a fixed write/seek/tell/truncate scenario on the binary
        stream *f*, which must be writable and seekable.

        When every assertion passes, the stream content is b"hello world\\n"
        and the position is 13.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # truncate() must not move the file position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        # Step back over the "." so that it gets overwritten below.
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        buffer = bytearray(b" world\n\n\n")
        self.assertEqual(f.write(buffer), 9)
        buffer[:] = b"*" * 9  # Overwrite our copy of the data
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        # Drop the two trailing newlines; the position stays where it was.
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        # Float offsets are rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
347
    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek/tell scenario on the binary stream
        *f*, which must hold b"hello world\\n" and be positioned at 0.

        With *buffered* true, argument-less read() and readinto1() are
        exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # Reuse the five bytes just read as a writable 5-byte target.
        data = byteslike(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(bytes(data), b" worl")
        data = bytearray(5)
        # Only two bytes are left; the target keeps its size.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        # At EOF both read() and readinto() report nothing.
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(byteslike(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        # Zero-sized requests.
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(byteslike()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        # Float offsets are rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")
            f.seek(0)
            data = byteslike(5)
            self.assertEqual(f.readinto1(data), 5)
            self.assertEqual(bytes(data), b"hello")
380
    # Offset used by large_file_ops(): one more than the largest signed
    # 32-bit integer.
    LARGE = 2**31
382
383    def large_file_ops(self, f):
384        assert f.readable()
385        assert f.writable()
386        try:
387            self.assertEqual(f.seek(self.LARGE), self.LARGE)
388        except (OverflowError, ValueError):
389            self.skipTest("no largefile support")
390        self.assertEqual(f.tell(), self.LARGE)
391        self.assertEqual(f.write(b"xxx"), 3)
392        self.assertEqual(f.tell(), self.LARGE + 3)
393        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
394        self.assertEqual(f.truncate(), self.LARGE + 2)
395        self.assertEqual(f.tell(), self.LARGE + 2)
396        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
397        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
398        self.assertEqual(f.tell(), self.LARGE + 2)
399        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
400        self.assertEqual(f.seek(-1, 2), self.LARGE)
401        self.assertEqual(f.read(2), b"x")
402
403    def test_invalid_operations(self):
404        # Try writing on a file opened in read mode and vice-versa.
405        exc = self.UnsupportedOperation
406        for mode in ("w", "wb"):
407            with self.open(support.TESTFN, mode) as fp:
408                self.assertRaises(exc, fp.read)
409                self.assertRaises(exc, fp.readline)
410        with self.open(support.TESTFN, "wb", buffering=0) as fp:
411            self.assertRaises(exc, fp.read)
412            self.assertRaises(exc, fp.readline)
413        with self.open(support.TESTFN, "rb", buffering=0) as fp:
414            self.assertRaises(exc, fp.write, b"blah")
415            self.assertRaises(exc, fp.writelines, [b"blah\n"])
416        with self.open(support.TESTFN, "rb") as fp:
417            self.assertRaises(exc, fp.write, b"blah")
418            self.assertRaises(exc, fp.writelines, [b"blah\n"])
419        with self.open(support.TESTFN, "r") as fp:
420            self.assertRaises(exc, fp.write, "blah")
421            self.assertRaises(exc, fp.writelines, ["blah\n"])
422            # Non-zero seeking from current or end pos
423            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
424            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
425
    def test_optional_abilities(self):
        # Test for OSError when optional APIs are not supported
        # The purpose of this test is to try fileno(), reading, writing and
        # seeking operations with various objects that indicate they do not
        # support these operations.

        # Each factory below builds one kind of stream to probe.

        def pipe_reader():
            [r, w] = os.pipe()
            os.close(w)  # So that read() is harmless
            return self.FileIO(r, "r")

        def pipe_writer():
            [r, w] = os.pipe()
            self.addCleanup(os.close, r)
            # Guarantee that we can write into the pipe without blocking
            thread = threading.Thread(target=os.read, args=(r, 100))
            thread.start()
            self.addCleanup(thread.join)
            return self.FileIO(w, "w")

        def buffered_reader():
            return self.BufferedReader(self.MockUnseekableIO())

        def buffered_writer():
            return self.BufferedWriter(self.MockUnseekableIO())

        def buffered_random():
            return self.BufferedRandom(self.BytesIO())

        def buffered_rw_pair():
            return self.BufferedRWPair(self.MockUnseekableIO(),
                self.MockUnseekableIO())

        def text_reader():
            # Take writable()/write() from BufferedIOBase instead of the
            # mock's own implementation.
            class UnseekableReader(self.MockUnseekableIO):
                writable = self.BufferedIOBase.writable
                write = self.BufferedIOBase.write
            return self.TextIOWrapper(UnseekableReader(), "ascii")

        def text_writer():
            # Take readable()/read() from BufferedIOBase instead of the
            # mock's own implementation.
            class UnseekableWriter(self.MockUnseekableIO):
                readable = self.BufferedIOBase.readable
                read = self.BufferedIOBase.read
            return self.TextIOWrapper(UnseekableWriter(), "ascii")

        # (factory, abilities) pairs; the abilities string lists what the
        # object is expected to support: "f" fileno(), "r" reading,
        # "w" writing, "s" seeking.
        tests = (
            (pipe_reader, "fr"), (pipe_writer, "fw"),
            (buffered_reader, "r"), (buffered_writer, "w"),
            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
            (text_reader, "r"), (text_writer, "w"),
            (self.BytesIO, "rws"), (self.StringIO, "rws"),
        )
        for [test, abilities] in tests:
            with self.subTest(test), test() as obj:
                readable = "r" in abilities
                self.assertEqual(obj.readable(), readable)
                writable = "w" in abilities
                self.assertEqual(obj.writable(), writable)

                # Pick a payload of the type the object works with.
                if isinstance(obj, self.TextIOBase):
                    data = "3"
                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                    data = b"3"
                else:
                    self.fail("Unknown base class")

                if "f" in abilities:
                    obj.fileno()
                else:
                    self.assertRaises(OSError, obj.fileno)

                if readable:
                    obj.read(1)
                    obj.read()
                else:
                    self.assertRaises(OSError, obj.read, 1)
                    self.assertRaises(OSError, obj.read)

                if writable:
                    obj.write(data)
                else:
                    self.assertRaises(OSError, obj.write, data)

                if sys.platform.startswith("win") and test in (
                        pipe_reader, pipe_writer):
                    # Pipes seem to appear as seekable on Windows
                    continue
                seekable = "s" in abilities
                self.assertEqual(obj.seekable(), seekable)

                if seekable:
                    obj.tell()
                    obj.seek(0)
                else:
                    self.assertRaises(OSError, obj.tell)
                    self.assertRaises(OSError, obj.seek, 0)

                # truncate() is expected to work only on objects that are
                # both writable and seekable.
                if writable and seekable:
                    obj.truncate()
                    obj.truncate(0)
                else:
                    self.assertRaises(OSError, obj.truncate)
                    self.assertRaises(OSError, obj.truncate, 0)
529
530    def test_open_handles_NUL_chars(self):
531        fn_with_NUL = 'foo\0bar'
532        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
533
534        bytes_fn = bytes(fn_with_NUL, 'ascii')
535        with warnings.catch_warnings():
536            warnings.simplefilter("ignore", DeprecationWarning)
537            self.assertRaises(ValueError, self.open, bytes_fn, 'w')
538
539    def test_raw_file_io(self):
540        with self.open(support.TESTFN, "wb", buffering=0) as f:
541            self.assertEqual(f.readable(), False)
542            self.assertEqual(f.writable(), True)
543            self.assertEqual(f.seekable(), True)
544            self.write_ops(f)
545        with self.open(support.TESTFN, "rb", buffering=0) as f:
546            self.assertEqual(f.readable(), True)
547            self.assertEqual(f.writable(), False)
548            self.assertEqual(f.seekable(), True)
549            self.read_ops(f)
550
551    def test_buffered_file_io(self):
552        with self.open(support.TESTFN, "wb") as f:
553            self.assertEqual(f.readable(), False)
554            self.assertEqual(f.writable(), True)
555            self.assertEqual(f.seekable(), True)
556            self.write_ops(f)
557        with self.open(support.TESTFN, "rb") as f:
558            self.assertEqual(f.readable(), True)
559            self.assertEqual(f.writable(), False)
560            self.assertEqual(f.seekable(), True)
561            self.read_ops(f, True)
562
563    def test_readline(self):
564        with self.open(support.TESTFN, "wb") as f:
565            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
566        with self.open(support.TESTFN, "rb") as f:
567            self.assertEqual(f.readline(), b"abc\n")
568            self.assertEqual(f.readline(10), b"def\n")
569            self.assertEqual(f.readline(2), b"xy")
570            self.assertEqual(f.readline(4), b"zzy\n")
571            self.assertEqual(f.readline(), b"foo\x00bar\n")
572            self.assertEqual(f.readline(None), b"another line")
573            self.assertRaises(TypeError, f.readline, 5.3)
574        with self.open(support.TESTFN, "r") as f:
575            self.assertRaises(TypeError, f.readline, 5.3)
576
577    def test_readline_nonsizeable(self):
578        # Issue #30061
579        # Crash when readline() returns an object without __len__
580        class R(self.IOBase):
581            def readline(self):
582                return None
583        self.assertRaises((TypeError, StopIteration), next, R())
584
585    def test_next_nonsizeable(self):
586        # Issue #30061
587        # Crash when __next__() returns an object without __len__
588        class R(self.IOBase):
589            def __next__(self):
590                return None
591        self.assertRaises(TypeError, R().readlines, 1)
592
593    def test_raw_bytes_io(self):
594        f = self.BytesIO()
595        self.write_ops(f)
596        data = f.getvalue()
597        self.assertEqual(data, b"hello world\n")
598        f = self.BytesIO(data)
599        self.read_ops(f, True)
600
601    def test_large_file_ops(self):
602        # On Windows and Mac OSX this test consumes large resources; It takes
603        # a long time to build the >2 GiB file and takes >2 GiB of disk space
604        # therefore the resource must be enabled to run this test.
605        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
606            support.requires(
607                'largefile',
608                'test requires %s bytes and a long time to run' % self.LARGE)
609        with self.open(support.TESTFN, "w+b", 0) as f:
610            self.large_file_ops(f)
611        with self.open(support.TESTFN, "w+b") as f:
612            self.large_file_ops(f)
613
614    def test_with_open(self):
615        for bufsize in (0, 100):
616            f = None
617            with self.open(support.TESTFN, "wb", bufsize) as f:
618                f.write(b"xxx")
619            self.assertEqual(f.closed, True)
620            f = None
621            try:
622                with self.open(support.TESTFN, "wb", bufsize) as f:
623                    1/0
624            except ZeroDivisionError:
625                self.assertEqual(f.closed, True)
626            else:
627                self.fail("1/0 didn't raise an exception")
628
629    # issue 5008
630    def test_append_mode_tell(self):
631        with self.open(support.TESTFN, "wb") as f:
632            f.write(b"xxx")
633        with self.open(support.TESTFN, "ab", buffering=0) as f:
634            self.assertEqual(f.tell(), 3)
635        with self.open(support.TESTFN, "ab") as f:
636            self.assertEqual(f.tell(), 3)
637        with self.open(support.TESTFN, "a") as f:
638            self.assertGreater(f.tell(), 0)
639
    def test_destructor(self):
        # Destroying an unclosed FileIO subclass must call __del__, close()
        # and flush() in that order, and the written data must reach disk.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # Chain to the base class __del__ only if it defines one.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        # A ResourceWarning is expected while the file is destroyed
        # without having been closed.
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")
665
    def _check_base_destructor(self, base):
        """Check that destroying an instance of a *base* subclass calls
        __del__, close() and flush() in that order, with instance attributes
        still readable in each of them."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                # Chain to the base class __del__ only if it defines one.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
695
696    def test_IOBase_destructor(self):
697        self._check_base_destructor(self.IOBase)
698
699    def test_RawIOBase_destructor(self):
700        self._check_base_destructor(self.RawIOBase)
701
702    def test_BufferedIOBase_destructor(self):
703        self._check_base_destructor(self.BufferedIOBase)
704
705    def test_TextIOBase_destructor(self):
706        self._check_base_destructor(self.TextIOBase)
707
708    def test_close_flushes(self):
709        with self.open(support.TESTFN, "wb") as f:
710            f.write(b"xxx")
711        with self.open(support.TESTFN, "rb") as f:
712            self.assertEqual(f.read(), b"xxx")
713
714    def test_array_writes(self):
715        a = array.array('i', range(10))
716        n = len(a.tobytes())
717        def check(f):
718            with f:
719                self.assertEqual(f.write(a), n)
720                f.writelines((a,))
721        check(self.BytesIO())
722        check(self.FileIO(support.TESTFN, "w"))
723        check(self.BufferedWriter(self.MockRawIO()))
724        check(self.BufferedRandom(self.MockRawIO()))
725        check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
726
727    def test_closefd(self):
728        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
729                          closefd=False)
730
731    def test_read_closed(self):
732        with self.open(support.TESTFN, "w") as f:
733            f.write("egg\n")
734        with self.open(support.TESTFN, "r") as f:
735            file = self.open(f.fileno(), "r", closefd=False)
736            self.assertEqual(file.read(), "egg\n")
737            file.seek(0)
738            file.close()
739            self.assertRaises(ValueError, file.read)
740        with self.open(support.TESTFN, "rb") as f:
741            file = self.open(f.fileno(), "rb", closefd=False)
742            self.assertEqual(file.read()[:3], b"egg")
743            file.close()
744            self.assertRaises(ValueError, file.readinto, bytearray(1))
745
746    def test_no_closefd_with_filename(self):
747        # can't use closefd in combination with a file name
748        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
749
750    def test_closefd_attr(self):
751        with self.open(support.TESTFN, "wb") as f:
752            f.write(b"egg\n")
753        with self.open(support.TESTFN, "r") as f:
754            self.assertEqual(f.buffer.raw.closefd, True)
755            file = self.open(f.fileno(), "r", closefd=False)
756            self.assertEqual(file.buffer.raw.closefd, False)
757
    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        with support.check_warnings(('', ResourceWarning)):
            f = self.FileIO(support.TESTFN, "wb")
            f.write(b"abcxxx")
            # Make the object reference itself, so that only the cyclic
            # garbage collector can reclaim it.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        # The weak reference is dead once the object has been collected.
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")
771
772    def test_unbounded_file(self):
773        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
774        zero = "/dev/zero"
775        if not os.path.exists(zero):
776            self.skipTest("{0} does not exist".format(zero))
777        if sys.maxsize > 0x7FFFFFFF:
778            self.skipTest("test can only run in a 32-bit address space")
779        if support.real_max_memuse < support._2G:
780            self.skipTest("test requires at least 2 GiB of memory")
781        with self.open(zero, "rb", buffering=0) as f:
782            self.assertRaises(OverflowError, f.read)
783        with self.open(zero, "rb") as f:
784            self.assertRaises(OverflowError, f.read)
785        with self.open(zero, "r") as f:
786            self.assertRaises(OverflowError, f.read)
787
788    def check_flush_error_on_close(self, *args, **kwargs):
789        # Test that the file is closed despite failed flush
790        # and that flush() is called before file closed.
791        f = self.open(*args, **kwargs)
792        closed = []
793        def bad_flush():
794            closed[:] = [f.closed]
795            raise OSError()
796        f.flush = bad_flush
797        self.assertRaises(OSError, f.close) # exception not swallowed
798        self.assertTrue(f.closed)
799        self.assertTrue(closed)      # flush() called
800        self.assertFalse(closed[0])  # flush() called before file closed
801        f.flush = lambda: None  # break reference loop
802
803    def test_flush_error_on_close(self):
804        # raw file
805        # Issue #5700: io.FileIO calls flush() after file closed
806        self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
807        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
808        self.check_flush_error_on_close(fd, 'wb', buffering=0)
809        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
810        self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
811        os.close(fd)
812        # buffered io
813        self.check_flush_error_on_close(support.TESTFN, 'wb')
814        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
815        self.check_flush_error_on_close(fd, 'wb')
816        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
817        self.check_flush_error_on_close(fd, 'wb', closefd=False)
818        os.close(fd)
819        # text io
820        self.check_flush_error_on_close(support.TESTFN, 'w')
821        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
822        self.check_flush_error_on_close(fd, 'w')
823        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
824        self.check_flush_error_on_close(fd, 'w', closefd=False)
825        os.close(fd)
826
827    def test_multi_close(self):
828        f = self.open(support.TESTFN, "wb", buffering=0)
829        f.close()
830        f.close()
831        f.close()
832        self.assertRaises(ValueError, f.flush)
833
834    def test_RawIOBase_read(self):
835        # Exercise the default limited RawIOBase.read(n) implementation (which
836        # calls readinto() internally).
837        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
838        self.assertEqual(rawio.read(2), b"ab")
839        self.assertEqual(rawio.read(2), b"c")
840        self.assertEqual(rawio.read(2), b"d")
841        self.assertEqual(rawio.read(2), None)
842        self.assertEqual(rawio.read(2), b"ef")
843        self.assertEqual(rawio.read(2), b"g")
844        self.assertEqual(rawio.read(2), None)
845        self.assertEqual(rawio.read(2), b"")
846
847    def test_types_have_dict(self):
848        test = (
849            self.IOBase(),
850            self.RawIOBase(),
851            self.TextIOBase(),
852            self.StringIO(),
853            self.BytesIO()
854        )
855        for obj in test:
856            self.assertTrue(hasattr(obj, "__dict__"))
857
858    def test_opener(self):
859        with self.open(support.TESTFN, "w") as f:
860            f.write("egg\n")
861        fd = os.open(support.TESTFN, os.O_RDONLY)
862        def opener(path, flags):
863            return fd
864        with self.open("non-existent", "r", opener=opener) as f:
865            self.assertEqual(f.read(), "egg\n")
866
867    def test_bad_opener_negative_1(self):
868        # Issue #27066.
869        def badopener(fname, flags):
870            return -1
871        with self.assertRaises(ValueError) as cm:
872            open('non-existent', 'r', opener=badopener)
873        self.assertEqual(str(cm.exception), 'opener returned -1')
874
875    def test_bad_opener_other_negative(self):
876        # Issue #27066.
877        def badopener(fname, flags):
878            return -2
879        with self.assertRaises(ValueError) as cm:
880            open('non-existent', 'r', opener=badopener)
881        self.assertEqual(str(cm.exception), 'opener returned -2')
882
883    def test_fileio_closefd(self):
884        # Issue #4841
885        with self.open(__file__, 'rb') as f1, \
886             self.open(__file__, 'rb') as f2:
887            fileio = self.FileIO(f1.fileno(), closefd=False)
888            # .__init__() must not close f1
889            fileio.__init__(f2.fileno(), closefd=False)
890            f1.readline()
891            # .close() must not close f2
892            fileio.close()
893            f2.readline()
894
895    def test_nonbuffered_textio(self):
896        with support.check_no_resource_warning(self):
897            with self.assertRaises(ValueError):
898                self.open(support.TESTFN, 'w', buffering=0)
899
900    def test_invalid_newline(self):
901        with support.check_no_resource_warning(self):
902            with self.assertRaises(ValueError):
903                self.open(support.TESTFN, 'w', newline='invalid')
904
905    def test_buffered_readinto_mixin(self):
906        # Test the implementation provided by BufferedIOBase
907        class Stream(self.BufferedIOBase):
908            def read(self, size):
909                return b"12345"
910            read1 = read
911        stream = Stream()
912        for method in ("readinto", "readinto1"):
913            with self.subTest(method):
914                buffer = byteslike(5)
915                self.assertEqual(getattr(stream, method)(buffer), 5)
916                self.assertEqual(bytes(buffer), b"12345")
917
918    def test_fspath_support(self):
919        def check_path_succeeds(path):
920            with self.open(path, "w") as f:
921                f.write("egg\n")
922
923            with self.open(path, "r") as f:
924                self.assertEqual(f.read(), "egg\n")
925
926        check_path_succeeds(FakePath(support.TESTFN))
927        check_path_succeeds(FakePath(os.fsencode(support.TESTFN)))
928
929        with self.open(support.TESTFN, "w") as f:
930            bad_path = FakePath(f.fileno())
931            with self.assertRaises(TypeError):
932                self.open(bad_path, 'w')
933
934        bad_path = FakePath(None)
935        with self.assertRaises(TypeError):
936            self.open(bad_path, 'w')
937
938        bad_path = FakePath(FloatingPointError)
939        with self.assertRaises(FloatingPointError):
940            self.open(bad_path, 'w')
941
942        # ensure that refcounting is correct with some error conditions
943        with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
944            self.open(FakePath(support.TESTFN), 'rwxa')
945
946    def test_RawIOBase_readall(self):
947        # Exercise the default unlimited RawIOBase.read() and readall()
948        # implementations.
949        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
950        self.assertEqual(rawio.read(), b"abcdefg")
951        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
952        self.assertEqual(rawio.readall(), b"abcdefg")
953
954    def test_BufferedIOBase_readinto(self):
955        # Exercise the default BufferedIOBase.readinto() and readinto1()
956        # implementations (which call read() or read1() internally).
957        class Reader(self.BufferedIOBase):
958            def __init__(self, avail):
959                self.avail = avail
960            def read(self, size):
961                result = self.avail[:size]
962                self.avail = self.avail[size:]
963                return result
964            def read1(self, size):
965                """Returns no more than 5 bytes at once"""
966                return self.read(min(size, 5))
967        tests = (
968            # (test method, total data available, read buffer size, expected
969            #     read size)
970            ("readinto", 10, 5, 5),
971            ("readinto", 10, 6, 6),  # More than read1() can return
972            ("readinto", 5, 6, 5),  # Buffer larger than total available
973            ("readinto", 6, 7, 6),
974            ("readinto", 10, 0, 0),  # Empty buffer
975            ("readinto1", 10, 5, 5),  # Result limited to single read1() call
976            ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
977            ("readinto1", 5, 6, 5),  # Buffer larger than total available
978            ("readinto1", 6, 7, 5),
979            ("readinto1", 10, 0, 0),  # Empty buffer
980        )
981        UNUSED_BYTE = 0x81
982        for test in tests:
983            with self.subTest(test):
984                method, avail, request, result = test
985                reader = Reader(bytes(range(avail)))
986                buffer = bytearray((UNUSED_BYTE,) * request)
987                method = getattr(reader, method)
988                self.assertEqual(method(buffer), result)
989                self.assertEqual(len(buffer), request)
990                self.assertSequenceEqual(buffer[:result], range(result))
991                unused = (UNUSED_BYTE,) * (request - result)
992                self.assertSequenceEqual(buffer[result:], unused)
993                self.assertEqual(len(reader.avail), avail - result)
994
995    def test_close_assert(self):
996        class R(self.IOBase):
997            def __setattr__(self, name, value):
998                pass
999            def flush(self):
1000                raise OSError()
1001        f = R()
1002        # This would cause an assertion failure.
1003        self.assertRaises(OSError, f.close)
1004
1005        # Silence destructor error
1006        R.flush = lambda self: None
1007
1008
class CIOTest(IOTest):
    # IOTest plus a finalization regression test (issue #12149).

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class CyclicIO(self.IOBase):
            def close(self):
                pass

        # A throw-away instance, created only to populate the method cache.
        CyclicIO()
        instance = CyclicIO()
        # Make the instance part of a reference cycle.
        instance.obj = instance
        ref = weakref.ref(instance)
        # Drop every direct reference; only the cycle keeps things alive.
        del CyclicIO, instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
1028
class PyIOTest(IOTest):
    # Runs the inherited IOTest cases as they are; nothing is added or
    # overridden here.
    pass
1031
1032
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compares the attributes of io.RawIOBase and _pyio.RawIOBase in both
    # directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        failure_text = 'Python RawIOBase does not have all C RawIOBase methods'
        self.assertEqual(missing, set(), msg=failure_text)

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        failure_text = 'C RawIOBase does not have all Python RawIOBase methods'
        self.assertEqual(missing, set(), msg=failure_text)
1046
1047
class CommonBufferedTests:
    # Mixin with the tests shared by BufferedReader, BufferedWriter and
    # BufferedRandom.  The buffered class under test is taken from self.tp.

    def test_detach(self):
        # detach() hands back the raw stream, and only once.
        underlying = self.MockRawIO()
        wrapper = self.tp(underlying)
        detached = wrapper.detach()
        self.assertIs(detached, underlying)
        with self.assertRaises(ValueError):
            wrapper.detach()

        # repr() must keep working on the detached wrapper.
        repr(wrapper)

    def test_fileno(self):
        # 42 is presumably what MockRawIO.fileno() reports -- the mock is
        # defined elsewhere in this file.
        bufio = self.tp(self.MockRawIO())

        self.assertEqual(42, bufio.fileno())

    def test_invalid_args(self):
        bufio = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                bufio.seek(0, bad_whence)

    def test_override_destructor(self):
        # Destroying an instance of a subclass must go through the
        # overridden __del__(), close() and flush(), in that order.
        calls = []

        class Recording(self.tp):
            def __del__(self):
                calls.append(1)
                # The parent class does not necessarily define __del__.
                parent_del = getattr(super(), "__del__", None)
                if parent_del is not None:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = Recording(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        bufio = self.tp(self.MockRawIO())

        def enter_and_exit():
            with bufio:
                pass

        enter_and_exit()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            enter_and_exit()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        with support.catch_unraisable_exception() as cm:
            with self.assertRaises(AttributeError):
                self.tp(rawio).xyzzy

            if IOBASE_EMITS_UNRAISABLE:
                # An unraisable exception is allowed here, but if there is
                # one it has to be the OSError.
                if cm.unraisable is not None:
                    self.assertEqual(cm.unraisable.exc_type, OSError)
            else:
                self.assertIsNone(cm.unraisable)

    def test_repr(self):
        # repr() shows the (optionally module-qualified) class name and,
        # once the raw stream has one, its name.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertRegex(repr(buffered), "<%s>" % clsname)
        named_cases = (
            ("dummy", "<%s name='dummy'>"),
            (b"dummy", "<%s name=b'dummy'>"),
        )
        for name, pattern in named_cases:
            raw.name = name
            self.assertRegex(repr(buffered), pattern % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Make the raw stream's name refer back to the buffered object.
        with support.swap_attr(raw, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        # Filled in by bad_flush() with the "closed" state of both objects
        # at the time flush() ran.
        seen_closed = []

        def bad_flush():
            seen_closed[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = bad_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(seen_closed)      # flush() called
        # flush() called before file closed
        self.assertFalse(seen_closed[0])
        self.assertFalse(seen_closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close() error is
        # raised with the flush() error as its __context__, and the
        # buffered object does not report itself as closed.
        raw = self.MockRawIO()

        def bad_flush():
            raise OSError('flush')

        def bad_close():
            raise OSError('close')

        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(OSError) as err:  # exception not swallowed
            buffered.close()
        error = err.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        raw.close = lambda: None
        buffered.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        # The two names raised below are deliberately undefined, so that
        # each function fails with a NameError mentioning that name.
        def bad_flush():
            raise non_existing_flush

        def bad_close():
            raise non_existing_close

        raw.close = bad_close
        buffered = self.tp(raw)
        buffered.flush = bad_flush
        with self.assertRaises(NameError) as err:  # exception not swallowed
            buffered.close()
        error = err.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        buffered.flush = lambda: None
        raw.close = lambda: None

    def test_multi_close(self):
        # close() may be repeated; flush() afterwards raises ValueError.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() over an unseekable raw stream must raise
        # UnsupportedOperation.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            bufio.tell()
        with self.assertRaises(self.UnsupportedOperation):
            bufio.seek(0)

    def test_readonly_attributes(self):
        # The "raw" attribute of a buffered object cannot be reassigned.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1219
1220
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer
    # of the buffered class found in self.tp.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must differ by exactly the difference between
        # the two buffer sizes.
        small, large = 4096, 8192
        small_bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_bufio) - small
        large_bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size no longer includes the buffer.
        bufsize = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=bufsize)
        size_without_buffer = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), size_without_buffer)
1242
class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered readers.  The class under test is taken from
    # self.tp, which is set by the concrete subclasses.
    read_mode = "rb"

    def test_constructor(self):
        # __init__() may be called again on a live object; invalid buffer
        # sizes are rejected with ValueError.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(b"abc", bufio.read())
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A following valid __init__() makes the object usable again.
        rawio = self.MockRawIO([b"abc"])
        bufio.__init__(rawio)
        self.assertEqual(b"abc", bufio.read())

    def test_uninitialized(self):
        # An object created with __new__() only can be dropped safely ...
        bufio = self.tp.__new__(self.tp)
        del bufio
        # ... fails in a controlled way when used ...
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.read, 0)
        # ... and works once __init__() has been run.
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.read(0), b'')

    def test_read(self):
        for arg in (None, 7):
            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
            bufio = self.tp(rawio)
            self.assertEqual(b"abcdefg", bufio.read(arg))
        # Invalid args
        self.assertRaises(ValueError, bufio.read, -2)

    def test_read1(self):
        # read1() is checked together with the number of raw reads made so
        # far (rawio._reads); the order of the statements matters.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"", bufio.read1(0))
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)

    def test_read1_arbitrary(self):
        # read1() without a size, or with a size of -1.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"bc", bufio.read1())
        self.assertEqual(b"d", bufio.read1())
        self.assertEqual(b"efg", bufio.read1(-1))
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(b"", bufio.read1())
        self.assertEqual(rawio._reads, 4)

    def test_readinto(self):
        # The same 2-byte buffer is reused, so after a short read its tail
        # still holds bytes from the previous call.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        b = bytearray(2)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"cd")
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ef")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"gf")
        self.assertEqual(bufio.readinto(b), 0)
        self.assertEqual(b, b"gf")
        # Same with a raw stream whose feed ends in None.
        rawio = self.MockRawIO((b"abc", None))
        bufio = self.tp(rawio)
        self.assertEqual(bufio.readinto(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(bufio.readinto(b), 1)
        self.assertEqual(b, b"cb")

    def test_readinto1(self):
        # readinto1() is checked together with the number of raw reads made
        # so far (rawio._reads); the order of the statements matters.
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        # Now with a target twice as large as the internal buffer.
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)

    def _check_readinto_array(self, method_name):
        # Shared body of test_readinto_array() and test_readinto1_array().
        # *method_name* names the buffered-reader method to call with the
        # array as its target.
        buffer_size = 60
        data = b"a" * 26
        rawio = self.MockRawIO((data,))
        bufio = self.tp(rawio, buffer_size=buffer_size)

        # Create an array with element size > 1 byte
        b = array.array('i', b'x' * 32)
        # Precondition on the platform's int size.  This is a real
        # assertion method rather than a bare ``assert`` so that it is not
        # stripped when the tests run under -O.
        # NOTE(review): the value 16 is carried over unchanged -- confirm
        # that it expresses the intended element-size precondition.
        self.assertNotEqual(len(b), 16)

        # Read into it. We should get as many *bytes* as we can fit into b
        # (which is more than the number of elements)
        n = getattr(bufio, method_name)(b)
        self.assertGreater(n, len(b))

        # Check that old contents of b are preserved
        bm = memoryview(b).cast('B')
        self.assertLess(n, len(bm))
        self.assertEqual(bm[:n], data[:n])
        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))

    def test_readinto_array(self):
        # readinto() with a target whose elements are wider than one byte.
        self._check_readinto_array("readinto")

    def test_readinto1_array(self):
        # readinto1() with a target whose elements are wider than one byte.
        self._check_readinto_array("readinto1")

    def test_readlines(self):
        def bufio():
            # A fresh reader over the same three chunks for every check.
            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
            return self.tp(rawio)
        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])

    def test_buffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes passed to bufio.read(),
        #              expected rawio.read_history]
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = self.MockFileIO(data)
            bufio = self.tp(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            # this is mildly implementation-dependent
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw mock's own readall() is checked the same way.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())

    def test_read_past_eof(self):
        # Asking for more than is available returns what there is.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def test_read_all(self):
        # read() without a size returns everything.
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with self.open(support.TESTFN, "wb") as out:
                out.write(payload)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                results = []

                def worker():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=worker) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times
                # across all threads.
                combined = b''.join(results)
                for i in range(256):
                    c = bytes([i])
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)

    def test_unseekable(self):
        # Extends the common test: the errors must also be raised after
        # some data has been read.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        bufio.read(1)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
        self.assertRaises(self.UnsupportedOperation, bufio.tell)

    def test_misbehaved_io(self):
        # seek() and tell() over a misbehaving raw stream must raise
        # OSError.
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)

        # Silence destructor error
        bufio.close = lambda: None

    def test_no_extraneous_read(self):
        # Issue #9550; when the raw IO object has satisfied the read request,
        # we should not issue any additional reads, otherwise it may block
        # (e.g. socket).
        bufsize = 16
        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
            rawio = self.MockRawIO([b"x" * n])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            # Simple case: one raw read is enough to satisfy the request.
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
            # A more complex case where two raw reads are needed to satisfy
            # the request.
            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), b"x" * n)
            self.assertEqual(rawio._extraneous_reads, 0,
                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    def test_read_on_closed(self):
        # Issue #23796
        # NOTE(review): this builds io.BufferedReader/io.BytesIO directly
        # instead of going through self.tp, so every subclass exercises
        # the same implementation here -- confirm that this is intended.
        b = io.BufferedReader(io.BytesIO(b"12"))
        b.read(1)
        b.close()
        self.assertRaises(ValueError, b.peek)
        self.assertRaises(ValueError, b.read1, 1)

    def test_truncate_on_read_only(self):
        # A reader is not writable, so truncate() is unsupported with or
        # without an explicit size.
        rawio = self.MockFileIO(b"abc")
        bufio = self.tp(rawio)
        self.assertFalse(bufio.writable())
        self.assertRaises(self.UnsupportedOperation, bufio.truncate)
        self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1537
1538
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the buffered-reader and sizeof tests against io.BufferedReader
    # and adds checks written specifically for it.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER,
                     "MSan defaults to crashing instead of "
                     "returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # After every failed __init__() the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.read()

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            bufio.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            reader = self.tp(rawio)
            # Put the reader into a reference cycle with itself.
            reader.f = reader
            ref = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError has to mention the
        # class name.
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1588
1589
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the BufferedReaderTest cases against the pure-Python
    # implementation, _pyio.BufferedReader.
    tp = pyio.BufferedReader
1592
1593
1594class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1595    write_mode = "wb"
1596
1597    def test_constructor(self):
1598        rawio = self.MockRawIO()
1599        bufio = self.tp(rawio)
1600        bufio.__init__(rawio)
1601        bufio.__init__(rawio, buffer_size=1024)
1602        bufio.__init__(rawio, buffer_size=16)
1603        self.assertEqual(3, bufio.write(b"abc"))
1604        bufio.flush()
1605        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1606        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1607        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1608        bufio.__init__(rawio)
1609        self.assertEqual(3, bufio.write(b"ghi"))
1610        bufio.flush()
1611        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
1612
1613    def test_uninitialized(self):
1614        bufio = self.tp.__new__(self.tp)
1615        del bufio
1616        bufio = self.tp.__new__(self.tp)
1617        self.assertRaisesRegex((ValueError, AttributeError),
1618                               'uninitialized|has no attribute',
1619                               bufio.write, b'')
1620        bufio.__init__(self.MockRawIO())
1621        self.assertEqual(bufio.write(b''), 0)
1622
1623    def test_detach_flush(self):
1624        raw = self.MockRawIO()
1625        buf = self.tp(raw)
1626        buf.write(b"howdy!")
1627        self.assertFalse(raw._write_stack)
1628        buf.detach()
1629        self.assertEqual(raw._write_stack, [b"howdy!"])
1630
1631    def test_write(self):
1632        # Write to the buffered IO but don't overflow the buffer.
1633        writer = self.MockRawIO()
1634        bufio = self.tp(writer, 8)
1635        bufio.write(b"abc")
1636        self.assertFalse(writer._write_stack)
1637        buffer = bytearray(b"def")
1638        bufio.write(buffer)
1639        buffer[:] = b"***"  # Overwrite our copy of the data
1640        bufio.flush()
1641        self.assertEqual(b"".join(writer._write_stack), b"abcdef")
1642
1643    def test_write_overflow(self):
1644        writer = self.MockRawIO()
1645        bufio = self.tp(writer, 8)
1646        contents = b"abcdefghijklmnop"
1647        for n in range(0, len(contents), 3):
1648            bufio.write(contents[n:n+3])
1649        flushed = b"".join(writer._write_stack)
1650        # At least (total - 8) bytes were implicitly flushed, perhaps more
1651        # depending on the implementation.
1652        self.assertTrue(flushed.startswith(contents[:-8]), flushed)
1653
1654    def check_writes(self, intermediate_func):
1655        # Lots of writes, test the flushed output is as expected.
1656        contents = bytes(range(256)) * 1000
1657        n = 0
1658        writer = self.MockRawIO()
1659        bufio = self.tp(writer, 13)
1660        # Generator of write sizes: repeat each N 15 times then proceed to N+1
1661        def gen_sizes():
1662            for size in count(1):
1663                for i in range(15):
1664                    yield size
1665        sizes = gen_sizes()
1666        while n < len(contents):
1667            size = min(next(sizes), len(contents) - n)
1668            self.assertEqual(bufio.write(contents[n:n+size]), size)
1669            intermediate_func(bufio)
1670            n += size
1671        bufio.flush()
1672        self.assertEqual(contents, b"".join(writer._write_stack))
1673
1674    def test_writes(self):
1675        self.check_writes(lambda bufio: None)
1676
1677    def test_writes_and_flushes(self):
1678        self.check_writes(lambda bufio: bufio.flush())
1679
1680    def test_writes_and_seeks(self):
1681        def _seekabs(bufio):
1682            pos = bufio.tell()
1683            bufio.seek(pos + 1, 0)
1684            bufio.seek(pos - 1, 0)
1685            bufio.seek(pos, 0)
1686        self.check_writes(_seekabs)
1687        def _seekrel(bufio):
1688            pos = bufio.seek(0, 1)
1689            bufio.seek(+1, 1)
1690            bufio.seek(-1, 1)
1691            bufio.seek(pos, 0)
1692        self.check_writes(_seekrel)
1693
1694    def test_writes_and_truncates(self):
1695        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
1696
1697    def test_write_non_blocking(self):
1698        raw = self.MockNonBlockWriterIO()
1699        bufio = self.tp(raw, 8)
1700
1701        self.assertEqual(bufio.write(b"abcd"), 4)
1702        self.assertEqual(bufio.write(b"efghi"), 5)
1703        # 1 byte will be written, the rest will be buffered
1704        raw.block_on(b"k")
1705        self.assertEqual(bufio.write(b"jklmn"), 5)
1706
1707        # 8 bytes will be written, 8 will be buffered and the rest will be lost
1708        raw.block_on(b"0")
1709        try:
1710            bufio.write(b"opqrwxyz0123456789")
1711        except self.BlockingIOError as e:
1712            written = e.characters_written
1713        else:
1714            self.fail("BlockingIOError should have been raised")
1715        self.assertEqual(written, 16)
1716        self.assertEqual(raw.pop_written(),
1717            b"abcdefghijklmnopqrwxyz")
1718
1719        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
1720        s = raw.pop_written()
1721        # Previously buffered bytes were flushed
1722        self.assertTrue(s.startswith(b"01234567A"), s)
1723
1724    def test_write_and_rewind(self):
1725        raw = io.BytesIO()
1726        bufio = self.tp(raw, 4)
1727        self.assertEqual(bufio.write(b"abcdef"), 6)
1728        self.assertEqual(bufio.tell(), 6)
1729        bufio.seek(0, 0)
1730        self.assertEqual(bufio.write(b"XY"), 2)
1731        bufio.seek(6, 0)
1732        self.assertEqual(raw.getvalue(), b"XYcdef")
1733        self.assertEqual(bufio.write(b"123456"), 6)
1734        bufio.flush()
1735        self.assertEqual(raw.getvalue(), b"XYcdef123456")
1736
1737    def test_flush(self):
1738        writer = self.MockRawIO()
1739        bufio = self.tp(writer, 8)
1740        bufio.write(b"abc")
1741        bufio.flush()
1742        self.assertEqual(b"abc", writer._write_stack[0])
1743
1744    def test_writelines(self):
1745        l = [b'ab', b'cd', b'ef']
1746        writer = self.MockRawIO()
1747        bufio = self.tp(writer, 8)
1748        bufio.writelines(l)
1749        bufio.flush()
1750        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1751
1752    def test_writelines_userlist(self):
1753        l = UserList([b'ab', b'cd', b'ef'])
1754        writer = self.MockRawIO()
1755        bufio = self.tp(writer, 8)
1756        bufio.writelines(l)
1757        bufio.flush()
1758        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1759
1760    def test_writelines_error(self):
1761        writer = self.MockRawIO()
1762        bufio = self.tp(writer, 8)
1763        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1764        self.assertRaises(TypeError, bufio.writelines, None)
1765        self.assertRaises(TypeError, bufio.writelines, 'abc')
1766
1767    def test_destructor(self):
1768        writer = self.MockRawIO()
1769        bufio = self.tp(writer, 8)
1770        bufio.write(b"abc")
1771        del bufio
1772        support.gc_collect()
1773        self.assertEqual(b"abc", writer._write_stack[0])
1774
1775    def test_truncate(self):
1776        # Truncate implicitly flushes the buffer.
1777        self.addCleanup(support.unlink, support.TESTFN)
1778        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1779            bufio = self.tp(raw, 8)
1780            bufio.write(b"abcdef")
1781            self.assertEqual(bufio.truncate(3), 3)
1782            self.assertEqual(bufio.tell(), 6)
1783        with self.open(support.TESTFN, "rb", buffering=0) as f:
1784            self.assertEqual(f.read(), b"abc")
1785
1786    def test_truncate_after_write(self):
1787        # Ensure that truncate preserves the file position after
1788        # writes longer than the buffer size.
1789        # Issue: https://bugs.python.org/issue32228
1790        self.addCleanup(support.unlink, support.TESTFN)
1791        with self.open(support.TESTFN, "wb") as f:
1792            # Fill with some buffer
1793            f.write(b'\x00' * 10000)
1794        buffer_sizes = [8192, 4096, 200]
1795        for buffer_size in buffer_sizes:
1796            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
1797                f.write(b'\x00' * (buffer_size + 1))
1798                # After write write_pos and write_end are set to 0
1799                f.read(1)
1800                # read operation makes sure that pos != raw_pos
1801                f.truncate()
1802                self.assertEqual(f.tell(), buffer_size + 2)
1803
1804    @support.requires_resource('cpu')
1805    def test_threads(self):
1806        try:
1807            # Write out many bytes from many threads and test they were
1808            # all flushed.
1809            N = 1000
1810            contents = bytes(range(256)) * N
1811            sizes = cycle([1, 19])
1812            n = 0
1813            queue = deque()
1814            while n < len(contents):
1815                size = next(sizes)
1816                queue.append(contents[n:n+size])
1817                n += size
1818            del contents
1819            # We use a real file object because it allows us to
1820            # exercise situations where the GIL is released before
1821            # writing the buffer to the raw streams. This is in addition
1822            # to concurrency issues due to switching threads in the middle
1823            # of Python code.
1824            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1825                bufio = self.tp(raw, 8)
1826                errors = []
1827                def f():
1828                    try:
1829                        while True:
1830                            try:
1831                                s = queue.popleft()
1832                            except IndexError:
1833                                return
1834                            bufio.write(s)
1835                    except Exception as e:
1836                        errors.append(e)
1837                        raise
1838                threads = [threading.Thread(target=f) for x in range(20)]
1839                with support.start_threads(threads):
1840                    time.sleep(0.02) # yield
1841                self.assertFalse(errors,
1842                    "the following exceptions were caught: %r" % errors)
1843                bufio.close()
1844            with self.open(support.TESTFN, "rb") as f:
1845                s = f.read()
1846            for i in range(256):
1847                self.assertEqual(s.count(bytes([i])), N)
1848        finally:
1849            support.unlink(support.TESTFN)
1850
1851    def test_misbehaved_io(self):
1852        rawio = self.MisbehavedRawIO()
1853        bufio = self.tp(rawio, 5)
1854        self.assertRaises(OSError, bufio.seek, 0)
1855        self.assertRaises(OSError, bufio.tell)
1856        self.assertRaises(OSError, bufio.write, b"abcdef")
1857
1858        # Silence destructor error
1859        bufio.close = lambda: None
1860
1861    def test_max_buffer_size_removal(self):
1862        with self.assertRaises(TypeError):
1863            self.tp(self.MockRawIO(), 8, 12)
1864
1865    def test_write_error_on_close(self):
1866        raw = self.MockRawIO()
1867        def bad_write(b):
1868            raise OSError()
1869        raw.write = bad_write
1870        b = self.tp(raw)
1871        b.write(b'spam')
1872        self.assertRaises(OSError, b.close) # exception not swallowed
1873        self.assertTrue(b.closed)
1874
1875    def test_slow_close_from_thread(self):
1876        # Issue #31976
1877        rawio = self.SlowFlushRawIO()
1878        bufio = self.tp(rawio, 8)
1879        t = threading.Thread(target=bufio.close)
1880        t.start()
1881        rawio.in_flush.wait()
1882        self.assertRaises(ValueError, bufio.write, b'spam')
1883        self.assertTrue(bufio.closed)
1884        t.join()
1885
1886
1887
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # BufferedWriterTest cases run against io.BufferedWriter, plus checks
    # written specifically for this type.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        bufio = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # After every rejected re-initialization, write() must raise
        # ValueError as well.
        raw = self.MockRawIO()
        bufio = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            bufio = self.tp(rawio)
            bufio.write(b"123xxx")
            # Make the object part of a reference cycle.
            bufio.x = bufio
            ref = weakref.ref(bufio)
            del bufio
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        raw = io.BytesIO()
        self.assertRaisesRegex(TypeError, "BufferedWriter",
                               self.tp, raw, 1024, 1024, 1024)
1934
1935
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the BufferedWriterTest cases against the pure-Python (_pyio)
    # implementation.
    tp = pyio.BufferedWriter
1938
1939class BufferedRWPairTest(unittest.TestCase):
1940
1941    def test_constructor(self):
1942        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1943        self.assertFalse(pair.closed)
1944
1945    def test_uninitialized(self):
1946        pair = self.tp.__new__(self.tp)
1947        del pair
1948        pair = self.tp.__new__(self.tp)
1949        self.assertRaisesRegex((ValueError, AttributeError),
1950                               'uninitialized|has no attribute',
1951                               pair.read, 0)
1952        self.assertRaisesRegex((ValueError, AttributeError),
1953                               'uninitialized|has no attribute',
1954                               pair.write, b'')
1955        pair.__init__(self.MockRawIO(), self.MockRawIO())
1956        self.assertEqual(pair.read(0), b'')
1957        self.assertEqual(pair.write(b''), 0)
1958
1959    def test_detach(self):
1960        pair = self.tp(self.MockRawIO(), self.MockRawIO())
1961        self.assertRaises(self.UnsupportedOperation, pair.detach)
1962
1963    def test_constructor_max_buffer_size_removal(self):
1964        with self.assertRaises(TypeError):
1965            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
1966
1967    def test_constructor_with_not_readable(self):
1968        class NotReadable(MockRawIO):
1969            def readable(self):
1970                return False
1971
1972        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
1973
1974    def test_constructor_with_not_writeable(self):
1975        class NotWriteable(MockRawIO):
1976            def writable(self):
1977                return False
1978
1979        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
1980
1981    def test_read(self):
1982        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1983
1984        self.assertEqual(pair.read(3), b"abc")
1985        self.assertEqual(pair.read(1), b"d")
1986        self.assertEqual(pair.read(), b"ef")
1987        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1988        self.assertEqual(pair.read(None), b"abc")
1989
1990    def test_readlines(self):
1991        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1992        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1993        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1994        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
1995
1996    def test_read1(self):
1997        # .read1() is delegated to the underlying reader object, so this test
1998        # can be shallow.
1999        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2000
2001        self.assertEqual(pair.read1(3), b"abc")
2002        self.assertEqual(pair.read1(), b"def")
2003
2004    def test_readinto(self):
2005        for method in ("readinto", "readinto1"):
2006            with self.subTest(method):
2007                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2008
2009                data = byteslike(b'\0' * 5)
2010                self.assertEqual(getattr(pair, method)(data), 5)
2011                self.assertEqual(bytes(data), b"abcde")
2012
2013    def test_write(self):
2014        w = self.MockRawIO()
2015        pair = self.tp(self.MockRawIO(), w)
2016
2017        pair.write(b"abc")
2018        pair.flush()
2019        buffer = bytearray(b"def")
2020        pair.write(buffer)
2021        buffer[:] = b"***"  # Overwrite our copy of the data
2022        pair.flush()
2023        self.assertEqual(w._write_stack, [b"abc", b"def"])
2024
2025    def test_peek(self):
2026        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2027
2028        self.assertTrue(pair.peek(3).startswith(b"abc"))
2029        self.assertEqual(pair.read(3), b"abc")
2030
2031    def test_readable(self):
2032        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2033        self.assertTrue(pair.readable())
2034
2035    def test_writeable(self):
2036        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2037        self.assertTrue(pair.writable())
2038
2039    def test_seekable(self):
2040        # BufferedRWPairs are never seekable, even if their readers and writers
2041        # are.
2042        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2043        self.assertFalse(pair.seekable())
2044
2045    # .flush() is delegated to the underlying writer object and has been
2046    # tested in the test_write method.
2047
2048    def test_close_and_closed(self):
2049        pair = self.tp(self.MockRawIO(), self.MockRawIO())
2050        self.assertFalse(pair.closed)
2051        pair.close()
2052        self.assertTrue(pair.closed)
2053
2054    def test_reader_close_error_on_close(self):
2055        def reader_close():
2056            reader_non_existing
2057        reader = self.MockRawIO()
2058        reader.close = reader_close
2059        writer = self.MockRawIO()
2060        pair = self.tp(reader, writer)
2061        with self.assertRaises(NameError) as err:
2062            pair.close()
2063        self.assertIn('reader_non_existing', str(err.exception))
2064        self.assertTrue(pair.closed)
2065        self.assertFalse(reader.closed)
2066        self.assertTrue(writer.closed)
2067
2068        # Silence destructor error
2069        reader.close = lambda: None
2070
    def test_writer_close_error_on_close(self):
        # The writer's close() fails with NameError: pair.close() must
        # propagate it, and the reader must still end up closed.
        def writer_close():
            # Referencing an undefined name raises the NameError on purpose.
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        writer.close = lambda: None
        # Drop this frame's reference to the raw writer.
        writer = None

        # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
        with support.catch_unraisable_exception():
            # Ignore BufferedRWPair unraisable exception
            with support.catch_unraisable_exception():
                # Release the pair and collect inside the inner context.
                pair = None
                support.gc_collect()
            # Second collection, still inside the outer context.
            support.gc_collect()
2096
2097    def test_reader_writer_close_error_on_close(self):
2098        def reader_close():
2099            reader_non_existing
2100        def writer_close():
2101            writer_non_existing
2102        reader = self.MockRawIO()
2103        reader.close = reader_close
2104        writer = self.MockRawIO()
2105        writer.close = writer_close
2106        pair = self.tp(reader, writer)
2107        with self.assertRaises(NameError) as err:
2108            pair.close()
2109        self.assertIn('reader_non_existing', str(err.exception))
2110        self.assertIsInstance(err.exception.__context__, NameError)
2111        self.assertIn('writer_non_existing', str(err.exception.__context__))
2112        self.assertFalse(pair.closed)
2113        self.assertFalse(reader.closed)
2114        self.assertFalse(writer.closed)
2115
2116        # Silence destructor error
2117        reader.close = lambda: None
2118        writer.close = lambda: None
2119
2120    def test_isatty(self):
2121        class SelectableIsAtty(MockRawIO):
2122            def __init__(self, isatty):
2123                MockRawIO.__init__(self)
2124                self._isatty = isatty
2125
2126            def isatty(self):
2127                return self._isatty
2128
2129        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2130        self.assertFalse(pair.isatty())
2131
2132        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2133        self.assertTrue(pair.isatty())
2134
2135        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2136        self.assertTrue(pair.isatty())
2137
2138        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2139        self.assertTrue(pair.isatty())
2140
2141    def test_weakref_clearing(self):
2142        brw = self.tp(self.MockRawIO(), self.MockRawIO())
2143        ref = weakref.ref(brw)
2144        brw = None
2145        ref = None # Shouldn't segfault.
2146
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPairTest cases against the C implementation.
    tp = io.BufferedRWPair
2149
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the BufferedRWPairTest cases against the pure-Python (_pyio)
    # implementation.
    tp = pyio.BufferedRWPair
2152
2153
2154class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2155    read_mode = "rb+"
2156    write_mode = "wb+"
2157
2158    def test_constructor(self):
2159        BufferedReaderTest.test_constructor(self)
2160        BufferedWriterTest.test_constructor(self)
2161
2162    def test_uninitialized(self):
2163        BufferedReaderTest.test_uninitialized(self)
2164        BufferedWriterTest.test_uninitialized(self)
2165
2166    def test_read_and_write(self):
2167        raw = self.MockRawIO((b"asdf", b"ghjk"))
2168        rw = self.tp(raw, 8)
2169
2170        self.assertEqual(b"as", rw.read(2))
2171        rw.write(b"ddd")
2172        rw.write(b"eee")
2173        self.assertFalse(raw._write_stack) # Buffer writes
2174        self.assertEqual(b"ghjk", rw.read())
2175        self.assertEqual(b"dddeee", raw._write_stack[0])
2176
2177    def test_seek_and_tell(self):
2178        raw = self.BytesIO(b"asdfghjkl")
2179        rw = self.tp(raw)
2180
2181        self.assertEqual(b"as", rw.read(2))
2182        self.assertEqual(2, rw.tell())
2183        rw.seek(0, 0)
2184        self.assertEqual(b"asdf", rw.read(4))
2185
2186        rw.write(b"123f")
2187        rw.seek(0, 0)
2188        self.assertEqual(b"asdf123fl", rw.read())
2189        self.assertEqual(9, rw.tell())
2190        rw.seek(-4, 2)
2191        self.assertEqual(5, rw.tell())
2192        rw.seek(2, 1)
2193        self.assertEqual(7, rw.tell())
2194        self.assertEqual(b"fl", rw.read(11))
2195        rw.flush()
2196        self.assertEqual(b"asdf123fl", raw.getvalue())
2197
2198        self.assertRaises(TypeError, rw.seek, 0.0)
2199
2200    def check_flush_and_read(self, read_func):
2201        raw = self.BytesIO(b"abcdefghi")
2202        bufio = self.tp(raw)
2203
2204        self.assertEqual(b"ab", read_func(bufio, 2))
2205        bufio.write(b"12")
2206        self.assertEqual(b"ef", read_func(bufio, 2))
2207        self.assertEqual(6, bufio.tell())
2208        bufio.flush()
2209        self.assertEqual(6, bufio.tell())
2210        self.assertEqual(b"ghi", read_func(bufio))
2211        raw.seek(0, 0)
2212        raw.write(b"XYZ")
2213        # flush() resets the read buffer
2214        bufio.flush()
2215        bufio.seek(0, 0)
2216        self.assertEqual(b"XYZ", read_func(bufio, 3))
2217
2218    def test_flush_and_read(self):
2219        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2220
2221    def test_flush_and_readinto(self):
2222        def _readinto(bufio, n=-1):
2223            b = bytearray(n if n >= 0 else 9999)
2224            n = bufio.readinto(b)
2225            return bytes(b[:n])
2226        self.check_flush_and_read(_readinto)
2227
2228    def test_flush_and_peek(self):
2229        def _peek(bufio, n=-1):
2230            # This relies on the fact that the buffer can contain the whole
2231            # raw stream, otherwise peek() can return less.
2232            b = bufio.peek(n)
2233            if n != -1:
2234                b = b[:n]
2235            bufio.seek(len(b), 1)
2236            return b
2237        self.check_flush_and_read(_peek)
2238
2239    def test_flush_and_write(self):
2240        raw = self.BytesIO(b"abcdefghi")
2241        bufio = self.tp(raw)
2242
2243        bufio.write(b"123")
2244        bufio.flush()
2245        bufio.write(b"45")
2246        bufio.flush()
2247        bufio.seek(0, 0)
2248        self.assertEqual(b"12345fghi", raw.getvalue())
2249        self.assertEqual(b"12345fghi", bufio.read())
2250
2251    def test_threads(self):
2252        BufferedReaderTest.test_threads(self)
2253        BufferedWriterTest.test_threads(self)
2254
2255    def test_writes_and_peek(self):
2256        def _peek(bufio):
2257            bufio.peek(1)
2258        self.check_writes(_peek)
2259        def _peek(bufio):
2260            pos = bufio.tell()
2261            bufio.seek(-1, 1)
2262            bufio.peek(1)
2263            bufio.seek(pos, 0)
2264        self.check_writes(_peek)
2265
2266    def test_writes_and_reads(self):
2267        def _read(bufio):
2268            bufio.seek(-1, 1)
2269            bufio.read(1)
2270        self.check_writes(_read)
2271
2272    def test_writes_and_read1s(self):
2273        def _read1(bufio):
2274            bufio.seek(-1, 1)
2275            bufio.read1(1)
2276        self.check_writes(_read1)
2277
2278    def test_writes_and_readintos(self):
2279        def _read(bufio):
2280            bufio.seek(-1, 1)
2281            bufio.readinto(bytearray(1))
2282        self.check_writes(_read)
2283
2284    def test_write_after_readahead(self):
2285        # Issue #6629: writing after the buffer was filled by readahead should
2286        # first rewind the raw stream.
2287        for overwrite_size in [1, 5]:
2288            raw = self.BytesIO(b"A" * 10)
2289            bufio = self.tp(raw, 4)
2290            # Trigger readahead
2291            self.assertEqual(bufio.read(1), b"A")
2292            self.assertEqual(bufio.tell(), 1)
2293            # Overwriting should rewind the raw stream if it needs so
2294            bufio.write(b"B" * overwrite_size)
2295            self.assertEqual(bufio.tell(), overwrite_size + 1)
2296            # If the write size was smaller than the buffer size, flush() and
2297            # check that rewind happens.
2298            bufio.flush()
2299            self.assertEqual(bufio.tell(), overwrite_size + 1)
2300            s = raw.getvalue()
2301            self.assertEqual(s,
2302                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2303
2304    def test_write_rewind_write(self):
2305        # Various combinations of reading / writing / seeking backwards / writing again
2306        def mutate(bufio, pos1, pos2):
2307            assert pos2 >= pos1
2308            # Fill the buffer
2309            bufio.seek(pos1)
2310            bufio.read(pos2 - pos1)
2311            bufio.write(b'\x02')
2312            # This writes earlier than the previous write, but still inside
2313            # the buffer.
2314            bufio.seek(pos1)
2315            bufio.write(b'\x01')
2316
2317        b = b"\x80\x81\x82\x83\x84"
2318        for i in range(0, len(b)):
2319            for j in range(i, len(b)):
2320                raw = self.BytesIO(b)
2321                bufio = self.tp(raw, 100)
2322                mutate(bufio, i, j)
2323                bufio.flush()
2324                expected = bytearray(b)
2325                expected[j] = 2
2326                expected[i] = 1
2327                self.assertEqual(raw.getvalue(), expected,
2328                                 "failed result for i=%d, j=%d" % (i, j))
2329
2330    def test_truncate_after_read_or_write(self):
2331        raw = self.BytesIO(b"A" * 10)
2332        bufio = self.tp(raw, 100)
2333        self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2334        self.assertEqual(bufio.truncate(), 2)
2335        self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2336        self.assertEqual(bufio.truncate(), 4)
2337
2338    def test_misbehaved_io(self):
2339        BufferedReaderTest.test_misbehaved_io(self)
2340        BufferedWriterTest.test_misbehaved_io(self)
2341
2342    def test_interleaved_read_write(self):
2343        # Test for issue #12213
2344        with self.BytesIO(b'abcdefgh') as raw:
2345            with self.tp(raw, 100) as f:
2346                f.write(b"1")
2347                self.assertEqual(f.read(1), b'b')
2348                f.write(b'2')
2349                self.assertEqual(f.read1(1), b'd')
2350                f.write(b'3')
2351                buf = bytearray(1)
2352                f.readinto(buf)
2353                self.assertEqual(buf, b'f')
2354                f.write(b'4')
2355                self.assertEqual(f.peek(1), b'h')
2356                f.flush()
2357                self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2358
2359        with self.BytesIO(b'abc') as raw:
2360            with self.tp(raw, 100) as f:
2361                self.assertEqual(f.read(1), b'a')
2362                f.write(b"2")
2363                self.assertEqual(f.read(1), b'c')
2364                f.flush()
2365                self.assertEqual(raw.getvalue(), b'a2c')
2366
2367    def test_interleaved_readline_write(self):
2368        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2369            with self.tp(raw) as f:
2370                f.write(b'1')
2371                self.assertEqual(f.readline(), b'b\n')
2372                f.write(b'2')
2373                self.assertEqual(f.readline(), b'def\n')
2374                f.write(b'3')
2375                self.assertEqual(f.readline(), b'\n')
2376                f.flush()
2377                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2378
2379    # You can't construct a BufferedRandom over a non-seekable stream.
2380    test_unseekable = None
2381
2382    # writable() returns True, so there's no point to test it over
2383    # a writable stream.
2384    test_truncate_on_read_only = None
2385
2386
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # BufferedRandomTest cases run against io.BufferedRandom, plus checks
    # written specifically for this type.
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        bufio = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader-side check, then the writer-side check.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        raw = io.BytesIO()
        self.assertRaisesRegex(TypeError, "BufferedRandom",
                               self.tp, raw, 1024, 1024, 1024)
2410
2411
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the BufferedRandomTest cases against the pure-Python (_pyio)
    # implementation.
    tp = pyio.BufferedRandom
2414
2415
2416# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2417# properties:
2418#   - A single output character can correspond to many bytes of input.
2419#   - The number of input bytes to complete the character can be
2420#     undetermined until the last input byte is received.
2421#   - The number of input bytes can vary depending on previous input.
2422#   - A single input byte can correspond to many characters of output.
2423#   - The number of output characters can be undetermined until the
2424#     last input byte is received.
2425#   - The number of output characters can vary depending on previous input.
2426
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    A stateful, buffering decoder used to exercise seek/tell.

    The input is a stream of words.  A word is either fixed-length (its
    length is the current input length, I) or, when I is 0,
    variable-length and terminated by a period; surplus periods are
    skipped.  Every completed word is interpreted as follows:
      - 'i' followed by a number sets I (capped at 99).  A missing
        number means 0, i.e. period-terminated words.
      - 'o' followed by a number sets the output length, O (capped at
        99).  A missing number means 0.
      - Any other word is emitted followed by a period.  When O is
        non-zero the word is first truncated, or padded with hyphens, to
        exactly O characters; when O is 0 it is emitted verbatim.
    I and O both start at 1.  A new I applies from the next input byte
    on.  EOF (final=True) also terminates the last word.
    """

    # The codec lookup hook below answers only while this is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % (id(self),)

    def reset(self):
        # Back to the initial configuration with nothing buffered.
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # Both lengths are XORed with 1 so that the flags are 0 right
        # after reset(); they are then packed as i*100 + o.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        # Inverse of getstate(): unpack the flags and undo the XOR.
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        period = ord('.')
        pieces = []
        for byte in input:
            # self.i is re-read for every byte because process_word()
            # may have just changed it.
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length: keep collecting until a period
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends the word; periods with nothing buffered
                # are ignored
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Interpret the buffered word, then empty the buffer.  Returns
        # the text produced, which is '' for the 'i'/'o' commands.
        lead = self.buffer[0]
        if lead == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
            produced = ''
        elif lead == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
            produced = ''
        else:
            word = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                word = word.ljust(self.o, '-')[:self.o]
            produced = word + '.'
        self.buffer = bytearray()
        return produced

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: resolves only 'test_decoder', and only
        # while codecEnabled is true; otherwise returns None.
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
2511
# Register the lookup hook of the decoder defined above so that the
# codec name 'test_decoder' can be resolved through the codecs machinery.
# The hook returns None while codecEnabled is false (the default), so it
# is effectively disabled until a test enables it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2515
2516
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # One-shot decoding of every table entry, each with a fresh decoder.
        for raw, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw, final), expected)

        # An unfinished decode yields nothing; forcing EOF afterwards
        # flushes the buffered word.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
2559
2560class TextIOWrapperTest(unittest.TestCase):
2561
2562    def setUp(self):
2563        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2564        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
2565        support.unlink(support.TESTFN)
2566
    def tearDown(self):
        # Remove the scratch file a test may have left at support.TESTFN.
        support.unlink(support.TESTFN)
2569
2570    def test_constructor(self):
2571        r = self.BytesIO(b"\xc3\xa9\n\n")
2572        b = self.BufferedReader(r, 1000)
2573        t = self.TextIOWrapper(b)
2574        t.__init__(b, encoding="latin-1", newline="\r\n")
2575        self.assertEqual(t.encoding, "latin-1")
2576        self.assertEqual(t.line_buffering, False)
2577        t.__init__(b, encoding="utf-8", line_buffering=True)
2578        self.assertEqual(t.encoding, "utf-8")
2579        self.assertEqual(t.line_buffering, True)
2580        self.assertEqual("\xe9\n", t.readline())
2581        self.assertRaises(TypeError, t.__init__, b, newline=42)
2582        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2583
2584    def test_uninitialized(self):
2585        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2586        del t
2587        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2588        self.assertRaises(Exception, repr, t)
2589        self.assertRaisesRegex((ValueError, AttributeError),
2590                               'uninitialized|has no attribute',
2591                               t.read, 0)
2592        t.__init__(self.MockRawIO())
2593        self.assertEqual(t.read(0), '')
2594
2595    def test_non_text_encoding_codecs_are_rejected(self):
2596        # Ensure the constructor complains if passed a codec that isn't
2597        # marked as a text encoding
2598        # http://bugs.python.org/issue20404
2599        r = self.BytesIO()
2600        b = self.BufferedWriter(r)
2601        with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2602            self.TextIOWrapper(b, encoding="hex")
2603
2604    def test_detach(self):
2605        r = self.BytesIO()
2606        b = self.BufferedWriter(r)
2607        t = self.TextIOWrapper(b)
2608        self.assertIs(t.detach(), b)
2609
2610        t = self.TextIOWrapper(b, encoding="ascii")
2611        t.write("howdy")
2612        self.assertFalse(r.getvalue())
2613        t.detach()
2614        self.assertEqual(r.getvalue(), b"howdy")
2615        self.assertRaises(ValueError, t.detach)
2616
2617        # Operations independent of the detached stream should still work
2618        repr(t)
2619        self.assertEqual(t.encoding, "ascii")
2620        self.assertEqual(t.errors, "strict")
2621        self.assertFalse(t.line_buffering)
2622        self.assertFalse(t.write_through)
2623
2624    def test_repr(self):
2625        raw = self.BytesIO("hello".encode("utf-8"))
2626        b = self.BufferedReader(raw)
2627        t = self.TextIOWrapper(b, encoding="utf-8")
2628        modname = self.TextIOWrapper.__module__
2629        self.assertRegex(repr(t),
2630                         r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
2631        raw.name = "dummy"
2632        self.assertRegex(repr(t),
2633                         r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2634        t.mode = "r"
2635        self.assertRegex(repr(t),
2636                         r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
2637        raw.name = b"dummy"
2638        self.assertRegex(repr(t),
2639                         r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
2640
2641        t.buffer.detach()
2642        repr(t)  # Should not raise an exception
2643
2644    def test_recursive_repr(self):
2645        # Issue #25455
2646        raw = self.BytesIO()
2647        t = self.TextIOWrapper(raw)
2648        with support.swap_attr(raw, 'name', t):
2649            try:
2650                repr(t)  # Should not crash
2651            except RuntimeError:
2652                pass
2653
2654    def test_line_buffering(self):
2655        r = self.BytesIO()
2656        b = self.BufferedWriter(r, 1000)
2657        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2658        t.write("X")
2659        self.assertEqual(r.getvalue(), b"")  # No flush happened
2660        t.write("Y\nZ")
2661        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
2662        t.write("A\rB")
2663        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2664
2665    def test_reconfigure_line_buffering(self):
2666        r = self.BytesIO()
2667        b = self.BufferedWriter(r, 1000)
2668        t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2669        t.write("AB\nC")
2670        self.assertEqual(r.getvalue(), b"")
2671
2672        t.reconfigure(line_buffering=True)   # implicit flush
2673        self.assertEqual(r.getvalue(), b"AB\nC")
2674        t.write("DEF\nG")
2675        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2676        t.write("H")
2677        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2678        t.reconfigure(line_buffering=False)   # implicit flush
2679        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2680        t.write("IJ")
2681        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2682
2683        # Keeping default value
2684        t.reconfigure()
2685        t.reconfigure(line_buffering=None)
2686        self.assertEqual(t.line_buffering, False)
2687        t.reconfigure(line_buffering=True)
2688        t.reconfigure()
2689        t.reconfigure(line_buffering=None)
2690        self.assertEqual(t.line_buffering, True)
2691
2692    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2693    def test_default_encoding(self):
2694        old_environ = dict(os.environ)
2695        try:
2696            # try to get a user preferred encoding different than the current
2697            # locale encoding to check that TextIOWrapper() uses the current
2698            # locale encoding and not the user preferred encoding
2699            for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2700                if key in os.environ:
2701                    del os.environ[key]
2702
2703            current_locale_encoding = locale.getpreferredencoding(False)
2704            b = self.BytesIO()
2705            t = self.TextIOWrapper(b)
2706            self.assertEqual(t.encoding, current_locale_encoding)
2707        finally:
2708            os.environ.clear()
2709            os.environ.update(old_environ)
2710
2711    @support.cpython_only
2712    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2713    def test_device_encoding(self):
2714        # Issue 15989
2715        import _testcapi
2716        b = self.BytesIO()
2717        b.fileno = lambda: _testcapi.INT_MAX + 1
2718        self.assertRaises(OverflowError, self.TextIOWrapper, b)
2719        b.fileno = lambda: _testcapi.UINT_MAX + 1
2720        self.assertRaises(OverflowError, self.TextIOWrapper, b)
2721
2722    def test_encoding(self):
2723        # Check the encoding attribute is always set, and valid
2724        b = self.BytesIO()
2725        t = self.TextIOWrapper(b, encoding="utf-8")
2726        self.assertEqual(t.encoding, "utf-8")
2727        t = self.TextIOWrapper(b)
2728        self.assertIsNotNone(t.encoding)
2729        codecs.lookup(t.encoding)
2730
2731    def test_encoding_errors_reading(self):
2732        # (1) default
2733        b = self.BytesIO(b"abc\n\xff\n")
2734        t = self.TextIOWrapper(b, encoding="ascii")
2735        self.assertRaises(UnicodeError, t.read)
2736        # (2) explicit strict
2737        b = self.BytesIO(b"abc\n\xff\n")
2738        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2739        self.assertRaises(UnicodeError, t.read)
2740        # (3) ignore
2741        b = self.BytesIO(b"abc\n\xff\n")
2742        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2743        self.assertEqual(t.read(), "abc\n\n")
2744        # (4) replace
2745        b = self.BytesIO(b"abc\n\xff\n")
2746        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2747        self.assertEqual(t.read(), "abc\n\ufffd\n")
2748
2749    def test_encoding_errors_writing(self):
2750        # (1) default
2751        b = self.BytesIO()
2752        t = self.TextIOWrapper(b, encoding="ascii")
2753        self.assertRaises(UnicodeError, t.write, "\xff")
2754        # (2) explicit strict
2755        b = self.BytesIO()
2756        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2757        self.assertRaises(UnicodeError, t.write, "\xff")
2758        # (3) ignore
2759        b = self.BytesIO()
2760        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2761                             newline="\n")
2762        t.write("abc\xffdef\n")
2763        t.flush()
2764        self.assertEqual(b.getvalue(), b"abcdef\n")
2765        # (4) replace
2766        b = self.BytesIO()
2767        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2768                             newline="\n")
2769        t.write("abc\xffdef\n")
2770        t.flush()
2771        self.assertEqual(b.getvalue(), b"abc?def\n")
2772
2773    def test_newlines(self):
2774        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2775
2776        tests = [
2777            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2778            [ '', input_lines ],
2779            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2780            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2781            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2782        ]
2783        encodings = (
2784            'utf-8', 'latin-1',
2785            'utf-16', 'utf-16-le', 'utf-16-be',
2786            'utf-32', 'utf-32-le', 'utf-32-be',
2787        )
2788
2789        # Try a range of buffer sizes to test the case where \r is the last
2790        # character in TextIOWrapper._pending_line.
2791        for encoding in encodings:
2792            # XXX: str.encode() should return bytes
2793            data = bytes(''.join(input_lines).encode(encoding))
2794            for do_reads in (False, True):
2795                for bufsize in range(1, 10):
2796                    for newline, exp_lines in tests:
2797                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2798                        textio = self.TextIOWrapper(bufio, newline=newline,
2799                                                  encoding=encoding)
2800                        if do_reads:
2801                            got_lines = []
2802                            while True:
2803                                c2 = textio.read(2)
2804                                if c2 == '':
2805                                    break
2806                                self.assertEqual(len(c2), 2)
2807                                got_lines.append(c2 + textio.readline())
2808                        else:
2809                            got_lines = list(textio)
2810
2811                        for got_line, exp_line in zip(got_lines, exp_lines):
2812                            self.assertEqual(got_line, exp_line)
2813                        self.assertEqual(len(got_lines), len(exp_lines))
2814
2815    def test_newlines_input(self):
2816        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2817        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2818        for newline, expected in [
2819            (None, normalized.decode("ascii").splitlines(keepends=True)),
2820            ("", testdata.decode("ascii").splitlines(keepends=True)),
2821            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2822            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2823            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2824            ]:
2825            buf = self.BytesIO(testdata)
2826            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2827            self.assertEqual(txt.readlines(), expected)
2828            txt.seek(0)
2829            self.assertEqual(txt.read(), "".join(expected))
2830
2831    def test_newlines_output(self):
2832        testdict = {
2833            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2834            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2835            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2836            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2837            }
2838        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2839        for newline, expected in tests:
2840            buf = self.BytesIO()
2841            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2842            txt.write("AAA\nB")
2843            txt.write("BB\nCCC\n")
2844            txt.write("X\rY\r\nZ")
2845            txt.flush()
2846            self.assertEqual(buf.closed, False)
2847            self.assertEqual(buf.getvalue(), expected)
2848
2849    def test_destructor(self):
2850        l = []
2851        base = self.BytesIO
2852        class MyBytesIO(base):
2853            def close(self):
2854                l.append(self.getvalue())
2855                base.close(self)
2856        b = MyBytesIO()
2857        t = self.TextIOWrapper(b, encoding="ascii")
2858        t.write("abc")
2859        del t
2860        support.gc_collect()
2861        self.assertEqual([b"abc"], l)
2862
2863    def test_override_destructor(self):
2864        record = []
2865        class MyTextIO(self.TextIOWrapper):
2866            def __del__(self):
2867                record.append(1)
2868                try:
2869                    f = super().__del__
2870                except AttributeError:
2871                    pass
2872                else:
2873                    f()
2874            def close(self):
2875                record.append(2)
2876                super().close()
2877            def flush(self):
2878                record.append(3)
2879                super().flush()
2880        b = self.BytesIO()
2881        t = MyTextIO(b, encoding="ascii")
2882        del t
2883        support.gc_collect()
2884        self.assertEqual(record, [1, 2, 3])
2885
2886    def test_error_through_destructor(self):
2887        # Test that the exception state is not modified by a destructor,
2888        # even if close() fails.
2889        rawio = self.CloseFailureIO()
2890        with support.catch_unraisable_exception() as cm:
2891            with self.assertRaises(AttributeError):
2892                self.TextIOWrapper(rawio).xyzzy
2893
2894            if not IOBASE_EMITS_UNRAISABLE:
2895                self.assertIsNone(cm.unraisable)
2896            elif cm.unraisable is not None:
2897                self.assertEqual(cm.unraisable.exc_type, OSError)
2898
2899    # Systematic tests of the text I/O API
2900
2901    def test_basic_io(self):
2902        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2903            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
2904                f = self.open(support.TESTFN, "w+", encoding=enc)
2905                f._CHUNK_SIZE = chunksize
2906                self.assertEqual(f.write("abc"), 3)
2907                f.close()
2908                f = self.open(support.TESTFN, "r+", encoding=enc)
2909                f._CHUNK_SIZE = chunksize
2910                self.assertEqual(f.tell(), 0)
2911                self.assertEqual(f.read(), "abc")
2912                cookie = f.tell()
2913                self.assertEqual(f.seek(0), 0)
2914                self.assertEqual(f.read(None), "abc")
2915                f.seek(0)
2916                self.assertEqual(f.read(2), "ab")
2917                self.assertEqual(f.read(1), "c")
2918                self.assertEqual(f.read(1), "")
2919                self.assertEqual(f.read(), "")
2920                self.assertEqual(f.tell(), cookie)
2921                self.assertEqual(f.seek(0), 0)
2922                self.assertEqual(f.seek(0, 2), cookie)
2923                self.assertEqual(f.write("def"), 3)
2924                self.assertEqual(f.seek(cookie), cookie)
2925                self.assertEqual(f.read(), "def")
2926                if enc.startswith("utf"):
2927                    self.multi_line_test(f, enc)
2928                f.close()
2929
2930    def multi_line_test(self, f, enc):
2931        f.seek(0)
2932        f.truncate()
2933        sample = "s\xff\u0fff\uffff"
2934        wlines = []
2935        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2936            chars = []
2937            for i in range(size):
2938                chars.append(sample[i % len(sample)])
2939            line = "".join(chars) + "\n"
2940            wlines.append((f.tell(), line))
2941            f.write(line)
2942        f.seek(0)
2943        rlines = []
2944        while True:
2945            pos = f.tell()
2946            line = f.readline()
2947            if not line:
2948                break
2949            rlines.append((pos, line))
2950        self.assertEqual(rlines, wlines)
2951
2952    def test_telling(self):
2953        f = self.open(support.TESTFN, "w+", encoding="utf-8")
2954        p0 = f.tell()
2955        f.write("\xff\n")
2956        p1 = f.tell()
2957        f.write("\xff\n")
2958        p2 = f.tell()
2959        f.seek(0)
2960        self.assertEqual(f.tell(), p0)
2961        self.assertEqual(f.readline(), "\xff\n")
2962        self.assertEqual(f.tell(), p1)
2963        self.assertEqual(f.readline(), "\xff\n")
2964        self.assertEqual(f.tell(), p2)
2965        f.seek(0)
2966        for line in f:
2967            self.assertEqual(line, "\xff\n")
2968            self.assertRaises(OSError, f.tell)
2969        self.assertEqual(f.tell(), p2)
2970        f.close()
2971
2972    def test_seeking(self):
2973        chunk_size = _default_chunk_size()
2974        prefix_size = chunk_size - 2
2975        u_prefix = "a" * prefix_size
2976        prefix = bytes(u_prefix.encode("utf-8"))
2977        self.assertEqual(len(u_prefix), len(prefix))
2978        u_suffix = "\u8888\n"
2979        suffix = bytes(u_suffix.encode("utf-8"))
2980        line = prefix + suffix
2981        with self.open(support.TESTFN, "wb") as f:
2982            f.write(line*2)
2983        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2984            s = f.read(prefix_size)
2985            self.assertEqual(s, str(prefix, "ascii"))
2986            self.assertEqual(f.tell(), prefix_size)
2987            self.assertEqual(f.readline(), u_suffix)
2988
2989    def test_seeking_too(self):
2990        # Regression test for a specific bug
2991        data = b'\xe0\xbf\xbf\n'
2992        with self.open(support.TESTFN, "wb") as f:
2993            f.write(data)
2994        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2995            f._CHUNK_SIZE  # Just test that it exists
2996            f._CHUNK_SIZE = 2
2997            f.readline()
2998            f.tell()
2999
3000    def test_seek_and_tell(self):
3001        #Test seek/tell using the StatefulIncrementalDecoder.
3002        # Make test faster by doing smaller seeks
3003        CHUNK_SIZE = 128
3004
3005        def test_seek_and_tell_with_data(data, min_pos=0):
3006            """Tell/seek to various points within a data stream and ensure
3007            that the decoded data returned by read() is consistent."""
3008            f = self.open(support.TESTFN, 'wb')
3009            f.write(data)
3010            f.close()
3011            f = self.open(support.TESTFN, encoding='test_decoder')
3012            f._CHUNK_SIZE = CHUNK_SIZE
3013            decoded = f.read()
3014            f.close()
3015
3016            for i in range(min_pos, len(decoded) + 1): # seek positions
3017                for j in [1, 5, len(decoded) - i]: # read lengths
3018                    f = self.open(support.TESTFN, encoding='test_decoder')
3019                    self.assertEqual(f.read(i), decoded[:i])
3020                    cookie = f.tell()
3021                    self.assertEqual(f.read(j), decoded[i:i + j])
3022                    f.seek(cookie)
3023                    self.assertEqual(f.read(), decoded[i:])
3024                    f.close()
3025
3026        # Enable the test decoder.
3027        StatefulIncrementalDecoder.codecEnabled = 1
3028
3029        # Run the tests.
3030        try:
3031            # Try each test case.
3032            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3033                test_seek_and_tell_with_data(input)
3034
3035            # Position each test case so that it crosses a chunk boundary.
3036            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3037                offset = CHUNK_SIZE - len(input)//2
3038                prefix = b'.'*offset
3039                # Don't bother seeking into the prefix (takes too long).
3040                min_pos = offset*2
3041                test_seek_and_tell_with_data(prefix + input, min_pos)
3042
3043        # Ensure our test decoder won't interfere with subsequent tests.
3044        finally:
3045            StatefulIncrementalDecoder.codecEnabled = 0
3046
3047    def test_multibyte_seek_and_tell(self):
3048        f = self.open(support.TESTFN, "w", encoding="euc_jp")
3049        f.write("AB\n\u3046\u3048\n")
3050        f.close()
3051
3052        f = self.open(support.TESTFN, "r", encoding="euc_jp")
3053        self.assertEqual(f.readline(), "AB\n")
3054        p0 = f.tell()
3055        self.assertEqual(f.readline(), "\u3046\u3048\n")
3056        p1 = f.tell()
3057        f.seek(p0)
3058        self.assertEqual(f.readline(), "\u3046\u3048\n")
3059        self.assertEqual(f.tell(), p1)
3060        f.close()
3061
3062    def test_seek_with_encoder_state(self):
3063        f = self.open(support.TESTFN, "w", encoding="euc_jis_2004")
3064        f.write("\u00e6\u0300")
3065        p0 = f.tell()
3066        f.write("\u00e6")
3067        f.seek(p0)
3068        f.write("\u0300")
3069        f.close()
3070
3071        f = self.open(support.TESTFN, "r", encoding="euc_jis_2004")
3072        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3073        f.close()
3074
3075    def test_encoded_writes(self):
3076        data = "1234567890"
3077        tests = ("utf-16",
3078                 "utf-16-le",
3079                 "utf-16-be",
3080                 "utf-32",
3081                 "utf-32-le",
3082                 "utf-32-be")
3083        for encoding in tests:
3084            buf = self.BytesIO()
3085            f = self.TextIOWrapper(buf, encoding=encoding)
3086            # Check if the BOM is written only once (see issue1753).
3087            f.write(data)
3088            f.write(data)
3089            f.seek(0)
3090            self.assertEqual(f.read(), data * 2)
3091            f.seek(0)
3092            self.assertEqual(f.read(), data * 2)
3093            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
3094
3095    def test_unreadable(self):
3096        class UnReadable(self.BytesIO):
3097            def readable(self):
3098                return False
3099        txt = self.TextIOWrapper(UnReadable())
3100        self.assertRaises(OSError, txt.read)
3101
3102    def test_read_one_by_one(self):
3103        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
3104        reads = ""
3105        while True:
3106            c = txt.read(1)
3107            if not c:
3108                break
3109            reads += c
3110        self.assertEqual(reads, "AA\nBB")
3111
3112    def test_readlines(self):
3113        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3114        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3115        txt.seek(0)
3116        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3117        txt.seek(0)
3118        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3119
3120    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
3121    def test_read_by_chunk(self):
3122        # make sure "\r\n" straddles 128 char boundary.
3123        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
3124        reads = ""
3125        while True:
3126            c = txt.read(128)
3127            if not c:
3128                break
3129            reads += c
3130        self.assertEqual(reads, "A"*127+"\nB")
3131
3132    def test_writelines(self):
3133        l = ['ab', 'cd', 'ef']
3134        buf = self.BytesIO()
3135        txt = self.TextIOWrapper(buf)
3136        txt.writelines(l)
3137        txt.flush()
3138        self.assertEqual(buf.getvalue(), b'abcdef')
3139
3140    def test_writelines_userlist(self):
3141        l = UserList(['ab', 'cd', 'ef'])
3142        buf = self.BytesIO()
3143        txt = self.TextIOWrapper(buf)
3144        txt.writelines(l)
3145        txt.flush()
3146        self.assertEqual(buf.getvalue(), b'abcdef')
3147
3148    def test_writelines_error(self):
3149        txt = self.TextIOWrapper(self.BytesIO())
3150        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3151        self.assertRaises(TypeError, txt.writelines, None)
3152        self.assertRaises(TypeError, txt.writelines, b'abc')
3153
3154    def test_issue1395_1(self):
3155        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3156
3157        # read one char at a time
3158        reads = ""
3159        while True:
3160            c = txt.read(1)
3161            if not c:
3162                break
3163            reads += c
3164        self.assertEqual(reads, self.normalized)
3165
3166    def test_issue1395_2(self):
3167        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3168        txt._CHUNK_SIZE = 4
3169
3170        reads = ""
3171        while True:
3172            c = txt.read(4)
3173            if not c:
3174                break
3175            reads += c
3176        self.assertEqual(reads, self.normalized)
3177
3178    def test_issue1395_3(self):
3179        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3180        txt._CHUNK_SIZE = 4
3181
3182        reads = txt.read(4)
3183        reads += txt.read(4)
3184        reads += txt.readline()
3185        reads += txt.readline()
3186        reads += txt.readline()
3187        self.assertEqual(reads, self.normalized)
3188
3189    def test_issue1395_4(self):
3190        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3191        txt._CHUNK_SIZE = 4
3192
3193        reads = txt.read(4)
3194        reads += txt.read()
3195        self.assertEqual(reads, self.normalized)
3196
3197    def test_issue1395_5(self):
3198        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3199        txt._CHUNK_SIZE = 4
3200
3201        reads = txt.read(4)
3202        pos = txt.tell()
3203        txt.seek(0)
3204        txt.seek(pos)
3205        self.assertEqual(txt.read(4), "BBB\n")
3206
3207    def test_issue2282(self):
3208        buffer = self.BytesIO(self.testdata)
3209        txt = self.TextIOWrapper(buffer, encoding="ascii")
3210
3211        self.assertEqual(buffer.seekable(), txt.seekable())
3212
3213    def test_append_bom(self):
3214        # The BOM is not written again when appending to a non-empty file
3215        filename = support.TESTFN
3216        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3217            with self.open(filename, 'w', encoding=charset) as f:
3218                f.write('aaa')
3219                pos = f.tell()
3220            with self.open(filename, 'rb') as f:
3221                self.assertEqual(f.read(), 'aaa'.encode(charset))
3222
3223            with self.open(filename, 'a', encoding=charset) as f:
3224                f.write('xxx')
3225            with self.open(filename, 'rb') as f:
3226                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3227
3228    def test_seek_bom(self):
3229        # Same test, but when seeking manually
3230        filename = support.TESTFN
3231        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3232            with self.open(filename, 'w', encoding=charset) as f:
3233                f.write('aaa')
3234                pos = f.tell()
3235            with self.open(filename, 'r+', encoding=charset) as f:
3236                f.seek(pos)
3237                f.write('zzz')
3238                f.seek(0)
3239                f.write('bbb')
3240            with self.open(filename, 'rb') as f:
3241                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
3242
3243    def test_seek_append_bom(self):
3244        # Same test, but first seek to the start and then to the end
3245        filename = support.TESTFN
3246        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3247            with self.open(filename, 'w', encoding=charset) as f:
3248                f.write('aaa')
3249            with self.open(filename, 'a', encoding=charset) as f:
3250                f.seek(0)
3251                f.seek(0, self.SEEK_END)
3252                f.write('xxx')
3253            with self.open(filename, 'rb') as f:
3254                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3255
3256    def test_errors_property(self):
3257        with self.open(support.TESTFN, "w") as f:
3258            self.assertEqual(f.errors, "strict")
3259        with self.open(support.TESTFN, "w", errors="replace") as f:
3260            self.assertEqual(f.errors, "replace")
3261
3262    @support.no_tracing
3263    def test_threads_write(self):
3264        # Issue6750: concurrent writes could duplicate data
3265        event = threading.Event()
3266        with self.open(support.TESTFN, "w", buffering=1) as f:
3267            def run(n):
3268                text = "Thread%03d\n" % n
3269                event.wait()
3270                f.write(text)
3271            threads = [threading.Thread(target=run, args=(x,))
3272                       for x in range(20)]
3273            with support.start_threads(threads, event.set):
3274                time.sleep(0.02)
3275        with self.open(support.TESTFN) as f:
3276            content = f.read()
3277            for n in range(20):
3278                self.assertEqual(content.count("Thread%03d\n" % n), 1)
3279
3280    def test_flush_error_on_close(self):
3281        # Test that text file is closed despite failed flush
3282        # and that flush() is called before file closed.
3283        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3284        closed = []
3285        def bad_flush():
3286            closed[:] = [txt.closed, txt.buffer.closed]
3287            raise OSError()
3288        txt.flush = bad_flush
3289        self.assertRaises(OSError, txt.close) # exception not swallowed
3290        self.assertTrue(txt.closed)
3291        self.assertTrue(txt.buffer.closed)
3292        self.assertTrue(closed)      # flush() called
3293        self.assertFalse(closed[0])  # flush() called before file closed
3294        self.assertFalse(closed[1])
3295        txt.flush = lambda: None  # break reference loop
3296
3297    def test_close_error_on_close(self):
3298        buffer = self.BytesIO(self.testdata)
3299        def bad_flush():
3300            raise OSError('flush')
3301        def bad_close():
3302            raise OSError('close')
3303        buffer.close = bad_close
3304        txt = self.TextIOWrapper(buffer, encoding="ascii")
3305        txt.flush = bad_flush
3306        with self.assertRaises(OSError) as err: # exception not swallowed
3307            txt.close()
3308        self.assertEqual(err.exception.args, ('close',))
3309        self.assertIsInstance(err.exception.__context__, OSError)
3310        self.assertEqual(err.exception.__context__.args, ('flush',))
3311        self.assertFalse(txt.closed)
3312
3313        # Silence destructor error
3314        buffer.close = lambda: None
3315        txt.flush = lambda: None
3316
3317    def test_nonnormalized_close_error_on_close(self):
3318        # Issue #21677
3319        buffer = self.BytesIO(self.testdata)
3320        def bad_flush():
3321            raise non_existing_flush
3322        def bad_close():
3323            raise non_existing_close
3324        buffer.close = bad_close
3325        txt = self.TextIOWrapper(buffer, encoding="ascii")
3326        txt.flush = bad_flush
3327        with self.assertRaises(NameError) as err: # exception not swallowed
3328            txt.close()
3329        self.assertIn('non_existing_close', str(err.exception))
3330        self.assertIsInstance(err.exception.__context__, NameError)
3331        self.assertIn('non_existing_flush', str(err.exception.__context__))
3332        self.assertFalse(txt.closed)
3333
3334        # Silence destructor error
3335        buffer.close = lambda: None
3336        txt.flush = lambda: None
3337
3338    def test_multi_close(self):
3339        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3340        txt.close()
3341        txt.close()
3342        txt.close()
3343        self.assertRaises(ValueError, txt.flush)
3344
3345    def test_unseekable(self):
3346        txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3347        self.assertRaises(self.UnsupportedOperation, txt.tell)
3348        self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3349
3350    def test_readonly_attributes(self):
3351        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3352        buf = self.BytesIO(self.testdata)
3353        with self.assertRaises(AttributeError):
3354            txt.buffer = buf
3355
3356    def test_rawio(self):
3357        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3358        # that subprocess.Popen() can have the required unbuffered
3359        # semantics with universal_newlines=True.
3360        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3361        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3362        # Reads
3363        self.assertEqual(txt.read(4), 'abcd')
3364        self.assertEqual(txt.readline(), 'efghi\n')
3365        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3366
3367    def test_rawio_write_through(self):
3368        # Issue #12591: with write_through=True, writes don't need a flush
3369        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3370        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3371                                 write_through=True)
3372        txt.write('1')
3373        txt.write('23\n4')
3374        txt.write('5')
3375        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3376
3377    def test_bufio_write_through(self):
3378        # Issue #21396: write_through=True doesn't force a flush()
3379        # on the underlying binary buffered object.
3380        flush_called, write_called = [], []
3381        class BufferedWriter(self.BufferedWriter):
3382            def flush(self, *args, **kwargs):
3383                flush_called.append(True)
3384                return super().flush(*args, **kwargs)
3385            def write(self, *args, **kwargs):
3386                write_called.append(True)
3387                return super().write(*args, **kwargs)
3388
3389        rawio = self.BytesIO()
3390        data = b"a"
3391        bufio = BufferedWriter(rawio, len(data)*2)
3392        textio = self.TextIOWrapper(bufio, encoding='ascii',
3393                                    write_through=True)
3394        # write to the buffered io but don't overflow the buffer
3395        text = data.decode('ascii')
3396        textio.write(text)
3397
3398        # buffer.flush is not called with write_through=True
3399        self.assertFalse(flush_called)
3400        # buffer.write *is* called with write_through=True
3401        self.assertTrue(write_called)
3402        self.assertEqual(rawio.getvalue(), b"") # no flush
3403
3404        write_called = [] # reset
3405        textio.write(text * 10) # total content is larger than bufio buffer
3406        self.assertTrue(write_called)
3407        self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3408
3409    def test_reconfigure_write_through(self):
3410        raw = self.MockRawIO([])
3411        t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3412        t.write('1')
3413        t.reconfigure(write_through=True)  # implied flush
3414        self.assertEqual(t.write_through, True)
3415        self.assertEqual(b''.join(raw._write_stack), b'1')
3416        t.write('23')
3417        self.assertEqual(b''.join(raw._write_stack), b'123')
3418        t.reconfigure(write_through=False)
3419        self.assertEqual(t.write_through, False)
3420        t.write('45')
3421        t.flush()
3422        self.assertEqual(b''.join(raw._write_stack), b'12345')
3423        # Keeping default value
3424        t.reconfigure()
3425        t.reconfigure(write_through=None)
3426        self.assertEqual(t.write_through, False)
3427        t.reconfigure(write_through=True)
3428        t.reconfigure()
3429        t.reconfigure(write_through=None)
3430        self.assertEqual(t.write_through, True)
3431
3432    def test_read_nonbytes(self):
3433        # Issue #17106
3434        # Crash when underlying read() returns non-bytes
3435        t = self.TextIOWrapper(self.StringIO('a'))
3436        self.assertRaises(TypeError, t.read, 1)
3437        t = self.TextIOWrapper(self.StringIO('a'))
3438        self.assertRaises(TypeError, t.readline)
3439        t = self.TextIOWrapper(self.StringIO('a'))
3440        self.assertRaises(TypeError, t.read)
3441
3442    def test_illegal_encoder(self):
3443        # Issue 31271: Calling write() while the return value of encoder's
3444        # encode() is invalid shouldn't cause an assertion failure.
3445        rot13 = codecs.lookup("rot13")
3446        with support.swap_attr(rot13, '_is_text_encoding', True):
3447            t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3448        self.assertRaises(TypeError, t.write, 'bar')
3449
3450    def test_illegal_decoder(self):
3451        # Issue #17106
3452        # Bypass the early encoding check added in issue 20404
3453        def _make_illegal_wrapper():
3454            quopri = codecs.lookup("quopri")
3455            quopri._is_text_encoding = True
3456            try:
3457                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3458                                       newline='\n', encoding="quopri")
3459            finally:
3460                quopri._is_text_encoding = False
3461            return t
3462        # Crash when decoder returns non-string
3463        t = _make_illegal_wrapper()
3464        self.assertRaises(TypeError, t.read, 1)
3465        t = _make_illegal_wrapper()
3466        self.assertRaises(TypeError, t.readline)
3467        t = _make_illegal_wrapper()
3468        self.assertRaises(TypeError, t.read)
3469
3470        # Issue 31243: calling read() while the return value of decoder's
3471        # getstate() is invalid should neither crash the interpreter nor
3472        # raise a SystemError.
3473        def _make_very_illegal_wrapper(getstate_ret_val):
3474            class BadDecoder:
3475                def getstate(self):
3476                    return getstate_ret_val
3477            def _get_bad_decoder(dummy):
3478                return BadDecoder()
3479            quopri = codecs.lookup("quopri")
3480            with support.swap_attr(quopri, 'incrementaldecoder',
3481                                   _get_bad_decoder):
3482                return _make_illegal_wrapper()
3483        t = _make_very_illegal_wrapper(42)
3484        self.assertRaises(TypeError, t.read, 42)
3485        t = _make_very_illegal_wrapper(())
3486        self.assertRaises(TypeError, t.read, 42)
3487        t = _make_very_illegal_wrapper((1, 2))
3488        self.assertRaises(TypeError, t.read, 42)
3489
3490    def _check_create_at_shutdown(self, **kwargs):
3491        # Issue #20037: creating a TextIOWrapper at shutdown
3492        # shouldn't crash the interpreter.
3493        iomod = self.io.__name__
3494        code = """if 1:
3495            import codecs
3496            import {iomod} as io
3497
3498            # Avoid looking up codecs at shutdown
3499            codecs.lookup('utf-8')
3500
3501            class C:
3502                def __init__(self):
3503                    self.buf = io.BytesIO()
3504                def __del__(self):
3505                    io.TextIOWrapper(self.buf, **{kwargs})
3506                    print("ok")
3507            c = C()
3508            """.format(iomod=iomod, kwargs=kwargs)
3509        return assert_python_ok("-c", code)
3510
3511    def test_create_at_shutdown_without_encoding(self):
3512        rc, out, err = self._check_create_at_shutdown()
3513        if err:
3514            # Can error out with a RuntimeError if the module state
3515            # isn't found.
3516            self.assertIn(self.shutdown_error, err.decode())
3517        else:
3518            self.assertEqual("ok", out.decode().strip())
3519
3520    def test_create_at_shutdown_with_encoding(self):
3521        rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3522                                                      errors='strict')
3523        self.assertFalse(err)
3524        self.assertEqual("ok", out.decode().strip())
3525
3526    def test_read_byteslike(self):
3527        r = MemviewBytesIO(b'Just some random string\n')
3528        t = self.TextIOWrapper(r, 'utf-8')
3529
3530        # TextIOwrapper will not read the full string, because
3531        # we truncate it to a multiple of the native int size
3532        # so that we can construct a more complex memoryview.
3533        bytes_val =  _to_memoryview(r.getvalue()).tobytes()
3534
3535        self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3536
3537    def test_issue22849(self):
3538        class F(object):
3539            def readable(self): return True
3540            def writable(self): return True
3541            def seekable(self): return True
3542
3543        for i in range(10):
3544            try:
3545                self.TextIOWrapper(F(), encoding='utf-8')
3546            except Exception:
3547                pass
3548
3549        F.tell = lambda x: 0
3550        t = self.TextIOWrapper(F(), encoding='utf-8')
3551
3552    def test_reconfigure_encoding_read(self):
3553        # latin1 -> utf8
3554        # (latin1 can decode utf-8 encoded string)
3555        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3556        raw = self.BytesIO(data)
3557        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3558        self.assertEqual(txt.readline(), 'abc\xe9\n')
3559        with self.assertRaises(self.UnsupportedOperation):
3560            txt.reconfigure(encoding='utf-8')
3561        with self.assertRaises(self.UnsupportedOperation):
3562            txt.reconfigure(newline=None)
3563
3564    def test_reconfigure_write_fromascii(self):
3565        # ascii has a specific encodefunc in the C implementation,
3566        # but utf-8-sig has not. Make sure that we get rid of the
3567        # cached encodefunc when we switch encoders.
3568        raw = self.BytesIO()
3569        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3570        txt.write('foo\n')
3571        txt.reconfigure(encoding='utf-8-sig')
3572        txt.write('\xe9\n')
3573        txt.flush()
3574        self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3575
3576    def test_reconfigure_write(self):
3577        # latin -> utf8
3578        raw = self.BytesIO()
3579        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3580        txt.write('abc\xe9\n')
3581        txt.reconfigure(encoding='utf-8')
3582        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3583        txt.write('d\xe9f\n')
3584        txt.flush()
3585        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3586
3587        # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3588        # the file
3589        raw = self.BytesIO()
3590        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3591        txt.write('abc\n')
3592        txt.reconfigure(encoding='utf-8-sig')
3593        txt.write('d\xe9f\n')
3594        txt.flush()
3595        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3596
3597    def test_reconfigure_write_non_seekable(self):
3598        raw = self.BytesIO()
3599        raw.seekable = lambda: False
3600        raw.seek = None
3601        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3602        txt.write('abc\n')
3603        txt.reconfigure(encoding='utf-8-sig')
3604        txt.write('d\xe9f\n')
3605        txt.flush()
3606
3607        # If the raw stream is not seekable, there'll be a BOM
3608        self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3609
3610    def test_reconfigure_defaults(self):
3611        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3612        txt.reconfigure(encoding=None)
3613        self.assertEqual(txt.encoding, 'ascii')
3614        self.assertEqual(txt.errors, 'replace')
3615        txt.write('LF\n')
3616
3617        txt.reconfigure(newline='\r\n')
3618        self.assertEqual(txt.encoding, 'ascii')
3619        self.assertEqual(txt.errors, 'replace')
3620
3621        txt.reconfigure(errors='ignore')
3622        self.assertEqual(txt.encoding, 'ascii')
3623        self.assertEqual(txt.errors, 'ignore')
3624        txt.write('CRLF\n')
3625
3626        txt.reconfigure(encoding='utf-8', newline=None)
3627        self.assertEqual(txt.errors, 'strict')
3628        txt.seek(0)
3629        self.assertEqual(txt.read(), 'LF\nCRLF\n')
3630
3631        self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3632
3633    def test_reconfigure_newline(self):
3634        raw = self.BytesIO(b'CR\rEOF')
3635        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3636        txt.reconfigure(newline=None)
3637        self.assertEqual(txt.readline(), 'CR\n')
3638        raw = self.BytesIO(b'CR\rEOF')
3639        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3640        txt.reconfigure(newline='')
3641        self.assertEqual(txt.readline(), 'CR\r')
3642        raw = self.BytesIO(b'CR\rLF\nEOF')
3643        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3644        txt.reconfigure(newline='\n')
3645        self.assertEqual(txt.readline(), 'CR\rLF\n')
3646        raw = self.BytesIO(b'LF\nCR\rEOF')
3647        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3648        txt.reconfigure(newline='\r')
3649        self.assertEqual(txt.readline(), 'LF\nCR\r')
3650        raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3651        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3652        txt.reconfigure(newline='\r\n')
3653        self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3654
3655        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3656        txt.reconfigure(newline=None)
3657        txt.write('linesep\n')
3658        txt.reconfigure(newline='')
3659        txt.write('LF\n')
3660        txt.reconfigure(newline='\n')
3661        txt.write('LF\n')
3662        txt.reconfigure(newline='\r')
3663        txt.write('CR\n')
3664        txt.reconfigure(newline='\r\n')
3665        txt.write('CRLF\n')
3666        expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3667        self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3668
3669    def test_issue25862(self):
3670        # Assertion failures occurred in tell() after read() and write().
3671        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3672        t.read(1)
3673        t.read()
3674        t.tell()
3675        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3676        t.read(1)
3677        t.write('x')
3678        t.tell()
3679
3680
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read method returns memoryviews
       rather than bytes'''

    def read1(self, len_):
        # Let BytesIO do the read, then hand the bytes back as a memoryview.
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        # Same conversion as read1(), applied to a regular read().
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3690
3691def _to_memoryview(buf):
3692    '''Convert bytes-object *buf* to a non-trivial memoryview'''
3693
3694    arr = array.array('i')
3695    idx = len(buf) - len(buf) % arr.itemsize
3696    arr.frombytes(buf[:idx])
3697    return memoryview(arr)
3698
3699
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the inherited TextIOWrapper tests with the C io module, plus a
    # few tests that are specific to it.
    io = io
    shutdown_error = "LookupError: unknown encoding: ascii"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        txt = self.TextIOWrapper(buffered)
        # Re-running __init__ with an invalid newline must fail, and so must
        # a read() on the wrapper afterwards.
        with self.assertRaises(ValueError):
            txt.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            txt.read()

        # repr() of an object created without __init__ must raise.
        uninitialized = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(uninitialized)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(support.TESTFN, "wb")
            bufio = self.BufferedWriter(rawio)
            txt = self.TextIOWrapper(bufio, encoding="ascii")
            txt.write("456def")
            txt.x = txt  # the wrapper now refers to itself
            ref = weakref.ref(txt)
            del txt
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            first_pair = self.BufferedRWPair(self.MockRawIO(),
                                             self.MockRawIO())
            first_txt = self.TextIOWrapper(first_pair, encoding="ascii")
            second_pair = self.BufferedRWPair(self.MockRawIO(),
                                              self.MockRawIO())
            second_txt = self.TextIOWrapper(second_pair, encoding="ascii")
            # circular references
            first_txt.buddy = second_txt
            second_txt.buddy = first_txt
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must raise AttributeError.
        txt = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del txt._CHUNK_SIZE
3749
3750
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the inherited TextIOWrapper tests with the pure-Python _pyio
    # module.
    shutdown_error = "LookupError: unknown encoding: ascii"
    io = pyio
3754
3755
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # the same input is decoded twice from the same saved state.
            saved = decoder.getstate()
            first = decoder.decode(data, **kwargs)
            self.assertEqual(first, expected)
            decoder.setstate(saved)
            second = decoder.decode(data, **kwargs)
            self.assertEqual(second, expected)

        def _check_all(steps):
            # Run _check_decode() over (data, expected) pairs, in order.
            for data, expected in steps:
                _check_decode(data, expected)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # The same three-byte character, fed one byte at a time, twice.
        for _ in range(2):
            _check_all((
                (b'\xe8', ""),
                (b'\xa2', ""),
                (b'\x88', "\u8888"),
            ))

        # A final, empty decode() after an incomplete character must fail.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_all((
            (b'\r', ""),
            (b'a', "\na"),
        ))

        _check_all((
            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),
        ))

        _check_all((
            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ))

    def check_newline_decoding(self, decoder, encoding):
        decoded = []
        if encoding is None:
            encoder = None
            def _decode_bytewise(text):
                # Decode one char at a time
                for char in text:
                    decoded.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    decoded.append(decoder.decode(bytes([byte])))

        self.assertEqual(decoder.newlines, None)
        # Each row: text to feed, then the expected value of
        # decoder.newlines once it has been fed.
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")

        # After reset() the record of seen newlines must be gone.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc:
                inner = codecs.getincrementaldecoder(enc)()
            else:
                inner = enc
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)

        utf8_inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(utf8_inner, translate=True)
        self.check_newline_decoding_utf8(decoder)
        self.assertRaises(TypeError, decoder.setstate, 42)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not be
            # recorded as newlines.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))

    def test_translate(self):
        # issue 35062
        make_utf8_decoder = codecs.getincrementaldecoder("utf-8")
        for translate in (-2, -1, 1, 2):
            decoder = self.IncrementalNewlineDecoder(make_utf8_decoder(),
                                                     translate)
            self.check_newline_decoding_utf8(decoder)
        # With translate=0 the line endings must come back unchanged.
        decoder = self.IncrementalNewlineDecoder(make_utf8_decoder(),
                                                 translate=0)
        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3872
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; everything is inherited from the base class.
    # NOTE(review): the inherited tests read self.IncrementalNewlineDecoder,
    # which is not defined here -- presumably the C implementation is
    # attached to this class elsewhere in the file; confirm.
    pass
3875
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; everything is inherited from the base class.
    # NOTE(review): the inherited tests read self.IncrementalNewlineDecoder,
    # which is not defined here -- presumably the pure-Python implementation
    # is attached to this class elsewhere in the file; confirm.
    pass
3878
3879
3880# XXX Tests for open()
3881
3882class MiscIOTest(unittest.TestCase):
3883
3884    def tearDown(self):
3885        support.unlink(support.TESTFN)
3886
3887    def test___all__(self):
3888        for name in self.io.__all__:
3889            obj = getattr(self.io, name, None)
3890            self.assertIsNotNone(obj, name)
3891            if name in ("open", "open_code"):
3892                continue
3893            elif "error" in name.lower() or name == "UnsupportedOperation":
3894                self.assertTrue(issubclass(obj, Exception), name)
3895            elif not name.startswith("SEEK_"):
3896                self.assertTrue(issubclass(obj, self.IOBase))
3897
3898    def test_attributes(self):
3899        f = self.open(support.TESTFN, "wb", buffering=0)
3900        self.assertEqual(f.mode, "wb")
3901        f.close()
3902
3903        with support.check_warnings(('', DeprecationWarning)):
3904            f = self.open(support.TESTFN, "U")
3905        self.assertEqual(f.name,            support.TESTFN)
3906        self.assertEqual(f.buffer.name,     support.TESTFN)
3907        self.assertEqual(f.buffer.raw.name, support.TESTFN)
3908        self.assertEqual(f.mode,            "U")
3909        self.assertEqual(f.buffer.mode,     "rb")
3910        self.assertEqual(f.buffer.raw.mode, "rb")
3911        f.close()
3912
3913        f = self.open(support.TESTFN, "w+")
3914        self.assertEqual(f.mode,            "w+")
3915        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
3916        self.assertEqual(f.buffer.raw.mode, "rb+")
3917
3918        g = self.open(f.fileno(), "wb", closefd=False)
3919        self.assertEqual(g.mode,     "wb")
3920        self.assertEqual(g.raw.mode, "wb")
3921        self.assertEqual(g.name,     f.fileno())
3922        self.assertEqual(g.raw.name, f.fileno())
3923        f.close()
3924        g.close()
3925
3926    def test_open_pipe_with_append(self):
3927        # bpo-27805: Ignore ESPIPE from lseek() in open().
3928        r, w = os.pipe()
3929        self.addCleanup(os.close, r)
3930        f = self.open(w, 'a')
3931        self.addCleanup(f.close)
3932        # Check that the file is marked non-seekable. On Windows, however, lseek
3933        # somehow succeeds on pipes.
3934        if sys.platform != 'win32':
3935            self.assertFalse(f.seekable())
3936
3937    def test_io_after_close(self):
3938        for kwargs in [
3939                {"mode": "w"},
3940                {"mode": "wb"},
3941                {"mode": "w", "buffering": 1},
3942                {"mode": "w", "buffering": 2},
3943                {"mode": "wb", "buffering": 0},
3944                {"mode": "r"},
3945                {"mode": "rb"},
3946                {"mode": "r", "buffering": 1},
3947                {"mode": "r", "buffering": 2},
3948                {"mode": "rb", "buffering": 0},
3949                {"mode": "w+"},
3950                {"mode": "w+b"},
3951                {"mode": "w+", "buffering": 1},
3952                {"mode": "w+", "buffering": 2},
3953                {"mode": "w+b", "buffering": 0},
3954            ]:
3955            f = self.open(support.TESTFN, **kwargs)
3956            f.close()
3957            self.assertRaises(ValueError, f.flush)
3958            self.assertRaises(ValueError, f.fileno)
3959            self.assertRaises(ValueError, f.isatty)
3960            self.assertRaises(ValueError, f.__iter__)
3961            if hasattr(f, "peek"):
3962                self.assertRaises(ValueError, f.peek, 1)
3963            self.assertRaises(ValueError, f.read)
3964            if hasattr(f, "read1"):
3965                self.assertRaises(ValueError, f.read1, 1024)
3966                self.assertRaises(ValueError, f.read1)
3967            if hasattr(f, "readall"):
3968                self.assertRaises(ValueError, f.readall)
3969            if hasattr(f, "readinto"):
3970                self.assertRaises(ValueError, f.readinto, bytearray(1024))
3971            if hasattr(f, "readinto1"):
3972                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
3973            self.assertRaises(ValueError, f.readline)
3974            self.assertRaises(ValueError, f.readlines)
3975            self.assertRaises(ValueError, f.readlines, 1)
3976            self.assertRaises(ValueError, f.seek, 0)
3977            self.assertRaises(ValueError, f.tell)
3978            self.assertRaises(ValueError, f.truncate)
3979            self.assertRaises(ValueError, f.write,
3980                              b"" if "b" in kwargs['mode'] else "")
3981            self.assertRaises(ValueError, f.writelines, [])
3982            self.assertRaises(ValueError, next, f)
3983
3984    def test_blockingioerror(self):
3985        # Various BlockingIOError issues
3986        class C(str):
3987            pass
3988        c = C("")
3989        b = self.BlockingIOError(1, c)
3990        c.b = b
3991        b.c = c
3992        wr = weakref.ref(c)
3993        del c, b
3994        support.gc_collect()
3995        self.assertIsNone(wr(), wr)
3996
3997    def test_abcs(self):
3998        # Test the visible base classes are ABCs.
3999        self.assertIsInstance(self.IOBase, abc.ABCMeta)
4000        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4001        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4002        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
4003
4004    def _check_abc_inheritance(self, abcmodule):
4005        with self.open(support.TESTFN, "wb", buffering=0) as f:
4006            self.assertIsInstance(f, abcmodule.IOBase)
4007            self.assertIsInstance(f, abcmodule.RawIOBase)
4008            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4009            self.assertNotIsInstance(f, abcmodule.TextIOBase)
4010        with self.open(support.TESTFN, "wb") as f:
4011            self.assertIsInstance(f, abcmodule.IOBase)
4012            self.assertNotIsInstance(f, abcmodule.RawIOBase)
4013            self.assertIsInstance(f, abcmodule.BufferedIOBase)
4014            self.assertNotIsInstance(f, abcmodule.TextIOBase)
4015        with self.open(support.TESTFN, "w") as f:
4016            self.assertIsInstance(f, abcmodule.IOBase)
4017            self.assertNotIsInstance(f, abcmodule.RawIOBase)
4018            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4019            self.assertIsInstance(f, abcmodule.TextIOBase)
4020
    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs.
        # The test case itself is passed as the "module": load_tests()
        # sets the names of the io implementation under test (IOBase,
        # RawIOBase, ...) as attributes on the concrete test classes.
        self._check_abc_inheritance(self)
4024
    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.  Unlike test_abc_inheritance(), the ABCs
        # always come from "io", whichever implementation self.open uses.
        self._check_abc_inheritance(io)
4029
    def _check_warn_on_dealloc(self, *args, **kwargs):
        """Check that discarding an unclosed file emits a ResourceWarning.

        *args and **kwargs are forwarded to open().  The text of the warning
        must contain the repr() of the discarded file object.
        """
        # NOTE(review): this calls the builtin open() rather than self.open,
        # so the file is not opened through the io namespace attached to the
        # test class -- confirm whether this is intentional.
        f = open(*args, **kwargs)
        r = repr(f)
        with self.assertWarns(ResourceWarning) as cm:
            # Drop this function's reference to the file, then force a
            # collection, all while warnings are being recorded.
            f = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))
4037
    def test_warn_on_dealloc(self):
        # Run the check for an unbuffered binary file, a buffered binary
        # file and a text file.
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")
4042
4043    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4044        fds = []
4045        def cleanup_fds():
4046            for fd in fds:
4047                try:
4048                    os.close(fd)
4049                except OSError as e:
4050                    if e.errno != errno.EBADF:
4051                        raise
4052        self.addCleanup(cleanup_fds)
4053        r, w = os.pipe()
4054        fds += r, w
4055        self._check_warn_on_dealloc(r, *args, **kwargs)
4056        # When using closefd=False, there's no warning
4057        r, w = os.pipe()
4058        fds += r, w
4059        with support.check_no_resource_warning(self):
4060            open(r, *args, closefd=False, **kwargs)
4061
    def test_warn_on_dealloc_fd(self):
        # Same three kinds of file as test_warn_on_dealloc() (unbuffered
        # binary, buffered binary, text), opened for reading from a pipe
        # descriptor instead of a path.
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")
4066
4067
4068    def test_pickling(self):
4069        # Pickling file objects is forbidden
4070        for kwargs in [
4071                {"mode": "w"},
4072                {"mode": "wb"},
4073                {"mode": "wb", "buffering": 0},
4074                {"mode": "r"},
4075                {"mode": "rb"},
4076                {"mode": "rb", "buffering": 0},
4077                {"mode": "w+"},
4078                {"mode": "w+b"},
4079                {"mode": "w+b", "buffering": 0},
4080            ]:
4081            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4082                with self.open(support.TESTFN, **kwargs) as f:
4083                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
4084
    def test_nonblock_pipe_write_bigbuf(self):
        # 16 KiB buffer: larger than the _PIPE_BUF value (4096 on Linux)
        # cited in _test_nonblock_pipe_write().
        self._test_nonblock_pipe_write(16*1024)
4087
    def test_nonblock_pipe_write_smallbuf(self):
        # 1 KiB buffer: smaller than the _PIPE_BUF value (4096 on Linux)
        # cited in _test_nonblock_pipe_write().
        self._test_nonblock_pipe_write(1024)
4090
4091    @unittest.skipUnless(hasattr(os, 'set_blocking'),
4092                         'os.set_blocking() required for this test')
4093    def _test_nonblock_pipe_write(self, bufsize):
4094        sent = []
4095        received = []
4096        r, w = os.pipe()
4097        os.set_blocking(r, False)
4098        os.set_blocking(w, False)
4099
4100        # To exercise all code paths in the C implementation we need
4101        # to play with buffer sizes.  For instance, if we choose a
4102        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4103        # then we will never get a partial write of the buffer.
4104        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4105        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4106
4107        with rf, wf:
4108            for N in 9999, 73, 7574:
4109                try:
4110                    i = 0
4111                    while True:
4112                        msg = bytes([i % 26 + 97]) * N
4113                        sent.append(msg)
4114                        wf.write(msg)
4115                        i += 1
4116
4117                except self.BlockingIOError as e:
4118                    self.assertEqual(e.args[0], errno.EAGAIN)
4119                    self.assertEqual(e.args[2], e.characters_written)
4120                    sent[-1] = sent[-1][:e.characters_written]
4121                    received.append(rf.read())
4122                    msg = b'BLOCKED'
4123                    wf.write(msg)
4124                    sent.append(msg)
4125
4126            while True:
4127                try:
4128                    wf.flush()
4129                    break
4130                except self.BlockingIOError as e:
4131                    self.assertEqual(e.args[0], errno.EAGAIN)
4132                    self.assertEqual(e.args[2], e.characters_written)
4133                    self.assertEqual(e.characters_written, 0)
4134                    received.append(rf.read())
4135
4136            received += iter(rf.read, None)
4137
4138        sent, received = b''.join(sent), b''.join(received)
4139        self.assertEqual(sent, received)
4140        self.assertTrue(wf.closed)
4141        self.assertTrue(rf.closed)
4142
4143    def test_create_fail(self):
4144        # 'x' mode fails if file is existing
4145        with self.open(support.TESTFN, 'w'):
4146            pass
4147        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4148
4149    def test_create_writes(self):
4150        # 'x' mode opens for writing
4151        with self.open(support.TESTFN, 'xb') as f:
4152            f.write(b"spam")
4153        with self.open(support.TESTFN, 'rb') as f:
4154            self.assertEqual(b"spam", f.read())
4155
4156    def test_open_allargs(self):
4157        # there used to be a buffer overflow in the parser for rawmode
4158        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4159
    def test_check_encoding_errors(self):
        # bpo-37388: open() and TextIOWrapper must check encoding and errors
        # arguments in dev mode
        mod = self.io.__name__
        filename = __file__
        invalid = 'Boom, Shaka Laka, Boom!'
        # The child script exits with a distinct status (21 to 24) for the
        # first call that fails to raise LookupError, and with status 10
        # when all four calls raised it.
        code = textwrap.dedent(f'''
            import sys
            from {mod} import open, TextIOWrapper

            try:
                open({filename!r}, encoding={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(21)

            try:
                open({filename!r}, errors={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(22)

            fp = open({filename!r}, "rb")
            with fp:
                try:
                    TextIOWrapper(fp, encoding={invalid!r})
                except LookupError:
                    pass
                else:
                    sys.exit(23)

                try:
                    TextIOWrapper(fp, errors={invalid!r})
                except LookupError:
                    pass
                else:
                    sys.exit(24)

            sys.exit(10)
        ''')
        # The success status (10) is non-zero, hence assert_python_failure().
        proc = assert_python_failure('-X', 'dev', '-c', code)
        self.assertEqual(proc.rc, 10, proc)
4204
4205
4206class CMiscIOTest(MiscIOTest):
4207    io = io
4208
4209    def test_readinto_buffer_overflow(self):
4210        # Issue #18025
4211        class BadReader(self.io.BufferedIOBase):
4212            def read(self, n=-1):
4213                return b'x' * 10**6
4214        bufio = BadReader()
4215        b = bytearray(2)
4216        self.assertRaises(ValueError, bufio.readinto, b)
4217
    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter in which a daemon thread and the main
        thread both write to sys.<stream_name>, then let it exit.

        The child must either exit with status 0 having written nothing but
        '.' and '!' characters to stderr, or exit with a non-zero status and
        the expected "could not acquire lock" fatal error message.
        """
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        code = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format_map(locals())
        res, _ = run_python_until_end("-c", code)
        err = res.err.decode()
        if res.rc != 0:
            # Failure: should be a fatal error
            pattern = (r"Fatal Python error: _enter_buffered_busy: "
                       r"could not acquire lock "
                       r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                       r"at interpreter shutdown, possibly due to "
                       r"daemon threads".format_map(locals()))
            self.assertRegex(err, pattern)
        else:
            # Success: nothing but the '.' and '!' written by the script
            # (when it writes to stderr) may appear on stderr.
            self.assertFalse(err.strip('.!'))
4258
    def test_daemon_threads_shutdown_stdout_deadlock(self):
        # Issue #23309 scenario with the threads writing to sys.stdout.
        self.check_daemon_threads_shutdown_deadlock('stdout')
4261
    def test_daemon_threads_shutdown_stderr_deadlock(self):
        # Issue #23309 scenario with the threads writing to sys.stderr.
        self.check_daemon_threads_shutdown_deadlock('stderr')
4264
4265
# MiscIOTest with self.io bound to the Python implementation (_pyio).
class PyMiscIOTest(MiscIOTest):
    io = pyio
4268
4269
4270@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4271class SignalsTest(unittest.TestCase):
4272
    def setUp(self):
        # Install alarm_interrupt() as the SIGALRM handler and remember the
        # previous handler so that tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4275
    def tearDown(self):
        # Restore the SIGALRM handler saved by setUp(); this also undoes
        # any handler installed by an individual test.
        signal.signal(signal.SIGALRM, self.oldalrm)
4278
    def alarm_interrupt(self, sig, frame):
        """SIGALRM handler that deliberately raises ZeroDivisionError."""
        1/0
4281
4282    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4283        """Check that a partial write, when it gets interrupted, properly
4284        invokes the signal handler, and bubbles up the exception raised
4285        in the latter."""
4286        read_results = []
4287        def _read():
4288            s = os.read(r, 1)
4289            read_results.append(s)
4290
4291        t = threading.Thread(target=_read)
4292        t.daemon = True
4293        r, w = os.pipe()
4294        fdopen_kwargs["closefd"] = False
4295        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
4296        try:
4297            wio = self.io.open(w, **fdopen_kwargs)
4298            if hasattr(signal, 'pthread_sigmask'):
4299                # create the thread with SIGALRM signal blocked
4300                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4301                t.start()
4302                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4303            else:
4304                t.start()
4305
4306            # Fill the pipe enough that the write will be blocking.
4307            # It will be interrupted by the timer armed above.  Since the
4308            # other thread has read one byte, the low-level write will
4309            # return with a successful (partial) result rather than an EINTR.
4310            # The buffered IO layer must check for pending signal
4311            # handlers, which in this case will invoke alarm_interrupt().
4312            signal.alarm(1)
4313            try:
4314                self.assertRaises(ZeroDivisionError, wio.write, large_data)
4315            finally:
4316                signal.alarm(0)
4317                t.join()
4318            # We got one byte, get another one and check that it isn't a
4319            # repeat of the first one.
4320            read_results.append(os.read(r, 1))
4321            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4322        finally:
4323            os.close(w)
4324            os.close(r)
4325            # This is deliberate. If we didn't close the file descriptor
4326            # before closing wio, wio would try to flush its internal
4327            # buffer, and block again.
4328            try:
4329                wio.close()
4330            except OSError as e:
4331                if e.errno != errno.EBADF:
4332                    raise
4333
    def test_interrupted_write_unbuffered(self):
        # Binary file opened with buffering=0.
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4336
    def test_interrupted_write_buffered(self):
        # Binary file opened with the default buffering.
        self.check_interrupted_write(b"xy", b"xy", mode="wb")
4339
    def test_interrupted_write_text(self):
        # Text file: str data is written, the pipe is expected to carry
        # its ASCII encoding.
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4342
    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a signal handler writes to a file object
        that the interrupted code is itself writing to.

        *data* is written and flushed in a loop to the write end of a pipe
        opened with **fdopen_kwargs until SIGALRM is delivered.  The handler
        writes *data* to the same file object and then raises
        ZeroDivisionError.  Either that exception or a RuntimeError whose
        message starts with "reentrant call" is accepted.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            # Cancel any pending alarm before releasing the pipe.
            signal.alarm(0)
            wio.close()
            os.close(r)
4370
    def test_reentrant_write_buffered(self):
        # Reentrant write on a binary file with the default buffering.
        self.check_reentrant_write(b"xy", mode="wb")
4373
    def test_reentrant_write_text(self):
        # Reentrant write on a text file using the ASCII encoding.
        self.check_reentrant_write("xy", mode="w", encoding="ascii")
4376
4377    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4378        """Check that a buffered read, when it gets interrupted (either
4379        returning a partial result or EINTR), properly invokes the signal
4380        handler and retries if the latter returned successfully."""
4381        r, w = os.pipe()
4382        fdopen_kwargs["closefd"] = False
4383        def alarm_handler(sig, frame):
4384            os.write(w, b"bar")
4385        signal.signal(signal.SIGALRM, alarm_handler)
4386        try:
4387            rio = self.io.open(r, **fdopen_kwargs)
4388            os.write(w, b"foo")
4389            signal.alarm(1)
4390            # Expected behaviour:
4391            # - first raw read() returns partial b"foo"
4392            # - second raw read() returns EINTR
4393            # - third raw read() returns b"bar"
4394            self.assertEqual(decode(rio.read(6)), "foobar")
4395        finally:
4396            signal.alarm(0)
4397            rio.close()
4398            os.close(w)
4399            os.close(r)
4400
    def test_interrupted_read_retry_buffered(self):
        # Binary mode: read() returns bytes, decoded here for comparison.
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")
4404
    def test_interrupted_read_retry_text(self):
        # Text mode: the value returned by read() is compared as is.
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")
4408
4409    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4410        """Check that a buffered write, when it gets interrupted (either
4411        returning a partial result or EINTR), properly invokes the signal
4412        handler and retries if the latter returned successfully."""
4413        select = support.import_module("select")
4414
4415        # A quantity that exceeds the buffer size of an anonymous pipe's
4416        # write end.
4417        N = support.PIPE_MAX_SIZE
4418        r, w = os.pipe()
4419        fdopen_kwargs["closefd"] = False
4420
4421        # We need a separate thread to read from the pipe and allow the
4422        # write() to finish.  This thread is started after the SIGALRM is
4423        # received (forcing a first EINTR in write()).
4424        read_results = []
4425        write_finished = False
4426        error = None
4427        def _read():
4428            try:
4429                while not write_finished:
4430                    while r in select.select([r], [], [], 1.0)[0]:
4431                        s = os.read(r, 1024)
4432                        read_results.append(s)
4433            except BaseException as exc:
4434                nonlocal error
4435                error = exc
4436        t = threading.Thread(target=_read)
4437        t.daemon = True
4438        def alarm1(sig, frame):
4439            signal.signal(signal.SIGALRM, alarm2)
4440            signal.alarm(1)
4441        def alarm2(sig, frame):
4442            t.start()
4443
4444        large_data = item * N
4445        signal.signal(signal.SIGALRM, alarm1)
4446        try:
4447            wio = self.io.open(w, **fdopen_kwargs)
4448            signal.alarm(1)
4449            # Expected behaviour:
4450            # - first raw write() is partial (because of the limited pipe buffer
4451            #   and the first alarm)
4452            # - second raw write() returns EINTR (because of the second alarm)
4453            # - subsequent write()s are successful (either partial or complete)
4454            written = wio.write(large_data)
4455            self.assertEqual(N, written)
4456
4457            wio.flush()
4458            write_finished = True
4459            t.join()
4460
4461            self.assertIsNone(error)
4462            self.assertEqual(N, sum(len(x) for x in read_results))
4463        finally:
4464            signal.alarm(0)
4465            write_finished = True
4466            os.close(w)
4467            os.close(r)
4468            # This is deliberate. If we didn't close the file descriptor
4469            # before closing wio, wio would try to flush its internal
4470            # buffer, and could block (in case of failure).
4471            try:
4472                wio.close()
4473            except OSError as e:
4474                if e.errno != errno.EBADF:
4475                    raise
4476
    def test_interrupted_write_retry_buffered(self):
        # Binary file with the default buffering.
        self.check_interrupted_write_retry(b"x", mode="wb")
4479
    def test_interrupted_write_retry_text(self):
        # Text file using the latin1 encoding.
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4482
4483
# SignalsTest with self.io bound to the C implementation (io).
class CSignalsTest(SignalsTest):
    io = io
4486
# SignalsTest with self.io bound to the Python implementation (_pyio).
class PySignalsTest(SignalsTest):
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
4494
4495
def load_tests(*args):
    """Return the suite of io tests (unittest ``load_tests`` protocol).

    The arguments supplied by the caller are accepted but not used.  Before
    the suite is built, each name in ``io.__all__`` (plus
    ``IncrementalNewlineDecoder``) and the matching variant of each mock
    class are set as class attributes on the test classes: classes whose
    name starts with "C" get the objects from ``io``, classes whose name
    starts with "Py" get the objects from ``_pyio``.  Other classes are left
    untouched.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is exposed under its generic name, bound to the "C..." or
    # "Py..." class of this module that implements it.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated and was removed in Python 3.13.  A
    # fresh TestLoader with default settings collects the same tests.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
4532
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
4535