1#!/usr/bin/env python
2#
3# Copyright 2018 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16"""Tests for instance class."""
17
18import collections
19import datetime
20import subprocess
21import unittest
22
23from unittest import mock
24from six import b
25
26# pylint: disable=import-error
27import dateutil.parser
28import dateutil.tz
29
30from acloud.internal import constants
31from acloud.internal.lib import cvd_runtime_config
32from acloud.internal.lib import driver_test_lib
33from acloud.internal.lib import utils
34from acloud.internal.lib.adb_tools import AdbTools
35from acloud.list import instance
36
37
38class InstanceTest(driver_test_lib.BaseDriverTest):
39    """Test instance."""
40    PS_SSH_TUNNEL = b("/fake_ps_1 --fake arg \n"
41                      "/fake_ps_2 --fake arg \n"
42                      "/usr/bin/ssh -i ~/.ssh/acloud_rsa "
43                      "-o UserKnownHostsFile=/dev/null "
44                      "-o StrictHostKeyChecking=no -L 54321:127.0.0.1:6520 "
45                      "-L 12345:127.0.0.1:6444 -N -f -l user 1.1.1.1")
46    GCE_INSTANCE = {
47        constants.INS_KEY_NAME: "fake_ins_name",
48        constants.INS_KEY_CREATETIME: "fake_create_time",
49        constants.INS_KEY_STATUS: "fake_status",
50        constants.INS_KEY_ZONE: "test/zones/fake_zone",
51        "networkInterfaces": [{"accessConfigs": [{"natIP": "1.1.1.1"}]}],
52        "labels": {constants.INS_KEY_AVD_TYPE: "fake_type",
53                   constants.INS_KEY_AVD_FLAVOR: "fake_flavor"},
54        "metadata": {
55            "items":[{"key":constants.INS_KEY_AVD_TYPE,
56                      "value":"fake_type"},
57                     {"key":constants.INS_KEY_AVD_FLAVOR,
58                      "value":"fake_flavor"}]}
59    }
60
61    @staticmethod
62    def _MockCvdRuntimeConfig():
63        """Create a mock CvdRuntimeConfig."""
64        return mock.MagicMock(
65            instance_id=2,
66            x_res=1080,
67            y_res=1920,
68            dpi=480,
69            instance_dir="fake_instance_dir",
70            adb_port=6521,
71            vnc_port=6445,
72            adb_ip_port="127.0.0.1:6521",
73            cvd_tools_path="fake_cvd_tools_path",
74            config_path="fake_config_path",
75        )
76
77    @mock.patch("acloud.list.instance.AdbTools")
78    def testCreateLocalInstance(self, mock_adb_tools):
79        """Test getting local instance info from cvd runtime config."""
80        mock_adb_tools_object = mock.Mock(device_information={})
81        mock_adb_tools_object.IsAdbConnected.return_value = True
82        mock_adb_tools.return_value = mock_adb_tools_object
83        self.Patch(cvd_runtime_config, "CvdRuntimeConfig",
84                   return_value=self._MockCvdRuntimeConfig())
85        local_instance = instance.LocalInstance("fake_config_path")
86
87        self.assertEqual("local-instance-2", local_instance.name)
88        self.assertEqual(True, local_instance.islocal)
89        self.assertEqual("1080x1920 (480)", local_instance.display)
90        expected_full_name = ("device serial: 0.0.0.0:%s (%s) elapsed time: %s"
91                              % ("6521",
92                                 "local-instance-2",
93                                 "None"))
94        self.assertEqual(expected_full_name, local_instance.fullname)
95        self.assertEqual(6521, local_instance.adb_port)
96        self.assertEqual(6445, local_instance.vnc_port)
97        self.assertEqual(8444, local_instance.webrtc_port)
98
99    @mock.patch("acloud.list.instance.AdbTools")
100    def testDeleteLocalInstance(self, mock_adb_tools):
101        """Test executing stop_cvd command."""
102        self.Patch(cvd_runtime_config, "CvdRuntimeConfig",
103                   return_value=self._MockCvdRuntimeConfig())
104        mock_adb_tools_object = mock.Mock(device_information={})
105        mock_adb_tools_object.IsAdbConnected.return_value = True
106        mock_adb_tools.return_value = mock_adb_tools_object
107        self.Patch(utils, "AddUserGroupsToCmd",
108                   side_effect=lambda cmd, groups: cmd)
109        mock_check_call = self.Patch(subprocess, "check_call")
110
111        local_instance = instance.LocalInstance("fake_config_path")
112        with mock.patch.dict("acloud.list.instance.os.environ", clear=True):
113            local_instance.Delete()
114
115        expected_env = {
116            'CUTTLEFISH_INSTANCE': '2',
117            'HOME': '/tmp/acloud_cvd_temp/local-instance-2',
118            'CUTTLEFISH_CONFIG_FILE': 'fake_config_path',
119        }
120        mock_check_call.assert_called_with(
121            'fake_cvd_tools_path/stop_cvd', stderr=subprocess.STDOUT,
122            shell=True, env=expected_env)
123        mock_adb_tools_object.DisconnectAdb.assert_called()
124
125    @mock.patch("acloud.list.instance.tempfile")
126    @mock.patch("acloud.list.instance.AdbTools")
127    def testCreateLocalGoldfishInstance(self, mock_adb_tools, mock_tempfile):
128        """"Test the attributes of LocalGoldfishInstance."""
129        mock_tempfile.gettempdir.return_value = "/unit/test"
130        mock_adb_tools.return_value = mock.Mock(device_information={})
131
132        inst = instance.LocalGoldfishInstance(1)
133
134        self.assertEqual(inst.name, "local-goldfish-instance-1")
135        self.assertEqual(inst.avd_type, constants.TYPE_GF)
136        self.assertEqual(inst.adb_port, 5555)
137        self.assertTrue(inst.islocal)
138        self.assertEqual(inst.console_port, 5554)
139        self.assertEqual(inst.device_serial, "emulator-5554")
140        self.assertEqual(inst.instance_dir,
141                         "/unit/test/acloud_gf_temp/local-goldfish-instance-1")
142
143    @mock.patch("acloud.list.instance.AdbTools")
144    def testGetLocalGoldfishInstances(self, mock_adb_tools):
145        """Test LocalGoldfishInstance.GetExistingInstances."""
146        mock_adb_tools.GetDeviceSerials.return_value = [
147            "127.0.0.1:6520", "emulator-5554", "ABCD", "emulator-5558"]
148
149        instances = instance.LocalGoldfishInstance.GetExistingInstances()
150
151        self.assertEqual(len(instances), 2)
152        self.assertEqual(instances[0].console_port, 5554)
153        self.assertEqual(instances[0].name, "local-goldfish-instance-1")
154        self.assertEqual(instances[1].console_port, 5558)
155        self.assertEqual(instances[1].name, "local-goldfish-instance-3")
156
157    def testGetMaxNumberOfGoldfishInstances(self):
158        """Test LocalGoldfishInstance.GetMaxNumberOfInstances."""
159        mock_environ = {}
160        with mock.patch.dict("acloud.list.instance.os.environ",
161                             mock_environ, clear=True):
162            num = instance.LocalGoldfishInstance.GetMaxNumberOfInstances()
163        self.assertEqual(num, 16)
164
165        mock_environ["ADB_LOCAL_TRANSPORT_MAX_PORT"] = "5565"
166        with mock.patch.dict("acloud.list.instance.os.environ",
167                             mock_environ, clear=True):
168            num = instance.LocalGoldfishInstance.GetMaxNumberOfInstances()
169        self.assertEqual(num, 6)
170
171    # pylint: disable=protected-access
172    def testGetElapsedTime(self):
173        """Test _GetElapsedTime"""
174        # Instance time can't parse
175        start_time = "error time"
176        self.assertEqual(instance._MSG_UNABLE_TO_CALCULATE,
177                         instance._GetElapsedTime(start_time))
178
179        # Remote instance elapsed time
180        now = "2019-01-14T13:00:00.000-07:00"
181        start_time = "2019-01-14T03:00:00.000-07:00"
182        self.Patch(instance, "datetime")
183        instance.datetime.datetime.now.return_value = dateutil.parser.parse(now)
184        self.assertEqual(
185            datetime.timedelta(hours=10), instance._GetElapsedTime(start_time))
186
187        # Local instance elapsed time
188        now = "Mon Jan 14 10:10:10 2019"
189        start_time = "Mon Jan 14 08:10:10 2019"
190        instance.datetime.datetime.now.return_value = dateutil.parser.parse(
191            now).replace(tzinfo=dateutil.tz.tzlocal())
192        self.assertEqual(
193            datetime.timedelta(hours=2), instance._GetElapsedTime(start_time))
194
195    # pylint: disable=protected-access
196    def testGetAdbVncPortFromSSHTunnel(self):
197        """"Test Get forwarding adb and vnc port from ssh tunnel."""
198        self.Patch(subprocess, "check_output", return_value=self.PS_SSH_TUNNEL)
199        self.Patch(instance, "_GetElapsedTime", return_value="fake_time")
200        self.Patch(instance.RemoteInstance, "_GetZoneName", return_value="fake_zone")
201        forwarded_ports = instance.RemoteInstance(
202            mock.MagicMock()).GetAdbVncPortFromSSHTunnel(
203                "1.1.1.1", constants.TYPE_CF)
204        self.assertEqual(54321, forwarded_ports.adb_port)
205        self.assertEqual(12345, forwarded_ports.vnc_port)
206
207        # If avd_type is undefined in utils.AVD_PORT_DICT.
208        forwarded_ports = instance.RemoteInstance(
209            mock.MagicMock()).GetAdbVncPortFromSSHTunnel(
210                "1.1.1.1", "undefined_avd_type")
211        self.assertEqual(None, forwarded_ports.adb_port)
212        self.assertEqual(None, forwarded_ports.vnc_port)
213
214    # pylint: disable=protected-access
215    def testProcessGceInstance(self):
216        """"Test process instance detail."""
217        fake_adb = 123456
218        fake_vnc = 654321
219        forwarded_ports = collections.namedtuple("ForwardedPorts",
220                                                 [constants.VNC_PORT,
221                                                  constants.ADB_PORT])
222        self.Patch(
223            instance.RemoteInstance,
224            "GetAdbVncPortFromSSHTunnel",
225            return_value=forwarded_ports(vnc_port=fake_vnc, adb_port=fake_adb))
226        self.Patch(instance, "_GetElapsedTime", return_value="fake_time")
227        self.Patch(AdbTools, "IsAdbConnected", return_value=True)
228
229        # test ssh_tunnel_is_connected will be true if ssh tunnel connection is found
230        instance_info = instance.RemoteInstance(self.GCE_INSTANCE)
231        self.assertTrue(instance_info.ssh_tunnel_is_connected)
232        self.assertEqual(instance_info.adb_port, fake_adb)
233        self.assertEqual(instance_info.vnc_port, fake_vnc)
234        self.assertEqual("1.1.1.1", instance_info.ip)
235        self.assertEqual("fake_status", instance_info.status)
236        self.assertEqual("fake_type", instance_info.avd_type)
237        self.assertEqual("fake_flavor", instance_info.avd_flavor)
238        expected_full_name = "device serial: 127.0.0.1:%s (%s) elapsed time: %s" % (
239            fake_adb, self.GCE_INSTANCE[constants.INS_KEY_NAME], "fake_time")
240        self.assertEqual(expected_full_name, instance_info.fullname)
241
242        # test ssh tunnel is connected but adb is disconnected
243        self.Patch(AdbTools, "IsAdbConnected", return_value=False)
244        instance_info = instance.RemoteInstance(self.GCE_INSTANCE)
245        self.assertTrue(instance_info.ssh_tunnel_is_connected)
246        expected_full_name = "device serial: not connected (%s) elapsed time: %s" % (
247            instance_info.name, "fake_time")
248        self.assertEqual(expected_full_name, instance_info.fullname)
249
250        # test ssh_tunnel_is_connected will be false if ssh tunnel connection is not found
251        self.Patch(
252            instance.RemoteInstance,
253            "GetAdbVncPortFromSSHTunnel",
254            return_value=forwarded_ports(vnc_port=None, adb_port=None))
255        instance_info = instance.RemoteInstance(self.GCE_INSTANCE)
256        self.assertFalse(instance_info.ssh_tunnel_is_connected)
257        expected_full_name = "device serial: not connected (%s) elapsed time: %s" % (
258            self.GCE_INSTANCE[constants.INS_KEY_NAME], "fake_time")
259        self.assertEqual(expected_full_name, instance_info.fullname)
260
261    def testInstanceSummary(self):
262        """Test instance summary."""
263        fake_adb = 123456
264        fake_vnc = 654321
265        forwarded_ports = collections.namedtuple("ForwardedPorts",
266                                                 [constants.VNC_PORT,
267                                                  constants.ADB_PORT])
268        self.Patch(
269            instance.RemoteInstance,
270            "GetAdbVncPortFromSSHTunnel",
271            return_value=forwarded_ports(vnc_port=fake_vnc, adb_port=fake_adb))
272        self.Patch(instance, "_GetElapsedTime", return_value="fake_time")
273        self.Patch(AdbTools, "IsAdbConnected", return_value=True)
274        remote_instance = instance.RemoteInstance(self.GCE_INSTANCE)
275        result_summary = (" name: fake_ins_name\n "
276                          "   IP: 1.1.1.1\n "
277                          "   create time: fake_create_time\n "
278                          "   elapse time: fake_time\n "
279                          "   status: fake_status\n "
280                          "   avd type: fake_type\n "
281                          "   display: None\n "
282                          "   vnc: 127.0.0.1:654321\n "
283                          "   zone: fake_zone\n "
284                          "   webrtc port: 8443\n "
285                          "   adb serial: 127.0.0.1:123456\n "
286                          "   product: None\n "
287                          "   model: None\n "
288                          "   device: None\n "
289                          "   transport_id: None")
290        self.assertEqual(remote_instance.Summary(), result_summary)
291
292        self.Patch(
293            instance.RemoteInstance,
294            "GetAdbVncPortFromSSHTunnel",
295            return_value=forwarded_ports(vnc_port=None, adb_port=None))
296        self.Patch(instance, "_GetElapsedTime", return_value="fake_time")
297        self.Patch(AdbTools, "IsAdbConnected", return_value=False)
298        remote_instance = instance.RemoteInstance(self.GCE_INSTANCE)
299        result_summary = (" name: fake_ins_name\n "
300                          "   IP: 1.1.1.1\n "
301                          "   create time: fake_create_time\n "
302                          "   elapse time: fake_time\n "
303                          "   status: fake_status\n "
304                          "   avd type: fake_type\n "
305                          "   display: None\n "
306                          "   vnc: 127.0.0.1:None\n "
307                          "   zone: fake_zone\n "
308                          "   webrtc port: 8443\n "
309                          "   adb serial: disconnected")
310        self.assertEqual(remote_instance.Summary(), result_summary)
311
312    def testGetZoneName(self):
313        """Test GetZoneName."""
314        zone_info = "v1/projects/project/zones/us-central1-c"
315        expected_result = "us-central1-c"
316        self.assertEqual(instance.RemoteInstance._GetZoneName(zone_info),
317                         expected_result)
318        # Test can't get zone name from zone info.
319        zone_info = "v1/projects/project/us-central1-c"
320        self.assertEqual(instance.RemoteInstance._GetZoneName(zone_info), None)
321
322
323if __name__ == "__main__":
324    unittest.main()
325