# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for OtaTools."""

import os
import shutil
import tempfile
import unittest
import mock

from acloud import errors
from acloud.internal.lib import ota_tools

# Misc info file handed to BuildSuperImage by the tests.
_INPUT_MISC_INFO = """
mkbootimg_args=
lpmake=lpmake
dynamic_partition_list= system vendor
vendor_image=/path/to/vendor.img
super_super_device_size=3229614080
"""

# Contents the tests expect in the rewritten misc info file. The "%s"
# placeholder is filled with the path of the lpmake binary created by the
# test.
_EXPECTED_MISC_INFO = """
mkbootimg_args=
lpmake=%s
dynamic_partition_list= system vendor
super_super_device_size=3229614080
system_image=/path/to/system.img
vendor_image=/path/to/vendor.img
"""

# System qemu config file handed to MkCombinedImg by the tests.
_INPUT_SYSTEM_QEMU_CONFIG = """
out/target/product/generic_x86_64/vbmeta.img vbmeta 1
out/target/product/generic_x86_64/super.img super 2
"""

# Contents the tests expect in the rewritten system qemu config file.
_EXPECTED_SYSTEM_QEMU_CONFIG = """
/path/to/vbmeta.img vbmeta 1
/path/to/super.img super 2
"""


def _GetImage(name):
    """Return the image path that appears in the expected output.

    Args:
        name: The partition name, e.g. "system".

    Returns:
        The string "/path/to/<name>.img".
    """
    return "/path/to/" + name + ".img"


class CapturedFile(object):
    """Capture intermediate files created by OtaTools.

    Attributes:
        path: The path passed to the last Load call, or None.
        contents: The text read from that path, or None if nothing was read.
    """

    def __init__(self):
        self.path = None
        self.contents = None

    def Load(self, path):
        """Record the path and, if it is a regular file, read its contents.

        Args:
            path: The path of the file to capture.
        """
        self.path = path
        if not os.path.isfile(path):
            return
        with open(path, "r") as f:
            self.contents = f.read()


class OtaToolsTest(unittest.TestCase):
    """Test OtaTools methods."""

    def setUp(self):
        """Create a temporary tool directory and the OtaTools under test."""
        self._temp_dir = tempfile.mkdtemp()
        os.mkdir(os.path.join(self._temp_dir, "bin"))
        self._ota = ota_tools.OtaTools(self._temp_dir)
        # Paths seen in mocked Popen commands; removed in tearDown if they
        # are still on disk.
        self._captured_files = []

    def tearDown(self):
        """Remove captured files and the temporary directory."""
        try:
            for path in self._captured_files:
                if os.path.isfile(path):
                    os.remove(path)
        finally:
            # Always attempt to remove the temporary directory, even when
            # deleting a captured file fails.
            shutil.rmtree(self._temp_dir)

    @staticmethod
    def _CreateFile(path, contents):
        """Create a file, and its parent directory if needed, then write it.

        Args:
            path: The path of the file to create.
            contents: The text to write to the file.
        """
        parent_dir = os.path.dirname(path)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir)
        with open(path, "w") as f:
            f.write(contents)

    def _CreateBinary(self, name):
        """Create an empty file in bin directory.

        Args:
            name: The file name of the binary.

        Returns:
            The path to the created file.
        """
        path = os.path.join(self._temp_dir, "bin", name)
        self._CreateFile(path, "")
        return path

    @staticmethod
    def _MockPopen(return_value):
        """Create a mock Popen object.

        Args:
            return_value: The value reported by returncode and poll().

        Returns:
            A mock whose communicate() returns ("stdout", "stderr").
        """
        popen = mock.Mock()
        popen.communicate.return_value = ("stdout", "stderr")
        popen.returncode = return_value
        popen.poll.return_value = return_value
        return popen

    @staticmethod
    def _MockPopenTimeout():
        """Create a mock Popen object that raises timeout error.

        Returns:
            A mock whose communicate() raises errors.FunctionTimeoutError and
            whose returncode and poll() are None.
        """
        popen = mock.Mock()
        popen.communicate.side_effect = errors.FunctionTimeoutError(
            "unit test")
        popen.returncode = None
        popen.poll.return_value = None
        return popen

    def testFindOtaTools(self):
        """Test FindOtaTools."""
        # CVD host package contains lpmake but not all tools.
        self._CreateBinary("lpmake")
        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
                             {"ANDROID_HOST_OUT": self._temp_dir}, clear=True):
            with self.assertRaises(errors.CheckPathError):
                ota_tools.FindOtaTools([self._temp_dir])

        # The function identifies OTA tool directory by build_super_image.
        self._CreateBinary("build_super_image")
        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
                             dict(), clear=True):
            self.assertEqual(ota_tools.FindOtaTools([self._temp_dir]),
                             self._temp_dir)

        # ANDROID_HOST_OUT contains OTA tools in build environment.
        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
                             {"ANDROID_HOST_OUT": self._temp_dir}, clear=True):
            self.assertEqual(ota_tools.FindOtaTools([]), self._temp_dir)

    def _TestBuildSuperImage(self, mock_popen, mock_popen_object,
                             expected_error=None):
        """Test BuildSuperImage.

        Args:
            mock_popen: Mock class of subprocess.Popen.
            mock_popen_object: Mock object of subprocess.Popen.
            expected_error: The error type that BuildSuperImage should raise.
        """
        build_super_image = self._CreateBinary("build_super_image")
        lpmake = self._CreateBinary("lpmake")

        misc_info_path = os.path.join(self._temp_dir, "misc_info.txt")
        self._CreateFile(misc_info_path, _INPUT_MISC_INFO)

        rewritten_misc_info = CapturedFile()

        def _CaptureMiscInfo(cmd, **_kwargs):
            # cmd[1] is the misc info path in the expected command below.
            # Read it while Popen is being called, and remember it so that
            # tearDown can remove it if it is left behind.
            self._captured_files.append(cmd[1])
            rewritten_misc_info.Load(cmd[1])
            return mock_popen_object

        mock_popen.side_effect = _CaptureMiscInfo

        if expected_error:
            with self.assertRaises(expected_error):
                self._ota.BuildSuperImage(
                    "/unit/test", misc_info_path, _GetImage)
        else:
            self._ota.BuildSuperImage("/unit/test", misc_info_path, _GetImage)

        expected_cmd = (
            build_super_image,
            rewritten_misc_info.path,
            "/unit/test",
        )

        mock_popen.assert_called_once()
        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
        self.assertEqual(rewritten_misc_info.contents,
                         _EXPECTED_MISC_INFO % lpmake)
        # The rewritten file must be gone after the call, whether or not the
        # call raised.
        self.assertFalse(os.path.exists(rewritten_misc_info.path))

    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
    def testBuildSuperImageSuccess(self, mock_popen):
        """Test BuildSuperImage."""
        self._TestBuildSuperImage(mock_popen, self._MockPopen(return_value=0))

    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
    def testBuildSuperImageTimeout(self, mock_popen):
        """Test BuildSuperImage with command timeout."""
        self._TestBuildSuperImage(mock_popen, self._MockPopenTimeout(),
                                  errors.FunctionTimeoutError)

    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
    def testMakeDisabledVbmetaImageSuccess(self, mock_popen):
        """Test MakeDisabledVbmetaImage."""
        avbtool = self._CreateBinary("avbtool")

        mock_popen.return_value = self._MockPopen(return_value=0)

        self._ota.MakeDisabledVbmetaImage("/unit/test")

        expected_cmd = (
            avbtool, "make_vbmeta_image",
            "--flag", "2",
            "--padding_size", "4096",
            "--output", "/unit/test",
        )

        mock_popen.assert_called_once()
        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)

    def _TestMkCombinedImg(self, mock_popen, mock_popen_object,
                           expected_error=None):
        """Test MkCombinedImg.

        Args:
            mock_popen: Mock class of subprocess.Popen.
            mock_popen_object: Mock object of subprocess.Popen.
            expected_error: The error type that MkCombinedImg should raise.
        """
        mk_combined_img = self._CreateBinary("mk_combined_img")
        sgdisk = self._CreateBinary("sgdisk")
        simg2img = self._CreateBinary("simg2img")

        config_path = os.path.join(self._temp_dir, "misc_info.txt")
        self._CreateFile(config_path, _INPUT_SYSTEM_QEMU_CONFIG)

        rewritten_config = CapturedFile()

        def _CaptureSystemQemuConfig(cmd, **_kwargs):
            # cmd[2] is the config path following "-i" in the expected
            # command below. Read it while Popen is being called, and
            # remember it so that tearDown can remove it if it is left
            # behind.
            self._captured_files.append(cmd[2])
            rewritten_config.Load(cmd[2])
            return mock_popen_object

        mock_popen.side_effect = _CaptureSystemQemuConfig

        if expected_error:
            with self.assertRaises(expected_error):
                self._ota.MkCombinedImg("/unit/test", config_path, _GetImage)
        else:
            self._ota.MkCombinedImg("/unit/test", config_path, _GetImage)

        expected_cmd = (
            mk_combined_img,
            "-i", rewritten_config.path,
            "-o", "/unit/test",
        )

        expected_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}

        mock_popen.assert_called_once()
        self.assertEqual(mock_popen.call_args[0][0], expected_cmd)
        self.assertEqual(mock_popen.call_args[1].get("env"), expected_env)
        self.assertEqual(rewritten_config.contents,
                         _EXPECTED_SYSTEM_QEMU_CONFIG)
        # The rewritten file must be gone after the call, whether or not the
        # call raised.
        self.assertFalse(os.path.exists(rewritten_config.path))

    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
    def testMkCombinedImgSuccess(self, mock_popen):
        """Test MkCombinedImg."""
        self._TestMkCombinedImg(mock_popen, self._MockPopen(return_value=0))

    @mock.patch("acloud.internal.lib.ota_tools.subprocess.Popen")
    def testMkCombinedImgFailure(self, mock_popen):
        """Test MkCombinedImg with command failure."""
        self._TestMkCombinedImg(mock_popen, self._MockPopen(return_value=1),
                                errors.SubprocessFail)


if __name__ == "__main__":
    unittest.main()