1#!/usr/bin/env python 2# 3# Copyright (C) 2021 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16"""sign_virt_apex is a command line tool for sign the Virt APEX file. 17 18Typical usage: 19 sign_virt_apex payload_key payload_dir 20 -v, --verbose 21 --verify 22 --avbtool path_to_avbtool 23 --signing_args args 24 25sign_virt_apex uses external tools which are assumed to be available via PATH. 26- avbtool (--avbtool can override the tool) 27- lpmake, lpunpack, simg2img, img2simg, initrd_bootconfig 28""" 29import argparse 30import binascii 31import builtins 32import hashlib 33import os 34import re 35import shlex 36import subprocess 37import sys 38import tempfile 39import traceback 40from concurrent import futures 41 42# pylint: disable=line-too-long,consider-using-with 43 44# Use executor to parallelize the invocation of external tools 45# If a task depends on another, pass the future object of the previous task as wait list. 46# Every future object created by a task should be consumed with AwaitAll() 47# so that exceptions are propagated . 48executor = futures.ThreadPoolExecutor() 49 50# Temporary directory for unpacked super.img. 51# We could put its creation/deletion into the task graph as well, but 52# having it as a global setup is much simpler. 53unpack_dir = tempfile.TemporaryDirectory() 54 55# tasks created with Async() are kept in a list so that they are awaited 56# before exit. 
tasks = []


def Async(fn, *args, wait=None, **kwargs):
    """Creates an async task on the shared executor and returns its future.

    Args:
        fn: callable to run.
        *args: positional arguments forwarded to fn.
        wait: optional iterable of futures which are awaited (see AwaitAll) before
            fn is invoked.
        **kwargs: keyword arguments forwarded to fn.

    Returns:
        The future of the submitted task. It is also appended to the module-level
        `tasks` list. The return value of fn is discarded, so the future resolves
        to None unless the task raises.
    """

    # wrap a function with AwaitAll()
    def wrapped():
        AwaitAll(wait)
        fn(*args, **kwargs)

    task = executor.submit(wrapped)
    tasks.append(task)
    return task


def AwaitAll(fs):
    """Waits for every future in fs with future.result().

    Using result() means any exception raised inside a task is re-raised here.

    Args:
        fs: iterable of futures; None or an empty iterable is a no-op.
    """
    if fs:
        for f in fs:
            f.result()


def ParseArgs(argv):
    """Parses the command line.

    Args:
        argv: list of command line arguments, without the program name.

    Returns:
        The argparse namespace. In addition to the declared options it carries
        `key_overrides`, a dict built from the repeated --key_override
        "filename=key" options.

    Raises:
        ValueError: if a --key_override value has no '=' separator.
    """
    parser = argparse.ArgumentParser(description='Sign the Virt APEX')
    parser.add_argument('--verify', action='store_true',
                        help='Verify the Virt APEX')
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='verbose execution')
    parser.add_argument(
        '--avbtool',
        default='avbtool',
        help='Optional flag that specifies the AVB tool to use. Defaults to `avbtool`.')
    parser.add_argument(
        '--signing_args',
        help='the extra signing arguments passed to avbtool.'
    )
    parser.add_argument(
        '--key_override',
        metavar="filename=key",
        action='append',
        help='Overrides a signing key for a file e.g. microdroid_bootloader=mykey (for testing)')
    parser.add_argument(
        'key',
        help='path to the private key file.')
    parser.add_argument(
        'input_dir',
        help='the directory having files to be packaged')
    parser.add_argument(
        '--do_not_update_bootconfigs',
        action='store_true',
        help='This will NOT update the vbmeta related bootconfigs while signing the apex. '
             'Used for testing only!!')
    parser.add_argument('--do_not_validate_avb_version', action='store_true',
                        help='Do not validate the avb_version when updating vbmeta bootconfig. Only use in tests!')
    args = parser.parse_args(argv)
    # preprocess --key_override into a map
    args.key_overrides = {}
    for pair in args.key_override or []:
        # Split on the first '=' only, so that a key path containing '=' is accepted.
        name, separator, override_key = pair.partition('=')
        if not separator:
            raise ValueError(f'--key_override expects filename=key, got: {pair}')
        args.key_overrides[name] = override_key
    return args


def RunCommand(args, cmd, env=None, expected_return_values=None):
    """Runs an external command and returns its combined output and exit code.

    Args:
        args: program arguments; `args.verbose` and `args.avbtool` are read.
        cmd: command as a list of strings. A leading 'avbtool' is replaced by
            `args.avbtool`. The caller's list is not modified.
        env: optional dict of extra environment variables. These are layered on top of
            the current process environment. The caller's dict is not modified.
        expected_return_values: set of acceptable exit codes. Defaults to {0}.

    Returns:
        A tuple (output, returncode) where output is stdout and stderr merged as text.

    Raises:
        AssertionError: if the exit code is not one of expected_return_values.
    """
    expected_return_values = expected_return_values or {0}
    # Start from the inherited environment and let the caller's entries win.
    run_env = os.environ.copy()
    if env:
        run_env.update(env)

    # Work on a copy so that the tool substitution below does not leak to the caller.
    cmd = list(cmd)

    # TODO(b/193504286): we need a way to find other tool (cmd[0]) in various contexts
    #  e.g. sign_apex.py, sign_target_files_apk.py
    if cmd[0] == 'avbtool':
        cmd[0] = args.avbtool

    if args.verbose:
        print('Running: ' + ' '.join(cmd))
    p = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=run_env, universal_newlines=True)
    output, _ = p.communicate()

    if args.verbose or p.returncode not in expected_return_values:
        print(output.rstrip())

    assert p.returncode in expected_return_values, (
        '%d Failed to execute: ' + ' '.join(cmd)) % p.returncode
    return (output, p.returncode)


def ReadBytesSize(value):
    """Converts a size string such as '4096 bytes' into an int."""
    return int(value.removesuffix(' bytes'))


def ExtractAvbPubkey(args, key, output):
    """Runs `avbtool extract_public_key` for `key`, writing the result to `output`."""
    RunCommand(args, ['avbtool', 'extract_public_key',
               '--key', key, '--output', output])


def is_lz4(args, path):
    """Returns True if `lz4 -t` accepts the file at `path`."""
    # error 44: Unrecognized header
    result = RunCommand(args, ['lz4', '-t', path], expected_return_values={0, 44})
    return result[1] == 0


def AvbInfo(args, image_path):
    """Parses avbtool --info image output

    Args:
      args: program arguments.
      image_path: The path to the image, either raw or lz4 compressed

    Returns:
      A pair of
        - a dict that contains VBMeta info. None if there's no VBMeta info.
        - a list of descriptors as (name, value) pairs. The value is the raw string
          for 'Prop' descriptors and a dict of the descriptor's fields otherwise.
          None if there's no VBMeta info.

    Raises:
      ValueError: if image_path does not exist.
    """
    if not os.path.exists(image_path):
        raise ValueError(f'Failed to find image: {image_path}')

    if is_lz4(args, image_path):
        # Decompress into a temporary file and parse that instead.
        with tempfile.NamedTemporaryFile() as decompressed_image:
            RunCommand(args, ['lz4', '-d', '-f', image_path, decompressed_image.name])
            return AvbInfo(args, decompressed_image.name)

    output, ret_code = RunCommand(
        args, ['avbtool', 'info_image', '--image', image_path], expected_return_values={0, 1})
    if ret_code == 1:
        return None, None

    info, descriptors = {}, []

    # Read `avbtool info_image` output as "key:value" lines
    matcher = re.compile(r'^(\s*)([^:]+):\s*(.*)$')

    def IterateLine(output):
        # Yields (indent, key, value) for each line that looks like "key: value".
        for line in output.split('\n'):
            line_info = matcher.match(line)
            if not line_info:
                continue
            yield line_info.group(1), line_info.group(2), line_info.group(3)

    # A single generator is shared by the loops below, so each line is consumed once.
    gen = IterateLine(output)

    def ReadDescriptors(cur_indent, cur_name, cur_value):
        descriptor = cur_value if cur_name == 'Prop' else {}
        descriptors.append((cur_name, descriptor))
        for indent, key, value in gen:
            # Indents are compared as strings; a line that is not indented deeper than
            # the current descriptor header starts the next descriptor.
            if indent <= cur_indent:
                # read descriptors recursively to pass the read key as descriptor name
                ReadDescriptors(indent, key, value)
                break
            descriptor[key] = value

    # Read VBMeta info
    for _, key, value in gen:
        if key == 'Descriptors':
            ReadDescriptors(*next(gen))
            break
        info[key] = value

    return info, descriptors


def find_all_values_by_key(pairs, key):
    """Find all the values of the key in the pairs."""
    return [v for (k, v) in pairs if k == key]

# Extract properties from the descriptors of original vbmeta image,
# append to command as parameter.
def AppendPropArgument(cmd, descriptors):
    """Appends a `--prop key:value` argument pair to cmd for every 'Prop' descriptor.

    Args:
        cmd: command list to extend in place.
        descriptors: list of (name, value) descriptor pairs. The value of a 'Prop'
            descriptor is expected to look like "key -> 'value'".

    Raises:
        ValueError: if a 'Prop' descriptor does not have the expected format.
    """
    for name, prop in descriptors:
        if name != 'Prop':
            continue
        # The value part may be empty, hence `(.*)` rather than `(.+)`.
        result = re.match(r"(.+) -> '(.*)'", prop)
        if result is None:
            raise ValueError(f'Unexpected Prop descriptor format: {prop}')
        cmd.append('--prop')
        cmd.append(result.group(1) + ":" + result.group(2))


def check_resigned_image_avb_info(image_path, original_info, original_descriptors, args):
    """Asserts that re-signing changed only the public key of the image at image_path.

    The current AVB info of image_path is compared against original_info and
    original_descriptors, which were captured before re-signing. The public key
    must differ from the original and must be identical for every image checked by
    this function during the run. All other info fields, the Prop descriptors and
    the hash descriptors (ignoring their digests) must be unchanged.

    Args:
        image_path: path of the re-signed image.
        original_info: VBMeta info dict captured before re-signing. Not modified.
        original_descriptors: descriptor list captured before re-signing.
        args: program arguments.

    Raises:
        AssertionError: if any of the checks fails.
    """
    pubkey_field = "Public key (sha1)"
    updated_info, updated_descriptors = AvbInfo(args, image_path)
    assert original_info is not None, f'no avbinfo on original image: {image_path}'
    assert updated_info is not None, f'no avbinfo on resigned image: {image_path}'
    assert_different_value(original_info, updated_info, pubkey_field, image_path)
    updated_public_key = updated_info.pop(pubkey_field)
    # The first public key seen is remembered on the function object and every later
    # image is compared against it.
    if not hasattr(check_resigned_image_avb_info, "new_public_key"):
        check_resigned_image_avb_info.new_public_key = updated_public_key
    else:
        assert check_resigned_image_avb_info.new_public_key == updated_public_key, \
            "All images should be resigned with the same public key. Expected public key (sha1):" \
            f" {check_resigned_image_avb_info.new_public_key}, actual public key (sha1): " \
            f"{updated_public_key}, Path: {image_path}"
    # Compare against a copy without the public key so the caller's dict stays intact.
    original_info = {k: v for k, v in original_info.items() if k != pubkey_field}
    assert original_info == updated_info, \
        f"Original info and updated info should be the same for {image_path}. " \
        f"Original info: {original_info}, updated info: {updated_info}"

    # Verify the descriptors of the original and updated images.
    assert len(original_descriptors) == len(updated_descriptors), \
        f"Number of descriptors should be the same for {image_path}. " \
        f"Original descriptors: {original_descriptors}, updated descriptors: {updated_descriptors}"
    original_prop_descriptors = sorted(find_all_values_by_key(original_descriptors, "Prop"))
    updated_prop_descriptors = sorted(find_all_values_by_key(updated_descriptors, "Prop"))
    assert original_prop_descriptors == updated_prop_descriptors, \
        f"Prop descriptors should be the same for {image_path}. " \
        f"Original prop descriptors: {original_prop_descriptors}, " \
        f"updated prop descriptors: {updated_prop_descriptors}"

    # Remove digest from hash descriptors before comparing, since some digests should change.
    original_hash_descriptors = extract_hash_descriptors(original_descriptors, drop_digest)
    updated_hash_descriptors = extract_hash_descriptors(updated_descriptors, drop_digest)
    assert original_hash_descriptors == updated_hash_descriptors, \
        f"Hash descriptors' parameters should be the same for {image_path}. " \
        f"Original hash descriptors: {original_hash_descriptors}, " \
        f"updated hash descriptors: {updated_hash_descriptors}"


def drop_digest(descriptor):
    """Returns a copy of the descriptor dict without its "Digest" entry."""
    return {k: v for k, v in descriptor.items() if k != "Digest"}


def AddHashFooter(args, key, image_path, additional_images=()):
    """Re-signs image_path in place with `avbtool add_hash_footer`.

    Algorithm, partition name, salt, partition size, rollback index and properties are
    taken from the existing footer of the image, so that only the key changes.

    Args:
        args: program arguments; `key_overrides` and `signing_args` are read.
        key: path of the signing key, unless overridden for this file name.
        image_path: image to re-sign.
        additional_images: images whose descriptors are included in the new footer.
            Their hash descriptors (ignoring digests) must already be present in
            image_path.

    Raises:
        AssertionError: if the image has no AVB info, if the descriptors do not have
            the expected shape, or if the re-signed image fails verification.
    """
    if os.path.basename(image_path) in args.key_overrides:
        key = args.key_overrides[os.path.basename(image_path)]
    info, descriptors = AvbInfo(args, image_path)
    assert info is not None, f'no avbinfo: {image_path}'

    # Extract hash descriptor of original image.
    hash_descriptors_original = extract_hash_descriptors(descriptors, drop_digest)
    for additional_image in additional_images:
        _, additional_desc = AvbInfo(args, additional_image)
        hash_descriptors = extract_hash_descriptors(additional_desc, drop_digest)
        for k, v in hash_descriptors.items():
            assert v == hash_descriptors_original[k], \
                f"Hash descriptor of {k} in {additional_image} and {image_path} should be " \
                f"the same. {additional_image}: {v}, {image_path}: {hash_descriptors_original[k]}"
            # What remains after this loop is the descriptor of the image itself.
            del hash_descriptors_original[k]
    assert len(hash_descriptors_original) == 1, \
        f"Only one hash descriptor is expected for {image_path} after removing " \
        f"additional images. Hash descriptors: {hash_descriptors_original}"
    [(original_image_partition_name, original_image_descriptor)] = hash_descriptors_original.items()
    assert info["Original image size"] == original_image_descriptor["Image Size"], \
        f"Original image size should be the same as the image size in the hash descriptor " \
        f"for {image_path}. Original image size: {info['Original image size']}, " \
        f"image size in the hash descriptor: {original_image_descriptor['Image Size']}"

    partition_size = str(ReadBytesSize(info['Image size']))
    algorithm = info['Algorithm']
    original_image_salt = original_image_descriptor['Salt']

    cmd = ['avbtool', 'add_hash_footer',
           '--key', key,
           '--algorithm', algorithm,
           '--partition_name', original_image_partition_name,
           '--salt', original_image_salt,
           '--partition_size', partition_size,
           '--image', image_path]
    AppendPropArgument(cmd, descriptors)
    if args.signing_args:
        cmd.extend(shlex.split(args.signing_args))
    for additional_image in additional_images:
        cmd.extend(['--include_descriptors_from_image', additional_image])
    cmd.extend(['--rollback_index', info['Rollback Index']])

    RunCommand(args, cmd)
    check_resigned_image_avb_info(image_path, info, descriptors, args)


def AddHashTreeFooter(args, key, image_path):
    """Re-signs image_path in place with `avbtool add_hashtree_footer`.

    Nothing is done when the image carries no AVB info. Otherwise algorithm,
    partition name and size, hash algorithm, salt and properties are taken from the
    existing footer.

    Args:
        args: program arguments; `key_overrides` and `signing_args` are read.
        key: path of the signing key, unless overridden for this file name.
        image_path: image to re-sign.
    """
    if os.path.basename(image_path) in args.key_overrides:
        key = args.key_overrides[os.path.basename(image_path)]
    info, descriptors = AvbInfo(args, image_path)
    if not info:
        return

    descriptor = find_all_values_by_key(descriptors, 'Hashtree descriptor')[0]
    image_size = ReadBytesSize(info['Image size'])
    algorithm = info['Algorithm']
    partition_name = descriptor['Partition Name']
    hash_algorithm = descriptor['Hash Algorithm']
    salt = descriptor['Salt']
    partition_size = str(image_size)
    cmd = ['avbtool', 'add_hashtree_footer',
           '--key', key,
           '--algorithm', algorithm,
           '--partition_name', partition_name,
           '--partition_size', partition_size,
           '--do_not_generate_fec',
           '--hash_algorithm', hash_algorithm,
           '--salt', salt,
           '--image', image_path]
    AppendPropArgument(cmd, descriptors)
    if args.signing_args:
        cmd.extend(shlex.split(args.signing_args))
    RunCommand(args, cmd)
    check_resigned_image_avb_info(image_path, info, descriptors, args)


def UpdateVbmetaBootconfig(args, initrds, vbmeta_img):
    """Rewrites the vbmeta digest and size stored in the bootconfig of each initrd.

    For every initrd the bootconfig is detached with `initrd_bootconfig`, its
    `androidboot.vbmeta.digest` and `androidboot.vbmeta.size` entries are replaced
    with values computed from vbmeta_img, and the result is attached again, writing
    the initrd in place.

    Args:
        args: program arguments; `do_not_validate_avb_version` is read.
        initrds: paths of the initrd images to update.
        vbmeta_img: path of the vbmeta image the new values are computed from.

    Raises:
        ValueError: if an expected bootconfig entry is missing.
        Exception: if the avbtool version differs from the one recorded in the
            bootconfig (unless validation is disabled).
    """
    # Update the bootconfigs in ramdisk
    def detach_bootconfigs(initrd_bc, initrd, bc):
        cmd = ['initrd_bootconfig', 'detach', initrd_bc, initrd, bc]
        RunCommand(args, cmd)

    def attach_bootconfigs(initrd_bc, initrd, bc):
        cmd = ['initrd_bootconfig', 'attach',
               initrd, bc, '--output', initrd_bc]
        RunCommand(args, cmd)

    # Validate that avb version used while signing the apex is the same as used by build server
    def validate_avb_version(bootconfigs):
        cmd = ['avbtool', 'version']
        stdout, _ = RunCommand(args, cmd)
        avb_version_curr = stdout.split(" ")[1].strip()
        # Drop the last dot-separated component before comparing.
        avb_version_curr = avb_version_curr[0:avb_version_curr.rfind('.')]

        result = re.search(
            r"androidboot.vbmeta.avb_version = \"([^\"]*)\"", bootconfigs)
        if not result:
            raise ValueError("Failed to find androidboot.vbmeta.avb_version")
        avb_version_bc = result.group(1)
        if avb_version_curr != avb_version_bc:
            raise builtins.Exception(
                'AVB version mismatch between current & one & '
                f'used to build bootconfigs:{avb_version_curr}&{avb_version_bc}')

    def calc_vbmeta_digest():
        cmd = ['avbtool', 'calculate_vbmeta_digest', '--image',
               vbmeta_img, '--hash_algorithm', 'sha256']
        stdout, _ = RunCommand(args, cmd)
        return stdout.strip()

    def calc_vbmeta_size():
        # The size is the sum of the three block sizes reported by `avbtool info_image`.
        cmd = ['avbtool', 'info_image', '--image', vbmeta_img]
        stdout, _ = RunCommand(args, cmd)
        size = 0
        for line in stdout.split("\n"):
            line = line.split(":")
            if line[0] in ['Header Block', 'Authentication Block', 'Auxiliary Block']:
                size += ReadBytesSize(line[1].strip())
        return size

    def update_vbmeta_digest(bootconfigs):
        # Update androidboot.vbmeta.digest in bootconfigs
        result = re.search(
            r"androidboot.vbmeta.digest = \"[^\"]*\"", bootconfigs)
        if not result:
            raise ValueError("Failed to find androidboot.vbmeta.digest")

        return bootconfigs.replace(result.group(),
                                   f'androidboot.vbmeta.digest = "{calc_vbmeta_digest()}"')

    def update_vbmeta_size(bootconfigs):
        # Update androidboot.vbmeta.size in bootconfigs
        result = re.search(r"androidboot.vbmeta.size = [0-9]+", bootconfigs)
        if not result:
            raise ValueError("Failed to find androidboot.vbmeta.size")
        return bootconfigs.replace(result.group(),
                                   f'androidboot.vbmeta.size = {calc_vbmeta_size()}')

    with tempfile.TemporaryDirectory() as work_dir:
        tmp_initrd = os.path.join(work_dir, 'initrd')
        tmp_bc = os.path.join(work_dir, 'bc')

        for initrd in initrds:
            detach_bootconfigs(initrd, tmp_initrd, tmp_bc)
            with open(tmp_bc, "rt", encoding="utf-8") as bc_file:
                bc_data = bc_file.read()
            if not args.do_not_validate_avb_version:
                validate_avb_version(bc_data)
            bc_data = update_vbmeta_digest(bc_data)
            bc_data = update_vbmeta_size(bc_data)
            # Close the file before the external tool reads it back.
            with open(tmp_bc, "wt", encoding="utf-8") as bc_file:
                bc_file.write(bc_data)
            attach_bootconfigs(initrd, tmp_initrd, tmp_bc)


def MakeVbmetaImage(args, key, vbmeta_img, images=None, chained_partitions=None):
    """Regenerates vbmeta_img in place with `avbtool make_vbmeta_image`.

    Nothing is done when vbmeta_img carries no AVB info. Otherwise algorithm and
    rollback settings are taken from the existing image, and the file is padded to
    65536 bytes afterwards.

    Args:
        args: program arguments; `key_overrides` and `signing_args` are read.
        key: path of the signing key, unless overridden for this file name.
        vbmeta_img: vbmeta image to regenerate.
        images: optional images whose descriptors are included.
        chained_partitions: optional dict mapping a partition name to the key used
            for that chained partition. Required for every 'Chain Partition
            descriptor' found in the existing image.

    Raises:
        ValueError: if the image has a chained partition with no entry in
            chained_partitions.
    """
    if os.path.basename(vbmeta_img) in args.key_overrides:
        key = args.key_overrides[os.path.basename(vbmeta_img)]
    info, descriptors = AvbInfo(args, vbmeta_img)
    if info is None:
        return

    chained_partitions = chained_partitions or {}

    with tempfile.TemporaryDirectory() as work_dir:
        algorithm = info['Algorithm']
        rollback_index = info['Rollback Index']
        rollback_index_location = info['Rollback Index Location']

        cmd = ['avbtool', 'make_vbmeta_image',
               '--key', key,
               '--algorithm', algorithm,
               '--rollback_index', rollback_index,
               '--rollback_index_location', rollback_index_location,
               '--output', vbmeta_img]
        if images:
            for img in images:
                cmd.extend(['--include_descriptors_from_image', img])

        # replace pubkeys of chained_partitions as well
        for name, descriptor in descriptors:
            if name == 'Chain Partition descriptor':
                part_name = descriptor['Partition Name']
                ril = descriptor['Rollback Index Location']
                if part_name not in chained_partitions:
                    raise ValueError(
                        f'No key given for chained partition {part_name} of {vbmeta_img}')
                part_key = chained_partitions[part_name]
                avbpubkey = os.path.join(work_dir, part_name + '.avbpubkey')
                ExtractAvbPubkey(args, part_key, avbpubkey)
                cmd.extend(['--chain_partition', f'{part_name}:{ril}:{avbpubkey}'])

        if args.signing_args:
            cmd.extend(shlex.split(args.signing_args))

        RunCommand(args, cmd)
        check_resigned_image_avb_info(vbmeta_img, info, descriptors, args)
        # libavb expects to be able to read the maximum vbmeta size, so we must provide a partition
        # which matches this or the read will fail.
        # The image is binary data, so it is opened in binary mode for resizing.
        with open(vbmeta_img, 'ab') as f:
            f.truncate(65536)


def UnpackSuperImg(args, super_img, work_dir):
    """Converts super_img with simg2img and unpacks its partitions into work_dir."""
    tmp_super_img = os.path.join(work_dir, 'super.img')
    RunCommand(args, ['simg2img', super_img, tmp_super_img])
    RunCommand(args, ['lpunpack', tmp_super_img, work_dir])


def MakeSuperImage(args, partitions, output):
    """Builds a sparse super image at `output` with lpmake.

    Args:
        args: program arguments.
        partitions: dict mapping a partition name to the path of its image.
        output: path of the super image to write.
    """
    with tempfile.TemporaryDirectory() as work_dir:
        cmd = ['lpmake', '--device-size=auto', '--metadata-slots=2',  # A/B
               '--metadata-size=65536', '--sparse', '--output=' + output]

        for part, img in partitions.items():
            tmp_img = os.path.join(work_dir, part)
            RunCommand(args, ['img2simg', img, tmp_img])

            # NOTE(review): the lpmake arguments below reference the original img, not
            # the tmp_img produced by img2simg above - confirm whether this is intended.
            image_arg = f'--image={part}={img}'
            partition_arg = f'--partition={part}:readonly:{os.path.getsize(img)}:default'
            cmd.extend([image_arg, partition_arg])

        RunCommand(args, cmd)


def GenVbmetaImage(args, image, output, partition_name, salt):
    """Writes a standalone vbmeta image holding the hash descriptor of `image`.

    Runs `avbtool add_hash_footer` with --do_not_append_vbmeta_image and
    --output_vbmeta_image.

    Args:
        args: program arguments.
        image: image to hash.
        output: path of the vbmeta image to write.
        partition_name: partition name recorded in the descriptor.
        salt: salt passed to avbtool.
    """
    cmd = ['avbtool', 'add_hash_footer', '--dynamic_partition_size',
           '--do_not_append_vbmeta_image',
           '--partition_name', partition_name,
           '--salt', salt,
           '--image', image,
           '--output_vbmeta_image', output]
    RunCommand(args, cmd)


gki_versions = ['android14-6.1-pkvm_experimental']

# dict of (key, file) for re-sign/verification. keys are un-versioned for readability.
virt_apex_non_gki_files = {
    'kernel': 'etc/fs/microdroid_kernel',
    'vbmeta.img': 'etc/fs/microdroid_vbmeta.img',
    'super.img': 'etc/fs/microdroid_super.img',
    'initrd_normal.img': 'etc/microdroid_initrd_normal.img',
    'initrd_debuggable.img': 'etc/microdroid_initrd_debuggable.img',
    'rialto': 'etc/rialto.bin',
}


def TargetFiles(input_dir):
    """Returns a dict mapping a short file key to its path under input_dir.

    The non-GKI files are always listed. The kernel and initrd entries of a GKI
    version are added only when that version's kernel file exists.
    """
    ret = {k: os.path.join(input_dir, v) for k, v in virt_apex_non_gki_files.items()}

    for ver in gki_versions:
        kernel = os.path.join(input_dir, f'etc/fs/microdroid_gki-{ver}_kernel')
        initrd_normal = os.path.join(input_dir, f'etc/microdroid_gki-{ver}_initrd_normal.img')
        initrd_debug = os.path.join(input_dir, f'etc/microdroid_gki-{ver}_initrd_debuggable.img')

        if os.path.isfile(kernel):
            ret[f'gki-{ver}_kernel'] = kernel
            ret[f'gki-{ver}_initrd_normal.img'] = initrd_normal
            ret[f'gki-{ver}_initrd_debuggable.img'] = initrd_debug

    return ret


def IsInitrdImage(path):
    """Returns True if path ends with one of the two initrd image suffixes."""
    return path.endswith(('initrd_normal.img', 'initrd_debuggable.img'))


def SignVirtApex(args):
    """Schedules re-signing of every signed file under args.input_dir.

    The work is submitted as tasks via Async(); the caller is expected to await the
    module-level `tasks` list afterwards.

    Args:
        args: program arguments; `key`, `input_dir` and `do_not_update_bootconfigs`
            are read here.
    """
    key = args.key
    input_dir = args.input_dir
    files = TargetFiles(input_dir)

    # unpacked files (will be unpacked from super.img below)
    system_a_img = os.path.join(unpack_dir.name, 'system_a.img')
    vendor_a_img = os.path.join(unpack_dir.name, 'vendor_a.img')

    # re-sign super.img
    # 1. unpack super.img
    # 2. resign system and vendor (if exists)
    # 3. repack super.img out of resigned system and vendor (if exists)
    UnpackSuperImg(args, files['super.img'], unpack_dir.name)
    system_a_f = Async(AddHashTreeFooter, args, key, system_a_img)
    partitions = {"system_a": system_a_img}
    images = [system_a_img]
    images_f = [system_a_f]

    # if vendor_a.img exists, resign it
    if os.path.exists(vendor_a_img):
        partitions.update({'vendor_a': vendor_a_img})
        images.append(vendor_a_img)
        vendor_a_f = Async(AddHashTreeFooter, args, key, vendor_a_img)
        images_f.append(vendor_a_f)

    Async(MakeSuperImage, args, partitions,
          files['super.img'], wait=images_f)

    # re-generate vbmeta from re-signed system_a.img
    vbmeta_f = Async(MakeVbmetaImage, args, key, files['vbmeta.img'],
                     images=images,
                     wait=images_f)

    vbmeta_bc_f = None
    if not args.do_not_update_bootconfigs:
        initrd_files = [v for k, v in files.items() if IsInitrdImage(k)]
        vbmeta_bc_f = Async(UpdateVbmetaBootconfig, args, initrd_files,
                            files['vbmeta.img'],
                            wait=[vbmeta_f])

    def new_hashdesc_file():
        # Create the scratch file inside unpack_dir so it is removed together with
        # that directory, and close the descriptor right away; only the path is used.
        fd, path = tempfile.mkstemp(dir=unpack_dir.name)
        os.close(fd)
        return path

    # Re-sign kernel. Note kernel's vbmeta contain addition descriptor from ramdisk(s)
    def resign_decompressed_kernel(kernel_file, initrd_normal_file, initrd_debug_file):
        _, kernel_image_descriptors = AvbInfo(args, kernel_file)
        salts = extract_hash_descriptors(
            kernel_image_descriptors, lambda descriptor: descriptor['Salt'])
        initrd_normal_hashdesc = new_hashdesc_file()
        initrd_debug_hashdesc = new_hashdesc_file()
        # The initrd hash descriptors must be generated after the bootconfig update.
        bootconfig_wait = [vbmeta_bc_f] if vbmeta_bc_f is not None else []
        initrd_n_f = Async(GenVbmetaImage, args, initrd_normal_file,
                           initrd_normal_hashdesc, "initrd_normal", salts["initrd_normal"],
                           wait=bootconfig_wait)
        initrd_d_f = Async(GenVbmetaImage, args, initrd_debug_file,
                           initrd_debug_hashdesc, "initrd_debug", salts["initrd_debug"],
                           wait=bootconfig_wait)
        return Async(AddHashFooter, args, key, kernel_file,
                     additional_images=[initrd_normal_hashdesc, initrd_debug_hashdesc],
                     wait=[initrd_n_f, initrd_d_f])

    def resign_compressed_kernel(kernel_file, initrd_normal_file, initrd_debug_file):
        # decompress, re-sign, compress again
        with tempfile.TemporaryDirectory() as work_dir:
            decompressed_kernel_file = os.path.join(work_dir, os.path.basename(kernel_file))
            RunCommand(args, ['lz4', '-d', kernel_file, decompressed_kernel_file])
            # Block until re-signing is done; work_dir must outlive the nested tasks.
            resign_decompressed_kernel(decompressed_kernel_file, initrd_normal_file,
                                       initrd_debug_file).result()
            RunCommand(args, ['lz4', '-9', '-f', decompressed_kernel_file, kernel_file])

    def resign_kernel(kernel, initrd_normal, initrd_debug):
        kernel_file = files[kernel]
        initrd_normal_file = files[initrd_normal]
        initrd_debug_file = files[initrd_debug]

        # kernel may be compressed with lz4.
        if is_lz4(args, kernel_file):
            return Async(resign_compressed_kernel, kernel_file, initrd_normal_file,
                         initrd_debug_file)
        return resign_decompressed_kernel(kernel_file, initrd_normal_file, initrd_debug_file)

    # The original descriptors are read before the matching re-sign task is created.
    _, original_kernel_descriptors = AvbInfo(args, files['kernel'])
    resign_kernel_tasks = [resign_kernel('kernel', 'initrd_normal.img', 'initrd_debuggable.img')]
    original_kernels = {"kernel": original_kernel_descriptors}

    for ver in gki_versions:
        kernel_name = f'gki-{ver}_kernel'
        if kernel_name not in files:
            continue
        _, original_kernel_descriptors = AvbInfo(args, files[kernel_name])
        task = resign_kernel(
            kernel_name,
            f'gki-{ver}_initrd_normal.img',
            f'gki-{ver}_initrd_debuggable.img')
        resign_kernel_tasks.append(task)
        original_kernels[kernel_name] = original_kernel_descriptors

    # Re-sign rialto if it exists. Rialto only exists in arm64 environment.
    if os.path.exists(files['rialto']):
        update_initrd_digests_task = Async(
            update_initrd_digests_of_kernels_in_rialto, original_kernels, args, files,
            wait=resign_kernel_tasks)
        Async(resign_rialto, args, key, files['rialto'], wait=[update_initrd_digests_task])


def resign_rialto(args, key, rialto_path):
    """Re-signs rialto with AddHashFooter and sanity-checks the new AVB footer.

    Raises:
        AssertionError: if the new footer does not have exactly two descriptors with
            one Prop and one hash descriptor, if the rollback index is "0", or if the
            digest did not change although bootconfigs were updated.
    """
    _, original_descriptors = AvbInfo(args, rialto_path)
    AddHashFooter(args, key, rialto_path)

    # Verify the new AVB footer.
    updated_info, updated_descriptors = AvbInfo(args, rialto_path)
    assert len(updated_descriptors) == 2, \
        f"There should be two descriptors for rialto. Updated descriptors: {updated_descriptors}"
    updated_prop = find_all_values_by_key(updated_descriptors, "Prop")
    assert len(updated_prop) == 1, "There should be only one Prop descriptor for rialto. " \
        f"Updated descriptors: {updated_descriptors}"
    assert updated_info["Rollback Index"] != "0", "Rollback index should not be zero for rialto."

    # Verify the only hash descriptor of rialto.
    updated_hash_descriptors = extract_hash_descriptors(updated_descriptors)
    assert len(updated_hash_descriptors) == 1, \
        f"There should be only one hash descriptor for rialto. " \
        f"Updated hash descriptors: {updated_hash_descriptors}"
    # Since salt is not updated, the change of digest reflects the change of content of rialto
    # kernel.
    if not args.do_not_update_bootconfigs:
        [(_, original_descriptor)] = extract_hash_descriptors(original_descriptors).items()
        [(_, updated_descriptor)] = updated_hash_descriptors.items()
        assert_different_value(original_descriptor, updated_descriptor, "Digest",
                               "rialto_hash_descriptor")


def assert_different_value(original, updated, key, context):
    """Asserts that original[key] and updated[key] differ.

    Args:
        original: mapping holding the original value.
        updated: mapping holding the updated value.
        key: key to compare in both mappings.
        context: label used in the failure message.

    Raises:
        AssertionError: if both values are equal.
        KeyError: if key is missing from either mapping.
    """
    assert original[key] != updated[key], \
        f"Value of '{key}' should change for '{context}'. " \
        f"Original value: {original[key]}, updated value: {updated[key]}"


def update_initrd_digests_of_kernels_in_rialto(original_kernels, args, files):
    """Replaces the initrd digests embedded in rialto for every re-signed kernel.

    Does nothing when bootconfigs are not updated. Otherwise the rialto file is read
    once, patched for each kernel and written back.

    Args:
        original_kernels: dict mapping a kernel key to the descriptors the kernel had
            before re-signing.
        args: program arguments.
        files: dict as returned by TargetFiles().
    """
    # Update the hashes of initrd_normal and initrd_debug in rialto if the
    # bootconfigs in them are updated.
    if args.do_not_update_bootconfigs:
        return

    with open(files['rialto'], "rb") as file:
        content = file.read()

    for kernel_name, descriptors in original_kernels.items():
        content = update_initrd_digests_in_rialto(
            descriptors, args, files, kernel_name, content)

    with open(files['rialto'], "wb") as file:
        file.write(content)


def update_initrd_digests_in_rialto(
        original_descriptors, args, files, kernel_name, content):
    """Returns content with the initrd digests of one kernel replaced.

    Args:
        original_descriptors: descriptors of the kernel before re-signing.
        args: program arguments.
        files: dict as returned by TargetFiles().
        kernel_name: key of the kernel in files.
        content: bytes of the rialto image.

    Returns:
        The patched bytes, of the same length as content.

    Raises:
        AssertionError: if the boot digest changed, if the initrd digests did not
            change, or if an original digest is not found in content.
    """
    _, updated_descriptors = AvbInfo(args, files[kernel_name])

    original_digests = extract_hash_descriptors(
        original_descriptors, lambda x: binascii.unhexlify(x['Digest']))
    updated_digests = extract_hash_descriptors(
        updated_descriptors, lambda x: binascii.unhexlify(x['Digest']))
    assert original_digests.pop("boot") == updated_digests.pop("boot"), \
        "Hash descriptor of boot should not change for " + kernel_name + \
        f"\nOriginal descriptors: {original_descriptors}, " \
        f"\nUpdated descriptors: {updated_descriptors}"

    # Check that the original and updated digests are different before updating rialto.
    partition_names = {'initrd_normal', 'initrd_debug'}
    assert set(original_digests.keys()) == set(updated_digests.keys()) == partition_names, \
        f"Original digests' partitions should be {partition_names}. " \
        f"Original digests: {original_digests}. Updated digests: {updated_digests}"
    assert set(original_digests.values()).isdisjoint(updated_digests.values()), \
        "Digests of initrd_normal and initrd_debug should change. " \
        f"Original descriptors: {original_descriptors}, " \
        f"updated descriptors: {updated_descriptors}"

    for partition_name, original_digest in original_digests.items():
        updated_digest = updated_digests[partition_name]
        assert len(original_digest) == len(updated_digest), \
            f"Length of original_digest and updated_digest must be the same for {partition_name}." \
            f" Original digest: {original_digest}, updated digest: {updated_digest}"

        new_content = content.replace(original_digest, updated_digest)
        assert len(new_content) == len(content), \
            "Length of new_content and content must be the same."
        assert new_content != content, \
            f"original digest of the partition {partition_name} not found."
        content = new_content

    return content


def extract_hash_descriptors(descriptors, f=lambda x: x):
    """Returns a dict of partition name -> f(descriptor) for all hash descriptors."""
    return {desc["Partition Name"]: f(desc) for desc in
            find_all_values_by_key(descriptors, "Hash descriptor")}


def VerifyVirtApex(args):
    """Schedules checks that the files under args.input_dir are signed with args.key.

    The sha1 of the public key extracted from args.key is compared with the
    'Public key (sha1)' field of each file. The checks run as tasks via Async().
    """
    key = args.key
    input_dir = args.input_dir
    files = TargetFiles(input_dir)

    # unpacked files
    UnpackSuperImg(args, files['super.img'], unpack_dir.name)
    system_a_img = os.path.join(unpack_dir.name, 'system_a.img')

    # Read pubkey digest from the input key
    with tempfile.NamedTemporaryFile() as pubkey_file:
        ExtractAvbPubkey(args, key, pubkey_file.name)
        with open(pubkey_file.name, 'rb') as f:
            pubkey = f.read()
            pubkey_digest = hashlib.sha1(pubkey).hexdigest()

    def check_avb_pubkey(file):
        info, _ = AvbInfo(args, file)
        assert info is not None, f'no avbinfo: {file}'
        assert info['Public key (sha1)'] == pubkey_digest, f'pubkey mismatch: {file}'

    for name, path in files.items():
        if IsInitrdImage(name):
            # TODO(b/245277660): Verify that ramdisks contain the correct vbmeta digest
            continue
        if name == 'rialto' and not os.path.exists(path):
            # Rialto only exists in arm64 environment.
            continue
        if name == 'super.img':
            # super.img itself is not checked; the unpacked system partition is.
            Async(check_avb_pubkey, system_a_img)
        else:
            # Check pubkey for other files using avbtool
            Async(check_avb_pubkey, path)


def main(argv):
    """Entry point: signs or verifies the Virt APEX and waits for all tasks.

    Any failure is reported with a traceback and exit code 1. SystemExit raised by
    argument parsing (e.g. for --help) is left to propagate with its own exit code.
    """
    try:
        args = ParseArgs(argv)
        if args.verify:
            VerifyVirtApex(args)
        else:
            SignVirtApex(args)
        # ensure all tasks are completed without exceptions
        AwaitAll(tasks)
    except Exception:  # pylint: disable=broad-except
        traceback.print_exc()
        sys.exit(1)


if __name__ == '__main__':
    main(sys.argv[1:])