Skip to content

Commit

Permalink
Added ZmqTemplate and other enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
sjohnr committed Jan 30, 2020
1 parent 3214d19 commit 86a95dd
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Spring Boot enabled ZeromMQ integrations inspired by spring-rabbit and built on

## Getting Started

This project is built on Spring Boot. Add this project as dependency to your Spring Boot application to get started.
This project is built on Spring Boot. Add this project as a dependency to your Spring Boot application to get started.

Maven:

Expand Down
13 changes: 13 additions & 0 deletions VERSIONS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Release Notes

## 0.0.1

* Initial release

## 0.0.2

* Added `headers` parameter to `MessageConverter.toMessage()`
* Added `SimpleMessageConverter` class supporting `String` and `ByteArray`
* Added `ZmqTemplate` class with basic `send()` operations
* Deprecated `ChannelFactory` which is superceded by `ZmqTemplate`
* `Channel` now implements `Closeable`
9 changes: 4 additions & 5 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,13 @@ dependencies {
val springBootVersion = Versions.springBoot
val jzmqApiVersion = Versions.jzmqApi

implementation(dependencies.platform("org.springframework.boot:spring-boot-parent:$springBootVersion"))
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.boot:spring-boot-autoconfigure")
implementation("org.springframework:spring-context")
implementation("org.springframework.boot:spring-boot-autoconfigure:$springBootVersion")
implementation("org.springframework:spring-context:$springBootVersion")
implementation("org.zeromq:jzmq-api:$jzmqApiVersion")
compileOnly("org.springframework.boot:spring-boot-configuration-processor")
testImplementation("org.springframework.boot:spring-boot-starter-test")
compileOnly("org.springframework.boot:spring-boot-configuration-processor:$springBootVersion")
testImplementation("org.springframework.boot:spring-boot-starter-test:$springBootVersion")
}

tasks.withType<KotlinCompile> {
Expand Down
10 changes: 9 additions & 1 deletion src/main/kotlin/io/insource/framework/zeromq/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package io.insource.framework.zeromq

import org.zeromq.api.Message
import org.zeromq.api.Socket
import java.io.Closeable

/**
* Message channel for sending messages to a topic using a routing key.
*/
class Channel internal constructor(
private val socket: Socket,
private val topic: String
) {
) : Closeable {
/**
* Send a message to this channel's topic using a routing key.
*
Expand All @@ -30,4 +31,11 @@ class Channel internal constructor(
.addFrames(message)
socket.send(frames)
}

/**
* Close the underlying socket manually. Note: This step is optional.
*/
override fun close() {
socket.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import org.zeromq.api.SocketType
/**
* Factory for creating channels using socket-per-thread semantics.
*/
@Deprecated("Use ZmqTemplate instead.")
class ChannelFactory private constructor(
private val context: Context,
private val topic: String
) {
/** ThreadLocal to track objects per thread. */
private val threadLocal = ThreadLocal.withInitial {
private val channels = ThreadLocal.withInitial {
Channel(context.buildSocket(SocketType.PUSH).connect("inproc://$topic"), topic)
}

Expand All @@ -22,7 +23,7 @@ class ChannelFactory private constructor(
* @return A thread-local instance of a channel
*/
fun channel(): Channel {
return threadLocal.get()
return channels.get()
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.zeromq.api.Message
@Component
@ConditionalOnMissingBean(MessageConverter::class)
class DefaultMessageConverter : MessageConverter {
override fun toMessage(obj: Any): Message {
override fun toMessage(obj: Any, headers: Map<String, String>): Message {
return Message(obj.toString())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ interface MessageConverter {
* Convert an object to a Message.
*
* @param obj The object to convert
* @param headers Message headers to go with the payload
* @return The message
*/
fun toMessage(obj: Any): Message
fun toMessage(obj: Any, headers: Map<String, String>): Message

/**
* Convert from a message to an object.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.insource.framework.zeromq

import org.springframework.stereotype.Component
import org.zeromq.api.Message

/**
* Simple message converter that simply checks the type of the provided object
* for conversion to a message using the `toMessage()` method, and relies on the
* the first frame containg headers with a `content-type` header to determine
* the behavior of the `fromMessage()` method.
*/
@Component
class SimpleMessageConverter : MessageConverter {
override fun toMessage(obj: Any, headers: Map<String, String>): Message {
val contentType: String
val payloadFrame = when (obj) {
is ByteArray -> {
contentType = APPLICATION_OCTET_STREAM
Message.Frame.of(obj)
}
is String -> {
contentType = TEXT_PLAIN
Message.Frame.of(obj)
}
else -> throw IllegalArgumentException("${this.javaClass.simpleName} only supports ByteArray or String types in the payload, received ${obj.javaClass.simpleName}")
}

val updatedHeaders = HashMap(headers)
updatedHeaders["content-type"] = contentType
updatedHeaders["content-length"] = payloadFrame.size().toString()

return Message()
.addFrame(Message.Frame.of(updatedHeaders))
.addFrame(payloadFrame)
}

override fun fromMessage(message: Message): Any {
val headers = message.popMap()
return when (val contentType = headers["content-type"] ?: APPLICATION_OCTET_STREAM) {
TEXT,
TEXT_PLAIN,
APPLICATION_JSON,
APPLICATION_XML -> return message.popString()
APPLICATION_OCTET_STREAM -> message.popBytes()
else -> throw IllegalArgumentException("Unsupported content-type - $contentType")
}
}

companion object {
private val TEXT = "text"
private val TEXT_PLAIN = "text/plain"
private val APPLICATION_JSON = "application/json"
private val APPLICATION_XML = "application/xml"
private val APPLICATION_OCTET_STREAM = "application/octet-stream"
}
}
61 changes: 61 additions & 0 deletions src/main/kotlin/io/insource/framework/zeromq/ZmqTemplate.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.insource.framework.zeromq

import org.zeromq.ContextFactory
import org.zeromq.api.Context
import org.zeromq.api.SocketType

/**
* Helper class that simplifies synchronous ZeroMQ access (sending and receiving messages)
* using socket-per-thread semantics.
*
* @param context The 0MQ context
*/
class ZmqTemplate(private val context: Context) {
/** Default constructor. */
constructor() : this(ContextFactory.context())

/** Default topic name. Cannot be changed once threads have begun sending messages. */
var topic: String = "default"

/** Default routing key, used when the routing key is not specified. */
var routingKey: String = ""

/** Message converter used to convert a payload to/from a 0MQ `Message`. */
var messageConverter: MessageConverter = SimpleMessageConverter()

/** ThreadLocal for creating channels using socket-per-thread semantics. */
private val channels: ThreadLocal<Channel> = ThreadLocal.withInitial {
Channel(context.buildSocket(SocketType.PUSH).connect("inproc://$topic"), topic)
}

/**
* Send a message with no headers to the default topic using the default
* routing key.
*
* @param obj The message payload
*/
fun send(obj: Any) {
send(routingKey, obj)
}

/**
* Send a message with no headers to the default topic.
*
* @param obj The message payload
*/
fun send(routingKey: String, obj: Any) {
send(routingKey, obj, mapOf())
}

/**
* Send a message to the default topic.
*
* @param obj The message payload
* @param headers Message headers to go with the payload
*/
fun send(routingKey: String, obj: Any, headers: Map<String, String>) {
val channel = channels.get()
val message = messageConverter.toMessage(obj, headers)
channel.send(routingKey, message)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.insource.framework.annotation.QueueBinding
import io.insource.framework.annotation.ZmqHandler
import io.insource.framework.annotation.ZmqSubscriber
import org.hamcrest.Matchers.hasItem
import org.hamcrest.Matchers.hasSize
import org.junit.Assert.assertThat
import org.junit.Before
import org.junit.Test
Expand All @@ -17,8 +18,8 @@ import java.util.concurrent.CountDownLatch
@SpringBootTest(classes = [ZeromqTestConfiguration::class])
class TopicListenerTest {
@ZmqSubscriber(
QueueBinding(topic = "test", key = "Greeting", queue = "greetings"),
QueueBinding(topic = "test", key = "Message", queue = "messages")
QueueBinding(topic = "TopicListenerTest", key = "Greeting", queue = "greetings"),
QueueBinding(topic = "TopicListenerTest", key = "Message", queue = "messages")
)
class Subscriber {
@ZmqHandler
Expand All @@ -36,12 +37,13 @@ class TopicListenerTest {

@Test
fun testHello() {
val channelFactory = ChannelFactory.create("test")
val channelFactory = ChannelFactory.create("TopicListenerTest")
val channel = channelFactory.channel()
channel.send("Greeting", Message("Hello, World"))
channel.send("Message", Message("This is a message."))

countDownLatch.await()
assertThat(messages, hasSize(2))
assertThat(messages, hasItem("Hello, World"))
assertThat(messages, hasItem("This is a message."))
}
Expand Down
62 changes: 62 additions & 0 deletions src/test/kotlin/io/insource/framework/zeromq/ZmqTemplateTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.insource.framework.zeromq

import io.insource.framework.annotation.QueueBinding
import io.insource.framework.annotation.ZmqHandler
import io.insource.framework.annotation.ZmqSubscriber
import org.hamcrest.Matchers.hasItem
import org.hamcrest.Matchers.hasSize
import org.junit.Assert.assertThat
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.junit4.SpringRunner
import java.util.concurrent.CountDownLatch

@RunWith(SpringRunner::class)
@SpringBootTest(classes = [ZeromqTestConfiguration::class])
class ZmqTemplateTest {
@ZmqSubscriber(
QueueBinding(topic = "ZmqTemplateTest", key = "Greeting", queue = "greetings"),
QueueBinding(topic = "ZmqTemplateTest", key = "Message", queue = "messages")
)
class Subscriber {
@ZmqHandler
fun onMessage(m: String) {
messages += m
countDownLatch.countDown()
}
}

@Before
fun setUp() {
// Allow a little time for PUB and SUB to connect to each other
Thread.sleep(50)
}

@Test
fun testHello() {
val zmqTemplate = ZmqTemplate().apply {
topic = "ZmqTemplateTest"
routingKey = "Message"
messageConverter = DefaultMessageConverter()
}

// Received using default routing key of Message
zmqTemplate.send("This is a message.")
// Received using explicit routing key of Greeting
zmqTemplate.send("Greeting", "Hello, World")
// Not received using bunk routing key
zmqTemplate.send("Nothing", "This message is not received.")

countDownLatch.await()
assertThat(messages, hasSize(2))
assertThat(messages, hasItem("Hello, World"))
assertThat(messages, hasItem("This is a message."))
}

companion object {
val messages = mutableListOf<String>()
val countDownLatch = CountDownLatch(2)
}
}
3 changes: 2 additions & 1 deletion src/test/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
zmq:
publisher:
topics:
- test
- TopicListenerTest
- ZmqTemplateTest
port: 52345
subscriber:
port: 52345
Expand Down

0 comments on commit 86a95dd

Please sign in to comment.