From 75032b7f8ed1868fc83077dfb1efd7204ba4fa5b Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Sun, 20 Oct 2024 23:34:48 +0530 Subject: [PATCH] fix the existing repo lookup --- .../coordination/JoinTaskExecutor.java | 43 ++++++++++++------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 1d5ffd5fbd940..db8e76e751463 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RerouteService; @@ -57,6 +58,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -185,19 +187,29 @@ public ClusterTasksResult execute(ClusterState currentState, List jo // for every set of node join task which we can optimize to not compute if cluster state already has // repository information. Optional remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); - DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get()); - RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( - dn, - currentState.getMetadata().custom(RepositoriesMetadata.TYPE) - ); - Optional remotePublicationDN = currentNodes.getNodes() .values() .stream() .filter(DiscoveryNode::isRemoteStatePublicationEnabled) .findFirst(); + RepositoriesMetadata existingrepositoriesMetadata = currentState.getMetadata().custom(RepositoriesMetadata.TYPE); + Map repositories = new LinkedHashMap<>(); + if (existingrepositoriesMetadata != null) { + existingrepositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); + } + if (remoteDN.isPresent()) { + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + remoteDN.get(), + existingrepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); + } if (remotePublicationDN.isPresent()) { - repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(remotePublicationDN.get(), repositoriesMetadata); + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + remotePublicationDN.get(), + existingrepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); } assert nodesBuilder.isLocalNodeElectedClusterManager(); @@ -228,17 +240,16 @@ public ClusterTasksResult execute(ClusterState currentState, List jo ensureNodeCommissioned(node, currentState.metadata()); nodesBuilder.add(node); - if (remoteDN.isEmpty() && node.isRemoteStoreNode()) { + if ((remoteDN.isEmpty() && node.isRemoteStoreNode()) + || (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled())) { // This is hit only on cases where we encounter first remote node logger.info("Updating system repository now for remote store"); - repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(node, repositoriesMetadata); - } - if (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled()) { - // This is hit only on cases where we encounter first remote publication node - logger.info("Updating system repository now for remote publication store"); - repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(node, repositoriesMetadata); + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + node, + existingrepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); } - nodesChanged = true; minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); @@ -252,7 +263,7 @@ public ClusterTasksResult execute(ClusterState currentState, List jo } results.success(joinTask); } - + RepositoriesMetadata repositoriesMetadata = new RepositoriesMetadata(new ArrayList<>(repositories.values())); if (nodesChanged) { rerouteService.reroute( "post-join reroute",