From c01b025dd23606a490a41e84f02c66095d355ab4 Mon Sep 17 00:00:00 2001 From: Jonas Keeling Date: Mon, 30 Sep 2024 15:13:49 +0200 Subject: [PATCH 1/3] fix: unknown Kafka errors with _RESOLVE error code Also don't log KafkaUnavailableError as unexpected error --- src/karapace/kafka/common.py | 11 ++++++++++- src/karapace/schema_reader.py | 3 +++ stubs/confluent_kafka/cimpl.pyi | 1 + 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/karapace/kafka/common.py b/src/karapace/kafka/common.py index 5df9838a2..add44eea3 100644 --- a/src/karapace/kafka/common.py +++ b/src/karapace/kafka/common.py @@ -6,7 +6,14 @@ from __future__ import annotations from aiokafka.client import UnknownTopicOrPartitionError -from aiokafka.errors import AuthenticationFailedError, for_code, IllegalStateError, KafkaTimeoutError, NoBrokersAvailable +from aiokafka.errors import ( + AuthenticationFailedError, + for_code, + IllegalStateError, + KafkaTimeoutError, + KafkaUnavailableError, + NoBrokersAvailable, +) from collections.abc import Iterable from concurrent.futures import Future from confluent_kafka.error import KafkaError, KafkaException @@ -52,6 +59,8 @@ def translate_from_kafkaerror(error: KafkaError) -> Exception: return KafkaTimeoutError() if code == KafkaError._STATE: # pylint: disable=protected-access return IllegalStateError() + if code == KafkaError._RESOLVE: # pylint: disable=protected-access + return KafkaUnavailableError() return for_code(code) diff --git a/src/karapace/schema_reader.py b/src/karapace/schema_reader.py index 01f07f379..cd04944dc 100644 --- a/src/karapace/schema_reader.py +++ b/src/karapace/schema_reader.py @@ -11,6 +11,7 @@ InvalidReplicationFactorError, KafkaConfigurationError, KafkaTimeoutError, + KafkaUnavailableError, LeaderNotAvailableError, NoBrokersAvailable, NodeNotReadyError, @@ -250,6 +251,8 @@ def run(self) -> None: except ShutdownException: self._stop_schema_reader.set() shutdown() + except KafkaUnavailableError: + LOG.warning("Kafka cluster is unavailable or broker can't be resolved.") except Exception as e: # pylint: disable=broad-except self.stats.unexpected_exception(ex=e, where="schema_reader_loop") LOG.warning("Unexpected exception in schema reader loop - %s", e) diff --git a/stubs/confluent_kafka/cimpl.pyi b/stubs/confluent_kafka/cimpl.pyi index 74760897c..6936d10f0 100644 --- a/stubs/confluent_kafka/cimpl.pyi +++ b/stubs/confluent_kafka/cimpl.pyi @@ -11,6 +11,7 @@ class KafkaError: _UNKNOWN_PARTITION: int _TIMED_OUT: int _STATE: int + _RESOLVE: int UNKNOWN_TOPIC_OR_PART: int def code(self) -> int: ... From da7336f4f01e494b11d9fa6382384a0bdcd9bf83 Mon Sep 17 00:00:00 2001 From: Jonas Keeling Date: Mon, 30 Sep 2024 16:44:22 +0200 Subject: [PATCH 2/3] fix: source layout check --- src/karapace/backup/backends/v3/schema_tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/karapace/backup/backends/v3/schema_tool.py b/src/karapace/backup/backends/v3/schema_tool.py index 65a3ea2bf..340be2477 100644 --- a/src/karapace/backup/backends/v3/schema_tool.py +++ b/src/karapace/backup/backends/v3/schema_tool.py @@ -86,7 +86,7 @@ def check_compatibility(git_target: str) -> None: for file in schema_directory.glob(f"*{extension}"): relative = relative_path(file) - if source_layout: + if not source_layout: relative = pathlib.Path(*relative.parts[1:]) with subprocess.Popen( ["git", "show", f"{git_target}:{relative}"], From 9604b874aa41d416115d708567fb1d76f832ad2f Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Wed, 2 Oct 2024 10:45:43 +0300 Subject: [PATCH 3/3] fix: add Protopace go source to be included in MANIFEST.in --- MANIFEST.in | 1 + 1 file changed, 1 insertion(+) diff --git a/MANIFEST.in b/MANIFEST.in index 4ee222daa..9a30ae03f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ graft src +graft go include karapace.json include karapace.unit include tests/*.py