Skip to content

Commit

Permalink
Removed DefaultMessageConverter and added docs to README
Browse files Browse the repository at this point in the history
  • Loading branch information
sjohnr committed Jan 30, 2020
1 parent 86a95dd commit 4218870
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 44 deletions.
132 changes: 111 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Maven:
<dependency>
<groupId>io.insource</groupId>
<artifactId>in-spring-zeromq</artifactId>
<version>0.0.1</version>
<version>0.0.2</version>
</dependency>
```

Expand All @@ -35,19 +35,49 @@ public class MyApplication {
}
```

To publish messages, use a `ChannelFactory` to create `ThreadLocal` instances of a `Channel`. For example:
To publish messages, use a `ZmqTemplate`. For example:

```java
ChannelFactory channelFactory = ChannelFactory.create("events"); // Use topic "events"
Channel channel = channelFactory.channel(); // Get a thread-safe channel
channel.send("MyEvent", new Message("my event")); // Send a message using routing key "MyEvent"
ZmqTemplate zmqTemplate = new ZmqTemplate();
zmqTemplate.setTopic("events"); // Use topic "events"
zmqTemplate.setRoutingKey("MyEvent"); // Use routing key "MyEvent" by default
zmqTemplate.setMessageConverter(new VerySimpleMessageConverter()); // Use custom MessageConverter

// Send using defaults
zmqTemplate.send("This is a test.");
```

This publishes a message with the string `This is a test.` to the `events` topic using a routing key of `MyEvent`. Other flavors of `send()` look like this:

```java
// Send using custom routing key
zmqTemplate.send("MyRoutingKey", "This is another test.");

// Send using custom routing key and headers
Map<String, String> headers = new HashMap<>();
headers.put("my-header", "TEST");
zmqTemplate.send("MyRoutingKey", headers, "This is yet another test.");
```

This publishes a message with the string `my event` to the `events` topic using a routing key of `MyEvent`.
Serialization to and from bytes is done using a `MessageConverter`. Here is the definition of a very simple `MessageConverter`:

```java
public class VerySimpleMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object obj, Map<String, String> headers) {
return new Message(obj.toString());
}

@Override
public Object fromMessage(Message message) {
return message.popString();
}
}
```

**Note:** Serialization on the publishing side will be added in a future release. Currently, you must use the `org.zeromq.api.Message` class to send messages.
**Note:** This example does not consider headers. The bundled class `SimpleMessageConverter` does consider headers, which changes the data format over the wire considerably. Both sides must use the same `MessageConverter` or risk incompatible (de)serialization.

This message, and all messages sent on any thread within this JVM will be sent to a local proxy to be forwarded to a remote host. To configure the proxy to forward messages for this topic, use the following configuration:
Messages sent on any thread within the JVM will be sent to a local proxy to be forwarded to a remote host. To configure the proxy to forward messages for this topic, use the following configuration:

application.yml:

Expand Down Expand Up @@ -111,19 +141,7 @@ import org.zeromq.api.Message;
public class MyConfiguration {
@Bean
public MessageConverter messageConverter() {
return new MyMessageConverter();
}
public static class MyMessageConverter implements MessageConverter {
@Override
public Object fromMessage(Message message) {
return message.popString();
}
@Override
public Message toMessage(Object obj) {
return new Message(obj.toString());
}
return new VerySimpleMessageConverter();
}
}
```
Expand All @@ -150,6 +168,78 @@ public class MySubscriber {

**Note:** More flexible method signatures for `@ZmqHandler` will be added in a future release.

### Listener

This library also includes a basic socket listener abstraction, for times when pub/sub breaks down, and raw 0MQ messaging is required. Instead of forcing the listener to adopt a different programming model, the `@ZmqListener` annotation can be used.

To use the listener, add the `@EnableZmqListener` annotation to your Spring Boot application:

```java
@SpringBootApplication
@EnableZmqListener
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
```

To create a listener, add the `@ZmqListener` annotation to a class that implements `MessageListener`:

```java
import io.insource.framework.annotation.QueueBinding;
import io.insource.framework.annotation.ZmqSubscriber;
import io.insource.framework.zeromq.MessageListener;
import org.zeromq.api.Message;
@ZmqListener(1337)
public class MyListener implements MessageListener {
@Override
public void onMessage(Message message) {
// Do something with message
System.out.println(message.popString());
}
}
```

This will create a PULL socket listening on port 1337 with its own inbox. The inbox can hold up to 1000 messages at a time. Additional messages will cause the sender to block.

Each inbox will be managed by a thread dedicated to dispatching messages to `MyListener`.

Alternatively, you can register a `MessageConverter` in your Spring `@Configuration` to deserialize messages into Java objects:

```java
import io.insource.framework.zeromq.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.zeromq.api.Message;
@Configuration
public class MyConfiguration {
@Bean
public MessageConverter messageConverter() {
return new VerySimpleMessageConverter();
}
}
```

Then simply add a method to your `@ZmqListener` with the `@ZmqHandler` annotation. Make sure it accepts exactly the type returned by your `MessageConverter`:

```java
import io.insource.framework.annotation.QueueBinding;
import io.insource.framework.annotation.ZmqHandler;
import io.insource.framework.annotation.ZmqSubscriber;
@ZmqListener(1337)
public class MyListener {
@ZmqHandler
public void onMyEvent(String message) {
// Do something with message
System.out.println(message);
}
}
```

## Contributing

To build this project, use the provided gradle build script.
Expand Down
1 change: 1 addition & 0 deletions VERSIONS.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

* Added `headers` parameter to `MessageConverter.toMessage()`
* Added `SimpleMessageConverter` class supporting `String` and `ByteArray`
* Removed `DefaultMessageConverter` in favor of `SimpleMessageConverter`
* Added `ZmqTemplate` class with basic `send()` operations
* Deprecated `ChannelFactory` which is superceded by `ZmqTemplate`
* `Channel` now implements `Closeable`

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.insource.framework.zeromq

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.stereotype.Component
import org.zeromq.api.Message

Expand All @@ -10,6 +11,7 @@ import org.zeromq.api.Message
* the behavior of the `fromMessage()` method.
*/
@Component
@ConditionalOnMissingBean(MessageConverter::class)
class SimpleMessageConverter : MessageConverter {
override fun toMessage(obj: Any, headers: Map<String, String>): Message {
val contentType: String
Expand Down
5 changes: 3 additions & 2 deletions src/main/kotlin/io/insource/framework/zeromq/ZmqTemplate.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,17 @@ class ZmqTemplate(private val context: Context) {
* @param obj The message payload
*/
fun send(routingKey: String, obj: Any) {
send(routingKey, obj, mapOf())
send(routingKey, mapOf(), obj)
}

/**
* Send a message to the default topic.
*
* @param routingKey The routing key
* @param obj The message payload
* @param headers Message headers to go with the payload
*/
fun send(routingKey: String, obj: Any, headers: Map<String, String>) {
fun send(routingKey: String, headers: Map<String, String>, obj: Any) {
val channel = channels.get()
val message = messageConverter.toMessage(obj, headers)
channel.send(routingKey, message)
Expand Down

0 comments on commit 4218870

Please sign in to comment.