From 2e97f5661b665b6e8e2ea889742682cae5706d1b Mon Sep 17 00:00:00 2001 From: Arjun Singh Bora Date: Mon, 3 Jun 2024 17:24:05 -0700 Subject: [PATCH] [GOBBLIN-2078] remove configs because they may have a config that is prefix of another config (#3961) --- .../AutomaticTroubleshooterTest.java | 11 +++-------- .../gobblin/runtime/AbstractJobLauncher.java | 6 +++--- .../runtime/mapreduce/MRJobLauncher.java | 3 +-- .../AutomaticTroubleshooterConfig.java | 19 +++++++++---------- .../AutomaticTroubleshooterFactory.java | 8 ++++---- .../AutomaticTroubleshooterFactoryTest.java | 8 ++------ .../ddm/activity/impl/CommitActivityImpl.java | 15 +++++++-------- .../activity/impl/GenerateWorkUnitsImpl.java | 12 +++++------- .../activity/impl/ProcessWorkUnitImpl.java | 3 +-- 9 files changed, 35 insertions(+), 50 deletions(-) diff --git a/gobblin-modules/gobblin-troubleshooter/src/test/java/org/apache/gobblin/troubleshooter/AutomaticTroubleshooterTest.java b/gobblin-modules/gobblin-troubleshooter/src/test/java/org/apache/gobblin/troubleshooter/AutomaticTroubleshooterTest.java index 331d253b5a4..66c21a872b5 100644 --- a/gobblin-modules/gobblin-troubleshooter/src/test/java/org/apache/gobblin/troubleshooter/AutomaticTroubleshooterTest.java +++ b/gobblin-modules/gobblin-troubleshooter/src/test/java/org/apache/gobblin/troubleshooter/AutomaticTroubleshooterTest.java @@ -28,7 +28,6 @@ import org.apache.gobblin.metrics.event.GobblinEventBuilder; import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter; import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory; -import org.apache.gobblin.util.ConfigUtils; import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; @@ -41,9 +40,7 @@ public class AutomaticTroubleshooterTest { @Test public void canCollectAndRefineIssues() throws Exception { - Properties properties = new Properties(); - AutomaticTroubleshooter troubleshooter = - AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties)); + AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(new Properties()); try { troubleshooter.start(); log.warn("Test warning"); @@ -72,8 +69,7 @@ public void canDisable() throws Exception { Properties properties = new Properties(); properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLED, "true"); - AutomaticTroubleshooter troubleshooter = - AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties)); + AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(properties); try { troubleshooter.start(); log.warn("Test warning"); @@ -96,8 +92,7 @@ public void canDisableEventReporter() Properties properties = new Properties(); properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLED, "false"); properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING, "true"); - AutomaticTroubleshooter troubleshooter = - AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties)); + AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(properties); try { troubleshooter.start(); diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java index e0078803c12..fd5c9bab666 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java @@ -31,9 +31,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - import java.util.concurrent.atomic.AtomicInteger; -import lombok.Getter; + import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +56,7 @@ import com.typesafe.config.ConfigFactory; import javax.annotation.Nullable; +import lombok.Getter; import lombok.RequiredArgsConstructor; import org.apache.gobblin.broker.SharedResourcesBrokerFactory; @@ -205,7 +205,7 @@ public AbstractJobLauncher(Properties jobProps, List> metadataT clusterNameTags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags())); GobblinMetrics.addCustomTagsToProperties(jobProps, clusterNameTags); - troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobProps)); + troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobProps); troubleshooter.start(); // Make a copy for both the system and job configuration properties and resolve the job-template if any. diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java index f1c1fa319aa..7b01dc1a6bd 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java @@ -779,8 +779,7 @@ protected void setup(Context context) { final State gobblinJobState = HadoopUtils.getStateFromConf(context.getConfiguration()); TaskAttemptID taskAttemptID = context.getTaskAttemptID(); - troubleshooter = - AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(gobblinJobState.getProperties())); + troubleshooter = AutomaticTroubleshooterFactory.createForJob(gobblinJobState.getProperties()); troubleshooter.start(); try (Closer closer = Closer.create()) { diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterConfig.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterConfig.java index 43af63328e9..d0c95acd018 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterConfig.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterConfig.java @@ -18,8 +18,7 @@ package org.apache.gobblin.runtime.troubleshooter; import java.util.Objects; - -import com.typesafe.config.Config; +import java.util.Properties; import javax.inject.Inject; import lombok.AllArgsConstructor; @@ -27,7 +26,7 @@ import lombok.Getter; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PropertiesUtils; /** @@ -44,15 +43,15 @@ public class AutomaticTroubleshooterConfig { private int inMemoryRepositoryMaxSize = ConfigurationKeys.DEFAULT_TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE; @Inject - public AutomaticTroubleshooterConfig(Config config) { - Objects.requireNonNull(config, "Config cannot be null"); + public AutomaticTroubleshooterConfig(Properties properties) { + Objects.requireNonNull(properties, "Properties cannot be null"); - disabled = ConfigUtils.getBoolean(config, ConfigurationKeys.TROUBLESHOOTER_DISABLED, false); - disableEventReporting = - ConfigUtils.getBoolean(config, ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING, false); + disabled = PropertiesUtils.getPropAsBoolean(properties, ConfigurationKeys.TROUBLESHOOTER_DISABLED, "false"); + disableEventReporting = PropertiesUtils.getPropAsBoolean(properties, + ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING, "false"); - inMemoryRepositoryMaxSize = ConfigUtils - .getInt(config, ConfigurationKeys.TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE, + inMemoryRepositoryMaxSize = PropertiesUtils.getPropAsInt(properties, + ConfigurationKeys.TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE, ConfigurationKeys.DEFAULT_TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE); } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactory.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactory.java index 6d6080e2bc4..9c4925de9bf 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactory.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactory.java @@ -17,7 +17,7 @@ package org.apache.gobblin.runtime.troubleshooter; -import com.typesafe.config.Config; +import java.util.Properties; import lombok.extern.slf4j.Slf4j; @@ -37,10 +37,10 @@ public class AutomaticTroubleshooterFactory { * If this module is missing, troubleshooter will default to a no-op implementation. * * In addition, even when the "gobblin-troubleshooter" module is present, troubleshooter can still be disabled - * with {@link ConfigurationKeys.TROUBLESHOOTER_DISABLED} setting. + * with {@link ConfigurationKeys#TROUBLESHOOTER_DISABLED} setting. * */ - public static AutomaticTroubleshooter createForJob(Config config) { - AutomaticTroubleshooterConfig troubleshooterConfig = new AutomaticTroubleshooterConfig(config); + public static AutomaticTroubleshooter createForJob(Properties properties) { + AutomaticTroubleshooterConfig troubleshooterConfig = new AutomaticTroubleshooterConfig(properties); Class troubleshooterClass = tryGetTroubleshooterClass(); diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactoryTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactoryTest.java index 0aa7d20e855..fc23c8ed6f7 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactoryTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactoryTest.java @@ -22,7 +22,6 @@ import org.testng.annotations.Test; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.util.ConfigUtils; import static org.junit.Assert.assertTrue; @@ -33,9 +32,7 @@ public class AutomaticTroubleshooterFactoryTest { public void willGetNoopTroubleshooterByDefault() { // This test project does not reference gobblin-troubleshooter module, so we should get a noop-instance // of troubleshooter. See the main AutomaticTroubleshooterFactory class for details. - Properties properties = new Properties(); - AutomaticTroubleshooter troubleshooter = - AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties)); + AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(new Properties()); assertTrue(troubleshooter instanceof NoopAutomaticTroubleshooter); } @@ -44,8 +41,7 @@ public void willGetNoopTroubleshooterByDefault() { public void willGetNoopTroubleshooterWhenDisabled() { Properties properties = new Properties(); properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLED, "true"); - AutomaticTroubleshooter troubleshooter = - AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties)); + AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(properties); assertTrue(troubleshooter instanceof NoopAutomaticTroubleshooter); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index f409e5108c5..e8490d71417 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -29,18 +29,18 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -import javax.annotation.Nullable; -import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import com.google.api.client.util.Lists; import com.google.common.base.Function; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import io.temporal.failure.ApplicationFailure; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import io.temporal.failure.ApplicationFailure; +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.iface.SharedResourcesBroker; @@ -54,16 +54,15 @@ import org.apache.gobblin.runtime.SafeDatasetCommit; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.TaskStateCollectorService; -import org.apache.gobblin.source.extractor.JobCommitPolicy; import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter; import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory; +import org.apache.gobblin.source.extractor.JobCommitPolicy; import org.apache.gobblin.temporal.ddm.activity.CommitActivity; import org.apache.gobblin.temporal.ddm.util.JobStateUtils; import org.apache.gobblin.temporal.ddm.work.CommitStats; import org.apache.gobblin.temporal.ddm.work.DatasetStats; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; import org.apache.gobblin.temporal.ddm.work.assistance.Help; -import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.Either; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.PropertiesUtils; @@ -87,7 +86,7 @@ public CommitStats commit(WUProcessingSpec workSpec) { JobState jobState = Help.loadJobState(workSpec, fs); optJobName = Optional.ofNullable(jobState.getJobName()); SharedResourcesBroker instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState); - troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties())); + troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties()); troubleshooter.start(); List taskStates = loadTaskStates(workSpec, fs, jobState, numDeserializationThreads); if (taskStates.isEmpty()) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java index ace8bedc17b..0ed0675f649 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java @@ -21,16 +21,15 @@ import java.util.List; import java.util.Properties; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + import com.google.api.client.util.Lists; import com.google.common.io.Closer; import io.temporal.failure.ApplicationFailure; - import lombok.extern.slf4j.Slf4j; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - import org.apache.gobblin.commit.DeliverySemantics; import org.apache.gobblin.converter.initializer.ConverterInitializerFactory; import org.apache.gobblin.destination.DestinationDatasetHandlerService; @@ -45,10 +44,9 @@ import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.source.workunit.WorkUnitStream; import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits; -import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.temporal.ddm.util.JobStateUtils; +import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext; -import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.writer.initializer.WriterInitializerFactory; @@ -70,7 +68,7 @@ public int generateWorkUnits(Properties jobProps, EventSubmitterContext eventSub // jobState.setBroker(jobBroker); // jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName)); - AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobProps)); + AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobProps); troubleshooter.start(); try (Closer closer = Closer.create()) { // before embarking on (potentially expensive) WU creation, first pre-check that the FS is available diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java index f11ac706877..a6753245b7e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java @@ -56,7 +56,6 @@ import org.apache.gobblin.temporal.ddm.util.JobStateUtils; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.ddm.work.assistance.Help; -import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.JobLauncherUtils; @@ -76,7 +75,7 @@ public int processWorkUnit(WorkUnitClaimCheck wu) { List workUnits = loadFlattenedWorkUnits(wu, fs); log.info("{} - loaded; found {} workUnits", correlator, workUnits.size()); JobState jobState = Help.loadJobState(wu, fs); - troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties())); + troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties()); troubleshooter.start(); return execute(workUnits, wu, jobState, fs, troubleshooter.getIssueRepository()); } catch (IOException | InterruptedException e) {