From 8274fe862d64341a672499d9131c86fd614f1750 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Sun, 8 Sep 2024 10:04:31 -0400 Subject: [PATCH] Support for nd-json payloads. These are deserialized into a list of json objects and are accessed via a new top-level payload key so that they're distinguishable from a single json object. Any leftover bytes are now appended after json objects. Both the single-json and ndjson-list (mutually exclusive) and the leftover bytes are all accessible for transformations. I've also added a transformation script to the replayer's command for docker-compose to exercise json body transformations. Signed-off-by: Greg Schohn --- .../src/main/docker/docker-compose.yml | 2 +- .../replay/datahandlers/JsonAccumulator.java | 23 +++- .../PayloadAccessFaultingMap.java | 115 +----------------- .../http/HttpJsonTransformingConsumer.java | 1 - .../http/NettyJsonBodyAccumulateHandler.java | 45 +++++-- .../http/NettyJsonBodySerializeHandler.java | 50 ++++++-- .../datahandlers/JsonAccumulatorTest.java | 2 +- .../HttpJsonTransformingConsumerTest.java | 79 ++++++++++++ .../transform/JsonKeysForHttpMessage.java | 4 + .../transform/TypeMappingsExcisionTest.java | 2 +- 10 files changed, 185 insertions(+), 138 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml index 04c432a7c..aab6e6ef4 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml +++ b/TrafficCapture/dockerSolution/src/main/docker/docker-compose.yml @@ -78,7 +78,7 @@ services: condition: service_started opensearchtarget: condition: service_started - command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317" + command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config-base64 W3sgIkpzb25Kb2x0VHJhbnNmb3JtZXJQcm92aWRlciI6ClsKICB7CiAgICAic2NyaXB0IjogewogICAgICAib3BlcmF0aW9uIjogInNoaWZ0IiwKICAgICAgInNwZWMiOiB7CiAgICAgICAgIm1ldGhvZCI6ICJtZXRob2QiLAogICAgICAgICJVUkkiOiAiVVJJIiwKICAgICAgICAiaGVhZGVycyI6ICJoZWFkZXJzIiwKICAgICAgICAicGF5bG9hZCI6IHsKICAgICAgICAgICJpbmxpbmVkSnNvbkJvZHkiOiB7CiAgICAgICAgICAgICJ0b3AiOiB7CiAgICAgICAgICAgICAgInRhZ1RvRXhjaXNlIjogewogICAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiIAogICAgICAgICAgICAgIH0sCiAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAiKiI6ICJwYXlsb2FkLmlubGluZWRKc29uQm9keS4mIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIH0sIAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPXNwbGl0KCcvZXh0cmFUaGluZ1RvUmVtb3ZlJyxAKDEsJikpIgogICAgIH0KICB9CiB9LAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPWpvaW4oJycsQCgxLCYpKSIKICAgICB9CiAgfQogfQpdCn1dCg==" 
opensearchtarget: image: 'opensearchproject/opensearch:2.15.0' diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulator.java index 34672dfa5..72c3fce4e 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulator.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.async.ByteBufferFeeder; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; /** @@ -30,28 +31,42 @@ public class JsonAccumulator { * Name in the stack. */ private final Deque jsonObjectStack; + private final ByteBufferFeeder feeder; + @Getter + private long totalBytesFullyConsumed; public JsonAccumulator() throws IOException { jsonObjectStack = new ArrayDeque<>(); JsonFactory factory = new JsonFactory(); parser = factory.createNonBlockingByteBufferParser(); + feeder = (ByteBufferFeeder) parser.getNonBlockingInputFeeder(); } protected Map createMap() { return new LinkedHashMap<>(); } + public boolean hasPartialValues() { + return !jsonObjectStack.isEmpty(); + } + /** * Returns the top-level object once it has been fully constructed or null if more input is still required. * @param byteBuffer * @return * @throws IOException */ - public Object consumeByteBuffer(ByteBuffer byteBuffer) throws IOException { - ByteBufferFeeder feeder = (ByteBufferFeeder) parser.getNonBlockingInputFeeder(); + public Object consumeByteBufferForSingleObject(ByteBuffer byteBuffer) throws IOException { + consumeByteBuffer(byteBuffer); + return getNextTopLevelObject(); + } + + public void consumeByteBuffer(ByteBuffer byteBuffer) throws IOException { log.trace("Consuming bytes: " + byteBuffer.toString()); feeder.feedInput(byteBuffer); - + } + + public Object getNextTopLevelObject() throws IOException { while (!parser.isClosed()) { var token = parser.nextToken(); if (token == null) { @@ -71,6 +86,7 @@ public Object consumeByteBuffer(ByteBuffer byteBuffer) throws IOException { var array = ((ArrayList) jsonObjectStack.pop()).toArray(); pushCompletedValue(array); if (jsonObjectStack.isEmpty()) { + totalBytesFullyConsumed = parser.currentLocation().getByteOffset(); return array; } break; @@ -81,6 +97,7 @@ public Object consumeByteBuffer(ByteBuffer byteBuffer) throws IOException { case END_OBJECT: { var popped = jsonObjectStack.pop(); if (jsonObjectStack.isEmpty()) { + totalBytesFullyConsumed = parser.currentLocation().getByteOffset(); return popped; } else { pushCompletedValue(popped); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java index 64cb7f64d..801e9e1a4 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java @@ -1,19 +1,12 @@ package org.opensearch.migrations.replay.datahandlers; -import java.util.AbstractMap; -import java.util.AbstractSet; -import java.util.Iterator; -import java.util.NoSuchElementException; import java.util.Optional; -import 
java.util.Set; +import java.util.TreeMap; import org.opensearch.migrations.replay.datahandlers.http.StrictCaseInsensitiveHttpHeadersMap; -import org.opensearch.migrations.transform.JsonKeysForHttpMessage; -import io.netty.buffer.ByteBuf; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.NonNull; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -26,11 +19,9 @@ */ @EqualsAndHashCode(callSuper = false) @Slf4j -public class PayloadAccessFaultingMap extends AbstractMap { +public class PayloadAccessFaultingMap extends TreeMap { private final boolean isJson; - private Object jsonValue; - private ByteBuf binaryValue; @Getter @Setter private boolean disableThrowingPayloadNotLoaded; @@ -41,12 +32,6 @@ public PayloadAccessFaultingMap(StrictCaseInsensitiveHttpHeadersMap headers) { .orElse(false); } - @Override - public boolean containsKey(Object key) { - return (JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY.equals(key) && jsonValue != null) || - (JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY.equals(key) && binaryValue != null); - } - private Object nullOrThrow() { if (disableThrowingPayloadNotLoaded) { return null; @@ -56,98 +41,10 @@ private Object nullOrThrow() { @Override public Object get(Object key) { - if (JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY.equals(key)) { - if (jsonValue == null) { - return nullOrThrow(); - } else { - return jsonValue; - } - } else if (JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY.equals(key)) { - if (binaryValue == null) { - return nullOrThrow(); - } else { - return binaryValue; - } - } else { - return null; + var value = super.get(key); + if (value == null && !disableThrowingPayloadNotLoaded) { + throw PayloadNotLoadedException.getInstance(); } - } - - @Override - @NonNull - public Set> entrySet() { - if (jsonValue != null) { - return Set.of(new SimpleEntry<>(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY, jsonValue)); - } else if (binaryValue != null) { - return Set.of(new SimpleEntry<>(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY, binaryValue)); - } else { - return new AbstractSet<>() { - @Override - @NonNull - public Iterator> iterator() { - return new Iterator<>() { - private int count; - - @Override - public boolean hasNext() { - return count == 0 && isJson; - } - - @Override - public Entry next() { - if (isJson && count == 0) { - ++count; - if (jsonValue != null) { - return new SimpleEntry<>( - JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY, - jsonValue - ); - } else if (binaryValue != null) { - return new SimpleEntry<>( - JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY, - binaryValue - ); - } else { - if (!disableThrowingPayloadNotLoaded) { - throw PayloadNotLoadedException.getInstance(); - } - } - } - throw new NoSuchElementException(); - } - }; - } - - @Override - public int size() { - return isJson ? 
1 : 0; - } - }; - } - } - - @Override - public Object put(String key, Object value) { - if (JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY.equals(key)) { - Object old = jsonValue; - jsonValue = value; - return old; - } else if (JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY.equals(key)) { - Object old = binaryValue; - binaryValue = (ByteBuf) value; - return old; - } else { - return null; - } - } - - @Override - public String toString() { - final var sb = new StringBuilder("PayloadFaultMap{"); - sb.append("isJson=").append(isJson); - sb.append(", jsonValue=").append(jsonValue); - sb.append(", binaryValue=").append(binaryValue); - sb.append('}'); - return sb.toString(); + return value; } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java index df3247cb0..1891cabf0 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java @@ -20,7 +20,6 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.HttpRequestDecoder; import lombok.extern.slf4j.Slf4j; -import org.slf4j.event.Level; /** * This class implements a packet consuming interface by using an EmbeddedChannel to write individual diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java index b3ba5893d..71e11bf09 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java @@ -1,6 +1,7 @@ package org.opensearch.migrations.replay.datahandlers.http; -import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import org.opensearch.migrations.replay.datahandlers.JsonAccumulator; import org.opensearch.migrations.replay.tracing.IReplayContexts; @@ -28,14 +29,18 @@ public class NettyJsonBodyAccumulateHandler extends ChannelInboundHandlerAdapter JsonAccumulator jsonAccumulator; HttpJsonMessageWithFaultingPayload capturedHttpJsonMessage; - Object parsedJsonObject; + List parsedJsonObjects; CompositeByteBuf accumulatedBody; @SneakyThrows public NettyJsonBodyAccumulateHandler(IReplayContexts.IRequestTransformationContext context) { this.context = context; this.jsonAccumulator = new JsonAccumulator(); - this.accumulatedBody = Unpooled.compositeBuffer(); + // use 1024 (as opposed to the default of 16) because we really don't ever want the hit of a consolidation. + // For this buffer to continue to be used, we are far-off the happy-path. 
+ // Consolidating will likely burn more cycles + this.accumulatedBody = Unpooled.compositeBuffer(1024); + this.parsedJsonObjects = new ArrayList<>(); } @Override @@ -45,17 +50,37 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } else if (msg instanceof HttpContent) { var contentBuf = ((HttpContent) msg).content(); accumulatedBody.addComponent(true, contentBuf.retainedDuplicate()); - parsedJsonObject = jsonAccumulator.consumeByteBuffer(contentBuf.nioBuffer()); + var nioBuf = contentBuf.nioBuffer(); + jsonAccumulator.consumeByteBuffer(nioBuf); + Object nextObj; + while ((nextObj = jsonAccumulator.getNextTopLevelObject()) != null) { + parsedJsonObjects.add(nextObj); + } if (msg instanceof LastHttpContent) { - if (parsedJsonObject != null) { + if (!parsedJsonObjects.isEmpty()) { + var payload = capturedHttpJsonMessage.payload(); + if (parsedJsonObjects.size() > 1) { + payload.put(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY, parsedJsonObjects); + } else { + payload.put(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY, parsedJsonObjects.get(0)); + } + if (!jsonAccumulator.hasPartialValues()) { + context.onJsonPayloadParseSucceeded(); + } + } + if (jsonAccumulator.hasPartialValues()) { + if (jsonAccumulator.getTotalBytesFullyConsumed() > Integer.MAX_VALUE) { + throw new IndexOutOfBoundsException("JSON contents were too large " + + jsonAccumulator.getTotalBytesFullyConsumed() + " for a single composite ByteBuf"); + } + // skip the contents that were already parsed and included in the payload as parsed json + accumulatedBody.readerIndex((int) jsonAccumulator.getTotalBytesFullyConsumed()); + // and pass the remaining stream capturedHttpJsonMessage.payload() - .put(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY, parsedJsonObject); - context.onJsonPayloadParseSucceeded(); + .put(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY, accumulatedBody); + } else { accumulatedBody.release(); accumulatedBody = null; - } else { - capturedHttpJsonMessage.payload() - .put(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY, accumulatedBody); } ctx.fireChannelRead(capturedHttpJsonMessage); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandler.java index c71a1dd09..45cb7c52b 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodySerializeHandler.java @@ -1,16 +1,16 @@ package org.opensearch.migrations.replay.datahandlers.http; import java.io.IOException; -import java.util.Map; +import java.util.List; import org.opensearch.migrations.replay.datahandlers.JsonEmitter; import org.opensearch.migrations.transform.JsonKeysForHttpMessage; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.DefaultHttpContent; -import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.LastHttpContent; import lombok.extern.slf4j.Slf4j; @@ -26,28 +26,54 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception jsonMessage.setPayloadFaultMap(null); 
ctx.fireChannelRead(msg); if (payload.containsKey(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY)) { - serializePayload(ctx, (Map) payload.get(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY)); - } else { - if (payload.containsKey(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY)) { - var rawBody = (ByteBuf) payload.get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY); - if (rawBody.readableBytes() > 0) { - ctx.fireChannelRead(new DefaultHttpContent(rawBody)); - } + serializePayload(ctx, payload.get(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY)); + } else if (payload.containsKey(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY)) { + serializePayloadList(ctx, + (List) payload.get(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY), + !payload.containsKey(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY)); + } + if (payload.containsKey(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY)) { + var rawBody = (ByteBuf) payload.get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY); + if (rawBody.readableBytes() > 0) { + ctx.fireChannelRead(new DefaultHttpContent(rawBody)); } - ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT); } + ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT); } else { super.channelRead(ctx, msg); } } - private void serializePayload(ChannelHandlerContext ctx, Map payload) throws IOException { + private static final ByteBuf NEWLINE = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[]{'\n'})); + + private void serializePayloadList(ChannelHandlerContext ctx, List payloadList, boolean addLastNewline) + throws IOException + { + var it = payloadList.iterator(); + while (it.hasNext()) { + var payload = it.next(); + try (var jsonEmitter = new JsonEmitter(ctx.alloc())) { + var pac = jsonEmitter.getChunkAndContinuations(payload, NUM_BYTES_TO_ACCUMULATE_BEFORE_FIRING); + while (true) { + ctx.fireChannelRead(new DefaultHttpContent(pac.partialSerializedContents)); + if (pac.nextSupplier == null) { + break; + } + pac = pac.nextSupplier.get(); + } + if (addLastNewline || it.hasNext()) { + ctx.fireChannelRead(new DefaultHttpContent(NEWLINE.duplicate())); + } + } + } + } + + private void serializePayload(ChannelHandlerContext ctx, Object payload) throws IOException{ try (var jsonEmitter = new JsonEmitter(ctx.alloc())) { var pac = jsonEmitter.getChunkAndContinuations(payload, NUM_BYTES_TO_ACCUMULATE_BEFORE_FIRING); while (true) { ctx.fireChannelRead(new DefaultHttpContent(pac.partialSerializedContents)); if (pac.nextSupplier == null) { - ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT); break; } pac = pac.nextSupplier.get(); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulatorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulatorTest.java index cc9683e1c..49a40c92e 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulatorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/JsonAccumulatorTest.java @@ -31,7 +31,7 @@ static Object readJson(byte[] testFileBytes, int chunkBound) throws IOException var chunkSize = Math.min(r.nextInt(chunkBound), chunkByteBuffer.remaining()); chunkByteBuffer.limit(chunkSize + i); i += chunkSize; - var completedObject = jsonParser.consumeByteBuffer(chunkByteBuffer); + var completedObject = 
jsonParser.consumeByteBufferForSingleObject(chunkByteBuffer); if (completedObject != null) { Assertions.assertEquals(testFileBytes.length, i); return completedObject; diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java index fe85e97c3..82539288c 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumerTest.java @@ -24,11 +24,24 @@ import org.opensearch.migrations.tracing.InstrumentationTest; import org.opensearch.migrations.transform.IJsonTransformer; import org.opensearch.migrations.transform.JsonCompositeTransformer; +import org.opensearch.migrations.transform.JsonKeysForHttpMessage; import org.opensearch.migrations.transform.RemovingAuthTransformerFactory; +import io.netty.buffer.ByteBuf; + @WrapWithNettyLeakDetection class HttpJsonTransformingConsumerTest extends InstrumentationTest { + private final static String NDJSON_TEST_REQUEST = ( + "POST /test HTTP/1.1\r\n" + + "Host: foo.example\r\n" + + "Content-Type: application/json\r\n" + + "Content-Length: 97\r\n" + + "\r\n" + + "{\"index\":{\"_index\":\"test\",\"_id\":\"2\"}}\n" + + "{\"field1\":\"value1\"}\n" + + "{\"delete\":{\"_index\":\"test\",\"_id\":\"1\"}}\n"); + private static Stream provideTestParameters() { Integer[] attemptedChunks = { 1, 2, 4, 8, 100, 1000, Integer.MAX_VALUE }; Boolean[] transformationOptions = { true, false }; @@ -131,6 +144,9 @@ public void testPartialBodyIsPassedThrough() throws Exception { var complexTransformer = new JsonCompositeTransformer(new IJsonTransformer() { @Override public Map transformJson(Map incomingJson) { + var payload = (Map) incomingJson.get("payload"); + Assertions.assertNull(payload.get(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY)); + Assertions.assertNull(payload.get(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY)); ((Map) incomingJson.get("headers")) .put("extraKey", "extraValue"); // just walk everything - that's enough to touch the payload and throw @@ -173,6 +189,69 @@ private void walkMaps(Object o) { Assertions.assertNull(returnedResponse.error); } + @Test + public void testNewlineDelimitedJsonBodyIsHandled() throws Exception { + final var dummyAggregatedResponse = new AggregatedRawResponse(19, null, null, null); + var testPacketCapture = new TestCapturePacketToHttpHandler(Duration.ofMillis(100), dummyAggregatedResponse); + var sizeCalculatingTransformer = new JsonCompositeTransformer(incomingJson -> { + var payload = (Map) incomingJson.get("payload"); + Assertions.assertNull(payload.get(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY)); + Assertions.assertNull(payload.get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY)); + var list = (List) payload.get(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY); + ((Map) incomingJson.get("headers")) + .put("listSize", ""+list.size()); + return incomingJson; + }); + var transformingHandler = new HttpJsonTransformingConsumer( + sizeCalculatingTransformer, + null, + testPacketCapture, + rootContext.getTestConnectionRequestContext(0) + ); + + transformingHandler.consumeBytes(NDJSON_TEST_REQUEST.getBytes(StandardCharsets.UTF_8)); + var returnedResponse = 
transformingHandler.finalizeRequest().get(); + var expectedString = NDJSON_TEST_REQUEST.replace("\r\n\r\n","\r\nlistSize: 3\r\n\r\n"); + Assertions.assertEquals(expectedString, testPacketCapture.getCapturedAsString()); + Assertions.assertEquals(HttpRequestTransformationStatus.COMPLETED, returnedResponse.transformationStatus); + Assertions.assertNull(returnedResponse.error); + } + + @Test + public void testPartialNewlineDelimitedJsonBodyIsHandled() throws Exception { + final var dummyAggregatedResponse = new AggregatedRawResponse(19, null, null, null); + var testPacketCapture = new TestCapturePacketToHttpHandler(Duration.ofMillis(100), dummyAggregatedResponse); + var sizeCalculatingTransformer = new JsonCompositeTransformer(incomingJson -> { + var payload = (Map) incomingJson.get("payload"); + Assertions.assertNull(payload.get(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY)); + Assertions.assertNotNull(payload.get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY)); + var list = (List) payload.get(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY); + var leftoverBytes = (ByteBuf) payload.get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY); + var headers = (Map) incomingJson.get("headers"); + headers.put("listSize", "" + list.size()); + headers.put("leftover", "" + leftoverBytes.readableBytes()); + return incomingJson; + }); + var transformingHandler = new HttpJsonTransformingConsumer( + sizeCalculatingTransformer, + null, + testPacketCapture, + rootContext.getTestConnectionRequestContext(0) + ); + + var testString = NDJSON_TEST_REQUEST + .replace("Content-Length: 97", "Content-Length: 87") + .substring(0, NDJSON_TEST_REQUEST.length()-10); + var testBytes = testString.getBytes(StandardCharsets.UTF_8); + transformingHandler.consumeBytes(testBytes); + var returnedResponse = transformingHandler.finalizeRequest().get(); + var expectedString = new String(testBytes, StandardCharsets.UTF_8) + .replace("\r\n\r\n","\r\nlistSize: 2\r\nleftover: 30\r\n\r\n"); + Assertions.assertEquals(expectedString, testPacketCapture.getCapturedAsString()); + Assertions.assertEquals(HttpRequestTransformationStatus.COMPLETED, returnedResponse.transformationStatus); + Assertions.assertNull(returnedResponse.error); + } + public static List sliceRandomChunks(byte[] bytes, int numChunks) { Random random = new Random(0); List chunkSizes = new ArrayList<>(numChunks); diff --git a/TrafficCapture/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/JsonKeysForHttpMessage.java b/TrafficCapture/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/JsonKeysForHttpMessage.java index 70ff96f76..f5ff837c2 100644 --- a/TrafficCapture/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/JsonKeysForHttpMessage.java +++ b/TrafficCapture/transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/JsonKeysForHttpMessage.java @@ -16,6 +16,10 @@ private JsonKeysForHttpMessage() {} * the payload object will be an empty map. */ public static final String INLINED_JSON_BODY_DOCUMENT_KEY = "inlinedJsonBody"; + /** + * for the type application + */ + public static final String INLINED_NDJSON_BODIES_DOCUMENT_KEY = "inlinedJsonSequenceBodies"; /** * This maps to a ByteBuf that is owned by the caller. 
* Any consumers should retain if they need to access it later. This may be UTF8, UTF16 encoded, or something else. diff --git a/TrafficCapture/transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/test/java/org/opensearch/migrations/transform/TypeMappingsExcisionTest.java b/TrafficCapture/transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/test/java/org/opensearch/migrations/transform/TypeMappingsExcisionTest.java index 5eb218337..4f9fd36e2 100644 --- a/TrafficCapture/transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/test/java/org/opensearch/migrations/transform/TypeMappingsExcisionTest.java +++ b/TrafficCapture/transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/test/java/org/opensearch/migrations/transform/TypeMappingsExcisionTest.java @@ -51,7 +51,7 @@ private static Map parseJsonFromResourceName(String resourceName var isr = new InputStreamReader(resourceStream, StandardCharsets.UTF_8) ) { var expectedBytes = CharStreams.toString(isr).getBytes(StandardCharsets.UTF_8); - return (Map) jsonAccumulator.consumeByteBuffer(ByteBuffer.wrap(expectedBytes)); + return (Map) jsonAccumulator.consumeByteBufferForSingleObject(ByteBuffer.wrap(expectedBytes)); } }
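
For readers of this patch, here is a minimal sketch (not part of the change itself) of how the reworked JsonAccumulator streaming API introduced above is meant to be driven. It relies only on the methods added or renamed in this commit (consumeByteBuffer, getNextTopLevelObject, hasPartialValues, getTotalBytesFullyConsumed); the class name and the nd-json literal are purely illustrative.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.opensearch.migrations.replay.datahandlers.JsonAccumulator;

    public class JsonAccumulatorNdjsonSketch {
        public static void main(String[] args) throws IOException {
            var accumulator = new JsonAccumulator();
            // two complete documents followed by the start of a third (illustrative data)
            var ndjson = "{\"a\":1}\n{\"b\":2}\n{\"partial\":";
            accumulator.consumeByteBuffer(ByteBuffer.wrap(ndjson.getBytes(StandardCharsets.UTF_8)));

            // drain every top-level value that has been fully parsed so far
            Object doc;
            while ((doc = accumulator.getNextTopLevelObject()) != null) {
                System.out.println("parsed document: " + doc);
            }
            // the dangling third document is reported as a partial value, and the byte count
            // says how much of the input was covered by fully parsed documents (the accumulate
            // handler uses it to expose only the unparsed tail as leftover bytes)
            System.out.println("partial value pending: " + accumulator.hasPartialValues());
            System.out.println("bytes fully consumed: " + accumulator.getTotalBytesFullyConsumed());
        }
    }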
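
On the consuming side, a transformation can pick the parsed documents and any unparsed remainder back out of the payload map. The sketch below follows the pattern used by the new HttpJsonTransformingConsumerTest cases in this patch; the key constants are the ones defined in JsonKeysForHttpMessage, while the factory method, header names, and counting logic are assumptions added only for illustration.

    import java.util.List;
    import java.util.Map;

    import org.opensearch.migrations.transform.IJsonTransformer;
    import org.opensearch.migrations.transform.JsonKeysForHttpMessage;

    import io.netty.buffer.ByteBuf;

    public class NdjsonAwareTransformerSketch {
        @SuppressWarnings("unchecked")
        public static IJsonTransformer makeTransformer() {
            return incomingJson -> {
                var payload = (Map<String, Object>) incomingJson.get("payload");
                // nd-json bodies arrive as a list of already-parsed documents under the new key...
                var ndjsonDocs = (List<Object>) payload.get(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY);
                // ...a single json body (mutually exclusive with the list) keeps the existing key...
                var singleDoc = payload.get(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY);
                // ...and any bytes that never completed a top-level value remain available as a ByteBuf
                var leftoverBytes = (ByteBuf) payload.get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY);

                var headers = (Map<String, Object>) incomingJson.get("headers");
                if (ndjsonDocs != null) {
                    headers.put("x-ndjson-doc-count", "" + ndjsonDocs.size()); // illustrative header name
                } else if (singleDoc != null) {
                    headers.put("x-ndjson-doc-count", "1"); // illustrative header name
                }
                if (leftoverBytes != null) {
                    headers.put("x-leftover-byte-count", "" + leftoverBytes.readableBytes()); // illustrative
                }
                return incomingJson;
            };
        }
    }

Because inlinedJsonBody and inlinedJsonSequenceBodies are mutually exclusive, and the leftover ByteBuf only appears when the accumulator ends on a partial value, a transformer can branch on which keys are present without re-parsing the body; NettyJsonBodySerializeHandler then re-emits whichever of those entries the transformer leaves (or puts back) in the payload.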