From 80e8758fa6d22744ea95afec2743351de41fcce3 Mon Sep 17 00:00:00 2001
From: xb205 <62425964+devxb@users.noreply.github.com>
Date: Sat, 16 Mar 2024 01:22:02 +0900
Subject: [PATCH] perf: Asynchronus pooling to ResultHolder (#85)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* refactor: TransactionManager undo, event로 null을 허용하지 않는다
* fix: Support java8 Time
* perf: Re-use redis pub/sub connection
* "docs: Netx-version 0.3.1"
* refactor: pub/sub receive to receiveLater
* perf: ResultHolder pooling Asynchronus
* refactor: Default pool size 100 -> 10
---
README.md | 28 ++++----
gradle.properties | 5 +-
gradle/core.gradle | 2 +
src/main/kotlin/org/rooftop/netx/api/Codec.kt | 2 +-
.../org/rooftop/netx/api/OrchestrateResult.kt | 2 +-
.../rooftop/netx/api/TransactionManager.kt | 24 +++----
.../netx/engine/AbstractTransactionManager.kt | 24 +++----
.../org/rooftop/netx/engine/JsonCodec.kt | 17 ++---
.../netx/engine/OrchestrateResultHolder.kt | 2 +-
.../netx/engine/OrchestratorManager.kt | 4 +-
.../redis/RedisOrchestrateResultHolder.kt | 72 ++++++++++---------
.../netx/redis/RedisTransactionConfigurer.kt | 22 ++++--
.../org/rooftop/netx/client/NetxLoadTest.kt | 2 -
.../netx/client/NetxOrchestratorLoadTest.kt | 60 ++++++++++++++++
.../netx/engine/NetxCodecSupportsTest.kt | 2 +-
.../netx/engine/OrchestratorConfigurer.kt | 10 +++
.../rooftop/netx/engine/OrchestratorTest.kt | 17 +++++
.../redis/NoAckRedisTransactionConfigurer.kt | 12 ++--
.../resources/fast-recover-mode.properties | 1 +
19 files changed, 207 insertions(+), 101 deletions(-)
create mode 100644 src/test/kotlin/org/rooftop/netx/client/NetxOrchestratorLoadTest.kt
diff --git a/README.md b/README.md
index 4b41315..8cc1645 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
-![version 0.3.0](https://img.shields.io/badge/version-0.3.0-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
+![version 0.3.1](https://img.shields.io/badge/version-0.3.1-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square)
![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white)
Saga pattern 으로 구현된 분산 트랜잭션 프레임워크 입니다.
@@ -41,18 +41,20 @@ class Application {
#### Properties
-| KEY | EXAMPLE | DESCRIPTION | DEFAULT |
-|-------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
-| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. | |
-| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) | |
-| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. | |
-| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. | |
-| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ | |
-| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. | |
-| **netx.recovery-milli** | 1000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. | 1000 |
-| **netx.orphan-milli** | 60000 | PENDING 상태가된 트랜잭션 중, orphan-milli가 지나도 ACK 상태가 되지 않은 트랜잭션을 찾아 재시작합니다. | 60000 |
-| **netx.backpressure** | 40 | 한번에 수신가능한 트랜잭션 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 트랜잭션 수신량과 처리에 실패한 트랜잭션 수신량에 영향을 미칩니다. 수신되지 못하거나, drop된 트랜잭션은 자동으로 retry 대기열에 들어갑니다. | 40 |
-| **netx.logging.level** | info | logging level을 지정합니다. 선택가능한 value는 다음과 같습니다. "info", "warn", "off" | "off" |
+| KEY | EXAMPLE | DESCRIPTION | DEFAULT |
+|------------------------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|
+| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. | |
+| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) | |
+| **netx.password** | 0000 | 트랜잭션 관리에 사용할 메시지큐에 접속하는데 사용하는 password 입니다. 설정하지 않을시 비밀번호를 사용하지 않습니다. | |
+| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. | |
+| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. | |
+| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ | |
+| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. | |
+| **netx.recovery-milli** | 1000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. | 1000 |
+| **netx.orphan-milli** | 60000 | PENDING 상태가된 트랜잭션 중, orphan-milli가 지나도 ACK 상태가 되지 않은 트랜잭션을 찾아 재시작합니다. | 60000 |
+| **netx.backpressure** | 40 | 한번에 수신가능한 트랜잭션 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 트랜잭션 수신량과 처리에 실패한 트랜잭션 수신량에 영향을 미칩니다. 수신되지 못하거나, drop된 트랜잭션은 자동으로 retry 대기열에 들어갑니다. | 40 |
+| **netx.logging.level** | info | logging level을 지정합니다. 선택가능한 value는 다음과 같습니다. "info", "warn", "off" | "off" |
+| **netx.pool-size** | 40 | 커넥션을 계속해서 맺어야할때, 최대 커넥션 수를 조절하는데 사용됩니다. | 10 |
### Usage example
diff --git a/gradle.properties b/gradle.properties
index d393a7a..de0d1c9 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -2,7 +2,7 @@ kotlin.code.style=official
### Project ###
group=org.rooftop.netx
-version=0.3.0
+version=0.3.1
compatibility=17
### Sonarcloud ###
@@ -38,3 +38,6 @@ jacksonVersion=2.16.1
### Awaitility ###
awaitilityVersion=3.0.0
+
+### Pooling ###
+pooling=1.0.5
diff --git a/gradle/core.gradle b/gradle/core.gradle
index 3b2c95b..4d99ec4 100644
--- a/gradle/core.gradle
+++ b/gradle/core.gradle
@@ -3,4 +3,6 @@ dependencies {
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation "com.fasterxml.jackson.module:jackson-module-parameter-names:${jacksonVersion}"
implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${jacksonVersion}"
+ implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}"
+ implementation "io.projectreactor.addons:reactor-pool:${pooling}"
}
diff --git a/src/main/kotlin/org/rooftop/netx/api/Codec.kt b/src/main/kotlin/org/rooftop/netx/api/Codec.kt
index ebf3697..bf8f419 100644
--- a/src/main/kotlin/org/rooftop/netx/api/Codec.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/Codec.kt
@@ -4,7 +4,7 @@ import kotlin.reflect.KClass
interface Codec {
- fun encode(data: T): String
+ fun encode(data: T): String
fun decode(data: String, type: KClass): T
}
diff --git a/src/main/kotlin/org/rooftop/netx/api/OrchestrateResult.kt b/src/main/kotlin/org/rooftop/netx/api/OrchestrateResult.kt
index e6f5469..50ea4a1 100644
--- a/src/main/kotlin/org/rooftop/netx/api/OrchestrateResult.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/OrchestrateResult.kt
@@ -2,7 +2,7 @@ package org.rooftop.netx.api
import kotlin.reflect.KClass
-class OrchestrateResult(
+data class OrchestrateResult(
val isSuccess: Boolean,
private val codec: Codec,
private val result: String
diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
index eabff50..3662e03 100644
--- a/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
+++ b/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt
@@ -4,21 +4,21 @@ import reactor.core.publisher.Mono
interface TransactionManager {
- fun start(undo: T): Mono
+ fun start(undo: T): Mono
- fun start(undo: T, event: S): Mono
+ fun start(undo: T, event: S): Mono
- fun syncStart(undo: T): String
+ fun syncStart(undo: T): String
- fun syncStart(undo: T, event: S): String
+ fun syncStart(undo: T, event: S): String
- fun join(transactionId: String, undo: T): Mono
+ fun join(transactionId: String, undo: T): Mono
- fun join(transactionId: String, undo: T, event: S): Mono
+ fun join(transactionId: String, undo: T, event: S): Mono
- fun syncJoin(transactionId: String, undo: T): String
+ fun syncJoin(transactionId: String, undo: T): String
- fun syncJoin(transactionId: String, undo: T, event: S): String
+ fun syncJoin(transactionId: String, undo: T, event: S): String
fun exists(transactionId: String): Mono
@@ -26,18 +26,18 @@ interface TransactionManager {
fun commit(transactionId: String): Mono
- fun commit(transactionId: String, event: T): Mono
+ fun commit(transactionId: String, event: T): Mono
fun syncCommit(transactionId: String): String
- fun syncCommit(transactionId: String, event: T): String
+ fun syncCommit(transactionId: String, event: T): String
fun rollback(transactionId: String, cause: String): Mono
- fun rollback(transactionId: String, cause: String, event: T): Mono
+ fun rollback(transactionId: String, cause: String, event: T): Mono
fun syncRollback(transactionId: String, cause: String): String
- fun syncRollback(transactionId: String, cause: String, event: T): String
+ fun syncRollback(transactionId: String, cause: String, event: T): String
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
index 5cd1029..087d047 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt
@@ -18,22 +18,22 @@ abstract class AbstractTransactionManager(
private val transactionIdGenerator: TransactionIdGenerator,
) : TransactionManager {
- final override fun syncStart(undo: T): String {
+ final override fun syncStart(undo: T): String {
return start(undo).block()
?: throw TransactionException("Cannot start transaction \"$undo\"")
}
- override fun syncStart(undo: T, event: S): String {
+ override fun syncStart(undo: T, event: S): String {
return start(undo, event).block()
?: throw TransactionException("Cannot start transaction \"$undo\" \"$event\"")
}
- final override fun syncJoin(transactionId: String, undo: T): String {
+ final override fun syncJoin(transactionId: String, undo: T): String {
return join(transactionId, undo).block()
?: throw TransactionException("Cannot join transaction \"$transactionId\", \"$undo\"")
}
- override fun syncJoin(transactionId: String, undo: T, event: S): String {
+ override fun syncJoin(transactionId: String, undo: T, event: S): String {
return join(transactionId, undo, event).block()
?: throw TransactionException("Cannot join transaction \"$transactionId\", \"$undo\", \"$event\"")
}
@@ -48,7 +48,7 @@ abstract class AbstractTransactionManager(
?: throw TransactionException("Cannot commit transaction \"$transactionId\"")
}
- override fun syncCommit(transactionId: String, event: T): String {
+ override fun syncCommit(transactionId: String, event: T): String {
return commit(transactionId, event).block()
?: throw TransactionException("Cannot commit transaction \"$transactionId\" \"$event\"")
}
@@ -58,12 +58,12 @@ abstract class AbstractTransactionManager(
?: throw TransactionException("Cannot rollback transaction \"$transactionId\", \"$cause\"")
}
- override fun syncRollback(transactionId: String, cause: String, event: T): String {
+ override fun syncRollback(transactionId: String, cause: String, event: T): String {
return rollback(transactionId, cause, event).block()
?: throw TransactionException("Cannot rollback transaction \"$transactionId\", \"$cause\" \"$event\"")
}
- final override fun start(undo: T): Mono {
+ final override fun start(undo: T): Mono {
return Mono.fromCallable { codec.encode(undo) }
.flatMap { encodedUndo ->
startTransaction(encodedUndo, null)
@@ -72,7 +72,7 @@ abstract class AbstractTransactionManager(
.contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) }
}
- override fun start(undo: T, event: S): Mono {
+ override fun start(undo: T, event: S): Mono {
return Mono.fromCallable { codec.encode(undo) }
.map { it to codec.encode(event) }
.flatMap { (encodedUndo, encodedEvent) ->
@@ -98,7 +98,7 @@ abstract class AbstractTransactionManager(
}
}
- override fun join(transactionId: String, undo: T): Mono {
+ override fun join(transactionId: String, undo: T): Mono {
return getAnyTransaction(transactionId)
.map {
if (it == TransactionState.COMMIT) {
@@ -114,7 +114,7 @@ abstract class AbstractTransactionManager(
}
}
- override fun join(transactionId: String, undo: T, event: S): Mono {
+ override fun join(transactionId: String, undo: T, event: S): Mono {
return getAnyTransaction(transactionId)
.map {
if (it == TransactionState.COMMIT) {
@@ -153,7 +153,7 @@ abstract class AbstractTransactionManager(
.contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
}
- override fun rollback(transactionId: String, cause: String, event: T): Mono {
+ override fun rollback(transactionId: String, cause: String, event: T): Mono {
return exists(transactionId)
.infoOnError("Cannot rollback transaction cause, transaction \"$transactionId\" is not exists")
.map { codec.encode(event) }
@@ -189,7 +189,7 @@ abstract class AbstractTransactionManager(
.contextWrite { it.put(CONTEXT_TX_KEY, transactionId) }
}
- override fun commit(transactionId: String, event: T): Mono {
+ override fun commit(transactionId: String, event: T): Mono {
return exists(transactionId)
.infoOnError("Cannot commit transaction cause, transaction \"$transactionId\" is not exists")
.map { codec.encode(event) }
diff --git a/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt b/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt
index 8e091a1..603b3ac 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt
@@ -1,24 +1,17 @@
package org.rooftop.netx.engine
-import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.KotlinModule
-import com.fasterxml.jackson.module.paramnames.ParameterNamesModule
import org.rooftop.netx.api.Codec
import org.rooftop.netx.api.DecodeException
import org.rooftop.netx.api.EncodeException
import kotlin.reflect.KClass
-class JsonCodec : Codec {
+class JsonCodec(
+ private val objectMapper: ObjectMapper,
+) : Codec {
- private val objectMapper: ObjectMapper by lazy {
- ObjectMapper().registerModule(ParameterNamesModule(JsonCreator.Mode.PROPERTIES))
- .registerModule(KotlinModule.Builder().build())
- }
-
- override fun encode(data: T): String {
- requireNotNull(data) { "Data cannot be null" }
- require(data!!::class != Any::class) { "Data cannot be Any" }
+ override fun encode(data: T): String {
+ require(data::class != Any::class) { "Data cannot be Any" }
return runCatching { objectMapper.writeValueAsString(data) }
.getOrElse {
throw EncodeException("Cannot encode \"${data}\" to \"${String::class}\"", it)
diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateResultHolder.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateResultHolder.kt
index 40064db..18e1567 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateResultHolder.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateResultHolder.kt
@@ -9,5 +9,5 @@ interface OrchestrateResultHolder {
fun getResult(timeout: Duration, transactionId: String): Mono
- fun setResult(transactionId: String, state: TransactionState, result: T): Mono
+ fun setResult(transactionId: String, state: TransactionState, result: T): Mono
}
diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt
index 0af0be5..f5d7b11 100644
--- a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt
+++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt
@@ -22,7 +22,7 @@ class OrchestratorManager(
}
override fun transaction(request: T): Mono {
- return transaction(ONE_MINUTES_TO_TIME_OUT, request)
+ return transaction(TEN_SECONDS_TO_TIME_OUT, request)
}
override fun transaction(timeoutMillis: Long, request: T): Mono {
@@ -39,6 +39,6 @@ class OrchestratorManager(
private companion object {
private const val UNDO = "Orchestrate mode";
- private const val ONE_MINUTES_TO_TIME_OUT = 60000L
+ private const val TEN_SECONDS_TO_TIME_OUT = 10000L
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisOrchestrateResultHolder.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisOrchestrateResultHolder.kt
index 0b01cbd..0189abe 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisOrchestrateResultHolder.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisOrchestrateResultHolder.kt
@@ -7,13 +7,17 @@ import org.rooftop.netx.api.ResultTimeoutException
import org.rooftop.netx.engine.OrchestrateResultHolder
import org.rooftop.netx.engine.core.Transaction
import org.rooftop.netx.engine.core.TransactionState
+import org.rooftop.netx.engine.logging.info
import org.springframework.data.redis.core.ReactiveRedisTemplate
import reactor.core.publisher.Mono
+import reactor.pool.PoolBuilder
import java.util.concurrent.TimeoutException
import kotlin.time.Duration
import kotlin.time.toJavaDuration
+
class RedisOrchestrateResultHolder(
+ poolSize: Int,
private val codec: Codec,
private val serverId: String,
private val group: String,
@@ -21,43 +25,45 @@ class RedisOrchestrateResultHolder(
private val reactiveRedisTemplate: ReactiveRedisTemplate,
) : OrchestrateResultHolder {
+ private val pool = PoolBuilder.from(Mono.just(reactiveRedisTemplate.opsForList()))
+ .sizeBetween(1, poolSize)
+ .maxPendingAcquireUnbounded()
+ .buildPool()
+
override fun getResult(timeout: Duration, transactionId: String): Mono {
- return reactiveRedisTemplate.listenToChannel(CHANNEL)
- .timeout(timeout.toJavaDuration())
- .onErrorMap {
- if (it::class == TimeoutException::class) {
- return@onErrorMap ResultTimeoutException(
- "Can't get result in \"$timeout\" time",
- it,
+ return pool.withPoolable {
+ it.leftPop("Result:$transactionId", timeout.toJavaDuration())
+ .switchIfEmpty(Mono.error {
+ ResultTimeoutException(
+ "Cannot get result in \"$timeout\" time",
+ TimeoutException()
)
- }
- it
- }
- .filter { it.message.id == transactionId }
- .map {
- OrchestrateResult(
- isSuccess = it.message.state == TransactionState.COMMIT,
- codec = codec,
- result = it.message.event
- ?: throw NullPointerException("OrchestrateResult message cannot be null")
- )
- }
- .next()
- }
-
- override fun setResult(transactionId: String, state: TransactionState, result: T): Mono {
- return reactiveRedisTemplate.convertAndSend(
- CHANNEL, Transaction(
- transactionId,
- serverId,
- group,
- state,
- event = objectMapper.writeValueAsString(result)
+ })
+ }.single().map {
+ OrchestrateResult(
+ isSuccess = it.state == TransactionState.COMMIT,
+ codec = codec,
+ result = it.event
+ ?: throw NullPointerException("OrchestrateResult message cannot be null")
)
- ).map { result }
+ }.doOnNext { info("Get result $it") }
}
- private companion object {
- private const val CHANNEL = "ORCHESTRATE_RESULT_CHANNEL"
+ override fun setResult(
+ transactionId: String,
+ state: TransactionState,
+ result: T
+ ): Mono {
+ return reactiveRedisTemplate.opsForList()
+ .leftPush(
+ "Result:$transactionId", Transaction(
+ id = transactionId,
+ serverId = serverId,
+ group = group,
+ state = state,
+ event = objectMapper.writeValueAsString(result)
+ )
+ ).map { result }
+ .doOnNext { info("Set result $it") }
}
}
diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
index fa248f5..b1a0326 100644
--- a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
+++ b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt
@@ -2,6 +2,7 @@ package org.rooftop.netx.redis
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule
import org.rooftop.netx.api.TransactionManager
@@ -18,6 +19,8 @@ import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
+import org.springframework.data.redis.connection.RedisPassword
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer
@@ -29,6 +32,7 @@ import org.springframework.data.redis.serializer.StringRedisSerializer
class RedisTransactionConfigurer(
@Value("\${netx.host}") private val host: String,
@Value("\${netx.port}") private val port: String,
+ @Value("\${netx.password}") private val password: String?,
@Value("\${netx.group}") private val nodeGroup: String,
@Value("\${netx.node-id}") private val nodeId: Int,
@Value("\${netx.node-name}") private val nodeName: String,
@@ -36,6 +40,7 @@ class RedisTransactionConfigurer(
@Value("\${netx.orphan-milli:60000}") private val orphanMilli: Long,
@Value("\${netx.backpressure:40}") private val backpressureSize: Int,
@Value("\${netx.logging.level:off}") loggingLevel: String,
+ @Value("\${netx.pool-size:10}") private val poolSize: Int,
private val applicationContext: ApplicationContext,
) {
@@ -80,6 +85,7 @@ class RedisTransactionConfigurer(
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun redisOrchestrateResultHolder(): OrchestrateResultHolder = RedisOrchestrateResultHolder(
+ poolSize,
jsonCodec(),
nodeName,
nodeGroup,
@@ -87,11 +93,16 @@ class RedisTransactionConfigurer(
reactiveRedisTemplate(),
)
+ @Bean
+ @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
+ fun jsonCodec(): JsonCodec = JsonCodec(netxObjectMapper())
+
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun netxObjectMapper(): ObjectMapper =
ObjectMapper().registerModule(ParameterNamesModule(JsonCreator.Mode.PROPERTIES))
.registerModule(KotlinModule.Builder().build())
+ .registerModule(JavaTimeModule())
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
@@ -122,10 +133,6 @@ class RedisTransactionConfigurer(
info("RedisStreamTransactionDispatcher connect to host : \"$host\" port : \"$port\" nodeName : \"$nodeName\" nodeGroup : \"$nodeGroup\"")
}
- @Bean
- @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
- fun jsonCodec(): JsonCodec = JsonCodec()
-
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun reactiveRedisTemplate(): ReactiveRedisTemplate {
@@ -146,6 +153,11 @@ class RedisTransactionConfigurer(
fun reactiveRedisConnectionFactory(): ReactiveRedisConnectionFactory {
val port: String = System.getProperty("netx.port") ?: port
- return LettuceConnectionFactory(host, port.toInt())
+ val redisStandaloneConfiguration = RedisStandaloneConfiguration()
+ redisStandaloneConfiguration.hostName = host
+ redisStandaloneConfiguration.port = port.toInt()
+ redisStandaloneConfiguration.password = RedisPassword.of(password)
+
+ return LettuceConnectionFactory(redisStandaloneConfiguration)
}
}
diff --git a/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt b/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt
index aa54422..5812cf0 100644
--- a/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt
+++ b/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt
@@ -5,7 +5,6 @@ import io.kotest.core.annotation.DisplayName
import io.kotest.core.spec.style.FunSpec
import io.kotest.data.forAll
import io.kotest.data.row
-import org.rooftop.netx.api.Orchestrator
import org.rooftop.netx.meta.EnableDistributedTransaction
import org.rooftop.netx.redis.RedisContainer
import org.springframework.boot.test.context.SpringBootTest
@@ -18,7 +17,6 @@ import kotlin.time.Duration.Companion.minutes
LoadRunner::class,
NetxClient::class,
TransactionReceiveStorage::class,
- OrchestratorConfigurer::class,
]
)
@EnableDistributedTransaction
diff --git a/src/test/kotlin/org/rooftop/netx/client/NetxOrchestratorLoadTest.kt b/src/test/kotlin/org/rooftop/netx/client/NetxOrchestratorLoadTest.kt
new file mode 100644
index 0000000..994373c
--- /dev/null
+++ b/src/test/kotlin/org/rooftop/netx/client/NetxOrchestratorLoadTest.kt
@@ -0,0 +1,60 @@
+package org.rooftop.netx.client
+
+import io.kotest.assertions.nondeterministic.eventually
+import io.kotest.core.annotation.DisplayName
+import io.kotest.core.spec.style.FunSpec
+import io.kotest.data.forAll
+import io.kotest.data.row
+import io.kotest.matchers.equals.shouldBeEqual
+import org.rooftop.netx.api.Orchestrator
+import org.rooftop.netx.meta.EnableDistributedTransaction
+import org.rooftop.netx.redis.RedisContainer
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.context.TestPropertySource
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.time.Duration.Companion.minutes
+
+@DisplayName("Netx orchestartor 부하테스트")
+@SpringBootTest(
+ classes = [
+ RedisContainer::class,
+ LoadRunner::class,
+ OrchestratorConfigurer::class,
+ ]
+)
+@EnableDistributedTransaction
+@TestPropertySource("classpath:fast-recover-mode.properties")
+class NetxOrchestratorLoadTest(
+ private val loadRunner: LoadRunner,
+ private val orchestrator: Orchestrator,
+) : FunSpec({
+
+ test("Netx의 Orcehstrator는 부하가 가중되어도, 결과적 일관성을 보장한다.") {
+ forAll(
+ row(1),
+ row(10),
+ row(100),
+ row(1_000),
+ row(10_000),
+ ) { count ->
+ val resultStorage = ConcurrentHashMap.newKeySet(count)
+
+ val atomicInt = AtomicInteger(0)
+ loadRunner.load(count) {
+ orchestrator.transaction(THREE_MINUTES_MILLIS, atomicInt.getAndIncrement())
+ .map { resultStorage.add(it.decodeResult(Int::class)) }
+ .subscribe()
+ }
+
+ eventually(10.minutes) {
+ resultStorage.size shouldBeEqual count
+ }
+ }
+ }
+}) {
+ private companion object {
+ private const val THREE_MINUTES_MILLIS = 1000 * 60 * 3L
+ }
+}
+
diff --git a/src/test/kotlin/org/rooftop/netx/engine/NetxCodecSupportsTest.kt b/src/test/kotlin/org/rooftop/netx/engine/NetxCodecSupportsTest.kt
index f0459b8..d18b44a 100644
--- a/src/test/kotlin/org/rooftop/netx/engine/NetxCodecSupportsTest.kt
+++ b/src/test/kotlin/org/rooftop/netx/engine/NetxCodecSupportsTest.kt
@@ -24,7 +24,7 @@ class NetxCodecSupportsTest(
private val transactionReceiveStorage: TransactionReceiveStorage,
) : StringSpec({
- fun startAndRollbackTransaction(undo: T) {
+ fun startAndRollbackTransaction(undo: T) {
val transactionId = transactionManager.syncStart(undo)
transactionManager.syncRollback(transactionId, "for codec test")
diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt
index 5a3eac9..9a8da16 100644
--- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt
+++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt
@@ -103,4 +103,14 @@ class OrchestratorConfigurer : AbstractOrchestratorConfigurer() {
""
}.build()
}
+
+ @Bean
+ fun instantOrchestrator(): Orchestrator {
+ return newOrchestrator()
+ .startSync {
+ it.decodeEvent(OrchestratorTest.InstantWrapper::class)
+ }.commitSync {
+ it.decodeEvent(OrchestratorTest.InstantWrapper::class)
+ }.build()
+ }
}
diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt
index 353866c..735f63e 100644
--- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt
+++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt
@@ -13,6 +13,7 @@ import org.rooftop.netx.redis.RedisContainer
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
+import java.time.Instant
@EnableDistributedTransaction
@ContextConfiguration(
@@ -29,6 +30,7 @@ class OrchestratorTest(
@Qualifier("rollbackOrchestrator") private val rollbackOrchestrator: Orchestrator,
@Qualifier("noRollbackForOrchestrator") private val noRollbackForOrchestrator: Orchestrator,
@Qualifier("timeOutOrchestrator") private val timeOutOrchestrator: Orchestrator,
+ private val instantOrchestrator: Orchestrator,
) : DescribeSpec({
describe("numberOrchestrator 구현채는") {
@@ -93,6 +95,17 @@ class OrchestratorTest(
}
}
}
+
+ describe("instantOrchestrator 구현채는") {
+ context("InstantWrapper를 입력받아서,") {
+ val request = InstantWrapper(Instant.now())
+ it("같은 InstantWrapper를 반환한다.") {
+ val result = instantOrchestrator.transactionSync(request)
+
+ result.decodeResult(InstantWrapper::class) shouldBeEqualToComparingFields request
+ }
+ }
+ }
}) {
data class Home(
val address: String,
@@ -104,4 +117,8 @@ class OrchestratorTest(
}
data class Person(val name: String)
+
+ data class InstantWrapper(
+ val time: Instant,
+ )
}
diff --git a/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt b/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt
index c0d5509..bf5d3d2 100644
--- a/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt
+++ b/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt
@@ -6,9 +6,9 @@ import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule
import org.rooftop.netx.api.TransactionManager
import org.rooftop.netx.engine.JsonCodec
+import org.rooftop.netx.engine.OrchestrateResultHolder
import org.rooftop.netx.engine.TransactionIdGenerator
import org.rooftop.netx.engine.core.Transaction
-import org.rooftop.netx.engine.OrchestrateResultHolder
import org.rooftop.netx.engine.logging.logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
@@ -34,6 +34,7 @@ class NoAckRedisTransactionConfigurer(
@Value("\${netx.orphan-milli:10000}") private val orphanMilli: Long,
@Value("\${netx.backpressure:10}") private val backpressureSize: Int,
@Value("\${netx.logging.level:off}") loggingLevel: String,
+ @Value("\${netx.pool-size:100}") private val poolSize: Int,
private val applicationContext: ApplicationContext,
) {
@@ -70,6 +71,10 @@ class NoAckRedisTransactionConfigurer(
objectMapper = netxObjectMapper(),
).also { it.subscribeStream() }
+ @Bean
+ @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
+ fun jsonCodec(): JsonCodec = JsonCodec(netxObjectMapper())
+
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun netxObjectMapper(): ObjectMapper =
@@ -79,6 +84,7 @@ class NoAckRedisTransactionConfigurer(
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun redisOrchestrateResultHolder(): OrchestrateResultHolder = RedisOrchestrateResultHolder(
+ poolSize,
jsonCodec(),
nodeName,
nodeGroup,
@@ -119,10 +125,6 @@ class NoAckRedisTransactionConfigurer(
codec = jsonCodec(),
)
- @Bean
- @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
- fun jsonCodec(): JsonCodec = JsonCodec()
-
@Bean
@ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis")
fun reactiveRedisTemplate(): ReactiveRedisTemplate {
diff --git a/src/test/resources/fast-recover-mode.properties b/src/test/resources/fast-recover-mode.properties
index a7a7600..b606fab 100644
--- a/src/test/resources/fast-recover-mode.properties
+++ b/src/test/resources/fast-recover-mode.properties
@@ -8,3 +8,4 @@ netx.recovery-milli=100
netx.orphan-milli=100
netx.backpressure=40
netx.logging.level=info
+netx.pool-size=10