1# Copyright 2001-2017 by Vinay Sajip. All Rights Reserved.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose and without fee is hereby granted,
5# provided that the above copyright notice appear in all copies and that
6# both that copyright notice and this permission notice appear in
7# supporting documentation, and that the name of Vinay Sajip
8# not be used in advertising or publicity pertaining to distribution
9# of the software without specific, written prior permission.
10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16
17"""Test harness for the logging module. Run all tests.
18
19Copyright (C) 2001-2017 Vinay Sajip. All Rights Reserved.
20"""
21
22import logging
23import logging.handlers
24import logging.config
25
26import codecs
27import configparser
28import datetime
29import pathlib
30import pickle
31import io
32import gc
33import json
34import os
35import queue
36import random
37import re
38import signal
39import socket
40import struct
41import sys
42import tempfile
43from test.support.script_helper import assert_python_ok
44from test import support
45import textwrap
46import threading
47import time
48import unittest
49import warnings
50import weakref
51
52import asyncore
53from http.server import HTTPServer, BaseHTTPRequestHandler
54import smtpd
55from urllib.parse import urlparse, parse_qs
56from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
57                          ThreadingTCPServer, StreamRequestHandler)
58
59try:
60    import win32evtlog, win32evtlogutil, pywintypes
61except ImportError:
62    win32evtlog = win32evtlogutil = pywintypes = None
63
64try:
65    import zlib
66except ImportError:
67    pass
68
69class BaseTest(unittest.TestCase):
70
71    """Base class for logging tests."""
72
73    log_format = "%(name)s -> %(levelname)s: %(message)s"
74    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
75    message_num = 0
76
77    def setUp(self):
78        """Setup the default logging stream to an internal StringIO instance,
79        so that we can examine log output as we want."""
80        self._threading_key = support.threading_setup()
81
82        logger_dict = logging.getLogger().manager.loggerDict
83        logging._acquireLock()
84        try:
85            self.saved_handlers = logging._handlers.copy()
86            self.saved_handler_list = logging._handlerList[:]
87            self.saved_loggers = saved_loggers = logger_dict.copy()
88            self.saved_name_to_level = logging._nameToLevel.copy()
89            self.saved_level_to_name = logging._levelToName.copy()
90            self.logger_states = logger_states = {}
91            for name in saved_loggers:
92                logger_states[name] = getattr(saved_loggers[name],
93                                              'disabled', None)
94        finally:
95            logging._releaseLock()
96
97        # Set two unused loggers
98        self.logger1 = logging.getLogger("\xab\xd7\xbb")
99        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")
100
101        self.root_logger = logging.getLogger("")
102        self.original_logging_level = self.root_logger.getEffectiveLevel()
103
104        self.stream = io.StringIO()
105        self.root_logger.setLevel(logging.DEBUG)
106        self.root_hdlr = logging.StreamHandler(self.stream)
107        self.root_formatter = logging.Formatter(self.log_format)
108        self.root_hdlr.setFormatter(self.root_formatter)
109        if self.logger1.hasHandlers():
110            hlist = self.logger1.handlers + self.root_logger.handlers
111            raise AssertionError('Unexpected handlers: %s' % hlist)
112        if self.logger2.hasHandlers():
113            hlist = self.logger2.handlers + self.root_logger.handlers
114            raise AssertionError('Unexpected handlers: %s' % hlist)
115        self.root_logger.addHandler(self.root_hdlr)
116        self.assertTrue(self.logger1.hasHandlers())
117        self.assertTrue(self.logger2.hasHandlers())
118
119    def tearDown(self):
120        """Remove our logging stream, and restore the original logging
121        level."""
122        self.stream.close()
123        self.root_logger.removeHandler(self.root_hdlr)
124        while self.root_logger.handlers:
125            h = self.root_logger.handlers[0]
126            self.root_logger.removeHandler(h)
127            h.close()
128        self.root_logger.setLevel(self.original_logging_level)
129        logging._acquireLock()
130        try:
131            logging._levelToName.clear()
132            logging._levelToName.update(self.saved_level_to_name)
133            logging._nameToLevel.clear()
134            logging._nameToLevel.update(self.saved_name_to_level)
135            logging._handlers.clear()
136            logging._handlers.update(self.saved_handlers)
137            logging._handlerList[:] = self.saved_handler_list
138            manager = logging.getLogger().manager
139            manager.disable = 0
140            loggerDict = manager.loggerDict
141            loggerDict.clear()
142            loggerDict.update(self.saved_loggers)
143            logger_states = self.logger_states
144            for name in self.logger_states:
145                if logger_states[name] is not None:
146                    self.saved_loggers[name].disabled = logger_states[name]
147        finally:
148            logging._releaseLock()
149
150        self.doCleanups()
151        support.threading_cleanup(*self._threading_key)
152
153    def assert_log_lines(self, expected_values, stream=None, pat=None):
154        """Match the collected log lines against the regular expression
155        self.expected_log_pat, and compare the extracted group values to
156        the expected_values list of tuples."""
157        stream = stream or self.stream
158        pat = re.compile(pat or self.expected_log_pat)
159        actual_lines = stream.getvalue().splitlines()
160        self.assertEqual(len(actual_lines), len(expected_values))
161        for actual, expected in zip(actual_lines, expected_values):
162            match = pat.search(actual)
163            if not match:
164                self.fail("Log line does not match expected pattern:\n" +
165                            actual)
166            self.assertEqual(tuple(match.groups()), expected)
167        s = stream.read()
168        if s:
169            self.fail("Remaining output at end of log stream:\n" + s)
170
171    def next_message(self):
172        """Generate a message consisting solely of an auto-incrementing
173        integer."""
174        self.message_num += 1
175        return "%d" % self.message_num
176
177
178class BuiltinLevelsTest(BaseTest):
179    """Test builtin levels and their inheritance."""
180
181    def test_flat(self):
182        # Logging levels in a flat logger namespace.
183        m = self.next_message
184
185        ERR = logging.getLogger("ERR")
186        ERR.setLevel(logging.ERROR)
187        INF = logging.LoggerAdapter(logging.getLogger("INF"), {})
188        INF.setLevel(logging.INFO)
189        DEB = logging.getLogger("DEB")
190        DEB.setLevel(logging.DEBUG)
191
192        # These should log.
193        ERR.log(logging.CRITICAL, m())
194        ERR.error(m())
195
196        INF.log(logging.CRITICAL, m())
197        INF.error(m())
198        INF.warning(m())
199        INF.info(m())
200
201        DEB.log(logging.CRITICAL, m())
202        DEB.error(m())
203        DEB.warning(m())
204        DEB.info(m())
205        DEB.debug(m())
206
207        # These should not log.
208        ERR.warning(m())
209        ERR.info(m())
210        ERR.debug(m())
211
212        INF.debug(m())
213
214        self.assert_log_lines([
215            ('ERR', 'CRITICAL', '1'),
216            ('ERR', 'ERROR', '2'),
217            ('INF', 'CRITICAL', '3'),
218            ('INF', 'ERROR', '4'),
219            ('INF', 'WARNING', '5'),
220            ('INF', 'INFO', '6'),
221            ('DEB', 'CRITICAL', '7'),
222            ('DEB', 'ERROR', '8'),
223            ('DEB', 'WARNING', '9'),
224            ('DEB', 'INFO', '10'),
225            ('DEB', 'DEBUG', '11'),
226        ])
227
228    def test_nested_explicit(self):
229        # Logging levels in a nested namespace, all explicitly set.
230        m = self.next_message
231
232        INF = logging.getLogger("INF")
233        INF.setLevel(logging.INFO)
234        INF_ERR  = logging.getLogger("INF.ERR")
235        INF_ERR.setLevel(logging.ERROR)
236
237        # These should log.
238        INF_ERR.log(logging.CRITICAL, m())
239        INF_ERR.error(m())
240
241        # These should not log.
242        INF_ERR.warning(m())
243        INF_ERR.info(m())
244        INF_ERR.debug(m())
245
246        self.assert_log_lines([
247            ('INF.ERR', 'CRITICAL', '1'),
248            ('INF.ERR', 'ERROR', '2'),
249        ])
250
251    def test_nested_inherited(self):
252        # Logging levels in a nested namespace, inherited from parent loggers.
253        m = self.next_message
254
255        INF = logging.getLogger("INF")
256        INF.setLevel(logging.INFO)
257        INF_ERR  = logging.getLogger("INF.ERR")
258        INF_ERR.setLevel(logging.ERROR)
259        INF_UNDEF = logging.getLogger("INF.UNDEF")
260        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
261        UNDEF = logging.getLogger("UNDEF")
262
263        # These should log.
264        INF_UNDEF.log(logging.CRITICAL, m())
265        INF_UNDEF.error(m())
266        INF_UNDEF.warning(m())
267        INF_UNDEF.info(m())
268        INF_ERR_UNDEF.log(logging.CRITICAL, m())
269        INF_ERR_UNDEF.error(m())
270
271        # These should not log.
272        INF_UNDEF.debug(m())
273        INF_ERR_UNDEF.warning(m())
274        INF_ERR_UNDEF.info(m())
275        INF_ERR_UNDEF.debug(m())
276
277        self.assert_log_lines([
278            ('INF.UNDEF', 'CRITICAL', '1'),
279            ('INF.UNDEF', 'ERROR', '2'),
280            ('INF.UNDEF', 'WARNING', '3'),
281            ('INF.UNDEF', 'INFO', '4'),
282            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
283            ('INF.ERR.UNDEF', 'ERROR', '6'),
284        ])
285
286    def test_nested_with_virtual_parent(self):
287        # Logging levels when some parent does not exist yet.
288        m = self.next_message
289
290        INF = logging.getLogger("INF")
291        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
292        CHILD = logging.getLogger("INF.BADPARENT")
293        INF.setLevel(logging.INFO)
294
295        # These should log.
296        GRANDCHILD.log(logging.FATAL, m())
297        GRANDCHILD.info(m())
298        CHILD.log(logging.FATAL, m())
299        CHILD.info(m())
300
301        # These should not log.
302        GRANDCHILD.debug(m())
303        CHILD.debug(m())
304
305        self.assert_log_lines([
306            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
307            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
308            ('INF.BADPARENT', 'CRITICAL', '3'),
309            ('INF.BADPARENT', 'INFO', '4'),
310        ])
311
312    def test_regression_22386(self):
313        """See issue #22386 for more information."""
314        self.assertEqual(logging.getLevelName('INFO'), logging.INFO)
315        self.assertEqual(logging.getLevelName(logging.INFO), 'INFO')
316
317    def test_regression_29220(self):
318        """See issue #29220 for more information."""
319        logging.addLevelName(logging.INFO, '')
320        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
321        self.assertEqual(logging.getLevelName(logging.INFO), '')
322
323    def test_issue27935(self):
324        fatal = logging.getLevelName('FATAL')
325        self.assertEqual(fatal, logging.FATAL)
326
327    def test_regression_29220(self):
328        """See issue #29220 for more information."""
329        logging.addLevelName(logging.INFO, '')
330        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
331        self.assertEqual(logging.getLevelName(logging.INFO), '')
332        self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET')
333        self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET)
334
335class BasicFilterTest(BaseTest):
336
337    """Test the bundled Filter class."""
338
339    def test_filter(self):
340        # Only messages satisfying the specified criteria pass through the
341        #  filter.
342        filter_ = logging.Filter("spam.eggs")
343        handler = self.root_logger.handlers[0]
344        try:
345            handler.addFilter(filter_)
346            spam = logging.getLogger("spam")
347            spam_eggs = logging.getLogger("spam.eggs")
348            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
349            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
350
351            spam.info(self.next_message())
352            spam_eggs.info(self.next_message())  # Good.
353            spam_eggs_fish.info(self.next_message())  # Good.
354            spam_bakedbeans.info(self.next_message())
355
356            self.assert_log_lines([
357                ('spam.eggs', 'INFO', '2'),
358                ('spam.eggs.fish', 'INFO', '3'),
359            ])
360        finally:
361            handler.removeFilter(filter_)
362
363    def test_callable_filter(self):
364        # Only messages satisfying the specified criteria pass through the
365        #  filter.
366
367        def filterfunc(record):
368            parts = record.name.split('.')
369            prefix = '.'.join(parts[:2])
370            return prefix == 'spam.eggs'
371
372        handler = self.root_logger.handlers[0]
373        try:
374            handler.addFilter(filterfunc)
375            spam = logging.getLogger("spam")
376            spam_eggs = logging.getLogger("spam.eggs")
377            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
378            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
379
380            spam.info(self.next_message())
381            spam_eggs.info(self.next_message())  # Good.
382            spam_eggs_fish.info(self.next_message())  # Good.
383            spam_bakedbeans.info(self.next_message())
384
385            self.assert_log_lines([
386                ('spam.eggs', 'INFO', '2'),
387                ('spam.eggs.fish', 'INFO', '3'),
388            ])
389        finally:
390            handler.removeFilter(filterfunc)
391
392    def test_empty_filter(self):
393        f = logging.Filter()
394        r = logging.makeLogRecord({'name': 'spam.eggs'})
395        self.assertTrue(f.filter(r))
396
397#
398#   First, we define our levels. There can be as many as you want - the only
399#     limitations are that they should be integers, the lowest should be > 0 and
400#   larger values mean less information being logged. If you need specific
401#   level values which do not fit into these limitations, you can use a
402#   mapping dictionary to convert between your application levels and the
403#   logging system.
404#
405SILENT      = 120
406TACITURN    = 119
407TERSE       = 118
408EFFUSIVE    = 117
409SOCIABLE    = 116
410VERBOSE     = 115
411TALKATIVE   = 114
412GARRULOUS   = 113
413CHATTERBOX  = 112
414BORING      = 111
415
416LEVEL_RANGE = range(BORING, SILENT + 1)
417
418#
419#   Next, we define names for our levels. You don't need to do this - in which
420#   case the system will use "Level n" to denote the text for the level.
421#
422my_logging_levels = {
423    SILENT      : 'Silent',
424    TACITURN    : 'Taciturn',
425    TERSE       : 'Terse',
426    EFFUSIVE    : 'Effusive',
427    SOCIABLE    : 'Sociable',
428    VERBOSE     : 'Verbose',
429    TALKATIVE   : 'Talkative',
430    GARRULOUS   : 'Garrulous',
431    CHATTERBOX  : 'Chatterbox',
432    BORING      : 'Boring',
433}
434
435class GarrulousFilter(logging.Filter):
436
437    """A filter which blocks garrulous messages."""
438
439    def filter(self, record):
440        return record.levelno != GARRULOUS
441
442class VerySpecificFilter(logging.Filter):
443
444    """A filter which blocks sociable and taciturn messages."""
445
446    def filter(self, record):
447        return record.levelno not in [SOCIABLE, TACITURN]
448
449
450class CustomLevelsAndFiltersTest(BaseTest):
451
452    """Test various filtering possibilities with custom logging levels."""
453
454    # Skip the logger name group.
455    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
456
457    def setUp(self):
458        BaseTest.setUp(self)
459        for k, v in my_logging_levels.items():
460            logging.addLevelName(k, v)
461
462    def log_at_all_levels(self, logger):
463        for lvl in LEVEL_RANGE:
464            logger.log(lvl, self.next_message())
465
466    def test_logger_filter(self):
467        # Filter at logger level.
468        self.root_logger.setLevel(VERBOSE)
469        # Levels >= 'Verbose' are good.
470        self.log_at_all_levels(self.root_logger)
471        self.assert_log_lines([
472            ('Verbose', '5'),
473            ('Sociable', '6'),
474            ('Effusive', '7'),
475            ('Terse', '8'),
476            ('Taciturn', '9'),
477            ('Silent', '10'),
478        ])
479
480    def test_handler_filter(self):
481        # Filter at handler level.
482        self.root_logger.handlers[0].setLevel(SOCIABLE)
483        try:
484            # Levels >= 'Sociable' are good.
485            self.log_at_all_levels(self.root_logger)
486            self.assert_log_lines([
487                ('Sociable', '6'),
488                ('Effusive', '7'),
489                ('Terse', '8'),
490                ('Taciturn', '9'),
491                ('Silent', '10'),
492            ])
493        finally:
494            self.root_logger.handlers[0].setLevel(logging.NOTSET)
495
496    def test_specific_filters(self):
497        # Set a specific filter object on the handler, and then add another
498        #  filter object on the logger itself.
499        handler = self.root_logger.handlers[0]
500        specific_filter = None
501        garr = GarrulousFilter()
502        handler.addFilter(garr)
503        try:
504            self.log_at_all_levels(self.root_logger)
505            first_lines = [
506                # Notice how 'Garrulous' is missing
507                ('Boring', '1'),
508                ('Chatterbox', '2'),
509                ('Talkative', '4'),
510                ('Verbose', '5'),
511                ('Sociable', '6'),
512                ('Effusive', '7'),
513                ('Terse', '8'),
514                ('Taciturn', '9'),
515                ('Silent', '10'),
516            ]
517            self.assert_log_lines(first_lines)
518
519            specific_filter = VerySpecificFilter()
520            self.root_logger.addFilter(specific_filter)
521            self.log_at_all_levels(self.root_logger)
522            self.assert_log_lines(first_lines + [
523                # Not only 'Garrulous' is still missing, but also 'Sociable'
524                # and 'Taciturn'
525                ('Boring', '11'),
526                ('Chatterbox', '12'),
527                ('Talkative', '14'),
528                ('Verbose', '15'),
529                ('Effusive', '17'),
530                ('Terse', '18'),
531                ('Silent', '20'),
532        ])
533        finally:
534            if specific_filter:
535                self.root_logger.removeFilter(specific_filter)
536            handler.removeFilter(garr)
537
538
539class HandlerTest(BaseTest):
540    def test_name(self):
541        h = logging.Handler()
542        h.name = 'generic'
543        self.assertEqual(h.name, 'generic')
544        h.name = 'anothergeneric'
545        self.assertEqual(h.name, 'anothergeneric')
546        self.assertRaises(NotImplementedError, h.emit, None)
547
548    def test_builtin_handlers(self):
549        # We can't actually *use* too many handlers in the tests,
550        # but we can try instantiating them with various options
551        if sys.platform in ('linux', 'darwin'):
552            for existing in (True, False):
553                fd, fn = tempfile.mkstemp()
554                os.close(fd)
555                if not existing:
556                    os.unlink(fn)
557                h = logging.handlers.WatchedFileHandler(fn, delay=True)
558                if existing:
559                    dev, ino = h.dev, h.ino
560                    self.assertEqual(dev, -1)
561                    self.assertEqual(ino, -1)
562                    r = logging.makeLogRecord({'msg': 'Test'})
563                    h.handle(r)
564                    # Now remove the file.
565                    os.unlink(fn)
566                    self.assertFalse(os.path.exists(fn))
567                    # The next call should recreate the file.
568                    h.handle(r)
569                    self.assertTrue(os.path.exists(fn))
570                else:
571                    self.assertEqual(h.dev, -1)
572                    self.assertEqual(h.ino, -1)
573                h.close()
574                if existing:
575                    os.unlink(fn)
576            if sys.platform == 'darwin':
577                sockname = '/var/run/syslog'
578            else:
579                sockname = '/dev/log'
580            try:
581                h = logging.handlers.SysLogHandler(sockname)
582                self.assertEqual(h.facility, h.LOG_USER)
583                self.assertTrue(h.unixsocket)
584                h.close()
585            except OSError: # syslogd might not be available
586                pass
587        for method in ('GET', 'POST', 'PUT'):
588            if method == 'PUT':
589                self.assertRaises(ValueError, logging.handlers.HTTPHandler,
590                                  'localhost', '/log', method)
591            else:
592                h = logging.handlers.HTTPHandler('localhost', '/log', method)
593                h.close()
594        h = logging.handlers.BufferingHandler(0)
595        r = logging.makeLogRecord({})
596        self.assertTrue(h.shouldFlush(r))
597        h.close()
598        h = logging.handlers.BufferingHandler(1)
599        self.assertFalse(h.shouldFlush(r))
600        h.close()
601
602    def test_path_objects(self):
603        """
604        Test that Path objects are accepted as filename arguments to handlers.
605
606        See Issue #27493.
607        """
608        fd, fn = tempfile.mkstemp()
609        os.close(fd)
610        os.unlink(fn)
611        pfn = pathlib.Path(fn)
612        cases = (
613                    (logging.FileHandler, (pfn, 'w')),
614                    (logging.handlers.RotatingFileHandler, (pfn, 'a')),
615                    (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')),
616                )
617        if sys.platform in ('linux', 'darwin'):
618            cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),)
619        for cls, args in cases:
620            h = cls(*args)
621            self.assertTrue(os.path.exists(fn))
622            h.close()
623            os.unlink(fn)
624
625    @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
626    def test_race(self):
627        # Issue #14632 refers.
628        def remove_loop(fname, tries):
629            for _ in range(tries):
630                try:
631                    os.unlink(fname)
632                    self.deletion_time = time.time()
633                except OSError:
634                    pass
635                time.sleep(0.004 * random.randint(0, 4))
636
637        del_count = 500
638        log_count = 500
639
640        self.handle_time = None
641        self.deletion_time = None
642
643        for delay in (False, True):
644            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
645            os.close(fd)
646            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
647            remover.daemon = True
648            remover.start()
649            h = logging.handlers.WatchedFileHandler(fn, delay=delay)
650            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
651            h.setFormatter(f)
652            try:
653                for _ in range(log_count):
654                    time.sleep(0.005)
655                    r = logging.makeLogRecord({'msg': 'testing' })
656                    try:
657                        self.handle_time = time.time()
658                        h.handle(r)
659                    except Exception:
660                        print('Deleted at %s, '
661                              'opened at %s' % (self.deletion_time,
662                                                self.handle_time))
663                        raise
664            finally:
665                remover.join()
666                h.close()
667                if os.path.exists(fn):
668                    os.unlink(fn)
669
670    # The implementation relies on os.register_at_fork existing, but we test
671    # based on os.fork existing because that is what users and this test use.
672    # This helps ensure that when fork exists (the important concept) that the
673    # register_at_fork mechanism is also present and used.
674    @unittest.skipIf(not hasattr(os, 'fork'), 'Test requires os.fork().')
675    def test_post_fork_child_no_deadlock(self):
676        """Ensure forked child logging locks are not held; bpo-6721."""
677        refed_h = logging.Handler()
678        refed_h.name = 'because we need at least one for this test'
679        self.assertGreater(len(logging._handlers), 0)
680
681        locks_held__ready_to_fork = threading.Event()
682        fork_happened__release_locks_and_end_thread = threading.Event()
683
684        def lock_holder_thread_fn():
685            logging._acquireLock()
686            try:
687                refed_h.acquire()
688                try:
689                    # Tell the main thread to do the fork.
690                    locks_held__ready_to_fork.set()
691
692                    # If the deadlock bug exists, the fork will happen
693                    # without dealing with the locks we hold, deadlocking
694                    # the child.
695
696                    # Wait for a successful fork or an unreasonable amount of
697                    # time before releasing our locks.  To avoid a timing based
698                    # test we'd need communication from os.fork() as to when it
699                    # has actually happened.  Given this is a regression test
700                    # for a fixed issue, potentially less reliably detecting
701                    # regression via timing is acceptable for simplicity.
702                    # The test will always take at least this long. :(
703                    fork_happened__release_locks_and_end_thread.wait(0.5)
704                finally:
705                    refed_h.release()
706            finally:
707                logging._releaseLock()
708
709        lock_holder_thread = threading.Thread(
710                target=lock_holder_thread_fn,
711                name='test_post_fork_child_no_deadlock lock holder')
712        lock_holder_thread.start()
713
714        locks_held__ready_to_fork.wait()
715        pid = os.fork()
716        if pid == 0:  # Child.
717            logging.error(r'Child process did not deadlock. \o/')
718            os._exit(0)
719        else:  # Parent.
720            fork_happened__release_locks_and_end_thread.set()
721            lock_holder_thread.join()
722            start_time = time.monotonic()
723            while True:
724                waited_pid, status = os.waitpid(pid, os.WNOHANG)
725                if waited_pid == pid:
726                    break  # child process exited.
727                if time.monotonic() - start_time > 7:
728                    break  # so long? implies child deadlock.
729                time.sleep(0.05)
730            if waited_pid != pid:
731                os.kill(pid, signal.SIGKILL)
732                waited_pid, status = os.waitpid(pid, 0)
733                self.fail("child process deadlocked.")
734            self.assertEqual(status, 0, msg="child process error")
735
736
737class BadStream(object):
738    def write(self, data):
739        raise RuntimeError('deliberate mistake')
740
741class TestStreamHandler(logging.StreamHandler):
742    def handleError(self, record):
743        self.error_record = record
744
745class StreamHandlerTest(BaseTest):
746    def test_error_handling(self):
747        h = TestStreamHandler(BadStream())
748        r = logging.makeLogRecord({})
749        old_raise = logging.raiseExceptions
750
751        try:
752            h.handle(r)
753            self.assertIs(h.error_record, r)
754
755            h = logging.StreamHandler(BadStream())
756            with support.captured_stderr() as stderr:
757                h.handle(r)
758                msg = '\nRuntimeError: deliberate mistake\n'
759                self.assertIn(msg, stderr.getvalue())
760
761            logging.raiseExceptions = False
762            with support.captured_stderr() as stderr:
763                h.handle(r)
764                self.assertEqual('', stderr.getvalue())
765        finally:
766            logging.raiseExceptions = old_raise
767
768    def test_stream_setting(self):
769        """
770        Test setting the handler's stream
771        """
772        h = logging.StreamHandler()
773        stream = io.StringIO()
774        old = h.setStream(stream)
775        self.assertIs(old, sys.stderr)
776        actual = h.setStream(old)
777        self.assertIs(actual, stream)
778        # test that setting to existing value returns None
779        actual = h.setStream(old)
780        self.assertIsNone(actual)
781
782# -- The following section could be moved into a server_helper.py module
783# -- if it proves to be of wider utility than just test_logging
784
785class TestSMTPServer(smtpd.SMTPServer):
786    """
787    This class implements a test SMTP server.
788
789    :param addr: A (host, port) tuple which the server listens on.
790                 You can specify a port value of zero: the server's
791                 *port* attribute will hold the actual port number
792                 used, which can be used in client connections.
793    :param handler: A callable which will be called to process
794                    incoming messages. The handler will be passed
795                    the client address tuple, who the message is from,
796                    a list of recipients and the message data.
797    :param poll_interval: The interval, in seconds, used in the underlying
798                          :func:`select` or :func:`poll` call by
799                          :func:`asyncore.loop`.
800    :param sockmap: A dictionary which will be used to hold
801                    :class:`asyncore.dispatcher` instances used by
802                    :func:`asyncore.loop`. This avoids changing the
803                    :mod:`asyncore` module's global state.
804    """
805
806    def __init__(self, addr, handler, poll_interval, sockmap):
807        smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
808                                  decode_data=True)
809        self.port = self.socket.getsockname()[1]
810        self._handler = handler
811        self._thread = None
812        self.poll_interval = poll_interval
813
814    def process_message(self, peer, mailfrom, rcpttos, data):
815        """
816        Delegates to the handler passed in to the server's constructor.
817
818        Typically, this will be a test case method.
819        :param peer: The client (host, port) tuple.
820        :param mailfrom: The address of the sender.
821        :param rcpttos: The addresses of the recipients.
822        :param data: The message.
823        """
824        self._handler(peer, mailfrom, rcpttos, data)
825
826    def start(self):
827        """
828        Start the server running on a separate daemon thread.
829        """
830        self._thread = t = threading.Thread(target=self.serve_forever,
831                                            args=(self.poll_interval,))
832        t.setDaemon(True)
833        t.start()
834
835    def serve_forever(self, poll_interval):
836        """
837        Run the :mod:`asyncore` loop until normal termination
838        conditions arise.
839        :param poll_interval: The interval, in seconds, used in the underlying
840                              :func:`select` or :func:`poll` call by
841                              :func:`asyncore.loop`.
842        """
843        asyncore.loop(poll_interval, map=self._map)
844
845    def stop(self, timeout=None):
846        """
847        Stop the thread by closing the server instance.
848        Wait for the server thread to terminate.
849
850        :param timeout: How long to wait for the server thread
851                        to terminate.
852        """
853        self.close()
854        support.join_thread(self._thread, timeout)
855        self._thread = None
856        asyncore.close_all(map=self._map, ignore_all=True)
857
858
859class ControlMixin(object):
860    """
861    This mixin is used to start a server on a separate thread, and
862    shut it down programmatically. Request handling is simplified - instead
863    of needing to derive a suitable RequestHandler subclass, you just
864    provide a callable which will be passed each received request to be
865    processed.
866
867    :param handler: A handler callable which will be called with a
868                    single parameter - the request - in order to
869                    process the request. This handler is called on the
870                    server thread, effectively meaning that requests are
871                    processed serially. While not quite Web scale ;-),
872                    this should be fine for testing applications.
873    :param poll_interval: The polling interval in seconds.
874    """
875    def __init__(self, handler, poll_interval):
876        self._thread = None
877        self.poll_interval = poll_interval
878        self._handler = handler
879        self.ready = threading.Event()
880
881    def start(self):
882        """
883        Create a daemon thread to run the server, and start it.
884        """
885        self._thread = t = threading.Thread(target=self.serve_forever,
886                                            args=(self.poll_interval,))
887        t.setDaemon(True)
888        t.start()
889
890    def serve_forever(self, poll_interval):
891        """
892        Run the server. Set the ready flag before entering the
893        service loop.
894        """
895        self.ready.set()
896        super(ControlMixin, self).serve_forever(poll_interval)
897
898    def stop(self, timeout=None):
899        """
900        Tell the server thread to stop, and wait for it to do so.
901
902        :param timeout: How long to wait for the server thread
903                        to terminate.
904        """
905        self.shutdown()
906        if self._thread is not None:
907            support.join_thread(self._thread, timeout)
908            self._thread = None
909        self.server_close()
910        self.ready.clear()
911
912class TestHTTPServer(ControlMixin, HTTPServer):
913    """
914    An HTTP server which is controllable using :class:`ControlMixin`.
915
916    :param addr: A tuple with the IP address and port to listen on.
917    :param handler: A handler callable which will be called with a
918                    single parameter - the request - in order to
919                    process the request.
920    :param poll_interval: The polling interval in seconds.
921    :param log: Pass ``True`` to enable log messages.
922    """
923    def __init__(self, addr, handler, poll_interval=0.5,
924                 log=False, sslctx=None):
925        class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
926            def __getattr__(self, name, default=None):
927                if name.startswith('do_'):
928                    return self.process_request
929                raise AttributeError(name)
930
931            def process_request(self):
932                self.server._handler(self)
933
934            def log_message(self, format, *args):
935                if log:
936                    super(DelegatingHTTPRequestHandler,
937                          self).log_message(format, *args)
938        HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
939        ControlMixin.__init__(self, handler, poll_interval)
940        self.sslctx = sslctx
941
942    def get_request(self):
943        try:
944            sock, addr = self.socket.accept()
945            if self.sslctx:
946                sock = self.sslctx.wrap_socket(sock, server_side=True)
947        except OSError as e:
948            # socket errors are silenced by the caller, print them here
949            sys.stderr.write("Got an error:\n%s\n" % e)
950            raise
951        return sock, addr
952
953class TestTCPServer(ControlMixin, ThreadingTCPServer):
954    """
955    A TCP server which is controllable using :class:`ControlMixin`.
956
957    :param addr: A tuple with the IP address and port to listen on.
958    :param handler: A handler callable which will be called with a single
959                    parameter - the request - in order to process the request.
960    :param poll_interval: The polling interval in seconds.
961    :bind_and_activate: If True (the default), binds the server and starts it
962                        listening. If False, you need to call
963                        :meth:`server_bind` and :meth:`server_activate` at
964                        some later time before calling :meth:`start`, so that
965                        the server will set up the socket and listen on it.
966    """
967
968    allow_reuse_address = True
969
970    def __init__(self, addr, handler, poll_interval=0.5,
971                 bind_and_activate=True):
972        class DelegatingTCPRequestHandler(StreamRequestHandler):
973
974            def handle(self):
975                self.server._handler(self)
976        ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
977                                    bind_and_activate)
978        ControlMixin.__init__(self, handler, poll_interval)
979
980    def server_bind(self):
981        super(TestTCPServer, self).server_bind()
982        self.port = self.socket.getsockname()[1]
983
984class TestUDPServer(ControlMixin, ThreadingUDPServer):
985    """
986    A UDP server which is controllable using :class:`ControlMixin`.
987
988    :param addr: A tuple with the IP address and port to listen on.
989    :param handler: A handler callable which will be called with a
990                    single parameter - the request - in order to
991                    process the request.
992    :param poll_interval: The polling interval for shutdown requests,
993                          in seconds.
994    :bind_and_activate: If True (the default), binds the server and
995                        starts it listening. If False, you need to
996                        call :meth:`server_bind` and
997                        :meth:`server_activate` at some later time
998                        before calling :meth:`start`, so that the server will
999                        set up the socket and listen on it.
1000    """
1001    def __init__(self, addr, handler, poll_interval=0.5,
1002                 bind_and_activate=True):
1003        class DelegatingUDPRequestHandler(DatagramRequestHandler):
1004
1005            def handle(self):
1006                self.server._handler(self)
1007
1008            def finish(self):
1009                data = self.wfile.getvalue()
1010                if data:
1011                    try:
1012                        super(DelegatingUDPRequestHandler, self).finish()
1013                    except OSError:
1014                        if not self.server._closed:
1015                            raise
1016
1017        ThreadingUDPServer.__init__(self, addr,
1018                                    DelegatingUDPRequestHandler,
1019                                    bind_and_activate)
1020        ControlMixin.__init__(self, handler, poll_interval)
1021        self._closed = False
1022
1023    def server_bind(self):
1024        super(TestUDPServer, self).server_bind()
1025        self.port = self.socket.getsockname()[1]
1026
1027    def server_close(self):
1028        super(TestUDPServer, self).server_close()
1029        self._closed = True
1030
1031if hasattr(socket, "AF_UNIX"):
1032    class TestUnixStreamServer(TestTCPServer):
1033        address_family = socket.AF_UNIX
1034
1035    class TestUnixDatagramServer(TestUDPServer):
1036        address_family = socket.AF_UNIX
1037
1038# - end of server_helper section
1039
1040class SMTPHandlerTest(BaseTest):
1041    # bpo-14314, bpo-19665, bpo-34092: don't wait forever, timeout of 1 minute
1042    TIMEOUT = 60.0
1043
1044    def test_basic(self):
1045        sockmap = {}
1046        server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001,
1047                                sockmap)
1048        server.start()
1049        addr = (support.HOST, server.port)
1050        h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
1051                                         timeout=self.TIMEOUT)
1052        self.assertEqual(h.toaddrs, ['you'])
1053        self.messages = []
1054        r = logging.makeLogRecord({'msg': 'Hello \u2713'})
1055        self.handled = threading.Event()
1056        h.handle(r)
1057        self.handled.wait(self.TIMEOUT)
1058        server.stop()
1059        self.assertTrue(self.handled.is_set())
1060        self.assertEqual(len(self.messages), 1)
1061        peer, mailfrom, rcpttos, data = self.messages[0]
1062        self.assertEqual(mailfrom, 'me')
1063        self.assertEqual(rcpttos, ['you'])
1064        self.assertIn('\nSubject: Log\n', data)
1065        self.assertTrue(data.endswith('\n\nHello \u2713'))
1066        h.close()
1067
1068    def process_message(self, *args):
1069        self.messages.append(args)
1070        self.handled.set()
1071
1072class MemoryHandlerTest(BaseTest):
1073
1074    """Tests for the MemoryHandler."""
1075
1076    # Do not bother with a logger name group.
1077    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
1078
1079    def setUp(self):
1080        BaseTest.setUp(self)
1081        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1082                                                       self.root_hdlr)
1083        self.mem_logger = logging.getLogger('mem')
1084        self.mem_logger.propagate = 0
1085        self.mem_logger.addHandler(self.mem_hdlr)
1086
1087    def tearDown(self):
1088        self.mem_hdlr.close()
1089        BaseTest.tearDown(self)
1090
1091    def test_flush(self):
1092        # The memory handler flushes to its target handler based on specific
1093        #  criteria (message count and message level).
1094        self.mem_logger.debug(self.next_message())
1095        self.assert_log_lines([])
1096        self.mem_logger.info(self.next_message())
1097        self.assert_log_lines([])
1098        # This will flush because the level is >= logging.WARNING
1099        self.mem_logger.warning(self.next_message())
1100        lines = [
1101            ('DEBUG', '1'),
1102            ('INFO', '2'),
1103            ('WARNING', '3'),
1104        ]
1105        self.assert_log_lines(lines)
1106        for n in (4, 14):
1107            for i in range(9):
1108                self.mem_logger.debug(self.next_message())
1109            self.assert_log_lines(lines)
1110            # This will flush because it's the 10th message since the last
1111            #  flush.
1112            self.mem_logger.debug(self.next_message())
1113            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
1114            self.assert_log_lines(lines)
1115
1116        self.mem_logger.debug(self.next_message())
1117        self.assert_log_lines(lines)
1118
1119    def test_flush_on_close(self):
1120        """
1121        Test that the flush-on-close configuration works as expected.
1122        """
1123        self.mem_logger.debug(self.next_message())
1124        self.assert_log_lines([])
1125        self.mem_logger.info(self.next_message())
1126        self.assert_log_lines([])
1127        self.mem_logger.removeHandler(self.mem_hdlr)
1128        # Default behaviour is to flush on close. Check that it happens.
1129        self.mem_hdlr.close()
1130        lines = [
1131            ('DEBUG', '1'),
1132            ('INFO', '2'),
1133        ]
1134        self.assert_log_lines(lines)
1135        # Now configure for flushing not to be done on close.
1136        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
1137                                                       self.root_hdlr,
1138                                                       False)
1139        self.mem_logger.addHandler(self.mem_hdlr)
1140        self.mem_logger.debug(self.next_message())
1141        self.assert_log_lines(lines)  # no change
1142        self.mem_logger.info(self.next_message())
1143        self.assert_log_lines(lines)  # no change
1144        self.mem_logger.removeHandler(self.mem_hdlr)
1145        self.mem_hdlr.close()
1146        # assert that no new lines have been added
1147        self.assert_log_lines(lines)  # no change
1148
1149
1150class ExceptionFormatter(logging.Formatter):
1151    """A special exception formatter."""
1152    def formatException(self, ei):
1153        return "Got a [%s]" % ei[0].__name__
1154
1155
1156class ConfigFileTest(BaseTest):
1157
1158    """Reading logging config from a .ini-style config file."""
1159
1160    check_no_resource_warning = support.check_no_resource_warning
1161    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
1162
1163    # config0 is a standard configuration.
1164    config0 = """
1165    [loggers]
1166    keys=root
1167
1168    [handlers]
1169    keys=hand1
1170
1171    [formatters]
1172    keys=form1
1173
1174    [logger_root]
1175    level=WARNING
1176    handlers=hand1
1177
1178    [handler_hand1]
1179    class=StreamHandler
1180    level=NOTSET
1181    formatter=form1
1182    args=(sys.stdout,)
1183
1184    [formatter_form1]
1185    format=%(levelname)s ++ %(message)s
1186    datefmt=
1187    """
1188
1189    # config1 adds a little to the standard configuration.
1190    config1 = """
1191    [loggers]
1192    keys=root,parser
1193
1194    [handlers]
1195    keys=hand1
1196
1197    [formatters]
1198    keys=form1
1199
1200    [logger_root]
1201    level=WARNING
1202    handlers=
1203
1204    [logger_parser]
1205    level=DEBUG
1206    handlers=hand1
1207    propagate=1
1208    qualname=compiler.parser
1209
1210    [handler_hand1]
1211    class=StreamHandler
1212    level=NOTSET
1213    formatter=form1
1214    args=(sys.stdout,)
1215
1216    [formatter_form1]
1217    format=%(levelname)s ++ %(message)s
1218    datefmt=
1219    """
1220
1221    # config1a moves the handler to the root.
1222    config1a = """
1223    [loggers]
1224    keys=root,parser
1225
1226    [handlers]
1227    keys=hand1
1228
1229    [formatters]
1230    keys=form1
1231
1232    [logger_root]
1233    level=WARNING
1234    handlers=hand1
1235
1236    [logger_parser]
1237    level=DEBUG
1238    handlers=
1239    propagate=1
1240    qualname=compiler.parser
1241
1242    [handler_hand1]
1243    class=StreamHandler
1244    level=NOTSET
1245    formatter=form1
1246    args=(sys.stdout,)
1247
1248    [formatter_form1]
1249    format=%(levelname)s ++ %(message)s
1250    datefmt=
1251    """
1252
1253    # config2 has a subtle configuration error that should be reported
1254    config2 = config1.replace("sys.stdout", "sys.stbout")
1255
1256    # config3 has a less subtle configuration error
1257    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
1258
1259    # config4 specifies a custom formatter class to be loaded
1260    config4 = """
1261    [loggers]
1262    keys=root
1263
1264    [handlers]
1265    keys=hand1
1266
1267    [formatters]
1268    keys=form1
1269
1270    [logger_root]
1271    level=NOTSET
1272    handlers=hand1
1273
1274    [handler_hand1]
1275    class=StreamHandler
1276    level=NOTSET
1277    formatter=form1
1278    args=(sys.stdout,)
1279
1280    [formatter_form1]
1281    class=""" + __name__ + """.ExceptionFormatter
1282    format=%(levelname)s:%(name)s:%(message)s
1283    datefmt=
1284    """
1285
1286    # config5 specifies a custom handler class to be loaded
1287    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
1288
1289    # config6 uses ', ' delimiters in the handlers and formatters sections
1290    config6 = """
1291    [loggers]
1292    keys=root,parser
1293
1294    [handlers]
1295    keys=hand1, hand2
1296
1297    [formatters]
1298    keys=form1, form2
1299
1300    [logger_root]
1301    level=WARNING
1302    handlers=
1303
1304    [logger_parser]
1305    level=DEBUG
1306    handlers=hand1
1307    propagate=1
1308    qualname=compiler.parser
1309
1310    [handler_hand1]
1311    class=StreamHandler
1312    level=NOTSET
1313    formatter=form1
1314    args=(sys.stdout,)
1315
1316    [handler_hand2]
1317    class=StreamHandler
1318    level=NOTSET
1319    formatter=form1
1320    args=(sys.stderr,)
1321
1322    [formatter_form1]
1323    format=%(levelname)s ++ %(message)s
1324    datefmt=
1325
1326    [formatter_form2]
1327    format=%(message)s
1328    datefmt=
1329    """
1330
1331    # config7 adds a compiler logger, and uses kwargs instead of args.
1332    config7 = """
1333    [loggers]
1334    keys=root,parser,compiler
1335
1336    [handlers]
1337    keys=hand1
1338
1339    [formatters]
1340    keys=form1
1341
1342    [logger_root]
1343    level=WARNING
1344    handlers=hand1
1345
1346    [logger_compiler]
1347    level=DEBUG
1348    handlers=
1349    propagate=1
1350    qualname=compiler
1351
1352    [logger_parser]
1353    level=DEBUG
1354    handlers=
1355    propagate=1
1356    qualname=compiler.parser
1357
1358    [handler_hand1]
1359    class=StreamHandler
1360    level=NOTSET
1361    formatter=form1
1362    kwargs={'stream': sys.stdout,}
1363
1364    [formatter_form1]
1365    format=%(levelname)s ++ %(message)s
1366    datefmt=
1367    """
1368
1369    # config 8, check for resource warning
1370    config8 = r"""
1371    [loggers]
1372    keys=root
1373
1374    [handlers]
1375    keys=file
1376
1377    [formatters]
1378    keys=
1379
1380    [logger_root]
1381    level=DEBUG
1382    handlers=file
1383
1384    [handler_file]
1385    class=FileHandler
1386    level=DEBUG
1387    args=("{tempfile}",)
1388    """
1389
1390    disable_test = """
1391    [loggers]
1392    keys=root
1393
1394    [handlers]
1395    keys=screen
1396
1397    [formatters]
1398    keys=
1399
1400    [logger_root]
1401    level=DEBUG
1402    handlers=screen
1403
1404    [handler_screen]
1405    level=DEBUG
1406    class=StreamHandler
1407    args=(sys.stdout,)
1408    formatter=
1409    """
1410
1411    def apply_config(self, conf, **kwargs):
1412        file = io.StringIO(textwrap.dedent(conf))
1413        logging.config.fileConfig(file, **kwargs)
1414
1415    def test_config0_ok(self):
1416        # A simple config file which overrides the default settings.
1417        with support.captured_stdout() as output:
1418            self.apply_config(self.config0)
1419            logger = logging.getLogger()
1420            # Won't output anything
1421            logger.info(self.next_message())
1422            # Outputs a message
1423            logger.error(self.next_message())
1424            self.assert_log_lines([
1425                ('ERROR', '2'),
1426            ], stream=output)
1427            # Original logger output is empty.
1428            self.assert_log_lines([])
1429
1430    def test_config0_using_cp_ok(self):
1431        # A simple config file which overrides the default settings.
1432        with support.captured_stdout() as output:
1433            file = io.StringIO(textwrap.dedent(self.config0))
1434            cp = configparser.ConfigParser()
1435            cp.read_file(file)
1436            logging.config.fileConfig(cp)
1437            logger = logging.getLogger()
1438            # Won't output anything
1439            logger.info(self.next_message())
1440            # Outputs a message
1441            logger.error(self.next_message())
1442            self.assert_log_lines([
1443                ('ERROR', '2'),
1444            ], stream=output)
1445            # Original logger output is empty.
1446            self.assert_log_lines([])
1447
1448    def test_config1_ok(self, config=config1):
1449        # A config file defining a sub-parser as well.
1450        with support.captured_stdout() as output:
1451            self.apply_config(config)
1452            logger = logging.getLogger("compiler.parser")
1453            # Both will output a message
1454            logger.info(self.next_message())
1455            logger.error(self.next_message())
1456            self.assert_log_lines([
1457                ('INFO', '1'),
1458                ('ERROR', '2'),
1459            ], stream=output)
1460            # Original logger output is empty.
1461            self.assert_log_lines([])
1462
1463    def test_config2_failure(self):
1464        # A simple config file which overrides the default settings.
1465        self.assertRaises(Exception, self.apply_config, self.config2)
1466
1467    def test_config3_failure(self):
1468        # A simple config file which overrides the default settings.
1469        self.assertRaises(Exception, self.apply_config, self.config3)
1470
1471    def test_config4_ok(self):
1472        # A config file specifying a custom formatter class.
1473        with support.captured_stdout() as output:
1474            self.apply_config(self.config4)
1475            logger = logging.getLogger()
1476            try:
1477                raise RuntimeError()
1478            except RuntimeError:
1479                logging.exception("just testing")
1480            sys.stdout.seek(0)
1481            self.assertEqual(output.getvalue(),
1482                "ERROR:root:just testing\nGot a [RuntimeError]\n")
1483            # Original logger output is empty
1484            self.assert_log_lines([])
1485
1486    def test_config5_ok(self):
1487        self.test_config1_ok(config=self.config5)
1488
1489    def test_config6_ok(self):
1490        self.test_config1_ok(config=self.config6)
1491
1492    def test_config7_ok(self):
1493        with support.captured_stdout() as output:
1494            self.apply_config(self.config1a)
1495            logger = logging.getLogger("compiler.parser")
1496            # See issue #11424. compiler-hyphenated sorts
1497            # between compiler and compiler.xyz and this
1498            # was preventing compiler.xyz from being included
1499            # in the child loggers of compiler because of an
1500            # overzealous loop termination condition.
1501            hyphenated = logging.getLogger('compiler-hyphenated')
1502            # All will output a message
1503            logger.info(self.next_message())
1504            logger.error(self.next_message())
1505            hyphenated.critical(self.next_message())
1506            self.assert_log_lines([
1507                ('INFO', '1'),
1508                ('ERROR', '2'),
1509                ('CRITICAL', '3'),
1510            ], stream=output)
1511            # Original logger output is empty.
1512            self.assert_log_lines([])
1513        with support.captured_stdout() as output:
1514            self.apply_config(self.config7)
1515            logger = logging.getLogger("compiler.parser")
1516            self.assertFalse(logger.disabled)
1517            # Both will output a message
1518            logger.info(self.next_message())
1519            logger.error(self.next_message())
1520            logger = logging.getLogger("compiler.lexer")
1521            # Both will output a message
1522            logger.info(self.next_message())
1523            logger.error(self.next_message())
1524            # Will not appear
1525            hyphenated.critical(self.next_message())
1526            self.assert_log_lines([
1527                ('INFO', '4'),
1528                ('ERROR', '5'),
1529                ('INFO', '6'),
1530                ('ERROR', '7'),
1531            ], stream=output)
1532            # Original logger output is empty.
1533            self.assert_log_lines([])
1534
1535    def test_config8_ok(self):
1536
1537        def cleanup(h1, fn):
1538            h1.close()
1539            os.remove(fn)
1540
1541        with self.check_no_resource_warning():
1542            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
1543            os.close(fd)
1544
1545            # Replace single backslash with double backslash in windows
1546            # to avoid unicode error during string formatting
1547            if os.name == "nt":
1548                fn = fn.replace("\\", "\\\\")
1549
1550            config8 = self.config8.format(tempfile=fn)
1551
1552            self.apply_config(config8)
1553            self.apply_config(config8)
1554
1555        handler = logging.root.handlers[0]
1556        self.addCleanup(cleanup, handler, fn)
1557
1558    def test_logger_disabling(self):
1559        self.apply_config(self.disable_test)
1560        logger = logging.getLogger('some_pristine_logger')
1561        self.assertFalse(logger.disabled)
1562        self.apply_config(self.disable_test)
1563        self.assertTrue(logger.disabled)
1564        self.apply_config(self.disable_test, disable_existing_loggers=False)
1565        self.assertFalse(logger.disabled)
1566
1567    def test_defaults_do_no_interpolation(self):
1568        """bpo-33802 defaults should not get interpolated"""
1569        ini = textwrap.dedent("""
1570            [formatters]
1571            keys=default
1572
1573            [formatter_default]
1574
1575            [handlers]
1576            keys=console
1577
1578            [handler_console]
1579            class=logging.StreamHandler
1580            args=tuple()
1581
1582            [loggers]
1583            keys=root
1584
1585            [logger_root]
1586            formatter=default
1587            handlers=console
1588            """).strip()
1589        fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.ini')
1590        try:
1591            os.write(fd, ini.encode('ascii'))
1592            os.close(fd)
1593            logging.config.fileConfig(
1594                fn,
1595                defaults=dict(
1596                    version=1,
1597                    disable_existing_loggers=False,
1598                    formatters={
1599                        "generic": {
1600                            "format": "%(asctime)s [%(process)d] [%(levelname)s] %(message)s",
1601                            "datefmt": "[%Y-%m-%d %H:%M:%S %z]",
1602                            "class": "logging.Formatter"
1603                        },
1604                    },
1605                )
1606            )
1607        finally:
1608            os.unlink(fn)
1609
1610
1611class SocketHandlerTest(BaseTest):
1612
1613    """Test for SocketHandler objects."""
1614
1615    server_class = TestTCPServer
1616    address = ('localhost', 0)
1617
1618    def setUp(self):
1619        """Set up a TCP server to receive log messages, and a SocketHandler
1620        pointing to that server's address and port."""
1621        BaseTest.setUp(self)
1622        # Issue #29177: deal with errors that happen during setup
1623        self.server = self.sock_hdlr = self.server_exception = None
1624        try:
1625            self.server = server = self.server_class(self.address,
1626                                                     self.handle_socket, 0.01)
1627            server.start()
1628            # Uncomment next line to test error recovery in setUp()
1629            # raise OSError('dummy error raised')
1630        except OSError as e:
1631            self.server_exception = e
1632            return
1633        server.ready.wait()
1634        hcls = logging.handlers.SocketHandler
1635        if isinstance(server.server_address, tuple):
1636            self.sock_hdlr = hcls('localhost', server.port)
1637        else:
1638            self.sock_hdlr = hcls(server.server_address, None)
1639        self.log_output = ''
1640        self.root_logger.removeHandler(self.root_logger.handlers[0])
1641        self.root_logger.addHandler(self.sock_hdlr)
1642        self.handled = threading.Semaphore(0)
1643
1644    def tearDown(self):
1645        """Shutdown the TCP server."""
1646        try:
1647            if self.sock_hdlr:
1648                self.root_logger.removeHandler(self.sock_hdlr)
1649                self.sock_hdlr.close()
1650            if self.server:
1651                self.server.stop(2.0)
1652        finally:
1653            BaseTest.tearDown(self)
1654
1655    def handle_socket(self, request):
1656        conn = request.connection
1657        while True:
1658            chunk = conn.recv(4)
1659            if len(chunk) < 4:
1660                break
1661            slen = struct.unpack(">L", chunk)[0]
1662            chunk = conn.recv(slen)
1663            while len(chunk) < slen:
1664                chunk = chunk + conn.recv(slen - len(chunk))
1665            obj = pickle.loads(chunk)
1666            record = logging.makeLogRecord(obj)
1667            self.log_output += record.msg + '\n'
1668            self.handled.release()
1669
1670    def test_output(self):
1671        # The log message sent to the SocketHandler is properly received.
1672        if self.server_exception:
1673            self.skipTest(self.server_exception)
1674        logger = logging.getLogger("tcp")
1675        logger.error("spam")
1676        self.handled.acquire()
1677        logger.debug("eggs")
1678        self.handled.acquire()
1679        self.assertEqual(self.log_output, "spam\neggs\n")
1680
1681    def test_noserver(self):
1682        if self.server_exception:
1683            self.skipTest(self.server_exception)
1684        # Avoid timing-related failures due to SocketHandler's own hard-wired
1685        # one-second timeout on socket.create_connection() (issue #16264).
1686        self.sock_hdlr.retryStart = 2.5
1687        # Kill the server
1688        self.server.stop(2.0)
1689        # The logging call should try to connect, which should fail
1690        try:
1691            raise RuntimeError('Deliberate mistake')
1692        except RuntimeError:
1693            self.root_logger.exception('Never sent')
1694        self.root_logger.error('Never sent, either')
1695        now = time.time()
1696        self.assertGreater(self.sock_hdlr.retryTime, now)
1697        time.sleep(self.sock_hdlr.retryTime - now + 0.001)
1698        self.root_logger.error('Nor this')
1699
1700def _get_temp_domain_socket():
1701    fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock')
1702    os.close(fd)
1703    # just need a name - file can't be present, or we'll get an
1704    # 'address already in use' error.
1705    os.remove(fn)
1706    return fn
1707
1708@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1709class UnixSocketHandlerTest(SocketHandlerTest):
1710
1711    """Test for SocketHandler with unix sockets."""
1712
1713    if hasattr(socket, "AF_UNIX"):
1714        server_class = TestUnixStreamServer
1715
1716    def setUp(self):
1717        # override the definition in the base class
1718        self.address = _get_temp_domain_socket()
1719        SocketHandlerTest.setUp(self)
1720
1721    def tearDown(self):
1722        SocketHandlerTest.tearDown(self)
1723        support.unlink(self.address)
1724
1725class DatagramHandlerTest(BaseTest):
1726
1727    """Test for DatagramHandler."""
1728
1729    server_class = TestUDPServer
1730    address = ('localhost', 0)
1731
1732    def setUp(self):
1733        """Set up a UDP server to receive log messages, and a DatagramHandler
1734        pointing to that server's address and port."""
1735        BaseTest.setUp(self)
1736        # Issue #29177: deal with errors that happen during setup
1737        self.server = self.sock_hdlr = self.server_exception = None
1738        try:
1739            self.server = server = self.server_class(self.address,
1740                                                     self.handle_datagram, 0.01)
1741            server.start()
1742            # Uncomment next line to test error recovery in setUp()
1743            # raise OSError('dummy error raised')
1744        except OSError as e:
1745            self.server_exception = e
1746            return
1747        server.ready.wait()
1748        hcls = logging.handlers.DatagramHandler
1749        if isinstance(server.server_address, tuple):
1750            self.sock_hdlr = hcls('localhost', server.port)
1751        else:
1752            self.sock_hdlr = hcls(server.server_address, None)
1753        self.log_output = ''
1754        self.root_logger.removeHandler(self.root_logger.handlers[0])
1755        self.root_logger.addHandler(self.sock_hdlr)
1756        self.handled = threading.Event()
1757
1758    def tearDown(self):
1759        """Shutdown the UDP server."""
1760        try:
1761            if self.server:
1762                self.server.stop(2.0)
1763            if self.sock_hdlr:
1764                self.root_logger.removeHandler(self.sock_hdlr)
1765                self.sock_hdlr.close()
1766        finally:
1767            BaseTest.tearDown(self)
1768
1769    def handle_datagram(self, request):
1770        slen = struct.pack('>L', 0) # length of prefix
1771        packet = request.packet[len(slen):]
1772        obj = pickle.loads(packet)
1773        record = logging.makeLogRecord(obj)
1774        self.log_output += record.msg + '\n'
1775        self.handled.set()
1776
1777    def test_output(self):
1778        # The log message sent to the DatagramHandler is properly received.
1779        if self.server_exception:
1780            self.skipTest(self.server_exception)
1781        logger = logging.getLogger("udp")
1782        logger.error("spam")
1783        self.handled.wait()
1784        self.handled.clear()
1785        logger.error("eggs")
1786        self.handled.wait()
1787        self.assertEqual(self.log_output, "spam\neggs\n")
1788
1789@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1790class UnixDatagramHandlerTest(DatagramHandlerTest):
1791
1792    """Test for DatagramHandler using Unix sockets."""
1793
1794    if hasattr(socket, "AF_UNIX"):
1795        server_class = TestUnixDatagramServer
1796
1797    def setUp(self):
1798        # override the definition in the base class
1799        self.address = _get_temp_domain_socket()
1800        DatagramHandlerTest.setUp(self)
1801
1802    def tearDown(self):
1803        DatagramHandlerTest.tearDown(self)
1804        support.unlink(self.address)
1805
1806class SysLogHandlerTest(BaseTest):
1807
1808    """Test for SysLogHandler using UDP."""
1809
1810    server_class = TestUDPServer
1811    address = ('localhost', 0)
1812
1813    def setUp(self):
1814        """Set up a UDP server to receive log messages, and a SysLogHandler
1815        pointing to that server's address and port."""
1816        BaseTest.setUp(self)
1817        # Issue #29177: deal with errors that happen during setup
1818        self.server = self.sl_hdlr = self.server_exception = None
1819        try:
1820            self.server = server = self.server_class(self.address,
1821                                                     self.handle_datagram, 0.01)
1822            server.start()
1823            # Uncomment next line to test error recovery in setUp()
1824            # raise OSError('dummy error raised')
1825        except OSError as e:
1826            self.server_exception = e
1827            return
1828        server.ready.wait()
1829        hcls = logging.handlers.SysLogHandler
1830        if isinstance(server.server_address, tuple):
1831            self.sl_hdlr = hcls((server.server_address[0], server.port))
1832        else:
1833            self.sl_hdlr = hcls(server.server_address)
1834        self.log_output = ''
1835        self.root_logger.removeHandler(self.root_logger.handlers[0])
1836        self.root_logger.addHandler(self.sl_hdlr)
1837        self.handled = threading.Event()
1838
1839    def tearDown(self):
1840        """Shutdown the server."""
1841        try:
1842            if self.server:
1843                self.server.stop(2.0)
1844            if self.sl_hdlr:
1845                self.root_logger.removeHandler(self.sl_hdlr)
1846                self.sl_hdlr.close()
1847        finally:
1848            BaseTest.tearDown(self)
1849
1850    def handle_datagram(self, request):
1851        self.log_output = request.packet
1852        self.handled.set()
1853
1854    def test_output(self):
1855        if self.server_exception:
1856            self.skipTest(self.server_exception)
1857        # The log message sent to the SysLogHandler is properly received.
1858        logger = logging.getLogger("slh")
1859        logger.error("sp\xe4m")
1860        self.handled.wait()
1861        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
1862        self.handled.clear()
1863        self.sl_hdlr.append_nul = False
1864        logger.error("sp\xe4m")
1865        self.handled.wait()
1866        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m')
1867        self.handled.clear()
1868        self.sl_hdlr.ident = "h\xe4m-"
1869        logger.error("sp\xe4m")
1870        self.handled.wait()
1871        self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')
1872
1873@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
1874class UnixSysLogHandlerTest(SysLogHandlerTest):
1875
1876    """Test for SysLogHandler with Unix sockets."""
1877
1878    if hasattr(socket, "AF_UNIX"):
1879        server_class = TestUnixDatagramServer
1880
1881    def setUp(self):
1882        # override the definition in the base class
1883        self.address = _get_temp_domain_socket()
1884        SysLogHandlerTest.setUp(self)
1885
1886    def tearDown(self):
1887        SysLogHandlerTest.tearDown(self)
1888        support.unlink(self.address)
1889
1890@unittest.skipUnless(support.IPV6_ENABLED,
1891                     'IPv6 support required for this test.')
1892class IPv6SysLogHandlerTest(SysLogHandlerTest):
1893
1894    """Test for SysLogHandler with IPv6 host."""
1895
1896    server_class = TestUDPServer
1897    address = ('::1', 0)
1898
1899    def setUp(self):
1900        self.server_class.address_family = socket.AF_INET6
1901        super(IPv6SysLogHandlerTest, self).setUp()
1902
1903    def tearDown(self):
1904        self.server_class.address_family = socket.AF_INET
1905        super(IPv6SysLogHandlerTest, self).tearDown()
1906
1907class HTTPHandlerTest(BaseTest):
1908    """Test for HTTPHandler."""
1909
1910    def setUp(self):
1911        """Set up an HTTP server to receive log messages, and a HTTPHandler
1912        pointing to that server's address and port."""
1913        BaseTest.setUp(self)
1914        self.handled = threading.Event()
1915
1916    def handle_request(self, request):
1917        self.command = request.command
1918        self.log_data = urlparse(request.path)
1919        if self.command == 'POST':
1920            try:
1921                rlen = int(request.headers['Content-Length'])
1922                self.post_data = request.rfile.read(rlen)
1923            except:
1924                self.post_data = None
1925        request.send_response(200)
1926        request.end_headers()
1927        self.handled.set()
1928
1929    def test_output(self):
1930        # The log message sent to the HTTPHandler is properly received.
1931        logger = logging.getLogger("http")
1932        root_logger = self.root_logger
1933        root_logger.removeHandler(self.root_logger.handlers[0])
1934        for secure in (False, True):
1935            addr = ('localhost', 0)
1936            if secure:
1937                try:
1938                    import ssl
1939                except ImportError:
1940                    sslctx = None
1941                else:
1942                    here = os.path.dirname(__file__)
1943                    localhost_cert = os.path.join(here, "keycert.pem")
1944                    sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
1945                    sslctx.load_cert_chain(localhost_cert)
1946
1947                    context = ssl.create_default_context(cafile=localhost_cert)
1948            else:
1949                sslctx = None
1950                context = None
1951            self.server = server = TestHTTPServer(addr, self.handle_request,
1952                                                    0.01, sslctx=sslctx)
1953            server.start()
1954            server.ready.wait()
1955            host = 'localhost:%d' % server.server_port
1956            secure_client = secure and sslctx
1957            self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob',
1958                                                       secure=secure_client,
1959                                                       context=context,
1960                                                       credentials=('foo', 'bar'))
1961            self.log_data = None
1962            root_logger.addHandler(self.h_hdlr)
1963
1964            for method in ('GET', 'POST'):
1965                self.h_hdlr.method = method
1966                self.handled.clear()
1967                msg = "sp\xe4m"
1968                logger.error(msg)
1969                self.handled.wait()
1970                self.assertEqual(self.log_data.path, '/frob')
1971                self.assertEqual(self.command, method)
1972                if method == 'GET':
1973                    d = parse_qs(self.log_data.query)
1974                else:
1975                    d = parse_qs(self.post_data.decode('utf-8'))
1976                self.assertEqual(d['name'], ['http'])
1977                self.assertEqual(d['funcName'], ['test_output'])
1978                self.assertEqual(d['msg'], [msg])
1979
1980            self.server.stop(2.0)
1981            self.root_logger.removeHandler(self.h_hdlr)
1982            self.h_hdlr.close()
1983
1984class MemoryTest(BaseTest):
1985
1986    """Test memory persistence of logger objects."""
1987
1988    def setUp(self):
1989        """Create a dict to remember potentially destroyed objects."""
1990        BaseTest.setUp(self)
1991        self._survivors = {}
1992
1993    def _watch_for_survival(self, *args):
1994        """Watch the given objects for survival, by creating weakrefs to
1995        them."""
1996        for obj in args:
1997            key = id(obj), repr(obj)
1998            self._survivors[key] = weakref.ref(obj)
1999
2000    def _assertTruesurvival(self):
2001        """Assert that all objects watched for survival have survived."""
2002        # Trigger cycle breaking.
2003        gc.collect()
2004        dead = []
2005        for (id_, repr_), ref in self._survivors.items():
2006            if ref() is None:
2007                dead.append(repr_)
2008        if dead:
2009            self.fail("%d objects should have survived "
2010                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
2011
2012    def test_persistent_loggers(self):
2013        # Logger objects are persistent and retain their configuration, even
2014        #  if visible references are destroyed.
2015        self.root_logger.setLevel(logging.INFO)
2016        foo = logging.getLogger("foo")
2017        self._watch_for_survival(foo)
2018        foo.setLevel(logging.DEBUG)
2019        self.root_logger.debug(self.next_message())
2020        foo.debug(self.next_message())
2021        self.assert_log_lines([
2022            ('foo', 'DEBUG', '2'),
2023        ])
2024        del foo
2025        # foo has survived.
2026        self._assertTruesurvival()
2027        # foo has retained its settings.
2028        bar = logging.getLogger("foo")
2029        bar.debug(self.next_message())
2030        self.assert_log_lines([
2031            ('foo', 'DEBUG', '2'),
2032            ('foo', 'DEBUG', '3'),
2033        ])
2034
2035
2036class EncodingTest(BaseTest):
2037    def test_encoding_plain_file(self):
2038        # In Python 2.x, a plain file object is treated as having no encoding.
2039        log = logging.getLogger("test")
2040        fd, fn = tempfile.mkstemp(".log", "test_logging-1-")
2041        os.close(fd)
2042        # the non-ascii data we write to the log.
2043        data = "foo\x80"
2044        try:
2045            handler = logging.FileHandler(fn, encoding="utf-8")
2046            log.addHandler(handler)
2047            try:
2048                # write non-ascii data to the log.
2049                log.warning(data)
2050            finally:
2051                log.removeHandler(handler)
2052                handler.close()
2053            # check we wrote exactly those bytes, ignoring trailing \n etc
2054            f = open(fn, encoding="utf-8")
2055            try:
2056                self.assertEqual(f.read().rstrip(), data)
2057            finally:
2058                f.close()
2059        finally:
2060            if os.path.isfile(fn):
2061                os.remove(fn)
2062
2063    def test_encoding_cyrillic_unicode(self):
2064        log = logging.getLogger("test")
2065        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
2066        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
2067        # Ensure it's written in a Cyrillic encoding
2068        writer_class = codecs.getwriter('cp1251')
2069        writer_class.encoding = 'cp1251'
2070        stream = io.BytesIO()
2071        writer = writer_class(stream, 'strict')
2072        handler = logging.StreamHandler(writer)
2073        log.addHandler(handler)
2074        try:
2075            log.warning(message)
2076        finally:
2077            log.removeHandler(handler)
2078            handler.close()
2079        # check we wrote exactly those bytes, ignoring trailing \n etc
2080        s = stream.getvalue()
2081        # Compare against what the data should be when encoded in CP-1251
2082        self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
2083
2084
2085class WarningsTest(BaseTest):
2086
2087    def test_warnings(self):
2088        with warnings.catch_warnings():
2089            logging.captureWarnings(True)
2090            self.addCleanup(logging.captureWarnings, False)
2091            warnings.filterwarnings("always", category=UserWarning)
2092            stream = io.StringIO()
2093            h = logging.StreamHandler(stream)
2094            logger = logging.getLogger("py.warnings")
2095            logger.addHandler(h)
2096            warnings.warn("I'm warning you...")
2097            logger.removeHandler(h)
2098            s = stream.getvalue()
2099            h.close()
2100            self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0)
2101
2102            # See if an explicit file uses the original implementation
2103            a_file = io.StringIO()
2104            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
2105                                 a_file, "Dummy line")
2106            s = a_file.getvalue()
2107            a_file.close()
2108            self.assertEqual(s,
2109                "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
2110
2111    def test_warnings_no_handlers(self):
2112        with warnings.catch_warnings():
2113            logging.captureWarnings(True)
2114            self.addCleanup(logging.captureWarnings, False)
2115
2116            # confirm our assumption: no loggers are set
2117            logger = logging.getLogger("py.warnings")
2118            self.assertEqual(logger.handlers, [])
2119
2120            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
2121            self.assertEqual(len(logger.handlers), 1)
2122            self.assertIsInstance(logger.handlers[0], logging.NullHandler)
2123
2124
2125def formatFunc(format, datefmt=None):
2126    return logging.Formatter(format, datefmt)
2127
2128def handlerFunc():
2129    return logging.StreamHandler()
2130
2131class CustomHandler(logging.StreamHandler):
2132    pass
2133
2134class ConfigDictTest(BaseTest):
2135
2136    """Reading logging config from a dictionary."""
2137
2138    check_no_resource_warning = support.check_no_resource_warning
2139    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
2140
2141    # config0 is a standard configuration.
2142    config0 = {
2143        'version': 1,
2144        'formatters': {
2145            'form1' : {
2146                'format' : '%(levelname)s ++ %(message)s',
2147            },
2148        },
2149        'handlers' : {
2150            'hand1' : {
2151                'class' : 'logging.StreamHandler',
2152                'formatter' : 'form1',
2153                'level' : 'NOTSET',
2154                'stream'  : 'ext://sys.stdout',
2155            },
2156        },
2157        'root' : {
2158            'level' : 'WARNING',
2159            'handlers' : ['hand1'],
2160        },
2161    }
2162
2163    # config1 adds a little to the standard configuration.
2164    config1 = {
2165        'version': 1,
2166        'formatters': {
2167            'form1' : {
2168                'format' : '%(levelname)s ++ %(message)s',
2169            },
2170        },
2171        'handlers' : {
2172            'hand1' : {
2173                'class' : 'logging.StreamHandler',
2174                'formatter' : 'form1',
2175                'level' : 'NOTSET',
2176                'stream'  : 'ext://sys.stdout',
2177            },
2178        },
2179        'loggers' : {
2180            'compiler.parser' : {
2181                'level' : 'DEBUG',
2182                'handlers' : ['hand1'],
2183            },
2184        },
2185        'root' : {
2186            'level' : 'WARNING',
2187        },
2188    }
2189
2190    # config1a moves the handler to the root. Used with config8a
2191    config1a = {
2192        'version': 1,
2193        'formatters': {
2194            'form1' : {
2195                'format' : '%(levelname)s ++ %(message)s',
2196            },
2197        },
2198        'handlers' : {
2199            'hand1' : {
2200                'class' : 'logging.StreamHandler',
2201                'formatter' : 'form1',
2202                'level' : 'NOTSET',
2203                'stream'  : 'ext://sys.stdout',
2204            },
2205        },
2206        'loggers' : {
2207            'compiler.parser' : {
2208                'level' : 'DEBUG',
2209            },
2210        },
2211        'root' : {
2212            'level' : 'WARNING',
2213            'handlers' : ['hand1'],
2214        },
2215    }
2216
2217    # config2 has a subtle configuration error that should be reported
2218    config2 = {
2219        'version': 1,
2220        'formatters': {
2221            'form1' : {
2222                'format' : '%(levelname)s ++ %(message)s',
2223            },
2224        },
2225        'handlers' : {
2226            'hand1' : {
2227                'class' : 'logging.StreamHandler',
2228                'formatter' : 'form1',
2229                'level' : 'NOTSET',
2230                'stream'  : 'ext://sys.stdbout',
2231            },
2232        },
2233        'loggers' : {
2234            'compiler.parser' : {
2235                'level' : 'DEBUG',
2236                'handlers' : ['hand1'],
2237            },
2238        },
2239        'root' : {
2240            'level' : 'WARNING',
2241        },
2242    }
2243
2244    # As config1 but with a misspelt level on a handler
2245    config2a = {
2246        'version': 1,
2247        'formatters': {
2248            'form1' : {
2249                'format' : '%(levelname)s ++ %(message)s',
2250            },
2251        },
2252        'handlers' : {
2253            'hand1' : {
2254                'class' : 'logging.StreamHandler',
2255                'formatter' : 'form1',
2256                'level' : 'NTOSET',
2257                'stream'  : 'ext://sys.stdout',
2258            },
2259        },
2260        'loggers' : {
2261            'compiler.parser' : {
2262                'level' : 'DEBUG',
2263                'handlers' : ['hand1'],
2264            },
2265        },
2266        'root' : {
2267            'level' : 'WARNING',
2268        },
2269    }
2270
2271
2272    # As config1 but with a misspelt level on a logger
2273    config2b = {
2274        'version': 1,
2275        'formatters': {
2276            'form1' : {
2277                'format' : '%(levelname)s ++ %(message)s',
2278            },
2279        },
2280        'handlers' : {
2281            'hand1' : {
2282                'class' : 'logging.StreamHandler',
2283                'formatter' : 'form1',
2284                'level' : 'NOTSET',
2285                'stream'  : 'ext://sys.stdout',
2286            },
2287        },
2288        'loggers' : {
2289            'compiler.parser' : {
2290                'level' : 'DEBUG',
2291                'handlers' : ['hand1'],
2292            },
2293        },
2294        'root' : {
2295            'level' : 'WRANING',
2296        },
2297    }
2298
2299    # config3 has a less subtle configuration error
2300    config3 = {
2301        'version': 1,
2302        'formatters': {
2303            'form1' : {
2304                'format' : '%(levelname)s ++ %(message)s',
2305            },
2306        },
2307        'handlers' : {
2308            'hand1' : {
2309                'class' : 'logging.StreamHandler',
2310                'formatter' : 'misspelled_name',
2311                'level' : 'NOTSET',
2312                'stream'  : 'ext://sys.stdout',
2313            },
2314        },
2315        'loggers' : {
2316            'compiler.parser' : {
2317                'level' : 'DEBUG',
2318                'handlers' : ['hand1'],
2319            },
2320        },
2321        'root' : {
2322            'level' : 'WARNING',
2323        },
2324    }
2325
2326    # config4 specifies a custom formatter class to be loaded
2327    config4 = {
2328        'version': 1,
2329        'formatters': {
2330            'form1' : {
2331                '()' : __name__ + '.ExceptionFormatter',
2332                'format' : '%(levelname)s:%(name)s:%(message)s',
2333            },
2334        },
2335        'handlers' : {
2336            'hand1' : {
2337                'class' : 'logging.StreamHandler',
2338                'formatter' : 'form1',
2339                'level' : 'NOTSET',
2340                'stream'  : 'ext://sys.stdout',
2341            },
2342        },
2343        'root' : {
2344            'level' : 'NOTSET',
2345                'handlers' : ['hand1'],
2346        },
2347    }
2348
2349    # As config4 but using an actual callable rather than a string
2350    config4a = {
2351        'version': 1,
2352        'formatters': {
2353            'form1' : {
2354                '()' : ExceptionFormatter,
2355                'format' : '%(levelname)s:%(name)s:%(message)s',
2356            },
2357            'form2' : {
2358                '()' : __name__ + '.formatFunc',
2359                'format' : '%(levelname)s:%(name)s:%(message)s',
2360            },
2361            'form3' : {
2362                '()' : formatFunc,
2363                'format' : '%(levelname)s:%(name)s:%(message)s',
2364            },
2365        },
2366        'handlers' : {
2367            'hand1' : {
2368                'class' : 'logging.StreamHandler',
2369                'formatter' : 'form1',
2370                'level' : 'NOTSET',
2371                'stream'  : 'ext://sys.stdout',
2372            },
2373            'hand2' : {
2374                '()' : handlerFunc,
2375            },
2376        },
2377        'root' : {
2378            'level' : 'NOTSET',
2379                'handlers' : ['hand1'],
2380        },
2381    }
2382
2383    # config5 specifies a custom handler class to be loaded
2384    config5 = {
2385        'version': 1,
2386        'formatters': {
2387            'form1' : {
2388                'format' : '%(levelname)s ++ %(message)s',
2389            },
2390        },
2391        'handlers' : {
2392            'hand1' : {
2393                'class' : __name__ + '.CustomHandler',
2394                'formatter' : 'form1',
2395                'level' : 'NOTSET',
2396                'stream'  : 'ext://sys.stdout',
2397            },
2398        },
2399        'loggers' : {
2400            'compiler.parser' : {
2401                'level' : 'DEBUG',
2402                'handlers' : ['hand1'],
2403            },
2404        },
2405        'root' : {
2406            'level' : 'WARNING',
2407        },
2408    }
2409
2410    # config6 specifies a custom handler class to be loaded
2411    # but has bad arguments
2412    config6 = {
2413        'version': 1,
2414        'formatters': {
2415            'form1' : {
2416                'format' : '%(levelname)s ++ %(message)s',
2417            },
2418        },
2419        'handlers' : {
2420            'hand1' : {
2421                'class' : __name__ + '.CustomHandler',
2422                'formatter' : 'form1',
2423                'level' : 'NOTSET',
2424                'stream'  : 'ext://sys.stdout',
2425                '9' : 'invalid parameter name',
2426            },
2427        },
2428        'loggers' : {
2429            'compiler.parser' : {
2430                'level' : 'DEBUG',
2431                'handlers' : ['hand1'],
2432            },
2433        },
2434        'root' : {
2435            'level' : 'WARNING',
2436        },
2437    }
2438
2439    # config 7 does not define compiler.parser but defines compiler.lexer
2440    # so compiler.parser should be disabled after applying it
2441    config7 = {
2442        'version': 1,
2443        'formatters': {
2444            'form1' : {
2445                'format' : '%(levelname)s ++ %(message)s',
2446            },
2447        },
2448        'handlers' : {
2449            'hand1' : {
2450                'class' : 'logging.StreamHandler',
2451                'formatter' : 'form1',
2452                'level' : 'NOTSET',
2453                'stream'  : 'ext://sys.stdout',
2454            },
2455        },
2456        'loggers' : {
2457            'compiler.lexer' : {
2458                'level' : 'DEBUG',
2459                'handlers' : ['hand1'],
2460            },
2461        },
2462        'root' : {
2463            'level' : 'WARNING',
2464        },
2465    }
2466
2467    # config8 defines both compiler and compiler.lexer
2468    # so compiler.parser should not be disabled (since
2469    # compiler is defined)
2470    config8 = {
2471        'version': 1,
2472        'disable_existing_loggers' : False,
2473        'formatters': {
2474            'form1' : {
2475                'format' : '%(levelname)s ++ %(message)s',
2476            },
2477        },
2478        'handlers' : {
2479            'hand1' : {
2480                'class' : 'logging.StreamHandler',
2481                'formatter' : 'form1',
2482                'level' : 'NOTSET',
2483                'stream'  : 'ext://sys.stdout',
2484            },
2485        },
2486        'loggers' : {
2487            'compiler' : {
2488                'level' : 'DEBUG',
2489                'handlers' : ['hand1'],
2490            },
2491            'compiler.lexer' : {
2492            },
2493        },
2494        'root' : {
2495            'level' : 'WARNING',
2496        },
2497    }
2498
2499    # config8a disables existing loggers
2500    config8a = {
2501        'version': 1,
2502        'disable_existing_loggers' : True,
2503        'formatters': {
2504            'form1' : {
2505                'format' : '%(levelname)s ++ %(message)s',
2506            },
2507        },
2508        'handlers' : {
2509            'hand1' : {
2510                'class' : 'logging.StreamHandler',
2511                'formatter' : 'form1',
2512                'level' : 'NOTSET',
2513                'stream'  : 'ext://sys.stdout',
2514            },
2515        },
2516        'loggers' : {
2517            'compiler' : {
2518                'level' : 'DEBUG',
2519                'handlers' : ['hand1'],
2520            },
2521            'compiler.lexer' : {
2522            },
2523        },
2524        'root' : {
2525            'level' : 'WARNING',
2526        },
2527    }
2528
2529    config9 = {
2530        'version': 1,
2531        'formatters': {
2532            'form1' : {
2533                'format' : '%(levelname)s ++ %(message)s',
2534            },
2535        },
2536        'handlers' : {
2537            'hand1' : {
2538                'class' : 'logging.StreamHandler',
2539                'formatter' : 'form1',
2540                'level' : 'WARNING',
2541                'stream'  : 'ext://sys.stdout',
2542            },
2543        },
2544        'loggers' : {
2545            'compiler.parser' : {
2546                'level' : 'WARNING',
2547                'handlers' : ['hand1'],
2548            },
2549        },
2550        'root' : {
2551            'level' : 'NOTSET',
2552        },
2553    }
2554
2555    config9a = {
2556        'version': 1,
2557        'incremental' : True,
2558        'handlers' : {
2559            'hand1' : {
2560                'level' : 'WARNING',
2561            },
2562        },
2563        'loggers' : {
2564            'compiler.parser' : {
2565                'level' : 'INFO',
2566            },
2567        },
2568    }
2569
2570    config9b = {
2571        'version': 1,
2572        'incremental' : True,
2573        'handlers' : {
2574            'hand1' : {
2575                'level' : 'INFO',
2576            },
2577        },
2578        'loggers' : {
2579            'compiler.parser' : {
2580                'level' : 'INFO',
2581            },
2582        },
2583    }
2584
2585    # As config1 but with a filter added
2586    config10 = {
2587        'version': 1,
2588        'formatters': {
2589            'form1' : {
2590                'format' : '%(levelname)s ++ %(message)s',
2591            },
2592        },
2593        'filters' : {
2594            'filt1' : {
2595                'name' : 'compiler.parser',
2596            },
2597        },
2598        'handlers' : {
2599            'hand1' : {
2600                'class' : 'logging.StreamHandler',
2601                'formatter' : 'form1',
2602                'level' : 'NOTSET',
2603                'stream'  : 'ext://sys.stdout',
2604                'filters' : ['filt1'],
2605            },
2606        },
2607        'loggers' : {
2608            'compiler.parser' : {
2609                'level' : 'DEBUG',
2610                'filters' : ['filt1'],
2611            },
2612        },
2613        'root' : {
2614            'level' : 'WARNING',
2615            'handlers' : ['hand1'],
2616        },
2617    }
2618
2619    # As config1 but using cfg:// references
2620    config11 = {
2621        'version': 1,
2622        'true_formatters': {
2623            'form1' : {
2624                'format' : '%(levelname)s ++ %(message)s',
2625            },
2626        },
2627        'handler_configs': {
2628            'hand1' : {
2629                'class' : 'logging.StreamHandler',
2630                'formatter' : 'form1',
2631                'level' : 'NOTSET',
2632                'stream'  : 'ext://sys.stdout',
2633            },
2634        },
2635        'formatters' : 'cfg://true_formatters',
2636        'handlers' : {
2637            'hand1' : 'cfg://handler_configs[hand1]',
2638        },
2639        'loggers' : {
2640            'compiler.parser' : {
2641                'level' : 'DEBUG',
2642                'handlers' : ['hand1'],
2643            },
2644        },
2645        'root' : {
2646            'level' : 'WARNING',
2647        },
2648    }
2649
2650    # As config11 but missing the version key
2651    config12 = {
2652        'true_formatters': {
2653            'form1' : {
2654                'format' : '%(levelname)s ++ %(message)s',
2655            },
2656        },
2657        'handler_configs': {
2658            'hand1' : {
2659                'class' : 'logging.StreamHandler',
2660                'formatter' : 'form1',
2661                'level' : 'NOTSET',
2662                'stream'  : 'ext://sys.stdout',
2663            },
2664        },
2665        'formatters' : 'cfg://true_formatters',
2666        'handlers' : {
2667            'hand1' : 'cfg://handler_configs[hand1]',
2668        },
2669        'loggers' : {
2670            'compiler.parser' : {
2671                'level' : 'DEBUG',
2672                'handlers' : ['hand1'],
2673            },
2674        },
2675        'root' : {
2676            'level' : 'WARNING',
2677        },
2678    }
2679
2680    # As config11 but using an unsupported version
2681    config13 = {
2682        'version': 2,
2683        'true_formatters': {
2684            'form1' : {
2685                'format' : '%(levelname)s ++ %(message)s',
2686            },
2687        },
2688        'handler_configs': {
2689            'hand1' : {
2690                'class' : 'logging.StreamHandler',
2691                'formatter' : 'form1',
2692                'level' : 'NOTSET',
2693                'stream'  : 'ext://sys.stdout',
2694            },
2695        },
2696        'formatters' : 'cfg://true_formatters',
2697        'handlers' : {
2698            'hand1' : 'cfg://handler_configs[hand1]',
2699        },
2700        'loggers' : {
2701            'compiler.parser' : {
2702                'level' : 'DEBUG',
2703                'handlers' : ['hand1'],
2704            },
2705        },
2706        'root' : {
2707            'level' : 'WARNING',
2708        },
2709    }
2710
2711    # As config0, but with properties
2712    config14 = {
2713        'version': 1,
2714        'formatters': {
2715            'form1' : {
2716                'format' : '%(levelname)s ++ %(message)s',
2717            },
2718        },
2719        'handlers' : {
2720            'hand1' : {
2721                'class' : 'logging.StreamHandler',
2722                'formatter' : 'form1',
2723                'level' : 'NOTSET',
2724                'stream'  : 'ext://sys.stdout',
2725                '.': {
2726                    'foo': 'bar',
2727                    'terminator': '!\n',
2728                }
2729            },
2730        },
2731        'root' : {
2732            'level' : 'WARNING',
2733            'handlers' : ['hand1'],
2734        },
2735    }
2736
2737    out_of_order = {
2738        "version": 1,
2739        "formatters": {
2740            "mySimpleFormatter": {
2741                "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s",
2742                "style": "$"
2743            }
2744        },
2745        "handlers": {
2746            "fileGlobal": {
2747                "class": "logging.StreamHandler",
2748                "level": "DEBUG",
2749                "formatter": "mySimpleFormatter"
2750            },
2751            "bufferGlobal": {
2752                "class": "logging.handlers.MemoryHandler",
2753                "capacity": 5,
2754                "formatter": "mySimpleFormatter",
2755                "target": "fileGlobal",
2756                "level": "DEBUG"
2757                }
2758        },
2759        "loggers": {
2760            "mymodule": {
2761                "level": "DEBUG",
2762                "handlers": ["bufferGlobal"],
2763                "propagate": "true"
2764            }
2765        }
2766    }
2767
2768    def apply_config(self, conf):
2769        logging.config.dictConfig(conf)
2770
2771    def test_config0_ok(self):
2772        # A simple config which overrides the default settings.
2773        with support.captured_stdout() as output:
2774            self.apply_config(self.config0)
2775            logger = logging.getLogger()
2776            # Won't output anything
2777            logger.info(self.next_message())
2778            # Outputs a message
2779            logger.error(self.next_message())
2780            self.assert_log_lines([
2781                ('ERROR', '2'),
2782            ], stream=output)
2783            # Original logger output is empty.
2784            self.assert_log_lines([])
2785
2786    def test_config1_ok(self, config=config1):
2787        # A config defining a sub-parser as well.
2788        with support.captured_stdout() as output:
2789            self.apply_config(config)
2790            logger = logging.getLogger("compiler.parser")
2791            # Both will output a message
2792            logger.info(self.next_message())
2793            logger.error(self.next_message())
2794            self.assert_log_lines([
2795                ('INFO', '1'),
2796                ('ERROR', '2'),
2797            ], stream=output)
2798            # Original logger output is empty.
2799            self.assert_log_lines([])
2800
2801    def test_config2_failure(self):
2802        # A simple config which overrides the default settings.
2803        self.assertRaises(Exception, self.apply_config, self.config2)
2804
2805    def test_config2a_failure(self):
2806        # A simple config which overrides the default settings.
2807        self.assertRaises(Exception, self.apply_config, self.config2a)
2808
2809    def test_config2b_failure(self):
2810        # A simple config which overrides the default settings.
2811        self.assertRaises(Exception, self.apply_config, self.config2b)
2812
2813    def test_config3_failure(self):
2814        # A simple config which overrides the default settings.
2815        self.assertRaises(Exception, self.apply_config, self.config3)
2816
2817    def test_config4_ok(self):
2818        # A config specifying a custom formatter class.
2819        with support.captured_stdout() as output:
2820            self.apply_config(self.config4)
2821            #logger = logging.getLogger()
2822            try:
2823                raise RuntimeError()
2824            except RuntimeError:
2825                logging.exception("just testing")
2826            sys.stdout.seek(0)
2827            self.assertEqual(output.getvalue(),
2828                "ERROR:root:just testing\nGot a [RuntimeError]\n")
2829            # Original logger output is empty
2830            self.assert_log_lines([])
2831
2832    def test_config4a_ok(self):
2833        # A config specifying a custom formatter class.
2834        with support.captured_stdout() as output:
2835            self.apply_config(self.config4a)
2836            #logger = logging.getLogger()
2837            try:
2838                raise RuntimeError()
2839            except RuntimeError:
2840                logging.exception("just testing")
2841            sys.stdout.seek(0)
2842            self.assertEqual(output.getvalue(),
2843                "ERROR:root:just testing\nGot a [RuntimeError]\n")
2844            # Original logger output is empty
2845            self.assert_log_lines([])
2846
2847    def test_config5_ok(self):
2848        self.test_config1_ok(config=self.config5)
2849
2850    def test_config6_failure(self):
2851        self.assertRaises(Exception, self.apply_config, self.config6)
2852
2853    def test_config7_ok(self):
2854        with support.captured_stdout() as output:
2855            self.apply_config(self.config1)
2856            logger = logging.getLogger("compiler.parser")
2857            # Both will output a message
2858            logger.info(self.next_message())
2859            logger.error(self.next_message())
2860            self.assert_log_lines([
2861                ('INFO', '1'),
2862                ('ERROR', '2'),
2863            ], stream=output)
2864            # Original logger output is empty.
2865            self.assert_log_lines([])
2866        with support.captured_stdout() as output:
2867            self.apply_config(self.config7)
2868            logger = logging.getLogger("compiler.parser")
2869            self.assertTrue(logger.disabled)
2870            logger = logging.getLogger("compiler.lexer")
2871            # Both will output a message
2872            logger.info(self.next_message())
2873            logger.error(self.next_message())
2874            self.assert_log_lines([
2875                ('INFO', '3'),
2876                ('ERROR', '4'),
2877            ], stream=output)
2878            # Original logger output is empty.
2879            self.assert_log_lines([])
2880
2881    # Same as test_config_7_ok but don't disable old loggers.
2882    def test_config_8_ok(self):
2883        with support.captured_stdout() as output:
2884            self.apply_config(self.config1)
2885            logger = logging.getLogger("compiler.parser")
2886            # All will output a message
2887            logger.info(self.next_message())
2888            logger.error(self.next_message())
2889            self.assert_log_lines([
2890                ('INFO', '1'),
2891                ('ERROR', '2'),
2892            ], stream=output)
2893            # Original logger output is empty.
2894            self.assert_log_lines([])
2895        with support.captured_stdout() as output:
2896            self.apply_config(self.config8)
2897            logger = logging.getLogger("compiler.parser")
2898            self.assertFalse(logger.disabled)
2899            # Both will output a message
2900            logger.info(self.next_message())
2901            logger.error(self.next_message())
2902            logger = logging.getLogger("compiler.lexer")
2903            # Both will output a message
2904            logger.info(self.next_message())
2905            logger.error(self.next_message())
2906            self.assert_log_lines([
2907                ('INFO', '3'),
2908                ('ERROR', '4'),
2909                ('INFO', '5'),
2910                ('ERROR', '6'),
2911            ], stream=output)
2912            # Original logger output is empty.
2913            self.assert_log_lines([])
2914
2915    def test_config_8a_ok(self):
2916        with support.captured_stdout() as output:
2917            self.apply_config(self.config1a)
2918            logger = logging.getLogger("compiler.parser")
2919            # See issue #11424. compiler-hyphenated sorts
2920            # between compiler and compiler.xyz and this
2921            # was preventing compiler.xyz from being included
2922            # in the child loggers of compiler because of an
2923            # overzealous loop termination condition.
2924            hyphenated = logging.getLogger('compiler-hyphenated')
2925            # All will output a message
2926            logger.info(self.next_message())
2927            logger.error(self.next_message())
2928            hyphenated.critical(self.next_message())
2929            self.assert_log_lines([
2930                ('INFO', '1'),
2931                ('ERROR', '2'),
2932                ('CRITICAL', '3'),
2933            ], stream=output)
2934            # Original logger output is empty.
2935            self.assert_log_lines([])
2936        with support.captured_stdout() as output:
2937            self.apply_config(self.config8a)
2938            logger = logging.getLogger("compiler.parser")
2939            self.assertFalse(logger.disabled)
2940            # Both will output a message
2941            logger.info(self.next_message())
2942            logger.error(self.next_message())
2943            logger = logging.getLogger("compiler.lexer")
2944            # Both will output a message
2945            logger.info(self.next_message())
2946            logger.error(self.next_message())
2947            # Will not appear
2948            hyphenated.critical(self.next_message())
2949            self.assert_log_lines([
2950                ('INFO', '4'),
2951                ('ERROR', '5'),
2952                ('INFO', '6'),
2953                ('ERROR', '7'),
2954            ], stream=output)
2955            # Original logger output is empty.
2956            self.assert_log_lines([])
2957
2958    def test_config_9_ok(self):
2959        with support.captured_stdout() as output:
2960            self.apply_config(self.config9)
2961            logger = logging.getLogger("compiler.parser")
2962            # Nothing will be output since both handler and logger are set to WARNING
2963            logger.info(self.next_message())
2964            self.assert_log_lines([], stream=output)
2965            self.apply_config(self.config9a)
2966            # Nothing will be output since handler is still set to WARNING
2967            logger.info(self.next_message())
2968            self.assert_log_lines([], stream=output)
2969            self.apply_config(self.config9b)
2970            # Message should now be output
2971            logger.info(self.next_message())
2972            self.assert_log_lines([
2973                ('INFO', '3'),
2974            ], stream=output)
2975
2976    def test_config_10_ok(self):
2977        with support.captured_stdout() as output:
2978            self.apply_config(self.config10)
2979            logger = logging.getLogger("compiler.parser")
2980            logger.warning(self.next_message())
2981            logger = logging.getLogger('compiler')
2982            # Not output, because filtered
2983            logger.warning(self.next_message())
2984            logger = logging.getLogger('compiler.lexer')
2985            # Not output, because filtered
2986            logger.warning(self.next_message())
2987            logger = logging.getLogger("compiler.parser.codegen")
2988            # Output, as not filtered
2989            logger.error(self.next_message())
2990            self.assert_log_lines([
2991                ('WARNING', '1'),
2992                ('ERROR', '4'),
2993            ], stream=output)
2994
2995    def test_config11_ok(self):
2996        self.test_config1_ok(self.config11)
2997
2998    def test_config12_failure(self):
2999        self.assertRaises(Exception, self.apply_config, self.config12)
3000
3001    def test_config13_failure(self):
3002        self.assertRaises(Exception, self.apply_config, self.config13)
3003
3004    def test_config14_ok(self):
3005        with support.captured_stdout() as output:
3006            self.apply_config(self.config14)
3007            h = logging._handlers['hand1']
3008            self.assertEqual(h.foo, 'bar')
3009            self.assertEqual(h.terminator, '!\n')
3010            logging.warning('Exclamation')
3011            self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
3012
3013    def test_config15_ok(self):
3014
3015        def cleanup(h1, fn):
3016            h1.close()
3017            os.remove(fn)
3018
3019        with self.check_no_resource_warning():
3020            fd, fn = tempfile.mkstemp(".log", "test_logging-X-")
3021            os.close(fd)
3022
3023            config = {
3024                "version": 1,
3025                "handlers": {
3026                    "file": {
3027                        "class": "logging.FileHandler",
3028                        "filename": fn
3029                    }
3030                },
3031                "root": {
3032                    "handlers": ["file"]
3033                }
3034            }
3035
3036            self.apply_config(config)
3037            self.apply_config(config)
3038
3039        handler = logging.root.handlers[0]
3040        self.addCleanup(cleanup, handler, fn)
3041
3042    def setup_via_listener(self, text, verify=None):
3043        text = text.encode("utf-8")
3044        # Ask for a randomly assigned port (by using port 0)
3045        t = logging.config.listen(0, verify)
3046        t.start()
3047        t.ready.wait()
3048        # Now get the port allocated
3049        port = t.port
3050        t.ready.clear()
3051        try:
3052            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3053            sock.settimeout(2.0)
3054            sock.connect(('localhost', port))
3055
3056            slen = struct.pack('>L', len(text))
3057            s = slen + text
3058            sentsofar = 0
3059            left = len(s)
3060            while left > 0:
3061                sent = sock.send(s[sentsofar:])
3062                sentsofar += sent
3063                left -= sent
3064            sock.close()
3065        finally:
3066            t.ready.wait(2.0)
3067            logging.config.stopListening()
3068            support.join_thread(t, 2.0)
3069
3070    def test_listen_config_10_ok(self):
3071        with support.captured_stdout() as output:
3072            self.setup_via_listener(json.dumps(self.config10))
3073            logger = logging.getLogger("compiler.parser")
3074            logger.warning(self.next_message())
3075            logger = logging.getLogger('compiler')
3076            # Not output, because filtered
3077            logger.warning(self.next_message())
3078            logger = logging.getLogger('compiler.lexer')
3079            # Not output, because filtered
3080            logger.warning(self.next_message())
3081            logger = logging.getLogger("compiler.parser.codegen")
3082            # Output, as not filtered
3083            logger.error(self.next_message())
3084            self.assert_log_lines([
3085                ('WARNING', '1'),
3086                ('ERROR', '4'),
3087            ], stream=output)
3088
3089    def test_listen_config_1_ok(self):
3090        with support.captured_stdout() as output:
3091            self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
3092            logger = logging.getLogger("compiler.parser")
3093            # Both will output a message
3094            logger.info(self.next_message())
3095            logger.error(self.next_message())
3096            self.assert_log_lines([
3097                ('INFO', '1'),
3098                ('ERROR', '2'),
3099            ], stream=output)
3100            # Original logger output is empty.
3101            self.assert_log_lines([])
3102
3103    def test_listen_verify(self):
3104
3105        def verify_fail(stuff):
3106            return None
3107
3108        def verify_reverse(stuff):
3109            return stuff[::-1]
3110
3111        logger = logging.getLogger("compiler.parser")
3112        to_send = textwrap.dedent(ConfigFileTest.config1)
3113        # First, specify a verification function that will fail.
3114        # We expect to see no output, since our configuration
3115        # never took effect.
3116        with support.captured_stdout() as output:
3117            self.setup_via_listener(to_send, verify_fail)
3118            # Both will output a message
3119            logger.info(self.next_message())
3120            logger.error(self.next_message())
3121        self.assert_log_lines([], stream=output)
3122        # Original logger output has the stuff we logged.
3123        self.assert_log_lines([
3124            ('INFO', '1'),
3125            ('ERROR', '2'),
3126        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3127
3128        # Now, perform no verification. Our configuration
3129        # should take effect.
3130
3131        with support.captured_stdout() as output:
3132            self.setup_via_listener(to_send)    # no verify callable specified
3133            logger = logging.getLogger("compiler.parser")
3134            # Both will output a message
3135            logger.info(self.next_message())
3136            logger.error(self.next_message())
3137        self.assert_log_lines([
3138            ('INFO', '3'),
3139            ('ERROR', '4'),
3140        ], stream=output)
3141        # Original logger output still has the stuff we logged before.
3142        self.assert_log_lines([
3143            ('INFO', '1'),
3144            ('ERROR', '2'),
3145        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3146
3147        # Now, perform verification which transforms the bytes.
3148
3149        with support.captured_stdout() as output:
3150            self.setup_via_listener(to_send[::-1], verify_reverse)
3151            logger = logging.getLogger("compiler.parser")
3152            # Both will output a message
3153            logger.info(self.next_message())
3154            logger.error(self.next_message())
3155        self.assert_log_lines([
3156            ('INFO', '5'),
3157            ('ERROR', '6'),
3158        ], stream=output)
3159        # Original logger output still has the stuff we logged before.
3160        self.assert_log_lines([
3161            ('INFO', '1'),
3162            ('ERROR', '2'),
3163        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
3164
3165    def test_out_of_order(self):
3166        self.apply_config(self.out_of_order)
3167        handler = logging.getLogger('mymodule').handlers[0]
3168        self.assertIsInstance(handler.target, logging.Handler)
3169        self.assertIsInstance(handler.formatter._style,
3170                              logging.StringTemplateStyle)
3171
3172    def test_baseconfig(self):
3173        d = {
3174            'atuple': (1, 2, 3),
3175            'alist': ['a', 'b', 'c'],
3176            'adict': {'d': 'e', 'f': 3 },
3177            'nest1': ('g', ('h', 'i'), 'j'),
3178            'nest2': ['k', ['l', 'm'], 'n'],
3179            'nest3': ['o', 'cfg://alist', 'p'],
3180        }
3181        bc = logging.config.BaseConfigurator(d)
3182        self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
3183        self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
3184        self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
3185        self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm')
3186        self.assertEqual(bc.convert('cfg://adict.d'), 'e')
3187        self.assertEqual(bc.convert('cfg://adict[f]'), 3)
3188        v = bc.convert('cfg://nest3')
3189        self.assertEqual(v.pop(1), ['a', 'b', 'c'])
3190        self.assertRaises(KeyError, bc.convert, 'cfg://nosuch')
3191        self.assertRaises(ValueError, bc.convert, 'cfg://!')
3192        self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]')
3193
3194class ManagerTest(BaseTest):
3195    def test_manager_loggerclass(self):
3196        logged = []
3197
3198        class MyLogger(logging.Logger):
3199            def _log(self, level, msg, args, exc_info=None, extra=None):
3200                logged.append(msg)
3201
3202        man = logging.Manager(None)
3203        self.assertRaises(TypeError, man.setLoggerClass, int)
3204        man.setLoggerClass(MyLogger)
3205        logger = man.getLogger('test')
3206        logger.warning('should appear in logged')
3207        logging.warning('should not appear in logged')
3208
3209        self.assertEqual(logged, ['should appear in logged'])
3210
3211    def test_set_log_record_factory(self):
3212        man = logging.Manager(None)
3213        expected = object()
3214        man.setLogRecordFactory(expected)
3215        self.assertEqual(man.logRecordFactory, expected)
3216
3217class ChildLoggerTest(BaseTest):
3218    def test_child_loggers(self):
3219        r = logging.getLogger()
3220        l1 = logging.getLogger('abc')
3221        l2 = logging.getLogger('def.ghi')
3222        c1 = r.getChild('xyz')
3223        c2 = r.getChild('uvw.xyz')
3224        self.assertIs(c1, logging.getLogger('xyz'))
3225        self.assertIs(c2, logging.getLogger('uvw.xyz'))
3226        c1 = l1.getChild('def')
3227        c2 = c1.getChild('ghi')
3228        c3 = l1.getChild('def.ghi')
3229        self.assertIs(c1, logging.getLogger('abc.def'))
3230        self.assertIs(c2, logging.getLogger('abc.def.ghi'))
3231        self.assertIs(c2, c3)
3232
3233
3234class DerivedLogRecord(logging.LogRecord):
3235    pass
3236
3237class LogRecordFactoryTest(BaseTest):
3238
3239    def setUp(self):
3240        class CheckingFilter(logging.Filter):
3241            def __init__(self, cls):
3242                self.cls = cls
3243
3244            def filter(self, record):
3245                t = type(record)
3246                if t is not self.cls:
3247                    msg = 'Unexpected LogRecord type %s, expected %s' % (t,
3248                            self.cls)
3249                    raise TypeError(msg)
3250                return True
3251
3252        BaseTest.setUp(self)
3253        self.filter = CheckingFilter(DerivedLogRecord)
3254        self.root_logger.addFilter(self.filter)
3255        self.orig_factory = logging.getLogRecordFactory()
3256
3257    def tearDown(self):
3258        self.root_logger.removeFilter(self.filter)
3259        BaseTest.tearDown(self)
3260        logging.setLogRecordFactory(self.orig_factory)
3261
3262    def test_logrecord_class(self):
3263        self.assertRaises(TypeError, self.root_logger.warning,
3264                          self.next_message())
3265        logging.setLogRecordFactory(DerivedLogRecord)
3266        self.root_logger.error(self.next_message())
3267        self.assert_log_lines([
3268           ('root', 'ERROR', '2'),
3269        ])
3270
3271
3272class QueueHandlerTest(BaseTest):
3273    # Do not bother with a logger name group.
3274    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"
3275
3276    def setUp(self):
3277        BaseTest.setUp(self)
3278        self.queue = queue.Queue(-1)
3279        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
3280        self.name = 'que'
3281        self.que_logger = logging.getLogger('que')
3282        self.que_logger.propagate = False
3283        self.que_logger.setLevel(logging.WARNING)
3284        self.que_logger.addHandler(self.que_hdlr)
3285
3286    def tearDown(self):
3287        self.que_hdlr.close()
3288        BaseTest.tearDown(self)
3289
3290    def test_queue_handler(self):
3291        self.que_logger.debug(self.next_message())
3292        self.assertRaises(queue.Empty, self.queue.get_nowait)
3293        self.que_logger.info(self.next_message())
3294        self.assertRaises(queue.Empty, self.queue.get_nowait)
3295        msg = self.next_message()
3296        self.que_logger.warning(msg)
3297        data = self.queue.get_nowait()
3298        self.assertTrue(isinstance(data, logging.LogRecord))
3299        self.assertEqual(data.name, self.que_logger.name)
3300        self.assertEqual((data.msg, data.args), (msg, None))
3301
3302    def test_formatting(self):
3303        msg = self.next_message()
3304        levelname = logging.getLevelName(logging.WARNING)
3305        log_format_str = '{name} -> {levelname}: {message}'
3306        formatted_msg = log_format_str.format(name=self.name,
3307                                              levelname=levelname, message=msg)
3308        formatter = logging.Formatter(self.log_format)
3309        self.que_hdlr.setFormatter(formatter)
3310        self.que_logger.warning(msg)
3311        log_record = self.queue.get_nowait()
3312        self.assertEqual(formatted_msg, log_record.msg)
3313        self.assertEqual(formatted_msg, log_record.message)
3314
3315    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3316                         'logging.handlers.QueueListener required for this test')
3317    def test_queue_listener(self):
3318        handler = support.TestHandler(support.Matcher())
3319        listener = logging.handlers.QueueListener(self.queue, handler)
3320        listener.start()
3321        try:
3322            self.que_logger.warning(self.next_message())
3323            self.que_logger.error(self.next_message())
3324            self.que_logger.critical(self.next_message())
3325        finally:
3326            listener.stop()
3327        self.assertTrue(handler.matches(levelno=logging.WARNING, message='1'))
3328        self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
3329        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
3330        handler.close()
3331
3332        # Now test with respect_handler_level set
3333
3334        handler = support.TestHandler(support.Matcher())
3335        handler.setLevel(logging.CRITICAL)
3336        listener = logging.handlers.QueueListener(self.queue, handler,
3337                                                  respect_handler_level=True)
3338        listener.start()
3339        try:
3340            self.que_logger.warning(self.next_message())
3341            self.que_logger.error(self.next_message())
3342            self.que_logger.critical(self.next_message())
3343        finally:
3344            listener.stop()
3345        self.assertFalse(handler.matches(levelno=logging.WARNING, message='4'))
3346        self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
3347        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
3348        handler.close()
3349
3350    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
3351                         'logging.handlers.QueueListener required for this test')
3352    def test_queue_listener_with_StreamHandler(self):
3353        # Test that traceback only appends once (bpo-34334).
3354        listener = logging.handlers.QueueListener(self.queue, self.root_hdlr)
3355        listener.start()
3356        try:
3357            1 / 0
3358        except ZeroDivisionError as e:
3359            exc = e
3360            self.que_logger.exception(self.next_message(), exc_info=exc)
3361        listener.stop()
3362        self.assertEqual(self.stream.getvalue().strip().count('Traceback'), 1)
3363
3364if hasattr(logging.handlers, 'QueueListener'):
3365    import multiprocessing
3366    from unittest.mock import patch
3367
3368    class QueueListenerTest(BaseTest):
3369        """
3370        Tests based on patch submitted for issue #27930. Ensure that
3371        QueueListener handles all log messages.
3372        """
3373
3374        repeat = 20
3375
3376        @staticmethod
3377        def setup_and_log(log_queue, ident):
3378            """
3379            Creates a logger with a QueueHandler that logs to a queue read by a
3380            QueueListener. Starts the listener, logs five messages, and stops
3381            the listener.
3382            """
3383            logger = logging.getLogger('test_logger_with_id_%s' % ident)
3384            logger.setLevel(logging.DEBUG)
3385            handler = logging.handlers.QueueHandler(log_queue)
3386            logger.addHandler(handler)
3387            listener = logging.handlers.QueueListener(log_queue)
3388            listener.start()
3389
3390            logger.info('one')
3391            logger.info('two')
3392            logger.info('three')
3393            logger.info('four')
3394            logger.info('five')
3395
3396            listener.stop()
3397            logger.removeHandler(handler)
3398            handler.close()
3399
3400        @patch.object(logging.handlers.QueueListener, 'handle')
3401        def test_handle_called_with_queue_queue(self, mock_handle):
3402            for i in range(self.repeat):
3403                log_queue = queue.Queue()
3404                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3405            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3406                             'correct number of handled log messages')
3407
3408        @patch.object(logging.handlers.QueueListener, 'handle')
3409        def test_handle_called_with_mp_queue(self, mock_handle):
3410            # Issue 28668: The multiprocessing (mp) module is not functional
3411            # when the mp.synchronize module cannot be imported.
3412            support.import_module('multiprocessing.synchronize')
3413            for i in range(self.repeat):
3414                log_queue = multiprocessing.Queue()
3415                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
3416                log_queue.close()
3417                log_queue.join_thread()
3418            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
3419                             'correct number of handled log messages')
3420
3421        @staticmethod
3422        def get_all_from_queue(log_queue):
3423            try:
3424                while True:
3425                    yield log_queue.get_nowait()
3426            except queue.Empty:
3427                return []
3428
3429        def test_no_messages_in_queue_after_stop(self):
3430            """
3431            Five messages are logged then the QueueListener is stopped. This
3432            test then gets everything off the queue. Failure of this test
3433            indicates that messages were not registered on the queue until
3434            _after_ the QueueListener stopped.
3435            """
3436            # Issue 28668: The multiprocessing (mp) module is not functional
3437            # when the mp.synchronize module cannot be imported.
3438            support.import_module('multiprocessing.synchronize')
3439            for i in range(self.repeat):
3440                queue = multiprocessing.Queue()
3441                self.setup_and_log(queue, '%s_%s' %(self.id(), i))
3442                # time.sleep(1)
3443                items = list(self.get_all_from_queue(queue))
3444                queue.close()
3445                queue.join_thread()
3446
3447                expected = [[], [logging.handlers.QueueListener._sentinel]]
3448                self.assertIn(items, expected,
3449                              'Found unexpected messages in queue: %s' % (
3450                                    [m.msg if isinstance(m, logging.LogRecord)
3451                                     else m for m in items]))
3452
3453
3454ZERO = datetime.timedelta(0)
3455
3456class UTC(datetime.tzinfo):
3457    def utcoffset(self, dt):
3458        return ZERO
3459
3460    dst = utcoffset
3461
3462    def tzname(self, dt):
3463        return 'UTC'
3464
3465utc = UTC()
3466
3467class FormatterTest(unittest.TestCase):
3468    def setUp(self):
3469        self.common = {
3470            'name': 'formatter.test',
3471            'level': logging.DEBUG,
3472            'pathname': os.path.join('path', 'to', 'dummy.ext'),
3473            'lineno': 42,
3474            'exc_info': None,
3475            'func': None,
3476            'msg': 'Message with %d %s',
3477            'args': (2, 'placeholders'),
3478        }
3479        self.variants = {
3480        }
3481
3482    def get_record(self, name=None):
3483        result = dict(self.common)
3484        if name is not None:
3485            result.update(self.variants[name])
3486        return logging.makeLogRecord(result)
3487
3488    def test_percent(self):
3489        # Test %-formatting
3490        r = self.get_record()
3491        f = logging.Formatter('${%(message)s}')
3492        self.assertEqual(f.format(r), '${Message with 2 placeholders}')
3493        f = logging.Formatter('%(random)s')
3494        self.assertRaises(KeyError, f.format, r)
3495        self.assertFalse(f.usesTime())
3496        f = logging.Formatter('%(asctime)s')
3497        self.assertTrue(f.usesTime())
3498        f = logging.Formatter('%(asctime)-15s')
3499        self.assertTrue(f.usesTime())
3500        f = logging.Formatter('asctime')
3501        self.assertFalse(f.usesTime())
3502
3503    def test_braces(self):
3504        # Test {}-formatting
3505        r = self.get_record()
3506        f = logging.Formatter('$%{message}%$', style='{')
3507        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3508        f = logging.Formatter('{random}', style='{')
3509        self.assertRaises(KeyError, f.format, r)
3510        self.assertFalse(f.usesTime())
3511        f = logging.Formatter('{asctime}', style='{')
3512        self.assertTrue(f.usesTime())
3513        f = logging.Formatter('{asctime!s:15}', style='{')
3514        self.assertTrue(f.usesTime())
3515        f = logging.Formatter('{asctime:15}', style='{')
3516        self.assertTrue(f.usesTime())
3517        f = logging.Formatter('asctime', style='{')
3518        self.assertFalse(f.usesTime())
3519
3520    def test_dollars(self):
3521        # Test $-formatting
3522        r = self.get_record()
3523        f = logging.Formatter('$message', style='$')
3524        self.assertEqual(f.format(r), 'Message with 2 placeholders')
3525        f = logging.Formatter('$$%${message}%$$', style='$')
3526        self.assertEqual(f.format(r), '$%Message with 2 placeholders%$')
3527        f = logging.Formatter('${random}', style='$')
3528        self.assertRaises(KeyError, f.format, r)
3529        self.assertFalse(f.usesTime())
3530        f = logging.Formatter('${asctime}', style='$')
3531        self.assertTrue(f.usesTime())
3532        f = logging.Formatter('${asctime', style='$')
3533        self.assertFalse(f.usesTime())
3534        f = logging.Formatter('$asctime', style='$')
3535        self.assertTrue(f.usesTime())
3536        f = logging.Formatter('asctime', style='$')
3537        self.assertFalse(f.usesTime())
3538
3539    def test_invalid_style(self):
3540        self.assertRaises(ValueError, logging.Formatter, None, None, 'x')
3541
3542    def test_time(self):
3543        r = self.get_record()
3544        dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc)
3545        # We use None to indicate we want the local timezone
3546        # We're essentially converting a UTC time to local time
3547        r.created = time.mktime(dt.astimezone(None).timetuple())
3548        r.msecs = 123
3549        f = logging.Formatter('%(asctime)s %(message)s')
3550        f.converter = time.gmtime
3551        self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123')
3552        self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21')
3553        f.format(r)
3554        self.assertEqual(r.asctime, '1993-04-21 08:03:00,123')
3555
3556class TestBufferingFormatter(logging.BufferingFormatter):
3557    def formatHeader(self, records):
3558        return '[(%d)' % len(records)
3559
3560    def formatFooter(self, records):
3561        return '(%d)]' % len(records)
3562
3563class BufferingFormatterTest(unittest.TestCase):
3564    def setUp(self):
3565        self.records = [
3566            logging.makeLogRecord({'msg': 'one'}),
3567            logging.makeLogRecord({'msg': 'two'}),
3568        ]
3569
3570    def test_default(self):
3571        f = logging.BufferingFormatter()
3572        self.assertEqual('', f.format([]))
3573        self.assertEqual('onetwo', f.format(self.records))
3574
3575    def test_custom(self):
3576        f = TestBufferingFormatter()
3577        self.assertEqual('[(2)onetwo(2)]', f.format(self.records))
3578        lf = logging.Formatter('<%(message)s>')
3579        f = TestBufferingFormatter(lf)
3580        self.assertEqual('[(2)<one><two>(2)]', f.format(self.records))
3581
3582class ExceptionTest(BaseTest):
3583    def test_formatting(self):
3584        r = self.root_logger
3585        h = RecordingHandler()
3586        r.addHandler(h)
3587        try:
3588            raise RuntimeError('deliberate mistake')
3589        except:
3590            logging.exception('failed', stack_info=True)
3591        r.removeHandler(h)
3592        h.close()
3593        r = h.records[0]
3594        self.assertTrue(r.exc_text.startswith('Traceback (most recent '
3595                                              'call last):\n'))
3596        self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
3597                                            'deliberate mistake'))
3598        self.assertTrue(r.stack_info.startswith('Stack (most recent '
3599                                              'call last):\n'))
3600        self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
3601                                            'stack_info=True)'))
3602
3603
3604class LastResortTest(BaseTest):
3605    def test_last_resort(self):
3606        # Test the last resort handler
3607        root = self.root_logger
3608        root.removeHandler(self.root_hdlr)
3609        old_lastresort = logging.lastResort
3610        old_raise_exceptions = logging.raiseExceptions
3611
3612        try:
3613            with support.captured_stderr() as stderr:
3614                root.debug('This should not appear')
3615                self.assertEqual(stderr.getvalue(), '')
3616                root.warning('Final chance!')
3617                self.assertEqual(stderr.getvalue(), 'Final chance!\n')
3618
3619            # No handlers and no last resort, so 'No handlers' message
3620            logging.lastResort = None
3621            with support.captured_stderr() as stderr:
3622                root.warning('Final chance!')
3623                msg = 'No handlers could be found for logger "root"\n'
3624                self.assertEqual(stderr.getvalue(), msg)
3625
3626            # 'No handlers' message only printed once
3627            with support.captured_stderr() as stderr:
3628                root.warning('Final chance!')
3629                self.assertEqual(stderr.getvalue(), '')
3630
3631            # If raiseExceptions is False, no message is printed
3632            root.manager.emittedNoHandlerWarning = False
3633            logging.raiseExceptions = False
3634            with support.captured_stderr() as stderr:
3635                root.warning('Final chance!')
3636                self.assertEqual(stderr.getvalue(), '')
3637        finally:
3638            root.addHandler(self.root_hdlr)
3639            logging.lastResort = old_lastresort
3640            logging.raiseExceptions = old_raise_exceptions
3641
3642
3643class FakeHandler:
3644
3645    def __init__(self, identifier, called):
3646        for method in ('acquire', 'flush', 'close', 'release'):
3647            setattr(self, method, self.record_call(identifier, method, called))
3648
3649    def record_call(self, identifier, method_name, called):
3650        def inner():
3651            called.append('{} - {}'.format(identifier, method_name))
3652        return inner
3653
3654
3655class RecordingHandler(logging.NullHandler):
3656
3657    def __init__(self, *args, **kwargs):
3658        super(RecordingHandler, self).__init__(*args, **kwargs)
3659        self.records = []
3660
3661    def handle(self, record):
3662        """Keep track of all the emitted records."""
3663        self.records.append(record)
3664
3665
3666class ShutdownTest(BaseTest):
3667
3668    """Test suite for the shutdown method."""
3669
3670    def setUp(self):
3671        super(ShutdownTest, self).setUp()
3672        self.called = []
3673
3674        raise_exceptions = logging.raiseExceptions
3675        self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions)
3676
3677    def raise_error(self, error):
3678        def inner():
3679            raise error()
3680        return inner
3681
3682    def test_no_failure(self):
3683        # create some fake handlers
3684        handler0 = FakeHandler(0, self.called)
3685        handler1 = FakeHandler(1, self.called)
3686        handler2 = FakeHandler(2, self.called)
3687
3688        # create live weakref to those handlers
3689        handlers = map(logging.weakref.ref, [handler0, handler1, handler2])
3690
3691        logging.shutdown(handlerList=list(handlers))
3692
3693        expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
3694                    '1 - acquire', '1 - flush', '1 - close', '1 - release',
3695                    '0 - acquire', '0 - flush', '0 - close', '0 - release']
3696        self.assertEqual(expected, self.called)
3697
3698    def _test_with_failure_in_method(self, method, error):
3699        handler = FakeHandler(0, self.called)
3700        setattr(handler, method, self.raise_error(error))
3701        handlers = [logging.weakref.ref(handler)]
3702
3703        logging.shutdown(handlerList=list(handlers))
3704
3705        self.assertEqual('0 - release', self.called[-1])
3706
3707    def test_with_ioerror_in_acquire(self):
3708        self._test_with_failure_in_method('acquire', OSError)
3709
3710    def test_with_ioerror_in_flush(self):
3711        self._test_with_failure_in_method('flush', OSError)
3712
3713    def test_with_ioerror_in_close(self):
3714        self._test_with_failure_in_method('close', OSError)
3715
3716    def test_with_valueerror_in_acquire(self):
3717        self._test_with_failure_in_method('acquire', ValueError)
3718
3719    def test_with_valueerror_in_flush(self):
3720        self._test_with_failure_in_method('flush', ValueError)
3721
3722    def test_with_valueerror_in_close(self):
3723        self._test_with_failure_in_method('close', ValueError)
3724
3725    def test_with_other_error_in_acquire_without_raise(self):
3726        logging.raiseExceptions = False
3727        self._test_with_failure_in_method('acquire', IndexError)
3728
3729    def test_with_other_error_in_flush_without_raise(self):
3730        logging.raiseExceptions = False
3731        self._test_with_failure_in_method('flush', IndexError)
3732
3733    def test_with_other_error_in_close_without_raise(self):
3734        logging.raiseExceptions = False
3735        self._test_with_failure_in_method('close', IndexError)
3736
3737    def test_with_other_error_in_acquire_with_raise(self):
3738        logging.raiseExceptions = True
3739        self.assertRaises(IndexError, self._test_with_failure_in_method,
3740                          'acquire', IndexError)
3741
3742    def test_with_other_error_in_flush_with_raise(self):
3743        logging.raiseExceptions = True
3744        self.assertRaises(IndexError, self._test_with_failure_in_method,
3745                          'flush', IndexError)
3746
3747    def test_with_other_error_in_close_with_raise(self):
3748        logging.raiseExceptions = True
3749        self.assertRaises(IndexError, self._test_with_failure_in_method,
3750                          'close', IndexError)
3751
3752
3753class ModuleLevelMiscTest(BaseTest):
3754
3755    """Test suite for some module level methods."""
3756
3757    def test_disable(self):
3758        old_disable = logging.root.manager.disable
3759        # confirm our assumptions are correct
3760        self.assertEqual(old_disable, 0)
3761        self.addCleanup(logging.disable, old_disable)
3762
3763        logging.disable(83)
3764        self.assertEqual(logging.root.manager.disable, 83)
3765
3766        # test the default value introduced in 3.7
3767        # (Issue #28524)
3768        logging.disable()
3769        self.assertEqual(logging.root.manager.disable, logging.CRITICAL)
3770
3771    def _test_log(self, method, level=None):
3772        called = []
3773        support.patch(self, logging, 'basicConfig',
3774                      lambda *a, **kw: called.append((a, kw)))
3775
3776        recording = RecordingHandler()
3777        logging.root.addHandler(recording)
3778
3779        log_method = getattr(logging, method)
3780        if level is not None:
3781            log_method(level, "test me: %r", recording)
3782        else:
3783            log_method("test me: %r", recording)
3784
3785        self.assertEqual(len(recording.records), 1)
3786        record = recording.records[0]
3787        self.assertEqual(record.getMessage(), "test me: %r" % recording)
3788
3789        expected_level = level if level is not None else getattr(logging, method.upper())
3790        self.assertEqual(record.levelno, expected_level)
3791
3792        # basicConfig was not called!
3793        self.assertEqual(called, [])
3794
3795    def test_log(self):
3796        self._test_log('log', logging.ERROR)
3797
3798    def test_debug(self):
3799        self._test_log('debug')
3800
3801    def test_info(self):
3802        self._test_log('info')
3803
3804    def test_warning(self):
3805        self._test_log('warning')
3806
3807    def test_error(self):
3808        self._test_log('error')
3809
3810    def test_critical(self):
3811        self._test_log('critical')
3812
3813    def test_set_logger_class(self):
3814        self.assertRaises(TypeError, logging.setLoggerClass, object)
3815
3816        class MyLogger(logging.Logger):
3817            pass
3818
3819        logging.setLoggerClass(MyLogger)
3820        self.assertEqual(logging.getLoggerClass(), MyLogger)
3821
3822        logging.setLoggerClass(logging.Logger)
3823        self.assertEqual(logging.getLoggerClass(), logging.Logger)
3824
3825    @support.requires_type_collecting
3826    def test_logging_at_shutdown(self):
3827        # Issue #20037
3828        code = """if 1:
3829            import logging
3830
3831            class A:
3832                def __del__(self):
3833                    try:
3834                        raise ValueError("some error")
3835                    except Exception:
3836                        logging.exception("exception in __del__")
3837
3838            a = A()"""
3839        rc, out, err = assert_python_ok("-c", code)
3840        err = err.decode()
3841        self.assertIn("exception in __del__", err)
3842        self.assertIn("ValueError: some error", err)
3843
3844
3845class LogRecordTest(BaseTest):
3846    def test_str_rep(self):
3847        r = logging.makeLogRecord({})
3848        s = str(r)
3849        self.assertTrue(s.startswith('<LogRecord: '))
3850        self.assertTrue(s.endswith('>'))
3851
3852    def test_dict_arg(self):
3853        h = RecordingHandler()
3854        r = logging.getLogger()
3855        r.addHandler(h)
3856        d = {'less' : 'more' }
3857        logging.warning('less is %(less)s', d)
3858        self.assertIs(h.records[0].args, d)
3859        self.assertEqual(h.records[0].message, 'less is more')
3860        r.removeHandler(h)
3861        h.close()
3862
3863    def test_multiprocessing(self):
3864        r = logging.makeLogRecord({})
3865        self.assertEqual(r.processName, 'MainProcess')
3866        try:
3867            import multiprocessing as mp
3868            r = logging.makeLogRecord({})
3869            self.assertEqual(r.processName, mp.current_process().name)
3870        except ImportError:
3871            pass
3872
3873    def test_optional(self):
3874        r = logging.makeLogRecord({})
3875        NOT_NONE = self.assertIsNotNone
3876        NOT_NONE(r.thread)
3877        NOT_NONE(r.threadName)
3878        NOT_NONE(r.process)
3879        NOT_NONE(r.processName)
3880        log_threads = logging.logThreads
3881        log_processes = logging.logProcesses
3882        log_multiprocessing = logging.logMultiprocessing
3883        try:
3884            logging.logThreads = False
3885            logging.logProcesses = False
3886            logging.logMultiprocessing = False
3887            r = logging.makeLogRecord({})
3888            NONE = self.assertIsNone
3889            NONE(r.thread)
3890            NONE(r.threadName)
3891            NONE(r.process)
3892            NONE(r.processName)
3893        finally:
3894            logging.logThreads = log_threads
3895            logging.logProcesses = log_processes
3896            logging.logMultiprocessing = log_multiprocessing
3897
3898class BasicConfigTest(unittest.TestCase):
3899
3900    """Test suite for logging.basicConfig."""
3901
3902    def setUp(self):
3903        super(BasicConfigTest, self).setUp()
3904        self.handlers = logging.root.handlers
3905        self.saved_handlers = logging._handlers.copy()
3906        self.saved_handler_list = logging._handlerList[:]
3907        self.original_logging_level = logging.root.level
3908        self.addCleanup(self.cleanup)
3909        logging.root.handlers = []
3910
3911    def tearDown(self):
3912        for h in logging.root.handlers[:]:
3913            logging.root.removeHandler(h)
3914            h.close()
3915        super(BasicConfigTest, self).tearDown()
3916
3917    def cleanup(self):
3918        setattr(logging.root, 'handlers', self.handlers)
3919        logging._handlers.clear()
3920        logging._handlers.update(self.saved_handlers)
3921        logging._handlerList[:] = self.saved_handler_list
3922        logging.root.level = self.original_logging_level
3923
3924    def test_no_kwargs(self):
3925        logging.basicConfig()
3926
3927        # handler defaults to a StreamHandler to sys.stderr
3928        self.assertEqual(len(logging.root.handlers), 1)
3929        handler = logging.root.handlers[0]
3930        self.assertIsInstance(handler, logging.StreamHandler)
3931        self.assertEqual(handler.stream, sys.stderr)
3932
3933        formatter = handler.formatter
3934        # format defaults to logging.BASIC_FORMAT
3935        self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
3936        # datefmt defaults to None
3937        self.assertIsNone(formatter.datefmt)
3938        # style defaults to %
3939        self.assertIsInstance(formatter._style, logging.PercentStyle)
3940
3941        # level is not explicitly set
3942        self.assertEqual(logging.root.level, self.original_logging_level)
3943
3944    def test_strformatstyle(self):
3945        with support.captured_stdout() as output:
3946            logging.basicConfig(stream=sys.stdout, style="{")
3947            logging.error("Log an error")
3948            sys.stdout.seek(0)
3949            self.assertEqual(output.getvalue().strip(),
3950                "ERROR:root:Log an error")
3951
3952    def test_stringtemplatestyle(self):
3953        with support.captured_stdout() as output:
3954            logging.basicConfig(stream=sys.stdout, style="$")
3955            logging.error("Log an error")
3956            sys.stdout.seek(0)
3957            self.assertEqual(output.getvalue().strip(),
3958                "ERROR:root:Log an error")
3959
3960    def test_filename(self):
3961
3962        def cleanup(h1, h2, fn):
3963            h1.close()
3964            h2.close()
3965            os.remove(fn)
3966
3967        logging.basicConfig(filename='test.log')
3968
3969        self.assertEqual(len(logging.root.handlers), 1)
3970        handler = logging.root.handlers[0]
3971        self.assertIsInstance(handler, logging.FileHandler)
3972
3973        expected = logging.FileHandler('test.log', 'a')
3974        self.assertEqual(handler.stream.mode, expected.stream.mode)
3975        self.assertEqual(handler.stream.name, expected.stream.name)
3976        self.addCleanup(cleanup, handler, expected, 'test.log')
3977
3978    def test_filemode(self):
3979
3980        def cleanup(h1, h2, fn):
3981            h1.close()
3982            h2.close()
3983            os.remove(fn)
3984
3985        logging.basicConfig(filename='test.log', filemode='wb')
3986
3987        handler = logging.root.handlers[0]
3988        expected = logging.FileHandler('test.log', 'wb')
3989        self.assertEqual(handler.stream.mode, expected.stream.mode)
3990        self.addCleanup(cleanup, handler, expected, 'test.log')
3991
3992    def test_stream(self):
3993        stream = io.StringIO()
3994        self.addCleanup(stream.close)
3995        logging.basicConfig(stream=stream)
3996
3997        self.assertEqual(len(logging.root.handlers), 1)
3998        handler = logging.root.handlers[0]
3999        self.assertIsInstance(handler, logging.StreamHandler)
4000        self.assertEqual(handler.stream, stream)
4001
4002    def test_format(self):
4003        logging.basicConfig(format='foo')
4004
4005        formatter = logging.root.handlers[0].formatter
4006        self.assertEqual(formatter._style._fmt, 'foo')
4007
4008    def test_datefmt(self):
4009        logging.basicConfig(datefmt='bar')
4010
4011        formatter = logging.root.handlers[0].formatter
4012        self.assertEqual(formatter.datefmt, 'bar')
4013
4014    def test_style(self):
4015        logging.basicConfig(style='$')
4016
4017        formatter = logging.root.handlers[0].formatter
4018        self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
4019
4020    def test_level(self):
4021        old_level = logging.root.level
4022        self.addCleanup(logging.root.setLevel, old_level)
4023
4024        logging.basicConfig(level=57)
4025        self.assertEqual(logging.root.level, 57)
4026        # Test that second call has no effect
4027        logging.basicConfig(level=58)
4028        self.assertEqual(logging.root.level, 57)
4029
4030    def test_incompatible(self):
4031        assertRaises = self.assertRaises
4032        handlers = [logging.StreamHandler()]
4033        stream = sys.stderr
4034        assertRaises(ValueError, logging.basicConfig, filename='test.log',
4035                                                     stream=stream)
4036        assertRaises(ValueError, logging.basicConfig, filename='test.log',
4037                                                     handlers=handlers)
4038        assertRaises(ValueError, logging.basicConfig, stream=stream,
4039                                                     handlers=handlers)
4040        # Issue 23207: test for invalid kwargs
4041        assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
4042        # Should pop both filename and filemode even if filename is None
4043        logging.basicConfig(filename=None, filemode='a')
4044
4045    def test_handlers(self):
4046        handlers = [
4047            logging.StreamHandler(),
4048            logging.StreamHandler(sys.stdout),
4049            logging.StreamHandler(),
4050        ]
4051        f = logging.Formatter()
4052        handlers[2].setFormatter(f)
4053        logging.basicConfig(handlers=handlers)
4054        self.assertIs(handlers[0], logging.root.handlers[0])
4055        self.assertIs(handlers[1], logging.root.handlers[1])
4056        self.assertIs(handlers[2], logging.root.handlers[2])
4057        self.assertIsNotNone(handlers[0].formatter)
4058        self.assertIsNotNone(handlers[1].formatter)
4059        self.assertIs(handlers[2].formatter, f)
4060        self.assertIs(handlers[0].formatter, handlers[1].formatter)
4061
4062    def _test_log(self, method, level=None):
4063        # logging.root has no handlers so basicConfig should be called
4064        called = []
4065
4066        old_basic_config = logging.basicConfig
4067        def my_basic_config(*a, **kw):
4068            old_basic_config()
4069            old_level = logging.root.level
4070            logging.root.setLevel(100)  # avoid having messages in stderr
4071            self.addCleanup(logging.root.setLevel, old_level)
4072            called.append((a, kw))
4073
4074        support.patch(self, logging, 'basicConfig', my_basic_config)
4075
4076        log_method = getattr(logging, method)
4077        if level is not None:
4078            log_method(level, "test me")
4079        else:
4080            log_method("test me")
4081
4082        # basicConfig was called with no arguments
4083        self.assertEqual(called, [((), {})])
4084
4085    def test_log(self):
4086        self._test_log('log', logging.WARNING)
4087
4088    def test_debug(self):
4089        self._test_log('debug')
4090
4091    def test_info(self):
4092        self._test_log('info')
4093
4094    def test_warning(self):
4095        self._test_log('warning')
4096
4097    def test_error(self):
4098        self._test_log('error')
4099
4100    def test_critical(self):
4101        self._test_log('critical')
4102
4103
4104class LoggerAdapterTest(unittest.TestCase):
4105    def setUp(self):
4106        super(LoggerAdapterTest, self).setUp()
4107        old_handler_list = logging._handlerList[:]
4108
4109        self.recording = RecordingHandler()
4110        self.logger = logging.root
4111        self.logger.addHandler(self.recording)
4112        self.addCleanup(self.logger.removeHandler, self.recording)
4113        self.addCleanup(self.recording.close)
4114
4115        def cleanup():
4116            logging._handlerList[:] = old_handler_list
4117
4118        self.addCleanup(cleanup)
4119        self.addCleanup(logging.shutdown)
4120        self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
4121
4122    def test_exception(self):
4123        msg = 'testing exception: %r'
4124        exc = None
4125        try:
4126            1 / 0
4127        except ZeroDivisionError as e:
4128            exc = e
4129            self.adapter.exception(msg, self.recording)
4130
4131        self.assertEqual(len(self.recording.records), 1)
4132        record = self.recording.records[0]
4133        self.assertEqual(record.levelno, logging.ERROR)
4134        self.assertEqual(record.msg, msg)
4135        self.assertEqual(record.args, (self.recording,))
4136        self.assertEqual(record.exc_info,
4137                         (exc.__class__, exc, exc.__traceback__))
4138
4139    def test_exception_excinfo(self):
4140        try:
4141            1 / 0
4142        except ZeroDivisionError as e:
4143            exc = e
4144
4145        self.adapter.exception('exc_info test', exc_info=exc)
4146
4147        self.assertEqual(len(self.recording.records), 1)
4148        record = self.recording.records[0]
4149        self.assertEqual(record.exc_info,
4150                         (exc.__class__, exc, exc.__traceback__))
4151
4152    def test_critical(self):
4153        msg = 'critical test! %r'
4154        self.adapter.critical(msg, self.recording)
4155
4156        self.assertEqual(len(self.recording.records), 1)
4157        record = self.recording.records[0]
4158        self.assertEqual(record.levelno, logging.CRITICAL)
4159        self.assertEqual(record.msg, msg)
4160        self.assertEqual(record.args, (self.recording,))
4161
4162    def test_is_enabled_for(self):
4163        old_disable = self.adapter.logger.manager.disable
4164        self.adapter.logger.manager.disable = 33
4165        self.addCleanup(setattr, self.adapter.logger.manager, 'disable',
4166                        old_disable)
4167        self.assertFalse(self.adapter.isEnabledFor(32))
4168
4169    def test_has_handlers(self):
4170        self.assertTrue(self.adapter.hasHandlers())
4171
4172        for handler in self.logger.handlers:
4173            self.logger.removeHandler(handler)
4174
4175        self.assertFalse(self.logger.hasHandlers())
4176        self.assertFalse(self.adapter.hasHandlers())
4177
4178    def test_nested(self):
4179        class Adapter(logging.LoggerAdapter):
4180            prefix = 'Adapter'
4181
4182            def process(self, msg, kwargs):
4183                return f"{self.prefix} {msg}", kwargs
4184
4185        msg = 'Adapters can be nested, yo.'
4186        adapter = Adapter(logger=self.logger, extra=None)
4187        adapter_adapter = Adapter(logger=adapter, extra=None)
4188        adapter_adapter.prefix = 'AdapterAdapter'
4189        self.assertEqual(repr(adapter), repr(adapter_adapter))
4190        adapter_adapter.log(logging.CRITICAL, msg, self.recording)
4191        self.assertEqual(len(self.recording.records), 1)
4192        record = self.recording.records[0]
4193        self.assertEqual(record.levelno, logging.CRITICAL)
4194        self.assertEqual(record.msg, f"Adapter AdapterAdapter {msg}")
4195        self.assertEqual(record.args, (self.recording,))
4196        orig_manager = adapter_adapter.manager
4197        self.assertIs(adapter.manager, orig_manager)
4198        self.assertIs(self.logger.manager, orig_manager)
4199        temp_manager = object()
4200        try:
4201            adapter_adapter.manager = temp_manager
4202            self.assertIs(adapter_adapter.manager, temp_manager)
4203            self.assertIs(adapter.manager, temp_manager)
4204            self.assertIs(self.logger.manager, temp_manager)
4205        finally:
4206            adapter_adapter.manager = orig_manager
4207        self.assertIs(adapter_adapter.manager, orig_manager)
4208        self.assertIs(adapter.manager, orig_manager)
4209        self.assertIs(self.logger.manager, orig_manager)
4210
4211
4212class LoggerTest(BaseTest):
4213
4214    def setUp(self):
4215        super(LoggerTest, self).setUp()
4216        self.recording = RecordingHandler()
4217        self.logger = logging.Logger(name='blah')
4218        self.logger.addHandler(self.recording)
4219        self.addCleanup(self.logger.removeHandler, self.recording)
4220        self.addCleanup(self.recording.close)
4221        self.addCleanup(logging.shutdown)
4222
4223    def test_set_invalid_level(self):
4224        self.assertRaises(TypeError, self.logger.setLevel, object())
4225
4226    def test_exception(self):
4227        msg = 'testing exception: %r'
4228        exc = None
4229        try:
4230            1 / 0
4231        except ZeroDivisionError as e:
4232            exc = e
4233            self.logger.exception(msg, self.recording)
4234
4235        self.assertEqual(len(self.recording.records), 1)
4236        record = self.recording.records[0]
4237        self.assertEqual(record.levelno, logging.ERROR)
4238        self.assertEqual(record.msg, msg)
4239        self.assertEqual(record.args, (self.recording,))
4240        self.assertEqual(record.exc_info,
4241                         (exc.__class__, exc, exc.__traceback__))
4242
4243    def test_log_invalid_level_with_raise(self):
4244        with support.swap_attr(logging, 'raiseExceptions', True):
4245            self.assertRaises(TypeError, self.logger.log, '10', 'test message')
4246
4247    def test_log_invalid_level_no_raise(self):
4248        with support.swap_attr(logging, 'raiseExceptions', False):
4249            self.logger.log('10', 'test message')  # no exception happens
4250
4251    def test_find_caller_with_stack_info(self):
4252        called = []
4253        support.patch(self, logging.traceback, 'print_stack',
4254                      lambda f, file: called.append(file.getvalue()))
4255
4256        self.logger.findCaller(stack_info=True)
4257
4258        self.assertEqual(len(called), 1)
4259        self.assertEqual('Stack (most recent call last):\n', called[0])
4260
4261    def test_make_record_with_extra_overwrite(self):
4262        name = 'my record'
4263        level = 13
4264        fn = lno = msg = args = exc_info = func = sinfo = None
4265        rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
4266                                       exc_info, func, sinfo)
4267
4268        for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
4269            extra = {key: 'some value'}
4270            self.assertRaises(KeyError, self.logger.makeRecord, name, level,
4271                              fn, lno, msg, args, exc_info,
4272                              extra=extra, sinfo=sinfo)
4273
4274    def test_make_record_with_extra_no_overwrite(self):
4275        name = 'my record'
4276        level = 13
4277        fn = lno = msg = args = exc_info = func = sinfo = None
4278        extra = {'valid_key': 'some value'}
4279        result = self.logger.makeRecord(name, level, fn, lno, msg, args,
4280                                        exc_info, extra=extra, sinfo=sinfo)
4281        self.assertIn('valid_key', result.__dict__)
4282
4283    def test_has_handlers(self):
4284        self.assertTrue(self.logger.hasHandlers())
4285
4286        for handler in self.logger.handlers:
4287            self.logger.removeHandler(handler)
4288        self.assertFalse(self.logger.hasHandlers())
4289
4290    def test_has_handlers_no_propagate(self):
4291        child_logger = logging.getLogger('blah.child')
4292        child_logger.propagate = False
4293        self.assertFalse(child_logger.hasHandlers())
4294
4295    def test_is_enabled_for(self):
4296        old_disable = self.logger.manager.disable
4297        self.logger.manager.disable = 23
4298        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
4299        self.assertFalse(self.logger.isEnabledFor(22))
4300
4301    def test_root_logger_aliases(self):
4302        root = logging.getLogger()
4303        self.assertIs(root, logging.root)
4304        self.assertIs(root, logging.getLogger(None))
4305        self.assertIs(root, logging.getLogger(''))
4306        self.assertIs(root, logging.getLogger('foo').root)
4307        self.assertIs(root, logging.getLogger('foo.bar').root)
4308        self.assertIs(root, logging.getLogger('foo').parent)
4309
4310        self.assertIsNot(root, logging.getLogger('\0'))
4311        self.assertIsNot(root, logging.getLogger('foo.bar').parent)
4312
4313    def test_invalid_names(self):
4314        self.assertRaises(TypeError, logging.getLogger, any)
4315        self.assertRaises(TypeError, logging.getLogger, b'foo')
4316
4317    def test_pickling(self):
4318        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
4319            for name in ('', 'root', 'foo', 'foo.bar', 'baz.bar'):
4320                logger = logging.getLogger(name)
4321                s = pickle.dumps(logger, proto)
4322                unpickled = pickle.loads(s)
4323                self.assertIs(unpickled, logger)
4324
4325    def test_caching(self):
4326        root = self.root_logger
4327        logger1 = logging.getLogger("abc")
4328        logger2 = logging.getLogger("abc.def")
4329
4330        # Set root logger level and ensure cache is empty
4331        root.setLevel(logging.ERROR)
4332        self.assertEqual(logger2.getEffectiveLevel(), logging.ERROR)
4333        self.assertEqual(logger2._cache, {})
4334
4335        # Ensure cache is populated and calls are consistent
4336        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
4337        self.assertFalse(logger2.isEnabledFor(logging.DEBUG))
4338        self.assertEqual(logger2._cache, {logging.ERROR: True, logging.DEBUG: False})
4339        self.assertEqual(root._cache, {})
4340        self.assertTrue(logger2.isEnabledFor(logging.ERROR))
4341
4342        # Ensure root cache gets populated
4343        self.assertEqual(root._cache, {})
4344        self.assertTrue(root.isEnabledFor(logging.ERROR))
4345        self.assertEqual(root._cache, {logging.ERROR: True})
4346
4347        # Set parent logger level and ensure caches are emptied
4348        logger1.setLevel(logging.CRITICAL)
4349        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
4350        self.assertEqual(logger2._cache, {})
4351
4352        # Ensure logger2 uses parent logger's effective level
4353        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
4354
4355        # Set level to NOTSET and ensure caches are empty
4356        logger2.setLevel(logging.NOTSET)
4357        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
4358        self.assertEqual(logger2._cache, {})
4359        self.assertEqual(logger1._cache, {})
4360        self.assertEqual(root._cache, {})
4361
4362        # Verify logger2 follows parent and not root
4363        self.assertFalse(logger2.isEnabledFor(logging.ERROR))
4364        self.assertTrue(logger2.isEnabledFor(logging.CRITICAL))
4365        self.assertFalse(logger1.isEnabledFor(logging.ERROR))
4366        self.assertTrue(logger1.isEnabledFor(logging.CRITICAL))
4367        self.assertTrue(root.isEnabledFor(logging.ERROR))
4368
4369        # Disable logging in manager and ensure caches are clear
4370        logging.disable()
4371        self.assertEqual(logger2.getEffectiveLevel(), logging.CRITICAL)
4372        self.assertEqual(logger2._cache, {})
4373        self.assertEqual(logger1._cache, {})
4374        self.assertEqual(root._cache, {})
4375
4376        # Ensure no loggers are enabled
4377        self.assertFalse(logger1.isEnabledFor(logging.CRITICAL))
4378        self.assertFalse(logger2.isEnabledFor(logging.CRITICAL))
4379        self.assertFalse(root.isEnabledFor(logging.CRITICAL))
4380
4381
4382class BaseFileTest(BaseTest):
4383    "Base class for handler tests that write log files"
4384
4385    def setUp(self):
4386        BaseTest.setUp(self)
4387        fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-")
4388        os.close(fd)
4389        self.rmfiles = []
4390
4391    def tearDown(self):
4392        for fn in self.rmfiles:
4393            os.unlink(fn)
4394        if os.path.exists(self.fn):
4395            os.unlink(self.fn)
4396        BaseTest.tearDown(self)
4397
4398    def assertLogFile(self, filename):
4399        "Assert a log file is there and register it for deletion"
4400        self.assertTrue(os.path.exists(filename),
4401                        msg="Log file %r does not exist" % filename)
4402        self.rmfiles.append(filename)
4403
4404
4405class FileHandlerTest(BaseFileTest):
4406    def test_delay(self):
4407        os.unlink(self.fn)
4408        fh = logging.FileHandler(self.fn, delay=True)
4409        self.assertIsNone(fh.stream)
4410        self.assertFalse(os.path.exists(self.fn))
4411        fh.handle(logging.makeLogRecord({}))
4412        self.assertIsNotNone(fh.stream)
4413        self.assertTrue(os.path.exists(self.fn))
4414        fh.close()
4415
4416class RotatingFileHandlerTest(BaseFileTest):
4417    def next_rec(self):
4418        return logging.LogRecord('n', logging.DEBUG, 'p', 1,
4419                                 self.next_message(), None, None, None)
4420
4421    def test_should_not_rollover(self):
4422        # If maxbytes is zero rollover never occurs
4423        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0)
4424        self.assertFalse(rh.shouldRollover(None))
4425        rh.close()
4426
4427    def test_should_rollover(self):
4428        rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1)
4429        self.assertTrue(rh.shouldRollover(self.next_rec()))
4430        rh.close()
4431
4432    def test_file_created(self):
4433        # checks that the file is created and assumes it was created
4434        # by us
4435        rh = logging.handlers.RotatingFileHandler(self.fn)
4436        rh.emit(self.next_rec())
4437        self.assertLogFile(self.fn)
4438        rh.close()
4439
4440    def test_rollover_filenames(self):
4441        def namer(name):
4442            return name + ".test"
4443        rh = logging.handlers.RotatingFileHandler(
4444            self.fn, backupCount=2, maxBytes=1)
4445        rh.namer = namer
4446        rh.emit(self.next_rec())
4447        self.assertLogFile(self.fn)
4448        rh.emit(self.next_rec())
4449        self.assertLogFile(namer(self.fn + ".1"))
4450        rh.emit(self.next_rec())
4451        self.assertLogFile(namer(self.fn + ".2"))
4452        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
4453        rh.close()
4454
4455    @support.requires_zlib
4456    def test_rotator(self):
4457        def namer(name):
4458            return name + ".gz"
4459
4460        def rotator(source, dest):
4461            with open(source, "rb") as sf:
4462                data = sf.read()
4463                compressed = zlib.compress(data, 9)
4464                with open(dest, "wb") as df:
4465                    df.write(compressed)
4466            os.remove(source)
4467
4468        rh = logging.handlers.RotatingFileHandler(
4469            self.fn, backupCount=2, maxBytes=1)
4470        rh.rotator = rotator
4471        rh.namer = namer
4472        m1 = self.next_rec()
4473        rh.emit(m1)
4474        self.assertLogFile(self.fn)
4475        m2 = self.next_rec()
4476        rh.emit(m2)
4477        fn = namer(self.fn + ".1")
4478        self.assertLogFile(fn)
4479        newline = os.linesep
4480        with open(fn, "rb") as f:
4481            compressed = f.read()
4482            data = zlib.decompress(compressed)
4483            self.assertEqual(data.decode("ascii"), m1.msg + newline)
4484        rh.emit(self.next_rec())
4485        fn = namer(self.fn + ".2")
4486        self.assertLogFile(fn)
4487        with open(fn, "rb") as f:
4488            compressed = f.read()
4489            data = zlib.decompress(compressed)
4490            self.assertEqual(data.decode("ascii"), m1.msg + newline)
4491        rh.emit(self.next_rec())
4492        fn = namer(self.fn + ".2")
4493        with open(fn, "rb") as f:
4494            compressed = f.read()
4495            data = zlib.decompress(compressed)
4496            self.assertEqual(data.decode("ascii"), m2.msg + newline)
4497        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
4498        rh.close()
4499
4500class TimedRotatingFileHandlerTest(BaseFileTest):
4501    # other test methods added below
4502    def test_rollover(self):
4503        fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S',
4504                                                       backupCount=1)
4505        fmt = logging.Formatter('%(asctime)s %(message)s')
4506        fh.setFormatter(fmt)
4507        r1 = logging.makeLogRecord({'msg': 'testing - initial'})
4508        fh.emit(r1)
4509        self.assertLogFile(self.fn)
4510        time.sleep(1.1)    # a little over a second ...
4511        r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
4512        fh.emit(r2)
4513        fh.close()
4514        # At this point, we should have a recent rotated file which we
4515        # can test for the existence of. However, in practice, on some
4516        # machines which run really slowly, we don't know how far back
4517        # in time to go to look for the log file. So, we go back a fair
4518        # bit, and stop as soon as we see a rotated file. In theory this
4519        # could of course still fail, but the chances are lower.
4520        found = False
4521        now = datetime.datetime.now()
4522        GO_BACK = 5 * 60 # seconds
4523        for secs in range(GO_BACK):
4524            prev = now - datetime.timedelta(seconds=secs)
4525            fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
4526            found = os.path.exists(fn)
4527            if found:
4528                self.rmfiles.append(fn)
4529                break
4530        msg = 'No rotated files found, went back %d seconds' % GO_BACK
4531        if not found:
4532            # print additional diagnostics
4533            dn, fn = os.path.split(self.fn)
4534            files = [f for f in os.listdir(dn) if f.startswith(fn)]
4535            print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
4536            print('The only matching files are: %s' % files, file=sys.stderr)
4537            for f in files:
4538                print('Contents of %s:' % f)
4539                path = os.path.join(dn, f)
4540                with open(path, 'r') as tf:
4541                    print(tf.read())
4542        self.assertTrue(found, msg=msg)
4543
4544    def test_invalid(self):
4545        assertRaises = self.assertRaises
4546        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
4547                     self.fn, 'X', delay=True)
4548        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
4549                     self.fn, 'W', delay=True)
4550        assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler,
4551                     self.fn, 'W7', delay=True)
4552
4553    def test_compute_rollover_daily_attime(self):
4554        currentTime = 0
4555        atTime = datetime.time(12, 0, 0)
4556        rh = logging.handlers.TimedRotatingFileHandler(
4557            self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True,
4558            atTime=atTime)
4559        try:
4560            actual = rh.computeRollover(currentTime)
4561            self.assertEqual(actual, currentTime + 12 * 60 * 60)
4562
4563            actual = rh.computeRollover(currentTime + 13 * 60 * 60)
4564            self.assertEqual(actual, currentTime + 36 * 60 * 60)
4565        finally:
4566            rh.close()
4567
4568    #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.')
4569    def test_compute_rollover_weekly_attime(self):
4570        currentTime = int(time.time())
4571        today = currentTime - currentTime % 86400
4572
4573        atTime = datetime.time(12, 0, 0)
4574
4575        wday = time.gmtime(today).tm_wday
4576        for day in range(7):
4577            rh = logging.handlers.TimedRotatingFileHandler(
4578                self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True,
4579                atTime=atTime)
4580            try:
4581                if wday > day:
4582                    # The rollover day has already passed this week, so we
4583                    # go over into next week
4584                    expected = (7 - wday + day)
4585                else:
4586                    expected = (day - wday)
4587                # At this point expected is in days from now, convert to seconds
4588                expected *= 24 * 60 * 60
4589                # Add in the rollover time
4590                expected += 12 * 60 * 60
4591                # Add in adjustment for today
4592                expected += today
4593                actual = rh.computeRollover(today)
4594                if actual != expected:
4595                    print('failed in timezone: %d' % time.timezone)
4596                    print('local vars: %s' % locals())
4597                self.assertEqual(actual, expected)
4598                if day == wday:
4599                    # goes into following week
4600                    expected += 7 * 24 * 60 * 60
4601                actual = rh.computeRollover(today + 13 * 60 * 60)
4602                if actual != expected:
4603                    print('failed in timezone: %d' % time.timezone)
4604                    print('local vars: %s' % locals())
4605                self.assertEqual(actual, expected)
4606            finally:
4607                rh.close()
4608
4609
4610def secs(**kw):
4611    return datetime.timedelta(**kw) // datetime.timedelta(seconds=1)
4612
4613for when, exp in (('S', 1),
4614                  ('M', 60),
4615                  ('H', 60 * 60),
4616                  ('D', 60 * 60 * 24),
4617                  ('MIDNIGHT', 60 * 60 * 24),
4618                  # current time (epoch start) is a Thursday, W0 means Monday
4619                  ('W0', secs(days=4, hours=24)),
4620                 ):
4621    def test_compute_rollover(self, when=when, exp=exp):
4622        rh = logging.handlers.TimedRotatingFileHandler(
4623            self.fn, when=when, interval=1, backupCount=0, utc=True)
4624        currentTime = 0.0
4625        actual = rh.computeRollover(currentTime)
4626        if exp != actual:
4627            # Failures occur on some systems for MIDNIGHT and W0.
4628            # Print detailed calculation for MIDNIGHT so we can try to see
4629            # what's going on
4630            if when == 'MIDNIGHT':
4631                try:
4632                    if rh.utc:
4633                        t = time.gmtime(currentTime)
4634                    else:
4635                        t = time.localtime(currentTime)
4636                    currentHour = t[3]
4637                    currentMinute = t[4]
4638                    currentSecond = t[5]
4639                    # r is the number of seconds left between now and midnight
4640                    r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
4641                                                       currentMinute) * 60 +
4642                            currentSecond)
4643                    result = currentTime + r
4644                    print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
4645                    print('currentHour: %s' % currentHour, file=sys.stderr)
4646                    print('currentMinute: %s' % currentMinute, file=sys.stderr)
4647                    print('currentSecond: %s' % currentSecond, file=sys.stderr)
4648                    print('r: %s' % r, file=sys.stderr)
4649                    print('result: %s' % result, file=sys.stderr)
4650                except Exception:
4651                    print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr)
4652        self.assertEqual(exp, actual)
4653        rh.close()
4654    setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
4655
4656
4657@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
4658class NTEventLogHandlerTest(BaseTest):
4659    def test_basic(self):
4660        logtype = 'Application'
4661        elh = win32evtlog.OpenEventLog(None, logtype)
4662        num_recs = win32evtlog.GetNumberOfEventLogRecords(elh)
4663
4664        try:
4665            h = logging.handlers.NTEventLogHandler('test_logging')
4666        except pywintypes.error as e:
4667            if e.winerror == 5:  # access denied
4668                raise unittest.SkipTest('Insufficient privileges to run test')
4669            raise
4670
4671        r = logging.makeLogRecord({'msg': 'Test Log Message'})
4672        h.handle(r)
4673        h.close()
4674        # Now see if the event is recorded
4675        self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh))
4676        flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \
4677                win32evtlog.EVENTLOG_SEQUENTIAL_READ
4678        found = False
4679        GO_BACK = 100
4680        events = win32evtlog.ReadEventLog(elh, flags, GO_BACK)
4681        for e in events:
4682            if e.SourceName != 'test_logging':
4683                continue
4684            msg = win32evtlogutil.SafeFormatMessage(e, logtype)
4685            if msg != 'Test Log Message\r\n':
4686                continue
4687            found = True
4688            break
4689        msg = 'Record not found in event log, went back %d records' % GO_BACK
4690        self.assertTrue(found, msg=msg)
4691
4692
4693class MiscTestCase(unittest.TestCase):
4694    def test__all__(self):
4695        blacklist = {'logThreads', 'logMultiprocessing',
4696                     'logProcesses', 'currentframe',
4697                     'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle',
4698                     'Filterer', 'PlaceHolder', 'Manager', 'RootLogger',
4699                     'root', 'threading'}
4700        support.check__all__(self, logging, blacklist=blacklist)
4701
4702
4703# Set the locale to the platform-dependent default.  I have no idea
4704# why the test does this, but in any case we save the current locale
4705# first and restore it at the end.
4706@support.run_with_locale('LC_ALL', '')
4707def test_main():
4708    tests = [
4709        BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest,
4710        HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest,
4711        DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest,
4712        ConfigDictTest, ManagerTest, FormatterTest, BufferingFormatterTest,
4713        StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest,
4714        QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
4715        LoggerAdapterTest, LoggerTest, SMTPHandlerTest, FileHandlerTest,
4716        RotatingFileHandlerTest,  LastResortTest, LogRecordTest,
4717        ExceptionTest, SysLogHandlerTest, IPv6SysLogHandlerTest, HTTPHandlerTest,
4718        NTEventLogHandlerTest, TimedRotatingFileHandlerTest,
4719        UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest,
4720        MiscTestCase
4721    ]
4722    if hasattr(logging.handlers, 'QueueListener'):
4723        tests.append(QueueListenerTest)
4724    support.run_unittest(*tests)
4725
4726if __name__ == "__main__":
4727    test_main()
4728