Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE#502] batch message support #526

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@
* The namespace of consumer.
*/
String namespace() default "";

/**
* The consumeMessageBatchMaxSize of consumer.
* @return
*/
int consumeMessageBatchMaxSize() default 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why consumeMessageBatchMaxSize default is zero and delayLevelWhenNextConsume default is 1024 ?


/**
* Message consume retry strategy in concurrently mode.
Expand All @@ -151,7 +157,7 @@
* 0,broker control retry frequency
* >0,client control retry frequency
*/
int delayLevelWhenNextConsume() default 0;
int delayLevelWhenNextConsume() default 1024;

/**
* The interval of suspending the pull in orderly mode, in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQBatchListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
Expand Down Expand Up @@ -56,7 +58,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware {
private RocketMQMessageConverter rocketMQMessageConverter;

public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
Expand Down Expand Up @@ -114,7 +116,7 @@ public void registerContainer(String beanName, Object bean, RocketMQMessageListe
}

private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
RocketMQMessageListener annotation) {
RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

container.setRocketMQMessageListener(annotation);
Expand All @@ -132,6 +134,14 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
if (bean instanceof RocketMQBatchListener) {
container.setConsumeMessageBatchMaxSize(annotation.consumeMessageBatchMaxSize());
container.setRocketMQBatchListener((RocketMQBatchListener) bean);
} else if (bean instanceof RocketMQReplyListener) {
container.setRocketMQReplyListener((RocketMQReplyListener) bean);
} else {
container.setRocketMQListener((RocketMQListener) bean);
}
container.setTlsEnable(environment.resolvePlaceholders(annotation.tlsEnable()));
if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQListener((RocketMQListener) bean);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.spring.core;

import java.util.List;

public interface RocketMQBatchListener<T> {
void onMessages(List<T> message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
Expand All @@ -46,6 +49,7 @@
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQBatchListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
Expand Down Expand Up @@ -81,14 +85,14 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,

/**
* Suspending pulling time in orderly mode.
*
* <p>
* The minimum value is 10 and the maximum is 30000.
*/
private long suspendCurrentQueueTimeMillis = 1000;

/**
* Message consume retry strategy in concurrently mode.
*
* <p>
* -1,no retry,put into DLQ directly
* 0,broker control retry frequency
* >0,client control retry frequency
Expand All @@ -115,6 +119,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,

private RocketMQReplyListener rocketMQReplyListener;

private RocketMQBatchListener rocketMQBatchListener;

private RocketMQMessageListener rocketMQMessageListener;

private DefaultMQPushConsumer consumer;
Expand All @@ -134,6 +140,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
private int maxReconsumeTimes;
private int replyTimeout;
private String tlsEnable;
private Integer consumeMessageBatchMaxSize;
private String namespace;
private long awaitTerminationMillisWhenShutdown;

Expand Down Expand Up @@ -217,9 +224,22 @@ public RocketMQListener getRocketMQListener() {
}

public void setRocketMQListener(RocketMQListener rocketMQListener) {
this.rocketMQBatchListener = null;
this.rocketMQListener = rocketMQListener;
}

public RocketMQBatchListener getRocketMQBatchListener() {
return rocketMQBatchListener;
}

public void setRocketMQBatchListener(RocketMQBatchListener rocketMQBatchListener) {
this.rocketMQBatchListener = rocketMQBatchListener;
}

private Object getCandidateRocketMQListener() {
return ObjectUtils.defaultIfNull(rocketMQBatchListener, rocketMQListener);
}

public RocketMQReplyListener getRocketMQReplyListener() {
return rocketMQReplyListener;
}
Expand Down Expand Up @@ -247,6 +267,7 @@ public void setRocketMQMessageListener(RocketMQMessageListener anno) {
this.tlsEnable = anno.tlsEnable();
this.namespace = anno.namespace();
this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
this.consumeMessageBatchMaxSize = anno.consumeMessageBatchMaxSize();
this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());
this.instanceName = anno.instanceName();
Expand Down Expand Up @@ -287,6 +308,14 @@ public String getNamespace() {
public void setNamespace(String namespace) {
this.namespace = namespace;
}

public Integer getConsumeMessageBatchMaxSize() {
return consumeMessageBatchMaxSize;
}

public void setConsumeMessageBatchMaxSize(Integer consumeMessageBatchMaxSize) {
this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
}

public DefaultMQPushConsumer getConsumer() {
return consumer;
Expand Down Expand Up @@ -455,6 +484,54 @@ public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderly
}
}

private class BatchMessageListenerConcurrently implements MessageListenerConcurrently {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
batchConsumeMessages(msgs);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", msgs, e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

private class BatchMessageListenerOrderly implements MessageListenerOrderly {

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
try {
batchConsumeMessages(msgs);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}", msgs, e);
context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}

return ConsumeOrderlyStatus.SUCCESS;
}
}

@SuppressWarnings("unchecked")
private boolean batchConsumeMessages(List<MessageExt> msgs) {
log.debug("received msgs, size: {}", msgs.size());
long now = System.currentTimeMillis();
try {
List messages = msgs.stream().map(DefaultRocketMQListenerContainer.this::doConvertMessage).collect(Collectors.toList());
rocketMQBatchListener.onMessages(messages);
} finally {
if (log.isDebugEnabled()) {
long costTime = System.currentTimeMillis() - now;
log.debug("batch consume {} cost: {} ms", msgs.stream().map(MessageExt::getMsgId).collect(Collectors.toList()), costTime);
}
}
return true;
}

private void handleMessage(
MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
if (rocketMQListener != null) {
Expand All @@ -467,15 +544,17 @@ private void handleMessage(
DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
producer.setSendMsgTimeout(replyTimeout);
producer.send(replyMessage, new SendCallback() {
@Override public void onSuccess(SendResult sendResult) {
@Override
public void onSuccess(SendResult sendResult) {
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
} else {
log.debug("Consumer replies message success.");
}
}

@Override public void onException(Throwable e) {
@Override
public void onException(Throwable e) {
log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
}
});
Expand Down Expand Up @@ -541,7 +620,8 @@ private Object doConvertMessage(MessageExt messageExt) {
} else {
//if the messageType has Generic Parameter, then use SmartMessageConverter#fromMessage with third parameter "conversionHint".
//we have validate the MessageConverter is SmartMessageConverter in this#getMethodParameter.
return ((SmartMessageConverter) this.getMessageConverter()).fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
return ((SmartMessageConverter) this.getMessageConverter())
.fromMessage(MessageBuilder.withPayload(str).build(), (Class<?>) ((ParameterizedType) messageType).getRawType(), methodParameter);
}
} catch (Exception e) {
log.info("convert failed. str:{}, msgType:{}", str, messageType);
Expand Down Expand Up @@ -648,6 +728,32 @@ private void initRocketMQPushConsumer() throws MQClientException {
consumer.setMaxReconsumeTimes(maxReconsumeTimes);
consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
consumer.setInstanceName(instanceName);

initMessageModel();

initSelectorType();

initConsumeMode();

Object candidateRocketMQListener = getCandidateRocketMQListener();
if (candidateRocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) candidateRocketMQListener).prepareStart(consumer);
}

consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);

//if String is not is equal "true" TLS mode will represent the as default value false
consumer.setUseTLS(new Boolean(tlsEnable));

if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}

}

private void initMessageModel() {
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
Expand All @@ -658,7 +764,9 @@ private void initRocketMQPushConsumer() throws MQClientException {
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
}
}

private void initSelectorType() throws MQClientException {
switch (selectorType) {
case TAG:
consumer.subscribe(topic, selectorExpression);
Expand All @@ -669,27 +777,20 @@ private void initRocketMQPushConsumer() throws MQClientException {
default:
throw new IllegalArgumentException("Property 'selectorType' was wrong.");
}
}

private void initConsumeMode() {
switch (consumeMode) {
case ORDERLY:
consumer.setMessageListener(new DefaultMessageListenerOrderly());
consumer.setMessageListener((rocketMQBatchListener != null) ? (new BatchMessageListenerOrderly()) : (new DefaultMessageListenerOrderly()));
break;
case CONCURRENTLY:
consumer.setMessageListener(new DefaultMessageListenerConcurrently());
consumer.setMessageListener((rocketMQBatchListener != null) ? (new BatchMessageListenerConcurrently()) : (new DefaultMessageListenerConcurrently()));
break;
default:
throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
}

//if String is not is equal "true" TLS mode will represent the as default value false
consumer.setUseTLS(new Boolean(tlsEnable));

if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}

}


}
Loading