From a60185cece306e3d747cd4e16a4fa4141003d148 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 5 Nov 2024 16:31:52 +1100 Subject: [PATCH] KafkaSinkCluster: DeleteRecords --- .../tests/kafka_int_tests/test_cases.rs | 50 ++++++++++++++++++- .../src/transforms/kafka/sink_cluster/mod.rs | 1 + test-helpers/src/connection/kafka/java.rs | 26 +++++++++- test-helpers/src/connection/kafka/mod.rs | 12 +++++ 4 files changed, 85 insertions(+), 4 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index b80df9594..f77193529 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -5,8 +5,8 @@ use test_helpers::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig, ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic, - OffsetAndMetadata, OffsetSpec, Record, ResourcePatternType, ResourceSpecifier, - ResourceType, TopicPartition, + OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType, + ResourceSpecifier, ResourceType, TopicPartition, }, docker_compose::DockerCompose, }; @@ -132,11 +132,57 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; + + delete_record(&admin, connection_builder).await; admin .delete_groups(&["some_group", "some_group1", "consumer_group_with_offsets"]) .await; } +async fn delete_record(admin: &KafkaAdmin, connection_builder: &KafkaConnectionBuilder) { + // Only supported by java driver + #[allow(irrefutable_let_patterns)] + if let KafkaConnectionBuilder::Java(_) = connection_builder { + // assert partitions1 contains a record + let mut consumer = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()]) + .with_group("test_delete_records"), + ) + .await; + consumer + .assert_consume(ExpectedResponse { + message: "initial".to_owned(), + key: Some("Key".into()), + topic_name: "partitions1".to_owned(), + offset: Some(0), + }) + .await; + + // delete the record + admin + .delete_records(&[RecordsToDelete { + topic_partition: TopicPartition { + topic_name: "partitions1".to_owned(), + partition: 0, + }, + delete_before_offset: 0, + }]) + .await; + + // assert partitions1 no longer contains a record + let mut consumer = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()]) + .with_group("test_delete_records2"), + ) + .await; + consumer + .assert_no_consume_within_timeout(Duration::from_secs(2)) + .await; + } +} + /// Attempt to make the driver batch produce requests for different topics into the same request /// This is important to test since shotover has complex logic for splitting these batch requests into individual requests. pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) { diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 2e189e3c5..241c69749 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1048,6 +1048,7 @@ The connection to the client has been closed." | RequestBody::AlterConfigs(_) | RequestBody::CreatePartitions(_) | RequestBody::DeleteTopics(_) + | RequestBody::DeleteRecords(_) | RequestBody::CreateAcls(_) | RequestBody::ApiVersions(_), .. diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 2f56f9275..876254d66 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -1,8 +1,8 @@ use super::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse, ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, OffsetSpec, ProduceResult, - Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, - TopicPartitionInfo, + Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, + TopicDescription, TopicPartition, TopicPartitionInfo, }; use crate::connection::java::{Jvm, Value}; use anyhow::Result; @@ -522,6 +522,28 @@ impl KafkaAdminJava { .await; } + pub async fn delete_records(&self, to_delete: &[RecordsToDelete]) { + let to_delete: Vec<(Value, Value)> = to_delete + .iter() + .map(|x| { + ( + create_topic_partition(&self.jvm, &x.topic_partition), + self.jvm.call_static( + "org.apache.kafka.clients.admin.RecordsToDelete", + "beforeOffset", + vec![self.jvm.new_long(x.delete_before_offset)], + ), + ) + }) + .collect(); + let to_delete = self.jvm.new_map(to_delete); + + self.admin + .call("deleteRecords", vec![to_delete]) + .call_async("all", vec![]) + .await; + } + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { let partitions: Vec<(Value, Value)> = partitions .iter() diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index e3151bbd1..678e73891 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -442,6 +442,14 @@ impl KafkaAdmin { } } + pub async fn delete_records(&self, to_delete: &[RecordsToDelete]) { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => unimplemented!(), + Self::Java(java) => java.delete_records(to_delete).await, + } + } + pub async fn list_offsets( &self, topic_partitions: HashMap, @@ -657,3 +665,7 @@ impl IsolationLevel { pub struct OffsetAndMetadata { pub offset: i64, } +pub struct RecordsToDelete { + pub topic_partition: TopicPartition, + pub delete_before_offset: i64, +}