# Copyright 2016 The Brotli Authors. All rights reserved.
#
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT

import functools
import unittest

from . import _test_utils
import brotli


def _get_original_name(test_data):
    """Return the portion of `test_data` preceding the first '.compressed'.

    If '.compressed' does not occur, `test_data` is returned unchanged.
    """
    original, _, _ = test_data.partition('.compressed')
    return original


class TestDecompressor(_test_utils.TestCase):
    """Exercises the streaming `brotli.Decompressor` API."""

    # Number of bytes read from the compressed file per `process()` call.
    CHUNK_SIZE = 1

    def setUp(self):
        """Create a fresh decompressor for each test."""
        self.decompressor = brotli.Decompressor()

    def tearDown(self):
        """Drop the reference to the decompressor used by the test."""
        self.decompressor = None

    def _check_decompression(self, test_data):
        """Assert that the temp output for `test_data` matches the original."""
        self.assertFilesMatch(
            _test_utils.get_temp_uncompressed_name(test_data),
            _get_original_name(test_data),
        )

    def _decompress(self, test_data):
        """Stream `test_data` through the decompressor into a temp file.

        The input is read `CHUNK_SIZE` bytes at a time and every piece of
        output returned by `process()` is written out immediately. Once the
        input is exhausted, the decompressor must report that it is finished.
        """
        output_path = _test_utils.get_temp_uncompressed_name(test_data)
        with open(output_path, 'wb') as sink, open(test_data, 'rb') as source:
            next_chunk = functools.partial(source.read, self.CHUNK_SIZE)
            while True:
                chunk = next_chunk()
                # An empty read marks the end of the compressed input.
                if chunk == b'':
                    break
                sink.write(self.decompressor.process(chunk))
        self.assertTrue(self.decompressor.is_finished())

    def _test_decompress(self, test_data):
        """Decompress `test_data`, then verify the result."""
        self._decompress(test_data)
        self._check_decompression(test_data)

    def test_garbage_appended(self):
        """Extra bytes after a complete stream must raise `brotli.error`."""
        with self.assertRaises(brotli.error):
            stream_with_garbage = brotli.compress(b'a') + b'a'
            self.decompressor.process(stream_with_garbage)

    def test_already_finished(self):
        """Feeding data after a complete stream must raise `brotli.error`."""
        self.decompressor.process(brotli.compress(b'a'))
        self.assertRaises(brotli.error, self.decompressor.process, b'a')


# NOTE(review): presumably attaches generated per-test-data test methods to
# TestDecompressor -- confirm against _test_utils.generate_test_methods.
_test_utils.generate_test_methods(TestDecompressor, for_decompression=True)

if __name__ == '__main__':
    unittest.main()