1#!/usr/bin/env python3
2#
3# Copyright 2021, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Repacks the boot image.
18
19Unpacks the boot image and the ramdisk inside, then add files into
20the ramdisk to repack the boot image.
21"""
22
23import argparse
24import datetime
25import enum
26import glob
27import os
28import shlex
29import shutil
30import subprocess
31import tempfile
32
33
34class TempFileManager:
35    """Manages temporary files and dirs."""
36
37    def __init__(self):
38        self._temp_files = []
39
40    def __del__(self):
41        """Removes temp dirs and files."""
42        for f in self._temp_files:
43            if os.path.isdir(f):
44                shutil.rmtree(f, ignore_errors=True)
45            else:
46                os.remove(f)
47
48    def make_temp_dir(self, prefix='tmp', suffix=''):
49        """Makes a temporary dir that will be cleaned up in the destructor.
50
51        Returns:
52            The absolute pathname of the new directory.
53        """
54        dir_name = tempfile.mkdtemp(prefix=prefix, suffix=suffix)
55        self._temp_files.append(dir_name)
56        return dir_name
57
58    def make_temp_file(self, prefix='tmp', suffix=''):
59        """Make a temp file that will be deleted in the destructor.
60
61        Returns:
62            The absolute pathname of the new file.
63        """
64        fd, file_name = tempfile.mkstemp(prefix=prefix, suffix=suffix)
65        os.close(fd)
66        self._temp_files.append(file_name)
67        return file_name
68
69
70class RamdiskFormat(enum.Enum):
71    """Enum class for different ramdisk compression formats."""
72    LZ4 = 1
73    GZIP = 2
74
75
76class BootImageType(enum.Enum):
77    """Enum class for different boot image types."""
78    BOOT_IMAGE = 1
79    VENDOR_BOOT_IMAGE = 2
80    SINGLE_RAMDISK_FRAGMENT = 3
81    MULTIPLE_RAMDISK_FRAGMENTS = 4
82
83
84class RamdiskImage:
85    """A class that supports packing/unpacking a ramdisk."""
86    def __init__(self, ramdisk_img, unpack=True):
87        self._ramdisk_img = ramdisk_img
88        self._ramdisk_format = None
89        self._ramdisk_dir = None
90        self._temp_file_manager = TempFileManager()
91
92        if unpack:
93            self._unpack_ramdisk()
94        else:
95            self._ramdisk_dir = self._temp_file_manager.make_temp_dir(
96                suffix='_new_ramdisk')
97
98    def _unpack_ramdisk(self):
99        """Unpacks the ramdisk."""
100        self._ramdisk_dir = self._temp_file_manager.make_temp_dir(
101            suffix='_' + os.path.basename(self._ramdisk_img))
102
103        # The compression format might be in 'lz4' or 'gzip' format,
104        # trying lz4 first.
105        for compression_type, compression_util in [
106            (RamdiskFormat.LZ4, 'lz4'),
107            (RamdiskFormat.GZIP, 'minigzip')]:
108
109            # Command arguments:
110            #   -d: decompression
111            #   -c: write to stdout
112            decompression_cmd = [
113                compression_util, '-d', '-c', self._ramdisk_img]
114
115            decompressed_result = subprocess.run(
116                decompression_cmd, check=False, capture_output=True)
117
118            if decompressed_result.returncode == 0:
119                self._ramdisk_format = compression_type
120                break
121
122        if self._ramdisk_format is not None:
123            # toybox cpio arguments:
124            #   -i: extract files from stdin
125            #   -d: create directories if needed
126            #   -u: override existing files
127            subprocess.run(
128                ['toybox', 'cpio', '-idu'], check=True,
129                input=decompressed_result.stdout, cwd=self._ramdisk_dir)
130
131            print("=== Unpacked ramdisk: '{}' ===".format(
132                self._ramdisk_img))
133        else:
134            raise RuntimeError('Failed to decompress ramdisk.')
135
136    def repack_ramdisk(self, out_ramdisk_file):
137        """Repacks a ramdisk from self._ramdisk_dir.
138
139        Args:
140            out_ramdisk_file: the output ramdisk file to save.
141        """
142        compression_cmd = ['lz4', '-l', '-12', '--favor-decSpeed']
143        if self._ramdisk_format == RamdiskFormat.GZIP:
144            compression_cmd = ['minigzip']
145
146        print('Repacking ramdisk, which might take a few seconds ...')
147
148        mkbootfs_result = subprocess.run(
149            ['mkbootfs', self._ramdisk_dir], check=True, capture_output=True)
150
151        with open(out_ramdisk_file, 'w') as output_fd:
152            subprocess.run(compression_cmd, check=True,
153                           input=mkbootfs_result.stdout, stdout=output_fd)
154
155        print("=== Repacked ramdisk: '{}' ===".format(out_ramdisk_file))
156
157    @property
158    def ramdisk_dir(self):
159        """Returns the internal ramdisk dir."""
160        return self._ramdisk_dir
161
162
163class BootImage:
164    """A class that supports packing/unpacking a boot.img and ramdisk."""
165
166    def __init__(self, bootimg):
167        self._bootimg = bootimg
168        self._bootimg_dir = None
169        self._bootimg_type = None
170        self._ramdisk = None
171        self._previous_mkbootimg_args = []
172        self._temp_file_manager = TempFileManager()
173
174        self._unpack_bootimg()
175
176    def _get_vendor_ramdisks(self):
177        """Returns a list of vendor ramdisks after unpack."""
178        return sorted(glob.glob(
179            os.path.join(self._bootimg_dir, 'vendor_ramdisk*')))
180
181    def _unpack_bootimg(self):
182        """Unpacks the boot.img and the ramdisk inside."""
183        self._bootimg_dir = self._temp_file_manager.make_temp_dir(
184            suffix='_' + os.path.basename(self._bootimg))
185
186        # Unpacks the boot.img first.
187        unpack_bootimg_cmds = [
188            'unpack_bootimg',
189            '--boot_img', self._bootimg,
190            '--out', self._bootimg_dir,
191            '--format=mkbootimg',
192        ]
193        result = subprocess.run(unpack_bootimg_cmds, check=True,
194                                capture_output=True, encoding='utf-8')
195        self._previous_mkbootimg_args = shlex.split(result.stdout)
196        print("=== Unpacked boot image: '{}' ===".format(self._bootimg))
197
198        # From the output dir, checks there is 'ramdisk' or 'vendor_ramdisk'.
199        ramdisk = os.path.join(self._bootimg_dir, 'ramdisk')
200        vendor_ramdisk = os.path.join(self._bootimg_dir, 'vendor_ramdisk')
201        vendor_ramdisks = self._get_vendor_ramdisks()
202        if os.path.exists(ramdisk):
203            self._ramdisk = RamdiskImage(ramdisk)
204            self._bootimg_type = BootImageType.BOOT_IMAGE
205        elif os.path.exists(vendor_ramdisk):
206            self._ramdisk = RamdiskImage(vendor_ramdisk)
207            self._bootimg_type = BootImageType.VENDOR_BOOT_IMAGE
208        elif len(vendor_ramdisks) == 1:
209            self._ramdisk = RamdiskImage(vendor_ramdisks[0])
210            self._bootimg_type = BootImageType.SINGLE_RAMDISK_FRAGMENT
211        elif len(vendor_ramdisks) > 1:
212            # Creates an empty RamdiskImage() below, without unpack.
213            # We'll then add files into this newly created ramdisk, then pack
214            # it with other vendor ramdisks together.
215            self._ramdisk = RamdiskImage(ramdisk_img=None, unpack=False)
216            self._bootimg_type = BootImageType.MULTIPLE_RAMDISK_FRAGMENTS
217        else:
218            raise RuntimeError('Both ramdisk and vendor_ramdisk do not exist.')
219
220    def repack_bootimg(self):
221        """Repacks the ramdisk and rebuild the boot.img"""
222
223        new_ramdisk = self._temp_file_manager.make_temp_file(
224            prefix='ramdisk-patched')
225        self._ramdisk.repack_ramdisk(new_ramdisk)
226
227        mkbootimg_cmd = ['mkbootimg']
228
229        # Uses previous mkbootimg args, e.g., --vendor_cmdline, --dtb_offset.
230        mkbootimg_cmd.extend(self._previous_mkbootimg_args)
231
232        ramdisk_option = ''
233        if self._bootimg_type == BootImageType.BOOT_IMAGE:
234            ramdisk_option = '--ramdisk'
235            mkbootimg_cmd.extend(['--output', self._bootimg])
236        elif self._bootimg_type == BootImageType.VENDOR_BOOT_IMAGE:
237            ramdisk_option = '--vendor_ramdisk'
238            mkbootimg_cmd.extend(['--vendor_boot', self._bootimg])
239        elif self._bootimg_type == BootImageType.SINGLE_RAMDISK_FRAGMENT:
240            ramdisk_option = '--vendor_ramdisk_fragment'
241            mkbootimg_cmd.extend(['--vendor_boot', self._bootimg])
242        elif self._bootimg_type == BootImageType.MULTIPLE_RAMDISK_FRAGMENTS:
243            mkbootimg_cmd.extend(['--ramdisk_type', 'PLATFORM'])
244            ramdisk_name = (
245                'RAMDISK_' +
246                datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S'))
247            mkbootimg_cmd.extend(['--ramdisk_name', ramdisk_name])
248            mkbootimg_cmd.extend(['--vendor_ramdisk_fragment', new_ramdisk])
249            mkbootimg_cmd.extend(['--vendor_boot', self._bootimg])
250
251        if ramdisk_option and ramdisk_option not in mkbootimg_cmd:
252            raise RuntimeError("Failed to find '{}' from:\n  {}".format(
253                ramdisk_option, shlex.join(mkbootimg_cmd)))
254        # Replaces the original ramdisk with the newly packed ramdisk.
255        if ramdisk_option:
256            ramdisk_index = mkbootimg_cmd.index(ramdisk_option) + 1
257            mkbootimg_cmd[ramdisk_index] = new_ramdisk
258
259        subprocess.check_call(mkbootimg_cmd)
260        print("=== Repacked boot image: '{}' ===".format(self._bootimg))
261
262    def add_files(self, src_dir, files):
263        """Copy files from the src_dir into current ramdisk.
264
265        Args:
266            src_dir: a source dir containing the files to copy from.
267            files: a list of files or src_file:dst_file pairs to copy from
268              src_dir to the current ramdisk.
269        """
270        # Creates missing parent dirs with 0o755.
271        original_mask = os.umask(0o022)
272        for f in files:
273            if ':' in f:
274                src_file = os.path.join(src_dir, f.split(':')[0])
275                dst_file = os.path.join(self.ramdisk_dir, f.split(':')[1])
276            else:
277                src_file = os.path.join(src_dir, f)
278                dst_file = os.path.join(self.ramdisk_dir, f)
279
280            dst_dir = os.path.dirname(dst_file)
281            if not os.path.exists(dst_dir):
282                print("Creating dir '{}'".format(dst_dir))
283                os.makedirs(dst_dir, 0o755)
284            print("Copying file '{}' into '{}'".format(src_file, dst_file))
285            shutil.copy2(src_file, dst_file)
286        os.umask(original_mask)
287
288    @property
289    def ramdisk_dir(self):
290        """Returns the internal ramdisk dir."""
291        return self._ramdisk.ramdisk_dir
292
293
294def _get_repack_usage():
295    return """Usage examples:
296
297  * --ramdisk_add
298
299    Specifies a list of files or src_file:dst_file pairs to copy from
300    --src_bootimg's ramdisk into --dst_bootimg's ramdisk.
301
302    $ repack_bootimg \\
303        --src_bootimg boot-debug-5.4.img --dst_bootimg vendor_boot-debug.img \\
304        --ramdisk_add first_stage_ramdisk/userdebug_plat_sepolicy.cil:userdebug_plat_sepolicy.cil
305
306    The above command copies '/first_stage_ramdisk/userdebug_plat_sepolicy.cil'
307    from --src_bootimg's ramdisk to '/userdebug_plat_sepolicy.cil' of
308    --dst_bootimg's ramdisk, then repacks the --dst_bootimg.
309
310    $ repack_bootimg \\
311        --src_bootimg boot-debug-5.4.img --dst_bootimg vendor_boot-debug.img \\
312        --ramdisk_add first_stage_ramdisk/userdebug_plat_sepolicy.cil
313
314    This is similar to the previous example, but the source file path and
315    destination file path are the same:
316        '/first_stage_ramdisk/userdebug_plat_sepolicy.cil'.
317
318    We can also combine both usage together with a list of copy instructions.
319    For example:
320
321    $ repack_bootimg \\
322        --src_bootimg boot-debug-5.4.img --dst_bootimg vendor_boot-debug.img \\
323        --ramdisk_add file1 file2:/subdir/file2 file3
324"""
325
326
327def _parse_args():
328    """Parse command-line options."""
329    parser = argparse.ArgumentParser(
330        formatter_class=argparse.RawDescriptionHelpFormatter,
331        description='Repacks boot, recovery or vendor_boot image by importing'
332                    'ramdisk files from --src_bootimg to --dst_bootimg.',
333        epilog=_get_repack_usage(),
334    )
335
336    parser.add_argument(
337        '--src_bootimg', help='filename to source boot image',
338        type=str, required=True)
339    parser.add_argument(
340        '--dst_bootimg', help='filename to destination boot image',
341        type=str, required=True)
342    parser.add_argument(
343        '--ramdisk_add', nargs='+',
344        help='a list of files or src_file:dst_file pairs to add into '
345             'the ramdisk',
346        default=['userdebug_plat_sepolicy.cil']
347    )
348
349    return parser.parse_args()
350
351
352def main():
353    """Parse arguments and repack boot image."""
354    args = _parse_args()
355    src_bootimg = BootImage(args.src_bootimg)
356    dst_bootimg = BootImage(args.dst_bootimg)
357    dst_bootimg.add_files(src_bootimg.ramdisk_dir, args.ramdisk_add)
358    dst_bootimg.repack_bootimg()
359
360
361if __name__ == '__main__':
362    main()
363