From 587b324bf411f9e0faf25082f3ed49e0ec3640e1 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Tue, 9 May 2023 15:53:30 +0200 Subject: [PATCH 01/29] bug: Bot: stop Bot on deletion --- intelmq/lib/bot.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 7b4085b74..ab28113ed 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -525,6 +525,9 @@ def __sleep(self, remaining: Optional[float] = None, log: bool = True): self.__handle_sighup() remaining = self.rate_limit - (time.time() - starttime) + def __del__(self): + return self.stop(exitcode=0) + def stop(self, exitcode: int = 1): if not self.logger: print('Could not initialize logger, only logging to stdout.') From 50fc07200c557eeb346e781a672b54e61a370e58 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Thu, 11 May 2023 19:54:23 +0200 Subject: [PATCH 02/29] bot debugger: workaround for systemexit of bot --- intelmq/lib/bot_debugger.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/intelmq/lib/bot_debugger.py b/intelmq/lib/bot_debugger.py index 0afeaa5c1..4e37de54c 100644 --- a/intelmq/lib/bot_debugger.py +++ b/intelmq/lib/bot_debugger.py @@ -217,3 +217,8 @@ def messageWizzard(self, msg): def pprint(msg) -> str: """ We can't use standard pprint as JSON standard asks for double quotes. """ return json.dumps(msg, indent=4, sort_keys=True) + + def __del__(self): + # prevents a SystemExit Exception at object deletion + # remove once PR#2358 (library mode) is merged + setattr(self.instance, 'testing', True) From d6356b57f766e4f1c0e611334b3db67370a110fc Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 21 Apr 2023 22:54:41 +0200 Subject: [PATCH 03/29] lib/bot: allow setting the configuration via Bot's constructor and if loaded from file, only load it once --- CHANGELOG.md | 1 + intelmq/lib/bot.py | 15 +++++++++------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39e44f2ee..072b7a238 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ CHANGELOG - `intelmq.lib.harmonization`: - Changes signature and names of `DateTime` conversion functions for consistency, backwards compatible (PR#2329 by Filip Pokorný). - Ensure rejecting URLs with leading whitespaces after changes in CPython (fixes [#2377](https://github.com/certtools/intelmq/issues/2377)) +- `intelmq.lib.bot.Bot`: Allow setting the parameters via parameter on bot initialization. ### Development - CI: pin the Codespell version to omit troubles caused by its new releases (PR #2379). diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index ab28113ed..9916d1fb9 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -55,6 +55,7 @@ class Bot: __source_pipeline = None __destination_pipeline = None __log_buffer: List[tuple] = [] + __settings: Optional[dict] = None logger = None # Bot is capable of SIGHUP delaying @@ -120,7 +121,7 @@ class Bot: _harmonization: dict = {} def __init__(self, bot_id: str, start: bool = False, sighup_event=None, - disable_multithreading: bool = None): + disable_multithreading: bool = None, settings: Optional[dict] = None): self.__log_buffer: list = [] @@ -128,6 +129,8 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, self.__source_pipeline: Optional[Pipeline] = None self.__destination_pipeline: Optional[Pipeline] = None self.logger = None + if settings is not None: + self.__settings = settings self.__message_counter = {"since": 0, # messages since last logging "start": None, # last login time @@ -769,9 +772,10 @@ def _dump_message(self, error_traceback, message: dict): self.logger.debug('Message dumped.') def __load_defaults_configuration(self): - config = utils.get_global_settings() + if not self.__settings: + self.__settings = utils.get_runtime() - setattr(self, 'logging_path', DEFAULT_LOGGING_PATH) + config = self.__settings.get('global', {}) for option, value in config.items(): setattr(self, option, value) @@ -781,11 +785,10 @@ def __load_defaults_configuration(self): def __load_runtime_configuration(self): self.logger.debug("Loading runtime configuration from %r.", RUNTIME_CONF_FILE) - config = utils.load_configuration(RUNTIME_CONF_FILE) reinitialize_logging = False - if self.__bot_id in config: - params = config[self.__bot_id] + if self.__bot_id in self.__settings: + params = self.__settings[self.__bot_id] for key, value in params.items(): if key in ALLOWED_SYSTEM_PARAMETERS and value: self.__log_configuration_parameter("system", key, value) From 310e3994fdfb8c38a17141a4a71ebfe5a322e1ab Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 21 Apr 2023 22:55:52 +0200 Subject: [PATCH 04/29] poc: load bots as library --- intelmq/lib/bot.py | 14 ++++++++++++-- intelmq/tests/lib/test_bot.py | 25 +++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 9916d1fb9..705794f44 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -121,7 +121,8 @@ class Bot: _harmonization: dict = {} def __init__(self, bot_id: str, start: bool = False, sighup_event=None, - disable_multithreading: bool = None, settings: Optional[dict] = None): + disable_multithreading: bool = None, settings: Optional[dict] = None, + source_queue: Optional[str] = None): self.__log_buffer: list = [] @@ -131,6 +132,8 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, self.logger = None if settings is not None: self.__settings = settings + if source_queue is not None: + self.source_queue = source_queue self.__message_counter = {"since": 0, # messages since last logging "start": None, # last login time @@ -587,6 +590,9 @@ def __check_bot_id(self, name: str) -> Tuple[str, str, str]: def __connect_pipelines(self): pipeline_args = {key: getattr(self, key) for key in dir(self) if not inspect.ismethod(getattr(self, key)) and (key.startswith('source_pipeline_') or key.startswith('destination_pipeline'))} + print('pipeline_args', pipeline_args, 'source_queue', self.source_queue) + if getattr(self, 'skip_pipeline', False): + return if self.source_queue is not None: self.logger.debug("Loading source pipeline and queue %r.", self.source_queue) self.__source_pipeline = PipelineFactory.create(logger=self.logger, @@ -667,7 +673,7 @@ def receive_message(self) -> libmessage.Message: will be rejected to the pipeline in the first place to get to a clean state. Then, after reloading, the message will be retrieved again. """ - if self.__current_message: + if self.__current_message is not None: self.logger.debug("Reusing existing current message as incoming.") return self.__current_message @@ -775,6 +781,8 @@ def __load_defaults_configuration(self): if not self.__settings: self.__settings = utils.get_runtime() + print('settings:', self.__settings, 'logging_path', self.__settings.get('logging_path')) + config = self.__settings.get('global', {}) for option, value in config.items(): @@ -782,6 +790,7 @@ def __load_defaults_configuration(self): self.__log_configuration_parameter("defaults", option, value) self.__log_processed_messages_seconds = timedelta(seconds=self.log_processed_messages_seconds) + print('settings:', self.__settings, 'logging_path', self.__settings.get('logging_path')) def __load_runtime_configuration(self): self.logger.debug("Loading runtime configuration from %r.", RUNTIME_CONF_FILE) @@ -840,6 +849,7 @@ def __init_logger(self): syslog = self.logging_syslog else: syslog = False + print('self.logging_path', self.logging_path) self.logger = utils.log(self.__bot_id_full, syslog=syslog, log_path=self.logging_path, log_level=self.logging_level, diff --git a/intelmq/tests/lib/test_bot.py b/intelmq/tests/lib/test_bot.py index b7b16192a..9bc297f94 100644 --- a/intelmq/tests/lib/test_bot.py +++ b/intelmq/tests/lib/test_bot.py @@ -11,6 +11,7 @@ import intelmq.lib.test as test from intelmq.tests.lib import test_parser_bot +from intelmq.lib.message import MessageFactory class TestDummyParserBot(test.BotTestCase, unittest.TestCase): @@ -79,5 +80,29 @@ def test_invalid_value_on_input_message(self): self.assertEqual(self.pipe.state['test-bot-output'], []) +def send_message(self, *messages, path: str = "_default", auto_add=None, + path_permissive: bool = False): + self._sent_messages.extend(messages) + + +def _dump_message(self, error_traceback, message: dict): + self._dumped_messages.append((error_traceback, message)) + + +class TestBotAsLibrary(unittest.TestCase): + def test_dummy(self): + bot = test_parser_bot.DummyParserBot('dummy-bot', settings={'global': {'logging_path': None, 'skip_pipeline': True}, 'dummy-bot': {}}) + bot._Bot__current_message = MessageFactory.from_dict(test_parser_bot.EXAMPLE_REPORT) + bot._Bot__connect_pipelines = lambda self: None + bot._sent_messages = [] + bot._dumped_messages = [] + bot.send_message = send_message.__get__(bot, test_parser_bot.DummyParserBot) + bot._dump_message = _dump_message.__get__(bot, test_parser_bot.DummyParserBot) + bot.process() + assert bot._sent_messages == [MessageFactory.from_dict(test_parser_bot.EXAMPLE_EVENT)] + assert bot._dumped_messages[0][1] == test_parser_bot.EXPECTED_DUMP[0] + assert bot._dumped_messages[1][1] == test_parser_bot.EXPECTED_DUMP[1] + + if __name__ == '__main__': # pragma: no cover unittest.main() From 5610d6d41fdf8ed95a978aa927eb3772bf358234 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 26 Apr 2023 22:49:28 +0200 Subject: [PATCH 05/29] enh: lib/bot: new __log method to simplify pre-logger logging a new helper method which simplifies the logging before the logger is initialized completely it decides by itself where the log message goes. the caller does not need to decide it by itself --- intelmq/lib/bot.py | 48 +++++++++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 705794f44..393f6f424 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -146,13 +146,10 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, try: version_info = sys.version.splitlines()[0].strip() - self.__log_buffer.append(('info', - '{bot} initialized with id {id} and intelmq {intelmq}' - ' and python {python} as process {pid}.' - ''.format(bot=self.__class__.__name__, - id=bot_id, python=version_info, - pid=os.getpid(), intelmq=__version__))) - self.__log_buffer.append(('debug', 'Library path: %r.' % __file__)) + self.__log('info', + f'{self.__class__.__name__} initialized with id {bot_id} and intelmq {__version__}' + f' and python {version_info} as process {os.getpid()}.') + self.__log('debug', f'Library path: {__file__!r}.') if not utils.drop_privileges(): raise ValueError('IntelMQ must not run as root. Dropping privileges did not work.') @@ -163,7 +160,7 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, self.is_multithreaded = True self.__init_logger() except Exception: - self.__log_buffer.append(('critical', traceback.format_exc())) + self.__log('critical', traceback.format_exc()) self.stop() else: for line in self.__log_buffer: @@ -561,7 +558,7 @@ def stop(self, exitcode: int = 1): self.logger.info("Bot stopped.") logging.shutdown() else: - self.__log_buffer.append(('info', 'Bot stopped.')) + self.__log('info', 'Bot stopped.') self.__print_log_buffer() if not getattr(self, 'testing', False): @@ -582,9 +579,9 @@ def __check_bot_id(self, name: str) -> Tuple[str, str, str]: if res: if not (res.group(2) and threading.current_thread() == threading.main_thread()): return name, res.group(1), res.group(2)[1:] if res.group(2) else None - self.__log_buffer.append(('error', - "Invalid bot id, must match '" - r"[^0-9a-zA-Z\-]+'.")) + self.__log('error', + "Invalid bot id, must match '" + r"[^0-9a-zA-Z\-]+'.") self.stop() return False, False, False @@ -803,16 +800,16 @@ def __load_runtime_configuration(self): self.__log_configuration_parameter("system", key, value) setattr(self, key, value) elif key not in IGNORED_SYSTEM_PARAMETERS: - self.logger.warning('Ignoring disallowed system parameter %r.', - key) + self.__log('warning', 'Ignoring disallowed system parameter %r.', + key) for option, value in params.get('parameters', {}).items(): setattr(self, option, value) self.__log_configuration_parameter("runtime", option, value) if option.startswith('logging_'): reinitialize_logging = True else: - self.logger.warning('Bot ID %r not found in runtime configuration - could not load any parameters.', - self.__bot_id) + self.__log('warning', 'Bot ID %r not found in runtime configuration - could not load any parameters.', + self.__bot_id) intelmq_environment = [elem for elem in os.environ if elem.startswith('INTELMQ_')] for elem in intelmq_environment: @@ -856,17 +853,24 @@ def __init_logger(self): log_max_size=getattr(self, "logging_max_size", 0), log_max_copies=getattr(self, "logging_max_copies", None)) + def __log(self, level, message, *args, **kwargs): + """ + If the logger is already initialized, redirect to the logger + othwise write the message to the log buffer + """ + if self.logger: + getattr(self.logger, level)(message, *args, **kwargs) + else: + # we can't process **kwargs here, but not needed at this stage + self.__log_buffer.append((level, message % args)) + def __log_configuration_parameter(self, config_name: str, option: str, value: Any): if "password" in option or "token" in option: value = "HIDDEN" - message = "{} configuration: parameter {!r} loaded with value {!r}." \ - .format(config_name.title(), option, value) + message = f"{config_name.title()} configuration: parameter {option!r} loaded with value {value!r}." - if self.logger: - self.logger.debug(message) - else: - self.__log_buffer.append(("debug", message)) + self.__log('debug', message) def __load_harmonization_configuration(self): self.logger.debug("Loading Harmonization configuration from %r.", HARMONIZATION_CONF_FILE) From aede8dae8c91095f9a8b7aaaaea678289bf00050 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 26 Apr 2023 22:51:03 +0200 Subject: [PATCH 06/29] iep007 poc: implement settings and process_message --- intelmq/lib/bot.py | 119 ++++++++++++++++++++++------------ intelmq/lib/message.py | 6 +- intelmq/lib/pipeline.py | 61 +++++++++++++++++ intelmq/tests/lib/test_bot.py | 57 ++++++++++++++-- 4 files changed, 191 insertions(+), 52 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 393f6f424..4db349159 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -55,7 +55,14 @@ class Bot: __source_pipeline = None __destination_pipeline = None __log_buffer: List[tuple] = [] + # runtime_file + __runtime_settings: Optional[dict] = None + # settings provided via parameter __settings: Optional[dict] = None + # if messages should be serialized/unserialized coming from/sending to the pipeline + __pipeline_serialize_messages = True + # if the Bot is running by itself or called by other procedures + _standalone: bool = False logger = None # Bot is capable of SIGHUP delaying @@ -92,7 +99,7 @@ class Bot: log_processed_messages_count: int = 500 log_processed_messages_seconds: int = 900 logging_handler: str = "file" - logging_level: str = DEFAULT_LOGGING_LEVEL + logging_level: str = 'DEBUG' # DEFAULT_LOGGING_LEVEL logging_path: str = DEFAULT_LOGGING_PATH logging_syslog: str = "/dev/log" process_manager: str = "intelmq" @@ -122,7 +129,7 @@ class Bot: def __init__(self, bot_id: str, start: bool = False, sighup_event=None, disable_multithreading: bool = None, settings: Optional[dict] = None, - source_queue: Optional[str] = None): + source_queue: Optional[str] = None, standalone: bool = False): self.__log_buffer: list = [] @@ -134,6 +141,7 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, self.__settings = settings if source_queue is not None: self.source_queue = source_queue + self._standalone = standalone self.__message_counter = {"since": 0, # messages since last logging "start": None, # last login time @@ -153,9 +161,10 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, if not utils.drop_privileges(): raise ValueError('IntelMQ must not run as root. Dropping privileges did not work.') - self.__load_defaults_configuration() - self.__bot_id_full, self.__bot_id, self.__instance_id = self.__check_bot_id(bot_id) + + self.__load_configuration() + if self.__instance_id: self.is_multithreaded = True self.__init_logger() @@ -168,7 +177,6 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, try: self.logger.info('Bot is starting.') - self.__load_runtime_configuration() broker = self.source_pipeline_broker.title() if broker != 'Amqp': @@ -561,7 +569,7 @@ def stop(self, exitcode: int = 1): self.__log('info', 'Bot stopped.') self.__print_log_buffer() - if not getattr(self, 'testing', False): + if not getattr(self, 'testing', False) and self._standalone: sys.exit(exitcode) def __print_log_buffer(self): @@ -588,10 +596,8 @@ def __check_bot_id(self, name: str) -> Tuple[str, str, str]: def __connect_pipelines(self): pipeline_args = {key: getattr(self, key) for key in dir(self) if not inspect.ismethod(getattr(self, key)) and (key.startswith('source_pipeline_') or key.startswith('destination_pipeline'))} print('pipeline_args', pipeline_args, 'source_queue', self.source_queue) - if getattr(self, 'skip_pipeline', False): - return if self.source_queue is not None: - self.logger.debug("Loading source pipeline and queue %r.", self.source_queue) + self.logger.info("Loading source pipeline and queue %r.", self.source_queue) self.__source_pipeline = PipelineFactory.create(logger=self.logger, direction="source", queues=self.source_queue, @@ -601,10 +607,10 @@ def __connect_pipelines(self): self.__source_pipeline.connect() self.__current_message = None - self.logger.debug("Connected to source queue.") + self.logger.info("Connected to source queue.") if self.destination_queues: - self.logger.debug("Loading destination pipeline and queues %r.", self.destination_queues) + self.logger.info("Loading destination pipeline and queues %r.", self.destination_queues) self.__destination_pipeline = PipelineFactory.create(logger=self.logger, direction="destination", queues=self.destination_queues, @@ -613,9 +619,9 @@ def __connect_pipelines(self): is_multithreaded=self.is_multithreaded) self.__destination_pipeline.connect() - self.logger.debug("Connected to destination queues.") + self.logger.info("Connected to destination queues.") else: - self.logger.debug("No destination queues to load.") + self.logger.info("No destination queues to load.") def __disconnect_pipelines(self): """ Disconnecting pipelines. """ @@ -646,6 +652,8 @@ def send_message(self, *messages, path: str = "_default", auto_add=None, 'but needed') self.logger.debug("Sending message to path %r.", path) + + # Message counter start self.__message_counter["since"] += 1 self.__message_counter["path"][path] += 1 if not self.__message_counter["start"]: @@ -657,10 +665,16 @@ def send_message(self, *messages, path: str = "_default", auto_add=None, self.__message_counter["since"]) self.__message_counter["since"] = 0 self.__message_counter["start"] = datetime.now() + # Message counter end - raw_message = libmessage.MessageFactory.serialize(message) - self.__destination_pipeline.send(raw_message, path=path, - path_permissive=path_permissive) + if self.__pipeline_serialize_messages: + raw_message = libmessage.MessageFactory.serialize(message) + self.__destination_pipeline.send(raw_message, path=path, + path_permissive=path_permissive) + else: + print(f'send_message, message: {message!r}') + self.__destination_pipeline.send(message, path=path, + path_permissive=path_permissive) def receive_message(self) -> libmessage.Message: """ @@ -692,8 +706,12 @@ def receive_message(self) -> libmessage.Message: return self.receive_message() try: - self.__current_message = libmessage.MessageFactory.unserialize(message, - harmonization=self.harmonization) + if self.__pipeline_serialize_messages: + self.__current_message = libmessage.MessageFactory.unserialize(message, + harmonization=self.harmonization) + else: + self.__current_message = message + except exceptions.InvalidKey as exc: # In case a incoming message is malformed an does not conform with the currently # loaded harmonization, stop now as this will happen repeatedly without any change @@ -774,27 +792,25 @@ def _dump_message(self, error_traceback, message: dict): self.logger.debug('Message dumped.') - def __load_defaults_configuration(self): - if not self.__settings: - self.__settings = utils.get_runtime() - - print('settings:', self.__settings, 'logging_path', self.__settings.get('logging_path')) + def __load_configuration(self): + self.__log('debug', "Loading runtime configuration from %r.", RUNTIME_CONF_FILE) + if not self.__runtime_settings: + self.__runtime_settings = utils.get_runtime() - config = self.__settings.get('global', {}) + # merge in configuration provided as parameter to init + if self.__settings: + if self.__bot_id not in self.__runtime_settings: + self.__runtime_settings[self.__bot_id] = {} + if 'parameters' not in self.__runtime_settings[self.__bot_id]: + self.__runtime_settings[self.__bot_id]['parameters'] = {} + self.__runtime_settings[self.__bot_id]['parameters'].update(self.__settings) - for option, value in config.items(): + for option, value in self.__runtime_settings.get('global', {}).items(): setattr(self, option, value) self.__log_configuration_parameter("defaults", option, value) - self.__log_processed_messages_seconds = timedelta(seconds=self.log_processed_messages_seconds) - print('settings:', self.__settings, 'logging_path', self.__settings.get('logging_path')) - - def __load_runtime_configuration(self): - self.logger.debug("Loading runtime configuration from %r.", RUNTIME_CONF_FILE) - reinitialize_logging = False - - if self.__bot_id in self.__settings: - params = self.__settings[self.__bot_id] + if self.__bot_id in self.__runtime_settings: + params = self.__runtime_settings[self.__bot_id] for key, value in params.items(): if key in ALLOWED_SYSTEM_PARAMETERS and value: self.__log_configuration_parameter("system", key, value) @@ -805,8 +821,6 @@ def __load_runtime_configuration(self): for option, value in params.get('parameters', {}).items(): setattr(self, option, value) self.__log_configuration_parameter("runtime", option, value) - if option.startswith('logging_'): - reinitialize_logging = True else: self.__log('warning', 'Bot ID %r not found in runtime configuration - could not load any parameters.', self.__bot_id) @@ -826,12 +840,7 @@ def __load_runtime_configuration(self): setattr(self, option, value) self.__log_configuration_parameter("environment", option, value) - if option.startswith('logging_'): - reinitialize_logging = True - - if reinitialize_logging: - self.logger.handlers = [] # remove all existing handlers - self.__init_logger() + self.__log_processed_messages_seconds = timedelta(seconds=self.log_processed_messages_seconds) # The default source_queue should be "{bot-id}-queue", # but this can be overridden @@ -847,6 +856,8 @@ def __init_logger(self): else: syslog = False print('self.logging_path', self.logging_path) + import pprint + pprint.pprint(self.__log_buffer) self.logger = utils.log(self.__bot_id_full, syslog=syslog, log_path=self.logging_path, log_level=self.logging_level, @@ -888,7 +899,7 @@ def run(cls, parsed_args=None): if not parsed_args.bot_id: sys.exit('No bot ID given.') - instance = cls(parsed_args.bot_id) + instance = cls(parsed_args.bot_id, standalone=True) if not instance.is_multithreaded: instance.start() @@ -976,6 +987,25 @@ def _create_argparser(cls): argparser.add_argument('bot_id', nargs='?', metavar='BOT-ID', help='unique bot-id of your choosing') return argparser + def process_message(self, message: Optional[libmessage.Message] = None): + if self.bottype == BotType.COLLECTOR: + if message: + raise exceptions.InvalidArgument('Collector Bots take not messages as processing input') + else: + # convert to Message object, it the input is a dict + # use a appropriate default message type, not requiring __type keys in the message + if not isinstance(message, libmessage.Message) and isinstance(message, dict): + message = libmessage.MessageFactory.from_dict(message=message, harmonization=self.harmonization, default_type=self._default_message_type) + print('destination_queues', repr(self.destination_queues)) + # messages is a tuple, the pipeline can't pop from a tuple. convert to a list instead + self.__source_pipeline.state[self.source_queue].append(message) + # do not dump to file + self.error_dump_message = False + # do not serialize messages to strings, keep the objects + self.__pipeline_serialize_messages = False + self.process() + return self.__destination_pipeline.state + class ParserBot(Bot): bottype = BotType.PARSER @@ -984,6 +1014,7 @@ class ParserBot(Bot): _handle = None _current_line: Optional[str] = None _line_ending = '\r\n' + _default_message_type = 'Report' default_fields: Optional[dict] = {} @@ -1340,6 +1371,7 @@ class ExpertBot(Bot): Base class for expert bots. """ bottype = BotType.EXPERT + _default_message_type = 'Event' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -1350,6 +1382,7 @@ class OutputBot(Bot): Base class for outputs. """ bottype = BotType.OUTPUT + _default_message_type = 'Event' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) diff --git a/intelmq/lib/message.py b/intelmq/lib/message.py index 69137209a..ed1b7ff67 100644 --- a/intelmq/lib/message.py +++ b/intelmq/lib/message.py @@ -56,8 +56,10 @@ def from_dict(message: dict, harmonization=None, got=message["__type"], expected=VALID_MESSSAGE_TYPES, docs=HARMONIZATION_CONF_FILE) - del message["__type"] - return class_reference(message, auto=True, harmonization=harmonization) + # don't modify the parameter + message_copy = message.copy() + del message_copy["__type"] + return class_reference(message_copy, auto=True, harmonization=harmonization) @staticmethod def unserialize(raw_message: str, harmonization: dict = None, diff --git a/intelmq/lib/pipeline.py b/intelmq/lib/pipeline.py index 9a9229187..b6f9e63c1 100644 --- a/intelmq/lib/pipeline.py +++ b/intelmq/lib/pipeline.py @@ -23,6 +23,7 @@ import intelmq.lib.exceptions as exceptions import intelmq.lib.pipeline import intelmq.lib.utils as utils +from intelmq.lib.message import Message __all__ = ['Pipeline', 'PipelineFactory', 'Redis', 'Pythonlist', 'Amqp'] @@ -421,6 +422,66 @@ def _reject_message(self): """ +class Pythonlistsimple(Pythonlist): + """ + This pipeline uses simple lists for internal queues. + + It does no conversions and encoding stuff. + """ + + state = {} # type: Dict[str, list] + + def connect(self): + pass + + def send(self, message: Message, + path: str = "_default", + path_permissive: bool = True): + """ + Sends a message to the destination queues + + message should be of type Message, not string/bytes + + path_permissive defaults to true as opposed to the other pipelines! + """ + print(f'Pythonlistsimple, send: {message!r}') + if path not in self.destination_queues and path_permissive: + return + + for destination_queue in self.destination_queues[path]: + try: + self.state[destination_queue].append(message) + except KeyError: + self.state[destination_queue] = [message] + + def _receive(self) -> bytes: + """ + Receives the last not yet acknowledged message. + + Does not block unlike the other pipelines. + """ + if len(self.state[self.internal_queue]) > 0: + return self.state[self.internal_queue][0] + + try: + first_msg = self.state[self.source_queue].pop(0) + except IndexError as exc: + raise exceptions.PipelineError(exc) + self.state[self.internal_queue].append(first_msg) + + return first_msg + + def receive(self) -> str: + if self._has_message: + raise exceptions.PipelineError("There's already a message, first " + "acknowledge the existing one.") + + retval = self._receive() + self._has_message = True + # no decoding + return retval + + class Amqp(Pipeline): queue_args = {'x-queue-mode': 'lazy'} source_pipeline_host = '127.0.0.1' diff --git a/intelmq/tests/lib/test_bot.py b/intelmq/tests/lib/test_bot.py index 9bc297f94..12e2dd8fb 100644 --- a/intelmq/tests/lib/test_bot.py +++ b/intelmq/tests/lib/test_bot.py @@ -6,12 +6,12 @@ """ Tests the Bot class itself. """ - +import json import unittest import intelmq.lib.test as test from intelmq.tests.lib import test_parser_bot -from intelmq.lib.message import MessageFactory +from intelmq.lib.message import MessageFactory, Message class TestDummyParserBot(test.BotTestCase, unittest.TestCase): @@ -85,13 +85,41 @@ def send_message(self, *messages, path: str = "_default", auto_add=None, self._sent_messages.extend(messages) -def _dump_message(self, error_traceback, message: dict): - self._dumped_messages.append((error_traceback, message)) +BotLibSettings = {'logging_path': None, + 'source_pipeline_broker': 'Pythonlistsimple', + 'destination_pipeline_broker': 'Pythonlistsimple', + 'destination_queues': {'_default': 'output', + '_on_error': 'error'}} class TestBotAsLibrary(unittest.TestCase): - def test_dummy(self): - bot = test_parser_bot.DummyParserBot('dummy-bot', settings={'global': {'logging_path': None, 'skip_pipeline': True}, 'dummy-bot': {}}) + def assertMessageEqual(self, actual, expected): + """ + Compare two messages as dicts. + """ + if isinstance(actual, Message): + actual = actual.to_dict(with_type=True) + else: + actual = actual.copy() + + if isinstance(expected, Message): + expected = expected.to_dict(with_type=True) + else: + expected = expected.copy() + + if 'time.observation' in actual: + del actual['time.observation'] + if 'time.observation' in expected: + del expected['time.observation'] + if 'output' in actual: + actual['output'] = json.loads(actual['output']) + if 'output' in expected: + expected['output'] = json.loads(expected['output']) + + self.assertDictEqual(actual, expected) + + """def test_dummy_mocked(self): + bot = test_parser_bot.DummyParserBot('dummy-bot', settings={'global': {'logging_path': None, 'skip_pipeline': True, 'broker': 'pythonlist'}, 'dummy-bot': {}}) bot._Bot__current_message = MessageFactory.from_dict(test_parser_bot.EXAMPLE_REPORT) bot._Bot__connect_pipelines = lambda self: None bot._sent_messages = [] @@ -101,7 +129,22 @@ def test_dummy(self): bot.process() assert bot._sent_messages == [MessageFactory.from_dict(test_parser_bot.EXAMPLE_EVENT)] assert bot._dumped_messages[0][1] == test_parser_bot.EXPECTED_DUMP[0] - assert bot._dumped_messages[1][1] == test_parser_bot.EXPECTED_DUMP[1] + assert bot._dumped_messages[1][1] == test_parser_bot.EXPECTED_DUMP[1]""" + + def test_dummy_pythonlist(self): + bot = test_parser_bot.DummyParserBot('dummy-bot', settings=BotLibSettings) + sent_messages = bot.process_message(test_parser_bot.EXAMPLE_REPORT) + self.assertMessageEqual(sent_messages['output'][0], test_parser_bot.EXAMPLE_EVENT) + self.assertMessageEqual(sent_messages['error'][0], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[0], default_type='Report')) + self.assertMessageEqual(sent_messages['error'][1], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[1], default_type='Report')) + + def test_domain_suffix(self): + from intelmq.bots.experts.domain_suffix.expert import DomainSuffixExpertBot + domain_suffix = DomainSuffixExpertBot('domain-suffix', # bot id + settings=BotLibSettings | {'field': 'fqdn', + 'suffix_file': '/usr/share/publicsuffix/public_suffix_list.dat'}) + queues = domain_suffix.process_message({'source.fqdn': 'www.example.com'}) + assert queues['output'][0]['source.domain_suffix'] == 'com' if __name__ == '__main__': # pragma: no cover From 2028e579d679ca8c10844fd5fd76ca40aea3b3ad Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 26 Apr 2023 23:39:42 +0200 Subject: [PATCH 07/29] temporary: run tests for all branches --- .github/workflows/unittests.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/unittests.yml b/.github/workflows/unittests.yml index 5a740bc85..aed0642e0 100644 --- a/.github/workflows/unittests.yml +++ b/.github/workflows/unittests.yml @@ -6,7 +6,6 @@ name: "Unit tests" on: push: - branches: [develop, maintenance, master] pull_request: branches: [develop, maintenance] paths-ignore: From 8aed2bf820ad9acdd00b9efe2165013bc8e74169 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 26 Apr 2023 23:40:19 +0200 Subject: [PATCH 08/29] cleanup debugging and add docstring, fix tests make a copy of the re-used events to prevent modifying the original content --- intelmq/lib/bot.py | 4 ---- intelmq/tests/lib/test_bot.py | 6 +++--- intelmq/tests/lib/test_parser_bot.py | 4 +++- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 4db349159..3ef65ca4f 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -595,7 +595,6 @@ def __check_bot_id(self, name: str) -> Tuple[str, str, str]: def __connect_pipelines(self): pipeline_args = {key: getattr(self, key) for key in dir(self) if not inspect.ismethod(getattr(self, key)) and (key.startswith('source_pipeline_') or key.startswith('destination_pipeline'))} - print('pipeline_args', pipeline_args, 'source_queue', self.source_queue) if self.source_queue is not None: self.logger.info("Loading source pipeline and queue %r.", self.source_queue) self.__source_pipeline = PipelineFactory.create(logger=self.logger, @@ -855,9 +854,6 @@ def __init_logger(self): syslog = self.logging_syslog else: syslog = False - print('self.logging_path', self.logging_path) - import pprint - pprint.pprint(self.__log_buffer) self.logger = utils.log(self.__bot_id_full, syslog=syslog, log_path=self.logging_path, log_level=self.logging_level, diff --git a/intelmq/tests/lib/test_bot.py b/intelmq/tests/lib/test_bot.py index 12e2dd8fb..8eef18504 100644 --- a/intelmq/tests/lib/test_bot.py +++ b/intelmq/tests/lib/test_bot.py @@ -133,10 +133,10 @@ def assertMessageEqual(self, actual, expected): def test_dummy_pythonlist(self): bot = test_parser_bot.DummyParserBot('dummy-bot', settings=BotLibSettings) - sent_messages = bot.process_message(test_parser_bot.EXAMPLE_REPORT) + sent_messages = bot.process_message(test_parser_bot.EXAMPLE_REPORT.copy()) self.assertMessageEqual(sent_messages['output'][0], test_parser_bot.EXAMPLE_EVENT) - self.assertMessageEqual(sent_messages['error'][0], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[0], default_type='Report')) - self.assertMessageEqual(sent_messages['error'][1], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[1], default_type='Report')) + self.assertMessageEqual(sent_messages['error'][0], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[0].copy(), default_type='Report')) + self.assertMessageEqual(sent_messages['error'][1], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[1].copy(), default_type='Report')) def test_domain_suffix(self): from intelmq.bots.experts.domain_suffix.expert import DomainSuffixExpertBot diff --git a/intelmq/tests/lib/test_parser_bot.py b/intelmq/tests/lib/test_parser_bot.py index b32584658..4859cda60 100644 --- a/intelmq/tests/lib/test_parser_bot.py +++ b/intelmq/tests/lib/test_parser_bot.py @@ -141,7 +141,9 @@ def run_bot(self, *args, **kwargs): super().run_bot(*args, **kwargs) def test_event(self): - """ Test DummyParserBot """ + """ + Test DummyParserBot: One event (and two dumps) + """ self.run_bot(allowed_error_count=2) self.assertMessageEqual(0, EXAMPLE_EVENT) From 403e10637ee2b7e098572e461323a79f725862f8 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 26 Apr 2023 23:51:47 +0200 Subject: [PATCH 09/29] fix log --- intelmq/lib/bot.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 3ef65ca4f..8b7979146 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -869,7 +869,8 @@ def __log(self, level, message, *args, **kwargs): getattr(self.logger, level)(message, *args, **kwargs) else: # we can't process **kwargs here, but not needed at this stage - self.__log_buffer.append((level, message % args)) + # if the message contains '%Y' or similar (e.g. a formatted `http_url`) but not args for formatting, no formatting should be done. if we did it, a wrong 'TypeError: not enough arguments for format string' would be thrown + self.__log_buffer.append((level, message % args if args else message)) def __log_configuration_parameter(self, config_name: str, option: str, value: Any): if "password" in option or "token" in option: From e81a378f4ffa7d8b4288a653e237c6e5ce5b7794 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Thu, 27 Apr 2023 00:22:45 +0200 Subject: [PATCH 10/29] lib/bot: do not depend on conf file if called as lib in lib-mode, configuration files are optional --- intelmq/lib/bot.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 8b7979146..3b57f18c4 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -29,6 +29,7 @@ from collections import defaultdict from datetime import datetime, timedelta from typing import Any, List, Optional, Union, Tuple +from pkg_resources import resource_filename import intelmq.lib.message as libmessage from intelmq import (DEFAULT_LOGGING_PATH, @@ -794,7 +795,15 @@ def _dump_message(self, error_traceback, message: dict): def __load_configuration(self): self.__log('debug', "Loading runtime configuration from %r.", RUNTIME_CONF_FILE) if not self.__runtime_settings: - self.__runtime_settings = utils.get_runtime() + try: + self.__runtime_settings = utils.get_runtime() + except ValueError: + if not self._standalone: + self.__log('info', 'Could not load runtime configuration file. ' + 'Continuing, as settings are provided as parameter in library-mode.') + self.__runtime_settings = {} + else: + raise # merge in configuration provided as parameter to init if self.__settings: @@ -882,7 +891,13 @@ def __log_configuration_parameter(self, config_name: str, option: str, value: An def __load_harmonization_configuration(self): self.logger.debug("Loading Harmonization configuration from %r.", HARMONIZATION_CONF_FILE) - self._harmonization = utils.load_configuration(HARMONIZATION_CONF_FILE) + try: + self._harmonization = utils.load_configuration(HARMONIZATION_CONF_FILE) + except ValueError: + if self._standalone: + raise + else: + self._harmonization = utils.load_configuration(resource_filename('intelmq', 'etc/harmonization.conf')) def new_event(self, *args, **kwargs): return libmessage.Event(*args, harmonization=self.harmonization, **kwargs) From 0be69d1ccbc084ca54b620d3fffba1c35d728ea6 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Thu, 27 Apr 2023 00:23:09 +0200 Subject: [PATCH 11/29] tst: fix test_processed_messages_seconds if log_processed_messages_seconds is 0, it will log after every message, not as intended --- intelmq/lib/bot.py | 1 - intelmq/tests/lib/test_parser_bot.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 3b57f18c4..d5d835142 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -1008,7 +1008,6 @@ def process_message(self, message: Optional[libmessage.Message] = None): # use a appropriate default message type, not requiring __type keys in the message if not isinstance(message, libmessage.Message) and isinstance(message, dict): message = libmessage.MessageFactory.from_dict(message=message, harmonization=self.harmonization, default_type=self._default_message_type) - print('destination_queues', repr(self.destination_queues)) # messages is a tuple, the pipeline can't pop from a tuple. convert to a list instead self.__source_pipeline.state[self.source_queue].append(message) # do not dump to file diff --git a/intelmq/tests/lib/test_parser_bot.py b/intelmq/tests/lib/test_parser_bot.py index 4859cda60..0ccf05813 100644 --- a/intelmq/tests/lib/test_parser_bot.py +++ b/intelmq/tests/lib/test_parser_bot.py @@ -185,7 +185,7 @@ def test_processed_messages_count(self): def test_processed_messages_seconds(self): self.input_message = EXAMPLE_SHORT self.run_bot(parameters={'log_processed_messages_count': 10, - 'log_processed_messages_seconds': 0}) + 'log_processed_messages_seconds': 0.1}) self.assertAnyLoglineEqual(message='Processed 2 messages since last logging.', levelname='INFO') From bea2794a3414a27e5f04c966c50f473bc3c79f4d Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Thu, 27 Apr 2023 00:35:15 +0200 Subject: [PATCH 12/29] compatibility with python < 3.9 --- intelmq/lib/bot.py | 28 ++++++++++++++++++++++++++++ intelmq/tests/lib/test_bot.py | 21 ++++++++++++++++----- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index d5d835142..662c0e1cf 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -1464,3 +1464,31 @@ def export_event(self, event: libmessage.Event, class Parameters: pass +<<<<<<< HEAD +======= + + +if sys.version_info < (3, 9): + class Dict39(dict): + """ + Python 3.9 introduced the handy | operator for dicts. + Tor backwards-compatibility, this is the backport + as IntelMQ supports Python >= 3.7 + """ + def __or__(self, other): + """ + Create a new dictionary with the merged keys and values of d and other, which must both be dictionaries. The values of other take priority when d and other share keys. + """ + ret = self.copy() + ret.update(other) + return ret +else: + Dict39 = dict + + +BotLibSettings = Dict39({'logging_path': None, + 'source_pipeline_broker': 'Pythonlistsimple', + 'destination_pipeline_broker': 'Pythonlistsimple', + 'destination_queues': {'_default': 'output', + '_on_error': 'error'}}) +>>>>>>> abe70085f (compat) diff --git a/intelmq/tests/lib/test_bot.py b/intelmq/tests/lib/test_bot.py index 8eef18504..8040ac78e 100644 --- a/intelmq/tests/lib/test_bot.py +++ b/intelmq/tests/lib/test_bot.py @@ -85,11 +85,22 @@ def send_message(self, *messages, path: str = "_default", auto_add=None, self._sent_messages.extend(messages) -BotLibSettings = {'logging_path': None, - 'source_pipeline_broker': 'Pythonlistsimple', - 'destination_pipeline_broker': 'Pythonlistsimple', - 'destination_queues': {'_default': 'output', - '_on_error': 'error'}} +class Dict39(dict): + """ + Python 3.9 introduced the | operator for dicts + for backwards-compatibility, this is the backport + """ + def or_(self, a, b): + ret = a.copy() + a.update(b) + return ret + + +BotLibSettings = Dict39({'logging_path': None, + 'source_pipeline_broker': 'Pythonlistsimple', + 'destination_pipeline_broker': 'Pythonlistsimple', + 'destination_queues': {'_default': 'output', + '_on_error': 'error'}}) class TestBotAsLibrary(unittest.TestCase): From d16f6dab572111675037d7779e1bf4fc2e9a8cae Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Thu, 27 Apr 2023 20:31:49 +0200 Subject: [PATCH 13/29] lib/message: if harm config not found, load internal defaults if Message instance has no harmonization given as parameter, it loads the harm from the configuration file if the configuration file does not exist, e.g. when IntelMQ is run in library-mode, automatically load the internal (default) harmonization configuration --- intelmq/lib/message.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/intelmq/lib/message.py b/intelmq/lib/message.py index ed1b7ff67..2b116bb6b 100644 --- a/intelmq/lib/message.py +++ b/intelmq/lib/message.py @@ -14,6 +14,7 @@ import warnings from collections import defaultdict from typing import Any, Dict, Iterable, Optional, Sequence, Union +from pkg_resources import resource_filename import intelmq.lib.exceptions as exceptions import intelmq.lib.harmonization @@ -105,7 +106,11 @@ def __init__(self, message: Union[dict, tuple] = (), auto: bool = False, classname = self.__class__.__name__.lower() if harmonization is None: - harmonization = utils.load_configuration(HARMONIZATION_CONF_FILE) + try: + harmonization = utils.load_configuration(HARMONIZATION_CONF_FILE) + except ValueError: + # Fallback to internal harmonization file + harmonization = utils.load_configuration(resource_filename('intelmq', 'etc/harmonization.conf')) try: self.harmonization_config = harmonization[classname] except KeyError: From c3aac4878876c1141015b9119bbaa26d7f8665ca Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Thu, 27 Apr 2023 21:05:36 +0200 Subject: [PATCH 14/29] lib/bot: default library-mode settings, compatibility --- intelmq/lib/bot.py | 3 --- intelmq/tests/lib/test_bot.py | 19 +------------------ 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 662c0e1cf..8db2279c6 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -1464,8 +1464,6 @@ def export_event(self, event: libmessage.Event, class Parameters: pass -<<<<<<< HEAD -======= if sys.version_info < (3, 9): @@ -1491,4 +1489,3 @@ def __or__(self, other): 'destination_pipeline_broker': 'Pythonlistsimple', 'destination_queues': {'_default': 'output', '_on_error': 'error'}}) ->>>>>>> abe70085f (compat) diff --git a/intelmq/tests/lib/test_bot.py b/intelmq/tests/lib/test_bot.py index 8040ac78e..d424ddd4e 100644 --- a/intelmq/tests/lib/test_bot.py +++ b/intelmq/tests/lib/test_bot.py @@ -12,6 +12,7 @@ import intelmq.lib.test as test from intelmq.tests.lib import test_parser_bot from intelmq.lib.message import MessageFactory, Message +from intelmq.lib.bot import BotLibSettings class TestDummyParserBot(test.BotTestCase, unittest.TestCase): @@ -85,24 +86,6 @@ def send_message(self, *messages, path: str = "_default", auto_add=None, self._sent_messages.extend(messages) -class Dict39(dict): - """ - Python 3.9 introduced the | operator for dicts - for backwards-compatibility, this is the backport - """ - def or_(self, a, b): - ret = a.copy() - a.update(b) - return ret - - -BotLibSettings = Dict39({'logging_path': None, - 'source_pipeline_broker': 'Pythonlistsimple', - 'destination_pipeline_broker': 'Pythonlistsimple', - 'destination_queues': {'_default': 'output', - '_on_error': 'error'}}) - - class TestBotAsLibrary(unittest.TestCase): def assertMessageEqual(self, actual, expected): """ From 3c9aba10ce6d116f2fdfad606aaf9becf751ee95 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 28 Apr 2023 16:47:38 +0200 Subject: [PATCH 15/29] enhancements for library mode --- intelmq/lib/bot.py | 6 ++++-- intelmq/lib/pipeline.py | 1 - intelmq/tests/lib/test_bot.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 8db2279c6..ed1268138 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -100,7 +100,7 @@ class Bot: log_processed_messages_count: int = 500 log_processed_messages_seconds: int = 900 logging_handler: str = "file" - logging_level: str = 'DEBUG' # DEFAULT_LOGGING_LEVEL + logging_level: str = DEFAULT_LOGGING_LEVEL logging_path: str = DEFAULT_LOGGING_PATH logging_syslog: str = "/dev/log" process_manager: str = "intelmq" @@ -572,6 +572,8 @@ def stop(self, exitcode: int = 1): if not getattr(self, 'testing', False) and self._standalone: sys.exit(exitcode) + if not self._standalone: # library-mode + raise ValueError('Bot shutdown. See error messages in logs for details.') def __print_log_buffer(self): for level, message in self.__log_buffer: @@ -800,7 +802,7 @@ def __load_configuration(self): except ValueError: if not self._standalone: self.__log('info', 'Could not load runtime configuration file. ' - 'Continuing, as settings are provided as parameter in library-mode.') + 'Continuing, as we in library-mode.') self.__runtime_settings = {} else: raise diff --git a/intelmq/lib/pipeline.py b/intelmq/lib/pipeline.py index b6f9e63c1..4319eb260 100644 --- a/intelmq/lib/pipeline.py +++ b/intelmq/lib/pipeline.py @@ -444,7 +444,6 @@ def send(self, message: Message, path_permissive defaults to true as opposed to the other pipelines! """ - print(f'Pythonlistsimple, send: {message!r}') if path not in self.destination_queues and path_permissive: return diff --git a/intelmq/tests/lib/test_bot.py b/intelmq/tests/lib/test_bot.py index d424ddd4e..abbe58adb 100644 --- a/intelmq/tests/lib/test_bot.py +++ b/intelmq/tests/lib/test_bot.py @@ -134,7 +134,7 @@ def test_dummy_pythonlist(self): def test_domain_suffix(self): from intelmq.bots.experts.domain_suffix.expert import DomainSuffixExpertBot - domain_suffix = DomainSuffixExpertBot('domain-suffix', # bot id + domain_suffix = DomainSuffixExpertBot('domain-suffix', settings=BotLibSettings | {'field': 'fqdn', 'suffix_file': '/usr/share/publicsuffix/public_suffix_list.dat'}) queues = domain_suffix.process_message({'source.fqdn': 'www.example.com'}) From 68627fa2a95ce41a16b960b225413eb365e1f545 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 28 Apr 2023 17:15:21 +0200 Subject: [PATCH 16/29] doc: add library functionality --- docs/dev/library.rst | 67 ++++++++++++++++++++++++++++++++++++++++++++ docs/index.rst | 1 + 2 files changed, 68 insertions(+) create mode 100644 docs/dev/library.rst diff --git a/docs/dev/library.rst b/docs/dev/library.rst new file mode 100644 index 000000000..e7826923e --- /dev/null +++ b/docs/dev/library.rst @@ -0,0 +1,67 @@ +.. + SPDX-FileCopyrightText: 2023 Bundesamt für Sicherheit in der Informationstechnik (BSI) + SPDX-License-Identifier: AGPL-3.0-or-later + +########################## +Running IntelMQ as Library +########################## + +.. contents:: + +************ +Introduction +************ + +The feature is specified in `IEP007 `_. + +********** +Quickstart +********** + +First, import the Python module and a helper. More about the ``BotLibSettings`` later. + +.. code-block:: python + + from intelmq.lib.bot import BotLibSettings + from intelmq.bots.experts.domain_suffix.expert import DomainSuffixExpertBot + +Then we need to initialize the bot's instance. +We pass two parameters: +* ``bot_id``: The id of the bot +* ``settings``: A Python dictionary of runtime configuration parameters, see :ref:`runtime-configuration`. + The bot first loads the runtime configuration file if it exists. + Then we update them with the ``BotLibSettings`` which are some accumulated settings disabling the logging to files and configure the pipeline so that we can send and receive messages directly to/from the bot. + Last by not least, the actual bot parameters, taking the highest priority. + +.. code-block:: python + + domain_suffix = DomainSuffixExpertBot('domain-suffix', # bot id + settings=BotLibSettings | { + 'field': 'fqdn', + 'suffix_file': '/usr/share/publicsuffix/public_suffix_list.dat'} + +As the bot is not fully initialized, we can process messages now. +Inserting a message as dictionary: + +.. code-block:: python + + queues = domain_suffix.process_message({'source.fqdn': 'www.example.com'}) + +The return value is a dictionary of queues, e.g. the output queue and the error queue. +More details below. + + +Select the output queue (as defined in `destination_queues`), first message, access the field 'source.domain_suffix': + +.. code-block:: python + + >>> output['output'][0]['source.domain_suffix'] + 'com' + +************* +Configuration +************* + +Configuration files are not required to run IntelMQ as library. +Contrary to IntelMQ normal behavior, if the files ``runtime.yaml`` and ``harmonization.conf`` do not exist, IntelMQ won't raise any errors. +For the harmonization configuration, internal defaults are loaded. diff --git a/docs/index.rst b/docs/index.rst index 3990b4592..1fbf7aafe 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -74,6 +74,7 @@ Getting involved :maxdepth: 1 dev/guide + dev/library dev/data-format dev/harmonization-fields dev/release-procedure From 174c4b3282452124c07cd38fea504ee4511f26c4 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 28 Apr 2023 19:28:21 +0200 Subject: [PATCH 17/29] lib/bot: fix stop() for library and test modes --- intelmq/lib/bot.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index ed1268138..44cad5640 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -572,7 +572,9 @@ def stop(self, exitcode: int = 1): if not getattr(self, 'testing', False) and self._standalone: sys.exit(exitcode) - if not self._standalone: # library-mode + + # in library-mode raise an error if e.g. initialization failed + if exitcode != 0 and not self._standalone and not getattr(self, 'testing', False): raise ValueError('Bot shutdown. See error messages in logs for details.') def __print_log_buffer(self): From 4fb980a9e934cec4ee84e375dc34f3e1f503161f Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 3 May 2023 15:29:40 +0200 Subject: [PATCH 18/29] lib/bot: do not drop privs in library mode only in standalone mode, drop privileges In library mode, the calling user can vary, we must not change their user --- intelmq/lib/bot.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 44cad5640..6c2686a8e 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -159,7 +159,10 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, f'{self.__class__.__name__} initialized with id {bot_id} and intelmq {__version__}' f' and python {version_info} as process {os.getpid()}.') self.__log('debug', f'Library path: {__file__!r}.') - if not utils.drop_privileges(): + + # in standalone mode, drop privileges + # In library mode, the calling user can vary, we must not change their user + if self._standalone and not utils.drop_privileges(): raise ValueError('IntelMQ must not run as root. Dropping privileges did not work.') self.__bot_id_full, self.__bot_id, self.__instance_id = self.__check_bot_id(bot_id) From aa77a857f0305d2535cc6725c7a64e3589e34e80 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 3 May 2023 17:29:13 +0200 Subject: [PATCH 19/29] lib/bot: only handle signals in standalone mode in library-mode, handle no signals to not interfere with the caller --- intelmq/lib/bot.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 6c2686a8e..faa7d46c9 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -185,6 +185,9 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, broker = self.source_pipeline_broker.title() if broker != 'Amqp': self._is_multithreadable = False + # multithreading is not (yet) available in library-mode + elif not self._standalone: + self._is_multithreadable = False """ Multithreading """ if (self.instances_threads > 1 and not self.is_multithreaded and @@ -235,7 +238,9 @@ def handle_sighup_signal_threading(signum: int, self.__reset_total_path_stats() self.init() - if not self.__instance_id: + # only the main thread registers the signal handlers + # in library-mode, handle no signals to not interfere with the caller + if not self.__instance_id and self._standalone: self.__sighup = threading.Event() signal.signal(signal.SIGHUP, self.__handle_sighup_signal) # system calls should not be interrupted, but restarted @@ -293,7 +298,7 @@ def __handle_sighup(self): """ Handle SIGHUP. """ - if not self.__sighup.is_set(): + if not self.__sighup or not self.__sighup.is_set(): return False self.logger.info('Handling SIGHUP, initializing again now.') self.__disconnect_pipelines() @@ -707,7 +712,7 @@ def receive_message(self) -> libmessage.Message: # * handle a sighup which happened during blocking read # * re-queue the message before reloading # https://github.com/certtools/intelmq/issues/1438 - if self.__sighup.is_set(): + if self.__sighup and self.__sighup.is_set(): self.__source_pipeline.reject_message() self.__handle_sighup() return self.receive_message() From 2a861d5d5111bb71bad5dd71f120e6a6ef7e3524 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Thu, 4 May 2023 15:37:03 +0200 Subject: [PATCH 20/29] bug: lib/bot: create copy of settings before modifying them the bots may modify the values of the dict (the settings), mainly at initialization for initializing objects to speed up the processing but these modified objects should not end up the the caller's scope, so we create a deep copy here --- intelmq/lib/bot.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index faa7d46c9..d6eb6efa4 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -27,6 +27,7 @@ import types import warnings from collections import defaultdict +from copy import deepcopy from datetime import datetime, timedelta from typing import Any, List, Optional, Union, Tuple from pkg_resources import resource_filename @@ -139,7 +140,8 @@ def __init__(self, bot_id: str, start: bool = False, sighup_event=None, self.__destination_pipeline: Optional[Pipeline] = None self.logger = None if settings is not None: - self.__settings = settings + # make a copy of the settings dict, to no modify the original values of the caller + self.__settings = deepcopy(settings) if source_queue is not None: self.source_queue = source_queue self._standalone = standalone From fe20e7a1e33eb3d3aa60f4986a53f7a71e9051d7 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Thu, 4 May 2023 23:04:49 +0200 Subject: [PATCH 21/29] tst: add tests for library mode --- intelmq/tests/lib/test_bot_library_mode.py | 65 ++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 intelmq/tests/lib/test_bot_library_mode.py diff --git a/intelmq/tests/lib/test_bot_library_mode.py b/intelmq/tests/lib/test_bot_library_mode.py new file mode 100644 index 000000000..54d09a09a --- /dev/null +++ b/intelmq/tests/lib/test_bot_library_mode.py @@ -0,0 +1,65 @@ +# SPDX-FileCopyrightText: 2023 by Bundesamt für Sicherheit in der Informationstechnik (BSI) +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# -*- coding: utf-8 -*- +""" +Copyright (c) 2023 by Bundesamt für Sicherheit in der Informationstechnik (BSI) + +Software engineering by BSI & Intevation GmbH + +This tests IntelMQ bots in library mode (IEP007) +""" +import unittest + +from intelmq.lib.bot import Dict39, BotLibSettings + +from intelmq.bots.experts.url.expert import URLExpertBot +from intelmq.bots.experts.taxonomy.expert import TaxonomyExpertBot +from intelmq.bots.experts.jinja.expert import JinjaExpertBot + +EXAMPLE_DATA_URL = Dict39({'source.url': 'http://example.com/'}) +BOT_CONFIG_JINJA_FAILING = Dict39({ + 'fields': { + 'feed.url': "{{ error! msg['source.fqdn'] | upper }}" + } +}) + + +def test_url_expert(): + url_expert = URLExpertBot('url', settings=BotLibSettings) + queues = url_expert.process_message(EXAMPLE_DATA_URL.copy()) + del url_expert + print(0, queues) + assert queues['output'] == [EXAMPLE_DATA_URL | {'source.fqdn': 'example.com', + 'source.port': 80, + 'source.urlpath': '/', + 'protocol.application': 'http', + 'protocol.transport': 'tcp'}] + + +def test_url_and_taxonomy(): + url_expert = URLExpertBot('url', settings=BotLibSettings) + queues_url = url_expert.process_message(EXAMPLE_DATA_URL.copy()) + del url_expert + message = queues_url['output'][0] + taxonomy_expert = TaxonomyExpertBot('taxonomy', settings=BotLibSettings) + queues = taxonomy_expert.process_message(message) + assert queues['output'] == [EXAMPLE_DATA_URL | {'source.fqdn': 'example.com', + 'source.port': 80, + 'source.urlpath': '/', + 'protocol.application': 'http', 'protocol.transport': 'tcp', + 'classification.taxonomy': 'other', 'classification.type': 'undetermined'}] + + +def test_bot_exception_import(): + """ + When a bot raises an exception during Bot initialization + """ + try: + JinjaExpertBot('jinja', settings=BotLibSettings | BOT_CONFIG_JINJA_FAILING) + except: + pass + + +if __name__ == '__main__': # pragma: no cover + unittest.main() From d27e613fd1961fd59a0fb27137a822172fd6fbc2 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Tue, 9 May 2023 15:53:01 +0200 Subject: [PATCH 22/29] Bot.process_message: support for multiple input messages --- docs/dev/library.rst | 7 ++++ intelmq/lib/bot.py | 45 ++++++++++++++++------ intelmq/lib/pipeline.py | 5 +++ intelmq/tests/lib/test_bot_library_mode.py | 25 ++++++------ 4 files changed, 60 insertions(+), 22 deletions(-) diff --git a/docs/dev/library.rst b/docs/dev/library.rst index e7826923e..8553e79f8 100644 --- a/docs/dev/library.rst +++ b/docs/dev/library.rst @@ -50,6 +50,13 @@ Inserting a message as dictionary: The return value is a dictionary of queues, e.g. the output queue and the error queue. More details below. +The methods accepts multiple messages as positional argument: + +.. code-block:: python + + domain_suffix.process_message({'source.fqdn': 'www.example.com'}, {'source.fqdn': 'www.example.net'}) + domain_suffix.process_message(*[{'source.fqdn': 'www.example.com'}, {'source.fqdn': 'www.example.net'}]) + Select the output queue (as defined in `destination_queues`), first message, access the field 'source.domain_suffix': diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index d6eb6efa4..dc3565ef1 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -1013,23 +1013,46 @@ def _create_argparser(cls): argparser.add_argument('bot_id', nargs='?', metavar='BOT-ID', help='unique bot-id of your choosing') return argparser - def process_message(self, message: Optional[libmessage.Message] = None): + def process_message(self, *messages: Union[libmessage.Message, dict]): + """ + Call the bot's process method with a prepared source queue. + Return value is a dict with the complete pipeline state. + Multiple messages can be given as positional argument. + The pipeline needs to be configured accordinglit with BotLibSettings, + see https://intelmq.readthedocs.io/en/develop/dev/library.html + + Access the output queue e.g. with return_value['output'] + """ + print('process_message, destination state:', self.__destination_pipeline.state, 'self.destination_queues', self.destination_queues) if self.bottype == BotType.COLLECTOR: - if message: - raise exceptions.InvalidArgument('Collector Bots take not messages as processing input') + if messages: + raise exceptions.InvalidArgument('Collector Bots take no messages as processing input') else: - # convert to Message object, it the input is a dict - # use a appropriate default message type, not requiring __type keys in the message - if not isinstance(message, libmessage.Message) and isinstance(message, dict): - message = libmessage.MessageFactory.from_dict(message=message, harmonization=self.harmonization, default_type=self._default_message_type) - # messages is a tuple, the pipeline can't pop from a tuple. convert to a list instead - self.__source_pipeline.state[self.source_queue].append(message) + # reset source queue + self.__source_pipeline.state[self.source_queue] = [] + for message in messages: + # convert to Message objects, it the message is a dict + # use an appropriate default message type, not requiring __type keys in the message + if not isinstance(message, libmessage.Message) and isinstance(message, dict): + message = libmessage.MessageFactory.from_dict(message=message, + harmonization=self.harmonization, + default_type=self._default_message_type) + self.__source_pipeline.state[self.source_queue].append(message) # do not dump to file self.error_dump_message = False # do not serialize messages to strings, keep the objects self.__pipeline_serialize_messages = False - self.process() - return self.__destination_pipeline.state + + # process all input messages + while self.__source_pipeline.state[self.source_queue]: + self.logger.info('source queue: %r', self.__source_pipeline.state[self.source_queue]) + self.process() + + # clear destination state, before make a copy for return + state = self.__destination_pipeline.state.copy() + self.__destination_pipeline.clear_all_queues() + + return state class ParserBot(Bot): diff --git a/intelmq/lib/pipeline.py b/intelmq/lib/pipeline.py index 4319eb260..5159c7bc5 100644 --- a/intelmq/lib/pipeline.py +++ b/intelmq/lib/pipeline.py @@ -416,6 +416,11 @@ def clear_queue(self, queue): """ Empties given queue. """ self.state[queue] = [] + def clear_all_queues(self): + """ Empties all queues / state """ + for queue in self.state: + self.state[queue] = [] + def _reject_message(self): """ No-op because of the internal queue diff --git a/intelmq/tests/lib/test_bot_library_mode.py b/intelmq/tests/lib/test_bot_library_mode.py index 54d09a09a..0e4fbefa9 100644 --- a/intelmq/tests/lib/test_bot_library_mode.py +++ b/intelmq/tests/lib/test_bot_library_mode.py @@ -18,6 +18,11 @@ from intelmq.bots.experts.jinja.expert import JinjaExpertBot EXAMPLE_DATA_URL = Dict39({'source.url': 'http://example.com/'}) +EXAMPLE_DATA_URL_OUT = EXAMPLE_DATA_URL | {'source.fqdn': 'example.com', + 'source.port': 80, + 'source.urlpath': '/', + 'protocol.application': 'http', + 'protocol.transport': 'tcp'} BOT_CONFIG_JINJA_FAILING = Dict39({ 'fields': { 'feed.url': "{{ error! msg['source.fqdn'] | upper }}" @@ -29,12 +34,7 @@ def test_url_expert(): url_expert = URLExpertBot('url', settings=BotLibSettings) queues = url_expert.process_message(EXAMPLE_DATA_URL.copy()) del url_expert - print(0, queues) - assert queues['output'] == [EXAMPLE_DATA_URL | {'source.fqdn': 'example.com', - 'source.port': 80, - 'source.urlpath': '/', - 'protocol.application': 'http', - 'protocol.transport': 'tcp'}] + assert queues['output'] == [EXAMPLE_DATA_URL_OUT] def test_url_and_taxonomy(): @@ -44,11 +44,7 @@ def test_url_and_taxonomy(): message = queues_url['output'][0] taxonomy_expert = TaxonomyExpertBot('taxonomy', settings=BotLibSettings) queues = taxonomy_expert.process_message(message) - assert queues['output'] == [EXAMPLE_DATA_URL | {'source.fqdn': 'example.com', - 'source.port': 80, - 'source.urlpath': '/', - 'protocol.application': 'http', 'protocol.transport': 'tcp', - 'classification.taxonomy': 'other', 'classification.type': 'undetermined'}] + assert queues['output'] == [Dict39(EXAMPLE_DATA_URL_OUT) | {'classification.taxonomy': 'other', 'classification.type': 'undetermined'}] def test_bot_exception_import(): @@ -61,5 +57,12 @@ def test_bot_exception_import(): pass +def test_bot_multi_message(): + url_expert = URLExpertBot('url', settings=BotLibSettings) + queues = url_expert.process_message(EXAMPLE_DATA_URL.copy(), EXAMPLE_DATA_URL.copy()) + del url_expert + assert queues['output'] == [EXAMPLE_DATA_URL_OUT] * 2 + + if __name__ == '__main__': # pragma: no cover unittest.main() From 1442e6bfcf48c58f8888765230973a4dffb67bfb Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Thu, 11 May 2023 12:24:31 +0200 Subject: [PATCH 23/29] move Dict39 to lib/datatypes --- intelmq/lib/bot.py | 20 +------------------- intelmq/lib/datatypes.py | 24 +++++++++++++++++++++--- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index dc3565ef1..55a99a76c 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -40,7 +40,7 @@ from intelmq.lib import cache, exceptions, utils from intelmq.lib.pipeline import PipelineFactory, Pipeline from intelmq.lib.utils import RewindableFileHandle, base64_decode -from intelmq.lib.datatypes import BotType +from intelmq.lib.datatypes import BotType, Dict39 __all__ = ['Bot', 'CollectorBot', 'ParserBot', 'OutputBot', 'ExpertBot'] ALLOWED_SYSTEM_PARAMETERS = {'enabled', 'run_mode', 'group', 'description', 'module', 'name'} @@ -1503,24 +1503,6 @@ class Parameters: pass -if sys.version_info < (3, 9): - class Dict39(dict): - """ - Python 3.9 introduced the handy | operator for dicts. - Tor backwards-compatibility, this is the backport - as IntelMQ supports Python >= 3.7 - """ - def __or__(self, other): - """ - Create a new dictionary with the merged keys and values of d and other, which must both be dictionaries. The values of other take priority when d and other share keys. - """ - ret = self.copy() - ret.update(other) - return ret -else: - Dict39 = dict - - BotLibSettings = Dict39({'logging_path': None, 'source_pipeline_broker': 'Pythonlistsimple', 'destination_pipeline_broker': 'Pythonlistsimple', diff --git a/intelmq/lib/datatypes.py b/intelmq/lib/datatypes.py index 7458b98ea..a4e57ecfe 100644 --- a/intelmq/lib/datatypes.py +++ b/intelmq/lib/datatypes.py @@ -1,13 +1,13 @@ -# SPDX-FileCopyrightText: 2021 Birger Schacht +# SPDX-FileCopyrightText: 2021 Birger Schacht, 2023 Bundesamt für Sicherheit in der Informationstechnik (BSI) # # SPDX-License-Identifier: AGPL-3.0-or-later -from datetime import datetime from enum import Enum from inspect import signature +from sys import version_info from typing import Optional, Callable, Union, List +from datetime import datetime from termstyle import green - from intelmq.lib.exceptions import InvalidArgument from intelmq.lib.harmonization import DateTime @@ -137,3 +137,21 @@ def validate(value: str) -> [Callable, Optional[str]]: expected=conversion_name) return conversion, format_string + + +if version_info < (3, 9): + class Dict39(dict): + """ + Python 3.9 introduced the handy | operator for dicts. + For backwards-compatibility, this is the backport + as IntelMQ supports Python >= 3.7 + """ + def __or__(self, other: dict) -> 'Dict39': + """ + Create a new dictionary with the merged keys and values of d and other, which must both be dictionaries. The values of other take priority when d and other share keys. + """ + ret = Dict39(self.copy()) + ret.update(other) + return ret +else: + Dict39 = dict From 28aed7865872dbc15a406acca6d2e94651b3b593 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Mon, 15 May 2023 17:32:26 +0200 Subject: [PATCH 24/29] bug: lib/bot_debugger: call bots with standalone flag to raise systemexit exceptions at the bot's processing end --- intelmq/lib/bot_debugger.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/intelmq/lib/bot_debugger.py b/intelmq/lib/bot_debugger.py index 4e37de54c..4853bebf4 100644 --- a/intelmq/lib/bot_debugger.py +++ b/intelmq/lib/bot_debugger.py @@ -65,7 +65,9 @@ def __init__(self, runtime_configuration, bot_id, run_subcommand=None, console_t # Set's the bot's default and initial value for the logging_level to the value we want bot.logging_level = self.logging_level - self.instance = bot(bot_id, disable_multithreading=True) + self.instance = bot(bot_id, disable_multithreading=True, + standalone=True, # instruct the bot to call SystemExit exception at the end or in case of errors + ) def run(self) -> str: if not self.run_subcommand: @@ -221,4 +223,5 @@ def pprint(msg) -> str: def __del__(self): # prevents a SystemExit Exception at object deletion # remove once PR#2358 (library mode) is merged - setattr(self.instance, 'testing', True) + if self.instance: + setattr(self.instance, 'testing', True) From 2dcf46d350520395e8967664eb3edcd6062b2459 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 24 May 2023 17:13:39 +0200 Subject: [PATCH 25/29] tst: move all library-related tests to the dedicated test file --- intelmq/tests/lib/test_bot.py | 63 ---------------------- intelmq/tests/lib/test_bot_library_mode.py | 55 +++++++++++++++++-- 2 files changed, 51 insertions(+), 67 deletions(-) diff --git a/intelmq/tests/lib/test_bot.py b/intelmq/tests/lib/test_bot.py index abbe58adb..0470ec8c2 100644 --- a/intelmq/tests/lib/test_bot.py +++ b/intelmq/tests/lib/test_bot.py @@ -6,13 +6,10 @@ """ Tests the Bot class itself. """ -import json import unittest import intelmq.lib.test as test from intelmq.tests.lib import test_parser_bot -from intelmq.lib.message import MessageFactory, Message -from intelmq.lib.bot import BotLibSettings class TestDummyParserBot(test.BotTestCase, unittest.TestCase): @@ -81,65 +78,5 @@ def test_invalid_value_on_input_message(self): self.assertEqual(self.pipe.state['test-bot-output'], []) -def send_message(self, *messages, path: str = "_default", auto_add=None, - path_permissive: bool = False): - self._sent_messages.extend(messages) - - -class TestBotAsLibrary(unittest.TestCase): - def assertMessageEqual(self, actual, expected): - """ - Compare two messages as dicts. - """ - if isinstance(actual, Message): - actual = actual.to_dict(with_type=True) - else: - actual = actual.copy() - - if isinstance(expected, Message): - expected = expected.to_dict(with_type=True) - else: - expected = expected.copy() - - if 'time.observation' in actual: - del actual['time.observation'] - if 'time.observation' in expected: - del expected['time.observation'] - if 'output' in actual: - actual['output'] = json.loads(actual['output']) - if 'output' in expected: - expected['output'] = json.loads(expected['output']) - - self.assertDictEqual(actual, expected) - - """def test_dummy_mocked(self): - bot = test_parser_bot.DummyParserBot('dummy-bot', settings={'global': {'logging_path': None, 'skip_pipeline': True, 'broker': 'pythonlist'}, 'dummy-bot': {}}) - bot._Bot__current_message = MessageFactory.from_dict(test_parser_bot.EXAMPLE_REPORT) - bot._Bot__connect_pipelines = lambda self: None - bot._sent_messages = [] - bot._dumped_messages = [] - bot.send_message = send_message.__get__(bot, test_parser_bot.DummyParserBot) - bot._dump_message = _dump_message.__get__(bot, test_parser_bot.DummyParserBot) - bot.process() - assert bot._sent_messages == [MessageFactory.from_dict(test_parser_bot.EXAMPLE_EVENT)] - assert bot._dumped_messages[0][1] == test_parser_bot.EXPECTED_DUMP[0] - assert bot._dumped_messages[1][1] == test_parser_bot.EXPECTED_DUMP[1]""" - - def test_dummy_pythonlist(self): - bot = test_parser_bot.DummyParserBot('dummy-bot', settings=BotLibSettings) - sent_messages = bot.process_message(test_parser_bot.EXAMPLE_REPORT.copy()) - self.assertMessageEqual(sent_messages['output'][0], test_parser_bot.EXAMPLE_EVENT) - self.assertMessageEqual(sent_messages['error'][0], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[0].copy(), default_type='Report')) - self.assertMessageEqual(sent_messages['error'][1], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[1].copy(), default_type='Report')) - - def test_domain_suffix(self): - from intelmq.bots.experts.domain_suffix.expert import DomainSuffixExpertBot - domain_suffix = DomainSuffixExpertBot('domain-suffix', - settings=BotLibSettings | {'field': 'fqdn', - 'suffix_file': '/usr/share/publicsuffix/public_suffix_list.dat'}) - queues = domain_suffix.process_message({'source.fqdn': 'www.example.com'}) - assert queues['output'][0]['source.domain_suffix'] == 'com' - - if __name__ == '__main__': # pragma: no cover unittest.main() diff --git a/intelmq/tests/lib/test_bot_library_mode.py b/intelmq/tests/lib/test_bot_library_mode.py index 0e4fbefa9..38908cf6b 100644 --- a/intelmq/tests/lib/test_bot_library_mode.py +++ b/intelmq/tests/lib/test_bot_library_mode.py @@ -9,13 +9,18 @@ This tests IntelMQ bots in library mode (IEP007) """ +import json import unittest +from os.path import dirname, join -from intelmq.lib.bot import Dict39, BotLibSettings - -from intelmq.bots.experts.url.expert import URLExpertBot -from intelmq.bots.experts.taxonomy.expert import TaxonomyExpertBot +import intelmq.tests.bots.experts.domain_suffix.test_expert as domain_suffix_expert_test +from intelmq.bots.experts.domain_suffix.expert import DomainSuffixExpertBot from intelmq.bots.experts.jinja.expert import JinjaExpertBot +from intelmq.bots.experts.taxonomy.expert import TaxonomyExpertBot +from intelmq.bots.experts.url.expert import URLExpertBot +from intelmq.lib.bot import BotLibSettings, Dict39 +from intelmq.lib.message import Message, MessageFactory +from intelmq.tests.lib import test_parser_bot EXAMPLE_DATA_URL = Dict39({'source.url': 'http://example.com/'}) EXAMPLE_DATA_URL_OUT = EXAMPLE_DATA_URL | {'source.fqdn': 'example.com', @@ -30,6 +35,48 @@ }) +def assertMessageEqual(actual, expected): + """ + Compare two messages as dicts. + """ + if isinstance(actual, Message): + actual = actual.to_dict(with_type=True) + else: + actual = actual.copy() + + if isinstance(expected, Message): + expected = expected.to_dict(with_type=True) + else: + expected = expected.copy() + + if 'time.observation' in actual: + del actual['time.observation'] + if 'time.observation' in expected: + del expected['time.observation'] + if 'output' in actual: + actual['output'] = json.loads(actual['output']) + if 'output' in expected: + expected['output'] = json.loads(expected['output']) + + assert actual == expected + + +def test_dummy_parser_bot(): + bot = test_parser_bot.DummyParserBot('dummy-bot', settings=BotLibSettings) + sent_messages = bot.process_message(test_parser_bot.EXAMPLE_REPORT.copy()) + assertMessageEqual(sent_messages['output'][0], test_parser_bot.EXAMPLE_EVENT) + assertMessageEqual(sent_messages['error'][0], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[0].copy(), default_type='Report')) + assertMessageEqual(sent_messages['error'][1], MessageFactory.from_dict(test_parser_bot.EXPECTED_DUMP[1].copy(), default_type='Report')) + + +def test_domain_suffix(): + domain_suffix = DomainSuffixExpertBot('domain-suffix', + settings=BotLibSettings | {'field': 'fqdn', + 'suffix_file': join(dirname(domain_suffix_expert_test.__file__), 'public_suffix_list.dat')}) + queues = domain_suffix.process_message({'source.fqdn': 'www.example.com'}) + assert queues['output'][0]['source.domain_suffix'] == 'example.com' + + def test_url_expert(): url_expert = URLExpertBot('url', settings=BotLibSettings) queues = url_expert.process_message(EXAMPLE_DATA_URL.copy()) From a0d7aa751578ec9234f43e9b731e4311e77cc014 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 24 May 2023 17:21:39 +0200 Subject: [PATCH 26/29] tst: test_bot_library: simpler tests for failing init --- intelmq/tests/lib/test_bot_library_mode.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/intelmq/tests/lib/test_bot_library_mode.py b/intelmq/tests/lib/test_bot_library_mode.py index 38908cf6b..0d3f90880 100644 --- a/intelmq/tests/lib/test_bot_library_mode.py +++ b/intelmq/tests/lib/test_bot_library_mode.py @@ -12,13 +12,13 @@ import json import unittest from os.path import dirname, join +from pytest import raises import intelmq.tests.bots.experts.domain_suffix.test_expert as domain_suffix_expert_test from intelmq.bots.experts.domain_suffix.expert import DomainSuffixExpertBot -from intelmq.bots.experts.jinja.expert import JinjaExpertBot from intelmq.bots.experts.taxonomy.expert import TaxonomyExpertBot from intelmq.bots.experts.url.expert import URLExpertBot -from intelmq.lib.bot import BotLibSettings, Dict39 +from intelmq.lib.bot import BotLibSettings, Dict39, ExpertBot from intelmq.lib.message import Message, MessageFactory from intelmq.tests.lib import test_parser_bot @@ -28,11 +28,11 @@ 'source.urlpath': '/', 'protocol.application': 'http', 'protocol.transport': 'tcp'} -BOT_CONFIG_JINJA_FAILING = Dict39({ - 'fields': { - 'feed.url': "{{ error! msg['source.fqdn'] | upper }}" - } -}) + + +class BrokenInitExpertBot(ExpertBot): + def init(self): + raise ValueError('This initialization intionally raises an error!') def assertMessageEqual(actual, expected): @@ -94,14 +94,12 @@ def test_url_and_taxonomy(): assert queues['output'] == [Dict39(EXAMPLE_DATA_URL_OUT) | {'classification.taxonomy': 'other', 'classification.type': 'undetermined'}] -def test_bot_exception_import(): +def test_bot_exception_init(): """ When a bot raises an exception during Bot initialization """ - try: - JinjaExpertBot('jinja', settings=BotLibSettings | BOT_CONFIG_JINJA_FAILING) - except: - pass + with raises(ValueError): + BrokenInitExpertBot('broken', settings=BotLibSettings) def test_bot_multi_message(): From 523b8babcc77ff93c25a4ba22ddc0d05fb00e79d Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Wed, 24 May 2023 17:42:14 +0200 Subject: [PATCH 27/29] pkg: add pytest to debian build requirements --- debian/control | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/debian/control b/debian/control index f77640cf6..4e71ade05 100644 --- a/debian/control +++ b/debian/control @@ -22,7 +22,8 @@ Build-Depends: debhelper (>= 4.1.16), python3-tz, quilt, rsync, - safe-rm + safe-rm, + python3-pytest-cov X-Python3-Version: >= 3.7 Standards-Version: 3.9.6 Homepage: https://github.com/certtools/intelmq/ From 360d058f95059ea75519235ff400d9d6380839b2 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 9 Jun 2023 10:59:26 +0200 Subject: [PATCH 28/29] lib/bot: remove debug logging statement --- intelmq/lib/bot.py | 1 - 1 file changed, 1 deletion(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index 55a99a76c..f45f3b7be 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -1045,7 +1045,6 @@ def process_message(self, *messages: Union[libmessage.Message, dict]): # process all input messages while self.__source_pipeline.state[self.source_queue]: - self.logger.info('source queue: %r', self.__source_pipeline.state[self.source_queue]) self.process() # clear destination state, before make a copy for return From 7c1baed82cf129b96aa20220257291eb8f141a07 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Fri, 9 Jun 2023 17:41:05 +0200 Subject: [PATCH 29/29] bug: bot library mode: clear internal messages before run and add test before a message is processed in library mode, clear the internal queue of the pipeline and the cached message of the bot instance. in case of leftovers, which can happen when the process method raised exception, the leftover message would take precendence over any new messages and thus keep the bot in a endless loop of wrong errors this commit also adds a unittest to cover this behaviour --- intelmq/lib/bot.py | 6 +++- intelmq/lib/pipeline.py | 2 +- intelmq/tests/lib/test_bot_library_mode.py | 34 +++++++++++++++++++++- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index f45f3b7be..5ad559050 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -1023,13 +1023,17 @@ def process_message(self, *messages: Union[libmessage.Message, dict]): Access the output queue e.g. with return_value['output'] """ - print('process_message, destination state:', self.__destination_pipeline.state, 'self.destination_queues', self.destination_queues) if self.bottype == BotType.COLLECTOR: if messages: raise exceptions.InvalidArgument('Collector Bots take no messages as processing input') else: # reset source queue self.__source_pipeline.state[self.source_queue] = [] + # reset internal queue + if self.__source_pipeline._has_message: + self.__source_pipeline.acknowledge() + self.__current_message = None + for message in messages: # convert to Message objects, it the message is a dict # use an appropriate default message type, not requiring __type keys in the message diff --git a/intelmq/lib/pipeline.py b/intelmq/lib/pipeline.py index 5159c7bc5..8d307cad1 100644 --- a/intelmq/lib/pipeline.py +++ b/intelmq/lib/pipeline.py @@ -15,7 +15,7 @@ import time from itertools import chain -from typing import Dict, Optional +from typing import Optional import ssl import redis diff --git a/intelmq/tests/lib/test_bot_library_mode.py b/intelmq/tests/lib/test_bot_library_mode.py index 0d3f90880..0b7f89a04 100644 --- a/intelmq/tests/lib/test_bot_library_mode.py +++ b/intelmq/tests/lib/test_bot_library_mode.py @@ -7,7 +7,7 @@ Software engineering by BSI & Intevation GmbH -This tests IntelMQ bots in library mode (IEP007) +This file tests IntelMQ bots in library mode (IEP007) """ import json import unittest @@ -28,6 +28,10 @@ 'source.urlpath': '/', 'protocol.application': 'http', 'protocol.transport': 'tcp'} +EXAMPLE_IP_INPUT = {"source.ip": "192.0.43.7", # icann.org. + "destination.ip": "192.0.43.8", # iana.org. + "time.observation": "2015-01-01T00:00:00+00:00", + } class BrokenInitExpertBot(ExpertBot): @@ -35,6 +39,21 @@ def init(self): raise ValueError('This initialization intionally raises an error!') +class RaisesOnFirstRunExpertBot(ExpertBot): + counter = 0 + + def init(self): + self.counter = 0 + + def process(self): + event = self.receive_message() + self.counter += 1 + if self.counter == 1: + raise ValueError('This initialization intionally raises an error!') + self.send_message(event) + self.acknowledge_message() + + def assertMessageEqual(actual, expected): """ Compare two messages as dicts. @@ -109,5 +128,18 @@ def test_bot_multi_message(): assert queues['output'] == [EXAMPLE_DATA_URL_OUT] * 2 +def test_bot_raises_and_second_message(): + """ + The first message raises an error and the second message + This test is based on an issue where the exception-raising message was not cleared from the internal message store of the Bot/Pipeline instance and thus re-used on the second run + """ + raises_on_first_run = RaisesOnFirstRunExpertBot('raises', settings=BotLibSettings) + with raises(ValueError): + raises_on_first_run.process_message(EXAMPLE_DATA_URL) + queues = raises_on_first_run.process_message(EXAMPLE_IP_INPUT) + assert len(queues['output']) == 1 + assertMessageEqual(queues['output'][0], EXAMPLE_IP_INPUT) + + if __name__ == '__main__': # pragma: no cover unittest.main()