diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 4779e405a..101d2e40c 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1460,6 +1460,17 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder) { } } +async fn list_transactions(connection_builder: &KafkaConnectionBuilder) { + let admin = connection_builder.connect_admin().await; + let _transaction_producer = connection_builder + .connect_producer_with_transactions("some_transaction_id".to_owned()) + .await; + + let actual_results = admin.list_transactions().await; + let expected_results = ["some_transaction_id".to_owned()]; + assert_eq!(actual_results, expected_results); +} + async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; admin @@ -1485,6 +1496,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec #[allow(irrefutable_let_patterns)] if let KafkaConnectionBuilder::Java(_) = connection_builder { list_groups(connection_builder).await; + list_transactions(connection_builder).await; } } diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index fe00dbfb7..35c82dddf 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -30,10 +30,11 @@ use kafka_protocol::messages::{ BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse, - ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest, - OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, - ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, - SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, + ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest, + MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest, + OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader, + SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, + TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -49,7 +50,7 @@ use serde::{Deserialize, Serialize}; use shotover_node::{ShotoverNode, ShotoverNodeConfig}; use split::{ AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter, - ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter, + ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter, }; use std::collections::{HashMap, HashSet, VecDeque}; @@ -1002,10 +1003,15 @@ impl KafkaSinkCluster { .. })) => self.route_to_controller(request), + // route to all nodes Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::ListGroups(_), .. })) => self.split_and_route_request::(request)?, + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ListTransactions(_), + .. + })) => self.split_and_route_request::(request)?, // route to random broker Some(Frame::Kafka(KafkaFrame::Request { @@ -2018,6 +2024,10 @@ impl KafkaSinkCluster { body: ResponseBody::ListGroups(base), .. })) => Self::combine_list_groups(base, drain)?, + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ListTransactions(base), + .. + })) => Self::combine_list_transactions(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::AddPartitionsToTxn(base), version, @@ -2257,6 +2267,27 @@ impl KafkaSinkCluster { Ok(()) } + fn combine_list_transactions( + base_list_transactions: &mut ListTransactionsResponse, + drain: impl Iterator, + ) -> Result<()> { + for mut next in drain { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ListTransactions(next_list_transactions), + .. + })) = next.frame() + { + base_list_transactions + .transaction_states + .extend(std::mem::take( + &mut next_list_transactions.transaction_states, + )); + } + } + + Ok(()) + } + fn combine_add_partitions_to_txn( base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse, drain: impl Iterator, diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index 9a7fbb0fd..7ada3283b 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -11,7 +11,8 @@ use kafka_protocol::messages::{ list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup, offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest, - ListOffsetsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName, + ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, + ProduceRequest, TopicName, }; use std::collections::HashMap; @@ -194,6 +195,34 @@ impl RequestSplitAndRouter for ListGroupsSplitAndRouter { } } +pub struct ListTransactionsSplitAndRouter; + +impl RequestSplitAndRouter for ListTransactionsSplitAndRouter { + type Request = ListTransactionsRequest; + type SubRequests = (); + + fn split_by_destination( + transform: &mut KafkaSinkCluster, + _request: &mut Self::Request, + ) -> HashMap { + transform.split_request_by_routing_to_all_brokers() + } + + fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + match request.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ListTransactions(request), + .. + })) => Some(request), + _ => None, + } + } + + fn reassemble(_request: &mut Self::Request, _item: Self::SubRequests) { + // No need to reassemble, each ListTransactions is an exact clone of the original + } +} + pub struct OffsetFetchSplitAndRouter; impl RequestSplitAndRouter for OffsetFetchSplitAndRouter { diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index c7c4cd937..2f56f9275 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -684,6 +684,25 @@ impl KafkaAdminJava { results } + pub async fn list_transactions(&self) -> Vec { + let java_results = self + .admin + .call("listTransactions", vec![]) + .call_async("all", vec![]) + .await; + + let mut results = vec![]; + for java_group in java_results.call("iterator", vec![]).into_iter() { + results.push( + java_group + .cast("org.apache.kafka.clients.admin.TransactionListing") + .call("transactionalId", vec![]) + .into_rust(), + ) + } + results + } + pub async fn create_acls(&self, acls: Vec) { let resource_type = self .jvm diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 839e67ec0..e3151bbd1 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -461,6 +461,14 @@ impl KafkaAdmin { } } + pub async fn list_transactions(&self) -> Vec { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_transactions"), + Self::Java(java) => java.list_transactions().await, + } + } + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { match self { #[cfg(feature = "kafka-cpp-driver-tests")]