#!/usr/bin/env python
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests for the cmd_helper module."""

import subprocess
import sys
import time
import unittest

from devil import devil_env
from devil.utils import cmd_helper

with devil_env.SysPath(devil_env.PYMOCK_PATH):
  import mock  # pylint: disable=import-error


class CmdHelperSingleQuoteTest(unittest.TestCase):
  """Tests for cmd_helper.SingleQuote."""

  def testSingleQuote_basic(self):
    self.assertEqual('hello', cmd_helper.SingleQuote('hello'))

  def testSingleQuote_withSpaces(self):
    self.assertEqual("'hello world'", cmd_helper.SingleQuote('hello world'))

  def testSingleQuote_withUnsafeChars(self):
    self.assertEqual("""'hello'"'"'; rm -rf /'""",
                     cmd_helper.SingleQuote("hello'; rm -rf /"))

  def testSingleQuote_dontExpand(self):
    # The quoted string is echoed through a real shell; the test expects
    # $TEST_VAR to come back unexpanded.
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
    self.assertEqual(test_string,
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())


class CmdHelperGetCmdStatusAndOutputTest(unittest.TestCase):
  """Tests for cmd_helper.GetCmdStatusAndOutput."""

  def testGetCmdStatusAndOutput_success(self):
    cmd = 'echo "Hello World"'
    status, output = cmd_helper.GetCmdStatusAndOutput(cmd, shell=True)
    self.assertEqual(status, 0)
    self.assertEqual(output.rstrip(), "Hello World")

  def testGetCmdStatusAndOutput_unicode(self):
    # There is no assertion here: the test only passes if the call below
    # does not raise on output containing a non-ASCII byte.
    # pylint: disable=no-self-use
    cmd = 'echo "\x80\x31Hello World\n"'
    cmd_helper.GetCmdStatusAndOutput(cmd, shell=True)


class CmdHelperDoubleQuoteTest(unittest.TestCase):
  """Tests for cmd_helper.DoubleQuote."""

  def testDoubleQuote_basic(self):
    self.assertEqual('hello', cmd_helper.DoubleQuote('hello'))

  def testDoubleQuote_withSpaces(self):
    self.assertEqual('"hello world"', cmd_helper.DoubleQuote('hello world'))

  def testDoubleQuote_withUnsafeChars(self):
    self.assertEqual('''"hello\\"; rm -rf /"''',
                     cmd_helper.DoubleQuote('hello"; rm -rf /'))

  # NOTE(review): despite its name this exercises DoubleQuote, not
  # SingleQuote. The name is kept so existing test IDs/filters keep working.
  def testSingleQuote_doExpand(self):
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
    self.assertEqual('hello world',
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())


# NOTE(review): "Shink" is a typo for "Shrink". The class name is kept so
# existing test IDs/filters keep working.
class CmdHelperShinkToSnippetTest(unittest.TestCase):
  """Tests for cmd_helper.ShrinkToSnippet."""

  def testShrinkToSnippet_noArgs(self):
    self.assertEqual('foo', cmd_helper.ShrinkToSnippet(['foo'], 'a', 'bar'))
    self.assertEqual("'foo foo'",
                     cmd_helper.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
    self.assertEqual('"$a"\' bar\'',
                     cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
    self.assertEqual('\'foo \'"$a"',
                     cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
    self.assertEqual('foo"$a"',
                     cmd_helper.ShrinkToSnippet(['foobar'], 'a', 'bar'))

  def testShrinkToSnippet_singleArg(self):
    self.assertEqual("foo ''",
                     cmd_helper.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
    self.assertEqual("foo foo",
                     cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
    self.assertEqual('"$a" "$a"',
                     cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
    self.assertEqual('foo "$a""$a"',
                     cmd_helper.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
    self.assertEqual(
        'foo "$a"\' \'"$a"',
        cmd_helper.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
    self.assertEqual(
        'foo "$a""$a"\' \'',
        cmd_helper.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
    self.assertEqual(
        'foo \' \'"$a""$a"\' \'',
        cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))


# Sentinel meaning "let _MockProcess pick a value for this field".
_DEFAULT = 'DEFAULT'


class _ProcessOutputEvent(object):
  """One step of a scripted process output sequence for _MockProcess.

  Attributes:
    select_fds: The readable-fd list the mocked select.select reports for
        this step. _DEFAULT resolves to [] when read_contents is None and
        to [<fake stdout fileno>] otherwise.
    read_contents: The data the mocked os.read returns (at most once) for
        this step, or None for a step without output.
    ts: The value the mocked time.time returns during this step. _DEFAULT
        resolves to the real time.time() when the _MockProcess is built.
  """

  def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
    self.select_fds = select_fds
    self.read_contents = read_contents
    self.ts = ts


class _MockProcess(object):
  """Context manager yielding a mocked subprocess.Popen with scripted output.

  While the context is active, os.read, select.select and time.time (plus
  fcntl.fcntl on non-Windows platforms) are patched so that they replay the
  given output sequence.
  """

  def __init__(self, output_sequence=None, return_value=0):
    """Sets up the mocked process and the patches (without starting them).

    Args:
      output_sequence: An optional list of _ProcessOutputEvent. Neither the
          list nor the events in it are modified.
      return_value: The return code the mocked process reports from poll()
          once the output sequence has been exhausted.
    """

    # Arbitrary.
    fake_stdout_fileno = 25

    self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
    self.mock_proc.stdout = mock.MagicMock()
    self.mock_proc.stdout.fileno = mock.MagicMock(
        return_value=fake_stdout_fileno)
    self.mock_proc.returncode = None

    self._return_value = return_value

    # This links the behavior of os.read, select.select, time.time, and
    # <process>.poll. The output sequence can be thought of as a list of
    # return values for select.select with corresponding return values for
    # the other calls at any time between that select call and the following
    # one. We iterate through the sequence only on calls to select.select.
    #
    # os.read is a special case, though, where we only return a given chunk
    # of data *once* after a given call to select.

    # Work on private copies of both the list and its events. Callers may
    # share one sequence between several tests (e.g. a class-level constant);
    # inserting into or rewriting fields of the caller's objects would leak
    # state from one _MockProcess into the next.
    caller_events = list(output_sequence) if output_sequence else []

    # Use a leading element to make the iteration logic work: the first call
    # to select.select advances past it before anything is read.
    first_ts = caller_events[0].ts if caller_events else _DEFAULT
    events = [_ProcessOutputEvent(_DEFAULT, '', first_ts)]
    events.extend(
        _ProcessOutputEvent(e.select_fds, e.read_contents, e.ts)
        for e in caller_events)

    # Resolve the _DEFAULT placeholders.
    for event in events:
      if event.select_fds == _DEFAULT:
        if event.read_contents is None:
          event.select_fds = []
        else:
          event.select_fds = [fake_stdout_fileno]
      if event.ts == _DEFAULT:
        # The patches are not active yet, so this is the real clock.
        event.ts = time.time()
    self._output_sequence = events

    self._output_seq_index = 0
    # One flag per event: whether its read_contents was already handed out.
    self._read_flags = [False] * len(events)

    def read_side_effect(*_args, **_kwargs):
      # Hand out the current event's data only once per select call.
      if self._read_flags[self._output_seq_index]:
        return None
      self._read_flags[self._output_seq_index] = True
      return self._output_sequence[self._output_seq_index].read_contents

    def select_side_effect(*_args, **_kwargs):
      # Each select call moves on to the next event in the sequence.
      self._output_seq_index += 1
      if self._output_seq_index < len(self._output_sequence):
        return (self._output_sequence[self._output_seq_index].select_fds,
                None, None)
      # Past the end of the script: nothing is readable any more.
      return ([], None, None)

    def time_side_effect(*_args, **_kwargs):
      return self._output_sequence[self._output_seq_index].ts

    def poll_side_effect(*_args, **_kwargs):
      # The process counts as finished once the last event was reached.
      if self._output_seq_index >= len(self._output_sequence) - 1:
        self.mock_proc.returncode = self._return_value
      return self.mock_proc.returncode

    mock_read = mock.MagicMock(side_effect=read_side_effect)
    mock_select = mock.MagicMock(side_effect=select_side_effect)
    mock_time = mock.MagicMock(side_effect=time_side_effect)
    self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)

    # Set up but *do not start* the mocks.
    self._mocks = [
        mock.patch('os.read', new=mock_read),
        mock.patch('select.select', new=mock_select),
        mock.patch('time.time', new=mock_time),
    ]
    if sys.platform != 'win32':
      self._mocks.append(mock.patch('fcntl.fcntl'))

  def __enter__(self):
    """Starts all patches and returns the mocked process."""
    for m in self._mocks:
      m.__enter__()
    return self.mock_proc

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Stops the patches in the reverse order of their activation."""
    for m in reversed(self._mocks):
      m.__exit__(exc_type, exc_val, exc_tb)


class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
  """Test _IterCmdOutputLines against a mocked process (_MockProcess)."""

  # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
  # can mock the process.
  # pylint: disable=protected-access

  _SIMPLE_OUTPUT_SEQUENCE = [
      _ProcessOutputEvent(read_contents=b'1\n2\n'),
  ]

  def testIterCmdOutputLines_success(self):
    with _MockProcess(
        output_sequence=self._SIMPLE_OUTPUT_SEQUENCE) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_unicode(self):
    output_sequence = [
        _ProcessOutputEvent(read_contents=b'\x80\x31\nHello\n\xE2\x98\xA0')
    ]
    with _MockProcess(output_sequence=output_sequence) as mock_proc:
      lines = list(cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'))
      self.assertEqual(lines[1], "Hello")

  def testIterCmdOutputLines_exitStatusFail(self):
    with self.assertRaises(subprocess.CalledProcessError):
      with _MockProcess(
          output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
          return_value=1) as mock_proc:
        for num, line in enumerate(
            cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
          self.assertEqual(num, int(line))
        # after reading all the output we get an exit status of 1

  def testIterCmdOutputLines_exitStatusIgnored(self):
    with _MockProcess(
        output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
        return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(
              mock_proc, 'mock_proc', check_status=False), 1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_exitStatusSkipped(self):
    with _MockProcess(
        output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
        return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))
        # no exception will be raised because we don't attempt to read past
        # the end of the output and, thus, the status never gets checked
        if num == 2:
          break

  def testIterCmdOutputLines_delay(self):
    # The mocked clock jumps from ts=2 to ts=10 while iter_timeout is 5; the
    # test expects a None item between the first two lines and 'Awake'.
    output_sequence = [
        _ProcessOutputEvent(read_contents=b'1\n2\n', ts=1),
        _ProcessOutputEvent(read_contents=None, ts=2),
        _ProcessOutputEvent(read_contents=b'Awake', ts=10),
    ]
    with _MockProcess(output_sequence=output_sequence) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(
              mock_proc, 'mock_proc', iter_timeout=5), 1):
        if num <= 2:
          self.assertEqual(num, int(line))
        elif num == 3:
          self.assertIsNone(line)
        elif num == 4:
          self.assertEqual('Awake', line)
        else:
          self.fail()


if __name__ == '__main__':
  unittest.main()