1#!/usr/bin/env python
2#
3# Copyright 2018 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""Tests for LocalImageLocalInstance."""
17
18import os
19import subprocess
20import tempfile
21import unittest
22
23from unittest import mock
24
25from acloud import errors
26from acloud.create import local_image_local_instance
27from acloud.list import instance
28from acloud.list import list as list_instance
29from acloud.internal import constants
30from acloud.internal.lib import driver_test_lib
31from acloud.internal.lib import utils
32
33
class LocalImageLocalInstanceTest(driver_test_lib.BaseDriverTest):
    """Test LocalImageLocalInstance method.

    The class-level constants below are the expected values that the test
    methods compare against: the LAUNCH_CVD_CMD_* strings are the commands
    expected from PrepareLaunchCVDCmd, and the _EXPECTED_DEVICES_* lists are
    the device entries expected in the report built by _CreateInstance.
    """

    # Every expected command is a here-document wrapped in "sg group1" and
    # "sg group2"; testPrepareLaunchCVDCmd sets those two group names before
    # comparing.

    # Expected command when hw_property contains a "disk" entry.
    LAUNCH_CVD_CMD_WITH_DISK = """sg group1 <<EOF
sg group2
launch_cvd -daemon -config=phone -run_adb_connector=true -system_image_dir fake_image_dir -instance_dir fake_cvd_dir -undefok=report_anonymous_usage_stats,enable_sandbox,config -report_anonymous_usage_stats=y -enable_sandbox=false -cpus fake -x_res fake -y_res fake -dpi fake -memory_mb fake -blank_data_image_mb fake -data_policy always_create -start_vnc_server=true
EOF"""

    # Expected command when hw_property has no "disk" entry.
    LAUNCH_CVD_CMD_NO_DISK = """sg group1 <<EOF
sg group2
launch_cvd -daemon -config=phone -run_adb_connector=true -system_image_dir fake_image_dir -instance_dir fake_cvd_dir -undefok=report_anonymous_usage_stats,enable_sandbox,config -report_anonymous_usage_stats=y -enable_sandbox=false -cpus fake -x_res fake -y_res fake -dpi fake -memory_mb fake -start_vnc_server=true
EOF"""

    # NOTE(review): despite its name, this text is identical to
    # LAUNCH_CVD_CMD_NO_DISK; it contains no gpu-related flag.
    LAUNCH_CVD_CMD_NO_DISK_WITH_GPU = """sg group1 <<EOF
sg group2
launch_cvd -daemon -config=phone -run_adb_connector=true -system_image_dir fake_image_dir -instance_dir fake_cvd_dir -undefok=report_anonymous_usage_stats,enable_sandbox,config -report_anonymous_usage_stats=y -enable_sandbox=false -cpus fake -x_res fake -y_res fake -dpi fake -memory_mb fake -start_vnc_server=true
EOF"""

    # Expected command for the "auto" config with webrtc instead of vnc and
    # no hw_property.
    LAUNCH_CVD_CMD_WITH_WEBRTC = """sg group1 <<EOF
sg group2
launch_cvd -daemon -config=auto -run_adb_connector=true -system_image_dir fake_image_dir -instance_dir fake_cvd_dir -undefok=report_anonymous_usage_stats,enable_sandbox,config -report_anonymous_usage_stats=y -enable_sandbox=false -guest_enforce_security=false -vm_manager=crosvm -start_webrtc=true -webrtc_public_ip=0.0.0.0
EOF"""

    # Expected command when a super image and a boot image are supplied.
    LAUNCH_CVD_CMD_WITH_MIXED_IMAGES = """sg group1 <<EOF
sg group2
launch_cvd -daemon -config=phone -run_adb_connector=true -system_image_dir fake_image_dir -instance_dir fake_cvd_dir -undefok=report_anonymous_usage_stats,enable_sandbox,config -report_anonymous_usage_stats=y -enable_sandbox=false -start_vnc_server=true -super_image=fake_super_image -boot_image=fake_boot_image
EOF"""

    # Expected command when the extra launch argument
    # "-setupwizard_mode=REQUIRED" is supplied.
    LAUNCH_CVD_CMD_WITH_ARGS = """sg group1 <<EOF
sg group2
launch_cvd -daemon -config=phone -run_adb_connector=true -system_image_dir fake_image_dir -instance_dir fake_cvd_dir -undefok=report_anonymous_usage_stats,enable_sandbox,config -report_anonymous_usage_stats=y -enable_sandbox=false -start_vnc_server=true -setupwizard_mode=REQUIRED
EOF"""

    # Compared with report.data["devices"] after a successful
    # _CreateInstance call in testCreateInstance.
    _EXPECTED_DEVICES_IN_REPORT = [
        {
            "instance_name": "local-instance-1",
            "ip": "0.0.0.0:6520",
            "adb_port": 6520,
            "vnc_port": 6444,
            "webrtc_port": 8443
        }
    ]

    # Compared with report.data["devices_failing_boot"] after _LaunchCvd is
    # made to fail in testCreateInstance.
    _EXPECTED_DEVICES_IN_FAILED_REPORT = [
        {
            "instance_name": "local-instance-1",
            "ip": "0.0.0.0"
        }
    ]
83
84    def setUp(self):
85        """Initialize new LocalImageLocalInstance."""
86        super().setUp()
87        self.local_image_local_instance = local_image_local_instance.LocalImageLocalInstance()
88
89    # pylint: disable=protected-access
90    @mock.patch("acloud.create.local_image_local_instance.utils")
91    @mock.patch.object(local_image_local_instance.LocalImageLocalInstance,
92                       "GetImageArtifactsPath")
93    @mock.patch.object(local_image_local_instance.LocalImageLocalInstance,
94                       "_SelectAndLockInstance")
95    @mock.patch.object(local_image_local_instance.LocalImageLocalInstance,
96                       "_CheckRunningCvd")
97    @mock.patch.object(local_image_local_instance.LocalImageLocalInstance,
98                       "_CreateInstance")
99    def testCreateAVD(self, mock_create, mock_check_running_cvd,
100                      mock_lock_instance, mock_get_image, mock_utils):
101        """Test _CreateAVD."""
102        mock_utils.IsSupportedPlatform.return_value = True
103        mock_get_image.return_value = local_image_local_instance.ArtifactPaths(
104            "/image/path", "/host/bin/path", None, None, None, None)
105        mock_check_running_cvd.return_value = True
106        mock_avd_spec = mock.Mock()
107        mock_lock = mock.Mock()
108        mock_lock.Unlock.return_value = False
109        mock_lock_instance.return_value = (1, mock_lock)
110
111        # Success
112        mock_create.return_value = mock.Mock()
113        self.local_image_local_instance._CreateAVD(
114            mock_avd_spec, no_prompts=True)
115        mock_lock_instance.assert_called_once()
116        mock_lock.SetInUse.assert_called_once_with(True)
117        mock_lock.Unlock.assert_called_once()
118
119        mock_lock_instance.reset_mock()
120        mock_lock.SetInUse.reset_mock()
121        mock_lock.Unlock.reset_mock()
122
123        # Failure with no report
124        mock_create.side_effect = ValueError("unit test")
125        with self.assertRaises(ValueError):
126            self.local_image_local_instance._CreateAVD(
127                mock_avd_spec, no_prompts=True)
128        mock_lock_instance.assert_called_once()
129        mock_lock.SetInUse.assert_not_called()
130        mock_lock.Unlock.assert_called_once()
131
132        # Failure with report
133        mock_lock_instance.side_effect = errors.CreateError("unit test")
134        report = self.local_image_local_instance._CreateAVD(
135            mock_avd_spec, no_prompts=True)
136        self.assertEqual(report.errors, ["unit test"])
137
138    def testSelectAndLockInstance(self):
139        """test _SelectAndLockInstance."""
140        mock_avd_spec = mock.Mock(local_instance_id=0)
141        mock_lock = mock.Mock()
142        mock_lock.Lock.return_value = True
143        mock_lock.LockIfNotInUse.side_effect = (False, True)
144        self.Patch(instance, "GetLocalInstanceLock",
145                   return_value=mock_lock)
146
147        ins_id, _ = self.local_image_local_instance._SelectAndLockInstance(
148            mock_avd_spec)
149        self.assertEqual(2, ins_id)
150        mock_lock.Lock.assert_not_called()
151        self.assertEqual(2, mock_lock.LockIfNotInUse.call_count)
152
153        mock_lock.LockIfNotInUse.reset_mock()
154
155        mock_avd_spec.local_instance_id = 1
156        ins_id, _ = self.local_image_local_instance._SelectAndLockInstance(
157            mock_avd_spec)
158        self.assertEqual(1, ins_id)
159        mock_lock.Lock.assert_called_once()
160        mock_lock.LockIfNotInUse.assert_not_called()
161
162    @mock.patch("acloud.create.local_image_local_instance.utils")
163    @mock.patch("acloud.create.local_image_local_instance.ota_tools")
164    @mock.patch("acloud.create.local_image_local_instance.create_common")
165    @mock.patch.object(local_image_local_instance.LocalImageLocalInstance,
166                       "_LaunchCvd")
167    @mock.patch.object(local_image_local_instance.LocalImageLocalInstance,
168                       "PrepareLaunchCVDCmd")
169    @mock.patch.object(instance, "GetLocalInstanceRuntimeDir")
170    @mock.patch.object(instance, "GetLocalInstanceHomeDir")
171    def testCreateInstance(self, mock_home_dir, _mock_runtime_dir,
172                           _mock_prepare_cmd, mock_launch_cvd,
173                           _mock_create_common, mock_ota_tools, _mock_utils):
174        """Test the report returned by _CreateInstance."""
175        self.Patch(instance, "GetLocalInstanceName",
176                   return_value="local-instance-1")
177        mock_home_dir.return_value = "/local-instance-1"
178        artifact_paths = local_image_local_instance.ArtifactPaths(
179            "/image/path", "/host/bin/path", "/misc/info/path",
180            "/ota/tools/dir", "/system/image/path", "/boot/image/path")
181        mock_ota_tools_object = mock.Mock()
182        mock_ota_tools.OtaTools.return_value = mock_ota_tools_object
183        mock_avd_spec = mock.Mock(unlock_screen=False)
184        local_ins = mock.Mock(
185            adb_port=6520,
186            vnc_port=6444
187        )
188        local_ins.CvdStatus.return_value = True
189        self.Patch(instance, "LocalInstance",
190                   return_value=local_ins)
191        self.Patch(list_instance, "GetActiveCVD",
192                   return_value=local_ins)
193        self.Patch(os, "symlink")
194
195        # Success
196        report = self.local_image_local_instance._CreateInstance(
197            1, artifact_paths, mock_avd_spec, no_prompts=True)
198
199        self.assertEqual(report.data.get("devices"),
200                         self._EXPECTED_DEVICES_IN_REPORT)
201        mock_ota_tools.OtaTools.assert_called_with("/ota/tools/dir")
202        mock_ota_tools_object.BuildSuperImage.assert_called_with(
203            "/local-instance-1/mixed_super.img", "/misc/info/path", mock.ANY)
204
205        # Failure
206        mock_launch_cvd.side_effect = errors.LaunchCVDFail("unit test")
207
208        report = self.local_image_local_instance._CreateInstance(
209            1, artifact_paths, mock_avd_spec, no_prompts=True)
210
211        self.assertEqual(report.data.get("devices_failing_boot"),
212                         self._EXPECTED_DEVICES_IN_FAILED_REPORT)
213        self.assertIn("unit test", report.errors[0])
214
215    # pylint: disable=protected-access
216    @mock.patch("acloud.create.local_image_local_instance.os.path.isfile")
217    def testFindCvdHostBinaries(self, mock_isfile):
218        """Test FindCvdHostBinaries."""
219        cvd_host_dir = "/unit/test"
220        mock_isfile.return_value = None
221
222        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
223                             {"ANDROID_HOST_OUT": cvd_host_dir,
224                              "ANDROID_SOONG_HOST_OUT": cvd_host_dir}, clear=True):
225            with self.assertRaises(errors.GetCvdLocalHostPackageError):
226                self.local_image_local_instance._FindCvdHostBinaries(
227                    [cvd_host_dir])
228
229        mock_isfile.side_effect = (
230            lambda path: path == "/unit/test/bin/launch_cvd")
231
232        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
233                             {"ANDROID_HOST_OUT": cvd_host_dir,
234                              "ANDROID_SOONG_HOST_OUT": cvd_host_dir}, clear=True):
235            path = self.local_image_local_instance._FindCvdHostBinaries([])
236            self.assertEqual(path, cvd_host_dir)
237
238        with mock.patch.dict("acloud.internal.lib.ota_tools.os.environ",
239                             dict(), clear=True):
240            path = self.local_image_local_instance._FindCvdHostBinaries(
241                [cvd_host_dir])
242            self.assertEqual(path, cvd_host_dir)
243
244    @staticmethod
245    def _CreateEmptyFile(path):
246        os.makedirs(os.path.dirname(path), exist_ok=True)
247        with open(path, "w"):
248            pass
249
250    @mock.patch("acloud.create.local_image_local_instance.ota_tools")
251    def testGetImageArtifactsPath(self, mock_ota_tools):
252        """Test GetImageArtifactsPath without system image dir."""
253        with tempfile.TemporaryDirectory() as temp_dir:
254            image_dir = "/unit/test"
255            cvd_dir = os.path.join(temp_dir, "cvd-host_package")
256            self._CreateEmptyFile(os.path.join(cvd_dir, "bin", "launch_cvd"))
257
258            mock_avd_spec = mock.Mock(
259                local_image_dir=image_dir,
260                local_kernel_image=None,
261                local_system_image=None,
262                local_tool_dirs=[cvd_dir])
263
264            paths = self.local_image_local_instance.GetImageArtifactsPath(
265                mock_avd_spec)
266
267        mock_ota_tools.FindOtaTools.assert_not_called()
268        self.assertEqual(paths, (image_dir, cvd_dir, None, None, None, None))
269
270    @mock.patch("acloud.create.local_image_local_instance.ota_tools")
271    def testGetImageFromBuildEnvironment(self, mock_ota_tools):
272        """Test GetImageArtifactsPath with files in build environment."""
273        with tempfile.TemporaryDirectory() as temp_dir:
274            image_dir = os.path.join(temp_dir, "image")
275            cvd_dir = os.path.join(temp_dir, "cvd-host_package")
276            mock_ota_tools.FindOtaTools.return_value = cvd_dir
277            extra_image_dir = os.path.join(temp_dir, "extra_image")
278            system_image_path = os.path.join(extra_image_dir, "system.img")
279            boot_image_path = os.path.join(extra_image_dir, "boot.img")
280            misc_info_path = os.path.join(image_dir, "misc_info.txt")
281            self._CreateEmptyFile(os.path.join(image_dir, "vbmeta.img"))
282            self._CreateEmptyFile(os.path.join(cvd_dir, "bin", "launch_cvd"))
283            self._CreateEmptyFile(system_image_path)
284            self._CreateEmptyFile(boot_image_path)
285            self._CreateEmptyFile(os.path.join(extra_image_dir,
286                                               "boot-debug.img"))
287            self._CreateEmptyFile(misc_info_path)
288
289            mock_avd_spec = mock.Mock(
290                local_image_dir=image_dir,
291                local_kernel_image=extra_image_dir,
292                local_system_image=extra_image_dir,
293                local_tool_dirs=[])
294
295            with mock.patch.dict("acloud.create.local_image_local_instance."
296                                 "os.environ",
297                                 {"ANDROID_SOONG_HOST_OUT": cvd_dir},
298                                 clear=True):
299                paths = self.local_image_local_instance.GetImageArtifactsPath(
300                    mock_avd_spec)
301
302        mock_ota_tools.FindOtaTools.assert_called_once()
303        self.assertEqual(paths,
304                         (image_dir, cvd_dir, misc_info_path, cvd_dir,
305                          system_image_path, boot_image_path))
306
307    @mock.patch("acloud.create.local_image_local_instance.ota_tools")
308    def testGetImageFromTargetFiles(self, mock_ota_tools):
309        """Test GetImageArtifactsPath with extracted target files."""
310        ota_tools_dir = "/mock_ota_tools"
311        mock_ota_tools.FindOtaTools.return_value = ota_tools_dir
312
313        with tempfile.TemporaryDirectory() as temp_dir:
314            image_dir = os.path.join(temp_dir, "image")
315            cvd_dir = os.path.join(temp_dir, "cvd-host_package")
316            system_image_path = os.path.join(temp_dir, "system", "test.img")
317            misc_info_path = os.path.join(image_dir, "META", "misc_info.txt")
318            boot_image_path = os.path.join(temp_dir, "boot", "test.img")
319            self._CreateEmptyFile(os.path.join(image_dir, "IMAGES",
320                                               "vbmeta.img"))
321            self._CreateEmptyFile(os.path.join(cvd_dir, "bin", "launch_cvd"))
322            self._CreateEmptyFile(system_image_path)
323            self._CreateEmptyFile(misc_info_path)
324            self._CreateEmptyFile(boot_image_path)
325
326            mock_avd_spec = mock.Mock(
327                local_image_dir=image_dir,
328                local_kernel_image=boot_image_path,
329                local_system_image=system_image_path,
330                local_tool_dirs=[ota_tools_dir, cvd_dir])
331
332            paths = self.local_image_local_instance.GetImageArtifactsPath(
333                mock_avd_spec)
334
335        mock_ota_tools.FindOtaTools.assert_called_once()
336        self.assertEqual(paths,
337                         (os.path.join(image_dir, "IMAGES"), cvd_dir,
338                          misc_info_path, ota_tools_dir, system_image_path,
339                          boot_image_path))
340
341    @mock.patch.object(utils, "CheckUserInGroups")
342    def testPrepareLaunchCVDCmd(self, mock_usergroups):
343        """test PrepareLaunchCVDCmd."""
344        mock_usergroups.return_value = False
345        hw_property = {"cpu": "fake", "x_res": "fake", "y_res": "fake",
346                       "dpi":"fake", "memory": "fake", "disk": "fake"}
347        constants.LIST_CF_USER_GROUPS = ["group1", "group2"]
348
349        launch_cmd = self.local_image_local_instance.PrepareLaunchCVDCmd(
350            constants.CMD_LAUNCH_CVD, hw_property, True, "fake_image_dir",
351            "fake_cvd_dir", False, True, None, None, None, "phone")
352        self.assertEqual(launch_cmd, self.LAUNCH_CVD_CMD_WITH_DISK)
353
354        # "disk" doesn't exist in hw_property.
355        hw_property = {"cpu": "fake", "x_res": "fake", "y_res": "fake",
356                       "dpi": "fake", "memory": "fake"}
357        launch_cmd = self.local_image_local_instance.PrepareLaunchCVDCmd(
358            constants.CMD_LAUNCH_CVD, hw_property, True, "fake_image_dir",
359            "fake_cvd_dir", False, True, None, None, None, "phone")
360        self.assertEqual(launch_cmd, self.LAUNCH_CVD_CMD_NO_DISK)
361
362        # "gpu" is enabled with "default"
363        launch_cmd = self.local_image_local_instance.PrepareLaunchCVDCmd(
364            constants.CMD_LAUNCH_CVD, hw_property, True, "fake_image_dir",
365            "fake_cvd_dir", False, True, None, None, None, "phone")
366        self.assertEqual(launch_cmd, self.LAUNCH_CVD_CMD_NO_DISK_WITH_GPU)
367
368        # Following test with hw_property is None.
369        launch_cmd = self.local_image_local_instance.PrepareLaunchCVDCmd(
370            constants.CMD_LAUNCH_CVD, None, True, "fake_image_dir",
371            "fake_cvd_dir", True, False, None, None, None, "auto")
372        self.assertEqual(launch_cmd, self.LAUNCH_CVD_CMD_WITH_WEBRTC)
373
374        launch_cmd = self.local_image_local_instance.PrepareLaunchCVDCmd(
375            constants.CMD_LAUNCH_CVD, None, True, "fake_image_dir",
376            "fake_cvd_dir", False, True, "fake_super_image", "fake_boot_image",
377            None, "phone")
378        self.assertEqual(launch_cmd, self.LAUNCH_CVD_CMD_WITH_MIXED_IMAGES)
379
380        # Add args into launch command with "-setupwizard_mode=REQUIRED"
381        launch_cmd = self.local_image_local_instance.PrepareLaunchCVDCmd(
382            constants.CMD_LAUNCH_CVD, None, True, "fake_image_dir",
383            "fake_cvd_dir", False, True, None, None,
384            "-setupwizard_mode=REQUIRED", "phone")
385        self.assertEqual(launch_cmd, self.LAUNCH_CVD_CMD_WITH_ARGS)
386
387    @mock.patch.object(utils, "GetUserAnswerYes")
388    @mock.patch.object(list_instance, "GetActiveCVD")
389    def testCheckRunningCvd(self, mock_cvd_running, mock_get_answer):
390        """test _CheckRunningCvd."""
391        local_instance_id = 3
392
393        # Test that launch_cvd is running.
394        mock_cvd_running.return_value = True
395        mock_get_answer.return_value = False
396        answer = self.local_image_local_instance._CheckRunningCvd(
397            local_instance_id)
398        self.assertFalse(answer)
399
400        # Test that launch_cvd is not running.
401        mock_cvd_running.return_value = False
402        answer = self.local_image_local_instance._CheckRunningCvd(
403            local_instance_id)
404        self.assertTrue(answer)
405
406    # pylint: disable=protected-access
407    @mock.patch("acloud.create.local_image_local_instance.subprocess."
408                "check_call")
409    @mock.patch.dict("os.environ", clear=True)
410    def testLaunchCVD(self, mock_check_call):
411        """test _LaunchCvd should call subprocess.check_call with the env."""
412        local_instance_id = 3
413        launch_cvd_cmd = "launch_cvd"
414        host_bins_path = "host_bins_path"
415        cvd_home_dir = "fake_home"
416        timeout = 100
417        cvd_env = {}
418        cvd_env[constants.ENV_CVD_HOME] = cvd_home_dir
419        cvd_env[constants.ENV_CUTTLEFISH_INSTANCE] = str(local_instance_id)
420        cvd_env[constants.ENV_ANDROID_SOONG_HOST_OUT] = host_bins_path
421        cvd_env[constants.ENV_ANDROID_HOST_OUT] = host_bins_path
422
423        self.local_image_local_instance._LaunchCvd(launch_cvd_cmd,
424                                                   local_instance_id,
425                                                   host_bins_path,
426                                                   cvd_home_dir,
427                                                   timeout)
428
429        mock_check_call.assert_called_once_with(launch_cvd_cmd,
430                                                shell=True,
431                                                stderr=subprocess.STDOUT,
432                                                env=cvd_env,
433                                                timeout=timeout)
434
435    @mock.patch("acloud.create.local_image_local_instance.subprocess."
436                "check_call")
437    def testLaunchCVDTimeout(self, mock_check_call):
438        """test _LaunchCvd with subprocess errors."""
439        mock_check_call.side_effect = subprocess.TimeoutExpired(
440            cmd="launch_cvd", timeout=100)
441        with self.assertRaises(errors.LaunchCVDFail):
442            self.local_image_local_instance._LaunchCvd("launch_cvd",
443                                                       3,
444                                                       "host_bins_path",
445                                                       "cvd_home_dir",
446                                                       100)
447
448        mock_check_call.side_effect = subprocess.CalledProcessError(
449            cmd="launch_cvd", returncode=1)
450        with self.assertRaises(errors.LaunchCVDFail):
451            self.local_image_local_instance._LaunchCvd("launch_cvd",
452                                                       3,
453                                                       "host_bins_path",
454                                                       "cvd_home_dir",
455                                                       100)
456
457    def testGetWebrtcSigServerPort(self):
458        """test GetWebrtcSigServerPort."""
459        instance_id = 3
460        expected_port = 8445
461        self.assertEqual(
462            self.local_image_local_instance.GetWebrtcSigServerPort(instance_id),
463            expected_port)
464
465
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
468