Skip to content

Commit

Permalink
Feat: dynamic crons fixes (#1073)
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt authored Nov 26, 2024
1 parent 8da776f commit 2320166
Show file tree
Hide file tree
Showing 12 changed files with 565 additions and 512 deletions.
30 changes: 3 additions & 27 deletions api-contracts/openapi/paths/workflow/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -843,24 +843,12 @@ scheduledCreate:
format: uuid
minLength: 36
maxLength: 36
- description: The workflow id
- description: The workflow name
in: path
name: workflow
required: true
schema:
type: string
format: uuid
minLength: 36
maxLength: 36
- description: The workflow version. If not supplied, the latest version is fetched.
in: query
name: version
required: false
schema:
type: string
format: uuid
minLength: 36
maxLength: 36
requestBody:
content:
application/json:
Expand Down Expand Up @@ -1201,7 +1189,7 @@ crons:

cronsCreate:
post:
x-resources: ["tenant", "workflow"]
x-resources: ["tenant"]
description: Create a new cron job workflow trigger for a tenant
operationId: cron-workflow-trigger:create
parameters:
Expand All @@ -1214,24 +1202,12 @@ cronsCreate:
format: uuid
minLength: 36
maxLength: 36
- description: The workflow id
- description: The workflow name
in: path
name: workflow
required: true
schema:
type: string
format: uuid
minLength: 36
maxLength: 36
- description: The workflow version. If not supplied, the latest version is fetched.
in: query
name: version
required: false
schema:
type: string
format: uuid
minLength: 36
maxLength: 36
requestBody:
content:
application/json:
Expand Down
17 changes: 15 additions & 2 deletions api/v1/server/handlers/workflows/create_cron.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package workflows

import (
"strings"

"github.com/labstack/echo/v4"

"github.com/hatchet-dev/hatchet/api/v1/server/oas/apierrors"
"github.com/hatchet-dev/hatchet/api/v1/server/oas/gen"
"github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers"
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
)

func (t *WorkflowService) CronWorkflowTriggerCreate(ctx echo.Context, request gen.CronWorkflowTriggerCreateRequestObject) (gen.CronWorkflowTriggerCreateResponseObject, error) {
Expand All @@ -17,18 +20,28 @@ func (t *WorkflowService) CronWorkflowTriggerCreate(ctx echo.Context, request ge
return gen.CronWorkflowTriggerCreate400JSONResponse(apierrors.NewAPIErrors("cron name is required")), nil
}

workflow, err := t.config.EngineRepository.Workflow().GetWorkflowByName(ctx.Request().Context(), tenant.ID, request.Workflow)

if err != nil {
return gen.CronWorkflowTriggerCreate400JSONResponse(apierrors.NewAPIErrors("workflow not found")), nil
}

cronTrigger, err := t.config.APIRepository.Workflow().CreateCronWorkflow(
ctx.Request().Context(), tenant.ID, &repository.CreateCronWorkflowTriggerOpts{
Name: request.Body.CronName,
Cron: request.Body.CronExpression,
Input: request.Body.Input,
AdditionalMetadata: request.Body.AdditionalMetadata,
WorkflowId: request.Workflow.String(),
WorkflowId: sqlchelpers.UUIDToStr(workflow.ID),
},
)

if err != nil {
return gen.CronWorkflowTriggerCreate400JSONResponse(apierrors.NewAPIErrors(err.Error())), nil
if strings.Contains(err.Error(), "unique constraint") {
return gen.CronWorkflowTriggerCreate400JSONResponse(apierrors.NewAPIErrors("cron trigger with that name-expression pair already exists")), nil
}

return gen.CronWorkflowTriggerCreate400JSONResponse(apierrors.NewAPIErrors("error creating cron trigger")), nil
}

return gen.CronWorkflowTriggerCreate200JSONResponse(
Expand Down
9 changes: 8 additions & 1 deletion api/v1/server/handlers/workflows/create_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,23 @@ import (
"github.com/hatchet-dev/hatchet/api/v1/server/oas/transformers"
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
)

func (t *WorkflowService) ScheduledWorkflowRunCreate(ctx echo.Context, request gen.ScheduledWorkflowRunCreateRequestObject) (gen.ScheduledWorkflowRunCreateResponseObject, error) {
tenant := ctx.Get("tenant").(*db.TenantModel)

workflow, err := t.config.EngineRepository.Workflow().GetWorkflowByName(ctx.Request().Context(), tenant.ID, request.Workflow)

if err != nil {
return gen.ScheduledWorkflowRunCreate400JSONResponse(apierrors.NewAPIErrors("workflow not found")), nil
}

scheduled, err := t.config.APIRepository.Workflow().CreateScheduledWorkflow(ctx.Request().Context(), tenant.ID, &repository.CreateScheduledWorkflowRunForWorkflowOpts{
ScheduledTrigger: request.Body.TriggerAt,
Input: request.Body.Input,
AdditionalMetadata: request.Body.AdditionalMetadata,
WorkflowId: request.Workflow.String(),
WorkflowId: sqlchelpers.UUIDToStr(workflow.ID),
})

if err != nil {
Expand Down
Loading

0 comments on commit 2320166

Please sign in to comment.