diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 4e3478961..8114c47fd 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -38,6 +38,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/converter" ) @@ -551,6 +552,254 @@ func TestBlockingSelect(t *testing.T) { require.EqualValues(t, expected, history) } +func TestSelectBlockingDefault(t *testing.T) { + var history []string + env := &workflowEnvironmentImpl{ + sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), + commandsHelper: newCommandsHelper(), + dataConverter: converter.GetDefaultDataConverter(), + workflowInfo: &WorkflowInfo{ + Namespace: "namespace:" + t.Name(), + TaskQueueName: "taskqueue:" + t.Name(), + }, + } + // Verify that the flag is not set + require.False(t, env.GetFlag(SDKFlagBlockedSelectorSignalReceive)) + interceptor, ctx, err := newWorkflowContext(env, nil) + require.NoError(t, err, "newWorkflowContext failed") + d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { + c1 := NewChannel(ctx) + c2 := NewChannel(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + + }) + + Go(ctx, func(ctx Context) { + history = append(history, "add-two") + c2.Send(ctx, "two") + history = append(history, "add-two-done") + }) + + selector := NewSelector(ctx) + var v string + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddDefault(func() { + c2.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c2-%v", v)) + }) + history = append(history, "select1") + selector.Select(ctx) + + // Default behavior this signal is lost + require.True(t, c1.Len() == 0 && v == "two") + + history = append(history, "select2") + selector.Select(ctx) + history = append(history, "done") + }, func() bool { return false }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.False(t, d.IsDone()) + + expected := []string{ + "select1", + "add-one", + "add-one-done", + "add-two", + "add-two-done", + "c2-two", + "select2", + } + require.EqualValues(t, expected, history) +} + +func TestSelectBlockingDefaultWithFlag(t *testing.T) { + var history []string + env := &workflowEnvironmentImpl{ + sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), + commandsHelper: newCommandsHelper(), + dataConverter: converter.GetDefaultDataConverter(), + workflowInfo: &WorkflowInfo{ + Namespace: "namespace:" + t.Name(), + TaskQueueName: "taskqueue:" + t.Name(), + }, + } + require.True(t, env.TryUse(SDKFlagBlockedSelectorSignalReceive)) + interceptor, ctx, err := newWorkflowContext(env, nil) + require.NoError(t, err, "newWorkflowContext failed") + d, _ := newDispatcher(ctx, interceptor, func(ctx Context) { + c1 := NewChannel(ctx) + c2 := NewChannel(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + + }) + + Go(ctx, func(ctx Context) { + history = append(history, "add-two") + c2.Send(ctx, "two") + history = append(history, "add-two-done") + }) + + selector := NewSelector(ctx) + var v string + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddDefault(func() { + c2.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c2-%v", v)) + }) + history = append(history, "select1") + selector.Select(ctx) + + // Signal should not be lost + require.False(t, c1.Len() == 0 && v == "two") + + history = append(history, "select2") + selector.Select(ctx) + history = append(history, "done") + }, func() bool { return false }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone()) + + expected := []string{ + "select1", + "add-one", + "add-one-done", + "add-two", + "add-two-done", + "c2-two", + "select2", + "c1-one", + "done", + } + + require.EqualValues(t, expected, history) +} + +func TestBlockingSelectFuture(t *testing.T) { + var history []string + d := createNewDispatcher(func(ctx Context) { + c1 := NewChannel(ctx) + f1, s1 := NewFuture(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + }) + Go(ctx, func(ctx Context) { + history = append(history, "add-two") + s1.SetValue("one-future") + }) + + selector := NewSelector(ctx) + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + var v string + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddFuture(f1, func(f Future) { + var v string + err := f.Get(ctx, &v) + require.NoError(t, err) + history = append(history, fmt.Sprintf("f1-%v", v)) + }) + history = append(history, "select1") + selector.Select(ctx) + fmt.Println("select1 done", history) + + history = append(history, "select2") + selector.Select(ctx) + history = append(history, "done") + + }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone(), strings.Join(history, "\n")) + expected := []string{ + "select1", + "add-one", + "add-one-done", + "add-two", + "c1-one", + "select2", + "f1-one-future", + "done", + } + require.EqualValues(t, expected, history) +} + +func TestBlockingSelectSend(t *testing.T) { + var history []string + d := createNewDispatcher(func(ctx Context) { + c1 := NewChannel(ctx) + c2 := NewChannel(ctx) + + Go(ctx, func(ctx Context) { + history = append(history, "add-one") + c1.Send(ctx, "one") + history = append(history, "add-one-done") + }) + Go(ctx, func(ctx Context) { + require.True(t, c2.Len() == 1) + history = append(history, "receiver") + var v string + more := c2.Receive(ctx, &v) + require.True(t, more) + history = append(history, fmt.Sprintf("c2-%v", v)) + require.True(t, c2.Len() == 0) + }) + + selector := NewSelector(ctx) + selector. + AddReceive(c1, func(c ReceiveChannel, more bool) { + var v string + c.Receive(ctx, &v) + history = append(history, fmt.Sprintf("c1-%v", v)) + }). + AddSend(c2, "two", func() { history = append(history, "send2") }) + history = append(history, "select1") + selector.Select(ctx) + + history = append(history, "select2") + selector.Select(ctx) + history = append(history, "done") + + }) + defer d.Close() + requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout)) + require.True(t, d.IsDone(), strings.Join(history, "\n")) + expected := []string{ + "select1", + "add-one", + "add-one-done", + "receiver", + "c1-one", + "select2", + "send2", + "done", + "c2-two", + } + require.EqualValues(t, expected, history) +} + func TestBlockingSelectAsyncSend(t *testing.T) { var history []string d := createNewDispatcher(func(ctx Context) { diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 82a9785be..9f6860fc3 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -978,6 +978,10 @@ func (wc *workflowEnvironmentImpl) TryUse(flag sdkFlag) bool { return wc.sdkFlags.tryUse(flag, !wc.isReplay) } +func (wc *workflowEnvironmentImpl) GetFlag(flag sdkFlag) bool { + return wc.sdkFlags.getFlag(flag) +} + func (wc *workflowEnvironmentImpl) QueueUpdate(name string, f func()) { wc.bufferedUpdateRequests[name] = append(wc.bufferedUpdateRequests[name], f) } diff --git a/internal/internal_flags.go b/internal/internal_flags.go index 503c650a5..89693d839 100644 --- a/internal/internal_flags.go +++ b/internal/internal_flags.go @@ -47,7 +47,10 @@ const ( // SDKPriorityUpdateHandling will cause update request to be handled before the main workflow method. // It will also cause the SDK to immediately handle updates when a handler is registered. SDKPriorityUpdateHandling = 4 - SDKFlagUnknown = math.MaxUint32 + // SDKFlagBlockedSelectorSignalReceive will cause a signal to not be lost + // when the Default path is blocked. + SDKFlagBlockedSelectorSignalReceive = 5 + SDKFlagUnknown = math.MaxUint32 ) func sdkFlagFromUint(value uint32) sdkFlag { @@ -62,6 +65,8 @@ func sdkFlagFromUint(value uint32) sdkFlag { return SDKFlagProtocolMessageCommand case uint32(SDKPriorityUpdateHandling): return SDKPriorityUpdateHandling + case uint32(SDKFlagBlockedSelectorSignalReceive): + return SDKFlagBlockedSelectorSignalReceive default: return SDKFlagUnknown } @@ -105,6 +110,11 @@ func (sf *sdkFlags) tryUse(flag sdkFlag, record bool) bool { } } +// getFlag returns true if the flag is currently set. +func (sf *sdkFlags) getFlag(flag sdkFlag) bool { + return sf.currentFlags[flag] || sf.newFlags[flag] +} + // set marks a flag as in current use regardless of replay status. func (sf *sdkFlags) set(flags ...sdkFlag) { if !sf.capabilities.GetSdkMetadata() { diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index dc3865e6c..f7160da8e 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -145,6 +145,8 @@ type ( DrainUnhandledUpdates() bool // TryUse returns true if this flag may currently be used. TryUse(flag sdkFlag) bool + // GetFlag returns if the flag is currently used. + GetFlag(flag sdkFlag) bool } // WorkflowDefinitionFactory factory for creating WorkflowDefinition instances. diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index ce4eb8986..245661027 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -1407,8 +1407,17 @@ func (s *selectorImpl) Select(ctx Context) { if readyBranch != nil { return false } - readyBranch = func() { + // readyBranch is not executed when AddDefault is specified, + // setting the value here prevents the signal from being dropped + dropSignalFlag := getWorkflowEnvironment(ctx).GetFlag(SDKFlagBlockedSelectorSignalReceive) + if dropSignalFlag { c.recValue = &v + } + + readyBranch = func() { + if !dropSignalFlag { + c.recValue = &v + } f(c, more) } return true diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 3b9742f73..acefd43ce 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -229,6 +229,8 @@ type ( workflowFunctionExecuting bool bufferedUpdateRequests map[string][]func() + + sdkFlags *sdkFlags } testSessionEnvironmentImpl struct { @@ -289,6 +291,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist failureConverter: GetDefaultFailureConverter(), runTimeout: maxWorkflowTimeout, bufferedUpdateRequests: make(map[string][]func()), + sdkFlags: newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), } if debugMode { @@ -581,7 +584,11 @@ func (env *testWorkflowEnvironmentImpl) getWorkflowDefinition(wt WorkflowType) ( } func (env *testWorkflowEnvironmentImpl) TryUse(flag sdkFlag) bool { - return true + return env.sdkFlags.tryUse(flag, true) +} + +func (env *testWorkflowEnvironmentImpl) GetFlag(flag sdkFlag) bool { + return env.sdkFlags.getFlag(flag) } func (env *testWorkflowEnvironmentImpl) QueueUpdate(name string, f func()) { diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 472e8f1fa..558ae7a43 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -4240,3 +4240,37 @@ func (s *WorkflowTestSuiteUnitTest) Test_SameWorkflowAndActivityNames() { s.Require().True(env.IsWorkflowCompleted()) s.Require().NoError(env.GetWorkflowError()) } + +func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() { + workflowFn := func(ctx Context) error { + ch1 := GetSignalChannel(ctx, "test-signal") + ch2 := GetSignalChannel(ctx, "test-signal-2") + selector := NewSelector(ctx) + var v string + selector.AddReceive(ch1, func(c ReceiveChannel, more bool) { + c.Receive(ctx, &v) + }) + selector.AddDefault(func() { + ch2.Receive(ctx, &v) + }) + selector.Select(ctx) + s.Require().True(ch1.Len() == 0 && v == "s2") + selector.Select(ctx) + + return nil + } + + // send a signal after workflow has started + env := s.NewTestWorkflowEnvironment() + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("test-signal", "s1") + env.SignalWorkflow("test-signal-2", "s2") + }, 5*time.Second) + env.ExecuteWorkflow(workflowFn) + s.True(env.IsWorkflowCompleted()) + err := env.GetWorkflowError() + s.Error(err) + var workflowErr *WorkflowExecutionError + s.True(errors.As(err, &workflowErr)) + s.Equal("deadline exceeded (type: ScheduleToClose)", workflowErr.cause.Error()) +} diff --git a/test/replaytests/replay_test.go b/test/replaytests/replay_test.go index 55c55e2f6..452c4501e 100644 --- a/test/replaytests/replay_test.go +++ b/test/replaytests/replay_test.go @@ -464,6 +464,16 @@ func (s *replayTestSuite) TestGogoprotoPayloadWorkflow() { s.NoError(err) } +func (s *replayTestSuite) TestSelectorBlockingDefault() { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(SelectorBlockingDefaultWorkflow) + // Verify we can still replay an old workflow that does + // not have the SDKFlagBlockedSelectorSignalReceive flag + err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "selector-blocking-default.json") + s.NoError(err) + require.NoError(s.T(), err) +} + type captureConverter struct { converter.DataConverter toPayloads []interface{} diff --git a/test/replaytests/selector-blocking-default.json b/test/replaytests/selector-blocking-default.json new file mode 100644 index 000000000..07c2d0387 --- /dev/null +++ b/test/replaytests/selector-blocking-default.json @@ -0,0 +1,89 @@ +{ + "events": [ + { + "eventId": "1", + "eventTime": "2024-10-21T23:39:08.991521Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", + "taskId": "1048587", + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "SelectorBlockingDefaultWorkflow" + }, + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "workflowExecutionTimeout": "0s", + "workflowRunTimeout": "0s", + "workflowTaskTimeout": "10s", + "originalExecutionRunId": "dde5f879-0e59-47e8-8048-ac0f164866fd", + "identity": "47182@Andrews-MacBook-Pro.local@", + "firstExecutionRunId": "dde5f879-0e59-47e8-8048-ac0f164866fd", + "attempt": 1, + "firstWorkflowTaskBackoff": "0s", + "header": {}, + "workflowId": "hello_world_workflowID" + } + }, + { + "eventId": "2", + "eventTime": "2024-10-21T23:39:08.991569Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1048588", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "hello-world", + "kind": "TASK_QUEUE_KIND_NORMAL" + }, + "startToCloseTimeout": "10s", + "attempt": 1 + } + }, + { + "eventId": "3", + "eventTime": "2024-10-21T23:39:08.994898Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1048593", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "2", + "identity": "47139@Andrews-MacBook-Pro.local@", + "requestId": "a7a50c99-1d0d-449c-9d75-09458ac1e7af", + "historySizeBytes": "282", + "workerVersion": { + "buildId": "e15e79cbae5f5acc33774a930eed2f97" + } + } + }, + { + "eventId": "4", + "eventTime": "2024-10-21T23:39:08.999006Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1048597", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "2", + "startedEventId": "3", + "identity": "47139@Andrews-MacBook-Pro.local@", + "workerVersion": { + "buildId": "e15e79cbae5f5acc33774a930eed2f97" + }, + "sdkMetadata": { + "langUsedFlags": [ + 3 + ], + "sdkName": "temporal-go", + "sdkVersion": "1.29.1" + }, + "meteringMetadata": {} + } + }, + { + "eventId": "5", + "eventTime": "2024-10-21T23:39:08.999055Z", + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", + "taskId": "1048598", + "workflowExecutionCompletedEventAttributes": { + "workflowTaskCompletedEventId": "4" + } + } + ] +} \ No newline at end of file diff --git a/test/replaytests/workflows.go b/test/replaytests/workflows.go index 90fbdff5c..97089844a 100644 --- a/test/replaytests/workflows.go +++ b/test/replaytests/workflows.go @@ -610,3 +610,49 @@ func ListAndDescribeWorkflow(ctx workflow.Context) (int, error) { } return len(result.Executions), nil } + +func SelectorBlockingDefaultWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + ch1 := workflow.NewChannel(ctx) + ch2 := workflow.NewChannel(ctx) + + workflow.Go(ctx, func(ctx workflow.Context) { + ch1.Send(ctx, "one") + + }) + + workflow.Go(ctx, func(ctx workflow.Context) { + ch2.Send(ctx, "two") + }) + + selector := workflow.NewSelector(ctx) + var s string + selector.AddReceive(ch1, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &s) + }) + selector.AddDefault(func() { + ch2.Receive(ctx, &s) + }) + selector.Select(ctx) + if selector.HasPending() { + var result string + activity := workflow.ExecuteActivity(ctx, SelectorBlockingDefaultActivity, "Signal not lost") + activity.Get(ctx, &result) + logger.Info("Result", result) + } else { + logger.Info("Signal in ch1 lost") + return nil + } + return nil +} + +func SelectorBlockingDefaultActivity(ctx context.Context, value string) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Activity", "value", value) + return value + " was logged!", nil +}