Support for nd-json payloads.
These are deserialized into a list of JSON objects and accessed via a new top-level payload key so that they're distinguishable from a single JSON object. Any leftover bytes are now appended after the JSON objects. The single-JSON object and the ndjson list (which are mutually exclusive), as well as the leftover bytes, are all accessible to transformations.
I've also added a transformation script to the replayer's command in docker-compose to exercise JSON body transformations.
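
To make the new payload shape concrete, here is a hedged sketch of what a transformation would see for a request body holding two complete JSON documents followed by trailing bytes. The "inlinedJsonBody" key string is confirmed by the JOLT spec in the docker-compose change below; the ndjson-list and leftover-bytes keys are shown by their constant names because their exact string values don't appear in this diff.

    // Sketch only; <...> placeholders stand in for key strings not visible in this diff.
    {
      "method": "POST",
      "URI": "/_bulk",                                   // hypothetical request
      "headers": { ... },
      "payload": {
        <INLINED_NDJSON_BODIES_DOCUMENT_KEY>: [          // ndjson: parsed documents, in order
          { "index": { "_index": "test" } },
          { "field": "value" }
        ],
        <INLINED_BINARY_BODY_DOCUMENT_KEY>: <ByteBuf>    // leftover (unparsed) trailing bytes
      }
    }
    // A single-document body would appear under "inlinedJsonBody" instead;
    // the single-json and ndjson-list keys are mutually exclusive.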

Signed-off-by: Greg Schohn <[email protected]>
gregschohn committed Sep 8, 2024
1 parent 1b3794f commit b39eb8e
Showing 11 changed files with 186 additions and 139 deletions.
@@ -78,7 +78,7 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317"
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317 --transformer-config-base64 W3sgIkpzb25Kb2x0VHJhbnNmb3JtZXJQcm92aWRlciI6ClsKICB7CiAgICAic2NyaXB0IjogewogICAgICAib3BlcmF0aW9uIjogInNoaWZ0IiwKICAgICAgInNwZWMiOiB7CiAgICAgICAgIm1ldGhvZCI6ICJtZXRob2QiLAogICAgICAgICJVUkkiOiAiVVJJIiwKICAgICAgICAiaGVhZGVycyI6ICJoZWFkZXJzIiwKICAgICAgICAicGF5bG9hZCI6IHsKICAgICAgICAgICJpbmxpbmVkSnNvbkJvZHkiOiB7CiAgICAgICAgICAgICJ0b3AiOiB7CiAgICAgICAgICAgICAgInRhZ1RvRXhjaXNlIjogewogICAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiIAogICAgICAgICAgICAgIH0sCiAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAiKiI6ICJwYXlsb2FkLmlubGluZWRKc29uQm9keS4mIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIH0sIAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPXNwbGl0KCcvZXh0cmFUaGluZ1RvUmVtb3ZlJyxAKDEsJikpIgogICAgIH0KICB9CiB9LAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPWpvaW4oJycsQCgxLCYpKSIKICAgICB9CiAgfQogfQpdCn1dCg=="

opensearchtarget:
image: 'opensearchproject/opensearch:2.15.0'
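
For readability, the --transformer-config-base64 argument above decodes to approximately the following JOLT pipeline (re-indented here): the shift script hoists the children of a "tagToExcise" element up a level inside the JSON body, and the two modify-overwrite-beta scripts strip an "/extraThingToRemove" segment out of the URI by splitting on it and rejoining.

    [{ "JsonJoltTransformerProvider": [
      {
        "script": {
          "operation": "shift",
          "spec": {
            "method": "method",
            "URI": "URI",
            "headers": "headers",
            "payload": {
              "inlinedJsonBody": {
                "top": {
                  "tagToExcise": {
                    "*": "payload.inlinedJsonBody.top.&"
                  },
                  "*": "payload.inlinedJsonBody.top.&"
                },
                "*": "payload.inlinedJsonBody.&"
              }
            }
          }
        }
      },
      {
        "script": {
          "operation": "modify-overwrite-beta",
          "spec": { "URI": "=split('/extraThingToRemove',@(1,&))" }
        }
      },
      {
        "script": {
          "operation": "modify-overwrite-beta",
          "spec": { "URI": "=join('',@(1,&))" }
        }
      }
    ]}]
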
@@ -12,6 +12,7 @@
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.async.ByteBufferFeeder;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
@@ -30,28 +31,42 @@ public class JsonAccumulator {
* Name in the stack.
*/
private final Deque<Object> jsonObjectStack;
private final ByteBufferFeeder feeder;
@Getter
private long totalBytesFullyConsumed;

public JsonAccumulator() throws IOException {
jsonObjectStack = new ArrayDeque<>();
JsonFactory factory = new JsonFactory();
parser = factory.createNonBlockingByteBufferParser();
feeder = (ByteBufferFeeder) parser.getNonBlockingInputFeeder();
}

protected Map<String, Object> createMap() {
return new LinkedHashMap<>();
}

public boolean hasPartialValues() {
return !jsonObjectStack.isEmpty();
}

/**
* Returns the top-level object once it has been fully constructed or null if more input is still required.
* @param byteBuffer
* @return
* @throws IOException
*/
public Object consumeByteBuffer(ByteBuffer byteBuffer) throws IOException {
ByteBufferFeeder feeder = (ByteBufferFeeder) parser.getNonBlockingInputFeeder();
public Object consumeByteBufferForSingleObject(ByteBuffer byteBuffer) throws IOException {
consumeByteBuffer(byteBuffer);
return getNextTopLevelObject();
}

public void consumeByteBuffer(ByteBuffer byteBuffer) throws IOException {
log.trace("Consuming bytes: " + byteBuffer.toString());
feeder.feedInput(byteBuffer);

}

public Object getNextTopLevelObject() throws IOException {
while (!parser.isClosed()) {
var token = parser.nextToken();
if (token == null) {
@@ -71,6 +86,7 @@ public Object consumeByteBuffer(ByteBuffer byteBuffer) throws IOException {
var array = ((ArrayList) jsonObjectStack.pop()).toArray();
pushCompletedValue(array);
if (jsonObjectStack.isEmpty()) {
totalBytesFullyConsumed = parser.currentLocation().getByteOffset();
return array;
}
break;
@@ -81,6 +97,7 @@
case END_OBJECT: {
var popped = jsonObjectStack.pop();
if (jsonObjectStack.isEmpty()) {
totalBytesFullyConsumed = parser.currentLocation().getByteOffset();
return popped;
} else {
pushCompletedValue(popped);
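
A minimal caller-side sketch of the split API above: consumeByteBuffer() feeds bytes to the non-blocking parser, getNextTopLevelObject() drains each completed top-level value, and hasPartialValues() reports whether a document is still open. The chunk source and handle() consumer are assumed for illustration.

    JsonAccumulator accumulator = new JsonAccumulator();
    for (ByteBuffer chunk : incomingChunks) {            // incomingChunks: assumed byte source
        accumulator.consumeByteBuffer(chunk);
        Object topLevel;
        while ((topLevel = accumulator.getNextTopLevelObject()) != null) {
            handle(topLevel);                            // handle(): hypothetical consumer
        }
    }
    if (accumulator.hasPartialValues()) {
        // everything past getTotalBytesFullyConsumed() belongs to an incomplete document
    }
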
@@ -1,19 +1,12 @@
package org.opensearch.migrations.replay.datahandlers;

import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

import org.opensearch.migrations.replay.datahandlers.http.StrictCaseInsensitiveHttpHeadersMap;
import org.opensearch.migrations.transform.JsonKeysForHttpMessage;

import io.netty.buffer.ByteBuf;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

@@ -26,11 +19,9 @@
*/
@EqualsAndHashCode(callSuper = false)
@Slf4j
public class PayloadAccessFaultingMap extends AbstractMap<String, Object> {
public class PayloadAccessFaultingMap extends TreeMap<String, Object> {

private final boolean isJson;
private Object jsonValue;
private ByteBuf binaryValue;
@Getter
@Setter
private boolean disableThrowingPayloadNotLoaded;
@@ -41,12 +32,6 @@ public PayloadAccessFaultingMap(StrictCaseInsensitiveHttpHeadersMap headers) {
.orElse(false);
}

@Override
public boolean containsKey(Object key) {
return (JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY.equals(key) && jsonValue != null) ||
(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY.equals(key) && binaryValue != null);
}

private Object nullOrThrow() {
if (disableThrowingPayloadNotLoaded) {
return null;
@@ -56,98 +41,10 @@ private Object nullOrThrow() {

@Override
public Object get(Object key) {
if (JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY.equals(key)) {
if (jsonValue == null) {
return nullOrThrow();
} else {
return jsonValue;
}
} else if (JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY.equals(key)) {
if (binaryValue == null) {
return nullOrThrow();
} else {
return binaryValue;
}
} else {
return null;
var value = super.get(key);
if (value == null && !disableThrowingPayloadNotLoaded) {
throw PayloadNotLoadedException.getInstance();
}
}

@Override
@NonNull
public Set<Entry<String, Object>> entrySet() {
if (jsonValue != null) {
return Set.of(new SimpleEntry<>(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY, jsonValue));
} else if (binaryValue != null) {
return Set.of(new SimpleEntry<>(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY, binaryValue));
} else {
return new AbstractSet<>() {
@Override
@NonNull
public Iterator<Entry<String, Object>> iterator() {
return new Iterator<>() {
private int count;

@Override
public boolean hasNext() {
return count == 0 && isJson;
}

@Override
public Entry<String, Object> next() {
if (isJson && count == 0) {
++count;
if (jsonValue != null) {
return new SimpleEntry<>(
JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY,
jsonValue
);
} else if (binaryValue != null) {
return new SimpleEntry<>(
JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY,
binaryValue
);
} else {
if (!disableThrowingPayloadNotLoaded) {
throw PayloadNotLoadedException.getInstance();
}
}
}
throw new NoSuchElementException();
}
};
}

@Override
public int size() {
return isJson ? 1 : 0;
}
};
}
}

@Override
public Object put(String key, Object value) {
if (JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY.equals(key)) {
Object old = jsonValue;
jsonValue = value;
return old;
} else if (JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY.equals(key)) {
Object old = binaryValue;
binaryValue = (ByteBuf) value;
return old;
} else {
return null;
}
}

@Override
public String toString() {
final var sb = new StringBuilder("PayloadFaultMap{");
sb.append("isJson=").append(isJson);
sb.append(", jsonValue=").append(jsonValue);
sb.append(", binaryValue=").append(binaryValue);
sb.append('}');
return sb.toString();
return value;
}
}
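
A rough sketch of the faulting contract that survives the rewrite to TreeMap: reading an absent key throws PayloadNotLoadedException (signaling that the body must first be parsed) unless throwing is disabled. Here, headers is an assumed StrictCaseInsensitiveHttpHeadersMap with a JSON content type.

    var payload = new PayloadAccessFaultingMap(headers);
    try {
        payload.get("inlinedJsonBody");                  // body not parsed yet -> throws
    } catch (PayloadNotLoadedException e) {
        // caller parses the body, populates the map, and retries the transformation
    }
    payload.setDisableThrowingPayloadNotLoaded(true);
    assert payload.get("inlinedJsonBody") == null;       // absent keys now read as plain null
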
@@ -20,7 +20,6 @@
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpRequestDecoder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.event.Level;

/**
* This class implements a packet consuming interface by using an EmbeddedChannel to write individual
@@ -1,6 +1,7 @@
package org.opensearch.migrations.replay.datahandlers.http;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.opensearch.migrations.replay.datahandlers.JsonAccumulator;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
@@ -28,14 +29,18 @@ public class NettyJsonBodyAccumulateHandler extends ChannelInboundHandlerAdapter

JsonAccumulator jsonAccumulator;
HttpJsonMessageWithFaultingPayload capturedHttpJsonMessage;
Object parsedJsonObject;
List<Object> parsedJsonObjects;
CompositeByteBuf accumulatedBody;

@SneakyThrows
public NettyJsonBodyAccumulateHandler(IReplayContexts.IRequestTransformationContext context) {
this.context = context;
this.jsonAccumulator = new JsonAccumulator();
this.accumulatedBody = Unpooled.compositeBuffer();
// use 1024 (as opposed to the default of 16) because we really don't ever want the hit of a consolidation.
// For this buffer to continue to be used, we are far-off the happy-path.
// Consolidating will likely burn more cycles
this.accumulatedBody = Unpooled.compositeBuffer(1024);
this.parsedJsonObjects = new ArrayList<>();
}

@Override
@@ -45,17 +50,37 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
} else if (msg instanceof HttpContent) {
var contentBuf = ((HttpContent) msg).content();
accumulatedBody.addComponent(true, contentBuf.retainedDuplicate());
parsedJsonObject = jsonAccumulator.consumeByteBuffer(contentBuf.nioBuffer());
var nioBuf = contentBuf.nioBuffer();
jsonAccumulator.consumeByteBuffer(nioBuf);
Object nextObj;
while ((nextObj = jsonAccumulator.getNextTopLevelObject()) != null) {
parsedJsonObjects.add(nextObj);
}
if (msg instanceof LastHttpContent) {
if (parsedJsonObject != null) {
if (!parsedJsonObjects.isEmpty()) {
var payload = capturedHttpJsonMessage.payload();
if (parsedJsonObjects.size() > 1) {
payload.put(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY, parsedJsonObjects);
} else {
payload.put(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY, parsedJsonObjects.get(0));
}
if (!jsonAccumulator.hasPartialValues()) {
context.onJsonPayloadParseSucceeded();
}
}
if (jsonAccumulator.hasPartialValues()) {
if (jsonAccumulator.getTotalBytesFullyConsumed() > Integer.MAX_VALUE) {
throw new IndexOutOfBoundsException("JSON contents were too large " +
jsonAccumulator.getTotalBytesFullyConsumed() + " for a single composite ByteBuf");
}
// skip the contents that were already parsed and included in the payload as parsed json
accumulatedBody.readerIndex((int) jsonAccumulator.getTotalBytesFullyConsumed());
// and pass the remaining stream
capturedHttpJsonMessage.payload()
.put(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY, parsedJsonObject);
context.onJsonPayloadParseSucceeded();
.put(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY, accumulatedBody);
} else {
accumulatedBody.release();
accumulatedBody = null;
} else {
capturedHttpJsonMessage.payload()
.put(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY, accumulatedBody);
}
ctx.fireChannelRead(capturedHttpJsonMessage);
}
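
Tracing the leftover-bytes handoff above with assumed sizes: if the accumulated body holds 100 bytes and the accumulator fully consumed two documents ending at byte offset 83, only the unparsed tail travels on under the binary-body key.

    // Illustration only; the 100/83 sizes are assumed, not from the diff.
    long consumed = jsonAccumulator.getTotalBytesFullyConsumed();   // e.g. 83
    accumulatedBody.readerIndex((int) consumed);                    // skip the parsed prefix
    // bytes 83..99 (a partial third document) remain readable and are
    // forwarded under INLINED_BINARY_BODY_DOCUMENT_KEY
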
@@ -1,16 +1,16 @@
package org.opensearch.migrations.replay.datahandlers.http;

import java.io.IOException;
import java.util.Map;
import java.util.List;

import org.opensearch.migrations.replay.datahandlers.JsonEmitter;
import org.opensearch.migrations.transform.JsonKeysForHttpMessage;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import lombok.extern.slf4j.Slf4j;

@@ -26,28 +26,54 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
jsonMessage.setPayloadFaultMap(null);
ctx.fireChannelRead(msg);
if (payload.containsKey(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY)) {
serializePayload(ctx, (Map) payload.get(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY));
} else {
if (payload.containsKey(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY)) {
var rawBody = (ByteBuf) payload.get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY);
if (rawBody.readableBytes() > 0) {
ctx.fireChannelRead(new DefaultHttpContent(rawBody));
}
serializePayload(ctx, payload.get(JsonKeysForHttpMessage.INLINED_JSON_BODY_DOCUMENT_KEY));
} else if (payload.containsKey(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY)) {
serializePayloadList(ctx,
(List) payload.get(JsonKeysForHttpMessage.INLINED_NDJSON_BODIES_DOCUMENT_KEY),
!payload.containsKey(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY));
}
if (payload.containsKey(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY)) {
var rawBody = (ByteBuf) payload.get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY);
if (rawBody.readableBytes() > 0) {
ctx.fireChannelRead(new DefaultHttpContent(rawBody));
}
ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
}
ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
} else {
super.channelRead(ctx, msg);
}
}

private void serializePayload(ChannelHandlerContext ctx, Map<String, Object> payload) throws IOException {
private static final ByteBuf NEWLINE = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[]{'\n'}));

private void serializePayloadList(ChannelHandlerContext ctx, List<Object> payloadList, boolean addLastNewline)
throws IOException
{
var it = payloadList.iterator();
while (it.hasNext()) {
var payload = it.next();
try (var jsonEmitter = new JsonEmitter(ctx.alloc())) {
var pac = jsonEmitter.getChunkAndContinuations(payload, NUM_BYTES_TO_ACCUMULATE_BEFORE_FIRING);
while (true) {
ctx.fireChannelRead(new DefaultHttpContent(pac.partialSerializedContents));
if (pac.nextSupplier == null) {
break;
}
pac = pac.nextSupplier.get();
}
if (addLastNewline || it.hasNext()) {
ctx.fireChannelRead(new DefaultHttpContent(NEWLINE.duplicate()));
}
}
}
}

private void serializePayload(ChannelHandlerContext ctx, Object payload) throws IOException{
try (var jsonEmitter = new JsonEmitter(ctx.alloc())) {
var pac = jsonEmitter.getChunkAndContinuations(payload, NUM_BYTES_TO_ACCUMULATE_BEFORE_FIRING);
while (true) {
ctx.fireChannelRead(new DefaultHttpContent(pac.partialSerializedContents));
if (pac.nextSupplier == null) {
ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT);
break;
}
pac = pac.nextSupplier.get();
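
The newline handling above produces framing along these lines (document values assumed for illustration): a pure ndjson payload is re-emitted with a newline after every document, while a payload that also carries leftover bytes gets no extra trailing newline, because the remainder begins at the first byte the parser didn't fully consume and so carries its own separator.

    // [ {"a":1}, {"b":2} ] with no binary remainder:
    //   emits {"a":1}\n{"b":2}\n
    // [ {"a":1}, {"b":2} ] plus remainder bytes "\n{\"c\":":
    //   emits {"a":1}\n{"b":2} then the remainder verbatim -> {"a":1}\n{"b":2}\n{"c":
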
(diff for the remaining changed files not shown)
