1#!/usr/bin/python
2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Unittests for deploy_server_local.py."""
7
8from __future__ import print_function
9
10import mock
11import subprocess
12import unittest
13
14import deploy_server_local as dsl
15
16
class TestDeployServerLocal(unittest.TestCase):
    """Test deploy_server_local with commands mocked out.

    Every test that would otherwise shell out patches the relevant
    subprocess function, so no external command is actually executed.
    """

    # Original stability timer, captured at class creation so tearDown can
    # restore whatever value setUp overrides.
    orig_timer = dsl.SERVICE_STABILITY_TIMER

    # Canned command output containing terminal (ANSI) formatting codes,
    # used as mocked subprocess output and as strip_terminal_codes input.
    PROD_STATUS = ('\x1b[1mproject autotest/                             '
                   '  \x1b[m\x1b[1mbranch prod\x1b[m\n')

    PROD_VERSIONS = '''\x1b[1mproject autotest/\x1b[m
/usr/local/autotest
ebb2182

\x1b[1mproject autotest/site_utils/autotest_private/\x1b[m
/usr/local/autotest/site_utils/autotest_private
78b9626

\x1b[1mproject autotest/site_utils/autotest_tools/\x1b[m
/usr/local/autotest/site_utils/autotest_tools
a1598f7
'''

    def setUp(self):
        """Shrink the service stability timer for the duration of a test."""
        dsl.SERVICE_STABILITY_TIMER = 0.01

    def tearDown(self):
        """Restore the service stability timer overridden by setUp."""
        dsl.SERVICE_STABILITY_TIMER = self.orig_timer

    def test_strip_terminal_codes(self):
        """Test deploy_server_local.strip_terminal_codes."""
        # Leave format free lines alone.
        result = dsl.strip_terminal_codes('')
        self.assertEqual(result, '')

        result = dsl.strip_terminal_codes('This is normal text.')
        self.assertEqual(result, 'This is normal text.')

        result = dsl.strip_terminal_codes('Line1\nLine2\n')
        self.assertEqual(result, 'Line1\nLine2\n')

        # Test cleaning lines with formatting.
        result = dsl.strip_terminal_codes('\x1b[1m')
        self.assertEqual(result, '')

        result = dsl.strip_terminal_codes('\x1b[m')
        self.assertEqual(result, '')

        # Only the escape sequence is removed; the trailing plain 'm' stays.
        result = dsl.strip_terminal_codes('\x1b[1mm')
        self.assertEqual(result, 'm')

        result = dsl.strip_terminal_codes(self.PROD_STATUS)
        self.assertEqual(result,
                'project autotest/                               branch prod\n')

        result = dsl.strip_terminal_codes(self.PROD_VERSIONS)
        self.assertEqual(result, '''project autotest/
/usr/local/autotest
ebb2182

project autotest/site_utils/autotest_private/
/usr/local/autotest/site_utils/autotest_private
78b9626

project autotest/site_utils/autotest_tools/
/usr/local/autotest/site_utils/autotest_tools
a1598f7
''')

    @mock.patch('subprocess.check_output', autospec=True)
    def test_verify_repo_clean(self, run_cmd):
        """Test deploy_server_local.verify_repo_clean.

        @param run_cmd: Mock of subprocess call used.
        """
        # If repo returns what we expect, exit cleanly.
        run_cmd.return_value = 'nothing to commit (working directory clean)\n'
        dsl.verify_repo_clean()

        # If repo contains any branches (even clean ones), raise.
        run_cmd.return_value = self.PROD_STATUS
        with self.assertRaises(dsl.DirtyTreeException):
            dsl.verify_repo_clean()

        # If repo doesn't return what we expect, raise.
        run_cmd.return_value = "That's a very dirty repo you've got."
        with self.assertRaises(dsl.DirtyTreeException):
            dsl.verify_repo_clean()

    @mock.patch('subprocess.check_output', autospec=True)
    def test_repo_versions(self, run_cmd):
        """Test deploy_server_local.repo_versions.

        @param run_cmd: Mock of subprocess call used.
        """
        expected = {
            'autotest':
            ('/usr/local/autotest', 'ebb2182'),
            'autotest/site_utils/autotest_private':
            ('/usr/local/autotest/site_utils/autotest_private', '78b9626'),
            'autotest/site_utils/autotest_tools':
            ('/usr/local/autotest/site_utils/autotest_tools', 'a1598f7'),
        }

        run_cmd.return_value = self.PROD_VERSIONS
        result = dsl.repo_versions()
        self.assertEqual(result, expected)

        run_cmd.assert_called_with(
                ['repo', 'forall', '-p', '-c',
                 'pwd && git log -1 --format=%h'])

    @mock.patch('subprocess.check_output', autospec=True)
    def test_repo_sync_not_for_push_servers(self, run_cmd):
        """Test deploy_server_local.repo_sync with default arguments.

        @param run_cmd: Mock of subprocess call used.
        """
        dsl.repo_sync()
        expect_cmds = [mock.call(['git', 'checkout', 'cros/prod'],
                                 stderr=subprocess.STDOUT),
                       mock.call(['pyclean', '.', '-q'])]
        run_cmd.assert_has_calls(expect_cmds)

    @mock.patch('subprocess.check_output', autospec=True)
    def test_repo_sync_for_push_servers(self, run_cmd):
        """Test deploy_server_local.repo_sync with update_push_servers=True.

        @param run_cmd: Mock of subprocess call used.
        """
        dsl.repo_sync(update_push_servers=True)
        expect_cmds = [mock.call(['git', 'checkout', 'cros/master'],
                                 stderr=subprocess.STDOUT),
                       mock.call(['pyclean', '.', '-q'])]
        run_cmd.assert_has_calls(expect_cmds)

    def test_discover_commands_and_services(self):
        """Test deploy_server_local.discover_update_commands and
        discover_restart_services."""
        # It should always be a list, and should always be callable in
        # any local environment, though the result will vary.
        result = dsl.discover_update_commands()
        self.assertIsInstance(result, list)

        result = dsl.discover_restart_services()
        self.assertIsInstance(result, list)

    @mock.patch('subprocess.check_output', autospec=True)
    def test_update_command(self, run_cmd):
        """Test deploy_server_local.update_command.

        @param run_cmd: Mock of subprocess call used.
        """
        # Call with a bad command name.
        with self.assertRaises(dsl.UnknownCommandException):
            dsl.update_command('Unknown Command')
        self.assertFalse(run_cmd.called)

        # Call with a valid command name.
        dsl.update_command('apache')
        run_cmd.assert_called_with('sudo service apache2 reload', shell=True,
                                   stderr=subprocess.STDOUT)

        # Call with a valid command name that uses AUTOTEST_REPO expansion.
        dsl.update_command('build_externals')
        expanded_cmd = dsl.common.autotest_dir+'/utils/build_externals.py'
        run_cmd.assert_called_with(expanded_cmd, shell=True,
                                   stderr=subprocess.STDOUT)

        # Test a failed command: the subprocess error must reach the caller.
        failure = subprocess.CalledProcessError(10, expanded_cmd, 'output')

        run_cmd.side_effect = failure
        with self.assertRaises(subprocess.CalledProcessError):
            dsl.update_command('build_externals')

    @mock.patch('subprocess.check_call', autospec=True)
    def test_restart_service(self, run_cmd):
        """Test deploy_server_local.restart_service.

        @param run_cmd: Mock of subprocess call used.
        """
        # Standard call.
        dsl.restart_service('foobar')
        run_cmd.assert_called_with(['sudo', 'service', 'foobar', 'restart'],
                                   stderr=subprocess.STDOUT)

    @mock.patch('subprocess.check_output', autospec=True)
    def test_restart_status(self, run_cmd):
        """Test deploy_server_local.service_status.

        @param run_cmd: Mock of subprocess call used.
        """
        # Standard call.
        dsl.service_status('foobar')
        run_cmd.assert_called_with(['sudo', 'status', 'foobar'])

    @mock.patch.object(dsl, 'restart_service', autospec=True)
    def _test_restart_services(self, service_results, _restart):
        """Helper for testing restart_services.

        Note that the status lists in service_results are consumed (popped)
        as service_status is called.

        @param service_results: {'service_name': ['status_1', 'status_2']}
        @param _restart: Mock of dsl.restart_service (injected by the
                         decorator, otherwise unused).
        """
        # each call to service_status should return the next status value for
        # that service.
        with mock.patch.object(dsl, 'service_status', autospec=True,
                               side_effect=lambda n: service_results[n].pop(0)):
            # Pass a real list so the callee sees the same type on both
            # Python 2 and Python 3 (where keys() is a view).
            dsl.restart_services(list(service_results))

    def test_restart_services(self):
        """Test deploy_server_local.restart_services."""
        single_stable = {'foo': ['status_ok', 'status_ok']}
        double_stable = {'foo': ['status_a', 'status_a'],
                         'bar': ['status_b', 'status_b']}

        # Verify we can handle stable services.
        self._test_restart_services(single_stable)
        self._test_restart_services(double_stable)

        single_unstable = {'foo': ['status_ok', 'status_not_ok']}
        triple_unstable = {'foo': ['status_a', 'status_a'],
                           'bar': ['status_b', 'status_b_not_ok'],
                           'joe': ['status_c', 'status_c_not_ok']}

        # Verify we can handle unstable services and report the right failures.
        with self.assertRaises(dsl.UnstableServices) as unstable:
            self._test_restart_services(single_unstable)
        self.assertEqual(unstable.exception.args[0], ['foo'])

        with self.assertRaises(dsl.UnstableServices) as unstable:
            self._test_restart_services(triple_unstable)
        # The services are handed over in dict iteration order, so compare
        # the reported names order-independently.
        self.assertEqual(sorted(unstable.exception.args[0]), ['bar', 'joe'])

    @mock.patch('subprocess.check_output', autospec=True)
    def test_report_changes_no_update(self, run_cmd):
        """Test deploy_server_local.report_changes without an 'after' state.

        @param run_cmd: Mock of subprocess call used.
        """

        before = {
            'autotest': ('/usr/local/autotest', 'auto_before'),
            'autotest_private': ('/dir/autotest_private', '78b9626'),
            'other': ('/fake/unchanged', 'constant_hash'),
        }

        run_cmd.return_value = 'hash1 Fix change.\nhash2 Bad change.\n'

        result = dsl.report_changes(before, None)

        self.assertEqual(result, """autotest: auto_before
autotest_private: 78b9626
other: constant_hash
""")

        # With nothing to diff against, no command should have been run.
        self.assertFalse(run_cmd.called)

    @mock.patch('subprocess.check_output', autospec=True)
    def test_report_changes_diff(self, run_cmd):
        """Test deploy_server_local.report_changes with before/after states.

        @param run_cmd: Mock of subprocess call used.
        """

        before = {
            'autotest': ('/usr/local/autotest', 'auto_before'),
            'autotest_private': ('/dir/autotest_private', '78b9626'),
            'other': ('/fake/unchanged', 'constant_hash'),
        }

        after = {
            'autotest': ('/usr/local/autotest', 'auto_after'),
            'autotest_tools': ('/dir/autotest_tools', 'a1598f7'),
            'other': ('/fake/unchanged', 'constant_hash'),
        }

        run_cmd.return_value = 'hash1 Fix change.\nhash2 Bad change.\n'

        result = dsl.report_changes(before, after)

        self.assertEqual(result, """autotest:
hash1 Fix change.
hash2 Bad change.

autotest_private:
Removed.

autotest_tools:
Added.

other:
No Change.
""")

        run_cmd.assert_called_with(
                ['git', 'log', 'auto_before..auto_after', '--oneline'],
                cwd='/usr/local/autotest', stderr=subprocess.STDOUT)

    def _assert_parsed_flags(self, argv, expected):
        """Parse argv and check that the result contains the expected flags.

        Only the keys named in expected are compared; any additional
        attributes on the parsed result are ignored.

        @param argv: List of command line argument strings to parse.
        @param expected: {'flag_name': expected_value} that must all be
                         present on the parsed result.
        """
        parsed = vars(dsl.parse_arguments(argv))
        # A key missing from the parsed result is left out of the subset,
        # so it shows up as a mismatch in the comparison below.
        relevant = {key: parsed[key] for key in expected if key in parsed}
        self.assertEqual(relevant, expected)

    def test_parse_arguments(self):
        """Test deploy_server_local.parse_arguments."""
        # No arguments.
        self._assert_parsed_flags(
                [],
                {'verify': True, 'update': True, 'actions': True,
                 'report': True, 'dryrun': False, 'update_push_servers': False})

        # Update test_push servers.
        self._assert_parsed_flags(
                ['--update_push_servers'],
                {'verify': True, 'update': True, 'actions': True,
                 'report': True, 'dryrun': False, 'update_push_servers': True})

        # Dryrun.
        self._assert_parsed_flags(
                ['--dryrun'],
                {'verify': False, 'update': False, 'actions': True,
                 'report': True, 'dryrun': True, 'update_push_servers': False})

        # Restart only.
        self._assert_parsed_flags(
                ['--actions-only'],
                {'verify': False, 'update': False, 'actions': True,
                 'report': False, 'dryrun': False,
                 'update_push_servers': False})

        # All skip arguments.
        self._assert_parsed_flags(
                ['--skip-verify', '--skip-update',
                 '--skip-actions', '--skip-report'],
                {'verify': False, 'update': False, 'actions': False,
                 'report': False, 'dryrun': False,
                 'update_push_servers': False})

        # All arguments.
        self._assert_parsed_flags(
                ['--skip-verify', '--skip-update',
                 '--skip-actions', '--skip-report',
                 '--actions-only', '--dryrun',
                 '--update_push_servers'],
                {'verify': False, 'update': False, 'actions': False,
                 'report': False, 'dryrun': True, 'update_push_servers': True})
364
365
# Run the tests in this module when the file is executed as a script
# rather than imported.
if __name__ == '__main__':
    unittest.main()
368