From fd59ddb882c6f57fedc287f3f38578057933a3be Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 4 Nov 2024 09:39:22 +1100 Subject: [PATCH] Route ListTransactions request --- .../tests/kafka_int_tests/test_cases.rs | 14 ++- .../src/transforms/kafka/sink_cluster/mod.rs | 104 ++++++++++++++---- .../transforms/kafka/sink_cluster/split.rs | 31 +++++- test-helpers/src/connection/kafka/java.rs | 19 ++++ test-helpers/src/connection/kafka/mod.rs | 8 ++ 5 files changed, 153 insertions(+), 23 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 4779e405a..b80df9594 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1456,10 +1456,21 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder) { let actual_results = admin.list_groups().await; if !actual_results.contains(&"list_groups_test".to_owned()) { - panic!("Expected to find list_groups_test in {actual_results:?} but was misisng") + panic!("Expected to find \"list_groups_test\" in {actual_results:?} but was missing") } } +async fn list_transactions(connection_builder: &KafkaConnectionBuilder) { + let admin = connection_builder.connect_admin().await; + let _transaction_producer = connection_builder + .connect_producer_with_transactions("some_transaction_id".to_owned()) + .await; + + let actual_results = admin.list_transactions().await; + let expected_results = ["some_transaction_id".to_owned()]; + assert_eq!(actual_results, expected_results); +} + async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; admin @@ -1485,6 +1496,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec #[allow(irrefutable_let_patterns)] if let KafkaConnectionBuilder::Java(_) = connection_builder { list_groups(connection_builder).await; + 
list_transactions(connection_builder).await; } } diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index aedf1004f..2e189e3c5 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -30,10 +30,11 @@ use kafka_protocol::messages::{ BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse, - ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest, - OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, - ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, - SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, + ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest, + MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest, + OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader, + SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, + TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -49,7 +50,7 @@ use serde::{Deserialize, Serialize}; use shotover_node::{ShotoverNode, ShotoverNodeConfig}; use split::{ AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter, - ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter, + ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter, }; use std::collections::{HashMap, HashSet, VecDeque}; 
@@ -164,6 +165,7 @@ struct KafkaSinkClusterBuilder { first_contact_points: Vec, shotover_nodes: Vec, rack: StrBytes, + broker_id: BrokerId, connect_timeout: Duration, read_timeout: Option, controller_broker: Arc, @@ -214,6 +216,7 @@ impl KafkaSinkClusterBuilder { .map(|x| x.get_builder(connect_timeout, read_timeout)) .transpose()?, shotover_nodes, + broker_id: BrokerId(local_shotover_broker_id), rack, connect_timeout, read_timeout, @@ -236,6 +239,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder { first_contact_points: self.first_contact_points.clone(), shotover_nodes: self.shotover_nodes.clone(), rack: self.rack.clone(), + broker_id: self.broker_id, nodes: vec![], nodes_shared: self.nodes_shared.clone(), controller_broker: self.controller_broker.clone(), @@ -300,6 +304,7 @@ pub(crate) struct KafkaSinkCluster { first_contact_points: Vec, shotover_nodes: Vec, rack: StrBytes, + broker_id: BrokerId, nodes: Vec, nodes_shared: Arc>>, controller_broker: Arc, @@ -1025,10 +1030,15 @@ The connection to the client has been closed." .. })) => self.route_to_controller(request), + // route to all nodes Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::ListGroups(_), .. })) => self.split_and_route_request::(request)?, + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ListTransactions(_), + .. + })) => self.split_and_route_request::(request)?, // route to random broker Some(Frame::Kafka(KafkaFrame::Request { @@ -1444,24 +1454,50 @@ The connection to the client has been closed." result } + /// Route broadcasted requests to all brokers split across all shotover nodes. + /// That is, each shotover node in a rack will deterministically be assigned a portion of the rack to route the request to. + /// If a shotover node is the only node in its rack it will route to all kafka brokers in the rack. + /// When combined with a client that is routing this request to all shotover nodes, the request will reach each kafka broker exactly once. 
+ /// + /// The logic in this function relies on `self.shotover_nodes` and `self.nodes` being sorted by broker id. fn split_request_by_routing_to_all_brokers(&mut self) -> HashMap { - let mut result: HashMap = Default::default(); - - for broker in self.nodes.iter().filter(|node| { - node.is_up() - && node - .rack - .as_ref() - .map(|rack| rack == &self.rack) - // If the cluster is not using racks, include all brokers in the list. - // This ensure we get full coverage of the cluster. - // The client driver can filter out the resulting duplicates. - .unwrap_or(true) - }) { - result.insert(broker.broker_id, ()); - } + // Will always be at least 1, since the shotover this code is running in will always be included. + let shotovers_in_rack = self + .shotover_nodes + .iter() + .filter(|node| node.is_up() && node.rack == self.rack) + .count(); - result + // The offset of this shotover node within the rack + // will be 0 <= local_shotover_index_in_rack < shotovers_in_rack + // The purpose is to create a deterministic way of splitting up broadcasted requests across shotover nodes. + let local_shotover_index_in_rack = self + .shotover_nodes + .iter() + .filter(|node| node.is_up() && node.rack == self.rack) + // start enumerating AFTER filtering down to shotover nodes in the rack + .enumerate() + .find(|node| node.1.broker_id == self.broker_id) + .unwrap() + .0; + + self.nodes + .iter() + .filter(|node| { + node.is_up() + && node + .rack + .as_ref() + .map(|rack| rack == &self.rack) + // If the cluster is not using racks, include all brokers in the list. + .unwrap_or(true) + }) + // start enumerating AFTER filtering down to brokers in the rack + .enumerate() + // assign each shotover node in the rack a subset of the brokers in the rack + .filter(|(i, _)| i % shotovers_in_rack == local_shotover_index_in_rack) + .map(|(_, broker)| (broker.broker_id, ())) + .collect() } /// This method removes all groups from the OffsetFetch request and returns them split up by their destination. 
@@ -2041,6 +2077,10 @@ The connection to the client has been closed." body: ResponseBody::ListGroups(base), .. })) => Self::combine_list_groups(base, drain)?, + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ListTransactions(base), + .. + })) => Self::combine_list_transactions(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::AddPartitionsToTxn(base), version, @@ -2280,6 +2320,27 @@ The connection to the client has been closed." Ok(()) } + fn combine_list_transactions( + base_list_transactions: &mut ListTransactionsResponse, + drain: impl Iterator, + ) -> Result<()> { + for mut next in drain { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ListTransactions(next_list_transactions), + .. + })) = next.frame() + { + base_list_transactions + .transaction_states + .extend(std::mem::take( + &mut next_list_transactions.transaction_states, + )); + } + } + + Ok(()) + } + fn combine_add_partitions_to_txn( base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse, drain: impl Iterator, @@ -3205,6 +3266,7 @@ The connection to the client has been closed." 
.all(|node| node.broker_id != new_node.broker_id); if missing_from_shared { nodes_shared.push(new_node); + nodes_shared.sort_by_key(|node| node.broker_id); } } diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index 9a7fbb0fd..7ada3283b 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -11,7 +11,8 @@ use kafka_protocol::messages::{ list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup, offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest, - ListOffsetsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName, + ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, + ProduceRequest, TopicName, }; use std::collections::HashMap; @@ -194,6 +195,34 @@ impl RequestSplitAndRouter for ListGroupsSplitAndRouter { } } +pub struct ListTransactionsSplitAndRouter; + +impl RequestSplitAndRouter for ListTransactionsSplitAndRouter { + type Request = ListTransactionsRequest; + type SubRequests = (); + + fn split_by_destination( + transform: &mut KafkaSinkCluster, + _request: &mut Self::Request, + ) -> HashMap { + transform.split_request_by_routing_to_all_brokers() + } + + fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + match request.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ListTransactions(request), + .. 
+ })) => Some(request), + _ => None, + } + } + + fn reassemble(_request: &mut Self::Request, _item: Self::SubRequests) { + // No need to reassemble, each ListTransactions is an exact clone of the original + } +} + pub struct OffsetFetchSplitAndRouter; impl RequestSplitAndRouter for OffsetFetchSplitAndRouter { diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index c7c4cd937..2f56f9275 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -684,6 +684,25 @@ impl KafkaAdminJava { results } + pub async fn list_transactions(&self) -> Vec { + let java_results = self + .admin + .call("listTransactions", vec![]) + .call_async("all", vec![]) + .await; + + let mut results = vec![]; + for java_group in java_results.call("iterator", vec![]).into_iter() { + results.push( + java_group + .cast("org.apache.kafka.clients.admin.TransactionListing") + .call("transactionalId", vec![]) + .into_rust(), + ) + } + results + } + pub async fn create_acls(&self, acls: Vec) { let resource_type = self .jvm diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 839e67ec0..e3151bbd1 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -461,6 +461,14 @@ impl KafkaAdmin { } } + pub async fn list_transactions(&self) -> Vec { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_transactions"), + Self::Java(java) => java.list_transactions().await, + } + } + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { match self { #[cfg(feature = "kafka-cpp-driver-tests")]