Skip to content

Commit

Permalink
fix: dead letter policy for batched messages (#286)
Browse files Browse the repository at this point in the history
  • Loading branch information
rliang authored Sep 11, 2023
1 parent 2faad19 commit cdb1fdf
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 26 deletions.
48 changes: 22 additions & 26 deletions src/consumer/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,34 +484,30 @@ impl<Exe: Executor> ConsumerEngine<Exe> {
}
};

match payload.metadata.num_messages_in_batch {
Some(_) => {
let it = BatchedMessageIterator::new(message.message_id, payload)?;
for (id, payload) in it {
// TODO: Dead letter policy for batched messages
self.send_to_consumer(id, payload).await?;
}
}
None => match (message.redelivery_count, self.dead_letter_policy.as_ref()) {
(Some(redelivery_count), Some(dead_letter_policy)) => {
let payloads = if payload.metadata.num_messages_in_batch.is_some() {
BatchedMessageIterator::new(message.message_id, payload)?.collect()
} else {
vec![(message.message_id, payload)]
};
for (message_id, payload) in payloads {
match (message.redelivery_count, &self.dead_letter_policy) {
(Some(redelivery_count), Some(dead_letter_policy))
if redelivery_count as usize >= dead_letter_policy.max_redeliver_count =>
{
// Send message to Dead Letter Topic and ack message in original topic
if redelivery_count as usize >= dead_letter_policy.max_redeliver_count {
self.client
.send(&dead_letter_policy.dead_letter_topic, payload.data)
.await?
.await
.map_err(|e| {
error!("One shot cancelled {:?}", e);
Error::Custom("DLQ send error".to_string())
})?;

self.ack(message.message_id, false);
} else {
self.send_to_consumer(message.message_id, payload).await?
}
self.client
.send(&dead_letter_policy.dead_letter_topic, payload.data)
.await?
.await
.map_err(|e| {
error!("One shot cancelled {:?}", e);
Error::Custom("DLQ send error".to_string())
})?;

self.ack(message_id, false);
}
_ => self.send_to_consumer(message.message_id, payload).await?,
},
_ => self.send_to_consumer(message_id, payload).await?,
}
}
Ok(())
}
Expand Down
95 changes: 95 additions & 0 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,101 @@ mod tests {
dlq_consumer.ack(&dlq_msg).await.unwrap();
}

#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
async fn dead_letter_queue_batched() {
use crate::ProducerOptions;

let _result = log::set_logger(&TEST_LOGGER);
log::set_max_level(LevelFilter::Debug);
let addr = "pulsar://127.0.0.1:6650";

let test_id: u16 = rand::random();
let topic = format!("dead_letter_queue_batched_test_{test_id}");

let dead_letter_topic = format!("{topic}_dlq");

let dead_letter_policy = DeadLetterPolicy {
max_redeliver_count: 1,
dead_letter_topic: dead_letter_topic.clone(),
};

let client: Pulsar<_> = Pulsar::builder(addr, TokioExecutor).build().await.unwrap();

println!("creating consumer");
let mut consumer: Consumer<TestData, _> = client
.consumer()
.with_topic(topic.clone())
.with_subscription("nack")
.with_subscription_type(SubType::Shared)
.with_dead_letter_policy(dead_letter_policy)
.build()
.await
.unwrap();

println!("created consumer");

println!("creating second consumer that consumes from the DLQ");
let mut dlq_consumer: Consumer<TestData, _> = client
.clone()
.consumer()
.with_topic(dead_letter_topic)
.with_subscription("dead_letter_topic")
.with_subscription_type(SubType::Shared)
.build()
.await
.unwrap();

println!("created second consumer");

let mut producer = client
.producer()
.with_topic(&topic)
.with_options(ProducerOptions {
batch_size: Some(2),
..Default::default()
})
.build()
.await
.unwrap();

let messages = vec![
TestData {
topic: topic.clone(),
msg: rand::random(),
},
TestData {
topic: topic.clone(),
msg: rand::random(),
},
];
let receipts = producer.send_all(&messages).await.unwrap();
producer.send_batch().await.unwrap();
try_join_all(receipts).await.unwrap();
println!("producer sends done");

for message in messages {
let msg = consumer.next().await.unwrap().unwrap();
println!("got message: {:?}", msg.payload);
assert_eq!(
message,
msg.deserialize().unwrap(),
"we probably received a message from a previous run of the test"
);
// Nacking message to send it to DLQ
consumer.nack(&msg).await.unwrap();

let dlq_msg = dlq_consumer.next().await.unwrap().unwrap();
println!("got message: {:?}", dlq_msg.payload);
assert_eq!(
message,
dlq_msg.deserialize().unwrap(),
"we probably received a message from a previous run of the test"
);
dlq_consumer.ack(&dlq_msg).await.unwrap();
}
}

#[tokio::test]
#[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))]
async fn failover() {
Expand Down

0 comments on commit cdb1fdf

Please sign in to comment.