Skip to content

Commit

Permalink
chore(test): fix falling test
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jan 5, 2022
1 parent 7e99277 commit 741b9d5
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.micrometer.core.lang.Nullable;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.env.Environment;
import io.micronaut.core.util.StringUtils;
Expand Down Expand Up @@ -77,8 +79,9 @@ public class RecordRepository extends AbstractRepository {
@Value("${akhq.clients-defaults.consumer.properties.max.poll.records:50}")
protected int maxPollRecords;

@Value("${akhq.topic-data.kafka-max-message-length}")
private int maxKafkaMessageLength;
@Nullable
@Property(name = "akhq.topic-data.kafka-max-message-length")
protected Integer maxKafkaMessageLength;

public Map<String, Record> getLastRecord(String clusterId, List<String> topicsName) throws ExecutionException, InterruptedException {
Map<String, Topic> topics = topicRepository.findByName(clusterId, topicsName).stream()
Expand Down Expand Up @@ -1276,6 +1279,10 @@ private static class EndOffsetBound {
}

private void filterMessageLength(Record record) {
if (maxKafkaMessageLength == null) {
return;
}

int bytesLength = record.getValue().getBytes(StandardCharsets.UTF_8).length;
if (bytesLength > maxKafkaMessageLength) {
int substringChars = maxKafkaMessageLength / 1000;
Expand Down

0 comments on commit 741b9d5

Please sign in to comment.