From 6451e75f1e2b603d109a639510dc7f2512944b9d Mon Sep 17 00:00:00 2001
From: Naireen
Date: Tue, 13 Aug 2024 21:28:21 +0000
Subject: [PATCH] add background cache

---
 .../sdk/io/kafka/KafkaBacklogPollThread.java  | 126 +++++++++++
 .../io/kafka/KafkaBacklogPollThreadCache.java | 205 ++++++++++++++++++
 .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java  |  10 +-
 3 files changed, 336 insertions(+), 5 deletions(-)
 create mode 100644 sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaBacklogPollThread.java
 create mode 100644 sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaBacklogPollThreadCache.java

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaBacklogPollThread.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaBacklogPollThread.java
new file mode 100644
index 000000000000..084e1852bb76
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaBacklogPollThread.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.KafkaLatestOffsetEstimator;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaBacklogPollThread {
+
+  @VisibleForTesting
+  static final String METRIC_NAMESPACE = KafkaUnboundedReader.METRIC_NAMESPACE;
+
+  @VisibleForTesting
+  static final String RAW_SIZE_METRIC_PREFIX = KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX;
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaBacklogPollThread.class);
+
+  private static final Map<TopicPartition, Long> backlogMap = new HashMap<>();
+
+  private transient @Nullable Map<TopicPartition, KafkaLatestOffsetEstimator>
+      offsetEstimatorCache;
+
+  private @Nullable Consumer<byte[], byte[]> consumer;
+
+  private @Nullable Future<?> pollFuture;
+
+  KafkaBacklogPollThread() {
+    consumer = null;
+  }
+
+  void startOnExecutor(ExecutorService executorService, Consumer<byte[], byte[]> consumer) {
+    this.consumer = consumer;
+    // Use a separate thread to read from Kafka. The Kafka consumer does all of its work,
+    // including network I/O, inside poll(). Polling only inside #advance(), especially with a
+    // small timeout such as 100 milliseconds, does not work well. A dedicated thread together
+    // with a large receive buffer for the consumer achieved the best throughput in tests (see
+    // `defaultConsumerProperties`).
+    pollFuture = executorService.submit(this::backlogPollLoop);
+  }
+
+  void close() throws IOException {
+    Consumer<byte[], byte[]> consumer = this.consumer;
+    if (consumer == null) {
+      LOG.debug("Closing backlog poll thread that was never started.");
+      return;
+    }
+    // Cancel the poll task before closing the consumer it reads from.
+    Preconditions.checkStateNotNull(pollFuture).cancel(true);
+    consumer.close();
+  }
+
+  private static Supplier<Long> backlogQuery(
+      Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
+    return Suppliers.memoizeWithExpiration(
+        () -> {
+          synchronized (offsetConsumer) {
+            ConsumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
+            return offsetConsumer.position(topicPartition);
+          }
+        },
+        1,
+        TimeUnit.SECONDS);
+  }
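+  // Note on the memoization above: Suppliers.memoizeWithExpiration caps end-of-partition
+  // lookups at one per second per partition, so callers may invoke get() freely without
+  // flooding the broker. A minimal illustration (hypothetical caller, not part of this class):
+  //
+  //   Supplier<Long> backlog = backlogQuery(offsetConsumer, topicPartition);
+  //   long first = backlog.get();  // seeks to the end and reads the current position
+  //   long second = backlog.get(); // within 1s of `first`: served from the memoized value
+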
+  // TODO: evaluate whether a separate thread per partition is needed.
+  private void backlogPollLoop() {
+    // Offset estimators are cached per topic-partition because each holds a stateful
+    // connection; caching minimizes the number of connections that we open to and track with
+    // Kafka. Each estimator also memoizes its backlog, which makes the estimates reusable.
+    Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCache =
+        Preconditions.checkStateNotNull(this.offsetEstimatorCache);
+    for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
+        offsetEstimatorCache.entrySet()) {
+      TopicPartition topicPartition = tp.getKey();
+      KafkaLatestOffsetEstimator offsetEstimator = tp.getValue();
+
+      // Insert the partition into the backlog map, replacing the entry if the previous
+      // estimator has been closed.
+      if (backlogMap.get(topicPartition) == null || offsetEstimator.isClosed()) {
+        Supplier<Long> memoizedBacklog =
+            backlogQuery(Preconditions.checkStateNotNull(this.consumer), topicPartition);
+        long currentBacklog = memoizedBacklog.get();
+        LOG.debug("Backlog for {} is {}", topicPartition, currentBacklog);
+        backlogMap.put(topicPartition, currentBacklog);
+      }
+    }
+
+    // Update backlog metrics.
+    for (TopicPartition topicPartition : offsetEstimatorCache.keySet()) {
+      Long backlogValue = Preconditions.checkStateNotNull(backlogMap.get(topicPartition));
+      Gauge backlog =
+          Metrics.gauge(
+              METRIC_NAMESPACE,
+              RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + topicPartition.toString());
+      backlog.set(backlogValue);
+    }
+  }
+}
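+// Note: despite its name, backlogPollLoop runs a single pass per startOnExecutor()
+// submission, so a periodic refresh has to come from resubmitting the task. A hypothetical
+// variant (illustrative only; `backlogPollTask` is a placeholder Runnable exposing the same
+// pass to a scheduler) could look like:
+//
+//   ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+//   scheduler.scheduleAtFixedRate(backlogPollTask, 0, 1, TimeUnit.SECONDS);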
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaBacklogPollThreadCache.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaBacklogPollThreadCache.java
new file mode 100644
index 000000000000..5051fd6949af
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaBacklogPollThreadCache.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalCause;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaBacklogPollThreadCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaBacklogPollThreadCache.class);
+
+  private final ExecutorService invalidationExecutor =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("KafkaBacklogPollCache-invalidation-%d")
+              .build());
+
+  private final ExecutorService backgroundThreads =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("KafkaBacklogPollCache-poll-%d")
+              .build());
+
+  // Note on thread safety. This class is thread safe because:
+  // - The Guava Cache is thread safe.
+  // - There is no state other than the Cache.
+  // - The API is strictly a 1:1 wrapper over the Cache API (not counting cache.cleanUp()
+  //   calls), i.e. no method makes multiple Cache calls that could observe inconsistent state.
+  // If any of these conditions changes, please re-verify and test thread safety.
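+
+  // Intended lifecycle (a sketch; the caller wiring is not part of this file): a reader
+  // acquires a poll thread keyed by (consumer config, consumer factory, source descriptor),
+  // lets it publish backlog gauges in the background, and returns it to the cache when done:
+  //
+  //   KafkaBacklogPollThread thread =
+  //       cache.acquireConsumer(config, factoryFn, descriptor);
+  //   // ... read records while backlog metrics update ...
+  //   cache.releaseConsumer(config, factoryFn, descriptor, thread);
+  //
+  // Entries idle for longer than cacheDuration are evicted and closed asynchronously.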
+
+  private static class CacheKey {
+    final Map<String, Object> consumerConfig;
+    final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
+    final KafkaSourceDescriptor descriptor;
+
+    CacheKey(
+        Map<String, Object> consumerConfig,
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+        KafkaSourceDescriptor descriptor) {
+      this.consumerConfig = consumerConfig;
+      this.consumerFactoryFn = consumerFactoryFn;
+      this.descriptor = descriptor;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object other) {
+      if (other == null) {
+        return false;
+      }
+      if (!(other instanceof CacheKey)) {
+        return false;
+      }
+      CacheKey otherKey = (CacheKey) other;
+      return descriptor.equals(otherKey.descriptor)
+          && consumerFactoryFn.equals(otherKey.consumerFactoryFn)
+          && consumerConfig.equals(otherKey.consumerConfig);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(descriptor, consumerFactoryFn, consumerConfig);
+    }
+  }
+
+  private static class CacheEntry {
+
+    final KafkaBacklogPollThread pollThread;
+
+    CacheEntry(KafkaBacklogPollThread pollThread) {
+      this.pollThread = pollThread;
+    }
+  }
+
+  private final Duration cacheDuration = Duration.ofMinutes(1);
+  private final Cache<CacheKey, CacheEntry> cache;
+
+  @SuppressWarnings("method.invocation")
+  KafkaBacklogPollThreadCache() {
+    this.cache =
+        CacheBuilder.newBuilder()
+            .expireAfterWrite(cacheDuration.toMillis(), TimeUnit.MILLISECONDS)
+            .removalListener(
+                (RemovalNotification<CacheKey, CacheEntry> notification) -> {
+                  if (notification.getCause() != RemovalCause.EXPLICIT) {
+                    LOG.info(
+                        "Asynchronously closing backlog poll for {} as it has been idle for over {}",
+                        notification.getKey(),
+                        cacheDuration);
+                    asyncCloseConsumer(
+                        checkNotNull(notification.getKey()),
+                        checkNotNull(notification.getValue()));
+                  }
+                })
+            .build();
+  }
+
+  KafkaBacklogPollThread acquireConsumer(
+      Map<String, Object> consumerConfig,
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+      // The descriptor is needed to derive the updated consumer config.
+      KafkaSourceDescriptor kafkaSourceDescriptor) {
+    cache.cleanUp();
+
+    Map<String, Object> updatedConsumerConfig =
+        overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
+    LOG.info(
+        "Creating Kafka consumer for process continuation for {}",
+        kafkaSourceDescriptor.getTopicPartition());
+
+    TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
+    Consumer<byte[], byte[]> offsetConsumer =
+        consumerFactoryFn.apply(
+            KafkaIOUtils.getOffsetConsumerConfig(
+                "tracker-" + topicPartition, consumerConfig, updatedConsumerConfig));
+
+    // This creates whatever is needed to poll the backlog.
+    KafkaBacklogPollThread pollThread = new KafkaBacklogPollThread();
+    pollThread.startOnExecutor(backgroundThreads, offsetConsumer);
+    return pollThread;
+  }
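+
+  // The offset consumer built above is separate from the data-reading consumer: it exists
+  // only to query end offsets, so it gets its own "tracker-<partition>" identity via
+  // KafkaIOUtils.getOffsetConsumerConfig rather than sharing the reader's group. A minimal
+  // sketch (an assumption, not part of this change) of a factory passed as consumerFactoryFn:
+  //
+  //   SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> factoryFn =
+  //       config -> new KafkaConsumer<>(config); // plain byte-array consumer from the config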
+
+  /** Closes the poll thread asynchronously and logs a warning if the close fails. */
+  private void asyncCloseConsumer(CacheKey key, CacheEntry entry) {
+    invalidationExecutor.execute(
+        () -> {
+          try {
+            entry.pollThread.close();
+            LOG.info("Finished closing consumer for {}", key);
+          } catch (IOException e) {
+            LOG.warn("Failed to close consumer for {}", key, e);
+          }
+        });
+  }
+
+  void releaseConsumer(
+      Map<String, Object> consumerConfig,
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+      KafkaSourceDescriptor kafkaSourceDescriptor,
+      KafkaBacklogPollThread pollThread) {
+    CacheKey key = new CacheKey(consumerConfig, consumerFactoryFn, kafkaSourceDescriptor);
+    CacheEntry existing = cache.asMap().putIfAbsent(key, new CacheEntry(pollThread));
+    if (existing != null) {
+      LOG.warn(
+          "Topic {} and partition {} combination already exists in cache",
+          kafkaSourceDescriptor.getTopic(),
+          kafkaSourceDescriptor.getPartition());
+      asyncCloseConsumer(key, existing);
+    }
+    cache.cleanUp();
+  }
+
+  private Map<String, Object> overrideBootstrapServersConfig(
+      Map<String, Object> currentConfig, KafkaSourceDescriptor description) {
+    checkState(
+        currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
+            || description.getBootStrapServers() != null);
+    Map<String, Object> config = new HashMap<>(currentConfig);
+    if (description.getBootStrapServers() != null
+        && !description.getBootStrapServers().isEmpty()) {
+      config.put(
+          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+          String.join(",", description.getBootStrapServers()));
+    }
+    return config;
+  }
+}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index acec0aa6198a..2f9ee646fb05 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -154,8 +154,8 @@ abstract class ReadFromKafkaDoFn<K, V>
 
   // private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 1;
 
-  private transient ScheduledExecutorService backlogFetcherThread =
-      Executors.newSingleThreadScheduledExecutor();
+  // private transient ScheduledExecutorService backlogFetcherThread =
+  //     Executors.newSingleThreadScheduledExecutor();
   private Map<String, Object> updatedConsumerConfig;
 
   static <K, V> ReadFromKafkaDoFn<K, V> create(
@@ -203,7 +203,7 @@ private ReadFromKafkaDoFn(
     this.checkStopReadingFn = transform.getCheckStopReadingFn();
     this.badRecordRouter = transform.getBadRecordRouter();
     this.recordTag = recordTag;
-    this.backlogFetcherThread = Executors.newSingleThreadScheduledExecutor();
+    // this.backlogFetcherThread = Executors.newSingleThreadScheduledExecutor();
     if (transform.getConsumerPollingTimeout() > 0) {
       this.consumerPollingTimeout = transform.getConsumerPollingTimeout();
     } else {
@@ -320,7 +320,7 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource
     } else if (startReadTime != null) {
       startOffset = ConsumerSpEL.offsetForTime(offsetConsumer, topicPartition, startReadTime);
     } else {
-      startOffset = offsetConsumer.position(partition);
+      startOffset = offsetConsumer.position(topicPartition);
     }
 
     long endOffset = Long.MAX_VALUE;
@@ -704,7 +704,7 @@ public void teardown() throws Exception {
       LOG.warn("Fail to close resource during finishing bundle.", anyException);
     }
 
-    backlogFetcherThread.shutdown();
+    // backlogFetcherThread.shutdown();
 
     if (offsetEstimatorCache != null) {
       offsetEstimatorCache.clear();
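
A note on the intended wiring (a sketch under assumptions; the hunks above comment out the old per-DoFn backlog fetcher but do not yet show the call sites for the new cache): the DoFn would hold a shared cache instance and bracket each work item with acquire/release, for example:

    // Hypothetical call sites inside ReadFromKafkaDoFn; all names other than the
    // cache API introduced by this patch are placeholders.
    private static final KafkaBacklogPollThreadCache BACKLOG_CACHE =
        new KafkaBacklogPollThreadCache();

    KafkaBacklogPollThread pollThread =
        BACKLOG_CACHE.acquireConsumer(
            updatedConsumerConfig, consumerFactoryFn, kafkaSourceDescriptor);
    try {
      // ... emit records while the poll thread publishes backlogBytes_<partition> gauges ...
    } finally {
      BACKLOG_CACHE.releaseConsumer(
          updatedConsumerConfig, consumerFactoryFn, kafkaSourceDescriptor, pollThread);
    }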