diff --git a/internal/services/dispatcher/dispatcher.go b/internal/services/dispatcher/dispatcher.go index e822fdd40..8d9c251c9 100644 --- a/internal/services/dispatcher/dispatcher.go +++ b/internal/services/dispatcher/dispatcher.go @@ -437,6 +437,11 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms return fmt.Errorf("could not get worker: %w", err) } + if len(workers) == 0 { + d.l.Warn().Msgf("worker %s not found, ignoring task for step run %s", payload.WorkerId, payload.StepRunId) + return nil + } + // load the step run from the database stepRun, err := d.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId) diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index 8a1418acb..52946f1fb 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -296,7 +296,13 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream _, err = s.repo.Worker().UpdateWorkerActiveStatus(ctx, tenantId, request.WorkerId, true, sessionEstablished) if err != nil { - s.l.Error().Err(err).Msgf("could not update worker %s active status", request.WorkerId) + lastSessionEstablished := "NULL" + + if worker.LastListenerEstablished.Valid { + lastSessionEstablished = worker.LastListenerEstablished.Time.String() + } + + s.l.Error().Err(err).Msgf("could not update worker %s active status to true (session established %s, last session established %s)", request.WorkerId, sessionEstablished.String(), lastSessionEstablished) return err } @@ -323,7 +329,7 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream _, err = s.repo.Worker().UpdateWorkerActiveStatus(ctx, tenantId, request.WorkerId, false, sessionEstablished) if err != nil { - s.l.Error().Err(err).Msgf("could not update worker %s active status", request.WorkerId) + s.l.Error().Err(err).Msgf("could not update worker %s active status to false due to worker stream closing (session established %s)", request.WorkerId, sessionEstablished.String()) return err } @@ -337,7 +343,7 @@ func (s *DispatcherImpl) ListenV2(request *contracts.WorkerListenRequest, stream _, err = s.repo.Worker().UpdateWorkerActiveStatus(ctx, tenantId, request.WorkerId, false, sessionEstablished) if err != nil { - s.l.Error().Err(err).Msgf("could not update worker %s active status", request.WorkerId) + s.l.Error().Err(err).Msgf("could not update worker %s active status due to worker disconnecting (session established %s)", request.WorkerId, sessionEstablished.String()) return err }