• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17"""Unit tests for apex_compression_tool."""
18import hashlib
19import logging
20import os
21import shutil
22import subprocess
23import tempfile
24import unittest
25from zipfile import ZipFile, ZIP_STORED, ZIP_DEFLATED
26
27import apex_manifest_pb2
28
logger = logging.getLogger(__name__)

# Base name (without the '.apex' extension) of the APEX fixture used by the
# tests; the file is looked up next to this script.
TEST_APEX = 'com.android.example.apex'

# Debugging aid: flip DEBUG_TEST to True and run the test binary directly from
# a local workstation instead of going through atest, e.g.:
# $ m apex_compression_tool_test && \
#   out/host/linux-x86/nativetest64/apex_compression_tool_test/\
#   apex_compression_tool_test
#
# While enabled, every command is printed before it is run, and the temporary
# files created by a test are listed instead of being deleted.
DEBUG_TEST = False
42
43
def run(args, verbose=None, **kwargs):
  """Starts a subprocess for the given command.

  Args:
    args: The command represented as a list of strings.
    verbose: If truthy, the command line is logged at INFO level. The default
        (None) logs nothing.
    kwargs: Extra keyword arguments forwarded to subprocess.Popen(), such as
        env or stdin. When the caller passes neither stdout nor stderr, stdout
        is piped and stderr is merged into it. universal_newlines defaults to
        True so that output is handled as text.

  Returns:
    The started subprocess.Popen object.
  """
  caller_chose_stream = 'stdout' in kwargs or 'stderr' in kwargs
  if not caller_chose_stream:
    kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
  kwargs.setdefault('universal_newlines', True)

  # Debug mode always echoes the command, independently of `verbose`.
  if DEBUG_TEST:
    print('\nRunning: \n%s\n' % ' '.join(args))
  if verbose:
    logger.info('  Running: \'%s\'', ' '.join(args))
  return subprocess.Popen(args, **kwargs)
71
72
def run_host_command(args, verbose=None, **kwargs):
  """Runs a host tool and returns its output.

  When ANDROID_BUILD_TOP is set in the environment, the executable name in
  args[0] is resolved against out/soong/host/linux-x86/bin below that
  directory; otherwise the command is run exactly as given.

  Args:
    args: The command represented as a list of strings; args[0] is the tool
        name. The list itself is never modified.
    verbose: Forwarded to run_and_check_output().
    kwargs: Forwarded to run_and_check_output().

  Returns:
    The output string of the command.

  Raises:
    RuntimeError: On non-zero exit from the command.
  """
  host_build_top = os.environ.get('ANDROID_BUILD_TOP')
  if host_build_top:
    host_command_dir = os.path.join(host_build_top,
                                    'out/soong/host/linux-x86/bin')
    # Build a new list instead of assigning to args[0], so the caller's list
    # is left untouched and can safely be reused or inspected afterwards.
    args = [os.path.join(host_command_dir, args[0])] + list(args[1:])
  return run_and_check_output(args, verbose, **kwargs)
80
81
def run_and_check_output(args, verbose=None, **kwargs):
  """Runs a command to completion and returns what it printed.

  Args:
    args: The command represented as a list of strings.
    verbose: If truthy, the command and its output are logged at INFO level.
        The default (None) logs nothing.
    kwargs: Extra keyword arguments forwarded to run(), and from there to
        subprocess.Popen(), such as env or stdin.

  Returns:
    The output string; an empty string if no output was captured.

  Raises:
    RuntimeError: On non-zero exit from the command.
  """
  proc = run(args, verbose=verbose, **kwargs)
  captured = proc.communicate()[0]
  # communicate() yields None for stdout when the caller redirected it away
  # from a pipe; normalise that to an empty string.
  output = '' if captured is None else captured
  if verbose:
    logger.info('%s', output.rstrip())
  if proc.returncode == 0:
    return output
  raise RuntimeError(
      "Failed to run command '{}' (exit code {}):\n{}".format(
          args, proc.returncode, output))
112
113
def get_current_dir():
  """Returns the directory that contains this script.

  Symlinks in the script path are resolved first. The result is independent
  of the process's working directory.
  """
  script_path = os.path.realpath(__file__)
  return os.path.dirname(script_path)
119
120
def get_sha1sum(file_path):
  """Returns the hex digest of the contents of the file at |file_path|.

  Despite the function name, the digest is computed with SHA-256.
  """
  digest = hashlib.sha256()
  with open(file_path, 'rb') as stream:
    # Reads go through Python's buffered layer, so small chunks are fine.
    for block in iter(lambda: stream.read(digest.block_size), b''):
      digest.update(block)
  return digest.hexdigest()
133
134
class ApexCompressionTest(unittest.TestCase):
  """End-to-end tests for compressing and decompressing APEX files.

  The tests shell out to the host tools apex_compression_tool, deapexer,
  conv_apex_manifest and avbtool, and operate on the TEST_APEX fixture that is
  expected next to this script.
  """

  # Container entries that are compared between an uncompressed APEX and the
  # compressed APEX produced from it.
  _SHARED_CONTAINER_FILES = ('apex_manifest.json', 'apex_manifest.pb',
                             'apex_pubkey', 'apex_build_info.pb',
                             'AndroidManifest.xml')

  # Every container entry that _get_container_files() looks for.
  _ALL_CONTAINER_FILES = _SHARED_CONTAINER_FILES + (
      'apex_payload.img', 'apex_payload.zip', 'original_apex')

  def setUp(self):
    # Paths (files or directories) created by a test; removed in tearDown().
    self._to_cleanup = []

  def tearDown(self):
    """Removes the temporary paths recorded by the test.

    With DEBUG_TEST set nothing is deleted; the paths are printed instead so
    they can be inspected.
    """
    if DEBUG_TEST:
      print('Cleanup: ' + str(self._to_cleanup))
      return
    for path in self._to_cleanup:
      if os.path.isdir(path):
        shutil.rmtree(path, ignore_errors=True)
        continue
      try:
        os.remove(path)
      except FileNotFoundError:
        # The path is already gone; keep going so the remaining entries are
        # still cleaned up, matching the best-effort rmtree() above.
        pass
    del self._to_cleanup[:]

  def _run_apex_compression_tool(self, args):
    """Runs apex_compression_tool with the given list of arguments.

    Before the tool is started, APEX_COMPRESSION_TOOL_PATH is exported in this
    process's environment: to the host bin directories under
    ANDROID_BUILD_TOP when that variable is set, otherwise to the directory
    in which apex_compression_tool is found on PATH.
    """
    host_build_top = os.environ.get('ANDROID_BUILD_TOP')
    if host_build_top:
      tool_path = (
          os.path.join(host_build_top, 'out/soong/host/linux-x86/bin')
          + ':' + os.path.join(host_build_top, 'prebuilts/sdk/tools/linux/bin'))
    else:
      tool_binary = shutil.which('apex_compression_tool')
      # shutil.which() returns None when the tool is not on PATH; fail with a
      # clear message instead of a TypeError from os.path.dirname(None).
      self.assertIsNotNone(
          tool_binary, 'apex_compression_tool was not found on PATH')
      tool_path = os.path.dirname(tool_binary)
    os.environ['APEX_COMPRESSION_TOOL_PATH'] = tool_path
    run_host_command(['apex_compression_tool'] + list(args), True)

  def _get_container_files(self, apex_file_path):
    """Extracts an APEX container and indexes its known entries.

    The container is unzipped into a temporary directory that is registered
    for cleanup.

    Returns:
      A dict mapping each known entry name that exists in the container to its
      extracted path. When apex_payload.img is present, the dict additionally
      holds 'apex_payload' (the same image path) and 'digest' (the value
      printed by `avbtool print_partition_digests` for that image).
    """
    dir_name = tempfile.mkdtemp(
        prefix=self._testMethodName + '_container_files_')
    self._to_cleanup.append(dir_name)
    with ZipFile(apex_file_path, 'r') as zip_obj:
      zip_obj.extractall(path=dir_name)

    files = {}
    for name in self._ALL_CONTAINER_FILES:
      file_path = os.path.join(dir_name, name)
      if os.path.exists(file_path):
        files[name] = file_path

    image_file = files.get('apex_payload.img')
    if image_file is not None:
      files['apex_payload'] = image_file
      # Also retrieve the root digest of the image
      avbtool_cmd = ['avbtool',
                     'print_partition_digests', '--image', image_file]
      # avbtool_cmd output has format "<name>: <value>"
      files['digest'] = run_host_command(
          avbtool_cmd, True).split(': ')[1].strip()

    return files

  def _get_manifest_string(self, manifest_path):
    """Returns the output of `conv_apex_manifest print` for the manifest."""
    cmd = ['conv_apex_manifest', 'print', manifest_path]
    return run_host_command(cmd, True)

  # Mutates the manifest located at |manifest_path|
  def _unset_original_apex_digest(self, manifest_path):
    """Clears the capexMetadata field of the manifest protobuf in place."""
    # Open the protobuf
    with open(manifest_path, 'rb') as f:
      pb = apex_manifest_pb2.ApexManifest()
      pb.ParseFromString(f.read())
    pb.ClearField('capexMetadata')
    with open(manifest_path, 'wb') as f:
      f.write(pb.SerializeToString())

  def _compress_apex(self, uncompressed_apex_fp):
    """Returns file path to compressed APEX"""
    fd, compressed_apex_fp = tempfile.mkstemp(
        prefix=self._testMethodName + '_compressed_',
        suffix='.capex')
    os.close(fd)
    self._to_cleanup.append(compressed_apex_fp)
    self._run_apex_compression_tool([
        'compress',
        '--input', uncompressed_apex_fp,
        '--output', compressed_apex_fp
    ])
    return compressed_apex_fp

  def _decompress_apex(self, compressed_apex_fp):
    """Returns file path to decompressed APEX"""
    # Hand deapexer an output path that does not exist yet, so the existence
    # check below proves the tool created it. A private temporary directory
    # reserves the location without creating the file, and does not depend on
    # when a discarded NamedTemporaryFile object happens to be finalized.
    output_dir = tempfile.mkdtemp(
        prefix=self._testMethodName + '_decompressed_')
    # Registered before running the tool so it is removed even on failure.
    self._to_cleanup.append(output_dir)
    decompressed_apex_fp = os.path.join(
        output_dir, self._testMethodName + '_decompressed.apex')

    # Use deapexer to decompress
    cmd = [
        'deapexer',
        'decompress',
        '--input', compressed_apex_fp,
        '--output', decompressed_apex_fp
    ]
    run_host_command(cmd, True)

    self.assertTrue(os.path.exists(decompressed_apex_fp),
                    'Decompressed APEX does not exist')
    return decompressed_apex_fp

  def _get_type(self, apex_file_path):
    """Returns the stripped output of `deapexer info --print-type`."""
    cmd = ['deapexer', 'info', '--print-type', apex_file_path]
    return run_host_command(cmd, True).strip()

  def test_compression(self):
    uncompressed_apex_fp = os.path.join(get_current_dir(), TEST_APEX + '.apex')
    # TODO(samiul): try compressing a compressed APEX
    compressed_apex_fp = self._compress_apex(uncompressed_apex_fp)

    # Verify output file has been created and is smaller than input file
    uncompressed_file_size = os.path.getsize(uncompressed_apex_fp)
    compressed_file_size = os.path.getsize(compressed_apex_fp)
    self.assertGreater(compressed_file_size, 0, 'Compressed APEX is empty')
    self.assertLess(compressed_file_size, uncompressed_file_size,
                    'Compressed APEX is not smaller than uncompressed APEX')

    # Verify type of the apex is 'COMPRESSED'
    self.assertEqual(self._get_type(compressed_apex_fp), 'COMPRESSED')

    # Verify the contents of the compressed apex files. assertIn reports a
    # missing entry as a test failure rather than a KeyError.
    content_in_compressed_apex = self._get_container_files(compressed_apex_fp)
    self.assertIn('original_apex', content_in_compressed_apex)
    content_in_uncompressed_apex = self._get_container_files(
        uncompressed_apex_fp)
    self.assertIn('apex_payload', content_in_uncompressed_apex)
    self.assertIn('digest', content_in_uncompressed_apex)

    # Verify that CAPEX manifest contains digest of original_apex
    self.assertIn('apex_manifest.pb', content_in_compressed_apex)
    manifest_string = self._get_manifest_string(
        content_in_compressed_apex['apex_manifest.pb'])
    self.assertIn('originalApexDigest: "'
                  + content_in_uncompressed_apex['digest'] + '"',
                  manifest_string)

    for name in self._SHARED_CONTAINER_FILES:
      if name not in content_in_uncompressed_apex:
        continue
      self.assertIn(name, content_in_compressed_apex)
      if name == 'apex_manifest.pb':
        # Get rid of originalApexDigest field, which should be the
        # only difference
        self._unset_original_apex_digest(content_in_compressed_apex[name])
      self.assertEqual(get_sha1sum(content_in_compressed_apex[name]),
                       get_sha1sum(content_in_uncompressed_apex[name]),
                       name + ' differs between compressed and uncompressed '
                       'APEX')

  def test_decompression(self):
    # setup: create compressed APEX
    uncompressed_apex_fp = os.path.join(get_current_dir(), TEST_APEX + '.apex')
    compressed_apex_fp = self._compress_apex(uncompressed_apex_fp)

    # Decompress it
    decompressed_apex_fp = self._decompress_apex(compressed_apex_fp)

    # Verify type of the apex is 'UNCOMPRESSED'
    self.assertEqual(self._get_type(decompressed_apex_fp), 'UNCOMPRESSED')

    # Verify decompressed APEX is same as uncompressed APEX
    self.assertEqual(get_sha1sum(uncompressed_apex_fp),
                     get_sha1sum(decompressed_apex_fp),
                     'Decompressed APEX is not same as uncompressed APEX')

    # Try decompressing uncompressed APEX. It should not work.
    with self.assertRaises(RuntimeError) as error:
      self._decompress_apex(uncompressed_apex_fp)

    self.assertIn(uncompressed_apex_fp
                  + ' is not a compressed APEX', str(error.exception))

  def test_only_original_apex_is_compressed(self):
    uncompressed_apex_fp = os.path.join(get_current_dir(), TEST_APEX + '.apex')
    compressed_apex_fp = self._compress_apex(uncompressed_apex_fp)

    with ZipFile(compressed_apex_fp, 'r') as zip_obj:
      self.assertEqual(zip_obj.getinfo('original_apex').compress_type,
                       ZIP_DEFLATED)
      content_in_uncompressed_apex = self._get_container_files(
          uncompressed_apex_fp)
      # Everything copied alongside original_apex must be stored as-is.
      for name in self._SHARED_CONTAINER_FILES:
        if name in content_in_uncompressed_apex:
          self.assertEqual(zip_obj.getinfo(name).compress_type, ZIP_STORED)
if __name__ == '__main__':
  # verbosity=2 makes the runner report each test by name as it runs.
  unittest.main(verbosity=2)
324