1#!/usr/bin/python
2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Unittests for deploy_production_local.py."""
7
8from __future__ import print_function
9
10import mock
11import subprocess
12import unittest
13
14import deploy_production_local as dpl
15
16
17class TestDeployProductionLocal(unittest.TestCase):
18    """Test deploy_production_local with commands mocked out."""
19
20    orig_timer = dpl.SERVICE_STABILITY_TIMER
21
22    PROD_STATUS = ('\x1b[1mproject autotest/                             '
23                   '  \x1b[m\x1b[1mbranch prod\x1b[m\n')
24
25    PROD_VERSIONS = '''\x1b[1mproject autotest/\x1b[m
26/usr/local/autotest
27ebb2182
28
29\x1b[1mproject autotest/site_utils/autotest_private/\x1b[m
30/usr/local/autotest/site_utils/autotest_private
3178b9626
32
33\x1b[1mproject autotest/site_utils/autotest_tools/\x1b[m
34/usr/local/autotest/site_utils/autotest_tools
35a1598f7
36'''
37
38
39    def setUp(self):
40        dpl.SERVICE_STABILITY_TIMER = 0.01
41
42    def tearDown(self):
43        dpl.SERVICE_STABILITY_TIMER = self.orig_timer
44
45    def test_strip_terminal_codes(self):
46        """Test deploy_production_local.strip_terminal_codes."""
47        # Leave format free lines alone.
48        result = dpl.strip_terminal_codes('')
49        self.assertEqual(result, '')
50
51        result = dpl.strip_terminal_codes('This is normal text.')
52        self.assertEqual(result, 'This is normal text.')
53
54        result = dpl.strip_terminal_codes('Line1\nLine2\n')
55        self.assertEqual(result, 'Line1\nLine2\n')
56
57        result = dpl.strip_terminal_codes('Line1\nLine2\n')
58        self.assertEqual(result, 'Line1\nLine2\n')
59
60        # Test cleaning lines with formatting.
61        result = dpl.strip_terminal_codes('\x1b[1m')
62        self.assertEqual(result, '')
63
64        result = dpl.strip_terminal_codes('\x1b[m')
65        self.assertEqual(result, '')
66
67        result = dpl.strip_terminal_codes('\x1b[1mm')
68        self.assertEqual(result, 'm')
69
70        result = dpl.strip_terminal_codes(self.PROD_STATUS)
71        self.assertEqual(result,
72                'project autotest/                               branch prod\n')
73
74        result = dpl.strip_terminal_codes(self.PROD_VERSIONS)
75        self.assertEqual(result, '''project autotest/
76/usr/local/autotest
77ebb2182
78
79project autotest/site_utils/autotest_private/
80/usr/local/autotest/site_utils/autotest_private
8178b9626
82
83project autotest/site_utils/autotest_tools/
84/usr/local/autotest/site_utils/autotest_tools
85a1598f7
86''')
87
88    @mock.patch('subprocess.check_output', autospec=True)
89    def test_verify_repo_clean(self, run_cmd):
90        """Test deploy_production_local.verify_repo_clean.
91
92        @param run_cmd: Mock of subprocess call used.
93        """
94        # If repo returns what we expect, exit cleanly.
95        run_cmd.return_value = 'nothing to commit (working directory clean)\n'
96        dpl.verify_repo_clean()
97
98        # If repo contains any branches (even clean ones), raise.
99        run_cmd.return_value = self.PROD_STATUS
100        with self.assertRaises(dpl.DirtyTreeException):
101            dpl.verify_repo_clean()
102
103        # If repo doesn't return what we expect, raise.
104        run_cmd.return_value = "That's a very dirty repo you've got."
105        with self.assertRaises(dpl.DirtyTreeException):
106            dpl.verify_repo_clean()
107
108    @mock.patch('subprocess.check_output', autospec=True)
109    def test_repo_versions(self, run_cmd):
110        """Test deploy_production_local.repo_versions.
111
112        @param run_cmd: Mock of subprocess call used.
113        """
114        expected = {
115            'autotest':
116            ('/usr/local/autotest', 'ebb2182'),
117            'autotest/site_utils/autotest_private':
118            ('/usr/local/autotest/site_utils/autotest_private', '78b9626'),
119            'autotest/site_utils/autotest_tools':
120            ('/usr/local/autotest/site_utils/autotest_tools', 'a1598f7'),
121        }
122
123        run_cmd.return_value = self.PROD_VERSIONS
124        result = dpl.repo_versions()
125        self.assertEquals(result, expected)
126
127        run_cmd.assert_called_with(
128                ['repo', 'forall', '-p', '-c',
129                 'pwd && git log -1 --format=%h'])
130
131    @mock.patch('subprocess.check_output', autospec=True)
132    def test_repo_sync(self, run_cmd):
133        """Test deploy_production_local.repo_sync.
134
135        @param run_cmd: Mock of subprocess call used.
136        """
137        dpl.repo_sync()
138        run_cmd.assert_called_with(['repo', 'sync'])
139
140    def test_discover_commands_and_services(self):
141        """Test deploy_production_local.discover_update_commands and
142        discover_restart_services."""
143        # It should always be a list, and should always be callable in
144        # any local environment, though the result will vary.
145        result = dpl.discover_update_commands()
146        self.assertIsInstance(result, list)
147
148        result = dpl.discover_restart_services()
149        self.assertIsInstance(result, list)
150
151    @mock.patch('subprocess.check_output', autospec=True)
152    def test_update_command(self, run_cmd):
153        """Test deploy_production_local.update_command.
154
155        @param run_cmd: Mock of subprocess call used.
156        """
157        # Call with a bad command name.
158        with self.assertRaises(dpl.UnknownCommandException):
159            dpl.update_command('Unknown Command')
160        self.assertFalse(run_cmd.called)
161
162        # Call with a valid command name.
163        dpl.update_command('apache')
164        run_cmd.assert_called_with('sudo service apache2 reload', shell=True,
165                                   stderr=subprocess.STDOUT)
166
167        # Call with a valid command name that uses AUTOTEST_REPO expansion.
168        dpl.update_command('build_externals')
169        expanded_cmd = dpl.common.autotest_dir+'/utils/build_externals.py'
170        run_cmd.assert_called_with(expanded_cmd, shell=True,
171                                   stderr=subprocess.STDOUT)
172
173        # Test a failed command.
174        failure = subprocess.CalledProcessError(10, expanded_cmd, 'output')
175
176        run_cmd.side_effect = failure
177        with self.assertRaises(subprocess.CalledProcessError) as unstable:
178            dpl.update_command('build_externals')
179
180    @mock.patch('subprocess.check_call', autospec=True)
181    def test_restart_service(self, run_cmd):
182        """Test deploy_production_local.restart_service.
183
184        @param run_cmd: Mock of subprocess call used.
185        """
186        # Standard call.
187        dpl.restart_service('foobar')
188        run_cmd.assert_called_with(['sudo', 'service', 'foobar', 'restart'])
189
190    @mock.patch('subprocess.check_output', autospec=True)
191    def test_restart_status(self, run_cmd):
192        """Test deploy_production_local.service_status.
193
194        @param run_cmd: Mock of subprocess call used.
195        """
196        # Standard call.
197        dpl.service_status('foobar')
198        run_cmd.assert_called_with(['sudo', 'status', 'foobar'])
199
200    @mock.patch.object(dpl, 'restart_service', autospec=True)
201    def _test_restart_services(self, service_results, _restart):
202        """Helper for testing restart_services.
203
204        @param service_results: {'service_name': ['status_1', 'status_2']}
205        """
206        # each call to service_status should return the next status value for
207        # that service.
208        with mock.patch.object(dpl, 'service_status', autospec=True,
209                               side_effect=lambda n: service_results[n].pop(0)):
210            dpl.restart_services(service_results.keys())
211
212    def test_restart_services(self):
213        """Test deploy_production_local.restart_services."""
214        single_stable = {'foo': ['status_ok', 'status_ok']}
215        double_stable = {'foo': ['status_a', 'status_a'],
216                         'bar': ['status_b', 'status_b']}
217
218        # Verify we can handle stable services.
219        self._test_restart_services(single_stable)
220        self._test_restart_services(double_stable)
221
222        single_unstable = {'foo': ['status_ok', 'status_not_ok']}
223        triple_unstable = {'foo': ['status_a', 'status_a'],
224                           'bar': ['status_b', 'status_b_not_ok'],
225                           'joe': ['status_c', 'status_c_not_ok']}
226
227        # Verify we can handle unstable services and report the right failures.
228        with self.assertRaises(dpl.UnstableServices) as unstable:
229            self._test_restart_services(single_unstable)
230        self.assertEqual(unstable.exception.args[0], ['foo'])
231
232        with self.assertRaises(dpl.UnstableServices) as unstable:
233            self._test_restart_services(triple_unstable)
234        self.assertEqual(unstable.exception.args[0], ['bar', 'joe'])
235
236    @mock.patch('subprocess.check_output', autospec=True)
237    def test_report_changes_no_update(self, run_cmd):
238        """Test deploy_production_local.report_changes.
239
240        @param run_cmd: Mock of subprocess call used.
241        """
242
243        before = {
244            'autotest': ('/usr/local/autotest', 'auto_before'),
245            'autotest_private': ('/dir/autotest_private', '78b9626'),
246            'other': ('/fake/unchanged', 'constant_hash'),
247        }
248
249        run_cmd.return_value = 'hash1 Fix change.\nhash2 Bad change.\n'
250
251        result = dpl.report_changes(before, None)
252
253        self.assertEqual(result, """autotest: auto_before
254autotest_private: 78b9626
255other: constant_hash
256""")
257
258        self.assertFalse(run_cmd.called)
259
260    @mock.patch('subprocess.check_output', autospec=True)
261    def test_report_changes_diff(self, run_cmd):
262        """Test deploy_production_local.report_changes.
263
264        @param run_cmd: Mock of subprocess call used.
265        """
266
267        before = {
268            'autotest': ('/usr/local/autotest', 'auto_before'),
269            'autotest_private': ('/dir/autotest_private', '78b9626'),
270            'other': ('/fake/unchanged', 'constant_hash'),
271        }
272
273        after = {
274            'autotest': ('/usr/local/autotest', 'auto_after'),
275            'autotest_tools': ('/dir/autotest_tools', 'a1598f7'),
276            'other': ('/fake/unchanged', 'constant_hash'),
277        }
278
279        run_cmd.return_value = 'hash1 Fix change.\nhash2 Bad change.\n'
280
281        result = dpl.report_changes(before, after)
282
283        self.assertEqual(result, """autotest:
284hash1 Fix change.
285hash2 Bad change.
286
287autotest_private:
288Removed.
289
290autotest_tools:
291Added.
292
293other:
294No Change.
295""")
296
297        run_cmd.assert_called_with(
298                ['git', 'log', 'auto_before..auto_after', '--oneline'],
299                cwd='/usr/local/autotest', stderr=subprocess.STDOUT)
300
301    def test_parse_arguments(self):
302        """Test deploy_production_local.parse_arguments."""
303        # No arguments.
304        results = dpl.parse_arguments([])
305        self.assertDictContainsSubset(
306                {'verify': True, 'update': True, 'actions': True,
307                 'report': True, 'dryrun': False},
308                vars(results))
309
310        # Dryrun.
311        results = dpl.parse_arguments(['--dryrun'])
312        self.assertDictContainsSubset(
313                {'verify': False, 'update': False, 'actions': True,
314                 'report': True, 'dryrun': True},
315                vars(results))
316
317        # Restart only.
318        results = dpl.parse_arguments(['--actions-only'])
319        self.assertDictContainsSubset(
320                {'verify': False, 'update': False, 'actions': True,
321                 'report': False, 'dryrun': False},
322                vars(results))
323
324        # All skip arguments.
325        results = dpl.parse_arguments(['--skip-verify', '--skip-update',
326                                       '--skip-actions', '--skip-report'])
327        self.assertDictContainsSubset(
328                {'verify': False, 'update': False, 'actions': False,
329                 'report': False, 'dryrun': False},
330                vars(results))
331
332        # All arguments.
333        results = dpl.parse_arguments(['--skip-verify', '--skip-update',
334                                       '--skip-actions', '--skip-report',
335                                       '--actions-only', '--dryrun'])
336        self.assertDictContainsSubset(
337                {'verify': False, 'update': False, 'actions': False,
338                 'report': False, 'dryrun': True},
339                vars(results))
340
341
342if __name__ == '__main__':
343    unittest.main()
344