1#
2# Copyright (C) 2018 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import os
18import os.path
19import unittest
20import zipfile
21
22import common
23import test_utils
24from add_img_to_target_files import (
25    AddCareMapTxtForAbOta, AddPackRadioImages, AddRadioImagesForAbOta,
26    GetCareMap)
27from rangelib import RangeSet
28
29
# Module-level alias of common.OPTIONS. The tests in this file assign
# attributes on it (e.g. input_tmp, info_dict) to set up their fixtures.
OPTIONS = common.OPTIONS
31
32
33class AddImagesToTargetFilesTest(unittest.TestCase):
34
35  def setUp(self):
36    OPTIONS.input_tmp = common.MakeTempDir()
37
  def tearDown(self):
    """Calls common.Cleanup() after each test."""
    common.Cleanup()
40
41  @staticmethod
42  def _create_images(images, prefix):
43    """Creates images under OPTIONS.input_tmp/prefix."""
44    path = os.path.join(OPTIONS.input_tmp, prefix)
45    if not os.path.exists(path):
46      os.mkdir(path)
47
48    for image in images:
49      image_path = os.path.join(path, image + '.img')
50      with open(image_path, 'wb') as image_fp:
51        image_fp.write(image.encode())
52
53    images_path = os.path.join(OPTIONS.input_tmp, 'IMAGES')
54    if not os.path.exists(images_path):
55      os.mkdir(images_path)
56    return images, images_path
57
58  def test_AddRadioImagesForAbOta_imageExists(self):
59    """Tests the case with existing images under IMAGES/."""
60    images, images_path = self._create_images(['aboot', 'xbl'], 'IMAGES')
61    AddRadioImagesForAbOta(None, images)
62
63    for image in images:
64      self.assertTrue(
65          os.path.exists(os.path.join(images_path, image + '.img')))
66
67  def test_AddRadioImagesForAbOta_copyFromRadio(self):
68    """Tests the case that copies images from RADIO/."""
69    images, images_path = self._create_images(['aboot', 'xbl'], 'RADIO')
70    AddRadioImagesForAbOta(None, images)
71
72    for image in images:
73      self.assertTrue(
74          os.path.exists(os.path.join(images_path, image + '.img')))
75
76  def test_AddRadioImagesForAbOta_copyFromRadio_zipOutput(self):
77    images, _ = self._create_images(['aboot', 'xbl'], 'RADIO')
78
79    # Set up the output zip.
80    output_file = common.MakeTempFile(suffix='.zip')
81    with zipfile.ZipFile(output_file, 'w') as output_zip:
82      AddRadioImagesForAbOta(output_zip, images)
83
84    with zipfile.ZipFile(output_file, 'r') as verify_zip:
85      for image in images:
86        self.assertIn('IMAGES/' + image + '.img', verify_zip.namelist())
87
88  def test_AddRadioImagesForAbOta_copyFromVendorImages(self):
89    """Tests the case that copies images from VENDOR_IMAGES/."""
90    vendor_images_path = os.path.join(OPTIONS.input_tmp, 'VENDOR_IMAGES')
91    os.mkdir(vendor_images_path)
92
93    partitions = ['aboot', 'xbl']
94    for index, partition in enumerate(partitions):
95      subdir = os.path.join(vendor_images_path, 'subdir-{}'.format(index))
96      os.mkdir(subdir)
97
98      partition_image_path = os.path.join(subdir, partition + '.img')
99      with open(partition_image_path, 'wb') as partition_fp:
100        partition_fp.write(partition.encode())
101
102    # Set up the output dir.
103    images_path = os.path.join(OPTIONS.input_tmp, 'IMAGES')
104    os.mkdir(images_path)
105
106    AddRadioImagesForAbOta(None, partitions)
107
108    for partition in partitions:
109      self.assertTrue(
110          os.path.exists(os.path.join(images_path, partition + '.img')))
111
112  def test_AddRadioImagesForAbOta_missingImages(self):
113    images, _ = self._create_images(['aboot', 'xbl'], 'RADIO')
114    self.assertRaises(AssertionError, AddRadioImagesForAbOta, None,
115                      images + ['baz'])
116
117  def test_AddRadioImagesForAbOta_missingImages_zipOutput(self):
118    images, _ = self._create_images(['aboot', 'xbl'], 'RADIO')
119
120    # Set up the output zip.
121    output_file = common.MakeTempFile(suffix='.zip')
122    with zipfile.ZipFile(output_file, 'w') as output_zip:
123      self.assertRaises(AssertionError, AddRadioImagesForAbOta, output_zip,
124                        images + ['baz'])
125
126  def test_AddPackRadioImages(self):
127    images, images_path = self._create_images(['foo', 'bar'], 'RADIO')
128    AddPackRadioImages(None, images)
129
130    for image in images:
131      self.assertTrue(
132          os.path.exists(os.path.join(images_path, image + '.img')))
133
134  def test_AddPackRadioImages_with_suffix(self):
135    images, images_path = self._create_images(['foo', 'bar'], 'RADIO')
136    images_with_suffix = [image + '.img' for image in images]
137    AddPackRadioImages(None, images_with_suffix)
138
139    for image in images:
140      self.assertTrue(
141          os.path.exists(os.path.join(images_path, image + '.img')))
142
143  def test_AddPackRadioImages_zipOutput(self):
144    images, _ = self._create_images(['foo', 'bar'], 'RADIO')
145
146    # Set up the output zip.
147    output_file = common.MakeTempFile(suffix='.zip')
148    with zipfile.ZipFile(output_file, 'w') as output_zip:
149      AddPackRadioImages(output_zip, images)
150
151    with zipfile.ZipFile(output_file, 'r') as verify_zip:
152      for image in images:
153        self.assertIn('IMAGES/' + image + '.img', verify_zip.namelist())
154
155  def test_AddPackRadioImages_imageExists(self):
156    images, images_path = self._create_images(['foo', 'bar'], 'RADIO')
157
158    # Additionally create images under IMAGES/ so that they should be skipped.
159    images, images_path = self._create_images(['foo', 'bar'], 'IMAGES')
160
161    AddPackRadioImages(None, images)
162
163    for image in images:
164      self.assertTrue(
165          os.path.exists(os.path.join(images_path, image + '.img')))
166
167  def test_AddPackRadioImages_missingImages(self):
168    images, _ = self._create_images(['foo', 'bar'], 'RADIO')
169    AddPackRadioImages(None, images)
170
171    self.assertRaises(AssertionError, AddPackRadioImages, None,
172                      images + ['baz'])
173
174  @staticmethod
175  def _test_AddCareMapTxtForAbOta():
176    """Helper function to set up the test for test_AddCareMapTxtForAbOta()."""
177    OPTIONS.info_dict = {
178        'system_verity_block_device' : '/dev/block/system',
179        'vendor_verity_block_device' : '/dev/block/vendor',
180    }
181
182    # Prepare the META/ folder.
183    meta_path = os.path.join(OPTIONS.input_tmp, 'META')
184    if not os.path.exists(meta_path):
185      os.mkdir(meta_path)
186
187    system_image = test_utils.construct_sparse_image([
188        (0xCAC1, 6),
189        (0xCAC3, 4),
190        (0xCAC1, 6)])
191    vendor_image = test_utils.construct_sparse_image([
192        (0xCAC2, 10)])
193
194    image_paths = {
195        'system' : system_image,
196        'vendor' : vendor_image,
197    }
198    return image_paths
199
200  def test_AddCareMapTxtForAbOta(self):
201    image_paths = self._test_AddCareMapTxtForAbOta()
202
203    AddCareMapTxtForAbOta(None, ['system', 'vendor'], image_paths)
204
205    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.txt')
206    with open(care_map_file, 'r') as verify_fp:
207      care_map = verify_fp.read()
208
209    lines = care_map.split('\n')
210    self.assertEqual(4, len(lines))
211    self.assertEqual('system', lines[0])
212    self.assertEqual(RangeSet("0-5 10-15").to_string_raw(), lines[1])
213    self.assertEqual('vendor', lines[2])
214    self.assertEqual(RangeSet("0-9").to_string_raw(), lines[3])
215
216  def test_AddCareMapTxtForAbOta_withNonCareMapPartitions(self):
217    """Partitions without care_map should be ignored."""
218    image_paths = self._test_AddCareMapTxtForAbOta()
219
220    AddCareMapTxtForAbOta(
221        None, ['boot', 'system', 'vendor', 'vbmeta'], image_paths)
222
223    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.txt')
224    with open(care_map_file, 'r') as verify_fp:
225      care_map = verify_fp.read()
226
227    lines = care_map.split('\n')
228    self.assertEqual(4, len(lines))
229    self.assertEqual('system', lines[0])
230    self.assertEqual(RangeSet("0-5 10-15").to_string_raw(), lines[1])
231    self.assertEqual('vendor', lines[2])
232    self.assertEqual(RangeSet("0-9").to_string_raw(), lines[3])
233
234  def test_AddCareMapTxtForAbOta_withAvb(self):
235    """Tests the case for device using AVB."""
236    image_paths = self._test_AddCareMapTxtForAbOta()
237    OPTIONS.info_dict = {
238        'avb_system_hashtree_enable' : 'true',
239        'avb_vendor_hashtree_enable' : 'true',
240    }
241
242    AddCareMapTxtForAbOta(None, ['system', 'vendor'], image_paths)
243
244    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.txt')
245    with open(care_map_file, 'r') as verify_fp:
246      care_map = verify_fp.read()
247
248    lines = care_map.split('\n')
249    self.assertEqual(4, len(lines))
250    self.assertEqual('system', lines[0])
251    self.assertEqual(RangeSet("0-5 10-15").to_string_raw(), lines[1])
252    self.assertEqual('vendor', lines[2])
253    self.assertEqual(RangeSet("0-9").to_string_raw(), lines[3])
254
255  def test_AddCareMapTxtForAbOta_verityNotEnabled(self):
256    """No care_map.txt should be generated if verity not enabled."""
257    image_paths = self._test_AddCareMapTxtForAbOta()
258    OPTIONS.info_dict = {}
259    AddCareMapTxtForAbOta(None, ['system', 'vendor'], image_paths)
260
261    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.txt')
262    self.assertFalse(os.path.exists(care_map_file))
263
264  def test_AddCareMapTxtForAbOta_missingImageFile(self):
265    """Missing image file should be considered fatal."""
266    image_paths = self._test_AddCareMapTxtForAbOta()
267    image_paths['vendor'] = ''
268    self.assertRaises(AssertionError, AddCareMapTxtForAbOta, None,
269                      ['system', 'vendor'], image_paths)
270
271  def test_AddCareMapTxtForAbOta_zipOutput(self):
272    """Tests the case with ZIP output."""
273    image_paths = self._test_AddCareMapTxtForAbOta()
274
275    output_file = common.MakeTempFile(suffix='.zip')
276    with zipfile.ZipFile(output_file, 'w') as output_zip:
277      AddCareMapTxtForAbOta(output_zip, ['system', 'vendor'], image_paths)
278
279    with zipfile.ZipFile(output_file, 'r') as verify_zip:
280      care_map = verify_zip.read('META/care_map.txt').decode('ascii')
281
282    lines = care_map.split('\n')
283    self.assertEqual(4, len(lines))
284    self.assertEqual('system', lines[0])
285    self.assertEqual(RangeSet("0-5 10-15").to_string_raw(), lines[1])
286    self.assertEqual('vendor', lines[2])
287    self.assertEqual(RangeSet("0-9").to_string_raw(), lines[3])
288
289  def test_AddCareMapTxtForAbOta_zipOutput_careMapEntryExists(self):
290    """Tests the case with ZIP output which already has care_map entry."""
291    image_paths = self._test_AddCareMapTxtForAbOta()
292
293    output_file = common.MakeTempFile(suffix='.zip')
294    with zipfile.ZipFile(output_file, 'w') as output_zip:
295      # Create an existing META/care_map.txt entry.
296      common.ZipWriteStr(output_zip, 'META/care_map.txt', 'dummy care_map.txt')
297
298      # Request to add META/care_map.txt again.
299      AddCareMapTxtForAbOta(output_zip, ['system', 'vendor'], image_paths)
300
301    # The one under OPTIONS.input_tmp must have been replaced.
302    care_map_file = os.path.join(OPTIONS.input_tmp, 'META', 'care_map.txt')
303    with open(care_map_file, 'r') as verify_fp:
304      care_map = verify_fp.read()
305
306    lines = care_map.split('\n')
307    self.assertEqual(4, len(lines))
308    self.assertEqual('system', lines[0])
309    self.assertEqual(RangeSet("0-5 10-15").to_string_raw(), lines[1])
310    self.assertEqual('vendor', lines[2])
311    self.assertEqual(RangeSet("0-9").to_string_raw(), lines[3])
312
313    # The existing entry should be scheduled to be replaced.
314    self.assertIn('META/care_map.txt', OPTIONS.replace_updated_files_list)
315
316  def test_GetCareMap(self):
317    sparse_image = test_utils.construct_sparse_image([
318        (0xCAC1, 6),
319        (0xCAC3, 4),
320        (0xCAC1, 6)])
321    OPTIONS.info_dict = {
322        'system_adjusted_partition_size' : 12,
323    }
324    name, care_map = GetCareMap('system', sparse_image)
325    self.assertEqual('system', name)
326    self.assertEqual(RangeSet("0-5 10-12").to_string_raw(), care_map)
327
328  def test_GetCareMap_invalidPartition(self):
329    self.assertRaises(AssertionError, GetCareMap, 'oem', None)
330
331  def test_GetCareMap_invalidAdjustedPartitionSize(self):
332    sparse_image = test_utils.construct_sparse_image([
333        (0xCAC1, 6),
334        (0xCAC3, 4),
335        (0xCAC1, 6)])
336    OPTIONS.info_dict = {
337        'system_adjusted_partition_size' : -12,
338    }
339    self.assertRaises(AssertionError, GetCareMap, 'system', sparse_image)
340