diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java index 54bc6ace5ea..9031f0031e0 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java @@ -41,8 +41,8 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagManager; -import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent; import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor; @@ -85,13 +85,13 @@ class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor { public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, boolean isMultiActiveSchedulerEnabled) { - this(topic, config, numThreads, isMultiActiveSchedulerEnabled, mock(DagActionStore.class), mock(DagManager.class), mock(FlowCatalog.class), mock(Orchestrator.class)); + this(topic, config, numThreads, isMultiActiveSchedulerEnabled, mock(DagManagementStateStore.class), mock(DagManager.class), mock(FlowCatalog.class), mock(Orchestrator.class)); } public MockDagActionStoreChangeMonitor(String topic, Config config, int numThreads, boolean isMultiActiveSchedulerEnabled, - DagActionStore dagActionStore, DagManager dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) { + DagManagementStateStore dagManagementStateStore, DagManager dagManager, FlowCatalog flowCatalog, Orchestrator orchestrator) { super(topic, config, dagManager, numThreads, flowCatalog, orchestrator, - dagActionStore, isMultiActiveSchedulerEnabled); + dagManagementStateStore, isMultiActiveSchedulerEnabled); } protected void processMessageForTest(DecodeableKafkaRecord record) { @@ -238,8 +238,7 @@ public void testStartupSequenceHandlesFailures() throws Exception { String jobName = "testJobName"; String flowExecutionId = "12345677"; - MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config); - mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH); + DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class); Config monitorConfig = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000")) .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer")) @@ -251,7 +250,7 @@ public void testStartupSequenceHandlesFailures() throws Exception { // Throw an uncaught exception during startup sequence when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new RuntimeException("Uncaught exception")); mockDagActionStoreChangeMonitor = new MockDagActionStoreChangeMonitor("dummyTopic", monitorConfig, 5, - true, mysqlDagActionStore, mockDagManager, mockFlowCatalog, mockOrchestrator); + true, dagManagementStateStore, mockDagManager, mockFlowCatalog, mockOrchestrator); try { mockDagActionStoreChangeMonitor.setActive(); } catch (Exception e) { diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java index 1d1520d5dc2..337e6e33e14 100644 --- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java @@ -51,7 +51,6 @@ import kafka.consumer.ConsumerIterator; import kafka.message.MessageAndMetadata; - import lombok.Getter; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -75,8 +74,7 @@ import org.apache.gobblin.service.ExecutionStatus; import org.apache.gobblin.service.modules.core.GobblinServiceManager; import org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler; -import org.apache.gobblin.service.modules.orchestration.DagActionStore; -import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.monitoring.GaaSJobObservabilityEventProducer; import org.apache.gobblin.service.monitoring.JobStatusRetriever; import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor; @@ -103,7 +101,7 @@ public class KafkaAvroJobStatusMonitorTest { private String stateStoreDir = "/tmp/jobStatusMonitor/statestore"; private MetricContext context; private KafkaAvroEventKeyValueReporter.Builder builder; - private MysqlDagActionStore mysqlDagActionStore; + private DagManagementStateStore dagManagementStateStore; private final MockedStatic mockedGobblinServiceManager = Mockito.mockStatic(GobblinServiceManager.class); @BeforeClass @@ -123,9 +121,8 @@ public void setUp() throws Exception { builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context); builder = builder.withKafkaPusher(pusher).withKeys(Lists.newArrayList(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)); - this.mysqlDagActionStore = mock(MysqlDagActionStore.class); + this.dagManagementStateStore = mock(DagManagementStateStore.class); this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(mock(DagActionReminderScheduler.class)); - this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class)); } @Test @@ -787,7 +784,7 @@ class MockKafkaAvroJobStatusMonitor extends KafkaAvroJobStatusMonitor { public MockKafkaAvroJobStatusMonitor(String topic, Config config, int numThreads, AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle, GaaSJobObservabilityEventProducer producer) throws IOException, ReflectiveOperationException { - super(topic, config, numThreads, mock(JobIssueEventHandler.class), producer, mysqlDagActionStore); + super(topic, config, numThreads, mock(JobIssueEventHandler.class), producer, dagManagementStateStore); shouldThrowFakeExceptionInParseJobStatus = shouldThrowFakeExceptionInParseJobStatusToggle; } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index b56459da72b..6a6140d805f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.URI; +import java.sql.SQLException; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -146,12 +147,6 @@ default void deleteFailedDag(Dag dag) throws IOException { */ List> getDagNodes(DagManager.DagId dagId) throws IOException; - /** - * Returns the {@link Dag} the provided {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} belongs to - * or Optional.absent if it is not found. - */ - Optional> getParentDag(Dag.DagNode dagNode); - /** * Deletes the dag node state that was added through {@link DagManagementStateStore#addDagNodeState(Dag.DagNode, DagManager.DagId)} * No-op if the dag node is not found in the store. @@ -201,4 +196,77 @@ default void deleteFailedDag(Dag dag) throws IOException { * has any running job, false otherwise. */ public boolean hasRunningJobs(DagManager.DagId dagId); + + /** + * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name. + * @param flowGroup flow group for the dag action + * @param flowName flow name for the dag action + * @param flowExecutionId flow execution for the dag action + * @param jobName job name for the dag action + * @param dagActionType the value of the dag action + * @throws IOException + */ + boolean existsJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, + DagActionStore.DagActionType dagActionType) throws IOException, SQLException; + + /** + * Check if an action exists in dagAction store by flow group, flow name, and flow execution id, it assumes jobName is + * empty (""). + * @param flowGroup flow group for the dag action + * @param flowName flow name for the dag action + * @param flowExecutionId flow execution for the dag action + * @param dagActionType the value of the dag action + * @throws IOException + */ + boolean existsFlowDagAction(String flowGroup, String flowName, String flowExecutionId, + DagActionStore.DagActionType dagActionType) throws IOException, SQLException; + + /** Persist the {@link DagActionStore.DagAction} in {@link DagActionStore} for durability */ + default void addDagAction(DagActionStore.DagAction dagAction) throws IOException { + addJobDagAction( + dagAction.getFlowGroup(), + dagAction.getFlowName(), + dagAction.getFlowExecutionId(), + dagAction.getJobName(), + dagAction.getDagActionType()); + } + + /** + * Persist the dag action in {@link DagActionStore} for durability + * @param flowGroup flow group for the dag action + * @param flowName flow name for the dag action + * @param flowExecutionId flow execution for the dag action + * @param jobName job name for the dag action + * @param dagActionType the value of the dag action + * @throws IOException + */ + void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, + DagActionStore.DagActionType dagActionType) throws IOException; + + /** + * Persist the dag action in {@link DagActionStore} for durability. This method assumes an empty jobName. + * @param flowGroup flow group for the dag action + * @param flowName flow name for the dag action + * @param flowExecutionId flow execution for the dag action + * @param dagActionType the value of the dag action + * @throws IOException + */ + default void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, + DagActionStore.DagActionType dagActionType) throws IOException { + addDagAction(DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId, dagActionType)); + } + + /** + * delete the dag action from {@link DagActionStore} + * @param dagAction containing all information needed to identify dag and specific action value + * @throws IOException + * @return true if we successfully delete one record, return false if the record does not exist + */ + boolean deleteDagAction(DagActionStore.DagAction dagAction) throws IOException; + + /*** + * Get all {@link DagActionStore.DagAction}s from the {@link DagActionStore}. + * @throws IOException Exception in retrieving {@link DagActionStore.DagAction}s. + */ + Collection getDagActions() throws IOException; } \ No newline at end of file diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java index 1d32cde3998..0d4a229d044 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java @@ -76,9 +76,6 @@ public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream { private final Config config; @Getter private final EventSubmitter eventSubmitter; - - @Inject(optional=true) - protected Optional dagActionStore; protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter; protected Optional dagActionReminderScheduler; private final boolean isMultiActiveExecutionEnabled; @@ -102,7 +99,6 @@ public DagManagementTaskStreamImpl(Config config, Optional dagAc throw new RuntimeException(String.format("DagProcessingEngine requires %s to be instantiated.", DagActionReminderScheduler.class.getSimpleName())); } - this.dagActionStore = dagActionStore; this.dagActionProcessingLeaseArbiter = dagActionProcessingLeaseArbiter; this.dagActionReminderScheduler = dagActionReminderScheduler; this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled; @@ -214,17 +210,17 @@ private DagTask createDagTask(DagActionStore.DagAction dagAction, LeaseAttemptSt switch (dagActionType) { case ENFORCE_FLOW_FINISH_DEADLINE: - return new EnforceFlowFinishDeadlineDagTask(dagAction, leaseObtainedStatus, dagActionStore.get()); + return new EnforceFlowFinishDeadlineDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore); case ENFORCE_JOB_START_DEADLINE: - return new EnforceJobStartDeadlineDagTask(dagAction, leaseObtainedStatus, dagActionStore.get()); + return new EnforceJobStartDeadlineDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore); case KILL: - return new KillDagTask(dagAction, leaseObtainedStatus, dagActionStore.get()); + return new KillDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore); case LAUNCH: - return new LaunchDagTask(dagAction, leaseObtainedStatus, dagActionStore.get()); + return new LaunchDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore); case REEVALUATE: - return new ReevaluateDagTask(dagAction, leaseObtainedStatus, dagActionStore.get()); + return new ReevaluateDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore); case RESUME: - return new ResumeDagTask(dagAction, leaseObtainedStatus, dagActionStore.get()); + return new ResumeDagTask(dagAction, leaseObtainedStatus, dagManagementStateStore); default: throw new UnsupportedOperationException(dagActionType + " not yet implemented"); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java index bb4eda41628..864fae944ad 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java @@ -69,7 +69,7 @@ @Slf4j public class FlowLaunchHandler { private final MultiActiveLeaseArbiter multiActiveLeaseArbiter; - private DagActionStore dagActionStore; + private DagManagementStateStore dagManagementStateStore; private final MetricContext metricContext; private final int schedulerMaxBackoffMillis; private static Random random = new Random(); @@ -81,13 +81,13 @@ public class FlowLaunchHandler { @Inject public FlowLaunchHandler(Config config, @Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME) MultiActiveLeaseArbiter leaseArbiter, - SchedulerService schedulerService, com.google.common.base.Optional optDagActionStore) { + SchedulerService schedulerService, com.google.common.base.Optional dagManagementStateStoreOpt) { this.multiActiveLeaseArbiter = leaseArbiter; - if (!optDagActionStore.isPresent()) { + if (!dagManagementStateStoreOpt.isPresent()) { throw new RuntimeException("DagActionStore MUST be present for flow launch handling!"); } - this.dagActionStore = optDagActionStore.get(); + this.dagManagementStateStore = dagManagementStateStoreOpt.get(); this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY, ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS); @@ -140,8 +140,8 @@ private Optional calcLeasedToAnotherSt private boolean persistLaunchDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) { DagActionStore.DagAction launchDagAction = leaseStatus.getConsensusDagAction(); try { - this.dagActionStore.addDagAction(launchDagAction); - DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(launchDagAction); + this.dagManagementStateStore.addDagAction(launchDagAction); + DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, launchDagAction); this.numFlowsSubmitted.mark(); // after successfully persisting, close the lease return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java index 7f949876428..da50c50553b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.net.URI; +import java.sql.SQLException; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; @@ -63,11 +64,9 @@ @Slf4j @Singleton public class MostlyMySqlDagManagementStateStore implements DagManagementStateStore { - private final Map, Dag> jobToDag = new ConcurrentHashMap<>(); private final Map> dagNodes = new ConcurrentHashMap<>(); // dagToJobs holds a map of dagId to running jobs of that dag private final Map>> dagToJobs = new ConcurrentHashMap<>(); - private final Map dagToDeadline = new ConcurrentHashMap<>(); private DagStateStore dagStateStore; private DagStateStore failedDagStateStore; private JobStatusRetriever jobStatusRetriever; @@ -80,19 +79,21 @@ public class MostlyMySqlDagManagementStateStore implements DagManagementStateSto FlowCatalog flowCatalog; @Getter private final DagManagerMetrics dagManagerMetrics = new DagManagerMetrics(); + private final DagActionStore dagActionStore; @Inject public MostlyMySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, UserQuotaManager userQuotaManager, - JobStatusRetriever jobStatusRetriever) { + JobStatusRetriever jobStatusRetriever, DagActionStore dagActionStore) { this.quotaManager = userQuotaManager; this.config = config; this.flowCatalog = flowCatalog; this.jobStatusRetriever = jobStatusRetriever; this.dagManagerMetrics.activate(); + this.dagActionStore = dagActionStore; } // It should be called after topology spec map is set - private synchronized void start() throws IOException { + private synchronized void start() { if (!dagStoresInitialized) { this.dagStateStore = createDagStateStore(config, topologySpecMap); this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), @@ -180,9 +181,7 @@ public Set getFailedDagIds() throws IOException { @Override // todo - updating different maps here and in addDagNodeState can result in inconsistency between the maps public synchronized void deleteDagNodeState(DagManager.DagId dagId, Dag.DagNode dagNode) { - this.jobToDag.remove(dagNode); this.dagNodes.remove(dagNode.getValue().getId()); - this.dagToDeadline.remove(dagId); if (this.dagToJobs.containsKey(dagId)) { this.dagToJobs.get(dagId).remove(dagNode); if (this.dagToJobs.get(dagId).isEmpty()) { @@ -199,7 +198,6 @@ public synchronized void addDagNodeState(Dag.DagNode dagNode, if (!dag.isPresent()) { throw new RuntimeException("Dag " + dagId + " not found"); } - this.jobToDag.put(dagNode, dag.get()); this.dagNodes.put(dagNode.getValue().getId(), dagNode); if (!this.dagToJobs.containsKey(dagId)) { this.dagToJobs.put(dagId, Lists.newLinkedList()); @@ -227,11 +225,6 @@ public Pair>, Optional> getDag } } - @Override - public Optional> getParentDag(Dag.DagNode dagNode) { - return Optional.of(this.jobToDag.get(dagNode)); - } - @Override public List> getDagNodes(DagManager.DagId dagId) { List> dagNodes = this.dagToJobs.get(dagId); @@ -275,4 +268,32 @@ public Optional getJobStatus(DagNodeId dagNodeId) { public boolean hasRunningJobs(DagManager.DagId dagId) { return !getDagNodes(dagId).isEmpty(); } + + @Override + public boolean existsJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, + DagActionStore.DagActionType dagActionType) throws IOException, SQLException { + return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, dagActionType); + } + + @Override + public boolean existsFlowDagAction(String flowGroup, String flowName, String flowExecutionId, + DagActionStore.DagActionType dagActionType) throws IOException, SQLException { + return this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, dagActionType); + } + + @Override + public void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, + DagActionStore.DagActionType dagActionType) throws IOException { + this.dagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, dagActionType); + } + + @Override + public boolean deleteDagAction(DagActionStore.DagAction dagAction) throws IOException { + return this.dagActionStore.deleteDagAction(dagAction); + } + + @Override + public Collection getDagActions() throws IOException { + return this.dagActionStore.getDagActions(); + } } \ No newline at end of file diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java index 5396b0502e1..41615c79bc0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java @@ -37,7 +37,6 @@ import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.service.ExecutionStatus; -import org.apache.gobblin.service.modules.core.GobblinServiceManager; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.orchestration.DagActionStore; import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; @@ -55,8 +54,6 @@ */ @Slf4j public class DagProcUtils { - private static final DagActionStore dagActionStore = GobblinServiceManager.getClass(DagActionStore.class); - /** * - submits a {@link JobSpec} to a {@link SpecExecutor} * - emits a {@link TimingEvent.LauncherTimings#JOB_ORCHESTRATED} {@link org.apache.gobblin.metrics.GobblinTrackingEvent} @@ -94,7 +91,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat // blocks (by calling Future#get()) until the submission is completed. dagManagementStateStore.tryAcquireQuota(Collections.singleton(dagNode)); - sendEnforceJobStartDeadlineDagAction(dagNode); + sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode); Future addSpecFuture = producer.addSpec(jobSpec); // todo - we should add future.get() instead of the complete future into the JobExecutionPlan @@ -167,16 +164,16 @@ public static void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) { jobExecutionPlan.setExecutionStatus(CANCELLED); } - private static void sendEnforceJobStartDeadlineDagAction(Dag.DagNode dagNode) + private static void sendEnforceJobStartDeadlineDagAction(DagManagementStateStore dagManagementStateStore, Dag.DagNode dagNode) throws IOException { - dagActionStore.addJobDagAction(dagNode.getValue().getFlowGroup(), dagNode.getValue().getFlowName(), + dagManagementStateStore.addJobDagAction(dagNode.getValue().getFlowGroup(), dagNode.getValue().getFlowName(), String.valueOf(dagNode.getValue().getFlowExecutionId()), dagNode.getValue().getJobName(), DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE); } - public static void sendEnforceFlowFinishDeadlineDagAction(DagActionStore.DagAction launchDagAction) + public static void sendEnforceFlowFinishDeadlineDagAction(DagManagementStateStore dagManagementStateStore, DagActionStore.DagAction launchDagAction) throws IOException { - dagActionStore.addFlowDagAction(launchDagAction.getFlowGroup(), launchDagAction.getFlowName(), + dagManagementStateStore.addFlowDagAction(launchDagAction.getFlowGroup(), launchDagAction.getFlowName(), launchDagAction.getFlowExecutionId(), DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java index 10529b25ed7..bc786b91f9c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java @@ -119,7 +119,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair> nextNodes) { throw new UnsupportedOperationException("More than one start job is not allowed"); } - private void removeFlowFinishDeadlineTriggerAndDagAction() { + private void removeFlowFinishDeadlineTriggerAndDagAction(DagManagementStateStore dagManagementStateStore) { DagActionStore.DagAction enforceFlowFinishDeadlineDagAction = DagActionStore.DagAction.forFlow(getDagNodeId().getFlowGroup(), getDagNodeId().getFlowName(), String.valueOf(getDagNodeId().getFlowExecutionId()), DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE); @@ -206,7 +206,7 @@ private void removeFlowFinishDeadlineTriggerAndDagAction() { try { GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(getDagTask().getDagAction()); - GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceFlowFinishDeadlineDagAction); + dagManagementStateStore.deleteDagAction(enforceFlowFinishDeadlineDagAction); } catch (SchedulerException | IOException e) { log.warn("Failed to unschedule the reminder for {}", enforceFlowFinishDeadlineDagAction); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java index f031134d1c5..b01860a9e24 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java @@ -41,13 +41,13 @@ public abstract class DagTask { @Getter public final DagActionStore.DagAction dagAction; private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus; - private final DagActionStore dagActionStore; + private final DagManagementStateStore dagManagementStateStore; public DagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus, - DagActionStore dagActionStore) { + DagManagementStateStore dagManagementStateStore) { this.dagAction = dagAction; this.leaseObtainedStatus = leaseObtainedStatus; - this.dagActionStore = dagActionStore; + this.dagManagementStateStore = dagManagementStateStore; } public abstract T host(DagTaskVisitor visitor); @@ -59,7 +59,7 @@ public DagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtai */ public final boolean conclude() { try { - this.dagActionStore.deleteDagAction(this.dagAction); + this.dagManagementStateStore.deleteDagAction(this.dagAction); return this.leaseObtainedStatus.completeLease(); } catch (IOException e) { // TODO: Decide appropriate exception to throw and add to the commit method's signature diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java index 5eaf4ffa7f4..bfd709fb9a3 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceFlowFinishDeadlineDagTask.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration.task; import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor; import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus; @@ -29,8 +30,8 @@ public class EnforceFlowFinishDeadlineDagTask extends DagTask { public EnforceFlowFinishDeadlineDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus, - DagActionStore dagActionStore) { - super(dagAction, leaseObtainedStatus, dagActionStore); + DagManagementStateStore dagManagementStateStore) { + super(dagAction, leaseObtainedStatus, dagManagementStateStore); } public T host(DagTaskVisitor visitor) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java index 7ad3867a502..c1c270eb7b7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/EnforceJobStartDeadlineDagTask.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration.task; import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor; import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus; @@ -29,8 +30,8 @@ public class EnforceJobStartDeadlineDagTask extends DagTask { public EnforceJobStartDeadlineDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus, - DagActionStore dagActionStore) { - super(dagAction, leaseObtainedStatus, dagActionStore); + DagManagementStateStore dagManagementStateStore) { + super(dagAction, leaseObtainedStatus, dagManagementStateStore); } public T host(DagTaskVisitor visitor) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java index 9533a0e694e..3d7315378cb 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/KillDagTask.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration.task; import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor; import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus; @@ -28,8 +29,8 @@ public class KillDagTask extends DagTask { public KillDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus, - DagActionStore dagActionStore) { - super(dagAction, leaseObtainedStatus, dagActionStore); + DagManagementStateStore dagManagementStateStore) { + super(dagAction, leaseObtainedStatus, dagManagementStateStore); } public T host(DagTaskVisitor visitor) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java index 88a620ddad3..8a726b2dc86 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration.task; import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor; import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus; @@ -28,8 +29,8 @@ public class LaunchDagTask extends DagTask { public LaunchDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus, - DagActionStore dagActionStore) { - super(dagAction, leaseObtainedStatus, dagActionStore); + DagManagementStateStore dagManagementStateStore) { + super(dagAction, leaseObtainedStatus, dagManagementStateStore); } public T host(DagTaskVisitor visitor) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java index 7a90c99123d..ea1b36e3414 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ReevaluateDagTask.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration.task; import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor; import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus; @@ -28,8 +29,8 @@ public class ReevaluateDagTask extends DagTask { public ReevaluateDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus, - DagActionStore dagActionStore) { - super(dagAction, leaseObtainedStatus, dagActionStore); + DagManagementStateStore dagManagementStateStore) { + super(dagAction, leaseObtainedStatus, dagManagementStateStore); } public T host(DagTaskVisitor visitor) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java index b9ff4fb8abc..4e5ed2657db 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/ResumeDagTask.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration.task; import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor; import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus; @@ -27,8 +28,8 @@ */ public class ResumeDagTask extends DagTask { public ResumeDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus, - DagActionStore dagActionStore) { - super(dagAction, leaseObtainedStatus, dagActionStore); + DagManagementStateStore dagManagementStateStore) { + super(dagAction, leaseObtainedStatus, dagManagementStateStore); } public T host(DagTaskVisitor visitor) { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java index caee99e592b..1b5e2108399 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java @@ -40,16 +40,18 @@ import org.apache.gobblin.service.FlowStatusId; import org.apache.gobblin.service.modules.core.GobblinServiceManager; import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; + @Slf4j public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends GobblinServiceFlowExecutionResourceHandler{ - private DagActionStore dagActionStore; + private DagManagementStateStore dagManagementStateStore; @Inject public GobblinServiceFlowExecutionResourceHandlerWithWarmStandby(FlowExecutionResourceLocalHandler handler, @Named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME) EventBus eventBus, - Optional manager, @Named(InjectionNames.FORCE_LEADER) boolean forceLeader, DagActionStore dagActionStore) { + Optional manager, @Named(InjectionNames.FORCE_LEADER) boolean forceLeader, DagManagementStateStore dagManagementStateStore) { super(handler, eventBus, manager, forceLeader); - this.dagActionStore = dagActionStore; + this.dagManagementStateStore = dagManagementStateStore; } @Override @@ -69,12 +71,12 @@ public UpdateResponse delete(ComplexResourceKey dagActions = null; + Collection dagActions; try { - dagActions = dagActionStore.getDagActions(); + dagActions = dagManagementStateStore.getDagActions(); } catch (IOException e) { throw new RuntimeException(String.format("Unable to retrieve dagActions from the dagActionStore while " + "initializing the %s", DagActionStoreChangeMonitor.class.getCanonicalName()), e); @@ -326,7 +327,7 @@ protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction, } finally { // Delete the dag action regardless of whether it was processed successfully to avoid accumulating failure cases try { - this.dagActionStore.deleteDagAction(dagAction); + this.dagManagementStateStore.deleteDagAction(dagAction); } catch (IOException e) { log.warn("Failed to delete dag action from dagActionStore. dagAction: {}", dagAction); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java index 767f2b76d15..5bac15089b1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java @@ -28,7 +28,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.runtime.util.InjectionNames; -import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.util.ConfigUtils; @@ -45,18 +45,18 @@ public class DagActionStoreChangeMonitorFactory implements Provider persistJobStatusRetryer; private final GaaSJobObservabilityEventProducer eventProducer; - private final DagActionStore dagActionStore; + private final DagManagementStateStore dagManagementStateStore; private final boolean dagProcEngineEnabled; public KafkaJobStatusMonitor(String topic, Config config, int numThreads, JobIssueEventHandler jobIssueEventHandler, - GaaSJobObservabilityEventProducer observabilityEventProducer, DagActionStore dagActionStore) + GaaSJobObservabilityEventProducer observabilityEventProducer, DagManagementStateStore dagManagementStateStore) throws ReflectiveOperationException { super(topic, config.withFallback(DEFAULTS), numThreads); String stateStoreFactoryClass = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, FileContextBasedFsStateStoreFactory.class.getName()); @@ -133,7 +134,7 @@ public KafkaJobStatusMonitor(String topic, Config config, int numThreads, JobIss this.scheduledExecutorService = Executors.newScheduledThreadPool(1); this.jobIssueEventHandler = jobIssueEventHandler; - this.dagActionStore = dagActionStore; + this.dagManagementStateStore = dagManagementStateStore; this.dagProcEngineEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false); Config retryerOverridesConfig = config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX) @@ -227,10 +228,10 @@ protected void processMessage(DecodeableKafkaRecord message) { this.eventProducer.emitObservabilityEvent(jobStatus); if (this.dagProcEngineEnabled) { // todo - retried/resumed jobs *may* not be handled here, we may want to create their dag action elsewhere - this.dagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE); + this.dagManagementStateStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE); } } else if (updatedJobStatus.getRight() == NewState.RUNNING) { - removeStartDeadlineTriggerAndDagAction(flowGroup, flowName, flowExecutionId, jobName); + removeStartDeadlineTriggerAndDagAction(dagManagementStateStore, flowGroup, flowName, flowExecutionId, jobName); } // update the state store after adding a dag action to guaranty at-least-once adding of dag action @@ -256,7 +257,7 @@ protected void processMessage(DecodeableKafkaRecord message) { } } - private void removeStartDeadlineTriggerAndDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName) { + private void removeStartDeadlineTriggerAndDagAction(DagManagementStateStore dagManagementStateStore, String flowGroup, String flowName, String flowExecutionId, String jobName) { DagActionStore.DagAction enforceStartDeadlineDagAction = new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId), jobName, DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE); log.info("Deleting reminder trigger and dag action {}", enforceStartDeadlineDagAction); @@ -264,7 +265,7 @@ private void removeStartDeadlineTriggerAndDagAction(String flowGroup, String flo try { GobblinServiceManager.getClass(DagActionReminderScheduler.class).unscheduleReminderJob(enforceStartDeadlineDagAction); - GobblinServiceManager.getClass(DagActionStore.class).deleteDagAction(enforceStartDeadlineDagAction); + dagManagementStateStore.deleteDagAction(enforceStartDeadlineDagAction); } catch (SchedulerException | IOException e) { log.error("Failed to unschedule the reminder for {}", enforceStartDeadlineDagAction); } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java index a1698e57a14..756e04f73e0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java @@ -35,7 +35,7 @@ import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler; import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository; import org.apache.gobblin.runtime.util.InjectionNames; -import org.apache.gobblin.service.modules.orchestration.DagActionStore; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @@ -52,22 +52,22 @@ public class KafkaJobStatusMonitorFactory implements Provider> dagNodes = this.dagManagementStateStore.getDagNodes(dagId); Assert.assertEquals(2, dagNodes.size()); @@ -138,7 +137,7 @@ public static MostlyMySqlDagManagementStateStore getDummyDMSS(ITestMetastoreData URI specExecURI = new URI(specExecInstance); topologySpecMap.put(specExecURI, topologySpec); MostlyMySqlDagManagementStateStore dagManagementStateStore = - new MostlyMySqlDagManagementStateStore(config, null, null, jobStatusRetriever); + new MostlyMySqlDagManagementStateStore(config, null, null, jobStatusRetriever, mock(DagActionStore.class)); dagManagementStateStore.setTopologySpecMap(topologySpecMap); return dagManagementStateStore; } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 87ce003686c..86d6ff1047b 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -125,7 +125,7 @@ public void setup() throws Exception { .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE).build(); MostlyMySqlDagManagementStateStore dagManagementStateStore = - new MostlyMySqlDagManagementStateStore(config, null, null, null); + new MostlyMySqlDagManagementStateStore(config, null, null, null, mock(DagActionStore.class)); SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties)); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java index 4f8f7b36594..52d80469f32 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceDeadlineDagProcsTest.java @@ -50,7 +50,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -92,12 +91,11 @@ public void enforceJobStartDeadlineTest() throws Exception { message("Test message").eventName(ExecutionStatus.ORCHESTRATED.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any()); doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any()); - this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class)); dagManagementStateStore.checkpointDag(dag); // simulate having a dag that has not yet started running EnforceJobStartDeadlineDagProc enforceJobStartDeadlineDagProc = new EnforceJobStartDeadlineDagProc( new EnforceJobStartDeadlineDagTask(new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId), - "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), null, mock(DagActionStore.class))); + "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), null, dagManagementStateStore)); enforceJobStartDeadlineDagProc.process(dagManagementStateStore); int expectedNumOfDeleteDagNodeStates = 1; // the one dag node corresponding to the EnforceStartDeadlineDagProc @@ -129,12 +127,11 @@ public void enforceFlowFinishDeadlineTest() throws Exception { message("Test message").eventName(ExecutionStatus.RUNNING.name()).startTime(flowExecutionId).shouldRetry(false).orchestratedTime(flowExecutionId).build(); doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any()); doReturn(Pair.of(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any()); - this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(mock(DagActionStore.class)); dagManagementStateStore.checkpointDag(dag); // simulate having a dag that is in running state EnforceFlowFinishDeadlineDagProc enforceFlowFinishDeadlineDagProc = new EnforceFlowFinishDeadlineDagProc( new EnforceFlowFinishDeadlineDagTask(new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId), - "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), null, mock(DagActionStore.class))); + "job0", DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE), null, dagManagementStateStore)); enforceFlowFinishDeadlineDagProc.process(dagManagementStateStore); Assert.assertEquals(numOfDagNodes, diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java index c283b49d542..b7c86eebef8 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProcTest.java @@ -94,7 +94,7 @@ public void killDag() throws IOException, URISyntaxException, InterruptedExcepti LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new DagActionStore.DagAction("fg", "flow1", String.valueOf(flowExecutionId), MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH), - null, mock(DagActionStore.class)), flowCompilationValidationHelper); + null, this.dagManagementStateStore), flowCompilationValidationHelper); launchDagProc.process(this.dagManagementStateStore); List> specProducers = dag.getNodes().stream().map(n -> { @@ -107,7 +107,7 @@ public void killDag() throws IOException, URISyntaxException, InterruptedExcepti KillDagProc killDagProc = new KillDagProc(new KillDagTask(new DagActionStore.DagAction("fg", "flow1", String.valueOf(flowExecutionId), MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.KILL), - null, mock(DagActionStore.class))); + null, this.dagManagementStateStore)); killDagProc.process(this.dagManagementStateStore); long cancelJobCount = specProducers.stream() @@ -137,7 +137,7 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new DagActionStore.DagAction("fg", "flow2", String.valueOf(flowExecutionId), MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH), - null, mock(DagActionStore.class)), flowCompilationValidationHelper); + null, this.dagManagementStateStore), flowCompilationValidationHelper); launchDagProc.process(this.dagManagementStateStore); List> specProducers = dag.getNodes().stream().map(n -> { @@ -150,7 +150,7 @@ public void killDagNode() throws IOException, URISyntaxException, InterruptedExc KillDagProc killDagProc = new KillDagProc(new KillDagTask(new DagActionStore.DagAction("fg", "flow2", String.valueOf(flowExecutionId), "job2", DagActionStore.DagActionType.KILL), - null, mock(DagActionStore.class))); + null, this.dagManagementStateStore)); killDagProc.process(this.dagManagementStateStore); long cancelJobCount = specProducers.stream() diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java index 48c65870e01..34a32391441 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java @@ -89,7 +89,7 @@ public void launchDag() doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any()); LaunchDagProc launchDagProc = new LaunchDagProc( new LaunchDagTask(new DagActionStore.DagAction("fg", "fn", "12345", - "jn", DagActionStore.DagActionType.LAUNCH), null, mock(DagActionStore.class)), + "jn", DagActionStore.DagActionType.LAUNCH), null, this.dagManagementStateStore), flowCompilationValidationHelper); launchDagProc.process(this.dagManagementStateStore); diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java index 6a625e527a9..eb3429597ab 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProcTest.java @@ -70,7 +70,6 @@ public class ReevaluateDagProcTest { private ITestMetastoreDatabase testMetastoreDatabase; private DagManagementStateStore dagManagementStateStore; private MockedStatic mockedGobblinServiceManager; - private DagActionStore dagActionStore; private DagActionReminderScheduler dagActionReminderScheduler; @BeforeClass @@ -83,9 +82,7 @@ public void setUpClass() throws Exception { public void setUp() throws Exception { this.dagManagementStateStore = spy(MostlyMySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase)); mockDMSSCommonBehavior(dagManagementStateStore); - this.dagActionStore = mock(DagActionStore.class); this.dagActionReminderScheduler = mock(DagActionReminderScheduler.class); - this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionStore.class)).thenReturn(this.dagActionStore); this.mockedGobblinServiceManager.when(() -> GobblinServiceManager.getClass(DagActionReminderScheduler.class)).thenReturn(this.dagActionReminderScheduler); } @@ -125,11 +122,10 @@ public void testOneNextJobToRun() throws Exception { doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any()); doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any()); - doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any()); ReevaluateDagProc reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction(flowGroup, flowName, - String.valueOf(flowExecutionId), "job0", DagActionStore.DagActionType.REEVALUATE), null, mock(DagActionStore.class))); + String.valueOf(flowExecutionId), "job0", DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore)); reEvaluateDagProc.process(dagManagementStateStore); long addSpecCount = specProducers.stream() @@ -151,7 +147,7 @@ public void testOneNextJobToRun() throws Exception { .filter(a -> a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1); // when there is no more job to run in re-evaluate dag proc, it deletes enforce_flow_finish_dag_action also - Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream() + Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream() .filter(a -> a.getMethod().getName().equals("deleteDagAction")).count(), 1); } @@ -170,7 +166,6 @@ public void testNoNextJobToRun() throws Exception { doReturn(Optional.of(dag)).when(dagManagementStateStore).getDag(any()); doReturn(new ImmutablePair<>(Optional.of(dag.getStartNodes().get(0)), Optional.of(jobStatus))).when(dagManagementStateStore).getDagNodeWithJobStatus(any()); - doReturn(Optional.of(dag)).when(dagManagementStateStore).getParentDag(any()); doReturn(true).when(dagManagementStateStore).releaseQuota(any()); List> specProducers = dag.getNodes().stream().map(n -> { @@ -191,7 +186,7 @@ public void testNoNextJobToRun() throws Exception { ReevaluateDagProc reEvaluateDagProc = new ReevaluateDagProc(new ReevaluateDagTask(new DagActionStore.DagAction(flowGroup, flowName, - String.valueOf(flowExecutionId), "job0", DagActionStore.DagActionType.REEVALUATE), null, mock(DagActionStore.class))); + String.valueOf(flowExecutionId), "job0", DagActionStore.DagActionType.REEVALUATE), null, dagManagementStateStore)); reEvaluateDagProc.process(dagManagementStateStore); // no new job to launch for this one job flow @@ -208,7 +203,7 @@ public void testNoNextJobToRun() throws Exception { Assert.assertEquals(Mockito.mockingDetails(this.dagActionReminderScheduler).getInvocations().stream() .filter(a -> a.getMethod().getName().equals("unscheduleReminderJob")).count(), 1); - Assert.assertEquals(Mockito.mockingDetails(this.dagActionStore).getInvocations().stream() + Assert.assertEquals(Mockito.mockingDetails(this.dagManagementStateStore).getInvocations().stream() .filter(a -> a.getMethod().getName().equals("deleteDagAction")).count(), 1); } } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java index 993fba8c02f..08fff6fe193 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java @@ -53,7 +53,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -98,7 +97,7 @@ public void resumeDag() ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new DagActionStore.DagAction(flowGroup, flowName, String.valueOf(flowExecutionId), MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME), - null, mock(DagActionStore.class))); + null, this.dagManagementStateStore)); resumeDagProc.process(this.dagManagementStateStore); SpecProducer specProducer = DagManagerUtils.getSpecProducer(dag.getNodes().get(1));