Poll for backlog in background thread instead of inline #31697

Draft · wants to merge 2 commits into master
KafkaBacklogPollThread.java
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.KafkaLatestOffsetEstimator;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBacklogPollThread {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaBacklogPollThread.class);

  @VisibleForTesting static final String METRIC_NAMESPACE = KafkaUnboundedReader.METRIC_NAMESPACE;

  @VisibleForTesting
  static final String RAW_SIZE_METRIC_PREFIX = KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX;

  private transient @Nullable Map<TopicPartition, KafkaLatestOffsetEstimator>
      offsetEstimatorCache;

  private @Nullable Consumer<byte[], byte[]> consumer;

  private @Nullable Future<?> pollFuture;

  // Shared across poll threads running on the executor, so use a concurrent map.
  private static final Map<TopicPartition, Long> backlogMap = new ConcurrentHashMap<>();

  KafkaBacklogPollThread() {
    consumer = null;
  }

  void startOnExecutor(ExecutorService executorService, Consumer<byte[], byte[]> consumer) {
    this.consumer = consumer;
    // Use a separate thread to query the backlog. The Kafka consumer does all of its work,
    // including network I/O, inside poll()/position(), so issuing these calls inline with
    // element processing (especially with a small timeout like 100 milliseconds) does not
    // work well. Running the queries on a background thread keeps the processing path free
    // of blocking consumer calls.
    pollFuture = executorService.submit(this::backlogPollLoop);
  }

  void close() throws IOException {
    if (consumer == null) {
      LOG.debug("Closing backlog poll thread that was never started.");
      return;
    }
    // Stop the background poll before releasing the consumer.
    Preconditions.checkStateNotNull(pollFuture).cancel(true);
    consumer.close();
  }

  // Returns a supplier for the partition's end offset, memoized with a one-second expiry so
  // that repeated reads do not hammer the broker with seek/position requests.
  private static Supplier<Long> backlogQuery(
      Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
    return Suppliers.memoizeWithExpiration(
        () -> {
          synchronized (offsetConsumer) {
            ConsumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
            return offsetConsumer.position(topicPartition);
          }
        },
        1,
        TimeUnit.SECONDS);
  }

  // TODO: should there be a separate thread per partition?
  private void backlogPollLoop() {
    // OffsetEstimators are cached for each topic-partition because they hold a stateful
    // connection, so we want to minimize the number of connections that we open and track
    // against Kafka. Each estimator also memoizes its backlog, which makes the estimations
    // reusable across calls.
    Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCache =
        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
    for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
        offsetEstimatorCache.entrySet()) {
      TopicPartition topicPartition = tp.getKey();
      KafkaLatestOffsetEstimator offsetEstimator = tp.getValue();

      // Insert into the backlog map, and refresh the entry if the previous estimator is
      // now closed.
      if (backlogMap.get(topicPartition) == null || offsetEstimator.isClosed()) {
        Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
        Supplier<Long> memoizedBacklog = backlogQuery(consumer, topicPartition);
        Long backlog = memoizedBacklog.get();
        LOG.debug("Backlog for {} is {}", topicPartition, backlog);
        backlogMap.put(topicPartition, backlog);
      }
    }

    // Update backlog metrics.
    for (TopicPartition topicPartition : offsetEstimatorCache.keySet()) {
      long backlogValue = Preconditions.checkStateNotNull(backlogMap.get(topicPartition));
      Gauge backlog =
          Metrics.gauge(
              METRIC_NAMESPACE,
              RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + topicPartition.toString());
      backlog.set(backlogValue);
    }
  }
}
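
Reviewer note: `backlogQuery` relies on Guava's `Suppliers.memoizeWithExpiration`, so any `get()` calls landing inside the one-second window reuse the cached end offset rather than issuing another seek/position round trip to the broker. A minimal standalone sketch of that pattern; the counter and fixed offset are illustrative stand-ins, not part of this PR:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;

public class MemoizedBacklogDemo {
  public static void main(String[] args) {
    AtomicLong brokerCalls = new AtomicLong();
    // Stand-in for the synchronized seek2End + position() query above.
    Supplier<Long> memoized =
        Suppliers.memoizeWithExpiration(
            () -> {
              brokerCalls.incrementAndGet();
              return 42L; // pretend end offset
            },
            1,
            TimeUnit.SECONDS);

    memoized.get();
    memoized.get(); // served from the cached value
    // Prints 1, assuming both calls land inside the one-second window.
    System.out.println(brokerCalls.get());
  }
}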
KafkaBacklogPollThreadCache.java
@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalCause;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBacklogPollThreadCache {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaBacklogPollThreadCache.class);

  private final ExecutorService invalidationExecutor =
      Executors.newCachedThreadPool(
          new ThreadFactoryBuilder()
              .setDaemon(true)
              .setNameFormat("KafkaBacklogPollCache-invalidation-%d")
              .build());

  private final ExecutorService backgroundThreads =
      Executors.newCachedThreadPool(
          new ThreadFactoryBuilder()
              .setDaemon(true)
              .setNameFormat("KafkaBacklogPollCache-poll-%d")
              .build());

  // Note on thread safety: this class is thread safe because
  // - the Guava Cache is thread safe,
  // - there is no state other than the Cache,
  // - the API is strictly a 1:1 wrapper over the Cache API (not counting cache.cleanUp()
  //   calls), i.e. no method issues more than one Cache call, which could otherwise be
  //   inconsistent.
  // If any of these conditions changes, please ensure and test thread safety.

  private static class CacheKey {
    final Map<String, Object> consumerConfig;
    final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
    final KafkaSourceDescriptor descriptor;

    CacheKey(
        Map<String, Object> consumerConfig,
        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
        KafkaSourceDescriptor descriptor) {
      this.consumerConfig = consumerConfig;
      this.consumerFactoryFn = consumerFactoryFn;
      this.descriptor = descriptor;
    }

    @Override
    public boolean equals(@Nullable Object other) {
      if (!(other instanceof CacheKey)) {
        return false;
      }
      CacheKey otherKey = (CacheKey) other;
      return descriptor.equals(otherKey.descriptor)
          && consumerFactoryFn.equals(otherKey.consumerFactoryFn)
          && consumerConfig.equals(otherKey.consumerConfig);
    }

    @Override
    public int hashCode() {
      return Objects.hash(descriptor, consumerFactoryFn, consumerConfig);
    }
  }

  private static class CacheEntry {

    final KafkaBacklogPollThread pollThread;

    CacheEntry(KafkaBacklogPollThread pollThread) {
      this.pollThread = pollThread;
    }
  }

  private final Duration cacheDuration = Duration.ofMinutes(1);
  private final Cache<CacheKey, CacheEntry> cache;

  @SuppressWarnings("method.invocation")
  KafkaBacklogPollThreadCache() {
    this.cache =
        CacheBuilder.newBuilder()
            .expireAfterWrite(cacheDuration.toMillis(), TimeUnit.MILLISECONDS)
            .removalListener(
                (RemovalNotification<CacheKey, CacheEntry> notification) -> {
                  if (notification.getCause() != RemovalCause.EXPLICIT) {
                    LOG.info(
                        "Asynchronously closing backlog poll for {} as it has been idle for over {}",
                        notification.getKey(),
                        cacheDuration);
                    asyncCloseConsumer(
                        checkNotNull(notification.getKey()),
                        checkNotNull(notification.getValue()));
                  }
                })
            .build();
  }

  KafkaBacklogPollThread acquireConsumer(
      Map<String, Object> consumerConfig,
      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
      KafkaSourceDescriptor kafkaSourceDescriptor // needed to get the updated consumer config
      ) {
    cache.cleanUp();

    Map<String, Object> updatedConsumerConfig =
        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
    LOG.info(
        "Creating Kafka consumer for backlog polling for {}",
        kafkaSourceDescriptor.getTopicPartition());

    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
    Consumer<byte[], byte[]> offsetConsumer =
        consumerFactoryFn.apply(
            KafkaIOUtils.getOffsetConsumerConfig(
                "tracker-" + topicPartition, consumerConfig, updatedConsumerConfig));

    // Create whatever is needed to poll the backlog and start it on a background thread.
    KafkaBacklogPollThread pollThread = new KafkaBacklogPollThread();
    pollThread.startOnExecutor(backgroundThreads, offsetConsumer);
    return pollThread;
  }

  /** Closes the poll thread's consumer asynchronously and logs a warning if close fails. */
  private void asyncCloseConsumer(CacheKey key, CacheEntry entry) {
    invalidationExecutor.execute(
        () -> {
          try {
            entry.pollThread.close();
            LOG.info("Finished closing consumer for {}", key);
          } catch (IOException e) {
            LOG.warn("Failed to close consumer for {}", key, e);
          }
        });
  }

  void releaseConsumer(
      Map<String, Object> consumerConfig,
      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
      KafkaSourceDescriptor kafkaSourceDescriptor,
      KafkaBacklogPollThread pollThread) {
    CacheKey key = new CacheKey(consumerConfig, consumerFactoryFn, kafkaSourceDescriptor);
    CacheEntry existing = cache.asMap().putIfAbsent(key, new CacheEntry(pollThread));
    if (existing != null) {
      // putIfAbsent returned a previous entry, so this topic/partition combination was
      // already cached; close the stale entry.
      LOG.warn(
          "Topic {} and partition {} combination already exists in cache; closing previous poll thread",
          kafkaSourceDescriptor.getTopic(),
          kafkaSourceDescriptor.getPartition());
      asyncCloseConsumer(key, existing);
    }
    cache.cleanUp();
  }

  private Map<String, Object> overrideBootstrapServersConfig(
      Map<String, Object> currentConfig, KafkaSourceDescriptor description) {
    checkState(
        currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
            || description.getBootStrapServers() != null);
    Map<String, Object> config = new HashMap<>(currentConfig);
    if (description.getBootStrapServers() != null
        && !description.getBootStrapServers().isEmpty()) {
      config.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          String.join(",", description.getBootStrapServers()));
    }
    return config;
  }
}
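
Reviewer note: a hypothetical call site, sketching the acquire/release lifecycle this cache appears to expose; none of this code is in the PR. Since `acquireConsumer` and `releaseConsumer` are package-private, the sketch assumes it lives in `org.apache.beam.sdk.io.kafka`:

package org.apache.beam.sdk.io.kafka;

import java.util.Map;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;

class BacklogPollLifecycleSketch {
  void run(
      KafkaBacklogPollThreadCache cache,
      Map<String, Object> consumerConfig,
      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> factoryFn,
      KafkaSourceDescriptor descriptor) {
    // Start a background poll thread for this source descriptor.
    KafkaBacklogPollThread pollThread =
        cache.acquireConsumer(consumerConfig, factoryFn, descriptor);

    // ... process elements while the backlog gauge is refreshed in the background ...

    // Hand the thread back: it is cached under (config, factory, descriptor) and closed
    // asynchronously if it then sits idle past the one-minute expiry.
    cache.releaseConsumer(consumerConfig, factoryFn, descriptor, pollThread);
  }
}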