Skip to content

Commit

Permalink
Make configurable the number of concurrent workflows (#616)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielCosme authored Jun 12, 2024
1 parent 267d1ff commit f39ebe5
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 8 deletions.
6 changes: 4 additions & 2 deletions enduro.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ disabled = true
processNameMetadata = false

[worker]
heartbeatThrottleInterval = "60s"
heartbeatThrottleInterval = "1m"
maxConcurrentWorkflowsExecutionsSize = 15
maxConcurrentSessionExecutionSize = 15

[workflow]
activityHeartbeatTimeout = "20s"
activityHeartbeatTimeout = "30s"
2 changes: 1 addition & 1 deletion internal/workflow/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func acquirePipeline(ctx temporalsdk_workflow.Context, colsvc collection.Service
// Acquire the pipeline semaphore.
{
ctx := temporalsdk_workflow.WithActivityOptions(ctx, temporalsdk_workflow.ActivityOptions{
HeartbeatTimeout: time.Minute,
HeartbeatTimeout: heartBeatTimeout,
WaitForCancellation: false,
ScheduleToStartTimeout: forever,
StartToCloseTimeout: forever,
Expand Down
13 changes: 8 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,11 @@ func main() {

done := make(chan struct{})
w := temporalsdk_worker.New(temporalClient, config.Temporal.TaskQueue, temporalsdk_worker.Options{
EnableSessionWorker: true,
MaxConcurrentSessionExecutionSize: 5000,
MaxHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval,
DefaultHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval,
EnableSessionWorker: true,
MaxConcurrentSessionExecutionSize: config.Worker.MaxConcurrentSessionExecutionSize,
MaxConcurrentWorkflowTaskExecutionSize: config.Worker.MaxConcurrentWorkflowsExecutionsSize,
MaxHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval,
DefaultHeartbeatThrottleInterval: config.Worker.HeartbeatThrottleInterval,
})
if err != nil {
logger.Error(err, "Error creating Temporal worker.")
Expand Down Expand Up @@ -402,7 +403,9 @@ type configuration struct {
}

type WorkerConfig struct {
HeartbeatThrottleInterval time.Duration
HeartbeatThrottleInterval time.Duration
MaxConcurrentSessionExecutionSize int
MaxConcurrentWorkflowsExecutionsSize int
}

func (c configuration) Validate() error {
Expand Down
51 changes: 51 additions & 0 deletions website/content/en/docs/user-manual/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,57 @@ does not include a document with checksums, e.g. `checksum.sha1`.

E.g.: `false`


### `[worker]`

#### `heartbeatThrottleInterval` (String)

Specifies the interval at which Enduro sends heartbeats to the workflow engine.

The string should be constructed as a sequence of decimal numbers, each with
optional fraction and a unit suffix, such as "30m", "24h" or "2h30m".
Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".

E.g.: `5m` (String)

#### `maxConcurrentWorkflowsExecutionsSize` (int)

Sets the maximum number of concurrent workflow executions that Enduro will
accept from the workflow engine.

A good rule of thumb is to set this value to twice the sum of the capacities
of all the configured pipelines. For example, if two pipelines are configured
with a capacity of `5` each, the value should be `20`.

E.g.: `10`

#### `maxConcurrentSessionExecutionSize` (int)

Sets the maximum number of concurrently running (workflow engine) sessions that
Enduro supports. This value governs how many concurrent SIPs are going to be
processed at any given time, regardless of pipeline
capacity. This setting can be used to throttle from a single place how many
concurrent pipelines Enduro will run.

We recommend setting this value to be directly proportional to (or higher than)
the Archivematica pipeline capacity.

E.g.: `5`

### `[worker]`

#### `activityHeartbeatTimeout` (String)

Specifies the timeout duration for activities that send heartbeats to the workflow engine.
If the activity takes more time to send a heartbeat to the workflow engine, the workflow will fail
with a `heartbeatTimeout` error.

The string should be constructed as a sequence of decimal numbers, each with
optional fraction and a unit suffix, such as "30m", "24h" or "2h30m".
Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".

E.g.: `5m` (String)

## Configuration example

Source: https://github.com/artefactual-labs/enduro/blob/main/enduro.toml.
Expand Down

0 comments on commit f39ebe5

Please sign in to comment.