From 5e90124ecfa85cc94d97c95c6a322c8a1200bb05 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Thu, 31 Aug 2023 10:03:19 +0200 Subject: [PATCH 01/12] ENH: Extend initdb script with separating raws --- intelmq/bin/intelmq_psql_initdb.py | 128 +++++++++++++++++++++++--- intelmq/tests/bin/test_psql_initdb.py | 118 +++++++++++++++++++++++- 2 files changed, 231 insertions(+), 15 deletions(-) diff --git a/intelmq/bin/intelmq_psql_initdb.py b/intelmq/bin/intelmq_psql_initdb.py index ff19107a7..d8101b814 100644 --- a/intelmq/bin/intelmq_psql_initdb.py +++ b/intelmq/bin/intelmq_psql_initdb.py @@ -10,11 +10,11 @@ The SQL file is saved in `/tmp/initdb.sql` or a temporary name if the other one exists. """ +import argparse import json import os import sys import tempfile -import argparse from intelmq import HARMONIZATION_CONF_FILE @@ -32,8 +32,104 @@ """ -def generate(harmonization_file=HARMONIZATION_CONF_FILE): +def _generate_events_schema(fields: dict) -> list: + sql_lines = [] + sql_lines.append("CREATE TABLE events (") + sql_lines.append(' "id" BIGSERIAL UNIQUE PRIMARY KEY,') + + for field, field_type in sorted(fields.items()): + sql_lines.append(f' "{field}" {field_type},') + + sql_lines[-1] = sql_lines[-1][:-1] # remove last ',' + sql_lines.append(");") + + for index in INDICES: + sql_lines.append('CREATE INDEX "idx_events_{0}" ON events USING btree ("{0}");'.format(index)) + return sql_lines + + +RAW_TABLE = """ +CREATE TABLE public.raws ( + event_id bigint, + raw text, + PRIMARY KEY(event_id), + CONSTRAINT raws_event_id_fkey FOREIGN KEY (event_id) REFERENCES public.events(id) ON DELETE CASCADE +); +""" + +RAW_TRIGGER = """ +CREATE TRIGGER tr_events + INSTEAD OF INSERT + ON public.v_events + FOR EACH ROW + EXECUTE FUNCTION public.process_v_events_insert(); +""" + + +def _generate_separated_raws_schema(fields: dict) -> list: + sorted_fields = sorted(key for key in fields.keys() if key != "raw") + sql_lines = ['-- Create the table holding only the "raw" values\n', RAW_TABLE] + + sql_lines.extend([ + '', + '-- Create the v_events view which joins the tables "events" and "raws"\n', + 'CREATE VIEW public.v_events AS', + ' SELECT', + ' events.id,', + ]) + for field in sorted_fields: + sql_lines.append(f' events."{field}",') + sql_lines.extend([ + ' raws."event_id",', + ' raws."raw"', + ' FROM (', + ' public.events', + ' JOIN public.raws ON ((events.id = raws.event_id)));' + ]) + + sql_lines.extend([ + '', + '-- Establish the INSERT trigger for the events table, splitting the data into events and raws', + '', + 'CREATE FUNCTION public.process_v_events_insert()', + ' RETURNS trigger', + ' LANGUAGE plpgsql', + ' AS $$', + ' DECLARE event_id integer;', + '', + ' BEGIN', + ' INSERT INTO', + ' events (', + ]) + for field in sorted_fields: + sql_lines.append(f' "{field}"{"," if field != sorted_fields[-1] else ""}') + sql_lines.extend([ + ' )', + ' VALUES', + ' (', + ]) + for field in sorted_fields: + sql_lines.append(f' NEW."{field}"{"," if field != sorted_fields[-1] else ""}') + sql_lines.extend([ + ' )', + ' RETURNING id INTO event_id;', + ' INSERT INTO', + ' raws ("event_id", "raw")', + ' VALUES', + ' (event_id, NEW.raw);', + ' RETURN NEW;', + ' END;', + '$$;' + ]) + + sql_lines.append(RAW_TRIGGER) + + return sql_lines + + +def generate(harmonization_file=HARMONIZATION_CONF_FILE, skip_events=False, separate_raws=False): FIELDS = {} + sql_lines = [] try: print("INFO - Reading %s file" % harmonization_file) @@ -75,17 +171,13 @@ def generate(harmonization_file=HARMONIZATION_CONF_FILE): FIELDS[field] = 
dbtype - initdb = """CREATE TABLE events ( - "id" BIGSERIAL UNIQUE PRIMARY KEY,""" - for field, field_type in sorted(FIELDS.items()): - initdb += f'\n "{field}" {field_type},' + if not skip_events: + sql_lines.extend(_generate_events_schema(FIELDS)) - initdb = initdb[:-1] # remove last ',' - initdb += "\n);\n" + if separate_raws: + sql_lines.extend(_generate_separated_raws_schema(FIELDS)) - for index in INDICES: - initdb += 'CREATE INDEX "idx_events_{0}" ON events USING btree ("{0}");\n'.format(index) - return initdb + return "\n".join(sql_lines) def main(): @@ -97,6 +189,16 @@ def main(): help='Defines the Ouputfile', default='/tmp/initdb.sql' ) + parser.add_argument("--no-events", action="store_true", default=False, + help="Skip generating the events table schema") + parser.add_argument("--separate-raws", action="store_true", default=False, + help="Generate v_events view to separate raw field from the rest of the data on insert") + parser.add_argument("--partition-field", default=None, + help="Add given field to all generated indexes. Useful when utilizing partitioning for TimescaleDB") + parser.add_argument("--harmonization", default=HARMONIZATION_CONF_FILE, + help="Path to the harmonization file") + parser.add_argument("--if-not-exists", default=False, + help="Add IF NOT EXISTS directive to created schemas") args = parser.parse_args() OUTPUTFILE = args.outputfile @@ -109,7 +211,9 @@ def main(): fp = os.fdopen(os_fp, 'wt') else: fp = open(OUTPUTFILE, 'w') - psql = generate() + psql = generate(args.harmonization, + skip_events=args.no_events, + separate_raws=args.separate_raws) print("INFO - Writing %s file" % OUTPUTFILE) fp.write(psql) finally: diff --git a/intelmq/tests/bin/test_psql_initdb.py b/intelmq/tests/bin/test_psql_initdb.py index 9d9151c5b..2d4e7ed78 100644 --- a/intelmq/tests/bin/test_psql_initdb.py +++ b/intelmq/tests/bin/test_psql_initdb.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2016 Sebastian Wagner +# SPDX-FileCopyrightText: 2016 Sebastian Wagner, 2023 CERT.at GmbH # # SPDX-License-Identifier: AGPL-3.0-or-later @@ -8,12 +8,15 @@ @author: sebastian """ +import json import os +import re +import tempfile import unittest import pkg_resources -import intelmq.bin.intelmq_psql_initdb as psql_initd +import intelmq.bin.intelmq_psql_initdb as psql_initdb class TestPsqlInit(unittest.TestCase): @@ -21,13 +24,122 @@ class TestPsqlInit(unittest.TestCase): A TestCase for intelmq_psql_initdb. """ + def setUp(self) -> None: + super().setUp() + + self.tempdir = tempfile.TemporaryDirectory() + self.addCleanup(self.tempdir.cleanup) + + self.harmonization_path = f"{self.tempdir.name}/harmonization.conf" + self._create_simple_harmonization() + + def _create_simple_harmonization(self): + simple_harmonization = { + "event": { + "classification.identifier": { + "type": "String" + }, + "time.source": { + "type": "DateTime" + }, + "raw": { + "type": "Base64" + } + } + } + with open(self.harmonization_path, "w+") as f: + json.dump(simple_harmonization, f) + def test_output(self): """ Compare output to cached one. 
""" with open(os.path.join(os.path.dirname(__file__), 'initdb.sql')) as handle: expected = handle.read() fname = pkg_resources.resource_filename('intelmq', 'etc/harmonization.conf') - self.assertEqual(psql_initd.generate(fname).strip(), expected.strip()) + self.assertEqual(psql_initdb.generate(fname).strip(), expected.strip()) + + def test_generating_events_schema(self): + expected_table = """ + CREATE TABLE events ( + "id" BIGSERIAL UNIQUE PRIMARY KEY, + "classification.identifier" text, + "raw" text, + "time.source" timestamp with time zone + ); + """ + expected_table = self._normalize_leading_whitespaces(expected_table) + expected_indexes = [ + """CREATE INDEX "idx_events_classification.identifier" ON events USING btree ("classification.identifier");""", + """CREATE INDEX "idx_events_time.source" ON events USING btree ("time.source");""" + ] + generated = psql_initdb.generate(self.harmonization_path) + + self.assertTrue(self._normalize_leading_whitespaces(generated).startswith(expected_table)) + + for index in expected_indexes: + self.assertIn(index, generated) + + def test_skip_generating_events_table_schema(self): + generated = psql_initdb.generate(self.harmonization_path, skip_events=True) + + self.assertNotIn("CREATE TABLE events", generated) + self.assertNotIn("CREATE INDEX", generated) + + def test_separated_raws_view_schema(self): + expected_view = """ + CREATE VIEW public.v_events AS + SELECT + events.id, + events."classification.identifier", + events."time.source", + raws."event_id", + raws."raw" + FROM ( + public.events + JOIN public.raws ON ((events.id = raws.event_id))); + """ + generated = psql_initdb.generate(self.harmonization_path, separate_raws=True) + generated = self._normalize_leading_whitespaces(generated) + self.assertIn("CREATE TABLE public.raws", generated) # static schema, check if added + self.assertIn(self._normalize_leading_whitespaces(expected_view), generated) + + def test_separated_raws_trigger(self): + expected_function = """ + CREATE FUNCTION public.process_v_events_insert() + RETURNS trigger + LANGUAGE plpgsql + AS $$ + DECLARE event_id integer; + + BEGIN + INSERT INTO + events ( + "classification.identifier", + "time.source" + ) + VALUES + ( + NEW."classification.identifier", + NEW."time.source" + ) + RETURNING id INTO event_id; + INSERT INTO + raws ("event_id", "raw") + VALUES + (event_id, NEW.raw); + RETURN NEW; + END; + $$; + """ + + generated = psql_initdb.generate(self.harmonization_path, separate_raws=True) + generated = self._normalize_leading_whitespaces(generated) + self.assertIn(self._normalize_leading_whitespaces(expected_function), generated) + self.assertIn("CREATE TRIGGER tr_events", generated) # Static, check if added + + @staticmethod + def _normalize_leading_whitespaces(data: str) -> str: + return re.sub(r"^(\s)*", " ", data.strip(), flags=re.MULTILINE) if __name__ == '__main__': # pragma: no cover From 8b0688074d0c47546ff5a06e9dca02b55d96d1a8 Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Thu, 31 Aug 2023 13:34:32 +0200 Subject: [PATCH 02/12] ENH: Partitioning and rerun in schema generation Those changes improves the capabilities of the database schema creation. It's now easier to switch to the TimescaleDB or use partitioning in PostgreSQL. Creating scripts can be generated from the harmonisation, as previously just the most basic version. If requested, creation commands get IF NOT EXISTS or OR REPLACE directives to allow re-runing script. 
--- CHANGELOG.md | 1 + intelmq/bin/intelmq_psql_initdb.py | 67 ++++++++++++++++----------- intelmq/tests/bin/test_psql_initdb.py | 34 ++++++++++++++ 3 files changed, 76 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bf9d9bec..db7e006f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ CHANGELOG ### Tests ### Tools + - `intelmq_psql_initdb` got support for providing custom harmonization file, generating view for storing `raw` fields separately, and adding `IF NOT EXISTS`/`OR REPLACE` clauses (PR by Kamil Mankowski). ### Contrib diff --git a/intelmq/bin/intelmq_psql_initdb.py b/intelmq/bin/intelmq_psql_initdb.py index d8101b814..24fccfa5e 100644 --- a/intelmq/bin/intelmq_psql_initdb.py +++ b/intelmq/bin/intelmq_psql_initdb.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2015 Sebastian Wagner +# SPDX-FileCopyrightText: 2015 Sebastian Wagner, 2023 CERT.at GmbH # # SPDX-License-Identifier: AGPL-3.0-or-later @@ -32,33 +32,33 @@ """ -def _generate_events_schema(fields: dict) -> list: +def _generate_events_schema(fields: dict, partition_key: str = None) -> list: sql_lines = [] - sql_lines.append("CREATE TABLE events (") - sql_lines.append(' "id" BIGSERIAL UNIQUE PRIMARY KEY,') + sql_lines.append("CREATE TABLE{if_not_exist} events (") + sql_lines.append(f' "id" BIGSERIAL{" UNIQUE PRIMARY KEY" if not partition_key else ""},') for field, field_type in sorted(fields.items()): sql_lines.append(f' "{field}" {field_type},') - sql_lines[-1] = sql_lines[-1][:-1] # remove last ',' + if not partition_key: + sql_lines[-1] = sql_lines[-1][:-1] # remove last ',' + else: + sql_lines.append(f' PRIMARY KEY ("id", "{partition_key}")') sql_lines.append(");") for index in INDICES: - sql_lines.append('CREATE INDEX "idx_events_{0}" ON events USING btree ("{0}");'.format(index)) + sql_lines.append(f'CREATE INDEX{{if_not_exist}} "idx_events_{index}" ON events USING btree ("{index}");') return sql_lines -RAW_TABLE = """ -CREATE TABLE public.raws ( +RAW_TABLE_PART = """ +CREATE TABLE{if_not_exist} public.raws ( event_id bigint, raw text, - PRIMARY KEY(event_id), - CONSTRAINT raws_event_id_fkey FOREIGN KEY (event_id) REFERENCES public.events(id) ON DELETE CASCADE -); -""" + PRIMARY KEY(event_id)""" RAW_TRIGGER = """ -CREATE TRIGGER tr_events +CREATE{or_replace} TRIGGER tr_events INSTEAD OF INSERT ON public.v_events FOR EACH ROW @@ -66,14 +66,18 @@ def _generate_events_schema(fields: dict) -> list: """ -def _generate_separated_raws_schema(fields: dict) -> list: +def _generate_separated_raws_schema(fields: dict, partition_key: str) -> list: sorted_fields = sorted(key for key in fields.keys() if key != "raw") - sql_lines = ['-- Create the table holding only the "raw" values\n', RAW_TABLE] + sql_lines = ['', '-- Create the table holding only the "raw" values', RAW_TABLE_PART] + if not partition_key: + sql_lines[-1] += "," + sql_lines.append(" CONSTRAINT raws_event_id_fkey FOREIGN KEY (event_id) REFERENCES public.events(id) ON DELETE CASCADE") + sql_lines.append(");") sql_lines.extend([ '', '-- Create the v_events view which joins the tables "events" and "raws"\n', - 'CREATE VIEW public.v_events AS', + 'CREATE{or_replace} VIEW public.v_events AS', ' SELECT', ' events.id,', ]) @@ -91,7 +95,7 @@ def _generate_separated_raws_schema(fields: dict) -> list: '', '-- Establish the INSERT trigger for the events table, splitting the data into events and raws', '', - 'CREATE FUNCTION public.process_v_events_insert()', + 'CREATE{or_replace} FUNCTION public.process_v_events_insert()', ' RETURNS 
trigger', ' LANGUAGE plpgsql', ' AS $$', @@ -127,7 +131,8 @@ def _generate_separated_raws_schema(fields: dict) -> list: return sql_lines -def generate(harmonization_file=HARMONIZATION_CONF_FILE, skip_events=False, separate_raws=False): +def generate(harmonization_file=HARMONIZATION_CONF_FILE, skip_events=False, + separate_raws=False, partition_key=None, skip_or_replace=False): FIELDS = {} sql_lines = [] @@ -172,12 +177,17 @@ def generate(harmonization_file=HARMONIZATION_CONF_FILE, skip_events=False, sepa FIELDS[field] = dbtype if not skip_events: - sql_lines.extend(_generate_events_schema(FIELDS)) + sql_lines.extend(_generate_events_schema(FIELDS, partition_key)) if separate_raws: - sql_lines.extend(_generate_separated_raws_schema(FIELDS)) + sql_lines.extend(_generate_separated_raws_schema(FIELDS, partition_key)) - return "\n".join(sql_lines) + existing_clause = " IF NOT EXISTS" if skip_or_replace else "" + replace_clause = " OR REPLACE" if skip_or_replace else "" + + return "\n".join( + line.format(if_not_exist=existing_clause, or_replace=replace_clause) for line in sql_lines + ) def main(): @@ -193,12 +203,14 @@ def main(): help="Skip generating the events table schema") parser.add_argument("--separate-raws", action="store_true", default=False, help="Generate v_events view to separate raw field from the rest of the data on insert") - parser.add_argument("--partition-field", default=None, - help="Add given field to all generated indexes. Useful when utilizing partitioning for TimescaleDB") + parser.add_argument("--partition-key", default=None, + help=("Add given field to the primary key of the events table. " + "Useful when utilizing partitioning for TimescaleDB. " + "If combined with --separate-raws, the v_events does not get foreign key")) parser.add_argument("--harmonization", default=HARMONIZATION_CONF_FILE, help="Path to the harmonization file") - parser.add_argument("--if-not-exists", default=False, - help="Add IF NOT EXISTS directive to created schemas") + parser.add_argument("--skip-or-replace", default=False, action="store_true", + help="Add IF NOT EXISTS or REPLACE directive to created schemas") args = parser.parse_args() OUTPUTFILE = args.outputfile @@ -213,7 +225,10 @@ def main(): fp = open(OUTPUTFILE, 'w') psql = generate(args.harmonization, skip_events=args.no_events, - separate_raws=args.separate_raws) + separate_raws=args.separate_raws, + partition_key=args.partition_key, + skip_or_replace=args.skip_or_replace, + ) print("INFO - Writing %s file" % OUTPUTFILE) fp.write(psql) finally: diff --git a/intelmq/tests/bin/test_psql_initdb.py b/intelmq/tests/bin/test_psql_initdb.py index 2d4e7ed78..284664bcd 100644 --- a/intelmq/tests/bin/test_psql_initdb.py +++ b/intelmq/tests/bin/test_psql_initdb.py @@ -101,6 +101,7 @@ def test_separated_raws_view_schema(self): generated = psql_initdb.generate(self.harmonization_path, separate_raws=True) generated = self._normalize_leading_whitespaces(generated) self.assertIn("CREATE TABLE public.raws", generated) # static schema, check if added + self.assertIn("CONSTRAINT raws_event_id_fkey", generated) self.assertIn(self._normalize_leading_whitespaces(expected_view), generated) def test_separated_raws_trigger(self): @@ -137,6 +138,39 @@ def test_separated_raws_trigger(self): self.assertIn(self._normalize_leading_whitespaces(expected_function), generated) self.assertIn("CREATE TRIGGER tr_events", generated) # Static, check if added + def test_partition_field(self): + """For paritioned table """ + expected_table = """ + CREATE TABLE events ( + 
"id" BIGSERIAL, + "classification.identifier" text, + "raw" text, + "time.source" timestamp with time zone, + PRIMARY KEY ("id", "time.source") + ); + """ + generated = psql_initdb.generate(self.harmonization_path, partition_key="time.source", + separate_raws=True) + generated = self._normalize_leading_whitespaces(generated) + self.assertIn(self._normalize_leading_whitespaces(expected_table), generated) + # Foreign key may not be supported on partitioned or Timescale tables, skipping + self.assertNotIn("CONSTRAINT raws_event_id_fkey", generated) + + def test_skip_or_replace(self): + expected_creates = [ + "CREATE TABLE IF NOT EXISTS public.raws", + "CREATE TABLE IF NOT EXISTS events", + 'CREATE INDEX IF NOT EXISTS "idx_events_classification.identifier"', + # Do not support IF NOT EXISTS: + "CREATE OR REPLACE TRIGGER tr_events", + "CREATE OR REPLACE FUNCTION public.process_v_events_insert()", + "CREATE OR REPLACE VIEW public.v_events", + ] + generated = psql_initdb.generate(self.harmonization_path, separate_raws=True, + skip_or_replace=True) + for create in expected_creates: + self.assertIn(create, generated) + @staticmethod def _normalize_leading_whitespaces(data: str) -> str: return re.sub(r"^(\s)*", " ", data.strip(), flags=re.MULTILINE) From df0d984af0d139a83f56e824bbf8c480ecbd57f5 Mon Sep 17 00:00:00 2001 From: kamil-certat <117654481+kamil-certat@users.noreply.github.com> Date: Tue, 5 Sep 2023 15:11:59 +0200 Subject: [PATCH 03/12] Update CHANGELOG.md Co-authored-by: Sebastian --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index db7e006f7..69fc9b011 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,7 +34,7 @@ CHANGELOG ### Tests ### Tools - - `intelmq_psql_initdb` got support for providing custom harmonization file, generating view for storing `raw` fields separately, and adding `IF NOT EXISTS`/`OR REPLACE` clauses (PR by Kamil Mankowski). + - `intelmq_psql_initdb` got support for providing custom harmonization file, generating view for storing `raw` fields separately, and adding `IF NOT EXISTS`/`OR REPLACE` clauses ([PR#2404](https://github.com/certtools/intelmq/pull/2404) by Kamil Mankowski). ### Contrib From 67d47308bfc217c225a71f39ced5a887c77df4fe Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Tue, 5 Sep 2023 16:11:13 +0200 Subject: [PATCH 04/12] Adjust documentation. 
Remove obsolete scripts --- contrib/eventdb/separate-raws-table.sql | 305 ------------------------ docs/user/eventdb.rst | 62 ++++- intelmq/bin/intelmq_psql_initdb.py | 5 +- 3 files changed, 54 insertions(+), 318 deletions(-) delete mode 100644 contrib/eventdb/separate-raws-table.sql diff --git a/contrib/eventdb/separate-raws-table.sql b/contrib/eventdb/separate-raws-table.sql deleted file mode 100644 index 6510630ad..000000000 --- a/contrib/eventdb/separate-raws-table.sql +++ /dev/null @@ -1,305 +0,0 @@ --- SPDX-FileCopyrightText: 2021 Sebastian Wagner --- --- SPDX-License-Identifier: AGPL-3.0-or-later - --- Create the table holding only the "raw" values: - -CREATE TABLE public.raws ( - event_id bigint, - raw text -); - -ALTER TABLE - public.raws OWNER TO intelmq; - -CREATE INDEX idx_raws_event_id ON public.raws USING btree (event_id); - -ALTER TABLE - ONLY public.raws -ADD - CONSTRAINT raws_event_id_fkey FOREIGN KEY (event_id) REFERENCES public.events(id) ON DELETE CASCADE; - --- Create the v_events view which joins the tables "events" and "raws" - -CREATE VIEW public.v_events AS - SELECT events.id, - events."classification.identifier", - events."classification.taxonomy", - events."classification.type", - events."comment", - events."destination.abuse_contact", - events."destination.account", - events."destination.allocated", - events."destination.asn", - events."destination.as_name", - events."destination.domain_suffix", - events."destination.fqdn", - events."destination.geolocation.cc", - events."destination.geolocation.city", - events."destination.geolocation.country", - events."destination.geolocation.latitude", - events."destination.geolocation.longitude", - events."destination.geolocation.region", - events."destination.geolocation.state", - events."destination.ip", - events."destination.local_hostname", - events."destination.local_ip", - events."destination.network", - events."destination.port", - events."destination.registry", - events."destination.reverse_dns", - events."destination.tor_node", - events."destination.url", - events."destination.urlpath", - events."event_description.target", - events."event_description.text", - events."event_description.url", - events."event_hash", - events."extra", - events."feed.accuracy", - events."feed.code", - events."feed.documentation", - events."feed.name", - events."feed.provider", - events."feed.url", - events."malware.hash", - events."malware.hash.md5", - events."malware.hash.sha1", - events."malware.hash.sha256", - events."malware.name", - events."malware.version", - events."misp.attribute_uuid", - events."misp.event_uuid", - events."protocol.application", - events."protocol.transport", - events."rtir_id", - events."screenshot_url", - events."source.abuse_contact", - events."source.account", - events."source.allocated", - events."source.asn", - events."source.as_name", - events."source.domain_suffix", - events."source.fqdn", - events."source.geolocation.cc", - events."source.geolocation.city", - events."source.geolocation.country", - events."source.geolocation.cymru_cc", - events."source.geolocation.geoip_cc", - events."source.geolocation.latitude", - events."source.geolocation.longitude", - events."source.geolocation.region", - events."source.geolocation.state", - events."source.ip", - events."source.local_hostname", - events."source.local_ip", - events."source.network", - events."source.port", - events."source.registry", - events."source.reverse_dns", - events."source.tor_node", - events."source.url", - events."source.urlpath", - 
events."status", - events."time.observation", - events."time.source", - events."tlp", - raws."event_id", - raws."raw" - FROM ( - public.events - JOIN public.raws ON ((events.id = raws.event_id))); - --- Establish the INSERT trigger for the events table, splitting the data into events and raws - -CREATE FUNCTION public.process_v_events_insert() - RETURNS trigger - LANGUAGE plpgsql - AS $$ - DECLARE event_id integer; - - BEGIN - -- add all data except raw to events - INSERT INTO - events ( - "classification.identifier", - "classification.taxonomy", - "classification.type", - "comment", - "destination.abuse_contact", - "destination.account", - "destination.allocated", - "destination.asn", - "destination.as_name", - "destination.domain_suffix", - "destination.fqdn", - "destination.geolocation.cc", - "destination.geolocation.city", - "destination.geolocation.country", - "destination.geolocation.latitude", - "destination.geolocation.longitude", - "destination.geolocation.region", - "destination.geolocation.state", - "destination.ip", - "destination.local_hostname", - "destination.local_ip", - "destination.network", - "destination.port", - "destination.registry", - "destination.reverse_dns", - "destination.tor_node", - "destination.url", - "destination.urlpath", - "event_description.target", - "event_description.text", - "event_description.url", - "event_hash", - "extra", - "feed.accuracy", - "feed.code", - "feed.documentation", - "feed.name", - "feed.provider", - "feed.url", - "malware.hash", - "malware.hash.md5", - "malware.hash.sha1", - "malware.name", - "malware.version", - "misp.attribute_uuid", - "misp.event_uuid", - "protocol.application", - "protocol.transport", - "rtir_id", - "screenshot_url", - "source.abuse_contact", - "source.account", - "source.allocated", - "source.asn", - "source.as_name", - "source.domain_suffix", - "source.fqdn", - "source.geolocation.cc", - "source.geolocation.city", - "source.geolocation.country", - "source.geolocation.cymru_cc", - "source.geolocation.geoip_cc", - "source.geolocation.latitude", - "source.geolocation.longitude", - "source.geolocation.region", - "source.geolocation.state", - "source.ip", - "source.local_hostname", - "source.local_ip", - "source.network", - "source.port", - "source.registry", - "source.reverse_dns", - "source.tor_node", - "source.url", - "source.urlpath", - "status", - "time.observation", - "time.source", - "tlp" - ) - VALUES - ( - NEW."classification.identifier", - NEW."classification.taxonomy", - NEW."classification.type", - NEW."comment", - NEW."destination.abuse_contact", - NEW."destination.account", - NEW."destination.allocated", - NEW."destination.asn", - NEW."destination.as_name", - NEW."destination.domain_suffix", - NEW."destination.fqdn", - NEW."destination.geolocation.cc", - NEW."destination.geolocation.city", - NEW."destination.geolocation.country", - NEW."destination.geolocation.latitude", - NEW."destination.geolocation.longitude", - NEW."destination.geolocation.region", - NEW."destination.geolocation.state", - NEW."destination.ip", - NEW."destination.local_hostname", - NEW."destination.local_ip", - NEW."destination.network", - NEW."destination.port", - NEW."destination.registry", - NEW."destination.reverse_dns", - NEW."destination.tor_node", - NEW."destination.url", - NEW."destination.urlpath", - NEW."event_description.target", - NEW."event_description.text", - NEW."event_description.url", - NEW."event_hash", - NEW."extra", - NEW."feed.accuracy", - NEW."feed.code", - NEW."feed.documentation", - NEW."feed.name", - 
NEW."feed.provider", - NEW."feed.url", - NEW."malware.hash", - NEW."malware.hash.md5", - NEW."malware.hash.sha1", - NEW."malware.name", - NEW."malware.version", - NEW."misp.attribute_uuid", - NEW."misp.event_uuid", - NEW."protocol.application", - NEW."protocol.transport", - NEW."rtir_id", - NEW."screenshot_url", - NEW."source.abuse_contact", - NEW."source.account", - NEW."source.allocated", - NEW."source.asn", - NEW."source.as_name", - NEW."source.domain_suffix", - NEW."source.fqdn", - NEW."source.geolocation.cc", - NEW."source.geolocation.city", - NEW."source.geolocation.country", - NEW."source.geolocation.cymru_cc", - NEW."source.geolocation.geoip_cc", - NEW."source.geolocation.latitude", - NEW."source.geolocation.longitude", - NEW."source.geolocation.region", - NEW."source.geolocation.state", - NEW."source.ip", - NEW."source.local_hostname", - NEW."source.local_ip", - NEW."source.network", - NEW."source.port", - NEW."source.registry", - NEW."source.reverse_dns", - NEW."source.tor_node", - NEW."source.url", - NEW."source.urlpath", - NEW."status", - NEW."time.observation", - NEW."time.source", - NEW."tlp" - ) RETURNING id INTO event_id; - - -- add the raw value to raws, link with the event_id - INSERT INTO - raws ("event_id", "raw") - VALUES - (event_id, NEW.raw); - - RETURN NEW; - - END; -$$; - -CREATE TRIGGER tr_events - INSTEAD OF INSERT - ON public.v_events - FOR EACH ROW - EXECUTE FUNCTION public.process_v_events_insert(); diff --git a/docs/user/eventdb.rst b/docs/user/eventdb.rst index 971094e6c..fcae69a06 100644 --- a/docs/user/eventdb.rst +++ b/docs/user/eventdb.rst @@ -8,22 +8,42 @@ EventDB The EventDB is not a software itself. -The EventDB is a database (usually `PostgreSQL `_) that gets filled with with data from IntelMQ using the :ref:`intelmq.bots.outputs.sql.output` Output Bot. +The EventDB is a database (usually `PostgreSQL `_) that gets filled with with data +from IntelMQ using the :ref:`intelmq.bots.outputs.sql.output` Output Bot. Its core is the `events` +table with the structure corresponding to the :doc:`IntelMQ Data Format `. +Having the table created is required to use the EventDB. + +.. _intelmq_psql_initdb: ----------------------- -The events table itself +intelmq_psql_initdb ----------------------- -IntelMQ comes with the ``intelmq_psql_initdb`` command line tool. It creates an SQL file containing: +IntelMQ comes with the ``intelmq_psql_initdb`` command line tool designed to help with creating the +EventDB. It creates in the first line: - A ``CREATE TABLE events`` statement with all valid IntelMQ fields as columns and correct types - Several indexes as examples for a good read & search performance -All elements of this SQL file can be adapted and extended before running the SQL file against a database, especially the indexes. +Having an `events` table as outlined in the SQL file, IntelMQ's :ref:`intelmq.bots.outputs.sql.output` +Output Bot can write all received events into this database table. + +In addition, the script supports some additional features supporting use cases described later in +this document: + +- ``--partition-key`` - for generating schema aligned with :ref:`TimescaleDB ` + or partitioned tables, +- ``--separate-raws`` - for generating views and triggers needed to :ref:`eventdb_raws_table` + (works also together with adjustments for partitioning). + +For full list of supported parameters, call the script help using ``-h`` parameter. 
-Having an `events` table as outlined in the SQL file, IntelMQ's :ref:`intelmq.bots.outputs.sql.output` Output Bot can write all received events into this database table. +All elements of generated SQL file can be adapted and extended before running the SQL file against +a database, especially the indexes. Please review the generated script before applying. -This events table is the core of the so-called EventDB and also required by all other sections of this document. +Be aware that if you create tables using another DB user that used later by the output bot, you may +need to adjust ownership or privileges in the database. If you have problems with database permissions, +refer to `PostgreSQL documentation `. ----------------- EventDB Utilities @@ -77,11 +97,13 @@ By modifying the configuration file it is possible to configure various queries :alt: EventDB Statistics Example +.. _timescaledb: + ------------------------------- Using EventDB with Timescale DB ------------------------------- -`Timescale DB `_ is a PostgreSQL extension to add time-series support, which is quite handy as you dont have to learn other syntaxes as you already know. You can use the SQL Queries as before, the extension will handle the rest. +`Timescale DB `_ is a PostgreSQL extension to add time-series support, which is quite handy as you don't have to learn other syntaxes as you already know. You can use the SQL Queries as before, the extension will handle the rest. To see all limitations, please check the `Timescale DB Documentation `_. What is time-series? @@ -91,10 +113,28 @@ Time-series has been invented as traditional database design like relational or A big benefit of time-series instead of other database designs over a time-based search pattern is the performance. As IntelMQ uses data based upon time, this design is awesome & will give you a performance boost. -How to setup ------------- +How to choose the time column? +------------------------------ + +To utilize the time-series, you need to choose a column containing the right time. This is then +used by you for manual queries and graphs, but also by the database itself for organizing the data. + +The :doc:`IntelMQ Data Format ` has two fields that can be used for this: +``time.source`` or ``time.observation``. Depending on your needs (tracking when event occurred or when +was detected, if different) choose one of them. + +You can use :ref:`intelmq_psql_initdb` tool to generate SQL schema valid for TimescaleDB by passing +the partitioning key: + +.. code-block:: bash + + intelmq_psql_initdb --partition-key "time.source" + +How to setup? +------------- + +Thanks to TimescaleDB it's very easy to setup. -Thanks to TimescaleDB its very easy to setup. 1. Choose your preferred `Timescale DB `_ environment & follow the installation instructions. 2. Now lets create a `hypertable `_, which is the timescale DB time-series structure. ``SELECT create_hypertable('', 'time.source');``. 3. Now our hypertable is setup & timescaleDB takes care of the rest. You can perform queries as usual, for further information please check `Timescale DB Documentation `_. @@ -132,5 +172,5 @@ The last steps brings us several advantages: - No code changes are needed in the IntelMQ output bot or your own scripts. A migration is seamless. - PostgreSQL itself ensures that the data of both tables is consistent and linked correctly. -The complete SQL script can be found in the `contrib/eventdb `_ directory of IntelMQ. +The complete SQL script can be generated using :ref:`intelmq_psql_initdb`. 
It does *not* cover step 2 to avoid accidental data loss - you need to do this step manually. diff --git a/intelmq/bin/intelmq_psql_initdb.py b/intelmq/bin/intelmq_psql_initdb.py index 24fccfa5e..34a55a380 100644 --- a/intelmq/bin/intelmq_psql_initdb.py +++ b/intelmq/bin/intelmq_psql_initdb.py @@ -204,8 +204,9 @@ def main(): parser.add_argument("--separate-raws", action="store_true", default=False, help="Generate v_events view to separate raw field from the rest of the data on insert") parser.add_argument("--partition-key", default=None, - help=("Add given field to the primary key of the events table. " - "Useful when utilizing partitioning for TimescaleDB. " + help=("Add given field to the primary key of the events table to allow " + "partitioning in the database. Useful especially for setups with " + "TimescaleDB, see IntelMQ documentation for more advices. " "If combined with --separate-raws, the v_events does not get foreign key")) parser.add_argument("--harmonization", default=HARMONIZATION_CONF_FILE, help="Path to the harmonization file") From 836610946ebed1b272f21915cc550e78c20d1605 Mon Sep 17 00:00:00 2001 From: kamil-certat <117654481+kamil-certat@users.noreply.github.com> Date: Wed, 6 Sep 2023 15:24:12 +0200 Subject: [PATCH 05/12] Update docs/user/eventdb.rst Co-authored-by: Sebastian --- docs/user/eventdb.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user/eventdb.rst b/docs/user/eventdb.rst index fcae69a06..64ccaa348 100644 --- a/docs/user/eventdb.rst +++ b/docs/user/eventdb.rst @@ -8,7 +8,7 @@ EventDB The EventDB is not a software itself. -The EventDB is a database (usually `PostgreSQL `_) that gets filled with with data +The EventDB is a database (usually `PostgreSQL `_) that gets filled with data from IntelMQ using the :ref:`intelmq.bots.outputs.sql.output` Output Bot. Its core is the `events` table with the structure corresponding to the :doc:`IntelMQ Data Format `. Having the table created is required to use the EventDB. From ec297866d42047da617f5340eca3f2fea3b7ae49 Mon Sep 17 00:00:00 2001 From: kamil-certat <117654481+kamil-certat@users.noreply.github.com> Date: Wed, 6 Sep 2023 15:24:19 +0200 Subject: [PATCH 06/12] Update docs/user/eventdb.rst Co-authored-by: Sebastian --- docs/user/eventdb.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user/eventdb.rst b/docs/user/eventdb.rst index 64ccaa348..afb70691b 100644 --- a/docs/user/eventdb.rst +++ b/docs/user/eventdb.rst @@ -36,7 +36,7 @@ this document: - ``--separate-raws`` - for generating views and triggers needed to :ref:`eventdb_raws_table` (works also together with adjustments for partitioning). -For full list of supported parameters, call the script help using ``-h`` parameter. +For a full list of supported parameters, call the script help using ``-h`` parameter. All elements of generated SQL file can be adapted and extended before running the SQL file against a database, especially the indexes. Please review the generated script before applying. 
From accb93ee43188f1ab12c653e4928a52154965d53 Mon Sep 17 00:00:00 2001 From: kamil-certat <117654481+kamil-certat@users.noreply.github.com> Date: Wed, 6 Sep 2023 15:24:27 +0200 Subject: [PATCH 07/12] Update docs/user/eventdb.rst Co-authored-by: Sebastian --- docs/user/eventdb.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user/eventdb.rst b/docs/user/eventdb.rst index afb70691b..7969b75f7 100644 --- a/docs/user/eventdb.rst +++ b/docs/user/eventdb.rst @@ -41,7 +41,7 @@ For a full list of supported parameters, call the script help using ``-h`` param All elements of generated SQL file can be adapted and extended before running the SQL file against a database, especially the indexes. Please review the generated script before applying. -Be aware that if you create tables using another DB user that used later by the output bot, you may +Be aware that if you create tables using another DB user that is used later by the output bot, you may need to adjust ownership or privileges in the database. If you have problems with database permissions, refer to `PostgreSQL documentation `. From e60cf7185d3beae17432e679b804cce61b0bc2a2 Mon Sep 17 00:00:00 2001 From: kamil-certat <117654481+kamil-certat@users.noreply.github.com> Date: Wed, 6 Sep 2023 15:24:34 +0200 Subject: [PATCH 08/12] Update docs/user/eventdb.rst Co-authored-by: Sebastian --- docs/user/eventdb.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user/eventdb.rst b/docs/user/eventdb.rst index 7969b75f7..e3af87640 100644 --- a/docs/user/eventdb.rst +++ b/docs/user/eventdb.rst @@ -38,7 +38,7 @@ this document: For a full list of supported parameters, call the script help using ``-h`` parameter. -All elements of generated SQL file can be adapted and extended before running the SQL file against +All elements of the generated SQL file can be adapted and extended before running the SQL file against a database, especially the indexes. Please review the generated script before applying. Be aware that if you create tables using another DB user that is used later by the output bot, you may From 8f9883b464d5ec6fb04d8f40c9b0b28b2eb41817 Mon Sep 17 00:00:00 2001 From: kamil-certat <117654481+kamil-certat@users.noreply.github.com> Date: Wed, 6 Sep 2023 15:26:40 +0200 Subject: [PATCH 09/12] Apply suggestions from code review Co-authored-by: Sebastian --- docs/user/eventdb.rst | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/user/eventdb.rst b/docs/user/eventdb.rst index e3af87640..b3704cd0a 100644 --- a/docs/user/eventdb.rst +++ b/docs/user/eventdb.rst @@ -116,14 +116,14 @@ As IntelMQ uses data based upon time, this design is awesome & will give you a p How to choose the time column? ------------------------------ -To utilize the time-series, you need to choose a column containing the right time. This is then -used by you for manual queries and graphs, but also by the database itself for organizing the data. +To utilize the time-series, choose a column containing the right time. This is then +used by you for manual queries and graphs, and also by the database itself for organizing the data. The :doc:`IntelMQ Data Format ` has two fields that can be used for this: -``time.source`` or ``time.observation``. Depending on your needs (tracking when event occurred or when -was detected, if different) choose one of them. +``time.source`` or ``time.observation``. Depending on your needs (tracking when the event occurred or when +it was detected, if different), choose one of them. 
-You can use :ref:`intelmq_psql_initdb` tool to generate SQL schema valid for TimescaleDB by passing +You can use the :ref:`intelmq_psql_initdb` tool to generate SQL schema valid for TimescaleDB by passing the partitioning key: .. code-block:: bash @@ -133,7 +133,7 @@ the partitioning key: How to setup? ------------- -Thanks to TimescaleDB it's very easy to setup. +Thanks to TimescaleDB, it's very easy to setup. 1. Choose your preferred `Timescale DB `_ environment & follow the installation instructions. 2. Now lets create a `hypertable `_, which is the timescale DB time-series structure. ``SELECT create_hypertable('', 'time.source');``. From e7c12350973beeeb53137dddb03892a2c8073e63 Mon Sep 17 00:00:00 2001 From: Jan Kaliszewski Date: Wed, 20 Sep 2023 21:02:48 +0200 Subject: [PATCH 10/12] STOMP bots: several updates, fixes and improvements + some refactoring The updates, fixes and improvements regard the *STOMP collector* and *STOMP output* bots. Important changes are described below... From now on, newer versions of the `stomp.py` package are supported -- including the latest (8.1.0). Now both STOMP bots coerce the `port` configuration parameter to int -- so that a string representing an integer number is also acceptable (even if not recommended) as a value of that parameter. In the *STOMP output* bot, a bug has been fixed: `AttributeError` caused by attempts to get unset attributes (`ssl_ca_cert` and companions...). The *STOMP collector*'s reconnection mechanism has been fixed: from now on, no reconnection attempts are made after `shutdown()`. Apart from that, reconnection is not attempted at all for versions of `stomp.py` older than 4.1.21 (as it did not work properly anyway). Also regarding the *STOMP collector* bot, the following (undocumented and unused) attributes of `StompCollectorBot` instances are no longer set in `init()`: `ssl_ca_cert`, `ssl_cl_cert`, `ssl_cl_cert_key`. Various checks have been improved/enhanced. Now, for example, both STOMP bot classes implement the `check()` static/class method -- whose role is to check ("statically", without the need to run the bot) configuration parameters; in particular, it checks whether necessary certificate files are accessible. When it comes to runtime (on-initialization) checks, one notable improvement is that now also the *STOMP output* bot will raise a `MissingDependencyError` if the `stomp.py` version is older than 4.1.8 (an analogous check has already been implemented by *STOMP collector*). The code of those bot classes have also been significantly refactored -- in particular, several common operations have been factored out and placed in a new mix-in class: `intelmq.lib.mixins.StompMixin`; its definition resides in a new module: `intelmq.lib.mixins.stomp`. 
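
For illustration only (not part of the change itself), the wiring pattern this refactoring enables can be sketched as follows; the class below is schematic, only the `StompMixin` method names are real:

    from intelmq.lib.bot import OutputBot
    from intelmq.lib.mixins import StompMixin

    class ExampleStompBot(OutputBot, StompMixin):

        def init(self):
            # raises MissingDependencyError/ValueError early if the stomp.py
            # dependency or the configured certificate files are missing
            self.stomp_bot_runtime_initial_check()
            # a configured stomp.Connection plus the kwargs for connect()
            self._conn, self._connect_kwargs = self.prepare_stomp_connection()
            self._conn.connect(**self._connect_kwargs)

        @classmethod
        def check(cls, parameters):
            # static parameter validation, usable without running the bot
            return cls.stomp_bot_parameters_check(parameters) or None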
--- intelmq/bots/collectors/stomp/collector.py | 83 ++++++------ intelmq/bots/outputs/stomp/output.py | 33 ++--- intelmq/lib/mixins/__init__.py | 3 +- intelmq/lib/mixins/stomp.py | 145 +++++++++++++++++++++ 4 files changed, 202 insertions(+), 62 deletions(-) create mode 100644 intelmq/lib/mixins/stomp.py diff --git a/intelmq/bots/collectors/stomp/collector.py b/intelmq/bots/collectors/stomp/collector.py index 0c9f60456..610a39fa8 100644 --- a/intelmq/bots/collectors/stomp/collector.py +++ b/intelmq/bots/collectors/stomp/collector.py @@ -3,13 +3,13 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # -*- coding: utf-8 -*- -import os.path from intelmq.lib.bot import CollectorBot -from intelmq.lib.exceptions import MissingDependencyError +from intelmq.lib.mixins import StompMixin try: import stomp + import stomp.exception except ImportError: stomp = None else: @@ -18,9 +18,10 @@ class StompListener(stomp.PrintingListener): the stomp listener gets called asynchronously for every STOMP message """ - def __init__(self, n6stompcollector, conn, destination): + def __init__(self, n6stompcollector, conn, destination, connect_kwargs=None): self.stompbot = n6stompcollector self.conn = conn + self.connect_kwargs = connect_kwargs self.destination = destination super().__init__() if stomp.__version__ >= (5, 0, 0): @@ -29,15 +30,23 @@ def __init__(self, n6stompcollector, conn, destination): def on_heartbeat_timeout(self): self.stompbot.logger.info("Heartbeat timeout. Attempting to re-connect.") - connect_and_subscribe(self.conn, self.stompbot.logger, self.destination) - - def on_error(self, headers, message): - self.stompbot.logger.error('Received an error: %r.', message) - - def on_message(self, headers, message): - self.stompbot.logger.debug('Receive message %r...', message[:500]) + if self.stompbot._auto_reconnect: + connect_and_subscribe(self.conn, self.stompbot.logger, self.destination, + connect_kwargs=self.connect_kwargs) + + def on_error(self, frame, body=None): + if body is None: + # `stomp.py >= 6.1.0` + body = frame.body + self.stompbot.logger.error('Received an error: %r.', body) + + def on_message(self, frame, body=None): + if body is None: + # `stomp.py >= 6.1.0` + body = frame.body + self.stompbot.logger.debug('Receive message %r...', body[:500]) report = self.stompbot.new_report() - report.add("raw", message.rstrip()) + report.add("raw", body.rstrip()) report.add("feed.url", "stomp://" + self.stompbot.server + ":" + str(self.stompbot.port) + @@ -46,19 +55,23 @@ def on_message(self, headers, message): def on_disconnected(self): self.stompbot.logger.debug('Detected disconnect') - connect_and_subscribe(self.conn, self.stompbot.logger, self.destination) + if self.stompbot._auto_reconnect: + connect_and_subscribe(self.conn, self.stompbot.logger, self.destination, + connect_kwargs=self.connect_kwargs) -def connect_and_subscribe(conn, logger, destination, start=False): +def connect_and_subscribe(conn, logger, destination, start=False, connect_kwargs=None): if start: conn.start() - conn.connect(wait=True) + if connect_kwargs is None: + connect_kwargs = dict(wait=True) + conn.connect(**connect_kwargs) conn.subscribe(destination=destination, id=1, ack='auto') logger.info('Successfully connected and subscribed.') -class StompCollectorBot(CollectorBot): +class StompCollectorBot(CollectorBot, StompMixin): """Collect data from a STOMP Interface""" """ main class for the STOMP protocol collector """ exchange: str = '' @@ -73,36 +86,22 @@ class StompCollectorBot(CollectorBot): __conn = False # define here so 
shutdown method can check for it def init(self): - if stomp is None: - raise MissingDependencyError("stomp") - elif stomp.__version__ < (4, 1, 8): - raise MissingDependencyError("stomp", version="4.1.8", - installed=stomp.__version__) - - self.ssl_ca_cert = self.ssl_ca_certificate - self.ssl_cl_cert = self.ssl_client_certificate - self.ssl_cl_cert_key = self.ssl_client_certificate_key - - # check if certificates exist - for f in [self.ssl_ca_cert, self.ssl_cl_cert, self.ssl_cl_cert_key]: - if not os.path.isfile(f): - raise ValueError("Could not open file %r." % f) - - _host = [(self.server, self.port)] - self.__conn = stomp.Connection(host_and_ports=_host, use_ssl=True, - ssl_key_file=self.ssl_cl_cert_key, - ssl_cert_file=self.ssl_cl_cert, - ssl_ca_certs=self.ssl_ca_cert, - heartbeats=(self.heartbeat, - self.heartbeat)) - - self.__conn.set_listener('', StompListener(self, self.__conn, self.exchange)) + self.stomp_bot_runtime_initial_check() + + # (note: older versions of `stomp.py` do not play well with reconnects) + self._auto_reconnect = (stomp.__version__ >= (4, 1, 21)) + + self.__conn, connect_kwargs = self.prepare_stomp_connection() + self.__conn.set_listener('', StompListener(self, self.__conn, self.exchange, + connect_kwargs=connect_kwargs)) connect_and_subscribe(self.__conn, self.logger, self.exchange, - start=stomp.__version__ < (4, 1, 20)) + start=stomp.__version__ < (4, 1, 20), + connect_kwargs=connect_kwargs) def shutdown(self): if not stomp or not self.__conn: return + self._auto_reconnect = False try: self.__conn.disconnect() except stomp.exception.NotConnectedException: @@ -111,5 +110,9 @@ def shutdown(self): def process(self): pass + @classmethod + def check(cls, parameters): + return cls.stomp_bot_parameters_check(parameters) or None + BOT = StompCollectorBot diff --git a/intelmq/bots/outputs/stomp/output.py b/intelmq/bots/outputs/stomp/output.py index 3b5e30d6e..c491db45f 100644 --- a/intelmq/bots/outputs/stomp/output.py +++ b/intelmq/bots/outputs/stomp/output.py @@ -3,11 +3,9 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # -*- coding: utf-8 -*- -import os.path from intelmq.lib.bot import OutputBot -from intelmq.lib.exceptions import MissingDependencyError - +from intelmq.lib.mixins import StompMixin try: import stomp @@ -15,7 +13,7 @@ stomp = None -class StompOutputBot(OutputBot): +class StompOutputBot(OutputBot, StompMixin): """Send events to a STMOP server""" """ main class for the STOMP protocol output bot """ exchange: str = "/exchange/_push" @@ -35,29 +33,18 @@ class StompOutputBot(OutputBot): _conn = None def init(self): - if stomp is None: - raise MissingDependencyError("stomp") - - # check if certificates exist - for f in [self.ssl_ca_cert, self.ssl_cl_cert, self.ssl_cl_cert_key]: - if not os.path.isfile(f): - raise ValueError("Could not open SSL (certificate) file '%s'." 
% f) - - _host = [(self.server, self.port)] - self._conn = stomp.Connection(host_and_ports=_host, use_ssl=True, - ssl_key_file=self.ssl_cl_cert_key, - ssl_cert_file=self.ssl_cl_cert, - ssl_ca_certs=self.ssl_ca_cert, - heartbeats=(self.heartbeat, - self.heartbeat)) + self.stomp_bot_runtime_initial_check() + (self._conn, + self._connect_kwargs) = self.prepare_stomp_connection() self.connect() def connect(self): self.logger.debug('Connecting.') # based on the documentation at: # https://github.com/jasonrbriggs/stomp.py/wiki/Simple-Example - self._conn.start() - self._conn.connect(wait=True) + if stomp.__version__ < (4, 1, 20): + self._conn.start() + self._conn.connect(**self._connect_kwargs) self.logger.debug('Connected.') def shutdown(self): @@ -73,5 +60,9 @@ def process(self): destination=self.exchange) self.acknowledge_message() + @classmethod + def check(cls, parameters): + return cls.stomp_bot_parameters_check(parameters) or None + BOT = StompOutputBot diff --git a/intelmq/lib/mixins/__init__.py b/intelmq/lib/mixins/__init__.py index d33313548..664ae8a02 100644 --- a/intelmq/lib/mixins/__init__.py +++ b/intelmq/lib/mixins/__init__.py @@ -5,5 +5,6 @@ from intelmq.lib.mixins.http import HttpMixin from intelmq.lib.mixins.cache import CacheMixin from intelmq.lib.mixins.sql import SQLMixin +from intelmq.lib.mixins.stomp import StompMixin -__all__ = ['HttpMixin', 'CacheMixin', 'SQLMixin'] +__all__ = ['HttpMixin', 'CacheMixin', 'SQLMixin', 'StompMixin'] diff --git a/intelmq/lib/mixins/stomp.py b/intelmq/lib/mixins/stomp.py new file mode 100644 index 000000000..29b7175b8 --- /dev/null +++ b/intelmq/lib/mixins/stomp.py @@ -0,0 +1,145 @@ +""" StompMixin for IntelMQ + +SPDX-FileCopyrightText: 2017 Sebastian Wagner, 2023 NASK +SPDX-License-Identifier: AGPL-3.0-or-later +""" + +from typing import ( + Any, + Callable, + List, + NoReturn, + Tuple, +) + +try: + import stomp +except ImportError: + stomp = None + +from intelmq.lib.exceptions import MissingDependencyError + + +class StompMixin: + + """A mixin that provides certain common methods for STOMP bots.""" + + # + # STOMP bot attributes relevant to this mixin + + server: str + port: int + heartbeat: int + + ssl_ca_certificate: str # TODO: could be pathlib.Path + ssl_client_certificate: str # TODO: could be pathlib.Path + ssl_client_certificate_key: str # TODO: could be patlib.Path + + # + # Helper methods intended to be used in subclasses + + @classmethod + def stomp_bot_parameters_check(cls, parameters: dict) -> List[List[str]]: + """Intended to be used in bots' `check()` static/class method.""" + logs = [] + cls.__verify_parameters( + get_param=parameters.get, + on_error=lambda msg: logs.append(['error', msg]), + ) + return logs + + def stomp_bot_runtime_initial_check(self) -> None: + """Intended to be used in bots' `init()` instance method.""" + self.__verify_dependency() + self.__verify_parameters( + get_param=self.__get_own_attribute, + on_error=self.__raise_value_error, + ) + + def prepare_stomp_connection(self) -> Tuple['stomp.Connection', dict]: + """ + Get a `(, )` pair. + + * `` is a new instance of `stomp.Connection`, + with the SSL stuff *already configured*, but *without* any + invocations of `connect()` made yet; + + * `` is a dict of arguments -- ready + to be passed to the `connect()` method of the aforementioned + `` object. 
+ """ + ssl_kwargs, connect_kwargs = self.__get_ssl_and_connect_kwargs() + host_and_ports = [(self.server, int(self.port))] + stomp_connection = stomp.Connection(host_and_ports=host_and_ports, + heartbeats=(self.heartbeat, + self.heartbeat)) + stomp_connection.set_ssl(host_and_ports, **ssl_kwargs) + return stomp_connection, connect_kwargs + + # + # Implementation details + + _DEPENDENCY_NAME_REMARK = ( + "Note that the actual name of the pip-installable " + "distribution package is 'stomp.py', not 'stomp'.") + + @classmethod + def __verify_dependency(cls) -> None: + # Note: the pip-installable package's name is 'stomp.py', but + # e.g. the apt-installable package's name is 'python3-stomp' (or + # similar) -- that's why we pass to the `MissingDependencyError` + # constructor the name 'stomp', but also pass the value of the + # `_DEPENDENCY_NAME_REMARK` constant as `additional_text`... + if stomp is None: + raise MissingDependencyError('stomp', + additional_text=cls._DEPENDENCY_NAME_REMARK) + if stomp.__version__ < (4, 1, 8): + raise MissingDependencyError('stomp', version="4.1.8", + installed=stomp.__version__, + additional_text=cls._DEPENDENCY_NAME_REMARK) + + @classmethod + def __verify_parameters(cls, + get_param: Callable[[str], Any], + on_error: Callable[[str], None]) -> None: + for param_name in [ + 'ssl_ca_certificate', + 'ssl_client_certificate', + 'ssl_client_certificate_key', + ]: + cls.__verify_file_param(param_name, get_param, on_error) + + @classmethod + def __verify_file_param(cls, + param_name: str, + get_param: Callable[[str], Any], + on_error: Callable[[str], None]) -> None: + path = get_param(param_name) + if path is None: + on_error(f"Parameter {param_name!r} is not given " + f"(or is set to None).") + return + try: + open(path, 'rb').close() + except OSError as exc: + # (note: the filename is mentioned in the included exc message) + on_error(f"Cannot open file specified as parameter " + f"{param_name!r} ({str(exc)!r}).") + + def __get_own_attribute(self, param_name: str) -> Any: + return getattr(self, param_name, None) + + def __raise_value_error(self, msg: str) -> NoReturn: + raise ValueError(msg) + + def __get_ssl_and_connect_kwargs(self) -> Tuple[dict, dict]: + # Note: the `ca_certs` argument to `set_ssl()` must always be + # provided, otherwise the `stomp.py`'s machinery would *not* + # perform any certificate verification! + ssl_kwargs = dict( + ca_certs=self.ssl_ca_certificate, + cert_file=self.ssl_client_certificate, + key_file=self.ssl_client_certificate_key, + ) + connect_kwargs = dict(wait=True) + return ssl_kwargs, connect_kwargs From 700182cd98f23a94c5dd63b40c404b2d850b1110 Mon Sep 17 00:00:00 2001 From: Jan Kaliszewski Date: Wed, 20 Sep 2023 21:04:32 +0200 Subject: [PATCH 11/12] STOMP bots: add support for authentication by `username` + `password` Each of the *STOMP collector* and *STOMP output* bots obtained the following new configuration parameters: * `auth_by_ssl_client_certificate` (a Boolean flag; it is `True` by default -- to keep backward compatibility); * `username` and `password` -- to be used as STOMP authentication credentials (login and passcode), but *only* if the aforementioned parameter `auth_by_ssl_client_certificate` is `False`. If `auth_by_ssl_client_certificate` is `False`, then the (supported also previously...) `ssl_client_certificate` and `ssl_client_certificate_key` parameters are ignored (i.e., not only left unused, but also there are *no checks* whether the files they refer to actually exist). 
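
For illustration only (not part of the change itself), a collector configured for the new login/passcode mode could be checked statically like this; all values below are placeholders (the `exchange`, `server` and `port` values merely mirror the documented examples and defaults):

    from intelmq.bots.collectors.stomp.collector import StompCollectorBot

    parameters = {
        'server': 'n6stream.cert.pl',
        'port': 61614,
        'exchange': '/exchange/my.org/*.*.*.*',
        'auth_by_ssl_client_certificate': False,  # switch to login/passcode auth
        'username': 'my-login',                   # placeholder credentials
        'password': 'my-api-key',
        # With the flag set to False, only the CA certificate path is still
        # verified; the client certificate/key parameters are not required.
        'ssl_ca_certificate': '/etc/intelmq/ca.pem',  # placeholder path
    }

    # None means no problems were found; otherwise a list of
    # ['error', message] entries is returned.
    print(StompCollectorBot.check(parameters))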
--- intelmq/bots/collectors/stomp/collector.py | 3 ++ intelmq/bots/outputs/stomp/output.py | 3 ++ intelmq/lib/mixins/stomp.py | 47 +++++++++++++++++----- 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/intelmq/bots/collectors/stomp/collector.py b/intelmq/bots/collectors/stomp/collector.py index 610a39fa8..86ebee40c 100644 --- a/intelmq/bots/collectors/stomp/collector.py +++ b/intelmq/bots/collectors/stomp/collector.py @@ -77,6 +77,9 @@ class StompCollectorBot(CollectorBot, StompMixin): exchange: str = '' port: int = 61614 server: str = "n6stream.cert.pl" + auth_by_ssl_client_certificate: bool = True + username: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true + password: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true ssl_ca_certificate: str = 'ca.pem' # TODO pathlib.Path ssl_client_certificate: str = 'client.pem' # TODO pathlib.Path ssl_client_certificate_key: str = 'client.key' # TODO pathlib.Path diff --git a/intelmq/bots/outputs/stomp/output.py b/intelmq/bots/outputs/stomp/output.py index c491db45f..a28de3f4e 100644 --- a/intelmq/bots/outputs/stomp/output.py +++ b/intelmq/bots/outputs/stomp/output.py @@ -26,6 +26,9 @@ class StompOutputBot(OutputBot, StompMixin): port: int = 61614 server: str = "127.0.0.1" # TODO: could be ip address single_key: bool = False + auth_by_ssl_client_certificate: bool = True + username: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true + password: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true ssl_ca_certificate: str = 'ca.pem' # TODO: could be pathlib.Path ssl_client_certificate: str = 'client.pem' # TODO: pathlib.Path ssl_client_certificate_key: str = 'client.key' # TODO: patlib.Path diff --git a/intelmq/lib/mixins/stomp.py b/intelmq/lib/mixins/stomp.py index 29b7175b8..41cbd29cb 100644 --- a/intelmq/lib/mixins/stomp.py +++ b/intelmq/lib/mixins/stomp.py @@ -31,6 +31,11 @@ class StompMixin: port: int heartbeat: int + auth_by_ssl_client_certificate: bool + + username: str # to be ignored if `auth_by_ssl_client_certificate` is true + password: str # to be ignored if `auth_by_ssl_client_certificate` is true + ssl_ca_certificate: str # TODO: could be pathlib.Path ssl_client_certificate: str # TODO: could be pathlib.Path ssl_client_certificate_key: str # TODO: could be patlib.Path @@ -102,13 +107,29 @@ def __verify_dependency(cls) -> None: def __verify_parameters(cls, get_param: Callable[[str], Any], on_error: Callable[[str], None]) -> None: - for param_name in [ - 'ssl_ca_certificate', - 'ssl_client_certificate', - 'ssl_client_certificate_key', - ]: + file_param_names = ['ssl_ca_certificate'] + if cls.__should_cert_auth_params_be_verified(get_param, on_error): + file_param_names.extend([ + 'ssl_client_certificate', + 'ssl_client_certificate_key', + ]) + for param_name in file_param_names: cls.__verify_file_param(param_name, get_param, on_error) + @classmethod + def __should_cert_auth_params_be_verified(cls, + get_param: Callable[[str], Any], + on_error: Callable[[str], None]) -> bool: + flag = get_param('auth_by_ssl_client_certificate') + if not isinstance(flag, bool): + # Let us better be strict here -- explicitly rejecting any + # non-`bool` values as potentially misleading (e.g., consider + # a string like "false", which would be interpreted as True). 
+ on_error(f"Parameter 'auth_by_ssl_client_certificate' " + f"is not set to a bool value (got: {flag!r}).") + flag = False + return flag + @classmethod def __verify_file_param(cls, param_name: str, @@ -136,10 +157,16 @@ def __get_ssl_and_connect_kwargs(self) -> Tuple[dict, dict]: # Note: the `ca_certs` argument to `set_ssl()` must always be # provided, otherwise the `stomp.py`'s machinery would *not* # perform any certificate verification! - ssl_kwargs = dict( - ca_certs=self.ssl_ca_certificate, - cert_file=self.ssl_client_certificate, - key_file=self.ssl_client_certificate_key, - ) + ssl_kwargs = dict(ca_certs=self.ssl_ca_certificate) connect_kwargs = dict(wait=True) + if self.auth_by_ssl_client_certificate: + ssl_kwargs.update( + cert_file=self.ssl_client_certificate, + key_file=self.ssl_client_certificate_key, + ) + else: + connect_kwargs.update( + username=self.username, + passcode=self.password, + ) return ssl_kwargs, connect_kwargs From 7c59d49f2e8291fb46f5411a88d6fec9501336c9 Mon Sep 17 00:00:00 2001 From: Jan Kaliszewski Date: Wed, 20 Sep 2023 21:06:48 +0200 Subject: [PATCH 12/12] doc: add/update/improve stuff related to STOMP bots and *n6* The changes include also those regarding *feeds* (values of certain properties of the CERT.PL's "N6 Stomp Stream" feed entry have been updated/improved) and the *changelog*. --- CHANGELOG.md | 39 +++++++++++++++++++++++++++++++++++ docs/user/bots.rst | 20 +++++++++++------- docs/user/n6-integrations.rst | 3 +-- intelmq/etc/feeds.yaml | 11 +++++----- 4 files changed, 58 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0c13e242..72d950193 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,10 +11,22 @@ CHANGELOG ------------------ ### Configuration +- Add new optional configuration parameters for `intelmq.bots.collectors.stomp.collector` + and `intelmq.bots.outputs.stomp.output` (PR#2408 by Jan Kaliszewski): + - `auth_by_ssl_client_certificate` (Boolean, default: *true*; if *false* then + `ssl_client_certificate` and `ssl_client_certificate_key` will be ignored); + - `username` (STOMP authentication login, default: "guest"; to be used only + if `auth_by_ssl_client_certificate` is *false*); + - `password` (STOMP authentication passcode, default: "guest"; to be used only + if `auth_by_ssl_client_certificate` is *false*). ### Core - `intelmq.lib.message`: For invalid message keys, add a hint on the failure to the exception: not allowed by configuration or not matching regular expression (PR#2398 by Sebastian Wagner). - `intelmq.lib.exceptions.InvalidKey`: Add optional parameter `additional_text` (PR#2398 by Sebastian Wagner). +- `intelmq.lib.mixins`: Add a new class, `StompMixin` (defined in a new submodule: `stomp`), + which provides certain common STOMP-bot-specific operations, factored out from + `intelmq.bots.collectors.stomp.collector` and `intelmq.bots.outputs.stomp.output` + (PR#2408 by Jan Kaliszewski). ### Development @@ -22,15 +34,42 @@ CHANGELOG ### Bots #### Collectors +- `intelmq.bots.collectors.stomp.collector` (PR#2408 by Jan Kaliszewski): + - Add support for authentication based on STOMP login and passcode, + introducing 3 new configuration parameters (see above: *Configuration*). + - Update the code to support new versions of `stomp.py`, including the latest (`8.1.0`); + fixes [#2342](https://github.com/certtools/intelmq/issues/2342). + - Fix the reconnection behavior: do not attempt to reconnect after `shutdown`. 
Also, + never attempt to reconnect if the version of `stomp.py` is older than `4.1.21` (it + did not work properly anyway). + - Add coercion of the `port` config parameter to `int`. + - Add implementation of the `check` hook (verifying, in particular, accessibility + of necessary file(s)). + - Remove undocumented and unused attributes of `StompCollectorBot` instances: + `ssl_ca_cert`, `ssl_cl_cert`, `ssl_cl_cert_key`. + - Minor fixes/improvements and some refactoring (see also above: *Core*...). #### Parsers #### Experts #### Outputs +- `intelmq.bots.outputs.stomp.output` (PR#2408 by Jan Kaliszewski): + - Add support for authentication based on STOMP login and passcode, + introducing 3 new configuration parameters (see above: *Configuration*). + - Update the code to support new versions of `stomp.py`, including the latest (`8.1.0`). + - Fix `AttributeError` caused by attempts to get unset attributes of `StompOutputBot` + (`ssl_ca_cert` et consortes). + - Add coercion of the `port` config parameter to `int`. + - Add implementation of the `check` hook (verifying, in particular, accessibility + of necessary file(s)). + - Add `stomp.py` version check (raise `MissingDependencyError` if not `>=4.1.8`). + - Minor fixes/improvements and some refactoring (see also above: *Core*...). ### Documentation - Add a readthedocs configuration file to fix the build fail (PR#2403 by Sebastian Wagner). +- Update/fix/improve the stuff related to the STOMP bots and integration with the *n6*'s + Stream API (PR#2408 by Jan Kaliszewski). ### Packaging diff --git a/docs/user/bots.rst b/docs/user/bots.rst index 8e8f36396..2fbe27df8 100644 --- a/docs/user/bots.rst +++ b/docs/user/bots.rst @@ -945,12 +945,15 @@ Install the `stomp.py` library from PyPI: **Configuration Parameters** * **Feed parameters** (see above) -* `exchange`: exchange point +* `exchange`: STOMP *destination* to subscribe to, e.g. "/exchange/my.org/*.*.*.*" * `port`: 61614 -* `server`: hostname e.g. "n6stream.cert.pl" +* `server`: hostname, e.g. "n6stream.cert.pl" * `ssl_ca_certificate`: path to CA file -* `ssl_client_certificate`: path to client cert file -* `ssl_client_certificate_key`: path to client cert key file +* `auth_by_ssl_client_certificate`: Boolean, default: true (note: set to false for new *n6* auth) +* `ssl_client_certificate`: path to client cert file, used only if `auth_by_ssl_client_certificate` is true +* `ssl_client_certificate_key`: path to client cert key file, used only if `auth_by_ssl_client_certificate` is true +* `username`: STOMP *login* (e.g., *n6* user login), used only if `auth_by_ssl_client_certificate` is false +* `password`: STOMP *passcode* (e.g., *n6* user API key), used only if `auth_by_ssl_client_certificate` is false .. _intelmq.bots.collectors.twitter.collector_twitter: @@ -4305,7 +4308,7 @@ Also you will need a so called "exchange point". **Configuration Parameters** -* `exchange`: The exchange to push at +* `exchange`: STOMP *destination* to push at, e.g. "/exchange/_push" * `heartbeat`: default: 60000 * `message_hierarchical_output`: Boolean, default: false * `message_jsondict_as_string`: Boolean, default: false @@ -4314,8 +4317,11 @@ Also you will need a so called "exchange point". 
* `server`: Host or IP address of the STOMP server * `single_key`: Boolean or string (field name), default: false * `ssl_ca_certificate`: path to CA file -* `ssl_client_certificate`: path to client cert file -* `ssl_client_certificate_key`: path to client cert key file +* `auth_by_ssl_client_certificate`: Boolean, default: true (note: set to false for new *n6* auth) +* `ssl_client_certificate`: path to client cert file, used only if `auth_by_ssl_client_certificate` is true +* `ssl_client_certificate_key`: path to client cert key file, used only if `auth_by_ssl_client_certificate` is true +* `username`: STOMP *login* (e.g., *n6* user login), used only if `auth_by_ssl_client_certificate` is false +* `password`: STOMP *passcode* (e.g., *n6* user API key), used only if `auth_by_ssl_client_certificate` is false .. _intelmq.bots.outputs.tcp.output: diff --git a/docs/user/n6-integrations.rst b/docs/user/n6-integrations.rst index e4dc6ce9c..681678224 100644 --- a/docs/user/n6-integrations.rst +++ b/docs/user/n6-integrations.rst @@ -11,10 +11,9 @@ n6 is maintained and developed by `CERT.pl `_. Information about n6 can be found here: -- Website: `n6.cert.pl `_ +- Website: `cert.pl/en/n6 `_ - Source Code: `github.com/CERT-Polska/n6 `_ - n6 documentation: `n6.readthedocs.io `_ -- n6sdk developer documentation: `n6sdk.readthedocs.io `_ .. image:: /_static/n6/n6-schemat2.png :alt: n6 schema diff --git a/intelmq/etc/feeds.yaml b/intelmq/etc/feeds.yaml index 72e1fb825..f79de2dfb 100644 --- a/intelmq/etc/feeds.yaml +++ b/intelmq/etc/feeds.yaml @@ -1158,20 +1158,19 @@ providers: module: intelmq.bots.collectors.stomp.collector parameters: exchange: "{insert your exchange point as given by CERT.pl}" - ssl_client_certificate_key: "{insert path to client cert key file for - CERT.pl's n6}" ssl_ca_certificate: "{insert path to CA file for CERT.pl's n6}" + auth_by_ssl_client_certificate: false + username: "{insert n6 user's login}" + password: "{insert n6 user's API key}" port: '61614' - ssl_client_certificate: "{insert path to client cert file for CERTpl's - n6}" server: n6stream.cert.pl name: __FEED__ provider: __PROVIDER__ parser: module: intelmq.bots.parsers.n6.parser_n6stomp parameters: - revision: 2018-01-20 - documentation: https://n6.cert.pl/en/ + revision: 2023-09-23 + documentation: https://n6.readthedocs.io/usage/streamapi/ public: false AlienVault: OTX: