diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java index bb9f0c6ea689..dcfdcc4fabb9 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -393,7 +394,8 @@ public class SolaceIO { } }; private static final boolean DEFAULT_DEDUPLICATE_RECORDS = false; - + private static final Duration DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD = + Duration.standardSeconds(30); public static final int DEFAULT_WRITER_MAX_NUMBER_OF_WORKERS = 20; public static final int DEFAULT_WRITER_CLIENTS_PER_WORKER = 4; public static final Boolean DEFAULT_WRITER_PUBLISH_LATENCY_METRICS = false; @@ -440,7 +442,8 @@ public static Read read() { .setTypeDescriptor(TypeDescriptor.of(Solace.Record.class)) .setParseFn(SolaceRecordMapper::map) .setTimestampFn(SENDER_TIMESTAMP_FUNCTION) - .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)); + .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS) + .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)); } /** * Create a {@link Read} transform, to read from Solace. Specify a {@link SerializableFunction} to @@ -467,7 +470,8 @@ public static Read read( .setTypeDescriptor(typeDescriptor) .setParseFn(parseFn) .setTimestampFn(timestampFn) - .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS)); + .setDeduplicateRecords(DEFAULT_DEDUPLICATE_RECORDS) + .setWatermarkIdleDurationThreshold(DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD)); } /** @@ -540,6 +544,19 @@ public Read withMaxNumConnections(Integer maxNumConnections) { return this; } + /** + * Optional. Denotes the duration for which the watermark can be idle. If there are no incoming + * messages for this ‘idle’ period of time, the watermark is set to a timestamp representing a + * time earlier than now by the ‘idle’ period of time (e.g. if the ‘idle’ period of time is set + * to 30 seconds, and there is no new data incoming for 30 seconds, the watermark will be set to + * max(currentWatermark, now() - 30 seconds). The default watermark idle duration threshold is + * {@link #DEFAULT_WATERMARK_IDLE_DURATION_THRESHOLD}. + */ + public Read withWatermarkIdleDurationThreshold(Duration idleDurationThreshold) { + configurationBuilder.setWatermarkIdleDurationThreshold(idleDurationThreshold); + return this; + } + /** * Optional, default: false. Set to deduplicate messages based on the {@link * BytesXMLMessage#getApplicationMessageId()} of the incoming {@link BytesXMLMessage}. If the @@ -652,6 +669,8 @@ abstract static class Configuration { abstract TypeDescriptor getTypeDescriptor(); + abstract Duration getWatermarkIdleDurationThreshold(); + public static Builder builder() { Builder builder = new org.apache.beam.sdk.io.solace.AutoValue_SolaceIO_Read_Configuration.Builder(); @@ -680,6 +699,8 @@ abstract Builder setParseFn( abstract Builder setTypeDescriptor(TypeDescriptor typeDescriptor); + abstract Builder setWatermarkIdleDurationThreshold(Duration idleDurationThreshold); + abstract Configuration build(); } } @@ -716,6 +737,7 @@ public PCollection expand(PBegin input) { configuration.getDeduplicateRecords(), coder, configuration.getTimestampFn(), + configuration.getWatermarkIdleDurationThreshold(), configuration.getParseFn()))); } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java index 0155345a2323..c18a9d110b2a 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java @@ -62,7 +62,9 @@ class UnboundedSolaceReader extends UnboundedReader { public UnboundedSolaceReader(UnboundedSolaceSource currentSource) { this.currentSource = currentSource; - this.watermarkPolicy = WatermarkPolicy.create(currentSource.getTimestampFn()); + this.watermarkPolicy = + WatermarkPolicy.create( + currentSource.getTimestampFn(), currentSource.getWatermarkIdleDurationThreshold()); this.sessionService = currentSource.getSessionServiceFactory().create(); this.sempClient = currentSource.getSempClientFactory().create(); } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java index 370159994941..1cb17a49fbdb 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceSource.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.SerializableFunction; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +47,7 @@ public class UnboundedSolaceSource extends UnboundedSource timestampFn; + private final Duration watermarkIdleDurationThreshold; private final SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn; public Queue getQueue() { @@ -64,6 +66,10 @@ public SerializableFunction getTimestampFn() { return timestampFn; } + public Duration getWatermarkIdleDurationThreshold() { + return watermarkIdleDurationThreshold; + } + public SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> getParseFn() { return parseFn; } @@ -76,6 +82,7 @@ public UnboundedSolaceSource( boolean enableDeduplication, Coder coder, SerializableFunction timestampFn, + Duration watermarkIdleDurationThreshold, SerializableFunction<@Nullable BytesXMLMessage, @Nullable T> parseFn) { this.queue = queue; this.sempClientFactory = sempClientFactory; @@ -84,6 +91,7 @@ public UnboundedSolaceSource( this.enableDeduplication = enableDeduplication; this.coder = coder; this.timestampFn = timestampFn; + this.watermarkIdleDurationThreshold = watermarkIdleDurationThreshold; this.parseFn = parseFn; } @@ -125,6 +133,7 @@ private List> getSolaceSources( enableDeduplication, coder, timestampFn, + watermarkIdleDurationThreshold, parseFn); sourceList.add(source); } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkParameters.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkParameters.java index f58cb1cc202d..29b35d883f22 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkParameters.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkParameters.java @@ -21,7 +21,6 @@ import java.io.Serializable; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.joda.time.Duration; import org.joda.time.Instant; @@ -29,9 +28,6 @@ @AutoValue abstract class WatermarkParameters implements Serializable { - private static final Duration STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD = - Duration.standardSeconds(30); - abstract Instant getCurrentWatermark(); abstract Instant getLastSavedWatermark(); @@ -48,8 +44,7 @@ static Builder builder() { return new AutoValue_WatermarkParameters.Builder() .setCurrentWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE) .setLastSavedWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE) - .setLastUpdateTime(Instant.now()) - .setWatermarkIdleDurationThreshold(STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD); + .setLastUpdateTime(Instant.now()); } @AutoValue.Builder @@ -66,23 +61,4 @@ abstract static class Builder { abstract WatermarkParameters build(); } - - /** - * Create an instance of {@link WatermarkParameters} with a {@code SerializableFunction} to - * extract the event time. - */ - static WatermarkParameters create(SerializableFunction timestampFn) { - Preconditions.checkArgument(timestampFn != null, "timestampFn function is null"); - return WatermarkParameters.builder().setTimestampFn(timestampFn).build(); - } - - /** - * Specify the watermark idle duration to consider before advancing the watermark. The default - * watermark idle duration threshold is {@link #STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD}. - */ - WatermarkParameters withWatermarkIdleDurationThreshold(Duration idleDurationThreshold) { - Preconditions.checkArgument( - idleDurationThreshold != null, "watermark idle duration threshold is null"); - return toBuilder().setWatermarkIdleDurationThreshold(idleDurationThreshold).build(); - } } diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkPolicy.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkPolicy.java index 13d65639e335..9d2ed24f3c06 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkPolicy.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/WatermarkPolicy.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; import org.joda.time.Instant; /** @@ -39,8 +40,13 @@ class WatermarkPolicy implements Serializable { private WatermarkParameters watermarkParameters; - static WatermarkPolicy create(SerializableFunction timestampFunction) { - return new WatermarkPolicy(WatermarkParameters.create(timestampFunction)); + static WatermarkPolicy create( + SerializableFunction timestampFunction, Duration watermarkIdleDurationThreshold) { + return new WatermarkPolicy( + WatermarkParameters.builder() + .setTimestampFn(timestampFunction) + .setWatermarkIdleDurationThreshold(watermarkIdleDurationThreshold) + .build()); } private WatermarkPolicy(WatermarkParameters watermarkParameters) { diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java index bd9d5d401b54..cc1fa1d667aa 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/SolaceIOTest.java @@ -95,6 +95,7 @@ private static UnboundedSolaceSource getSource(Read spec, TestPi configuration.getDeduplicateRecords(), spec.inferCoder(pipeline, configuration.getTypeDescriptor()), configuration.getTimestampFn(), + configuration.getWatermarkIdleDurationThreshold(), configuration.getParseFn()); } @@ -527,7 +528,7 @@ public void testCheckpointMarkSafety() throws Exception { @Test public void testDefaultCoder() { Coder coder = - new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null) + new UnboundedSolaceSource<>(null, null, null, 0, false, null, null, null, null) .getCheckpointMarkCoder(); CoderProperties.coderSerializable(coder); }