1#!/usr/bin/env python
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""Unit tests for apexer."""
18
19import hashlib
20import logging
21import os
22import re
23import shutil
24import stat
25import subprocess
26import tempfile
27import unittest
28from zipfile import ZipFile
29
30from apex_manifest import ValidateApexManifest
31
# Module-level logger. run() and run_and_check_output() write to it when they
# are called with a truthy `verbose`.
logger = logging.getLogger(__name__)

# Base names of the prebuilt test APEXes. The tests append ".apex" to these
# names and look the files up in the directory that contains this script.
TEST_APEX = "com.android.example.apex"
TEST_APEX_LEGACY = "com.android.example-legacy.apex"
TEST_APEX_WITH_LOGGING_PARENT = "com.android.example-logging_parent.apex"
TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME = "com.android.example-overridden_package_name.apex"

# Signing material, given as paths relative to the directory that contains this
# script. The private key and the AVB public key are handed to apexer (the
# private key also to avbtool); the X.509 certificate and PK8 key are handed to
# signapk.
TEST_PRIVATE_KEY = os.path.join("testdata", "com.android.example.apex.pem")
TEST_X509_KEY = os.path.join("testdata", "com.android.example.apex.x509.pem")
TEST_PK8_KEY = os.path.join("testdata", "com.android.example.apex.pk8")
TEST_AVB_PUBLIC_KEY = os.path.join("testdata", "com.android.example.apex.avbpubkey")
43
44
def run(args, verbose=None, **kwargs):
    """Starts a command as a child process and returns without waiting for it.

    Args:
      args: The command, as a list of strings.
      verbose: When truthy, the command line is logged at INFO level. Nothing
          is logged when it is left unset.
      kwargs: Extra keyword arguments forwarded to subprocess.Popen(), such as
          env or stdin. If the caller supplies neither stdout nor stderr,
          stdout is captured through a pipe and stderr is merged into it.
          universal_newlines defaults to True so that output is text.

    Returns:
      The subprocess.Popen object of the started process.
    """
    caller_redirects_output = 'stdout' in kwargs or 'stderr' in kwargs
    if not caller_redirects_output:
        kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    kwargs.setdefault('universal_newlines', True)

    # Debug runs always echo the command, independently of `verbose`.
    if DEBUG_TEST:
        print("\nRunning: \n%s\n" % " ".join(args))
    # Regular logging only happens when the caller asked for it.
    if verbose:
        logger.info("  Running: \"%s\"", " ".join(args))

    return subprocess.Popen(args, **kwargs)
72
73
def run_host_command(args, verbose=None, **kwargs):
    """Runs a host tool, preferring the copy from the Android build output.

    When the ANDROID_BUILD_TOP environment variable is set, the executable
    (args[0]) is resolved against $ANDROID_BUILD_TOP/out/soong/host/linux-x86/bin.
    Otherwise the command is run as given.

    Args:
      args: The command represented as a sequence of strings. It is not
          modified.
      verbose: Forwarded to run_and_check_output().
      kwargs: Forwarded to run_and_check_output().

    Returns:
      The output string of the command.

    Raises:
      RuntimeError: On non-zero exit from the command (raised by
          run_and_check_output()).
    """
    host_build_top = os.environ.get("ANDROID_BUILD_TOP")
    if host_build_top:
        host_command_dir = os.path.join(host_build_top, "out/soong/host/linux-x86/bin")
        # Build a new list rather than overwriting args[0], so that the
        # caller's command list is left untouched.
        args = [os.path.join(host_command_dir, args[0])] + list(args[1:])
    return run_and_check_output(args, verbose, **kwargs)
80
81
def run_and_check_output(args, verbose=None, **kwargs):
    """Runs a command to completion and returns what it printed.

    Args:
      args: The command, as a list of strings.
      verbose: When truthy, the command and its output are logged at INFO
          level. Nothing is logged when it is left unset.
      kwargs: Extra keyword arguments forwarded to run() and from there to
          subprocess.Popen(), such as env or stdin.

    Returns:
      The captured output as a string; an empty string when nothing was
      captured.

    Raises:
      RuntimeError: If the command exits with a non-zero code. The message
          contains the command, the exit code and the captured output.
    """
    proc = run(args, verbose=verbose, **kwargs)
    captured = proc.communicate()[0]
    # communicate() yields None for stdout when it was not piped.
    output = "" if captured is None else captured

    if verbose:
        logger.info("%s", output.rstrip())

    if proc.returncode == 0:
        return output
    raise RuntimeError(
        "Failed to run command '{}' (exit code {}):\n{}".format(
            args, proc.returncode, output))
111
112
def get_sha1sum(file_path):
    """Returns the hex digest of the contents of a file.

    NOTE: despite the historical name, the digest is SHA-256, not SHA-1. The
    name is kept so that existing callers keep working.

    Args:
      file_path: Path of the file to hash.

    Returns:
      The SHA-256 digest of the file contents as a hexadecimal string.
    """
    h = hashlib.sha256()
    # Read in 64 KiB pieces: large enough to avoid one Python-level call per
    # 64-byte hash block, small enough to keep memory use flat for big files.
    chunk_size = 64 * 1024

    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)

    return h.hexdigest()
125
126
def get_current_dir():
    """Returns the absolute, symlink-resolved directory containing this script."""
    # Test data is located next to the script, which is not necessarily the
    # working directory the test was started from.
    script_path = os.path.realpath(__file__)
    return os.path.dirname(script_path)
132
def round_up(size, unit):
    """Rounds size up to the next multiple of unit.

    Args:
      size: The value to round.
      unit: The alignment; asserted to have at most one bit set, i.e. to be a
          power of two.

    Returns:
      The smallest multiple of unit that is not less than size.
    """
    mask = unit - 1
    assert unit & mask == 0
    return (size + mask) & ~mask
136
# To debug test failures, set DEBUG_TEST to True and run the test from a local
# workstation, bypassing atest, e.g.:
# $ m apexer_test && out/host/linux-x86/nativetest64/apexer_test/apexer_test
#
# With DEBUG_TEST enabled the test prints every command it runs, passes -v to
# apexer, keeps a copy of the input APEX, and prints the temporary files it
# created instead of deleting them. You then need to compare e.g.
# /tmp/test_simple_apex_input_XXXXXXXX.apex with
# /tmp/test_simple_apex_repacked_YYYYYYYY.apex to find out where they differ.
# A simple script to analyze the differences:
#
# FILE_INPUT=/tmp/test_simple_apex_input_XXXXXXXX.apex
# FILE_OUTPUT=/tmp/test_simple_apex_repacked_YYYYYYYY.apex
#
# cd ~/tmp/
# rm -rf input output
# mkdir input output
# unzip ${FILE_INPUT} -d input/
# unzip ${FILE_OUTPUT} -d output/
#
# diff -r input/ output/
#
# For analyzing binary diffs, the vbindiff utility has been moderately useful.
DEBUG_TEST = False
160
161
class ApexerRebuildTest(unittest.TestCase):
    """Rebuilds prebuilt test APEXes with apexer and compares the results.

    Each test unpacks a prebuilt APEX located next to this script, feeds the
    extracted container files and payload back into apexer, and compares the
    digest of the rebuilt artifact with the digest of the original one.
    """

    def setUp(self):
        """Sets up cleanup bookkeeping and unpacks the bundled host tools."""
        self._to_cleanup = []
        # _get_host_tools() and _run_apexer() modify os.environ. Restore it
        # after every test so that PATH and LD_LIBRARY_PATH do not accumulate
        # entries that point at already deleted temporary directories.
        self.addCleanup(self._restore_environ, dict(os.environ))
        self._get_host_tools(os.path.join(get_current_dir(), "apexer_test_host_tools.zip"))

    def tearDown(self):
        """Deletes the temporary files and dirs, or lists them when debugging."""
        if DEBUG_TEST:
            # Keep everything for inspection and tell the user where it is.
            print(self._to_cleanup)
            return
        for path in self._to_cleanup:
            if os.path.isdir(path):
                shutil.rmtree(path, ignore_errors=True)
            elif os.path.lexists(path):
                # Skipping files that are already gone keeps one missing file
                # from aborting the cleanup of the remaining entries.
                os.remove(path)
        del self._to_cleanup[:]

    @staticmethod
    def _restore_environ(saved_environ):
        """Resets os.environ to the given snapshot."""
        os.environ.clear()
        os.environ.update(saved_environ)

    def _make_temp_dir(self, tag):
        """Creates a temporary directory that is removed in tearDown()."""
        dir_name = tempfile.mkdtemp(prefix=self._testMethodName + tag)
        self._to_cleanup.append(dir_name)
        return dir_name

    def _make_temp_file(self, tag, suffix):
        """Creates an empty temporary file that is removed in tearDown()."""
        fd, fn = tempfile.mkstemp(prefix=self._testMethodName + tag, suffix=suffix)
        os.close(fd)
        self._to_cleanup.append(fn)
        return fn

    @staticmethod
    def _strip_generated_payload_files(dir_name):
        """Removes payload files added by apexer and e2fs tools."""
        for i in ["apex_manifest.json", "apex_manifest.pb"]:
            file_path = os.path.join(dir_name, i)
            if os.path.exists(file_path):
                os.remove(file_path)
        lost_found = os.path.join(dir_name, "lost+found")
        if os.path.isdir(lost_found):
            shutil.rmtree(lost_found)

    def _get_host_tools(self, host_tools_file_path):
        """Unpacks the host tools zip, if present, and exposes the tools.

        Sets self.host_tools (tool name -> extracted path, or the bare name
        when the tool was not part of the zip) and self.host_tools_path, and
        prepends the extracted bin/ and lib64/ directories to PATH and
        LD_LIBRARY_PATH.

        Args:
          host_tools_file_path: Path of the zip file with the host tools.
        """
        dir_name = self._make_temp_dir("_host_tools_")
        if os.path.isfile(host_tools_file_path):
            with ZipFile(host_tools_file_path, 'r') as zip_obj:
                zip_obj.extractall(path=dir_name)

        bin_dir = os.path.join(dir_name, "bin")
        files = {}
        for i in ["apexer", "deapexer", "avbtool", "mke2fs", "sefcontext_compile", "e2fsdroid",
                  "resize2fs", "soong_zip", "aapt2", "merge_zips", "zipalign", "debugfs_static",
                  "signapk.jar", "android.jar"]:
            file_path = os.path.join(bin_dir, i)
            if os.path.exists(file_path):
                # Make the extracted tool readable and executable by the owner.
                os.chmod(file_path, stat.S_IRUSR | stat.S_IXUSR)
                files[i] = file_path
            else:
                # Not bundled: fall back to the bare name.
                files[i] = i
        self.host_tools = files
        self.host_tools_path = bin_dir

        path = bin_dir
        if "PATH" in os.environ:
            path += ":" + os.environ["PATH"]
        os.environ["PATH"] = path

        ld_library_path = os.path.join(dir_name, "lib64")
        if "LD_LIBRARY_PATH" in os.environ:
            ld_library_path += ":" + os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            ld_library_path += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        os.environ["LD_LIBRARY_PATH"] = ld_library_path

    def _get_container_files(self, apex_file_path):
        """Unzips an APEX and returns the paths of its known container files.

        Asserts that apex_manifest.pb, apex_build_info.pb and a payload
        (apex_payload.img or apex_payload.zip) are present.

        Args:
          apex_file_path: Path of the APEX file to unzip.

        Returns:
          A dict mapping container entry names to their extracted paths, plus
          the key "apex_payload" that points at the payload file.
        """
        dir_name = self._make_temp_dir("_container_files_")
        with ZipFile(apex_file_path, 'r') as zip_obj:
            zip_obj.extractall(path=dir_name)
        files = {}
        for i in ["apex_manifest.json", "apex_manifest.pb",
                  "apex_build_info.pb", "assets",
                  "apex_payload.img", "apex_payload.zip"]:
            file_path = os.path.join(dir_name, i)
            if os.path.exists(file_path):
                files[i] = file_path
        self.assertIn("apex_manifest.pb", files)
        self.assertIn("apex_build_info.pb", files)

        image_file = None
        if "apex_payload.img" in files:
            image_file = files["apex_payload.img"]
        elif "apex_payload.zip" in files:
            image_file = files["apex_payload.zip"]
        self.assertIsNotNone(image_file)
        files["apex_payload"] = image_file

        return files

    def _extract_payload_from_img(self, img_file_path):
        """Dumps the contents of a payload image into a new temporary dir.

        Args:
          img_file_path: Path of the payload image.

        Returns:
          The directory holding the dumped payload, without the files added
          by apexer and e2fs tools.
        """
        dir_name = self._make_temp_dir("_extracted_payload_")
        cmd = ["debugfs_static", '-R', 'rdump ./ %s' % dir_name, img_file_path]
        run_host_command(cmd)
        self._strip_generated_payload_files(dir_name)
        return dir_name

    def _extract_payload(self, apex_file_path):
        """Extracts the payload of an APEX into a new temporary dir via deapexer.

        Args:
          apex_file_path: Path of the APEX file.

        Returns:
          The directory holding the extracted payload, without the files added
          by apexer and e2fs tools.
        """
        dir_name = self._make_temp_dir("_extracted_payload_")
        cmd = ["deapexer", "--debugfs_path", self.host_tools["debugfs_static"],
               "extract", apex_file_path, dir_name]
        run_host_command(cmd)
        self._strip_generated_payload_files(dir_name)
        return dir_name

    def _run_apexer(self, container_files, payload_dir, args=None):
        """Runs apexer on an extracted payload and returns the output path.

        Args:
          container_files: Dict as returned by _get_container_files().
          payload_dir: Directory with the payload contents.
          args: Optional list of extra apexer arguments. "--payload_only" and
              "--unsigned_payload_only" also change which other options are
              passed and the suffix of the output file.

        Returns:
          Path of the file written by apexer.
        """
        # Default to None rather than a mutable [] that all calls would share.
        extra_args = [] if args is None else list(args)
        unsigned_payload_only = "--unsigned_payload_only" in extra_args
        payload_only = unsigned_payload_only or "--payload_only" in extra_args

        os.environ["APEXER_TOOL_PATH"] = (
            self.host_tools_path +
            ":out/soong/host/linux-x86/bin:prebuilts/sdk/tools/linux/bin")
        cmd = ["apexer", "--force", "--include_build_info", "--do_not_check_keyname"]
        if DEBUG_TEST:
            cmd.append('-v')
        cmd.extend(["--apexer_tool_path", os.environ["APEXER_TOOL_PATH"]])
        cmd.extend(["--android_jar_path", self.host_tools["android.jar"]])
        cmd.extend(["--manifest", container_files["apex_manifest.pb"]])
        if "apex_manifest.json" in container_files:
            cmd.extend(["--manifest_json", container_files["apex_manifest.json"]])
        cmd.extend(["--build_info", container_files["apex_build_info.pb"]])
        if not payload_only and "assets" in container_files:
            # Point at the extracted assets directory, like every other
            # container file, instead of a path relative to the working dir.
            cmd.extend(["--assets_dir", container_files["assets"]])
        if not unsigned_payload_only:
            cmd.extend(["--key", os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
            cmd.extend(["--pubkey", os.path.join(get_current_dir(), TEST_AVB_PUBLIC_KEY)])
        cmd.extend(extra_args)

        # Decide on output file name
        apex_suffix = ".payload" if payload_only else ".apex.unsigned"
        fn = self._make_temp_file("_repacked_", apex_suffix)
        cmd.extend([payload_dir, fn])

        run_host_command(cmd)
        return fn

    def _get_java_toolchain(self):
        """Picks the java binary and the native library path for signapk.

        The java binary is chosen, in order of preference, from the JDK 11
        prebuilt, ANDROID_JAVA_TOOLCHAIN, ANDROID_JAVA_HOME and JAVA_HOME,
        falling back to plain "java".

        Returns:
          A two-element list: the java executable and the colon-separated
          library path.
        """
        java_toolchain = "java"
        if os.path.isfile("prebuilts/jdk/jdk11/linux-x86/bin/java"):
            java_toolchain = "prebuilts/jdk/jdk11/linux-x86/bin/java"
        elif "ANDROID_JAVA_TOOLCHAIN" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_TOOLCHAIN"], "java")
        elif "ANDROID_JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["ANDROID_JAVA_HOME"], "bin", "java")
        elif "JAVA_HOME" in os.environ:
            java_toolchain = os.path.join(os.environ["JAVA_HOME"], "bin", "java")

        # LD_LIBRARY_PATH is always set by _get_host_tools() during setUp().
        java_dep_lib = os.environ["LD_LIBRARY_PATH"]
        if "ANDROID_HOST_OUT" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_HOST_OUT"], "lib64")
        if "ANDROID_BUILD_TOP" in os.environ:
            java_dep_lib += ":" + os.path.join(os.environ["ANDROID_BUILD_TOP"],
                                               "out/soong/host/linux-x86/lib64")

        return [java_toolchain, java_dep_lib]

    def _sign_apk_container(self, unsigned_apex):
        """Signs an unsigned APEX container with signapk.

        Args:
          unsigned_apex: Path of the unsigned APEX file.

        Returns:
          Path of the signed APEX file.
        """
        fn = self._make_temp_file("_repacked_", ".apex")
        java_toolchain, java_dep_lib = self._get_java_toolchain()
        cmd = [
            java_toolchain,
            "-Djava.library.path=" + java_dep_lib,
            "-jar", self.host_tools['signapk.jar'],
            "-a", "4096",
            os.path.join(get_current_dir(), TEST_X509_KEY),
            os.path.join(get_current_dir(), TEST_PK8_KEY),
            unsigned_apex, fn]
        run_and_check_output(cmd)
        return fn

    def _sign_payload(self, container_files, unsigned_payload):
        """Signs a copy of an unsigned payload image with avbtool.

        Args:
          container_files: Dict as returned by _get_container_files().
          unsigned_payload: Path of the unsigned payload image; it is left
              untouched.

        Returns:
          Path of the signed copy of the payload.
        """
        signed_payload = self._make_temp_file("_repacked_", ".payload")
        shutil.copyfile(unsigned_payload, signed_payload)

        cmd = ['avbtool']
        cmd.append('add_hashtree_footer')
        cmd.append('--do_not_generate_fec')
        cmd.extend(['--algorithm', 'SHA256_RSA4096'])
        cmd.extend(['--hash_algorithm', 'sha256'])
        cmd.extend(['--key', os.path.join(get_current_dir(), TEST_PRIVATE_KEY)])
        manifest_apex = ValidateApexManifest(container_files["apex_manifest.pb"])
        cmd.extend(['--prop', 'apex.key:' + manifest_apex.name])
        # Set up the salt based on manifest content which includes name
        # and version
        salt = hashlib.sha256(manifest_apex.SerializeToString()).hexdigest()
        cmd.extend(['--salt', salt])
        cmd.extend(['--image', signed_payload])
        cmd.append('--no_hashtree')
        run_and_check_output(cmd)

        return signed_payload

    def _verify_payload(self, payload):
        """Verifies that the payload is properly signed by avbtool"""
        cmd = ["avbtool", "verify_image", "--image", payload, "--accept_zeroed_hashtree"]
        run_and_check_output(cmd)

    def _run_build_test(self, apex_name):
        """Repacks and re-signs a prebuilt APEX and expects an identical file.

        Args:
          apex_name: Base name of the APEX; ".apex" is appended to form the
              file name looked up next to this script.
        """
        apex_file_path = os.path.join(get_current_dir(), apex_name + ".apex")
        if DEBUG_TEST:
            # Keep a copy of the input next to the outputs for easy diffing.
            fn = self._make_temp_file("_input_", ".apex")
            shutil.copyfile(apex_file_path, fn)
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        repack_apex_file_path = self._run_apexer(container_files, payload_dir)
        resigned_apex_file_path = self._sign_apk_container(repack_apex_file_path)
        self.assertEqual(get_sha1sum(apex_file_path), get_sha1sum(resigned_apex_file_path))

    def test_simple_apex(self):
        self._run_build_test(TEST_APEX)

    def test_legacy_apex(self):
        self._run_build_test(TEST_APEX_LEGACY)

    def test_output_payload_only(self):
        """Assert that payload-only output from apexer is same as the payload we get by unzipping
        apex.
        """
        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        payload_only_file_path = self._run_apexer(container_files, payload_dir, ["--payload_only"])
        self._verify_payload(payload_only_file_path)
        self.assertEqual(get_sha1sum(payload_only_file_path),
                         get_sha1sum(container_files["apex_payload"]))

    def test_output_unsigned_payload_only(self):
        """Assert that when unsigned-payload-only output from apexer is signed by the avb key, it is
        same as the payload we get by unzipping apex.
        """
        apex_file_path = os.path.join(get_current_dir(), TEST_APEX + ".apex")
        container_files = self._get_container_files(apex_file_path)
        payload_dir = self._extract_payload(apex_file_path)
        unsigned_payload_only_file_path = self._run_apexer(container_files, payload_dir,
                                                           ["--unsigned_payload_only"])
        # An unsigned payload must be rejected by avbtool's verification.
        with self.assertRaises(RuntimeError) as error:
            self._verify_payload(unsigned_payload_only_file_path)
        self.assertIn("Given image does not look like a vbmeta image", str(error.exception))
        signed_payload = self._sign_payload(container_files, unsigned_payload_only_file_path)
        self.assertEqual(get_sha1sum(signed_payload),
                         get_sha1sum(container_files["apex_payload"]))

        # Now assert that given an unsigned image and the original container
        # files, we can produce an identical unsigned image.
        unsigned_payload_dir = self._extract_payload_from_img(unsigned_payload_only_file_path)
        unsigned_payload_only_2_file_path = self._run_apexer(container_files, unsigned_payload_dir,
                                                             ["--unsigned_payload_only"])
        self.assertEqual(get_sha1sum(unsigned_payload_only_file_path),
                         get_sha1sum(unsigned_payload_only_2_file_path))

    def test_apex_with_logging_parent(self):
        self._run_build_test(TEST_APEX_WITH_LOGGING_PARENT)

    def test_apex_with_overridden_package_name(self):
        self._run_build_test(TEST_APEX_WITH_OVERRIDDEN_PACKAGE_NAME)
429
# When executed directly as a script, run every test in this module and report
# each test individually (verbosity=2).
if __name__ == '__main__':
    unittest.main(verbosity=2)
432