diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index fbf096abd22f..b33b8fb18027 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -32,7 +32,10 @@ dependencies { implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) implementation library.java.vendored_guava_32_1_2_jre implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.slf4j_api implementation library.java.joda_time implementation library.java.solace implementation project(":sdks:java:extensions:avro") + implementation library.java.avro + permitUnusedDeclared library.java.avro } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java new file mode 100644 index 000000000000..199a83e322bd --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import java.io.IOException; + +/** + * Interface for receiving messages from a Solace broker. + * + *
Implementations of this interface are responsible for managing the connection to the broker + * and for receiving messages from the broker. + */ +public interface MessageReceiver { + /** + * Starts the message receiver. + * + *
This method is called in the {@link + * org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader#start()} method. + */ + void start(); + + /** + * Returns {@literal true} if the message receiver is closed, {@literal false} otherwise. + * + *
A message receiver is closed when it is no longer able to receive messages. + */ + boolean isClosed(); + + /** + * Receives a message from the broker. + * + *
This method will block until a message is received.
+ */
+ BytesXMLMessage receive() throws IOException;
+
+ /**
+ * Test clients may return {@literal true} to signal that all expected messages have been pulled
+ * and the test may complete. Real clients should always return {@literal false}.
+ */
+ default boolean isEOF() {
+ return false;
+ }
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java
new file mode 100644
index 000000000000..465f37c14036
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClient.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.solacesystems.jcsmp.Queue;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * This interface defines methods for interacting with a Solace message broker using the Solace
+ * Element Management Protocol (SEMP). SEMP provides a way to manage and monitor various aspects of
+ * the broker, including queues and topics.
+ */
+public interface SempClient extends Serializable {
+
+ /**
+ * Determines if the specified queue is non-exclusive. In Solace, non-exclusive queues allow
+ * multiple consumers to receive messages from the queue.
+ */
+ boolean isQueueNonExclusive(String queueName) throws IOException;
+
+ /**
+ * This is only called when a user requests to read data from a topic. This method creates a new
+ * queue on the Solace broker and associates it with the specified topic. This ensures that
+ * messages published to the topic are delivered to the queue, allowing consumers to receive them.
+ */
+ Queue createQueueForTopic(String queueName, String topicName) throws IOException;
+
+ /**
+ * Retrieves the size of the backlog (in bytes) for the specified queue. The backlog represents
+ * the amount of data in messages that are waiting to be delivered to consumers.
+ */
+ long getBacklogBytes(String queueName) throws IOException;
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java
index b5cb53e14b39..79f690fde175 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempClientFactory.java
@@ -23,4 +23,13 @@
* This interface serves as a blueprint for creating SempClient objects, which are used to interact
* with a Solace message broker using the Solace Element Management Protocol (SEMP).
*/
-public interface SempClientFactory extends Serializable {}
+public interface SempClientFactory extends Serializable {
+
+ /**
+ * This method is the core of the factory interface. It defines how to construct and return a
+ * SempClient object. Implementations of this interface will provide the specific logic for
+ * creating a client instance, which might involve connecting to the broker, handling
+ * authentication, and configuring other settings.
+ */
+ SempClient create();
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
new file mode 100644
index 000000000000..cd368865f0c3
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import java.io.Serializable;
+
+/**
+ * The SessionService interface provides a set of methods for managing a session with the Solace
+ * messaging system. It allows for establishing a connection, creating a message-receiver object,
+ * checking if the connection is closed or not, and gracefully closing the session.
+ */
+public interface SessionService extends Serializable {
+
+ /**
+ * Establishes a connection to the service. This could involve providing connection details like
+ * host, port, VPN name, username, and password.
+ */
+ void connect();
+
+ /** Gracefully closes the connection to the service. */
+ void close();
+
+ /**
+ * Checks whether the connection to the service is currently closed. This method is called when an
+ * `UnboundedSolaceReader` is starting to read messages - a session will be created if this
+ * returns true.
+ */
+ boolean isClosed();
+
+ /**
+ * Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is
+ * created from the session instance.
+ */
+ MessageReceiver createReceiver();
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
index ab1f55ae7a9c..9b4ef99eba77 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
@@ -23,4 +23,11 @@
* This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a
* queue property and mandates the implementation of a create() method in concrete subclasses.
*/
-public abstract class SessionServiceFactory implements Serializable {}
+public abstract class SessionServiceFactory implements Serializable {
+
+ /**
+ * This is the core method that subclasses must implement. It defines how to construct and return
+ * a SessionService object.
+ */
+ public abstract SessionService create();
+}
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
index f429df3f8cd1..77f6eed8f62c 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
@@ -23,9 +23,11 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
@@ -33,7 +35,9 @@
* acknowledged.
*/
@DefaultCoder(AvroCoder.class)
-class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
+@Internal
+@VisibleForTesting
+public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private transient AtomicBoolean activeReader;
// BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry
// these messages here. We relay on Solace's retry mechanism.
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
new file mode 100644
index 000000000000..0155345a2323
--- /dev/null
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.read;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.io.solace.broker.MessageReceiver;
+import org.apache.beam.sdk.io.solace.broker.SempClient;
+import org.apache.beam.sdk.io.solace.broker.SessionService;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Unbounded Reader to read messages from a Solace Router. */
+@VisibleForTesting
+class UnboundedSolaceReader