"""Regression tests for the zlib module.

Covers the checksum helpers (crc32/adler32), one-shot compress/decompress,
the streaming compressobj/decompressobj API (flush modes, max_length,
unconsumed_tail/unused_data, preset dictionaries, copy, pickling) and the
handling of buffers larger than 4 GiB.
"""

import unittest
from test import support
import binascii
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G

zlib = support.import_module('zlib')

# Compress.copy() / Decompress.copy() are optional; skip the tests that need
# them when the objects do not expose a copy() method.
requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')


class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        # Test that the major version of the actual library in use matches the
        # major version that we were compiled against. We can't guarantee that
        # the minor versions will match (even on the machine on which the module
        # was compiled), and the API is stable between minor versions, so
        # testing only the major versions avoids spurious failures.
        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32(b"", 0), 0)
        self.assertEqual(zlib.crc32(b"", 1), 1)
        self.assertEqual(zlib.crc32(b"", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32(b"", 0), 0)
        self.assertEqual(zlib.adler32(b"", 1), 1)
        self.assertEqual(zlib.adler32(b"", 432), 432)

    def test_penguins(self):
        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1))

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # explicitly test that the checksums come back as unsigned values
        # (two of the expected results below exceed 2**31 - 1)
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >=4GB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)


class BaseCompressTestCase:
    """Mixin with big-buffer helpers shared by the one-shot and object tests."""

    def check_big_compress_buffer(self, size, compress_func):
        # Call compress_func on at least `size` bytes of poorly compressible
        # data; only the absence of an exception is checked.
        _1M = 1024 * 1024
        # Generate 10MB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little')
                        for i in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        # Compress `size` bytes of b'x' and check that decompress_func
        # restores exactly that payload.
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush) # second flush should not work
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail,
                             "(A) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain any remaining output without calling flush()
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail,
                         "(B) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        # CustomInt() converts to 100, so at most 100 bytes come back
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream from a private, explicitly seeded
        # generator so the module-level RNG state is left untouched
        gen = random.Random(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"") # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # issue27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        bufs0 = []
        bufs0.append(c0.compress(data0))

        c1 = c0.copy()
        bufs1 = bufs0[:]

        bufs0.append(c0.compress(data0))
        bufs0.append(c0.flush())
        s0 = b''.join(bufs0)

        bufs1.append(c1.compress(data1))
        bufs1.append(c1.flush())
        s1 = b''.join(bufs1)

        self.assertEqual(zlib.decompress(s0),data0+data0)
        self.assertEqual(zlib.decompress(s1),data0+data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        d0 = zlib.decompressobj()
        bufs0 = []
        bufs0.append(d0.decompress(comp[:32]))

        d1 = d0.copy()
        bufs1 = bufs0[:]

        bufs0.append(d0.decompress(comp[32:]))
        s0 = b''.join(bufs0)

        bufs1.append(d1.decompress(comp[32:]))
        s1 = b''.join(bufs1)

        self.assertEqual(s0,s1)
        self.assertEqual(s0,data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 only supported since zlib v1.2.3.5
        # Register "1.2.3" as "1.2.3.0", and tolerate a vendor suffix such
        # as "1.2.0-linux" or a non-numeric last component, either of which
        # would make a plain int() conversion raise ValueError.
        v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
        if len(v) < 4:
            v.append('0')
        elif not v[-1].isnumeric():
            v[-1] = '0'
        v = tuple(map(int, v))
        supports_wbits_0 = v >= (1, 2, 3, 5)

        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def genblock(seed, length, step=1024, generator=random):
    """Return a length-byte stream of random data (built in step-byte blocks).

    seed      -- value passed to generator.seed(); None leaves the
                 generator's current state untouched.
    length    -- number of bytes to produce; zero or a negative value
                 yields b''.
    step      -- block size; a step below 2 or above length means the
                 whole stream is generated as a single block.
    generator -- object providing seed() and randint(); defaults to the
                 random module.
    """
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length <= 0:
        # Also avoids range() being called with a zero step below.
        return b''
    if length < step or step < 2:
        step = length
    blocks = []
    for start in range(0, length, step):
        # The final block is shortened so the result is exactly `length`
        # bytes even when length is not a multiple of step.
        size = min(step, length - start)
        blocks.append(bytes(randint(0, 255) for _ in range(size)))
    return b''.join(blocks)



def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source.

    source may be str or bytes-like; it is split on the newline character
    of the matching type.  When seed is not None the generator is seeded
    with it first.
    """
    if seed is not None:
        generator.seed(seed)
    newline = b'\n' if isinstance(source, (bytes, bytearray)) else '\n'
    sources = source.split(newline)
    return [generator.choice(sources) for n in range(number)]



# Sample payload used throughout the tests.
# NOTE(review): the leading whitespace inside this literal was reconstructed
# (7-space indentation of the speech lines) -- confirm against the upstream
# file.  The tests above only round-trip the payload and, in
# test_maxlen_large, require len(HAMLET_SCENE * 10) > zlib.DEF_BUF_SIZE.
HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


class CustomInt:
    # Non-int object convertible through __int__ (always 100); the tests pass
    # it as a bufsize / max_length / flush-length argument.
    def __int__(self):
        return 100


if __name__ == "__main__":
    unittest.main()