1#
2# Copyright 2008 Google Inc. All Rights Reserved.
3
4"""Test for cli."""
5
6import unittest, os, sys, StringIO
7
8import common
9from autotest_lib.cli import atest, topic_common, rpc
10from autotest_lib.frontend.afe import rpc_client_lib
11from autotest_lib.frontend.afe.json_rpc import proxy
12from autotest_lib.client.common_lib.test_utils import mock
13from autotest_lib.client.common_lib import autotemp
14
# Debugging switches for the CLI unit tests.
# When CLI_USING_PDB and CLI_UT_DEBUG are both true, cli_unittest.run_cmd()
# does not call mock_io() before running the command.
CLI_USING_PDB = False
# Passed as the `debug` argument of mock.mock_god() in cli_unittest.setUp().
CLI_UT_DEBUG = False
17
def create_file(content):
    """Create a temporary file holding content and return its handle.

    The file is obtained from autotemp.tempfile() (text mode, unique_id
    'cli_mock') and content is written straight to its file descriptor.
    The autotemp object itself is returned; this function does not close or
    remove it (presumably the caller is expected to -- confirm against
    autotemp).
    """
    handle = autotemp.tempfile(text=True, unique_id='cli_mock')
    os.write(handle.fd, content)
    return handle
22
23
class ExitException(Exception):
    """Raised by the stubbed sys.exit when run_cmd() expects an exit code."""
26
27
class cli_unittest(unittest.TestCase):
    """Shared TestCase helpers for testing the atest CLI.

    setUp() creates a mock god and stubs out rpc.afe_comm.run, sys.exit and
    rpc_client_lib.authorization_headers.  The helper methods record the
    RPCs a command is expected to issue, run the command through
    atest.main() and check the text captured from stdout/stderr.
    """

    def setUp(self):
        """Create the mock god and install the stubs used by the helpers."""
        super(cli_unittest, self).setUp()
        self.god = mock.mock_god(debug=CLI_UT_DEBUG, ut=self)
        self.god.stub_class_method(rpc.afe_comm, 'run')
        self.god.stub_function(sys, 'exit')

        # Replace the real header computation with one that always yields
        # an empty dict.
        def stub_authorization_headers(*args, **kwargs):
            return {}
        self.god.stub_with(rpc_client_lib, 'authorization_headers',
                           stub_authorization_headers)


    def tearDown(self):
        """Remove every stub installed through the mock god."""
        super(cli_unittest, self).tearDown()
        self.god.unstub_all()


    def assertEqualNoOrder(self, x, y, message=None):
        """Assert that x and y are equal once both are converted to sets.

        Ordering and duplicate elements are therefore ignored.
        """
        self.assertEqual(set(x), set(y), message)


    def assertWords(self, string, to_find=(), not_in=()):
        """Assert which substrings occur in string.

        Fails unless every entry of to_find is a substring of string and no
        entry of not_in is.
        """
        for word in to_find:
            self.assertTrue(word in string,
                            "Could not find '%s' in: %s" % (word, string))
        for word in not_in:
            self.assertTrue(word not in string,
                            "Found (and shouldn't have) '%s' in: %s" %
                            (word, string))


    def _check_output(self, out='', out_words_ok=(), out_words_no=(),
                      err='', err_words_ok=(), err_words_no=()):
        """Validate the captured stdout (out) and stderr (err) text.

        For each stream: when either of its word lists is non-empty the
        text is checked with assertWords(); otherwise the text must be the
        empty string.
        """
        if out_words_ok or out_words_no:
            self.assertWords(out, out_words_ok, out_words_no)
        else:
            self.assertEqual('', out)

        if err_words_ok or err_words_no:
            self.assertWords(err, err_words_ok, err_words_no)
        else:
            self.assertEqual('', err)


    def assertOutput(self, obj, results,
                     out_words_ok=(), out_words_no=(),
                     err_words_ok=(), err_words_no=()):
        """Call obj.output(results) and obj.show_all_failures() with the
        I/O mocked, then check the captured text with _check_output()."""
        self.god.mock_io()
        try:
            obj.output(results)
            obj.show_all_failures()
        finally:
            # Always pair mock_io() with unmock_io(), even when the object
            # under test raises, so the mocked I/O is not left installed.
            (out, err) = self.god.unmock_io()
        self._check_output(out, out_words_ok, out_words_no,
                           err, err_words_ok, err_words_no)


    def mock_rpcs(self, rpcs):
        """Record the calls the stubbed rpc.afe_comm.run should expect.

        rpcs is a list of tuples, each representing one RPC:
            (op, dargs, success, expected)
        The stub is told to expect run(op, **dargs).  When success is true
        that call returns expected; otherwise it raises
        proxy.JSONRPCException(expected).
        """
        comm = rpc.afe_comm.run
        for (op, dargs, success, expected) in rpcs:
            call = comm.expect_call(op, **dargs)
            if success:
                call.and_return(expected)
            else:
                call.and_raises(proxy.JSONRPCException(expected))


    def run_cmd(self, argv, rpcs=(), exit_code=None,
                out_words_ok=(), out_words_no=(),
                err_words_ok=(), err_words_no=()):
        """Runs the command in argv.

        rpcs is a list of tuples, each representing one RPC:
             (op, dargs, success, expected)
        exit_code should be set if you expect the command to fail; the
        stubbed sys.exit is then expected to be called with that code.
        The *_words_ok / *_words_no arguments are lists of words that must,
        respectively must not, appear in the captured stdout (out_*) and
        stderr (err_*).

        Returns the captured (out, err) strings.
        """
        sys.argv = argv

        self.mock_rpcs(rpcs)

        capture_io = not (CLI_USING_PDB and CLI_UT_DEBUG)
        if capture_io:
            self.god.mock_io()
        try:
            if exit_code is not None:
                sys.exit.expect_call(exit_code).and_raises(ExitException)
                self.assertRaises(ExitException, atest.main)
            else:
                atest.main()
        except BaseException:
            # Undo mock_io() before propagating so a failing command does
            # not leave the mocked I/O installed for the rest of the run.
            if capture_io:
                self.god.unmock_io()
            raise
        # NOTE(review): unmock_io() is called even when mock_io() was
        # skipped above (both debug flags set), exactly as before -- confirm
        # the mock god supports that.
        (out, err) = self.god.unmock_io()
        self.god.check_playback()
        self._check_output(out, out_words_ok, out_words_no,
                           err, err_words_ok, err_words_no)
        return (out, err)
121