Skip to content

Commit

Permalink
Fix issue with isReplaying causing direct query to spam logs (tempora…
Browse files Browse the repository at this point in the history
…lio#2087)

Fix direct query causing logs when replaying in certain cases
  • Loading branch information
Quinn-With-Two-Ns authored May 31, 2024
1 parent 08b220c commit 5c464e8
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ public Optional<HistoryEvent> getWorkflowTaskCompletedEvent() {

/**
* @return Should the buffer be fetched. true if a whole history for a workflow task is
* accumulated or events can't be attributed to a completed workflow task
* accumulated or events can't be attributed to a completed workflow task. The whole history
* includes the unprocessed history events before the WorkflowTaskStarted and the command
* events after the WorkflowTaskCompleted.
*/
public boolean addEvent(HistoryEvent event, boolean hasNextEvent) {
if (readyToFetch.size() > 0) {
Expand All @@ -84,6 +86,13 @@ private void handleEvent(HistoryEvent event, boolean hasNextEvent) {
// flush buffer
flushBuffer();

// If the last event in history is a WORKFLOW_TASK_COMPLETED, because say we received a direct
// query,
// we need to return it as a batch.
if (WFTState.Started.equals(wftSequenceState)
&& event.getEventType().equals(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)) {
workflowTaskCompletedEvent = Optional.of(event);
}
// exit the sequence
wftSequenceState = WFTState.None;
readyToFetch.add(event);
Expand Down Expand Up @@ -148,16 +157,16 @@ private void addToBuffer(HistoryEvent event) {
public EventBatch fetch() {
if (readyToFetch.size() == 1) {
HistoryEvent event = readyToFetch.get(0);
Optional<HistoryEvent> wftStarted = workflowTaskCompletedEvent;
Optional<HistoryEvent> wftCompleted = workflowTaskCompletedEvent;
workflowTaskCompletedEvent = Optional.empty();
readyToFetch.clear();
return new EventBatch(wftStarted, Collections.singletonList(event));
return new EventBatch(wftCompleted, Collections.singletonList(event));
} else {
List<HistoryEvent> result = new ArrayList<>(readyToFetch);
Optional<HistoryEvent> wftStarted = workflowTaskCompletedEvent;
Optional<HistoryEvent> wftCompleted = workflowTaskCompletedEvent;
workflowTaskCompletedEvent = Optional.empty();
readyToFetch.clear();
return new EventBatch(wftStarted, result);
return new EventBatch(wftCompleted, result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowException;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.ApplicationFailure;
import io.temporal.internal.Issue;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -49,7 +57,10 @@ public class DirectQueryReplaysDontSpamLogWithWorkflowExecutionExceptionsTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestWorkflowNonRetryableFlag.class).build();
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestWorkflowNonRetryableFlag.class, LogAndKeepRunningWorkflow.class)
.setActivityImplementations(new TestActivities.TestActivitiesImpl())
.build();

@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -79,6 +90,72 @@ public void queriedWorkflowFailureDoesntProduceAdditionalLogs() {
workflowExecuteRunnableLoggerAppender.list.size());
}

@Test
public void queriedWorkflowFailureDoesntProduceAdditionalLogsWhenWorkflowIsNotCompleted() {
TestWorkflows.QueryableWorkflow workflow =
testWorkflowRule.newWorkflowStub(TestWorkflows.QueryableWorkflow.class);

WorkflowExecution execution = WorkflowClient.start(workflow::execute);

assertEquals("my-state", workflow.getState());
assertEquals("There was only one execution.", 1, workflowCodeExecutionCount.get());

testWorkflowRule.invalidateWorkflowCache();
assertEquals("my-state", workflow.getState());
assertEquals(
"There was two executions - one original and one full replay for query.",
2,
workflowCodeExecutionCount.get());

workflow.mySignal("exit");
assertEquals("my-state", workflow.getState());
assertEquals(
"There was three executions - one original and two full replays for query.",
3,
workflowCodeExecutionCount.get());
assertEquals(
"Only the original exception should be logged.",
1,
workflowExecuteRunnableLoggerAppender.list.size());
}

public static class LogAndKeepRunningWorkflow implements TestWorkflows.QueryableWorkflow {
private final org.slf4j.Logger logger =
Workflow.getLogger("io.temporal.internal.sync.WorkflowExecutionHandler");
private final TestActivities.VariousTestActivities activities =
Workflow.newActivityStub(
TestActivities.VariousTestActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(10))
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
.build());
private boolean exit;

@Override
public String execute() {
workflowCodeExecutionCount.incrementAndGet();
while (true) {
try {
activities.throwIO();
} catch (ActivityFailure e) {
logger.error("Unexpected error on activity", e);
Workflow.await(() -> exit);
return "exit";
}
}
}

@Override
public String getState() {
return "my-state";
}

@Override
public void mySignal(String value) {
exit = true;
}
}

public static class TestWorkflowNonRetryableFlag implements TestWorkflows.TestWorkflowWithQuery {

@Override
Expand Down

0 comments on commit 5c464e8

Please sign in to comment.