Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Sep 18, 2024
1 parent ce1f07e commit c32695e
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,10 +504,16 @@ static StreamingDataflowWorker forTesting(
executorSupplier)
: new StreamingApplianceComputationConfigFetcher(
windmillServer::getConfig, globalConfigHandle);
StreamingGlobalConfig config = configFetcher.getGlobalConfigHandle().getConfig();
if (!config.windmillServiceEndpoints().isEmpty()) {
windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints());
}
configFetcher
.getGlobalConfigHandle()
.registerConfigObserver(
config -> {
if (config.windmillServiceEndpoints().isEmpty()) {
LOG.warn("Received empty windmill service endpoints");
return;
}
windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints());
});
ConcurrentMap<String, String> stateNameMap =
new ConcurrentHashMap<>(prePopulatedStateNameMappings);
ComputationStateCache computationStateCache =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class StreamingGlobalConfigHandleImpl implements StreamingGlobalConfigHan
private final CopyOnWriteArrayList<ConfigCallback> configCallbacks = new CopyOnWriteArrayList<>();

@Override
@Nonnull
public StreamingGlobalConfig getConfig() {
Preconditions.checkState(
streamingEngineConfig.get() != null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private void addStreamingEngineStatusPages() {
"jobSettings",
"User Worker Job Settings",
writer -> {
StreamingGlobalConfig config = globalConfig.get();
@Nullable StreamingGlobalConfig config = globalConfig.get();
if (config == null) {
writer.println("Job Settings not loaded.");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public boolean hasInitializedEndpoints() {

public void onJobConfig(StreamingGlobalConfig config) {
if (config.windmillServiceEndpoints().isEmpty()) {
LOG.warn("Dispatcher client received empty windmill service endpoints from global config");
return;
}
consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public Long get() {
@Rule public TestRule restoreMDC = new RestoreDataflowLoggingMDC();
@Rule public ErrorCollector errorCollector = new ErrorCollector();
WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class);
StreamingGlobalConfigHandleImpl mockglobalConfigHandle =
StreamingGlobalConfigHandleImpl mockGlobalConfigHandle =
mock(StreamingGlobalConfigHandleImpl.class);
HotKeyLogger hotKeyLogger = mock(HotKeyLogger.class);

Expand Down Expand Up @@ -840,7 +840,7 @@ private DataflowWorkerHarnessOptions createTestingPipelineOptions(String... args

private StreamingDataflowWorker makeWorker(
StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) {
when(mockglobalConfigHandle.getConfig())
when(mockGlobalConfigHandle.getConfig())
.thenReturn(streamingDataflowWorkerTestParams.streamingGlobalConfig());
StreamingDataflowWorker worker =
StreamingDataflowWorker.forTesting(
Expand All @@ -855,7 +855,7 @@ private StreamingDataflowWorker makeWorker(
hotKeyLogger,
streamingDataflowWorkerTestParams.clock(),
streamingDataflowWorkerTestParams.executorSupplier(),
mockglobalConfigHandle,
mockGlobalConfigHandle,
streamingDataflowWorkerTestParams.localRetryTimeoutMs());
this.computationStateCache = worker.getComputationStateCache();
return worker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@ public void getConfig() {
.build();
globalConfigHandle.setConfig(config);
assertEquals(config, globalConfigHandle.getConfig());

StreamingGlobalConfig updatedConfig =
StreamingGlobalConfig.builder()
.setOperationalLimits(
OperationalLimits.builder()
.setMaxOutputValueBytes(324)
.setMaxOutputKeyBytes(456)
.setMaxWorkItemCommitBytes(123)
.build())
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost1")))
.setUserWorkerJobSettings(
UserWorkerRunnerV1Settings.newBuilder()
.setUseSeparateWindmillHeartbeatStreams(true)
.build())
.build();
globalConfigHandle.setConfig(updatedConfig);
assertEquals(updatedConfig, globalConfigHandle.getConfig());
}

@Test
Expand Down Expand Up @@ -263,7 +280,7 @@ public void registerConfigObserver_updateConfigWhenCallbackIsRunning()
globalConfigHandle.registerConfigObserver(
config -> {
configsFromCallback.add(config);
if (globalConfigHandle.getConfig().equals(config)) {
if (config.equals(initialConfig)) {
globalConfigHandle.setConfig(updatedConfig);
}
latch.countDown();
Expand Down

0 comments on commit c32695e

Please sign in to comment.