From 42188709ba190aae21609ebe906b21e7ed1b8aae Mon Sep 17 00:00:00 2001 From: Stephen Riesenberg Date: Wed, 29 Jan 2020 22:33:46 -0600 Subject: [PATCH] Removed DefaultMessageConverter and added docs to README --- README.md | 132 +++++++++++++++--- VERSIONS.md | 1 + .../zeromq/DefaultMessageConverter.kt | 21 --- .../zeromq/SimpleMessageConverter.kt | 2 + .../insource/framework/zeromq/ZmqTemplate.kt | 5 +- 5 files changed, 117 insertions(+), 44 deletions(-) delete mode 100644 src/main/kotlin/io/insource/framework/zeromq/DefaultMessageConverter.kt diff --git a/README.md b/README.md index aa42fb0..7915dfe 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Maven: io.insource in-spring-zeromq - 0.0.1 + 0.0.2 ``` @@ -35,19 +35,49 @@ public class MyApplication { } ``` -To publish messages, use a `ChannelFactory` to create `ThreadLocal` instances of a `Channel`. For example: +To publish messages, use a `ZmqTemplate`. For example: ```java -ChannelFactory channelFactory = ChannelFactory.create("events"); // Use topic "events" -Channel channel = channelFactory.channel(); // Get a thread-safe channel -channel.send("MyEvent", new Message("my event")); // Send a message using routing key "MyEvent" +ZmqTemplate zmqTemplate = new ZmqTemplate(); +zmqTemplate.setTopic("events"); // Use topic "events" +zmqTemplate.setRoutingKey("MyEvent"); // Use routing key "MyEvent" by default +zmqTemplate.setMessageConverter(new VerySimpleMessageConverter()); // Use custom MessageConverter + +// Send using defaults +zmqTemplate.send("This is a test."); +``` + +This publishes a message with the string `This is a test.` to the `events` topic using a routing key of `MyEvent`. Other flavors of `send()` look like this: + +```java +// Send using custom routing key +zmqTemplate.send("MyRoutingKey", "This is another test."); + +// Send using custom routing key and headers +Map headers = new HashMap<>(); +headers.put("my-header", "TEST"); +zmqTemplate.send("MyRoutingKey", headers, "This is yet another test."); ``` -This publishes a message with the string `my event` to the `events` topic using a routing key of `MyEvent`. +Serialization to and from bytes is done using a `MessageConverter`. Here is the definition of a very simple `MessageConverter`: + +```java +public class VerySimpleMessageConverter implements MessageConverter { + @Override + public Message toMessage(Object obj, Map headers) { + return new Message(obj.toString()); + } + + @Override + public Object fromMessage(Message message) { + return message.popString(); + } +} +``` -**Note:** Serialization on the publishing side will be added in a future release. Currently, you must use the `org.zeromq.api.Message` class to send messages. +**Note:** This example does not consider headers. The bundled class `SimpleMessageConverter` does consider headers, which changes the data format over the wire considerably. Both sides must use the same `MessageConverter` or risk incompatible (de)serialization. -This message, and all messages sent on any thread within this JVM will be sent to a local proxy to be forwarded to a remote host. To configure the proxy to forward messages for this topic, use the following configuration: +Messages sent on any thread within the JVM will be sent to a local proxy to be forwarded to a remote host. To configure the proxy to forward messages for this topic, use the following configuration: application.yml: @@ -111,19 +141,7 @@ import org.zeromq.api.Message; public class MyConfiguration { @Bean public MessageConverter messageConverter() { - return new MyMessageConverter(); - } - - public static class MyMessageConverter implements MessageConverter { - @Override - public Object fromMessage(Message message) { - return message.popString(); - } - - @Override - public Message toMessage(Object obj) { - return new Message(obj.toString()); - } + return new VerySimpleMessageConverter(); } } ``` @@ -150,6 +168,78 @@ public class MySubscriber { **Note:** More flexible method signatures for `@ZmqHandler` will be added in a future release. +### Listener + +This library also includes a basic socket listener abstraction, for times when pub/sub breaks down, and raw 0MQ messaging is required. Instead of forcing the listener to adopt a different programming model, the `@ZmqListener` annotation can be used. + +To use the listener, add the `@EnableZmqListener` annotation to your Spring Boot application: + +```java +@SpringBootApplication +@EnableZmqListener +public class MyApplication { + public static void main(String[] args) { + SpringApplication.run(MyApplication.class, args); + } +} +``` + +To create a listener, add the `@ZmqListener` annotation to a class that implements `MessageListener`: + +```java +import io.insource.framework.annotation.QueueBinding; +import io.insource.framework.annotation.ZmqSubscriber; +import io.insource.framework.zeromq.MessageListener; +import org.zeromq.api.Message; + +@ZmqListener(1337) +public class MyListener implements MessageListener { + @Override + public void onMessage(Message message) { + // Do something with message + System.out.println(message.popString()); + } +} +``` + +This will create a PULL socket listening on port 1337 with its own inbox. The inbox can hold up to 1000 messages at a time. Additional messages will cause the sender to block. + +Each inbox will be managed by a thread dedicated to dispatching messages to `MyListener`. + +Alternatively, you can register a `MessageConverter` in your Spring `@Configuration` to deserialize messages into Java objects: + +```java +import io.insource.framework.zeromq.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.zeromq.api.Message; + +@Configuration +public class MyConfiguration { + @Bean + public MessageConverter messageConverter() { + return new VerySimpleMessageConverter(); + } +} +``` + +Then simply add a method to your `@ZmqListener` with the `@ZmqHandler` annotation. Make sure it accepts exactly the type returned by your `MessageConverter`: + +```java +import io.insource.framework.annotation.QueueBinding; +import io.insource.framework.annotation.ZmqHandler; +import io.insource.framework.annotation.ZmqSubscriber; + +@ZmqListener(1337) +public class MyListener { + @ZmqHandler + public void onMyEvent(String message) { + // Do something with message + System.out.println(message); + } +} +``` + ## Contributing To build this project, use the provided gradle build script. diff --git a/VERSIONS.md b/VERSIONS.md index d087eed..ce07f49 100644 --- a/VERSIONS.md +++ b/VERSIONS.md @@ -8,6 +8,7 @@ * Added `headers` parameter to `MessageConverter.toMessage()` * Added `SimpleMessageConverter` class supporting `String` and `ByteArray` +* Removed `DefaultMessageConverter` in favor of `SimpleMessageConverter` * Added `ZmqTemplate` class with basic `send()` operations * Deprecated `ChannelFactory` which is superceded by `ZmqTemplate` * `Channel` now implements `Closeable` \ No newline at end of file diff --git a/src/main/kotlin/io/insource/framework/zeromq/DefaultMessageConverter.kt b/src/main/kotlin/io/insource/framework/zeromq/DefaultMessageConverter.kt deleted file mode 100644 index efc2cb1..0000000 --- a/src/main/kotlin/io/insource/framework/zeromq/DefaultMessageConverter.kt +++ /dev/null @@ -1,21 +0,0 @@ -package io.insource.framework.zeromq - -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean -import org.springframework.stereotype.Component -import org.zeromq.api.Message - -/** - * Default message converter that simply converts messages to/from a string. - * Useful in demonstrations and hello world applications. - */ -@Component -@ConditionalOnMissingBean(MessageConverter::class) -class DefaultMessageConverter : MessageConverter { - override fun toMessage(obj: Any, headers: Map): Message { - return Message(obj.toString()) - } - - override fun fromMessage(message: Message): Any { - return message.popString() - } -} \ No newline at end of file diff --git a/src/main/kotlin/io/insource/framework/zeromq/SimpleMessageConverter.kt b/src/main/kotlin/io/insource/framework/zeromq/SimpleMessageConverter.kt index d175733..b2276ea 100644 --- a/src/main/kotlin/io/insource/framework/zeromq/SimpleMessageConverter.kt +++ b/src/main/kotlin/io/insource/framework/zeromq/SimpleMessageConverter.kt @@ -1,5 +1,6 @@ package io.insource.framework.zeromq +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.stereotype.Component import org.zeromq.api.Message @@ -10,6 +11,7 @@ import org.zeromq.api.Message * the behavior of the `fromMessage()` method. */ @Component +@ConditionalOnMissingBean(MessageConverter::class) class SimpleMessageConverter : MessageConverter { override fun toMessage(obj: Any, headers: Map): Message { val contentType: String diff --git a/src/main/kotlin/io/insource/framework/zeromq/ZmqTemplate.kt b/src/main/kotlin/io/insource/framework/zeromq/ZmqTemplate.kt index 55cbe71..de98985 100644 --- a/src/main/kotlin/io/insource/framework/zeromq/ZmqTemplate.kt +++ b/src/main/kotlin/io/insource/framework/zeromq/ZmqTemplate.kt @@ -44,16 +44,17 @@ class ZmqTemplate(private val context: Context) { * @param obj The message payload */ fun send(routingKey: String, obj: Any) { - send(routingKey, obj, mapOf()) + send(routingKey, mapOf(), obj) } /** * Send a message to the default topic. * + * @param routingKey The routing key * @param obj The message payload * @param headers Message headers to go with the payload */ - fun send(routingKey: String, obj: Any, headers: Map) { + fun send(routingKey: String, headers: Map, obj: Any) { val channel = channels.get() val message = messageConverter.toMessage(obj, headers) channel.send(routingKey, message)