1# Copyright 2019 - The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""OtaTools class."""
15
16import logging
17import os
18import subprocess
19import tempfile
20
21from six import b
22
23
24from acloud import errors
25from acloud.internal import constants
26from acloud.internal.lib import utils
27
28logger = logging.getLogger(__name__)
29
30_BIN_DIR_NAME = "bin"
31_LPMAKE = "lpmake"
32_BUILD_SUPER_IMAGE = "build_super_image"
33_AVBTOOL = "avbtool"
34_SGDISK = "sgdisk"
35_SIMG2IMG = "simg2img"
36_MK_COMBINED_IMG = "mk_combined_img"
37
38_BUILD_SUPER_IMAGE_TIMEOUT_SECS = 30
39_AVBTOOL_TIMEOUT_SECS = 30
40_MK_COMBINED_IMG_TIMEOUT_SECS = 180
41
42_MISSING_OTA_TOOLS_MSG = ("%(tool_name)s is not found. Run `make otatools` "
43                          "in build environment, or set --local-tool to an "
44                          "extracted otatools.zip.")
45
46
47def FindOtaTools(search_paths):
48    """Find OTA tools in the search paths and in build environment.
49
50    Args:
51        search_paths: List of paths, the directories to search for OTA tools.
52
53    Returns:
54        The directory containing OTA tools.
55
56    Raises:
57        errors.CheckPathError if OTA tools are not found.
58    """
59    for search_path in search_paths:
60        if os.path.isfile(os.path.join(search_path, _BIN_DIR_NAME,
61                                       _BUILD_SUPER_IMAGE)):
62            return search_path
63    for env_host_out in [constants.ENV_ANDROID_SOONG_HOST_OUT,
64                         constants.ENV_ANDROID_HOST_OUT]:
65        host_out_dir = os.environ.get(env_host_out)
66        if (host_out_dir and
67                os.path.isfile(os.path.join(host_out_dir, _BIN_DIR_NAME,
68                                            _BUILD_SUPER_IMAGE))):
69            return host_out_dir
70
71    raise errors.CheckPathError(_MISSING_OTA_TOOLS_MSG %
72                                {"tool_name": "OTA tool directory"})
73
74
75def GetImageForPartition(partition_name, image_dir, **image_paths):
76    """Map a partition name to an image path.
77
78    This function is used with BuildSuperImage or MkCombinedImg to mix
79    image_dir and image_paths into the output file.
80
81    Args:
82        partition_name: String, e.g., "system", "product", and "vendor".
83        image_dir: String, the directory to search for the images that are not
84                   given in image_paths.
85        image_paths: Pairs of partition names and image paths.
86
87    Returns:
88        The image path if the partition is in image_paths.
89        Otherwise, this function returns the path under image_dir.
90
91    Raises
92        errors.GetLocalImageError if the image does not exist.
93    """
94    image_path = (image_paths.get(partition_name) or
95                  os.path.join(image_dir, partition_name + ".img"))
96    if not os.path.isfile(image_path):
97        raise errors.GetLocalImageError(
98            "Cannot find image for partition %s" % partition_name)
99    return image_path
100
101
102class OtaTools:
103    """The class that executes OTA tool commands."""
104
105    def __init__(self, ota_tools_dir):
106        self._ota_tools_dir = os.path.abspath(ota_tools_dir)
107
108    def _GetBinary(self, name):
109        """Get an executable file from _ota_tools_dir.
110
111        Args:
112            name: String, the file name.
113
114        Returns:
115            String, the absolute path.
116
117        Raises:
118            errors.NoExecuteCmd if the file does not exist.
119        """
120        path = os.path.join(self._ota_tools_dir, _BIN_DIR_NAME, name)
121        if not os.path.isfile(path):
122            raise errors.NoExecuteCmd(_MISSING_OTA_TOOLS_MSG %
123                                      {"tool_name": name})
124        utils.SetExecutable(path)
125        return path
126
127    @staticmethod
128    def _ExecuteCommand(*command, **popen_args):
129        """Execute a command and log the output.
130
131        This method waits for the process to terminate. It kills the process
132        if it's interrupted due to timeout.
133
134        Args:
135            command: Strings, the command.
136            popen_kwargs: The arguments to be passed to subprocess.Popen.
137
138        Raises:
139            errors.SubprocessFail if the process returns non-zero.
140        """
141        proc = None
142        try:
143            logger.info("Execute %s", command)
144            popen_args["stdin"] = subprocess.PIPE
145            popen_args["stdout"] = subprocess.PIPE
146            popen_args["stderr"] = subprocess.PIPE
147
148            # Some OTA tools are Python scripts in different versions. The
149            # PYTHONPATH for acloud may be incompatible with the tools.
150            if "env" not in popen_args and "PYTHONPATH" in os.environ:
151                popen_env = os.environ.copy()
152                del popen_env["PYTHONPATH"]
153                popen_args["env"] = popen_env
154
155            proc = subprocess.Popen(command, **popen_args)
156            stdout, stderr = proc.communicate()
157            logger.info("%s stdout: %s", command[0], stdout)
158            logger.info("%s stderr: %s", command[0], stderr)
159
160            if proc.returncode != 0:
161                raise errors.SubprocessFail("%s returned %d." %
162                                            (command[0], proc.returncode))
163        finally:
164            if proc and proc.poll() is None:
165                logger.info("Kill %s", command[0])
166                proc.kill()
167
168    @staticmethod
169    def _RewriteMiscInfo(output_file, input_file, lpmake_path, get_image):
170        """Rewrite lpmake and image paths in misc_info.txt.
171
172        Misc info consists of multiple lines of <key>=<value>.
173        Sample input_file:
174        lpmake=lpmake
175        dynamic_partition_list= system system_ext product vendor
176
177        Sample output_file:
178        lpmake=/path/to/lpmake
179        dynamic_partition_list= system system_ext product vendor
180        system_image=/path/to/system.img
181        system_ext_image=/path/to/system_ext.img
182        product_image=/path/to/product.img
183        vendor_image=/path/to/vendor.img
184
185        This method replaces lpmake with the specified path, and sets
186        *_image for every partition in dynamic_partition_list.
187
188        Args:
189            output_file: The output file object.
190            input_file: The input file object.
191            lpmake_path: The path to lpmake binary.
192            get_image: A function that takes the partition name as the
193                       parameter and returns the image path.
194        """
195        partition_names = ()
196        for line in input_file:
197            split_line = line.strip().split("=", 1)
198            if len(split_line) < 2:
199                split_line = (split_line[0], "")
200            if split_line[0] == "dynamic_partition_list":
201                partition_names = split_line[1].split()
202            elif split_line[0] == "lpmake":
203                output_file.write(b("lpmake=%s\n" % lpmake_path))
204                continue
205            elif split_line[0].endswith("_image"):
206                continue
207            output_file.write(b(line))
208
209        if not partition_names:
210            logger.w("No dynamic partition list in misc info.")
211
212        for partition_name in partition_names:
213            output_file.write(b("%s_image=%s\n" %
214                                (partition_name, get_image(partition_name))))
215
216    @utils.TimeExecute(function_description="Build super image")
217    @utils.TimeoutException(_BUILD_SUPER_IMAGE_TIMEOUT_SECS)
218    def BuildSuperImage(self, output_path, misc_info_path, get_image):
219        """Use build_super_image to create a super image.
220
221        Args:
222            output_path: The path to the output super image.
223            misc_info_path: The path to the misc info that provides parameters
224                            to create the super image.
225            get_image: A function that takes the partition name as the
226                       parameter and returns the image path.
227        """
228        build_super_image = self._GetBinary(_BUILD_SUPER_IMAGE)
229        lpmake = self._GetBinary(_LPMAKE)
230
231        new_misc_info_path = None
232        try:
233            with open(misc_info_path, "r") as misc_info:
234                with tempfile.NamedTemporaryFile(
235                        prefix="misc_info_", suffix=".txt",
236                        delete=False) as new_misc_info:
237                    new_misc_info_path = new_misc_info.name
238                    self._RewriteMiscInfo(new_misc_info, misc_info, lpmake,
239                                          get_image)
240
241            self._ExecuteCommand(build_super_image, new_misc_info_path,
242                                 output_path)
243        finally:
244            if new_misc_info_path:
245                os.remove(new_misc_info_path)
246
247    @utils.TimeExecute(function_description="Make disabled vbmeta image.")
248    @utils.TimeoutException(_AVBTOOL_TIMEOUT_SECS)
249    def MakeDisabledVbmetaImage(self, output_path):
250        """Use avbtool to create a vbmeta image with verification disabled.
251
252        Args:
253            output_path: The path to the output vbmeta image.
254        """
255        avbtool = self._GetBinary(_AVBTOOL)
256        self._ExecuteCommand(avbtool, "make_vbmeta_image",
257                             "--flag", "2",
258                             "--padding_size", "4096",
259                             "--output", output_path)
260
261    @staticmethod
262    def _RewriteSystemQemuConfig(output_file, input_file, get_image):
263        """Rewrite image paths in system-qemu-config.txt.
264
265        Sample input_file:
266        out/target/product/generic_x86_64/vbmeta.img vbmeta 1
267        out/target/product/generic_x86_64/super.img super 2
268
269        Sample output_file:
270        /path/to/vbmeta.img vbmeta 1
271        /path/to/super.img super 2
272
273        This method replaces the first entry of each line with the path
274        returned by get_image.
275
276        Args:
277            output_file: The output file object.
278            input_file: The input file object.
279            get_image: A function that takes the partition name as the
280                       parameter and returns the image path.
281        """
282        for line in input_file:
283            split_line = line.split()
284            if len(split_line) == 3:
285                output_file.write(b("%s %s %s\n" % (get_image(split_line[1]),
286                                                    split_line[1],
287                                                    split_line[2])))
288            else:
289                output_file.write(b(line))
290
291    @utils.TimeExecute(function_description="Make combined image")
292    @utils.TimeoutException(_MK_COMBINED_IMG_TIMEOUT_SECS)
293    def MkCombinedImg(self, output_path, system_qemu_config_path, get_image):
294        """Use mk_combined_img to create a disk image.
295
296        Args:
297            output_path: The path to the output disk image.
298            system_qemu_config: The path to the config that provides the
299                                parition information on the disk.
300            get_image: A function that takes the partition name as the
301                       parameter and returns the image path.
302        """
303        mk_combined_img = self._GetBinary(_MK_COMBINED_IMG)
304        sgdisk = self._GetBinary(_SGDISK)
305        simg2img = self._GetBinary(_SIMG2IMG)
306
307        new_config_path = None
308        try:
309            with open(system_qemu_config_path, "r") as config:
310                with tempfile.NamedTemporaryFile(
311                        prefix="system-qemu-config_", suffix=".txt",
312                        delete=False) as new_config:
313                    new_config_path = new_config.name
314                    self._RewriteSystemQemuConfig(new_config, config,
315                                                  get_image)
316
317            mk_combined_img_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}
318            self._ExecuteCommand(mk_combined_img,
319                                 "-i", new_config_path,
320                                 "-o", output_path,
321                                 env=mk_combined_img_env)
322        finally:
323            if new_config_path:
324                os.remove(new_config_path)
325