diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 938b6e1fd63d..436f0bda8537 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.10' + project.version = '200.39.1.3' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index ccde0df35d21..a95f3ca3ec88 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.10 -sdk_version=2.45.10 +version=200.39.1.3 +sdk_version=200.39.1.3 javaVersion=1.8 diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java index 873c5f3005f6..36ba19d7da3c 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java @@ -17,26 +17,7 @@ */ package org.apache.beam.runners.samza.runtime; -import java.util.Collection; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import javax.annotation.Nullable; -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; -import org.apache.samza.operators.Scheduler; -import org.joda.time.Duration; import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a @@ -51,283 +32,43 @@ * processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when * all the future emitted by the processElement is resolved. * - *

This class is not thread safe and the current implementation relies on the assumption that - * messages are dispatched to BundleManager in a single threaded mode. - * * @param output type of the {@link DoFnOp} */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -public class BundleManager { - private static final Logger LOG = LoggerFactory.getLogger(BundleManager.class); - private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L; - - private final long maxBundleSize; - private final long maxBundleTimeMs; - private final BundleProgressListener bundleProgressListener; - private final FutureCollector futureCollector; - private final Scheduler> bundleTimerScheduler; - private final String bundleCheckTimerId; - - // Number elements belonging to the current active bundle - private transient AtomicLong currentBundleElementCount; - // Number of bundles that are in progress but not yet finished - private transient AtomicLong pendingBundleCount; - // Denotes the start time of the current active bundle - private transient AtomicLong bundleStartTime; - // Denotes if there is an active in progress bundle. Note at a given time, we can have multiple - // bundle in progress. - // This flag denotes if there is a bundle that is current and hasn't been closed. - private transient AtomicBoolean isBundleStarted; - // Holder for watermark which gets propagated when the bundle is finished. - private transient Instant bundleWatermarkHold; - // A future that is completed once all futures belonging to the current active bundle are - // completed. The value is null if there are no futures in the current active bundle. - private transient AtomicReference> currentActiveBundleDoneFutureReference; - private transient CompletionStage watermarkFuture; - - public BundleManager( - BundleProgressListener bundleProgressListener, - FutureCollector futureCollector, - long maxBundleSize, - long maxBundleTimeMs, - Scheduler> bundleTimerScheduler, - String bundleCheckTimerId) { - this.maxBundleSize = maxBundleSize; - this.maxBundleTimeMs = maxBundleTimeMs; - this.bundleProgressListener = bundleProgressListener; - this.bundleTimerScheduler = bundleTimerScheduler; - this.bundleCheckTimerId = bundleCheckTimerId; - this.futureCollector = futureCollector; - - if (maxBundleSize > 1) { - scheduleNextBundleCheck(); - } - - // instance variable initialization for bundle tracking - this.bundleStartTime = new AtomicLong(Long.MAX_VALUE); - this.currentActiveBundleDoneFutureReference = new AtomicReference<>(); - this.currentBundleElementCount = new AtomicLong(0L); - this.isBundleStarted = new AtomicBoolean(false); - this.pendingBundleCount = new AtomicLong(0L); - this.watermarkFuture = CompletableFuture.completedFuture(null); - } +public interface BundleManager { + /** Starts a new bundle if not already started, then adds an element to the existing bundle. */ + void tryStartBundle(); - /* - * Schedule in processing time to check whether the current bundle should be closed. Note that - * we only approximately achieve max bundle time by checking as frequent as half of the max bundle - * time set by users. This would violate the max bundle time by up to half of it but should - * acceptable in most cases (and cheaper than scheduling a timer at the beginning of every bundle). + /** + * Signals a watermark event arrived. The BundleManager will decide if the watermark needs to be + * processed, and notify the listener if needed. + * + * @param watermark + * @param emitter */ - private void scheduleNextBundleCheck() { - final Instant nextBundleCheckTime = - Instant.now().plus(Duration.millis(maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS)); - final TimerInternals.TimerData timerData = - TimerInternals.TimerData.of( - this.bundleCheckTimerId, - StateNamespaces.global(), - nextBundleCheckTime, - nextBundleCheckTime, - TimeDomain.PROCESSING_TIME); - bundleTimerScheduler.schedule( - new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis()); - } - - void tryStartBundle() { - futureCollector.prepare(); - - if (isBundleStarted.compareAndSet(false, true)) { - LOG.debug("Starting a new bundle."); - // make sure the previous bundle is sealed and futures are cleared - Preconditions.checkArgument( - currentActiveBundleDoneFutureReference.get() == null, - "Current active bundle done future should be null before starting a new bundle."); - bundleStartTime.set(System.currentTimeMillis()); - pendingBundleCount.incrementAndGet(); - bundleProgressListener.onBundleStarted(); - } - - currentBundleElementCount.incrementAndGet(); - } - - void processWatermark(Instant watermark, OpEmitter emitter) { - // propagate watermark immediately if no bundle is in progress and all the previous bundles have - // completed. - if (!isBundleStarted() && pendingBundleCount.get() == 0) { - LOG.debug("Propagating watermark: {} directly since no bundle in progress.", watermark); - bundleProgressListener.onWatermark(watermark, emitter); - return; - } - - // hold back the watermark since there is either a bundle in progress or previously closed - // bundles are unfinished. - this.bundleWatermarkHold = watermark; - - // for batch mode, the max watermark should force the bundle to close - if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) { - /* - * Due to lack of async watermark function, we block on the previous watermark futures before propagating the watermark - * downstream. If a bundle is in progress tryFinishBundle() fill force the bundle to close and emit watermark. - * If no bundle in progress, we progress watermark explicitly after the completion of previous watermark futures. - */ - if (isBundleStarted()) { - LOG.info( - "Received max watermark. Triggering finish bundle before flushing the watermark downstream."); - tryFinishBundle(emitter); - watermarkFuture.toCompletableFuture().join(); - } else { - LOG.info( - "Received max watermark. Waiting for previous bundles to complete before flushing the watermark downstream."); - watermarkFuture.toCompletableFuture().join(); - bundleProgressListener.onWatermark(watermark, emitter); - } - } - } - - void processTimer(KeyedTimerData keyedTimerData, OpEmitter emitter) { - // this is internal timer in processing time to check whether a bundle should be closed - if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) { - tryFinishBundle(emitter); - scheduleNextBundleCheck(); - } - } + void processWatermark(Instant watermark, OpEmitter emitter); /** - * Signal the bundle manager to handle failure. We discard the output collected as part of - * processing the current element and reset the bundle count. + * Signals the BundleManager that a timer is up. * - * @param t failure cause + * @param keyedTimerData + * @param emitter */ - void signalFailure(Throwable t) { - LOG.error("Encountered error during processing the message. Discarding the output due to: ", t); - futureCollector.discard(); - // reset the bundle start flag only if the bundle has started - isBundleStarted.compareAndSet(true, false); - - // bundle start may not necessarily mean we have actually started the bundle since some of the - // invariant check conditions within bundle start could throw exceptions. so rely on bundle - // start time - if (bundleStartTime.get() != Long.MAX_VALUE) { - currentBundleElementCount.set(0L); - bundleStartTime.set(Long.MAX_VALUE); - pendingBundleCount.decrementAndGet(); - currentActiveBundleDoneFutureReference.set(null); - } - } - - void tryFinishBundle(OpEmitter emitter) { - - // we need to seal the output for each element within a bundle irrespective of the whether we - // decide to finish the - // bundle or not - CompletionStage>> outputFuture = futureCollector.finish(); - - if (shouldFinishBundle() && isBundleStarted.compareAndSet(true, false)) { - LOG.debug("Finishing the current bundle."); + void processTimer(KeyedTimerData keyedTimerData, OpEmitter emitter); - // reset the bundle count - // seal the bundle and emit the result future (collection of results) - // chain the finish bundle invocation on the finish bundle - currentBundleElementCount.set(0L); - bundleStartTime.set(Long.MAX_VALUE); - Instant watermarkHold = bundleWatermarkHold; - bundleWatermarkHold = null; - - CompletionStage currentActiveBundleDoneFuture = - currentActiveBundleDoneFutureReference.get(); - outputFuture = - outputFuture.thenCombine( - currentActiveBundleDoneFuture != null - ? currentActiveBundleDoneFuture - : CompletableFuture.completedFuture(null), - (res, ignored) -> { - bundleProgressListener.onBundleFinished(emitter); - return res; - }); - - BiConsumer>, Void> watermarkPropagationFn; - if (watermarkHold == null) { - watermarkPropagationFn = (ignored, res) -> pendingBundleCount.decrementAndGet(); - } else { - watermarkPropagationFn = - (ignored, res) -> { - LOG.debug("Propagating watermark: {} to downstream.", watermarkHold); - bundleProgressListener.onWatermark(watermarkHold, emitter); - pendingBundleCount.decrementAndGet(); - }; - } - - // We chain the current watermark emission with previous watermark and the output futures - // since bundles can finish out of order but we still want the watermark to be emitted in - // order. - watermarkFuture = outputFuture.thenAcceptBoth(watermarkFuture, watermarkPropagationFn); - currentActiveBundleDoneFutureReference.set(null); - } else if (isBundleStarted.get()) { - final CompletableFuture>> finalOutputFuture = - outputFuture.toCompletableFuture(); - currentActiveBundleDoneFutureReference.updateAndGet( - maybePrevFuture -> { - CompletableFuture prevFuture = - maybePrevFuture != null ? maybePrevFuture : CompletableFuture.completedFuture(null); - - return CompletableFuture.allOf(prevFuture, finalOutputFuture); - }); - } - - // emit the future to the propagate it to rest of the DAG - emitter.emitFuture(outputFuture); - } - - @VisibleForTesting - long getCurrentBundleElementCount() { - return currentBundleElementCount.longValue(); - } - - @VisibleForTesting - @Nullable - CompletionStage getCurrentBundleDoneFuture() { - return currentActiveBundleDoneFutureReference.get(); - } - - @VisibleForTesting - void setCurrentBundleDoneFuture(CompletableFuture currentBundleResultFuture) { - this.currentActiveBundleDoneFutureReference.set(currentBundleResultFuture); - } - - @VisibleForTesting - long getPendingBundleCount() { - return pendingBundleCount.longValue(); - } - - @VisibleForTesting - void setPendingBundleCount(long value) { - pendingBundleCount.set(value); - } - - @VisibleForTesting - boolean isBundleStarted() { - return isBundleStarted.get(); - } - - @VisibleForTesting - void setBundleWatermarkHold(Instant watermark) { - this.bundleWatermarkHold = watermark; - } + /** + * Fails the current bundle, throws away the pending output, and resets the bundle to an empty + * state. + * + * @param t the throwable that caused the failure. + */ + void signalFailure(Throwable t); /** - * We close the current bundle in progress if one of the following criteria is met 1. The bundle - * count ≥ maxBundleSize 2. Time elapsed since the bundle started is ≥ maxBundleTimeMs 3. - * Watermark hold equals to TIMESTAMP_MAX_VALUE which usually is the case for bounded jobs + * Tries to close the bundle, and reset the bundle to an empty state. * - * @return true - if one of the criteria above is satisfied; false - otherwise + * @param emitter */ - private boolean shouldFinishBundle() { - return isBundleStarted.get() - && (currentBundleElementCount.get() >= maxBundleSize - || System.currentTimeMillis() - bundleStartTime.get() >= maxBundleTimeMs - || BoundedWindow.TIMESTAMP_MAX_VALUE.equals(bundleWatermarkHold)); - } + void tryFinishBundle(OpEmitter emitter); /** * A listener used to track the lifecycle of a bundle. Typically, the lifecycle of a bundle @@ -339,7 +80,7 @@ private boolean shouldFinishBundle() { * * @param */ - public interface BundleProgressListener { + interface BundleProgressListener { void onBundleStarted(); void onBundleFinished(OpEmitter emitter); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java new file mode 100644 index 000000000000..47899dd43c02 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.runtime; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.samza.operators.Scheduler; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @inheritDoc Implementation of BundleManager for non-portable mode. Keeps track of the async + * function completions. + *

This class is not thread safe and the current implementation relies on the assumption that + * messages are dispatched to BundleManager in a single threaded mode. + */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class ClassicBundleManager implements BundleManager { + private static final Logger LOG = LoggerFactory.getLogger(ClassicBundleManager.class); + private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L; + + private final long maxBundleSize; + private final long maxBundleTimeMs; + private final BundleProgressListener bundleProgressListener; + private final FutureCollector futureCollector; + private final Scheduler> bundleTimerScheduler; + private final String bundleCheckTimerId; + + // Number elements belonging to the current active bundle + private transient AtomicLong currentBundleElementCount; + // Number of bundles that are in progress but not yet finished + private transient AtomicLong pendingBundleCount; + // Denotes the start time of the current active bundle + private transient AtomicLong bundleStartTime; + // Denotes if there is an active in progress bundle. Note at a given time, we can have multiple + // bundle in progress. + // This flag denotes if there is a bundle that is current and hasn't been closed. + private transient AtomicBoolean isBundleStarted; + // Holder for watermark which gets propagated when the bundle is finished. + private transient Instant bundleWatermarkHold; + // A future that is completed once all futures belonging to the current active bundle are + // completed. The value is null if there are no futures in the current active bundle. + private transient AtomicReference> currentActiveBundleDoneFutureReference; + private transient CompletionStage watermarkFuture; + + public ClassicBundleManager( + BundleProgressListener bundleProgressListener, + FutureCollector futureCollector, + long maxBundleSize, + long maxBundleTimeMs, + Scheduler> bundleTimerScheduler, + String bundleCheckTimerId) { + this.maxBundleSize = maxBundleSize; + this.maxBundleTimeMs = maxBundleTimeMs; + this.bundleProgressListener = bundleProgressListener; + this.bundleTimerScheduler = bundleTimerScheduler; + this.bundleCheckTimerId = bundleCheckTimerId; + this.futureCollector = futureCollector; + + if (maxBundleSize > 1) { + scheduleNextBundleCheck(); + } + + // instance variable initialization for bundle tracking + this.bundleStartTime = new AtomicLong(Long.MAX_VALUE); + this.currentActiveBundleDoneFutureReference = new AtomicReference<>(); + this.currentBundleElementCount = new AtomicLong(0L); + this.isBundleStarted = new AtomicBoolean(false); + this.pendingBundleCount = new AtomicLong(0L); + this.watermarkFuture = CompletableFuture.completedFuture(null); + } + + /* + * Schedule in processing time to check whether the current bundle should be closed. Note that + * we only approximately achieve max bundle time by checking as frequent as half of the max bundle + * time set by users. This would violate the max bundle time by up to half of it but should + * acceptable in most cases (and cheaper than scheduling a timer at the beginning of every bundle). + */ + private void scheduleNextBundleCheck() { + final Instant nextBundleCheckTime = + Instant.now().plus(Duration.millis(maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS)); + final TimerInternals.TimerData timerData = + TimerInternals.TimerData.of( + this.bundleCheckTimerId, + StateNamespaces.global(), + nextBundleCheckTime, + nextBundleCheckTime, + TimeDomain.PROCESSING_TIME); + bundleTimerScheduler.schedule( + new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis()); + } + + @Override + public void tryStartBundle() { + futureCollector.prepare(); + + if (isBundleStarted.compareAndSet(false, true)) { + LOG.debug("Starting a new bundle."); + // make sure the previous bundle is sealed and futures are cleared + Preconditions.checkArgument( + currentActiveBundleDoneFutureReference.get() == null, + "Current active bundle done future should be null before starting a new bundle."); + bundleStartTime.set(System.currentTimeMillis()); + pendingBundleCount.incrementAndGet(); + bundleProgressListener.onBundleStarted(); + } + + currentBundleElementCount.incrementAndGet(); + } + + @Override + public void processWatermark(Instant watermark, OpEmitter emitter) { + // propagate watermark immediately if no bundle is in progress and all the previous bundles have + // completed. + if (!isBundleStarted() && pendingBundleCount.get() == 0) { + LOG.debug("Propagating watermark: {} directly since no bundle in progress.", watermark); + bundleProgressListener.onWatermark(watermark, emitter); + return; + } + + // hold back the watermark since there is either a bundle in progress or previously closed + // bundles are unfinished. + this.bundleWatermarkHold = watermark; + + // for batch mode, the max watermark should force the bundle to close + if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) { + /* + * Due to lack of async watermark function, we block on the previous watermark futures before propagating the watermark + * downstream. If a bundle is in progress tryFinishBundle() fill force the bundle to close and emit watermark. + * If no bundle in progress, we progress watermark explicitly after the completion of previous watermark futures. + */ + if (isBundleStarted()) { + LOG.info( + "Received max watermark. Triggering finish bundle before flushing the watermark downstream."); + tryFinishBundle(emitter); + watermarkFuture.toCompletableFuture().join(); + } else { + LOG.info( + "Received max watermark. Waiting for previous bundles to complete before flushing the watermark downstream."); + watermarkFuture.toCompletableFuture().join(); + bundleProgressListener.onWatermark(watermark, emitter); + } + } + } + + @Override + public void processTimer(KeyedTimerData keyedTimerData, OpEmitter emitter) { + // this is internal timer in processing time to check whether a bundle should be closed + if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) { + tryFinishBundle(emitter); + scheduleNextBundleCheck(); + } + } + + /** + * Signal the bundle manager to handle failure. We discard the output collected as part of + * processing the current element and reset the bundle count. + * + * @param t failure cause + */ + @Override + public void signalFailure(Throwable t) { + LOG.error("Encountered error during processing the message. Discarding the output due to: ", t); + futureCollector.discard(); + // reset the bundle start flag only if the bundle has started + isBundleStarted.compareAndSet(true, false); + + // bundle start may not necessarily mean we have actually started the bundle since some of the + // invariant check conditions within bundle start could throw exceptions. so rely on bundle + // start time + if (bundleStartTime.get() != Long.MAX_VALUE) { + currentBundleElementCount.set(0L); + bundleStartTime.set(Long.MAX_VALUE); + pendingBundleCount.decrementAndGet(); + currentActiveBundleDoneFutureReference.set(null); + } + } + + @Override + public void tryFinishBundle(OpEmitter emitter) { + + // we need to seal the output for each element within a bundle irrespective of the whether we + // decide to finish the + // bundle or not + CompletionStage>> outputFuture = futureCollector.finish(); + + if (shouldFinishBundle() && isBundleStarted.compareAndSet(true, false)) { + LOG.debug("Finishing the current bundle."); + + // reset the bundle count + // seal the bundle and emit the result future (collection of results) + // chain the finish bundle invocation on the finish bundle + currentBundleElementCount.set(0L); + bundleStartTime.set(Long.MAX_VALUE); + Instant watermarkHold = bundleWatermarkHold; + bundleWatermarkHold = null; + + CompletionStage currentActiveBundleDoneFuture = + currentActiveBundleDoneFutureReference.get(); + outputFuture = + outputFuture.thenCombine( + currentActiveBundleDoneFuture != null + ? currentActiveBundleDoneFuture + : CompletableFuture.completedFuture(null), + (res, ignored) -> { + bundleProgressListener.onBundleFinished(emitter); + return res; + }); + + BiConsumer>, Void> watermarkPropagationFn; + if (watermarkHold == null) { + watermarkPropagationFn = (ignored, res) -> pendingBundleCount.decrementAndGet(); + } else { + watermarkPropagationFn = + (ignored, res) -> { + LOG.debug("Propagating watermark: {} to downstream.", watermarkHold); + bundleProgressListener.onWatermark(watermarkHold, emitter); + pendingBundleCount.decrementAndGet(); + }; + } + + // We chain the current watermark emission with previous watermark and the output futures + // since bundles can finish out of order but we still want the watermark to be emitted in + // order. + watermarkFuture = outputFuture.thenAcceptBoth(watermarkFuture, watermarkPropagationFn); + currentActiveBundleDoneFutureReference.set(null); + } else if (isBundleStarted.get()) { + final CompletableFuture>> finalOutputFuture = + outputFuture.toCompletableFuture(); + currentActiveBundleDoneFutureReference.updateAndGet( + maybePrevFuture -> { + CompletableFuture prevFuture = + maybePrevFuture != null ? maybePrevFuture : CompletableFuture.completedFuture(null); + + return CompletableFuture.allOf(prevFuture, finalOutputFuture); + }); + } + + // emit the future to the propagate it to rest of the DAG + emitter.emitFuture(outputFuture); + } + + @VisibleForTesting + long getCurrentBundleElementCount() { + return currentBundleElementCount.longValue(); + } + + @VisibleForTesting + @Nullable + CompletionStage getCurrentBundleDoneFuture() { + return currentActiveBundleDoneFutureReference.get(); + } + + @VisibleForTesting + void setCurrentBundleDoneFuture(CompletableFuture currentBundleResultFuture) { + this.currentActiveBundleDoneFutureReference.set(currentBundleResultFuture); + } + + @VisibleForTesting + long getPendingBundleCount() { + return pendingBundleCount.longValue(); + } + + @VisibleForTesting + void setPendingBundleCount(long value) { + pendingBundleCount.set(value); + } + + @VisibleForTesting + boolean isBundleStarted() { + return isBundleStarted.get(); + } + + @VisibleForTesting + void setBundleWatermarkHold(Instant watermark) { + this.bundleWatermarkHold = watermark; + } + + /** + * We close the current bundle in progress if one of the following criteria is met 1. The bundle + * count ≥ maxBundleSize 2. Time elapsed since the bundle started is ≥ maxBundleTimeMs 3. + * Watermark hold equals to TIMESTAMP_MAX_VALUE which usually is the case for bounded jobs + * + * @return true - if one of the criteria above is satisfied; false - otherwise + */ + private boolean shouldFinishBundle() { + return isBundleStarted.get() + && (currentBundleElementCount.get() >= maxBundleSize + || System.currentTimeMillis() - bundleStartTime.get() >= maxBundleTimeMs + || BoundedWindow.TIMESTAMP_MAX_VALUE.equals(bundleWatermarkHold)); + } +} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index 35661ae86fe1..c5fda797bddd 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -26,9 +26,7 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.DoFnRunner; @@ -46,7 +44,6 @@ import org.apache.beam.runners.samza.SamzaExecutionContext; import org.apache.beam.runners.samza.SamzaPipelineOptions; import org.apache.beam.runners.samza.util.DoFnUtils; -import org.apache.beam.runners.samza.util.FutureUtils; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; @@ -200,13 +197,20 @@ public void open( final FutureCollector outputFutureCollector = createFutureCollector(); this.bundleManager = - new BundleManager<>( - createBundleProgressListener(), - outputFutureCollector, - samzaPipelineOptions.getMaxBundleSize(), - samzaPipelineOptions.getMaxBundleTimeMs(), - timerRegistry, - bundleCheckTimerId); + isPortable + ? new PortableBundleManager<>( + createBundleProgressListener(), + samzaPipelineOptions.getMaxBundleSize(), + samzaPipelineOptions.getMaxBundleTimeMs(), + timerRegistry, + bundleCheckTimerId) + : new ClassicBundleManager<>( + createBundleProgressListener(), + outputFutureCollector, + samzaPipelineOptions.getMaxBundleSize(), + samzaPipelineOptions.getMaxBundleTimeMs(), + timerRegistry, + bundleCheckTimerId); this.timerInternalsFactory = SamzaTimerInternalsFactory.createTimerInternalFactory( @@ -484,77 +488,6 @@ static CompletionStage> createOutputFuture( windowedValue.getPane())); } - static class FutureCollectorImpl implements FutureCollector { - private final AtomicBoolean collectorSealed; - private CompletionStage>> outputFuture; - - FutureCollectorImpl() { - outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); - collectorSealed = new AtomicBoolean(true); - } - - @Override - public void add(CompletionStage> element) { - checkState( - !collectorSealed.get(), - "Cannot add element to an unprepared collector. Make sure prepare() is invoked before adding elements."); - - // We need synchronize guard against scenarios when watermark/finish bundle trigger outputs. - synchronized (this) { - outputFuture = - outputFuture.thenCombine( - element, - (collection, event) -> { - collection.add(event); - return collection; - }); - } - } - - @Override - public void addAll(CompletionStage>> elements) { - checkState( - !collectorSealed.get(), - "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements."); - - synchronized (this) { - outputFuture = FutureUtils.combineFutures(outputFuture, elements); - } - } - - @Override - public void discard() { - collectorSealed.compareAndSet(false, true); - - synchronized (this) { - outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); - } - } - - @Override - public CompletionStage>> finish() { - /* - * We can ignore the results here because its okay to call finish without invoking prepare. It will be a no-op - * and an empty collection will be returned. - */ - collectorSealed.compareAndSet(false, true); - - synchronized (this) { - final CompletionStage>> sealedOutputFuture = outputFuture; - outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); - return sealedOutputFuture; - } - } - - @Override - public void prepare() { - boolean isCollectorSealed = collectorSealed.compareAndSet(true, false); - checkState( - isCollectorSealed, - "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked."); - } - } - /** * Factory class to create an {@link org.apache.beam.runners.core.DoFnRunners.OutputManager} that * emits values to the main output only, which is a single {@link diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollectorImpl.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollectorImpl.java new file mode 100644 index 000000000000..10f915a8baf9 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/FutureCollectorImpl.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.runtime; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.runners.samza.util.FutureUtils; +import org.apache.beam.sdk.util.WindowedValue; + +class FutureCollectorImpl implements FutureCollector { + private final AtomicBoolean collectorSealed; + private CompletionStage>> outputFuture; + + FutureCollectorImpl() { + outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); + collectorSealed = new AtomicBoolean(true); + } + + @Override + public void add(CompletionStage> element) { + checkState( + !collectorSealed.get(), + "Cannot add element to an unprepared collector. Make sure prepare() is invoked before adding elements."); + + // We need synchronize guard against scenarios when watermark/finish bundle trigger outputs. + synchronized (this) { + outputFuture = + outputFuture.thenCombine( + element, + (collection, event) -> { + collection.add(event); + return collection; + }); + } + } + + @Override + public void addAll(CompletionStage>> elements) { + checkState( + !collectorSealed.get(), + "Cannot add elements to an unprepared collector. Make sure prepare() is invoked before adding elements."); + + synchronized (this) { + outputFuture = FutureUtils.combineFutures(outputFuture, elements); + } + } + + @Override + public void discard() { + collectorSealed.compareAndSet(false, true); + + synchronized (this) { + outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); + } + } + + @Override + public CompletionStage>> finish() { + /* + * We can ignore the results here because its okay to call finish without invoking prepare. It will be a no-op + * and an empty collection will be returned. + */ + collectorSealed.compareAndSet(false, true); + + synchronized (this) { + final CompletionStage>> sealedOutputFuture = outputFuture; + outputFuture = CompletableFuture.completedFuture(new ArrayList<>()); + return sealedOutputFuture; + } + } + + @Override + public void prepare() { + boolean isCollectorSealed = collectorSealed.compareAndSet(true, false); + checkState( + isCollectorSealed, + "Failed to prepare the collector. Collector needs to be sealed before prepare() is invoked."); + } +} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java new file mode 100644 index 000000000000..92d6c35cc104 --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.runtime; + +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.samza.operators.Scheduler; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a + * proxy for the {@link DoFnOp} to process watermark and decides to 1. Hold watermark if there is at + * least one bundle in progress. 2. Propagates the watermark to downstream DAG, if all the previous + * bundles have completed. + * + *

This class is not thread safe and the current implementation relies on the assumption that + * messages are dispatched to BundleManager in a single threaded mode. + * + * @param output type of the {@link DoFnOp} + */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class PortableBundleManager implements BundleManager { + private static final Logger LOG = LoggerFactory.getLogger(PortableBundleManager.class); + private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L; + + private final long maxBundleSize; + private final long maxBundleTimeMs; + private final BundleProgressListener bundleProgressListener; + private final Scheduler> bundleTimerScheduler; + private final String bundleCheckTimerId; + + // Number elements belonging to the current active bundle + private long currentBundleElementCount; + // Number of bundles that are in progress but not yet finished + private long pendingBundleCount; + // Denotes the start time of the current active bundle + private long bundleStartTime; + // Denotes if there is an active in progress bundle. Note at a given time, we can have multiple + // bundle in progress. + // This flag denotes if there is a bundle that is current and hasn't been closed. + private boolean isBundleStarted; + // Holder for watermark which gets propagated when the bundle is finished. + private Instant bundleWatermarkHold; + + public PortableBundleManager( + BundleProgressListener bundleProgressListener, + long maxBundleSize, + long maxBundleTimeMs, + Scheduler> bundleTimerScheduler, + String bundleCheckTimerId) { + this.maxBundleSize = maxBundleSize; + this.maxBundleTimeMs = maxBundleTimeMs; + this.bundleProgressListener = bundleProgressListener; + this.bundleTimerScheduler = bundleTimerScheduler; + this.bundleCheckTimerId = bundleCheckTimerId; + + if (maxBundleSize > 1) { + scheduleNextBundleCheck(); + } + + // instance variable initialization for bundle tracking + this.bundleStartTime = Long.MAX_VALUE; + this.currentBundleElementCount = 0; + this.isBundleStarted = false; + this.pendingBundleCount = 0; + } + + /* + * Schedule in processing time to check whether the current bundle should be closed. Note that + * we only approximately achieve max bundle time by checking as frequent as half of the max bundle + * time set by users. This would violate the max bundle time by up to half of it but should + * acceptable in most cases (and cheaper than scheduling a timer at the beginning of every bundle). + */ + private void scheduleNextBundleCheck() { + final Instant nextBundleCheckTime = + Instant.now().plus(Duration.millis(maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS)); + final TimerInternals.TimerData timerData = + TimerInternals.TimerData.of( + this.bundleCheckTimerId, + StateNamespaces.global(), + nextBundleCheckTime, + nextBundleCheckTime, + TimeDomain.PROCESSING_TIME); + bundleTimerScheduler.schedule( + new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis()); + } + + @Override + public void tryStartBundle() { + + currentBundleElementCount++; + + if (!isBundleStarted) { + LOG.debug("Starting a new bundle."); + isBundleStarted = true; + bundleStartTime = System.currentTimeMillis(); + pendingBundleCount++; + bundleProgressListener.onBundleStarted(); + } + } + + @Override + public void processWatermark(Instant watermark, OpEmitter emitter) { + // propagate watermark immediately if no bundle is in progress and all the previous bundles have + // completed. + if (shouldProcessWatermark()) { + LOG.debug("Propagating watermark: {} directly since no bundle in progress.", watermark); + bundleProgressListener.onWatermark(watermark, emitter); + return; + } + + // hold back the watermark since there is either a bundle in progress or previously closed + // bundles are unfinished. + this.bundleWatermarkHold = watermark; + } + + @Override + public void processTimer(KeyedTimerData keyedTimerData, OpEmitter emitter) { + // this is internal timer in processing time to check whether a bundle should be closed + if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) { + tryFinishBundle(emitter); + scheduleNextBundleCheck(); + } + } + + /** + * Signal the bundle manager to handle failure. We discard the output collected as part of + * processing the current element and reset the bundle count. + * + * @param t failure cause + */ + @Override + public void signalFailure(Throwable t) { + LOG.error("Encountered error during processing the message. Discarding the output due to: ", t); + + isBundleStarted = false; + currentBundleElementCount = 0; + bundleStartTime = Long.MAX_VALUE; + pendingBundleCount--; + } + + @Override + public void tryFinishBundle(OpEmitter emitter) { + if (shouldFinishBundle()) { + LOG.debug("Finishing the current bundle."); + isBundleStarted = false; + currentBundleElementCount = 0; + bundleStartTime = Long.MAX_VALUE; + + Instant watermarkHold = bundleWatermarkHold; + bundleWatermarkHold = null; + + pendingBundleCount--; + + bundleProgressListener.onBundleFinished(emitter); + if (watermarkHold != null) { + bundleProgressListener.onWatermark(watermarkHold, emitter); + } + } + } + + private boolean shouldProcessWatermark() { + return !isBundleStarted && pendingBundleCount == 0; + } + + /** + * We close the current bundle in progress if one of the following criteria is met 1. The bundle + * count ≥ maxBundleSize 2. Time elapsed since the bundle started is ≥ maxBundleTimeMs 3. + * Watermark hold equals to TIMESTAMP_MAX_VALUE which usually is the case for bounded jobs + * + * @return true - if one of the criteria above is satisfied; false - otherwise + */ + private boolean shouldFinishBundle() { + return isBundleStarted + && (currentBundleElementCount >= maxBundleSize + || System.currentTimeMillis() - bundleStartTime >= maxBundleTimeMs + || BoundedWindow.TIMESTAMP_MAX_VALUE.equals(bundleWatermarkHold)); + } +} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java index 1d02102a1920..509b56f1e6a8 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java @@ -101,7 +101,12 @@ public Config build() { config.put(JOB_ID, options.getJobInstance()); // bundle-related configs - config.putAll(createBundleConfig(options, config)); + if (!testIsPortable(options)) { + config.putAll(createBundleConfig(options, config)); + LOG.info("Set bundle-related configs for classic mode"); + } else { + LOG.info("Skipped bundle-related configs for portable mode"); + } // remove config overrides before serialization (LISAMZA-15259) options.setConfigOverride(new HashMap<>()); @@ -117,6 +122,15 @@ public Config build() { } } + static boolean testIsPortable(SamzaPipelineOptions options) { + Map override = options.getConfigOverride(); + if (override == null) { + return false; + } + + return Boolean.parseBoolean(override.getOrDefault("beam.portable.mode", "false")); + } + @VisibleForTesting static Map createBundleConfig( SamzaPipelineOptions options, Map config) { diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java index 6add2f079862..983275c7b472 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunnerTest.java @@ -205,7 +205,7 @@ public void testKeyedOutputFutures() { options.setNumThreadsForProcessElement(4); final OpEmitter opEmitter = new OpAdapter.OpEmitterImpl<>(); - final FutureCollector futureCollector = new DoFnOp.FutureCollectorImpl<>(); + final FutureCollector futureCollector = new FutureCollectorImpl<>(); futureCollector.prepare(); final AsyncDoFnRunner, Void> asyncDoFnRunner = diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java similarity index 93% rename from runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java rename to runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java index 91422097e83b..b62b9246eb06 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java @@ -42,24 +42,24 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; -/** Unit tests for {@linkplain BundleManager}. */ -public final class BundleManagerTest { +/** Unit tests for {@linkplain ClassicBundleManager}. */ +public final class ClassicBundleManagerTest { private static final long MAX_BUNDLE_SIZE = 3; private static final long MAX_BUNDLE_TIME_MS = 2000; private static final String BUNDLE_CHECK_TIMER_ID = "bundle-check-test-timer"; private FutureCollector mockFutureCollector; - private BundleManager bundleManager; - private BundleManager.BundleProgressListener bundleProgressListener; + private ClassicBundleManager bundleManager; + private ClassicBundleManager.BundleProgressListener bundleProgressListener; private Scheduler> mockScheduler; @Before public void setUp() { mockFutureCollector = mock(FutureCollector.class); - bundleProgressListener = mock(BundleManager.BundleProgressListener.class); + bundleProgressListener = mock(ClassicBundleManager.BundleProgressListener.class); mockScheduler = mock(Scheduler.class); bundleManager = - new BundleManager<>( + new ClassicBundleManager<>( bundleProgressListener, mockFutureCollector, MAX_BUNDLE_SIZE, @@ -69,7 +69,7 @@ public void setUp() { } @Test - public void testTryStartBundleStartsBundle() { + public void testWhenFirstTryStartBundleThenStartsBundle() { bundleManager.tryStartBundle(); verify(bundleProgressListener, times(1)).onBundleStarted(); @@ -82,29 +82,14 @@ public void testTryStartBundleStartsBundle() { assertTrue("tryStartBundle() did not start the bundle", bundleManager.isBundleStarted()); } - @Test - public void testTryStartBundleThrowsExceptionAndSignalError() { + @Test(expected = IllegalArgumentException.class) + public void testWhenCurrentBundleDoneFutureIsNotNullThenStartBundleFails() { bundleManager.setCurrentBundleDoneFuture(CompletableFuture.completedFuture(null)); - try { - bundleManager.tryStartBundle(); - } catch (IllegalArgumentException e) { - bundleManager.signalFailure(e); - } - - // verify if the signal failure only resets appropriate attributes of bundle - verify(mockFutureCollector, times(1)).prepare(); - verify(mockFutureCollector, times(1)).discard(); - assertEquals( - "Expected the number of element in the current bundle to 0", - 0L, - bundleManager.getCurrentBundleElementCount()); - assertEquals( - "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount()); - assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted()); + bundleManager.tryStartBundle(); } @Test - public void testTryStartBundleThrowsExceptionFromTheListener() { + public void testWhenSignalFailureThenResetCurrentBundle() { doThrow(new RuntimeException("User start bundle threw an exception")) .when(bundleProgressListener) .onBundleStarted(); @@ -128,7 +113,7 @@ public void testTryStartBundleThrowsExceptionFromTheListener() { } @Test - public void testMultipleStartBundle() { + public void testWhenMultipleTryStartThenOnlyStartBundleOnce() { bundleManager.tryStartBundle(); bundleManager.tryStartBundle(); @@ -153,7 +138,7 @@ public void testMultipleStartBundle() { * 2. onBundleFinished callback is invoked on the progress listener */ @Test - public void testTryFinishBundleClosesBundle() { + public void testWhenTryFinishBundleThenBundleIsReset() { OpEmitter mockEmitter = mock(OpEmitter.class); when(mockFutureCollector.finish()) .thenReturn( @@ -307,8 +292,8 @@ public void testMaxWatermarkWithBundleInProgress() { @Test public void testProcessTimerWithBundleTimeElapsed() { - BundleManager bundleManager = - new BundleManager<>( + ClassicBundleManager bundleManager = + new ClassicBundleManager<>( bundleProgressListener, mockFutureCollector, MAX_BUNDLE_SIZE, diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java index f126dd14b835..58d0fbd3f1b6 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/FutureCollectorImplTest.java @@ -31,14 +31,14 @@ import org.junit.Before; import org.junit.Test; -/** Unit tests for {@linkplain org.apache.beam.runners.samza.runtime.DoFnOp.FutureCollectorImpl}. */ +/** Unit tests for {@linkplain FutureCollectorImpl}. */ public final class FutureCollectorImplTest { private static final List RESULTS = ImmutableList.of("hello", "world"); - private FutureCollector futureCollector = new DoFnOp.FutureCollectorImpl<>(); + private FutureCollector futureCollector = new FutureCollectorImpl<>(); @Before public void setup() { - futureCollector = new DoFnOp.FutureCollectorImpl<>(); + futureCollector = new FutureCollectorImpl<>(); } @Test(expected = IllegalStateException.class) diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/PortableBundleManagerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/PortableBundleManagerTest.java new file mode 100644 index 000000000000..522e146d21e6 --- /dev/null +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/PortableBundleManagerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.runtime; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.core.TimerInternals; +import org.apache.samza.operators.Scheduler; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class PortableBundleManagerTest { + + @Mock BundleManager.BundleProgressListener bundleProgressListener; + @Mock Scheduler> bundleTimerScheduler; + @Mock OpEmitter emitter; + + PortableBundleManager portableBundleManager; + + private static final String TIMER_ID = "timerId"; + + private static final long MAX_BUNDLE_TIME_MS = 100000; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void test() { + portableBundleManager = + new PortableBundleManager<>( + bundleProgressListener, 1, MAX_BUNDLE_TIME_MS, bundleTimerScheduler, TIMER_ID); + portableBundleManager.tryStartBundle(); + + verify(bundleProgressListener, times(1)).onBundleStarted(); + } + + @Test + public void testWhen() { + portableBundleManager = + new PortableBundleManager<>( + bundleProgressListener, 4, MAX_BUNDLE_TIME_MS, bundleTimerScheduler, TIMER_ID); + portableBundleManager.tryStartBundle(); + portableBundleManager.tryStartBundle(); + + verify(bundleProgressListener, times(1)).onBundleStarted(); + } + + @Test + public void testWhenElementCountNotReachedTHenBundleDoesntFinish() { + portableBundleManager = + new PortableBundleManager<>( + bundleProgressListener, 4, MAX_BUNDLE_TIME_MS, bundleTimerScheduler, TIMER_ID); + portableBundleManager.tryStartBundle(); + portableBundleManager.tryStartBundle(); + portableBundleManager.tryFinishBundle(emitter); + + verify(bundleProgressListener, times(1)).onBundleStarted(); + verify(bundleProgressListener, times(0)).onBundleFinished(any()); + } + + @Test + public void testWhenElementCountReachedThenFinishBundle() { + portableBundleManager = + new PortableBundleManager<>( + bundleProgressListener, 4, MAX_BUNDLE_TIME_MS, bundleTimerScheduler, TIMER_ID); + portableBundleManager.tryStartBundle(); + portableBundleManager.tryStartBundle(); + portableBundleManager.tryStartBundle(); + portableBundleManager.tryStartBundle(); + portableBundleManager.tryFinishBundle(emitter); + + verify(bundleProgressListener, times(1)).onBundleStarted(); + verify(bundleProgressListener, times(1)).onBundleFinished(any()); + } + + @Test + public void testWhenBundleTimeReachedThenFinishBundle() throws Exception { + portableBundleManager = + new PortableBundleManager<>(bundleProgressListener, 4, 1, bundleTimerScheduler, TIMER_ID); + portableBundleManager.tryStartBundle(); + Thread.sleep(2); + portableBundleManager.tryFinishBundle(emitter); + + verify(bundleProgressListener, times(1)).onBundleStarted(); + verify(bundleProgressListener, times(1)).onBundleFinished(any()); + } + + @Test + public void testWhenSignalFailureThenResetBundle() throws Exception { + portableBundleManager = + new PortableBundleManager<>(bundleProgressListener, 4, 1, bundleTimerScheduler, TIMER_ID); + portableBundleManager.tryStartBundle(); + portableBundleManager.signalFailure(new Exception()); + portableBundleManager.tryStartBundle(); + + verify(bundleProgressListener, times(2)).onBundleStarted(); + } + + @Test + public void testProcessWatermarkWhenBundleNotStarted() { + Instant watermark = new Instant(); + portableBundleManager = + new PortableBundleManager<>(bundleProgressListener, 4, 1, bundleTimerScheduler, TIMER_ID); + portableBundleManager.processWatermark(watermark, emitter); + verify(bundleProgressListener, times(1)).onWatermark(eq(watermark), eq(emitter)); + } + + @Test + public void testQueueWatermarkWhenBundleStarted() { + Instant watermark = new Instant(); + portableBundleManager = + new PortableBundleManager<>(bundleProgressListener, 1, 1, bundleTimerScheduler, TIMER_ID); + + portableBundleManager.tryStartBundle(); + portableBundleManager.processWatermark(watermark, emitter); + verify(bundleProgressListener, times(0)).onWatermark(eq(watermark), eq(emitter)); + + portableBundleManager.tryFinishBundle(emitter); + verify(bundleProgressListener, times(1)).onWatermark(eq(watermark), eq(emitter)); + } + + @Test + public void testProcessTimerTriesFinishBundle() { + portableBundleManager = + new PortableBundleManager<>(bundleProgressListener, 1, 1, bundleTimerScheduler, TIMER_ID); + + portableBundleManager.tryStartBundle(); + KeyedTimerData keyedTimerData = mock(KeyedTimerData.class); + TimerInternals.TimerData timerData = mock(TimerInternals.TimerData.class); + when(keyedTimerData.getTimerData()).thenReturn(timerData); + when(timerData.getTimerId()).thenReturn(TIMER_ID); + + portableBundleManager.processTimer(keyedTimerData, emitter); + verify(bundleProgressListener, times(1)).onBundleFinished(any()); + verify(bundleTimerScheduler).schedule(any(KeyedTimerData.class), anyLong()); + } + + @Test + public void testDifferentTimerIdIsIgnored() { + portableBundleManager = + new PortableBundleManager<>(bundleProgressListener, 1, 1, bundleTimerScheduler, TIMER_ID); + + portableBundleManager.tryStartBundle(); + KeyedTimerData keyedTimerData = mock(KeyedTimerData.class); + TimerInternals.TimerData timerData = mock(TimerInternals.TimerData.class); + when(keyedTimerData.getTimerData()).thenReturn(timerData); + when(timerData.getTimerId()).thenReturn("NOT_TIMER_ID"); + + portableBundleManager.processTimer(keyedTimerData, emitter); + verify(bundleProgressListener, times(0)).onBundleFinished(any()); + } +}