1#!/usr/bin/env python
2# Copyright 2013 The Chromium Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Tests for the cmd_helper module."""
6
7import unittest
8import subprocess
9import sys
10import time
11
12from devil import devil_env
13from devil.utils import cmd_helper
14
15with devil_env.SysPath(devil_env.PYMOCK_PATH):
16  import mock  # pylint: disable=import-error
17
18
class CmdHelperSingleQuoteTest(unittest.TestCase):
  """Tests for cmd_helper.SingleQuote."""

  def testSingleQuote_basic(self):
    # A plain word needs no quoting and is returned unchanged.
    self.assertEqual('hello', cmd_helper.SingleQuote('hello'))

  def testSingleQuote_withSpaces(self):
    # Whitespace forces the whole string to be wrapped in single quotes.
    self.assertEqual("'hello world'", cmd_helper.SingleQuote('hello world'))

  def testSingleQuote_withUnsafeChars(self):
    # An embedded single quote is emitted as '"'"' so it cannot terminate
    # the quoted string.
    self.assertEqual("""'hello'"'"'; rm -rf /'""",
                     cmd_helper.SingleQuote("hello'; rm -rf /"))

  def testSingleQuote_dontExpand(self):
    # Run the quoted string through a real shell: $TEST_VAR must be echoed
    # literally rather than expanded.
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
    self.assertEqual(test_string,
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
35
class CmdHelperGetCmdStatusAndOutputTest(unittest.TestCase):
  """Tests for cmd_helper.GetCmdStatusAndOutput, run through a shell."""

  def testGetCmdStatusAndOutput_success(self):
    # A successful echo reports exit status 0 and the echoed text.
    exit_status, stdout_text = cmd_helper.GetCmdStatusAndOutput(
        'echo "Hello World"', shell=True)
    self.assertEqual(exit_status, 0)
    self.assertEqual(stdout_text.rstrip(), "Hello World")

  def testGetCmdStatusAndOutput_unicode(self):
    # pylint: disable=no-self-use
    # Smoke test only: a command containing a \x80 character must not make
    # the call raise. Nothing is asserted about the returned values.
    command_with_high_char = 'echo "\x80\x31Hello World\n"'
    cmd_helper.GetCmdStatusAndOutput(command_with_high_char, shell=True)
47
class CmdHelperDoubleQuoteTest(unittest.TestCase):
  """Tests for cmd_helper.DoubleQuote."""

  def testDoubleQuote_basic(self):
    # A plain word needs no quoting and is returned unchanged.
    self.assertEqual('hello', cmd_helper.DoubleQuote('hello'))

  def testDoubleQuote_withSpaces(self):
    # Whitespace forces the whole string to be wrapped in double quotes.
    self.assertEqual('"hello world"', cmd_helper.DoubleQuote('hello world'))

  def testDoubleQuote_withUnsafeChars(self):
    # An embedded double quote is backslash-escaped so it cannot terminate
    # the quoted string.
    self.assertEqual('''"hello\\"; rm -rf /"''',
                     cmd_helper.DoubleQuote('hello"; rm -rf /'))

  def testDoubleQuote_doExpand(self):
    # Run the quoted string through a real shell: unlike single quoting,
    # $TEST_VAR is expected to be expanded inside double quotes.
    test_string = 'hello $TEST_VAR'
    cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
    self.assertEqual('hello world',
                     cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
64
65
class CmdHelperShinkToSnippetTest(unittest.TestCase):
  """Tests for cmd_helper.ShrinkToSnippet.

  Every case calls ShrinkToSnippet(cmd_parts, 'a', value) and expects each
  occurrence of `value` inside the parts to come back as "$a", with the
  remaining text quoted only where it needs to be.
  """
  # NOTE: the class name keeps its historical spelling ("Shink") so that
  # existing test identifiers remain valid.

  def testShrinkToSnippet_noArgs(self):
    # Commands consisting of a single part.
    self.assertEqual('foo', cmd_helper.ShrinkToSnippet(['foo'], 'a', 'bar'))
    self.assertEqual("'foo foo'",
                     cmd_helper.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
    self.assertEqual('"$a"\' bar\'',
                     cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
    self.assertEqual('\'foo \'"$a"',
                     cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
    self.assertEqual('foo"$a"',
                     cmd_helper.ShrinkToSnippet(['foobar'], 'a', 'bar'))

  def testShrinkToSnippet_singleArg(self):
    # Commands consisting of two parts; the parts are joined with a space.
    self.assertEqual("foo ''",
                     cmd_helper.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
    self.assertEqual("foo foo",
                     cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
    self.assertEqual('"$a" "$a"',
                     cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
    self.assertEqual('foo "$a""$a"',
                     cmd_helper.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
    self.assertEqual(
        'foo "$a"\' \'"$a"',
        cmd_helper.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
    self.assertEqual(
        'foo "$a""$a"\' \'',
        cmd_helper.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
    self.assertEqual(
        'foo \' \'"$a""$a"\' \'',
        cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))
96
97
# Sentinel default for _ProcessOutputEvent fields. _MockProcess replaces it
# with a concrete value (a select() fd list or a timestamp) when it is built.
_DEFAULT = 'DEFAULT'
99
100
class _ProcessOutputEvent(object):
  """One scripted step of fake process output, consumed by _MockProcess.

  Attributes:
    select_fds: What the mocked select.select reports as readable for this
        step, or _DEFAULT to let _MockProcess derive it from read_contents.
    read_contents: What the mocked os.read returns for this step, or None
        when the step produces no data.
    ts: What the mocked time.time returns during this step, or _DEFAULT to
        let _MockProcess fill in a real timestamp.
  """

  def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
    self.select_fds, self.read_contents, self.ts = (
        select_fds, read_contents, ts)
106
107
class _MockProcess(object):
  """Context manager that fakes a subprocess.Popen object and its I/O.

  While the context is active, os.read, select.select and time.time (plus
  fcntl.fcntl on non-Windows platforms) are patched so that their results
  follow a scripted sequence of _ProcessOutputEvent objects. Entering the
  context returns the mock process.

  Args:
    output_sequence: Optional list of _ProcessOutputEvent describing the
        successive select/read/time results. The list and the events in it
        are copied, never modified, so one sequence can safely be shared by
        several _MockProcess instances.
    return_value: The return code reported by poll() once the last event of
        the sequence has been reached.
  """

  def __init__(self, output_sequence=None, return_value=0):

    # Arbitrary.
    fake_stdout_fileno = 25

    self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
    self.mock_proc.stdout = mock.MagicMock()
    self.mock_proc.stdout.fileno = mock.MagicMock(
        return_value=fake_stdout_fileno)
    self.mock_proc.returncode = None

    self._return_value = return_value

    # This links the behavior of os.read, select.select, time.time, and
    # <process>.poll. The output sequence can be thought of as a list of
    # return values for select.select with corresponding return values for
    # the other calls at any time between that select call and the following
    # one. We iterate through the sequence only on calls to select.select.
    #
    # os.read is a special case, though, where we only return a given chunk
    # of data *once* after a given call to select.

    # Work on private copies of both the list and its events. The caller's
    # sequence may be shared (e.g. a class-level constant); inserting the
    # leading element into it, or resolving its _DEFAULT fields in place,
    # would leak state from one _MockProcess into the next.
    events = [
        _ProcessOutputEvent(o.select_fds, o.read_contents, o.ts)
        for o in (output_sequence or [])]

    # Use a leading element to make the iteration logic work.
    initial_seq_element = _ProcessOutputEvent(
        _DEFAULT, '', events[0].ts if events else _DEFAULT)
    events.insert(0, initial_seq_element)

    # Resolve every _DEFAULT placeholder to a concrete value. This runs
    # before the patches are started, so time.time() is the real clock here.
    for o in events:
      if o.select_fds == _DEFAULT:
        if o.read_contents is None:
          o.select_fds = []
        else:
          o.select_fds = [fake_stdout_fileno]
      if o.ts == _DEFAULT:
        o.ts = time.time()
    self._output_sequence = events

    self._output_seq_index = 0
    self._read_flags = [False] * len(events)

    def read_side_effect(*_args, **_kwargs):
      # Hand out the current event's data only once per select() step.
      if self._read_flags[self._output_seq_index]:
        return None
      self._read_flags[self._output_seq_index] = True
      return self._output_sequence[self._output_seq_index].read_contents

    def select_side_effect(*_args, **_kwargs):
      # Each select() call advances to the next event. Once the sequence is
      # exhausted, nothing is reported as readable.
      self._output_seq_index += 1
      if self._output_seq_index < len(self._output_sequence):
        return (self._output_sequence[self._output_seq_index].select_fds, None,
                None)
      else:
        return ([], None, None)

    def time_side_effect(*_args, **_kwargs):
      return self._output_sequence[self._output_seq_index].ts

    def poll_side_effect(*_args, **_kwargs):
      # The process counts as finished once the last event has been reached.
      if self._output_seq_index >= len(self._output_sequence) - 1:
        self.mock_proc.returncode = self._return_value
      return self.mock_proc.returncode

    mock_read = mock.MagicMock(side_effect=read_side_effect)
    mock_select = mock.MagicMock(side_effect=select_side_effect)
    mock_time = mock.MagicMock(side_effect=time_side_effect)
    self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)

    # Set up but *do not start* the mocks.
    self._mocks = [
        mock.patch('os.read', new=mock_read),
        mock.patch('select.select', new=mock_select),
        mock.patch('time.time', new=mock_time),
    ]
    if sys.platform != 'win32':
      self._mocks.append(mock.patch('fcntl.fcntl'))

  def __enter__(self):
    """Starts all patches and returns the mock process."""
    for m in self._mocks:
      m.__enter__()
    return self.mock_proc

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Stops the patches in the reverse of the order they were started."""
    for m in reversed(self._mocks):
      m.__exit__(exc_type, exc_val, exc_tb)
199
200
class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
  """Test _IterCmdOutputLines against a mocked process.

  The mocked process emits the lines '1' and '2', i.e. what the unix command
  `seq 2` would print; no real command is executed.
  """

  # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
  # can mock the process.
  # pylint: disable=protected-access

  @staticmethod
  def _SimpleOutputSequence():
    """Returns a new event list whose only event emits b'1\\n2\\n'.

    A fresh list is built on every call, instead of sharing one class-level
    list, because the sequence is handed to a helper that may modify it;
    sharing would let one test change the input of the next.
    """
    return [
        _ProcessOutputEvent(read_contents=b'1\n2\n'),
    ]

  def testIterCmdOutputLines_success(self):
    with _MockProcess(
        output_sequence=self._SimpleOutputSequence()) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_unicode(self):
    output_sequence = [
        _ProcessOutputEvent(read_contents=b'\x80\x31\nHello\n\xE2\x98\xA0')
    ]
    with _MockProcess(output_sequence=output_sequence) as mock_proc:
      lines = list(cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'))
      self.assertEqual(lines[1], "Hello")

  def testIterCmdOutputLines_exitStatusFail(self):
    with self.assertRaises(subprocess.CalledProcessError):
      with _MockProcess(
          output_sequence=self._SimpleOutputSequence(),
          return_value=1) as mock_proc:
        for num, line in enumerate(
            cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
          self.assertEqual(num, int(line))
        # after reading all the output we get an exit status of 1

  def testIterCmdOutputLines_exitStatusIgnored(self):
    with _MockProcess(
        output_sequence=self._SimpleOutputSequence(),
        return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(
              mock_proc, 'mock_proc', check_status=False), 1):
        self.assertEqual(num, int(line))

  def testIterCmdOutputLines_exitStatusSkipped(self):
    with _MockProcess(
        output_sequence=self._SimpleOutputSequence(),
        return_value=1) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
        self.assertEqual(num, int(line))
        # no exception will be raised because we don't attempt to read past
        # the end of the output and, thus, the status never gets checked
        if num == 2:
          break

  def testIterCmdOutputLines_delay(self):
    # The gap between ts=2 and ts=10 exceeds iter_timeout=5, so a None is
    # expected between the first two lines and the final 'Awake'.
    output_sequence = [
        _ProcessOutputEvent(read_contents=b'1\n2\n', ts=1),
        _ProcessOutputEvent(read_contents=None, ts=2),
        _ProcessOutputEvent(read_contents=b'Awake', ts=10),
    ]
    with _MockProcess(output_sequence=output_sequence) as mock_proc:
      for num, line in enumerate(
          cmd_helper._IterCmdOutputLines(
              mock_proc, 'mock_proc', iter_timeout=5), 1):
        if num <= 2:
          self.assertEqual(num, int(line))
        elif num == 3:
          self.assertIsNone(line)
        elif num == 4:
          self.assertEqual('Awake', line)
        else:
          self.fail()
276
277
278if __name__ == '__main__':
279  unittest.main()
280