diff --git a/.github/workflows/unittests.yml b/.github/workflows/unittests.yml index 5a740bc85..aed0642e0 100644 --- a/.github/workflows/unittests.yml +++ b/.github/workflows/unittests.yml @@ -6,7 +6,6 @@ name: "Unit tests" on: push: - branches: [develop, maintenance, master] pull_request: branches: [develop, maintenance] paths-ignore: diff --git a/CHANGELOG.md b/CHANGELOG.md index 160102216..87716a454 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ CHANGELOG - `intelmq.lib.harmonization`: - Changes signature and names of `DateTime` conversion functions for consistency, backwards compatible (PR#2329 by Filip Pokorný). - Ensure rejecting URLs with leading whitespaces after changes in CPython (fixes [#2377](https://github.com/certtools/intelmq/issues/2377)) +- `intelmq.lib.bot.Bot`: Allow setting the parameters via parameter on bot initialization. ### Development - CI: pin the Codespell version to omit troubles caused by its new releases (PR #2379). diff --git a/debian/control b/debian/control index f77640cf6..4e71ade05 100644 --- a/debian/control +++ b/debian/control @@ -22,7 +22,8 @@ Build-Depends: debhelper (>= 4.1.16), python3-tz, quilt, rsync, - safe-rm + safe-rm, + python3-pytest-cov X-Python3-Version: >= 3.7 Standards-Version: 3.9.6 Homepage: https://github.com/certtools/intelmq/ diff --git a/docs/dev/library.rst b/docs/dev/library.rst new file mode 100644 index 000000000..8553e79f8 --- /dev/null +++ b/docs/dev/library.rst @@ -0,0 +1,74 @@ +.. + SPDX-FileCopyrightText: 2023 Bundesamt für Sicherheit in der Informationstechnik (BSI) + SPDX-License-Identifier: AGPL-3.0-or-later + +########################## +Running IntelMQ as Library +########################## + +.. contents:: + +************ +Introduction +************ + +The feature is specified in `IEP007 `_. + +********** +Quickstart +********** + +First, import the Python module and a helper. More about the ``BotLibSettings`` later. + +.. 
code-block:: python + + from intelmq.lib.bot import BotLibSettings + from intelmq.bots.experts.domain_suffix.expert import DomainSuffixExpertBot + +Then we need to initialize the bot's instance. +We pass two parameters: +* ``bot_id``: The id of the bot +* ``settings``: A Python dictionary of runtime configuration parameters, see :ref:`runtime-configuration`. + The bot first loads the runtime configuration file if it exists. + Then we update them with the ``BotLibSettings`` which are some accumulated settings disabling the logging to files and configure the pipeline so that we can send and receive messages directly to/from the bot. + Last but not least, the actual bot parameters, taking the highest priority. + +.. code-block:: python + + domain_suffix = DomainSuffixExpertBot('domain-suffix', # bot id + settings=BotLibSettings | { + 'field': 'fqdn', + 'suffix_file': '/usr/share/publicsuffix/public_suffix_list.dat'}) + +As the bot is now fully initialized, we can process messages. +Inserting a message as dictionary: + +.. code-block:: python + + queues = domain_suffix.process_message({'source.fqdn': 'www.example.com'}) + +The return value is a dictionary of queues, e.g. the output queue and the error queue. +More details below. + +The method accepts multiple messages as positional arguments: + +.. code-block:: python + + domain_suffix.process_message({'source.fqdn': 'www.example.com'}, {'source.fqdn': 'www.example.net'}) + domain_suffix.process_message(*[{'source.fqdn': 'www.example.com'}, {'source.fqdn': 'www.example.net'}]) + + +Select the output queue (as defined in `destination_queues`), first message, access the field 'source.domain_suffix': + +.. code-block:: python + + >>> queues['output'][0]['source.domain_suffix'] + 'com' + +************* +Configuration +************* + +Configuration files are not required to run IntelMQ as library. 
+Contrary to IntelMQ normal behavior, if the files ``runtime.yaml`` and ``harmonization.conf`` do not exist, IntelMQ won't raise any errors. +For the harmonization configuration, internal defaults are loaded. diff --git a/docs/index.rst b/docs/index.rst index 3990b4592..1fbf7aafe 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -74,6 +74,7 @@ Getting involved :maxdepth: 1 dev/guide + dev/library dev/data-format dev/harmonization-fields dev/release-procedure diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 7b4085b74..5ad559050 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -27,8 +27,10 @@ import types import warnings from collections import defaultdict +from copy import deepcopy from datetime import datetime, timedelta from typing import Any, List, Optional, Union, Tuple +from pkg_resources import resource_filename import intelmq.lib.message as libmessage from intelmq import (DEFAULT_LOGGING_PATH, @@ -38,7 +40,7 @@ from intelmq.lib import cache, exceptions, utils from intelmq.lib.pipeline import PipelineFactory, Pipeline from intelmq.lib.utils import RewindableFileHandle, base64_decode -from intelmq.lib.datatypes import BotType +from intelmq.lib.datatypes import BotType, Dict39 __all__ = ['Bot', 'CollectorBot', 'ParserBot', 'OutputBot', 'ExpertBot'] ALLOWED_SYSTEM_PARAMETERS = {'enabled', 'run_mode', 'group', 'description', 'module', 'name'} @@ -55,6 +57,14 @@ class Bot: __source_pipeline = None __destination_pipeline = None __log_buffer: List[tuple] = [] + # runtime_file + __runtime_settings: Optional[dict] = None + # settings provided via parameter + __settings: Optional[dict] = None + # if messages should be serialized/unserialized coming from/sending to the pipeline + __pipeline_serialize_messages = True + # if the Bot is running by itself or called by other procedures + _standalone: bool = False logger = None # Bot is capable of SIGHUP delaying @@ -120,7 +130,8 @@ class Bot: _harmonization: dict = {} def __init__(self, bot_id: str, 
start: bool = False, sighup_event=None, - disable_multithreading: bool = None): + disable_multithreading: bool = None, settings: Optional[dict] = None, + source_queue: Optional[str] = None, standalone: bool = False): self.__log_buffer: list = [] @@ -128,6 +139,12 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, self.__source_pipeline: Optional[Pipeline] = None self.__destination_pipeline: Optional[Pipeline] = None self.logger = None + if settings is not None: + # make a copy of the settings dict, to not modify the original values of the caller + self.__settings = deepcopy(settings) + if source_queue is not None: + self.source_queue = source_queue + self._standalone = standalone self.__message_counter = {"since": 0, # messages since last logging "start": None, # last login time @@ -140,24 +157,25 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, try: version_info = sys.version.splitlines()[0].strip() - self.__log_buffer.append(('info', - '{bot} initialized with id {id} and intelmq {intelmq}' - ' and python {python} as process {pid}.' - ''.format(bot=self.__class__.__name__, - id=bot_id, python=version_info, - pid=os.getpid(), intelmq=__version__))) - self.__log_buffer.append(('debug', 'Library path: %r.' % __file__)) - if not utils.drop_privileges(): + self.__log('info', + f'{self.__class__.__name__} initialized with id {bot_id} and intelmq {__version__}' + f' and python {version_info} as process {os.getpid()}.') + self.__log('debug', f'Library path: {__file__!r}.') + + # in standalone mode, drop privileges + # In library mode, the calling user can vary, we must not change their user + if self._standalone and not utils.drop_privileges(): raise ValueError('IntelMQ must not run as root. 
Dropping privileges did not work.') - self.__load_defaults_configuration() - self.__bot_id_full, self.__bot_id, self.__instance_id = self.__check_bot_id(bot_id) + + self.__load_configuration() + if self.__instance_id: self.is_multithreaded = True self.__init_logger() except Exception: - self.__log_buffer.append(('critical', traceback.format_exc())) + self.__log('critical', traceback.format_exc()) self.stop() else: for line in self.__log_buffer: @@ -165,11 +183,13 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, try: self.logger.info('Bot is starting.') - self.__load_runtime_configuration() broker = self.source_pipeline_broker.title() if broker != 'Amqp': self._is_multithreadable = False + # multithreading is not (yet) available in library-mode + elif not self._standalone: + self._is_multithreadable = False """ Multithreading """ if (self.instances_threads > 1 and not self.is_multithreaded and @@ -220,7 +240,9 @@ def handle_sighup_signal_threading(signum: int, self.__reset_total_path_stats() self.init() - if not self.__instance_id: + # only the main thread registers the signal handlers + # in library-mode, handle no signals to not interfere with the caller + if not self.__instance_id and self._standalone: self.__sighup = threading.Event() signal.signal(signal.SIGHUP, self.__handle_sighup_signal) # system calls should not be interrupted, but restarted @@ -278,7 +300,7 @@ def __handle_sighup(self): """ Handle SIGHUP. 
""" - if not self.__sighup.is_set(): + if not self.__sighup or not self.__sighup.is_set(): return False self.logger.info('Handling SIGHUP, initializing again now.') self.__disconnect_pipelines() @@ -525,6 +547,9 @@ def __sleep(self, remaining: Optional[float] = None, log: bool = True): self.__handle_sighup() remaining = self.rate_limit - (time.time() - starttime) + def __del__(self): + return self.stop(exitcode=0) + def stop(self, exitcode: int = 1): if not self.logger: print('Could not initialize logger, only logging to stdout.') @@ -552,12 +577,16 @@ def stop(self, exitcode: int = 1): self.logger.info("Bot stopped.") logging.shutdown() else: - self.__log_buffer.append(('info', 'Bot stopped.')) + self.__log('info', 'Bot stopped.') self.__print_log_buffer() - if not getattr(self, 'testing', False): + if not getattr(self, 'testing', False) and self._standalone: sys.exit(exitcode) + # in library-mode raise an error if e.g. initialization failed + if exitcode != 0 and not self._standalone and not getattr(self, 'testing', False): + raise ValueError('Bot shutdown. 
See error messages in logs for details.') + def __print_log_buffer(self): for level, message in self.__log_buffer: if self.logger: @@ -573,16 +602,16 @@ def __check_bot_id(self, name: str) -> Tuple[str, str, str]: if res: if not (res.group(2) and threading.current_thread() == threading.main_thread()): return name, res.group(1), res.group(2)[1:] if res.group(2) else None - self.__log_buffer.append(('error', - "Invalid bot id, must match '" - r"[^0-9a-zA-Z\-]+'.")) + self.__log('error', + "Invalid bot id, must match '" + r"[^0-9a-zA-Z\-]+'.") self.stop() return False, False, False def __connect_pipelines(self): pipeline_args = {key: getattr(self, key) for key in dir(self) if not inspect.ismethod(getattr(self, key)) and (key.startswith('source_pipeline_') or key.startswith('destination_pipeline'))} if self.source_queue is not None: - self.logger.debug("Loading source pipeline and queue %r.", self.source_queue) + self.logger.info("Loading source pipeline and queue %r.", self.source_queue) self.__source_pipeline = PipelineFactory.create(logger=self.logger, direction="source", queues=self.source_queue, @@ -592,10 +621,10 @@ def __connect_pipelines(self): self.__source_pipeline.connect() self.__current_message = None - self.logger.debug("Connected to source queue.") + self.logger.info("Connected to source queue.") if self.destination_queues: - self.logger.debug("Loading destination pipeline and queues %r.", self.destination_queues) + self.logger.info("Loading destination pipeline and queues %r.", self.destination_queues) self.__destination_pipeline = PipelineFactory.create(logger=self.logger, direction="destination", queues=self.destination_queues, @@ -604,9 +633,9 @@ def __connect_pipelines(self): is_multithreaded=self.is_multithreaded) self.__destination_pipeline.connect() - self.logger.debug("Connected to destination queues.") + self.logger.info("Connected to destination queues.") else: - self.logger.debug("No destination queues to load.") + self.logger.info("No 
destination queues to load.") def __disconnect_pipelines(self): """ Disconnecting pipelines. """ @@ -637,6 +666,8 @@ def send_message(self, *messages, path: str = "_default", auto_add=None, 'but needed') self.logger.debug("Sending message to path %r.", path) + + # Message counter start self.__message_counter["since"] += 1 self.__message_counter["path"][path] += 1 if not self.__message_counter["start"]: @@ -648,10 +679,16 @@ def send_message(self, *messages, path: str = "_default", auto_add=None, self.__message_counter["since"]) self.__message_counter["since"] = 0 self.__message_counter["start"] = datetime.now() + # Message counter end - raw_message = libmessage.MessageFactory.serialize(message) - self.__destination_pipeline.send(raw_message, path=path, - path_permissive=path_permissive) + if self.__pipeline_serialize_messages: + raw_message = libmessage.MessageFactory.serialize(message) + self.__destination_pipeline.send(raw_message, path=path, + path_permissive=path_permissive) + else: + print(f'send_message, message: {message!r}') + self.__destination_pipeline.send(message, path=path, + path_permissive=path_permissive) def receive_message(self) -> libmessage.Message: """ @@ -661,7 +698,7 @@ def receive_message(self) -> libmessage.Message: will be rejected to the pipeline in the first place to get to a clean state. Then, after reloading, the message will be retrieved again. 
""" - if self.__current_message: + if self.__current_message is not None: self.logger.debug("Reusing existing current message as incoming.") return self.__current_message @@ -677,14 +714,18 @@ def receive_message(self) -> libmessage.Message: # * handle a sighup which happened during blocking read # * re-queue the message before reloading # https://github.com/certtools/intelmq/issues/1438 - if self.__sighup.is_set(): + if self.__sighup and self.__sighup.is_set(): self.__source_pipeline.reject_message() self.__handle_sighup() return self.receive_message() try: - self.__current_message = libmessage.MessageFactory.unserialize(message, - harmonization=self.harmonization) + if self.__pipeline_serialize_messages: + self.__current_message = libmessage.MessageFactory.unserialize(message, + harmonization=self.harmonization) + else: + self.__current_message = message + except exceptions.InvalidKey as exc: # In case a incoming message is malformed an does not conform with the currently # loaded harmonization, stop now as this will happen repeatedly without any change @@ -765,39 +806,46 @@ def _dump_message(self, error_traceback, message: dict): self.logger.debug('Message dumped.') - def __load_defaults_configuration(self): - config = utils.get_global_settings() + def __load_configuration(self): + self.__log('debug', "Loading runtime configuration from %r.", RUNTIME_CONF_FILE) + if not self.__runtime_settings: + try: + self.__runtime_settings = utils.get_runtime() + except ValueError: + if not self._standalone: + self.__log('info', 'Could not load runtime configuration file. 
' + 'Continuing, as we in library-mode.') + self.__runtime_settings = {} + else: + raise - setattr(self, 'logging_path', DEFAULT_LOGGING_PATH) + # merge in configuration provided as parameter to init + if self.__settings: + if self.__bot_id not in self.__runtime_settings: + self.__runtime_settings[self.__bot_id] = {} + if 'parameters' not in self.__runtime_settings[self.__bot_id]: + self.__runtime_settings[self.__bot_id]['parameters'] = {} + self.__runtime_settings[self.__bot_id]['parameters'].update(self.__settings) - for option, value in config.items(): + for option, value in self.__runtime_settings.get('global', {}).items(): setattr(self, option, value) self.__log_configuration_parameter("defaults", option, value) - self.__log_processed_messages_seconds = timedelta(seconds=self.log_processed_messages_seconds) - - def __load_runtime_configuration(self): - self.logger.debug("Loading runtime configuration from %r.", RUNTIME_CONF_FILE) - config = utils.load_configuration(RUNTIME_CONF_FILE) - reinitialize_logging = False - - if self.__bot_id in config: - params = config[self.__bot_id] + if self.__bot_id in self.__runtime_settings: + params = self.__runtime_settings[self.__bot_id] for key, value in params.items(): if key in ALLOWED_SYSTEM_PARAMETERS and value: self.__log_configuration_parameter("system", key, value) setattr(self, key, value) elif key not in IGNORED_SYSTEM_PARAMETERS: - self.logger.warning('Ignoring disallowed system parameter %r.', - key) + self.__log('warning', 'Ignoring disallowed system parameter %r.', + key) for option, value in params.get('parameters', {}).items(): setattr(self, option, value) self.__log_configuration_parameter("runtime", option, value) - if option.startswith('logging_'): - reinitialize_logging = True else: - self.logger.warning('Bot ID %r not found in runtime configuration - could not load any parameters.', - self.__bot_id) + self.__log('warning', 'Bot ID %r not found in runtime configuration - could not load any parameters.', + 
self.__bot_id) intelmq_environment = [elem for elem in os.environ if elem.startswith('INTELMQ_')] for elem in intelmq_environment: @@ -814,12 +862,7 @@ def __load_runtime_configuration(self): setattr(self, option, value) self.__log_configuration_parameter("environment", option, value) - if option.startswith('logging_'): - reinitialize_logging = True - - if reinitialize_logging: - self.logger.handlers = [] # remove all existing handlers - self.__init_logger() + self.__log_processed_messages_seconds = timedelta(seconds=self.log_processed_messages_seconds) # The default source_queue should be "{bot-id}-queue", # but this can be overridden @@ -840,21 +883,35 @@ def __init_logger(self): log_max_size=getattr(self, "logging_max_size", 0), log_max_copies=getattr(self, "logging_max_copies", None)) + def __log(self, level, message, *args, **kwargs): + """ + If the logger is already initialized, redirect to the logger, + otherwise write the message to the log buffer + """ + if self.logger: + getattr(self.logger, level)(message, *args, **kwargs) + else: + # we can't process **kwargs here, but not needed at this stage + # if the message contains '%Y' or similar (e.g. a formatted `http_url`) but not args for formatting, no formatting should be done. if we did it, a wrong 'TypeError: not enough arguments for format string' would be thrown + self.__log_buffer.append((level, message % args if args else message)) + def __log_configuration_parameter(self, config_name: str, option: str, value: Any): if "password" in option or "token" in option: value = "HIDDEN" - message = "{} configuration: parameter {!r} loaded with value {!r}." \ - .format(config_name.title(), option, value) + message = f"{config_name.title()} configuration: parameter {option!r} loaded with value {value!r}." 
- if self.logger: - self.logger.debug(message) - else: - self.__log_buffer.append(("debug", message)) + self.__log('debug', message) def __load_harmonization_configuration(self): self.logger.debug("Loading Harmonization configuration from %r.", HARMONIZATION_CONF_FILE) - self._harmonization = utils.load_configuration(HARMONIZATION_CONF_FILE) + try: + self._harmonization = utils.load_configuration(HARMONIZATION_CONF_FILE) + except ValueError: + if self._standalone: + raise + else: + self._harmonization = utils.load_configuration(resource_filename('intelmq', 'etc/harmonization.conf')) def new_event(self, *args, **kwargs): return libmessage.Event(*args, harmonization=self.harmonization, **kwargs) @@ -868,7 +925,7 @@ def run(cls, parsed_args=None): if not parsed_args.bot_id: sys.exit('No bot ID given.') - instance = cls(parsed_args.bot_id) + instance = cls(parsed_args.bot_id, standalone=True) if not instance.is_multithreaded: instance.start() @@ -956,6 +1013,50 @@ def _create_argparser(cls): argparser.add_argument('bot_id', nargs='?', metavar='BOT-ID', help='unique bot-id of your choosing') return argparser + def process_message(self, *messages: Union[libmessage.Message, dict]): + """ + Call the bot's process method with a prepared source queue. + Return value is a dict with the complete pipeline state. + Multiple messages can be given as positional argument. + The pipeline needs to be configured accordingly with BotLibSettings, + see https://intelmq.readthedocs.io/en/develop/dev/library.html + + Access the output queue e.g. 
with return_value['output'] + """ + if self.bottype == BotType.COLLECTOR: + if messages: + raise exceptions.InvalidArgument('Collector Bots take no messages as processing input') + else: + # reset source queue + self.__source_pipeline.state[self.source_queue] = [] + # reset internal queue + if self.__source_pipeline._has_message: + self.__source_pipeline.acknowledge() + self.__current_message = None + + for message in messages: + # convert to Message objects, if the message is a dict + # use an appropriate default message type, not requiring __type keys in the message + if not isinstance(message, libmessage.Message) and isinstance(message, dict): + message = libmessage.MessageFactory.from_dict(message=message, + harmonization=self.harmonization, + default_type=self._default_message_type) + self.__source_pipeline.state[self.source_queue].append(message) + # do not dump to file + self.error_dump_message = False + # do not serialize messages to strings, keep the objects + self.__pipeline_serialize_messages = False + + # process all input messages + while self.__source_pipeline.state[self.source_queue]: + self.process() + + # make a copy of the destination state for returning, then clear the queues + state = self.__destination_pipeline.state.copy() + self.__destination_pipeline.clear_all_queues() + + return state + class ParserBot(Bot): bottype = BotType.PARSER @@ -964,6 +1065,7 @@ class ParserBot(Bot): _handle = None _current_line: Optional[str] = None _line_ending = '\r\n' + _default_message_type = 'Report' default_fields: Optional[dict] = {} @@ -1320,6 +1422,7 @@ class ExpertBot(Bot): Base class for expert bots. """ bottype = BotType.EXPERT + _default_message_type = 'Event' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -1330,6 +1433,7 @@ class OutputBot(Bot): Base class for outputs. 
""" bottype = BotType.OUTPUT + _default_message_type = 'Event' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -1400,3 +1504,10 @@ def export_event(self, event: libmessage.Event, class Parameters: pass + + +BotLibSettings = Dict39({'logging_path': None, + 'source_pipeline_broker': 'Pythonlistsimple', + 'destination_pipeline_broker': 'Pythonlistsimple', + 'destination_queues': {'_default': 'output', + '_on_error': 'error'}}) diff --git a/intelmq/lib/bot_debugger.py b/intelmq/lib/bot_debugger.py index 0afeaa5c1..4853bebf4 100644 --- a/intelmq/lib/bot_debugger.py +++ b/intelmq/lib/bot_debugger.py @@ -65,7 +65,9 @@ def __init__(self, runtime_configuration, bot_id, run_subcommand=None, console_t # Set's the bot's default and initial value for the logging_level to the value we want bot.logging_level = self.logging_level - self.instance = bot(bot_id, disable_multithreading=True) + self.instance = bot(bot_id, disable_multithreading=True, + standalone=True, # instruct the bot to call SystemExit exception at the end or in case of errors + ) def run(self) -> str: if not self.run_subcommand: @@ -217,3 +219,9 @@ def messageWizzard(self, msg): def pprint(msg) -> str: """ We can't use standard pprint as JSON standard asks for double quotes. 
""" return json.dumps(msg, indent=4, sort_keys=True) + + def __del__(self): + # prevents a SystemExit Exception at object deletion + # remove once PR#2358 (library mode) is merged + if self.instance: + setattr(self.instance, 'testing', True) diff --git a/intelmq/lib/datatypes.py b/intelmq/lib/datatypes.py index 7458b98ea..a4e57ecfe 100644 --- a/intelmq/lib/datatypes.py +++ b/intelmq/lib/datatypes.py @@ -1,13 +1,13 @@ -# SPDX-FileCopyrightText: 2021 Birger Schacht +# SPDX-FileCopyrightText: 2021 Birger Schacht, 2023 Bundesamt für Sicherheit in der Informationstechnik (BSI) # # SPDX-License-Identifier: AGPL-3.0-or-later -from datetime import datetime from enum import Enum from inspect import signature +from sys import version_info from typing import Optional, Callable, Union, List +from datetime import datetime from termstyle import green - from intelmq.lib.exceptions import InvalidArgument from intelmq.lib.harmonization import DateTime @@ -137,3 +137,21 @@ def validate(value: str) -> [Callable, Optional[str]]: expected=conversion_name) return conversion, format_string + + +if version_info < (3, 9): + class Dict39(dict): + """ + Python 3.9 introduced the handy | operator for dicts. + For backwards-compatibility, this is the backport + as IntelMQ supports Python >= 3.7 + """ + def __or__(self, other: dict) -> 'Dict39': + """ + Create a new dictionary with the merged keys and values of d and other, which must both be dictionaries. The values of other take priority when d and other share keys. 
+ """ + ret = Dict39(self.copy()) + ret.update(other) + return ret +else: + Dict39 = dict diff --git a/intelmq/lib/message.py b/intelmq/lib/message.py index 69137209a..2b116bb6b 100644 --- a/intelmq/lib/message.py +++ b/intelmq/lib/message.py @@ -14,6 +14,7 @@ import warnings from collections import defaultdict from typing import Any, Dict, Iterable, Optional, Sequence, Union +from pkg_resources import resource_filename import intelmq.lib.exceptions as exceptions import intelmq.lib.harmonization @@ -56,8 +57,10 @@ def from_dict(message: dict, harmonization=None, got=message["__type"], expected=VALID_MESSSAGE_TYPES, docs=HARMONIZATION_CONF_FILE) - del message["__type"] - return class_reference(message, auto=True, harmonization=harmonization) + # don't modify the parameter + message_copy = message.copy() + del message_copy["__type"] + return class_reference(message_copy, auto=True, harmonization=harmonization) @staticmethod def unserialize(raw_message: str, harmonization: dict = None, @@ -103,7 +106,11 @@ def __init__(self, message: Union[dict, tuple] = (), auto: bool = False, classname = self.__class__.__name__.lower() if harmonization is None: - harmonization = utils.load_configuration(HARMONIZATION_CONF_FILE) + try: + harmonization = utils.load_configuration(HARMONIZATION_CONF_FILE) + except ValueError: + # Fallback to internal harmonization file + harmonization = utils.load_configuration(resource_filename('intelmq', 'etc/harmonization.conf')) try: self.harmonization_config = harmonization[classname] except KeyError: diff --git a/intelmq/lib/pipeline.py b/intelmq/lib/pipeline.py index 9a9229187..8d307cad1 100644 --- a/intelmq/lib/pipeline.py +++ b/intelmq/lib/pipeline.py @@ -15,7 +15,7 @@ import time from itertools import chain -from typing import Dict, Optional +from typing import Optional import ssl import redis @@ -23,6 +23,7 @@ import intelmq.lib.exceptions as exceptions import intelmq.lib.pipeline import intelmq.lib.utils as utils +from intelmq.lib.message 
import Message __all__ = ['Pipeline', 'PipelineFactory', 'Redis', 'Pythonlist', 'Amqp'] @@ -415,12 +416,76 @@ def clear_queue(self, queue): """ Empties given queue. """ self.state[queue] = [] + def clear_all_queues(self): + """ Empties all queues / state """ + for queue in self.state: + self.state[queue] = [] + def _reject_message(self): """ No-op because of the internal queue """ +class Pythonlistsimple(Pythonlist): + """ + This pipeline uses simple lists for internal queues. + + It does no conversions and encoding stuff. + """ + + state = {} # type: Dict[str, list] + + def connect(self): + pass + + def send(self, message: Message, + path: str = "_default", + path_permissive: bool = True): + """ + Sends a message to the destination queues + + message should be of type Message, not string/bytes + + path_permissive defaults to true as opposed to the other pipelines! + """ + if path not in self.destination_queues and path_permissive: + return + + for destination_queue in self.destination_queues[path]: + try: + self.state[destination_queue].append(message) + except KeyError: + self.state[destination_queue] = [message] + + def _receive(self) -> bytes: + """ + Receives the last not yet acknowledged message. + + Does not block unlike the other pipelines. 
+ """ + if len(self.state[self.internal_queue]) > 0: + return self.state[self.internal_queue][0] + + try: + first_msg = self.state[self.source_queue].pop(0) + except IndexError as exc: + raise exceptions.PipelineError(exc) + self.state[self.internal_queue].append(first_msg) + + return first_msg + + def receive(self) -> str: + if self._has_message: + raise exceptions.PipelineError("There's already a message, first " + "acknowledge the existing one.") + + retval = self._receive() + self._has_message = True + # no decoding + return retval + + class Amqp(Pipeline): queue_args = {'x-queue-mode': 'lazy'} source_pipeline_host = '127.0.0.1' diff --git a/intelmq/tests/lib/test_bot.py b/intelmq/tests/lib/test_bot.py index b7b16192a..0470ec8c2 100644 --- a/intelmq/tests/lib/test_bot.py +++ b/intelmq/tests/lib/test_bot.py @@ -6,7 +6,6 @@ """ Tests the Bot class itself. """ - import unittest import intelmq.lib.test as test diff --git a/intelmq/tests/lib/test_bot_library_mode.py b/intelmq/tests/lib/test_bot_library_mode.py new file mode 100644 index 000000000..0b7f89a04 --- /dev/null +++ b/intelmq/tests/lib/test_bot_library_mode.py @@ -0,0 +1,145 @@ +# SPDX-FileCopyrightText: 2023 by Bundesamt für Sicherheit in der Informationstechnik (BSI) +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# -*- coding: utf-8 -*- +""" +Copyright (c) 2023 by Bundesamt für Sicherheit in der Informationstechnik (BSI) + +Software engineering by BSI & Intevation GmbH + +This file tests IntelMQ bots in library mode (IEP007) +""" +import json +import unittest +from os.path import dirname, join +from pytest import raises + +import intelmq.tests.bots.experts.domain_suffix.test_expert as domain_suffix_expert_test +from intelmq.bots.experts.domain_suffix.expert import DomainSuffixExpertBot +from intelmq.bots.experts.taxonomy.expert import TaxonomyExpertBot +from intelmq.bots.experts.url.expert import URLExpertBot +from intelmq.lib.bot import BotLibSettings, Dict39, ExpertBot +from intelmq.lib.message 
import Message, MessageFactory +from intelmq.tests.lib import test_parser_bot + +EXAMPLE_DATA_URL = Dict39({'source.url': 'http://example.com/'}) +EXAMPLE_DATA_URL_OUT = EXAMPLE_DATA_URL | {'source.fqdn': 'example.com', + 'source.port': 80, + 'source.urlpath': '/', + 'protocol.application': 'http', + 'protocol.transport': 'tcp'} +EXAMPLE_IP_INPUT = {"source.ip": "192.0.43.7", # icann.org. + "destination.ip": "192.0.43.8", # iana.org. + "time.observation": "2015-01-01T00:00:00+00:00", + } + + +class BrokenInitExpertBot(ExpertBot): + def init(self): + raise ValueError('This initialization intionally raises an error!') + + +class RaisesOnFirstRunExpertBot(ExpertBot): + counter = 0 + + def init(self): + self.counter = 0 + + def process(self): + event = self.receive_message() + self.counter += 1 + if self.counter == 1: + raise ValueError('This initialization intionally raises an error!') + self.send_message(event) + self.acknowledge_message() + + +def assertMessageEqual(actual, expected): + """ + Compare two messages as dicts. 
+ """ + if isinstance(actual, Message): + actual = actual.to_dict(with_type=True) + else: + actual = actual.copy() + + if isinstance(expected, Message): + expected = expected.to_dict(with_type=True) + else: + expected = expected.copy() + + if 'time.observation' in actual: + del actual['time.observation'] + if 'time.observation' in expected: + del expected['time.observation'] + if 'output' in actual: + actual['output'] = json.loads(actual['output']) + if 'output' in expected: + expected['output'] = json.loads(expected['output']) + + assert actual == expected + + +def test_dummy_parser_bot(): + bot = test_parser_bot.DummyParserBot('dummy-bot', settings=BotLibSettings) + sent_messages = bot.process_message(test_parser_bot.EXAMPLE_REPORT.copy()) + assertMessageEqual(sent_messages['output'][0], test_parser_bot.EXAMPLE_EVENT) + assertMessageEqual(sent_messages['error'][0], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[0].copy(), default_type='Report')) + assertMessageEqual(sent_messages['error'][1], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[1].copy(), default_type='Report')) + + +def test_domain_suffix(): + domain_suffix = DomainSuffixExpertBot('domain-suffix', + settings=BotLibSettings | {'field': 'fqdn', + 'suffix_file': join(dirname(domain_suffix_expert_test.__file__), 'public_suffix_list.dat')}) + queues = domain_suffix.process_message({'source.fqdn': 'www.example.com'}) + assert queues['output'][0]['source.domain_suffix'] == 'example.com' + + +def test_url_expert(): + url_expert = URLExpertBot('url', settings=BotLibSettings) + queues = url_expert.process_message(EXAMPLE_DATA_URL.copy()) + del url_expert + assert queues['output'] == [EXAMPLE_DATA_URL_OUT] + + +def test_url_and_taxonomy(): + url_expert = URLExpertBot('url', settings=BotLibSettings) + queues_url = url_expert.process_message(EXAMPLE_DATA_URL.copy()) + del url_expert + message = queues_url['output'][0] + taxonomy_expert = TaxonomyExpertBot('taxonomy', settings=BotLibSettings) + 
queues = taxonomy_expert.process_message(message) + assert queues['output'] == [Dict39(EXAMPLE_DATA_URL_OUT) | {'classification.taxonomy': 'other', 'classification.type': 'undetermined'}] + + +def test_bot_exception_init(): + """ + When a bot raises an exception during Bot initialization + """ + with raises(ValueError): + BrokenInitExpertBot('broken', settings=BotLibSettings) + + +def test_bot_multi_message(): + url_expert = URLExpertBot('url', settings=BotLibSettings) + queues = url_expert.process_message(EXAMPLE_DATA_URL.copy(), EXAMPLE_DATA_URL.copy()) + del url_expert + assert queues['output'] == [EXAMPLE_DATA_URL_OUT] * 2 + + +def test_bot_raises_and_second_message(): + """ + The first message raises an error and the second message + This test is based on an issue where the exception-raising message was not cleared from the internal message store of the Bot/Pipeline instance and thus re-used on the second run + """ + raises_on_first_run = RaisesOnFirstRunExpertBot('raises', settings=BotLibSettings) + with raises(ValueError): + raises_on_first_run.process_message(EXAMPLE_DATA_URL) + queues = raises_on_first_run.process_message(EXAMPLE_IP_INPUT) + assert len(queues['output']) == 1 + assertMessageEqual(queues['output'][0], EXAMPLE_IP_INPUT) + + +if __name__ == '__main__': # pragma: no cover + unittest.main() diff --git a/intelmq/tests/lib/test_parser_bot.py b/intelmq/tests/lib/test_parser_bot.py index b32584658..0ccf05813 100644 --- a/intelmq/tests/lib/test_parser_bot.py +++ b/intelmq/tests/lib/test_parser_bot.py @@ -141,7 +141,9 @@ def run_bot(self, *args, **kwargs): super().run_bot(*args, **kwargs) def test_event(self): - """ Test DummyParserBot """ + """ + Test DummyParserBot: One event (and two dumps) + """ self.run_bot(allowed_error_count=2) self.assertMessageEqual(0, EXAMPLE_EVENT) @@ -183,7 +185,7 @@ def test_processed_messages_count(self): def test_processed_messages_seconds(self): self.input_message = EXAMPLE_SHORT 
self.run_bot(parameters={'log_processed_messages_count': 10, - 'log_processed_messages_seconds': 0}) + 'log_processed_messages_seconds': 0.1}) self.assertAnyLoglineEqual(message='Processed 2 messages since last logging.', levelname='INFO')