From 5008fc6a2e3a737b36a0eee21adafcd02b8f9b19 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Fri, 30 Aug 2024 16:28:19 -0400 Subject: [PATCH] External Mqtt source and sink --- sdks/java/io/expansion-service/build.gradle | 2 + .../org/apache/beam/sdk/io/mqtt/MqttIO.java | 7 + .../mqtt/MqttReadSchemaTransformProvider.java | 136 +++++++++++++++ .../MqttWriteSchemaTransformProvider.java | 123 ++++++++++++++ .../mqtt/MqttSchemaTransformProviderTest.java | 157 ++++++++++++++++++ sdks/standard_external_transforms.yaml | 44 ++++- 6 files changed, 468 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java create mode 100644 sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java create mode 100644 sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 37ee03eba00d..6187c6829440 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -43,6 +43,8 @@ dependencies { permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761 implementation project(":sdks:java:io:kafka:upgrade") permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761 + implementation project(":sdks:java:io:mqtt") + permitUnusedDeclared project(":sdks:java:io:mqtt") // BEAM-11761 // **** IcebergIO runtime dependencies **** runtimeOnly library.java.hadoop_client diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 8b7f0991c2dd..163ba077f31d 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -35,6 +35,9 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -120,13 +123,17 @@ public static Write write() { private MqttIO() {} /** A POJO describing a MQTT connection. */ + @DefaultSchema(AutoValueSchema.class) @AutoValue public abstract static class ConnectionConfiguration implements Serializable { + @SchemaFieldDescription("The MQTT broker URI.") abstract String getServerUri(); + @SchemaFieldDescription("The MQTT topic pattern.") abstract String getTopic(); + @SchemaFieldDescription("The client ID prefix, which is used to construct a unique client ID.") abstract @Nullable String getClientId(); abstract @Nullable String getUsername(); diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java new file mode 100644 index 000000000000..b05b18e1f8df --- /dev/null +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttReadSchemaTransformProvider.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mqtt; + +import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration; +import static org.apache.beam.sdk.io.mqtt.MqttReadSchemaTransformProvider.ReadConfiguration; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.joda.time.Duration; + +@AutoService(SchemaTransformProvider.class) +public class MqttReadSchemaTransformProvider + extends TypedSchemaTransformProvider { + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class ReadConfiguration implements Serializable { + public static Builder builder() { + return new AutoValue_MqttReadSchemaTransformProvider_ReadConfiguration.Builder(); + } + + @SchemaFieldDescription("Configuration options to set up the MQTT connection.") + public abstract ConnectionConfiguration getConnectionConfiguration(); + + @SchemaFieldDescription( + "The max number of records to receive. Setting this will result in a bounded PCollection.") + @Nullable + public abstract Long getMaxNumRecords(); + + @SchemaFieldDescription( + "The maximum time for this source to read messages. Setting this will result in a bounded PCollection.") + @Nullable + public abstract Long getMaxReadTimeSeconds(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setConnectionConfiguration( + ConnectionConfiguration connectionConfiguration); + + public abstract Builder setMaxNumRecords(Long maxNumRecords); + + public abstract Builder setMaxReadTimeSeconds(Long maxReadTimeSeconds); + + public abstract ReadConfiguration build(); + } + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:mqtt_read:v1"; + } + + @Override + protected SchemaTransform from(ReadConfiguration configuration) { + return new MqttReadSchemaTransform(configuration); + } + + private static class MqttReadSchemaTransform extends SchemaTransform { + private final ReadConfiguration config; + + MqttReadSchemaTransform(ReadConfiguration configuration) { + this.config = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Preconditions.checkState( + input.getAll().isEmpty(), + "Expected zero input PCollections for this source, but found: %", + input.getAll().keySet()); + + MqttIO.Read readTransform = + MqttIO.read().withConnectionConfiguration(config.getConnectionConfiguration()); + + Long maxRecords = config.getMaxNumRecords(); + Long maxReadTime = config.getMaxReadTimeSeconds(); + if (maxRecords != null) { + readTransform = readTransform.withMaxNumRecords(maxRecords); + } + if (maxReadTime != null) { + readTransform = readTransform.withMaxReadTime(Duration.standardSeconds(maxReadTime)); + } + + Schema outputSchema = Schema.builder().addByteArrayField("bytes").build(); + + PCollection outputRows = + input + .getPipeline() + .apply(readTransform) + .apply( + "Wrap in Beam Rows", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element byte[] data, OutputReceiver outputReceiver) { + outputReceiver.output( + Row.withSchema(outputSchema).addValue(data).build()); + } + })) + .setRowSchema(outputSchema); + + return PCollectionRowTuple.of("output", outputRows); + } + } +} diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java new file mode 100644 index 000000000000..08af47073cc5 --- /dev/null +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttWriteSchemaTransformProvider.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mqtt; + +import static org.apache.beam.sdk.io.mqtt.MqttIO.ConnectionConfiguration; +import static org.apache.beam.sdk.io.mqtt.MqttWriteSchemaTransformProvider.WriteConfiguration; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; + +@AutoService(SchemaTransformProvider.class) +public class MqttWriteSchemaTransformProvider + extends TypedSchemaTransformProvider { + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class WriteConfiguration implements Serializable { + public static Builder builder() { + return new AutoValue_MqttWriteSchemaTransformProvider_WriteConfiguration.Builder(); + } + + @SchemaFieldDescription("Configuration options to set up the MQTT connection.") + public abstract ConnectionConfiguration getConnectionConfiguration(); + + @SchemaFieldDescription( + "Whether or not the publish message should be retained by the messaging engine. " + + "When a subscriber connects, it gets the latest retained message. " + + "Defaults to `False`, which will clear the retained message from the server.") + @Nullable + public abstract Boolean getRetained(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setConnectionConfiguration(ConnectionConfiguration foo); + + public abstract Builder setRetained(Boolean retained); + + public abstract WriteConfiguration build(); + } + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:mqtt_write:v1"; + } + + @Override + protected SchemaTransform from(WriteConfiguration configuration) { + return new MqttWriteSchemaTransform(configuration); + } + + private static class MqttWriteSchemaTransform extends SchemaTransform { + private final WriteConfiguration config; + + MqttWriteSchemaTransform(WriteConfiguration configuration) { + this.config = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + PCollection inputRows = input.getSinglePCollection(); + + Preconditions.checkState( + inputRows.getSchema().getFieldCount() == 1 + && inputRows.getSchema().getField(0).getType().equals(Schema.FieldType.BYTES), + "Expected only one Schema field containing bytes, but instead received: %s", + inputRows.getSchema()); + + MqttIO.Write writeTransform = + MqttIO.write().withConnectionConfiguration(config.getConnectionConfiguration()); + Boolean retained = config.getRetained(); + if (retained != null) { + writeTransform = writeTransform.withRetained(retained); + } + + inputRows + .apply( + "Extract bytes", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element Row row, OutputReceiver outputReceiver) { + outputReceiver.output( + org.apache.beam.sdk.util.Preconditions.checkStateNotNull( + row.getBytes(0))); + } + })) + .apply(writeTransform); + + return PCollectionRowTuple.empty(inputRows.getPipeline()); + } + } +} diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java new file mode 100644 index 000000000000..fbb42c047fd3 --- /dev/null +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttSchemaTransformProviderTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mqtt; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import org.apache.activemq.broker.BrokerService; +import org.apache.beam.sdk.io.common.NetworkTestHelper; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.fusesource.hawtbuf.Buffer; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.Message; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MqttSchemaTransformProviderTest { + private static final Logger LOG = LoggerFactory.getLogger(MqttSchemaTransformProviderTest.class); + + private BrokerService brokerService; + + private int port; + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Before + public void startBroker() throws Exception { + port = NetworkTestHelper.getAvailableLocalPort(); + LOG.info("Starting ActiveMQ brokerService on {}", port); + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + // use memory persistence for the test: it's faster and don't pollute test folder with KahaDB + brokerService.setPersistent(false); + brokerService.addConnector("mqtt://localhost:" + port); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @Test(timeout = 30 * 1000) + public void testReceiveWithTimeoutAndNoData() { + MqttReadSchemaTransformProvider.ReadConfiguration readConfiguration = + MqttReadSchemaTransformProvider.ReadConfiguration.builder() + .setConnectionConfiguration( + MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "READ_TOPIC") + .withClientId("READ_PIPELINE")) + .setMaxReadTimeSeconds(2L) + .build(); + + PCollectionRowTuple.empty(pipeline) + .apply(new MqttReadSchemaTransformProvider().from(readConfiguration)); + + // should stop before the test timeout + pipeline.run().waitUntilFinish(); + } + + @Test + public void testWrite() throws Exception { + final int numberOfTestMessages = 200; + MQTT client = new MQTT(); + client.setHost("tcp://localhost:" + port); + final BlockingConnection connection = client.blockingConnection(); + connection.connect(); + connection.subscribe(new Topic[] {new Topic(Buffer.utf8("WRITE_TOPIC"), QoS.EXACTLY_ONCE)}); + + final Set messages = new ConcurrentSkipListSet<>(); + + Thread subscriber = + new Thread( + () -> { + try { + for (int i = 0; i < numberOfTestMessages; i++) { + Message message = connection.receive(); + messages.add(new String(message.getPayload(), StandardCharsets.UTF_8)); + message.ack(); + System.out.println( + "message: " + new String(message.getPayload(), StandardCharsets.UTF_8)); + } + } catch (Exception e) { + LOG.error("Can't receive message", e); + } + }); + subscriber.start(); + + ArrayList data = new ArrayList<>(); + for (int i = 0; i < numberOfTestMessages; i++) { + data.add(("Test " + i).getBytes(StandardCharsets.UTF_8)); + } + + MqttWriteSchemaTransformProvider.WriteConfiguration writeConfiguration = + MqttWriteSchemaTransformProvider.WriteConfiguration.builder() + .setConnectionConfiguration( + MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, "WRITE_TOPIC")) + .build(); + Schema dataSchema = Schema.builder().addByteArrayField("bytes").build(); + + PCollection inputRows = + pipeline + .apply(Create.of(data)) + .apply( + MapElements.into(TypeDescriptors.rows()) + .via(d -> Row.withSchema(dataSchema).addValue(d).build())) + .setRowSchema(dataSchema); + PCollectionRowTuple.of("input", inputRows) + .apply(new MqttWriteSchemaTransformProvider().from(writeConfiguration)); + pipeline.run(); + subscriber.join(); + + connection.disconnect(); + + assertEquals(numberOfTestMessages, messages.size()); + for (int i = 0; i < numberOfTestMessages; i++) { + assertTrue(messages.contains("Test " + i)); + } + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + brokerService = null; + } + } +} diff --git a/sdks/standard_external_transforms.yaml b/sdks/standard_external_transforms.yaml index 05aa3c9b9de5..c31ae2c1ce4f 100644 --- a/sdks/standard_external_transforms.yaml +++ b/sdks/standard_external_transforms.yaml @@ -19,7 +19,7 @@ # configuration in /sdks/standard_expansion_services.yaml. # Refer to gen_xlang_wrappers.py for more info. # -# Last updated on: 2024-04-18 +# Last updated on: 2024-08-30 - default_service: sdks:java:io:expansion-service:shadowJar description: 'Outputs a PCollection of Beam Rows, each containing a single INT64 @@ -51,3 +51,45 @@ type: numpy.int64 identifier: beam:schematransform:org.apache.beam:generate_sequence:v1 name: GenerateSequence +- default_service: sdks:java:io:expansion-service:shadowJar + description: '' + destinations: + python: apache_beam/io + fields: + connection_configuration: + description: Configuration options to set up the MQTT connection. + nullable: false + type: Row(client_id=typing.Union[str, NoneType], password=typing.Union[str, + NoneType], server_uri=, topic=, username=typing.Union[str, + NoneType]) + max_num_records: + description: The max number of records to receive. Setting this will result + in a bounded PCollection. + nullable: true + type: numpy.int64 + max_read_time_seconds: + description: The maximum time for this source to read messages. Setting this + will result in a bounded PCollection. + nullable: true + type: numpy.int64 + identifier: beam:schematransform:org.apache.beam:mqtt_read:v1 + name: MqttRead +- default_service: sdks:java:io:expansion-service:shadowJar + description: '' + destinations: + python: apache_beam/io + fields: + connection_configuration: + description: Configuration options to set up the MQTT connection. + nullable: false + type: Row(client_id=typing.Union[str, NoneType], password=typing.Union[str, + NoneType], server_uri=, topic=, username=typing.Union[str, + NoneType]) + retained: + description: Whether or not the publish message should be retained by the messaging + engine. When a subscriber connects, it gets the latest retained message. Defaults + to `False`, which will clear the retained message from the server. + nullable: true + type: bool + identifier: beam:schematransform:org.apache.beam:mqtt_write:v1 + name: MqttWrite