diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index bc9fd8228c70..0dedd4f34fd6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -504,10 +504,16 @@ static StreamingDataflowWorker forTesting( executorSupplier) : new StreamingApplianceComputationConfigFetcher( windmillServer::getConfig, globalConfigHandle); - StreamingGlobalConfig config = configFetcher.getGlobalConfigHandle().getConfig(); - if (!config.windmillServiceEndpoints().isEmpty()) { - windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints()); - } + configFetcher + .getGlobalConfigHandle() + .registerConfigObserver( + config -> { + if (config.windmillServiceEndpoints().isEmpty()) { + LOG.warn("Received empty windmill service endpoints"); + return; + } + windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints()); + }); ConcurrentMap stateNameMap = new ConcurrentHashMap<>(prePopulatedStateNameMappings); ComputationStateCache computationStateCache = diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java index aa92ea319fe8..9ed5c9fcf396 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java @@ -40,6 +40,7 @@ public class StreamingGlobalConfigHandleImpl implements StreamingGlobalConfigHan private final CopyOnWriteArrayList configCallbacks = new CopyOnWriteArrayList<>(); @Override + @Nonnull public StreamingGlobalConfig getConfig() { Preconditions.checkState( streamingEngineConfig.get() != null, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java index e2fa2245a3f1..6981312eff1d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java @@ -161,7 +161,7 @@ private void addStreamingEngineStatusPages() { "jobSettings", "User Worker Job Settings", writer -> { - StreamingGlobalConfig config = globalConfig.get(); + @Nullable StreamingGlobalConfig config = globalConfig.get(); if (config == null) { writer.println("Job Settings not loaded."); return; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index 4a264b84db9c..412608ea3981 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -150,6 +150,7 @@ public boolean hasInitializedEndpoints() { public void onJobConfig(StreamingGlobalConfig config) { if (config.windmillServiceEndpoints().isEmpty()) { + LOG.warn("Dispatcher client received empty windmill service endpoints from global config"); return; } consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 95d3a9f7ce55..dadf02171235 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -277,7 +277,7 @@ public Long get() { @Rule public TestRule restoreMDC = new RestoreDataflowLoggingMDC(); @Rule public ErrorCollector errorCollector = new ErrorCollector(); WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class); - StreamingGlobalConfigHandleImpl mockglobalConfigHandle = + StreamingGlobalConfigHandleImpl mockGlobalConfigHandle = mock(StreamingGlobalConfigHandleImpl.class); HotKeyLogger hotKeyLogger = mock(HotKeyLogger.class); @@ -840,7 +840,7 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args private StreamingDataflowWorker makeWorker( StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) { - when(mockglobalConfigHandle.getConfig()) + when(mockGlobalConfigHandle.getConfig()) .thenReturn(streamingDataflowWorkerTestParams.streamingGlobalConfig()); StreamingDataflowWorker worker = StreamingDataflowWorker.forTesting( @@ -855,7 +855,7 @@ private StreamingDataflowWorker makeWorker( hotKeyLogger, streamingDataflowWorkerTestParams.clock(), streamingDataflowWorkerTestParams.executorSupplier(), - mockglobalConfigHandle, + mockGlobalConfigHandle, streamingDataflowWorkerTestParams.localRetryTimeoutMs()); this.computationStateCache = worker.getComputationStateCache(); return worker; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java index 6a8ec6e4b1ee..059f60731a7d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java @@ -56,6 +56,23 @@ public void getConfig() { .build(); globalConfigHandle.setConfig(config); assertEquals(config, globalConfigHandle.getConfig()); + + StreamingGlobalConfig updatedConfig = + StreamingGlobalConfig.builder() + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(324) + .setMaxOutputKeyBytes(456) + .setMaxWorkItemCommitBytes(123) + .build()) + .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost1"))) + .setUserWorkerJobSettings( + UserWorkerRunnerV1Settings.newBuilder() + .setUseSeparateWindmillHeartbeatStreams(true) + .build()) + .build(); + globalConfigHandle.setConfig(updatedConfig); + assertEquals(updatedConfig, globalConfigHandle.getConfig()); } @Test @@ -263,7 +280,7 @@ public void registerConfigObserver_updateConfigWhenCallbackIsRunning() globalConfigHandle.registerConfigObserver( config -> { configsFromCallback.add(config); - if (globalConfigHandle.getConfig().equals(config)) { + if (config.equals(initialConfig)) { globalConfigHandle.setConfig(updatedConfig); } latch.countDown();