#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit tests for the helpers exported by ota_from_target_files."""

import copy
import os
import os.path
import subprocess
import unittest
import zipfile

import common
import test_utils
from ota_from_target_files import (
    _LoadOemDicts, AbOtaPropertyFiles, BuildInfo, FinalizeMetadata,
    GetPackageMetadata, GetTargetFilesZipForSecondaryImages,
    GetTargetFilesZipWithoutPostinstallConfig, NonAbOtaPropertyFiles,
    Payload, PayloadSigner, POSTINSTALL_CONFIG, PropertyFiles,
    StreamingPropertyFiles, WriteFingerprintAssertion)


def construct_target_files(secondary=False):
  """Returns a target-files.zip file for generating OTA packages.

  The zip carries META/update_engine_config.txt, the postinstall config,
  META/ab_partitions.txt and one random-content image per A/B partition.

  Args:
    secondary: When True, additionally writes IMAGES/system_other.img.

  Returns:
    The path of the temp zip file (created via common.MakeTempFile).
  """
  target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(target_files, 'w') as target_files_zip:
    # META/update_engine_config.txt
    target_files_zip.writestr(
        'META/update_engine_config.txt',
        "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")

    # META/postinstall_config.txt
    target_files_zip.writestr(
        POSTINSTALL_CONFIG,
        '\n'.join([
            "RUN_POSTINSTALL_system=true",
            "POSTINSTALL_PATH_system=system/bin/otapreopt_script",
            "FILESYSTEM_TYPE_system=ext4",
            "POSTINSTALL_OPTIONAL_system=true",
        ]))

    # META/ab_partitions.txt
    ab_partitions = ['boot', 'system', 'vendor']
    target_files_zip.writestr(
        'META/ab_partitions.txt',
        '\n'.join(ab_partitions))

    # Create dummy images for each of them.
    for partition in ab_partitions:
      target_files_zip.writestr('IMAGES/' + partition + '.img',
                                os.urandom(len(partition)))

    if secondary:
      target_files_zip.writestr('IMAGES/system_other.img',
                                os.urandom(len("system_other")))

  return target_files


class MockScriptWriter(object):
  """A class that mocks edify_generator.EdifyGenerator.

  It simply pushes the incoming arguments onto script stack, which is to assert
  the calls to EdifyGenerator functions.
  """

  def __init__(self):
    # Each recorded call is a tuple of (method name, *args).
    self.script = []

  def Mount(self, *args):
    self.script.append(('Mount',) + args)

  def AssertDevice(self, *args):
    self.script.append(('AssertDevice',) + args)

  def AssertOemProperty(self, *args):
    self.script.append(('AssertOemProperty',) + args)

  def AssertFingerprintOrThumbprint(self, *args):
    self.script.append(('AssertFingerprintOrThumbprint',) + args)

  def AssertSomeFingerprint(self, *args):
    self.script.append(('AssertSomeFingerprint',) + args)

  def AssertSomeThumbprint(self, *args):
    self.script.append(('AssertSomeThumbprint',) + args)


class BuildInfoTest(unittest.TestCase):
  """Tests for BuildInfo and WriteFingerprintAssertion."""

  # Info dict for a build that does not use OEM properties.
  TEST_INFO_DICT = {
      'build.prop' : {
          'ro.product.device' : 'product-device',
          'ro.product.name' : 'product-name',
          'ro.build.fingerprint' : 'build-fingerprint',
          'ro.build.foo' : 'build-foo',
      },
      'vendor.build.prop' : {
          'ro.vendor.build.fingerprint' : 'vendor-build-fingerprint',
      },
      'property1' : 'value1',
      'property2' : 4096,
  }

  # Info dict that declares 'oem_fingerprint_properties', to be paired with
  # TEST_OEM_DICTS.
  TEST_INFO_DICT_USES_OEM_PROPS = {
      'build.prop' : {
          'ro.product.name' : 'product-name',
          'ro.build.thumbprint' : 'build-thumbprint',
          'ro.build.bar' : 'build-bar',
      },
      'vendor.build.prop' : {
          'ro.vendor.build.fingerprint' : 'vendor-build-fingerprint',
      },
      'property1' : 'value1',
      'property2' : 4096,
      'oem_fingerprint_properties' : 'ro.product.device ro.product.brand',
  }

  TEST_OEM_DICTS = [
      {
          'ro.product.brand' : 'brand1',
          'ro.product.device' : 'device1',
      },
      {
          'ro.product.brand' : 'brand2',
          'ro.product.device' : 'device2',
      },
      {
          'ro.product.brand' : 'brand3',
          'ro.product.device' : 'device3',
      },
  ]

  def test_init(self):
    target_info = BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual('product-device', target_info.device)
    self.assertEqual('build-fingerprint', target_info.fingerprint)
    self.assertFalse(target_info.is_ab)
    self.assertIsNone(target_info.oem_props)

  def test_init_with_oem_props(self):
    target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                            self.TEST_OEM_DICTS)
    self.assertEqual('device1', target_info.device)
    self.assertEqual('brand1/product-name/device1:build-thumbprint',
                     target_info.fingerprint)

    # Swap the order in oem_dicts, which would lead to different BuildInfo.
    oem_dicts = copy.copy(self.TEST_OEM_DICTS)
    oem_dicts[0], oem_dicts[2] = oem_dicts[2], oem_dicts[0]
    target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, oem_dicts)
    self.assertEqual('device3', target_info.device)
    self.assertEqual('brand3/product-name/device3:build-thumbprint',
                     target_info.fingerprint)

    # Missing oem_dict should be rejected.
    self.assertRaises(AssertionError, BuildInfo,
                      self.TEST_INFO_DICT_USES_OEM_PROPS, None)

  def test___getitem__(self):
    target_info = BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual('value1', target_info['property1'])
    self.assertEqual(4096, target_info['property2'])
    self.assertEqual('build-foo', target_info['build.prop']['ro.build.foo'])

  def test___getitem__with_oem_props(self):
    target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                            self.TEST_OEM_DICTS)
    self.assertEqual('value1', target_info['property1'])
    self.assertEqual(4096, target_info['property2'])
    self.assertRaises(KeyError,
                      lambda: target_info['build.prop']['ro.build.foo'])

  def test_get(self):
    target_info = BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual('value1', target_info.get('property1'))
    self.assertEqual(4096, target_info.get('property2'))
    self.assertEqual(4096, target_info.get('property2', 1024))
    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
    self.assertEqual('build-foo', target_info.get('build.prop')['ro.build.foo'])

  def test_get_with_oem_props(self):
    target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                            self.TEST_OEM_DICTS)
    self.assertEqual('value1', target_info.get('property1'))
    self.assertEqual(4096, target_info.get('property2'))
    self.assertEqual(4096, target_info.get('property2', 1024))
    self.assertEqual(1024, target_info.get('property-nonexistent', 1024))
    self.assertIsNone(target_info.get('build.prop').get('ro.build.foo'))
    self.assertRaises(KeyError,
                      lambda: target_info.get('build.prop')['ro.build.foo'])

  def test_GetBuildProp(self):
    target_info = BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual('build-foo', target_info.GetBuildProp('ro.build.foo'))
    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
                      'ro.build.nonexistent')

  def test_GetBuildProp_with_oem_props(self):
    target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                            self.TEST_OEM_DICTS)
    self.assertEqual('build-bar', target_info.GetBuildProp('ro.build.bar'))
    self.assertRaises(common.ExternalError, target_info.GetBuildProp,
                      'ro.build.nonexistent')

  def test_GetVendorBuildProp(self):
    target_info = BuildInfo(self.TEST_INFO_DICT, None)
    self.assertEqual('vendor-build-fingerprint',
                     target_info.GetVendorBuildProp(
                         'ro.vendor.build.fingerprint'))
    self.assertRaises(common.ExternalError, target_info.GetVendorBuildProp,
                      'ro.build.nonexistent')

  def test_GetVendorBuildProp_with_oem_props(self):
    target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                            self.TEST_OEM_DICTS)
    self.assertEqual('vendor-build-fingerprint',
                     target_info.GetVendorBuildProp(
                         'ro.vendor.build.fingerprint'))
    self.assertRaises(common.ExternalError, target_info.GetVendorBuildProp,
                      'ro.build.nonexistent')

  def test_WriteMountOemScript(self):
    target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                            self.TEST_OEM_DICTS)
    script_writer = MockScriptWriter()
    target_info.WriteMountOemScript(script_writer)
    self.assertEqual([('Mount', '/oem', None)], script_writer.script)

  def test_WriteDeviceAssertions(self):
    target_info = BuildInfo(self.TEST_INFO_DICT, None)
    script_writer = MockScriptWriter()
    target_info.WriteDeviceAssertions(script_writer, False)
    self.assertEqual([('AssertDevice', 'product-device')], script_writer.script)

  def test_WriteDeviceAssertions_with_oem_props(self):
    target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                            self.TEST_OEM_DICTS)
    script_writer = MockScriptWriter()
    target_info.WriteDeviceAssertions(script_writer, False)
    self.assertEqual(
        [
            ('AssertOemProperty', 'ro.product.device',
             ['device1', 'device2', 'device3'], False),
            ('AssertOemProperty', 'ro.product.brand',
             ['brand1', 'brand2', 'brand3'], False),
        ],
        script_writer.script)

  def test_WriteFingerprintAssertion_without_oem_props(self):
    target_info = BuildInfo(self.TEST_INFO_DICT, None)
    # Deep-copy so that the shared class-level dict is not mutated.
    source_info_dict = copy.deepcopy(self.TEST_INFO_DICT)
    source_info_dict['build.prop']['ro.build.fingerprint'] = (
        'source-build-fingerprint')
    source_info = BuildInfo(source_info_dict, None)

    script_writer = MockScriptWriter()
    WriteFingerprintAssertion(script_writer, target_info, source_info)
    self.assertEqual(
        [('AssertSomeFingerprint', 'source-build-fingerprint',
          'build-fingerprint')],
        script_writer.script)

  def test_WriteFingerprintAssertion_with_source_oem_props(self):
    target_info = BuildInfo(self.TEST_INFO_DICT, None)
    source_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                            self.TEST_OEM_DICTS)

    script_writer = MockScriptWriter()
    WriteFingerprintAssertion(script_writer, target_info, source_info)
    self.assertEqual(
        [('AssertFingerprintOrThumbprint', 'build-fingerprint',
          'build-thumbprint')],
        script_writer.script)

  def test_WriteFingerprintAssertion_with_target_oem_props(self):
    target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                            self.TEST_OEM_DICTS)
    source_info = BuildInfo(self.TEST_INFO_DICT, None)

    script_writer = MockScriptWriter()
    WriteFingerprintAssertion(script_writer, target_info, source_info)
    self.assertEqual(
        [('AssertFingerprintOrThumbprint', 'build-fingerprint',
          'build-thumbprint')],
        script_writer.script)

  def test_WriteFingerprintAssertion_with_both_oem_props(self):
    target_info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS,
                            self.TEST_OEM_DICTS)
    # Deep-copy so that the shared class-level dict is not mutated.
    source_info_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS)
    source_info_dict['build.prop']['ro.build.thumbprint'] = (
        'source-build-thumbprint')
    source_info = BuildInfo(source_info_dict, self.TEST_OEM_DICTS)

    script_writer = MockScriptWriter()
    WriteFingerprintAssertion(script_writer, target_info, source_info)
    self.assertEqual(
        [('AssertSomeThumbprint', 'build-thumbprint',
          'source-build-thumbprint')],
        script_writer.script)


class LoadOemDictsTest(unittest.TestCase):
  """Tests for _LoadOemDicts."""

  def tearDown(self):
    common.Cleanup()

  def test_NoneDict(self):
    self.assertIsNone(_LoadOemDicts(None))

  def test_SingleDict(self):
    dict_file = common.MakeTempFile()
    with open(dict_file, 'w') as dict_fp:
      dict_fp.write('abc=1\ndef=2\nxyz=foo\na.b.c=bar\n')

    oem_dicts = _LoadOemDicts([dict_file])
    self.assertEqual(1, len(oem_dicts))
    self.assertEqual('foo', oem_dicts[0]['xyz'])
    self.assertEqual('bar', oem_dicts[0]['a.b.c'])

  def test_MultipleDicts(self):
    oem_source = []
    for i in range(3):
      dict_file = common.MakeTempFile()
      with open(dict_file, 'w') as dict_fp:
        dict_fp.write(
            'ro.build.index={}\ndef=2\nxyz=foo\na.b.c=bar\n'.format(i))
      oem_source.append(dict_file)

    oem_dicts = _LoadOemDicts(oem_source)
    self.assertEqual(3, len(oem_dicts))
    # The loaded dicts are expected in the same order as the input files.
    for i, oem_dict in enumerate(oem_dicts):
      self.assertEqual('2', oem_dict['def'])
      self.assertEqual('foo', oem_dict['xyz'])
      self.assertEqual('bar', oem_dict['a.b.c'])
      self.assertEqual('{}'.format(i), oem_dict['ro.build.index'])


class OtaFromTargetFilesTest(unittest.TestCase):
  """Tests for GetPackageMetadata, target-files rewriting, FinalizeMetadata."""

  TEST_TARGET_INFO_DICT = {
      'build.prop' : {
          'ro.product.device' : 'product-device',
          'ro.build.fingerprint' : 'build-fingerprint-target',
          'ro.build.version.incremental' : 'build-version-incremental-target',
          'ro.build.version.sdk' : '27',
          'ro.build.version.security_patch' : '2017-12-01',
          'ro.build.date.utc' : '1500000000',
      },
  }

  TEST_SOURCE_INFO_DICT = {
      'build.prop' : {
          'ro.product.device' : 'product-device',
          'ro.build.fingerprint' : 'build-fingerprint-source',
          'ro.build.version.incremental' : 'build-version-incremental-source',
          'ro.build.version.sdk' : '25',
          'ro.build.version.security_patch' : '2016-12-01',
          'ro.build.date.utc' : '1400000000',
      },
  }

  def setUp(self):
    self.testdata_dir = test_utils.get_testdata_dir()
    self.assertTrue(os.path.exists(self.testdata_dir))

    # Reset the global options as in ota_from_target_files.py.
    common.OPTIONS.incremental_source = None
    common.OPTIONS.downgrade = False
    common.OPTIONS.timestamp = False
    common.OPTIONS.wipe_user_data = False
    common.OPTIONS.no_signing = False
    common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
    common.OPTIONS.key_passwords = {
        common.OPTIONS.package_key : None,
    }

    common.OPTIONS.search_path = test_utils.get_search_path()
    self.assertIsNotNone(common.OPTIONS.search_path)

  def tearDown(self):
    common.Cleanup()

  def test_GetPackageMetadata_abOta_full(self):
    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    target_info_dict['ab_update'] = 'true'
    target_info = BuildInfo(target_info_dict, None)
    metadata = GetPackageMetadata(target_info)
    self.assertDictEqual(
        {
            'ota-type' : 'AB',
            'ota-required-cache' : '0',
            'post-build' : 'build-fingerprint-target',
            'post-build-incremental' : 'build-version-incremental-target',
            'post-sdk-level' : '27',
            'post-security-patch-level' : '2017-12-01',
            'post-timestamp' : '1500000000',
            'pre-device' : 'product-device',
        },
        metadata)

  def test_GetPackageMetadata_abOta_incremental(self):
    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    target_info_dict['ab_update'] = 'true'
    target_info = BuildInfo(target_info_dict, None)
    source_info = BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
    common.OPTIONS.incremental_source = ''
    metadata = GetPackageMetadata(target_info, source_info)
    self.assertDictEqual(
        {
            'ota-type' : 'AB',
            'ota-required-cache' : '0',
            'post-build' : 'build-fingerprint-target',
            'post-build-incremental' : 'build-version-incremental-target',
            'post-sdk-level' : '27',
            'post-security-patch-level' : '2017-12-01',
            'post-timestamp' : '1500000000',
            'pre-device' : 'product-device',
            'pre-build' : 'build-fingerprint-source',
            'pre-build-incremental' : 'build-version-incremental-source',
        },
        metadata)

  def test_GetPackageMetadata_nonAbOta_full(self):
    target_info = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    metadata = GetPackageMetadata(target_info)
    self.assertDictEqual(
        {
            'ota-type' : 'BLOCK',
            'post-build' : 'build-fingerprint-target',
            'post-build-incremental' : 'build-version-incremental-target',
            'post-sdk-level' : '27',
            'post-security-patch-level' : '2017-12-01',
            'post-timestamp' : '1500000000',
            'pre-device' : 'product-device',
        },
        metadata)

  def test_GetPackageMetadata_nonAbOta_incremental(self):
    target_info = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    source_info = BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
    common.OPTIONS.incremental_source = ''
    metadata = GetPackageMetadata(target_info, source_info)
    self.assertDictEqual(
        {
            'ota-type' : 'BLOCK',
            'post-build' : 'build-fingerprint-target',
            'post-build-incremental' : 'build-version-incremental-target',
            'post-sdk-level' : '27',
            'post-security-patch-level' : '2017-12-01',
            'post-timestamp' : '1500000000',
            'pre-device' : 'product-device',
            'pre-build' : 'build-fingerprint-source',
            'pre-build-incremental' : 'build-version-incremental-source',
        },
        metadata)

  def test_GetPackageMetadata_wipe(self):
    target_info = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    common.OPTIONS.wipe_user_data = True
    metadata = GetPackageMetadata(target_info)
    self.assertDictEqual(
        {
            'ota-type' : 'BLOCK',
            'ota-wipe' : 'yes',
            'post-build' : 'build-fingerprint-target',
            'post-build-incremental' : 'build-version-incremental-target',
            'post-sdk-level' : '27',
            'post-security-patch-level' : '2017-12-01',
            'post-timestamp' : '1500000000',
            'pre-device' : 'product-device',
        },
        metadata)

  @staticmethod
  def _test_GetPackageMetadata_swapBuildTimestamps(target_info, source_info):
    """Swaps 'ro.build.date.utc' between the two info dicts, in place."""
    (target_info['build.prop']['ro.build.date.utc'],
     source_info['build.prop']['ro.build.date.utc']) = (
         source_info['build.prop']['ro.build.date.utc'],
         target_info['build.prop']['ro.build.date.utc'])

  def test_GetPackageMetadata_unintentionalDowngradeDetected(self):
    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
    self._test_GetPackageMetadata_swapBuildTimestamps(
        target_info_dict, source_info_dict)

    target_info = BuildInfo(target_info_dict, None)
    source_info = BuildInfo(source_info_dict, None)
    common.OPTIONS.incremental_source = ''
    self.assertRaises(RuntimeError, GetPackageMetadata, target_info,
                      source_info)

  def test_GetPackageMetadata_downgrade(self):
    target_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    source_info_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
    self._test_GetPackageMetadata_swapBuildTimestamps(
        target_info_dict, source_info_dict)

    target_info = BuildInfo(target_info_dict, None)
    source_info = BuildInfo(source_info_dict, None)
    common.OPTIONS.incremental_source = ''
    common.OPTIONS.downgrade = True
    common.OPTIONS.wipe_user_data = True
    metadata = GetPackageMetadata(target_info, source_info)
    self.assertDictEqual(
        {
            'ota-downgrade' : 'yes',
            'ota-type' : 'BLOCK',
            'ota-wipe' : 'yes',
            'post-build' : 'build-fingerprint-target',
            'post-build-incremental' : 'build-version-incremental-target',
            'post-sdk-level' : '27',
            'post-security-patch-level' : '2017-12-01',
            'post-timestamp' : '1400000000',
            'pre-device' : 'product-device',
            'pre-build' : 'build-fingerprint-source',
            'pre-build-incremental' : 'build-version-incremental-source',
        },
        metadata)

  def test_GetTargetFilesZipForSecondaryImages(self):
    input_file = construct_target_files(secondary=True)
    target_file = GetTargetFilesZipForSecondaryImages(input_file)

    with zipfile.ZipFile(target_file) as verify_zip:
      namelist = verify_zip.namelist()

    self.assertIn('META/ab_partitions.txt', namelist)
    self.assertIn('IMAGES/boot.img', namelist)
    self.assertIn('IMAGES/system.img', namelist)
    self.assertIn('IMAGES/vendor.img', namelist)
    self.assertIn(POSTINSTALL_CONFIG, namelist)

    self.assertNotIn('IMAGES/system_other.img', namelist)
    self.assertNotIn('IMAGES/system.map', namelist)

  def test_GetTargetFilesZipForSecondaryImages_skipPostinstall(self):
    input_file = construct_target_files(secondary=True)
    target_file = GetTargetFilesZipForSecondaryImages(
        input_file, skip_postinstall=True)

    with zipfile.ZipFile(target_file) as verify_zip:
      namelist = verify_zip.namelist()

    self.assertIn('META/ab_partitions.txt', namelist)
    self.assertIn('IMAGES/boot.img', namelist)
    self.assertIn('IMAGES/system.img', namelist)
    self.assertIn('IMAGES/vendor.img', namelist)

    self.assertNotIn('IMAGES/system_other.img', namelist)
    self.assertNotIn('IMAGES/system.map', namelist)
    self.assertNotIn(POSTINSTALL_CONFIG, namelist)

  def test_GetTargetFilesZipWithoutPostinstallConfig(self):
    input_file = construct_target_files()
    target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
    with zipfile.ZipFile(target_file) as verify_zip:
      self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())

  def test_GetTargetFilesZipWithoutPostinstallConfig_missingEntry(self):
    input_file = construct_target_files()
    common.ZipDelete(input_file, POSTINSTALL_CONFIG)
    target_file = GetTargetFilesZipWithoutPostinstallConfig(input_file)
    with zipfile.ZipFile(target_file) as verify_zip:
      self.assertNotIn(POSTINSTALL_CONFIG, verify_zip.namelist())

  def _test_FinalizeMetadata(self, large_entry=False):
    """Runs FinalizeMetadata on a small package and checks the metadata key.

    Args:
      large_entry: When True, appends a 1 GiB stored entry named 'zoo' to the
          input package before finalizing.
    """
    entries = [
        'required-entry1',
        'required-entry2',
    ]
    zip_file = PropertyFilesTest.construct_zip_package(entries)
    # Add a large entry of 1 GiB if requested.
    if large_entry:
      with zipfile.ZipFile(zip_file, 'a') as zip_fp:
        zip_fp.writestr(
            # Using 'zoo' so that the entry stays behind others after signing.
            'zoo',
            'A' * 1024 * 1024 * 1024,
            zipfile.ZIP_STORED)

    metadata = {}
    output_file = common.MakeTempFile(suffix='.zip')
    needed_property_files = (
        TestPropertyFiles(),
    )
    FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
    self.assertIn('ota-test-property-files', metadata)

  def test_FinalizeMetadata(self):
    self._test_FinalizeMetadata()

  def test_FinalizeMetadata_withNoSigning(self):
    common.OPTIONS.no_signing = True
    self._test_FinalizeMetadata()

  def test_FinalizeMetadata_largeEntry(self):
    self._test_FinalizeMetadata(large_entry=True)

  def test_FinalizeMetadata_largeEntry_withNoSigning(self):
    common.OPTIONS.no_signing = True
    self._test_FinalizeMetadata(large_entry=True)

  def test_FinalizeMetadata_insufficientSpace(self):
    entries = [
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
    ]
    zip_file = PropertyFilesTest.construct_zip_package(entries)
    with zipfile.ZipFile(zip_file, 'a') as zip_fp:
      zip_fp.writestr(
          # 'foo-entry1' will appear ahead of all other entries (in alphabetical
          # order) after the signing, which will in turn trigger the
          # InsufficientSpaceException and an automatic retry.
          'foo-entry1',
          'A' * 1024 * 1024,
          zipfile.ZIP_STORED)

    metadata = {}
    needed_property_files = (
        TestPropertyFiles(),
    )
    output_file = common.MakeTempFile(suffix='.zip')
    FinalizeMetadata(metadata, zip_file, output_file, needed_property_files)
    self.assertIn('ota-test-property-files', metadata)


class TestPropertyFiles(PropertyFiles):
  """A class that extends PropertyFiles for testing purpose."""

  def __init__(self):
    super(TestPropertyFiles, self).__init__()
    self.name = 'ota-test-property-files'
    self.required = (
        'required-entry1',
        'required-entry2',
    )
    self.optional = (
        'optional-entry1',
        'optional-entry2',
    )


class PropertyFilesTest(unittest.TestCase):
  """Tests for PropertyFiles, exercised through TestPropertyFiles."""

  def setUp(self):
    common.OPTIONS.no_signing = False

  def tearDown(self):
    common.Cleanup()

  @staticmethod
  def construct_zip_package(entries):
    """Creates a temp zip holding one stored entry per name in `entries`.

    Each entry's content is its own name with '.' replaced by '-' and
    upper-cased, which is what _verify_entries() later expects to read back.

    Returns:
      The path of the zip file.
    """
    zip_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(zip_file, 'w') as zip_fp:
      for entry in entries:
        zip_fp.writestr(
            entry,
            entry.replace('.', '-').upper(),
            zipfile.ZIP_STORED)
    return zip_file

  @staticmethod
  def _parse_property_files_string(data):
    """Parses a 'name:info,name:info,...' string into a {name: info} dict."""
    result = {}
    for token in data.split(','):
      # Split on the first ':' only; info itself is in 'offset:size' form.
      name, info = token.split(':', 1)
      result[name] = info
    return result

  def _verify_entries(self, input_file, tokens, entries):
    """Asserts that each entry's 'offset:size' token points at its content.

    Args:
      input_file: Path of the zip file to read raw bytes from.
      tokens: A dict as returned by _parse_property_files_string().
      entries: Names of the entries to check; must all be keys of `tokens`.
    """
    # Open the file once and seek per entry, instead of reopening it on every
    # iteration.
    with open(input_file, 'rb') as input_fp:
      for entry in entries:
        offset, size = map(int, tokens[entry].split(':'))
        input_fp.seek(offset)
        if entry == 'metadata':
          expected = b'META-INF/COM/ANDROID/METADATA'
        else:
          expected = entry.replace('.', '-').upper().encode()
        self.assertEqual(expected, input_fp.read(size))

  def test_Compute(self):
    entries = (
        'required-entry1',
        'required-entry2',
    )
    zip_file = self.construct_zip_package(entries)
    property_files = TestPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      property_files_string = property_files.Compute(zip_fp)

    tokens = self._parse_property_files_string(property_files_string)
    self.assertEqual(3, len(tokens))
    self._verify_entries(zip_file, tokens, entries)

  def test_Compute_withOptionalEntries(self):
    entries = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
    )
    zip_file = self.construct_zip_package(entries)
    property_files = TestPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      property_files_string = property_files.Compute(zip_fp)

    tokens = self._parse_property_files_string(property_files_string)
    self.assertEqual(5, len(tokens))
    self._verify_entries(zip_file, tokens, entries)

  def test_Compute_missingRequiredEntry(self):
    entries = (
        'required-entry2',
    )
    zip_file = self.construct_zip_package(entries)
    property_files = TestPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      self.assertRaises(KeyError, property_files.Compute, zip_fp)

  def test_Finalize(self):
    entries = [
        'required-entry1',
        'required-entry2',
        'META-INF/com/android/metadata',
    ]
    zip_file = self.construct_zip_package(entries)
    property_files = TestPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      # pylint: disable=protected-access
      raw_metadata = property_files._GetPropertyFilesString(
          zip_fp, reserve_space=False)
      streaming_metadata = property_files.Finalize(zip_fp, len(raw_metadata))
    tokens = self._parse_property_files_string(streaming_metadata)

    self.assertEqual(3, len(tokens))
    # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    # streaming metadata.
    entries[2] = 'metadata'
    self._verify_entries(zip_file, tokens, entries)

  def test_Finalize_assertReservedLength(self):
    entries = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
        'META-INF/com/android/metadata',
    )
    zip_file = self.construct_zip_package(entries)
    property_files = TestPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      # First get the raw metadata string (i.e. without padding space).
      # pylint: disable=protected-access
      raw_metadata = property_files._GetPropertyFilesString(
          zip_fp, reserve_space=False)
      raw_length = len(raw_metadata)

      # Now pass in the exact expected length.
      streaming_metadata = property_files.Finalize(zip_fp, raw_length)
      self.assertEqual(raw_length, len(streaming_metadata))

      # Or pass in insufficient length.
      self.assertRaises(
          PropertyFiles.InsufficientSpaceException,
          property_files.Finalize,
          zip_fp,
          raw_length - 1)

      # Or pass in a much larger size.
      streaming_metadata = property_files.Finalize(
          zip_fp,
          raw_length + 20)
      self.assertEqual(raw_length + 20, len(streaming_metadata))
      self.assertEqual(' ' * 20, streaming_metadata[raw_length:])

  def test_Verify(self):
    entries = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
        'META-INF/com/android/metadata',
    )
    zip_file = self.construct_zip_package(entries)
    property_files = TestPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      # First get the raw metadata string (i.e. without padding space).
      # pylint: disable=protected-access
      raw_metadata = property_files._GetPropertyFilesString(
          zip_fp, reserve_space=False)

      # Should pass the test if verification passes.
      property_files.Verify(zip_fp, raw_metadata)

      # Or raise on verification failure.
      self.assertRaises(
          AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')


class StreamingPropertyFilesTest(PropertyFilesTest):
  """Additional sanity checks specialized for StreamingPropertyFiles."""

  def test_init(self):
    property_files = StreamingPropertyFiles()
    self.assertEqual('ota-streaming-property-files', property_files.name)
    self.assertEqual(
        (
            'payload.bin',
            'payload_properties.txt',
        ),
        property_files.required)
    self.assertEqual(
        (
            'care_map.txt',
            'compatibility.zip',
        ),
        property_files.optional)

  def test_Compute(self):
    entries = (
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
    )
    zip_file = self.construct_zip_package(entries)
    property_files = StreamingPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      property_files_string = property_files.Compute(zip_fp)

    tokens = self._parse_property_files_string(property_files_string)
    self.assertEqual(5, len(tokens))
    self._verify_entries(zip_file, tokens, entries)

  def test_Finalize(self):
    entries = [
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
        'META-INF/com/android/metadata',
    ]
    zip_file = self.construct_zip_package(entries)
    property_files = StreamingPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      # pylint: disable=protected-access
      raw_metadata = property_files._GetPropertyFilesString(
          zip_fp, reserve_space=False)
      streaming_metadata = property_files.Finalize(zip_fp, len(raw_metadata))
    tokens = self._parse_property_files_string(streaming_metadata)

    self.assertEqual(5, len(tokens))
    # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    # streaming metadata.
    entries[4] = 'metadata'
    self._verify_entries(zip_file, tokens, entries)

  def test_Verify(self):
    entries = (
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
        'META-INF/com/android/metadata',
    )
    zip_file = self.construct_zip_package(entries)
    property_files = StreamingPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      # First get the raw metadata string (i.e. without padding space).
      # pylint: disable=protected-access
      raw_metadata = property_files._GetPropertyFilesString(
          zip_fp, reserve_space=False)

      # Should pass the test if verification passes.
      property_files.Verify(zip_fp, raw_metadata)

      # Or raise on verification failure.
      self.assertRaises(
          AssertionError, property_files.Verify, zip_fp, raw_metadata + 'x')


class AbOtaPropertyFilesTest(PropertyFilesTest):
  """Additional sanity checks specialized for AbOtaPropertyFiles."""

  # The size for payload and metadata signature size.
  SIGNATURE_SIZE = 256

  def setUp(self):
    # Chain to the parent so that its reset of common.OPTIONS.no_signing is not
    # lost by this override; other tests in this file set that global to True.
    super(AbOtaPropertyFilesTest, self).setUp()

    self.testdata_dir = test_utils.get_testdata_dir()
    self.assertTrue(os.path.exists(self.testdata_dir))

    common.OPTIONS.wipe_user_data = False
    common.OPTIONS.payload_signer = None
    common.OPTIONS.payload_signer_args = None
    common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
    common.OPTIONS.key_passwords = {
        common.OPTIONS.package_key : None,
    }

  def test_init(self):
    property_files = AbOtaPropertyFiles()
    self.assertEqual('ota-property-files', property_files.name)
    self.assertEqual(
        (
            'payload.bin',
            'payload_properties.txt',
        ),
        property_files.required)
    self.assertEqual(
        (
            'care_map.txt',
            'compatibility.zip',
        ),
        property_files.optional)

  def test_GetPayloadMetadataOffsetAndSize(self):
    target_file = construct_target_files()
    payload = Payload()
    payload.Generate(target_file)

    payload_signer = PayloadSigner()
    payload.Sign(payload_signer)

    output_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(output_file, 'w') as output_zip:
      payload.WriteToZip(output_zip)

    # Find out the payload metadata offset and size.
    property_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(output_file) as input_zip:
      # pylint: disable=protected-access
      payload_offset, metadata_total = (
          property_files._GetPayloadMetadataOffsetAndSize(input_zip))

    # Read in the metadata signature directly.
    with open(output_file, 'rb') as verify_fp:
      verify_fp.seek(payload_offset + metadata_total - self.SIGNATURE_SIZE)
      metadata_signature = verify_fp.read(self.SIGNATURE_SIZE)

    # Now we extract the metadata hash via brillo_update_payload script, which
    # will serve as the oracle result.
    payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    cmd = ['brillo_update_payload', 'hash',
           '--unsigned_payload', payload.payload_file,
           '--signature_size', str(self.SIGNATURE_SIZE),
           '--metadata_hash_file', metadata_sig_file,
           '--payload_hash_file', payload_sig_file]
    proc = common.Run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdoutdata, _ = proc.communicate()
    self.assertEqual(
        0, proc.returncode,
        'Failed to run brillo_update_payload: {}'.format(stdoutdata))

    signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)

    # Finally we can compare the two signatures.
    with open(signed_metadata_sig_file, 'rb') as verify_fp:
      self.assertEqual(verify_fp.read(), metadata_signature)

  @staticmethod
  def construct_zip_package_withValidPayload(with_metadata=False):
    """Creates a temp zip with a generated, signed payload plus dummy entries.

    Args:
      with_metadata: When True, also writes a 'META-INF/com/android/metadata'
          entry.

    Returns:
      The path of the zip file.
    """
    # Cannot use construct_zip_package() since we need a "valid" payload.bin.
    target_file = construct_target_files()
    payload = Payload()
    payload.Generate(target_file)

    payload_signer = PayloadSigner()
    payload.Sign(payload_signer)

    zip_file = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(zip_file, 'w') as zip_fp:
      # 'payload.bin',
      payload.WriteToZip(zip_fp)

      # Other entries.
      entries = ['care_map.txt', 'compatibility.zip']

      # Put META-INF/com/android/metadata if needed.
      if with_metadata:
        entries.append('META-INF/com/android/metadata')

      for entry in entries:
        zip_fp.writestr(
            entry, entry.replace('.', '-').upper(), zipfile.ZIP_STORED)

    return zip_file

  def test_Compute(self):
    zip_file = self.construct_zip_package_withValidPayload()
    property_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      property_files_string = property_files.Compute(zip_fp)

    tokens = self._parse_property_files_string(property_files_string)
    # "6" includes the four entries above, one metadata entry, and one entry
    # for payload-metadata.bin.
    self.assertEqual(6, len(tokens))
    self._verify_entries(
        zip_file, tokens, ('care_map.txt', 'compatibility.zip'))

  def test_Finalize(self):
    zip_file = self.construct_zip_package_withValidPayload(with_metadata=True)
    property_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(zip_file, 'r') as zip_fp:
      # pylint: disable=protected-access
      raw_metadata = property_files._GetPropertyFilesString(
          zip_fp, reserve_space=False)
      property_files_string = property_files.Finalize(zip_fp, len(raw_metadata))

    tokens = self._parse_property_files_string(property_files_string)
    # "6" includes the four entries above, one metadata entry, and one entry
    # for payload-metadata.bin.
1030 self.assertEqual(6, len(tokens)) 1031 self._verify_entries( 1032 zip_file, tokens, ('care_map.txt', 'compatibility.zip')) 1033 1034 def test_Verify(self): 1035 zip_file = self.construct_zip_package_withValidPayload(with_metadata=True) 1036 property_files = AbOtaPropertyFiles() 1037 with zipfile.ZipFile(zip_file, 'r') as zip_fp: 1038 # pylint: disable=protected-access 1039 raw_metadata = property_files._GetPropertyFilesString( 1040 zip_fp, reserve_space=False) 1041 1042 property_files.Verify(zip_fp, raw_metadata) 1043 1044 1045class NonAbOtaPropertyFilesTest(PropertyFilesTest): 1046 """Additional sanity checks specialized for NonAbOtaPropertyFiles.""" 1047 1048 def test_init(self): 1049 property_files = NonAbOtaPropertyFiles() 1050 self.assertEqual('ota-property-files', property_files.name) 1051 self.assertEqual((), property_files.required) 1052 self.assertEqual((), property_files.optional) 1053 1054 def test_Compute(self): 1055 entries = () 1056 zip_file = self.construct_zip_package(entries) 1057 property_files = NonAbOtaPropertyFiles() 1058 with zipfile.ZipFile(zip_file) as zip_fp: 1059 property_files_string = property_files.Compute(zip_fp) 1060 1061 tokens = self._parse_property_files_string(property_files_string) 1062 self.assertEqual(1, len(tokens)) 1063 self._verify_entries(zip_file, tokens, entries) 1064 1065 def test_Finalize(self): 1066 entries = [ 1067 'META-INF/com/android/metadata', 1068 ] 1069 zip_file = self.construct_zip_package(entries) 1070 property_files = NonAbOtaPropertyFiles() 1071 with zipfile.ZipFile(zip_file) as zip_fp: 1072 # pylint: disable=protected-access 1073 raw_metadata = property_files._GetPropertyFilesString( 1074 zip_fp, reserve_space=False) 1075 property_files_string = property_files.Finalize(zip_fp, len(raw_metadata)) 1076 tokens = self._parse_property_files_string(property_files_string) 1077 1078 self.assertEqual(1, len(tokens)) 1079 # 'META-INF/com/android/metadata' will be key'd as 'metadata'. 
1080 entries[0] = 'metadata' 1081 self._verify_entries(zip_file, tokens, entries) 1082 1083 def test_Verify(self): 1084 entries = ( 1085 'META-INF/com/android/metadata', 1086 ) 1087 zip_file = self.construct_zip_package(entries) 1088 property_files = NonAbOtaPropertyFiles() 1089 with zipfile.ZipFile(zip_file) as zip_fp: 1090 # pylint: disable=protected-access 1091 raw_metadata = property_files._GetPropertyFilesString( 1092 zip_fp, reserve_space=False) 1093 1094 property_files.Verify(zip_fp, raw_metadata) 1095 1096 1097class PayloadSignerTest(unittest.TestCase): 1098 1099 SIGFILE = 'sigfile.bin' 1100 SIGNED_SIGFILE = 'signed-sigfile.bin' 1101 1102 def setUp(self): 1103 self.testdata_dir = test_utils.get_testdata_dir() 1104 self.assertTrue(os.path.exists(self.testdata_dir)) 1105 1106 common.OPTIONS.payload_signer = None 1107 common.OPTIONS.payload_signer_args = [] 1108 common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey') 1109 common.OPTIONS.key_passwords = { 1110 common.OPTIONS.package_key : None, 1111 } 1112 1113 def tearDown(self): 1114 common.Cleanup() 1115 1116 def _assertFilesEqual(self, file1, file2): 1117 with open(file1, 'rb') as fp1, open(file2, 'rb') as fp2: 1118 self.assertEqual(fp1.read(), fp2.read()) 1119 1120 def test_init(self): 1121 payload_signer = PayloadSigner() 1122 self.assertEqual('openssl', payload_signer.signer) 1123 1124 def test_init_withPassword(self): 1125 common.OPTIONS.package_key = os.path.join( 1126 self.testdata_dir, 'testkey_with_passwd') 1127 common.OPTIONS.key_passwords = { 1128 common.OPTIONS.package_key : 'foo', 1129 } 1130 payload_signer = PayloadSigner() 1131 self.assertEqual('openssl', payload_signer.signer) 1132 1133 def test_init_withExternalSigner(self): 1134 common.OPTIONS.payload_signer = 'abc' 1135 common.OPTIONS.payload_signer_args = ['arg1', 'arg2'] 1136 payload_signer = PayloadSigner() 1137 self.assertEqual('abc', payload_signer.signer) 1138 self.assertEqual(['arg1', 'arg2'], 
payload_signer.signer_args) 1139 1140 def test_Sign(self): 1141 payload_signer = PayloadSigner() 1142 input_file = os.path.join(self.testdata_dir, self.SIGFILE) 1143 signed_file = payload_signer.Sign(input_file) 1144 1145 verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE) 1146 self._assertFilesEqual(verify_file, signed_file) 1147 1148 def test_Sign_withExternalSigner_openssl(self): 1149 """Uses openssl as the external payload signer.""" 1150 common.OPTIONS.payload_signer = 'openssl' 1151 common.OPTIONS.payload_signer_args = [ 1152 'pkeyutl', '-sign', '-keyform', 'DER', '-inkey', 1153 os.path.join(self.testdata_dir, 'testkey.pk8'), 1154 '-pkeyopt', 'digest:sha256'] 1155 payload_signer = PayloadSigner() 1156 input_file = os.path.join(self.testdata_dir, self.SIGFILE) 1157 signed_file = payload_signer.Sign(input_file) 1158 1159 verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE) 1160 self._assertFilesEqual(verify_file, signed_file) 1161 1162 def test_Sign_withExternalSigner_script(self): 1163 """Uses testdata/payload_signer.sh as the external payload signer.""" 1164 common.OPTIONS.payload_signer = os.path.join( 1165 self.testdata_dir, 'payload_signer.sh') 1166 common.OPTIONS.payload_signer_args = [ 1167 os.path.join(self.testdata_dir, 'testkey.pk8')] 1168 payload_signer = PayloadSigner() 1169 input_file = os.path.join(self.testdata_dir, self.SIGFILE) 1170 signed_file = payload_signer.Sign(input_file) 1171 1172 verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE) 1173 self._assertFilesEqual(verify_file, signed_file) 1174 1175 1176class PayloadTest(unittest.TestCase): 1177 1178 def setUp(self): 1179 self.testdata_dir = test_utils.get_testdata_dir() 1180 self.assertTrue(os.path.exists(self.testdata_dir)) 1181 1182 common.OPTIONS.wipe_user_data = False 1183 common.OPTIONS.payload_signer = None 1184 common.OPTIONS.payload_signer_args = None 1185 common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey') 1186 
common.OPTIONS.key_passwords = { 1187 common.OPTIONS.package_key : None, 1188 } 1189 1190 def tearDown(self): 1191 common.Cleanup() 1192 1193 @staticmethod 1194 def _create_payload_full(secondary=False): 1195 target_file = construct_target_files(secondary) 1196 payload = Payload(secondary) 1197 payload.Generate(target_file) 1198 return payload 1199 1200 @staticmethod 1201 def _create_payload_incremental(): 1202 target_file = construct_target_files() 1203 source_file = construct_target_files() 1204 payload = Payload() 1205 payload.Generate(target_file, source_file) 1206 return payload 1207 1208 def test_Generate_full(self): 1209 payload = self._create_payload_full() 1210 self.assertTrue(os.path.exists(payload.payload_file)) 1211 1212 def test_Generate_incremental(self): 1213 payload = self._create_payload_incremental() 1214 self.assertTrue(os.path.exists(payload.payload_file)) 1215 1216 def test_Generate_additionalArgs(self): 1217 target_file = construct_target_files() 1218 source_file = construct_target_files() 1219 payload = Payload() 1220 # This should work the same as calling payload.Generate(target_file, 1221 # source_file). 
1222 payload.Generate( 1223 target_file, additional_args=["--source_image", source_file]) 1224 self.assertTrue(os.path.exists(payload.payload_file)) 1225 1226 def test_Generate_invalidInput(self): 1227 target_file = construct_target_files() 1228 common.ZipDelete(target_file, 'IMAGES/vendor.img') 1229 payload = Payload() 1230 self.assertRaises(AssertionError, payload.Generate, target_file) 1231 1232 def test_Sign_full(self): 1233 payload = self._create_payload_full() 1234 payload.Sign(PayloadSigner()) 1235 1236 output_file = common.MakeTempFile(suffix='.zip') 1237 with zipfile.ZipFile(output_file, 'w') as output_zip: 1238 payload.WriteToZip(output_zip) 1239 1240 import check_ota_package_signature 1241 check_ota_package_signature.VerifyAbOtaPayload( 1242 os.path.join(self.testdata_dir, 'testkey.x509.pem'), 1243 output_file) 1244 1245 def test_Sign_incremental(self): 1246 payload = self._create_payload_incremental() 1247 payload.Sign(PayloadSigner()) 1248 1249 output_file = common.MakeTempFile(suffix='.zip') 1250 with zipfile.ZipFile(output_file, 'w') as output_zip: 1251 payload.WriteToZip(output_zip) 1252 1253 import check_ota_package_signature 1254 check_ota_package_signature.VerifyAbOtaPayload( 1255 os.path.join(self.testdata_dir, 'testkey.x509.pem'), 1256 output_file) 1257 1258 def test_Sign_withDataWipe(self): 1259 common.OPTIONS.wipe_user_data = True 1260 payload = self._create_payload_full() 1261 payload.Sign(PayloadSigner()) 1262 1263 with open(payload.payload_properties) as properties_fp: 1264 self.assertIn("POWERWASH=1", properties_fp.read()) 1265 1266 def test_Sign_secondary(self): 1267 payload = self._create_payload_full(secondary=True) 1268 payload.Sign(PayloadSigner()) 1269 1270 with open(payload.payload_properties) as properties_fp: 1271 self.assertIn("SWITCH_SLOT_ON_REBOOT=0", properties_fp.read()) 1272 1273 def test_Sign_badSigner(self): 1274 """Tests that signing failure can be captured.""" 1275 payload = self._create_payload_full() 1276 
payload_signer = PayloadSigner() 1277 payload_signer.signer_args.append('bad-option') 1278 self.assertRaises(AssertionError, payload.Sign, payload_signer) 1279 1280 def test_WriteToZip(self): 1281 payload = self._create_payload_full() 1282 payload.Sign(PayloadSigner()) 1283 1284 output_file = common.MakeTempFile(suffix='.zip') 1285 with zipfile.ZipFile(output_file, 'w') as output_zip: 1286 payload.WriteToZip(output_zip) 1287 1288 with zipfile.ZipFile(output_file) as verify_zip: 1289 # First make sure we have the essential entries. 1290 namelist = verify_zip.namelist() 1291 self.assertIn(Payload.PAYLOAD_BIN, namelist) 1292 self.assertIn(Payload.PAYLOAD_PROPERTIES_TXT, namelist) 1293 1294 # Then assert these entries are stored. 1295 for entry_info in verify_zip.infolist(): 1296 if entry_info.filename not in (Payload.PAYLOAD_BIN, 1297 Payload.PAYLOAD_PROPERTIES_TXT): 1298 continue 1299 self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type) 1300 1301 def test_WriteToZip_unsignedPayload(self): 1302 """Unsigned payloads should not be allowed to be written to zip.""" 1303 payload = self._create_payload_full() 1304 1305 output_file = common.MakeTempFile(suffix='.zip') 1306 with zipfile.ZipFile(output_file, 'w') as output_zip: 1307 self.assertRaises(AssertionError, payload.WriteToZip, output_zip) 1308 1309 # Also test with incremental payload. 
1310 payload = self._create_payload_incremental() 1311 1312 output_file = common.MakeTempFile(suffix='.zip') 1313 with zipfile.ZipFile(output_file, 'w') as output_zip: 1314 self.assertRaises(AssertionError, payload.WriteToZip, output_zip) 1315 1316 def test_WriteToZip_secondary(self): 1317 payload = self._create_payload_full(secondary=True) 1318 payload.Sign(PayloadSigner()) 1319 1320 output_file = common.MakeTempFile(suffix='.zip') 1321 with zipfile.ZipFile(output_file, 'w') as output_zip: 1322 payload.WriteToZip(output_zip) 1323 1324 with zipfile.ZipFile(output_file) as verify_zip: 1325 # First make sure we have the essential entries. 1326 namelist = verify_zip.namelist() 1327 self.assertIn(Payload.SECONDARY_PAYLOAD_BIN, namelist) 1328 self.assertIn(Payload.SECONDARY_PAYLOAD_PROPERTIES_TXT, namelist) 1329 1330 # Then assert these entries are stored. 1331 for entry_info in verify_zip.infolist(): 1332 if entry_info.filename not in ( 1333 Payload.SECONDARY_PAYLOAD_BIN, 1334 Payload.SECONDARY_PAYLOAD_PROPERTIES_TXT): 1335 continue 1336 self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type) 1337