1import unittest
2from test import support
3import binascii
4import pickle
5import random
6import sys
7from test.support import bigmemtest, _1G, _4G
8
# Import zlib via the helper so the whole test module is skipped (rather
# than failing) on builds where the zlib extension is unavailable.
zlib = support.import_module('zlib')

# Skip decorators for tests that need the optional .copy() method on
# compressor/decompressor objects; availability is probed with hasattr.
requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')
17
18
class VersionTestCase(unittest.TestCase):
    """Check that the zlib linked at runtime matches the build-time one."""

    def test_library_version(self):
        # Only the major version digit is compared.  Minor versions keep
        # the API stable and frequently differ between the machine the
        # module was compiled on and the machine running the tests, so an
        # exact match would produce spurious failures.
        runtime_major = zlib.ZLIB_RUNTIME_VERSION[0]
        compiled_major = zlib.ZLIB_VERSION[0]
        self.assertEqual(runtime_major, compiled_major)
28
29
class ChecksumTestCase(unittest.TestCase):
    """Tests for the crc32() and adler32() checksum functions."""

    def test_crc32start(self):
        # Omitting the start value is the same as passing 0.
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        # An empty input leaves the start value unchanged.
        for start in (0, 1, 432):
            self.assertEqual(zlib.crc32(b"", start), start)

    def test_adler32start(self):
        # Omitting the start value is the same as passing 1.
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        # An empty input leaves the start value unchanged.
        for start in (0, 1, 432):
            self.assertEqual(zlib.adler32(b"", start), start)

    def test_penguins(self):
        # Known checksum values for b"penguin" with explicit starts.
        known = (
            (zlib.crc32, 0, 0x0e5c1a120),
            (zlib.crc32, 1, 0x43b6aa94),
            (zlib.adler32, 0, 0x0bcf02f6),
            (zlib.adler32, 1, 0x0bd602f7),
        )
        for func, start, expected in known:
            self.assertEqual(func(b"penguin", start), expected)
        # Default starts: 0 for crc32, 1 for adler32.
        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"), zlib.adler32(b"penguin", 1))

    def test_crc32_adler32_unsigned(self):
        # Results are reported as unsigned 32-bit integers.
        foo = b'abcdefghijklmnop'
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo + foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        # zlib.crc32 and binascii.crc32 must agree on the same input.
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))
73
74
# Issue #10276 - check that inputs >=4GB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):
    """Checksums of buffers larger than 4 GiB (requires lots of memory)."""

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        # 4 GiB + 4 bytes of repeating b"nyan"; the expected values were
        # computed once and pinned so a wraparound bug would change them.
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)
83
84
class ExceptionTestCase(unittest.TestCase):
    """Verify that invalid arguments raise the expected exceptions."""

    def test_badlevel(self):
        # A compression level out of range raises zlib.error.  (-1 is
        # Z_DEFAULT_COMPRESSION and 0 is accepted by zlib, so 10 is used
        # as the invalid value.)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        funcs = (zlib.adler32, zlib.crc32, zlib.compress, zlib.decompress)
        # Calling with no arguments at all is a TypeError...
        for func in funcs:
            self.assertRaises(TypeError, func)
        # ...and so is passing anything that is not bytes-like.
        for bad in (42, None, '', 'abc', (), []):
            for func in funcs:
                self.assertRaises(TypeError, func, bad)

    def test_badcompressobj(self):
        # wbits=0 is rejected when building a compress object...
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # ...and so is a window size above MAX_WBITS.
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # Negative window sizes are rejected for decompress objects.
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # Decompress.flush() requires a positive buffer length.
        for length in (0, -1):
            self.assertRaises(ValueError, zlib.decompressobj().flush, length)

    @support.cpython_only
    def test_overflow(self):
        # Sizes that do not fit in a C integer raise OverflowError.
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)
128
129
class BaseCompressTestCase(object):
    """Shared helpers for exercising (de)compression on huge buffers."""

    def check_big_compress_buffer(self, size, compress_func):
        """Feed at least `size` bytes of spread-out data to compress_func."""
        _1M = 1024 * 1024
        # Build 10 MiB of random bytes and tile it up to the requested
        # size.  The assumption is that zlib's window is not big enough
        # to exploit redundancy spread this far apart.
        data = b''.join([random.getrandbits(8 * _1M).to_bytes(_1M, 'little')
                         for _ in range(10)])
        data *= size // len(data) + 1
        try:
            compress_func(data)
        finally:
            # Drop the reference so the huge buffer can be reclaimed.
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        """Round-trip `size` bytes of b'x' through decompress_func."""
        payload = b'x' * size
        try:
            compressed = zlib.compress(payload, 1)
        finally:
            payload = None  # free the input before decompressing
        result = decompress_func(compressed)
        try:
            # Expect exactly `size` bytes back, all of them b'x'.
            self.assertEqual(len(result), size)
            self.assertEqual(len(result.strip(b'x')), 0)
        finally:
            result = None
159
160
class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    """One-shot (whole-message) compression and decompression tests."""

    def test_speech(self):
        # Simple round trip of a realistic text sample.
        compressed = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(compressed), HAMLET_SCENE)

    def test_keywords(self):
        # compress() accepts level by keyword, but data is positional-only;
        # decompress() accepts wbits and bufsize by keyword.
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # A larger input, compressed from both bytes and bytearray and
        # decompressed from both as well.
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in (x, bytearray(x)):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # Truncated input must produce a useful error message.
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        self.check_big_compress_buffer(size, lambda s: zlib.compress(s, 1))

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # A decompress(bufsize) larger than the internal limit still works.
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        # bufsize may be any object accepted as an integer index.
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        # Inputs longer than 4 GiB must round-trip.
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None
224
225
class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    """Tests for the streaming compressobj()/decompressobj() interface."""

    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush) # second flush should not work
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        # All tuning options can be passed by keyword; the data argument
        # of compress()/decompress() itself is positional-only.
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(b''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        # (cx/dcx are the chunk sizes used for each direction)
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail, ########
                             "(A) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain any remaining output by feeding empty input until the
            # decompressor stops producing data.
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail, ########
                         "(B) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain remaining output with empty input, still honouring
            # the max_length cap on each returned chunk.
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        # max_length may be any object accepted as an integer index;
        # CustomInt (defined elsewhere in this file) converts to 100.
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"") # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        # eof becomes true once the end of the stream has been reached.
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        # eof stays false when the end of the stream is never seen,
        # even after flush().
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # issue27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        # flush() length may be any object accepted as an integer index.
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        bufs0 = []
        bufs0.append(c0.compress(data0))

        c1 = c0.copy()
        bufs1 = bufs0[:]

        bufs0.append(c0.compress(data0))
        bufs0.append(c0.flush())
        s0 = b''.join(bufs0)

        bufs1.append(c1.compress(data1))
        bufs1.append(c1.flush())
        s1 = b''.join(bufs1)

        self.assertEqual(zlib.decompress(s0),data0+data0)
        self.assertEqual(zlib.decompress(s1),data0+data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        d0 = zlib.decompressobj()
        bufs0 = []
        bufs0.append(d0.decompress(comp[:32]))

        d1 = d0.copy()
        bufs1 = bufs0[:]

        bufs0.append(d0.decompress(comp[32:]))
        s0 = b''.join(bufs0)

        bufs1.append(d1.decompress(comp[32:]))
        s1 = b''.join(bufs1)

        self.assertEqual(s0,s1)
        self.assertEqual(s0,data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)

    def test_compresspickle(self):
        # Compression objects cannot be pickled (with any protocol).
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        # Decompression objects cannot be pickled (with any protocol).
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        # Streaming round trip of an input longer than 4 GiB.
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        # unused_data larger than 4 GiB must be preserved intact.
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        # A max_length-limited decompress of > 4 GiB must eventually
        # consume the whole tail.
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 only supported since zlib v1.2.3.5
        # Register "1.2.3" as "1.2.3.0"
        v = (zlib.ZLIB_RUNTIME_VERSION + ".0").split(".", 4)
        supports_wbits_0 = int(v[0]) > 1 or int(v[0]) == 1 \
            and (int(v[1]) > 2 or int(v[1]) == 2
            and (int(v[2]) > 3 or int(v[2]) == 3 and int(v[3]) >= 5))

        # zlib format with a 15-bit window.
        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        # zlib format with a smaller (9-bit) window.
        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        # Raw deflate format (negative wbits).
        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        # gzip format (wbits + 16).
        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)
802
803
def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks).

    Note: when length is not a multiple of step, the output is rounded up
    to whole blocks and may be slightly longer than length.  A non-positive
    length yields b'' (previously it raised because range() got a step of 0).
    """
    if seed is not None:
        generator.seed(seed)
    randint = generator.randint
    if length <= 0:
        return b''
    if length < step or step < 2:
        step = length
    # Build each block separately and join once; repeated bytes += is
    # quadratic in the number of blocks.
    blocks = [bytes(randint(0, 255) for _ in range(step))
              for _ in range(0, length, step)]
    return b''.join(blocks)
815
816
817
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source."""
    if seed is not None:
        generator.seed(seed)
    candidates = source.split('\n')
    chosen = []
    for _ in range(number):
        chosen.append(generator.choice(candidates))
    return chosen
824
825
826
# Fixture text: a scene from Hamlet (Act 1, Scene 3), used throughout the
# tests above as realistic compressible data.  Kept as bytes because the
# zlib API operates on bytes-like objects.
HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""
890
891
class CustomInt:
    """Minimal integer-like object: converts to 100 via __int__()."""

    def __int__(self):
        value = 100
        return value
895
896
# Run the full test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
899