1# Copyright 2019 - The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""OtaTools class."""
15
16import logging
17import os
18import stat
19import subprocess
20import tempfile
21
22from acloud import errors
23from acloud.internal import constants
24from acloud.internal.lib import utils
25
26logger = logging.getLogger(__name__)
27
28_BIN_DIR_NAME = "bin"
29_LPMAKE = "lpmake"
30_BUILD_SUPER_IMAGE = "build_super_image"
31_AVBTOOL = "avbtool"
32_SGDISK = "sgdisk"
33_SIMG2IMG = "simg2img"
34_MK_COMBINED_IMG = "mk_combined_img"
35
36_BUILD_SUPER_IMAGE_TIMEOUT_SECS = 30
37_AVBTOOL_TIMEOUT_SECS = 30
38_MK_COMBINED_IMG_TIMEOUT_SECS = 180
39
40_MISSING_OTA_TOOLS_MSG = ("%(tool_name)s is not found. Run `make otatools` "
41                          "in build environment, or set --local-tool to an "
42                          "extracted otatools.zip.")
43
44
45def FindOtaTools(search_paths):
46    """Find OTA tools in the search paths and in build environment.
47
48    Args:
49        search_paths: List of paths, the directories to search for OTA tools.
50
51    Returns:
52        The directory containing OTA tools.
53
54    Raises:
55        errors.CheckPathError if OTA tools are not found.
56    """
57    for search_path in search_paths:
58        if os.path.isfile(os.path.join(search_path, _BIN_DIR_NAME,
59                                       _BUILD_SUPER_IMAGE)):
60            return search_path
61
62    host_out_dir = os.environ.get(constants.ENV_ANDROID_HOST_OUT)
63    if (host_out_dir and
64            os.path.isfile(os.path.join(host_out_dir, _BIN_DIR_NAME,
65                                        _BUILD_SUPER_IMAGE))):
66        return host_out_dir
67
68    raise errors.CheckPathError(_MISSING_OTA_TOOLS_MSG %
69                                {"tool_name": "OTA tool directory"})
70
71
72class OtaTools(object):
73    """The class that executes OTA tool commands."""
74
75    def __init__(self, ota_tools_dir):
76        self._ota_tools_dir = os.path.abspath(ota_tools_dir)
77
78    def _GetBinary(self, name):
79        """Get an executable file from _ota_tools_dir.
80
81        Args:
82            name: String, the file name.
83
84        Returns:
85            String, the absolute path.
86
87        Raises:
88            errors.NoExecuteCmd if the file does not exist.
89        """
90        path = os.path.join(self._ota_tools_dir, _BIN_DIR_NAME, name)
91        if not os.path.isfile(path):
92            raise errors.NoExecuteCmd(_MISSING_OTA_TOOLS_MSG %
93                                      {"tool_name": name})
94        mode = os.stat(path).st_mode
95        os.chmod(path, mode | (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH |
96                               stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH))
97        return path
98
99    @staticmethod
100    def _ExecuteCommand(*command, **popen_args):
101        """Execute a command and log the output.
102
103        This method waits for the process to terminate. It kills the process
104        if it's interrupted due to timeout.
105
106        Args:
107            command: Strings, the command.
108            popen_kwargs: The arguments to be passed to subprocess.Popen.
109
110        Raises:
111            errors.SubprocessFail if the process returns non-zero.
112        """
113        proc = None
114        try:
115            logger.info("Execute %s", command)
116            popen_args["stdin"] = subprocess.PIPE
117            popen_args["stdout"] = subprocess.PIPE
118            popen_args["stderr"] = subprocess.PIPE
119            proc = subprocess.Popen(command, **popen_args)
120            stdout, stderr = proc.communicate()
121            logger.info("%s stdout: %s", command[0], stdout)
122            logger.info("%s stderr: %s", command[0], stderr)
123
124            if proc.returncode != 0:
125                raise errors.SubprocessFail("%s returned %d." %
126                                            (command[0], proc.returncode))
127        finally:
128            if proc and proc.poll() is None:
129                logger.info("Kill %s", command[0])
130                proc.kill()
131
132    @staticmethod
133    def _RewriteMiscInfo(output_file, input_file, lpmake_path, get_image):
134        """Rewrite lpmake and image paths in misc_info.txt.
135
136        Misc info consists of multiple lines of <key>=<value>.
137        Sample input_file:
138        lpmake=lpmake
139        dynamic_partition_list= system system_ext product vendor
140
141        Sample output_file:
142        lpmake=/path/to/lpmake
143        dynamic_partition_list= system system_ext product vendor
144        system_image=/path/to/system.img
145        system_ext_image=/path/to/system_ext.img
146        product_image=/path/to/product.img
147        vendor_image=/path/to/vendor.img
148
149        This method replaces lpmake with the specified path, and sets
150        *_image for every partition in dynamic_partition_list.
151
152        Args:
153            output_file: The output file object.
154            input_file: The input file object.
155            lpmake_path: The path to lpmake binary.
156            get_image: A function that takes the partition name as the
157                       parameter and returns the image path.
158        """
159        partition_names = ()
160        for line in input_file:
161            split_line = line.strip().split("=", 1)
162            if len(split_line) < 2:
163                split_line = (split_line[0], "")
164            if split_line[0] == "dynamic_partition_list":
165                partition_names = split_line[1].split()
166            elif split_line[0] == "lpmake":
167                output_file.write("lpmake=%s\n" % lpmake_path)
168                continue
169            elif split_line[0].endswith("_image"):
170                continue
171            output_file.write(line)
172
173        if not partition_names:
174            logger.w("No dynamic partition list in misc info.")
175
176        for partition_name in partition_names:
177            output_file.write("%s_image=%s\n" %
178                              (partition_name, get_image(partition_name)))
179
180    @utils.TimeExecute(function_description="Build super image")
181    @utils.TimeoutException(_BUILD_SUPER_IMAGE_TIMEOUT_SECS)
182    def BuildSuperImage(self, output_path, misc_info_path, get_image):
183        """Use build_super_image to create a super image.
184
185        Args:
186            output_path: The path to the output super image.
187            misc_info_path: The path to the misc info that provides parameters
188                            to create the super image.
189            get_image: A function that takes the partition name as the
190                       parameter and returns the image path.
191        """
192        build_super_image = self._GetBinary(_BUILD_SUPER_IMAGE)
193        lpmake = self._GetBinary(_LPMAKE)
194
195        new_misc_info_path = None
196        try:
197            with open(misc_info_path, "r") as misc_info:
198                with tempfile.NamedTemporaryFile(
199                    prefix="misc_info_", suffix=".txt",
200                    delete=False) as new_misc_info:
201                    new_misc_info_path = new_misc_info.name
202                    self._RewriteMiscInfo(new_misc_info, misc_info, lpmake,
203                                          get_image)
204
205            self._ExecuteCommand(build_super_image, new_misc_info_path,
206                                 output_path)
207        finally:
208            if new_misc_info_path:
209                os.remove(new_misc_info_path)
210
211    @utils.TimeExecute(function_description="Make disabled vbmeta image.")
212    @utils.TimeoutException(_AVBTOOL_TIMEOUT_SECS)
213    def MakeDisabledVbmetaImage(self, output_path):
214        """Use avbtool to create a vbmeta image with verification disabled.
215
216        Args:
217            output_path: The path to the output vbmeta image.
218        """
219        avbtool = self._GetBinary(_AVBTOOL)
220        self._ExecuteCommand(avbtool, "make_vbmeta_image",
221                             "--flag", "2",
222                             "--padding_size", "4096",
223                             "--output", output_path)
224
225    @staticmethod
226    def _RewriteSystemQemuConfig(output_file, input_file, get_image):
227        """Rewrite image paths in system-qemu-config.txt.
228
229        Sample input_file:
230        out/target/product/generic_x86_64/vbmeta.img vbmeta 1
231        out/target/product/generic_x86_64/super.img super 2
232
233        Sample output_file:
234        /path/to/vbmeta.img vbmeta 1
235        /path/to/super.img super 2
236
237        This method replaces the first entry of each line with the path
238        returned by get_image.
239
240        Args:
241            output_file: The output file object.
242            input_file: The input file object.
243            get_image: A function that takes the partition name as the
244                       parameter and returns the image path.
245        """
246        for line in input_file:
247            split_line = line.split()
248            if len(split_line) == 3:
249                output_file.write("%s %s %s\n" % (get_image(split_line[1]),
250                                                  split_line[1],
251                                                  split_line[2]))
252            else:
253                output_file.write(line)
254
255    @utils.TimeExecute(function_description="Make combined image")
256    @utils.TimeoutException(_MK_COMBINED_IMG_TIMEOUT_SECS)
257    def MkCombinedImg(self, output_path, system_qemu_config_path, get_image):
258        """Use mk_combined_img to create a disk image.
259
260        Args:
261            output_path: The path to the output disk image.
262            system_qemu_config: The path to the config that provides the
263                                parition information on the disk.
264            get_image: A function that takes the partition name as the
265                       parameter and returns the image path.
266        """
267        mk_combined_img = self._GetBinary(_MK_COMBINED_IMG)
268        sgdisk = self._GetBinary(_SGDISK)
269        simg2img = self._GetBinary(_SIMG2IMG)
270
271        new_config_path = None
272        try:
273            with open(system_qemu_config_path, "r") as config:
274                with tempfile.NamedTemporaryFile(
275                    prefix="system-qemu-config_", suffix=".txt",
276                    delete=False) as new_config:
277                    new_config_path = new_config.name
278                    self._RewriteSystemQemuConfig(new_config, config,
279                                                  get_image)
280
281            mk_combined_img_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}
282            self._ExecuteCommand(mk_combined_img,
283                                 "-i", new_config_path,
284                                 "-o", output_path,
285                                 env=mk_combined_img_env)
286        finally:
287            if new_config_path:
288                os.remove(new_config_path)
289