-
Notifications
You must be signed in to change notification settings - Fork 898
FAQ
-
How to connected many
nameserver
on production environment?rocketmq.name-server
support the configuration of multiplenameserver
, separated by;
. For example:172.19.0.1:9876; 172.19.0.2:9876
-
When was
rocketMQTemplate
destroyed?Developers do not need to manually execute the
rocketMQTemplate.destroy ()
method when usingrocketMQTemplate
to send a message in the project, androcketMQTemplate
will be destroyed automatically when the spring container is destroyed. -
start exception:
Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please
RocketMQ in the design do not want a consumer to deal with multiple types of messages at the same time, so the same
consumerGroup
consumer responsibility should be the same, do not do different things (that is, consumption of multiple topics). SuggestedconsumerGroup
andtopic
one correspondence. -
How is the message content body being serialized and deserialized?
RocketMQ's message body is stored as
byte []
. When the business system message content body if it isjava.lang.String
type, unified in accordance withutf-8
code intobyte []
; If the business system message content is notjava.lang.String
Type, then use jackson-databind serialized into theJSON
format string, and then unified in accordance withutf-8
code intobyte []
. -
How do I specify the
tags
for topic?RocketMQ best practice recommended: an application as much as possible with one Topic, the message sub-type with
tags
to identify,tags
can be set by the application free.When you use
rocketMQTemplate
to send a message, set the destination of the message by setting thedestination
parameter of the send method. Thedestination
format istopicName:tagName
,:
Precedes the name of the topic, followed by thetags
name.Note:
tags
looks a complex, but when sending a message , the destination can only specify one topic under atag
, can not specify multiple. -
How do I set the message's
key
when sending a message?You can send a message by overloading method like
xxxSend(String destination, Message<?> msg, ...)
, settingheaders
ofmsg
. for example:Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build(); rocketMQTemplate.send("topic-test", message);
Similarly, you can also set the message
FLAG
,WAIT_STORE_MSG_OK
and some other user-defined other header information according to the above method.Note:
In the case of converting Spring's Message to RocketMQ's Message, to prevent the
header
information from conflicting with RocketMQ's system properties, the prefixUSERS_
was added in front of allheader
names. So if you want to get a custom message header when consuming, please pass through the key at the beginning ofUSERS_
in the header. -
When consume message, in addition to get the message
payload
, but also want to get RocketMQ message of other system attributes, how to do?Consumers in the realization of
RocketMQListener
interface, only need to be generic for theMessageExt
can, so in theonMessage
method will receive RocketMQ native 'MessageExt` message.@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer2 implements RocketMQListener<MessageExt>{ public void onMessage(MessageExt messageExt) { log.info("received messageExt: {}", messageExt); } }
-
How do I specify where consumers start consuming messages?
The default consume offset please refer: RocketMQ FAQ. To customize the consumer's starting location, simply add a
RocketMQPushConsumerLifecycleListener
interface implementation to the consumer class. Examples are as follows:@Slf4j @Service @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1") public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(String message) { log.info("received message: {}", message); } @Override public void prepareStart(final DefaultMQPushConsumer consumer) { // set consumer consume message from now consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); } }
Similarly, any other configuration on
DefaultMQPushConsumer
can be done in the same way as above. -
How do I send transactional messages?
It needs two steps on client side:
a) Define a class which is annotated with @RocketMQTransactionListener and implements RocketMQLocalTransactionListener interface, in which, the executeLocalTransaction() and checkLocalTransaction() methods are implemented;
b) Invoke the sendMessageInTransaction() method with the RocketMQTemplate API.
Note: After rocketmq-spring version 2.1.0, the annotation @RocketmqTransactionListener cannot set txProducerGroup, AK, SK. And these values are consistent with the corresponding rocketMQTemplate
-
How do I create more than one RocketMQTemplate with a different name-server or other specific properties?
Step1. Define an extra RocketMQTemplate with required properties. You can define different nameservers, groupnames from the standard rocketMQTemplate. If you do not define, it will use the global configuration definition by default.
// The RocketMQTemplate's Spring Bean name is 'extRocketMQTemplate', same with the simplified class name (Initials lowercase) @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876" , ... // override other specific properties if needed ) public class ExtRocketMQTemplate extends RocketMQTemplate { // keep the body empty }
Step2. Use the extra RocketMQTemplate. e.g.
@Resource(name = "extRocketMQTemplate") // Must define the name to qualify to extra-defined RocketMQTemplate bean. private RocketMQTemplate extRocketMQTemplate;
Then you can use the template as normal.
-
How to use extra rocketmqtemplate to send transactional messages?
First, define a class which is annotated with @RocketMQTransactionListener and implements RocketMQLocalTransactionListener interface. The rocketMQTemplateBeanName of the annotation field indicates the bean name of the extra RocketMQTemplate (if it is not set, the default is the standard rocketmptemplate). For example, if the extra RocketMQTemplate bean name is "extRocketMQTemplate", the code As follows:
@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } }
Then use extRocketMQTemplate to call sendTessagentransaction to send transactional messages.
-
How do I create a consumer Listener with different name-server other than the global Spring configuration 'rocketmq.name-server' ?
@Service @RocketMQMessageListener( nameServer = "NEW-NAMESERVER-LIST", // define new nameServer list topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1", enableMsgTrace = true, customizedTraceTopic = "my-trace-topic" ) public class MyNameServerConsumer implements RocketMQListener<String> { ... }