From b058f0fcbd1ec03f5804856c3bc09066a61f271b Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Tue, 18 Jun 2024 14:16:51 +0200 Subject: [PATCH] Add SolaceCheckpointMark for handling Solace checkpointing (#31621) --- sdks/java/io/solace/build.gradle | 1 + .../io/solace/read/SolaceCheckpointMark.java | 94 +++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index c49b79f96a3d..fbf096abd22f 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -34,4 +34,5 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation library.java.joda_time implementation library.java.solace + implementation project(":sdks:java:extensions:avro") } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java new file mode 100644 index 000000000000..f429df3f8cd1 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.read; + +import com.solacesystems.jcsmp.BytesXMLMessage; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Checkpoint for an unbounded Solace source. Consists of the Solace messages waiting to be + * acknowledged. + */ +@DefaultCoder(AvroCoder.class) +class SolaceCheckpointMark implements UnboundedSource.CheckpointMark { + private transient AtomicBoolean activeReader; + // BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry + // these messages here. We relay on Solace's retry mechanism. + private transient ArrayDeque ackQueue; + + @SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction + private SolaceCheckpointMark() {} + + /** + * Creates a new {@link SolaceCheckpointMark}. + * + * @param activeReader {@link AtomicBoolean} indicating if the related reader is active. The + * reader creating the messages has to be active to acknowledge the messages. + * @param ackQueue {@link List} of {@link BytesXMLMessage} to be acknowledged. + */ + SolaceCheckpointMark(AtomicBoolean activeReader, List ackQueue) { + this.activeReader = activeReader; + this.ackQueue = new ArrayDeque<>(ackQueue); + } + + @Override + public void finalizeCheckpoint() { + if (activeReader == null || !activeReader.get() || ackQueue == null) { + return; + } + + while (!ackQueue.isEmpty()) { + BytesXMLMessage msg = ackQueue.poll(); + if (msg != null) { + msg.ackMessage(); + } + } + } + + @Override + public boolean equals(@Nullable Object o) { + if (o == null) { + return false; + } + if (this == o) { + return true; + } + if (!(o instanceof SolaceCheckpointMark)) { + return false; + } + SolaceCheckpointMark that = (SolaceCheckpointMark) o; + // Needed to convert to ArrayList because ArrayDeque.equals checks only for reference, not + // content. + ArrayList ackList = new ArrayList<>(ackQueue); + ArrayList thatAckList = new ArrayList<>(that.ackQueue); + return Objects.equals(activeReader, that.activeReader) && Objects.equals(ackList, thatAckList); + } + + @Override + public int hashCode() { + return Objects.hash(activeReader, ackQueue); + } +}