1# 2# Copyright (C) 2015 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16 17import os 18import subprocess 19import tempfile 20import time 21import unittest 22import zipfile 23from hashlib import sha1 24 25import common 26import test_utils 27import validate_target_files 28from rangelib import RangeSet 29 30 31KiB = 1024 32MiB = 1024 * KiB 33GiB = 1024 * MiB 34 35 36def get_2gb_string(): 37 size = int(2 * GiB + 1) 38 block_size = 4 * KiB 39 step_size = 4 * MiB 40 # Generate a long string with holes, e.g. 'xyz\x00abc\x00...'. 41 for _ in range(0, size, step_size): 42 yield os.urandom(block_size) 43 yield '\0' * (step_size - block_size) 44 45 46class CommonZipTest(unittest.TestCase): 47 def _verify(self, zip_file, zip_file_name, arcname, expected_hash, 48 test_file_name=None, expected_stat=None, expected_mode=0o644, 49 expected_compress_type=zipfile.ZIP_STORED): 50 # Verify the stat if present. 51 if test_file_name is not None: 52 new_stat = os.stat(test_file_name) 53 self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode)) 54 self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime)) 55 56 # Reopen the zip file to verify. 57 zip_file = zipfile.ZipFile(zip_file_name, "r") 58 59 # Verify the timestamp. 60 info = zip_file.getinfo(arcname) 61 self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0)) 62 63 # Verify the file mode. 64 mode = (info.external_attr >> 16) & 0o777 65 self.assertEqual(mode, expected_mode) 66 67 # Verify the compress type. 68 self.assertEqual(info.compress_type, expected_compress_type) 69 70 # Verify the zip contents. 71 entry = zip_file.open(arcname) 72 sha1_hash = sha1() 73 for chunk in iter(lambda: entry.read(4 * MiB), ''): 74 sha1_hash.update(chunk) 75 self.assertEqual(expected_hash, sha1_hash.hexdigest()) 76 self.assertIsNone(zip_file.testzip()) 77 78 def _test_ZipWrite(self, contents, extra_zipwrite_args=None): 79 extra_zipwrite_args = dict(extra_zipwrite_args or {}) 80 81 test_file = tempfile.NamedTemporaryFile(delete=False) 82 test_file_name = test_file.name 83 84 zip_file = tempfile.NamedTemporaryFile(delete=False) 85 zip_file_name = zip_file.name 86 87 # File names within an archive strip the leading slash. 88 arcname = extra_zipwrite_args.get("arcname", test_file_name) 89 if arcname[0] == "/": 90 arcname = arcname[1:] 91 92 zip_file.close() 93 zip_file = zipfile.ZipFile(zip_file_name, "w") 94 95 try: 96 sha1_hash = sha1() 97 for data in contents: 98 sha1_hash.update(data) 99 test_file.write(data) 100 test_file.close() 101 102 expected_stat = os.stat(test_file_name) 103 expected_mode = extra_zipwrite_args.get("perms", 0o644) 104 expected_compress_type = extra_zipwrite_args.get("compress_type", 105 zipfile.ZIP_STORED) 106 time.sleep(5) # Make sure the atime/mtime will change measurably. 107 108 common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args) 109 common.ZipClose(zip_file) 110 111 self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(), 112 test_file_name, expected_stat, expected_mode, 113 expected_compress_type) 114 finally: 115 os.remove(test_file_name) 116 os.remove(zip_file_name) 117 118 def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None): 119 extra_args = dict(extra_args or {}) 120 121 zip_file = tempfile.NamedTemporaryFile(delete=False) 122 zip_file_name = zip_file.name 123 zip_file.close() 124 125 zip_file = zipfile.ZipFile(zip_file_name, "w") 126 127 try: 128 expected_compress_type = extra_args.get("compress_type", 129 zipfile.ZIP_STORED) 130 time.sleep(5) # Make sure the atime/mtime will change measurably. 131 132 if not isinstance(zinfo_or_arcname, zipfile.ZipInfo): 133 arcname = zinfo_or_arcname 134 expected_mode = extra_args.get("perms", 0o644) 135 else: 136 arcname = zinfo_or_arcname.filename 137 expected_mode = extra_args.get("perms", 138 zinfo_or_arcname.external_attr >> 16) 139 140 common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args) 141 common.ZipClose(zip_file) 142 143 self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(), 144 expected_mode=expected_mode, 145 expected_compress_type=expected_compress_type) 146 finally: 147 os.remove(zip_file_name) 148 149 def _test_ZipWriteStr_large_file(self, large, small, extra_args=None): 150 extra_args = dict(extra_args or {}) 151 152 zip_file = tempfile.NamedTemporaryFile(delete=False) 153 zip_file_name = zip_file.name 154 155 test_file = tempfile.NamedTemporaryFile(delete=False) 156 test_file_name = test_file.name 157 158 arcname_large = test_file_name 159 arcname_small = "bar" 160 161 # File names within an archive strip the leading slash. 162 if arcname_large[0] == "/": 163 arcname_large = arcname_large[1:] 164 165 zip_file.close() 166 zip_file = zipfile.ZipFile(zip_file_name, "w") 167 168 try: 169 sha1_hash = sha1() 170 for data in large: 171 sha1_hash.update(data) 172 test_file.write(data) 173 test_file.close() 174 175 expected_stat = os.stat(test_file_name) 176 expected_mode = 0o644 177 expected_compress_type = extra_args.get("compress_type", 178 zipfile.ZIP_STORED) 179 time.sleep(5) # Make sure the atime/mtime will change measurably. 180 181 common.ZipWrite(zip_file, test_file_name, **extra_args) 182 common.ZipWriteStr(zip_file, arcname_small, small, **extra_args) 183 common.ZipClose(zip_file) 184 185 # Verify the contents written by ZipWrite(). 186 self._verify(zip_file, zip_file_name, arcname_large, 187 sha1_hash.hexdigest(), test_file_name, expected_stat, 188 expected_mode, expected_compress_type) 189 190 # Verify the contents written by ZipWriteStr(). 191 self._verify(zip_file, zip_file_name, arcname_small, 192 sha1(small).hexdigest(), 193 expected_compress_type=expected_compress_type) 194 finally: 195 os.remove(zip_file_name) 196 os.remove(test_file_name) 197 198 def _test_reset_ZIP64_LIMIT(self, func, *args): 199 default_limit = (1 << 31) - 1 200 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) 201 func(*args) 202 self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) 203 204 def test_ZipWrite(self): 205 file_contents = os.urandom(1024) 206 self._test_ZipWrite(file_contents) 207 208 def test_ZipWrite_with_opts(self): 209 file_contents = os.urandom(1024) 210 self._test_ZipWrite(file_contents, { 211 "arcname": "foobar", 212 "perms": 0o777, 213 "compress_type": zipfile.ZIP_DEFLATED, 214 }) 215 self._test_ZipWrite(file_contents, { 216 "arcname": "foobar", 217 "perms": 0o700, 218 "compress_type": zipfile.ZIP_STORED, 219 }) 220 221 def test_ZipWrite_large_file(self): 222 file_contents = get_2gb_string() 223 self._test_ZipWrite(file_contents, { 224 "compress_type": zipfile.ZIP_DEFLATED, 225 }) 226 227 def test_ZipWrite_resets_ZIP64_LIMIT(self): 228 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "") 229 230 def test_ZipWriteStr(self): 231 random_string = os.urandom(1024) 232 # Passing arcname 233 self._test_ZipWriteStr("foo", random_string) 234 235 # Passing zinfo 236 zinfo = zipfile.ZipInfo(filename="foo") 237 self._test_ZipWriteStr(zinfo, random_string) 238 239 # Timestamp in the zinfo should be overwritten. 240 zinfo.date_time = (2015, 3, 1, 15, 30, 0) 241 self._test_ZipWriteStr(zinfo, random_string) 242 243 def test_ZipWriteStr_with_opts(self): 244 random_string = os.urandom(1024) 245 # Passing arcname 246 self._test_ZipWriteStr("foo", random_string, { 247 "perms": 0o700, 248 "compress_type": zipfile.ZIP_DEFLATED, 249 }) 250 self._test_ZipWriteStr("bar", random_string, { 251 "compress_type": zipfile.ZIP_STORED, 252 }) 253 254 # Passing zinfo 255 zinfo = zipfile.ZipInfo(filename="foo") 256 self._test_ZipWriteStr(zinfo, random_string, { 257 "compress_type": zipfile.ZIP_DEFLATED, 258 }) 259 self._test_ZipWriteStr(zinfo, random_string, { 260 "perms": 0o600, 261 "compress_type": zipfile.ZIP_STORED, 262 }) 263 264 def test_ZipWriteStr_large_file(self): 265 # zipfile.writestr() doesn't work when the str size is over 2GiB even with 266 # the workaround. We will only test the case of writing a string into a 267 # large archive. 268 long_string = get_2gb_string() 269 short_string = os.urandom(1024) 270 self._test_ZipWriteStr_large_file(long_string, short_string, { 271 "compress_type": zipfile.ZIP_DEFLATED, 272 }) 273 274 def test_ZipWriteStr_resets_ZIP64_LIMIT(self): 275 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "") 276 zinfo = zipfile.ZipInfo(filename="foo") 277 self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "") 278 279 def test_bug21309935(self): 280 zip_file = tempfile.NamedTemporaryFile(delete=False) 281 zip_file_name = zip_file.name 282 zip_file.close() 283 284 try: 285 random_string = os.urandom(1024) 286 zip_file = zipfile.ZipFile(zip_file_name, "w") 287 # Default perms should be 0o644 when passing the filename. 288 common.ZipWriteStr(zip_file, "foo", random_string) 289 # Honor the specified perms. 290 common.ZipWriteStr(zip_file, "bar", random_string, perms=0o755) 291 # The perms in zinfo should be untouched. 292 zinfo = zipfile.ZipInfo(filename="baz") 293 zinfo.external_attr = 0o740 << 16 294 common.ZipWriteStr(zip_file, zinfo, random_string) 295 # Explicitly specified perms has the priority. 296 zinfo = zipfile.ZipInfo(filename="qux") 297 zinfo.external_attr = 0o700 << 16 298 common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400) 299 common.ZipClose(zip_file) 300 301 self._verify(zip_file, zip_file_name, "foo", 302 sha1(random_string).hexdigest(), 303 expected_mode=0o644) 304 self._verify(zip_file, zip_file_name, "bar", 305 sha1(random_string).hexdigest(), 306 expected_mode=0o755) 307 self._verify(zip_file, zip_file_name, "baz", 308 sha1(random_string).hexdigest(), 309 expected_mode=0o740) 310 self._verify(zip_file, zip_file_name, "qux", 311 sha1(random_string).hexdigest(), 312 expected_mode=0o400) 313 finally: 314 os.remove(zip_file_name) 315 316 def test_ZipDelete(self): 317 zip_file = tempfile.NamedTemporaryFile(delete=False, suffix='.zip') 318 output_zip = zipfile.ZipFile(zip_file.name, 'w', 319 compression=zipfile.ZIP_DEFLATED) 320 with tempfile.NamedTemporaryFile() as entry_file: 321 entry_file.write(os.urandom(1024)) 322 common.ZipWrite(output_zip, entry_file.name, arcname='Test1') 323 common.ZipWrite(output_zip, entry_file.name, arcname='Test2') 324 common.ZipWrite(output_zip, entry_file.name, arcname='Test3') 325 common.ZipClose(output_zip) 326 zip_file.close() 327 328 try: 329 common.ZipDelete(zip_file.name, 'Test2') 330 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 331 entries = check_zip.namelist() 332 self.assertTrue('Test1' in entries) 333 self.assertFalse('Test2' in entries) 334 self.assertTrue('Test3' in entries) 335 336 self.assertRaises(AssertionError, common.ZipDelete, zip_file.name, 337 'Test2') 338 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 339 entries = check_zip.namelist() 340 self.assertTrue('Test1' in entries) 341 self.assertFalse('Test2' in entries) 342 self.assertTrue('Test3' in entries) 343 344 common.ZipDelete(zip_file.name, ['Test3']) 345 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 346 entries = check_zip.namelist() 347 self.assertTrue('Test1' in entries) 348 self.assertFalse('Test2' in entries) 349 self.assertFalse('Test3' in entries) 350 351 common.ZipDelete(zip_file.name, ['Test1', 'Test2']) 352 with zipfile.ZipFile(zip_file.name, 'r') as check_zip: 353 entries = check_zip.namelist() 354 self.assertFalse('Test1' in entries) 355 self.assertFalse('Test2' in entries) 356 self.assertFalse('Test3' in entries) 357 finally: 358 os.remove(zip_file.name) 359 360 361class CommonApkUtilsTest(unittest.TestCase): 362 """Tests the APK utils related functions.""" 363 364 APKCERTS_TXT1 = ( 365 'name="RecoveryLocalizer.apk" certificate="certs/devkey.x509.pem"' 366 ' private_key="certs/devkey.pk8"\n' 367 'name="Settings.apk"' 368 ' certificate="build/target/product/security/platform.x509.pem"' 369 ' private_key="build/target/product/security/platform.pk8"\n' 370 'name="TV.apk" certificate="PRESIGNED" private_key=""\n' 371 ) 372 373 APKCERTS_CERTMAP1 = { 374 'RecoveryLocalizer.apk' : 'certs/devkey', 375 'Settings.apk' : 'build/target/product/security/platform', 376 'TV.apk' : 'PRESIGNED', 377 } 378 379 APKCERTS_TXT2 = ( 380 'name="Compressed1.apk" certificate="certs/compressed1.x509.pem"' 381 ' private_key="certs/compressed1.pk8" compressed="gz"\n' 382 'name="Compressed2a.apk" certificate="certs/compressed2.x509.pem"' 383 ' private_key="certs/compressed2.pk8" compressed="gz"\n' 384 'name="Compressed2b.apk" certificate="certs/compressed2.x509.pem"' 385 ' private_key="certs/compressed2.pk8" compressed="gz"\n' 386 'name="Compressed3.apk" certificate="certs/compressed3.x509.pem"' 387 ' private_key="certs/compressed3.pk8" compressed="gz"\n' 388 ) 389 390 APKCERTS_CERTMAP2 = { 391 'Compressed1.apk' : 'certs/compressed1', 392 'Compressed2a.apk' : 'certs/compressed2', 393 'Compressed2b.apk' : 'certs/compressed2', 394 'Compressed3.apk' : 'certs/compressed3', 395 } 396 397 APKCERTS_TXT3 = ( 398 'name="Compressed4.apk" certificate="certs/compressed4.x509.pem"' 399 ' private_key="certs/compressed4.pk8" compressed="xz"\n' 400 ) 401 402 APKCERTS_CERTMAP3 = { 403 'Compressed4.apk' : 'certs/compressed4', 404 } 405 406 def setUp(self): 407 self.testdata_dir = test_utils.get_testdata_dir() 408 409 def tearDown(self): 410 common.Cleanup() 411 412 @staticmethod 413 def _write_apkcerts_txt(apkcerts_txt, additional=None): 414 if additional is None: 415 additional = [] 416 target_files = common.MakeTempFile(suffix='.zip') 417 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 418 target_files_zip.writestr('META/apkcerts.txt', apkcerts_txt) 419 for entry in additional: 420 target_files_zip.writestr(entry, '') 421 return target_files 422 423 def test_ReadApkCerts_NoncompressedApks(self): 424 target_files = self._write_apkcerts_txt(self.APKCERTS_TXT1) 425 with zipfile.ZipFile(target_files, 'r') as input_zip: 426 certmap, ext = common.ReadApkCerts(input_zip) 427 428 self.assertDictEqual(self.APKCERTS_CERTMAP1, certmap) 429 self.assertIsNone(ext) 430 431 def test_ReadApkCerts_CompressedApks(self): 432 # We have "installed" Compressed1.apk.gz only. Note that Compressed3.apk is 433 # not stored in '.gz' format, so it shouldn't be considered as installed. 434 target_files = self._write_apkcerts_txt( 435 self.APKCERTS_TXT2, 436 ['Compressed1.apk.gz', 'Compressed3.apk']) 437 438 with zipfile.ZipFile(target_files, 'r') as input_zip: 439 certmap, ext = common.ReadApkCerts(input_zip) 440 441 self.assertDictEqual(self.APKCERTS_CERTMAP2, certmap) 442 self.assertEqual('.gz', ext) 443 444 # Alternative case with '.xz'. 445 target_files = self._write_apkcerts_txt( 446 self.APKCERTS_TXT3, ['Compressed4.apk.xz']) 447 448 with zipfile.ZipFile(target_files, 'r') as input_zip: 449 certmap, ext = common.ReadApkCerts(input_zip) 450 451 self.assertDictEqual(self.APKCERTS_CERTMAP3, certmap) 452 self.assertEqual('.xz', ext) 453 454 def test_ReadApkCerts_CompressedAndNoncompressedApks(self): 455 target_files = self._write_apkcerts_txt( 456 self.APKCERTS_TXT1 + self.APKCERTS_TXT2, 457 ['Compressed1.apk.gz', 'Compressed3.apk']) 458 459 with zipfile.ZipFile(target_files, 'r') as input_zip: 460 certmap, ext = common.ReadApkCerts(input_zip) 461 462 certmap_merged = self.APKCERTS_CERTMAP1.copy() 463 certmap_merged.update(self.APKCERTS_CERTMAP2) 464 self.assertDictEqual(certmap_merged, certmap) 465 self.assertEqual('.gz', ext) 466 467 def test_ReadApkCerts_MultipleCompressionMethods(self): 468 target_files = self._write_apkcerts_txt( 469 self.APKCERTS_TXT2 + self.APKCERTS_TXT3, 470 ['Compressed1.apk.gz', 'Compressed4.apk.xz']) 471 472 with zipfile.ZipFile(target_files, 'r') as input_zip: 473 self.assertRaises(ValueError, common.ReadApkCerts, input_zip) 474 475 def test_ReadApkCerts_MismatchingKeys(self): 476 malformed_apkcerts_txt = ( 477 'name="App1.apk" certificate="certs/cert1.x509.pem"' 478 ' private_key="certs/cert2.pk8"\n' 479 ) 480 target_files = self._write_apkcerts_txt(malformed_apkcerts_txt) 481 482 with zipfile.ZipFile(target_files, 'r') as input_zip: 483 self.assertRaises(ValueError, common.ReadApkCerts, input_zip) 484 485 def test_ExtractPublicKey(self): 486 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem') 487 pubkey = os.path.join(self.testdata_dir, 'testkey.pubkey.pem') 488 with open(pubkey, 'rb') as pubkey_fp: 489 self.assertEqual(pubkey_fp.read(), common.ExtractPublicKey(cert)) 490 491 def test_ExtractPublicKey_invalidInput(self): 492 wrong_input = os.path.join(self.testdata_dir, 'testkey.pk8') 493 self.assertRaises(AssertionError, common.ExtractPublicKey, wrong_input) 494 495 def test_ParseCertificate(self): 496 cert = os.path.join(self.testdata_dir, 'testkey.x509.pem') 497 498 cmd = ['openssl', 'x509', '-in', cert, '-outform', 'DER'] 499 proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 500 expected, _ = proc.communicate() 501 self.assertEqual(0, proc.returncode) 502 503 with open(cert) as cert_fp: 504 actual = common.ParseCertificate(cert_fp.read()) 505 self.assertEqual(expected, actual) 506 507 508class CommonUtilsTest(unittest.TestCase): 509 510 def tearDown(self): 511 common.Cleanup() 512 513 def test_GetSparseImage_emptyBlockMapFile(self): 514 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 515 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 516 target_files_zip.write( 517 test_utils.construct_sparse_image([ 518 (0xCAC1, 6), 519 (0xCAC3, 3), 520 (0xCAC1, 4)]), 521 arcname='IMAGES/system.img') 522 target_files_zip.writestr('IMAGES/system.map', '') 523 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8)) 524 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 525 526 tempdir = common.UnzipTemp(target_files) 527 with zipfile.ZipFile(target_files, 'r') as input_zip: 528 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 529 530 self.assertDictEqual( 531 { 532 '__COPY': RangeSet("0"), 533 '__NONZERO-0': RangeSet("1-5 9-12"), 534 }, 535 sparse_image.file_map) 536 537 def test_GetSparseImage_invalidImageName(self): 538 self.assertRaises( 539 AssertionError, common.GetSparseImage, 'system2', None, None, False) 540 self.assertRaises( 541 AssertionError, common.GetSparseImage, 'unknown', None, None, False) 542 543 def test_GetSparseImage_missingBlockMapFile(self): 544 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 545 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 546 target_files_zip.write( 547 test_utils.construct_sparse_image([ 548 (0xCAC1, 6), 549 (0xCAC3, 3), 550 (0xCAC1, 4)]), 551 arcname='IMAGES/system.img') 552 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 8)) 553 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 554 555 tempdir = common.UnzipTemp(target_files) 556 with zipfile.ZipFile(target_files, 'r') as input_zip: 557 self.assertRaises( 558 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 559 False) 560 561 def test_GetSparseImage_sharedBlocks_notAllowed(self): 562 """Tests the case of having overlapping blocks but disallowed.""" 563 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 564 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 565 target_files_zip.write( 566 test_utils.construct_sparse_image([(0xCAC2, 16)]), 567 arcname='IMAGES/system.img') 568 # Block 10 is shared between two files. 569 target_files_zip.writestr( 570 'IMAGES/system.map', 571 '\n'.join([ 572 '/system/file1 1-5 9-10', 573 '/system/file2 10-12'])) 574 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 575 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 576 577 tempdir = common.UnzipTemp(target_files) 578 with zipfile.ZipFile(target_files, 'r') as input_zip: 579 self.assertRaises( 580 AssertionError, common.GetSparseImage, 'system', tempdir, input_zip, 581 False) 582 583 def test_GetSparseImage_sharedBlocks_allowed(self): 584 """Tests the case for target using BOARD_EXT4_SHARE_DUP_BLOCKS := true.""" 585 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 586 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 587 # Construct an image with a care_map of "0-5 9-12". 588 target_files_zip.write( 589 test_utils.construct_sparse_image([(0xCAC2, 16)]), 590 arcname='IMAGES/system.img') 591 # Block 10 is shared between two files. 592 target_files_zip.writestr( 593 'IMAGES/system.map', 594 '\n'.join([ 595 '/system/file1 1-5 9-10', 596 '/system/file2 10-12'])) 597 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 598 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 599 600 tempdir = common.UnzipTemp(target_files) 601 with zipfile.ZipFile(target_files, 'r') as input_zip: 602 sparse_image = common.GetSparseImage('system', tempdir, input_zip, True) 603 604 self.assertDictEqual( 605 { 606 '__COPY': RangeSet("0"), 607 '__NONZERO-0': RangeSet("6-8 13-15"), 608 '/system/file1': RangeSet("1-5 9-10"), 609 '/system/file2': RangeSet("11-12"), 610 }, 611 sparse_image.file_map) 612 613 # '/system/file2' should be marked with 'uses_shared_blocks', but not with 614 # 'incomplete'. 615 self.assertTrue( 616 sparse_image.file_map['/system/file2'].extra['uses_shared_blocks']) 617 self.assertNotIn( 618 'incomplete', sparse_image.file_map['/system/file2'].extra) 619 620 # All other entries should look normal without any tags. 621 self.assertFalse(sparse_image.file_map['__COPY'].extra) 622 self.assertFalse(sparse_image.file_map['__NONZERO-0'].extra) 623 self.assertFalse(sparse_image.file_map['/system/file1'].extra) 624 625 def test_GetSparseImage_incompleteRanges(self): 626 """Tests the case of ext4 images with holes.""" 627 target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') 628 with zipfile.ZipFile(target_files, 'w') as target_files_zip: 629 target_files_zip.write( 630 test_utils.construct_sparse_image([(0xCAC2, 16)]), 631 arcname='IMAGES/system.img') 632 target_files_zip.writestr( 633 'IMAGES/system.map', 634 '\n'.join([ 635 '/system/file1 1-5 9-10', 636 '/system/file2 11-12'])) 637 target_files_zip.writestr('SYSTEM/file1', os.urandom(4096 * 7)) 638 # '/system/file2' has less blocks listed (2) than actual (3). 639 target_files_zip.writestr('SYSTEM/file2', os.urandom(4096 * 3)) 640 641 tempdir = common.UnzipTemp(target_files) 642 with zipfile.ZipFile(target_files, 'r') as input_zip: 643 sparse_image = common.GetSparseImage('system', tempdir, input_zip, False) 644 645 self.assertFalse(sparse_image.file_map['/system/file1'].extra) 646 self.assertTrue(sparse_image.file_map['/system/file2'].extra['incomplete']) 647 648 649class InstallRecoveryScriptFormatTest(unittest.TestCase): 650 """Checks the format of install-recovery.sh. 651 652 Its format should match between common.py and validate_target_files.py. 653 """ 654 655 def setUp(self): 656 self._tempdir = common.MakeTempDir() 657 # Create a dummy dict that contains the fstab info for boot&recovery. 658 self._info = {"fstab" : {}} 659 dummy_fstab = [ 660 "/dev/soc.0/by-name/boot /boot emmc defaults defaults", 661 "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"] 662 self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab) 663 # Construct the gzipped recovery.img and boot.img 664 self.recovery_data = bytearray([ 665 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a, 666 0x4d, 0xce, 0x2f, 0x4b, 0x2d, 0xaa, 0x04, 0x00, 0xc9, 0x93, 0x43, 0xf3, 667 0x08, 0x00, 0x00, 0x00 668 ]) 669 # echo -n "boot" | gzip -f | hd 670 self.boot_data = bytearray([ 671 0x1f, 0x8b, 0x08, 0x00, 0x8c, 0x12, 0x02, 0x5a, 0x00, 0x03, 0x4b, 0xca, 672 0xcf, 0x2f, 0x01, 0x00, 0xc4, 0xae, 0xed, 0x46, 0x04, 0x00, 0x00, 0x00 673 ]) 674 675 def _out_tmp_sink(self, name, data, prefix="SYSTEM"): 676 loc = os.path.join(self._tempdir, prefix, name) 677 if not os.path.exists(os.path.dirname(loc)): 678 os.makedirs(os.path.dirname(loc)) 679 with open(loc, "w+") as f: 680 f.write(data) 681 682 def test_full_recovery(self): 683 recovery_image = common.File("recovery.img", self.recovery_data) 684 boot_image = common.File("boot.img", self.boot_data) 685 self._info["full_recovery_image"] = "true" 686 687 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 688 recovery_image, boot_image, self._info) 689 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 690 self._info) 691 692 def test_recovery_from_boot(self): 693 recovery_image = common.File("recovery.img", self.recovery_data) 694 self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES") 695 boot_image = common.File("boot.img", self.boot_data) 696 self._out_tmp_sink("boot.img", boot_image.data, "IMAGES") 697 698 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 699 recovery_image, boot_image, self._info) 700 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 701 self._info) 702 # Validate 'recovery-from-boot' with bonus argument. 703 self._out_tmp_sink("etc/recovery-resource.dat", "bonus", "SYSTEM") 704 common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, 705 recovery_image, boot_image, self._info) 706 validate_target_files.ValidateInstallRecoveryScript(self._tempdir, 707 self._info) 708 709 def tearDown(self): 710 common.Cleanup() 711