From e5d67fde702ac05865f78349e6ad60740a6bede7 Mon Sep 17 00:00:00 2001 From: Israel Herraiz Date: Fri, 26 Jul 2024 19:08:23 +0200 Subject: [PATCH] This is a follow-up PR to #31953, and part of the issue #31905. This PR adds the actual writer functionality, and some additional testing, including integration testing. This should be final PR for the SolaceIO write connector to be complete. --- CHANGES.md | 1 + sdks/java/io/solace/build.gradle | 1 + .../apache/beam/sdk/io/solace/SolaceIO.java | 209 +++++++++- .../broker/BasicAuthJcsmpSessionService.java | 113 +++-- .../BasicAuthJcsmpSessionServiceFactory.java | 17 +- .../GCPSecretSessionServiceFactory.java | 2 +- .../sdk/io/solace/broker/MessageProducer.java | 143 +++++++ .../sdk/io/solace/broker/MessageReceiver.java | 3 +- .../solace/broker/PublishResultHandler.java | 90 ++++ .../sdk/io/solace/broker/SessionService.java | 67 ++- .../solace/broker/SessionServiceFactory.java | 40 +- .../solace/broker/SolaceMessageProducer.java | 84 ++++ .../beam/sdk/io/solace/data/Solace.java | 66 +-- .../io/solace/read/UnboundedSolaceReader.java | 19 +- .../solace/write/PublishResultsReceiver.java | 43 ++ .../write/SolaceWriteSessionsHandler.java | 97 +++++ .../write/UnboundedBatchedSolaceWriter.java | 177 ++++++++ .../solace/write/UnboundedSolaceWriter.java | 394 ++++++++++++++++++ .../write/UnboundedStreamingSolaceWriter.java | 144 +++++++ .../io/solace/MockEmptySessionService.java | 16 +- .../sdk/io/solace/MockSessionService.java | 120 ++++-- .../io/solace/MockSessionServiceFactory.java | 48 ++- ...olaceIOTest.java => SolaceIOReadTest.java} | 257 ++++++------ .../beam/sdk/io/solace/SolaceIOWriteTest.java | 157 +++++++ .../broker/OverrideWriterPropertiesTest.java | 4 +- .../sdk/io/solace/data/SolaceDataUtils.java | 4 +- .../beam/sdk/io/solace/it/SolaceIOIT.java | 132 +++++- 27 files changed, 2155 insertions(+), 293 deletions(-) create mode 100644 
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/PublishResultsReceiver.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java rename sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/{SolaceIOTest.java => SolaceIOReadTest.java} (72%) create mode 100644 sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java diff --git a/CHANGES.md b/CHANGES.md index 60ba3ca37446..83c6437afe22 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Support for writing to [Solace messages queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)). 
## New Features / Improvements diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index 741db51a5772..84ea1f18b08a 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -38,6 +38,7 @@ dependencies { implementation library.java.proto_google_cloud_secret_manager_v1 implementation library.java.protobuf_java implementation library.java.vendored_guava_32_1_2_jre + implementation library.java.vendored_grpc_1_60_1 implementation project(":sdks:java:extensions:avro") implementation library.java.avro permitUnusedDeclared library.java.avro diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index dcfdcc4fabb9..92651d504087 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -41,15 +41,28 @@ import org.apache.beam.sdk.io.solace.data.Solace.SolaceRecordMapper; import org.apache.beam.sdk.io.solace.read.UnboundedSolaceSource; import org.apache.beam.sdk.io.solace.write.SolaceOutput; +import org.apache.beam.sdk.io.solace.write.UnboundedBatchedSolaceWriter; +import org.apache.beam.sdk.io.solace.write.UnboundedSolaceWriter; +import org.apache.beam.sdk.io.solace.write.UnboundedStreamingSolaceWriter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import 
org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -805,7 +818,8 @@ private Queue initializeQueueForTopicIfNeeded( public enum SubmissionMode { HIGHER_THROUGHPUT, - LOWER_LATENCY + LOWER_LATENCY, + TESTING // Send acks 1 by 1, this will be very slow, never use this in an actual pipeline! } public enum WriterType { @@ -816,6 +830,8 @@ public enum WriterType { @AutoValue public abstract static class Write extends PTransform, SolaceOutput> { + private static final Logger LOG = LoggerFactory.getLogger(Write.class); + public static final TupleTag FAILED_PUBLISH_TAG = new TupleTag() {}; public static final TupleTag SUCCESSFUL_PUBLISH_TAG = @@ -961,6 +977,21 @@ public Write withWriterType(WriterType writerType) { return toBuilder().setWriterType(writerType).build(); } + /** + * Set the format function for your custom data type, and/or for dynamic destinations. + * + *

If you are using a custom data class, this function should return a {@link Solace.Record} + * corresponding to your custom data class instance. + * + *

If you are using this formatting function with dynamic destinations, you must ensure that + * you set the right destination in each of the {@link Solace.Record} messages. + * + *

In any other case, this format function is optional. + */ + public Write withFormatFunction(SerializableFunction formatFunction) { + return toBuilder().setFormatFunction(formatFunction).build(); + } + /** * Set the provider used to obtain the properties to initialize a new session in the broker. * @@ -1026,8 +1057,180 @@ abstract static class Builder { @Override public SolaceOutput expand(PCollection input) { - // TODO: will be sent in upcoming PR - return SolaceOutput.in(input.getPipeline(), null, null); + Class pcollClass = checkNotNull(input.getTypeDescriptor()).getRawType(); + boolean usingSolaceRecord = + pcollClass + .getTypeName() + .equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record") + || pcollClass.isAssignableFrom(Solace.Record.class); + + validateWriteTransform(usingSolaceRecord); + + boolean usingDynamicDestinations = getDestination() == null; + SerializableFunction destinationFn; + if (usingDynamicDestinations) { + destinationFn = x -> SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination())); + } else { + // Constant destination for all messages (same topic or queue) + // This should not be non-null, as nulls would have been flagged by the + // validateWriteTransform method + destinationFn = x -> checkNotNull(getDestination()); + } + + @SuppressWarnings("unchecked") + PCollection records = + getFormatFunction() == null + ? 
(PCollection) input + : input.apply( + "Format records", + MapElements.into(TypeDescriptor.of(Solace.Record.class)) + .via(checkNotNull(getFormatFunction()))); + + // Store the current window used by the input + PCollection captureWindow = + records.apply( + "Capture window", ParDo.of(new UnboundedSolaceWriter.RecordToPublishResultDoFn())); + + @SuppressWarnings("unchecked") + WindowingStrategy windowingStrategy = + (WindowingStrategy) + captureWindow.getWindowingStrategy(); + + PCollection withGlobalWindow = + records.apply("Global window", Window.into(new GlobalWindows())); + + PCollection> withShardKeys = + withGlobalWindow.apply( + "Add shard key", + ParDo.of(new UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers()))); + + String label = + getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)"; + + PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn)); + + SolaceOutput output; + if (getDeliveryMode() == DeliveryMode.PERSISTENT) { + PCollection failedPublish = solaceOutput.get(FAILED_PUBLISH_TAG); + PCollection successfulPublish = + solaceOutput.get(SUCCESSFUL_PUBLISH_TAG); + output = + rewindow( + SolaceOutput.in(input.getPipeline(), failedPublish, successfulPublish), + windowingStrategy); + } else { + LOG.info( + String.format( + "Solace.Write: omitting writer output because delivery mode is %s", + getDeliveryMode())); + output = SolaceOutput.in(input.getPipeline(), null, null); + } + + return output; + } + + private ParDo.MultiOutput, Solace.PublishResult> getWriterTransform( + SerializableFunction destinationFn) { + + ParDo.SingleOutput, Solace.PublishResult> writer = + ParDo.of( + getWriterType() == WriterType.STREAMING + ? 
new UnboundedStreamingSolaceWriter.WriterDoFn( + destinationFn, + checkNotNull(getSessionServiceFactory()), + getDeliveryMode(), + getDispatchMode(), + getNumberOfClientsPerWorker(), + getPublishLatencyMetrics()) + : new UnboundedBatchedSolaceWriter.WriterDoFn( + destinationFn, + checkNotNull(getSessionServiceFactory()), + getDeliveryMode(), + getDispatchMode(), + getNumberOfClientsPerWorker(), + getPublishLatencyMetrics())); + + return writer.withOutputTags(FAILED_PUBLISH_TAG, TupleTagList.of(SUCCESSFUL_PUBLISH_TAG)); + } + + private SolaceOutput rewindow( + SolaceOutput solacePublishResult, + WindowingStrategy strategy) { + PCollection correct = solacePublishResult.getSuccessfulPublish(); + PCollection failed = solacePublishResult.getFailedPublish(); + + PCollection correctWithWindow = null; + PCollection failedWithWindow = null; + + if (correct != null) { + correctWithWindow = applyOriginalWindow(correct, strategy, "Rewindow correct"); + } + + if (failed != null) { + failedWithWindow = applyOriginalWindow(failed, strategy, "Rewindow failed"); + } + + return SolaceOutput.in( + solacePublishResult.getPipeline(), failedWithWindow, correctWithWindow); + } + + private static PCollection applyOriginalWindow( + PCollection pcoll, + WindowingStrategy strategy, + String label) { + Window originalWindow = captureWindowDetails(strategy); + + if (strategy.getMode() == WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) { + originalWindow = originalWindow.accumulatingFiredPanes(); + } else { + originalWindow = originalWindow.discardingFiredPanes(); + } + + return pcoll.apply(label, originalWindow); + } + + private static Window captureWindowDetails( + WindowingStrategy strategy) { + return Window.into(strategy.getWindowFn()) + .withAllowedLateness(strategy.getAllowedLateness()) + .withOnTimeBehavior(strategy.getOnTimeBehavior()) + .withTimestampCombiner(strategy.getTimestampCombiner()) + .triggering(strategy.getTrigger()); + } + + /** + * Called before running the 
Pipeline to verify this transform is fully and correctly specified. + */ + private void validateWriteTransform(boolean usingSolaceRecords) { + if (!usingSolaceRecords) { + Preconditions.checkArgument( + getFormatFunction() != null, + "SolaceIO.Write: If you are not using Solace.Record as the input type, you" + + " must set a format function using withFormatFunction()."); + } + + Preconditions.checkArgument( + getMaxNumOfUsedWorkers() > 0, + "SolaceIO.Write: The number of used workers must be positive."); + Preconditions.checkArgument( + getNumberOfClientsPerWorker() > 0, + "SolaceIO.Write: The number of clients per worker must be positive."); + Preconditions.checkArgument( + getDeliveryMode() == DeliveryMode.DIRECT || getDeliveryMode() == DeliveryMode.PERSISTENT, + String.format( + "SolaceIO.Write: Delivery mode must be either DIRECT or PERSISTENT. %s" + + " not supported", + getDeliveryMode())); + if (getPublishLatencyMetrics()) { + Preconditions.checkArgument( + getDeliveryMode() == DeliveryMode.PERSISTENT, + "SolaceIO.Write: Publish latency metrics can only be enabled for PERSISTENT" + + " delivery mode."); + } + Preconditions.checkArgument( + getSessionServiceFactory() != null, + "SolaceIO: You need to pass a session service factory. 
For basic" + + " authentication, you can use BasicAuthJcsmpSessionServiceFactory."); } } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java index 2137d574b09a..3e3477937725 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import com.google.auto.value.AutoValue; import com.solacesystems.jcsmp.ConsumerFlowProperties; import com.solacesystems.jcsmp.EndpointProperties; import com.solacesystems.jcsmp.FlowReceiver; @@ -28,7 +29,10 @@ import com.solacesystems.jcsmp.JCSMPProperties; import com.solacesystems.jcsmp.JCSMPSession; import com.solacesystems.jcsmp.Queue; +import com.solacesystems.jcsmp.XMLMessageProducer; import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.Callable; import javax.annotation.Nullable; import org.apache.beam.sdk.io.solace.RetryCallableManager; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; @@ -39,34 +43,43 @@ *

This class provides a way to connect to a Solace broker and receive messages from a queue. The * connection is established using basic authentication. */ -public class BasicAuthJcsmpSessionService extends SessionService { - private final String queueName; - private final String host; - private final String username; - private final String password; - private final String vpnName; +@AutoValue +public abstract class BasicAuthJcsmpSessionService extends SessionService { + /** The name of the queue to receive messages from. */ + public abstract @Nullable String queueName(); + /** The host name or IP address of the Solace broker. Format: Host[:Port] */ + public abstract String host(); + /** The username to use for authentication. */ + public abstract String username(); + /** The password to use for authentication. */ + public abstract String password(); + /** The name of the VPN to connect to. */ + public abstract String vpnName(); + + public static Builder builder() { + return new AutoValue_BasicAuthJcsmpSessionService.Builder().vpnName(DEFAULT_VPN_NAME); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder queueName(@Nullable String q); + + public abstract Builder host(String host); + + public abstract Builder username(String username); + + public abstract Builder password(String password); + + public abstract Builder vpnName(String vpnName); + + public abstract BasicAuthJcsmpSessionService build(); + } + @Nullable private JCSMPSession jcsmpSession; @Nullable private MessageReceiver messageReceiver; + @Nullable private MessageProducer messageProducer; private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); - /** - * Creates a new {@link BasicAuthJcsmpSessionService} with the given parameters. - * - * @param queueName The name of the queue to receive messages from. - * @param host The host name or IP address of the Solace broker. 
Format: Host[:Port] - * @param username The username to use for authentication. - * @param password The password to use for authentication. - * @param vpnName The name of the VPN to connect to. - */ - public BasicAuthJcsmpSessionService( - String queueName, String host, String username, String password, String vpnName) { - this.queueName = queueName; - this.host = host; - this.username = username; - this.password = password; - this.vpnName = vpnName; - } - @Override public void connect() { retryCallableManager.retryCallable(this::connectSession, ImmutableSet.of(JCSMPException.class)); @@ -79,6 +92,9 @@ public void close() { if (messageReceiver != null) { messageReceiver.close(); } + if (messageProducer != null) { + messageProducer.close(); + } if (!isClosed()) { checkStateNotNull(jcsmpSession).closeSession(); } @@ -88,24 +104,55 @@ public void close() { } @Override - public MessageReceiver createReceiver() { - this.messageReceiver = - retryCallableManager.retryCallable( - this::createFlowReceiver, ImmutableSet.of(JCSMPException.class)); + public MessageReceiver getReceiver() { + if (this.messageReceiver == null) { + this.messageReceiver = + retryCallableManager.retryCallable( + this::createFlowReceiver, ImmutableSet.of(JCSMPException.class)); + } return this.messageReceiver; } + @Override + public MessageProducer getProducer() { + if (this.messageProducer == null || this.messageProducer.isClosed()) { + this.messageProducer = + retryCallableManager.retryCallable( + this::createXMLMessageProducer, ImmutableSet.of(JCSMPException.class)); + } + return checkStateNotNull(this.messageProducer); + } + @Override public boolean isClosed() { return jcsmpSession == null || jcsmpSession.isClosed(); } + private MessageProducer createXMLMessageProducer() throws JCSMPException, IOException { + if (isClosed()) { + connectSession(); + } + + @SuppressWarnings("nullness") + Callable initProducer = + () -> Objects.requireNonNull(jcsmpSession).getMessageProducer(new 
PublishResultHandler()); + + XMLMessageProducer producer = + retryCallableManager.retryCallable(initProducer, ImmutableSet.of(JCSMPException.class)); + if (producer == null) { + throw new IOException("SolaceIO.Write: Could not create producer, producer object is null"); + } + return new SolaceMessageProducer(producer); + } + private MessageReceiver createFlowReceiver() throws JCSMPException, IOException { if (isClosed()) { connectSession(); } - Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName); + Queue queue = + JCSMPFactory.onlyInstance() + .createQueue(checkStateNotNull(queueName(), "SolaceIO.Read: Queue is not set.")); ConsumerFlowProperties flowProperties = new ConsumerFlowProperties(); flowProperties.setEndpoint(queue); @@ -148,13 +195,13 @@ private JCSMPSession createSessionObject() throws InvalidPropertiesException { @Override public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) { - baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName); + baseProps.setProperty(JCSMPProperties.VPN_NAME, vpnName()); baseProps.setProperty( JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC); - baseProps.setProperty(JCSMPProperties.USERNAME, username); - baseProps.setProperty(JCSMPProperties.PASSWORD, password); - baseProps.setProperty(JCSMPProperties.HOST, host); + baseProps.setProperty(JCSMPProperties.USERNAME, username()); + baseProps.setProperty(JCSMPProperties.PASSWORD, password()); + baseProps.setProperty(JCSMPProperties.HOST, host()); return baseProps; } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java index 2084e61b7e38..31834d237847 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java +++ 
b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionServiceFactory.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.solace.broker; import static org.apache.beam.sdk.io.solace.broker.SessionService.DEFAULT_VPN_NAME; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; @@ -65,11 +64,15 @@ public abstract static class Builder { @Override public SessionService create() { - return new BasicAuthJcsmpSessionService( - checkStateNotNull(queue, "SolaceIO.Read: Queue is not set.").getName(), - host(), - username(), - password(), - vpnName()); + BasicAuthJcsmpSessionService.Builder builder = BasicAuthJcsmpSessionService.builder(); + if (queue != null) { + builder = builder.queueName(queue.getName()); + } + return builder + .host(host()) + .username(username()) + .password(password()) + .vpnName(vpnName()) + .build(); } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java index dd87e1d75fa5..7f691b46be31 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/GCPSecretSessionServiceFactory.java @@ -117,7 +117,7 @@ public abstract static class Builder { @Override public SessionService create() { - String password = null; + String password; try { password = retrieveSecret(); } catch (IOException e) { diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java new file mode 100644 index 000000000000..44f7e99f063f --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageProducer.java @@ 
-0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPSendMultipleEntry; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.transforms.SerializableFunction; + +/** + * Interface for publishing messages to a Solace broker. + * + *

Implementations of this interface are responsible for managing the connection to the broker + * and for publishing messages to the broker. + */ +@Internal +public abstract class MessageProducer { + // This is the batch limit supported by the send multiple JCSMP API method. + static final int SOLACE_BATCH_LIMIT = 50; + + /** Publishes a message to the broker. */ + public abstract void publishSingleMessage( + Solace.Record msg, + Destination topicOrQueue, + boolean useCorrelationKeyLatency, + DeliveryMode deliveryMode); + + /** + * Publishes a batch of messages to the broker. + * + *

The size of the batch cannot exceed 50 messages; this is a limitation of the Solace API. + * + *

It returns the number of messages written. + */ + public abstract int publishBatch( + List records, + boolean useCorrelationKeyLatency, + SerializableFunction destinationFn, + DeliveryMode deliveryMode); + + /** + * Create a {@link BytesXMLMessage} to be published in Solace. + * + * @param record The record to be published. + * @param useCorrelationKeyLatency Whether to use a complex key for tracking latency. + * @param deliveryMode The {@link DeliveryMode} used to publish the message. + * @return A {@link BytesXMLMessage} that can be sent to Solace "as is". + */ + public static BytesXMLMessage createBytesXMLMessage( + Solace.Record record, boolean useCorrelationKeyLatency, DeliveryMode deliveryMode) { + JCSMPFactory jcsmpFactory = JCSMPFactory.onlyInstance(); + BytesXMLMessage msg = jcsmpFactory.createBytesXMLMessage(); + byte[] payload = record.getPayload(); + msg.writeBytes(payload); + + Long senderTimestamp = record.getSenderTimestamp(); + if (senderTimestamp == null) { + senderTimestamp = System.currentTimeMillis(); + } + msg.setSenderTimestamp(senderTimestamp); + msg.setDeliveryMode(deliveryMode); + if (useCorrelationKeyLatency) { + Solace.CorrelationKey key = + Solace.CorrelationKey.builder() + .setMessageId(record.getMessageId()) + .setPublishMonotonicMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())) + .build(); + msg.setCorrelationKey(key); + } else { + // Use only a string as correlation key + msg.setCorrelationKey(record.getMessageId()); + } + msg.setApplicationMessageId(record.getMessageId()); + return msg; + } + + /** + * Create a {@link JCSMPSendMultipleEntry} array to be published in Solace. This can be used with + * `sendMultiple` to send all the messages in a single API call. + * + *

The size of the list cannot be larger than 50 messages. This is a hard limit enforced by the + * Solace API. + * + * @param records A {@link List} of records to be published + * @param useCorrelationKeyLatency Whether to use a complex key for tracking latency. + * @param destinationFn A function that maps every record to its destination. + * @param deliveryMode The {@link DeliveryMode} used to publish the message. + * @return A {@link JCSMPSendMultipleEntry} array that can be sent to Solace "as is". + */ + public static JCSMPSendMultipleEntry[] createJCSMPSendMultipleEntry( + List records, + boolean useCorrelationKeyLatency, + SerializableFunction destinationFn, + DeliveryMode deliveryMode) { + if (records.size() > SOLACE_BATCH_LIMIT) { + throw new RuntimeException( + String.format( + "SolaceIO.Write: Trying to create a batch of %d, but Solace supports a" + + " maximum of %d. The batch will likely be rejected by Solace.", + records.size(), SOLACE_BATCH_LIMIT)); + } + + JCSMPSendMultipleEntry[] entries = new JCSMPSendMultipleEntry[records.size()]; + for (int i = 0; i < records.size(); i++) { + Solace.Record record = records.get(i); + JCSMPSendMultipleEntry entry = + JCSMPFactory.onlyInstance() + .createSendMultipleEntry( + createBytesXMLMessage(record, useCorrelationKeyLatency, deliveryMode), + destinationFn.apply(record)); + entries[i] = entry; + } + + return entries; + } + + /** Returns {@literal true} if the message producer is closed, {@literal false} otherwise. */ + public abstract boolean isClosed(); + + /** Closes the message producer. 
*/ + public abstract void close(); +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java index 95f989bd1be9..0bb9bf929437 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/MessageReceiver.java @@ -19,6 +19,7 @@ import com.solacesystems.jcsmp.BytesXMLMessage; import java.io.IOException; +import java.io.Serializable; /** * Interface for receiving messages from a Solace broker. @@ -26,7 +27,7 @@ *

Implementations of this interface are responsible for managing the connection to the broker * and for receiving messages from the broker. */ -public interface MessageReceiver { +public interface MessageReceiver extends Serializable { /** * Starts the message receiver. * diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java new file mode 100644 index 000000000000..d03cd82b3bb4 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/PublishResultHandler.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.PublishResult; +import org.apache.beam.sdk.io.solace.write.PublishResultsReceiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is required to handle callbacks from Solace, to find out if messages were actually + * published or there were any kind of error. + * + *

This class is also used to calculate the latency of the publication. The correlation key + * contains the original timestamp of when the message was sent from the pipeline to Solace. The + * comparison of that value with the clock now, using a monotonic clock, is understood as the + * latency of the publication + */ +public final class PublishResultHandler implements JCSMPStreamingPublishCorrelatingEventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(PublishResultHandler.class); + + @Override + public void handleErrorEx(Object key, JCSMPException cause, long timestamp) { + processKey(key, false, Optional.ofNullable(cause)); + } + + @Override + public void responseReceivedEx(Object key) { + processKey(key, true, Optional.empty()); + } + + private void processKey(Object key, boolean isPublished, Optional cause) { + PublishResult.Builder resultBuilder = PublishResult.builder(); + String messageId; + if (key == null) { + messageId = "NULL"; + } else if (key instanceof Solace.CorrelationKey) { + messageId = ((Solace.CorrelationKey) key).getMessageId(); + long latencyMillis = calculateLatency((Solace.CorrelationKey) key); + resultBuilder = resultBuilder.setLatencyMilliseconds(latencyMillis); + } else { + messageId = key.toString(); + } + + resultBuilder = resultBuilder.setMessageId(messageId).setPublished(isPublished); + if (!isPublished) { + if (cause.isPresent()) { + resultBuilder = resultBuilder.setError(cause.get().getMessage()); + } else { + resultBuilder = resultBuilder.setError("NULL - Not set by Solace"); + } + } else if (cause.isPresent()) { + LOG.warn( + "Message with id {} is published but exception is populated. 
Ignoring exception", + messageId); + } + + PublishResult publishResult = resultBuilder.build(); + // Static reference, it receives all callbacks from all publications + // from all threads + PublishResultsReceiver.addResult(publishResult); + } + + private static long calculateLatency(Solace.CorrelationKey key) { + long currentMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + long publishMillis = key.getPublishMonotonicMillis(); + return currentMillis - publishMillis; + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java index aed700a71ded..cde9d66c454a 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionService.java @@ -20,6 +20,7 @@ import com.solacesystems.jcsmp.JCSMPProperties; import java.io.Serializable; import org.apache.beam.sdk.io.solace.SolaceIO; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,21 +70,23 @@ *

For basic authentication, use {@link BasicAuthJcsmpSessionService} and {@link * BasicAuthJcsmpSessionServiceFactory}. * - *

For other situations, you need to extend this class. For instance: + *

For other situations, you need to extend this class and implement the `equals` method, so two + * instances of your class can be compared by value. We recommend using AutoValue for that. For + * instance: * *

{@code
+ * {@literal }@AutoValue
  * public class MySessionService extends SessionService {
- *   private final String authToken;
+ *   abstract String authToken();
  *
- *   public MySessionService(String token) {
- *    this.oauthToken = token;
- *    ...
+ *   public static MySessionService create(String authToken) {
+ *       return new AutoValue_MySessionService(authToken);
  *   }
  *
  *   {@literal }@Override
  *   public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProps) {
  *     baseProps.setProperty(JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2);
- *     baseProps.setProperty(JCSMPProperties.OAUTH2_ACCESS_TOKEN, authToken);
+ *     baseProps.setProperty(JCSMPProperties.OAUTH2_ACCESS_TOKEN, authToken());
  *     return props;
  *   }
  *
@@ -101,6 +104,7 @@ public abstract class SessionService implements Serializable {
 
   public static final String DEFAULT_VPN_NAME = "default";
 
+  private static final int TESTING_PUB_ACK_WINDOW = 1;
   private static final int STREAMING_PUB_ACK_WINDOW = 50;
   private static final int BATCHED_PUB_ACK_WINDOW = 255;
 
@@ -121,10 +125,18 @@ public abstract class SessionService implements Serializable {
   public abstract boolean isClosed();
 
   /**
-   * Creates a MessageReceiver object for receiving messages from Solace. Typically, this object is
-   * created from the session instance.
+   * Returns a MessageReceiver object for receiving messages from Solace. If it is the first time
+   * this method is used, the receiver is created from the session instance, otherwise it returns
+   * the receiver created initially.
    */
-  public abstract MessageReceiver createReceiver();
+  public abstract MessageReceiver getReceiver();
+
+  /**
+   * Returns a MessageProducer object for publishing messages to Solace. If it is the first time
+   * this method is used, the producer is created from the session instance, otherwise it returns
+   * the producer created initially.
+   */
+  public abstract MessageProducer getProducer();
 
   /**
    * Override this method and provide your specific properties, including all those related to
@@ -147,6 +159,20 @@ public abstract class SessionService implements Serializable {
    */
   public abstract JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties);
 
+  /**
+   * You need to override this method to be able to compare these objects by value. We recommend
+   * using AutoValue for that.
+   */
+  @Override
+  public abstract boolean equals(@Nullable Object other);
+
+  /**
+   * You need to override this method to be able to compare these objects by value. We recommend
+   * using AutoValue for that.
+   */
+  @Override
+  public abstract int hashCode();
+
   /**
    * This method will be called by the write connector when a new session is started.
    *
@@ -195,10 +221,9 @@ private static JCSMPProperties overrideConnectorProperties(
         LOG.warn(
             "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to false since"
                 + " HIGHER_THROUGHPUT mode was selected");
+        props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, false);
       }
 
-      props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, false);
-
       Integer ackWindowSize = props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
       if ((ackWindowSize != null && ackWindowSize != BATCHED_PUB_ACK_WINDOW)) {
         LOG.warn(
@@ -206,9 +231,10 @@ private static JCSMPProperties overrideConnectorProperties(
                 "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
                     + " HIGHER_THROUGHPUT mode was selected",
                 BATCHED_PUB_ACK_WINDOW));
+        props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, BATCHED_PUB_ACK_WINDOW);
       }
-      props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, BATCHED_PUB_ACK_WINDOW);
-    } else {
+
+    } else if (mode == SolaceIO.SubmissionMode.LOWER_LATENCY) {
       // Send from the same thread where the produced is being called. This offers the lowest
       // latency, but a low throughput too.
       Boolean msgCbProp = props.getBooleanProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR);
@@ -216,10 +242,9 @@ private static JCSMPProperties overrideConnectorProperties(
         LOG.warn(
             "SolaceIO.Write: Overriding MESSAGE_CALLBACK_ON_REACTOR to true since"
                 + " LOWER_LATENCY mode was selected");
+        props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
       }
 
-      props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
-
       Integer ackWindowSize = props.getIntegerProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE);
       if ((ackWindowSize != null && ackWindowSize != STREAMING_PUB_ACK_WINDOW)) {
         LOG.warn(
@@ -227,10 +252,18 @@ private static JCSMPProperties overrideConnectorProperties(
                 "SolaceIO.Write: Overriding PUB_ACK_WINDOW_SIZE to %d since"
                     + " LOWER_LATENCY mode was selected",
                 STREAMING_PUB_ACK_WINDOW));
+        props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, STREAMING_PUB_ACK_WINDOW);
       }
-
-      props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, STREAMING_PUB_ACK_WINDOW);
+    } else if (mode == SolaceIO.SubmissionMode.TESTING) {
+      LOG.warn(
+          "SolaceIO.Write: Overriding JCSMP properties for testing. **IF THIS IS AN"
+              + " ACTUAL PIPELINE, CHANGE THE SUBMISSION MODE TO HIGHER_THROUGHPUT "
+              + "OR LOWER_LATENCY.**");
+      // Minimize multi-threading for testing
+      props.setProperty(JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, true);
+      props.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, TESTING_PUB_ACK_WINDOW);
     }
+
     return props;
   }
 }
diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
index 027de2cff134..98059bc9248f 100644
--- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
+++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SessionServiceFactory.java
@@ -22,8 +22,33 @@
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
- * This abstract class serves as a blueprint for creating `SessionService` objects. It introduces a
+ * This abstract class serves as a blueprint for creating `SessionServiceFactory` objects. It introduces a
  * queue property and mandates the implementation of a create() method in concrete subclasses.
+ *
+ * <p>For basic authentication, use {@link BasicAuthJcsmpSessionServiceFactory}.
+ *
+ * <p>For other situations, you need to extend this class. Classes extending from this abstract class must implement the `equals` method so two instances
+ * can be compared by value, and not by reference. We recommend using AutoValue for that.
+ *
+ * <pre>{@code
+ * {@literal }@AutoValue
+ * public abstract class MyFactory extends SessionServiceFactory {
+ *
+ *   abstract String value1();
+ *
+ *   abstract String value2();
+ *
+ *   public static MyFactory create(String value1, String value2) {
+ *     return new AutoValue_MyFactory(value1, value2);
+ *   }
+ *
+ *   ...
+ *
+ *   {@literal }@Override
+ *   public SessionService create() {
+ *     ...
+ *   }
+ * }</pre>
*/ public abstract class SessionServiceFactory implements Serializable { /** @@ -40,6 +65,19 @@ public abstract class SessionServiceFactory implements Serializable { */ public abstract SessionService create(); + /** + * You need to override this method to be able to compare these objects by value. We recommend + * using AutoValue for that. + */ + @Override + public abstract boolean equals(@Nullable Object other); + + /** + * You need to override this method to be able to compare these objects by value. We recommend + * using AutoValue for that. + */ + @Override + public abstract int hashCode(); /** * This method is called in the {@link * org.apache.beam.sdk.io.solace.SolaceIO.Read#expand(org.apache.beam.sdk.values.PBegin)} method diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java new file mode 100644 index 000000000000..e5925503b233 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solace.broker; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.JCSMPException; +import com.solacesystems.jcsmp.JCSMPSendMultipleEntry; +import com.solacesystems.jcsmp.XMLMessageProducer; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.RetryCallableManager; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; + +@Internal +public class SolaceMessageProducer extends MessageProducer { + + private final XMLMessageProducer producer; + private final RetryCallableManager retryCallableManager = RetryCallableManager.create(); + + public SolaceMessageProducer(XMLMessageProducer producer) { + this.producer = producer; + } + + @Override + public void publishSingleMessage( + Solace.Record record, + Destination topicOrQueue, + boolean useCorrelationKeyLatency, + DeliveryMode deliveryMode) { + BytesXMLMessage msg = createBytesXMLMessage(record, useCorrelationKeyLatency, deliveryMode); + Callable publish = + () -> { + producer.send(msg, topicOrQueue); + return 0; + }; + + retryCallableManager.retryCallable(publish, ImmutableSet.of(JCSMPException.class)); + } + + @Override + public int publishBatch( + List records, + boolean useCorrelationKeyLatency, + SerializableFunction destinationFn, + DeliveryMode deliveryMode) { + JCSMPSendMultipleEntry[] batch = + createJCSMPSendMultipleEntry( + records, useCorrelationKeyLatency, destinationFn, deliveryMode); + Callable publish = () -> producer.sendMultiple(batch, 0, batch.length, 0); + return retryCallableManager.retryCallable(publish, ImmutableSet.of(JCSMPException.class)); + } + + @Override + public boolean isClosed() { + return 
producer == null || producer.isClosed(); + } + + @Override + public void close() { + if (!isClosed()) { + this.producer.close(); + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java index 00b94b5b9ea9..86c7f20ae9c8 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Solace.java @@ -21,10 +21,10 @@ import com.solacesystems.jcsmp.BytesXMLMessage; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,17 +93,17 @@ public abstract static class Destination { */ public abstract DestinationType getType(); - static Builder builder() { + public static Builder builder() { return new AutoValue_Solace_Destination.Builder(); } @AutoValue.Builder - abstract static class Builder { - abstract Builder setName(String name); + public abstract static class Builder { + public abstract Builder setName(String name); - abstract Builder setType(DestinationType type); + public abstract Builder setType(DestinationType type); - abstract Destination build(); + public abstract Destination build(); } } @@ -120,7 +120,7 @@ public abstract static class Record { * @return The message ID, or null if not available. */ @SchemaFieldNumber("0") - public abstract @Nullable String getMessageId(); + public abstract String getMessageId(); /** * Gets the payload of the message as a ByteString. 
@@ -129,8 +129,9 @@ public abstract static class Record { * * @return The message payload. */ + @SuppressWarnings("mutable") @SchemaFieldNumber("1") - public abstract ByteBuffer getPayload(); + public abstract byte[] getPayload(); /** * Gets the destination (topic or queue) to which the message was sent. * @@ -192,7 +193,7 @@ public abstract static class Record { * @return The timestamp. */ @SchemaFieldNumber("7") - public abstract long getReceiveTimestamp(); + public abstract @Nullable Long getReceiveTimestamp(); /** * Gets the timestamp (in milliseconds since the Unix epoch) when the message was sent by the @@ -248,42 +249,49 @@ public abstract static class Record { * * @return The attachment data, or an empty ByteString if no attachment is present. */ + @SuppressWarnings("mutable") @SchemaFieldNumber("12") - public abstract ByteBuffer getAttachmentBytes(); + public abstract byte[] getAttachmentBytes(); - static Builder builder() { - return new AutoValue_Solace_Record.Builder(); + public static Builder builder() { + return new AutoValue_Solace_Record.Builder() + .setExpiration(0L) + .setPriority(-1) + .setRedelivered(false) + .setTimeToLive(0) + .setAttachmentBytes(ByteString.empty().toByteArray()); } @AutoValue.Builder - abstract static class Builder { - abstract Builder setMessageId(@Nullable String messageId); + public abstract static class Builder { + public abstract Builder setMessageId(String messageId); - abstract Builder setPayload(ByteBuffer payload); + public abstract Builder setPayload(byte[] payload); - abstract Builder setDestination(@Nullable Destination destination); + public abstract Builder setDestination(@Nullable Destination destination); - abstract Builder setExpiration(long expiration); + public abstract Builder setExpiration(long expiration); - abstract Builder setPriority(int priority); + public abstract Builder setPriority(int priority); - abstract Builder setRedelivered(boolean redelivered); + public abstract Builder 
setRedelivered(boolean redelivered); - abstract Builder setReplyTo(@Nullable Destination replyTo); + public abstract Builder setReplyTo(@Nullable Destination replyTo); - abstract Builder setReceiveTimestamp(long receiveTimestamp); + public abstract Builder setReceiveTimestamp(@Nullable Long receiveTimestamp); - abstract Builder setSenderTimestamp(@Nullable Long senderTimestamp); + public abstract Builder setSenderTimestamp(@Nullable Long senderTimestamp); - abstract Builder setSequenceNumber(@Nullable Long sequenceNumber); + public abstract Builder setSequenceNumber(@Nullable Long sequenceNumber); - abstract Builder setTimeToLive(long timeToLive); + public abstract Builder setTimeToLive(long timeToLive); - abstract Builder setReplicationGroupMessageId(@Nullable String replicationGroupMessageId); + public abstract Builder setReplicationGroupMessageId( + @Nullable String replicationGroupMessageId); - abstract Builder setAttachmentBytes(ByteBuffer attachmentBytes); + public abstract Builder setAttachmentBytes(byte[] attachmentBytes); - abstract Record build(); + public abstract Record build(); } } @@ -414,7 +422,7 @@ public static class SolaceRecordMapper { Destination destination = getDestination(msg.getCorrelationId(), msg.getDestination()); return Record.builder() .setMessageId(msg.getApplicationMessageId()) - .setPayload(ByteBuffer.wrap(payloadBytesStream.toByteArray())) + .setPayload(payloadBytesStream.toByteArray()) .setDestination(destination) .setExpiration(msg.getExpiration()) .setPriority(msg.getPriority()) @@ -428,7 +436,7 @@ public static class SolaceRecordMapper { msg.getReplicationGroupMessageId() != null ? 
msg.getReplicationGroupMessageId().toString() : null) - .setAttachmentBytes(ByteBuffer.wrap(attachmentBytesStream.toByteArray())) + .setAttachmentBytes(attachmentBytesStream.toByteArray()) .build(); } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index c18a9d110b2a..a421970370da 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; -import org.apache.beam.sdk.io.solace.broker.MessageReceiver; import org.apache.beam.sdk.io.solace.broker.SempClient; import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -49,7 +48,6 @@ class UnboundedSolaceReader extends UnboundedReader { private final SempClient sempClient; private @Nullable BytesXMLMessage solaceOriginalRecord; private @Nullable T solaceMappedRecord; - private @Nullable MessageReceiver messageReceiver; private @Nullable SessionService sessionService; AtomicBoolean active = new AtomicBoolean(true); @@ -72,7 +70,7 @@ public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { @Override public boolean start() { populateSession(); - populateMessageConsumer(); + checkNotNull(sessionService).getReceiver().start(); return advance(); } @@ -85,22 +83,11 @@ public void populateSession() { } } - private void populateMessageConsumer() { - if (messageReceiver == null) { - messageReceiver = checkNotNull(sessionService).createReceiver(); - messageReceiver.start(); - } - MessageReceiver receiver = checkNotNull(messageReceiver); - if (receiver.isClosed()) { - 
receiver.start(); - } - } - @Override public boolean advance() { BytesXMLMessage receivedXmlMessage; try { - receivedXmlMessage = checkNotNull(messageReceiver).receive(); + receivedXmlMessage = checkNotNull(sessionService).getReceiver().receive(); } catch (IOException e) { LOG.warn("SolaceIO.Read: Exception when pulling messages from the broker.", e); return false; @@ -125,7 +112,7 @@ public void close() { @Override public Instant getWatermark() { // should be only used by a test receiver - if (checkNotNull(messageReceiver).isEOF()) { + if (checkNotNull(sessionService).getReceiver().isEOF()) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } return watermarkPolicy.getWatermark(); diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/PublishResultsReceiver.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/PublishResultsReceiver.java new file mode 100644 index 000000000000..0877f6537514 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/PublishResultsReceiver.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solace.write; + +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.data.Solace.PublishResult; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * This will receive all the publishing results asynchronously, from the callbacks done by Solace + * when the ack of publishing a persistent message is received. This is then used by the finish + * bundle method of the writer to emit the corresponding results as the output of the write + * connector. + */ +@Internal +public final class PublishResultsReceiver { + private static final ConcurrentLinkedQueue resultsQueue = + new ConcurrentLinkedQueue<>(); + + public static @Nullable PublishResult pollResults() { + return resultsQueue.poll(); + } + + public static boolean addResult(PublishResult result) { + return resultsQueue.add(result); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java new file mode 100644 index 000000000000..d705035cbf75 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/SolaceWriteSessionsHandler.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.write; + +import static org.apache.beam.sdk.io.solace.SolaceIO.DEFAULT_WRITER_CLIENTS_PER_WORKER; + +import com.google.auto.value.AutoValue; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; + +/** + * All the writer threads belonging to the same factory share the same instance of this class, to + * control for the number of clients that are connected to Solace, and minimize problems with quotas + * and limits. + * + *

This class maintains a map of all the session open in a worker, and control the size of that + * map, to avoid creating more sessions than Solace could handle. + * + *

This class is thread-safe and creates a pool of producers per SessionServiceFactory. If there + * is only a Write transform in the pipeline, this is effectively a singleton. If there are more + * than one, each {@link SessionServiceFactory} instance keeps their own pool of producers. + */ +final class SolaceWriteSessionsHandler { + private static final ConcurrentHashMap sessionsMap = + new ConcurrentHashMap<>(DEFAULT_WRITER_CLIENTS_PER_WORKER); + + public static SessionService getSessionService( + int producerIndex, SessionServiceFactory sessionServiceFactory) { + SessionConfigurationIndex key = + SessionConfigurationIndex.builder() + .producerIndex(producerIndex) + .sessionServiceFactory(sessionServiceFactory) + .build(); + return sessionsMap.computeIfAbsent( + key, SolaceWriteSessionsHandler::createSessionAndStartProducer); + } + + private static SessionService createSessionAndStartProducer(SessionConfigurationIndex key) { + SessionServiceFactory factory = key.sessionServiceFactory(); + SessionService sessionService = factory.create(); + // Start the producer now that the initialization is locked for other threads + sessionService.getProducer(); + return sessionService; + } + + /** Disconnect all the sessions from Solace, and clear the corresponding state. 
*/ + public static void disconnectFromSolace(SessionServiceFactory factory, int producersCardinality) { + for (int i = 0; i < producersCardinality; i++) { + SessionConfigurationIndex key = + SessionConfigurationIndex.builder() + .producerIndex(i) + .sessionServiceFactory(factory) + .build(); + + SessionService sessionService = sessionsMap.remove(key); + if (sessionService != null) { + sessionService.close(); + } + } + } + + @AutoValue + abstract static class SessionConfigurationIndex { + abstract int producerIndex(); + + abstract SessionServiceFactory sessionServiceFactory(); + + static Builder builder() { + return new AutoValue_SolaceWriteSessionsHandler_SessionConfigurationIndex.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder producerIndex(int producerIndex); + + abstract Builder sessionServiceFactory(SessionServiceFactory sessionServiceFactory); + + abstract SessionConfigurationIndex build(); + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java new file mode 100644 index 000000000000..ba971a2faddf --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedBatchedSolaceWriter.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.write; + +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public final class UnboundedBatchedSolaceWriter { + /** + * This DoFn is the responsible for writing to Solace in batch mode (holding up any messages), and + * emit the corresponding output (success or fail; only for persistent messages), so the + * SolaceIO.Write connector can be composed with other subsequent transforms in the pipeline. + * + *

The DoFn will create several JCSMP sessions per VM, and the sessions and producers will be + * reused across different threads (if the number of threads is higher than the number of + * sessions, which is probably the most common case). + * + *

The producer uses the JCSMP send multiple mode to publish a batch of messages together with + * a single API call. The acks from this publication are also processed in batch, and returned as + * the output of the DoFn. + * + *

The batch size is 50, and this is currently the maximum value supported by Solace. + * + *

There are no acks if the delivery mode is set to DIRECT. + * + *

This writer DoFn offers higher throughput than {@link + * UnboundedStreamingSolaceWriter.WriterDoFn} but also higher latency. + */ + @Internal + public static class WriterDoFn extends UnboundedSolaceWriter.AbstractWriterDoFn { + + private static final Logger LOG = LoggerFactory.getLogger(WriterDoFn.class); + + private static final int ACKS_FLUSHING_INTERVAL_SECS = 10; + + private final Counter sentToBroker = + Metrics.counter(UnboundedBatchedSolaceWriter.class, "msgs_sent_to_broker"); + + private final Counter batchesRejectedByBroker = + Metrics.counter(UnboundedBatchedSolaceWriter.class, "batches_rejected"); + + // State variables are never explicitly "used" + @SuppressWarnings("UnusedVariable") + @StateId("processing_key") + private final StateSpec> processingKeySpec = StateSpecs.value(); + + @SuppressWarnings("UnusedVariable") + @TimerId("bundle_flusher") + private final TimerSpec bundleFlusherTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + public WriterDoFn( + SerializableFunction destinationFn, + SessionServiceFactory sessionServiceFactory, + DeliveryMode deliveryMode, + SubmissionMode submissionMode, + int producersMapCardinality, + boolean publishLatencyMetrics) { + super( + destinationFn, + sessionServiceFactory, + deliveryMode, + submissionMode, + producersMapCardinality, + publishLatencyMetrics); + } + + // The state variable is here just to force a shuffling with a certain cardinality + @ProcessElement + public void processElement( + @Element KV element, + @StateId("processing_key") ValueState ignoredProcessingKey, + @TimerId("bundle_flusher") Timer bundleFlusherTimer, + @Timestamp Instant timestamp, + BoundedWindow window) { + + setCurrentBundleTimestamp(timestamp); + setCurrentBundleWindow(window); + + Solace.Record record = element.getValue(); + + if (record == null) { + LOG.error( + "SolaceIO.Write: Found null record with key {}. 
Ignoring record.", element.getKey()); + } else { + addToCurrentBundle(record); + // Extend timer for bundle flushing + bundleFlusherTimer + .offset(Duration.standardSeconds(ACKS_FLUSHING_INTERVAL_SECS)) + .setRelative(); + } + } + + @FinishBundle + public void finishBundle(FinishBundleContext context) { + // Take messages in groups of 50 (if there are enough messages) + List currentBundle = getCurrentBundle(); + for (int i = 0; i < currentBundle.size(); i += SOLACE_BATCH_LIMIT) { + int toIndex = Math.min(i + SOLACE_BATCH_LIMIT, currentBundle.size()); + List batch = currentBundle.subList(i, toIndex); + if (batch.isEmpty()) { + continue; + } + publishBatch(batch); + } + getCurrentBundle().clear(); + + publishResults(BeamContextWrapper.of(context)); + } + + @OnTimer("bundle_flusher") + public void flushBundle(OnTimerContext context) { + publishResults(BeamContextWrapper.of(context)); + } + + private void publishBatch(List records) { + try { + int entriesPublished = + solaceSessionService() + .getProducer() + .publishBatch( + records, shouldPublishLatencyMetrics(), getDestinationFn(), getDeliveryMode()); + sentToBroker.inc(entriesPublished); + } catch (Exception e) { + batchesRejectedByBroker.inc(); + Solace.PublishResult errorPublish = + Solace.PublishResult.builder() + .setPublished(false) + .setMessageId(String.format("BATCH_OF_%d_ENTRIES", records.size())) + .setError( + String.format( + "Batch could not be published after several" + " retries. 
Error: %s", + e.getMessage())) + .setLatencyMilliseconds(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())) + .build(); + PublishResultsReceiver.addResult(errorPublish); + } + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java new file mode 100644 index 000000000000..f0efba49c27f --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.solace.write; + +import static org.apache.beam.sdk.io.solace.SolaceIO.Write.FAILED_PUBLISH_TAG; +import static org.apache.beam.sdk.io.solace.SolaceIO.Write.SUCCESSFUL_PUBLISH_TAG; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import com.solacesystems.jcsmp.JCSMPFactory; +import com.solacesystems.jcsmp.JCSMPSendMultipleEntry; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public final class UnboundedSolaceWriter { + + /** + * This DoFn encapsulates common code used both for the {@link UnboundedBatchedSolaceWriter} and + * {@link UnboundedStreamingSolaceWriter}. 
+ */ + abstract static class AbstractWriterDoFn + extends DoFn, Solace.PublishResult> { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractWriterDoFn.class); + + // This is the batch limit supported by the send multiple JCSMP API method. + static final int SOLACE_BATCH_LIMIT = 50; + private final Distribution latencyPublish = + Metrics.distribution(SolaceIO.Write.class, "latency_publish_ms"); + + private final Distribution latencyErrors = + Metrics.distribution(SolaceIO.Write.class, "latency_failed_ms"); + + private final SerializableFunction destinationFn; + + private final SessionServiceFactory sessionServiceFactory; + private final DeliveryMode deliveryMode; + private final SubmissionMode submissionMode; + private final int producersMapCardinality; + private final boolean publishLatencyMetrics; + private int currentBundleProducerIndex = 0; + + private final List batchToEmit; + + private @Nullable Instant bundleTimestamp; + private @Nullable BoundedWindow bundleWindow; + + public AbstractWriterDoFn( + SerializableFunction destinationFn, + SessionServiceFactory sessionServiceFactory, + DeliveryMode deliveryMode, + SubmissionMode submissionMode, + int producersMapCardinality, + boolean publishLatencyMetrics) { + this.destinationFn = destinationFn; + this.sessionServiceFactory = sessionServiceFactory; + this.deliveryMode = deliveryMode; + this.submissionMode = submissionMode; + this.producersMapCardinality = producersMapCardinality; + this.publishLatencyMetrics = publishLatencyMetrics; + this.batchToEmit = new ArrayList<>(); + } + + @Teardown + public void teardown() { + SolaceWriteSessionsHandler.disconnectFromSolace( + sessionServiceFactory, producersMapCardinality); + } + + public void updateProducerIndex() { + currentBundleProducerIndex = (int) (Math.random() * producersMapCardinality); + } + + @StartBundle + public void startBundle() { + // Pick a producer at random for this bundle, reuse for the whole bundle + updateProducerIndex(); + 
batchToEmit.clear(); + } + + public SessionService solaceSessionService() { + return SolaceWriteSessionsHandler.getSessionService( + currentBundleProducerIndex, sessionServiceFactory); + } + + public void publishResults(BeamContextWrapper context) { + long sumPublish = 0; + long countPublish = 0; + long minPublish = Long.MAX_VALUE; + long maxPublish = 0; + + long sumFailed = 0; + long countFailed = 0; + long minFailed = Long.MAX_VALUE; + long maxFailed = 0; + + Solace.PublishResult result = PublishResultsReceiver.pollResults(); + + if (result != null) { + if (getCurrentBundleTimestamp() == null) { + setCurrentBundleTimestamp(Instant.now()); + } + + if (getCurrentBundleWindow() == null) { + setCurrentBundleWindow(GlobalWindow.INSTANCE); + } + } + + while (result != null) { + Long latency = result.getLatencyMilliseconds(); + + if (latency == null && shouldPublishLatencyMetrics()) { + LOG.error( + "SolaceIO.Write: Latency is null but user asked for latency metrics." + + " This may be a bug."); + } + + if (latency != null) { + if (result.getPublished()) { + sumPublish += latency; + countPublish++; + minPublish = Math.min(minPublish, latency); + maxPublish = Math.max(maxPublish, latency); + } else { + sumFailed += latency; + countFailed++; + minFailed = Math.min(minFailed, latency); + maxFailed = Math.max(maxFailed, latency); + } + } + + if (result.getPublished()) { + context.output( + SUCCESSFUL_PUBLISH_TAG, + result, + getCurrentBundleTimestamp(), + getCurrentBundleWindow()); + } else { + context.output( + FAILED_PUBLISH_TAG, result, getCurrentBundleTimestamp(), getCurrentBundleWindow()); + } + + result = PublishResultsReceiver.pollResults(); + } + + if (shouldPublishLatencyMetrics()) { + if (countPublish > 0) { + getPublishLatencyMetric().update(sumPublish, countPublish, minPublish, maxPublish); + } + + if (countFailed > 0) { + getFailedLatencyMetric().update(sumFailed, countFailed, minFailed, maxFailed); + } + } + } + + public BytesXMLMessage createSingleMessage( + 
Solace.Record record, boolean useCorrelationKeyLatency) { + JCSMPFactory jcsmpFactory = JCSMPFactory.onlyInstance(); + BytesXMLMessage msg = jcsmpFactory.createBytesXMLMessage(); + byte[] payload = record.getPayload(); + msg.writeBytes(payload); + + Long senderTimestamp = record.getSenderTimestamp(); + if (senderTimestamp == null) { + LOG.error( + "SolaceIO.Write: Record with id {} has no sender timestamp. Using current" + + " worker clock as timestamp.", + record.getMessageId()); + senderTimestamp = System.currentTimeMillis(); + } + msg.setSenderTimestamp(senderTimestamp); + msg.setDeliveryMode(getDeliveryMode()); + if (useCorrelationKeyLatency) { + Solace.CorrelationKey key = + Solace.CorrelationKey.builder() + .setMessageId(record.getMessageId()) + .setPublishMonotonicMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())) + .build(); + msg.setCorrelationKey(key); + } else { + // Use only a string as correlation key + msg.setCorrelationKey(record.getMessageId()); + } + msg.setApplicationMessageId(record.getMessageId()); + return msg; + } + + public JCSMPSendMultipleEntry[] createMessagesArray( + Iterable records, boolean useCorrelationKeyLatency) { + // Solace batch publishing only supports 50 elements max, so it is safe to convert to + // list here + ArrayList recordsList = Lists.newArrayList(records); + if (recordsList.size() > SOLACE_BATCH_LIMIT) { + LOG.error( + "SolaceIO.Write: Trying to create a batch of {}, but Solace supports a" + + " maximum of {}. 
The batch will likely be rejected by Solace.", + recordsList.size(), + SOLACE_BATCH_LIMIT); + } + + JCSMPSendMultipleEntry[] entries = new JCSMPSendMultipleEntry[recordsList.size()]; + for (int i = 0; i < recordsList.size(); i++) { + Solace.Record record = recordsList.get(i); + JCSMPSendMultipleEntry entry = + JCSMPFactory.onlyInstance() + .createSendMultipleEntry( + createSingleMessage(record, useCorrelationKeyLatency), + getDestinationFn().apply(record)); + entries[i] = entry; + } + + return entries; + } + + public int getProducersMapCardinality() { + return producersMapCardinality; + } + + public Distribution getPublishLatencyMetric() { + return latencyPublish; + } + + public Distribution getFailedLatencyMetric() { + return latencyErrors; + } + + public boolean shouldPublishLatencyMetrics() { + return publishLatencyMetrics; + } + + public SerializableFunction getDestinationFn() { + return destinationFn; + } + + public DeliveryMode getDeliveryMode() { + return deliveryMode; + } + + public SubmissionMode getDispatchMode() { + return submissionMode; + } + + public void addToCurrentBundle(Solace.Record record) { + batchToEmit.add(record); + } + + public List getCurrentBundle() { + return batchToEmit; + } + + public @Nullable Instant getCurrentBundleTimestamp() { + return bundleTimestamp; + } + + public @Nullable BoundedWindow getCurrentBundleWindow() { + return bundleWindow; + } + + public void setCurrentBundleTimestamp(Instant bundleTimestamp) { + if (this.bundleTimestamp == null || bundleTimestamp.isBefore(this.bundleTimestamp)) { + this.bundleTimestamp = bundleTimestamp; + } + } + + public void setCurrentBundleWindow(BoundedWindow bundleWindow) { + this.bundleWindow = bundleWindow; + } + + /** + * Since we need to publish from on timer methods and finish bundle methods, we need a + * consistent way to handle both WindowedContext and FinishBundleContext. 
+ */ + static class BeamContextWrapper { + private @Nullable WindowedContext windowedContext; + private @Nullable FinishBundleContext finishBundleContext; + + private BeamContextWrapper() {} + + public static BeamContextWrapper of(WindowedContext windowedContext) { + BeamContextWrapper beamContextWrapper = new BeamContextWrapper(); + beamContextWrapper.windowedContext = windowedContext; + return beamContextWrapper; + } + + public static BeamContextWrapper of(FinishBundleContext finishBundleContext) { + BeamContextWrapper beamContextWrapper = new BeamContextWrapper(); + beamContextWrapper.finishBundleContext = finishBundleContext; + return beamContextWrapper; + } + + public void output( + TupleTag tag, + Solace.PublishResult output, + @Nullable Instant timestamp, // Not required for windowed context + @Nullable BoundedWindow window) { // Not required for windowed context + if (windowedContext != null) { + windowedContext.output(tag, output); + } else if (finishBundleContext != null) { + if (timestamp == null) { + throw new IllegalStateException( + "SolaceIO.Write.UnboundedSolaceWriter.Context: Timestamp is required for a FinishBundleContext."); + } + if (window == null) { + throw new IllegalStateException( + "SolaceIO.Write.UnboundedSolaceWriter.Context: BoundedWindow is required for a FinishBundleContext."); + } + finishBundleContext.output(tag, output, timestamp, window); + } else { + throw new IllegalStateException( + "SolaceIO.Write.UnboundedSolaceWriter.Context: No context provided"); + } + } + } + } + + /** + * This class adds a pseudo-key with a given cardinality. The downstream steps will use state & timers + * to distribute the data and control for the number of parallel workers used for writing.
+ */ + @Internal + public static class AddShardKeyDoFn extends DoFn> { + private final int shardCount; + private int shardKey; + + public AddShardKeyDoFn(int shardCount) { + this.shardCount = shardCount; + shardKey = -1; + } + + @ProcessElement + public void processElement( + @Element Solace.Record record, OutputReceiver> c) { + shardKey = (shardKey + 1) % shardCount; + c.output(KV.of(shardKey, record)); + } + } + + /** + * This class just transforms to PublishResult to be able to capture the windowing with the right + * strategy. The output is not used for anything else. + */ + @Internal + public static class RecordToPublishResultDoFn extends DoFn { + @ProcessElement + public void processElement( + @Element Solace.Record record, OutputReceiver receiver) { + Solace.PublishResult result = + Solace.PublishResult.builder() + .setPublished(true) + .setMessageId(record.getMessageId()) + .setLatencyMilliseconds(0L) + .build(); + receiver.output(result); + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java new file mode 100644 index 000000000000..f3707f73a6cf --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedStreamingSolaceWriter.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.write; + +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.SolaceIO; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public final class UnboundedStreamingSolaceWriter { + /** + * This DoFn is responsible for writing to Solace in streaming mode (one message at a time, + * not holding up any message), and emits the corresponding output (success or fail; only for + * persistent messages), so the SolaceIO.Write connector can be composed with other subsequent + * transforms in the pipeline. + * + *

<p>The DoFn will create several JCSMP sessions per VM, and the sessions and producers will be + * reused across different threads (if the number of threads is higher than the number of + * sessions, which is probably the most common case). + * + *

<p>The producer uses the JCSMP streaming mode to publish a single message at a time, processing + * the acks from this publication, and returning them as output of the DoFn. + * + *

<p>There are no acks if the delivery mode is set to DIRECT. + * + *

This writer DoFn offers lower latency and lower throughput than {@link + * UnboundedBatchedSolaceWriter.WriterDoFn}. + */ + @Internal + public static class WriterDoFn extends UnboundedSolaceWriter.AbstractWriterDoFn { + private static final Logger LOG = LoggerFactory.getLogger(WriterDoFn.class); + + private final Counter sentToBroker = + Metrics.counter(UnboundedStreamingSolaceWriter.class, "msgs_sent_to_broker"); + + private final Counter rejectedByBroker = + Metrics.counter(UnboundedStreamingSolaceWriter.class, "msgs_rejected_by_broker"); + + // We use a state variable to force a shuffling and ensure the cardinality of the processing + @SuppressWarnings("UnusedVariable") + @StateId("current_key") + private final StateSpec> currentKeySpec = StateSpecs.value(); + + public WriterDoFn( + SerializableFunction destinationFn, + SessionServiceFactory sessionServiceFactory, + DeliveryMode deliveryMode, + SolaceIO.SubmissionMode submissionMode, + int producersMapCardinality, + boolean publishLatencyMetrics) { + super( + destinationFn, + sessionServiceFactory, + deliveryMode, + submissionMode, + producersMapCardinality, + publishLatencyMetrics); + } + + @ProcessElement + public void processElement( + @Element KV element, + @Timestamp Instant timestamp, + @AlwaysFetched @StateId("current_key") ValueState currentKeyState, + BoundedWindow window) { + + setCurrentBundleTimestamp(timestamp); + setCurrentBundleWindow(window); + + Integer currentKey = currentKeyState.read(); + Integer elementKey = element.getKey(); + Solace.Record record = element.getValue(); + + if (currentKey == null || !currentKey.equals(elementKey)) { + currentKeyState.write(elementKey); + } + + if (record == null) { + LOG.error("SolaceIO.Write: Found null record with key {}. 
Ignoring record.", elementKey); + return; + } + + // The publish method will retry, let's send a failure message if all the retries fail + try { + solaceSessionService() + .getProducer() + .publishSingleMessage( + record, + getDestinationFn().apply(record), + shouldPublishLatencyMetrics(), + getDeliveryMode()); + sentToBroker.inc(); + } catch (Exception e) { + rejectedByBroker.inc(); + Solace.PublishResult errorPublish = + Solace.PublishResult.builder() + .setPublished(false) + .setMessageId(record.getMessageId()) + .setError( + String.format( + "Message could not be published after several" + " retries. Error: %s", + e.getMessage())) + .setLatencyMilliseconds(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())) + .build(); + PublishResultsReceiver.addResult(errorPublish); + } + } + + @FinishBundle + public void finishBundle(FinishBundleContext context) { + publishResults(BeamContextWrapper.of(context)); + } + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java index ec0ae7194686..18e896988a33 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockEmptySessionService.java @@ -17,14 +17,21 @@ */ package org.apache.beam.sdk.io.solace; +import com.google.auto.value.AutoValue; import com.solacesystems.jcsmp.JCSMPProperties; +import org.apache.beam.sdk.io.solace.broker.MessageProducer; import org.apache.beam.sdk.io.solace.broker.MessageReceiver; import org.apache.beam.sdk.io.solace.broker.SessionService; -public class MockEmptySessionService extends SessionService { +@AutoValue +public abstract class MockEmptySessionService extends SessionService { String exceptionMessage = "This is an empty client, use a MockSessionService instead."; + public static MockEmptySessionService create() { + return new 
AutoValue_MockEmptySessionService(); + } + @Override public void close() { throw new UnsupportedOperationException(exceptionMessage); @@ -36,7 +43,12 @@ public boolean isClosed() { } @Override - public MessageReceiver createReceiver() { + public MessageReceiver getReceiver() { + throw new UnsupportedOperationException(exceptionMessage); + } + + @Override + public MessageProducer getProducer() { throw new UnsupportedOperationException(exceptionMessage); } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java index a4d6a42ef302..7943a6afd844 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionService.java @@ -17,38 +17,52 @@ */ package org.apache.beam.sdk.io.solace; +import com.google.auto.value.AutoValue; import com.solacesystems.jcsmp.BytesXMLMessage; +import com.solacesystems.jcsmp.DeliveryMode; +import com.solacesystems.jcsmp.Destination; import com.solacesystems.jcsmp.JCSMPProperties; import java.io.IOException; import java.io.Serializable; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.broker.MessageProducer; import org.apache.beam.sdk.io.solace.broker.MessageReceiver; +import org.apache.beam.sdk.io.solace.broker.PublishResultHandler; import org.apache.beam.sdk.io.solace.broker.SessionService; +import org.apache.beam.sdk.io.solace.data.Solace; import org.apache.beam.sdk.transforms.SerializableFunction; import org.checkerframework.checker.nullness.qual.Nullable; -public class MockSessionService extends SessionService { +@AutoValue +public abstract class MockSessionService extends SessionService { - private final SerializableFunction getRecordFn; - private MessageReceiver 
messageReceiver = null; - private final int minMessagesReceived; - private final @Nullable SubmissionMode mode; - - public MockSessionService( - SerializableFunction getRecordFn, - int minMessagesReceived, - @Nullable SubmissionMode mode) { - this.getRecordFn = getRecordFn; - this.minMessagesReceived = minMessagesReceived; - this.mode = mode; + public abstract @Nullable SerializableFunction recordFn(); + + public abstract int minMessagesReceived(); + + public abstract @Nullable SubmissionMode mode(); + + public static Builder builder() { + return new AutoValue_MockSessionService.Builder().minMessagesReceived(0); } - public MockSessionService( - SerializableFunction getRecordFn, int minMessagesReceived) { - this(getRecordFn, minMessagesReceived, null); + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder recordFn( + @Nullable SerializableFunction recordFn); + + public abstract Builder minMessagesReceived(int minMessagesReceived); + + public abstract Builder mode(@Nullable SubmissionMode mode); + + public abstract MockSessionService build(); } + private MessageReceiver messageReceiver = null; + private MockProducer messageProducer = null; + @Override public void close() {} @@ -58,16 +72,36 @@ public boolean isClosed() { } @Override - public MessageReceiver createReceiver() { + public MessageReceiver getReceiver() { if (messageReceiver == null) { - messageReceiver = new MockReceiver(getRecordFn, minMessagesReceived); + messageReceiver = new MockReceiver(recordFn(), minMessagesReceived()); } return messageReceiver; } + @Override + public MessageProducer getProducer() { + if (messageProducer == null) { + messageProducer = new MockProducer(new PublishResultHandler()); + } + return messageProducer; + } + @Override public void connect() {} + @Override + public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) { + // Let's override some properties that will be overriden by the connector + // Opposite of the mode, 
to test that is overriden + baseProperties.setProperty( + JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, mode() == SubmissionMode.HIGHER_THROUGHPUT); + + baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, 87); + + return baseProperties; + } + public static class MockReceiver implements MessageReceiver, Serializable { private final AtomicInteger counter = new AtomicInteger(); private final SerializableFunction getRecordFn; @@ -101,15 +135,49 @@ public boolean isEOF() { } } - @Override - public JCSMPProperties initializeSessionProperties(JCSMPProperties baseProperties) { - // Let's override some properties that will be overriden by the connector - // Opposite of the mode, to test that is overriden - baseProperties.setProperty( - JCSMPProperties.MESSAGE_CALLBACK_ON_REACTOR, mode == SubmissionMode.HIGHER_THROUGHPUT); + public static class MockProducer extends MessageProducer implements Serializable { + private final PublishResultHandler handler; - baseProperties.setProperty(JCSMPProperties.PUB_ACK_WINDOW_SIZE, 87); + public MockProducer(PublishResultHandler handler) { + this.handler = handler; + } - return baseProperties; + @Override + public void publishSingleMessage( + Solace.Record msg, + Destination topicOrQueue, + boolean useCorrelationKeyLatency, + DeliveryMode deliveryMode) { + if (useCorrelationKeyLatency) { + handler.responseReceivedEx( + Solace.PublishResult.builder() + .setPublished(true) + .setMessageId(msg.getMessageId()) + .build()); + } else { + handler.responseReceivedEx(msg.getMessageId()); + } + } + + @Override + public int publishBatch( + List records, + boolean useCorrelationKeyLatency, + SerializableFunction destinationFn, + DeliveryMode deliveryMode) { + for (Solace.Record record : records) { + this.publishSingleMessage( + record, destinationFn.apply(record), useCorrelationKeyLatency, deliveryMode); + } + return records.size(); + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void close() {} } } 
diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java index 603a30ad2c90..ae2d1f5919b8 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/MockSessionServiceFactory.java @@ -17,22 +17,58 @@ */ package org.apache.beam.sdk.io.solace; +import com.google.auto.value.AutoValue; +import com.solacesystems.jcsmp.BytesXMLMessage; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; import org.apache.beam.sdk.io.solace.broker.SessionService; import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.checkerframework.checker.nullness.qual.Nullable; -public class MockSessionServiceFactory extends SessionServiceFactory { - SessionService sessionService; +@AutoValue +public abstract class MockSessionServiceFactory extends SessionServiceFactory { + public abstract @Nullable SubmissionMode mode(); - public MockSessionServiceFactory(SessionService clientService) { - this.sessionService = clientService; + public abstract @Nullable SerializableFunction recordFn(); + + public abstract int minMessagesReceived(); + + public abstract boolean useEmptySessionMock(); + + public static Builder builder() { + return new AutoValue_MockSessionServiceFactory.Builder() + .minMessagesReceived(0) + .useEmptySessionMock(false); } public static SessionServiceFactory getDefaultMock() { - return new MockSessionServiceFactory(new MockEmptySessionService()); + return MockSessionServiceFactory.builder().build(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder mode(@Nullable SubmissionMode mode); + + public abstract Builder recordFn( + @Nullable SerializableFunction recordFn); + + public abstract 
Builder minMessagesReceived(int minMessagesReceived); + + public abstract Builder useEmptySessionMock(boolean useEmptySessionMock); + + public abstract MockSessionServiceFactory build(); } @Override public SessionService create() { - return sessionService; + if (useEmptySessionMock()) { + return MockEmptySessionService.create(); + } else { + return MockSessionService.builder() + .recordFn(recordFn()) + .minMessagesReceived(minMessagesReceived()) + .mode(mode()) + .build(); + } } } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java similarity index 72% rename from sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java rename to sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java index cc1fa1d667aa..6c197cd6dad5 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOReadTest.java @@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; @@ -49,6 +50,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -61,7 +63,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class SolaceIOTest { +public class SolaceIOReadTest { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -69,7 +71,6 @@ private Read 
getDefaultRead() { return SolaceIO.read() .from(Solace.Queue.fromName("queue")) .withSempClientFactory(MockSempClientFactory.getDefaultMock()) - .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()) .withMaxNumConnections(1); } @@ -77,7 +78,6 @@ private Read getDefaultReadForTopic() { return SolaceIO.read() .from(Solace.Topic.fromName("topic")) .withSempClientFactory(MockSempClientFactory.getDefaultMock()) - .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()) .withMaxNumConnections(1); } @@ -102,20 +102,18 @@ private static UnboundedSolaceSource getSource(Read spec, TestPi @Test public void testReadMessages() { // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), - SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), - SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); - return getOrNull(index, messages); - }, - 3); + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().minMessagesReceived(3).recordFn(recordFn).build(); // Expected data List expected = new ArrayList<>(); @@ -137,20 +135,18 @@ public void testReadMessages() { @Test public void testReadMessagesWithDeduplication() { // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), - SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), - 
SolaceDataUtils.getBytesXmlMessage("payload_test2", "451")); - return getOrNull(index, messages); - }, - 3); + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "451")); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build(); // Expected data List expected = new ArrayList<>(); @@ -172,19 +168,18 @@ public void testReadMessagesWithDeduplication() { @Test public void testReadMessagesWithoutDeduplication() { // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), - SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), - SolaceDataUtils.getBytesXmlMessage("payload_test2", "451")); - return getOrNull(index, messages); - }, - 3); + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "451")); + return getOrNull(index, messages); + }; + SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build(); // Expected data List expected = new ArrayList<>(); @@ -206,32 +201,38 @@ public void testReadMessagesWithoutDeduplication() { @Test public void testReadMessagesWithDeduplicationOnReplicationGroupMessageId() { // Broker that creates input data - MockSessionService mockClientService = - new 
MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage( - "payload_test0", null, null, new ReplicationGroupMessageIdImpl(2L, 1L)), - SolaceDataUtils.getBytesXmlMessage( - "payload_test1", null, null, new ReplicationGroupMessageIdImpl(2L, 2L)), - SolaceDataUtils.getBytesXmlMessage( - "payload_test2", null, null, new ReplicationGroupMessageIdImpl(2L, 2L))); - return getOrNull(index, messages); - }, - 3); + + String id0 = UUID.randomUUID().toString(); + String id1 = UUID.randomUUID().toString(); + String id2 = UUID.randomUUID().toString(); + + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage( + "payload_test0", id0, null, new ReplicationGroupMessageIdImpl(2L, 1L)), + SolaceDataUtils.getBytesXmlMessage( + "payload_test1", id1, null, new ReplicationGroupMessageIdImpl(2L, 2L)), + SolaceDataUtils.getBytesXmlMessage( + "payload_test2", id2, null, new ReplicationGroupMessageIdImpl(2L, 2L))); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build(); // Expected data List expected = new ArrayList<>(); expected.add( SolaceDataUtils.getSolaceRecord( - "payload_test0", null, new ReplicationGroupMessageIdImpl(2L, 1L))); + "payload_test0", id0, new ReplicationGroupMessageIdImpl(2L, 1L))); + expected.add( + SolaceDataUtils.getSolaceRecord( + "payload_test1", id1, new ReplicationGroupMessageIdImpl(2L, 2L))); expected.add( SolaceDataUtils.getSolaceRecord( - "payload_test1", null, new ReplicationGroupMessageIdImpl(2L, 2L))); + "payload_test2", id2, new ReplicationGroupMessageIdImpl(2L, 2L))); // Run the pipeline PCollection events = @@ -248,19 +249,18 @@ public void testReadMessagesWithDeduplicationOnReplicationGroupMessageId() { @Test public void testReadWithCoderAndParseFnAndTimestampFn() 
{ // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), - SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), - SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); - return getOrNull(index, messages); - }, - 3); + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); + return getOrNull(index, messages); + }; + SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build(); // Expected data List expected = new ArrayList<>(); @@ -304,7 +304,8 @@ public void testSplitsForExclusiveQueue() throws Exception { SolaceIO.read() .from(Solace.Queue.fromName("queue")) .withSempClientFactory(new MockSempClientFactory(mockSempClient)) - .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()); + .withSessionServiceFactory( + MockSessionServiceFactory.builder().useEmptySessionMock(true).build()); int desiredNumSplits = 5; @@ -316,7 +317,10 @@ public void testSplitsForExclusiveQueue() throws Exception { @Test public void testSplitsForNonExclusiveQueueWithMaxNumConnections() throws Exception { - Read spec = getDefaultRead().withMaxNumConnections(3); + Read spec = + getDefaultRead() + .withMaxNumConnections(3) + .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()); int desiredNumSplits = 5; @@ -328,7 +332,10 @@ public void testSplitsForNonExclusiveQueueWithMaxNumConnections() throws Excepti @Test public void testSplitsForNonExclusiveQueueWithMaxNumConnectionsRespectDesired() throws Exception { - Read spec = 
getDefaultRead().withMaxNumConnections(10); + Read spec = + getDefaultRead() + .withMaxNumConnections(10) + .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()); int desiredNumSplits = 5; UnboundedSolaceSource initialSource = getSource(spec, pipeline); @@ -346,7 +353,9 @@ public void testCreateQueueForTopic() { .build(); Read spec = - getDefaultReadForTopic().withSempClientFactory(new MockSempClientFactory(mockSempClient)); + getDefaultReadForTopic() + .withSempClientFactory(new MockSempClientFactory(mockSempClient)) + .withSessionServiceFactory(MockSessionServiceFactory.getDefaultMock()); spec.expand(PBegin.in(TestPipeline.create())); // check if createQueueForTopic was executed assertEquals(1, createQueueForTopicFnCounter.get()); @@ -358,22 +367,22 @@ public void testCheckpointMark() throws Exception { AtomicInteger countAckMessages = new AtomicInteger(0); // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - messages.add( - SolaceDataUtils.getBytesXmlMessage( - "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); - } - countConsumedMessages.incrementAndGet(); - return getOrNull(index, messages); - }, - 10); + + SerializableFunction recordFn = + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); + Read spec = getDefaultRead().withSessionServiceFactory(fakeSessionServiceFactory); UnboundedSolaceSource initialSource = getSource(spec, pipeline); @@ -407,21 
+416,20 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception { AtomicInteger countAckMessages = new AtomicInteger(0); // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - messages.add( - SolaceDataUtils.getBytesXmlMessage( - "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); - } - countConsumedMessages.incrementAndGet(); - return getOrNull(index, messages); - }, - 10); + SerializableFunction recordFn = + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return getOrNull(index, messages); + }; + SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); Read spec = getDefaultRead() @@ -467,22 +475,21 @@ public void testCheckpointMarkSafety() throws Exception { AtomicInteger countAckMessages = new AtomicInteger(0); // Broker that creates input data - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = new ArrayList<>(); - for (int i = 0; i < messagesToProcess; i++) { - messages.add( - SolaceDataUtils.getBytesXmlMessage( - "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); - } - countConsumedMessages.incrementAndGet(); - return getOrNull(index, messages); - }, - 10); + SerializableFunction recordFn = + index -> { + List messages = new ArrayList<>(); + for (int i = 0; i < messagesToProcess; i++) { + messages.add( + SolaceDataUtils.getBytesXmlMessage( + "payload_test" + i, "45" + i, (num) -> countAckMessages.incrementAndGet())); + } + countConsumedMessages.incrementAndGet(); + return 
getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(10).build(); + Read spec = getDefaultRead() .withSessionServiceFactory(fakeSessionServiceFactory) @@ -558,20 +565,18 @@ public void testDestinationTopicQueueCreation() { @Test public void testTopicEncoding() { - MockSessionService mockClientService = - new MockSessionService( - index -> { - List messages = - ImmutableList.of( - SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), - SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), - SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); - return getOrNull(index, messages); - }, - 3); + SerializableFunction recordFn = + index -> { + List messages = + ImmutableList.of( + SolaceDataUtils.getBytesXmlMessage("payload_test0", "450"), + SolaceDataUtils.getBytesXmlMessage("payload_test1", "451"), + SolaceDataUtils.getBytesXmlMessage("payload_test2", "452")); + return getOrNull(index, messages); + }; SessionServiceFactory fakeSessionServiceFactory = - new MockSessionServiceFactory(mockClientService); + MockSessionServiceFactory.builder().recordFn(recordFn).minMessagesReceived(3).build(); // Run PCollection events = diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java new file mode 100644 index 000000000000..d5ec282debfc --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOWriteTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace; + +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.solacesystems.jcsmp.DeliveryMode; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.io.solace.SolaceIO.SubmissionMode; +import org.apache.beam.sdk.io.solace.SolaceIO.WriterType; +import org.apache.beam.sdk.io.solace.broker.SessionServiceFactory; +import org.apache.beam.sdk.io.solace.data.Solace; +import org.apache.beam.sdk.io.solace.data.Solace.Record; +import org.apache.beam.sdk.io.solace.data.SolaceDataUtils; +import org.apache.beam.sdk.io.solace.write.SolaceOutput; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class SolaceIOWriteTest { + + 
@Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + private final List keys = ImmutableList.of("450", "451", "452"); + private final List payloads = ImmutableList.of("payload0", "payload1", "payload2"); + + private PCollection getRecords(Pipeline p) { + TestStream.Builder> kvBuilder = + TestStream.create(KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class))) + .advanceWatermarkTo(Instant.EPOCH); + + assert keys.size() == payloads.size(); + + for (int k = 0; k < keys.size(); k++) { + kvBuilder = + kvBuilder + .addElements(KV.of(keys.get(k), payloads.get(k))) + .advanceProcessingTime(Duration.standardSeconds(60)); + } + + TestStream> testStream = kvBuilder.advanceWatermarkToInfinity(); + PCollection> kvs = p.apply("Test stream", testStream); + + return kvs.apply( + "To Record", + MapElements.into(TypeDescriptor.of(Record.class)) + .via(kv -> SolaceDataUtils.getSolaceRecord(kv.getValue(), kv.getKey()))); + } + + private SolaceOutput getWriteTransform(SubmissionMode mode, WriterType writerType, Pipeline p) { + SessionServiceFactory fakeSessionServiceFactory = + MockSessionServiceFactory.builder().mode(mode).build(); + + PCollection records = getRecords(p); + return records.apply( + "Write to Solace", + SolaceIO.write() + .to(Solace.Queue.fromName("queue")) + .withSubmissionMode(mode) + .withWriterType(writerType) + .withDeliveryMode(DeliveryMode.PERSISTENT) + .withSessionServiceFactory(fakeSessionServiceFactory)); + } + + private static PCollection getIdsPCollection(SolaceOutput output) { + return output + .getSuccessfulPublish() + .apply( + "Get message ids", MapElements.into(strings()).via(Solace.PublishResult::getMessageId)); + } + + @Test + public void testWriteLatencyStreaming() { + SubmissionMode mode = SubmissionMode.LOWER_LATENCY; + WriterType writerType = WriterType.STREAMING; + + SolaceOutput output = getWriteTransform(mode, writerType, pipeline); + PCollection ids = getIdsPCollection(output); + + 
PAssert.that(ids).containsInAnyOrder(keys); + PAssert.that(output.getFailedPublish()).empty(); + + pipeline.run(); + } + + @Test + public void testWriteThroughputStreaming() { + SubmissionMode mode = SubmissionMode.HIGHER_THROUGHPUT; + WriterType writerType = WriterType.STREAMING; + + SolaceOutput output = getWriteTransform(mode, writerType, pipeline); + PCollection ids = getIdsPCollection(output); + + PAssert.that(ids).containsInAnyOrder(keys); + PAssert.that(output.getFailedPublish()).empty(); + + pipeline.run(); + } + + @Test + public void testWriteLatencyBatched() { + SubmissionMode mode = SubmissionMode.LOWER_LATENCY; + WriterType writerType = WriterType.BATCHED; + + SolaceOutput output = getWriteTransform(mode, writerType, pipeline); + PCollection ids = getIdsPCollection(output); + + PAssert.that(ids).containsInAnyOrder(keys); + PAssert.that(output.getFailedPublish()).empty(); + + pipeline.run(); + } + + @Test + public void testWriteThroughputBatched() { + SubmissionMode mode = SubmissionMode.HIGHER_THROUGHPUT; + WriterType writerType = WriterType.BATCHED; + + SolaceOutput output = getWriteTransform(mode, writerType, pipeline); + PCollection ids = getIdsPCollection(output); + + PAssert.that(ids).containsInAnyOrder(keys); + PAssert.that(output.getFailedPublish()).empty(); + + pipeline.run(); + } +} diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java index 0c6f88a7c9d5..d14fc9d3ca22 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/OverrideWriterPropertiesTest.java @@ -31,7 +31,7 @@ public class OverrideWriterPropertiesTest { @Test public void testOverrideForHigherThroughput() { SolaceIO.SubmissionMode mode = 
SolaceIO.SubmissionMode.HIGHER_THROUGHPUT; - MockSessionService service = new MockSessionService(null, 0, mode); + MockSessionService service = MockSessionService.builder().mode(mode).build(); // Test HIGHER_THROUGHPUT mode JCSMPProperties props = service.initializeWriteSessionProperties(mode); @@ -44,7 +44,7 @@ public void testOverrideForHigherThroughput() { @Test public void testOverrideForLowerLatency() { SolaceIO.SubmissionMode mode = SolaceIO.SubmissionMode.LOWER_LATENCY; - MockSessionService service = new MockSessionService(null, 0, mode); + MockSessionService service = MockSessionService.builder().mode(mode).build(); // Test HIGHER_THROUGHPUT mode JCSMPProperties props = service.initializeWriteSessionProperties(mode); diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java index 5134bd131d73..9e04c4cfd276 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/data/SolaceDataUtils.java @@ -100,7 +100,7 @@ public static Solace.Record getSolaceRecord( : DEFAULT_REPLICATION_GROUP_ID.toString(); return Solace.Record.builder() - .setPayload(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8))) + .setPayload(payload.getBytes(StandardCharsets.UTF_8)) .setMessageId(messageId) .setDestination( Solace.Destination.builder() @@ -116,7 +116,7 @@ public static Solace.Record getSolaceRecord( .setTimeToLive(1000L) .setSenderTimestamp(null) .setReplicationGroupMessageId(replicationGroupMessageIdString) - .setAttachmentBytes(ByteBuffer.wrap(new byte[0])) + .setAttachmentBytes(new byte[0]) .build(); } diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java index 35ee7595352d..5f0cdecafc54 100644 
--- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/it/SolaceIOIT.java @@ -17,49 +17,71 @@ */ package org.apache.beam.sdk.io.solace.it; +import static org.apache.beam.sdk.io.solace.it.SolaceContainerManager.TOPIC_NAME; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; import static org.junit.Assert.assertEquals; +import com.solacesystems.jcsmp.DeliveryMode; import java.io.IOException; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.io.solace.SolaceIO; import org.apache.beam.sdk.io.solace.broker.BasicAuthJcsmpSessionServiceFactory; import org.apache.beam.sdk.io.solace.broker.BasicAuthSempClientFactory; +import org.apache.beam.sdk.io.solace.data.Solace; import org.apache.beam.sdk.io.solace.data.Solace.Queue; +import org.apache.beam.sdk.io.solace.data.SolaceDataUtils; +import org.apache.beam.sdk.io.solace.write.SolaceOutput; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testutils.metrics.MetricsReader; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.AfterClass; 
import org.junit.BeforeClass; +import org.junit.FixMethodOrder; import org.junit.Rule; import org.junit.Test; +import org.junit.runners.MethodSorters; +@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class SolaceIOIT { private static final String NAMESPACE = SolaceIOIT.class.getName(); private static final String READ_COUNT = "read_count"; + private static final String WRITE_COUNT = "write_count"; private static SolaceContainerManager solaceContainerManager; - private static final TestPipelineOptions readPipelineOptions; + private static final String queueName = "test_queue"; + private static final TestPipelineOptions pipelineOptions; + private static final long PUBLISH_MESSAGE_COUNT = 20; static { - readPipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class); - readPipelineOptions.setBlockOnRun(false); - readPipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false); - readPipelineOptions.as(StreamingOptions.class).setStreaming(false); + pipelineOptions = PipelineOptionsFactory.create().as(TestPipelineOptions.class); + pipelineOptions.as(StreamingOptions.class).setStreaming(true); + // For the read connector tests, we need to make sure that p.run() does not block + pipelineOptions.setBlockOnRun(false); + pipelineOptions.as(TestPipelineOptions.class).setBlockOnRun(false); } - @Rule public final TestPipeline readPipeline = TestPipeline.fromOptions(readPipelineOptions); + @Rule public final TestPipeline pipeline = TestPipeline.fromOptions(pipelineOptions); @BeforeClass public static void setup() throws IOException { solaceContainerManager = new SolaceContainerManager(); solaceContainerManager.start(); + solaceContainerManager.createQueueWithSubscriptionTopic(queueName); } @AfterClass @@ -69,20 +91,17 @@ public static void afterClass() { } } + // The order of the following tests matters. The first test publishes some messages in a Solace + // queue, and those messages are read by the second test.
If another writer test is run before + // the read test, that will alter the count for the read test and will make it fail. @Test - public void testRead() { - String queueName = "test_queue"; - solaceContainerManager.createQueueWithSubscriptionTopic(queueName); - - // todo this is very slow, needs to be replaced with the SolaceIO.write connector. - int publishMessagesCount = 20; - for (int i = 0; i < publishMessagesCount; i++) { - solaceContainerManager.sendToTopic( - "{\"field_str\":\"value\",\"field_int\":123}", - ImmutableList.of("Solace-Message-ID:m" + i)); - } + public void test01WriteStreaming() { + testWriteConnector(SolaceIO.WriterType.STREAMING); + } - readPipeline + @Test + public void test02Read() { + pipeline .apply( "Read from Solace", SolaceIO.read() @@ -104,12 +123,83 @@ public void testRead() { .build())) .apply("Count", ParDo.of(new CountingFn<>(NAMESPACE, READ_COUNT))); - PipelineResult pipelineResult = readPipeline.run(); + PipelineResult pipelineResult = pipeline.run(); + // We need enough time for Beam to pull all messages from the queue, but we need a timeout too, + // as the Read connector will keep attempting to read forever.
pipelineResult.waitUntilFinish(Duration.standardSeconds(15)); MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); long actualRecordsCount = metricsReader.getCounterMetric(READ_COUNT); - assertEquals(publishMessagesCount, actualRecordsCount); + assertEquals(PUBLISH_MESSAGE_COUNT, actualRecordsCount); + } + + @Test + public void test03WriteBatched() { + testWriteConnector(SolaceIO.WriterType.BATCHED); + } + + private void testWriteConnector(SolaceIO.WriterType writerType) { + Pipeline p = createWriterPipeline(writerType); + + PipelineResult pipelineResult = p.run(); + pipelineResult.waitUntilFinish(); + MetricsReader metricsReader = new MetricsReader(pipelineResult, NAMESPACE); + long actualRecordsCount = metricsReader.getCounterMetric(WRITE_COUNT); + assertEquals(PUBLISH_MESSAGE_COUNT, actualRecordsCount); + } + + private Pipeline createWriterPipeline(SolaceIO.WriterType writerType) { + TestStream.Builder> kvBuilder = + TestStream.create(KvCoder.of(AvroCoder.of(String.class), AvroCoder.of(String.class))) + .advanceWatermarkTo(Instant.EPOCH); + + for (int i = 0; i < PUBLISH_MESSAGE_COUNT; i++) { + String key = "Solace-Message-ID:m" + i; + String payload = String.format("{\"field_str\":\"value\",\"field_int\":123%d}", i); + kvBuilder = + kvBuilder + .addElements(KV.of(key, payload)) + .advanceProcessingTime(Duration.standardSeconds(60)); + } + + TestStream> testStream = kvBuilder.advanceWatermarkToInfinity(); + + PCollection> kvs = + pipeline.apply(String.format("Test stream %s", writerType), testStream); + + PCollection records = + kvs.apply( + String.format("To Record %s", writerType), + MapElements.into(TypeDescriptor.of(Solace.Record.class)) + .via(kv -> SolaceDataUtils.getSolaceRecord(kv.getValue(), kv.getKey()))); + + SolaceOutput result = + records.apply( + String.format("Write to Solace %s", writerType), + SolaceIO.write() + .to(Solace.Topic.fromName(TOPIC_NAME)) + .withSubmissionMode(SolaceIO.SubmissionMode.TESTING) + 
.withWriterType(writerType) + .withDeliveryMode(DeliveryMode.PERSISTENT) + .withNumberOfClientsPerWorker(1) + .withMaxNumOfUsedWorkers(1) + .withSessionServiceFactory( + BasicAuthJcsmpSessionServiceFactory.builder() + .host("localhost:" + solaceContainerManager.jcsmpPortMapped) + .username(SolaceContainerManager.USERNAME) + .password(SolaceContainerManager.PASSWORD) + .vpnName(SolaceContainerManager.VPN_NAME) + .build())); + result + .getSuccessfulPublish() + .apply( + String.format("Get ids %s", writerType), + MapElements.into(strings()).via(Solace.PublishResult::getMessageId)) + .apply( + String.format("Count %s", writerType), + ParDo.of(new CountingFn<>(NAMESPACE, WRITE_COUNT))); + + return pipeline; } private static class CountingFn extends DoFn {