Skip to content

Commit

Permalink
workflow: Add hooks to builder (#55)
Browse files Browse the repository at this point in the history
* workflow: Add hooks to builder

* add runstate testing of transition changes

* improve builder test cases

* add internal hook testing

* cover last edge case for hooks.go
  • Loading branch information
andrewwormald authored Oct 17, 2024
1 parent 4e70b2a commit 3191796
Show file tree
Hide file tree
Showing 21 changed files with 1,162 additions and 68 deletions.
56 changes: 39 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ Head on over to [./_examples](./_examples) to get familiar with **callbacks**, *
---

## Workflow's RunState
RunState is the state of a workflow run and can only exist in one state at any given time. RunState is a
finite state machine and allows for control over the workflow run. A workflow run is every instance of
RunState is the state of a Run and can only exist in one state at any given time. RunState is a
finite state machine and allows for control over the Run. A Run is every instance of
a triggered workflow.
```mermaid
---
Expand All @@ -199,6 +199,27 @@ stateDiagram-v2
RequestedDataDeleted-->DataDeleted
}
```
---
## Hooks

Hooks allow for you to write some functionality for Runs that enter a specific RunState. For example when
using `PauseAfterErrCount` the usage of the OnPause hook can be used to send a notification to a team to notify
them that a specific Run has errored to the threshold and now has been paused and should be investigated. Another
example is handling a known sentinel error in a Workflow Run and cancelling the Run by calling (where r is *Run)
r.Cancel(ctx) or if a Workflow Run is manually cancelled from a UI then a notifgication can be sent to the team for visibility.

Hooks run in an event consumer. This means that it will retry until a nil error has been returned and is durable
across deploys and interruptions. At-least-once delivery is guaranteed, and it is advised to use the RunID as an
idempotency key to ensure that the operation is idempotent.

### Available Hooks:

| Hook | Parameter(s) | Return(s) | Description | Is Event Driven? |
|---------------|---------------------------------|-----------|-------------------------------------------|------------------|
| OnPause | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStatePaused | Yes |
| OnCancelled | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStateCancelled | Yes |
| OnDataDeleted | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStateDeleted | Yes |
| OnCompleted | workflow.RunStateChangeHookFunc | error | Fired when a Run enters RunStateCompleted | Yes |

---

Expand Down Expand Up @@ -326,18 +347,19 @@ b.AddStep(

## Glossary

| **Term** | **Description** |
|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Builder** | A struct type that facilitates the construction of workflows. It provides methods for adding steps, callbacks, timeouts, and connecting workflows. |
| **Callback** | A method in the workflow API that can be used to trigger a callback function for a specified status. It passes data from a reader to the specified callback function. |
| **Consumer** | A component that consumes events from an event stream. In this context, it refers to the background consumer goroutines launched by the workflow. |
| **EventStreamer**| An interface representing a stream for workflow events. It includes methods for producing and consuming events. |
| **Graph** | A representation of the workflow's structure, showing the relationships between different statuses and transitions. |
| **Producer** | A component that produces events to an event stream. It is responsible for sending events to the stream. |
| **Record** | Is the wire format and representation of a run that can be stored and retrieved. The RecordStore is used for storing and retrieving records. |
| **RecordStore** | An interface representing a store for Record(s). It defines the methods needed for storing and retrieving records. The RecordStore's underlying technology must support transactions in order to prevent dual-writes. |
| **RoleScheduler**| An interface representing a scheduler for roles in the workflow. It is responsible for coordinating the execution of different roles. |
| **Topic** | A method that generates a topic for producing events in the event streamer based on the workflow name and status. |
| **Trigger** | A method in the workflow API that initiates a workflow for a specified foreignID and starting status. It returns a runID and allows for additional configuration options. |
| **WireFormat** | A format used for serializing and deserializing data for communication between workflow components. It refers to the wire format of the WireRecord. |
| **WireRecord** | A struct representing a record with additional metadata used for communication between workflow components. It can be marshaled to a wire format for storage and transmission. |
| **Term** | **Description** |
|-------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Builder** | A struct type that facilitates the construction of workflows. It provides methods for adding steps, callbacks, timeouts, and connecting workflows. |
| **Callback** | A method in the workflow API that can be used to trigger a callback function for a specified status. It passes data from a reader to the specified callback function. |
| **Consumer** | A component that consumes events from an event stream. In this context, it refers to the background consumer goroutines launched by the workflow. |
| **EventStreamer** | An interface representing a stream for workflow events. It includes methods for producing and consuming events. |
| **Graph** | A representation of the workflow's structure, showing the relationships between different statuses and transitions. |
| **Hooks** | An event driven process that take place on a Workflow's Run's lifecycle defined in a finite number of states called RunState. |
| **Producer** | A component that produces events to an event stream. It is responsible for sending events to the stream. |
| **Record** | Is the "wire format" and representation of a Run that can be stored and retrieved. The RecordStore is used for storing and retrieving records. |
| **RecordStore** | An interface representing a store for Record(s). It defines the methods needed for storing and retrieving records. The RecordStore's underlying technology must support transactions in order to prevent dual-writes. |
| **RoleScheduler** | An interface representing a scheduler for roles in the workflow. It is responsible for coordinating the execution of different roles. |
| **Run** | A Run is the representation of the instance that is created and processed by the Workflow. Each time Trigger is called a new "Run" is created. |
| **RunState** | RunState defines the finite number of states that a Run can be in. This is used to control and monitor the lifecycle of Runs. |
| **Topic** | A method that generates a topic for producing events in the event streamer based on the workflow name and status. |
| **Trigger** | A method in the workflow API that initiates a workflow for a specified foreignID and starting status. It returns a Run ID and allows for additional configuration options. |
4 changes: 2 additions & 2 deletions adapters/adaptertest/recordstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func testListOutboxEvents(t *testing.T, factory func() workflow.RecordStore) {
maker := func(recordID int64) (workflow.OutboxEventData, error) {
// Record ID would not have been set if it is a new record. Assign the recordID that the Store provides
expected.ID = recordID
return workflow.WireRecordToOutboxEventData(*expected, workflow.RunStateInitiated)
return workflow.RecordToOutboxEventData(*expected, workflow.RunStateInitiated)
}

err := store.Store(ctx, expected, maker)
Expand Down Expand Up @@ -150,7 +150,7 @@ func testDeleteOutboxEvent(t *testing.T, factory func() workflow.RecordStore) {
maker := func(recordID int64) (workflow.OutboxEventData, error) {
// Run ID would not have been set if it is a new record. Assign the recordID that the Store provides
expected.ID = recordID
return workflow.WireRecordToOutboxEventData(*expected, workflow.RunStateInitiated)
return workflow.RecordToOutboxEventData(*expected, workflow.RunStateInitiated)
}

err := store.Store(ctx, expected, maker)
Expand Down
14 changes: 11 additions & 3 deletions await.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func (w *Workflow[Type, Status]) Await(ctx context.Context, foreignID, runID str

func awaitWorkflowStatusByForeignID[Type any, Status StatusType](ctx context.Context, w *Workflow[Type, Status], status Status, foreignID, runID string, role string, pollFrequency time.Duration) (*Run[Type, Status], error) {
topic := Topic(w.Name, int(status))
// Terminal statuses result in the RunState changing to Completed and are stored in the RunStateChangeTopic
// as it is a key event in the Workflow Run's lifecycle.
if w.statusGraph.IsTerminal(int(status)) {
topic = RunStateChangeTopic(w.Name)
}

stream, err := w.eventStreamer.NewConsumer(
ctx,
topic,
Expand Down Expand Up @@ -77,9 +83,11 @@ func awaitWorkflowStatusByForeignID[Type any, Status StatusType](ctx context.Con
}

return &Run[Type, Status]{
Record: *r,
Status: Status(r.Status),
Object: &t,
TypedRecord: TypedRecord[Type, Status]{
Record: *r,
Status: Status(r.Status),
Object: &t,
},
controller: NewRunStateController(w.recordStore.Store, r),
}, ack()
}
Expand Down
13 changes: 13 additions & 0 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func NewBuilder[Type any, Status StatusType](name string) *Builder[Type, Status]
debugMode: false, // Explicit for readability
inner: interal_logger.New(os.Stdout),
},
runStateChangeHooks: make(map[RunState]RunStateChangeHookFunc[Type, Status]),
},
}
}
Expand Down Expand Up @@ -186,6 +187,18 @@ func (c *connectorUpdater[Type, Status]) WithOptions(opts ...Option) {
c.config.lagAlert = connectorOpts.lagAlert
}

func (b *Builder[Type, Status]) OnPause(hook RunStateChangeHookFunc[Type, Status]) {
b.workflow.runStateChangeHooks[RunStatePaused] = hook
}

func (b *Builder[Type, Status]) OnCancel(hook RunStateChangeHookFunc[Type, Status]) {
b.workflow.runStateChangeHooks[RunStateCancelled] = hook
}

func (b *Builder[Type, Status]) OnComplete(hook RunStateChangeHookFunc[Type, Status]) {
b.workflow.runStateChangeHooks[RunStateCompleted] = hook
}

func (b *Builder[Type, Status]) Build(eventStreamer EventStreamer, recordStore RecordStore, roleScheduler RoleScheduler, opts ...BuildOption) *Workflow[Type, Status] {
b.workflow.eventStreamer = eventStreamer
b.workflow.recordStore = recordStore
Expand Down
82 changes: 80 additions & 2 deletions builder_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func TestBuildOptions(t *testing.T) {
pauseAfterErrCount: 3,
}
deleter := func(object *string) error {
b := []byte(*object)
*object = string(b[:len(b)/2])
return nil
}

Expand Down Expand Up @@ -154,6 +156,11 @@ func TestBuildOptions(t *testing.T) {
require.Equal(t, logger, w.logger.inner)
require.Equal(t, opts, w.defaultOpts)
require.True(t, strings.Contains(runtime.FuncForPC(reflect.ValueOf(w.customDelete).Pointer()).Name(), "github.com/luno/workflow.TestBuildOptions.WithCustomDelete"))
object, err := w.customDelete(&Record{
Object: []byte(`"hello world"`),
})
require.NoError(t, err)
require.Equal(t, `"hello"`, string(object))
}

func TestAddingCallbacks(t *testing.T) {
Expand All @@ -168,7 +175,7 @@ func TestAddingCallbacks(t *testing.T) {
require.NotNil(t, wf.callback[statusStart][0].CallbackFunc)
}

func TestWithTimeoutErrBackOff(t *testing.T) {
func TestAddTimeoutErrBackOff(t *testing.T) {
b := NewBuilder[string, testStatus]("determine starting points")
b.AddTimeout(
statusStart,
Expand All @@ -184,7 +191,7 @@ func TestWithTimeoutErrBackOff(t *testing.T) {
require.Equal(t, time.Minute, b.workflow.timeouts[statusStart].errBackOff)
}

func TestWithTimeoutPollingFrequency(t *testing.T) {
func TestAddTimeoutPollingFrequency(t *testing.T) {
b := NewBuilder[string, testStatus]("determine starting points")
b.AddTimeout(
statusStart,
Expand All @@ -200,6 +207,30 @@ func TestWithTimeoutPollingFrequency(t *testing.T) {
require.Equal(t, time.Minute, b.workflow.timeouts[statusStart].pollingFrequency)
}

func TestAddTimeoutDontAllowParallelCount(t *testing.T) {
require.PanicsWithValue(t,
"Cannot configure parallel timeout",
func() {
b := NewBuilder[string, testStatus]("")
b.AddTimeout(statusStart, nil, nil, statusEnd).WithOptions(
ParallelCount(2),
)
},
)
}

func TestAddTimeoutDontAllowLag(t *testing.T) {
require.PanicsWithValue(t,
"Cannot configure lag for timeout",
func() {
b := NewBuilder[string, testStatus]("")
b.AddTimeout(statusStart, nil, nil, statusEnd).WithOptions(
ConsumeLag(time.Hour),
)
},
)
}

func TestConnectorConstruction(t *testing.T) {
fn := func(ctx context.Context, w *Workflow[string, testStatus], e *ConnectorEvent) error {
return nil
Expand Down Expand Up @@ -400,3 +431,50 @@ func TestWithDefaultOptions(t *testing.T) {
require.Equal(t, 6*time.Hour, wf.defaultOpts.lag)
require.Equal(t, 700, wf.defaultOpts.pauseAfterErrCount)
}

func TestConnectorNamesAreUnique(t *testing.T) {
require.PanicsWithValue(t,
"connector names need to be unique",
func() {
b := NewBuilder[string, testStatus]("workflow X")
b.AddConnector("my-test-connector", nil, nil)
b.AddConnector("my-test-connector", nil, nil)
},
)
}

func TestOnPause(t *testing.T) {
b := NewBuilder[string, testStatus]("")
testErr := errors.New("test error")
fn := func(ctx context.Context, record *TypedRecord[string, testStatus]) error {
return testErr
}
b.OnPause(fn)
actualFn, ok := b.workflow.runStateChangeHooks[RunStatePaused]
require.True(t, ok)
require.Equal(t, testErr, actualFn(nil, nil))
}

func TestOnCancel(t *testing.T) {
b := NewBuilder[string, testStatus]("")
testErr := errors.New("test error")
fn := func(ctx context.Context, record *TypedRecord[string, testStatus]) error {
return testErr
}
b.OnCancel(fn)
actualFn, ok := b.workflow.runStateChangeHooks[RunStateCancelled]
require.True(t, ok)
require.Equal(t, testErr, actualFn(nil, nil))
}

func TestOnComplete(t *testing.T) {
b := NewBuilder[string, testStatus]("")
testErr := errors.New("test error")
fn := func(ctx context.Context, record *TypedRecord[string, testStatus]) error {
return testErr
}
b.OnComplete(fn)
actualFn, ok := b.workflow.runStateChangeHooks[RunStateCompleted]
require.True(t, ok)
require.Equal(t, testErr, actualFn(nil, nil))
}
13 changes: 12 additions & 1 deletion event.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type OutboxEventData struct {
Data []byte
}

func WireRecordToOutboxEventData(record Record, previousRunState RunState) (OutboxEventData, error) {
func RecordToOutboxEventData(record Record, previousRunState RunState) (OutboxEventData, error) {
topic := Topic(record.WorkflowName, record.Status)

// Any record that is updated with a RunState of RunStateRequestedDataDeleted has it's events pushed into
Expand All @@ -77,6 +77,17 @@ func WireRecordToOutboxEventData(record Record, previousRunState RunState) (Outb
topic = DeleteTopic(record.WorkflowName)
}

// Records being updated to any of the following RunStates means that the event should be directed to the
// RunStateChangeTopic. This is because normal consumers in workflow should not consume events that relate
// to any of the below RunStates. The separate topic allows for hooks to consume the RunState changes and
// respond to changes in RunState.
if record.RunState == RunStatePaused ||
record.RunState == RunStateCancelled ||
record.RunState == RunStateDataDeleted ||
record.RunState == RunStateCompleted {
topic = RunStateChangeTopic(record.WorkflowName)
}

headers := make(map[string]string)
headers[string(HeaderForeignID)] = record.ForeignID
headers[string(HeaderWorkflowName)] = record.WorkflowName
Expand Down
Loading

0 comments on commit 3191796

Please sign in to comment.