Skip to content

Commit

Permalink
fix: prevent infinite reassign loop (#1028)
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 authored Nov 7, 2024
1 parent c531c36 commit 780496e
Show file tree
Hide file tree
Showing 15 changed files with 765 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ interface MiniMapProps {

export const MiniMap: React.FC<MiniMapProps> = ({ shape, onClick }) => {
return (
<div className={cn(hasChildSteps(shape) && 'pb-12')}>
<div className={cn('grow', hasChildSteps(shape) && 'pb-12')}>
{shape.jobRuns?.map(({ job, stepRuns }, idx) => {
const steps = job?.steps;

Expand Down Expand Up @@ -106,7 +106,7 @@ export const JobMiniMap: React.FC<JobMiniMapProps> = ({
{columns.map((column, colIndex) => (
<div
key={colIndex}
className="flex flex-col justify-start h-full w-fit"
className="flex flex-col justify-start h-full min-w-fit grow"
>
{column.map((step) => {
const stepRun = normalizedStepRunsByStepId[step.metadata.id];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export default memo(({ data }: { data: StepRunNodeProps }) => {
const stepRun = data.stepRun;

return (
<div className="flex flex-col justify-start min-w-fit">
<div className="flex flex-col justify-start min-w-fit grow">
{(variant == 'default' || variant == 'input_only') && (
<Handle
type="target"
Expand Down
24 changes: 22 additions & 2 deletions internal/services/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (

"github.com/go-co-op/gocron/v2"
"github.com/goccy/go-json"
"github.com/hashicorp/go-multierror"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"

"github.com/hatchet-dev/hatchet/internal/cel"
"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/datautils/merge"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/queueutils"
"github.com/hatchet-dev/hatchet/internal/services/partition"
"github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils"
"github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes"
Expand Down Expand Up @@ -665,13 +667,31 @@ func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tena
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-reassign")
defer span.End()

_, err := ec.repo.StepRun().ListStepRunsToReassign(ctx, tenantId)
_, stepRunsToFail, err := ec.repo.StepRun().ListStepRunsToReassign(ctx, tenantId)

if err != nil {
return fmt.Errorf("could not list step runs to reassign for tenant %s: %w", tenantId, err)
}

return nil
return queueutils.BatchConcurrent(50, stepRunsToFail, func(stepRuns []*dbsqlc.GetStepRunForEngineRow) error {
var innerErr error

for _, stepRun := range stepRuns {
err := ec.failStepRun(
ctx,
tenantId,
sqlchelpers.UUIDToStr(stepRun.SRID),
"Worker has become inactive, and we exhausted all retries.",
time.Now(),
)

if err != nil {
innerErr = multierror.Append(innerErr, err)
}
}

return innerErr
})
}

func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId, stepRunId string, isRetry bool) error {
Expand Down
Loading

0 comments on commit 780496e

Please sign in to comment.