1# Copyright 2016 The Brotli Authors. All rights reserved.
2#
3# Distributed under MIT license.
4# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
5
6import functools
7import unittest
8
9from . import _test_utils
10import brotli
11
12
13def _get_original_name(test_data):
14    return test_data.split('.compressed')[0]
15
16
17class TestDecompressor(_test_utils.TestCase):
18
19    CHUNK_SIZE = 1
20
21    def setUp(self):
22        self.decompressor = brotli.Decompressor()
23
24    def tearDown(self):
25        self.decompressor = None
26
27    def _check_decompression(self, test_data):
28        # Verify decompression matches the original.
29        temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
30        original = _get_original_name(test_data)
31        self.assertFilesMatch(temp_uncompressed, original)
32
33    def _decompress(self, test_data):
34        temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
35        with open(temp_uncompressed, 'wb') as out_file:
36            with open(test_data, 'rb') as in_file:
37                read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
38                for data in iter(read_chunk, b''):
39                    out_file.write(self.decompressor.process(data))
40        self.assertTrue(self.decompressor.is_finished())
41
42    def _test_decompress(self, test_data):
43        self._decompress(test_data)
44        self._check_decompression(test_data)
45
46    def test_garbage_appended(self):
47        with self.assertRaises(brotli.error):
48            self.decompressor.process(brotli.compress(b'a') + b'a')
49
50    def test_already_finished(self):
51        self.decompressor.process(brotli.compress(b'a'))
52        with self.assertRaises(brotli.error):
53            self.decompressor.process(b'a')
54
55
56_test_utils.generate_test_methods(TestDecompressor, for_decompression=True)
57
58if __name__ == '__main__':
59    unittest.main()
60