diff --git a/CHANGELOG.md b/CHANGELOG.md index e9eebdb34..b636e6354 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,11 +11,23 @@ CHANGELOG ------------------ ### Configuration +- Add new optional configuration parameters for `intelmq.bots.collectors.stomp.collector` + and `intelmq.bots.outputs.stomp.output` (PR#2408 by Jan Kaliszewski): + - `auth_by_ssl_client_certificate` (Boolean, default: *true*; if *false* then + `ssl_client_certificate` and `ssl_client_certificate_key` will be ignored); + - `username` (STOMP authentication login, default: "guest"; to be used only + if `auth_by_ssl_client_certificate` is *false*); + - `password` (STOMP authentication passcode, default: "guest"; to be used only + if `auth_by_ssl_client_certificate` is *false*). ### Core - `intelmq.lib.message`: For invalid message keys, add a hint on the failure to the exception: not allowed by configuration or not matching regular expression (PR#2398 by Sebastian Wagner). - `intelmq.lib.exceptions.InvalidKey`: Add optional parameter `additional_text` (PR#2398 by Sebastian Wagner). - Change the way we discover bots to allow easy extending based on the entry point name. (PR#2413 by Kamil Mankowski) +- `intelmq.lib.mixins`: Add a new class, `StompMixin` (defined in a new submodule: `stomp`), + which provides certain common STOMP-bot-specific operations, factored out from + `intelmq.bots.collectors.stomp.collector` and `intelmq.bots.outputs.stomp.output` + (PR#2408 by Jan Kaliszewski). ### Development @@ -23,22 +35,50 @@ CHANGELOG ### Bots #### Collectors +- `intelmq.bots.collectors.stomp.collector` (PR#2408 by Jan Kaliszewski): + - Add support for authentication based on STOMP login and passcode, + introducing 3 new configuration parameters (see above: *Configuration*). + - Update the code to support new versions of `stomp.py`, including the latest (`8.1.0`); + fixes [#2342](https://github.com/certtools/intelmq/issues/2342). 
+ - Fix the reconnection behavior: do not attempt to reconnect after `shutdown`. Also, + never attempt to reconnect if the version of `stomp.py` is older than `4.1.21` (it + did not work properly anyway). + - Add coercion of the `port` config parameter to `int`. + - Add implementation of the `check` hook (verifying, in particular, accessibility + of necessary file(s)). + - Remove undocumented and unused attributes of `StompCollectorBot` instances: + `ssl_ca_cert`, `ssl_cl_cert`, `ssl_cl_cert_key`. + - Minor fixes/improvements and some refactoring (see also above: *Core*...). #### Parsers #### Experts #### Outputs +- `intelmq.bots.outputs.stomp.output` (PR#2408 by Jan Kaliszewski): + - Add support for authentication based on STOMP login and passcode, + introducing 3 new configuration parameters (see above: *Configuration*). + - Update the code to support new versions of `stomp.py`, including the latest (`8.1.0`). + - Fix `AttributeError` caused by attempts to get unset attributes of `StompOutputBot` + (`ssl_ca_cert` et consortes). + - Add coercion of the `port` config parameter to `int`. + - Add implementation of the `check` hook (verifying, in particular, accessibility + of necessary file(s)). + - Add `stomp.py` version check (raise `MissingDependencyError` if not `>=4.1.8`). + - Minor fixes/improvements and some refactoring (see also above: *Core*...). ### Documentation - Add a readthedocs configuration file to fix the build fail (PR#2403 by Sebastian Wagner). - Add a guide of developing extensions packages (PR#2413 by Kamil Mankowski) +- Update/fix/improve the stuff related to the STOMP bots and integration with the *n6*'s + Stream API (PR#2408 by Jan Kaliszewski). ### Packaging ### Tests ### Tools + - `intelmq_psql_initdb` got support for providing custom harmonization file, generating view for storing `raw` fields separately, and adding `IF NOT EXISTS`/`OR REPLACE` clauses ([PR#2404](https://github.com/certtools/intelmq/pull/2404) by Kamil Mankowski). 
### Contrib diff --git a/contrib/eventdb/separate-raws-table.sql b/contrib/eventdb/separate-raws-table.sql deleted file mode 100644 index 6510630ad..000000000 --- a/contrib/eventdb/separate-raws-table.sql +++ /dev/null @@ -1,305 +0,0 @@ --- SPDX-FileCopyrightText: 2021 Sebastian Wagner --- --- SPDX-License-Identifier: AGPL-3.0-or-later - --- Create the table holding only the "raw" values: - -CREATE TABLE public.raws ( - event_id bigint, - raw text -); - -ALTER TABLE - public.raws OWNER TO intelmq; - -CREATE INDEX idx_raws_event_id ON public.raws USING btree (event_id); - -ALTER TABLE - ONLY public.raws -ADD - CONSTRAINT raws_event_id_fkey FOREIGN KEY (event_id) REFERENCES public.events(id) ON DELETE CASCADE; - --- Create the v_events view which joins the tables "events" and "raws" - -CREATE VIEW public.v_events AS - SELECT events.id, - events."classification.identifier", - events."classification.taxonomy", - events."classification.type", - events."comment", - events."destination.abuse_contact", - events."destination.account", - events."destination.allocated", - events."destination.asn", - events."destination.as_name", - events."destination.domain_suffix", - events."destination.fqdn", - events."destination.geolocation.cc", - events."destination.geolocation.city", - events."destination.geolocation.country", - events."destination.geolocation.latitude", - events."destination.geolocation.longitude", - events."destination.geolocation.region", - events."destination.geolocation.state", - events."destination.ip", - events."destination.local_hostname", - events."destination.local_ip", - events."destination.network", - events."destination.port", - events."destination.registry", - events."destination.reverse_dns", - events."destination.tor_node", - events."destination.url", - events."destination.urlpath", - events."event_description.target", - events."event_description.text", - events."event_description.url", - events."event_hash", - events."extra", - events."feed.accuracy", - 
events."feed.code", - events."feed.documentation", - events."feed.name", - events."feed.provider", - events."feed.url", - events."malware.hash", - events."malware.hash.md5", - events."malware.hash.sha1", - events."malware.hash.sha256", - events."malware.name", - events."malware.version", - events."misp.attribute_uuid", - events."misp.event_uuid", - events."protocol.application", - events."protocol.transport", - events."rtir_id", - events."screenshot_url", - events."source.abuse_contact", - events."source.account", - events."source.allocated", - events."source.asn", - events."source.as_name", - events."source.domain_suffix", - events."source.fqdn", - events."source.geolocation.cc", - events."source.geolocation.city", - events."source.geolocation.country", - events."source.geolocation.cymru_cc", - events."source.geolocation.geoip_cc", - events."source.geolocation.latitude", - events."source.geolocation.longitude", - events."source.geolocation.region", - events."source.geolocation.state", - events."source.ip", - events."source.local_hostname", - events."source.local_ip", - events."source.network", - events."source.port", - events."source.registry", - events."source.reverse_dns", - events."source.tor_node", - events."source.url", - events."source.urlpath", - events."status", - events."time.observation", - events."time.source", - events."tlp", - raws."event_id", - raws."raw" - FROM ( - public.events - JOIN public.raws ON ((events.id = raws.event_id))); - --- Establish the INSERT trigger for the events table, splitting the data into events and raws - -CREATE FUNCTION public.process_v_events_insert() - RETURNS trigger - LANGUAGE plpgsql - AS $$ - DECLARE event_id integer; - - BEGIN - -- add all data except raw to events - INSERT INTO - events ( - "classification.identifier", - "classification.taxonomy", - "classification.type", - "comment", - "destination.abuse_contact", - "destination.account", - "destination.allocated", - "destination.asn", - "destination.as_name", - 
"destination.domain_suffix", - "destination.fqdn", - "destination.geolocation.cc", - "destination.geolocation.city", - "destination.geolocation.country", - "destination.geolocation.latitude", - "destination.geolocation.longitude", - "destination.geolocation.region", - "destination.geolocation.state", - "destination.ip", - "destination.local_hostname", - "destination.local_ip", - "destination.network", - "destination.port", - "destination.registry", - "destination.reverse_dns", - "destination.tor_node", - "destination.url", - "destination.urlpath", - "event_description.target", - "event_description.text", - "event_description.url", - "event_hash", - "extra", - "feed.accuracy", - "feed.code", - "feed.documentation", - "feed.name", - "feed.provider", - "feed.url", - "malware.hash", - "malware.hash.md5", - "malware.hash.sha1", - "malware.name", - "malware.version", - "misp.attribute_uuid", - "misp.event_uuid", - "protocol.application", - "protocol.transport", - "rtir_id", - "screenshot_url", - "source.abuse_contact", - "source.account", - "source.allocated", - "source.asn", - "source.as_name", - "source.domain_suffix", - "source.fqdn", - "source.geolocation.cc", - "source.geolocation.city", - "source.geolocation.country", - "source.geolocation.cymru_cc", - "source.geolocation.geoip_cc", - "source.geolocation.latitude", - "source.geolocation.longitude", - "source.geolocation.region", - "source.geolocation.state", - "source.ip", - "source.local_hostname", - "source.local_ip", - "source.network", - "source.port", - "source.registry", - "source.reverse_dns", - "source.tor_node", - "source.url", - "source.urlpath", - "status", - "time.observation", - "time.source", - "tlp" - ) - VALUES - ( - NEW."classification.identifier", - NEW."classification.taxonomy", - NEW."classification.type", - NEW."comment", - NEW."destination.abuse_contact", - NEW."destination.account", - NEW."destination.allocated", - NEW."destination.asn", - NEW."destination.as_name", - 
NEW."destination.domain_suffix", - NEW."destination.fqdn", - NEW."destination.geolocation.cc", - NEW."destination.geolocation.city", - NEW."destination.geolocation.country", - NEW."destination.geolocation.latitude", - NEW."destination.geolocation.longitude", - NEW."destination.geolocation.region", - NEW."destination.geolocation.state", - NEW."destination.ip", - NEW."destination.local_hostname", - NEW."destination.local_ip", - NEW."destination.network", - NEW."destination.port", - NEW."destination.registry", - NEW."destination.reverse_dns", - NEW."destination.tor_node", - NEW."destination.url", - NEW."destination.urlpath", - NEW."event_description.target", - NEW."event_description.text", - NEW."event_description.url", - NEW."event_hash", - NEW."extra", - NEW."feed.accuracy", - NEW."feed.code", - NEW."feed.documentation", - NEW."feed.name", - NEW."feed.provider", - NEW."feed.url", - NEW."malware.hash", - NEW."malware.hash.md5", - NEW."malware.hash.sha1", - NEW."malware.name", - NEW."malware.version", - NEW."misp.attribute_uuid", - NEW."misp.event_uuid", - NEW."protocol.application", - NEW."protocol.transport", - NEW."rtir_id", - NEW."screenshot_url", - NEW."source.abuse_contact", - NEW."source.account", - NEW."source.allocated", - NEW."source.asn", - NEW."source.as_name", - NEW."source.domain_suffix", - NEW."source.fqdn", - NEW."source.geolocation.cc", - NEW."source.geolocation.city", - NEW."source.geolocation.country", - NEW."source.geolocation.cymru_cc", - NEW."source.geolocation.geoip_cc", - NEW."source.geolocation.latitude", - NEW."source.geolocation.longitude", - NEW."source.geolocation.region", - NEW."source.geolocation.state", - NEW."source.ip", - NEW."source.local_hostname", - NEW."source.local_ip", - NEW."source.network", - NEW."source.port", - NEW."source.registry", - NEW."source.reverse_dns", - NEW."source.tor_node", - NEW."source.url", - NEW."source.urlpath", - NEW."status", - NEW."time.observation", - NEW."time.source", - NEW."tlp" - ) RETURNING id INTO 
event_id; - - -- add the raw value to raws, link with the event_id - INSERT INTO - raws ("event_id", "raw") - VALUES - (event_id, NEW.raw); - - RETURN NEW; - - END; -$$; - -CREATE TRIGGER tr_events - INSTEAD OF INSERT - ON public.v_events - FOR EACH ROW - EXECUTE FUNCTION public.process_v_events_insert(); diff --git a/docs/user/bots.rst b/docs/user/bots.rst index 8e8f36396..2fbe27df8 100644 --- a/docs/user/bots.rst +++ b/docs/user/bots.rst @@ -945,12 +945,15 @@ Install the `stomp.py` library from PyPI: **Configuration Parameters** * **Feed parameters** (see above) -* `exchange`: exchange point +* `exchange`: STOMP *destination* to subscribe to, e.g. "/exchange/my.org/*.*.*.*" * `port`: 61614 -* `server`: hostname e.g. "n6stream.cert.pl" +* `server`: hostname, e.g. "n6stream.cert.pl" * `ssl_ca_certificate`: path to CA file -* `ssl_client_certificate`: path to client cert file -* `ssl_client_certificate_key`: path to client cert key file +* `auth_by_ssl_client_certificate`: Boolean, default: true (note: set to false for new *n6* auth) +* `ssl_client_certificate`: path to client cert file, used only if `auth_by_ssl_client_certificate` is true +* `ssl_client_certificate_key`: path to client cert key file, used only if `auth_by_ssl_client_certificate` is true +* `username`: STOMP *login* (e.g., *n6* user login), used only if `auth_by_ssl_client_certificate` is false +* `password`: STOMP *passcode* (e.g., *n6* user API key), used only if `auth_by_ssl_client_certificate` is false .. _intelmq.bots.collectors.twitter.collector_twitter: @@ -4305,7 +4308,7 @@ Also you will need a so called "exchange point". **Configuration Parameters** -* `exchange`: The exchange to push at +* `exchange`: STOMP *destination* to push at, e.g. "/exchange/_push" * `heartbeat`: default: 60000 * `message_hierarchical_output`: Boolean, default: false * `message_jsondict_as_string`: Boolean, default: false @@ -4314,8 +4317,11 @@ Also you will need a so called "exchange point". 
* `server`: Host or IP address of the STOMP server * `single_key`: Boolean or string (field name), default: false * `ssl_ca_certificate`: path to CA file -* `ssl_client_certificate`: path to client cert file -* `ssl_client_certificate_key`: path to client cert key file +* `auth_by_ssl_client_certificate`: Boolean, default: true (note: set to false for new *n6* auth) +* `ssl_client_certificate`: path to client cert file, used only if `auth_by_ssl_client_certificate` is true +* `ssl_client_certificate_key`: path to client cert key file, used only if `auth_by_ssl_client_certificate` is true +* `username`: STOMP *login* (e.g., *n6* user login), used only if `auth_by_ssl_client_certificate` is false +* `password`: STOMP *passcode* (e.g., *n6* user API key), used only if `auth_by_ssl_client_certificate` is false .. _intelmq.bots.outputs.tcp.output: diff --git a/docs/user/eventdb.rst b/docs/user/eventdb.rst index 971094e6c..b3704cd0a 100644 --- a/docs/user/eventdb.rst +++ b/docs/user/eventdb.rst @@ -8,22 +8,42 @@ EventDB The EventDB is not a software itself. -The EventDB is a database (usually `PostgreSQL `_) that gets filled with with data from IntelMQ using the :ref:`intelmq.bots.outputs.sql.output` Output Bot. +The EventDB is a database (usually `PostgreSQL `_) that gets filled with data +from IntelMQ using the :ref:`intelmq.bots.outputs.sql.output` Output Bot. Its core is the `events` +table with the structure corresponding to the :doc:`IntelMQ Data Format `. +Having the table created is required to use the EventDB. + +.. _intelmq_psql_initdb: ----------------------- -The events table itself +intelmq_psql_initdb ----------------------- -IntelMQ comes with the ``intelmq_psql_initdb`` command line tool. It creates an SQL file containing: +IntelMQ comes with the ``intelmq_psql_initdb`` command line tool designed to help with creating the +EventDB. 
In the first place, it creates: - A ``CREATE TABLE events`` statement with all valid IntelMQ fields as columns and correct types - Several indexes as examples for a good read & search performance -All elements of this SQL file can be adapted and extended before running the SQL file against a database, especially the indexes. +Having an `events` table as outlined in the SQL file, IntelMQ's :ref:`intelmq.bots.outputs.sql.output` +Output Bot can write all received events into this database table. + +In addition, the script supports some optional features supporting use cases described later in +this document: + +- ``--partition-key`` - for generating schema aligned with :ref:`TimescaleDB ` + or partitioned tables, +- ``--separate-raws`` - for generating views and triggers needed to :ref:`eventdb_raws_table` + (works also together with adjustments for partitioning). + +For a full list of supported parameters, call the script help using the ``-h`` parameter. -Having an `events` table as outlined in the SQL file, IntelMQ's :ref:`intelmq.bots.outputs.sql.output` Output Bot can write all received events into this database table. +All elements of the generated SQL file can be adapted and extended before running the SQL file against +a database, especially the indexes. Please review the generated script before applying. -This events table is the core of the so-called EventDB and also required by all other sections of this document. +Be aware that if you create tables using another DB user that is used later by the output bot, you may +need to adjust ownership or privileges in the database. If you have problems with database permissions, +refer to `PostgreSQL documentation `. ----------------- EventDB Utilities ----------------- @@ -77,11 +97,13 @@ By modifying the configuration file it is possible to configure various queries :alt: EventDB Statistics Example +.. 
_timescaledb: + ------------------------------- Using EventDB with Timescale DB ------------------------------- -`Timescale DB `_ is a PostgreSQL extension to add time-series support, which is quite handy as you dont have to learn other syntaxes as you already know. You can use the SQL Queries as before, the extension will handle the rest. +`Timescale DB `_ is a PostgreSQL extension to add time-series support, which is quite handy as you don't have to learn other syntaxes as you already know. You can use the SQL Queries as before, the extension will handle the rest. To see all limitations, please check the `Timescale DB Documentation `_. What is time-series? @@ -91,10 +113,28 @@ Time-series has been invented as traditional database design like relational or A big benefit of time-series instead of other database designs over a time-based search pattern is the performance. As IntelMQ uses data based upon time, this design is awesome & will give you a performance boost. -How to setup ------------- +How to choose the time column? +------------------------------ + +To utilize the time-series, choose a column containing the right time. This is then +used by you for manual queries and graphs, and also by the database itself for organizing the data. + +The :doc:`IntelMQ Data Format ` has two fields that can be used for this: +``time.source`` or ``time.observation``. Depending on your needs (tracking when the event occurred or when +it was detected, if different), choose one of them. + +You can use the :ref:`intelmq_psql_initdb` tool to generate SQL schema valid for TimescaleDB by passing +the partitioning key: + +.. code-block:: bash + + intelmq_psql_initdb --partition-key "time.source" + +How to setup? +------------- + +Thanks to TimescaleDB, it's very easy to set up. 
Now lets create a `hypertable `_, which is the timescale DB time-series structure. ``SELECT create_hypertable('', 'time.source');``. 3. Now our hypertable is setup & timescaleDB takes care of the rest. You can perform queries as usual, for further information please check `Timescale DB Documentation `_. @@ -132,5 +172,5 @@ The last steps brings us several advantages: - No code changes are needed in the IntelMQ output bot or your own scripts. A migration is seamless. - PostgreSQL itself ensures that the data of both tables is consistent and linked correctly. -The complete SQL script can be found in the `contrib/eventdb `_ directory of IntelMQ. +The complete SQL script can be generated using :ref:`intelmq_psql_initdb`. It does *not* cover step 2 to avoid accidental data loss - you need to do this step manually. diff --git a/docs/user/n6-integrations.rst b/docs/user/n6-integrations.rst index e4dc6ce9c..681678224 100644 --- a/docs/user/n6-integrations.rst +++ b/docs/user/n6-integrations.rst @@ -11,10 +11,9 @@ n6 is maintained and developed by `CERT.pl `_. Information about n6 can be found here: -- Website: `n6.cert.pl `_ +- Website: `cert.pl/en/n6 `_ - Source Code: `github.com/CERT-Polska/n6 `_ - n6 documentation: `n6.readthedocs.io `_ -- n6sdk developer documentation: `n6sdk.readthedocs.io `_ .. image:: /_static/n6/n6-schemat2.png :alt: n6 schema diff --git a/intelmq/bin/intelmq_psql_initdb.py b/intelmq/bin/intelmq_psql_initdb.py index ff19107a7..34a55a380 100644 --- a/intelmq/bin/intelmq_psql_initdb.py +++ b/intelmq/bin/intelmq_psql_initdb.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2015 Sebastian Wagner +# SPDX-FileCopyrightText: 2015 Sebastian Wagner, 2023 CERT.at GmbH # # SPDX-License-Identifier: AGPL-3.0-or-later @@ -10,11 +10,11 @@ The SQL file is saved in `/tmp/initdb.sql` or a temporary name if the other one exists. 
""" +import argparse import json import os import sys import tempfile -import argparse from intelmq import HARMONIZATION_CONF_FILE @@ -32,8 +32,109 @@ """ -def generate(harmonization_file=HARMONIZATION_CONF_FILE): +def _generate_events_schema(fields: dict, partition_key: str = None) -> list: + sql_lines = [] + sql_lines.append("CREATE TABLE{if_not_exist} events (") + sql_lines.append(f' "id" BIGSERIAL{" UNIQUE PRIMARY KEY" if not partition_key else ""},') + + for field, field_type in sorted(fields.items()): + sql_lines.append(f' "{field}" {field_type},') + + if not partition_key: + sql_lines[-1] = sql_lines[-1][:-1] # remove last ',' + else: + sql_lines.append(f' PRIMARY KEY ("id", "{partition_key}")') + sql_lines.append(");") + + for index in INDICES: + sql_lines.append(f'CREATE INDEX{{if_not_exist}} "idx_events_{index}" ON events USING btree ("{index}");') + return sql_lines + + +RAW_TABLE_PART = """ +CREATE TABLE{if_not_exist} public.raws ( + event_id bigint, + raw text, + PRIMARY KEY(event_id)""" + +RAW_TRIGGER = """ +CREATE{or_replace} TRIGGER tr_events + INSTEAD OF INSERT + ON public.v_events + FOR EACH ROW + EXECUTE FUNCTION public.process_v_events_insert(); +""" + + +def _generate_separated_raws_schema(fields: dict, partition_key: str) -> list: + sorted_fields = sorted(key for key in fields.keys() if key != "raw") + sql_lines = ['', '-- Create the table holding only the "raw" values', RAW_TABLE_PART] + if not partition_key: + sql_lines[-1] += "," + sql_lines.append(" CONSTRAINT raws_event_id_fkey FOREIGN KEY (event_id) REFERENCES public.events(id) ON DELETE CASCADE") + sql_lines.append(");") + + sql_lines.extend([ + '', + '-- Create the v_events view which joins the tables "events" and "raws"\n', + 'CREATE{or_replace} VIEW public.v_events AS', + ' SELECT', + ' events.id,', + ]) + for field in sorted_fields: + sql_lines.append(f' events."{field}",') + sql_lines.extend([ + ' raws."event_id",', + ' raws."raw"', + ' FROM (', + ' public.events', + ' JOIN 
public.raws ON ((events.id = raws.event_id)));' + ]) + + sql_lines.extend([ + '', + '-- Establish the INSERT trigger for the events table, splitting the data into events and raws', + '', + 'CREATE{or_replace} FUNCTION public.process_v_events_insert()', + ' RETURNS trigger', + ' LANGUAGE plpgsql', + ' AS $$', + ' DECLARE event_id integer;', + '', + ' BEGIN', + ' INSERT INTO', + ' events (', + ]) + for field in sorted_fields: + sql_lines.append(f' "{field}"{"," if field != sorted_fields[-1] else ""}') + sql_lines.extend([ + ' )', + ' VALUES', + ' (', + ]) + for field in sorted_fields: + sql_lines.append(f' NEW."{field}"{"," if field != sorted_fields[-1] else ""}') + sql_lines.extend([ + ' )', + ' RETURNING id INTO event_id;', + ' INSERT INTO', + ' raws ("event_id", "raw")', + ' VALUES', + ' (event_id, NEW.raw);', + ' RETURN NEW;', + ' END;', + '$$;' + ]) + + sql_lines.append(RAW_TRIGGER) + + return sql_lines + + +def generate(harmonization_file=HARMONIZATION_CONF_FILE, skip_events=False, + separate_raws=False, partition_key=None, skip_or_replace=False): FIELDS = {} + sql_lines = [] try: print("INFO - Reading %s file" % harmonization_file) @@ -75,17 +176,18 @@ def generate(harmonization_file=HARMONIZATION_CONF_FILE): FIELDS[field] = dbtype - initdb = """CREATE TABLE events ( - "id" BIGSERIAL UNIQUE PRIMARY KEY,""" - for field, field_type in sorted(FIELDS.items()): - initdb += f'\n "{field}" {field_type},' + if not skip_events: + sql_lines.extend(_generate_events_schema(FIELDS, partition_key)) - initdb = initdb[:-1] # remove last ',' - initdb += "\n);\n" + if separate_raws: + sql_lines.extend(_generate_separated_raws_schema(FIELDS, partition_key)) - for index in INDICES: - initdb += 'CREATE INDEX "idx_events_{0}" ON events USING btree ("{0}");\n'.format(index) - return initdb + existing_clause = " IF NOT EXISTS" if skip_or_replace else "" + replace_clause = " OR REPLACE" if skip_or_replace else "" + + return "\n".join( + line.format(if_not_exist=existing_clause, 
or_replace=replace_clause) for line in sql_lines + ) def main(): @@ -97,6 +199,19 @@ def main(): help='Defines the Ouputfile', default='/tmp/initdb.sql' ) + parser.add_argument("--no-events", action="store_true", default=False, + help="Skip generating the events table schema") + parser.add_argument("--separate-raws", action="store_true", default=False, + help="Generate v_events view to separate raw field from the rest of the data on insert") + parser.add_argument("--partition-key", default=None, + help=("Add given field to the primary key of the events table to allow " + "partitioning in the database. Useful especially for setups with " + "TimescaleDB, see IntelMQ documentation for more advices. " + "If combined with --separate-raws, the v_events does not get foreign key")) + parser.add_argument("--harmonization", default=HARMONIZATION_CONF_FILE, + help="Path to the harmonization file") + parser.add_argument("--skip-or-replace", default=False, action="store_true", + help="Add IF NOT EXISTS or REPLACE directive to created schemas") args = parser.parse_args() OUTPUTFILE = args.outputfile @@ -109,7 +224,12 @@ def main(): fp = os.fdopen(os_fp, 'wt') else: fp = open(OUTPUTFILE, 'w') - psql = generate() + psql = generate(args.harmonization, + skip_events=args.no_events, + separate_raws=args.separate_raws, + partition_key=args.partition_key, + skip_or_replace=args.skip_or_replace, + ) print("INFO - Writing %s file" % OUTPUTFILE) fp.write(psql) finally: diff --git a/intelmq/bots/collectors/stomp/collector.py b/intelmq/bots/collectors/stomp/collector.py index 0c9f60456..86ebee40c 100644 --- a/intelmq/bots/collectors/stomp/collector.py +++ b/intelmq/bots/collectors/stomp/collector.py @@ -3,13 +3,13 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # -*- coding: utf-8 -*- -import os.path from intelmq.lib.bot import CollectorBot -from intelmq.lib.exceptions import MissingDependencyError +from intelmq.lib.mixins import StompMixin try: import stomp + import stomp.exception except 
ImportError: stomp = None else: @@ -18,9 +18,10 @@ class StompListener(stomp.PrintingListener): the stomp listener gets called asynchronously for every STOMP message """ - def __init__(self, n6stompcollector, conn, destination): + def __init__(self, n6stompcollector, conn, destination, connect_kwargs=None): self.stompbot = n6stompcollector self.conn = conn + self.connect_kwargs = connect_kwargs self.destination = destination super().__init__() if stomp.__version__ >= (5, 0, 0): @@ -29,15 +30,23 @@ def __init__(self, n6stompcollector, conn, destination): def on_heartbeat_timeout(self): self.stompbot.logger.info("Heartbeat timeout. Attempting to re-connect.") - connect_and_subscribe(self.conn, self.stompbot.logger, self.destination) - - def on_error(self, headers, message): - self.stompbot.logger.error('Received an error: %r.', message) - - def on_message(self, headers, message): - self.stompbot.logger.debug('Receive message %r...', message[:500]) + if self.stompbot._auto_reconnect: + connect_and_subscribe(self.conn, self.stompbot.logger, self.destination, + connect_kwargs=self.connect_kwargs) + + def on_error(self, frame, body=None): + if body is None: + # `stomp.py >= 6.1.0` + body = frame.body + self.stompbot.logger.error('Received an error: %r.', body) + + def on_message(self, frame, body=None): + if body is None: + # `stomp.py >= 6.1.0` + body = frame.body + self.stompbot.logger.debug('Receive message %r...', body[:500]) report = self.stompbot.new_report() - report.add("raw", message.rstrip()) + report.add("raw", body.rstrip()) report.add("feed.url", "stomp://" + self.stompbot.server + ":" + str(self.stompbot.port) + @@ -46,24 +55,31 @@ def on_message(self, headers, message): def on_disconnected(self): self.stompbot.logger.debug('Detected disconnect') - connect_and_subscribe(self.conn, self.stompbot.logger, self.destination) + if self.stompbot._auto_reconnect: + connect_and_subscribe(self.conn, self.stompbot.logger, self.destination, + 
connect_kwargs=self.connect_kwargs) -def connect_and_subscribe(conn, logger, destination, start=False): +def connect_and_subscribe(conn, logger, destination, start=False, connect_kwargs=None): if start: conn.start() - conn.connect(wait=True) + if connect_kwargs is None: + connect_kwargs = dict(wait=True) + conn.connect(**connect_kwargs) conn.subscribe(destination=destination, id=1, ack='auto') logger.info('Successfully connected and subscribed.') -class StompCollectorBot(CollectorBot): +class StompCollectorBot(CollectorBot, StompMixin): """Collect data from a STOMP Interface""" """ main class for the STOMP protocol collector """ exchange: str = '' port: int = 61614 server: str = "n6stream.cert.pl" + auth_by_ssl_client_certificate: bool = True + username: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true + password: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true ssl_ca_certificate: str = 'ca.pem' # TODO pathlib.Path ssl_client_certificate: str = 'client.pem' # TODO pathlib.Path ssl_client_certificate_key: str = 'client.key' # TODO pathlib.Path @@ -73,36 +89,22 @@ class StompCollectorBot(CollectorBot): __conn = False # define here so shutdown method can check for it def init(self): - if stomp is None: - raise MissingDependencyError("stomp") - elif stomp.__version__ < (4, 1, 8): - raise MissingDependencyError("stomp", version="4.1.8", - installed=stomp.__version__) - - self.ssl_ca_cert = self.ssl_ca_certificate - self.ssl_cl_cert = self.ssl_client_certificate - self.ssl_cl_cert_key = self.ssl_client_certificate_key - - # check if certificates exist - for f in [self.ssl_ca_cert, self.ssl_cl_cert, self.ssl_cl_cert_key]: - if not os.path.isfile(f): - raise ValueError("Could not open file %r." 
% f) - - _host = [(self.server, self.port)] - self.__conn = stomp.Connection(host_and_ports=_host, use_ssl=True, - ssl_key_file=self.ssl_cl_cert_key, - ssl_cert_file=self.ssl_cl_cert, - ssl_ca_certs=self.ssl_ca_cert, - heartbeats=(self.heartbeat, - self.heartbeat)) - - self.__conn.set_listener('', StompListener(self, self.__conn, self.exchange)) + self.stomp_bot_runtime_initial_check() + + # (note: older versions of `stomp.py` do not play well with reconnects) + self._auto_reconnect = (stomp.__version__ >= (4, 1, 21)) + + self.__conn, connect_kwargs = self.prepare_stomp_connection() + self.__conn.set_listener('', StompListener(self, self.__conn, self.exchange, + connect_kwargs=connect_kwargs)) connect_and_subscribe(self.__conn, self.logger, self.exchange, - start=stomp.__version__ < (4, 1, 20)) + start=stomp.__version__ < (4, 1, 20), + connect_kwargs=connect_kwargs) def shutdown(self): if not stomp or not self.__conn: return + self._auto_reconnect = False try: self.__conn.disconnect() except stomp.exception.NotConnectedException: @@ -111,5 +113,9 @@ def shutdown(self): def process(self): pass + @classmethod + def check(cls, parameters): + return cls.stomp_bot_parameters_check(parameters) or None + BOT = StompCollectorBot diff --git a/intelmq/bots/outputs/stomp/output.py b/intelmq/bots/outputs/stomp/output.py index 3b5e30d6e..a28de3f4e 100644 --- a/intelmq/bots/outputs/stomp/output.py +++ b/intelmq/bots/outputs/stomp/output.py @@ -3,11 +3,9 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # -*- coding: utf-8 -*- -import os.path from intelmq.lib.bot import OutputBot -from intelmq.lib.exceptions import MissingDependencyError - +from intelmq.lib.mixins import StompMixin try: import stomp @@ -15,7 +13,7 @@ stomp = None -class StompOutputBot(OutputBot): +class StompOutputBot(OutputBot, StompMixin): """Send events to a STMOP server""" """ main class for the STOMP protocol output bot """ exchange: str = "/exchange/_push" @@ -28,6 +26,9 @@ class StompOutputBot(OutputBot): 
port: int = 61614 server: str = "127.0.0.1" # TODO: could be ip address single_key: bool = False + auth_by_ssl_client_certificate: bool = True + username: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true + password: str = 'guest' # ignored if `auth_by_ssl_client_certificate` is true ssl_ca_certificate: str = 'ca.pem' # TODO: could be pathlib.Path ssl_client_certificate: str = 'client.pem' # TODO: pathlib.Path ssl_client_certificate_key: str = 'client.key' # TODO: patlib.Path @@ -35,29 +36,18 @@ class StompOutputBot(OutputBot): _conn = None def init(self): - if stomp is None: - raise MissingDependencyError("stomp") - - # check if certificates exist - for f in [self.ssl_ca_cert, self.ssl_cl_cert, self.ssl_cl_cert_key]: - if not os.path.isfile(f): - raise ValueError("Could not open SSL (certificate) file '%s'." % f) - - _host = [(self.server, self.port)] - self._conn = stomp.Connection(host_and_ports=_host, use_ssl=True, - ssl_key_file=self.ssl_cl_cert_key, - ssl_cert_file=self.ssl_cl_cert, - ssl_ca_certs=self.ssl_ca_cert, - heartbeats=(self.heartbeat, - self.heartbeat)) + self.stomp_bot_runtime_initial_check() + (self._conn, + self._connect_kwargs) = self.prepare_stomp_connection() self.connect() def connect(self): self.logger.debug('Connecting.') # based on the documentation at: # https://github.com/jasonrbriggs/stomp.py/wiki/Simple-Example - self._conn.start() - self._conn.connect(wait=True) + if stomp.__version__ < (4, 1, 20): + self._conn.start() + self._conn.connect(**self._connect_kwargs) self.logger.debug('Connected.') def shutdown(self): @@ -73,5 +63,9 @@ def process(self): destination=self.exchange) self.acknowledge_message() + @classmethod + def check(cls, parameters): + return cls.stomp_bot_parameters_check(parameters) or None + BOT = StompOutputBot diff --git a/intelmq/etc/feeds.yaml b/intelmq/etc/feeds.yaml index 72e1fb825..f79de2dfb 100644 --- a/intelmq/etc/feeds.yaml +++ b/intelmq/etc/feeds.yaml @@ -1158,20 +1158,19 @@ providers: 
module: intelmq.bots.collectors.stomp.collector parameters: exchange: "{insert your exchange point as given by CERT.pl}" - ssl_client_certificate_key: "{insert path to client cert key file for - CERT.pl's n6}" ssl_ca_certificate: "{insert path to CA file for CERT.pl's n6}" + auth_by_ssl_client_certificate: false + username: "{insert n6 user's login}" + password: "{insert n6 user's API key}" port: '61614' - ssl_client_certificate: "{insert path to client cert file for CERTpl's - n6}" server: n6stream.cert.pl name: __FEED__ provider: __PROVIDER__ parser: module: intelmq.bots.parsers.n6.parser_n6stomp parameters: - revision: 2018-01-20 - documentation: https://n6.cert.pl/en/ + revision: 2023-09-23 + documentation: https://n6.readthedocs.io/usage/streamapi/ public: false AlienVault: OTX: diff --git a/intelmq/lib/mixins/__init__.py b/intelmq/lib/mixins/__init__.py index d33313548..664ae8a02 100644 --- a/intelmq/lib/mixins/__init__.py +++ b/intelmq/lib/mixins/__init__.py @@ -5,5 +5,6 @@ from intelmq.lib.mixins.http import HttpMixin from intelmq.lib.mixins.cache import CacheMixin from intelmq.lib.mixins.sql import SQLMixin +from intelmq.lib.mixins.stomp import StompMixin -__all__ = ['HttpMixin', 'CacheMixin', 'SQLMixin'] +__all__ = ['HttpMixin', 'CacheMixin', 'SQLMixin', 'StompMixin'] diff --git a/intelmq/lib/mixins/stomp.py b/intelmq/lib/mixins/stomp.py new file mode 100644 index 000000000..41cbd29cb --- /dev/null +++ b/intelmq/lib/mixins/stomp.py @@ -0,0 +1,172 @@ +""" StompMixin for IntelMQ + +SPDX-FileCopyrightText: 2017 Sebastian Wagner, 2023 NASK +SPDX-License-Identifier: AGPL-3.0-or-later +""" + +from typing import ( + Any, + Callable, + List, + NoReturn, + Tuple, +) + +try: + import stomp +except ImportError: + stomp = None + +from intelmq.lib.exceptions import MissingDependencyError + + +class StompMixin: + + """A mixin that provides certain common methods for STOMP bots.""" + + # + # STOMP bot attributes relevant to this mixin + + server: str + port: int + 
heartbeat: int + + auth_by_ssl_client_certificate: bool + + username: str # to be ignored if `auth_by_ssl_client_certificate` is true + password: str # to be ignored if `auth_by_ssl_client_certificate` is true + + ssl_ca_certificate: str # TODO: could be pathlib.Path + ssl_client_certificate: str # TODO: could be pathlib.Path + ssl_client_certificate_key: str # TODO: could be pathlib.Path + + # + # Helper methods intended to be used in subclasses + + @classmethod + def stomp_bot_parameters_check(cls, parameters: dict) -> List[List[str]]: + """Intended to be used in bots' `check()` static/class method.""" + logs = [] + cls.__verify_parameters( + get_param=parameters.get, + on_error=lambda msg: logs.append(['error', msg]), + ) + return logs + + def stomp_bot_runtime_initial_check(self) -> None: + """Intended to be used in bots' `init()` instance method.""" + self.__verify_dependency() + self.__verify_parameters( + get_param=self.__get_own_attribute, + on_error=self.__raise_value_error, + ) + + def prepare_stomp_connection(self) -> Tuple['stomp.Connection', dict]: + """ + Get a `(<STOMP connection>, <connect arguments>)` pair. + + * `<STOMP connection>` is a new instance of `stomp.Connection`, + with the SSL stuff *already configured*, but *without* any + invocations of `connect()` made yet; + + * `<connect arguments>` is a dict of arguments -- ready + to be passed to the `connect()` method of the aforementioned + `<STOMP connection>` object. 
+ """ + ssl_kwargs, connect_kwargs = self.__get_ssl_and_connect_kwargs() + host_and_ports = [(self.server, int(self.port))] + stomp_connection = stomp.Connection(host_and_ports=host_and_ports, + heartbeats=(self.heartbeat, + self.heartbeat)) + stomp_connection.set_ssl(host_and_ports, **ssl_kwargs) + return stomp_connection, connect_kwargs + + # + # Implementation details + + _DEPENDENCY_NAME_REMARK = ( + "Note that the actual name of the pip-installable " + "distribution package is 'stomp.py', not 'stomp'.") + + @classmethod + def __verify_dependency(cls) -> None: + # Note: the pip-installable package's name is 'stomp.py', but + # e.g. the apt-installable package's name is 'python3-stomp' (or + # similar) -- that's why we pass to the `MissingDependencyError` + # constructor the name 'stomp', but also pass the value of the + # `_DEPENDENCY_NAME_REMARK` constant as `additional_text`... + if stomp is None: + raise MissingDependencyError('stomp', + additional_text=cls._DEPENDENCY_NAME_REMARK) + if stomp.__version__ < (4, 1, 8): + raise MissingDependencyError('stomp', version="4.1.8", + installed=stomp.__version__, + additional_text=cls._DEPENDENCY_NAME_REMARK) + + @classmethod + def __verify_parameters(cls, + get_param: Callable[[str], Any], + on_error: Callable[[str], None]) -> None: + file_param_names = ['ssl_ca_certificate'] + if cls.__should_cert_auth_params_be_verified(get_param, on_error): + file_param_names.extend([ + 'ssl_client_certificate', + 'ssl_client_certificate_key', + ]) + for param_name in file_param_names: + cls.__verify_file_param(param_name, get_param, on_error) + + @classmethod + def __should_cert_auth_params_be_verified(cls, + get_param: Callable[[str], Any], + on_error: Callable[[str], None]) -> bool: + flag = get_param('auth_by_ssl_client_certificate') + if not isinstance(flag, bool): + # Let us better be strict here -- explicitly rejecting any + # non-`bool` values as potentially misleading (e.g., consider + # a string like "false", which would 
be interpreted as True). + on_error(f"Parameter 'auth_by_ssl_client_certificate' " + f"is not set to a bool value (got: {flag!r}).") + flag = False + return flag + + @classmethod + def __verify_file_param(cls, + param_name: str, + get_param: Callable[[str], Any], + on_error: Callable[[str], None]) -> None: + path = get_param(param_name) + if path is None: + on_error(f"Parameter {param_name!r} is not given " + f"(or is set to None).") + return + try: + open(path, 'rb').close() + except OSError as exc: + # (note: the filename is mentioned in the included exc message) + on_error(f"Cannot open file specified as parameter " + f"{param_name!r} ({str(exc)!r}).") + + def __get_own_attribute(self, param_name: str) -> Any: + return getattr(self, param_name, None) + + def __raise_value_error(self, msg: str) -> NoReturn: + raise ValueError(msg) + + def __get_ssl_and_connect_kwargs(self) -> Tuple[dict, dict]: + # Note: the `ca_certs` argument to `set_ssl()` must always be + # provided, otherwise the `stomp.py`'s machinery would *not* + # perform any certificate verification! 
+ ssl_kwargs = dict(ca_certs=self.ssl_ca_certificate) + connect_kwargs = dict(wait=True) + if self.auth_by_ssl_client_certificate: + ssl_kwargs.update( + cert_file=self.ssl_client_certificate, + key_file=self.ssl_client_certificate_key, + ) + else: + connect_kwargs.update( + username=self.username, + passcode=self.password, + ) + return ssl_kwargs, connect_kwargs diff --git a/intelmq/tests/bin/test_psql_initdb.py b/intelmq/tests/bin/test_psql_initdb.py index 9d9151c5b..284664bcd 100644 --- a/intelmq/tests/bin/test_psql_initdb.py +++ b/intelmq/tests/bin/test_psql_initdb.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: 2016 Sebastian Wagner +# SPDX-FileCopyrightText: 2016 Sebastian Wagner, 2023 CERT.at GmbH # # SPDX-License-Identifier: AGPL-3.0-or-later @@ -8,12 +8,15 @@ @author: sebastian """ +import json import os +import re +import tempfile import unittest import pkg_resources -import intelmq.bin.intelmq_psql_initdb as psql_initd +import intelmq.bin.intelmq_psql_initdb as psql_initdb class TestPsqlInit(unittest.TestCase): @@ -21,13 +24,156 @@ class TestPsqlInit(unittest.TestCase): A TestCase for intelmq_psql_initdb. """ + def setUp(self) -> None: + super().setUp() + + self.tempdir = tempfile.TemporaryDirectory() + self.addCleanup(self.tempdir.cleanup) + + self.harmonization_path = f"{self.tempdir.name}/harmonization.conf" + self._create_simple_harmonization() + + def _create_simple_harmonization(self): + simple_harmonization = { + "event": { + "classification.identifier": { + "type": "String" + }, + "time.source": { + "type": "DateTime" + }, + "raw": { + "type": "Base64" + } + } + } + with open(self.harmonization_path, "w+") as f: + json.dump(simple_harmonization, f) + def test_output(self): """ Compare output to cached one. 
""" with open(os.path.join(os.path.dirname(__file__), 'initdb.sql')) as handle: expected = handle.read() fname = pkg_resources.resource_filename('intelmq', 'etc/harmonization.conf') - self.assertEqual(psql_initd.generate(fname).strip(), expected.strip()) + self.assertEqual(psql_initdb.generate(fname).strip(), expected.strip()) + + def test_generating_events_schema(self): + expected_table = """ + CREATE TABLE events ( + "id" BIGSERIAL UNIQUE PRIMARY KEY, + "classification.identifier" text, + "raw" text, + "time.source" timestamp with time zone + ); + """ + expected_table = self._normalize_leading_whitespaces(expected_table) + expected_indexes = [ + """CREATE INDEX "idx_events_classification.identifier" ON events USING btree ("classification.identifier");""", + """CREATE INDEX "idx_events_time.source" ON events USING btree ("time.source");""" + ] + generated = psql_initdb.generate(self.harmonization_path) + + self.assertTrue(self._normalize_leading_whitespaces(generated).startswith(expected_table)) + + for index in expected_indexes: + self.assertIn(index, generated) + + def test_skip_generating_events_table_schema(self): + generated = psql_initdb.generate(self.harmonization_path, skip_events=True) + + self.assertNotIn("CREATE TABLE events", generated) + self.assertNotIn("CREATE INDEX", generated) + + def test_separated_raws_view_schema(self): + expected_view = """ + CREATE VIEW public.v_events AS + SELECT + events.id, + events."classification.identifier", + events."time.source", + raws."event_id", + raws."raw" + FROM ( + public.events + JOIN public.raws ON ((events.id = raws.event_id))); + """ + generated = psql_initdb.generate(self.harmonization_path, separate_raws=True) + generated = self._normalize_leading_whitespaces(generated) + self.assertIn("CREATE TABLE public.raws", generated) # static schema, check if added + self.assertIn("CONSTRAINT raws_event_id_fkey", generated) + self.assertIn(self._normalize_leading_whitespaces(expected_view), generated) + + def 
test_separated_raws_trigger(self): + expected_function = """ + CREATE FUNCTION public.process_v_events_insert() + RETURNS trigger + LANGUAGE plpgsql + AS $$ + DECLARE event_id integer; + + BEGIN + INSERT INTO + events ( + "classification.identifier", + "time.source" + ) + VALUES + ( + NEW."classification.identifier", + NEW."time.source" + ) + RETURNING id INTO event_id; + INSERT INTO + raws ("event_id", "raw") + VALUES + (event_id, NEW.raw); + RETURN NEW; + END; + $$; + """ + + generated = psql_initdb.generate(self.harmonization_path, separate_raws=True) + generated = self._normalize_leading_whitespaces(generated) + self.assertIn(self._normalize_leading_whitespaces(expected_function), generated) + self.assertIn("CREATE TRIGGER tr_events", generated) # Static, check if added + + def test_partition_field(self): + """For partitioned table """ + expected_table = """ + CREATE TABLE events ( + "id" BIGSERIAL, + "classification.identifier" text, + "raw" text, + "time.source" timestamp with time zone, + PRIMARY KEY ("id", "time.source") + ); + """ + generated = psql_initdb.generate(self.harmonization_path, partition_key="time.source", + separate_raws=True) + generated = self._normalize_leading_whitespaces(generated) + self.assertIn(self._normalize_leading_whitespaces(expected_table), generated) + # Foreign key may not be supported on partitioned or Timescale tables, skipping + self.assertNotIn("CONSTRAINT raws_event_id_fkey", generated) + + def test_skip_or_replace(self): + expected_creates = [ + "CREATE TABLE IF NOT EXISTS public.raws", + "CREATE TABLE IF NOT EXISTS events", + 'CREATE INDEX IF NOT EXISTS "idx_events_classification.identifier"', + # Do not support IF NOT EXISTS: + "CREATE OR REPLACE TRIGGER tr_events", + "CREATE OR REPLACE FUNCTION public.process_v_events_insert()", + "CREATE OR REPLACE VIEW public.v_events", + ] + generated = psql_initdb.generate(self.harmonization_path, separate_raws=True, + skip_or_replace=True) + for create in expected_creates: + 
self.assertIn(create, generated) + + @staticmethod + def _normalize_leading_whitespaces(data: str) -> str: + return re.sub(r"^(\s)*", " ", data.strip(), flags=re.MULTILINE) if __name__ == '__main__': # pragma: no cover