From 86a95dd776d2bd0e3a0449e39446750adda6607b Mon Sep 17 00:00:00 2001 From: Stephen Riesenberg Date: Wed, 29 Jan 2020 22:00:33 -0600 Subject: [PATCH] Added ZmqTemplate and other enhancements --- README.md | 2 +- VERSIONS.md | 13 ++++ build.gradle.kts | 9 ++- .../io/insource/framework/zeromq/Channel.kt | 10 ++- .../framework/zeromq/ChannelFactory.kt | 5 +- .../zeromq/DefaultMessageConverter.kt | 2 +- .../framework/zeromq/MessageConverter.kt | 3 +- .../zeromq/SimpleMessageConverter.kt | 56 +++++++++++++++++ .../insource/framework/zeromq/ZmqTemplate.kt | 61 ++++++++++++++++++ .../framework/zeromq/TopicListenerTest.kt | 8 ++- .../framework/zeromq/ZmqTemplateTest.kt | 62 +++++++++++++++++++ src/test/resources/application.yml | 3 +- 12 files changed, 219 insertions(+), 15 deletions(-) create mode 100644 VERSIONS.md create mode 100644 src/main/kotlin/io/insource/framework/zeromq/SimpleMessageConverter.kt create mode 100644 src/main/kotlin/io/insource/framework/zeromq/ZmqTemplate.kt create mode 100644 src/test/kotlin/io/insource/framework/zeromq/ZmqTemplateTest.kt diff --git a/README.md b/README.md index a337707..aa42fb0 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ Spring Boot enabled ZeromMQ integrations inspired by spring-rabbit and built on ## Getting Started -This project is built on Spring Boot. Add this project as dependency to your Spring Boot application to get started. +This project is built on Spring Boot. Add this project as a dependency to your Spring Boot application to get started. Maven: diff --git a/VERSIONS.md b/VERSIONS.md new file mode 100644 index 0000000..d087eed --- /dev/null +++ b/VERSIONS.md @@ -0,0 +1,13 @@ +# Release Notes + +## 0.0.1 + +* Initial release + +## 0.0.2 + +* Added `headers` parameter to `MessageConverter.toMessage()` +* Added `SimpleMessageConverter` class supporting `String` and `ByteArray` +* Added `ZmqTemplate` class with basic `send()` operations +* Deprecated `ChannelFactory` which is superceded by `ZmqTemplate` +* `Channel` now implements `Closeable` \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 2e4b891..5222925 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -77,14 +77,13 @@ dependencies { val springBootVersion = Versions.springBoot val jzmqApiVersion = Versions.jzmqApi - implementation(dependencies.platform("org.springframework.boot:spring-boot-parent:$springBootVersion")) implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") - implementation("org.springframework.boot:spring-boot-autoconfigure") - implementation("org.springframework:spring-context") + implementation("org.springframework.boot:spring-boot-autoconfigure:$springBootVersion") + implementation("org.springframework:spring-context:$springBootVersion") implementation("org.zeromq:jzmq-api:$jzmqApiVersion") - compileOnly("org.springframework.boot:spring-boot-configuration-processor") - testImplementation("org.springframework.boot:spring-boot-starter-test") + compileOnly("org.springframework.boot:spring-boot-configuration-processor:$springBootVersion") + testImplementation("org.springframework.boot:spring-boot-starter-test:$springBootVersion") } tasks.withType { diff --git a/src/main/kotlin/io/insource/framework/zeromq/Channel.kt b/src/main/kotlin/io/insource/framework/zeromq/Channel.kt index a26d887..efba0fc 100644 --- a/src/main/kotlin/io/insource/framework/zeromq/Channel.kt +++ b/src/main/kotlin/io/insource/framework/zeromq/Channel.kt @@ -2,6 +2,7 @@ package io.insource.framework.zeromq import org.zeromq.api.Message import org.zeromq.api.Socket +import java.io.Closeable /** * Message channel for sending messages to a topic using a routing key. @@ -9,7 +10,7 @@ import org.zeromq.api.Socket class Channel internal constructor( private val socket: Socket, private val topic: String -) { +) : Closeable { /** * Send a message to this channel's topic using a routing key. * @@ -30,4 +31,11 @@ class Channel internal constructor( .addFrames(message) socket.send(frames) } + + /** + * Close the underlying socket manually. Note: This step is optional. + */ + override fun close() { + socket.close() + } } \ No newline at end of file diff --git a/src/main/kotlin/io/insource/framework/zeromq/ChannelFactory.kt b/src/main/kotlin/io/insource/framework/zeromq/ChannelFactory.kt index f73a4c9..2318a23 100644 --- a/src/main/kotlin/io/insource/framework/zeromq/ChannelFactory.kt +++ b/src/main/kotlin/io/insource/framework/zeromq/ChannelFactory.kt @@ -7,12 +7,13 @@ import org.zeromq.api.SocketType /** * Factory for creating channels using socket-per-thread semantics. */ +@Deprecated("Use ZmqTemplate instead.") class ChannelFactory private constructor( private val context: Context, private val topic: String ) { /** ThreadLocal to track objects per thread. */ - private val threadLocal = ThreadLocal.withInitial { + private val channels = ThreadLocal.withInitial { Channel(context.buildSocket(SocketType.PUSH).connect("inproc://$topic"), topic) } @@ -22,7 +23,7 @@ class ChannelFactory private constructor( * @return A thread-local instance of a channel */ fun channel(): Channel { - return threadLocal.get() + return channels.get() } companion object { diff --git a/src/main/kotlin/io/insource/framework/zeromq/DefaultMessageConverter.kt b/src/main/kotlin/io/insource/framework/zeromq/DefaultMessageConverter.kt index 1b35f7d..efc2cb1 100644 --- a/src/main/kotlin/io/insource/framework/zeromq/DefaultMessageConverter.kt +++ b/src/main/kotlin/io/insource/framework/zeromq/DefaultMessageConverter.kt @@ -11,7 +11,7 @@ import org.zeromq.api.Message @Component @ConditionalOnMissingBean(MessageConverter::class) class DefaultMessageConverter : MessageConverter { - override fun toMessage(obj: Any): Message { + override fun toMessage(obj: Any, headers: Map): Message { return Message(obj.toString()) } diff --git a/src/main/kotlin/io/insource/framework/zeromq/MessageConverter.kt b/src/main/kotlin/io/insource/framework/zeromq/MessageConverter.kt index ec40d3a..a773091 100644 --- a/src/main/kotlin/io/insource/framework/zeromq/MessageConverter.kt +++ b/src/main/kotlin/io/insource/framework/zeromq/MessageConverter.kt @@ -10,9 +10,10 @@ interface MessageConverter { * Convert an object to a Message. * * @param obj The object to convert + * @param headers Message headers to go with the payload * @return The message */ - fun toMessage(obj: Any): Message + fun toMessage(obj: Any, headers: Map): Message /** * Convert from a message to an object. diff --git a/src/main/kotlin/io/insource/framework/zeromq/SimpleMessageConverter.kt b/src/main/kotlin/io/insource/framework/zeromq/SimpleMessageConverter.kt new file mode 100644 index 0000000..d175733 --- /dev/null +++ b/src/main/kotlin/io/insource/framework/zeromq/SimpleMessageConverter.kt @@ -0,0 +1,56 @@ +package io.insource.framework.zeromq + +import org.springframework.stereotype.Component +import org.zeromq.api.Message + +/** + * Simple message converter that simply checks the type of the provided object + * for conversion to a message using the `toMessage()` method, and relies on the + * the first frame containg headers with a `content-type` header to determine + * the behavior of the `fromMessage()` method. + */ +@Component +class SimpleMessageConverter : MessageConverter { + override fun toMessage(obj: Any, headers: Map): Message { + val contentType: String + val payloadFrame = when (obj) { + is ByteArray -> { + contentType = APPLICATION_OCTET_STREAM + Message.Frame.of(obj) + } + is String -> { + contentType = TEXT_PLAIN + Message.Frame.of(obj) + } + else -> throw IllegalArgumentException("${this.javaClass.simpleName} only supports ByteArray or String types in the payload, received ${obj.javaClass.simpleName}") + } + + val updatedHeaders = HashMap(headers) + updatedHeaders["content-type"] = contentType + updatedHeaders["content-length"] = payloadFrame.size().toString() + + return Message() + .addFrame(Message.Frame.of(updatedHeaders)) + .addFrame(payloadFrame) + } + + override fun fromMessage(message: Message): Any { + val headers = message.popMap() + return when (val contentType = headers["content-type"] ?: APPLICATION_OCTET_STREAM) { + TEXT, + TEXT_PLAIN, + APPLICATION_JSON, + APPLICATION_XML -> return message.popString() + APPLICATION_OCTET_STREAM -> message.popBytes() + else -> throw IllegalArgumentException("Unsupported content-type - $contentType") + } + } + + companion object { + private val TEXT = "text" + private val TEXT_PLAIN = "text/plain" + private val APPLICATION_JSON = "application/json" + private val APPLICATION_XML = "application/xml" + private val APPLICATION_OCTET_STREAM = "application/octet-stream" + } +} \ No newline at end of file diff --git a/src/main/kotlin/io/insource/framework/zeromq/ZmqTemplate.kt b/src/main/kotlin/io/insource/framework/zeromq/ZmqTemplate.kt new file mode 100644 index 0000000..55cbe71 --- /dev/null +++ b/src/main/kotlin/io/insource/framework/zeromq/ZmqTemplate.kt @@ -0,0 +1,61 @@ +package io.insource.framework.zeromq + +import org.zeromq.ContextFactory +import org.zeromq.api.Context +import org.zeromq.api.SocketType + +/** + * Helper class that simplifies synchronous ZeroMQ access (sending and receiving messages) + * using socket-per-thread semantics. + * + * @param context The 0MQ context + */ +class ZmqTemplate(private val context: Context) { + /** Default constructor. */ + constructor() : this(ContextFactory.context()) + + /** Default topic name. Cannot be changed once threads have begun sending messages. */ + var topic: String = "default" + + /** Default routing key, used when the routing key is not specified. */ + var routingKey: String = "" + + /** Message converter used to convert a payload to/from a 0MQ `Message`. */ + var messageConverter: MessageConverter = SimpleMessageConverter() + + /** ThreadLocal for creating channels using socket-per-thread semantics. */ + private val channels: ThreadLocal = ThreadLocal.withInitial { + Channel(context.buildSocket(SocketType.PUSH).connect("inproc://$topic"), topic) + } + + /** + * Send a message with no headers to the default topic using the default + * routing key. + * + * @param obj The message payload + */ + fun send(obj: Any) { + send(routingKey, obj) + } + + /** + * Send a message with no headers to the default topic. + * + * @param obj The message payload + */ + fun send(routingKey: String, obj: Any) { + send(routingKey, obj, mapOf()) + } + + /** + * Send a message to the default topic. + * + * @param obj The message payload + * @param headers Message headers to go with the payload + */ + fun send(routingKey: String, obj: Any, headers: Map) { + val channel = channels.get() + val message = messageConverter.toMessage(obj, headers) + channel.send(routingKey, message) + } +} \ No newline at end of file diff --git a/src/test/kotlin/io/insource/framework/zeromq/TopicListenerTest.kt b/src/test/kotlin/io/insource/framework/zeromq/TopicListenerTest.kt index df99f16..bfa2822 100644 --- a/src/test/kotlin/io/insource/framework/zeromq/TopicListenerTest.kt +++ b/src/test/kotlin/io/insource/framework/zeromq/TopicListenerTest.kt @@ -4,6 +4,7 @@ import io.insource.framework.annotation.QueueBinding import io.insource.framework.annotation.ZmqHandler import io.insource.framework.annotation.ZmqSubscriber import org.hamcrest.Matchers.hasItem +import org.hamcrest.Matchers.hasSize import org.junit.Assert.assertThat import org.junit.Before import org.junit.Test @@ -17,8 +18,8 @@ import java.util.concurrent.CountDownLatch @SpringBootTest(classes = [ZeromqTestConfiguration::class]) class TopicListenerTest { @ZmqSubscriber( - QueueBinding(topic = "test", key = "Greeting", queue = "greetings"), - QueueBinding(topic = "test", key = "Message", queue = "messages") + QueueBinding(topic = "TopicListenerTest", key = "Greeting", queue = "greetings"), + QueueBinding(topic = "TopicListenerTest", key = "Message", queue = "messages") ) class Subscriber { @ZmqHandler @@ -36,12 +37,13 @@ class TopicListenerTest { @Test fun testHello() { - val channelFactory = ChannelFactory.create("test") + val channelFactory = ChannelFactory.create("TopicListenerTest") val channel = channelFactory.channel() channel.send("Greeting", Message("Hello, World")) channel.send("Message", Message("This is a message.")) countDownLatch.await() + assertThat(messages, hasSize(2)) assertThat(messages, hasItem("Hello, World")) assertThat(messages, hasItem("This is a message.")) } diff --git a/src/test/kotlin/io/insource/framework/zeromq/ZmqTemplateTest.kt b/src/test/kotlin/io/insource/framework/zeromq/ZmqTemplateTest.kt new file mode 100644 index 0000000..2de124a --- /dev/null +++ b/src/test/kotlin/io/insource/framework/zeromq/ZmqTemplateTest.kt @@ -0,0 +1,62 @@ +package io.insource.framework.zeromq + +import io.insource.framework.annotation.QueueBinding +import io.insource.framework.annotation.ZmqHandler +import io.insource.framework.annotation.ZmqSubscriber +import org.hamcrest.Matchers.hasItem +import org.hamcrest.Matchers.hasSize +import org.junit.Assert.assertThat +import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.junit4.SpringRunner +import java.util.concurrent.CountDownLatch + +@RunWith(SpringRunner::class) +@SpringBootTest(classes = [ZeromqTestConfiguration::class]) +class ZmqTemplateTest { + @ZmqSubscriber( + QueueBinding(topic = "ZmqTemplateTest", key = "Greeting", queue = "greetings"), + QueueBinding(topic = "ZmqTemplateTest", key = "Message", queue = "messages") + ) + class Subscriber { + @ZmqHandler + fun onMessage(m: String) { + messages += m + countDownLatch.countDown() + } + } + + @Before + fun setUp() { + // Allow a little time for PUB and SUB to connect to each other + Thread.sleep(50) + } + + @Test + fun testHello() { + val zmqTemplate = ZmqTemplate().apply { + topic = "ZmqTemplateTest" + routingKey = "Message" + messageConverter = DefaultMessageConverter() + } + + // Received using default routing key of Message + zmqTemplate.send("This is a message.") + // Received using explicit routing key of Greeting + zmqTemplate.send("Greeting", "Hello, World") + // Not received using bunk routing key + zmqTemplate.send("Nothing", "This message is not received.") + + countDownLatch.await() + assertThat(messages, hasSize(2)) + assertThat(messages, hasItem("Hello, World")) + assertThat(messages, hasItem("This is a message.")) + } + + companion object { + val messages = mutableListOf() + val countDownLatch = CountDownLatch(2) + } +} \ No newline at end of file diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index eeea717..6537ec0 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -1,7 +1,8 @@ zmq: publisher: topics: - - test + - TopicListenerTest + - ZmqTemplateTest port: 52345 subscriber: port: 52345