From 3c31990f8b24b5e09533724a3b701b376dd489df Mon Sep 17 00:00:00 2001 From: chengxy Date: Tue, 14 Feb 2023 10:35:57 +0800 Subject: [PATCH 1/6] support batch message --- .../ExtProducerResetConfiguration.java | 8 + .../ListenerContainerConfiguration.java | 7 + .../spring/core/RocketMQBatchListener.java | 7 + .../DefaultRocketMQListenerContainer.java | 112 ++++++++++-- .../DefaultRocketMQListenerContainerTest.java | 173 ++++++++++++++++++ 5 files changed, 296 insertions(+), 11 deletions(-) create mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQBatchListener.java diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java index 35aee2a2..bdf2b182 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java @@ -17,10 +17,16 @@ package org.apache.rocketmq.spring.autoconfigure; +import java.lang.reflect.Field; import java.util.Map; import java.util.stream.Collectors; + +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl; import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; @@ -114,6 +120,8 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota boolean isEnableMsgTrace = annotation.enableMsgTrace(); String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic()); customizedTraceTopic = StringUtils.hasLength(customizedTraceTopic) ? customizedTraceTopic : producerConfig.getCustomizedTraceTopic(); + + //if String is not is equal "true" TLS mode will represent the as default value false boolean useTLS = new Boolean(environment.resolvePlaceholders(annotation.tlsEnable())); diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java index a4a762a9..6c6d3e34 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java @@ -23,6 +23,7 @@ import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQBatchListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; @@ -132,6 +133,12 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String container.setSelectorExpression(tags); } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); + if (bean instanceof RocketMQBatchListener) { + container.setRocketMQBatchListener((RocketMQBatchListener)bean); + } + else { + container.setRocketMQListener((RocketMQListener)bean); + } container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable())); if (RocketMQListener.class.isAssignableFrom(bean.getClass())) { container.setRocketMQListener((RocketMQListener) bean); diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQBatchListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQBatchListener.java new file mode 100644 index 00000000..bab45ab3 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQBatchListener.java @@ -0,0 +1,7 @@ +package org.apache.rocketmq.spring.core; + +import java.util.List; + +public interface RocketMQBatchListener { + void onMessages(List message); +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index 30e71356..5b630eb6 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -23,6 +23,9 @@ import java.nio.charset.Charset; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.ObjectUtils; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; @@ -46,6 +49,7 @@ import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQBatchListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; @@ -115,6 +119,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, private RocketMQReplyListener rocketMQReplyListener; + private RocketMQBatchListener rocketMQBatchListener; + private RocketMQMessageListener rocketMQMessageListener; private DefaultMQPushConsumer consumer; @@ -217,9 +223,22 @@ public RocketMQListener getRocketMQListener() { } public void setRocketMQListener(RocketMQListener rocketMQListener) { + this.rocketMQBatchListener = null; this.rocketMQListener = rocketMQListener; } + public RocketMQBatchListener getRocketMQBatchListener() { + return rocketMQBatchListener; + } + + public void setRocketMQBatchListener(RocketMQBatchListener rocketMQBatchListener) { + this.rocketMQBatchListener = rocketMQBatchListener; + } + + private Object getCandidateRocketMQListener() { + return ObjectUtils.defaultIfNull(rocketMQBatchListener, rocketMQListener); + } + public RocketMQReplyListener getRocketMQReplyListener() { return rocketMQReplyListener; } @@ -455,6 +474,56 @@ public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderly } } + private class BatchMessageListenerConcurrently implements MessageListenerConcurrently { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + try { + batchConsumeMessages(msgs); + } catch (Exception e) { + log.warn("consume message failed. messageExt:{}", msgs, e); + context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + } + + private class BatchMessageListenerOrderly implements MessageListenerOrderly { + + @Override + public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { + try { + batchConsumeMessages(msgs); + } + catch (Exception e) { + log.warn("consume message failed. messageExt:{}", msgs, e); + context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + + return ConsumeOrderlyStatus.SUCCESS; + } + } + + @SuppressWarnings("unchecked") + private boolean batchConsumeMessages(List msgs) { + log.debug("received msgs, size: {}", msgs.size()); + long now = System.currentTimeMillis(); + try { + List messages = msgs.stream().map(DefaultRocketMQListenerContainer.this::doConvertMessage).collect(Collectors.toList()); + rocketMQBatchListener.onMessages(messages); + } + finally { + if (log.isDebugEnabled()) { + long costTime = System.currentTimeMillis() - now; + log.debug("batch consume {} cost: {} ms", msgs.stream().map(MessageExt::getMsgId).collect(Collectors.toList()), costTime); + } + } + return true; + } + private void handleMessage( MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException { if (rocketMQListener != null) { @@ -648,6 +717,31 @@ private void initRocketMQPushConsumer() throws MQClientException { consumer.setMaxReconsumeTimes(maxReconsumeTimes); consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown); consumer.setInstanceName(instanceName); + + initMessageModel(); + + initSelectorType(); + + initConsumeMode(); + + Object candidateRocketMQListener = getCandidateRocketMQListener(); + if (candidateRocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { + ((RocketMQPushConsumerLifecycleListener)candidateRocketMQListener).prepareStart(consumer); + } + + + //if String is not is equal "true" TLS mode will represent the as default value false + consumer.setUseTLS(new Boolean(tlsEnable)); + + if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { + ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); + } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) { + ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer); + } + + } + + private void initMessageModel() { switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); @@ -658,7 +752,9 @@ private void initRocketMQPushConsumer() throws MQClientException { default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } + } + private void initSelectorType() throws MQClientException{ switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); @@ -669,27 +765,21 @@ private void initRocketMQPushConsumer() throws MQClientException { default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } + } + private void initConsumeMode() { switch (consumeMode) { case ORDERLY: - consumer.setMessageListener(new DefaultMessageListenerOrderly()); + consumer.setMessageListener((rocketMQBatchListener != null) ? (new BatchMessageListenerOrderly()) : (new DefaultMessageListenerOrderly())); break; case CONCURRENTLY: - consumer.setMessageListener(new DefaultMessageListenerConcurrently()); + consumer.setMessageListener((rocketMQBatchListener != null) ? (new BatchMessageListenerConcurrently()) : (new DefaultMessageListenerConcurrently())); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } + } - //if String is not is equal "true" TLS mode will represent the as default value false - consumer.setUseTLS(new Boolean(tlsEnable)); - - if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { - ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); - } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) { - ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer); - } - } } diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java index f7cd9cfb..8935b62e 100644 --- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java @@ -16,11 +16,18 @@ */ package org.apache.rocketmq.spring.support; +import java.io.UnsupportedEncodingException; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.MessageListener; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -29,10 +36,12 @@ import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQBatchListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQReplyListener; import org.junit.Test; @@ -44,6 +53,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Date; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; @@ -53,6 +68,10 @@ import static org.mockito.Mockito.when; public class DefaultRocketMQListenerContainerTest { + + private final String notExceptedString = "not excepted test string msg type"; + private final String exceptedString = "test string msg type"; + @Test public void testGetMessageType() throws Exception { DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer(); @@ -256,6 +275,160 @@ public void testSetRocketMQMessageListener() { assertEquals(anno.instanceName(), container.getInstanceName()); } + @Test + public void testSelectorType() throws Exception { + DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer(); + listenerContainer.setConsumer(new DefaultMQPushConsumer()); + Method initSelectorType = DefaultRocketMQListenerContainer.class.getDeclaredMethod("initSelectorType"); + initSelectorType.setAccessible(true); + + try { + listenerContainer.setRocketMQMessageListener(TagClass.class.getAnnotation(RocketMQMessageListener.class)); + initSelectorType.invoke(listenerContainer); + + listenerContainer.setRocketMQMessageListener(SQL92Class.class.getAnnotation(RocketMQMessageListener.class)); + initSelectorType.invoke(listenerContainer); + } catch (Exception e) { + + } + } + + @Test + public void testBatchGetMessages() throws Exception { + DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer(); + listenerContainer.setConsumer(new DefaultMQPushConsumer()); + Field messageType = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType"); + messageType.setAccessible(true); + + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "TestScheduledThread"); + } + }); + Runnable r = () -> { + try { + MessageExt msg = new MessageExt(); + msg.setMsgId("X_SVEN_AUGUSTUS_0001"); + msg.setBody((exceptedString + " 1").getBytes(listenerContainer.getCharset())); + MessageExt msg2 = new MessageExt(); + msg2.setMsgId("X_SVEN_AUGUSTUS_0002"); + msg2.setBody((exceptedString + " 2").getBytes(listenerContainer.getCharset())); + List messages = Arrays.asList(msg, msg2); + + MessageListener l = listenerContainer.getConsumer().getMessageListener(); + if (l instanceof MessageListenerConcurrently) { + ((MessageListenerConcurrently)l).consumeMessage(messages, new ConsumeConcurrentlyContext(new MessageQueue())); + } + if (l instanceof MessageListenerOrderly) { + ((MessageListenerOrderly)l).consumeMessage(messages, new ConsumeOrderlyContext(new MessageQueue())); + } + } + catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + }; + messageType.set(listenerContainer, String.class); + + // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for excepted + tryRocketMQBatchListener(listenerContainer, ConcurrentlyClass.class, scheduledExecutorService, r, exceptedString, true);// excepted + + // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for not excepted + tryRocketMQBatchListener(listenerContainer, ConcurrentlyClass.class, scheduledExecutorService, r, notExceptedString, false);// not excepted + + // RocketMQBatchListener IN ConsumeMode.ORDERLY, AND test for excepted + tryRocketMQBatchListener(listenerContainer, OrderlyClass.class, scheduledExecutorService, r, exceptedString, true);// excepted + + // RocketMQBatchListener IN ConsumeMode.ORDERLY, AND test for not excepted + tryRocketMQBatchListener(listenerContainer, OrderlyClass.class, scheduledExecutorService, r, notExceptedString, false);// not excepted + + // RocketMQListener IN ConsumeMode.CONCURRENTLY, AND test for excepted + tryRocketMQListener(listenerContainer, ConcurrentlyClass.class, scheduledExecutorService, r, exceptedString, true);// excepted + + // RocketMQListener IN ConsumeMode.CONCURRENTLY, AND test for not excepted + tryRocketMQListener(listenerContainer, ConcurrentlyClass.class, scheduledExecutorService, r, notExceptedString, false);// not excepted + + // RocketMQListener IN ConsumeMode.ORDERLY, AND test for excepted + tryRocketMQListener(listenerContainer, OrderlyClass.class, scheduledExecutorService, r, exceptedString, true);// excepted + + // RocketMQListener IN ConsumeMode.ORDERLY, AND test for not excepted + tryRocketMQListener(listenerContainer, OrderlyClass.class, scheduledExecutorService, r, notExceptedString, false);// not excepted + } + + private void tryRocketMQBatchListener(DefaultRocketMQListenerContainer listenerContainer, + final Class rocketMQMessageListenerClass, + ScheduledExecutorService scheduledExecutorService, Runnable r, final String exceptedValue, + boolean exceptedTrueOrFalse) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, InterruptedException { + Method initConsumeMode = DefaultRocketMQListenerContainer.class.getDeclaredMethod("initConsumeMode"); + initConsumeMode.setAccessible(true); + + final Boolean[] result = new Boolean[] {Boolean.FALSE}; + + // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for excepted + final CountDownLatch countDownLatch = new CountDownLatch(1); + listenerContainer.setRocketMQBatchListener(new RocketMQBatchListener() { + @Override + public void onMessages(List messages) { + result[0] = messages.stream().anyMatch(m -> m.startsWith(exceptedValue)); + countDownLatch.countDown(); + } + }); + listenerContainer.setRocketMQMessageListener(rocketMQMessageListenerClass.getAnnotation(RocketMQMessageListener.class)); + initConsumeMode.invoke(listenerContainer); + scheduledExecutorService.schedule(r, 100, TimeUnit.MILLISECONDS); + countDownLatch.await(1000, TimeUnit.MILLISECONDS); + if (exceptedTrueOrFalse) + assertThat(result[0]).isTrue(); // excepted + else + assertThat(result[0]).isFalse(); // not excepted + } + + private void tryRocketMQListener(DefaultRocketMQListenerContainer listenerContainer, + final Class rocketMQMessageListenerClass, + ScheduledExecutorService scheduledExecutorService, Runnable r, final String exceptedValue, + boolean exceptedTrueOrFalse) throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, InterruptedException { + Method initConsumeMode = DefaultRocketMQListenerContainer.class.getDeclaredMethod("initConsumeMode"); + initConsumeMode.setAccessible(true); + + final Boolean[] result = new Boolean[] {Boolean.FALSE}; + + // RocketMQBatchListener IN ConsumeMode.CONCURRENTLY, AND test for excepted + final CountDownLatch countDownLatch = new CountDownLatch(1); + listenerContainer.setRocketMQListener(new RocketMQListener() { + @Override + public void onMessage(String message) { + result[0] = message.startsWith(exceptedValue); + countDownLatch.countDown(); + } + }); + listenerContainer.setRocketMQMessageListener(rocketMQMessageListenerClass.getAnnotation(RocketMQMessageListener.class)); + initConsumeMode.invoke(listenerContainer); + scheduledExecutorService.schedule(r, 100, TimeUnit.MILLISECONDS); + countDownLatch.await(1000, TimeUnit.MILLISECONDS); + if (exceptedTrueOrFalse) + assertThat(result[0]).isTrue(); // excepted + else + assertThat(result[0]).isFalse(); // not excepted + } + + @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test", selectorExpression = "*", selectorType = SelectorType.TAG) + static class TagClass { + } + + @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test", selectorType = SelectorType.SQL92) + static class SQL92Class { + } + + @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test", consumeMode = ConsumeMode.CONCURRENTLY) + static class ConcurrentlyClass { + } + + @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test", consumeMode = ConsumeMode.ORDERLY) + static class OrderlyClass { + } + + + @RocketMQMessageListener(consumerGroup = "abc1", topic = "test", consumeMode = ConsumeMode.ORDERLY, consumeThreadNumber = 3456, From 14a88d3b7f6ab5991c05b9492fe719e9f928bdd8 Mon Sep 17 00:00:00 2001 From: chengxy Date: Tue, 14 Feb 2023 10:42:51 +0800 Subject: [PATCH 2/6] remove useless --- .../autoconfigure/ExtProducerResetConfiguration.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java index bdf2b182..35aee2a2 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java @@ -17,16 +17,10 @@ package org.apache.rocketmq.spring.autoconfigure; -import java.lang.reflect.Field; import java.util.Map; import java.util.stream.Collectors; - -import org.apache.rocketmq.acl.common.AclClientRPCHook; -import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; -import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl; import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; @@ -120,8 +114,6 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota boolean isEnableMsgTrace = annotation.enableMsgTrace(); String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic()); customizedTraceTopic = StringUtils.hasLength(customizedTraceTopic) ? customizedTraceTopic : producerConfig.getCustomizedTraceTopic(); - - //if String is not is equal "true" TLS mode will represent the as default value false boolean useTLS = new Boolean(environment.resolvePlaceholders(annotation.tlsEnable())); From 234e36a4fa84ceec10bc19c76050aff0c2ed4982 Mon Sep 17 00:00:00 2001 From: chengxy Date: Wed, 15 Feb 2023 13:12:53 +0800 Subject: [PATCH 3/6] fix checkstyle --- .../ListenerContainerConfiguration.java | 12 +++--- .../spring/core/RocketMQBatchListener.java | 19 ++++++++- .../DefaultRocketMQListenerContainer.java | 24 ++++++------ .../DefaultRocketMQListenerContainerTest.java | 39 ++++++++++--------- 4 files changed, 56 insertions(+), 38 deletions(-) diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java index 6c6d3e34..c9049382 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java @@ -19,6 +19,7 @@ import java.util.Collections; import java.util.concurrent.atomic.AtomicLong; + import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; @@ -57,7 +58,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware { private RocketMQMessageConverter rocketMQMessageConverter; public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter, - ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) { + ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) { this.rocketMQMessageConverter = rocketMQMessageConverter; this.environment = environment; this.rocketMQProperties = rocketMQProperties; @@ -115,7 +116,7 @@ public void registerContainer(String beanName, Object bean, RocketMQMessageListe } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, - RocketMQMessageListener annotation) { + RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); @@ -134,10 +135,9 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); if (bean instanceof RocketMQBatchListener) { - container.setRocketMQBatchListener((RocketMQBatchListener)bean); - } - else { - container.setRocketMQListener((RocketMQListener)bean); + container.setRocketMQBatchListener((RocketMQBatchListener) bean); + } else { + container.setRocketMQListener((RocketMQListener) bean); } container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable())); if (RocketMQListener.class.isAssignableFrom(bean.getClass())) { diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQBatchListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQBatchListener.java index bab45ab3..5fa1b893 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQBatchListener.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQBatchListener.java @@ -1,7 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.spring.core; import java.util.List; public interface RocketMQBatchListener { - void onMessages(List message); + void onMessages(List message); } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index 5b630eb6..f6265481 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -85,14 +85,14 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, /** * Suspending pulling time in orderly mode. - * + *

* The minimum value is 10 and the maximum is 30000. */ private long suspendCurrentQueueTimeMillis = 1000; /** * Message consume retry strategy in concurrently mode. - * + *

* -1,no retry,put into DLQ directly * 0,broker control retry frequency * >0,client control retry frequency @@ -496,8 +496,7 @@ private class BatchMessageListenerOrderly implements MessageListenerOrderly { public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { try { batchConsumeMessages(msgs); - } - catch (Exception e) { + } catch (Exception e) { log.warn("consume message failed. messageExt:{}", msgs, e); context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; @@ -514,8 +513,7 @@ private boolean batchConsumeMessages(List msgs) { try { List messages = msgs.stream().map(DefaultRocketMQListenerContainer.this::doConvertMessage).collect(Collectors.toList()); rocketMQBatchListener.onMessages(messages); - } - finally { + } finally { if (log.isDebugEnabled()) { long costTime = System.currentTimeMillis() - now; log.debug("batch consume {} cost: {} ms", msgs.stream().map(MessageExt::getMsgId).collect(Collectors.toList()), costTime); @@ -536,7 +534,8 @@ private void handleMessage( DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer(); producer.setSendMsgTimeout(replyTimeout); producer.send(replyMessage, new SendCallback() { - @Override public void onSuccess(SendResult sendResult) { + @Override + public void onSuccess(SendResult sendResult) { if (sendResult.getSendStatus() != SendStatus.SEND_OK) { log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus()); } else { @@ -544,7 +543,8 @@ private void handleMessage( } } - @Override public void onException(Throwable e) { + @Override + public void onException(Throwable e) { log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage()); } }); @@ -610,7 +610,8 @@ private Object doConvertMessage(MessageExt messageExt) { } else { //if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint". //we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter. - return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class) ((ParameterizedType) messageType).getRawType(), methodParameter); + return ((SmartMessageConverter) this.getMessageConverter()) + .fromMessage(MessageBuilder.withPayload(str).build(), (Class) ((ParameterizedType) messageType).getRawType(), methodParameter); } } catch (Exception e) { log.info("convert failed. str:{}, msgType:{}", str, messageType); @@ -726,7 +727,7 @@ private void initRocketMQPushConsumer() throws MQClientException { Object candidateRocketMQListener = getCandidateRocketMQListener(); if (candidateRocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { - ((RocketMQPushConsumerLifecycleListener)candidateRocketMQListener).prepareStart(consumer); + ((RocketMQPushConsumerLifecycleListener) candidateRocketMQListener).prepareStart(consumer); } @@ -754,7 +755,7 @@ private void initMessageModel() { } } - private void initSelectorType() throws MQClientException{ + private void initSelectorType() throws MQClientException { switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); @@ -781,5 +782,4 @@ private void initConsumeMode() { } - } diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java index 8935b62e..e5fda80c 100644 --- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.rocketmq.spring.support; import java.io.UnsupportedEncodingException; @@ -318,13 +319,12 @@ public Thread newThread(Runnable r) { MessageListener l = listenerContainer.getConsumer().getMessageListener(); if (l instanceof MessageListenerConcurrently) { - ((MessageListenerConcurrently)l).consumeMessage(messages, new ConsumeConcurrentlyContext(new MessageQueue())); + ((MessageListenerConcurrently) l).consumeMessage(messages, new ConsumeConcurrentlyContext(new MessageQueue())); } if (l instanceof MessageListenerOrderly) { - ((MessageListenerOrderly)l).consumeMessage(messages, new ConsumeOrderlyContext(new MessageQueue())); + ((MessageListenerOrderly) l).consumeMessage(messages, new ConsumeOrderlyContext(new MessageQueue())); } - } - catch (UnsupportedEncodingException e) { + } catch (UnsupportedEncodingException e) { e.printStackTrace(); } }; @@ -377,10 +377,11 @@ public void onMessages(List messages) { initConsumeMode.invoke(listenerContainer); scheduledExecutorService.schedule(r, 100, TimeUnit.MILLISECONDS); countDownLatch.await(1000, TimeUnit.MILLISECONDS); - if (exceptedTrueOrFalse) + if (exceptedTrueOrFalse) { assertThat(result[0]).isTrue(); // excepted - else + } else { assertThat(result[0]).isFalse(); // not excepted + } } private void tryRocketMQListener(DefaultRocketMQListenerContainer listenerContainer, @@ -405,10 +406,11 @@ public void onMessage(String message) { initConsumeMode.invoke(listenerContainer); scheduledExecutorService.schedule(r, 100, TimeUnit.MILLISECONDS); countDownLatch.await(1000, TimeUnit.MILLISECONDS); - if (exceptedTrueOrFalse) + if (exceptedTrueOrFalse) { assertThat(result[0]).isTrue(); // excepted - else + } else { assertThat(result[0]).isFalse(); // not excepted + } } @RocketMQMessageListener(consumerGroup = "consumerGroup1", topic = "test", selectorExpression = "*", selectorType = SelectorType.TAG) @@ -428,18 +430,17 @@ static class OrderlyClass { } - @RocketMQMessageListener(consumerGroup = "abc1", topic = "test", - consumeMode = ConsumeMode.ORDERLY, - consumeThreadNumber = 3456, - messageModel = MessageModel.BROADCASTING, - selectorType = SelectorType.SQL92, - selectorExpression = "selectorExpression", - tlsEnable = "tlsEnable", - namespace = "namespace", - delayLevelWhenNextConsume = 1234, - suspendCurrentQueueTimeMillis = 2345, - instanceName = "instanceName" + consumeMode = ConsumeMode.ORDERLY, + consumeThreadNumber = 3456, + messageModel = MessageModel.BROADCASTING, + selectorType = SelectorType.SQL92, + selectorExpression = "selectorExpression", + tlsEnable = "tlsEnable", + namespace = "namespace", + delayLevelWhenNextConsume = 1234, + suspendCurrentQueueTimeMillis = 2345, + instanceName = "instanceName" ) class TestRocketMQMessageListener { } From 296e904818185c90ab5b64abd2b8c9ddd738303e Mon Sep 17 00:00:00 2001 From: chengxy Date: Wed, 15 Feb 2023 13:44:25 +0800 Subject: [PATCH 4/6] add RocketMQReplyListener type --- .../spring/autoconfigure/ListenerContainerConfiguration.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java index c9049382..c8227942 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java @@ -136,6 +136,8 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); if (bean instanceof RocketMQBatchListener) { container.setRocketMQBatchListener((RocketMQBatchListener) bean); + } else if (bean instanceof RocketMQReplyListener) { + container.setRocketMQReplyListener((RocketMQReplyListener) bean); } else { container.setRocketMQListener((RocketMQListener) bean); } From 503796e5a1ede4ab3c2560b48f372bd72e72824f Mon Sep 17 00:00:00 2001 From: windwheel Date: Sat, 25 Feb 2023 11:46:03 +0800 Subject: [PATCH 5/6] add consumeMessageBatchMaxSize --- .../spring/annotation/RocketMQMessageListener.java | 6 ++++++ .../autoconfigure/ListenerContainerConfiguration.java | 1 + .../support/DefaultRocketMQListenerContainer.java | 11 +++++++++++ .../support/DefaultRocketMQListenerContainerTest.java | 3 ++- 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java index 36478645..481275f1 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java @@ -143,6 +143,12 @@ * The namespace of consumer. */ String namespace() default ""; + + /** + * The consumeMessageBatchMaxSize of consumer. + * @return + */ + int consumeMessageBatchMaxSize() default 0; /** * Message consume retry strategy in concurrently mode. diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java index c8227942..8e3c58d2 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java @@ -135,6 +135,7 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); if (bean instanceof RocketMQBatchListener) { + container.setConsumeMessageBatchMaxSize(annotation.consumeMessageBatchMaxSize()); container.setRocketMQBatchListener((RocketMQBatchListener) bean); } else if (bean instanceof RocketMQReplyListener) { container.setRocketMQReplyListener((RocketMQReplyListener) bean); diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index f6265481..77e092f9 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -140,6 +140,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, private int maxReconsumeTimes; private int replyTimeout; private String tlsEnable; + private Integer consumeMessageBatchMaxSize; private String namespace; private long awaitTerminationMillisWhenShutdown; @@ -266,6 +267,7 @@ public void setRocketMQMessageListener(RocketMQMessageListener anno) { this.tlsEnable = anno.tlsEnable(); this.namespace = anno.namespace(); this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume(); + this.consumeMessageBatchMaxSize = anno.consumeMessageBatchMaxSize(); this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis(); this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown()); this.instanceName = anno.instanceName(); @@ -306,6 +308,14 @@ public String getNamespace() { public void setNamespace(String namespace) { this.namespace = namespace; } + + public Integer getConsumeMessageBatchMaxSize() { + return consumeMessageBatchMaxSize; + } + + public void setConsumeMessageBatchMaxSize(Integer consumeMessageBatchMaxSize) { + this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; + } public DefaultMQPushConsumer getConsumer() { return consumer; @@ -730,6 +740,7 @@ private void initRocketMQPushConsumer() throws MQClientException { ((RocketMQPushConsumerLifecycleListener) candidateRocketMQListener).prepareStart(consumer); } + consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); //if String is not is equal "true" TLS mode will represent the as default value false consumer.setUseTLS(new Boolean(tlsEnable)); diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java index e5fda80c..e7c8c7d1 100644 --- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java @@ -440,7 +440,8 @@ static class OrderlyClass { namespace = "namespace", delayLevelWhenNextConsume = 1234, suspendCurrentQueueTimeMillis = 2345, - instanceName = "instanceName" + instanceName = "instanceName", + consumeMessageBatchMaxSize = 1024 ) class TestRocketMQMessageListener { } From 59398f600fdb874d24ab73653408a6a14bc4f060 Mon Sep 17 00:00:00 2001 From: windwheel Date: Sat, 25 Feb 2023 11:48:17 +0800 Subject: [PATCH 6/6] change default value --- .../rocketmq/spring/annotation/RocketMQMessageListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java index 481275f1..3802b856 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java @@ -157,7 +157,7 @@ * 0,broker control retry frequency * >0,client control retry frequency */ - int delayLevelWhenNextConsume() default 0; + int delayLevelWhenNextConsume() default 1024; /** * The interval of suspending the pull in orderly mode, in milliseconds.