1# Copyright 2018 - The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for create_common."""
15
16import os
17import shutil
18import tempfile
19import unittest
20
21from unittest import mock
22
23from acloud import errors
24from acloud.create import create_common
25from acloud.internal.lib import android_build_client
26from acloud.internal.lib import auth
27from acloud.internal.lib import driver_test_lib
28from acloud.internal.lib import utils
29
30
class FakeZipFile:
    """Stand-in for ZipFile() whose methods accept calls and do nothing."""

    # pylint: disable=invalid-name,unused-argument,no-self-use
    def write(self, filename, arcname=None, compress_type=None):
        """Accept the write arguments and discard them."""

    # pylint: disable=invalid-name,no-self-use
    def close(self):
        """Do nothing; there is no real archive to close."""
43
44
45# pylint: disable=invalid-name,protected-access
class CreateCommonTest(driver_test_lib.BaseDriverTest):
    """Test create_common functions."""

    # pylint: disable=protected-access
    def testProcessHWPropertyWithInvalidArgs(self):
        """Test ParseKeyValuePairArgs with invalid args."""
        # Checking wrong property value.
        args_str = "cpu:3,disk:"
        with self.assertRaises(errors.MalformedDictStringError):
            create_common.ParseKeyValuePairArgs(args_str)

        # Checking wrong property format.
        args_str = "cpu:3,disk"
        with self.assertRaises(errors.MalformedDictStringError):
            create_common.ParseKeyValuePairArgs(args_str)

    def testParseHWPropertyStr(self):
        """Test ParseKeyValuePairArgs."""
        expected_dict = {"cpu": "2", "resolution": "1080x1920", "dpi": "240",
                         "memory": "4g", "disk": "4g"}
        args_str = "cpu:2,resolution:1080x1920,dpi:240,memory:4g,disk:4g"
        result_dict = create_common.ParseKeyValuePairArgs(args_str)
        # assertEqual reports the differing keys/values on failure, which
        # assertTrue(a == b) cannot do.
        self.assertEqual(expected_dict, result_dict)

    def testGetCvdHostPackage(self):
        """Test GetCvdHostPackage."""
        # Can't find the cvd host package
        with mock.patch("os.path.exists", return_value=False):
            with self.assertRaises(errors.GetCvdLocalHostPackageError):
                create_common.GetCvdHostPackage()

        self.Patch(os.environ, "get", return_value="/fake_dir2")
        self.Patch(utils, "GetDistDir", return_value="/fake_dir1")
        # First and 2nd path are host out dirs, 3rd path is dist dir.
        self.Patch(os.path, "exists",
                   side_effect=[False, False, True])

        # Find cvd host in dist dir.
        self.assertEqual(
            create_common.GetCvdHostPackage(),
            "/fake_dir1/cvd-host_package.tar.gz")

        # Find cvd host in host out dir.
        self.Patch(os.environ, "get", return_value="/fake_dir2")
        self.Patch(utils, "GetDistDir", return_value=None)
        with mock.patch("os.path.exists", return_value=True):
            self.assertEqual(
                create_common.GetCvdHostPackage(),
                "/fake_dir2/cvd-host_package.tar.gz")

    @mock.patch("acloud.create.create_common.os.path.isfile",
                side_effect=lambda path: path == "/dir/name")
    @mock.patch("acloud.create.create_common.os.path.isdir",
                side_effect=lambda path: path == "/dir")
    @mock.patch("acloud.create.create_common.os.listdir",
                return_value=["name", "name2"])
    def testFindLocalImage(self, _mock_listdir, _mock_isdir, _mock_isfile):
        """Test FindLocalImage."""
        # The path argument is itself the image file.
        self.assertEqual(
            "/dir/name",
            create_common.FindLocalImage("/test/../dir/name", "not_exist"))

        # The path is a directory and the pattern matches one entry.
        self.assertEqual("/dir/name",
                         create_common.FindLocalImage("/dir/", "name"))

        # The pattern matches no entry of the directory listing.
        with self.assertRaises(errors.GetLocalImageError):
            create_common.FindLocalImage("/dir", "not_exist")

        # The pattern "name.?" is expected to be rejected as well.
        with self.assertRaises(errors.GetLocalImageError):
            create_common.FindLocalImage("/dir", "name.?")

    @mock.patch.object(utils, "Decompress")
    def testDownloadRemoteArtifact(self, mock_decompress):
        """Test Download cuttlefish package."""
        mock_build_client = mock.MagicMock()
        self.Patch(
            android_build_client,
            "AndroidBuildClient",
            return_value=mock_build_client)
        self.Patch(auth, "CreateCredentials", return_value=mock.MagicMock())
        build_id = "1234"
        build_target = "aosp_cf_x86_phone-userdebug"
        avd_spec = mock.MagicMock()
        avd_spec.cfg = mock.MagicMock()
        avd_spec.remote_image = {"build_target": build_target,
                                 "build_id": build_id}
        checkfile1 = "aosp_cf_x86_phone-img-1234.zip"
        checkfile2 = "cvd-host_package.tar.gz"
        extract_path = "/tmp/1234"

        # Download with decompress=True: expect one download and one
        # Decompress call.
        create_common.DownloadRemoteArtifact(
            avd_spec.cfg,
            avd_spec.remote_image["build_target"],
            avd_spec.remote_image["build_id"],
            checkfile1,
            extract_path,
            decompress=True)

        # assert_called_once_with checks both the call count and the args.
        mock_build_client.DownloadArtifact.assert_called_once_with(
            build_target,
            build_id,
            checkfile1,
            "%s/%s" % (extract_path, checkfile1))
        self.assertEqual(mock_decompress.call_count, 1)

        # reset_mock() clears the call count together with the recorded call
        # arguments, so the second round starts from a clean state.
        mock_decompress.reset_mock()
        mock_build_client.DownloadArtifact.reset_mock()

        # Download without the decompress argument: expect one download and
        # no Decompress call.
        create_common.DownloadRemoteArtifact(
            avd_spec.cfg,
            avd_spec.remote_image["build_target"],
            avd_spec.remote_image["build_id"],
            checkfile2,
            extract_path)

        mock_build_client.DownloadArtifact.assert_called_once_with(
            build_target,
            build_id,
            checkfile2,
            "%s/%s" % (extract_path, checkfile2))
        self.assertEqual(mock_decompress.call_count, 0)

    def testPrepareLocalInstanceDir(self):
        """Test PrepareLocalInstanceDir."""
        temp_dir = tempfile.mkdtemp()
        # Registered right after creation so the directory is removed even
        # when an assertion below fails.
        self.addCleanup(shutil.rmtree, temp_dir)

        # Without local_instance_dir, a real directory (not a link) is
        # expected at cvd_home_dir.
        cvd_home_dir = os.path.join(temp_dir, "local-instance-1")
        mock_avd_spec = mock.Mock(local_instance_dir=None)
        create_common.PrepareLocalInstanceDir(cvd_home_dir, mock_avd_spec)
        self.assertTrue(os.path.isdir(cvd_home_dir))
        self.assertFalse(os.path.islink(cvd_home_dir))

        # With local_instance_dir set, cvd_home_dir is expected to become a
        # symbolic link to that directory.
        link_target_dir = os.path.join(temp_dir, "cvd_home")
        os.mkdir(link_target_dir)
        mock_avd_spec.local_instance_dir = link_target_dir
        create_common.PrepareLocalInstanceDir(cvd_home_dir, mock_avd_spec)
        self.assertTrue(os.path.islink(cvd_home_dir))
        self.assertTrue(os.path.samefile(cvd_home_dir, link_target_dir))
190
191
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
194