1"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11#     (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
21
22import abc
23import array
24import errno
25import locale
26import os
27import pickle
28import random
29import signal
30import sys
31import sysconfig
32import threading
33import time
34import unittest
35import warnings
36import weakref
37from collections import deque, UserList
38from itertools import cycle, count
39from test import support
40from test.support.script_helper import assert_python_ok, run_python_until_end
41from test.support import FakePath
42
43import codecs
44import io  # C implementation of io
45import _pyio as pyio # Python implementation of io
46
try:
    import ctypes
except ImportError:
    # Without ctypes, fall back to an array.array of signed bytes.
    def byteslike(*pos, **kw):
        """Create a bytes-like object (an array.array) from bytes() arguments."""
        return array.array("b", bytes(*pos, **kw))
else:
    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        data = bytes(*pos, **kw)
        obj = EmptyStruct()
        # Grow the (initially empty) structure so it can hold the payload.
        ctypes.resize(obj, len(data))
        # Copy the payload in through a byte-oriented view of the structure.
        memoryview(obj).cast("B")[:] = data
        return obj
    # Defined after byteslike(); the name is only looked up when byteslike()
    # is called, so the ordering is harmless.
    class EmptyStruct(ctypes.Structure):
        pass
62
# Build-time configuration strings; an unset variable is treated as ''.
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
# True when either the compiler flags or the configure arguments mention the
# memory sanitizer.
MEMORY_SANITIZER = (
    '-fsanitize=memory' in _cflags or
    '--with-memory-sanitizer' in _config_args
)
69
def _default_chunk_size():
    """Return the _CHUNK_SIZE of a text stream opened with default settings."""
    # Any readable text file will do; this source file is always available.
    with open(__file__, "r", encoding="latin-1") as probe:
        chunk_size = probe._CHUNK_SIZE
    return chunk_size
74
75
class MockRawIOWithoutRead:
    """Raw stream mock that deliberately provides no read() method.

    With only readinto() available, the default RawIO.read() has to be
    implemented on top of it.  Reads are served from a scripted list of
    chunks; writes are recorded in a list."""

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0
        self._extraneous_reads = 0

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        # Not a real position, but callers need some return value.
        return 0

    def tell(self):
        # Not a real position either.
        return 0

    def truncate(self, pos=None):
        return pos

    def write(self, b):
        # Store an immutable copy so later changes to b do not affect the log.
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            # The script is exhausted: count the call and report EOF.
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            # A scripted None is consumed and passed back as the result.
            del self._read_stack[0]
            return None
        size = len(chunk)
        if size > capacity:
            # Hand over what fits and keep the remainder for the next call.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        del self._read_stack[0]
        buf[:size] = chunk
        return size
131
# The mock combined with the C implementation of RawIOBase.
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass

# The mock combined with the Python (_pyio) implementation of RawIOBase.
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
137
138
class MockRawIO(MockRawIOWithoutRead):
    """Raw stream mock whose read() returns the scripted chunks verbatim."""

    def read(self, n=None):
        """Return the next scripted chunk; *n* is ignored.

        Once the script is exhausted, the call is counted as extraneous and
        b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script means EOF.  A bare ``except:`` here would
            # also swallow KeyboardInterrupt, SystemExit and genuine bugs.
            self._extraneous_reads += 1
            return b""
148
# The mock combined with the C implementation of RawIOBase.
class CMockRawIO(MockRawIO, io.RawIOBase):
    pass

# The mock combined with the Python (_pyio) implementation of RawIOBase.
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
154
155
class MisbehavedRawIO(MockRawIO):
    """Raw stream mock that reports results inconsistent with what it did."""

    def write(self, b):
        # Report twice as many bytes as the parent says were written.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Return the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its length.
        super().readinto(buf)
        return 5 * len(buf)
172
# The mock combined with the C implementation of RawIOBase.
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass

# The mock combined with the Python (_pyio) implementation of RawIOBase.
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
178
179
class SlowFlushRawIO(MockRawIO):
    # Raw stream mock whose flush() takes a noticeable amount of time and
    # signals, through an event, that it has been entered.
    def __init__(self):
        super().__init__()
        # Set as soon as flush() starts executing.
        self.in_flush = threading.Event()

    def flush(self):
        # Signal first, then stall, so an observer woken by the event sees
        # the flush still in progress.
        self.in_flush.set()
        time.sleep(0.25)
188
# The mock combined with the C implementation of RawIOBase.
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    pass

# The mock combined with the Python (_pyio) implementation of RawIOBase.
class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    pass
194
195
class CloseFailureIO(MockRawIO):
    """Raw stream mock whose first close() raises OSError.

    The stream is marked closed before raising, so later close() calls do
    nothing."""

    closed = 0

    def close(self):
        if self.closed:
            return
        self.closed = 1
        raise OSError
203
# The mock combined with the C implementation of RawIOBase.
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass

# The mock combined with the Python (_pyio) implementation of RawIOBase.
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
209
210
class MockFileIO:
    """Mixin that logs the size of every read performed on the stream.

    It has to be combined with a concrete stream class, which is reached
    through super()."""

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # A None result is logged as None, anything else by its length.
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
226
# The read-logging mixin on top of the C implementation of BytesIO.
class CMockFileIO(MockFileIO, io.BytesIO):
    pass

# The read-logging mixin on top of the Python (_pyio) implementation of BytesIO.
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
232
233
class MockUnseekableIO:
    """Mixin that makes a stream refuse every positioning operation.

    The concrete subclass must supply an ``UnsupportedOperation`` attribute
    naming the exception class to raise."""

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
246
# The unseekable mixin on top of the C BytesIO, raising the C implementation's
# UnsupportedOperation.
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    UnsupportedOperation = io.UnsupportedOperation

# The unseekable mixin on top of the Python (_pyio) BytesIO, raising the
# Python implementation's UnsupportedOperation.
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    UnsupportedOperation = pyio.UnsupportedOperation
252
253
class MockNonBlockWriterIO:
    """Writer mock that can be told to stop short at a chosen character.

    Written data accumulates in a list until fetched with pop_written()."""

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything written so far and empty the log in place."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def write(self, b):
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                pos = data.index(blocker)
            except ValueError:
                # No blocker in this chunk: fall through and accept it whole.
                pos = None
            if pos is not None:
                if pos > 0:
                    # write data up to the first blocker
                    self._write_stack.append(data[:pos])
                    return pos
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(data)
        return len(data)
297
# The writer mock on top of the C RawIOBase, exposing the C implementation's
# BlockingIOError.
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError

# The writer mock on top of the Python (_pyio) RawIOBase, exposing the Python
# implementation's BlockingIOError.
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
303
304
305class IOTest(unittest.TestCase):
306
    def setUp(self):
        # Start every test without a leftover scratch file.
        support.unlink(support.TESTFN)

    def tearDown(self):
        # Remove the scratch file the test may have created.
        support.unlink(support.TESTFN)
312
    def write_ops(self, f):
        """Run a fixed sequence of write/seek/tell/truncate checks on *f*.

        The assertions expect *f* to be a writable, seekable binary stream
        that starts out at position 0.
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() must leave the stream position where it was.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        # Relative seek: one byte back from the current position.
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        buffer = bytearray(b" world\n\n\n")
        self.assertEqual(f.write(buffer), 9)
        buffer[:] = b"*" * 9  # Overwrite our copy of the data
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        # Seek relative to the end: the stream now holds 14 bytes.
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        # Shrinking the stream must not move the position either.
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        # Float offsets are rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
336
    def read_ops(self, f, buffered=False):
        """Run a fixed sequence of read/readinto/seek/tell checks on *f*.

        The assertions expect *f* to contain b"hello world\\n" and to start at
        position 0.  With *buffered* true, read() without a size and
        readinto1() are exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # Reuse the 5-byte result as the target buffer for readinto().
        data = byteslike(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(bytes(data), b" worl")
        data = bytearray(5)
        # Only two bytes remain, so the buffer is filled partially.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        # At EOF both read() and readinto() report nothing read.
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(byteslike(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        # Zero-length requests return empty results.
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(byteslike()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        # Float offsets are rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")
            f.seek(0)
            data = byteslike(5)
            self.assertEqual(f.readinto1(data), 5)
            self.assertEqual(bytes(data), b"hello")
369
    # Offset used by the large-file checks: 2**31, the first value that does
    # not fit in a signed 32-bit integer.
    LARGE = 2**31
371
372    def large_file_ops(self, f):
373        assert f.readable()
374        assert f.writable()
375        try:
376            self.assertEqual(f.seek(self.LARGE), self.LARGE)
377        except (OverflowError, ValueError):
378            self.skipTest("no largefile support")
379        self.assertEqual(f.tell(), self.LARGE)
380        self.assertEqual(f.write(b"xxx"), 3)
381        self.assertEqual(f.tell(), self.LARGE + 3)
382        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
383        self.assertEqual(f.truncate(), self.LARGE + 2)
384        self.assertEqual(f.tell(), self.LARGE + 2)
385        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
386        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
387        self.assertEqual(f.tell(), self.LARGE + 2)
388        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
389        self.assertEqual(f.seek(-1, 2), self.LARGE)
390        self.assertEqual(f.read(2), b"x")
391
392    def test_invalid_operations(self):
393        # Try writing on a file opened in read mode and vice-versa.
394        exc = self.UnsupportedOperation
395        for mode in ("w", "wb"):
396            with self.open(support.TESTFN, mode) as fp:
397                self.assertRaises(exc, fp.read)
398                self.assertRaises(exc, fp.readline)
399        with self.open(support.TESTFN, "wb", buffering=0) as fp:
400            self.assertRaises(exc, fp.read)
401            self.assertRaises(exc, fp.readline)
402        with self.open(support.TESTFN, "rb", buffering=0) as fp:
403            self.assertRaises(exc, fp.write, b"blah")
404            self.assertRaises(exc, fp.writelines, [b"blah\n"])
405        with self.open(support.TESTFN, "rb") as fp:
406            self.assertRaises(exc, fp.write, b"blah")
407            self.assertRaises(exc, fp.writelines, [b"blah\n"])
408        with self.open(support.TESTFN, "r") as fp:
409            self.assertRaises(exc, fp.write, "blah")
410            self.assertRaises(exc, fp.writelines, ["blah\n"])
411            # Non-zero seeking from current or end pos
412            self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
413            self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
414
    def test_optional_abilities(self):
        # Test for OSError when optional APIs are not supported
        # The purpose of this test is to try fileno(), reading, writing and
        # seeking operations with various objects that indicate they do not
        # support these operations.

        # Each factory below builds a fresh stream object to probe.
        def pipe_reader():
            [r, w] = os.pipe()
            os.close(w)  # So that read() is harmless
            return self.FileIO(r, "r")

        def pipe_writer():
            [r, w] = os.pipe()
            self.addCleanup(os.close, r)
            # Guarantee that we can write into the pipe without blocking
            thread = threading.Thread(target=os.read, args=(r, 100))
            thread.start()
            self.addCleanup(thread.join)
            return self.FileIO(w, "w")

        def buffered_reader():
            return self.BufferedReader(self.MockUnseekableIO())

        def buffered_writer():
            return self.BufferedWriter(self.MockUnseekableIO())

        def buffered_random():
            return self.BufferedRandom(self.BytesIO())

        def buffered_rw_pair():
            return self.BufferedRWPair(self.MockUnseekableIO(),
                self.MockUnseekableIO())

        def text_reader():
            # Take writable()/write() from BufferedIOBase instead of the ones
            # the mock would otherwise provide.
            class UnseekableReader(self.MockUnseekableIO):
                writable = self.BufferedIOBase.writable
                write = self.BufferedIOBase.write
            return self.TextIOWrapper(UnseekableReader(), "ascii")

        def text_writer():
            # Take readable()/read() from BufferedIOBase instead of the ones
            # the mock would otherwise provide.
            class UnseekableWriter(self.MockUnseekableIO):
                readable = self.BufferedIOBase.readable
                read = self.BufferedIOBase.read
            return self.TextIOWrapper(UnseekableWriter(), "ascii")

        # (factory, abilities) pairs.  Ability letters: "f" = fileno() works,
        # "r" = readable, "w" = writable, "s" = seekable.
        tests = (
            (pipe_reader, "fr"), (pipe_writer, "fw"),
            (buffered_reader, "r"), (buffered_writer, "w"),
            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
            (text_reader, "r"), (text_writer, "w"),
            (self.BytesIO, "rws"), (self.StringIO, "rws"),
        )
        for [test, abilities] in tests:
            with self.subTest(test), test() as obj:
                readable = "r" in abilities
                self.assertEqual(obj.readable(), readable)
                writable = "w" in abilities
                self.assertEqual(obj.writable(), writable)

                # Pick a payload of the type the stream works with.
                if isinstance(obj, self.TextIOBase):
                    data = "3"
                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                    data = b"3"
                else:
                    self.fail("Unknown base class")

                if "f" in abilities:
                    obj.fileno()
                else:
                    self.assertRaises(OSError, obj.fileno)

                if readable:
                    obj.read(1)
                    obj.read()
                else:
                    self.assertRaises(OSError, obj.read, 1)
                    self.assertRaises(OSError, obj.read)

                if writable:
                    obj.write(data)
                else:
                    self.assertRaises(OSError, obj.write, data)

                if sys.platform.startswith("win") and test in (
                        pipe_reader, pipe_writer):
                    # Pipes seem to appear as seekable on Windows
                    continue
                seekable = "s" in abilities
                self.assertEqual(obj.seekable(), seekable)

                if seekable:
                    obj.tell()
                    obj.seek(0)
                else:
                    self.assertRaises(OSError, obj.tell)
                    self.assertRaises(OSError, obj.seek, 0)

                # truncate() needs the stream to be both writable and seekable.
                if writable and seekable:
                    obj.truncate()
                    obj.truncate(0)
                else:
                    self.assertRaises(OSError, obj.truncate)
                    self.assertRaises(OSError, obj.truncate, 0)
518
519    def test_open_handles_NUL_chars(self):
520        fn_with_NUL = 'foo\0bar'
521        self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
522
523        bytes_fn = bytes(fn_with_NUL, 'ascii')
524        with warnings.catch_warnings():
525            warnings.simplefilter("ignore", DeprecationWarning)
526            self.assertRaises(ValueError, self.open, bytes_fn, 'w')
527
528    def test_raw_file_io(self):
529        with self.open(support.TESTFN, "wb", buffering=0) as f:
530            self.assertEqual(f.readable(), False)
531            self.assertEqual(f.writable(), True)
532            self.assertEqual(f.seekable(), True)
533            self.write_ops(f)
534        with self.open(support.TESTFN, "rb", buffering=0) as f:
535            self.assertEqual(f.readable(), True)
536            self.assertEqual(f.writable(), False)
537            self.assertEqual(f.seekable(), True)
538            self.read_ops(f)
539
540    def test_buffered_file_io(self):
541        with self.open(support.TESTFN, "wb") as f:
542            self.assertEqual(f.readable(), False)
543            self.assertEqual(f.writable(), True)
544            self.assertEqual(f.seekable(), True)
545            self.write_ops(f)
546        with self.open(support.TESTFN, "rb") as f:
547            self.assertEqual(f.readable(), True)
548            self.assertEqual(f.writable(), False)
549            self.assertEqual(f.seekable(), True)
550            self.read_ops(f, True)
551
552    def test_readline(self):
553        with self.open(support.TESTFN, "wb") as f:
554            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
555        with self.open(support.TESTFN, "rb") as f:
556            self.assertEqual(f.readline(), b"abc\n")
557            self.assertEqual(f.readline(10), b"def\n")
558            self.assertEqual(f.readline(2), b"xy")
559            self.assertEqual(f.readline(4), b"zzy\n")
560            self.assertEqual(f.readline(), b"foo\x00bar\n")
561            self.assertEqual(f.readline(None), b"another line")
562            self.assertRaises(TypeError, f.readline, 5.3)
563        with self.open(support.TESTFN, "r") as f:
564            self.assertRaises(TypeError, f.readline, 5.3)
565
566    def test_readline_nonsizeable(self):
567        # Issue #30061
568        # Crash when readline() returns an object without __len__
569        class R(self.IOBase):
570            def readline(self):
571                return None
572        self.assertRaises((TypeError, StopIteration), next, R())
573
574    def test_next_nonsizeable(self):
575        # Issue #30061
576        # Crash when __next__() returns an object without __len__
577        class R(self.IOBase):
578            def __next__(self):
579                return None
580        self.assertRaises(TypeError, R().readlines, 1)
581
582    def test_raw_bytes_io(self):
583        f = self.BytesIO()
584        self.write_ops(f)
585        data = f.getvalue()
586        self.assertEqual(data, b"hello world\n")
587        f = self.BytesIO(data)
588        self.read_ops(f, True)
589
590    def test_large_file_ops(self):
591        # On Windows and Mac OSX this test consumes large resources; It takes
592        # a long time to build the >2 GiB file and takes >2 GiB of disk space
593        # therefore the resource must be enabled to run this test.
594        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
595            support.requires(
596                'largefile',
597                'test requires %s bytes and a long time to run' % self.LARGE)
598        with self.open(support.TESTFN, "w+b", 0) as f:
599            self.large_file_ops(f)
600        with self.open(support.TESTFN, "w+b") as f:
601            self.large_file_ops(f)
602
603    def test_with_open(self):
604        for bufsize in (0, 1, 100):
605            f = None
606            with self.open(support.TESTFN, "wb", bufsize) as f:
607                f.write(b"xxx")
608            self.assertEqual(f.closed, True)
609            f = None
610            try:
611                with self.open(support.TESTFN, "wb", bufsize) as f:
612                    1/0
613            except ZeroDivisionError:
614                self.assertEqual(f.closed, True)
615            else:
616                self.fail("1/0 didn't raise an exception")
617
618    # issue 5008
619    def test_append_mode_tell(self):
620        with self.open(support.TESTFN, "wb") as f:
621            f.write(b"xxx")
622        with self.open(support.TESTFN, "ab", buffering=0) as f:
623            self.assertEqual(f.tell(), 3)
624        with self.open(support.TESTFN, "ab") as f:
625            self.assertEqual(f.tell(), 3)
626        with self.open(support.TESTFN, "a") as f:
627            self.assertGreater(f.tell(), 0)
628
    def test_destructor(self):
        # Destroying an unclosed FileIO must call __del__, close() and flush()
        # in that order, and the written data must end up in the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        with support.check_warnings(('', ResourceWarning)):
            f = MyFileIO(support.TESTFN, "wb")
            f.write(b"xxx")
            # Drop the only reference without closing the file.
            del f
            support.gc_collect()
            self.assertEqual(record, [1, 2, 3])
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"xxx")
654
    def _check_base_destructor(self, base):
        # Destroying an instance of a subclass of *base* must call __del__,
        # close() and flush() in that order, with instance attributes still
        # available in each of them.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                # The base class may or may not define __del__.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        # Drop the only reference without closing the object.
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
684
    def test_IOBase_destructor(self):
        # Run the shared destructor check against IOBase.
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        # Run the shared destructor check against RawIOBase.
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        # Run the shared destructor check against BufferedIOBase.
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        # Run the shared destructor check against TextIOBase.
        self._check_base_destructor(self.TextIOBase)
696
697    def test_close_flushes(self):
698        with self.open(support.TESTFN, "wb") as f:
699            f.write(b"xxx")
700        with self.open(support.TESTFN, "rb") as f:
701            self.assertEqual(f.read(), b"xxx")
702
703    def test_array_writes(self):
704        a = array.array('i', range(10))
705        n = len(a.tobytes())
706        def check(f):
707            with f:
708                self.assertEqual(f.write(a), n)
709                f.writelines((a,))
710        check(self.BytesIO())
711        check(self.FileIO(support.TESTFN, "w"))
712        check(self.BufferedWriter(self.MockRawIO()))
713        check(self.BufferedRandom(self.MockRawIO()))
714        check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
715
716    def test_closefd(self):
717        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
718                          closefd=False)
719
720    def test_read_closed(self):
721        with self.open(support.TESTFN, "w") as f:
722            f.write("egg\n")
723        with self.open(support.TESTFN, "r") as f:
724            file = self.open(f.fileno(), "r", closefd=False)
725            self.assertEqual(file.read(), "egg\n")
726            file.seek(0)
727            file.close()
728            self.assertRaises(ValueError, file.read)
729
730    def test_no_closefd_with_filename(self):
731        # can't use closefd in combination with a file name
732        self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
733
734    def test_closefd_attr(self):
735        with self.open(support.TESTFN, "wb") as f:
736            f.write(b"egg\n")
737        with self.open(support.TESTFN, "r") as f:
738            self.assertEqual(f.buffer.raw.closefd, True)
739            file = self.open(f.fileno(), "r", closefd=False)
740            self.assertEqual(file.buffer.raw.closefd, False)
741
742    def test_garbage_collection(self):
743        # FileIO objects are collected, and collecting them flushes
744        # all data to disk.
745        with support.check_warnings(('', ResourceWarning)):
746            f = self.FileIO(support.TESTFN, "wb")
747            f.write(b"abcxxx")
748            f.f = f
749            wr = weakref.ref(f)
750            del f
751            support.gc_collect()
752        self.assertIsNone(wr(), wr)
753        with self.open(support.TESTFN, "rb") as f:
754            self.assertEqual(f.read(), b"abcxxx")
755
756    def test_unbounded_file(self):
757        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
758        zero = "/dev/zero"
759        if not os.path.exists(zero):
760            self.skipTest("{0} does not exist".format(zero))
761        if sys.maxsize > 0x7FFFFFFF:
762            self.skipTest("test can only run in a 32-bit address space")
763        if support.real_max_memuse < support._2G:
764            self.skipTest("test requires at least 2 GiB of memory")
765        with self.open(zero, "rb", buffering=0) as f:
766            self.assertRaises(OverflowError, f.read)
767        with self.open(zero, "rb") as f:
768            self.assertRaises(OverflowError, f.read)
769        with self.open(zero, "r") as f:
770            self.assertRaises(OverflowError, f.read)
771
772    def check_flush_error_on_close(self, *args, **kwargs):
773        # Test that the file is closed despite failed flush
774        # and that flush() is called before file closed.
775        f = self.open(*args, **kwargs)
776        closed = []
777        def bad_flush():
778            closed[:] = [f.closed]
779            raise OSError()
780        f.flush = bad_flush
781        self.assertRaises(OSError, f.close) # exception not swallowed
782        self.assertTrue(f.closed)
783        self.assertTrue(closed)      # flush() called
784        self.assertFalse(closed[0])  # flush() called before file closed
785        f.flush = lambda: None  # break reference loop
786
787    def test_flush_error_on_close(self):
788        # raw file
789        # Issue #5700: io.FileIO calls flush() after file closed
790        self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
791        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
792        self.check_flush_error_on_close(fd, 'wb', buffering=0)
793        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
794        self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
795        os.close(fd)
796        # buffered io
797        self.check_flush_error_on_close(support.TESTFN, 'wb')
798        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
799        self.check_flush_error_on_close(fd, 'wb')
800        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
801        self.check_flush_error_on_close(fd, 'wb', closefd=False)
802        os.close(fd)
803        # text io
804        self.check_flush_error_on_close(support.TESTFN, 'w')
805        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
806        self.check_flush_error_on_close(fd, 'w')
807        fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
808        self.check_flush_error_on_close(fd, 'w', closefd=False)
809        os.close(fd)
810
811    def test_multi_close(self):
812        f = self.open(support.TESTFN, "wb", buffering=0)
813        f.close()
814        f.close()
815        f.close()
816        self.assertRaises(ValueError, f.flush)
817
818    def test_RawIOBase_read(self):
819        # Exercise the default limited RawIOBase.read(n) implementation (which
820        # calls readinto() internally).
821        rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
822        self.assertEqual(rawio.read(2), b"ab")
823        self.assertEqual(rawio.read(2), b"c")
824        self.assertEqual(rawio.read(2), b"d")
825        self.assertEqual(rawio.read(2), None)
826        self.assertEqual(rawio.read(2), b"ef")
827        self.assertEqual(rawio.read(2), b"g")
828        self.assertEqual(rawio.read(2), None)
829        self.assertEqual(rawio.read(2), b"")
830
831    def test_types_have_dict(self):
832        test = (
833            self.IOBase(),
834            self.RawIOBase(),
835            self.TextIOBase(),
836            self.StringIO(),
837            self.BytesIO()
838        )
839        for obj in test:
840            self.assertTrue(hasattr(obj, "__dict__"))
841
842    def test_opener(self):
843        with self.open(support.TESTFN, "w") as f:
844            f.write("egg\n")
845        fd = os.open(support.TESTFN, os.O_RDONLY)
846        def opener(path, flags):
847            return fd
848        with self.open("non-existent", "r", opener=opener) as f:
849            self.assertEqual(f.read(), "egg\n")
850
851    def test_bad_opener_negative_1(self):
852        # Issue #27066.
853        def badopener(fname, flags):
854            return -1
855        with self.assertRaises(ValueError) as cm:
856            open('non-existent', 'r', opener=badopener)
857        self.assertEqual(str(cm.exception), 'opener returned -1')
858
859    def test_bad_opener_other_negative(self):
860        # Issue #27066.
861        def badopener(fname, flags):
862            return -2
863        with self.assertRaises(ValueError) as cm:
864            open('non-existent', 'r', opener=badopener)
865        self.assertEqual(str(cm.exception), 'opener returned -2')
866
867    def test_fileio_closefd(self):
868        # Issue #4841
869        with self.open(__file__, 'rb') as f1, \
870             self.open(__file__, 'rb') as f2:
871            fileio = self.FileIO(f1.fileno(), closefd=False)
872            # .__init__() must not close f1
873            fileio.__init__(f2.fileno(), closefd=False)
874            f1.readline()
875            # .close() must not close f2
876            fileio.close()
877            f2.readline()
878
879    def test_nonbuffered_textio(self):
880        with support.check_no_resource_warning(self):
881            with self.assertRaises(ValueError):
882                self.open(support.TESTFN, 'w', buffering=0)
883
884    def test_invalid_newline(self):
885        with support.check_no_resource_warning(self):
886            with self.assertRaises(ValueError):
887                self.open(support.TESTFN, 'w', newline='invalid')
888
889    def test_buffered_readinto_mixin(self):
890        # Test the implementation provided by BufferedIOBase
891        class Stream(self.BufferedIOBase):
892            def read(self, size):
893                return b"12345"
894            read1 = read
895        stream = Stream()
896        for method in ("readinto", "readinto1"):
897            with self.subTest(method):
898                buffer = byteslike(5)
899                self.assertEqual(getattr(stream, method)(buffer), 5)
900                self.assertEqual(bytes(buffer), b"12345")
901
902    def test_fspath_support(self):
903        def check_path_succeeds(path):
904            with self.open(path, "w") as f:
905                f.write("egg\n")
906
907            with self.open(path, "r") as f:
908                self.assertEqual(f.read(), "egg\n")
909
910        check_path_succeeds(FakePath(support.TESTFN))
911        check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
912
913        with self.open(support.TESTFN, "w") as f:
914            bad_path = FakePath(f.fileno())
915            with self.assertRaises(TypeError):
916                self.open(bad_path, 'w')
917
918        bad_path = FakePath(None)
919        with self.assertRaises(TypeError):
920            self.open(bad_path, 'w')
921
922        bad_path = FakePath(FloatingPointError)
923        with self.assertRaises(FloatingPointError):
924            self.open(bad_path, 'w')
925
926        # ensure that refcounting is correct with some error conditions
927        with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
928            self.open(FakePath(support.TESTFN), 'rwxa')
929
930    def test_RawIOBase_readall(self):
931        # Exercise the default unlimited RawIOBase.read() and readall()
932        # implementations.
933        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
934        self.assertEqual(rawio.read(), b"abcdefg")
935        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
936        self.assertEqual(rawio.readall(), b"abcdefg")
937
938    def test_BufferedIOBase_readinto(self):
939        # Exercise the default BufferedIOBase.readinto() and readinto1()
940        # implementations (which call read() or read1() internally).
941        class Reader(self.BufferedIOBase):
942            def __init__(self, avail):
943                self.avail = avail
944            def read(self, size):
945                result = self.avail[:size]
946                self.avail = self.avail[size:]
947                return result
948            def read1(self, size):
949                """Returns no more than 5 bytes at once"""
950                return self.read(min(size, 5))
951        tests = (
952            # (test method, total data available, read buffer size, expected
953            #     read size)
954            ("readinto", 10, 5, 5),
955            ("readinto", 10, 6, 6),  # More than read1() can return
956            ("readinto", 5, 6, 5),  # Buffer larger than total available
957            ("readinto", 6, 7, 6),
958            ("readinto", 10, 0, 0),  # Empty buffer
959            ("readinto1", 10, 5, 5),  # Result limited to single read1() call
960            ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
961            ("readinto1", 5, 6, 5),  # Buffer larger than total available
962            ("readinto1", 6, 7, 5),
963            ("readinto1", 10, 0, 0),  # Empty buffer
964        )
965        UNUSED_BYTE = 0x81
966        for test in tests:
967            with self.subTest(test):
968                method, avail, request, result = test
969                reader = Reader(bytes(range(avail)))
970                buffer = bytearray((UNUSED_BYTE,) * request)
971                method = getattr(reader, method)
972                self.assertEqual(method(buffer), result)
973                self.assertEqual(len(buffer), request)
974                self.assertSequenceEqual(buffer[:result], range(result))
975                unused = (UNUSED_BYTE,) * (request - result)
976                self.assertSequenceEqual(buffer[result:], unused)
977                self.assertEqual(len(reader.avail), avail - result)
978
979    def test_close_assert(self):
980        class R(self.IOBase):
981            def __setattr__(self, name, value):
982                pass
983            def flush(self):
984                raise OSError()
985        f = R()
986        # This would cause an assertion failure.
987        self.assertRaises(OSError, f.close)
988
989
class CIOTest(IOTest):
    # IOTest cases plus one finalization regression test.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Tie the instance into a reference cycle with itself.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop the local references to both the class and the instance, so
        # only the cycle keeps them alive until the collector runs.
        del MyIO
        del obj
        support.gc_collect()
        # A dead weak reference shows the instance was actually collected.
        self.assertIsNone(wr(), wr)
1009
class PyIOTest(IOTest):
    # Adds no tests of its own beyond those inherited from IOTest.
    pass
1012
1013
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Cross-checks the attribute sets of io.RawIOBase and _pyio.RawIOBase
    # in both directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
1027
1028
1029class CommonBufferedTests:
1030    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1031
1032    def test_detach(self):
1033        raw = self.MockRawIO()
1034        buf = self.tp(raw)
1035        self.assertIs(buf.detach(), raw)
1036        self.assertRaises(ValueError, buf.detach)
1037
1038        repr(buf)  # Should still work
1039
1040    def test_fileno(self):
1041        rawio = self.MockRawIO()
1042        bufio = self.tp(rawio)
1043
1044        self.assertEqual(42, bufio.fileno())
1045
1046    def test_invalid_args(self):
1047        rawio = self.MockRawIO()
1048        bufio = self.tp(rawio)
1049        # Invalid whence
1050        self.assertRaises(ValueError, bufio.seek, 0, -1)
1051        self.assertRaises(ValueError, bufio.seek, 0, 9)
1052
    def test_override_destructor(self):
        # Check that overridden __del__(), close() and flush() are all
        # invoked, in that order, when a buffered object is destroyed.
        tp = self.tp
        # Each overridden method appends its own marker:
        # 1 = __del__, 2 = close, 3 = flush.
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base class may or may not define __del__; only chain
                # to it when it exists.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        # Drop the only reference and force a collection so that the
        # destructor has run before the record is inspected.
        del bufio
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
1076
1077    def test_context_manager(self):
1078        # Test usability as a context manager
1079        rawio = self.MockRawIO()
1080        bufio = self.tp(rawio)
1081        def _with():
1082            with bufio:
1083                pass
1084        _with()
1085        # bufio should now be closed, and using it a second time should raise
1086        # a ValueError.
1087        self.assertRaises(ValueError, _with)
1088
    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            # The buffered object is a temporary: looking up a missing
            # attribute raises AttributeError while the object itself is
            # being discarded.
            self.tp(rawio).xyzzy
        # Capture stderr so that anything reported from the destructor can
        # be inspected instead of leaking into the test output.
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception OSError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)
1103
1104    def test_repr(self):
1105        raw = self.MockRawIO()
1106        b = self.tp(raw)
1107        clsname = "%s.%s" % (self.tp.__module__, self.tp.__qualname__)
1108        self.assertEqual(repr(b), "<%s>" % clsname)
1109        raw.name = "dummy"
1110        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
1111        raw.name = b"dummy"
1112        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
1113
1114    def test_recursive_repr(self):
1115        # Issue #25455
1116        raw = self.MockRawIO()
1117        b = self.tp(raw)
1118        with support.swap_attr(raw, 'name', b):
1119            try:
1120                repr(b)  # Should not crash
1121            except RuntimeError:
1122                pass
1123
1124    def test_flush_error_on_close(self):
1125        # Test that buffered file is closed despite failed flush
1126        # and that flush() is called before file closed.
1127        raw = self.MockRawIO()
1128        closed = []
1129        def bad_flush():
1130            closed[:] = [b.closed, raw.closed]
1131            raise OSError()
1132        raw.flush = bad_flush
1133        b = self.tp(raw)
1134        self.assertRaises(OSError, b.close) # exception not swallowed
1135        self.assertTrue(b.closed)
1136        self.assertTrue(raw.closed)
1137        self.assertTrue(closed)      # flush() called
1138        self.assertFalse(closed[0])  # flush() called before file closed
1139        self.assertFalse(closed[1])
1140        raw.flush = lambda: None  # break reference loop
1141
1142    def test_close_error_on_close(self):
1143        raw = self.MockRawIO()
1144        def bad_flush():
1145            raise OSError('flush')
1146        def bad_close():
1147            raise OSError('close')
1148        raw.close = bad_close
1149        b = self.tp(raw)
1150        b.flush = bad_flush
1151        with self.assertRaises(OSError) as err: # exception not swallowed
1152            b.close()
1153        self.assertEqual(err.exception.args, ('close',))
1154        self.assertIsInstance(err.exception.__context__, OSError)
1155        self.assertEqual(err.exception.__context__.args, ('flush',))
1156        self.assertFalse(b.closed)
1157
    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # Same scenario as a failing flush() plus a failing close(), but
        # both failures are NameErrors produced by the interpreter itself.
        raw = self.MockRawIO()
        def bad_flush():
            # Deliberately undefined name: raising it triggers a NameError.
            raise non_existing_flush
        def bad_close():
            # Deliberately undefined name: raising it triggers a NameError.
            raise non_existing_close
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            b.close()
        # The close() failure is the one raised; the flush() failure is
        # chained to it as __context__.
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(b.closed)
1174
1175    def test_multi_close(self):
1176        raw = self.MockRawIO()
1177        b = self.tp(raw)
1178        b.close()
1179        b.close()
1180        b.close()
1181        self.assertRaises(ValueError, b.flush)
1182
1183    def test_unseekable(self):
1184        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1185        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1186        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1187
1188    def test_readonly_attributes(self):
1189        raw = self.MockRawIO()
1190        buf = self.tp(raw)
1191        x = self.MockRawIO()
1192        with self.assertRaises(AttributeError):
1193            buf.raw = x
1194
1195
class SizeofTest:
    # Mixin: expects the concrete test class to provide ``tp``,
    # ``MockRawIO`` and the unittest assertion methods.

    @support.cpython_only
    def test_sizeof(self):
        # sys.getsizeof() must include the buffer: two objects that differ
        # only in buffer size must differ by exactly that amount.
        small, large = 4096, 8192
        small_buffered = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_buffered) - small
        large_buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_buffered), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size must drop by the buffer size.
        bufsize = 4096
        buffered = self.tp(self.MockRawIO(), buffer_size=bufsize)
        expected_after_close = sys.getsizeof(buffered) - bufsize
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), expected_after_close)
1217
1218class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1219    read_mode = "rb"
1220
1221    def test_constructor(self):
1222        rawio = self.MockRawIO([b"abc"])
1223        bufio = self.tp(rawio)
1224        bufio.__init__(rawio)
1225        bufio.__init__(rawio, buffer_size=1024)
1226        bufio.__init__(rawio, buffer_size=16)
1227        self.assertEqual(b"abc", bufio.read())
1228        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1229        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1230        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1231        rawio = self.MockRawIO([b"abc"])
1232        bufio.__init__(rawio)
1233        self.assertEqual(b"abc", bufio.read())
1234
1235    def test_uninitialized(self):
1236        bufio = self.tp.__new__(self.tp)
1237        del bufio
1238        bufio = self.tp.__new__(self.tp)
1239        self.assertRaisesRegex((ValueError, AttributeError),
1240                               'uninitialized|has no attribute',
1241                               bufio.read, 0)
1242        bufio.__init__(self.MockRawIO())
1243        self.assertEqual(bufio.read(0), b'')
1244
1245    def test_read(self):
1246        for arg in (None, 7):
1247            rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1248            bufio = self.tp(rawio)
1249            self.assertEqual(b"abcdefg", bufio.read(arg))
1250        # Invalid args
1251        self.assertRaises(ValueError, bufio.read, -2)
1252
    def test_read1(self):
        # Check read1() results together with the mock's ``_reads`` counter
        # (presumably the number of raw read calls made so far -- verify
        # against MockRawIO).
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"a", bufio.read(1))
        self.assertEqual(b"b", bufio.read1(1))
        self.assertEqual(rawio._reads, 1)
        # A zero-size read1() returns b"" and the next read1() still
        # returns the already-buffered b"c" without a further raw read.
        self.assertEqual(b"", bufio.read1(0))
        self.assertEqual(b"c", bufio.read1(100))
        self.assertEqual(rawio._reads, 1)
        # From here on every read1() returns one raw chunk, however large
        # the requested size.
        self.assertEqual(b"d", bufio.read1(100))
        self.assertEqual(rawio._reads, 2)
        self.assertEqual(b"efg", bufio.read1(100))
        self.assertEqual(rawio._reads, 3)
        # End of data: b"" is returned after one more raw read.
        self.assertEqual(b"", bufio.read1(100))
        self.assertEqual(rawio._reads, 4)
1268
1269    def test_read1_arbitrary(self):
1270        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1271        bufio = self.tp(rawio)
1272        self.assertEqual(b"a", bufio.read(1))
1273        self.assertEqual(b"bc", bufio.read1())
1274        self.assertEqual(b"d", bufio.read1())
1275        self.assertEqual(b"efg", bufio.read1(-1))
1276        self.assertEqual(rawio._reads, 3)
1277        self.assertEqual(b"", bufio.read1())
1278        self.assertEqual(rawio._reads, 4)
1279
1280    def test_readinto(self):
1281        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1282        bufio = self.tp(rawio)
1283        b = bytearray(2)
1284        self.assertEqual(bufio.readinto(b), 2)
1285        self.assertEqual(b, b"ab")
1286        self.assertEqual(bufio.readinto(b), 2)
1287        self.assertEqual(b, b"cd")
1288        self.assertEqual(bufio.readinto(b), 2)
1289        self.assertEqual(b, b"ef")
1290        self.assertEqual(bufio.readinto(b), 1)
1291        self.assertEqual(b, b"gf")
1292        self.assertEqual(bufio.readinto(b), 0)
1293        self.assertEqual(b, b"gf")
1294        rawio = self.MockRawIO((b"abc", None))
1295        bufio = self.tp(rawio)
1296        self.assertEqual(bufio.readinto(b), 2)
1297        self.assertEqual(b, b"ab")
1298        self.assertEqual(bufio.readinto(b), 1)
1299        self.assertEqual(b, b"cb")
1300
    def test_readinto1(self):
        # Check readinto1() results together with the mock's ``_reads``
        # counter (presumably the number of raw read calls so far --
        # verify against MockRawIO).
        buffer_size = 10
        rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
        bufio = self.tp(rawio, buffer_size=buffer_size)
        b = bytearray(2)
        # peek() pulls in the first chunk; the following readinto1() calls
        # are served from it without touching the raw stream again.
        self.assertEqual(bufio.peek(3), b'abc')
        self.assertEqual(rawio._reads, 1)
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"ab")
        self.assertEqual(rawio._reads, 1)
        # Only one buffered byte is left, so only one byte is written.
        self.assertEqual(bufio.readinto1(b), 1)
        self.assertEqual(b[:1], b"c")
        self.assertEqual(rawio._reads, 1)
        # With nothing buffered, one raw read supplies the data.
        self.assertEqual(bufio.readinto1(b), 2)
        self.assertEqual(b, b"de")
        self.assertEqual(rawio._reads, 2)
        # Now use a target larger than the internal buffer: the buffered
        # b"fgh" and one more raw chunk both end up in it.
        b = bytearray(2*buffer_size)
        self.assertEqual(bufio.peek(3), b'fgh')
        self.assertEqual(rawio._reads, 3)
        self.assertEqual(bufio.readinto1(b), 6)
        self.assertEqual(b[:6], b"fghjkl")
        self.assertEqual(rawio._reads, 4)
1323
1324    def test_readinto_array(self):
1325        buffer_size = 60
1326        data = b"a" * 26
1327        rawio = self.MockRawIO((data,))
1328        bufio = self.tp(rawio, buffer_size=buffer_size)
1329
1330        # Create an array with element size > 1 byte
1331        b = array.array('i', b'x' * 32)
1332        assert len(b) != 16
1333
1334        # Read into it. We should get as many *bytes* as we can fit into b
1335        # (which is more than the number of elements)
1336        n = bufio.readinto(b)
1337        self.assertGreater(n, len(b))
1338
1339        # Check that old contents of b are preserved
1340        bm = memoryview(b).cast('B')
1341        self.assertLess(n, len(bm))
1342        self.assertEqual(bm[:n], data[:n])
1343        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1344
1345    def test_readinto1_array(self):
1346        buffer_size = 60
1347        data = b"a" * 26
1348        rawio = self.MockRawIO((data,))
1349        bufio = self.tp(rawio, buffer_size=buffer_size)
1350
1351        # Create an array with element size > 1 byte
1352        b = array.array('i', b'x' * 32)
1353        assert len(b) != 16
1354
1355        # Read into it. We should get as many *bytes* as we can fit into b
1356        # (which is more than the number of elements)
1357        n = bufio.readinto1(b)
1358        self.assertGreater(n, len(b))
1359
1360        # Check that old contents of b are preserved
1361        bm = memoryview(b).cast('B')
1362        self.assertLess(n, len(bm))
1363        self.assertEqual(bm[:n], data[:n])
1364        self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1365
1366    def test_readlines(self):
1367        def bufio():
1368            rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1369            return self.tp(rawio)
1370        self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1371        self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1372        self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
1373
1374    def test_buffering(self):
1375        data = b"abcdefghi"
1376        dlen = len(data)
1377
1378        tests = [
1379            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1380            [ 100, [ 3, 3, 3],     [ dlen ]    ],
1381            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1382        ]
1383
1384        for bufsize, buf_read_sizes, raw_read_sizes in tests:
1385            rawio = self.MockFileIO(data)
1386            bufio = self.tp(rawio, buffer_size=bufsize)
1387            pos = 0
1388            for nbytes in buf_read_sizes:
1389                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
1390                pos += nbytes
1391            # this is mildly implementation-dependent
1392            self.assertEqual(rawio.read_history, raw_read_sizes)
1393
    def test_read_non_blocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
        bufio = self.tp(rawio)
        # The first None cuts the 6-byte request short: only the data
        # obtained so far is returned.
        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        # An unbounded read returns what is left of the current chunk.
        self.assertEqual(b"fg", bufio.read())
        # With nothing available, peek() gives b"" and read() gives None;
        # once the mock's data is used up, read() gives b"".
        self.assertEqual(b"", bufio.peek(1))
        self.assertIsNone(bufio.read())
        self.assertEqual(b"", bufio.read())

        # The raw stream's own readall() behaves similarly: data first,
        # then None when the next chunk is None.
        rawio = self.MockRawIO((b"a", None, None))
        self.assertEqual(b"a", rawio.readall())
        self.assertIsNone(rawio.readall())
1408
1409    def test_read_past_eof(self):
1410        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1411        bufio = self.tp(rawio)
1412
1413        self.assertEqual(b"abcdefg", bufio.read(9000))
1414
1415    def test_read_all(self):
1416        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1417        bufio = self.tp(rawio)
1418
1419        self.assertEqual(b"abcdefg", bufio.read())
1420
    @support.requires_resource('cpu')
    def test_threads(self):
        # Read one file through a single shared buffered reader from many
        # threads and check that every byte is delivered exactly once.
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with self.open(support.TESTFN, "wb") as f:
                f.write(s)
            with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
                # A buffer size of 8 makes the 1-byte reads fit inside the
                # buffer and the 19-byte reads exceed it.
                bufio = self.tp(raw, 8)
                errors = []
                results = []
                def f():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            s = bufio.read(n)
                            if not s:
                                break
                            # list.append() is atomic
                            results.append(s)
                    except Exception as e:
                        # Record the failure for the main thread's
                        # assertion, then let the thread die with it.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # The order of the chunks is arbitrary, so only the count
                # of each byte value is compared.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)
1460
1461    def test_unseekable(self):
1462        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1463        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1464        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1465        bufio.read(1)
1466        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1467        self.assertRaises(self.UnsupportedOperation, bufio.tell)
1468
1469    def test_misbehaved_io(self):
1470        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1471        bufio = self.tp(rawio)
1472        self.assertRaises(OSError, bufio.seek, 0)
1473        self.assertRaises(OSError, bufio.tell)
1474
1475    def test_no_extraneous_read(self):
1476        # Issue #9550; when the raw IO object has satisfied the read request,
1477        # we should not issue any additional reads, otherwise it may block
1478        # (e.g. socket).
1479        bufsize = 16
1480        for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1481            rawio = self.MockRawIO([b"x" * n])
1482            bufio = self.tp(rawio, bufsize)
1483            self.assertEqual(bufio.read(n), b"x" * n)
1484            # Simple case: one raw read is enough to satisfy the request.
1485            self.assertEqual(rawio._extraneous_reads, 0,
1486                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1487            # A more complex case where two raw reads are needed to satisfy
1488            # the request.
1489            rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1490            bufio = self.tp(rawio, bufsize)
1491            self.assertEqual(bufio.read(n), b"x" * n)
1492            self.assertEqual(rawio._extraneous_reads, 0,
1493                             "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1494
1495    def test_read_on_closed(self):
1496        # Issue #23796
1497        b = io.BufferedReader(io.BytesIO(b"12"))
1498        b.read(1)
1499        b.close()
1500        self.assertRaises(ValueError, b.peek)
1501        self.assertRaises(ValueError, b.read1, 1)
1502
1503
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the BufferedReader and sizeof tests against io.BufferedReader
    # and adds a few checks of its own.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reader.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # After each rejected re-initialisation the object must refuse to
        # read.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                reader.read()

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            reader.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(support.TESTFN, "w+b")
            cyclic = self.tp(raw)
            # Make the object part of a reference cycle with itself.
            cyclic.f = cyclic
            ref = weakref.ref(cyclic)
            del cyclic
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must
        # mention the class name.
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1553
1554
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest cases against the pure-Python
    # implementation from _pyio.
    tp = pyio.BufferedReader
1557
1558
1559class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1560    write_mode = "wb"
1561
1562    def test_constructor(self):
1563        rawio = self.MockRawIO()
1564        bufio = self.tp(rawio)
1565        bufio.__init__(rawio)
1566        bufio.__init__(rawio, buffer_size=1024)
1567        bufio.__init__(rawio, buffer_size=16)
1568        self.assertEqual(3, bufio.write(b"abc"))
1569        bufio.flush()
1570        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1571        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1572        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1573        bufio.__init__(rawio)
1574        self.assertEqual(3, bufio.write(b"ghi"))
1575        bufio.flush()
1576        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
1577
1578    def test_uninitialized(self):
1579        bufio = self.tp.__new__(self.tp)
1580        del bufio
1581        bufio = self.tp.__new__(self.tp)
1582        self.assertRaisesRegex((ValueError, AttributeError),
1583                               'uninitialized|has no attribute',
1584                               bufio.write, b'')
1585        bufio.__init__(self.MockRawIO())
1586        self.assertEqual(bufio.write(b''), 0)
1587
1588    def test_detach_flush(self):
1589        raw = self.MockRawIO()
1590        buf = self.tp(raw)
1591        buf.write(b"howdy!")
1592        self.assertFalse(raw._write_stack)
1593        buf.detach()
1594        self.assertEqual(raw._write_stack, [b"howdy!"])
1595
1596    def test_write(self):
1597        # Write to the buffered IO but don't overflow the buffer.
1598        writer = self.MockRawIO()
1599        bufio = self.tp(writer, 8)
1600        bufio.write(b"abc")
1601        self.assertFalse(writer._write_stack)
1602        buffer = bytearray(b"def")
1603        bufio.write(buffer)
1604        buffer[:] = b"***"  # Overwrite our copy of the data
1605        bufio.flush()
1606        self.assertEqual(b"".join(writer._write_stack), b"abcdef")
1607
1608    def test_write_overflow(self):
1609        writer = self.MockRawIO()
1610        bufio = self.tp(writer, 8)
1611        contents = b"abcdefghijklmnop"
1612        for n in range(0, len(contents), 3):
1613            bufio.write(contents[n:n+3])
1614        flushed = b"".join(writer._write_stack)
1615        # At least (total - 8) bytes were implicitly flushed, perhaps more
1616        # depending on the implementation.
1617        self.assertTrue(flushed.startswith(contents[:-8]), flushed)
1618
1619    def check_writes(self, intermediate_func):
1620        # Lots of writes, test the flushed output is as expected.
1621        contents = bytes(range(256)) * 1000
1622        n = 0
1623        writer = self.MockRawIO()
1624        bufio = self.tp(writer, 13)
1625        # Generator of write sizes: repeat each N 15 times then proceed to N+1
1626        def gen_sizes():
1627            for size in count(1):
1628                for i in range(15):
1629                    yield size
1630        sizes = gen_sizes()
1631        while n < len(contents):
1632            size = min(next(sizes), len(contents) - n)
1633            self.assertEqual(bufio.write(contents[n:n+size]), size)
1634            intermediate_func(bufio)
1635            n += size
1636        bufio.flush()
1637        self.assertEqual(contents, b"".join(writer._write_stack))
1638
1639    def test_writes(self):
1640        self.check_writes(lambda bufio: None)
1641
1642    def test_writes_and_flushes(self):
1643        self.check_writes(lambda bufio: bufio.flush())
1644
1645    def test_writes_and_seeks(self):
1646        def _seekabs(bufio):
1647            pos = bufio.tell()
1648            bufio.seek(pos + 1, 0)
1649            bufio.seek(pos - 1, 0)
1650            bufio.seek(pos, 0)
1651        self.check_writes(_seekabs)
1652        def _seekrel(bufio):
1653            pos = bufio.seek(0, 1)
1654            bufio.seek(+1, 1)
1655            bufio.seek(-1, 1)
1656            bufio.seek(pos, 0)
1657        self.check_writes(_seekrel)
1658
1659    def test_writes_and_truncates(self):
1660        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
1661
1662    def test_write_non_blocking(self):
1663        raw = self.MockNonBlockWriterIO()
1664        bufio = self.tp(raw, 8)
1665
1666        self.assertEqual(bufio.write(b"abcd"), 4)
1667        self.assertEqual(bufio.write(b"efghi"), 5)
1668        # 1 byte will be written, the rest will be buffered
1669        raw.block_on(b"k")
1670        self.assertEqual(bufio.write(b"jklmn"), 5)
1671
1672        # 8 bytes will be written, 8 will be buffered and the rest will be lost
1673        raw.block_on(b"0")
1674        try:
1675            bufio.write(b"opqrwxyz0123456789")
1676        except self.BlockingIOError as e:
1677            written = e.characters_written
1678        else:
1679            self.fail("BlockingIOError should have been raised")
1680        self.assertEqual(written, 16)
1681        self.assertEqual(raw.pop_written(),
1682            b"abcdefghijklmnopqrwxyz")
1683
1684        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
1685        s = raw.pop_written()
1686        # Previously buffered bytes were flushed
1687        self.assertTrue(s.startswith(b"01234567A"), s)
1688
1689    def test_write_and_rewind(self):
1690        raw = io.BytesIO()
1691        bufio = self.tp(raw, 4)
1692        self.assertEqual(bufio.write(b"abcdef"), 6)
1693        self.assertEqual(bufio.tell(), 6)
1694        bufio.seek(0, 0)
1695        self.assertEqual(bufio.write(b"XY"), 2)
1696        bufio.seek(6, 0)
1697        self.assertEqual(raw.getvalue(), b"XYcdef")
1698        self.assertEqual(bufio.write(b"123456"), 6)
1699        bufio.flush()
1700        self.assertEqual(raw.getvalue(), b"XYcdef123456")
1701
1702    def test_flush(self):
1703        writer = self.MockRawIO()
1704        bufio = self.tp(writer, 8)
1705        bufio.write(b"abc")
1706        bufio.flush()
1707        self.assertEqual(b"abc", writer._write_stack[0])
1708
1709    def test_writelines(self):
1710        l = [b'ab', b'cd', b'ef']
1711        writer = self.MockRawIO()
1712        bufio = self.tp(writer, 8)
1713        bufio.writelines(l)
1714        bufio.flush()
1715        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1716
1717    def test_writelines_userlist(self):
1718        l = UserList([b'ab', b'cd', b'ef'])
1719        writer = self.MockRawIO()
1720        bufio = self.tp(writer, 8)
1721        bufio.writelines(l)
1722        bufio.flush()
1723        self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1724
1725    def test_writelines_error(self):
1726        writer = self.MockRawIO()
1727        bufio = self.tp(writer, 8)
1728        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1729        self.assertRaises(TypeError, bufio.writelines, None)
1730        self.assertRaises(TypeError, bufio.writelines, 'abc')
1731
1732    def test_destructor(self):
1733        writer = self.MockRawIO()
1734        bufio = self.tp(writer, 8)
1735        bufio.write(b"abc")
1736        del bufio
1737        support.gc_collect()
1738        self.assertEqual(b"abc", writer._write_stack[0])
1739
1740    def test_truncate(self):
1741        # Truncate implicitly flushes the buffer.
1742        self.addCleanup(support.unlink, support.TESTFN)
1743        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1744            bufio = self.tp(raw, 8)
1745            bufio.write(b"abcdef")
1746            self.assertEqual(bufio.truncate(3), 3)
1747            self.assertEqual(bufio.tell(), 6)
1748        with self.open(support.TESTFN, "rb", buffering=0) as f:
1749            self.assertEqual(f.read(), b"abc")
1750
1751    def test_truncate_after_write(self):
1752        # Ensure that truncate preserves the file position after
1753        # writes longer than the buffer size.
1754        # Issue: https://bugs.python.org/issue32228
1755        self.addCleanup(support.unlink, support.TESTFN)
1756        with self.open(support.TESTFN, "wb") as f:
1757            # Fill with some buffer
1758            f.write(b'\x00' * 10000)
1759        buffer_sizes = [8192, 4096, 200]
1760        for buffer_size in buffer_sizes:
1761            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
1762                f.write(b'\x00' * (buffer_size + 1))
1763                # After write write_pos and write_end are set to 0
1764                f.read(1)
1765                # read operation makes sure that pos != raw_pos
1766                f.truncate()
1767                self.assertEqual(f.tell(), buffer_size + 2)
1768
1769    @support.requires_resource('cpu')
1770    def test_threads(self):
1771        try:
1772            # Write out many bytes from many threads and test they were
1773            # all flushed.
1774            N = 1000
1775            contents = bytes(range(256)) * N
1776            sizes = cycle([1, 19])
1777            n = 0
1778            queue = deque()
1779            while n < len(contents):
1780                size = next(sizes)
1781                queue.append(contents[n:n+size])
1782                n += size
1783            del contents
1784            # We use a real file object because it allows us to
1785            # exercise situations where the GIL is released before
1786            # writing the buffer to the raw streams. This is in addition
1787            # to concurrency issues due to switching threads in the middle
1788            # of Python code.
1789            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
1790                bufio = self.tp(raw, 8)
1791                errors = []
1792                def f():
1793                    try:
1794                        while True:
1795                            try:
1796                                s = queue.popleft()
1797                            except IndexError:
1798                                return
1799                            bufio.write(s)
1800                    except Exception as e:
1801                        errors.append(e)
1802                        raise
1803                threads = [threading.Thread(target=f) for x in range(20)]
1804                with support.start_threads(threads):
1805                    time.sleep(0.02) # yield
1806                self.assertFalse(errors,
1807                    "the following exceptions were caught: %r" % errors)
1808                bufio.close()
1809            with self.open(support.TESTFN, "rb") as f:
1810                s = f.read()
1811            for i in range(256):
1812                self.assertEqual(s.count(bytes([i])), N)
1813        finally:
1814            support.unlink(support.TESTFN)
1815
1816    def test_misbehaved_io(self):
1817        rawio = self.MisbehavedRawIO()
1818        bufio = self.tp(rawio, 5)
1819        self.assertRaises(OSError, bufio.seek, 0)
1820        self.assertRaises(OSError, bufio.tell)
1821        self.assertRaises(OSError, bufio.write, b"abcdef")
1822
1823    def test_max_buffer_size_removal(self):
1824        with self.assertRaises(TypeError):
1825            self.tp(self.MockRawIO(), 8, 12)
1826
1827    def test_write_error_on_close(self):
1828        raw = self.MockRawIO()
1829        def bad_write(b):
1830            raise OSError()
1831        raw.write = bad_write
1832        b = self.tp(raw)
1833        b.write(b'spam')
1834        self.assertRaises(OSError, b.close) # exception not swallowed
1835        self.assertTrue(b.closed)
1836
1837    def test_slow_close_from_thread(self):
1838        # Issue #31976
1839        rawio = self.SlowFlushRawIO()
1840        bufio = self.tp(rawio, 8)
1841        t = threading.Thread(target=bufio.close)
1842        t.start()
1843        rawio.in_flush.wait()
1844        self.assertRaises(ValueError, bufio.write, b'spam')
1845        self.assertTrue(bufio.closed)
1846        t.join()
1847
1848
1849
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the BufferedWriter tests against the C implementation and adds
    # checks that only apply to it.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # Re-initialising with an invalid buffer size fails, and afterwards
        # the object refuses to write.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                buffered.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                buffered.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            raw_file = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(raw_file)
            writer.write(b"123xxx")
            # Make the object part of a reference cycle.
            writer.x = writer
            writer_ref = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(writer_ref(), writer_ref)
        with self.open(support.TESTFN, "rb") as reader:
            self.assertEqual(reader.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # The TypeError for too many arguments has to name the class.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1896
1897
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against the pure-Python (_pyio)
    # implementation.
    tp = pyio.BufferedWriter
1900
class BufferedRWPairTest(unittest.TestCase):
    # Tests shared by every BufferedRWPair implementation; ``self.tp`` is the
    # class under test.

    def test_constructor(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)

    def test_uninitialized(self):
        # An object that never ran __init__ can be thrown away, and using
        # one fails cleanly instead of crashing.
        discarded = self.tp.__new__(self.tp)
        del discarded
        bare = self.tp.__new__(self.tp)
        expected_errors = (ValueError, AttributeError)
        message_re = 'uninitialized|has no attribute'
        self.assertRaisesRegex(expected_errors, message_re, bare.read, 0)
        self.assertRaisesRegex(expected_errors, message_re, bare.write, b'')
        # Once initialised, the very same object works normally.
        bare.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(bare.read(0), b'')
        self.assertEqual(bare.write(b''), 0)

    def test_detach(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        with self.assertRaises(self.UnsupportedOperation):
            rw_pair.detach()

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument (the former max_buffer_size) is
        # rejected.
        reader, writer = self.MockRawIO(), self.MockRawIO()
        self.assertRaises(TypeError, self.tp, reader, writer, 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        with self.assertRaises(OSError):
            self.tp(self.MockRawIO(), NotWriteable())

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        for size, expected in ((3, b"abc"), (1, b"d")):
            self.assertEqual(rw_pair.read(size), expected)
        # Without a size, the remainder is returned.
        self.assertEqual(rw_pair.read(), b"ef")
        rw_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(rw_pair.read(None), b"abc")

    def test_readlines(self):
        def fresh_pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

        every_line = [b"abc\n", b"def\n", b"h"]
        self.assertEqual(fresh_pair().readlines(), every_line)
        self.assertEqual(fresh_pair().readlines(), every_line)
        # With a hint of 5, only the first two lines come back.
        self.assertEqual(fresh_pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(rw_pair.read1(3), b"abc")
        self.assertEqual(rw_pair.read1(), b"def")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                target = byteslike(b'\0' * 5)
                fill = getattr(rw_pair, method)
                self.assertEqual(fill(target), 5)
                self.assertEqual(bytes(target), b"abcde")

    def test_write(self):
        sink = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), sink)

        rw_pair.write(b"abc")
        rw_pair.flush()
        scratch = bytearray(b"def")
        rw_pair.write(scratch)
        scratch[:] = b"***"  # Overwrite our copy of the data
        rw_pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # peek() may return more than asked for, and does not consume.
        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.readable())

    def test_writeable(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(rw_pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_reader_close_error_on_close(self):
        # Only the reader's close() fails (with a NameError): the error
        # propagates, yet the writer is still closed.
        def reader_close():
            reader_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        reader.close = reader_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('reader_non_existing', str(caught.exception))
        self.assertTrue(rw_pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

    def test_writer_close_error_on_close(self):
        # Only the writer's close() fails: the error propagates, yet the
        # reader is still closed.
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        self.assertIn('writer_non_existing', str(caught.exception))
        self.assertFalse(rw_pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

    def test_reader_writer_close_error_on_close(self):
        # Both close() calls fail: the reader's error is raised and the
        # writer's error is chained to it as __context__.
        def reader_close():
            reader_non_existing
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        reader.close = reader_close
        writer.close = writer_close
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as caught:
            rw_pair.close()
        raised = caught.exception
        self.assertIn('reader_non_existing', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('writer_non_existing', str(raised.__context__))
        self.assertFalse(rw_pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        def pair_isatty(reader_tty, writer_tty):
            reader = SelectableIsAtty(reader_tty)
            writer = SelectableIsAtty(writer_tty)
            return self.tp(reader, writer).isatty()

        # The pair counts as a tty as soon as either side is one.
        self.assertFalse(pair_isatty(False, False))
        self.assertTrue(pair_isatty(True, False))
        self.assertTrue(pair_isatty(False, True))
        self.assertTrue(pair_isatty(True, True))

    def test_weakref_clearing(self):
        rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
        pair_ref = weakref.ref(rw_pair)
        rw_pair = None
        pair_ref = None # Shouldn't segfault.
2089
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against the C implementation.
    tp = io.BufferedRWPair
2092
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPair tests against the pure-Python (_pyio)
    # implementation.
    tp = pyio.BufferedRWPair
2095
2096
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Inherits both the reader and the writer test suites and adds tests
    # that mix reading, writing and seeking on one object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        for chunk in (b"ddd", b"eee"):
            rw.write(chunk)
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        # By now the pending writes have reached the raw stream.
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        rw = self.tp(backing)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite four bytes at position 4, then re-read everything.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # Seek relative to the end, then relative to the current position.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float is not a valid seek offset.
        with self.assertRaises(TypeError):
            rw.seek(0.0)

    def check_flush_and_read(self, read_func):
        """Interleave reads, a write and flushes, reading through read_func.

        read_func(bufio[, n]) must return the bytes it read from bufio and
        advance the stream position past them.
        """
        backing = self.BytesIO(b"abcdefghi")
        buffered = self.tp(backing)

        self.assertEqual(b"ab", read_func(buffered, 2))
        buffered.write(b"12")
        self.assertEqual(b"ef", read_func(buffered, 2))
        self.assertEqual(6, buffered.tell())
        buffered.flush()
        self.assertEqual(6, buffered.tell())
        self.assertEqual(b"ghi", read_func(buffered))
        # Modify the underlying stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        buffered.flush()
        buffered.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(buffered, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use a large buffer.
            capacity = 9999 if n < 0 else n
            target = bytearray(capacity)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not consume, so advance the position by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        buffered = self.tp(backing)

        for chunk in (b"123", b"45"):
            buffered.write(chunk)
            buffered.flush()
        buffered.seek(0, 0)
        expected = b"12345fghi"
        self.assertEqual(expected, backing.getvalue())
        self.assertEqual(expected, buffered.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_ahead(bufio):
            bufio.peek(1)
        self.check_writes(_peek_ahead)

        def _peek_behind(bufio):
            # Step back one byte, peek, then restore the position.
            here = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(here, 0)
        self.check_writes(_peek_behind)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last)

    def test_writes_and_readintos(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            backing = self.BytesIO(b"A" * 10)
            buffered = self.tp(backing, 4)
            end_of_write = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(buffered.read(1), b"A")
            self.assertEqual(buffered.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            buffered.write(b"B" * overwrite_size)
            self.assertEqual(buffered.tell(), end_of_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            buffered.flush()
            self.assertEqual(buffered.tell(), end_of_write)
            result = backing.getvalue()
            self.assertEqual(result,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        template = b"\x80\x81\x82\x83\x84"
        size = len(template)
        for first in range(size):
            for second in range(first, size):
                backing = self.BytesIO(template)
                buffered = self.tp(backing, 100)
                mutate(buffered, first, second)
                buffered.flush()
                # When both positions coincide, the later write (1) wins.
                expected = bytearray(template)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (first, second))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        buffered = self.tp(backing, 100)
        self.assertEqual(buffered.read(2), b"AA") # the read buffer gets filled
        self.assertEqual(buffered.truncate(), 2)
        self.assertEqual(buffered.write(b"BB"), 2) # the write buffer increases
        self.assertEqual(buffered.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate one-byte writes with readline() calls.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
2324
2325
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the BufferedRandom tests against the C implementation and adds
    # checks that only apply to it.
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the garbage-collection tests of the C reader and C writer.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # The TypeError for too many arguments has to name the class.
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2349
2350
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandom tests against the pure-Python (_pyio)
    # implementation.
    tp = pyio.BufferedRandom
2353
2354
2355# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2356# properties:
2357#   - A single output character can correspond to many bytes of input.
2358#   - The number of input bytes to complete the character can be
2359#     undetermined until the last input byte is received.
2360#   - The number of input bytes can vary depending on previous input.
2361#   - A single input byte can correspond to many characters of output.
2362#   - The number of output characters can be undetermined until the
2363#     last input byte is received.
2364#   - The number of output characters can vary depending on previous input.
2365
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder used to test seek/tell behavior.

    Input is a sequence of words.  Words are fixed-length (I bytes each)
    or, when I is 0, variable-length and terminated by a period; in
    variable-length mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        A missing number counts as 0.  When I is set to 0, words are
        period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
        A missing number counts as 0.
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  EOF (final=True) also terminates the
    last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Go back to the initial state: I = O = 1 and no buffered input."""
        self.buffer = bytearray()
        self.i = self.o = 1

    def getstate(self):
        """Return (buffered bytes, flags); flags is 0 right after reset()."""
        # I and O are XOR-ed with 1 so that their initial value encodes as 0.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        self.i = (flags // 100) ^ 1
        self.o = (flags % 100) ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of input and return the text decoded so far."""
        pieces = []
        period = ord('.')
        for byte in input:
            # self.i may change whenever a word is processed, so it has to
            # be re-examined for every byte.
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length: keep collecting until a period shows up
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends the word; periods after an empty word are
                # ignored
                pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output."""
        command, argument = self.buffer[0], self.buffer[1:]
        if command == ord('i'):
            self.i = min(99, int(argument or 0)) # set input length
            text = ''
        elif command == ord('o'):
            self.o = min(99, int(argument or 0)) # set output length
            text = ''
        else:
            text = self.buffer.decode('ascii')
            if len(text) < self.o:
                text += '-' * self.o # pad out with hyphens
            if self.o:
                text = text[:self.o] # truncate to output length
            text += '.'
        self.buffer = bytearray()
        return text

    # lookupTestDecoder() only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
2450
# Register the lookup function of the decoder defined above so that tests
# can request it by the codec name 'test_decoder'.
# Disabled by default (lookupTestDecoder returns None until codecEnabled is
# set); the tests that need it enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2454
2455
2456class StatefulIncrementalDecoderTest(unittest.TestCase):
2457    """
2458    Make sure the StatefulIncrementalDecoder actually works.
2459    """
2460
2461    test_cases = [
2462        # I=1, O=1 (fixed-length input == fixed-length output)
2463        (b'abcd', False, 'a.b.c.d.'),
2464        # I=0, O=0 (variable-length input, variable-length output)
2465        (b'oiabcd', True, 'abcd.'),
2466        # I=0, O=0 (should ignore extra periods)
2467        (b'oi...abcd...', True, 'abcd.'),
2468        # I=0, O=6 (variable-length input, fixed-length output)
2469        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2470        # I=2, O=6 (fixed-length input < fixed-length output)
2471        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
2472        # I=6, O=3 (fixed-length input > fixed-length output)
2473        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2474        # I=0, then 3; O=29, then 15 (with longer output)
2475        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2476         'a----------------------------.' +
2477         'b----------------------------.' +
2478         'cde--------------------------.' +
2479         'abcdefghijabcde.' +
2480         'a.b------------.' +
2481         '.c.------------.' +
2482         'd.e------------.' +
2483         'k--------------.' +
2484         'l--------------.' +
2485         'm--------------.')
2486    ]
2487
2488    def test_decoder(self):
2489        # Try a few one-shot test cases.
2490        for input, eof, output in self.test_cases:
2491            d = StatefulIncrementalDecoder()
2492            self.assertEqual(d.decode(input, eof), output)
2493
2494        # Also test an unfinished decode, followed by forcing EOF.
2495        d = StatefulIncrementalDecoder()
2496        self.assertEqual(d.decode(b'oiabcd'), '')
2497        self.assertEqual(d.decode(b'', 1), 'abcd.')
2498
2499class TextIOWrapperTest(unittest.TestCase):
2500
2501    def setUp(self):
2502        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2503        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
2504        support.unlink(support.TESTFN)
2505
2506    def tearDown(self):
2507        support.unlink(support.TESTFN)
2508
2509    def test_constructor(self):
2510        r = self.BytesIO(b"\xc3\xa9\n\n")
2511        b = self.BufferedReader(r, 1000)
2512        t = self.TextIOWrapper(b)
2513        t.__init__(b, encoding="latin-1", newline="\r\n")
2514        self.assertEqual(t.encoding, "latin-1")
2515        self.assertEqual(t.line_buffering, False)
2516        t.__init__(b, encoding="utf-8", line_buffering=True)
2517        self.assertEqual(t.encoding, "utf-8")
2518        self.assertEqual(t.line_buffering, True)
2519        self.assertEqual("\xe9\n", t.readline())
2520        self.assertRaises(TypeError, t.__init__, b, newline=42)
2521        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2522
2523    def test_uninitialized(self):
2524        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2525        del t
2526        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2527        self.assertRaises(Exception, repr, t)
2528        self.assertRaisesRegex((ValueError, AttributeError),
2529                               'uninitialized|has no attribute',
2530                               t.read, 0)
2531        t.__init__(self.MockRawIO())
2532        self.assertEqual(t.read(0), '')
2533
2534    def test_non_text_encoding_codecs_are_rejected(self):
2535        # Ensure the constructor complains if passed a codec that isn't
2536        # marked as a text encoding
2537        # http://bugs.python.org/issue20404
2538        r = self.BytesIO()
2539        b = self.BufferedWriter(r)
2540        with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2541            self.TextIOWrapper(b, encoding="hex")
2542
2543    def test_detach(self):
2544        r = self.BytesIO()
2545        b = self.BufferedWriter(r)
2546        t = self.TextIOWrapper(b)
2547        self.assertIs(t.detach(), b)
2548
2549        t = self.TextIOWrapper(b, encoding="ascii")
2550        t.write("howdy")
2551        self.assertFalse(r.getvalue())
2552        t.detach()
2553        self.assertEqual(r.getvalue(), b"howdy")
2554        self.assertRaises(ValueError, t.detach)
2555
2556        # Operations independent of the detached stream should still work
2557        repr(t)
2558        self.assertEqual(t.encoding, "ascii")
2559        self.assertEqual(t.errors, "strict")
2560        self.assertFalse(t.line_buffering)
2561        self.assertFalse(t.write_through)
2562
2563    def test_repr(self):
2564        raw = self.BytesIO("hello".encode("utf-8"))
2565        b = self.BufferedReader(raw)
2566        t = self.TextIOWrapper(b, encoding="utf-8")
2567        modname = self.TextIOWrapper.__module__
2568        self.assertEqual(repr(t),
2569                         "<%s.TextIOWrapper encoding='utf-8'>" % modname)
2570        raw.name = "dummy"
2571        self.assertEqual(repr(t),
2572                         "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
2573        t.mode = "r"
2574        self.assertEqual(repr(t),
2575                         "<%s.TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
2576        raw.name = b"dummy"
2577        self.assertEqual(repr(t),
2578                         "<%s.TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
2579
2580        t.buffer.detach()
2581        repr(t)  # Should not raise an exception
2582
2583    def test_recursive_repr(self):
2584        # Issue #25455
2585        raw = self.BytesIO()
2586        t = self.TextIOWrapper(raw)
2587        with support.swap_attr(raw, 'name', t):
2588            try:
2589                repr(t)  # Should not crash
2590            except RuntimeError:
2591                pass
2592
2593    def test_line_buffering(self):
2594        r = self.BytesIO()
2595        b = self.BufferedWriter(r, 1000)
2596        t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
2597        t.write("X")
2598        self.assertEqual(r.getvalue(), b"")  # No flush happened
2599        t.write("Y\nZ")
2600        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
2601        t.write("A\rB")
2602        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
2603
2604    def test_reconfigure_line_buffering(self):
2605        r = self.BytesIO()
2606        b = self.BufferedWriter(r, 1000)
2607        t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2608        t.write("AB\nC")
2609        self.assertEqual(r.getvalue(), b"")
2610
2611        t.reconfigure(line_buffering=True)   # implicit flush
2612        self.assertEqual(r.getvalue(), b"AB\nC")
2613        t.write("DEF\nG")
2614        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2615        t.write("H")
2616        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2617        t.reconfigure(line_buffering=False)   # implicit flush
2618        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2619        t.write("IJ")
2620        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2621
2622        # Keeping default value
2623        t.reconfigure()
2624        t.reconfigure(line_buffering=None)
2625        self.assertEqual(t.line_buffering, False)
2626        t.reconfigure(line_buffering=True)
2627        t.reconfigure()
2628        t.reconfigure(line_buffering=None)
2629        self.assertEqual(t.line_buffering, True)
2630
2631    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2632    def test_default_encoding(self):
2633        old_environ = dict(os.environ)
2634        try:
2635            # try to get a user preferred encoding different than the current
2636            # locale encoding to check that TextIOWrapper() uses the current
2637            # locale encoding and not the user preferred encoding
2638            for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2639                if key in os.environ:
2640                    del os.environ[key]
2641
2642            current_locale_encoding = locale.getpreferredencoding(False)
2643            b = self.BytesIO()
2644            t = self.TextIOWrapper(b)
2645            self.assertEqual(t.encoding, current_locale_encoding)
2646        finally:
2647            os.environ.clear()
2648            os.environ.update(old_environ)
2649
2650    @support.cpython_only
2651    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
2652    def test_device_encoding(self):
2653        # Issue 15989
2654        import _testcapi
2655        b = self.BytesIO()
2656        b.fileno = lambda: _testcapi.INT_MAX + 1
2657        self.assertRaises(OverflowError, self.TextIOWrapper, b)
2658        b.fileno = lambda: _testcapi.UINT_MAX + 1
2659        self.assertRaises(OverflowError, self.TextIOWrapper, b)
2660
2661    def test_encoding(self):
2662        # Check the encoding attribute is always set, and valid
2663        b = self.BytesIO()
2664        t = self.TextIOWrapper(b, encoding="utf-8")
2665        self.assertEqual(t.encoding, "utf-8")
2666        t = self.TextIOWrapper(b)
2667        self.assertIsNotNone(t.encoding)
2668        codecs.lookup(t.encoding)
2669
2670    def test_encoding_errors_reading(self):
2671        # (1) default
2672        b = self.BytesIO(b"abc\n\xff\n")
2673        t = self.TextIOWrapper(b, encoding="ascii")
2674        self.assertRaises(UnicodeError, t.read)
2675        # (2) explicit strict
2676        b = self.BytesIO(b"abc\n\xff\n")
2677        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2678        self.assertRaises(UnicodeError, t.read)
2679        # (3) ignore
2680        b = self.BytesIO(b"abc\n\xff\n")
2681        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
2682        self.assertEqual(t.read(), "abc\n\n")
2683        # (4) replace
2684        b = self.BytesIO(b"abc\n\xff\n")
2685        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
2686        self.assertEqual(t.read(), "abc\n\ufffd\n")
2687
2688    def test_encoding_errors_writing(self):
2689        # (1) default
2690        b = self.BytesIO()
2691        t = self.TextIOWrapper(b, encoding="ascii")
2692        self.assertRaises(UnicodeError, t.write, "\xff")
2693        # (2) explicit strict
2694        b = self.BytesIO()
2695        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
2696        self.assertRaises(UnicodeError, t.write, "\xff")
2697        # (3) ignore
2698        b = self.BytesIO()
2699        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
2700                             newline="\n")
2701        t.write("abc\xffdef\n")
2702        t.flush()
2703        self.assertEqual(b.getvalue(), b"abcdef\n")
2704        # (4) replace
2705        b = self.BytesIO()
2706        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
2707                             newline="\n")
2708        t.write("abc\xffdef\n")
2709        t.flush()
2710        self.assertEqual(b.getvalue(), b"abc?def\n")
2711
2712    def test_newlines(self):
2713        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2714
2715        tests = [
2716            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
2717            [ '', input_lines ],
2718            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2719            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2720            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
2721        ]
2722        encodings = (
2723            'utf-8', 'latin-1',
2724            'utf-16', 'utf-16-le', 'utf-16-be',
2725            'utf-32', 'utf-32-le', 'utf-32-be',
2726        )
2727
2728        # Try a range of buffer sizes to test the case where \r is the last
2729        # character in TextIOWrapper._pending_line.
2730        for encoding in encodings:
2731            # XXX: str.encode() should return bytes
2732            data = bytes(''.join(input_lines).encode(encoding))
2733            for do_reads in (False, True):
2734                for bufsize in range(1, 10):
2735                    for newline, exp_lines in tests:
2736                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2737                        textio = self.TextIOWrapper(bufio, newline=newline,
2738                                                  encoding=encoding)
2739                        if do_reads:
2740                            got_lines = []
2741                            while True:
2742                                c2 = textio.read(2)
2743                                if c2 == '':
2744                                    break
2745                                self.assertEqual(len(c2), 2)
2746                                got_lines.append(c2 + textio.readline())
2747                        else:
2748                            got_lines = list(textio)
2749
2750                        for got_line, exp_line in zip(got_lines, exp_lines):
2751                            self.assertEqual(got_line, exp_line)
2752                        self.assertEqual(len(got_lines), len(exp_lines))
2753
2754    def test_newlines_input(self):
2755        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
2756        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2757        for newline, expected in [
2758            (None, normalized.decode("ascii").splitlines(keepends=True)),
2759            ("", testdata.decode("ascii").splitlines(keepends=True)),
2760            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2761            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2762            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
2763            ]:
2764            buf = self.BytesIO(testdata)
2765            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2766            self.assertEqual(txt.readlines(), expected)
2767            txt.seek(0)
2768            self.assertEqual(txt.read(), "".join(expected))
2769
2770    def test_newlines_output(self):
2771        testdict = {
2772            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2773            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2774            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2775            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2776            }
2777        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2778        for newline, expected in tests:
2779            buf = self.BytesIO()
2780            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2781            txt.write("AAA\nB")
2782            txt.write("BB\nCCC\n")
2783            txt.write("X\rY\r\nZ")
2784            txt.flush()
2785            self.assertEqual(buf.closed, False)
2786            self.assertEqual(buf.getvalue(), expected)
2787
2788    def test_destructor(self):
2789        l = []
2790        base = self.BytesIO
2791        class MyBytesIO(base):
2792            def close(self):
2793                l.append(self.getvalue())
2794                base.close(self)
2795        b = MyBytesIO()
2796        t = self.TextIOWrapper(b, encoding="ascii")
2797        t.write("abc")
2798        del t
2799        support.gc_collect()
2800        self.assertEqual([b"abc"], l)
2801
2802    def test_override_destructor(self):
2803        record = []
2804        class MyTextIO(self.TextIOWrapper):
2805            def __del__(self):
2806                record.append(1)
2807                try:
2808                    f = super().__del__
2809                except AttributeError:
2810                    pass
2811                else:
2812                    f()
2813            def close(self):
2814                record.append(2)
2815                super().close()
2816            def flush(self):
2817                record.append(3)
2818                super().flush()
2819        b = self.BytesIO()
2820        t = MyTextIO(b, encoding="ascii")
2821        del t
2822        support.gc_collect()
2823        self.assertEqual(record, [1, 2, 3])
2824
2825    def test_error_through_destructor(self):
2826        # Test that the exception state is not modified by a destructor,
2827        # even if close() fails.
2828        rawio = self.CloseFailureIO()
2829        def f():
2830            self.TextIOWrapper(rawio).xyzzy
2831        with support.captured_output("stderr") as s:
2832            self.assertRaises(AttributeError, f)
2833        s = s.getvalue().strip()
2834        if s:
2835            # The destructor *may* have printed an unraisable error, check it
2836            self.assertEqual(len(s.splitlines()), 1)
2837            self.assertTrue(s.startswith("Exception OSError: "), s)
2838            self.assertTrue(s.endswith(" ignored"), s)
2839
2840    # Systematic tests of the text I/O API
2841
2842    def test_basic_io(self):
2843        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
2844            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
2845                f = self.open(support.TESTFN, "w+", encoding=enc)
2846                f._CHUNK_SIZE = chunksize
2847                self.assertEqual(f.write("abc"), 3)
2848                f.close()
2849                f = self.open(support.TESTFN, "r+", encoding=enc)
2850                f._CHUNK_SIZE = chunksize
2851                self.assertEqual(f.tell(), 0)
2852                self.assertEqual(f.read(), "abc")
2853                cookie = f.tell()
2854                self.assertEqual(f.seek(0), 0)
2855                self.assertEqual(f.read(None), "abc")
2856                f.seek(0)
2857                self.assertEqual(f.read(2), "ab")
2858                self.assertEqual(f.read(1), "c")
2859                self.assertEqual(f.read(1), "")
2860                self.assertEqual(f.read(), "")
2861                self.assertEqual(f.tell(), cookie)
2862                self.assertEqual(f.seek(0), 0)
2863                self.assertEqual(f.seek(0, 2), cookie)
2864                self.assertEqual(f.write("def"), 3)
2865                self.assertEqual(f.seek(cookie), cookie)
2866                self.assertEqual(f.read(), "def")
2867                if enc.startswith("utf"):
2868                    self.multi_line_test(f, enc)
2869                f.close()
2870
2871    def multi_line_test(self, f, enc):
2872        f.seek(0)
2873        f.truncate()
2874        sample = "s\xff\u0fff\uffff"
2875        wlines = []
2876        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
2877            chars = []
2878            for i in range(size):
2879                chars.append(sample[i % len(sample)])
2880            line = "".join(chars) + "\n"
2881            wlines.append((f.tell(), line))
2882            f.write(line)
2883        f.seek(0)
2884        rlines = []
2885        while True:
2886            pos = f.tell()
2887            line = f.readline()
2888            if not line:
2889                break
2890            rlines.append((pos, line))
2891        self.assertEqual(rlines, wlines)
2892
2893    def test_telling(self):
2894        f = self.open(support.TESTFN, "w+", encoding="utf-8")
2895        p0 = f.tell()
2896        f.write("\xff\n")
2897        p1 = f.tell()
2898        f.write("\xff\n")
2899        p2 = f.tell()
2900        f.seek(0)
2901        self.assertEqual(f.tell(), p0)
2902        self.assertEqual(f.readline(), "\xff\n")
2903        self.assertEqual(f.tell(), p1)
2904        self.assertEqual(f.readline(), "\xff\n")
2905        self.assertEqual(f.tell(), p2)
2906        f.seek(0)
2907        for line in f:
2908            self.assertEqual(line, "\xff\n")
2909            self.assertRaises(OSError, f.tell)
2910        self.assertEqual(f.tell(), p2)
2911        f.close()
2912
2913    def test_seeking(self):
2914        chunk_size = _default_chunk_size()
2915        prefix_size = chunk_size - 2
2916        u_prefix = "a" * prefix_size
2917        prefix = bytes(u_prefix.encode("utf-8"))
2918        self.assertEqual(len(u_prefix), len(prefix))
2919        u_suffix = "\u8888\n"
2920        suffix = bytes(u_suffix.encode("utf-8"))
2921        line = prefix + suffix
2922        with self.open(support.TESTFN, "wb") as f:
2923            f.write(line*2)
2924        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2925            s = f.read(prefix_size)
2926            self.assertEqual(s, str(prefix, "ascii"))
2927            self.assertEqual(f.tell(), prefix_size)
2928            self.assertEqual(f.readline(), u_suffix)
2929
2930    def test_seeking_too(self):
2931        # Regression test for a specific bug
2932        data = b'\xe0\xbf\xbf\n'
2933        with self.open(support.TESTFN, "wb") as f:
2934            f.write(data)
2935        with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2936            f._CHUNK_SIZE  # Just test that it exists
2937            f._CHUNK_SIZE = 2
2938            f.readline()
2939            f.tell()
2940
2941    def test_seek_and_tell(self):
2942        #Test seek/tell using the StatefulIncrementalDecoder.
2943        # Make test faster by doing smaller seeks
2944        CHUNK_SIZE = 128
2945
2946        def test_seek_and_tell_with_data(data, min_pos=0):
2947            """Tell/seek to various points within a data stream and ensure
2948            that the decoded data returned by read() is consistent."""
2949            f = self.open(support.TESTFN, 'wb')
2950            f.write(data)
2951            f.close()
2952            f = self.open(support.TESTFN, encoding='test_decoder')
2953            f._CHUNK_SIZE = CHUNK_SIZE
2954            decoded = f.read()
2955            f.close()
2956
2957            for i in range(min_pos, len(decoded) + 1): # seek positions
2958                for j in [1, 5, len(decoded) - i]: # read lengths
2959                    f = self.open(support.TESTFN, encoding='test_decoder')
2960                    self.assertEqual(f.read(i), decoded[:i])
2961                    cookie = f.tell()
2962                    self.assertEqual(f.read(j), decoded[i:i + j])
2963                    f.seek(cookie)
2964                    self.assertEqual(f.read(), decoded[i:])
2965                    f.close()
2966
2967        # Enable the test decoder.
2968        StatefulIncrementalDecoder.codecEnabled = 1
2969
2970        # Run the tests.
2971        try:
2972            # Try each test case.
2973            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2974                test_seek_and_tell_with_data(input)
2975
2976            # Position each test case so that it crosses a chunk boundary.
2977            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
2978                offset = CHUNK_SIZE - len(input)//2
2979                prefix = b'.'*offset
2980                # Don't bother seeking into the prefix (takes too long).
2981                min_pos = offset*2
2982                test_seek_and_tell_with_data(prefix + input, min_pos)
2983
2984        # Ensure our test decoder won't interfere with subsequent tests.
2985        finally:
2986            StatefulIncrementalDecoder.codecEnabled = 0
2987
2988    def test_encoded_writes(self):
2989        data = "1234567890"
2990        tests = ("utf-16",
2991                 "utf-16-le",
2992                 "utf-16-be",
2993                 "utf-32",
2994                 "utf-32-le",
2995                 "utf-32-be")
2996        for encoding in tests:
2997            buf = self.BytesIO()
2998            f = self.TextIOWrapper(buf, encoding=encoding)
2999            # Check if the BOM is written only once (see issue1753).
3000            f.write(data)
3001            f.write(data)
3002            f.seek(0)
3003            self.assertEqual(f.read(), data * 2)
3004            f.seek(0)
3005            self.assertEqual(f.read(), data * 2)
3006            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
3007
3008    def test_unreadable(self):
3009        class UnReadable(self.BytesIO):
3010            def readable(self):
3011                return False
3012        txt = self.TextIOWrapper(UnReadable())
3013        self.assertRaises(OSError, txt.read)
3014
3015    def test_read_one_by_one(self):
3016        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
3017        reads = ""
3018        while True:
3019            c = txt.read(1)
3020            if not c:
3021                break
3022            reads += c
3023        self.assertEqual(reads, "AA\nBB")
3024
3025    def test_readlines(self):
3026        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3027        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3028        txt.seek(0)
3029        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3030        txt.seek(0)
3031        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3032
3033    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
3034    def test_read_by_chunk(self):
3035        # make sure "\r\n" straddles 128 char boundary.
3036        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
3037        reads = ""
3038        while True:
3039            c = txt.read(128)
3040            if not c:
3041                break
3042            reads += c
3043        self.assertEqual(reads, "A"*127+"\nB")
3044
3045    def test_writelines(self):
3046        l = ['ab', 'cd', 'ef']
3047        buf = self.BytesIO()
3048        txt = self.TextIOWrapper(buf)
3049        txt.writelines(l)
3050        txt.flush()
3051        self.assertEqual(buf.getvalue(), b'abcdef')
3052
3053    def test_writelines_userlist(self):
3054        l = UserList(['ab', 'cd', 'ef'])
3055        buf = self.BytesIO()
3056        txt = self.TextIOWrapper(buf)
3057        txt.writelines(l)
3058        txt.flush()
3059        self.assertEqual(buf.getvalue(), b'abcdef')
3060
3061    def test_writelines_error(self):
3062        txt = self.TextIOWrapper(self.BytesIO())
3063        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3064        self.assertRaises(TypeError, txt.writelines, None)
3065        self.assertRaises(TypeError, txt.writelines, b'abc')
3066
3067    def test_issue1395_1(self):
3068        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3069
3070        # read one char at a time
3071        reads = ""
3072        while True:
3073            c = txt.read(1)
3074            if not c:
3075                break
3076            reads += c
3077        self.assertEqual(reads, self.normalized)
3078
3079    def test_issue1395_2(self):
3080        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3081        txt._CHUNK_SIZE = 4
3082
3083        reads = ""
3084        while True:
3085            c = txt.read(4)
3086            if not c:
3087                break
3088            reads += c
3089        self.assertEqual(reads, self.normalized)
3090
3091    def test_issue1395_3(self):
3092        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3093        txt._CHUNK_SIZE = 4
3094
3095        reads = txt.read(4)
3096        reads += txt.read(4)
3097        reads += txt.readline()
3098        reads += txt.readline()
3099        reads += txt.readline()
3100        self.assertEqual(reads, self.normalized)
3101
3102    def test_issue1395_4(self):
3103        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3104        txt._CHUNK_SIZE = 4
3105
3106        reads = txt.read(4)
3107        reads += txt.read()
3108        self.assertEqual(reads, self.normalized)
3109
3110    def test_issue1395_5(self):
3111        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3112        txt._CHUNK_SIZE = 4
3113
3114        reads = txt.read(4)
3115        pos = txt.tell()
3116        txt.seek(0)
3117        txt.seek(pos)
3118        self.assertEqual(txt.read(4), "BBB\n")
3119
3120    def test_issue2282(self):
3121        buffer = self.BytesIO(self.testdata)
3122        txt = self.TextIOWrapper(buffer, encoding="ascii")
3123
3124        self.assertEqual(buffer.seekable(), txt.seekable())
3125
3126    def test_append_bom(self):
3127        # The BOM is not written again when appending to a non-empty file
3128        filename = support.TESTFN
3129        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3130            with self.open(filename, 'w', encoding=charset) as f:
3131                f.write('aaa')
3132                pos = f.tell()
3133            with self.open(filename, 'rb') as f:
3134                self.assertEqual(f.read(), 'aaa'.encode(charset))
3135
3136            with self.open(filename, 'a', encoding=charset) as f:
3137                f.write('xxx')
3138            with self.open(filename, 'rb') as f:
3139                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3140
3141    def test_seek_bom(self):
3142        # Same test, but when seeking manually
3143        filename = support.TESTFN
3144        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3145            with self.open(filename, 'w', encoding=charset) as f:
3146                f.write('aaa')
3147                pos = f.tell()
3148            with self.open(filename, 'r+', encoding=charset) as f:
3149                f.seek(pos)
3150                f.write('zzz')
3151                f.seek(0)
3152                f.write('bbb')
3153            with self.open(filename, 'rb') as f:
3154                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
3155
3156    def test_seek_append_bom(self):
3157        # Same test, but first seek to the start and then to the end
3158        filename = support.TESTFN
3159        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3160            with self.open(filename, 'w', encoding=charset) as f:
3161                f.write('aaa')
3162            with self.open(filename, 'a', encoding=charset) as f:
3163                f.seek(0)
3164                f.seek(0, self.SEEK_END)
3165                f.write('xxx')
3166            with self.open(filename, 'rb') as f:
3167                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3168
3169    def test_errors_property(self):
3170        with self.open(support.TESTFN, "w") as f:
3171            self.assertEqual(f.errors, "strict")
3172        with self.open(support.TESTFN, "w", errors="replace") as f:
3173            self.assertEqual(f.errors, "replace")
3174
3175    @support.no_tracing
3176    def test_threads_write(self):
3177        # Issue6750: concurrent writes could duplicate data
3178        event = threading.Event()
3179        with self.open(support.TESTFN, "w", buffering=1) as f:
3180            def run(n):
3181                text = "Thread%03d\n" % n
3182                event.wait()
3183                f.write(text)
3184            threads = [threading.Thread(target=run, args=(x,))
3185                       for x in range(20)]
3186            with support.start_threads(threads, event.set):
3187                time.sleep(0.02)
3188        with self.open(support.TESTFN) as f:
3189            content = f.read()
3190            for n in range(20):
3191                self.assertEqual(content.count("Thread%03d\n" % n), 1)
3192
3193    def test_flush_error_on_close(self):
3194        # Test that text file is closed despite failed flush
3195        # and that flush() is called before file closed.
3196        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3197        closed = []
3198        def bad_flush():
3199            closed[:] = [txt.closed, txt.buffer.closed]
3200            raise OSError()
3201        txt.flush = bad_flush
3202        self.assertRaises(OSError, txt.close) # exception not swallowed
3203        self.assertTrue(txt.closed)
3204        self.assertTrue(txt.buffer.closed)
3205        self.assertTrue(closed)      # flush() called
3206        self.assertFalse(closed[0])  # flush() called before file closed
3207        self.assertFalse(closed[1])
3208        txt.flush = lambda: None  # break reference loop
3209
3210    def test_close_error_on_close(self):
3211        buffer = self.BytesIO(self.testdata)
3212        def bad_flush():
3213            raise OSError('flush')
3214        def bad_close():
3215            raise OSError('close')
3216        buffer.close = bad_close
3217        txt = self.TextIOWrapper(buffer, encoding="ascii")
3218        txt.flush = bad_flush
3219        with self.assertRaises(OSError) as err: # exception not swallowed
3220            txt.close()
3221        self.assertEqual(err.exception.args, ('close',))
3222        self.assertIsInstance(err.exception.__context__, OSError)
3223        self.assertEqual(err.exception.__context__.args, ('flush',))
3224        self.assertFalse(txt.closed)
3225
3226    def test_nonnormalized_close_error_on_close(self):
3227        # Issue #21677
3228        buffer = self.BytesIO(self.testdata)
3229        def bad_flush():
3230            raise non_existing_flush
3231        def bad_close():
3232            raise non_existing_close
3233        buffer.close = bad_close
3234        txt = self.TextIOWrapper(buffer, encoding="ascii")
3235        txt.flush = bad_flush
3236        with self.assertRaises(NameError) as err: # exception not swallowed
3237            txt.close()
3238        self.assertIn('non_existing_close', str(err.exception))
3239        self.assertIsInstance(err.exception.__context__, NameError)
3240        self.assertIn('non_existing_flush', str(err.exception.__context__))
3241        self.assertFalse(txt.closed)
3242
3243    def test_multi_close(self):
3244        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3245        txt.close()
3246        txt.close()
3247        txt.close()
3248        self.assertRaises(ValueError, txt.flush)
3249
3250    def test_unseekable(self):
3251        txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3252        self.assertRaises(self.UnsupportedOperation, txt.tell)
3253        self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3254
3255    def test_readonly_attributes(self):
3256        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3257        buf = self.BytesIO(self.testdata)
3258        with self.assertRaises(AttributeError):
3259            txt.buffer = buf
3260
3261    def test_rawio(self):
3262        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3263        # that subprocess.Popen() can have the required unbuffered
3264        # semantics with universal_newlines=True.
3265        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3266        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3267        # Reads
3268        self.assertEqual(txt.read(4), 'abcd')
3269        self.assertEqual(txt.readline(), 'efghi\n')
3270        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3271
3272    def test_rawio_write_through(self):
3273        # Issue #12591: with write_through=True, writes don't need a flush
3274        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3275        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3276                                 write_through=True)
3277        txt.write('1')
3278        txt.write('23\n4')
3279        txt.write('5')
3280        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3281
3282    def test_bufio_write_through(self):
3283        # Issue #21396: write_through=True doesn't force a flush()
3284        # on the underlying binary buffered object.
3285        flush_called, write_called = [], []
3286        class BufferedWriter(self.BufferedWriter):
3287            def flush(self, *args, **kwargs):
3288                flush_called.append(True)
3289                return super().flush(*args, **kwargs)
3290            def write(self, *args, **kwargs):
3291                write_called.append(True)
3292                return super().write(*args, **kwargs)
3293
3294        rawio = self.BytesIO()
3295        data = b"a"
3296        bufio = BufferedWriter(rawio, len(data)*2)
3297        textio = self.TextIOWrapper(bufio, encoding='ascii',
3298                                    write_through=True)
3299        # write to the buffered io but don't overflow the buffer
3300        text = data.decode('ascii')
3301        textio.write(text)
3302
3303        # buffer.flush is not called with write_through=True
3304        self.assertFalse(flush_called)
3305        # buffer.write *is* called with write_through=True
3306        self.assertTrue(write_called)
3307        self.assertEqual(rawio.getvalue(), b"") # no flush
3308
3309        write_called = [] # reset
3310        textio.write(text * 10) # total content is larger than bufio buffer
3311        self.assertTrue(write_called)
3312        self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3313
3314    def test_reconfigure_write_through(self):
3315        raw = self.MockRawIO([])
3316        t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3317        t.write('1')
3318        t.reconfigure(write_through=True)  # implied flush
3319        self.assertEqual(t.write_through, True)
3320        self.assertEqual(b''.join(raw._write_stack), b'1')
3321        t.write('23')
3322        self.assertEqual(b''.join(raw._write_stack), b'123')
3323        t.reconfigure(write_through=False)
3324        self.assertEqual(t.write_through, False)
3325        t.write('45')
3326        t.flush()
3327        self.assertEqual(b''.join(raw._write_stack), b'12345')
3328        # Keeping default value
3329        t.reconfigure()
3330        t.reconfigure(write_through=None)
3331        self.assertEqual(t.write_through, False)
3332        t.reconfigure(write_through=True)
3333        t.reconfigure()
3334        t.reconfigure(write_through=None)
3335        self.assertEqual(t.write_through, True)
3336
3337    def test_read_nonbytes(self):
3338        # Issue #17106
3339        # Crash when underlying read() returns non-bytes
3340        t = self.TextIOWrapper(self.StringIO('a'))
3341        self.assertRaises(TypeError, t.read, 1)
3342        t = self.TextIOWrapper(self.StringIO('a'))
3343        self.assertRaises(TypeError, t.readline)
3344        t = self.TextIOWrapper(self.StringIO('a'))
3345        self.assertRaises(TypeError, t.read)
3346
3347    def test_illegal_encoder(self):
3348        # Issue 31271: Calling write() while the return value of encoder's
3349        # encode() is invalid shouldn't cause an assertion failure.
3350        rot13 = codecs.lookup("rot13")
3351        with support.swap_attr(rot13, '_is_text_encoding', True):
3352            t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3353        self.assertRaises(TypeError, t.write, 'bar')
3354
3355    def test_illegal_decoder(self):
3356        # Issue #17106
3357        # Bypass the early encoding check added in issue 20404
3358        def _make_illegal_wrapper():
3359            quopri = codecs.lookup("quopri")
3360            quopri._is_text_encoding = True
3361            try:
3362                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3363                                       newline='\n', encoding="quopri")
3364            finally:
3365                quopri._is_text_encoding = False
3366            return t
3367        # Crash when decoder returns non-string
3368        t = _make_illegal_wrapper()
3369        self.assertRaises(TypeError, t.read, 1)
3370        t = _make_illegal_wrapper()
3371        self.assertRaises(TypeError, t.readline)
3372        t = _make_illegal_wrapper()
3373        self.assertRaises(TypeError, t.read)
3374
3375        # Issue 31243: calling read() while the return value of decoder's
3376        # getstate() is invalid should neither crash the interpreter nor
3377        # raise a SystemError.
3378        def _make_very_illegal_wrapper(getstate_ret_val):
3379            class BadDecoder:
3380                def getstate(self):
3381                    return getstate_ret_val
3382            def _get_bad_decoder(dummy):
3383                return BadDecoder()
3384            quopri = codecs.lookup("quopri")
3385            with support.swap_attr(quopri, 'incrementaldecoder',
3386                                   _get_bad_decoder):
3387                return _make_illegal_wrapper()
3388        t = _make_very_illegal_wrapper(42)
3389        self.assertRaises(TypeError, t.read, 42)
3390        t = _make_very_illegal_wrapper(())
3391        self.assertRaises(TypeError, t.read, 42)
3392        t = _make_very_illegal_wrapper((1, 2))
3393        self.assertRaises(TypeError, t.read, 42)
3394
3395    def _check_create_at_shutdown(self, **kwargs):
3396        # Issue #20037: creating a TextIOWrapper at shutdown
3397        # shouldn't crash the interpreter.
3398        iomod = self.io.__name__
3399        code = """if 1:
3400            import codecs
3401            import {iomod} as io
3402
3403            # Avoid looking up codecs at shutdown
3404            codecs.lookup('utf-8')
3405
3406            class C:
3407                def __init__(self):
3408                    self.buf = io.BytesIO()
3409                def __del__(self):
3410                    io.TextIOWrapper(self.buf, **{kwargs})
3411                    print("ok")
3412            c = C()
3413            """.format(iomod=iomod, kwargs=kwargs)
3414        return assert_python_ok("-c", code)
3415
3416    @support.requires_type_collecting
3417    def test_create_at_shutdown_without_encoding(self):
3418        rc, out, err = self._check_create_at_shutdown()
3419        if err:
3420            # Can error out with a RuntimeError if the module state
3421            # isn't found.
3422            self.assertIn(self.shutdown_error, err.decode())
3423        else:
3424            self.assertEqual("ok", out.decode().strip())
3425
3426    @support.requires_type_collecting
3427    def test_create_at_shutdown_with_encoding(self):
3428        rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3429                                                      errors='strict')
3430        self.assertFalse(err)
3431        self.assertEqual("ok", out.decode().strip())
3432
3433    def test_read_byteslike(self):
3434        r = MemviewBytesIO(b'Just some random string\n')
3435        t = self.TextIOWrapper(r, 'utf-8')
3436
3437        # TextIOwrapper will not read the full string, because
3438        # we truncate it to a multiple of the native int size
3439        # so that we can construct a more complex memoryview.
3440        bytes_val =  _to_memoryview(r.getvalue()).tobytes()
3441
3442        self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3443
3444    def test_issue22849(self):
3445        class F(object):
3446            def readable(self): return True
3447            def writable(self): return True
3448            def seekable(self): return True
3449
3450        for i in range(10):
3451            try:
3452                self.TextIOWrapper(F(), encoding='utf-8')
3453            except Exception:
3454                pass
3455
3456        F.tell = lambda x: 0
3457        t = self.TextIOWrapper(F(), encoding='utf-8')
3458
3459    def test_reconfigure_encoding_read(self):
3460        # latin1 -> utf8
3461        # (latin1 can decode utf-8 encoded string)
3462        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3463        raw = self.BytesIO(data)
3464        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3465        self.assertEqual(txt.readline(), 'abc\xe9\n')
3466        with self.assertRaises(self.UnsupportedOperation):
3467            txt.reconfigure(encoding='utf-8')
3468        with self.assertRaises(self.UnsupportedOperation):
3469            txt.reconfigure(newline=None)
3470
3471    def test_reconfigure_write_fromascii(self):
3472        # ascii has a specific encodefunc in the C implementation,
3473        # but utf-8-sig has not. Make sure that we get rid of the
3474        # cached encodefunc when we switch encoders.
3475        raw = self.BytesIO()
3476        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3477        txt.write('foo\n')
3478        txt.reconfigure(encoding='utf-8-sig')
3479        txt.write('\xe9\n')
3480        txt.flush()
3481        self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3482
3483    def test_reconfigure_write(self):
3484        # latin -> utf8
3485        raw = self.BytesIO()
3486        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3487        txt.write('abc\xe9\n')
3488        txt.reconfigure(encoding='utf-8')
3489        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3490        txt.write('d\xe9f\n')
3491        txt.flush()
3492        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3493
3494        # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3495        # the file
3496        raw = self.BytesIO()
3497        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3498        txt.write('abc\n')
3499        txt.reconfigure(encoding='utf-8-sig')
3500        txt.write('d\xe9f\n')
3501        txt.flush()
3502        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3503
3504    def test_reconfigure_write_non_seekable(self):
3505        raw = self.BytesIO()
3506        raw.seekable = lambda: False
3507        raw.seek = None
3508        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3509        txt.write('abc\n')
3510        txt.reconfigure(encoding='utf-8-sig')
3511        txt.write('d\xe9f\n')
3512        txt.flush()
3513
3514        # If the raw stream is not seekable, there'll be a BOM
3515        self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3516
3517    def test_reconfigure_defaults(self):
3518        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3519        txt.reconfigure(encoding=None)
3520        self.assertEqual(txt.encoding, 'ascii')
3521        self.assertEqual(txt.errors, 'replace')
3522        txt.write('LF\n')
3523
3524        txt.reconfigure(newline='\r\n')
3525        self.assertEqual(txt.encoding, 'ascii')
3526        self.assertEqual(txt.errors, 'replace')
3527
3528        txt.reconfigure(errors='ignore')
3529        self.assertEqual(txt.encoding, 'ascii')
3530        self.assertEqual(txt.errors, 'ignore')
3531        txt.write('CRLF\n')
3532
3533        txt.reconfigure(encoding='utf-8', newline=None)
3534        self.assertEqual(txt.errors, 'strict')
3535        txt.seek(0)
3536        self.assertEqual(txt.read(), 'LF\nCRLF\n')
3537
3538        self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3539
3540    def test_reconfigure_newline(self):
3541        raw = self.BytesIO(b'CR\rEOF')
3542        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3543        txt.reconfigure(newline=None)
3544        self.assertEqual(txt.readline(), 'CR\n')
3545        raw = self.BytesIO(b'CR\rEOF')
3546        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3547        txt.reconfigure(newline='')
3548        self.assertEqual(txt.readline(), 'CR\r')
3549        raw = self.BytesIO(b'CR\rLF\nEOF')
3550        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3551        txt.reconfigure(newline='\n')
3552        self.assertEqual(txt.readline(), 'CR\rLF\n')
3553        raw = self.BytesIO(b'LF\nCR\rEOF')
3554        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3555        txt.reconfigure(newline='\r')
3556        self.assertEqual(txt.readline(), 'LF\nCR\r')
3557        raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3558        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3559        txt.reconfigure(newline='\r\n')
3560        self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3561
3562        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3563        txt.reconfigure(newline=None)
3564        txt.write('linesep\n')
3565        txt.reconfigure(newline='')
3566        txt.write('LF\n')
3567        txt.reconfigure(newline='\n')
3568        txt.write('LF\n')
3569        txt.reconfigure(newline='\r')
3570        txt.write('CR\n')
3571        txt.reconfigure(newline='\r\n')
3572        txt.write('CRLF\n')
3573        expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3574        self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3575
3576    def test_issue25862(self):
3577        # Assertion failures occurred in tell() after read() and write().
3578        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3579        t.read(1)
3580        t.read()
3581        t.tell()
3582        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3583        t.read(1)
3584        t.write('x')
3585        t.tell()
3586
3587
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read method returns memoryviews
       rather than bytes'''

    # Both overrides pass the data through _to_memoryview(), which drops
    # trailing bytes that do not fill a whole native int.  The size
    # argument defaults to -1, like the BytesIO methods being overridden,
    # so that callers may omit it.

    def read1(self, len_=-1):
        return _to_memoryview(super().read1(len_))

    def read(self, len_=-1):
        return _to_memoryview(super().read(len_))
3597
def _to_memoryview(buf):
    '''Convert bytes-object *buf* to a non-trivial memoryview'''

    ints = array.array('i')
    # Only whole native ints can be stored in the array, so any trailing
    # bytes that do not fill one are left out.
    whole_items = len(buf) // ints.itemsize
    ints.frombytes(buf[:whole_items * ints.itemsize])
    return memoryview(ints)
3605
3606
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the C implementation and
    # adds tests that only apply to it.
    io = io
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # A failed re-initialization must leave the object unusable rather
        # than half-configured.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object that never went through __init__ must raise.
        uninitialized = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(uninitialized)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference, so only the cyclic GC can reclaim the object.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            on_disk = f.read()
        self.assertEqual(on_disk, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must raise AttributeError.
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        self.assertRaises(AttributeError, delattr, wrapper, '_CHUNK_SIZE')
3656
3657
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the pure-Python (_pyio)
    # implementation.

    # Message looked for on stderr by
    # test_create_at_shutdown_without_encoding when the child fails.
    shutdown_error = "LookupError: unknown encoding: ascii"
    io = pyio
3661
3662
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared tests for IncrementalNewlineDecoder; the class under test is
    # read from self.IncrementalNewlineDecoder.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def roundtrip(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decode once, rewind to the saved state, and decode again.
            saved = decoder.getstate()
            first = decoder.decode(data, **kwargs)
            self.assertEqual(first, expected)
            decoder.setstate(saved)
            second = decoder.decode(data, **kwargs)
            self.assertEqual(second, expected)

        # A three-byte character fed whole, then twice byte by byte, and
        # finally left incomplete.
        multibyte_steps = (
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
        )
        for data, expected in multibyte_steps:
            roundtrip(data, expected)
        # The dangling lead byte is an error once the input is final.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        roundtrip(b'\n', "\n")
        roundtrip(b'\r', "")
        roundtrip(b'', "\n", final=True)
        roundtrip(b'\r', "\n", final=True)

        newline_steps = (
            (b'\r', ""),
            (b'a', "\na"),
            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),
            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        )
        for data, expected in newline_steps:
            roundtrip(data, expected)

    def check_newline_decoding(self, decoder, encoding):
        decoded_pieces = []
        if encoding is None:
            encoder = None
            def feed(text):
                # Decode one char at a time
                for char in text:
                    decoded_pieces.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()
            def feed(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    decoded_pieces.append(decoder.decode(bytes([byte])))

        self.assertEqual(decoder.newlines, None)
        # Each step feeds some text and then checks which newline kinds
        # the decoder reports having seen so far.
        steps = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in steps:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(decoded_pieces), "abc\n\nabcabc\nabcabc")

        # After a reset the decoder reports no newlines again.
        decoder.reset()
        sample = "abc"
        if encoder is not None:
            encoder.reset()
            sample = encoder.encode(sample)
        self.assertEqual(decoder.decode(sample), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)

        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)
        self.assertRaises(TypeError, decoder.setstate, 42)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        for translate in (False, True):
            dec = self.IncrementalNewlineDecoder(None, translate=translate)
            # These characters are returned unchanged and never counted as
            # newlines.
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.newlines, None)
                self.assertEqual(dec.decode(char), char)
            self.assertEqual(dec.newlines, None)

    def test_translate(self):
        # issue 35062
        # Any non-zero translate value enables newline translation.
        for translate in (-2, -1, 1, 2):
            inner = codecs.getincrementaldecoder("utf-8")()
            decoder = self.IncrementalNewlineDecoder(inner, translate)
            self.check_newline_decoding_utf8(decoder)
        # translate=0 leaves the newlines untouched.
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=0)
        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3779
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Inherits every test unchanged.  The inherited tests read
    # self.IncrementalNewlineDecoder, which is not defined here --
    # presumably it is injected elsewhere in the file with the C
    # implementation; TODO confirm.
    pass
3782
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Inherits every test unchanged.  The inherited tests read
    # self.IncrementalNewlineDecoder, which is not defined here --
    # presumably it is injected elsewhere in the file with the
    # pure-Python implementation; TODO confirm.
    pass
3785
3786
3787# XXX Tests for open()
3788
3789class MiscIOTest(unittest.TestCase):
3790
    def tearDown(self):
        # Remove the scratch file that the tests in this class create.
        support.unlink(support.TESTFN)
3793
3794    def test___all__(self):
3795        for name in self.io.__all__:
3796            obj = getattr(self.io, name, None)
3797            self.assertIsNotNone(obj, name)
3798            if name == "open":
3799                continue
3800            elif "error" in name.lower() or name == "UnsupportedOperation":
3801                self.assertTrue(issubclass(obj, Exception), name)
3802            elif not name.startswith("SEEK_"):
3803                self.assertTrue(issubclass(obj, self.IOBase))
3804
3805    def test_attributes(self):
3806        f = self.open(support.TESTFN, "wb", buffering=0)
3807        self.assertEqual(f.mode, "wb")
3808        f.close()
3809
3810        with support.check_warnings(('', DeprecationWarning)):
3811            f = self.open(support.TESTFN, "U")
3812        self.assertEqual(f.name,            support.TESTFN)
3813        self.assertEqual(f.buffer.name,     support.TESTFN)
3814        self.assertEqual(f.buffer.raw.name, support.TESTFN)
3815        self.assertEqual(f.mode,            "U")
3816        self.assertEqual(f.buffer.mode,     "rb")
3817        self.assertEqual(f.buffer.raw.mode, "rb")
3818        f.close()
3819
3820        f = self.open(support.TESTFN, "w+")
3821        self.assertEqual(f.mode,            "w+")
3822        self.assertEqual(f.buffer.mode,     "rb+") # Does it really matter?
3823        self.assertEqual(f.buffer.raw.mode, "rb+")
3824
3825        g = self.open(f.fileno(), "wb", closefd=False)
3826        self.assertEqual(g.mode,     "wb")
3827        self.assertEqual(g.raw.mode, "wb")
3828        self.assertEqual(g.name,     f.fileno())
3829        self.assertEqual(g.raw.name, f.fileno())
3830        f.close()
3831        g.close()
3832
3833    def test_io_after_close(self):
3834        for kwargs in [
3835                {"mode": "w"},
3836                {"mode": "wb"},
3837                {"mode": "w", "buffering": 1},
3838                {"mode": "w", "buffering": 2},
3839                {"mode": "wb", "buffering": 0},
3840                {"mode": "r"},
3841                {"mode": "rb"},
3842                {"mode": "r", "buffering": 1},
3843                {"mode": "r", "buffering": 2},
3844                {"mode": "rb", "buffering": 0},
3845                {"mode": "w+"},
3846                {"mode": "w+b"},
3847                {"mode": "w+", "buffering": 1},
3848                {"mode": "w+", "buffering": 2},
3849                {"mode": "w+b", "buffering": 0},
3850            ]:
3851            f = self.open(support.TESTFN, **kwargs)
3852            f.close()
3853            self.assertRaises(ValueError, f.flush)
3854            self.assertRaises(ValueError, f.fileno)
3855            self.assertRaises(ValueError, f.isatty)
3856            self.assertRaises(ValueError, f.__iter__)
3857            if hasattr(f, "peek"):
3858                self.assertRaises(ValueError, f.peek, 1)
3859            self.assertRaises(ValueError, f.read)
3860            if hasattr(f, "read1"):
3861                self.assertRaises(ValueError, f.read1, 1024)
3862                self.assertRaises(ValueError, f.read1)
3863            if hasattr(f, "readall"):
3864                self.assertRaises(ValueError, f.readall)
3865            if hasattr(f, "readinto"):
3866                self.assertRaises(ValueError, f.readinto, bytearray(1024))
3867            if hasattr(f, "readinto1"):
3868                self.assertRaises(ValueError, f.readinto1, bytearray(1024))
3869            self.assertRaises(ValueError, f.readline)
3870            self.assertRaises(ValueError, f.readlines)
3871            self.assertRaises(ValueError, f.readlines, 1)
3872            self.assertRaises(ValueError, f.seek, 0)
3873            self.assertRaises(ValueError, f.tell)
3874            self.assertRaises(ValueError, f.truncate)
3875            self.assertRaises(ValueError, f.write,
3876                              b"" if "b" in kwargs['mode'] else "")
3877            self.assertRaises(ValueError, f.writelines, [])
3878            self.assertRaises(ValueError, next, f)
3879
3880    def test_blockingioerror(self):
3881        # Various BlockingIOError issues
3882        class C(str):
3883            pass
3884        c = C("")
3885        b = self.BlockingIOError(1, c)
3886        c.b = b
3887        b.c = c
3888        wr = weakref.ref(c)
3889        del c, b
3890        support.gc_collect()
3891        self.assertIsNone(wr(), wr)
3892
3893    def test_abcs(self):
3894        # Test the visible base classes are ABCs.
3895        self.assertIsInstance(self.IOBase, abc.ABCMeta)
3896        self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3897        self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3898        self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
3899
3900    def _check_abc_inheritance(self, abcmodule):
3901        with self.open(support.TESTFN, "wb", buffering=0) as f:
3902            self.assertIsInstance(f, abcmodule.IOBase)
3903            self.assertIsInstance(f, abcmodule.RawIOBase)
3904            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3905            self.assertNotIsInstance(f, abcmodule.TextIOBase)
3906        with self.open(support.TESTFN, "wb") as f:
3907            self.assertIsInstance(f, abcmodule.IOBase)
3908            self.assertNotIsInstance(f, abcmodule.RawIOBase)
3909            self.assertIsInstance(f, abcmodule.BufferedIOBase)
3910            self.assertNotIsInstance(f, abcmodule.TextIOBase)
3911        with self.open(support.TESTFN, "w") as f:
3912            self.assertIsInstance(f, abcmodule.IOBase)
3913            self.assertNotIsInstance(f, abcmodule.RawIOBase)
3914            self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3915            self.assertIsInstance(f, abcmodule.TextIOBase)
3916
3917    def test_abc_inheritance(self):
3918        # Test implementations inherit from their respective ABCs
3919        self._check_abc_inheritance(self)
3920
3921    def test_abc_inheritance_official(self):
3922        # Test implementations inherit from the official ABCs of the
3923        # baseline "io" module.
3924        self._check_abc_inheritance(io)
3925
3926    def _check_warn_on_dealloc(self, *args, **kwargs):
3927        f = open(*args, **kwargs)
3928        r = repr(f)
3929        with self.assertWarns(ResourceWarning) as cm:
3930            f = None
3931            support.gc_collect()
3932        self.assertIn(r, str(cm.warning.args[0]))
3933
3934    def test_warn_on_dealloc(self):
3935        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
3936        self._check_warn_on_dealloc(support.TESTFN, "wb")
3937        self._check_warn_on_dealloc(support.TESTFN, "w")
3938
3939    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
3940        fds = []
3941        def cleanup_fds():
3942            for fd in fds:
3943                try:
3944                    os.close(fd)
3945                except OSError as e:
3946                    if e.errno != errno.EBADF:
3947                        raise
3948        self.addCleanup(cleanup_fds)
3949        r, w = os.pipe()
3950        fds += r, w
3951        self._check_warn_on_dealloc(r, *args, **kwargs)
3952        # When using closefd=False, there's no warning
3953        r, w = os.pipe()
3954        fds += r, w
3955        with support.check_no_resource_warning(self):
3956            open(r, *args, closefd=False, **kwargs)
3957
3958    def test_warn_on_dealloc_fd(self):
3959        self._check_warn_on_dealloc_fd("rb", buffering=0)
3960        self._check_warn_on_dealloc_fd("rb")
3961        self._check_warn_on_dealloc_fd("r")
3962
3963
3964    def test_pickling(self):
3965        # Pickling file objects is forbidden
3966        for kwargs in [
3967                {"mode": "w"},
3968                {"mode": "wb"},
3969                {"mode": "wb", "buffering": 0},
3970                {"mode": "r"},
3971                {"mode": "rb"},
3972                {"mode": "rb", "buffering": 0},
3973                {"mode": "w+"},
3974                {"mode": "w+b"},
3975                {"mode": "w+b", "buffering": 0},
3976            ]:
3977            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
3978                with self.open(support.TESTFN, **kwargs) as f:
3979                    self.assertRaises(TypeError, pickle.dumps, f, protocol)
3980
3981    def test_nonblock_pipe_write_bigbuf(self):
3982        self._test_nonblock_pipe_write(16*1024)
3983
3984    def test_nonblock_pipe_write_smallbuf(self):
3985        self._test_nonblock_pipe_write(1024)
3986
    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        """Write through a non-blocking pipe and check nothing is lost.

        Both ends of a pipe are switched to non-blocking mode and wrapped
        in buffered files using *bufsize* as their buffer size.  Data is
        written until BlockingIOError is raised, the reader is drained, and
        at the end everything recorded as sent must equal everything
        received.
        """
        sent = []
        received = []
        r, w = os.pipe()
        os.set_blocking(r, False)
        os.set_blocking(w, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    while True:
                        # N copies of one lowercase letter, cycling a..z.
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only the reported prefix of the last message counts
                    # as sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    # Drain the pipe so that the next write can proceed.
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Retry flush() until it succeeds, draining the reader after
            # every BlockingIOError.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Keep calling rf.read() and collecting the results until it
            # returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        # Leaving the with block must have closed both files.
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
4038
4039    def test_create_fail(self):
4040        # 'x' mode fails if file is existing
4041        with self.open(support.TESTFN, 'w'):
4042            pass
4043        self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4044
4045    def test_create_writes(self):
4046        # 'x' mode opens for writing
4047        with self.open(support.TESTFN, 'xb') as f:
4048            f.write(b"spam")
4049        with self.open(support.TESTFN, 'rb') as f:
4050            self.assertEqual(b"spam", f.read())
4051
4052    def test_open_allargs(self):
4053        # there used to be a buffer overflow in the parser for rawmode
4054        self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4055
4056
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases against the C implementation of io, plus a
    # few checks that only apply to it.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class OversizedReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                # Ignore the requested size and return far too much data.
                return b'x' * 10**6
        reader = OversizedReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter that writes to sys.<stream_name> from both
        a daemon thread and the main thread, then inspect how it exited.

        The child must either exit with status 0 and nothing but '.' and
        '!' characters on stderr, or fail with the dedicated fatal error
        message for that stream.
        """
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        script_template = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """
        script = script_template.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", script)
        stderr_text = result.err.decode()
        if result.rc == 0:
            # Clean exit: stderr may only hold the characters the script
            # wrote itself.
            self.assertFalse(stderr_text.strip('.!'))
            return
        # Failure: should be a fatal error
        expected_message = (
            "Fatal Python error: could not acquire lock "
            "for <_io.BufferedWriter name='<{stream_name}>'> "
            "at interpreter shutdown, possibly due to "
            "daemon threads"
        ).format(stream_name=stream_name)
        self.assertIn(expected_message, stderr_text)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stderr')
4114
4115
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest cases against _pyio, the Python implementation
    # of io.
    io = pyio
4118
4119
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Tests for how file objects behave when SIGALRM is delivered during
    # read() and write() calls on a pipe.  The methods use self.io, which
    # this class does not define itself; the subclasses CSignalsTest and
    # PySignalsTest set it.

    def setUp(self):
        # Install alarm_interrupt() as the SIGALRM handler and remember the
        # previous handler so that tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Restore the handler saved by setUp(); this also undoes handlers
        # installed by the individual check_* methods.
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError so that tests can recognize
        # an exception coming from the signal handler.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data repeated to build the payload, *bytes*
        is its expected representation as read back from the pipe, and
        *fdopen_kwargs* are passed to self.io.open() for the write end.
        """
        read_results = []
        def _read():
            # Runs in the helper thread: read a single byte from the pipe.
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        # The descriptors are closed by hand in the finally clause below.
        fdopen_kwargs["closefd"] = False
        # Repeat item often enough that the payload is strictly longer
        # than support.PIPE_MAX_SIZE.
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        try:
            # NOTE(review): if open() raises here, wio is still unbound when
            # the finally clause calls wio.close() -- confirm whether the
            # resulting NameError is acceptable.
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                t.start()
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                # Cancel any pending alarm and wait for the reader thread.
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a SIGALRM handler writes to a file that
        the interrupted code is itself writing to.

        *data* is written repeatedly to the write end of a pipe opened with
        *fdopen_kwargs*.  The loop must end with either ZeroDivisionError
        (raised by the handler) or a RuntimeError whose message starts with
        "reentrant call".
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        # Replaces the handler installed by setUp(); tearDown() restores
        # the original one.
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to a str for the
        comparison; *fdopen_kwargs* are passed to self.io.open() for the
        read end of the pipe.
        """
        r, w = os.pipe()
        # The descriptors are closed by hand in the finally clause below.
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            # Supply the second half of the data and return normally.
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        try:
            # NOTE(review): if open() raises here, rio is still unbound when
            # the finally clause calls rio.close() -- confirm whether the
            # resulting NameError is acceptable.
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is repeated support.PIPE_MAX_SIZE times to build the
        payload; *fdopen_kwargs* are passed to self.io.open() for the write
        end of the pipe.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        # The descriptors are closed by hand in the finally clause below.
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            # Drain the pipe in chunks of up to 1024 bytes, re-checking
            # write_finished whenever select() reports nothing readable
            # within one second.  Any exception is stored in error so that
            # the main thread can check it.
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: switch the handler to alarm2 and re-arm.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader so that write() can complete.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        try:
            # NOTE(review): if open() raises here, wio is still unbound when
            # the finally clause calls wio.close() -- confirm whether the
            # resulting NameError is acceptable.
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            # Tell the reader loop to stop, then wait for the thread.
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            # Also stop the reader loop when the body above failed.
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            try:
                wio.close()
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4332
4333
class CSignalsTest(SignalsTest):
    # Run the SignalsTest cases against the C implementation of io.
    io = io
4336
class PySignalsTest(SignalsTest):
    # Run the SignalsTest cases against _pyio, the Python implementation
    # of io.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
4344
4345
def load_tests(*args):
    """Build the test suite for this module (unittest ``load_tests`` hook).

    Before the suite is built, every test class whose name starts with
    ``C`` or ``Py`` receives, as class attributes, the public names of the
    matching io implementation (``io`` or ``_pyio``) together with the
    matching ``C...``/``Py...`` variants of the mock classes.  The other
    test classes are left untouched.

    The arguments supplied by unittest are accepted but ignored.

    Returns a ``unittest.TestSuite`` holding the tests of all listed
    classes.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is exposed under its base name but resolved to the module
    # global prefixed with "C" or "Py".
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; the default loader builds the same per-class suites and is
    # available on every version.
    load_case = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite([load_case(test) for test in tests])
    return suite
4382
# Run the tests with unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
4385