Fix Kafka with Redistribute and commits enabled (#32344)
Naireen authored Sep 17, 2024
1 parent b949ac2 commit 1b2d21a
Showing 3 changed files with 136 additions and 20 deletions.
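
For orientation, a minimal sketch (not taken from the commit) of the read configuration this change affects: redistributing with duplicates allowed while committing offsets in finalize, which previously triggered the error "commitOffsetsInFinalize() can't be enabled with isRedistributed" and now only logs a warning when duplicates are allowed. The broker address, topic, and deserializers below are placeholders.

    // Hypothetical snippet; servers, topic, and key/value types are placeholders.
    PCollection<KafkaRecord<String, String>> records =
        pipeline.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")
                .withTopic("my-topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withRedistribute()                  // redistribute records after the read
                .withRedistributeNumKeys(32)         // shard onto a fixed number of keys
                .withAllowDuplicates(true)           // tolerate duplicates from redistribute
                .commitOffsetsInFinalize());         // commit offsets as bundles finalize
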
@@ -890,7 +890,6 @@ static <K, V> void setupExternalBuilder(
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
}
System.out.println("xxx builder service" + builder.toString());
}

private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -1697,11 +1696,12 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
}

if (kafkaRead.isRedistributed()) {
// fail here instead.
checkArgument(
kafkaRead.isCommitOffsetsInFinalizeEnabled(),
"commitOffsetsInFinalize() can't be enabled with isRedistributed");
if (kafkaRead.isCommitOffsetsInFinalizeEnabled() && kafkaRead.isAllowDuplicates()) {
LOG.warn(
"Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled");
}
PCollection<KafkaRecord<K, V>> output = input.getPipeline().apply(transform);

if (kafkaRead.getRedistributeNumKeys() == 0) {
return output.apply(
"Insert Redistribute",
@@ -1797,7 +1797,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
return pcol.apply(
"Insert Redistribute with Shards",
Redistribute.<KafkaRecord<K, V>>arbitrarily()
.withAllowDuplicates(true)
.withAllowDuplicates(kafkaRead.isAllowDuplicates())
.withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
}
}
@@ -2654,6 +2654,12 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
if (getRedistributeNumKeys() == 0) {
LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
}
if ((isCommitOffsetEnabled() || configuredKafkaCommit()) && isAllowDuplicates()) {
LOG.warn(
"Either auto_commit is set, or commitOffsetEnabled is enabled (or both), but since "
+ "withRestribute() is enabled with allow duplicates, the runner may have additional work processed that "
+ "is ahead of the current checkpoint");
}
}

if (getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
@@ -2687,8 +2693,7 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
.getSchemaCoder(KafkaSourceDescriptor.class),
recordCoder));

boolean applyCommitOffsets =
isCommitOffsetEnabled() && !configuredKafkaCommit() && !isRedistribute();
boolean applyCommitOffsets = isCommitOffsetEnabled() && !configuredKafkaCommit();
if (!applyCommitOffsets) {
return outputWithDescriptor
.apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {}).via(KV::getValue))
@@ -2710,6 +2715,15 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
if (Comparators.lexicographical(Comparator.<String>naturalOrder())
.compare(requestedVersion, targetVersion)
< 0) {
// Redistribute is not allowed with commits prior to 2.59.0, since there is a Reshuffle
// prior to the redistribute. The reshuffle will occur before commits are offsetted and
// before outputting KafkaRecords. Adding a redistribute then afterwards doesn't provide
// additional performance benefit.
checkArgument(
!isRedistribute(),
"Can not enable isRedistribute() while committing offsets prior to "
+ String.join(".", targetVersion));

return expand259Commits(
outputWithDescriptor, recordCoder, input.getPipeline().getSchemaRegistry());
}
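
The comment and guard in the hunk above apply to the legacy offset-commit path, selected when the pipeline's update-compatibility version is below 2.60.0. A minimal sketch, assuming standard Beam pipeline-options plumbing, of how that path is pinned (mirroring the new test added below):

    // Hypothetical setup; forces the pre-2.60.0 offset-commit expansion path.
    PipelineOptions options = PipelineOptionsFactory.create();
    options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.59.0");
    Pipeline pipeline = Pipeline.create(options);
    // On this path, combining withRedistribute() with commitOffsets() is rejected:
    // "Can not enable isRedistribute() while committing offsets prior to 2.60.0".
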
@@ -108,7 +108,13 @@ private PipelineResult testReadTransformCreationWithImplementationBoundPropertie
Function<KafkaIO.Read<Integer, Long>, KafkaIO.Read<Integer, Long>> kafkaReadDecorator) {
p.apply(
kafkaReadDecorator.apply(
mkKafkaReadTransform(1000, null, new ValueAsTimestampFn(), false, 0)));
mkKafkaReadTransform(
1000,
null,
new ValueAsTimestampFn(),
false, /*redistribute*/
false, /*allowDuplicates*/
0)));
return p.run();
}

@@ -88,6 +88,7 @@
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -381,7 +382,13 @@ public Consumer<byte[], byte[]> apply(Map<String, Object> config) {

static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
int numElements, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
return mkKafkaReadTransform(numElements, numElements, timestampFn, false, 0);
return mkKafkaReadTransform(
numElements,
numElements,
timestampFn,
false, /*redistribute*/
false, /*allowDuplicates*/
0);
}

/**
@@ -393,6 +400,7 @@ static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
@Nullable Integer maxNumRecords,
@Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn,
@Nullable Boolean redistribute,
@Nullable Boolean withAllowDuplicates,
@Nullable Integer numKeys) {

KafkaIO.Read<Integer, Long> reader =
@@ -408,13 +416,21 @@ static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
reader = reader.withMaxNumRecords(maxNumRecords);
}

if (withAllowDuplicates == null) {
withAllowDuplicates = false;
}

if (timestampFn != null) {
reader = reader.withTimestampFn(timestampFn);
}

if (redistribute) {
if (numKeys != null) {
reader = reader.withRedistribute().withRedistributeNumKeys(numKeys);
reader =
reader
.withRedistribute()
.withAllowDuplicates(withAllowDuplicates)
.withRedistributeNumKeys(numKeys);
}
reader = reader.withRedistribute();
}
@@ -628,17 +644,47 @@ public void testRiskyConfigurationWarnsProperly() {
}

@Test
public void testCommitOffsetsInFinalizeAndRedistributeErrors() {
thrown.expect(Exception.class);
thrown.expectMessage("commitOffsetsInFinalize() can't be enabled with isRedistributed");
public void warningsWithAllowDuplicatesEnabledAndCommitOffsets() {
int numElements = 1000;

PCollection<Long> input =
p.apply(
mkKafkaReadTransform(
numElements,
numElements,
new ValueAsTimestampFn(),
true, /*redistribute*/
true, /*allowDuplicates*/
0)
.commitOffsetsInFinalize()
.withConsumerConfigUpdates(
ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
.withoutMetadata())
.apply(Values.create());

addCountingAsserts(input, numElements);
p.run();

kafkaIOExpectedLogs.verifyWarn(
"Offsets committed due to usage of commitOffsetsInFinalize() and may not capture all work processed due to use of withRedistribute() with duplicates enabled");
}

@Test
public void noWarningsWithNoAllowDuplicatesAndCommitOffsets() {
int numElements = 1000;

PCollection<Long> input =
p.apply(
mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0)
mkKafkaReadTransform(
numElements,
numElements,
new ValueAsTimestampFn(),
true, /*redistribute*/
false, /*allowDuplicates*/
0)
.commitOffsetsInFinalize()
.withConsumerConfigUpdates(
ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
.withoutMetadata())
.apply(Values.create());

@@ -648,13 +694,25 @@ public void testCommitOffsetsInFinalizeAndRedistributeErrors() {

@Test
public void testNumKeysIgnoredWithRedistributeNotEnabled() {
thrown.expect(Exception.class);
thrown.expectMessage(
"withRedistributeNumKeys is ignored if withRedistribute() is not enabled on the transform");

int numElements = 1000;

PCollection<Long> input =
p.apply(
mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), false, 0)
mkKafkaReadTransform(
numElements,
numElements,
new ValueAsTimestampFn(),
false, /*redistribute*/
false, /*allowDuplicates*/
0)
.withRedistributeNumKeys(100)
.commitOffsetsInFinalize()
.withConsumerConfigUpdates(
ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
.withoutMetadata())
.apply(Values.create());

@@ -663,6 +721,32 @@ public void testNumKeysIgnoredWithRedistributeNotEnabled() {
p.run();
}

@Test
public void testDisableRedistributeKafkaOffsetLegacy() {
thrown.expect(Exception.class);
thrown.expectMessage(
"Can not enable isRedistribute() while committing offsets prior to 2.60.0");
p.getOptions().as(StreamingOptions.class).setUpdateCompatibilityVersion("2.59.0");

p.apply(
Create.of(
KafkaSourceDescriptor.of(
new TopicPartition("topic", 1),
null,
null,
null,
null,
ImmutableList.of("8.8.8.8:9092"))))
.apply(
KafkaIO.<Long, Long>readSourceDescriptors()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(LongDeserializer.class)
.withRedistribute()
.withProcessingTime()
.commitOffsets());
p.run();
}

@Test
public void testUnreachableKafkaBrokers() {
// Expect an exception when the Kafka brokers are not reachable on the workers.
@@ -1982,7 +2066,13 @@ public void testUnboundedSourceStartReadTime() {

PCollection<Long> input =
p.apply(
mkKafkaReadTransform(numElements, maxNumRecords, new ValueAsTimestampFn(), false, 0)
mkKafkaReadTransform(
numElements,
maxNumRecords,
new ValueAsTimestampFn(),
false, /*redistribute*/
false, /*allowDuplicates*/
0)
.withStartReadTime(new Instant(startTime))
.withoutMetadata())
.apply(Values.create());
@@ -2006,7 +2096,13 @@ public void testUnboundedSourceStartReadTimeException() {
int startTime = numElements / 20;

p.apply(
mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), false, 0)
mkKafkaReadTransform(
numElements,
numElements,
new ValueAsTimestampFn(),
false, /*redistribute*/
false, /*allowDuplicates*/
0)
.withStartReadTime(new Instant(startTime))
.withoutMetadata())
.apply(Values.create());
