Merge pull request #328 from gregschohn/KafkaSimpleCommit
Simple Manual Kafka Commit

Signed-off-by: Tanner Lewis <[email protected]>
lewijacn authored Sep 27, 2023
2 parents 78c558b + 8cb4a17 commit bad8c09
Showing 6 changed files with 169 additions and 12 deletions.
5 changes: 5 additions & 0 deletions TrafficCapture/trafficReplayer/build.gradle
@@ -68,6 +68,11 @@ dependencies {

testImplementation project(':testUtilities')
testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2.1'

testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.0'
testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.0'
testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.0'

testImplementation 'org.mockito:mockito-core:4.6.1'
testImplementation 'org.mockito:mockito-junit-jupiter:4.6.1'
}
@@ -59,11 +59,14 @@ public static KafkaProtobufConsumer buildKafkaConsumer(@NonNull String brokers,
return new KafkaProtobufConsumer(new KafkaConsumer<>(kafkaProps), topic, behavioralPolicy);
}

public static Properties buildKafkaProperties(@NonNull String brokers, @NonNull String groupId, boolean enableMSKAuth,
String propertyFilePath) throws IOException {
public static Properties buildKafkaProperties(@NonNull String brokers,
@NonNull String groupId,
boolean enableMSKAuth,
String propertyFilePath) throws IOException {
var kafkaProps = new Properties();
kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
if (propertyFilePath != null) {
try (InputStream input = new FileInputStream(propertyFilePath)) {
@@ -96,6 +99,7 @@ public List<TrafficStream> readNextTrafficStreamSynchronously() {
ConsumerRecords<String, byte[]> records;
try {
records = kafkaConsumer.poll(CONSUMER_POLL_TIMEOUT);
log.info("Kafka consumer poll has fetched {} records", records.count());
} catch (RuntimeException e) {
log.atWarn().setCause(e).setMessage("Unable to poll the topic: {} with our Kafka consumer. Swallowing and awaiting next " +
"metadata refresh to try again.").addArgument(topic).log();
@@ -123,6 +127,7 @@ public List<TrafficStream> readNextTrafficStreamSynchronously() {
return null;
}
}).filter(Objects::nonNull);
kafkaConsumer.commitSync();
return trafficStream.collect(Collectors.toList());
} catch (Exception e) {
log.error("Terminating Kafka traffic stream");
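The functional core of this change is the switch from Kafka's background auto-commit to an explicit commitSync() issued once a polled batch has been turned into TrafficStream objects. The following is a minimal, standalone sketch of that poll-process-commit pattern only; the broker address, topic, and group id are placeholders, not values from this repository.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ManualCommitSketch {
    public static void main(String[] args) {
        var props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "example-group");            // placeholder group id
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // manual commit, as in this change
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (var consumer = new KafkaConsumer<String, byte[]>(props)) {
            consumer.subscribe(List.of("example-topic")); // placeholder topic
            while (true) {
                var records = consumer.poll(Duration.ofSeconds(1));
                // ... deserialize and hand the records off for processing here ...
                consumer.commitSync(); // offsets advance only after the batch has been handled
            }
        }
    }
}

With enable.auto.commit set to false, offsets are never committed in the background; if the process dies before commitSync() runs, the uncommitted records are re-delivered to the consumer group on a later poll.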
@@ -31,8 +31,7 @@ public void onNewDataArrivingAfterItsAccumulationHasBeenExpired(
"the Accumulation was expired. This indicates that the minimumGuaranteedLifetime " +
"must be set to at least " + Duration.between(lastPacketTimestamp, endOfWindow) +
". The beginning of the valid time window is currently " + endOfWindow +
" for (" + partitionId + "," + connectionId + ") and the last timestamp of the " +
"Accumulation object that was being assembled was");
" for (" + partitionId + "," + connectionId + ")");
}

public void onNewDataArrivingWithATimestampThatIsAlreadyExpired(
@@ -41,8 +40,7 @@ public void onNewDataArrivingWithATimestampThatIsAlreadyExpired(
"the Accumulation was expired. This indicates that the minimumGuaranteedLifetime " +
"must be set to at least " + Duration.between(timestamp, endOfWindow) +
". The beginning of the valid time window is currently " + endOfWindow +
" for (" + partitionId + "," + connectionId + ") and the last timestamp of the " +
"Accumulation object that was being assembled was");
" for (" + partitionId + "," + connectionId + ")");
}

public boolean shouldRetryAfterAccumulationTimestampRaceDetected(String partitionId, String connectionId,
@@ -30,15 +30,15 @@ logger.MetricsLogger.appenderRef.metrics.ref = METRICS
appender.debug_out.type = RollingFile
appender.debug_out.name = ReplayerLogFile
appender.debug_out.fileName = logs/replayer.log
appender.debug_out.filePattern = logs/$${date:yyyy-MM}/replayer-%d{MM-dd-yyyy}-%i.log.gz
appender.debug_out.filePattern = logs/$${date:yyyy-MM}/replayer-%d{yyyy-MM-dd-HH-mm}-%i.log.gz
appender.debug_out.layout.type = PatternLayout
appender.debug_out.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n
appender.debug_out.policies.type = Policies
appender.debug_out.policies.time.type = TimeBasedTriggeringPolicy
appender.debug_out.policies.time.interval = 1
appender.debug_out.policies.time.interval = 15
appender.debug_out.policies.time.modulate = true
appender.debug_out.strategy.type = DefaultRolloverStrategy
appender.debug_out.strategy.max = 72
appender.debug_out.strategy.max = 288

rootLogger.appenderRef.logfile.ref = ReplayerLogFile

@@ -0,0 +1,152 @@
package org.opensearch.migrations.replay.kafka;

import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opensearch.migrations.trafficcapture.protos.ReadObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Testcontainers
public class KafkaProtobufConsumerLongTermTest {

public static final String TEST_GROUP_CONSUMER_ID = "TEST_GROUP_CONSUMER_ID";
public static final String TEST_GROUP_PRODUCER_ID = "TEST_GROUP_PRODUCER_ID";
public static final int TEST_RECORD_COUNT = 10;
public static final String TEST_NODE_ID = "TestNodeId";
public static final String TEST_TRAFFIC_STREAM_ID_STRING = "TEST_TRAFFIC_STREAM_ID_STRING";
private static final String FAKE_READ_PACKET_DATA = "Fake pa";
public static final int PRODUCER_SLEEP_INTERVAL_MS = 100;
@Container
// see https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
private KafkaContainer embeddedKafkaBroker =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));

Producer<String, byte[]> buildKafkaProducer() {
var kafkaProps = new Properties();
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// Property details: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#delivery-timeout-ms
kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000);
kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
kafkaProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
kafkaProps.put(ProducerConfig.CLIENT_ID_CONFIG, TEST_GROUP_PRODUCER_ID);
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBootstrapServers());
try {
return new KafkaProducer<>(kafkaProps);
} catch (Exception e) {
log.error("Unable to create Kafka producer", e);
System.exit(1);
throw e;
}

}

TrafficStream makeTestTrafficStream(Instant t, int i) {
var timestamp = Timestamp.newBuilder()
.setSeconds(t.getEpochSecond())
.setNanos(t.getNano())
.build();
var tsb = TrafficStream.newBuilder()
.setNumber(i);
// TODO - add something for setNumberOfThisLastChunk. There's no point in doing that now though
// because the code doesn't make any distinction between the very last one and the previous ones
return tsb.setNodeId(TEST_NODE_ID)
.setConnectionId(getConnectionId(i))
.addSubStream(TrafficObservation.newBuilder().setTs(timestamp)
.setRead(ReadObservation.newBuilder()
.setData(ByteString.copyFrom(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8)))
.build())
.build()).build();

}

private String getConnectionId(int i) {
return TEST_TRAFFIC_STREAM_ID_STRING + "_" + i;
}

@Test
public void testTrafficCaptureSource() throws Exception {
String testTopicName = "TEST_TOPIC";

var kafkaConsumerProps = KafkaProtobufConsumer.buildKafkaProperties(embeddedKafkaBroker.getBootstrapServers(),
TEST_GROUP_CONSUMER_ID, false, null);
kafkaConsumerProps.setProperty("max.poll.interval.ms", "10000");
var kafkaConsumer = new KafkaConsumer<String,byte[]>(kafkaConsumerProps);
var kafkaTrafficCaptureSource = new KafkaProtobufConsumer(kafkaConsumer, testTopicName, null);

var kafkaProducer = buildKafkaProducer();
var sendCompleteCount = new AtomicInteger(0);
var scheduledIterationsCount = new AtomicInteger(0);
var executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(()->{
var i = scheduledIterationsCount.getAndIncrement();
if (i >= TEST_RECORD_COUNT) {
executor.shutdown();
} else {
produceKafkaRecord(testTopicName, kafkaProducer, i, sendCompleteCount);
}
}, 0, PRODUCER_SLEEP_INTERVAL_MS, TimeUnit.MILLISECONDS);

for (int i=0; i<TEST_RECORD_COUNT; ) {
Thread.sleep(getSleepAmountMsForProducerRun(i));
var nextChunkFuture = kafkaTrafficCaptureSource.readNextTrafficStreamChunk();
var recordsList = nextChunkFuture.get((2+ TEST_RECORD_COUNT)*PRODUCER_SLEEP_INTERVAL_MS, TimeUnit.MILLISECONDS);
for (int j=0; j<recordsList.size(); ++j) {
Assertions.assertEquals(getConnectionId(i+j), recordsList.get(j).getConnectionId());
}
log.info("Got "+recordsList.size()+" records and already had " + i);
i += recordsList.size();
}
Assertions.assertEquals(TEST_RECORD_COUNT, sendCompleteCount.get());
Assertions.assertThrows(TimeoutException.class, ()-> {
var rogueChunk = kafkaTrafficCaptureSource.readNextTrafficStreamChunk().get(1, TimeUnit.SECONDS);
if (rogueChunk.size() == 0) {
// TimeoutExceptions cannot be thrown by the supplier of the CompletableFuture today, BUT we
// could long-poll on the broker for longer than the timeout value supplied in the get() call above
throw new TimeoutException("read actually returned 0 items, but transforming this to a " +
"TimeoutException because either result would be valid.");
}
log.error("rogue chunk: "+ rogueChunk);
});
}

private long getSleepAmountMsForProducerRun(int i) {
return 1*1000;
}

private void produceKafkaRecord(String testTopicName, Producer<String, byte[]> kafkaProducer, int i,
AtomicInteger sendCompleteCount) {
var trafficStream = makeTestTrafficStream(Instant.now(), i);
var record = new ProducerRecord<>(testTopicName, makeKey(i), trafficStream.toByteArray());
var sendFuture = kafkaProducer.send(record, (metadata, exception) -> {
sendCompleteCount.incrementAndGet();
});
}

@NotNull
private static String makeKey(int i) {
return "KEY_" + i;
}
}
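One way a test like this could observe the effect of the new commitSync() call directly is to open a second consumer in the same group and read back the committed offset for the topic-partition. The helper below is a hypothetical sketch, not part of this commit; the consumer properties passed in are assumed to be the same ones the test already builds (bootstrap servers, group id, and deserializers).

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Properties;
import java.util.Set;

public class CommittedOffsetProbe {
    // Returns the committed offset for the given partition, or -1 if nothing has been committed yet.
    public static long committedOffset(Properties consumerProps, String topic, int partition) {
        try (var probe = new KafkaConsumer<String, byte[]>(consumerProps)) {
            var tp = new TopicPartition(topic, partition);
            OffsetAndMetadata committed = probe.committed(Set.of(tp)).get(tp);
            return committed == null ? -1L : committed.offset();
        }
    }
}

Asserting that this value reaches TEST_RECORD_COUNT after the read loop would pin the manual-commit behavior explicitly rather than inferring it from the absence of duplicate reads.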
@@ -127,7 +127,6 @@ public void testSupplyTrafficWithUnformattedMessages() {
@Test
public void testBuildPropertiesBaseCase() throws IOException {
Properties props = KafkaProtobufConsumer.buildKafkaProperties("brokers", "groupId", false, null);
Assertions.assertEquals(5, props.size());
Assertions.assertEquals("brokers", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
Assertions.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", props.get("key.deserializer"));
Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.get("value.deserializer"));
@@ -138,7 +137,6 @@ public void testBuildPropertiesBaseCase() throws IOException {
@Test
public void testBuildPropertiesMSKAuthEnabled() throws IOException {
Properties props = KafkaProtobufConsumer.buildKafkaProperties("brokers", "groupId", true, null);
Assertions.assertEquals(9, props.size());
Assertions.assertEquals("brokers", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
Assertions.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", props.get("key.deserializer"));
Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.get("value.deserializer"));
@@ -154,7 +152,6 @@ public void testBuildPropertiesMSKAuthEnabled() throws IOException {
public void testBuildPropertiesWithProvidedPropertyFile() throws IOException {
File simplePropertiesFile = new File("src/test/resources/kafka/simple-kafka.properties");
Properties props = KafkaProtobufConsumer.buildKafkaProperties("brokers", "groupId", true, simplePropertiesFile.getPath());
Assertions.assertEquals(10, props.size());
Assertions.assertEquals("brokers", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
Assertions.assertEquals("org.apache.kafka.common.serialization.StringDeserializer", props.get("key.deserializer"));
Assertions.assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", props.get("value.deserializer"));
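Since the brittle props.size() assertions were dropped, a natural follow-up (sketched here as an assumption, not something this commit adds) would be a test that pins the two new consumer settings directly; it reuses the test class's existing imports.

@Test
public void testManualCommitSettingsAreApplied() throws IOException {
    Properties props = KafkaProtobufConsumer.buildKafkaProperties("brokers", "groupId", false, null);
    // Values set unconditionally by buildKafkaProperties in this change.
    Assertions.assertEquals("false", props.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
    Assertions.assertEquals("earliest", props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
}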
