[GOBBLIN-2015] implement LaunchDagProc to handle new dag launches (#3893)

* add launch dag proc
* add unit test
* address review comments
arjun4084346 authored Mar 13, 2024
1 parent 3a54961 commit 2bdf7eb
Showing 16 changed files with 341 additions and 70 deletions.
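At a high level, the change makes a LaunchDagTask resolvable to a runnable LaunchDagProc through the DagProcFactory visitor, with a FlowCompilationValidationHelper injected so the proc can compile the FlowSpec into a Dag and submit its start nodes. The following is a rough sketch of how the new pieces fit together, based only on the signatures visible in this diff; the wrapper class, method name, and the way the collaborators are obtained are hypothetical (in the service itself they are Guice-injected, and tasks are taken off the DagManagementTaskStreamImpl).

```java
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagProcFactory;
import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;

public class LaunchDagWiringSketch {

  // Hypothetical driver method for illustration only; in the real service the factory and
  // state store are injected and LaunchDagTasks arrive via the DagManagementTaskStream.
  void processOneLaunch(FlowCompilationValidationHelper validationHelper,
      DagManagementStateStore dagManagementStateStore, LaunchDagTask launchDagTask) throws Exception {
    // The factory now takes the compilation helper (new constructor in this commit).
    DagProcFactory dagProcFactory = new DagProcFactory(validationHelper);
    // Visiting the task yields a LaunchDagProc bound to that task.
    LaunchDagProc launchDagProc = dagProcFactory.meet(launchDagTask);
    // process() runs the initialize -> act -> commit -> sendNotification template.
    launchDagProc.process(dagManagementStateStore);
  }
}
```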
@@ -279,7 +279,6 @@ public boolean isLeader() {
return !helixManager.isPresent() || helixManager.get().isLeader();
}


private FileSystem buildFileSystem(Config config)
throws IOException {
return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
@@ -291,8 +290,6 @@ private Path getServiceWorkDirPath(FileSystem fs, String serviceName, String ser
return new Path(fs.getHomeDirectory(), serviceName + Path.SEPARATOR + serviceId);
}



/**
* Handle leadership change.
* @param changeContext notification context
@@ -181,4 +181,9 @@ default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
* Returns true if successfully reduces the quota usage
*/
boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;

/**
* Returns a {@link DagManagerMetrics} that monitors dag executions.
*/
DagManagerMetrics getDagManagerMetrics();
}
@@ -50,7 +50,6 @@
public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
private final Config config;
@Getter private final EventSubmitter eventSubmitter;
@Getter private static final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();

@Inject(optional=true)
protected Optional<DagActionStore> dagActionStore;
@@ -64,7 +63,6 @@ public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagAc
this.dagActionStore = dagActionStore;
MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = new EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build();
dagManagerMetrics.activate();
}

@Override
@@ -932,8 +932,6 @@ private boolean slaKillIfNeeded(DagNode<JobExecutionPlan> node) throws Execution
return false;
}



/**
* Submit next set of Dag nodes in the Dag identified by the provided dagId
* @param dagId The dagId that should be processed.
@@ -28,11 +28,11 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.GobblinMetricsKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
@@ -86,7 +86,7 @@ public DagManagerMetrics() {
// Create a new metric context for the DagManagerMetrics tagged appropriately
List<Tag<?>> tags = new ArrayList<>();
tags.add(new Tag<>(MetricTagNames.METRIC_BACKEND_REPRESENTATION, GobblinMetrics.MetricType.COUNTER));
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), this.getClass(), tags);
this.metricContext = Instrumented.getMetricContext(new State(), this.getClass(), tags);
}

public void activate() {
@@ -17,13 +17,15 @@

package org.apache.gobblin.service.modules.orchestration;

import com.google.inject.Inject;
import com.google.inject.Singleton;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProc;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;


/**
Expand All @@ -35,9 +37,17 @@
@Alpha
@Singleton
public class DagProcFactory implements DagTaskVisitor<DagProc> {

private final FlowCompilationValidationHelper flowCompilationValidationHelper;

@Inject
public DagProcFactory(FlowCompilationValidationHelper flowCompilationValidationHelper) {
this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}

@Override
public LaunchDagProc meet(LaunchDagTask launchDagTask) {
return new LaunchDagProc(launchDagTask);
return new LaunchDagProc(launchDagTask, this.flowCompilationValidationHelper);
}
//todo - overload meet method for other dag tasks
}
@@ -96,7 +96,7 @@ public void run() {
dagTask.conclude();
} catch (Exception e) {
log.error("DagProcEngineThread encountered exception while processing dag " + dagTask.getDagId(), e);
DagManagementTaskStreamImpl.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
}
// todo mark lease success and releases it
//dagTaskStream.complete(dagTask);
@@ -31,6 +31,7 @@
import com.google.inject.Inject;
import com.typesafe.config.Config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.api.FlowSpec;
@@ -68,16 +69,19 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto
private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
FlowCatalog flowCatalog;
@Getter
private final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics();

@Inject
public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, UserQuotaManager userQuotaManager) {
this.quotaManager = userQuotaManager;
this.config = config;
this.flowCatalog = flowCatalog;
this.dagManagerMetrics.activate();
}

// It should be called after topology spec map is set
public synchronized void start() throws IOException {
private synchronized void start() throws IOException {
if (!dagStoresInitialized) {
this.dagStateStore = createDagStateStore(config, topologySpecMap);
this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
@@ -27,6 +27,7 @@
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;


@@ -46,8 +47,11 @@ public final void process(DagManagementStateStore dagManagementStateStore) throw
T result = act(dagManagementStateStore, state); // todo - retry
commit(dagManagementStateStore, result); // todo - retry
sendNotification(result, eventSubmitter); // todo - retry
log.info("{} successfully concluded actions for dagId : {}", getClass().getSimpleName(), getDagId());
}

protected abstract DagManager.DagId getDagId();

protected abstract S initialize(DagManagementStateStore dagManagementStateStore) throws IOException;

protected abstract T act(DagManagementStateStore dagManagementStateStore, S state) throws IOException;
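For readers tracking the control flow: DagProc is a template-method base class, and process() above fixes the order initialize → act → commit → sendNotification, while each concrete proc (such as LaunchDagProc below) supplies the steps. A simplified, self-contained sketch of that shape, with hypothetical placeholders instead of the Gobblin-specific types shown in the diff:

```java
// Simplified illustration of the DagProc template-method shape (hypothetical names and
// signatures; the real class uses DagManagementStateStore, EventSubmitter, etc.).
abstract class TemplateProcSketch<S, T> {

  public final void process(Object stateStore) throws Exception {
    S state = initialize(stateStore);    // e.g. compile the flow into a dag
    T result = act(stateStore, state);   // e.g. submit the dag's start nodes
    commit(stateStore, result);          // persist whatever act() produced
    sendNotification(result);            // emit tracking events
  }

  protected abstract S initialize(Object stateStore) throws Exception;

  protected abstract T act(Object stateStore, S state) throws Exception;

  protected abstract void commit(Object stateStore, T result) throws Exception;

  protected abstract void sendNotification(T result) throws Exception;
}
```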
@@ -18,58 +18,181 @@
package org.apache.gobblin.service.modules.orchestration.proc;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.collect.Maps;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;


/**
* An implementation for {@link LaunchDagTask}
* An implementation for {@link DagProc} that launches a new job.
*/
@Slf4j
@Alpha
@RequiredArgsConstructor
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, Optional<Dag<JobExecutionPlan>>> {
private final LaunchDagTask launchDagTask;
private final AtomicLong orchestrationDelayCounter;

public LaunchDagProc(LaunchDagTask launchDagTask) {
this.launchDagTask = launchDagTask;
this.orchestrationDelayCounter = new AtomicLong(0);
ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge
(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, orchestrationDelayCounter::get);
metricContext.register(orchestrationDelayMetric);
private final FlowCompilationValidationHelper flowCompilationValidationHelper;
// todo - this is not orchestration delay and should be renamed. keeping it the same because DagManager is also using
// the same name
private static final AtomicLong orchestrationDelayCounter = new AtomicLong(0);
static {
metricContext.register(
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, orchestrationDelayCounter::get));
}

@Override
protected DagManager.DagId getDagId() {
return this.launchDagTask.getDagId();
}

@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
throws IOException {
throw new UnsupportedOperationException("Not yet implemented");
try {
DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
FlowSpec flowSpec = loadFlowSpec(dagManagementStateStore, dagAction);
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagAction.getFlowExecutionId());
return this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
} catch (URISyntaxException | SpecNotFoundException | InterruptedException e) {
throw new RuntimeException(e);
}
}

private FlowSpec loadFlowSpec(DagManagementStateStore dagManagementStateStore, DagActionStore.DagAction dagAction)
throws URISyntaxException, SpecNotFoundException {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(dagAction.getFlowId());
return dagManagementStateStore.getFlowSpec(flowUri);
}

@Override
protected Optional<Dag<JobExecutionPlan>> act(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag)
throws IOException {
throw new UnsupportedOperationException("Not yet implemented");
if (!dag.isPresent()) {
log.warn("Dag with id " + getDagId() + " could not be compiled.");
// todo - add metrics
return Optional.empty();
}
submitNextNodes(dagManagementStateStore, dag.get());
orchestrationDelayCounter.set(System.currentTimeMillis() - DagManagerUtils.getFlowExecId(dag.get()));
return dag;
}

@Override
protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, EventSubmitter eventSubmitter)
throws IOException {
throw new UnsupportedOperationException("Not yet implemented");
/**
* Submit next set of Dag nodes in the provided Dag.
*/
private void submitNextNodes(DagManagementStateStore dagManagementStateStore,
Dag<JobExecutionPlan> dag) throws IOException {
Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagManagerUtils.getNext(dag);

if (nextNodes.size() > 1) {
handleMultipleJobs(nextNodes);
}

//Submit jobs from the dag ready for execution.
for (Dag.DagNode<JobExecutionPlan> dagNode : nextNodes) {
submitJobToExecutor(dagManagementStateStore, dagNode);
dagManagementStateStore.addDagNodeState(dagNode, getDagId());
log.info("Submitted job {} for dagId {}", DagManagerUtils.getJobName(dagNode), getDagId());
}

//Checkpoint the dag state, it should have an updated value of dag nodes
dagManagementStateStore.checkpointDag(dag);
}

private void handleMultipleJobs(Set<Dag.DagNode<JobExecutionPlan>> nextNodes) {
throw new UnsupportedOperationException("More than one start job is not allowed");
}

/**
* Submits a {@link JobSpec} to a {@link SpecExecutor}.
*/
private void submitJobToExecutor(DagManagementStateStore dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode) {
DagManagerUtils.incrementJobAttempt(dagNode);
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);

String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);

// Run this spec on selected executor
SpecProducer<Spec> producer;
try {
producer = DagManagerUtils.getSpecProducer(dagNode);
TimingEvent jobOrchestrationTimer = eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED);

// Increment job count before submitting the job onto the spec producer, in case that throws an exception.
// By this point the quota is allocated, so it's imperative to increment as missing would introduce the potential to decrement below zero upon quota release.
// Quota release is guaranteed, despite failure, because exception handling within would mark the job FAILED.
// When the ensuing kafka message spurs DagManager processing, the quota is released and the counts decremented
// Ensure that we do not double increment for flows that are retried
if (DagManagerUtils.getJobExecutionPlan(dagNode).getCurrentAttempts() == 1) {
dagManagementStateStore.getDagManagerMetrics().incrementRunningJobMetrics(dagNode);
}
// Submit the job to the SpecProducer, which in turn performs the actual job submission to the SpecExecutor instance.
// The SpecProducer implementations submit the job to the underlying executor and return when the submission is complete,
// either successfully or unsuccessfully. To catch any exceptions in the job submission, the DagManagerThread
// blocks (by calling Future#get()) until the submission is completed.
dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode));
Future<?> addSpecFuture = producer.addSpec(jobSpec);
// todo - we should add future.get() instead of the complete future into the JobExecutionPlan
dagNode.getValue().setJobFuture(com.google.common.base.Optional.of(addSpecFuture));
addSpecFuture.get();
jobExecutionPlan.setExecutionStatus(ExecutionStatus.ORCHESTRATED);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, producer.getExecutionLink(addSpecFuture, specExecutorUri));
// Add serialized job properties as part of the orchestrated job event metadata
jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, dagNode.getValue().toString());
jobOrchestrationTimer.stop(jobMetadata);
log.info("Orchestrated job: {} on Executor: {}", DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer = eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
}
try {
// when there is no exception, quota will be released in job status monitor or re-evaluate dag proc
dagManagementStateStore.releaseQuota(dagNode);
} catch (IOException ex) {
log.error("Could not release quota while handling e", ex);
}
throw new RuntimeException(e);
}
}

@Override
protected void commit(DagManagementStateStore dagManagementStateStore, Optional<Dag<JobExecutionPlan>> dag) {
throw new UnsupportedOperationException("Not yet implemented");
protected void sendNotification(Optional<Dag<JobExecutionPlan>> result, EventSubmitter eventSubmitter) {
}
}
@@ -61,7 +61,7 @@
*/
@Slf4j
@Data
public final class FlowCompilationValidationHelper {
public class FlowCompilationValidationHelper {
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
private final SpecCompiler specCompiler;
private final UserQuotaManager quotaManager;
@@ -71,7 +71,7 @@ public void setUp() throws Exception {
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config, Optional.empty());
this.dagProcFactory = new DagProcFactory();
this.dagProcFactory = new DagProcFactory(null);
this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
this.dagManagementTaskStream, this.dagProcFactory, dagManagementStateStore);
}
@@ -152,7 +152,7 @@ static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, String fl
return buildDag(id, flowExecutionId, flowFailureOption, numNodes, "testUser", ConfigFactory.empty());
}

static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, String flowFailureOption, int numNodes, String proxyUser, Config additionalConfig)
public static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, String flowFailureOption, int numNodes, String proxyUser, Config additionalConfig)
throws URISyntaxException {
List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
