1#!/usr/bin/env python
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#      http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18"""
19Utils for running unittests.
20"""
21
22import logging
23import os
24import os.path
25import re
26import struct
27import sys
28import unittest
29import zipfile
30
31import common
32
# Route log output to stdout, as some test runners don't cope well with
# output written to stderr.
logging.basicConfig(stream=sys.stdout)

# ANDROID_BUILD_TOP serves as the indicator of whether the tools needed by
# some tests (e.g. avbtool, mke2fs) are available. Setting
# FORCE_RUN_RELEASETOOLS to '1' overrides that check. If neither holds, the
# tests that require external tools can't be run.
EXTERNAL_TOOLS_UNAVAILABLE = not (
    os.environ.get('ANDROID_BUILD_TOP') or
    os.environ.get('FORCE_RUN_RELEASETOOLS') == '1')
43
44
def SkipIfExternalToolsUnavailable():
  """Returns a decorator that skips a test when external tools are missing.

  If EXTERNAL_TOOLS_UNAVAILABLE is set, the returned decorator is the one from
  unittest.skip() with a fixed reason. Otherwise the returned decorator hands
  back the decorated function unchanged.
  """
  if not EXTERNAL_TOOLS_UNAVAILABLE:
    # Tools are available: a no-op decorator.
    return lambda func: func
  return unittest.skip('External tools unavailable')
50
51
def get_testdata_dir():
  """Returns the path of the 'testdata' dir that sits next to this script."""
  # Anchor on the location of the script itself (with symlinks resolved), as
  # the working directory could be somewhere else.
  script_path = os.path.realpath(__file__)
  script_dir, _ = os.path.split(script_path)
  return os.path.join(script_dir, 'testdata')
57
def get_current_dir():
  """Returns the dir that contains this script, with symlinks resolved."""
  # Deliberately based on the location of the script rather than on pwd, as
  # the two could differ.
  return os.path.dirname(os.path.realpath(__file__))
63
def get_search_path():
  """Returns the search path that has 'framework/signapk.jar' under.

  The candidates are tried in order, and the first one that contains
  'framework/signapk.jar' is returned as a resolved path:
    1. $ANDROID_BUILD_TOP/out/host/linux-x86;
    2. out/host/linux-x86 found four levels above the script dir, i.e. when
       running from 'build/make/tools/releasetools' in the Android source;
    3. The parent of the script dir, i.e. when running the script unpacked
       from otatools.zip.

  Returns:
    The resolved path of the first matching candidate, or None if none of them
    has the jar.
  """
  script_dir = os.path.dirname(os.path.realpath(__file__))
  host_out = ('out', 'host', 'linux-x86')
  candidates = (
      # Try with ANDROID_BUILD_TOP first.
      os.path.join(os.environ.get('ANDROID_BUILD_TOP', ''), *host_out),
      # Otherwise fall back to the paths relative to the script dir.
      os.path.join(script_dir, '..', '..', '..', '..', *host_out),
      os.path.join(script_dir, '..'),
  )
  for candidate in candidates:
    resolved = os.path.realpath(candidate)
    signapk_jar = os.path.realpath(
        os.path.join(resolved, 'framework', 'signapk.jar'))
    if os.path.exists(signapk_jar):
      return resolved
  return None
89
90
def construct_sparse_image(chunks):
  """Returns a sparse image file constructed from the given chunks.

  From system/core/libsparse/sparse_format.h.
  typedef struct sparse_header {
    __le32 magic;  // 0xed26ff3a
    __le16 major_version;  // (0x1) - reject images with higher major versions
    __le16 minor_version;  // (0x0) - allow images with higher minor versions
    __le16 file_hdr_sz;  // 28 bytes for first revision of the file format
    __le16 chunk_hdr_sz;  // 12 bytes for first revision of the file format
    __le32 blk_sz;  // block size in bytes, must be a multiple of 4 (4096)
    __le32 total_blks;  // total blocks in the non-sparse output image
    __le32 total_chunks;  // total chunks in the sparse input image
    __le32 image_checksum;  // CRC32 checksum of the original data, counting
                            // "don't care" as 0. Standard 802.3 polynomial,
                            // use a Public Domain table implementation
  } sparse_header_t;

  typedef struct chunk_header {
    __le16 chunk_type;  // 0xCAC1 -> raw; 0xCAC2 -> fill;
                        // 0xCAC3 -> don't care
    __le16 reserved1;
    __le32 chunk_sz;  // in blocks in output image
    __le32 total_sz;  // in bytes of chunk input file including chunk header
                      // and data
  } chunk_header_t;

  The payload of raw and fill chunks is random data (os.urandom), and the
  image_checksum field in the header is written as 0.

  Args:
    chunks: A list of chunks to be written. Each entry should be a tuple of
        (chunk_type, number_of_blocks), where chunk_type is one of 0xCAC1
        (raw), 0xCAC2 (fill) or 0xCAC3 (don't care).

  Returns:
    Filename of the created sparse image.

  Raises:
    AssertionError: If a chunk has an unsupported chunk type.
  """
  SPARSE_HEADER_MAGIC = 0xED26FF3A
  SPARSE_HEADER_FORMAT = "<I4H4I"
  CHUNK_HEADER_FORMAT = "<2H2I"
  SPARSE_HEADER_SIZE = 28
  CHUNK_HEADER_SIZE = 12
  BLOCK_SIZE = 4096
  CHUNK_TYPE_RAW = 0xCAC1
  CHUNK_TYPE_FILL = 0xCAC2
  CHUNK_TYPE_DONT_CARE = 0xCAC3

  sparse_image = common.MakeTempFile(prefix='sparse-', suffix='.img')
  with open(sparse_image, 'wb') as fp:
    # File header: version 1.0, with the total block count summed up from the
    # chunks.
    fp.write(struct.pack(
        SPARSE_HEADER_FORMAT, SPARSE_HEADER_MAGIC, 1, 0, SPARSE_HEADER_SIZE,
        CHUNK_HEADER_SIZE, BLOCK_SIZE,
        sum(chunk[1] for chunk in chunks),
        len(chunks), 0))

    for chunk in chunks:
      chunk_type, num_blocks = chunk[0], chunk[1]
      if chunk_type == CHUNK_TYPE_RAW:
        # A raw chunk carries the full data of all its blocks.
        data_size = BLOCK_SIZE * num_blocks
      elif chunk_type == CHUNK_TYPE_FILL:
        # A fill chunk carries only the 4-byte fill value.
        data_size = 4
      elif chunk_type == CHUNK_TYPE_DONT_CARE:
        data_size = 0
      else:
        # Raise explicitly instead of using an assert statement, which would
        # be stripped under `python -O` and let a bogus chunk get written.
        raise AssertionError(
            "Unsupported chunk type: {}".format(chunk_type))

      fp.write(struct.pack(
          CHUNK_HEADER_FORMAT, chunk_type, 0, num_blocks,
          data_size + CHUNK_HEADER_SIZE))
      if data_size != 0:
        fp.write(os.urandom(data_size))

  return sparse_image
153
154
class MockScriptWriter(object):
  """A class that mocks edify_generator.EdifyGenerator.

  It simply pushes the incoming arguments onto script stack (self.lines), which
  is to assert the calls to EdifyGenerator functions. Mount() and the Assert*()
  functions are recorded as tuples of (function_name, arg1, arg2, ...), while
  Comment() and AppendExtra() record the given value as is.

  Args:
    enable_comments: Whether Comment() calls should be recorded. They are
        dropped by default.
  """

  def __init__(self, enable_comments=False):
    self.lines = []
    self.enable_comments = enable_comments

  def _record(self, name, args):
    """Records a call as a tuple of the function name followed by its args."""
    self.lines.append((name,) + args)

  def Mount(self, *args):
    self._record('Mount', args)

  def AssertDevice(self, *args):
    self._record('AssertDevice', args)

  def AssertOemProperty(self, *args):
    self._record('AssertOemProperty', args)

  def AssertFingerprintOrThumbprint(self, *args):
    self._record('AssertFingerprintOrThumbprint', args)

  def AssertSomeFingerprint(self, *args):
    self._record('AssertSomeFingerprint', args)

  def AssertSomeThumbprint(self, *args):
    self._record('AssertSomeThumbprint', args)

  def Comment(self, comment):
    """Records the comment as '# <comment>', only if comments are enabled."""
    if not self.enable_comments:
      return
    self.lines.append('# {}'.format(comment))

  def AppendExtra(self, extra):
    """Records the given extra line as is."""
    self.lines.append(extra)

  def __str__(self):
    """Returns the recorded lines joined with newlines.

    Calls recorded as tuples are rendered with str(), as joining them directly
    would raise a TypeError.
    """
    return '\n'.join(
        str(line) if isinstance(line, tuple) else line for line in self.lines)
194
195
class ReleaseToolsTestCase(unittest.TestCase):
  """A common base class for all the releasetools unittests.

  It calls common.Cleanup() after each test, so subclasses get that for free
  unless they override tearDown().
  """

  def tearDown(self):
    # NOTE(review): presumably this removes the temp files created through
    # common.MakeTempFile() during the test -- confirm against common.py.
    common.Cleanup()
201
class PropertyFilesTestCase(ReleaseToolsTestCase):
  """A base class with helpers for tests on property-files strings.

  The helpers build zip packages with predictable entry contents, parse
  strings made of comma-separated 'name:offset:size' tokens, and verify that
  the offset/size of a token points at the expected bytes in a file.
  """

  @staticmethod
  def construct_zip_package(entries):
    """Creates a temp zip file that holds the given entries, uncompressed.

    The content of each entry is derived from its name, by replacing '.' with
    '-' and converting the result to upper case.

    Args:
      entries: The names of the entries to be written.

    Returns:
      Filename of the created zip file.
    """
    package = common.MakeTempFile(suffix='.zip')
    with zipfile.ZipFile(package, 'w', allowZip64=True) as archive:
      for name in entries:
        payload = name.replace('.', '-').upper()
        archive.writestr(name, payload, zipfile.ZIP_STORED)
    return package

  @staticmethod
  def _parse_property_files_string(data):
    """Parses comma-separated 'name:info' tokens into a dict of name to info.

    Only the first ':' in a token separates the name from the info.
    """
    pairs = (token.split(':', 1) for token in data.split(','))
    return {name: info for name, info in pairs}

  def setUp(self):
    # Start each test with signing enabled.
    common.OPTIONS.no_signing = False

  def _verify_entries(self, input_file, tokens, entries):
    """Asserts that each entry's token points at the expected bytes.

    Args:
      input_file: The file to read the bytes from.
      tokens: A dict that maps an entry name to its 'offset:size' string.
      entries: The names of the entries to be verified.
    """
    # These two tokens don't use the name of the zip entry, so the expected
    # content can't be derived from the token name.
    special_contents = {
        'metadata': b'META-INF/COM/ANDROID/METADATA',
        'metadata.pb': b'META-INF/COM/ANDROID/METADATA-PB',
    }
    for entry in entries:
      offset, size = (int(field) for field in tokens[entry].split(':'))
      with open(input_file, 'rb') as input_fp:
        input_fp.seek(offset)
        if entry in special_contents:
          expected = special_contents[entry]
        else:
          expected = entry.replace('.', '-').upper().encode()
        actual = input_fp.read(size)
        self.assertEqual(expected, actual)
238
239
if __name__ == '__main__':
  # Only the tests that sit directly in the top level directory should be run.
  # The pattern option of unittest.discover goes through fnmatch internally,
  # which gives no good way of filtering the test files by directory. So the
  # tree is walked here, and the matching modules are loaded by name.
  base_path = os.path.dirname(os.path.realpath(__file__))
  test_modules = []
  for dirpath, _, files in os.walk(base_path):
    if dirpath != base_path:
      continue
    # Drop the '.py' suffix to turn a filename into a module name.
    test_modules.extend(
        fn[:-3] for fn in files if re.match('test_.*\\.py$', fn))

  test_suite = unittest.TestLoader().loadTestsFromNames(test_modules)

  # atest needs a verbosity level of >= 2 to correctly parse the result.
  runner = unittest.TextTestRunner(verbosity=2)
  runner.run(test_suite)
256