Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solace Read connector: UnboundedSource and UnboundedReader #31636

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ dependencies {
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation library.java.solace
implementation project(":sdks:java:extensions:avro")
implementation library.java.avro
permitUnusedDeclared library.java.avro
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.IOException;

/**
* Interface for receiving messages from a Solace broker.
*
* <p>Implementations of this interface are responsible for managing the connection to the broker
* and for receiving messages from the broker.
*/
public interface MessageReceiver {
/**
* Starts the message receiver.
*
* <p>This method is called in the {@link
* org.apache.beam.sdk.io.solace.read.UnboundedSolaceReader#start()} method.
*/
void start();

/**
* Returns {@literal true} if the message receiver is closed, {@literal false} otherwise.
*
* <p>A message receiver is closed when it is no longer able to receive messages.
*/
boolean isClosed();

/**
* Receives a message from the broker.
*
* <p>This method will block until a message is received.
*/
BytesXMLMessage receive() throws IOException;

/**
* Test clients may return {@literal true} to signal that all expected messages have been pulled
* and the test may complete. Real clients should always return {@literal false}.
*/
default boolean isEOF() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.solacesystems.jcsmp.Queue;
import java.io.IOException;
import java.io.Serializable;

/**
* This interface defines methods for interacting with a Solace message broker using the Solace
* Element Management Protocol (SEMP). SEMP provides a way to manage and monitor various aspects of
* the broker, including queues and topics.
*/
public interface SempClient extends Serializable {

/**
* Determines if the specified queue is non-exclusive. In Solace, non-exclusive queues allow
* multiple consumers to receive messages from the queue.
*/
boolean isQueueNonExclusive(String queueName) throws IOException;

/**
* This is only called when a user requests to read data from a topic. This method creates a new
* queue on the Solace broker and associates it with the specified topic. This ensures that
* messages published to the topic are delivered to the queue, allowing consumers to receive them.
*/
Queue createQueueForTopic(String queueName, String topicName) throws IOException;

/**
* Retrieves the size of the backlog (in bytes) for the specified queue. The backlog represents
* the amount of data in messages that are waiting to be delivered to consumers.
*/
long getBacklogBytes(String queueName) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,13 @@
* This interface serves as a blueprint for creating SempClient objects, which are used to interact
* with a Solace message broker using the Solace Element Management Protocol (SEMP).
*/
public interface SempClientFactory extends Serializable {}
public interface SempClientFactory extends Serializable {

/**
* This method is the core of the factory interface. It defines how to construct and return a
* SempClient object. Implementations of this interface will provide the specific logic for
* creating a client instance, which might involve connecting to the broker, handling
* authentication, and configuring other settings.
*/
SempClient create();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import java.io.Serializable;

/**
* The SessionService interface provides a set of methods for managing a session with the Solace
* messaging system. It allows for establishing a connection, creating a message-receiver object,
* checking if the connection is closed or not, and gracefully closing the session.
*/
public interface SessionService extends Serializable {

/**
* Establishes a connection to the service. This could involve providing connection details like
* host, port, VPN name, username, and password.
*/
void connect();

/** Gracefully closes the connection to the service. */
void close();

/**
* Checks whether the connection to the service is currently closed. This method is called when an
* `UnboundedSolaceReader` is starting to read messages - a session will be created if this
* returns true.
*/
boolean isClosed();

/**
* Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is
* created from the session instance.
*/
MessageReceiver createReceiver();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,11 @@
* This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a
* queue property and mandates the implementation of a create() method in concrete subclasses.
*/
public abstract class SessionServiceFactory implements Serializable {}
public abstract class SessionServiceFactory implements Serializable {

/**
* This is the core method that subclasses must implement. It defines how to construct and return
* a SessionService object.
*/
public abstract SessionService create();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Checkpoint for an unbounded Solace source. Consists of the Solace messages waiting to be
* acknowledged.
*/
@DefaultCoder(AvroCoder.class)
class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
@Internal
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
private transient AtomicBoolean activeReader;
// BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry
// these messages here. We relay on Solace's retry mechanism.
Expand Down
Loading
Loading