diff --git a/pelican/__init__.py b/pelican/__init__.py index bd86798818..bacba7c3bb 100644 --- a/pelican/__init__.py +++ b/pelican/__init__.py @@ -14,10 +14,7 @@ from pkgutil import extend_path __path__ = extend_path(__path__, __name__) -# pelican.log has to be the first pelican module to be loaded -# because logging.setLoggerClass has to be called before logging.getLogger -from pelican.log import console -from pelican.log import init as init_logging +from pelican import log from pelican.generators import (ArticlesGenerator, # noqa: I100 PagesGenerator, SourceFileGenerator, StaticGenerator, TemplatePagesGenerator) @@ -164,15 +161,18 @@ def run(self): 'draft page', 'draft pages') - console.print('Done: Processed {}, {}, {}, {}, {} and {} in {:.2f} seconds.' - .format( - pluralized_articles, - pluralized_drafts, - pluralized_hidden_articles, - pluralized_pages, - pluralized_hidden_pages, - pluralized_draft_pages, - time.time() - start_time)) + log.console.print( + 'Done: Processed {}, {}, {}, {}, {} and {} in {:.2f} seconds.' + .format( + pluralized_articles, + pluralized_drafts, + pluralized_hidden_articles, + pluralized_pages, + pluralized_hidden_pages, + pluralized_draft_pages, + time.time() - start_time + ) + ) def _get_generator_classes(self): discovered_generators = [ @@ -229,13 +229,13 @@ def _get_writer(self): class PrintSettings(argparse.Action): def __call__(self, parser, namespace, values, option_string): - init_logging(name=__name__) + log.init(level=logging.WARNING) try: instance, settings = get_instance(namespace) except Exception as e: logger.critical("%s: %s", e.__class__.__name__, e) - console.print_exception() + log.console.print_exception() sys.exit(getattr(e, 'exitcode', 1)) if values: @@ -247,15 +247,17 @@ def __call__(self, parser, namespace, values, option_string): setting_format = '\n{}:\n{}' else: setting_format = '\n{}: {}' - console.print(setting_format.format( + log.console.print(setting_format.format( setting, pprint.pformat(settings[setting]))) else: - console.print('\n{} is not a recognized setting.'.format(setting)) + log.console.print( + '\n{} is not a recognized setting.'.format(setting) + ) break else: # No argument was given to --print-settings, so print all settings - console.print(settings) + log.console.print(settings) parser.exit() @@ -358,15 +360,17 @@ def parse_arguments(argv=None): dest='selected_paths', default=None, help='Comma separated list of selected paths to write') - parser.add_argument('--fatal', metavar='errors|warnings', - choices=('errors', 'warnings'), default='', + parser.add_argument('--fatal', choices=('errors', 'warnings'), + default=None, + # type conversion deferred (see: [FATAL] section below) help=('Exit the program with non-zero status if any ' 'errors/warnings encountered.')) parser.add_argument('--logs-dedup-min-level', default='WARNING', - choices=('DEBUG', 'INFO', 'WARNING', 'ERROR'), - help=('Only enable log de-duplication for levels equal' - ' to or above the specified value')) + metavar=('logging severity level'), + type=log.severity_from_name, dest='once_lvl', + help=('Enable log de-duplication for this severity' + ' level and above.')) parser.add_argument('-l', '--listen', dest='listen', action='store_true', help='Serve content files via HTTP and port 8000.') @@ -390,6 +394,10 @@ def parse_arguments(argv=None): args = parser.parse_args(argv) + # [FATAL] value type is converted *after* parsing, to get around argparse + # complaining that converted type does not match the "choices" param type. + args.fatal = log.severity_from_name(args.fatal) + if args.port is not None and not args.listen: logger.warning('--port without --listen has no effect') if args.bind is not None and not args.listen: @@ -424,6 +432,10 @@ def get_config(args): config['PORT'] = args.port if args.bind is not None: config['BIND'] = args.bind + if args.fatal is not None: + config['LOG_FATAL'] = args.fatal + if args.once_lvl is not None: + config['LOG_ONCE_LEVEL'] = args.once_lvl config['DEBUG'] = args.verbosity == logging.DEBUG config.update(args.overrides) @@ -449,8 +461,11 @@ def get_instance(args): def autoreload(args, excqueue=None): - console.print(' --- AutoReload Mode: Monitoring `content`, `theme` and' - ' `settings` for changes. ---') + log.console.print( + ' --- ' + 'AutoReload Mode: Monitoring `content`, `theme` and `settings` for changes.' + ' ---' + ) pelican, settings = get_instance(args) watcher = FileSystemWatcher(args.settings, Readers, settings) sleep = False @@ -469,8 +484,11 @@ def autoreload(args, excqueue=None): watcher.update_watchers(settings) if any(modified.values()): - console.print('\n-> Modified: {}. re-generating...'.format( - ', '.join(k for k, v in modified.items() if v))) + log.console.print( + '\n-> Modified: {}. re-generating...'.format( + ', '.join(k for k, v in modified.items() if v) + ) + ) pelican.run() except KeyboardInterrupt: @@ -492,9 +510,14 @@ def autoreload(args, excqueue=None): def listen(server, port, output, excqueue=None): - # set logging level to at least "INFO" (so we can see the server requests) - if logger.level < logging.INFO: - logger.setLevel(logging.INFO) + # elevate logging to *at least* "INFO" level to see the server requests: + server_logger = logging.getLogger('pelican.server') + if server_logger.getEffectiveLevel() > logging.INFO: + logObj = server_logger + while logObj: + if logObj.level and logObj.level > logging.INFO: + logObj.setLevel(logging.INFO) + logObj = logObj.parent RootedHTTPServer.allow_reuse_address = True try: @@ -507,8 +530,8 @@ def listen(server, port, output, excqueue=None): return try: - console.print("Serving site at: http://{}:{} - Tap CTRL-C to stop".format( - server, port)) + log.console.print("Serving site at: http://{}:{} - Tap CTRL-C to stop".format( + server, port)) httpd.serve_forever() except Exception as e: if excqueue is not None: @@ -524,15 +547,28 @@ def listen(server, port, output, excqueue=None): def main(argv=None): args = parse_arguments(argv) - logs_dedup_min_level = getattr(logging, args.logs_dedup_min_level) - init_logging(level=args.verbosity, fatal=args.fatal, - name=__name__, logs_dedup_min_level=logs_dedup_min_level) + # logging: add rich handler & set verbosity + debug_mode = logging.DEBUG == args.verbosity + log.init(level=args.verbosity) logger.debug('Pelican version: %s', __version__) logger.debug('Python version: %s', sys.version.split()[0]) try: - pelican, settings = get_instance(args) + logger.info("Reading config...") + + # temp handler to catch possible "fatal" logs while applying settings + with log.temp_handler( + logging.getLogger(), + log.AbortHandler(args.fatal or logging.CRITICAL) + ): + pelican, settings = get_instance(args) + + if debug_mode: + # LOG_FILTER is disabled in --debug + settings.pop('LOG_FILTER') + + log.configure(settings) if args.autoreload and args.listen: excqueue = multiprocessing.Queue() @@ -560,13 +596,15 @@ def main(argv=None): else: watcher = FileSystemWatcher(args.settings, Readers, settings) watcher.check() - with console.status("Generating..."): + with log.console.status("Generating..."): pelican.run() except KeyboardInterrupt: logger.warning('Keyboard interrupt received. Exiting.') except Exception as e: - logger.critical("%s: %s", e.__class__.__name__, e) + if debug_mode: # include traceback: + log.AbortHandler.trim_traceback(e) + log.console.print_exception() + else: + log.console.print(e) - if args.verbosity == logging.DEBUG: - console.print_exception() sys.exit(getattr(e, 'exitcode', 1)) diff --git a/pelican/log.py b/pelican/log.py index be176ea89d..1000019951 100644 --- a/pelican/log.py +++ b/pelican/log.py @@ -1,142 +1,446 @@ import logging from collections import defaultdict +from contextlib import contextmanager from rich.console import Console from rich.logging import RichHandler -__all__ = [ - 'init' -] console = Console() +# handles messages logged from anywhere within the pelican namespace. +PKG_LOGGER = logging.getLogger('pelican') + + +def severity_from_name(string_val): + """Convert string value to appropriate severity (AKA log level).""" + if not string_val: + return logging.NOTSET + level_name = string_val.upper().rstrip('S') + try: + return logging.getLevelNamesMapping()[level_name] + except AttributeError: # Python < 3.11 + return logging._nameToLevel.copy()[level_name] + + +@contextmanager +def temp_filter(logger, flt): + """ + Context in which a custom filter is temporarily applied to logger. + """ + try: + logger.addFilter(flt) + yield flt + finally: + logger.removeFilter(flt) + + +@contextmanager +def temp_handler(logger, hnd): + """ + Context in which a custom handler is temporarily applied to logger. + """ + try: + logger.addHandler(hnd) + yield hnd + finally: + logger.removeHandler(hnd) + + +class AbortException(Exception): + def __init__(self, *args, record=None): + self.record = record + super().__init__(*args) + + +class AbortHandler(logging.Handler): + """ + Set severity level to trigger an AbortException (eg. to exit application). + + Example:: + + # add a handler to raise an exception on logs of warning level or higher + root = logging.getLogger() + root.addHandler(AbortHandler(logging.WARNING)) + root.info("Safe") + try: + root.warning("Critical!") + except AbortHandler.EXCEPTION_CLS as e: + print(e) + print('exiting...') + raise + """ + EXCEPTION_CLS = AbortException + _HANDLER_DEPTH = 3 + + @classmethod + def trim_traceback(cls, e): + # only trim if exception came from this handler... + if isinstance(e, cls.EXCEPTION_CLS): + # find bottom of tb ... + next_tb = e.__traceback__.tb_next + record = e.record + while next_tb: + # Seek traceback for logging call that triggered the handler + if ( + next_tb.tb_frame.f_code.co_filename == record.pathname + and + next_tb.tb_lineno == record.lineno + ): + next_tb.tb_next = None # remaining traceback is irrelevant + break + else: + next_tb = next_tb.tb_next + + def emit(self, record): + # Fail on set severity level (or higher) + if (record.levelno >= self.level): + exc = AbortException( + 'Aborting on log severity of {lvl!r} or greater.'.format( + lvl=logging.getLevelName(self.level) + ), + record=record + ) + raise exc + class LimitFilter(logging.Filter): """ - Remove duplicates records, and limit the number of records in the same - group. + Modify logs using a generic message (`limit_msg`) after reaching threshold. + + Messages logged with the `limit_msg` keyword in `extra` are tracked as a + group; once the group has reached the threshold, the record is modified to + use the limit_msg instead. All subsequent log messages in the group are + ignored. + + EG.:: + + inputs = list(range(43, 55)) + for i in inputs: + logger.warning( + '%s is not the answer', i, + extra={ + 'limit_msg': 'Many erroneous answers: %s', + 'limit_args': inputs + } + ) + """ + + @classmethod + def get_filters(cls, logger): + """Return a list of LimitFilters currently on logger.""" + found = [] + for x in logger.filters: + if isinstance(x, cls): + found.append(x) + return found + + def __init__(self, threshold=5): + """ + :int threshold: + For messages logged with the `limit_msg` in extra, the limit_msg + will be used after the threshold is met. + + Example:: + + if resource.is_missing: + logger.warning( + 'The resource %s is missing', resource.name, + extra={'limit_msg': 'Other resources were missing'} + ) + + # if threshold=5, logs: + # WARNING: The resource prettiest_cat.jpg is missing + # WARNING: The resource best_cat_ever.jpg is missing + # WARNING: The resource cutest_cat.jpg is missing + # WARNING: The resource lolcat.jpg is missing + # WARNING: Other resources were missing + + """ + super().__init__() + self.limit_threshold = None + if threshold is not None: + # react at, not after, threshold + self.limit_threshold = threshold - 1 + self.group_count = defaultdict(int) + + def filter(self, record): + # ``limit_msg`` handling + limit_template = record.__dict__.get('limit_msg', None) + if limit_template: + key = (record.levelno, limit_template) + counter = self.group_count[key] + if counter > self.limit_threshold: + return False # reject: threshold exceeded + elif counter == self.limit_threshold: + # change message to the "limit_msg" + record.msg = limit_template + record.args = record.__dict__.get('limit_args', ()) + + self.group_count[key] += 1 + + return True # accept + + +class OnceFilter(logging.Filter): + """ + Allow only the first occurance of matching log messages. + + Similar to the "once" behavior in the ``warnings`` module. - Groups are specified by the message to use when the number of records in - the same group hit the limit. - E.g.: log.warning(('43 is not the answer', 'More erroneous answers')) + Parameters + ---------- + level: int + Only enable log de-duplication for levels equal to or above this + value. """ - LOGS_DEDUP_MIN_LEVEL = logging.WARNING + def __init__(self, level=logging.NOTSET): + super().__init__() + self.level = level + self._ignored = set() - _ignore = set() - _raised_messages = set() - _threshold = 5 - _group_count = defaultdict(int) + def ignore(self, level, message): + self._ignored.add((level, message)) def filter(self, record): - # don't limit log messages for anything above "warning" - if record.levelno > self.LOGS_DEDUP_MIN_LEVEL: + if record.levelno < self.level: + # Do not dedeuplicate messages of lower severity return True - # extract group - group = record.__dict__.get('limit_msg', None) - group_args = record.__dict__.get('limit_args', ()) + # ignore duplicates of resolved/formatted messages: + message_key = (record.levelno, record.getMessage()) # resolved msg + ignored = self._ignored - # ignore record if it was already raised - message_key = (record.levelno, record.getMessage()) - if message_key in self._raised_messages: - return False + if message_key in ignored: + return False # skip messages already encountered else: - self._raised_messages.add(message_key) - - # ignore LOG_FILTER records by templates or messages - # when "debug" isn't enabled - logger_level = logging.getLogger().getEffectiveLevel() - if logger_level > logging.DEBUG: - template_key = (record.levelno, record.msg) - message_key = (record.levelno, record.getMessage()) - if (template_key in self._ignore or message_key in self._ignore): - return False - - # check if we went over threshold - if group: - key = (record.levelno, group) - self._group_count[key] += 1 - if self._group_count[key] == self._threshold: - record.msg = group - record.args = group_args - elif self._group_count[key] > self._threshold: - return False + ignored.add(message_key) # continue, but mark to ignore next time + return True -class LimitLogger(logging.Logger): +class BlacklistFilter(logging.Filter): + """Use to prevent messages of specific level & content from emitting.""" + + def __init__(self, item_pairs=()): + super().__init__() + self._ignored = set() + for lvl, msg in item_pairs: + self.ignore(lvl, msg) + + @classmethod + def get_filters(cls, logger): + """Return a list of BlacklistFilters currently on logger.""" + found = [] + for x in logger.filters: + if isinstance(x, cls): + found.append(x) + return found + + def ignore(self, level, message): + self._ignored.add((level, message)) + + def filter(self, record): + if not self._ignored: + return True + if ( + ((record.levelno, record.msg) in self._ignored) + or + ((record.levelno, record.getMessage()) in self._ignored) + ): + return False # do not emit msg + return True + + +def init( + level=None, # init root log level + handlers=None, # root log handler +): """ - A logger which adds LimitFilter automatically + Wrapper for logging.basicConfig, but with a rich.console default handler. + + If the root logger has *not already been configured* (eg. has no handlers), + then the specified handler (default:rich.ConsoleHandler) will be added, and + the logging level will be set (Python defaults to WARNING level). """ + if handlers is None: # default handler: + handlers = [RichHandler(console=console)] + logging.basicConfig( + level=level, + format="%(message)s", # default format + datefmt="[%H:%M:%S]", # default date format + handlers=handlers + ) - limit_filter = LimitFilter() - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.enable_filter() +def configure(settings): + """ + Apply logging settings, and return the configured logger. - def disable_filter(self): - self.removeFilter(LimitLogger.limit_filter) + Applicable settings + -------------------- - def enable_filter(self): - self.addFilter(LimitLogger.limit_filter) + LOG_LOGGER: + Default: ``"pelican"`` (PKG_LOGGER.name). + The name of the logger to apply filtering to. -class FatalLogger(LimitLogger): - warnings_fatal = False - errors_fatal = False + LOG_LIMIT_THRESHOLD: + Default: ``5``. - def warning(self, *args, **kwargs): - super().warning(*args, **kwargs) - if FatalLogger.warnings_fatal: - raise RuntimeError('Warning encountered') + The nth occurance at which a log message will be substituted with its + ``limit_msg`` value. - def error(self, *args, **kwargs): - super().error(*args, **kwargs) - if FatalLogger.errors_fatal: - raise RuntimeError('Error encountered') + LOG_ONCE_LEVEL: + Default: ``logging.WARNING`` + The lowest severity at which log messages will be deduplicated. -logging.setLoggerClass(FatalLogger) -# force root logger to be of our preferred class -logging.getLogger().__class__ = FatalLogger + LOG_FILTER: + An iterable of (severity, msg) tuples which will be silenced by the + logger. + LOG_FATAL: + The minimum severity at which logs should raise an ``AbortException``. -def init(level=None, fatal='', handler=RichHandler(console=console), name=None, - logs_dedup_min_level=None): - FatalLogger.warnings_fatal = fatal.startswith('warning') - FatalLogger.errors_fatal = bool(fatal) + NOTE: this appends a (filtered) handler to the root logger; this + handler will *only* act on messages the LOG_LOGGER would propogate -- + effectively still targeting *only* logs from the specified logger (and + its sub-loggers), despite being at the root level. This allows all + handlers to complete *before* triggering the ``AbortException``. - LOG_FORMAT = "%(message)s" - logging.basicConfig( - level=level, - format=LOG_FORMAT, - datefmt="[%H:%M:%S]", - handlers=[handler] + """ + cfg = dict( # config values that have defaults: + LOG_LOGGER=PKG_LOGGER.name, + LOG_LIMIT_THRESHOLD=5, + LOG_ONCE_LEVEL="WARNING" ) + cfg.update(settings.copy()) + + # validate & transform types/values: treat ``None`` as logging.NOTSET + if not isinstance(cfg.get('LOG_ONCE_LEVEL') or logging.NOTSET, int): + cfg['LOG_ONCE_LEVEL'] = severity_from_name(cfg['LOG_ONCE_LEVEL']) - logger = logging.getLogger(name) + if not isinstance(cfg.get('LOG_LOGGER'), logging.Logger): + cfg['LOG_LOGGER'] = logging.getLogger(cfg.get('LOG_LOGGER')) - if level: - logger.setLevel(level) - if logs_dedup_min_level: - LimitFilter.LOGS_DEDUP_MIN_LEVEL = logs_dedup_min_level + # == FILTERING: + logger = set_filters( + logger=cfg.get('LOG_LOGGER'), + limit_threshold=cfg.get('LOG_LIMIT_THRESHOLD'), + once_lvl=cfg.get('LOG_ONCE_LEVEL'), + blacklist=cfg.get('LOG_FILTER') + ) + # == HANDLING: + if cfg.get('LOG_FATAL'): + hnd = AbortHandler(cfg['LOG_FATAL']) -def log_warnings(): - import warnings - logging.captureWarnings(True) - warnings.simplefilter("default", DeprecationWarning) - init(logging.DEBUG, name='py.warnings') + # Add to root logger so all other handlers can write/emit first. + root = logging.getLogger() + if logger is not root: + # handler only acts on records from specified logger (& children). + hnd.addFilter(logging.Filter(logger.name)) + root.addHandler(hnd) + return logger -if __name__ == '__main__': - init(level=logging.DEBUG, name=__name__) - root_logger = logging.getLogger(__name__) - root_logger.debug('debug') - root_logger.info('info') - root_logger.warning('warning') - root_logger.error('error') - root_logger.critical('critical') +def set_filters( + logger=None, # logger of interest + limit_threshold=None, # LimitFilter: when to start using limit_msg + once_lvl=None, # OnceFilter: which severity to dedupe msgs at + blacklist=None # BlacklistFilter: +): + """ + Apply filtering on the logger (Default: ``PKG_LOGGER``). + + Up to three filters are (optionally) applied: + ``LimitFilter``: + Modifies log records using `limit_msg` in the `extras` dict. The + `limit_threshold` specifies how many records using the same + limit_msg are emitted before converting the message. See + ``LimitFilter`` for an example. + ``OnceFilter``: + De-duplicates messages of identical severity & content. Only + affects messages of once_lvl and higher. + ``BlacklistFilter``: + Prevents log records of (severity, msg) from being emitted. + + Returns + ------- + The logger object the filters were applied to. + """ + if logger is None: + logger = PKG_LOGGER + + if blacklist: + try: + flt = BlacklistFilter.get_filters(logger)[0] + except IndexError: + flt = BlacklistFilter() + logger.addFilter(flt) + + for lvl, msg in blacklist: + flt.ignore(lvl, msg) + + if once_lvl is not None: + # deduplicate this severity and above: + logger.addFilter(OnceFilter(once_lvl)) + + if limit_threshold is not None: + try: + flt = LimitFilter.get_filters(logger)[0].threshold = limit_threshold + except IndexError: + logger.addFilter(LimitFilter(limit_threshold)) + + logger.debug("{!r} : log filtering configured.".format(logger)) + return logger + + +if __name__ == "__main__": + init(level=logging.INFO) + segue = "And now for something completely different: %s" + ignored_records = [ + (logging.WARNING, "I'd like to have an argument, please."), + (logging.INFO, segue) + ] + configure( + dict( + LOG_FATAL=logging.ERROR, + LOG_FILTER=ignored_records + ) + ) + logger = PKG_LOGGER + logger.info("hey") + logger.info("hey") # INFO < WARNING, not deduplicated + logger.log(*ignored_records[0]) # blacklisted + logger.log(*ignored_records[1]) # blacklisted + logger.info(segue, "a man with 3 buttocks.") # blacklisted + logger.warning("oof") + logger.warning("oof") # deduplicated + logger.warning("oof") # deduplicated + + # on 5th occurance, limit_msg is used. + for i in range(1, 11): + logger.warning("watch out x%s!", i, + extra={"limit_msg": "[more watches were outed]"}) + + # on 5th occurance, limit_msg is used, formatting msg w/ limit_args + inputs = list(range(43, 55)) + for i in inputs: + logger.warning( + '%s is not the answer', i, + extra={ + 'limit_msg': 'Many erroneous answers: %s', + 'limit_args': inputs + } + ) + + # logger not from pkg, will not trip the AbortHandler + logging.getLogger("notpelican").error("that's not good...") + + # logger from pkg: *will* trip the AbortHandler + logging.getLogger("pelican.log.example").error("x_x") + logger.critical("This line should not run.") diff --git a/pelican/settings.py b/pelican/settings.py index 5b495e863a..8752d92218 100644 --- a/pelican/settings.py +++ b/pelican/settings.py @@ -7,8 +7,6 @@ import re from os.path import isabs -from pelican.log import LimitFilter - def load_source(name, path): spec = importlib.util.spec_from_file_location(name, path) @@ -507,9 +505,18 @@ def configure_settings(settings): raise Exception('You need to specify a path containing the content' ' (see pelican --help for more information)') - # specify the log messages to be ignored - log_filter = settings.get('LOG_FILTER', DEFAULT_CONFIG['LOG_FILTER']) - LimitFilter._ignore.update(set(log_filter)) + # LOG_FILTER expects a list of (log_level, str) pairs. + blacklist = settings.get('LOG_FILTER', DEFAULT_CONFIG['LOG_FILTER']) + for n, (lvl, msg) in enumerate(blacklist): + if not isinstance(lvl, int): + raise ValueError( + "Invalid LOG_FILTER (item {n}): level={lvl}".format(**locals()) + ) + if not isinstance(msg, str): + raise ValueError( + "Invalid LOG_FILTER (item {n}): msg={msg!r}".format(**locals()) + ) + settings['LOG_FILTER'] = blacklist # lookup the theme in "pelican/themes" if the given one doesn't exist if not os.path.isdir(settings['THEME']): diff --git a/pelican/tests/__init__.py b/pelican/tests/__init__.py index 564e417cf5..27951f3c6b 100644 --- a/pelican/tests/__init__.py +++ b/pelican/tests/__init__.py @@ -1,15 +1,13 @@ import logging import warnings -from pelican.log import log_warnings +# Direct Warnings to the "py.warnings" logger +logging.captureWarnings(True) -# redirect warnings module to use logging instead -log_warnings() - -# setup warnings to log DeprecationWarning's and error on -# warnings in pelican's codebase +# enable DeprecationWarnings warnings.simplefilter("default", DeprecationWarning) +# treat warnings in pelican's codebase as errors warnings.filterwarnings("error", ".*", Warning, "pelican") -# Add a NullHandler to silence warning about no available handlers -logging.getLogger().addHandler(logging.NullHandler()) +# Use: pytest --cli-log-level DEBUG for debug-level logging +logging.basicConfig(handlers=[logging.NullHandler()]) diff --git a/pelican/tests/support.py b/pelican/tests/support.py index 55ddf625fe..e15dcf073e 100644 --- a/pelican/tests/support.py +++ b/pelican/tests/support.py @@ -11,12 +11,13 @@ from logging.handlers import BufferingHandler from shutil import rmtree from tempfile import mkdtemp +from typing import Optional from pelican.contents import Article from pelican.readers import default_metadata from pelican.settings import DEFAULT_CONFIG -__all__ = ['get_article', 'unittest', ] +__all__ = ['get_article', 'unittest', 'LogCountHandler'] @contextmanager @@ -199,40 +200,94 @@ class LogCountHandler(BufferingHandler): def __init__(self, capacity=1000): super().__init__(capacity) - def count_logs(self, msg=None, level=None): - return len([ - rec - for rec - in self.buffer - if (msg is None or re.match(msg, rec.getMessage())) and - (level is None or rec.levelno == level) - ]) - - def count_formatted_logs(self, msg=None, level=None): - return len([ - rec - for rec - in self.buffer - if (msg is None or re.search(msg, self.format(rec))) and - (level is None or rec.levelno == level) - ]) - - -class LoggedTestCase(unittest.TestCase): - """A test case that captures log messages.""" - - def setUp(self): - super().setUp() - self._logcount_handler = LogCountHandler() - logging.getLogger().addHandler(self._logcount_handler) - - def tearDown(self): - logging.getLogger().removeHandler(self._logcount_handler) - super().tearDown() - - def assertLogCountEqual(self, count=None, msg=None, **kwargs): - actual = self._logcount_handler.count_logs(msg=msg, **kwargs) - self.assertEqual( - actual, count, - msg='expected {} occurrences of {!r}, but found {}'.format( - count, msg, actual)) + @classmethod + @contextmanager + def examine(cls, loggerObj): + """ + Context in which a logger's propagated messages can be examined. + + Yields + ====== + A handle to ``LogCountHandler.assert_count`` that has been added to the + specified logger for the duration of the context. + + The yielded caller can be used to assert whether a certain number of + log messages have occurred within the context. + """ + hnd = cls() + try: + loggerObj.addHandler(hnd) + yield hnd.assert_count + finally: + loggerObj.removeHandler(hnd) + + def assert_count( + self, + count: int, + msg: Optional[str] = None, + level: Optional[int] = None, + as_regex: bool = False + ): + """ + Assert how often the specified messages have been handled. + + Raises + ------- + AssertionError + """ + occurances = self.count_logs(msg, level, as_regex) + if count != occurances: + report = 'Logged occurrence' + if msg is not None: + report += ' of {!r}'.format(msg) + + if level is not None: + raise AssertionError( + ' at {}'.format(logging.getLevelName(level)) + ) + raise AssertionError( + report + ': expected/found {}/{}'.format(count, occurances) + ) + + def match_record( + self, + pattern: re.Pattern, + record: logging.LogRecord, + level: Optional[int] + ) -> Optional[re.Match]: + """ + Return regex object if pattern found in message at specified severity. + """ + if level is not None and level != record.levelno: + return None + + # prefix pattern with "^" for re.match behavior + return pattern.search(record.getMessage()) + + def count_logs( + self, + msg: Optional[str], + level: Optional[int], + as_regex: bool = False + ) -> int: + """ + Returns the number of times a message has been logged. + """ + if not msg: + if not level: + matched = self.buffer # all logged messages + else: + # all logged messages of matching severity level + matched = [rec for rec in self.buffer if rec.levelno == level] + else: + # all logged messages matching the regex and level + if not as_regex: + msg = re.escape(msg) + + pattern = re.compile(msg) + matched = [ + record for record in self.buffer + if self.match_record(pattern, record, level) + ] + + return len(matched) diff --git a/pelican/tests/test_contents.py b/pelican/tests/test_contents.py index 3a223b5a0e..27aa03d14e 100644 --- a/pelican/tests/test_contents.py +++ b/pelican/tests/test_contents.py @@ -2,6 +2,7 @@ import locale import logging import os.path +import unittest from posixpath import join as posix_join from sys import platform @@ -10,17 +11,15 @@ from pelican.contents import Article, Author, Category, Page, Static from pelican.plugins.signals import content_object_init from pelican.settings import DEFAULT_CONFIG -from pelican.tests.support import (LoggedTestCase, get_context, get_settings, - unittest) +from pelican.tests.support import (LogCountHandler, get_context, get_settings) from pelican.utils import (path_to_url, posixize_path, truncate_html_words) - # generate one paragraph, enclosed with

TEST_CONTENT = str(generate_lorem_ipsum(n=1)) TEST_SUMMARY = generate_lorem_ipsum(n=1, html=False) -class TestBase(LoggedTestCase): +class TestBase(unittest.TestCase): def setUp(self): super().setUp() @@ -41,19 +40,9 @@ def setUp(self): }, 'source_path': '/path/to/file/foo.ext' } - self._disable_limit_filter() def tearDown(self): locale.setlocale(locale.LC_ALL, self.old_locale) - self._enable_limit_filter() - - def _disable_limit_filter(self): - from pelican.contents import logger - logger.disable_filter() - - def _enable_limit_filter(self): - from pelican.contents import logger - logger.enable_filter() def _copy_page_kwargs(self): # make a deep copy of page_kwargs @@ -82,11 +71,15 @@ def test_use_args(self): def test_mandatory_properties(self): # If the title is not set, must throw an exception. - page = Page('content') - self.assertFalse(page._has_valid_mandatory_properties()) - self.assertLogCountEqual( + with LogCountHandler.examine( + logging.getLogger("pelican.contents") + ) as count_msgs: + page = Page('content') + self.assertFalse(page._has_valid_mandatory_properties()) + count_msgs( count=1, msg="Skipping .*: could not find information about 'title'", + as_regex=True, level=logging.ERROR) page = Page('content', metadata={'title': 'foobar'}) self.assertTrue(page._has_valid_mandatory_properties()) @@ -132,11 +125,16 @@ def test_summary_get_summary_warning(self): page_kwargs = self._copy_page_kwargs() page = Page(**page_kwargs) self.assertEqual(page.summary, TEST_SUMMARY) - self.assertEqual(page._get_summary(), TEST_SUMMARY) - self.assertLogCountEqual( + with LogCountHandler.examine( + logging.getLogger("pelican.contents") + ) as count_msgs: + self.assertEqual(page._get_summary(), TEST_SUMMARY) + count_msgs( count=1, - msg=r"_get_summary\(\) has been deprecated since 3\.6\.4\. " - "Use the summary decorator instead", + msg=( + "_get_summary() has been deprecated since 3.6.4. Use the" + " summary decorator instead" + ), level=logging.WARNING) def test_slug(self): @@ -761,7 +759,7 @@ def test_valid_save_as_passes_valid(self): self.assertTrue(article._has_valid_save_as()) -class TestStatic(LoggedTestCase): +class TestStatic(unittest.TestCase): def setUp(self): super().setUp() @@ -986,30 +984,34 @@ def test_unknown_link_syntax(self): metadata={'title': 'fakepage'}, settings=self.settings, source_path=os.path.join('dir', 'otherdir', 'fakepage.md'), context=self.context) - content = page.get_content('') - - self.assertEqual(content, html) - self.assertLogCountEqual( - count=1, - msg="Replacement Indicator 'unknown' not recognized, " - "skipping replacement", - level=logging.WARNING) + with LogCountHandler.examine( + logging.getLogger("pelican.contents") + ) as count_msgs: + content = page.get_content('') + self.assertEqual(content, html) + count_msgs( + count=1, + msg="Replacement Indicator 'unknown' not recognized, " + "skipping replacement", + level=logging.WARNING) def test_link_to_unknown_file(self): "{filename} link to unknown file should trigger warning." html = 'link' - page = Page(content=html, - metadata={'title': 'fakepage'}, settings=self.settings, - source_path=os.path.join('dir', 'otherdir', 'fakepage.md'), - context=self.context) - content = page.get_content('') - - self.assertEqual(content, html) - self.assertLogCountEqual( - count=1, - msg="Unable to find 'foo', skipping url replacement.", - level=logging.WARNING) + with LogCountHandler.examine( + logging.getLogger("pelican.contents") + ) as count_msgs: + page = Page(content=html, + metadata={'title': 'fakepage'}, settings=self.settings, + source_path=os.path.join('dir', 'otherdir', 'fakepage.md'), + context=self.context) + content = page.get_content('') + self.assertEqual(content, html) + count_msgs( + count=1, + msg="Unable to find 'foo', skipping url replacement.", + level=logging.WARNING) def test_index_link_syntax_with_spaces(self): """{index} link syntax triggers url replacement diff --git a/pelican/tests/test_log.py b/pelican/tests/test_log.py index 1f2fb83a84..6cf1d05de5 100644 --- a/pelican/tests/test_log.py +++ b/pelican/tests/test_log.py @@ -1,82 +1,268 @@ import logging import unittest -from collections import defaultdict from contextlib import contextmanager from pelican import log from pelican.tests.support import LogCountHandler -class TestLog(unittest.TestCase): +LOG_LVLS = [ + logging.DEBUG, + logging.INFO, + logging.WARNING, + logging.ERROR, + logging.CRITICAL +] +# Set this module's logging level to DEBUG to prevent +# tests' loggers effectiveLevels from reaching **root logger**, +# which we *don't* want to play with here. +logging.getLogger(__name__).setLevel(logging.DEBUG) + + +class _LoggingTestCase(unittest.TestCase): + """Test Case w/ test-specific loggers, and contexts for temp filters.""" + + def setUp(self): + "Each test should use a unique logger and handle all levels." + self.logger = logging.getLogger(self.id()) + self.logger.setLevel(logging.NOTSET) # log all levels + + @contextmanager + def temp_filter(self, flt): + """ + Context in which a filter is temporarily applied to test's logger. + """ + with log.temp_filter(self.logger, flt): + yield flt + + +class TestLogLevel(unittest.TestCase): + + def test_severity_name(self): + self.assertEqual(log.severity_from_name('errors'), logging.ERROR) + self.assertEqual(log.severity_from_name('error'), logging.ERROR) + self.assertEqual(log.severity_from_name('WARNINGS'), logging.WARNING) + self.assertEqual(log.severity_from_name('info'), logging.INFO) + self.assertEqual(log.severity_from_name('INFO'), logging.INFO) + self.assertEqual(log.severity_from_name(''), logging.NOTSET) + self.assertEqual(log.severity_from_name('error'), logging.ERROR) + + # add a custom level: + logging.addLevelName(5, "TRACE") + self.assertEqual(log.severity_from_name('trace'), 5) + + with self.assertRaises(KeyError): + self.assertEqual(log.severity_from_name("wrong"), logging.NOTSET) + + +class TestFatal(_LoggingTestCase): + + def test_levels(self): + logger = self.logger + with log.temp_handler(self.logger, log.AbortHandler(logging.ERROR)): + logger.debug("Calculating airspeed velocity of an unladen swallow...") + logger.info("Tis but a scratch.") + logger.warning("And now for something completely different.") + with self.assertRaises(log.AbortException): + logger.error("This is a late parrot!") + with self.assertRaises(log.AbortException): + logger.critical("Unexpected: Spanish Inquisition.") + + +class TestLogInit(_LoggingTestCase): + + def test_alt_logger(self): + self.assertEqual(log.set_filters(self.logger), self.logger) + + def test_default(self): + null_settings = dict( + LOG_LIMIT_THRESHOLD=None, + LOG_ONCE_LEVEL=None, + ) + # assert returned logger is "pelican". + pkg_logger = logging.getLogger('pelican') + self.assertEqual(pkg_logger.filters, []) + self.assertEqual(pkg_logger, log.configure(null_settings)) + # because no filtering settings supplied, + # no filters should have been added: + self.assertEqual(pkg_logger.filters, []) + + # assert no pre-existing filters on test's logger: + self.assertEqual(log.LimitFilter.get_filters(self.logger), []) + + logger = log.configure(dict(LOG_LOGGER=self.logger.name)) + + # assert returned logger has been set up with a LimitFilter + self.assertNotEqual(log.LimitFilter.get_filters(self.logger), []) + + with LogCountHandler.examine(logger) as count_msgs: + msg = 'init with alt logger: %s' % logger + for level in LOG_LVLS: + logger.log(level, msg) + logger.log(level, msg) # suppressed for WARNINGS and worse + count_msgs( + sum([1 if lvl >= logging.WARNING # dedupe warnings & above + else 2 # lvls below should not be deduped + for lvl in LOG_LVLS]), + msg + ) + + def test_dedupe_all(self): + logger = log.set_filters(self.logger, once_lvl=logging.NOTSET) # dedup all + with LogCountHandler.examine(logger) as count_msgs: + msg = 'init with alt logger: %s' % logger + for level in LOG_LVLS: + logger.log(level, msg) + logger.log(level, msg) # poss. suppressed + count_msgs(len(LOG_LVLS), msg) + + def test_dedupe_none(self): + logger = log.set_filters(self.logger, once_lvl=None) + with LogCountHandler.examine(logger) as count_msgs: + msg = 'init with alt logger: %s' % logger + for level in LOG_LVLS: + logger.log(level, msg) + logger.log(level, msg) # do not suppress + count_msgs(len(LOG_LVLS * 2), msg) + + +class classTestOnceFilter(_LoggingTestCase): + + def test_levels(self): + """ + Don't de-duplicate messages for levels below filter's value. + """ + logger = self.logger + msg = "levels_test" + with LogCountHandler.examine(logger) as count_msgs: + with self.temp_filter(log.OnceFilter()): # default=NOTSET: dedup all + for level in LOG_LVLS: + logger.log(level, msg) + logger.log(level, msg) # copy should be suppressed + count_msgs(len(LOG_LVLS), msg) + + with LogCountHandler.examine(logger) as count_msgs: + with self.temp_filter(log.OnceFilter(logging.WARNING)): + for level in LOG_LVLS: + logger.log(level, msg) + logger.log(level, msg) # copy should be suppressed + count_msgs( + sum([1 if lvl >= logging.WARNING # dedupe warnings & above + else 2 # lvls below should not be deduped + for lvl in LOG_LVLS]), + msg + ) + + def test_deduplication(self): + """Confirm that duplicate messages are ignored.""" + logger = self.logger + + # ensure counting is correct before adding filters + msg = "without filter" + with LogCountHandler.examine(logger) as count_msgs: + for level in LOG_LVLS: + logger.log(level, msg) + # count w/ level requirements + count_msgs(1, msg, level=logging.DEBUG) + count_msgs(1, msg, level=logging.INFO) + # count w/ no level requirqement + count_msgs(len(LOG_LVLS), msg) + + # add filter, check count + with self.temp_filter(log.OnceFilter()): + msg = "with filter" + with LogCountHandler.examine(logger) as count_msgs: + for level in LOG_LVLS: + logger.log(level, msg) + logger.log(level, msg) + logger.log(level, msg) + + # counting w/ level requirements + for level in LOG_LVLS: + count_msgs(1, msg, level=level) + + # counting w/ no level requirement + count_msgs(len(LOG_LVLS), msg) + + # check counting again after filter removed + msg = "removed filter" + with LogCountHandler.examine(logger) as count_msgs: + for level in LOG_LVLS: + logger.log(level, msg) + count_msgs(5, msg) + + +class TestLimitFilter(_LoggingTestCase): + def setUp(self): super().setUp() - self.logger = logging.getLogger(__name__) - self.handler = LogCountHandler() - self.logger.addHandler(self.handler) + self.logger.setLevel(logging.DEBUG) - def tearDown(self): - self._reset_limit_filter() - super().tearDown() + def test_get_filters(self): + """Assert LimitFilters on a logger are collected.""" + fltrs = [log.LimitFilter() for x in range(5)] - def _reset_limit_filter(self): - log.LimitFilter._ignore = set() - log.LimitFilter._raised_messages = set() - log.LimitFilter._threshold = 5 - log.LimitFilter._group_count = defaultdict(int) + assert not self.logger.filters - @contextmanager - def reset_logger(self): - try: - yield None - finally: - self._reset_limit_filter() - self.handler.flush() - - def test_log_filter(self): - def do_logging(): - for i in range(5): - self.logger.warning('Log %s', i) - self.logger.warning('Another log %s', i) - # no filter - with self.reset_logger(): - do_logging() - self.assertEqual( - self.handler.count_logs('Log \\d', logging.WARNING), - 5) - self.assertEqual( - self.handler.count_logs('Another log \\d', logging.WARNING), - 5) - - # filter by template - with self.reset_logger(): - log.LimitFilter._ignore.add((logging.WARNING, 'Log %s')) - do_logging() - self.assertEqual( - self.handler.count_logs('Log \\d', logging.WARNING), - 0) - self.assertEqual( - self.handler.count_logs('Another log \\d', logging.WARNING), - 5) - - # filter by exact message - with self.reset_logger(): - log.LimitFilter._ignore.add((logging.WARNING, 'Log 3')) - do_logging() - self.assertEqual( - self.handler.count_logs('Log \\d', logging.WARNING), - 4) - self.assertEqual( - self.handler.count_logs('Another log \\d', logging.WARNING), - 5) - - # filter by both - with self.reset_logger(): - log.LimitFilter._ignore.add((logging.WARNING, 'Log 3')) - log.LimitFilter._ignore.add((logging.WARNING, 'Another log %s')) - do_logging() - self.assertEqual( - self.handler.count_logs('Log \\d', logging.WARNING), - 4) - self.assertEqual( - self.handler.count_logs('Another log \\d', logging.WARNING), - 0) + for each in fltrs: + self.logger.addFilter(each) + + self.assertEqual(fltrs, log.LimitFilter.get_filters(self.logger)) + + for each in fltrs: + self.logger.removeFilter(each) + + assert not self.logger.filters + + def test_threshold(self): + """ + Assert that the `limit_msg` activates after the threshold is reached. + """ + threshold = 4 + with self.temp_filter(log.LimitFilter(threshold)): + msg = "Threshold: %s" + limits = {'limit_msg': "Threshold exceeded"} + with LogCountHandler.examine(self.logger) as count_msgs: + for i in range(1, threshold + 3): + self.logger.warning(msg, i, extra=limits) + # up to threshold + count_msgs(threshold - 1, "Threshold: [1-9+]", as_regex=True) + # exactly one "limit_msg" + count_msgs(1, limits['limit_msg']) + # total count should be == threshold, all in all. + count_msgs(threshold) + + def test_limit_args(self): + """ + `limit_args` should be used in the `limit_msg`. + """ + threshold = 3 + with self.temp_filter(log.LimitFilter(threshold)): + limits = {'limit_msg': "Threshold exceeded @ %s %s", + 'limit_args': ('count of', threshold)} + with LogCountHandler.examine(self.logger) as count_msgs: + for i in range(threshold + 1): + self.logger.warning("Threshold: %s", i, extra=limits) + count_msgs(1, limits['limit_msg'] % limits['limit_args']) + + def test_ignore(self): + """Confirm blacklisted messages (eg. from config) are not emitted.""" + blacklist = [ + "SPAM SPAM SPAM", + "I'll have your spam. I love it." + ] + + with self.temp_filter(log.BlacklistFilter()) as flt: + for template in blacklist: + flt.ignore(logging.WARNING, template) + + with LogCountHandler.examine(self.logger) as count_msgs: + count_msgs(0) + self.logger.warning(blacklist[0]) # ignored + self.logger.warning(blacklist[0]) # ignored + self.logger.warning("I don't like spam!") # 1 + self.logger.warning("Sshh, dear, don't cause a fuss.") # 2 + self.logger.warning(blacklist[1]) # ignored + self.logger.info(blacklist[1]) # 3 : diff. level (below) + self.logger.error(blacklist[1]) # 4 : diff. level (above) + count_msgs(4) diff --git a/pelican/tests/test_pelican.py b/pelican/tests/test_pelican.py index 389dbb3de6..045ba71cb6 100644 --- a/pelican/tests/test_pelican.py +++ b/pelican/tests/test_pelican.py @@ -3,6 +3,7 @@ import os import subprocess import sys +import unittest from collections.abc import Sequence from shutil import rmtree from tempfile import mkdtemp @@ -10,8 +11,11 @@ from pelican import Pelican from pelican.generators import StaticGenerator from pelican.settings import read_settings -from pelican.tests.support import (LoggedTestCase, locale_available, - mute, unittest) +from pelican.tests.support import ( + LogCountHandler, + locale_available, + mute +) CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) SAMPLES_PATH = os.path.abspath(os.path.join( @@ -35,7 +39,7 @@ def recursiveDiff(dcmp): return diff -class TestPelican(LoggedTestCase): +class TestPelican(unittest.TestCase): # general functional testing for pelican. Basically, this test case tries # to run pelican in different situations and see how it behaves @@ -102,13 +106,19 @@ def test_basic_generation_works(self): 'LOCALE': locale.normalize('en_US'), }) pelican = Pelican(settings=settings) - mute(True)(pelican.run)() + with LogCountHandler.examine( + logging.getLogger("pelican.contents") + ) as count_msgs: + mute(True)(pelican.run)() + count_msgs( + count=1, + msg="Unable to find .* skipping url replacement", + as_regex=True, + level=logging.WARNING + ) self.assertDirsEqual( - self.temp_path, os.path.join(OUTPUT_PATH, 'basic')) - self.assertLogCountEqual( - count=1, - msg="Unable to find.*skipping url replacement", - level=logging.WARNING) + self.temp_path, os.path.join(OUTPUT_PATH, 'basic') + ) def test_custom_generation_works(self): # the same thing with a specified set of settings should work @@ -194,16 +204,16 @@ def test_write_only_selected(self): ], 'LOCALE': locale.normalize('en_US'), }) - pelican = Pelican(settings=settings) - logger = logging.getLogger() - orig_level = logger.getEffectiveLevel() + + logger = logging.getLogger("pelican.writers") logger.setLevel(logging.INFO) - mute(True)(pelican.run)() - logger.setLevel(orig_level) - self.assertLogCountEqual( - count=2, - msg="Writing .*", - level=logging.INFO) + with LogCountHandler.examine(logger) as count_msgs: + pelican = Pelican(settings=settings) + mute(True)(pelican.run)() + count_msgs(count=2, + msg="Writing .*", + as_regex=True, + level=logging.INFO) def test_cyclic_intersite_links_no_warnings(self): settings = read_settings(path=None, override={ @@ -211,37 +221,46 @@ def test_cyclic_intersite_links_no_warnings(self): 'OUTPUT_PATH': self.temp_path, 'CACHE_PATH': self.temp_cache, }) - pelican = Pelican(settings=settings) - mute(True)(pelican.run)() - # There are four different intersite links: - # - one pointing to the second article from first and third - # - one pointing to the first article from second and third - # - one pointing to the third article from first and second - # - one pointing to a nonexistent from each - # If everything goes well, only the warning about the nonexistent - # article should be printed. Only two articles are not sufficient, - # since the first will always have _context['generated_content'] empty - # (thus skipping the link resolving) and the second will always have it - # non-empty, containing the first, thus always succeeding. - self.assertLogCountEqual( - count=1, - msg="Unable to find '.*\\.rst', skipping url replacement.", - level=logging.WARNING) + with LogCountHandler.examine( + logging.getLogger("pelican.contents") + ) as count_msgs: + pelican = Pelican(settings=settings) + mute(True)(pelican.run)() + # There are four different intersite links: + # - one pointing to the second article from first and third + # - one pointing to the first article from second and third + # - one pointing to the third article from first and second + # - one pointing to a nonexistent from each + # If everything goes well, only the warning about the nonexistent + # article should be printed. Only two articles are not sufficient, + # since the first will always have _context['generated_content'] empty + # (thus skipping the link resolving) and the second will always have it + # non-empty, containing the first, thus always succeeding. + count_msgs( + count=3, + msg="Unable to find '.*[.]rst', skipping url replacement.", + as_regex=True, + level=logging.WARNING + ) def test_md_extensions_deprecation(self): """Test that a warning is issued if MD_EXTENSIONS is used""" - settings = read_settings(path=None, override={ - 'PATH': INPUT_PATH, - 'OUTPUT_PATH': self.temp_path, - 'CACHE_PATH': self.temp_cache, - 'MD_EXTENSIONS': {}, - }) - pelican = Pelican(settings=settings) - mute(True)(pelican.run)() - self.assertLogCountEqual( - count=1, - msg="MD_EXTENSIONS is deprecated use MARKDOWN instead.", - level=logging.WARNING) + with LogCountHandler.examine( + logging.getLogger("pelican.settings") + ) as count_msgs: + settings = read_settings(path=None, override={ + 'PATH': INPUT_PATH, + 'OUTPUT_PATH': self.temp_path, + 'CACHE_PATH': self.temp_cache, + 'MD_EXTENSIONS': {}, + }) + pelican = Pelican(settings=settings) + mute(True)(pelican.run)() + count_msgs( + count=1, + msg="MD_EXTENSIONS is deprecated use MARKDOWN instead.", + level=logging.WARNING + ) def test_parse_errors(self): # Verify that just an error is printed and the application doesn't @@ -251,12 +270,17 @@ def test_parse_errors(self): 'OUTPUT_PATH': self.temp_path, 'CACHE_PATH': self.temp_cache, }) - pelican = Pelican(settings=settings) - mute(True)(pelican.run)() - self.assertLogCountEqual( - count=1, - msg="Could not process .*parse_error.rst", - level=logging.ERROR) + with LogCountHandler.examine( + logging.getLogger("pelican.generators") + ) as count_msgs: + pelican = Pelican(settings=settings) + mute(True)(pelican.run)() + count_msgs( + count=1, + msg="Could not process .*parse_error.rst", + as_regex=True, + level=logging.ERROR + ) def test_module_load(self): """Test loading via python -m pelican --help displays the help""" diff --git a/pelican/tests/test_settings.py b/pelican/tests/test_settings.py index 0f630ad55a..9ff37b8ddf 100644 --- a/pelican/tests/test_settings.py +++ b/pelican/tests/test_settings.py @@ -1,5 +1,6 @@ import copy import locale +import logging import os from os.path import abspath, dirname, join @@ -16,6 +17,7 @@ class TestSettingsConfiguration(unittest.TestCase): append new values to the settings (if any), and apply basic settings optimizations. """ + def setUp(self): self.old_locale = locale.setlocale(locale.LC_ALL) locale.setlocale(locale.LC_ALL, 'C') @@ -304,3 +306,19 @@ def test_deprecated_slug_substitutions_from_file(self): [(r'C\+\+', 'cpp')] + self.settings['SLUG_REGEX_SUBSTITUTIONS']) self.assertNotIn('SLUG_SUBSTITUTIONS', settings) + + def test_log_filter_setting(self): + read_settings(None, override=dict( + LOG_FILTER=[ + (logging.INFO, "Ignored Info message."), + (logging.WARNING, "Ignored Warning message."), + (logging.ERROR, "Ignored Error message.") + ] + )) + + with self.assertRaises(ValueError): + read_settings(None, override=dict( + LOG_FILTER=[ + ("Wrong order.", logging.ERROR) + ] + )) diff --git a/pelican/tests/test_utils.py b/pelican/tests/test_utils.py index e175872676..dda583c290 100644 --- a/pelican/tests/test_utils.py +++ b/pelican/tests/test_utils.py @@ -3,6 +3,7 @@ import os import shutil import time +import unittest from datetime import timezone from sys import platform from tempfile import mkdtemp @@ -16,12 +17,15 @@ from pelican.generators import TemplatePagesGenerator from pelican.readers import Readers from pelican.settings import read_settings -from pelican.tests.support import (LoggedTestCase, get_article, - locale_available, unittest) +from pelican.tests.support import ( + LogCountHandler, + get_article, + locale_available +) from pelican.writers import Writer -class TestUtils(LoggedTestCase): +class TestUtils(unittest.TestCase): _new_attribute = 'new_value' def setUp(self): @@ -39,13 +43,15 @@ def _old_attribute(): return None def test_deprecated_attribute(self): - value = self._old_attribute - self.assertEqual(value, self._new_attribute) - self.assertLogCountEqual( - count=1, - msg=('_old_attribute has been deprecated since 3.1.0 and will be ' - 'removed by version 4.1.3. Use _new_attribute instead'), - level=logging.WARNING) + with LogCountHandler.examine(logging.getLogger('pelican')) as count_msgs: + value = self._old_attribute + self.assertEqual(value, self._new_attribute) + count_msgs( + count=1, + msg=('_old_attribute has been deprecated since 3.1.0 and will be ' + 'removed by version 4.1.3. Use _new_attribute instead'), + level=logging.WARNING + ) def test_get_date(self): # valid ones @@ -271,8 +277,8 @@ def test_truncate_html_words(self): '

' + 'word ' * 20 + 'marker

') self.assertEqual( utils.truncate_html_words( - '' + 'word ' * 100 + '', 20, - 'marker'), + '' + 'word ' * 100 + '', 20, + 'marker'), '' + 'word ' * 20 + 'marker') self.assertEqual( utils.truncate_html_words('
' + 'word ' * 100, 20, @@ -417,10 +423,6 @@ def create_file(name, content): with open(name, 'w') as f: f.write(content) - # disable logger filter - from pelican.utils import logger - logger.disable_filter() - # create a temp "project" dir root = mkdtemp() content_path = os.path.join(root, 'content') @@ -440,39 +442,40 @@ def create_file(name, content): os.utime(config_file, (t, t)) settings = read_settings(config_file) - watcher = utils.FileSystemWatcher(config_file, Readers, settings) - # should get a warning for static not not existing - self.assertLogCountEqual(1, 'Watched path does not exist: .*static') - - # create it and update config - os.mkdir(static_path) - watcher.update_watchers(settings) - # no new warning - self.assertLogCountEqual(1, 'Watched path does not exist: .*static') - - # get modified values - modified = watcher.check() - # empty theme and content should raise warnings - self.assertLogCountEqual(1, 'No valid files found in content') - self.assertLogCountEqual(1, 'Empty theme folder. Using `basic` theme') - - self.assertIsNone(modified['content']) # empty - self.assertIsNone(modified['theme']) # empty - self.assertIsNone(modified['[static]static']) # empty - self.assertTrue(modified['settings']) # modified, first time - - # add a content, add file to theme and check again - create_file(os.path.join(content_path, 'article.md'), - 'Title: test\n' - 'Date: 01-01-2020') - - create_file(os.path.join(theme_path, 'dummy'), - 'test') - - modified = watcher.check() - # no new warning - self.assertLogCountEqual(1, 'No valid files found in content') - self.assertLogCountEqual(1, 'Empty theme folder. Using `basic` theme') + with LogCountHandler.examine(logging.getLogger('pelican')) as count_msgs: + watcher = utils.FileSystemWatcher(config_file, Readers, settings) + # should get a warning for static not not existing + count_msgs(1, 'Watched path does not exist: .*static', as_regex=True) + + # create it and update config + os.mkdir(static_path) + watcher.update_watchers(settings) + # no new warning + count_msgs(1, 'Watched path does not exist: .*static', as_regex=True) + + # get modified values + modified = watcher.check() + # empty theme and content should raise warnings + count_msgs(1, 'No valid files found in content') + count_msgs(1, 'Empty theme folder. Using `basic` theme') + + self.assertIsNone(modified['content']) # empty + self.assertIsNone(modified['theme']) # empty + self.assertIsNone(modified['[static]static']) # empty + self.assertTrue(modified['settings']) # modified, first time + + # add a content, add file to theme and check again + create_file(os.path.join(content_path, 'article.md'), + 'Title: test\n' + 'Date: 01-01-2020') + + create_file(os.path.join(theme_path, 'dummy'), + 'test') + + modified = watcher.check() + # no new warning + count_msgs(1, 'No valid files found in content') + count_msgs(1, 'Empty theme folder. Using `basic` theme') self.assertIsNone(modified['[static]static']) # empty self.assertFalse(modified['settings']) # not modified @@ -495,7 +498,6 @@ def create_file(name, content): self.assertFalse(modified['theme']) # not modified # cleanup - logger.enable_filter() shutil.rmtree(root) def test_clean_output_dir(self):