Skip to content

Commit

Permalink
Allow more versions to be in scope (#1051)
Browse files Browse the repository at this point in the history
Allow more versions to be in scope

- Loosen the ES 6.8 requirement to allow 6.X instead.
- Fixed some code smells for SonarQube

Signed-off-by: Peter Nied <[email protected]>
Signed-off-by: Peter Nied <[email protected]>
  • Loading branch information
peternied authored Oct 16, 2024
1 parent d0a0b6f commit 30185a2
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected Clusters createClusters() {
var sourceCluster = clusterReaderCliExtractor.extractClusterReader();
clusters.source(sourceCluster);

var targetCluster = ClusterProviderRegistry.getRemoteWriter(arguments.targetArgs.toConnectionContext(), arguments.dataFilterArgs);
var targetCluster = ClusterProviderRegistry.getRemoteWriter(arguments.targetArgs.toConnectionContext(), null, arguments.dataFilterArgs);
clusters.target(targetCluster);
return clusters.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void migrateFrom_ES(
bothClustersStarted.join();

Version sourceVersion = sourceCluster.getContainerVersion().getVersion();
var sourceIsES6_8 = VersionMatchers.isES_6_8.test(sourceVersion);
var sourceIsES6_8 = VersionMatchers.isES_6_X.test(sourceVersion);
var sourceIsES7_X = VersionMatchers.isES_7_X.test(sourceVersion) || VersionMatchers.isOS_1_X.test(sourceVersion);

if (!(sourceIsES6_8 || sourceIsES7_X)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static Transformer getTransformer(
int dimensionality
) {
if (VersionMatchers.isOS_2_X.test(targetVersion)) {
if (VersionMatchers.isES_6_8.test(sourceVersion)) {
if (VersionMatchers.isES_6_X.test(sourceVersion)) {
return new Transformer_ES_6_8_to_OS_2_11(dimensionality);
}
if (VersionMatchers.equalOrGreaterThanES_7_10.test(sourceVersion)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ public class SnapshotReader_ES_6_8 implements ClusterSnapshotReader {

@Override
public boolean compatibleWith(Version version) {
return VersionMatchers.isES_6_8.test(version);
return VersionMatchers.isES_6_X.test(version);
}

@Override
public void initialize(SourceRepo sourceRepo) {
public ClusterSnapshotReader initialize(SourceRepo sourceRepo) {
this.sourceRepo = sourceRepo;
return this;
}

@Override
Expand Down Expand Up @@ -60,8 +61,9 @@ public Version getVersion() {
}

@Override
public void initialize(Version version) {
public ClusterSnapshotReader initialize(Version version) {
this.version = version;
return this;
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ public boolean compatibleWith(Version version) {
}

@Override
public void initialize(SourceRepo sourceRepo) {
public ClusterSnapshotReader initialize(SourceRepo sourceRepo) {
this.sourceRepo = sourceRepo;
return this;
}

@Override
Expand Down Expand Up @@ -62,8 +63,9 @@ public Version getVersion() {
}

@Override
public void initialize(Version version) {
public ClusterSnapshotReader initialize(Version version) {
this.version = version;
return this;
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import org.opensearch.migrations.metadata.GlobalMetadataCreator;
import org.opensearch.migrations.metadata.IndexCreator;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RemoteWriter_OS_2_11 implements RemoteCluster, ClusterWriter {
private Version version;
private OpenSearchClient client;
Expand All @@ -22,13 +25,24 @@ public boolean compatibleWith(Version version) {
}

@Override
public void initialize(DataFilterArgs dataFilterArgs) {
public ClusterWriter initialize(Version versionOverride) {
if (versionOverride != null) {
log.warn("Overriding version for cluster, " + versionOverride);
this.version = versionOverride;
}
return this;
}

@Override
public ClusterWriter initialize(DataFilterArgs dataFilterArgs) {
this.dataFilterArgs = dataFilterArgs;
return this;
}

@Override
public void initialize(ConnectionContext connection) {
public RemoteCluster initialize(ConnectionContext connection) {
this.connection = connection;
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ public class RemoteReader implements RemoteCluster, ClusterReader {

@Override
public boolean compatibleWith(Version version) {
return VersionMatchers.isES_6_8
return VersionMatchers.isES_6_X
.or(VersionMatchers.isES_7_X)
.or(VersionMatchers.isOS_1_X)
.or(VersionMatchers.isOS_2_X)
.test(version);
}

@Override
public void initialize(ConnectionContext connection) {
public RemoteCluster initialize(ConnectionContext connection) {
this.connection = connection;
return this;
}

@Override
Expand Down Expand Up @@ -55,7 +56,7 @@ public String toString() {

private RemoteReaderClient getClient() {
if (client == null) {
if (VersionMatchers.isES_6_8.test(getVersion())) {
if (VersionMatchers.isES_6_X.test(getVersion())) {
client = new RemoteReaderClient_ES_6_8(getConnection());
} else {
client = new RemoteReaderClient(getConnection());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.migrations.cluster;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import org.opensearch.migrations.Version;
Expand Down Expand Up @@ -42,7 +43,7 @@ public ClusterSnapshotReader getSnapshotReader(Version version, SourceRepo repo)
.filter(p -> p.compatibleWith(version))
.filter(ClusterSnapshotReader.class::isInstance)
.map(ClusterSnapshotReader.class::cast)
.peek(p -> p.initialize(version))
.map(p -> p.initialize(version))
.findFirst()
.orElseThrow(() -> new UnsupportedVersionException("No snapshot provider found for version: " + version));

Expand Down Expand Up @@ -76,15 +77,16 @@ public ClusterReader getRemoteReader(ConnectionContext connection) {
* @param connection The connection context for the cluster
* @return The remote resource creator
*/
public ClusterWriter getRemoteWriter(ConnectionContext connection, DataFilterArgs dataFilterArgs) {
var client = new OpenSearchClient(connection);
var version = client.getClusterVersion();
public ClusterWriter getRemoteWriter(ConnectionContext connection, Version versionOverride, DataFilterArgs dataFilterArgs) {
var version = Optional.ofNullable(versionOverride)
.orElseGet(() -> new OpenSearchClient(connection).getClusterVersion());

var remoteProvider = getRemoteProviders(connection)
.filter(p -> p.compatibleWith(version))
.filter(ClusterWriter.class::isInstance)
.map(ClusterWriter.class::cast)
.peek(p -> p.initialize(dataFilterArgs))
.map(p -> p.initialize(versionOverride))
.map(p -> p.initialize(dataFilterArgs))
.findFirst()
.orElseThrow(() -> new UnsupportedVersionException("Unable to find compatible writer for " + connection + ", " + version));

Expand All @@ -97,7 +99,7 @@ private Stream<RemoteCluster> getRemoteProviders(ConnectionContext connection) {
.stream()
.filter(RemoteCluster.class::isInstance)
.map(RemoteCluster.class::cast)
.peek(p -> p.initialize(connection));
.map(p -> p.initialize(connection));
}

static class UnsupportedVersionException extends RuntimeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
public interface ClusterSnapshotReader extends ClusterReader {

/** Snapshots are read differently based on their versions */
void initialize(Version version);
ClusterSnapshotReader initialize(Version version);

/** Where to read the snapshot from */
void initialize(SourceRepo sourceRepo);
ClusterSnapshotReader initialize(SourceRepo sourceRepo);

/** Reads information about index shards */
ShardMetadata.Factory getShardMetadata();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package org.opensearch.migrations.cluster;

import org.opensearch.migrations.Version;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
import org.opensearch.migrations.metadata.GlobalMetadataCreator;
import org.opensearch.migrations.metadata.IndexCreator;

/** Writes data onto a cluster */
public interface ClusterWriter extends VersionSpecificCluster {

/** Allow forcing the version */
ClusterWriter initialize(Version versionOverride);

/** Filters what data is written onto the cluster */
public void initialize(DataFilterArgs dataFilterArgs);
ClusterWriter initialize(DataFilterArgs dataFilterArgs);

/** Creates global metadata items */
public GlobalMetadataCreator getGlobalMetadataCreator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
public interface RemoteCluster extends VersionSpecificCluster {

/** Remote clusters are communicated with via a connection */
void initialize(ConnectionContext connection);
RemoteCluster initialize(ConnectionContext connection);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.opensearch.migrations.cluster;

import org.opensearch.migrations.Version;
import org.opensearch.migrations.bulkload.common.SourceRepo;
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
import org.opensearch.migrations.bulkload.version_es_6_8.SnapshotReader_ES_6_8;
import org.opensearch.migrations.bulkload.version_os_2_11.RemoteWriter_OS_2_11;

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;

public class ClusterProviderRegistryTest {

@Test
void testGetRemoteWriter_overridden() {
var connectionContext = mock(ConnectionContext.class);
var dataFilterArgs = mock(DataFilterArgs.class);

var writer = ClusterProviderRegistry.getRemoteWriter(connectionContext, Version.fromString("OS 2.15"), dataFilterArgs);

assertThat(writer, instanceOf(RemoteWriter_OS_2_11.class));
}

@Test
void testGetSnapshotReader_ES_6_4() {
var sourceRepo = mock(SourceRepo.class);

var reader = ClusterProviderRegistry.getSnapshotReader(Version.fromString("ES 6.4"), sourceRepo);

assertThat(reader, instanceOf(SnapshotReader_ES_6_8.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

@UtilityClass
public class VersionMatchers {
public static final Predicate<Version> isES_6_8 = VersionMatchers.matchesMinorVersion(Version.fromString("ES 6.8"));
public static final Predicate<Version> isES_6_X = VersionMatchers.matchesMajorVersion(Version.fromString("ES 6.8"));
public static final Predicate<Version> isES_7_X = VersionMatchers.matchesMajorVersion(Version.fromString("ES 7.10"));
public static final Predicate<Version> equalOrGreaterThanES_7_10 = VersionMatchers.equalOrGreaterThanMinorVersion(Version.fromString("ES 7.10"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ private void testPredicate(Predicate<Version> predicate, String matcherName, Lis
}

@Test
void isES_6_8Test() {
void isES_6_XTest() {
testPredicate(
VersionMatchers.isES_6_8,
"isES_6_8",
List.of("ES 6.8", "ES 6.8.23"),
List.of("ES 6.7", "ES 6.9", "OS 1.3")
VersionMatchers.isES_6_X,
"isES_6_X",
List.of("ES 6.8", "ES 6.8.23", "ES 6.9"),
List.of("ES 5.7", "OS 1.3")
);
}

Expand Down

0 comments on commit 30185a2

Please sign in to comment.