Skip to content

Commit

Permalink
Merge pull request #81 from bpm-crafters/feature/improving_c8
Browse files Browse the repository at this point in the history
Iimproving C8 Adapter
  • Loading branch information
zambrovski authored Sep 17, 2024
2 parents 49c1058 + 8457281 commit b7ff87f
Show file tree
Hide file tree
Showing 35 changed files with 641 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SignalApiImpl(
companion object: KLogging()

override fun sendSignal(cmd: SendSignalCmd): Future<Empty> {
logger.debug { "PROCESS-ENGINE-C7-REMOTE-002: sending signal ${cmd.signalName}." }
logger.debug { "PROCESS-ENGINE-C7-REMOTE-002: Sending signal ${cmd.signalName}." }
return CompletableFuture.supplyAsync {
runtimeService
.createSignalEvent(cmd.signalName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@ import dev.bpmcrafters.processengineapi.correlation.SendSignalCmd
import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.api.command.PublishMessageCommandStep1.PublishMessageCommandStep2
import io.camunda.zeebe.client.api.command.PublishMessageCommandStep1.PublishMessageCommandStep3
import mu.KLogging
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

class CorrelationApiImpl(
private val zeebeClient: ZeebeClient
) : CorrelationApi {

companion object: KLogging()

override fun correlateMessage(cmd: CorrelateMessageCmd): Future<Empty> {
return CompletableFuture.supplyAsync {
val correlationKey = cmd.correlation.get().correlationKey
logger.debug { "PROCESS-ENGINE-C8-001: Correlating message ${cmd.messageName} using correlation key value $correlationKey." }
zeebeClient
.newPublishMessageCommand()
.messageName(cmd.messageName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ import dev.bpmcrafters.processengineapi.correlation.SendSignalCmd
import dev.bpmcrafters.processengineapi.correlation.SignalApi
import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.api.command.BroadcastSignalCommandStep1
import mu.KLogging
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

class SignalApiImpl(
private val zeebeClient: ZeebeClient
) : SignalApi {

companion object: KLogging()

override fun sendSignal(cmd: SendSignalCmd): Future<Empty> {
return CompletableFuture.supplyAsync {
logger.debug { "PROCESS-ENGINE-C8-002: Sending signal ${cmd.signalName}." }
zeebeClient
.newBroadcastSignalCommand()
.signalName(cmd.signalName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@ import dev.bpmcrafters.processengineapi.deploy.DeploymentApi
import dev.bpmcrafters.processengineapi.deploy.DeploymentInformation
import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.api.response.DeploymentEvent
import mu.KLogging
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

class DeploymentApiImpl(
private val zeebeClient: ZeebeClient
) : DeploymentApi {

companion object: KLogging()

override fun deploy(cmd: DeployBundleCommand): Future<DeploymentInformation> {
require(cmd.resources.isNotEmpty()) { "Resources must not be empty, at least one resource must be provided." }
logger.debug { "PROCESS-ENGINE-C8-003: Executing a bundle deployment with ${cmd.resources.size} resources." }
val first = cmd.resources.first()
return CompletableFuture.supplyAsync {
zeebeClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ import dev.bpmcrafters.processengineapi.process.*
import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent
import io.camunda.zeebe.client.api.response.PublishMessageResponse
import mu.KLogging
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

class StartProcessApiImpl(
private val zeebeClient: ZeebeClient
) : StartProcessApi {

companion object: KLogging()

override fun startProcess(cmd: StartProcessCommand): Future<ProcessInformation> {
return when (cmd) {
is StartProcessByDefinitionCmd ->
CompletableFuture.supplyAsync {
logger.debug { "PROCESS-ENGINE-C8-004: Starting a new process instance by definition ${cmd.definitionKey}." }
zeebeClient
.newCreateInstanceCommand()
.bpmnProcessId(cmd.definitionKey)
Expand All @@ -29,6 +33,7 @@ class StartProcessApiImpl(
}
is StartProcessByMessageCmd ->
CompletableFuture.supplyAsync {
logger.debug { "PROCESS-ENGINE-C8-005: Starting a new process instance by message ${cmd.messageName}." }
zeebeClient
.newPublishMessageCommand()
.messageName(cmd.messageName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ class C8TaskListClientUserTaskCompletionApiImpl(
companion object : KLogging()

override fun completeTask(cmd: CompleteTaskCmd): Future<Empty> {
taskListClient
.completeTask(cmd.taskId, cmd.get())
logger.debug { "PROCESS-ENGINE-C8-006: completing service task ${cmd.taskId}." }
taskListClient.completeTask(cmd.taskId, cmd.get())
subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply {
termination.accept(cmd.taskId)
logger.debug { "PROCESS-ENGINE-C8-007: successfully completed service task ${cmd.taskId}." }
}
return CompletableFuture.completedFuture(Empty)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,59 @@ import dev.bpmcrafters.processengineapi.task.ServiceTaskCompletionApi
import dev.bpmcrafters.processengineapi.task.FailTaskCmd
import io.camunda.zeebe.client.ZeebeClient
import mu.KLogging
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

class C8ZeebeExternalServiceTaskCompletionApiImpl(
private val zeebeClient: ZeebeClient,
private val subscriptionRepository: SubscriptionRepository
private val subscriptionRepository: SubscriptionRepository,
private val failureRetrySupplier: FailureRetrySupplier
) : ServiceTaskCompletionApi {

companion object : KLogging()

override fun completeTask(cmd: CompleteTaskCmd): Future<Empty> {
logger.debug { "PROCESS-ENGINE-C8-008: completing service task ${cmd.taskId}." }
zeebeClient
.newCompleteCommand(cmd.taskId.toLong())
.variables(cmd.get())
.send()
.join()
subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply {
termination.accept(cmd.taskId)
logger.debug { "PROCESS-ENGINE-C8-009: successfully completed service task ${cmd.taskId}." }
}
return CompletableFuture.completedFuture(Empty)
}

override fun completeTaskByError(cmd: CompleteTaskByErrorCmd): Future<Empty> {
logger.debug { "PROCESS-ENGINE-C8-008: throwing error ${cmd.errorCode} in service task ${cmd.taskId}." }
zeebeClient
.newThrowErrorCommand(cmd.taskId.toLong())
.errorCode(cmd.errorCode)
.variables(cmd.get())
.send()
.join()
subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply {
logger.debug { "PROCESS-ENGINE-C8-009: successfully thrown error ${cmd.errorCode} in service task ${cmd.taskId}." }
termination.accept(cmd.taskId)
}
return CompletableFuture.completedFuture(Empty)
}

override fun failTask(cmd: FailTaskCmd): Future<Empty> {
val (retries, retriesTimeout) = failureRetrySupplier.apply(cmd.taskId)
zeebeClient
.newFailCommand(cmd.taskId.toLong())
.retries(retries)
.retryBackoff(Duration.ofSeconds(retriesTimeout))
.send()
.join()
logger.debug { "PROCESS-ENGINE-C8-010: failing service task ${cmd.taskId}." }
subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply {
termination.accept(cmd.taskId)
logger.debug { "PROCESS-ENGINE-C8-011: successfully failed service task ${cmd.taskId}." }
}
return CompletableFuture.completedFuture(Empty)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ class C8ZeebeUserTaskCompletionApiImpl(
companion object : KLogging()

override fun completeTask(cmd: CompleteTaskCmd): Future<Empty> {
logger.debug { "PROCESS-ENGINE-C8-012: completing user task ${cmd.taskId}." }
zeebeClient
.newCompleteCommand(cmd.taskId.toLong())
.variables(cmd.get())
.send()
.join()
subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply {
logger.debug { "PROCESS-ENGINE-C8-013: successfully completed user task ${cmd.taskId}." }
termination.accept(cmd.taskId)
}
return CompletableFuture.completedFuture(Empty)
Expand All @@ -34,8 +37,11 @@ class C8ZeebeUserTaskCompletionApiImpl(
.errorCode(cmd.errorCode)
.variables(cmd.get())
.send()
.join()
logger.debug { "PROCESS-ENGINE-C8-013: throwing error ${cmd.errorCode} in user task ${cmd.taskId}." }
subscriptionRepository.deactivateSubscriptionForTask(cmd.taskId)?.apply {
termination.accept(cmd.taskId)
logger.debug { "PROCESS-ENGINE-C8-013: successfully thrown error ${cmd.errorCode} in user task ${cmd.taskId}." }
}
return CompletableFuture.completedFuture(Empty)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package dev.bpmcrafters.processengineapi.adapter.c8.task.completion

import java.util.function.Function

@FunctionalInterface
interface FailureRetrySupplier : Function<String, FailureRetrySupplier.FailureRetry> {

data class FailureRetry(
val retryCount: Int,
val retryTimeout: Long
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package dev.bpmcrafters.processengineapi.adapter.c8.task.completion

class LinearMemoryFailureRetrySupplier(
private val retry: Int,
private val retryTimeout: Long
) : FailureRetrySupplier {

private val taskFailures: MutableMap<String, FailureRetrySupplier.FailureRetry> = mutableMapOf()

override fun apply(taskId: String): FailureRetrySupplier.FailureRetry {
val last = taskFailures.getOrPut(taskId) { FailureRetrySupplier.FailureRetry(retryCount = retry, retryTimeout = retryTimeout) }
val new = last.copy(retryCount = (last.retryCount - 1).coerceAtLeast(0)) // there must be no negative retries ever
taskFailures[taskId] = new
return new
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,57 @@ package dev.bpmcrafters.processengineapi.adapter.c8.task.delivery
import dev.bpmcrafters.processengineapi.adapter.commons.task.RefreshableDelivery
import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle
import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscription
import dev.bpmcrafters.processengineapi.task.TaskType
import io.camunda.tasklist.CamundaTaskListClient
import io.camunda.tasklist.dto.Task
import io.camunda.tasklist.dto.TaskSearch
import io.camunda.tasklist.dto.TaskState
import mu.KLogging

class PullUserTaskDelivery(
private val taskListClient: CamundaTaskListClient,
private val subscriptionRepository: SubscriptionRepository
) : RefreshableDelivery {

companion object : KLogging()

override fun refresh() {
val subscriptions = subscriptionRepository.getTaskSubscriptions()

// FIXME -> reverse lookup for all active subscriptions
// if the task is not retrieved but active subscription has a task, call modification#terminated hook
if (subscriptions.isNotEmpty()) {
logger.trace { "PROCESS-ENGINE-C8-030: pulling user tasks for subscriptions: $subscriptions" }
taskListClient.getTasks(
TaskSearch()
.forSubscriptions(subscriptions)
.setWithVariables(true)
.setState(TaskState.CREATED) // deliver only open tasks
).forEach { task ->
subscriptions
.firstOrNull { subscription -> subscription.matches(task) }
?.let { activeSubscription ->
subscriptionRepository.activateSubscriptionForTask(task.id, activeSubscription)

taskListClient.getTasks(
TaskSearch()
.forSubscriptions(subscriptions)
.setWithVariables(true)
.setState(TaskState.CREATED) // deliver only open tasks
).forEach { task ->
subscriptions
.firstOrNull { subscription -> subscription.matches(task) }
?.let { activeSubscription ->
val variablesFromTask: Map<String, Any> = task.variables?.associate { variable ->
variable.name to variable.value
} ?: mapOf()

subscriptionRepository.activateSubscriptionForTask(task.id, activeSubscription)
val variables = variablesFromTask.filterBySubscription(activeSubscription)

val variables : Map<String, Any> = if (activeSubscription.payloadDescription == null) {
task.variables
} else {
if (activeSubscription.payloadDescription!!.isEmpty()) {
task.variables?.filter { activeSubscription.payloadDescription!!.contains(it.name) }
} else {
listOf()
try {
logger.debug { "PROCESS-ENGINE-C8-031: delivering user task ${task.id}." }
activeSubscription.action.accept(task.toTaskInformation(), variables)
logger.debug { "PROCESS-ENGINE-C8-032: successfully delivered user task ${task.id}." }
} catch (e: Exception) {
logger.error { "PROCESS-ENGINE-C8-031: error delivering user task ${task.id}: ${e.message}" }
subscriptionRepository.deactivateSubscriptionForTask(taskId = task.id)
}
}?.associate { variable ->
variable.name to variable.value
} ?: mapOf()

try {
activeSubscription.action.accept(task.toTaskInformation(), variables)
} catch (e: Exception) {
subscriptionRepository.deactivateSubscriptionForTask(taskId = task.id)
}
}
}
} else {
logger.trace { "PROCESS-ENGINE-C8-035: pulling user tasks disabled, no subscriptions." }
}
}

Expand Down
Loading

0 comments on commit b7ff87f

Please sign in to comment.