diff --git a/CHANGELOG.md b/CHANGELOG.md index d92a0677..9524f060 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added * Support for `.env` file loading +* LogstashHandler support using `LOGGING_LOGSTASH` and `LOGSTASH_BASE_URL` ### Changed diff --git a/src/netius/base/__init__.py b/src/netius/base/__init__.py index a2291fc4..17f39480 100644 --- a/src/netius/base/__init__.py +++ b/src/netius/base/__init__.py @@ -73,7 +73,7 @@ from .container import Container, ContainerServer from .errors import NetiusError, RuntimeError, StopError, PauseError, WakeupError,\ DataError, ParserError, GeneratorError, SecurityError, NotImplemented, AssertionError -from .log import SILENT, rotating_handler, smtp_handler +from .log import SILENT, MAX_LENGTH_LOGSTASH, TIMEOUT_LOGSTASH, LogstashHandler, rotating_handler, smtp_handler from .observer import Observable from .poll import Poll, EpollPoll, KqueuePoll, PollPoll, SelectPoll from .protocol import Protocol, DatagramProtocol, StreamProtocol diff --git a/src/netius/base/common.py b/src/netius/base/common.py index 216cba57..4eb1b708 100644 --- a/src/netius/base/common.py +++ b/src/netius/base/common.py @@ -323,7 +323,8 @@ def __init__(self, name = None, handlers = None, *args, **kwargs): poll = cls.test_poll() self.name = name or self.__class__.__name__ self.handler_stream = logging.StreamHandler() - self.handlers = handlers or (self.handler_stream,) + self.handler_remote = log.LogstashHandler() if log.LogstashHandler.is_ready() else None + self.handlers = handlers or (self.handler_stream,) + ((self.handler_remote,) if self.handler_remote else ()) self.level = kwargs.get("level", logging.INFO) self.diag = kwargs.get("diag", False) self.middleware = kwargs.get("middleware", []) @@ -2903,25 +2904,25 @@ def is_critical(self): if not self.logger: return False return self.logger.isEnabledFor(logging.CRITICAL) - def debug(self, object): + def debug(self, object, **kwargs): if not logging: return - self.log(object, level = logging.DEBUG) + self.log(object, level = logging.DEBUG, **kwargs) - def info(self, object): + def info(self, object, **kwargs): if not logging: return - self.log(object, level = logging.INFO) + self.log(object, level = logging.INFO, **kwargs) - def warning(self, object): + def warning(self, object, **kwargs): if not logging: return - self.log(object, level = logging.WARNING) + self.log(object, level = logging.WARNING, **kwargs) - def error(self, object): + def error(self, object, **kwargs): if not logging: return - self.log(object, level = logging.ERROR) + self.log(object, level = logging.ERROR, **kwargs) - def critical(self, object): + def critical(self, object, **kwargs): if not logging: return - self.log(object, level = logging.CRITICAL) + self.log(object, level = logging.CRITICAL, **kwargs) def log_stack(self, method = None, info = True): if not method: method = self.info @@ -2938,19 +2939,19 @@ def log(self, *args, **kwargs): if legacy.PYTHON_3: return self.log_python_3(*args, **kwargs) else: return self.log_python_2(*args, **kwargs) - def log_python_3(self, object, level = logging.INFO): + def log_python_3(self, object, level = logging.INFO, **kwargs): is_str = isinstance(object, legacy.STRINGS) try: message = str(object) if not is_str else object except Exception: message = str(object) if not self.logger: return - self.logger.log(level, message) + self.logger.log(level, message, **kwargs) - def log_python_2(self, object, level = logging.INFO): + def log_python_2(self, object, level = logging.INFO, **kwargs): is_str = isinstance(object, legacy.STRINGS) try: message = unicode(object) if not is_str else object #@UndefinedVariable except Exception: message = str(object).decode("utf-8", "ignore") if not self.logger: return - self.logger.log(level, message) + self.logger.log(level, message, **kwargs) def build_poll(self): # retrieves the reference to the parent class associated with diff --git a/src/netius/base/conn.py b/src/netius/base/conn.py index 7a6ea2bd..60ad7427 100644 --- a/src/netius/base/conn.py +++ b/src/netius/base/conn.py @@ -633,6 +633,11 @@ def info_dict(self, full = False): ) return info + def log_dict(self, full = False): + info = self.info_dict(full = full) + info["address"] = str(self.address) + return info + def ssl_certificate(self, binary = False): if not self.ssl: return None return self.socket.getpeercert(binary_form = binary) diff --git a/src/netius/base/log.py b/src/netius/base/log.py index b1c67f71..c5e09c2e 100644 --- a/src/netius/base/log.py +++ b/src/netius/base/log.py @@ -37,15 +37,145 @@ __license__ = "Apache License, Version 2.0" """ The license for the module """ +import os +import time +import socket import inspect +import datetime +import threading +import collections import logging.handlers +from . import config + SILENT = logging.CRITICAL + 1 """ The "artificial" silent level used to silent a logger or an handler, this is used as an utility for debugging purposes more that a real feature for production systems """ +MAX_LENGTH_LOGSTASH = 64 +""" The maximum amount of messages that are kept in +memory until they are flushed, avoid a very large +number for this value or else a large amount of memory +may be used for logging purposes """ + +TIMEOUT_LOGSTASH = 30.0 +""" The maximum amount of time in between flush +operations in the logstash handler """ + +class LogstashHandler(logging.Handler): + + def __init__( + self, + level=logging.NOTSET, + max_length=MAX_LENGTH_LOGSTASH, + timeout=TIMEOUT_LOGSTASH, + api=None, + ): + logging.Handler.__init__(self, level=level) + if not api: + api = self._build_api() + self.messages = collections.deque() + self.max_length = max_length + self.timeout = timeout + self.api = api + self._last_flush = time.time() + + @classmethod + def is_ready(cls): + try: + import logstash + except ImportError: + return False + if not config.conf("LOGGING_LOGSTASH", False, cast=bool): + return False + return True + + def emit(self, record): + from . import common + + # verifies if the API structure is defined and set and if + # that's not the case returns immediately + if not self.api: + return + + # retrieves the current date time value as an utc value + # and then formats it according to the provided format string + message = self.format(record) + + # creates the log record structure that is going to be sent + # to the logstash infra-structure, this should represent a + # proper structure ready to be debugged + now = datetime.datetime.utcnow() + now_s = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + # tries to build the right version of the meta information + # present in the record using either the structure `meta` + # value or the lazy evaluation of the `meta_c` method + if hasattr(record, "meta"): + record.meta = record.meta + elif hasattr(record, "meta_c"): + record.meta = record.meta_c() + + log = { + "@timestamp": now_s, + "message_fmt": message, + "logger": record.name, + "message": record.message, + "level": record.levelname, + "path": record.pathname, + "lineno": record.lineno, + "host": socket.gethostname(), + "hostname": socket.gethostname(), + "tid": threading.current_thread().ident, + "pid": os.getpid() if hasattr(os, "getpid") else -1, + "agent": common.NAME, + "version": common.VERSION, + "identifier": common.IDENTIFIER_SHORT, + "identifier_long": common.IDENTIFIER_LONG, + "netius": True + } + if hasattr(record, "meta"): + log["meta"] = record.meta + + self.messages.append(log) + should_flush = len(self.messages) >= self.max_length + should_flush = should_flush or time.time() - self._last_flush > self.timeout + if should_flush: + self.flush() + + def flush(self, force=False): + logging.Handler.flush(self) + + # verifies if the API structure is defined and set and if + # that's not the case returns immediately + if not self.api: + return + + # in case the force flag is not set and there are no messages + # to be flushed returns immediately (nothing to be done) + messages = self.messages + if not messages and not force: + return + + # posts the complete set of messages to logstash and then clears the messages + # and updates the last flush time + self.api.log_bulk(messages, tag="default") + self.messages = [] + self.last_flush = time.time() + + def _build_api(self): + try: + import logstash + except ImportError: + return None + + if not config.conf("LOGGING_LOGSTASH", False, cast=bool): + return None + + return logstash.API() + def rotating_handler( path = "netius.log", max_bytes = 1048576, diff --git a/src/netius/servers/smtp.py b/src/netius/servers/smtp.py index d0d197bc..81650256 100644 --- a/src/netius/servers/smtp.py +++ b/src/netius/servers/smtp.py @@ -136,7 +136,7 @@ def send_smtp_base(self, code, message = "", delay = True, callback = None): base = "%d %s" % (code, message) data = base + "\r\n" count = self.send(data, delay = delay, callback = callback) - self.owner.debug(base) + self.owner.debug(base, extra = dict(meta_c = lambda: self.log_dict())) return count def send_smtp_lines(self, code, message = "", lines = (), delay = True, callback = None): @@ -149,7 +149,7 @@ def send_smtp_lines(self, code, message = "", lines = (), delay = True, callback lines_s.append("%d %s" % (code, tail)) data = "\r\n".join(lines_s) + "\r\n" count = self.send(data, delay = delay, callback = callback) - self.owner.debug(base) + self.owner.debug(base, extra = dict(meta_c = lambda: self.log_dict())) return count def ready(self): @@ -291,7 +291,7 @@ def on_line(self, code, message, is_final = True): # "joins" the code and the message part of the message into the base # string and then uses this value to print some debug information base = "%s %s" % (code, message) - self.owner.debug(base) + self.owner.debug(base, extra = dict(meta_c = lambda: self.log_dict())) # calls the proper top level owner based line information handler that # should ignore any usages as the connection will take care of the proper