From 35452f885521015abe8d7a62a932733f4b4f1f2b Mon Sep 17 00:00:00 2001 From: JoeKerouac Date: Thu, 12 Jan 2023 16:21:31 +0800 Subject: [PATCH] adds RocketMQTracing Signed-off-by: Adrian Cole --- brave-bom/pom.xml | 10 + instrumentation/README.md | 1 + instrumentation/pom.xml | 1 + instrumentation/rocketmq-client/README.md | 100 +++++++ instrumentation/rocketmq-client/bnd.bnd | 6 + instrumentation/rocketmq-client/pom.xml | 68 +++++ .../rocketmq/client/RocketMQTracing.java | 109 ++++++++ .../java/brave/rocketmq/client/SpanUtil.java | 49 ++++ .../brave/rocketmq/client/StringUtils.java | 26 ++ .../brave/rocketmq/client/TraceConstants.java | 26 ++ .../client/TracingConsumerRequest.java | 80 ++++++ .../TracingMessageListenerConcurrently.java | 71 +++++ .../client/TracingMessageListenerOrderly.java | 70 +++++ .../client/TracingProducerRequest.java | 79 ++++++ .../client/TracingSendMessageHook.java | 76 +++++ .../client/ITRocketMQTracingTest.java | 260 ++++++++++++++++++ .../rocketmq/client/RocketMQContainer.java | 66 +++++ .../src/test/resources/broker.conf | 3 + .../src/test/resources/log4j2.properties | 9 + .../src/test/resources/start.sh | 30 ++ 20 files changed, 1140 insertions(+) create mode 100644 instrumentation/rocketmq-client/README.md create mode 100644 instrumentation/rocketmq-client/bnd.bnd create mode 100644 instrumentation/rocketmq-client/pom.xml create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java create mode 100644 instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java create mode 100644 instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java create mode 100644 instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java create mode 100644 instrumentation/rocketmq-client/src/test/resources/broker.conf create mode 100644 instrumentation/rocketmq-client/src/test/resources/log4j2.properties create mode 100644 instrumentation/rocketmq-client/src/test/resources/start.sh diff --git a/brave-bom/pom.xml b/brave-bom/pom.xml index bba48e8cd9..4769371f91 100644 --- a/brave-bom/pom.xml +++ b/brave-bom/pom.xml @@ -226,6 +226,16 @@ brave-instrumentation-okhttp3 ${project.version} + + ${project.groupId} + brave-instrumentation-rocketmq-client + ${project.version} + + + ${project.groupId} + brave-instrumentation-rocketmq-clients + ${project.version} + ${project.groupId} brave-instrumentation-rpc diff --git a/instrumentation/README.md b/instrumentation/README.md index 8e76945ad6..ae07ffd68e 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -20,6 +20,7 @@ Here's a brief overview of what's packaged here: * [mysql8](mysql8/README.md) - Tracing MySQL v8 statement interceptor * [netty-codec-http](netty-codec-http/README.md) - Tracing handler for [Netty](http://netty.io/) 4.x http servers * [okhttp3](okhttp3/README.md) - Tracing decorators for [OkHttp](https://github.com/square/okhttp) 3.x +* [rocketmq-client](rocketmq-client/README.md) - Tracing decorators for RocketMQ producers and consumers. * [servlet](servlet/README.md) - Tracing filter for Servlet 2.5+ (including Async) * [spring-rabbit](spring-rabbit/README.md) - Tracing MessagePostProcessor and ListenerAdvice for [Spring Rabbit](https://spring.io/guides/gs/messaging-rabbitmq/) * [spring-web](spring-web/README.md) - Tracing interceptor for [Spring RestTemplate](https://spring.io/guides/gs/consuming-rest/) diff --git a/instrumentation/pom.xml b/instrumentation/pom.xml index c0e2f5a4d4..c2dd48809d 100644 --- a/instrumentation/pom.xml +++ b/instrumentation/pom.xml @@ -51,6 +51,7 @@ mysql6 mysql8 okhttp3 + rocketmq-client rpc servlet servlet-jakarta diff --git a/instrumentation/rocketmq-client/README.md b/instrumentation/rocketmq-client/README.md new file mode 100644 index 0000000000..c447f15103 --- /dev/null +++ b/instrumentation/rocketmq-client/README.md @@ -0,0 +1,100 @@ +# brave-instrumentation-rocketmq-client + +## Tracing for RocketMQ Client + +This module provides instrumentation for RocketMQ based services. + +## example + +### producer + +The key is to register our hook to the producer + +```java +package brave.rocketmq.client; + +import brave.Tracing; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.sampler.SamplerFunction; +import brave.sampler.SamplerFunctions; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; + +public class ProducerExample { + + public static void main(String[] args) throws Exception { + // todo Replaced with actual tracing construct + Tracing tracing = Tracing.newBuilder().build(); + SamplerFunction producerSampler = SamplerFunctions.deferDecision(); + RocketMQTracing producerTracing = RocketMQTracing.create( + MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build()); + + String topic = "testSend"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSend"); + // todo This is the key, register the hook to the producer + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new SendMessageBraveHookImpl(producerTracing)); + // Replace with actual address + producer.setNamesrvAddr("127.0.0.1:9876"); + producer.start(); + producer.send(message); + + producer.shutdown(); + } +} + +``` + +### consumer + +Replace `org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently` +with `brave.rocketmq.client.TracingMessageListenerConcurrently` +or `org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly` +with `brave.rocketmq.client.TracingMessageListenerOrderly`; + +```java +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import brave.Tracing; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.sampler.SamplerFunction; +import brave.sampler.SamplerFunctions; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.Optional; + +public class ProducerExample { + + public static void main(String[] args) throws Exception { + // todo Replaced with actual tracing construct + Tracing tracing = Tracing.newBuilder().build(); + SamplerFunction producerSampler = SamplerFunctions.deferDecision(); + RocketMQTracing producerTracing = RocketMQTracing.create( + MessagingTracing.newBuilder(tracing).producerSampler(producerSampler).build()); + + String topic = "testPushConsumer"; + String nameserverAddr = "127.0.0.1:9876"; + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testPushConsumer"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + consumer.registerMessageListener(new TraceableMessageListenerConcurrently(0, producerTracing) { + @Override + protected void handleMessage(MessageExt messageExt) { + Span span = + Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); + // do something + } + }); + consumer.start(); + } +} + +``` + diff --git a/instrumentation/rocketmq-client/bnd.bnd b/instrumentation/rocketmq-client/bnd.bnd new file mode 100644 index 0000000000..ff1f9dad33 --- /dev/null +++ b/instrumentation/rocketmq-client/bnd.bnd @@ -0,0 +1,6 @@ +# We use brave.internal.Nullable, but it is not used at runtime. +Import-Package: \ + !brave.internal*,\ + * +Export-Package: \ + brave.rocketmq.client diff --git a/instrumentation/rocketmq-client/pom.xml b/instrumentation/rocketmq-client/pom.xml new file mode 100644 index 0000000000..3b62306596 --- /dev/null +++ b/instrumentation/rocketmq-client/pom.xml @@ -0,0 +1,68 @@ + + + + 4.0.0 + + io.zipkin.brave + brave-instrumentation-parent + 6.0.1-SNAPSHOT + + + brave-instrumentation-rocketmq-client + Brave Instrumentation: RocketMQ Client + + + + brave.rocketmq.client + + ${project.basedir}/../.. + + 4.7.0 + + --add-opens java.base/java.nio=ALL-UNNAMED + + + + + ${project.groupId} + brave-instrumentation-messaging + ${project.version} + + + org.apache.rocketmq + rocketmq-client + ${rocketmq.version} + provided + + + + ${project.groupId} + brave-tests + test + ${project.version} + + + + org.testcontainers + junit-jupiter + ${testcontainers.version} + test + + + diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java new file mode 100644 index 0000000000..ca107c98d2 --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/RocketMQTracing.java @@ -0,0 +1,109 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import brave.Tracing; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.propagation.Propagation; +import brave.propagation.TraceContext.Extractor; +import brave.propagation.TraceContext.Injector; +import brave.propagation.TraceContextOrSamplingFlags; +import brave.sampler.SamplerFunction; +import java.util.Map; + +public class RocketMQTracing { + public static RocketMQTracing create(Tracing tracing) { + return new RocketMQTracing(MessagingTracing.create(tracing), TraceConstants.ROCKETMQ_SERVICE); + } + + public static RocketMQTracing create(MessagingTracing messagingTracing) { + return new RocketMQTracing(messagingTracing, TraceConstants.ROCKETMQ_SERVICE); + } + + public static RocketMQTracing create(MessagingTracing messagingTracing, + String remoteServiceName) { + return new RocketMQTracing(messagingTracing, remoteServiceName); + } + + final Tracing tracing; + final Tracer tracer; + final Extractor producerExtractor; + final Extractor consumerExtractor; + final Injector producerInjector; + final Injector consumerInjector; + final String[] traceIdHeaders; + final SamplerFunction producerSampler, consumerSampler; + final String remoteServiceName; + + RocketMQTracing(MessagingTracing messagingTracing, + String remoteServiceName) { // intentionally hidden constructor + this.tracing = messagingTracing.tracing(); + this.tracer = tracing.tracer(); + Propagation propagation = messagingTracing.propagation(); + this.producerExtractor = propagation.extractor(TracingProducerRequest.GETTER); + this.consumerExtractor = propagation.extractor(TracingConsumerRequest.GETTER); + this.producerInjector = propagation.injector(TracingProducerRequest.SETTER); + this.consumerInjector = propagation.injector(TracingConsumerRequest.SETTER); + this.producerSampler = messagingTracing.producerSampler(); + this.consumerSampler = messagingTracing.consumerSampler(); + this.remoteServiceName = remoteServiceName; + + // We clear the trace ID headers, so that a stale consumer span is not preferred over current + // listener. We intentionally don't clear BaggagePropagation.allKeyNames as doing so will + // application fields "user_id" or "country_code" + this.traceIdHeaders = propagation.keys().toArray(new String[0]); + } + + TraceContextOrSamplingFlags extractAndClearTraceIdHeaders(Extractor extractor, + R request, + Map properties) { + TraceContextOrSamplingFlags extracted = extractor.extract(request); + // Clear any propagation keys present in the headers + if (extracted.samplingFlags() == null) { // then trace IDs were extracted + if (properties != null) { + clearTraceIdHeaders(properties); + } + } + return extracted; + } + + /** Creates a potentially noop remote span representing this request. */ + Span nextMessagingSpan(SamplerFunction sampler, MessagingRequest request, + TraceContextOrSamplingFlags extracted) { + Boolean sampled = extracted.sampled(); + // only recreate the context if the messaging sampler made a decision + if (sampled == null && (sampled = sampler.trySample(request)) != null) { + extracted = extracted.sampled(sampled); + } + return tracer.nextSpan(extracted); + } + + // We can't just skip clearing headers we use because we might inject B3 single, yet have stale B3 + // multi, or vice versa. + void clearTraceIdHeaders(Map headers) { + for (String traceIDHeader : traceIdHeaders) + headers.remove(traceIDHeader); + } + + public Tracing tracing() { + return tracing; + } + + public Tracer tracer() { + return tracer; + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java new file mode 100644 index 0000000000..18fd080925 --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/SpanUtil.java @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import brave.messaging.MessagingRequest; +import brave.propagation.CurrentTraceContext; +import brave.propagation.TraceContext; +import brave.propagation.TraceContextOrSamplingFlags; +import brave.sampler.SamplerFunction; +import java.util.Map; + +class SpanUtil { + static Span createAndStartSpan(RocketMQTracing tracing, + TraceContext.Extractor extractor, SamplerFunction sampler, T request, + Map props) { + Tracer tracer = tracing.tracer; + CurrentTraceContext currentTraceContext = tracing.tracing.currentTraceContext(); + TraceContext traceContext = currentTraceContext.get(); + Span span; + + if (traceContext == null) { + TraceContextOrSamplingFlags extracted = + tracing.extractAndClearTraceIdHeaders(extractor, request, props); + span = tracing.nextMessagingSpan(sampler, request, extracted); + } else { + span = tracer.newChild(traceContext); + } + + span.kind(request.spanKind()); + span.remoteServiceName(tracing.remoteServiceName); + span.tag(TraceConstants.ROCKETMQ_TOPIC, request.channelName()); + long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); + span.start(timestamp); + return span; + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java new file mode 100644 index 0000000000..9d03d1d3df --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/StringUtils.java @@ -0,0 +1,26 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +class StringUtils { + + // TODO: we shouldn't add tags with empty values! + static String getOrEmpty(String obj) { + if (obj == null) { + return ""; + } else { + return obj; + } + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java new file mode 100644 index 0000000000..fec6c3904d --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TraceConstants.java @@ -0,0 +1,26 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package brave.rocketmq.client; + +public class TraceConstants { + + public static final String TO_PREFIX = "To_"; + public static final String FROM_PREFIX = "From_"; + public static final String ROCKETMQ_SERVICE = "rocketmq"; + + // TODO: maybe like HttpTags.PATH if we support extended tags on first version + public static final String ROCKETMQ_TAGS = "rocketmq.tags"; + public static final String ROCKETMQ_TOPIC = "rocketmq.topic"; +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java new file mode 100644 index 0000000000..b63886aab3 --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingConsumerRequest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.messaging.ConsumerRequest; +import brave.propagation.Propagation.RemoteGetter; +import brave.propagation.Propagation.RemoteSetter; +import org.apache.rocketmq.common.message.MessageExt; + +final class TracingConsumerRequest extends ConsumerRequest { + + static final RemoteGetter GETTER = + new RemoteGetter() { + @Override public Span.Kind spanKind() { + return Span.Kind.CONSUMER; + } + + @Override public String get(TracingConsumerRequest request, String name) { + return request.delegate.getUserProperty(name); + } + + @Override public String toString() { + return "Message::getUserProperty"; + } + }; + + static final RemoteSetter SETTER = + new RemoteSetter() { + @Override public Span.Kind spanKind() { + return Span.Kind.CONSUMER; + } + + @Override public void put(TracingConsumerRequest request, String name, String value) { + request.delegate.putUserProperty(name, value); + } + + @Override public String toString() { + return "Message::putUserProperty"; + } + }; + + final MessageExt delegate; + + TracingConsumerRequest(MessageExt delegate) { + if (delegate == null) throw new NullPointerException("delegate == null"); + this.delegate = delegate; + } + + @Override public MessageExt unwrap() { + return delegate; + } + + @Override public String operation() { + return "receive"; + } + + @Override public String channelKind() { + return "topic"; + } + + @Override public String channelName() { + return delegate.getTopic(); + } + + @Override public String messageId() { + return delegate.getMsgId(); + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java new file mode 100644 index 0000000000..3e60625c8b --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerConcurrently.java @@ -0,0 +1,71 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import java.util.List; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; + +// TODO: I think we don't want to expose a custom class rather wrap in context and prove a user can +// do custom tagging via their own MessageListenerConcurrently. +// Maybe expose RocketMQTracing.messageListenerConcurrently() to wrap theirs or make spans default +// and not expose this. +public abstract class TracingMessageListenerConcurrently implements MessageListenerConcurrently { + + private final int delayLevelWhenNextConsume; + + private final RocketMQTracing tracing; + + public TracingMessageListenerConcurrently(int delayLevelWhenNextConsume, + RocketMQTracing tracing) { + this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; + this.tracing = tracing; + } + + @Override + public final ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + for (MessageExt msg : msgs) { + TracingConsumerRequest request = new TracingConsumerRequest(msg); + Span span = + SpanUtil.createAndStartSpan(tracing, tracing.consumerExtractor, tracing.consumerSampler, + request, msg.getProperties()); + span.name(TraceConstants.FROM_PREFIX + msg.getTopic()); + + ConsumeConcurrentlyStatus result; + try (Tracer.SpanInScope scope = tracing.tracer().withSpanInScope(span)) { + result = handleMessage(msg, context); + } catch (Exception e) { + context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); + result = ConsumeConcurrentlyStatus.RECONSUME_LATER; + } finally { + long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); + span.finish(timestamp); + } + + if (result != ConsumeConcurrentlyStatus.CONSUME_SUCCESS) { + return result; + } + } + + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + + protected abstract ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, + ConsumeConcurrentlyContext context); +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java new file mode 100644 index 0000000000..472ff7d6ee --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingMessageListenerOrderly.java @@ -0,0 +1,70 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import java.util.List; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.common.message.MessageExt; + +// TODO: I think we don't want to expose a custom class rather wrap in context and prove a user can +// do custom tagging via their own MessageListenerOrderly. +// Maybe expose RocketMQTracing.messageListenerOrderly() to wrap theirs or make spans default +// and not expose this. +public abstract class TracingMessageListenerOrderly implements MessageListenerOrderly { + private final long suspendCurrentQueueTimeMillis; + private final RocketMQTracing tracing; + + public TracingMessageListenerOrderly(long suspendCurrentQueueTimeMillis, + RocketMQTracing tracing) { + this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; + this.tracing = tracing; + } + + @Override + public final ConsumeOrderlyStatus consumeMessage(List msgs, + ConsumeOrderlyContext context) { + for (MessageExt msg : msgs) { + TracingConsumerRequest request = new TracingConsumerRequest(msg); + Span span = + SpanUtil.createAndStartSpan(tracing, tracing.consumerExtractor, tracing.consumerSampler, + request, msg.getProperties()); + span.name(TraceConstants.FROM_PREFIX + msg.getTopic()); + + ConsumeOrderlyStatus result; + try (Tracer.SpanInScope scope = tracing.tracer().withSpanInScope(span)) { + result = handleMessage(msg, context); + } catch (Exception e) { + span.error(e); + context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); + result = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } finally { + long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); + span.finish(timestamp); + } + + if (result != ConsumeOrderlyStatus.SUCCESS) { + return result; + } + } + + return ConsumeOrderlyStatus.SUCCESS; + } + + protected abstract ConsumeOrderlyStatus handleMessage(MessageExt messageExt, + ConsumeOrderlyContext context); +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java new file mode 100644 index 0000000000..62695d7d4c --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingProducerRequest.java @@ -0,0 +1,79 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span.Kind; +import brave.messaging.ProducerRequest; +import brave.propagation.Propagation.RemoteGetter; +import brave.propagation.Propagation.RemoteSetter; +import org.apache.rocketmq.common.message.Message; + +final class TracingProducerRequest extends ProducerRequest { + static final RemoteGetter GETTER = + new RemoteGetter() { + @Override public Kind spanKind() { + return Kind.PRODUCER; + } + + @Override public String get(TracingProducerRequest request, String name) { + return request.delegate.getUserProperty(name); + } + + @Override public String toString() { + return "Message::getUserProperty"; + } + }; + + static final RemoteSetter SETTER = + new RemoteSetter() { + @Override public Kind spanKind() { + return Kind.PRODUCER; + } + + @Override public void put(TracingProducerRequest request, String name, String value) { + request.delegate.putUserProperty(name, value); + } + + @Override public String toString() { + return "Message::putUserProperty"; + } + }; + + final Message delegate; + + TracingProducerRequest(Message delegate) { + if (delegate == null) throw new NullPointerException("delegate == null"); + this.delegate = delegate; + } + + @Override public Message unwrap() { + return delegate; + } + + @Override public String operation() { + return "send"; + } + + @Override public String channelKind() { + return "topic"; + } + + @Override public String channelName() { + return delegate.getTopic(); + } + + @Override public String messageId() { + return null; + } +} diff --git a/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java new file mode 100644 index 0000000000..a11e21a6e2 --- /dev/null +++ b/instrumentation/rocketmq-client/src/main/java/brave/rocketmq/client/TracingSendMessageHook.java @@ -0,0 +1,76 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; + +final class TracingSendMessageHook implements SendMessageHook { + final RocketMQTracing tracing; + + public TracingSendMessageHook(RocketMQTracing tracing) { + this.tracing = tracing; + } + + @Override + public String hookName() { + return "SendMessageBraveHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + if (context == null || context.getMessage() == null) { + return; + } + + Message msg = context.getMessage(); + TracingProducerRequest request = new TracingProducerRequest(msg); + Span span = + SpanUtil.createAndStartSpan(tracing, tracing.producerExtractor, tracing.producerSampler, + request, + msg.getProperties()); + span.name(TraceConstants.TO_PREFIX + msg.getTopic()); + span.tag(TraceConstants.ROCKETMQ_TAGS, StringUtils.getOrEmpty(msg.getTags())); + context.setMqTraceContext(span); + tracing.producerInjector.inject(span.context(), request); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + if (context == null || context.getMessage() == null || context.getMqTraceContext() == null) { + return; + } + + SendResult sendResult = context.getSendResult(); + Span span = (Span) context.getMqTraceContext(); + TracingProducerRequest request = new TracingProducerRequest(context.getMessage()); + + long timestamp = tracing.tracing.clock(span.context()).currentTimeMicroseconds(); + if (sendResult == null) { + if (context.getCommunicationMode() == CommunicationMode.ASYNC) { + return; + } + span.finish(timestamp); + tracing.producerInjector.inject(span.context(), request); + return; + } + + tracing.producerInjector.inject(span.context(), request); + span.finish(timestamp); + } +} diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java new file mode 100644 index 0000000000..7c749f322b --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/ITRocketMQTracingTest.java @@ -0,0 +1,260 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import brave.Span; +import brave.Tracer; +import brave.Tracing; +import brave.handler.MutableSpan; +import brave.messaging.MessagingRequest; +import brave.messaging.MessagingTracing; +import brave.sampler.Sampler; +import brave.sampler.SamplerFunction; +import brave.sampler.SamplerFunctions; +import brave.test.ITRemote; +import brave.test.IntegrationTestSpanHandler; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import static org.assertj.core.api.Assertions.assertThat; + +@Tag("docker") +@Testcontainers(disabledWithoutDocker = true) +@Timeout(60) +class ITRocketMQTracingTest extends ITRemote { + static final String TOPIC_PREFIX = "JoeKerouac_Test_"; + + @Container static RocketMQContainer rocketMQ = new RocketMQContainer(); + + IntegrationTestSpanHandler producerSpanHandler = new IntegrationTestSpanHandler(); + IntegrationTestSpanHandler consumerSpanHandler = new IntegrationTestSpanHandler(); + + SamplerFunction producerSampler = SamplerFunctions.deferDecision(); + SamplerFunction consumerSampler = SamplerFunctions.deferDecision(); + + RocketMQTracing producerTracing = + RocketMQTracing.create(MessagingTracing + .newBuilder( + tracingBuilder(Sampler.ALWAYS_SAMPLE).localServiceName("producer").clearSpanHandlers() + .addSpanHandler(producerSpanHandler).build()) + .producerSampler(r -> producerSampler.trySample(r)).build()); + + RocketMQTracing consumerTracing = + RocketMQTracing.create(MessagingTracing + .newBuilder( + tracingBuilder(Sampler.ALWAYS_SAMPLE).localServiceName("consumer").clearSpanHandlers() + .addSpanHandler(consumerSpanHandler).build()) + .consumerSampler(r -> consumerSampler.trySample(r)).build()); + + @Test void send() throws Exception { + String topic = TOPIC_PREFIX + "testSend"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSend"); + // TODO: what is this deprecated in favor of? + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(rocketMQ.getNamesrvAddr()); + producer.start(); + producer.send(message); + + producer.shutdown(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + assertThat(producerSpan.parentId()).isNull(); + } + + @Test void sendOneway() throws Exception { + String topic = TOPIC_PREFIX + "testSendOneway"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSendOneway"); + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(rocketMQ.getNamesrvAddr()); + producer.start(); + producer.sendOneway(message); + + producer.shutdown(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + assertThat(producerSpan.parentId()).isNull(); + } + + @Test void sendAsync() throws Exception { + String topic = TOPIC_PREFIX + "testSendAsync"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + DefaultMQProducer producer = new DefaultMQProducer("testSendAsync"); + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(rocketMQ.getNamesrvAddr()); + producer.start(); + CountDownLatch latch = new CountDownLatch(1); + producer.send(message, new SendCallback() { + @Override public void onSuccess(SendResult sendResult) { + latch.countDown(); + } + + @Override public void onException(Throwable e) { + + } + }); + + assertThat(latch.await(3000, TimeUnit.MILLISECONDS)).isTrue(); + producer.shutdown(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + assertThat(producerSpan.parentId()).isNull(); + } + + @Test void tracingMessageListenerConcurrently() throws Exception { + String topic = TOPIC_PREFIX + "tracingMessageListenerConcurrently"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + String nameserverAddr = rocketMQ.getNamesrvAddr(); + DefaultMQProducer producer = new DefaultMQProducer("tracingMessageListenerConcurrently"); + producer.setNamesrvAddr(nameserverAddr); + producer.start(); + + DefaultMQPushConsumer consumer = + new DefaultMQPushConsumer("tracingMessageListenerConcurrently"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + consumer.registerMessageListener(new TracingMessageListenerConcurrently(0, consumerTracing) { + @Override + protected ConsumeConcurrentlyStatus handleMessage(MessageExt messageExt, + ConsumeConcurrentlyContext context) { + Span span = + Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); + reference.set(span); + latch.countDown(); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + producer.send(message); + consumer.start(); + + boolean flag = latch.await(3000, TimeUnit.MILLISECONDS); + + producer.shutdown(); + consumer.shutdown(); + + assertThat(flag).isTrue(); + assertThat(reference.get()).isNotNull(); + + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); + assertThat(consumerSpan.parentId()).isNull(); + } + + @Test void tracingMessageListenerOrderly() throws Exception { + String topic = TOPIC_PREFIX + "tracingMessageListenerOrderly"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + String nameserverAddr = rocketMQ.getNamesrvAddr(); + DefaultMQProducer producer = new DefaultMQProducer("tracingMessageListenerOrderly"); + producer.setNamesrvAddr(nameserverAddr); + producer.start(); + + DefaultMQPushConsumer consumer = + new DefaultMQPushConsumer("tracingMessageListenerOrderly"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + consumer.registerMessageListener(new TracingMessageListenerOrderly(0, consumerTracing) { + @Override + protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt, + ConsumeOrderlyContext context) { + Span span = + Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); + reference.set(span); + latch.countDown(); + return ConsumeOrderlyStatus.SUCCESS; + } + }); + producer.send(message); + consumer.start(); + + boolean flag = latch.await(3000, TimeUnit.MILLISECONDS); + + producer.shutdown(); + consumer.shutdown(); + + assertThat(flag).isTrue(); + assertThat(reference.get()).isNotNull(); + + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); + assertThat(consumerSpan.parentId()).isNull(); + } + + @Test void all() throws Exception { + String topic = TOPIC_PREFIX + "testAll"; + Message message = new Message(topic, "JoeKerouac", "hello".getBytes()); + String nameserverAddr = rocketMQ.getNamesrvAddr(); + DefaultMQProducer producer = new DefaultMQProducer("testAll"); + producer.getDefaultMQProducerImpl() + .registerSendMessageHook(new TracingSendMessageHook(producerTracing)); + producer.setNamesrvAddr(nameserverAddr); + producer.start(); + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testAll"); + consumer.setNamesrvAddr(nameserverAddr); + consumer.subscribe(topic, "*"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + consumer.registerMessageListener(new TracingMessageListenerOrderly(0, consumerTracing) { + @Override + protected ConsumeOrderlyStatus handleMessage(MessageExt messageExt, + ConsumeOrderlyContext context) { + Span span = + Optional.ofNullable(Tracing.currentTracer()).map(Tracer::currentSpan).orElse(null); + reference.set(span); + latch.countDown(); + return ConsumeOrderlyStatus.SUCCESS; + } + }); + + producer.send(message); + consumer.start(); + + boolean flag = latch.await(3000, TimeUnit.MILLISECONDS); + + producer.shutdown(); + consumer.shutdown(); + + assertThat(flag).isTrue(); + assertThat(reference.get()).isNotNull(); + + MutableSpan producerSpan = producerSpanHandler.takeRemoteSpan(Span.Kind.PRODUCER); + assertThat(producerSpan.parentId()).isNull(); + MutableSpan consumerSpan = consumerSpanHandler.takeRemoteSpan(Span.Kind.CONSUMER); + assertThat(consumerSpan.parentId()).isNotNull(); + assertChildOf(consumerSpan, producerSpan); + } +} diff --git a/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java new file mode 100644 index 0000000000..6fb01079d2 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/java/brave/rocketmq/client/RocketMQContainer.java @@ -0,0 +1,66 @@ +/* + * Copyright 2013-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package brave.rocketmq.client; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +final class RocketMQContainer extends GenericContainer { + static final int NAMESERVER_PORT = 9876; + static final int BROKER_PORT = 10911; + + RocketMQContainer() { + super(DockerImageName.parse("apache/rocketmq:5.1.4")); + List portBindings = new ArrayList<>(); + portBindings.add(String.format("%d:%d", NAMESERVER_PORT, NAMESERVER_PORT)); + portBindings.add(String.format("%d:%d", BROKER_PORT, BROKER_PORT)); + setPortBindings(portBindings); + + // do not publish all ports + withCreateContainerCmdModifier(cmd -> { + if (cmd.getHostConfig() != null) { + cmd.getHostConfig().withPublishAllPorts(false); + } + }); + + setCommand("sh /start.sh"); + this.waitStrategy = + Wait.forLogMessage(".*boot success.*", 1).withStartupTimeout(Duration.ofSeconds(60)); + + mount("broker.conf"); + mount("start.sh"); + } + + private void mount(String fileName) { + URL confUrl = getClass().getClassLoader().getResource(fileName); + try { + withFileSystemBind(new File(confUrl.toURI()).getAbsolutePath(), "/" + fileName, + BindMode.READ_ONLY); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + public String getNamesrvAddr() { + return getHost() + ":" + NAMESERVER_PORT; + } +} diff --git a/instrumentation/rocketmq-client/src/test/resources/broker.conf b/instrumentation/rocketmq-client/src/test/resources/broker.conf new file mode 100644 index 0000000000..4948380251 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/resources/broker.conf @@ -0,0 +1,3 @@ +brokerName=JoeKerouac-Test +brokerIP1=127.0.0.1 +autoCreateTopicEnable=true diff --git a/instrumentation/rocketmq-client/src/test/resources/log4j2.properties b/instrumentation/rocketmq-client/src/test/resources/log4j2.properties new file mode 100644 index 0000000000..e6988c4497 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/resources/log4j2.properties @@ -0,0 +1,9 @@ +appenders=console +appender.console.type=Console +appender.console.name=STDOUT +appender.console.layout.type=PatternLayout +appender.console.layout.pattern=%d{ABSOLUTE} %-5p [%t] %C{2} (%F:%L) - %m%n +rootLogger.level=warn +rootLogger.appenderRefs=stdout +rootLogger.appenderRef.stdout.ref=STDOUT + diff --git a/instrumentation/rocketmq-client/src/test/resources/start.sh b/instrumentation/rocketmq-client/src/test/resources/start.sh new file mode 100644 index 0000000000..0a0dba7b76 --- /dev/null +++ b/instrumentation/rocketmq-client/src/test/resources/start.sh @@ -0,0 +1,30 @@ +function check() { + for i in {1..10} + do + if grep -q "$1" $2; then + break + else + sleep 1 + fi + done +} + +cleanup() { + jps | grep -v Jps | awk '{print $1}' | xargs -I {} kill {} + exit 0 +} + +trap cleanup SIGINT SIGTERM + +sh /home/rocketmq/rocketmq-4.6.0/bin/mqnamesrv > ~/ns.log & +check "The Name Server boot success" ~/ns.log + +sh /home/rocketmq/rocketmq-4.6.0/bin/mqbroker -n 127.0.0.1:9876 -c /broker.conf > ~/broker.log & +check "boot success" ~/broker.log + +echo "boot success" + +while true +do + sleep 1 +done