diff --git a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/MigratorEvaluatorBase.java b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/MigratorEvaluatorBase.java index ab6c1fc43..b9871e08b 100644 --- a/MetadataMigration/src/main/java/org/opensearch/migrations/commands/MigratorEvaluatorBase.java +++ b/MetadataMigration/src/main/java/org/opensearch/migrations/commands/MigratorEvaluatorBase.java @@ -40,7 +40,7 @@ protected Clusters createClusters() { var sourceCluster = clusterReaderCliExtractor.extractClusterReader(); clusters.source(sourceCluster); - var targetCluster = ClusterProviderRegistry.getRemoteWriter(arguments.targetArgs.toConnectionContext(), arguments.dataFilterArgs); + var targetCluster = ClusterProviderRegistry.getRemoteWriter(arguments.targetArgs.toConnectionContext(), null, arguments.dataFilterArgs); clusters.target(targetCluster); return clusters.build(); } diff --git a/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java b/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java index 4afdcf59f..e1b2f350c 100644 --- a/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java +++ b/MetadataMigration/src/test/java/org/opensearch/migrations/EndToEndTest.java @@ -115,7 +115,7 @@ private void migrateFrom_ES( bothClustersStarted.join(); Version sourceVersion = sourceCluster.getContainerVersion().getVersion(); - var sourceIsES6_8 = VersionMatchers.isES_6_8.test(sourceVersion); + var sourceIsES6_8 = VersionMatchers.isES_6_X.test(sourceVersion); var sourceIsES7_X = VersionMatchers.isES_7_X.test(sourceVersion) || VersionMatchers.isOS_1_X.test(sourceVersion); if (!(sourceIsES6_8 || sourceIsES7_X)) { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java index efbb85439..4dd9df6d1 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/transformers/TransformFunctions.java @@ -23,7 +23,7 @@ public static Transformer getTransformer( int dimensionality ) { if (VersionMatchers.isOS_2_X.test(targetVersion)) { - if (VersionMatchers.isES_6_8.test(sourceVersion)) { + if (VersionMatchers.isES_6_X.test(sourceVersion)) { return new Transformer_ES_6_8_to_OS_2_11(dimensionality); } if (VersionMatchers.equalOrGreaterThanES_7_10.test(sourceVersion)) { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/SnapshotReader_ES_6_8.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/SnapshotReader_ES_6_8.java index 74a11f07a..34fdb3ba4 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/SnapshotReader_ES_6_8.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_6_8/SnapshotReader_ES_6_8.java @@ -16,12 +16,13 @@ public class SnapshotReader_ES_6_8 implements ClusterSnapshotReader { @Override public boolean compatibleWith(Version version) { - return VersionMatchers.isES_6_8.test(version); + return VersionMatchers.isES_6_X.test(version); } @Override - public void initialize(SourceRepo sourceRepo) { + public ClusterSnapshotReader initialize(SourceRepo sourceRepo) { this.sourceRepo = sourceRepo; + return this; } @Override @@ -60,8 +61,9 @@ public Version getVersion() { } @Override - public void initialize(Version version) { + public ClusterSnapshotReader initialize(Version version) { this.version = version; + return this; } public String toString() { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotReader_ES_7_10.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotReader_ES_7_10.java index b3f983316..c79f70e18 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotReader_ES_7_10.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_es_7_10/SnapshotReader_ES_7_10.java @@ -22,8 +22,9 @@ public boolean compatibleWith(Version version) { } @Override - public void initialize(SourceRepo sourceRepo) { + public ClusterSnapshotReader initialize(SourceRepo sourceRepo) { this.sourceRepo = sourceRepo; + return this; } @Override @@ -62,8 +63,9 @@ public Version getVersion() { } @Override - public void initialize(Version version) { + public ClusterSnapshotReader initialize(Version version) { this.version = version; + return this; } public String toString() { diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/RemoteWriter_OS_2_11.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/RemoteWriter_OS_2_11.java index cfe4b398a..ecd0e6c07 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/RemoteWriter_OS_2_11.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_os_2_11/RemoteWriter_OS_2_11.java @@ -10,6 +10,9 @@ import org.opensearch.migrations.metadata.GlobalMetadataCreator; import org.opensearch.migrations.metadata.IndexCreator; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class RemoteWriter_OS_2_11 implements RemoteCluster, ClusterWriter { private Version version; private OpenSearchClient client; @@ -22,13 +25,24 @@ public boolean compatibleWith(Version version) { } @Override - public void initialize(DataFilterArgs dataFilterArgs) { + public ClusterWriter initialize(Version versionOverride) { + if (versionOverride != null) { + log.warn("Overriding version for cluster, " + versionOverride); + this.version = versionOverride; + } + return this; + } + + @Override + public ClusterWriter initialize(DataFilterArgs dataFilterArgs) { this.dataFilterArgs = dataFilterArgs; + return this; } @Override - public void initialize(ConnectionContext connection) { + public RemoteCluster initialize(ConnectionContext connection) { this.connection = connection; + return this; } @Override diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReader.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReader.java index 591736d5b..7f52443dc 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReader.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/version_universal/RemoteReader.java @@ -17,7 +17,7 @@ public class RemoteReader implements RemoteCluster, ClusterReader { @Override public boolean compatibleWith(Version version) { - return VersionMatchers.isES_6_8 + return VersionMatchers.isES_6_X .or(VersionMatchers.isES_7_X) .or(VersionMatchers.isOS_1_X) .or(VersionMatchers.isOS_2_X) @@ -25,8 +25,9 @@ public boolean compatibleWith(Version version) { } @Override - public void initialize(ConnectionContext connection) { + public RemoteCluster initialize(ConnectionContext connection) { this.connection = connection; + return this; } @Override @@ -55,7 +56,7 @@ public String toString() { private RemoteReaderClient getClient() { if (client == null) { - if (VersionMatchers.isES_6_8.test(getVersion())) { + if (VersionMatchers.isES_6_X.test(getVersion())) { client = new RemoteReaderClient_ES_6_8(getConnection()); } else { client = new RemoteReaderClient(getConnection()); diff --git a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java index 1981b1779..3dcb69e77 100644 --- a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java +++ b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterProviderRegistry.java @@ -1,6 +1,7 @@ package org.opensearch.migrations.cluster; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import org.opensearch.migrations.Version; @@ -42,7 +43,7 @@ public ClusterSnapshotReader getSnapshotReader(Version version, SourceRepo repo) .filter(p -> p.compatibleWith(version)) .filter(ClusterSnapshotReader.class::isInstance) .map(ClusterSnapshotReader.class::cast) - .peek(p -> p.initialize(version)) + .map(p -> p.initialize(version)) .findFirst() .orElseThrow(() -> new UnsupportedVersionException("No snapshot provider found for version: " + version)); @@ -76,15 +77,16 @@ public ClusterReader getRemoteReader(ConnectionContext connection) { * @param connection The connection context for the cluster * @return The remote resource creator */ - public ClusterWriter getRemoteWriter(ConnectionContext connection, DataFilterArgs dataFilterArgs) { - var client = new OpenSearchClient(connection); - var version = client.getClusterVersion(); + public ClusterWriter getRemoteWriter(ConnectionContext connection, Version versionOverride, DataFilterArgs dataFilterArgs) { + var version = Optional.ofNullable(versionOverride) + .orElseGet(() -> new OpenSearchClient(connection).getClusterVersion()); var remoteProvider = getRemoteProviders(connection) .filter(p -> p.compatibleWith(version)) .filter(ClusterWriter.class::isInstance) .map(ClusterWriter.class::cast) - .peek(p -> p.initialize(dataFilterArgs)) + .map(p -> p.initialize(versionOverride)) + .map(p -> p.initialize(dataFilterArgs)) .findFirst() .orElseThrow(() -> new UnsupportedVersionException("Unable to find compatible writer for " + connection + ", " + version)); @@ -97,7 +99,7 @@ private Stream getRemoteProviders(ConnectionContext connection) { .stream() .filter(RemoteCluster.class::isInstance) .map(RemoteCluster.class::cast) - .peek(p -> p.initialize(connection)); + .map(p -> p.initialize(connection)); } static class UnsupportedVersionException extends RuntimeException { diff --git a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterSnapshotReader.java b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterSnapshotReader.java index edf3055e3..6f4bed81c 100644 --- a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterSnapshotReader.java +++ b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterSnapshotReader.java @@ -8,10 +8,10 @@ public interface ClusterSnapshotReader extends ClusterReader { /** Snapshots are read differently based on their versions */ - void initialize(Version version); + ClusterSnapshotReader initialize(Version version); /** Where to read the snapshot from */ - void initialize(SourceRepo sourceRepo); + ClusterSnapshotReader initialize(SourceRepo sourceRepo); /** Reads information about index shards */ ShardMetadata.Factory getShardMetadata(); diff --git a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterWriter.java b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterWriter.java index 6dad74ab4..55963fc05 100644 --- a/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterWriter.java +++ b/RFS/src/main/java/org/opensearch/migrations/cluster/ClusterWriter.java @@ -1,5 +1,6 @@ package org.opensearch.migrations.cluster; +import org.opensearch.migrations.Version; import org.opensearch.migrations.bulkload.models.DataFilterArgs; import org.opensearch.migrations.metadata.GlobalMetadataCreator; import org.opensearch.migrations.metadata.IndexCreator; @@ -7,8 +8,11 @@ /** Writes data onto a cluster */ public interface ClusterWriter extends VersionSpecificCluster { + /** Allow forcing the version */ + ClusterWriter initialize(Version versionOverride); + /** Filters what data is written onto the cluster */ - public void initialize(DataFilterArgs dataFilterArgs); + ClusterWriter initialize(DataFilterArgs dataFilterArgs); /** Creates global metadata items */ public GlobalMetadataCreator getGlobalMetadataCreator(); diff --git a/RFS/src/main/java/org/opensearch/migrations/cluster/RemoteCluster.java b/RFS/src/main/java/org/opensearch/migrations/cluster/RemoteCluster.java index 7750ea5b2..8e015f4fe 100644 --- a/RFS/src/main/java/org/opensearch/migrations/cluster/RemoteCluster.java +++ b/RFS/src/main/java/org/opensearch/migrations/cluster/RemoteCluster.java @@ -6,5 +6,5 @@ public interface RemoteCluster extends VersionSpecificCluster { /** Remote clusters are communicated with via a connection */ - void initialize(ConnectionContext connection); + RemoteCluster initialize(ConnectionContext connection); } diff --git a/RFS/src/test/java/org/opensearch/migrations/cluster/ClusterProviderRegistryTest.java b/RFS/src/test/java/org/opensearch/migrations/cluster/ClusterProviderRegistryTest.java new file mode 100644 index 000000000..002b2adf7 --- /dev/null +++ b/RFS/src/test/java/org/opensearch/migrations/cluster/ClusterProviderRegistryTest.java @@ -0,0 +1,36 @@ +package org.opensearch.migrations.cluster; + +import org.opensearch.migrations.Version; +import org.opensearch.migrations.bulkload.common.SourceRepo; +import org.opensearch.migrations.bulkload.common.http.ConnectionContext; +import org.opensearch.migrations.bulkload.models.DataFilterArgs; +import org.opensearch.migrations.bulkload.version_es_6_8.SnapshotReader_ES_6_8; +import org.opensearch.migrations.bulkload.version_os_2_11.RemoteWriter_OS_2_11; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; + +public class ClusterProviderRegistryTest { + + @Test + void testGetRemoteWriter_overridden() { + var connectionContext = mock(ConnectionContext.class); + var dataFilterArgs = mock(DataFilterArgs.class); + + var writer = ClusterProviderRegistry.getRemoteWriter(connectionContext, Version.fromString("OS 2.15"), dataFilterArgs); + + assertThat(writer, instanceOf(RemoteWriter_OS_2_11.class)); + } + + @Test + void testGetSnapshotReader_ES_6_4() { + var sourceRepo = mock(SourceRepo.class); + + var reader = ClusterProviderRegistry.getSnapshotReader(Version.fromString("ES 6.4"), sourceRepo); + + assertThat(reader, instanceOf(SnapshotReader_ES_6_8.class)); + } +} diff --git a/transformation/src/main/java/org/opensearch/migrations/VersionMatchers.java b/transformation/src/main/java/org/opensearch/migrations/VersionMatchers.java index 6ebc77fc1..b421cd6f0 100644 --- a/transformation/src/main/java/org/opensearch/migrations/VersionMatchers.java +++ b/transformation/src/main/java/org/opensearch/migrations/VersionMatchers.java @@ -6,7 +6,7 @@ @UtilityClass public class VersionMatchers { - public static final Predicate isES_6_8 = VersionMatchers.matchesMinorVersion(Version.fromString("ES 6.8")); + public static final Predicate isES_6_X = VersionMatchers.matchesMajorVersion(Version.fromString("ES 6.8")); public static final Predicate isES_7_X = VersionMatchers.matchesMajorVersion(Version.fromString("ES 7.10")); public static final Predicate equalOrGreaterThanES_7_10 = VersionMatchers.equalOrGreaterThanMinorVersion(Version.fromString("ES 7.10")); diff --git a/transformation/src/test/java/org/opensearch/migrations/VersionMatchersTest.java b/transformation/src/test/java/org/opensearch/migrations/VersionMatchersTest.java index 2734f8706..f48e6f07a 100644 --- a/transformation/src/test/java/org/opensearch/migrations/VersionMatchersTest.java +++ b/transformation/src/test/java/org/opensearch/migrations/VersionMatchersTest.java @@ -21,12 +21,12 @@ private void testPredicate(Predicate predicate, String matcherName, Lis } @Test - void isES_6_8Test() { + void isES_6_XTest() { testPredicate( - VersionMatchers.isES_6_8, - "isES_6_8", - List.of("ES 6.8", "ES 6.8.23"), - List.of("ES 6.7", "ES 6.9", "OS 1.3") + VersionMatchers.isES_6_X, + "isES_6_X", + List.of("ES 6.8", "ES 6.8.23", "ES 6.9"), + List.of("ES 5.7", "OS 1.3") ); }