1"""Unit tests for the io module.""" 2 3# Tests of io are scattered over the test suite: 4# * test_bufio - tests file buffering 5# * test_memoryio - tests BytesIO and StringIO 6# * test_fileio - tests FileIO 7# * test_file - tests the file interface 8# * test_io - tests everything else in the io module 9# * test_univnewlines - tests universal newline support 10# * test_largefile - tests operations on a file greater than 2**32 bytes 11# (only enabled with -ulargefile) 12 13################################################################################ 14# ATTENTION TEST WRITERS!!! 15################################################################################ 16# When writing tests for io, it's important to test both the C and Python 17# implementations. This is usually done by writing a base test that refers to 18# the type it is testing as an attribute. Then it provides custom subclasses to 19# test both implementations. This file has lots of examples. 20################################################################################ 21 22import abc 23import array 24import errno 25import locale 26import os 27import pickle 28import random 29import signal 30import sys 31import sysconfig 32import textwrap 33import threading 34import time 35import unittest 36import warnings 37import weakref 38from collections import deque, UserList 39from itertools import cycle, count 40from test import support 41from test.support.script_helper import ( 42 assert_python_ok, assert_python_failure, run_python_until_end) 43from test.support import FakePath 44 45import codecs 46import io # C implementation of io 47import _pyio as pyio # Python implementation of io 48 49try: 50 import ctypes 51except ImportError: 52 def byteslike(*pos, **kw): 53 return array.array("b", bytes(*pos, **kw)) 54else: 55 def byteslike(*pos, **kw): 56 """Create a bytes-like object having no string or sequence methods""" 57 data = bytes(*pos, **kw) 58 obj = EmptyStruct() 59 ctypes.resize(obj, len(data)) 60 memoryview(obj).cast("B")[:] = data 61 return obj 62 class EmptyStruct(ctypes.Structure): 63 pass 64 65_cflags = sysconfig.get_config_var('CFLAGS') or '' 66_config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' 67MEMORY_SANITIZER = ( 68 '-fsanitize=memory' in _cflags or 69 '--with-memory-sanitizer' in _config_args 70) 71 72# Does io.IOBase finalizer log the exception if the close() method fails? 73# The exception is ignored silently by default in release build. 74IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode) 75 76 77def _default_chunk_size(): 78 """Get the default TextIOWrapper chunk size""" 79 with open(__file__, "r", encoding="latin-1") as f: 80 return f._CHUNK_SIZE 81 82 83class MockRawIOWithoutRead: 84 """A RawIO implementation without read(), so as to exercise the default 85 RawIO.read() which calls readinto().""" 86 87 def __init__(self, read_stack=()): 88 self._read_stack = list(read_stack) 89 self._write_stack = [] 90 self._reads = 0 91 self._extraneous_reads = 0 92 93 def write(self, b): 94 self._write_stack.append(bytes(b)) 95 return len(b) 96 97 def writable(self): 98 return True 99 100 def fileno(self): 101 return 42 102 103 def readable(self): 104 return True 105 106 def seekable(self): 107 return True 108 109 def seek(self, pos, whence): 110 return 0 # wrong but we gotta return something 111 112 def tell(self): 113 return 0 # same comment as above 114 115 def readinto(self, buf): 116 self._reads += 1 117 max_len = len(buf) 118 try: 119 data = self._read_stack[0] 120 except IndexError: 121 self._extraneous_reads += 1 122 return 0 123 if data is None: 124 del self._read_stack[0] 125 return None 126 n = len(data) 127 if len(data) <= max_len: 128 del self._read_stack[0] 129 buf[:n] = data 130 return n 131 else: 132 buf[:] = data[:max_len] 133 self._read_stack[0] = data[max_len:] 134 return max_len 135 136 def truncate(self, pos=None): 137 return pos 138 139class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase): 140 pass 141 142class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase): 143 pass 144 145 146class MockRawIO(MockRawIOWithoutRead): 147 148 def read(self, n=None): 149 self._reads += 1 150 try: 151 return self._read_stack.pop(0) 152 except: 153 self._extraneous_reads += 1 154 return b"" 155 156class CMockRawIO(MockRawIO, io.RawIOBase): 157 pass 158 159class PyMockRawIO(MockRawIO, pyio.RawIOBase): 160 pass 161 162 163class MisbehavedRawIO(MockRawIO): 164 def write(self, b): 165 return super().write(b) * 2 166 167 def read(self, n=None): 168 return super().read(n) * 2 169 170 def seek(self, pos, whence): 171 return -123 172 173 def tell(self): 174 return -456 175 176 def readinto(self, buf): 177 super().readinto(buf) 178 return len(buf) * 5 179 180class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase): 181 pass 182 183class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase): 184 pass 185 186 187class SlowFlushRawIO(MockRawIO): 188 def __init__(self): 189 super().__init__() 190 self.in_flush = threading.Event() 191 192 def flush(self): 193 self.in_flush.set() 194 time.sleep(0.25) 195 196class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase): 197 pass 198 199class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase): 200 pass 201 202 203class CloseFailureIO(MockRawIO): 204 closed = 0 205 206 def close(self): 207 if not self.closed: 208 self.closed = 1 209 raise OSError 210 211class CCloseFailureIO(CloseFailureIO, io.RawIOBase): 212 pass 213 214class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase): 215 pass 216 217 218class MockFileIO: 219 220 def __init__(self, data): 221 self.read_history = [] 222 super().__init__(data) 223 224 def read(self, n=None): 225 res = super().read(n) 226 self.read_history.append(None if res is None else len(res)) 227 return res 228 229 def readinto(self, b): 230 res = super().readinto(b) 231 self.read_history.append(res) 232 return res 233 234class CMockFileIO(MockFileIO, io.BytesIO): 235 pass 236 237class PyMockFileIO(MockFileIO, pyio.BytesIO): 238 pass 239 240 241class MockUnseekableIO: 242 def seekable(self): 243 return False 244 245 def seek(self, *args): 246 raise self.UnsupportedOperation("not seekable") 247 248 def tell(self, *args): 249 raise self.UnsupportedOperation("not seekable") 250 251 def truncate(self, *args): 252 raise self.UnsupportedOperation("not seekable") 253 254class CMockUnseekableIO(MockUnseekableIO, io.BytesIO): 255 UnsupportedOperation = io.UnsupportedOperation 256 257class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO): 258 UnsupportedOperation = pyio.UnsupportedOperation 259 260 261class MockNonBlockWriterIO: 262 263 def __init__(self): 264 self._write_stack = [] 265 self._blocker_char = None 266 267 def pop_written(self): 268 s = b"".join(self._write_stack) 269 self._write_stack[:] = [] 270 return s 271 272 def block_on(self, char): 273 """Block when a given char is encountered.""" 274 self._blocker_char = char 275 276 def readable(self): 277 return True 278 279 def seekable(self): 280 return True 281 282 def seek(self, pos, whence=0): 283 # naive implementation, enough for tests 284 return 0 285 286 def writable(self): 287 return True 288 289 def write(self, b): 290 b = bytes(b) 291 n = -1 292 if self._blocker_char: 293 try: 294 n = b.index(self._blocker_char) 295 except ValueError: 296 pass 297 else: 298 if n > 0: 299 # write data up to the first blocker 300 self._write_stack.append(b[:n]) 301 return n 302 else: 303 # cancel blocker and indicate would block 304 self._blocker_char = None 305 return None 306 self._write_stack.append(b) 307 return len(b) 308 309class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase): 310 BlockingIOError = io.BlockingIOError 311 312class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase): 313 BlockingIOError = pyio.BlockingIOError 314 315 316class IOTest(unittest.TestCase): 317 318 def setUp(self): 319 support.unlink(support.TESTFN) 320 321 def tearDown(self): 322 support.unlink(support.TESTFN) 323 324 def write_ops(self, f): 325 self.assertEqual(f.write(b"blah."), 5) 326 f.truncate(0) 327 self.assertEqual(f.tell(), 5) 328 f.seek(0) 329 330 self.assertEqual(f.write(b"blah."), 5) 331 self.assertEqual(f.seek(0), 0) 332 self.assertEqual(f.write(b"Hello."), 6) 333 self.assertEqual(f.tell(), 6) 334 self.assertEqual(f.seek(-1, 1), 5) 335 self.assertEqual(f.tell(), 5) 336 buffer = bytearray(b" world\n\n\n") 337 self.assertEqual(f.write(buffer), 9) 338 buffer[:] = b"*" * 9 # Overwrite our copy of the data 339 self.assertEqual(f.seek(0), 0) 340 self.assertEqual(f.write(b"h"), 1) 341 self.assertEqual(f.seek(-1, 2), 13) 342 self.assertEqual(f.tell(), 13) 343 344 self.assertEqual(f.truncate(12), 12) 345 self.assertEqual(f.tell(), 13) 346 self.assertRaises(TypeError, f.seek, 0.0) 347 348 def read_ops(self, f, buffered=False): 349 data = f.read(5) 350 self.assertEqual(data, b"hello") 351 data = byteslike(data) 352 self.assertEqual(f.readinto(data), 5) 353 self.assertEqual(bytes(data), b" worl") 354 data = bytearray(5) 355 self.assertEqual(f.readinto(data), 2) 356 self.assertEqual(len(data), 5) 357 self.assertEqual(data[:2], b"d\n") 358 self.assertEqual(f.seek(0), 0) 359 self.assertEqual(f.read(20), b"hello world\n") 360 self.assertEqual(f.read(1), b"") 361 self.assertEqual(f.readinto(byteslike(b"x")), 0) 362 self.assertEqual(f.seek(-6, 2), 6) 363 self.assertEqual(f.read(5), b"world") 364 self.assertEqual(f.read(0), b"") 365 self.assertEqual(f.readinto(byteslike()), 0) 366 self.assertEqual(f.seek(-6, 1), 5) 367 self.assertEqual(f.read(5), b" worl") 368 self.assertEqual(f.tell(), 10) 369 self.assertRaises(TypeError, f.seek, 0.0) 370 if buffered: 371 f.seek(0) 372 self.assertEqual(f.read(), b"hello world\n") 373 f.seek(6) 374 self.assertEqual(f.read(), b"world\n") 375 self.assertEqual(f.read(), b"") 376 f.seek(0) 377 data = byteslike(5) 378 self.assertEqual(f.readinto1(data), 5) 379 self.assertEqual(bytes(data), b"hello") 380 381 LARGE = 2**31 382 383 def large_file_ops(self, f): 384 assert f.readable() 385 assert f.writable() 386 try: 387 self.assertEqual(f.seek(self.LARGE), self.LARGE) 388 except (OverflowError, ValueError): 389 self.skipTest("no largefile support") 390 self.assertEqual(f.tell(), self.LARGE) 391 self.assertEqual(f.write(b"xxx"), 3) 392 self.assertEqual(f.tell(), self.LARGE + 3) 393 self.assertEqual(f.seek(-1, 1), self.LARGE + 2) 394 self.assertEqual(f.truncate(), self.LARGE + 2) 395 self.assertEqual(f.tell(), self.LARGE + 2) 396 self.assertEqual(f.seek(0, 2), self.LARGE + 2) 397 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1) 398 self.assertEqual(f.tell(), self.LARGE + 2) 399 self.assertEqual(f.seek(0, 2), self.LARGE + 1) 400 self.assertEqual(f.seek(-1, 2), self.LARGE) 401 self.assertEqual(f.read(2), b"x") 402 403 def test_invalid_operations(self): 404 # Try writing on a file opened in read mode and vice-versa. 405 exc = self.UnsupportedOperation 406 for mode in ("w", "wb"): 407 with self.open(support.TESTFN, mode) as fp: 408 self.assertRaises(exc, fp.read) 409 self.assertRaises(exc, fp.readline) 410 with self.open(support.TESTFN, "wb", buffering=0) as fp: 411 self.assertRaises(exc, fp.read) 412 self.assertRaises(exc, fp.readline) 413 with self.open(support.TESTFN, "rb", buffering=0) as fp: 414 self.assertRaises(exc, fp.write, b"blah") 415 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 416 with self.open(support.TESTFN, "rb") as fp: 417 self.assertRaises(exc, fp.write, b"blah") 418 self.assertRaises(exc, fp.writelines, [b"blah\n"]) 419 with self.open(support.TESTFN, "r") as fp: 420 self.assertRaises(exc, fp.write, "blah") 421 self.assertRaises(exc, fp.writelines, ["blah\n"]) 422 # Non-zero seeking from current or end pos 423 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR) 424 self.assertRaises(exc, fp.seek, -1, self.SEEK_END) 425 426 def test_optional_abilities(self): 427 # Test for OSError when optional APIs are not supported 428 # The purpose of this test is to try fileno(), reading, writing and 429 # seeking operations with various objects that indicate they do not 430 # support these operations. 431 432 def pipe_reader(): 433 [r, w] = os.pipe() 434 os.close(w) # So that read() is harmless 435 return self.FileIO(r, "r") 436 437 def pipe_writer(): 438 [r, w] = os.pipe() 439 self.addCleanup(os.close, r) 440 # Guarantee that we can write into the pipe without blocking 441 thread = threading.Thread(target=os.read, args=(r, 100)) 442 thread.start() 443 self.addCleanup(thread.join) 444 return self.FileIO(w, "w") 445 446 def buffered_reader(): 447 return self.BufferedReader(self.MockUnseekableIO()) 448 449 def buffered_writer(): 450 return self.BufferedWriter(self.MockUnseekableIO()) 451 452 def buffered_random(): 453 return self.BufferedRandom(self.BytesIO()) 454 455 def buffered_rw_pair(): 456 return self.BufferedRWPair(self.MockUnseekableIO(), 457 self.MockUnseekableIO()) 458 459 def text_reader(): 460 class UnseekableReader(self.MockUnseekableIO): 461 writable = self.BufferedIOBase.writable 462 write = self.BufferedIOBase.write 463 return self.TextIOWrapper(UnseekableReader(), "ascii") 464 465 def text_writer(): 466 class UnseekableWriter(self.MockUnseekableIO): 467 readable = self.BufferedIOBase.readable 468 read = self.BufferedIOBase.read 469 return self.TextIOWrapper(UnseekableWriter(), "ascii") 470 471 tests = ( 472 (pipe_reader, "fr"), (pipe_writer, "fw"), 473 (buffered_reader, "r"), (buffered_writer, "w"), 474 (buffered_random, "rws"), (buffered_rw_pair, "rw"), 475 (text_reader, "r"), (text_writer, "w"), 476 (self.BytesIO, "rws"), (self.StringIO, "rws"), 477 ) 478 for [test, abilities] in tests: 479 with self.subTest(test), test() as obj: 480 readable = "r" in abilities 481 self.assertEqual(obj.readable(), readable) 482 writable = "w" in abilities 483 self.assertEqual(obj.writable(), writable) 484 485 if isinstance(obj, self.TextIOBase): 486 data = "3" 487 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)): 488 data = b"3" 489 else: 490 self.fail("Unknown base class") 491 492 if "f" in abilities: 493 obj.fileno() 494 else: 495 self.assertRaises(OSError, obj.fileno) 496 497 if readable: 498 obj.read(1) 499 obj.read() 500 else: 501 self.assertRaises(OSError, obj.read, 1) 502 self.assertRaises(OSError, obj.read) 503 504 if writable: 505 obj.write(data) 506 else: 507 self.assertRaises(OSError, obj.write, data) 508 509 if sys.platform.startswith("win") and test in ( 510 pipe_reader, pipe_writer): 511 # Pipes seem to appear as seekable on Windows 512 continue 513 seekable = "s" in abilities 514 self.assertEqual(obj.seekable(), seekable) 515 516 if seekable: 517 obj.tell() 518 obj.seek(0) 519 else: 520 self.assertRaises(OSError, obj.tell) 521 self.assertRaises(OSError, obj.seek, 0) 522 523 if writable and seekable: 524 obj.truncate() 525 obj.truncate(0) 526 else: 527 self.assertRaises(OSError, obj.truncate) 528 self.assertRaises(OSError, obj.truncate, 0) 529 530 def test_open_handles_NUL_chars(self): 531 fn_with_NUL = 'foo\0bar' 532 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w') 533 534 bytes_fn = bytes(fn_with_NUL, 'ascii') 535 with warnings.catch_warnings(): 536 warnings.simplefilter("ignore", DeprecationWarning) 537 self.assertRaises(ValueError, self.open, bytes_fn, 'w') 538 539 def test_raw_file_io(self): 540 with self.open(support.TESTFN, "wb", buffering=0) as f: 541 self.assertEqual(f.readable(), False) 542 self.assertEqual(f.writable(), True) 543 self.assertEqual(f.seekable(), True) 544 self.write_ops(f) 545 with self.open(support.TESTFN, "rb", buffering=0) as f: 546 self.assertEqual(f.readable(), True) 547 self.assertEqual(f.writable(), False) 548 self.assertEqual(f.seekable(), True) 549 self.read_ops(f) 550 551 def test_buffered_file_io(self): 552 with self.open(support.TESTFN, "wb") as f: 553 self.assertEqual(f.readable(), False) 554 self.assertEqual(f.writable(), True) 555 self.assertEqual(f.seekable(), True) 556 self.write_ops(f) 557 with self.open(support.TESTFN, "rb") as f: 558 self.assertEqual(f.readable(), True) 559 self.assertEqual(f.writable(), False) 560 self.assertEqual(f.seekable(), True) 561 self.read_ops(f, True) 562 563 def test_readline(self): 564 with self.open(support.TESTFN, "wb") as f: 565 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line") 566 with self.open(support.TESTFN, "rb") as f: 567 self.assertEqual(f.readline(), b"abc\n") 568 self.assertEqual(f.readline(10), b"def\n") 569 self.assertEqual(f.readline(2), b"xy") 570 self.assertEqual(f.readline(4), b"zzy\n") 571 self.assertEqual(f.readline(), b"foo\x00bar\n") 572 self.assertEqual(f.readline(None), b"another line") 573 self.assertRaises(TypeError, f.readline, 5.3) 574 with self.open(support.TESTFN, "r") as f: 575 self.assertRaises(TypeError, f.readline, 5.3) 576 577 def test_readline_nonsizeable(self): 578 # Issue #30061 579 # Crash when readline() returns an object without __len__ 580 class R(self.IOBase): 581 def readline(self): 582 return None 583 self.assertRaises((TypeError, StopIteration), next, R()) 584 585 def test_next_nonsizeable(self): 586 # Issue #30061 587 # Crash when __next__() returns an object without __len__ 588 class R(self.IOBase): 589 def __next__(self): 590 return None 591 self.assertRaises(TypeError, R().readlines, 1) 592 593 def test_raw_bytes_io(self): 594 f = self.BytesIO() 595 self.write_ops(f) 596 data = f.getvalue() 597 self.assertEqual(data, b"hello world\n") 598 f = self.BytesIO(data) 599 self.read_ops(f, True) 600 601 def test_large_file_ops(self): 602 # On Windows and Mac OSX this test consumes large resources; It takes 603 # a long time to build the >2 GiB file and takes >2 GiB of disk space 604 # therefore the resource must be enabled to run this test. 605 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 606 support.requires( 607 'largefile', 608 'test requires %s bytes and a long time to run' % self.LARGE) 609 with self.open(support.TESTFN, "w+b", 0) as f: 610 self.large_file_ops(f) 611 with self.open(support.TESTFN, "w+b") as f: 612 self.large_file_ops(f) 613 614 def test_with_open(self): 615 for bufsize in (0, 100): 616 f = None 617 with self.open(support.TESTFN, "wb", bufsize) as f: 618 f.write(b"xxx") 619 self.assertEqual(f.closed, True) 620 f = None 621 try: 622 with self.open(support.TESTFN, "wb", bufsize) as f: 623 1/0 624 except ZeroDivisionError: 625 self.assertEqual(f.closed, True) 626 else: 627 self.fail("1/0 didn't raise an exception") 628 629 # issue 5008 630 def test_append_mode_tell(self): 631 with self.open(support.TESTFN, "wb") as f: 632 f.write(b"xxx") 633 with self.open(support.TESTFN, "ab", buffering=0) as f: 634 self.assertEqual(f.tell(), 3) 635 with self.open(support.TESTFN, "ab") as f: 636 self.assertEqual(f.tell(), 3) 637 with self.open(support.TESTFN, "a") as f: 638 self.assertGreater(f.tell(), 0) 639 640 def test_destructor(self): 641 record = [] 642 class MyFileIO(self.FileIO): 643 def __del__(self): 644 record.append(1) 645 try: 646 f = super().__del__ 647 except AttributeError: 648 pass 649 else: 650 f() 651 def close(self): 652 record.append(2) 653 super().close() 654 def flush(self): 655 record.append(3) 656 super().flush() 657 with support.check_warnings(('', ResourceWarning)): 658 f = MyFileIO(support.TESTFN, "wb") 659 f.write(b"xxx") 660 del f 661 support.gc_collect() 662 self.assertEqual(record, [1, 2, 3]) 663 with self.open(support.TESTFN, "rb") as f: 664 self.assertEqual(f.read(), b"xxx") 665 666 def _check_base_destructor(self, base): 667 record = [] 668 class MyIO(base): 669 def __init__(self): 670 # This exercises the availability of attributes on object 671 # destruction. 672 # (in the C version, close() is called by the tp_dealloc 673 # function, not by __del__) 674 self.on_del = 1 675 self.on_close = 2 676 self.on_flush = 3 677 def __del__(self): 678 record.append(self.on_del) 679 try: 680 f = super().__del__ 681 except AttributeError: 682 pass 683 else: 684 f() 685 def close(self): 686 record.append(self.on_close) 687 super().close() 688 def flush(self): 689 record.append(self.on_flush) 690 super().flush() 691 f = MyIO() 692 del f 693 support.gc_collect() 694 self.assertEqual(record, [1, 2, 3]) 695 696 def test_IOBase_destructor(self): 697 self._check_base_destructor(self.IOBase) 698 699 def test_RawIOBase_destructor(self): 700 self._check_base_destructor(self.RawIOBase) 701 702 def test_BufferedIOBase_destructor(self): 703 self._check_base_destructor(self.BufferedIOBase) 704 705 def test_TextIOBase_destructor(self): 706 self._check_base_destructor(self.TextIOBase) 707 708 def test_close_flushes(self): 709 with self.open(support.TESTFN, "wb") as f: 710 f.write(b"xxx") 711 with self.open(support.TESTFN, "rb") as f: 712 self.assertEqual(f.read(), b"xxx") 713 714 def test_array_writes(self): 715 a = array.array('i', range(10)) 716 n = len(a.tobytes()) 717 def check(f): 718 with f: 719 self.assertEqual(f.write(a), n) 720 f.writelines((a,)) 721 check(self.BytesIO()) 722 check(self.FileIO(support.TESTFN, "w")) 723 check(self.BufferedWriter(self.MockRawIO())) 724 check(self.BufferedRandom(self.MockRawIO())) 725 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())) 726 727 def test_closefd(self): 728 self.assertRaises(ValueError, self.open, support.TESTFN, 'w', 729 closefd=False) 730 731 def test_read_closed(self): 732 with self.open(support.TESTFN, "w") as f: 733 f.write("egg\n") 734 with self.open(support.TESTFN, "r") as f: 735 file = self.open(f.fileno(), "r", closefd=False) 736 self.assertEqual(file.read(), "egg\n") 737 file.seek(0) 738 file.close() 739 self.assertRaises(ValueError, file.read) 740 with self.open(support.TESTFN, "rb") as f: 741 file = self.open(f.fileno(), "rb", closefd=False) 742 self.assertEqual(file.read()[:3], b"egg") 743 file.close() 744 self.assertRaises(ValueError, file.readinto, bytearray(1)) 745 746 def test_no_closefd_with_filename(self): 747 # can't use closefd in combination with a file name 748 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False) 749 750 def test_closefd_attr(self): 751 with self.open(support.TESTFN, "wb") as f: 752 f.write(b"egg\n") 753 with self.open(support.TESTFN, "r") as f: 754 self.assertEqual(f.buffer.raw.closefd, True) 755 file = self.open(f.fileno(), "r", closefd=False) 756 self.assertEqual(file.buffer.raw.closefd, False) 757 758 def test_garbage_collection(self): 759 # FileIO objects are collected, and collecting them flushes 760 # all data to disk. 761 with support.check_warnings(('', ResourceWarning)): 762 f = self.FileIO(support.TESTFN, "wb") 763 f.write(b"abcxxx") 764 f.f = f 765 wr = weakref.ref(f) 766 del f 767 support.gc_collect() 768 self.assertIsNone(wr(), wr) 769 with self.open(support.TESTFN, "rb") as f: 770 self.assertEqual(f.read(), b"abcxxx") 771 772 def test_unbounded_file(self): 773 # Issue #1174606: reading from an unbounded stream such as /dev/zero. 774 zero = "/dev/zero" 775 if not os.path.exists(zero): 776 self.skipTest("{0} does not exist".format(zero)) 777 if sys.maxsize > 0x7FFFFFFF: 778 self.skipTest("test can only run in a 32-bit address space") 779 if support.real_max_memuse < support._2G: 780 self.skipTest("test requires at least 2 GiB of memory") 781 with self.open(zero, "rb", buffering=0) as f: 782 self.assertRaises(OverflowError, f.read) 783 with self.open(zero, "rb") as f: 784 self.assertRaises(OverflowError, f.read) 785 with self.open(zero, "r") as f: 786 self.assertRaises(OverflowError, f.read) 787 788 def check_flush_error_on_close(self, *args, **kwargs): 789 # Test that the file is closed despite failed flush 790 # and that flush() is called before file closed. 791 f = self.open(*args, **kwargs) 792 closed = [] 793 def bad_flush(): 794 closed[:] = [f.closed] 795 raise OSError() 796 f.flush = bad_flush 797 self.assertRaises(OSError, f.close) # exception not swallowed 798 self.assertTrue(f.closed) 799 self.assertTrue(closed) # flush() called 800 self.assertFalse(closed[0]) # flush() called before file closed 801 f.flush = lambda: None # break reference loop 802 803 def test_flush_error_on_close(self): 804 # raw file 805 # Issue #5700: io.FileIO calls flush() after file closed 806 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0) 807 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 808 self.check_flush_error_on_close(fd, 'wb', buffering=0) 809 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 810 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False) 811 os.close(fd) 812 # buffered io 813 self.check_flush_error_on_close(support.TESTFN, 'wb') 814 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 815 self.check_flush_error_on_close(fd, 'wb') 816 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 817 self.check_flush_error_on_close(fd, 'wb', closefd=False) 818 os.close(fd) 819 # text io 820 self.check_flush_error_on_close(support.TESTFN, 'w') 821 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 822 self.check_flush_error_on_close(fd, 'w') 823 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT) 824 self.check_flush_error_on_close(fd, 'w', closefd=False) 825 os.close(fd) 826 827 def test_multi_close(self): 828 f = self.open(support.TESTFN, "wb", buffering=0) 829 f.close() 830 f.close() 831 f.close() 832 self.assertRaises(ValueError, f.flush) 833 834 def test_RawIOBase_read(self): 835 # Exercise the default limited RawIOBase.read(n) implementation (which 836 # calls readinto() internally). 837 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None)) 838 self.assertEqual(rawio.read(2), b"ab") 839 self.assertEqual(rawio.read(2), b"c") 840 self.assertEqual(rawio.read(2), b"d") 841 self.assertEqual(rawio.read(2), None) 842 self.assertEqual(rawio.read(2), b"ef") 843 self.assertEqual(rawio.read(2), b"g") 844 self.assertEqual(rawio.read(2), None) 845 self.assertEqual(rawio.read(2), b"") 846 847 def test_types_have_dict(self): 848 test = ( 849 self.IOBase(), 850 self.RawIOBase(), 851 self.TextIOBase(), 852 self.StringIO(), 853 self.BytesIO() 854 ) 855 for obj in test: 856 self.assertTrue(hasattr(obj, "__dict__")) 857 858 def test_opener(self): 859 with self.open(support.TESTFN, "w") as f: 860 f.write("egg\n") 861 fd = os.open(support.TESTFN, os.O_RDONLY) 862 def opener(path, flags): 863 return fd 864 with self.open("non-existent", "r", opener=opener) as f: 865 self.assertEqual(f.read(), "egg\n") 866 867 def test_bad_opener_negative_1(self): 868 # Issue #27066. 869 def badopener(fname, flags): 870 return -1 871 with self.assertRaises(ValueError) as cm: 872 open('non-existent', 'r', opener=badopener) 873 self.assertEqual(str(cm.exception), 'opener returned -1') 874 875 def test_bad_opener_other_negative(self): 876 # Issue #27066. 877 def badopener(fname, flags): 878 return -2 879 with self.assertRaises(ValueError) as cm: 880 open('non-existent', 'r', opener=badopener) 881 self.assertEqual(str(cm.exception), 'opener returned -2') 882 883 def test_fileio_closefd(self): 884 # Issue #4841 885 with self.open(__file__, 'rb') as f1, \ 886 self.open(__file__, 'rb') as f2: 887 fileio = self.FileIO(f1.fileno(), closefd=False) 888 # .__init__() must not close f1 889 fileio.__init__(f2.fileno(), closefd=False) 890 f1.readline() 891 # .close() must not close f2 892 fileio.close() 893 f2.readline() 894 895 def test_nonbuffered_textio(self): 896 with support.check_no_resource_warning(self): 897 with self.assertRaises(ValueError): 898 self.open(support.TESTFN, 'w', buffering=0) 899 900 def test_invalid_newline(self): 901 with support.check_no_resource_warning(self): 902 with self.assertRaises(ValueError): 903 self.open(support.TESTFN, 'w', newline='invalid') 904 905 def test_buffered_readinto_mixin(self): 906 # Test the implementation provided by BufferedIOBase 907 class Stream(self.BufferedIOBase): 908 def read(self, size): 909 return b"12345" 910 read1 = read 911 stream = Stream() 912 for method in ("readinto", "readinto1"): 913 with self.subTest(method): 914 buffer = byteslike(5) 915 self.assertEqual(getattr(stream, method)(buffer), 5) 916 self.assertEqual(bytes(buffer), b"12345") 917 918 def test_fspath_support(self): 919 def check_path_succeeds(path): 920 with self.open(path, "w") as f: 921 f.write("egg\n") 922 923 with self.open(path, "r") as f: 924 self.assertEqual(f.read(), "egg\n") 925 926 check_path_succeeds(FakePath(support.TESTFN)) 927 check_path_succeeds(FakePath(os.fsencode(support.TESTFN))) 928 929 with self.open(support.TESTFN, "w") as f: 930 bad_path = FakePath(f.fileno()) 931 with self.assertRaises(TypeError): 932 self.open(bad_path, 'w') 933 934 bad_path = FakePath(None) 935 with self.assertRaises(TypeError): 936 self.open(bad_path, 'w') 937 938 bad_path = FakePath(FloatingPointError) 939 with self.assertRaises(FloatingPointError): 940 self.open(bad_path, 'w') 941 942 # ensure that refcounting is correct with some error conditions 943 with self.assertRaisesRegex(ValueError, 'read/write/append mode'): 944 self.open(FakePath(support.TESTFN), 'rwxa') 945 946 def test_RawIOBase_readall(self): 947 # Exercise the default unlimited RawIOBase.read() and readall() 948 # implementations. 949 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 950 self.assertEqual(rawio.read(), b"abcdefg") 951 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg")) 952 self.assertEqual(rawio.readall(), b"abcdefg") 953 954 def test_BufferedIOBase_readinto(self): 955 # Exercise the default BufferedIOBase.readinto() and readinto1() 956 # implementations (which call read() or read1() internally). 957 class Reader(self.BufferedIOBase): 958 def __init__(self, avail): 959 self.avail = avail 960 def read(self, size): 961 result = self.avail[:size] 962 self.avail = self.avail[size:] 963 return result 964 def read1(self, size): 965 """Returns no more than 5 bytes at once""" 966 return self.read(min(size, 5)) 967 tests = ( 968 # (test method, total data available, read buffer size, expected 969 # read size) 970 ("readinto", 10, 5, 5), 971 ("readinto", 10, 6, 6), # More than read1() can return 972 ("readinto", 5, 6, 5), # Buffer larger than total available 973 ("readinto", 6, 7, 6), 974 ("readinto", 10, 0, 0), # Empty buffer 975 ("readinto1", 10, 5, 5), # Result limited to single read1() call 976 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return 977 ("readinto1", 5, 6, 5), # Buffer larger than total available 978 ("readinto1", 6, 7, 5), 979 ("readinto1", 10, 0, 0), # Empty buffer 980 ) 981 UNUSED_BYTE = 0x81 982 for test in tests: 983 with self.subTest(test): 984 method, avail, request, result = test 985 reader = Reader(bytes(range(avail))) 986 buffer = bytearray((UNUSED_BYTE,) * request) 987 method = getattr(reader, method) 988 self.assertEqual(method(buffer), result) 989 self.assertEqual(len(buffer), request) 990 self.assertSequenceEqual(buffer[:result], range(result)) 991 unused = (UNUSED_BYTE,) * (request - result) 992 self.assertSequenceEqual(buffer[result:], unused) 993 self.assertEqual(len(reader.avail), avail - result) 994 995 def test_close_assert(self): 996 class R(self.IOBase): 997 def __setattr__(self, name, value): 998 pass 999 def flush(self): 1000 raise OSError() 1001 f = R() 1002 # This would cause an assertion failure. 1003 self.assertRaises(OSError, f.close) 1004 1005 # Silence destructor error 1006 R.flush = lambda self: None 1007 1008 1009class CIOTest(IOTest): 1010 1011 def test_IOBase_finalize(self): 1012 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a 1013 # class which inherits IOBase and an object of this class are caught 1014 # in a reference cycle and close() is already in the method cache. 1015 class MyIO(self.IOBase): 1016 def close(self): 1017 pass 1018 1019 # create an instance to populate the method cache 1020 MyIO() 1021 obj = MyIO() 1022 obj.obj = obj 1023 wr = weakref.ref(obj) 1024 del MyIO 1025 del obj 1026 support.gc_collect() 1027 self.assertIsNone(wr(), wr) 1028 1029class PyIOTest(IOTest): 1030 pass 1031 1032 1033@support.cpython_only 1034class APIMismatchTest(unittest.TestCase): 1035 1036 def test_RawIOBase_io_in_pyio_match(self): 1037 """Test that pyio RawIOBase class has all c RawIOBase methods""" 1038 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase, 1039 ignore=('__weakref__',)) 1040 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods') 1041 1042 def test_RawIOBase_pyio_in_io_match(self): 1043 """Test that c RawIOBase class has all pyio RawIOBase methods""" 1044 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase) 1045 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods') 1046 1047 1048class CommonBufferedTests: 1049 # Tests common to BufferedReader, BufferedWriter and BufferedRandom 1050 1051 def test_detach(self): 1052 raw = self.MockRawIO() 1053 buf = self.tp(raw) 1054 self.assertIs(buf.detach(), raw) 1055 self.assertRaises(ValueError, buf.detach) 1056 1057 repr(buf) # Should still work 1058 1059 def test_fileno(self): 1060 rawio = self.MockRawIO() 1061 bufio = self.tp(rawio) 1062 1063 self.assertEqual(42, bufio.fileno()) 1064 1065 def test_invalid_args(self): 1066 rawio = self.MockRawIO() 1067 bufio = self.tp(rawio) 1068 # Invalid whence 1069 self.assertRaises(ValueError, bufio.seek, 0, -1) 1070 self.assertRaises(ValueError, bufio.seek, 0, 9) 1071 1072 def test_override_destructor(self): 1073 tp = self.tp 1074 record = [] 1075 class MyBufferedIO(tp): 1076 def __del__(self): 1077 record.append(1) 1078 try: 1079 f = super().__del__ 1080 except AttributeError: 1081 pass 1082 else: 1083 f() 1084 def close(self): 1085 record.append(2) 1086 super().close() 1087 def flush(self): 1088 record.append(3) 1089 super().flush() 1090 rawio = self.MockRawIO() 1091 bufio = MyBufferedIO(rawio) 1092 del bufio 1093 support.gc_collect() 1094 self.assertEqual(record, [1, 2, 3]) 1095 1096 def test_context_manager(self): 1097 # Test usability as a context manager 1098 rawio = self.MockRawIO() 1099 bufio = self.tp(rawio) 1100 def _with(): 1101 with bufio: 1102 pass 1103 _with() 1104 # bufio should now be closed, and using it a second time should raise 1105 # a ValueError. 1106 self.assertRaises(ValueError, _with) 1107 1108 def test_error_through_destructor(self): 1109 # Test that the exception state is not modified by a destructor, 1110 # even if close() fails. 1111 rawio = self.CloseFailureIO() 1112 with support.catch_unraisable_exception() as cm: 1113 with self.assertRaises(AttributeError): 1114 self.tp(rawio).xyzzy 1115 1116 if not IOBASE_EMITS_UNRAISABLE: 1117 self.assertIsNone(cm.unraisable) 1118 elif cm.unraisable is not None: 1119 self.assertEqual(cm.unraisable.exc_type, OSError) 1120 1121 def test_repr(self): 1122 raw = self.MockRawIO() 1123 b = self.tp(raw) 1124 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__) 1125 self.assertRegex(repr(b), "<%s>" % clsname) 1126 raw.name = "dummy" 1127 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname) 1128 raw.name = b"dummy" 1129 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname) 1130 1131 def test_recursive_repr(self): 1132 # Issue #25455 1133 raw = self.MockRawIO() 1134 b = self.tp(raw) 1135 with support.swap_attr(raw, 'name', b): 1136 try: 1137 repr(b) # Should not crash 1138 except RuntimeError: 1139 pass 1140 1141 def test_flush_error_on_close(self): 1142 # Test that buffered file is closed despite failed flush 1143 # and that flush() is called before file closed. 1144 raw = self.MockRawIO() 1145 closed = [] 1146 def bad_flush(): 1147 closed[:] = [b.closed, raw.closed] 1148 raise OSError() 1149 raw.flush = bad_flush 1150 b = self.tp(raw) 1151 self.assertRaises(OSError, b.close) # exception not swallowed 1152 self.assertTrue(b.closed) 1153 self.assertTrue(raw.closed) 1154 self.assertTrue(closed) # flush() called 1155 self.assertFalse(closed[0]) # flush() called before file closed 1156 self.assertFalse(closed[1]) 1157 raw.flush = lambda: None # break reference loop 1158 1159 def test_close_error_on_close(self): 1160 raw = self.MockRawIO() 1161 def bad_flush(): 1162 raise OSError('flush') 1163 def bad_close(): 1164 raise OSError('close') 1165 raw.close = bad_close 1166 b = self.tp(raw) 1167 b.flush = bad_flush 1168 with self.assertRaises(OSError) as err: # exception not swallowed 1169 b.close() 1170 self.assertEqual(err.exception.args, ('close',)) 1171 self.assertIsInstance(err.exception.__context__, OSError) 1172 self.assertEqual(err.exception.__context__.args, ('flush',)) 1173 self.assertFalse(b.closed) 1174 1175 # Silence destructor error 1176 raw.close = lambda: None 1177 b.flush = lambda: None 1178 1179 def test_nonnormalized_close_error_on_close(self): 1180 # Issue #21677 1181 raw = self.MockRawIO() 1182 def bad_flush(): 1183 raise non_existing_flush 1184 def bad_close(): 1185 raise non_existing_close 1186 raw.close = bad_close 1187 b = self.tp(raw) 1188 b.flush = bad_flush 1189 with self.assertRaises(NameError) as err: # exception not swallowed 1190 b.close() 1191 self.assertIn('non_existing_close', str(err.exception)) 1192 self.assertIsInstance(err.exception.__context__, NameError) 1193 self.assertIn('non_existing_flush', str(err.exception.__context__)) 1194 self.assertFalse(b.closed) 1195 1196 # Silence destructor error 1197 b.flush = lambda: None 1198 raw.close = lambda: None 1199 1200 def test_multi_close(self): 1201 raw = self.MockRawIO() 1202 b = self.tp(raw) 1203 b.close() 1204 b.close() 1205 b.close() 1206 self.assertRaises(ValueError, b.flush) 1207 1208 def test_unseekable(self): 1209 bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) 1210 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1211 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1212 1213 def test_readonly_attributes(self): 1214 raw = self.MockRawIO() 1215 buf = self.tp(raw) 1216 x = self.MockRawIO() 1217 with self.assertRaises(AttributeError): 1218 buf.raw = x 1219 1220 1221class SizeofTest: 1222 1223 @support.cpython_only 1224 def test_sizeof(self): 1225 bufsize1 = 4096 1226 bufsize2 = 8192 1227 rawio = self.MockRawIO() 1228 bufio = self.tp(rawio, buffer_size=bufsize1) 1229 size = sys.getsizeof(bufio) - bufsize1 1230 rawio = self.MockRawIO() 1231 bufio = self.tp(rawio, buffer_size=bufsize2) 1232 self.assertEqual(sys.getsizeof(bufio), size + bufsize2) 1233 1234 @support.cpython_only 1235 def test_buffer_freeing(self) : 1236 bufsize = 4096 1237 rawio = self.MockRawIO() 1238 bufio = self.tp(rawio, buffer_size=bufsize) 1239 size = sys.getsizeof(bufio) - bufsize 1240 bufio.close() 1241 self.assertEqual(sys.getsizeof(bufio), size) 1242 1243class BufferedReaderTest(unittest.TestCase, CommonBufferedTests): 1244 read_mode = "rb" 1245 1246 def test_constructor(self): 1247 rawio = self.MockRawIO([b"abc"]) 1248 bufio = self.tp(rawio) 1249 bufio.__init__(rawio) 1250 bufio.__init__(rawio, buffer_size=1024) 1251 bufio.__init__(rawio, buffer_size=16) 1252 self.assertEqual(b"abc", bufio.read()) 1253 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1254 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1255 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1256 rawio = self.MockRawIO([b"abc"]) 1257 bufio.__init__(rawio) 1258 self.assertEqual(b"abc", bufio.read()) 1259 1260 def test_uninitialized(self): 1261 bufio = self.tp.__new__(self.tp) 1262 del bufio 1263 bufio = self.tp.__new__(self.tp) 1264 self.assertRaisesRegex((ValueError, AttributeError), 1265 'uninitialized|has no attribute', 1266 bufio.read, 0) 1267 bufio.__init__(self.MockRawIO()) 1268 self.assertEqual(bufio.read(0), b'') 1269 1270 def test_read(self): 1271 for arg in (None, 7): 1272 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1273 bufio = self.tp(rawio) 1274 self.assertEqual(b"abcdefg", bufio.read(arg)) 1275 # Invalid args 1276 self.assertRaises(ValueError, bufio.read, -2) 1277 1278 def test_read1(self): 1279 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1280 bufio = self.tp(rawio) 1281 self.assertEqual(b"a", bufio.read(1)) 1282 self.assertEqual(b"b", bufio.read1(1)) 1283 self.assertEqual(rawio._reads, 1) 1284 self.assertEqual(b"", bufio.read1(0)) 1285 self.assertEqual(b"c", bufio.read1(100)) 1286 self.assertEqual(rawio._reads, 1) 1287 self.assertEqual(b"d", bufio.read1(100)) 1288 self.assertEqual(rawio._reads, 2) 1289 self.assertEqual(b"efg", bufio.read1(100)) 1290 self.assertEqual(rawio._reads, 3) 1291 self.assertEqual(b"", bufio.read1(100)) 1292 self.assertEqual(rawio._reads, 4) 1293 1294 def test_read1_arbitrary(self): 1295 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1296 bufio = self.tp(rawio) 1297 self.assertEqual(b"a", bufio.read(1)) 1298 self.assertEqual(b"bc", bufio.read1()) 1299 self.assertEqual(b"d", bufio.read1()) 1300 self.assertEqual(b"efg", bufio.read1(-1)) 1301 self.assertEqual(rawio._reads, 3) 1302 self.assertEqual(b"", bufio.read1()) 1303 self.assertEqual(rawio._reads, 4) 1304 1305 def test_readinto(self): 1306 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1307 bufio = self.tp(rawio) 1308 b = bytearray(2) 1309 self.assertEqual(bufio.readinto(b), 2) 1310 self.assertEqual(b, b"ab") 1311 self.assertEqual(bufio.readinto(b), 2) 1312 self.assertEqual(b, b"cd") 1313 self.assertEqual(bufio.readinto(b), 2) 1314 self.assertEqual(b, b"ef") 1315 self.assertEqual(bufio.readinto(b), 1) 1316 self.assertEqual(b, b"gf") 1317 self.assertEqual(bufio.readinto(b), 0) 1318 self.assertEqual(b, b"gf") 1319 rawio = self.MockRawIO((b"abc", None)) 1320 bufio = self.tp(rawio) 1321 self.assertEqual(bufio.readinto(b), 2) 1322 self.assertEqual(b, b"ab") 1323 self.assertEqual(bufio.readinto(b), 1) 1324 self.assertEqual(b, b"cb") 1325 1326 def test_readinto1(self): 1327 buffer_size = 10 1328 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl")) 1329 bufio = self.tp(rawio, buffer_size=buffer_size) 1330 b = bytearray(2) 1331 self.assertEqual(bufio.peek(3), b'abc') 1332 self.assertEqual(rawio._reads, 1) 1333 self.assertEqual(bufio.readinto1(b), 2) 1334 self.assertEqual(b, b"ab") 1335 self.assertEqual(rawio._reads, 1) 1336 self.assertEqual(bufio.readinto1(b), 1) 1337 self.assertEqual(b[:1], b"c") 1338 self.assertEqual(rawio._reads, 1) 1339 self.assertEqual(bufio.readinto1(b), 2) 1340 self.assertEqual(b, b"de") 1341 self.assertEqual(rawio._reads, 2) 1342 b = bytearray(2*buffer_size) 1343 self.assertEqual(bufio.peek(3), b'fgh') 1344 self.assertEqual(rawio._reads, 3) 1345 self.assertEqual(bufio.readinto1(b), 6) 1346 self.assertEqual(b[:6], b"fghjkl") 1347 self.assertEqual(rawio._reads, 4) 1348 1349 def test_readinto_array(self): 1350 buffer_size = 60 1351 data = b"a" * 26 1352 rawio = self.MockRawIO((data,)) 1353 bufio = self.tp(rawio, buffer_size=buffer_size) 1354 1355 # Create an array with element size > 1 byte 1356 b = array.array('i', b'x' * 32) 1357 assert len(b) != 16 1358 1359 # Read into it. We should get as many *bytes* as we can fit into b 1360 # (which is more than the number of elements) 1361 n = bufio.readinto(b) 1362 self.assertGreater(n, len(b)) 1363 1364 # Check that old contents of b are preserved 1365 bm = memoryview(b).cast('B') 1366 self.assertLess(n, len(bm)) 1367 self.assertEqual(bm[:n], data[:n]) 1368 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1369 1370 def test_readinto1_array(self): 1371 buffer_size = 60 1372 data = b"a" * 26 1373 rawio = self.MockRawIO((data,)) 1374 bufio = self.tp(rawio, buffer_size=buffer_size) 1375 1376 # Create an array with element size > 1 byte 1377 b = array.array('i', b'x' * 32) 1378 assert len(b) != 16 1379 1380 # Read into it. We should get as many *bytes* as we can fit into b 1381 # (which is more than the number of elements) 1382 n = bufio.readinto1(b) 1383 self.assertGreater(n, len(b)) 1384 1385 # Check that old contents of b are preserved 1386 bm = memoryview(b).cast('B') 1387 self.assertLess(n, len(bm)) 1388 self.assertEqual(bm[:n], data[:n]) 1389 self.assertEqual(bm[n:], b'x' * (len(bm[n:]))) 1390 1391 def test_readlines(self): 1392 def bufio(): 1393 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef")) 1394 return self.tp(rawio) 1395 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"]) 1396 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"]) 1397 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"]) 1398 1399 def test_buffering(self): 1400 data = b"abcdefghi" 1401 dlen = len(data) 1402 1403 tests = [ 1404 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ], 1405 [ 100, [ 3, 3, 3], [ dlen ] ], 1406 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ], 1407 ] 1408 1409 for bufsize, buf_read_sizes, raw_read_sizes in tests: 1410 rawio = self.MockFileIO(data) 1411 bufio = self.tp(rawio, buffer_size=bufsize) 1412 pos = 0 1413 for nbytes in buf_read_sizes: 1414 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes]) 1415 pos += nbytes 1416 # this is mildly implementation-dependent 1417 self.assertEqual(rawio.read_history, raw_read_sizes) 1418 1419 def test_read_non_blocking(self): 1420 # Inject some None's in there to simulate EWOULDBLOCK 1421 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None)) 1422 bufio = self.tp(rawio) 1423 self.assertEqual(b"abcd", bufio.read(6)) 1424 self.assertEqual(b"e", bufio.read(1)) 1425 self.assertEqual(b"fg", bufio.read()) 1426 self.assertEqual(b"", bufio.peek(1)) 1427 self.assertIsNone(bufio.read()) 1428 self.assertEqual(b"", bufio.read()) 1429 1430 rawio = self.MockRawIO((b"a", None, None)) 1431 self.assertEqual(b"a", rawio.readall()) 1432 self.assertIsNone(rawio.readall()) 1433 1434 def test_read_past_eof(self): 1435 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1436 bufio = self.tp(rawio) 1437 1438 self.assertEqual(b"abcdefg", bufio.read(9000)) 1439 1440 def test_read_all(self): 1441 rawio = self.MockRawIO((b"abc", b"d", b"efg")) 1442 bufio = self.tp(rawio) 1443 1444 self.assertEqual(b"abcdefg", bufio.read()) 1445 1446 @support.requires_resource('cpu') 1447 def test_threads(self): 1448 try: 1449 # Write out many bytes with exactly the same number of 0's, 1450 # 1's... 255's. This will help us check that concurrent reading 1451 # doesn't duplicate or forget contents. 1452 N = 1000 1453 l = list(range(256)) * N 1454 random.shuffle(l) 1455 s = bytes(bytearray(l)) 1456 with self.open(support.TESTFN, "wb") as f: 1457 f.write(s) 1458 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw: 1459 bufio = self.tp(raw, 8) 1460 errors = [] 1461 results = [] 1462 def f(): 1463 try: 1464 # Intra-buffer read then buffer-flushing read 1465 for n in cycle([1, 19]): 1466 s = bufio.read(n) 1467 if not s: 1468 break 1469 # list.append() is atomic 1470 results.append(s) 1471 except Exception as e: 1472 errors.append(e) 1473 raise 1474 threads = [threading.Thread(target=f) for x in range(20)] 1475 with support.start_threads(threads): 1476 time.sleep(0.02) # yield 1477 self.assertFalse(errors, 1478 "the following exceptions were caught: %r" % errors) 1479 s = b''.join(results) 1480 for i in range(256): 1481 c = bytes(bytearray([i])) 1482 self.assertEqual(s.count(c), N) 1483 finally: 1484 support.unlink(support.TESTFN) 1485 1486 def test_unseekable(self): 1487 bufio = self.tp(self.MockUnseekableIO(b"A" * 10)) 1488 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1489 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1490 bufio.read(1) 1491 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0) 1492 self.assertRaises(self.UnsupportedOperation, bufio.tell) 1493 1494 def test_misbehaved_io(self): 1495 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1496 bufio = self.tp(rawio) 1497 self.assertRaises(OSError, bufio.seek, 0) 1498 self.assertRaises(OSError, bufio.tell) 1499 1500 # Silence destructor error 1501 bufio.close = lambda: None 1502 1503 def test_no_extraneous_read(self): 1504 # Issue #9550; when the raw IO object has satisfied the read request, 1505 # we should not issue any additional reads, otherwise it may block 1506 # (e.g. socket). 1507 bufsize = 16 1508 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2): 1509 rawio = self.MockRawIO([b"x" * n]) 1510 bufio = self.tp(rawio, bufsize) 1511 self.assertEqual(bufio.read(n), b"x" * n) 1512 # Simple case: one raw read is enough to satisfy the request. 1513 self.assertEqual(rawio._extraneous_reads, 0, 1514 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1515 # A more complex case where two raw reads are needed to satisfy 1516 # the request. 1517 rawio = self.MockRawIO([b"x" * (n - 1), b"x"]) 1518 bufio = self.tp(rawio, bufsize) 1519 self.assertEqual(bufio.read(n), b"x" * n) 1520 self.assertEqual(rawio._extraneous_reads, 0, 1521 "failed for {}: {} != 0".format(n, rawio._extraneous_reads)) 1522 1523 def test_read_on_closed(self): 1524 # Issue #23796 1525 b = io.BufferedReader(io.BytesIO(b"12")) 1526 b.read(1) 1527 b.close() 1528 self.assertRaises(ValueError, b.peek) 1529 self.assertRaises(ValueError, b.read1, 1) 1530 1531 def test_truncate_on_read_only(self): 1532 rawio = self.MockFileIO(b"abc") 1533 bufio = self.tp(rawio) 1534 self.assertFalse(bufio.writable()) 1535 self.assertRaises(self.UnsupportedOperation, bufio.truncate) 1536 self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0) 1537 1538 1539class CBufferedReaderTest(BufferedReaderTest, SizeofTest): 1540 tp = io.BufferedReader 1541 1542 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing " 1543 "instead of returning NULL for malloc failure.") 1544 def test_constructor(self): 1545 BufferedReaderTest.test_constructor(self) 1546 # The allocation can succeed on 32-bit builds, e.g. with more 1547 # than 2 GiB RAM and a 64-bit kernel. 1548 if sys.maxsize > 0x7FFFFFFF: 1549 rawio = self.MockRawIO() 1550 bufio = self.tp(rawio) 1551 self.assertRaises((OverflowError, MemoryError, ValueError), 1552 bufio.__init__, rawio, sys.maxsize) 1553 1554 def test_initialization(self): 1555 rawio = self.MockRawIO([b"abc"]) 1556 bufio = self.tp(rawio) 1557 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1558 self.assertRaises(ValueError, bufio.read) 1559 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1560 self.assertRaises(ValueError, bufio.read) 1561 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1562 self.assertRaises(ValueError, bufio.read) 1563 1564 def test_misbehaved_io_read(self): 1565 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg")) 1566 bufio = self.tp(rawio) 1567 # _pyio.BufferedReader seems to implement reading different, so that 1568 # checking this is not so easy. 1569 self.assertRaises(OSError, bufio.read, 10) 1570 1571 def test_garbage_collection(self): 1572 # C BufferedReader objects are collected. 1573 # The Python version has __del__, so it ends into gc.garbage instead 1574 self.addCleanup(support.unlink, support.TESTFN) 1575 with support.check_warnings(('', ResourceWarning)): 1576 rawio = self.FileIO(support.TESTFN, "w+b") 1577 f = self.tp(rawio) 1578 f.f = f 1579 wr = weakref.ref(f) 1580 del f 1581 support.gc_collect() 1582 self.assertIsNone(wr(), wr) 1583 1584 def test_args_error(self): 1585 # Issue #17275 1586 with self.assertRaisesRegex(TypeError, "BufferedReader"): 1587 self.tp(io.BytesIO(), 1024, 1024, 1024) 1588 1589 1590class PyBufferedReaderTest(BufferedReaderTest): 1591 tp = pyio.BufferedReader 1592 1593 1594class BufferedWriterTest(unittest.TestCase, CommonBufferedTests): 1595 write_mode = "wb" 1596 1597 def test_constructor(self): 1598 rawio = self.MockRawIO() 1599 bufio = self.tp(rawio) 1600 bufio.__init__(rawio) 1601 bufio.__init__(rawio, buffer_size=1024) 1602 bufio.__init__(rawio, buffer_size=16) 1603 self.assertEqual(3, bufio.write(b"abc")) 1604 bufio.flush() 1605 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1606 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1607 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1608 bufio.__init__(rawio) 1609 self.assertEqual(3, bufio.write(b"ghi")) 1610 bufio.flush() 1611 self.assertEqual(b"".join(rawio._write_stack), b"abcghi") 1612 1613 def test_uninitialized(self): 1614 bufio = self.tp.__new__(self.tp) 1615 del bufio 1616 bufio = self.tp.__new__(self.tp) 1617 self.assertRaisesRegex((ValueError, AttributeError), 1618 'uninitialized|has no attribute', 1619 bufio.write, b'') 1620 bufio.__init__(self.MockRawIO()) 1621 self.assertEqual(bufio.write(b''), 0) 1622 1623 def test_detach_flush(self): 1624 raw = self.MockRawIO() 1625 buf = self.tp(raw) 1626 buf.write(b"howdy!") 1627 self.assertFalse(raw._write_stack) 1628 buf.detach() 1629 self.assertEqual(raw._write_stack, [b"howdy!"]) 1630 1631 def test_write(self): 1632 # Write to the buffered IO but don't overflow the buffer. 1633 writer = self.MockRawIO() 1634 bufio = self.tp(writer, 8) 1635 bufio.write(b"abc") 1636 self.assertFalse(writer._write_stack) 1637 buffer = bytearray(b"def") 1638 bufio.write(buffer) 1639 buffer[:] = b"***" # Overwrite our copy of the data 1640 bufio.flush() 1641 self.assertEqual(b"".join(writer._write_stack), b"abcdef") 1642 1643 def test_write_overflow(self): 1644 writer = self.MockRawIO() 1645 bufio = self.tp(writer, 8) 1646 contents = b"abcdefghijklmnop" 1647 for n in range(0, len(contents), 3): 1648 bufio.write(contents[n:n+3]) 1649 flushed = b"".join(writer._write_stack) 1650 # At least (total - 8) bytes were implicitly flushed, perhaps more 1651 # depending on the implementation. 1652 self.assertTrue(flushed.startswith(contents[:-8]), flushed) 1653 1654 def check_writes(self, intermediate_func): 1655 # Lots of writes, test the flushed output is as expected. 1656 contents = bytes(range(256)) * 1000 1657 n = 0 1658 writer = self.MockRawIO() 1659 bufio = self.tp(writer, 13) 1660 # Generator of write sizes: repeat each N 15 times then proceed to N+1 1661 def gen_sizes(): 1662 for size in count(1): 1663 for i in range(15): 1664 yield size 1665 sizes = gen_sizes() 1666 while n < len(contents): 1667 size = min(next(sizes), len(contents) - n) 1668 self.assertEqual(bufio.write(contents[n:n+size]), size) 1669 intermediate_func(bufio) 1670 n += size 1671 bufio.flush() 1672 self.assertEqual(contents, b"".join(writer._write_stack)) 1673 1674 def test_writes(self): 1675 self.check_writes(lambda bufio: None) 1676 1677 def test_writes_and_flushes(self): 1678 self.check_writes(lambda bufio: bufio.flush()) 1679 1680 def test_writes_and_seeks(self): 1681 def _seekabs(bufio): 1682 pos = bufio.tell() 1683 bufio.seek(pos + 1, 0) 1684 bufio.seek(pos - 1, 0) 1685 bufio.seek(pos, 0) 1686 self.check_writes(_seekabs) 1687 def _seekrel(bufio): 1688 pos = bufio.seek(0, 1) 1689 bufio.seek(+1, 1) 1690 bufio.seek(-1, 1) 1691 bufio.seek(pos, 0) 1692 self.check_writes(_seekrel) 1693 1694 def test_writes_and_truncates(self): 1695 self.check_writes(lambda bufio: bufio.truncate(bufio.tell())) 1696 1697 def test_write_non_blocking(self): 1698 raw = self.MockNonBlockWriterIO() 1699 bufio = self.tp(raw, 8) 1700 1701 self.assertEqual(bufio.write(b"abcd"), 4) 1702 self.assertEqual(bufio.write(b"efghi"), 5) 1703 # 1 byte will be written, the rest will be buffered 1704 raw.block_on(b"k") 1705 self.assertEqual(bufio.write(b"jklmn"), 5) 1706 1707 # 8 bytes will be written, 8 will be buffered and the rest will be lost 1708 raw.block_on(b"0") 1709 try: 1710 bufio.write(b"opqrwxyz0123456789") 1711 except self.BlockingIOError as e: 1712 written = e.characters_written 1713 else: 1714 self.fail("BlockingIOError should have been raised") 1715 self.assertEqual(written, 16) 1716 self.assertEqual(raw.pop_written(), 1717 b"abcdefghijklmnopqrwxyz") 1718 1719 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9) 1720 s = raw.pop_written() 1721 # Previously buffered bytes were flushed 1722 self.assertTrue(s.startswith(b"01234567A"), s) 1723 1724 def test_write_and_rewind(self): 1725 raw = io.BytesIO() 1726 bufio = self.tp(raw, 4) 1727 self.assertEqual(bufio.write(b"abcdef"), 6) 1728 self.assertEqual(bufio.tell(), 6) 1729 bufio.seek(0, 0) 1730 self.assertEqual(bufio.write(b"XY"), 2) 1731 bufio.seek(6, 0) 1732 self.assertEqual(raw.getvalue(), b"XYcdef") 1733 self.assertEqual(bufio.write(b"123456"), 6) 1734 bufio.flush() 1735 self.assertEqual(raw.getvalue(), b"XYcdef123456") 1736 1737 def test_flush(self): 1738 writer = self.MockRawIO() 1739 bufio = self.tp(writer, 8) 1740 bufio.write(b"abc") 1741 bufio.flush() 1742 self.assertEqual(b"abc", writer._write_stack[0]) 1743 1744 def test_writelines(self): 1745 l = [b'ab', b'cd', b'ef'] 1746 writer = self.MockRawIO() 1747 bufio = self.tp(writer, 8) 1748 bufio.writelines(l) 1749 bufio.flush() 1750 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1751 1752 def test_writelines_userlist(self): 1753 l = UserList([b'ab', b'cd', b'ef']) 1754 writer = self.MockRawIO() 1755 bufio = self.tp(writer, 8) 1756 bufio.writelines(l) 1757 bufio.flush() 1758 self.assertEqual(b''.join(writer._write_stack), b'abcdef') 1759 1760 def test_writelines_error(self): 1761 writer = self.MockRawIO() 1762 bufio = self.tp(writer, 8) 1763 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3]) 1764 self.assertRaises(TypeError, bufio.writelines, None) 1765 self.assertRaises(TypeError, bufio.writelines, 'abc') 1766 1767 def test_destructor(self): 1768 writer = self.MockRawIO() 1769 bufio = self.tp(writer, 8) 1770 bufio.write(b"abc") 1771 del bufio 1772 support.gc_collect() 1773 self.assertEqual(b"abc", writer._write_stack[0]) 1774 1775 def test_truncate(self): 1776 # Truncate implicitly flushes the buffer. 1777 self.addCleanup(support.unlink, support.TESTFN) 1778 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1779 bufio = self.tp(raw, 8) 1780 bufio.write(b"abcdef") 1781 self.assertEqual(bufio.truncate(3), 3) 1782 self.assertEqual(bufio.tell(), 6) 1783 with self.open(support.TESTFN, "rb", buffering=0) as f: 1784 self.assertEqual(f.read(), b"abc") 1785 1786 def test_truncate_after_write(self): 1787 # Ensure that truncate preserves the file position after 1788 # writes longer than the buffer size. 1789 # Issue: https://bugs.python.org/issue32228 1790 self.addCleanup(support.unlink, support.TESTFN) 1791 with self.open(support.TESTFN, "wb") as f: 1792 # Fill with some buffer 1793 f.write(b'\x00' * 10000) 1794 buffer_sizes = [8192, 4096, 200] 1795 for buffer_size in buffer_sizes: 1796 with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f: 1797 f.write(b'\x00' * (buffer_size + 1)) 1798 # After write write_pos and write_end are set to 0 1799 f.read(1) 1800 # read operation makes sure that pos != raw_pos 1801 f.truncate() 1802 self.assertEqual(f.tell(), buffer_size + 2) 1803 1804 @support.requires_resource('cpu') 1805 def test_threads(self): 1806 try: 1807 # Write out many bytes from many threads and test they were 1808 # all flushed. 1809 N = 1000 1810 contents = bytes(range(256)) * N 1811 sizes = cycle([1, 19]) 1812 n = 0 1813 queue = deque() 1814 while n < len(contents): 1815 size = next(sizes) 1816 queue.append(contents[n:n+size]) 1817 n += size 1818 del contents 1819 # We use a real file object because it allows us to 1820 # exercise situations where the GIL is released before 1821 # writing the buffer to the raw streams. This is in addition 1822 # to concurrency issues due to switching threads in the middle 1823 # of Python code. 1824 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw: 1825 bufio = self.tp(raw, 8) 1826 errors = [] 1827 def f(): 1828 try: 1829 while True: 1830 try: 1831 s = queue.popleft() 1832 except IndexError: 1833 return 1834 bufio.write(s) 1835 except Exception as e: 1836 errors.append(e) 1837 raise 1838 threads = [threading.Thread(target=f) for x in range(20)] 1839 with support.start_threads(threads): 1840 time.sleep(0.02) # yield 1841 self.assertFalse(errors, 1842 "the following exceptions were caught: %r" % errors) 1843 bufio.close() 1844 with self.open(support.TESTFN, "rb") as f: 1845 s = f.read() 1846 for i in range(256): 1847 self.assertEqual(s.count(bytes([i])), N) 1848 finally: 1849 support.unlink(support.TESTFN) 1850 1851 def test_misbehaved_io(self): 1852 rawio = self.MisbehavedRawIO() 1853 bufio = self.tp(rawio, 5) 1854 self.assertRaises(OSError, bufio.seek, 0) 1855 self.assertRaises(OSError, bufio.tell) 1856 self.assertRaises(OSError, bufio.write, b"abcdef") 1857 1858 # Silence destructor error 1859 bufio.close = lambda: None 1860 1861 def test_max_buffer_size_removal(self): 1862 with self.assertRaises(TypeError): 1863 self.tp(self.MockRawIO(), 8, 12) 1864 1865 def test_write_error_on_close(self): 1866 raw = self.MockRawIO() 1867 def bad_write(b): 1868 raise OSError() 1869 raw.write = bad_write 1870 b = self.tp(raw) 1871 b.write(b'spam') 1872 self.assertRaises(OSError, b.close) # exception not swallowed 1873 self.assertTrue(b.closed) 1874 1875 def test_slow_close_from_thread(self): 1876 # Issue #31976 1877 rawio = self.SlowFlushRawIO() 1878 bufio = self.tp(rawio, 8) 1879 t = threading.Thread(target=bufio.close) 1880 t.start() 1881 rawio.in_flush.wait() 1882 self.assertRaises(ValueError, bufio.write, b'spam') 1883 self.assertTrue(bufio.closed) 1884 t.join() 1885 1886 1887 1888class CBufferedWriterTest(BufferedWriterTest, SizeofTest): 1889 tp = io.BufferedWriter 1890 1891 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing " 1892 "instead of returning NULL for malloc failure.") 1893 def test_constructor(self): 1894 BufferedWriterTest.test_constructor(self) 1895 # The allocation can succeed on 32-bit builds, e.g. with more 1896 # than 2 GiB RAM and a 64-bit kernel. 1897 if sys.maxsize > 0x7FFFFFFF: 1898 rawio = self.MockRawIO() 1899 bufio = self.tp(rawio) 1900 self.assertRaises((OverflowError, MemoryError, ValueError), 1901 bufio.__init__, rawio, sys.maxsize) 1902 1903 def test_initialization(self): 1904 rawio = self.MockRawIO() 1905 bufio = self.tp(rawio) 1906 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0) 1907 self.assertRaises(ValueError, bufio.write, b"def") 1908 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16) 1909 self.assertRaises(ValueError, bufio.write, b"def") 1910 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1) 1911 self.assertRaises(ValueError, bufio.write, b"def") 1912 1913 def test_garbage_collection(self): 1914 # C BufferedWriter objects are collected, and collecting them flushes 1915 # all data to disk. 1916 # The Python version has __del__, so it ends into gc.garbage instead 1917 self.addCleanup(support.unlink, support.TESTFN) 1918 with support.check_warnings(('', ResourceWarning)): 1919 rawio = self.FileIO(support.TESTFN, "w+b") 1920 f = self.tp(rawio) 1921 f.write(b"123xxx") 1922 f.x = f 1923 wr = weakref.ref(f) 1924 del f 1925 support.gc_collect() 1926 self.assertIsNone(wr(), wr) 1927 with self.open(support.TESTFN, "rb") as f: 1928 self.assertEqual(f.read(), b"123xxx") 1929 1930 def test_args_error(self): 1931 # Issue #17275 1932 with self.assertRaisesRegex(TypeError, "BufferedWriter"): 1933 self.tp(io.BytesIO(), 1024, 1024, 1024) 1934 1935 1936class PyBufferedWriterTest(BufferedWriterTest): 1937 tp = pyio.BufferedWriter 1938 1939class BufferedRWPairTest(unittest.TestCase): 1940 1941 def test_constructor(self): 1942 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1943 self.assertFalse(pair.closed) 1944 1945 def test_uninitialized(self): 1946 pair = self.tp.__new__(self.tp) 1947 del pair 1948 pair = self.tp.__new__(self.tp) 1949 self.assertRaisesRegex((ValueError, AttributeError), 1950 'uninitialized|has no attribute', 1951 pair.read, 0) 1952 self.assertRaisesRegex((ValueError, AttributeError), 1953 'uninitialized|has no attribute', 1954 pair.write, b'') 1955 pair.__init__(self.MockRawIO(), self.MockRawIO()) 1956 self.assertEqual(pair.read(0), b'') 1957 self.assertEqual(pair.write(b''), 0) 1958 1959 def test_detach(self): 1960 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 1961 self.assertRaises(self.UnsupportedOperation, pair.detach) 1962 1963 def test_constructor_max_buffer_size_removal(self): 1964 with self.assertRaises(TypeError): 1965 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12) 1966 1967 def test_constructor_with_not_readable(self): 1968 class NotReadable(MockRawIO): 1969 def readable(self): 1970 return False 1971 1972 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO()) 1973 1974 def test_constructor_with_not_writeable(self): 1975 class NotWriteable(MockRawIO): 1976 def writable(self): 1977 return False 1978 1979 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable()) 1980 1981 def test_read(self): 1982 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 1983 1984 self.assertEqual(pair.read(3), b"abc") 1985 self.assertEqual(pair.read(1), b"d") 1986 self.assertEqual(pair.read(), b"ef") 1987 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO()) 1988 self.assertEqual(pair.read(None), b"abc") 1989 1990 def test_readlines(self): 1991 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO()) 1992 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1993 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"]) 1994 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"]) 1995 1996 def test_read1(self): 1997 # .read1() is delegated to the underlying reader object, so this test 1998 # can be shallow. 1999 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2000 2001 self.assertEqual(pair.read1(3), b"abc") 2002 self.assertEqual(pair.read1(), b"def") 2003 2004 def test_readinto(self): 2005 for method in ("readinto", "readinto1"): 2006 with self.subTest(method): 2007 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2008 2009 data = byteslike(b'\0' * 5) 2010 self.assertEqual(getattr(pair, method)(data), 5) 2011 self.assertEqual(bytes(data), b"abcde") 2012 2013 def test_write(self): 2014 w = self.MockRawIO() 2015 pair = self.tp(self.MockRawIO(), w) 2016 2017 pair.write(b"abc") 2018 pair.flush() 2019 buffer = bytearray(b"def") 2020 pair.write(buffer) 2021 buffer[:] = b"***" # Overwrite our copy of the data 2022 pair.flush() 2023 self.assertEqual(w._write_stack, [b"abc", b"def"]) 2024 2025 def test_peek(self): 2026 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO()) 2027 2028 self.assertTrue(pair.peek(3).startswith(b"abc")) 2029 self.assertEqual(pair.read(3), b"abc") 2030 2031 def test_readable(self): 2032 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2033 self.assertTrue(pair.readable()) 2034 2035 def test_writeable(self): 2036 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2037 self.assertTrue(pair.writable()) 2038 2039 def test_seekable(self): 2040 # BufferedRWPairs are never seekable, even if their readers and writers 2041 # are. 2042 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2043 self.assertFalse(pair.seekable()) 2044 2045 # .flush() is delegated to the underlying writer object and has been 2046 # tested in the test_write method. 2047 2048 def test_close_and_closed(self): 2049 pair = self.tp(self.MockRawIO(), self.MockRawIO()) 2050 self.assertFalse(pair.closed) 2051 pair.close() 2052 self.assertTrue(pair.closed) 2053 2054 def test_reader_close_error_on_close(self): 2055 def reader_close(): 2056 reader_non_existing 2057 reader = self.MockRawIO() 2058 reader.close = reader_close 2059 writer = self.MockRawIO() 2060 pair = self.tp(reader, writer) 2061 with self.assertRaises(NameError) as err: 2062 pair.close() 2063 self.assertIn('reader_non_existing', str(err.exception)) 2064 self.assertTrue(pair.closed) 2065 self.assertFalse(reader.closed) 2066 self.assertTrue(writer.closed) 2067 2068 # Silence destructor error 2069 reader.close = lambda: None 2070 2071 def test_writer_close_error_on_close(self): 2072 def writer_close(): 2073 writer_non_existing 2074 reader = self.MockRawIO() 2075 writer = self.MockRawIO() 2076 writer.close = writer_close 2077 pair = self.tp(reader, writer) 2078 with self.assertRaises(NameError) as err: 2079 pair.close() 2080 self.assertIn('writer_non_existing', str(err.exception)) 2081 self.assertFalse(pair.closed) 2082 self.assertTrue(reader.closed) 2083 self.assertFalse(writer.closed) 2084 2085 # Silence destructor error 2086 writer.close = lambda: None 2087 writer = None 2088 2089 # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception 2090 with support.catch_unraisable_exception(): 2091 # Ignore BufferedRWPair unraisable exception 2092 with support.catch_unraisable_exception(): 2093 pair = None 2094 support.gc_collect() 2095 support.gc_collect() 2096 2097 def test_reader_writer_close_error_on_close(self): 2098 def reader_close(): 2099 reader_non_existing 2100 def writer_close(): 2101 writer_non_existing 2102 reader = self.MockRawIO() 2103 reader.close = reader_close 2104 writer = self.MockRawIO() 2105 writer.close = writer_close 2106 pair = self.tp(reader, writer) 2107 with self.assertRaises(NameError) as err: 2108 pair.close() 2109 self.assertIn('reader_non_existing', str(err.exception)) 2110 self.assertIsInstance(err.exception.__context__, NameError) 2111 self.assertIn('writer_non_existing', str(err.exception.__context__)) 2112 self.assertFalse(pair.closed) 2113 self.assertFalse(reader.closed) 2114 self.assertFalse(writer.closed) 2115 2116 # Silence destructor error 2117 reader.close = lambda: None 2118 writer.close = lambda: None 2119 2120 def test_isatty(self): 2121 class SelectableIsAtty(MockRawIO): 2122 def __init__(self, isatty): 2123 MockRawIO.__init__(self) 2124 self._isatty = isatty 2125 2126 def isatty(self): 2127 return self._isatty 2128 2129 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False)) 2130 self.assertFalse(pair.isatty()) 2131 2132 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False)) 2133 self.assertTrue(pair.isatty()) 2134 2135 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True)) 2136 self.assertTrue(pair.isatty()) 2137 2138 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True)) 2139 self.assertTrue(pair.isatty()) 2140 2141 def test_weakref_clearing(self): 2142 brw = self.tp(self.MockRawIO(), self.MockRawIO()) 2143 ref = weakref.ref(brw) 2144 brw = None 2145 ref = None # Shouldn't segfault. 2146 2147class CBufferedRWPairTest(BufferedRWPairTest): 2148 tp = io.BufferedRWPair 2149 2150class PyBufferedRWPairTest(BufferedRWPairTest): 2151 tp = pyio.BufferedRWPair 2152 2153 2154class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest): 2155 read_mode = "rb+" 2156 write_mode = "wb+" 2157 2158 def test_constructor(self): 2159 BufferedReaderTest.test_constructor(self) 2160 BufferedWriterTest.test_constructor(self) 2161 2162 def test_uninitialized(self): 2163 BufferedReaderTest.test_uninitialized(self) 2164 BufferedWriterTest.test_uninitialized(self) 2165 2166 def test_read_and_write(self): 2167 raw = self.MockRawIO((b"asdf", b"ghjk")) 2168 rw = self.tp(raw, 8) 2169 2170 self.assertEqual(b"as", rw.read(2)) 2171 rw.write(b"ddd") 2172 rw.write(b"eee") 2173 self.assertFalse(raw._write_stack) # Buffer writes 2174 self.assertEqual(b"ghjk", rw.read()) 2175 self.assertEqual(b"dddeee", raw._write_stack[0]) 2176 2177 def test_seek_and_tell(self): 2178 raw = self.BytesIO(b"asdfghjkl") 2179 rw = self.tp(raw) 2180 2181 self.assertEqual(b"as", rw.read(2)) 2182 self.assertEqual(2, rw.tell()) 2183 rw.seek(0, 0) 2184 self.assertEqual(b"asdf", rw.read(4)) 2185 2186 rw.write(b"123f") 2187 rw.seek(0, 0) 2188 self.assertEqual(b"asdf123fl", rw.read()) 2189 self.assertEqual(9, rw.tell()) 2190 rw.seek(-4, 2) 2191 self.assertEqual(5, rw.tell()) 2192 rw.seek(2, 1) 2193 self.assertEqual(7, rw.tell()) 2194 self.assertEqual(b"fl", rw.read(11)) 2195 rw.flush() 2196 self.assertEqual(b"asdf123fl", raw.getvalue()) 2197 2198 self.assertRaises(TypeError, rw.seek, 0.0) 2199 2200 def check_flush_and_read(self, read_func): 2201 raw = self.BytesIO(b"abcdefghi") 2202 bufio = self.tp(raw) 2203 2204 self.assertEqual(b"ab", read_func(bufio, 2)) 2205 bufio.write(b"12") 2206 self.assertEqual(b"ef", read_func(bufio, 2)) 2207 self.assertEqual(6, bufio.tell()) 2208 bufio.flush() 2209 self.assertEqual(6, bufio.tell()) 2210 self.assertEqual(b"ghi", read_func(bufio)) 2211 raw.seek(0, 0) 2212 raw.write(b"XYZ") 2213 # flush() resets the read buffer 2214 bufio.flush() 2215 bufio.seek(0, 0) 2216 self.assertEqual(b"XYZ", read_func(bufio, 3)) 2217 2218 def test_flush_and_read(self): 2219 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args)) 2220 2221 def test_flush_and_readinto(self): 2222 def _readinto(bufio, n=-1): 2223 b = bytearray(n if n >= 0 else 9999) 2224 n = bufio.readinto(b) 2225 return bytes(b[:n]) 2226 self.check_flush_and_read(_readinto) 2227 2228 def test_flush_and_peek(self): 2229 def _peek(bufio, n=-1): 2230 # This relies on the fact that the buffer can contain the whole 2231 # raw stream, otherwise peek() can return less. 2232 b = bufio.peek(n) 2233 if n != -1: 2234 b = b[:n] 2235 bufio.seek(len(b), 1) 2236 return b 2237 self.check_flush_and_read(_peek) 2238 2239 def test_flush_and_write(self): 2240 raw = self.BytesIO(b"abcdefghi") 2241 bufio = self.tp(raw) 2242 2243 bufio.write(b"123") 2244 bufio.flush() 2245 bufio.write(b"45") 2246 bufio.flush() 2247 bufio.seek(0, 0) 2248 self.assertEqual(b"12345fghi", raw.getvalue()) 2249 self.assertEqual(b"12345fghi", bufio.read()) 2250 2251 def test_threads(self): 2252 BufferedReaderTest.test_threads(self) 2253 BufferedWriterTest.test_threads(self) 2254 2255 def test_writes_and_peek(self): 2256 def _peek(bufio): 2257 bufio.peek(1) 2258 self.check_writes(_peek) 2259 def _peek(bufio): 2260 pos = bufio.tell() 2261 bufio.seek(-1, 1) 2262 bufio.peek(1) 2263 bufio.seek(pos, 0) 2264 self.check_writes(_peek) 2265 2266 def test_writes_and_reads(self): 2267 def _read(bufio): 2268 bufio.seek(-1, 1) 2269 bufio.read(1) 2270 self.check_writes(_read) 2271 2272 def test_writes_and_read1s(self): 2273 def _read1(bufio): 2274 bufio.seek(-1, 1) 2275 bufio.read1(1) 2276 self.check_writes(_read1) 2277 2278 def test_writes_and_readintos(self): 2279 def _read(bufio): 2280 bufio.seek(-1, 1) 2281 bufio.readinto(bytearray(1)) 2282 self.check_writes(_read) 2283 2284 def test_write_after_readahead(self): 2285 # Issue #6629: writing after the buffer was filled by readahead should 2286 # first rewind the raw stream. 2287 for overwrite_size in [1, 5]: 2288 raw = self.BytesIO(b"A" * 10) 2289 bufio = self.tp(raw, 4) 2290 # Trigger readahead 2291 self.assertEqual(bufio.read(1), b"A") 2292 self.assertEqual(bufio.tell(), 1) 2293 # Overwriting should rewind the raw stream if it needs so 2294 bufio.write(b"B" * overwrite_size) 2295 self.assertEqual(bufio.tell(), overwrite_size + 1) 2296 # If the write size was smaller than the buffer size, flush() and 2297 # check that rewind happens. 2298 bufio.flush() 2299 self.assertEqual(bufio.tell(), overwrite_size + 1) 2300 s = raw.getvalue() 2301 self.assertEqual(s, 2302 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)) 2303 2304 def test_write_rewind_write(self): 2305 # Various combinations of reading / writing / seeking backwards / writing again 2306 def mutate(bufio, pos1, pos2): 2307 assert pos2 >= pos1 2308 # Fill the buffer 2309 bufio.seek(pos1) 2310 bufio.read(pos2 - pos1) 2311 bufio.write(b'\x02') 2312 # This writes earlier than the previous write, but still inside 2313 # the buffer. 2314 bufio.seek(pos1) 2315 bufio.write(b'\x01') 2316 2317 b = b"\x80\x81\x82\x83\x84" 2318 for i in range(0, len(b)): 2319 for j in range(i, len(b)): 2320 raw = self.BytesIO(b) 2321 bufio = self.tp(raw, 100) 2322 mutate(bufio, i, j) 2323 bufio.flush() 2324 expected = bytearray(b) 2325 expected[j] = 2 2326 expected[i] = 1 2327 self.assertEqual(raw.getvalue(), expected, 2328 "failed result for i=%d, j=%d" % (i, j)) 2329 2330 def test_truncate_after_read_or_write(self): 2331 raw = self.BytesIO(b"A" * 10) 2332 bufio = self.tp(raw, 100) 2333 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled 2334 self.assertEqual(bufio.truncate(), 2) 2335 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases 2336 self.assertEqual(bufio.truncate(), 4) 2337 2338 def test_misbehaved_io(self): 2339 BufferedReaderTest.test_misbehaved_io(self) 2340 BufferedWriterTest.test_misbehaved_io(self) 2341 2342 def test_interleaved_read_write(self): 2343 # Test for issue #12213 2344 with self.BytesIO(b'abcdefgh') as raw: 2345 with self.tp(raw, 100) as f: 2346 f.write(b"1") 2347 self.assertEqual(f.read(1), b'b') 2348 f.write(b'2') 2349 self.assertEqual(f.read1(1), b'd') 2350 f.write(b'3') 2351 buf = bytearray(1) 2352 f.readinto(buf) 2353 self.assertEqual(buf, b'f') 2354 f.write(b'4') 2355 self.assertEqual(f.peek(1), b'h') 2356 f.flush() 2357 self.assertEqual(raw.getvalue(), b'1b2d3f4h') 2358 2359 with self.BytesIO(b'abc') as raw: 2360 with self.tp(raw, 100) as f: 2361 self.assertEqual(f.read(1), b'a') 2362 f.write(b"2") 2363 self.assertEqual(f.read(1), b'c') 2364 f.flush() 2365 self.assertEqual(raw.getvalue(), b'a2c') 2366 2367 def test_interleaved_readline_write(self): 2368 with self.BytesIO(b'ab\ncdef\ng\n') as raw: 2369 with self.tp(raw) as f: 2370 f.write(b'1') 2371 self.assertEqual(f.readline(), b'b\n') 2372 f.write(b'2') 2373 self.assertEqual(f.readline(), b'def\n') 2374 f.write(b'3') 2375 self.assertEqual(f.readline(), b'\n') 2376 f.flush() 2377 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n') 2378 2379 # You can't construct a BufferedRandom over a non-seekable stream. 2380 test_unseekable = None 2381 2382 # writable() returns True, so there's no point to test it over 2383 # a writable stream. 2384 test_truncate_on_read_only = None 2385 2386 2387class CBufferedRandomTest(BufferedRandomTest, SizeofTest): 2388 tp = io.BufferedRandom 2389 2390 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing " 2391 "instead of returning NULL for malloc failure.") 2392 def test_constructor(self): 2393 BufferedRandomTest.test_constructor(self) 2394 # The allocation can succeed on 32-bit builds, e.g. with more 2395 # than 2 GiB RAM and a 64-bit kernel. 2396 if sys.maxsize > 0x7FFFFFFF: 2397 rawio = self.MockRawIO() 2398 bufio = self.tp(rawio) 2399 self.assertRaises((OverflowError, MemoryError, ValueError), 2400 bufio.__init__, rawio, sys.maxsize) 2401 2402 def test_garbage_collection(self): 2403 CBufferedReaderTest.test_garbage_collection(self) 2404 CBufferedWriterTest.test_garbage_collection(self) 2405 2406 def test_args_error(self): 2407 # Issue #17275 2408 with self.assertRaisesRegex(TypeError, "BufferedRandom"): 2409 self.tp(io.BytesIO(), 1024, 1024, 1024) 2410 2411 2412class PyBufferedRandomTest(BufferedRandomTest): 2413 tp = pyio.BufferedRandom 2414 2415 2416# To fully exercise seek/tell, the StatefulIncrementalDecoder has these 2417# properties: 2418# - A single output character can correspond to many bytes of input. 2419# - The number of input bytes to complete the character can be 2420# undetermined until the last input byte is received. 2421# - The number of input bytes can vary depending on previous input. 2422# - A single input byte can correspond to many characters of output. 2423# - The number of output characters can be undetermined until the 2424# last input byte is received. 2425# - The number of output characters can vary depending on previous input. 2426 2427class StatefulIncrementalDecoder(codecs.IncrementalDecoder): 2428 """ 2429 For testing seek/tell behavior with a stateful, buffering decoder. 2430 2431 Input is a sequence of words. Words may be fixed-length (length set 2432 by input) or variable-length (period-terminated). In variable-length 2433 mode, extra periods are ignored. Possible words are: 2434 - 'i' followed by a number sets the input length, I (maximum 99). 2435 When I is set to 0, words are space-terminated. 2436 - 'o' followed by a number sets the output length, O (maximum 99). 2437 - Any other word is converted into a word followed by a period on 2438 the output. The output word consists of the input word truncated 2439 or padded out with hyphens to make its length equal to O. If O 2440 is 0, the word is output verbatim without truncating or padding. 2441 I and O are initially set to 1. When I changes, any buffered input is 2442 re-scanned according to the new I. EOF also terminates the last word. 2443 """ 2444 2445 def __init__(self, errors='strict'): 2446 codecs.IncrementalDecoder.__init__(self, errors) 2447 self.reset() 2448 2449 def __repr__(self): 2450 return '<SID %x>' % id(self) 2451 2452 def reset(self): 2453 self.i = 1 2454 self.o = 1 2455 self.buffer = bytearray() 2456 2457 def getstate(self): 2458 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() 2459 return bytes(self.buffer), i*100 + o 2460 2461 def setstate(self, state): 2462 buffer, io = state 2463 self.buffer = bytearray(buffer) 2464 i, o = divmod(io, 100) 2465 self.i, self.o = i ^ 1, o ^ 1 2466 2467 def decode(self, input, final=False): 2468 output = '' 2469 for b in input: 2470 if self.i == 0: # variable-length, terminated with period 2471 if b == ord('.'): 2472 if self.buffer: 2473 output += self.process_word() 2474 else: 2475 self.buffer.append(b) 2476 else: # fixed-length, terminate after self.i bytes 2477 self.buffer.append(b) 2478 if len(self.buffer) == self.i: 2479 output += self.process_word() 2480 if final and self.buffer: # EOF terminates the last word 2481 output += self.process_word() 2482 return output 2483 2484 def process_word(self): 2485 output = '' 2486 if self.buffer[0] == ord('i'): 2487 self.i = min(99, int(self.buffer[1:] or 0)) # set input length 2488 elif self.buffer[0] == ord('o'): 2489 self.o = min(99, int(self.buffer[1:] or 0)) # set output length 2490 else: 2491 output = self.buffer.decode('ascii') 2492 if len(output) < self.o: 2493 output += '-'*self.o # pad out with hyphens 2494 if self.o: 2495 output = output[:self.o] # truncate to output length 2496 output += '.' 2497 self.buffer = bytearray() 2498 return output 2499 2500 codecEnabled = False 2501 2502 @classmethod 2503 def lookupTestDecoder(cls, name): 2504 if cls.codecEnabled and name == 'test_decoder': 2505 latin1 = codecs.lookup('latin-1') 2506 return codecs.CodecInfo( 2507 name='test_decoder', encode=latin1.encode, decode=None, 2508 incrementalencoder=None, 2509 streamreader=None, streamwriter=None, 2510 incrementaldecoder=cls) 2511 2512# Register the previous decoder for testing. 2513# Disabled by default, tests will enable it. 2514codecs.register(StatefulIncrementalDecoder.lookupTestDecoder) 2515 2516 2517class StatefulIncrementalDecoderTest(unittest.TestCase): 2518 """ 2519 Make sure the StatefulIncrementalDecoder actually works. 2520 """ 2521 2522 test_cases = [ 2523 # I=1, O=1 (fixed-length input == fixed-length output) 2524 (b'abcd', False, 'a.b.c.d.'), 2525 # I=0, O=0 (variable-length input, variable-length output) 2526 (b'oiabcd', True, 'abcd.'), 2527 # I=0, O=0 (should ignore extra periods) 2528 (b'oi...abcd...', True, 'abcd.'), 2529 # I=0, O=6 (variable-length input, fixed-length output) 2530 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), 2531 # I=2, O=6 (fixed-length input < fixed-length output) 2532 (b'i.i2.o6xyz', True, 'xy----.z-----.'), 2533 # I=6, O=3 (fixed-length input > fixed-length output) 2534 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), 2535 # I=0, then 3; O=29, then 15 (with longer output) 2536 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 2537 'a----------------------------.' + 2538 'b----------------------------.' + 2539 'cde--------------------------.' + 2540 'abcdefghijabcde.' + 2541 'a.b------------.' + 2542 '.c.------------.' + 2543 'd.e------------.' + 2544 'k--------------.' + 2545 'l--------------.' + 2546 'm--------------.') 2547 ] 2548 2549 def test_decoder(self): 2550 # Try a few one-shot test cases. 2551 for input, eof, output in self.test_cases: 2552 d = StatefulIncrementalDecoder() 2553 self.assertEqual(d.decode(input, eof), output) 2554 2555 # Also test an unfinished decode, followed by forcing EOF. 2556 d = StatefulIncrementalDecoder() 2557 self.assertEqual(d.decode(b'oiabcd'), '') 2558 self.assertEqual(d.decode(b'', 1), 'abcd.') 2559 2560class TextIOWrapperTest(unittest.TestCase): 2561 2562 def setUp(self): 2563 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" 2564 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") 2565 support.unlink(support.TESTFN) 2566 2567 def tearDown(self): 2568 support.unlink(support.TESTFN) 2569 2570 def test_constructor(self): 2571 r = self.BytesIO(b"\xc3\xa9\n\n") 2572 b = self.BufferedReader(r, 1000) 2573 t = self.TextIOWrapper(b) 2574 t.__init__(b, encoding="latin-1", newline="\r\n") 2575 self.assertEqual(t.encoding, "latin-1") 2576 self.assertEqual(t.line_buffering, False) 2577 t.__init__(b, encoding="utf-8", line_buffering=True) 2578 self.assertEqual(t.encoding, "utf-8") 2579 self.assertEqual(t.line_buffering, True) 2580 self.assertEqual("\xe9\n", t.readline()) 2581 self.assertRaises(TypeError, t.__init__, b, newline=42) 2582 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 2583 2584 def test_uninitialized(self): 2585 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2586 del t 2587 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 2588 self.assertRaises(Exception, repr, t) 2589 self.assertRaisesRegex((ValueError, AttributeError), 2590 'uninitialized|has no attribute', 2591 t.read, 0) 2592 t.__init__(self.MockRawIO()) 2593 self.assertEqual(t.read(0), '') 2594 2595 def test_non_text_encoding_codecs_are_rejected(self): 2596 # Ensure the constructor complains if passed a codec that isn't 2597 # marked as a text encoding 2598 # http://bugs.python.org/issue20404 2599 r = self.BytesIO() 2600 b = self.BufferedWriter(r) 2601 with self.assertRaisesRegex(LookupError, "is not a text encoding"): 2602 self.TextIOWrapper(b, encoding="hex") 2603 2604 def test_detach(self): 2605 r = self.BytesIO() 2606 b = self.BufferedWriter(r) 2607 t = self.TextIOWrapper(b) 2608 self.assertIs(t.detach(), b) 2609 2610 t = self.TextIOWrapper(b, encoding="ascii") 2611 t.write("howdy") 2612 self.assertFalse(r.getvalue()) 2613 t.detach() 2614 self.assertEqual(r.getvalue(), b"howdy") 2615 self.assertRaises(ValueError, t.detach) 2616 2617 # Operations independent of the detached stream should still work 2618 repr(t) 2619 self.assertEqual(t.encoding, "ascii") 2620 self.assertEqual(t.errors, "strict") 2621 self.assertFalse(t.line_buffering) 2622 self.assertFalse(t.write_through) 2623 2624 def test_repr(self): 2625 raw = self.BytesIO("hello".encode("utf-8")) 2626 b = self.BufferedReader(raw) 2627 t = self.TextIOWrapper(b, encoding="utf-8") 2628 modname = self.TextIOWrapper.__module__ 2629 self.assertRegex(repr(t), 2630 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname) 2631 raw.name = "dummy" 2632 self.assertRegex(repr(t), 2633 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname) 2634 t.mode = "r" 2635 self.assertRegex(repr(t), 2636 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) 2637 raw.name = b"dummy" 2638 self.assertRegex(repr(t), 2639 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) 2640 2641 t.buffer.detach() 2642 repr(t) # Should not raise an exception 2643 2644 def test_recursive_repr(self): 2645 # Issue #25455 2646 raw = self.BytesIO() 2647 t = self.TextIOWrapper(raw) 2648 with support.swap_attr(raw, 'name', t): 2649 try: 2650 repr(t) # Should not crash 2651 except RuntimeError: 2652 pass 2653 2654 def test_line_buffering(self): 2655 r = self.BytesIO() 2656 b = self.BufferedWriter(r, 1000) 2657 t = self.TextIOWrapper(b, newline="\n", line_buffering=True) 2658 t.write("X") 2659 self.assertEqual(r.getvalue(), b"") # No flush happened 2660 t.write("Y\nZ") 2661 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed 2662 t.write("A\rB") 2663 self.assertEqual(r.getvalue(), b"XY\nZA\rB") 2664 2665 def test_reconfigure_line_buffering(self): 2666 r = self.BytesIO() 2667 b = self.BufferedWriter(r, 1000) 2668 t = self.TextIOWrapper(b, newline="\n", line_buffering=False) 2669 t.write("AB\nC") 2670 self.assertEqual(r.getvalue(), b"") 2671 2672 t.reconfigure(line_buffering=True) # implicit flush 2673 self.assertEqual(r.getvalue(), b"AB\nC") 2674 t.write("DEF\nG") 2675 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2676 t.write("H") 2677 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") 2678 t.reconfigure(line_buffering=False) # implicit flush 2679 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2680 t.write("IJ") 2681 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") 2682 2683 # Keeping default value 2684 t.reconfigure() 2685 t.reconfigure(line_buffering=None) 2686 self.assertEqual(t.line_buffering, False) 2687 t.reconfigure(line_buffering=True) 2688 t.reconfigure() 2689 t.reconfigure(line_buffering=None) 2690 self.assertEqual(t.line_buffering, True) 2691 2692 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2693 def test_default_encoding(self): 2694 old_environ = dict(os.environ) 2695 try: 2696 # try to get a user preferred encoding different than the current 2697 # locale encoding to check that TextIOWrapper() uses the current 2698 # locale encoding and not the user preferred encoding 2699 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'): 2700 if key in os.environ: 2701 del os.environ[key] 2702 2703 current_locale_encoding = locale.getpreferredencoding(False) 2704 b = self.BytesIO() 2705 t = self.TextIOWrapper(b) 2706 self.assertEqual(t.encoding, current_locale_encoding) 2707 finally: 2708 os.environ.clear() 2709 os.environ.update(old_environ) 2710 2711 @support.cpython_only 2712 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") 2713 def test_device_encoding(self): 2714 # Issue 15989 2715 import _testcapi 2716 b = self.BytesIO() 2717 b.fileno = lambda: _testcapi.INT_MAX + 1 2718 self.assertRaises(OverflowError, self.TextIOWrapper, b) 2719 b.fileno = lambda: _testcapi.UINT_MAX + 1 2720 self.assertRaises(OverflowError, self.TextIOWrapper, b) 2721 2722 def test_encoding(self): 2723 # Check the encoding attribute is always set, and valid 2724 b = self.BytesIO() 2725 t = self.TextIOWrapper(b, encoding="utf-8") 2726 self.assertEqual(t.encoding, "utf-8") 2727 t = self.TextIOWrapper(b) 2728 self.assertIsNotNone(t.encoding) 2729 codecs.lookup(t.encoding) 2730 2731 def test_encoding_errors_reading(self): 2732 # (1) default 2733 b = self.BytesIO(b"abc\n\xff\n") 2734 t = self.TextIOWrapper(b, encoding="ascii") 2735 self.assertRaises(UnicodeError, t.read) 2736 # (2) explicit strict 2737 b = self.BytesIO(b"abc\n\xff\n") 2738 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2739 self.assertRaises(UnicodeError, t.read) 2740 # (3) ignore 2741 b = self.BytesIO(b"abc\n\xff\n") 2742 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") 2743 self.assertEqual(t.read(), "abc\n\n") 2744 # (4) replace 2745 b = self.BytesIO(b"abc\n\xff\n") 2746 t = self.TextIOWrapper(b, encoding="ascii", errors="replace") 2747 self.assertEqual(t.read(), "abc\n\ufffd\n") 2748 2749 def test_encoding_errors_writing(self): 2750 # (1) default 2751 b = self.BytesIO() 2752 t = self.TextIOWrapper(b, encoding="ascii") 2753 self.assertRaises(UnicodeError, t.write, "\xff") 2754 # (2) explicit strict 2755 b = self.BytesIO() 2756 t = self.TextIOWrapper(b, encoding="ascii", errors="strict") 2757 self.assertRaises(UnicodeError, t.write, "\xff") 2758 # (3) ignore 2759 b = self.BytesIO() 2760 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", 2761 newline="\n") 2762 t.write("abc\xffdef\n") 2763 t.flush() 2764 self.assertEqual(b.getvalue(), b"abcdef\n") 2765 # (4) replace 2766 b = self.BytesIO() 2767 t = self.TextIOWrapper(b, encoding="ascii", errors="replace", 2768 newline="\n") 2769 t.write("abc\xffdef\n") 2770 t.flush() 2771 self.assertEqual(b.getvalue(), b"abc?def\n") 2772 2773 def test_newlines(self): 2774 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] 2775 2776 tests = [ 2777 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], 2778 [ '', input_lines ], 2779 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], 2780 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], 2781 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], 2782 ] 2783 encodings = ( 2784 'utf-8', 'latin-1', 2785 'utf-16', 'utf-16-le', 'utf-16-be', 2786 'utf-32', 'utf-32-le', 'utf-32-be', 2787 ) 2788 2789 # Try a range of buffer sizes to test the case where \r is the last 2790 # character in TextIOWrapper._pending_line. 2791 for encoding in encodings: 2792 # XXX: str.encode() should return bytes 2793 data = bytes(''.join(input_lines).encode(encoding)) 2794 for do_reads in (False, True): 2795 for bufsize in range(1, 10): 2796 for newline, exp_lines in tests: 2797 bufio = self.BufferedReader(self.BytesIO(data), bufsize) 2798 textio = self.TextIOWrapper(bufio, newline=newline, 2799 encoding=encoding) 2800 if do_reads: 2801 got_lines = [] 2802 while True: 2803 c2 = textio.read(2) 2804 if c2 == '': 2805 break 2806 self.assertEqual(len(c2), 2) 2807 got_lines.append(c2 + textio.readline()) 2808 else: 2809 got_lines = list(textio) 2810 2811 for got_line, exp_line in zip(got_lines, exp_lines): 2812 self.assertEqual(got_line, exp_line) 2813 self.assertEqual(len(got_lines), len(exp_lines)) 2814 2815 def test_newlines_input(self): 2816 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" 2817 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 2818 for newline, expected in [ 2819 (None, normalized.decode("ascii").splitlines(keepends=True)), 2820 ("", testdata.decode("ascii").splitlines(keepends=True)), 2821 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2822 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), 2823 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), 2824 ]: 2825 buf = self.BytesIO(testdata) 2826 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2827 self.assertEqual(txt.readlines(), expected) 2828 txt.seek(0) 2829 self.assertEqual(txt.read(), "".join(expected)) 2830 2831 def test_newlines_output(self): 2832 testdict = { 2833 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2834 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", 2835 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", 2836 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", 2837 } 2838 tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) 2839 for newline, expected in tests: 2840 buf = self.BytesIO() 2841 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) 2842 txt.write("AAA\nB") 2843 txt.write("BB\nCCC\n") 2844 txt.write("X\rY\r\nZ") 2845 txt.flush() 2846 self.assertEqual(buf.closed, False) 2847 self.assertEqual(buf.getvalue(), expected) 2848 2849 def test_destructor(self): 2850 l = [] 2851 base = self.BytesIO 2852 class MyBytesIO(base): 2853 def close(self): 2854 l.append(self.getvalue()) 2855 base.close(self) 2856 b = MyBytesIO() 2857 t = self.TextIOWrapper(b, encoding="ascii") 2858 t.write("abc") 2859 del t 2860 support.gc_collect() 2861 self.assertEqual([b"abc"], l) 2862 2863 def test_override_destructor(self): 2864 record = [] 2865 class MyTextIO(self.TextIOWrapper): 2866 def __del__(self): 2867 record.append(1) 2868 try: 2869 f = super().__del__ 2870 except AttributeError: 2871 pass 2872 else: 2873 f() 2874 def close(self): 2875 record.append(2) 2876 super().close() 2877 def flush(self): 2878 record.append(3) 2879 super().flush() 2880 b = self.BytesIO() 2881 t = MyTextIO(b, encoding="ascii") 2882 del t 2883 support.gc_collect() 2884 self.assertEqual(record, [1, 2, 3]) 2885 2886 def test_error_through_destructor(self): 2887 # Test that the exception state is not modified by a destructor, 2888 # even if close() fails. 2889 rawio = self.CloseFailureIO() 2890 with support.catch_unraisable_exception() as cm: 2891 with self.assertRaises(AttributeError): 2892 self.TextIOWrapper(rawio).xyzzy 2893 2894 if not IOBASE_EMITS_UNRAISABLE: 2895 self.assertIsNone(cm.unraisable) 2896 elif cm.unraisable is not None: 2897 self.assertEqual(cm.unraisable.exc_type, OSError) 2898 2899 # Systematic tests of the text I/O API 2900 2901 def test_basic_io(self): 2902 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): 2903 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": 2904 f = self.open(support.TESTFN, "w+", encoding=enc) 2905 f._CHUNK_SIZE = chunksize 2906 self.assertEqual(f.write("abc"), 3) 2907 f.close() 2908 f = self.open(support.TESTFN, "r+", encoding=enc) 2909 f._CHUNK_SIZE = chunksize 2910 self.assertEqual(f.tell(), 0) 2911 self.assertEqual(f.read(), "abc") 2912 cookie = f.tell() 2913 self.assertEqual(f.seek(0), 0) 2914 self.assertEqual(f.read(None), "abc") 2915 f.seek(0) 2916 self.assertEqual(f.read(2), "ab") 2917 self.assertEqual(f.read(1), "c") 2918 self.assertEqual(f.read(1), "") 2919 self.assertEqual(f.read(), "") 2920 self.assertEqual(f.tell(), cookie) 2921 self.assertEqual(f.seek(0), 0) 2922 self.assertEqual(f.seek(0, 2), cookie) 2923 self.assertEqual(f.write("def"), 3) 2924 self.assertEqual(f.seek(cookie), cookie) 2925 self.assertEqual(f.read(), "def") 2926 if enc.startswith("utf"): 2927 self.multi_line_test(f, enc) 2928 f.close() 2929 2930 def multi_line_test(self, f, enc): 2931 f.seek(0) 2932 f.truncate() 2933 sample = "s\xff\u0fff\uffff" 2934 wlines = [] 2935 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): 2936 chars = [] 2937 for i in range(size): 2938 chars.append(sample[i % len(sample)]) 2939 line = "".join(chars) + "\n" 2940 wlines.append((f.tell(), line)) 2941 f.write(line) 2942 f.seek(0) 2943 rlines = [] 2944 while True: 2945 pos = f.tell() 2946 line = f.readline() 2947 if not line: 2948 break 2949 rlines.append((pos, line)) 2950 self.assertEqual(rlines, wlines) 2951 2952 def test_telling(self): 2953 f = self.open(support.TESTFN, "w+", encoding="utf-8") 2954 p0 = f.tell() 2955 f.write("\xff\n") 2956 p1 = f.tell() 2957 f.write("\xff\n") 2958 p2 = f.tell() 2959 f.seek(0) 2960 self.assertEqual(f.tell(), p0) 2961 self.assertEqual(f.readline(), "\xff\n") 2962 self.assertEqual(f.tell(), p1) 2963 self.assertEqual(f.readline(), "\xff\n") 2964 self.assertEqual(f.tell(), p2) 2965 f.seek(0) 2966 for line in f: 2967 self.assertEqual(line, "\xff\n") 2968 self.assertRaises(OSError, f.tell) 2969 self.assertEqual(f.tell(), p2) 2970 f.close() 2971 2972 def test_seeking(self): 2973 chunk_size = _default_chunk_size() 2974 prefix_size = chunk_size - 2 2975 u_prefix = "a" * prefix_size 2976 prefix = bytes(u_prefix.encode("utf-8")) 2977 self.assertEqual(len(u_prefix), len(prefix)) 2978 u_suffix = "\u8888\n" 2979 suffix = bytes(u_suffix.encode("utf-8")) 2980 line = prefix + suffix 2981 with self.open(support.TESTFN, "wb") as f: 2982 f.write(line*2) 2983 with self.open(support.TESTFN, "r", encoding="utf-8") as f: 2984 s = f.read(prefix_size) 2985 self.assertEqual(s, str(prefix, "ascii")) 2986 self.assertEqual(f.tell(), prefix_size) 2987 self.assertEqual(f.readline(), u_suffix) 2988 2989 def test_seeking_too(self): 2990 # Regression test for a specific bug 2991 data = b'\xe0\xbf\xbf\n' 2992 with self.open(support.TESTFN, "wb") as f: 2993 f.write(data) 2994 with self.open(support.TESTFN, "r", encoding="utf-8") as f: 2995 f._CHUNK_SIZE # Just test that it exists 2996 f._CHUNK_SIZE = 2 2997 f.readline() 2998 f.tell() 2999 3000 def test_seek_and_tell(self): 3001 #Test seek/tell using the StatefulIncrementalDecoder. 3002 # Make test faster by doing smaller seeks 3003 CHUNK_SIZE = 128 3004 3005 def test_seek_and_tell_with_data(data, min_pos=0): 3006 """Tell/seek to various points within a data stream and ensure 3007 that the decoded data returned by read() is consistent.""" 3008 f = self.open(support.TESTFN, 'wb') 3009 f.write(data) 3010 f.close() 3011 f = self.open(support.TESTFN, encoding='test_decoder') 3012 f._CHUNK_SIZE = CHUNK_SIZE 3013 decoded = f.read() 3014 f.close() 3015 3016 for i in range(min_pos, len(decoded) + 1): # seek positions 3017 for j in [1, 5, len(decoded) - i]: # read lengths 3018 f = self.open(support.TESTFN, encoding='test_decoder') 3019 self.assertEqual(f.read(i), decoded[:i]) 3020 cookie = f.tell() 3021 self.assertEqual(f.read(j), decoded[i:i + j]) 3022 f.seek(cookie) 3023 self.assertEqual(f.read(), decoded[i:]) 3024 f.close() 3025 3026 # Enable the test decoder. 3027 StatefulIncrementalDecoder.codecEnabled = 1 3028 3029 # Run the tests. 3030 try: 3031 # Try each test case. 3032 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 3033 test_seek_and_tell_with_data(input) 3034 3035 # Position each test case so that it crosses a chunk boundary. 3036 for input, _, _ in StatefulIncrementalDecoderTest.test_cases: 3037 offset = CHUNK_SIZE - len(input)//2 3038 prefix = b'.'*offset 3039 # Don't bother seeking into the prefix (takes too long). 3040 min_pos = offset*2 3041 test_seek_and_tell_with_data(prefix + input, min_pos) 3042 3043 # Ensure our test decoder won't interfere with subsequent tests. 3044 finally: 3045 StatefulIncrementalDecoder.codecEnabled = 0 3046 3047 def test_multibyte_seek_and_tell(self): 3048 f = self.open(support.TESTFN, "w", encoding="euc_jp") 3049 f.write("AB\n\u3046\u3048\n") 3050 f.close() 3051 3052 f = self.open(support.TESTFN, "r", encoding="euc_jp") 3053 self.assertEqual(f.readline(), "AB\n") 3054 p0 = f.tell() 3055 self.assertEqual(f.readline(), "\u3046\u3048\n") 3056 p1 = f.tell() 3057 f.seek(p0) 3058 self.assertEqual(f.readline(), "\u3046\u3048\n") 3059 self.assertEqual(f.tell(), p1) 3060 f.close() 3061 3062 def test_seek_with_encoder_state(self): 3063 f = self.open(support.TESTFN, "w", encoding="euc_jis_2004") 3064 f.write("\u00e6\u0300") 3065 p0 = f.tell() 3066 f.write("\u00e6") 3067 f.seek(p0) 3068 f.write("\u0300") 3069 f.close() 3070 3071 f = self.open(support.TESTFN, "r", encoding="euc_jis_2004") 3072 self.assertEqual(f.readline(), "\u00e6\u0300\u0300") 3073 f.close() 3074 3075 def test_encoded_writes(self): 3076 data = "1234567890" 3077 tests = ("utf-16", 3078 "utf-16-le", 3079 "utf-16-be", 3080 "utf-32", 3081 "utf-32-le", 3082 "utf-32-be") 3083 for encoding in tests: 3084 buf = self.BytesIO() 3085 f = self.TextIOWrapper(buf, encoding=encoding) 3086 # Check if the BOM is written only once (see issue1753). 3087 f.write(data) 3088 f.write(data) 3089 f.seek(0) 3090 self.assertEqual(f.read(), data * 2) 3091 f.seek(0) 3092 self.assertEqual(f.read(), data * 2) 3093 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) 3094 3095 def test_unreadable(self): 3096 class UnReadable(self.BytesIO): 3097 def readable(self): 3098 return False 3099 txt = self.TextIOWrapper(UnReadable()) 3100 self.assertRaises(OSError, txt.read) 3101 3102 def test_read_one_by_one(self): 3103 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB")) 3104 reads = "" 3105 while True: 3106 c = txt.read(1) 3107 if not c: 3108 break 3109 reads += c 3110 self.assertEqual(reads, "AA\nBB") 3111 3112 def test_readlines(self): 3113 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC")) 3114 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) 3115 txt.seek(0) 3116 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) 3117 txt.seek(0) 3118 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) 3119 3120 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. 3121 def test_read_by_chunk(self): 3122 # make sure "\r\n" straddles 128 char boundary. 3123 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB")) 3124 reads = "" 3125 while True: 3126 c = txt.read(128) 3127 if not c: 3128 break 3129 reads += c 3130 self.assertEqual(reads, "A"*127+"\nB") 3131 3132 def test_writelines(self): 3133 l = ['ab', 'cd', 'ef'] 3134 buf = self.BytesIO() 3135 txt = self.TextIOWrapper(buf) 3136 txt.writelines(l) 3137 txt.flush() 3138 self.assertEqual(buf.getvalue(), b'abcdef') 3139 3140 def test_writelines_userlist(self): 3141 l = UserList(['ab', 'cd', 'ef']) 3142 buf = self.BytesIO() 3143 txt = self.TextIOWrapper(buf) 3144 txt.writelines(l) 3145 txt.flush() 3146 self.assertEqual(buf.getvalue(), b'abcdef') 3147 3148 def test_writelines_error(self): 3149 txt = self.TextIOWrapper(self.BytesIO()) 3150 self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) 3151 self.assertRaises(TypeError, txt.writelines, None) 3152 self.assertRaises(TypeError, txt.writelines, b'abc') 3153 3154 def test_issue1395_1(self): 3155 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3156 3157 # read one char at a time 3158 reads = "" 3159 while True: 3160 c = txt.read(1) 3161 if not c: 3162 break 3163 reads += c 3164 self.assertEqual(reads, self.normalized) 3165 3166 def test_issue1395_2(self): 3167 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3168 txt._CHUNK_SIZE = 4 3169 3170 reads = "" 3171 while True: 3172 c = txt.read(4) 3173 if not c: 3174 break 3175 reads += c 3176 self.assertEqual(reads, self.normalized) 3177 3178 def test_issue1395_3(self): 3179 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3180 txt._CHUNK_SIZE = 4 3181 3182 reads = txt.read(4) 3183 reads += txt.read(4) 3184 reads += txt.readline() 3185 reads += txt.readline() 3186 reads += txt.readline() 3187 self.assertEqual(reads, self.normalized) 3188 3189 def test_issue1395_4(self): 3190 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3191 txt._CHUNK_SIZE = 4 3192 3193 reads = txt.read(4) 3194 reads += txt.read() 3195 self.assertEqual(reads, self.normalized) 3196 3197 def test_issue1395_5(self): 3198 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3199 txt._CHUNK_SIZE = 4 3200 3201 reads = txt.read(4) 3202 pos = txt.tell() 3203 txt.seek(0) 3204 txt.seek(pos) 3205 self.assertEqual(txt.read(4), "BBB\n") 3206 3207 def test_issue2282(self): 3208 buffer = self.BytesIO(self.testdata) 3209 txt = self.TextIOWrapper(buffer, encoding="ascii") 3210 3211 self.assertEqual(buffer.seekable(), txt.seekable()) 3212 3213 def test_append_bom(self): 3214 # The BOM is not written again when appending to a non-empty file 3215 filename = support.TESTFN 3216 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3217 with self.open(filename, 'w', encoding=charset) as f: 3218 f.write('aaa') 3219 pos = f.tell() 3220 with self.open(filename, 'rb') as f: 3221 self.assertEqual(f.read(), 'aaa'.encode(charset)) 3222 3223 with self.open(filename, 'a', encoding=charset) as f: 3224 f.write('xxx') 3225 with self.open(filename, 'rb') as f: 3226 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3227 3228 def test_seek_bom(self): 3229 # Same test, but when seeking manually 3230 filename = support.TESTFN 3231 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3232 with self.open(filename, 'w', encoding=charset) as f: 3233 f.write('aaa') 3234 pos = f.tell() 3235 with self.open(filename, 'r+', encoding=charset) as f: 3236 f.seek(pos) 3237 f.write('zzz') 3238 f.seek(0) 3239 f.write('bbb') 3240 with self.open(filename, 'rb') as f: 3241 self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) 3242 3243 def test_seek_append_bom(self): 3244 # Same test, but first seek to the start and then to the end 3245 filename = support.TESTFN 3246 for charset in ('utf-8-sig', 'utf-16', 'utf-32'): 3247 with self.open(filename, 'w', encoding=charset) as f: 3248 f.write('aaa') 3249 with self.open(filename, 'a', encoding=charset) as f: 3250 f.seek(0) 3251 f.seek(0, self.SEEK_END) 3252 f.write('xxx') 3253 with self.open(filename, 'rb') as f: 3254 self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) 3255 3256 def test_errors_property(self): 3257 with self.open(support.TESTFN, "w") as f: 3258 self.assertEqual(f.errors, "strict") 3259 with self.open(support.TESTFN, "w", errors="replace") as f: 3260 self.assertEqual(f.errors, "replace") 3261 3262 @support.no_tracing 3263 def test_threads_write(self): 3264 # Issue6750: concurrent writes could duplicate data 3265 event = threading.Event() 3266 with self.open(support.TESTFN, "w", buffering=1) as f: 3267 def run(n): 3268 text = "Thread%03d\n" % n 3269 event.wait() 3270 f.write(text) 3271 threads = [threading.Thread(target=run, args=(x,)) 3272 for x in range(20)] 3273 with support.start_threads(threads, event.set): 3274 time.sleep(0.02) 3275 with self.open(support.TESTFN) as f: 3276 content = f.read() 3277 for n in range(20): 3278 self.assertEqual(content.count("Thread%03d\n" % n), 1) 3279 3280 def test_flush_error_on_close(self): 3281 # Test that text file is closed despite failed flush 3282 # and that flush() is called before file closed. 3283 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3284 closed = [] 3285 def bad_flush(): 3286 closed[:] = [txt.closed, txt.buffer.closed] 3287 raise OSError() 3288 txt.flush = bad_flush 3289 self.assertRaises(OSError, txt.close) # exception not swallowed 3290 self.assertTrue(txt.closed) 3291 self.assertTrue(txt.buffer.closed) 3292 self.assertTrue(closed) # flush() called 3293 self.assertFalse(closed[0]) # flush() called before file closed 3294 self.assertFalse(closed[1]) 3295 txt.flush = lambda: None # break reference loop 3296 3297 def test_close_error_on_close(self): 3298 buffer = self.BytesIO(self.testdata) 3299 def bad_flush(): 3300 raise OSError('flush') 3301 def bad_close(): 3302 raise OSError('close') 3303 buffer.close = bad_close 3304 txt = self.TextIOWrapper(buffer, encoding="ascii") 3305 txt.flush = bad_flush 3306 with self.assertRaises(OSError) as err: # exception not swallowed 3307 txt.close() 3308 self.assertEqual(err.exception.args, ('close',)) 3309 self.assertIsInstance(err.exception.__context__, OSError) 3310 self.assertEqual(err.exception.__context__.args, ('flush',)) 3311 self.assertFalse(txt.closed) 3312 3313 # Silence destructor error 3314 buffer.close = lambda: None 3315 txt.flush = lambda: None 3316 3317 def test_nonnormalized_close_error_on_close(self): 3318 # Issue #21677 3319 buffer = self.BytesIO(self.testdata) 3320 def bad_flush(): 3321 raise non_existing_flush 3322 def bad_close(): 3323 raise non_existing_close 3324 buffer.close = bad_close 3325 txt = self.TextIOWrapper(buffer, encoding="ascii") 3326 txt.flush = bad_flush 3327 with self.assertRaises(NameError) as err: # exception not swallowed 3328 txt.close() 3329 self.assertIn('non_existing_close', str(err.exception)) 3330 self.assertIsInstance(err.exception.__context__, NameError) 3331 self.assertIn('non_existing_flush', str(err.exception.__context__)) 3332 self.assertFalse(txt.closed) 3333 3334 # Silence destructor error 3335 buffer.close = lambda: None 3336 txt.flush = lambda: None 3337 3338 def test_multi_close(self): 3339 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3340 txt.close() 3341 txt.close() 3342 txt.close() 3343 self.assertRaises(ValueError, txt.flush) 3344 3345 def test_unseekable(self): 3346 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata)) 3347 self.assertRaises(self.UnsupportedOperation, txt.tell) 3348 self.assertRaises(self.UnsupportedOperation, txt.seek, 0) 3349 3350 def test_readonly_attributes(self): 3351 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") 3352 buf = self.BytesIO(self.testdata) 3353 with self.assertRaises(AttributeError): 3354 txt.buffer = buf 3355 3356 def test_rawio(self): 3357 # Issue #12591: TextIOWrapper must work with raw I/O objects, so 3358 # that subprocess.Popen() can have the required unbuffered 3359 # semantics with universal_newlines=True. 3360 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3361 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3362 # Reads 3363 self.assertEqual(txt.read(4), 'abcd') 3364 self.assertEqual(txt.readline(), 'efghi\n') 3365 self.assertEqual(list(txt), ['jkl\n', 'opq\n']) 3366 3367 def test_rawio_write_through(self): 3368 # Issue #12591: with write_through=True, writes don't need a flush 3369 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) 3370 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', 3371 write_through=True) 3372 txt.write('1') 3373 txt.write('23\n4') 3374 txt.write('5') 3375 self.assertEqual(b''.join(raw._write_stack), b'123\n45') 3376 3377 def test_bufio_write_through(self): 3378 # Issue #21396: write_through=True doesn't force a flush() 3379 # on the underlying binary buffered object. 3380 flush_called, write_called = [], [] 3381 class BufferedWriter(self.BufferedWriter): 3382 def flush(self, *args, **kwargs): 3383 flush_called.append(True) 3384 return super().flush(*args, **kwargs) 3385 def write(self, *args, **kwargs): 3386 write_called.append(True) 3387 return super().write(*args, **kwargs) 3388 3389 rawio = self.BytesIO() 3390 data = b"a" 3391 bufio = BufferedWriter(rawio, len(data)*2) 3392 textio = self.TextIOWrapper(bufio, encoding='ascii', 3393 write_through=True) 3394 # write to the buffered io but don't overflow the buffer 3395 text = data.decode('ascii') 3396 textio.write(text) 3397 3398 # buffer.flush is not called with write_through=True 3399 self.assertFalse(flush_called) 3400 # buffer.write *is* called with write_through=True 3401 self.assertTrue(write_called) 3402 self.assertEqual(rawio.getvalue(), b"") # no flush 3403 3404 write_called = [] # reset 3405 textio.write(text * 10) # total content is larger than bufio buffer 3406 self.assertTrue(write_called) 3407 self.assertEqual(rawio.getvalue(), data * 11) # all flushed 3408 3409 def test_reconfigure_write_through(self): 3410 raw = self.MockRawIO([]) 3411 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3412 t.write('1') 3413 t.reconfigure(write_through=True) # implied flush 3414 self.assertEqual(t.write_through, True) 3415 self.assertEqual(b''.join(raw._write_stack), b'1') 3416 t.write('23') 3417 self.assertEqual(b''.join(raw._write_stack), b'123') 3418 t.reconfigure(write_through=False) 3419 self.assertEqual(t.write_through, False) 3420 t.write('45') 3421 t.flush() 3422 self.assertEqual(b''.join(raw._write_stack), b'12345') 3423 # Keeping default value 3424 t.reconfigure() 3425 t.reconfigure(write_through=None) 3426 self.assertEqual(t.write_through, False) 3427 t.reconfigure(write_through=True) 3428 t.reconfigure() 3429 t.reconfigure(write_through=None) 3430 self.assertEqual(t.write_through, True) 3431 3432 def test_read_nonbytes(self): 3433 # Issue #17106 3434 # Crash when underlying read() returns non-bytes 3435 t = self.TextIOWrapper(self.StringIO('a')) 3436 self.assertRaises(TypeError, t.read, 1) 3437 t = self.TextIOWrapper(self.StringIO('a')) 3438 self.assertRaises(TypeError, t.readline) 3439 t = self.TextIOWrapper(self.StringIO('a')) 3440 self.assertRaises(TypeError, t.read) 3441 3442 def test_illegal_encoder(self): 3443 # Issue 31271: Calling write() while the return value of encoder's 3444 # encode() is invalid shouldn't cause an assertion failure. 3445 rot13 = codecs.lookup("rot13") 3446 with support.swap_attr(rot13, '_is_text_encoding', True): 3447 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13") 3448 self.assertRaises(TypeError, t.write, 'bar') 3449 3450 def test_illegal_decoder(self): 3451 # Issue #17106 3452 # Bypass the early encoding check added in issue 20404 3453 def _make_illegal_wrapper(): 3454 quopri = codecs.lookup("quopri") 3455 quopri._is_text_encoding = True 3456 try: 3457 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), 3458 newline='\n', encoding="quopri") 3459 finally: 3460 quopri._is_text_encoding = False 3461 return t 3462 # Crash when decoder returns non-string 3463 t = _make_illegal_wrapper() 3464 self.assertRaises(TypeError, t.read, 1) 3465 t = _make_illegal_wrapper() 3466 self.assertRaises(TypeError, t.readline) 3467 t = _make_illegal_wrapper() 3468 self.assertRaises(TypeError, t.read) 3469 3470 # Issue 31243: calling read() while the return value of decoder's 3471 # getstate() is invalid should neither crash the interpreter nor 3472 # raise a SystemError. 3473 def _make_very_illegal_wrapper(getstate_ret_val): 3474 class BadDecoder: 3475 def getstate(self): 3476 return getstate_ret_val 3477 def _get_bad_decoder(dummy): 3478 return BadDecoder() 3479 quopri = codecs.lookup("quopri") 3480 with support.swap_attr(quopri, 'incrementaldecoder', 3481 _get_bad_decoder): 3482 return _make_illegal_wrapper() 3483 t = _make_very_illegal_wrapper(42) 3484 self.assertRaises(TypeError, t.read, 42) 3485 t = _make_very_illegal_wrapper(()) 3486 self.assertRaises(TypeError, t.read, 42) 3487 t = _make_very_illegal_wrapper((1, 2)) 3488 self.assertRaises(TypeError, t.read, 42) 3489 3490 def _check_create_at_shutdown(self, **kwargs): 3491 # Issue #20037: creating a TextIOWrapper at shutdown 3492 # shouldn't crash the interpreter. 3493 iomod = self.io.__name__ 3494 code = """if 1: 3495 import codecs 3496 import {iomod} as io 3497 3498 # Avoid looking up codecs at shutdown 3499 codecs.lookup('utf-8') 3500 3501 class C: 3502 def __init__(self): 3503 self.buf = io.BytesIO() 3504 def __del__(self): 3505 io.TextIOWrapper(self.buf, **{kwargs}) 3506 print("ok") 3507 c = C() 3508 """.format(iomod=iomod, kwargs=kwargs) 3509 return assert_python_ok("-c", code) 3510 3511 def test_create_at_shutdown_without_encoding(self): 3512 rc, out, err = self._check_create_at_shutdown() 3513 if err: 3514 # Can error out with a RuntimeError if the module state 3515 # isn't found. 3516 self.assertIn(self.shutdown_error, err.decode()) 3517 else: 3518 self.assertEqual("ok", out.decode().strip()) 3519 3520 def test_create_at_shutdown_with_encoding(self): 3521 rc, out, err = self._check_create_at_shutdown(encoding='utf-8', 3522 errors='strict') 3523 self.assertFalse(err) 3524 self.assertEqual("ok", out.decode().strip()) 3525 3526 def test_read_byteslike(self): 3527 r = MemviewBytesIO(b'Just some random string\n') 3528 t = self.TextIOWrapper(r, 'utf-8') 3529 3530 # TextIOwrapper will not read the full string, because 3531 # we truncate it to a multiple of the native int size 3532 # so that we can construct a more complex memoryview. 3533 bytes_val = _to_memoryview(r.getvalue()).tobytes() 3534 3535 self.assertEqual(t.read(200), bytes_val.decode('utf-8')) 3536 3537 def test_issue22849(self): 3538 class F(object): 3539 def readable(self): return True 3540 def writable(self): return True 3541 def seekable(self): return True 3542 3543 for i in range(10): 3544 try: 3545 self.TextIOWrapper(F(), encoding='utf-8') 3546 except Exception: 3547 pass 3548 3549 F.tell = lambda x: 0 3550 t = self.TextIOWrapper(F(), encoding='utf-8') 3551 3552 def test_reconfigure_encoding_read(self): 3553 # latin1 -> utf8 3554 # (latin1 can decode utf-8 encoded string) 3555 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8') 3556 raw = self.BytesIO(data) 3557 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3558 self.assertEqual(txt.readline(), 'abc\xe9\n') 3559 with self.assertRaises(self.UnsupportedOperation): 3560 txt.reconfigure(encoding='utf-8') 3561 with self.assertRaises(self.UnsupportedOperation): 3562 txt.reconfigure(newline=None) 3563 3564 def test_reconfigure_write_fromascii(self): 3565 # ascii has a specific encodefunc in the C implementation, 3566 # but utf-8-sig has not. Make sure that we get rid of the 3567 # cached encodefunc when we switch encoders. 3568 raw = self.BytesIO() 3569 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3570 txt.write('foo\n') 3571 txt.reconfigure(encoding='utf-8-sig') 3572 txt.write('\xe9\n') 3573 txt.flush() 3574 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n') 3575 3576 def test_reconfigure_write(self): 3577 # latin -> utf8 3578 raw = self.BytesIO() 3579 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') 3580 txt.write('abc\xe9\n') 3581 txt.reconfigure(encoding='utf-8') 3582 self.assertEqual(raw.getvalue(), b'abc\xe9\n') 3583 txt.write('d\xe9f\n') 3584 txt.flush() 3585 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n') 3586 3587 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of 3588 # the file 3589 raw = self.BytesIO() 3590 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3591 txt.write('abc\n') 3592 txt.reconfigure(encoding='utf-8-sig') 3593 txt.write('d\xe9f\n') 3594 txt.flush() 3595 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n') 3596 3597 def test_reconfigure_write_non_seekable(self): 3598 raw = self.BytesIO() 3599 raw.seekable = lambda: False 3600 raw.seek = None 3601 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') 3602 txt.write('abc\n') 3603 txt.reconfigure(encoding='utf-8-sig') 3604 txt.write('d\xe9f\n') 3605 txt.flush() 3606 3607 # If the raw stream is not seekable, there'll be a BOM 3608 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n') 3609 3610 def test_reconfigure_defaults(self): 3611 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n') 3612 txt.reconfigure(encoding=None) 3613 self.assertEqual(txt.encoding, 'ascii') 3614 self.assertEqual(txt.errors, 'replace') 3615 txt.write('LF\n') 3616 3617 txt.reconfigure(newline='\r\n') 3618 self.assertEqual(txt.encoding, 'ascii') 3619 self.assertEqual(txt.errors, 'replace') 3620 3621 txt.reconfigure(errors='ignore') 3622 self.assertEqual(txt.encoding, 'ascii') 3623 self.assertEqual(txt.errors, 'ignore') 3624 txt.write('CRLF\n') 3625 3626 txt.reconfigure(encoding='utf-8', newline=None) 3627 self.assertEqual(txt.errors, 'strict') 3628 txt.seek(0) 3629 self.assertEqual(txt.read(), 'LF\nCRLF\n') 3630 3631 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') 3632 3633 def test_reconfigure_newline(self): 3634 raw = self.BytesIO(b'CR\rEOF') 3635 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3636 txt.reconfigure(newline=None) 3637 self.assertEqual(txt.readline(), 'CR\n') 3638 raw = self.BytesIO(b'CR\rEOF') 3639 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3640 txt.reconfigure(newline='') 3641 self.assertEqual(txt.readline(), 'CR\r') 3642 raw = self.BytesIO(b'CR\rLF\nEOF') 3643 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3644 txt.reconfigure(newline='\n') 3645 self.assertEqual(txt.readline(), 'CR\rLF\n') 3646 raw = self.BytesIO(b'LF\nCR\rEOF') 3647 txt = self.TextIOWrapper(raw, 'ascii', newline='\n') 3648 txt.reconfigure(newline='\r') 3649 self.assertEqual(txt.readline(), 'LF\nCR\r') 3650 raw = self.BytesIO(b'CR\rCRLF\r\nEOF') 3651 txt = self.TextIOWrapper(raw, 'ascii', newline='\r') 3652 txt.reconfigure(newline='\r\n') 3653 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n') 3654 3655 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r') 3656 txt.reconfigure(newline=None) 3657 txt.write('linesep\n') 3658 txt.reconfigure(newline='') 3659 txt.write('LF\n') 3660 txt.reconfigure(newline='\n') 3661 txt.write('LF\n') 3662 txt.reconfigure(newline='\r') 3663 txt.write('CR\n') 3664 txt.reconfigure(newline='\r\n') 3665 txt.write('CRLF\n') 3666 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n' 3667 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected) 3668 3669 def test_issue25862(self): 3670 # Assertion failures occurred in tell() after read() and write(). 3671 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3672 t.read(1) 3673 t.read() 3674 t.tell() 3675 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') 3676 t.read(1) 3677 t.write('x') 3678 t.tell() 3679 3680 3681class MemviewBytesIO(io.BytesIO): 3682 '''A BytesIO object whose read method returns memoryviews 3683 rather than bytes''' 3684 3685 def read1(self, len_): 3686 return _to_memoryview(super().read1(len_)) 3687 3688 def read(self, len_): 3689 return _to_memoryview(super().read(len_)) 3690 3691def _to_memoryview(buf): 3692 '''Convert bytes-object *buf* to a non-trivial memoryview''' 3693 3694 arr = array.array('i') 3695 idx = len(buf) - len(buf) % arr.itemsize 3696 arr.frombytes(buf[:idx]) 3697 return memoryview(arr) 3698 3699 3700class CTextIOWrapperTest(TextIOWrapperTest): 3701 io = io 3702 shutdown_error = "LookupError: unknown encoding: ascii" 3703 3704 def test_initialization(self): 3705 r = self.BytesIO(b"\xc3\xa9\n\n") 3706 b = self.BufferedReader(r, 1000) 3707 t = self.TextIOWrapper(b) 3708 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy') 3709 self.assertRaises(ValueError, t.read) 3710 3711 t = self.TextIOWrapper.__new__(self.TextIOWrapper) 3712 self.assertRaises(Exception, repr, t) 3713 3714 def test_garbage_collection(self): 3715 # C TextIOWrapper objects are collected, and collecting them flushes 3716 # all data to disk. 3717 # The Python version has __del__, so it ends in gc.garbage instead. 3718 with support.check_warnings(('', ResourceWarning)): 3719 rawio = io.FileIO(support.TESTFN, "wb") 3720 b = self.BufferedWriter(rawio) 3721 t = self.TextIOWrapper(b, encoding="ascii") 3722 t.write("456def") 3723 t.x = t 3724 wr = weakref.ref(t) 3725 del t 3726 support.gc_collect() 3727 self.assertIsNone(wr(), wr) 3728 with self.open(support.TESTFN, "rb") as f: 3729 self.assertEqual(f.read(), b"456def") 3730 3731 def test_rwpair_cleared_before_textio(self): 3732 # Issue 13070: TextIOWrapper's finalization would crash when called 3733 # after the reference to the underlying BufferedRWPair's writer got 3734 # cleared by the GC. 3735 for i in range(1000): 3736 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3737 t1 = self.TextIOWrapper(b1, encoding="ascii") 3738 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) 3739 t2 = self.TextIOWrapper(b2, encoding="ascii") 3740 # circular references 3741 t1.buddy = t2 3742 t2.buddy = t1 3743 support.gc_collect() 3744 3745 def test_del__CHUNK_SIZE_SystemError(self): 3746 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii') 3747 with self.assertRaises(AttributeError): 3748 del t._CHUNK_SIZE 3749 3750 3751class PyTextIOWrapperTest(TextIOWrapperTest): 3752 io = pyio 3753 shutdown_error = "LookupError: unknown encoding: ascii" 3754 3755 3756class IncrementalNewlineDecoderTest(unittest.TestCase): 3757 3758 def check_newline_decoding_utf8(self, decoder): 3759 # UTF-8 specific tests for a newline decoder 3760 def _check_decode(b, s, **kwargs): 3761 # We exercise getstate() / setstate() as well as decode() 3762 state = decoder.getstate() 3763 self.assertEqual(decoder.decode(b, **kwargs), s) 3764 decoder.setstate(state) 3765 self.assertEqual(decoder.decode(b, **kwargs), s) 3766 3767 _check_decode(b'\xe8\xa2\x88', "\u8888") 3768 3769 _check_decode(b'\xe8', "") 3770 _check_decode(b'\xa2', "") 3771 _check_decode(b'\x88', "\u8888") 3772 3773 _check_decode(b'\xe8', "") 3774 _check_decode(b'\xa2', "") 3775 _check_decode(b'\x88', "\u8888") 3776 3777 _check_decode(b'\xe8', "") 3778 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) 3779 3780 decoder.reset() 3781 _check_decode(b'\n', "\n") 3782 _check_decode(b'\r', "") 3783 _check_decode(b'', "\n", final=True) 3784 _check_decode(b'\r', "\n", final=True) 3785 3786 _check_decode(b'\r', "") 3787 _check_decode(b'a', "\na") 3788 3789 _check_decode(b'\r\r\n', "\n\n") 3790 _check_decode(b'\r', "") 3791 _check_decode(b'\r', "\n") 3792 _check_decode(b'\na', "\na") 3793 3794 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") 3795 _check_decode(b'\xe8\xa2\x88', "\u8888") 3796 _check_decode(b'\n', "\n") 3797 _check_decode(b'\xe8\xa2\x88\r', "\u8888") 3798 _check_decode(b'\n', "\n") 3799 3800 def check_newline_decoding(self, decoder, encoding): 3801 result = [] 3802 if encoding is not None: 3803 encoder = codecs.getincrementalencoder(encoding)() 3804 def _decode_bytewise(s): 3805 # Decode one byte at a time 3806 for b in encoder.encode(s): 3807 result.append(decoder.decode(bytes([b]))) 3808 else: 3809 encoder = None 3810 def _decode_bytewise(s): 3811 # Decode one char at a time 3812 for c in s: 3813 result.append(decoder.decode(c)) 3814 self.assertEqual(decoder.newlines, None) 3815 _decode_bytewise("abc\n\r") 3816 self.assertEqual(decoder.newlines, '\n') 3817 _decode_bytewise("\nabc") 3818 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3819 _decode_bytewise("abc\r") 3820 self.assertEqual(decoder.newlines, ('\n', '\r\n')) 3821 _decode_bytewise("abc") 3822 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) 3823 _decode_bytewise("abc\r") 3824 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") 3825 decoder.reset() 3826 input = "abc" 3827 if encoder is not None: 3828 encoder.reset() 3829 input = encoder.encode(input) 3830 self.assertEqual(decoder.decode(input), "abc") 3831 self.assertEqual(decoder.newlines, None) 3832 3833 def test_newline_decoder(self): 3834 encodings = ( 3835 # None meaning the IncrementalNewlineDecoder takes unicode input 3836 # rather than bytes input 3837 None, 'utf-8', 'latin-1', 3838 'utf-16', 'utf-16-le', 'utf-16-be', 3839 'utf-32', 'utf-32-le', 'utf-32-be', 3840 ) 3841 for enc in encodings: 3842 decoder = enc and codecs.getincrementaldecoder(enc)() 3843 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3844 self.check_newline_decoding(decoder, enc) 3845 decoder = codecs.getincrementaldecoder("utf-8")() 3846 decoder = self.IncrementalNewlineDecoder(decoder, translate=True) 3847 self.check_newline_decoding_utf8(decoder) 3848 self.assertRaises(TypeError, decoder.setstate, 42) 3849 3850 def test_newline_bytes(self): 3851 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder 3852 def _check(dec): 3853 self.assertEqual(dec.newlines, None) 3854 self.assertEqual(dec.decode("\u0D00"), "\u0D00") 3855 self.assertEqual(dec.newlines, None) 3856 self.assertEqual(dec.decode("\u0A00"), "\u0A00") 3857 self.assertEqual(dec.newlines, None) 3858 dec = self.IncrementalNewlineDecoder(None, translate=False) 3859 _check(dec) 3860 dec = self.IncrementalNewlineDecoder(None, translate=True) 3861 _check(dec) 3862 3863 def test_translate(self): 3864 # issue 35062 3865 for translate in (-2, -1, 1, 2): 3866 decoder = codecs.getincrementaldecoder("utf-8")() 3867 decoder = self.IncrementalNewlineDecoder(decoder, translate) 3868 self.check_newline_decoding_utf8(decoder) 3869 decoder = codecs.getincrementaldecoder("utf-8")() 3870 decoder = self.IncrementalNewlineDecoder(decoder, translate=0) 3871 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n") 3872 3873class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3874 pass 3875 3876class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest): 3877 pass 3878 3879 3880# XXX Tests for open() 3881 3882class MiscIOTest(unittest.TestCase): 3883 3884 def tearDown(self): 3885 support.unlink(support.TESTFN) 3886 3887 def test___all__(self): 3888 for name in self.io.__all__: 3889 obj = getattr(self.io, name, None) 3890 self.assertIsNotNone(obj, name) 3891 if name in ("open", "open_code"): 3892 continue 3893 elif "error" in name.lower() or name == "UnsupportedOperation": 3894 self.assertTrue(issubclass(obj, Exception), name) 3895 elif not name.startswith("SEEK_"): 3896 self.assertTrue(issubclass(obj, self.IOBase)) 3897 3898 def test_attributes(self): 3899 f = self.open(support.TESTFN, "wb", buffering=0) 3900 self.assertEqual(f.mode, "wb") 3901 f.close() 3902 3903 with support.check_warnings(('', DeprecationWarning)): 3904 f = self.open(support.TESTFN, "U") 3905 self.assertEqual(f.name, support.TESTFN) 3906 self.assertEqual(f.buffer.name, support.TESTFN) 3907 self.assertEqual(f.buffer.raw.name, support.TESTFN) 3908 self.assertEqual(f.mode, "U") 3909 self.assertEqual(f.buffer.mode, "rb") 3910 self.assertEqual(f.buffer.raw.mode, "rb") 3911 f.close() 3912 3913 f = self.open(support.TESTFN, "w+") 3914 self.assertEqual(f.mode, "w+") 3915 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter? 3916 self.assertEqual(f.buffer.raw.mode, "rb+") 3917 3918 g = self.open(f.fileno(), "wb", closefd=False) 3919 self.assertEqual(g.mode, "wb") 3920 self.assertEqual(g.raw.mode, "wb") 3921 self.assertEqual(g.name, f.fileno()) 3922 self.assertEqual(g.raw.name, f.fileno()) 3923 f.close() 3924 g.close() 3925 3926 def test_open_pipe_with_append(self): 3927 # bpo-27805: Ignore ESPIPE from lseek() in open(). 3928 r, w = os.pipe() 3929 self.addCleanup(os.close, r) 3930 f = self.open(w, 'a') 3931 self.addCleanup(f.close) 3932 # Check that the file is marked non-seekable. On Windows, however, lseek 3933 # somehow succeeds on pipes. 3934 if sys.platform != 'win32': 3935 self.assertFalse(f.seekable()) 3936 3937 def test_io_after_close(self): 3938 for kwargs in [ 3939 {"mode": "w"}, 3940 {"mode": "wb"}, 3941 {"mode": "w", "buffering": 1}, 3942 {"mode": "w", "buffering": 2}, 3943 {"mode": "wb", "buffering": 0}, 3944 {"mode": "r"}, 3945 {"mode": "rb"}, 3946 {"mode": "r", "buffering": 1}, 3947 {"mode": "r", "buffering": 2}, 3948 {"mode": "rb", "buffering": 0}, 3949 {"mode": "w+"}, 3950 {"mode": "w+b"}, 3951 {"mode": "w+", "buffering": 1}, 3952 {"mode": "w+", "buffering": 2}, 3953 {"mode": "w+b", "buffering": 0}, 3954 ]: 3955 f = self.open(support.TESTFN, **kwargs) 3956 f.close() 3957 self.assertRaises(ValueError, f.flush) 3958 self.assertRaises(ValueError, f.fileno) 3959 self.assertRaises(ValueError, f.isatty) 3960 self.assertRaises(ValueError, f.__iter__) 3961 if hasattr(f, "peek"): 3962 self.assertRaises(ValueError, f.peek, 1) 3963 self.assertRaises(ValueError, f.read) 3964 if hasattr(f, "read1"): 3965 self.assertRaises(ValueError, f.read1, 1024) 3966 self.assertRaises(ValueError, f.read1) 3967 if hasattr(f, "readall"): 3968 self.assertRaises(ValueError, f.readall) 3969 if hasattr(f, "readinto"): 3970 self.assertRaises(ValueError, f.readinto, bytearray(1024)) 3971 if hasattr(f, "readinto1"): 3972 self.assertRaises(ValueError, f.readinto1, bytearray(1024)) 3973 self.assertRaises(ValueError, f.readline) 3974 self.assertRaises(ValueError, f.readlines) 3975 self.assertRaises(ValueError, f.readlines, 1) 3976 self.assertRaises(ValueError, f.seek, 0) 3977 self.assertRaises(ValueError, f.tell) 3978 self.assertRaises(ValueError, f.truncate) 3979 self.assertRaises(ValueError, f.write, 3980 b"" if "b" in kwargs['mode'] else "") 3981 self.assertRaises(ValueError, f.writelines, []) 3982 self.assertRaises(ValueError, next, f) 3983 3984 def test_blockingioerror(self): 3985 # Various BlockingIOError issues 3986 class C(str): 3987 pass 3988 c = C("") 3989 b = self.BlockingIOError(1, c) 3990 c.b = b 3991 b.c = c 3992 wr = weakref.ref(c) 3993 del c, b 3994 support.gc_collect() 3995 self.assertIsNone(wr(), wr) 3996 3997 def test_abcs(self): 3998 # Test the visible base classes are ABCs. 3999 self.assertIsInstance(self.IOBase, abc.ABCMeta) 4000 self.assertIsInstance(self.RawIOBase, abc.ABCMeta) 4001 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta) 4002 self.assertIsInstance(self.TextIOBase, abc.ABCMeta) 4003 4004 def _check_abc_inheritance(self, abcmodule): 4005 with self.open(support.TESTFN, "wb", buffering=0) as f: 4006 self.assertIsInstance(f, abcmodule.IOBase) 4007 self.assertIsInstance(f, abcmodule.RawIOBase) 4008 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 4009 self.assertNotIsInstance(f, abcmodule.TextIOBase) 4010 with self.open(support.TESTFN, "wb") as f: 4011 self.assertIsInstance(f, abcmodule.IOBase) 4012 self.assertNotIsInstance(f, abcmodule.RawIOBase) 4013 self.assertIsInstance(f, abcmodule.BufferedIOBase) 4014 self.assertNotIsInstance(f, abcmodule.TextIOBase) 4015 with self.open(support.TESTFN, "w") as f: 4016 self.assertIsInstance(f, abcmodule.IOBase) 4017 self.assertNotIsInstance(f, abcmodule.RawIOBase) 4018 self.assertNotIsInstance(f, abcmodule.BufferedIOBase) 4019 self.assertIsInstance(f, abcmodule.TextIOBase) 4020 4021 def test_abc_inheritance(self): 4022 # Test implementations inherit from their respective ABCs 4023 self._check_abc_inheritance(self) 4024 4025 def test_abc_inheritance_official(self): 4026 # Test implementations inherit from the official ABCs of the 4027 # baseline "io" module. 4028 self._check_abc_inheritance(io) 4029 4030 def _check_warn_on_dealloc(self, *args, **kwargs): 4031 f = open(*args, **kwargs) 4032 r = repr(f) 4033 with self.assertWarns(ResourceWarning) as cm: 4034 f = None 4035 support.gc_collect() 4036 self.assertIn(r, str(cm.warning.args[0])) 4037 4038 def test_warn_on_dealloc(self): 4039 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0) 4040 self._check_warn_on_dealloc(support.TESTFN, "wb") 4041 self._check_warn_on_dealloc(support.TESTFN, "w") 4042 4043 def _check_warn_on_dealloc_fd(self, *args, **kwargs): 4044 fds = [] 4045 def cleanup_fds(): 4046 for fd in fds: 4047 try: 4048 os.close(fd) 4049 except OSError as e: 4050 if e.errno != errno.EBADF: 4051 raise 4052 self.addCleanup(cleanup_fds) 4053 r, w = os.pipe() 4054 fds += r, w 4055 self._check_warn_on_dealloc(r, *args, **kwargs) 4056 # When using closefd=False, there's no warning 4057 r, w = os.pipe() 4058 fds += r, w 4059 with support.check_no_resource_warning(self): 4060 open(r, *args, closefd=False, **kwargs) 4061 4062 def test_warn_on_dealloc_fd(self): 4063 self._check_warn_on_dealloc_fd("rb", buffering=0) 4064 self._check_warn_on_dealloc_fd("rb") 4065 self._check_warn_on_dealloc_fd("r") 4066 4067 4068 def test_pickling(self): 4069 # Pickling file objects is forbidden 4070 for kwargs in [ 4071 {"mode": "w"}, 4072 {"mode": "wb"}, 4073 {"mode": "wb", "buffering": 0}, 4074 {"mode": "r"}, 4075 {"mode": "rb"}, 4076 {"mode": "rb", "buffering": 0}, 4077 {"mode": "w+"}, 4078 {"mode": "w+b"}, 4079 {"mode": "w+b", "buffering": 0}, 4080 ]: 4081 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 4082 with self.open(support.TESTFN, **kwargs) as f: 4083 self.assertRaises(TypeError, pickle.dumps, f, protocol) 4084 4085 def test_nonblock_pipe_write_bigbuf(self): 4086 self._test_nonblock_pipe_write(16*1024) 4087 4088 def test_nonblock_pipe_write_smallbuf(self): 4089 self._test_nonblock_pipe_write(1024) 4090 4091 @unittest.skipUnless(hasattr(os, 'set_blocking'), 4092 'os.set_blocking() required for this test') 4093 def _test_nonblock_pipe_write(self, bufsize): 4094 sent = [] 4095 received = [] 4096 r, w = os.pipe() 4097 os.set_blocking(r, False) 4098 os.set_blocking(w, False) 4099 4100 # To exercise all code paths in the C implementation we need 4101 # to play with buffer sizes. For instance, if we choose a 4102 # buffer size less than or equal to _PIPE_BUF (4096 on Linux) 4103 # then we will never get a partial write of the buffer. 4104 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize) 4105 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize) 4106 4107 with rf, wf: 4108 for N in 9999, 73, 7574: 4109 try: 4110 i = 0 4111 while True: 4112 msg = bytes([i % 26 + 97]) * N 4113 sent.append(msg) 4114 wf.write(msg) 4115 i += 1 4116 4117 except self.BlockingIOError as e: 4118 self.assertEqual(e.args[0], errno.EAGAIN) 4119 self.assertEqual(e.args[2], e.characters_written) 4120 sent[-1] = sent[-1][:e.characters_written] 4121 received.append(rf.read()) 4122 msg = b'BLOCKED' 4123 wf.write(msg) 4124 sent.append(msg) 4125 4126 while True: 4127 try: 4128 wf.flush() 4129 break 4130 except self.BlockingIOError as e: 4131 self.assertEqual(e.args[0], errno.EAGAIN) 4132 self.assertEqual(e.args[2], e.characters_written) 4133 self.assertEqual(e.characters_written, 0) 4134 received.append(rf.read()) 4135 4136 received += iter(rf.read, None) 4137 4138 sent, received = b''.join(sent), b''.join(received) 4139 self.assertEqual(sent, received) 4140 self.assertTrue(wf.closed) 4141 self.assertTrue(rf.closed) 4142 4143 def test_create_fail(self): 4144 # 'x' mode fails if file is existing 4145 with self.open(support.TESTFN, 'w'): 4146 pass 4147 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x') 4148 4149 def test_create_writes(self): 4150 # 'x' mode opens for writing 4151 with self.open(support.TESTFN, 'xb') as f: 4152 f.write(b"spam") 4153 with self.open(support.TESTFN, 'rb') as f: 4154 self.assertEqual(b"spam", f.read()) 4155 4156 def test_open_allargs(self): 4157 # there used to be a buffer overflow in the parser for rawmode 4158 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+') 4159 4160 def test_check_encoding_errors(self): 4161 # bpo-37388: open() and TextIOWrapper must check encoding and errors 4162 # arguments in dev mode 4163 mod = self.io.__name__ 4164 filename = __file__ 4165 invalid = 'Boom, Shaka Laka, Boom!' 4166 code = textwrap.dedent(f''' 4167 import sys 4168 from {mod} import open, TextIOWrapper 4169 4170 try: 4171 open({filename!r}, encoding={invalid!r}) 4172 except LookupError: 4173 pass 4174 else: 4175 sys.exit(21) 4176 4177 try: 4178 open({filename!r}, errors={invalid!r}) 4179 except LookupError: 4180 pass 4181 else: 4182 sys.exit(22) 4183 4184 fp = open({filename!r}, "rb") 4185 with fp: 4186 try: 4187 TextIOWrapper(fp, encoding={invalid!r}) 4188 except LookupError: 4189 pass 4190 else: 4191 sys.exit(23) 4192 4193 try: 4194 TextIOWrapper(fp, errors={invalid!r}) 4195 except LookupError: 4196 pass 4197 else: 4198 sys.exit(24) 4199 4200 sys.exit(10) 4201 ''') 4202 proc = assert_python_failure('-X', 'dev', '-c', code) 4203 self.assertEqual(proc.rc, 10, proc) 4204 4205 4206class CMiscIOTest(MiscIOTest): 4207 io = io 4208 4209 def test_readinto_buffer_overflow(self): 4210 # Issue #18025 4211 class BadReader(self.io.BufferedIOBase): 4212 def read(self, n=-1): 4213 return b'x' * 10**6 4214 bufio = BadReader() 4215 b = bytearray(2) 4216 self.assertRaises(ValueError, bufio.readinto, b) 4217 4218 def check_daemon_threads_shutdown_deadlock(self, stream_name): 4219 # Issue #23309: deadlocks at shutdown should be avoided when a 4220 # daemon thread and the main thread both write to a file. 4221 code = """if 1: 4222 import sys 4223 import time 4224 import threading 4225 from test.support import SuppressCrashReport 4226 4227 file = sys.{stream_name} 4228 4229 def run(): 4230 while True: 4231 file.write('.') 4232 file.flush() 4233 4234 crash = SuppressCrashReport() 4235 crash.__enter__() 4236 # don't call __exit__(): the crash occurs at Python shutdown 4237 4238 thread = threading.Thread(target=run) 4239 thread.daemon = True 4240 thread.start() 4241 4242 time.sleep(0.5) 4243 file.write('!') 4244 file.flush() 4245 """.format_map(locals()) 4246 res, _ = run_python_until_end("-c", code) 4247 err = res.err.decode() 4248 if res.rc != 0: 4249 # Failure: should be a fatal error 4250 pattern = (r"Fatal Python error: _enter_buffered_busy: " 4251 r"could not acquire lock " 4252 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> " 4253 r"at interpreter shutdown, possibly due to " 4254 r"daemon threads".format_map(locals())) 4255 self.assertRegex(err, pattern) 4256 else: 4257 self.assertFalse(err.strip('.!')) 4258 4259 def test_daemon_threads_shutdown_stdout_deadlock(self): 4260 self.check_daemon_threads_shutdown_deadlock('stdout') 4261 4262 def test_daemon_threads_shutdown_stderr_deadlock(self): 4263 self.check_daemon_threads_shutdown_deadlock('stderr') 4264 4265 4266class PyMiscIOTest(MiscIOTest): 4267 io = pyio 4268 4269 4270@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.') 4271class SignalsTest(unittest.TestCase): 4272 4273 def setUp(self): 4274 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 4275 4276 def tearDown(self): 4277 signal.signal(signal.SIGALRM, self.oldalrm) 4278 4279 def alarm_interrupt(self, sig, frame): 4280 1/0 4281 4282 def check_interrupted_write(self, item, bytes, **fdopen_kwargs): 4283 """Check that a partial write, when it gets interrupted, properly 4284 invokes the signal handler, and bubbles up the exception raised 4285 in the latter.""" 4286 read_results = [] 4287 def _read(): 4288 s = os.read(r, 1) 4289 read_results.append(s) 4290 4291 t = threading.Thread(target=_read) 4292 t.daemon = True 4293 r, w = os.pipe() 4294 fdopen_kwargs["closefd"] = False 4295 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1) 4296 try: 4297 wio = self.io.open(w, **fdopen_kwargs) 4298 if hasattr(signal, 'pthread_sigmask'): 4299 # create the thread with SIGALRM signal blocked 4300 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM]) 4301 t.start() 4302 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM]) 4303 else: 4304 t.start() 4305 4306 # Fill the pipe enough that the write will be blocking. 4307 # It will be interrupted by the timer armed above. Since the 4308 # other thread has read one byte, the low-level write will 4309 # return with a successful (partial) result rather than an EINTR. 4310 # The buffered IO layer must check for pending signal 4311 # handlers, which in this case will invoke alarm_interrupt(). 4312 signal.alarm(1) 4313 try: 4314 self.assertRaises(ZeroDivisionError, wio.write, large_data) 4315 finally: 4316 signal.alarm(0) 4317 t.join() 4318 # We got one byte, get another one and check that it isn't a 4319 # repeat of the first one. 4320 read_results.append(os.read(r, 1)) 4321 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]]) 4322 finally: 4323 os.close(w) 4324 os.close(r) 4325 # This is deliberate. If we didn't close the file descriptor 4326 # before closing wio, wio would try to flush its internal 4327 # buffer, and block again. 4328 try: 4329 wio.close() 4330 except OSError as e: 4331 if e.errno != errno.EBADF: 4332 raise 4333 4334 def test_interrupted_write_unbuffered(self): 4335 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0) 4336 4337 def test_interrupted_write_buffered(self): 4338 self.check_interrupted_write(b"xy", b"xy", mode="wb") 4339 4340 def test_interrupted_write_text(self): 4341 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii") 4342 4343 @support.no_tracing 4344 def check_reentrant_write(self, data, **fdopen_kwargs): 4345 def on_alarm(*args): 4346 # Will be called reentrantly from the same thread 4347 wio.write(data) 4348 1/0 4349 signal.signal(signal.SIGALRM, on_alarm) 4350 r, w = os.pipe() 4351 wio = self.io.open(w, **fdopen_kwargs) 4352 try: 4353 signal.alarm(1) 4354 # Either the reentrant call to wio.write() fails with RuntimeError, 4355 # or the signal handler raises ZeroDivisionError. 4356 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm: 4357 while 1: 4358 for i in range(100): 4359 wio.write(data) 4360 wio.flush() 4361 # Make sure the buffer doesn't fill up and block further writes 4362 os.read(r, len(data) * 100) 4363 exc = cm.exception 4364 if isinstance(exc, RuntimeError): 4365 self.assertTrue(str(exc).startswith("reentrant call"), str(exc)) 4366 finally: 4367 signal.alarm(0) 4368 wio.close() 4369 os.close(r) 4370 4371 def test_reentrant_write_buffered(self): 4372 self.check_reentrant_write(b"xy", mode="wb") 4373 4374 def test_reentrant_write_text(self): 4375 self.check_reentrant_write("xy", mode="w", encoding="ascii") 4376 4377 def check_interrupted_read_retry(self, decode, **fdopen_kwargs): 4378 """Check that a buffered read, when it gets interrupted (either 4379 returning a partial result or EINTR), properly invokes the signal 4380 handler and retries if the latter returned successfully.""" 4381 r, w = os.pipe() 4382 fdopen_kwargs["closefd"] = False 4383 def alarm_handler(sig, frame): 4384 os.write(w, b"bar") 4385 signal.signal(signal.SIGALRM, alarm_handler) 4386 try: 4387 rio = self.io.open(r, **fdopen_kwargs) 4388 os.write(w, b"foo") 4389 signal.alarm(1) 4390 # Expected behaviour: 4391 # - first raw read() returns partial b"foo" 4392 # - second raw read() returns EINTR 4393 # - third raw read() returns b"bar" 4394 self.assertEqual(decode(rio.read(6)), "foobar") 4395 finally: 4396 signal.alarm(0) 4397 rio.close() 4398 os.close(w) 4399 os.close(r) 4400 4401 def test_interrupted_read_retry_buffered(self): 4402 self.check_interrupted_read_retry(lambda x: x.decode('latin1'), 4403 mode="rb") 4404 4405 def test_interrupted_read_retry_text(self): 4406 self.check_interrupted_read_retry(lambda x: x, 4407 mode="r") 4408 4409 def check_interrupted_write_retry(self, item, **fdopen_kwargs): 4410 """Check that a buffered write, when it gets interrupted (either 4411 returning a partial result or EINTR), properly invokes the signal 4412 handler and retries if the latter returned successfully.""" 4413 select = support.import_module("select") 4414 4415 # A quantity that exceeds the buffer size of an anonymous pipe's 4416 # write end. 4417 N = support.PIPE_MAX_SIZE 4418 r, w = os.pipe() 4419 fdopen_kwargs["closefd"] = False 4420 4421 # We need a separate thread to read from the pipe and allow the 4422 # write() to finish. This thread is started after the SIGALRM is 4423 # received (forcing a first EINTR in write()). 4424 read_results = [] 4425 write_finished = False 4426 error = None 4427 def _read(): 4428 try: 4429 while not write_finished: 4430 while r in select.select([r], [], [], 1.0)[0]: 4431 s = os.read(r, 1024) 4432 read_results.append(s) 4433 except BaseException as exc: 4434 nonlocal error 4435 error = exc 4436 t = threading.Thread(target=_read) 4437 t.daemon = True 4438 def alarm1(sig, frame): 4439 signal.signal(signal.SIGALRM, alarm2) 4440 signal.alarm(1) 4441 def alarm2(sig, frame): 4442 t.start() 4443 4444 large_data = item * N 4445 signal.signal(signal.SIGALRM, alarm1) 4446 try: 4447 wio = self.io.open(w, **fdopen_kwargs) 4448 signal.alarm(1) 4449 # Expected behaviour: 4450 # - first raw write() is partial (because of the limited pipe buffer 4451 # and the first alarm) 4452 # - second raw write() returns EINTR (because of the second alarm) 4453 # - subsequent write()s are successful (either partial or complete) 4454 written = wio.write(large_data) 4455 self.assertEqual(N, written) 4456 4457 wio.flush() 4458 write_finished = True 4459 t.join() 4460 4461 self.assertIsNone(error) 4462 self.assertEqual(N, sum(len(x) for x in read_results)) 4463 finally: 4464 signal.alarm(0) 4465 write_finished = True 4466 os.close(w) 4467 os.close(r) 4468 # This is deliberate. If we didn't close the file descriptor 4469 # before closing wio, wio would try to flush its internal 4470 # buffer, and could block (in case of failure). 4471 try: 4472 wio.close() 4473 except OSError as e: 4474 if e.errno != errno.EBADF: 4475 raise 4476 4477 def test_interrupted_write_retry_buffered(self): 4478 self.check_interrupted_write_retry(b"x", mode="wb") 4479 4480 def test_interrupted_write_retry_text(self): 4481 self.check_interrupted_write_retry("x", mode="w", encoding="latin1") 4482 4483 4484class CSignalsTest(SignalsTest): 4485 io = io 4486 4487class PySignalsTest(SignalsTest): 4488 io = pyio 4489 4490 # Handling reentrancy issues would slow down _pyio even more, so the 4491 # tests are disabled. 4492 test_reentrant_write_buffered = None 4493 test_reentrant_write_text = None 4494 4495 4496def load_tests(*args): 4497 tests = (CIOTest, PyIOTest, APIMismatchTest, 4498 CBufferedReaderTest, PyBufferedReaderTest, 4499 CBufferedWriterTest, PyBufferedWriterTest, 4500 CBufferedRWPairTest, PyBufferedRWPairTest, 4501 CBufferedRandomTest, PyBufferedRandomTest, 4502 StatefulIncrementalDecoderTest, 4503 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest, 4504 CTextIOWrapperTest, PyTextIOWrapperTest, 4505 CMiscIOTest, PyMiscIOTest, 4506 CSignalsTest, PySignalsTest, 4507 ) 4508 4509 # Put the namespaces of the IO module we are testing and some useful mock 4510 # classes in the __dict__ of each test. 4511 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO, 4512 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead, 4513 SlowFlushRawIO) 4514 all_members = io.__all__ + ["IncrementalNewlineDecoder"] 4515 c_io_ns = {name : getattr(io, name) for name in all_members} 4516 py_io_ns = {name : getattr(pyio, name) for name in all_members} 4517 globs = globals() 4518 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks) 4519 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks) 4520 # Avoid turning open into a bound method. 4521 py_io_ns["open"] = pyio.OpenWrapper 4522 for test in tests: 4523 if test.__name__.startswith("C"): 4524 for name, obj in c_io_ns.items(): 4525 setattr(test, name, obj) 4526 elif test.__name__.startswith("Py"): 4527 for name, obj in py_io_ns.items(): 4528 setattr(test, name, obj) 4529 4530 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests]) 4531 return suite 4532 4533if __name__ == "__main__": 4534 unittest.main() 4535