Skip to content

Commit

Permalink
perf: Asynchronus pooling to ResultHolder (#85)
Browse files Browse the repository at this point in the history
* refactor: TransactionManager undo, event로 null을 허용하지 않는다

* fix: Support java8 Time

* perf: Re-use redis pub/sub connection

* "docs: Netx-version 0.3.1"

* refactor: pub/sub receive to receiveLater

* perf: ResultHolder pooling Asynchronus

* refactor: Default pool size 100 -> 10
  • Loading branch information
devxb authored Mar 15, 2024
1 parent 3e8d4be commit 80e8758
Show file tree
Hide file tree
Showing 19 changed files with 207 additions and 101 deletions.
28 changes: 15 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<br>

![version 0.3.0](https://img.shields.io/badge/version-0.3.0-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![version 0.3.1](https://img.shields.io/badge/version-0.3.1-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)

Saga pattern 으로 구현된 분산 트랜잭션 프레임워크 입니다.
Expand Down Expand Up @@ -41,18 +41,20 @@ class Application {

#### Properties

| KEY | EXAMPLE | DESCRIPTION | DEFAULT |
|-------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. | |
| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) | |
| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. | |
| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. | |
| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ | |
| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. | |
| **netx.recovery-milli** | 1000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. | 1000 |
| **netx.orphan-milli** | 60000 | PENDING 상태가된 트랜잭션 중, orphan-milli가 지나도 ACK 상태가 되지 않은 트랜잭션을 찾아 재시작합니다. | 60000 |
| **netx.backpressure** | 40 | 한번에 수신가능한 트랜잭션 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 트랜잭션 수신량과 처리에 실패한 트랜잭션 수신량에 영향을 미칩니다. 수신되지 못하거나, drop된 트랜잭션은 자동으로 retry 대기열에 들어갑니다. | 40 |
| **netx.logging.level** | info | logging level을 지정합니다. 선택가능한 value는 다음과 같습니다. "info", "warn", "off" | "off" |
| KEY | EXAMPLE | DESCRIPTION | DEFAULT |
|------------------------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|
| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. | |
| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) | |
| **netx.password** | 0000 | 트랜잭션 관리에 사용할 메시지큐에 접속하는데 사용하는 password 입니다. 설정하지 않을시 비밀번호를 사용하지 않습니다. | |
| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. | |
| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. | |
| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ | |
| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. | |
| **netx.recovery-milli** | 1000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. | 1000 |
| **netx.orphan-milli** | 60000 | PENDING 상태가된 트랜잭션 중, orphan-milli가 지나도 ACK 상태가 되지 않은 트랜잭션을 찾아 재시작합니다. | 60000 |
| **netx.backpressure** | 40 | 한번에 수신가능한 트랜잭션 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 트랜잭션 수신량과 처리에 실패한 트랜잭션 수신량에 영향을 미칩니다. 수신되지 못하거나, drop된 트랜잭션은 자동으로 retry 대기열에 들어갑니다. | 40 |
| **netx.logging.level** | info | logging level을 지정합니다. 선택가능한 value는 다음과 같습니다. "info", "warn", "off" | "off" |
| **netx.pool-size** | 40 | 커넥션을 계속해서 맺어야할때, 최대 커넥션 수를 조절하는데 사용됩니다. | 10 |

### Usage example

Expand Down
5 changes: 4 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ kotlin.code.style=official

### Project ###
group=org.rooftop.netx
version=0.3.0
version=0.3.1
compatibility=17

### Sonarcloud ###
Expand Down Expand Up @@ -38,3 +38,6 @@ jacksonVersion=2.16.1

### Awaitility ###
awaitilityVersion=3.0.0

### Pooling ###
pooling=1.0.5
2 changes: 2 additions & 0 deletions gradle/core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation "com.fasterxml.jackson.module:jackson-module-parameter-names:${jacksonVersion}"
implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${jacksonVersion}"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
implementation "io.projectreactor.addons:reactor-pool:${pooling}"
}
2 changes: 1 addition & 1 deletion src/main/kotlin/org/rooftop/netx/api/Codec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlin.reflect.KClass

interface Codec {

fun <T> encode(data: T): String
fun <T: Any> encode(data: T): String

fun <T : Any> decode(data: String, type: KClass<T>): T
}
2 changes: 1 addition & 1 deletion src/main/kotlin/org/rooftop/netx/api/OrchestrateResult.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.rooftop.netx.api

import kotlin.reflect.KClass

class OrchestrateResult(
data class OrchestrateResult(
val isSuccess: Boolean,
private val codec: Codec,
private val result: String
Expand Down
24 changes: 12 additions & 12 deletions src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,40 @@ import reactor.core.publisher.Mono

interface TransactionManager {

fun <T> start(undo: T): Mono<String>
fun <T : Any> start(undo: T): Mono<String>

fun <T, S> start(undo: T, event: S): Mono<String>
fun <T : Any, S : Any> start(undo: T, event: S): Mono<String>

fun <T> syncStart(undo: T): String
fun <T : Any> syncStart(undo: T): String

fun <T, S> syncStart(undo: T, event: S): String
fun <T : Any, S : Any> syncStart(undo: T, event: S): String

fun <T> join(transactionId: String, undo: T): Mono<String>
fun <T : Any> join(transactionId: String, undo: T): Mono<String>

fun <T, S> join(transactionId: String, undo: T, event: S): Mono<String>
fun <T : Any, S : Any> join(transactionId: String, undo: T, event: S): Mono<String>

fun <T> syncJoin(transactionId: String, undo: T): String
fun <T : Any> syncJoin(transactionId: String, undo: T): String

fun <T, S> syncJoin(transactionId: String, undo: T, event: S): String
fun <T : Any, S : Any> syncJoin(transactionId: String, undo: T, event: S): String

fun exists(transactionId: String): Mono<String>

fun syncExists(transactionId: String): String

fun commit(transactionId: String): Mono<String>

fun <T> commit(transactionId: String, event: T): Mono<String>
fun <T : Any> commit(transactionId: String, event: T): Mono<String>

fun syncCommit(transactionId: String): String

fun <T> syncCommit(transactionId: String, event: T): String
fun <T : Any> syncCommit(transactionId: String, event: T): String

fun rollback(transactionId: String, cause: String): Mono<String>

fun <T> rollback(transactionId: String, cause: String, event: T): Mono<String>
fun <T : Any> rollback(transactionId: String, cause: String, event: T): Mono<String>

fun syncRollback(transactionId: String, cause: String): String

fun <T> syncRollback(transactionId: String, cause: String, event: T): String
fun <T : Any> syncRollback(transactionId: String, cause: String, event: T): String

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ abstract class AbstractTransactionManager(
private val transactionIdGenerator: TransactionIdGenerator,
) : TransactionManager {

final override fun <T> syncStart(undo: T): String {
final override fun <T : Any> syncStart(undo: T): String {
return start(undo).block()
?: throw TransactionException("Cannot start transaction \"$undo\"")
}

override fun <T, S> syncStart(undo: T, event: S): String {
override fun <T : Any, S : Any> syncStart(undo: T, event: S): String {
return start(undo, event).block()
?: throw TransactionException("Cannot start transaction \"$undo\" \"$event\"")
}

final override fun <T> syncJoin(transactionId: String, undo: T): String {
final override fun <T : Any> syncJoin(transactionId: String, undo: T): String {
return join(transactionId, undo).block()
?: throw TransactionException("Cannot join transaction \"$transactionId\", \"$undo\"")
}

override fun <T, S> syncJoin(transactionId: String, undo: T, event: S): String {
override fun <T : Any, S : Any> syncJoin(transactionId: String, undo: T, event: S): String {
return join(transactionId, undo, event).block()
?: throw TransactionException("Cannot join transaction \"$transactionId\", \"$undo\", \"$event\"")
}
Expand All @@ -48,7 +48,7 @@ abstract class AbstractTransactionManager(
?: throw TransactionException("Cannot commit transaction \"$transactionId\"")
}

override fun <T> syncCommit(transactionId: String, event: T): String {
override fun <T : Any> syncCommit(transactionId: String, event: T): String {
return commit(transactionId, event).block()
?: throw TransactionException("Cannot commit transaction \"$transactionId\" \"$event\"")
}
Expand All @@ -58,12 +58,12 @@ abstract class AbstractTransactionManager(
?: throw TransactionException("Cannot rollback transaction \"$transactionId\", \"$cause\"")
}

override fun <T> syncRollback(transactionId: String, cause: String, event: T): String {
override fun <T : Any> syncRollback(transactionId: String, cause: String, event: T): String {
return rollback(transactionId, cause, event).block()
?: throw TransactionException("Cannot rollback transaction \"$transactionId\", \"$cause\" \"$event\"")
}

final override fun <T> start(undo: T): Mono<String> {
final override fun <T : Any> start(undo: T): Mono<String> {
return Mono.fromCallable { codec.encode(undo) }
.flatMap { encodedUndo ->
startTransaction(encodedUndo, null)
Expand All @@ -72,7 +72,7 @@ abstract class AbstractTransactionManager(
.contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) }
}

override fun <T, S> start(undo: T, event: S): Mono<String> {
override fun <T : Any, S : Any> start(undo: T, event: S): Mono<String> {
return Mono.fromCallable { codec.encode(undo) }
.map { it to codec.encode(event) }
.flatMap { (encodedUndo, encodedEvent) ->
Expand All @@ -98,7 +98,7 @@ abstract class AbstractTransactionManager(
}
}

override fun <T> join(transactionId: String, undo: T): Mono<String> {
override fun <T : Any> join(transactionId: String, undo: T): Mono<String> {
return getAnyTransaction(transactionId)
.map {
if (it == TransactionState.COMMIT) {
Expand All @@ -114,7 +114,7 @@ abstract class AbstractTransactionManager(
}
}

override fun <T, S> join(transactionId: String, undo: T, event: S): Mono<String> {
override fun <T : Any, S : Any> join(transactionId: String, undo: T, event: S): Mono<String> {
return getAnyTransaction(transactionId)
.map {
if (it == TransactionState.COMMIT) {
Expand Down Expand Up @@ -153,7 +153,7 @@ abstract class AbstractTransactionManager(
.contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
}

override fun <T> rollback(transactionId: String, cause: String, event: T): Mono<String> {
override fun <T : Any> rollback(transactionId: String, cause: String, event: T): Mono<String> {
return exists(transactionId)
.infoOnError("Cannot rollback transaction cause, transaction \"$transactionId\" is not exists")
.map { codec.encode(event) }
Expand Down Expand Up @@ -189,7 +189,7 @@ abstract class AbstractTransactionManager(
.contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
}

override fun <T> commit(transactionId: String, event: T): Mono<String> {
override fun <T : Any> commit(transactionId: String, event: T): Mono<String> {
return exists(transactionId)
.infoOnError("Cannot commit transaction cause, transaction \"$transactionId\" is not exists")
.map { codec.encode(event) }
Expand Down
17 changes: 5 additions & 12 deletions src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
package org.rooftop.netx.engine

import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule
import org.rooftop.netx.api.Codec
import org.rooftop.netx.api.DecodeException
import org.rooftop.netx.api.EncodeException
import kotlin.reflect.KClass

class JsonCodec : Codec {
class JsonCodec(
private val objectMapper: ObjectMapper,
) : Codec {

private val objectMapper: ObjectMapper by lazy {
ObjectMapper().registerModule(ParameterNamesModule(JsonCreator.Mode.PROPERTIES))
.registerModule(KotlinModule.Builder().build())
}

override fun <T> encode(data: T): String {
requireNotNull(data) { "Data cannot be null" }
require(data!!::class != Any::class) { "Data cannot be Any" }
override fun <T : Any> encode(data: T): String {
require(data::class != Any::class) { "Data cannot be Any" }
return runCatching { objectMapper.writeValueAsString(data) }
.getOrElse {
throw EncodeException("Cannot encode \"${data}\" to \"${String::class}\"", it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ interface OrchestrateResultHolder {

fun getResult(timeout: Duration, transactionId: String): Mono<OrchestrateResult>

fun <T> setResult(transactionId: String, state: TransactionState, result: T): Mono<T>
fun <T: Any> setResult(transactionId: String, state: TransactionState, result: T): Mono<T>
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class OrchestratorManager<T : Any>(
}

override fun transaction(request: T): Mono<OrchestrateResult> {
return transaction(ONE_MINUTES_TO_TIME_OUT, request)
return transaction(TEN_SECONDS_TO_TIME_OUT, request)
}

override fun transaction(timeoutMillis: Long, request: T): Mono<OrchestrateResult> {
Expand All @@ -39,6 +39,6 @@ class OrchestratorManager<T : Any>(

private companion object {
private const val UNDO = "Orchestrate mode";
private const val ONE_MINUTES_TO_TIME_OUT = 60000L
private const val TEN_SECONDS_TO_TIME_OUT = 10000L
}
}
Loading

0 comments on commit 80e8758

Please sign in to comment.