1#!/usr/bin/env python
2
3# Copyright (C) 2017 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Validate a given (signed) target_files.zip.
19
20It performs checks to ensure the integrity of the input zip.
21 - It verifies the file consistency between the ones in IMAGES/system.img (read
22   via IMAGES/system.map) and the ones under unpacked folder of SYSTEM/. The
23   same check also applies to the vendor image if present.
24"""
25
26import logging
27import os.path
28import re
29import sys
30import zipfile
31
32import common
33
34
def _ReadFile(file_name, unpacked_name, round_up=False):
  """Constructs and returns a File object. Rounds up its size if needed.

  Args:
    file_name: Name given to the returned File object.
    unpacked_name: Path of the file on disk whose contents are read.
    round_up: If True, the data is padded with NUL bytes up to the size
        returned by common.RoundUpTo4K().

  Returns:
    A common.File built from file_name and the (possibly padded) data.
  """

  assert os.path.exists(unpacked_name)
  # Read the raw bytes. The contents get hashed, so text mode must be avoided:
  # it may translate newlines on some platforms, and on Python 3 it would try
  # to decode arbitrary binary data.
  with open(unpacked_name, 'rb') as f:
    file_data = f.read()
  if round_up:
    file_size = len(file_data)
    file_size_rounded_up = common.RoundUpTo4K(file_size)
    file_data += b'\0' * (file_size_rounded_up - file_size)
  return common.File(file_name, file_data)
46
47
def ValidateFileAgainstSha1(input_tmp, file_name, file_path, expected_sha1):
  """Asserts that an unpacked file hashes to the expected SHA-1.

  Args:
    input_tmp: Directory that file_path is relative to.
    file_name: Name used in the log line, in the failure message and for the
        File object that gets constructed.
    file_path: Path of the file to check, relative to input_tmp.
    expected_sha1: The SHA-1 the file is required to have.
  """

  logging.info('Validating the SHA-1 of %s', file_name)

  on_disk_path = os.path.join(input_tmp, file_path)
  assert os.path.exists(on_disk_path)

  loaded_file = _ReadFile(file_name, on_disk_path, round_up=False)
  actual_sha1 = loaded_file.sha1
  assert actual_sha1 == expected_sha1, (
      'SHA-1 mismatches for {}. actual {}, expected {}'.format(
          file_name, actual_sha1, expected_sha1))
58
59
def ValidateFileConsistency(input_zip, input_tmp, info_dict):
  """Checks that the files in the sparse images match the unpacked copies.

  For every eligible file listed in an image's file map, the SHA-1 of the
  blocks it occupies is compared against the SHA-1 of the unpacked file, read
  with its size rounded up.

  Args:
    input_zip: The opened target_files zip. Used for loading the sparse images
        and for detecting a 'VENDOR/' entry.
    input_tmp: Directory holding the unpacked contents of the zip.
    info_dict: Build properties. The check only runs when 'extfs_sparse_flag'
        is '-s'.
  """

  logging.info('Validating file consistency.')

  # TODO(b/79617342): Validate non-sparse images.
  uses_sparse_images = info_dict.get('extfs_sparse_flag') == '-s'
  if not uses_sparse_images:
    logging.warning('Skipped due to target using non-sparse images')
    return

  def _VerifyPartition(partition):
    logging.info('Checking %s image.', partition)
    # Shared blocks are allowed while loading the sparse image; that has no
    # effect on the comparison below (all the blocks are still available,
    # unless a file gets skipped because of holes).
    sparse_image = common.GetSparseImage(partition, input_tmp, input_zip, True)
    mount_point = '/' + partition
    unpacked_dir = partition.upper()

    for path in sparse_image.file_map:
      # Entries such as '__NONZERO-0' don't belong to the partition.
      if not path.startswith(mount_point):
        continue

      # The blocks occupied by the file. They include the bytes past the end
      # of the file, which are expected to be '\0' padding.
      block_ranges = sparse_image.file_map[path]

      if block_ranges.extra.get('incomplete', False):
        logging.warning('Skipping %s that has incomplete block list', path)
        continue

      # TODO(b/79951650): Handle files with non-monotonic ranges.
      if not block_ranges.monotonic:
        logging.warning(
            'Skipping %s that has non-monotonic ranges: %s',
            path, block_ranges)
        continue

      blocks_sha1 = sparse_image.RangeSha1(block_ranges)

      # Map the in-image path to the unpacked one, e.g. SYSTEM/bin/sh.
      relative_path = path[len(mount_point) + 1:]
      unpacked_name = os.path.join(input_tmp, unpacked_dir, relative_path)
      file_sha1 = _ReadFile(path, unpacked_name, True).sha1
      assert blocks_sha1 == file_sha1, (
          'file: %s, range: %s, blocks_sha1: %s, file_sha1: %s' % (
              path, block_ranges, blocks_sha1, file_sha1))

  # IMAGES/system.img is always verified.
  _VerifyPartition('system')

  # IMAGES/vendor.img is verified only when the zip has a VENDOR/ entry.
  if 'VENDOR/' in input_zip.namelist():
    _VerifyPartition('vendor')

  # IMAGES/system_other.img is not checked, as it has no map file.
116
117
def ValidateInstallRecoveryScript(input_tmp, info_dict):
  r"""Validate the SHA-1 embedded in install-recovery.sh.

  install-recovery.sh is written in common.py and has the following format:

  1. full recovery:
  ...
  if ! applypatch -c type:device:size:SHA-1; then
  applypatch /system/etc/recovery.img type:device sha1 size && ...
  ...

  2. recovery from boot:
  ...
  applypatch [-b bonus_args] boot_info recovery_info recovery_sha1 \
  recovery_size patch_info && ...
  ...

  For full recovery, we want to calculate the SHA-1 of /system/etc/recovery.img
  and compare it against the one embedded in the script. While for recovery
  from boot, we want to check the SHA-1 for both recovery.img and boot.img
  under IMAGES/.

  Nothing is checked if SYSTEM/bin/install-recovery.sh doesn't exist.

  Args:
    input_tmp: Directory holding the unpacked target_files.
    info_dict: Build properties. "full_recovery_image" being "true" selects the
        full recovery layout; anything else selects recovery from boot.

  Raises:
    AssertionError: If the script doesn't have the expected layout, or an
        embedded SHA-1 doesn't match the corresponding image.
  """

  script_path = 'SYSTEM/bin/install-recovery.sh'
  unpacked_script = os.path.join(input_tmp, script_path)
  if not os.path.exists(unpacked_script):
    logging.info('%s does not exist in input_tmp', script_path)
    return

  logging.info('Checking %s', script_path)
  with open(unpacked_script, 'r') as script:
    lines = script.read().strip().split('\n')
  assert len(lines) >= 6, \
      '{} has {} line(s), expected at least 6'.format(script_path, len(lines))

  # The second line carries the check command, the third one the patch command.
  # Fail with a descriptive message (rather than an AttributeError on None)
  # when either of them doesn't look as expected.
  check_cmd = re.search(r'if ! applypatch -c \w+:.+:\w+:(\w+);',
                        lines[1].strip())
  assert check_cmd is not None, \
      'Unexpected applypatch check command in {}: {!r}'.format(
          script_path, lines[1])
  expected_recovery_check_sha1 = check_cmd.group(1)

  patch_cmd = re.search(r'(applypatch.+)&&', lines[2].strip())
  assert patch_cmd is not None, \
      'Unexpected applypatch patch command in {}: {!r}'.format(
          script_path, lines[2])
  applypatch_argv = patch_cmd.group(1).strip().split()

  full_recovery_image = info_dict.get("full_recovery_image") == "true"
  if full_recovery_image:
    assert len(applypatch_argv) == 5, \
        'Unexpected applypatch arguments: {}'.format(applypatch_argv)
    # Check we have the same expected SHA-1 of recovery.img in both check mode
    # and patch mode.
    expected_recovery_sha1 = applypatch_argv[3]
    assert expected_recovery_check_sha1 == expected_recovery_sha1
    ValidateFileAgainstSha1(input_tmp, 'recovery.img',
                            'SYSTEM/etc/recovery.img', expected_recovery_sha1)
  else:
    # We're patching boot.img to get recovery.img where bonus_args is optional
    assert len(applypatch_argv) >= 2, \
        'Unexpected applypatch arguments: {}'.format(applypatch_argv)
    if applypatch_argv[1] == "-b":
      assert len(applypatch_argv) == 8, \
          'Unexpected applypatch arguments: {}'.format(applypatch_argv)
      boot_info_index = 3
    else:
      assert len(applypatch_argv) == 6, \
          'Unexpected applypatch arguments: {}'.format(applypatch_argv)
      boot_info_index = 1

    # boot_info: boot_type:boot_device:boot_size:boot_sha1
    boot_info = applypatch_argv[boot_info_index].split(':')
    assert len(boot_info) == 4, \
        'Unexpected boot_info: {}'.format(applypatch_argv[boot_info_index])
    ValidateFileAgainstSha1(input_tmp, file_name='boot.img',
                            file_path='IMAGES/boot.img',
                            expected_sha1=boot_info[3])

    # recovery_sha1 comes two arguments after boot_info.
    recovery_sha1_index = boot_info_index + 2
    expected_recovery_sha1 = applypatch_argv[recovery_sha1_index]
    assert expected_recovery_check_sha1 == expected_recovery_sha1
    ValidateFileAgainstSha1(input_tmp, file_name='recovery.img',
                            file_path='IMAGES/recovery.img',
                            expected_sha1=expected_recovery_sha1)

  logging.info('Done checking %s', script_path)
189
190
def main(argv):
  """Parses the command line and runs the validations on the given zip.

  Exits with status 1, after printing the usage, unless exactly one positional
  argument (the target_files.zip) is left after option parsing.

  Args:
    argv: The command-line arguments, without the program name.
  """

  # NOTE(review): the handler takes no arguments; presumably it never gets
  # invoked since no extra options are declared -- confirm against
  # common.ParseOptions().
  def option_handler():
    return True

  args = common.ParseOptions(
      argv, __doc__, extra_opts="", extra_long_opts=[],
      extra_option_handler=option_handler)
  if len(args) != 1:
    common.Usage(__doc__)
    sys.exit(1)
  target_files = args[0]

  logging.basicConfig(
      level=logging.INFO,
      format='%(asctime)s - %(filename)s - %(levelname)-8s: %(message)s',
      datefmt='%Y/%m/%d %H:%M:%S')

  logging.info("Unzipping the input target_files.zip: %s", target_files)
  input_tmp = common.UnzipTemp(target_files)
  info_dict = common.LoadInfoDict(input_tmp)

  # The zip only needs to stay open for the file consistency check.
  with zipfile.ZipFile(target_files, 'r') as input_zip:
    ValidateFileConsistency(input_zip, input_tmp, info_dict)

  ValidateInstallRecoveryScript(input_tmp, info_dict)

  # TODO: Check if the OTA keys have been properly updated (the ones on /system,
  # in recovery image).

  logging.info("Done.")
222
223
if __name__ == '__main__':
  # Script entry point. common.Cleanup() runs whether or not main() succeeds
  # (presumably it removes the temporary files created during the run --
  # confirm against common.py).
  cli_args = sys.argv[1:]
  try:
    main(cli_args)
  finally:
    common.Cleanup()
229