Skip to content

Commit

Permalink
Merge branch 'main' into RemoveSourceAndProxyDependenciesFromMigrationConsoleCDK
Browse files Browse the repository at this point in the history
  • Loading branch information
gregschohn authored Oct 8, 2024
2 parents c332e1e + 5709add commit 4dd162b
Show file tree
Hide file tree
Showing 36 changed files with 2,768 additions and 82 deletions.
2 changes: 0 additions & 2 deletions CreateSnapshot/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ java.sourceCompatibility = JavaVersion.VERSION_11
java.targetCompatibility = JavaVersion.VERSION_11

dependencies {
implementation project(":commonDependencyVersionConstraints")

implementation project(':coreUtilities')
implementation project(":RFS")

Expand Down
1 change: 0 additions & 1 deletion DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class DockerServiceProps {
}

dependencies {
implementation project(":commonDependencyVersionConstraints")
implementation platform('io.projectreactor:reactor-bom:2023.0.5')
testImplementation platform('io.projectreactor:reactor-bom:2023.0.5')

Expand Down
44 changes: 42 additions & 2 deletions DocumentsFromSnapshotMigration/docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,45 @@ if [[ $RFS_COMMAND != *"--target-password"* ]]; then
fi
fi

echo "Executing RFS Command"
eval $RFS_COMMAND
# Extract the value passed after --s3-local-dir
# NOTE(review): the sed pattern accepts either a double-quoted value or a bare
# token following the flag; tr then strips any surrounding quotes. \s and \+
# are GNU sed extensions — assumes the container image ships GNU sed (TODO
# confirm against the base image).
S3_LOCAL_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--s3-local-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"')
# Extract the value passed after --lucene-dir
LUCENE_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--lucene-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"')
# The S3 download directory is optional: when absent we simply skip its
# cleanup between runs.
if [[ -n "$S3_LOCAL_DIR" ]]; then
echo "Will delete S3 local directory between runs: $S3_LOCAL_DIR"
else
echo "--s3-local-dir argument not found in RFS_COMMAND. Will not delete S3 local directory between runs."
fi

# The Lucene directory is mandatory: abort early rather than run without it.
if [[ -n "$LUCENE_DIR" ]]; then
echo "Will delete lucene local directory between runs: $LUCENE_DIR"
else
echo "--lucene-dir argument not found in RFS_COMMAND. This is required."
exit 1
fi

# Remove the per-run scratch directories (S3 download area and Lucene index)
# so the next iteration starts from a clean slate. Unset/empty variables are
# skipped. Output messages are identical to the previous inline version.
cleanup_directories() {
    _cleanup_one_dir() {
        local label="$1"
        local dir="$2"
        if [[ -n "$dir" ]]; then
            echo "Cleaning up $label local directory: $dir"
            rm -rf "$dir"
            echo "Directory $dir has been cleaned up."
        fi
    }
    _cleanup_one_dir "S3" "$S3_LOCAL_DIR"
    _cleanup_one_dir "Lucene" "$LUCENE_DIR"
}



# Guard: an empty RFS_COMMAND is a configuration error, not "no work left".
# Otherwise loop: `until ! { ...; }` keeps iterating WHILE the command exits 0
# (i.e. a unit of work was completed), cleaning the scratch directories between
# iterations. The first non-zero exit (e.g. NO_WORK_LEFT or lease timeout from
# RfsMigrateDocuments) makes the `until` condition true and ends the loop.
# NOTE(review): verify the script's final exit status after the loop is what
# the orchestrator expects — the loop itself does not propagate the command's
# exit code explicitly.
[ -z "$RFS_COMMAND" ] && \
{ echo "Warning: RFS_COMMAND is empty! Exiting."; exit 1; } || \
until ! {
echo "Running command $RFS_COMMAND"
eval "$RFS_COMMAND"
}; do
echo "Cleaning up directories before the next run."
cleanup_directories
done
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@

@Slf4j
public class RfsMigrateDocuments {
public static final int PROCESS_TIMED_OUT = 2;
public static final int PROCESS_TIMED_OUT_EXIT_CODE = 2;
public static final int NO_WORK_LEFT_EXIT_CODE = 3;
public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;
public static final String LOGGING_MDC_WORKER_ID = "workerId";

Expand Down Expand Up @@ -184,15 +185,12 @@ public static void main(String[] args) throws Exception {
var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null;

var connectionContext = arguments.targetArgs.toConnectionContext();
try (var processManager = new LeaseExpireTrigger(workItemId -> {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId
)) {
try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId)
) {
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
DocumentReindexer reindexer = new DocumentReindexer(targetClient,
Expand Down Expand Up @@ -233,12 +231,20 @@ public static void main(String[] args) throws Exception {
unpackerFactory,
arguments.maxShardSizeBytes,
context);
} catch (NoWorkLeftException e) {
log.atWarn().setMessage("No work left to acquire. Exiting with error code to signal that.").log();
System.exit(NO_WORK_LEFT_EXIT_CODE);
} catch (Exception e) {
log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log();
throw e;
}
}

/**
 * Callback wired into the {@code LeaseExpireTrigger} in {@code main}: invoked when this
 * worker's lease on a work item expires. Logs the expired item and terminates the JVM
 * with {@code PROCESS_TIMED_OUT_EXIT_CODE} so the orchestrator can detect the timeout
 * and reschedule the work.
 *
 * @param workItemId identifier of the work item whose lease expired
 */
private static void exitOnLeaseTimeout(String workItemId) {
    // SLF4J idiom: parameterized message instead of string concatenation.
    log.error("Terminating RfsMigrateDocuments because the lease has expired for {}", workItemId);
    System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}

private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) {
var compositeContextTracker = new CompositeContextTracker(
new ActiveContextTracker(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

Expand All @@ -24,6 +26,9 @@
import org.opensearch.testcontainers.OpensearchContainer;

import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.Network;
Expand All @@ -47,6 +52,37 @@ enum FailHow {
WITH_DELAYS
}

/**
 * Holder for the per-run fixtures handed to a test's process-runner callback:
 * the snapshot scratch directory, the Lucene scratch directory, and the
 * toxiproxy wrapper fronting the target cluster.
 */
@AllArgsConstructor
@Getter
private static class RunData {
Path tempDirSnapshot;
Path tempDirLucene;
ToxiProxyWrapper proxyContainer;
}

/**
 * Runs the migration process repeatedly against a healthy (non-toxic) target.
 * The first run must succeed (exit 0); subsequent runs are retried until one
 * exits non-zero, which testProcess then asserts is 3 — the process's
 * "no work left" exit code (see RfsMigrateDocuments.NO_WORK_LEFT_EXIT_CODE).
 */
@Test
@Tag("longTest")
public void testExitsZeroThenThreeForSimpleSetup() throws Exception {
testProcess(3,
d -> {
var firstExitCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
Assertions.assertEquals(0, firstExitCode);
for (int i=0; i<10; ++i) {
var secondExitCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
if (secondExitCode != 0) {
// Run once more to confirm the non-zero code is stable before
// handing it back to testProcess for the final assertion.
var lastErrorCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
Assertions.assertEquals(secondExitCode, lastErrorCode);
return lastErrorCode;
}
}
Assertions.fail("Ran for many test iterations and didn't get a No Work Available exit code");
return -1; // won't be evaluated
});
}

@ParameterizedTest
@CsvSource(value = {
// This test will go through a proxy that doesn't add any defects and the process will use defaults
Expand All @@ -62,6 +98,12 @@ enum FailHow {
"WITH_DELAYS, 2" })
// Delegates to the shared testProcess harness, running a single attempt with
// the toxiproxy failure mode named by the CSV parameters and asserting the
// process exits with the paired expected code.
public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode) throws Exception {
final var failHow = FailHow.valueOf(failAfterString);
testProcess(expectedExitCode,
d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, failHow));
}

@SneakyThrows
private void testProcess(int expectedExitCode, Function<RunData, Integer> processRunner) {
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();

var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2);
Expand Down Expand Up @@ -108,7 +150,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC

esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

int actualExitCode = runProcessAgainstToxicTarget(tempDirSnapshot, tempDirLucene, proxyContainer, failHow);
int actualExitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer));
log.atInfo().setMessage("Process exited with code: " + actualExitCode).log();

// Check if the exit code is as expected
Expand All @@ -123,12 +165,13 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC
}
}

@SneakyThrows
private static int runProcessAgainstToxicTarget(
Path tempDirSnapshot,
Path tempDirLucene,
ToxiProxyWrapper proxyContainer,
FailHow failHow
) throws IOException, InterruptedException {
FailHow failHow)
{
String targetAddress = proxyContainer.getProxyUriAsString();
var tp = proxyContainer.getProxy();
if (failHow == FailHow.AT_STARTUP) {
Expand Down
2 changes: 0 additions & 2 deletions MetadataMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ java.sourceCompatibility = JavaVersion.VERSION_11
java.targetCompatibility = JavaVersion.VERSION_11

dependencies {
implementation project(":commonDependencyVersionConstraints")

implementation project(":coreUtilities")
implementation project(":RFS")
implementation project(':transformation')
Expand Down
2 changes: 0 additions & 2 deletions RFS/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ ext {
}

dependencies {
implementation project(":commonDependencyVersionConstraints")
implementation project(':awsUtilities')
implementation project(':coreUtilities')
implementation project(':transformation')
Expand Down Expand Up @@ -75,7 +74,6 @@ dependencies {
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations'

testFixturesImplementation project(":commonDependencyVersionConstraints")
testFixturesImplementation project(':transformation')
testFixturesImplementation testFixtures(project(":coreUtilities"))
testFixturesImplementation group: 'com.github.docker-java', name: 'docker-java'
Expand Down
1 change: 0 additions & 1 deletion TrafficCapture/captureKafkaOffloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ plugins {
}

dependencies {
api project(":commonDependencyVersionConstraints")
implementation project(':TrafficCapture:captureOffloader')
implementation project(':coreUtilities')
implementation group: 'com.google.protobuf', name:'protobuf-java'
Expand Down
1 change: 0 additions & 1 deletion TrafficCapture/captureOffloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ sourceSets {
}
}
dependencies {
api project(":commonDependencyVersionConstraints")
api group: 'io.netty', name: 'netty-buffer'

implementation project(':TrafficCapture:captureProtobufs')
Expand Down
1 change: 0 additions & 1 deletion TrafficCapture/captureProtobufs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ plugins {
}

dependencies {
api project(":commonDependencyVersionConstraints")
api group: 'com.google.protobuf', name: 'protobuf-java'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def setup_backfill(request):
assert metadata_result.success
backfill_start_result: CommandResult = backfill.start()
assert backfill_start_result.success
backfill_scale_result: CommandResult = backfill.scale(units=10)
# small enough to allow containers to be reused, big enough to test scaling out
backfill_scale_result: CommandResult = backfill.scale(units=2)
assert backfill_scale_result.success


Expand Down
2 changes: 0 additions & 2 deletions TrafficCapture/nettyWireLogging/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ plugins {
}

dependencies {
api project(":commonDependencyVersionConstraints")

implementation project(':TrafficCapture:captureOffloader')
implementation project(':coreUtilities')
api group: 'io.netty', name: 'netty-buffer'
Expand Down
2 changes: 0 additions & 2 deletions TrafficCapture/trafficCaptureProxyServer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ configurations {
}

dependencies {
implementation project(":commonDependencyVersionConstraints")

def openSearch = '2.11.1'
implementation "org.opensearch.plugin:opensearch-security:${openSearch}.0"
implementation "org.opensearch:opensearch-common:${openSearch}"
Expand Down
2 changes: 0 additions & 2 deletions TrafficCapture/trafficCaptureProxyServerTest/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ def calculateDockerHash = { projectName ->
}

dependencies {
api project(":commonDependencyVersionConstraints")

implementation project(':TrafficCapture:trafficCaptureProxyServer')
compileOnly 'org.projectlombok:lombok:1.18.28'
annotationProcessor 'org.projectlombok:lombok:1.18.28'
Expand Down
2 changes: 0 additions & 2 deletions TrafficCapture/trafficReplayer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ plugins {
}

dependencies {
implementation project(":commonDependencyVersionConstraints")

implementation project(':TrafficCapture:captureProtobufs')
implementation project(':coreUtilities')
implementation project(':awsUtilities')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ plugins {
}

dependencies {
implementation project(":commonDependencyVersionConstraints")
implementation project(':TrafficCapture:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerInterface')

implementation group: 'com.bazaarvoice.jolt', name: 'jolt-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ plugins {
}

dependencies {
api project(":commonDependencyVersionConstraints")
}
2 changes: 0 additions & 2 deletions awsUtilities/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ java.sourceCompatibility = JavaVersion.VERSION_11
java.targetCompatibility = JavaVersion.VERSION_11

dependencies {
api project(":commonDependencyVersionConstraints")

implementation group: 'software.amazon.awssdk', name: 'arns'
implementation group: 'software.amazon.awssdk', name: 'auth'
implementation group: 'software.amazon.awssdk', name: 'sdk-core'
Expand Down
19 changes: 19 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ allprojects {
}
}

// Centralize the dependency-constraint wiring: every 'java' subproject (except
// the constraints project itself) automatically gets
// :commonDependencyVersionConstraints on its implementation and
// annotationProcessor classpaths — plus testFixturesImplementation when the
// java-test-fixtures plugin is applied — replacing the per-project
// declarations removed elsewhere in this change. afterEvaluate is required so
// plugin application has happened before the hasPlugin checks run.
subprojects { subproject ->
subproject.afterEvaluate {
if (subproject.plugins.hasPlugin('java') && subproject.name != 'commonDependencyVersionConstraints') {
subproject.dependencies {
implementation project(":commonDependencyVersionConstraints")
annotationProcessor project(":commonDependencyVersionConstraints")
if (subproject.plugins.hasPlugin('java-test-fixtures')) {
testFixturesImplementation project(":commonDependencyVersionConstraints")
}
}
}
}
}

task buildDockerImages() {
dependsOn(':TrafficCapture:dockerSolution:buildDockerImages')
dependsOn(':DocumentsFromSnapshotMigration:buildDockerImages')
Expand All @@ -34,6 +48,11 @@ spotless {
indentWithSpaces()
endWithNewline()
}
json {
target 'deployment/cdk/opensearch-service-migration/*.json'
prettier()
endWithNewline()
}
}

subprojects {
Expand Down
2 changes: 0 additions & 2 deletions coreUtilities/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ java.sourceCompatibility = JavaVersion.VERSION_11
java.targetCompatibility = JavaVersion.VERSION_11

dependencies {
api project(":commonDependencyVersionConstraints")

implementation group: "com.google.protobuf", name: "protobuf-java"
implementation group: 'org.slf4j', name: 'slf4j-api'

Expand Down
3 changes: 0 additions & 3 deletions dashboardsSanitizer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ repositories {
}

dependencies {

implementation project(":commonDependencyVersionConstraints")

implementation project(':coreUtilities')
implementation project(":RFS")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
},
"vpcId": "<VPC_ID>",
"reindexFromSnapshotServiceEnabled": true,
"reindexFromSnapshotMaxShardSizeGiB": 80,
"trafficReplayerServiceEnabled": true
}
}
Loading

0 comments on commit 4dd162b

Please sign in to comment.