1# 2# Copyright 2008 Google Inc. All Rights Reserved. 3 4"""Test for cli.""" 5 6import unittest, os, sys, StringIO 7 8import common 9from autotest_lib.cli import atest, topic_common, rpc 10from autotest_lib.frontend.afe import rpc_client_lib 11from autotest_lib.frontend.afe.json_rpc import proxy 12from autotest_lib.client.common_lib.test_utils import mock 13from autotest_lib.client.common_lib import autotemp 14 15CLI_USING_PDB = False 16CLI_UT_DEBUG = False 17 18def create_file(content): 19 file_temp = autotemp.tempfile(unique_id='cli_mock', text=True) 20 os.write(file_temp.fd, content) 21 return file_temp 22 23 24class ExitException(Exception): 25 pass 26 27 28class cli_unittest(unittest.TestCase): 29 def setUp(self): 30 super(cli_unittest, self).setUp() 31 self.god = mock.mock_god(debug=CLI_UT_DEBUG, ut=self) 32 self.god.stub_class_method(rpc.afe_comm, 'run') 33 self.god.stub_function(sys, 'exit') 34 35 def stub_authorization_headers(*args, **kwargs): 36 return {} 37 self.god.stub_with(rpc_client_lib, 'authorization_headers', 38 stub_authorization_headers) 39 40 41 def tearDown(self): 42 super(cli_unittest, self).tearDown() 43 self.god.unstub_all() 44 45 46 def assertEqualNoOrder(self, x, y, message=None): 47 self.assertEqual(set(x), set(y), message) 48 49 50 def assertWords(self, string, to_find=[], not_in=[]): 51 for word in to_find: 52 self.assert_(string.find(word) >= 0, 53 "Could not find '%s' in: %s" % (word, string)) 54 for word in not_in: 55 self.assert_(string.find(word) < 0, 56 "Found (and shouldn't have) '%s' in: %s" % (word, 57 string)) 58 59 60 def _check_output(self, out='', out_words_ok=[], out_words_no=[], 61 err='', err_words_ok=[], err_words_no=[]): 62 if out_words_ok or out_words_no: 63 self.assertWords(out, out_words_ok, out_words_no) 64 else: 65 self.assertEqual('', out) 66 67 if err_words_ok or err_words_no: 68 self.assertWords(err, err_words_ok, err_words_no) 69 else: 70 self.assertEqual('', err) 71 72 73 def assertOutput(self, obj, results, 74 out_words_ok=[], out_words_no=[], 75 err_words_ok=[], err_words_no=[]): 76 self.god.mock_io() 77 obj.output(results) 78 obj.show_all_failures() 79 (out, err) = self.god.unmock_io() 80 self._check_output(out, out_words_ok, out_words_no, 81 err, err_words_ok, err_words_no) 82 83 84 def mock_rpcs(self, rpcs): 85 """rpcs is a list of tuples, each representing one RPC: 86 (op, **dargs, success, expected)""" 87 for (op, dargs, success, expected) in rpcs: 88 comm = rpc.afe_comm.run 89 if success: 90 comm.expect_call(op, **dargs).and_return(expected) 91 else: 92 comm.expect_call(op, **dargs).and_raises(proxy.JSONRPCException(expected)) 93 94 95 96 def run_cmd(self, argv, rpcs=[], exit_code=None, 97 out_words_ok=[], out_words_no=[], 98 err_words_ok=[], err_words_no=[]): 99 """Runs the command in argv. 100 rpcs is a list of tuples, each representing one RPC: 101 (op, **dargs, success, expected) 102 exit_code should be set if you expect the command 103 to fail 104 The words are lists of words that are expected""" 105 sys.argv = argv 106 107 self.mock_rpcs(rpcs) 108 109 if not (CLI_USING_PDB and CLI_UT_DEBUG): 110 self.god.mock_io() 111 if exit_code is not None: 112 sys.exit.expect_call(exit_code).and_raises(ExitException) 113 self.assertRaises(ExitException, atest.main) 114 else: 115 atest.main() 116 (out, err) = self.god.unmock_io() 117 self.god.check_playback() 118 self._check_output(out, out_words_ok, out_words_no, 119 err, err_words_ok, err_words_no) 120 return (out, err) 121