Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logstash handling support #41

Merged
merged 5 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

* Support for `.env` file loading
* LogstashHandler support using `LOGGING_LOGSTASH` and `LOGSTASH_BASE_URL`

### Changed

Expand Down
2 changes: 1 addition & 1 deletion src/netius/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
from .container import Container, ContainerServer
from .errors import NetiusError, RuntimeError, StopError, PauseError, WakeupError,\
DataError, ParserError, GeneratorError, SecurityError, NotImplemented, AssertionError
from .log import SILENT, rotating_handler, smtp_handler
from .log import SILENT, MAX_LENGTH_LOGSTASH, TIMEOUT_LOGSTASH, LogstashHandler, rotating_handler, smtp_handler
from .observer import Observable
from .poll import Poll, EpollPoll, KqueuePoll, PollPoll, SelectPoll
from .protocol import Protocol, DatagramProtocol, StreamProtocol
Expand Down
31 changes: 16 additions & 15 deletions src/netius/base/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ def __init__(self, name = None, handlers = None, *args, **kwargs):
poll = cls.test_poll()
self.name = name or self.__class__.__name__
self.handler_stream = logging.StreamHandler()
self.handlers = handlers or (self.handler_stream,)
self.handler_remote = log.LogstashHandler() if log.LogstashHandler.is_ready() else None
self.handlers = handlers or (self.handler_stream,) + ((self.handler_remote,) if self.handler_remote else ())
self.level = kwargs.get("level", logging.INFO)
self.diag = kwargs.get("diag", False)
self.middleware = kwargs.get("middleware", [])
Expand Down Expand Up @@ -2903,25 +2904,25 @@ def is_critical(self):
if not self.logger: return False
return self.logger.isEnabledFor(logging.CRITICAL)

def debug(self, object):
def debug(self, object, **kwargs):
if not logging: return
self.log(object, level = logging.DEBUG)
self.log(object, level = logging.DEBUG, **kwargs)

def info(self, object):
def info(self, object, **kwargs):
if not logging: return
self.log(object, level = logging.INFO)
self.log(object, level = logging.INFO, **kwargs)

def warning(self, object):
def warning(self, object, **kwargs):
if not logging: return
self.log(object, level = logging.WARNING)
self.log(object, level = logging.WARNING, **kwargs)

def error(self, object):
def error(self, object, **kwargs):
if not logging: return
self.log(object, level = logging.ERROR)
self.log(object, level = logging.ERROR, **kwargs)

def critical(self, object):
def critical(self, object, **kwargs):
if not logging: return
self.log(object, level = logging.CRITICAL)
self.log(object, level = logging.CRITICAL, **kwargs)

def log_stack(self, method = None, info = True):
if not method: method = self.info
Expand All @@ -2938,19 +2939,19 @@ def log(self, *args, **kwargs):
if legacy.PYTHON_3: return self.log_python_3(*args, **kwargs)
else: return self.log_python_2(*args, **kwargs)

def log_python_3(self, object, level = logging.INFO):
def log_python_3(self, object, level = logging.INFO, **kwargs):
is_str = isinstance(object, legacy.STRINGS)
try: message = str(object) if not is_str else object
except Exception: message = str(object)
if not self.logger: return
self.logger.log(level, message)
self.logger.log(level, message, **kwargs)

def log_python_2(self, object, level = logging.INFO):
def log_python_2(self, object, level = logging.INFO, **kwargs):
is_str = isinstance(object, legacy.STRINGS)
try: message = unicode(object) if not is_str else object #@UndefinedVariable
except Exception: message = str(object).decode("utf-8", "ignore")
if not self.logger: return
self.logger.log(level, message)
self.logger.log(level, message, **kwargs)

def build_poll(self):
# retrieves the reference to the parent class associated with
Expand Down
5 changes: 5 additions & 0 deletions src/netius/base/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,11 @@ def info_dict(self, full = False):
)
return info

def log_dict(self, full = False):
info = self.info_dict(full = full)
info["address"] = str(self.address)
return info

def ssl_certificate(self, binary = False):
if not self.ssl: return None
return self.socket.getpeercert(binary_form = binary)
Expand Down
130 changes: 130 additions & 0 deletions src/netius/base/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,145 @@
__license__ = "Apache License, Version 2.0"
""" The license for the module """

import os
import time
import socket
import inspect
import datetime
import threading
import collections

import logging.handlers

from . import config

SILENT = logging.CRITICAL + 1
""" The "artificial" silent level used to silent a logger
or an handler, this is used as an utility for debugging
purposes more that a real feature for production systems """

MAX_LENGTH_LOGSTASH = 64
""" The maximum amount of messages that are kept in
memory until they are flushed, avoid a very large
number for this value or else a large amount of memory
may be used for logging purposes """

TIMEOUT_LOGSTASH = 30.0
""" The maximum amount of time in between flush
operations in the logstash handler """

class LogstashHandler(logging.Handler):

def __init__(
self,
level=logging.NOTSET,
max_length=MAX_LENGTH_LOGSTASH,
timeout=TIMEOUT_LOGSTASH,
api=None,
):
logging.Handler.__init__(self, level=level)
if not api:
api = self._build_api()
self.messages = collections.deque()
self.max_length = max_length
self.timeout = timeout
self.api = api
self._last_flush = time.time()

@classmethod
def is_ready(cls):
try:
import logstash
except ImportError:
return False
if not config.conf("LOGGING_LOGSTASH", False, cast=bool):
return False
return True

def emit(self, record):
from . import common

# verifies if the API structure is defined and set and if
# that's not the case returns immediately
if not self.api:
return

# retrieves the current date time value as an utc value
# and then formats it according to the provided format string
message = self.format(record)

# creates the log record structure that is going to be sent
# to the logstash infra-structure, this should represent a
# proper structure ready to be debugged
now = datetime.datetime.utcnow()
now_s = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

# tries to build the right version of the meta information
# present in the record using either the structure `meta`
# value or the lazy evaluation of the `meta_c` method
if hasattr(record, "meta"):
record.meta = record.meta
elif hasattr(record, "meta_c"):
record.meta = record.meta_c()

log = {
"@timestamp": now_s,
"message_fmt": message,
"logger": record.name,
"message": record.message,
"level": record.levelname,
"path": record.pathname,
"lineno": record.lineno,
"host": socket.gethostname(),
"hostname": socket.gethostname(),
"tid": threading.current_thread().ident,
"pid": os.getpid() if hasattr(os, "getpid") else -1,
"agent": common.NAME,
"version": common.VERSION,
"identifier": common.IDENTIFIER_SHORT,
"identifier_long": common.IDENTIFIER_LONG,
"netius": True
}
if hasattr(record, "meta"):
log["meta"] = record.meta

self.messages.append(log)
should_flush = len(self.messages) >= self.max_length
should_flush = should_flush or time.time() - self._last_flush > self.timeout
if should_flush:
self.flush()

def flush(self, force=False):
logging.Handler.flush(self)

# verifies if the API structure is defined and set and if
# that's not the case returns immediately
if not self.api:
return

# in case the force flag is not set and there are no messages
# to be flushed returns immediately (nothing to be done)
messages = self.messages
if not messages and not force:
return

# posts the complete set of messages to logstash and then clears the messages
# and updates the last flush time
self.api.log_bulk(messages, tag="default")
self.messages = []
self.last_flush = time.time()

def _build_api(self):
try:
import logstash
except ImportError:
return None

if not config.conf("LOGGING_LOGSTASH", False, cast=bool):
return None

return logstash.API()

def rotating_handler(
path = "netius.log",
max_bytes = 1048576,
Expand Down
6 changes: 3 additions & 3 deletions src/netius/servers/smtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def send_smtp_base(self, code, message = "", delay = True, callback = None):
base = "%d %s" % (code, message)
data = base + "\r\n"
count = self.send(data, delay = delay, callback = callback)
self.owner.debug(base)
self.owner.debug(base, extra = dict(meta_c = lambda: self.log_dict()))
return count

def send_smtp_lines(self, code, message = "", lines = (), delay = True, callback = None):
Expand All @@ -149,7 +149,7 @@ def send_smtp_lines(self, code, message = "", lines = (), delay = True, callback
lines_s.append("%d %s" % (code, tail))
data = "\r\n".join(lines_s) + "\r\n"
count = self.send(data, delay = delay, callback = callback)
self.owner.debug(base)
self.owner.debug(base, extra = dict(meta_c = lambda: self.log_dict()))
return count

def ready(self):
Expand Down Expand Up @@ -291,7 +291,7 @@ def on_line(self, code, message, is_final = True):
# "joins" the code and the message part of the message into the base
# string and then uses this value to print some debug information
base = "%s %s" % (code, message)
self.owner.debug(base)
self.owner.debug(base, extra = dict(meta_c = lambda: self.log_dict()))

# calls the proper top level owner based line information handler that
# should ignore any usages as the connection will take care of the proper
Expand Down