Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/2.7.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xunhuan committed Aug 22, 2023
2 parents 00ce9e7 + eaf9eab commit 95368dd
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 43 deletions.
11 changes: 11 additions & 0 deletions mica-nats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,14 @@

nats 封装,更改方便 Spring boot 下使用。

## nats Streaming 概念

在 NATS Streaming 中,`subject``queue``stream` 是三个相关但具有不同概念和用途的术语。

- **Subject(主题)**`subject` 是指发布者将消息发送到的特定主题名称,也可以认为是消息的目标地址。订阅者可以通过订阅与特定主题相关联的消息来接收该主题下的消息。

- **Queue(队列)**`queue` 是用于在多个订阅者之间进行负载均衡和竞争消费的机制。当多个订阅者订阅相同的主题时,消息将被发送到队列中,然后只有一个订阅者(随机选择)能够接收和处理该条消息。这种方式确保了消息被平均地分配给订阅者,避免重复处理。

- **Stream(消息流)**`stream` 是NATS Streaming中用于组织和存储消息的逻辑实体。它是由一个字符串标识符(也称为stream name)表示的,并用于发布和订阅相关的消息。一个消息流可以包含多个主题,而每个主题下可以有不同的订阅者。

总结来说,`subject` 是发布和订阅消息的目标名称,`queue` 是用于负载均衡和竞争消费的机制,而 `stream` 是用于组织和存储消息的逻辑实体。它们在NATS Streaming中各自扮演不同的角色和功能。
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package net.dreamlu.mica.nats.annotation;

import io.nats.client.Consumer;

import java.lang.annotation.*;

/**
Expand All @@ -29,27 +31,67 @@
@Retention(RetentionPolicy.RUNTIME)
public @interface NatsStreamListener {

/**
* 主题 subject
*
* @return subject
*/
String value();

/**
* 队列
* @return 队列名称
*/
String queue() default "";

/**
* 交付主题
*/
String deliverSubject() default "";

/**
* 交付组
*/
String deliverGroup() default "";
/**
* 主题 subject
*
* @return subject
*/
String value();

/**
* 队列
*
* @return 队列名称
*/
String queue() default "";

/**
* Stream(消息流)
*
* @return Stream name
*/
String stream() default "";

/**
* 自动 ack
*
* @return 是否自动 ack
*/
boolean autoAck() default false;

/**
* 是否按顺序消费
*
* @return 顺序消费
*/
boolean ordered() default false;

/**
* 交付主题
*
* @return 交付主题
*/
String deliverSubject() default "";

/**
* 交付组
*
* @return 交付组
*/
String deliverGroup() default "";

/**
* 设置非调度推送订阅在内部(挂起)消息队列中所能容纳的最大消息数量。
*
* @return 最大消息数量
*/
long pendingMessageLimit() default Consumer.DEFAULT_MAX_BYTES;

/**
* 设置非调度推送订阅在内部(挂起)消息队列中所能容纳的最大字节数。
*
* @return 最大字节数
*/
long pendingByteLimit() default Consumer.DEFAULT_MAX_BYTES;

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ public class NatsConfiguration {

@Bean
public Options natsOptions(NatsProperties properties,
ObjectProvider<ConnectionListener> connectionListenerObjectProvider) {
Options.Builder builder = new Options.Builder()
ObjectProvider<ConnectionListener> connectionListenerObjectProvider,
ObjectProvider<NatsCustomizer> natsCustomizerObjectProvider) {
final Options.Builder builder = new Options.Builder()
.server(properties.getServer())
.connectionName(properties.getConnectionName())
.maxReconnects(properties.getMaxReconnect())
Expand All @@ -80,13 +81,13 @@ public Options natsOptions(NatsProperties properties,
String username = properties.getUsername();
String password = properties.getPassword();
if (StringUtils.hasText(nKey)) {
builder = builder.authHandler(Nats.staticCredentials(null, nKey.toCharArray()));
builder.authHandler(Nats.staticCredentials(null, nKey.toCharArray()));
} else if (StringUtils.hasText(credentials)) {
builder = builder.authHandler(Nats.credentials(credentials));
builder.authHandler(Nats.credentials(credentials));
} else if (StringUtils.hasText(token)) {
builder = builder.token(token.toCharArray());
builder.token(token.toCharArray());
} else if (StringUtils.hasText(username) && StringUtils.hasText(password)) {
builder = builder.userInfo(username, password);
builder.userInfo(username, password);
}
// ssl 证书信息
String keyStorePath = properties.getKeyStorePath();
Expand All @@ -96,6 +97,8 @@ public Options natsOptions(NatsProperties properties,
}
// 设置 nats 连接监听器
connectionListenerObjectProvider.ifAvailable(builder::connectionListener);
// 用户自定义配置
natsCustomizerObjectProvider.orderedStream().forEach(natsOptionsCustomizer -> natsOptionsCustomizer.customize(builder));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.mica.nats.config;

import io.nats.client.Options;

/**
* nats Options Builder 自定义配置
*
* @author L.cm
*/
@FunctionalInterface
public interface NatsCustomizer {

/**
* Customize the Options Builder.
*
* @param builder the Options Builder to customize
*/
void customize(Options.Builder builder);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.mica.nats.config;

import io.nats.client.*;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.support.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.mica.nats.core.NatsStreamListenerDetector;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;

import java.io.IOException;

/**
* nats 配置
*
* @author L.cm
*/
@Slf4j
@AutoConfiguration(after = NatsConfiguration.class)
@ConditionalOnClass(Options.class)
public class NatsStreamConfiguration {

@Bean
public JetStream natsJetStream(Connection natsConnection,
NatsStreamProperties properties,
ObjectProvider<NatsStreamCustomizer> natsStreamCustomizerObjectProvider)
throws IOException, JetStreamApiException {
StreamConfiguration.Builder streamConfigurationBuilder = StreamConfiguration.builder()
.name(properties.getName())
.description(properties.getDescription())
.subjects(properties.getSubjects())
.retentionPolicy(properties.getRetentionPolicy())
.maxConsumers(properties.getMaxConsumers())
.maxMessages(properties.getMaxMsgs())
.maxMessagesPerSubject(properties.getMaxMsgsPerSubject())
.maxBytes(properties.getMaxBytes())
.maxAge(properties.getMaxAge())
.maxMsgSize(properties.getMaxMsgSize())
.storageType(properties.getStorageType())
.replicas(properties.getReplicas())
.noAck(properties.isNoAck())
.templateOwner(properties.getTemplateOwner())
.discardPolicy(properties.getDiscardPolicy())
.discardNewPerSubject(properties.isDiscardNewPerSubject())
.duplicateWindow(properties.getDuplicateWindow())
.allowRollup(properties.isAllowRollup())
.allowDirect(properties.isAllowDirect())
.denyDelete(properties.isDenyDelete())
.denyPurge(properties.isDenyPurge())
.metadata(properties.getMetadata());
// 是否已封存
if (properties.isSealed()) {
streamConfigurationBuilder.seal();
}
// 用户自定义配置
natsStreamCustomizerObjectProvider.orderedStream().forEach(natsOptionsCustomizer -> natsOptionsCustomizer.customize(streamConfigurationBuilder));
// stream 配置
StreamConfiguration streamConfiguration = streamConfigurationBuilder.build();
// stream 流管理器
JetStreamManagement jsm = natsConnection.jetStreamManagement();
StreamInfo streamInfo = jsm.addStream(streamConfiguration);
// 打印 stream 信息
log.info(JsonUtils.getFormatted(streamInfo));
return natsConnection.jetStream();
}

@Bean
public NatsStreamListenerDetector natsStreamListenerDetector(Connection natsConnection,
JetStream natsJetStream) {
return new NatsStreamListenerDetector(natsConnection, natsJetStream);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.mica.nats.config;

import io.nats.client.api.StreamConfiguration;

/**
* nats stream StreamConfiguration 自定义配置
*
* @author L.cm
*/
@FunctionalInterface
public interface NatsStreamCustomizer {

/**
* Customize the StreamConfiguration.
*
* @param builder the StreamConfiguration Builder configuration to customize
*/
void customize(StreamConfiguration.Builder builder);

}
Loading

0 comments on commit 95368dd

Please sign in to comment.