diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 0bc4c948ebd..0bca916a704 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -61,6 +61,7 @@
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.EventBasedSource;
 import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
+import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
 import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
@@ -218,7 +219,7 @@ public List<WorkUnit> getWorkunits(SourceState state) {
 
       this.kafkaConsumerClient.set(kafkaConsumerClientFactory.create(config));
 
-      List<KafkaTopic> topics = getFilteredTopics(state);
+      List<KafkaTopic> topics = getValidTopics(getFilteredTopics(state), state);
       this.topicsToProcess = topics.stream().map(KafkaTopic::getName).collect(toSet());
 
       for (String topic : this.topicsToProcess) {
@@ -802,6 +803,7 @@ private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, Offsets
   protected List<KafkaTopic> getFilteredTopics(SourceState state) {
     List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state, TOPIC_BLACKLIST);
     List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state, TOPIC_WHITELIST);
+    // TODO: replace this with TopicNameValidator in the config once TopicValidators is rolled out.
     if (!state.getPropAsBoolean(KafkaSource.ALLOW_PERIOD_IN_TOPIC_NAME, true)) {
       blacklist.add(Pattern.compile(".*\\..*"));
     }
@@ -815,6 +817,17 @@ public void shutdown(SourceState state) {
     state.setProp(ConfigurationKeys.FAIL_TO_GET_OFFSET_COUNT, this.failToGetOffsetCount);
   }
 
+  /**
+   * Return topics that pass all the topic validators.
+   *
+   * @param topics the candidate topics, e.g. the output of {@link #getFilteredTopics(SourceState)}
+   * @param state the source state handed to {@link TopicValidators}, which reads the validator classes from it
+   * @return the topics accepted by {@link TopicValidators#validate(List)}
+   */
+  protected List<KafkaTopic> getValidTopics(List<KafkaTopic> topics, SourceState state) {
+    return new TopicValidators(state).validate(topics);
+  }
+
   /**
    * This class contains startOffset, earliestOffset and latestOffset for a Kafka partition.
    */
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java
new file mode 100644
index 00000000000..c8dd8223f3d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicNameValidator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+
+/**
+ * A {@link TopicValidatorBase} that accepts or rejects a topic based only on its name.
+ */
+public class TopicNameValidator extends TopicValidatorBase {
+  private static final String DOT = ".";
+
+  public TopicNameValidator(State state) {
+    super(state);
+  }
+
+  /**
+   * Check if a topic name is valid. The current rules are:
+   *   1. the name must not contain "."
+   * @param topic the topic to be validated
+   * @return true if the topic name is valid (i.e. it doesn't contain "."), false otherwise
+   */
+  @Override
+  public boolean validate(KafkaTopic topic) throws Exception {
+    return !topic.getName().contains(DOT);
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java
new file mode 100644
index 00000000000..69c5bc92a65
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorBase.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+
+/**
+ * The base class of a topic validator. {@link TopicValidators} instantiates subclasses reflectively with a State.
+ */
+public abstract class TopicValidatorBase {
+  protected State state;
+
+  public TopicValidatorBase(State sourceState) {
+    this.state = sourceState;
+  }
+
+  public abstract boolean validate(KafkaTopic topic) throws Exception;
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java
new file mode 100644
index 00000000000..fbed07c7609
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidators.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * The TopicValidators contains a list of {@link TopicValidatorBase} that validate topics.
+ * To enable it, add below settings in the config:
+ *   gobblin.kafka.topicValidators=validator1_class_name,validator2_class_name...
+ */
+@Slf4j
+public class TopicValidators {
+  public static final String VALIDATOR_CLASSES_KEY = "gobblin.kafka.topicValidators";
+
+  private static final long DEFAULT_TIMEOUT = 10L;
+
+  private static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.MINUTES;
+
+  private final List<TopicValidatorBase> validators = new ArrayList<>();
+
+  private final State state;
+
+  /**
+   * Instantiate every validator class listed under {@link #VALIDATOR_CLASSES_KEY} in {@code state}.
+   * A class that cannot be constructed is logged and skipped, so the remaining validators still apply.
+   * @param state the state that lists the validator class names and is passed to each validator's constructor
+   */
+  public TopicValidators(State state) {
+    this.state = state;
+    for (String validatorClassName : state.getPropAsList(VALIDATOR_CLASSES_KEY, StringUtils.EMPTY)) {
+      try {
+        this.validators.add(GobblinConstructorUtils.invokeConstructor(TopicValidatorBase.class, validatorClassName,
+            state));
+      } catch (Exception e) {
+        // Pass the exception as the trailing argument (no placeholder) so that SLF4J logs its stack trace
+        log.error("Failed to create topic validator: {}", validatorClassName, e);
+      }
+    }
+  }
+
+  /**
+   * Validate topics with all the internal validators. The default timeout is set to 10 minutes.
+   * Note:
+   *   1. the validations for every topic run in parallel.
+   *   2. when timeout happens, un-validated topics are still treated as "valid".
+   * @param topics the topics to be validated
+   * @return the topics that pass all the validators
+   */
+  public List<KafkaTopic> validate(List<KafkaTopic> topics) {
+    return validate(topics, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
+  }
+
+  /**
+   * Validate topics with all the internal validators.
+   * Note:
+   *   1. the validations for every topic run in parallel.
+   *   2. when timeout happens, un-validated topics are still treated as "valid".
+   *   3. a topic whose validation throws an exception is also treated as "valid".
+   * @param topics the topics to be validated
+   * @param timeout the timeout for the validation
+   * @param timeoutUnit the time unit for the timeout
+   * @return the topics that pass all the validators
+   */
+  public List<KafkaTopic> validate(List<KafkaTopic> topics, long timeout, TimeUnit timeoutUnit) {
+    if (this.validators.isEmpty() || topics.isEmpty()) {
+      // Nothing to check, so every topic is valid; skip creating the thread pool
+      return new ArrayList<>(topics);
+    }
+
+    int numOfThreads = state.getPropAsInt(ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS,
+        ConfigurationKeys.KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT);
+
+    // Tasks running in the thread pool will have the same access control and class loader settings as current thread
+    ExecutorService threadPool = Executors.newFixedThreadPool(numOfThreads, ExecutorsUtils.newPrivilegedThreadFactory(
+        Optional.of(log)));
+
+    List<Future<Boolean>> results = new ArrayList<>(topics.size());
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    for (KafkaTopic topic : topics) {
+      results.add(threadPool.submit(() -> validate(topic)));
+    }
+    ExecutorsUtils.shutdownExecutorService(threadPool, Optional.of(log), timeout, timeoutUnit);
+    log.info(String.format("Validate %d topics in %d seconds", topics.size(), stopwatch.elapsed(TimeUnit.SECONDS)));
+
+    List<KafkaTopic> validTopics = new ArrayList<>();
+    for (int i = 0; i < results.size(); ++i) {
+      KafkaTopic topic = topics.get(i);
+      Future<Boolean> result = results.get(i);
+      if (!result.isDone()) {
+        // The pool is already shut down, so a task that is still pending may never complete (e.g. it never left
+        // the queue); cancel it rather than risk blocking forever in get()
+        result.cancel(true);
+        log.warn("Validation of topic: {} timed out, treat it as a valid topic", topic);
+        validTopics.add(topic);
+        continue;
+      }
+      try {
+        if (result.get()) {
+          validTopics.add(topic);
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        if (e instanceof InterruptedException) {
+          // Keep the interrupt visible to callers further up the stack
+          Thread.currentThread().interrupt();
+        }
+        log.warn("Failed to validate topic: {}, treat it as a valid topic", topic, e);
+        validTopics.add(topic);
+      }
+    }
+    return validTopics;
+  }
+
+  /**
+   * Validates a single topic with all the internal validators
+   */
+  private boolean validate(KafkaTopic topic) throws Exception {
+    log.info("Validating topic {} in thread: {}", topic, Thread.currentThread().getName());
+    for (TopicValidatorBase validator : this.validators) {
+      if (!validator.validate(topic)) {
+        log.warn("KafkaTopic: {} doesn't pass the validator: {}", topic, validator.getClass().getName());
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
index 9992d4442a5..c26872e1ce3 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
@@ -25,6 +25,9 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicNameValidator;
+import org.apache.gobblin.source.extractor.extract.kafka.validator.TopicValidators;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -56,6 +59,36 @@ public void testGetFilteredTopics() {
     Assert.assertEquals(new TestKafkaSource(testKafkaClient).getFilteredTopics(state), toKafkaTopicList(allTopics.subList(0, 3)));
   }
 
+  @Test
+  public void testTopicValidators() {
+    TestKafkaClient testKafkaClient = new TestKafkaClient();
+    List<String> allTopics = Arrays.asList(
+        "Topic1", "topic-v2", "topic3", // allowed
+        "topic-with.period-in_middle", ".topic-with-period-at-start", "topicWithPeriodAtEnd.", //period topics
+        "not-allowed-topic");
+    testKafkaClient.testTopics = allTopics;
+    KafkaSource kafkaSource = new TestKafkaSource(testKafkaClient);
+
+    SourceState state = new SourceState();
+    state.setProp(KafkaSource.TOPIC_WHITELIST, ".*[Tt]opic.*");
+    state.setProp(KafkaSource.TOPIC_BLACKLIST, "not-allowed.*");
+    List<KafkaTopic> topicsToValidate = kafkaSource.getFilteredTopics(state);
+
+    // Test without TopicValidators in the state
+    Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate, state),
+        toKafkaTopicList(allTopics.subList(0, 6))));
+
+    // Test empty TopicValidators in the state
+    state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, "");
+    Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate, state),
+        toKafkaTopicList(allTopics.subList(0, 6))));
+
+    // Test TopicValidators with TopicNameValidator in the state
+    state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, TopicNameValidator.class.getName());
+    Assert.assertTrue(CollectionUtils.isEqualCollection(kafkaSource.getValidTopics(topicsToValidate, state),
+        toKafkaTopicList(allTopics.subList(0, 3))));
+  }
+
   public List<KafkaTopic> toKafkaTopicList(List<String> topicNames) {
     return topicNames.stream().map(topicName -> new KafkaTopic(topicName, Collections.emptyList())).collect(Collectors.toList());
   }
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java
new file mode 100644
index 00000000000..2691ae112c2
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/validator/TopicValidatorsTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka.validator;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TopicValidatorsTest {
+  @Test
+  public void testTopicValidators() {
+    List<String> allTopics = Arrays.asList(
+        "topic1", "topic2", // allowed
+        "topic-with.period-in_middle", ".topic-with-period-at-start", "topicWithPeriodAtEnd.", // bad topics
+        "topic3", "topic4"); // in deny list
+    List<KafkaTopic> topics = buildKafkaTopics(allTopics);
+
+    State state = new State();
+
+    // Without any topic validators
+    List<KafkaTopic> validTopics = new TopicValidators(state).validate(topics);
+    Assert.assertEquals(validTopics.size(), 7);
+
+    // Use 2 topic validators: TopicNameValidator and DenyListValidator
+    String validatorsToUse = String.join(",", ImmutableList.of(
+        TopicNameValidator.class.getName(), DenyListValidator.class.getName()));
+    state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, validatorsToUse);
+    validTopics = new TopicValidators(state).validate(topics);
+
+    Assert.assertEquals(validTopics.size(), 2);
+    Assert.assertTrue(validTopics.stream().anyMatch(topic -> topic.getName().equals("topic1")));
+    Assert.assertTrue(validTopics.stream().anyMatch(topic -> topic.getName().equals("topic2")));
+  }
+
+  @Test
+  public void testValidatorTimeout() {
+    List<String> allTopics = Arrays.asList("topic1", "topic2", "topic3");
+    List<KafkaTopic> topics = buildKafkaTopics(allTopics);
+    State state = new State();
+    state.setProp(TopicValidators.VALIDATOR_CLASSES_KEY, RejectEverythingValidator.class.getName());
+    List<KafkaTopic> validTopics = new TopicValidators(state).validate(topics, 5, TimeUnit.SECONDS);
+    Assert.assertEquals(validTopics.size(), 1); // topic 2 times out, it should be treated as a valid topic
+    Assert.assertEquals(validTopics.get(0).getName(), "topic2");
+  }
+
+  private List<KafkaTopic> buildKafkaTopics(List<String> topics) {
+    return topics.stream()
+        .map(topicName -> new KafkaTopic(topicName, Collections.emptyList()))
+        .collect(Collectors.toList());
+  }
+
+  // A TopicValidator class to mimic a deny list
+  public static class DenyListValidator extends TopicValidatorBase {
+    Set<String> denyList = ImmutableSet.of("topic3", "topic4");
+
+    public DenyListValidator(State state) {
+      super(state);
+    }
+
+    @Override
+    public boolean validate(KafkaTopic topic) {
+      return !this.denyList.contains(topic.getName());
+    }
+  }
+
+  // A validator that always returns false when validate() is called.
+  // Sleep for 10 sec (longer than the test's 5 sec timeout) when processing topic2 to simulate a slow validation.
+  public static class RejectEverythingValidator extends TopicValidatorBase {
+
+    public RejectEverythingValidator(State state) {
+      super(state);
+    }
+
+    @Override
+    public boolean validate(KafkaTopic topic) {
+      if (!topic.getName().equals("topic2")) {
+        return false;
+      }
+
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return false;
+    }
+  }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java
index b05674a9592..09f2f00b49a 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/ExecutorsUtils.java
@@ -103,6 +103,21 @@ public static ThreadFactory newDaemonThreadFactory(Optional<Logger> logger, Opti
     return newThreadFactory(new ThreadFactoryBuilder().setDaemon(true), logger, nameFormat);
   }
 
+  /**
+   * Get a new {@link ThreadFactory} that uses a {@link LoggingUncaughtExceptionHandler}
+   * to handle uncaught exceptions.
+   * Tasks running within such threads will have the same access control and class loader settings as the
+   * thread that invokes this method.
+   *
+   * @param logger an {@link Optional} wrapping the {@link Logger} that the
+   *               {@link LoggingUncaughtExceptionHandler} uses to log uncaught exceptions thrown in threads
+   * @return a new {@link ThreadFactory}
+   */
+  public static ThreadFactory newPrivilegedThreadFactory(Optional<Logger> logger) {
+    return newThreadFactory(new ThreadFactoryBuilder().setThreadFactory(Executors.privilegedThreadFactory()), logger,
+        Optional.absent());
+  }
+
   private static ThreadFactory newThreadFactory(ThreadFactoryBuilder builder, Optional<Logger> logger,
       Optional<String> nameFormat) {
     if (nameFormat.isPresent()) {