Skip to content

Commit

Permalink
[GOBBLIN-2013] update guice initialization for 'DagProcEngine enabled' and related classes (#3892)
Browse files Browse the repository at this point in the history

* update guice module
* address review comment
  • Loading branch information
arjun4084346 authored Mar 12, 2024
1 parent 02eca4f commit 2217205
Show file tree
Hide file tree
Showing 15 changed files with 115 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.gobblin.runtime.util;

import org.apache.gobblin.service.ServiceConfigKeys;


/**
* These names are used for dependency injection, when we need to inject different instances of the same type,
* or inject constants.
Expand All @@ -29,4 +32,5 @@ public final class InjectionNames {
// TODO: Rename `warm_standby_enabled` config to `message_forwarding_enabled` since it's a misnomer.
public static final String WARM_STANDBY_ENABLED = "statelessRestAPIEnabled";
public static final String MULTI_ACTIVE_SCHEDULER_ENABLED = "multiActiveSchedulerEnabled";
public static final String DAG_PROC_ENGINE_ENABLED = ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED;
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public class GobblinServiceConfiguration {
@Getter
private final boolean onlyAnnounceLeader;

@Getter
private final boolean isDagProcessingEngineEnabled;

@Getter
private final Config innerConfig;

Expand Down Expand Up @@ -114,5 +117,6 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config
this.isTopologySpecFactoryEnabled =
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY, true);
this.onlyAnnounceLeader = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER, false);
this.isDagProcessingEngineEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,15 @@
import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagementTaskStreamImpl;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
Expand All @@ -79,10 +86,12 @@
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
Expand Down Expand Up @@ -154,6 +163,10 @@ public void configure(Binder binder) {
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED))
.to(serviceConfig.isMultiActiveSchedulerEnabled());
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.DAG_PROC_ENGINE_ENABLED))
.to(serviceConfig.isDagProcessingEngineEnabled());

OptionalBinder.newOptionalBinder(binder, DagActionStore.class);
if (serviceConfig.isWarmStandbyEnabled()) {
binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
Expand All @@ -173,6 +186,12 @@ public void configure(Binder binder) {
binder.bind(FlowTriggerHandler.class);
}

binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class).in(Singleton.class);
binder.bind(DagProcFactory.class);
binder.bind(DagProcessingEngine.class);

binder.bind(FlowConfigsResource.class);
binder.bind(FlowConfigsV2Resource.class);
binder.bind(FlowStatusResource.class);
Expand All @@ -188,6 +207,7 @@ public void configure(Binder binder) {
.to(NoopRequesterService.class);

binder.bind(SharedFlowMetricsSingleton.class);
binder.bind(FlowCompilationValidationHelper.class);

OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
binder.bind(TopologyCatalog.class);
Expand Down Expand Up @@ -248,7 +268,12 @@ public void configure(Binder binder) {

if (serviceConfig.isWarmStandbyEnabled()) {
binder.bind(SpecStoreChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
binder.bind(DagActionStoreChangeMonitor.class).toProvider(DagActionStoreChangeMonitorFactory.class).in(Singleton.class);
if (serviceConfig.isDagProcessingEngineEnabled()) {
binder.bind(DagActionStoreChangeMonitor.class)
.toProvider(DagManagementDagActionStoreChangeMonitorFactory.class).in(Singleton.class);
} else {
binder.bind(DagActionStoreChangeMonitor.class).toProvider(DagActionStoreChangeMonitorFactory.class).in(Singleton.class);
}
}

binder.bind(GobblinServiceManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
FlowCatalog flowCatalog;

@Inject
public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog) throws IOException {
this.quotaManager = new MysqlUserQuotaManager(config);
public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, UserQuotaManager userQuotaManager) {
this.quotaManager = userQuotaManager;
this.config = config;
this.flowCatalog = flowCatalog;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,13 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
@Getter
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;

private final ClassAliasResolver<SpecCompiler> aliasResolver;

@Inject
public Orchestrator(Config config, TopologyCatalog topologyCatalog, DagManager dagManager,
Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, Optional<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional<FlowCatalog> flowCatalog) {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional<FlowCatalog> flowCatalog,
DagManagementStateStore dagManagementStateStore, FlowCompilationValidationHelper flowCompilationValidationHelper) throws IOException {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
ClassAliasResolver<SpecCompiler> aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
Expand All @@ -132,14 +131,15 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, DagManager d
}
_log.info("Using specCompiler class name/alias " + specCompilerClassName);

this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(specCompilerClassName)), config);
this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(specCompilerClassName)), config);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
ClassNotFoundException e) {
throw new RuntimeException(e);
}

//At this point, the TopologySpecMap is initialized by the SpecCompiler. Pass the TopologySpecMap to the DagManager.
this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
((MostlyMySqlDagManagementStateStore) dagManagementStateStore).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());

this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.specCompiler.getClass());
this.flowOrchestrationSuccessFulMeter = this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER);
Expand All @@ -153,8 +153,7 @@ public Orchestrator(Config config, TopologyCatalog topologyCatalog, DagManager d
quotaManager = GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
config);
this.flowCompilationValidationHelper = new FlowCompilationValidationHelper(sharedFlowMetricsSingleton, specCompiler,
quotaManager, eventSubmitter, flowStatusGenerator, isFlowConcurrencyEnabled);
this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, Opti
public LaunchDagProc(LaunchDagTask launchDagTask) {
this.launchDagTask = launchDagTask;
this.orchestrationDelayCounter = new AtomicLong(0);
ContextAwareGauge<Long> orchestrationDelayMetric = this.metricContext.newContextAwareGauge
ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge
(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, orchestrationDelayCounter::get);
this.metricContext.register(orchestrationDelayMetric);
metricContext.register(orchestrationDelayMetric);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,13 @@
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
Expand Down Expand Up @@ -120,7 +122,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
protected final Optional<UserQuotaManager> quotaManager;
protected final Optional<FlowTriggerHandler> flowTriggerHandler;
@Getter
protected final Map<String, Spec> scheduledFlowSpecs;
protected final Map<String, FlowSpec> scheduledFlowSpecs;
@Getter
protected final Map<String, Long> lastUpdatedTimeForFlowSpec;
protected volatile int loadSpecsBatchSize = -1;
Expand Down Expand Up @@ -214,11 +216,12 @@ public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusG
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, TopologyCatalog topologyCatalog,
DagManager dagManager, Optional<UserQuotaManager> quotaManager, SchedulerService schedulerService,
Optional<Logger> log, boolean isWarmStandbyEnabled, Optional <FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
SharedFlowMetricsSingleton sharedFlowMetricsSingleton, DagManagementStateStore dagManagementStateStore,
FlowCompilationValidationHelper flowCompilationValidationHelper)
throws Exception {
this(serviceName, config, helixManager, flowCatalog,
new Orchestrator(config, topologyCatalog, dagManager, log, flowStatusGenerator, flowTriggerHandler,
sharedFlowMetricsSingleton, flowCatalog),
sharedFlowMetricsSingleton, flowCatalog, dagManagementStateStore, flowCompilationValidationHelper),
schedulerService, quotaManager, log, isWarmStandbyEnabled, flowTriggerHandler);
}

Expand Down Expand Up @@ -502,7 +505,7 @@ public static long utcDateAsUTCEpochMillis(Date date) {
@Override
public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
try {
Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
FlowSpec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
// The trigger event time will be missing for adhoc and run-immediately flows, so we set the default here
String triggerTimestampMillis = jobProps.getProperty(
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
Expand Down Expand Up @@ -597,7 +600,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}

// todo : we should probably not schedule a flow if it is a runOnce flow
this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
this.scheduledFlowSpecs.put(flowSpecUri.toString(), flowSpec);
this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), modificationTime);

if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,33 @@

package org.apache.gobblin.service.modules.utils;

import com.google.common.base.Optional;
import com.typesafe.config.Config;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;

import org.apache.commons.lang3.reflect.ConstructorUtils;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.typesafe.config.Config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;


Expand All @@ -59,6 +69,27 @@ public final class FlowCompilationValidationHelper {
private final FlowStatusGenerator flowStatusGenerator;
private final boolean isFlowConcurrencyEnabled;

/**
 * Injectable constructor that wires the collaborators and builds its own {@link SpecCompiler} from {@code config}.
 *
 * <p>The compiler class name (or alias) is read from
 * {@link ServiceConfigKeys#GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY}, falling back to
 * {@link ServiceConfigKeys#DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS}. It is resolved through a
 * {@link ClassAliasResolver} and instantiated reflectively with {@code config} as the sole constructor argument.
 *
 * @param config service configuration; also supplies the flow-concurrency flag
 *        ({@link ServiceConfigKeys#FLOW_CONCURRENCY_ALLOWED})
 * @param sharedFlowMetricsSingleton stored as-is for later use by this helper
 * @param userQuotaManager stored as this helper's quota manager
 * @param flowStatusGenerator stored as-is for later use by this helper
 * @throws RuntimeException wrapping the reflective failure if the compiler class cannot be found or instantiated
 */
@Inject
public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
    UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator) {
  // Plain collaborator hand-offs; none of these depend on the compiler built below.
  this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
  this.quotaManager = userQuotaManager;
  this.flowStatusGenerator = flowStatusGenerator;

  try {
    String compilerNameOrAlias = ConfigUtils.getString(config,
        ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
        ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS);
    ClassAliasResolver<SpecCompiler> compilerAliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
    Class<?> compilerClass = Class.forName(compilerAliasResolver.resolve(compilerNameOrAlias));
    this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(compilerClass, config);
  } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException
      | ClassNotFoundException e) {
    throw new RuntimeException(e);
  }

  // The metric context is keyed on the concrete compiler class, so it must be created after the compiler exists.
  MetricContext compilerMetricContext =
      Instrumented.getMetricContext(ConfigUtils.configToState(config), this.specCompiler.getClass());
  this.eventSubmitter = new EventSubmitter.Builder(compilerMetricContext, "org.apache.gobblin.service").build();
  this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config,
      ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
      ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
}

/**
* For a given flowSpec, verifies that an execution is allowed (in case there is an ongoing execution) and the
* flowspec can be compiled. If the pre-conditions hold, then a JobExecutionPlan is constructed and returned to the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* A factory implementation that returns a {@link DagManagementDagActionStoreChangeMonitor} instance.
*/
@Slf4j
public class DagProcEngineEnabledDagActionStoreChangeMonitorFactory implements Provider<DagActionStoreChangeMonitor> {
public class DagManagementDagActionStoreChangeMonitorFactory implements Provider<DagActionStoreChangeMonitor> {
static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";

private final Config config;
Expand All @@ -50,7 +50,7 @@ public class DagProcEngineEnabledDagActionStoreChangeMonitorFactory implements P
private final DagManagement dagManagement;

@Inject
public DagProcEngineEnabledDagActionStoreChangeMonitorFactory(Config config, DagManager dagManager, FlowCatalog flowCatalog,
public DagManagementDagActionStoreChangeMonitorFactory(Config config, DagManager dagManager, FlowCatalog flowCatalog,
Orchestrator orchestrator, DagActionStore dagActionStore, DagManagement dagManagement,
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean isMultiActiveSchedulerEnabled) {
this.config = Objects.requireNonNull(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.MysqlDagStateStore;
import org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
Expand Down Expand Up @@ -161,6 +162,7 @@ public void setup() throws Exception {
serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR);
serviceCoreProperties.put(FlowCatalog.FLOWSPEC_STORE_CLASS_KEY, "org.apache.gobblin.runtime.spec_store.MysqlSpecStore");
serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, "flow_spec_store");
serviceCoreProperties.put(MysqlDagStateStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, "dag_store");
serviceCoreProperties.put(FlowCatalog.FLOWSPEC_SERDE_CLASS_KEY, "org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe");

serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY, TEST_GOBBLIN_EXECUTOR_NAME);
Expand Down
Loading

0 comments on commit 2217205

Please sign in to comment.