1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""A module containing kernel handler class used by SAFT."""
5
6import hashlib
7import os
8import re
9
10TMP_FILE_NAME = 'kernel_header_dump'
11
12# Types of kernel modifications.
13KERNEL_BODY_MOD = 1
14KERNEL_VERSION_MOD = 2
15KERNEL_RESIGN_MOD = 3
16
17
18class KernelHandlerError(Exception):
19    """KernelHandler-specific exception."""
20    pass
21
22
23class KernelHandler(object):
24    """An object to provide ChromeOS kernel related actions.
25
26    Mostly it allows to corrupt and restore a particular kernel partition
27    (designated by the partition name, A or B.
28
29    @type os_if: autotest_lib.client.cros.faft.utils.os_interface.OSInterface
30    """
31
32    # This value is used to alter contents of a byte in the appropriate kernel
33    # image. First added to corrupt the image, then subtracted to restore the
34    # image.
35    DELTA = 1
36
37    # The maximum kernel size in MB.
38    KERNEL_SIZE_MB = 16
39
40    def __init__(self, os_if):
41        self.os_if = os_if
42        self.dump_file_name = None
43        self.partition_map = {}
44        self.root_dev = None
45        self.initialized = False
46
47    def _get_version(self, device):
48        """Get version of the kernel hosted on the passed in partition."""
49        # 16 K should be enough to include headers and keys
50        data = self.os_if.read_partition(device, 0x4000)
51        return self.os_if.retrieve_body_version(data)
52
53    def _get_datakey_version(self, device):
54        """Get datakey version of kernel hosted on the passed in partition."""
55        # 16 K should be enought to include headers and keys
56        data = self.os_if.read_partition(device, 0x4000)
57        return self.os_if.retrieve_datakey_version(data)
58
59    def _get_partition_map(self, internal_disk=True):
60        """Scan `cgpt show <device> output to find kernel devices.
61
62        Args:
63          internal_disk - decide whether to use internal kernel disk.
64        """
65        if internal_disk:
66            target_device = self.os_if.get_internal_disk(
67                    self.os_if.get_root_part())
68        else:
69            target_device = self.root_dev
70
71        kernel_partitions = re.compile('KERN-([AB])')
72        disk_map = self.os_if.run_shell_command_get_output(
73                'cgpt show %s' % target_device)
74
75        for line in disk_map:
76            matched_line = kernel_partitions.search(line)
77            if not matched_line:
78                continue
79            label = matched_line.group(1)
80            part_info = {}
81            device = self.os_if.join_part(target_device, line.split()[2])
82            part_info['device'] = device
83            part_info['version'] = self._get_version(device)
84            part_info['datakey_version'] = self._get_datakey_version(device)
85            self.partition_map[label] = part_info
86
87    def dump_kernel(self, section, kernel_path):
88        """Dump the specified kernel to a file.
89
90        @param section: The kernel to dump. May be A or B.
91        @param kernel_path: The path to the kernel image.
92        """
93        dev = self.partition_map[section.upper()]['device']
94        cmd = 'dd if=%s of=%s bs=%dM count=1' % (dev, kernel_path,
95                                                 self.KERNEL_SIZE_MB)
96        self.os_if.run_shell_command(cmd)
97
98    def write_kernel(self, section, kernel_path):
99        """Write a kernel image to the specified section.
100
101        @param section: The kernel to write. May be A or B.
102        @param kernel_path: The path to the kernel image to write.
103        """
104        dev = self.partition_map[section.upper()]['device']
105        dd_cmd = 'dd if=%s of=%s bs=%dM count=1' % (kernel_path, dev,
106                                                    self.KERNEL_SIZE_MB)
107        self.os_if.run_shell_command(dd_cmd, modifies_device=True)
108
109    def _modify_kernel(self,
110                       section,
111                       delta,
112                       modification_type=KERNEL_BODY_MOD,
113                       key_path=None):
114        """Modify kernel image on a disk partition.
115
116        This method supports three types of kernel modification. KERNEL_BODY_MOD
117        just adds the value of delta to the first byte of the kernel blob.
118        This might leave the kernel corrupted (as required by the test).
119
120        The second type, KERNEL_VERSION_MOD - will use 'delta' as the new
121        version number, it will put it in the kernel header, and then will
122        resign the kernel blob.
123
124        The third type. KERNEL_RESIGN_MOD - will resign the kernel with keys in
125        argument key_path. If key_path is None, choose dev_key_path as resign
126        key directory.
127        """
128        self.dump_kernel(section, self.dump_file_name)
129        data = list(self.os_if.read_file(self.dump_file_name))
130        if modification_type == KERNEL_BODY_MOD:
131            data[0] = '%c' % ((ord(data[0]) + delta) % 0x100)
132            self.os_if.write_file(self.dump_file_name, ''.join(data))
133            kernel_to_write = self.dump_file_name
134        elif modification_type == KERNEL_VERSION_MOD:
135            new_version = delta
136            kernel_to_write = self.dump_file_name + '.new'
137            self.os_if.run_shell_command(
138                    'vbutil_kernel --repack %s --version %d '
139                    '--signprivate %s --oldblob %s' %
140                    (kernel_to_write, new_version,
141                     os.path.join(self.dev_key_path,
142                                  'kernel_data_key.vbprivk'),
143                     self.dump_file_name))
144        elif modification_type == KERNEL_RESIGN_MOD:
145            if key_path and self.os_if.is_dir(key_path):
146                resign_key_path = key_path
147            else:
148                resign_key_path = self.dev_key_path
149
150            kernel_to_write = self.dump_file_name + '.new'
151            self.os_if.run_shell_command(
152                    'vbutil_kernel --repack %s '
153                    '--signprivate %s --oldblob %s --keyblock %s' %
154                    (kernel_to_write,
155                     os.path.join(resign_key_path, 'kernel_data_key.vbprivk'),
156                     self.dump_file_name,
157                     os.path.join(resign_key_path, 'kernel.keyblock')))
158        else:
159            return  # Unsupported mode, ignore.
160        self.write_kernel(section, kernel_to_write)
161
162    def corrupt_kernel(self, section):
163        """Corrupt a kernel section (add DELTA to the first byte)."""
164        self._modify_kernel(section.upper(), self.DELTA)
165
166    def restore_kernel(self, section):
167        """Restore the previously corrupted kernel."""
168        self._modify_kernel(section.upper(), -self.DELTA)
169
170    def get_version(self, section):
171        """Return version read from this section blob's header."""
172        return self.partition_map[section.upper()]['version']
173
174    def get_datakey_version(self, section):
175        """Return datakey version read from this section blob's header."""
176        return self.partition_map[section.upper()]['datakey_version']
177
178    def get_sha(self, section):
179        """Return the SHA1 hash of the section blob."""
180        s = hashlib.sha1()
181        dev = self.partition_map[section.upper()]['device']
182        s.update(self.os_if.read_file(dev))
183        return s.hexdigest()
184
185    def set_version(self, section, version):
186        """Set version of this kernel blob and re-sign it."""
187        if version < 0:
188            raise KernelHandlerError('Bad version value %d' % version)
189        self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD)
190
191    def resign_kernel(self, section, key_path=None):
192        """Resign kernel with original kernel version and keys in key_path."""
193        self._modify_kernel(section.upper(), self.get_version(section),
194                            KERNEL_RESIGN_MOD, key_path)
195
196    def init(self, dev_key_path='.', internal_disk=True):
197        """Initialize the kernel handler object.
198
199        Input argument is an OS interface object reference.
200        """
201        self.dev_key_path = dev_key_path
202        self.root_dev = self.os_if.get_root_dev()
203        self.dump_file_name = self.os_if.state_dir_file(TMP_FILE_NAME)
204        self._get_partition_map(internal_disk)
205        self.initialized = True
206