From 0074506285380ef00d57825e17d359e1e79efdbc Mon Sep 17 00:00:00 2001 From: twosom Date: Tue, 17 Sep 2024 02:08:20 +0900 Subject: [PATCH 1/6] add support for dynamic write in MqttIO --- .../org/apache/beam/sdk/io/mqtt/MqttIO.java | 154 +++++++++++++- .../apache/beam/sdk/io/mqtt/MqttIOTest.java | 196 ++++++++++++++++++ 2 files changed, 348 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 8b7f0991c2dd..4472a7dcdbc4 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -97,6 +98,26 @@ * "my_topic")) * * } + * + *

Dynamic Writing to a MQTT Broker

+ * + *

MqttIO also supports dynamic writing to multiple topics based on the data. You can specify a + * function to determine the target topic for each message. The following example demonstrates how + * to configure dynamic topic writing: + * + *

{@code
+ * pipeline
+ *   .apply(...)  // Provide PCollection
+ *   .apply(
+ *     MqttIO.dynamicWrite()
+ *       .withConnectionConfiguration(
+ *         MqttIO.ConnectionConfiguration.create("tcp://host:11883"))
+ *       .withTopicFn()
+ *       .withPayloadFn());
+ * }
+ * + *

This dynamic writing capability allows for more flexible MQTT message routing based on the + * message content, enabling scenarios where messages are directed to different topics. */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -117,6 +138,10 @@ public static Write write() { return new AutoValue_MqttIO_Write.Builder().setRetained(false).build(); } + public static DynamicWrite dynamicWrite() { + return new AutoValue_MqttIO_DynamicWrite.Builder().setRetained(false).build(); + } + private MqttIO() {} /** A POJO describing a MQTT connection. */ @@ -125,7 +150,7 @@ public abstract static class ConnectionConfiguration implements Serializable { abstract String getServerUri(); - abstract String getTopic(); + abstract @Nullable String getTopic(); abstract @Nullable String getClientId(); @@ -167,6 +192,11 @@ public static ConnectionConfiguration create(String serverUri, String topic) { .build(); } + public static ConnectionConfiguration create(String serverUri) { + checkArgument(serverUri != null, "serverUri can not be null"); + return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri).build(); + } + /** Set up the MQTT broker URI. */ public ConnectionConfiguration withServerUri(String serverUri) { checkArgument(serverUri != null, "serverUri can not be null"); @@ -197,7 +227,7 @@ public ConnectionConfiguration withPassword(String password) { private void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("serverUri", getServerUri())); - builder.add(DisplayData.item("topic", getTopic())); + builder.addIfNotNull(DisplayData.item("topic", getTopic())); builder.addIfNotNull(DisplayData.item("clientId", getClientId())); builder.addIfNotNull(DisplayData.item("username", getUsername())); } @@ -276,6 +306,9 @@ public Read withMaxReadTime(Duration maxReadTime) { @Override public PCollection expand(PBegin input) { + checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null"); + checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null"); + org.apache.beam.sdk.io.Read.Unbounded unbounded = org.apache.beam.sdk.io.Read.from(new UnboundedMqttSource(this)); @@ -502,6 +535,121 @@ public UnboundedMqttSource getCurrentSource() { } } + @AutoValue + public abstract static class DynamicWrite extends PTransform, PDone> { + abstract @Nullable ConnectionConfiguration connectionConfiguration(); + + abstract @Nullable SerializableFunction topicFn(); + + abstract @Nullable SerializableFunction payloadFn(); + + abstract boolean retained(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConnectionConfiguration(ConnectionConfiguration configuration); + + abstract Builder setRetained(boolean retained); + + abstract Builder setTopicFn(SerializableFunction topicFn); + + abstract Builder setPayloadFn(SerializableFunction payloadFn); + + abstract DynamicWrite build(); + } + + /** Define MQTT connection configuration used to connect to the MQTT broker. */ + public DynamicWrite withConnectionConfiguration(ConnectionConfiguration configuration) { + checkArgument(configuration != null, "configuration can not be null"); + return builder().setConnectionConfiguration(configuration).build(); + } + + public DynamicWrite withTopicFn(SerializableFunction topicFn) { + return builder().setTopicFn(topicFn).build(); + } + + public DynamicWrite withPayloadFn(SerializableFunction payloadFn) { + return builder().setPayloadFn(payloadFn).build(); + } + + /** + * Whether or not the publish message should be retained by the messaging engine. Sending a + * message with the retained set to {@code false} will clear the retained message from the + * server. The default value is {@code false}. When a subscriber connects, it gets the latest + * retained message (else it doesn't get any existing message, it will have to wait a new + * incoming message). + * + * @param retained Whether or not the messaging engine should retain the message. + * @return The {@link DynamicWrite} {@link PTransform} with the corresponding retained + * configuration. + */ + public DynamicWrite withRetained(boolean retained) { + return builder().setRetained(retained).build(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + connectionConfiguration().populateDisplayData(builder); + builder.add(DisplayData.item("retained", retained())); + } + + @Override + public PDone expand(PCollection input) { + checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null"); + checkArgument( + connectionConfiguration().getTopic() == null, "DynamicWrite can not have static topic"); + checkArgument(topicFn() != null, "topicFn can not be null"); + checkArgument(payloadFn() != null, "payloadFn can not be null"); + + input.apply(ParDo.of(new DynamicWriteFn<>(this))); + return PDone.in(input.getPipeline()); + } + + private static class DynamicWriteFn extends DoFn { + + private final DynamicWrite spec; + private final SerializableFunction topicFn; + private final SerializableFunction paylaodFn; + + private transient MQTT client; + private transient BlockingConnection connection; + + public DynamicWriteFn(DynamicWrite spec) { + this.spec = spec; + this.topicFn = spec.topicFn(); + this.paylaodFn = spec.payloadFn(); + } + + @Setup + public void createMqttClient() throws Exception { + LOG.debug("Starting MQTT writer"); + client = spec.connectionConfiguration().createClient(); + LOG.debug("MQTT writer client ID is {}", client.getClientId()); + connection = client.blockingConnection(); + connection.connect(); + } + + @ProcessElement + public void processElement(ProcessContext context) throws Exception { + InputT element = context.element(); + byte[] payload = this.paylaodFn.apply(element); + String topic = this.topicFn.apply(element); + LOG.debug("Sending message {}", new String(payload, StandardCharsets.UTF_8)); + connection.publish(topic, payload, QoS.AT_LEAST_ONCE, spec.retained()); + } + + @Teardown + public void closeMqttClient() throws Exception { + if (connection != null) { + LOG.debug("Disconnecting MQTT connection (client ID {})", client.getClientId()); + connection.disconnect(); + } + } + } + } + /** A {@link PTransform} to write and send a message to a MQTT server. */ @AutoValue public abstract static class Write extends PTransform, PDone> { @@ -543,6 +691,8 @@ public Write withRetained(boolean retained) { @Override public PDone expand(PCollection input) { + checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null"); + checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null"); input.apply(ParDo.of(new WriteFn(this))); return PDone.in(input.getPipeline()); } diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 30adad708f8d..3efec1c15cf9 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.mqtt; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; @@ -26,16 +27,25 @@ import java.io.ObjectOutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.Connection; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.common.NetworkTestHelper; import org.apache.beam.sdk.io.mqtt.MqttIO.Read; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.fusesource.hawtbuf.Buffer; import org.fusesource.mqtt.client.BlockingConnection; @@ -267,6 +277,192 @@ public void testWrite() throws Exception { } } + @Test(timeout = 30 * 1000) + @Ignore("https://github.com/apache/beam/issues/19092 Flake Non-deterministic output.") + public void testDynamicWrite() throws Exception { + final int numberOfTopic1Count = 100; + final int numberOfTopic2Count = 100; + final int numberOfTestMessages = numberOfTopic1Count + numberOfTopic2Count; + + MQTT client = new MQTT(); + client.setHost("tcp://localhost:" + port); + final BlockingConnection connection = client.blockingConnection(); + connection.connect(); + final String writeTopic1 = "WRITE_TOPIC_1"; + final String writeTopic2 = "WRITE_TOPIC_2"; + connection.subscribe( + new Topic[] { + new Topic(Buffer.utf8(writeTopic1), QoS.EXACTLY_ONCE), + new Topic(Buffer.utf8(writeTopic2), QoS.EXACTLY_ONCE) + }); + + final Map> messageMap = new ConcurrentSkipListMap<>(); + final Thread subscriber = + new Thread( + () -> { + try { + for (int i = 0; i < numberOfTestMessages; i++) { + Message message = connection.receive(); + List messages = messageMap.get(message.getTopic()); + if (messages == null) { + messages = new ArrayList<>(); + } + messages.add(new String(message.getPayload(), StandardCharsets.UTF_8)); + messageMap.put(message.getTopic(), messages); + message.ack(); + } + } catch (Exception e) { + LOG.error("Can't receive message", e); + } + }); + + subscriber.start(); + + ArrayList> data = new ArrayList<>(); + for (int i = 0; i < numberOfTopic1Count; i++) { + data.add(KV.of(writeTopic1, ("Test" + i).getBytes(StandardCharsets.UTF_8))); + } + + for (int i = 0; i < numberOfTopic2Count; i++) { + data.add(KV.of(writeTopic2, ("Test" + i).getBytes(StandardCharsets.UTF_8))); + } + + pipeline + .apply(Create.of(data)) + .setCoder(KvCoder.of(StringUtf8Coder.of(), ByteArrayCoder.of())) + .apply( + MqttIO.>dynamicWrite() + .withConnectionConfiguration( + MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port) + .withClientId("READ_PIPELINE")) + .withTopicFn(input -> input.getKey()) + .withPayloadFn(input -> input.getValue())); + + pipeline.run(); + subscriber.join(); + + connection.disconnect(); + + assertEquals( + numberOfTestMessages, messageMap.values().stream().mapToLong(Collection::size).sum()); + + assertEquals(2, messageMap.keySet().size()); + assertTrue(messageMap.containsKey(writeTopic1)); + assertTrue(messageMap.containsKey(writeTopic2)); + for (Map.Entry> entry : messageMap.entrySet()) { + final List messages = entry.getValue(); + messages.forEach(message -> assertTrue(message.contains("Test"))); + } + } + + @Test + public void testReadHaveNoConnectionConfiguration() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, () -> MqttIO.read().expand(PBegin.in(pipeline))); + + assertEquals("connectionConfiguration can not be null", exception.getMessage()); + } + + @Test + public void testReadHaveNoTopic() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + MqttIO.read() + .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create("serverUri")) + .expand(PBegin.in(pipeline))); + + assertEquals("topic can not be null", exception.getMessage()); + } + + @Test + public void testWriteHaveNoConnectionConfiguration() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> MqttIO.write().expand(pipeline.apply(Create.of(new byte[] {})))); + + assertEquals("connectionConfiguration can not be null", exception.getMessage()); + + pipeline.run(); + } + + @Test + public void testWriteHaveNoTopic() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + MqttIO.write() + .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create("serverUri")) + .expand(pipeline.apply(Create.of(new byte[] {})))); + + assertEquals("topic can not be null", exception.getMessage()); + + pipeline.run(); + } + + @Test + public void testDynamicWriteHaveNoConnectionConfiguration() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> MqttIO.dynamicWrite().expand(pipeline.apply(Create.of(new byte[] {})))); + + assertEquals("connectionConfiguration can not be null", exception.getMessage()); + + pipeline.run(); + } + + @Test + public void testDynamicWriteHaveNoTopicFn() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + MqttIO.dynamicWrite() + .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create("serverUri")) + .expand(pipeline.apply(Create.of(new byte[] {})))); + + assertEquals("topicFn can not be null", exception.getMessage()); + + pipeline.run(); + } + + @Test + public void testDynamicWriteHaveNoPayloadFn() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + MqttIO.dynamicWrite() + .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create("serverUri")) + .withTopicFn(input -> "topic") + .expand(pipeline.apply(Create.of(new byte[] {})))); + + assertEquals("payloadFn can not be null", exception.getMessage()); + + pipeline.run(); + } + + @Test + public void testDynamicWriteHaveStaticTopic() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + MqttIO.dynamicWrite() + .withConnectionConfiguration( + MqttIO.ConnectionConfiguration.create("serverUri", "topic")) + .expand(pipeline.apply(Create.of(new byte[] {})))); + + assertEquals("DynamicWrite can not have static topic", exception.getMessage()); + + pipeline.run(); + } + @Test public void testReadObject() throws Exception { ByteArrayOutputStream bos = new ByteArrayOutputStream(); From 2859b4d339f149acb6205c4a66f2893ef1a2f653 Mon Sep 17 00:00:00 2001 From: twosom Date: Tue, 17 Sep 2024 02:16:42 +0900 Subject: [PATCH 2/6] Update CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index b936ba6dda30..a6156c97bcfa 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,7 @@ * Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)). * Added support for Zstd codec in SerializableAvroCodecFactory (Java) ([#32349](https://github.com/apache/beam/issues/32349)) +* Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376)) * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Breaking Changes From acaee1dbdeddc3547860afb420705446b689c481 Mon Sep 17 00:00:00 2001 From: twosom Date: Tue, 17 Sep 2024 02:24:57 +0900 Subject: [PATCH 3/6] add some assertions in testDynamicWrite --- .../test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 3efec1c15cf9..398a5c59a448 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -277,8 +277,7 @@ public void testWrite() throws Exception { } } - @Test(timeout = 30 * 1000) - @Ignore("https://github.com/apache/beam/issues/19092 Flake Non-deterministic output.") + @Test public void testDynamicWrite() throws Exception { final int numberOfTopic1Count = 100; final int numberOfTopic2Count = 100; @@ -352,6 +351,11 @@ public void testDynamicWrite() throws Exception { for (Map.Entry> entry : messageMap.entrySet()) { final List messages = entry.getValue(); messages.forEach(message -> assertTrue(message.contains("Test"))); + if (entry.getKey().equals(writeTopic1)) { + assertEquals(numberOfTopic1Count, messages.size()); + } else { + assertEquals(numberOfTopic2Count, messages.size()); + } } } From 2efecf05015784786190bfbf5cb12286349f14b5 Mon Sep 17 00:00:00 2001 From: twosom Date: Tue, 17 Sep 2024 02:45:19 +0900 Subject: [PATCH 4/6] remove whitespace in CHANGES.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index a6156c97bcfa..a63f7f75509c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,7 +68,7 @@ * Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)). * Added support for Zstd codec in SerializableAvroCodecFactory (Java) ([#32349](https://github.com/apache/beam/issues/32349)) -* Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376)) +* Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376)) * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Breaking Changes From b785bd79a8010917c337a52b2c49e904332499d9 Mon Sep 17 00:00:00 2001 From: twosom Date: Fri, 27 Sep 2024 01:21:23 +0900 Subject: [PATCH 5/6] refactor duplicated Write transform --- .../org/apache/beam/sdk/io/mqtt/MqttIO.java | 177 ++++++------------ .../apache/beam/sdk/io/mqtt/MqttIOTest.java | 20 ++ 2 files changed, 76 insertions(+), 121 deletions(-) diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 9d13f57783f1..d0788cad2ae1 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -136,12 +137,16 @@ public static Read read() { .build(); } - public static Write write() { - return new AutoValue_MqttIO_Write.Builder().setRetained(false).build(); + public static Write write() { + return new AutoValue_MqttIO_Write.Builder() + .setRetained(false) + .setPayloadFn(SerializableFunctions.identity()) + .setDynamic(false) + .build(); } - public static DynamicWrite dynamicWrite() { - return new AutoValue_MqttIO_DynamicWrite.Builder().setRetained(false).build(); + public static Write dynamicWrite() { + return new AutoValue_MqttIO_Write.Builder().setRetained(false).setDynamic(true).build(); } private MqttIO() {} @@ -536,14 +541,17 @@ public UnboundedMqttSource getCurrentSource() { } } + /** A {@link PTransform} to write and send a message to a MQTT server. */ @AutoValue - public abstract static class DynamicWrite extends PTransform, PDone> { + public abstract static class Write extends PTransform, PDone> { abstract @Nullable ConnectionConfiguration connectionConfiguration(); abstract @Nullable SerializableFunction topicFn(); abstract @Nullable SerializableFunction payloadFn(); + abstract boolean dynamic(); + abstract boolean retained(); abstract Builder builder(); @@ -558,20 +566,24 @@ abstract static class Builder { abstract Builder setPayloadFn(SerializableFunction payloadFn); - abstract DynamicWrite build(); + abstract Builder setDynamic(boolean dynamic); + + abstract Write build(); } /** Define MQTT connection configuration used to connect to the MQTT broker. */ - public DynamicWrite withConnectionConfiguration(ConnectionConfiguration configuration) { + public Write withConnectionConfiguration(ConnectionConfiguration configuration) { checkArgument(configuration != null, "configuration can not be null"); return builder().setConnectionConfiguration(configuration).build(); } - public DynamicWrite withTopicFn(SerializableFunction topicFn) { + public Write withTopicFn(SerializableFunction topicFn) { + checkArgument(dynamic(), "withTopicFn can not use in non-dynamic write"); return builder().setTopicFn(topicFn).build(); } - public DynamicWrite withPayloadFn(SerializableFunction payloadFn) { + public Write withPayloadFn(SerializableFunction payloadFn) { + checkArgument(dynamic(), "withPayloadFn can not use in non-dynamic write"); return builder().setPayloadFn(payloadFn).build(); } @@ -583,10 +595,9 @@ public DynamicWrite withPayloadFn(SerializableFunction p * incoming message). * * @param retained Whether or not the messaging engine should retain the message. - * @return The {@link DynamicWrite} {@link PTransform} with the corresponding retained - * configuration. + * @return The {@link Write} {@link PTransform} with the corresponding retained configuration. */ - public DynamicWrite withRetained(boolean retained) { + public Write withRetained(boolean retained) { return builder().setRetained(retained).build(); } @@ -599,143 +610,67 @@ public void populateDisplayData(DisplayData.Builder builder) { @Override public PDone expand(PCollection input) { checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null"); - checkArgument( - connectionConfiguration().getTopic() == null, "DynamicWrite can not have static topic"); - checkArgument(topicFn() != null, "topicFn can not be null"); + final SerializableFunction topicFn; + if (dynamic()) { + checkArgument( + connectionConfiguration().getTopic() == null, "DynamicWrite can not have static topic"); + topicFn = topicFn(); + } else { + checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null"); + final String topic = connectionConfiguration().getTopic(); + topicFn = ignore -> topic; + } + + checkArgument(topicFn != null, "topicFn can not be null"); checkArgument(payloadFn() != null, "payloadFn can not be null"); - input.apply(ParDo.of(new DynamicWriteFn<>(this))); + input.apply( + ParDo.of(new WriteFn<>(connectionConfiguration(), topicFn, payloadFn(), retained()))); return PDone.in(input.getPipeline()); } - private static class DynamicWriteFn extends DoFn { - - private final DynamicWrite spec; + private static class WriteFn extends DoFn { + private final ConnectionConfiguration connectionConfiguration; private final SerializableFunction topicFn; - private final SerializableFunction paylaodFn; + private final SerializableFunction payloadFn; + private final boolean retained; private transient MQTT client; private transient BlockingConnection connection; - public DynamicWriteFn(DynamicWrite spec) { - this.spec = spec; - this.topicFn = spec.topicFn(); - this.paylaodFn = spec.payloadFn(); + public WriteFn( + ConnectionConfiguration connectionConfiguration, + SerializableFunction topicFn, + SerializableFunction payloadFn, + boolean retained) { + this.connectionConfiguration = connectionConfiguration; + this.topicFn = topicFn; + this.payloadFn = payloadFn; + this.retained = retained; } @Setup public void createMqttClient() throws Exception { LOG.debug("Starting MQTT writer"); - client = spec.connectionConfiguration().createClient(); + this.client = this.connectionConfiguration.createClient(); LOG.debug("MQTT writer client ID is {}", client.getClientId()); - connection = client.blockingConnection(); - connection.connect(); + this.connection = createConnection(client); } @ProcessElement public void processElement(ProcessContext context) throws Exception { InputT element = context.element(); - byte[] payload = this.paylaodFn.apply(element); + byte[] payload = this.payloadFn.apply(element); String topic = this.topicFn.apply(element); LOG.debug("Sending message {}", new String(payload, StandardCharsets.UTF_8)); - connection.publish(topic, payload, QoS.AT_LEAST_ONCE, spec.retained()); + this.connection.publish(topic, payload, QoS.AT_LEAST_ONCE, this.retained); } @Teardown public void closeMqttClient() throws Exception { - if (connection != null) { + if (this.connection != null) { LOG.debug("Disconnecting MQTT connection (client ID {})", client.getClientId()); - connection.disconnect(); - } - } - } - } - - /** A {@link PTransform} to write and send a message to a MQTT server. */ - @AutoValue - public abstract static class Write extends PTransform, PDone> { - - abstract @Nullable ConnectionConfiguration connectionConfiguration(); - - abstract boolean retained(); - - abstract Builder builder(); - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setConnectionConfiguration(ConnectionConfiguration configuration); - - abstract Builder setRetained(boolean retained); - - abstract Write build(); - } - - /** Define MQTT connection configuration used to connect to the MQTT broker. */ - public Write withConnectionConfiguration(ConnectionConfiguration configuration) { - checkArgument(configuration != null, "configuration can not be null"); - return builder().setConnectionConfiguration(configuration).build(); - } - - /** - * Whether or not the publish message should be retained by the messaging engine. Sending a - * message with the retained set to {@code false} will clear the retained message from the - * server. The default value is {@code false}. When a subscriber connects, it gets the latest - * retained message (else it doesn't get any existing message, it will have to wait a new - * incoming message). - * - * @param retained Whether or not the messaging engine should retain the message. - * @return The {@link Write} {@link PTransform} with the corresponding retained configuration. - */ - public Write withRetained(boolean retained) { - return builder().setRetained(retained).build(); - } - - @Override - public PDone expand(PCollection input) { - checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null"); - checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null"); - input.apply(ParDo.of(new WriteFn(this))); - return PDone.in(input.getPipeline()); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - connectionConfiguration().populateDisplayData(builder); - builder.add(DisplayData.item("retained", retained())); - } - - private static class WriteFn extends DoFn { - - private final Write spec; - - private transient MQTT client; - private transient BlockingConnection connection; - - public WriteFn(Write spec) { - this.spec = spec; - } - - @Setup - public void createMqttClient() throws Exception { - LOG.debug("Starting MQTT writer"); - client = spec.connectionConfiguration().createClient(); - LOG.debug("MQTT writer client ID is {}", client.getClientId()); - connection = createConnection(client); - } - - @ProcessElement - public void processElement(ProcessContext context) throws Exception { - byte[] payload = context.element(); - LOG.debug("Sending message {}", new String(payload, StandardCharsets.UTF_8)); - connection.publish( - spec.connectionConfiguration().getTopic(), payload, QoS.AT_LEAST_ONCE, false); - } - - @Teardown - public void closeMqttClient() throws Exception { - if (connection != null) { - LOG.debug("Disconnecting MQTT connection (client ID {})", client.getClientId()); - connection.disconnect(); + this.connection.disconnect(); } } } diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index ee01d9bd6ffe..8dfa7838d66a 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -378,6 +378,8 @@ public void testReadHaveNoTopic() { .expand(PBegin.in(pipeline))); assertEquals("topic can not be null", exception.getMessage()); + + pipeline.run(); } @Test @@ -466,6 +468,24 @@ public void testDynamicWriteHaveStaticTopic() { pipeline.run(); } + @Test + public void testWriteWithTopicFn() { + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, () -> MqttIO.write().withTopicFn(e -> "some topic")); + + assertEquals("withTopicFn can not use in non-dynamic write", exception.getMessage()); + } + + @Test + public void testWriteWithPayloadFn() { + final IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, () -> MqttIO.write().withPayloadFn(e -> new byte[] {})); + + assertEquals("withPayloadFn can not use in non-dynamic write", exception.getMessage()); + } + @Test public void testReadObject() throws Exception { ByteArrayOutputStream bos = new ByteArrayOutputStream(); From 1d91ec0c419140084866681780752713597c07af Mon Sep 17 00:00:00 2001 From: twosom Date: Fri, 27 Sep 2024 23:47:28 +0900 Subject: [PATCH 6/6] change WriteFn to use Write spec --- .../org/apache/beam/sdk/io/mqtt/MqttIO.java | 34 ++++++++----------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index d0788cad2ae1..e1868e2c8461 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -610,27 +610,22 @@ public void populateDisplayData(DisplayData.Builder builder) { @Override public PDone expand(PCollection input) { checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null"); - final SerializableFunction topicFn; if (dynamic()) { checkArgument( connectionConfiguration().getTopic() == null, "DynamicWrite can not have static topic"); - topicFn = topicFn(); + checkArgument(topicFn() != null, "topicFn can not be null"); } else { checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null"); - final String topic = connectionConfiguration().getTopic(); - topicFn = ignore -> topic; } - - checkArgument(topicFn != null, "topicFn can not be null"); checkArgument(payloadFn() != null, "payloadFn can not be null"); - input.apply( - ParDo.of(new WriteFn<>(connectionConfiguration(), topicFn, payloadFn(), retained()))); + input.apply(ParDo.of(new WriteFn<>(this))); return PDone.in(input.getPipeline()); } private static class WriteFn extends DoFn { - private final ConnectionConfiguration connectionConfiguration; + + private final Write spec; private final SerializableFunction topicFn; private final SerializableFunction payloadFn; private final boolean retained; @@ -638,21 +633,22 @@ private static class WriteFn extends DoFn { private transient MQTT client; private transient BlockingConnection connection; - public WriteFn( - ConnectionConfiguration connectionConfiguration, - SerializableFunction topicFn, - SerializableFunction payloadFn, - boolean retained) { - this.connectionConfiguration = connectionConfiguration; - this.topicFn = topicFn; - this.payloadFn = payloadFn; - this.retained = retained; + public WriteFn(Write spec) { + this.spec = spec; + if (spec.dynamic()) { + this.topicFn = spec.topicFn(); + } else { + String topic = spec.connectionConfiguration().getTopic(); + this.topicFn = ignore -> topic; + } + this.payloadFn = spec.payloadFn(); + this.retained = spec.retained(); } @Setup public void createMqttClient() throws Exception { LOG.debug("Starting MQTT writer"); - this.client = this.connectionConfiguration.createClient(); + this.client = this.spec.connectionConfiguration().createClient(); LOG.debug("MQTT writer client ID is {}", client.getClientId()); this.connection = createConnection(client); }