Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solace Read connector: adding Basic Authentication support #31541

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.FlowReceiver;
import com.solacesystems.jcsmp.InvalidPropertiesException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Queue;
import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;

/**
* A class that manages a connection to a Solace broker using basic authentication.
*
* <p>This class provides a way to connect to a Solace broker and receive messages from a queue. The
* connection is established using basic authentication.
damondouglas marked this conversation as resolved.
Show resolved Hide resolved
*/
public class BasicAuthJcsmpSessionService implements SessionService {
private final String queueName;
private final String host;
private final String username;
private final String password;
private final String vpnName;
@Nullable private JCSMPSession jcsmpSession;
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();

/**
* Creates a new {@link BasicAuthJcsmpSessionService} with the given parameters.
*
* @param queueName The name of the queue to receive messages from.
* @param host The host name or IP address of the Solace broker. Format: Host[:Port]
* @param username The username to use for authentication.
* @param password The password to use for authentication.
* @param vpnName The name of the VPN to connect to.
*/
public BasicAuthJcsmpSessionService(
String queueName, String host, String username, String password, String vpnName) {
this.queueName = queueName;
this.host = host;
this.username = username;
this.password = password;
this.vpnName = vpnName;
}

@Override
public void connect() {
retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class));
}

@Override
public void close() {
if (isClosed()) {
return;
}
retryCallableManager.retryCallable(
() -> {
checkStateNotNull(jcsmpSession).closeSession();
return 0;
},
ImmutableSet.of(IOException.class));
}

@Override
public MessageReceiver createReceiver() {
return retryCallableManager.retryCallable(
this::createFlowReceiver, ImmutableSet.of(JCSMPException.class));
}

@Override
public boolean isClosed() {
return jcsmpSession == null || jcsmpSession.isClosed();
}

private MessageReceiver createFlowReceiver() throws JCSMPException, IOException {
if (isClosed()) {
connectSession();
}

Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName);

ConsumerFlowProperties flowProperties = new ConsumerFlowProperties();
flowProperties.setEndpoint(queue);
flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);

EndpointProperties endpointProperties = new EndpointProperties();
endpointProperties.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE);
if (jcsmpSession != null) {
return new SolaceMessageReceiver(
createFlowReceiver(jcsmpSession, flowProperties, endpointProperties));
}
throw new IOException(
"SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null.");
}

// The `@SuppressWarning` is needed here, because the checkerframework reports an error for the
// first argument of the `createFlow` being null, even though the documentation allows it:
// https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPSession.html#createFlow-com.solacesystems.jcsmp.XMLMessageListener-com.solacesystems.jcsmp.ConsumerFlowProperties-com.solacesystems.jcsmp.EndpointProperties-
@SuppressWarnings("nullness")
damondouglas marked this conversation as resolved.
Show resolved Hide resolved
private static FlowReceiver createFlowReceiver(
JCSMPSession jcsmpSession,
damondouglas marked this conversation as resolved.
Show resolved Hide resolved
ConsumerFlowProperties flowProperties,
EndpointProperties endpointProperties)
throws JCSMPException {
return jcsmpSession.createFlow(null, flowProperties, endpointProperties);
}

private int connectSession() throws JCSMPException {
if (jcsmpSession == null) {
jcsmpSession = createSessionObject();
}
jcsmpSession.connect();
return 0;
}

private JCSMPSession createSessionObject() throws InvalidPropertiesException {
JCSMPProperties properties = new JCSMPProperties();
properties.setProperty(JCSMPProperties.HOST, host);
properties.setProperty(JCSMPProperties.USERNAME, username);
properties.setProperty(JCSMPProperties.PASSWORD, password);
properties.setProperty(JCSMPProperties.VPN_NAME, vpnName);

return JCSMPFactory.onlyInstance().createSession(properties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;

/**
* A factory for creating {@link BasicAuthJcsmpSessionService} instances. Extends {@link
* SessionServiceFactory}.
*
damondouglas marked this conversation as resolved.
Show resolved Hide resolved
* <p>This factory provides a way to create {@link BasicAuthJcsmpSessionService} instances with
* authenticate to Solace with Basic Authentication.
*/
@AutoValue
public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory {
public abstract String host();

public abstract String username();

public abstract String password();

public abstract String vpnName();

public static Builder builder() {
return new AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder();
}

@AutoValue.Builder
public abstract static class Builder {

/**
* Set Solace host, format: Host[:Port] e.g. "12.34.56.78", or "[fe80::1]", or
* "12.34.56.78:4444".
*/
public abstract Builder host(String host);

/** Set Solace username. */
public abstract Builder username(String username);
/** Set Solace password. */
public abstract Builder password(String password);

/** Set Solace vpn name. */
public abstract Builder vpnName(String vpnName);

public abstract BasicAuthJcsmpSessionServiceFactory build();
}

@Override
public SessionService create() {
return new BasicAuthJcsmpSessionService(
checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(),
host(),
username(),
password(),
vpnName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.broker;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.FlowReceiver;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.StaleSessionException;
import java.io.IOException;
import org.apache.beam.sdk.io.solace.RetryCallableManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolaceMessageReceiver implements MessageReceiver {
private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageReceiver.class);

public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100;
private final FlowReceiver flowReceiver;
private final RetryCallableManager retryCallableManager = RetryCallableManager.create();

public SolaceMessageReceiver(FlowReceiver flowReceiver) {
this.flowReceiver = flowReceiver;
}

@Override
public void start() {
startFlowReceiver();
}

private void startFlowReceiver() {
retryCallableManager.retryCallable(
() -> {
flowReceiver.start();
return 0;
},
ImmutableSet.of(JCSMPException.class));
}

@Override
public boolean isClosed() {
return flowReceiver == null || flowReceiver.isClosed();
}

@Override
public BytesXMLMessage receive() throws IOException {
try {
return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS);
} catch (StaleSessionException e) {
LOG.warn("SolaceIO: Caught StaleSessionException, restarting the FlowReceiver.");
startFlowReceiver();
throw new IOException(e);
} catch (JCSMPException e) {
throw new IOException(e);
}
}
}
Loading