diff --git a/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java b/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java index 4db824afb..bf51fd4c9 100644 --- a/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java +++ b/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java @@ -7,6 +7,7 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.opensearch.migrations.trafficcapture.CodedOutputStreamHolder; import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer; import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory; @@ -47,6 +48,11 @@ public KafkaCaptureFactory(String nodeId, Producer producer, int this(nodeId, producer, DEFAULT_TOPIC_NAME_FOR_TRAFFIC, messageSize); } + @Override + public IChannelConnectionCaptureSerializer createOffloader(String connectionId) { + return new StreamChannelConnectionCaptureSerializer(nodeId, connectionId, new StreamManager(connectionId)); + } + @AllArgsConstructor static class CodedOutputStreamWrapper implements CodedOutputStreamHolder { private final CodedOutputStream codedOutputStream; @@ -58,7 +64,7 @@ static class CodedOutputStreamWrapper implements CodedOutputStreamHolder { } @AllArgsConstructor - class StreamManager extends OrderedStreamLifecyleManager { + class StreamManager extends OrderedStreamLifecyleManager { String connectionId; @Override @@ -68,10 +74,10 @@ public CodedOutputStreamWrapper createStream() { } @Override - public CompletableFuture + public CompletableFuture kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder, int index) { if (!(outputStreamHolder instanceof CodedOutputStreamWrapper)) { - throw new RuntimeException("Unknown outputStreamHolder sent back to StreamManager: " + + throw new IllegalArgumentException("Unknown outputStreamHolder sent back to StreamManager: " + outputStreamHolder); } var osh = (CodedOutputStreamWrapper) outputStreamHolder; @@ -80,17 +86,17 @@ public CodedOutputStreamWrapper createStream() { try { String recordId = String.format("%s.%d", connectionId, index); var byteBuffer = osh.byteBuffer; - ProducerRecord record = new ProducerRecord<>(topicNameForTraffic, recordId, + ProducerRecord kafkaRecord = new ProducerRecord<>(topicNameForTraffic, recordId, Arrays.copyOfRange(byteBuffer.array(), 0, byteBuffer.position())); // Used to essentially wrap Future returned by Producer to CompletableFuture - CompletableFuture cf = new CompletableFuture<>(); + var cf = new CompletableFuture(); log.debug("Sending Kafka producer record: {} for topic: {}", recordId, topicNameForTraffic); // Async request to Kafka cluster - producer.send(record, handleProducerRecordSent(cf, recordId)); + producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId)); metricsLogger.atSuccess() .addKeyValue("channelId", connectionId) .addKeyValue("topicName", topicNameForTraffic) - .addKeyValue("sizeInBytes", record.value().length) + .addKeyValue("sizeInBytes", kafkaRecord.value().length) .addKeyValue("diagnosticId", recordId) .setMessage("Sent message to Kafka").log(); return cf; @@ -99,38 +105,32 @@ public CodedOutputStreamWrapper createStream() { .addKeyValue("channelId", connectionId) .addKeyValue("topicName", topicNameForTraffic) .setMessage("Sending message to Kafka failed.").log(); - throw new RuntimeException(e); + throw e; } } - } - - @Override - public IChannelConnectionCaptureSerializer createOffloader(String connectionId) { - return new StreamChannelConnectionCaptureSerializer(nodeId, connectionId, new StreamManager(connectionId)); - } - /** - * The default KafkaProducer comes with built-in retry and error-handling logic that suits many cases. From the - * documentation here for retry: https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html - * "If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE, - * and it's recommended to use delivery.timeout.ms to control retry behavior, instead of retries." - * - * Apart from this the KafkaProducer has logic for deciding whether an error is transient and should be - * retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html - * as well as basic retry backoff - */ - private Callback handleProducerRecordSent(CompletableFuture cf, String recordId) { - return (metadata, exception) -> { - if (exception != null) { - log.error("Error sending producer record: {}", recordId, exception); - cf.completeExceptionally(exception); - } - else { - log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}", - recordId, metadata.topic(), metadata.partition()); - cf.complete(metadata); - } - }; + /** + * The default KafkaProducer comes with built-in retry and error-handling logic that suits many cases. From the + * documentation here for retry: https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html + * "If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE, + * and it's recommended to use delivery.timeout.ms to control retry behavior, instead of retries." + *

+ * Apart from this the KafkaProducer has logic for deciding whether an error is transient and should be + * retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html + * as well as basic retry backoff + */ + private Callback handleProducerRecordSent(CompletableFuture cf, String recordId) { + return (metadata, exception) -> { + if (exception != null) { + log.error("Error sending producer record: {}", recordId, exception); + cf.completeExceptionally(exception); + } else { + log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}", + recordId, metadata.topic(), metadata.partition()); + cf.complete(metadata); + } + }; + } } } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java index 535859bcd..e6335a238 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java @@ -2,7 +2,6 @@ import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Timestamp; -import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication; import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; @@ -14,6 +13,11 @@ */ public class CodedOutputStreamSizeUtil { + /** + * Static class + */ + private CodedOutputStreamSizeUtil() {} + public static int getSizeOfTimestamp(Instant t) { long seconds = t.getEpochSecond(); int nanos = t.getNano(); diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/FileConnectionCaptureFactory.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/FileConnectionCaptureFactory.java index b1dd22341..c05abb270 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/FileConnectionCaptureFactory.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/FileConnectionCaptureFactory.java @@ -1,14 +1,11 @@ package org.opensearch.migrations.trafficcapture; -import com.google.protobuf.CodedOutputStream; import lombok.AllArgsConstructor; -import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; @@ -22,8 +19,8 @@ * @deprecated - This class is NOT meant to be used for production. */ @Slf4j -@Deprecated -public class FileConnectionCaptureFactory implements IConnectionCaptureFactory { +@Deprecated(since="0.1", forRemoval = false) +public class FileConnectionCaptureFactory implements IConnectionCaptureFactory { private final BiFunction outputStreamCreator; private String nodeId; private final int bufferSize; @@ -51,7 +48,7 @@ public FileConnectionCaptureFactory(String nodeId, String path, int bufferSize) } @AllArgsConstructor - class StreamManager extends OrderedStreamLifecyleManager { + class StreamManager extends OrderedStreamLifecyleManager { String connectionId; @Override public CodedOutputStreamAndByteBufferWrapper createStream() { @@ -59,10 +56,10 @@ public CodedOutputStreamAndByteBufferWrapper createStream() { } @Override - public CompletableFuture + public CompletableFuture kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder, int index) { if (!(outputStreamHolder instanceof CodedOutputStreamAndByteBufferWrapper)) { - throw new RuntimeException("Unknown outputStreamHolder sent back to StreamManager: " + + throw new IllegalArgumentException("Unknown outputStreamHolder sent back to StreamManager: " + outputStreamHolder); } var osh = (CodedOutputStreamAndByteBufferWrapper) outputStreamHolder; @@ -74,7 +71,6 @@ public CodedOutputStreamAndByteBufferWrapper createStream() { fs.write(filledBytes); fs.flush(); log.warn("NOT removing the CodedOutputStream from the WeakHashMap, which is a memory leak. Doing this until the system knows when to properly flush buffers"); - //codedStreamToFileStreamMap.remove(stream); } catch (IOException e) { throw new RuntimeException(e); } @@ -84,6 +80,6 @@ public CodedOutputStreamAndByteBufferWrapper createStream() { @Override public IChannelConnectionCaptureSerializer createOffloader(String connectionId) { - return new StreamChannelConnectionCaptureSerializer(nodeId, connectionId, new StreamManager(connectionId)); + return new StreamChannelConnectionCaptureSerializer(nodeId, connectionId, new StreamManager(connectionId)); } } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureListener.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureListener.java index 959a614c4..e60a14f2c 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureListener.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureListener.java @@ -7,7 +7,7 @@ import java.time.Instant; import java.util.concurrent.CompletableFuture; -public interface IChannelConnectionCaptureListener { +public interface IChannelConnectionCaptureListener { default void addBindEvent(Instant timestamp, SocketAddress addr) throws IOException { } @@ -75,7 +75,7 @@ default void addEndOfHeadersIndicator(int characterIndex) throws IOException { default void commitEndOfHttpMessageIndicator(Instant timestamp) throws IOException { } - default CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IOException { + default CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IOException { return CompletableFuture.completedFuture(null); } } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureOffloader.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureOffloader.java index f42c66452..738f4ad45 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureOffloader.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureOffloader.java @@ -1,4 +1,4 @@ package org.opensearch.migrations.trafficcapture; -public interface IChannelConnectionCaptureOffloader extends IChannelConnectionCaptureListener { +public interface IChannelConnectionCaptureOffloader extends IChannelConnectionCaptureListener { } \ No newline at end of file diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureSerializer.java index c7282c178..4b63ab75b 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureSerializer.java @@ -1,4 +1,4 @@ package org.opensearch.migrations.trafficcapture; -public interface IChannelConnectionCaptureSerializer extends IChannelConnectionCaptureListener { +public interface IChannelConnectionCaptureSerializer extends IChannelConnectionCaptureListener { } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IConnectionCaptureFactory.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IConnectionCaptureFactory.java index aa4de0cc2..9f5ec26c0 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IConnectionCaptureFactory.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IConnectionCaptureFactory.java @@ -2,6 +2,6 @@ import java.io.IOException; -public interface IConnectionCaptureFactory { - IChannelConnectionCaptureSerializer createOffloader(String connectionId) throws IOException; +public interface IConnectionCaptureFactory { + IChannelConnectionCaptureSerializer createOffloader(String connectionId) throws IOException; } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/OrderedStreamLifecyleManager.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/OrderedStreamLifecyleManager.java index 8f0f674e6..d585c1ff4 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/OrderedStreamLifecyleManager.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/OrderedStreamLifecyleManager.java @@ -2,16 +2,14 @@ import java.util.concurrent.CompletableFuture; -public abstract class OrderedStreamLifecyleManager implements StreamLifecycleManager { - CompletableFuture futureForLastClose = CompletableFuture.completedFuture(null); +public abstract class OrderedStreamLifecyleManager implements StreamLifecycleManager { + CompletableFuture futureForLastClose = CompletableFuture.completedFuture(null); - public abstract CodedOutputStreamHolder createStream(); - - public CompletableFuture closeStream(CodedOutputStreamHolder outputStreamHolder, int index) { + public CompletableFuture closeStream(CodedOutputStreamHolder outputStreamHolder, int index) { futureForLastClose = futureForLastClose.thenCompose(v -> kickoffCloseStream(outputStreamHolder, index)); return futureForLastClose; } - protected abstract CompletableFuture kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder, + protected abstract CompletableFuture kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder, int index); } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index d7c269131..a35a68c26 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -59,9 +59,9 @@ * 3: 1 */ @Slf4j -public class StreamChannelConnectionCaptureSerializer implements IChannelConnectionCaptureSerializer, Closeable { +public class StreamChannelConnectionCaptureSerializer implements IChannelConnectionCaptureSerializer { - private final static int MAX_ID_SIZE = 96; + private static final int MAX_ID_SIZE = 96; private boolean readObservationsAreWaitingForEom; private int eomsSoFar; @@ -69,13 +69,13 @@ public class StreamChannelConnectionCaptureSerializer implements IChannelConnect private int firstLineByteLength = -1; private int headersByteLength = -1; - private final StreamLifecycleManager streamManager; + private final StreamLifecycleManager streamManager; private final String nodeIdString; private final String connectionIdString; private CodedOutputStreamHolder currentCodedOutputStreamHolderOrNull; public StreamChannelConnectionCaptureSerializer(String nodeId, String connectionId, - @NonNull StreamLifecycleManager streamLifecycleManager) { + @NonNull StreamLifecycleManager streamLifecycleManager) { this.streamManager = streamLifecycleManager; assert (nodeId == null ? 0 : CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, nodeId)) + CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, connectionId) @@ -173,7 +173,7 @@ private void writeByteStringToCurrentStream(int fieldNum, String str) throws IOE } @Override - public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IOException { + public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IOException { if (currentCodedOutputStreamHolderOrNull == null && !isFinal) { return CompletableFuture.completedFuture(null); } @@ -190,40 +190,19 @@ public CompletableFuture flushCommitAndResetStream(boolean isFinal) thro return future; } - /** - * This call is BLOCKING. Override the Closeable interface - not addCloseEvent. - * - * @throws IOException - */ - @Override - public void close() throws IOException { - try { - flushCommitAndResetStream(true).get(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - - private TrafficObservation.Builder getTrafficObservationBuilder() { - return TrafficObservation.newBuilder(); - } - @Override public void addBindEvent(Instant timestamp, SocketAddress addr) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override public void addConnectEvent(Instant timestamp, SocketAddress remote, SocketAddress local) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override public void addDisconnectEvent(Instant timestamp) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override @@ -234,11 +213,7 @@ public void addCloseEvent(Instant timestamp) throws IOException { @Override public void addDeregisterEvent(Instant timestamp) throws IOException { - - } - - static abstract class BufRangeConsumer { - abstract void accept(byte[] buff, int offset, int len); + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } private void addStringMessage(int captureFieldNumber, int dataFieldNumber, @@ -247,7 +222,7 @@ private void addStringMessage(int captureFieldNumber, int dataFieldNumber, int lengthSize = 1; if (str.length() > 0) { dataSize = CodedOutputStream.computeStringSize(dataFieldNumber, str); - lengthSize = getOrCreateCodedOutputStream().computeInt32SizeNoTag(dataSize); + lengthSize = CodedOutputStream.computeInt32SizeNoTag(dataSize); } beginSubstreamObservation(timestamp, captureFieldNumber, dataSize + lengthSize); // e.g. 4 { @@ -260,7 +235,8 @@ private void addStringMessage(int captureFieldNumber, int dataFieldNumber, private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp, ByteBuf buffer) throws IOException { var byteBuffer = buffer.nioBuffer(); - int segmentFieldNumber,segmentCountFieldNumber,segmentDataFieldNumber; + int segmentFieldNumber; + int segmentDataFieldNumber; if (captureFieldNumber == TrafficObservation.READ_FIELD_NUMBER) { segmentFieldNumber = TrafficObservation.READSEGMENT_FIELD_NUMBER; segmentDataFieldNumber = ReadSegmentObservation.DATA_FIELD_NUMBER; @@ -360,47 +336,47 @@ public void addWriteEvent(Instant timestamp, ByteBuf buffer) throws IOException @Override public void addFlushEvent(Instant timestamp) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override public void addChannelRegisteredEvent(Instant timestamp) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override public void addChannelUnregisteredEvent(Instant timestamp) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override public void addChannelActiveEvent(Instant timestamp) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override public void addChannelInactiveEvent(Instant timestamp) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override public void addChannelReadEvent(Instant timestamp) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override public void addChannelReadCompleteEvent(Instant timestamp) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override public void addUserEventTriggeredEvent(Instant timestamp) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override public void addChannelWritabilityChangedEvent(Instant timestamp) throws IOException { - + // not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation } @Override diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamLifecycleManager.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamLifecycleManager.java index 418e82103..18db43cc4 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamLifecycleManager.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamLifecycleManager.java @@ -2,8 +2,8 @@ import java.util.concurrent.CompletableFuture; -public interface StreamLifecycleManager { +public interface StreamLifecycleManager { CodedOutputStreamHolder createStream(); - CompletableFuture closeStream(CodedOutputStreamHolder outputStreamHolder, int index); + CompletableFuture closeStream(CodedOutputStreamHolder outputStreamHolder, int index); } diff --git a/TrafficCapture/captureOffloader/src/testFixtures/java/org/opensearch/migrations/trafficcapture/InMemoryConnectionCaptureFactory.java b/TrafficCapture/captureOffloader/src/testFixtures/java/org/opensearch/migrations/trafficcapture/InMemoryConnectionCaptureFactory.java index a46c5ab3f..853de3421 100644 --- a/TrafficCapture/captureOffloader/src/testFixtures/java/org/opensearch/migrations/trafficcapture/InMemoryConnectionCaptureFactory.java +++ b/TrafficCapture/captureOffloader/src/testFixtures/java/org/opensearch/migrations/trafficcapture/InMemoryConnectionCaptureFactory.java @@ -14,7 +14,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Stream; -public class InMemoryConnectionCaptureFactory implements IConnectionCaptureFactory { +public class InMemoryConnectionCaptureFactory implements IConnectionCaptureFactory { private final int bufferSize; private final String nodeId; @@ -35,16 +35,16 @@ public InMemoryConnectionCaptureFactory(String nodeId, int bufferSize, Runnable } @AllArgsConstructor - class StreamManager extends OrderedStreamLifecyleManager { + class StreamManager extends OrderedStreamLifecyleManager { @Override public CodedOutputStreamHolder createStream() { return new CodedOutputStreamAndByteBufferWrapper(bufferSize); } @Override - protected CompletableFuture kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder, int index) { + protected CompletableFuture kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder, int index) { if (!(outputStreamHolder instanceof CodedOutputStreamAndByteBufferWrapper)) { - throw new RuntimeException("Unknown outputStreamHolder sent back to StreamManager: " + + throw new IllegalArgumentException("Unknown outputStreamHolder sent back to StreamManager: " + outputStreamHolder); } var osh = (CodedOutputStreamAndByteBufferWrapper) outputStreamHolder; @@ -59,9 +59,9 @@ protected CompletableFuture kickoffCloseStream(CodedOutputStreamHolder o } @Override - public IChannelConnectionCaptureSerializer createOffloader(String connectionId) throws IOException { + public IChannelConnectionCaptureSerializer createOffloader(String connectionId) throws IOException { // This array is only an indirection to work around Java's constraint that lambda values are final - return new StreamChannelConnectionCaptureSerializer(nodeId, connectionId, new StreamManager()); + return new StreamChannelConnectionCaptureSerializer<>(nodeId, connectionId, new StreamManager()); } public Stream getRecordedTrafficStreamsStream() { diff --git a/TrafficCapture/nettyWireLogging/build.gradle b/TrafficCapture/nettyWireLogging/build.gradle index efbea073a..3a9a1b973 100644 --- a/TrafficCapture/nettyWireLogging/build.gradle +++ b/TrafficCapture/nettyWireLogging/build.gradle @@ -15,7 +15,6 @@ dependencies { implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7' testImplementation project(':captureProtobufs') - testImplementation project(':testUtilities') testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2.1' testImplementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7' testImplementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' @@ -24,4 +23,6 @@ dependencies { testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0' testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0' testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl', version: '2.20.0' + + testImplementation testFixtures(project(path: ':testUtilities')) } diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java index 48bca26e3..03b5a7de9 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java @@ -9,10 +9,10 @@ import java.util.function.Predicate; @Slf4j -public class ConditionallyReliableLoggingHttpRequestHandler extends LoggingHttpRequestHandler { +public class ConditionallyReliableLoggingHttpRequestHandler extends LoggingHttpRequestHandler { private final Predicate shouldBlockPredicate; - public ConditionallyReliableLoggingHttpRequestHandler(IChannelConnectionCaptureSerializer trafficOffloader, + public ConditionallyReliableLoggingHttpRequestHandler(IChannelConnectionCaptureSerializer trafficOffloader, Predicate headerPredicateForWhenToBlock) { super(trafficOffloader); this.shouldBlockPredicate = headerPredicateForWhenToBlock; diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpRequestHandler.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpRequestHandler.java index 95dc430ba..8877bc96c 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpRequestHandler.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpRequestHandler.java @@ -22,7 +22,7 @@ import java.time.Instant; @Slf4j -public class LoggingHttpRequestHandler extends ChannelInboundHandlerAdapter { +public class LoggingHttpRequestHandler extends ChannelInboundHandlerAdapter { private static final MetricsLogger metricsLogger = new MetricsLogger("LoggingHttpRequestHandler"); static class SimpleHttpRequestDecoder extends HttpRequestDecoder { @@ -66,13 +66,13 @@ public HttpRequest resetCurrentRequest() { } } - protected final IChannelConnectionCaptureSerializer trafficOffloader; + protected final IChannelConnectionCaptureSerializer trafficOffloader; protected final EmbeddedChannel httpDecoderChannel; protected final SimpleHttpRequestDecoder requestDecoder; - public LoggingHttpRequestHandler(IChannelConnectionCaptureSerializer trafficOffloader) { + public LoggingHttpRequestHandler(IChannelConnectionCaptureSerializer trafficOffloader) { this.trafficOffloader = trafficOffloader; requestDecoder = new SimpleHttpRequestDecoder(); // as a field for easier debugging httpDecoderChannel = new EmbeddedChannel( @@ -114,6 +114,21 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }); } + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + trafficOffloader.flushCommitAndResetStream(true).whenComplete((result, t) -> { + if (t != null) { + log.warn("Got error: " + t.getMessage()); + } + try { + super.channelUnregistered(ctx); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + super.handlerRemoved(ctx); + } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpResponseHandler.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpResponseHandler.java index 8135515b8..5a318efb1 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpResponseHandler.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpResponseHandler.java @@ -67,6 +67,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) super.write(ctx, msg, promise); } + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + flush(ctx); + super.handlerRemoved(ctx); + } + @Override public void flush(ChannelHandlerContext ctx) throws Exception { super.flush(ctx); diff --git a/TrafficCapture/testUtilities/build.gradle b/TrafficCapture/testUtilities/build.gradle index 590999fce..50d190ba0 100644 --- a/TrafficCapture/testUtilities/build.gradle +++ b/TrafficCapture/testUtilities/build.gradle @@ -24,6 +24,7 @@ plugins { // id 'checkstyle' id 'org.owasp.dependencycheck' version '8.2.1' id "io.freefair.lombok" version "8.0.1" + id 'java-test-fixtures' } //spotbugs { @@ -44,15 +45,14 @@ repositories { dependencies { // spotbugs 'com.github.spotbugs:spotbugs:4.7.3' - implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.15.0' - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' - implementation group: 'io.netty', name: 'netty-all', version: '4.1.94.Final' - implementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2.1' - implementation group: 'org.bouncycastle', name: 'bcprov-jdk18on', version: '1.74' - implementation group: 'org.bouncycastle', name: 'bcpkix-jdk18on', version: '1.74' - implementation group: 'org.projectlombok', name: 'lombok', version: '1.18.22' - implementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.9.3' - implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7' + testFixturesImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.15.0' + testFixturesImplementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + testFixturesImplementation group: 'io.netty', name: 'netty-all', version: '4.1.94.Final' + testFixturesImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2.1' + testFixturesImplementation group: 'org.bouncycastle', name: 'bcprov-jdk18on', version: '1.74' + testFixturesImplementation group: 'org.bouncycastle', name: 'bcpkix-jdk18on', version: '1.74' + testFixturesImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api', version: '5.9.3' + testFixturesImplementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7' testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0' testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0' diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/CountingNettyResourceLeakDetector.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/CountingNettyResourceLeakDetector.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/CountingNettyResourceLeakDetector.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/CountingNettyResourceLeakDetector.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/HeapDumper.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/HeapDumper.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/HeapDumper.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/HeapDumper.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/HttpFirstLine.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/HttpFirstLine.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/HttpFirstLine.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/HttpFirstLine.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/NettyLeakCheckTestExtension.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/NettyLeakCheckTestExtension.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/NettyLeakCheckTestExtension.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/NettyLeakCheckTestExtension.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/PortFinder.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/PortFinder.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/SelfSignedSSLContextBuilder.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SelfSignedSSLContextBuilder.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/SelfSignedSSLContextBuilder.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SelfSignedSSLContextBuilder.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/SimpleHttpResponse.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpResponse.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/SimpleHttpResponse.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpResponse.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/SimpleHttpServer.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpServer.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/SimpleHttpServer.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpServer.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/SimpleNettyHttpServer.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleNettyHttpServer.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/SimpleNettyHttpServer.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleNettyHttpServer.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/TestUtilities.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/TestUtilities.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/TestUtilities.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/TestUtilities.java diff --git a/TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/WrapWithNettyLeakDetection.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/WrapWithNettyLeakDetection.java similarity index 100% rename from TrafficCapture/testUtilities/src/main/java/org/opensearch/migrations/testutils/WrapWithNettyLeakDetection.java rename to TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/WrapWithNettyLeakDetection.java diff --git a/TrafficCapture/trafficCaptureProxyServer/build.gradle b/TrafficCapture/trafficCaptureProxyServer/build.gradle index 76f4c27b2..4955e20fe 100644 --- a/TrafficCapture/trafficCaptureProxyServer/build.gradle +++ b/TrafficCapture/trafficCaptureProxyServer/build.gradle @@ -40,7 +40,7 @@ dependencies { implementation 'com.google.protobuf:protobuf-java:3.22.2' testImplementation project(':captureProtobufs') - testImplementation project(path: ':testUtilities') + testImplementation testFixtures(project(path: ':testUtilities')) testImplementation testFixtures(project(path: ':captureOffloader')) } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java index ebce40c14..526916550 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java @@ -194,8 +194,8 @@ static Properties buildKafkaProperties(Parameters params) throws IOException { kafkaProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); if (params.kafkaPropertiesFile != null) { - try { - kafkaProps.load(new FileReader(params.kafkaPropertiesFile)); + try (var fileReader = new FileReader(params.kafkaPropertiesFile)) { + kafkaProps.load(fileReader); } catch (IOException e) { log.error("Unable to locate provided Kafka producer properties file path: " + params.kafkaPropertiesFile); throw e; @@ -294,8 +294,7 @@ public static void main(String[] args) throws InterruptedException, IOException } }).orElse(null), getConnectionCaptureFactory(params)); } catch (Exception e) { - System.err.println(e); - e.printStackTrace(); + log.atError().setCause(e).setMessage("Caught exception while setting up the server and rethrowing").log(); throw e; } Runtime.getRuntime().addShutdownHook(new Thread(() -> { @@ -310,12 +309,6 @@ public static void main(String[] args) throws InterruptedException, IOException })); // This loop just gives the main() function something to do while the netty event loops // work in the background. - while (true) { - // TODO: The kill signal will cause the sleep to throw an InterruptedException, - // which may not be what we want to do - it seems like returning an exit code of 0 - // might make more sense. This is something to research - e.g. by seeing if there's - // specific behavior that POSIX recommends/requires. - Thread.sleep(60*1000); - } + proxy.waitForClose(); } } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java index 74e8d60c9..91bcd3b73 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/BacksideHandler.java @@ -49,7 +49,7 @@ public void channelInactive(ChannelHandlerContext ctx) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); + log.atError().setCause(cause).setMessage("Caught error").log(); String channelId = ctx.channel().id().asLongText(); FrontsideHandler.closeAndFlush(ctx.channel()); metricsLogger.atError(cause) diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java index c2fadcca5..c775c76cf 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/FrontsideHandler.java @@ -70,7 +70,7 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); + log.atError().setCause(cause).setMessage("Caught error").log(); ctx.close(); } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java index 16b8e0a08..2dec0f550 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java @@ -58,4 +58,8 @@ public void stop() throws InterruptedException { bossGroup.shutdownGracefully(); } } + + public void waitForClose() throws InterruptedException { + mainChannel.closeFuture().sync(); + } } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java index 98e207c5b..9db246bee 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java @@ -13,7 +13,7 @@ import java.io.IOException; import java.util.function.Supplier; -public class ProxyChannelInitializer extends ChannelInitializer { +public class ProxyChannelInitializer extends ChannelInitializer { private final IConnectionCaptureFactory connectionCaptureFactory; private final Supplier sslEngineProvider; @@ -30,7 +30,8 @@ public boolean shouldGuaranteeMessageOffloading(HttpRequest httpRequest) { return (httpRequest != null && (httpRequest.method().equals(HttpMethod.POST) || httpRequest.method().equals(HttpMethod.PUT) || - httpRequest.method().equals(HttpMethod.DELETE))); + httpRequest.method().equals(HttpMethod.DELETE) || + httpRequest.method().equals(HttpMethod.PATCH))); } @Override @@ -42,7 +43,7 @@ protected void initChannel(SocketChannel ch) throws IOException { var offloader = connectionCaptureFactory.createOffloader(ch.id().asLongText()); ch.pipeline().addLast(new LoggingHttpResponseHandler(offloader)); - ch.pipeline().addLast(new ConditionallyReliableLoggingHttpRequestHandler(offloader, + ch.pipeline().addLast(new ConditionallyReliableLoggingHttpRequestHandler(offloader, this::shouldGuaranteeMessageOffloading)); ch.pipeline().addLast(new FrontsideHandler(backsideConnectionPool)); } diff --git a/TrafficCapture/trafficReplayer/build.gradle b/TrafficCapture/trafficReplayer/build.gradle index 1805a6fb5..190b3c779 100644 --- a/TrafficCapture/trafficReplayer/build.gradle +++ b/TrafficCapture/trafficReplayer/build.gradle @@ -58,9 +58,10 @@ dependencies { implementation group: 'software.amazon.awssdk', name: 'auth', version: '2.20.102' implementation group: 'software.amazon.awssdk', name: 'secretsmanager', version: '2.20.127' - testImplementation project(':testUtilities') + testImplementation project(':captureOffloader') testImplementation testFixtures(project(path: ':captureOffloader')) + testImplementation testFixtures(project(path: ':testUtilities')) testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2.1' testImplementation group: 'org.junit.jupiter', name:'junit-jupiter-api', version:'5.x.x' diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonMessageWithFaultingPayload.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonMessageWithFaultingPayload.java index d39722fcf..ef050606d 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonMessageWithFaultingPayload.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonMessageWithFaultingPayload.java @@ -4,15 +4,14 @@ import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; public class HttpJsonMessageWithFaultingPayload extends LinkedHashMap implements IHttpMessage { - public final static String METHOD = "method"; - public final static String URI = "URI"; - public final static String PROTOCOL = "protocol"; - public final static String HEADERS = "headers"; - public final static String PAYLOAD = "payload"; + public final static String METHOD_KEY = "method"; + public final static String URI_KEY = "URI"; + public final static String PROTOCOL_KEY = "protocol"; + public final static String HEADERS_KEY = "headers"; + public final static String PAYLOAD_KEY = "payload"; public HttpJsonMessageWithFaultingPayload() { } @@ -23,25 +22,25 @@ public HttpJsonMessageWithFaultingPayload(Map m) { @Override public String method() { - return (String) this.get(METHOD); + return (String) this.get(METHOD_KEY); } public void setMethod(String value) { - this.put(METHOD, value); + this.put(METHOD_KEY, value); } @Override public String path() { - return (String) this.get(URI); + return (String) this.get(URI_KEY); } public void setPath(String value) { - this.put(URI, value); + this.put(URI_KEY, value); } @Override public String protocol() { - return (String) this.get(PROTOCOL); + return (String) this.get(PROTOCOL_KEY); } public void setProtocol(String value) { - this.put(PROTOCOL, value); + this.put(PROTOCOL_KEY, value); } @@ -51,15 +50,15 @@ public Map headersMap() { } public ListKeyAdaptingCaseInsensitiveHeadersMap headers() { - return (ListKeyAdaptingCaseInsensitiveHeadersMap) this.get(HEADERS); + return (ListKeyAdaptingCaseInsensitiveHeadersMap) this.get(HEADERS_KEY); } public void setHeaders(ListKeyAdaptingCaseInsensitiveHeadersMap value) { - this.put(HEADERS, value); + this.put(HEADERS_KEY, value); } public Map payload() { - return (Map) this.get(PAYLOAD); + return (Map) this.get(PAYLOAD_KEY); } public void setPayloadFaultMap(PayloadAccessFaultingMap value) { - this.put(PAYLOAD, value); + this.put(PAYLOAD_KEY, value); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java index 71820716b..55730ea7d 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java @@ -37,7 +37,7 @@ public Object transformJson(Object incomingJson) { } private Object transformHttpMessage(Map httpMsg) { - var incomingMethod = httpMsg.get(HttpJsonMessageWithFaultingPayload.METHOD); + var incomingMethod = httpMsg.get(HttpJsonMessageWithFaultingPayload.METHOD_KEY); if ("GET".equals(incomingMethod)) { processGet(httpMsg); } else if ("PUT".equals(incomingMethod)) { @@ -47,30 +47,30 @@ private Object transformHttpMessage(Map httpMsg) { } private void processGet(Map httpMsg) { - var incomingUri = (String) httpMsg.get(HttpJsonMessageWithFaultingPayload.URI); + var incomingUri = (String) httpMsg.get(HttpJsonMessageWithFaultingPayload.URI_KEY); var matchedUri = TYPED_OPERATION_URI_PATTERN_WITH_SIDE_CAPTURES.matcher(incomingUri); if (matchedUri.matches()) { var operationStr = matchedUri.group(2); if (operationStr.equals(SEARCH_URI_COMPONENT)) { - httpMsg.put(HttpJsonMessageWithFaultingPayload.URI, matchedUri.group(1) + operationStr); + httpMsg.put(HttpJsonMessageWithFaultingPayload.URI_KEY, matchedUri.group(1) + operationStr); } } } private void processPut(Map httpMsg) { - final var uriStr = (String) httpMsg.get(HttpJsonMessageWithFaultingPayload.URI); + final var uriStr = (String) httpMsg.get(HttpJsonMessageWithFaultingPayload.URI_KEY); var matchedTriple = TYPED_OPERATION_URI_PATTERN_WITH_SIDE_CAPTURES.matcher(uriStr); if (matchedTriple.matches()) { // TODO: Add support for multiple type mappings per index (something possible with // versions before ES7) - httpMsg.put(HttpJsonMessageWithFaultingPayload.URI, + httpMsg.put(HttpJsonMessageWithFaultingPayload.URI_KEY, matchedTriple.group(1) + DOC_URI_COMPONENT + matchedTriple.group(2)); return; } var matchedSingle = SINGLE_LEVEL_OPERATION_PATTERN_WITH_CAPTURE.matcher(uriStr); if (matchedSingle.matches()) { var topPayloadElement = - (Map) ((Map) httpMsg.get(HttpJsonMessageWithFaultingPayload.PAYLOAD)) + (Map) ((Map) httpMsg.get(HttpJsonMessageWithFaultingPayload.PAYLOAD_KEY)) .get(PayloadAccessFaultingMap.INLINED_JSON_BODY_DOCUMENT_KEY); var mappingsValue = (Map) topPayloadElement.get(MAPPINGS_KEYNAME); if (mappingsValue != null) {