Skip to content

Commit

Permalink
low-level method to send messages in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Oct 24, 2024
1 parent f6835d5 commit ea4b91b
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
implementation "space.jasan:groovy-closure-support:$closureSupportVersion"
implementation 'io.micronaut.validation:micronaut-validation'
implementation 'io.micronaut:micronaut-jackson-databind'
implementation 'io.micronaut.reactor:micronaut-reactor'

testImplementation project(':micronaut-amazon-awssdk-integration-testing')
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package com.agorapulse.micronaut.amazon.awssdk.sqs;

import io.micronaut.core.util.StringUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.sqs.SqsClient;
Expand Down Expand Up @@ -278,6 +280,54 @@ public String sendMessage(String queueName, String messageBody, int delaySeconds
return messageId;
}

@Override
public Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies, int delaySeconds, String groupId) {
String queueUrl = getQueueUrl(queueName);

return Flux.from(messageBodies).map(messageBody -> {
SendMessageBatchRequestEntry.Builder request = SendMessageBatchRequestEntry
.builder()
.id(UUID.randomUUID().toString())
.messageBody(messageBody);

if (delaySeconds > 0) {
request.delaySeconds(delaySeconds);
}

if (StringUtils.isNotEmpty(groupId)) {
request.messageGroupId(groupId);
}

return request.build();
}).buffer(10).map(batch -> {
SendMessageBatchRequest.Builder request = SendMessageBatchRequest.builder().queueUrl(queueUrl).entries(batch);
SendMessageBatchResponse response = client.sendMessageBatch(request.build());
return response.successful().stream().map(SendMessageBatchResultEntry::messageId).toList();
}).flatMap(Flux::fromIterable);
}

@Override
public Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies, Consumer<SendMessageBatchRequestEntry.Builder> messageConfiguration) {
String queueUrl = getQueueUrl(queueName);

return Flux.from(messageBodies).map(messageBody -> {
SendMessageBatchRequestEntry.Builder request = SendMessageBatchRequestEntry.builder().messageBody(messageBody);
messageConfiguration.accept(request);
return request.build();
}).buffer(10).flatMap(batch -> Flux.<List<String>>generate(synchronousSink -> {
SendMessageBatchRequest.Builder request = SendMessageBatchRequest.builder().queueUrl(queueUrl).entries(batch);
SendMessageBatchResponse response = client.sendMessageBatch(request.build());
if (response.failed().isEmpty()) {
synchronousSink.next(response.successful().stream().map(SendMessageBatchResultEntry::messageId).toList());
synchronousSink.complete();
} else {
synchronousSink.error(new IllegalArgumentException("Following messages were not sent:\n" + response.failed().stream().map(e ->
String.format("Message %s failed with code %s and message %s%n", e.id(), e.code(), e.message())
).toList()));
}
})).flatMap(Flux::fromIterable);
}

/**
* @param queueName
* @param messageBody
Expand Down Expand Up @@ -319,9 +369,7 @@ private void addQueue(String queueUrl) {
throw new IllegalStateException("Queue URL cannot be null or empty");
}

synchronized (queueUrlByNames) {
queueUrlByNames.put(getQueueNameFromUrl(queueUrl), queueUrl);
}
queueUrlByNames.put(getQueueNameFromUrl(queueUrl), queueUrl);
}

private void loadQueues() {
Expand All @@ -340,20 +388,16 @@ private void loadQueues() {
)
);

synchronized (queueUrlByNames) {
queueUrlByNames.clear();
queueUrlByNames.putAll(queueUrls);
}
}

private void removeQueue(String queueUrl) {
if (StringUtils.isEmpty(queueUrl)) {
throw new IllegalStateException("Queue URL cannot be null or empty");
}

synchronized (queueUrlByNames) {
queueUrlByNames.remove(getQueueNameFromUrl(queueUrl));
}
queueUrlByNames.remove(getQueueNameFromUrl(queueUrl));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
*/
package com.agorapulse.micronaut.amazon.awssdk.sqs;

import org.reactivestreams.Publisher;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

import java.util.List;
Expand Down Expand Up @@ -288,4 +290,86 @@ default String sendMessage(String messageBody, Consumer<SendMessageRequest.Build
* @return message id
*/
String sendMessage(String queueName, String messageBody, Consumer<SendMessageRequest.Builder> messageConfiguration);

/**
* Send message immediately
* @param queueName the name of the queue
* @param messageBodies the message bodies to be sent
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies) {
return sendMessages(queueName, messageBodies, 0);
}

/**
* Send message with given delay.
* @param queueName the name of the queue
* @param messageBodies the message bodies to be sent
* @param delaySeconds the delay in seconds
* @param groupId group id for FIFO queues
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies, int delaySeconds, String groupId);

/**
* Send message with given delay.
* @param queueName the name of the queue
* @param messageBodies the message bodies to be sent
* @param delaySeconds the delay in seconds
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies, int delaySeconds) {
return sendMessages(queueName, messageBodies, delaySeconds, null);
}

/**
* Send message in default queue immediately
* @param messageBodies the message bodies to be sent
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessages(Publisher<String> messageBodies) {
return sendMessages(getDefaultQueueName(), messageBodies);
}

/**
* Send message in the default queue with given delay.
* @param messageBodies the message bodies to be sent
* @param delaySeconds the delay in seconds
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessages(Publisher<String> messageBodies, int delaySeconds) {
return sendMessages(getDefaultQueueName(), messageBodies, delaySeconds);
}

/**
* Send message with given delay.
* @param messageBodies the message bodies to be sent
* @param delaySeconds the delay in seconds
* @param groupId group id for FIFO queues
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessage(Publisher<String> messageBodies, int delaySeconds, String groupId) {
return sendMessages(getDefaultQueueName(), messageBodies, delaySeconds, groupId);
}

/**
* Sends message with additional configuration into the default queue.
* @param messageBodies the message bodies to be sent
* @param messageConfiguration additional configuration
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
default Publisher<String> sendMessages(Publisher<String> messageBodies, Consumer<SendMessageBatchRequestEntry.Builder> messageConfiguration) {
return sendMessages(getDefaultQueueName(), messageBodies, messageConfiguration);
}

/**
* Sends message with additional configuration into the given queue.
* @param queueName name of the queue
* @param messageBodies the message bodies to be sent
* @param messageConfiguration additional configuration
* @return the publisher of message ids that must be subscribed in order to send the messages
*/
Publisher<String> sendMessages(String queueName, Publisher<String> messageBodies, Consumer<SendMessageBatchRequestEntry.Builder> messageConfiguration);


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import groovy.lang.DelegatesTo;
import groovy.transform.stc.ClosureParams;
import groovy.transform.stc.FromString;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import space.jasan.support.groovy.closure.ConsumerWithDelegate;

Expand Down Expand Up @@ -69,4 +71,40 @@ public static String sendMessage(
return self.sendMessage(queueName, messageBody, ConsumerWithDelegate.create(messageConfiguration));
}

/**
* Sends message with additional configuration into the default queue.
*
* @param messageBodies message bodies
* @param messageConfiguration additional configuration
* @return message id
*/
public static Publisher<String> sendMessages(
SimpleQueueService self,
Publisher<String> messageBodies,
@DelegatesTo(value = SendMessageBatchRequestEntry.Builder.class, strategy = Closure.DELEGATE_FIRST)
@ClosureParams(value = FromString.class, options = "software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry.Builder")
Closure<?> messageConfiguration
) {
return self.sendMessages(self.getDefaultQueueName(), messageBodies, ConsumerWithDelegate.create(messageConfiguration));
}

/**
* Sends message with additional configuration into the given queue.
*
* @param queueName name of the queue
* @param messageBodies message bodies
* @param messageConfiguration additional configuration
* @return message id
*/
public static Publisher<String> sendMessages(
SimpleQueueService self,
String queueName,
Publisher<String> messageBodies,
@DelegatesTo(value = SendMessageRequest.Builder.class, strategy = Closure.DELEGATE_FIRST)
@ClosureParams(value = FromString.class, options = "software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry.Builder")
Closure<?> messageConfiguration
) {
return self.sendMessages(queueName, messageBodies, ConsumerWithDelegate.create(messageConfiguration));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.agorapulse.micronaut.amazon.awssdk.sqs

import io.micronaut.context.annotation.Property
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import reactor.core.publisher.Flux
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import spock.lang.Retry
Expand Down Expand Up @@ -74,6 +75,21 @@ class SimpleQueueServiceSpec extends Specification {
then:
!service.receiveMessages()

when:
List<String> messageIds = Flux.from(service.sendMessages(Flux.just(DATA).repeat(11))).collectList().block()
and:
List<Message> firstBatch = service.receiveMessages(10)
then:
messageIds.size() == 12
firstBatch.size() == 10

when:
firstBatch.forEach { m -> service.deleteMessage(m.receiptHandle()) }
and:
List<Message> secondBatch = service.receiveMessages(10)
then:
secondBatch.size() == 2

when:
service.deleteQueue(TEST_QUEUE)
then:
Expand Down

0 comments on commit ea4b91b

Please sign in to comment.