1#!/usr/bin/env python 2# 3# Copyright (C) 2020 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16# 17"""Unit tests for apex_compression_tool.""" 18import hashlib 19import logging 20import os 21import shutil 22import subprocess 23import tempfile 24import unittest 25from zipfile import ZipFile, ZIP_STORED, ZIP_DEFLATED 26 27import apex_manifest_pb2 28 29logger = logging.getLogger(__name__) 30 31TEST_APEX = 'com.android.example.apex' 32 33# In order to debug test failures, set DEBUG_TEST to True and run the test from 34# local workstation bypassing atest, e.g.: 35# $ m apex_compression_tool_test && \ 36# out/host/linux-x86/nativetest64/apex_compression_tool_test/\ 37# apex_compression_tool_test 38# 39# the test will print out the command used, and the temporary files used by the 40# test. 41DEBUG_TEST = False 42 43 44def run(args, verbose=None, **kwargs): 45 """Creates and returns a subprocess.Popen object. 46 47 Args: 48 args: The command represented as a list of strings. 49 verbose: Whether the commands should be shown. Default to the global 50 verbosity if unspecified. 51 kwargs: Any additional args to be passed to subprocess.Popen(), such as env, 52 stdin, etc. stdout and stderr will default to subprocess.PIPE and 53 subprocess.STDOUT respectively unless caller specifies any of them. 54 universal_newlines will default to True, as most of the users in 55 releasetools expect string output. 56 57 Returns: 58 A subprocess.Popen object. 59 """ 60 if 'stdout' not in kwargs and 'stderr' not in kwargs: 61 kwargs['stdout'] = subprocess.PIPE 62 kwargs['stderr'] = subprocess.STDOUT 63 if 'universal_newlines' not in kwargs: 64 kwargs['universal_newlines'] = True 65 if DEBUG_TEST: 66 print('\nRunning: \n%s\n' % ' '.join(args)) 67 # Don't log any if caller explicitly says so. 68 if verbose: 69 logger.info(' Running: \'%s\'', ' '.join(args)) 70 return subprocess.Popen(args, **kwargs) 71 72 73def run_host_command(args, verbose=None, **kwargs): 74 host_build_top = os.environ.get('ANDROID_BUILD_TOP') 75 if host_build_top: 76 host_command_dir = os.path.join(host_build_top, 77 'out/soong/host/linux-x86/bin') 78 args[0] = os.path.join(host_command_dir, args[0]) 79 return run_and_check_output(args, verbose, **kwargs) 80 81 82def run_and_check_output(args, verbose=None, **kwargs): 83 """Runs the given command and returns the output. 84 85 Args: 86 args: The command represented as a list of strings. 87 verbose: Whether the commands should be shown. Default to the global 88 verbosity if unspecified. 89 kwargs: Any additional args to be passed to subprocess.Popen(), such as env, 90 stdin, etc. stdout and stderr will default to subprocess.PIPE and 91 subprocess.STDOUT respectively unless caller specifies any of them. 92 93 Returns: 94 The output string. 95 96 Raises: 97 ExternalError: On non-zero exit from the command. 98 """ 99 100 proc = run(args, verbose=verbose, **kwargs) 101 output, _ = proc.communicate() 102 if output is None: 103 output = '' 104 # Don't log any if caller explicitly says so. 105 if verbose: 106 logger.info('%s', output.rstrip()) 107 if proc.returncode != 0: 108 raise RuntimeError( 109 "Failed to run command '{}' (exit code {}):\n{}".format( 110 args, proc.returncode, output)) 111 return output 112 113 114def get_current_dir(): 115 """Returns the current dir, relative to the script dir.""" 116 # The script dir is the one we want, which could be different from pwd. 117 current_dir = os.path.dirname(os.path.realpath(__file__)) 118 return current_dir 119 120 121def get_sha1sum(file_path): 122 h = hashlib.sha256() 123 124 with open(file_path, 'rb') as file: 125 while True: 126 # Reading is buffered, so we can read smaller chunks. 127 chunk = file.read(h.block_size) 128 if not chunk: 129 break 130 h.update(chunk) 131 132 return h.hexdigest() 133 134 135class ApexCompressionTest(unittest.TestCase): 136 def setUp(self): 137 self._to_cleanup = [] 138 139 def tearDown(self): 140 if not DEBUG_TEST: 141 for i in self._to_cleanup: 142 if os.path.isdir(i): 143 shutil.rmtree(i, ignore_errors=True) 144 else: 145 os.remove(i) 146 del self._to_cleanup[:] 147 else: 148 print('Cleanup: ' + str(self._to_cleanup)) 149 150 def _run_apex_compression_tool(self, args): 151 cmd = ['apex_compression_tool'] 152 host_build_top = os.environ.get('ANDROID_BUILD_TOP') 153 if host_build_top: 154 os.environ['APEX_COMPRESSION_TOOL_PATH'] = ( 155 os.path.join(host_build_top, 'out/soong/host/linux-x86/bin') 156 + ':' + os.path.join(host_build_top, 'prebuilts/sdk/tools/linux/bin')) 157 else: 158 os.environ['APEX_COMPRESSION_TOOL_PATH'] = os.path.dirname( 159 shutil.which('apex_compression_tool')) 160 cmd.extend(args) 161 run_host_command(cmd, True) 162 163 def _get_container_files(self, apex_file_path): 164 dir_name = tempfile.mkdtemp( 165 prefix=self._testMethodName + '_container_files_') 166 self._to_cleanup.append(dir_name) 167 with ZipFile(apex_file_path, 'r') as zip_obj: 168 zip_obj.extractall(path=dir_name) 169 files = {} 170 for i in ['apex_manifest.json', 'apex_manifest.pb', 'apex_pubkey', 171 'apex_build_info.pb', 'apex_payload.img', 'apex_payload.zip', 172 'AndroidManifest.xml', 'original_apex']: 173 file_path = os.path.join(dir_name, i) 174 if os.path.exists(file_path): 175 files[i] = file_path 176 177 image_file = files.get('apex_payload.img', None) 178 if image_file is None: 179 image_file = files.get('apex_payload.zip', None) 180 else: 181 files['apex_payload'] = image_file 182 # Also retrieve the root digest of the image 183 avbtool_cmd = ['avbtool', 184 'print_partition_digests', '--image', files['apex_payload']] 185 # avbtool_cmd output has format "<name>: <value>" 186 files['digest'] = run_host_command( 187 avbtool_cmd, True).split(': ')[1].strip() 188 189 return files 190 191 def _get_manifest_string(self, manifest_path): 192 cmd = ['conv_apex_manifest'] 193 cmd.extend([ 194 'print', 195 manifest_path 196 ]) 197 return run_host_command(cmd, 'True') 198 199 # Mutates the manifest located at |manifest_path| 200 def _unset_original_apex_digest(self, manifest_path): 201 # Open the protobuf 202 with open(manifest_path, 'rb') as f: 203 pb = apex_manifest_pb2.ApexManifest() 204 pb.ParseFromString(f.read()) 205 pb.ClearField('capexMetadata') 206 with open(manifest_path, 'wb') as f: 207 f.write(pb.SerializeToString()) 208 209 def _compress_apex(self, uncompressed_apex_fp): 210 """Returns file path to compressed APEX""" 211 fd, compressed_apex_fp = tempfile.mkstemp( 212 prefix=self._testMethodName + '_compressed_', 213 suffix='.capex') 214 os.close(fd) 215 self._to_cleanup.append(compressed_apex_fp) 216 self._run_apex_compression_tool([ 217 'compress', 218 '--input', uncompressed_apex_fp, 219 '--output', compressed_apex_fp 220 ]) 221 return compressed_apex_fp 222 223 def _decompress_apex(self, compressed_apex_fp): 224 """Returns file path to decompressed APEX""" 225 decompressed_apex_fp = tempfile. \ 226 NamedTemporaryFile(prefix=self._testMethodName + '_decompressed_', 227 suffix='.apex').name 228 # Use deapexer to decompress 229 cmd = ['deapexer'] 230 cmd.extend([ 231 'decompress', 232 '--input', compressed_apex_fp, 233 '--output', decompressed_apex_fp 234 ]) 235 run_host_command(cmd, True) 236 237 self.assertTrue(os.path.exists(decompressed_apex_fp), 238 'Decompressed APEX does not exist') 239 self._to_cleanup.append(decompressed_apex_fp) 240 return decompressed_apex_fp 241 242 def _get_type(self, apex_file_path): 243 cmd = ['deapexer', 'info', '--print-type', apex_file_path] 244 return run_host_command(cmd, True).strip() 245 246 def test_compression(self): 247 uncompressed_apex_fp = os.path.join(get_current_dir(), TEST_APEX + '.apex') 248 # TODO(samiul): try compressing a compressed APEX 249 compressed_apex_fp = self._compress_apex(uncompressed_apex_fp) 250 251 # Verify output file has been created and is smaller than input file 252 uncompressed_file_size = os.path.getsize(uncompressed_apex_fp) 253 compressed_file_size = os.path.getsize(compressed_apex_fp) 254 self.assertGreater(compressed_file_size, 0, 'Compressed APEX is empty') 255 self.assertLess(compressed_file_size, uncompressed_file_size, 256 'Compressed APEX is not smaller than uncompressed APEX') 257 258 # Verify type of the apex is 'COMPRESSED' 259 self.assertEqual(self._get_type(compressed_apex_fp), 'COMPRESSED') 260 261 # Verify the contents of the compressed apex files 262 content_in_compressed_apex = self._get_container_files(compressed_apex_fp) 263 self.assertIsNotNone(content_in_compressed_apex['original_apex']) 264 content_in_uncompressed_apex = self._get_container_files( 265 uncompressed_apex_fp) 266 self.assertIsNotNone(content_in_uncompressed_apex['apex_payload']) 267 self.assertIsNotNone(content_in_uncompressed_apex['digest']) 268 269 # Verify that CAPEX manifest contains digest of original_apex 270 manifest_string = self._get_manifest_string( 271 content_in_compressed_apex['apex_manifest.pb']) 272 self.assertIn('originalApexDigest: "' 273 + content_in_uncompressed_apex['digest'] + '"', manifest_string) 274 275 for i in ['apex_manifest.json', 'apex_manifest.pb', 'apex_pubkey', 276 'apex_build_info.pb', 'AndroidManifest.xml']: 277 if i in content_in_uncompressed_apex: 278 if i == 'apex_manifest.pb': 279 # Get rid of originalApexDigest field, which should be the 280 # only difference 281 self._unset_original_apex_digest(content_in_compressed_apex[i]) 282 self.assertEqual(get_sha1sum(content_in_compressed_apex[i]), 283 get_sha1sum(content_in_uncompressed_apex[i])) 284 285 def test_decompression(self): 286 # setup: create compressed APEX 287 uncompressed_apex_fp = os.path.join(get_current_dir(), TEST_APEX + '.apex') 288 compressed_apex_fp = self._compress_apex(uncompressed_apex_fp) 289 290 # Decompress it 291 decompressed_apex_fp = self._decompress_apex(compressed_apex_fp) 292 293 # Verify type of the apex is 'UNCOMPRESSED' 294 self.assertEqual(self._get_type(decompressed_apex_fp), 'UNCOMPRESSED') 295 296 # Verify decompressed APEX is same as uncompressed APEX 297 self.assertEqual(get_sha1sum(uncompressed_apex_fp), 298 get_sha1sum(decompressed_apex_fp), 299 'Decompressed APEX is not same as uncompressed APEX') 300 301 # Try decompressing uncompressed APEX. It should not work. 302 with self.assertRaises(RuntimeError) as error: 303 self._decompress_apex(uncompressed_apex_fp) 304 305 self.assertIn(uncompressed_apex_fp 306 + ' is not a compressed APEX', str(error.exception)) 307 308 def test_only_original_apex_is_compressed(self): 309 uncompressed_apex_fp = os.path.join(get_current_dir(), TEST_APEX + '.apex') 310 compressed_apex_fp = self._compress_apex(uncompressed_apex_fp) 311 312 with ZipFile(compressed_apex_fp, 'r') as zip_obj: 313 self.assertEqual(zip_obj.getinfo('original_apex').compress_type, 314 ZIP_DEFLATED) 315 content_in_uncompressed_apex = self._get_container_files( 316 uncompressed_apex_fp) 317 for i in ['apex_manifest.json', 'apex_manifest.pb', 'apex_pubkey', 318 'apex_build_info.pb', 'AndroidManifest.xml']: 319 if i in content_in_uncompressed_apex: 320 self.assertEqual(zip_obj.getinfo(i).compress_type, ZIP_STORED) 321 322if __name__ == '__main__': 323 unittest.main(verbosity=2) 324