Skip to content

Commit

Permalink
[GOBBLIN-2074] add dag action store inside DagManagementStateStore (#…
Browse files Browse the repository at this point in the history
…3954)

* remove unused fields/APIs
* add dag action store inside DMSS
* address review comment
  • Loading branch information
arjun4084346 authored May 29, 2024
1 parent ea1233a commit ac653fb
Show file tree
Hide file tree
Showing 32 changed files with 231 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
Expand Down Expand Up @@ -85,13 +85,13 @@ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {

public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads,
boolean isMultiActiveSchedulerEnabled) {
this(topic, config, numThreads, isMultiActiveSchedulerEnabled, mock(DagActionStore.class), mock(DagManager.class), mock(FlowCatalog.class), mock(Orchestrator.class));
this(topic, config, numThreads, isMultiActiveSchedulerEnabled, mock(DagManagementStateStore.class), mock(DagManager.class), mock(FlowCatalog.class), mock(Orchestrator.class));
}

public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, boolean isMultiActiveSchedulerEnabled,
DagActionStore dagActionStore, DagManager dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) {
DagManagementStateStore dagManagementStateStore, DagManager dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) {
super(topic, config, dagManager, numThreads, flowCatalog, orchestrator,
dagActionStore, isMultiActiveSchedulerEnabled);
dagManagementStateStore, isMultiActiveSchedulerEnabled);
}

protected void processMessageForTest(DecodeableKafkaRecord record) {
Expand Down Expand Up @@ -238,8 +238,7 @@ public void testStartupSequenceHandlesFailures() throws Exception {
String jobName = "testJobName";
String flowExecutionId = "12345677";

MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config);
mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);

Config monitorConfig = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
Expand All @@ -251,7 +250,7 @@ public void testStartupSequenceHandlesFailures() throws Exception {
// Throw an uncaught exception during startup sequence
when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new RuntimeException("Uncaught exception"));
mockDagActionStoreChangeMonitor = new MockDagActionStoreChangeMonitor("dummyTopic", monitorConfig, 5,
true, mysqlDagActionStore, mockDagManager, mockFlowCatalog, mockOrchestrator);
true, dagManagementStateStore, mockDagManager, mockFlowCatalog, mockOrchestrator);
try {
mockDagActionStoreChangeMonitor.setActive();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@

import kafka.consumer.ConsumerIterator;
import kafka.message.MessageAndMetadata;

import lombok.Getter;

import org.apache.gobblin.configuration.ConfigurationKeys;
Expand All @@ -75,8 +74,7 @@
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
Expand All @@ -103,7 +101,7 @@ public class KafkaAvroJobStatusMonitorTest {
private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
private MetricContext context;
private KafkaAvroEventKeyValueReporter.Builder<?> builder;
private MysqlDagActionStore mysqlDagActionStore;
private DagManagementStateStore dagManagementStateStore;
private final MockedStatic<GobblinServiceManager> mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class);

@BeforeClass
Expand All @@ -123,9 +121,8 @@ public void setUp() throws Exception {
builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
builder = builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
this.mysqlDagActionStore = mock(MysqlDagActionStore.class);
this.dagManagementStateStore = mock(DagManagementStateStore.class);
this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(mock(DagActionReminderScheduler.class));
this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class));
}

@Test
Expand Down Expand Up @@ -787,7 +784,7 @@ class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor {
public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads,
AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, GaaSJobObservabilityEventProducer producer)
throws IOException, ReflectiveOperationException {
super(topic, config, numThreads, mock(JobIssueEventHandler.class), producer, mysqlDagActionStore);
super(topic, config, numThreads, mock(JobIssueEventHandler.class), producer, dagManagementStateStore);
shouldThrowFakeExceptionInParseJobStatus = shouldThrowFakeExceptionInParseJobStatusToggle;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.net.URI;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -146,12 +147,6 @@ default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
*/
List<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId dagId) throws IOException;

/**
* Returns the {@link Dag} the provided {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} belongs to
* or Optional.absent if it is not found.
*/
Optional<Dag<JobExecutionPlan>> getParentDag(Dag.DagNode<JobExecutionPlan> dagNode);

/**
* Deletes the dag node state that was added through {@link DagManagementStateStore#addDagNodeState(Dag.DagNode, DagManager.DagId)}
* No-op if the dag node is not found in the store.
Expand Down Expand Up @@ -201,4 +196,77 @@ default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
* has any running job, false otherwise.
*/
public boolean hasRunningJobs(DagManager.DagId dagId);

/**
* Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
* @param jobName job name for the dag action
* @param dagActionType the value of the dag action
* @throws IOException
*/
boolean existsJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException, SQLException;

/**
* Check if an action exists in dagAction store by flow group, flow name, and flow execution id, it assumes jobName is
* empty ("").
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
* @param dagActionType the value of the dag action
* @throws IOException
*/
boolean existsFlowDagAction(String flowGroup, String flowName, String flowExecutionId,
DagActionStore.DagActionType dagActionType) throws IOException, SQLException;

/** Persist the {@link DagActionStore.DagAction} in {@link DagActionStore} for durability */
default void addDagAction(DagActionStore.DagAction dagAction) throws IOException {
addJobDagAction(
dagAction.getFlowGroup(),
dagAction.getFlowName(),
dagAction.getFlowExecutionId(),
dagAction.getJobName(),
dagAction.getDagActionType());
}

/**
* Persist the dag action in {@link DagActionStore} for durability
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
* @param jobName job name for the dag action
* @param dagActionType the value of the dag action
* @throws IOException
*/
void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException;

/**
* Persist the dag action in {@link DagActionStore} for durability. This method assumes an empty jobName.
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
* @param dagActionType the value of the dag action
* @throws IOException
*/
default void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId,
DagActionStore.DagActionType dagActionType) throws IOException {
addDagAction(DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId, dagActionType));
}

/**
* delete the dag action from {@link DagActionStore}
* @param dagAction containing all information needed to identify dag and specific action value
* @throws IOException
* @return true if we successfully delete one record, return false if the record does not exist
*/
boolean deleteDagAction(DagActionStore.DagAction dagAction) throws IOException;

/***
* Get all {@link DagActionStore.DagAction}s from the {@link DagActionStore}.
* @throws IOException Exception in retrieving {@link DagActionStore.DagAction}s.
*/
Collection<DagActionStore.DagAction> getDagActions() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@
public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream {
private final Config config;
@Getter private final EventSubmitter eventSubmitter;

@Inject(optional=true)
protected Optional<DagActionStore> dagActionStore;
protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
private final boolean isMultiActiveExecutionEnabled;
Expand All @@ -102,7 +99,6 @@ public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagAc
throw new RuntimeException(String.format("DagProcessingEngine requires %s to be instantiated.",
DagActionReminderScheduler.class.getSimpleName()));
}
this.dagActionStore = dagActionStore;
this.dagActionProcessingLeaseArbiter = dagActionProcessingLeaseArbiter;
this.dagActionReminderScheduler = dagActionReminderScheduler;
this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled;
Expand Down Expand Up @@ -214,17 +210,17 @@ private DagTask createDagTask(DagActionStore.DagAction dagAction, LeaseAttemptSt

switch (dagActionType) {
case ENFORCE_FLOW_FINISH_DEADLINE:
return new EnforceFlowFinishDeadlineDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
return new EnforceFlowFinishDeadlineDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
case ENFORCE_JOB_START_DEADLINE:
return new EnforceJobStartDeadlineDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
return new EnforceJobStartDeadlineDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
case KILL:
return new KillDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
return new KillDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
case LAUNCH:
return new LaunchDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
return new LaunchDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
case REEVALUATE:
return new ReevaluateDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
return new ReevaluateDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
case RESUME:
return new ResumeDagTask(dagAction, leaseObtainedStatus, dagActionStore.get());
return new ResumeDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore);
default:
throw new UnsupportedOperationException(dagActionType + " not yet implemented");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
@Slf4j
public class FlowLaunchHandler {
private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
private DagActionStore dagActionStore;
private DagManagementStateStore dagManagementStateStore;
private final MetricContext metricContext;
private final int schedulerMaxBackoffMillis;
private static Random random = new Random();
Expand All @@ -81,13 +81,13 @@ public class FlowLaunchHandler {
@Inject
public FlowLaunchHandler(Config config,
@Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME) MultiActiveLeaseArbiter leaseArbiter,
SchedulerService schedulerService, com.google.common.base.Optional<DagActionStore> optDagActionStore) {
SchedulerService schedulerService, com.google.common.base.Optional<DagManagementStateStore> dagManagementStateStoreOpt) {
this.multiActiveLeaseArbiter = leaseArbiter;

if (!optDagActionStore.isPresent()) {
if (!dagManagementStateStoreOpt.isPresent()) {
throw new RuntimeException("DagActionStore MUST be present for flow launch handling!");
}
this.dagActionStore = optDagActionStore.get();
this.dagManagementStateStore = dagManagementStateStoreOpt.get();

this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
Expand Down Expand Up @@ -140,8 +140,8 @@ private Optional<LeaseAttemptStatus.LeasedToAnotherStatus> calcLeasedToAnotherSt
private boolean persistLaunchDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) {
DagActionStore.DagAction launchDagAction = leaseStatus.getConsensusDagAction();
try {
this.dagActionStore.addDagAction(launchDagAction);
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(launchDagAction);
this.dagManagementStateStore.addDagAction(launchDagAction);
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, launchDagAction);
this.numFlowsSubmitted.mark();
// after successfully persisting, close the lease
return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
Expand Down
Loading

0 comments on commit ac653fb

Please sign in to comment.