From 94d14336aad7858f9e3a0df348d1487f647e0495 Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Thu, 12 Dec 2024 20:42:13 -0500 Subject: [PATCH] feat(go-sdk): blocking worker (#1106) --- .../controllers/workflows/controller.go | 2 +- pkg/client/dispatcher.go | 33 ++++++-- pkg/repository/prisma/get_group_key_run.go | 10 ++- pkg/worker/worker.go | 76 ++++++++++++++----- pkg/worker/worker_webhook.go | 5 +- 5 files changed, 93 insertions(+), 33 deletions(-) diff --git a/internal/services/controllers/workflows/controller.go b/internal/services/controllers/workflows/controller.go index 1e17a0e1f..585bc497e 100644 --- a/internal/services/controllers/workflows/controller.go +++ b/internal/services/controllers/workflows/controller.go @@ -645,7 +645,7 @@ func (wc *WorkflowsControllerImpl) cancelGetGroupKeyRun(ctx context.Context, ten }) if err != nil { - return fmt.Errorf("could not update step run: %w", err) + return fmt.Errorf("could not update get group key run: %w", err) } // cancel all existing jobs on the workflow run diff --git a/pkg/client/dispatcher.go b/pkg/client/dispatcher.go index bf3e29cf3..16dbd09fe 100644 --- a/pkg/client/dispatcher.go +++ b/pkg/client/dispatcher.go @@ -115,7 +115,7 @@ type Action struct { } type WorkerActionListener interface { - Actions(ctx context.Context) (<-chan *Action, error) + Actions(ctx context.Context) (<-chan *Action, <-chan error, error) Unregister() error } @@ -274,8 +274,9 @@ func (d *dispatcherClientImpl) newActionListener(ctx context.Context, req *GetAc }, &resp.WorkerId, nil } -func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error) { +func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, <-chan error, error) { ch := make(chan *Action) + errCh := make(chan error) a.l.Debug().Msgf("Starting to listen for actions") @@ -316,7 +317,9 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error }() go func() { - for { + retries := 0 + + for retries < DefaultActionListenerRetryCount { assignedAction, err := a.listenClient.Recv() if err != nil { @@ -325,16 +328,19 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error a.l.Debug().Msgf("Context cancelled, closing channel") defer close(ch) + defer close(errCh) + err := a.listenClient.CloseSend() if err != nil { a.l.Error().Msgf("Failed to close send: %v", err) - panic(fmt.Errorf("failed to close send: %w", err)) } return } + retries++ + // if this is an unimplemented error, default to v1 if a.listenerStrategy == ListenerStrategyV2 && status.Code(err) == codes.Unimplemented { a.l.Debug().Msgf("Falling back to v1 listener strategy") @@ -345,12 +351,16 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error if err != nil { a.l.Error().Msgf("Failed to resubscribe: %v", err) - panic(fmt.Errorf("failed to resubscribe: %w", err)) + errCh <- fmt.Errorf("failed to resubscribe: %w", err) } + time.Sleep(DefaultActionListenerRetryInterval) + continue } + retries = 0 + var actionType ActionType switch assignedAction.ActionType { @@ -401,9 +411,20 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error ParentWorkflowRunId: assignedAction.ParentWorkflowRunId, } } + + errCh <- fmt.Errorf("could not subscribe to the worker after %d retries", retries) + + defer close(ch) + defer close(errCh) + + err := a.listenClient.CloseSend() + + if err != nil { + a.l.Error().Msgf("Failed to close send: %v", err) + } }() - return ch, nil + return ch, errCh, nil } func (a *actionListenerImpl) retrySubscribe(ctx context.Context) error { diff --git a/pkg/repository/prisma/get_group_key_run.go b/pkg/repository/prisma/get_group_key_run.go index 33ad41764..7d9f2ad0f 100644 --- a/pkg/repository/prisma/get_group_key_run.go +++ b/pkg/repository/prisma/get_group_key_run.go @@ -190,16 +190,18 @@ func (s *getGroupKeyRunRepository) UpdateGetGroupKeyRun(ctx context.Context, ten return nil, err } - if len(getGroupKeyRuns) == 0 { - return nil, fmt.Errorf("could not find get group key run for engine") - } - err = tx.Commit(ctx) if err != nil { return nil, err } + // in this case, we've committed the update (so we can update timeouts), but we're hitting a case where the Workflow or + // WorkflowRun has been deleted, so we return an error. + if len(getGroupKeyRuns) == 0 { + return nil, fmt.Errorf("could not find get group key run for engine") + } + return getGroupKeyRuns[0], nil } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index e68efcee6..6353d1d87 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -346,10 +346,38 @@ func (w *Worker) registerAction(service, verb string, method any, compute *compu return nil } -// Start starts the worker in blocking fashion +// Start starts the worker in non-blocking fashion, returning a cleanup function and an error if the +// worker could not be started. func (w *Worker) Start() (func() error, error) { ctx, cancel := context.WithCancel(context.Background()) + go func() { + err := w.startBlocking(ctx) + + if err != nil { + // NOTE: this matches the behavior of the old worker, until we change the signature of Start + panic(err) + } + }() + + cleanup := func() error { + cancel() + + w.l.Debug().Msgf("worker %s is stopping...", w.name) + + return nil + } + + return cleanup, nil +} + +// Run starts the worker in blocking fashion, returning an error if the worker could not be started +// or if the worker stopped due to a networking issue. +func (w *Worker) Run(ctx context.Context) error { + return w.startBlocking(ctx) +} + +func (w *Worker) startBlocking(ctx context.Context) error { actionNames := []string{} for _, action := range w.actions { @@ -377,21 +405,35 @@ func (w *Worker) Start() (func() error, error) { w.id = id if err != nil { - cancel() - return nil, fmt.Errorf("could not get action listener: %w", err) + return fmt.Errorf("could not get action listener: %w", err) } - actionCh, err := listener.Actions(ctx) + defer func() { + err := listener.Unregister() + + if err != nil { + w.l.Error().Err(err).Msg("could not unregister worker") + } + }() + + listenerCtx, cancel := context.WithCancel(ctx) + defer cancel() + + actionCh, errCh, err := listener.Actions(listenerCtx) if err != nil { - cancel() - return nil, fmt.Errorf("could not get action channel: %w", err) + return fmt.Errorf("could not get action channel: %w", err) } go func() { for { select { - case action := <-actionCh: + case action, ok := <-actionCh: + if !ok { + w.l.Debug().Msgf("worker %s received action channel closed, stopping", w.name) + return + } + go func(action *client.Action) { err := w.executeAction(context.Background(), action) @@ -408,22 +450,14 @@ func (w *Worker) Start() (func() error, error) { } }() - cleanup := func() error { - cancel() - - w.l.Debug().Msgf("worker %s is stopping...", w.name) - - err := listener.Unregister() - if err != nil { - return fmt.Errorf("could not unregister worker: %w", err) - } - - w.l.Debug().Msgf("worker %s stopped", w.name) - + select { + case <-ctx.Done(): + w.l.Debug().Msgf("worker %s received context done, stopping", w.name) return nil + case err := <-errCh: + w.l.Error().Err(err).Msg("error from listener") + return err } - - return cleanup, nil } func (w *Worker) executeAction(ctx context.Context, assignedAction *client.Action) error { diff --git a/pkg/worker/worker_webhook.go b/pkg/worker/worker_webhook.go index 1789daa55..7e4a09d39 100644 --- a/pkg/worker/worker_webhook.go +++ b/pkg/worker/worker_webhook.go @@ -67,7 +67,7 @@ func (w *Worker) StartWebhook(ww WebhookWorkerOpts) (func() error, error) { return nil, fmt.Errorf("could not get action listener: %w", err) } - actionCh, err := listener.Actions(ctx) + actionCh, errCh, err := listener.Actions(ctx) if err != nil { cancel() @@ -77,6 +77,9 @@ func (w *Worker) StartWebhook(ww WebhookWorkerOpts) (func() error, error) { go func() { for { select { + case err := <-errCh: + // NOTE: this matches the behavior of the old worker, until we change the signature of the webhook workers + panic(err) case action := <-actionCh: go func(action *client.Action) { err := w.sendWebhook(context.Background(), action, ww)