Skip to content

Commit

Permalink
Merge pull request #783 from peternied/transformer-min
Browse files Browse the repository at this point in the history
Basic TransformationRule: IndexMappingTypeRemoval
  • Loading branch information
peternied authored Jul 10, 2024
2 parents 07bc3ef + 65d6cef commit c6b1cc9
Show file tree
Hide file tree
Showing 65 changed files with 1,040 additions and 431 deletions.
22 changes: 5 additions & 17 deletions CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.ParametersDelegate;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import com.rfs.common.ConnectionDetails;
import com.rfs.common.FileSystemSnapshotCreator;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3SnapshotCreator;
Expand Down Expand Up @@ -41,22 +43,8 @@ public static class Args {
)
public String s3Region;

@Parameter(names = {"--source-host"},
required = true,
description = "The source host and port (e.g. http://localhost:9200)")
public String sourceHost;

@Parameter(names = {"--source-username"},
description = "Optional. The source username; if not provided, will assume no auth on source")
public String sourceUser = null;

@Parameter(names = {"--source-password"},
description = "Optional. The source password; if not provided, will assume no auth on source")
public String sourcePass = null;

@Parameter(names = {"--source-insecure"},
description = "Allow untrusted SSL certificates for source")
public boolean sourceInsecure = false;
@ParametersDelegate
public ConnectionDetails.SourceArgs sourceArgs;

@Parameter(names = {"--no-wait"}, description = "Optional. If provided, the snapshot runner will not wait for completion")
public boolean noWait = false;
Expand Down Expand Up @@ -96,7 +84,7 @@ public static void main(String[] args) throws Exception {
run(c -> ((arguments.fileSystemRepoPath != null)
? new FileSystemSnapshotCreator(arguments.snapshotName, c, arguments.fileSystemRepoPath)
: new S3SnapshotCreator(arguments.snapshotName, c, arguments.s3RepoUri, arguments.s3Region, arguments.maxSnapshotRateMBPerNode)),
new OpenSearchClient(arguments.sourceHost, arguments.sourceUser, arguments.sourcePass, arguments.sourceInsecure),
new OpenSearchClient(new ConnectionDetails(arguments.sourceArgs)),
arguments.noWait
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.IndexMetadata;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Uri;
import com.rfs.common.ShardMetadata;
import com.rfs.common.S3Repo;
import com.rfs.common.SourceRepo;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.FileSystemSnapshotCreator;
import com.rfs.common.GlobalMetadata;
import com.rfs.common.IndexMetadata;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.RestClient;
import com.rfs.common.ShardMetadata;
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepo;
import com.rfs.framework.SearchClusterContainer;
import com.rfs.http.SearchClusterRequests;
import com.rfs.framework.PreloadedSearchClusterContainer;
import com.rfs.models.GlobalMetadata;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.transformers.TransformFunctions;
import com.rfs.transformers.Transformer;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
Expand Down Expand Up @@ -89,7 +89,6 @@ public class FullTest {
SearchClusterContainer.ES_V7_17
);
final static List<SearchClusterContainer.Version> TARGET_IMAGES = List.of(
SearchClusterContainer.OS_V1_3_16,
SearchClusterContainer.OS_V2_14_0
);
public static final String SOURCE_SERVER_ALIAS = "source";
Expand Down
26 changes: 7 additions & 19 deletions MetadataMigration/src/main/java/com/rfs/MetadataMigration.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.ParametersDelegate;

import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -11,14 +12,14 @@
import com.rfs.common.ClusterVersion;
import com.rfs.common.ConnectionDetails;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.GlobalMetadata;
import com.rfs.common.IndexMetadata;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Repo;
import com.rfs.common.S3Uri;
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SourceRepo;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.models.GlobalMetadata;
import com.rfs.models.IndexMetadata;
import com.rfs.transformers.TransformFunctions;
import com.rfs.transformers.Transformer;
import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
Expand Down Expand Up @@ -49,18 +50,9 @@ public static class Args {
@Parameter(names = {"--s3-region"}, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = false)
public String s3Region;

@Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true)
public String targetHost;

@Parameter(names = {"--target-username"}, description = "Optional. The target username; if not provided, will assume no auth on target", required = false)
public String targetUser = null;

@Parameter(names = {"--target-password"}, description = "Optional. The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;

@Parameter(names = {"--target-insecure"}, description = "Allow untrusted SSL certificates for target", required = false)
public boolean targetInsecure = false;

@ParametersDelegate
public ConnectionDetails.TargetArgs targetArgs;

@Parameter(names = {"--index-allowlist"}, description = ("Optional. List of index names to migrate"
+ " (e.g. 'logs_2024_01, logs_2024_02'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
public List<String> indexAllowlist = List.of();
Expand Down Expand Up @@ -103,16 +95,12 @@ public static void main(String[] args) throws Exception {
final Path s3LocalDirPath = arguments.s3LocalDirPath != null ? Paths.get(arguments.s3LocalDirPath) : null;
final String s3RepoUri = arguments.s3RepoUri;
final String s3Region = arguments.s3Region;
final String targetHost = arguments.targetHost;
final String targetUser = arguments.targetUser;
final String targetPass = arguments.targetPass;
final List<String> indexAllowlist = arguments.indexAllowlist;
final boolean targetInsecure = arguments.targetInsecure;
final List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
final List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
final int awarenessDimensionality = arguments.minNumberOfReplicas + 1;

final ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass, targetInsecure);
final ConnectionDetails targetConnection = new ConnectionDetails(arguments.targetArgs);


TryHandlePhaseFailure.executeWithTryCatch(() -> {
Expand Down
2 changes: 1 addition & 1 deletion RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ ext {

dependencies {
implementation project(":commonDependencyVersionConstraints")

implementation project(':coreUtilities')
implementation project(':transformation')

implementation group: 'com.beust', name: 'jcommander'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
Expand Down
66 changes: 25 additions & 41 deletions RFS/src/main/java/com/rfs/ReindexFromSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.lucene.document.Document;
import org.apache.logging.log4j.Level;
Expand All @@ -16,6 +17,10 @@
import reactor.core.publisher.Flux;

import com.rfs.common.*;
import com.rfs.models.GlobalMetadata;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.models.SnapshotMetadata;
import com.rfs.transformers.*;
import com.rfs.version_es_6_8.*;
import com.rfs.version_es_7_10.*;
Expand Down Expand Up @@ -46,23 +51,11 @@ public static class Args {
@Parameter(names = {"-l", "--lucene-dir"}, description = "The absolute path to the directory where we'll put the Lucene docs", required = true)
public String luceneDirPath;

@Parameter(names = {"--source-host"}, description = "The source host and port (e.g. http://localhost:9200)", required = false)
public String sourceHost = null;
@ParametersDelegate
public ConnectionDetails.SourceArgs sourceArgs;

@Parameter(names = {"--source-username"}, description = "The source username; if not provided, will assume no auth on source", required = false)
public String sourceUser = null;

@Parameter(names = {"--source-password"}, description = "The source password; if not provided, will assume no auth on source", required = false)
public String sourcePass = null;

@Parameter(names = {"--target-host"}, description = "The target host and port (e.g. http://localhost:9200)", required = true)
public String targetHost;

@Parameter(names = {"--target-username"}, description = "The target username; if not provided, will assume no auth on target", required = false)
public String targetUser = null;

@Parameter(names = {"--target-password"}, description = "The target password; if not provided, will assume no auth on target", required = false)
public String targetPass = null;
@ParametersDelegate
public ConnectionDetails.TargetArgs targetArgs;

@Parameter(names = {"-s", "--source-version"}, description = "The source cluster's version (e.g. 'es_6_8')", required = true, converter = ClusterVersion.ArgsConverter.class)
public ClusterVersion sourceVersion;
Expand Down Expand Up @@ -108,12 +101,6 @@ public static void main(String[] args) throws InterruptedException {
String s3RepoUri = arguments.s3RepoUri;
String s3Region = arguments.s3Region;
Path luceneDirPath = Paths.get(arguments.luceneDirPath);
String sourceHost = arguments.sourceHost;
String sourceUser = arguments.sourceUser;
String sourcePass = arguments.sourcePass;
String targetHost = arguments.targetHost;
String targetUser = arguments.targetUser;
String targetPass = arguments.targetPass;
int awarenessDimensionality = arguments.minNumberOfReplicas + 1;
ClusterVersion sourceVersion = arguments.sourceVersion;
ClusterVersion targetVersion = arguments.targetVersion;
Expand All @@ -125,8 +112,8 @@ public static void main(String[] args) throws InterruptedException {

Logging.setLevel(logLevel);

ConnectionDetails sourceConnection = new ConnectionDetails(sourceHost, sourceUser, sourcePass);
ConnectionDetails targetConnection = new ConnectionDetails(targetHost, targetUser, targetPass);
ConnectionDetails sourceConnection = new ConnectionDetails(arguments.sourceArgs);
ConnectionDetails targetConnection = new ConnectionDetails(arguments.targetArgs);

// Sanity checks
if (!((sourceVersion == ClusterVersion.ES_6_8) || (sourceVersion == ClusterVersion.ES_7_10))) {
Expand All @@ -145,9 +132,9 @@ public static void main(String[] args) throws InterruptedException {
*
* If you provide the source host, you still need to provide the S3 details or the snapshotLocalRepoDirPath to write the snapshot to.
*/
if (snapshotDirPath != null && (sourceHost != null || s3RepoUri != null)) {
if (snapshotDirPath != null && (arguments.sourceArgs.getHost() != null || s3RepoUri != null)) {
throw new IllegalArgumentException("If you specify a local directory to take the snapshot from, you cannot specify a source host or S3 URI");
} else if (sourceHost != null) {
} else if (arguments.sourceArgs.getHost() != null) {
if (s3RepoUri == null && s3Region == null && s3LocalDirPath == null && snapshotLocalRepoDirPath == null) {
throw new IllegalArgumentException(
"If you specify a source host, you must also specify the S3 details or the snapshotLocalRepoDirPath to write the snapshot to as well");
Expand Down Expand Up @@ -175,7 +162,7 @@ public static void main(String[] args) throws InterruptedException {

try {

if (sourceHost != null) {
if (arguments.sourceArgs.getHost() != null) {
// ==========================================================================================================
// Create the snapshot if necessary
// ==========================================================================================================
Expand Down Expand Up @@ -224,7 +211,7 @@ public static void main(String[] args) throws InterruptedException {
logger.error("Snapshot not found");
return;
}
SnapshotMetadata.Data snapshotMetadata;
SnapshotMetadata snapshotMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
snapshotMetadata = new SnapshotMetadataFactory_ES_6_8().fromRepo(repo, repoDataProvider, snapshotName);
} else {
Expand Down Expand Up @@ -252,7 +239,7 @@ public static void main(String[] args) throws InterruptedException {
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to read Global Metadata details...");
GlobalMetadata.Data globalMetadata;
GlobalMetadata globalMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
globalMetadata = new GlobalMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName);
} else {
Expand All @@ -269,13 +256,11 @@ public static void main(String[] args) throws InterruptedException {
OpenSearchClient targetClient = new OpenSearchClient(targetConnection);
if (sourceVersion == ClusterVersion.ES_6_8) {
GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, templateWhitelist, componentTemplateWhitelist, List.of());
ObjectNode root = globalMetadata.toObjectNode();
ObjectNode transformedRoot = transformer.transformGlobalMetadata(root);
var transformedRoot = transformer.transformGlobalMetadata(globalMetadata);
metadataCreator.create(transformedRoot);
} else if (sourceVersion == ClusterVersion.ES_7_10) {
GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(targetClient, List.of(), componentTemplateWhitelist, templateWhitelist);
ObjectNode root = globalMetadata.toObjectNode();
ObjectNode transformedRoot = transformer.transformGlobalMetadata(root);
var transformedRoot = transformer.transformGlobalMetadata(globalMetadata);
metadataCreator.create(transformedRoot);
}
}
Expand All @@ -285,10 +270,10 @@ public static void main(String[] args) throws InterruptedException {
// ==========================================================================================================
logger.info("==================================================================");
logger.info("Attempting to read Index Metadata...");
List<IndexMetadata.Data> indexMetadatas = new ArrayList<>();
List<IndexMetadata> indexMetadatas = new ArrayList<>();
for (SnapshotRepo.Index index : repoDataProvider.getIndicesInSnapshot(snapshotName)) {
logger.info("Reading Index Metadata for index: " + index.getName());
IndexMetadata.Data indexMetadata;
IndexMetadata indexMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
indexMetadata = new IndexMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, index.getName());
} else {
Expand All @@ -306,12 +291,11 @@ public static void main(String[] args) throws InterruptedException {
logger.info("==================================================================");
logger.info("Attempting to recreate the indices...");
IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
for (IndexMetadata.Data indexMetadata : indexMetadatas) {
for (IndexMetadata indexMetadata : indexMetadatas) {
String reindexName = indexMetadata.getName() + indexSuffix;
logger.info("Recreating index " + indexMetadata.getName() + " as " + reindexName + " on target...");

ObjectNode root = indexMetadata.toObjectNode();
ObjectNode transformedRoot = transformer.transformIndexMetadata(root);
var transformedRoot = transformer.transformIndexMetadata(indexMetadata);
indexCreator.create(transformedRoot, reindexName, indexMetadata.getId());
}
}
Expand All @@ -332,13 +316,13 @@ public static void main(String[] args) throws InterruptedException {
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(repoAccessor,luceneDirPath, bufferSize);

for (IndexMetadata.Data indexMetadata : indexMetadatas) {
for (IndexMetadata indexMetadata : indexMetadatas) {
logger.info("Processing index: " + indexMetadata.getName());
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
logger.info("=== Shard ID: " + shardId + " ===");

// Get the shard metadata
ShardMetadata.Data shardMetadata;
ShardMetadata shardMetadata;
if (sourceVersion == ClusterVersion.ES_6_8) {
shardMetadata = new ShardMetadataFactory_ES_6_8(repoDataProvider).fromRepo(snapshotName, indexMetadata.getName(), shardId);
} else {
Expand All @@ -362,7 +346,7 @@ public static void main(String[] args) throws InterruptedException {
LuceneDocumentsReader reader = new LuceneDocumentsReader(luceneDirPath);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);

for (IndexMetadata.Data indexMetadata : indexMetadatas) {
for (IndexMetadata indexMetadata : indexMetadatas) {
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
logger.info("=== Index Id: " + indexMetadata.getName() + ", Shard ID: " + shardId + " ===");

Expand Down
Loading

0 comments on commit c6b1cc9

Please sign in to comment.