Skip to content

Commit

Permalink
[GOBBLIN-2078] remove configs because they may have a config that is …
Browse files Browse the repository at this point in the history
…prefix of another config (#3961)
  • Loading branch information
arjun4084346 authored Jun 4, 2024
1 parent 290d17d commit 2e97f56
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.util.ConfigUtils;

import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
Expand All @@ -41,9 +40,7 @@ public class AutomaticTroubleshooterTest {
@Test
public void canCollectAndRefineIssues()
throws Exception {
Properties properties = new Properties();
AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties));
AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(new Properties());
try {
troubleshooter.start();
log.warn("Test warning");
Expand Down Expand Up @@ -72,8 +69,7 @@ public void canDisable()
throws Exception {
Properties properties = new Properties();
properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLED, "true");
AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties));
AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(properties);
try {
troubleshooter.start();
log.warn("Test warning");
Expand All @@ -96,8 +92,7 @@ public void canDisableEventReporter()
Properties properties = new Properties();
properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLED, "false");
properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING, "true");
AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties));
AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(properties);
try {
troubleshooter.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -57,6 +56,7 @@
import com.typesafe.config.ConfigFactory;

import javax.annotation.Nullable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
Expand Down Expand Up @@ -205,7 +205,7 @@ public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>> metadataT
clusterNameTags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags()));
GobblinMetrics.addCustomTagsToProperties(jobProps, clusterNameTags);

troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobProps));
troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobProps);
troubleshooter.start();

// Make a copy for both the system and job configuration properties and resolve the job-template if any.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,7 @@ protected void setup(Context context) {
final State gobblinJobState = HadoopUtils.getStateFromConf(context.getConfiguration());
TaskAttemptID taskAttemptID = context.getTaskAttemptID();

troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(gobblinJobState.getProperties()));
troubleshooter = AutomaticTroubleshooterFactory.createForJob(gobblinJobState.getProperties());
troubleshooter.start();

try (Closer closer = Closer.create()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
package org.apache.gobblin.runtime.troubleshooter;

import java.util.Objects;

import com.typesafe.config.Config;
import java.util.Properties;

import javax.inject.Inject;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;


/**
Expand All @@ -44,15 +43,15 @@ public class AutomaticTroubleshooterConfig {
private int inMemoryRepositoryMaxSize = ConfigurationKeys.DEFAULT_TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE;

@Inject
public AutomaticTroubleshooterConfig(Config config) {
Objects.requireNonNull(config, "Config cannot be null");
public AutomaticTroubleshooterConfig(Properties properties) {
Objects.requireNonNull(properties, "Properties cannot be null");

disabled = ConfigUtils.getBoolean(config, ConfigurationKeys.TROUBLESHOOTER_DISABLED, false);
disableEventReporting =
ConfigUtils.getBoolean(config, ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING, false);
disabled = PropertiesUtils.getPropAsBoolean(properties, ConfigurationKeys.TROUBLESHOOTER_DISABLED, "false");
disableEventReporting = PropertiesUtils.getPropAsBoolean(properties,
ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING, "false");

inMemoryRepositoryMaxSize = ConfigUtils
.getInt(config, ConfigurationKeys.TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE,
inMemoryRepositoryMaxSize = PropertiesUtils.getPropAsInt(properties,
ConfigurationKeys.TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE,
ConfigurationKeys.DEFAULT_TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.gobblin.runtime.troubleshooter;

import com.typesafe.config.Config;
import java.util.Properties;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -37,10 +37,10 @@ public class AutomaticTroubleshooterFactory {
* If this module is missing, troubleshooter will default to a no-op implementation.
*
* In addition, even when the "gobblin-troubleshooter" module is present, troubleshooter can still be disabled
* with {@link ConfigurationKeys.TROUBLESHOOTER_DISABLED} setting.
* with {@link ConfigurationKeys#TROUBLESHOOTER_DISABLED} setting.
* */
public static AutomaticTroubleshooter createForJob(Config config) {
AutomaticTroubleshooterConfig troubleshooterConfig = new AutomaticTroubleshooterConfig(config);
public static AutomaticTroubleshooter createForJob(Properties properties) {
AutomaticTroubleshooterConfig troubleshooterConfig = new AutomaticTroubleshooterConfig(properties);

Class troubleshooterClass = tryGetTroubleshooterClass();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.testng.annotations.Test;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;

import static org.junit.Assert.assertTrue;

Expand All @@ -33,9 +32,7 @@ public class AutomaticTroubleshooterFactoryTest {
public void willGetNoopTroubleshooterByDefault() {
// This test project does not reference gobblin-troubleshooter module, so we should get a noop-instance
// of troubleshooter. See the main AutomaticTroubleshooterFactory class for details.
Properties properties = new Properties();
AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties));
AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(new Properties());

assertTrue(troubleshooter instanceof NoopAutomaticTroubleshooter);
}
Expand All @@ -44,8 +41,7 @@ public void willGetNoopTroubleshooterByDefault() {
public void willGetNoopTroubleshooterWhenDisabled() {
Properties properties = new Properties();
properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLED, "true");
AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties));
AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(properties);

assertTrue(troubleshooter instanceof NoopAutomaticTroubleshooter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.temporal.failure.ApplicationFailure;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import io.temporal.failure.ApplicationFailure;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
Expand All @@ -54,16 +54,15 @@
import org.apache.gobblin.runtime.SafeDatasetCommit;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PropertiesUtils;
Expand All @@ -87,7 +86,7 @@ public CommitStats commit(WUProcessingSpec workSpec) {
JobState jobState = Help.loadJobState(workSpec, fs);
optJobName = Optional.ofNullable(jobState.getJobName());
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
troubleshooter.start();
List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState, numDeserializationThreads);
if (taskStates.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.api.client.util.Lists;
import com.google.common.io.Closer;

import io.temporal.failure.ApplicationFailure;

import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
Expand All @@ -45,10 +44,9 @@
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;


Expand All @@ -70,7 +68,7 @@ public int generateWorkUnits(Properties jobProps, EventSubmitterContext eventSub
// jobState.setBroker(jobBroker);
// jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));

AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobProps));
AutomaticTroubleshooter troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobProps);
troubleshooter.start();
try (Closer closer = Closer.create()) {
// before embarking on (potentially expensive) WU creation, first pre-check that the FS is available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;


Expand All @@ -76,7 +75,7 @@ public int processWorkUnit(WorkUnitClaimCheck wu) {
List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
log.info("{} - loaded; found {} workUnits", correlator, workUnits.size());
JobState jobState = Help.loadJobState(wu, fs);
troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
troubleshooter.start();
return execute(workUnits, wu, jobState, fs, troubleshooter.getIssueRepository());
} catch (IOException | InterruptedException e) {
Expand Down

0 comments on commit 2e97f56

Please sign in to comment.