1#
2# Copyright (C) 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import copy
18import os
19import os.path
20import subprocess
21import unittest
22import zipfile
23
24import common
25import test_utils
26from ota_from_target_files import (
27    _LoadOemDicts, AbOtaPropertyFiles, BuildInfo, FinalizeMetadata,
28    GetPackageMetadata, GetTargetFilesZipForSecondaryImages,
29    GetTargetFilesZipWithoutPostinstallConfig, NonAbOtaPropertyFiles,
30    Payload, PayloadSigner, POSTINSTALL_CONFIG, PropertyFiles,
31    StreamingPropertyFiles, WriteFingerprintAssertion)
32
33
def construct_target_files(secondary=False):
  """Builds a minimal target-files.zip for the OTA generation tests.

  The archive holds the update_engine and postinstall configs, the A/B
  partition list, and one image of random bytes per partition.

  Args:
    secondary: If True, IMAGES/system_other.img is written as well.

  Returns:
    The path of the temporary zip file that was written.
  """
  partitions = ('boot', 'system', 'vendor')
  postinstall_lines = (
      "RUN_POSTINSTALL_system=true",
      "POSTINSTALL_PATH_system=system/bin/otapreopt_script",
      "FILESYSTEM_TYPE_system=ext4",
      "POSTINSTALL_OPTIONAL_system=true",
  )

  # The secondary image, when requested, goes after the regular A/B images.
  image_names = list(partitions)
  if secondary:
    image_names.append('system_other')

  zip_path = common.MakeTempFile(prefix='target_files-', suffix='.zip')
  with zipfile.ZipFile(zip_path, 'w') as output_zip:
    output_zip.writestr(
        'META/update_engine_config.txt',
        "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")
    output_zip.writestr(POSTINSTALL_CONFIG, '\n'.join(postinstall_lines))
    output_zip.writestr('META/ab_partitions.txt', '\n'.join(partitions))

    # Dummy images: random bytes, as many as the image name has characters.
    for image_name in image_names:
      output_zip.writestr('IMAGES/' + image_name + '.img',
                          os.urandom(len(image_name)))

  return zip_path
69
70
class MockScriptWriter(object):
  """A stand-in for edify_generator.EdifyGenerator.

  Each call is recorded in self.script as a tuple of the method name followed
  by the positional arguments it received, so that tests can assert on the
  calls made to the writer.
  """

  def __init__(self):
    self.script = []

  def _record(self, call_name, call_args):
    """Appends (call_name, *call_args) to the recorded script."""
    self.script.append((call_name,) + call_args)

  def Mount(self, *args):
    self._record('Mount', args)

  def AssertDevice(self, *args):
    self._record('AssertDevice', args)

  def AssertOemProperty(self, *args):
    self._record('AssertOemProperty', args)

  def AssertFingerprintOrThumbprint(self, *args):
    self._record('AssertFingerprintOrThumbprint', args)

  def AssertSomeFingerprint(self, *args):
    self._record('AssertSomeFingerprint', args)

  def AssertSomeThumbprint(self, *args):
    self._record('AssertSomeThumbprint', args)
98
99
class BuildInfoTest(unittest.TestCase):
  """Tests for BuildInfo and WriteFingerprintAssertion.

  Two fixtures are used throughout: TEST_INFO_DICT, which carries its own
  device and fingerprint properties, and TEST_INFO_DICT_USES_OEM_PROPS, which
  declares 'oem_fingerprint_properties' and is loaded with TEST_OEM_DICTS.
  """

  TEST_INFO_DICT = {
      'build.prop': {
          'ro.product.device': 'product-device',
          'ro.product.name': 'product-name',
          'ro.build.fingerprint': 'build-fingerprint',
          'ro.build.foo': 'build-foo',
      },
      'vendor.build.prop': {
          'ro.vendor.build.fingerprint': 'vendor-build-fingerprint',
      },
      'property1': 'value1',
      'property2': 4096,
  }

  TEST_INFO_DICT_USES_OEM_PROPS = {
      'build.prop': {
          'ro.product.name': 'product-name',
          'ro.build.thumbprint': 'build-thumbprint',
          'ro.build.bar': 'build-bar',
      },
      'vendor.build.prop': {
          'ro.vendor.build.fingerprint': 'vendor-build-fingerprint',
      },
      'property1': 'value1',
      'property2': 4096,
      'oem_fingerprint_properties': 'ro.product.device ro.product.brand',
  }

  TEST_OEM_DICTS = [
      {'ro.product.brand': 'brand1', 'ro.product.device': 'device1'},
      {'ro.product.brand': 'brand2', 'ro.product.device': 'device2'},
      {'ro.product.brand': 'brand3', 'ro.product.device': 'device3'},
  ]

  def _plain_info(self):
    """Returns a BuildInfo made from TEST_INFO_DICT, without OEM dicts."""
    return BuildInfo(self.TEST_INFO_DICT, None)

  def _oem_info(self):
    """Returns a BuildInfo made from the OEM-props fixture and OEM dicts."""
    return BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, self.TEST_OEM_DICTS)

  @staticmethod
  def _fingerprint_script(target_info, source_info):
    """Runs WriteFingerprintAssertion and returns the calls it recorded."""
    writer = MockScriptWriter()
    WriteFingerprintAssertion(writer, target_info, source_info)
    return writer.script

  def test_init(self):
    info = self._plain_info()
    self.assertEqual('product-device', info.device)
    self.assertEqual('build-fingerprint', info.fingerprint)
    self.assertFalse(info.is_ab)
    self.assertIsNone(info.oem_props)

  def test_init_with_oem_props(self):
    info = self._oem_info()
    self.assertEqual('device1', info.device)
    self.assertEqual(
        'brand1/product-name/device1:build-thumbprint', info.fingerprint)

    # Exchanging the first and last OEM dicts gives a different BuildInfo.
    reordered = list(self.TEST_OEM_DICTS)
    reordered[0], reordered[2] = reordered[2], reordered[0]
    info = BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, reordered)
    self.assertEqual('device3', info.device)
    self.assertEqual(
        'brand3/product-name/device3:build-thumbprint', info.fingerprint)

    # An info dict that uses OEM props is rejected when no oem_dict is given.
    with self.assertRaises(AssertionError):
      BuildInfo(self.TEST_INFO_DICT_USES_OEM_PROPS, None)

  def test___getitem__(self):
    info = self._plain_info()
    self.assertEqual('value1', info['property1'])
    self.assertEqual(4096, info['property2'])
    self.assertEqual('build-foo', info['build.prop']['ro.build.foo'])

  def test___getitem__with_oem_props(self):
    info = self._oem_info()
    self.assertEqual('value1', info['property1'])
    self.assertEqual(4096, info['property2'])
    # 'ro.build.foo' only exists in the plain fixture.
    with self.assertRaises(KeyError):
      _ = info['build.prop']['ro.build.foo']

  def test_get(self):
    info = self._plain_info()
    self.assertEqual('value1', info.get('property1'))
    self.assertEqual(4096, info.get('property2'))
    self.assertEqual(4096, info.get('property2', 1024))
    self.assertEqual(1024, info.get('property-nonexistent', 1024))
    self.assertEqual('build-foo', info.get('build.prop')['ro.build.foo'])

  def test_get_with_oem_props(self):
    info = self._oem_info()
    self.assertEqual('value1', info.get('property1'))
    self.assertEqual(4096, info.get('property2'))
    self.assertEqual(4096, info.get('property2', 1024))
    self.assertEqual(1024, info.get('property-nonexistent', 1024))
    self.assertIsNone(info.get('build.prop').get('ro.build.foo'))
    with self.assertRaises(KeyError):
      _ = info.get('build.prop')['ro.build.foo']

  def test_GetBuildProp(self):
    info = self._plain_info()
    self.assertEqual('build-foo', info.GetBuildProp('ro.build.foo'))
    with self.assertRaises(common.ExternalError):
      info.GetBuildProp('ro.build.nonexistent')

  def test_GetBuildProp_with_oem_props(self):
    info = self._oem_info()
    self.assertEqual('build-bar', info.GetBuildProp('ro.build.bar'))
    with self.assertRaises(common.ExternalError):
      info.GetBuildProp('ro.build.nonexistent')

  def test_GetVendorBuildProp(self):
    info = self._plain_info()
    vendor_fingerprint = info.GetVendorBuildProp('ro.vendor.build.fingerprint')
    self.assertEqual('vendor-build-fingerprint', vendor_fingerprint)
    with self.assertRaises(common.ExternalError):
      info.GetVendorBuildProp('ro.build.nonexistent')

  def test_GetVendorBuildProp_with_oem_props(self):
    info = self._oem_info()
    vendor_fingerprint = info.GetVendorBuildProp('ro.vendor.build.fingerprint')
    self.assertEqual('vendor-build-fingerprint', vendor_fingerprint)
    with self.assertRaises(common.ExternalError):
      info.GetVendorBuildProp('ro.build.nonexistent')

  def test_WriteMountOemScript(self):
    writer = MockScriptWriter()
    self._oem_info().WriteMountOemScript(writer)
    self.assertEqual([('Mount', '/oem', None)], writer.script)

  def test_WriteDeviceAssertions(self):
    writer = MockScriptWriter()
    self._plain_info().WriteDeviceAssertions(writer, False)
    self.assertEqual([('AssertDevice', 'product-device')], writer.script)

  def test_WriteDeviceAssertions_with_oem_props(self):
    writer = MockScriptWriter()
    self._oem_info().WriteDeviceAssertions(writer, False)
    # One assertion per OEM fingerprint property, listing every OEM value.
    expected_calls = [
        ('AssertOemProperty', 'ro.product.device',
         ['device1', 'device2', 'device3'], False),
        ('AssertOemProperty', 'ro.product.brand',
         ['brand1', 'brand2', 'brand3'], False),
    ]
    self.assertEqual(expected_calls, writer.script)

  def test_WriteFingerprintAssertion_without_oem_props(self):
    # The source differs from the target only in its build fingerprint.
    source_dict = copy.deepcopy(self.TEST_INFO_DICT)
    source_dict['build.prop']['ro.build.fingerprint'] = (
        'source-build-fingerprint')

    script = self._fingerprint_script(
        self._plain_info(), BuildInfo(source_dict, None))
    self.assertEqual(
        [('AssertSomeFingerprint', 'source-build-fingerprint',
          'build-fingerprint')],
        script)

  def test_WriteFingerprintAssertion_with_source_oem_props(self):
    script = self._fingerprint_script(self._plain_info(), self._oem_info())
    self.assertEqual(
        [('AssertFingerprintOrThumbprint', 'build-fingerprint',
          'build-thumbprint')],
        script)

  def test_WriteFingerprintAssertion_with_target_oem_props(self):
    script = self._fingerprint_script(self._oem_info(), self._plain_info())
    self.assertEqual(
        [('AssertFingerprintOrThumbprint', 'build-fingerprint',
          'build-thumbprint')],
        script)

  def test_WriteFingerprintAssertion_with_both_oem_props(self):
    # The source differs from the target only in its build thumbprint.
    source_dict = copy.deepcopy(self.TEST_INFO_DICT_USES_OEM_PROPS)
    source_dict['build.prop']['ro.build.thumbprint'] = (
        'source-build-thumbprint')

    script = self._fingerprint_script(
        self._oem_info(), BuildInfo(source_dict, self.TEST_OEM_DICTS))
    self.assertEqual(
        [('AssertSomeThumbprint', 'build-thumbprint',
          'source-build-thumbprint')],
        script)
313
314
class LoadOemDictsTest(unittest.TestCase):
  """Tests for _LoadOemDicts, fed with temporary key=value files."""

  def tearDown(self):
    common.Cleanup()

  @staticmethod
  def _write_dict_file(content):
    """Writes content to a new temporary file and returns the file's path."""
    path = common.MakeTempFile()
    with open(path, 'w') as out_fp:
      out_fp.write(content)
    return path

  def test_NoneDict(self):
    self.assertIsNone(_LoadOemDicts(None))

  def test_SingleDict(self):
    source = self._write_dict_file('abc=1\ndef=2\nxyz=foo\na.b.c=bar\n')

    loaded = _LoadOemDicts([source])
    self.assertEqual(1, len(loaded))
    only_dict = loaded[0]
    self.assertEqual('foo', only_dict['xyz'])
    self.assertEqual('bar', only_dict['a.b.c'])

  def test_MultipleDicts(self):
    # Three files that differ only in the value of ro.build.index.
    template = 'ro.build.index={}\ndef=2\nxyz=foo\na.b.c=bar\n'
    sources = [
        self._write_dict_file(template.format(index)) for index in range(3)]

    loaded = _LoadOemDicts(sources)
    self.assertEqual(3, len(loaded))
    for index, loaded_dict in enumerate(loaded):
      self.assertEqual('2', loaded_dict['def'])
      self.assertEqual('foo', loaded_dict['xyz'])
      self.assertEqual('bar', loaded_dict['a.b.c'])
      self.assertEqual(str(index), loaded_dict['ro.build.index'])
349
350
class OtaFromTargetFilesTest(unittest.TestCase):
  """Tests for the package metadata and target-files helpers.

  Covers GetPackageMetadata, GetTargetFilesZipForSecondaryImages,
  GetTargetFilesZipWithoutPostinstallConfig and FinalizeMetadata.
  """

  TEST_TARGET_INFO_DICT = {
      'build.prop': {
          'ro.product.device': 'product-device',
          'ro.build.fingerprint': 'build-fingerprint-target',
          'ro.build.version.incremental': 'build-version-incremental-target',
          'ro.build.version.sdk': '27',
          'ro.build.version.security_patch': '2017-12-01',
          'ro.build.date.utc': '1500000000',
      },
  }

  TEST_SOURCE_INFO_DICT = {
      'build.prop': {
          'ro.product.device': 'product-device',
          'ro.build.fingerprint': 'build-fingerprint-source',
          'ro.build.version.incremental': 'build-version-incremental-source',
          'ro.build.version.sdk': '25',
          'ro.build.version.security_patch': '2016-12-01',
          'ro.build.date.utc': '1400000000',
      },
  }

  def setUp(self):
    self.testdata_dir = test_utils.get_testdata_dir()
    self.assertTrue(os.path.exists(self.testdata_dir))

    # Reset the global options as in ota_from_target_files.py.
    options = common.OPTIONS
    options.incremental_source = None
    options.downgrade = False
    options.timestamp = False
    options.wipe_user_data = False
    options.no_signing = False
    options.package_key = os.path.join(self.testdata_dir, 'testkey')
    options.key_passwords = {options.package_key: None}

    options.search_path = test_utils.get_search_path()
    self.assertIsNotNone(options.search_path)

  def tearDown(self):
    common.Cleanup()

  @staticmethod
  def _expected_metadata(ota_type, extra=None):
    """Returns the metadata expected for a package built from the fixtures.

    Args:
      ota_type: The expected value of the 'ota-type' entry.
      extra: Optional dict of entries to add to, or override in, the entries
          that are derived from TEST_TARGET_INFO_DICT.
    """
    expected = {
        'ota-type': ota_type,
        'post-build': 'build-fingerprint-target',
        'post-build-incremental': 'build-version-incremental-target',
        'post-sdk-level': '27',
        'post-security-patch-level': '2017-12-01',
        'post-timestamp': '1500000000',
        'pre-device': 'product-device',
    }
    if extra:
      expected.update(extra)
    return expected

  def test_GetPackageMetadata_abOta_full(self):
    ab_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    ab_info_dict['ab_update'] = 'true'
    metadata = GetPackageMetadata(BuildInfo(ab_info_dict, None))
    self.assertDictEqual(
        self._expected_metadata('AB', {'ota-required-cache': '0'}),
        metadata)

  def test_GetPackageMetadata_abOta_incremental(self):
    ab_info_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    ab_info_dict['ab_update'] = 'true'
    target = BuildInfo(ab_info_dict, None)
    source = BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
    common.OPTIONS.incremental_source = ''
    metadata = GetPackageMetadata(target, source)
    self.assertDictEqual(
        self._expected_metadata('AB', {
            'ota-required-cache': '0',
            'pre-build': 'build-fingerprint-source',
            'pre-build-incremental': 'build-version-incremental-source',
        }),
        metadata)

  def test_GetPackageMetadata_nonAbOta_full(self):
    target = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    metadata = GetPackageMetadata(target)
    self.assertDictEqual(self._expected_metadata('BLOCK'), metadata)

  def test_GetPackageMetadata_nonAbOta_incremental(self):
    target = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    source = BuildInfo(self.TEST_SOURCE_INFO_DICT, None)
    common.OPTIONS.incremental_source = ''
    metadata = GetPackageMetadata(target, source)
    self.assertDictEqual(
        self._expected_metadata('BLOCK', {
            'pre-build': 'build-fingerprint-source',
            'pre-build-incremental': 'build-version-incremental-source',
        }),
        metadata)

  def test_GetPackageMetadata_wipe(self):
    target = BuildInfo(self.TEST_TARGET_INFO_DICT, None)
    common.OPTIONS.wipe_user_data = True
    metadata = GetPackageMetadata(target)
    self.assertDictEqual(
        self._expected_metadata('BLOCK', {'ota-wipe': 'yes'}),
        metadata)

  @staticmethod
  def _test_GetPackageMetadata_swapBuildTimestamps(target_info, source_info):
    """Exchanges 'ro.build.date.utc' between the two info dicts, in place."""
    target_props = target_info['build.prop']
    source_props = source_info['build.prop']
    target_timestamp = target_props['ro.build.date.utc']
    target_props['ro.build.date.utc'] = source_props['ro.build.date.utc']
    source_props['ro.build.date.utc'] = target_timestamp

  def test_GetPackageMetadata_unintentionalDowngradeDetected(self):
    target_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    source_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
    self._test_GetPackageMetadata_swapBuildTimestamps(target_dict, source_dict)

    target = BuildInfo(target_dict, None)
    source = BuildInfo(source_dict, None)
    common.OPTIONS.incremental_source = ''
    # OPTIONS.downgrade is left at False, so the older target must be refused.
    with self.assertRaises(RuntimeError):
      GetPackageMetadata(target, source)

  def test_GetPackageMetadata_downgrade(self):
    target_dict = copy.deepcopy(self.TEST_TARGET_INFO_DICT)
    source_dict = copy.deepcopy(self.TEST_SOURCE_INFO_DICT)
    self._test_GetPackageMetadata_swapBuildTimestamps(target_dict, source_dict)

    target = BuildInfo(target_dict, None)
    source = BuildInfo(source_dict, None)
    common.OPTIONS.incremental_source = ''
    common.OPTIONS.downgrade = True
    common.OPTIONS.wipe_user_data = True
    metadata = GetPackageMetadata(target, source)
    self.assertDictEqual(
        self._expected_metadata('BLOCK', {
            'ota-downgrade': 'yes',
            'ota-wipe': 'yes',
            # The target carries the source's timestamp after the swap.
            'post-timestamp': '1400000000',
            'pre-build': 'build-fingerprint-source',
            'pre-build-incremental': 'build-version-incremental-source',
        }),
        metadata)

  def test_GetTargetFilesZipForSecondaryImages(self):
    source_zip = construct_target_files(secondary=True)
    secondary_zip = GetTargetFilesZipForSecondaryImages(source_zip)

    with zipfile.ZipFile(secondary_zip) as verify_zip:
      names = verify_zip.namelist()

    expected_present = (
        'META/ab_partitions.txt',
        'IMAGES/boot.img',
        'IMAGES/system.img',
        'IMAGES/vendor.img',
        POSTINSTALL_CONFIG,
    )
    for name in expected_present:
      self.assertIn(name, names)

    for name in ('IMAGES/system_other.img', 'IMAGES/system.map'):
      self.assertNotIn(name, names)

  def test_GetTargetFilesZipForSecondaryImages_skipPostinstall(self):
    source_zip = construct_target_files(secondary=True)
    secondary_zip = GetTargetFilesZipForSecondaryImages(
        source_zip, skip_postinstall=True)

    with zipfile.ZipFile(secondary_zip) as verify_zip:
      names = verify_zip.namelist()

    expected_present = (
        'META/ab_partitions.txt',
        'IMAGES/boot.img',
        'IMAGES/system.img',
        'IMAGES/vendor.img',
    )
    for name in expected_present:
      self.assertIn(name, names)

    expected_absent = (
        'IMAGES/system_other.img',
        'IMAGES/system.map',
        POSTINSTALL_CONFIG,
    )
    for name in expected_absent:
      self.assertNotIn(name, names)

  def test_GetTargetFilesZipWithoutPostinstallConfig(self):
    source_zip = construct_target_files()
    stripped_zip = GetTargetFilesZipWithoutPostinstallConfig(source_zip)
    with zipfile.ZipFile(stripped_zip) as verify_zip:
      names = verify_zip.namelist()
    self.assertNotIn(POSTINSTALL_CONFIG, names)

  def test_GetTargetFilesZipWithoutPostinstallConfig_missingEntry(self):
    source_zip = construct_target_files()
    # Remove the entry up front; the call must cope with it being absent.
    common.ZipDelete(source_zip, POSTINSTALL_CONFIG)
    stripped_zip = GetTargetFilesZipWithoutPostinstallConfig(source_zip)
    with zipfile.ZipFile(stripped_zip) as verify_zip:
      names = verify_zip.namelist()
    self.assertNotIn(POSTINSTALL_CONFIG, names)

  def _test_FinalizeMetadata(self, large_entry=False):
    """Runs FinalizeMetadata on a small package and checks the metadata.

    Args:
      large_entry: If True, a 1 GiB stored entry is appended to the package
          before it is finalized.
    """
    package = PropertyFilesTest.construct_zip_package([
        'required-entry1',
        'required-entry2',
    ])
    if large_entry:
      with zipfile.ZipFile(package, 'a') as package_zip:
        # Using 'zoo' so that the entry stays behind others after signing.
        package_zip.writestr(
            'zoo', 'A' * 1024 * 1024 * 1024, zipfile.ZIP_STORED)

    metadata = {}
    output_file = common.MakeTempFile(suffix='.zip')
    property_files = (TestPropertyFiles(),)
    FinalizeMetadata(metadata, package, output_file, property_files)
    self.assertIn('ota-test-property-files', metadata)

  def test_FinalizeMetadata(self):
    self._test_FinalizeMetadata()

  def test_FinalizeMetadata_withNoSigning(self):
    common.OPTIONS.no_signing = True
    self._test_FinalizeMetadata()

  def test_FinalizeMetadata_largeEntry(self):
    self._test_FinalizeMetadata(large_entry=True)

  def test_FinalizeMetadata_largeEntry_withNoSigning(self):
    common.OPTIONS.no_signing = True
    self._test_FinalizeMetadata(large_entry=True)

  def test_FinalizeMetadata_insufficientSpace(self):
    package = PropertyFilesTest.construct_zip_package([
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
    ])
    with zipfile.ZipFile(package, 'a') as package_zip:
      # 'foo-entry1' will appear ahead of all other entries (in alphabetical
      # order) after the signing, which will in turn trigger the
      # InsufficientSpaceException and an automatic retry.
      package_zip.writestr('foo-entry1', 'A' * 1024 * 1024, zipfile.ZIP_STORED)

    metadata = {}
    property_files = (TestPropertyFiles(),)
    output_file = common.MakeTempFile(suffix='.zip')
    FinalizeMetadata(metadata, package, output_file, property_files)
    self.assertIn('ota-test-property-files', metadata)
641
642
class TestPropertyFiles(PropertyFiles):
  """A PropertyFiles subclass with fixed entry names, for use in tests.

  It is named 'ota-test-property-files' and lists two required and two
  optional entries.
  """

  def __init__(self):
    super(TestPropertyFiles, self).__init__()
    # Set after the base initializer so these values are the ones that stick.
    self.name = 'ota-test-property-files'
    self.required = ('required-entry1', 'required-entry2')
    self.optional = ('optional-entry1', 'optional-entry2')
657
658
class PropertyFilesTest(unittest.TestCase):
  """Tests for PropertyFiles, exercised through TestPropertyFiles."""

  def setUp(self):
    common.OPTIONS.no_signing = False

  def tearDown(self):
    common.Cleanup()

  @staticmethod
  def construct_zip_package(entries):
    """Writes a temporary zip with one stored entry per name in entries.

    The content of each entry is its own name, upper-cased and with every '.'
    replaced by '-'.

    Returns:
      The path of the zip file.
    """
    package_path = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package_path, 'w') as package_zip:
      for name in entries:
        content = name.replace('.', '-').upper()
        package_zip.writestr(name, content, zipfile.ZIP_STORED)
    return package_path

  @staticmethod
  def _parse_property_files_string(data):
    """Maps each comma-separated 'name:info' token in data to name -> info."""
    return dict(token.split(':', 1) for token in data.split(','))

  def _verify_entries(self, input_file, tokens, entries):
    """Checks that the bytes located by each token match the entry content.

    Args:
      input_file: Path of the zip file to read from.
      tokens: Dict of entry name -> 'offset:size'.
      entries: Names of the entries to check.
    """
    for name in entries:
      offset, size = (int(field) for field in tokens[name].split(':'))
      if name == 'metadata':
        expected = b'META-INF/COM/ANDROID/METADATA'
      else:
        expected = name.replace('.', '-').upper().encode()
      with open(input_file, 'rb') as package_fp:
        package_fp.seek(offset)
        actual = package_fp.read(size)
      self.assertEqual(expected, actual)

  def test_Compute(self):
    names = ('required-entry1', 'required-entry2')
    package = self.construct_zip_package(names)
    with zipfile.ZipFile(package, 'r') as package_zip:
      computed = TestPropertyFiles().Compute(package_zip)

    tokens = self._parse_property_files_string(computed)
    self.assertEqual(3, len(tokens))
    self._verify_entries(package, tokens, names)

  def test_Compute_withOptionalEntries(self):
    names = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
    )
    package = self.construct_zip_package(names)
    with zipfile.ZipFile(package, 'r') as package_zip:
      computed = TestPropertyFiles().Compute(package_zip)

    tokens = self._parse_property_files_string(computed)
    self.assertEqual(5, len(tokens))
    self._verify_entries(package, tokens, names)

  def test_Compute_missingRequiredEntry(self):
    # 'required-entry1' is left out of the package on purpose.
    package = self.construct_zip_package(('required-entry2',))
    property_files = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r') as package_zip:
      with self.assertRaises(KeyError):
        property_files.Compute(package_zip)

  def test_Finalize(self):
    names = [
        'required-entry1',
        'required-entry2',
        'META-INF/com/android/metadata',
    ]
    package = self.construct_zip_package(names)
    property_files = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r') as package_zip:
      # pylint: disable=protected-access
      raw = property_files._GetPropertyFilesString(
          package_zip, reserve_space=False)
      finalized = property_files.Finalize(package_zip, len(raw))

    tokens = self._parse_property_files_string(finalized)
    self.assertEqual(3, len(tokens))
    # 'META-INF/com/android/metadata' will be key'd as 'metadata' in the
    # streaming metadata.
    self._verify_entries(package, tokens, names[:2] + ['metadata'])

  def test_Finalize_assertReservedLength(self):
    names = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
        'META-INF/com/android/metadata',
    )
    package = self.construct_zip_package(names)
    property_files = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r') as package_zip:
      # The unpadded string tells us the minimum length that must be reserved.
      # pylint: disable=protected-access
      raw_length = len(property_files._GetPropertyFilesString(
          package_zip, reserve_space=False))

      # Reserving exactly that length works and adds no padding.
      exact = property_files.Finalize(package_zip, raw_length)
      self.assertEqual(raw_length, len(exact))

      # Reserving one character less is rejected.
      with self.assertRaises(PropertyFiles.InsufficientSpaceException):
        property_files.Finalize(package_zip, raw_length - 1)

      # Reserving more pads the string with spaces up to the given length.
      padded = property_files.Finalize(package_zip, raw_length + 20)
      self.assertEqual(raw_length + 20, len(padded))
      self.assertEqual(' ' * 20, padded[raw_length:])

  def test_Verify(self):
    names = (
        'required-entry1',
        'required-entry2',
        'optional-entry1',
        'optional-entry2',
        'META-INF/com/android/metadata',
    )
    package = self.construct_zip_package(names)
    property_files = TestPropertyFiles()
    with zipfile.ZipFile(package, 'r') as package_zip:
      # First get the raw metadata string (i.e. without padding space).
      # pylint: disable=protected-access
      raw = property_files._GetPropertyFilesString(
          package_zip, reserve_space=False)

      # Verification of the untouched string must not raise.
      property_files.Verify(package_zip, raw)

      # A string that no longer matches must be rejected.
      with self.assertRaises(AssertionError):
        property_files.Verify(package_zip, raw + 'x')
814
815
class StreamingPropertyFilesTest(PropertyFilesTest):
  """Sanity checks that are specific to StreamingPropertyFiles."""

  def test_init(self):
    """Checks the name and the required / optional entry tuples."""
    streaming = StreamingPropertyFiles()
    self.assertEqual('ota-streaming-property-files', streaming.name)

    required = ('payload.bin', 'payload_properties.txt')
    self.assertEqual(required, streaming.required)

    optional = ('care_map.txt', 'compatibility.zip')
    self.assertEqual(optional, streaming.optional)

  def test_Compute(self):
    """Compute() on a package without the metadata entry."""
    names = (
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
    )
    package = self.construct_zip_package(names)
    streaming = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r') as input_zip:
      computed = streaming.Compute(input_zip)

    parsed = self._parse_property_files_string(computed)
    # Four entries are given but five tokens are expected; the extra one is
    # presumably the placeholder for the metadata entry.
    self.assertEqual(5, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Finalize(self):
    """Finalize() on a package that carries the metadata entry."""
    names = [
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
        'META-INF/com/android/metadata',
    ]
    package = self.construct_zip_package(names)
    streaming = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r') as input_zip:
      # pylint: disable=protected-access
      unpadded = streaming._GetPropertyFilesString(
          input_zip, reserve_space=False)
      finalized = streaming.Finalize(input_zip, len(unpadded))

    parsed = self._parse_property_files_string(finalized)
    self.assertEqual(5, len(parsed))

    # The streaming metadata keys 'META-INF/com/android/metadata' simply as
    # 'metadata', so swap the last name before verifying.
    expected_names = names[:-1] + ['metadata']
    self._verify_entries(package, parsed, expected_names)

  def test_Verify(self):
    """Verify() accepts the matching string and rejects an altered one."""
    package = self.construct_zip_package((
        'payload.bin',
        'payload_properties.txt',
        'care_map.txt',
        'compatibility.zip',
        'META-INF/com/android/metadata',
    ))
    streaming = StreamingPropertyFiles()
    with zipfile.ZipFile(package, 'r') as input_zip:
      # Use the unpadded string as the reference value.
      # pylint: disable=protected-access
      expected = streaming._GetPropertyFilesString(
          input_zip, reserve_space=False)

      # A matching string must go through without raising.
      streaming.Verify(input_zip, expected)

      # Any deviation must trip an AssertionError.
      with self.assertRaises(AssertionError):
        streaming.Verify(input_zip, expected + 'x')
896
897
class AbOtaPropertyFilesTest(PropertyFilesTest):
  """Sanity checks that are specific to AbOtaPropertyFiles."""

  # Size used for both the payload signature and the metadata signature.
  SIGNATURE_SIZE = 256

  def setUp(self):
    """Locates the testdata dir and resets the signing-related OPTIONS."""
    testdata_dir = test_utils.get_testdata_dir()
    self.testdata_dir = testdata_dir
    self.assertTrue(os.path.exists(testdata_dir))

    package_key = os.path.join(testdata_dir, 'testkey')
    options = common.OPTIONS
    options.wipe_user_data = False
    options.payload_signer = None
    options.payload_signer_args = None
    options.package_key = package_key
    options.key_passwords = {package_key: None}

  def test_init(self):
    """Checks the name and the required / optional entry tuples."""
    ab_files = AbOtaPropertyFiles()
    self.assertEqual('ota-property-files', ab_files.name)

    required = ('payload.bin', 'payload_properties.txt')
    self.assertEqual(required, ab_files.required)

    optional = ('care_map.txt', 'compatibility.zip')
    self.assertEqual(optional, ab_files.optional)

  def test_GetPayloadMetadataOffsetAndSize(self):
    """Compares the located metadata signature with an independent one."""
    target_files = construct_target_files()
    payload = Payload()
    payload.Generate(target_files)

    signer = PayloadSigner()
    payload.Sign(signer)

    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w') as package_zip:
      payload.WriteToZip(package_zip)

    # Ask the class under test where the payload metadata lives.
    ab_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(package) as package_zip:
      # pylint: disable=protected-access
      offset, size = ab_files._GetPayloadMetadataOffsetAndSize(package_zip)

    # The last SIGNATURE_SIZE bytes of that region are taken to be the
    # metadata signature; read them straight from the zip file.
    signature_start = offset + size - self.SIGNATURE_SIZE
    with open(package, 'rb') as package_fp:
      package_fp.seek(signature_start)
      actual_signature = package_fp.read(self.SIGNATURE_SIZE)

    # Produce the metadata hash with the brillo_update_payload script, which
    # acts as the oracle for this test.
    payload_hash_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    metadata_hash_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
    hash_cmd = [
        'brillo_update_payload', 'hash',
        '--unsigned_payload', payload.payload_file,
        '--signature_size', str(self.SIGNATURE_SIZE),
        '--metadata_hash_file', metadata_hash_file,
        '--payload_hash_file', payload_hash_file,
    ]
    hash_proc = common.Run(
        hash_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output, _ = hash_proc.communicate()
    self.assertEqual(
        0, hash_proc.returncode,
        'Failed to run brillo_update_payload: {}'.format(output))

    # Sign the oracle hash with the same signer and compare the signatures.
    oracle_signature_file = signer.Sign(metadata_hash_file)
    with open(oracle_signature_file, 'rb') as oracle_fp:
      oracle_signature = oracle_fp.read()
    self.assertEqual(oracle_signature, actual_signature)

  @staticmethod
  def construct_zip_package_withValidPayload(with_metadata=False):
    """Returns a temp zip holding a generated, signed payload.

    construct_zip_package() is not used here because these tests need a
    "valid" payload.bin.

    Args:
      with_metadata: Whether to also add 'META-INF/com/android/metadata'.

    Returns:
      The path of the zip file that was written.
    """
    target_files = construct_target_files()
    payload = Payload()
    payload.Generate(target_files)

    signer = PayloadSigner()
    payload.Sign(signer)

    extra_names = ['care_map.txt', 'compatibility.zip']
    if with_metadata:
      extra_names.append('META-INF/com/android/metadata')

    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w') as package_zip:
      # Whatever the payload contributes goes in first.
      payload.WriteToZip(package_zip)

      # The remaining entries get dummy contents derived from their names.
      for name in extra_names:
        contents = name.replace('.', '-').upper()
        package_zip.writestr(name, contents, zipfile.ZIP_STORED)

    return package

  def test_Compute(self):
    """Compute() on a valid-payload package without the metadata entry."""
    package = self.construct_zip_package_withValidPayload()
    ab_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r') as input_zip:
      computed = ab_files.Compute(input_zip)

    parsed = self._parse_property_files_string(computed)
    # Six tokens are expected: the entries of the package, one for metadata
    # and one for payload-metadata.bin.
    self.assertEqual(6, len(parsed))
    self._verify_entries(
        package, parsed, ('care_map.txt', 'compatibility.zip'))

  def test_Finalize(self):
    """Finalize() on a valid-payload package with the metadata entry."""
    package = self.construct_zip_package_withValidPayload(with_metadata=True)
    ab_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r') as input_zip:
      # pylint: disable=protected-access
      unpadded = ab_files._GetPropertyFilesString(
          input_zip, reserve_space=False)
      finalized = ab_files.Finalize(input_zip, len(unpadded))

    parsed = self._parse_property_files_string(finalized)
    # Six tokens are expected: the entries of the package, one for metadata
    # and one for payload-metadata.bin.
    self.assertEqual(6, len(parsed))
    self._verify_entries(
        package, parsed, ('care_map.txt', 'compatibility.zip'))

  def test_Verify(self):
    """Verify() must accept the string computed from the same package."""
    package = self.construct_zip_package_withValidPayload(with_metadata=True)
    ab_files = AbOtaPropertyFiles()
    with zipfile.ZipFile(package, 'r') as input_zip:
      # pylint: disable=protected-access
      expected = ab_files._GetPropertyFilesString(
          input_zip, reserve_space=False)

      ab_files.Verify(input_zip, expected)
1043
1044
class NonAbOtaPropertyFilesTest(PropertyFilesTest):
  """Sanity checks that are specific to NonAbOtaPropertyFiles."""

  def test_init(self):
    """Checks the name and that no required / optional entries are set."""
    non_ab_files = NonAbOtaPropertyFiles()
    self.assertEqual('ota-property-files', non_ab_files.name)
    self.assertEqual((), non_ab_files.required)
    self.assertEqual((), non_ab_files.optional)

  def test_Compute(self):
    """Compute() on an empty package is expected to yield a single token."""
    names = ()
    package = self.construct_zip_package(names)
    non_ab_files = NonAbOtaPropertyFiles()
    with zipfile.ZipFile(package) as input_zip:
      computed = non_ab_files.Compute(input_zip)

    parsed = self._parse_property_files_string(computed)
    self.assertEqual(1, len(parsed))
    self._verify_entries(package, parsed, names)

  def test_Finalize(self):
    """Finalize() on a package that only carries the metadata entry."""
    names = ['META-INF/com/android/metadata']
    package = self.construct_zip_package(names)
    non_ab_files = NonAbOtaPropertyFiles()
    with zipfile.ZipFile(package) as input_zip:
      # pylint: disable=protected-access
      unpadded = non_ab_files._GetPropertyFilesString(
          input_zip, reserve_space=False)
      finalized = non_ab_files.Finalize(input_zip, len(unpadded))

    parsed = self._parse_property_files_string(finalized)
    self.assertEqual(1, len(parsed))

    # 'META-INF/com/android/metadata' shows up under the key 'metadata'.
    expected_names = names[:-1] + ['metadata']
    self._verify_entries(package, parsed, expected_names)

  def test_Verify(self):
    """Verify() must accept the string computed from the same package."""
    package = self.construct_zip_package((
        'META-INF/com/android/metadata',
    ))
    non_ab_files = NonAbOtaPropertyFiles()
    with zipfile.ZipFile(package) as input_zip:
      # pylint: disable=protected-access
      expected = non_ab_files._GetPropertyFilesString(
          input_zip, reserve_space=False)

      non_ab_files.Verify(input_zip, expected)
1095
1096
class PayloadSignerTest(unittest.TestCase):
  """Tests for PayloadSigner construction and its Sign() output."""

  # Input file and the expected signed result, both under the testdata dir.
  SIGFILE = 'sigfile.bin'
  SIGNED_SIGFILE = 'signed-sigfile.bin'

  def setUp(self):
    """Locates the testdata dir and resets the signing-related OPTIONS."""
    testdata_dir = test_utils.get_testdata_dir()
    self.testdata_dir = testdata_dir
    self.assertTrue(os.path.exists(testdata_dir))

    package_key = os.path.join(testdata_dir, 'testkey')
    options = common.OPTIONS
    options.payload_signer = None
    options.payload_signer_args = []
    options.package_key = package_key
    options.key_passwords = {package_key: None}

  def tearDown(self):
    common.Cleanup()

  def _assertFilesEqual(self, file1, file2):
    """Asserts that the two files have byte-identical contents."""
    with open(file1, 'rb') as first_fp:
      with open(file2, 'rb') as second_fp:
        first_data = first_fp.read()
        second_data = second_fp.read()
        self.assertEqual(first_data, second_data)

  def _assertSignMatchesReference(self):
    """Signs SIGFILE with a fresh PayloadSigner and checks the result.

    The signer is built from whatever common.OPTIONS holds at call time, so
    callers adjust the options first.
    """
    signer = PayloadSigner()
    unsigned_path = os.path.join(self.testdata_dir, self.SIGFILE)
    signed_path = signer.Sign(unsigned_path)

    reference_path = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
    self._assertFilesEqual(reference_path, signed_path)

  def test_init(self):
    """Without an external signer configured, 'openssl' is used."""
    self.assertEqual('openssl', PayloadSigner().signer)

  def test_init_withPassword(self):
    """A password-protected package key still gives the 'openssl' signer."""
    protected_key = os.path.join(self.testdata_dir, 'testkey_with_passwd')
    common.OPTIONS.package_key = protected_key
    common.OPTIONS.key_passwords = {protected_key: 'foo'}

    self.assertEqual('openssl', PayloadSigner().signer)

  def test_init_withExternalSigner(self):
    """The configured external signer and its args are picked up as is."""
    common.OPTIONS.payload_signer = 'abc'
    common.OPTIONS.payload_signer_args = ['arg1', 'arg2']

    signer = PayloadSigner()
    self.assertEqual('abc', signer.signer)
    self.assertEqual(['arg1', 'arg2'], signer.signer_args)

  def test_Sign(self):
    """Signs with the default signer and compares against the reference."""
    self._assertSignMatchesReference()

  def test_Sign_withExternalSigner_openssl(self):
    """Uses openssl as the external payload signer."""
    private_key = os.path.join(self.testdata_dir, 'testkey.pk8')
    common.OPTIONS.payload_signer = 'openssl'
    common.OPTIONS.payload_signer_args = [
        'pkeyutl', '-sign', '-keyform', 'DER', '-inkey', private_key,
        '-pkeyopt', 'digest:sha256',
    ]
    self._assertSignMatchesReference()

  def test_Sign_withExternalSigner_script(self):
    """Uses testdata/payload_signer.sh as the external payload signer."""
    signer_script = os.path.join(self.testdata_dir, 'payload_signer.sh')
    private_key = os.path.join(self.testdata_dir, 'testkey.pk8')
    common.OPTIONS.payload_signer = signer_script
    common.OPTIONS.payload_signer_args = [private_key]
    self._assertSignMatchesReference()
1174
1175
class PayloadTest(unittest.TestCase):
  """Tests for generating, signing and zipping a Payload."""

  def setUp(self):
    """Locates the testdata dir and resets the signing-related OPTIONS."""
    testdata_dir = test_utils.get_testdata_dir()
    self.testdata_dir = testdata_dir
    self.assertTrue(os.path.exists(testdata_dir))

    package_key = os.path.join(testdata_dir, 'testkey')
    options = common.OPTIONS
    options.wipe_user_data = False
    options.payload_signer = None
    options.payload_signer_args = None
    options.package_key = package_key
    options.key_passwords = {package_key: None}

  def tearDown(self):
    common.Cleanup()

  @staticmethod
  def _create_payload_full(secondary=False):
    """Returns an unsigned Payload generated from a target only."""
    target = construct_target_files(secondary)
    full_payload = Payload(secondary)
    full_payload.Generate(target)
    return full_payload

  @staticmethod
  def _create_payload_incremental():
    """Returns an unsigned Payload generated from a target and a source."""
    target = construct_target_files()
    source = construct_target_files()
    incremental_payload = Payload()
    incremental_payload.Generate(target, source)
    return incremental_payload

  @staticmethod
  def _sign_and_write_to_zip(payload):
    """Signs the payload, writes it to a new temp zip, returns the path."""
    payload.Sign(PayloadSigner())

    zip_path = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(zip_path, 'w') as zip_fp:
      payload.WriteToZip(zip_fp)
    return zip_path

  def _verify_package_signature(self, zip_path):
    """Runs VerifyAbOtaPayload on the zip with the testkey certificate."""
    import check_ota_package_signature
    certificate = os.path.join(self.testdata_dir, 'testkey.x509.pem')
    check_ota_package_signature.VerifyAbOtaPayload(certificate, zip_path)

  def _assert_present_and_stored(self, zip_path, names):
    """Asserts the named entries exist in the zip and are uncompressed."""
    with zipfile.ZipFile(zip_path) as verify_zip:
      # All of the names have to be there in the first place.
      present = verify_zip.namelist()
      for name in names:
        self.assertIn(name, present)

      # Only the named entries are checked for their compression type.
      for info in verify_zip.infolist():
        if info.filename in names:
          self.assertEqual(zipfile.ZIP_STORED, info.compress_type)

  def test_Generate_full(self):
    """A full payload should leave a payload file on disk."""
    full_payload = self._create_payload_full()
    self.assertTrue(os.path.exists(full_payload.payload_file))

  def test_Generate_incremental(self):
    """An incremental payload should leave a payload file on disk."""
    incremental_payload = self._create_payload_incremental()
    self.assertTrue(os.path.exists(incremental_payload.payload_file))

  def test_Generate_additionalArgs(self):
    """Passes the source image through additional_args instead."""
    target = construct_target_files()
    source = construct_target_files()
    payload = Payload()
    # Intended to be equivalent to payload.Generate(target, source).
    extra_args = ["--source_image", source]
    payload.Generate(target, additional_args=extra_args)
    self.assertTrue(os.path.exists(payload.payload_file))

  def test_Generate_invalidInput(self):
    """Generate() must fail once IMAGES/vendor.img has been removed."""
    target = construct_target_files()
    common.ZipDelete(target, 'IMAGES/vendor.img')
    payload = Payload()
    with self.assertRaises(AssertionError):
      payload.Generate(target)

  def test_Sign_full(self):
    """A signed full payload, once zipped, must pass verification."""
    zip_path = self._sign_and_write_to_zip(self._create_payload_full())
    self._verify_package_signature(zip_path)

  def test_Sign_incremental(self):
    """A signed incremental payload, once zipped, must pass verification."""
    zip_path = self._sign_and_write_to_zip(self._create_payload_incremental())
    self._verify_package_signature(zip_path)

  def test_Sign_withDataWipe(self):
    """With wipe_user_data set, the properties must carry POWERWASH=1."""
    common.OPTIONS.wipe_user_data = True
    payload = self._create_payload_full()
    payload.Sign(PayloadSigner())

    with open(payload.payload_properties) as properties_fp:
      properties = properties_fp.read()
    self.assertIn("POWERWASH=1", properties)

  def test_Sign_secondary(self):
    """A secondary payload's properties must disable the slot switch."""
    payload = self._create_payload_full(secondary=True)
    payload.Sign(PayloadSigner())

    with open(payload.payload_properties) as properties_fp:
      properties = properties_fp.read()
    self.assertIn("SWITCH_SLOT_ON_REBOOT=0", properties)

  def test_Sign_badSigner(self):
    """Tests that signing failure can be captured."""
    payload = self._create_payload_full()
    broken_signer = PayloadSigner()
    broken_signer.signer_args.append('bad-option')
    with self.assertRaises(AssertionError):
      payload.Sign(broken_signer)

  def test_WriteToZip(self):
    """The payload entries must be written, and written uncompressed."""
    zip_path = self._sign_and_write_to_zip(self._create_payload_full())
    self._assert_present_and_stored(
        zip_path, (Payload.PAYLOAD_BIN, Payload.PAYLOAD_PROPERTIES_TXT))

  def test_WriteToZip_unsignedPayload(self):
    """Unsigned payloads should not be allowed to be written to zip."""
    # Exercise the full payload first, then the incremental one.
    payload_factories = (
        self._create_payload_full,
        self._create_payload_incremental,
    )
    for create_payload in payload_factories:
      unsigned_payload = create_payload()

      zip_path = common.MakeTempFile(suffix='.zip')
      with zipfile.ZipFile(zip_path, 'w') as zip_fp:
        with self.assertRaises(AssertionError):
          unsigned_payload.WriteToZip(zip_fp)

  def test_WriteToZip_secondary(self):
    """The secondary payload entries must be written uncompressed."""
    secondary_payload = self._create_payload_full(secondary=True)
    zip_path = self._sign_and_write_to_zip(secondary_payload)
    self._assert_present_and_stored(
        zip_path,
        (Payload.SECONDARY_PAYLOAD_BIN,
         Payload.SECONDARY_PAYLOAD_PROPERTIES_TXT))
1337