Skip to content

Commit

Permalink
Merge pull request #856 from peternied/cluster-cli
Browse files Browse the repository at this point in the history
Create command output models for metadata migration tool
  • Loading branch information
peternied authored Aug 15, 2024
2 parents 9c5ac1f + 6f55e83 commit b40a900
Show file tree
Hide file tree
Showing 17 changed files with 522 additions and 8 deletions.
35 changes: 30 additions & 5 deletions MetadataMigration/build.gradle
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
plugins {
id 'application'
id 'java'
id 'io.freefair.lombok' version '8.6'
id 'io.freefair.lombok'
}

import org.opensearch.migrations.common.CommonUtils

java.sourceCompatibility = JavaVersion.VERSION_11
java.targetCompatibility = JavaVersion.VERSION_11

Expand All @@ -22,12 +20,39 @@ dependencies {
implementation group: 'com.beust', name: 'jcommander'
implementation group: 'org.slf4j', name: 'slf4j-api'
implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl'

testImplementation testFixtures(project(path: ':RFS'))
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api'
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params'
testImplementation group: 'org.mockito', name: 'mockito-core'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'
testImplementation group: 'org.hamcrest', name: 'hamcrest'
testImplementation group: 'org.testcontainers', name: 'testcontainers'

testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'
}

application {
mainClassName = 'com.rfs.MetadataMigration'
mainClassName = 'org.opensearch.migrations.MetadataMigration'
}

test {
useJUnitPlatform()
useJUnitPlatform {
excludeTags 'longTest'
}
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
showExceptions true
showCauses true
}
}

task slowTest(type: Test) {
// include longTest
jacoco {
enabled = true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.opensearch.migrations;

import java.util.List;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
import com.rfs.common.http.ConnectionContext;

public class MetadataArgs {
@Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool")
public boolean help;

@Parameter(names = { "--snapshot-name" }, description = "The name of the snapshot to migrate", required = true)
public String snapshotName;

@Parameter(names = {
"--file-system-repo-path" }, required = false, description = "The full path to the snapshot repo on the file system.")
public String fileSystemRepoPath;

@Parameter(names = {
"--s3-local-dir" }, description = "The absolute path to the directory on local disk to download S3 files to", required = false)
public String s3LocalDirPath;

@Parameter(names = {
"--s3-repo-uri" }, description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2", required = false)
public String s3RepoUri;

@Parameter(names = {
"--s3-region" }, description = "The AWS Region the S3 bucket is in, like: us-east-2", required = false)
public String s3Region;

@ParametersDelegate
public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs();

@Parameter(names = { "--index-allowlist" }, description = ("Optional. List of index names to migrate"
+ " (e.g. 'logs_2024_01, logs_2024_02'). Default: all non-system indices (e.g. those not starting with '.')"), required = false)
public List<String> indexAllowlist = List.of();

@Parameter(names = {
"--index-template-allowlist" }, description = ("Optional. List of index template names to migrate"
+ " (e.g. 'posts_index_template1, posts_index_template2'). Default: empty list"), required = false)
public List<String> indexTemplateAllowlist = List.of();

@Parameter(names = {
"--component-template-allowlist" }, description = ("Optional. List of component template names to migrate"
+ " (e.g. 'posts_template1, posts_template2'). Default: empty list"), required = false)
public List<String> componentTemplateAllowlist = List.of();

// https://opensearch.org/docs/2.11/api-reference/cluster-api/cluster-awareness/
@Parameter(names = {
"--min-replicas" }, description = ("Optional. The minimum number of replicas configured for migrated indices on the target."
+ " This can be useful for migrating to targets which use zonal deployments and require additional replicas to meet zone requirements. Default: 0"), required = false)
public int minNumberOfReplicas = 0;

@Parameter(required = false, names = {
"--otel-collector-endpoint" }, arity = 1, description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be"
+ "forwarded. If no value is provided, metrics will not be forwarded.")
String otelCollectorEndpoint;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.opensearch.migrations;

import org.opensearch.migrations.commands.Configure;
import org.opensearch.migrations.commands.Evaluate;
import org.opensearch.migrations.commands.Migrate;
import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext;
import org.opensearch.migrations.tracing.ActiveContextTracker;
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
import org.opensearch.migrations.tracing.CompositeContextTracker;
import org.opensearch.migrations.tracing.RootOtelContext;

import com.beust.jcommander.JCommander;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MetadataMigration {

public static void main(String[] args) throws Exception {
var arguments = new MetadataArgs();
var jCommander = JCommander.newBuilder().addObject(arguments).build();
jCommander.parse(args);

if (arguments.help) {
jCommander.usage();
return;
}

var context = new RootMetadataMigrationContext(
RootOtelContext.initializeOpenTelemetryWithCollectorOrAsNoop(arguments.otelCollectorEndpoint, "metadata"),
new CompositeContextTracker(new ActiveContextTracker(), new ActiveContextTrackerByActivityType())
);

var meta = new MetadataMigration(arguments);
var result = meta.migrate().execute(context);
System.exit(result.getExitCode());
}

private final MetadataArgs arguments;

public MetadataMigration(MetadataArgs arguments) {
this.arguments = arguments;
}

public Configure configure() {
return new Configure();
}

public Evaluate evaluate() {
return new Evaluate();
}

public Migrate migrate() {
return new Migrate(arguments);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opensearch.migrations.commands;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Configure {

public ConfigureResult execute() {
log.atError().setMessage("configure is not supported").log();
return new ConfigureResult(9999);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.opensearch.migrations.commands;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
public class ConfigureResult implements Result {
@Getter
private final int exitCode;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opensearch.migrations.commands;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Evaluate {

public EvaluateResult execute() {
log.atError().setMessage("evaluate is not supported").log();
return new EvaluateResult(9999);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.opensearch.migrations.commands;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
public class EvaluateResult implements Result {
@Getter
private final int exitCode;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package org.opensearch.migrations.commands;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import org.opensearch.migrations.MetadataArgs;
import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext;

import com.beust.jcommander.ParameterException;
import com.rfs.common.ClusterVersion;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Repo;
import com.rfs.common.S3Uri;
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SourceRepo;
import com.rfs.models.GlobalMetadata;
import com.rfs.models.IndexMetadata;
import com.rfs.transformers.TransformFunctions;
import com.rfs.transformers.Transformer;
import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
import com.rfs.worker.IndexRunner;
import com.rfs.worker.MetadataRunner;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Migrate {

static final int INVALID_PARAMETER_CODE = 999;
static final int UNEXPECTED_FAILURE_CODE = 888;
private final MetadataArgs arguments;

public Migrate(MetadataArgs arguments) {
this.arguments = arguments;
}

public MigrateResult execute(RootMetadataMigrationContext context) {
log.atInfo().setMessage("Command line arguments {0}").addArgument(arguments::toString).log();
try {
if (arguments.fileSystemRepoPath == null && arguments.s3RepoUri == null) {
throw new ParameterException("Either file-system-repo-path or s3-repo-uri must be set");
}
if (arguments.fileSystemRepoPath != null && arguments.s3RepoUri != null) {
throw new ParameterException("Only one of file-system-repo-path and s3-repo-uri can be set");
}
if ((arguments.s3RepoUri != null) && (arguments.s3Region == null || arguments.s3LocalDirPath == null)) {
throw new ParameterException("If an s3 repo is being used, s3-region and s3-local-dir-path must be set");
}
} catch (Exception e) {
log.atError().setMessage("Invalid parameter").setCause(e).log();
return new MigrateResult(INVALID_PARAMETER_CODE);
}

final String snapshotName = arguments.snapshotName;
final Path fileSystemRepoPath = arguments.fileSystemRepoPath != null
? Paths.get(arguments.fileSystemRepoPath)
: null;
final Path s3LocalDirPath = arguments.s3LocalDirPath != null ? Paths.get(arguments.s3LocalDirPath) : null;
final String s3RepoUri = arguments.s3RepoUri;
final String s3Region = arguments.s3Region;
final List<String> indexAllowlist = arguments.indexAllowlist;
final List<String> indexTemplateAllowlist = arguments.indexTemplateAllowlist;
final List<String> componentTemplateAllowlist = arguments.componentTemplateAllowlist;
final int awarenessDimensionality = arguments.minNumberOfReplicas + 1;

try {
log.info("Running RfsWorker");
final OpenSearchClient targetClient = new OpenSearchClient(arguments.targetArgs.toConnectionContext());

final SourceRepo sourceRepo = fileSystemRepoPath != null
? new FileSystemRepo(fileSystemRepoPath)
: S3Repo.create(s3LocalDirPath, new S3Uri(s3RepoUri), s3Region);
final SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
final GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
final GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(
targetClient,
List.of(),
componentTemplateAllowlist,
indexTemplateAllowlist,
context.createMetadataMigrationContext()
);
final Transformer transformer = TransformFunctions.getTransformer(
ClusterVersion.ES_7_10,
ClusterVersion.OS_2_11,
awarenessDimensionality
);
new MetadataRunner(snapshotName, metadataFactory, metadataCreator, transformer).migrateMetadata();
log.info("Metadata copy complete.");

final IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
final IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
new IndexRunner(
snapshotName,
indexMetadataFactory,
indexCreator,
transformer,
indexAllowlist,
context.createIndexContext()
).migrateIndices();
log.info("Index copy complete.");
} catch (Throwable e) {
log.atError().setMessage("Unexpected failure").setCause(e).log();
return new MigrateResult(UNEXPECTED_FAILURE_CODE);
}

return new MigrateResult(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.opensearch.migrations.commands;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
public class MigrateResult implements Result {
@Getter
private final int exitCode;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.opensearch.migrations.commands;

/** All shared cli result information */
public interface Result {
int getExitCode();
}
Loading

0 comments on commit b40a900

Please sign in to comment.