
Commit

address comments
hanghangliu committed Oct 19, 2023
1 parent e289df3 commit da2ce02
Showing 4 changed files with 176 additions and 140 deletions.
@@ -157,4 +157,17 @@ protected String canonicalMetricName(String metricGroup, Collection<String> metr
return processedName;
}

/**
* Get a list of all Kafka topics.
*/
public abstract List<KafkaTopic> getTopics();

/**
* Get a list of {@link KafkaTopic} with the provided topic names.
* The default implementation ignores the provided names and simply lists all topics.
* Subclasses can override this method with a more efficient implementation.
*/
public Collection<KafkaTopic> getTopics(Collection<String> topics) {
return getTopics();
}
}
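
For illustration, here is a minimal sketch of the kind of override the new javadoc invites, as it might appear in a concrete subclass. `lookupTopic` is a hypothetical helper (returning `java.util.Optional<KafkaTopic>`), not part of this change:

```java
// Sketch of a more efficient override in a hypothetical concrete subclass.
// lookupTopic(name) is an assumed helper that fetches metadata for one topic.
@Override
public Collection<KafkaTopic> getTopics(Collection<String> topics) {
  List<KafkaTopic> result = new ArrayList<>();
  for (String name : topics) {
    // Resolve each requested topic directly instead of listing the whole cluster.
    lookupTopic(name).ifPresent(result::add);
  }
  return result;
}
```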
@@ -28,7 +28,6 @@
import com.google.common.collect.Maps;
import com.typesafe.config.Config;

import java.util.stream.Collectors;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
@@ -170,22 +169,6 @@ default long committed(KafkaPartition partition) {

public default void assignAndSeek(List<KafkaPartition> topicPartitions, Map<KafkaPartition, LongWatermark> topicWatermarksMap) { return; }

/**
* Get a list of all Kafka topics.
*/
public default List<KafkaTopic> getTopics() { return Collections.emptyList(); }

/**
* Get a list of {@link KafkaTopic} with the provided topic names.
* The default implementation first retrieves all the topics, then filters them by the provided names.
* Implementations of this interface can override this method with a more efficient lookup.
*/
public default Collection<KafkaTopic> getTopics(Collection<String> topics) {
return getTopics().stream()
.filter(topic -> topics.contains(topic.getName()))
.collect(Collectors.toList());
}

/**
* A factory to create {@link GobblinKafkaConsumerClient}s
*/
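For context, a factory only needs to build a client from the job's Config, matching the `kafkaConsumerClientFactory.create(config)` call in KafkaSource below. A minimal sketch, assuming a hypothetical client class `MyKafkaConsumerClient`:

```java
// Minimal factory sketch; MyKafkaConsumerClient is a hypothetical implementation
// of GobblinKafkaConsumerClient, and the nested factory interface name is assumed.
public class MyKafkaConsumerClientFactory
    implements GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory {

  @Override
  public GobblinKafkaConsumerClient create(Config config) {
    return new MyKafkaConsumerClient(config);
  }
}
```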
@@ -19,6 +19,9 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,8 +32,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -192,10 +195,27 @@ private void setLimiterReportKeyListToWorkUnits(List<WorkUnit> workUnits, List<S

@Override
public List<WorkUnit> getWorkunits(SourceState state) {
return this.getWorkunitsForFilteredPartitions(state, Optional.absent(), Optional.absent());
}

/**
* Compute WorkUnits for Kafka topics. If filteredTopicPartition is present, only the provided
* topics and their filtered partitions are computed; otherwise, the state is used to discover
* Kafka topics and all available partitions.
*
* @param filteredTopicPartition optional map restricting computation to the given topic-partitions.
* @param minContainer optional minimum container count. Note that how it is used is determined by
* the concrete {@link KafkaWorkUnitPacker} implementation.
*
* TODO: Utilize the minContainer in {@link KafkaTopicGroupingWorkUnitPacker#pack(Map, int)}, as the
* numContainers variable is currently unused.
*/
public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state,
Optional<Map<String, List<Integer>>> filteredTopicPartition, Optional<Integer> minContainer) {
this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class);
this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());

Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();
Map<String, List<Integer>> filteredTopicPartitionMap = filteredTopicPartition.or(new HashMap<>());
Map<String, List<WorkUnit>> kafkaTopicWorkunitMap = Maps.newConcurrentMap();
if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
String tableTypeStr =
state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, KafkaSource.DEFAULT_TABLE_TYPE.toString());
@@ -215,18 +235,22 @@ public List<WorkUnit> getWorkunits(SourceState state) {
try {
Config config = ConfigUtils.propertiesToConfig(state.getProperties());
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
.resolveClass(
state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();
.resolveClass(
state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();

this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));

List<KafkaTopic> topics = getFilteredTopics(state);
Collection<KafkaTopic> topics;
if (filteredTopicPartition.isPresent()) {
// If filteredTopicPartition is present, compile its topic names into whitelist patterns while leaving the blacklist empty
topics = this.kafkaConsumerClient.get().getFilteredTopics(Collections.emptyList(),
filteredTopicPartitionMap.keySet().stream().map(Pattern::compile).collect(Collectors.toList()));
} else {
topics = getFilteredTopics(state);
}
this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());

for (String topic : this.topicsToProcess) {
LOG.info("Discovered topic " + topic);
}
Map<String, State> topicSpecificStateMap =
DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function<KafkaTopic, String>() {

Expand All @@ -236,20 +260,13 @@ public String apply(KafkaTopic topic) {
}
}), state);

for (KafkaTopic topic : topics) {
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
}
}

int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));

if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT,
ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
} else {
// preallocate one client per thread
@@ -259,115 +276,41 @@ public String apply(KafkaTopic topic) {
Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();

for (KafkaTopic topic : topics) {
threadPool.submit(
new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
workUnits));
}

ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
LOG.info(String.format("Created workunits for %d topics in %d seconds", workUnits.size(),
createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));

// Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
// but aren't processed).
createEmptyWorkUnitsForSkippedPartitions(workUnits, topicSpecificStateMap, state);

KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
int numOfMultiWorkunits = 0;
if (!(kafkaWorkUnitPacker instanceof KafkaTopicGroupingWorkUnitPacker)) {
numOfMultiWorkunits = calculateNumMappersForPacker(state, kafkaWorkUnitPacker, workUnits);
}
addTopicSpecificPropsToWorkUnits(workUnits, topicSpecificStateMap);
List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(workUnits, numOfMultiWorkunits);
setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
return workUnitList;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new RuntimeException("Checked exception caught", e);
} catch (Throwable t) {
throw new RuntimeException("Unexpected throwable caught, ", t);
} finally {
try {
GobblinKafkaConsumerClient consumerClient = this.kafkaConsumerClient.get();
if (consumerClient != null) {
consumerClient.close();
}
// cleanup clients from pool
for (GobblinKafkaConsumerClient client: kafkaConsumerClientPool) {
client.close();
}
} catch (Throwable t) {
// Swallow any exceptions in the finally block to allow potential exceptions from the main try block
// to be propagated
LOG.error("Exception encountered closing GobblinKafkaConsumerClient", t);
}
}
}

public List<WorkUnit> getWorkunitsForFilteredPartitions(SourceState state, Map<String, List<String>> filteredTopicPartitionMap, int minContainer) {
Map<String, List<WorkUnit>> kafkaTopicWorkunitMap = Maps.newConcurrentMap();
Collection<KafkaTopic> topics;
try {
Config config = ConfigUtils.propertiesToConfig(state.getProperties());
GobblinKafkaConsumerClientFactory kafkaConsumerClientFactory = kafkaConsumerClientResolver
.resolveClass(
state.getProp(GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
DEFAULT_GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS)).newInstance();

this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));
topics = this.kafkaConsumerClient.get().getTopics(filteredTopicPartitionMap.keySet());

Map<String, State> topicSpecificStateMap =
DatasetUtils.getDatasetSpecificProps(Iterables.transform(topics, new Function<KafkaTopic, String>() {

@Override
public String apply(KafkaTopic topic) {
return topic.getName();
}
}), state);

int numOfThreads = Math.min(topics.size(), state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT));
ExecutorService threadPool =
Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newThreadFactory(Optional.of(LOG)));

if (state.getPropAsBoolean(ConfigurationKeys.KAFKA_SOURCE_SHARE_CONSUMER_CLIENT,
ConfigurationKeys.DEFAULT_KAFKA_SOURCE_SHARE_CONSUMER_CLIENT)) {
this.sharedKafkaConsumerClient = this.kafkaConsumerClient.get();
} else {
// preallocate one client per thread
populateClientPool(3, kafkaConsumerClientFactory, config);
}

Stopwatch createWorkUnitStopwatch = Stopwatch.createStarted();
for (KafkaTopic topic : topics) {
LOG.info("Trying to re-compute the workunit for topic {} with filtered partitions: {}",
topic.getName(), filteredTopicPartitionMap.get(topic.getName()));
LOG.info("Discovered topic " + topic);
if (topic.getTopicSpecificState().isPresent()) {
topicSpecificStateMap.computeIfAbsent(topic.getName(), k -> new State())
.addAllIfNotExist(topic.getTopicSpecificState().get());
}
Set<Integer> partitionIDSet = filteredTopicPartitionMap.get(topic.getName())
.stream().map(Integer::parseInt).collect(Collectors.toSet());
Optional<Set<Integer>> partitionIDSet = Optional.absent();
if (filteredTopicPartition.isPresent()) {
List<Integer> list = java.util.Optional.ofNullable(filteredTopicPartitionMap.get(topic.getName()))
.orElse(new ArrayList<>());
partitionIDSet = Optional.of(new HashSet<>(list));
LOG.info("Computing the workunit for topic {} with {} filtered partitions",
topic.getName(), list.size());
}

threadPool.submit(
new WorkUnitCreator(topic, state, Optional.fromNullable(topicSpecificStateMap.get(topic.getName())),
kafkaTopicWorkunitMap, partitionIDSet));
}

ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(LOG), 1L, TimeUnit.HOURS);
LOG.info("Created workunits for {} topics in {} seconds", kafkaTopicWorkunitMap.size(),
createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS));
LOG.info(String.format("Created workunits for %d topics in %d seconds", kafkaTopicWorkunitMap.size(),
createWorkUnitStopwatch.elapsed(TimeUnit.SECONDS)));

// Create empty WorkUnits for skipped partitions (i.e., partitions that have previous offsets,
// but aren't processed).
createEmptyWorkUnitsForSkippedPartitions(kafkaTopicWorkunitMap, topicSpecificStateMap, state);

KafkaWorkUnitPacker kafkaWorkUnitPacker = KafkaWorkUnitPacker.getInstance(this, state, Optional.of(this.metricContext));
int numOfMultiWorkunits = minContainer;
int numOfMultiWorkunits = minContainer.or(0);
if (state.contains(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY)) {
numOfMultiWorkunits = Math.max(numOfMultiWorkunits,
calculateNumMappersForPacker(state, kafkaWorkUnitPacker, kafkaTopicWorkunitMap));
}

addTopicSpecificPropsToWorkUnits(kafkaTopicWorkunitMap, topicSpecificStateMap);
// TODO: numOfMultiWorkunits is currently not used in the KafkaTopicGroupingWorkUnitPacker#pack() method.
// Next step is to utilize this number to determine the container count.
List<WorkUnit> workUnitList = kafkaWorkUnitPacker.pack(kafkaTopicWorkunitMap, numOfMultiWorkunits);
setLimiterReportKeyListToWorkUnits(workUnitList, getLimiterExtractorReportKeys());
return workUnitList;
@@ -391,6 +334,7 @@ public String apply(KafkaTopic topic) {
LOG.error("Exception {} encountered closing GobblinKafkaConsumerClient ", t);
}
}

}
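
For illustration, a caller might drive the merged entry point like this; the topic name, partition ids, and container floor are made-up values, and Optional is Guava's, as in the surrounding code:

```java
// Illustrative usage sketch; "myTopic", partitions {0, 2, 5}, and the container
// floor of 4 are assumptions, not part of this change.
List<WorkUnit> planFilteredWorkUnits(KafkaSource<?, ?> source, SourceState state) {
  Map<String, List<Integer>> filter = new HashMap<>();
  filter.put("myTopic", Arrays.asList(0, 2, 5)); // recompute only these partitions

  // Passing Optional.absent() for both optional parameters is equivalent to getWorkunits(state).
  return source.getWorkunitsForFilteredPartitions(state, Optional.of(filter), Optional.of(4));
}
```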

protected void populateClientPool(int count,
@@ -482,10 +426,6 @@ private int calculateNumMappersForPacker(SourceState state,
/*
* This function needs to be thread-safe since it is called from the Runnable
*/
private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState) {
return this.getWorkUnitsForTopic(topic, state, topicSpecificState, Optional.absent());
}

private List<WorkUnit> getWorkUnitsForTopic(KafkaTopic topic, SourceState state,
Optional<State> topicSpecificState, Optional<Set<Integer>> filteredPartitions) {
Timer.Context context = this.metricContext.timer("isTopicQualifiedTimer").time();
@@ -1001,20 +941,16 @@ private class WorkUnitCreator implements Runnable {

WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Map<String, List<WorkUnit>> workUnits) {
this.topic = topic;
this.state = state;
this.topicSpecificState = topicSpecificState;
this.allTopicWorkUnits = workUnits;
this.filteredPartitionsId = Optional.absent();
this(topic, state, topicSpecificState, workUnits, Optional.absent());
}

WorkUnitCreator(KafkaTopic topic, SourceState state, Optional<State> topicSpecificState,
Map<String, List<WorkUnit>> workUnits, Set<Integer> filteredPartitionsId) {
Map<String, List<WorkUnit>> workUnits, Optional<Set<Integer>> filteredPartitionsId) {
this.topic = topic;
this.state = state;
this.topicSpecificState = topicSpecificState;
this.allTopicWorkUnits = workUnits;
this.filteredPartitionsId = Optional.of(filteredPartitionsId);
this.filteredPartitionsId = filteredPartitionsId;
}

@Override