diff --git a/.gitignore b/.gitignore index cbaed2c10..b96951d65 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,6 @@ TrafficCapture/**/out/ opensearch-cluster-cdk/ test/opensearch-cluster-cdk/ TrafficCapture/dockerSolution/src/main/docker/migrationConsole/staging + +# Directory used to temporarily store snapshots during testing +DocumentsFromSnapshotMigration/docker/snapshots/ diff --git a/DocumentsFromSnapshotMigration/build.gradle b/DocumentsFromSnapshotMigration/build.gradle index 5eaedcbf1..143ba1393 100644 --- a/DocumentsFromSnapshotMigration/build.gradle +++ b/DocumentsFromSnapshotMigration/build.gradle @@ -94,6 +94,9 @@ DockerServiceProps[] dockerServices = [ dockerImageName:"reindex_from_snapshot", inputDir:"./docker", taskDependencies:["copyDockerRuntimeJars"]]), + new DockerServiceProps([projectName:"emptyElasticsearchSource_5_6", + dockerImageName:"empty_elasticsearch_source_5_6", + inputDir:"./docker/TestSource_ES_5_6"]), new DockerServiceProps([projectName:"emptyElasticsearchSource_6_8", dockerImageName:"empty_elasticsearch_source_6_8", inputDir:"./docker/TestSource_ES_6_8"]), @@ -112,6 +115,7 @@ for (dockerService in dockerServices) { dependsOn dep } inputDir = project.file(dockerService.inputDir) + // platform.set("linux/amd64") buildArgs = dockerService.buildArgs images.add("migrations/${dockerService.dockerImageName}:${hash}") images.add("migrations/${dockerService.dockerImageName}:${version}") @@ -123,6 +127,10 @@ dockerCompose { useComposeFiles = ['docker/docker-compose-es710.yml'] projectName = 'rfs-compose' + es56 { + useComposeFiles = ['docker/docker-compose-es56.yml'] + } + es68 { useComposeFiles = ['docker/docker-compose-es68.yml'] } diff --git a/DocumentsFromSnapshotMigration/docker/TestSource_ES_5_6/Dockerfile b/DocumentsFromSnapshotMigration/docker/TestSource_ES_5_6/Dockerfile new file mode 100644 index 000000000..b2650692a --- /dev/null +++ b/DocumentsFromSnapshotMigration/docker/TestSource_ES_5_6/Dockerfile @@ -0,0 +1,20 @@ +FROM docker.elastic.co/elasticsearch/elasticsearch:5.6.16 AS base + +# Configure Elastic +ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml +RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE +RUN echo "xpack.security.enabled: false" >> $ELASTIC_SEARCH_CONFIG_FILE +RUN echo "bootstrap.system_call_filter: false" >> $ELASTIC_SEARCH_CONFIG_FILE +ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/ + +# Make our snapshot directory +USER root +RUN mkdir /snapshots && chown elasticsearch /snapshots +USER elasticsearch + +# We do not install the S3 Repo plugin here, because it is not compatible with modern +# IAM Roles. Specifically, it does not support the AWS_SESSION_TOKEN environment variable. +# We will instead take snapshots into a mounted local volume. + +# Additionally, we will rely on the base image's default entrypoint command to start the +# Elasticsearch service. 
\ No newline at end of file diff --git a/DocumentsFromSnapshotMigration/docker/docker-compose-es56.yml b/DocumentsFromSnapshotMigration/docker/docker-compose-es56.yml new file mode 100644 index 000000000..0985b84ed --- /dev/null +++ b/DocumentsFromSnapshotMigration/docker/docker-compose-es56.yml @@ -0,0 +1,47 @@ +version: '3.7' +services: + + elasticsearchsource: + image: 'migrations/empty_elasticsearch_source_5_6:latest' + platform: linux/amd64 + networks: + - migrations + environment: + - path.repo=/snapshots + - AWS_ACCESS_KEY_ID=${access_key} + - AWS_SECRET_ACCESS_KEY=${secret_key} + - AWS_SESSION_TOKEN=${session_token} + ports: + - '19200:9200' + volumes: + - ./snapshots:/snapshots + + reindex-from-snapshot: + image: 'migrations/reindex_from_snapshot:latest' + depends_on: + elasticsearchsource: + condition: service_started + opensearchtarget: + condition: service_started + networks: + - migrations + environment: + - AWS_ACCESS_KEY_ID=${access_key} + - AWS_SECRET_ACCESS_KEY=${secret_key} + - AWS_SESSION_TOKEN=${session_token} + volumes: + - ./snapshots:/snapshots + + opensearchtarget: + image: 'opensearchproject/opensearch:2.11.1' + environment: + - discovery.type=single-node + - plugins.security.disabled=true + networks: + - migrations + ports: + - "29200:9200" + +networks: + migrations: + driver: bridge diff --git a/DocumentsFromSnapshotMigration/docker/docker-compose-es68.yml b/DocumentsFromSnapshotMigration/docker/docker-compose-es68.yml index 66120a0e7..85b246425 100644 --- a/DocumentsFromSnapshotMigration/docker/docker-compose-es68.yml +++ b/DocumentsFromSnapshotMigration/docker/docker-compose-es68.yml @@ -6,7 +6,6 @@ services: networks: - migrations environment: - - discovery.type=single-node - path.repo=/snapshots - AWS_ACCESS_KEY_ID=${access_key} - AWS_SECRET_ACCESS_KEY=${secret_key} @@ -14,9 +13,8 @@ services: ports: - '19200:9200' volumes: - - snapshotStorage:/snapshots + - ./snapshots:/snapshots - # Sample command to kick off RFS here: https://github.com/opensearch-project/opensearch-migrations/blob/main/RFS/README.md#using-docker reindex-from-snapshot: image: 'migrations/reindex_from_snapshot:latest' depends_on: @@ -31,7 +29,7 @@ services: - AWS_SECRET_ACCESS_KEY=${secret_key} - AWS_SESSION_TOKEN=${session_token} volumes: - - snapshotStorage:/snapshots + - ./snapshots:/snapshots opensearchtarget: image: 'opensearchproject/opensearch:2.11.1' @@ -46,7 +44,3 @@ services: networks: migrations: driver: bridge - -volumes: - snapshotStorage: - driver: local diff --git a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java index 92dd0860c..b1bb92b1f 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/com/rfs/RfsMigrateDocuments.java @@ -25,6 +25,7 @@ import com.rfs.cms.LeaseExpireTrigger; import com.rfs.cms.OpenSearchWorkCoordinator; import com.rfs.cms.ScopedWorkCoordinator; +import com.rfs.common.ClusterVersion; import com.rfs.common.DefaultSourceRepoAccessor; import com.rfs.common.DocumentReindexer; import com.rfs.common.FileSystemRepo; @@ -35,15 +36,13 @@ import com.rfs.common.SnapshotRepo; import com.rfs.common.SnapshotShardUnpacker; import com.rfs.common.SourceRepo; +import com.rfs.common.SourceResourceProvider; +import com.rfs.common.SourceResourceProviderFactory; import com.rfs.common.TryHandlePhaseFailure; import com.rfs.common.http.ConnectionContext; import 
com.rfs.models.IndexMetadata; import com.rfs.models.ShardMetadata; import com.rfs.tracing.RootWorkCoordinationContext; -import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10; -import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10; -import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10; -import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10; import com.rfs.worker.DocumentsRunner; import com.rfs.worker.ShardWorkPreparer; import lombok.extern.slf4j.Slf4j; @@ -132,6 +131,11 @@ public static class Args { description = "Optional. The maximum number of connections to simultaneously " + "used to communicate to the target, default 10") int maxConnections = 10; + + @Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Possible " + + "values include: ES_6_8, ES_7_10, ES_7_17. Default: ES_7_10"), required = false, + converter = ClusterVersion.ArgsConverter.class) + public ClusterVersion sourceVersion = ClusterVersion.ES_7_10; } public static class NoWorkLeftException extends Exception { @@ -212,20 +216,22 @@ public static void main(String[] args) throws Exception { } else { sourceRepo = new FileSystemRepo(snapshotLocalDirPath); } - SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); - - IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); - ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo); + + SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(arguments.sourceVersion); + + SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo); + IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider); + ShardMetadata.Factory shardMetadataFactory = sourceResourceProvider.getShardMetadataFactory(repoDataProvider); SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory( repoAccessor, luceneDirPath, - ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES + sourceResourceProvider.getBufferSizeInBytes() ); run( - LuceneDocumentsReader.getFactory(ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE, - ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD), + LuceneDocumentsReader.getFactory(sourceResourceProvider.getSoftDeletesPossible(), + sourceResourceProvider.getSoftDeletesFieldData()), reindexer, workCoordinator, arguments.initialLeaseDuration, diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/EndToEndTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/EndToEndTest.java new file mode 100644 index 000000000..2734fda37 --- /dev/null +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/EndToEndTest.java @@ -0,0 +1,238 @@ +package com.rfs; + +import java.io.File; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import org.hamcrest.Matchers; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ArgumentsSource; + +import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext; +import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; +import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; +import 
org.opensearch.migrations.workcoordination.tracing.WorkCoordinationTestContext; + +import com.rfs.common.FileSystemRepo; +import com.rfs.common.FileSystemSnapshotCreator; +import com.rfs.common.OpenSearchClient; +import com.rfs.common.http.ConnectionContextTestParams; +import com.rfs.framework.SearchClusterContainer; +import com.rfs.http.ClusterOperations; +import com.rfs.worker.DocumentsRunner; +import com.rfs.worker.SnapshotRunner; +import lombok.SneakyThrows; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class EndToEndTest extends SourceTestBase { + @TempDir + private File localDirectory; + + @ParameterizedTest(name = "Target OpenSearch {0}") + @ArgumentsSource(SupportedTargetCluster.class) + public void migrateFrom_ES_v6_8(final SearchClusterContainer.Version targetVersion) throws Exception { + final var snapshotContext = SnapshotTestContext.factory().noOtelTracking(); + final var metadataContext = MetadataMigrationTestContext.factory().noOtelTracking(); + final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking(); + final var testDocMigrationContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking(); + + try ( + final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V6_8_23); + final var targetCluster = new SearchClusterContainer(targetVersion) + ) { + // === ACTION: Set up the source/target clusters === + var bothClustersStarted = CompletableFuture.allOf( + CompletableFuture.runAsync(() -> sourceCluster.start()), + CompletableFuture.runAsync(() -> targetCluster.start()) + ); + bothClustersStarted.join(); + + // Create a template + var sourceClusterOperations = new ClusterOperations(sourceCluster.getUrl()); + var templateName = "my_template_foo"; + sourceClusterOperations.createES6LegacyTemplate(templateName, "bar*"); + var indexName = "barstool"; + + // Create a document that uses the template + sourceClusterOperations.createDocument(indexName, "222", "{\"hi\":\"yay\"}"); + + // === ACTION: Take a snapshot === + var snapshotName = "my_snap"; + var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(sourceCluster.getUrl()) + .insecure(true) + .build() + .toConnectionContext()); + var snapshotCreator = new FileSystemSnapshotCreator( + snapshotName, + sourceClient, + SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, + snapshotContext.createSnapshotCreateContext() + ); + SnapshotRunner.runAndWaitForCompletion(snapshotCreator); + sourceCluster.copySnapshotData(localDirectory.toString()); + + var sourceRepo = new FileSystemRepo(localDirectory.toPath()); + var targetClient = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(targetCluster.getUrl()) + .insecure(true) + .build() + .toConnectionContext()); + + // === ACTION: Migrate the templates and indices === + migrateMetadata( + sourceRepo, + targetClient, + snapshotName, + List.of(templateName), + List.of(), + List.of(), + List.of(), + metadataContext, + sourceCluster.getVersion().getSourceVersion() + ); + + // Check that the templates were migrated + var targetClusterOperations = new ClusterOperations(targetCluster.getUrl()); + var res = targetClusterOperations.get("/_template/" + templateName); + assertThat(res.getValue(), res.getKey(), equalTo(200)); + assertThat(res.getValue(), Matchers.containsString("mappings\":{")); + + // === ACTION: Migrate the documents === + final var clockJitter = new Random(1); + var result = 
migrateDocumentsWithOneWorker( + sourceRepo, + snapshotName, + List.of(), + targetCluster.getUrl(), + clockJitter, + testDocMigrationContext, + sourceCluster.getVersion().getSourceVersion() + ); + assertThat(result, equalTo(DocumentsRunner.CompletionStatus.WORK_COMPLETED)); + + // Check that the docs were migrated + checkClusterMigrationOnFinished(sourceCluster, targetCluster, testDocMigrationContext); + } finally { + deleteTree(localDirectory.toPath()); + } + } + + @ParameterizedTest(name = "Target OpenSearch {0}") + @ArgumentsSource(SupportedTargetCluster.class) + public void migrateFrom_ES_v7_10(final SearchClusterContainer.Version targetVersion) throws Exception { + try ( + final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2); + final var targetCluster = new SearchClusterContainer(targetVersion) + ) { + migrateFrom_ES_v7_X(sourceCluster, targetCluster); + } + } + + @ParameterizedTest(name = "Target OpenSearch {0}") + @ArgumentsSource(SupportedTargetCluster.class) + public void migrateFrom_ES_v7_17(final SearchClusterContainer.Version targetVersion) throws Exception { + try ( + final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V7_17); + final var targetCluster = new SearchClusterContainer(targetVersion) + ) { + migrateFrom_ES_v7_X(sourceCluster, targetCluster); + } + } + + @SneakyThrows + private void migrateFrom_ES_v7_X( + final SearchClusterContainer sourceCluster, + final SearchClusterContainer targetCluster + ) { + final var snapshotContext = SnapshotTestContext.factory().noOtelTracking(); + final var metadataContext = MetadataMigrationTestContext.factory().noOtelTracking(); + final var workCoordinationContext = WorkCoordinationTestContext.factory().noOtelTracking(); + final var testDocMigrationContext = DocumentMigrationTestContext.factory(workCoordinationContext).noOtelTracking(); + + try { + + // === ACTION: Set up the source/target clusters === + var bothClustersStarted = CompletableFuture.allOf( + CompletableFuture.runAsync(() -> sourceCluster.start()), + CompletableFuture.runAsync(() -> targetCluster.start()) + ); + bothClustersStarted.join(); + + // Create the component and index templates + var sourceClusterOperations = new ClusterOperations(sourceCluster.getUrl()); + var compoTemplateName = "simple_component_template"; + var indexTemplateName = "simple_index_template"; + sourceClusterOperations.createES7Templates(compoTemplateName, indexTemplateName, "author", "blog*"); + var indexName = "blog_2023"; + + // Creates a document that uses the template + sourceClusterOperations.createDocument(indexName, "222", "{\"author\":\"Tobias Funke\"}"); + + // === ACTION: Take a snapshot === + var snapshotName = "my_snap"; + var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(sourceCluster.getUrl()) + .insecure(true) + .build() + .toConnectionContext()); + var snapshotCreator = new FileSystemSnapshotCreator( + snapshotName, + sourceClient, + SearchClusterContainer.CLUSTER_SNAPSHOT_DIR, + snapshotContext.createSnapshotCreateContext() + ); + SnapshotRunner.runAndWaitForCompletion(snapshotCreator); + sourceCluster.copySnapshotData(localDirectory.toString()); + + var sourceRepo = new FileSystemRepo(localDirectory.toPath()); + var targetClient = new OpenSearchClient(ConnectionContextTestParams.builder() + .host(targetCluster.getUrl()) + .insecure(true) + .build() + .toConnectionContext()); + + // === ACTION: Migrate the templates and indices === + migrateMetadata( + sourceRepo, + targetClient, + 
snapshotName, + List.of(), + List.of(compoTemplateName), + List.of(indexTemplateName), + List.of(), + metadataContext, + sourceCluster.getVersion().getSourceVersion() + ); + + // Check that the templates were migrated + var targetClusterOperations = new ClusterOperations(targetCluster.getUrl()); + var res = targetClusterOperations.get("/_index_template/" + indexTemplateName); + assertThat(res.getValue(), res.getKey(), equalTo(200)); + assertThat(res.getValue(), Matchers.containsString("composed_of\":[\"" + compoTemplateName + "\"]")); + + // === ACTION: Migrate the documents === + final var clockJitter = new Random(1); + var result = migrateDocumentsWithOneWorker( + sourceRepo, + snapshotName, + List.of(), + targetCluster.getUrl(), + clockJitter, + testDocMigrationContext, + sourceCluster.getVersion().getSourceVersion() + ); + assertThat(result, equalTo(DocumentsRunner.CompletionStatus.WORK_COMPLETED)); + + // Check that the docs were migrated + checkClusterMigrationOnFinished(sourceCluster, targetCluster, testDocMigrationContext); + } finally { + deleteTree(localDirectory.toPath()); + } + } + +} diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java index 5cec07bc3..00bb8f50a 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ParallelDocumentMigrationsTest.java @@ -1,29 +1,19 @@ package com.rfs; import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Clock; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Random; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; -import org.apache.lucene.document.Document; -import org.hamcrest.MatcherAssert; -import org.hamcrest.Matchers; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; @@ -34,37 +24,15 @@ import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; import org.opensearch.migrations.workcoordination.tracing.WorkCoordinationTestContext; -import org.opensearch.testcontainers.OpensearchContainer; -import com.rfs.cms.CoordinateWorkHttpClient; -import com.rfs.cms.LeaseExpireTrigger; -import com.rfs.cms.OpenSearchWorkCoordinator; -import com.rfs.common.DefaultSourceRepoAccessor; -import com.rfs.common.DocumentReindexer; import com.rfs.common.FileSystemRepo; import com.rfs.common.FileSystemSnapshotCreator; -import com.rfs.common.LuceneDocumentsReader; import com.rfs.common.OpenSearchClient; -import com.rfs.common.RestClient; -import com.rfs.common.SnapshotRepo; -import com.rfs.common.SnapshotShardUnpacker; -import com.rfs.common.SourceRepo; import com.rfs.common.http.ConnectionContextTestParams; import com.rfs.framework.PreloadedSearchClusterContainer; import 
com.rfs.framework.SearchClusterContainer; -import com.rfs.http.SearchClusterRequests; -import com.rfs.models.IndexMetadata; -import com.rfs.models.ShardMetadata; -import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10; -import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10; -import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10; -import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10; -import com.rfs.worker.DocumentsRunner; -import lombok.AllArgsConstructor; import lombok.Lombok; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; @Tag("longTest") @Slf4j @@ -82,7 +50,6 @@ public static Stream<Arguments> makeDocumentMigrationArgs() { .map(name -> makeParamsForBase(name)) .collect(Collectors.toList()); var targetImageNames = TARGET_IMAGES.stream() - .map(SearchClusterContainer.Version::getImageName) .collect(Collectors.toList()); var numWorkersList = List.of(1, 3, 40); return sourceImageArgs.stream() @@ -107,7 +74,7 @@ public static Stream<Arguments> makeDocumentMigrationArgs() { @MethodSource("makeDocumentMigrationArgs") public void testDocumentMigration( int numWorkers, - String targetImageName, + SearchClusterContainer.Version targetVersion, SearchClusterContainer.Version baseSourceImageVersion, String generatorImage, String[] generatorArgs @@ -126,7 +93,7 @@ public void testDocumentMigration( generatorImage, generatorArgs ); - OpensearchContainer<?> osTargetContainer = new OpensearchContainer<>(targetImageName) + SearchClusterContainer osTargetContainer = new SearchClusterContainer(targetVersion); ) { CompletableFuture.allOf(CompletableFuture.supplyAsync(() -> { esSourceContainer.start(); @@ -160,7 +127,7 @@ public void testDocumentMigration( .build() .toConnectionContext()); var sourceRepo = new FileSystemRepo(tempDir); - migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, INDEX_ALLOWLIST, testMetadataMigrationContext); + migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, List.of(), List.of(), List.of(), INDEX_ALLOWLIST, testMetadataMigrationContext, baseSourceImageVersion.getSourceVersion()); var workerFutures = new ArrayList<CompletableFuture<Integer>>(); var runCounter = new AtomicInteger(); @@ -173,10 +140,11 @@ public void testDocumentMigration( sourceRepo, SNAPSHOT_NAME, INDEX_ALLOWLIST, - osTargetContainer.getHttpHostAddress(), + osTargetContainer.getUrl(), runCounter, clockJitter, - testDocMigrationContext + testDocMigrationContext, + baseSourceImageVersion.getSourceVersion() ), executorService ) @@ -266,163 +234,4 @@ private static void verifyWorkItemCounts( Assertions.assertEquals(numCompleted, shardCount + 1); } - private void checkClusterMigrationOnFinished( - SearchClusterContainer esSourceContainer, - OpensearchContainer<?> osTargetContainer, - DocumentMigrationTestContext context - ) { - var targetClient = new RestClient(ConnectionContextTestParams.builder() - .host(osTargetContainer.getHttpHostAddress()) - .build() - .toConnectionContext() - ); - var sourceClient = new RestClient(ConnectionContextTestParams.builder() - .host(esSourceContainer.getUrl()) - .build() - .toConnectionContext() - ); - - var requests = new SearchClusterRequests(context); - var sourceMap = requests.getMapOfIndexAndDocCount(sourceClient); - var refreshResponse = targetClient.get("_refresh", context.createUnboundRequestContext()); - Assertions.assertEquals(200, refreshResponse.statusCode); - var targetMap = requests.getMapOfIndexAndDocCount(targetClient); - - MatcherAssert.assertThat(targetMap, Matchers.equalTo(sourceMap)); - } - - @AllArgsConstructor - private static 
class ExpectedMigrationWorkTerminationException extends RuntimeException { - public final RfsMigrateDocuments.NoWorkLeftException exception; - public final int numRuns; - } - - private int migrateDocumentsSequentially( - FileSystemRepo sourceRepo, - String snapshotName, - List<String> indexAllowlist, - String targetAddress, - AtomicInteger runCounter, - Random clockJitter, - DocumentMigrationTestContext testContext - ) { - for (int runNumber = 1;; ++runNumber) { - try { - var workResult = migrateDocumentsWithOneWorker( - sourceRepo, - snapshotName, - indexAllowlist, - targetAddress, - clockJitter, - testContext - ); - if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) { - return runNumber; - } else { - runCounter.incrementAndGet(); - } - } catch (RfsMigrateDocuments.NoWorkLeftException e) { - log.info( - "No work at all was found. " - + "Presuming that work was complete and that all worker processes should terminate" - ); - throw new ExpectedMigrationWorkTerminationException(e, runNumber); - } catch (Exception e) { - log.atError() - .setCause(e) - .setMessage( - () -> "Caught an exception, " - + "but just going to run again with this worker to simulate task/container recycling" - ) - .log(); - } - } - } - - private static class FilteredLuceneDocumentsReader extends LuceneDocumentsReader { - private final UnaryOperator<Document> docTransformer; - - public FilteredLuceneDocumentsReader(Path luceneFilesBasePath, boolean softDeletesPossible, String softDeletesField, UnaryOperator<Document> docTransformer) { - super(luceneFilesBasePath, softDeletesPossible, softDeletesField); - this.docTransformer = docTransformer; - } - - @Override - public Flux<Document> readDocuments() { - return super.readDocuments().map(docTransformer::apply); - } - } - - static class LeasePastError extends Error {} - - @SneakyThrows - private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker( - SourceRepo sourceRepo, - String snapshotName, - List<String> indexAllowlist, - String targetAddress, - Random clockJitter, - DocumentMigrationTestContext context - ) throws RfsMigrateDocuments.NoWorkLeftException { - var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene"); - var shouldThrow = new AtomicBoolean(); - try (var processManager = new LeaseExpireTrigger(workItemId -> { - log.atDebug().setMessage("Lease expired for " + workItemId + " making next document get throw").log(); - shouldThrow.set(true); - })) { - UnaryOperator<Document> terminatingDocumentFilter = d -> { - if (shouldThrow.get()) { - throw new LeasePastError(); - } - return d; - }; - - DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo); - SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory( - repoAccessor, - tempDir, - ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES - ); - - SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); - IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); - ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider); - final int ms_window = 1000; - final var nextClockShift = (int) (clockJitter.nextDouble() * ms_window) - (ms_window / 2); - log.info("nextClockShift=" + nextClockShift); - - - Function<Path, LuceneDocumentsReader> readerFactory = path -> new FilteredLuceneDocumentsReader(path, ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE, - ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD, terminatingDocumentFilter); - - return RfsMigrateDocuments.run( - readerFactory, - new 
DocumentReindexer(new OpenSearchClient(ConnectionContextTestParams.builder() - .host(targetAddress) - .build() - .toConnectionContext()), 1000, Long.MAX_VALUE, 1), - new OpenSearchWorkCoordinator( - new CoordinateWorkHttpClient(ConnectionContextTestParams.builder() - .host(targetAddress) - .build() - .toConnectionContext()), - TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, - UUID.randomUUID().toString(), - Clock.offset(Clock.systemUTC(), Duration.ofMillis(nextClockShift)) - ), - Duration.ofMinutes(10), - processManager, - indexMetadataFactory, - snapshotName, - indexAllowlist, - shardMetadataFactory, - unpackerFactory, - MAX_SHARD_SIZE_BYTES, - context - ); - } finally { - deleteTree(tempDir); - } - } - } diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ProcessLifecycleTest.java index 8c150e29c..e353add54 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/ProcessLifecycleTest.java @@ -20,6 +20,7 @@ import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; import org.opensearch.testcontainers.OpensearchContainer; +import com.rfs.common.ClusterVersion; import com.rfs.common.FileSystemRepo; import com.rfs.common.FileSystemSnapshotCreator; import com.rfs.common.OpenSearchClient; @@ -121,7 +122,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC ); esSourceContainer.copySnapshotData(tempDirSnapshot.toString()); - migrateMetadata(osTargetContainer, tempDirSnapshot, testMetadataMigrationContext); + migrateMetadata(osTargetContainer, tempDirSnapshot, testMetadataMigrationContext, baseSourceImageVersion.getSourceVersion()); int actualExitCode = runProcessAgainstToxicTarget(tempDirSnapshot, tempDirLucene, proxyContainer, failHow); log.atInfo().setMessage("Process exited with code: " + actualExitCode).log(); @@ -141,7 +142,8 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC private static void migrateMetadata( OpensearchContainer<?> targetContainer, Path tempDirSnapshot, - MetadataMigrationTestContext testMetadataMigrationContext + MetadataMigrationTestContext testMetadataMigrationContext, + ClusterVersion sourceVersion ) { String targetAddress = "http://" + targetContainer.getHost() @@ -152,7 +154,7 @@ private static void migrateMetadata( .build() .toConnectionContext()); var sourceRepo = new FileSystemRepo(tempDirSnapshot); - migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, INDEX_ALLOWLIST, testMetadataMigrationContext); + migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, List.of(), List.of(), List.of(), INDEX_ALLOWLIST, testMetadataMigrationContext, sourceVersion); } private static int runProcessAgainstToxicTarget( diff --git a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/SourceTestBase.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/SourceTestBase.java index 58daac45c..133a4e36b 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/com/rfs/SourceTestBase.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/SourceTestBase.java @@ -1,34 +1,68 @@ package com.rfs; + import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Clock; +import java.time.Duration; import java.util.Comparator; import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.UnaryOperator; + +import org.apache.lucene.document.Document; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext; +import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; +import com.rfs.cms.CoordinateWorkHttpClient; +import com.rfs.cms.LeaseExpireTrigger; +import com.rfs.cms.OpenSearchWorkCoordinator; import com.rfs.common.ClusterVersion; +import com.rfs.common.DefaultSourceRepoAccessor; +import com.rfs.common.DocumentReindexer; +import com.rfs.common.FileSystemRepo; +import com.rfs.common.LuceneDocumentsReader; import com.rfs.common.OpenSearchClient; +import com.rfs.common.RestClient; import com.rfs.common.SnapshotRepo; +import com.rfs.common.SnapshotShardUnpacker; import com.rfs.common.SourceRepo; +import com.rfs.common.SourceResourceProvider; +import com.rfs.common.SourceResourceProviderFactory; +import com.rfs.common.http.ConnectionContextTestParams; import com.rfs.framework.SearchClusterContainer; +import com.rfs.http.SearchClusterRequests; import com.rfs.models.GlobalMetadata; import com.rfs.models.IndexMetadata; +import com.rfs.models.ShardMetadata; import com.rfs.transformers.TransformFunctions; import com.rfs.transformers.Transformer; -import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10; -import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10; -import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10; import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11; import com.rfs.version_os_2_11.IndexCreator_OS_2_11; +import com.rfs.worker.DocumentsRunner; import com.rfs.worker.IndexRunner; import com.rfs.worker.MetadataRunner; +import lombok.AllArgsConstructor; import lombok.Lombok; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Flux; +@Slf4j public class SourceTestBase { public static final String GENERATOR_BASE_IMAGE = "migrations/elasticsearch_client_test_console:latest"; + public final static int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024; public static final String SOURCE_SERVER_ALIAS = "source"; + public final static long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600; protected static Object[] makeParamsForBase(SearchClusterContainer.Version baseSourceImage) { return new Object[] { @@ -41,22 +75,27 @@ protected static void migrateMetadata( SourceRepo sourceRepo, OpenSearchClient targetClient, String snapshotName, + List<String> legacyTemplateAllowlist, + List<String> componentTemplateAllowlist, + List<String> indexTemplateAllowlist, List<String> indexAllowlist, - MetadataMigrationTestContext context + MetadataMigrationTestContext context, + ClusterVersion sourceVersion ) { - SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo); - GlobalMetadata.Factory metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider); + SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(sourceVersion); + SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo); + GlobalMetadata.Factory metadataFactory = sourceResourceProvider.getGlobalMetadataFactory(repoDataProvider); GlobalMetadataCreator_OS_2_11 metadataCreator = new GlobalMetadataCreator_OS_2_11( targetClient, - List.of(), - List.of(), - List.of(), + legacyTemplateAllowlist, + componentTemplateAllowlist, + 
indexTemplateAllowlist, context.createMetadataMigrationContext() ); - Transformer transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, 1); + Transformer transformer = TransformFunctions.getTransformer(sourceResourceProvider.getVersion(), ClusterVersion.OS_2_11, 1); new MetadataRunner(snapshotName, metadataFactory, metadataCreator, transformer).migrateMetadata(); - IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider); + IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider); IndexCreator_OS_2_11 indexCreator = new IndexCreator_OS_2_11(targetClient); new IndexRunner( snapshotName, @@ -68,7 +107,171 @@ ).migrateIndices(); } - protected static void deleteTree(Path path) throws IOException { + @AllArgsConstructor + public static class ExpectedMigrationWorkTerminationException extends RuntimeException { + public final RfsMigrateDocuments.NoWorkLeftException exception; + public final int numRuns; + } + + public static void checkClusterMigrationOnFinished( + SearchClusterContainer esSourceContainer, + SearchClusterContainer osTargetContainer, + DocumentMigrationTestContext context + ) { + var targetClient = new RestClient(ConnectionContextTestParams.builder() + .host(osTargetContainer.getUrl()) + .build() + .toConnectionContext() + ); + var sourceClient = new RestClient(ConnectionContextTestParams.builder() + .host(esSourceContainer.getUrl()) + .build() + .toConnectionContext() + ); + + var requests = new SearchClusterRequests(context); + var sourceMap = requests.getMapOfIndexAndDocCount(sourceClient); + var refreshResponse = targetClient.get("_refresh", context.createUnboundRequestContext()); + Assertions.assertEquals(200, refreshResponse.statusCode); + var targetMap = requests.getMapOfIndexAndDocCount(targetClient); + + MatcherAssert.assertThat(targetMap, Matchers.equalTo(sourceMap)); + } + + public static int migrateDocumentsSequentially( + FileSystemRepo sourceRepo, + String snapshotName, + List<String> indexAllowlist, + String targetAddress, + AtomicInteger runCounter, + Random clockJitter, + DocumentMigrationTestContext testContext, + ClusterVersion parserVersion + ) { + for (int runNumber = 1;; ++runNumber) { + try { + var workResult = migrateDocumentsWithOneWorker( + sourceRepo, + snapshotName, + indexAllowlist, + targetAddress, + clockJitter, + testContext, + parserVersion + ); + if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) { + return runNumber; + } else { + runCounter.incrementAndGet(); + } + } catch (RfsMigrateDocuments.NoWorkLeftException e) { + log.info( + "No work at all was found. 
" + "Presuming that work was complete and that all worker processes should terminate" + ); + throw new ExpectedMigrationWorkTerminationException(e, runNumber); + } catch (Exception e) { + log.atError() + .setCause(e) + .setMessage( + () -> "Caught an exception, " + + "but just going to run again with this worker to simulate task/container recycling" + ) + .log(); + } + } + } + + public static class FilteredLuceneDocumentsReader extends LuceneDocumentsReader { + private final UnaryOperator<Document> docTransformer; + + public FilteredLuceneDocumentsReader(Path luceneFilesBasePath, boolean softDeletesPossible, String softDeletesField, UnaryOperator<Document> docTransformer) { + super(luceneFilesBasePath, softDeletesPossible, softDeletesField); + this.docTransformer = docTransformer; + } + + @Override + public Flux<Document> readDocuments() { + return super.readDocuments().map(docTransformer::apply); + } + } + + static class LeasePastError extends Error {} + + @SneakyThrows + public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker( + SourceRepo sourceRepo, + String snapshotName, + List<String> indexAllowlist, + String targetAddress, + Random clockJitter, + DocumentMigrationTestContext context, + ClusterVersion parserVersion + ) throws RfsMigrateDocuments.NoWorkLeftException { + var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene"); + var shouldThrow = new AtomicBoolean(); + try (var processManager = new LeaseExpireTrigger(workItemId -> { + log.atDebug().setMessage("Lease expired for " + workItemId + " making next document get throw").log(); + shouldThrow.set(true); + })) { + UnaryOperator<Document> terminatingDocumentFilter = d -> { + if (shouldThrow.get()) { + throw new LeasePastError(); + } + return d; + }; + + SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(parserVersion); + + DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo); + SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory( + repoAccessor, + tempDir, + sourceResourceProvider.getBufferSizeInBytes() + ); + + SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo); + IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider); + ShardMetadata.Factory shardMetadataFactory = sourceResourceProvider.getShardMetadataFactory(repoDataProvider); + final int ms_window = 1000; + final var nextClockShift = (int) (clockJitter.nextDouble() * ms_window) - (ms_window / 2); + log.info("nextClockShift=" + nextClockShift); + + + Function<Path, LuceneDocumentsReader> readerFactory = path -> new FilteredLuceneDocumentsReader(path, sourceResourceProvider.getSoftDeletesPossible(), + sourceResourceProvider.getSoftDeletesFieldData(), terminatingDocumentFilter); + + return RfsMigrateDocuments.run( + readerFactory, + new DocumentReindexer(new OpenSearchClient(ConnectionContextTestParams.builder() + .host(targetAddress) + .build() + .toConnectionContext()), 1000, Long.MAX_VALUE, 1), + new OpenSearchWorkCoordinator( + new CoordinateWorkHttpClient(ConnectionContextTestParams.builder() + .host(targetAddress) + .build() + .toConnectionContext()), + TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, + UUID.randomUUID().toString(), + Clock.offset(Clock.systemUTC(), Duration.ofMillis(nextClockShift)) + ), + Duration.ofMinutes(10), + processManager, + indexMetadataFactory, + snapshotName, + indexAllowlist, + shardMetadataFactory, + unpackerFactory, + MAX_SHARD_SIZE_BYTES, + context + ); + 
} finally { + deleteTree(tempDir); + } + } + + public static void deleteTree(Path path) throws IOException { try (var walk = Files.walk(path)) { walk.sorted(Comparator.reverseOrder()).forEach(p -> { try { diff --git a/RFS/src/test/java/com/rfs/integration/SupportedTargetCluster.java b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/SupportedTargetCluster.java similarity index 95% rename from RFS/src/test/java/com/rfs/integration/SupportedTargetCluster.java rename to DocumentsFromSnapshotMigration/src/test/java/com/rfs/SupportedTargetCluster.java index 38a687422..c71a3f1c6 100644 --- a/RFS/src/test/java/com/rfs/integration/SupportedTargetCluster.java +++ b/DocumentsFromSnapshotMigration/src/test/java/com/rfs/SupportedTargetCluster.java @@ -1,4 +1,4 @@ -package com.rfs.integration; +package com.rfs; import java.util.stream.Stream; diff --git a/RFS/src/main/java/com/rfs/common/ClusterVersion.java b/RFS/src/main/java/com/rfs/common/ClusterVersion.java index 0c6bce96d..9435da61d 100644 --- a/RFS/src/main/java/com/rfs/common/ClusterVersion.java +++ b/RFS/src/main/java/com/rfs/common/ClusterVersion.java @@ -1,5 +1,7 @@ package com.rfs.common; +import java.util.List; + import com.beust.jcommander.IStringConverter; import com.beust.jcommander.ParameterException; @@ -9,20 +11,30 @@ public enum ClusterVersion { ES_6_8, ES_7_10, + ES_7_17, + OS_1_3, OS_2_11; + public static final List<ClusterVersion> SOURCE_VERSIONS = List.of(ES_6_8, ES_7_10, ES_7_17); + public static final List<ClusterVersion> TARGET_VERSIONS = List.of(OS_2_11); + public static class ArgsConverter implements IStringConverter<ClusterVersion> { @Override public ClusterVersion convert(String value) { - switch (value) { + String lowerCasedValue = value.toLowerCase(); + switch (lowerCasedValue) { case "es_6_8": return ClusterVersion.ES_6_8; case "es_7_10": return ClusterVersion.ES_7_10; + case "es_7_17": + return ClusterVersion.ES_7_17; + case "os_1_3": + return ClusterVersion.OS_1_3; case "os_2_11": return ClusterVersion.OS_2_11; default: - throw new ParameterException("Invalid source version: " + value); + throw new ParameterException("Invalid cluster version: " + value); } } } diff --git a/RFS/src/main/java/com/rfs/common/SourceResourceProvider.java b/RFS/src/main/java/com/rfs/common/SourceResourceProvider.java new file mode 100644 index 000000000..d2b86e58c --- /dev/null +++ b/RFS/src/main/java/com/rfs/common/SourceResourceProvider.java @@ -0,0 +1,17 @@ +package com.rfs.common; + +import com.rfs.models.GlobalMetadata; +import com.rfs.models.IndexMetadata; +import com.rfs.models.ShardMetadata; + +public interface SourceResourceProvider { + GlobalMetadata.Factory getGlobalMetadataFactory(SnapshotRepo.Provider repoDataProvider); + SnapshotRepo.Provider getSnapshotRepoProvider(SourceRepo sourceRepo); + IndexMetadata.Factory getIndexMetadataFactory(SnapshotRepo.Provider repoDataProvider); + ShardMetadata.Factory getShardMetadataFactory(SnapshotRepo.Provider repoDataProvider); + + int getBufferSizeInBytes(); + boolean getSoftDeletesPossible(); + String getSoftDeletesFieldData(); + ClusterVersion getVersion(); +} diff --git a/RFS/src/main/java/com/rfs/common/SourceResourceProviderFactory.java b/RFS/src/main/java/com/rfs/common/SourceResourceProviderFactory.java new file mode 100644 index 000000000..74d17baee --- /dev/null +++ b/RFS/src/main/java/com/rfs/common/SourceResourceProviderFactory.java @@ -0,0 +1,21 @@ +package com.rfs.common; + +import com.rfs.version_es_6_8.SourceResourceProvider_ES_6_8; +import com.rfs.version_es_7_10.SourceResourceProvider_ES_7_10; + +public class 
SourceResourceProviderFactory { + public static SourceResourceProvider getProvider(ClusterVersion version) { + switch (version) { + case ES_6_8: + return new SourceResourceProvider_ES_6_8(); + case ES_7_10: + return new SourceResourceProvider_ES_7_10(); + case ES_7_17: + // We don't currently distinguish between 7.10 and 7.17 + return new SourceResourceProvider_ES_7_10(); + default: + throw new IllegalArgumentException("Invalid version: " + version); + } + } + +} diff --git a/RFS/src/main/java/com/rfs/version_es_6_8/SourceResourceProvider_ES_6_8.java b/RFS/src/main/java/com/rfs/version_es_6_8/SourceResourceProvider_ES_6_8.java new file mode 100644 index 000000000..d33bee39d --- /dev/null +++ b/RFS/src/main/java/com/rfs/version_es_6_8/SourceResourceProvider_ES_6_8.java @@ -0,0 +1,53 @@ +package com.rfs.version_es_6_8; + +import com.rfs.common.ClusterVersion; +import com.rfs.common.SnapshotRepo; +import com.rfs.common.SourceRepo; +import com.rfs.common.SourceResourceProvider; +import com.rfs.models.GlobalMetadata; +import com.rfs.models.IndexMetadata; +import com.rfs.models.ShardMetadata; + +public class SourceResourceProvider_ES_6_8 implements SourceResourceProvider { + + @Override + public GlobalMetadata.Factory getGlobalMetadataFactory(SnapshotRepo.Provider repoDataProvider) { + return new GlobalMetadataFactory_ES_6_8(repoDataProvider); + } + + @Override + public SnapshotRepo.Provider getSnapshotRepoProvider(SourceRepo sourceRepo) { + return new SnapshotRepoProvider_ES_6_8(sourceRepo); + } + + @Override + public IndexMetadata.Factory getIndexMetadataFactory(SnapshotRepo.Provider repoDataProvider) { + return new IndexMetadataFactory_ES_6_8(repoDataProvider); + } + + @Override + public ShardMetadata.Factory getShardMetadataFactory(SnapshotRepo.Provider repoDataProvider) { + return new ShardMetadataFactory_ES_6_8(repoDataProvider); + } + + @Override + public int getBufferSizeInBytes() { + return ElasticsearchConstants_ES_6_8.BUFFER_SIZE_IN_BYTES; + } + + @Override + public boolean getSoftDeletesPossible() { + return ElasticsearchConstants_ES_6_8.SOFT_DELETES_POSSIBLE; + } + + @Override + public String getSoftDeletesFieldData() { + return ElasticsearchConstants_ES_6_8.SOFT_DELETES_FIELD; + } + + @Override + public ClusterVersion getVersion() { + return ClusterVersion.ES_6_8; + } + +} diff --git a/RFS/src/main/java/com/rfs/version_es_7_10/SourceResourceProvider_ES_7_10.java b/RFS/src/main/java/com/rfs/version_es_7_10/SourceResourceProvider_ES_7_10.java new file mode 100644 index 000000000..81d315b39 --- /dev/null +++ b/RFS/src/main/java/com/rfs/version_es_7_10/SourceResourceProvider_ES_7_10.java @@ -0,0 +1,53 @@ +package com.rfs.version_es_7_10; + +import com.rfs.common.ClusterVersion; +import com.rfs.common.SnapshotRepo; +import com.rfs.common.SourceRepo; +import com.rfs.common.SourceResourceProvider; +import com.rfs.models.GlobalMetadata; +import com.rfs.models.IndexMetadata; +import com.rfs.models.ShardMetadata; + +public class SourceResourceProvider_ES_7_10 implements SourceResourceProvider { + + @Override + public GlobalMetadata.Factory getGlobalMetadataFactory(SnapshotRepo.Provider repoDataProvider) { + return new GlobalMetadataFactory_ES_7_10(repoDataProvider); + } + + @Override + public SnapshotRepo.Provider getSnapshotRepoProvider(SourceRepo sourceRepo) { + return new SnapshotRepoProvider_ES_7_10(sourceRepo); + } + + @Override + public IndexMetadata.Factory getIndexMetadataFactory(SnapshotRepo.Provider repoDataProvider) { + return new IndexMetadataFactory_ES_7_10(repoDataProvider); + 
} + + @Override + public ShardMetadata.Factory getShardMetadataFactory(SnapshotRepo.Provider repoDataProvider) { + return new ShardMetadataFactory_ES_7_10(repoDataProvider); + } + + @Override + public int getBufferSizeInBytes() { + return ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES; + } + + @Override + public boolean getSoftDeletesPossible() { + return ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE; + } + + @Override + public String getSoftDeletesFieldData() { + return ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD; + } + + @Override + public ClusterVersion getVersion() { + return ClusterVersion.ES_7_10; + } + +} diff --git a/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java b/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java index e6d6d3ad4..db55fba22 100644 --- a/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java +++ b/RFS/src/test/java/com/rfs/common/LuceneDocumentsReaderTest.java @@ -32,9 +32,6 @@ import com.rfs.common.TestResources.Snapshot; import com.rfs.models.ShardMetadata; -import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10; -import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10; -import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; @@ -71,21 +68,22 @@ public void tearDown() throws IOException { log.atDebug().log("Temporary directory deleted."); } - static Stream<Arguments> provideSnapshots_ES_7_10() { + static Stream<Arguments> provideSnapshots() { return Stream.of( - Arguments.of(TestResources.SNAPSHOT_ES_7_10_W_SOFT), - Arguments.of(TestResources.SNAPSHOT_ES_7_10_WO_SOFT) + Arguments.of(TestResources.SNAPSHOT_ES_6_8, SourceResourceProviderFactory.getProvider(ClusterVersion.ES_6_8)), + Arguments.of(TestResources.SNAPSHOT_ES_7_10_W_SOFT, SourceResourceProviderFactory.getProvider(ClusterVersion.ES_7_10)), + Arguments.of(TestResources.SNAPSHOT_ES_7_10_WO_SOFT, SourceResourceProviderFactory.getProvider(ClusterVersion.ES_7_10)) ); } @ParameterizedTest - @MethodSource("provideSnapshots_ES_7_10") - public void ReadDocuments_ES_7_10_AsExpected(Snapshot snapshot) { + @MethodSource("provideSnapshots") + public void ReadDocuments_AsExpected(Snapshot snapshot, SourceResourceProvider sourceResourceProvider) throws Exception { final var repo = new FileSystemRepo(snapshot.dir); - SnapshotRepo.Provider snapShotProvider = new SnapshotRepoProvider_ES_7_10(repo); + SnapshotRepo.Provider snapShotProvider = sourceResourceProvider.getSnapshotRepoProvider(repo); DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo); - final ShardMetadata shardMetadata = new ShardMetadataFactory_ES_7_10(snapShotProvider).fromRepo(snapshot.name, "test_updates_deletes", 0); + final ShardMetadata shardMetadata = sourceResourceProvider.getShardMetadataFactory(snapShotProvider).fromRepo(snapshot.name, "test_updates_deletes", 0); SnapshotShardUnpacker unpacker = new SnapshotShardUnpacker( repoAccessor, @@ -98,8 +96,8 @@ public void ReadDocuments_ES_7_10_AsExpected(Snapshot snapshot) { // Use the LuceneDocumentsReader to get the documents Flux<Document> documents = new LuceneDocumentsReader( luceneDir, - ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE, - ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD + sourceResourceProvider.getSoftDeletesPossible(), + sourceResourceProvider.getSoftDeletesFieldData() ).readDocuments() .sort(Comparator.comparing(doc -> Uid.decodeId(doc.getBinaryValue("_id").bytes))); // Sort for consistent order given 
LuceneDocumentsReader may interleave
diff --git a/RFS/src/test/java/com/rfs/common/TestResources.java b/RFS/src/test/java/com/rfs/common/TestResources.java
index e10858e4a..9672e1787 100644
--- a/RFS/src/test/java/com/rfs/common/TestResources.java
+++ b/RFS/src/test/java/com/rfs/common/TestResources.java
@@ -12,12 +12,30 @@ public static class Snapshot {
         public final String name;
     }
 
+    public static final Snapshot SNAPSHOT_ES_5_6;
+    public static final Snapshot SNAPSHOT_ES_6_8;
+    public static final Snapshot SNAPSHOT_ES_6_8_MERGED;
     public static final Snapshot SNAPSHOT_ES_7_10_W_SOFT;
     public static final Snapshot SNAPSHOT_ES_7_10_WO_SOFT;
 
     static {
         Path rfsBaseDir = Paths.get(System.getProperty("user.dir"));
 
+        SNAPSHOT_ES_5_6 = new Snapshot(
+            rfsBaseDir.resolve(Paths.get("test-resources", "snapshots", "ES_5_6_Updates_Deletes")),
+            "rfs_snapshot"
+        );
+
+        SNAPSHOT_ES_6_8 = new Snapshot(
+            rfsBaseDir.resolve(Paths.get("test-resources", "snapshots", "ES_6_8_Updates_Deletes_Native")),
+            "rfs_snapshot"
+        );
+
+        SNAPSHOT_ES_6_8_MERGED = new Snapshot(
+            rfsBaseDir.resolve(Paths.get("test-resources", "snapshots", "ES_6_8_Updates_Deletes_Merged")),
+            "rfs_snapshot"
+        );
+
         SNAPSHOT_ES_7_10_W_SOFT = new Snapshot(
             rfsBaseDir.resolve(Paths.get("test-resources", "snapshots", "ES_7_10_Updates_Deletes_w_Soft")),
             "rfs_snapshot"
diff --git a/RFS/src/test/java/com/rfs/integration/EndToEndTest.java b/RFS/src/test/java/com/rfs/integration/EndToEndTest.java
deleted file mode 100644
index 874d9a874..000000000
--- a/RFS/src/test/java/com/rfs/integration/EndToEndTest.java
+++ /dev/null
@@ -1,282 +0,0 @@
-package com.rfs.integration;
-
-import java.io.File;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import org.hamcrest.Matchers;
-import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ArgumentsSource;
-
-import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext;
-import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;
-
-import com.rfs.common.ClusterVersion;
-import com.rfs.common.FileSystemRepo;
-import com.rfs.common.FileSystemSnapshotCreator;
-import com.rfs.common.OpenSearchClient;
-import com.rfs.common.http.ConnectionContextTestParams;
-import com.rfs.framework.SearchClusterContainer;
-import com.rfs.framework.SimpleRestoreFromSnapshot;
-import com.rfs.http.ClusterOperations;
-import com.rfs.transformers.TransformFunctions;
-import com.rfs.version_es_6_8.GlobalMetadataFactory_ES_6_8;
-import com.rfs.version_es_6_8.IndexMetadataFactory_ES_6_8;
-import com.rfs.version_es_6_8.SnapshotRepoProvider_ES_6_8;
-import com.rfs.version_es_7_10.GlobalMetadataFactory_ES_7_10;
-import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
-import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
-import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
-import com.rfs.version_os_2_11.IndexCreator_OS_2_11;
-import com.rfs.worker.IndexRunner;
-import com.rfs.worker.MetadataRunner;
-import com.rfs.worker.SnapshotRunner;
-import lombok.SneakyThrows;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * Tests focused on setting up whole source clusters, performing a migration, and validation on the target cluster
- */
-public class EndToEndTest {
-
-    @TempDir
-    private File localDirectory;
-
-    protected SimpleRestoreFromSnapshot simpleRfsInstance;
-
-    @ParameterizedTest(name = "Target OpenSearch {0}")
-    @ArgumentsSource(SupportedTargetCluster.class)
-    public void migrateFrom_ES_v6_8(final SearchClusterContainer.Version targetVersion) throws Exception {
-        var snapshotContext = SnapshotTestContext.factory().noOtelTracking();
-        var metadataContext = MetadataMigrationTestContext.factory().noOtelTracking();
-        try (
-            final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V6_8_23);
-            final var targetCluster = new SearchClusterContainer(targetVersion)
-        ) {
-            // Setup
-            // Start the clusters for testing
-            var bothClustersStarted = CompletableFuture.allOf(
-                CompletableFuture.runAsync(() -> sourceCluster.start()),
-                CompletableFuture.runAsync(() -> targetCluster.start())
-            );
-            bothClustersStarted.join();
-
-            // Setup
-            var sourceClusterOperations = new ClusterOperations(sourceCluster.getUrl());
-            var templateName = "my_template_foo";
-            sourceClusterOperations.createES6LegacyTemplate(templateName, "bar*");
-            var indexName = "barstool";
-            // Creates a document that uses the template
-            sourceClusterOperations.createDocument(indexName, "222", "{\"hi\":\"yay\"}");
-
-            // Take a snapshot
-            var snapshotName = "my_snap";
-            var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
-                .host(sourceCluster.getUrl())
-                .insecure(true)
-                .build()
-                .toConnectionContext());
-            var snapshotCreator = new FileSystemSnapshotCreator(
-                snapshotName,
-                sourceClient,
-                SearchClusterContainer.CLUSTER_SNAPSHOT_DIR,
-                snapshotContext.createSnapshotCreateContext()
-            );
-            SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
-            sourceCluster.copySnapshotData(localDirectory.toString());
-
-            var sourceRepo = new FileSystemRepo(localDirectory.toPath());
-            var targetClient = new OpenSearchClient(ConnectionContextTestParams.builder()
-                .host(targetCluster.getUrl())
-                .insecure(true)
-                .build()
-                .toConnectionContext());
-
-            var repoDataProvider = new SnapshotRepoProvider_ES_6_8(sourceRepo);
-            var metadataFactory = new GlobalMetadataFactory_ES_6_8(repoDataProvider);
-            var metadataCreator = new GlobalMetadataCreator_OS_2_11(
-                targetClient,
-                null,
-                null,
-                null,
-                metadataContext.createMetadataMigrationContext()
-            );
-            var transformer = TransformFunctions.getTransformer(ClusterVersion.ES_6_8, ClusterVersion.OS_2_11, 1);
-            // Action
-            // Migrate metadata
-            new MetadataRunner(snapshotName, metadataFactory, metadataCreator, transformer).migrateMetadata();
-
-            // Validation
-            var targetClusterOperations = new ClusterOperations(targetCluster.getUrl());
-            var res = targetClusterOperations.get("/_template/" + templateName);
-            assertThat(res.getValue(), res.getKey(), equalTo(200));
-            // Be sure that the mapping type on the template is an object
-            assertThat(res.getValue(), Matchers.containsString("mappings\":{"));
-
-            res = targetClusterOperations.get("/" + indexName);
-            assertThat("Shouldn't exist yet, body:\n" + res.getValue(), res.getKey(), equalTo(404));
-
-            // Action
-            // Migrate indices
-            var indexMetadataFactory = new IndexMetadataFactory_ES_6_8(repoDataProvider);
-            var indexCreator = new IndexCreator_OS_2_11(targetClient);
-            new IndexRunner(
-                snapshotName,
-                indexMetadataFactory,
-                indexCreator,
-                transformer,
-                List.of(),
-                metadataContext.createIndexContext()
-            ).migrateIndices();
-
-            res = targetClusterOperations.get("/barstool");
-            assertThat(res.getValue(), res.getKey(), equalTo(200));
-
-            // Action
-            // PSEUDO Migrate documents
-            // PSEUDO: Verify creation of 2 index templates on the cluster
-            // PSEUDO: Verify creation of 5 indices on the cluster
-            //   - logs-01-2345
-            //   - logs-12-3456
-            //   - data-rolling
-            //   - playground
-            //   - playground2
-            // PSEUDO: Verify documents
-
-            // PSEUDO: Additional validation:
-            if (SearchClusterContainer.OS_V2_14_0.equals(targetVersion)) {
-                // - Mapping type parameter is removed
-                //   https://opensearch.org/docs/latest/breaking-changes/#remove-mapping-types-parameter
-            }
-        }
-    }
-
-    @ParameterizedTest(name = "Target OpenSearch {0}")
-    @ArgumentsSource(SupportedTargetCluster.class)
-    public void migrateFrom_ES_v7_10(final SearchClusterContainer.Version targetVersion) throws Exception {
-        try (
-            final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2);
-            final var targetCluster = new SearchClusterContainer(targetVersion)
-        ) {
-            migrateFrom_ES_v7_X(sourceCluster, targetCluster);
-        }
-    }
-
-    @ParameterizedTest(name = "Target OpenSearch {0}")
-    @ArgumentsSource(SupportedTargetCluster.class)
-    public void migrateFrom_ES_v7_17(final SearchClusterContainer.Version targetVersion) throws Exception {
-        try (
-            final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V7_17);
-            final var targetCluster = new SearchClusterContainer(targetVersion)
-        ) {
-            migrateFrom_ES_v7_X(sourceCluster, targetCluster);
-        }
-    }
-
-    @SneakyThrows
-    private void migrateFrom_ES_v7_X(
-        final SearchClusterContainer sourceCluster,
-        final SearchClusterContainer targetCluster
-    ) {
-
-        var snapshotContext = SnapshotTestContext.factory().noOtelTracking();
-        var metadataContext = MetadataMigrationTestContext.factory().noOtelTracking();
-
-        // ACTION: Set up the source/target clusters
-        var bothClustersStarted = CompletableFuture.allOf(
-            CompletableFuture.runAsync(() -> sourceCluster.start()),
-            CompletableFuture.runAsync(() -> targetCluster.start())
-        );
-        bothClustersStarted.join();
-
-        // Create the component and index templates
-        var sourceClusterOperations = new ClusterOperations(sourceCluster.getUrl());
-        var compoTemplateName = "simple_component_template";
-        var indexTemplateName = "simple_index_template";
-        sourceClusterOperations.createES7Templates(compoTemplateName, indexTemplateName, "author", "blog*");
-        var indexName = "blog_2023";
-
-        // Creates a document that uses the template
-        sourceClusterOperations.createDocument(indexName, "222", "{\"author\":\"Tobias Funke\"}");
-
-        // ACTION: Take a snapshot
-        var snapshotName = "my_snap";
-        var sourceClient = new OpenSearchClient(ConnectionContextTestParams.builder()
-            .host(sourceCluster.getUrl())
-            .insecure(true)
-            .build()
-            .toConnectionContext());
-        var snapshotCreator = new FileSystemSnapshotCreator(
-            snapshotName,
-            sourceClient,
-            SearchClusterContainer.CLUSTER_SNAPSHOT_DIR,
-            snapshotContext.createSnapshotCreateContext()
-        );
-        SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
-        sourceCluster.copySnapshotData(localDirectory.toString());
-
-        var sourceRepo = new FileSystemRepo(localDirectory.toPath());
-        var targetClient = new OpenSearchClient(ConnectionContextTestParams.builder()
-            .host(targetCluster.getUrl())
-            .insecure(true)
-            .build()
-            .toConnectionContext());
-        // ACTION: Migrate the templates
-        var repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
-        var metadataFactory = new GlobalMetadataFactory_ES_7_10(repoDataProvider);
-        var metadataCreator = new GlobalMetadataCreator_OS_2_11(
-            targetClient,
-            List.of(),
-            List.of(compoTemplateName),
-            List.of(indexTemplateName),
-            metadataContext.createMetadataMigrationContext()
-        );
-        var transformer = TransformFunctions.getTransformer(ClusterVersion.ES_7_10, ClusterVersion.OS_2_11, 1);
-        new MetadataRunner(snapshotName, metadataFactory, metadataCreator, transformer).migrateMetadata();
-
-        // Check that the templates were migrated
-        var targetClusterOperations = new ClusterOperations(targetCluster.getUrl());
-        var res = targetClusterOperations.get("/_index_template/" + indexTemplateName);
-        assertThat(res.getValue(), res.getKey(), equalTo(200));
-        assertThat(res.getValue(), Matchers.containsString("composed_of\":[\"" + compoTemplateName + "\"]"));
-
-        // ACTION: Migrate the indices
-        res = targetClusterOperations.get("/" + indexName);
-        assertThat("Shouldn't exist yet, body:\n" + res.getValue(), res.getKey(), equalTo(404));
-
-        var indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
-        var indexCreator = new IndexCreator_OS_2_11(targetClient);
-        new IndexRunner(
-            snapshotName,
-            indexMetadataFactory,
-            indexCreator,
-            transformer,
-            List.of(),
-            metadataContext.createIndexContext()
-        ).migrateIndices();
-
-        // Check that the index was migrated
-        res = targetClusterOperations.get("/" + indexName);
-        assertThat(res.getValue(), res.getKey(), equalTo(200));
-
-        // Action
-        // PSEUDO Migrate documents
-        // PSEUDO: Verify creation of 2 index templates on the cluster
-        // PSEUDO: Verify creation of 5 indices on the cluster
-        //   - logs-01-2345
-        //   - logs-12-3456
-        //   - data-rolling
-        //   - playground
-        //   - playground2
-        // PSEUDO: Verify documents
-
-        // PSEUDO: Additional validation:
-        if (SearchClusterContainer.OS_V2_14_0.equals(targetCluster.getVersion())) {
-            // - Mapping type parameter is removed
-            //   https://opensearch.org/docs/latest/breaking-changes/#remove-mapping-types-parameter
-        }
-    }
-}
diff --git a/RFS/src/testFixtures/java/com/rfs/framework/PreloadedSearchClusterContainer.java b/RFS/src/testFixtures/java/com/rfs/framework/PreloadedSearchClusterContainer.java
index 1b6870669..0c0e20357 100644
--- a/RFS/src/testFixtures/java/com/rfs/framework/PreloadedSearchClusterContainer.java
+++ b/RFS/src/testFixtures/java/com/rfs/framework/PreloadedSearchClusterContainer.java
@@ -14,6 +14,7 @@ public PreloadedSearchClusterContainer(
             new ElasticsearchVersion(
                 new PreloadedDataContainerOrchestrator(baseVersion, serverAlias, dataLoaderImageName, generatorArgs)
                     .getReadyImageName(pullIfUnavailable),
+                baseVersion.getSourceVersion(),
                 baseVersion.prettyName + "_preloaded"
             )
         );
diff --git a/RFS/src/testFixtures/java/com/rfs/framework/SearchClusterContainer.java b/RFS/src/testFixtures/java/com/rfs/framework/SearchClusterContainer.java
index 51d2c3c71..aacd466ef 100644
--- a/RFS/src/testFixtures/java/com/rfs/framework/SearchClusterContainer.java
+++ b/RFS/src/testFixtures/java/com/rfs/framework/SearchClusterContainer.java
@@ -6,6 +6,7 @@
 
 import com.google.common.collect.ImmutableMap;
 
+import com.rfs.common.ClusterVersion;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
@@ -22,19 +23,22 @@ public class SearchClusterContainer extends GenericContainer