Skip to content

Commit

Permalink
[GOBBLIN-2084] change flow execution id to long everywhere it is poss…
Browse files Browse the repository at this point in the history
…ible (#3967)

* change flow execution id to long everywhere it is possible
  • Loading branch information
arjun4084346 authored Jun 12, 2024
1 parent 8433913 commit 1258cc6
Show file tree
Hide file tree
Showing 32 changed files with 134 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
Expand Down Expand Up @@ -71,7 +70,7 @@ public class DagActionStoreChangeMonitorTest {

private final String FLOW_GROUP = "flowGroup";
private final String FLOW_NAME = "flowName";
private final String FLOW_EXECUTION_ID = "123";
private final long FLOW_EXECUTION_ID = 123L;
private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
private int txidCounter = 0;

Expand Down Expand Up @@ -136,7 +135,7 @@ public void tearDown() throws Exception {
@Test
public void testProcessMessageWithHeartbeatAndNullDagAction() throws SpecNotFoundException {
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", null);
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", FLOW_EXECUTION_ID, null);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
verify(mockDagActionStoreChangeMonitor.getDagManager(), times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
Expand Down Expand Up @@ -227,17 +226,6 @@ public void testProcessMessageWithDelete() throws SpecNotFoundException {

@Test (dependsOnMethods = "testProcessMessageWithDelete")
public void testStartupSequenceHandlesFailures() throws Exception {
Config config = ConfigBuilder.create()
.addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
.addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
.addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
.build();
String flowGroup = "testFlowGroup";
String flowName = "testFlowName";
String jobName = "testJobName";
String flowExecutionId = "12345677";

DagManagementStateStore dagManagementStateStore = mock(DagManagementStateStore.class);

Config monitorConfig = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
Expand All @@ -263,28 +251,28 @@ public void testStartupSequenceHandlesFailures() throws Exception {
* Util to create a general DagActionStoreChange type event
*/
private DagActionStoreChangeEvent createDagActionStoreChangeEvent(OperationType operationType,
String flowGroup, String flowName, String flowExecutionId, DagActionValue dagAction) {
String flowGroup, String flowName, long flowExecutionId, DagActionValue dagAction) {
String key = getKeyForFlow(flowGroup, flowName, flowExecutionId);
GenericStoreChangeEvent genericStoreChangeEvent =
new GenericStoreChangeEvent(key, String.valueOf(txidCounter), System.currentTimeMillis(), operationType);
txidCounter++;
return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, flowName, flowExecutionId,
return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, flowName, String.valueOf(flowExecutionId),
DagActionStore.NO_JOB_NAME_DEFAULT, dagAction);
}

/**
* Form a key for events using the flow identifiers
* @return a key formed by adding an '_' delimiter between the flow identifiers
*/
private String getKeyForFlow(String flowGroup, String flowName, String flowExecutionId) {
private String getKeyForFlow(String flowGroup, String flowName, long flowExecutionId) {
return flowGroup + "_" + flowName + "_" + flowExecutionId;
}

/**
* Util to create wrapper around DagActionStoreChangeEvent
*/
private Kafka09ConsumerClient.Kafka09ConsumerRecord wrapDagActionStoreChangeEvent(OperationType operationType, String flowGroup, String flowName,
String flowExecutionId, DagActionValue dagAction) {
private Kafka09ConsumerClient.Kafka09ConsumerRecord wrapDagActionStoreChangeEvent(OperationType operationType,
String flowGroup, String flowName, long flowExecutionId, DagActionValue dagAction) {
DagActionStoreChangeEvent eventToProcess = null;
try {
eventToProcess =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class KafkaAvroJobStatusMonitorTest {
private String flowName = "myFlowName";
private String jobGroup = "myJobGroup";
private String jobName = "myJobName";
private String flowExecutionId = "1234";
private long flowExecutionId = 1234L;
private String jobExecutionId = "1111";
private String message = "https://myServer:8143/1234/1111";
private String stateStoreDir = "/tmp/jobStatusMonitor/statestore";
Expand Down Expand Up @@ -707,7 +707,7 @@ private GobblinTrackingEvent createGTE(String eventName, Map<String, String> cus
Map<String, String> metadata = Maps.newHashMap();
metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, this.flowGroup);
metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, this.flowName);
metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, this.flowExecutionId);
metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, String.valueOf(this.flowExecutionId));
metadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, this.jobName);
metadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, this.jobGroup);
metadata.put(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, this.jobExecutionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
String flowSpecUriString = "/flowgroup/flowname/spec";
Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, "12345");
Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, 12345L);
String specUriString = "/foo/bar/spec";
Spec spec = initJobSpec(specUriString);

Expand Down Expand Up @@ -253,9 +253,9 @@ private static JobSpec initJobSpec(String specUri) {
.build();
}

private static JobSpec initJobSpecWithFlowExecutionId(String specUri, String flowExecutionId) {
private static JobSpec initJobSpecWithFlowExecutionId(String specUri, long flowExecutionId) {
Properties properties = new Properties();
properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, String.valueOf(flowExecutionId));
return JobSpec.builder(specUri)
.withConfig(ConfigUtils.propertiesToConfig(properties))
.withVersion("1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@

package org.apache.gobblin.runtime.api;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.lang.StringUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
Expand All @@ -27,20 +38,13 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;

import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.FlowConfig;
Expand Down Expand Up @@ -146,14 +150,13 @@ public static FlowSpec.Builder builder(URI catalogURI, Properties flowProps) {
* @param key
* @param value
*/
public synchronized void addProperty(String key, String value) {
public synchronized void addProperty(String key, Long value) {
this.config = config.withValue(key, ConfigValueFactory.fromAnyRef(value));
/* Make sure configAsProperties has been initialized. If it's just initialized, setting the property will be a
redundant operation. However, if it already existed we need to update/add the key-value pair.
*/
this.getConfigAsProperties();
this.configAsProperties.setProperty(key, value);

this.configAsProperties.setProperty(key, value.toString());
}

public void addCompilationError(String src, String dst, String errorMessage, int numberOfHops) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
*/

package org.apache.gobblin.runtime.spec_executorInstance;
import com.typesafe.config.Config;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
Expand Down Expand Up @@ -70,7 +74,7 @@ public Future<?> updateSpec(Spec updatedSpec) {
private Future<?> writeSpec(Spec spec, SpecExecutor.Verb verb) {
if (spec instanceof JobSpec) {
// format the JobSpec to have file of <flowGroup>_<flowName>.job
String flowExecutionId = ((JobSpec) spec).getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
long flowExecutionId = ((JobSpec) spec).getConfig().getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
String jobFileName = getJobFileName(spec.getUri(), flowExecutionId);
try (
FileOutputStream fStream = new FileOutputStream(this.specProducerPath + File.separatorChar + jobFileName);
Expand Down Expand Up @@ -117,7 +121,7 @@ public Future<? extends List<Spec>> listSpecs() {
throw new UnsupportedOperationException();
}

public static String getJobFileName(URI specUri, String flowExecutionId) {
public static String getJobFileName(URI specUri, long flowExecutionId) {
String[] uriTokens = specUri.getPath().split("/");
return String.join("_", uriTokens) + "_" + flowExecutionId + ".job";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@

package org.apache.gobblin.runtime.api;

import com.typesafe.config.Config;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.FlowId;

import org.testng.Assert;
import org.testng.annotations.Test;

import com.typesafe.config.Config;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.FlowId;


public class FlowSpecTest {

Expand All @@ -38,7 +41,7 @@ public class FlowSpecTest {
public void testAddProperty() throws URISyntaxException {
String flowGroup = "myGroup";
String flowName = "myName";
String flowExecutionId = "1234";
long flowExecutionId = 1234L;
FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);

Expand All @@ -52,13 +55,13 @@ public void testAddProperty() throws URISyntaxException {
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);

Properties updatedProperties = flowSpec.getConfigAsProperties();
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), String.valueOf(flowExecutionId));
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY), flowName);
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true");

Config updatedConfig = flowSpec.getConfig();
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), flowExecutionId);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY), String.valueOf(flowExecutionId));
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY), flowGroup);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_NAME_KEY), flowName);
Assert.assertEquals(updatedConfig.getString(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY), "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void execute(JobExecutionContext context) {
String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
String flowGroup = jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
String flowExecutionId = jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
long flowExecutionId = jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType = (DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);

log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", flowName: " + flowName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ enum DagActionType {
class DagAction {
final String flowGroup;
final String flowName;
final String flowExecutionId;
final long flowExecutionId;
final String jobName;
final DagActionType dagActionType;
final boolean isReminder;

public DagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, DagActionType dagActionType) {
public DagAction(String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionType dagActionType) {
this(flowGroup, flowName, flowExecutionId, jobName, dagActionType, false);
}

public static DagAction forFlow(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) {
public static DagAction forFlow(String flowGroup, String flowName, long flowExecutionId, DagActionType dagActionType) {
return new DagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, dagActionType);
}

Expand All @@ -67,16 +67,14 @@ public FlowId getFlowId() {
* Replace flow execution id with agreed upon event time to easily track the flow
*/
public DagAction updateFlowExecutionId(long eventTimeMillis) {
return new DagAction(this.getFlowGroup(), this.getFlowName(),
String.valueOf(eventTimeMillis), this.getJobName(), this.getDagActionType());
return new DagAction(this.getFlowGroup(), this.getFlowName(), eventTimeMillis, this.getJobName(), this.getDagActionType());
}

/**
* Creates and returns a {@link DagNodeId} for this DagAction.
*/
public DagNodeId getDagNodeId() {
return new DagNodeId(this.flowGroup, this.flowName,
Long.parseLong(this.flowExecutionId), this.flowGroup, this.jobName);
return new DagNodeId(this.flowGroup, this.flowName, this.flowExecutionId, this.flowGroup, this.jobName);
}

/**
Expand All @@ -97,7 +95,7 @@ public DagManager.DagId getDagId() {
* @param dagActionType the value of the dag action
* @throws IOException
*/
boolean exists(String flowGroup, String flowName, String flowExecutionId, String jobName, DagActionType dagActionType) throws IOException, SQLException;
boolean exists(String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionType dagActionType) throws IOException, SQLException;

/**
* Check if an action exists in dagAction store by flow group, flow name, and flow execution id, it assumes jobName is
Expand All @@ -108,7 +106,7 @@ public DagManager.DagId getDagId() {
* @param dagActionType the value of the dag action
* @throws IOException
*/
boolean exists(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException, SQLException;
boolean exists(String flowGroup, String flowName, long flowExecutionId, DagActionType dagActionType) throws IOException, SQLException;

/** Persist the {@link DagAction} in {@link DagActionStore} for durability */
default void addDagAction(DagAction dagAction) throws IOException {
Expand All @@ -129,7 +127,7 @@ default void addDagAction(DagAction dagAction) throws IOException {
* @param dagActionType the value of the dag action
* @throws IOException
*/
void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, DagActionType dagActionType) throws IOException;
void addJobDagAction(String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionType dagActionType) throws IOException;

/**
* Persist the dag action in {@link DagActionStore} for durability. This method assumes an empty jobName.
Expand All @@ -139,7 +137,7 @@ default void addDagAction(DagAction dagAction) throws IOException {
* @param dagActionType the value of the dag action
* @throws IOException
*/
default void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException {
default void addFlowDagAction(String flowGroup, String flowName, long flowExecutionId, DagActionType dagActionType) throws IOException {
addDagAction(DagAction.forFlow(flowGroup, flowName, flowExecutionId, dagActionType));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ default void deleteFailedDag(Dag<JobExecutionPlan> dag) throws IOException {
* @param dagActionType the value of the dag action
* @throws IOException
*/
boolean existsJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
boolean existsJobDagAction(String flowGroup, String flowName, long flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException, SQLException;

/**
Expand All @@ -218,7 +218,7 @@ boolean existsJobDagAction(String flowGroup, String flowName, String flowExecuti
* @param dagActionType the value of the dag action
* @throws IOException
*/
boolean existsFlowDagAction(String flowGroup, String flowName, String flowExecutionId,
boolean existsFlowDagAction(String flowGroup, String flowName, long flowExecutionId,
DagActionStore.DagActionType dagActionType) throws IOException, SQLException;

/** Persist the {@link DagActionStore.DagAction} in {@link DagActionStore} for durability */
Expand All @@ -240,7 +240,7 @@ default void addDagAction(DagActionStore.DagAction dagAction) throws IOException
* @param dagActionType the value of the dag action
* @throws IOException
*/
void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
void addJobDagAction(String flowGroup, String flowName, long flowExecutionId, String jobName,
DagActionStore.DagActionType dagActionType) throws IOException;

/**
Expand All @@ -251,7 +251,7 @@ void addJobDagAction(String flowGroup, String flowName, String flowExecutionId,
* @param dagActionType the value of the dag action
* @throws IOException
*/
default void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId,
default void addFlowDagAction(String flowGroup, String flowName, long flowExecutionId,
DagActionStore.DagActionType dagActionType) throws IOException {
addDagAction(DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId, dagActionType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ public String toString() {
public static class DagId {
String flowGroup;
String flowName;
String flowExecutionId;
long flowExecutionId;

public DagId(String flowGroup, String flowName, String flowExecutionId) {
public DagId(String flowGroup, String flowName, long flowExecutionId) {
this.flowGroup = flowGroup;
this.flowName = flowName;
this.flowExecutionId = flowExecutionId;
Expand Down
Loading

0 comments on commit 1258cc6

Please sign in to comment.