diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 9e5ffa8a1c42..a03645f7b953 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -738,7 +738,7 @@ class BeamModulePlugin implements Plugin { google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20240323-2.0.0", // [bomupgrader] sets version google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version - google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240624-$google_clients_version", + google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20240817-$google_clients_version", google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version", google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20240319-2.0.0", // [bomupgrader] sets version diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java index 47e36e498507..84f41c473fe0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java @@ -17,37 +17,38 @@ */ package 
org.apache.beam.runners.dataflow.worker; -import com.google.auto.value.AutoBuilder; +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Internal; /** Keep track of any operational limits required by the backend. */ -public class OperationalLimits { +@AutoValue +@Internal +public abstract class OperationalLimits { + + private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20; + // Maximum size of a commit from a single work item. - public final long maxWorkItemCommitBytes; + public abstract long getMaxWorkItemCommitBytes(); // Maximum size of a single output element's serialized key. - public final long maxOutputKeyBytes; + public abstract long getMaxOutputKeyBytes(); // Maximum size of a single output element's serialized value. - public final long maxOutputValueBytes; + public abstract long getMaxOutputValueBytes(); - OperationalLimits(long maxWorkItemCommitBytes, long maxOutputKeyBytes, long maxOutputValueBytes) { - this.maxWorkItemCommitBytes = maxWorkItemCommitBytes; - this.maxOutputKeyBytes = maxOutputKeyBytes; - this.maxOutputValueBytes = maxOutputValueBytes; - } + @AutoValue.Builder + public abstract static class Builder { - @AutoBuilder(ofClass = OperationalLimits.class) - public interface Builder { - Builder setMaxWorkItemCommitBytes(long bytes); + public abstract Builder setMaxWorkItemCommitBytes(long bytes); - Builder setMaxOutputKeyBytes(long bytes); + public abstract Builder setMaxOutputKeyBytes(long bytes); - Builder setMaxOutputValueBytes(long bytes); + public abstract Builder setMaxOutputValueBytes(long bytes); - OperationalLimits build(); + public abstract OperationalLimits build(); } - public static Builder builder() { - return new AutoBuilder_OperationalLimits_Builder() - .setMaxWorkItemCommitBytes(Long.MAX_VALUE) + public static OperationalLimits.Builder builder() { + return new AutoValue_OperationalLimits.Builder() + .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES) 
.setMaxOutputKeyBytes(Long.MAX_VALUE) .setMaxOutputValueBytes(Long.MAX_VALUE); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 1af677382092..0dedd4f34fd6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -34,8 +34,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import org.apache.beam.runners.core.metrics.MetricsLogger; @@ -49,9 +47,11 @@ import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness; import 
org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; @@ -103,9 +103,7 @@ import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.joda.time.Duration; import org.joda.time.Instant; @@ -181,7 +179,6 @@ private StreamingDataflowWorker( WorkFailureProcessor workFailureProcessor, StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, - AtomicReference operationalLimits, GrpcWindmillStreamFactory windmillStreamFactory, Function executorSupplier, ConcurrentMap stageInfoMap) { @@ -237,8 +234,8 @@ private StreamingDataflowWorker( streamingCounters, hotKeyLogger, sampler, - operationalLimits, ID_GENERATOR, + configFetcher.getGlobalConfigHandle(), stageInfoMap); ThrottlingGetDataMetricTracker getDataMetricTracker = @@ -298,6 +295,7 @@ private StreamingDataflowWorker( .setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes) .setGetDataStatusProvider(getDataClient::printHtml) .setWorkUnitExecutor(workUnitExecutor) + .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle()) .build(); Windmill.GetWorkRequest request = @@ -335,8 +333,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o StreamingCounters streamingCounters = StreamingCounters.create(); WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG); 
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); - AtomicReference operationalLimits = - new AtomicReference<>(OperationalLimits.builder().build()); WindmillStateCache windmillStateCache = WindmillStateCache.builder() .setSizeMb(options.getWorkerCacheMb()) @@ -354,7 +350,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o createConfigFetcherComputationStateCacheAndWindmillClient( options, dataflowServiceClient, - operationalLimits, windmillStreamFactoryBuilder, configFetcher -> ComputationStateCache.create( @@ -412,7 +407,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o workFailureProcessor, streamingCounters, memoryMonitor, - operationalLimits, configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), executorSupplier, stageInfo); @@ -428,7 +422,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o createConfigFetcherComputationStateCacheAndWindmillClient( DataflowWorkerHarnessOptions options, WorkUnitClient dataflowServiceClient, - AtomicReference operationalLimits, GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, Function computationStateCacheFactory) { ComputationConfig.Fetcher configFetcher; @@ -440,13 +433,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o GrpcDispatcherClient.create(createStubFactory(options)); configFetcher = StreamingEngineComputationConfigFetcher.create( - options.getGlobalConfigRefreshPeriod().getMillis(), - dataflowServiceClient, - config -> - onPipelineConfig( - config, - dispatcherClient::consumeWindmillDispatcherEndpoints, - operationalLimits::set)); + options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient); + configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig); computationStateCache = computationStateCacheFactory.apply(configFetcher); windmillStreamFactory = windmillStreamFactoryBuilder 
@@ -474,7 +462,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o windmillServer = new JniWindmillApplianceServer(options.getLocalWindmillHostport()); } - configFetcher = new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); + configFetcher = + new StreamingApplianceComputationConfigFetcher( + windmillServer::getConfig, + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build())); computationStateCache = computationStateCacheFactory.apply(configFetcher); } @@ -494,10 +485,9 @@ static StreamingDataflowWorker forTesting( HotKeyLogger hotKeyLogger, Supplier clock, Function executorSupplier, - int localRetryTimeoutMs, - OperationalLimits limits) { + StreamingGlobalConfigHandleImpl globalConfigHandle, + int localRetryTimeoutMs) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); - AtomicReference operationalLimits = new AtomicReference<>(limits); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = WindmillStateCache.builder() @@ -510,13 +500,20 @@ static StreamingDataflowWorker forTesting( /* hasReceivedGlobalConfig= */ true, options.getGlobalConfigRefreshPeriod().getMillis(), workUnitClient, - executorSupplier, - config -> - onPipelineConfig( - config, - windmillServer::setWindmillServiceEndpoints, - operationalLimits::set)) - : new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); + globalConfigHandle, + executorSupplier) + : new StreamingApplianceComputationConfigFetcher( + windmillServer::getConfig, globalConfigHandle); + configFetcher + .getGlobalConfigHandle() + .registerConfigObserver( + config -> { + if (config.windmillServiceEndpoints().isEmpty()) { + LOG.warn("Received empty windmill service endpoints"); + return; + } + windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints()); + }); ConcurrentMap stateNameMap = new ConcurrentHashMap<>(prePopulatedStateNameMappings); ComputationStateCache 
computationStateCache = @@ -583,7 +580,6 @@ static StreamingDataflowWorker forTesting( workFailureProcessor, streamingCounters, memoryMonitor, - operationalLimits, options.isEnableStreamingEngine() ? windmillStreamFactory .setHealthCheckIntervalMillis( @@ -594,23 +590,6 @@ static StreamingDataflowWorker forTesting( stageInfo); } - private static void onPipelineConfig( - StreamingEnginePipelineConfig config, - Consumer> consumeWindmillServiceEndpoints, - Consumer operationalLimits) { - - operationalLimits.accept( - OperationalLimits.builder() - .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes()) - .setMaxOutputKeyBytes(config.maxOutputKeyBytes()) - .setMaxOutputValueBytes(config.maxOutputValueBytes()) - .build()); - - if (!config.windmillServiceEndpoints().isEmpty()) { - consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints()); - } - } - private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactoryBuilder( DataflowWorkerHarnessOptions options, long clientId) { Duration maxBackoff = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index f25f6294da86..5ff94884e974 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -50,6 +50,7 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; @@ -107,6 +108,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext stateNameMap; private final WindmillStateCache.ForComputation stateCache; private final ReaderCache readerCache; + private final StreamingGlobalConfigHandle globalConfigHandle; private final boolean throwExceptionOnLargeOutput; private volatile long backlogBytes; @@ -153,6 +155,7 @@ public StreamingModeExecutionContext( MetricsContainerRegistry metricsContainerRegistry, DataflowExecutionStateTracker executionStateTracker, StreamingModeExecutionStateRegistry executionStateRegistry, + StreamingGlobalConfigHandle globalConfigHandle, long sinkByteLimit, boolean throwExceptionOnLargeOutput) { super( @@ -163,6 +166,7 @@ public StreamingModeExecutionContext( sinkByteLimit); this.computationId = computationId; this.readerCache = readerCache; + this.globalConfigHandle = globalConfigHandle; this.sideInputCache = new HashMap<>(); this.stateNameMap = ImmutableMap.copyOf(stateNameMap); this.stateCache = stateCache; @@ -176,11 +180,11 @@ public final long getBacklogBytes() { } public long getMaxOutputKeyBytes() { - return operationalLimits.maxOutputKeyBytes; + return operationalLimits.getMaxOutputKeyBytes(); } public long getMaxOutputValueBytes() { - return operationalLimits.maxOutputValueBytes; + return operationalLimits.getMaxOutputValueBytes(); } public boolean throwExceptionsForLargeOutput() { @@ -196,13 +200,13 @@ public void start( Work work, WindmillStateReader stateReader, SideInputStateFetcher sideInputStateFetcher, - OperationalLimits operationalLimits, Windmill.WorkItemCommitRequest.Builder outputBuilder) { this.key = key; this.work = work; this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey()); 
this.sideInputStateFetcher = sideInputStateFetcher; - this.operationalLimits = operationalLimits; + // Snapshot the limits for entire bundle processing. + this.operationalLimits = globalConfigHandle.getConfig().operationalLimits(); this.outputBuilder = outputBuilder; this.sideInputCache.clear(); clearSinkFullHint(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java index 8a00194887da..8dc681fc640c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java @@ -24,7 +24,6 @@ import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor; import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor; -import org.apache.beam.runners.dataflow.worker.OperationalLimits; import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter; @@ -73,11 +72,9 @@ public final void executeWork( Work work, WindmillStateReader stateReader, SideInputStateFetcher sideInputStateFetcher, - OperationalLimits operationalLimits, Windmill.WorkItemCommitRequest.Builder outputBuilder) throws Exception { - context() - .start(key, work, stateReader, sideInputStateFetcher, operationalLimits, outputBuilder); + context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder); workExecutor().execute(); } diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java index fb8bcf7edbfb..9702751aeb98 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java @@ -48,12 +48,13 @@ public static ComputationConfig create( public abstract ImmutableMap stateNameMap(); /** Interface to fetch configurations for a specific computation. */ - @FunctionalInterface public interface Fetcher { default void start() {} default void stop() {} Optional fetchConfig(String computationId); + + StreamingGlobalConfigHandle getGlobalConfigHandle(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java new file mode 100644 index 000000000000..c244ecb8c7a8 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.config; + +import java.util.function.Consumer; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.sdk.annotations.Internal; + +@Internal +@ThreadSafe +/* + * StreamingGlobalConfigHandle returning a fixed config + * initialized during construction. Used for Appliance and Tests. + */ +public class FixedGlobalConfigHandle implements StreamingGlobalConfigHandle { + + private final StreamingGlobalConfig config; + + public FixedGlobalConfigHandle(StreamingGlobalConfig config) { + this.config = config; + } + + @Override + public StreamingGlobalConfig getConfig() { + return config; + } + + @Override + public void registerConfigObserver(@Nonnull Consumer callback) { + callback.accept(config); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java index 786ded09498a..025e66be79c1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java @@ -48,11 +48,14 @@ public final class 
StreamingApplianceComputationConfigFetcher implements Computa private final ApplianceComputationConfigFetcher applianceComputationConfigFetcher; private final ConcurrentHashMap systemNameToComputationIdMap; + private final StreamingGlobalConfigHandle globalConfigHandle; public StreamingApplianceComputationConfigFetcher( - ApplianceComputationConfigFetcher applianceComputationConfigFetcher) { + ApplianceComputationConfigFetcher applianceComputationConfigFetcher, + StreamingGlobalConfigHandle globalConfigHandle) { this.applianceComputationConfigFetcher = applianceComputationConfigFetcher; this.systemNameToComputationIdMap = new ConcurrentHashMap<>(); + this.globalConfigHandle = globalConfigHandle; } /** Returns a {@code Table} */ @@ -112,6 +115,11 @@ public Optional fetchConfig(String computationId) { .collect(toImmutableMap(NameMapEntry::getUserName, NameMapEntry::getSystemName))); } + @Override + public StreamingGlobalConfigHandle getGlobalConfigHandle() { + return globalConfigHandle; + } + private Optional createComputationConfig( String serializedMapTask, Table transformUserNameToStateFamilyByComputationId, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index d230aac54c63..22b0dac6eb22 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -30,16 +30,18 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import 
java.util.function.Consumer; import java.util.function.Function; import java.util.stream.StreamSupport; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.OperationalLimits; import org.apache.beam.runners.dataflow.worker.WorkUnitClient; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.BackOffUtils; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; @@ -72,33 +74,31 @@ public final class StreamingEngineComputationConfigFetcher implements Computatio private final long globalConfigRefreshPeriodMillis; private final WorkUnitClient dataflowServiceClient; private final ScheduledExecutorService globalConfigRefresher; - private final Consumer onStreamingConfig; + private final StreamingGlobalConfigHandleImpl globalConfigHandle; private final AtomicBoolean hasReceivedGlobalConfig; private StreamingEngineComputationConfigFetcher( boolean hasReceivedGlobalConfig, long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient, - ScheduledExecutorService globalConfigRefresher, - Consumer onStreamingConfig) { + StreamingGlobalConfigHandleImpl globalConfigHandle, + ScheduledExecutorService globalConfigRefresher) { this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis; this.dataflowServiceClient = dataflowServiceClient; this.globalConfigRefresher = globalConfigRefresher; - this.onStreamingConfig = onStreamingConfig; + this.globalConfigHandle = 
globalConfigHandle; this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig); } public static StreamingEngineComputationConfigFetcher create( - long globalConfigRefreshPeriodMillis, - WorkUnitClient dataflowServiceClient, - Consumer onStreamingConfig) { + long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient) { return new StreamingEngineComputationConfigFetcher( /* hasReceivedGlobalConfig= */ false, globalConfigRefreshPeriodMillis, dataflowServiceClient, + new StreamingGlobalConfigHandleImpl(), Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()), - onStreamingConfig); + new ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build())); } @VisibleForTesting @@ -106,14 +106,14 @@ public static StreamingEngineComputationConfigFetcher forTesting( boolean hasReceivedGlobalConfig, long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient, - Function executorSupplier, - Consumer onStreamingConfig) { + StreamingGlobalConfigHandleImpl globalConfigHandle, + Function executorSupplier) { return new StreamingEngineComputationConfigFetcher( hasReceivedGlobalConfig, globalConfigRefreshPeriodMillis, dataflowServiceClient, - executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME), - onStreamingConfig); + globalConfigHandle, + executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME)); } @VisibleForTesting @@ -157,11 +157,9 @@ private static Optional fetchConfigWithRetry( } } - private StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask config) { - StreamingEnginePipelineConfig.Builder pipelineConfig = StreamingEnginePipelineConfig.builder(); - if (config.getUserStepToStateFamilyNameMap() != null) { - pipelineConfig.setUserStepToStateFamilyNameMap(config.getUserStepToStateFamilyNameMap()); - } + private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) { + StreamingGlobalConfig.Builder pipelineConfig = 
StreamingGlobalConfig.builder(); + OperationalLimits.Builder operationalLimits = OperationalLimits.builder(); if (config.getWindmillServiceEndpoint() != null && !config.getWindmillServiceEndpoint().isEmpty()) { @@ -184,23 +182,36 @@ private StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask c if (config.getMaxWorkItemCommitBytes() != null && config.getMaxWorkItemCommitBytes() > 0 && config.getMaxWorkItemCommitBytes() <= Integer.MAX_VALUE) { - pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue()); + operationalLimits.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue()); } if (config.getOperationalLimits() != null) { if (config.getOperationalLimits().getMaxKeyBytes() != null && config.getOperationalLimits().getMaxKeyBytes() > 0 && config.getOperationalLimits().getMaxKeyBytes() <= Integer.MAX_VALUE) { - pipelineConfig.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes()); + operationalLimits.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes()); } if (config.getOperationalLimits().getMaxProductionOutputBytes() != null && config.getOperationalLimits().getMaxProductionOutputBytes() > 0 && config.getOperationalLimits().getMaxProductionOutputBytes() <= Integer.MAX_VALUE) { - pipelineConfig.setMaxOutputValueBytes( + operationalLimits.setMaxOutputValueBytes( config.getOperationalLimits().getMaxProductionOutputBytes()); } } + pipelineConfig.setOperationalLimits(operationalLimits.build()); + + byte[] settings_bytes = config.decodeUserWorkerRunnerV1Settings(); + if (settings_bytes != null) { + UserWorkerRunnerV1Settings settings = UserWorkerRunnerV1Settings.newBuilder().build(); + try { + settings = UserWorkerRunnerV1Settings.parseFrom(settings_bytes); + } catch (InvalidProtocolBufferException e) { + LOG.error("Parsing UserWorkerRunnerV1Settings failed", e); + } + pipelineConfig.setUserWorkerJobSettings(settings); + } + return pipelineConfig.build(); } @@ -233,6 +244,11 @@ 
public Optional fetchConfig(String computationId) { .flatMap(StreamingEngineComputationConfigFetcher::createComputationConfig); } + @Override + public StreamingGlobalConfigHandle getGlobalConfigHandle() { + return globalConfigHandle; + } + @Override public void stop() { // We have already shutdown or start has not been called. @@ -259,7 +275,7 @@ public void stop() { @SuppressWarnings("FutureReturnValueIgnored") private void schedulePeriodicGlobalConfigRequests() { globalConfigRefresher.scheduleWithFixedDelay( - () -> fetchGlobalConfig().ifPresent(onStreamingConfig), + () -> fetchGlobalConfig().ifPresent(globalConfigHandle::setConfig), 0, globalConfigRefreshPeriodMillis, TimeUnit.MILLISECONDS); @@ -272,9 +288,9 @@ private void schedulePeriodicGlobalConfigRequests() { private synchronized void fetchInitialPipelineGlobalConfig() { while (!hasReceivedGlobalConfig.get()) { LOG.info("Sending request to get initial global configuration for this worker."); - Optional globalConfig = fetchGlobalConfig(); + Optional globalConfig = fetchGlobalConfig(); if (globalConfig.isPresent()) { - onStreamingConfig.accept(globalConfig.get()); + globalConfigHandle.setConfig(globalConfig.get()); hasReceivedGlobalConfig.set(true); break; } @@ -285,13 +301,14 @@ private synchronized void fetchInitialPipelineGlobalConfig() { LOG.info("Initial global configuration received, harness is now ready"); } - private Optional fetchGlobalConfig() { + private Optional fetchGlobalConfig() { return fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem) .map(config -> createPipelineConfig(config)); } @FunctionalInterface private interface ThrowingFetchWorkItemFn { + Optional fetchWorkItem() throws IOException; } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java similarity index 56% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java index 8f1ff93f6a49..8f76f5ec27af 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.dataflow.worker.streaming.config; import com.google.auto.value.AutoValue; -import java.util.HashMap; -import java.util.Map; +import org.apache.beam.runners.dataflow.worker.OperationalLimits; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; @@ -27,41 +27,30 @@ /** Global pipeline config for pipelines running in Streaming Engine mode. 
*/ @AutoValue @Internal -public abstract class StreamingEnginePipelineConfig { +public abstract class StreamingGlobalConfig { - private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20; - - public static StreamingEnginePipelineConfig.Builder builder() { - return new AutoValue_StreamingEnginePipelineConfig.Builder() - .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES) - .setMaxOutputKeyBytes(Long.MAX_VALUE) - .setMaxOutputValueBytes(Long.MAX_VALUE) - .setUserStepToStateFamilyNameMap(new HashMap<>()) - .setWindmillServiceEndpoints(ImmutableSet.of()); + public static StreamingGlobalConfig.Builder builder() { + return new AutoValue_StreamingGlobalConfig.Builder() + .setWindmillServiceEndpoints(ImmutableSet.of()) + .setUserWorkerJobSettings(UserWorkerRunnerV1Settings.newBuilder().build()) + .setOperationalLimits(OperationalLimits.builder().build()); } - public abstract long maxWorkItemCommitBytes(); - - public abstract long maxOutputKeyBytes(); - - public abstract long maxOutputValueBytes(); - - public abstract Map userStepToStateFamilyNameMap(); + public abstract OperationalLimits operationalLimits(); public abstract ImmutableSet windmillServiceEndpoints(); + public abstract UserWorkerRunnerV1Settings userWorkerJobSettings(); + @AutoValue.Builder public abstract static class Builder { - public abstract Builder setMaxWorkItemCommitBytes(long value); - - public abstract Builder setMaxOutputKeyBytes(long value); - public abstract Builder setMaxOutputValueBytes(long value); + public abstract Builder setWindmillServiceEndpoints(ImmutableSet value); - public abstract Builder setUserStepToStateFamilyNameMap(Map value); + public abstract Builder setOperationalLimits(OperationalLimits operationalLimits); - public abstract Builder setWindmillServiceEndpoints(ImmutableSet value); + public abstract Builder setUserWorkerJobSettings(UserWorkerRunnerV1Settings settings); - public abstract StreamingEnginePipelineConfig build(); + public abstract 
StreamingGlobalConfig build(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java new file mode 100644 index 000000000000..6f75ba887473 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.config; + +import java.util.function.Consumer; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.sdk.annotations.Internal; + +@Internal +@ThreadSafe +public interface StreamingGlobalConfigHandle { + + /** Returns the latest StreamingGlobalConfig */ + StreamingGlobalConfig getConfig(); + + /** + * Subscribe to config updates by registering a callback. Callback should be called the first time + * with settings, if any. The callback could execute inline before the method returns. 
+ */ + void registerConfigObserver(@Nonnull Consumer callback); +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java new file mode 100644 index 000000000000..9ed5c9fcf396 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.streaming.config; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +@ThreadSafe +public class StreamingGlobalConfigHandleImpl implements StreamingGlobalConfigHandle { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingGlobalConfigHandleImpl.class); + + private final AtomicReference streamingEngineConfig = + new AtomicReference<>(); + + private final CopyOnWriteArrayList configCallbacks = new CopyOnWriteArrayList<>(); + + @Override + @Nonnull + public StreamingGlobalConfig getConfig() { + Preconditions.checkState( + streamingEngineConfig.get() != null, + "Global config should be set before any processing is done"); + return streamingEngineConfig.get(); + } + + @Override + public void registerConfigObserver(@Nonnull Consumer callback) { + ConfigCallback configCallback = new ConfigCallback(callback); + configCallbacks.add(configCallback); + if (streamingEngineConfig.get() != null) { + configCallback.run(); + } + } + + void setConfig(@Nonnull StreamingGlobalConfig config) { + if (config.equals(streamingEngineConfig.get())) { + return; + } + streamingEngineConfig.set(config); + for (ConfigCallback configCallback : configCallbacks) { + configCallback.run(); + } + } + + private class ConfigCallback { + + private final AtomicInteger queuedOrRunning = new AtomicInteger(0); + private final Consumer configConsumer; + + private ConfigCallback(Consumer configConsumer) { + this.configConsumer = configConsumer; + } + + /** + * Runs the passed in callback with the latest config. 
Overlapping `run()` calls will be + * collapsed into one. If the callback is already running a new call will be scheduled to run + * after the current execution completes, on the same thread which ran the previous run. + */ + private void run() { + // If the callback is already running, + // Increment queued and return. The thread running + // the callback will run it again with the latest config. + if (queuedOrRunning.incrementAndGet() > 1) { + return; + } + // Else run the callback + while (true) { + try { + configConsumer.accept(StreamingGlobalConfigHandleImpl.this.streamingEngineConfig.get()); + } catch (Exception e) { + LOG.error("Exception running GlobalConfig callback", e); + } + if (queuedOrRunning.updateAndGet( + queuedOrRunning -> { + if (queuedOrRunning == 1) { + // If there are no queued requests stop processing. + return 0; + } + // Else, clear queue, set 1 running and run the callback + return 1; + }) + == 0) { + break; + } + } + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java index d305e25af7e5..6981312eff1d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java @@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; import javax.servlet.http.HttpServletRequest; @@ -38,6 +39,8 @@ import 
org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider; import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages; import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; @@ -77,6 +80,8 @@ public final class StreamingWorkerStatusPages { private final DebugCapture.@Nullable Manager debugCapture; private final @Nullable ChannelzServlet channelzServlet; + private final AtomicReference globalConfig = new AtomicReference<>(); + StreamingWorkerStatusPages( Supplier clock, long clientId, @@ -90,7 +95,8 @@ public final class StreamingWorkerStatusPages { @Nullable GrpcWindmillStreamFactory windmillStreamFactory, Consumer getDataStatusProvider, BoundedQueueExecutor workUnitExecutor, - ScheduledExecutorService statusPageDumper) { + ScheduledExecutorService statusPageDumper, + StreamingGlobalConfigHandle globalConfigHandle) { this.clock = clock; this.clientId = clientId; this.isRunning = isRunning; @@ -104,6 +110,7 @@ public final class StreamingWorkerStatusPages { this.getDataStatusProvider = getDataStatusProvider; this.workUnitExecutor = workUnitExecutor; this.statusPageDumper = statusPageDumper; + globalConfigHandle.registerConfigObserver(globalConfig::set); } public static StreamingWorkerStatusPages.Builder builder() { @@ -150,6 +157,17 @@ private void addStreamingEngineStatusPages() { statusPages.addCapturePage(Preconditions.checkNotNull(channelzServlet)); statusPages.addStatusDataProvider( "streaming", "Streaming RPCs", Preconditions.checkNotNull(windmillStreamFactory)); + 
statusPages.addStatusDataProvider( + "jobSettings", + "User Worker Job Settings", + writer -> { + @Nullable StreamingGlobalConfig config = globalConfig.get(); + if (config == null) { + writer.println("Job Settings not loaded."); + return; + } + writer.println(config.userWorkerJobSettings().toString()); + }); } private boolean isStreamingEngine() { @@ -256,6 +274,8 @@ public interface Builder { Builder setStatusPageDumper(ScheduledExecutorService statusPageDumper); + Builder setGlobalConfigHandle(StreamingGlobalConfigHandle globalConfigHandle); + StreamingWorkerStatusPages build(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index cf2e7260592d..412608ea3981 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; @@ -46,6 +47,7 @@ /** Manages endpoints and stubs for connecting to the Windmill Dispatcher. 
*/ @ThreadSafe public class GrpcDispatcherClient { + private static final Logger LOG = LoggerFactory.getLogger(GrpcDispatcherClient.class); private final WindmillStubFactory windmillStubFactory; private final CountDownLatch onInitializedEndpoints; @@ -146,6 +148,14 @@ public boolean hasInitializedEndpoints() { return dispatcherStubs.get().hasInitializedEndpoints(); } + public void onJobConfig(StreamingGlobalConfig config) { + if (config.windmillServiceEndpoints().isEmpty()) { + LOG.warn("Dispatcher client received empty windmill service endpoints from global config"); + return; + } + consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints()); + } + public synchronized void consumeWindmillDispatcherEndpoints( ImmutableSet dispatcherEndpoints) { ImmutableSet currentDispatcherEndpoints = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java index 20c1247b2168..d5e0b3a24e2a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java @@ -47,6 +47,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.ComputationState; import org.apache.beam.runners.dataflow.worker.streaming.ComputationWorkExecutor; import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor; import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter; import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation; @@ -94,6 +95,7 @@ final class ComputationWorkExecutorFactory { private final long maxSinkBytes; private final IdGenerator idGenerator; + private final StreamingGlobalConfigHandle globalConfigHandle; private final boolean throwExceptionOnLargeOutput; ComputationWorkExecutorFactory( @@ -103,12 +105,14 @@ final class ComputationWorkExecutorFactory { Function stateCacheFactory, DataflowExecutionStateSampler sampler, CounterSet pendingDeltaCounters, - IdGenerator idGenerator) { + IdGenerator idGenerator, + StreamingGlobalConfigHandle globalConfigHandle) { this.options = options; this.mapTaskExecutorFactory = mapTaskExecutorFactory; this.readerCache = readerCache; this.stateCacheFactory = stateCacheFactory; this.idGenerator = idGenerator; + this.globalConfigHandle = globalConfigHandle; this.readerRegistry = ReaderRegistry.defaultRegistry(); this.sinkRegistry = SinkRegistry.defaultRegistry(); this.sampler = sampler; @@ -262,6 +266,7 @@ private StreamingModeExecutionContext createExecutionContext( stageInfo.metricsContainerRegistry(), executionStateTracker, stageInfo.executionStateRegistry(), + globalConfigHandle, maxSinkBytes, throwExceptionOnLargeOutput); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 86f2cffe604c..641fd119a42d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -25,7 +25,6 @@ import java.util.Optional; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; @@ -33,7 +32,6 @@ import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory; import org.apache.beam.runners.dataflow.worker.HotKeyLogger; -import org.apache.beam.runners.dataflow.worker.OperationalLimits; import org.apache.beam.runners.dataflow.worker.ReaderCache; import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException; import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC; @@ -44,6 +42,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcherFactory; @@ -85,7 +84,7 @@ public final class StreamingWorkScheduler { private final HotKeyLogger hotKeyLogger; private final ConcurrentMap stageInfoMap; private final DataflowExecutionStateSampler sampler; - private final AtomicReference operationalLimits; + private final StreamingGlobalConfigHandle globalConfigHandle; public StreamingWorkScheduler( DataflowWorkerHarnessOptions options, @@ -99,7 +98,7 @@ public StreamingWorkScheduler( HotKeyLogger 
hotKeyLogger, ConcurrentMap stageInfoMap, DataflowExecutionStateSampler sampler, - AtomicReference operationalLimits) { + StreamingGlobalConfigHandle globalConfigHandle) { this.options = options; this.clock = clock; this.computationWorkExecutorFactory = computationWorkExecutorFactory; @@ -111,7 +110,7 @@ public StreamingWorkScheduler( this.hotKeyLogger = hotKeyLogger; this.stageInfoMap = stageInfoMap; this.sampler = sampler; - this.operationalLimits = operationalLimits; + this.globalConfigHandle = globalConfigHandle; } public static StreamingWorkScheduler create( @@ -126,8 +125,8 @@ public static StreamingWorkScheduler create( StreamingCounters streamingCounters, HotKeyLogger hotKeyLogger, DataflowExecutionStateSampler sampler, - AtomicReference operationalLimits, IdGenerator idGenerator, + StreamingGlobalConfigHandle globalConfigHandle, ConcurrentMap stageInfoMap) { ComputationWorkExecutorFactory computationWorkExecutorFactory = new ComputationWorkExecutorFactory( @@ -137,7 +136,8 @@ public static StreamingWorkScheduler create( stateCacheFactory, sampler, streamingCounters.pendingDeltaCounters(), - idGenerator); + idGenerator, + globalConfigHandle); return new StreamingWorkScheduler( options, @@ -151,7 +151,7 @@ public static StreamingWorkScheduler create( hotKeyLogger, stageInfoMap, sampler, - operationalLimits); + globalConfigHandle); } private static long computeShuffleBytesRead(Windmill.WorkItem workItem) { @@ -295,7 +295,7 @@ private Windmill.WorkItemCommitRequest validateCommitRequestSize( Windmill.WorkItemCommitRequest commitRequest, String computationId, Windmill.WorkItem workItem) { - long byteLimit = operationalLimits.get().maxWorkItemCommitBytes; + long byteLimit = globalConfigHandle.getConfig().operationalLimits().getMaxWorkItemCommitBytes(); int commitSize = commitRequest.getSerializedSize(); int estimatedCommitSize = commitSize < 0 ? 
Integer.MAX_VALUE : commitSize; @@ -380,12 +380,7 @@ private ExecuteWorkResult executeWork( // Blocks while executing work. computationWorkExecutor.executeWork( - executionKey, - work, - stateReader, - localSideInputStateFetcher, - operationalLimits.get(), - outputBuilder); + executionKey, work, stateReader, localSideInputStateFetcher, outputBuilder); if (work.isFailed()) { throw new WorkItemCancelledException(workItem.getShardingKey()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index b41ad391d878..dadf02171235 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -102,6 +102,8 @@ import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC; import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; @@ -275,6 +277,8 @@ public Long get() { @Rule public TestRule restoreMDC = new RestoreDataflowLoggingMDC(); @Rule public ErrorCollector errorCollector = new ErrorCollector(); WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class); + StreamingGlobalConfigHandleImpl mockGlobalConfigHandle = + 
mock(StreamingGlobalConfigHandleImpl.class); HotKeyLogger hotKeyLogger = mock(HotKeyLogger.class); private @Nullable ComputationStateCache computationStateCache = null; @@ -750,7 +754,9 @@ private StringBuilder initializeExpectedCommitRequest( requestBuilder.append("cache_token: "); requestBuilder.append(index + 1); requestBuilder.append(" "); - if (hasSourceBytesProcessed) requestBuilder.append("source_bytes_processed: 0 "); + if (hasSourceBytesProcessed) { + requestBuilder.append("source_bytes_processed: 0 "); + } return requestBuilder; } @@ -834,6 +840,8 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args private StreamingDataflowWorker makeWorker( StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) { + when(mockGlobalConfigHandle.getConfig()) + .thenReturn(streamingDataflowWorkerTestParams.streamingGlobalConfig()); StreamingDataflowWorker worker = StreamingDataflowWorker.forTesting( streamingDataflowWorkerTestParams.stateNameMappings(), @@ -847,8 +855,8 @@ private StreamingDataflowWorker makeWorker( hotKeyLogger, streamingDataflowWorkerTestParams.clock(), streamingDataflowWorkerTestParams.executorSupplier(), - streamingDataflowWorkerTestParams.localRetryTimeoutMs(), - streamingDataflowWorkerTestParams.operationalLimits()); + mockGlobalConfigHandle, + streamingDataflowWorkerTestParams.localRetryTimeoutMs()); this.computationStateCache = worker.getComputationStateCache(); return worker; } @@ -1210,8 +1218,11 @@ public void testKeyCommitTooLargeException() throws Exception { makeWorker( defaultWorkerParams() .setInstructions(instructions) - .setOperationalLimits( - OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build()) + .setStreamingGlobalConfig( + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build()) + .build()) .publishCounters() .build()); worker.start(); @@ -1282,7 +1293,11 @@ public void testOutputKeyTooLargeException() 
throws Exception { makeWorker( defaultWorkerParams("--experiments=throw_exceptions_on_large_output") .setInstructions(instructions) - .setOperationalLimits(OperationalLimits.builder().setMaxOutputKeyBytes(15).build()) + .setStreamingGlobalConfig( + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder().setMaxOutputKeyBytes(15).build()) + .build()) .build()); worker.start(); @@ -1315,8 +1330,11 @@ public void testOutputValueTooLargeException() throws Exception { makeWorker( defaultWorkerParams("--experiments=throw_exceptions_on_large_output") .setInstructions(instructions) - .setOperationalLimits( - OperationalLimits.builder().setMaxOutputValueBytes(15).build()) + .setStreamingGlobalConfig( + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder().setMaxOutputValueBytes(15).build()) + .build()) .build()); worker.start(); @@ -4412,7 +4430,9 @@ Duration getLatencyAttributionDuration(long workToken, LatencyAttribution.State } boolean isActiveWorkRefresh(GetDataRequest request) { - if (request.getComputationHeartbeatRequestCount() > 0) return true; + if (request.getComputationHeartbeatRequestCount() > 0) { + return true; + } for (ComputationGetDataRequest computationRequest : request.getRequestsList()) { if (!computationRequest.getComputationId().equals(DEFAULT_COMPUTATION_ID)) { return false; @@ -4508,7 +4528,7 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { .setLocalRetryTimeoutMs(-1) .setPublishCounters(false) .setClock(Instant::now) - .setOperationalLimits(OperationalLimits.builder().build()); + .setStreamingGlobalConfig(StreamingGlobalConfig.builder().build()); } abstract ImmutableMap stateNameMappings(); @@ -4525,10 +4545,11 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { abstract int localRetryTimeoutMs(); - abstract OperationalLimits operationalLimits(); + abstract StreamingGlobalConfig streamingGlobalConfig(); @AutoValue.Builder abstract static class 
Builder { + abstract Builder setStateNameMappings(ImmutableMap value); abstract ImmutableMap.Builder stateNameMappingsBuilder(); @@ -4559,7 +4580,7 @@ final Builder publishCounters() { abstract Builder setLocalRetryTimeoutMs(int value); - abstract Builder setOperationalLimits(OperationalLimits operationalLimits); + abstract Builder setStreamingGlobalConfig(StreamingGlobalConfig config); abstract StreamingDataflowWorkerTestParams build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 86ed8f552d16..a1d4210f3dbc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -59,6 +59,9 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; @@ -107,6 +110,8 @@ public void setUp() { options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); CounterSet counterSet = new CounterSet(); ConcurrentHashMap stateNameMap = new 
ConcurrentHashMap<>(); + StreamingGlobalConfigHandle globalConfigHandle = + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily"); executionContext = new StreamingModeExecutionContext( @@ -127,6 +132,7 @@ public void setUp() { PipelineOptionsFactory.create(), "test-work-item-id"), executionStateRegistry, + globalConfigHandle, Long.MAX_VALUE, /*throwExceptionOnLargeOutput=*/ false); } @@ -158,7 +164,6 @@ public void testTimerInternalsSetTimer() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - OperationalLimits.builder().build(), outputBuilder); TimerInternals timerInternals = stepContext.timerInternals(); @@ -208,7 +213,6 @@ public void testTimerInternalsProcessingTimeSkew() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, - OperationalLimits.builder().build(), outputBuilder); TimerInternals timerInternals = stepContext.timerInternals(); assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime())); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index f2e03b453fd8..8ad73a5145bc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -90,6 +90,9 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import 
org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; @@ -594,6 +597,8 @@ public void testReadUnboundedReader() throws Exception { StreamingModeExecutionStateRegistry executionStateRegistry = new StreamingModeExecutionStateRegistry(); ReaderCache readerCache = new ReaderCache(Duration.standardMinutes(1), Runnable::run); + StreamingGlobalConfigHandle globalConfigHandle = + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); StreamingModeExecutionContext context = new StreamingModeExecutionContext( counterSet, @@ -610,6 +615,7 @@ public void testReadUnboundedReader() throws Exception { PipelineOptionsFactory.create(), "test-work-item-id"), executionStateRegistry, + globalConfigHandle, Long.MAX_VALUE, /*throwExceptionOnLargeOutput=*/ false); @@ -635,7 +641,6 @@ public void testReadUnboundedReader() throws Exception { Watermarks.builder().setInputDataWatermark(new Instant(0)).build()), mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), - OperationalLimits.builder().build(), Windmill.WorkItemCommitRequest.newBuilder()); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -960,6 +965,8 @@ public void testFailedWorkItemsAbort() throws Exception { CounterSet counterSet = new CounterSet(); StreamingModeExecutionStateRegistry executionStateRegistry = new StreamingModeExecutionStateRegistry(); + StreamingGlobalConfigHandle globalConfigHandle = + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); StreamingModeExecutionContext context = new StreamingModeExecutionContext( counterSet, 
@@ -979,6 +986,7 @@ public void testFailedWorkItemsAbort() throws Exception { PipelineOptionsFactory.create(), "test-work-item-id"), executionStateRegistry, + globalConfigHandle, Long.MAX_VALUE, /*throwExceptionOnLargeOutput=*/ false); @@ -1012,7 +1020,6 @@ public void testFailedWorkItemsAbort() throws Exception { dummyWork, mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), - OperationalLimits.builder().build(), Windmill.WorkItemCommitRequest.newBuilder()); @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java new file mode 100644 index 000000000000..b5cb85a58c12 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.streaming.config; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.runners.dataflow.worker.OperationalLimits; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class FixedGlobalConfigHandleTest { + + @Test + public void getConfig() { + StreamingGlobalConfig config = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(123) + .setMaxOutputKeyBytes(324) + .setMaxWorkItemCommitBytes(456) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build(); + FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config); + assertEquals(config, globalConfigHandle.getConfig()); + } + + @Test + public void registerConfigObserver() throws InterruptedException { + StreamingGlobalConfig config = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(123) + .setMaxOutputKeyBytes(324) + .setMaxWorkItemCommitBytes(456) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build(); + 
FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config); + AtomicReference configFromCallback = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + globalConfigHandle.registerConfigObserver( + cbConfig -> { + configFromCallback.set(cbConfig); + latch.countDown(); + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(configFromCallback.get(), globalConfigHandle.getConfig()); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java index f39c98c61b19..2586ae2be86f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java @@ -137,6 +137,8 @@ public void testGetComputationConfig_onFetchConfigError() { } private StreamingApplianceComputationConfigFetcher createStreamingApplianceConfigLoader() { - return new StreamingApplianceComputationConfigFetcher(mockWindmillServer::getConfig); + return new StreamingApplianceComputationConfigFetcher( + mockWindmillServer::getConfig, + new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build())); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java index 59fd092adcba..9fa17588c94d 100644 --- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java @@ -34,7 +34,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; -import java.util.function.Consumer; +import org.apache.beam.runners.dataflow.worker.OperationalLimits; import org.apache.beam.runners.dataflow.worker.WorkUnitClient; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -47,6 +47,7 @@ @RunWith(JUnit4.class) public class StreamingEngineComputationConfigFetcherTest { + private final WorkUnitClient mockDataflowServiceClient = mock(WorkUnitClient.class, new Returns(Optional.empty())); private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher; @@ -54,13 +55,13 @@ public class StreamingEngineComputationConfigFetcherTest { private StreamingEngineComputationConfigFetcher createConfigFetcher( boolean waitForInitialConfig, long globalConfigRefreshPeriod, - Consumer onPipelineConfig) { + StreamingGlobalConfigHandleImpl globalConfigHandle) { return StreamingEngineComputationConfigFetcher.forTesting( !waitForInitialConfig, globalConfigRefreshPeriod, mockDataflowServiceClient, - ignored -> Executors.newSingleThreadScheduledExecutor(), - onPipelineConfig); + globalConfigHandle, + ignored -> Executors.newSingleThreadScheduledExecutor()); } @After @@ -75,31 +76,33 @@ public void testStart_requiresInitialConfig() throws IOException, InterruptedExc .setJobId("job") .setStreamingConfigTask(new StreamingConfigTask().setMaxWorkItemCommitBytes(10L)); CountDownLatch waitForInitialConfig = new CountDownLatch(1); - Set receivedPipelineConfig = new 
HashSet<>(); + Set receivedPipelineConfig = new HashSet<>(); when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem()) .thenReturn(Optional.of(initialConfig)); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + globalConfigHandle.registerConfigObserver( + config -> { + try { + receivedPipelineConfig.add(config); + waitForInitialConfig.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); streamingEngineConfigFetcher = - createConfigFetcher( - /* waitForInitialConfig= */ true, - 0, - config -> { - try { - receivedPipelineConfig.add(config); - waitForInitialConfig.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }); + createConfigFetcher(/* waitForInitialConfig= */ true, 0, globalConfigHandle); Thread asyncStartConfigLoader = new Thread(streamingEngineConfigFetcher::start); asyncStartConfigLoader.start(); waitForInitialConfig.countDown(); asyncStartConfigLoader.join(); - assertThat(receivedPipelineConfig) - .containsExactly( - StreamingEnginePipelineConfig.builder() - .setMaxWorkItemCommitBytes( - initialConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes()) - .build()); + StreamingGlobalConfig.Builder configBuilder = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxWorkItemCommitBytes( + initialConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes()) + .build()); + assertThat(receivedPipelineConfig).containsExactly(configBuilder.build()); } @Test @@ -117,7 +120,7 @@ public void testStart_startsPeriodicConfigRequests() throws IOException, Interru .setJobId("job") .setStreamingConfigTask(new StreamingConfigTask().setMaxWorkItemCommitBytes(100L)); CountDownLatch numExpectedRefreshes = new CountDownLatch(3); - Set receivedPipelineConfig = new HashSet<>(); + Set receivedPipelineConfig = new HashSet<>(); when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem()) 
.thenReturn(Optional.of(firstConfig)) .thenReturn(Optional.of(secondConfig)) @@ -127,15 +130,15 @@ public void testStart_startsPeriodicConfigRequests() throws IOException, Interru // ConfigFetcher should not do anything with a config that doesn't contain a // StreamingConfigTask. .thenReturn(Optional.of(new WorkItem().setJobId("jobId"))); - + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + globalConfigHandle.registerConfigObserver( + config -> { + receivedPipelineConfig.add(config); + numExpectedRefreshes.countDown(); + }); streamingEngineConfigFetcher = createConfigFetcher( - /* waitForInitialConfig= */ true, - Duration.millis(100).getMillis(), - config -> { - receivedPipelineConfig.add(config); - numExpectedRefreshes.countDown(); - }); + /* waitForInitialConfig= */ true, Duration.millis(100).getMillis(), globalConfigHandle); Thread asyncStartConfigLoader = new Thread(streamingEngineConfigFetcher::start); asyncStartConfigLoader.start(); @@ -143,24 +146,34 @@ public void testStart_startsPeriodicConfigRequests() throws IOException, Interru asyncStartConfigLoader.join(); assertThat(receivedPipelineConfig) .containsExactly( - StreamingEnginePipelineConfig.builder() - .setMaxWorkItemCommitBytes( - firstConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes()) + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxWorkItemCommitBytes( + firstConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes()) + .build()) .build(), - StreamingEnginePipelineConfig.builder() - .setMaxWorkItemCommitBytes( - secondConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes()) + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxWorkItemCommitBytes( + secondConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes()) + .build()) .build(), - StreamingEnginePipelineConfig.builder() - .setMaxWorkItemCommitBytes( - 
thirdConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes()) + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxWorkItemCommitBytes( + thirdConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes()) + .build()) .build()); } @Test public void testGetComputationConfig() throws IOException { + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); streamingEngineConfigFetcher = - createConfigFetcher(/* waitForInitialConfig= */ false, 0, ignored -> {}); + createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle); String computationId = "computationId"; String stageName = "stageName"; String systemName = "systemName"; @@ -193,9 +206,11 @@ public void testGetComputationConfig() throws IOException { @Test public void testGetComputationConfig_noComputationPresent() throws IOException { - Set receivedPipelineConfig = new HashSet<>(); + Set receivedPipelineConfig = new HashSet<>(); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + globalConfigHandle.registerConfigObserver(receivedPipelineConfig::add); streamingEngineConfigFetcher = - createConfigFetcher(/* waitForInitialConfig= */ false, 0, receivedPipelineConfig::add); + createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle); when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString())) .thenReturn(Optional.empty()); Optional pipelineConfig = @@ -206,8 +221,9 @@ public void testGetComputationConfig_noComputationPresent() throws IOException { @Test public void testGetComputationConfig_fetchConfigFromDataflowError() throws IOException { + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); streamingEngineConfigFetcher = - createConfigFetcher(/* waitForInitialConfig= */ false, 0, ignored -> {}); + createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle); RuntimeException e = new 
RuntimeException("something bad happened."); when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString())).thenThrow(e); Throwable fetchConfigError = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java new file mode 100644 index 000000000000..059f60731a7d --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.streaming.config; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.worker.OperationalLimits; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class StreamingGlobalConfigHandleImplTest { + + @Test + public void getConfig() { + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + StreamingGlobalConfig config = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(123) + .setMaxOutputKeyBytes(324) + .setMaxWorkItemCommitBytes(456) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build(); + globalConfigHandle.setConfig(config); + assertEquals(config, globalConfigHandle.getConfig()); + + StreamingGlobalConfig updatedConfig = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(324) + .setMaxOutputKeyBytes(456) + .setMaxWorkItemCommitBytes(123) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost1"))) + .setUserWorkerJobSettings( + 
UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(true) + .build()) + .build(); + globalConfigHandle.setConfig(updatedConfig); + assertEquals(updatedConfig, globalConfigHandle.getConfig()); + } + + @Test + public void registerConfigObserver_configSetAfterRegisteringCallback() + throws InterruptedException { + CountDownLatch latch = new CountDownLatch(2); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + StreamingGlobalConfig configToSet = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(123) + .setMaxOutputKeyBytes(324) + .setMaxWorkItemCommitBytes(456) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build(); + AtomicReference configFromCallback1 = new AtomicReference<>(); + AtomicReference configFromCallback2 = new AtomicReference<>(); + globalConfigHandle.registerConfigObserver( + config -> { + configFromCallback1.set(config); + latch.countDown(); + }); + globalConfigHandle.registerConfigObserver( + config -> { + configFromCallback2.set(config); + latch.countDown(); + }); + globalConfigHandle.setConfig(configToSet); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(configFromCallback1.get(), globalConfigHandle.getConfig()); + assertEquals(configFromCallback2.get(), globalConfigHandle.getConfig()); + } + + @Test + public void registerConfigObserver_configSetBeforeRegisteringCallback() + throws InterruptedException { + CountDownLatch latch = new CountDownLatch(2); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + StreamingGlobalConfig configToSet = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(123) + 
.setMaxOutputKeyBytes(324) + .setMaxWorkItemCommitBytes(456) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build(); + AtomicReference configFromCallback1 = new AtomicReference<>(); + AtomicReference configFromCallback2 = new AtomicReference<>(); + globalConfigHandle.setConfig(configToSet); + globalConfigHandle.registerConfigObserver( + config -> { + configFromCallback1.set(config); + latch.countDown(); + }); + globalConfigHandle.registerConfigObserver( + config -> { + configFromCallback2.set(config); + latch.countDown(); + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(configFromCallback1.get(), globalConfigHandle.getConfig()); + assertEquals(configFromCallback2.get(), globalConfigHandle.getConfig()); + } + + @Test + public void registerConfigObserver_configSetBeforeRegisteringCallback_callbackThrowsException() + throws InterruptedException { + CountDownLatch latch = new CountDownLatch(2); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + StreamingGlobalConfig configToSet = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(123) + .setMaxOutputKeyBytes(324) + .setMaxWorkItemCommitBytes(456) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build(); + AtomicReference configFromCallback = new AtomicReference<>(); + globalConfigHandle.setConfig(configToSet); + globalConfigHandle.registerConfigObserver( + config -> { + latch.countDown(); + throw new RuntimeException(); + }); + globalConfigHandle.registerConfigObserver( + config -> { + configFromCallback.set(config); + 
latch.countDown(); + }); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(configFromCallback.get(), configToSet); + } + + @Test + public void registerConfigObserver_configSetAfterRegisteringCallback_callbackThrowsException() + throws InterruptedException { + CountDownLatch latch = new CountDownLatch(2); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + StreamingGlobalConfig configToSet = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(123) + .setMaxOutputKeyBytes(324) + .setMaxWorkItemCommitBytes(456) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build(); + AtomicReference configFromCallback = new AtomicReference<>(); + globalConfigHandle.registerConfigObserver( + config -> { + latch.countDown(); + throw new RuntimeException(); + }); + globalConfigHandle.registerConfigObserver( + config -> { + configFromCallback.set(config); + latch.countDown(); + }); + globalConfigHandle.setConfig(configToSet); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(configFromCallback.get(), configToSet); + } + + @Test + public void registerConfigObserver_shouldNotCallCallbackForIfConfigRemainsSame() + throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger callbackCount = new AtomicInteger(0); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + Supplier configToSet = + () -> + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(123) + .setMaxOutputKeyBytes(324) + .setMaxWorkItemCommitBytes(456) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost"))) + .setUserWorkerJobSettings( + 
UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build(); + globalConfigHandle.registerConfigObserver( + config -> { + callbackCount.incrementAndGet(); + latch.countDown(); + }); + globalConfigHandle.setConfig(configToSet.get()); + // call setter again with same config + globalConfigHandle.setConfig(configToSet.get()); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + assertEquals(1, callbackCount.get()); + } + + @Test + public void registerConfigObserver_updateConfigWhenCallbackIsRunning() + throws InterruptedException { + CountDownLatch latch = new CountDownLatch(2); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + StreamingGlobalConfig initialConfig = + StreamingGlobalConfig.builder() + .setOperationalLimits(OperationalLimits.builder().setMaxOutputValueBytes(4569).build()) + .build(); + StreamingGlobalConfig updatedConfig = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(123) + .setMaxOutputKeyBytes(324) + .setMaxWorkItemCommitBytes(456) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(false) + .build()) + .build(); + CopyOnWriteArrayList configsFromCallback = new CopyOnWriteArrayList<>(); + globalConfigHandle.registerConfigObserver( + config -> { + configsFromCallback.add(config); + if (config.equals(initialConfig)) { + globalConfigHandle.setConfig(updatedConfig); + } + latch.countDown(); + }); + globalConfigHandle.setConfig(initialConfig); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertEquals(configsFromCallback.get(0), initialConfig); + assertEquals(configsFromCallback.get(1), updatedConfig); + } +} diff --git 
// Settings to control runtime behavior of the Java runner v1 user worker.
message UserWorkerRunnerV1Settings {
  // If true, use separate channels for each windmill RPC.
  // Declared default is true, so an unset field reads as true.
  optional bool use_windmill_isolated_channels = 1 [default = true];

  // If true, use a separate streaming RPC for windmill heartbeats and state
  // reads. Declared default is true, so an unset field reads as true.
  optional bool use_separate_windmill_heartbeat_streams = 2 [default = true];
}