Skip to content

Commit

Permalink
Add support for dynamic write in MqttIO (#32470)
Browse files Browse the repository at this point in the history
* add support for dynamic write in MqttIO

* Update CHANGES.md

* add some assertions in testDynamicWrite

* remove whitespace in CHANGES.md

* refactor duplicated Write transform

* change WriteFn to use Write spec
  • Loading branch information
twosom authored Oct 1, 2024
1 parent 301286f commit c8c674e
Show file tree
Hide file tree
Showing 3 changed files with 332 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
* Prism
* Prism now supports Bundle Finalization. ([#32425](https://github.com/apache/beam/pull/32425))
* Significantly improved performance of Kafka IO reads that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) by removing the data reshuffle from SDF implementation. ([#31682](https://github.com/apache/beam/pull/31682)).
* Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376))
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes

Expand Down
139 changes: 110 additions & 29 deletions sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -99,6 +101,26 @@
* "my_topic"))
*
* }</pre>
*
* <h3>Dynamic Writing to a MQTT Broker</h3>
*
* <p>MqttIO also supports dynamic writing to multiple topics based on the data. You can specify a
* function to determine the target topic for each message. The following example demonstrates how
* to configure dynamic topic writing:
*
* <pre>{@code
* pipeline
* .apply(...) // Provide PCollection<InputT>
* .apply(
* MqttIO.<InputT>dynamicWrite()
* .withConnectionConfiguration(
* MqttIO.ConnectionConfiguration.create("tcp://host:11883"))
* .withTopicFn(<Function to determine the topic dynamically>)
* .withPayloadFn(<Function to extract the payload>));
* }</pre>
*
* <p>This dynamic writing capability allows for more flexible MQTT message routing based on the
* message content, enabling scenarios where messages are directed to different topics.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand All @@ -115,8 +137,16 @@ public static Read read() {
.build();
}

public static Write write() {
return new AutoValue_MqttIO_Write.Builder().setRetained(false).build();
public static Write<byte[]> write() {
return new AutoValue_MqttIO_Write.Builder<byte[]>()
.setRetained(false)
.setPayloadFn(SerializableFunctions.identity())
.setDynamic(false)
.build();
}

public static <InputT> Write<InputT> dynamicWrite() {
return new AutoValue_MqttIO_Write.Builder<InputT>().setRetained(false).setDynamic(true).build();
}

private MqttIO() {}
Expand All @@ -127,7 +157,7 @@ public abstract static class ConnectionConfiguration implements Serializable {

abstract String getServerUri();

abstract String getTopic();
abstract @Nullable String getTopic();

abstract @Nullable String getClientId();

Expand Down Expand Up @@ -169,6 +199,11 @@ public static ConnectionConfiguration create(String serverUri, String topic) {
.build();
}

public static ConnectionConfiguration create(String serverUri) {
checkArgument(serverUri != null, "serverUri can not be null");
return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri).build();
}

/** Set up the MQTT broker URI. */
public ConnectionConfiguration withServerUri(String serverUri) {
checkArgument(serverUri != null, "serverUri can not be null");
Expand Down Expand Up @@ -199,7 +234,7 @@ public ConnectionConfiguration withPassword(String password) {

private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("serverUri", getServerUri()));
builder.add(DisplayData.item("topic", getTopic()));
builder.addIfNotNull(DisplayData.item("topic", getTopic()));
builder.addIfNotNull(DisplayData.item("clientId", getClientId()));
builder.addIfNotNull(DisplayData.item("username", getUsername()));
}
Expand Down Expand Up @@ -278,6 +313,9 @@ public Read withMaxReadTime(Duration maxReadTime) {

@Override
public PCollection<byte[]> expand(PBegin input) {
checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null");
checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null");

org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded =
org.apache.beam.sdk.io.Read.from(new UnboundedMqttSource(this));

Expand Down Expand Up @@ -505,29 +543,50 @@ public UnboundedMqttSource getCurrentSource() {

/** A {@link PTransform} to write and send a message to a MQTT server. */
@AutoValue
public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {

public abstract static class Write<InputT> extends PTransform<PCollection<InputT>, PDone> {
abstract @Nullable ConnectionConfiguration connectionConfiguration();

abstract @Nullable SerializableFunction<InputT, String> topicFn();

abstract @Nullable SerializableFunction<InputT, byte[]> payloadFn();

abstract boolean dynamic();

abstract boolean retained();

abstract Builder builder();
abstract Builder<InputT> builder();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setConnectionConfiguration(ConnectionConfiguration configuration);
abstract static class Builder<InputT> {
abstract Builder<InputT> setConnectionConfiguration(ConnectionConfiguration configuration);

abstract Builder<InputT> setRetained(boolean retained);

abstract Builder<InputT> setTopicFn(SerializableFunction<InputT, String> topicFn);

abstract Builder setRetained(boolean retained);
abstract Builder<InputT> setPayloadFn(SerializableFunction<InputT, byte[]> payloadFn);

abstract Write build();
abstract Builder<InputT> setDynamic(boolean dynamic);

abstract Write<InputT> build();
}

/** Define MQTT connection configuration used to connect to the MQTT broker. */
public Write withConnectionConfiguration(ConnectionConfiguration configuration) {
public Write<InputT> withConnectionConfiguration(ConnectionConfiguration configuration) {
checkArgument(configuration != null, "configuration can not be null");
return builder().setConnectionConfiguration(configuration).build();
}

public Write<InputT> withTopicFn(SerializableFunction<InputT, String> topicFn) {
checkArgument(dynamic(), "withTopicFn can not use in non-dynamic write");
return builder().setTopicFn(topicFn).build();
}

public Write<InputT> withPayloadFn(SerializableFunction<InputT, byte[]> payloadFn) {
checkArgument(dynamic(), "withPayloadFn can not use in non-dynamic write");
return builder().setPayloadFn(payloadFn).build();
}

/**
* Whether or not the publish message should be retained by the messaging engine. Sending a
* message with the retained set to {@code false} will clear the retained message from the
Expand All @@ -538,54 +597,76 @@ public Write withConnectionConfiguration(ConnectionConfiguration configuration)
* @param retained Whether or not the messaging engine should retain the message.
* @return The {@link Write} {@link PTransform} with the corresponding retained configuration.
*/
public Write withRetained(boolean retained) {
public Write<InputT> withRetained(boolean retained) {
return builder().setRetained(retained).build();
}

@Override
public PDone expand(PCollection<byte[]> input) {
input.apply(ParDo.of(new WriteFn(this)));
return PDone.in(input.getPipeline());
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
connectionConfiguration().populateDisplayData(builder);
builder.add(DisplayData.item("retained", retained()));
}

private static class WriteFn extends DoFn<byte[], Void> {
@Override
public PDone expand(PCollection<InputT> input) {
checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null");
if (dynamic()) {
checkArgument(
connectionConfiguration().getTopic() == null, "DynamicWrite can not have static topic");
checkArgument(topicFn() != null, "topicFn can not be null");
} else {
checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null");
}
checkArgument(payloadFn() != null, "payloadFn can not be null");

input.apply(ParDo.of(new WriteFn<>(this)));
return PDone.in(input.getPipeline());
}

private static class WriteFn<InputT> extends DoFn<InputT, Void> {

private final Write spec;
private final Write<InputT> spec;
private final SerializableFunction<InputT, String> topicFn;
private final SerializableFunction<InputT, byte[]> payloadFn;
private final boolean retained;

private transient MQTT client;
private transient BlockingConnection connection;

public WriteFn(Write spec) {
public WriteFn(Write<InputT> spec) {
this.spec = spec;
if (spec.dynamic()) {
this.topicFn = spec.topicFn();
} else {
String topic = spec.connectionConfiguration().getTopic();
this.topicFn = ignore -> topic;
}
this.payloadFn = spec.payloadFn();
this.retained = spec.retained();
}

@Setup
public void createMqttClient() throws Exception {
LOG.debug("Starting MQTT writer");
client = spec.connectionConfiguration().createClient();
this.client = this.spec.connectionConfiguration().createClient();
LOG.debug("MQTT writer client ID is {}", client.getClientId());
connection = createConnection(client);
this.connection = createConnection(client);
}

@ProcessElement
public void processElement(ProcessContext context) throws Exception {
byte[] payload = context.element();
InputT element = context.element();
byte[] payload = this.payloadFn.apply(element);
String topic = this.topicFn.apply(element);
LOG.debug("Sending message {}", new String(payload, StandardCharsets.UTF_8));
connection.publish(
spec.connectionConfiguration().getTopic(), payload, QoS.AT_LEAST_ONCE, false);
this.connection.publish(topic, payload, QoS.AT_LEAST_ONCE, this.retained);
}

@Teardown
public void closeMqttClient() throws Exception {
if (connection != null) {
if (this.connection != null) {
LOG.debug("Disconnecting MQTT connection (client ID {})", client.getClientId());
connection.disconnect();
this.connection.disconnect();
}
}
}
Expand Down
Loading

0 comments on commit c8c674e

Please sign in to comment.