Skip to content

Commit

Permalink
[GOBBLIN-2148] Add temporal workflow cancel support (#4045)
Browse files Browse the repository at this point in the history
* Add temporal workflow cancel support
  • Loading branch information
abhishekmjain authored Oct 7, 2024
1 parent 4776135 commit 20c4341
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@ public ExecuteGobblinJobLauncher(
public void submitJob(List<WorkUnit> workunits) {
try {
Properties finalProps = adjustJobProperties(this.jobProps);
// Initialize workflowId.
this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(finalProps));
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(finalProps)))
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(finalProps))
.setWorkflowId(this.workflowId)
.build();
ExecuteGobblinWorkflow workflow = this.client.newWorkflowStub(ExecuteGobblinWorkflow.class, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ public GenerateWorkUnitsJobLauncher(
@Override
public void submitJob(List<WorkUnit> workunits) {
try {
this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps));
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps)))
.setWorkflowId(this.workflowId)
.build();
GenerateWorkUnitsWorkflow workflow = this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ public void submitJob(List<WorkUnit> workunits) {
GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX,
GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_METRICS_SUFFIX));

this.workflowId = Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps));
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setSearchAttributes(TemporalWorkFlowUtils.generateGaasSearchAttributes(this.jobProps))
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps)))
.setWorkflowId(this.workflowId)
.build();

Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, Help.loadFileSystem(wuSpec)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -87,7 +91,7 @@ public abstract class GobblinJobLauncher extends AbstractJobLauncher {
protected final StateStores stateStores;
protected JobListener jobListener;
protected volatile boolean jobSubmitted = false;

private final ExecutorService executor;

public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap, EventBus eventbus)
Expand Down Expand Up @@ -122,6 +126,7 @@ public GobblinJobLauncher(Properties jobProps, Path appWorkDir,
this.taskStateCollectorService =
new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), this.eventBus, this.eventSubmitter,
this.stateStores.getTaskStateStore(), this.outputTaskStateDir, this.getIssueRepository());
this.executor = Executors.newSingleThreadExecutor();
}

@Override
Expand Down Expand Up @@ -150,17 +155,23 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
// Start the output TaskState collector service
this.taskStateCollectorService.startAsync().awaitRunning();

Future<?> submitJobFuture = null;
synchronized (this.cancellationRequest) {
if (!this.cancellationRequested) {
submitJob(workUnits);
submitJobFuture = executor.submit(() -> {
try {
submitJob(workUnits);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
log.info(String.format("Submitted job %s", this.jobContext.getJobId()));
this.jobSubmitted = true;
} else {
log.warn("Job {} not submitted as it was requested to be cancelled.", this.jobContext.getJobId());
}
}

waitJob();
waitJob(submitJobFuture);
log.info(String.format("Job %s completed", this.jobContext.getJobId()));
} finally {
// The last iteration of output TaskState collecting will run when the collector service gets stopped
Expand All @@ -172,7 +183,11 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
protected void submitJob(List<WorkUnit> workUnits) throws Exception {
}

protected void waitJob() throws InterruptedException {
protected void waitJob(Future<?> submitJobFuture)
throws InterruptedException, ExecutionException {
if (submitJobFuture != null) {
submitJobFuture.get();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowStub;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Workflow;

Expand Down Expand Up @@ -65,10 +72,13 @@
@Alpha
public abstract class GobblinTemporalJobLauncher extends GobblinJobLauncher {
private static final Logger log = Workflow.getLogger(GobblinTemporalJobLauncher.class);
private static final int TERMINATION_TIMEOUT_SECONDS = 3;

protected WorkflowServiceStubs workflowServiceStubs;
protected WorkflowClient client;
protected String queueName;
protected String namespace;
protected String workflowId;

public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap, EventBus eventBus)
Expand All @@ -79,11 +89,13 @@ public GobblinTemporalJobLauncher(Properties jobProps, Path appWorkDir,
String connectionUri = jobProps.getProperty(TEMPORAL_CONNECTION_STRING);
this.workflowServiceStubs = createServiceInstance(connectionUri);

String namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
this.namespace = jobProps.getProperty(GOBBLIN_TEMPORAL_NAMESPACE, DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE);
this.client = createClientInstance(workflowServiceStubs, namespace);

this.queueName = jobProps.getProperty(GOBBLIN_TEMPORAL_TASK_QUEUE, DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE);

// non-null value indicates job has been submitted
this.workflowId = null;
startCancellationExecutor();
}

Expand Down Expand Up @@ -113,7 +125,56 @@ protected void handleLaunchFinalization() {

@Override
protected void executeCancellation() {
log.info("Cancel temporal workflow");
if (this.workflowId == null) {
log.info("Cancellation of temporal workflow attempted without submitting it");
return;
}

log.info("Cancelling temporal workflow {}", this.workflowId);
try {
WorkflowStub workflowStub = this.client.newUntypedWorkflowStub(this.workflowId);

// Describe the workflow execution to get its status
DescribeWorkflowExecutionRequest request = DescribeWorkflowExecutionRequest.newBuilder()
.setNamespace(this.namespace)
.setExecution(workflowStub.getExecution())
.build();
DescribeWorkflowExecutionResponse response = workflowServiceStubs.blockingStub().describeWorkflowExecution(request);

WorkflowExecutionStatus status;
try {
status = response.getWorkflowExecutionInfo().getStatus();
} catch (Exception e) {
log.warn("Exception occurred while getting status of the workflow " + this.workflowId
+ ". We would still attempt the cancellation", e);
workflowStub.cancel();
log.info("Temporal workflow {} cancelled successfully", this.workflowId);
return;
}

// Check if the workflow is not finished
if (status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED &&
status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED &&
status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED &&
status != WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED) {
workflowStub.cancel();
try {
// Check workflow status, if it is cancelled, will throw WorkflowFailedException else TimeoutException
workflowStub.getResult(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS, String.class, String.class);
} catch (TimeoutException te) {
// Workflow is still running, terminate it.
log.info("Workflow is still running, will attempt termination", te);
workflowStub.terminate("Job cancel invoked");
} catch (WorkflowFailedException wfe) {
// Do nothing as exception is expected.
}
log.info("Temporal workflow {} cancelled successfully", this.workflowId);
} else {
log.info("Workflow {} is already finished with status {}", this.workflowId, status);
}
} catch (Exception e) {
log.error("Exception occurred while cancelling the workflow " + this.workflowId, e);
}
}

/** No-op: merely logs a warning, since not expected to be invoked */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) {
LOGGER.info("No job schedule found, so running job " + jobUri);
GobblinTemporalJobLauncherListener listener = new GobblinTemporalJobLauncherListener(this.launcherMetrics);
JobLauncher launcher = buildJobLauncher(newJobArrival.getJobConfig());
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
launcher.cancelJob(listener);
} catch (JobException e) {
LOGGER.error("Failed to cancel the job during shutdown", e);
throw new RuntimeException(e);
}
}));
launcher.launchJob(listener);
}
} catch (Exception je) {
Expand Down
Loading

0 comments on commit 20c4341

Please sign in to comment.