1#!/usr/bin/env python3
2# Copyright 2019 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""Unittests for the utils module."""
17
18import datetime
19import os
20import sys
21import unittest
22
# Put the directory two levels above this file's path (i.e. the parent of the
# directory holding this file) at the front of sys.path, unless it is already
# the first entry.
_topdir = os.path.realpath(os.path.join(__file__, '..', '..'))
if _topdir != sys.path[0]:
    sys.path.insert(0, _topdir)
# Don't leave the temporary behind in the module namespace.
del _topdir
27
28# We have to import our local modules after the sys.path tweak.  We can't use
29# relative imports because this is an executable program, not a module.
30# pylint: disable=wrong-import-position
31import rh
32import rh.utils
33
34
class TimeDeltaStrTests(unittest.TestCase):
    """Check how rh.utils.timedelta_str renders timedelta values."""

    def _check(self, expected, seconds):
        """Assert that a delta of |seconds| is rendered as |expected|."""
        delta = datetime.timedelta(seconds=seconds)
        self.assertEqual(expected, rh.utils.timedelta_str(delta))

    def test_same(self):
        """A zero-length delta is rendered as '0.000s'."""
        self._check('0.000s', 0)

    def test_millisecondss(self):
        """A sub-second delta shows only three fractional digits."""
        self._check('0.123s', 0.123456)

    def test_seconds(self):
        """A delta under a minute shows only the seconds field."""
        self._check('12.300s', 12.3)

    def test_minutes(self):
        """A delta over a minute gains a minutes field."""
        self._check('1m12.300s', 72.3)

    def test_hours(self):
        """A delta over an hour gains an hours field."""
        self._check('1h6m40.300s', 4000.3)
62
63
class CompletedProcessTests(unittest.TestCase):
    """Exercise the rh.utils.CompletedProcess class."""

    def test_empty_cmdstr(self):
        """An empty argument list gives an empty cmdstr."""
        proc = rh.utils.CompletedProcess(args=[])
        self.assertEqual('', proc.cmdstr)

    def test_basic_cmdstr(self):
        """An argument containing a space is quoted in cmdstr."""
        proc = rh.utils.CompletedProcess(args=['ls', 'a b'])
        self.assertEqual("ls 'a b'", proc.cmdstr)

    def test_str(self):
        """str() yields non-empty text both without and with empty args."""
        # Smoke test only: the exact format is deliberately not checked.
        for kwargs in ({}, {'args': []}):
            proc = rh.utils.CompletedProcess(**kwargs)
            self.assertNotEqual('', str(proc))

    def test_repr(self):
        """repr() yields non-empty text both without and with empty args."""
        # Smoke test only: the exact format is deliberately not checked.
        for kwargs in ({}, {'args': []}):
            proc = rh.utils.CompletedProcess(**kwargs)
            self.assertNotEqual('', repr(proc))
92
93
class CalledProcessErrorTests(unittest.TestCase):
    """Exercise the rh.utils.CalledProcessError class."""

    @staticmethod
    def _make_errors():
        """Yield an error without, and then with, a wrapped exception.

        This is a generator, so each instance is only constructed once the
        caller has finished checking the previous one.
        """
        yield rh.utils.CalledProcessError(0, ['mycmd'])
        yield rh.utils.CalledProcessError(
            0, ['mycmd'], exception=Exception('bad'))

    def test_basic(self):
        """Instances can be constructed with and without an exception."""
        rh.utils.CalledProcessError(0, ['mycmd'])
        rh.utils.CalledProcessError(1, ['mycmd'], exception=Exception('bad'))

    def test_stringify(self):
        """stringify() mentions the command that was run."""
        # Only look for the command so the format is free to change.
        for error in self._make_errors():
            self.assertIn('mycmd', error.stringify())

    def test_str(self):
        """str() mentions the command that was run."""
        # Only look for the command so the format is free to change.
        for error in self._make_errors():
            self.assertIn('mycmd', str(error))

    def test_repr(self):
        """repr() yields non-empty text."""
        # Only require some output so the format is free to change.
        for error in self._make_errors():
            self.assertNotEqual('', repr(error))
128
129
class RunCommandTests(unittest.TestCase):
    """Verify behavior of the rh.utils.run helper.

    These tests execute real programs (true, false, echo, printf, cat, sh),
    so they expect a POSIX-like environment.
    """

    def test_basic(self):
        """Verify a plain run captures nothing by default."""
        ret = rh.utils.run(['true'])
        self.assertEqual('true', ret.cmdstr)
        self.assertIsNone(ret.stdout)
        self.assertIsNone(ret.stderr)

    def test_stdout_capture(self):
        """Verify stdout capturing works."""
        ret = rh.utils.run(['echo', 'hi'], redirect_stdout=True)
        self.assertEqual('hi\n', ret.stdout)
        self.assertIsNone(ret.stderr)

    def test_stderr_capture(self):
        """Verify stderr capturing works."""
        ret = rh.utils.run(['sh', '-c', 'echo hi >&2'], redirect_stderr=True)
        self.assertIsNone(ret.stdout)
        self.assertEqual('hi\n', ret.stderr)

    def test_stdout_utf8(self):
        """Verify reading UTF-8 data works."""
        # Emit the two UTF-8 bytes of 'ß' (0xc3 0x9f) using octal escapes.
        # POSIX printf only guarantees octal escapes; \x hex escapes are an
        # extension that not every printf implementation supports.
        ret = rh.utils.run(['printf', r'\303\237'], redirect_stdout=True)
        self.assertEqual('ß', ret.stdout)
        self.assertIsNone(ret.stderr)

    def test_stdin_utf8(self):
        """Verify writing UTF-8 data works."""
        ret = rh.utils.run(['cat'], redirect_stdout=True, input='ß')
        self.assertEqual('ß', ret.stdout)
        self.assertIsNone(ret.stderr)

    def test_check_false(self):
        """Verify handling of check=False."""
        ret = rh.utils.run(['false'], check=False)
        self.assertNotEqual(0, ret.returncode)
        self.assertIn('false', str(ret))

        ret = rh.utils.run(['true'], check=False)
        self.assertEqual(0, ret.returncode)
        self.assertIn('true', str(ret))

    def test_check_true(self):
        """Verify handling of check=True."""
        with self.assertRaises(rh.utils.CalledProcessError) as e:
            rh.utils.run(['false'], check=True)
        err = e.exception
        self.assertNotEqual(0, err.returncode)
        self.assertIn('false', str(err))

        ret = rh.utils.run(['true'], check=True)
        self.assertEqual(0, ret.returncode)
        self.assertIn('true', str(ret))

    def test_check_false_output(self):
        """Verify handling of output capturing w/check=True.

        NB: Despite the method name, this runs a failing command with
        check=True and expects CalledProcessError to be raised.
        """
        with self.assertRaises(rh.utils.CalledProcessError) as e:
            rh.utils.run(['sh', '-c', 'echo out; echo err >&2; false'],
                         check=True, capture_output=True)
        err = e.exception
        self.assertNotEqual(0, err.returncode)
        self.assertIn('false', str(err))

    def test_check_true_missing_prog_output(self):
        """Verify missing progs raise w/check=True & output capturing."""
        with self.assertRaises(rh.utils.CalledProcessError) as e:
            rh.utils.run(['./!~a/b/c/d/'], check=True, capture_output=True)
        err = e.exception
        self.assertNotEqual(0, err.returncode)
        self.assertIn('a/b/c/d', str(err))

    def test_check_false_missing_prog_output(self):
        """Verify missing progs return w/check=False & output capturing."""
        ret = rh.utils.run(['./!~a/b/c/d/'], check=False, capture_output=True)
        self.assertNotEqual(0, ret.returncode)
        self.assertIn('a/b/c/d', str(ret))

    def test_check_false_missing_prog_combined_output(self):
        """Verify missing progs raise w/check=True & combined output.

        NB: Despite the method name, this uses check=True and expects
        CalledProcessError to be raised.
        """
        with self.assertRaises(rh.utils.CalledProcessError) as e:
            rh.utils.run(['./!~a/b/c/d/'], check=True,
                         combine_stdout_stderr=True)
        err = e.exception
        self.assertNotEqual(0, err.returncode)
        self.assertIn('a/b/c/d', str(err))
217
218
# Only invoke the unittest runner when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
221