
Commit

add background cache
Naireen committed Sep 19, 2024
1 parent eacc509 commit 6451e75
Showing 3 changed files with 336 additions and 5 deletions.
KafkaBacklogPollThread.java
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.KafkaLatestOffsetEstimator;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

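/**
 * Polls Kafka on a background thread for the latest end offsets of the topic partitions tracked
 * in the offset estimator cache, and publishes the observed values through per-partition gauge
 * metrics.
 */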
public class KafkaBacklogPollThread {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaBacklogPollThread.class);

  @VisibleForTesting static final String METRIC_NAMESPACE = KafkaUnboundedReader.METRIC_NAMESPACE;

  @VisibleForTesting
  static final String RAW_SIZE_METRIC_PREFIX = KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX;

  // Latest backlog observed for each partition; static, so it is shared by every poll thread in
  // the process.
  private static final Map<TopicPartition, Long> backlogMap = new HashMap<>();

  private transient @Nullable Map<TopicPartition, KafkaLatestOffsetEstimator>
      offsetEstimatorCache;

  private @Nullable Consumer<byte[], byte[]> consumer;

  private @Nullable Future<?> pollFuture;

  KafkaBacklogPollThread() {
    consumer = null;
  }

  void startOnExecutor(ExecutorService executorService, Consumer<byte[], byte[]> consumer) {
    this.consumer = consumer;
    // Use a separate thread to query Kafka. The consumer does all of its work, including
    // network I/O, inside poll(); polling only inside #advance(), especially with a small
    // timeout like 100 milliseconds, does not work well. This, along with a large receive
    // buffer for the consumer, achieved the best throughput in tests (see
    // `defaultConsumerProperties`).
    pollFuture = executorService.submit(this::backlogPollLoop);
  }

  void close() throws IOException {
    if (consumer == null) {
      LOG.debug("Closing backlog poll thread that was never started.");
      return;
    }
    // Stop the poll task before releasing the consumer it uses.
    Preconditions.checkStateNotNull(pollFuture).cancel(true);
    consumer.close();
  }

  private static Supplier<Long> backlogQuery(
      Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
    // Re-query the end offset at most once per second; callers in between get the cached value.
    return Suppliers.memoizeWithExpiration(
        () -> {
          synchronized (offsetConsumer) {
            ConsumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
            return offsetConsumer.position(topicPartition);
          }
        },
        1,
        TimeUnit.SECONDS);
  }

  // TODO: should there be a separate poll thread per partition?
  private void backlogPollLoop() {
    // OffsetEstimators are cached for each topic partition because they hold a stateful
    // connection, so we want to minimize the number of connections that we open and track with
    // Kafka. They also memoize the backlog query, which makes the estimates reusable.
    Preconditions.checkStateNotNull(this.offsetEstimatorCache);
    for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
        offsetEstimatorCache.entrySet()) {
      TopicPartition topicPartition = tp.getKey();
      KafkaLatestOffsetEstimator offsetEstimator = tp.getValue();

      // Insert into the backlog map, and refresh the entry if the previous estimator is closed.
      if (backlogMap.get(topicPartition) == null || offsetEstimator.isClosed()) {
        Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
        java.util.function.Supplier<Long> memoizedBacklog =
            backlogQuery(consumer, topicPartition);
        Long backlog = memoizedBacklog.get();
        LOG.debug("Latest backlog for {} is {}", topicPartition, backlog);
        backlogMap.put(topicPartition, backlog);
      }
    }

    // Update backlog metrics.
    Preconditions.checkStateNotNull(this.offsetEstimatorCache);
    for (TopicPartition topicPartition : offsetEstimatorCache.keySet()) {
      Long backlogValue = Preconditions.checkStateNotNull(backlogMap.get(topicPartition));
      Gauge backlog =
          Metrics.gauge(
              METRIC_NAMESPACE,
              RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + topicPartition.toString());
      backlog.set(backlogValue);
    }
  }
}
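
The backlogQuery helper above relies on Guava's Suppliers.memoizeWithExpiration, so the end-offset lookup reaches Kafka at most once per second no matter how often the supplier is read. A minimal runnable sketch of that pattern, using plain Guava rather than Beam's vendored copy (the class name and the 42L offset are illustrative only):

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class MemoizedBacklogSketch {
  public static void main(String[] args) {
    AtomicLong queriesIssued = new AtomicLong();
    Supplier<Long> memoized =
        Suppliers.memoizeWithExpiration(
            () -> {
              // Stand-in for seeking the offset consumer to the end of the partition.
              queriesIssued.incrementAndGet();
              return 42L;
            },
            1,
            TimeUnit.SECONDS);
    memoized.get();
    memoized.get(); // Served from the cache; the underlying query ran only once.
    System.out.println("queries issued: " + queriesIssued.get()); // prints 1
  }
}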
KafkaBacklogPollThreadCache.java
@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalCause;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

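/**
 * Caches one background {@link KafkaBacklogPollThread} per (consumer config, consumer factory,
 * source descriptor) key, asynchronously closing poll threads that have been idle for longer
 * than the cache duration.
 */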
public class KafkaBacklogPollThreadCache {

private static final Logger LOG = LoggerFactory.getLogger(KafkaBacklogPollThreadCache.class);
private final ExecutorService invalidationExecutor =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("KafkaBacklogPollCache-invalidation-%d")
.build());
private final ExecutorService backgroundThreads =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("KafkaBacklogPollCache-poll-%d")
.build());

  // Note on thread safety: this class is thread safe because
  // - Guava Cache is thread safe,
  // - there is no state other than the Cache, and
  // - the API is strictly a 1:1 wrapper over the Cache API (not counting cache.cleanUp() calls),
  //   i.e. no method makes multiple Cache calls that could leave it in an inconsistent state.
  // If any of these conditions changes, please re-verify and test thread safety.

private static class CacheKey {
final Map<String, Object> consumerConfig;
final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
final KafkaSourceDescriptor descriptor;

CacheKey(
Map<String, Object> consumerConfig,
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
KafkaSourceDescriptor descriptor) {
this.consumerConfig = consumerConfig;
this.consumerFactoryFn = consumerFactoryFn;
this.descriptor = descriptor;
}

    @Override
    public boolean equals(@Nullable Object other) {
      if (!(other instanceof CacheKey)) {
        return false;
      }
      CacheKey otherKey = (CacheKey) other;
      return descriptor.equals(otherKey.descriptor)
          && consumerFactoryFn.equals(otherKey.consumerFactoryFn)
          && consumerConfig.equals(otherKey.consumerConfig);
    }

@Override
public int hashCode() {
return Objects.hash(descriptor, consumerFactoryFn, consumerConfig);
}
}

private static class CacheEntry {

final KafkaBacklogPollThread pollThread;

CacheEntry(KafkaBacklogPollThread pollThread) {
this.pollThread = pollThread;
}
}

private final Duration cacheDuration = Duration.ofMinutes(1);
private final Cache<CacheKey, CacheEntry> cache;

@SuppressWarnings("method.invocation")
KafkaBacklogPollThreadCache() {
this.cache =
CacheBuilder.newBuilder()
.expireAfterWrite(cacheDuration.toMillis(), TimeUnit.MILLISECONDS)
.removalListener(
(RemovalNotification<CacheKey, CacheEntry> notification) -> {
if (notification.getCause() != RemovalCause.EXPLICIT) {
LOG.info(
"Asynchronously closing backlog poll for {} as it has been idle for over {}",
notification.getKey(),
cacheDuration);
asyncCloseConsumer(
checkNotNull(notification.getKey()),
checkNotNull(notification.getValue()));
}
})
.build();
}

KafkaBacklogPollThread acquireConsumer(
Map<String, Object> consumerConfig,
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
KafkaSourceDescriptor kafkaSourceDescriptor // needed to get updated consumer config
) {
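    // A new offset consumer and poll thread are built on every acquire; releaseConsumer(...)
    // hands the thread back to the cache when the caller is done with it.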
cache.cleanUp();

Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
    LOG.info(
        "Creating Kafka consumer for backlog polling for {}",
        kafkaSourceDescriptor.getTopicPartition());

TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
Consumer<byte[], byte[]> offsetConsumer =
consumerFactoryFn.apply(
KafkaIOUtils.getOffsetConsumerConfig(
"tracker-" + topicPartition, consumerConfig, updatedConsumerConfig));

// This should create whatever is needed to poll the backlog
KafkaBacklogPollThread pollThread = new KafkaBacklogPollThread();
pollThread.startOnExecutor(backgroundThreads, offsetConsumer);
return pollThread;
}

  /** Close the poll thread asynchronously and log a warning if the close fails. */
private void asyncCloseConsumer(CacheKey key, CacheEntry entry) {
invalidationExecutor.execute(
() -> {
try {
entry.pollThread.close();
LOG.info("Finished closing consumer for {}", key);
} catch (IOException e) {
LOG.warn("Failed to close consumer for {}", key, e);
}
});
}

  void releaseConsumer(
      Map<String, Object> consumerConfig,
      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
      KafkaSourceDescriptor kafkaSourceDescriptor,
      KafkaBacklogPollThread pollThread) {
    CacheKey key = new CacheKey(consumerConfig, consumerFactoryFn, kafkaSourceDescriptor);
    // Cache the released thread; if an entry was already cached for this key, replace it and
    // close the displaced thread asynchronously so that it does not leak.
    CacheEntry existing = cache.asMap().put(key, new CacheEntry(pollThread));
    if (existing != null) {
      LOG.warn(
          "Topic {} and partition {} already had a cached poll thread; closing the old one.",
          kafkaSourceDescriptor.getTopic(),
          kafkaSourceDescriptor.getPartition());
      asyncCloseConsumer(key, existing);
    }
    cache.cleanUp();
  }

private Map<String, Object> overrideBootstrapServersConfig(
Map<String, Object> currentConfig, KafkaSourceDescriptor description) {
checkState(
currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
|| description.getBootStrapServers() != null);
Map<String, Object> config = new HashMap<>(currentConfig);
if (description.getBootStrapServers() != null &&
!description.getBootStrapServers().isEmpty()) {
config.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
String.join(",", description.getBootStrapServers()));
}
return config;
}
}
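
The cache above pairs expireAfterWrite with a removal listener so that an idle poll thread is closed off the caller's path by the invalidation executor. A self-contained sketch of the same pattern, again with plain Guava rather than the vendored copy (the String key and AutoCloseable value are stand-ins for the CacheKey and CacheEntry types):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExpiringResourceCacheSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService invalidationExecutor = Executors.newCachedThreadPool();
    Cache<String, AutoCloseable> cache =
        CacheBuilder.newBuilder()
            .expireAfterWrite(1, TimeUnit.SECONDS)
            .removalListener(
                (RemovalNotification<String, AutoCloseable> notification) -> {
                  // EXPLICIT removals are caller-driven; everything else (expiry,
                  // replacement) releases the resource asynchronously.
                  if (notification.getCause() != RemovalCause.EXPLICIT) {
                    invalidationExecutor.execute(
                        () -> {
                          try {
                            notification.getValue().close();
                          } catch (Exception e) {
                            System.err.println("Failed to close " + notification.getKey());
                          }
                        });
                  }
                })
            .build();

    cache.put("topic-0", () -> System.out.println("closed resource for topic-0"));
    Thread.sleep(1500);
    cache.cleanUp(); // Guava evicts lazily; cleanUp() forces the expiry check.
    invalidationExecutor.shutdown();
    invalidationExecutor.awaitTermination(5, TimeUnit.SECONDS);
  }
}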
ReadFromKafkaDoFn.java
@@ -154,8 +154,8 @@ abstract class ReadFromKafkaDoFn<K, V>
 
   // private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 1;
 
-  private transient ScheduledExecutorService backlogFetcherThread =
-      Executors.newSingleThreadScheduledExecutor();
+  // private transient ScheduledExecutorService backlogFetcherThread =
+  //     Executors.newSingleThreadScheduledExecutor();
   private Map<String, Object> updatedConsumerConfig;
 
   static <K, V> ReadFromKafkaDoFn<K, V> create(
@@ -203,7 +203,7 @@ private ReadFromKafkaDoFn(
     this.checkStopReadingFn = transform.getCheckStopReadingFn();
     this.badRecordRouter = transform.getBadRecordRouter();
     this.recordTag = recordTag;
-    this.backlogFetcherThread = Executors.newSingleThreadScheduledExecutor();
+    // this.backlogFetcherThread = Executors.newSingleThreadScheduledExecutor();
     if (transform.getConsumerPollingTimeout() > 0) {
       this.consumerPollingTimeout = transform.getConsumerPollingTimeout();
     } else {
@@ -320,7 +320,7 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource
     } else if (startReadTime != null) {
       startOffset = ConsumerSpEL.offsetForTime(offsetConsumer, topicPartition, startReadTime);
     } else {
-      startOffset = offsetConsumer.position(partition);
+      startOffset = offsetConsumer.position(topicPartition);
     }
 
     long endOffset = Long.MAX_VALUE;
@@ -704,7 +704,7 @@ public void teardown() throws Exception {
       LOG.warn("Fail to close resource during finishing bundle.", anyException);
     }
 
-    backlogFetcherThread.shutdown();
+    // backlogFetcherThread.shutdown();
 
     if (offsetEstimatorCache != null) {
       offsetEstimatorCache.clear();
