1import unittest
2from test import test_support as support
3from test.test_support import TESTFN, run_unittest, import_module, unlink, requires
4import binascii
5import pickle
6import random
7from test.test_support import precisionbigmemtest, _1G, _4G
8import sys
9
# mmap is treated as optional: when it cannot be imported the name is bound
# to None instead of failing the whole module.
try:
    import mmap
except ImportError:
    mmap = None

# zlib is loaded through test_support.import_module() rather than a plain
# import (presumably so the test module is skipped when zlib is not built --
# confirm against test_support).
zlib = import_module('zlib')

# Skip decorators for tests that rely on the copy() method, which is only
# used when the compression / decompression objects actually provide it.
requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')
23
24
25class ChecksumTestCase(unittest.TestCase):
26    # checksum test cases
27    def test_crc32start(self):
28        self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
29        self.assertTrue(zlib.crc32("abc", 0xffffffff))
30
31    def test_crc32empty(self):
32        self.assertEqual(zlib.crc32("", 0), 0)
33        self.assertEqual(zlib.crc32("", 1), 1)
34        self.assertEqual(zlib.crc32("", 432), 432)
35
36    def test_adler32start(self):
37        self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
38        self.assertTrue(zlib.adler32("abc", 0xffffffff))
39
40    def test_adler32empty(self):
41        self.assertEqual(zlib.adler32("", 0), 0)
42        self.assertEqual(zlib.adler32("", 1), 1)
43        self.assertEqual(zlib.adler32("", 432), 432)
44
45    def assertEqual32(self, seen, expected):
46        # 32-bit values masked -- checksums on 32- vs 64- bit machines
47        # This is important if bit 31 (0x08000000L) is set.
48        self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)
49
50    def test_penguins(self):
51        self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
52        self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
53        self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
54        self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)
55
56        self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
57        self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))
58
59    def test_abcdefghijklmnop(self):
60        """test issue1202 compliance: signed crc32, adler32 in 2.x"""
61        foo = 'abcdefghijklmnop'
62        # explicitly test signed behavior
63        self.assertEqual(zlib.crc32(foo), -1808088941)
64        self.assertEqual(zlib.crc32('spam'), 1138425661)
65        self.assertEqual(zlib.adler32(foo+foo), -721416943)
66        self.assertEqual(zlib.adler32('spam'), 72286642)
67
68    def test_same_as_binascii_crc32(self):
69        foo = 'abcdefghijklmnop'
70        self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
71        self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))
72
73    def test_negative_crc_iv_input(self):
74        # The range of valid input values for the crc state should be
75        # -2**31 through 2**32-1 to allow inputs artifically constrained
76        # to a signed 32-bit integer.
77        self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
78        self.assertEqual(zlib.crc32('spam', -3141593),
79                         zlib.crc32('spam',  0xffd01027L))
80        self.assertEqual(zlib.crc32('spam', -(2**31)),
81                         zlib.crc32('spam',  (2**31)))
82
83
84# Issue #10276 - check that inputs >=4GB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):
    """Checksums of a single buffer larger than 4 GiB."""

    @precisionbigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        # 4 * (_1G + 1) bytes of input; presumably this equals the _4G + 4
        # declared to the decorator (the constants come from test_support).
        payload = b"nyan" * (_1G + 1)
        low32 = 0xFFFFFFFF
        crc = zlib.crc32(payload) & low32
        self.assertEqual(crc, 1044521549)
        adler = zlib.adler32(payload) & low32
        self.assertEqual(adler, 2256789997)
92
93
class ExceptionTestCase(unittest.TestCase):
    """Checks that invalid arguments are rejected with the expected errors."""

    def test_badlevel(self):
        # A compression level out of range must raise zlib.error
        # (-1 is Z_DEFAULT_COMPRESSION, and zlib apparently accepts 0 too).
        with self.assertRaises(zlib.error):
            zlib.compress('ERROR', 10)

    def test_badcompressobj(self):
        # Building a compress object with bad parameters must fail.
        with self.assertRaises(ValueError):
            zlib.compressobj(1, zlib.DEFLATED, 0)
        # A window size above the maximum is an error as well.
        with self.assertRaises(ValueError):
            zlib.compressobj(1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # Building a decompress object with bad parameters must fail.
        with self.assertRaises(ValueError):
            zlib.decompressobj(-1)

    def test_decompressobj_badflush(self):
        # decompressobj.flush() must reject a non-positive length.
        for bad_length in (0, -1):
            flush = zlib.decompressobj().flush
            with self.assertRaises(ValueError):
                flush(bad_length)

    @support.cpython_only
    def test_overflow(self):
        # Sizes just above sys.maxsize must be reported as an overflow.
        too_big = sys.maxsize + 1
        with self.assertRaisesRegexp(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, too_big)
        with self.assertRaisesRegexp(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', too_big)
        with self.assertRaisesRegexp(OverflowError, 'int too large'):
            zlib.decompressobj().flush(too_big)
126
127
class BaseCompressTestCase(object):
    """Mixin with big-buffer helpers shared by the compression test cases.

    The helpers call self.assertEqual(), so the class is meant to be
    combined with unittest.TestCase.
    """

    def check_big_compress_buffer(self, size, compress_func):
        """Call compress_func once with more than `size` bytes of random data.

        size -- minimum amount of input, in bytes.
        compress_func -- callable taking the data as its only argument; its
            return value is ignored.
        """
        _1M = 1024 * 1024
        # Zero-padded hex format wide enough for 1MB worth of random bits.
        fmt = "%%0%dx" % (2 * _1M)
        # Generate 10MB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = b''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
                         for _ in range(10)])
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        """Check that decompress_func restores `size` bytes of b'x'.

        size -- length of the uncompressed data, in bytes.
        decompress_func -- callable taking the compressed data and returning
            the decompressed data.
        """
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check: right length, and nothing but b'x' in it.
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None
158
159
class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    """One-shot (whole message) compression and decompression."""

    def test_speech(self):
        # Plain round trip of the sample text.
        packed = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(packed), HAMLET_SCENE)

    def test_speech128(self):
        # Same round trip on more data.
        text = HAMLET_SCENE * 128
        packed = zlib.compress(text)
        self.assertEqual(zlib.decompress(packed), text)

    def test_incomplete_stream(self):
        # A truncated stream must produce a useful error message.
        packed = zlib.compress(HAMLET_SCENE)
        with self.assertRaisesRegexp(
                zlib.error,
                "Error -5 while decompressing data: incomplete or truncated stream"):
            zlib.decompress(packed[:-1])

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        def compress_level_1(payload):
            return zlib.compress(payload, 1)
        self.check_big_compress_buffer(size, compress_level_1)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @precisionbigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # decompress() must accept a bufsize greater than the internal limit.
        text = HAMLET_SCENE * 10
        packed = zlib.compress(text, 1)
        unpacked = zlib.decompress(packed, 15, size)
        self.assertEqual(unpacked, text)

    def test_custom_bufsize(self):
        # bufsize given as a CustomInt instance instead of a real integer.
        text = HAMLET_SCENE * 10
        packed = zlib.compress(text, 1)
        unpacked = zlib.decompress(packed, 15, CustomInt())
        self.assertEqual(unpacked, text)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        # Round trip of `size` bytes at compression level 0.
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            # Drop the references to the big buffers in every case.
            comp = data = None
211
212
class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    """Tests for the streaming compressobj() / decompressobj() objects."""

    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush)  # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        #
        # flush  -- finish with dco.flush() instead of draining with
        #           decompress('') calls
        # source -- text to repeat 128 times (defaults to HAMLET_SCENE)
        # cx     -- size of the chunks fed to the compressor
        # dcx    -- size of the chunks fed to the decompressor
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)

        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual('', dco.unconsumed_tail,
                             "(A) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain whatever is left by decompressing empty input.
            while True:
                chunk = dco.decompress('')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual('', dco.unconsumed_tail,
                         "(B) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        #
        # source -- text to repeat 128 times (defaults to HAMLET_SCENE)
        # cx     -- size of the chunks fed to the compressor
        # dcx    -- max_length passed to every decompress() call
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            # Input that was not processed must be fed in again.
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        #
        # flush -- finish with dco.flush() instead of draining with
        #          decompress('', max_length) calls
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress('', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        DEFAULTALLOC = 16 * 1024
        self.assertGreater(len(data), DEFAULTALLOC)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        # max_length given as a CustomInt instance (converts to 100), so
        # only the first 100 bytes are expected back.
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = "x\x9cKLJ\x06\x00\x02M\x01"     # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, "")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "") # Returns nothing

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), 'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, 'foo')

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = 'abcdefghijklmnopqrstuvwxyz'
        input2 = 'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @precisionbigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        source = HAMLET_SCENE * 10
        data = zlib.compress(source, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), source[1:])

    def test_flush_custom_length(self):
        # flush() length given as a CustomInt instance instead of an integer.
        source = HAMLET_SCENE * 10
        data = zlib.compress(source, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), source[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = HAMLET_SCENE.swapcase()
        c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        bufs0 = []
        bufs0.append(c0.compress(data0))

        c1 = c0.copy()
        bufs1 = bufs0[:]

        bufs0.append(c0.compress(data0))
        bufs0.append(c0.flush())
        s0 = ''.join(bufs0)

        bufs1.append(c1.compress(data1))
        bufs1.append(c1.flush())
        s1 = ''.join(bufs1)

        self.assertEqual(zlib.decompress(s0),data0+data0)
        self.assertEqual(zlib.decompress(s1),data0+data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)

        d0 = zlib.decompressobj()
        bufs0 = []
        bufs0.append(d0.decompress(comp[:32]))

        d1 = d0.copy()
        bufs1 = bufs0[:]

        bufs0.append(d0.decompress(comp[32:]))
        s0 = ''.join(bufs0)

        bufs1.append(d1.decompress(comp[32:]))
        s1 = ''.join(bufs1)

        self.assertEqual(s0,s1)
        self.assertEqual(s0,data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)

    def test_compresspickle(self):
        # Compression objects must refuse to be pickled, whatever the protocol.
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        # Decompression objects must refuse to be pickled as well.
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        # Round trip of `size` bytes through level-0 streaming objects.
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            # Drop the references to the big buffers in every case.
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        # `size` bytes trailing the compressed stream must all end up in
        # unused_data.
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @precisionbigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        # decompress() limited to 1 byte of output leaves nearly all of the
        # input unconsumed; flush() must then return the rest.
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # Streams written with various wbits values must be readable with
        # matching (or compatible) wbits on the decompression side.

        # zlib container, window size 15
        co = zlib.compressobj(1, zlib.DEFLATED, 15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegexp(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(14)
        with self.assertRaisesRegexp(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        # zlib container, window size 9
        co = zlib.compressobj(1, zlib.DEFLATED, 9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        # negative wbits, window size 15
        co = zlib.compressobj(1, zlib.DEFLATED, -15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        # negative wbits, window size 9
        co = zlib.compressobj(1, zlib.DEFLATED, -9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        # wbits 16 + 15
        co = zlib.compressobj(1, zlib.DEFLATED, 16 + 15)
        gzip15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip15, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip15, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip15), HAMLET_SCENE)
684
685
def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks).

    seed -- value passed to generator.seed(); None leaves the generator's
        state untouched.
    length -- number of bytes to return; zero or less yields ''.
    step -- block size used while generating.  It is replaced by length
        when length is smaller than it or when it is less than 2.
    generator -- object providing seed() and randint(); defaults to the
        random module itself.
    """
    if seed is not None:
        generator.seed(seed)
    if length <= 0:
        # A zero length would otherwise become a zero step, which range()
        # rejects with ValueError.
        return ''
    if length < step or step < 2:
        step = length
    randint = generator.randint
    # Whole blocks are generated, so the last one may overshoot length;
    # the surplus is cut off below.
    blocks = [''.join([chr(randint(0, 255)) for _ in range(step)])
              for _ in range(0, length, step)]
    return ''.join(blocks)[:length]
698
699
700
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of `number` lines picked at random from `source`.

    source is split on newlines.  The generator is seeded first when seed
    is not None.  Lines are drawn with generator.choice(), so the same line
    may be picked more than once.
    """
    if seed is not None:
        generator.seed(seed)
    candidates = source.split('\n')
    picked = []
    for _ in range(number):
        picked.append(generator.choice(candidates))
    return picked
707
708
709
# Sample text (dialogue from Hamlet) used as the payload by most of the
# compression tests in this file.
HAMLET_SCENE = """
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""
773
774
class CustomInt(object):
    """Object that is not an integer but converts to 100 through __int__().

    The tests pass instances of it where a size or length is expected, to
    check that integer-like objects are accepted.
    """

    def __int__(self):
        return 100
778
779
def test_main():
    """Run every test case class of this module through run_unittest()."""
    test_cases = (
        ChecksumTestCase,
        ChecksumBigBufferTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase,
    )
    run_unittest(*test_cases)

# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main()
791