Skip to content

Commit

Permalink
feat(go-sdk): blocking worker (#1106)
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 authored Dec 13, 2024
1 parent 6a0d8c7 commit 94d1433
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 33 deletions.
2 changes: 1 addition & 1 deletion internal/services/controllers/workflows/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func (wc *WorkflowsControllerImpl) cancelGetGroupKeyRun(ctx context.Context, ten
})

if err != nil {
return fmt.Errorf("could not update step run: %w", err)
return fmt.Errorf("could not update get group key run: %w", err)
}

// cancel all existing jobs on the workflow run
Expand Down
33 changes: 27 additions & 6 deletions pkg/client/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ type Action struct {
}

type WorkerActionListener interface {
Actions(ctx context.Context) (<-chan *Action, error)
Actions(ctx context.Context) (<-chan *Action, <-chan error, error)

Unregister() error
}
Expand Down Expand Up @@ -274,8 +274,9 @@ func (d *dispatcherClientImpl) newActionListener(ctx context.Context, req *GetAc
}, &resp.WorkerId, nil
}

func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error) {
func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, <-chan error, error) {
ch := make(chan *Action)
errCh := make(chan error)

a.l.Debug().Msgf("Starting to listen for actions")

Expand Down Expand Up @@ -316,7 +317,9 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error
}()

go func() {
for {
retries := 0

for retries < DefaultActionListenerRetryCount {
assignedAction, err := a.listenClient.Recv()

if err != nil {
Expand All @@ -325,16 +328,19 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error
a.l.Debug().Msgf("Context cancelled, closing channel")

defer close(ch)
defer close(errCh)

err := a.listenClient.CloseSend()

if err != nil {
a.l.Error().Msgf("Failed to close send: %v", err)
panic(fmt.Errorf("failed to close send: %w", err))
}

return
}

retries++

// if this is an unimplemented error, default to v1
if a.listenerStrategy == ListenerStrategyV2 && status.Code(err) == codes.Unimplemented {
a.l.Debug().Msgf("Falling back to v1 listener strategy")
Expand All @@ -345,12 +351,16 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error

if err != nil {
a.l.Error().Msgf("Failed to resubscribe: %v", err)
panic(fmt.Errorf("failed to resubscribe: %w", err))
errCh <- fmt.Errorf("failed to resubscribe: %w", err)
}

time.Sleep(DefaultActionListenerRetryInterval)

continue
}

retries = 0

var actionType ActionType

switch assignedAction.ActionType {
Expand Down Expand Up @@ -401,9 +411,20 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error
ParentWorkflowRunId: assignedAction.ParentWorkflowRunId,
}
}

errCh <- fmt.Errorf("could not subscribe to the worker after %d retries", retries)

defer close(ch)
defer close(errCh)

err := a.listenClient.CloseSend()

if err != nil {
a.l.Error().Msgf("Failed to close send: %v", err)
}
}()

return ch, nil
return ch, errCh, nil
}

func (a *actionListenerImpl) retrySubscribe(ctx context.Context) error {
Expand Down
10 changes: 6 additions & 4 deletions pkg/repository/prisma/get_group_key_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,18 @@ func (s *getGroupKeyRunRepository) UpdateGetGroupKeyRun(ctx context.Context, ten
return nil, err
}

if len(getGroupKeyRuns) == 0 {
return nil, fmt.Errorf("could not find get group key run for engine")
}

err = tx.Commit(ctx)

if err != nil {
return nil, err
}

// in this case, we've committed the update (so we can update timeouts), but we're hitting a case where the Workflow or
// WorkflowRun has been deleted, so we return an error.
if len(getGroupKeyRuns) == 0 {
return nil, fmt.Errorf("could not find get group key run for engine")
}

return getGroupKeyRuns[0], nil
}

Expand Down
76 changes: 55 additions & 21 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,38 @@ func (w *Worker) registerAction(service, verb string, method any, compute *compu
return nil
}

// Start starts the worker in blocking fashion
// Start starts the worker in non-blocking fashion, returning a cleanup function and an error if the
// worker could not be started.
//
// The worker itself runs on a background goroutine via startBlocking. The returned error is
// currently always nil: if startBlocking returns an error, the background goroutine panics.
//
// The returned cleanup function cancels the worker's context and then blocks until the background
// goroutine has returned, so the deferred listener unregistration performed by startBlocking has
// completed by the time cleanup returns. It may be called more than once.
func (w *Worker) Start() (func() error, error) {
	ctx, cancel := context.WithCancel(context.Background())

	// done is closed once startBlocking has returned and its deferred cleanup has run.
	done := make(chan struct{})

	go func() {
		defer close(done)

		err := w.startBlocking(ctx)

		if err != nil {
			// NOTE: this matches the behavior of the old worker, until we change the signature of Start
			panic(err)
		}
	}()

	cleanup := func() error {
		cancel()

		w.l.Debug().Msgf("worker %s is stopping...", w.name)

		// Without this wait, a caller that exits right after cleanup() could terminate the process
		// before the worker has been unregistered.
		<-done

		w.l.Debug().Msgf("worker %s stopped", w.name)

		return nil
	}

	return cleanup, nil
}

// Run executes the worker on the calling goroutine and does not return until the worker stops.
// It returns nil once ctx is done, or a non-nil error if the worker failed to start or the
// action listener reported an error while running.
func (w *Worker) Run(ctx context.Context) error {
	return w.startBlocking(ctx)
}

func (w *Worker) startBlocking(ctx context.Context) error {
actionNames := []string{}

for _, action := range w.actions {
Expand Down Expand Up @@ -377,21 +405,35 @@ func (w *Worker) Start() (func() error, error) {
w.id = id

if err != nil {
cancel()
return nil, fmt.Errorf("could not get action listener: %w", err)
return fmt.Errorf("could not get action listener: %w", err)
}

actionCh, err := listener.Actions(ctx)
defer func() {
err := listener.Unregister()

if err != nil {
w.l.Error().Err(err).Msg("could not unregister worker")
}
}()

listenerCtx, cancel := context.WithCancel(ctx)
defer cancel()

actionCh, errCh, err := listener.Actions(listenerCtx)

if err != nil {
cancel()
return nil, fmt.Errorf("could not get action channel: %w", err)
return fmt.Errorf("could not get action channel: %w", err)
}

go func() {
for {
select {
case action := <-actionCh:
case action, ok := <-actionCh:
if !ok {
w.l.Debug().Msgf("worker %s received action channel closed, stopping", w.name)
return
}

go func(action *client.Action) {
err := w.executeAction(context.Background(), action)

Expand All @@ -408,22 +450,14 @@ func (w *Worker) Start() (func() error, error) {
}
}()

cleanup := func() error {
cancel()

w.l.Debug().Msgf("worker %s is stopping...", w.name)

err := listener.Unregister()
if err != nil {
return fmt.Errorf("could not unregister worker: %w", err)
}

w.l.Debug().Msgf("worker %s stopped", w.name)

select {
case <-ctx.Done():
w.l.Debug().Msgf("worker %s received context done, stopping", w.name)
return nil
case err := <-errCh:
w.l.Error().Err(err).Msg("error from listener")
return err
}

return cleanup, nil
}

func (w *Worker) executeAction(ctx context.Context, assignedAction *client.Action) error {
Expand Down
5 changes: 4 additions & 1 deletion pkg/worker/worker_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (w *Worker) StartWebhook(ww WebhookWorkerOpts) (func() error, error) {
return nil, fmt.Errorf("could not get action listener: %w", err)
}

actionCh, err := listener.Actions(ctx)
actionCh, errCh, err := listener.Actions(ctx)

if err != nil {
cancel()
Expand All @@ -77,6 +77,9 @@ func (w *Worker) StartWebhook(ww WebhookWorkerOpts) (func() error, error) {
go func() {
for {
select {
case err := <-errCh:
// NOTE: this matches the behavior of the old worker, until we change the signature of the webhook workers
panic(err)
case action := <-actionCh:
go func(action *client.Action) {
err := w.sendWebhook(context.Background(), action, ww)
Expand Down

0 comments on commit 94d1433

Please sign in to comment.