Skip to content

Commit

Permalink
Merge pull request #283 from mikaylathompson/update-exported-tuples
Browse files Browse the repository at this point in the history
(cherry picked from commit 92405b8)
  • Loading branch information
mikaylathompson committed Aug 29, 2023
1 parent 9221fe9 commit 491d4d4
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,32 @@
logger = logging.getLogger(__name__)

LOG_JSON_TUPLE_FIELD = "message"
BASE64_ENCODED_TUPLE_PATHS = ["request.body", "primaryResponse.body", "shadowResponse.body"]
BASE64_ENCODED_TUPLE_PATHS = ["sourceRequest.body", "targetRequest.body", "sourceResponse.body", "targetResponse.body"]
# TODO: I'm not positive about the capitalization of the Content-Encoding and Content-Type headers.
# This version worked on my test cases, but not guaranteed to work in all cases.
CONTENT_ENCODING_PATH = {
BASE64_ENCODED_TUPLE_PATHS[0]: "request.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.Content-Encoding"
BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Content-Encoding"
}
CONTENT_TYPE_PATH = {
BASE64_ENCODED_TUPLE_PATHS[0]: "request.Content-Type",
BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.Content-Type",
BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.Content-Type"
BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Content-Type",
BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Content-Type",
BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Content-Type"
}
TRANSFER_ENCODING_PATH = {
BASE64_ENCODED_TUPLE_PATHS[0]: "request.Transfer-Encoding",
BASE64_ENCODED_TUPLE_PATHS[1]: "primaryResponse.Transfer-Encoding",
BASE64_ENCODED_TUPLE_PATHS[2]: "shadowResponse.Transfer-Encoding"
BASE64_ENCODED_TUPLE_PATHS[0]: "sourceRequest.Transfer-Encoding",
BASE64_ENCODED_TUPLE_PATHS[1]: "targetRequest.Content-Encoding",
BASE64_ENCODED_TUPLE_PATHS[2]: "sourceResponse.Transfer-Encoding",
BASE64_ENCODED_TUPLE_PATHS[3]: "targetResponse.Transfer-Encoding"
}

CONTENT_TYPE_JSON = "application/json"
CONTENT_ENCODING_GZIP = "gzip"
TRANSFER_ENCODING_CHUNKED = "chunked"
URI_PATH = "request.Request-URI"
URI_PATH = "sourceRequest.Request-URI"
BULK_URI_PATH = "_bulk"


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,24 @@
@Slf4j
public class SourceTargetCaptureTuple {
private RequestResponsePacketPair sourcePair;
private final List<byte[]> shadowRequestData;
private final List<byte[]> shadowResponseData;
private final List<byte[]> targetRequestData;
private final List<byte[]> targetResponseData;
private final AggregatedTransformedResponse.HttpRequestTransformationStatus transformationStatus;
private final Throwable errorCause;
Duration shadowResponseDuration;
Duration targetResponseDuration;

public SourceTargetCaptureTuple(RequestResponsePacketPair sourcePair,
List<byte[]> shadowRequestData,
List<byte[]> shadowResponseData,
List<byte[]> targetRequestData,
List<byte[]> targetResponseData,
AggregatedTransformedResponse.HttpRequestTransformationStatus transformationStatus,
Throwable errorCause,
Duration shadowResponseDuration) {
Duration targetResponseDuration) {

Check warning on line 33 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java#L33

Added line #L33 was not covered by tests
this.sourcePair = sourcePair;
this.shadowRequestData = shadowRequestData;
this.shadowResponseData = shadowResponseData;
this.targetRequestData = targetRequestData;
this.targetResponseData = targetResponseData;

Check warning on line 36 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java#L35-L36

Added lines #L35 - L36 were not covered by tests
this.transformationStatus = transformationStatus;
this.errorCause = errorCause;
this.shadowResponseDuration = shadowResponseDuration;
this.targetResponseDuration = targetResponseDuration;

Check warning on line 39 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java#L39

Added line #L39 was not covered by tests
}

public static class TupleToFileWriter {
Expand All @@ -48,7 +48,6 @@ public TupleToFileWriter(OutputStream outputStream){
}

private JSONObject jsonFromHttpDataUnsafe(List<byte[]> data) throws IOException {

SequenceInputStream collatedStream = ReplayUtils.byteArraysToInputStream(data);
Scanner scanner = new Scanner(collatedStream, StandardCharsets.UTF_8);
scanner.useDelimiter("\r\n\r\n"); // The headers are seperated from the body with two newlines.
Expand Down Expand Up @@ -88,46 +87,58 @@ private JSONObject jsonFromHttpData(List<byte[]> data, Duration latency) throws

private JSONObject toJSONObject(SourceTargetCaptureTuple triple) throws IOException {
JSONObject meta = new JSONObject();
meta.put("request", jsonFromHttpData(triple.sourcePair.requestData.packetBytes));
meta.put("sourceRequest", jsonFromHttpData(triple.sourcePair.requestData.packetBytes));
meta.put("targetRequest", jsonFromHttpData(triple.targetRequestData));

Check warning on line 91 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java#L90-L91

Added lines #L90 - L91 were not covered by tests
//log.warn("TODO: These durations are not measuring the same values!");
meta.put("primaryResponse", jsonFromHttpData(triple.sourcePair.responseData.packetBytes,
meta.put("sourceResponse", jsonFromHttpData(triple.sourcePair.responseData.packetBytes,

Check warning on line 93 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java#L93

Added line #L93 was not covered by tests
Duration.between(triple.sourcePair.requestData.getLastPacketTimestamp(), triple.sourcePair.responseData.getLastPacketTimestamp())));
meta.put("shadowResponse", jsonFromHttpData(triple.shadowResponseData,
triple.shadowResponseDuration));
meta.put("targetResponse", jsonFromHttpData(triple.targetResponseData,

Check warning on line 95 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java#L95

Added line #L95 was not covered by tests
triple.targetResponseDuration));
meta.put("connectionId", triple.sourcePair.connectionId);

Check warning on line 97 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java#L97

Added line #L97 was not covered by tests

return meta;
}

/**
* Writes a "triple" object to an output stream as a JSON object.
* The JSON triple is output on one line, and has three objects: "request", "primaryResponse",
* and "shadowResponse". An example of the format is below.
* Writes a tuple object to an output stream as a JSON object.
* The JSON tuple is output on one line, and has several objects: "sourceRequest", "sourceResponse",
* "targetRequest", and "targetResponse". The "connectionId" is also included to aid in debugging.
* An example of the format is below.
* <p>
* {
* "request": {
* "sourceRequest": {
* "Request-URI": XYZ,
* "Method": XYZ,
* "HTTP-Version": XYZ
* "body": XYZ,
* "header-1": XYZ,
* "header-2": XYZ
* },
* "primaryResponse": {
* "targetRequest": {
* "Request-URI": XYZ,
* "Method": XYZ,
* "HTTP-Version": XYZ
* "body": XYZ,
* "header-1": XYZ,
* "header-2": XYZ
* },
* "sourceResponse": {
* "HTTP-Version": ABC,
* "Status-Code": ABC,
* "Reason-Phrase": ABC,
* "response_time_ms": 123,
* "body": ABC,
* "header-1": ABC
* },
* "shadowResponse": {
* "targetResponse": {
* "HTTP-Version": ABC,
* "Status-Code": ABC,
* "Reason-Phrase": ABC,
* "response_time_ms": 123,
* "body": ABC,
* "header-2": ABC
* }
* },
* "connectionId": "0242acfffe1d0008-0000000c-00000003-0745a19f7c3c5fc9-121001ff.0"
* }
*
* @param triple the RequestResponseResponseTriple object to be converted into json and written to the stream.
Expand All @@ -146,9 +157,9 @@ public String toString() {
final StringBuilder sb = new StringBuilder("SourceTargetCaptureTuple{");
sb.append("\n diagnosticLabel=").append(sourcePair.connectionId);
sb.append("\n sourcePair=").append(sourcePair);
sb.append("\n shadowResponseDuration=").append(shadowResponseDuration);
sb.append("\n shadowRequestData=").append(Utils.packetsToStringTruncated(shadowRequestData));
sb.append("\n shadowResponseData=").append(Utils.packetsToStringTruncated(shadowResponseData));
sb.append("\n targetResponseDuration=").append(targetResponseDuration);
sb.append("\n targetRequestData=").append(Utils.packetsToStringTruncated(targetRequestData));
sb.append("\n targetResponseData=").append(Utils.packetsToStringTruncated(targetResponseData));

Check warning on line 162 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java#L160-L162

Added lines #L160 - L162 were not covered by tests
sb.append("\n transformStatus=").append(transformationStatus);
sb.append("\n errorCause=").append(errorCause==null ? "null" : errorCause.toString());
sb.append('}');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture<String
}

try {
log.info("Source/Shadow Request/Response tuple: " + requestResponseTriple);
log.info("Source/Target Request/Response tuple: " + requestResponseTriple);

Check warning on line 491 in TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java#L491

Added line #L491 was not covered by tests
tripleWriter.writeJSON(requestResponseTriple);
} catch (IOException e) {
log.error("Caught an IOException while writing triples output.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,18 @@ private static void writeHeadersIntoStream(HttpJsonMessageWithFaultingPayload ht
osw.append(httpJson.path());
osw.append(" ");
osw.append(httpJson.protocol());
osw.append("\n");
osw.append("\r\n");

for (var kvpList : httpJson.headers().asStrictMap().entrySet()) {
var key = kvpList.getKey();
for (var valueEntry : kvpList.getValue()) {
osw.append(key);
osw.append(": ");
osw.append(valueEntry);
osw.append("\n");
osw.append("\r\n");
}
}
osw.append("\n");
osw.append("\r\n");
osw.flush();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public void testTransformer() throws Exception {
.build();
var transformingHandler = new HttpJsonTransformingConsumer(jsonHandler, null, testPacketCapture, "TEST");
runRandomPayloadWithTransformer(transformingHandler, dummyAggregatedResponse, testPacketCapture,
contentLength -> "GET / HTTP/1.1\n" +
"HoSt: " + SOURCE_CLUSTER_NAME + "\n" +
"content-length: " + contentLength + "\n");
contentLength -> "GET / HTTP/1.1\r\n" +
"HoSt: " + SOURCE_CLUSTER_NAME + "\r\n" +
"content-length: " + contentLength + "\r\n");
}

private void runRandomPayloadWithTransformer(HttpJsonTransformingConsumer transformingHandler,
Expand Down Expand Up @@ -88,11 +88,11 @@ public void testMalformedPayloadIsPassedThrough() throws Exception {
httpBasicAuthTransformer, testPacketCapture, "TEST");

runRandomPayloadWithTransformer(transformingHandler, dummyAggregatedResponse, testPacketCapture,
contentLength -> "GET / HTTP/1.1\n" +
"HoSt: " + SOURCE_CLUSTER_NAME + "\n" +
"content-type: application/json\n" +
"content-length: " + contentLength + "\n" +
"authorization: Basic YWRtaW46YWRtaW4=\n");
contentLength -> "GET / HTTP/1.1\r\n" +
"HoSt: " + SOURCE_CLUSTER_NAME + "\r\n" +
"content-type: application/json\r\n" +
"content-length: " + contentLength + "\r\n" +
"authorization: Basic YWRtaW46YWRtaW4=\r\n");
}

/**
Expand All @@ -119,10 +119,10 @@ public void testMalformedPayload_andTypeMappingUri_IsPassedThrough() throws Exce
TestUtils.chainedDualWriteHeaderAndPayloadParts(transformingHandler,
stringParts,
referenceStringBuilder,
contentLength -> "PUT /foo HTTP/1.1\n" +
"HoSt: " + SOURCE_CLUSTER_NAME + "\n" +
"content-type: application/json\n" +
"content-length: " + contentLength + "\n"
contentLength -> "PUT /foo HTTP/1.1\r\n" +
"HoSt: " + SOURCE_CLUSTER_NAME + "\r\n" +
"content-type: application/json\r\n" +
"content-length: " + contentLength + "\r\n"
);

var finalizationFuture = allConsumesFuture.thenCompose(v->transformingHandler.finalizeRequest(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ static DiagnosticTrackableCompletableFuture<String,Void> chainedWriteHeadersAndD
StringBuilder referenceStringAccumulator,
Function<Integer, String> headersGenerator) {
var contentLength = stringParts.stream().mapToInt(s->s.length()).sum();
String headers = headersGenerator.apply(contentLength) + "\n";
String headers = headersGenerator.apply(contentLength) + "\r\n";
referenceStringAccumulator.append(headers);
return chainedWriteHeadersAndDualWritePayloadParts(packetConsumer, stringParts, referenceStringAccumulator, headers);
}
Expand Down Expand Up @@ -134,10 +134,10 @@ static void runPipelineAndValidate(IJsonTransformer transformer,
"TEST");

var contentLength = stringParts.stream().mapToInt(s->s.length()).sum();
var headerString = "GET / HTTP/1.1\n" +
"host: localhost\n" +
var headerString = "GET / HTTP/1.1\r\n" +
"host: localhost\r\n" +
(extraHeaders == null ? "" : extraHeaders) +
"content-length: " + contentLength + "\n\n";
"content-length: " + contentLength + "\r\n\r\n";
var referenceStringBuilder = new StringBuilder();
var allConsumesFuture = chainedWriteHeadersAndDualWritePayloadParts(transformingHandler,
stringParts, referenceStringBuilder, headerString);
Expand Down

0 comments on commit 491d4d4

Please sign in to comment.