Commit fd59ddb

Route ListTransactions request

rukai committed Nov 5, 2024
1 parent f81fbdb commit fd59ddb
Showing 5 changed files with 153 additions and 23 deletions.
14 changes: 13 additions & 1 deletion shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1456,10 +1456,21 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder) {

let actual_results = admin.list_groups().await;
if !actual_results.contains(&"list_groups_test".to_owned()) {
panic!("Expected to find list_groups_test in {actual_results:?} but was misisng")
panic!("Expected to find \"list_groups_test\" in {actual_results:?} but was missing")
}
}

async fn list_transactions(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
let _transaction_producer = connection_builder
.connect_producer_with_transactions("some_transaction_id".to_owned())
.await;

let actual_results = admin.list_transactions().await;
let expected_results = ["some_transaction_id".to_owned()];
assert_eq!(actual_results, expected_results);
}

async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
@@ -1485,6 +1496,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
list_groups(connection_builder).await;
list_transactions(connection_builder).await;
}
}

104 changes: 83 additions & 21 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -30,10 +30,11 @@ use kafka_protocol::messages::{
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
- ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
- OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
- ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
- SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
+ ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
+ MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
+ OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
+ SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
+ TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
@@ -49,7 +50,7 @@ use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter,
- ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter,
+ ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
@@ -164,6 +165,7 @@ struct KafkaSinkClusterBuilder {
first_contact_points: Vec<KafkaAddress>,
shotover_nodes: Vec<ShotoverNode>,
rack: StrBytes,
broker_id: BrokerId,
connect_timeout: Duration,
read_timeout: Option<Duration>,
controller_broker: Arc<AtomicBrokerId>,
@@ -214,6 +216,7 @@ impl KafkaSinkClusterBuilder {
.map(|x| x.get_builder(connect_timeout, read_timeout))
.transpose()?,
shotover_nodes,
broker_id: BrokerId(local_shotover_broker_id),
rack,
connect_timeout,
read_timeout,
@@ -236,6 +239,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
first_contact_points: self.first_contact_points.clone(),
shotover_nodes: self.shotover_nodes.clone(),
rack: self.rack.clone(),
broker_id: self.broker_id,
nodes: vec![],
nodes_shared: self.nodes_shared.clone(),
controller_broker: self.controller_broker.clone(),
@@ -300,6 +304,7 @@ pub(crate) struct KafkaSinkCluster {
first_contact_points: Vec<KafkaAddress>,
shotover_nodes: Vec<ShotoverNode>,
rack: StrBytes,
broker_id: BrokerId,
nodes: Vec<KafkaNode>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
controller_broker: Arc<AtomicBrokerId>,
@@ -1025,10 +1030,15 @@ The connection to the client has been closed."
..
})) => self.route_to_controller(request),

// route to all nodes
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListGroups(_),
..
})) => self.split_and_route_request::<ListGroupsSplitAndRouter>(request)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListTransactions(_),
..
})) => self.split_and_route_request::<ListTransactionsSplitAndRouter>(request)?,

// route to random broker
Some(Frame::Kafka(KafkaFrame::Request {
@@ -1444,24 +1454,50 @@
result
}

+ /// Route broadcasted requests to all brokers split across all shotover nodes.
+ /// That is, each shotover node in a rack will deterministically be assigned a portion of the rack to route the request to.
+ /// If a shotover node is the only node in its rack it will route to all kafka brokers in the rack.
+ /// When combined with a client that is routing this request to all shotover nodes, the request will reach each kafka broker exactly once.
+ ///
+ /// The logic in this function relies on `self.shotover_nodes` and `self.nodes` being sorted by broker id.
fn split_request_by_routing_to_all_brokers(&mut self) -> HashMap<BrokerId, ()> {
- let mut result: HashMap<BrokerId, ()> = Default::default();
-
- for broker in self.nodes.iter().filter(|node| {
- node.is_up()
- && node
- .rack
- .as_ref()
- .map(|rack| rack == &self.rack)
- // If the cluster is not using racks, include all brokers in the list.
- // This ensure we get full coverage of the cluster.
- // The client driver can filter out the resulting duplicates.
- .unwrap_or(true)
- }) {
- result.insert(broker.broker_id, ());
- }
+ // Will always be at least 1, since the shotover this code is running in will always be included.
+ let shotovers_in_rack = self
+ .shotover_nodes
+ .iter()
+ .filter(|node| node.is_up() && node.rack == self.rack)
+ .count();

- result
+ // The offset of this shotover node within the rack
+ // will be 0 <= local_shotover_index_in_rack < shotovers_in_rack
+ // The purpose is to create a deterministic way of splitting up broadcasted requests across shotover nodes.
+ let local_shotover_index_in_rack = self
+ .shotover_nodes
+ .iter()
+ .filter(|node| node.is_up() && node.rack == self.rack)
+ // start enumerating AFTER filtering down to shotover nodes in the rack
+ .enumerate()
+ .find(|node| node.1.broker_id == self.broker_id)
+ .unwrap()
+ .0;
+
+ self.nodes
+ .iter()
+ .filter(|node| {
+ node.is_up()
+ && node
+ .rack
+ .as_ref()
+ .map(|rack| rack == &self.rack)
+ // If the cluster is not using racks, include all brokers in the list.
+ .unwrap_or(true)
+ })
+ // start enumerating AFTER filtering down to brokers in the rack
+ .enumerate()
+ // assign each shotover node in the rack a subset of the brokers in the rack
+ .filter(|(i, _)| i % shotovers_in_rack == local_shotover_index_in_rack)
+ .map(|(_, broker)| (broker.broker_id, ()))
+ .collect()
}
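
The routing rule above is a modulo partition over the rack's sorted broker list: the broker at index i is handled by the shotover node whose offset within the rack is i % shotovers_in_rack. A minimal standalone sketch of that rule, using plain broker ids rather than Shotover's node types (illustrative only, not the transform's actual code):

    fn brokers_for_local_shotover(
        rack_broker_ids: &[i32],    // kafka brokers in this rack, sorted by broker id
        shotovers_in_rack: usize,   // shotover nodes in this rack, always >= 1
        local_index_in_rack: usize, // this shotover node's offset within the rack
    ) -> Vec<i32> {
        rack_broker_ids
            .iter()
            .enumerate()
            // Each shotover node takes every `shotovers_in_rack`-th broker,
            // so together the nodes cover each broker exactly once.
            .filter(|(i, _)| i % shotovers_in_rack == local_index_in_rack)
            .map(|(_, id)| *id)
            .collect()
    }

    fn main() {
        let brokers = [101, 102, 103, 104, 105];
        // Two shotover nodes in the rack split the brokers between them:
        assert_eq!(brokers_for_local_shotover(&brokers, 2, 0), [101, 103, 105]);
        assert_eq!(brokers_for_local_shotover(&brokers, 2, 1), [102, 104]);
        // A lone shotover node routes to every broker in its rack:
        assert_eq!(brokers_for_local_shotover(&brokers, 1, 0), brokers);
    }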

/// This method removes all groups from the OffsetFetch request and returns them split up by their destination.
@@ -2041,6 +2077,10 @@ The connection to the client has been closed."
Expand Down Expand Up @@ -2041,6 +2077,10 @@ The connection to the client has been closed."
body: ResponseBody::ListGroups(base),
..
})) => Self::combine_list_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListTransactions(base),
..
})) => Self::combine_list_transactions(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
Expand Down Expand Up @@ -2280,6 +2320,27 @@ The connection to the client has been closed."
Ok(())
}

fn combine_list_transactions(
base_list_transactions: &mut ListTransactionsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListTransactions(next_list_transactions),
..
})) = next.frame()
{
base_list_transactions
.transaction_states
.extend(std::mem::take(
&mut next_list_transactions.transaction_states,
));
}
}

Ok(())
}
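
The combine step for ListTransactions is plain concatenation: each sub-response's `transaction_states` are moved into the first response. A small sketch of the same move on a simplified stand-in type (hypothetical; the real `ListTransactionsResponse` comes from `kafka_protocol` and is only reachable through a `&mut` frame, which is why `mem::take` is used instead of a plain move):

    struct ListTxnsResponse {
        transaction_states: Vec<String>, // stand-in for kafka_protocol's transaction state entries
    }

    fn combine(base: &mut ListTxnsResponse, rest: impl Iterator<Item = ListTxnsResponse>) {
        for mut next in rest {
            // std::mem::take moves the states out, leaving an empty Vec behind,
            // mirroring how the real code drains each decoded response frame.
            base.transaction_states
                .extend(std::mem::take(&mut next.transaction_states));
        }
    }

    fn main() {
        let mut base = ListTxnsResponse { transaction_states: vec!["txn-a".into()] };
        let rest = vec![
            ListTxnsResponse { transaction_states: vec!["txn-b".into()] },
            ListTxnsResponse { transaction_states: vec![] }, // a broker with no transactions
        ];
        combine(&mut base, rest.into_iter());
        assert_eq!(base.transaction_states, ["txn-a", "txn-b"]);
    }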

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = Message>,
@@ -3205,6 +3266,7 @@ The connection to the client has been closed."
.all(|node| node.broker_id != new_node.broker_id);
if missing_from_shared {
nodes_shared.push(new_node);
nodes_shared.sort_by_key(|node| node.broker_id);
}
}

31 changes: 30 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -11,7 +11,8 @@ use kafka_protocol::messages::{
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest,
- ListOffsetsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
+ ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest,
+ ProduceRequest, TopicName,
};
use std::collections::HashMap;

@@ -194,6 +195,34 @@ impl RequestSplitAndRouter for ListGroupsSplitAndRouter {
}
}

pub struct ListTransactionsSplitAndRouter;

impl RequestSplitAndRouter for ListTransactionsSplitAndRouter {
type Request = ListTransactionsRequest;
type SubRequests = ();

fn split_by_destination(
transform: &mut KafkaSinkCluster,
_request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_request_by_routing_to_all_brokers()
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListTransactions(request),
..
})) => Some(request),
_ => None,
}
}

fn reassemble(_request: &mut Self::Request, _item: Self::SubRequests) {
// No need to reassemble, each ListTransactions is an exact clone of the original
}
}
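
`SubRequests = ()` encodes that ListTransactions is a pure broadcast: `split_by_destination` only picks destinations, each destination receives an unmodified clone of the request, and `reassemble` has nothing to restore. A rough sketch of that fan-out, with stand-in types and a hypothetical helper name (the real plumbing lives in the transform's split-and-route path):

    use std::collections::HashMap;

    type BrokerId = i32; // stand-in for kafka_protocol's BrokerId

    #[derive(Clone, Debug, PartialEq)]
    struct Request(String); // stand-in for a ListTransactionsRequest

    // For broadcast requests, "splitting" only picks destinations; the body is untouched.
    fn fan_out(request: &Request, destinations: HashMap<BrokerId, ()>) -> Vec<(BrokerId, Request)> {
        destinations
            .into_keys()
            .map(|broker| (broker, request.clone())) // exact clone per broker, nothing to reassemble
            .collect()
    }

    fn main() {
        let destinations: HashMap<BrokerId, ()> = [(1, ()), (3, ())].into();
        let copies = fan_out(&Request("ListTransactions".into()), destinations);
        assert_eq!(copies.len(), 2);
        assert!(copies.iter().all(|(_, req)| *req == Request("ListTransactions".into())));
    }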

pub struct OffsetFetchSplitAndRouter;

impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
19 changes: 19 additions & 0 deletions test-helpers/src/connection/kafka/java.rs
@@ -684,6 +684,25 @@ impl KafkaAdminJava {
results
}

pub async fn list_transactions(&self) -> Vec<String> {
let java_results = self
.admin
.call("listTransactions", vec![])
.call_async("all", vec![])
.await;

let mut results = vec![];
for java_group in java_results.call("iterator", vec![]).into_iter() {
results.push(
java_group
.cast("org.apache.kafka.clients.admin.TransactionListing")
.call("transactionalId", vec![])
.into_rust(),
)
}
results
}

pub async fn create_acls(&self, acls: Vec<Acl>) {
let resource_type = self
.jvm
8 changes: 8 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
@@ -461,6 +461,14 @@ impl KafkaAdmin {
}
}

pub async fn list_transactions(&self) -> Vec<String> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_transactions"),
Self::Java(java) => java.list_transactions().await,
}
}

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
