# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OtaTools class."""

import logging
import os
import subprocess
import tempfile

from six import b

from acloud import errors
from acloud.internal import constants
from acloud.internal.lib import utils


logger = logging.getLogger(__name__)

# Name of the sub-directory, under the OTA tools directory, that holds the
# executables.
_BIN_DIR_NAME = "bin"
# File names of the OTA tool executables used by this module.
_LPMAKE = "lpmake"
_BUILD_SUPER_IMAGE = "build_super_image"
_AVBTOOL = "avbtool"
_SGDISK = "sgdisk"
_SIMG2IMG = "simg2img"
_MK_COMBINED_IMG = "mk_combined_img"

# Values passed to utils.TimeoutException for the public methods below.
_BUILD_SUPER_IMAGE_TIMEOUT_SECS = 30
_AVBTOOL_TIMEOUT_SECS = 30
_MK_COMBINED_IMG_TIMEOUT_SECS = 180

_MISSING_OTA_TOOLS_MSG = ("%(tool_name)s is not found. Run `make otatools` "
                          "in build environment, or set --local-tool to an "
                          "extracted otatools.zip.")


def FindOtaTools(search_paths):
    """Find OTA tools in the search paths and in build environment.

    A directory is accepted when it contains bin/build_super_image. The
    search paths are tried first, in order; then the directories named by
    the host-out environment variables are tried.

    Args:
        search_paths: List of paths, the directories to search for OTA tools.

    Returns:
        The directory containing OTA tools.

    Raises:
        errors.CheckPathError if OTA tools are not found.
    """
    for search_path in search_paths:
        if os.path.isfile(os.path.join(search_path, _BIN_DIR_NAME,
                                       _BUILD_SUPER_IMAGE)):
            return search_path
    for env_host_out in [constants.ENV_ANDROID_SOONG_HOST_OUT,
                         constants.ENV_ANDROID_HOST_OUT]:
        host_out_dir = os.environ.get(env_host_out)
        if (host_out_dir and
                os.path.isfile(os.path.join(host_out_dir, _BIN_DIR_NAME,
                                            _BUILD_SUPER_IMAGE))):
            return host_out_dir

    raise errors.CheckPathError(_MISSING_OTA_TOOLS_MSG %
                                {"tool_name": "OTA tool directory"})


def GetImageForPartition(partition_name, image_dir, **image_paths):
    """Map a partition name to an image path.

    This function is used with BuildSuperImage or MkCombinedImg to mix
    image_dir and image_paths into the output file.

    Args:
        partition_name: String, e.g., "system", "product", and "vendor".
        image_dir: String, the directory to search for the images that are not
                   given in image_paths.
        image_paths: Pairs of partition names and image paths.

    Returns:
        The image path if the partition is in image_paths. Otherwise, this
        function returns the path "<partition_name>.img" under image_dir.

    Raises:
        errors.GetLocalImageError if the image does not exist.
    """
    # A missing or empty entry in image_paths falls back to image_dir.
    image_path = (image_paths.get(partition_name) or
                  os.path.join(image_dir, partition_name + ".img"))
    if not os.path.isfile(image_path):
        raise errors.GetLocalImageError(
            "Cannot find image for partition %s" % partition_name)
    return image_path


class OtaTools:
    """The class that executes OTA tool commands."""

    def __init__(self, ota_tools_dir):
        """Initialize with the directory that contains the OTA tools.

        Args:
            ota_tools_dir: String, the OTA tools directory. It is stored as
                           an absolute path.
        """
        self._ota_tools_dir = os.path.abspath(ota_tools_dir)

    def _GetBinary(self, name):
        """Get an executable file from _ota_tools_dir.

        The file is looked up under the "bin" sub-directory and is passed to
        utils.SetExecutable before its path is returned.

        Args:
            name: String, the file name.

        Returns:
            String, the absolute path.

        Raises:
            errors.NoExecuteCmd if the file does not exist.
        """
        path = os.path.join(self._ota_tools_dir, _BIN_DIR_NAME, name)
        if not os.path.isfile(path):
            raise errors.NoExecuteCmd(_MISSING_OTA_TOOLS_MSG %
                                      {"tool_name": name})
        utils.SetExecutable(path)
        return path

    @staticmethod
    def _ExecuteCommand(*command, **popen_args):
        """Execute a command and log the output.

        This method waits for the process to terminate. If the wait is
        interrupted by an exception (for example a timeout raised by a
        caller's decorator) while the process is still running, the process
        is killed.

        Args:
            command: Strings, the command.
            popen_args: The arguments to be passed to subprocess.Popen.
                        stdin, stdout and stderr are always overridden with
                        pipes.

        Raises:
            errors.SubprocessFail if the process returns non-zero.
        """
        proc = None
        try:
            logger.info("Execute %s", command)
            popen_args["stdin"] = subprocess.PIPE
            popen_args["stdout"] = subprocess.PIPE
            popen_args["stderr"] = subprocess.PIPE

            # Some OTA tools are Python scripts in different versions. The
            # PYTHONPATH for acloud may be incompatible with the tools.
            if "env" not in popen_args and "PYTHONPATH" in os.environ:
                popen_env = os.environ.copy()
                del popen_env["PYTHONPATH"]
                popen_args["env"] = popen_env

            proc = subprocess.Popen(command, **popen_args)
            stdout, stderr = proc.communicate()
            logger.info("%s stdout: %s", command[0], stdout)
            logger.info("%s stderr: %s", command[0], stderr)

            if proc.returncode != 0:
                raise errors.SubprocessFail("%s returned %d." %
                                            (command[0], proc.returncode))
        finally:
            # poll() returns None only while the process is still running.
            if proc and proc.poll() is None:
                logger.info("Kill %s", command[0])
                proc.kill()

    @staticmethod
    def _RewriteMiscInfo(output_file, input_file, lpmake_path, get_image):
        """Rewrite lpmake and image paths in misc_info.txt.

        Misc info consists of multiple lines of key=value.
        Sample input_file:
        lpmake=lpmake
        dynamic_partition_list= system system_ext product vendor

        Sample output_file:
        lpmake=/path/to/lpmake
        dynamic_partition_list= system system_ext product vendor
        system_image=/path/to/system.img
        system_ext_image=/path/to/system_ext.img
        product_image=/path/to/product.img
        vendor_image=/path/to/vendor.img

        This method replaces lpmake with the specified path, and sets *_image
        for every partition in dynamic_partition_list. Existing *_image lines
        in the input are dropped; every other line is copied unchanged.

        Args:
            output_file: The output file object. It must accept bytes.
            input_file: The input file object. It must yield text lines.
            lpmake_path: The path to lpmake binary.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        partition_names = ()
        for line in input_file:
            split_line = line.strip().split("=", 1)
            # A line without "=" is treated as a key with an empty value.
            if len(split_line) < 2:
                split_line = (split_line[0], "")
            if split_line[0] == "dynamic_partition_list":
                partition_names = split_line[1].split()
            elif split_line[0] == "lpmake":
                output_file.write(b("lpmake=%s\n" % lpmake_path))
                continue
            elif split_line[0].endswith("_image"):
                # Dropped here; regenerated from get_image below.
                continue
            output_file.write(b(line))

        if not partition_names:
            # logging.Logger has no "w" method; use warning() so that a misc
            # info without the list is reported instead of raising
            # AttributeError.
            logger.warning("No dynamic partition list in misc info.")

        for partition_name in partition_names:
            output_file.write(b("%s_image=%s\n" %
                                (partition_name, get_image(partition_name))))

    @utils.TimeExecute(function_description="Build super image")
    @utils.TimeoutException(_BUILD_SUPER_IMAGE_TIMEOUT_SECS)
    def BuildSuperImage(self, output_path, misc_info_path, get_image):
        """Use build_super_image to create a super image.

        A rewritten copy of the misc info is written to a temporary file,
        which is removed when this method exits.

        Args:
            output_path: The path to the output super image.
            misc_info_path: The path to the misc info that provides parameters
                            to create the super image.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        build_super_image = self._GetBinary(_BUILD_SUPER_IMAGE)
        lpmake = self._GetBinary(_LPMAKE)

        new_misc_info_path = None
        try:
            with open(misc_info_path, "r") as misc_info:
                # delete=False: the file must outlive the "with" block so the
                # tool can read it; it is removed in the "finally" clause.
                with tempfile.NamedTemporaryFile(
                        prefix="misc_info_", suffix=".txt",
                        delete=False) as new_misc_info:
                    new_misc_info_path = new_misc_info.name
                    self._RewriteMiscInfo(new_misc_info, misc_info, lpmake,
                                          get_image)

            # Run the tool only after the temporary file has been closed.
            self._ExecuteCommand(build_super_image, new_misc_info_path,
                                 output_path)
        finally:
            if new_misc_info_path:
                os.remove(new_misc_info_path)

    @utils.TimeExecute(function_description="Make disabled vbmeta image.")
    @utils.TimeoutException(_AVBTOOL_TIMEOUT_SECS)
    def MakeDisabledVbmetaImage(self, output_path):
        """Use avbtool to create a vbmeta image with verification disabled.

        Args:
            output_path: The path to the output vbmeta image.
        """
        avbtool = self._GetBinary(_AVBTOOL)
        self._ExecuteCommand(avbtool, "make_vbmeta_image",
                             "--flag", "2",
                             "--padding_size", "4096",
                             "--output", output_path)

    @staticmethod
    def _RewriteSystemQemuConfig(output_file, input_file, get_image):
        """Rewrite image paths in system-qemu-config.txt.

        Sample input_file:
        out/target/product/generic_x86_64/vbmeta.img vbmeta 1
        out/target/product/generic_x86_64/super.img super 2

        Sample output_file:
        /path/to/vbmeta.img vbmeta 1
        /path/to/super.img super 2

        This method replaces the first entry of each line with the path
        returned by get_image. Lines that do not consist of exactly three
        whitespace-separated entries are copied unchanged.

        Args:
            output_file: The output file object. It must accept bytes.
            input_file: The input file object. It must yield text lines.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        for line in input_file:
            split_line = line.split()
            if len(split_line) == 3:
                # The second entry is the partition name.
                output_file.write(b("%s %s %s\n" % (get_image(split_line[1]),
                                                    split_line[1],
                                                    split_line[2])))
            else:
                output_file.write(b(line))

    @utils.TimeExecute(function_description="Make combined image")
    @utils.TimeoutException(_MK_COMBINED_IMG_TIMEOUT_SECS)
    def MkCombinedImg(self, output_path, system_qemu_config_path, get_image):
        """Use mk_combined_img to create a disk image.

        A rewritten copy of the config is written to a temporary file, which
        is removed when this method exits.

        Args:
            output_path: The path to the output disk image.
            system_qemu_config_path: The path to the config that provides the
                                     partition information on the disk.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        mk_combined_img = self._GetBinary(_MK_COMBINED_IMG)
        sgdisk = self._GetBinary(_SGDISK)
        simg2img = self._GetBinary(_SIMG2IMG)

        new_config_path = None
        try:
            with open(system_qemu_config_path, "r") as config:
                # delete=False: the file must outlive the "with" block so the
                # tool can read it; it is removed in the "finally" clause.
                with tempfile.NamedTemporaryFile(
                        prefix="system-qemu-config_", suffix=".txt",
                        delete=False) as new_config:
                    new_config_path = new_config.name
                    self._RewriteSystemQemuConfig(new_config, config,
                                                  get_image)

            # The paths of the helper tools are passed through environment
            # variables. Because "env" is given explicitly, this dict is the
            # entire environment of the child process.
            mk_combined_img_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}
            # Run the tool only after the temporary file has been closed.
            self._ExecuteCommand(mk_combined_img,
                                 "-i", new_config_path,
                                 "-o", output_path,
                                 env=mk_combined_img_env)
        finally:
            if new_config_path:
                os.remove(new_config_path)