Skip to content

Commit

Permalink
Expand target support to OpenSearch v1.3 (#1074)
Browse files Browse the repository at this point in the history
* Expand target support to  OpenSearch v1.3

- Updated the java end to end test cases to read from the same list for
target platforms.
- Update ParallelDocumentMigrationsTest to verify behavior on all supported target cluster types

Signed-off-by: Peter Nied <[email protected]>
Signed-off-by: Peter Nied <[email protected]>
  • Loading branch information
peternied authored Oct 17, 2024
1 parent 98d1318 commit d7224ce
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
Expand All @@ -20,7 +21,8 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -30,52 +32,33 @@ public class EndToEndTest extends SourceTestBase {
@TempDir
private File localDirectory;

@ParameterizedTest(name = "Target {0}")
@ArgumentsSource(SupportedTargetCluster.class)
public void migrateFrom_ES_v6_8(final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
try (
final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V6_8_23);
final var targetCluster = new SearchClusterContainer(targetVersion)
) {
migrateFrom_ES(sourceCluster, targetCluster);
}
}
private static Stream<Arguments> scenarios() {
var scenarios = Stream.<Arguments>builder();

@ParameterizedTest(name = "Target {0}")
@ArgumentsSource(SupportedTargetCluster.class)
public void migrateFrom_ES_v7_10(final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
try (
final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2);
final var targetCluster = new SearchClusterContainer(targetVersion)
) {
migrateFrom_ES(sourceCluster, targetCluster);
for (var sourceCluster : SupportedClusters.sources()) {
for (var targetCluster : SupportedClusters.targets()) {
scenarios.add(Arguments.of(sourceCluster, targetCluster));
}
}
}

@ParameterizedTest(name = "Target {0}")
@ArgumentsSource(SupportedTargetCluster.class)
public void migrateFrom_ES_v7_17(final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
try (
final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V7_17);
final var targetCluster = new SearchClusterContainer(targetVersion)
) {
migrateFrom_ES(sourceCluster, targetCluster);
}
return scenarios.build();
}

@ParameterizedTest(name = "Target {0}")
@ArgumentsSource(SupportedTargetCluster.class)
public void migrateFrom_OS_v1_3(final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
@ParameterizedTest(name = "Source {0} to Target {1}")
@MethodSource(value = "scenarios")
public void migrationDocuments(
final SearchClusterContainer.ContainerVersion sourceVersion,
final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
try (
final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.OS_V1_3_16);
final var sourceCluster = new SearchClusterContainer(sourceVersion);
final var targetCluster = new SearchClusterContainer(targetVersion)
) {
migrateFrom_ES(sourceCluster, targetCluster);
migrationDocumentsWithClusters(sourceCluster, targetCluster);
}
}

@SneakyThrows
private void migrateFrom_ES(
private void migrationDocumentsWithClusters(
final SearchClusterContainer sourceCluster,
final SearchClusterContainer targetCluster
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.migrations.CreateSnapshot;
Expand Down Expand Up @@ -37,28 +36,17 @@
@Slf4j
public class ParallelDocumentMigrationsTest extends SourceTestBase {

static final List<SearchClusterContainer.ContainerVersion> SOURCE_IMAGES = List.of(
SearchClusterContainer.ES_V7_10_2
);
static final List<SearchClusterContainer.ContainerVersion> TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0);

public static Stream<Arguments> makeDocumentMigrationArgs() {
var targetImageNames = TARGET_IMAGES.stream()
.collect(Collectors.toList());
var numWorkersList = List.of(1, 3, 40);
var compressionEnabledList = List.of(true, false);
return SOURCE_IMAGES.stream()
return SupportedClusters.targets().stream()
.flatMap(
sourceImage -> targetImageNames.stream()
.flatMap(
targetImage -> numWorkersList.stream()
.flatMap(numWorkers -> compressionEnabledList.stream().map(compression -> Arguments.of(
numWorkers,
targetImage,
sourceImage,
compression
))
)
targetImage -> numWorkersList.stream()
.flatMap(numWorkers -> compressionEnabledList.stream().map(compression -> Arguments.of(
numWorkers,
targetImage,
compression
))
)
);
}
Expand All @@ -68,14 +56,15 @@ public static Stream<Arguments> makeDocumentMigrationArgs() {
public void testDocumentMigration(
int numWorkers,
SearchClusterContainer.ContainerVersion targetVersion,
SearchClusterContainer.ContainerVersion sourceVersion,
boolean compressionEnabled
) throws Exception {
var executorService = Executors.newFixedThreadPool(numWorkers);
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();
final var testDocMigrationContext = DocumentMigrationTestContext.factory()
.withAllTracking();

// The source container version doesn't impact the test focus to stress work coordination store with many worker instances.
final var sourceVersion = SearchClusterContainer.ES_V7_10_2;
try (
var esSourceContainer = new SearchClusterContainer(sourceVersion);
var osTargetContainer = new SearchClusterContainer(targetVersion);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.migrations.bulkload.SupportedClusters;
import org.opensearch.migrations.bulkload.common.FileSystemSnapshotCreator;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer.ContainerVersion;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
import org.opensearch.migrations.bulkload.worker.SnapshotRunner;
Expand Down Expand Up @@ -44,54 +46,29 @@ class EndToEndTest {
private File localDirectory;

private static Stream<Arguments> scenarios() {
return Stream.of(
Arguments.of(TransferMedium.Http, MetadataCommands.EVALUATE)
// Arguments.of(TransferMedium.SnapshotImage, MetadataCommands.MIGRATE),
// Arguments.of(TransferMedium.Http, MetadataCommands.MIGRATE)
);
}

@ParameterizedTest(name = "Command {1}, Medium of transfer {0}")
@MethodSource(value = "scenarios")
void metadataMigrateFrom_ES_v6_8(TransferMedium medium, MetadataCommands command) throws Exception {
try (
final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V6_8_23);
final var targetCluster = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)
) {
migrateFrom_ES(sourceCluster, targetCluster, medium, command);
var scenarios = Stream.<Arguments>builder();

for (var sourceCluster : SupportedClusters.sources()) {
for (var targetCluster : SupportedClusters.targets()) {
for (var command : MetadataCommands.values()) {
scenarios.add(Arguments.of(sourceCluster, targetCluster, TransferMedium.Http, command));
}
// Only test snapshot for migrate case
scenarios.add(Arguments.of(sourceCluster, targetCluster, TransferMedium.SnapshotImage, MetadataCommands.MIGRATE));
}
}
}

@ParameterizedTest(name = "Command {1}, Medium of transfer {0}")
@MethodSource(value = "scenarios")
void metadataMigrateFrom_ES_v7_17(TransferMedium medium, MetadataCommands command) throws Exception {
try (
final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V7_17);
final var targetCluster = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)
) {
migrateFrom_ES(sourceCluster, targetCluster, medium, command);
}
}

@ParameterizedTest(name = "Command {1}, Medium of transfer {0}")
@MethodSource(value = "scenarios")
void metadataMigrateFrom_ES_v7_10(TransferMedium medium, MetadataCommands command) throws Exception {
try (
final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2);
final var targetCluster = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)
) {
migrateFrom_ES(sourceCluster, targetCluster, medium, command);
}
return scenarios.build();
}

@ParameterizedTest(name = "Command {1}, Medium of transfer {0}")
@ParameterizedTest(name = "From version {0} to version {1}, Command {2}, Medium of transfer {3}")
@MethodSource(value = "scenarios")
void metadataMigrateFrom_OS_v1_3(TransferMedium medium, MetadataCommands command) throws Exception {
void metadataCommand(ContainerVersion sourceVersion, ContainerVersion targetVersion, TransferMedium medium, MetadataCommands command) throws Exception {
try (
final var sourceCluster = new SearchClusterContainer(SearchClusterContainer.OS_V1_3_16);
final var targetCluster = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)
final var sourceCluster = new SearchClusterContainer(sourceVersion);
final var targetCluster = new SearchClusterContainer(targetVersion)
) {
migrateFrom_ES(sourceCluster, targetCluster, medium, command);
metadataCommandOnClusters(sourceCluster, targetCluster, medium, command);
}
}

Expand All @@ -101,7 +78,7 @@ private enum TransferMedium {
}

@SneakyThrows
private void migrateFrom_ES(
private void metadataCommandOnClusters(
final SearchClusterContainer sourceCluster,
final SearchClusterContainer targetCluster,
final TransferMedium medium,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static Transformer getTransformer(
Version targetVersion,
int dimensionality
) {
if (VersionMatchers.isOS_2_X.test(targetVersion)) {
if (VersionMatchers.isOS_2_X.or(VersionMatchers.isOS_1_X).test(targetVersion)) {
if (VersionMatchers.isES_6_X.test(sourceVersion)) {
return new Transformer_ES_6_8_to_OS_2_11(dimensionality);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ public class RemoteWriter_OS_2_11 implements RemoteCluster, ClusterWriter {

@Override
public boolean compatibleWith(Version version) {
return VersionMatchers.isOS_2_X.test(version);
return VersionMatchers.isOS_2_X.or
(VersionMatchers.isOS_1_X)
.test(version);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.opensearch.migrations.bulkload;

import java.util.List;

import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer.ContainerVersion;

import lombok.experimental.UtilityClass;

/**
* Defines all supported clusters
*/
@UtilityClass
public class SupportedClusters {

public static List<ContainerVersion> sources() {
return List.of(
SearchClusterContainer.ES_V6_8_23,
SearchClusterContainer.ES_V7_10_2,
SearchClusterContainer.ES_V7_17,
SearchClusterContainer.OS_V1_3_16
);
}

public static List<ContainerVersion> targets() {
return List.of(
SearchClusterContainer.OS_V1_3_16,
SearchClusterContainer.OS_V2_14_0
);
}
}

0 comments on commit d7224ce

Please sign in to comment.