Skip to content

Commit

Permalink
refactor: Rollback when an exception is thrown in the listener (#116)
Browse files Browse the repository at this point in the history
* refactor: TransactionHandler가 종료될때, 자동으로 commit, join, rollback event가 발행되도록 수정한다.

* refactor: remove unused import and mutablelist to list

* refactor: Remove duplicated codes
  • Loading branch information
devxb authored Mar 29, 2024
1 parent 3ff643e commit 792d3b6
Show file tree
Hide file tree
Showing 65 changed files with 951 additions and 1,114 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<br>

![version 0.3.6](https://img.shields.io/badge/version-0.3.6-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![version 0.3.7](https://img.shields.io/badge/version-0.3.7-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

Redis-Stream을 지원하는 Saga frame work 입니다.
Expand Down Expand Up @@ -228,8 +228,9 @@ fun exists(param: Any): Mono<Any> {
#### Events-Scenario4. Handle transaction event

다른 분산서버가 (혹은 자기자신이) transactionManager를 통해서 트랜잭션을 시작하거나 트랜잭션 상태를 변경했을때, 트랜잭션 상태에 맞는 핸들러를 호출합니다.
이 핸들러를 구현함으로써, 트랜잭션별 상태를 처리할 수 있습니다. (롤백등)
_롤백은 TransactionRollbackEvent로 전달되는 `undo` 필드를 사용합니다._
이 핸들러를 구현함으로써, 트랜잭션 상태별 로직을 구현할 수 있습니다.
각 핸들러에서 에러가 던져지면, 자동으로 rollback 이 호출됩니다.

> [!WARNING]
> 트랜잭션 핸들러는 반드시 핸들러에 맞는 `TransactionEvent` **하나**만을 파라미터로 받아야 합니다.
Expand All @@ -238,27 +239,28 @@ _롤백은 TransactionRollbackEvent로 전달되는 `undo` 필드를 사용합
@TransactionHandler
class TransactionHandler {

@TransactionStartListener(Foo::class) // Receive transaction event when event can be mapped to Foo.class
@TransactionStartListener(event = Foo::class) // Receive transaction event when event can be mapped to Foo.class
fun handleTransactionStartEvent(event: TransactionStartEvent) {
val foo: Foo = event.decodeEvent(Foo::class) // Get event field to Foo.class
// ...
event.setNextEvent(nextFoo) // When this handler terminates and calls the next event or rollback, the event set here is published together.
}

@TransactionJoinHandler // Receive all transaction event when no type is defined.
@TransactionJoinHandler(successWith = SuccessWith.PUBLISH_COMMIT) // Receive all transaction event when no type is defined. And, when terminated this function, publish commit state
fun handleTransactionJoinEvent(event: TransactionJoinEvent) {
// ...
}

@TransactionCommitHandler(
event = Foo::class,
noRetryFor = [IllegalArgumentException::class] // Dont retry when throw IllegalArgumentException. *Retry if throw Throwable or IllegalArgumentException's super type*
noRollbackFor = [IllegalArgumentException::class] // Dont rollback when throw IllegalArgumentException. *Rollback if throw Throwable or IllegalArgumentException's super type*
)
fun handleTransactionCommitEvent(event: TransactionCommitEvent): Mono<String> { // In Webflux framework, publisher must be returned.
throw IllegalArgumentException("Ignore this exception")
// ...
}

@TransactionRollbackHandler
@TransactionRollbackHandler(Foo::class)
fun handleTransactionRollbackEvent(event: TransactionRollbackEvent) { // In Mvc framework, publisher must not returned.
val undo: Foo = event.decodeUndo(Foo::class) // Get event field to Foo.class
}
Expand Down
4 changes: 1 addition & 3 deletions src/main/kotlin/org/rooftop/netx/api/Exceptions.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package org.rooftop.netx.api

import org.rooftop.netx.engine.core.TransactionState

class EncodeException(message: String, throwable: Throwable) : RuntimeException(message, throwable)

class DecodeException(message: String, throwable: Throwable) : RuntimeException(message, throwable)

open class TransactionException(message: String) : RuntimeException(message)

class AlreadyCommittedTransactionException(transactionId: String, state: TransactionState) :
class AlreadyCommittedTransactionException(transactionId: String, state: String) :
TransactionException("Cannot join transaction cause, transaction \"$transactionId\" already \"$state\"")

class NotFoundDispatchFunctionException(message: String) : RuntimeException(message)
Expand Down
4 changes: 3 additions & 1 deletion src/main/kotlin/org/rooftop/netx/api/Result.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ class Result<T : Any> private constructor(
private val error: Error? = null,
) {

fun decodeResultOrThrow(typeReference: TypeReference<T>): T = decodeResult(typeReference)

fun decodeResultOrThrow(type: Class<T>): T = decodeResultOrThrow(type.kotlin)

fun decodeResultOrThrow(type: KClass<T>): T {
Expand All @@ -18,7 +20,7 @@ class Result<T : Any> private constructor(
return decodeResult(type)
}

fun decodeResult(typeReference: TypeReference<T>): T = result?.let {
fun decodeResult(typeReference: TypeReference<T>): T = result?.let {
codec.decode(it, typeReference)
} ?: throw ResultException("Cannot decode result cause Result is fail state")

Expand Down
7 changes: 7 additions & 0 deletions src/main/kotlin/org/rooftop/netx/api/SuccessWith.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.rooftop.netx.api

enum class SuccessWith {
PUBLISH_JOIN,
PUBLISH_COMMIT,
END,
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package org.rooftop.netx.api

class TransactionCommitEvent(
class TransactionCommitEvent internal constructor(
transactionId: String,
nodeName: String,
group: String,
event: String?,
codec: Codec,
): TransactionEvent(transactionId, nodeName, group, event, codec)
): TransactionEvent(transactionId, nodeName, group, event, codec) {

override fun copy(): TransactionEvent =
TransactionJoinEvent(transactionId, nodeName, group, event, codec)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ import kotlin.reflect.KClass
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionCommitListener(
val event: KClass<*> = Any::class,
val noRetryFor: Array<KClass<out Throwable>> = [],
val noRollbackFor: Array<KClass<out Throwable>> = [],
)
12 changes: 10 additions & 2 deletions src/main/kotlin/org/rooftop/netx/api/TransactionEvent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,23 @@ sealed class TransactionEvent(
val transactionId: String,
val nodeName: String,
val group: String,
private val event: String?,
private val codec: Codec,
internal val event: String?,
internal val codec: Codec,
internal var nextEvent: Any? = null,
) {

fun <T : Any> setNextEvent(event: T): T {
this.nextEvent = event
return event
}

fun <T : Any> decodeEvent(type: Class<T>): T = decodeEvent(type.kotlin)

fun <T : Any> decodeEvent(type: KClass<T>): T =
codec.decode(
event ?: throw NullPointerException("Cannot decode event cause event is null"),
type
)

internal abstract fun copy(): TransactionEvent
}
8 changes: 6 additions & 2 deletions src/main/kotlin/org/rooftop/netx/api/TransactionJoinEvent.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package org.rooftop.netx.api

class TransactionJoinEvent(
class TransactionJoinEvent internal constructor(
transactionId: String,
nodeName: String,
group: String,
event: String?,
codec: Codec,
): TransactionEvent(transactionId, nodeName, group, event, codec)
) : TransactionEvent(transactionId, nodeName, group, event, codec) {

override fun copy(): TransactionEvent =
TransactionJoinEvent(transactionId, nodeName, group, event, codec)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ import kotlin.reflect.KClass
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionJoinListener(
val event: KClass<*> = Any::class,
val noRetryFor: Array<KClass<out Throwable>> = [],
val noRollbackFor: Array<KClass<out Throwable>> = [],
val successWith: SuccessWith = SuccessWith.PUBLISH_JOIN,
)
16 changes: 8 additions & 8 deletions src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ import reactor.core.publisher.Mono

interface TransactionManager {

fun <T : Any> start(undo: T): Mono<String>
fun start(): Mono<String>

fun <T : Any, S : Any> start(undo: T, event: S): Mono<String>
fun <T : Any> start(event: T): Mono<String>

fun <T : Any> syncStart(undo: T): String
fun syncStart(): String

fun <T : Any, S : Any> syncStart(undo: T, event: S): String
fun <T : Any> syncStart(event: T): String

fun <T : Any> join(transactionId: String, undo: T): Mono<String>
fun join(transactionId: String): Mono<String>

fun <T : Any, S : Any> join(transactionId: String, undo: T, event: S): Mono<String>
fun <T : Any> join(transactionId: String, event: T): Mono<String>

fun <T : Any> syncJoin(transactionId: String, undo: T): String
fun syncJoin(transactionId: String): String

fun <T : Any, S : Any> syncJoin(transactionId: String, undo: T, event: S): String
fun <T : Any> syncJoin(transactionId: String, event: T): String

fun exists(transactionId: String): Mono<String>

Expand Down
12 changes: 4 additions & 8 deletions src/main/kotlin/org/rooftop/netx/api/TransactionRollbackEvent.kt
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package org.rooftop.netx.api

import kotlin.reflect.KClass

class TransactionRollbackEvent(
class TransactionRollbackEvent internal constructor(
transactionId: String,
nodeName: String,
group: String,
event: String?,
val cause: String,
private val undo: String,
private val codec: Codec,
codec: Codec,
) : TransactionEvent(transactionId, nodeName, group, event, codec) {

fun <T : Any> decodeUndo(type: Class<T>): T = decodeUndo(type.kotlin)

fun <T : Any> decodeUndo(type: KClass<T>): T = codec.decode(undo, type)
override fun copy(): TransactionEvent =
TransactionJoinEvent(transactionId, nodeName, group, event, codec)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ import kotlin.reflect.KClass
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionRollbackListener(
val event: KClass<*> = Any::class,
val noRetryFor: Array<KClass<out Throwable>> = [],
)
8 changes: 6 additions & 2 deletions src/main/kotlin/org/rooftop/netx/api/TransactionStartEvent.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package org.rooftop.netx.api

class TransactionStartEvent(
class TransactionStartEvent internal constructor(
transactionId: String,
nodeName: String,
group: String,
event: String?,
codec: Codec,
) : TransactionEvent(transactionId, nodeName, group, event, codec)
) : TransactionEvent(transactionId, nodeName, group, event, codec) {

override fun copy(): TransactionEvent =
TransactionJoinEvent(transactionId, nodeName, group, event, codec)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ import kotlin.reflect.KClass
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionStartListener(
val event: KClass<*> = Any::class,
val noRetryFor: Array<KClass<out Throwable>> = [],
val noRollbackFor: Array<KClass<out Throwable>> = [],
val successWith: SuccessWith = SuccessWith.PUBLISH_JOIN,
)
Original file line number Diff line number Diff line change
@@ -1,20 +1,69 @@
package org.rooftop.netx.engine

import org.rooftop.netx.api.TransactionEvent
import org.rooftop.netx.api.TransactionManager
import reactor.core.scheduler.Schedulers
import kotlin.reflect.KClass
import kotlin.reflect.KFunction

internal sealed class AbstractDispatchFunction<T>(
protected val eventType: KClass<*>,
private val eventType: KClass<*>,
protected val function: KFunction<T>,
protected val handler: Any,
private val noRetryFor: Array<KClass<out Throwable>>,
private val noRollbackFor: Array<KClass<out Throwable>>,
private val nextState: NextTransactionState,
private val transactionManager: TransactionManager,
) {
fun name(): String = function.name

abstract fun call(transactionEvent: TransactionEvent): T

protected fun isNoRetryFor(throwable: Throwable): Boolean {
return noRetryFor.isNotEmpty() && throwable.cause != null && noRetryFor.contains(throwable.cause!!::class)
protected fun isNoRollbackFor(throwable: Throwable): Boolean {
return noRollbackFor.isNotEmpty() && throwable.cause != null && noRollbackFor.contains(
throwable.cause!!::class
)
}

protected fun isProcessable(transactionEvent: TransactionEvent): Boolean {
return runCatching {
transactionEvent.decodeEvent(eventType)
}.onFailure {
return it is NullPointerException && eventType == Any::class
}.isSuccess
}

protected fun rollback(transactionEvent: TransactionEvent, throwable: Throwable) {
transactionEvent.nextEvent?.let {
transactionManager.rollback(transactionEvent.transactionId, throwable.getCause(), it)
.subscribeOn(Schedulers.parallel())
.subscribe()
} ?: transactionManager.rollback(transactionEvent.transactionId, throwable.getCause())
.subscribeOn(Schedulers.parallel())
.subscribe()
}

protected fun publishNextTransaction(transactionEvent: TransactionEvent) {
when (nextState) {
NextTransactionState.JOIN -> transactionEvent.nextEvent?.let {
transactionManager.join(transactionEvent.transactionId, it)
} ?: transactionManager.join(transactionEvent.transactionId)

NextTransactionState.COMMIT -> transactionEvent.nextEvent?.let {
transactionManager.commit(transactionEvent.transactionId, it)
} ?: transactionManager.commit(transactionEvent.transactionId)

NextTransactionState.END -> return
}.subscribeOn(Schedulers.parallel())
.subscribe()
}

private fun Throwable.getCause(): String {
return this.message ?: this.cause?.message ?: this::class.java.name
}

internal enum class NextTransactionState {
JOIN,
COMMIT,
END
}
}
Loading

0 comments on commit 792d3b6

Please sign in to comment.