From 6b7f21b69f2445832bd20e7c0531564fa193a604 Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Thu, 12 Sep 2024 15:00:08 +0200 Subject: [PATCH 1/3] correct default values --- .../adapter/c8/springboot/C8AdapterProperties.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterProperties.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterProperties.kt index 3dcc1b9..96ec36e 100644 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterProperties.kt +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterProperties.kt @@ -53,11 +53,11 @@ class C8AdapterProperties( /** * Fixed rate for scheduled user task delivery. */ - val scheduleDeliveryFixedRateInSeconds: Long = 5_000L, + val scheduleDeliveryFixedRateInSeconds: Long = 5L, /** * Fixed rate for refreshing user task delivery. */ - val subscribingDeliveryInitialDelayInSeconds: Long = 5_000L, + val subscribingDeliveryInitialDelayInSeconds: Long = 5L, /** * URL of the task list. */ From 792c7cab9ff853f9c00ebd74d6febb39512f9760 Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Thu, 12 Sep 2024 15:00:12 +0200 Subject: [PATCH 2/3] formatting --- .../adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt index e7e245d..a2ee5de 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt @@ -72,7 +72,8 @@ class SubscribingServiceTaskDelivery( // FIXME -> tenantId // FIXME -> more to setup from props return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) { - this.fetchVariables(subscription.payloadDescription!!.toList()) + this + .fetchVariables(subscription.payloadDescription!!.toList()) } else { this } From 84572817621e71c083e31d55c81e2320186f5568 Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Mon, 16 Sep 2024 16:25:26 +0200 Subject: [PATCH 3/3] improving C8 logging, add some missing aync call resolutions, improve props --- .../c7/remote/correlation/SignalApiImpl.kt | 2 +- .../c8/correlation/CorrelationApiImpl.kt | 4 + .../adapter/c8/correlation/SignalApiImpl.kt | 4 + .../adapter/c8/deploy/DeploymentApiImpl.kt | 4 + .../adapter/c8/process/StartProcessApiImpl.kt | 5 + ...TaskListClientUserTaskCompletionApiImpl.kt | 5 +- ...ebeExternalServiceTaskCompletionApiImpl.kt | 17 ++- .../C8ZeebeUserTaskCompletionApiImpl.kt | 6 + .../task/completion/FailureRetrySupplier.kt | 12 ++ .../LinearMemoryFailureRetrySupplier.kt | 16 +++ .../c8/task/delivery/PullUserTaskDelivery.kt | 56 +++++---- .../SubscribingRefreshingUserTaskDelivery.kt | 114 ++++++++++-------- .../SubscribingServiceTaskDelivery.kt | 77 ++++++------ .../springboot/C8AdapterAutoConfiguration.kt | 4 +- .../c8/springboot/C8AdapterConditions.kt | 78 ++++++++++++ .../c8/springboot/C8AdapterProperties.kt | 28 ++++- .../C8TaskListClientAutoConfiguration.kt | 8 +- .../C8ZeebeClientAutoConfiguration.kt | 62 ++++------ .../schedule/C8SchedulingAutoConfiguration.kt | 84 +++++++++++++ .../RefreshingUserTaskDeliveryBinding.kt | 35 ++++++ .../ScheduledUserTaskDeliveryBinding.kt | 37 ++++-- ...ribingRefreshingUserTaskDeliveryBinding.kt | 25 ---- .../SubscribingServiceTaskDeliveryBinding.kt | 21 ---- .../C8SubscriptionAutoConfiguration.kt | 50 ++++++++ .../SubscribingServiceTaskDeliveryBinding.kt | 24 ++++ .../SubscribingUserTaskDeliveryBinding.kt | 23 ++++ ...ot.autoconfigure.AutoConfiguration.imports | 2 + .../src/test/resources/application-itest.yml | 8 +- examples/java-c8/pom.xml | 5 - .../src/main/resources/application-saas.yml | 28 ++--- .../src/main/resources/application-sm.yml | 35 ++---- .../src/main/resources/application.yml | 27 +++++ .../java-c8/src/main/resources/banner-sm.txt | 7 ++ examples/java-common-fixture/pom.xml | 1 - .../CommonFixtureAutoconfiguration.java | 9 +- 35 files changed, 638 insertions(+), 285 deletions(-) create mode 100644 engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/FailureRetrySupplier.kt create mode 100644 engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/LinearMemoryFailureRetrySupplier.kt create mode 100644 engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/C8SchedulingAutoConfiguration.kt create mode 100644 engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/RefreshingUserTaskDeliveryBinding.kt delete mode 100644 engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/SubscribingRefreshingUserTaskDeliveryBinding.kt delete mode 100644 engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/SubscribingServiceTaskDeliveryBinding.kt create mode 100644 engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/C8SubscriptionAutoConfiguration.kt create mode 100644 engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/SubscribingServiceTaskDeliveryBinding.kt create mode 100644 engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/SubscribingUserTaskDeliveryBinding.kt create mode 100644 examples/java-c8/src/main/resources/banner-sm.txt diff --git a/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/correlation/SignalApiImpl.kt b/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/correlation/SignalApiImpl.kt index ba8c505..da6c85b 100644 --- a/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/correlation/SignalApiImpl.kt +++ b/engine-adapter/camunda-platform-7-remote-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c7/remote/correlation/SignalApiImpl.kt @@ -20,7 +20,7 @@ class SignalApiImpl( companion object: KLogging() override fun sendSignal(cmd: SendSignalCmd): Future { - logger.debug { "PROCESS-ENGINE-C7-REMOTE-002: sending signal ${cmd.signalName}." } + logger.debug { "PROCESS-ENGINE-C7-REMOTE-002: Sending signal ${cmd.signalName}." } return CompletableFuture.supplyAsync { runtimeService .createSignalEvent(cmd.signalName) diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/correlation/CorrelationApiImpl.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/correlation/CorrelationApiImpl.kt index c134904..995bc78 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/correlation/CorrelationApiImpl.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/correlation/CorrelationApiImpl.kt @@ -10,6 +10,7 @@ import dev.bpmcrafters.processengineapi.correlation.SendSignalCmd import io.camunda.zeebe.client.ZeebeClient import io.camunda.zeebe.client.api.command.PublishMessageCommandStep1.PublishMessageCommandStep2 import io.camunda.zeebe.client.api.command.PublishMessageCommandStep1.PublishMessageCommandStep3 +import mu.KLogging import java.util.concurrent.CompletableFuture import java.util.concurrent.Future @@ -17,9 +18,12 @@ class CorrelationApiImpl( private val zeebeClient: ZeebeClient ) : CorrelationApi { + companion object: KLogging() + override fun correlateMessage(cmd: CorrelateMessageCmd): Future { return CompletableFuture.supplyAsync { val correlationKey = cmd.correlation.get().correlationKey + logger.debug { "PROCESS-ENGINE-C8-001: Correlating message ${cmd.messageName} using correlation key value $correlationKey." } zeebeClient .newPublishMessageCommand() .messageName(cmd.messageName) diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/correlation/SignalApiImpl.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/correlation/SignalApiImpl.kt index 05a6ad2..007d1b7 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/correlation/SignalApiImpl.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/correlation/SignalApiImpl.kt @@ -8,6 +8,7 @@ import dev.bpmcrafters.processengineapi.correlation.SendSignalCmd import dev.bpmcrafters.processengineapi.correlation.SignalApi import io.camunda.zeebe.client.ZeebeClient import io.camunda.zeebe.client.api.command.BroadcastSignalCommandStep1 +import mu.KLogging import java.util.concurrent.CompletableFuture import java.util.concurrent.Future @@ -15,8 +16,11 @@ class SignalApiImpl( private val zeebeClient: ZeebeClient ) : SignalApi { + companion object: KLogging() + override fun sendSignal(cmd: SendSignalCmd): Future { return CompletableFuture.supplyAsync { + logger.debug { "PROCESS-ENGINE-C8-002: Sending signal ${cmd.signalName}." } zeebeClient .newBroadcastSignalCommand() .signalName(cmd.signalName) diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/deploy/DeploymentApiImpl.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/deploy/DeploymentApiImpl.kt index 47d1fe5..4c9e9ed 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/deploy/DeploymentApiImpl.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/deploy/DeploymentApiImpl.kt @@ -7,6 +7,7 @@ import dev.bpmcrafters.processengineapi.deploy.DeploymentApi import dev.bpmcrafters.processengineapi.deploy.DeploymentInformation import io.camunda.zeebe.client.ZeebeClient import io.camunda.zeebe.client.api.response.DeploymentEvent +import mu.KLogging import java.util.concurrent.CompletableFuture import java.util.concurrent.Future @@ -14,8 +15,11 @@ class DeploymentApiImpl( private val zeebeClient: ZeebeClient ) : DeploymentApi { + companion object: KLogging() + override fun deploy(cmd: DeployBundleCommand): Future { require(cmd.resources.isNotEmpty()) { "Resources must not be empty, at least one resource must be provided." } + logger.debug { "PROCESS-ENGINE-C8-003: Executing a bundle deployment with ${cmd.resources.size} resources." } val first = cmd.resources.first() return CompletableFuture.supplyAsync { zeebeClient diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/process/StartProcessApiImpl.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/process/StartProcessApiImpl.kt index 59c2359..5ad1e50 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/process/StartProcessApiImpl.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/process/StartProcessApiImpl.kt @@ -7,6 +7,7 @@ import dev.bpmcrafters.processengineapi.process.* import io.camunda.zeebe.client.ZeebeClient import io.camunda.zeebe.client.api.response.ProcessInstanceEvent import io.camunda.zeebe.client.api.response.PublishMessageResponse +import mu.KLogging import java.util.concurrent.CompletableFuture import java.util.concurrent.Future @@ -14,10 +15,13 @@ class StartProcessApiImpl( private val zeebeClient: ZeebeClient ) : StartProcessApi { + companion object: KLogging() + override fun startProcess(cmd: StartProcessCommand): Future { return when (cmd) { is StartProcessByDefinitionCmd -> CompletableFuture.supplyAsync { + logger.debug { "PROCESS-ENGINE-C8-004: Starting a new process instance by definition ${cmd.definitionKey}." } zeebeClient .newCreateInstanceCommand() .bpmnProcessId(cmd.definitionKey) @@ -29,6 +33,7 @@ class StartProcessApiImpl( } is StartProcessByMessageCmd -> CompletableFuture.supplyAsync { + logger.debug { "PROCESS-ENGINE-C8-005: Starting a new process instance by message ${cmd.messageName}." } zeebeClient .newPublishMessageCommand() .messageName(cmd.messageName) diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8TaskListClientUserTaskCompletionApiImpl.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8TaskListClientUserTaskCompletionApiImpl.kt index 869066a..bc35319 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8TaskListClientUserTaskCompletionApiImpl.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8TaskListClientUserTaskCompletionApiImpl.kt @@ -18,10 +18,11 @@ class C8TaskListClientUserTaskCompletionApiImpl( companion object : KLogging() override fun completeTask(cmd: CompleteTaskCmd): Future { - taskListClient - .completeTask(cmd.taskId, cmd.get()) + logger.debug { "PROCESS-ENGINE-C8-006: completing service task ${cmd.taskId}." } + taskListClient.completeTask(cmd.taskId, cmd.get()) subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply { termination.accept(cmd.taskId) + logger.debug { "PROCESS-ENGINE-C8-007: successfully completed service task ${cmd.taskId}." } } return CompletableFuture.completedFuture(Empty) } diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8ZeebeExternalServiceTaskCompletionApiImpl.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8ZeebeExternalServiceTaskCompletionApiImpl.kt index 2b62d6b..1f898cd 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8ZeebeExternalServiceTaskCompletionApiImpl.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8ZeebeExternalServiceTaskCompletionApiImpl.kt @@ -8,44 +8,59 @@ import dev.bpmcrafters.processengineapi.task.ServiceTaskCompletionApi import dev.bpmcrafters.processengineapi.task.FailTaskCmd import io.camunda.zeebe.client.ZeebeClient import mu.KLogging +import java.time.Duration import java.util.concurrent.CompletableFuture import java.util.concurrent.Future class C8ZeebeExternalServiceTaskCompletionApiImpl( private val zeebeClient: ZeebeClient, - private val subscriptionRepository: SubscriptionRepository + private val subscriptionRepository: SubscriptionRepository, + private val failureRetrySupplier: FailureRetrySupplier ) : ServiceTaskCompletionApi { companion object : KLogging() override fun completeTask(cmd: CompleteTaskCmd): Future { + logger.debug { "PROCESS-ENGINE-C8-008: completing service task ${cmd.taskId}." } zeebeClient .newCompleteCommand(cmd.taskId.toLong()) .variables(cmd.get()) .send() + .join() subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply { termination.accept(cmd.taskId) + logger.debug { "PROCESS-ENGINE-C8-009: successfully completed service task ${cmd.taskId}." } } return CompletableFuture.completedFuture(Empty) } override fun completeTaskByError(cmd: CompleteTaskByErrorCmd): Future { + logger.debug { "PROCESS-ENGINE-C8-008: throwing error ${cmd.errorCode} in service task ${cmd.taskId}." } zeebeClient .newThrowErrorCommand(cmd.taskId.toLong()) .errorCode(cmd.errorCode) .variables(cmd.get()) .send() + .join() subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply { + logger.debug { "PROCESS-ENGINE-C8-009: successfully thrown error ${cmd.errorCode} in service task ${cmd.taskId}." } termination.accept(cmd.taskId) } return CompletableFuture.completedFuture(Empty) } override fun failTask(cmd: FailTaskCmd): Future { + val (retries, retriesTimeout) = failureRetrySupplier.apply(cmd.taskId) zeebeClient .newFailCommand(cmd.taskId.toLong()) + .retries(retries) + .retryBackoff(Duration.ofSeconds(retriesTimeout)) + .send() + .join() + logger.debug { "PROCESS-ENGINE-C8-010: failing service task ${cmd.taskId}." } subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply { termination.accept(cmd.taskId) + logger.debug { "PROCESS-ENGINE-C8-011: successfully failed service task ${cmd.taskId}." } } return CompletableFuture.completedFuture(Empty) } diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8ZeebeUserTaskCompletionApiImpl.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8ZeebeUserTaskCompletionApiImpl.kt index 62f6361..1fa8f73 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8ZeebeUserTaskCompletionApiImpl.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/C8ZeebeUserTaskCompletionApiImpl.kt @@ -18,11 +18,14 @@ class C8ZeebeUserTaskCompletionApiImpl( companion object : KLogging() override fun completeTask(cmd: CompleteTaskCmd): Future { + logger.debug { "PROCESS-ENGINE-C8-012: completing user task ${cmd.taskId}." } zeebeClient .newCompleteCommand(cmd.taskId.toLong()) .variables(cmd.get()) .send() + .join() subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply { + logger.debug { "PROCESS-ENGINE-C8-013: successfully completed user task ${cmd.taskId}." } termination.accept(cmd.taskId) } return CompletableFuture.completedFuture(Empty) @@ -34,8 +37,11 @@ class C8ZeebeUserTaskCompletionApiImpl( .errorCode(cmd.errorCode) .variables(cmd.get()) .send() + .join() + logger.debug { "PROCESS-ENGINE-C8-013: throwing error ${cmd.errorCode} in user task ${cmd.taskId}." } subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply { termination.accept(cmd.taskId) + logger.debug { "PROCESS-ENGINE-C8-013: successfully thrown error ${cmd.errorCode} in user task ${cmd.taskId}." } } return CompletableFuture.completedFuture(Empty) } diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/FailureRetrySupplier.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/FailureRetrySupplier.kt new file mode 100644 index 0000000..611d4ef --- /dev/null +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/FailureRetrySupplier.kt @@ -0,0 +1,12 @@ +package dev.bpmcrafters.processengineapi.adapter.c8.task.completion + +import java.util.function.Function + +@FunctionalInterface +interface FailureRetrySupplier : Function { + + data class FailureRetry( + val retryCount: Int, + val retryTimeout: Long + ) +} diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/LinearMemoryFailureRetrySupplier.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/LinearMemoryFailureRetrySupplier.kt new file mode 100644 index 0000000..d57f368 --- /dev/null +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/completion/LinearMemoryFailureRetrySupplier.kt @@ -0,0 +1,16 @@ +package dev.bpmcrafters.processengineapi.adapter.c8.task.completion + +class LinearMemoryFailureRetrySupplier( + private val retry: Int, + private val retryTimeout: Long +) : FailureRetrySupplier { + + private val taskFailures: MutableMap = mutableMapOf() + + override fun apply(taskId: String): FailureRetrySupplier.FailureRetry { + val last = taskFailures.getOrPut(taskId) { FailureRetrySupplier.FailureRetry(retryCount = retry, retryTimeout = retryTimeout) } + val new = last.copy(retryCount = (last.retryCount - 1).coerceAtLeast(0)) // there must be no negative retries ever + taskFailures[taskId] = new + return new + } +} diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/PullUserTaskDelivery.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/PullUserTaskDelivery.kt index 88b5420..75528a1 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/PullUserTaskDelivery.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/PullUserTaskDelivery.kt @@ -3,53 +3,57 @@ package dev.bpmcrafters.processengineapi.adapter.c8.task.delivery import dev.bpmcrafters.processengineapi.adapter.commons.task.RefreshableDelivery import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle +import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscription import dev.bpmcrafters.processengineapi.task.TaskType import io.camunda.tasklist.CamundaTaskListClient import io.camunda.tasklist.dto.Task import io.camunda.tasklist.dto.TaskSearch import io.camunda.tasklist.dto.TaskState +import mu.KLogging class PullUserTaskDelivery( private val taskListClient: CamundaTaskListClient, private val subscriptionRepository: SubscriptionRepository ) : RefreshableDelivery { + companion object : KLogging() + override fun refresh() { val subscriptions = subscriptionRepository.getTaskSubscriptions() // FIXME -> reverse lookup for all active subscriptions // if the task is not retrieved but active subscription has a task, call modification#terminated hook + if (subscriptions.isNotEmpty()) { + logger.trace { "PROCESS-ENGINE-C8-030: pulling user tasks for subscriptions: $subscriptions" } + taskListClient.getTasks( + TaskSearch() + .forSubscriptions(subscriptions) + .setWithVariables(true) + .setState(TaskState.CREATED) // deliver only open tasks + ).forEach { task -> + subscriptions + .firstOrNull { subscription -> subscription.matches(task) } + ?.let { activeSubscription -> + subscriptionRepository.activateSubscriptionForTask(task.id, activeSubscription) - taskListClient.getTasks( - TaskSearch() - .forSubscriptions(subscriptions) - .setWithVariables(true) - .setState(TaskState.CREATED) // deliver only open tasks - ).forEach { task -> - subscriptions - .firstOrNull { subscription -> subscription.matches(task) } - ?.let { activeSubscription -> + val variablesFromTask: Map = task.variables?.associate { variable -> + variable.name to variable.value + } ?: mapOf() - subscriptionRepository.activateSubscriptionForTask(task.id, activeSubscription) + val variables = variablesFromTask.filterBySubscription(activeSubscription) - val variables : Map = if (activeSubscription.payloadDescription == null) { - task.variables - } else { - if (activeSubscription.payloadDescription!!.isEmpty()) { - task.variables?.filter { activeSubscription.payloadDescription!!.contains(it.name) } - } else { - listOf() + try { + logger.debug { "PROCESS-ENGINE-C8-031: delivering user task ${task.id}." } + activeSubscription.action.accept(task.toTaskInformation(), variables) + logger.debug { "PROCESS-ENGINE-C8-032: successfully delivered user task ${task.id}." } + } catch (e: Exception) { + logger.error { "PROCESS-ENGINE-C8-031: error delivering user task ${task.id}: ${e.message}" } + subscriptionRepository.deactivateSubscriptionForTask(taskId = task.id) } - }?.associate { variable -> - variable.name to variable.value - } ?: mapOf() - - try { - activeSubscription.action.accept(task.toTaskInformation(), variables) - } catch (e: Exception) { - subscriptionRepository.deactivateSubscriptionForTask(taskId = task.id) } - } + } + } else { + logger.trace { "PROCESS-ENGINE-C8-035: pulling user tasks disabled, no subscriptions." } } } diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingRefreshingUserTaskDelivery.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingRefreshingUserTaskDelivery.kt index 9cb7b26..47f63d6 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingRefreshingUserTaskDelivery.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingRefreshingUserTaskDelivery.kt @@ -4,6 +4,7 @@ import dev.bpmcrafters.processengineapi.adapter.c8.task.SubscribingUserTaskDeliv import dev.bpmcrafters.processengineapi.adapter.commons.task.RefreshableDelivery import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle +import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscription import dev.bpmcrafters.processengineapi.task.TaskSubscription import dev.bpmcrafters.processengineapi.task.TaskType import io.camunda.zeebe.client.ZeebeClient @@ -29,91 +30,100 @@ class SubscribingRefreshingUserTaskDelivery( private var jobWorkerRegistry: Map = emptyMap() fun subscribe() { - logger.info { "[USER TASK DELIVERY] Subscribing for user tasks." } - subscriptionRepository - .getTaskSubscriptions() - .filter { it.taskType == TaskType.USER } - .forEach { subscription -> - // this is a job to subscribe to. - zeebeClient - .newWorker() - .jobType(ZEEBE_USER_TASK) - .handler { client, job -> - if (subscription.matches(job)) { - logger.debug { "Retrieved user task ${job.key}" } - subscriptionRepository.activateSubscriptionForTask("${job.key}", subscription) - - val variables = if (subscription.payloadDescription == null) { - job.variablesAsMap - } else { - if (subscription.payloadDescription!!.isEmpty()) { - mapOf() - } else { - job.variablesAsMap.filter { subscription.payloadDescription!!.contains(it.key) } + val subscriptions = subscriptionRepository.getTaskSubscriptions() + if (subscriptions.isNotEmpty()) { + logger.trace { "PROCESS-ENGINE-C8-040: subscribing user tasks for subscriptions: $subscriptions" } + subscriptions + .filter { it.taskType == TaskType.USER } + .forEach { activeSubscription -> + // this is a job to subscribe to. + zeebeClient + .newWorker() + .jobType(ZEEBE_USER_TASK) + .handler { client, job -> + if (activeSubscription.matches(job)) { + subscriptionRepository.activateSubscriptionForTask("${job.key}", activeSubscription) + val variables = job.variablesAsMap.filterBySubscription(activeSubscription) + try { + logger.debug { "PROCESS-ENGINE-C8-041: Delivering user task ${job.key}." } + activeSubscription.action.accept(job.toTaskInformation(), variables) + logger.debug { "PROCESS-ENGINE-C8-042: Successfully delivered user task ${job.key}." } + } catch (e: Exception) { + logger.error { "PROCESS-ENGINE-C8-043: Failed to deliver user task ${job.key}: ${e.message}" } + client + .newFailCommand(job.key) + .retries(job.retries) + .send() + .join() // could not deliver + subscriptionRepository.deactivateSubscriptionForTask(taskId = "${job.key}") } + } else { + // put it back + logger.trace { "PROCESS-ENGINE-C8-044: Received user task ${job.key} not matching subscriptions, returning it." } + client + .newFailCommand(job.key) + .retries(job.retries + 1) + .send() + .join() + logger.trace { "PROCESS-ENGINE-C8-045: Successfully returned user task ${job.key} not matching subscriptions." } } - try { - subscription.action.accept(job.toTaskInformation(), variables) - - } catch (e: Exception) { - client.newFailCommand(job.key) // could not deliver - subscriptionRepository.deactivateSubscriptionForTask(taskId = "${job.key}") - } - } else { - // put it back - zeebeClient.newUpdateRetriesCommand(job).retries(job.retries + 1) - client.newFailCommand(job.key) } - } - .name(workerId) - .timeout(userTaskLockTimeoutMs * TIMEOUT_FACTOR) - .forSubscription(subscription) - // FIXME -> metrics to setup - .open() - .let { - jobWorkerRegistry + (subscription.taskDescriptionKey to it) - } - } + .name(workerId) + .timeout(userTaskLockTimeoutMs * TIMEOUT_FACTOR) + .forSubscription(activeSubscription) + // FIXME -> metrics to setup + .open() + .let { + jobWorkerRegistry + (activeSubscription.taskDescriptionKey to it) + } + } + } else { + logger.trace { "PROCESS-ENGINE-C8-046: not subscribing for user tasks, no active subscription found." } + } } override fun refresh() { - logger.trace { "[USER TASK DELIVERY] Refreshing user tasks." } - subscriptionRepository - .getDeliveredTaskIds(TaskType.USER) - .forEach { taskId -> + val subscriptions = subscriptionRepository.getDeliveredTaskIds(TaskType.USER) + logger.trace { "PROCESS-ENGINE-C8-047: refreshing user tasks for subscriptions: $subscriptions" } + if (subscriptions.isNotEmpty()) { + subscriptions.forEach { taskId -> try { - logger.trace { "Extending job $taskId..." } + logger.trace { "PROCESS-ENGINE-C8-048: Extending job timout for user task $taskId..." } zeebeClient .newUpdateTimeoutCommand(taskId.toLong()) .timeout(userTaskLockTimeoutMs) .send() .join() - logger.trace { "Extended job $taskId." } + logger.trace { "PROCESS-ENGINE-C8-049: Extended job timout for user task $taskId." } } catch (e: ClientStatusException) { when (e.statusCode) { Status.Code.NOT_FOUND -> { subscriptionRepository.getActiveSubscriptionForTask(taskId)?.let { - logger.trace { "User task is gone, sending termination to the handler." } + logger.trace { "PROCESS-ENGINE-C8-050: User task is gone, sending termination to the handler." } it.termination.accept(taskId) subscriptionRepository.deactivateSubscriptionForTask(taskId) - logger.trace { "Termination sent to handler and user task is removed." } + logger.trace { "PROCESS-ENGINE-C8-051: Termination sent to handler and user task is removed." } } } - else -> logger.error(e) { "Error extending job $taskId." } + else -> logger.error(e) { "PROCESS-ENGINE-C8-052: Error extending job $taskId, ${e.message}." } } } } + } else { + logger.trace { "PROCESS-ENGINE-C8-053: not subscribing for user tasks, no active subscription found." } + } } override fun unsubscribe(taskSubscription: TaskSubscription) { if (taskSubscription is TaskSubscriptionHandle) { - logger.debug { "Unsubscribe from user task: ${taskSubscription.taskDescriptionKey}" } + logger.debug { "PROCESS-ENGINE-C8-054: Unsubscribe from user task: ${taskSubscription.taskDescriptionKey}" } jobWorkerRegistry[taskSubscription.taskDescriptionKey]?.close() } } fun unsubscribeAll() { + logger.debug { "PROCESS-ENGINE-C8-055: Unsubscribe all user tasks." } jobWorkerRegistry.forEach { (_, job) -> job.close() } } diff --git a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt index a2ee5de..2550024 100644 --- a/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt +++ b/engine-adapter/camunda-platform-8-core/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/task/delivery/SubscribingServiceTaskDelivery.kt @@ -2,10 +2,12 @@ package dev.bpmcrafters.processengineapi.adapter.c8.task.delivery import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle +import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscription import dev.bpmcrafters.processengineapi.task.TaskType import io.camunda.zeebe.client.ZeebeClient import io.camunda.zeebe.client.api.response.ActivatedJob import io.camunda.zeebe.client.api.worker.JobWorkerBuilderStep1.JobWorkerBuilderStep3 +import mu.KLogging /** * Uses task subscription available in the repository to subscribe to zeebe. @@ -16,46 +18,49 @@ class SubscribingServiceTaskDelivery( private val workerId: String ) { + companion object: KLogging() + fun subscribe() { - subscriptionRepository - .getTaskSubscriptions() - .filter { it.taskType == TaskType.EXTERNAL } - .forEach { subscription -> - // this is a job to subscribe to. - zeebeClient - .newWorker() - .jobType(subscription.taskDescriptionKey) - .handler { client, job -> - if (subscription.matches(job)) { - subscriptionRepository.activateSubscriptionForTask("${job.key}", subscription) - val variables = if (subscription.payloadDescription == null) { - job.variablesAsMap - } else { - if (subscription.payloadDescription!!.isEmpty()) { - mapOf() - } else { - job.variablesAsMap.filter { subscription.payloadDescription!!.contains(it.key) } + val subscriptions = subscriptionRepository.getTaskSubscriptions() + if (subscriptions.isNotEmpty()) { + logger.trace { "PROCESS-ENGINE-C8-050: subscribing service tasks for subscriptions: $subscriptions" } + subscriptions + .filter { it.taskType == TaskType.EXTERNAL } + .forEach { activeSubscription -> + // this is a job to subscribe to. + zeebeClient + .newWorker() + .jobType(activeSubscription.taskDescriptionKey) + .handler { client, job -> + if (activeSubscription.matches(job)) { + subscriptionRepository.activateSubscriptionForTask("${job.key}", activeSubscription) + val variables = job.variablesAsMap.filterBySubscription(activeSubscription) + try { + logger.debug { "PROCESS-ENGINE-C8-051: Delivering service task ${job.key}." } + activeSubscription.action.accept(job.toTaskInformation(), variables) + logger.debug { "PROCESS-ENGINE-C8-052: Successfully delivered service task ${job.key}." } + } catch (e: Exception) { + logger.error { "PROCESS-ENGINE-C8-051: Failing to deliver service task ${job.key}: ${e.message}." } + client.newFailCommand(job.key).retries(job.retries).send().join() // could not deliver + subscriptionRepository.deactivateSubscriptionForTask(taskId = "${job.key}") + logger.error { "PROCESS-ENGINE-C8-052: Successfully failed to deliver service task ${job.key}: ${e.message}." } } + } else { + // put it back + // TODO: check this, is it ok to put the job this way back? + logger.trace { "PROCESS-ENGINE-C8-053: Received service task ${job.key} not matching subscriptions, returning it." } + client.newFailCommand(job.key).retries(job.retries + 1).send().join() + logger.trace { "PROCESS-ENGINE-C8-045: Successfully returned service task ${job.key} not matching subscriptions." } } - try { - subscription.action.accept(job.toTaskInformation(), variables) - - } catch (e: Exception) { - client.newFailCommand(job.key) // could not deliver - subscriptionRepository.deactivateSubscriptionForTask(taskId = "${job.key}") - } - } else { - // put it back - // TODO: check this, is it ok to put the job this way back? - zeebeClient.newUpdateRetriesCommand(job).retries(job.retries + 1) - client.newFailCommand(job.key) } - } - .name(workerId) - .forSubscription(subscription) - // FIXME -> metrics to setup - .open() - } + .name(workerId) + .forSubscription(activeSubscription) + // FIXME -> metrics to setup + .open() + } + } else { + logger.trace { "PROCESS-ENGINE-C8-050: Not subscribing service tasks for subscriptions. No subscriptions found." } + } } /* diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterAutoConfiguration.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterAutoConfiguration.kt index 5f6a56a..26c9607 100644 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterAutoConfiguration.kt +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterAutoConfiguration.kt @@ -24,9 +24,10 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Conditional import org.springframework.context.annotation.Configuration +import org.springframework.scheduling.TaskScheduler import org.springframework.scheduling.annotation.EnableScheduling +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler -@EnableScheduling @Configuration @AutoConfigureAfter( CamundaAutoConfiguration::class @@ -71,4 +72,5 @@ class C8AdapterAutoConfiguration { @Bean @ConditionalOnMissingBean fun subscriptionRepository(): SubscriptionRepository = InMemSubscriptionRepository() + } diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterConditions.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterConditions.kt index 1ba676a..491fec4 100644 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterConditions.kt +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterConditions.kt @@ -1,9 +1,13 @@ package dev.bpmcrafters.processengineapi.adapter.c8.springboot import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.Companion.DEFAULT_PREFIX +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.ServiceTaskDeliveryStrategy +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.UserTaskDeliveryStrategy +import org.springframework.boot.context.properties.bind.BindResult import org.springframework.boot.context.properties.bind.Binder import org.springframework.context.annotation.Condition import org.springframework.context.annotation.ConditionContext +import org.springframework.context.annotation.Conditional import org.springframework.core.type.AnnotatedTypeMetadata /** @@ -19,3 +23,77 @@ open class C8AdapterEnabledCondition : Condition { return false } } + +/** + * Conditions matches if the given strategy is equal to the configured one in application property: `DEFAULT_PREFIX`.userTasks.deliveryStrategy + */ +@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.PROPERTY_SETTER) +@Retention(AnnotationRetention.RUNTIME) +@MustBeDocumented +@Conditional( + OnUserTaskDeliveryStrategyCondition::class +) +annotation class ConditionalOnUserTaskDeliveryStrategy( + val strategy: UserTaskDeliveryStrategy = UserTaskDeliveryStrategy.SUBSCRIPTION_REFRESHING, +) + +internal class OnUserTaskDeliveryStrategyCondition : C8AdapterEnabledCondition() { + override fun matches(context: ConditionContext, metadata: AnnotatedTypeMetadata): Boolean { + + if (!super.matches(context, metadata)) { + return false + } + + val propertiesBindResult: BindResult = Binder.get(context.environment) + .bind(DEFAULT_PREFIX, C8AdapterProperties::class.java) + + if (propertiesBindResult.isBound) { + val properties: C8AdapterProperties = propertiesBindResult.get() + + val strategy = metadata + .getAnnotationAttributes(ConditionalOnUserTaskDeliveryStrategy::class.java.name) + ?.get(ConditionalOnUserTaskDeliveryStrategy::strategy.name) as UserTaskDeliveryStrategy + + return properties.userTasks.deliveryStrategy == strategy + } + + return false + } +} + +/** + * Conditions matches if the given strategy is equal to the configured one in application property: `DEFAULT_PREFIX`.serviceTasks.deliveryStrategy + */ +@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.PROPERTY_SETTER) +@Retention(AnnotationRetention.RUNTIME) +@MustBeDocumented +@Conditional( + OnServiceTaskDeliveryStrategyCondition::class +) +annotation class ConditionalOnServiceTaskDeliveryStrategy( + val strategy: ServiceTaskDeliveryStrategy = ServiceTaskDeliveryStrategy.SUBSCRIPTION, +) + +internal class OnServiceTaskDeliveryStrategyCondition : C8AdapterEnabledCondition() { + override fun matches(context: ConditionContext, metadata: AnnotatedTypeMetadata): Boolean { + if (!super.matches(context, metadata)) { + return false + } + + val propertiesBindResult: BindResult = Binder.get(context.environment) + .bind(DEFAULT_PREFIX, C8AdapterProperties::class.java) + + if (propertiesBindResult.isBound) { + val properties: C8AdapterProperties = propertiesBindResult.get() + + val strategy = metadata + .getAnnotationAttributes(ConditionalOnServiceTaskDeliveryStrategy::class.java.name) + ?.get(ConditionalOnServiceTaskDeliveryStrategy::strategy.name) as ServiceTaskDeliveryStrategy + + return properties.serviceTasks.deliveryStrategy == strategy + } + + return false + } +} + diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterProperties.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterProperties.kt index 96ec36e..cfee482 100644 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterProperties.kt +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8AdapterProperties.kt @@ -39,6 +39,16 @@ class C8AdapterProperties( * Default id of the worker used for the external task. */ val workerId: String, + + /** + * Number of job retries. + */ + val retries: Int = 3, + + /** + * Timeout in seconds before making a retry. + */ + val retryTimeoutInSeconds: Long = 5L ) data class UserTasks( @@ -54,10 +64,6 @@ class C8AdapterProperties( * Fixed rate for scheduled user task delivery. */ val scheduleDeliveryFixedRateInSeconds: Long = 5L, - /** - * Fixed rate for refreshing user task delivery. - */ - val subscribingDeliveryInitialDelayInSeconds: Long = 5L, /** * URL of the task list. */ @@ -91,7 +97,12 @@ class C8AdapterProperties( /** * Subscribing using zeebe job subscriptions, extending lock times. */ - SUBSCRIPTION_REFRESHING + SUBSCRIPTION_REFRESHING, + + /** + * Own strategy. + */ + CUSTOM } /** @@ -101,6 +112,11 @@ class C8AdapterProperties( /** * Subscribing using zeebe job. */ - SUBSCRIPTION + SUBSCRIPTION, + + /** + * Own strategy. + */ + CUSTOM } } diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8TaskListClientAutoConfiguration.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8TaskListClientAutoConfiguration.kt index c23c515..f31c70b 100644 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8TaskListClientAutoConfiguration.kt +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8TaskListClientAutoConfiguration.kt @@ -1,6 +1,8 @@ package dev.bpmcrafters.processengineapi.adapter.c8.springboot import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.Companion.DEFAULT_PREFIX +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.UserTaskDeliveryStrategy.SCHEDULED +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.UserTaskDeliveryStrategy.SUBSCRIPTION_REFRESHING import dev.bpmcrafters.processengineapi.adapter.c8.task.completion.C8TaskListClientUserTaskCompletionApiImpl import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.PullUserTaskDelivery import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository @@ -10,6 +12,7 @@ import org.springframework.beans.factory.annotation.Qualifier import org.springframework.boot.autoconfigure.AutoConfigureAfter import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Conditional import org.springframework.context.annotation.Configuration /** @@ -17,7 +20,7 @@ import org.springframework.context.annotation.Configuration */ @Configuration @AutoConfigureAfter(C8AdapterAutoConfiguration::class) -@ConditionalOnProperty(prefix = DEFAULT_PREFIX, name = ["enabled"], havingValue = "true", matchIfMissing = true) +@Conditional(C8AdapterEnabledCondition::class) class C8TaskListClientAutoConfiguration { @Bean("c8-user-task-completion") @@ -35,7 +38,7 @@ class C8TaskListClientAutoConfiguration { @Bean("c8-user-task-delivery") @Qualifier("c8-user-task-delivery") - @ConditionalOnProperty(prefix = DEFAULT_PREFIX, name = ["user-tasks.delivery-strategy"], havingValue = "scheduled") + @ConditionalOnUserTaskDeliveryStrategy(strategy = SCHEDULED) fun scheduledUserTaskDelivery( subscriptionRepository: SubscriptionRepository, taskListClient: CamundaTaskListClient, @@ -45,5 +48,4 @@ class C8TaskListClientAutoConfiguration { taskListClient = taskListClient ) } - } diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8ZeebeClientAutoConfiguration.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8ZeebeClientAutoConfiguration.kt index c6ff530..8b16593 100644 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8ZeebeClientAutoConfiguration.kt +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/C8ZeebeClientAutoConfiguration.kt @@ -1,12 +1,12 @@ package dev.bpmcrafters.processengineapi.adapter.c8.springboot import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.Companion.DEFAULT_PREFIX -import dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule.SubscribingRefreshingUserTaskDeliveryBinding -import dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule.ScheduledUserTaskDeliveryBinding -import dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule.SubscribingServiceTaskDeliveryBinding +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.ServiceTaskDeliveryStrategy.SUBSCRIPTION +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.UserTaskDeliveryStrategy.SUBSCRIPTION_REFRESHING import dev.bpmcrafters.processengineapi.adapter.c8.task.completion.C8ZeebeExternalServiceTaskCompletionApiImpl import dev.bpmcrafters.processengineapi.adapter.c8.task.completion.C8ZeebeUserTaskCompletionApiImpl -import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.PullUserTaskDelivery +import dev.bpmcrafters.processengineapi.adapter.c8.task.completion.FailureRetrySupplier +import dev.bpmcrafters.processengineapi.adapter.c8.task.completion.LinearMemoryFailureRetrySupplier import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingRefreshingUserTaskDelivery import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingServiceTaskDelivery import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository @@ -15,8 +15,10 @@ import dev.bpmcrafters.processengineapi.task.UserTaskCompletionApi import io.camunda.zeebe.client.ZeebeClient import org.springframework.beans.factory.annotation.Qualifier import org.springframework.boot.autoconfigure.AutoConfigureAfter +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Conditional import org.springframework.context.annotation.Configuration /** @@ -24,12 +26,21 @@ import org.springframework.context.annotation.Configuration */ @Configuration @AutoConfigureAfter(C8AdapterAutoConfiguration::class) -@ConditionalOnProperty(prefix = DEFAULT_PREFIX, name = ["enabled"], havingValue = "true", matchIfMissing = true) +@Conditional(C8AdapterEnabledCondition::class) class C8ZeebeClientAutoConfiguration { + @Bean + @ConditionalOnMissingBean + fun defaultFailureRetrySupplier(c8AdapterProperties: C8AdapterProperties): FailureRetrySupplier = + LinearMemoryFailureRetrySupplier( + retry = c8AdapterProperties.serviceTasks.retries, + retryTimeout = c8AdapterProperties.serviceTasks.retryTimeoutInSeconds + ) + + @Bean(name = ["c8-service-task-delivery"]) @Qualifier("c8-service-task-delivery") - @ConditionalOnProperty(prefix = DEFAULT_PREFIX, name = ["service-tasks.delivery-strategy"], havingValue = "subscription") + @ConditionalOnServiceTaskDeliveryStrategy(strategy = SUBSCRIPTION) fun subscribingServiceTaskDelivery( subscriptionRepository: SubscriptionRepository, zeebeClient: ZeebeClient, @@ -40,20 +51,9 @@ class C8ZeebeClientAutoConfiguration { workerId = c8AdapterProperties.serviceTasks.workerId ) - @Bean("c8-service-task-delivery-scheduler") - @ConditionalOnProperty(prefix = DEFAULT_PREFIX, name = ["service-tasks.delivery-strategy"], havingValue = "subscription") - fun subscribingServiceTaskDeliveryBinding( - @Qualifier("c8-service-task-delivery") - subscribingServiceTaskDelivery: SubscribingServiceTaskDelivery - ): SubscribingServiceTaskDeliveryBinding { - return SubscribingServiceTaskDeliveryBinding( - subscribingServiceTaskDelivery = subscribingServiceTaskDelivery - ) - } - @Bean(name = ["c8-user-task-delivery"]) @Qualifier("c8-user-task-delivery") - @ConditionalOnProperty(prefix = DEFAULT_PREFIX, name = ["user-tasks.delivery-strategy"], havingValue = "subscription_refreshing") + @ConditionalOnUserTaskDeliveryStrategy(strategy = SUBSCRIPTION_REFRESHING) fun subscribingRefreshingUserTaskDelivery( subscriptionRepository: SubscriptionRepository, zeebeClient: ZeebeClient, @@ -67,37 +67,17 @@ class C8ZeebeClientAutoConfiguration { ) } - @Bean("c8-user-task-delivery-scheduler") - @ConditionalOnProperty(prefix = DEFAULT_PREFIX, name = ["user-tasks.delivery-strategy"], havingValue = "scheduled") - fun scheduledUserTaskDeliveryBinding( - @Qualifier("c8-user-task-delivery") - pullUserTaskDelivery: PullUserTaskDelivery - ): ScheduledUserTaskDeliveryBinding { - return ScheduledUserTaskDeliveryBinding( - pullUserTaskDelivery = pullUserTaskDelivery - ) - } - - @Bean("c8-user-task-delivery-scheduler") - @ConditionalOnProperty(prefix = DEFAULT_PREFIX, name = ["user-tasks.delivery-strategy"], havingValue = "subscription_refreshing") - fun refreshingUserTaskDeliveryBinding( - @Qualifier("c8-user-task-delivery") - subscribingRefreshingUserTaskDelivery: SubscribingRefreshingUserTaskDelivery - ): SubscribingRefreshingUserTaskDeliveryBinding { - return SubscribingRefreshingUserTaskDeliveryBinding( - subscribingRefreshingUserTaskDelivery = subscribingRefreshingUserTaskDelivery - ) - } - @Bean("c8-service-task-completion") @Qualifier("c8-service-task-completion") fun externalTaskCompletionStrategy( zeebeClient: ZeebeClient, subscriptionRepository: SubscriptionRepository, + failureRetrySupplier: FailureRetrySupplier ): ServiceTaskCompletionApi = C8ZeebeExternalServiceTaskCompletionApiImpl( zeebeClient = zeebeClient, - subscriptionRepository = subscriptionRepository + subscriptionRepository = subscriptionRepository, + failureRetrySupplier = failureRetrySupplier ) @Bean("c8-user-task-completion") diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/C8SchedulingAutoConfiguration.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/C8SchedulingAutoConfiguration.kt new file mode 100644 index 0000000..b0de05a --- /dev/null +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/C8SchedulingAutoConfiguration.kt @@ -0,0 +1,84 @@ +package dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule + +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterAutoConfiguration +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterEnabledCondition +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.UserTaskDeliveryStrategy.SCHEDULED +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.UserTaskDeliveryStrategy.SUBSCRIPTION_REFRESHING +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.ConditionalOnUserTaskDeliveryStrategy +import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.PullUserTaskDelivery +import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingRefreshingUserTaskDelivery +import jakarta.annotation.PostConstruct +import mu.KLogging +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.boot.autoconfigure.AutoConfigureAfter +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Conditional +import org.springframework.context.annotation.Configuration +import org.springframework.scheduling.TaskScheduler +import org.springframework.scheduling.annotation.EnableAsync +import org.springframework.scheduling.annotation.EnableScheduling +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler + +/** + * Auto-configuration for scheduled delivery. + */ +@Configuration +@EnableScheduling +@EnableAsync +@AutoConfigureAfter(C8AdapterAutoConfiguration::class) +@Conditional(C8AdapterEnabledCondition::class) +class C8SchedulingAutoConfiguration { + + companion object : KLogging() + + @PostConstruct + fun report() { + logger.debug { "PROCESS-ENGINE-C8-202: Scheduling configuration applied." } + } + + @Bean("c8-task-scheduler") + @Qualifier("c8-task-scheduler") + @ConditionalOnMissingBean + fun taskScheduler(): TaskScheduler { + val threadPoolTaskScheduler = ThreadPoolTaskScheduler() + threadPoolTaskScheduler.poolSize = 2 // we have two schedulers, one for user tasks one for service tasks + threadPoolTaskScheduler.threadNamePrefix = "C8REMOTE-SCHEDULER-" + return threadPoolTaskScheduler + } + + @Bean("c8-user-task-delivery-scheduler") + @ConditionalOnUserTaskDeliveryStrategy(strategy = SCHEDULED) + fun scheduledUserTaskDeliveryBinding( + c8AdapterProperties: C8AdapterProperties, + @Qualifier("c8-task-scheduler") + c8TaskScheduler: TaskScheduler, + @Qualifier("c8-user-task-delivery") + pullUserTaskDelivery: PullUserTaskDelivery + ): ScheduledUserTaskDeliveryBinding { + return ScheduledUserTaskDeliveryBinding( + pullUserTaskDelivery = pullUserTaskDelivery, + c8AdapterProperties = c8AdapterProperties, + c8taskScheduler = c8TaskScheduler + ) + } + + @Bean("c8-user-task-delivery-scheduler") + @ConditionalOnUserTaskDeliveryStrategy(strategy = SUBSCRIPTION_REFRESHING) + fun refreshingUserTaskDeliveryBinding( + @Qualifier("c8-user-task-delivery") + subscribingRefreshingUserTaskDelivery: SubscribingRefreshingUserTaskDelivery, + c8AdapterProperties: C8AdapterProperties, + @Qualifier("c8-task-scheduler") + c8TaskScheduler: TaskScheduler + ): RefreshingUserTaskDeliveryBinding { + return RefreshingUserTaskDeliveryBinding( + subscribingRefreshingUserTaskDelivery = subscribingRefreshingUserTaskDelivery, + c8AdapterProperties = c8AdapterProperties, + c8taskScheduler = c8TaskScheduler + ) + } + + +} diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/RefreshingUserTaskDeliveryBinding.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/RefreshingUserTaskDeliveryBinding.kt new file mode 100644 index 0000000..aba78ed --- /dev/null +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/RefreshingUserTaskDeliveryBinding.kt @@ -0,0 +1,35 @@ +package dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule + +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties +import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingRefreshingUserTaskDelivery +import mu.KLogging +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.scheduling.TaskScheduler +import org.springframework.scheduling.annotation.SchedulingConfigurer +import org.springframework.scheduling.config.ScheduledTaskRegistrar +import java.time.Duration +import java.time.temporal.ChronoUnit + +class RefreshingUserTaskDeliveryBinding( + private val subscribingRefreshingUserTaskDelivery: SubscribingRefreshingUserTaskDelivery, + private val c8AdapterProperties: C8AdapterProperties, + @Qualifier("c8-task-scheduler") + private val c8taskScheduler: TaskScheduler +) : SchedulingConfigurer { + + companion object : KLogging() + + override fun configureTasks(taskRegistrar: ScheduledTaskRegistrar) { + taskRegistrar.setScheduler(c8taskScheduler) + taskRegistrar.addFixedRateTask( + { + logger.trace { "PROCESS-ENGINE-C7-REMOTE-104: Refreshing user tasks..." } + subscribingRefreshingUserTaskDelivery.refresh() + logger.trace { "PROCESS-ENGINE-C7-REMOTE-105: Refreshed user tasks." } + }, + Duration.of(c8AdapterProperties.userTasks.scheduleDeliveryFixedRateInSeconds, ChronoUnit.SECONDS) + ) + } + + +} diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/ScheduledUserTaskDeliveryBinding.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/ScheduledUserTaskDeliveryBinding.kt index 527b4d7..339936d 100644 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/ScheduledUserTaskDeliveryBinding.kt +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/ScheduledUserTaskDeliveryBinding.kt @@ -1,20 +1,33 @@ package dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.PullUserTaskDelivery import mu.KLogging -import org.springframework.scheduling.annotation.Scheduled +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.scheduling.TaskScheduler +import org.springframework.scheduling.annotation.SchedulingConfigurer +import org.springframework.scheduling.config.ScheduledTaskRegistrar +import java.time.Duration +import java.time.temporal.ChronoUnit -class ScheduledUserTaskDeliveryBinding( - private val pullUserTaskDelivery: PullUserTaskDelivery -) { +open class ScheduledUserTaskDeliveryBinding( + private val pullUserTaskDelivery: PullUserTaskDelivery, + private val c8AdapterProperties: C8AdapterProperties, + @Qualifier("c8-task-scheduler") + private val c8taskScheduler: TaskScheduler +) : SchedulingConfigurer { - companion object : KLogging() - - @Scheduled(fixedRateString = "\${dev.bpm-crafters.process-api.adapter.c8.user-tasks.schedule-delivery-fixed-rate-in-seconds}") - fun scheduleUserTaskDelivery() { - logger.trace { "[SCHEDULER]: Delivering user tasks..." } - pullUserTaskDelivery.refresh() - logger.trace { "[SCHEDULER]: Delivered user tasks." } - } + companion object : KLogging() + override fun configureTasks(taskRegistrar: ScheduledTaskRegistrar) { + taskRegistrar.setScheduler(c8taskScheduler) + taskRegistrar.addFixedRateTask( + { + logger.trace { "PROCESS-ENGINE-C7-REMOTE-106: Delivering user tasks..." } + pullUserTaskDelivery.refresh() + logger.trace { "PROCESS-ENGINE-C7-REMOTE-107: Delivered user tasks." } + }, + Duration.of(c8AdapterProperties.userTasks.scheduleDeliveryFixedRateInSeconds, ChronoUnit.SECONDS) + ) + } } diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/SubscribingRefreshingUserTaskDeliveryBinding.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/SubscribingRefreshingUserTaskDeliveryBinding.kt deleted file mode 100644 index 7c76121..0000000 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/SubscribingRefreshingUserTaskDeliveryBinding.kt +++ /dev/null @@ -1,25 +0,0 @@ -package dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule - -import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingRefreshingUserTaskDelivery -import mu.KLogging -import org.springframework.scheduling.annotation.Scheduled - -class SubscribingRefreshingUserTaskDeliveryBinding( - private val subscribingRefreshingUserTaskDelivery: SubscribingRefreshingUserTaskDelivery -) { - - companion object : KLogging() - - @Scheduled(fixedRateString = "\${dev.bpm-crafters.process-api.adapter.c8.user-tasks.schedule-delivery-fixed-rate-in-seconds}") - fun scheduleUserTaskDelivery() { - logger.trace { "[SCHEDULER]: Refreshing user tasks..." } - subscribingRefreshingUserTaskDelivery.refresh() - logger.trace { "[SCHEDULER]: Refreshing user tasks." } - } - - @Scheduled(initialDelayString = "\${dev.bpm-crafters.process-api.adapter.c8.user-tasks.subscribing-delivery-initial-delay-in-seconds}", fixedDelay = Long.MAX_VALUE - 1000) - fun scheduleTaskSubscription() { - subscribingRefreshingUserTaskDelivery.subscribe() - } - -} diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/SubscribingServiceTaskDeliveryBinding.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/SubscribingServiceTaskDeliveryBinding.kt deleted file mode 100644 index 76a2545..0000000 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/schedule/SubscribingServiceTaskDeliveryBinding.kt +++ /dev/null @@ -1,21 +0,0 @@ -package dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule - -import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingServiceTaskDelivery -import mu.KLogging -import org.springframework.scheduling.annotation.Scheduled - -class SubscribingServiceTaskDeliveryBinding( - private val subscribingServiceTaskDelivery: SubscribingServiceTaskDelivery -) { - - companion object : KLogging() - - @Scheduled( - initialDelayString = "\${dev.bpm-crafters.process-api.adapter.c8.service-tasks.subscribing-delivery-initial-delay-in-seconds}", - fixedDelay = Long.MAX_VALUE - 1000 - ) - fun scheduleTaskSubscription() { - subscribingServiceTaskDelivery.subscribe() - } - -} diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/C8SubscriptionAutoConfiguration.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/C8SubscriptionAutoConfiguration.kt new file mode 100644 index 0000000..2225284 --- /dev/null +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/C8SubscriptionAutoConfiguration.kt @@ -0,0 +1,50 @@ +package dev.bpmcrafters.processengineapi.adapter.c8.springboot.subscription + +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterAutoConfiguration +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterEnabledCondition +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.ServiceTaskDeliveryStrategy.SUBSCRIPTION +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterProperties.UserTaskDeliveryStrategy.SUBSCRIPTION_REFRESHING +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.ConditionalOnServiceTaskDeliveryStrategy +import dev.bpmcrafters.processengineapi.adapter.c8.springboot.ConditionalOnUserTaskDeliveryStrategy +import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingRefreshingUserTaskDelivery +import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingServiceTaskDelivery +import jakarta.annotation.PostConstruct +import mu.KLogging +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.boot.autoconfigure.AutoConfigureAfter +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Conditional + +@AutoConfigureAfter(C8AdapterAutoConfiguration::class) +@Conditional(C8AdapterEnabledCondition::class) +class C8SubscriptionAutoConfiguration { + + companion object: KLogging() + + @PostConstruct + fun report() { + logger.debug { "PROCESS-ENGINE-C8-203: Subscription configuration applied." } + } + + @Bean("c8-service-task-delivery-subscription") + @ConditionalOnServiceTaskDeliveryStrategy(strategy = SUBSCRIPTION) + fun subscribingServiceTaskDeliveryBinding( + @Qualifier("c8-service-task-delivery") + subscribingServiceTaskDelivery: SubscribingServiceTaskDelivery + ): SubscribingServiceTaskDeliveryBinding { + return SubscribingServiceTaskDeliveryBinding( + subscribingServiceTaskDelivery = subscribingServiceTaskDelivery + ) + } + + @Bean("c8-user-task-delivery-subscription") + @ConditionalOnUserTaskDeliveryStrategy(strategy = SUBSCRIPTION_REFRESHING) + fun subscribingUserTaskDeliveryBinding( + @Qualifier("c8-user-task-delivery") + subscribingRefreshingUserTaskDelivery: SubscribingRefreshingUserTaskDelivery, + ): SubscribingUserTaskDeliveryBinding { + return SubscribingUserTaskDeliveryBinding( + subscribingRefreshingUserTaskDelivery = subscribingRefreshingUserTaskDelivery + ) + } +} diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/SubscribingServiceTaskDeliveryBinding.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/SubscribingServiceTaskDeliveryBinding.kt new file mode 100644 index 0000000..1526adb --- /dev/null +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/SubscribingServiceTaskDeliveryBinding.kt @@ -0,0 +1,24 @@ +package dev.bpmcrafters.processengineapi.adapter.c8.springboot.subscription + +import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingServiceTaskDelivery +import mu.KLogging +import org.springframework.boot.context.event.ApplicationStartedEvent +import org.springframework.context.ApplicationEvent +import org.springframework.context.event.EventListener +import org.springframework.scheduling.annotation.Async + +open class SubscribingServiceTaskDeliveryBinding( + private val subscribingServiceTaskDelivery: SubscribingServiceTaskDelivery +) { + + companion object : KLogging() + + @EventListener + @Async + open fun scheduleTaskSubscription(event: ApplicationStartedEvent) { + logger.trace { "PROCESS-ENGINE-C8-100: Subscribing to service tasks..." } + subscribingServiceTaskDelivery.subscribe() + logger.trace { "PROCESS-ENGINE-C8-101: Subscribed to service tasks." } + } + +} diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/SubscribingUserTaskDeliveryBinding.kt b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/SubscribingUserTaskDeliveryBinding.kt new file mode 100644 index 0000000..4fee1c9 --- /dev/null +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/kotlin/dev/bpmcrafters/processengineapi/adapter/c8/springboot/subscription/SubscribingUserTaskDeliveryBinding.kt @@ -0,0 +1,23 @@ +package dev.bpmcrafters.processengineapi.adapter.c8.springboot.subscription + +import dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingRefreshingUserTaskDelivery +import mu.KLogging +import org.springframework.boot.context.event.ApplicationStartedEvent +import org.springframework.context.event.EventListener +import org.springframework.scheduling.annotation.Async + +open class SubscribingUserTaskDeliveryBinding( + private val subscribingRefreshingUserTaskDelivery: SubscribingRefreshingUserTaskDelivery +) { + + companion object : KLogging() + + @EventListener + @Async + open fun scheduleUserTaskSubscription(event: ApplicationStartedEvent) { + logger.trace { "PROCESS-ENGINE-C8-102: Subscribing to user tasks..." } + subscribingRefreshingUserTaskDelivery.subscribe() + logger.trace { "PROCESS-ENGINE-C8-103: Subscribed to user tasks." } + } + +} diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 350eae6..98e95c2 100644 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,3 +1,5 @@ dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8AdapterAutoConfiguration dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8ZeebeClientAutoConfiguration dev.bpmcrafters.processengineapi.adapter.c8.springboot.C8TaskListClientAutoConfiguration +dev.bpmcrafters.processengineapi.adapter.c8.springboot.subscription.C8SubscriptionAutoConfiguration +dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule.C8SchedulingAutoConfiguration diff --git a/engine-adapter/camunda-platform-8-spring-boot-starter/src/test/resources/application-itest.yml b/engine-adapter/camunda-platform-8-spring-boot-starter/src/test/resources/application-itest.yml index 42a4005..6e0b9c1 100644 --- a/engine-adapter/camunda-platform-8-spring-boot-starter/src/test/resources/application-itest.yml +++ b/engine-adapter/camunda-platform-8-spring-boot-starter/src/test/resources/application-itest.yml @@ -9,12 +9,10 @@ dev: c8: enabled: true service-tasks: - delivery-strategy: subscription + delivery-strategy: SUBSCRIPTION worker-id: execute-action-external - subscribing-delivery-initial-delay-in-seconds: 1000 user-tasks: - delivery-strategy: subscription_refreshing # or scheduled - schedule-delivery-fixed-rate-in-seconds: 100000 + delivery-strategy: SUBSCRIPTION_REFRESHING # or scheduled + schedule-delivery-fixed-rate-in-seconds: 10 tasklist-url: http://localhost - subscribing-delivery-initial-delay-in-seconds: 100000 completion-strategy: job # or tasklist diff --git a/examples/java-c8/pom.xml b/examples/java-c8/pom.xml index 6602b01..e512ca3 100644 --- a/examples/java-c8/pom.xml +++ b/examples/java-c8/pom.xml @@ -67,11 +67,6 @@ - - org.springframework.boot - spring-boot-starter-web - - org.springdoc springdoc-openapi-starter-webmvc-ui diff --git a/examples/java-c8/src/main/resources/application-saas.yml b/examples/java-c8/src/main/resources/application-saas.yml index 36ae806..36f4868 100644 --- a/examples/java-c8/src/main/resources/application-saas.yml +++ b/examples/java-c8/src/main/resources/application-saas.yml @@ -2,24 +2,6 @@ spring: application: name: Java Camunda Platform 8 SaaS -dev: - bpm-crafters: - process-api: - adapter: - c8: - enabled: true - service-tasks: - delivery-strategy: subscription - worker-id: worker - subscribing-delivery-initial-delay-in-seconds: 1000 - user-tasks: - delivery-strategy: subscription_refreshing - completion-strategy: job - subscribing-delivery-initial-delay-in-seconds: 1000 - schedule-delivery-fixed-rate-in-seconds: 2000 # every 2 seconds - tasklist-url: https://${zeebe.client.cloud.region}.tasklist.camunda.io/${zeebe.client.cloud.clusterId} - fixed-rate-refresh-rate: 2000 # every 2 seconds - zeebe: client: connection-mode: CLOUD # CLOUD for SaaS OR ADDRESS for Self-Managed @@ -29,6 +11,10 @@ zeebe: clientId: ${ZEEBE_CLIENT_ID} clientSecret: ${ZEEBE_CLIENT_SECRET} -logging: - level: - io.camunda.zeebe.client.impl: ERROR +dev: + bpm-crafters: + process-api: + adapter: + c8: + user-tasks: + tasklist-url: https://${zeebe.client.cloud.region}.tasklist.camunda.io/${zeebe.client.cloud.clusterId} diff --git a/examples/java-c8/src/main/resources/application-sm.yml b/examples/java-c8/src/main/resources/application-sm.yml index b1e7188..fa24cda 100644 --- a/examples/java-c8/src/main/resources/application-sm.yml +++ b/examples/java-c8/src/main/resources/application-sm.yml @@ -1,25 +1,9 @@ spring: + banner: + location: banner-sm.txt application: name: Java Camunda Platform 8 Local Self-Managed -dev: - bpm-crafters: - process-api: - adapter: - c8: - enabled: true - service-tasks: - delivery-strategy: subscription - worker-id: worker - subscribing-delivery-initial-delay-in-seconds: 1000 - user-tasks: - delivery-strategy: subscription_refreshing - subscribing-delivery-initial-delay-in-seconds: 1000 - schedule-delivery-fixed-rate-in-seconds: 2000 # every 2 seconds - tasklist-url: http://localhost:8082 - fixed-rate-refresh-rate: 2000 # every 2 seconds - completion-strategy: job - camunda: client: mode: simple @@ -29,11 +13,12 @@ camunda: zeebe: base-url: http://127.0.0.1:26500 tasklist: - base-url: ${dev.bpm-crafters.process-api.adapter.c8.user-tasks.tasklist-url} + base-url: http://localhost:8082 - -logging: - level: - io.camunda.zeebe.client.impl: ERROR - dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule: DEBUG - dev.bpmcrafters.processengineapi.adapter.c8.task.delivery: DEBUG +dev: + bpm-crafters: + process-api: + adapter: + c8: + user-tasks: + tasklist-url: http://localhost:8082 diff --git a/examples/java-c8/src/main/resources/application.yml b/examples/java-c8/src/main/resources/application.yml index 7f96e04..44a88dd 100644 --- a/examples/java-c8/src/main/resources/application.yml +++ b/examples/java-c8/src/main/resources/application.yml @@ -4,3 +4,30 @@ server: springdoc: swagger-ui: try-it-out-enabled: true + packages-to-scan: + - dev.bpmcrafters.example.common.adapter.in.rest + +dev: + bpm-crafters: + process-api: + adapter: + c8: + enabled: true + service-tasks: + delivery-strategy: subscription + worker-id: example-worker + user-tasks: + delivery-strategy: subscription_refreshing + completion-strategy: job + schedule-delivery-fixed-rate-in-seconds: 5 + +logging: + level: + io.camunda.zeebe.client.impl: ERROR + # Make Bean Postprocessor Warnings quiet: FIXME -> double-check them + org.springframework.context.support.PostProcessorRegistrationDelegate: ERROR + # Process Engine API + dev.bpmcrafters: DEBUG + dev.bpmcrafters.processengineapi.adapter.c8.task.delivery.SubscribingServiceTaskDelivery: TRACE + dev.bpmcrafters.processengineapi.adapter.c8.springboot.schedule: DEBUG + dev.bpmcrafters.processengineapi.adapter.c8.task.delivery: DEBUG diff --git a/examples/java-c8/src/main/resources/banner-sm.txt b/examples/java-c8/src/main/resources/banner-sm.txt new file mode 100644 index 0000000..364ff91 --- /dev/null +++ b/examples/java-c8/src/main/resources/banner-sm.txt @@ -0,0 +1,7 @@ +=============================================================================== + +C8 Example Application (Self Managed) + + Open API: http://localhost:${server.port}/swagger-ui/index.html + +=============================================================================== diff --git a/examples/java-common-fixture/pom.xml b/examples/java-common-fixture/pom.xml index 30632db..0e1a030 100644 --- a/examples/java-common-fixture/pom.xml +++ b/examples/java-common-fixture/pom.xml @@ -39,7 +39,6 @@ org.springframework.boot spring-boot-starter-web - provided diff --git a/examples/java-common-fixture/src/main/java/dev/bpmcrafters/example/common/CommonFixtureAutoconfiguration.java b/examples/java-common-fixture/src/main/java/dev/bpmcrafters/example/common/CommonFixtureAutoconfiguration.java index b10dd16..87caf7b 100644 --- a/examples/java-common-fixture/src/main/java/dev/bpmcrafters/example/common/CommonFixtureAutoconfiguration.java +++ b/examples/java-common-fixture/src/main/java/dev/bpmcrafters/example/common/CommonFixtureAutoconfiguration.java @@ -1,11 +1,18 @@ package dev.bpmcrafters.example.common; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @Configuration @ComponentScan -@ConditionalOnProperty(prefix = "dev.bpm-crafters.process-api", name = "adapter") +@Slf4j public class CommonFixtureAutoconfiguration { + + @PostConstruct + public void report() { + log.info("[EXAMPLE] Started common example fixture actor configuration"); + } }