1# Copyright 2019 - The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for OtaTools."""
15
16import os
17import shutil
18import tempfile
19import unittest
20import mock
21
22from acloud import errors
23from acloud.internal.lib import ota_tools
24
25_INPUT_MISC_INFO = """
26mkbootimg_args=
27lpmake=lpmake
28dynamic_partition_list= system vendor
29vendor_image=/path/to/vendor.img
30super_super_device_size=3229614080
31"""
32
33_EXPECTED_MISC_INFO = """
34mkbootimg_args=
35lpmake=%s
36dynamic_partition_list= system vendor
37super_super_device_size=3229614080
38system_image=/path/to/system.img
39vendor_image=/path/to/vendor.img
40"""
41
42_INPUT_SYSTEM_QEMU_CONFIG = """
43out/target/product/generic_x86_64/vbmeta.img vbmeta 1
44out/target/product/generic_x86_64/super.img super 2
45"""
46
47_EXPECTED_SYSTEM_QEMU_CONFIG = """
48/path/to/vbmeta.img vbmeta 1
49/path/to/super.img super 2
50"""
51
52
53def _GetImage(name):
54    """Return the image path that appears in the expected output."""
55    return "/path/to/" + name + ".img"
56
57
58class CapturedFile(object):
59    """Capture intermediate files created by OtaTools."""
60
61    def __init__(self):
62        self.path = None
63        self.contents = None
64
65    def Load(self, path):
66        """Load file contents to this object."""
67        self.path = path
68        if not os.path.isfile(path):
69            return
70        with open(path, "r") as f:
71            self.contents = f.read()
72
73
74class OtaToolsTest(unittest.TestCase):
75    """Test OtaToolsTest methods."""
76
77    def setUp(self):
78        self._temp_dir = tempfile.mkdtemp()
79        os.mkdir(os.path.join(self._temp_dir, "bin"))
80        self._ota = ota_tools.OtaTools(self._temp_dir)
81        self._captured_files = []
82
83    def tearDown(self):
84        shutil.rmtree(self._temp_dir)
85        for path in self._captured_files:
86            if os.path.isfile(path):
87                os.remove(path)
88
89    @staticmethod
90    def _CreateFile(path, contents):
91        """Create and write to a file."""
92        parent_dir = os.path.dirname(path)
93        if not os.path.exists(parent_dir):
94            os.makedirs(parent_dir)
95        with open(path, "w") as f:
96            f.write(contents)
97
98    def _CreateBinary(self, name):
99        """Create an empty file in bin directory."""
100        path = os.path.join(self._temp_dir, "bin", name)
101        self._CreateFile(path, "")
102        return path
103
104    @staticmethod
105    def _MockPopen(return_value):
106        """Create a mock Popen object."""
107        popen = mock.Mock()
108        popen.communicate.return_value = ("stdout", "stderr")
109        popen.returncode = return_value
110        popen.poll.return_value = return_value
111        return popen
112
113    @staticmethod
114    def _MockPopenTimeout():
115        """Create a mock Popen object that raises timeout error."""
116        popen = mock.Mock()
117        popen.communicate.side_effect = errors.FunctionTimeoutError(
118            "unit test")
119        popen.returncode = None
120        popen.poll.return_value = None
121        return popen
122
123    def testFindOtaTools(self):
124        """Test FindOtaTools."""
125        # CVD host package contains lpmake but not all tools.
126        self._CreateBinary("lpmake")
127        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
128                             {"ANDROID_HOST_OUT": self._temp_dir}, clear=True):
129            with self.assertRaises(errors.CheckPathError):
130                ota_tools.FindOtaTools([self._temp_dir])
131
132        # The function identifies OTA tool directory by build_super_image.
133        self._CreateBinary("build_super_image")
134        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
135                             dict(), clear=True):
136            self.assertEqual(ota_tools.FindOtaTools([self._temp_dir]),
137                             self._temp_dir)
138
139        # ANDROID_HOST_OUT contains OTA tools in build environment.
140        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
141                             {"ANDROID_HOST_OUT": self._temp_dir}, clear=True):
142            self.assertEqual(ota_tools.FindOtaTools([]), self._temp_dir)
143
144    # pylint: disable=broad-except
145    def _TestBuildSuperImage(self, mock_popen, mock_popen_object,
146                             expected_error=None):
147        """Test BuildSuperImage.
148
149        Args:
150            mock_popen: Mock class of subprocess.Popen.
151            popen_return_value: Mock object of subprocess.Popen.
152            expected_error: The error type that BuildSuperImage should raise.
153        """
154        build_super_image = self._CreateBinary("build_super_image")
155        lpmake = self._CreateBinary("lpmake")
156
157        misc_info_path = os.path.join(self._temp_dir, "misc_info.txt")
158        self._CreateFile(misc_info_path, _INPUT_MISC_INFO)
159
160        rewritten_misc_info = CapturedFile()
161
162        def _CaptureMiscInfo(cmd, **_kwargs):
163            self._captured_files.append(cmd[1])
164            rewritten_misc_info.Load(cmd[1])
165            return mock_popen_object
166
167        mock_popen.side_effect = _CaptureMiscInfo
168
169        try:
170            self._ota.BuildSuperImage("/unit/test", misc_info_path, _GetImage)
171            if expected_error:
172                self.fail(expected_error.__name__ + " is not raised.")
173        except Exception as e:
174            if not expected_error or not isinstance(e, expected_error):
175                raise
176
177        expected_cmd = (
178            build_super_image,
179            rewritten_misc_info.path,
180            "/unit/test",
181        )
182
183        mock_popen.assert_called_once()
184        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
185        self.assertEqual(rewritten_misc_info.contents,
186                         _EXPECTED_MISC_INFO % lpmake)
187        self.assertFalse(os.path.exists(rewritten_misc_info.path))
188
189    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
190    def testBuildSuperImageSuccess(self, mock_popen):
191        """Test BuildSuperImage."""
192        self._TestBuildSuperImage(mock_popen, self._MockPopen(return_value=0))
193
194    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
195    def testBuildSuperImageTimeout(self, mock_popen):
196        """Test BuildSuperImage with command timeout."""
197        self._TestBuildSuperImage(mock_popen, self._MockPopenTimeout(),
198                                  errors.FunctionTimeoutError)
199
200    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
201    def testMakeDisabledVbmetaImageSuccess(self, mock_popen):
202        """Test MakeDisabledVbmetaImage."""
203        avbtool = self._CreateBinary("avbtool")
204
205        mock_popen.return_value = self._MockPopen(return_value=0)
206
207        self._ota.MakeDisabledVbmetaImage("/unit/test")
208
209        expected_cmd = (
210            avbtool, "make_vbmeta_image",
211            "--flag", "2",
212            "--padding_size", "4096",
213            "--output", "/unit/test",
214        )
215
216        mock_popen.assert_called_once()
217        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
218
219    # pylint: disable=broad-except
220    def _TestMkCombinedImg(self, mock_popen, mock_popen_object,
221                           expected_error=None):
222        """Test MkCombinedImg.
223
224        Args:
225            mock_popen: Mock class of subprocess.Popen.
226            mock_popen_object: Mock object of subprocess.Popen.
227            expected_error: The error type that MkCombinedImg should raise.
228        """
229        mk_combined_img = self._CreateBinary("mk_combined_img")
230        sgdisk = self._CreateBinary("sgdisk")
231        simg2img = self._CreateBinary("simg2img")
232
233        config_path = os.path.join(self._temp_dir, "misc_info.txt")
234        self._CreateFile(config_path, _INPUT_SYSTEM_QEMU_CONFIG)
235
236        rewritten_config = CapturedFile()
237
238        def _CaptureSystemQemuConfig(cmd, **_kwargs):
239            self._captured_files.append(cmd[2])
240            rewritten_config.Load(cmd[2])
241            return mock_popen_object
242
243        mock_popen.side_effect = _CaptureSystemQemuConfig
244
245        try:
246            self._ota.MkCombinedImg("/unit/test", config_path, _GetImage)
247            if expected_error:
248                self.fail(expected_error.__name__ + " is not raised.")
249        except Exception as e:
250            if not expected_error or not isinstance(e, expected_error):
251                raise
252
253        expected_cmd = (
254            mk_combined_img,
255            "-i", rewritten_config.path,
256            "-o", "/unit/test",
257        )
258
259        expected_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}
260
261        mock_popen.assert_called_once()
262        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
263        self.assertEqual(mock_popen.call_args[1].get("env"), expected_env)
264        self.assertEqual(rewritten_config.contents,
265                         _EXPECTED_SYSTEM_QEMU_CONFIG)
266        self.assertFalse(os.path.exists(rewritten_config.path))
267
268    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
269    def testMkCombinedImgSuccess(self, mock_popen):
270        """Test MkCombinedImg."""
271        return self._TestMkCombinedImg(mock_popen,
272                                       self._MockPopen(return_value=0))
273
274    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
275    def testMkCombinedImgFailure(self, mock_popen):
276        """Test MkCombinedImg with command failure."""
277        return self._TestMkCombinedImg(mock_popen,
278                                       self._MockPopen(return_value=1),
279                                       errors.SubprocessFail)
280
281
282if __name__ == "__main__":
283    unittest.main()
284