1# Copyright 2001-2017 by Vinay Sajip. All Rights Reserved. 2# 3# Permission to use, copy, modify, and distribute this software and its 4# documentation for any purpose and without fee is hereby granted, 5# provided that the above copyright notice appear in all copies and that 6# both that copyright notice and this permission notice appear in 7# supporting documentation, and that the name of Vinay Sajip 8# not be used in advertising or publicity pertaining to distribution 9# of the software without specific, written prior permission. 10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING 11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR 13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER 14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 17"""Test harness for the logging module. Run all tests. 18 19Copyright (C) 2001-2017 Vinay Sajip. All Rights Reserved. 20""" 21 22import logging 23import logging.handlers 24import logging.config 25 26import codecs 27import configparser 28import datetime 29import pathlib 30import pickle 31import io 32import gc 33import json 34import os 35import queue 36import random 37import re 38import signal 39import socket 40import struct 41import sys 42import tempfile 43from test.support.script_helper import assert_python_ok 44from test import support 45import textwrap 46import threading 47import time 48import unittest 49import warnings 50import weakref 51 52import asyncore 53from http.server import HTTPServer, BaseHTTPRequestHandler 54import smtpd 55from urllib.parse import urlparse, parse_qs 56from socketserver import (ThreadingUDPServer, DatagramRequestHandler, 57 ThreadingTCPServer, StreamRequestHandler) 58 59try: 60 import win32evtlog, win32evtlogutil, pywintypes 61except ImportError: 62 win32evtlog = win32evtlogutil = pywintypes = None 63 64try: 65 import zlib 66except ImportError: 67 pass 68 69class BaseTest(unittest.TestCase): 70 71 """Base class for logging tests.""" 72 73 log_format = "%(name)s -> %(levelname)s: %(message)s" 74 expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$" 75 message_num = 0 76 77 def setUp(self): 78 """Setup the default logging stream to an internal StringIO instance, 79 so that we can examine log output as we want.""" 80 self._threading_key = support.threading_setup() 81 82 logger_dict = logging.getLogger().manager.loggerDict 83 logging._acquireLock() 84 try: 85 self.saved_handlers = logging._handlers.copy() 86 self.saved_handler_list = logging._handlerList[:] 87 self.saved_loggers = saved_loggers = logger_dict.copy() 88 self.saved_name_to_level = logging._nameToLevel.copy() 89 self.saved_level_to_name = logging._levelToName.copy() 90 self.logger_states = logger_states = {} 91 for name in saved_loggers: 92 logger_states[name] = getattr(saved_loggers[name], 93 'disabled', None) 94 finally: 95 logging._releaseLock() 96 97 # Set two unused loggers 98 self.logger1 = logging.getLogger("\xab\xd7\xbb") 99 self.logger2 = logging.getLogger("\u013f\u00d6\u0047") 100 101 self.root_logger = logging.getLogger("") 102 self.original_logging_level = self.root_logger.getEffectiveLevel() 103 104 self.stream = io.StringIO() 105 self.root_logger.setLevel(logging.DEBUG) 106 self.root_hdlr = logging.StreamHandler(self.stream) 107 self.root_formatter = logging.Formatter(self.log_format) 108 self.root_hdlr.setFormatter(self.root_formatter) 109 if self.logger1.hasHandlers(): 110 hlist = self.logger1.handlers + self.root_logger.handlers 111 raise AssertionError('Unexpected handlers: %s' % hlist) 112 if self.logger2.hasHandlers(): 113 hlist = self.logger2.handlers + self.root_logger.handlers 114 raise AssertionError('Unexpected handlers: %s' % hlist) 115 self.root_logger.addHandler(self.root_hdlr) 116 self.assertTrue(self.logger1.hasHandlers()) 117 self.assertTrue(self.logger2.hasHandlers()) 118 119 def tearDown(self): 120 """Remove our logging stream, and restore the original logging 121 level.""" 122 self.stream.close() 123 self.root_logger.removeHandler(self.root_hdlr) 124 while self.root_logger.handlers: 125 h = self.root_logger.handlers[0] 126 self.root_logger.removeHandler(h) 127 h.close() 128 self.root_logger.setLevel(self.original_logging_level) 129 logging._acquireLock() 130 try: 131 logging._levelToName.clear() 132 logging._levelToName.update(self.saved_level_to_name) 133 logging._nameToLevel.clear() 134 logging._nameToLevel.update(self.saved_name_to_level) 135 logging._handlers.clear() 136 logging._handlers.update(self.saved_handlers) 137 logging._handlerList[:] = self.saved_handler_list 138 manager = logging.getLogger().manager 139 manager.disable = 0 140 loggerDict = manager.loggerDict 141 loggerDict.clear() 142 loggerDict.update(self.saved_loggers) 143 logger_states = self.logger_states 144 for name in self.logger_states: 145 if logger_states[name] is not None: 146 self.saved_loggers[name].disabled = logger_states[name] 147 finally: 148 logging._releaseLock() 149 150 self.doCleanups() 151 support.threading_cleanup(*self._threading_key) 152 153 def assert_log_lines(self, expected_values, stream=None, pat=None): 154 """Match the collected log lines against the regular expression 155 self.expected_log_pat, and compare the extracted group values to 156 the expected_values list of tuples.""" 157 stream = stream or self.stream 158 pat = re.compile(pat or self.expected_log_pat) 159 actual_lines = stream.getvalue().splitlines() 160 self.assertEqual(len(actual_lines), len(expected_values)) 161 for actual, expected in zip(actual_lines, expected_values): 162 match = pat.search(actual) 163 if not match: 164 self.fail("Log line does not match expected pattern:\n" + 165 actual) 166 self.assertEqual(tuple(match.groups()), expected) 167 s = stream.read() 168 if s: 169 self.fail("Remaining output at end of log stream:\n" + s) 170 171 def next_message(self): 172 """Generate a message consisting solely of an auto-incrementing 173 integer.""" 174 self.message_num += 1 175 return "%d" % self.message_num 176 177 178class BuiltinLevelsTest(BaseTest): 179 """Test builtin levels and their inheritance.""" 180 181 def test_flat(self): 182 # Logging levels in a flat logger namespace. 183 m = self.next_message 184 185 ERR = logging.getLogger("ERR") 186 ERR.setLevel(logging.ERROR) 187 INF = logging.LoggerAdapter(logging.getLogger("INF"), {}) 188 INF.setLevel(logging.INFO) 189 DEB = logging.getLogger("DEB") 190 DEB.setLevel(logging.DEBUG) 191 192 # These should log. 193 ERR.log(logging.CRITICAL, m()) 194 ERR.error(m()) 195 196 INF.log(logging.CRITICAL, m()) 197 INF.error(m()) 198 INF.warning(m()) 199 INF.info(m()) 200 201 DEB.log(logging.CRITICAL, m()) 202 DEB.error(m()) 203 DEB.warning(m()) 204 DEB.info(m()) 205 DEB.debug(m()) 206 207 # These should not log. 208 ERR.warning(m()) 209 ERR.info(m()) 210 ERR.debug(m()) 211 212 INF.debug(m()) 213 214 self.assert_log_lines([ 215 ('ERR', 'CRITICAL', '1'), 216 ('ERR', 'ERROR', '2'), 217 ('INF', 'CRITICAL', '3'), 218 ('INF', 'ERROR', '4'), 219 ('INF', 'WARNING', '5'), 220 ('INF', 'INFO', '6'), 221 ('DEB', 'CRITICAL', '7'), 222 ('DEB', 'ERROR', '8'), 223 ('DEB', 'WARNING', '9'), 224 ('DEB', 'INFO', '10'), 225 ('DEB', 'DEBUG', '11'), 226 ]) 227 228 def test_nested_explicit(self): 229 # Logging levels in a nested namespace, all explicitly set. 230 m = self.next_message 231 232 INF = logging.getLogger("INF") 233 INF.setLevel(logging.INFO) 234 INF_ERR = logging.getLogger("INF.ERR") 235 INF_ERR.setLevel(logging.ERROR) 236 237 # These should log. 238 INF_ERR.log(logging.CRITICAL, m()) 239 INF_ERR.error(m()) 240 241 # These should not log. 242 INF_ERR.warning(m()) 243 INF_ERR.info(m()) 244 INF_ERR.debug(m()) 245 246 self.assert_log_lines([ 247 ('INF.ERR', 'CRITICAL', '1'), 248 ('INF.ERR', 'ERROR', '2'), 249 ]) 250 251 def test_nested_inherited(self): 252 # Logging levels in a nested namespace, inherited from parent loggers. 253 m = self.next_message 254 255 INF = logging.getLogger("INF") 256 INF.setLevel(logging.INFO) 257 INF_ERR = logging.getLogger("INF.ERR") 258 INF_ERR.setLevel(logging.ERROR) 259 INF_UNDEF = logging.getLogger("INF.UNDEF") 260 INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") 261 UNDEF = logging.getLogger("UNDEF") 262 263 # These should log. 264 INF_UNDEF.log(logging.CRITICAL, m()) 265 INF_UNDEF.error(m()) 266 INF_UNDEF.warning(m()) 267 INF_UNDEF.info(m()) 268 INF_ERR_UNDEF.log(logging.CRITICAL, m()) 269 INF_ERR_UNDEF.error(m()) 270 271 # These should not log. 272 INF_UNDEF.debug(m()) 273 INF_ERR_UNDEF.warning(m()) 274 INF_ERR_UNDEF.info(m()) 275 INF_ERR_UNDEF.debug(m()) 276 277 self.assert_log_lines([ 278 ('INF.UNDEF', 'CRITICAL', '1'), 279 ('INF.UNDEF', 'ERROR', '2'), 280 ('INF.UNDEF', 'WARNING', '3'), 281 ('INF.UNDEF', 'INFO', '4'), 282 ('INF.ERR.UNDEF', 'CRITICAL', '5'), 283 ('INF.ERR.UNDEF', 'ERROR', '6'), 284 ]) 285 286 def test_nested_with_virtual_parent(self): 287 # Logging levels when some parent does not exist yet. 288 m = self.next_message 289 290 INF = logging.getLogger("INF") 291 GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") 292 CHILD = logging.getLogger("INF.BADPARENT") 293 INF.setLevel(logging.INFO) 294 295 # These should log. 296 GRANDCHILD.log(logging.FATAL, m()) 297 GRANDCHILD.info(m()) 298 CHILD.log(logging.FATAL, m()) 299 CHILD.info(m()) 300 301 # These should not log. 302 GRANDCHILD.debug(m()) 303 CHILD.debug(m()) 304 305 self.assert_log_lines([ 306 ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), 307 ('INF.BADPARENT.UNDEF', 'INFO', '2'), 308 ('INF.BADPARENT', 'CRITICAL', '3'), 309 ('INF.BADPARENT', 'INFO', '4'), 310 ]) 311 312 def test_regression_22386(self): 313 """See issue #22386 for more information.""" 314 self.assertEqual(logging.getLevelName('INFO'), logging.INFO) 315 self.assertEqual(logging.getLevelName(logging.INFO), 'INFO') 316 317 def test_regression_29220(self): 318 """See issue #29220 for more information.""" 319 logging.addLevelName(logging.INFO, '') 320 self.addCleanup(logging.addLevelName, logging.INFO, 'INFO') 321 self.assertEqual(logging.getLevelName(logging.INFO), '') 322 323 def test_issue27935(self): 324 fatal = logging.getLevelName('FATAL') 325 self.assertEqual(fatal, logging.FATAL) 326 327 def test_regression_29220(self): 328 """See issue #29220 for more information.""" 329 logging.addLevelName(logging.INFO, '') 330 self.addCleanup(logging.addLevelName, logging.INFO, 'INFO') 331 self.assertEqual(logging.getLevelName(logging.INFO), '') 332 self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET') 333 self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET) 334 335class BasicFilterTest(BaseTest): 336 337 """Test the bundled Filter class.""" 338 339 def test_filter(self): 340 # Only messages satisfying the specified criteria pass through the 341 # filter. 342 filter_ = logging.Filter("spam.eggs") 343 handler = self.root_logger.handlers[0] 344 try: 345 handler.addFilter(filter_) 346 spam = logging.getLogger("spam") 347 spam_eggs = logging.getLogger("spam.eggs") 348 spam_eggs_fish = logging.getLogger("spam.eggs.fish") 349 spam_bakedbeans = logging.getLogger("spam.bakedbeans") 350 351 spam.info(self.next_message()) 352 spam_eggs.info(self.next_message()) # Good. 353 spam_eggs_fish.info(self.next_message()) # Good. 354 spam_bakedbeans.info(self.next_message()) 355 356 self.assert_log_lines([ 357 ('spam.eggs', 'INFO', '2'), 358 ('spam.eggs.fish', 'INFO', '3'), 359 ]) 360 finally: 361 handler.removeFilter(filter_) 362 363 def test_callable_filter(self): 364 # Only messages satisfying the specified criteria pass through the 365 # filter. 366 367 def filterfunc(record): 368 parts = record.name.split('.') 369 prefix = '.'.join(parts[:2]) 370 return prefix == 'spam.eggs' 371 372 handler = self.root_logger.handlers[0] 373 try: 374 handler.addFilter(filterfunc) 375 spam = logging.getLogger("spam") 376 spam_eggs = logging.getLogger("spam.eggs") 377 spam_eggs_fish = logging.getLogger("spam.eggs.fish") 378 spam_bakedbeans = logging.getLogger("spam.bakedbeans") 379 380 spam.info(self.next_message()) 381 spam_eggs.info(self.next_message()) # Good. 382 spam_eggs_fish.info(self.next_message()) # Good. 383 spam_bakedbeans.info(self.next_message()) 384 385 self.assert_log_lines([ 386 ('spam.eggs', 'INFO', '2'), 387 ('spam.eggs.fish', 'INFO', '3'), 388 ]) 389 finally: 390 handler.removeFilter(filterfunc) 391 392 def test_empty_filter(self): 393 f = logging.Filter() 394 r = logging.makeLogRecord({'name': 'spam.eggs'}) 395 self.assertTrue(f.filter(r)) 396 397# 398# First, we define our levels. There can be as many as you want - the only 399# limitations are that they should be integers, the lowest should be > 0 and 400# larger values mean less information being logged. If you need specific 401# level values which do not fit into these limitations, you can use a 402# mapping dictionary to convert between your application levels and the 403# logging system. 404# 405SILENT = 120 406TACITURN = 119 407TERSE = 118 408EFFUSIVE = 117 409SOCIABLE = 116 410VERBOSE = 115 411TALKATIVE = 114 412GARRULOUS = 113 413CHATTERBOX = 112 414BORING = 111 415 416LEVEL_RANGE = range(BORING, SILENT + 1) 417 418# 419# Next, we define names for our levels. You don't need to do this - in which 420# case the system will use "Level n" to denote the text for the level. 421# 422my_logging_levels = { 423 SILENT : 'Silent', 424 TACITURN : 'Taciturn', 425 TERSE : 'Terse', 426 EFFUSIVE : 'Effusive', 427 SOCIABLE : 'Sociable', 428 VERBOSE : 'Verbose', 429 TALKATIVE : 'Talkative', 430 GARRULOUS : 'Garrulous', 431 CHATTERBOX : 'Chatterbox', 432 BORING : 'Boring', 433} 434 435class GarrulousFilter(logging.Filter): 436 437 """A filter which blocks garrulous messages.""" 438 439 def filter(self, record): 440 return record.levelno != GARRULOUS 441 442class VerySpecificFilter(logging.Filter): 443 444 """A filter which blocks sociable and taciturn messages.""" 445 446 def filter(self, record): 447 return record.levelno not in [SOCIABLE, TACITURN] 448 449 450class CustomLevelsAndFiltersTest(BaseTest): 451 452 """Test various filtering possibilities with custom logging levels.""" 453 454 # Skip the logger name group. 455 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 456 457 def setUp(self): 458 BaseTest.setUp(self) 459 for k, v in my_logging_levels.items(): 460 logging.addLevelName(k, v) 461 462 def log_at_all_levels(self, logger): 463 for lvl in LEVEL_RANGE: 464 logger.log(lvl, self.next_message()) 465 466 def test_logger_filter(self): 467 # Filter at logger level. 468 self.root_logger.setLevel(VERBOSE) 469 # Levels >= 'Verbose' are good. 470 self.log_at_all_levels(self.root_logger) 471 self.assert_log_lines([ 472 ('Verbose', '5'), 473 ('Sociable', '6'), 474 ('Effusive', '7'), 475 ('Terse', '8'), 476 ('Taciturn', '9'), 477 ('Silent', '10'), 478 ]) 479 480 def test_handler_filter(self): 481 # Filter at handler level. 482 self.root_logger.handlers[0].setLevel(SOCIABLE) 483 try: 484 # Levels >= 'Sociable' are good. 485 self.log_at_all_levels(self.root_logger) 486 self.assert_log_lines([ 487 ('Sociable', '6'), 488 ('Effusive', '7'), 489 ('Terse', '8'), 490 ('Taciturn', '9'), 491 ('Silent', '10'), 492 ]) 493 finally: 494 self.root_logger.handlers[0].setLevel(logging.NOTSET) 495 496 def test_specific_filters(self): 497 # Set a specific filter object on the handler, and then add another 498 # filter object on the logger itself. 499 handler = self.root_logger.handlers[0] 500 specific_filter = None 501 garr = GarrulousFilter() 502 handler.addFilter(garr) 503 try: 504 self.log_at_all_levels(self.root_logger) 505 first_lines = [ 506 # Notice how 'Garrulous' is missing 507 ('Boring', '1'), 508 ('Chatterbox', '2'), 509 ('Talkative', '4'), 510 ('Verbose', '5'), 511 ('Sociable', '6'), 512 ('Effusive', '7'), 513 ('Terse', '8'), 514 ('Taciturn', '9'), 515 ('Silent', '10'), 516 ] 517 self.assert_log_lines(first_lines) 518 519 specific_filter = VerySpecificFilter() 520 self.root_logger.addFilter(specific_filter) 521 self.log_at_all_levels(self.root_logger) 522 self.assert_log_lines(first_lines + [ 523 # Not only 'Garrulous' is still missing, but also 'Sociable' 524 # and 'Taciturn' 525 ('Boring', '11'), 526 ('Chatterbox', '12'), 527 ('Talkative', '14'), 528 ('Verbose', '15'), 529 ('Effusive', '17'), 530 ('Terse', '18'), 531 ('Silent', '20'), 532 ]) 533 finally: 534 if specific_filter: 535 self.root_logger.removeFilter(specific_filter) 536 handler.removeFilter(garr) 537 538 539class HandlerTest(BaseTest): 540 def test_name(self): 541 h = logging.Handler() 542 h.name = 'generic' 543 self.assertEqual(h.name, 'generic') 544 h.name = 'anothergeneric' 545 self.assertEqual(h.name, 'anothergeneric') 546 self.assertRaises(NotImplementedError, h.emit, None) 547 548 def test_builtin_handlers(self): 549 # We can't actually *use* too many handlers in the tests, 550 # but we can try instantiating them with various options 551 if sys.platform in ('linux', 'darwin'): 552 for existing in (True, False): 553 fd, fn = tempfile.mkstemp() 554 os.close(fd) 555 if not existing: 556 os.unlink(fn) 557 h = logging.handlers.WatchedFileHandler(fn, delay=True) 558 if existing: 559 dev, ino = h.dev, h.ino 560 self.assertEqual(dev, -1) 561 self.assertEqual(ino, -1) 562 r = logging.makeLogRecord({'msg': 'Test'}) 563 h.handle(r) 564 # Now remove the file. 565 os.unlink(fn) 566 self.assertFalse(os.path.exists(fn)) 567 # The next call should recreate the file. 568 h.handle(r) 569 self.assertTrue(os.path.exists(fn)) 570 else: 571 self.assertEqual(h.dev, -1) 572 self.assertEqual(h.ino, -1) 573 h.close() 574 if existing: 575 os.unlink(fn) 576 if sys.platform == 'darwin': 577 sockname = '/var/run/syslog' 578 else: 579 sockname = '/dev/log' 580 try: 581 h = logging.handlers.SysLogHandler(sockname) 582 self.assertEqual(h.facility, h.LOG_USER) 583 self.assertTrue(h.unixsocket) 584 h.close() 585 except OSError: # syslogd might not be available 586 pass 587 for method in ('GET', 'POST', 'PUT'): 588 if method == 'PUT': 589 self.assertRaises(ValueError, logging.handlers.HTTPHandler, 590 'localhost', '/log', method) 591 else: 592 h = logging.handlers.HTTPHandler('localhost', '/log', method) 593 h.close() 594 h = logging.handlers.BufferingHandler(0) 595 r = logging.makeLogRecord({}) 596 self.assertTrue(h.shouldFlush(r)) 597 h.close() 598 h = logging.handlers.BufferingHandler(1) 599 self.assertFalse(h.shouldFlush(r)) 600 h.close() 601 602 def test_path_objects(self): 603 """ 604 Test that Path objects are accepted as filename arguments to handlers. 605 606 See Issue #27493. 607 """ 608 fd, fn = tempfile.mkstemp() 609 os.close(fd) 610 os.unlink(fn) 611 pfn = pathlib.Path(fn) 612 cases = ( 613 (logging.FileHandler, (pfn, 'w')), 614 (logging.handlers.RotatingFileHandler, (pfn, 'a')), 615 (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')), 616 ) 617 if sys.platform in ('linux', 'darwin'): 618 cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),) 619 for cls, args in cases: 620 h = cls(*args) 621 self.assertTrue(os.path.exists(fn)) 622 h.close() 623 os.unlink(fn) 624 625 @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') 626 def test_race(self): 627 # Issue #14632 refers. 628 def remove_loop(fname, tries): 629 for _ in range(tries): 630 try: 631 os.unlink(fname) 632 self.deletion_time = time.time() 633 except OSError: 634 pass 635 time.sleep(0.004 * random.randint(0, 4)) 636 637 del_count = 500 638 log_count = 500 639 640 self.handle_time = None 641 self.deletion_time = None 642 643 for delay in (False, True): 644 fd, fn = tempfile.mkstemp('.log', 'test_logging-3-') 645 os.close(fd) 646 remover = threading.Thread(target=remove_loop, args=(fn, del_count)) 647 remover.daemon = True 648 remover.start() 649 h = logging.handlers.WatchedFileHandler(fn, delay=delay) 650 f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') 651 h.setFormatter(f) 652 try: 653 for _ in range(log_count): 654 time.sleep(0.005) 655 r = logging.makeLogRecord({'msg': 'testing' }) 656 try: 657 self.handle_time = time.time() 658 h.handle(r) 659 except Exception: 660 print('Deleted at %s, ' 661 'opened at %s' % (self.deletion_time, 662 self.handle_time)) 663 raise 664 finally: 665 remover.join() 666 h.close() 667 if os.path.exists(fn): 668 os.unlink(fn) 669 670 # The implementation relies on os.register_at_fork existing, but we test 671 # based on os.fork existing because that is what users and this test use. 672 # This helps ensure that when fork exists (the important concept) that the 673 # register_at_fork mechanism is also present and used. 674 @unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().') 675 def test_post_fork_child_no_deadlock(self): 676 """Ensure forked child logging locks are not held; bpo-6721.""" 677 refed_h = logging.Handler() 678 refed_h.name = 'because we need at least one for this test' 679 self.assertGreater(len(logging._handlers), 0) 680 681 locks_held__ready_to_fork = threading.Event() 682 fork_happened__release_locks_and_end_thread = threading.Event() 683 684 def lock_holder_thread_fn(): 685 logging._acquireLock() 686 try: 687 refed_h.acquire() 688 try: 689 # Tell the main thread to do the fork. 690 locks_held__ready_to_fork.set() 691 692 # If the deadlock bug exists, the fork will happen 693 # without dealing with the locks we hold, deadlocking 694 # the child. 695 696 # Wait for a successful fork or an unreasonable amount of 697 # time before releasing our locks. To avoid a timing based 698 # test we'd need communication from os.fork() as to when it 699 # has actually happened. Given this is a regression test 700 # for a fixed issue, potentially less reliably detecting 701 # regression via timing is acceptable for simplicity. 702 # The test will always take at least this long. :( 703 fork_happened__release_locks_and_end_thread.wait(0.5) 704 finally: 705 refed_h.release() 706 finally: 707 logging._releaseLock() 708 709 lock_holder_thread = threading.Thread( 710 target=lock_holder_thread_fn, 711 name='test_post_fork_child_no_deadlock lock holder') 712 lock_holder_thread.start() 713 714 locks_held__ready_to_fork.wait() 715 pid = os.fork() 716 if pid == 0: # Child. 717 logging.error(r'Child process did not deadlock. \o/') 718 os._exit(0) 719 else: # Parent. 720 fork_happened__release_locks_and_end_thread.set() 721 lock_holder_thread.join() 722 start_time = time.monotonic() 723 while True: 724 waited_pid, status = os.waitpid(pid, os.WNOHANG) 725 if waited_pid == pid: 726 break # child process exited. 727 if time.monotonic() - start_time > 7: 728 break # so long? implies child deadlock. 729 time.sleep(0.05) 730 if waited_pid != pid: 731 os.kill(pid, signal.SIGKILL) 732 waited_pid, status = os.waitpid(pid, 0) 733 self.fail("child process deadlocked.") 734 self.assertEqual(status, 0, msg="child process error") 735 736 737class BadStream(object): 738 def write(self, data): 739 raise RuntimeError('deliberate mistake') 740 741class TestStreamHandler(logging.StreamHandler): 742 def handleError(self, record): 743 self.error_record = record 744 745class StreamHandlerTest(BaseTest): 746 def test_error_handling(self): 747 h = TestStreamHandler(BadStream()) 748 r = logging.makeLogRecord({}) 749 old_raise = logging.raiseExceptions 750 751 try: 752 h.handle(r) 753 self.assertIs(h.error_record, r) 754 755 h = logging.StreamHandler(BadStream()) 756 with support.captured_stderr() as stderr: 757 h.handle(r) 758 msg = '\nRuntimeError: deliberate mistake\n' 759 self.assertIn(msg, stderr.getvalue()) 760 761 logging.raiseExceptions = False 762 with support.captured_stderr() as stderr: 763 h.handle(r) 764 self.assertEqual('', stderr.getvalue()) 765 finally: 766 logging.raiseExceptions = old_raise 767 768 def test_stream_setting(self): 769 """ 770 Test setting the handler's stream 771 """ 772 h = logging.StreamHandler() 773 stream = io.StringIO() 774 old = h.setStream(stream) 775 self.assertIs(old, sys.stderr) 776 actual = h.setStream(old) 777 self.assertIs(actual, stream) 778 # test that setting to existing value returns None 779 actual = h.setStream(old) 780 self.assertIsNone(actual) 781 782# -- The following section could be moved into a server_helper.py module 783# -- if it proves to be of wider utility than just test_logging 784 785class TestSMTPServer(smtpd.SMTPServer): 786 """ 787 This class implements a test SMTP server. 788 789 :param addr: A (host, port) tuple which the server listens on. 790 You can specify a port value of zero: the server's 791 *port* attribute will hold the actual port number 792 used, which can be used in client connections. 793 :param handler: A callable which will be called to process 794 incoming messages. The handler will be passed 795 the client address tuple, who the message is from, 796 a list of recipients and the message data. 797 :param poll_interval: The interval, in seconds, used in the underlying 798 :func:`select` or :func:`poll` call by 799 :func:`asyncore.loop`. 800 :param sockmap: A dictionary which will be used to hold 801 :class:`asyncore.dispatcher` instances used by 802 :func:`asyncore.loop`. This avoids changing the 803 :mod:`asyncore` module's global state. 804 """ 805 806 def __init__(self, addr, handler, poll_interval, sockmap): 807 smtpd.SMTPServer.__init__(self, addr, None, map=sockmap, 808 decode_data=True) 809 self.port = self.socket.getsockname()[1] 810 self._handler = handler 811 self._thread = None 812 self.poll_interval = poll_interval 813 814 def process_message(self, peer, mailfrom, rcpttos, data): 815 """ 816 Delegates to the handler passed in to the server's constructor. 817 818 Typically, this will be a test case method. 819 :param peer: The client (host, port) tuple. 820 :param mailfrom: The address of the sender. 821 :param rcpttos: The addresses of the recipients. 822 :param data: The message. 823 """ 824 self._handler(peer, mailfrom, rcpttos, data) 825 826 def start(self): 827 """ 828 Start the server running on a separate daemon thread. 829 """ 830 self._thread = t = threading.Thread(target=self.serve_forever, 831 args=(self.poll_interval,)) 832 t.setDaemon(True) 833 t.start() 834 835 def serve_forever(self, poll_interval): 836 """ 837 Run the :mod:`asyncore` loop until normal termination 838 conditions arise. 839 :param poll_interval: The interval, in seconds, used in the underlying 840 :func:`select` or :func:`poll` call by 841 :func:`asyncore.loop`. 842 """ 843 asyncore.loop(poll_interval, map=self._map) 844 845 def stop(self, timeout=None): 846 """ 847 Stop the thread by closing the server instance. 848 Wait for the server thread to terminate. 849 850 :param timeout: How long to wait for the server thread 851 to terminate. 852 """ 853 self.close() 854 support.join_thread(self._thread, timeout) 855 self._thread = None 856 asyncore.close_all(map=self._map, ignore_all=True) 857 858 859class ControlMixin(object): 860 """ 861 This mixin is used to start a server on a separate thread, and 862 shut it down programmatically. Request handling is simplified - instead 863 of needing to derive a suitable RequestHandler subclass, you just 864 provide a callable which will be passed each received request to be 865 processed. 866 867 :param handler: A handler callable which will be called with a 868 single parameter - the request - in order to 869 process the request. This handler is called on the 870 server thread, effectively meaning that requests are 871 processed serially. While not quite Web scale ;-), 872 this should be fine for testing applications. 873 :param poll_interval: The polling interval in seconds. 874 """ 875 def __init__(self, handler, poll_interval): 876 self._thread = None 877 self.poll_interval = poll_interval 878 self._handler = handler 879 self.ready = threading.Event() 880 881 def start(self): 882 """ 883 Create a daemon thread to run the server, and start it. 884 """ 885 self._thread = t = threading.Thread(target=self.serve_forever, 886 args=(self.poll_interval,)) 887 t.setDaemon(True) 888 t.start() 889 890 def serve_forever(self, poll_interval): 891 """ 892 Run the server. Set the ready flag before entering the 893 service loop. 894 """ 895 self.ready.set() 896 super(ControlMixin, self).serve_forever(poll_interval) 897 898 def stop(self, timeout=None): 899 """ 900 Tell the server thread to stop, and wait for it to do so. 901 902 :param timeout: How long to wait for the server thread 903 to terminate. 904 """ 905 self.shutdown() 906 if self._thread is not None: 907 support.join_thread(self._thread, timeout) 908 self._thread = None 909 self.server_close() 910 self.ready.clear() 911 912class TestHTTPServer(ControlMixin, HTTPServer): 913 """ 914 An HTTP server which is controllable using :class:`ControlMixin`. 915 916 :param addr: A tuple with the IP address and port to listen on. 917 :param handler: A handler callable which will be called with a 918 single parameter - the request - in order to 919 process the request. 920 :param poll_interval: The polling interval in seconds. 921 :param log: Pass ``True`` to enable log messages. 922 """ 923 def __init__(self, addr, handler, poll_interval=0.5, 924 log=False, sslctx=None): 925 class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): 926 def __getattr__(self, name, default=None): 927 if name.startswith('do_'): 928 return self.process_request 929 raise AttributeError(name) 930 931 def process_request(self): 932 self.server._handler(self) 933 934 def log_message(self, format, *args): 935 if log: 936 super(DelegatingHTTPRequestHandler, 937 self).log_message(format, *args) 938 HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) 939 ControlMixin.__init__(self, handler, poll_interval) 940 self.sslctx = sslctx 941 942 def get_request(self): 943 try: 944 sock, addr = self.socket.accept() 945 if self.sslctx: 946 sock = self.sslctx.wrap_socket(sock, server_side=True) 947 except OSError as e: 948 # socket errors are silenced by the caller, print them here 949 sys.stderr.write("Got an error:\n%s\n" % e) 950 raise 951 return sock, addr 952 953class TestTCPServer(ControlMixin, ThreadingTCPServer): 954 """ 955 A TCP server which is controllable using :class:`ControlMixin`. 956 957 :param addr: A tuple with the IP address and port to listen on. 958 :param handler: A handler callable which will be called with a single 959 parameter - the request - in order to process the request. 960 :param poll_interval: The polling interval in seconds. 961 :bind_and_activate: If True (the default), binds the server and starts it 962 listening. If False, you need to call 963 :meth:`server_bind` and :meth:`server_activate` at 964 some later time before calling :meth:`start`, so that 965 the server will set up the socket and listen on it. 966 """ 967 968 allow_reuse_address = True 969 970 def __init__(self, addr, handler, poll_interval=0.5, 971 bind_and_activate=True): 972 class DelegatingTCPRequestHandler(StreamRequestHandler): 973 974 def handle(self): 975 self.server._handler(self) 976 ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler, 977 bind_and_activate) 978 ControlMixin.__init__(self, handler, poll_interval) 979 980 def server_bind(self): 981 super(TestTCPServer, self).server_bind() 982 self.port = self.socket.getsockname()[1] 983 984class TestUDPServer(ControlMixin, ThreadingUDPServer): 985 """ 986 A UDP server which is controllable using :class:`ControlMixin`. 987 988 :param addr: A tuple with the IP address and port to listen on. 989 :param handler: A handler callable which will be called with a 990 single parameter - the request - in order to 991 process the request. 992 :param poll_interval: The polling interval for shutdown requests, 993 in seconds. 994 :bind_and_activate: If True (the default), binds the server and 995 starts it listening. If False, you need to 996 call :meth:`server_bind` and 997 :meth:`server_activate` at some later time 998 before calling :meth:`start`, so that the server will 999 set up the socket and listen on it. 1000 """ 1001 def __init__(self, addr, handler, poll_interval=0.5, 1002 bind_and_activate=True): 1003 class DelegatingUDPRequestHandler(DatagramRequestHandler): 1004 1005 def handle(self): 1006 self.server._handler(self) 1007 1008 def finish(self): 1009 data = self.wfile.getvalue() 1010 if data: 1011 try: 1012 super(DelegatingUDPRequestHandler, self).finish() 1013 except OSError: 1014 if not self.server._closed: 1015 raise 1016 1017 ThreadingUDPServer.__init__(self, addr, 1018 DelegatingUDPRequestHandler, 1019 bind_and_activate) 1020 ControlMixin.__init__(self, handler, poll_interval) 1021 self._closed = False 1022 1023 def server_bind(self): 1024 super(TestUDPServer, self).server_bind() 1025 self.port = self.socket.getsockname()[1] 1026 1027 def server_close(self): 1028 super(TestUDPServer, self).server_close() 1029 self._closed = True 1030 1031if hasattr(socket, "AF_UNIX"): 1032 class TestUnixStreamServer(TestTCPServer): 1033 address_family = socket.AF_UNIX 1034 1035 class TestUnixDatagramServer(TestUDPServer): 1036 address_family = socket.AF_UNIX 1037 1038# - end of server_helper section 1039 1040class SMTPHandlerTest(BaseTest): 1041 # bpo-14314, bpo-19665, bpo-34092: don't wait forever, timeout of 1 minute 1042 TIMEOUT = 60.0 1043 1044 def test_basic(self): 1045 sockmap = {} 1046 server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001, 1047 sockmap) 1048 server.start() 1049 addr = (support.HOST, server.port) 1050 h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log', 1051 timeout=self.TIMEOUT) 1052 self.assertEqual(h.toaddrs, ['you']) 1053 self.messages = [] 1054 r = logging.makeLogRecord({'msg': 'Hello \u2713'}) 1055 self.handled = threading.Event() 1056 h.handle(r) 1057 self.handled.wait(self.TIMEOUT) 1058 server.stop() 1059 self.assertTrue(self.handled.is_set()) 1060 self.assertEqual(len(self.messages), 1) 1061 peer, mailfrom, rcpttos, data = self.messages[0] 1062 self.assertEqual(mailfrom, 'me') 1063 self.assertEqual(rcpttos, ['you']) 1064 self.assertIn('\nSubject: Log\n', data) 1065 self.assertTrue(data.endswith('\n\nHello \u2713')) 1066 h.close() 1067 1068 def process_message(self, *args): 1069 self.messages.append(args) 1070 self.handled.set() 1071 1072class MemoryHandlerTest(BaseTest): 1073 1074 """Tests for the MemoryHandler.""" 1075 1076 # Do not bother with a logger name group. 1077 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 1078 1079 def setUp(self): 1080 BaseTest.setUp(self) 1081 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 1082 self.root_hdlr) 1083 self.mem_logger = logging.getLogger('mem') 1084 self.mem_logger.propagate = 0 1085 self.mem_logger.addHandler(self.mem_hdlr) 1086 1087 def tearDown(self): 1088 self.mem_hdlr.close() 1089 BaseTest.tearDown(self) 1090 1091 def test_flush(self): 1092 # The memory handler flushes to its target handler based on specific 1093 # criteria (message count and message level). 1094 self.mem_logger.debug(self.next_message()) 1095 self.assert_log_lines([]) 1096 self.mem_logger.info(self.next_message()) 1097 self.assert_log_lines([]) 1098 # This will flush because the level is >= logging.WARNING 1099 self.mem_logger.warning(self.next_message()) 1100 lines = [ 1101 ('DEBUG', '1'), 1102 ('INFO', '2'), 1103 ('WARNING', '3'), 1104 ] 1105 self.assert_log_lines(lines) 1106 for n in (4, 14): 1107 for i in range(9): 1108 self.mem_logger.debug(self.next_message()) 1109 self.assert_log_lines(lines) 1110 # This will flush because it's the 10th message since the last 1111 # flush. 1112 self.mem_logger.debug(self.next_message()) 1113 lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] 1114 self.assert_log_lines(lines) 1115 1116 self.mem_logger.debug(self.next_message()) 1117 self.assert_log_lines(lines) 1118 1119 def test_flush_on_close(self): 1120 """ 1121 Test that the flush-on-close configuration works as expected. 1122 """ 1123 self.mem_logger.debug(self.next_message()) 1124 self.assert_log_lines([]) 1125 self.mem_logger.info(self.next_message()) 1126 self.assert_log_lines([]) 1127 self.mem_logger.removeHandler(self.mem_hdlr) 1128 # Default behaviour is to flush on close. Check that it happens. 1129 self.mem_hdlr.close() 1130 lines = [ 1131 ('DEBUG', '1'), 1132 ('INFO', '2'), 1133 ] 1134 self.assert_log_lines(lines) 1135 # Now configure for flushing not to be done on close. 1136 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 1137 self.root_hdlr, 1138 False) 1139 self.mem_logger.addHandler(self.mem_hdlr) 1140 self.mem_logger.debug(self.next_message()) 1141 self.assert_log_lines(lines) # no change 1142 self.mem_logger.info(self.next_message()) 1143 self.assert_log_lines(lines) # no change 1144 self.mem_logger.removeHandler(self.mem_hdlr) 1145 self.mem_hdlr.close() 1146 # assert that no new lines have been added 1147 self.assert_log_lines(lines) # no change 1148 1149 1150class ExceptionFormatter(logging.Formatter): 1151 """A special exception formatter.""" 1152 def formatException(self, ei): 1153 return "Got a [%s]" % ei[0].__name__ 1154 1155 1156class ConfigFileTest(BaseTest): 1157 1158 """Reading logging config from a .ini-style config file.""" 1159 1160 check_no_resource_warning = support.check_no_resource_warning 1161 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 1162 1163 # config0 is a standard configuration. 1164 config0 = """ 1165 [loggers] 1166 keys=root 1167 1168 [handlers] 1169 keys=hand1 1170 1171 [formatters] 1172 keys=form1 1173 1174 [logger_root] 1175 level=WARNING 1176 handlers=hand1 1177 1178 [handler_hand1] 1179 class=StreamHandler 1180 level=NOTSET 1181 formatter=form1 1182 args=(sys.stdout,) 1183 1184 [formatter_form1] 1185 format=%(levelname)s ++ %(message)s 1186 datefmt= 1187 """ 1188 1189 # config1 adds a little to the standard configuration. 1190 config1 = """ 1191 [loggers] 1192 keys=root,parser 1193 1194 [handlers] 1195 keys=hand1 1196 1197 [formatters] 1198 keys=form1 1199 1200 [logger_root] 1201 level=WARNING 1202 handlers= 1203 1204 [logger_parser] 1205 level=DEBUG 1206 handlers=hand1 1207 propagate=1 1208 qualname=compiler.parser 1209 1210 [handler_hand1] 1211 class=StreamHandler 1212 level=NOTSET 1213 formatter=form1 1214 args=(sys.stdout,) 1215 1216 [formatter_form1] 1217 format=%(levelname)s ++ %(message)s 1218 datefmt= 1219 """ 1220 1221 # config1a moves the handler to the root. 1222 config1a = """ 1223 [loggers] 1224 keys=root,parser 1225 1226 [handlers] 1227 keys=hand1 1228 1229 [formatters] 1230 keys=form1 1231 1232 [logger_root] 1233 level=WARNING 1234 handlers=hand1 1235 1236 [logger_parser] 1237 level=DEBUG 1238 handlers= 1239 propagate=1 1240 qualname=compiler.parser 1241 1242 [handler_hand1] 1243 class=StreamHandler 1244 level=NOTSET 1245 formatter=form1 1246 args=(sys.stdout,) 1247 1248 [formatter_form1] 1249 format=%(levelname)s ++ %(message)s 1250 datefmt= 1251 """ 1252 1253 # config2 has a subtle configuration error that should be reported 1254 config2 = config1.replace("sys.stdout", "sys.stbout") 1255 1256 # config3 has a less subtle configuration error 1257 config3 = config1.replace("formatter=form1", "formatter=misspelled_name") 1258 1259 # config4 specifies a custom formatter class to be loaded 1260 config4 = """ 1261 [loggers] 1262 keys=root 1263 1264 [handlers] 1265 keys=hand1 1266 1267 [formatters] 1268 keys=form1 1269 1270 [logger_root] 1271 level=NOTSET 1272 handlers=hand1 1273 1274 [handler_hand1] 1275 class=StreamHandler 1276 level=NOTSET 1277 formatter=form1 1278 args=(sys.stdout,) 1279 1280 [formatter_form1] 1281 class=""" + __name__ + """.ExceptionFormatter 1282 format=%(levelname)s:%(name)s:%(message)s 1283 datefmt= 1284 """ 1285 1286 # config5 specifies a custom handler class to be loaded 1287 config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') 1288 1289 # config6 uses ', ' delimiters in the handlers and formatters sections 1290 config6 = """ 1291 [loggers] 1292 keys=root,parser 1293 1294 [handlers] 1295 keys=hand1, hand2 1296 1297 [formatters] 1298 keys=form1, form2 1299 1300 [logger_root] 1301 level=WARNING 1302 handlers= 1303 1304 [logger_parser] 1305 level=DEBUG 1306 handlers=hand1 1307 propagate=1 1308 qualname=compiler.parser 1309 1310 [handler_hand1] 1311 class=StreamHandler 1312 level=NOTSET 1313 formatter=form1 1314 args=(sys.stdout,) 1315 1316 [handler_hand2] 1317 class=StreamHandler 1318 level=NOTSET 1319 formatter=form1 1320 args=(sys.stderr,) 1321 1322 [formatter_form1] 1323 format=%(levelname)s ++ %(message)s 1324 datefmt= 1325 1326 [formatter_form2] 1327 format=%(message)s 1328 datefmt= 1329 """ 1330 1331 # config7 adds a compiler logger, and uses kwargs instead of args. 1332 config7 = """ 1333 [loggers] 1334 keys=root,parser,compiler 1335 1336 [handlers] 1337 keys=hand1 1338 1339 [formatters] 1340 keys=form1 1341 1342 [logger_root] 1343 level=WARNING 1344 handlers=hand1 1345 1346 [logger_compiler] 1347 level=DEBUG 1348 handlers= 1349 propagate=1 1350 qualname=compiler 1351 1352 [logger_parser] 1353 level=DEBUG 1354 handlers= 1355 propagate=1 1356 qualname=compiler.parser 1357 1358 [handler_hand1] 1359 class=StreamHandler 1360 level=NOTSET 1361 formatter=form1 1362 kwargs={'stream': sys.stdout,} 1363 1364 [formatter_form1] 1365 format=%(levelname)s ++ %(message)s 1366 datefmt= 1367 """ 1368 1369 # config 8, check for resource warning 1370 config8 = r""" 1371 [loggers] 1372 keys=root 1373 1374 [handlers] 1375 keys=file 1376 1377 [formatters] 1378 keys= 1379 1380 [logger_root] 1381 level=DEBUG 1382 handlers=file 1383 1384 [handler_file] 1385 class=FileHandler 1386 level=DEBUG 1387 args=("{tempfile}",) 1388 """ 1389 1390 disable_test = """ 1391 [loggers] 1392 keys=root 1393 1394 [handlers] 1395 keys=screen 1396 1397 [formatters] 1398 keys= 1399 1400 [logger_root] 1401 level=DEBUG 1402 handlers=screen 1403 1404 [handler_screen] 1405 level=DEBUG 1406 class=StreamHandler 1407 args=(sys.stdout,) 1408 formatter= 1409 """ 1410 1411 def apply_config(self, conf, **kwargs): 1412 file = io.StringIO(textwrap.dedent(conf)) 1413 logging.config.fileConfig(file, **kwargs) 1414 1415 def test_config0_ok(self): 1416 # A simple config file which overrides the default settings. 1417 with support.captured_stdout() as output: 1418 self.apply_config(self.config0) 1419 logger = logging.getLogger() 1420 # Won't output anything 1421 logger.info(self.next_message()) 1422 # Outputs a message 1423 logger.error(self.next_message()) 1424 self.assert_log_lines([ 1425 ('ERROR', '2'), 1426 ], stream=output) 1427 # Original logger output is empty. 1428 self.assert_log_lines([]) 1429 1430 def test_config0_using_cp_ok(self): 1431 # A simple config file which overrides the default settings. 1432 with support.captured_stdout() as output: 1433 file = io.StringIO(textwrap.dedent(self.config0)) 1434 cp = configparser.ConfigParser() 1435 cp.read_file(file) 1436 logging.config.fileConfig(cp) 1437 logger = logging.getLogger() 1438 # Won't output anything 1439 logger.info(self.next_message()) 1440 # Outputs a message 1441 logger.error(self.next_message()) 1442 self.assert_log_lines([ 1443 ('ERROR', '2'), 1444 ], stream=output) 1445 # Original logger output is empty. 1446 self.assert_log_lines([]) 1447 1448 def test_config1_ok(self, config=config1): 1449 # A config file defining a sub-parser as well. 1450 with support.captured_stdout() as output: 1451 self.apply_config(config) 1452 logger = logging.getLogger("compiler.parser") 1453 # Both will output a message 1454 logger.info(self.next_message()) 1455 logger.error(self.next_message()) 1456 self.assert_log_lines([ 1457 ('INFO', '1'), 1458 ('ERROR', '2'), 1459 ], stream=output) 1460 # Original logger output is empty. 1461 self.assert_log_lines([]) 1462 1463 def test_config2_failure(self): 1464 # A simple config file which overrides the default settings. 1465 self.assertRaises(Exception, self.apply_config, self.config2) 1466 1467 def test_config3_failure(self): 1468 # A simple config file which overrides the default settings. 1469 self.assertRaises(Exception, self.apply_config, self.config3) 1470 1471 def test_config4_ok(self): 1472 # A config file specifying a custom formatter class. 1473 with support.captured_stdout() as output: 1474 self.apply_config(self.config4) 1475 logger = logging.getLogger() 1476 try: 1477 raise RuntimeError() 1478 except RuntimeError: 1479 logging.exception("just testing") 1480 sys.stdout.seek(0) 1481 self.assertEqual(output.getvalue(), 1482 "ERROR:root:just testing\nGot a [RuntimeError]\n") 1483 # Original logger output is empty 1484 self.assert_log_lines([]) 1485 1486 def test_config5_ok(self): 1487 self.test_config1_ok(config=self.config5) 1488 1489 def test_config6_ok(self): 1490 self.test_config1_ok(config=self.config6) 1491 1492 def test_config7_ok(self): 1493 with support.captured_stdout() as output: 1494 self.apply_config(self.config1a) 1495 logger = logging.getLogger("compiler.parser") 1496 # See issue #11424. compiler-hyphenated sorts 1497 # between compiler and compiler.xyz and this 1498 # was preventing compiler.xyz from being included 1499 # in the child loggers of compiler because of an 1500 # overzealous loop termination condition. 1501 hyphenated = logging.getLogger('compiler-hyphenated') 1502 # All will output a message 1503 logger.info(self.next_message()) 1504 logger.error(self.next_message()) 1505 hyphenated.critical(self.next_message()) 1506 self.assert_log_lines([ 1507 ('INFO', '1'), 1508 ('ERROR', '2'), 1509 ('CRITICAL', '3'), 1510 ], stream=output) 1511 # Original logger output is empty. 1512 self.assert_log_lines([]) 1513 with support.captured_stdout() as output: 1514 self.apply_config(self.config7) 1515 logger = logging.getLogger("compiler.parser") 1516 self.assertFalse(logger.disabled) 1517 # Both will output a message 1518 logger.info(self.next_message()) 1519 logger.error(self.next_message()) 1520 logger = logging.getLogger("compiler.lexer") 1521 # Both will output a message 1522 logger.info(self.next_message()) 1523 logger.error(self.next_message()) 1524 # Will not appear 1525 hyphenated.critical(self.next_message()) 1526 self.assert_log_lines([ 1527 ('INFO', '4'), 1528 ('ERROR', '5'), 1529 ('INFO', '6'), 1530 ('ERROR', '7'), 1531 ], stream=output) 1532 # Original logger output is empty. 1533 self.assert_log_lines([]) 1534 1535 def test_config8_ok(self): 1536 1537 def cleanup(h1, fn): 1538 h1.close() 1539 os.remove(fn) 1540 1541 with self.check_no_resource_warning(): 1542 fd, fn = tempfile.mkstemp(".log", "test_logging-X-") 1543 os.close(fd) 1544 1545 # Replace single backslash with double backslash in windows 1546 # to avoid unicode error during string formatting 1547 if os.name == "nt": 1548 fn = fn.replace("\\", "\\\\") 1549 1550 config8 = self.config8.format(tempfile=fn) 1551 1552 self.apply_config(config8) 1553 self.apply_config(config8) 1554 1555 handler = logging.root.handlers[0] 1556 self.addCleanup(cleanup, handler, fn) 1557 1558 def test_logger_disabling(self): 1559 self.apply_config(self.disable_test) 1560 logger = logging.getLogger('some_pristine_logger') 1561 self.assertFalse(logger.disabled) 1562 self.apply_config(self.disable_test) 1563 self.assertTrue(logger.disabled) 1564 self.apply_config(self.disable_test, disable_existing_loggers=False) 1565 self.assertFalse(logger.disabled) 1566 1567 def test_defaults_do_no_interpolation(self): 1568 """bpo-33802 defaults should not get interpolated""" 1569 ini = textwrap.dedent(""" 1570 [formatters] 1571 keys=default 1572 1573 [formatter_default] 1574 1575 [handlers] 1576 keys=console 1577 1578 [handler_console] 1579 class=logging.StreamHandler 1580 args=tuple() 1581 1582 [loggers] 1583 keys=root 1584 1585 [logger_root] 1586 formatter=default 1587 handlers=console 1588 """).strip() 1589 fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini') 1590 try: 1591 os.write(fd, ini.encode('ascii')) 1592 os.close(fd) 1593 logging.config.fileConfig( 1594 fn, 1595 defaults=dict( 1596 version=1, 1597 disable_existing_loggers=False, 1598 formatters={ 1599 "generic": { 1600 "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s", 1601 "datefmt": "[%Y-%m-%d %H:%M:%S %z]", 1602 "class": "logging.Formatter" 1603 }, 1604 }, 1605 ) 1606 ) 1607 finally: 1608 os.unlink(fn) 1609 1610 1611class SocketHandlerTest(BaseTest): 1612 1613 """Test for SocketHandler objects.""" 1614 1615 server_class = TestTCPServer 1616 address = ('localhost', 0) 1617 1618 def setUp(self): 1619 """Set up a TCP server to receive log messages, and a SocketHandler 1620 pointing to that server's address and port.""" 1621 BaseTest.setUp(self) 1622 # Issue #29177: deal with errors that happen during setup 1623 self.server = self.sock_hdlr = self.server_exception = None 1624 try: 1625 self.server = server = self.server_class(self.address, 1626 self.handle_socket, 0.01) 1627 server.start() 1628 # Uncomment next line to test error recovery in setUp() 1629 # raise OSError('dummy error raised') 1630 except OSError as e: 1631 self.server_exception = e 1632 return 1633 server.ready.wait() 1634 hcls = logging.handlers.SocketHandler 1635 if isinstance(server.server_address, tuple): 1636 self.sock_hdlr = hcls('localhost', server.port) 1637 else: 1638 self.sock_hdlr = hcls(server.server_address, None) 1639 self.log_output = '' 1640 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1641 self.root_logger.addHandler(self.sock_hdlr) 1642 self.handled = threading.Semaphore(0) 1643 1644 def tearDown(self): 1645 """Shutdown the TCP server.""" 1646 try: 1647 if self.sock_hdlr: 1648 self.root_logger.removeHandler(self.sock_hdlr) 1649 self.sock_hdlr.close() 1650 if self.server: 1651 self.server.stop(2.0) 1652 finally: 1653 BaseTest.tearDown(self) 1654 1655 def handle_socket(self, request): 1656 conn = request.connection 1657 while True: 1658 chunk = conn.recv(4) 1659 if len(chunk) < 4: 1660 break 1661 slen = struct.unpack(">L", chunk)[0] 1662 chunk = conn.recv(slen) 1663 while len(chunk) < slen: 1664 chunk = chunk + conn.recv(slen - len(chunk)) 1665 obj = pickle.loads(chunk) 1666 record = logging.makeLogRecord(obj) 1667 self.log_output += record.msg + '\n' 1668 self.handled.release() 1669 1670 def test_output(self): 1671 # The log message sent to the SocketHandler is properly received. 1672 if self.server_exception: 1673 self.skipTest(self.server_exception) 1674 logger = logging.getLogger("tcp") 1675 logger.error("spam") 1676 self.handled.acquire() 1677 logger.debug("eggs") 1678 self.handled.acquire() 1679 self.assertEqual(self.log_output, "spam\neggs\n") 1680 1681 def test_noserver(self): 1682 if self.server_exception: 1683 self.skipTest(self.server_exception) 1684 # Avoid timing-related failures due to SocketHandler's own hard-wired 1685 # one-second timeout on socket.create_connection() (issue #16264). 1686 self.sock_hdlr.retryStart = 2.5 1687 # Kill the server 1688 self.server.stop(2.0) 1689 # The logging call should try to connect, which should fail 1690 try: 1691 raise RuntimeError('Deliberate mistake') 1692 except RuntimeError: 1693 self.root_logger.exception('Never sent') 1694 self.root_logger.error('Never sent, either') 1695 now = time.time() 1696 self.assertGreater(self.sock_hdlr.retryTime, now) 1697 time.sleep(self.sock_hdlr.retryTime - now + 0.001) 1698 self.root_logger.error('Nor this') 1699 1700def _get_temp_domain_socket(): 1701 fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock') 1702 os.close(fd) 1703 # just need a name - file can't be present, or we'll get an 1704 # 'address already in use' error. 1705 os.remove(fn) 1706 return fn 1707 1708@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1709class UnixSocketHandlerTest(SocketHandlerTest): 1710 1711 """Test for SocketHandler with unix sockets.""" 1712 1713 if hasattr(socket, "AF_UNIX"): 1714 server_class = TestUnixStreamServer 1715 1716 def setUp(self): 1717 # override the definition in the base class 1718 self.address = _get_temp_domain_socket() 1719 SocketHandlerTest.setUp(self) 1720 1721 def tearDown(self): 1722 SocketHandlerTest.tearDown(self) 1723 support.unlink(self.address) 1724 1725class DatagramHandlerTest(BaseTest): 1726 1727 """Test for DatagramHandler.""" 1728 1729 server_class = TestUDPServer 1730 address = ('localhost', 0) 1731 1732 def setUp(self): 1733 """Set up a UDP server to receive log messages, and a DatagramHandler 1734 pointing to that server's address and port.""" 1735 BaseTest.setUp(self) 1736 # Issue #29177: deal with errors that happen during setup 1737 self.server = self.sock_hdlr = self.server_exception = None 1738 try: 1739 self.server = server = self.server_class(self.address, 1740 self.handle_datagram, 0.01) 1741 server.start() 1742 # Uncomment next line to test error recovery in setUp() 1743 # raise OSError('dummy error raised') 1744 except OSError as e: 1745 self.server_exception = e 1746 return 1747 server.ready.wait() 1748 hcls = logging.handlers.DatagramHandler 1749 if isinstance(server.server_address, tuple): 1750 self.sock_hdlr = hcls('localhost', server.port) 1751 else: 1752 self.sock_hdlr = hcls(server.server_address, None) 1753 self.log_output = '' 1754 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1755 self.root_logger.addHandler(self.sock_hdlr) 1756 self.handled = threading.Event() 1757 1758 def tearDown(self): 1759 """Shutdown the UDP server.""" 1760 try: 1761 if self.server: 1762 self.server.stop(2.0) 1763 if self.sock_hdlr: 1764 self.root_logger.removeHandler(self.sock_hdlr) 1765 self.sock_hdlr.close() 1766 finally: 1767 BaseTest.tearDown(self) 1768 1769 def handle_datagram(self, request): 1770 slen = struct.pack('>L', 0) # length of prefix 1771 packet = request.packet[len(slen):] 1772 obj = pickle.loads(packet) 1773 record = logging.makeLogRecord(obj) 1774 self.log_output += record.msg + '\n' 1775 self.handled.set() 1776 1777 def test_output(self): 1778 # The log message sent to the DatagramHandler is properly received. 1779 if self.server_exception: 1780 self.skipTest(self.server_exception) 1781 logger = logging.getLogger("udp") 1782 logger.error("spam") 1783 self.handled.wait() 1784 self.handled.clear() 1785 logger.error("eggs") 1786 self.handled.wait() 1787 self.assertEqual(self.log_output, "spam\neggs\n") 1788 1789@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1790class UnixDatagramHandlerTest(DatagramHandlerTest): 1791 1792 """Test for DatagramHandler using Unix sockets.""" 1793 1794 if hasattr(socket, "AF_UNIX"): 1795 server_class = TestUnixDatagramServer 1796 1797 def setUp(self): 1798 # override the definition in the base class 1799 self.address = _get_temp_domain_socket() 1800 DatagramHandlerTest.setUp(self) 1801 1802 def tearDown(self): 1803 DatagramHandlerTest.tearDown(self) 1804 support.unlink(self.address) 1805 1806class SysLogHandlerTest(BaseTest): 1807 1808 """Test for SysLogHandler using UDP.""" 1809 1810 server_class = TestUDPServer 1811 address = ('localhost', 0) 1812 1813 def setUp(self): 1814 """Set up a UDP server to receive log messages, and a SysLogHandler 1815 pointing to that server's address and port.""" 1816 BaseTest.setUp(self) 1817 # Issue #29177: deal with errors that happen during setup 1818 self.server = self.sl_hdlr = self.server_exception = None 1819 try: 1820 self.server = server = self.server_class(self.address, 1821 self.handle_datagram, 0.01) 1822 server.start() 1823 # Uncomment next line to test error recovery in setUp() 1824 # raise OSError('dummy error raised') 1825 except OSError as e: 1826 self.server_exception = e 1827 return 1828 server.ready.wait() 1829 hcls = logging.handlers.SysLogHandler 1830 if isinstance(server.server_address, tuple): 1831 self.sl_hdlr = hcls((server.server_address[0], server.port)) 1832 else: 1833 self.sl_hdlr = hcls(server.server_address) 1834 self.log_output = '' 1835 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1836 self.root_logger.addHandler(self.sl_hdlr) 1837 self.handled = threading.Event() 1838 1839 def tearDown(self): 1840 """Shutdown the server.""" 1841 try: 1842 if self.server: 1843 self.server.stop(2.0) 1844 if self.sl_hdlr: 1845 self.root_logger.removeHandler(self.sl_hdlr) 1846 self.sl_hdlr.close() 1847 finally: 1848 BaseTest.tearDown(self) 1849 1850 def handle_datagram(self, request): 1851 self.log_output = request.packet 1852 self.handled.set() 1853 1854 def test_output(self): 1855 if self.server_exception: 1856 self.skipTest(self.server_exception) 1857 # The log message sent to the SysLogHandler is properly received. 1858 logger = logging.getLogger("slh") 1859 logger.error("sp\xe4m") 1860 self.handled.wait() 1861 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00') 1862 self.handled.clear() 1863 self.sl_hdlr.append_nul = False 1864 logger.error("sp\xe4m") 1865 self.handled.wait() 1866 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m') 1867 self.handled.clear() 1868 self.sl_hdlr.ident = "h\xe4m-" 1869 logger.error("sp\xe4m") 1870 self.handled.wait() 1871 self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m') 1872 1873@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1874class UnixSysLogHandlerTest(SysLogHandlerTest): 1875 1876 """Test for SysLogHandler with Unix sockets.""" 1877 1878 if hasattr(socket, "AF_UNIX"): 1879 server_class = TestUnixDatagramServer 1880 1881 def setUp(self): 1882 # override the definition in the base class 1883 self.address = _get_temp_domain_socket() 1884 SysLogHandlerTest.setUp(self) 1885 1886 def tearDown(self): 1887 SysLogHandlerTest.tearDown(self) 1888 support.unlink(self.address) 1889 1890@unittest.skipUnless(support.IPV6_ENABLED, 1891 'IPv6 support required for this test.') 1892class IPv6SysLogHandlerTest(SysLogHandlerTest): 1893 1894 """Test for SysLogHandler with IPv6 host.""" 1895 1896 server_class = TestUDPServer 1897 address = ('::1', 0) 1898 1899 def setUp(self): 1900 self.server_class.address_family = socket.AF_INET6 1901 super(IPv6SysLogHandlerTest, self).setUp() 1902 1903 def tearDown(self): 1904 self.server_class.address_family = socket.AF_INET 1905 super(IPv6SysLogHandlerTest, self).tearDown() 1906 1907class HTTPHandlerTest(BaseTest): 1908 """Test for HTTPHandler.""" 1909 1910 def setUp(self): 1911 """Set up an HTTP server to receive log messages, and a HTTPHandler 1912 pointing to that server's address and port.""" 1913 BaseTest.setUp(self) 1914 self.handled = threading.Event() 1915 1916 def handle_request(self, request): 1917 self.command = request.command 1918 self.log_data = urlparse(request.path) 1919 if self.command == 'POST': 1920 try: 1921 rlen = int(request.headers['Content-Length']) 1922 self.post_data = request.rfile.read(rlen) 1923 except: 1924 self.post_data = None 1925 request.send_response(200) 1926 request.end_headers() 1927 self.handled.set() 1928 1929 def test_output(self): 1930 # The log message sent to the HTTPHandler is properly received. 1931 logger = logging.getLogger("http") 1932 root_logger = self.root_logger 1933 root_logger.removeHandler(self.root_logger.handlers[0]) 1934 for secure in (False, True): 1935 addr = ('localhost', 0) 1936 if secure: 1937 try: 1938 import ssl 1939 except ImportError: 1940 sslctx = None 1941 else: 1942 here = os.path.dirname(__file__) 1943 localhost_cert = os.path.join(here, "keycert.pem") 1944 sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 1945 sslctx.load_cert_chain(localhost_cert) 1946 1947 context = ssl.create_default_context(cafile=localhost_cert) 1948 else: 1949 sslctx = None 1950 context = None 1951 self.server = server = TestHTTPServer(addr, self.handle_request, 1952 0.01, sslctx=sslctx) 1953 server.start() 1954 server.ready.wait() 1955 host = 'localhost:%d' % server.server_port 1956 secure_client = secure and sslctx 1957 self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob', 1958 secure=secure_client, 1959 context=context, 1960 credentials=('foo', 'bar')) 1961 self.log_data = None 1962 root_logger.addHandler(self.h_hdlr) 1963 1964 for method in ('GET', 'POST'): 1965 self.h_hdlr.method = method 1966 self.handled.clear() 1967 msg = "sp\xe4m" 1968 logger.error(msg) 1969 self.handled.wait() 1970 self.assertEqual(self.log_data.path, '/frob') 1971 self.assertEqual(self.command, method) 1972 if method == 'GET': 1973 d = parse_qs(self.log_data.query) 1974 else: 1975 d = parse_qs(self.post_data.decode('utf-8')) 1976 self.assertEqual(d['name'], ['http']) 1977 self.assertEqual(d['funcName'], ['test_output']) 1978 self.assertEqual(d['msg'], [msg]) 1979 1980 self.server.stop(2.0) 1981 self.root_logger.removeHandler(self.h_hdlr) 1982 self.h_hdlr.close() 1983 1984class MemoryTest(BaseTest): 1985 1986 """Test memory persistence of logger objects.""" 1987 1988 def setUp(self): 1989 """Create a dict to remember potentially destroyed objects.""" 1990 BaseTest.setUp(self) 1991 self._survivors = {} 1992 1993 def _watch_for_survival(self, *args): 1994 """Watch the given objects for survival, by creating weakrefs to 1995 them.""" 1996 for obj in args: 1997 key = id(obj), repr(obj) 1998 self._survivors[key] = weakref.ref(obj) 1999 2000 def _assertTruesurvival(self): 2001 """Assert that all objects watched for survival have survived.""" 2002 # Trigger cycle breaking. 2003 gc.collect() 2004 dead = [] 2005 for (id_, repr_), ref in self._survivors.items(): 2006 if ref() is None: 2007 dead.append(repr_) 2008 if dead: 2009 self.fail("%d objects should have survived " 2010 "but have been destroyed: %s" % (len(dead), ", ".join(dead))) 2011 2012 def test_persistent_loggers(self): 2013 # Logger objects are persistent and retain their configuration, even 2014 # if visible references are destroyed. 2015 self.root_logger.setLevel(logging.INFO) 2016 foo = logging.getLogger("foo") 2017 self._watch_for_survival(foo) 2018 foo.setLevel(logging.DEBUG) 2019 self.root_logger.debug(self.next_message()) 2020 foo.debug(self.next_message()) 2021 self.assert_log_lines([ 2022 ('foo', 'DEBUG', '2'), 2023 ]) 2024 del foo 2025 # foo has survived. 2026 self._assertTruesurvival() 2027 # foo has retained its settings. 2028 bar = logging.getLogger("foo") 2029 bar.debug(self.next_message()) 2030 self.assert_log_lines([ 2031 ('foo', 'DEBUG', '2'), 2032 ('foo', 'DEBUG', '3'), 2033 ]) 2034 2035 2036class EncodingTest(BaseTest): 2037 def test_encoding_plain_file(self): 2038 # In Python 2.x, a plain file object is treated as having no encoding. 2039 log = logging.getLogger("test") 2040 fd, fn = tempfile.mkstemp(".log", "test_logging-1-") 2041 os.close(fd) 2042 # the non-ascii data we write to the log. 2043 data = "foo\x80" 2044 try: 2045 handler = logging.FileHandler(fn, encoding="utf-8") 2046 log.addHandler(handler) 2047 try: 2048 # write non-ascii data to the log. 2049 log.warning(data) 2050 finally: 2051 log.removeHandler(handler) 2052 handler.close() 2053 # check we wrote exactly those bytes, ignoring trailing \n etc 2054 f = open(fn, encoding="utf-8") 2055 try: 2056 self.assertEqual(f.read().rstrip(), data) 2057 finally: 2058 f.close() 2059 finally: 2060 if os.path.isfile(fn): 2061 os.remove(fn) 2062 2063 def test_encoding_cyrillic_unicode(self): 2064 log = logging.getLogger("test") 2065 # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye) 2066 message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f' 2067 # Ensure it's written in a Cyrillic encoding 2068 writer_class = codecs.getwriter('cp1251') 2069 writer_class.encoding = 'cp1251' 2070 stream = io.BytesIO() 2071 writer = writer_class(stream, 'strict') 2072 handler = logging.StreamHandler(writer) 2073 log.addHandler(handler) 2074 try: 2075 log.warning(message) 2076 finally: 2077 log.removeHandler(handler) 2078 handler.close() 2079 # check we wrote exactly those bytes, ignoring trailing \n etc 2080 s = stream.getvalue() 2081 # Compare against what the data should be when encoded in CP-1251 2082 self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n') 2083 2084 2085class WarningsTest(BaseTest): 2086 2087 def test_warnings(self): 2088 with warnings.catch_warnings(): 2089 logging.captureWarnings(True) 2090 self.addCleanup(logging.captureWarnings, False) 2091 warnings.filterwarnings("always", category=UserWarning) 2092 stream = io.StringIO() 2093 h = logging.StreamHandler(stream) 2094 logger = logging.getLogger("py.warnings") 2095 logger.addHandler(h) 2096 warnings.warn("I'm warning you...") 2097 logger.removeHandler(h) 2098 s = stream.getvalue() 2099 h.close() 2100 self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0) 2101 2102 # See if an explicit file uses the original implementation 2103 a_file = io.StringIO() 2104 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, 2105 a_file, "Dummy line") 2106 s = a_file.getvalue() 2107 a_file.close() 2108 self.assertEqual(s, 2109 "dummy.py:42: UserWarning: Explicit\n Dummy line\n") 2110 2111 def test_warnings_no_handlers(self): 2112 with warnings.catch_warnings(): 2113 logging.captureWarnings(True) 2114 self.addCleanup(logging.captureWarnings, False) 2115 2116 # confirm our assumption: no loggers are set 2117 logger = logging.getLogger("py.warnings") 2118 self.assertEqual(logger.handlers, []) 2119 2120 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42) 2121 self.assertEqual(len(logger.handlers), 1) 2122 self.assertIsInstance(logger.handlers[0], logging.NullHandler) 2123 2124 2125def formatFunc(format, datefmt=None): 2126 return logging.Formatter(format, datefmt) 2127 2128def handlerFunc(): 2129 return logging.StreamHandler() 2130 2131class CustomHandler(logging.StreamHandler): 2132 pass 2133 2134class ConfigDictTest(BaseTest): 2135 2136 """Reading logging config from a dictionary.""" 2137 2138 check_no_resource_warning = support.check_no_resource_warning 2139 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 2140 2141 # config0 is a standard configuration. 2142 config0 = { 2143 'version': 1, 2144 'formatters': { 2145 'form1' : { 2146 'format' : '%(levelname)s ++ %(message)s', 2147 }, 2148 }, 2149 'handlers' : { 2150 'hand1' : { 2151 'class' : 'logging.StreamHandler', 2152 'formatter' : 'form1', 2153 'level' : 'NOTSET', 2154 'stream' : 'ext://sys.stdout', 2155 }, 2156 }, 2157 'root' : { 2158 'level' : 'WARNING', 2159 'handlers' : ['hand1'], 2160 }, 2161 } 2162 2163 # config1 adds a little to the standard configuration. 2164 config1 = { 2165 'version': 1, 2166 'formatters': { 2167 'form1' : { 2168 'format' : '%(levelname)s ++ %(message)s', 2169 }, 2170 }, 2171 'handlers' : { 2172 'hand1' : { 2173 'class' : 'logging.StreamHandler', 2174 'formatter' : 'form1', 2175 'level' : 'NOTSET', 2176 'stream' : 'ext://sys.stdout', 2177 }, 2178 }, 2179 'loggers' : { 2180 'compiler.parser' : { 2181 'level' : 'DEBUG', 2182 'handlers' : ['hand1'], 2183 }, 2184 }, 2185 'root' : { 2186 'level' : 'WARNING', 2187 }, 2188 } 2189 2190 # config1a moves the handler to the root. Used with config8a 2191 config1a = { 2192 'version': 1, 2193 'formatters': { 2194 'form1' : { 2195 'format' : '%(levelname)s ++ %(message)s', 2196 }, 2197 }, 2198 'handlers' : { 2199 'hand1' : { 2200 'class' : 'logging.StreamHandler', 2201 'formatter' : 'form1', 2202 'level' : 'NOTSET', 2203 'stream' : 'ext://sys.stdout', 2204 }, 2205 }, 2206 'loggers' : { 2207 'compiler.parser' : { 2208 'level' : 'DEBUG', 2209 }, 2210 }, 2211 'root' : { 2212 'level' : 'WARNING', 2213 'handlers' : ['hand1'], 2214 }, 2215 } 2216 2217 # config2 has a subtle configuration error that should be reported 2218 config2 = { 2219 'version': 1, 2220 'formatters': { 2221 'form1' : { 2222 'format' : '%(levelname)s ++ %(message)s', 2223 }, 2224 }, 2225 'handlers' : { 2226 'hand1' : { 2227 'class' : 'logging.StreamHandler', 2228 'formatter' : 'form1', 2229 'level' : 'NOTSET', 2230 'stream' : 'ext://sys.stdbout', 2231 }, 2232 }, 2233 'loggers' : { 2234 'compiler.parser' : { 2235 'level' : 'DEBUG', 2236 'handlers' : ['hand1'], 2237 }, 2238 }, 2239 'root' : { 2240 'level' : 'WARNING', 2241 }, 2242 } 2243 2244 # As config1 but with a misspelt level on a handler 2245 config2a = { 2246 'version': 1, 2247 'formatters': { 2248 'form1' : { 2249 'format' : '%(levelname)s ++ %(message)s', 2250 }, 2251 }, 2252 'handlers' : { 2253 'hand1' : { 2254 'class' : 'logging.StreamHandler', 2255 'formatter' : 'form1', 2256 'level' : 'NTOSET', 2257 'stream' : 'ext://sys.stdout', 2258 }, 2259 }, 2260 'loggers' : { 2261 'compiler.parser' : { 2262 'level' : 'DEBUG', 2263 'handlers' : ['hand1'], 2264 }, 2265 }, 2266 'root' : { 2267 'level' : 'WARNING', 2268 }, 2269 } 2270 2271 2272 # As config1 but with a misspelt level on a logger 2273 config2b = { 2274 'version': 1, 2275 'formatters': { 2276 'form1' : { 2277 'format' : '%(levelname)s ++ %(message)s', 2278 }, 2279 }, 2280 'handlers' : { 2281 'hand1' : { 2282 'class' : 'logging.StreamHandler', 2283 'formatter' : 'form1', 2284 'level' : 'NOTSET', 2285 'stream' : 'ext://sys.stdout', 2286 }, 2287 }, 2288 'loggers' : { 2289 'compiler.parser' : { 2290 'level' : 'DEBUG', 2291 'handlers' : ['hand1'], 2292 }, 2293 }, 2294 'root' : { 2295 'level' : 'WRANING', 2296 }, 2297 } 2298 2299 # config3 has a less subtle configuration error 2300 config3 = { 2301 'version': 1, 2302 'formatters': { 2303 'form1' : { 2304 'format' : '%(levelname)s ++ %(message)s', 2305 }, 2306 }, 2307 'handlers' : { 2308 'hand1' : { 2309 'class' : 'logging.StreamHandler', 2310 'formatter' : 'misspelled_name', 2311 'level' : 'NOTSET', 2312 'stream' : 'ext://sys.stdout', 2313 }, 2314 }, 2315 'loggers' : { 2316 'compiler.parser' : { 2317 'level' : 'DEBUG', 2318 'handlers' : ['hand1'], 2319 }, 2320 }, 2321 'root' : { 2322 'level' : 'WARNING', 2323 }, 2324 } 2325 2326 # config4 specifies a custom formatter class to be loaded 2327 config4 = { 2328 'version': 1, 2329 'formatters': { 2330 'form1' : { 2331 '()' : __name__ + '.ExceptionFormatter', 2332 'format' : '%(levelname)s:%(name)s:%(message)s', 2333 }, 2334 }, 2335 'handlers' : { 2336 'hand1' : { 2337 'class' : 'logging.StreamHandler', 2338 'formatter' : 'form1', 2339 'level' : 'NOTSET', 2340 'stream' : 'ext://sys.stdout', 2341 }, 2342 }, 2343 'root' : { 2344 'level' : 'NOTSET', 2345 'handlers' : ['hand1'], 2346 }, 2347 } 2348 2349 # As config4 but using an actual callable rather than a string 2350 config4a = { 2351 'version': 1, 2352 'formatters': { 2353 'form1' : { 2354 '()' : ExceptionFormatter, 2355 'format' : '%(levelname)s:%(name)s:%(message)s', 2356 }, 2357 'form2' : { 2358 '()' : __name__ + '.formatFunc', 2359 'format' : '%(levelname)s:%(name)s:%(message)s', 2360 }, 2361 'form3' : { 2362 '()' : formatFunc, 2363 'format' : '%(levelname)s:%(name)s:%(message)s', 2364 }, 2365 }, 2366 'handlers' : { 2367 'hand1' : { 2368 'class' : 'logging.StreamHandler', 2369 'formatter' : 'form1', 2370 'level' : 'NOTSET', 2371 'stream' : 'ext://sys.stdout', 2372 }, 2373 'hand2' : { 2374 '()' : handlerFunc, 2375 }, 2376 }, 2377 'root' : { 2378 'level' : 'NOTSET', 2379 'handlers' : ['hand1'], 2380 }, 2381 } 2382 2383 # config5 specifies a custom handler class to be loaded 2384 config5 = { 2385 'version': 1, 2386 'formatters': { 2387 'form1' : { 2388 'format' : '%(levelname)s ++ %(message)s', 2389 }, 2390 }, 2391 'handlers' : { 2392 'hand1' : { 2393 'class' : __name__ + '.CustomHandler', 2394 'formatter' : 'form1', 2395 'level' : 'NOTSET', 2396 'stream' : 'ext://sys.stdout', 2397 }, 2398 }, 2399 'loggers' : { 2400 'compiler.parser' : { 2401 'level' : 'DEBUG', 2402 'handlers' : ['hand1'], 2403 }, 2404 }, 2405 'root' : { 2406 'level' : 'WARNING', 2407 }, 2408 } 2409 2410 # config6 specifies a custom handler class to be loaded 2411 # but has bad arguments 2412 config6 = { 2413 'version': 1, 2414 'formatters': { 2415 'form1' : { 2416 'format' : '%(levelname)s ++ %(message)s', 2417 }, 2418 }, 2419 'handlers' : { 2420 'hand1' : { 2421 'class' : __name__ + '.CustomHandler', 2422 'formatter' : 'form1', 2423 'level' : 'NOTSET', 2424 'stream' : 'ext://sys.stdout', 2425 '9' : 'invalid parameter name', 2426 }, 2427 }, 2428 'loggers' : { 2429 'compiler.parser' : { 2430 'level' : 'DEBUG', 2431 'handlers' : ['hand1'], 2432 }, 2433 }, 2434 'root' : { 2435 'level' : 'WARNING', 2436 }, 2437 } 2438 2439 # config 7 does not define compiler.parser but defines compiler.lexer 2440 # so compiler.parser should be disabled after applying it 2441 config7 = { 2442 'version': 1, 2443 'formatters': { 2444 'form1' : { 2445 'format' : '%(levelname)s ++ %(message)s', 2446 }, 2447 }, 2448 'handlers' : { 2449 'hand1' : { 2450 'class' : 'logging.StreamHandler', 2451 'formatter' : 'form1', 2452 'level' : 'NOTSET', 2453 'stream' : 'ext://sys.stdout', 2454 }, 2455 }, 2456 'loggers' : { 2457 'compiler.lexer' : { 2458 'level' : 'DEBUG', 2459 'handlers' : ['hand1'], 2460 }, 2461 }, 2462 'root' : { 2463 'level' : 'WARNING', 2464 }, 2465 } 2466 2467 # config8 defines both compiler and compiler.lexer 2468 # so compiler.parser should not be disabled (since 2469 # compiler is defined) 2470 config8 = { 2471 'version': 1, 2472 'disable_existing_loggers' : False, 2473 'formatters': { 2474 'form1' : { 2475 'format' : '%(levelname)s ++ %(message)s', 2476 }, 2477 }, 2478 'handlers' : { 2479 'hand1' : { 2480 'class' : 'logging.StreamHandler', 2481 'formatter' : 'form1', 2482 'level' : 'NOTSET', 2483 'stream' : 'ext://sys.stdout', 2484 }, 2485 }, 2486 'loggers' : { 2487 'compiler' : { 2488 'level' : 'DEBUG', 2489 'handlers' : ['hand1'], 2490 }, 2491 'compiler.lexer' : { 2492 }, 2493 }, 2494 'root' : { 2495 'level' : 'WARNING', 2496 }, 2497 } 2498 2499 # config8a disables existing loggers 2500 config8a = { 2501 'version': 1, 2502 'disable_existing_loggers' : True, 2503 'formatters': { 2504 'form1' : { 2505 'format' : '%(levelname)s ++ %(message)s', 2506 }, 2507 }, 2508 'handlers' : { 2509 'hand1' : { 2510 'class' : 'logging.StreamHandler', 2511 'formatter' : 'form1', 2512 'level' : 'NOTSET', 2513 'stream' : 'ext://sys.stdout', 2514 }, 2515 }, 2516 'loggers' : { 2517 'compiler' : { 2518 'level' : 'DEBUG', 2519 'handlers' : ['hand1'], 2520 }, 2521 'compiler.lexer' : { 2522 }, 2523 }, 2524 'root' : { 2525 'level' : 'WARNING', 2526 }, 2527 } 2528 2529 config9 = { 2530 'version': 1, 2531 'formatters': { 2532 'form1' : { 2533 'format' : '%(levelname)s ++ %(message)s', 2534 }, 2535 }, 2536 'handlers' : { 2537 'hand1' : { 2538 'class' : 'logging.StreamHandler', 2539 'formatter' : 'form1', 2540 'level' : 'WARNING', 2541 'stream' : 'ext://sys.stdout', 2542 }, 2543 }, 2544 'loggers' : { 2545 'compiler.parser' : { 2546 'level' : 'WARNING', 2547 'handlers' : ['hand1'], 2548 }, 2549 }, 2550 'root' : { 2551 'level' : 'NOTSET', 2552 }, 2553 } 2554 2555 config9a = { 2556 'version': 1, 2557 'incremental' : True, 2558 'handlers' : { 2559 'hand1' : { 2560 'level' : 'WARNING', 2561 }, 2562 }, 2563 'loggers' : { 2564 'compiler.parser' : { 2565 'level' : 'INFO', 2566 }, 2567 }, 2568 } 2569 2570 config9b = { 2571 'version': 1, 2572 'incremental' : True, 2573 'handlers' : { 2574 'hand1' : { 2575 'level' : 'INFO', 2576 }, 2577 }, 2578 'loggers' : { 2579 'compiler.parser' : { 2580 'level' : 'INFO', 2581 }, 2582 }, 2583 } 2584 2585 # As config1 but with a filter added 2586 config10 = { 2587 'version': 1, 2588 'formatters': { 2589 'form1' : { 2590 'format' : '%(levelname)s ++ %(message)s', 2591 }, 2592 }, 2593 'filters' : { 2594 'filt1' : { 2595 'name' : 'compiler.parser', 2596 }, 2597 }, 2598 'handlers' : { 2599 'hand1' : { 2600 'class' : 'logging.StreamHandler', 2601 'formatter' : 'form1', 2602 'level' : 'NOTSET', 2603 'stream' : 'ext://sys.stdout', 2604 'filters' : ['filt1'], 2605 }, 2606 }, 2607 'loggers' : { 2608 'compiler.parser' : { 2609 'level' : 'DEBUG', 2610 'filters' : ['filt1'], 2611 }, 2612 }, 2613 'root' : { 2614 'level' : 'WARNING', 2615 'handlers' : ['hand1'], 2616 }, 2617 } 2618 2619 # As config1 but using cfg:// references 2620 config11 = { 2621 'version': 1, 2622 'true_formatters': { 2623 'form1' : { 2624 'format' : '%(levelname)s ++ %(message)s', 2625 }, 2626 }, 2627 'handler_configs': { 2628 'hand1' : { 2629 'class' : 'logging.StreamHandler', 2630 'formatter' : 'form1', 2631 'level' : 'NOTSET', 2632 'stream' : 'ext://sys.stdout', 2633 }, 2634 }, 2635 'formatters' : 'cfg://true_formatters', 2636 'handlers' : { 2637 'hand1' : 'cfg://handler_configs[hand1]', 2638 }, 2639 'loggers' : { 2640 'compiler.parser' : { 2641 'level' : 'DEBUG', 2642 'handlers' : ['hand1'], 2643 }, 2644 }, 2645 'root' : { 2646 'level' : 'WARNING', 2647 }, 2648 } 2649 2650 # As config11 but missing the version key 2651 config12 = { 2652 'true_formatters': { 2653 'form1' : { 2654 'format' : '%(levelname)s ++ %(message)s', 2655 }, 2656 }, 2657 'handler_configs': { 2658 'hand1' : { 2659 'class' : 'logging.StreamHandler', 2660 'formatter' : 'form1', 2661 'level' : 'NOTSET', 2662 'stream' : 'ext://sys.stdout', 2663 }, 2664 }, 2665 'formatters' : 'cfg://true_formatters', 2666 'handlers' : { 2667 'hand1' : 'cfg://handler_configs[hand1]', 2668 }, 2669 'loggers' : { 2670 'compiler.parser' : { 2671 'level' : 'DEBUG', 2672 'handlers' : ['hand1'], 2673 }, 2674 }, 2675 'root' : { 2676 'level' : 'WARNING', 2677 }, 2678 } 2679 2680 # As config11 but using an unsupported version 2681 config13 = { 2682 'version': 2, 2683 'true_formatters': { 2684 'form1' : { 2685 'format' : '%(levelname)s ++ %(message)s', 2686 }, 2687 }, 2688 'handler_configs': { 2689 'hand1' : { 2690 'class' : 'logging.StreamHandler', 2691 'formatter' : 'form1', 2692 'level' : 'NOTSET', 2693 'stream' : 'ext://sys.stdout', 2694 }, 2695 }, 2696 'formatters' : 'cfg://true_formatters', 2697 'handlers' : { 2698 'hand1' : 'cfg://handler_configs[hand1]', 2699 }, 2700 'loggers' : { 2701 'compiler.parser' : { 2702 'level' : 'DEBUG', 2703 'handlers' : ['hand1'], 2704 }, 2705 }, 2706 'root' : { 2707 'level' : 'WARNING', 2708 }, 2709 } 2710 2711 # As config0, but with properties 2712 config14 = { 2713 'version': 1, 2714 'formatters': { 2715 'form1' : { 2716 'format' : '%(levelname)s ++ %(message)s', 2717 }, 2718 }, 2719 'handlers' : { 2720 'hand1' : { 2721 'class' : 'logging.StreamHandler', 2722 'formatter' : 'form1', 2723 'level' : 'NOTSET', 2724 'stream' : 'ext://sys.stdout', 2725 '.': { 2726 'foo': 'bar', 2727 'terminator': '!\n', 2728 } 2729 }, 2730 }, 2731 'root' : { 2732 'level' : 'WARNING', 2733 'handlers' : ['hand1'], 2734 }, 2735 } 2736 2737 out_of_order = { 2738 "version": 1, 2739 "formatters": { 2740 "mySimpleFormatter": { 2741 "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s", 2742 "style": "$" 2743 } 2744 }, 2745 "handlers": { 2746 "fileGlobal": { 2747 "class": "logging.StreamHandler", 2748 "level": "DEBUG", 2749 "formatter": "mySimpleFormatter" 2750 }, 2751 "bufferGlobal": { 2752 "class": "logging.handlers.MemoryHandler", 2753 "capacity": 5, 2754 "formatter": "mySimpleFormatter", 2755 "target": "fileGlobal", 2756 "level": "DEBUG" 2757 } 2758 }, 2759 "loggers": { 2760 "mymodule": { 2761 "level": "DEBUG", 2762 "handlers": ["bufferGlobal"], 2763 "propagate": "true" 2764 } 2765 } 2766 } 2767 2768 def apply_config(self, conf): 2769 logging.config.dictConfig(conf) 2770 2771 def test_config0_ok(self): 2772 # A simple config which overrides the default settings. 2773 with support.captured_stdout() as output: 2774 self.apply_config(self.config0) 2775 logger = logging.getLogger() 2776 # Won't output anything 2777 logger.info(self.next_message()) 2778 # Outputs a message 2779 logger.error(self.next_message()) 2780 self.assert_log_lines([ 2781 ('ERROR', '2'), 2782 ], stream=output) 2783 # Original logger output is empty. 2784 self.assert_log_lines([]) 2785 2786 def test_config1_ok(self, config=config1): 2787 # A config defining a sub-parser as well. 2788 with support.captured_stdout() as output: 2789 self.apply_config(config) 2790 logger = logging.getLogger("compiler.parser") 2791 # Both will output a message 2792 logger.info(self.next_message()) 2793 logger.error(self.next_message()) 2794 self.assert_log_lines([ 2795 ('INFO', '1'), 2796 ('ERROR', '2'), 2797 ], stream=output) 2798 # Original logger output is empty. 2799 self.assert_log_lines([]) 2800 2801 def test_config2_failure(self): 2802 # A simple config which overrides the default settings. 2803 self.assertRaises(Exception, self.apply_config, self.config2) 2804 2805 def test_config2a_failure(self): 2806 # A simple config which overrides the default settings. 2807 self.assertRaises(Exception, self.apply_config, self.config2a) 2808 2809 def test_config2b_failure(self): 2810 # A simple config which overrides the default settings. 2811 self.assertRaises(Exception, self.apply_config, self.config2b) 2812 2813 def test_config3_failure(self): 2814 # A simple config which overrides the default settings. 2815 self.assertRaises(Exception, self.apply_config, self.config3) 2816 2817 def test_config4_ok(self): 2818 # A config specifying a custom formatter class. 2819 with support.captured_stdout() as output: 2820 self.apply_config(self.config4) 2821 #logger = logging.getLogger() 2822 try: 2823 raise RuntimeError() 2824 except RuntimeError: 2825 logging.exception("just testing") 2826 sys.stdout.seek(0) 2827 self.assertEqual(output.getvalue(), 2828 "ERROR:root:just testing\nGot a [RuntimeError]\n") 2829 # Original logger output is empty 2830 self.assert_log_lines([]) 2831 2832 def test_config4a_ok(self): 2833 # A config specifying a custom formatter class. 2834 with support.captured_stdout() as output: 2835 self.apply_config(self.config4a) 2836 #logger = logging.getLogger() 2837 try: 2838 raise RuntimeError() 2839 except RuntimeError: 2840 logging.exception("just testing") 2841 sys.stdout.seek(0) 2842 self.assertEqual(output.getvalue(), 2843 "ERROR:root:just testing\nGot a [RuntimeError]\n") 2844 # Original logger output is empty 2845 self.assert_log_lines([]) 2846 2847 def test_config5_ok(self): 2848 self.test_config1_ok(config=self.config5) 2849 2850 def test_config6_failure(self): 2851 self.assertRaises(Exception, self.apply_config, self.config6) 2852 2853 def test_config7_ok(self): 2854 with support.captured_stdout() as output: 2855 self.apply_config(self.config1) 2856 logger = logging.getLogger("compiler.parser") 2857 # Both will output a message 2858 logger.info(self.next_message()) 2859 logger.error(self.next_message()) 2860 self.assert_log_lines([ 2861 ('INFO', '1'), 2862 ('ERROR', '2'), 2863 ], stream=output) 2864 # Original logger output is empty. 2865 self.assert_log_lines([]) 2866 with support.captured_stdout() as output: 2867 self.apply_config(self.config7) 2868 logger = logging.getLogger("compiler.parser") 2869 self.assertTrue(logger.disabled) 2870 logger = logging.getLogger("compiler.lexer") 2871 # Both will output a message 2872 logger.info(self.next_message()) 2873 logger.error(self.next_message()) 2874 self.assert_log_lines([ 2875 ('INFO', '3'), 2876 ('ERROR', '4'), 2877 ], stream=output) 2878 # Original logger output is empty. 2879 self.assert_log_lines([]) 2880 2881 # Same as test_config_7_ok but don't disable old loggers. 2882 def test_config_8_ok(self): 2883 with support.captured_stdout() as output: 2884 self.apply_config(self.config1) 2885 logger = logging.getLogger("compiler.parser") 2886 # All will output a message 2887 logger.info(self.next_message()) 2888 logger.error(self.next_message()) 2889 self.assert_log_lines([ 2890 ('INFO', '1'), 2891 ('ERROR', '2'), 2892 ], stream=output) 2893 # Original logger output is empty. 2894 self.assert_log_lines([]) 2895 with support.captured_stdout() as output: 2896 self.apply_config(self.config8) 2897 logger = logging.getLogger("compiler.parser") 2898 self.assertFalse(logger.disabled) 2899 # Both will output a message 2900 logger.info(self.next_message()) 2901 logger.error(self.next_message()) 2902 logger = logging.getLogger("compiler.lexer") 2903 # Both will output a message 2904 logger.info(self.next_message()) 2905 logger.error(self.next_message()) 2906 self.assert_log_lines([ 2907 ('INFO', '3'), 2908 ('ERROR', '4'), 2909 ('INFO', '5'), 2910 ('ERROR', '6'), 2911 ], stream=output) 2912 # Original logger output is empty. 2913 self.assert_log_lines([]) 2914 2915 def test_config_8a_ok(self): 2916 with support.captured_stdout() as output: 2917 self.apply_config(self.config1a) 2918 logger = logging.getLogger("compiler.parser") 2919 # See issue #11424. compiler-hyphenated sorts 2920 # between compiler and compiler.xyz and this 2921 # was preventing compiler.xyz from being included 2922 # in the child loggers of compiler because of an 2923 # overzealous loop termination condition. 2924 hyphenated = logging.getLogger('compiler-hyphenated') 2925 # All will output a message 2926 logger.info(self.next_message()) 2927 logger.error(self.next_message()) 2928 hyphenated.critical(self.next_message()) 2929 self.assert_log_lines([ 2930 ('INFO', '1'), 2931 ('ERROR', '2'), 2932 ('CRITICAL', '3'), 2933 ], stream=output) 2934 # Original logger output is empty. 2935 self.assert_log_lines([]) 2936 with support.captured_stdout() as output: 2937 self.apply_config(self.config8a) 2938 logger = logging.getLogger("compiler.parser") 2939 self.assertFalse(logger.disabled) 2940 # Both will output a message 2941 logger.info(self.next_message()) 2942 logger.error(self.next_message()) 2943 logger = logging.getLogger("compiler.lexer") 2944 # Both will output a message 2945 logger.info(self.next_message()) 2946 logger.error(self.next_message()) 2947 # Will not appear 2948 hyphenated.critical(self.next_message()) 2949 self.assert_log_lines([ 2950 ('INFO', '4'), 2951 ('ERROR', '5'), 2952 ('INFO', '6'), 2953 ('ERROR', '7'), 2954 ], stream=output) 2955 # Original logger output is empty. 2956 self.assert_log_lines([]) 2957 2958 def test_config_9_ok(self): 2959 with support.captured_stdout() as output: 2960 self.apply_config(self.config9) 2961 logger = logging.getLogger("compiler.parser") 2962 # Nothing will be output since both handler and logger are set to WARNING 2963 logger.info(self.next_message()) 2964 self.assert_log_lines([], stream=output) 2965 self.apply_config(self.config9a) 2966 # Nothing will be output since handler is still set to WARNING 2967 logger.info(self.next_message()) 2968 self.assert_log_lines([], stream=output) 2969 self.apply_config(self.config9b) 2970 # Message should now be output 2971 logger.info(self.next_message()) 2972 self.assert_log_lines([ 2973 ('INFO', '3'), 2974 ], stream=output) 2975 2976 def test_config_10_ok(self): 2977 with support.captured_stdout() as output: 2978 self.apply_config(self.config10) 2979 logger = logging.getLogger("compiler.parser") 2980 logger.warning(self.next_message()) 2981 logger = logging.getLogger('compiler') 2982 # Not output, because filtered 2983 logger.warning(self.next_message()) 2984 logger = logging.getLogger('compiler.lexer') 2985 # Not output, because filtered 2986 logger.warning(self.next_message()) 2987 logger = logging.getLogger("compiler.parser.codegen") 2988 # Output, as not filtered 2989 logger.error(self.next_message()) 2990 self.assert_log_lines([ 2991 ('WARNING', '1'), 2992 ('ERROR', '4'), 2993 ], stream=output) 2994 2995 def test_config11_ok(self): 2996 self.test_config1_ok(self.config11) 2997 2998 def test_config12_failure(self): 2999 self.assertRaises(Exception, self.apply_config, self.config12) 3000 3001 def test_config13_failure(self): 3002 self.assertRaises(Exception, self.apply_config, self.config13) 3003 3004 def test_config14_ok(self): 3005 with support.captured_stdout() as output: 3006 self.apply_config(self.config14) 3007 h = logging._handlers['hand1'] 3008 self.assertEqual(h.foo, 'bar') 3009 self.assertEqual(h.terminator, '!\n') 3010 logging.warning('Exclamation') 3011 self.assertTrue(output.getvalue().endswith('Exclamation!\n')) 3012 3013 def test_config15_ok(self): 3014 3015 def cleanup(h1, fn): 3016 h1.close() 3017 os.remove(fn) 3018 3019 with self.check_no_resource_warning(): 3020 fd, fn = tempfile.mkstemp(".log", "test_logging-X-") 3021 os.close(fd) 3022 3023 config = { 3024 "version": 1, 3025 "handlers": { 3026 "file": { 3027 "class": "logging.FileHandler", 3028 "filename": fn 3029 } 3030 }, 3031 "root": { 3032 "handlers": ["file"] 3033 } 3034 } 3035 3036 self.apply_config(config) 3037 self.apply_config(config) 3038 3039 handler = logging.root.handlers[0] 3040 self.addCleanup(cleanup, handler, fn) 3041 3042 def setup_via_listener(self, text, verify=None): 3043 text = text.encode("utf-8") 3044 # Ask for a randomly assigned port (by using port 0) 3045 t = logging.config.listen(0, verify) 3046 t.start() 3047 t.ready.wait() 3048 # Now get the port allocated 3049 port = t.port 3050 t.ready.clear() 3051 try: 3052 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 3053 sock.settimeout(2.0) 3054 sock.connect(('localhost', port)) 3055 3056 slen = struct.pack('>L', len(text)) 3057 s = slen + text 3058 sentsofar = 0 3059 left = len(s) 3060 while left > 0: 3061 sent = sock.send(s[sentsofar:]) 3062 sentsofar += sent 3063 left -= sent 3064 sock.close() 3065 finally: 3066 t.ready.wait(2.0) 3067 logging.config.stopListening() 3068 support.join_thread(t, 2.0) 3069 3070 def test_listen_config_10_ok(self): 3071 with support.captured_stdout() as output: 3072 self.setup_via_listener(json.dumps(self.config10)) 3073 logger = logging.getLogger("compiler.parser") 3074 logger.warning(self.next_message()) 3075 logger = logging.getLogger('compiler') 3076 # Not output, because filtered 3077 logger.warning(self.next_message()) 3078 logger = logging.getLogger('compiler.lexer') 3079 # Not output, because filtered 3080 logger.warning(self.next_message()) 3081 logger = logging.getLogger("compiler.parser.codegen") 3082 # Output, as not filtered 3083 logger.error(self.next_message()) 3084 self.assert_log_lines([ 3085 ('WARNING', '1'), 3086 ('ERROR', '4'), 3087 ], stream=output) 3088 3089 def test_listen_config_1_ok(self): 3090 with support.captured_stdout() as output: 3091 self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) 3092 logger = logging.getLogger("compiler.parser") 3093 # Both will output a message 3094 logger.info(self.next_message()) 3095 logger.error(self.next_message()) 3096 self.assert_log_lines([ 3097 ('INFO', '1'), 3098 ('ERROR', '2'), 3099 ], stream=output) 3100 # Original logger output is empty. 3101 self.assert_log_lines([]) 3102 3103 def test_listen_verify(self): 3104 3105 def verify_fail(stuff): 3106 return None 3107 3108 def verify_reverse(stuff): 3109 return stuff[::-1] 3110 3111 logger = logging.getLogger("compiler.parser") 3112 to_send = textwrap.dedent(ConfigFileTest.config1) 3113 # First, specify a verification function that will fail. 3114 # We expect to see no output, since our configuration 3115 # never took effect. 3116 with support.captured_stdout() as output: 3117 self.setup_via_listener(to_send, verify_fail) 3118 # Both will output a message 3119 logger.info(self.next_message()) 3120 logger.error(self.next_message()) 3121 self.assert_log_lines([], stream=output) 3122 # Original logger output has the stuff we logged. 3123 self.assert_log_lines([ 3124 ('INFO', '1'), 3125 ('ERROR', '2'), 3126 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3127 3128 # Now, perform no verification. Our configuration 3129 # should take effect. 3130 3131 with support.captured_stdout() as output: 3132 self.setup_via_listener(to_send) # no verify callable specified 3133 logger = logging.getLogger("compiler.parser") 3134 # Both will output a message 3135 logger.info(self.next_message()) 3136 logger.error(self.next_message()) 3137 self.assert_log_lines([ 3138 ('INFO', '3'), 3139 ('ERROR', '4'), 3140 ], stream=output) 3141 # Original logger output still has the stuff we logged before. 3142 self.assert_log_lines([ 3143 ('INFO', '1'), 3144 ('ERROR', '2'), 3145 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3146 3147 # Now, perform verification which transforms the bytes. 3148 3149 with support.captured_stdout() as output: 3150 self.setup_via_listener(to_send[::-1], verify_reverse) 3151 logger = logging.getLogger("compiler.parser") 3152 # Both will output a message 3153 logger.info(self.next_message()) 3154 logger.error(self.next_message()) 3155 self.assert_log_lines([ 3156 ('INFO', '5'), 3157 ('ERROR', '6'), 3158 ], stream=output) 3159 # Original logger output still has the stuff we logged before. 3160 self.assert_log_lines([ 3161 ('INFO', '1'), 3162 ('ERROR', '2'), 3163 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 3164 3165 def test_out_of_order(self): 3166 self.apply_config(self.out_of_order) 3167 handler = logging.getLogger('mymodule').handlers[0] 3168 self.assertIsInstance(handler.target, logging.Handler) 3169 self.assertIsInstance(handler.formatter._style, 3170 logging.StringTemplateStyle) 3171 3172 def test_baseconfig(self): 3173 d = { 3174 'atuple': (1, 2, 3), 3175 'alist': ['a', 'b', 'c'], 3176 'adict': {'d': 'e', 'f': 3 }, 3177 'nest1': ('g', ('h', 'i'), 'j'), 3178 'nest2': ['k', ['l', 'm'], 'n'], 3179 'nest3': ['o', 'cfg://alist', 'p'], 3180 } 3181 bc = logging.config.BaseConfigurator(d) 3182 self.assertEqual(bc.convert('cfg://atuple[1]'), 2) 3183 self.assertEqual(bc.convert('cfg://alist[1]'), 'b') 3184 self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h') 3185 self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm') 3186 self.assertEqual(bc.convert('cfg://adict.d'), 'e') 3187 self.assertEqual(bc.convert('cfg://adict[f]'), 3) 3188 v = bc.convert('cfg://nest3') 3189 self.assertEqual(v.pop(1), ['a', 'b', 'c']) 3190 self.assertRaises(KeyError, bc.convert, 'cfg://nosuch') 3191 self.assertRaises(ValueError, bc.convert, 'cfg://!') 3192 self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]') 3193 3194class ManagerTest(BaseTest): 3195 def test_manager_loggerclass(self): 3196 logged = [] 3197 3198 class MyLogger(logging.Logger): 3199 def _log(self, level, msg, args, exc_info=None, extra=None): 3200 logged.append(msg) 3201 3202 man = logging.Manager(None) 3203 self.assertRaises(TypeError, man.setLoggerClass, int) 3204 man.setLoggerClass(MyLogger) 3205 logger = man.getLogger('test') 3206 logger.warning('should appear in logged') 3207 logging.warning('should not appear in logged') 3208 3209 self.assertEqual(logged, ['should appear in logged']) 3210 3211 def test_set_log_record_factory(self): 3212 man = logging.Manager(None) 3213 expected = object() 3214 man.setLogRecordFactory(expected) 3215 self.assertEqual(man.logRecordFactory, expected) 3216 3217class ChildLoggerTest(BaseTest): 3218 def test_child_loggers(self): 3219 r = logging.getLogger() 3220 l1 = logging.getLogger('abc') 3221 l2 = logging.getLogger('def.ghi') 3222 c1 = r.getChild('xyz') 3223 c2 = r.getChild('uvw.xyz') 3224 self.assertIs(c1, logging.getLogger('xyz')) 3225 self.assertIs(c2, logging.getLogger('uvw.xyz')) 3226 c1 = l1.getChild('def') 3227 c2 = c1.getChild('ghi') 3228 c3 = l1.getChild('def.ghi') 3229 self.assertIs(c1, logging.getLogger('abc.def')) 3230 self.assertIs(c2, logging.getLogger('abc.def.ghi')) 3231 self.assertIs(c2, c3) 3232 3233 3234class DerivedLogRecord(logging.LogRecord): 3235 pass 3236 3237class LogRecordFactoryTest(BaseTest): 3238 3239 def setUp(self): 3240 class CheckingFilter(logging.Filter): 3241 def __init__(self, cls): 3242 self.cls = cls 3243 3244 def filter(self, record): 3245 t = type(record) 3246 if t is not self.cls: 3247 msg = 'Unexpected LogRecord type %s, expected %s' % (t, 3248 self.cls) 3249 raise TypeError(msg) 3250 return True 3251 3252 BaseTest.setUp(self) 3253 self.filter = CheckingFilter(DerivedLogRecord) 3254 self.root_logger.addFilter(self.filter) 3255 self.orig_factory = logging.getLogRecordFactory() 3256 3257 def tearDown(self): 3258 self.root_logger.removeFilter(self.filter) 3259 BaseTest.tearDown(self) 3260 logging.setLogRecordFactory(self.orig_factory) 3261 3262 def test_logrecord_class(self): 3263 self.assertRaises(TypeError, self.root_logger.warning, 3264 self.next_message()) 3265 logging.setLogRecordFactory(DerivedLogRecord) 3266 self.root_logger.error(self.next_message()) 3267 self.assert_log_lines([ 3268 ('root', 'ERROR', '2'), 3269 ]) 3270 3271 3272class QueueHandlerTest(BaseTest): 3273 # Do not bother with a logger name group. 3274 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 3275 3276 def setUp(self): 3277 BaseTest.setUp(self) 3278 self.queue = queue.Queue(-1) 3279 self.que_hdlr = logging.handlers.QueueHandler(self.queue) 3280 self.name = 'que' 3281 self.que_logger = logging.getLogger('que') 3282 self.que_logger.propagate = False 3283 self.que_logger.setLevel(logging.WARNING) 3284 self.que_logger.addHandler(self.que_hdlr) 3285 3286 def tearDown(self): 3287 self.que_hdlr.close() 3288 BaseTest.tearDown(self) 3289 3290 def test_queue_handler(self): 3291 self.que_logger.debug(self.next_message()) 3292 self.assertRaises(queue.Empty, self.queue.get_nowait) 3293 self.que_logger.info(self.next_message()) 3294 self.assertRaises(queue.Empty, self.queue.get_nowait) 3295 msg = self.next_message() 3296 self.que_logger.warning(msg) 3297 data = self.queue.get_nowait() 3298 self.assertTrue(isinstance(data, logging.LogRecord)) 3299 self.assertEqual(data.name, self.que_logger.name) 3300 self.assertEqual((data.msg, data.args), (msg, None)) 3301 3302 def test_formatting(self): 3303 msg = self.next_message() 3304 levelname = logging.getLevelName(logging.WARNING) 3305 log_format_str = '{name} -> {levelname}: {message}' 3306 formatted_msg = log_format_str.format(name=self.name, 3307 levelname=levelname, message=msg) 3308 formatter = logging.Formatter(self.log_format) 3309 self.que_hdlr.setFormatter(formatter) 3310 self.que_logger.warning(msg) 3311 log_record = self.queue.get_nowait() 3312 self.assertEqual(formatted_msg, log_record.msg) 3313 self.assertEqual(formatted_msg, log_record.message) 3314 3315 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3316 'logging.handlers.QueueListener required for this test') 3317 def test_queue_listener(self): 3318 handler = support.TestHandler(support.Matcher()) 3319 listener = logging.handlers.QueueListener(self.queue, handler) 3320 listener.start() 3321 try: 3322 self.que_logger.warning(self.next_message()) 3323 self.que_logger.error(self.next_message()) 3324 self.que_logger.critical(self.next_message()) 3325 finally: 3326 listener.stop() 3327 self.assertTrue(handler.matches(levelno=logging.WARNING, message='1')) 3328 self.assertTrue(handler.matches(levelno=logging.ERROR, message='2')) 3329 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3')) 3330 handler.close() 3331 3332 # Now test with respect_handler_level set 3333 3334 handler = support.TestHandler(support.Matcher()) 3335 handler.setLevel(logging.CRITICAL) 3336 listener = logging.handlers.QueueListener(self.queue, handler, 3337 respect_handler_level=True) 3338 listener.start() 3339 try: 3340 self.que_logger.warning(self.next_message()) 3341 self.que_logger.error(self.next_message()) 3342 self.que_logger.critical(self.next_message()) 3343 finally: 3344 listener.stop() 3345 self.assertFalse(handler.matches(levelno=logging.WARNING, message='4')) 3346 self.assertFalse(handler.matches(levelno=logging.ERROR, message='5')) 3347 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6')) 3348 handler.close() 3349 3350 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3351 'logging.handlers.QueueListener required for this test') 3352 def test_queue_listener_with_StreamHandler(self): 3353 # Test that traceback only appends once (bpo-34334). 3354 listener = logging.handlers.QueueListener(self.queue, self.root_hdlr) 3355 listener.start() 3356 try: 3357 1 / 0 3358 except ZeroDivisionError as e: 3359 exc = e 3360 self.que_logger.exception(self.next_message(), exc_info=exc) 3361 listener.stop() 3362 self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1) 3363 3364if hasattr(logging.handlers, 'QueueListener'): 3365 import multiprocessing 3366 from unittest.mock import patch 3367 3368 class QueueListenerTest(BaseTest): 3369 """ 3370 Tests based on patch submitted for issue #27930. Ensure that 3371 QueueListener handles all log messages. 3372 """ 3373 3374 repeat = 20 3375 3376 @staticmethod 3377 def setup_and_log(log_queue, ident): 3378 """ 3379 Creates a logger with a QueueHandler that logs to a queue read by a 3380 QueueListener. Starts the listener, logs five messages, and stops 3381 the listener. 3382 """ 3383 logger = logging.getLogger('test_logger_with_id_%s' % ident) 3384 logger.setLevel(logging.DEBUG) 3385 handler = logging.handlers.QueueHandler(log_queue) 3386 logger.addHandler(handler) 3387 listener = logging.handlers.QueueListener(log_queue) 3388 listener.start() 3389 3390 logger.info('one') 3391 logger.info('two') 3392 logger.info('three') 3393 logger.info('four') 3394 logger.info('five') 3395 3396 listener.stop() 3397 logger.removeHandler(handler) 3398 handler.close() 3399 3400 @patch.object(logging.handlers.QueueListener, 'handle') 3401 def test_handle_called_with_queue_queue(self, mock_handle): 3402 for i in range(self.repeat): 3403 log_queue = queue.Queue() 3404 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3405 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3406 'correct number of handled log messages') 3407 3408 @patch.object(logging.handlers.QueueListener, 'handle') 3409 def test_handle_called_with_mp_queue(self, mock_handle): 3410 # Issue 28668: The multiprocessing (mp) module is not functional 3411 # when the mp.synchronize module cannot be imported. 3412 support.import_module('multiprocessing.synchronize') 3413 for i in range(self.repeat): 3414 log_queue = multiprocessing.Queue() 3415 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3416 log_queue.close() 3417 log_queue.join_thread() 3418 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3419 'correct number of handled log messages') 3420 3421 @staticmethod 3422 def get_all_from_queue(log_queue): 3423 try: 3424 while True: 3425 yield log_queue.get_nowait() 3426 except queue.Empty: 3427 return [] 3428 3429 def test_no_messages_in_queue_after_stop(self): 3430 """ 3431 Five messages are logged then the QueueListener is stopped. This 3432 test then gets everything off the queue. Failure of this test 3433 indicates that messages were not registered on the queue until 3434 _after_ the QueueListener stopped. 3435 """ 3436 # Issue 28668: The multiprocessing (mp) module is not functional 3437 # when the mp.synchronize module cannot be imported. 3438 support.import_module('multiprocessing.synchronize') 3439 for i in range(self.repeat): 3440 queue = multiprocessing.Queue() 3441 self.setup_and_log(queue, '%s_%s' %(self.id(), i)) 3442 # time.sleep(1) 3443 items = list(self.get_all_from_queue(queue)) 3444 queue.close() 3445 queue.join_thread() 3446 3447 expected = [[], [logging.handlers.QueueListener._sentinel]] 3448 self.assertIn(items, expected, 3449 'Found unexpected messages in queue: %s' % ( 3450 [m.msg if isinstance(m, logging.LogRecord) 3451 else m for m in items])) 3452 3453 3454ZERO = datetime.timedelta(0) 3455 3456class UTC(datetime.tzinfo): 3457 def utcoffset(self, dt): 3458 return ZERO 3459 3460 dst = utcoffset 3461 3462 def tzname(self, dt): 3463 return 'UTC' 3464 3465utc = UTC() 3466 3467class FormatterTest(unittest.TestCase): 3468 def setUp(self): 3469 self.common = { 3470 'name': 'formatter.test', 3471 'level': logging.DEBUG, 3472 'pathname': os.path.join('path', 'to', 'dummy.ext'), 3473 'lineno': 42, 3474 'exc_info': None, 3475 'func': None, 3476 'msg': 'Message with %d %s', 3477 'args': (2, 'placeholders'), 3478 } 3479 self.variants = { 3480 } 3481 3482 def get_record(self, name=None): 3483 result = dict(self.common) 3484 if name is not None: 3485 result.update(self.variants[name]) 3486 return logging.makeLogRecord(result) 3487 3488 def test_percent(self): 3489 # Test %-formatting 3490 r = self.get_record() 3491 f = logging.Formatter('${%(message)s}') 3492 self.assertEqual(f.format(r), '${Message with 2 placeholders}') 3493 f = logging.Formatter('%(random)s') 3494 self.assertRaises(KeyError, f.format, r) 3495 self.assertFalse(f.usesTime()) 3496 f = logging.Formatter('%(asctime)s') 3497 self.assertTrue(f.usesTime()) 3498 f = logging.Formatter('%(asctime)-15s') 3499 self.assertTrue(f.usesTime()) 3500 f = logging.Formatter('asctime') 3501 self.assertFalse(f.usesTime()) 3502 3503 def test_braces(self): 3504 # Test {}-formatting 3505 r = self.get_record() 3506 f = logging.Formatter('$%{message}%$', style='{') 3507 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3508 f = logging.Formatter('{random}', style='{') 3509 self.assertRaises(KeyError, f.format, r) 3510 self.assertFalse(f.usesTime()) 3511 f = logging.Formatter('{asctime}', style='{') 3512 self.assertTrue(f.usesTime()) 3513 f = logging.Formatter('{asctime!s:15}', style='{') 3514 self.assertTrue(f.usesTime()) 3515 f = logging.Formatter('{asctime:15}', style='{') 3516 self.assertTrue(f.usesTime()) 3517 f = logging.Formatter('asctime', style='{') 3518 self.assertFalse(f.usesTime()) 3519 3520 def test_dollars(self): 3521 # Test $-formatting 3522 r = self.get_record() 3523 f = logging.Formatter('$message', style='$') 3524 self.assertEqual(f.format(r), 'Message with 2 placeholders') 3525 f = logging.Formatter('$$%${message}%$$', style='$') 3526 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3527 f = logging.Formatter('${random}', style='$') 3528 self.assertRaises(KeyError, f.format, r) 3529 self.assertFalse(f.usesTime()) 3530 f = logging.Formatter('${asctime}', style='$') 3531 self.assertTrue(f.usesTime()) 3532 f = logging.Formatter('${asctime', style='$') 3533 self.assertFalse(f.usesTime()) 3534 f = logging.Formatter('$asctime', style='$') 3535 self.assertTrue(f.usesTime()) 3536 f = logging.Formatter('asctime', style='$') 3537 self.assertFalse(f.usesTime()) 3538 3539 def test_invalid_style(self): 3540 self.assertRaises(ValueError, logging.Formatter, None, None, 'x') 3541 3542 def test_time(self): 3543 r = self.get_record() 3544 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc) 3545 # We use None to indicate we want the local timezone 3546 # We're essentially converting a UTC time to local time 3547 r.created = time.mktime(dt.astimezone(None).timetuple()) 3548 r.msecs = 123 3549 f = logging.Formatter('%(asctime)s %(message)s') 3550 f.converter = time.gmtime 3551 self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123') 3552 self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21') 3553 f.format(r) 3554 self.assertEqual(r.asctime, '1993-04-21 08:03:00,123') 3555 3556class TestBufferingFormatter(logging.BufferingFormatter): 3557 def formatHeader(self, records): 3558 return '[(%d)' % len(records) 3559 3560 def formatFooter(self, records): 3561 return '(%d)]' % len(records) 3562 3563class BufferingFormatterTest(unittest.TestCase): 3564 def setUp(self): 3565 self.records = [ 3566 logging.makeLogRecord({'msg': 'one'}), 3567 logging.makeLogRecord({'msg': 'two'}), 3568 ] 3569 3570 def test_default(self): 3571 f = logging.BufferingFormatter() 3572 self.assertEqual('', f.format([])) 3573 self.assertEqual('onetwo', f.format(self.records)) 3574 3575 def test_custom(self): 3576 f = TestBufferingFormatter() 3577 self.assertEqual('[(2)onetwo(2)]', f.format(self.records)) 3578 lf = logging.Formatter('<%(message)s>') 3579 f = TestBufferingFormatter(lf) 3580 self.assertEqual('[(2)<one><two>(2)]', f.format(self.records)) 3581 3582class ExceptionTest(BaseTest): 3583 def test_formatting(self): 3584 r = self.root_logger 3585 h = RecordingHandler() 3586 r.addHandler(h) 3587 try: 3588 raise RuntimeError('deliberate mistake') 3589 except: 3590 logging.exception('failed', stack_info=True) 3591 r.removeHandler(h) 3592 h.close() 3593 r = h.records[0] 3594 self.assertTrue(r.exc_text.startswith('Traceback (most recent ' 3595 'call last):\n')) 3596 self.assertTrue(r.exc_text.endswith('\nRuntimeError: ' 3597 'deliberate mistake')) 3598 self.assertTrue(r.stack_info.startswith('Stack (most recent ' 3599 'call last):\n')) 3600 self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', ' 3601 'stack_info=True)')) 3602 3603 3604class LastResortTest(BaseTest): 3605 def test_last_resort(self): 3606 # Test the last resort handler 3607 root = self.root_logger 3608 root.removeHandler(self.root_hdlr) 3609 old_lastresort = logging.lastResort 3610 old_raise_exceptions = logging.raiseExceptions 3611 3612 try: 3613 with support.captured_stderr() as stderr: 3614 root.debug('This should not appear') 3615 self.assertEqual(stderr.getvalue(), '') 3616 root.warning('Final chance!') 3617 self.assertEqual(stderr.getvalue(), 'Final chance!\n') 3618 3619 # No handlers and no last resort, so 'No handlers' message 3620 logging.lastResort = None 3621 with support.captured_stderr() as stderr: 3622 root.warning('Final chance!') 3623 msg = 'No handlers could be found for logger "root"\n' 3624 self.assertEqual(stderr.getvalue(), msg) 3625 3626 # 'No handlers' message only printed once 3627 with support.captured_stderr() as stderr: 3628 root.warning('Final chance!') 3629 self.assertEqual(stderr.getvalue(), '') 3630 3631 # If raiseExceptions is False, no message is printed 3632 root.manager.emittedNoHandlerWarning = False 3633 logging.raiseExceptions = False 3634 with support.captured_stderr() as stderr: 3635 root.warning('Final chance!') 3636 self.assertEqual(stderr.getvalue(), '') 3637 finally: 3638 root.addHandler(self.root_hdlr) 3639 logging.lastResort = old_lastresort 3640 logging.raiseExceptions = old_raise_exceptions 3641 3642 3643class FakeHandler: 3644 3645 def __init__(self, identifier, called): 3646 for method in ('acquire', 'flush', 'close', 'release'): 3647 setattr(self, method, self.record_call(identifier, method, called)) 3648 3649 def record_call(self, identifier, method_name, called): 3650 def inner(): 3651 called.append('{} - {}'.format(identifier, method_name)) 3652 return inner 3653 3654 3655class RecordingHandler(logging.NullHandler): 3656 3657 def __init__(self, *args, **kwargs): 3658 super(RecordingHandler, self).__init__(*args, **kwargs) 3659 self.records = [] 3660 3661 def handle(self, record): 3662 """Keep track of all the emitted records.""" 3663 self.records.append(record) 3664 3665 3666class ShutdownTest(BaseTest): 3667 3668 """Test suite for the shutdown method.""" 3669 3670 def setUp(self): 3671 super(ShutdownTest, self).setUp() 3672 self.called = [] 3673 3674 raise_exceptions = logging.raiseExceptions 3675 self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions) 3676 3677 def raise_error(self, error): 3678 def inner(): 3679 raise error() 3680 return inner 3681 3682 def test_no_failure(self): 3683 # create some fake handlers 3684 handler0 = FakeHandler(0, self.called) 3685 handler1 = FakeHandler(1, self.called) 3686 handler2 = FakeHandler(2, self.called) 3687 3688 # create live weakref to those handlers 3689 handlers = map(logging.weakref.ref, [handler0, handler1, handler2]) 3690 3691 logging.shutdown(handlerList=list(handlers)) 3692 3693 expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release', 3694 '1 - acquire', '1 - flush', '1 - close', '1 - release', 3695 '0 - acquire', '0 - flush', '0 - close', '0 - release'] 3696 self.assertEqual(expected, self.called) 3697 3698 def _test_with_failure_in_method(self, method, error): 3699 handler = FakeHandler(0, self.called) 3700 setattr(handler, method, self.raise_error(error)) 3701 handlers = [logging.weakref.ref(handler)] 3702 3703 logging.shutdown(handlerList=list(handlers)) 3704 3705 self.assertEqual('0 - release', self.called[-1]) 3706 3707 def test_with_ioerror_in_acquire(self): 3708 self._test_with_failure_in_method('acquire', OSError) 3709 3710 def test_with_ioerror_in_flush(self): 3711 self._test_with_failure_in_method('flush', OSError) 3712 3713 def test_with_ioerror_in_close(self): 3714 self._test_with_failure_in_method('close', OSError) 3715 3716 def test_with_valueerror_in_acquire(self): 3717 self._test_with_failure_in_method('acquire', ValueError) 3718 3719 def test_with_valueerror_in_flush(self): 3720 self._test_with_failure_in_method('flush', ValueError) 3721 3722 def test_with_valueerror_in_close(self): 3723 self._test_with_failure_in_method('close', ValueError) 3724 3725 def test_with_other_error_in_acquire_without_raise(self): 3726 logging.raiseExceptions = False 3727 self._test_with_failure_in_method('acquire', IndexError) 3728 3729 def test_with_other_error_in_flush_without_raise(self): 3730 logging.raiseExceptions = False 3731 self._test_with_failure_in_method('flush', IndexError) 3732 3733 def test_with_other_error_in_close_without_raise(self): 3734 logging.raiseExceptions = False 3735 self._test_with_failure_in_method('close', IndexError) 3736 3737 def test_with_other_error_in_acquire_with_raise(self): 3738 logging.raiseExceptions = True 3739 self.assertRaises(IndexError, self._test_with_failure_in_method, 3740 'acquire', IndexError) 3741 3742 def test_with_other_error_in_flush_with_raise(self): 3743 logging.raiseExceptions = True 3744 self.assertRaises(IndexError, self._test_with_failure_in_method, 3745 'flush', IndexError) 3746 3747 def test_with_other_error_in_close_with_raise(self): 3748 logging.raiseExceptions = True 3749 self.assertRaises(IndexError, self._test_with_failure_in_method, 3750 'close', IndexError) 3751 3752 3753class ModuleLevelMiscTest(BaseTest): 3754 3755 """Test suite for some module level methods.""" 3756 3757 def test_disable(self): 3758 old_disable = logging.root.manager.disable 3759 # confirm our assumptions are correct 3760 self.assertEqual(old_disable, 0) 3761 self.addCleanup(logging.disable, old_disable) 3762 3763 logging.disable(83) 3764 self.assertEqual(logging.root.manager.disable, 83) 3765 3766 # test the default value introduced in 3.7 3767 # (Issue #28524) 3768 logging.disable() 3769 self.assertEqual(logging.root.manager.disable, logging.CRITICAL) 3770 3771 def _test_log(self, method, level=None): 3772 called = [] 3773 support.patch(self, logging, 'basicConfig', 3774 lambda *a, **kw: called.append((a, kw))) 3775 3776 recording = RecordingHandler() 3777 logging.root.addHandler(recording) 3778 3779 log_method = getattr(logging, method) 3780 if level is not None: 3781 log_method(level, "test me: %r", recording) 3782 else: 3783 log_method("test me: %r", recording) 3784 3785 self.assertEqual(len(recording.records), 1) 3786 record = recording.records[0] 3787 self.assertEqual(record.getMessage(), "test me: %r" % recording) 3788 3789 expected_level = level if level is not None else getattr(logging, method.upper()) 3790 self.assertEqual(record.levelno, expected_level) 3791 3792 # basicConfig was not called! 3793 self.assertEqual(called, []) 3794 3795 def test_log(self): 3796 self._test_log('log', logging.ERROR) 3797 3798 def test_debug(self): 3799 self._test_log('debug') 3800 3801 def test_info(self): 3802 self._test_log('info') 3803 3804 def test_warning(self): 3805 self._test_log('warning') 3806 3807 def test_error(self): 3808 self._test_log('error') 3809 3810 def test_critical(self): 3811 self._test_log('critical') 3812 3813 def test_set_logger_class(self): 3814 self.assertRaises(TypeError, logging.setLoggerClass, object) 3815 3816 class MyLogger(logging.Logger): 3817 pass 3818 3819 logging.setLoggerClass(MyLogger) 3820 self.assertEqual(logging.getLoggerClass(), MyLogger) 3821 3822 logging.setLoggerClass(logging.Logger) 3823 self.assertEqual(logging.getLoggerClass(), logging.Logger) 3824 3825 @support.requires_type_collecting 3826 def test_logging_at_shutdown(self): 3827 # Issue #20037 3828 code = """if 1: 3829 import logging 3830 3831 class A: 3832 def __del__(self): 3833 try: 3834 raise ValueError("some error") 3835 except Exception: 3836 logging.exception("exception in __del__") 3837 3838 a = A()""" 3839 rc, out, err = assert_python_ok("-c", code) 3840 err = err.decode() 3841 self.assertIn("exception in __del__", err) 3842 self.assertIn("ValueError: some error", err) 3843 3844 3845class LogRecordTest(BaseTest): 3846 def test_str_rep(self): 3847 r = logging.makeLogRecord({}) 3848 s = str(r) 3849 self.assertTrue(s.startswith('<LogRecord: ')) 3850 self.assertTrue(s.endswith('>')) 3851 3852 def test_dict_arg(self): 3853 h = RecordingHandler() 3854 r = logging.getLogger() 3855 r.addHandler(h) 3856 d = {'less' : 'more' } 3857 logging.warning('less is %(less)s', d) 3858 self.assertIs(h.records[0].args, d) 3859 self.assertEqual(h.records[0].message, 'less is more') 3860 r.removeHandler(h) 3861 h.close() 3862 3863 def test_multiprocessing(self): 3864 r = logging.makeLogRecord({}) 3865 self.assertEqual(r.processName, 'MainProcess') 3866 try: 3867 import multiprocessing as mp 3868 r = logging.makeLogRecord({}) 3869 self.assertEqual(r.processName, mp.current_process().name) 3870 except ImportError: 3871 pass 3872 3873 def test_optional(self): 3874 r = logging.makeLogRecord({}) 3875 NOT_NONE = self.assertIsNotNone 3876 NOT_NONE(r.thread) 3877 NOT_NONE(r.threadName) 3878 NOT_NONE(r.process) 3879 NOT_NONE(r.processName) 3880 log_threads = logging.logThreads 3881 log_processes = logging.logProcesses 3882 log_multiprocessing = logging.logMultiprocessing 3883 try: 3884 logging.logThreads = False 3885 logging.logProcesses = False 3886 logging.logMultiprocessing = False 3887 r = logging.makeLogRecord({}) 3888 NONE = self.assertIsNone 3889 NONE(r.thread) 3890 NONE(r.threadName) 3891 NONE(r.process) 3892 NONE(r.processName) 3893 finally: 3894 logging.logThreads = log_threads 3895 logging.logProcesses = log_processes 3896 logging.logMultiprocessing = log_multiprocessing 3897 3898class BasicConfigTest(unittest.TestCase): 3899 3900 """Test suite for logging.basicConfig.""" 3901 3902 def setUp(self): 3903 super(BasicConfigTest, self).setUp() 3904 self.handlers = logging.root.handlers 3905 self.saved_handlers = logging._handlers.copy() 3906 self.saved_handler_list = logging._handlerList[:] 3907 self.original_logging_level = logging.root.level 3908 self.addCleanup(self.cleanup) 3909 logging.root.handlers = [] 3910 3911 def tearDown(self): 3912 for h in logging.root.handlers[:]: 3913 logging.root.removeHandler(h) 3914 h.close() 3915 super(BasicConfigTest, self).tearDown() 3916 3917 def cleanup(self): 3918 setattr(logging.root, 'handlers', self.handlers) 3919 logging._handlers.clear() 3920 logging._handlers.update(self.saved_handlers) 3921 logging._handlerList[:] = self.saved_handler_list 3922 logging.root.level = self.original_logging_level 3923 3924 def test_no_kwargs(self): 3925 logging.basicConfig() 3926 3927 # handler defaults to a StreamHandler to sys.stderr 3928 self.assertEqual(len(logging.root.handlers), 1) 3929 handler = logging.root.handlers[0] 3930 self.assertIsInstance(handler, logging.StreamHandler) 3931 self.assertEqual(handler.stream, sys.stderr) 3932 3933 formatter = handler.formatter 3934 # format defaults to logging.BASIC_FORMAT 3935 self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT) 3936 # datefmt defaults to None 3937 self.assertIsNone(formatter.datefmt) 3938 # style defaults to % 3939 self.assertIsInstance(formatter._style, logging.PercentStyle) 3940 3941 # level is not explicitly set 3942 self.assertEqual(logging.root.level, self.original_logging_level) 3943 3944 def test_strformatstyle(self): 3945 with support.captured_stdout() as output: 3946 logging.basicConfig(stream=sys.stdout, style="{") 3947 logging.error("Log an error") 3948 sys.stdout.seek(0) 3949 self.assertEqual(output.getvalue().strip(), 3950 "ERROR:root:Log an error") 3951 3952 def test_stringtemplatestyle(self): 3953 with support.captured_stdout() as output: 3954 logging.basicConfig(stream=sys.stdout, style="$") 3955 logging.error("Log an error") 3956 sys.stdout.seek(0) 3957 self.assertEqual(output.getvalue().strip(), 3958 "ERROR:root:Log an error") 3959 3960 def test_filename(self): 3961 3962 def cleanup(h1, h2, fn): 3963 h1.close() 3964 h2.close() 3965 os.remove(fn) 3966 3967 logging.basicConfig(filename='test.log') 3968 3969 self.assertEqual(len(logging.root.handlers), 1) 3970 handler = logging.root.handlers[0] 3971 self.assertIsInstance(handler, logging.FileHandler) 3972 3973 expected = logging.FileHandler('test.log', 'a') 3974 self.assertEqual(handler.stream.mode, expected.stream.mode) 3975 self.assertEqual(handler.stream.name, expected.stream.name) 3976 self.addCleanup(cleanup, handler, expected, 'test.log') 3977 3978 def test_filemode(self): 3979 3980 def cleanup(h1, h2, fn): 3981 h1.close() 3982 h2.close() 3983 os.remove(fn) 3984 3985 logging.basicConfig(filename='test.log', filemode='wb') 3986 3987 handler = logging.root.handlers[0] 3988 expected = logging.FileHandler('test.log', 'wb') 3989 self.assertEqual(handler.stream.mode, expected.stream.mode) 3990 self.addCleanup(cleanup, handler, expected, 'test.log') 3991 3992 def test_stream(self): 3993 stream = io.StringIO() 3994 self.addCleanup(stream.close) 3995 logging.basicConfig(stream=stream) 3996 3997 self.assertEqual(len(logging.root.handlers), 1) 3998 handler = logging.root.handlers[0] 3999 self.assertIsInstance(handler, logging.StreamHandler) 4000 self.assertEqual(handler.stream, stream) 4001 4002 def test_format(self): 4003 logging.basicConfig(format='foo') 4004 4005 formatter = logging.root.handlers[0].formatter 4006 self.assertEqual(formatter._style._fmt, 'foo') 4007 4008 def test_datefmt(self): 4009 logging.basicConfig(datefmt='bar') 4010 4011 formatter = logging.root.handlers[0].formatter 4012 self.assertEqual(formatter.datefmt, 'bar') 4013 4014 def test_style(self): 4015 logging.basicConfig(style='$') 4016 4017 formatter = logging.root.handlers[0].formatter 4018 self.assertIsInstance(formatter._style, logging.StringTemplateStyle) 4019 4020 def test_level(self): 4021 old_level = logging.root.level 4022 self.addCleanup(logging.root.setLevel, old_level) 4023 4024 logging.basicConfig(level=57) 4025 self.assertEqual(logging.root.level, 57) 4026 # Test that second call has no effect 4027 logging.basicConfig(level=58) 4028 self.assertEqual(logging.root.level, 57) 4029 4030 def test_incompatible(self): 4031 assertRaises = self.assertRaises 4032 handlers = [logging.StreamHandler()] 4033 stream = sys.stderr 4034 assertRaises(ValueError, logging.basicConfig, filename='test.log', 4035 stream=stream) 4036 assertRaises(ValueError, logging.basicConfig, filename='test.log', 4037 handlers=handlers) 4038 assertRaises(ValueError, logging.basicConfig, stream=stream, 4039 handlers=handlers) 4040 # Issue 23207: test for invalid kwargs 4041 assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO) 4042 # Should pop both filename and filemode even if filename is None 4043 logging.basicConfig(filename=None, filemode='a') 4044 4045 def test_handlers(self): 4046 handlers = [ 4047 logging.StreamHandler(), 4048 logging.StreamHandler(sys.stdout), 4049 logging.StreamHandler(), 4050 ] 4051 f = logging.Formatter() 4052 handlers[2].setFormatter(f) 4053 logging.basicConfig(handlers=handlers) 4054 self.assertIs(handlers[0], logging.root.handlers[0]) 4055 self.assertIs(handlers[1], logging.root.handlers[1]) 4056 self.assertIs(handlers[2], logging.root.handlers[2]) 4057 self.assertIsNotNone(handlers[0].formatter) 4058 self.assertIsNotNone(handlers[1].formatter) 4059 self.assertIs(handlers[2].formatter, f) 4060 self.assertIs(handlers[0].formatter, handlers[1].formatter) 4061 4062 def _test_log(self, method, level=None): 4063 # logging.root has no handlers so basicConfig should be called 4064 called = [] 4065 4066 old_basic_config = logging.basicConfig 4067 def my_basic_config(*a, **kw): 4068 old_basic_config() 4069 old_level = logging.root.level 4070 logging.root.setLevel(100) # avoid having messages in stderr 4071 self.addCleanup(logging.root.setLevel, old_level) 4072 called.append((a, kw)) 4073 4074 support.patch(self, logging, 'basicConfig', my_basic_config) 4075 4076 log_method = getattr(logging, method) 4077 if level is not None: 4078 log_method(level, "test me") 4079 else: 4080 log_method("test me") 4081 4082 # basicConfig was called with no arguments 4083 self.assertEqual(called, [((), {})]) 4084 4085 def test_log(self): 4086 self._test_log('log', logging.WARNING) 4087 4088 def test_debug(self): 4089 self._test_log('debug') 4090 4091 def test_info(self): 4092 self._test_log('info') 4093 4094 def test_warning(self): 4095 self._test_log('warning') 4096 4097 def test_error(self): 4098 self._test_log('error') 4099 4100 def test_critical(self): 4101 self._test_log('critical') 4102 4103 4104class LoggerAdapterTest(unittest.TestCase): 4105 def setUp(self): 4106 super(LoggerAdapterTest, self).setUp() 4107 old_handler_list = logging._handlerList[:] 4108 4109 self.recording = RecordingHandler() 4110 self.logger = logging.root 4111 self.logger.addHandler(self.recording) 4112 self.addCleanup(self.logger.removeHandler, self.recording) 4113 self.addCleanup(self.recording.close) 4114 4115 def cleanup(): 4116 logging._handlerList[:] = old_handler_list 4117 4118 self.addCleanup(cleanup) 4119 self.addCleanup(logging.shutdown) 4120 self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None) 4121 4122 def test_exception(self): 4123 msg = 'testing exception: %r' 4124 exc = None 4125 try: 4126 1 / 0 4127 except ZeroDivisionError as e: 4128 exc = e 4129 self.adapter.exception(msg, self.recording) 4130 4131 self.assertEqual(len(self.recording.records), 1) 4132 record = self.recording.records[0] 4133 self.assertEqual(record.levelno, logging.ERROR) 4134 self.assertEqual(record.msg, msg) 4135 self.assertEqual(record.args, (self.recording,)) 4136 self.assertEqual(record.exc_info, 4137 (exc.__class__, exc, exc.__traceback__)) 4138 4139 def test_exception_excinfo(self): 4140 try: 4141 1 / 0 4142 except ZeroDivisionError as e: 4143 exc = e 4144 4145 self.adapter.exception('exc_info test', exc_info=exc) 4146 4147 self.assertEqual(len(self.recording.records), 1) 4148 record = self.recording.records[0] 4149 self.assertEqual(record.exc_info, 4150 (exc.__class__, exc, exc.__traceback__)) 4151 4152 def test_critical(self): 4153 msg = 'critical test! %r' 4154 self.adapter.critical(msg, self.recording) 4155 4156 self.assertEqual(len(self.recording.records), 1) 4157 record = self.recording.records[0] 4158 self.assertEqual(record.levelno, logging.CRITICAL) 4159 self.assertEqual(record.msg, msg) 4160 self.assertEqual(record.args, (self.recording,)) 4161 4162 def test_is_enabled_for(self): 4163 old_disable = self.adapter.logger.manager.disable 4164 self.adapter.logger.manager.disable = 33 4165 self.addCleanup(setattr, self.adapter.logger.manager, 'disable', 4166 old_disable) 4167 self.assertFalse(self.adapter.isEnabledFor(32)) 4168 4169 def test_has_handlers(self): 4170 self.assertTrue(self.adapter.hasHandlers()) 4171 4172 for handler in self.logger.handlers: 4173 self.logger.removeHandler(handler) 4174 4175 self.assertFalse(self.logger.hasHandlers()) 4176 self.assertFalse(self.adapter.hasHandlers()) 4177 4178 def test_nested(self): 4179 class Adapter(logging.LoggerAdapter): 4180 prefix = 'Adapter' 4181 4182 def process(self, msg, kwargs): 4183 return f"{self.prefix} {msg}", kwargs 4184 4185 msg = 'Adapters can be nested, yo.' 4186 adapter = Adapter(logger=self.logger, extra=None) 4187 adapter_adapter = Adapter(logger=adapter, extra=None) 4188 adapter_adapter.prefix = 'AdapterAdapter' 4189 self.assertEqual(repr(adapter), repr(adapter_adapter)) 4190 adapter_adapter.log(logging.CRITICAL, msg, self.recording) 4191 self.assertEqual(len(self.recording.records), 1) 4192 record = self.recording.records[0] 4193 self.assertEqual(record.levelno, logging.CRITICAL) 4194 self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}") 4195 self.assertEqual(record.args, (self.recording,)) 4196 orig_manager = adapter_adapter.manager 4197 self.assertIs(adapter.manager, orig_manager) 4198 self.assertIs(self.logger.manager, orig_manager) 4199 temp_manager = object() 4200 try: 4201 adapter_adapter.manager = temp_manager 4202 self.assertIs(adapter_adapter.manager, temp_manager) 4203 self.assertIs(adapter.manager, temp_manager) 4204 self.assertIs(self.logger.manager, temp_manager) 4205 finally: 4206 adapter_adapter.manager = orig_manager 4207 self.assertIs(adapter_adapter.manager, orig_manager) 4208 self.assertIs(adapter.manager, orig_manager) 4209 self.assertIs(self.logger.manager, orig_manager) 4210 4211 4212class LoggerTest(BaseTest): 4213 4214 def setUp(self): 4215 super(LoggerTest, self).setUp() 4216 self.recording = RecordingHandler() 4217 self.logger = logging.Logger(name='blah') 4218 self.logger.addHandler(self.recording) 4219 self.addCleanup(self.logger.removeHandler, self.recording) 4220 self.addCleanup(self.recording.close) 4221 self.addCleanup(logging.shutdown) 4222 4223 def test_set_invalid_level(self): 4224 self.assertRaises(TypeError, self.logger.setLevel, object()) 4225 4226 def test_exception(self): 4227 msg = 'testing exception: %r' 4228 exc = None 4229 try: 4230 1 / 0 4231 except ZeroDivisionError as e: 4232 exc = e 4233 self.logger.exception(msg, self.recording) 4234 4235 self.assertEqual(len(self.recording.records), 1) 4236 record = self.recording.records[0] 4237 self.assertEqual(record.levelno, logging.ERROR) 4238 self.assertEqual(record.msg, msg) 4239 self.assertEqual(record.args, (self.recording,)) 4240 self.assertEqual(record.exc_info, 4241 (exc.__class__, exc, exc.__traceback__)) 4242 4243 def test_log_invalid_level_with_raise(self): 4244 with support.swap_attr(logging, 'raiseExceptions', True): 4245 self.assertRaises(TypeError, self.logger.log, '10', 'test message') 4246 4247 def test_log_invalid_level_no_raise(self): 4248 with support.swap_attr(logging, 'raiseExceptions', False): 4249 self.logger.log('10', 'test message') # no exception happens 4250 4251 def test_find_caller_with_stack_info(self): 4252 called = [] 4253 support.patch(self, logging.traceback, 'print_stack', 4254 lambda f, file: called.append(file.getvalue())) 4255 4256 self.logger.findCaller(stack_info=True) 4257 4258 self.assertEqual(len(called), 1) 4259 self.assertEqual('Stack (most recent call last):\n', called[0]) 4260 4261 def test_make_record_with_extra_overwrite(self): 4262 name = 'my record' 4263 level = 13 4264 fn = lno = msg = args = exc_info = func = sinfo = None 4265 rv = logging._logRecordFactory(name, level, fn, lno, msg, args, 4266 exc_info, func, sinfo) 4267 4268 for key in ('message', 'asctime') + tuple(rv.__dict__.keys()): 4269 extra = {key: 'some value'} 4270 self.assertRaises(KeyError, self.logger.makeRecord, name, level, 4271 fn, lno, msg, args, exc_info, 4272 extra=extra, sinfo=sinfo) 4273 4274 def test_make_record_with_extra_no_overwrite(self): 4275 name = 'my record' 4276 level = 13 4277 fn = lno = msg = args = exc_info = func = sinfo = None 4278 extra = {'valid_key': 'some value'} 4279 result = self.logger.makeRecord(name, level, fn, lno, msg, args, 4280 exc_info, extra=extra, sinfo=sinfo) 4281 self.assertIn('valid_key', result.__dict__) 4282 4283 def test_has_handlers(self): 4284 self.assertTrue(self.logger.hasHandlers()) 4285 4286 for handler in self.logger.handlers: 4287 self.logger.removeHandler(handler) 4288 self.assertFalse(self.logger.hasHandlers()) 4289 4290 def test_has_handlers_no_propagate(self): 4291 child_logger = logging.getLogger('blah.child') 4292 child_logger.propagate = False 4293 self.assertFalse(child_logger.hasHandlers()) 4294 4295 def test_is_enabled_for(self): 4296 old_disable = self.logger.manager.disable 4297 self.logger.manager.disable = 23 4298 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 4299 self.assertFalse(self.logger.isEnabledFor(22)) 4300 4301 def test_root_logger_aliases(self): 4302 root = logging.getLogger() 4303 self.assertIs(root, logging.root) 4304 self.assertIs(root, logging.getLogger(None)) 4305 self.assertIs(root, logging.getLogger('')) 4306 self.assertIs(root, logging.getLogger('foo').root) 4307 self.assertIs(root, logging.getLogger('foo.bar').root) 4308 self.assertIs(root, logging.getLogger('foo').parent) 4309 4310 self.assertIsNot(root, logging.getLogger('\0')) 4311 self.assertIsNot(root, logging.getLogger('foo.bar').parent) 4312 4313 def test_invalid_names(self): 4314 self.assertRaises(TypeError, logging.getLogger, any) 4315 self.assertRaises(TypeError, logging.getLogger, b'foo') 4316 4317 def test_pickling(self): 4318 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 4319 for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'): 4320 logger = logging.getLogger(name) 4321 s = pickle.dumps(logger, proto) 4322 unpickled = pickle.loads(s) 4323 self.assertIs(unpickled, logger) 4324 4325 def test_caching(self): 4326 root = self.root_logger 4327 logger1 = logging.getLogger("abc") 4328 logger2 = logging.getLogger("abc.def") 4329 4330 # Set root logger level and ensure cache is empty 4331 root.setLevel(logging.ERROR) 4332 self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR) 4333 self.assertEqual(logger2._cache, {}) 4334 4335 # Ensure cache is populated and calls are consistent 4336 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 4337 self.assertFalse(logger2.isEnabledFor(logging.DEBUG)) 4338 self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False}) 4339 self.assertEqual(root._cache, {}) 4340 self.assertTrue(logger2.isEnabledFor(logging.ERROR)) 4341 4342 # Ensure root cache gets populated 4343 self.assertEqual(root._cache, {}) 4344 self.assertTrue(root.isEnabledFor(logging.ERROR)) 4345 self.assertEqual(root._cache, {logging.ERROR: True}) 4346 4347 # Set parent logger level and ensure caches are emptied 4348 logger1.setLevel(logging.CRITICAL) 4349 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 4350 self.assertEqual(logger2._cache, {}) 4351 4352 # Ensure logger2 uses parent logger's effective level 4353 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 4354 4355 # Set level to NOTSET and ensure caches are empty 4356 logger2.setLevel(logging.NOTSET) 4357 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 4358 self.assertEqual(logger2._cache, {}) 4359 self.assertEqual(logger1._cache, {}) 4360 self.assertEqual(root._cache, {}) 4361 4362 # Verify logger2 follows parent and not root 4363 self.assertFalse(logger2.isEnabledFor(logging.ERROR)) 4364 self.assertTrue(logger2.isEnabledFor(logging.CRITICAL)) 4365 self.assertFalse(logger1.isEnabledFor(logging.ERROR)) 4366 self.assertTrue(logger1.isEnabledFor(logging.CRITICAL)) 4367 self.assertTrue(root.isEnabledFor(logging.ERROR)) 4368 4369 # Disable logging in manager and ensure caches are clear 4370 logging.disable() 4371 self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL) 4372 self.assertEqual(logger2._cache, {}) 4373 self.assertEqual(logger1._cache, {}) 4374 self.assertEqual(root._cache, {}) 4375 4376 # Ensure no loggers are enabled 4377 self.assertFalse(logger1.isEnabledFor(logging.CRITICAL)) 4378 self.assertFalse(logger2.isEnabledFor(logging.CRITICAL)) 4379 self.assertFalse(root.isEnabledFor(logging.CRITICAL)) 4380 4381 4382class BaseFileTest(BaseTest): 4383 "Base class for handler tests that write log files" 4384 4385 def setUp(self): 4386 BaseTest.setUp(self) 4387 fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-") 4388 os.close(fd) 4389 self.rmfiles = [] 4390 4391 def tearDown(self): 4392 for fn in self.rmfiles: 4393 os.unlink(fn) 4394 if os.path.exists(self.fn): 4395 os.unlink(self.fn) 4396 BaseTest.tearDown(self) 4397 4398 def assertLogFile(self, filename): 4399 "Assert a log file is there and register it for deletion" 4400 self.assertTrue(os.path.exists(filename), 4401 msg="Log file %r does not exist" % filename) 4402 self.rmfiles.append(filename) 4403 4404 4405class FileHandlerTest(BaseFileTest): 4406 def test_delay(self): 4407 os.unlink(self.fn) 4408 fh = logging.FileHandler(self.fn, delay=True) 4409 self.assertIsNone(fh.stream) 4410 self.assertFalse(os.path.exists(self.fn)) 4411 fh.handle(logging.makeLogRecord({})) 4412 self.assertIsNotNone(fh.stream) 4413 self.assertTrue(os.path.exists(self.fn)) 4414 fh.close() 4415 4416class RotatingFileHandlerTest(BaseFileTest): 4417 def next_rec(self): 4418 return logging.LogRecord('n', logging.DEBUG, 'p', 1, 4419 self.next_message(), None, None, None) 4420 4421 def test_should_not_rollover(self): 4422 # If maxbytes is zero rollover never occurs 4423 rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0) 4424 self.assertFalse(rh.shouldRollover(None)) 4425 rh.close() 4426 4427 def test_should_rollover(self): 4428 rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1) 4429 self.assertTrue(rh.shouldRollover(self.next_rec())) 4430 rh.close() 4431 4432 def test_file_created(self): 4433 # checks that the file is created and assumes it was created 4434 # by us 4435 rh = logging.handlers.RotatingFileHandler(self.fn) 4436 rh.emit(self.next_rec()) 4437 self.assertLogFile(self.fn) 4438 rh.close() 4439 4440 def test_rollover_filenames(self): 4441 def namer(name): 4442 return name + ".test" 4443 rh = logging.handlers.RotatingFileHandler( 4444 self.fn, backupCount=2, maxBytes=1) 4445 rh.namer = namer 4446 rh.emit(self.next_rec()) 4447 self.assertLogFile(self.fn) 4448 rh.emit(self.next_rec()) 4449 self.assertLogFile(namer(self.fn + ".1")) 4450 rh.emit(self.next_rec()) 4451 self.assertLogFile(namer(self.fn + ".2")) 4452 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 4453 rh.close() 4454 4455 @support.requires_zlib 4456 def test_rotator(self): 4457 def namer(name): 4458 return name + ".gz" 4459 4460 def rotator(source, dest): 4461 with open(source, "rb") as sf: 4462 data = sf.read() 4463 compressed = zlib.compress(data, 9) 4464 with open(dest, "wb") as df: 4465 df.write(compressed) 4466 os.remove(source) 4467 4468 rh = logging.handlers.RotatingFileHandler( 4469 self.fn, backupCount=2, maxBytes=1) 4470 rh.rotator = rotator 4471 rh.namer = namer 4472 m1 = self.next_rec() 4473 rh.emit(m1) 4474 self.assertLogFile(self.fn) 4475 m2 = self.next_rec() 4476 rh.emit(m2) 4477 fn = namer(self.fn + ".1") 4478 self.assertLogFile(fn) 4479 newline = os.linesep 4480 with open(fn, "rb") as f: 4481 compressed = f.read() 4482 data = zlib.decompress(compressed) 4483 self.assertEqual(data.decode("ascii"), m1.msg + newline) 4484 rh.emit(self.next_rec()) 4485 fn = namer(self.fn + ".2") 4486 self.assertLogFile(fn) 4487 with open(fn, "rb") as f: 4488 compressed = f.read() 4489 data = zlib.decompress(compressed) 4490 self.assertEqual(data.decode("ascii"), m1.msg + newline) 4491 rh.emit(self.next_rec()) 4492 fn = namer(self.fn + ".2") 4493 with open(fn, "rb") as f: 4494 compressed = f.read() 4495 data = zlib.decompress(compressed) 4496 self.assertEqual(data.decode("ascii"), m2.msg + newline) 4497 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 4498 rh.close() 4499 4500class TimedRotatingFileHandlerTest(BaseFileTest): 4501 # other test methods added below 4502 def test_rollover(self): 4503 fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S', 4504 backupCount=1) 4505 fmt = logging.Formatter('%(asctime)s %(message)s') 4506 fh.setFormatter(fmt) 4507 r1 = logging.makeLogRecord({'msg': 'testing - initial'}) 4508 fh.emit(r1) 4509 self.assertLogFile(self.fn) 4510 time.sleep(1.1) # a little over a second ... 4511 r2 = logging.makeLogRecord({'msg': 'testing - after delay'}) 4512 fh.emit(r2) 4513 fh.close() 4514 # At this point, we should have a recent rotated file which we 4515 # can test for the existence of. However, in practice, on some 4516 # machines which run really slowly, we don't know how far back 4517 # in time to go to look for the log file. So, we go back a fair 4518 # bit, and stop as soon as we see a rotated file. In theory this 4519 # could of course still fail, but the chances are lower. 4520 found = False 4521 now = datetime.datetime.now() 4522 GO_BACK = 5 * 60 # seconds 4523 for secs in range(GO_BACK): 4524 prev = now - datetime.timedelta(seconds=secs) 4525 fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S") 4526 found = os.path.exists(fn) 4527 if found: 4528 self.rmfiles.append(fn) 4529 break 4530 msg = 'No rotated files found, went back %d seconds' % GO_BACK 4531 if not found: 4532 # print additional diagnostics 4533 dn, fn = os.path.split(self.fn) 4534 files = [f for f in os.listdir(dn) if f.startswith(fn)] 4535 print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr) 4536 print('The only matching files are: %s' % files, file=sys.stderr) 4537 for f in files: 4538 print('Contents of %s:' % f) 4539 path = os.path.join(dn, f) 4540 with open(path, 'r') as tf: 4541 print(tf.read()) 4542 self.assertTrue(found, msg=msg) 4543 4544 def test_invalid(self): 4545 assertRaises = self.assertRaises 4546 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 4547 self.fn, 'X', delay=True) 4548 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 4549 self.fn, 'W', delay=True) 4550 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 4551 self.fn, 'W7', delay=True) 4552 4553 def test_compute_rollover_daily_attime(self): 4554 currentTime = 0 4555 atTime = datetime.time(12, 0, 0) 4556 rh = logging.handlers.TimedRotatingFileHandler( 4557 self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True, 4558 atTime=atTime) 4559 try: 4560 actual = rh.computeRollover(currentTime) 4561 self.assertEqual(actual, currentTime + 12 * 60 * 60) 4562 4563 actual = rh.computeRollover(currentTime + 13 * 60 * 60) 4564 self.assertEqual(actual, currentTime + 36 * 60 * 60) 4565 finally: 4566 rh.close() 4567 4568 #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.') 4569 def test_compute_rollover_weekly_attime(self): 4570 currentTime = int(time.time()) 4571 today = currentTime - currentTime % 86400 4572 4573 atTime = datetime.time(12, 0, 0) 4574 4575 wday = time.gmtime(today).tm_wday 4576 for day in range(7): 4577 rh = logging.handlers.TimedRotatingFileHandler( 4578 self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True, 4579 atTime=atTime) 4580 try: 4581 if wday > day: 4582 # The rollover day has already passed this week, so we 4583 # go over into next week 4584 expected = (7 - wday + day) 4585 else: 4586 expected = (day - wday) 4587 # At this point expected is in days from now, convert to seconds 4588 expected *= 24 * 60 * 60 4589 # Add in the rollover time 4590 expected += 12 * 60 * 60 4591 # Add in adjustment for today 4592 expected += today 4593 actual = rh.computeRollover(today) 4594 if actual != expected: 4595 print('failed in timezone: %d' % time.timezone) 4596 print('local vars: %s' % locals()) 4597 self.assertEqual(actual, expected) 4598 if day == wday: 4599 # goes into following week 4600 expected += 7 * 24 * 60 * 60 4601 actual = rh.computeRollover(today + 13 * 60 * 60) 4602 if actual != expected: 4603 print('failed in timezone: %d' % time.timezone) 4604 print('local vars: %s' % locals()) 4605 self.assertEqual(actual, expected) 4606 finally: 4607 rh.close() 4608 4609 4610def secs(**kw): 4611 return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) 4612 4613for when, exp in (('S', 1), 4614 ('M', 60), 4615 ('H', 60 * 60), 4616 ('D', 60 * 60 * 24), 4617 ('MIDNIGHT', 60 * 60 * 24), 4618 # current time (epoch start) is a Thursday, W0 means Monday 4619 ('W0', secs(days=4, hours=24)), 4620 ): 4621 def test_compute_rollover(self, when=when, exp=exp): 4622 rh = logging.handlers.TimedRotatingFileHandler( 4623 self.fn, when=when, interval=1, backupCount=0, utc=True) 4624 currentTime = 0.0 4625 actual = rh.computeRollover(currentTime) 4626 if exp != actual: 4627 # Failures occur on some systems for MIDNIGHT and W0. 4628 # Print detailed calculation for MIDNIGHT so we can try to see 4629 # what's going on 4630 if when == 'MIDNIGHT': 4631 try: 4632 if rh.utc: 4633 t = time.gmtime(currentTime) 4634 else: 4635 t = time.localtime(currentTime) 4636 currentHour = t[3] 4637 currentMinute = t[4] 4638 currentSecond = t[5] 4639 # r is the number of seconds left between now and midnight 4640 r = logging.handlers._MIDNIGHT - ((currentHour * 60 + 4641 currentMinute) * 60 + 4642 currentSecond) 4643 result = currentTime + r 4644 print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) 4645 print('currentHour: %s' % currentHour, file=sys.stderr) 4646 print('currentMinute: %s' % currentMinute, file=sys.stderr) 4647 print('currentSecond: %s' % currentSecond, file=sys.stderr) 4648 print('r: %s' % r, file=sys.stderr) 4649 print('result: %s' % result, file=sys.stderr) 4650 except Exception: 4651 print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr) 4652 self.assertEqual(exp, actual) 4653 rh.close() 4654 setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) 4655 4656 4657@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.') 4658class NTEventLogHandlerTest(BaseTest): 4659 def test_basic(self): 4660 logtype = 'Application' 4661 elh = win32evtlog.OpenEventLog(None, logtype) 4662 num_recs = win32evtlog.GetNumberOfEventLogRecords(elh) 4663 4664 try: 4665 h = logging.handlers.NTEventLogHandler('test_logging') 4666 except pywintypes.error as e: 4667 if e.winerror == 5: # access denied 4668 raise unittest.SkipTest('Insufficient privileges to run test') 4669 raise 4670 4671 r = logging.makeLogRecord({'msg': 'Test Log Message'}) 4672 h.handle(r) 4673 h.close() 4674 # Now see if the event is recorded 4675 self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) 4676 flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ 4677 win32evtlog.EVENTLOG_SEQUENTIAL_READ 4678 found = False 4679 GO_BACK = 100 4680 events = win32evtlog.ReadEventLog(elh, flags, GO_BACK) 4681 for e in events: 4682 if e.SourceName != 'test_logging': 4683 continue 4684 msg = win32evtlogutil.SafeFormatMessage(e, logtype) 4685 if msg != 'Test Log Message\r\n': 4686 continue 4687 found = True 4688 break 4689 msg = 'Record not found in event log, went back %d records' % GO_BACK 4690 self.assertTrue(found, msg=msg) 4691 4692 4693class MiscTestCase(unittest.TestCase): 4694 def test__all__(self): 4695 blacklist = {'logThreads', 'logMultiprocessing', 4696 'logProcesses', 'currentframe', 4697 'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle', 4698 'Filterer', 'PlaceHolder', 'Manager', 'RootLogger', 4699 'root', 'threading'} 4700 support.check__all__(self, logging, blacklist=blacklist) 4701 4702 4703# Set the locale to the platform-dependent default. I have no idea 4704# why the test does this, but in any case we save the current locale 4705# first and restore it at the end. 4706@support.run_with_locale('LC_ALL', '') 4707def test_main(): 4708 tests = [ 4709 BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest, 4710 HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest, 4711 DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest, 4712 ConfigDictTest, ManagerTest, FormatterTest, BufferingFormatterTest, 4713 StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest, 4714 QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, BasicConfigTest, 4715 LoggerAdapterTest, LoggerTest, SMTPHandlerTest, FileHandlerTest, 4716 RotatingFileHandlerTest, LastResortTest, LogRecordTest, 4717 ExceptionTest, SysLogHandlerTest, IPv6SysLogHandlerTest, HTTPHandlerTest, 4718 NTEventLogHandlerTest, TimedRotatingFileHandlerTest, 4719 UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest, 4720 MiscTestCase 4721 ] 4722 if hasattr(logging.handlers, 'QueueListener'): 4723 tests.append(QueueListenerTest) 4724 support.run_unittest(*tests) 4725 4726if __name__ == "__main__": 4727 test_main() 4728