Merge pull request #922 from peternied/meta-output
Metadata Migration with user centric output
peternied authored Sep 5, 2024
2 parents e1aa766 + 382fb75 commit 53a1ed5
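
The change threading through this diff swaps the coarse RFS ClusterVersion enum for the richer org.opensearch.migrations.Version type, and resolves per-version snapshot readers through ClusterProviderRegistry instead of SourceResourceProviderFactory. A minimal sketch of the new lookup, assembled from the call sites below (return types are inferred from usage on this page, not confirmed against the full sources):

    // Sketch only: getSnapshotReader is assumed to return a version-appropriate
    // snapshot reader exposing the factories that were previously built by hand.
    SourceRepo sourceRepo = new FileSystemRepo(snapshotDirPath);  // snapshotDirPath: hypothetical Path
    Version sourceVersion = Version.fromString("ES 7.10");
    var snapshotReader = ClusterProviderRegistry.getSnapshotReader(sourceVersion, sourceRepo);
    var indexMetadataFactory = snapshotReader.getIndexMetadata();  // was getIndexMetadataFactory(repoDataProvider)
    var shardMetadataFactory = snapshotReader.getShardMetadata();  // was getShardMetadataFactory(repoDataProvider)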
Showing 77 changed files with 1,997 additions and 777 deletions.
DocumentsFromSnapshotMigration/build.gradle (1 addition, 0 deletions)
@@ -33,6 +33,7 @@ dependencies {
 
     implementation project(":coreUtilities")
     implementation project(":RFS")
+    implementation project(":transformation")
 
     implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
     implementation group: 'org.apache.logging.log4j', name: 'log4j-core'

RfsMigrateDocuments.java
@@ -9,6 +9,9 @@
 import java.util.UUID;
 import java.util.function.Function;
 
+import org.opensearch.migrations.Version;
+import org.opensearch.migrations.VersionConverter;
+import org.opensearch.migrations.cluster.ClusterProviderRegistry;
 import org.opensearch.migrations.reindexer.tracing.RootDocumentMigrationContext;
 import org.opensearch.migrations.tracing.ActiveContextTracker;
 import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
@@ -25,19 +28,15 @@
 import com.rfs.cms.LeaseExpireTrigger;
 import com.rfs.cms.OpenSearchWorkCoordinator;
 import com.rfs.cms.ScopedWorkCoordinator;
-import com.rfs.common.ClusterVersion;
 import com.rfs.common.DefaultSourceRepoAccessor;
 import com.rfs.common.DocumentReindexer;
 import com.rfs.common.FileSystemRepo;
 import com.rfs.common.LuceneDocumentsReader;
 import com.rfs.common.OpenSearchClient;
 import com.rfs.common.S3Repo;
 import com.rfs.common.S3Uri;
-import com.rfs.common.SnapshotRepo;
 import com.rfs.common.SnapshotShardUnpacker;
 import com.rfs.common.SourceRepo;
-import com.rfs.common.SourceResourceProvider;
-import com.rfs.common.SourceResourceProviderFactory;
 import com.rfs.common.TryHandlePhaseFailure;
 import com.rfs.common.http.ConnectionContext;
 import com.rfs.models.IndexMetadata;
@@ -132,10 +131,9 @@ public static class Args {
             "used to communicate to the target, default 10")
         int maxConnections = 10;
 
-        @Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Possible "
-            + "values include: ES_6_8, ES_7_10, ES_7_17. Default: ES_7_10"), required = false,
-            converter = ClusterVersion.ArgsConverter.class)
-        public ClusterVersion sourceVersion = ClusterVersion.ES_7_10;
+        @Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Default: ES_7.10"), required = false,
+            converter = VersionConverter.class)
+        public Version sourceVersion = Version.fromString("ES 7.10");
     }
 
     public static class NoWorkLeftException extends Exception {
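
With VersionConverter in place, --source-version takes a version string rather than an enum constant. A hypothetical sketch of how the parsed value lands, assuming the JCommander-style @Parameter annotations above are wired the usual way (the exact set of accepted spellings is not shown on this page):

    // Hypothetical argv; "ES 7.10" mirrors the default Version.fromString("ES 7.10").
    String[] argv = { "--source-version", "ES 7.10" };
    Args arguments = new Args();
    JCommander.newBuilder().addObject(arguments).build().parse(argv);
    // arguments.sourceVersion is now a Version, produced by VersionConverter.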
@@ -218,11 +216,8 @@ public static void main(String[] args) throws Exception {
             }
             DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
 
-            SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(arguments.sourceVersion);
+            var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(arguments.sourceVersion, sourceRepo);
 
-            SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo);
-            IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider);
-            ShardMetadata.Factory shardMetadataFactory = sourceResourceProvider.getShardMetadataFactory(repoDataProvider);
             SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(
                 repoAccessor,
                 luceneDirPath,
@@ -236,10 +231,10 @@ public static void main(String[] args) throws Exception {
                 workCoordinator,
                 arguments.initialLeaseDuration,
                 processManager,
-                indexMetadataFactory,
+                sourceResourceProvider.getIndexMetadata(),
                 arguments.snapshotName,
                 arguments.indexAllowlist,
-                shardMetadataFactory,
+                sourceResourceProvider.getShardMetadata(),
                 unpackerFactory,
                 arguments.maxShardSizeBytes,
                 rootDocumentContext

EndToEndTest.java
@@ -32,9 +32,9 @@ public class EndToEndTest extends SourceTestBase {
     @TempDir
     private File localDirectory;
 
-    @ParameterizedTest(name = "Target OpenSearch {0}")
+    @ParameterizedTest(name = "Target {0}")
     @ArgumentsSource(SupportedTargetCluster.class)
-    public void migrateFrom_ES_v6_8(final SearchClusterContainer.Version targetVersion) throws Exception {
+    public void migrateFrom_ES_v6_8(final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
         final var snapshotContext = SnapshotTestContext.factory().noOtelTracking();
         final var metadataContext = MetadataMigrationTestContext.factory().noOtelTracking();
         final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking();
@@ -93,7 +93,7 @@ public void migrateFrom_ES_v6_8(final SearchClusterContainer.Version targetVersion) throws Exception {
                 List.of(),
                 List.of(),
                 metadataContext,
-                sourceCluster.getVersion().getSourceVersion()
+                sourceCluster.getContainerVersion().getVersion()
             );
 
             // Check that the templates were migrated
@@ -111,7 +111,7 @@ public void migrateFrom_ES_v6_8(final SearchClusterContainer.Version targetVersion) throws Exception {
                 targetCluster.getUrl(),
                 clockJitter,
                 testDocMigrationContext,
-                sourceCluster.getVersion().getSourceVersion(),
+                sourceCluster.getContainerVersion().getVersion(),
                 false
             );
             assertThat(result, equalTo(DocumentsRunner.CompletionStatus.WORK_COMPLETED));
@@ -123,9 +123,9 @@ public void migrateFrom_ES_v6_8(final SearchClusterContainer.Version targetVersion) throws Exception {
         }
     }
 
-    @ParameterizedTest(name = "Target OpenSearch {0}")
+    @ParameterizedTest(name = "Target {0}")
     @ArgumentsSource(SupportedTargetCluster.class)
-    public void migrateFrom_ES_v7_10(final SearchClusterContainer.Version targetVersion) throws Exception {
+    public void migrateFrom_ES_v7_10(final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
         try (
             final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2);
             final var targetCluster = new SearchClusterContainer(targetVersion)
@@ -134,9 +134,9 @@ public void migrateFrom_ES_v7_10(final SearchClusterContainer.Version targetVersion) throws Exception {
         }
     }
 
-    @ParameterizedTest(name = "Target OpenSearch {0}")
+    @ParameterizedTest(name = "Target {0}")
     @ArgumentsSource(SupportedTargetCluster.class)
-    public void migrateFrom_ES_v7_17(final SearchClusterContainer.Version targetVersion) throws Exception {
+    public void migrateFrom_ES_v7_17(final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
         try (
             final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V7_17);
             final var targetCluster = new SearchClusterContainer(targetVersion)
@@ -207,7 +207,7 @@ private void migrateFrom_ES_v7_X(
                 List.of(indexTemplateName),
                 List.of(),
                 metadataContext,
-                sourceCluster.getVersion().getSourceVersion()
+                sourceCluster.getContainerVersion().getVersion()
             );
 
             // Check that the templates were migrated
@@ -225,7 +225,7 @@ private void migrateFrom_ES_v7_X(
                 targetCluster.getUrl(),
                 clockJitter,
                 testDocMigrationContext,
-                sourceCluster.getVersion().getSourceVersion(),
+                sourceCluster.getContainerVersion().getVersion(),
                 false
             );
             assertThat(result, equalTo(DocumentsRunner.CompletionStatus.WORK_COMPLETED));

ParallelDocumentMigrationsTest.java
@@ -37,11 +37,12 @@
 @Tag("longTest")
 @Slf4j
 public class ParallelDocumentMigrationsTest extends SourceTestBase {
-    final static List<SearchClusterContainer.Version> SOURCE_IMAGES = List.of(
+
+    final static List<SearchClusterContainer.ContainerVersion> SOURCE_IMAGES = List.of(
         SearchClusterContainer.ES_V7_10_2,
         SearchClusterContainer.ES_V7_17
     );
-    final static List<SearchClusterContainer.Version> TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0);
+    final static List<SearchClusterContainer.ContainerVersion> TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0);
 
     public static Stream<Arguments> makeDocumentMigrationArgs() {
         List<Object[]> sourceImageArgs = SOURCE_IMAGES.stream()
@@ -73,8 +74,8 @@ public static Stream<Arguments> makeDocumentMigrationArgs() {
     @MethodSource("makeDocumentMigrationArgs")
     public void testDocumentMigration(
         int numWorkers,
-        SearchClusterContainer.Version targetVersion,
-        SearchClusterContainer.Version baseSourceImageVersion,
+        SearchClusterContainer.ContainerVersion targetVersion,
+        SearchClusterContainer.ContainerVersion baseSourceImageVersion,
         String generatorImage,
         String[] generatorArgs,
         boolean compressionEnabled
@@ -127,7 +128,7 @@ public void testDocumentMigration(
                 .build()
                 .toConnectionContext());
             var sourceRepo = new FileSystemRepo(tempDir);
-            migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, List.of(), List.of(), List.of(), INDEX_ALLOWLIST, testMetadataMigrationContext, baseSourceImageVersion.getSourceVersion());
+            migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, List.of(), List.of(), List.of(), INDEX_ALLOWLIST, testMetadataMigrationContext, baseSourceImageVersion.getVersion());
 
             var workerFutures = new ArrayList<CompletableFuture<Integer>>();
             var runCounter = new AtomicInteger();
@@ -144,7 +145,7 @@ public void testDocumentMigration(
                         runCounter,
                         clockJitter,
                         testDocMigrationContext,
-                        baseSourceImageVersion.getSourceVersion(),
+                        baseSourceImageVersion.getVersion(),
                         compressionEnabled
                     ),
                     executorService

ProcessLifecycleTest.java
@@ -16,11 +16,11 @@
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import org.opensearch.migrations.Version;
 import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext;
 import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;
 import org.opensearch.testcontainers.OpensearchContainer;
 
-import com.rfs.common.ClusterVersion;
 import com.rfs.common.FileSystemRepo;
 import com.rfs.common.FileSystemSnapshotCreator;
 import com.rfs.common.OpenSearchClient;
@@ -74,7 +74,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode)
         final var testMetadataMigrationContext = MetadataMigrationTestContext.factory().noOtelTracking();
 
         var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2);
-        var baseSourceImageVersion = (SearchClusterContainer.Version) sourceImageArgs[0];
+        var baseSourceImageVersion = (SearchClusterContainer.ContainerVersion) sourceImageArgs[0];
         var generatorImage = (String) sourceImageArgs[1];
         var generatorArgs = (String[]) sourceImageArgs[2];
         var targetImageName = SearchClusterContainer.OS_V2_14_0.getImageName();
@@ -122,7 +122,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode)
         );
         esSourceContainer.copySnapshotData(tempDirSnapshot.toString());
 
-        migrateMetadata(osTargetContainer, tempDirSnapshot, testMetadataMigrationContext, baseSourceImageVersion.getSourceVersion());
+        migrateMetadata(osTargetContainer, tempDirSnapshot, testMetadataMigrationContext, baseSourceImageVersion.getVersion());
 
         int actualExitCode = runProcessAgainstToxicTarget(tempDirSnapshot, tempDirLucene, proxyContainer, failHow);
         log.atInfo().setMessage("Process exited with code: " + actualExitCode).log();
@@ -143,7 +143,7 @@ private static void migrateMetadata(
         OpensearchContainer targetContainer,
         Path tempDirSnapshot,
         MetadataMigrationTestContext testMetadataMigrationContext,
-        ClusterVersion sourceVersion
+        Version sourceVersion
     ) {
         String targetAddress = "http://"
             + targetContainer.getHost()

SourceTestBase.java
@@ -20,30 +20,25 @@
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Assertions;
 
+import org.opensearch.migrations.Version;
+import org.opensearch.migrations.cluster.ClusterProviderRegistry;
 import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext;
 import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
 
 import com.rfs.cms.CoordinateWorkHttpClient;
 import com.rfs.cms.LeaseExpireTrigger;
 import com.rfs.cms.OpenSearchWorkCoordinator;
-import com.rfs.common.ClusterVersion;
 import com.rfs.common.DefaultSourceRepoAccessor;
 import com.rfs.common.DocumentReindexer;
 import com.rfs.common.FileSystemRepo;
 import com.rfs.common.LuceneDocumentsReader;
 import com.rfs.common.OpenSearchClient;
 import com.rfs.common.RestClient;
-import com.rfs.common.SnapshotRepo;
 import com.rfs.common.SnapshotShardUnpacker;
 import com.rfs.common.SourceRepo;
-import com.rfs.common.SourceResourceProvider;
-import com.rfs.common.SourceResourceProviderFactory;
 import com.rfs.common.http.ConnectionContextTestParams;
 import com.rfs.framework.SearchClusterContainer;
 import com.rfs.http.SearchClusterRequests;
-import com.rfs.models.GlobalMetadata;
-import com.rfs.models.IndexMetadata;
-import com.rfs.models.ShardMetadata;
 import com.rfs.transformers.TransformFunctions;
 import com.rfs.transformers.Transformer;
 import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
@@ -64,7 +59,7 @@ public class SourceTestBase {
     public static final String SOURCE_SERVER_ALIAS = "source";
     public final static long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600;
 
-    protected static Object[] makeParamsForBase(SearchClusterContainer.Version baseSourceImage) {
+    protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVersion baseSourceImage) {
         return new Object[] {
             baseSourceImage,
             GENERATOR_BASE_IMAGE,
@@ -80,31 +75,32 @@ protected static void migrateMetadata(
         List<String> indexTemplateAllowlist,
         List<String> indexAllowlist,
         MetadataMigrationTestContext context,
-        ClusterVersion sourceVersion
+        Version sourceVersion
     ) {
-        SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(sourceVersion);
-        SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo);
-        GlobalMetadata.Factory metadataFactory = sourceResourceProvider.getGlobalMetadataFactory(repoDataProvider);
+        var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(sourceVersion, sourceRepo);
+        var targetVersion = Version.fromString("OS 2.11");
         GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11(
             targetClient,
             legacyTemplateAllowlist,
             componentTemplateAllowlist,
-            indexTemplateAllowlist,
-            context.createMetadataMigrationContext()
+            indexTemplateAllowlist
         );
-        Transformer transformer = TransformFunctions.getTransformer(sourceResourceProvider.getVersion(), ClusterVersion.OS_2_11, 1);
-        new MetadataRunner(snapshotName, metadataFactory, metadataCreator, transformer).migrateMetadata();
+        Transformer transformer = TransformFunctions.getTransformer(sourceVersion, targetVersion, 1);
+        new MetadataRunner(
+            snapshotName,
+            sourceResourceProvider.getGlobalMetadata(),
+            metadataCreator,
+            transformer
+        ).migrateMetadata(context.createMetadataMigrationContext());
 
-        IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider);
         IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient);
         new IndexRunner(
             snapshotName,
-            indexMetadataFactory,
+            sourceResourceProvider.getIndexMetadata(),
             indexCreator,
             transformer,
-            indexAllowlist,
-            context.createIndexContext()
-        ).migrateIndices();
+            indexAllowlist
+        ).migrateIndices(context.createIndexContext());
     }
 
     @AllArgsConstructor
@@ -146,7 +142,7 @@ public static int migrateDocumentsSequentially(
         AtomicInteger runCounter,
         Random clockJitter,
         DocumentMigrationTestContext testContext,
-        ClusterVersion parserVersion,
+        Version version,
         boolean compressionEnabled
     ) {
         for (int runNumber = 1;; ++runNumber) {
@@ -158,7 +154,7 @@ public static int migrateDocumentsSequentially(
                     targetAddress,
                     clockJitter,
                     testContext,
-                    parserVersion,
+                    version,
                     compressionEnabled
                 );
                 if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) {
@@ -208,7 +204,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
         String targetAddress,
         Random clockJitter,
         DocumentMigrationTestContext context,
-        ClusterVersion parserVersion,
+        Version version,
         boolean compressionEnabled
     ) throws RfsMigrateDocuments.NoWorkLeftException {
         var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
@@ -224,7 +220,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
             return d;
         };
 
-        SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(parserVersion);
+        var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(version, sourceRepo);
 
         DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
         SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(
@@ -233,9 +229,6 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
             sourceResourceProvider.getBufferSizeInBytes()
         );
 
-        SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo);
-        IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider);
-        ShardMetadata.Factory shardMetadataFactory = sourceResourceProvider.getShardMetadataFactory(repoDataProvider);
        final int ms_window = 1000;
         final var nextClockShift = (int) (clockJitter.nextDouble() * ms_window) - (ms_window / 2);
         log.info("nextClockShift=" + nextClockShift);
@@ -262,10 +255,10 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
             ),
             Duration.ofMinutes(10),
             processManager,
-            indexMetadataFactory,
+            sourceResourceProvider.getIndexMetadata(),
             snapshotName,
             indexAllowlist,
-            shardMetadataFactory,
+            sourceResourceProvider.getShardMetadata(),
             unpackerFactory,
             MAX_SHARD_SIZE_BYTES,
             context
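
A pattern worth noting across these hunks: tracing contexts move out of the creator and runner constructors and into the migrate calls themselves, and metadata factories are fetched from the snapshot reader at the point of use. A condensed sketch of the resulting call shape (signatures inferred from this diff, not verified against the full 77-file change; reader stands for the value returned by ClusterProviderRegistry.getSnapshotReader):

    // Contexts are now supplied per invocation, so a configured runner could in
    // principle be reused across runs -- an inference from the new signatures,
    // not a goal stated anywhere on this page.
    new MetadataRunner(snapshotName, reader.getGlobalMetadata(), metadataCreator, transformer)
        .migrateMetadata(context.createMetadataMigrationContext());
    new IndexRunner(snapshotName, reader.getIndexMetadata(), indexCreator, transformer, indexAllowlist)
        .migrateIndices(context.createIndexContext());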