KafkaSinkCluster: DeleteRecords

rukai committed Nov 6, 2024
1 parent f81fbdb · commit 0e956ef
Showing 4 changed files with 85 additions and 4 deletions.
50 changes: 48 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -5,8 +5,8 @@ use test_helpers::{
        Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
        ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
        KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic,
-        OffsetAndMetadata, OffsetSpec, Record, ResourcePatternType, ResourceSpecifier,
-        ResourceType, TopicPartition,
+        OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType,
+        ResourceSpecifier, ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
};
@@ -132,11 +132,57 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {

async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) {
    let admin = connection_builder.connect_admin().await;
+
+    delete_record(&admin, connection_builder).await;
    admin
        .delete_groups(&["some_group", "some_group1", "consumer_group_with_offsets"])
        .await;
}

+async fn delete_record(admin: &KafkaAdmin, connection_builder: &KafkaConnectionBuilder) {
+    // Only supported by java driver
+    #[allow(irrefutable_let_patterns)]
+    if let KafkaConnectionBuilder::Java(_) = connection_builder {
+        // assert partitions1 contains a record
+        let mut consumer = connection_builder
+            .connect_consumer(
+                ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
+                    .with_group("test_delete_records"),
+            )
+            .await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "initial".to_owned(),
+                key: Some("Key".into()),
+                topic_name: "partitions1".to_owned(),
+                offset: Some(0),
+            })
+            .await;

+        // delete the record at offset 0: beforeOffset(n) deletes all records below offset n,
+        // so deleting before offset 1 is required to remove the record at offset 0
+        admin
+            .delete_records(&[RecordsToDelete {
+                topic_partition: TopicPartition {
+                    topic_name: "partitions1".to_owned(),
+                    partition: 0,
+                },
+                delete_before_offset: 1,
+            }])
+            .await;

+        // assert partitions1 no longer contains a record
+        let mut consumer = connection_builder
+            .connect_consumer(
+                ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
+                    .with_group("test_delete_records2"),
+            )
+            .await;
+        consumer
+            .assert_no_consume_within_timeout(Duration::from_secs(2))
+            .await;
+    }
+}

/// Attempt to make the driver batch produce requests for different topics into the same request
/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
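Background on what the new test relies on (not part of the diff): Kafka's DeleteRecords advances the partition's log start offset rather than rewriting the log, so deleting before offset 1 makes the record at offset 0 unreadable to any consumer, including the fresh consumer group above. A hedged way to verify the new log start offset directly, assuming the list_offsets helper shown later in this commit exposes an OffsetSpec::Earliest variant:

use std::collections::HashMap;

// Sketch only: after the deletion above, the earliest readable offset
// for partition 0 of partitions1 should have advanced from 0 to 1.
let mut request = HashMap::new();
request.insert(
    TopicPartition {
        topic_name: "partitions1".to_owned(),
        partition: 0,
    },
    OffsetSpec::Earliest, // assumed variant name
);
let earliest = admin.list_offsets(request).await;
// Inspect `earliest` for the returned offset; the exact result fields
// depend on ListOffsetsResultInfo's definition.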
1 change: 1 addition & 0 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -1038,6 +1038,7 @@ The connection to the client has been closed."
                | RequestBody::AlterConfigs(_)
                | RequestBody::CreatePartitions(_)
                | RequestBody::DeleteTopics(_)
+               | RequestBody::DeleteRecords(_)
                | RequestBody::CreateAcls(_)
                | RequestBody::ApiVersions(_),
            ..
26 changes: 24 additions & 2 deletions test-helpers/src/connection/kafka/java.rs
@@ -1,8 +1,8 @@
use super::{
    Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse,
    ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, OffsetSpec, ProduceResult,
-    Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition,
-    TopicPartitionInfo,
+    Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType,
+    TopicDescription, TopicPartition, TopicPartitionInfo,
};
use crate::connection::java::{Jvm, Value};
use anyhow::Result;
@@ -522,6 +522,28 @@ impl KafkaAdminJava {
            .await;
    }

+    pub async fn delete_records(&self, to_delete: &[RecordsToDelete]) {
+        let to_delete: Vec<(Value, Value)> = to_delete
+            .iter()
+            .map(|x| {
+                (
+                    create_topic_partition(&self.jvm, &x.topic_partition),
+                    self.jvm.call_static(
+                        "org.apache.kafka.clients.admin.RecordsToDelete",
+                        "beforeOffset",
+                        vec![self.jvm.new_long(x.delete_before_offset)],
+                    ),
+                )
+            })
+            .collect();
+        let to_delete = self.jvm.new_map(to_delete);
+
+        self.admin
+            .call("deleteRecords", vec![to_delete])
+            .call_async("all", vec![])
+            .await;
+    }

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
let partitions: Vec<(Value, Value)> = partitions
.iter()
12 changes: 12 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
@@ -442,6 +442,14 @@ impl KafkaAdmin {
        }
    }

+    pub async fn delete_records(&self, to_delete: &[RecordsToDelete]) {
+        match self {
+            #[cfg(feature = "kafka-cpp-driver-tests")]
+            Self::Cpp(_) => unimplemented!(),
+            Self::Java(java) => java.delete_records(to_delete).await,
+        }
+    }
+
pub async fn list_offsets(
&self,
topic_partitions: HashMap<TopicPartition, OffsetSpec>,
@@ -649,3 +657,7 @@
pub struct OffsetAndMetadata {
    pub offset: i64,
}
+pub struct RecordsToDelete {
+    pub topic_partition: TopicPartition,
+    pub delete_before_offset: i64,
+}
