1#!/usr/bin/env python
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""VTS tests to verify boot/recovery image header versions."""
18
19import logging
20import os
21import shutil
22from struct import unpack
23import tempfile
24import zlib
25
26from vts.runners.host import asserts
27from vts.runners.host import base_test
28from vts.runners.host import test_runner
29from vts.utils.python.android import api
30from vts.utils.python.file import target_file_utils
31
# Module-level settings shared by the test class below.
# block_dev_path is the device directory searched for the boot/recovery
# partition links. It is not a constant: setUp() reassigns it to "/dev/block"
# when the device ABI contains "x86".
block_dev_path = "/dev/block/platform"  # path to platform block devices
# Device property read in setUpClass(); a missing value is treated as "".
PROPERTY_SLOT_SUFFIX = "ro.boot.slot_suffix"  # indicates current slot suffix for A/B devices
# CheckImageHeader() seeks to this byte offset and reads a 4-byte unsigned size.
BOOT_HEADER_DTBO_SIZE_OFFSET = 1632  # offset of recovery dtbo size in boot header of version 1.
35
36
class VtsFirmwareBootHeaderVerificationTest(base_test.BaseTestClass):
    """Verifies boot/recovery image header.

    The boot (and, on non-A/B devices, recovery) partition is pulled to a
    temporary directory on the host and its header fields are checked.

    Attributes:
        dut: The device under test (first entry of android_devices).
        shell: Shell handle of the device under test.
        adb: ADB handle of the device under test.
        temp_dir: The temporary directory on host.
        launch_api_level: Value returned by dut.getLaunchApiLevel().
        slot_suffix: The current slot suffix for A/B devices ("" if unset).
    """

    def setUpClass(self):
        """Initializes the DUT and creates temporary directories."""
        self.dut = self.android_devices[0]
        self.shell = self.dut.shell
        self.adb = self.dut.adb
        self.temp_dir = tempfile.mkdtemp()
        self.launch_api_level = self.dut.getLaunchApiLevel()
        logging.info("Create %s", self.temp_dir)
        self.slot_suffix = self.dut.getProp(PROPERTY_SLOT_SUFFIX)
        if self.slot_suffix is None:
            # Normalize a missing property so later string concatenation and
            # truthiness checks behave the same as for an empty suffix.
            self.slot_suffix = ""
        logging.info("current slot suffix: %s", self.slot_suffix)

    def setUp(self):
        """Checks if the preconditions to run the test are met.

        On devices whose ABI contains "x86" the module-level block device
        search path is switched to "/dev/block", and the test is skipped when
        the kernel command line carries no androidboot.acpio_idx value.
        """
        global block_dev_path
        if "x86" not in self.dut.cpu_abi:
            return
        block_dev_path = "/dev/block"
        acpio_idx_string = self.adb.shell(
            "cat /proc/cmdline | "
            "grep -o \"androidboot.acpio_idx=[^ ]*\" |"
            "cut -d \"=\" -f 2 ").replace('\n', '')
        asserts.skipIf(not acpio_idx_string,
                       "Skipping test for x86 NON-ACPI ABI")

    def get_number_of_pages(self, image_size, page_size):
        """Calculates the number of pages required for the image.

        Args:
            image_size: size of the image.
            page_size : size of page; must be non-zero.

        Returns:
            Number of pages required for the image (integer, rounded up).
        """
        # Floor division keeps the result an int on Python 3 as well, so it
        # can be used to compute a file offset for seek().
        return (image_size + page_size - 1) // page_size

    def checkValidRamdisk(self, ramdisk_image):
        """Verifies that the ramdisk extracted from boot.img is a valid gzipped cpio archive.

        Fails the test if the buffer cannot be decompressed as gzip or if the
        decompressed data does not start with a cpio header magic.

        Args:
            ramdisk_image: ramdisk extracted from boot.img.
        """
        # Set wbits parameter to zlib.MAX_WBITS|16 to expect a gzip header and
        # trailer.
        try:
            unzipped_ramdisk = zlib.decompress(ramdisk_image,
                                               zlib.MAX_WBITS | 16)
        except zlib.error as e:
            # Report corrupt/non-gzip data as a test failure, not a crash.
            logging.exception(e)
            asserts.fail("ramdisk is not a valid gzip stream")
        # The CPIO header magic can be "070701" or "070702" as per kernel
        # documentation: Documentation/early-userspace/buffer-format.txt
        # zlib.decompress returns bytes, so compare against bytes literals
        # (these are plain str on Python 2).
        cpio_header_magic = unzipped_ramdisk[0:6]
        asserts.assertTrue(cpio_header_magic in (b"070701", b"070702"),
                           "cpio archive header magic not found in ramdisk")

    def CheckImageHeader(self, boot_image, is_recovery=False):
        """Verifies the boot image format.

        Checks, in order: non-zero kernel size; minimum header version for the
        device launch API level (plus a valid gzipped cpio ramdisk for devices
        launched after P); non-zero recovery DTBO size for recovery images;
        non-zero DTB size for header versions above 1; and that the header
        size stored in the image equals the offset reached after parsing.

        Args:
            boot_image: Path to the boot image.
            is_recovery: Indicates that the image is recovery if true.
        """
        try:
            with open(boot_image, "rb") as image_file:
                image_file.read(8)  # read boot magic
                # Nine consecutive 32-bit fields follow the magic; only the
                # sizes, page size and header version are needed here. "<"
                # pins little-endian, unpadded decoding regardless of host.
                (kernel_size, _, ramdisk_size, _, _, _, _, page_size,
                 host_image_header_version) = unpack(
                     "<9I", image_file.read(9 * 4))

                asserts.assertNotEqual(kernel_size, 0, "boot.img/recovery.img must contain kernel")

                if self.launch_api_level > api.PLATFORM_API_LEVEL_P:
                    asserts.assertTrue(
                        host_image_header_version >= 2,
                        "Device must atleast have a boot image of version 2")

                    asserts.assertNotEqual(ramdisk_size, 0, "boot.img must contain ramdisk")

                    # A zero page size would make the page arithmetic below
                    # divide by zero; report it as a test failure instead.
                    asserts.assertNotEqual(
                        page_size, 0, "boot.img page size must not be zero")

                    # ramdisk comes after the header and kernel pages
                    num_kernel_pages = self.get_number_of_pages(kernel_size, page_size)
                    ramdisk_offset = page_size * (1 + num_kernel_pages)
                    image_file.seek(ramdisk_offset)
                    ramdisk_buf = image_file.read(ramdisk_size)
                    self.checkValidRamdisk(ramdisk_buf)
                else:
                    asserts.assertTrue(
                        host_image_header_version >= 1,
                        "Device must atleast have a boot image of version 1")
                image_file.seek(BOOT_HEADER_DTBO_SIZE_OFFSET)
                recovery_dtbo_size = unpack("<I", image_file.read(4))[0]
                image_file.read(8)  # ignore recovery dtbo load address
                if is_recovery:
                    asserts.assertNotEqual(
                        recovery_dtbo_size, 0,
                        "recovery partition for non-A/B devices must contain the recovery DTBO"
                    )
                boot_header_size = unpack("<I", image_file.read(4))[0]
                if host_image_header_version > 1:
                    dtb_size = unpack("<I", image_file.read(4))[0]
                    asserts.assertNotEqual(dtb_size, 0, "Boot/recovery image must contain DTB")
                    image_file.read(8)  # ignore DTB physical load address
                # Every header field has been consumed at this point, so the
                # current file position is the header size the image should
                # declare.
                expected_header_size = image_file.tell()
                asserts.assertEqual(
                    boot_header_size, expected_header_size,
                    "Test failure due to boot header size mismatch. Expected %s Actual %s"
                    % (expected_header_size, boot_header_size))
        except IOError as e:
            logging.exception(e)
            asserts.fail("Unable to open boot image file")

    def testBootImageHeader(self):
        """Validates boot image header."""
        current_boot_partition = "boot" + str(self.slot_suffix)
        boot_path = target_file_utils.FindFiles(
            self.shell, block_dev_path, current_boot_partition, "-type l")
        logging.info("Boot path %s", boot_path)
        if not boot_path:
            asserts.fail("Unable to find path to boot image on device.")
        host_boot_path = os.path.join(self.temp_dir, "boot.img")
        self.adb.pull("%s %s" % (boot_path[0], host_boot_path))
        self.CheckImageHeader(host_boot_path)

    def testRecoveryImageHeader(self):
        """Validates recovery image header.

        Skipped when a slot suffix is set, i.e. on A/B devices.
        """
        asserts.skipIf(self.slot_suffix,
                       "A/B devices do not have a separate recovery partition")
        recovery_path = target_file_utils.FindFiles(self.shell, block_dev_path,
                                                    "recovery", "-type l")
        logging.info("recovery path %s", recovery_path)
        if not recovery_path:
            asserts.fail("Unable to find path to recovery image on device.")
        host_recovery_path = os.path.join(self.temp_dir, "recovery.img")
        self.adb.pull("%s %s" % (recovery_path[0], host_recovery_path))
        self.CheckImageHeader(host_recovery_path, True)

    def tearDownClass(self):
        """Deletes temporary directories."""
        # setUpClass may have failed before temp_dir was assigned; in that
        # case there is nothing to clean up.
        temp_dir = getattr(self, "temp_dir", None)
        if temp_dir:
            logging.info("Delete %s", temp_dir)
            shutil.rmtree(temp_dir)
178
179
# Script entry point: when this file is executed directly, delegate to the
# main() function of the imported vts test_runner module.
if __name__ == "__main__":
    test_runner.main()
182