Skip to content

Commit

Permalink
Merge pull request #848 from gregschohn/ReplayerRetries
Browse files Browse the repository at this point in the history
  • Loading branch information
gregschohn authored Sep 10, 2024
2 parents e64957a + 9b22274 commit 3396fce
Show file tree
Hide file tree
Showing 71 changed files with 2,312 additions and 1,045 deletions.
1 change: 1 addition & 0 deletions DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {

testImplementation testFixtures(project(":RFS"))
testImplementation testFixtures(project(":coreUtilities"))
testImplementation testFixtures(project(":testHelperFixtures"))
testImplementation project(":CreateSnapshot")
testImplementation project(":MetadataMigration")
testImplementation group: 'org.apache.lucene', name: 'lucene-core'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.migrations.Version;
import org.opensearch.migrations.metadata.tracing.MetadataMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;
import org.opensearch.migrations.testutils.ToxiProxyWrapper;
import org.opensearch.testcontainers.OpensearchContainer;

import com.rfs.common.FileSystemRepo;
Expand All @@ -27,12 +28,10 @@
import com.rfs.common.http.ConnectionContextTestParams;
import com.rfs.framework.PreloadedSearchClusterContainer;
import com.rfs.framework.SearchClusterContainer;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;

/**
* TODO - the code in this test was lifted from FullTest.java (now named ParallelDocumentMigrationsTest.java).
Expand All @@ -42,11 +41,9 @@
@Tag("longTest")
public class ProcessLifecycleTest extends SourceTestBase {

public static final String TOXIPROXY_IMAGE_NAME = "ghcr.io/shopify/toxiproxy:2.9.0";
public static final String TARGET_DOCKER_HOSTNAME = "target";
public static final String SNAPSHOT_NAME = "test_snapshot";
public static final List<String> INDEX_ALLOWLIST = List.of();
public static final int TOXIPROXY_PORT = 8666;
public static final int OPENSEARCH_PORT = 9200;

enum FailHow {
Expand Down Expand Up @@ -93,7 +90,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC
var osTargetContainer = new OpensearchContainer<>(targetImageName).withExposedPorts(OPENSEARCH_PORT)
.withNetwork(network)
.withNetworkAliases(TARGET_DOCKER_HOSTNAME);
var proxyContainer = new ToxiproxyContainer(TOXIPROXY_IMAGE_NAME).withNetwork(network)
var proxyContainer = new ToxiProxyWrapper(network)
) {

CompletableFuture.allOf(CompletableFuture.supplyAsync(() -> {
Expand All @@ -103,7 +100,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC
osTargetContainer.start();
return null;
}), CompletableFuture.supplyAsync(() -> {
proxyContainer.start();
proxyContainer.start(TARGET_DOCKER_HOSTNAME, OPENSEARCH_PORT);
return null;
})).join();

Expand Down Expand Up @@ -160,10 +157,16 @@ private static void migrateMetadata(
private static int runProcessAgainstToxicTarget(
Path tempDirSnapshot,
Path tempDirLucene,
ToxiproxyContainer proxyContainer,
ToxiProxyWrapper proxyContainer,
FailHow failHow
) throws IOException, InterruptedException {
String targetAddress = setupProxyAndGetAddress(proxyContainer, failHow);
String targetAddress = proxyContainer.getProxyUriAsString();
var tp = proxyContainer.getProxy();
if (failHow == FailHow.AT_STARTUP) {
tp.disable();
} else if (failHow == FailHow.WITH_DELAYS) {
tp.toxics().latency("latency-toxic", ToxicDirection.DOWNSTREAM, 100);
}

int timeoutSeconds = 90;
ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress, failHow);
Expand All @@ -183,26 +186,6 @@ private static int runProcessAgainstToxicTarget(
return process.exitValue();
}

@NotNull
private static String setupProxyAndGetAddress(ToxiproxyContainer proxyContainer, FailHow failHow)
    throws IOException {
    // Register a forwarding route on the toxiproxy control API: the fixed listen port
    // inside the container is wired through to the target cluster's docker alias.
    var controlClient = new ToxiproxyClient(proxyContainer.getHost(), proxyContainer.getControlPort());
    var proxyRoute = controlClient.createProxy(
        "proxy",
        "0.0.0.0:" + TOXIPROXY_PORT,
        TARGET_DOCKER_HOSTNAME + ":" + OPENSEARCH_PORT
    );
    // Inject the requested failure mode before the caller starts talking to the proxy.
    // Any other FailHow value leaves the route healthy.
    if (failHow == FailHow.AT_STARTUP) {
        proxyRoute.disable();
    } else if (failHow == FailHow.WITH_DELAYS) {
        proxyRoute.toxics().latency("latency-toxic", ToxicDirection.DOWNSTREAM, 100);
    }
    // The address handed back uses the host-mapped port, reachable from the test JVM.
    var externalAddress = "http://" + proxyContainer.getHost() + ":"
        + proxyContainer.getMappedPort(TOXIPROXY_PORT);
    return externalAddress;
}

@NotNull
private static ProcessBuilder setupProcess(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,23 @@

logger = logging.getLogger(__name__)

BASE64_ENCODED_TUPLE_PATHS = ["sourceRequest.body", "targetRequest.body", "sourceResponse.body", "targetResponse.body"]
BASE64_ENCODED_TUPLE_PATHS = ["sourceRequest.body", "targetRequest.body", "sourceResponse.body"]
# TODO: I'm not positive about the capitalization of the Content-Encoding and Content-Type headers.
# This version worked on my test cases, but it is not guaranteed to work in all cases.
CONTENT_ENCODING_PATH = {
BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Content-Encoding"
BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Content-Encoding"
}
CONTENT_TYPE_PATH = {
BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Content-Type",
BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Content-Type",
BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Content-Type"
BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Type",
BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Content-Type"
}
TRANSFER_ENCODING_PATH = {
BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Transfer-Encoding",
BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Transfer-Encoding",
BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Transfer-Encoding"
BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Transfer-Encoding",
BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Transfer-Encoding"
}

CONTENT_TYPE_JSON = "application/json"
Expand Down Expand Up @@ -160,17 +157,28 @@ def parse_tuple(line: str, line_no: int) -> dict:
if base64value is None:
# This component has no body element, which is potentially valid.
continue
content_encoding = get_element(CONTENT_ENCODING_PATH[body_path], tuple, try_lowercase_keys=True)
content_type = get_element(CONTENT_TYPE_PATH[body_path], tuple, try_lowercase_keys=True)
is_chunked_transfer = get_element(TRANSFER_ENCODING_PATH[body_path],
tuple, try_lowercase_keys=True) == TRANSFER_ENCODING_CHUNKED
value = parse_body_value(base64value, content_encoding, content_type, is_bulk_path,
is_chunked_transfer, line_no)
value = decode_base64_http_message(base64value, CONTENT_ENCODING_PATH[body_path], CONTENT_TYPE_PATH[body_path],
TRANSFER_ENCODING_PATH[body_path], is_bulk_path, line_no, tuple)
if value and type(value) is not bytes:
set_element(body_path, tuple, value)
for target_response in get_element("targetResponses", tuple):
value = decode_base64_http_message(base64value, "Content-Encoding", "Content-Type",
"Transfer-Encoding", is_bulk_path, line_no, target_response)
if value and type(value) is not bytes:
set_element("body", target_response, value)
return tuple


def decode_base64_http_message(base64value, content_encoding, content_type, transfer_encoding,
                               is_bulk_path, line_no, tuple):
    """Decode a base64-encoded HTTP body, guided by headers looked up from ``tuple``.

    NOTE: despite their names, ``content_encoding``, ``content_type`` and
    ``transfer_encoding`` are *lookup paths* into ``tuple`` (e.g.
    ``"sourceRequest.Content-Type"``), not header values. The original code
    reassigned the parameters with the resolved values, shadowing the paths with
    semantically different data; distinct locals are used here instead.

    :param base64value: the base64-encoded body to decode
    :param content_encoding: path within ``tuple`` to the Content-Encoding header
    :param content_type: path within ``tuple`` to the Content-Type header
    :param transfer_encoding: path within ``tuple`` to the Transfer-Encoding header
    :param is_bulk_path: whether the request path indicates a bulk (ndjson) body
    :param line_no: source line number, used for diagnostics by the parser
    :param tuple: the dict holding the message components (shadows the builtin)
    :return: whatever ``parse_body_value`` produces for the decoded body
    """
    encoding_value = get_element(content_encoding, tuple, try_lowercase_keys=True)
    type_value = get_element(content_type, tuple, try_lowercase_keys=True)
    # Chunked transfer must be unwrapped before the body can be interpreted.
    is_chunked_transfer = get_element(transfer_encoding,
                                      tuple, try_lowercase_keys=True) == TRANSFER_ENCODING_CHUNKED
    return parse_body_value(base64value, encoding_value, type_value, is_bulk_path,
                            is_chunked_transfer, line_no)


if __name__ == "__main__":
args = parse_args()
if args.outfile:
Expand Down
2 changes: 0 additions & 2 deletions TrafficCapture/trafficReplayer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ dependencies {
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
implementation group: 'com.google.guava', name: 'guava'
implementation group: 'com.google.protobuf', name: 'protobuf-java'
implementation group: 'io.github.resilience4j', name: 'resilience4j-ratelimiter'
implementation group: 'io.github.resilience4j', name: 'resilience4j-retry'
implementation group: 'io.netty', name: 'netty-all'
implementation group: 'org.apache.kafka', name: 'kafka-clients'
implementation group: 'org.apache.logging.log4j', name: 'log4j-api'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.migrations;

import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -61,8 +62,14 @@ public static TrackedFuture<String, Void> bindNettyScheduleToCompletableFuture(
Duration delay
) {
var delayMs = Math.max(0, delay.toMillis());
return bindNettyFutureToTrackableFuture(
eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS),
var scheduledFuture = eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS);
if (eventLoop.isShuttingDown()) {
// This is safe to do even though the event was scheduled because the scheduled future hasn't
// been wired to trigger anything else yet. This gives us an opportunity for an easy 2-phase-commit.
return TextTrackedFuture.failedFuture(new CancellationException("event loop is already shutting down"),
() -> "Signalling that work cannot be scheduled because the event loop is already being shut down");
}
return bindNettyFutureToTrackableFuture(scheduledFuture,
"scheduling to run next send in " + delay + " (clipped: " + delayMs + "ms)"
);
}
Expand All @@ -73,6 +80,14 @@ public static CompletableFuture<Void> bindNettyScheduleToCompletableFuture(
CompletableFuture<Void> cf
) {
var delayMs = Math.max(0, delay.toMillis());
return bindNettyFutureToCompletableFuture(eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS), cf);
eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS);
var scheduledFuture = eventLoop.schedule(() -> {}, delayMs, TimeUnit.MILLISECONDS);
if (eventLoop.isShuttingDown()) {
// This is safe to do even though the event was scheduled because the scheduled future hasn't
// been wired to trigger anything else yet. This gives us an opportunity for an easy 2-phase-commit.
cf.completeExceptionally(new CancellationException("event loop is already shutting down"));
return cf;
}
return bindNettyFutureToCompletableFuture(scheduledFuture, cf);
}
}

This file was deleted.

Loading

0 comments on commit 3396fce

Please sign in to comment.