Skip to content

Commit

Permalink
Solace Read connector: watermark-calculation related classes (#31595)
Browse files Browse the repository at this point in the history
Solace Read connector: watermark-calculation related classes
  • Loading branch information
bzablocki authored Jun 17, 2024
1 parent 3a53181 commit 372ebd8
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.read;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * Value class bundling the state and settings that watermark computation relies on.
 *
 * <p>An instance carries the current watermark, the last saved watermark, the time of the last
 * update, the function that extracts a timestamp from a record, and the idle duration threshold.
 * Modified copies are obtained through {@link #toBuilder()}.
 */
@AutoValue
abstract class WatermarkParameters<T> implements Serializable {

  /** Idle duration threshold applied when the caller does not specify one (30 seconds). */
  private static final Duration STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD =
      Duration.standardSeconds(30);

  /** Returns the current watermark stored in these parameters. */
  abstract Instant getCurrentWatermark();

  /** Returns the last saved watermark stored in these parameters. */
  abstract Instant getLastSavedWatermark();

  /** Returns the instant at which these parameters were last updated. */
  abstract Instant getLastUpdateTime();

  /** Returns the function that maps a record of type {@code T} to its timestamp. */
  abstract SerializableFunction<T, Instant> getTimestampFn();

  /** Returns the idle duration threshold used for watermark computation. */
  abstract Duration getWatermarkIdleDurationThreshold();

  /** Returns a builder pre-populated with the values of this instance. */
  abstract Builder<T> toBuilder();

  /**
   * Returns a builder seeded with defaults: both watermarks at {@link
   * BoundedWindow#TIMESTAMP_MIN_VALUE}, the last update time set to the current time, and the
   * standard idle duration threshold. The timestamp function is left unset.
   */
  static <T> Builder<T> builder() {
    Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
    Instant createdAt = Instant.now();
    return new AutoValue_WatermarkParameters.Builder<T>()
        .setLastUpdateTime(createdAt)
        .setWatermarkIdleDurationThreshold(STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD)
        .setCurrentWatermark(initialWatermark)
        .setLastSavedWatermark(initialWatermark);
  }

  /**
   * Builds a {@link WatermarkParameters} with default settings around the given {@code
   * SerializableFunction}, which is used to extract the event time.
   *
   * @param timestampFn function extracting a timestamp from a record; must not be null
   * @throws IllegalArgumentException if {@code timestampFn} is null
   */
  static <T> WatermarkParameters<T> create(SerializableFunction<T, Instant> timestampFn) {
    Preconditions.checkArgument(timestampFn != null, "timestampFn function is null");
    Builder<T> defaults = WatermarkParameters.<T>builder();
    return defaults.setTimestampFn(timestampFn).build();
  }

  /**
   * Returns a copy of these parameters that uses the given watermark idle duration, i.e. the
   * duration to consider before advancing the watermark. When not overridden, the threshold is
   * {@link #STANDARD_WATERMARK_IDLE_DURATION_THRESHOLD}.
   *
   * @param idleDurationThreshold the new idle duration threshold; must not be null
   * @throws IllegalArgumentException if {@code idleDurationThreshold} is null
   */
  WatermarkParameters<T> withWatermarkIdleDurationThreshold(Duration idleDurationThreshold) {
    Preconditions.checkArgument(
        idleDurationThreshold != null, "watermark idle duration threshold is null");
    Builder<T> copy = toBuilder();
    return copy.setWatermarkIdleDurationThreshold(idleDurationThreshold).build();
  }

  /** AutoValue builder for {@link WatermarkParameters}. */
  @AutoValue.Builder
  abstract static class Builder<T> {
    abstract Builder<T> setCurrentWatermark(Instant currentWatermark);

    abstract Builder<T> setLastSavedWatermark(Instant eventTime);

    abstract Builder<T> setLastUpdateTime(Instant now);

    abstract Builder<T> setWatermarkIdleDurationThreshold(Duration watermarkIdleDurationThreshold);

    abstract Builder<T> setTimestampFn(SerializableFunction<T, Instant> timestampFn);

    abstract WatermarkParameters<T> build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.solace.read;

import java.io.Serializable;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Ordering;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

/**
* A class that manages the watermark for a Solace source.
*
* <p>The watermark is calculated based on the last saved watermark, the last update time, and the
* watermark idle duration threshold. If the last update time is before the watermark idle duration
* threshold, the watermark is set to the watermark idle duration threshold. Otherwise, the
* watermark is set to the last saved watermark.
*
* <p>The watermark is updated when a new record is received. The last saved watermark is set to the
* maximum of the current last saved watermark and the value resulting from the function calculating
* the timestamp from the record. The last update time is set to the current time when the watermark
* is updated.
*/
class WatermarkPolicy<T> implements Serializable {
private WatermarkParameters<T> watermarkParameters;

static <T> WatermarkPolicy<T> create(SerializableFunction<T, Instant> timestampFunction) {
return new WatermarkPolicy<T>(WatermarkParameters.<T>create(timestampFunction));
}

private WatermarkPolicy(WatermarkParameters<T> watermarkParameters) {
this.watermarkParameters = watermarkParameters;
}
/**
* Returns the current watermark.
*
* <p>The watermark is calculated based on the last saved watermark, the last update time, and the
* watermark idle duration threshold. If the last update time is before the watermark idle
* duration threshold, the watermark is set to the watermark idle duration threshold. Otherwise,
* the watermark is set to the last saved watermark.
*
* @return the current watermark
*/
Instant getWatermark() {
Instant now = Instant.now();
Instant watermarkIdleThreshold =
now.minus(watermarkParameters.getWatermarkIdleDurationThreshold());

Instant newWatermark =
watermarkParameters.getLastUpdateTime().isBefore(watermarkIdleThreshold)
? watermarkIdleThreshold
: watermarkParameters.getLastSavedWatermark();

if (newWatermark.isAfter(watermarkParameters.getCurrentWatermark())) {
watermarkParameters =
watermarkParameters.toBuilder().setCurrentWatermark(newWatermark).build();
}
return watermarkParameters.getCurrentWatermark();
}

/**
* Updates the watermark based on the provided record.
*
* <p>This method updates the last saved watermark and the last update time based on the timestamp
* function for the provided record. The last saved watermark is set to the maximum of the current
* last saved watermark and the timestamp of the record. The last update time is set to the
* current time.
*
* @param record The record to update the watermark with.
*/
void update(@Nullable T record) {
if (record == null) {
return;
}
watermarkParameters =
watermarkParameters
.toBuilder()
.setLastSavedWatermark(
Ordering.natural()
.max(
watermarkParameters.getLastSavedWatermark(),
watermarkParameters.getTimestampFn().apply(record)))
.setLastUpdateTime(Instant.now())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Solace IO connector - read connector classes. */
package org.apache.beam.sdk.io.solace.read;

0 comments on commit 372ebd8

Please sign in to comment.