#!/usr/bin/env python
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""VTS tests to verify boot/recovery image header versions."""

import logging
import os
import shutil
from struct import unpack
import tempfile
import zlib

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import test_runner
from vts.utils.python.android import api
from vts.utils.python.file import target_file_utils

block_dev_path = "/dev/block/platform"  # path to platform block devices
PROPERTY_SLOT_SUFFIX = "ro.boot.slot_suffix"  # indicates current slot suffix for A/B devices
BOOT_HEADER_DTBO_SIZE_OFFSET = 1632  # offset of recovery dtbo size in boot header of version 1.


class VtsFirmwareBootHeaderVerificationTest(base_test.BaseTestClass):
    """Verifies boot/recovery image header.

    Attributes:
        dut: The first Android device handed to the test.
        shell: Shell object of the device under test.
        adb: adb object of the device under test.
        temp_dir: The temporary directory on host.
        launch_api_level: The launch API level reported by the device.
        slot_suffix: The current slot suffix for A/B devices.
    """

    def setUpClass(self):
        """Initializes the DUT and creates temporary directories."""
        self.dut = self.android_devices[0]
        self.shell = self.dut.shell
        self.adb = self.dut.adb
        self.temp_dir = tempfile.mkdtemp()
        self.launch_api_level = self.dut.getLaunchApiLevel()
        logging.info("Create %s", self.temp_dir)
        self.slot_suffix = self.dut.getProp(PROPERTY_SLOT_SUFFIX)
        if self.slot_suffix is None:
            # Normalize a missing property so the suffix can always be
            # concatenated to a partition name and tested for truthiness.
            self.slot_suffix = ""
        logging.info("current slot suffix: %s", self.slot_suffix)

    def setUp(self):
        """Checks if the preconditions to run the test are met.

        On x86 ABIs the module-level block device search path is switched to
        "/dev/block" and the test is skipped when the kernel command line
        carries no "androidboot.acpio_idx" value.
        """
        if "x86" in self.dut.cpu_abi:
            # NOTE: this rebinds the module-level path, so the change stays in
            # effect for every later test in this module.
            global block_dev_path
            block_dev_path = "/dev/block"
            acpio_idx_string = self.adb.shell(
                "cat /proc/cmdline | "
                "grep -o \"androidboot.acpio_idx=[^ ]*\" |"
                "cut -d \"=\" -f 2 ").replace("\n", "")
            asserts.skipIf(not acpio_idx_string, "Skipping test for x86 NON-ACPI ABI")

    def get_number_of_pages(self, image_size, page_size):
        """Calculates the number of pages required for the image.

        Args:
            image_size: size of the image.
            page_size : size of page.

        Returns:
            Number of pages required for the image, as an integer (the size
            divided by the page size, rounded up).
        """
        # Floor division keeps the result an integer on both Python 2 and
        # Python 3; a float here would later make file.seek() fail.
        return (image_size + page_size - 1) // page_size

    def checkValidRamdisk(self, ramdisk_image):
        """Verifies that the ramdisk extracted from boot.img is a valid gzipped cpio archive.

        Args:
            ramdisk_image: ramdisk extracted from boot.img.

        Raises:
            zlib.error: if the buffer is not valid gzip data.
        """
        # Set wbits parameter to zlib.MAX_WBITS|16 to expect a gzip header and
        # trailer.
        unzipped_ramdisk = zlib.decompress(ramdisk_image, zlib.MAX_WBITS | 16)
        # The CPIO header magic can be "070701" or "070702" as per kernel
        # documentation: Documentation/early-userspace/buffer-format.txt
        cpio_header_magic = unzipped_ramdisk[0:6]
        # zlib.decompress() returns bytes, so compare against bytes literals;
        # a str literal never equals bytes on Python 3.
        asserts.assertTrue(cpio_header_magic in (b"070701", b"070702"),
                           "cpio archive header magic not found in ramdisk")

    def CheckImageHeader(self, boot_image, is_recovery=False):
        """Verifies the boot image format.

        Checks that the image contains a kernel, that the header version is
        high enough for the device's launch API level, that the recorded
        header size matches the number of header bytes actually parsed and,
        depending on version/launch level, that ramdisk, recovery DTBO and
        DTB are present.

        Args:
            boot_image: Path to the boot image.
            is_recovery: Indicates that the image is recovery if true.
        """
        try:
            with open(boot_image, "rb") as image_file:
                image_file.read(8)  # read boot magic
                (kernel_size, _, ramdisk_size, _, _, _, _, page_size,
                 host_image_header_version) = unpack("9I", image_file.read(9 * 4))

                asserts.assertNotEqual(kernel_size, 0, "boot.img/recovery.img must contain kernel")

                if self.launch_api_level > api.PLATFORM_API_LEVEL_P:
                    asserts.assertTrue(
                        host_image_header_version >= 2,
                        "Device must atleast have a boot image of version 2")

                    asserts.assertNotEqual(ramdisk_size, 0, "boot.img must contain ramdisk")

                    # ramdisk comes after the header and kernel pages
                    num_kernel_pages = self.get_number_of_pages(kernel_size, page_size)
                    ramdisk_offset = page_size * (1 + num_kernel_pages)
                    image_file.seek(ramdisk_offset)
                    ramdisk_buf = image_file.read(ramdisk_size)
                    self.checkValidRamdisk(ramdisk_buf)
                else:
                    asserts.assertTrue(
                        host_image_header_version >= 1,
                        "Device must atleast have a boot image of version 1")

                image_file.seek(BOOT_HEADER_DTBO_SIZE_OFFSET)
                recovery_dtbo_size = unpack("I", image_file.read(4))[0]
                image_file.read(8)  # ignore recovery dtbo load address
                if is_recovery:
                    asserts.assertNotEqual(
                        recovery_dtbo_size, 0,
                        "recovery partition for non-A/B devices must contain the recovery DTBO"
                    )
                boot_header_size = unpack("I", image_file.read(4))[0]
                if host_image_header_version > 1:
                    dtb_size = unpack("I", image_file.read(4))[0]
                    asserts.assertNotEqual(dtb_size, 0, "Boot/recovery image must contain DTB")
                    image_file.read(8)  # ignore DTB physical load address
                # Every header field has been consumed at this point, so the
                # file position is the header size this parser expects.
                expected_header_size = image_file.tell()
                asserts.assertEqual(
                    boot_header_size, expected_header_size,
                    "Test failure due to boot header size mismatch. Expected %s Actual %s"
                    % (expected_header_size, boot_header_size))
        except IOError as e:
            logging.exception(e)
            asserts.fail("Unable to open boot image file")

    def testBootImageHeader(self):
        """Validates boot image header."""
        current_boot_partition = "boot" + str(self.slot_suffix)
        boot_path = target_file_utils.FindFiles(
            self.shell, block_dev_path, current_boot_partition, "-type l")
        logging.info("Boot path %s", boot_path)
        if not boot_path:
            asserts.fail("Unable to find path to boot image on device.")
        host_boot_path = os.path.join(self.temp_dir, "boot.img")
        self.adb.pull("%s %s" % (boot_path[0], host_boot_path))
        self.CheckImageHeader(host_boot_path)

    def testRecoveryImageHeader(self):
        """Validates recovery image header."""
        asserts.skipIf(self.slot_suffix,
                       "A/B devices do not have a separate recovery partition")
        recovery_path = target_file_utils.FindFiles(self.shell, block_dev_path,
                                                    "recovery", "-type l")
        logging.info("recovery path %s", recovery_path)
        if not recovery_path:
            asserts.fail("Unable to find path to recovery image on device.")
        host_recovery_path = os.path.join(self.temp_dir, "recovery.img")
        self.adb.pull("%s %s" % (recovery_path[0], host_recovery_path))
        self.CheckImageHeader(host_recovery_path, True)

    def tearDownClass(self):
        """Deletes temporary directories."""
        shutil.rmtree(self.temp_dir)


if __name__ == "__main__":
    test_runner.main()