diff --git a/README.md b/README.md
index 9abc53d..db9861f 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
-![version 0.3.6](https://img.shields.io/badge/version-0.3.6-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
+![version 0.3.7](https://img.shields.io/badge/version-0.3.7-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)
Redis-Stream을 지원하는 Saga frame work 입니다.
@@ -228,8 +228,9 @@ fun exists(param: Any): Mono {
#### Events-Scenario4. Handle transaction event
다른 분산서버가 (혹은 자기자신이) transactionManager를 통해서 트랜잭션을 시작하거나 트랜잭션 상태를 변경했을때, 트랜잭션 상태에 맞는 핸들러를 호출합니다.
-이 핸들러를 구현함으로써, 트랜잭션별 상태를 처리할 수 있습니다. (롤백등)
-_롤백은 TransactionRollbackEvent로 전달되는 `undo` 필드를 사용합니다._
+이 핸들러를 구현함으로써, 트랜잭션 상태별 로직을 구현할 수 있습니다.
+각 핸들러에서 에러가 던져지면, 자동으로 rollback 이 호출됩니다.
+
> [!WARNING]
> 트랜잭션 핸들러는 반드시 핸들러에 맞는 `TransactionEvent` **하나**만을 파라미터로 받아야 합니다.
@@ -238,27 +239,28 @@ _롤백은 TransactionRollbackEvent로 전달되는 `undo` 필드를 사용합
@TransactionHandler
class TransactionHandler {
- @TransactionStartListener(Foo::class) // Receive transaction event when event can be mapped to Foo.class
+ @TransactionStartListener(event = Foo::class) // Receive transaction event when event can be mapped to Foo.class
fun handleTransactionStartEvent(event: TransactionStartEvent) {
val foo: Foo = event.decodeEvent(Foo::class) // Get event field to Foo.class
// ...
+ event.setNextEvent(nextFoo) // When this handler terminates and calls the next event or rollback, the event set here is published together.
}
- @TransactionJoinHandler // Receive all transaction event when no type is defined.
+ @TransactionJoinHandler(successWith = SuccessWith.PUBLISH_COMMIT) // Receive all transaction event when no type is defined. And, when terminated this function, publish commit state
fun handleTransactionJoinEvent(event: TransactionJoinEvent) {
// ...
}
@TransactionCommitHandler(
event = Foo::class,
- noRetryFor = [IllegalArgumentException::class] // Dont retry when throw IllegalArgumentException. *Retry if throw Throwable or IllegalArgumentException's super type*
+ noRollbackFor = [IllegalArgumentException::class] // Dont rollback when throw IllegalArgumentException. *Rollback if throw Throwable or IllegalArgumentException's super type*
)
fun handleTransactionCommitEvent(event: TransactionCommitEvent): Mono { // In Webflux framework, publisher must be returned.
throw IllegalArgumentException("Ignore this exception")
// ...
}
- @TransactionRollbackHandler
+ @TransactionRollbackHandler(Foo::class)
fun handleTransactionRollbackEvent(event: TransactionRollbackEvent) { // In Mvc framework, publisher must not returned.
val undo: Foo = event.decodeUndo(Foo::class) // Get event field to Foo.class
}
diff --git a/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt b/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt
index 9d6eb0f..892854f 100644
--- a/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/Exceptions.kt
@@ -1,14 +1,12 @@
package org.rooftop.netx.api
-import org.rooftop.netx.engine.core.TransactionState
-
class EncodeException(message: String, throwable: Throwable) : RuntimeException(message, throwable)
class DecodeException(message: String, throwable: Throwable) : RuntimeException(message, throwable)
open class TransactionException(message: String) : RuntimeException(message)
-class AlreadyCommittedTransactionException(transactionId: String, state: TransactionState) :
+class AlreadyCommittedTransactionException(transactionId: String, state: String) :
TransactionException("Cannot join transaction cause, transaction \"$transactionId\" already \"$state\"")
class NotFoundDispatchFunctionException(message: String) : RuntimeException(message)
diff --git a/src/main/kotlin/org/rooftop/netx/api/Result.kt b/src/main/kotlin/org/rooftop/netx/api/Result.kt
index b09118d..73216fc 100644
--- a/src/main/kotlin/org/rooftop/netx/api/Result.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/Result.kt
@@ -9,6 +9,8 @@ class Result private constructor(
private val error: Error? = null,
) {
+ fun decodeResultOrThrow(typeReference: TypeReference): T = decodeResult(typeReference)
+
fun decodeResultOrThrow(type: Class): T = decodeResultOrThrow(type.kotlin)
fun decodeResultOrThrow(type: KClass): T {
@@ -18,7 +20,7 @@ class Result private constructor(
return decodeResult(type)
}
- fun decodeResult(typeReference: TypeReference): T = result?.let {
+ fun decodeResult(typeReference: TypeReference): T = result?.let {
codec.decode(it, typeReference)
} ?: throw ResultException("Cannot decode result cause Result is fail state")
diff --git a/src/main/kotlin/org/rooftop/netx/api/SuccessWith.kt b/src/main/kotlin/org/rooftop/netx/api/SuccessWith.kt
new file mode 100644
index 0000000..5a78ca1
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/api/SuccessWith.kt
@@ -0,0 +1,7 @@
+package org.rooftop.netx.api
+
+enum class SuccessWith {
+ PUBLISH_JOIN,
+ PUBLISH_COMMIT,
+ END,
+}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt
index 4547794..87b33a2 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionCommitEvent.kt
@@ -1,9 +1,13 @@
package org.rooftop.netx.api
-class TransactionCommitEvent(
+class TransactionCommitEvent internal constructor(
transactionId: String,
nodeName: String,
group: String,
event: String?,
codec: Codec,
-): TransactionEvent(transactionId, nodeName, group, event, codec)
+): TransactionEvent(transactionId, nodeName, group, event, codec) {
+
+ override fun copy(): TransactionEvent =
+ TransactionJoinEvent(transactionId, nodeName, group, event, codec)
+}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitListener.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionCommitListener.kt
index 7b7823d..239a8a9 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionCommitListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionCommitListener.kt
@@ -6,5 +6,5 @@ import kotlin.reflect.KClass
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionCommitListener(
val event: KClass<*> = Any::class,
- val noRetryFor: Array> = [],
+ val noRollbackFor: Array> = [],
)
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
index 1e78556..661179d 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
@@ -6,10 +6,16 @@ sealed class TransactionEvent(
val transactionId: String,
val nodeName: String,
val group: String,
- private val event: String?,
- private val codec: Codec,
+ internal val event: String?,
+ internal val codec: Codec,
+ internal var nextEvent: Any? = null,
) {
+ fun setNextEvent(event: T): T {
+ this.nextEvent = event
+ return event
+ }
+
fun decodeEvent(type: Class): T = decodeEvent(type.kotlin)
fun decodeEvent(type: KClass): T =
@@ -17,4 +23,6 @@ sealed class TransactionEvent(
event ?: throw NullPointerException("Cannot decode event cause event is null"),
type
)
+
+ internal abstract fun copy(): TransactionEvent
}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
index 36a4c80..b8753f1 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
@@ -1,9 +1,13 @@
package org.rooftop.netx.api
-class TransactionJoinEvent(
+class TransactionJoinEvent internal constructor(
transactionId: String,
nodeName: String,
group: String,
event: String?,
codec: Codec,
-): TransactionEvent(transactionId, nodeName, group, event, codec)
+) : TransactionEvent(transactionId, nodeName, group, event, codec) {
+
+ override fun copy(): TransactionEvent =
+ TransactionJoinEvent(transactionId, nodeName, group, event, codec)
+}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinListener.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionJoinListener.kt
index d61980f..243b103 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionJoinListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionJoinListener.kt
@@ -6,5 +6,6 @@ import kotlin.reflect.KClass
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionJoinListener(
val event: KClass<*> = Any::class,
- val noRetryFor: Array> = [],
+ val noRollbackFor: Array> = [],
+ val successWith: SuccessWith = SuccessWith.PUBLISH_JOIN,
)
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
index 3662e03..25679e3 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
@@ -4,21 +4,21 @@ import reactor.core.publisher.Mono
interface TransactionManager {
- fun start(undo: T): Mono
+ fun start(): Mono
- fun start(undo: T, event: S): Mono
+ fun start(event: T): Mono
- fun syncStart(undo: T): String
+ fun syncStart(): String
- fun syncStart(undo: T, event: S): String
+ fun syncStart(event: T): String
- fun join(transactionId: String, undo: T): Mono
+ fun join(transactionId: String): Mono
- fun join(transactionId: String, undo: T, event: S): Mono
+ fun join(transactionId: String, event: T): Mono
- fun syncJoin(transactionId: String, undo: T): String
+ fun syncJoin(transactionId: String): String
- fun syncJoin(transactionId: String, undo: T, event: S): String
+ fun syncJoin(transactionId: String, event: T): String
fun exists(transactionId: String): Mono
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt
index ab22323..21f2629 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt
@@ -1,18 +1,14 @@
package org.rooftop.netx.api
-import kotlin.reflect.KClass
-
-class TransactionRollbackEvent(
+class TransactionRollbackEvent internal constructor(
transactionId: String,
nodeName: String,
group: String,
event: String?,
val cause: String,
- private val undo: String,
- private val codec: Codec,
+ codec: Codec,
) : TransactionEvent(transactionId, nodeName, group, event, codec) {
- fun decodeUndo(type: Class): T = decodeUndo(type.kotlin)
-
- fun decodeUndo(type: KClass): T = codec.decode(undo, type)
+ override fun copy(): TransactionEvent =
+ TransactionJoinEvent(transactionId, nodeName, group, event, codec)
}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackListener.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackListener.kt
index 27c703d..78b89b7 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionRollbackListener.kt
@@ -6,5 +6,4 @@ import kotlin.reflect.KClass
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionRollbackListener(
val event: KClass<*> = Any::class,
- val noRetryFor: Array> = [],
)
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt
index 2c35100..4348715 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt
@@ -1,9 +1,13 @@
package org.rooftop.netx.api
-class TransactionStartEvent(
+class TransactionStartEvent internal constructor(
transactionId: String,
nodeName: String,
group: String,
event: String?,
codec: Codec,
-) : TransactionEvent(transactionId, nodeName, group, event, codec)
+) : TransactionEvent(transactionId, nodeName, group, event, codec) {
+
+ override fun copy(): TransactionEvent =
+ TransactionJoinEvent(transactionId, nodeName, group, event, codec)
+}
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionStartListener.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionStartListener.kt
index 8d3e6a6..0ca54a4 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionStartListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionStartListener.kt
@@ -6,5 +6,6 @@ import kotlin.reflect.KClass
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionStartListener(
val event: KClass<*> = Any::class,
- val noRetryFor: Array> = [],
+ val noRollbackFor: Array> = [],
+ val successWith: SuccessWith = SuccessWith.PUBLISH_JOIN,
)
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractDispatchFunction.kt
index 0b0c6a8..e842ecf 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractDispatchFunction.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractDispatchFunction.kt
@@ -1,20 +1,69 @@
package org.rooftop.netx.engine
import org.rooftop.netx.api.TransactionEvent
+import org.rooftop.netx.api.TransactionManager
+import reactor.core.scheduler.Schedulers
import kotlin.reflect.KClass
import kotlin.reflect.KFunction
internal sealed class AbstractDispatchFunction(
- protected val eventType: KClass<*>,
+ private val eventType: KClass<*>,
protected val function: KFunction,
protected val handler: Any,
- private val noRetryFor: Array>,
+ private val noRollbackFor: Array>,
+ private val nextState: NextTransactionState,
+ private val transactionManager: TransactionManager,
) {
fun name(): String = function.name
abstract fun call(transactionEvent: TransactionEvent): T
- protected fun isNoRetryFor(throwable: Throwable): Boolean {
- return noRetryFor.isNotEmpty() && throwable.cause != null && noRetryFor.contains(throwable.cause!!::class)
+ protected fun isNoRollbackFor(throwable: Throwable): Boolean {
+ return noRollbackFor.isNotEmpty() && throwable.cause != null && noRollbackFor.contains(
+ throwable.cause!!::class
+ )
+ }
+
+ protected fun isProcessable(transactionEvent: TransactionEvent): Boolean {
+ return runCatching {
+ transactionEvent.decodeEvent(eventType)
+ }.onFailure {
+ return it is NullPointerException && eventType == Any::class
+ }.isSuccess
+ }
+
+ protected fun rollback(transactionEvent: TransactionEvent, throwable: Throwable) {
+ transactionEvent.nextEvent?.let {
+ transactionManager.rollback(transactionEvent.transactionId, throwable.getCause(), it)
+ .subscribeOn(Schedulers.parallel())
+ .subscribe()
+ } ?: transactionManager.rollback(transactionEvent.transactionId, throwable.getCause())
+ .subscribeOn(Schedulers.parallel())
+ .subscribe()
+ }
+
+ protected fun publishNextTransaction(transactionEvent: TransactionEvent) {
+ when (nextState) {
+ NextTransactionState.JOIN -> transactionEvent.nextEvent?.let {
+ transactionManager.join(transactionEvent.transactionId, it)
+ } ?: transactionManager.join(transactionEvent.transactionId)
+
+ NextTransactionState.COMMIT -> transactionEvent.nextEvent?.let {
+ transactionManager.commit(transactionEvent.transactionId, it)
+ } ?: transactionManager.commit(transactionEvent.transactionId)
+
+ NextTransactionState.END -> return
+ }.subscribeOn(Schedulers.parallel())
+ .subscribe()
+ }
+
+ private fun Throwable.getCause(): String {
+ return this.message ?: this.cause?.message ?: this::class.java.name
+ }
+
+ internal enum class NextTransactionState {
+ JOIN,
+ COMMIT,
+ END
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt
index 3f4b39c..1ef3baa 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractOrchestrateListener.kt
@@ -7,7 +7,7 @@ import kotlin.reflect.KClass
internal abstract class AbstractOrchestrateListener internal constructor(
private val orchestratorId: String,
- val orchestrateSequence: Int,
+ internal val orchestrateSequence: Int,
private val codec: Codec,
private val transactionManager: TransactionManager,
private val requestHolder: RequestHolder,
@@ -41,13 +41,42 @@ internal abstract class AbstractOrchestrateListener internal c
castableType = type
}
- internal fun Mono>.setNextCastableType(): Mono> {
+ private fun Mono>.setNextCastableType(): Mono> {
return this.doOnNext { (request, _) ->
nextOrchestrateListener?.castableType = request::class
nextRollbackOrchestrateListener?.castableType = request::class
}
}
+ protected fun orchestrate(transactionEvent: TransactionEvent): Mono {
+ return transactionEvent.startWithOrchestrateEvent()
+ .filter {
+ it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId
+ }
+ .mapReifiedRequest()
+ .flatMap { (request, event) ->
+ holdRequestIfRollbackable(request, transactionEvent.transactionId)
+ .map { it to event }
+ }
+ .flatMap { (request, event) -> command(request, event) }
+ .setNextCastableType()
+ .doOnError {
+ rollback(
+ transactionEvent.transactionId,
+ it,
+ transactionEvent.decodeEvent(OrchestrateEvent::class)
+ )
+ }
+ .toOrchestrateEvent()
+ .map {
+ transactionEvent.setNextEvent(it)
+ }
+ }
+
+ protected open fun command(request: T, event: OrchestrateEvent): Mono> {
+ throw UnsupportedOperationException("Cannot invoke command please do concrete class from \"with\" method")
+ }
+
protected fun Mono.mapReifiedRequest(): Mono> {
return this.map { event ->
if (typeReference == null) {
@@ -78,7 +107,7 @@ internal abstract class AbstractOrchestrateListener internal c
)
}
- protected fun Mono>.toOrchestrateEvent(): Mono {
+ private fun Mono>.toOrchestrateEvent(): Mono {
return this.map { (response, context) ->
OrchestrateEvent(
orchestratorId = orchestratorId,
@@ -89,7 +118,7 @@ internal abstract class AbstractOrchestrateListener internal c
}
}
- protected fun getCastableType(): KClass {
+ private fun getCastableType(): KClass {
return castableType
?: throw NullPointerException("OrchestratorId \"$orchestratorId\", OrchestrateSequence \"$orchestrateSequence\"'s CastableType was null")
}
@@ -100,25 +129,15 @@ internal abstract class AbstractOrchestrateListener internal c
} ?: throw NullPointerException("Cannot cast \"$data\" cause, castableType is null")
}
- protected fun TransactionEvent.toOrchestrateEvent(): Mono =
+ protected fun TransactionEvent.startWithOrchestrateEvent(): Mono =
Mono.just(this.decodeEvent(OrchestrateEvent::class))
- protected fun Mono.onErrorRollback(
- transactionId: String,
- orchestrateEvent: OrchestrateEvent,
- ): Mono = this.onErrorResume {
- holdFailResult(transactionId, it)
- rollback(transactionId, it, orchestrateEvent)
- Mono.empty()
- }
-
- private fun holdFailResult(transactionId: String, throwable: Throwable) {
- throwable.stackTrace = arrayOf()
- resultHolder.setFailResult(transactionId, throwable)
- .subscribeOn(Schedulers.parallel()).subscribe()
+ private fun Throwable.toEmptyStackTrace(): Throwable {
+ this.stackTrace = arrayOf()
+ return this
}
- private fun rollback(
+ protected fun rollback(
transactionId: String,
throwable: Throwable,
orchestrateEvent: OrchestrateEvent,
@@ -130,19 +149,28 @@ internal abstract class AbstractOrchestrateListener internal c
"",
orchestrateEvent.context,
)
- transactionManager.rollback(
- transactionId = transactionId,
- cause = throwable.message ?: throwable.localizedMessage,
- event = rollbackOrchestrateEvent
- ).subscribeOn(Schedulers.parallel()).subscribe()
+ holdFailResult(transactionId, throwable)
+ .flatMap {
+ transactionManager.rollback(
+ transactionId = transactionId,
+ cause = throwable.message ?: throwable.localizedMessage,
+ event = rollbackOrchestrateEvent
+ )
+ }.subscribeOn(Schedulers.parallel()).subscribe()
+ }
+
+ private fun holdFailResult(transactionId: String, throwable: Throwable): Mono {
+ return resultHolder.setFailResult(transactionId, throwable.toEmptyStackTrace())
+ }
+
+ open fun withAnnotated(): AbstractOrchestrateListener {
+ return this
}
override fun toString(): String {
- return "AbstractOrchestrateListener(orchestrateSequence=$orchestrateSequence, " +
+ return "${this.javaClass.name}(orchestrateSequence=$orchestrateSequence, " +
"isFirst=$isFirst, isLast=$isLast, isRollbackable=$isRollbackable, " +
"beforeRollbackOrchestrateSequence=$beforeRollbackOrchestrateSequence, " +
"rollbackSequence=$rollbackSequence)"
}
-
-
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
index 477c330..d84d411 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionDispatcher.kt
@@ -13,8 +13,9 @@ import kotlin.reflect.KClass
import kotlin.reflect.KFunction
import kotlin.reflect.full.declaredMemberFunctions
-abstract class AbstractTransactionDispatcher(
+internal abstract class AbstractTransactionDispatcher(
private val codec: Codec,
+ private val transactionManager: TransactionManager,
) {
private val functions =
@@ -25,21 +26,27 @@ abstract class AbstractTransactionDispatcher(
.flatMap { function ->
when (function) {
is MonoDispatchFunction -> {
- mapToTransactionEvent(transaction)
+ mapToTransactionEvent(transaction.copy())
.callMono(function)
.warningOnError("Error occurred in TransactionHandler function \"${function.name()}\" with transaction id ${transaction.id}")
}
is NotPublishDispatchFunction -> {
- mapToTransactionEvent(transaction)
+ mapToTransactionEvent(transaction.copy())
.callNotPublish(function)
.warningOnError("Error occurred in TransactionHandler function \"${function.name()}\" with transaction id ${transaction.id}")
}
+
+ is OrchestrateDispatchFunction -> {
+ mapToTransactionEvent(transaction.copy())
+ .callOrchestrate(function)
+ .warningOnError("Error occurred in TransactionHandler function \"${function.name()}\" with transaction id ${transaction.id}")
+ }
}
}
.subscribeOn(Schedulers.boundedElastic())
.ackWhenComplete(transaction, messageId)
- .then(Mono.just(DISPATHCED))
+ .then(Mono.just(DISPATCHED))
}
private fun Flux<*>.ackWhenComplete(
@@ -88,15 +95,8 @@ abstract class AbstractTransactionDispatcher(
)
)
- TransactionState.ROLLBACK -> findOwnUndo(transaction)
- .onErrorResume {
- if (it is TransactionException) {
- return@onErrorResume Mono.empty()
- }
- throw it
- }
- .warningOnError("Error occurred when findOwnUndo transaction ${transaction.id}")
- .map {
+ TransactionState.ROLLBACK ->
+ Mono.just(
TransactionRollbackEvent(
transactionId = transaction.id,
nodeName = transaction.serverId,
@@ -104,10 +104,9 @@ abstract class AbstractTransactionDispatcher(
event = extractEvent(transaction),
cause = transaction.cause
?: throw NullPointerException("Null value on TransactionRollbackEvent's cause field"),
- undo = it,
codec = codec,
)
- }
+ )
}
}
@@ -118,12 +117,40 @@ abstract class AbstractTransactionDispatcher(
}
}
- protected abstract fun findOwnUndo(transaction: Transaction): Mono
+ internal fun addOrchestrate(handler: Any) {
+ addOrchestrateFunctions(handler)
+ info("Add orchestrate fucntion : \"${handler}\"")
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun addOrchestrateFunctions(handler: Any) {
+ val returnTypeMatchedHandlers = handler::class.declaredMemberFunctions
+ .filter { it.returnType.classifier == Mono::class }
- internal fun addHandler(handler: Any) {
- initMonoFunctions(listOf(handler))
- initNotPublisherFunctions(listOf(handler))
- info("Add functions : \"${handler}\"")
+ returnTypeMatchedHandlers.forEach { function ->
+ function.annotations
+ .forEach { annotation ->
+ runCatching {
+ val transactionState = getMatchedTransactionState(annotation)
+ val eventType = getEventType(annotation)
+ val noRollbackFor = getNoRollbackFor(annotation)
+ val nextState = getNextTransactionState(annotation)
+ functions.putIfAbsent(transactionState, mutableListOf())
+ functions[transactionState]?.add(
+ OrchestrateDispatchFunction(
+ eventType,
+ function as KFunction>,
+ handler,
+ noRollbackFor,
+ nextState,
+ transactionManager,
+ )
+ )
+ }.onFailure {
+ throw IllegalStateException("Cannot add Mono TransactionHandler", it)
+ }
+ }
+ }
}
@PostConstruct
@@ -151,14 +178,17 @@ abstract class AbstractTransactionDispatcher(
runCatching {
val transactionState = getMatchedTransactionState(annotation)
val eventType = getEventType(annotation)
- val noRetryFor = getNoRetryFor(annotation)
+ val noRollbackFor = getNoRollbackFor(annotation)
+ val nextState = getNextTransactionState(annotation)
functions.putIfAbsent(transactionState, mutableListOf())
functions[transactionState]?.add(
MonoDispatchFunction(
eventType,
function as KFunction>,
handler,
- noRetryFor,
+ noRollbackFor,
+ nextState,
+ transactionManager,
)
)
}.onFailure {
@@ -183,10 +213,18 @@ abstract class AbstractTransactionDispatcher(
runCatching {
val transactionState = getMatchedTransactionState(annotation)
val eventType = getEventType(annotation)
- val noRetryFor = getNoRetryFor(annotation)
+ val noRollbackFor = getNoRollbackFor(annotation)
+ val nextState = getNextTransactionState(annotation)
functions.putIfAbsent(transactionState, mutableListOf())
functions[transactionState]?.add(
- NotPublishDispatchFunction(eventType, function, handler, noRetryFor)
+ NotPublishDispatchFunction(
+ eventType,
+ function,
+ handler,
+ noRollbackFor,
+ nextState,
+ transactionManager,
+ )
)
}.onFailure {
throw IllegalStateException("Cannot add TransactionHandler", it)
@@ -208,12 +246,12 @@ abstract class AbstractTransactionDispatcher(
}
}
- private fun getNoRetryFor(annotation: Annotation): Array> {
+ private fun getNoRollbackFor(annotation: Annotation): Array> {
return when (annotation) {
- is TransactionStartListener -> annotation.noRetryFor
- is TransactionCommitListener -> annotation.noRetryFor
- is TransactionJoinListener -> annotation.noRetryFor
- is TransactionRollbackListener -> annotation.noRetryFor
+ is TransactionStartListener -> annotation.noRollbackFor
+ is TransactionCommitListener -> annotation.noRollbackFor
+ is TransactionJoinListener -> annotation.noRollbackFor
+ is TransactionRollbackListener -> emptyArray()
else -> throw notMatchedTransactionHandlerException
}
}
@@ -228,13 +266,29 @@ abstract class AbstractTransactionDispatcher(
}
}
+ private fun getNextTransactionState(annotation: Annotation): AbstractDispatchFunction.NextTransactionState {
+ return when (annotation) {
+ is TransactionStartListener -> annotation.successWith.toNextTransactionState()
+ is TransactionJoinListener -> annotation.successWith.toNextTransactionState()
+ else -> AbstractDispatchFunction.NextTransactionState.END
+ }
+ }
+
+ private fun SuccessWith.toNextTransactionState(): AbstractDispatchFunction.NextTransactionState {
+ return when (this) {
+ SuccessWith.PUBLISH_JOIN -> AbstractDispatchFunction.NextTransactionState.JOIN
+ SuccessWith.PUBLISH_COMMIT -> AbstractDispatchFunction.NextTransactionState.COMMIT
+ SuccessWith.END -> AbstractDispatchFunction.NextTransactionState.END
+ }
+ }
+
protected abstract fun ack(
transaction: Transaction,
messageId: String
): Mono>
private companion object {
- private const val DISPATHCED = "dispatched"
+ private const val DISPATCHED = "dispatched"
private val notMatchedTransactionHandlerException =
NotFoundDispatchFunctionException("Cannot find matched Transaction handler")
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
index b1a08b3..a511773 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionListener.kt
@@ -8,7 +8,7 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
-abstract class AbstractTransactionListener(
+internal abstract class AbstractTransactionListener(
private val backpressureSize: Int,
private val transactionDispatcher: AbstractTransactionDispatcher,
) {
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
index 087d047..05b7468 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
@@ -11,31 +11,31 @@ import org.rooftop.netx.engine.logging.infoOnError
import org.rooftop.netx.engine.logging.warningOnError
import reactor.core.publisher.Mono
-abstract class AbstractTransactionManager(
+internal abstract class AbstractTransactionManager(
private val codec: Codec,
private val nodeGroup: String,
private val nodeName: String,
private val transactionIdGenerator: TransactionIdGenerator,
) : TransactionManager {
- final override fun syncStart(undo: T): String {
- return start(undo).block()
- ?: throw TransactionException("Cannot start transaction \"$undo\"")
+ override fun syncStart(): String {
+ return start().block()
+ ?: throw TransactionException("Cannot start transaction")
}
- override fun syncStart(undo: T, event: S): String {
- return start(undo, event).block()
- ?: throw TransactionException("Cannot start transaction \"$undo\" \"$event\"")
+ final override fun syncStart(event: T): String {
+ return start(event).block()
+ ?: throw TransactionException("Cannot start transaction \"$event\"")
}
- final override fun syncJoin(transactionId: String, undo: T): String {
- return join(transactionId, undo).block()
- ?: throw TransactionException("Cannot join transaction \"$transactionId\", \"$undo\"")
+ override fun syncJoin(transactionId: String): String {
+ return join(transactionId).block()
+ ?: throw TransactionException("Cannot join transaction \"$transactionId\"")
}
- override fun syncJoin(transactionId: String, undo: T, event: S): String {
- return join(transactionId, undo, event).block()
- ?: throw TransactionException("Cannot join transaction \"$transactionId\", \"$undo\", \"$event\"")
+ final override fun syncJoin(transactionId: String, event: T): String {
+ return join(transactionId, event).block()
+ ?: throw TransactionException("Cannot join transaction \"$transactionId\", \"$event\"")
}
final override fun syncExists(transactionId: String): String {
@@ -63,26 +63,22 @@ abstract class AbstractTransactionManager(
?: throw TransactionException("Cannot rollback transaction \"$transactionId\", \"$cause\" \"$event\"")
}
- final override fun start(undo: T): Mono {
- return Mono.fromCallable { codec.encode(undo) }
- .flatMap { encodedUndo ->
- startTransaction(encodedUndo, null)
- .info("Start transaction undo \"$undo\"")
- }
+ override fun start(): Mono {
+ return startTransaction(null)
+ .info("Start transaction")
.contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) }
}
- override fun start(undo: T, event: S): Mono {
- return Mono.fromCallable { codec.encode(undo) }
- .map { it to codec.encode(event) }
- .flatMap { (encodedUndo, encodedEvent) ->
- startTransaction(encodedUndo, encodedEvent)
- .info("Start transaction undo \"$undo\"")
+ final override fun start(event: T): Mono {
+ return Mono.fromCallable { codec.encode(event) }
+ .flatMap { encodedEvent ->
+ startTransaction(encodedEvent)
+ .info("Start transaction event \"$event\"")
}
.contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) }
}
- private fun startTransaction(undo: String, event: String?): Mono {
+ private fun startTransaction(event: String?): Mono {
return Mono.deferContextual { Mono.just(it[CONTEXT_TX_KEY]) }
.flatMap { transactionId ->
publishTransaction(
@@ -91,53 +87,50 @@ abstract class AbstractTransactionManager(
serverId = nodeName,
group = nodeGroup,
state = TransactionState.START,
- undo = undo,
event = event,
)
)
}
}
- override fun join(transactionId: String, undo: T): Mono {
+ override fun join(transactionId: String): Mono {
return getAnyTransaction(transactionId)
.map {
if (it == TransactionState.COMMIT) {
- throw AlreadyCommittedTransactionException(transactionId, it)
+ throw AlreadyCommittedTransactionException(transactionId, it.name)
}
transactionId
}
.warningOnError("Cannot join transaction")
- .map { codec.encode(undo) }
.flatMap {
- joinTransaction(transactionId, it, null)
- .info("Join transaction transactionId \"$transactionId\", undo \"$undo\"")
+ joinTransaction(transactionId, null)
+ .info("Join transaction transactionId \"$transactionId\"")
}
}
- override fun join(transactionId: String, undo: T, event: S): Mono {
+ override fun join(transactionId: String, event: T): Mono {
return getAnyTransaction(transactionId)
.map {
if (it == TransactionState.COMMIT) {
- throw AlreadyCommittedTransactionException(transactionId, it)
+ throw AlreadyCommittedTransactionException(transactionId, it.name)
}
transactionId
}
.warningOnError("Cannot join transaction")
- .map { codec.encode(undo) to codec.encode(event) }
- .flatMap { (encodedUndo, encodedEvent) ->
- joinTransaction(transactionId, encodedUndo, encodedEvent)
- .info("Join transaction transactionId \"$transactionId\", undo \"$undo\"")
+ .map { codec.encode(event) }
+ .flatMap {
+ joinTransaction(transactionId, it)
+ .info("Join transaction transactionId \"$transactionId\", event \"$event\"")
}
}
- private fun joinTransaction(transactionId: String, undo: String, event: String?): Mono {
+ private fun joinTransaction(transactionId: String, event: String?): Mono {
return publishTransaction(
transactionId, Transaction(
id = transactionId,
serverId = nodeName,
group = nodeGroup,
state = TransactionState.JOIN,
- undo = undo,
event = event,
)
)
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
index ac50c02..e1876d2 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionRetrySupporter.kt
@@ -12,7 +12,7 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
-abstract class AbstractTransactionRetrySupporter(
+internal abstract class AbstractTransactionRetrySupporter(
private val backpressureSize: Int,
private val recoveryMilli: Long,
private val transactionDispatcher: AbstractTransactionDispatcher,
diff --git a/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt b/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
index 117adcc..08e4ffa 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/DefaultOrchestrateChain.kt
@@ -8,8 +8,8 @@ class DefaultOrchestrateChain private constru
private val orchestratorId: String,
private val orchestrateSequence: Int,
private val chainContainer: ChainContainer,
- private val orchestrateListener: AbstractOrchestrateListener,
- private val rollbackOrchestrateListener: AbstractOrchestrateListener?,
+ private var orchestrateListener: AbstractOrchestrateListener,
+ private var rollbackOrchestrateListener: AbstractOrchestrateListener?,
private val beforeDefaultOrchestrateChain: DefaultOrchestrateChain? = null,
) : OrchestrateChain {
@@ -211,15 +211,15 @@ class DefaultOrchestrateChain private constru
this,
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
- val firstOrchestrateChain = nextDefaultOrchestrateChain.initOrchestrateListeners()
+ val firstOrchestrators = nextDefaultOrchestrateChain.initOrchestrateListeners()
return@cache OrchestratorManager(
transactionManager = chainContainer.transactionManager,
codec = chainContainer.codec,
orchestratorId = orchestratorId,
resultHolder = chainContainer.resultHolder,
- orchestrateListener = firstOrchestrateChain.orchestrateListener,
- rollbackOrchestrateListener = firstOrchestrateChain.rollbackOrchestrateListener,
+ orchestrateListener = firstOrchestrators.first,
+ rollbackOrchestrateListener = firstOrchestrators.second,
)
}
}
@@ -263,32 +263,33 @@ class DefaultOrchestrateChain private constru
)
this.nextDefaultOrchestrateChain = nextDefaultOrchestrateChain
- val firstOrchestrateChain = nextDefaultOrchestrateChain.initOrchestrateListeners()
+ val firstOrchestrators = nextDefaultOrchestrateChain.initOrchestrateListeners()
return@cache OrchestratorManager(
transactionManager = chainContainer.transactionManager,
codec = chainContainer.codec,
orchestratorId = orchestratorId,
resultHolder = chainContainer.resultHolder,
- orchestrateListener = firstOrchestrateChain.orchestrateListener,
- rollbackOrchestrateListener = firstOrchestrateChain.rollbackOrchestrateListener,
+ orchestrateListener = firstOrchestrators.first,
+ rollbackOrchestrateListener = firstOrchestrators.second
)
}
}
@Suppress("UNCHECKED_CAST")
- private fun initOrchestrateListeners(): DefaultOrchestrateChain {
- val cursorAndOrchestrateListener = getAllOrchestrateListeners()
+ private fun initOrchestrateListeners(): Pair, AbstractOrchestrateListener?> {
+ val annotatedListeners = getAllOrchestrateListeners()
+ .toAnnotatedListeners()
- chainOrchestrateListeners(cursorAndOrchestrateListener.second)
- chainRollbackListeners(cursorAndOrchestrateListener.second)
+ chainOrchestrateListeners(annotatedListeners)
+ chainRollbackListeners(annotatedListeners)
- addDispatcher(cursorAndOrchestrateListener.second)
+ addDispatcher(annotatedListeners)
- return cursorAndOrchestrateListener.first as DefaultOrchestrateChain
+ return annotatedListeners[0] as Pair, AbstractOrchestrateListener?>
}
- private fun getAllOrchestrateListeners(): Pair?, MutableList, AbstractOrchestrateListener?>>> {
+ private fun getAllOrchestrateListeners(): MutableList, AbstractOrchestrateListener?>> {
val orchestrateListeners = mutableListOf<
Pair, AbstractOrchestrateListener?>>()
@@ -308,7 +309,22 @@ class DefaultOrchestrateChain private constru
orchestrateListeners.reverse()
- return defaultOrchestrateChainCursor to orchestrateListeners
+ return orchestrateListeners
+ }
+
+ private fun List, AbstractOrchestrateListener?>>.toAnnotatedListeners()
+ : MutableList, AbstractOrchestrateListener?>> {
+ return this.withIndex().map {
+ val isFirst = it.index == 0
+ val isLast =
+ it.index == (this.size - 1 - COMMIT_LISTENER_PREFIX)
+
+ val listener = it.value.first
+ listener.isFirst = isFirst
+ listener.isLast = isLast
+
+ listener.withAnnotated() to it.value.second
+ }.toMutableList()
}
private fun chainOrchestrateListeners(orchestrateListeners: List, AbstractOrchestrateListener?>>) {
@@ -357,8 +373,8 @@ class DefaultOrchestrateChain private constru
private fun addDispatcher(orchestrateListeners: List, AbstractOrchestrateListener?>>) {
orchestrateListeners.forEach { (listener, rollbackListener) ->
- chainContainer.transactionDispatcher.addHandler(listener)
- rollbackListener?.let { chainContainer.transactionDispatcher.addHandler(it) }
+ chainContainer.transactionDispatcher.addOrchestrate(listener)
+ rollbackListener?.let { chainContainer.transactionDispatcher.addOrchestrate(it) }
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt
index 37c5e54..b559cf8 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/MonoDispatchFunction.kt
@@ -1,6 +1,7 @@
package org.rooftop.netx.engine
import org.rooftop.netx.api.TransactionEvent
+import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.engine.logging.info
import reactor.core.publisher.Mono
import kotlin.reflect.KClass
@@ -17,42 +18,36 @@ internal class MonoDispatchFunction(
function: KFunction>,
handler: Any,
noRetryFor: Array>,
-) : AbstractDispatchFunction>(eventType, function, handler, noRetryFor) {
+ nextState: NextTransactionState,
+ transactionManager: TransactionManager,
+) : AbstractDispatchFunction>(
+ eventType,
+ function,
+ handler,
+ noRetryFor,
+ nextState,
+ transactionManager,
+) {
override fun call(transactionEvent: TransactionEvent): Mono<*> {
- runCatching { transactionEvent.decodeEvent(eventType) }
- .fold(
- onSuccess = {
- return Mono.just(DEFAULT_MONO)
- .flatMap { function.call(handler, transactionEvent) }
- .onErrorResume { throwable ->
- if (isNoRetryFor(throwable)) {
- info("Call Mono TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\" no retry for mode")
- return@onErrorResume Mono.empty()
- }
- throw throwable
- }
- .info("Call Mono TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
- },
- onFailure = {
- if (it is NullPointerException && eventType == Any::class) {
- return Mono.just(DEFAULT_MONO)
- .flatMap { function.call(handler, transactionEvent) }
- .onErrorResume { throwable ->
- if (isNoRetryFor(throwable)) {
- info("Call Mono TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\" no retry for mode")
- return@onErrorResume Mono.empty()
- }
- throw throwable
- }
- .info("Call Mono TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
- }
- return Mono.just("Skip \"${name()}\" handler")
+ return Mono.just(transactionEvent)
+ .filter { isProcessable(transactionEvent) }
+ .map { transactionEvent.copy() }
+ .flatMap { function.call(handler, transactionEvent) }
+ .info("Call Mono TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
+ .switchIfEmpty(`continue`)
+ .doOnNext { publishNextTransaction(transactionEvent) }
+ .onErrorResume {
+ if (isNoRollbackFor(it)) {
+ return@onErrorResume noRollbackFor
}
- )
+ rollback(transactionEvent, it)
+ `continue`
+ }
}
private companion object {
- private const val DEFAULT_MONO = "DEFAULT"
+ private val `continue` = Mono.just("CONTINUE")
+ private val noRollbackFor = Mono.just("NO_ROLLBACK_FOR")
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt
index 24dd4e2..cd263d2 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/NotPublishDispatchFunction.kt
@@ -1,6 +1,7 @@
package org.rooftop.netx.engine
import org.rooftop.netx.api.TransactionEvent
+import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.engine.logging.info
import reactor.core.publisher.Mono
import kotlin.reflect.KClass
@@ -14,37 +15,38 @@ internal class NotPublishDispatchFunction(
eventType: KClass<*>,
function: KFunction<*>,
handler: Any,
- noRetryFor: Array>,
-) : AbstractDispatchFunction(eventType, function, handler, noRetryFor) {
+ noRollbackFor: Array>,
+ nextState: NextTransactionState,
+ transactionManager: TransactionManager,
+) : AbstractDispatchFunction(
+ eventType,
+ function,
+ handler,
+ noRollbackFor,
+ nextState,
+ transactionManager,
+) {
- override fun call(transactionEvent: TransactionEvent): Any? {
- if (!isDecodable(transactionEvent)) {
- return SKIP
+ override fun call(transactionEvent: TransactionEvent): Any {
+ if (isProcessable(transactionEvent)) {
+ return runCatching {
+ function.call(handler, transactionEvent)
+ info("Call NotPublisher TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
+ }.fold(
+ onSuccess = { publishNextTransaction(transactionEvent) },
+ onFailure = {
+ if (isNoRollbackFor(it)) {
+ return@fold NO_ROLLBACK_FOR
+ }
+ rollback(transactionEvent, it)
+ },
+ )
}
- runCatching {
- val result = function.call(handler, transactionEvent)
- info("Call NotPublisher TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
- return result
- }.onFailure { throwable ->
- if (isNoRetryFor(throwable)) {
- info("Call NotPublisher TransactionHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\" no retry for mode")
- return SUCCESS_CAUSE_NO_RETRY_FOR
- }
- throw throwable
- }
- throw IllegalStateException("Unreachable code")
- }
-
- private fun isDecodable(transactionEvent: TransactionEvent): Boolean {
- runCatching { transactionEvent.decodeEvent(eventType) }
- .onFailure {
- return it is NullPointerException && eventType == Any::class
- }
- return true
+ return SKIP
}
private companion object {
- private const val SUCCESS_CAUSE_NO_RETRY_FOR = "SUCCESS"
+ private const val NO_ROLLBACK_FOR = "SUCCESS"
private const val SKIP = "SKIP"
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt
new file mode 100644
index 0000000..6c88835
--- /dev/null
+++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateDispatchFunction.kt
@@ -0,0 +1,55 @@
+package org.rooftop.netx.engine
+
+import org.rooftop.netx.api.TransactionEvent
+import org.rooftop.netx.api.TransactionManager
+import org.rooftop.netx.engine.logging.info
+import reactor.core.publisher.Mono
+import kotlin.reflect.KClass
+import kotlin.reflect.KFunction
+
+internal fun Mono.callOrchestrate(function: OrchestrateDispatchFunction): Mono<*> {
+ return this.flatMap {
+ function.call(it)
+ }
+}
+
+internal class OrchestrateDispatchFunction(
+ eventType: KClass<*>,
+ function: KFunction>,
+ handler: Any,
+ noRetryFor: Array>,
+ nextState: NextTransactionState,
+ transactionManager: TransactionManager,
+) : AbstractDispatchFunction>(
+ eventType,
+ function,
+ handler,
+ noRetryFor,
+ nextState,
+ transactionManager,
+) {
+
+ override fun call(transactionEvent: TransactionEvent): Mono<*> {
+ return Mono.just(transactionEvent)
+ .filter { isProcessable(transactionEvent) }
+ .map { transactionEvent.copy() }
+ .flatMap { function.call(handler, transactionEvent) }
+ .info("Call OrchestrateHandler \"${name()}\" with transactionId \"${transactionEvent.transactionId}\"")
+ .map {
+ publishNextTransaction(transactionEvent)
+ it
+ }
+ .switchIfEmpty(`continue`)
+ .onErrorResume {
+ if (isNoRollbackFor(it)) {
+ return@onErrorResume noRollbackFor
+ }
+ `continue`
+ }
+ }
+
+ private companion object {
+ private val `continue` = Mono.just("CONTINUE")
+ private val noRollbackFor = Mono.just("NO_ROLLBACK_FOR")
+ }
+}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt
index d7483af..c4f6888 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt
@@ -66,12 +66,11 @@ class OrchestratorManager internal constructor(
context = codec.encode(context.mapValues { codec.encode(it.value) })
)
}
- .flatMap { transactionManager.start(UNDO, it) }
+ .flatMap { transactionManager.start(it) }
.flatMap { resultHolder.getResult(timeoutMillis.milliseconds, it) }
}
private companion object {
- private const val UNDO = "Orchestrate mode";
private const val TEN_SECONDS_TO_TIME_OUT = 10000L
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/core/Transaction.kt b/src/main/kotlin/org/rooftop/netx/engine/core/Transaction.kt
index e5f0acc..dce8ef3 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/core/Transaction.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/core/Transaction.kt
@@ -1,11 +1,10 @@
package org.rooftop.netx.engine.core
-data class Transaction(
+internal data class Transaction(
val id: String,
val serverId: String,
val group: String,
val state: TransactionState,
- val undo: String? = null,
val cause: String? = null,
val event: String? = null,
)
diff --git a/src/main/kotlin/org/rooftop/netx/engine/core/TransactionState.kt b/src/main/kotlin/org/rooftop/netx/engine/core/TransactionState.kt
index be97f36..f5697df 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/core/TransactionState.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/core/TransactionState.kt
@@ -1,6 +1,6 @@
package org.rooftop.netx.engine.core
-enum class TransactionState {
+internal enum class TransactionState {
JOIN,
COMMIT,
ROLLBACK,
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt
index 91d4479..248e4aa 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/CommitOrchestrateListener.kt
@@ -8,7 +8,7 @@ import org.rooftop.netx.engine.ResultHolder
import reactor.core.publisher.Mono
internal class CommitOrchestrateListener internal constructor(
- private val codec: Codec,
+ codec: Codec,
transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
@@ -27,25 +27,26 @@ internal class CommitOrchestrateListener internal constructor(
) {
@TransactionCommitListener(OrchestrateEvent::class)
- fun listenCommitOrchestrateEvent(transactionCommitEvent: TransactionCommitEvent): Mono {
- return Mono.just(transactionCommitEvent)
- .map { it.decodeEvent(OrchestrateEvent::class) }
+ fun listenCommitOrchestrateEvent(transactionCommitEvent: TransactionCommitEvent): Mono {
+ return transactionCommitEvent.startWithOrchestrateEvent()
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId }
.mapReifiedRequest()
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, transactionCommitEvent.transactionId)
- .map{ it to event }
+ .map { it to event }
}
.map { (request, event) ->
orchestrateCommand.command(request, event.context)
}
+ .doOnError {
+ rollback(
+ transactionCommitEvent.transactionId,
+ it,
+ transactionCommitEvent.decodeEvent(OrchestrateEvent::class)
+ )
+ }
.flatMap { (response, _) ->
resultHolder.setSuccessResult(transactionCommitEvent.transactionId, response)
}
- .onErrorRollback(
- transactionCommitEvent.transactionId,
- transactionCommitEvent.decodeEvent(OrchestrateEvent::class)
- )
- .map { }
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt
index 88f8c31..566ffde 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/JoinOrchestrateListener.kt
@@ -8,14 +8,14 @@ import org.rooftop.netx.engine.ResultHolder
import reactor.core.publisher.Mono
internal class JoinOrchestrateListener(
- codec: Codec,
+ private val codec: Codec,
private val transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val orchestrateCommand: OrchestrateCommand,
- requestHolder: RequestHolder,
- resultHolder: ResultHolder,
- typeReference: TypeReference?,
+ private val requestHolder: RequestHolder,
+ private val resultHolder: ResultHolder,
+ private val typeReference: TypeReference?,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -26,46 +26,59 @@ internal class JoinOrchestrateListener(
typeReference,
) {
- @TransactionJoinListener(OrchestrateEvent::class)
- fun listenJoinOrchestrateEvent(transactionJoinEvent: TransactionJoinEvent): Mono {
- return transactionJoinEvent.toOrchestrateEvent()
- .filter {
- it.orchestrateSequence == orchestrateSequence
- && it.orchestratorId == orchestratorId
- }
- .mapReifiedRequest()
- .flatMap { (request, event) ->
- holdRequestIfRollbackable(request, transactionJoinEvent.transactionId)
- .map{ it to event }
+ override fun withAnnotated(): AbstractOrchestrateListener {
+ return when {
+ isLast -> this.successWithCommit()
+ !isLast -> this.successWithJoin()
+ else -> error("Cannot annotated")
+ }
+ }
+
+ private fun successWithJoin(): AbstractOrchestrateListener {
+ return object : AbstractOrchestrateListener(
+ orchestratorId,
+ orchestrateSequence,
+ codec,
+ transactionManager,
+ requestHolder,
+ resultHolder,
+ typeReference,
+ ) {
+ @TransactionJoinListener(
+ event = OrchestrateEvent::class,
+ successWith = SuccessWith.PUBLISH_JOIN
+ )
+ fun handleTransactionJoinEvent(transactionJoinEvent: TransactionJoinEvent): Mono {
+ return orchestrate(transactionJoinEvent)
}
- .map { (request, event) ->
- orchestrateCommand.command(request, event.context)
+
+ override fun command(request: T, event: OrchestrateEvent): Mono> {
+ return Mono.fromCallable { orchestrateCommand.command(request, event.context) }
}
- .setNextCastableType()
- .onErrorRollback(
- transactionJoinEvent.transactionId,
- transactionJoinEvent.decodeEvent(OrchestrateEvent::class)
+ }
+ }
+
+ private fun successWithCommit(): AbstractOrchestrateListener {
+ return object : AbstractOrchestrateListener(
+ orchestratorId,
+ orchestrateSequence,
+ codec,
+ transactionManager,
+ requestHolder,
+ resultHolder,
+ typeReference,
+ ) {
+ @TransactionJoinListener(
+ event = OrchestrateEvent::class,
+ successWith = SuccessWith.PUBLISH_COMMIT
)
- .toOrchestrateEvent()
- .flatMap {
- if (isLast) {
- return@flatMap transactionManager.commit(
- transactionId = transactionJoinEvent.transactionId,
- event = it,
- )
- }
- transactionManager.join(
- transactionId = transactionJoinEvent.transactionId,
- undo = "",
- event = it,
- )
+ fun handleTransactionJoinEvent(transactionJoinEvent: TransactionJoinEvent): Mono {
+ return orchestrate(transactionJoinEvent)
}
- .onErrorResume {
- if (it::class == AlreadyCommittedTransactionException::class) {
- return@onErrorResume Mono.empty()
- }
- throw it
+
+ override fun command(request: T, event: OrchestrateEvent): Mono> {
+ return Mono.fromCallable { orchestrateCommand.command(request, event.context) }
}
- .map { }
+ }
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt
index 5be9bc5..70fbadf 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoCommitOrchestrateListener.kt
@@ -8,7 +8,7 @@ import org.rooftop.netx.engine.ResultHolder
import reactor.core.publisher.Mono
internal class MonoCommitOrchestrateListener internal constructor(
- private val codec: Codec,
+ codec: Codec,
transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
@@ -26,25 +26,26 @@ internal class MonoCommitOrchestrateListener internal construc
typeReference,
) {
@TransactionCommitListener(OrchestrateEvent::class)
- fun listenCommitOrchestrateEvent(transactionCommitEvent: TransactionCommitEvent): Mono {
- return Mono.just(transactionCommitEvent)
- .map { it.decodeEvent(OrchestrateEvent::class) }
+ fun listenCommitOrchestrateEvent(transactionCommitEvent: TransactionCommitEvent): Mono {
+ return transactionCommitEvent.startWithOrchestrateEvent()
.filter { it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId }
.mapReifiedRequest()
.flatMap { (request, event) ->
holdRequestIfRollbackable(request, transactionCommitEvent.transactionId)
- .map{ it to event }
+ .map { it to event }
}
.flatMap { (request, event) ->
monoOrchestrateCommand.command(request, event.context)
}
+ .doOnError {
+ rollback(
+ transactionCommitEvent.transactionId,
+ it,
+ transactionCommitEvent.decodeEvent(OrchestrateEvent::class)
+ )
+ }
.flatMap { (response, _) ->
resultHolder.setSuccessResult(transactionCommitEvent.transactionId, response)
}
- .onErrorRollback(
- transactionCommitEvent.transactionId,
- transactionCommitEvent.decodeEvent(OrchestrateEvent::class)
- )
- .map { }
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt
index 4fc434f..8bb0192 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoJoinOrchestrateListener.kt
@@ -8,14 +8,14 @@ import org.rooftop.netx.engine.ResultHolder
import reactor.core.publisher.Mono
internal class MonoJoinOrchestrateListener(
- codec: Codec,
+ private val codec: Codec,
private val transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val monoOrchestrateCommand: MonoOrchestrateCommand,
- requestHolder: RequestHolder,
- resultHolder: ResultHolder,
- typeReference: TypeReference?,
+ private val requestHolder: RequestHolder,
+ private val resultHolder: ResultHolder,
+ private val typeReference: TypeReference?,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -26,45 +26,59 @@ internal class MonoJoinOrchestrateListener(
typeReference,
) {
- @TransactionJoinListener(OrchestrateEvent::class)
- fun listenJoinOrchestrateEvent(transactionJoinEvent: TransactionJoinEvent): Mono {
- return transactionJoinEvent.toOrchestrateEvent()
- .filter {
- it.orchestrateSequence == orchestrateSequence && it.orchestratorId == orchestratorId
- }
- .mapReifiedRequest()
- .flatMap { (request, event) ->
- holdRequestIfRollbackable(request, transactionJoinEvent.transactionId)
- .map { it to event }
+ override fun withAnnotated(): AbstractOrchestrateListener {
+ return when {
+ isLast -> this.successWithCommit()
+ !isLast -> this.successWithJoin()
+ else -> error("Cannot annotated")
+ }
+ }
+
+ private fun successWithJoin(): AbstractOrchestrateListener {
+ return object : AbstractOrchestrateListener(
+ orchestratorId,
+ orchestrateSequence,
+ codec,
+ transactionManager,
+ requestHolder,
+ resultHolder,
+ typeReference,
+ ) {
+ @TransactionJoinListener(
+ event = OrchestrateEvent::class,
+ successWith = SuccessWith.PUBLISH_JOIN
+ )
+ fun handleTransactionJoinEvent(transactionJoinEvent: TransactionJoinEvent): Mono {
+ return orchestrate(transactionJoinEvent)
}
- .flatMap { (request, event) ->
- monoOrchestrateCommand.command(request, event.context)
+
+ override fun command(request: T, event: OrchestrateEvent): Mono> {
+ return monoOrchestrateCommand.command(request, event.context)
}
- .setNextCastableType()
- .onErrorRollback(
- transactionJoinEvent.transactionId,
- transactionJoinEvent.decodeEvent(OrchestrateEvent::class)
+ }
+ }
+
+ private fun successWithCommit(): AbstractOrchestrateListener {
+ return object : AbstractOrchestrateListener(
+ orchestratorId,
+ orchestrateSequence,
+ codec,
+ transactionManager,
+ requestHolder,
+ resultHolder,
+ typeReference,
+ ) {
+ @TransactionJoinListener(
+ event = OrchestrateEvent::class,
+ successWith = SuccessWith.PUBLISH_COMMIT
)
- .toOrchestrateEvent()
- .flatMap {
- if (isLast) {
- return@flatMap transactionManager.commit(
- transactionId = transactionJoinEvent.transactionId,
- event = it,
- )
- }
- transactionManager.join(
- transactionId = transactionJoinEvent.transactionId,
- undo = "",
- event = it,
- )
+ fun handleTransactionJoinEvent(transactionJoinEvent: TransactionJoinEvent): Mono {
+ return orchestrate(transactionJoinEvent)
}
- .onErrorResume {
- if (it::class == AlreadyCommittedTransactionException::class) {
- return@onErrorResume Mono.empty()
- }
- throw it
+
+ override fun command(request: T, event: OrchestrateEvent): Mono> {
+ return monoOrchestrateCommand.command(request, event.context)
}
- .map { }
+ }
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt
index 72e2e2f..f395809 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoRollbackOrchestrateListener.kt
@@ -28,8 +28,7 @@ internal class MonoRollbackOrchestrateListener(
@TransactionRollbackListener(OrchestrateEvent::class)
fun listenRollbackOrchestrateEvent(transactionRollbackEvent: TransactionRollbackEvent): Mono {
- return Mono.just(transactionRollbackEvent)
- .map { it.decodeEvent(OrchestrateEvent::class) }
+ return transactionRollbackEvent.startWithOrchestrateEvent()
.filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence }
.getHeldRequest(transactionRollbackEvent)
.flatMap { (request, event) ->
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt
index 4c4d294..71f8630 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/MonoStartOrchestrateListener.kt
@@ -8,14 +8,14 @@ import org.rooftop.netx.engine.ResultHolder
import reactor.core.publisher.Mono
internal class MonoStartOrchestrateListener(
- codec: Codec,
+ private val codec: Codec,
private val transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val monoOrchestrateCommand: MonoOrchestrateCommand,
- requestHolder: RequestHolder,
- resultHolder: ResultHolder,
- typeReference: TypeReference?,
+ private val requestHolder: RequestHolder,
+ private val resultHolder: ResultHolder,
+ private val typeReference: TypeReference?,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -26,43 +26,59 @@ internal class MonoStartOrchestrateListener(
typeReference,
) {
- @TransactionStartListener(OrchestrateEvent::class)
- fun listenStartOrchestrateEvent(transactionStartEvent: TransactionStartEvent): Mono {
- return transactionStartEvent.toOrchestrateEvent()
- .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence }
- .mapReifiedRequest()
- .flatMap { (request, event) ->
- holdRequestIfRollbackable(request, transactionStartEvent.transactionId)
- .map{ it to event }
+ override fun withAnnotated(): AbstractOrchestrateListener {
+ return when {
+ isFirst && isLast -> this.successWithCommit()
+ isFirst && !isLast -> this.successWithJoin()
+ else -> error("Cannot annotated")
+ }
+ }
+
+ private fun successWithJoin(): AbstractOrchestrateListener {
+ return object : AbstractOrchestrateListener(
+ orchestratorId,
+ orchestrateSequence,
+ codec,
+ transactionManager,
+ requestHolder,
+ resultHolder,
+ typeReference,
+ ) {
+ @TransactionStartListener(
+ event = OrchestrateEvent::class,
+ successWith = SuccessWith.PUBLISH_JOIN
+ )
+ fun handleTransactionStartEvent(transactionStartEvent: TransactionStartEvent): Mono {
+ return orchestrate(transactionStartEvent)
}
- .flatMap { (request, event) ->
- monoOrchestrateCommand.command(request, event.context)
+
+ override fun command(request: T, event: OrchestrateEvent): Mono> {
+ return monoOrchestrateCommand.command(request, event.context)
}
- .setNextCastableType()
- .onErrorRollback(
- transactionStartEvent.transactionId,
- transactionStartEvent.decodeEvent(OrchestrateEvent::class)
+ }
+ }
+
+ private fun successWithCommit(): AbstractOrchestrateListener {
+ return object : AbstractOrchestrateListener(
+ orchestratorId,
+ orchestrateSequence,
+ codec,
+ transactionManager,
+ requestHolder,
+ resultHolder,
+ typeReference,
+ ) {
+ @TransactionStartListener(
+ event = OrchestrateEvent::class,
+ successWith = SuccessWith.PUBLISH_COMMIT
)
- .toOrchestrateEvent()
- .flatMap {
- if (isLast) {
- return@flatMap transactionManager.commit(
- transactionId = transactionStartEvent.transactionId,
- event = it,
- )
- }
- transactionManager.join(
- transactionId = transactionStartEvent.transactionId,
- undo = "",
- event = it,
- )
+ fun handleTransactionStartEvent(transactionStartEvent: TransactionStartEvent): Mono {
+ return orchestrate(transactionStartEvent)
}
- .onErrorResume {
- if (it::class == AlreadyCommittedTransactionException::class) {
- return@onErrorResume Mono.empty()
- }
- throw it
+
+ override fun command(request: T, event: OrchestrateEvent): Mono> {
+ return monoOrchestrateCommand.command(request, event.context)
}
- .map { }
+ }
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt
index 95bff9d..5d698de 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/RollbackOrchestrateListener.kt
@@ -28,7 +28,7 @@ internal class RollbackOrchestrateListener(
@TransactionRollbackListener(OrchestrateEvent::class)
fun listenRollbackOrchestrateEvent(transactionRollbackEvent: TransactionRollbackEvent): Mono {
- return transactionRollbackEvent.toOrchestrateEvent()
+ return transactionRollbackEvent.startWithOrchestrateEvent()
.filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence }
.getHeldRequest(transactionRollbackEvent)
.map { (request, event) ->
diff --git a/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt b/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt
index 407e1c8..8ee6d9c 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/listen/StartOrchestrateListener.kt
@@ -8,14 +8,14 @@ import org.rooftop.netx.engine.ResultHolder
import reactor.core.publisher.Mono
internal class StartOrchestrateListener(
- codec: Codec,
+ private val codec: Codec,
private val transactionManager: TransactionManager,
private val orchestratorId: String,
orchestrateSequence: Int,
private val orchestrateCommand: OrchestrateCommand,
- requestHolder: RequestHolder,
- resultHolder: ResultHolder,
- typeReference: TypeReference?,
+ private val requestHolder: RequestHolder,
+ private val resultHolder: ResultHolder,
+ private val typeReference: TypeReference?,
) : AbstractOrchestrateListener(
orchestratorId,
orchestrateSequence,
@@ -26,43 +26,59 @@ internal class StartOrchestrateListener(
typeReference,
) {
- @TransactionStartListener(OrchestrateEvent::class)
- fun listenStartOrchestrateEvent(transactionStartEvent: TransactionStartEvent): Mono {
- return transactionStartEvent.toOrchestrateEvent()
- .filter { it.orchestratorId == orchestratorId && it.orchestrateSequence == orchestrateSequence }
- .mapReifiedRequest()
- .flatMap { (request, event) ->
- holdRequestIfRollbackable(request, transactionStartEvent.transactionId)
- .map { it to event }
+ override fun withAnnotated(): AbstractOrchestrateListener {
+ return when {
+ isFirst && isLast -> this.successWithCommit()
+ isFirst && !isLast -> this.successWithJoin()
+ else -> error("Cannot annotated")
+ }
+ }
+
+ private fun successWithJoin(): AbstractOrchestrateListener {
+ return object : AbstractOrchestrateListener(
+ orchestratorId,
+ orchestrateSequence,
+ codec,
+ transactionManager,
+ requestHolder,
+ resultHolder,
+ typeReference,
+ ) {
+ @TransactionStartListener(
+ event = OrchestrateEvent::class,
+ successWith = SuccessWith.PUBLISH_JOIN
+ )
+ fun handleTransactionStartEvent(transactionStartEvent: TransactionStartEvent): Mono {
+ return orchestrate(transactionStartEvent)
}
- .map { (request, event) ->
- orchestrateCommand.command(request, event.context)
+
+ override fun command(request: T, event: OrchestrateEvent): Mono> {
+ return Mono.fromCallable { orchestrateCommand.command(request, event.context) }
}
- .setNextCastableType()
- .onErrorRollback(
- transactionStartEvent.transactionId,
- transactionStartEvent.decodeEvent(OrchestrateEvent::class)
+ }
+ }
+
+ private fun successWithCommit(): AbstractOrchestrateListener {
+ return object : AbstractOrchestrateListener(
+ orchestratorId,
+ orchestrateSequence,
+ codec,
+ transactionManager,
+ requestHolder,
+ resultHolder,
+ typeReference,
+ ) {
+ @TransactionStartListener(
+ event = OrchestrateEvent::class,
+ successWith = SuccessWith.PUBLISH_COMMIT
)
- .toOrchestrateEvent()
- .flatMap {
- if (isLast) {
- return@flatMap transactionManager.commit(
- transactionId = transactionStartEvent.transactionId,
- event = it,
- )
- }
- transactionManager.join(
- transactionId = transactionStartEvent.transactionId,
- undo = "",
- event = it,
- )
+ fun handleTransactionStartEvent(transactionStartEvent: TransactionStartEvent): Mono {
+ return orchestrate(transactionStartEvent)
}
- .onErrorResume {
- if (it::class == AlreadyCommittedTransactionException::class) {
- return@onErrorResume Mono.empty()
- }
- throw it
+
+ override fun command(request: T, event: OrchestrateEvent): Mono> {
+ return Mono.fromCallable { orchestrateCommand.command(request, event.context) }
}
- .map { }
+ }
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionDispatcher.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionDispatcher.kt
index bb4a416..20753e4 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionDispatcher.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionDispatcher.kt
@@ -2,7 +2,7 @@ package org.rooftop.netx.redis
import org.rooftop.netx.api.Codec
import org.rooftop.netx.api.FailedAckTransactionException
-import org.rooftop.netx.api.TransactionException
+import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.engine.AbstractTransactionDispatcher
import org.rooftop.netx.engine.core.Transaction
import org.rooftop.netx.meta.TransactionHandler
@@ -10,12 +10,13 @@ import org.springframework.context.ApplicationContext
import org.springframework.data.redis.core.ReactiveRedisTemplate
import reactor.core.publisher.Mono
-class RedisStreamTransactionDispatcher(
+internal class RedisStreamTransactionDispatcher(
codec: Codec,
+ transactionManager: TransactionManager,
private val applicationContext: ApplicationContext,
private val reactiveRedisTemplate: ReactiveRedisTemplate,
private val nodeGroup: String,
-) : AbstractTransactionDispatcher(codec) {
+) : AbstractTransactionDispatcher(codec, transactionManager) {
override fun findHandlers(): List {
return applicationContext.getBeansWithAnnotation(TransactionHandler::class.java)
@@ -24,15 +25,6 @@ class RedisStreamTransactionDispatcher(
.toList()
}
- override fun findOwnUndo(transaction: Transaction): Mono {
- return reactiveRedisTemplate.opsForHash()[transaction.id, nodeGroup]
- .switchIfEmpty(
- Mono.error {
- throw TransactionException("Cannot find undo state in transaction hashes key \"${transaction.id}\"")
- }
- )
- }
-
override fun ack(transaction: Transaction, messageId: String): Mono> {
return reactiveRedisTemplate.opsForStream()
.acknowledge(STREAM_KEY, nodeGroup, messageId)
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt
index ec9814d..71efe1e 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionListener.kt
@@ -17,7 +17,7 @@ import reactor.core.publisher.Mono
import kotlin.time.Duration.Companion.hours
import kotlin.time.toJavaDuration
-class RedisStreamTransactionListener(
+internal class RedisStreamTransactionListener(
backpressureSize: Int,
transactionDispatcher: AbstractTransactionDispatcher,
connectionFactory: ReactiveRedisConnectionFactory,
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManager.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManager.kt
index e783ab4..c58a376 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManager.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisStreamTransactionManager.kt
@@ -11,7 +11,7 @@ import org.springframework.data.redis.connection.stream.Record
import org.springframework.data.redis.core.ReactiveRedisTemplate
import reactor.core.publisher.Mono
-class RedisStreamTransactionManager(
+internal class RedisStreamTransactionManager(
codec: Codec,
nodeName: String,
transactionIdGenerator: TransactionIdGenerator,
@@ -36,32 +36,13 @@ class RedisStreamTransactionManager(
.map { TransactionState.valueOf(it) }
}
- /*
- --- 요구사항 ---
-
- 1. transaciton의 join마다, undo를 설정할 수 있어야함. -> 새로운 인터페이스를 만든다..?
- 2. 자신이 참여하지 않은 transaction을 수신하지 않아야함. -> 방법이 없다면.. transactionId에 참여중인 서버의 group을 알고있어야할듯
- --> silent drop undo가 없다면 참여하지 않은것.
- */
override fun publishTransaction(transactionId: String, transaction: Transaction): Mono {
- return Mono.fromCallable { hasUndo(transaction) }
- .flatMap {
- if (hasUndo(transaction)) {
- return@flatMap reactiveRedisTemplate.opsForHash()
- .putAll(
- transactionId, mapOf(
- STATE_KEY to transaction.state.name,
- nodeGroup to transaction.undo
- )
- )
- }
- reactiveRedisTemplate.opsForHash()
- .putAll(
- transactionId, mapOf(
- STATE_KEY to transaction.state.name,
- )
- )
- }
+ return reactiveRedisTemplate.opsForHash()
+ .putAll(
+ transactionId, mapOf(
+ STATE_KEY to transaction.state.name,
+ )
+ )
.map { objectMapper.writeValueAsString(transaction) }
.flatMap {
reactiveRedisTemplate.opsForStream()
@@ -73,9 +54,6 @@ class RedisStreamTransactionManager(
.map { transactionId }
}
- private fun hasUndo(transaction: Transaction): Boolean =
- transaction.state == TransactionState.JOIN || transaction.state == TransactionState.START
-
private companion object {
private const val DATA = "data"
private const val STREAM_KEY = "NETX_STREAM"
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
index e018e28..3950327 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
@@ -6,15 +6,11 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule
import org.rooftop.netx.api.TransactionManager
-import org.rooftop.netx.engine.JsonCodec
-import org.rooftop.netx.engine.RequestHolder
-import org.rooftop.netx.engine.ResultHolder
-import org.rooftop.netx.engine.TransactionIdGenerator
+import org.rooftop.netx.engine.*
import org.rooftop.netx.engine.core.Transaction
import org.rooftop.netx.engine.logging.LoggerFactory
import org.rooftop.netx.engine.logging.info
import org.rooftop.netx.engine.logging.logger
-import org.rooftop.netx.engine.OrchestratorFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.ApplicationContext
@@ -31,7 +27,7 @@ import org.springframework.data.redis.serializer.StringRedisSerializer
@Configuration
-class RedisTransactionConfigurer(
+internal class RedisTransactionConfigurer(
@Value("\${netx.host}") private val host: String,
@Value("\${netx.port}") private val port: String,
@Value("\${netx.password:0000}") private val password: String,
@@ -96,7 +92,7 @@ class RedisTransactionConfigurer(
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
- internal fun redisResultHolder(): ResultHolder =
+ fun redisResultHolder(): ResultHolder =
RedisResultHolder(
poolSize,
jsonCodec(),
@@ -106,7 +102,7 @@ class RedisTransactionConfigurer(
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
- internal fun redisRequestHolder(): RequestHolder =
+ fun redisRequestHolder(): RequestHolder =
RedisRequestHolder(
jsonCodec(),
reactiveRedisTemplate(),
@@ -142,19 +138,20 @@ class RedisTransactionConfigurer(
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
- fun redisStreamTransactionDispatcher(): RedisStreamTransactionDispatcher =
+ internal fun redisStreamTransactionDispatcher(): RedisStreamTransactionDispatcher =
RedisStreamTransactionDispatcher(
applicationContext = applicationContext,
reactiveRedisTemplate = transactionReactiveRedisTemplate(),
nodeGroup = nodeGroup,
codec = jsonCodec(),
+ transactionManager = redisStreamTransactionManager(),
).also {
info("RedisStreamTransactionDispatcher connect to host : \"$host\" port : \"$port\" nodeName : \"$nodeName\" nodeGroup : \"$nodeGroup\"")
}
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
- fun transactionReactiveRedisTemplate(): ReactiveRedisTemplate {
+ internal fun transactionReactiveRedisTemplate(): ReactiveRedisTemplate {
val keySerializer = StringRedisSerializer()
val valueSerializer =
Jackson2JsonRedisSerializer(netxObjectMapper(), Transaction::class.java)
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt
index 97639f6..5b762ff 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionRetrySupporter.kt
@@ -9,7 +9,7 @@ import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptio
import org.springframework.data.redis.core.ReactiveRedisTemplate
import reactor.core.publisher.Flux
-class RedisTransactionRetrySupporter(
+internal class RedisTransactionRetrySupporter(
recoveryMilli: Long,
backpressureSize: Int,
transactionDispatcher: AbstractTransactionDispatcher,
diff --git a/src/test/java/org/rooftop/netx/javasupports/NetxJavaSupportsTest.java b/src/test/java/org/rooftop/netx/javasupports/NetxJavaSupportsTest.java
index 4360872..b6dea63 100644
--- a/src/test/java/org/rooftop/netx/javasupports/NetxJavaSupportsTest.java
+++ b/src/test/java/org/rooftop/netx/javasupports/NetxJavaSupportsTest.java
@@ -9,7 +9,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.rooftop.netx.api.Orchestrator;
import org.rooftop.netx.api.TransactionManager;
-import org.rooftop.netx.engine.core.TransactionState;
import org.rooftop.netx.meta.EnableDistributedTransaction;
import org.rooftop.netx.redis.RedisContainer;
import org.springframework.beans.factory.annotation.Autowired;
@@ -29,8 +28,6 @@
@TestPropertySource("classpath:fast-recover-mode.properties")
class NetxJavaSupportsTest {
- private static final Undo NEGATIVE_UNDO = new Undo(-1L);
- private static final Undo POSITIVE_UNDO = new Undo(1L);
private static final Event NEGATIVE_EVENT = new Event(-1L);
private static final Event POSITIVE_EVENT = new Event(1L);
@@ -51,30 +48,26 @@ void clear() {
@Test
@DisplayName("Scenario-1. Start -> Join -> Commit")
void Scenario1_Start_Join_Commit() {
- String transactionId = transactionManager.syncStart(NEGATIVE_UNDO, NEGATIVE_EVENT);
- transactionManager.syncJoin(transactionId, NEGATIVE_UNDO, NEGATIVE_EVENT);
- transactionManager.syncCommit(transactionId);
+ String transactionId = transactionManager.syncStart(POSITIVE_EVENT);
Awaitility.waitAtMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
- transactionEventListeners.assertTransactionCount(TransactionState.START, 1);
- transactionEventListeners.assertTransactionCount(TransactionState.JOIN, 1);
- transactionEventListeners.assertTransactionCount(TransactionState.COMMIT, 1);
+ transactionEventListeners.assertTransactionCount("START", 1);
+ transactionEventListeners.assertTransactionCount("JOIN", 1);
+ transactionEventListeners.assertTransactionCount("COMMIT", 1);
});
}
@Test
@DisplayName("Scenario-2. Start -> Join -> Rollback")
void Transaction_Start_Join_Rollback() {
- String transactionId = transactionManager.syncStart(POSITIVE_UNDO, POSITIVE_EVENT);
- transactionManager.syncJoin(transactionId, POSITIVE_UNDO, POSITIVE_EVENT);
- transactionManager.syncRollback(transactionId, "Scenario-2. Start -> Join -> Rollback");
+ String transactionId = transactionManager.syncStart(NEGATIVE_EVENT);
Awaitility.waitAtMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
- transactionEventListeners.assertTransactionCount(TransactionState.START, 1);
- transactionEventListeners.assertTransactionCount(TransactionState.JOIN, 1);
- transactionEventListeners.assertTransactionCount(TransactionState.ROLLBACK, 1);
+ transactionEventListeners.assertTransactionCount("START", 1);
+ transactionEventListeners.assertTransactionCount("JOIN", 1);
+ transactionEventListeners.assertTransactionCount("ROLLBACK", 1);
});
}
diff --git a/src/test/java/org/rooftop/netx/javasupports/TransactionClient.java b/src/test/java/org/rooftop/netx/javasupports/TransactionClient.java
deleted file mode 100644
index 37b4c0c..0000000
--- a/src/test/java/org/rooftop/netx/javasupports/TransactionClient.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.rooftop.netx.javasupports;
-
-import org.rooftop.netx.api.TransactionManager;
-import org.springframework.boot.test.context.TestComponent;
-
-@TestComponent
-public class TransactionClient {
-
- private final TransactionManager transactionManager;
-
- public TransactionClient(TransactionManager transactionManager) {
- this.transactionManager = transactionManager;
- }
-
- public String startTransaction(Undo undo, Event event) {
- return transactionManager.syncStart(undo, event);
- }
-
- public void joinTransaction(String transactionId, Undo undo, Event event) {
- transactionManager.syncJoin(transactionId, undo, event);
- }
-
- public void commitTransaction(String transactionId) {
- transactionManager.syncCommit(transactionId);
- }
-
- public void rollbackTransaction(String transactionId, String cause) {
- transactionManager.syncRollback(transactionId, cause);
- }
-}
diff --git a/src/test/java/org/rooftop/netx/javasupports/TransactionEventListeners.java b/src/test/java/org/rooftop/netx/javasupports/TransactionEventListeners.java
index f11caa5..5bd7950 100644
--- a/src/test/java/org/rooftop/netx/javasupports/TransactionEventListeners.java
+++ b/src/test/java/org/rooftop/netx/javasupports/TransactionEventListeners.java
@@ -3,7 +3,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.assertj.core.api.Assertions;
-import org.rooftop.netx.api.DecodeException;
+import org.rooftop.netx.api.SuccessWith;
import org.rooftop.netx.api.TransactionCommitEvent;
import org.rooftop.netx.api.TransactionCommitListener;
import org.rooftop.netx.api.TransactionJoinEvent;
@@ -12,43 +12,42 @@
import org.rooftop.netx.api.TransactionRollbackListener;
import org.rooftop.netx.api.TransactionStartEvent;
import org.rooftop.netx.api.TransactionStartListener;
-import org.rooftop.netx.engine.core.TransactionState;
import org.rooftop.netx.meta.TransactionHandler;
import reactor.core.publisher.Mono;
@TransactionHandler
public class TransactionEventListeners {
- private final Map receivedTransactions = new ConcurrentHashMap<>();
+ private final Map receivedTransactions = new ConcurrentHashMap<>();
public void clear() {
receivedTransactions.clear();
}
- public void assertTransactionCount(TransactionState transactionState, int count) {
+ public void assertTransactionCount(String transactionState, int count) {
Assertions.assertThat(receivedTransactions.getOrDefault(transactionState, 0))
.isEqualTo(count);
}
@TransactionStartListener(
event = Event.class,
- noRetryFor = IllegalArgumentException.class
+ noRollbackFor = IllegalArgumentException.class,
+ successWith = SuccessWith.PUBLISH_JOIN
)
public void listenTransactionStartEvent(TransactionStartEvent transactionStartEvent) {
- incrementTransaction(TransactionState.START);
+ incrementTransaction("START");
Event event = transactionStartEvent.decodeEvent(Event.class);
- if (event.event() < 0) {
- throw new IllegalArgumentException();
- }
+ transactionStartEvent.setNextEvent(event);
}
@TransactionJoinListener(
event = Event.class,
- noRetryFor = IllegalArgumentException.class
+ successWith = SuccessWith.PUBLISH_COMMIT
)
public void listenTransactionJoinEvent(TransactionJoinEvent transactionJoinEvent) {
- incrementTransaction(TransactionState.JOIN);
+ incrementTransaction("JOIN");
Event event = transactionJoinEvent.decodeEvent(Event.class);
+ transactionJoinEvent.setNextEvent(event);
if (event.event() < 0) {
throw new IllegalArgumentException();
}
@@ -56,18 +55,18 @@ public void listenTransactionJoinEvent(TransactionJoinEvent transactionJoinEvent
@TransactionCommitListener
public Mono listenTransactionCommitEvent(TransactionCommitEvent transactionCommitEvent) {
- incrementTransaction(TransactionState.COMMIT);
+ incrementTransaction("COMMIT");
return Mono.just(1L);
}
- @TransactionRollbackListener(noRetryFor = DecodeException.class)
+ @TransactionRollbackListener(event = Event.class)
public String listenTransactionRollbackEvent(TransactionRollbackEvent transactionRollbackEvent) {
- incrementTransaction(TransactionState.ROLLBACK);
- transactionRollbackEvent.decodeUndo(Undo.class);
+ incrementTransaction("ROLLBACK");
+ transactionRollbackEvent.decodeEvent(Event.class);
return "listenTransactionRollbackEvent";
}
- private void incrementTransaction(TransactionState transactionState) {
+ private void incrementTransaction(String transactionState) {
receivedTransactions.put(transactionState,
receivedTransactions.getOrDefault(transactionState, 0) + 1);
}
diff --git a/src/test/java/org/rooftop/netx/javasupports/Undo.java b/src/test/java/org/rooftop/netx/javasupports/Undo.java
deleted file mode 100644
index 64f585b..0000000
--- a/src/test/java/org/rooftop/netx/javasupports/Undo.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package org.rooftop.netx.javasupports;
-
-public record Undo(
- Long undo
-) {
-
-}
diff --git a/src/test/kotlin/org/rooftop/netx/client/NetxClient.kt b/src/test/kotlin/org/rooftop/netx/client/NetxClient.kt
deleted file mode 100644
index 691875a..0000000
--- a/src/test/kotlin/org/rooftop/netx/client/NetxClient.kt
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.rooftop.netx.client
-
-import org.rooftop.netx.api.TransactionManager
-import org.springframework.stereotype.Service
-
-@Service
-class NetxClient(
- private val transactionManager: TransactionManager,
-) {
-
- fun startTransaction(undo: String): String {
- return transactionManager.start(undo).block()!!
- }
-
- fun rollbackTransaction(transactionId: String, cause: String): String {
- return transactionManager.rollback(transactionId, cause).block()!!
- }
-
- fun joinTransaction(transactionId: String, undo: String): String {
- return transactionManager.join(transactionId, undo).block()!!
- }
-
- fun commitTransaction(transactionId: String): String {
- return transactionManager.commit(transactionId).block()!!
- }
-}
diff --git a/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt b/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt
index 8ea98d2..388adac 100644
--- a/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt
+++ b/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt
@@ -5,9 +5,11 @@ import io.kotest.core.annotation.DisplayName
import io.kotest.core.spec.style.FunSpec
import io.kotest.data.forAll
import io.kotest.data.row
+import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.meta.EnableDistributedTransaction
import org.rooftop.netx.redis.RedisContainer
import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.context.TestPropertySource
import kotlin.time.Duration.Companion.minutes
@DisplayName("Netx 부하테스트")
@@ -15,13 +17,13 @@ import kotlin.time.Duration.Companion.minutes
classes = [
RedisContainer::class,
LoadRunner::class,
- NetxClient::class,
TransactionReceiveStorage::class,
]
)
@EnableDistributedTransaction
+@TestPropertySource("classpath:fast-recover-mode.properties")
internal class NetxLoadTest(
- private val netxClient: NetxClient,
+ private val transactionManager: TransactionManager,
private val loadRunner: LoadRunner,
private val transactionReceiveStorage: TransactionReceiveStorage,
) : FunSpec({
@@ -37,24 +39,27 @@ internal class NetxLoadTest(
transactionReceiveStorage.clear()
loadRunner.load(commitLoadCount) {
- val transactionId = netxClient.startTransaction("-")
- netxClient.joinTransaction(transactionId, "-")
- netxClient.commitTransaction(transactionId)
+ transactionManager.start(LoadTestEvent(NO_ROLLBACK)).block()!!
}
loadRunner.load(rollbackLoadCount) {
- val transactionId = netxClient.startTransaction("-")
- netxClient.joinTransaction(transactionId, "-")
- netxClient.rollbackTransaction(transactionId, "")
+ transactionManager.start(LoadTestEvent(ROLLBACK)).block()!!
}
eventually(3.minutes) {
transactionReceiveStorage.startCountShouldBeGreaterThanOrEqual(commitLoadCount + rollbackLoadCount)
transactionReceiveStorage.joinCountShouldBeGreaterThanOrEqual(commitLoadCount + rollbackLoadCount)
- transactionReceiveStorage.commitCountShouldBeGreaterThanOrEqual(commitLoadCount)
+ transactionReceiveStorage.commitCountShouldBeGreaterThanOrEqual(commitLoadCount + rollbackLoadCount)
transactionReceiveStorage.rollbackCountShouldBeGreaterThanOrEqual(rollbackLoadCount)
}
}
}
-})
+}) {
+ data class LoadTestEvent(val load: String)
+
+ private companion object {
+ private const val ROLLBACK = "-"
+ private const val NO_ROLLBACK = "+"
+ }
+}
diff --git a/src/test/kotlin/org/rooftop/netx/client/TransactionReceiveStorage.kt b/src/test/kotlin/org/rooftop/netx/client/TransactionReceiveStorage.kt
index 172b988..d62843c 100644
--- a/src/test/kotlin/org/rooftop/netx/client/TransactionReceiveStorage.kt
+++ b/src/test/kotlin/org/rooftop/netx/client/TransactionReceiveStorage.kt
@@ -33,27 +33,44 @@ class TransactionReceiveStorage(
(storage["ROLLBACK"]?.size ?: 0) shouldBeGreaterThanOrEqual count
}
- @TransactionRollbackListener
- fun logRollback(transaction: TransactionRollbackEvent): Mono {
- return Mono.fromCallable { log("ROLLBACK", transaction) }
+ @TransactionStartListener(
+ event = NetxLoadTest.LoadTestEvent::class,
+ successWith = SuccessWith.PUBLISH_JOIN
+ )
+ fun listenStart(transaction: TransactionStartEvent): Mono {
+ return Mono.fromCallable { saveTransaction("START", transaction) }
+ .map { transaction.decodeEvent(NetxLoadTest.LoadTestEvent::class) }
+ .map { transaction.setNextEvent(it) }
}
- @TransactionStartListener
- fun logStart(transaction: TransactionStartEvent): Mono {
- return Mono.fromCallable { log("START", transaction) }
+ @TransactionJoinListener(
+ event = NetxLoadTest.LoadTestEvent::class,
+ successWith = SuccessWith.PUBLISH_COMMIT
+ )
+ fun listenJoin(transaction: TransactionJoinEvent): Mono {
+ return Mono.fromCallable { saveTransaction("JOIN", transaction) }
+ .map { transaction.decodeEvent(NetxLoadTest.LoadTestEvent::class) }
+ .map { transaction.setNextEvent(it) }
}
- @TransactionJoinListener
- fun logJoin(transaction: TransactionJoinEvent): Mono {
- return Mono.fromCallable { log("JOIN", transaction) }
+ @TransactionRollbackListener(event = NetxLoadTest.LoadTestEvent::class)
+ fun listenRollback(transaction: TransactionRollbackEvent): Mono {
+ return Mono.fromCallable { saveTransaction("ROLLBACK", transaction) }
}
- @TransactionCommitListener
- fun logCommit(transaction: TransactionCommitEvent): Mono {
- return Mono.fromCallable { log("COMMIT", transaction) }
+ @TransactionCommitListener(event = NetxLoadTest.LoadTestEvent::class)
+ fun listenCommit(transaction: TransactionCommitEvent): Mono