Skip to content

Commit

Permalink
Allow SDK to handle speculative workflow task with command events (te…
Browse files Browse the repository at this point in the history
…mporalio#2099)

Allow SDK to handle speculative workflow task with command events
  • Loading branch information
Quinn-With-Two-Ns authored Jun 10, 2024
1 parent 42b9803 commit 4f781b3
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ public QueryResult handleDirectQueryWorkflowTask(
}

@Override
public void setCurrentStartedEvenId(Long eventId) {
workflowStateMachines.setLastWFTStartedEventId(eventId);
public void resetStartedEvenId(Long eventId) {
workflowStateMachines.resetStartedEvenId(eventId);
}

private void handleWorkflowTaskImpl(
PollWorkflowTaskQueueResponseOrBuilder workflowTask,
WorkflowHistoryIterator historyIterator) {
workflowStateMachines.setWorklfowStartedEventId(workflowTask.getStartedEventId());
workflowStateMachines.setWorkflowStartedEventId(workflowTask.getStartedEventId());
workflowStateMachines.setReplaying(workflowTask.getPreviousStartedEventId() > 0);
workflowStateMachines.setMessages(workflowTask.getMessagesList());
applyServerHistory(workflowTask.getStartedEventId(), historyIterator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private Result handleWorkflowTaskWithQuery(
workflowTask.getWorkflowType().getName(),
workflowTask,
wftResult,
workflowRunTaskHandler::setCurrentStartedEvenId);
workflowRunTaskHandler::resetStartedEvenId);
}

if (useCache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ QueryResult handleDirectQueryWorkflowTask(
throws Throwable;

/**
* Reset the workflow event Id.
* Reset the workflow event ID.
*
* @param eventId the event Id to reset the cached state to.
* @param eventId the event ID to reset the cached state to.
*/
void setCurrentStartedEvenId(Long eventId);
void resetStartedEvenId(Long eventId);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,21 +217,24 @@ public WorkflowStateMachines(
* and triggered an execution. Used in {@link WorkflowTaskStateMachine} only to understand
* that this workflow task will not have a matching closing event and needs to be executed.
*/
public void setWorklfowStartedEventId(long workflowTaskStartedEventId) {
public void setWorkflowStartedEventId(long workflowTaskStartedEventId) {
this.workflowTaskStartedEventId = workflowTaskStartedEventId;
}

public void setLastWFTStartedEventId(long eventId) {
public void resetStartedEvenId(long eventId) {
// We must reset the last event we handled to be after the last WFT we really completed
// + any command events (since the SDK "processed" those when it emitted the commands). This
// is also equal to what we just processed in the speculative task, minus two, since we
// would've just handled the most recent WFT started event, and we need to drop that & the
// schedule event just before it.
long resetLastHandledEventId = this.lastHandledEventId - 2;
// We have to drop any state machines (which should only be one workflow task machine)
// created when handling the speculative workflow task
for (long i = this.lastHandledEventId; i > eventId; i--) {
for (long i = this.lastHandledEventId; i > resetLastHandledEventId; i--) {
stateMachines.remove(i);
}
this.lastWFTStartedEventId = eventId;
// When we reset the event ID on a speculative WFT we need to move this counter back
// to the last WFT completed to allow new tasks to be processed. Assume the WFT complete
// always follows the WFT started.
this.lastHandledEventId = eventId + 1;
this.lastHandledEventId = resetLastHandledEventId;
}

public long getLastWFTStartedEventId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ final class Result {
private final RespondQueryTaskCompletedRequest queryCompleted;
private final RpcRetryOptions requestRetryOptions;
private final boolean completionCommand;
private final Functions.Proc1<Long> eventIdSetHandle;
private final Functions.Proc1<Long> resetEventIdHandle;

public Result(
String workflowType,
Expand All @@ -50,14 +50,14 @@ public Result(
RespondQueryTaskCompletedRequest queryCompleted,
RpcRetryOptions requestRetryOptions,
boolean completionCommand,
Functions.Proc1<Long> eventIdSetHandle) {
Functions.Proc1<Long> resetEventIdHandle) {
this.workflowType = workflowType;
this.taskCompleted = taskCompleted;
this.taskFailed = taskFailed;
this.queryCompleted = queryCompleted;
this.requestRetryOptions = requestRetryOptions;
this.completionCommand = completionCommand;
this.eventIdSetHandle = eventIdSetHandle;
this.resetEventIdHandle = resetEventIdHandle;
}

public RespondWorkflowTaskCompletedRequest getTaskCompleted() {
Expand All @@ -80,9 +80,9 @@ public boolean isCompletionCommand() {
return completionCommand;
}

public Functions.Proc1<Long> getEventIdSetHandle() {
if (eventIdSetHandle != null) {
return eventIdSetHandle;
public Functions.Proc1<Long> getResetEventIdHandle() {
if (resetEventIdHandle != null) {
return resetEventIdHandle;
}
return (arg) -> {};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public void handle(WorkflowTask task) throws Exception {
// was dropped by resting out event ID.
long resetEventId = response.getResetHistoryEventId();
if (resetEventId != 0) {
result.getEventIdSetHandle().apply(resetEventId);
result.getResetEventIdHandle().apply(resetEventId);
}
nextWFTResponse =
response.hasWorkflowTask()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ TestHistoryBuilder add(EventType type) {
return this;
}

History getHistory() {
return History.newBuilder().addAllEvents(events).build();
}

long addGetEventId(EventType type) {
return addGetEventId(type, null);
}
Expand Down Expand Up @@ -293,7 +297,7 @@ public void handleWorkflowTask(
this.events.subList((int) stateMachines.getLastStartedEventId(), this.events.size());
PeekingIterator<HistoryEvent> history = Iterators.peekingIterator(events.iterator());
HistoryInfo info = getHistoryInfo(replayToTaskIndex);
stateMachines.setWorklfowStartedEventId(info.getWorkflowTaskStartedEventId());
stateMachines.setWorkflowStartedEventId(info.getWorkflowTaskStartedEventId());
stateMachines.setReplaying(info.getPreviousStartedEventId() > 0);

long wftStartedEventId = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,130 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
}
}

@Test
public void testUpdateRejectedAndReset() throws InvalidProtocolBufferException {
class TestUpdateListener extends TestEntityManagerListenerBase {

@Override
public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
builder.<HistoryEvent>add1(
(v, c) ->
stateMachines.newTimer(
StartTimerCommandAttributes.newBuilder()
.setTimerId("timer1")
.setStartToFireTimeout(
ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1)))
.build(),
c));
}

@Override
protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder) {
builder.add(
(r) -> {
message
.getCallbacks()
.reject(converter.exceptionToFailure(new RuntimeException("test failure")));
});
}

@Override
protected void signal(HistoryEvent signalEvent, AsyncWorkflowBuilder<Void> builder) {
builder.<HistoryEvent>add1(
(v, c) ->
stateMachines.newTimer(
StartTimerCommandAttributes.newBuilder()
.setTimerId("timer2")
.setStartToFireTimeout(
ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1)))
.build(),
c));
}
}

/*
1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
5: EVENT_TYPE_TIMER_STARTED
6: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
7: EVENT_TYPE_WORKFLOW_TASK_STARTED
*/

TestHistoryBuilder h = new TestHistoryBuilder();
{
TestEntityManagerListenerBase listener = new TestUpdateListener();
stateMachines = newStateMachines(listener);
h.add(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED);
h.addWorkflowTask();
h.add(
EventType.EVENT_TYPE_TIMER_STARTED,
TimerStartedEventAttributes.newBuilder().setTimerId("timer1"));
h.addWorkflowTaskScheduled();
h.addWorkflowTaskStarted();
}
{
// Full replay
TestEntityManagerListenerBase listener = new TestUpdateListener();
stateMachines = newStateMachines(listener);
Request request =
Request.newBuilder()
.setInput(
Input.newBuilder()
.setName("updateName")
.setArgs(converter.toPayloads("arg").get()))
.build();
stateMachines.setMessages(
Collections.unmodifiableList(
Arrays.asList(
new Message[] {
Message.newBuilder()
.setProtocolInstanceId("protocol_id")
.setId("id")
.setEventId(6)
.setBody(Any.pack(request))
.build(),
})));
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines);
assertEquals(0, commands.size());
List<Message> messages = stateMachines.takeMessages();
assertEquals(1, messages.size());
Rejection rejection = messages.get(0).getBody().unpack(Rejection.class);
assertNotNull(rejection);
assertEquals(request, rejection.getRejectedRequest());
// Simulate the server request to reset the workflow event ID
stateMachines.resetStartedEvenId(3);
// Create a new history after the reset event ID
/*
1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
5: EVENT_TYPE_TIMER_STARTED
6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
8: EVENT_TYPE_WORKFLOW_TASK_STARTED
*/
TestHistoryBuilder historyAfterReset = new TestHistoryBuilder();
historyAfterReset.add(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED);
historyAfterReset.addWorkflowTask();
historyAfterReset.add(
EventType.EVENT_TYPE_TIMER_STARTED,
TimerStartedEventAttributes.newBuilder().setTimerId("timer1"));
historyAfterReset.add(
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED,
WorkflowExecutionSignaledEventAttributes.newBuilder().setSignalName("signal1"));
historyAfterReset.addWorkflowTaskScheduled();
historyAfterReset.addWorkflowTaskStarted();
// Test new history with the old workflow state machines
commands = historyAfterReset.handleWorkflowTaskTakeCommands(stateMachines, 1, 2);
assertEquals(1, commands.size());
messages = stateMachines.takeMessages();
assertEquals(0, messages.size());
}
}

@Test
public void testUpdateAdmittedAndCompletedImmediately() {
class TestUpdateListener extends TestEntityManagerListenerBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ public QueryResult handleDirectQueryWorkflowTask(
}

@Override
public void setCurrentStartedEvenId(Long event) {}
public void resetStartedEvenId(Long event) {}

@Override
public void close() {
Expand Down
Loading

0 comments on commit 4f781b3

Please sign in to comment.