From ea4b91bb7673c20fbd40a4bf50f2834ff4993801 Mon Sep 17 00:00:00 2001 From: musketyr Date: Thu, 24 Oct 2024 10:37:26 +0200 Subject: [PATCH] low-level method to send messages in batch --- .../micronaut-amazon-awssdk-sqs.gradle | 1 + .../awssdk/sqs/DefaultSimpleQueueService.java | 60 +++++++++++-- .../amazon/awssdk/sqs/SimpleQueueService.java | 84 +++++++++++++++++++ .../groovy/SimpleQueueServiceExtensions.java | 38 +++++++++ .../awssdk/sqs/SimpleQueueServiceSpec.groovy | 16 ++++ 5 files changed, 191 insertions(+), 8 deletions(-) diff --git a/subprojects/micronaut-amazon-awssdk-sqs/micronaut-amazon-awssdk-sqs.gradle b/subprojects/micronaut-amazon-awssdk-sqs/micronaut-amazon-awssdk-sqs.gradle index ac201b5e8..c0c0c7577 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/micronaut-amazon-awssdk-sqs.gradle +++ b/subprojects/micronaut-amazon-awssdk-sqs/micronaut-amazon-awssdk-sqs.gradle @@ -23,6 +23,7 @@ dependencies { implementation "space.jasan:groovy-closure-support:$closureSupportVersion" implementation 'io.micronaut.validation:micronaut-validation' implementation 'io.micronaut:micronaut-jackson-databind' + implementation 'io.micronaut.reactor:micronaut-reactor' testImplementation project(':micronaut-amazon-awssdk-integration-testing') } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultSimpleQueueService.java b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultSimpleQueueService.java index b51d70633..ecd1b4756 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultSimpleQueueService.java +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultSimpleQueueService.java @@ -18,8 +18,10 @@ package com.agorapulse.micronaut.amazon.awssdk.sqs; import io.micronaut.core.util.StringUtils; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.sqs.SqsClient; @@ -278,6 +280,54 @@ public String sendMessage(String queueName, String messageBody, int delaySeconds return messageId; } + @Override + public Publisher sendMessages(String queueName, Publisher messageBodies, int delaySeconds, String groupId) { + String queueUrl = getQueueUrl(queueName); + + return Flux.from(messageBodies).map(messageBody -> { + SendMessageBatchRequestEntry.Builder request = SendMessageBatchRequestEntry + .builder() + .id(UUID.randomUUID().toString()) + .messageBody(messageBody); + + if (delaySeconds > 0) { + request.delaySeconds(delaySeconds); + } + + if (StringUtils.isNotEmpty(groupId)) { + request.messageGroupId(groupId); + } + + return request.build(); + }).buffer(10).map(batch -> { + SendMessageBatchRequest.Builder request = SendMessageBatchRequest.builder().queueUrl(queueUrl).entries(batch); + SendMessageBatchResponse response = client.sendMessageBatch(request.build()); + return response.successful().stream().map(SendMessageBatchResultEntry::messageId).toList(); + }).flatMap(Flux::fromIterable); + } + + @Override + public Publisher sendMessages(String queueName, Publisher messageBodies, Consumer messageConfiguration) { + String queueUrl = getQueueUrl(queueName); + + return Flux.from(messageBodies).map(messageBody -> { + SendMessageBatchRequestEntry.Builder request = SendMessageBatchRequestEntry.builder().messageBody(messageBody); + messageConfiguration.accept(request); + return request.build(); + }).buffer(10).flatMap(batch -> Flux.>generate(synchronousSink -> { + SendMessageBatchRequest.Builder request = SendMessageBatchRequest.builder().queueUrl(queueUrl).entries(batch); + SendMessageBatchResponse response = client.sendMessageBatch(request.build()); + if (response.failed().isEmpty()) { + synchronousSink.next(response.successful().stream().map(SendMessageBatchResultEntry::messageId).toList()); + synchronousSink.complete(); + } else { + synchronousSink.error(new IllegalArgumentException("Following messages were not sent:\n" + response.failed().stream().map(e -> + String.format("Message %s failed with code %s and message %s%n", e.id(), e.code(), e.message()) + ).toList())); + } + })).flatMap(Flux::fromIterable); + } + /** * @param queueName * @param messageBody @@ -319,9 +369,7 @@ private void addQueue(String queueUrl) { throw new IllegalStateException("Queue URL cannot be null or empty"); } - synchronized (queueUrlByNames) { - queueUrlByNames.put(getQueueNameFromUrl(queueUrl), queueUrl); - } + queueUrlByNames.put(getQueueNameFromUrl(queueUrl), queueUrl); } private void loadQueues() { @@ -340,10 +388,8 @@ private void loadQueues() { ) ); - synchronized (queueUrlByNames) { queueUrlByNames.clear(); queueUrlByNames.putAll(queueUrls); - } } private void removeQueue(String queueUrl) { @@ -351,9 +397,7 @@ private void removeQueue(String queueUrl) { throw new IllegalStateException("Queue URL cannot be null or empty"); } - synchronized (queueUrlByNames) { - queueUrlByNames.remove(getQueueNameFromUrl(queueUrl)); - } + queueUrlByNames.remove(getQueueNameFromUrl(queueUrl)); } } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueService.java b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueService.java index 368413a0d..72068cbf9 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueService.java +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueService.java @@ -17,8 +17,10 @@ */ package com.agorapulse.micronaut.amazon.awssdk.sqs; +import org.reactivestreams.Publisher; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import java.util.List; @@ -288,4 +290,86 @@ default String sendMessage(String messageBody, Consumer messageConfiguration); + + /** + * Send message immediately + * @param queueName the name of the queue + * @param messageBodies the message bodies to be sent + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessages(String queueName, Publisher messageBodies) { + return sendMessages(queueName, messageBodies, 0); + } + + /** + * Send message with given delay. + * @param queueName the name of the queue + * @param messageBodies the message bodies to be sent + * @param delaySeconds the delay in seconds + * @param groupId group id for FIFO queues + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + Publisher sendMessages(String queueName, Publisher messageBodies, int delaySeconds, String groupId); + + /** + * Send message with given delay. + * @param queueName the name of the queue + * @param messageBodies the message bodies to be sent + * @param delaySeconds the delay in seconds + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessages(String queueName, Publisher messageBodies, int delaySeconds) { + return sendMessages(queueName, messageBodies, delaySeconds, null); + } + + /** + * Send message in default queue immediately + * @param messageBodies the message bodies to be sent + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessages(Publisher messageBodies) { + return sendMessages(getDefaultQueueName(), messageBodies); + } + + /** + * Send message in the default queue with given delay. + * @param messageBodies the message bodies to be sent + * @param delaySeconds the delay in seconds + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessages(Publisher messageBodies, int delaySeconds) { + return sendMessages(getDefaultQueueName(), messageBodies, delaySeconds); + } + + /** + * Send message with given delay. + * @param messageBodies the message bodies to be sent + * @param delaySeconds the delay in seconds + * @param groupId group id for FIFO queues + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessage(Publisher messageBodies, int delaySeconds, String groupId) { + return sendMessages(getDefaultQueueName(), messageBodies, delaySeconds, groupId); + } + + /** + * Sends message with additional configuration into the default queue. + * @param messageBodies the message bodies to be sent + * @param messageConfiguration additional configuration + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + default Publisher sendMessages(Publisher messageBodies, Consumer messageConfiguration) { + return sendMessages(getDefaultQueueName(), messageBodies, messageConfiguration); + } + + /** + * Sends message with additional configuration into the given queue. + * @param queueName name of the queue + * @param messageBodies the message bodies to be sent + * @param messageConfiguration additional configuration + * @return the publisher of message ids that must be subscribed in order to send the messages + */ + Publisher sendMessages(String queueName, Publisher messageBodies, Consumer messageConfiguration); + + } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/groovy/SimpleQueueServiceExtensions.java b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/groovy/SimpleQueueServiceExtensions.java index 4164e9f24..87cf34b73 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/groovy/SimpleQueueServiceExtensions.java +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/groovy/SimpleQueueServiceExtensions.java @@ -23,6 +23,8 @@ import groovy.lang.DelegatesTo; import groovy.transform.stc.ClosureParams; import groovy.transform.stc.FromString; +import org.reactivestreams.Publisher; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import space.jasan.support.groovy.closure.ConsumerWithDelegate; @@ -69,4 +71,40 @@ public static String sendMessage( return self.sendMessage(queueName, messageBody, ConsumerWithDelegate.create(messageConfiguration)); } + /** + * Sends message with additional configuration into the default queue. + * + * @param messageBodies message bodies + * @param messageConfiguration additional configuration + * @return message id + */ + public static Publisher sendMessages( + SimpleQueueService self, + Publisher messageBodies, + @DelegatesTo(value = SendMessageBatchRequestEntry.Builder.class, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = FromString.class, options = "software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry.Builder") + Closure messageConfiguration + ) { + return self.sendMessages(self.getDefaultQueueName(), messageBodies, ConsumerWithDelegate.create(messageConfiguration)); + } + + /** + * Sends message with additional configuration into the given queue. + * + * @param queueName name of the queue + * @param messageBodies message bodies + * @param messageConfiguration additional configuration + * @return message id + */ + public static Publisher sendMessages( + SimpleQueueService self, + String queueName, + Publisher messageBodies, + @DelegatesTo(value = SendMessageRequest.Builder.class, strategy = Closure.DELEGATE_FIRST) + @ClosureParams(value = FromString.class, options = "software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry.Builder") + Closure messageConfiguration + ) { + return self.sendMessages(queueName, messageBodies, ConsumerWithDelegate.create(messageConfiguration)); + } + } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueServiceSpec.groovy b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueServiceSpec.groovy index 984a36ddc..8aacfda20 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueServiceSpec.groovy +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/SimpleQueueServiceSpec.groovy @@ -19,6 +19,7 @@ package com.agorapulse.micronaut.amazon.awssdk.sqs import io.micronaut.context.annotation.Property import io.micronaut.test.extensions.spock.annotation.MicronautTest +import reactor.core.publisher.Flux import software.amazon.awssdk.services.sqs.model.Message import software.amazon.awssdk.services.sqs.model.QueueAttributeName import spock.lang.Retry @@ -74,6 +75,21 @@ class SimpleQueueServiceSpec extends Specification { then: !service.receiveMessages() + when: + List messageIds = Flux.from(service.sendMessages(Flux.just(DATA).repeat(11))).collectList().block() + and: + List firstBatch = service.receiveMessages(10) + then: + messageIds.size() == 12 + firstBatch.size() == 10 + + when: + firstBatch.forEach { m -> service.deleteMessage(m.receiptHandle()) } + and: + List secondBatch = service.receiveMessages(10) + then: + secondBatch.size() == 2 + when: service.deleteQueue(TEST_QUEUE) then: