1#!/usr/bin/env python3 2# Copyright 2019 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16"""Unittests for the utils module.""" 17 18import datetime 19import os 20import sys 21import unittest 22 23_path = os.path.realpath(__file__ + '/../..') 24if sys.path[0] != _path: 25 sys.path.insert(0, _path) 26del _path 27 28# We have to import our local modules after the sys.path tweak. We can't use 29# relative imports because this is an executable program, not a module. 30# pylint: disable=wrong-import-position 31import rh 32import rh.utils 33 34 35class TimeDeltaStrTests(unittest.TestCase): 36 """Verify behavior of timedelta_str object.""" 37 38 def test_same(self): 39 """Check timedelta of 0 seconds.""" 40 delta = datetime.timedelta(0) 41 self.assertEqual('0.000s', rh.utils.timedelta_str(delta)) 42 43 def test_millisecondss(self): 44 """Check timedelta of milliseconds.""" 45 delta = datetime.timedelta(seconds=0.123456) 46 self.assertEqual('0.123s', rh.utils.timedelta_str(delta)) 47 48 def test_seconds(self): 49 """Check timedelta of seconds.""" 50 delta = datetime.timedelta(seconds=12.3) 51 self.assertEqual('12.300s', rh.utils.timedelta_str(delta)) 52 53 def test_minutes(self): 54 """Check timedelta of minutes.""" 55 delta = datetime.timedelta(seconds=72.3) 56 self.assertEqual('1m12.300s', rh.utils.timedelta_str(delta)) 57 58 def test_hours(self): 59 """Check timedelta of hours.""" 60 delta = datetime.timedelta(seconds=4000.3) 61 self.assertEqual('1h6m40.300s', rh.utils.timedelta_str(delta)) 62 63 64class CompletedProcessTests(unittest.TestCase): 65 """Verify behavior of CompletedProcess object.""" 66 67 def test_empty_cmdstr(self): 68 """Check cmdstr with an empty command.""" 69 result = rh.utils.CompletedProcess(args=[]) 70 self.assertEqual('', result.cmdstr) 71 72 def test_basic_cmdstr(self): 73 """Check cmdstr with a basic command command.""" 74 result = rh.utils.CompletedProcess(args=['ls', 'a b']) 75 self.assertEqual("ls 'a b'", result.cmdstr) 76 77 def test_str(self): 78 """Check str() handling.""" 79 # We don't enforce much, just that it doesn't crash. 80 result = rh.utils.CompletedProcess() 81 self.assertNotEqual('', str(result)) 82 result = rh.utils.CompletedProcess(args=[]) 83 self.assertNotEqual('', str(result)) 84 85 def test_repr(self): 86 """Check repr() handling.""" 87 # We don't enforce much, just that it doesn't crash. 88 result = rh.utils.CompletedProcess() 89 self.assertNotEqual('', repr(result)) 90 result = rh.utils.CompletedProcess(args=[]) 91 self.assertNotEqual('', repr(result)) 92 93 94class CalledProcessErrorTests(unittest.TestCase): 95 """Verify behavior of CalledProcessError object.""" 96 97 def test_basic(self): 98 """Basic test we can create a normal instance.""" 99 rh.utils.CalledProcessError(0, ['mycmd']) 100 rh.utils.CalledProcessError(1, ['mycmd'], exception=Exception('bad')) 101 102 def test_stringify(self): 103 """Check stringify() handling.""" 104 # We don't assert much so we leave flexibility in changing format. 105 err = rh.utils.CalledProcessError(0, ['mycmd']) 106 self.assertIn('mycmd', err.stringify()) 107 err = rh.utils.CalledProcessError( 108 0, ['mycmd'], exception=Exception('bad')) 109 self.assertIn('mycmd', err.stringify()) 110 111 def test_str(self): 112 """Check str() handling.""" 113 # We don't assert much so we leave flexibility in changing format. 114 err = rh.utils.CalledProcessError(0, ['mycmd']) 115 self.assertIn('mycmd', str(err)) 116 err = rh.utils.CalledProcessError( 117 0, ['mycmd'], exception=Exception('bad')) 118 self.assertIn('mycmd', str(err)) 119 120 def test_repr(self): 121 """Check repr() handling.""" 122 # We don't assert much so we leave flexibility in changing format. 123 err = rh.utils.CalledProcessError(0, ['mycmd']) 124 self.assertNotEqual('', repr(err)) 125 err = rh.utils.CalledProcessError( 126 0, ['mycmd'], exception=Exception('bad')) 127 self.assertNotEqual('', repr(err)) 128 129 130class RunCommandTests(unittest.TestCase): 131 """Verify behavior of run helper.""" 132 133 def test_basic(self): 134 """Simple basic test.""" 135 ret = rh.utils.run(['true']) 136 self.assertEqual('true', ret.cmdstr) 137 self.assertIsNone(ret.stdout) 138 self.assertIsNone(ret.stderr) 139 140 def test_stdout_capture(self): 141 """Verify output capturing works.""" 142 ret = rh.utils.run(['echo', 'hi'], redirect_stdout=True) 143 self.assertEqual('hi\n', ret.stdout) 144 self.assertIsNone(ret.stderr) 145 146 def test_stderr_capture(self): 147 """Verify stderr capturing works.""" 148 ret = rh.utils.run(['sh', '-c', 'echo hi >&2'], redirect_stderr=True) 149 self.assertIsNone(ret.stdout) 150 self.assertEqual('hi\n', ret.stderr) 151 152 def test_stdout_utf8(self): 153 """Verify reading UTF-8 data works.""" 154 ret = rh.utils.run(['printf', r'\xc3\x9f'], redirect_stdout=True) 155 self.assertEqual(u'ß', ret.stdout) 156 self.assertIsNone(ret.stderr) 157 158 def test_stdin_utf8(self): 159 """Verify writing UTF-8 data works.""" 160 ret = rh.utils.run(['cat'], redirect_stdout=True, input=u'ß') 161 self.assertEqual(u'ß', ret.stdout) 162 self.assertIsNone(ret.stderr) 163 164 def test_check_false(self): 165 """Verify handling of check=False.""" 166 ret = rh.utils.run(['false'], check=False) 167 self.assertNotEqual(0, ret.returncode) 168 self.assertIn('false', str(ret)) 169 170 ret = rh.utils.run(['true'], check=False) 171 self.assertEqual(0, ret.returncode) 172 self.assertIn('true', str(ret)) 173 174 def test_check_true(self): 175 """Verify handling of check=True.""" 176 with self.assertRaises(rh.utils.CalledProcessError) as e: 177 rh.utils.run(['false'], check=True) 178 err = e.exception 179 self.assertNotEqual(0, err.returncode) 180 self.assertIn('false', str(err)) 181 182 ret = rh.utils.run(['true'], check=True) 183 self.assertEqual(0, ret.returncode) 184 self.assertIn('true', str(ret)) 185 186 def test_check_false_output(self): 187 """Verify handling of output capturing w/check=False.""" 188 with self.assertRaises(rh.utils.CalledProcessError) as e: 189 rh.utils.run(['sh', '-c', 'echo out; echo err >&2; false'], 190 check=True, capture_output=True) 191 err = e.exception 192 self.assertNotEqual(0, err.returncode) 193 self.assertIn('false', str(err)) 194 195 def test_check_true_missing_prog_output(self): 196 """Verify handling of output capturing w/missing progs.""" 197 with self.assertRaises(rh.utils.CalledProcessError) as e: 198 rh.utils.run(['./!~a/b/c/d/'], check=True, capture_output=True) 199 err = e.exception 200 self.assertNotEqual(0, err.returncode) 201 self.assertIn('a/b/c/d', str(err)) 202 203 def test_check_false_missing_prog_output(self): 204 """Verify handling of output capturing w/missing progs.""" 205 ret = rh.utils.run(['./!~a/b/c/d/'], check=False, capture_output=True) 206 self.assertNotEqual(0, ret.returncode) 207 self.assertIn('a/b/c/d', str(ret)) 208 209 def test_check_false_missing_prog_combined_output(self): 210 """Verify handling of combined output capturing w/missing progs.""" 211 with self.assertRaises(rh.utils.CalledProcessError) as e: 212 rh.utils.run(['./!~a/b/c/d/'], check=True, 213 combine_stdout_stderr=True) 214 err = e.exception 215 self.assertNotEqual(0, err.returncode) 216 self.assertIn('a/b/c/d', str(err)) 217 218 219if __name__ == '__main__': 220 unittest.main() 221