Skip to content

Commit

Permalink
✨ 添加 mica-nats 模块,待完善
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xunhuan committed Aug 17, 2023
1 parent efd1f17 commit c11693a
Show file tree
Hide file tree
Showing 12 changed files with 353 additions and 0 deletions.
4 changes: 4 additions & 0 deletions mica-nats/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## mica-nats(高性能、轻量级消息队列)

nats 封装,更改方便 Spring boot 下使用。

7 changes: 7 additions & 0 deletions mica-nats/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
dependencies {
api "io.nats:jnats:2.16.13"
implementation "org.springframework.boot:spring-boot-starter"
compileOnly "org.springframework.cloud:spring-cloud-context"
compileOnly "net.dreamlu:mica-auto:${micaAutoVersion}"
annotationProcessor "net.dreamlu:mica-auto:${micaAutoVersion}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.mica.nats.annotation;

import java.lang.annotation.*;

/**
* nats 监听器
*
* @author L.cm
*/
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NatsListener {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.mica.nats.annotation;

import java.lang.annotation.*;

/**
* nats stream 监听器注解
*
* @author L.cm
*/
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NatsStreamListener {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.mica.nats.config;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**
* nats 配置
*
* @author L.cm
*/
@AutoConfiguration
@EnableConfigurationProperties
public class NatsConfiguration {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.mica.nats.config;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;

/**
* nats 配置
*
* @author L.cm
*/
@Getter
@Setter
@RefreshScope
@ConfigurationProperties(NatsProperties.PREFIX)
public class NatsProperties {
public static final String PREFIX = "nats";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.mica.nats.config;

import org.springframework.boot.autoconfigure.AutoConfiguration;

/**
* nats stream 配置
*
* @author L.cm
*/
@AutoConfiguration
public class NatsStreamConfiguration {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.mica.nats.core;

/**
* nats stream Template
*
* @author L.cm
*/
public interface NatsStreamTemplate {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2019-2029, Dreamlu 卢春梦 ([email protected] & www.dreamlu.net).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.dreamlu.mica.nats.core;

/**
* nats Template
*
* @author L.cm
*/
public interface NatsTemplate {
}
36 changes: 36 additions & 0 deletions mica-nats/src/test/java/net/dreamlu/mica/nats/NatsExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package net.dreamlu.mica.nats;

import io.nats.client.*;

import java.util.Timer;
import java.util.TimerTask;

/**
* nats 测试
*/
public class NatsExample {

public static void main(String[] args) {
Options options = new Options.Builder()
.server(Options.DEFAULT_URL)
.build();
try {
Connection nc = Nats.connect(options);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
nc.publish("subject", "Hello, NATS!".getBytes());
}
}, 3000, 3000);
Dispatcher dispatcher = nc.createDispatcher(msg -> {
byte[] bytes = msg.getData();
System.out.println(new String(bytes));
});
dispatcher.subscribe("subject");
Thread.sleep(10000L);
} catch (Exception e) {
e.printStackTrace();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package net.dreamlu.mica.nats;

import io.nats.client.*;
import io.nats.client.api.PublishAck;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.JsonUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class PubWildcardSubWildcard {

private static final String defaultStream = "pubsubwildcardasync-stream";
private static final String defaultSubjectWildcard = "audit.us.*";
private static final String defaultSubjectSpecific = "audit.us.east";
private static final String defaultMessage = "Audit User";
private static final int defaultMessageCount = 2;
private static final String defaultServer = "nats://localhost:4222";

public static void main(String[] args) {
System.out.printf("\nPublishing to %s. Server is %s\n\n", defaultSubjectWildcard, defaultServer);
try (Connection nc = Nats.connect(defaultServer)) {
JetStreamManagement jsm = nc.jetStreamManagement();

StreamConfiguration sc = StreamConfiguration.builder()
.name(defaultStream)
.storageType(StorageType.Memory)
.subjects(defaultSubjectWildcard)
.build();

StreamInfo streamInfo = jsm.addStream(sc);
JsonUtils.printFormatted(streamInfo);

JetStream js = nc.jetStream();

List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
int stop = defaultMessageCount + 1;
for (int x = 1; x < stop; x++) {
String data = defaultMessage + "-" + x;

Message msg = NatsMessage.builder()
.subject(defaultSubjectSpecific)
.data(data, StandardCharsets.UTF_8)
.build();
System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubjectSpecific);

futures.add(js.publishAsync(msg));
}

while (futures.size() > 0) {
CompletableFuture<PublishAck> f = futures.remove(0);
if (f.isDone()) {
try {
PublishAck pa = f.get();
System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n",
defaultSubjectSpecific, pa.getStream(), pa.getSeqno());
} catch (ExecutionException ee) {
System.out.println("Publish Failed " + ee);
}
} else {
futures.add(f);
}
}

JetStreamSubscription sub = js.subscribe(defaultSubjectWildcard);
List<Message> messages = new ArrayList<>();
Message msg = sub.nextMessage(Duration.ofSeconds(1));
boolean first = true;
while (msg != null) {
if (first) {
first = false;
System.out.print("Read/Ack ->");
}
messages.add(msg);
if (msg.isJetStream()) {
msg.ack();
System.out.print(" " + new String(msg.getData()) + "\n");
} else if (msg.isStatusMessage()) {
System.out.print(" !" + msg.getStatus().getCode() + "!");
}
JsonUtils.printFormatted(msg.metaData());
msg = sub.nextMessage(Duration.ofSeconds(1));
}
nc.flush(Duration.ZERO);
} catch (Exception e) {
e.printStackTrace();
}
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ include "mica-jetcache"
include "mica-lite"
include "mica-activerecord"
include "mica-prometheus"
include 'mica-nats'

0 comments on commit c11693a

Please sign in to comment.