Skip to content

Commit

Permalink
Skip remote-repositories validations for node-joins when Repositories…
Browse files Browse the repository at this point in the history
…Service is not in sync with cluster-state (opensearch-project#16763)

* Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state

Signed-off-by: Pranshu Shukla <[email protected]>
Signed-off-by: Mingshi Liu <[email protected]>
  • Loading branch information
Pranshu-S authored and mingshl committed Dec 16, 2024
1 parent ea88a2a commit 4509065
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered Caching] Fix bug in cache stats API ([#16560](https://github.com/opensearch-project/OpenSearch/pull/16560))
- Bound the size of cache in deprecation logger ([16702](https://github.com/opensearch-project/OpenSearch/issues/16702))
- Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644))
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,37 @@
package org.opensearch.discovery;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.coordination.JoinHelper;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.coordination.PublicationTransportHandler;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Randomness;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.disruption.ServiceDisruptionScheme;
import org.opensearch.test.disruption.SlowClusterStateProcessing;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
import org.junit.Assert;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
Expand Down Expand Up @@ -250,4 +264,142 @@ public void testNodeNotReachableFromClusterManager() throws Exception {
ensureStableCluster(3);
}

/**
* Tests the scenario where-in a cluster-state containing new repository meta-data as part of a node-join from a
* repository-configured node fails on a commit stag and has a master switch. This would lead to master nodes
* doing another round of node-joins with the new cluster-state as the previous attempt had a successful publish.
*/
public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception {
final String remoteStateRepoName = "remote-state-repo";
final String remoteRoutingTableRepoName = "routing-table-repo";

Settings remotePublicationSettings = buildRemotePublicationNodeAttributes(
remoteStateRepoName,
ReloadableFsRepository.TYPE,
remoteRoutingTableRepoName,
ReloadableFsRepository.TYPE
);
internalCluster().startClusterManagerOnlyNodes(3);
internalCluster().startDataOnlyNodes(3);

String clusterManagerNode = internalCluster().getClusterManagerName();
List<String> nonClusterManagerNodes = Arrays.stream(internalCluster().getNodeNames())
.filter(node -> !node.equals(clusterManagerNode))
.collect(Collectors.toList());

ensureStableCluster(6);

MockTransportService clusterManagerTransportService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
clusterManagerNode
);
logger.info("Blocking Cluster Manager Commit Request on all nodes");
// This is to allow the new node to have commit failures on the nodes in the send path itself. This will lead to the
// nodes have a successful publish operation but failed commit operation. This will come into play once the new node joins
nonClusterManagerNodes.forEach(node -> {
TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node);
clusterManagerTransportService.addSendBehavior(targetTransportService, (connection, requestId, action, request, options) -> {
if (action.equals(PublicationTransportHandler.COMMIT_STATE_ACTION_NAME)) {
logger.info("--> preventing {} request", PublicationTransportHandler.COMMIT_STATE_ACTION_NAME);
throw new FailedToCommitClusterStateException("Blocking Commit");
}
connection.sendRequest(requestId, action, request, options);
});
});

logger.info("Starting Node with remote publication settings");
// Start a node with remote-publication repositories configured. This will lead to the active cluster-manager create
// a new cluster-state event with the new node-join along with new repositories setup in the cluster meta-data.
internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE);

// Checking if publish succeeded in the nodes before shutting down the blocked cluster-manager
assertBusy(() -> {
String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, randomNode);

ClusterState state = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL).getLastAcceptedState();
RepositoriesMetadata repositoriesMetadata = state.metadata().custom(RepositoriesMetadata.TYPE);
Boolean isRemoteStateRepoConfigured = Boolean.FALSE;
Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE;

assertNotNull(repositoriesMetadata);
assertNotNull(repositoriesMetadata.repositories());

for (RepositoryMetadata repo : repositoriesMetadata.repositories()) {
if (repo.name().equals(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
} else if (repo.name().equals(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}
}
// Asserting that the metadata is present in the persisted cluster-state
assertTrue(isRemoteStateRepoConfigured);
assertTrue(isRemoteRoutingTableRepoConfigured);

RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);

isRemoteStateRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteStateRepoName);
isRemoteRoutingTableRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteRoutingTableRepoName);

// Asserting that the metadata is not present in the repository service.
Assert.assertFalse(isRemoteStateRepoConfigured);
Assert.assertFalse(isRemoteRoutingTableRepoConfigured);
});

logger.info("Stopping current Cluster Manager");
// We stop the current cluster-manager whose outbound paths were blocked. This is to force a new election onto nodes
// we had the new cluster-state published but not commited.
internalCluster().stopCurrentClusterManagerNode();

// We expect that the repositories validations are skipped in this case and node-joins succeeds as expected. The
// repositories validations are skipped because even though the cluster-state is updated in the persisted registry,
// the repository service will not be updated as the commit attempt failed.
ensureStableCluster(6);

String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));

// Checking if the final cluster-state is updated.
RepositoriesMetadata repositoriesMetadata = internalCluster().getInstance(ClusterService.class, randomNode)
.state()
.metadata()
.custom(RepositoriesMetadata.TYPE);

Boolean isRemoteStateRepoConfigured = Boolean.FALSE;
Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE;

for (RepositoryMetadata repo : repositoriesMetadata.repositories()) {
if (repo.name().equals(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
} else if (repo.name().equals(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}
}

Assert.assertTrue("RemoteState Repo is not set in RepositoriesMetadata", isRemoteStateRepoConfigured);
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoriesMetadata", isRemoteRoutingTableRepoConfigured);

RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);

isRemoteStateRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteStateRepoName);
isRemoteRoutingTableRepoConfigured = isRepoPresentInRepositoryService(repositoriesService, remoteRoutingTableRepoName);

Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured);
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoryService", isRemoteRoutingTableRepoConfigured);

logger.info("Stopping current Cluster Manager");
}

private Boolean isRepoPresentInRepositoryService(RepositoriesService repositoriesService, String repoName) {
try {
Repository remoteStateRepo = repositoriesService.repository(repoName);
if (Objects.nonNull(remoteStateRepo)) {
return Boolean.TRUE;
}
} catch (RepositoryMissingException e) {
return Boolean.FALSE;
}

return Boolean.FALSE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
Expand Down Expand Up @@ -183,6 +184,20 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode
boolean repositoryAlreadyPresent = false;
for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) {
if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) {
try {
// This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded
// but the commit operation failed, the cluster-state may have the repository metadata which is not applied
// into the repository service. This may lead to assertion failures down the line.
repositoriesService.get().repository(newRepositoryMetadata.name());
} catch (RepositoryMissingException e) {
logger.warn(
"Skipping repositories metadata checks: Remote repository [{}] is in the cluster state but not present "
+ "in the repository service.",
newRepositoryMetadata.name()
);
break;
}

try {
// This will help in handling two scenarios -
// 1. When a fresh cluster is formed and a node tries to join the cluster, the repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -904,6 +905,12 @@ public void ensureValidSystemRepositoryUpdate(RepositoryMetadata newRepositoryMe
Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings();
Settings currentRepositoryMetadataSettings = currentRepositoryMetadata.settings();

assert Objects.nonNull(repository) : String.format(
Locale.ROOT,
"repository [%s] not present in RepositoryService",
currentRepositoryMetadata.name()
);

List<String> restrictedSettings = repository.getRestrictedSystemRepositorySettings()
.stream()
.map(setting -> setting.getKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
Expand Down Expand Up @@ -1378,6 +1379,72 @@ public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() {
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
}

public void testUpdatesClusterStateWithRepositoryMetadataNotInSync() throws Exception {
Map<String, String> newNodeAttributes = new HashMap<>();
newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));

final AllocationService allocationService = mock(AllocationService.class);
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
RepositoriesService repositoriesService = mock(RepositoriesService.class);
when(repositoriesService.repository(any())).thenThrow(RepositoryMissingException.class);
final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(new SetOnce<>(repositoriesService)::get, null);

final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(
Settings.EMPTY,
allocationService,
logger,
rerouteService,
remoteStoreNodeService
);

final DiscoveryNode clusterManagerNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
newNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final RepositoryMetadata clusterStateRepo = buildRepositoryMetadata(clusterManagerNode, CLUSTER_STATE_REPO);
final RepositoryMetadata routingTableRepo = buildRepositoryMetadata(clusterManagerNode, ROUTING_TABLE_REPO);
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>() {
{
add(clusterStateRepo);
add(routingTableRepo);
}
};

final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(
DiscoveryNodes.builder()
.add(clusterManagerNode)
.localNodeId(clusterManagerNode.getId())
.clusterManagerNodeId(clusterManagerNode.getId())
)
.metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata)))
.build();

final DiscoveryNode joiningNode = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
newNodeAttributes,
DiscoveryNodeRole.BUILT_IN_ROLES,
Version.CURRENT
);

final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result = joinTaskExecutor.execute(
clusterState,
List.of(new JoinTaskExecutor.Task(joiningNode, "test"))
);
assertThat(result.executionResults.entrySet(), hasSize(1));
final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
assertTrue(taskResult.isSuccess());
validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode);

}

private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories)
throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2322,10 +2322,24 @@ public List<String> startNodes(int numOfNodes, Settings settings) {
return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public List<String> startNodes(int numOfNodes, Settings settings, Boolean waitForNodeJoin) {
return startNodes(waitForNodeJoin, Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public synchronized List<String> startNodes(Settings... extraSettings) {
return startNodes(false, extraSettings);
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public synchronized List<String> startNodes(Boolean waitForNodeJoin, Settings... extraSettings) {
final int newClusterManagerCount = Math.toIntExact(Stream.of(extraSettings).filter(DiscoveryNode::isClusterManagerNode).count());
final int defaultMinClusterManagerNodes;
if (autoManageClusterManagerNodes) {
Expand Down Expand Up @@ -2377,7 +2391,7 @@ public synchronized List<String> startNodes(Settings... extraSettings) {
nodes.add(nodeAndClient);
}
startAndPublishNodesAndClients(nodes);
if (autoManageClusterManagerNodes) {
if (autoManageClusterManagerNodes && !waitForNodeJoin) {
validateClusterFormed();
}
return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList());
Expand Down Expand Up @@ -2422,6 +2436,10 @@ public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build());
}

public List<String> startDataOnlyNodes(int numNodes, Settings settings, Boolean ignoreNodeJoin) {
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build(), ignoreNodeJoin);
}

public List<String> startSearchOnlyNodes(int numNodes) {
return startSearchOnlyNodes(numNodes, Settings.EMPTY);
}
Expand Down
Loading

0 comments on commit 4509065

Please sign in to comment.