Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state #16763

Merged
merged 12 commits into from
Dec 10, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,33 @@
package org.opensearch.discovery;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.coordination.JoinHelper;
import org.opensearch.cluster.coordination.PublicationTransportHandler;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Randomness;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.disruption.ServiceDisruptionScheme;
import org.opensearch.test.disruption.SlowClusterStateProcessing;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
import org.junit.Assert;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
Expand Down Expand Up @@ -250,4 +260,87 @@ public void testNodeNotReachableFromClusterManager() throws Exception {
ensureStableCluster(3);
}

/**
* Test Repositories Configured Node Join Commit failures.
*/
public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
final String remoteStateRepoName = "remote-state-repo";
final String remoteRoutingTableRepoName = "routing-table-repo";

Settings remotePublicationSettings = buildRemotePublicationNodeAttributes(
remoteStateRepoName,
ReloadableFsRepository.TYPE,
remoteRoutingTableRepoName,
ReloadableFsRepository.TYPE
);
internalCluster().startClusterManagerOnlyNodes(3);
internalCluster().startDataOnlyNodes(3);

String clusterManagerNode = internalCluster().getClusterManagerName();
List<String> nonClusterManagerNodes = Arrays.stream(internalCluster().getNodeNames())
.filter(node -> !node.equals(clusterManagerNode))
.collect(Collectors.toList());

ensureStableCluster(6);

MockTransportService clusterManagerTransportService = (MockTransportService) internalCluster().getInstance(
TransportService.class,
clusterManagerNode
);
logger.info("Blocking Cluster Manager Commit Request on all nodes");
nonClusterManagerNodes.forEach(node -> {
TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node);
clusterManagerTransportService.addOpenSearchFailureException(
targetTransportService,
new FailedToCommitClusterStateException("Blocking Commit"),
PublicationTransportHandler.COMMIT_STATE_ACTION_NAME
);
});

logger.info("Starting Node with remote publication settings");
internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE);

Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Stopping current Cluster Manager");
internalCluster().stopCurrentClusterManagerNode();
ensureStableCluster(6);

String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));

RepositoriesMetadata repositoriesMetadata = internalCluster().getInstance(ClusterService.class, randomNode)
.state()
.metadata()
.custom(RepositoriesMetadata.TYPE);

Boolean isRemoteStateRepoConfigured = Boolean.FALSE;
Boolean isRemoteRoutingTableRepoConfigured = Boolean.FALSE;

for (RepositoryMetadata repo : repositoriesMetadata.repositories()) {
if (repo.name().equals(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
} else if (repo.name().equals(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}
}

Assert.assertTrue("RemoteState Repo is not set in RepositoriesMetadata", isRemoteStateRepoConfigured);
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoriesMetadata", isRemoteRoutingTableRepoConfigured);

isRemoteStateRepoConfigured = Boolean.FALSE;
isRemoteRoutingTableRepoConfigured = Boolean.FALSE;

RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);

if (repositoriesService.isRepositoryPresent(remoteStateRepoName)) {
isRemoteStateRepoConfigured = Boolean.TRUE;
}
if (repositoriesService.isRepositoryPresent(remoteRoutingTableRepoName)) {
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
}

Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured);
Assert.assertTrue("RemoteRoutingTable Repo is not set in RepositoryService", isRemoteRoutingTableRepoConfigured);

logger.info("Stopping current Cluster Manager");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,17 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode
boolean repositoryAlreadyPresent = false;
for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) {
if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) {
// Handles the case where, during a previous node-join attempt, the publish operation succeeded but
// the commit operation failed: the cluster-state may then contain repository metadata that was never
// applied to the repository service. This may lead to assertion failures down the line.
if (!repositoriesService.get().isRepositoryPresent(newRepositoryMetadata.name())) {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
logger.warn(
"remote repository [{}] in cluster-state but repository-service but not present "
+ "in repository-service, skipping checks",
newRepositoryMetadata.name()
);
break;
}
try {
// This will help in handling two scenarios -
// 1. When a fresh cluster is formed and a node tries to join the cluster, the repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -578,6 +579,10 @@
throw new RepositoryMissingException(repositoryName);
}

public Boolean isRepositoryPresent(final String repositoryName) {
return Objects.nonNull(repositories.get(repositoryName));

Check warning on line 583 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L583

Added line #L583 was not covered by tests
}

public List<RepositoryStatsSnapshot> repositoriesStats() {
List<RepositoryStatsSnapshot> activeRepoStats = getRepositoryStatsForActiveRepositories();
return activeRepoStats;
Expand Down Expand Up @@ -904,6 +909,12 @@
Settings newRepositoryMetadataSettings = newRepositoryMetadata.settings();
Settings currentRepositoryMetadataSettings = currentRepositoryMetadata.settings();

assert Objects.nonNull(repository) : String.format(
Locale.ROOT,
"repository [%s] not present in RepositoryService",
currentRepositoryMetadata.name()

Check warning on line 915 in server/src/main/java/org/opensearch/repositories/RepositoriesService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/RepositoriesService.java#L915

Added line #L915 was not covered by tests
);

List<String> restrictedSettings = repository.getRestrictedSystemRepositorySettings()
.stream()
.map(setting -> setting.getKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2322,10 +2322,24 @@ public List<String> startNodes(int numOfNodes, Settings settings) {
return startNodes(Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public List<String> startNodes(int numOfNodes, Settings settings, Boolean ignoreNodeJoin) {
Pranshu-S marked this conversation as resolved.
Show resolved Hide resolved
return startNodes(ignoreNodeJoin, Collections.nCopies(numOfNodes, settings).toArray(new Settings[0]));
}

/**
 * Starts one node per entry of {@code extraSettings} and returns their names. Delegates to
 * {@link #startNodes(Boolean, Settings...)} with {@code ignoreNodeJoin} set to {@code false}.
 *
 * @param extraSettings settings for the nodes to start, one entry per node
 * @return the names of the started nodes
 */
public synchronized List<String> startNodes(Settings... extraSettings) {
    final Boolean ignoreNodeJoin = Boolean.FALSE;
    return startNodes(ignoreNodeJoin, extraSettings);
}

/**
* Starts multiple nodes with the given settings and returns their names
*/
public synchronized List<String> startNodes(Boolean ignoreNodeJoin, Settings... extraSettings) {
final int newClusterManagerCount = Math.toIntExact(Stream.of(extraSettings).filter(DiscoveryNode::isClusterManagerNode).count());
final int defaultMinClusterManagerNodes;
if (autoManageClusterManagerNodes) {
Expand Down Expand Up @@ -2377,7 +2391,7 @@ public synchronized List<String> startNodes(Settings... extraSettings) {
nodes.add(nodeAndClient);
}
startAndPublishNodesAndClients(nodes);
if (autoManageClusterManagerNodes) {
if (autoManageClusterManagerNodes && !ignoreNodeJoin) {
validateClusterFormed();
}
return nodes.stream().map(NodeAndClient::getName).collect(Collectors.toList());
Expand Down Expand Up @@ -2422,6 +2436,10 @@ public List<String> startDataOnlyNodes(int numNodes, Settings settings) {
return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build());
}

/**
 * Starts {@code numNodes} nodes whose settings are {@code settings} restricted to the data role via
 * {@code onlyRole}, and returns their names.
 *
 * @param numNodes       number of nodes to start
 * @param settings       base settings for the nodes
 * @param ignoreNodeJoin passed through unchanged to {@code startNodes(int, Settings, Boolean)}
 * @return the names of the started nodes
 */
public List<String> startDataOnlyNodes(int numNodes, Settings settings, Boolean ignoreNodeJoin) {
    final Settings dataOnlySettings = Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build();
    return startNodes(numNodes, dataOnlySettings, ignoreNodeJoin);
}

/**
 * Starts {@code numNodes} search-only nodes with no additional settings ({@code Settings.EMPTY}).
 *
 * @param numNodes number of nodes to start
 * @return whatever the two-argument {@code startSearchOnlyNodes} overload returns
 */
public List<String> startSearchOnlyNodes(int numNodes) {
    final Settings noExtraSettings = Settings.EMPTY;
    return startSearchOnlyNodes(numNodes, noExtraSettings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import reactor.util.annotation.NonNull;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
Expand Down Expand Up @@ -2915,6 +2917,43 @@ protected static Settings buildRemoteStoreNodeAttributes(
return settings.build();
}

/**
 * Builds the {@code node.attr.*} settings that describe a remote cluster-state repository and a remote
 * routing-table repository.
 * <p>
 * For each repository the settings contain its name, its type, and a {@code location} setting pointing at a
 * fresh path obtained from {@code randomRepoPath()}.
 *
 * @param remoteStateRepoName  name of the remote cluster-state repository
 * @param remoteStateRepoType  type of the remote cluster-state repository
 * @param routingTableRepoName name of the remote routing-table repository
 * @param routingTableRepoType type of the remote routing-table repository
 * @return settings carrying the node attributes for both repositories
 */
protected Settings buildRemotePublicationNodeAttributes(
    @NonNull String remoteStateRepoName,
    @NonNull String remoteStateRepoType,
    @NonNull String routingTableRepoName,
    @NonNull String routingTableRepoType
) {
    final String nodeAttributePrefix = "node.attr.";

    // Setting keys are machine-readable identifiers, so they are formatted with Locale.ROOT rather than the
    // JVM default locale to keep them independent of the environment the test runs in.
    String remoteStateRepositoryTypeAttributeKey = String.format(
        Locale.ROOT,
        nodeAttributePrefix + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
        remoteStateRepoName
    );
    String routingTableRepositoryTypeAttributeKey = String.format(
        Locale.ROOT,
        nodeAttributePrefix + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
        routingTableRepoName
    );
    String remoteStateRepositorySettingsAttributeKeyPrefix = String.format(
        Locale.ROOT,
        nodeAttributePrefix + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
        remoteStateRepoName
    );
    String routingTableRepositorySettingsAttributeKeyPrefix = String.format(
        Locale.ROOT,
        nodeAttributePrefix + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
        routingTableRepoName
    );

    return Settings.builder()
        .put(nodeAttributePrefix + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, remoteStateRepoName)
        .put(nodeAttributePrefix + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
        .put(remoteStateRepositoryTypeAttributeKey, remoteStateRepoType)
        .put(routingTableRepositoryTypeAttributeKey, routingTableRepoType)
        .put(remoteStateRepositorySettingsAttributeKeyPrefix + "location", randomRepoPath().toAbsolutePath())
        .put(routingTableRepositorySettingsAttributeKeyPrefix + "location", randomRepoPath().toAbsolutePath())
        .build();
}

public static String resolvePath(IndexId indexId, String shardId) {
PathType pathType = PathType.fromCode(indexId.getShardPathType());
RemoteStorePathStrategy.SnapshotShardPathInput shardPathInput = new RemoteStorePathStrategy.SnapshotShardPathInput.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterModule;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -376,6 +377,47 @@ public void addFailToSendNoConnectRule(TransportAddress transportAddress, final
});
}

/**
 * Varargs convenience overload: collects {@code blockedActions} into a set and delegates to the
 * {@code Set}-based overload, so that sends of those actions to {@code transportService} throw
 * {@code exception}.
 *
 * @param transportService the target transport service whose addresses the rule applies to
 * @param exception        the exception thrown for matching actions
 * @param blockedActions   names of the actions to fail
 */
public void addOpenSearchFailureException(
    TransportService transportService,
    final OpenSearchException exception,
    final String... blockedActions
) {
    final Set<String> actionsToFail = new HashSet<>(Arrays.asList(blockedActions));
    addOpenSearchFailureException(transportService, exception, actionsToFail);
}

/**
 * Installs the failure rule for every transport address extracted from {@code transportService}, delegating
 * to the address-based overload once per address.
 *
 * @param transportService the target transport service whose addresses the rule applies to
 * @param exception        the exception thrown for matching actions
 * @param blockedActions   names of the actions to fail
 */
public void addOpenSearchFailureException(
    TransportService transportService,
    OpenSearchException exception,
    final Set<String> blockedActions
) {
    for (TransportAddress targetAddress : extractTransportAddresses(transportService)) {
        addOpenSearchFailureException(targetAddress, exception, blockedActions);
    }
}

/**
 * Adds a send behavior for {@code transportAddress}: a request whose action is contained in
 * {@code blockedActions} is logged and fails with {@code exception}; any other request is sent on the
 * connection unchanged.
 *
 * @param transportAddress the address the send behavior is registered for
 * @param exception        the exception thrown for matching actions
 * @param blockedActions   names of the actions to fail
 */
public void addOpenSearchFailureException(
    TransportAddress transportAddress,
    OpenSearchException exception,
    final Set<String> blockedActions
) {
    transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> {
        if (!blockedActions.contains(action)) {
            // Not a blocked action: pass the request through untouched.
            connection.sendRequest(requestId, action, request, options);
            return;
        }
        logger.info("--> preventing {} request", action);
        throw exception;
    });
}

/**
* Adds a rule that ignores each send request, simulating an unresponsive node
* and failing to connect once the rule was added.
Expand Down
Loading