Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into transformer-min
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Nied <[email protected]>
  • Loading branch information
peternied committed Jul 8, 2024
2 parents 7ae3e66 + 431c07e commit 5635e09
Show file tree
Hide file tree
Showing 15 changed files with 285 additions and 57 deletions.
16 changes: 5 additions & 11 deletions .whitesource
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
{
"scanSettings": {
"configMode": "LOCAL",
"configExternalURL": "",
"projectToken": "",
"baseBranches": []
"baseBranches": ["main"]
},
"checkRunSettings": {
"vulnerableCheckRunConclusionLevel": "failure",
"vulnerableCheckRunConclusionLevel": "success",
"licenseCheckRunConclusionLevel": "success",
"displayMode": "diff",
"useMendCheckNames": true,
"strictMode" : "failure"
"strictMode" : "warning"
},
"issueSettings": {
"minSeverityLevel": "LOW",
"issueType": "DEPENDENCY"
},
"remediateSettings": {
"workflowRules": {
"enabled": true
}
"minSeverityLevel": "NONE"
}
}
14 changes: 12 additions & 2 deletions DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,23 @@ task slowTest(type: Test) {
jacoco {
enabled = true
}
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
showExceptions true
showCauses true
showStackTraces true
// showStandardStreams true
}
reports {
html.required = true
html.destination file("${buildDir}/reports/tests/slowTest")
}
}

jacocoTestReport {
dependsOn slowTest
reports {
xml.required = true
xml.destination file("${buildDir}/reports/jacoco/test/jacocoTestReport.xml")
html.required = true
html.destination file("${buildDir}/reports/jacoco/test/html")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;

import java.io.IOException;
import java.net.URI;
Expand All @@ -22,6 +23,8 @@

import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
import com.rfs.common.IndexMetadata;
import com.rfs.common.LuceneDocumentsReader;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3Uri;
Expand Down Expand Up @@ -49,25 +52,38 @@ public static class Args {
description = "The name of the snapshot to migrate")
public String snapshotName;

@Parameter(names = {"--snapshot-local-dir"},
required = false,
description = ("The absolute path to the directory on local disk where the snapshot exists. Use this parameter"
+ " if have a copy of the snapshot disk. Mutually exclusive with --s3-local-dir, --s3-repo-uri, and --s3-region."
))
public String snapshotLocalDir = null;

@Parameter(names = {"--s3-local-dir"},
required = true,
description = "The absolute path to the directory on local disk to download S3 files to")
public String s3LocalDirPath;
required = false,
description = ("The absolute path to the directory on local disk to download S3 files to. If you supply this, you must"
+ " also supply --s3-repo-uri and --s3-region. Mutually exclusive with --snapshot-local-dir."
))
public String s3LocalDir = null;

@Parameter(names = {"--s3-repo-uri"},
required = true,
description = "The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2")
public String s3RepoUri;
required = false,
description = ("The S3 URI of the snapshot repo, like: s3://my-bucket/dir1/dir2. If you supply this, you must"
+ " also supply --s3-local-dir and --s3-region. Mutually exclusive with --snapshot-local-dir."
))
public String s3RepoUri = null;

@Parameter(names = {"--s3-region"},
required = true,
description = "The AWS Region the S3 bucket is in, like: us-east-2")
public String s3Region;
required = false,
description = ("The AWS Region the S3 bucket is in, like: us-east-2. If you supply this, you must"
+ " also supply --s3-local-dir and --s3-repo-uri. Mutually exclusive with --snapshot-local-dir."
))
public String s3Region = null;

@Parameter(names = {"--lucene-dir"},
required = true,
description = "The absolute path to the directory where we'll put the Lucene docs")
public String luceneDirPath;
public String luceneDir;

@Parameter(names = {"--target-host"},
required = true,
Expand Down Expand Up @@ -97,17 +113,40 @@ public NoWorkLeftException(String message) {
}
}

/**
 * Validates the mutually-exclusive snapshot-source argument groups.
 *
 * Exactly one of the following must be supplied:
 * <ul>
 *   <li>{@code --snapshot-local-dir} (a snapshot already on local disk), or</li>
 *   <li>all three of {@code --s3-local-dir}, {@code --s3-repo-uri}, and
 *       {@code --s3-region} (a snapshot to be pulled from S3).</li>
 * </ul>
 *
 * @param args the parsed command-line arguments to validate
 * @throws ParameterException if both groups are supplied, if the S3 group is
 *         only partially supplied, or if neither group is supplied
 */
public static void validateArgs(Args args) {
    boolean isSnapshotLocalDirProvided = args.snapshotLocalDir != null;
    // "All" vs "any" lets us distinguish a complete S3 spec from a partial one.
    boolean areAllS3ArgsProvided = args.s3LocalDir != null && args.s3RepoUri != null && args.s3Region != null;
    boolean areAnyS3ArgsProvided = args.s3LocalDir != null || args.s3RepoUri != null || args.s3Region != null;

    if (isSnapshotLocalDirProvided && areAnyS3ArgsProvided) {
        throw new ParameterException("You must provide either --snapshot-local-dir or --s3-local-dir, --s3-repo-uri, and --s3-region, but not both.");
    }

    if (areAnyS3ArgsProvided && !areAllS3ArgsProvided) {
        throw new ParameterException("If you provide any of the S3 snapshot args, you must provide all of them (--s3-local-dir, --s3-repo-uri and --s3-region).");
    }

    if (!isSnapshotLocalDirProvided && !areAllS3ArgsProvided) {
        throw new ParameterException("You must provide either --snapshot-local-dir or --s3-local-dir, --s3-repo-uri, and --s3-region.");
    }
}

public static void main(String[] args) throws Exception {
// Grab out args
Args arguments = new Args();
JCommander.newBuilder()
.addObject(arguments)
.build()
.parse(args);

var luceneDirPath = Paths.get(arguments.luceneDirPath);
validateArgs(arguments);

var luceneDirPath = Paths.get(arguments.luceneDir);
var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null;


try (var processManager = new LeaseExpireTrigger(workItemId->{
log.error("terminating RunRfsWorker because its lease has expired for " + workItemId);
log.error("Terminating RunRfsWorker because its lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC())) {
var workCoordinator = new OpenSearchWorkCoordinator(new ApacheHttpClient(new URI(arguments.targetHost)),
Expand All @@ -120,8 +159,16 @@ public static void main(String[] args) throws Exception {
new OpenSearchClient(arguments.targetHost, arguments.targetUser, arguments.targetPass, false);
DocumentReindexer reindexer = new DocumentReindexer(targetClient);

SourceRepo sourceRepo = S3Repo.create(Paths.get(arguments.s3LocalDirPath),
new S3Uri(arguments.s3RepoUri), arguments.s3Region);
SourceRepo sourceRepo;
if (snapshotLocalDirPath == null) {
sourceRepo = S3Repo.create(
Paths.get(arguments.s3LocalDir),
new S3Uri(arguments.s3RepoUri),
arguments.s3Region
);
} else {
sourceRepo = new FileSystemRepo(snapshotLocalDirPath);
}
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
Expand Down
134 changes: 124 additions & 10 deletions DocumentsFromSnapshotMigration/src/test/java/com/rfs/FullTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -65,6 +68,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
Expand All @@ -79,18 +83,22 @@ public class FullTest {
final static long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600;
final static Pattern CAT_INDICES_INDEX_COUNT_PATTERN =
Pattern.compile("(?:\\S+\\s+){2}(\\S+)\\s+(?:\\S+\\s+){3}(\\S+)");
final static List<SearchClusterContainer.Version> SOURCE_IMAGES = List.of(
SearchClusterContainer.ES_V7_10_2,
SearchClusterContainer.ES_V7_17
);
final static List<SearchClusterContainer.Version> TARGET_IMAGES = List.of(
SearchClusterContainer.OS_V1_3_16,
SearchClusterContainer.OS_V2_14_0
);
public static final String SOURCE_SERVER_ALIAS = "source";
public static final int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024;

public static Stream<Arguments> makeArgs() {
var sourceImageNames = List.of(
makeParamsForBase(SearchClusterContainer.ES_V7_17),
makeParamsForBase(SearchClusterContainer.ES_V7_10_2));
var targetImageNames = List.of(
SearchClusterContainer.OS_V1_3_16.getImageName(),
SearchClusterContainer.OS_V2_14_0.getImageName());
public static Stream<Arguments> makeDocumentMigrationArgs() {
List<Object[]> sourceImageArgs = SOURCE_IMAGES.stream().map(name -> makeParamsForBase(name)).collect(Collectors.toList());
var targetImageNames = TARGET_IMAGES.stream().map(SearchClusterContainer.Version::getImageName).collect(Collectors.toList());
var numWorkers = List.of(1, 3, 40);
return sourceImageNames.stream()
return sourceImageArgs.stream()
.flatMap(a->
targetImageNames.stream().flatMap(b->
numWorkers.stream().map(c->Arguments.of(a[0], a[1], a[2], b, c))));
Expand All @@ -105,8 +113,8 @@ private static Object[] makeParamsForBase(SearchClusterContainer.Version baseSou
}

@ParameterizedTest
@MethodSource("makeArgs")
public void test(SearchClusterContainer.Version baseSourceImageVersion,
@MethodSource("makeDocumentMigrationArgs")
public void testDocumentMigration(SearchClusterContainer.Version baseSourceImageVersion,
String generatorImage, String[] generatorArgs,
String targetImageName, int numWorkers)
throws Exception
Expand Down Expand Up @@ -308,6 +316,112 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRep
}
}

/**
 * Supplies (targetAvailable, expectedExitCode) pairs for
 * {@code testProcessExitsAsExpected}: a reachable target cluster should let
 * the migration process exit cleanly (0), while an unreachable one should
 * make it fail (1).
 */
public static Stream<Arguments> makeProcessExitArgs() {
    var cases = List.of(
        Arguments.of(true, 0),
        Arguments.of(false, 1)
    );
    return cases.stream();
}

/**
 * End-to-end check that the RfsMigrateDocuments CLI, run as a separate JVM
 * process, exits with the expected status code: 0 when the target cluster is
 * reachable, non-zero when it is not.
 *
 * Steps: start a preloaded source cluster and an OpenSearch target in
 * containers, take a filesystem snapshot of the source, migrate metadata,
 * optionally stop the target, then launch the migration as a child process
 * and assert on its exit code.
 */
@ParameterizedTest
@MethodSource("makeProcessExitArgs")
public void testProcessExitsAsExpected(boolean targetAvailable, int expectedExitCode) throws Exception {
    // Fixed source/target versions; the parameterization here is about process
    // exit behavior, not version coverage.
    var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2);
    var baseSourceImageVersion = (SearchClusterContainer.Version) sourceImageArgs[0];
    var generatorImage = (String) sourceImageArgs[1];
    var generatorArgs = (String[]) sourceImageArgs[2];
    var targetImageName = SearchClusterContainer.OS_V2_14_0.getImageName();

    try (var esSourceContainer = new PreloadedSearchClusterContainer(baseSourceImageVersion,
            SOURCE_SERVER_ALIAS, generatorImage, generatorArgs);
            OpensearchContainer<?> osTargetContainer =
                new OpensearchContainer<>(targetImageName)) {
        esSourceContainer.start();
        osTargetContainer.start();

        final var SNAPSHOT_NAME = "test_snapshot";
        final List<String> INDEX_ALLOWLIST = List.of();
        // Snapshot the source cluster to its local snapshot dir inside the container.
        CreateSnapshot.run(
            c -> new FileSystemSnapshotCreator(SNAPSHOT_NAME, c, SearchClusterContainer.CLUSTER_SNAPSHOT_DIR),
            new OpenSearchClient(esSourceContainer.getUrl(), null),
            false);
        var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
        var tempDirLucene = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");

        String targetAddress = osTargetContainer.getHttpHostAddress();

        // CLI args for the child RfsMigrateDocuments process; uses the
        // --snapshot-local-dir (filesystem) path rather than S3.
        String[] args = {
            "--snapshot-name", SNAPSHOT_NAME,
            "--snapshot-local-dir", tempDirSnapshot.toString(),
            "--lucene-dir", tempDirLucene.toString(),
            "--target-host", targetAddress
        };

        try {
            // Pull the snapshot files out of the container onto the host temp dir.
            esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

            var targetClient = new OpenSearchClient(targetAddress, null);
            var sourceRepo = new FileSystemRepo(tempDirSnapshot);
            migrateMetadata(sourceRepo, targetClient, SNAPSHOT_NAME, INDEX_ALLOWLIST);

            // Stop the target container if we don't want it to be available. We've already cached the address it was
            // using, so we can have reasonable confidence that nothing else will be using it and bork our test.
            if (!targetAvailable) {
                osTargetContainer.stop();
            }

            // Reuse this JVM's classpath and java binary so the child process
            // resolves the same classes as the test.
            String classpath = System.getProperty("java.class.path");
            String javaHome = System.getProperty("java.home");
            String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java";

            // Kick off the doc migration process
            log.atInfo().setMessage("Running RfsMigrateDocuments with args: " + Arrays.toString(args)).log();
            ProcessBuilder processBuilder = new ProcessBuilder(
                javaExecutable, "-cp", classpath, "com.rfs.RfsMigrateDocuments"
            );
            processBuilder.command().addAll(Arrays.asList(args));
            // Merge stderr into stdout so a single reader captures everything.
            processBuilder.redirectErrorStream(true);

            Process process = processBuilder.start();
            log.atInfo().setMessage("Process started with ID: " + Long.toString(process.toHandle().pid())).log();

            // Kill the process and fail if we have to wait too long
            int timeoutSeconds = 90;
            boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
            if (!finished) {
                // Print the process output to aid debugging the hang before killing it.
                StringBuilder output = new StringBuilder();
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        output.append(line).append(System.lineSeparator());
                    }
                }
                log.atError().setMessage("Process Output:").log();
                log.atError().setMessage(output.toString()).log();

                log.atError().setMessage("Process timed out, attempting to kill it...").log();
                process.destroy(); // Try to be nice about things first...
                if (!process.waitFor(10, TimeUnit.SECONDS)) {
                    log.atError().setMessage("Process still running, attempting to force kill it...").log();
                    process.destroyForcibly(); // ..then avada kedavra
                }
                Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds).");
            }

            int actualExitCode = process.exitValue();
            log.atInfo().setMessage("Process exited with code: " + actualExitCode).log();

            // Check if the exit code is as expected
            Assertions.assertEquals(expectedExitCode, actualExitCode, "The program did not exit with the expected status code.");

        } finally {
            deleteTree(tempDirSnapshot);
            deleteTree(tempDirLucene);
        }
    }
}

private static void deleteTree(Path path) throws IOException {
try (var walk = Files.walk(path)) {
walk.sorted(Comparator.reverseOrder()).forEach(p -> {
Expand Down
4 changes: 2 additions & 2 deletions RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ test {
}

testLogging {
exceptionFormat = 'full'
events "failed"
events "passed", "skipped", "failed"
exceptionFormat "full"
showExceptions true
showCauses true
showStackTraces true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import os
from pathlib import Path
from typing import List

# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
Expand All @@ -28,8 +29,17 @@
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True

DEPLOYED_STAGE = os.environ.get('MIGRATION_STAGE')
ALLOWED_HOSTS = ['migration-console', f'migration-console.migration.{DEPLOYED_STAGE}.local', 'localhost']

def get_allowed_hosts() -> List[str]:
    """Build Django's ALLOWED_HOSTS from the API_ALLOWED_HOSTS env var.

    The variable is expected to hold a comma-separated list of host names,
    optionally quoted. Returns ['localhost'] when the variable is unset,
    empty, or contains no usable entries (e.g. only quotes or commas).
    """
    raw = os.getenv('API_ALLOWED_HOSTS', '')
    # Remove any quotes, strip surrounding whitespace from each entry, and
    # drop empty entries produced by stray commas (e.g. "a,,b") or
    # fully-quoted-empty values (e.g. '""').
    cleaned = raw.replace('"', '').replace("'", '')
    hosts = [host.strip() for host in cleaned.split(',') if host.strip()]
    return hosts if hosts else ['localhost']


ALLOWED_HOSTS = get_allowed_hosts()


# Application definition
Expand Down
Loading

0 comments on commit 5635e09

Please sign in to comment.