#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import tempfile
import time
import unittest
import zipfile

import common


def random_string_with_holes(size, block_size, step_size):
    """Return `size` zero bytes with random blocks written at fixed steps.

    Starting at offset 0 and then every `step_size` bytes, up to `block_size`
    bytes are overwritten with output from os.urandom(). The last block is
    clamped so the result is exactly `size` bytes long.

    Args:
      size: Total length of the returned byte string.
      block_size: Length of each random block.
      step_size: Distance between the starts of consecutive random blocks.

    Returns:
      A byte string (`str` on Python 2, `bytes` on Python 3).
    """
    # A bytearray holds one byte per element; a list of 1-char strings would
    # need a pointer per element, which is prohibitive for multi-GiB sizes.
    data = bytearray(size)
    for begin in range(0, size, step_size):
        # Clamp so a block that crosses the end does not grow the buffer.
        end = min(begin + block_size, size)
        data[begin:end] = os.urandom(end - begin)
    return bytes(data)


def get_2gb_string():
    """Return a byte string one byte longer than 2 GiB.

    The string is mostly zeros, with a 4 KiB random block every 4 MiB.
    """
    kilobytes = 1024
    megabytes = 1024 * kilobytes
    gigabytes = 1024 * megabytes

    size = int(2 * gigabytes + 1)
    block_size = 4 * kilobytes
    step_size = 4 * megabytes
    two_gb_string = random_string_with_holes(
        size, block_size, step_size)
    return two_gb_string


class CommonZipTest(unittest.TestCase):
    """Tests for common.ZipWrite(), common.ZipWriteStr() and common.ZipClose()."""

    def _verify(self, zip_file, zip_file_name, arcname, contents,
                test_file_name=None, expected_stat=None, expected_mode=0o644,
                expected_compress_type=zipfile.ZIP_STORED):
        """Reopen the archive on disk and check one entry against expectations.

        Args:
          zip_file: Unused; kept so existing callers keep working. The archive
              is always reopened from `zip_file_name`.
          zip_file_name: Path of the zip archive to inspect.
          arcname: Name of the entry inside the archive.
          contents: Expected payload of the entry.
          test_file_name: If given, path of the source file whose mode and
              mtime must still equal those in `expected_stat`.
          expected_stat: os.stat() result taken before the file was zipped.
          expected_mode: Expected permission bits (compared after & 0o777).
          expected_compress_type: Expected zipfile compression constant.
        """
        # Verify the stat if present.
        if test_file_name is not None:
            new_stat = os.stat(test_file_name)
            self.assertEqual(int(expected_stat.st_mode),
                             int(new_stat.st_mode))
            self.assertEqual(int(expected_stat.st_mtime),
                             int(new_stat.st_mtime))

        # Reopen the zip file to verify. The context manager guarantees the
        # handle is released even when one of the assertions below fails.
        with zipfile.ZipFile(zip_file_name, "r") as reopened:
            # Verify the timestamp.
            info = reopened.getinfo(arcname)
            self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))

            # Verify the file mode.
            mode = (info.external_attr >> 16) & 0o777
            self.assertEqual(mode, expected_mode)

            # Verify the compress type.
            self.assertEqual(info.compress_type, expected_compress_type)

            # Verify the zip contents.
            self.assertEqual(reopened.read(arcname), contents)
            self.assertIsNone(reopened.testzip())

    def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
        """Write `contents` to a temp file, zip it with ZipWrite(), verify it.

        Args:
          contents: Bytes to store in the source file.
          extra_zipwrite_args: Optional dict of keyword arguments forwarded to
              common.ZipWrite(); "arcname", "perms" and "compress_type" also
              determine the expected values.
        """
        extra_zipwrite_args = dict(extra_zipwrite_args or {})

        test_file = tempfile.NamedTemporaryFile(delete=False)
        test_file_name = test_file.name

        zip_file = tempfile.NamedTemporaryFile(delete=False)
        zip_file_name = zip_file.name

        # File names within an archive strip the leading slash.
        arcname = extra_zipwrite_args.get("arcname", test_file_name)
        if arcname.startswith("/"):
            arcname = arcname[1:]

        zip_file.close()
        zip_file = zipfile.ZipFile(zip_file_name, "w")

        try:
            test_file.write(contents)
            test_file.close()

            expected_stat = os.stat(test_file_name)
            expected_mode = extra_zipwrite_args.get("perms", 0o644)
            expected_compress_type = extra_zipwrite_args.get(
                "compress_type", zipfile.ZIP_STORED)
            time.sleep(5)  # Make sure the atime/mtime will change measurably.

            common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
            common.ZipClose(zip_file)

            self._verify(zip_file, zip_file_name, arcname, contents,
                         test_file_name, expected_stat, expected_mode,
                         expected_compress_type)
        finally:
            # close() is a no-op if the file was already closed above; this
            # releases the handle when write() itself failed.
            test_file.close()
            os.remove(test_file_name)
            os.remove(zip_file_name)

    def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
        """Store `contents` with ZipWriteStr() and verify the resulting entry.

        Args:
          zinfo_or_arcname: Either an archive name or a zipfile.ZipInfo.
          contents: Payload to store.
          extra_args: Optional dict of keyword arguments forwarded to
              common.ZipWriteStr(); "perms" and "compress_type" also determine
              the expected values.
        """
        extra_args = dict(extra_args or {})

        zip_file = tempfile.NamedTemporaryFile(delete=False)
        zip_file_name = zip_file.name
        zip_file.close()

        zip_file = zipfile.ZipFile(zip_file_name, "w")

        try:
            expected_compress_type = extra_args.get("compress_type",
                                                    zipfile.ZIP_STORED)
            time.sleep(5)  # Make sure the atime/mtime will change measurably.

            if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
                arcname = zinfo_or_arcname
                expected_mode = extra_args.get("perms", 0o644)
            else:
                arcname = zinfo_or_arcname.filename
                # _verify() compares permission bits only, so drop any
                # file-type bits stored above them in external_attr.
                expected_mode = extra_args.get(
                    "perms", (zinfo_or_arcname.external_attr >> 16) & 0o777)

            common.ZipWriteStr(zip_file, zinfo_or_arcname, contents,
                               **extra_args)
            common.ZipClose(zip_file)

            self._verify(zip_file, zip_file_name, arcname, contents,
                         expected_mode=expected_mode,
                         expected_compress_type=expected_compress_type)
        finally:
            os.remove(zip_file_name)

    def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
        """Add `large` via ZipWrite() and `small` via ZipWriteStr(), then verify.

        Args:
          large: Payload written to a temp file and added with ZipWrite().
          small: Payload added under the name "bar" with ZipWriteStr().
          extra_args: Optional dict of keyword arguments forwarded to both
              calls; "compress_type" also determines the expected value.
        """
        extra_args = dict(extra_args or {})

        zip_file = tempfile.NamedTemporaryFile(delete=False)
        zip_file_name = zip_file.name

        test_file = tempfile.NamedTemporaryFile(delete=False)
        test_file_name = test_file.name

        arcname_large = test_file_name
        arcname_small = "bar"

        # File names within an archive strip the leading slash.
        if arcname_large.startswith("/"):
            arcname_large = arcname_large[1:]

        zip_file.close()
        zip_file = zipfile.ZipFile(zip_file_name, "w")

        try:
            test_file.write(large)
            test_file.close()

            expected_stat = os.stat(test_file_name)
            expected_mode = 0o644
            expected_compress_type = extra_args.get("compress_type",
                                                    zipfile.ZIP_STORED)
            time.sleep(5)  # Make sure the atime/mtime will change measurably.

            common.ZipWrite(zip_file, test_file_name, **extra_args)
            common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
            common.ZipClose(zip_file)

            # Verify the contents written by ZipWrite().
            self._verify(zip_file, zip_file_name, arcname_large, large,
                         test_file_name, expected_stat, expected_mode,
                         expected_compress_type)

            # Verify the contents written by ZipWriteStr().
            self._verify(zip_file, zip_file_name, arcname_small, small,
                         expected_compress_type=expected_compress_type)
        finally:
            # close() is a no-op if the file was already closed above; this
            # releases the handle when write() itself failed.
            test_file.close()
            os.remove(zip_file_name)
            os.remove(test_file_name)

    def _test_reset_ZIP64_LIMIT(self, func, *args):
        """Assert zipfile.ZIP64_LIMIT has its default before and after func."""
        default_limit = (1 << 31) - 1
        self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
        func(*args)
        self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)

    def test_ZipWrite(self):
        """ZipWrite() with default options."""
        file_contents = os.urandom(1024)
        self._test_ZipWrite(file_contents)

    def test_ZipWrite_with_opts(self):
        """ZipWrite() with explicit arcname, perms and compress_type."""
        file_contents = os.urandom(1024)
        self._test_ZipWrite(file_contents, {
            "arcname": "foobar",
            "perms": 0o777,
            "compress_type": zipfile.ZIP_DEFLATED,
        })
        self._test_ZipWrite(file_contents, {
            "arcname": "foobar",
            "perms": 0o700,
            "compress_type": zipfile.ZIP_STORED,
        })

    def test_ZipWrite_large_file(self):
        """ZipWrite() with a source file larger than 2 GiB."""
        file_contents = get_2gb_string()
        self._test_ZipWrite(file_contents, {
            "compress_type": zipfile.ZIP_DEFLATED,
        })

    def test_ZipWrite_resets_ZIP64_LIMIT(self):
        """ZIP64_LIMIT is back at its default after ZipWrite()."""
        # Bytes literal: the temp file is opened in binary mode.
        self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, b"")

    def test_ZipWriteStr(self):
        """ZipWriteStr() with an arcname and with a ZipInfo."""
        random_string = os.urandom(1024)
        # Passing arcname
        self._test_ZipWriteStr("foo", random_string)

        # Passing zinfo
        zinfo = zipfile.ZipInfo(filename="foo")
        self._test_ZipWriteStr(zinfo, random_string)

        # Timestamp in the zinfo should be overwritten.
        zinfo.date_time = (2015, 3, 1, 15, 30, 0)
        self._test_ZipWriteStr(zinfo, random_string)

    def test_ZipWriteStr_with_opts(self):
        """ZipWriteStr() with explicit perms and compress_type."""
        random_string = os.urandom(1024)
        # Passing arcname
        self._test_ZipWriteStr("foo", random_string, {
            "perms": 0o700,
            "compress_type": zipfile.ZIP_DEFLATED,
        })
        self._test_ZipWriteStr("bar", random_string, {
            "compress_type": zipfile.ZIP_STORED,
        })

        # Passing zinfo
        zinfo = zipfile.ZipInfo(filename="foo")
        self._test_ZipWriteStr(zinfo, random_string, {
            "compress_type": zipfile.ZIP_DEFLATED,
        })
        self._test_ZipWriteStr(zinfo, random_string, {
            "perms": 0o600,
            "compress_type": zipfile.ZIP_STORED,
        })

    def test_ZipWriteStr_large_file(self):
        """ZipWriteStr() into an archive that already holds a >2 GiB entry."""
        # zipfile.writestr() doesn't work when the str size is over 2GiB even
        # with the workaround. We will only test the case of writing a string
        # into a large archive.
        long_string = get_2gb_string()
        short_string = os.urandom(1024)
        self._test_ZipWriteStr_large_file(long_string, short_string, {
            "compress_type": zipfile.ZIP_DEFLATED,
        })

    def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
        """ZIP64_LIMIT is back at its default after ZipWriteStr()."""
        # Bytes literals: ZipFile.read() returns bytes, which _verify()
        # compares against the contents passed here.
        self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", b"")
        zinfo = zipfile.ZipInfo(filename="foo")
        self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, b"")

    def test_bug21309935(self):
        """Permission handling of ZipWriteStr() for arcname and ZipInfo input."""
        zip_file = tempfile.NamedTemporaryFile(delete=False)
        zip_file_name = zip_file.name
        zip_file.close()

        try:
            random_string = os.urandom(1024)
            zip_file = zipfile.ZipFile(zip_file_name, "w")
            # Default perms should be 0o644 when passing the filename.
            common.ZipWriteStr(zip_file, "foo", random_string)
            # Honor the specified perms.
            common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755)
            # The perms in zinfo should be untouched.
            zinfo = zipfile.ZipInfo(filename="baz")
            zinfo.external_attr = 0o740 << 16
            common.ZipWriteStr(zip_file, zinfo, random_string)
            # Explicitly specified perms has the priority.
            zinfo = zipfile.ZipInfo(filename="qux")
            zinfo.external_attr = 0o700 << 16
            common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
            common.ZipClose(zip_file)

            self._verify(zip_file, zip_file_name, "foo", random_string,
                         expected_mode=0o644)
            self._verify(zip_file, zip_file_name, "bar", random_string,
                         expected_mode=0o755)
            self._verify(zip_file, zip_file_name, "baz", random_string,
                         expected_mode=0o740)
            self._verify(zip_file, zip_file_name, "qux", random_string,
                         expected_mode=0o400)
        finally:
            os.remove(zip_file_name)