Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dynamic write in MqttIO #32470

Merged
merged 8 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
* Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)).
* Added support for Zstd codec in SerializableAvroCodecFactory (Java) ([#32349](https://github.com/apache/beam/issues/32349))
* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
* Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376))
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -99,6 +100,26 @@
* "my_topic"))
*
* }</pre>
*
* <h3>Dynamic Writing to a MQTT Broker</h3>
*
* <p>MqttIO also supports dynamic writing to multiple topics based on the data. You can specify a
* function to determine the target topic for each message. The following example demonstrates how
* to configure dynamic topic writing:
*
* <pre>{@code
* pipeline
* .apply(...) // Provide PCollection<InputT>
* .apply(
* MqttIO.<InputT>dynamicWrite()
* .withConnectionConfiguration(
* MqttIO.ConnectionConfiguration.create("tcp://host:11883"))
* .withTopicFn(<Function to determine the topic dynamically>)
* .withPayloadFn(<Function to extract the payload>));
* }</pre>
*
* <p>This dynamic writing capability allows for more flexible MQTT message routing based on the
* message content, enabling scenarios where messages are directed to different topics.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand All @@ -119,6 +140,10 @@ public static Write write() {
return new AutoValue_MqttIO_Write.Builder().setRetained(false).build();
}

public static <InputT> DynamicWrite<InputT> dynamicWrite() {
return new AutoValue_MqttIO_DynamicWrite.Builder<InputT>().setRetained(false).build();
}

private MqttIO() {}

/** A POJO describing a MQTT connection. */
Expand All @@ -127,7 +152,7 @@ public abstract static class ConnectionConfiguration implements Serializable {

abstract String getServerUri();

abstract String getTopic();
abstract @Nullable String getTopic();

abstract @Nullable String getClientId();

Expand Down Expand Up @@ -169,6 +194,11 @@ public static ConnectionConfiguration create(String serverUri, String topic) {
.build();
}

public static ConnectionConfiguration create(String serverUri) {
checkArgument(serverUri != null, "serverUri can not be null");
return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri).build();
}

/** Set up the MQTT broker URI. */
public ConnectionConfiguration withServerUri(String serverUri) {
checkArgument(serverUri != null, "serverUri can not be null");
Expand Down Expand Up @@ -199,7 +229,7 @@ public ConnectionConfiguration withPassword(String password) {

private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("serverUri", getServerUri()));
builder.add(DisplayData.item("topic", getTopic()));
builder.addIfNotNull(DisplayData.item("topic", getTopic()));
builder.addIfNotNull(DisplayData.item("clientId", getClientId()));
builder.addIfNotNull(DisplayData.item("username", getUsername()));
}
Expand Down Expand Up @@ -278,6 +308,9 @@ public Read withMaxReadTime(Duration maxReadTime) {

@Override
public PCollection<byte[]> expand(PBegin input) {
checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null");
checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null");

org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded =
org.apache.beam.sdk.io.Read.from(new UnboundedMqttSource(this));

Expand Down Expand Up @@ -503,6 +536,121 @@ public UnboundedMqttSource getCurrentSource() {
}
}

@AutoValue
public abstract static class DynamicWrite<InputT> extends PTransform<PCollection<InputT>, PDone> {
abstract @Nullable ConnectionConfiguration connectionConfiguration();

abstract @Nullable SerializableFunction<InputT, String> topicFn();

abstract @Nullable SerializableFunction<InputT, byte[]> payloadFn();

abstract boolean retained();

abstract Builder<InputT> builder();

@AutoValue.Builder
abstract static class Builder<InputT> {
abstract Builder<InputT> setConnectionConfiguration(ConnectionConfiguration configuration);

abstract Builder<InputT> setRetained(boolean retained);

abstract Builder<InputT> setTopicFn(SerializableFunction<InputT, String> topicFn);

abstract Builder<InputT> setPayloadFn(SerializableFunction<InputT, byte[]> payloadFn);

abstract DynamicWrite<InputT> build();
}

/** Define MQTT connection configuration used to connect to the MQTT broker. */
public DynamicWrite<InputT> withConnectionConfiguration(ConnectionConfiguration configuration) {
checkArgument(configuration != null, "configuration can not be null");
return builder().setConnectionConfiguration(configuration).build();
}

public DynamicWrite<InputT> withTopicFn(SerializableFunction<InputT, String> topicFn) {
return builder().setTopicFn(topicFn).build();
}

public DynamicWrite<InputT> withPayloadFn(SerializableFunction<InputT, byte[]> payloadFn) {
return builder().setPayloadFn(payloadFn).build();
}

/**
* Whether or not the publish message should be retained by the messaging engine. Sending a
* message with the retained set to {@code false} will clear the retained message from the
* server. The default value is {@code false}. When a subscriber connects, it gets the latest
* retained message (else it doesn't get any existing message, it will have to wait a new
* incoming message).
*
* @param retained Whether or not the messaging engine should retain the message.
* @return The {@link DynamicWrite} {@link PTransform} with the corresponding retained
* configuration.
*/
public DynamicWrite<InputT> withRetained(boolean retained) {
return builder().setRetained(retained).build();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
connectionConfiguration().populateDisplayData(builder);
builder.add(DisplayData.item("retained", retained()));
}

@Override
public PDone expand(PCollection<InputT> input) {
checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null");
checkArgument(
connectionConfiguration().getTopic() == null, "DynamicWrite can not have static topic");
checkArgument(topicFn() != null, "topicFn can not be null");
checkArgument(payloadFn() != null, "payloadFn can not be null");

input.apply(ParDo.of(new DynamicWriteFn<>(this)));
return PDone.in(input.getPipeline());
}

private static class DynamicWriteFn<InputT> extends DoFn<InputT, Void> {

private final DynamicWrite<InputT> spec;
private final SerializableFunction<InputT, String> topicFn;
private final SerializableFunction<InputT, byte[]> paylaodFn;

private transient MQTT client;
private transient BlockingConnection connection;

public DynamicWriteFn(DynamicWrite<InputT> spec) {
this.spec = spec;
this.topicFn = spec.topicFn();
this.paylaodFn = spec.payloadFn();
}

@Setup
public void createMqttClient() throws Exception {
LOG.debug("Starting MQTT writer");
client = spec.connectionConfiguration().createClient();
LOG.debug("MQTT writer client ID is {}", client.getClientId());
connection = client.blockingConnection();
connection.connect();
}

@ProcessElement
public void processElement(ProcessContext context) throws Exception {
InputT element = context.element();
byte[] payload = this.paylaodFn.apply(element);
String topic = this.topicFn.apply(element);
LOG.debug("Sending message {}", new String(payload, StandardCharsets.UTF_8));
connection.publish(topic, payload, QoS.AT_LEAST_ONCE, spec.retained());
}

@Teardown
public void closeMqttClient() throws Exception {
if (connection != null) {
LOG.debug("Disconnecting MQTT connection (client ID {})", client.getClientId());
connection.disconnect();
}
}
}
}

/** A {@link PTransform} to write and send a message to a MQTT server. */
@AutoValue
public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {
Expand Down Expand Up @@ -544,6 +692,8 @@ public Write withRetained(boolean retained) {

@Override
public PDone expand(PCollection<byte[]> input) {
checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null");
checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null");
input.apply(ParDo.of(new WriteFn(this)));
return PDone.in(input.getPipeline());
}
Expand Down
Loading
Loading