Skip to content

Commit

Permalink
Route ListTransactions request
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 3, 2024
1 parent 257297b commit 9a1c514
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 6 deletions.
12 changes: 12 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,17 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
}
}

async fn list_transactions(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
let _transaction_producer = connection_builder
.connect_producer_with_transactions("some_transaction_id".to_owned())
.await;

let actual_results = admin.list_transactions().await;
let expected_results = ["some_transaction_id".to_owned()];
assert_eq!(actual_results, expected_results);
}

async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
Expand All @@ -1485,6 +1496,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
list_groups(connection_builder).await;
list_transactions(connection_builder).await;
}
}

Expand Down
41 changes: 36 additions & 5 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use kafka_protocol::messages::{
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand All @@ -49,7 +50,7 @@ use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter,
ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter,
ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
Expand Down Expand Up @@ -1002,10 +1003,15 @@ impl KafkaSinkCluster {
..
})) => self.route_to_controller(request),

// route to all nodes
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListGroups(_),
..
})) => self.split_and_route_request::<ListGroupsSplitAndRouter>(request)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListTransactions(_),
..
})) => self.split_and_route_request::<ListTransactionsSplitAndRouter>(request)?,

// route to random broker
Some(Frame::Kafka(KafkaFrame::Request {
Expand Down Expand Up @@ -2018,6 +2024,10 @@ impl KafkaSinkCluster {
body: ResponseBody::ListGroups(base),
..
})) => Self::combine_list_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListTransactions(base),
..
})) => Self::combine_list_transactions(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
Expand Down Expand Up @@ -2257,6 +2267,27 @@ impl KafkaSinkCluster {
Ok(())
}

fn combine_list_transactions(
base_list_transactions: &mut ListTransactionsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListTransactions(next_list_transactions),
..
})) = next.frame()
{
base_list_transactions
.transaction_states
.extend(std::mem::take(
&mut next_list_transactions.transaction_states,
));
}
}

Ok(())
}

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = Message>,
Expand Down
31 changes: 30 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use kafka_protocol::messages::{
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest,
ListOffsetsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest,
ProduceRequest, TopicName,
};
use std::collections::HashMap;

Expand Down Expand Up @@ -194,6 +195,34 @@ impl RequestSplitAndRouter for ListGroupsSplitAndRouter {
}
}

pub struct ListTransactionsSplitAndRouter;

impl RequestSplitAndRouter for ListTransactionsSplitAndRouter {
type Request = ListTransactionsRequest;
type SubRequests = ();

fn split_by_destination(
transform: &mut KafkaSinkCluster,
_request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_request_by_routing_to_all_brokers()
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListTransactions(request),
..
})) => Some(request),
_ => None,
}
}

fn reassemble(_request: &mut Self::Request, _item: Self::SubRequests) {
// No need to reassemble, each ListTransactions is an exact clone of the original
}
}

pub struct OffsetFetchSplitAndRouter;

impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
Expand Down
19 changes: 19 additions & 0 deletions test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,25 @@ impl KafkaAdminJava {
results
}

pub async fn list_transactions(&self) -> Vec<String> {
let java_results = self
.admin
.call("listTransactions", vec![])
.call_async("all", vec![])
.await;

let mut results = vec![];
for java_group in java_results.call("iterator", vec![]).into_iter() {
results.push(
java_group
.cast("org.apache.kafka.clients.admin.TransactionListing")
.call("transactionalId", vec![])
.into_rust(),
)
}
results
}

pub async fn create_acls(&self, acls: Vec<Acl>) {
let resource_type = self
.jvm
Expand Down
8 changes: 8 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,14 @@ impl KafkaAdmin {
}
}

pub async fn list_transactions(&self) -> Vec<String> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_transactions"),
Self::Java(java) => java.list_transactions().await,
}
}

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Expand Down

0 comments on commit 9a1c514

Please sign in to comment.