From 5f4a9563c89ac21d06d832f50b201560708d316a Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Mon, 24 Jun 2024 19:57:56 +0200 Subject: [PATCH 1/3] Add support for BasicAuth to Solace --- .../broker/BasicAuthJcsmpSessionService.java | 148 ++++++++++++++++++ .../BasicAuthJcsmpSessionServiceFactory.java | 73 +++++++++ .../solace/broker/SolaceMessageReceiver.java | 72 +++++++++ 3 files changed, 293 insertions(+) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java new file mode 100644 index 000000000000..9246ccae0177 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.ConsumerFlowProperties; +import com.solacesystems.jcsmp.EndpointProperties; +import com.solacesystems.jcsmp.FlowReceiver; +import com.solacesystems.jcsmp.InvalidPropertiesException; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPProperties; +import com.solacesystems.jcsmp.JCSMPSession; +import com.solacesystems.jcsmp.Queue; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.solace.RetryCallableManager; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; + +/** + * A class that manages a connection to a Solace broker using basic authentication. + * + *

This class provides a way to connect to a Solace broker and receive messages from a queue. The + * connection is established using basic authentication. + */ +public class BasicAuthJcsmpSessionService implements SessionService { + private final String queueName; + private final String host; + private final String username; + private final String password; + private final String vpnName; + @Nullable private JCSMPSession jcsmpSession; + private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); + + /** + * Creates a new {@link BasicAuthJcsmpSessionService} with the given parameters. + * + * @param queueName The name of the queue to receive messages from. + * @param host The host name or IP address of the Solace broker. Format: Host[:Port] + * @param username The username to use for authentication. + * @param password The password to use for authentication. + * @param vpnName The name of the VPN to connect to. + */ + public BasicAuthJcsmpSessionService( + String queueName, String host, String username, String password, String vpnName) { + this.queueName = queueName; + this.host = host; + this.username = username; + this.password = password; + this.vpnName = vpnName; + } + + @Override + public void connect() { + retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class)); + } + + @Override + public void close() { + if (jcsmpSession != null && !jcsmpSession.isClosed()) { + retryCallableManager.retryCallable( + () -> { + if (jcsmpSession != null) { + jcsmpSession.closeSession(); + } + return 0; + }, + ImmutableSet.of(IOException.class)); + } + } + + @Override + public MessageReceiver createReceiver() { + return retryCallableManager.retryCallable( + this::createFlowReceiver, ImmutableSet.of(JCSMPException.class)); + } + + @Override + public boolean isClosed() { + return jcsmpSession == null || jcsmpSession.isClosed(); + } + + private MessageReceiver createFlowReceiver() throws JCSMPException, IOException { + if (isClosed()) { + connectSession(); + } + + Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName); + + ConsumerFlowProperties flowProperties = new ConsumerFlowProperties(); + flowProperties.setEndpoint(queue); + flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT); + + EndpointProperties endpointProperties = new EndpointProperties(); + endpointProperties.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE); + if (jcsmpSession != null) { + return new SolaceMessageReceiver( + createFlowReceiver(jcsmpSession, flowProperties, endpointProperties)); + } else { + throw new IOException( + "SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null."); + } + } + + // The `@SuppressWarning` is needed here, because the checkerframework reports an error for the + // first argument of the `createFlow` being null, even though the documentation allows it: + // https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/com/solacesystems/jcsmp/JCSMPSession.html#createFlow-com.solacesystems.jcsmp.XMLMessageListener-com.solacesystems.jcsmp.ConsumerFlowProperties-com.solacesystems.jcsmp.EndpointProperties- + @SuppressWarnings("nullness") + private static FlowReceiver createFlowReceiver( + JCSMPSession jcsmpSession, + ConsumerFlowProperties flowProperties, + EndpointProperties endpointProperties) + throws JCSMPException { + return jcsmpSession.createFlow(null, flowProperties, endpointProperties); + } + + private int connectSession() throws JCSMPException { + if (jcsmpSession == null) { + jcsmpSession = createSessionObject(); + } + jcsmpSession.connect(); + return 0; + } + + private JCSMPSession createSessionObject() throws InvalidPropertiesException { + JCSMPProperties properties = new JCSMPProperties(); + properties.setProperty(JCSMPProperties.HOST, host); + properties.setProperty(JCSMPProperties.USERNAME, username); + properties.setProperty(JCSMPProperties.PASSWORD, password); + properties.setProperty(JCSMPProperties.VPN_NAME, vpnName); + + return JCSMPFactory.onlyInstance().createSession(properties); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java new file mode 100644 index 000000000000..79230f0bcf89 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; + +/** + * A factory for creating {@link BasicAuthJcsmpSessionService} instances. + * + *

This factory provides a way to create {@link BasicAuthJcsmpSessionService} instances with + * different configurations. + */ +@AutoValue +public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory { + public abstract String host(); + + public abstract String username(); + + public abstract String password(); + + public abstract String vpnName(); + + public static Builder builder() { + return new AutoValue_BasicAuthJcsmpSessionServiceFactory.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + /** + * Set Solace host, format: Host[:Port] e.g. "12.34.56.78", or "[fe80::1]", or + * "12.34.56.78:4444". + */ + public abstract Builder host(String host); + + /** Set Solace username. */ + public abstract Builder username(String username); + /** Set Solace password. */ + public abstract Builder password(String password); + + /** Set Solace vpn name. */ + public abstract Builder vpnName(String vpnName); + + public abstract BasicAuthJcsmpSessionServiceFactory build(); + } + + @Override + public SessionService create() { + return new BasicAuthJcsmpSessionService( + checkNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(), + host(), + username(), + password(), + vpnName()); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java new file mode 100644 index 000000000000..e5f129d3ddfc --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageReceiver.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.FlowReceiver; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.StaleSessionException; +import java.io.IOException; +import org.apache.beam.sdk.io.solace.RetryCallableManager; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SolaceMessageReceiver implements MessageReceiver { + private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageReceiver.class); + + public static final int DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS = 100; + private final FlowReceiver flowReceiver; + private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); + + public SolaceMessageReceiver(FlowReceiver flowReceiver) { + this.flowReceiver = flowReceiver; + } + + @Override + public void start() { + startFlowReceiver(); + } + + private void startFlowReceiver() { + retryCallableManager.retryCallable( + () -> { + flowReceiver.start(); + return 0; + }, + ImmutableSet.of(JCSMPException.class)); + } + + @Override + public boolean isClosed() { + return flowReceiver == null || flowReceiver.isClosed(); + } + + @Override + public BytesXMLMessage receive() throws IOException { + try { + return flowReceiver.receive(DEFAULT_ADVANCE_TIMEOUT_IN_MILLIS); + } catch (StaleSessionException e) { + LOG.warn("SolaceIO: Caught StaleSessionException, restarting the FlowReceiver."); + startFlowReceiver(); + throw new IOException(e); + } catch (JCSMPException e) { + throw new IOException(e); + } + } +} From 626f7e944bfea2786c1d0387b18a7a290087ec47 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Fri, 28 Jun 2024 14:46:49 +0200 Subject: [PATCH 2/3] Address PR comments --- .../broker/BasicAuthJcsmpSessionService.java | 24 +++++++++---------- .../BasicAuthJcsmpSessionServiceFactory.java | 5 ++-- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java index 9246ccae0177..8ef2cf412489 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.solace.broker; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + import com.solacesystems.jcsmp.ConsumerFlowProperties; import com.solacesystems.jcsmp.EndpointProperties; import com.solacesystems.jcsmp.FlowReceiver; @@ -71,16 +73,15 @@ public void connect() { @Override public void close() { - if (jcsmpSession != null && !jcsmpSession.isClosed()) { - retryCallableManager.retryCallable( - () -> { - if (jcsmpSession != null) { - jcsmpSession.closeSession(); - } - return 0; - }, - ImmutableSet.of(IOException.class)); + if (isClosed()) { + return; } + retryCallableManager.retryCallable( + () -> { + checkNotNull(jcsmpSession).closeSession(); + return 0; + }, + ImmutableSet.of(IOException.class)); } @Override @@ -110,10 +111,9 @@ private MessageReceiver createFlowReceiver() throws JCSMPException, IOException if (jcsmpSession != null) { return new SolaceMessageReceiver( createFlowReceiver(jcsmpSession, flowProperties, endpointProperties)); - } else { - throw new IOException( - "SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null."); } + throw new IOException( + "SolaceIO.Read: Could not create a receiver from the Jcsmp session: session object is null."); } // The `@SuppressWarning` is needed here, because the checkerframework reports an error for the diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java index 79230f0bcf89..366ee0c6768f 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java @@ -22,10 +22,11 @@ import com.google.auto.value.AutoValue; /** - * A factory for creating {@link BasicAuthJcsmpSessionService} instances. + * A factory for creating {@link BasicAuthJcsmpSessionService} instances. Extends {@link + * SessionServiceFactory}. * *

This factory provides a way to create {@link BasicAuthJcsmpSessionService} instances with - * different configurations. + * authenticate to Solace with Basic Authentication. */ @AutoValue public abstract class BasicAuthJcsmpSessionServiceFactory extends SessionServiceFactory { From 951e5c60d8970b94e344665ecd77067dc59a72c3 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Mon, 1 Jul 2024 22:37:09 +0200 Subject: [PATCH 3/3] Use `checkStateNotNull` --- .../sdk/io/solace/broker/BasicAuthJcsmpSessionService.java | 4 ++-- .../io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java index 8ef2cf412489..7863dbd129ce 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.io.solace.broker; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.solacesystems.jcsmp.ConsumerFlowProperties; import com.solacesystems.jcsmp.EndpointProperties; @@ -78,7 +78,7 @@ public void close() { } retryCallableManager.retryCallable( () -> { - checkNotNull(jcsmpSession).closeSession(); + checkStateNotNull(jcsmpSession).closeSession(); return 0; }, ImmutableSet.of(IOException.class)); diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java index 366ee0c6768f..8cb4ff0af053 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.io.solace.broker; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; @@ -65,7 +65,7 @@ public abstract static class Builder { @Override public SessionService create() { return new BasicAuthJcsmpSessionService( - checkNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(), + checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(), host(), username(), password(),