1#!/usr/bin/env python
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""Unittests for parsing files in zip64 format"""
19
import os
import shutil
import subprocess
import tempfile
import time
import unittest
import zipfile
26
class Zip64Test(unittest.TestCase):
  """Checks that ziptool can list and extract archives that need zip64.

  Every test builds an archive that exceeds one of the classic zip limits
  (archive size, number of entries, or size of a single entry), then runs
  `ziptool zipinfo -1` and `ziptool unzip` on it. The `ziptool` binary (and
  `zip` for the data descriptor test) is looked up on PATH.
  """

  @staticmethod
  def _WriteFile(path, size_in_kib):
    """Fills `path` with `size_in_kib` KiB of one repeated character.

    The character is the first letter of the base name of `path`.
    """
    contents = os.path.basename(path)[0] * 1024
    with open(path, 'w') as f:
      for _ in range(size_in_kib):
        f.write(contents)

  @staticmethod
  def _AddEntriesToZip(output_zip, entries_dict=None):
    """Adds one generated entry per item of `entries_dict` to `output_zip`.

    Args:
      output_zip: an open, writable zipfile.ZipFile.
      entries_dict: maps entry name to entry size in KiB. None or an empty
        dict adds nothing.
    """
    if not entries_dict:
      return
    for name, size in entries_dict.items():
      # Close (and thereby delete) the backing file as soon as it has been
      # archived instead of waiting for garbage collection; the entries can be
      # several GiB each and there may be 100k of them.
      with tempfile.NamedTemporaryFile() as temp_file:
        Zip64Test._WriteFile(temp_file.name, size)
        output_zip.write(temp_file.name, arcname=name)

  def _RunCommand(self, cmd):
    """Runs `cmd`, asserts that it exits with 0 and returns its output.

    Args:
      cmd: the command line as a list of strings.

    Returns:
      The combined stdout/stderr of the command as text.
    """
    # universal_newlines=True makes communicate() return text instead of bytes
    # on Python 3, so the output can be compared against str entry names.
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                            universal_newlines=True)
    output, _ = proc.communicate()
    self.assertEqual(0, proc.returncode,
                     'Command failed: %s\n%s' % (' '.join(cmd), output))
    return output

  def _getEntryNames(self, zip_name):
    """Returns the entry names `ziptool zipinfo -1` reports for `zip_name`.

    The tool output is split on whitespace, so names containing whitespace
    are not supported.
    """
    output = self._RunCommand(['ziptool', 'zipinfo', '-1', zip_name])
    self.assertIsNotNone(output)
    return output.split()

  def _ExtractEntries(self, zip_name):
    """Extracts `zip_name` with `ziptool unzip` and asserts that it succeeds."""
    temp_dir = tempfile.mkdtemp()
    try:
      self._RunCommand(['ziptool', 'unzip', '-d', temp_dir, zip_name])
    finally:
      # The extracted entries can add up to several GiB; never leave them
      # behind, even when the extraction fails.
      shutil.rmtree(temp_dir, ignore_errors=True)

  def _VerifyArchive(self, zip_name, expected_names):
    """Asserts ziptool lists exactly `expected_names` and can extract them."""
    read_names = self._getEntryNames(zip_name)
    self.assertEqual(sorted(expected_names), sorted(read_names))
    self._ExtractEntries(zip_name)

  def test_entriesSmallerThan2G(self):
    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
      # Add a few entries with each of them smaller than 2GiB. But the entire
      # zip file is larger than 4GiB in size.
      entry_dict = {'a.txt': 1025 * 1024, 'b.txt': 1025 * 1024, 'c.txt': 1025 * 1024,
                    'd.txt': 1025 * 1024, 'e.txt': 1024}
      with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as output_zip:
        self._AddEntriesToZip(output_zip, entry_dict)

      self._VerifyArchive(zip_path.name, entry_dict.keys())

  def test_largeNumberOfEntries(self):
    # Add 100k entries (more than 65535|UINT16_MAX).
    entry_dict = {str(num): 50 for num in range(100 * 1024)}

    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
      with zipfile.ZipFile(zip_path, 'w', allowZip64=True) as output_zip:
        self._AddEntriesToZip(output_zip, entry_dict)

      self._VerifyArchive(zip_path.name, entry_dict.keys())

  def test_largeCompressedEntriesSmallerThan4G(self):
    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
      # Add entries close to 4GiB in size. Somehow the python library will put
      # the (un)compressed sizes in the extra field. Test that our ziptool is
      # able to parse it.
      entry_dict = {'e.txt': 4095 * 1024, 'f.txt': 4095 * 1024}
      with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED,
                           allowZip64=True) as output_zip:
        self._AddEntriesToZip(output_zip, entry_dict)

      self._VerifyArchive(zip_path.name, entry_dict.keys())

  def test_forceDataDescriptor(self):
    with tempfile.NamedTemporaryFile(suffix='.txt') as file_path:
      self._WriteFile(file_path.name, 5000 * 1024)

      with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
        # Start from an empty archive and let the zip command add the entry.
        with zipfile.ZipFile(zip_path, 'w', allowZip64=True):
          pass
        # The fd option force writes a data descriptor
        self._RunCommand(['zip', '-fd', zip_path.name, file_path.name])

        # The expected entry name is the absolute path of the input file
        # without its leading '/'.
        read_names = self._getEntryNames(zip_path.name)
        self.assertEqual([file_path.name[1:]], read_names)
        self._ExtractEntries(zip_path.name)

  def test_largeUncompressedEntriesLargerThan4G(self):
    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
      # Add stored (uncompressed) entries that are each larger than 4GiB, so
      # their sizes do not fit in the 32-bit size fields.
      entry_dict = {'g.txt': 5000 * 1024, 'h.txt': 6000 * 1024}
      with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_STORED,
                           allowZip64=True) as output_zip:
        self._AddEntriesToZip(output_zip, entry_dict)

      self._VerifyArchive(zip_path.name, entry_dict.keys())

  def test_largeCompressedEntriesLargerThan4G(self):
    with tempfile.NamedTemporaryFile(suffix='.zip') as zip_path:
      # Add deflated entries whose uncompressed size is 4GiB or more, so the
      # uncompressed size does not fit in the 32-bit size field.
      entry_dict = {'i.txt': 4096 * 1024, 'j.txt': 7000 * 1024}
      with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_DEFLATED,
                           allowZip64=True) as output_zip:
        self._AddEntriesToZip(output_zip, entry_dict)

      self._VerifyArchive(zip_path.name, entry_dict.keys())
142
143
if __name__ == '__main__':
  # Discover the tests in the directory that contains this file (discover()
  # uses its default file name pattern) and run them verbosely.
  testsuite = unittest.TestLoader().discover(
      os.path.dirname(os.path.realpath(__file__)))
  result = unittest.TextTestRunner(verbosity=2).run(testsuite)
  # Report the outcome through the exit status so that callers (scripts, CI)
  # can detect failing tests; previously the script always exited with 0.
  raise SystemExit(0 if result.wasSuccessful() else 1)
148