1#!/usr/bin/python2
2
3from __future__ import absolute_import
4from __future__ import division
5from __future__ import print_function
6import logging, os, select, six, subprocess, sys, unittest
7import common
8from autotest_lib.client.common_lib import logging_manager, logging_config
9
10
class PipedStringIO(object):
    """
    Like StringIO, but all I/O passes through a pipe.  This means a
    PipedStringIO is backed by a file descriptor and thus can do things like
    pass down to a subprocess.  However, this means the creating process must
    call read_pipe() (or the classmethod read_all_pipes()) periodically to
    read the pipe, and must call close() (or the classmethod
    cleanup_all_instances()) to close the pipe.
    """
    # Every instance that has been created and not yet closed; used by the
    # classmethods below to service or tear down all pipes at once.
    _instances = set()

    def __init__(self):
        """Create the in-memory buffer and the pipe that feeds it."""
        self._string_io = six.StringIO()
        self._read_end, self._write_end = os.pipe()
        PipedStringIO._instances.add(self)


    def close(self):
        """Close the buffer and both pipe ends, and unregister the instance."""
        self._string_io.close()
        os.close(self._read_end)
        os.close(self._write_end)
        PipedStringIO._instances.remove(self)


    def write(self, data):
        """
        Write data into the pipe.

        @param data: text or bytes.  Text is encoded as UTF-8 first, because
                os.write() only accepts bytes-like objects on Python 3 while
                print() and logging handlers pass text.
        """
        # type(u'') is the text type on both Python 2 (unicode) and 3 (str).
        if isinstance(data, type(u'')):
            data = data.encode('utf-8')
        os.write(self._write_end, data)


    def flush(self):
        """No-op; write() goes straight to the pipe, so nothing is buffered."""
        pass


    def fileno(self):
        """Return the file descriptor of the write end of the pipe."""
        return self._write_end


    def getvalue(self):
        """Drain the pipe, then return everything collected so far."""
        self.read_pipe()
        return self._string_io.getvalue()


    def read_pipe(self):
        """
        Move all data currently waiting in the pipe into the string buffer.

        Never blocks: select() is polled with a zero timeout and reading
        stops as soon as the pipe has nothing more to offer.
        """
        chunks = []
        while True:
            read_list, _, _ = select.select([self._read_end], [], [], 0)
            if not read_list:
                break
            chunk = os.read(self._read_end, 1024)
            if not chunk:
                # End of file: select() would keep reporting the descriptor
                # as readable, so bail out instead of spinning forever.
                break
            chunks.append(chunk)
        if not chunks:
            return
        # Join before decoding so that a multi-byte UTF-8 sequence split
        # across two 1024-byte reads is decoded correctly.
        data = b''.join(chunks)
        if not isinstance(data, str):
            # Python 3: bytes is not str, and the buffer only accepts text.
            # On Python 2 bytes *is* str and is stored unchanged.
            data = data.decode('utf-8')
        self._string_io.write(data)


    @classmethod
    def read_all_pipes(cls):
        """Call read_pipe() on every instance that is still open."""
        for instance in cls._instances:
            instance.read_pipe()


    @classmethod
    def cleanup_all_instances(cls):
        """Close every instance that is still open."""
        # Iterate over a copy: close() removes entries from cls._instances.
        for instance in list(cls._instances):
            instance.close()
73
74
# Record format shared by the root logger configured in
# LoggingManagerTest.setUp() and by DummyLoggingConfig.console_formatter.
LOGGING_FORMAT = '%(levelname)s: %(message)s'

# Expected contents of the test's stdout stream after _run_test().  Rounds 1
# and 7 run before start_logging() and after stop_logging() respectively, so
# only the logging call carries the "INFO: " prefix there.  Rounds 3-5 run
# while output is redirected and therefore do not appear here.
_EXPECTED_STDOUT = """\
print 1
system 1
INFO: logging 1
INFO: print 2
INFO: system 2
INFO: logging 2
INFO: print 6
INFO: system 6
INFO: logging 6
print 7
system 7
INFO: logging 7
"""

# Expected contents of the stream passed to redirect_to_stream() in
# _run_test(): everything said between that call and the second
# undo_redirect() (rounds 3-5).
_EXPECTED_LOG1 = """\
INFO: print 3
INFO: system 3
INFO: logging 3
INFO: print 4
INFO: system 4
INFO: logging 4
INFO: print 5
INFO: system 5
INFO: logging 5
"""

# Expected contents of the stream passed to tee_redirect_to_stream() in
# _run_test(): only round 4, which runs while the tee is active.
_EXPECTED_LOG2 = """\
INFO: print 4
INFO: system 4
INFO: logging 4
"""
109
110
class DummyLoggingConfig(logging_config.LoggingConfig):
    """
    LoggingConfig for tests: the debug "file" handler writes to an in-process
    PipedStringIO (exposed as self.log) instead of a file on disk.
    """
    console_formatter = logging.Formatter(LOGGING_FORMAT)

    def __init__(self):
        super(DummyLoggingConfig, self).__init__()
        # Collects whatever is logged through add_debug_file_handlers().
        self.log = PipedStringIO()


    def add_debug_file_handlers(self, log_dir, log_name=None):
        """
        Attach a stream handler that writes to self.log.

        @param log_dir: ignored; nothing is written to disk.
        @param log_name: ignored.
        """
        # self.logger is not set in this class; presumably it is provided by
        # the LoggingConfig base class.
        pipe_handler = logging.StreamHandler(self.log)
        self.logger.addHandler(pipe_handler)
121
122
# This isn't really a unit test, since it creates subprocesses and pipes and
# all that.  The unittest framework is used only because it's convenient.
class LoggingManagerTest(unittest.TestCase):
    """
    Drives a logging manager through start/redirect/tee/undo/stop and checks
    which output ends up in which stream.
    """
    def setUp(self):
        """Create the capture streams and reset logging to a known state."""
        self.stdout = PipedStringIO()
        self._log1 = PipedStringIO()
        self._log2 = PipedStringIO()

        # When True, _say() emits its "system" line through a real shell
        # subprocess writing to the stdout pipe's file descriptor.
        self._real_system_calls = False

        # the LoggingManager will change self.stdout (by design), so keep a
        # copy around
        self._original_stdout = self.stdout

        # clear out import-time logging config and reconfigure
        root_logger = logging.getLogger()
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
        # use INFO to ignore debug output from logging_manager itself
        logging.basicConfig(level=logging.INFO, format=LOGGING_FORMAT,
                            stream=self.stdout)

        self._config_object = DummyLoggingConfig()
        logging_manager.LoggingManager.logging_config_object = (
                self._config_object)


    def tearDown(self):
        """Close every pipe created during the test."""
        PipedStringIO.cleanup_all_instances()


    def _say(self, suffix):
        """
        Emit one round of output tagged with suffix: a print, a "system"
        line, and a logging call, then drain all pipes.

        @param suffix: value appended to each emitted message.
        """
        print('print %s' % suffix, file=self.stdout)
        if self._real_system_calls:
            # Write through the original stdout's file descriptor from a
            # child process.
            os.system('echo system %s >&%s' % (suffix,
                                               self._original_stdout.fileno()))
        else:
            print('system %s' % suffix, file=self.stdout)
        logging.info('logging %s', suffix)
        # Pipes must be read periodically; do it after every round.
        PipedStringIO.read_all_pipes()


    def _setup_manager(self, manager_class=logging_manager.LoggingManager):
        """
        Instantiate manager_class and have it manage self.stdout.

        @param manager_class: logging manager class to instantiate.
        @return: the new manager instance.
        """
        def set_stdout(file_object):
            # Callback handed to the manager so it can replace self.stdout.
            self.stdout = file_object
        manager = manager_class()
        manager.manage_stream(self.stdout, logging.INFO, set_stdout)
        return manager


    def _run_test(self, manager_class):
        """
        Run the seven-round scenario whose expected output is described by
        _EXPECTED_STDOUT, _EXPECTED_LOG1 and _EXPECTED_LOG2.

        @param manager_class: logging manager class to exercise.
        """
        manager = self._setup_manager(manager_class)

        self._say(1)

        manager.start_logging()
        self._say(2)

        manager.redirect_to_stream(self._log1)
        self._say(3)

        manager.tee_redirect_to_stream(self._log2)
        self._say(4)

        manager.undo_redirect()
        self._say(5)

        manager.undo_redirect()
        self._say(6)

        manager.stop_logging()
        self._say(7)


    def _grab_fd_info(self):
        """
        Return the output of the listing command used as a snapshot for the
        before/after comparison in _check_results().
        """
        command = 'ls -l /proc/%s/fd' % os.getpid()
        # NOTE(review): passing a list together with shell=True makes POSIX
        # Popen hand only the first item ("ls") to the shell as the command;
        # the remaining items become the shell's own positional arguments.
        # This therefore appears to list the current directory rather than
        # /proc/<pid>/fd.  Left unchanged on purpose: a real fd listing would
        # include the transient pipe created by this very Popen call, so the
        # comparison needs a redesign rather than just dropping shell=True.
        proc = subprocess.Popen(command.split(), shell=True,
                                stdout=subprocess.PIPE)
        return proc.communicate()[0]


    def _compare_logs(self, log_buffer, expected_text):
        """
        Assert that log_buffer holds exactly the lines of expected_text.

        @param log_buffer: object with a getvalue() method.
        @param expected_text: expected contents as a single string.
        """
        actual_lines = log_buffer.getvalue().splitlines()
        expected_lines = expected_text.splitlines()
        if self._real_system_calls:
            # because of the many interacting processes, we can't ensure perfect
            # interleaving.  so compare sets of lines rather than ordered lines.
            actual_lines = set(actual_lines)
            expected_lines = set(expected_lines)
        # assertEqual, not the deprecated assertEquals alias (removed in
        # Python 3.12).
        self.assertEqual(actual_lines, expected_lines)


    def _check_results(self):
        """Verify stdout was restored and every stream got the right text."""
        # ensure our stdout was restored
        self.assertEqual(self.stdout, self._original_stdout)

        if self._real_system_calls:
            # ensure FDs were left in their original state
            self.assertEqual(self._grab_fd_info(), self._fd_info)

        self._compare_logs(self.stdout, _EXPECTED_STDOUT)
        self._compare_logs(self._log1, _EXPECTED_LOG1)
        self._compare_logs(self._log2, _EXPECTED_LOG2)


    def test_logging_manager(self):
        """Run the scenario with the plain LoggingManager."""
        self._run_test(logging_manager.LoggingManager)
        self._check_results()


    def test_fd_redirection_logging_manager(self):
        """Run the scenario with FdRedirectionLoggingManager and real
        subprocess output."""
        self._real_system_calls = True
        # Snapshot taken before the manager exists, compared again in
        # _check_results().
        self._fd_info = self._grab_fd_info()
        self._run_test(logging_manager.FdRedirectionLoggingManager)
        self._check_results()


    def test_tee_redirect_debug_dir(self):
        """Check tee_redirect_debug_dir() tags stdout output and also feeds
        the config object's debug log."""
        manager = self._setup_manager()
        manager.start_logging()

        manager.tee_redirect_debug_dir('/fake/dir', tag='mytag')
        print('hello', file=self.stdout)

        manager.undo_redirect()
        print('goodbye', file=self.stdout)

        manager.stop_logging()

        self._compare_logs(self.stdout,
                           'INFO: mytag : hello\nINFO: goodbye')
        self._compare_logs(self._config_object.log, 'hello\n')
255
256
class MonkeyPatchTestCase(unittest.TestCase):
    """
    Tests for logging_manager's replacement caller finder.

    NOTE(review): each test reaches the finder through the same chain of
    wrappers (test_* -> _1_* -> _0_*).  The expected results name the outer
    test_* method (or the unittest runner), so the finder presumably depends
    on this exact call depth -- do not add or remove wrapper levels.
    """
    def setUp(self):
        """Work out the source file name the finder is expected to report."""
        filename = os.path.split(__file__)[1]
        if filename.endswith('.pyc'):
            # Map the compiled file name back to its .py source name.
            filename = filename[:-1]
        self.expected_filename = filename


    def check_filename(self, filename, expected=None):
        """
        Assert that the base name of filename is one of the expected names.

        @param filename: path reported by the caller finder.
        @param expected: list of acceptable base names; defaults to this
                test file's own name.
        """
        if expected is None:
            expected = [self.expected_filename]
        self.assertIn(os.path.split(filename)[1], expected)


    def _0_test_find_caller(self):
        finder = logging_manager._logging_manager_aware_logger__find_caller
        filename, lineno, caller_name = finder(logging_manager.logger)
        self.check_filename(filename)
        # assertEqual, not the deprecated assertEquals alias (removed in
        # Python 3.12).
        self.assertEqual('test_find_caller', caller_name)


    def _1_test_find_caller(self):
        self._0_test_find_caller()


    def test_find_caller(self):
        """The finder reports this file and test_find_caller as the caller."""
        self._1_test_find_caller()


    def _0_test_non_reported_find_caller(self):
        finder = logging_manager._logging_manager_aware_logger__find_caller
        filename, lineno, caller_name = finder(logging_manager.logger)
        # Python 2.4 unittest implementation will call the unittest method in
        # file 'unittest.py', and Python >= 2.6 does the same in 'case.py'
        self.check_filename(filename, expected=['unittest.py', 'case.py'])


    def _1_test_non_reported_find_caller(self):
        self._0_test_non_reported_find_caller()


    @logging_manager.do_not_report_as_logging_caller
    def test_non_reported_find_caller(self):
        """A method marked do_not_report_as_logging_caller is skipped, so the
        reported file is the unittest runner's."""
        self._1_test_non_reported_find_caller()
301
302
303
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
306