1#!/usr/bin/env python
2
3# Copyright (C) 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Validate a given (signed) target_files.zip.
19
20It performs the following checks to assert the integrity of the input zip.
21
22 - It verifies the file consistency between the ones in IMAGES/system.img (read
23   via IMAGES/system.map) and the ones under unpacked folder of SYSTEM/. The
24   same check also applies to the vendor image if present.
25
26 - It verifies the install-recovery script consistency, by comparing the
27   checksums in the script against the ones of IMAGES/{boot,recovery}.img.
28
29 - It verifies the signed Verified Boot related images, for both of Verified
30   Boot 1.0 and 2.0 (aka AVB).
31"""
32
33import argparse
34import filecmp
35import logging
36import os.path
37import re
38import zipfile
39from hashlib import sha1
40
41import common
42import rangelib
43
44
45def _ReadFile(file_name, unpacked_name, round_up=False):
46  """Constructs and returns a File object. Rounds up its size if needed."""
47  assert os.path.exists(unpacked_name)
48  with open(unpacked_name, 'rb') as f:
49    file_data = f.read()
50  file_size = len(file_data)
51  if round_up:
52    file_size_rounded_up = common.RoundUpTo4K(file_size)
53    file_data += b'\0' * (file_size_rounded_up - file_size)
54  return common.File(file_name, file_data)
55
56
57def ValidateFileAgainstSha1(input_tmp, file_name, file_path, expected_sha1):
58  """Check if the file has the expected SHA-1."""
59
60  logging.info('Validating the SHA-1 of %s', file_name)
61  unpacked_name = os.path.join(input_tmp, file_path)
62  assert os.path.exists(unpacked_name)
63  actual_sha1 = _ReadFile(file_name, unpacked_name, False).sha1
64  assert actual_sha1 == expected_sha1, \
65      'SHA-1 mismatches for {}. actual {}, expected {}'.format(
66          file_name, actual_sha1, expected_sha1)
67
68
69def ValidateFileConsistency(input_zip, input_tmp, info_dict):
70  """Compare the files from image files and unpacked folders."""
71
72  def CheckAllFiles(which):
73    logging.info('Checking %s image.', which)
74    # Allow having shared blocks when loading the sparse image, because allowing
75    # that doesn't affect the checks below (we will have all the blocks on file,
76    # unless it's skipped due to the holes).
77    image = common.GetSparseImage(which, input_tmp, input_zip, True)
78    prefix = '/' + which
79    for entry in image.file_map:
80      # Skip entries like '__NONZERO-0'.
81      if not entry.startswith(prefix):
82        continue
83
84      # Read the blocks that the file resides. Note that it will contain the
85      # bytes past the file length, which is expected to be padded with '\0's.
86      ranges = image.file_map[entry]
87
88      # Use the original RangeSet if applicable, which includes the shared
89      # blocks. And this needs to happen before checking the monotonicity flag.
90      if ranges.extra.get('uses_shared_blocks'):
91        file_ranges = ranges.extra['uses_shared_blocks']
92      else:
93        file_ranges = ranges
94
95      incomplete = file_ranges.extra.get('incomplete', False)
96      if incomplete:
97        logging.warning('Skipping %s that has incomplete block list', entry)
98        continue
99
100      # If the file has non-monotonic ranges, read each range in order.
101      if not file_ranges.monotonic:
102        h = sha1()
103        for file_range in file_ranges.extra['text_str'].split(' '):
104          for data in image.ReadRangeSet(rangelib.RangeSet(file_range)):
105            h.update(data)
106        blocks_sha1 = h.hexdigest()
107      else:
108        blocks_sha1 = image.RangeSha1(file_ranges)
109
110      # The filename under unpacked directory, such as SYSTEM/bin/sh.
111      unpacked_name = os.path.join(
112          input_tmp, which.upper(), entry[(len(prefix) + 1):])
113      unpacked_file = _ReadFile(entry, unpacked_name, True)
114      file_sha1 = unpacked_file.sha1
115      assert blocks_sha1 == file_sha1, \
116          'file: %s, range: %s, blocks_sha1: %s, file_sha1: %s' % (
117              entry, file_ranges, blocks_sha1, file_sha1)
118
119  logging.info('Validating file consistency.')
120
121  # TODO(b/79617342): Validate non-sparse images.
122  if info_dict.get('extfs_sparse_flag') != '-s':
123    logging.warning('Skipped due to target using non-sparse images')
124    return
125
126  # Verify IMAGES/system.img.
127  CheckAllFiles('system')
128
129  # Verify IMAGES/vendor.img if applicable.
130  if 'VENDOR/' in input_zip.namelist():
131    CheckAllFiles('vendor')
132
133  # Not checking IMAGES/system_other.img since it doesn't have the map file.
134
135
136def ValidateInstallRecoveryScript(input_tmp, info_dict):
137  """Validate the SHA-1 embedded in install-recovery.sh.
138
139  install-recovery.sh is written in common.py and has the following format:
140
141  1. full recovery:
142  ...
143  if ! applypatch --check type:device:size:sha1; then
144    applypatch --flash /vendor/etc/recovery.img \\
145        type:device:size:sha1 && \\
146  ...
147
148  2. recovery from boot:
149  ...
150  if ! applypatch --check type:recovery_device:recovery_size:recovery_sha1; then
151    applypatch [--bonus bonus_args] \\
152        --patch /vendor/recovery-from-boot.p \\
153        --source type:boot_device:boot_size:boot_sha1 \\
154        --target type:recovery_device:recovery_size:recovery_sha1 && \\
155  ...
156
157  For full recovery, we want to calculate the SHA-1 of /vendor/etc/recovery.img
158  and compare it against the one embedded in the script. While for recovery
159  from boot, we want to check the SHA-1 for both recovery.img and boot.img
160  under IMAGES/.
161  """
162
163  board_uses_vendorimage = info_dict.get("board_uses_vendorimage") == "true"
164
165  if board_uses_vendorimage:
166    script_path = 'VENDOR/bin/install-recovery.sh'
167    recovery_img = 'VENDOR/etc/recovery.img'
168  else:
169    script_path = 'SYSTEM/vendor/bin/install-recovery.sh'
170    recovery_img = 'SYSTEM/vendor/etc/recovery.img'
171
172  if not os.path.exists(os.path.join(input_tmp, script_path)):
173    logging.info('%s does not exist in input_tmp', script_path)
174    return
175
176  logging.info('Checking %s', script_path)
177  with open(os.path.join(input_tmp, script_path), 'r') as script:
178    lines = script.read().strip().split('\n')
179  assert len(lines) >= 10
180  check_cmd = re.search(r'if ! applypatch --check (\w+:.+:\w+:\w+);',
181                        lines[1].strip())
182  check_partition = check_cmd.group(1)
183  assert len(check_partition.split(':')) == 4
184
185  full_recovery_image = info_dict.get("full_recovery_image") == "true"
186  if full_recovery_image:
187    assert len(lines) == 10, "Invalid line count: {}".format(lines)
188
189    # Expect something like "EMMC:/dev/block/recovery:28:5f9c..62e3".
190    target = re.search(r'--target (.+) &&', lines[4].strip())
191    assert target is not None, \
192        "Failed to parse target line \"{}\"".format(lines[4])
193    flash_partition = target.group(1)
194
195    # Check we have the same recovery target in the check and flash commands.
196    assert check_partition == flash_partition, \
197        "Mismatching targets: {} vs {}".format(check_partition, flash_partition)
198
199    # Validate the SHA-1 of the recovery image.
200    recovery_sha1 = flash_partition.split(':')[3]
201    ValidateFileAgainstSha1(
202        input_tmp, 'recovery.img', recovery_img, recovery_sha1)
203  else:
204    assert len(lines) == 11, "Invalid line count: {}".format(lines)
205
206    # --source boot_type:boot_device:boot_size:boot_sha1
207    source = re.search(r'--source (\w+:.+:\w+:\w+) \\', lines[4].strip())
208    assert source is not None, \
209        "Failed to parse source line \"{}\"".format(lines[4])
210
211    source_partition = source.group(1)
212    source_info = source_partition.split(':')
213    assert len(source_info) == 4, \
214        "Invalid source partition: {}".format(source_partition)
215    ValidateFileAgainstSha1(input_tmp, file_name='boot.img',
216                            file_path='IMAGES/boot.img',
217                            expected_sha1=source_info[3])
218
219    # --target recovery_type:recovery_device:recovery_size:recovery_sha1
220    target = re.search(r'--target (\w+:.+:\w+:\w+) && \\', lines[5].strip())
221    assert target is not None, \
222        "Failed to parse target line \"{}\"".format(lines[5])
223    target_partition = target.group(1)
224
225    # Check we have the same recovery target in the check and patch commands.
226    assert check_partition == target_partition, \
227        "Mismatching targets: {} vs {}".format(
228            check_partition, target_partition)
229
230    recovery_info = target_partition.split(':')
231    assert len(recovery_info) == 4, \
232        "Invalid target partition: {}".format(target_partition)
233    ValidateFileAgainstSha1(input_tmp, file_name='recovery.img',
234                            file_path='IMAGES/recovery.img',
235                            expected_sha1=recovery_info[3])
236
237  logging.info('Done checking %s', script_path)
238
239
240# Symlink files in `src` to `dst`, if the files do not
241# already exists in `dst` directory.
242def symlinkIfNotExists(src, dst):
243  if not os.path.isdir(src):
244    return
245  for filename in os.listdir(src):
246    if os.path.exists(os.path.join(dst, filename)):
247      continue
248    os.symlink(os.path.join(src, filename), os.path.join(dst, filename))
249
250
251def ValidateVerifiedBootImages(input_tmp, info_dict, options):
252  """Validates the Verified Boot related images.
253
254  For Verified Boot 1.0, it verifies the signatures of the bootable images
255  (boot/recovery etc), as well as the dm-verity metadata in system images
256  (system/vendor/product). For Verified Boot 2.0, it calls avbtool to verify
257  vbmeta.img, which in turn verifies all the descriptors listed in vbmeta.
258
259  Args:
260    input_tmp: The top-level directory of unpacked target-files.zip.
261    info_dict: The loaded info dict.
262    options: A dict that contains the user-supplied public keys to be used for
263        image verification. In particular, 'verity_key' is used to verify the
264        bootable images in VB 1.0, and the vbmeta image in VB 2.0, where
265        applicable. 'verity_key_mincrypt' will be used to verify the system
266        images in VB 1.0.
267
268  Raises:
269    AssertionError: On any verification failure.
270  """
271  # See bug 159299583
272  # After commit 5277d1015, some images (e.g. acpio.img and tos.img) are no
273  # longer copied from RADIO to the IMAGES folder. But avbtool assumes that
274  # images are in IMAGES folder. So we symlink them.
275  symlinkIfNotExists(os.path.join(input_tmp, "RADIO"),
276                    os.path.join(input_tmp, "IMAGES"))
277  # Verified boot 1.0 (images signed with boot_signer and verity_signer).
278  if info_dict.get('boot_signer') == 'true':
279    logging.info('Verifying Verified Boot images...')
280
281    # Verify the boot/recovery images (signed with boot_signer), against the
282    # given X.509 encoded pubkey (or falling back to the one in the info_dict if
283    # none given).
284    verity_key = options['verity_key']
285    if verity_key is None:
286      verity_key = info_dict['verity_key'] + '.x509.pem'
287    for image in ('boot.img', 'recovery.img', 'recovery-two-step.img'):
288      if image == 'recovery-two-step.img':
289        image_path = os.path.join(input_tmp, 'OTA', image)
290      else:
291        image_path = os.path.join(input_tmp, 'IMAGES', image)
292      if not os.path.exists(image_path):
293        continue
294
295      cmd = ['boot_signer', '-verify', image_path, '-certificate', verity_key]
296      proc = common.Run(cmd)
297      stdoutdata, _ = proc.communicate()
298      assert proc.returncode == 0, \
299          'Failed to verify {} with boot_signer:\n{}'.format(image, stdoutdata)
300      logging.info(
301          'Verified %s with boot_signer (key: %s):\n%s', image, verity_key,
302          stdoutdata.rstrip())
303
304  # Verify verity signed system images in Verified Boot 1.0. Note that not using
305  # 'elif' here, since 'boot_signer' and 'verity' are not bundled in VB 1.0.
306  if info_dict.get('verity') == 'true':
307    # First verify that the verity key is built into the root image (regardless
308    # of system-as-root).
309    verity_key_mincrypt = os.path.join(input_tmp, 'ROOT', 'verity_key')
310    assert os.path.exists(verity_key_mincrypt), 'Missing verity_key'
311
312    # Verify /verity_key matches the one given via command line, if any.
313    if options['verity_key_mincrypt'] is None:
314      logging.warn(
315          'Skipped checking the content of /verity_key, as the key file not '
316          'provided. Use --verity_key_mincrypt to specify.')
317    else:
318      expected_key = options['verity_key_mincrypt']
319      assert filecmp.cmp(expected_key, verity_key_mincrypt, shallow=False), \
320          "Mismatching mincrypt verity key files"
321      logging.info('Verified the content of /verity_key')
322
323    # For devices with a separate ramdisk (i.e. non-system-as-root), there must
324    # be a copy in ramdisk.
325    if info_dict.get("system_root_image") != "true":
326      verity_key_ramdisk = os.path.join(
327          input_tmp, 'BOOT', 'RAMDISK', 'verity_key')
328      assert os.path.exists(verity_key_ramdisk), 'Missing verity_key in ramdisk'
329
330      assert filecmp.cmp(
331          verity_key_mincrypt, verity_key_ramdisk, shallow=False), \
332              'Mismatching verity_key files in root and ramdisk'
333      logging.info('Verified the content of /verity_key in ramdisk')
334
335    # Then verify the verity signed system/vendor/product images, against the
336    # verity pubkey in mincrypt format.
337    for image in ('system.img', 'vendor.img', 'product.img'):
338      image_path = os.path.join(input_tmp, 'IMAGES', image)
339
340      # We are not checking if the image is actually enabled via info_dict (e.g.
341      # 'system_verity_block_device=...'). Because it's most likely a bug that
342      # skips signing some of the images in signed target-files.zip, while
343      # having the top-level verity flag enabled.
344      if not os.path.exists(image_path):
345        continue
346
347      cmd = ['verity_verifier', image_path, '-mincrypt', verity_key_mincrypt]
348      proc = common.Run(cmd)
349      stdoutdata, _ = proc.communicate()
350      assert proc.returncode == 0, \
351          'Failed to verify {} with verity_verifier (key: {}):\n{}'.format(
352              image, verity_key_mincrypt, stdoutdata)
353      logging.info(
354          'Verified %s with verity_verifier (key: %s):\n%s', image,
355          verity_key_mincrypt, stdoutdata.rstrip())
356
357  # Handle the case of Verified Boot 2.0 (AVB).
358  if info_dict.get("avb_enable") == "true":
359    logging.info('Verifying Verified Boot 2.0 (AVB) images...')
360
361    key = options['verity_key']
362    if key is None:
363      key = info_dict['avb_vbmeta_key_path']
364
365    # avbtool verifies all the images that have descriptors listed in vbmeta.
366    # Using `--follow_chain_partitions` so it would additionally verify chained
367    # vbmeta partitions (e.g. vbmeta_system).
368    image = os.path.join(input_tmp, 'IMAGES', 'vbmeta.img')
369    cmd = [info_dict['avb_avbtool'], 'verify_image', '--image', image,
370           '--follow_chain_partitions']
371
372    # Custom images.
373    custom_partitions = info_dict.get(
374        "avb_custom_images_partition_list", "").strip().split()
375
376    # Append the args for chained partitions if any.
377    for partition in (common.AVB_PARTITIONS + common.AVB_VBMETA_PARTITIONS +
378                      tuple(custom_partitions)):
379      key_name = 'avb_' + partition + '_key_path'
380      if info_dict.get(key_name) is not None:
381        if info_dict.get('ab_update') != 'true' and partition == 'recovery':
382          continue
383
384        # Use the key file from command line if specified; otherwise fall back
385        # to the one in info dict.
386        key_file = options.get(key_name, info_dict[key_name])
387        chained_partition_arg = common.GetAvbChainedPartitionArg(
388            partition, info_dict, key_file)
389        cmd.extend(['--expected_chain_partition', chained_partition_arg])
390
391    # Handle the boot image with a non-default name, e.g. boot-5.4.img
392    boot_images = info_dict.get("boot_images")
393    if boot_images:
394      # we used the 1st boot image to generate the vbmeta. Rename the filename
395      # to boot.img so that avbtool can find it correctly.
396      first_image_name = boot_images.split()[0]
397      first_image_path = os.path.join(input_tmp, 'IMAGES', first_image_name)
398      assert os.path.isfile(first_image_path)
399      renamed_boot_image_path = os.path.join(input_tmp, 'IMAGES', 'boot.img')
400      os.rename(first_image_path, renamed_boot_image_path)
401
402    proc = common.Run(cmd)
403    stdoutdata, _ = proc.communicate()
404    assert proc.returncode == 0, \
405        'Failed to verify {} with avbtool (key: {}):\n{}'.format(
406            image, key, stdoutdata)
407
408    logging.info(
409        'Verified %s with avbtool (key: %s):\n%s', image, key,
410        stdoutdata.rstrip())
411
412    # avbtool verifies recovery image for non-A/B devices.
413    if (info_dict.get('ab_update') != 'true' and
414        info_dict.get('no_recovery') != 'true'):
415      image = os.path.join(input_tmp, 'IMAGES', 'recovery.img')
416      key = info_dict['avb_recovery_key_path']
417      cmd = [info_dict['avb_avbtool'], 'verify_image', '--image', image,
418             '--key', key]
419      proc = common.Run(cmd)
420      stdoutdata, _ = proc.communicate()
421      assert proc.returncode == 0, \
422          'Failed to verify {} with avbtool (key: {}):\n{}'.format(
423              image, key, stdoutdata)
424      logging.info(
425          'Verified %s with avbtool (key: %s):\n%s', image, key,
426          stdoutdata.rstrip())
427
428
429def CheckDataInconsistency(lines):
430    build_prop = {}
431    for line in lines:
432      if line.startswith("import") or line.startswith("#"):
433        continue
434      if "=" not in line:
435        continue
436
437      key, value = line.rstrip().split("=", 1)
438      if key in build_prop:
439        logging.info("Duplicated key found for {}".format(key))
440        if value != build_prop[key]:
441          logging.error("Key {} is defined twice with different values {} vs {}"
442                        .format(key, value, build_prop[key]))
443          return key
444      build_prop[key] = value
445
446
447def CheckBuildPropDuplicity(input_tmp):
448  """Check all buld.prop files inside directory input_tmp, raise error
449  if they contain duplicates"""
450
451  if not os.path.isdir(input_tmp):
452    raise ValueError("Expect {} to be a directory".format(input_tmp))
453  for name in os.listdir(input_tmp):
454    if not name.isupper():
455      continue
456    for prop_file in ['build.prop', 'etc/build.prop']:
457      path = os.path.join(input_tmp, name, prop_file)
458      if not os.path.exists(path):
459        continue
460      logging.info("Checking {}".format(path))
461      with open(path, 'r') as fp:
462        dupKey = CheckDataInconsistency(fp.readlines())
463        if dupKey:
464          raise ValueError("{} contains duplicate keys for {}".format(
465              path, dupKey))
466
467
468def main():
469  parser = argparse.ArgumentParser(
470      description=__doc__,
471      formatter_class=argparse.RawDescriptionHelpFormatter)
472  parser.add_argument(
473      'target_files',
474      help='the input target_files.zip to be validated')
475  parser.add_argument(
476      '--verity_key',
477      help='the verity public key to verify the bootable images (Verified '
478           'Boot 1.0), or the vbmeta image (Verified Boot 2.0, aka AVB), where '
479           'applicable')
480  for partition in common.AVB_PARTITIONS + common.AVB_VBMETA_PARTITIONS:
481    parser.add_argument(
482        '--avb_' + partition + '_key_path',
483        help='the public or private key in PEM format to verify AVB chained '
484             'partition of {}'.format(partition))
485  parser.add_argument(
486      '--verity_key_mincrypt',
487      help='the verity public key in mincrypt format to verify the system '
488           'images, if target using Verified Boot 1.0')
489  args = parser.parse_args()
490
491  # Unprovided args will have 'None' as the value.
492  options = vars(args)
493
494  logging_format = '%(asctime)s - %(filename)s - %(levelname)-8s: %(message)s'
495  date_format = '%Y/%m/%d %H:%M:%S'
496  logging.basicConfig(level=logging.INFO, format=logging_format,
497                      datefmt=date_format)
498
499  logging.info("Unzipping the input target_files.zip: %s", args.target_files)
500  input_tmp = common.UnzipTemp(args.target_files)
501
502  info_dict = common.LoadInfoDict(input_tmp)
503  with zipfile.ZipFile(args.target_files, 'r', allowZip64=True) as input_zip:
504    ValidateFileConsistency(input_zip, input_tmp, info_dict)
505
506  CheckBuildPropDuplicity(input_tmp)
507
508  ValidateInstallRecoveryScript(input_tmp, info_dict)
509
510  ValidateVerifiedBootImages(input_tmp, info_dict, options)
511
512  # TODO: Check if the OTA keys have been properly updated (the ones on /system,
513  # in recovery image).
514
515  logging.info("Done.")
516
517
518if __name__ == '__main__':
519  try:
520    main()
521  finally:
522    common.Cleanup()
523