Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lost signal from Selector when Default path blocks #1682

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,191 @@ func TestBlockingSelect(t *testing.T) {
require.EqualValues(t, expected, history)
}

func TestSelectBlockingDefault(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't want the same test both here and in internal_workflow_testsuite_test.go right? Which file should hold this test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually maybe this is fine, one set tests w/ and w/o flags, and the other does default behavior

// manually create a dispatcher to ensure sdkFlags are not set
var history []string
env := &workflowEnvironmentImpl{
sdkFlags: &sdkFlags{},
commandsHelper: newCommandsHelper(),
dataConverter: converter.GetDefaultDataConverter(),
workflowInfo: &WorkflowInfo{
Namespace: "namespace:" + t.Name(),
TaskQueueName: "taskqueue:" + t.Name(),
},
}
interceptor, ctx, err := newWorkflowContext(env, nil)
require.NoError(t, err, "newWorkflowContext failed")
d, _ := newDispatcher(ctx, interceptor, func(ctx Context) {
c1 := NewChannel(ctx)
c2 := NewChannel(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")

})

Go(ctx, func(ctx Context) {
history = append(history, "add-two")
c2.Send(ctx, "two")
history = append(history, "add-two-done")
})

selector := NewSelector(ctx)
var v string
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddDefault(func() {
c2.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c2-%v", v))
})
history = append(history, "select1")
require.False(t, selector.HasPending())
selector.Select(ctx)

// Default behavior this signal is lost
require.True(t, c1.Len() == 0 && v == "two")

history = append(history, "select2")
require.False(t, selector.HasPending())
history = append(history, "done")
}, func() bool { return false })
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone())

expected := []string{
"select1",
"add-one",
"add-one-done",
"add-two",
"add-two-done",
"c2-two",
"select2",
"done",
}
require.EqualValues(t, expected, history)
}

func TestSelectBlockingDefaultWithFlag(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reported bug was that blocking in the default case of a selector could cause signals to be lost, when I last looked at these tests we didn't seem to have any coverage for blocking in one selector case while a signal is received. Can we add tests to verify their is no bugs if a signal is received while blocking in another case of a selector, not just default?

// sdkFlags are set by default for tests
var history []string
d := createNewDispatcher(func(ctx Context) {
c1 := NewChannel(ctx)
c2 := NewChannel(ctx)

Go(ctx, func(ctx Context) {
history = append(history, "add-one")
c1.Send(ctx, "one")
history = append(history, "add-one-done")

})

Go(ctx, func(ctx Context) {
history = append(history, "add-two")
c2.Send(ctx, "two")
history = append(history, "add-two-done")
})

selector := NewSelector(ctx)
var v string
selector.
AddReceive(c1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).
AddDefault(func() {
c2.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c2-%v", v))
})
history = append(history, "select1")
require.False(t, selector.HasPending())
selector.Select(ctx)

// Signal should not be lost
require.False(t, c1.Len() == 0 && v == "two")

history = append(history, "select2")
require.True(t, selector.HasPending())
selector.Select(ctx)
require.False(t, selector.HasPending())
history = append(history, "done")
})
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone())

expected := []string{
"select1",
"add-one",
"add-one-done",
"add-two",
"add-two-done",
"c2-two",
"select2",
"c1-one",
"done",
}
require.EqualValues(t, expected, history)
}

func TestSelectBlockingFuture(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
// TODO
c1 := NewChannel(ctx)
selector := NewSelector(ctx)
var v string
selector.AddReceive(c1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
history = append(history, fmt.Sprintf("c1-%v", v))
}).AddFuture(NewTimer(ctx, time.Second), func(f Future) {
f.Get(ctx, nil)
history = append(history, "future")
})

history = append(history, "select1")
require.False(t, selector.HasPending())
selector.Select(ctx)

// Signal should not be lost
require.False(t, c1.Len() == 0 && v == "two")

history = append(history, "select2")
require.True(t, selector.HasPending())
selector.Select(ctx)
require.False(t, selector.HasPending())
history = append(history, "done")
})
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone())

expected := []string{
// TODO
}
require.EqualValues(t, expected, history)
}

func TestSelectBlockingSend(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
// TODO
})
defer d.Close()
requireNoExecuteErr(t, d.ExecuteUntilAllBlocked(defaultDeadlockDetectionTimeout))
require.True(t, d.IsDone())

expected := []string{
// TODO
}
require.EqualValues(t, expected, history)
}

func TestBlockingSelectAsyncSend(t *testing.T) {
var history []string
d := createNewDispatcher(func(ctx Context) {
Expand Down
7 changes: 6 additions & 1 deletion internal/internal_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ const (
// SDKPriorityUpdateHandling will cause update request to be handled before the main workflow method.
// It will also cause the SDK to immediately handle updates when a handler is registered.
SDKPriorityUpdateHandling = 4
SDKFlagUnknown = math.MaxUint32
// SDKFlagBlockedSelectorSignalReceive will cause a signal to not be lost
// when the Default path is blocked.
SDKFlagBlockedSelectorSignalReceive = 5
SDKFlagUnknown = math.MaxUint32
)

func sdkFlagFromUint(value uint32) sdkFlag {
Expand All @@ -62,6 +65,8 @@ func sdkFlagFromUint(value uint32) sdkFlag {
return SDKFlagProtocolMessageCommand
case uint32(SDKPriorityUpdateHandling):
return SDKPriorityUpdateHandling
case uint32(SDKFlagBlockedSelectorSignalReceive):
return SDKFlagBlockedSelectorSignalReceive
default:
return SDKFlagUnknown
}
Expand Down
11 changes: 10 additions & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,8 +1407,17 @@ func (s *selectorImpl) Select(ctx Context) {
if readyBranch != nil {
return false
}
readyBranch = func() {
// readyBranch is not executed when AddDefault is specified,
// setting the value here prevents the signal from being dropped
dropSignalFlag := getWorkflowEnvironment(ctx).TryUse(SDKFlagBlockedSelectorSignalReceive)
yuandrew marked this conversation as resolved.
Show resolved Hide resolved
if dropSignalFlag {
c.recValue = &v
}

readyBranch = func() {
if !dropSignalFlag {
c.recValue = &v
}
f(c, more)
}
return true
Expand Down
51 changes: 51 additions & 0 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/protobuf/proto"

"go.temporal.io/sdk/converter"
Expand Down Expand Up @@ -4240,3 +4241,53 @@ func (s *WorkflowTestSuiteUnitTest) Test_SameWorkflowAndActivityNames() {
s.Require().True(env.IsWorkflowCompleted())
s.Require().NoError(env.GetWorkflowError())
}

func (s *WorkflowTestSuiteUnitTest) Test_SignalLoss() {
workflowFn := func(ctx Context) error {
ch1 := GetSignalChannel(ctx, "test-signal")
ch2 := GetSignalChannel(ctx, "test-signal-2")
selector := NewSelector(ctx)
var v string
selector.AddReceive(ch1, func(c ReceiveChannel, more bool) {
c.Receive(ctx, &v)
fmt.Println("received signal from ch1")
})
selector.AddDefault(func() {
ch2.Receive(ctx, &v)
fmt.Println("received signal from ch2")
})
selector.Select(ctx)
fmt.Println("ch1.Len()", ch1.Len(), "s", v)
// default behavior is this signal is lost
s.Require().True(ch1.Len() == 0 && "s2" == v)

return nil
}

// send a signal 5 seconds after workflow started
env := s.NewTestWorkflowEnvironment()
env.RegisterDelayedCallback(func() {
fmt.Println("sending signal to 1")
env.SignalWorkflow("test-signal", "s1")
fmt.Println("sending signal to 2")
env.SignalWorkflow("test-signal-2", "s2")
}, 5*time.Second)
env.ExecuteWorkflow(workflowFn)
s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())
}

// Not sure we need this?
func (s *WorkflowTestSuiteUnitTest) Test_SDKFlagBlockedSelectorSignalReceive() {
flags := newSDKFlags(&workflowservice.GetSystemInfoResponse_Capabilities{})
flags.set(SDKFlagBlockedSelectorSignalReceive)
fmt.Println("flags", flags)
}

func (s *WorkflowTestSuiteUnitTest) Test_SelectBlockingFuture() {
// TODO
}

func (s *WorkflowTestSuiteUnitTest) Test_SelectBlockingSend() {
// TODO
}
10 changes: 10 additions & 0 deletions test/replaytests/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,16 @@ func (s *replayTestSuite) TestGogoprotoPayloadWorkflow() {
s.NoError(err)
}

func (s *replayTestSuite) TestSelectorBlockingDefault() {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflow(SelectorBlockingDefaultWorkflow)
// Verify we can still replay an old workflow that does
// not have the SDKFlagBlockedSelectorSignalReceive flag
err := replayer.ReplayWorkflowHistoryFromJSONFile(ilog.NewDefaultLogger(), "selector-blocking-default.json")
s.NoError(err)
require.NoError(s.T(), err)
}

type captureConverter struct {
converter.DataConverter
toPayloads []interface{}
Expand Down
Loading
Loading