Skip to content

Commit

Permalink
Merge pull request #375 from gregschohn/LintingChanges
Browse files Browse the repository at this point in the history
Linting changes
  • Loading branch information
gregschohn authored Nov 1, 2023
2 parents 31de502 + b2f2f6c commit f8017c5
Show file tree
Hide file tree
Showing 37 changed files with 161 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.opensearch.migrations.trafficcapture.CodedOutputStreamHolder;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory;
Expand Down Expand Up @@ -47,6 +48,11 @@ public KafkaCaptureFactory(String nodeId, Producer<String, byte[]> producer, int
this(nodeId, producer, DEFAULT_TOPIC_NAME_FOR_TRAFFIC, messageSize);
}

@Override
public IChannelConnectionCaptureSerializer createOffloader(String connectionId) {
return new StreamChannelConnectionCaptureSerializer(nodeId, connectionId, new StreamManager(connectionId));
}

@AllArgsConstructor
static class CodedOutputStreamWrapper implements CodedOutputStreamHolder {
private final CodedOutputStream codedOutputStream;
Expand All @@ -58,7 +64,7 @@ static class CodedOutputStreamWrapper implements CodedOutputStreamHolder {
}

@AllArgsConstructor
class StreamManager extends OrderedStreamLifecyleManager {
class StreamManager extends OrderedStreamLifecyleManager<RecordMetadata> {
String connectionId;

@Override
Expand All @@ -68,10 +74,10 @@ public CodedOutputStreamWrapper createStream() {
}

@Override
public CompletableFuture<Object>
public CompletableFuture<RecordMetadata>
kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder, int index) {
if (!(outputStreamHolder instanceof CodedOutputStreamWrapper)) {
throw new RuntimeException("Unknown outputStreamHolder sent back to StreamManager: " +
throw new IllegalArgumentException("Unknown outputStreamHolder sent back to StreamManager: " +
outputStreamHolder);
}
var osh = (CodedOutputStreamWrapper) outputStreamHolder;
Expand All @@ -80,17 +86,17 @@ public CodedOutputStreamWrapper createStream() {
try {
String recordId = String.format("%s.%d", connectionId, index);
var byteBuffer = osh.byteBuffer;
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicNameForTraffic, recordId,
ProducerRecord<String, byte[]> kafkaRecord = new ProducerRecord<>(topicNameForTraffic, recordId,
Arrays.copyOfRange(byteBuffer.array(), 0, byteBuffer.position()));
// Used to essentially wrap Future returned by Producer to CompletableFuture
CompletableFuture cf = new CompletableFuture<>();
var cf = new CompletableFuture<RecordMetadata>();
log.debug("Sending Kafka producer record: {} for topic: {}", recordId, topicNameForTraffic);
// Async request to Kafka cluster
producer.send(record, handleProducerRecordSent(cf, recordId));
producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId));
metricsLogger.atSuccess()
.addKeyValue("channelId", connectionId)
.addKeyValue("topicName", topicNameForTraffic)
.addKeyValue("sizeInBytes", record.value().length)
.addKeyValue("sizeInBytes", kafkaRecord.value().length)
.addKeyValue("diagnosticId", recordId)
.setMessage("Sent message to Kafka").log();
return cf;
Expand All @@ -99,38 +105,32 @@ public CodedOutputStreamWrapper createStream() {
.addKeyValue("channelId", connectionId)
.addKeyValue("topicName", topicNameForTraffic)
.setMessage("Sending message to Kafka failed.").log();
throw new RuntimeException(e);
throw e;
}
}
}

@Override
public IChannelConnectionCaptureSerializer createOffloader(String connectionId) {
return new StreamChannelConnectionCaptureSerializer(nodeId, connectionId, new StreamManager(connectionId));
}

/**
* The default KafkaProducer comes with built-in retry and error-handling logic that suits many cases. From the
* documentation here for retry: https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
* "If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE,
* and it's recommended to use delivery.timeout.ms to control retry behavior, instead of retries."
*
* Apart from this the KafkaProducer has logic for deciding whether an error is transient and should be
* retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html
* as well as basic retry backoff
*/
private Callback handleProducerRecordSent(CompletableFuture cf, String recordId) {
return (metadata, exception) -> {
if (exception != null) {
log.error("Error sending producer record: {}", recordId, exception);
cf.completeExceptionally(exception);
}
else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
recordId, metadata.topic(), metadata.partition());
cf.complete(metadata);
}
};
/**
* The default KafkaProducer comes with built-in retry and error-handling logic that suits many cases. From the
* documentation here for retry: https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
* "If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE,
* and it's recommended to use delivery.timeout.ms to control retry behavior, instead of retries."
* <p>
* Apart from this the KafkaProducer has logic for deciding whether an error is transient and should be
* retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html
* as well as basic retry backoff
*/
private Callback handleProducerRecordSent(CompletableFuture<RecordMetadata> cf, String recordId) {
return (metadata, exception) -> {
if (exception != null) {
log.error("Error sending producer record: {}", recordId, exception);
cf.completeExceptionally(exception);
} else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
recordId, metadata.topic(), metadata.partition());
cf.complete(metadata);
}
};
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Timestamp;
import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

Expand All @@ -14,6 +13,11 @@
*/
public class CodedOutputStreamSizeUtil {

/**
* Static class
*/
private CodedOutputStreamSizeUtil() {}

public static int getSizeOfTimestamp(Instant t) {
long seconds = t.getEpochSecond();
int nanos = t.getNano();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package org.opensearch.migrations.trafficcapture;

import com.google.protobuf.CodedOutputStream;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
Expand All @@ -22,8 +19,8 @@
* @deprecated - This class is NOT meant to be used for production.
*/
@Slf4j
@Deprecated
public class FileConnectionCaptureFactory implements IConnectionCaptureFactory {
@Deprecated(since="0.1", forRemoval = false)
public class FileConnectionCaptureFactory implements IConnectionCaptureFactory<Void> {
private final BiFunction<String, Integer, FileOutputStream> outputStreamCreator;
private String nodeId;
private final int bufferSize;
Expand Down Expand Up @@ -51,18 +48,18 @@ public FileConnectionCaptureFactory(String nodeId, String path, int bufferSize)
}

@AllArgsConstructor
class StreamManager extends OrderedStreamLifecyleManager {
class StreamManager extends OrderedStreamLifecyleManager<Void> {
String connectionId;
@Override
public CodedOutputStreamAndByteBufferWrapper createStream() {
return new CodedOutputStreamAndByteBufferWrapper(bufferSize);
}

@Override
public CompletableFuture<Object>
public CompletableFuture<Void>
kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder, int index) {
if (!(outputStreamHolder instanceof CodedOutputStreamAndByteBufferWrapper)) {
throw new RuntimeException("Unknown outputStreamHolder sent back to StreamManager: " +
throw new IllegalArgumentException("Unknown outputStreamHolder sent back to StreamManager: " +
outputStreamHolder);
}
var osh = (CodedOutputStreamAndByteBufferWrapper) outputStreamHolder;
Expand All @@ -74,7 +71,6 @@ public CodedOutputStreamAndByteBufferWrapper createStream() {
fs.write(filledBytes);
fs.flush();
log.warn("NOT removing the CodedOutputStream from the WeakHashMap, which is a memory leak. Doing this until the system knows when to properly flush buffers");
//codedStreamToFileStreamMap.remove(stream);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -84,6 +80,6 @@ public CodedOutputStreamAndByteBufferWrapper createStream() {

@Override
public IChannelConnectionCaptureSerializer createOffloader(String connectionId) {
return new StreamChannelConnectionCaptureSerializer(nodeId, connectionId, new StreamManager(connectionId));
return new StreamChannelConnectionCaptureSerializer<Void>(nodeId, connectionId, new StreamManager(connectionId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.time.Instant;
import java.util.concurrent.CompletableFuture;

public interface IChannelConnectionCaptureListener {
public interface IChannelConnectionCaptureListener<T> {
default void addBindEvent(Instant timestamp, SocketAddress addr) throws IOException {
}

Expand Down Expand Up @@ -75,7 +75,7 @@ default void addEndOfHeadersIndicator(int characterIndex) throws IOException {
default void commitEndOfHttpMessageIndicator(Instant timestamp) throws IOException {
}

default CompletableFuture<Object> flushCommitAndResetStream(boolean isFinal) throws IOException {
default CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IOException {
return CompletableFuture.completedFuture(null);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.migrations.trafficcapture;

public interface IChannelConnectionCaptureOffloader extends IChannelConnectionCaptureListener {
public interface IChannelConnectionCaptureOffloader<T> extends IChannelConnectionCaptureListener<T> {
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.opensearch.migrations.trafficcapture;

public interface IChannelConnectionCaptureSerializer extends IChannelConnectionCaptureListener {
public interface IChannelConnectionCaptureSerializer<T> extends IChannelConnectionCaptureListener<T> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

import java.io.IOException;

public interface IConnectionCaptureFactory {
IChannelConnectionCaptureSerializer createOffloader(String connectionId) throws IOException;
public interface IConnectionCaptureFactory<T> {
IChannelConnectionCaptureSerializer<T> createOffloader(String connectionId) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@

import java.util.concurrent.CompletableFuture;

public abstract class OrderedStreamLifecyleManager implements StreamLifecycleManager {
CompletableFuture<Object> futureForLastClose = CompletableFuture.completedFuture(null);
public abstract class OrderedStreamLifecyleManager<T> implements StreamLifecycleManager<T> {
CompletableFuture<T> futureForLastClose = CompletableFuture.completedFuture(null);

public abstract CodedOutputStreamHolder createStream();

public CompletableFuture<Object> closeStream(CodedOutputStreamHolder outputStreamHolder, int index) {
public CompletableFuture<T> closeStream(CodedOutputStreamHolder outputStreamHolder, int index) {
futureForLastClose = futureForLastClose.thenCompose(v -> kickoffCloseStream(outputStreamHolder, index));
return futureForLastClose;
}

protected abstract CompletableFuture<Object> kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder,
protected abstract CompletableFuture<T> kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder,
int index);
}
Loading

0 comments on commit f8017c5

Please sign in to comment.