Skip to content

Commit

Permalink
Merge pull request #41 from hivesolutions/joamag/logstash
Browse files Browse the repository at this point in the history
Logstash handling support
  • Loading branch information
joamag authored Apr 22, 2024
2 parents 7b0e677 + 81b9567 commit da3efb6
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

* Support for `.env` file loading
* LogstashHandler support using `LOGGING_LOGSTASH` and `LOGSTASH_BASE_URL`

### Changed

Expand Down
2 changes: 1 addition & 1 deletion src/netius/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
from .container import Container, ContainerServer
from .errors import NetiusError, RuntimeError, StopError, PauseError, WakeupError,\
DataError, ParserError, GeneratorError, SecurityError, NotImplemented, AssertionError
from .log import SILENT, rotating_handler, smtp_handler
from .log import SILENT, MAX_LENGTH_LOGSTASH, TIMEOUT_LOGSTASH, LogstashHandler, rotating_handler, smtp_handler
from .observer import Observable
from .poll import Poll, EpollPoll, KqueuePoll, PollPoll, SelectPoll
from .protocol import Protocol, DatagramProtocol, StreamProtocol
Expand Down
31 changes: 16 additions & 15 deletions src/netius/base/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ def __init__(self, name = None, handlers = None, *args, **kwargs):
poll = cls.test_poll()
self.name = name or self.__class__.__name__
self.handler_stream = logging.StreamHandler()
self.handlers = handlers or (self.handler_stream,)
self.handler_remote = log.LogstashHandler() if log.LogstashHandler.is_ready() else None
self.handlers = handlers or (self.handler_stream,) + ((self.handler_remote,) if self.handler_remote else ())
self.level = kwargs.get("level", logging.INFO)
self.diag = kwargs.get("diag", False)
self.middleware = kwargs.get("middleware", [])
Expand Down Expand Up @@ -2903,25 +2904,25 @@ def is_critical(self):
if not self.logger: return False
return self.logger.isEnabledFor(logging.CRITICAL)

def debug(self, object):
def debug(self, object, **kwargs):
if not logging: return
self.log(object, level = logging.DEBUG)
self.log(object, level = logging.DEBUG, **kwargs)

def info(self, object):
def info(self, object, **kwargs):
if not logging: return
self.log(object, level = logging.INFO)
self.log(object, level = logging.INFO, **kwargs)

def warning(self, object):
def warning(self, object, **kwargs):
if not logging: return
self.log(object, level = logging.WARNING)
self.log(object, level = logging.WARNING, **kwargs)

def error(self, object):
def error(self, object, **kwargs):
if not logging: return
self.log(object, level = logging.ERROR)
self.log(object, level = logging.ERROR, **kwargs)

def critical(self, object):
def critical(self, object, **kwargs):
if not logging: return
self.log(object, level = logging.CRITICAL)
self.log(object, level = logging.CRITICAL, **kwargs)

def log_stack(self, method = None, info = True):
if not method: method = self.info
Expand All @@ -2938,19 +2939,19 @@ def log(self, *args, **kwargs):
if legacy.PYTHON_3: return self.log_python_3(*args, **kwargs)
else: return self.log_python_2(*args, **kwargs)

def log_python_3(self, object, level = logging.INFO):
def log_python_3(self, object, level = logging.INFO, **kwargs):
is_str = isinstance(object, legacy.STRINGS)
try: message = str(object) if not is_str else object
except Exception: message = str(object)
if not self.logger: return
self.logger.log(level, message)
self.logger.log(level, message, **kwargs)

def log_python_2(self, object, level = logging.INFO):
def log_python_2(self, object, level = logging.INFO, **kwargs):
is_str = isinstance(object, legacy.STRINGS)
try: message = unicode(object) if not is_str else object #@UndefinedVariable
except Exception: message = str(object).decode("utf-8", "ignore")
if not self.logger: return
self.logger.log(level, message)
self.logger.log(level, message, **kwargs)

def build_poll(self):
# retrieves the reference to the parent class associated with
Expand Down
5 changes: 5 additions & 0 deletions src/netius/base/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,11 @@ def info_dict(self, full = False):
)
return info

def log_dict(self, full = False):
info = self.info_dict(full = full)
info["address"] = str(self.address)
return info

def ssl_certificate(self, binary = False):
if not self.ssl: return None
return self.socket.getpeercert(binary_form = binary)
Expand Down
130 changes: 130 additions & 0 deletions src/netius/base/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,145 @@
__license__ = "Apache License, Version 2.0"
""" The license for the module """

import os
import time
import socket
import inspect
import datetime
import threading
import collections

import logging.handlers

from . import config

SILENT = logging.CRITICAL + 1
""" The "artificial" silent level used to silent a logger
or an handler, this is used as an utility for debugging
purposes more that a real feature for production systems """

MAX_LENGTH_LOGSTASH = 64
""" The maximum amount of messages that are kept in
memory until they are flushed, avoid a very large
number for this value or else a large amount of memory
may be used for logging purposes """

TIMEOUT_LOGSTASH = 30.0
""" The maximum amount of time in between flush
operations in the logstash handler """

class LogstashHandler(logging.Handler):

def __init__(
self,
level=logging.NOTSET,
max_length=MAX_LENGTH_LOGSTASH,
timeout=TIMEOUT_LOGSTASH,
api=None,
):
logging.Handler.__init__(self, level=level)
if not api:
api = self._build_api()
self.messages = collections.deque()
self.max_length = max_length
self.timeout = timeout
self.api = api
self._last_flush = time.time()

@classmethod
def is_ready(cls):
try:
import logstash
except ImportError:
return False
if not config.conf("LOGGING_LOGSTASH", False, cast=bool):
return False
return True

def emit(self, record):
from . import common

# verifies if the API structure is defined and set and if
# that's not the case returns immediately
if not self.api:
return

# retrieves the current date time value as an utc value
# and then formats it according to the provided format string
message = self.format(record)

# creates the log record structure that is going to be sent
# to the logstash infra-structure, this should represent a
# proper structure ready to be debugged
now = datetime.datetime.utcnow()
now_s = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

# tries to build the right version of the meta information
# present in the record using either the structure `meta`
# value or the lazy evaluation of the `meta_c` method
if hasattr(record, "meta"):
record.meta = record.meta
elif hasattr(record, "meta_c"):
record.meta = record.meta_c()

log = {
"@timestamp": now_s,
"message_fmt": message,
"logger": record.name,
"message": record.message,
"level": record.levelname,
"path": record.pathname,
"lineno": record.lineno,
"host": socket.gethostname(),
"hostname": socket.gethostname(),
"tid": threading.current_thread().ident,
"pid": os.getpid() if hasattr(os, "getpid") else -1,
"agent": common.NAME,
"version": common.VERSION,
"identifier": common.IDENTIFIER_SHORT,
"identifier_long": common.IDENTIFIER_LONG,
"netius": True
}
if hasattr(record, "meta"):
log["meta"] = record.meta

self.messages.append(log)
should_flush = len(self.messages) >= self.max_length
should_flush = should_flush or time.time() - self._last_flush > self.timeout
if should_flush:
self.flush()

def flush(self, force=False):
logging.Handler.flush(self)

# verifies if the API structure is defined and set and if
# that's not the case returns immediately
if not self.api:
return

# in case the force flag is not set and there are no messages
# to be flushed returns immediately (nothing to be done)
messages = self.messages
if not messages and not force:
return

# posts the complete set of messages to logstash and then clears the messages
# and updates the last flush time
self.api.log_bulk(messages, tag="default")
self.messages = []
self.last_flush = time.time()

def _build_api(self):
try:
import logstash
except ImportError:
return None

if not config.conf("LOGGING_LOGSTASH", False, cast=bool):
return None

return logstash.API()

def rotating_handler(
path = "netius.log",
max_bytes = 1048576,
Expand Down
6 changes: 3 additions & 3 deletions src/netius/servers/smtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def send_smtp_base(self, code, message = "", delay = True, callback = None):
base = "%d %s" % (code, message)
data = base + "\r\n"
count = self.send(data, delay = delay, callback = callback)
self.owner.debug(base)
self.owner.debug(base, extra = dict(meta_c = lambda: self.log_dict()))
return count

def send_smtp_lines(self, code, message = "", lines = (), delay = True, callback = None):
Expand All @@ -149,7 +149,7 @@ def send_smtp_lines(self, code, message = "", lines = (), delay = True, callback
lines_s.append("%d %s" % (code, tail))
data = "\r\n".join(lines_s) + "\r\n"
count = self.send(data, delay = delay, callback = callback)
self.owner.debug(base)
self.owner.debug(base, extra = dict(meta_c = lambda: self.log_dict()))
return count

def ready(self):
Expand Down Expand Up @@ -291,7 +291,7 @@ def on_line(self, code, message, is_final = True):
# "joins" the code and the message part of the message into the base
# string and then uses this value to print some debug information
base = "%s %s" % (code, message)
self.owner.debug(base)
self.owner.debug(base, extra = dict(meta_c = lambda: self.log_dict()))

# calls the proper top level owner based line information handler that
# should ignore any usages as the connection will take care of the proper
Expand Down

0 comments on commit da3efb6

Please sign in to comment.