Skip to content

Commit

Permalink
For C8, also close the job on unsubscribe for a task. (+ Added C8 Use…
Browse files Browse the repository at this point in the history
…r Task tests)
  • Loading branch information
p-wunderlich committed May 2, 2024
1 parent 6f1e4cf commit aaaad8c
Show file tree
Hide file tree
Showing 15 changed files with 256 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import dev.bpmcrafters.processengineapi.task.*
import io.mockk.mockk
import io.toolisticon.testing.jgiven.JGivenKotlinStage
import io.toolisticon.testing.jgiven.step
import mu.KLogging
import org.assertj.core.api.Assertions.assertThat

@JGivenKotlinStage
class BaseGivenWhenStage : Stage<BaseGivenWhenStage>() {

companion object : KLogging()

@ExpectedScenarioState
lateinit var processTestHelper: ProcessTestHelper

Expand Down Expand Up @@ -66,7 +69,16 @@ class BaseGivenWhenStage : Stage<BaseGivenWhenStage>() {
}

fun `a active user task subscription`(taskDescriptionKey: String) = step {
taskSubscription = subscribeTask(TaskType.USER, taskDescriptionKey) { taskInformation, _ -> userTaskId = taskInformation.taskId }
taskSubscription = subscribeTask(TaskType.USER, taskDescriptionKey) { taskInformation, _ ->
run {
logger.info { "Got new task ${taskInformation.taskId}" }
userTaskId = taskInformation.taskId
}
}
}

fun `subscribe for tasks`() = step {
processTestHelper.subscribeForUserTasks()
}

fun `a active external task subscription`(taskDescriptionKey: String) = step {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import com.tngtech.jgiven.Stage
import com.tngtech.jgiven.annotation.ExpectedScenarioState
import io.toolisticon.testing.jgiven.JGivenKotlinStage
import io.toolisticon.testing.jgiven.step
import mu.KLogging
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility.await
import java.time.Duration
import java.util.concurrent.TimeUnit

@JGivenKotlinStage
class BaseThenStage : Stage<BaseThenStage>() {

companion object : KLogging()

@ExpectedScenarioState
lateinit var instanceId: String

Expand All @@ -27,24 +32,32 @@ class BaseThenStage : Stage<BaseThenStage>() {
assertThat(process).isNotNull()
}

fun `we should get notified about a new user task`() = step {
processTestHelper.triggerUserTaskDeliveryManually()
fun `we should get notified about a new user task with pull strategy`() = step {
processTestHelper.triggerPullingUserTaskDeliveryManually()

await().untilAsserted { assertThat(userTaskId).isNotEmpty() }
}

fun `we should get notified about a new external task`() = step {
processTestHelper.triggerExternalTaskDeliveryManually()

await().untilAsserted { assertThat(externalTaskId).isNotEmpty() }
fun `we should get notified about a new user task with subscribing strategy`() = step {
await().untilAsserted { assertThat(userTaskId).isNotEmpty() }
}

fun `we should not get notified about a new user task`() = step {
processTestHelper.triggerUserTaskDeliveryManually()
fun `we should not get notified about a new user task with pull strategy`() = step {
processTestHelper.triggerPullingUserTaskDeliveryManually()

await().untilAsserted { assertThat(userTaskId).isNull() }
}

fun `we should not get notified about a new user task with subscribing strategy`() = step {
await().untilAsserted { assertThat(userTaskId).isNull() }
}

fun `we should get notified about a new external task`() = step {
processTestHelper.triggerExternalTaskDeliveryManually()

await().untilAsserted { assertThat(externalTaskId).isNotEmpty() }
}

fun `we should not get notified about a new external task`() = step {
processTestHelper.triggerExternalTaskDeliveryManually()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ interface ProcessTestHelper {

fun getExternalTaskCompletionApi(): ExternalTaskCompletionApi

fun triggerUserTaskDeliveryManually()
fun triggerPullingUserTaskDeliveryManually()

fun subscribeForUserTasks()

fun triggerExternalTaskDeliveryManually()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ class C7EmbeddedProcessTestHelper(private val processEngine: ProcessEngine) : Pr
subscriptionRepository = subscriptionRepository
)

override fun triggerUserTaskDeliveryManually() = embeddedPullUserTaskDelivery.deliverAll()
override fun triggerPullingUserTaskDeliveryManually() = embeddedPullUserTaskDelivery.deliverAll()
override fun subscribeForUserTasks() {
TODO("Not yet implemented")
}

override fun triggerExternalTaskDeliveryManually() = embeddedPullExternalTaskDelivery.deliverAll()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class C7EmbeddedTaskApiITest : AbstractC7EmbeddedApiITest(C7EmbeddedProcessTestH
.`start process by definition`(KEY)

THEN
.`we should get notified about a new user task`()
.`we should get notified about a new user task with pull strategy`()
}

@Test
Expand All @@ -51,7 +51,7 @@ class C7EmbeddedTaskApiITest : AbstractC7EmbeddedApiITest(C7EmbeddedProcessTestH
.`start process by definition`(KEY)

THEN
.`we should not get notified about a new user task`()
.`we should not get notified about a new user task with pull strategy`()
}

@Test
Expand All @@ -74,7 +74,7 @@ class C7EmbeddedTaskApiITest : AbstractC7EmbeddedApiITest(C7EmbeddedProcessTestH
.`start process by definition`(KEY)

THEN
.`we should get notified about a new user task`()
.`we should get notified about a new user task with pull strategy`()

WHEN
.`complete the user task`()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ class C7RemoteProcessTestHelper(
override fun getUserTaskCompletionApi(): UserTaskCompletionApi = userTaskCompletionApi
override fun getExternalTaskCompletionApi(): ExternalTaskCompletionApi = externalTaskCompletionApi

override fun triggerUserTaskDeliveryManually() = userTaskDelivery.deliverAll()
override fun triggerPullingUserTaskDeliveryManually() = userTaskDelivery.deliverAll()
override fun subscribeForUserTasks() {
TODO("Not yet implemented")
}

override fun triggerExternalTaskDeliveryManually() = externalTaskDelivery.deliverAll()

override fun getProcessInformation(instanceId: String): ProcessInformation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class C7RemoteTaskApiITest(@Autowired processTestHelperImpl: ProcessTestHelper)
.`start process by definition`(KEY)

THEN
.`we should get notified about a new user task`()
.`we should get notified about a new user task with pull strategy`()
}

@Test
Expand All @@ -43,7 +43,7 @@ class C7RemoteTaskApiITest(@Autowired processTestHelperImpl: ProcessTestHelper)
.`start process by definition`(KEY)

THEN
.`we should not get notified about a new user task`()
.`we should not get notified about a new user task with pull strategy`()
}

@Test
Expand All @@ -66,7 +66,7 @@ class C7RemoteTaskApiITest(@Autowired processTestHelperImpl: ProcessTestHelper)
.`start process by definition`(KEY)

THEN
.`we should get notified about a new user task`()
.`we should get notified about a new user task with pull strategy`()

WHEN
.`complete the user task`()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package dev.bpmcrafters.processengineapi.adapter.c8.task

import dev.bpmcrafters.processengineapi.task.TaskSubscription

/**
* Common interface for user task delivery.
*/
interface SubscribingUserTaskDelivery {
fun unsubscribe(taskSubscription: TaskSubscription)
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package dev.bpmcrafters.processengineapi.adapter.c8.task.delivery

import dev.bpmcrafters.processengineapi.adapter.c8.task.SubscribingUserTaskDelivery
import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.adapter.commons.task.TaskSubscriptionHandle
import dev.bpmcrafters.processengineapi.task.TaskSubscription
import dev.bpmcrafters.processengineapi.task.TaskType
import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.api.command.ClientStatusException
import io.camunda.zeebe.client.api.response.ActivatedJob
import io.camunda.zeebe.client.api.worker.JobWorker
import io.camunda.zeebe.client.api.worker.JobWorkerBuilderStep1.JobWorkerBuilderStep3
import io.grpc.Status
import mu.KLogging
Expand All @@ -15,13 +18,16 @@ class SubscribingRefreshingUserTaskDelivery(
private val subscriptionRepository: SubscriptionRepository,
private val workerId: String,
private val userTaskLockTimeoutMs: Long,
) {
): SubscribingUserTaskDelivery {

companion object : KLogging() {
const val ZEEBE_USER_TASK = "io.camunda.zeebe:userTask"
const val TIMEOUT_FACTOR = 2L
}

// taskDescriptionKey to job => TODO typealias for taskDescriptionKey?
private var jobWorkerRegistry: Map<String, JobWorker> = emptyMap()

fun refresh() {
subscriptionRepository
.getDeliveredTaskIds(TaskType.USER)
Expand Down Expand Up @@ -94,9 +100,23 @@ class SubscribingRefreshingUserTaskDelivery(
// FIXME -> more to setup from props
// FIXME -> metrics to setup
.open()
.let {
jobWorkerRegistry + (subscription.taskDescriptionKey to it)
}
}
}

override fun unsubscribe(taskSubscription: TaskSubscription) {
if(taskSubscription is TaskSubscriptionHandle) { // TODO extend interface of TaskSubscription?
logger.info { "Unsubscribe from user task: ${taskSubscription.taskDescriptionKey}" }
jobWorkerRegistry[taskSubscription.taskDescriptionKey]?.close()
}
}

fun unsubscribeAll() {
jobWorkerRegistry.forEach { (_, job) -> job.close() }
}

/*
* Additional restrictions to check.
*/
Expand All @@ -113,5 +133,4 @@ class SubscribingRefreshingUserTaskDelivery(
}
}


}
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
package dev.bpmcrafters.processengineapi.adapter.c8.task.subscription

import dev.bpmcrafters.processengineapi.Empty
import dev.bpmcrafters.processengineapi.MetaInfo
import dev.bpmcrafters.processengineapi.MetaInfoAware
import dev.bpmcrafters.processengineapi.adapter.c8.task.SubscribingUserTaskDelivery
import dev.bpmcrafters.processengineapi.adapter.commons.task.AbstractTaskSubscriptionApiImpl
import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository
import dev.bpmcrafters.processengineapi.task.UnsubscribeFromTaskCmd
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

class C8TaskSubscriptionApiImpl(
subscriptionRepository: SubscriptionRepository
subscriptionRepository: SubscriptionRepository,
private val subscribingUserTaskDelivery: SubscribingUserTaskDelivery?
) : AbstractTaskSubscriptionApiImpl(
subscriptionRepository = subscriptionRepository
) {

override fun meta(instance: MetaInfoAware): MetaInfo {
TODO("Not yet implemented")
}

override fun unsubscribe(cmd: UnsubscribeFromTaskCmd): Future<Empty> {
super.unsubscribe(cmd)
// For subscribing delivery, we also have to close the job
subscribingUserTaskDelivery?.unsubscribe(cmd.subscription)
return CompletableFuture.completedFuture(Empty)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import dev.bpmcrafters.processengineapi.adapter.c8.correlation.CorrelationApiImp
import dev.bpmcrafters.processengineapi.adapter.c8.correlation.SignalApiImpl
import dev.bpmcrafters.processengineapi.adapter.c8.deploy.DeploymentApiImpl
import dev.bpmcrafters.processengineapi.adapter.c8.process.StartProcessApiImpl
import dev.bpmcrafters.processengineapi.adapter.c8.task.SubscribingUserTaskDelivery
import dev.bpmcrafters.processengineapi.adapter.c8.task.completion.C8TaskListClientUserTaskCompletionApiImpl
import dev.bpmcrafters.processengineapi.adapter.c8.task.completion.C8ZeebeExternalServiceTaskCompletionApiImpl
import dev.bpmcrafters.processengineapi.adapter.c8.task.subscription.C8TaskSubscriptionApiImpl
Expand Down Expand Up @@ -70,8 +71,9 @@ class C8AdapterAutoConfiguration {
)

@Bean
fun taskCompletionApi(subscriptionRepository: SubscriptionRepository): TaskSubscriptionApi = C8TaskSubscriptionApiImpl(
subscriptionRepository = subscriptionRepository
fun taskCompletionApi(subscriptionRepository: SubscriptionRepository, subscribingUserTaskDelivery: SubscribingUserTaskDelivery?): TaskSubscriptionApi = C8TaskSubscriptionApiImpl(
subscriptionRepository = subscriptionRepository,
subscribingUserTaskDelivery = subscribingUserTaskDelivery,
)

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import dev.bpmcrafters.processengineapi.test.ProcessTestHelper

class C8ProcessTestHelper(
private val startProcessApi: StartProcessApi,
private val userTaskDelivery: SubscribingRefreshingUserTaskDelivery,
private val subscribingUserTaskDelivery: SubscribingRefreshingUserTaskDelivery,
private val pullUserTaskDelivery: PullUserTaskDelivery,
private val subscribingServiceTaskDelivery: SubscribingServiceTaskDelivery,
private val taskSubscriptionApi: TaskSubscriptionApi,
private val userTaskCompletionApi: UserTaskCompletionApi,
Expand All @@ -27,14 +28,19 @@ class C8ProcessTestHelper(
override fun getUserTaskCompletionApi(): UserTaskCompletionApi = userTaskCompletionApi
override fun getExternalTaskCompletionApi(): ExternalTaskCompletionApi = externalTaskCompletionApi

override fun triggerUserTaskDeliveryManually() = userTaskDelivery.refresh()
override fun triggerPullingUserTaskDeliveryManually() = pullUserTaskDelivery.deliverAll()
override fun subscribeForUserTasks() = subscribingUserTaskDelivery.subscribe()

override fun triggerExternalTaskDeliveryManually() = subscribingServiceTaskDelivery.subscribe()

override fun getProcessInformation(instanceId: String): ProcessInformation = ProcessInformation(
instanceId = "fixme",
meta = emptyMap()
)

override fun clearAllSubscriptions() = (subscriptionRepository as InMemSubscriptionRepository).deleteAllTaskSubscriptions()
override fun clearAllSubscriptions() {
(subscriptionRepository as InMemSubscriptionRepository).deleteAllTaskSubscriptions()
subscribingUserTaskDelivery.unsubscribeAll()
}

}
Loading

0 comments on commit aaaad8c

Please sign in to comment.