From eab4be0ec6855f100b4007a5a93f4e3baa21e650 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?istrator=C2=B2?= <82608337+istrator2@users.noreply.github.com> Date: Mon, 27 Nov 2023 22:32:48 +0100 Subject: [PATCH] support for multiple prefixes and related fixes (#116) * make code multiprefix-able * fix some autoformatting * change quoting * add unittests --- .gitignore | 4 +- docker-compose.yml | 14 +++--- entrypoint | 23 ++++++---- env.example | 6 +-- requirements.txt | 17 +++---- wgkex.yaml.example | 32 ++++++------- wgkex/broker/app.py | 2 +- wgkex/broker/templates/index.html | 2 +- wgkex/config/config.py | 17 +++++-- wgkex/config/config_test.py | 13 ++++-- wgkex/worker/app.py | 76 +++++++++++++++++++++++++------ wgkex/worker/app_test.py | 48 +++++++++++++++++-- wgkex/worker/mqtt.py | 12 +++-- 13 files changed, 189 insertions(+), 77 deletions(-) diff --git a/.gitignore b/.gitignore index c2847fb..4ffb7f2 100644 --- a/.gitignore +++ b/.gitignore @@ -132,7 +132,6 @@ dmypy.json bazel-* # docker-compose -.env docker-compose.override.yaml # docker-compose volumes /volumes @@ -141,3 +140,6 @@ docker-compose.override.yaml # config file wgkex.yaml + +# pycharm project metadata +.idea/ diff --git a/docker-compose.yml b/docker-compose.yml index 0f96056..39ba4f5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: - ./volumes/mosquitto/data:/mosquitto/data - ./volumes/mosquitto/log:/mosquitto/log ports: - - "9001:9001" + - "9001:9001" broker: image: ghcr.io/freifunkmuc/wgkex:latest @@ -17,11 +17,11 @@ services: restart: unless-stopped ports: - "5000:5000" - #volumes: + #volumes: #- ./config/broker/wgkex.yaml:/etc/wgkex.yaml environment: - WGKEX_DOMAINS: ${WGKEX_DOMAINS-ffmuc_freising, ffmuc_gauting, ffmuc_muc_cty, ffmuc_muc_nord, ffmuc_muc_ost, ffmuc_muc_sued, ffmuc_muc_west, ffmuc_uml_nord, ffmuc_uml_ost, ffmuc_uml_sued, ffmuc_uml_west, ffmuc_welt} - WGKEX_DOMAIN_PREFIX: ${WGKEX_DOMAIN_PREFIX-ffmuc_} + WGKEX_DOMAINS: ${WGKEX_DOMAINS-ffmuc_muc_cty, ffmuc_muc_nord, ffmuc_muc_ost, ffmuc_muc_sued, ffmuc_muc_west, ffmuc_welt, ffwert_city} + WGKEX_DOMAIN_PREFIXES: ${WGKEX_DOMAIN_PREFIXES-ffmuc_, ffdon_, ffwert_} WGKEX_DEBUG: ${WGKEX_DEBUG-DEBUG} MQTT_BROKER_URL: ${MQTT_BROKER_URL-mqtt} MQTT_BROKER_PORT: ${MQTT_BROKER_PORT-1883} @@ -35,10 +35,10 @@ services: command: worker restart: unless-stopped #volumes: - #- ./config/worker/wgkex.yaml:/etc/wgkex.yaml + #- ./config/worker/wgkex.yaml:/etc/wgkex.yaml environment: - WGKEX_DOMAINS: ${WGKEX_DOMAINS-ffmuc_freising, ffmuc_gauting, ffmuc_muc_cty, ffmuc_muc_nord, ffmuc_muc_ost, ffmuc_muc_sued, ffmuc_muc_west, ffmuc_uml_nord, ffmuc_uml_ost, ffmuc_uml_sued, ffmuc_uml_west, ffmuc_welt} - WGKEX_DOMAIN_PREFIX: ${WGKEX_DOMAIN_PREFIX-ffmuc_} + WGKEX_DOMAINS: ${WGKEX_DOMAINS-ffmuc_muc_cty, ffmuc_muc_nord, ffmuc_muc_ost, ffmuc_muc_sued, ffmuc_muc_west, ffmuc_welt, ffwert_city} + WGKEX_DOMAIN_PREFIXES: ${WGKEX_DOMAIN_PREFIXES-ffmuc_, ffdon_, ffwert_} WGKEX_DEBUG: ${WGKEX_DEBUG-DEBUG} MQTT_BROKER_URL: ${MQTT_BROKER_URL-mqtt} MQTT_BROKER_PORT: ${MQTT_BROKER_PORT-1883} diff --git a/entrypoint b/entrypoint index 6741032..4975f35 100755 --- a/entrypoint +++ b/entrypoint @@ -1,15 +1,15 @@ #!/bin/bash set -e -: ${WGKEX_DOMAINS:="ffmuc_freising, ffmuc_gauting, ffmuc_muc_cty, ffmuc_muc_nord, ffmuc_muc_ost, ffmuc_muc_sued, ffmuc_muc_west, ffmuc_uml_nord, ffmuc_uml_ost, ffmuc_uml_sued, ffmuc_uml_west, ffmuc_welt"} -: ${WGKEX_DOMAIN_PREFIX:="ffmuc_"} -: ${WGKEX_DEBUG:="DEBUG"} -: ${MQTT_BROKER_URL:="mqtt"} -: ${MQTT_BROKER_PORT:="1883"} -: ${MQTT_USERNAME:=""} -: ${MQTT_PASSWORD:=""} -: ${MQTT_KEEPALIVE:="5"} -: ${MQTT_TLS:="False"} +: "${WGKEX_DOMAINS:=ffmuc_muc_cty, ffmuc_muc_nord, ffmuc_muc_ost, ffmuc_muc_sued, ffmuc_muc_west, ffmuc_welt, ffwert_city}" +: "${WGKEX_DOMAIN_PREFIXES:=ffmuc_, ffdon_, ffwert_}" +: "${WGKEX_DEBUG:=DEBUG}" +: "${MQTT_BROKER_URL:=mqtt}" +: "${MQTT_BROKER_PORT:=1883}" +: "${MQTT_USERNAME:=}" +: "${MQTT_PASSWORD:=}" +: "${MQTT_KEEPALIVE:=5}" +: "${MQTT_TLS:=False}" mk_config() { if [ ! -e /etc/wgkex.yaml ] ; then @@ -19,9 +19,12 @@ IFS=", " for i in $WGKEX_DOMAINS; do echo " - $i" done +echo "domain_prefixes:" +for i in $WGKEX_DOMAIN_PREFIXES; do + echo " - $i" +done cat < -wgkex + wgkex

WGKEX

diff --git a/wgkex/config/config.py b/wgkex/config/config.py index efe4d0f..0659d69 100644 --- a/wgkex/config/config.py +++ b/wgkex/config/config.py @@ -1,4 +1,5 @@ """Configuration handling class.""" +import logging import os import sys import yaml @@ -41,6 +42,14 @@ class MQTT: @classmethod def from_dict(cls, mqtt_cfg: Dict[str, str]) -> "MQTT": + """seems to generate a mqtt config object from dictionary + + Args: + mqtt_cfg (): + + Returns: + mqtt config object + """ return cls( broker_url=mqtt_cfg["broker_url"], username=mqtt_cfg["username"], @@ -60,12 +69,11 @@ class Config: Attributes: domains: The list of domains to listen for. mqtt: The MQTT configuration. - domain_prefix: The prefix to pre-pend to a given domain. - """ + domain_prefixes: The list of prefixes to pre-pend to a given domain.""" domains: List[str] mqtt: MQTT - domain_prefix: str + domain_prefixes: List[str] @classmethod def from_dict(cls, cfg: Dict[str, str]) -> "Config": @@ -79,7 +87,7 @@ def from_dict(cls, cfg: Dict[str, str]) -> "Config": return cls( domains=cfg["domains"], mqtt=mqtt_cfg, - domain_prefix=cfg["domain_prefix"], + domain_prefixes=cfg["domain_prefixes"], ) @@ -124,6 +132,7 @@ def fetch_config_from_disk() -> str: The file contents as string. """ config_file = os.environ.get(WG_CONFIG_OS_ENV, WG_CONFIG_DEFAULT_LOCATION) + logging.debug("getting config_file: %s", repr(config_file)) try: with open(config_file, "r") as stream: return stream.read() diff --git a/wgkex/config/config_test.py b/wgkex/config/config_test.py index 58427c8..3c33148 100644 --- a/wgkex/config/config_test.py +++ b/wgkex/config/config_test.py @@ -1,10 +1,17 @@ +"""Tests for configuration handling class.""" import unittest import mock import config import yaml -_VALID_CFG = "domain_prefix: ffmuc_\nlog_level: DEBUG\ndomains:\n- a\n- b\nmqtt:\n broker_port: 1883\n broker_url: mqtt://broker\n keepalive: 5\n password: pass\n tls: true\n username: user\n" -_INVALID_LINT = "domain_prefix: ffmuc_\nBAD_KEY_FOR_DOMAIN:\n- a\n- b\nmqtt:\n broker_port: 1883\n broker_url: mqtt://broker\n keepalive: 5\n password: pass\n tls: true\n username: user\n" +_VALID_CFG = ( + "domain_prefixes:\n- ffmuc_\n- ffdon_\n- ffwert_\nlog_level: DEBUG\ndomains:\n- a\n- b\nmqtt:\n broker_port: 1883" + "\n broker_url: mqtt://broker\n keepalive: 5\n password: pass\n tls: true\n username: user\n" +) +_INVALID_LINT = ( + "domain_prefixes: ffmuc_\nBAD_KEY_FOR_DOMAIN:\n- a\n- b\nmqtt:\n broker_port: 1883\n broker_url: " + "mqtt://broker\n keepalive: 5\n password: pass\n tls: true\n username: user\n" +) _INVALID_CFG = "asdasdasdasd" @@ -52,7 +59,7 @@ def test_fetch_from_config_success(self): self.assertListEqual(["a", "b"], config.fetch_from_config("domains")) def test_fetch_from_config_no_key_in_config(self): - """Test fetch non existent key from configuration.""" + """Test fetch non-existent key from configuration.""" mock_open = mock.mock_open(read_data=_VALID_CFG) with mock.patch("builtins.open", mock_open): self.assertIsNone(config.fetch_from_config("key_does_not_exist")) diff --git a/wgkex/worker/app.py b/wgkex/worker/app.py index 911fd8b..70aa8fc 100644 --- a/wgkex/worker/app.py +++ b/wgkex/worker/app.py @@ -20,6 +20,14 @@ class DomainsNotInConfig(Error): """If no domains exist in configuration file.""" +class PrefixesNotInConfig(Error): + """If no prefixes exist in configuration file.""" + + +class DomainsAreNotUnique(Error): + """If non-unique domains exist in configuration file.""" + + def flush_workers(domain: Text) -> None: """Calls peer flush every _CLEANUP_TIME interval.""" while True: @@ -35,20 +43,58 @@ def clean_up_worker(domains: List[Text]) -> None: domains: list of domains. """ logger.debug("Cleaning up the following domains: %s", domains) - prefix = config.load_config().get("domain_prefix") + prefixes = config.load_config().get("domain_prefixes") + cleanup_counter = 0 + # ToDo: do we need a check if every domain got gleaned? + for prefix in prefixes: + for domain in domains: + if prefix in domain: + logger.info("Scheduling cleanup task for %s, ", domain) + try: + cleaned_domain = domain.split(prefix)[1] + cleanup_counter += 1 + except IndexError: + logger.error( + "Cannot strip domain with prefix %s from passed value %s. Skipping cleanup operation", + prefix, + domain, + ) + continue + thread = threading.Thread(target=flush_workers, args=(cleaned_domain,)) + thread.start() + if cleanup_counter < len(domains): + logger.error( + "Not every domain got cleaned. Check domains for missing prefixes", + repr(domains), + repr(prefixes), + ) + + +def check_all_domains_unique(domains, prefixes): + """strips off prefixes and checks if domains are unique + + Args: + domains: [str] + Returns: + boolean + """ + if not prefixes: + raise PrefixesNotInConfig("Could not locate prefixes in configuration.") + if not isinstance(prefixes, list): + raise TypeError("prefixes is not a list") + unique_domains = [] for domain in domains: - logger.info("Scheduling cleanup task for %s, ", domain) - try: - cleaned_domain = domain.split(prefix)[1] - except IndexError: - logger.error( - "Cannot strip domain with prefix %s from passed value %s. Skipping cleanup operation", - prefix, - domain, - ) - continue - thread = threading.Thread(target=flush_workers, args=(cleaned_domain,)) - thread.start() + for prefix in prefixes: + if prefix in domain: + stripped_domain = domain.split(prefix)[1] + if stripped_domain in unique_domains: + logger.error( + "We have a non-unique domain here", + domain, + ) + return False + unique_domains.append(stripped_domain) + return True def main(): @@ -56,10 +102,14 @@ def main(): Raises: DomainsNotInConfig: If no domains were found in configuration file. + DomainsAreNotUnique: If there were non-unique domains after stripping prefix """ domains = config.load_config().get("domains") + prefixes = config.load_config().get("domain_prefixes") if not domains: raise DomainsNotInConfig("Could not locate domains in configuration.") + if not check_all_domains_unique(domains, prefixes): + raise DomainsAreNotUnique("There are non-unique domains! Check config.") clean_up_worker(domains) watch_queue() mqtt.connect() diff --git a/wgkex/worker/app_test.py b/wgkex/worker/app_test.py index 1fbea9d..111590b 100644 --- a/wgkex/worker/app_test.py +++ b/wgkex/worker/app_test.py @@ -5,17 +5,57 @@ class AppTest(unittest.TestCase): + """unittest.TestCase class""" + def setUp(self) -> None: + """set up unittests""" app._CLEANUP_TIME = 0 + def test_unique_domains_success(self): + """Ensure domain suffixes are unique.""" + test_prefixes = ["TEST_PREFIX_", "TEST_PREFIX2_"] + test_domains = [ + "TEST_PREFIX_DOMAINSUFFIX1", + "TEST_PREFIX_DOMAINSUFFIX2", + "TEST_PREFIX2_DOMAINSUFFIX3", + ] + self.assertTrue( + app.check_all_domains_unique(test_domains, test_prefixes), + "unique domains are not detected unique", + ) + + def test_unique_domains_fail(self): + """Ensure domain suffixes are not unique.""" + test_prefixes = ["TEST_PREFIX_", "TEST_PREFIX2_"] + test_domains = [ + "TEST_PREFIX_DOMAINSUFFIX1", + "TEST_PREFIX_DOMAINSUFFIX2", + "TEST_PREFIX2_DOMAINSUFFIX1", + ] + self.assertFalse( + app.check_all_domains_unique(test_domains, test_prefixes), + "non-unique domains are detected as unique", + ) + + def test_unique_domains_not_list(self): + """Ensure domain prefixes are a list.""" + test_prefixes = "TEST_PREFIX_, TEST_PREFIX2_" + test_domains = [ + "TEST_PREFIX_DOMAINSUFFIX1", + "TEST_PREFIX_DOMAINSUFFIX2", + "TEST_PREFIX2_DOMAINSUFFIX1", + ] + with self.assertRaises(TypeError): + app.check_all_domains_unique(test_domains, test_prefixes) + @mock.patch.object(app.config, "load_config") @mock.patch.object(app.mqtt, "connect", autospec=True) def test_main_success(self, connect_mock, config_mock): """Ensure we can execute main.""" connect_mock.return_value = None - test_prefix = "TEST_PREFIX_" + test_prefixes = ["TEST_PREFIX_", "TEST_PREFIX2_"] config_mock.return_value = dict( - domains=[f"{test_prefix}domain.one"], domain_prefix=test_prefix + domains=[f"{test_prefixes[1]}domain.one"], domain_prefixes=test_prefixes ) with mock.patch("app.flush_workers", return_value=None): app.main() @@ -34,9 +74,9 @@ def test_main_fails_no_domain(self, connect_mock, config_mock): @mock.patch.object(app.mqtt, "connect", autospec=True) def test_main_fails_bad_domain(self, connect_mock, config_mock): """Ensure we fail when domains are badly formatted.""" - test_prefix = "TEST_PREFIX_" + test_prefixes = ["TEST_PREFIX_", "TEST_PREFIX2_"] config_mock.return_value = dict( - domains=[f"cant_split_domain"], domain_prefix=test_prefix + domains=[f"cant_split_domain"], domain_prefixes=test_prefixes ) connect_mock.return_value = None with mock.patch("app.flush_workers", return_value=None): diff --git a/wgkex/worker/mqtt.py b/wgkex/worker/mqtt.py index 995d49c..1c1cf31 100644 --- a/wgkex/worker/mqtt.py +++ b/wgkex/worker/mqtt.py @@ -84,15 +84,19 @@ def on_message(client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage) -> """ # TODO(ruairi): Check bounds and raise exception here. logger.debug("Got message %s from MTQQ", message) - domain_prefix = load_config().get("domain_prefix") - domain = re.search(r"/.*" + domain_prefix + "(\w+)/", message.topic) + domain_prefixes = load_config().get("domain_prefixes") + domain = None + for domain_prefix in domain_prefixes: + domain = re.search(r"/.*" + domain_prefix + "(\w+)/", message.topic) + if domain: + break if not domain: raise ValueError( - "Could not find a match for %s on %s", domain_prefix, message.topic + "Could not find a match for %s on %s", repr(domain_prefixes), message.topic ) + # this will not work, if we have non-unique prefix stripped domains domain = domain.group(1) logger.debug("Found domain %s", domain) - logger.info( f"Received create message for key {str(message.payload.decode('utf-8'))} on domain {domain} adding to queue" )