1#!/usr/bin/env python
2#
3# Copyright 2016 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Tests for acloud.internal.lib.utils."""
18
19import getpass
20import os
21import subprocess
22import time
23
24import mock
25
26import unittest
27from acloud.internal.lib import driver_test_lib
28from acloud.internal.lib import utils
29
30
31class UtilsTest(driver_test_lib.BaseDriverTest):
32
33  def testCreateSshKeyPair_KeyAlreadyExists(self):
34    """Test when the key pair already exists."""
35    public_key = "/fake/public_key"
36    private_key = "/fake/private_key"
37    self.Patch(os.path, "exists", side_effect=lambda path: path == public_key)
38    self.Patch(subprocess, "check_call")
39    utils.CreateSshKeyPairIfNotExist(private_key, public_key)
40    self.assertEqual(subprocess.check_call.call_count, 0)
41
42  def testCreateSshKeyPair_KeyAreCreated(self):
43    """Test when the key pair created."""
44    public_key = "/fake/public_key"
45    private_key = "/fake/private_key"
46    self.Patch(os.path, "exists", return_value=False)
47    self.Patch(subprocess, "check_call")
48    self.Patch(os, "rename")
49    utils.CreateSshKeyPairIfNotExist(private_key, public_key)
50    self.assertEqual(subprocess.check_call.call_count, 1)
51    subprocess.check_call.assert_called_with(
52        utils.SSH_KEYGEN_CMD + ["-C", getpass.getuser(), "-f", private_key],
53        stdout=mock.ANY, stderr=mock.ANY)
54
55  def testRetryOnException(self):
56    def _IsValueError(exc):
57      return isinstance(exc, ValueError)
58    num_retry = 5
59
60    @utils.RetryOnException(_IsValueError, num_retry)
61    def _RaiseAndRetry(sentinel):
62      sentinel.alert()
63      raise ValueError("Fake error.")
64
65    sentinel = mock.MagicMock()
66    self.assertRaises(ValueError, _RaiseAndRetry, sentinel)
67    self.assertEqual(1 + num_retry, sentinel.alert.call_count)
68
69  def testRetryExceptionType(self):
70    """Test RetryExceptionType function."""
71    def _RaiseAndRetry(sentinel):
72      sentinel.alert()
73      raise ValueError("Fake error.")
74
75    num_retry = 5
76    sentinel = mock.MagicMock()
77    self.assertRaises(ValueError, utils.RetryExceptionType,
78                      (KeyError, ValueError), num_retry, _RaiseAndRetry,
79                      sentinel=sentinel)
80    self.assertEqual(1 + num_retry, sentinel.alert.call_count)
81
82  def testRetry(self):
83    """Test Retry."""
84    self.Patch(time, "sleep")
85    def _RaiseAndRetry(sentinel):
86      sentinel.alert()
87      raise ValueError("Fake error.")
88
89    num_retry = 5
90    sentinel = mock.MagicMock()
91    self.assertRaises(ValueError, utils.RetryExceptionType,
92                      (ValueError, KeyError), num_retry, _RaiseAndRetry,
93                      sleep_multiplier=1,
94                      retry_backoff_factor=2,
95                      sentinel=sentinel)
96
97    self.assertEqual(1 + num_retry, sentinel.alert.call_count)
98    time.sleep.assert_has_calls(
99        [mock.call(1), mock.call(2), mock.call(4), mock.call(8), mock.call(16)])
100
101
102if __name__ == "__main__":
103    unittest.main()
104