diff --git a/builder.go b/builder.go index 266bc12..1e5afc2 100644 --- a/builder.go +++ b/builder.go @@ -199,10 +199,6 @@ func (b *Builder[Type, Status]) OnComplete(hook RunStateChangeHookFunc[Type, Sta b.workflow.runStateChangeHooks[RunStateCompleted] = hook } -func (b *Builder[Type, Status]) OnDeleted(hook RunStateChangeHookFunc[Type, Status]) { - b.workflow.runStateChangeHooks[RunStateDataDeleted] = hook -} - func (b *Builder[Type, Status]) Build(eventStreamer EventStreamer, recordStore RecordStore, roleScheduler RoleScheduler, opts ...BuildOption) *Workflow[Type, Status] { b.workflow.eventStreamer = eventStreamer b.workflow.recordStore = recordStore diff --git a/builder_internal_test.go b/builder_internal_test.go index 8a5b9e6..8fb0ede 100644 --- a/builder_internal_test.go +++ b/builder_internal_test.go @@ -125,6 +125,8 @@ func TestBuildOptions(t *testing.T) { pauseAfterErrCount: 3, } deleter := func(object *string) error { + b := []byte(*object) + *object = string(b[:len(b)/2]) return nil } @@ -154,6 +156,11 @@ func TestBuildOptions(t *testing.T) { require.Equal(t, logger, w.logger.inner) require.Equal(t, opts, w.defaultOpts) require.True(t, strings.Contains(runtime.FuncForPC(reflect.ValueOf(w.customDelete).Pointer()).Name(), "github.com/luno/workflow.TestBuildOptions.WithCustomDelete")) + object, err := w.customDelete(&Record{ + Object: []byte(`"hello world"`), + }) + require.NoError(t, err) + require.Equal(t, `"hello"`, string(object)) } func TestAddingCallbacks(t *testing.T) { @@ -168,7 +175,7 @@ func TestAddingCallbacks(t *testing.T) { require.NotNil(t, wf.callback[statusStart][0].CallbackFunc) } -func TestWithTimeoutErrBackOff(t *testing.T) { +func TestAddTimeoutErrBackOff(t *testing.T) { b := NewBuilder[string, testStatus]("determine starting points") b.AddTimeout( statusStart, @@ -184,7 +191,7 @@ func TestWithTimeoutErrBackOff(t *testing.T) { require.Equal(t, time.Minute, b.workflow.timeouts[statusStart].errBackOff) } -func TestWithTimeoutPollingFrequency(t *testing.T) { +func TestAddTimeoutPollingFrequency(t *testing.T) { b := NewBuilder[string, testStatus]("determine starting points") b.AddTimeout( statusStart, @@ -200,6 +207,30 @@ func TestWithTimeoutPollingFrequency(t *testing.T) { require.Equal(t, time.Minute, b.workflow.timeouts[statusStart].pollingFrequency) } +func TestAddTimeoutDontAllowParallelCount(t *testing.T) { + require.PanicsWithValue(t, + "Cannot configure parallel timeout", + func() { + b := NewBuilder[string, testStatus]("") + b.AddTimeout(statusStart, nil, nil, statusEnd).WithOptions( + ParallelCount(2), + ) + }, + ) +} + +func TestAddTimeoutDontAllowLag(t *testing.T) { + require.PanicsWithValue(t, + "Cannot configure lag for timeout", + func() { + b := NewBuilder[string, testStatus]("") + b.AddTimeout(statusStart, nil, nil, statusEnd).WithOptions( + ConsumeLag(time.Hour), + ) + }, + ) +} + func TestConnectorConstruction(t *testing.T) { fn := func(ctx context.Context, w *Workflow[string, testStatus], e *ConnectorEvent) error { return nil @@ -400,3 +431,50 @@ func TestWithDefaultOptions(t *testing.T) { require.Equal(t, 6*time.Hour, wf.defaultOpts.lag) require.Equal(t, 700, wf.defaultOpts.pauseAfterErrCount) } + +func TestConnectorNamesAreUnique(t *testing.T) { + require.PanicsWithValue(t, + "connector names need to be unique", + func() { + b := NewBuilder[string, testStatus]("workflow X") + b.AddConnector("my-test-connector", nil, nil) + b.AddConnector("my-test-connector", nil, nil) + }, + ) +} + +func TestOnPause(t *testing.T) { + b := NewBuilder[string, testStatus]("") + testErr := errors.New("test error") + fn := func(ctx context.Context, record *TypedRecord[string, testStatus]) error { + return testErr + } + b.OnPause(fn) + actualFn, ok := b.workflow.runStateChangeHooks[RunStatePaused] + require.True(t, ok) + require.Equal(t, testErr, actualFn(nil, nil)) +} + +func TestOnCancel(t *testing.T) { + b := NewBuilder[string, testStatus]("") + testErr := errors.New("test error") + fn := func(ctx context.Context, record *TypedRecord[string, testStatus]) error { + return testErr + } + b.OnCancel(fn) + actualFn, ok := b.workflow.runStateChangeHooks[RunStateCancelled] + require.True(t, ok) + require.Equal(t, testErr, actualFn(nil, nil)) +} + +func TestOnComplete(t *testing.T) { + b := NewBuilder[string, testStatus]("") + testErr := errors.New("test error") + fn := func(ctx context.Context, record *TypedRecord[string, testStatus]) error { + return testErr + } + b.OnComplete(fn) + actualFn, ok := b.workflow.runStateChangeHooks[RunStateCompleted] + require.True(t, ok) + require.Equal(t, testErr, actualFn(nil, nil)) +}