diff --git a/README.md b/README.md index 4b41315..8cc1645 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@
-![version 0.3.0](https://img.shields.io/badge/version-0.3.0-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square) +![version 0.3.1](https://img.shields.io/badge/version-0.3.1-black?labelColor=black&style=flat-square) ![jdk 17](https://img.shields.io/badge/minimum_jdk-17-orange?labelColor=black&style=flat-square) ![load-test](https://img.shields.io/badge/load%20test%2010%2C000%2C000-success-brightgreen?labelColor=black&style=flat-square) ![redis--stream](https://img.shields.io/badge/-redis--stream-da2020?style=flat-square&logo=Redis&logoColor=white) Saga pattern 으로 구현된 분산 트랜잭션 프레임워크 입니다. @@ -41,18 +41,20 @@ class Application { #### Properties -| KEY | EXAMPLE | DESCRIPTION | DEFAULT | -|-------------------------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------| -| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. | | -| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) | | -| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. | | -| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. | | -| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ | | -| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. | | -| **netx.recovery-milli** | 1000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. | 1000 | -| **netx.orphan-milli** | 60000 | PENDING 상태가된 트랜잭션 중, orphan-milli가 지나도 ACK 상태가 되지 않은 트랜잭션을 찾아 재시작합니다. | 60000 | -| **netx.backpressure** | 40 | 한번에 수신가능한 트랜잭션 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 트랜잭션 수신량과 처리에 실패한 트랜잭션 수신량에 영향을 미칩니다. 수신되지 못하거나, drop된 트랜잭션은 자동으로 retry 대기열에 들어갑니다. | 40 | -| **netx.logging.level** | info | logging level을 지정합니다. 선택가능한 value는 다음과 같습니다. "info", "warn", "off" | "off" | +| KEY | EXAMPLE | DESCRIPTION | DEFAULT | +|------------------------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------| +| **netx.mode** | redis | 트랜잭션 관리에 사용할 메시지 큐 구현체의 mode 입니다. | | +| **netx.host** | localhost | 트랜잭션 관리에 사용할 메시지 큐 의 host url 입니다. (ex. redis host) | | +| **netx.password** | 0000 | 트랜잭션 관리에 사용할 메시지큐에 접속하는데 사용하는 password 입니다. 설정하지 않을시 비밀번호를 사용하지 않습니다. | | +| **netx.port** | 6379 | 트랜잭션 관리에 사용할 메시지 큐의 port 입니다. | | +| **netx.group** | pay-group | 분산 노드의 그룹입니다. 트랜잭션 이벤트는 같은 그룹내 하나의 노드로만 전송됩니다. | | +| **netx.node-id** | 1 | id 생성에 사용될 식별자입니다. 모든 서버는 반드시 다른 id를 할당받아야 하며, 1~256 만큼의 id를 설정할 수 있습니다. _`중복된 id 생성을 방지하기위해 twitter snowflake 알고리즘으로 id를 생성합니다.`_ | | +| **netx.node-name** | pay-1 | _`netx.group`_ 에 참여할 서버의 이름입니다. 같은 그룹내에 중복된 이름이 존재하면 안됩니다. | | +| **netx.recovery-milli** | 1000 | _`netx.recovery-milli`_ 마다 _`netx.orphan-milli`_ 동안 처리 되지 않는 트랜잭션을 찾아 재실행합니다. | 1000 | +| **netx.orphan-milli** | 60000 | PENDING 상태가된 트랜잭션 중, orphan-milli가 지나도 ACK 상태가 되지 않은 트랜잭션을 찾아 재시작합니다. | 60000 | +| **netx.backpressure** | 40 | 한번에 수신가능한 트랜잭션 수를 조절합니다. **너무 높게설정하면 서버에 부하가 올 수 있고, 낮게 설정하면 성능이 낮아질 수 있습니다.** 이 설정은 다른 서버가 발행한 트랜잭션 수신량과 처리에 실패한 트랜잭션 수신량에 영향을 미칩니다. 수신되지 못하거나, drop된 트랜잭션은 자동으로 retry 대기열에 들어갑니다. | 40 | +| **netx.logging.level** | info | logging level을 지정합니다. 선택가능한 value는 다음과 같습니다. "info", "warn", "off" | "off" | +| **netx.pool-size** | 40 | 커넥션을 계속해서 맺어야할때, 최대 커넥션 수를 조절하는데 사용됩니다. | 10 | ### Usage example diff --git a/gradle.properties b/gradle.properties index d393a7a..de0d1c9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,7 @@ kotlin.code.style=official ### Project ### group=org.rooftop.netx -version=0.3.0 +version=0.3.1 compatibility=17 ### Sonarcloud ### @@ -38,3 +38,6 @@ jacksonVersion=2.16.1 ### Awaitility ### awaitilityVersion=3.0.0 + +### Pooling ### +pooling=1.0.5 diff --git a/gradle/core.gradle b/gradle/core.gradle index 3b2c95b..4d99ec4 100644 --- a/gradle/core.gradle +++ b/gradle/core.gradle @@ -3,4 +3,6 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" implementation "com.fasterxml.jackson.module:jackson-module-parameter-names:${jacksonVersion}" implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${jacksonVersion}" + implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${jacksonVersion}" + implementation "io.projectreactor.addons:reactor-pool:${pooling}" } diff --git a/src/main/kotlin/org/rooftop/netx/api/Codec.kt b/src/main/kotlin/org/rooftop/netx/api/Codec.kt index ebf3697..bf8f419 100644 --- a/src/main/kotlin/org/rooftop/netx/api/Codec.kt +++ b/src/main/kotlin/org/rooftop/netx/api/Codec.kt @@ -4,7 +4,7 @@ import kotlin.reflect.KClass interface Codec { - fun encode(data: T): String + fun encode(data: T): String fun decode(data: String, type: KClass): T } diff --git a/src/main/kotlin/org/rooftop/netx/api/OrchestrateResult.kt b/src/main/kotlin/org/rooftop/netx/api/OrchestrateResult.kt index e6f5469..50ea4a1 100644 --- a/src/main/kotlin/org/rooftop/netx/api/OrchestrateResult.kt +++ b/src/main/kotlin/org/rooftop/netx/api/OrchestrateResult.kt @@ -2,7 +2,7 @@ package org.rooftop.netx.api import kotlin.reflect.KClass -class OrchestrateResult( +data class OrchestrateResult( val isSuccess: Boolean, private val codec: Codec, private val result: String diff --git a/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt b/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt index eabff50..3662e03 100644 --- a/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt +++ b/src/main/kotlin/org/rooftop/netx/api/TransactionManager.kt @@ -4,21 +4,21 @@ import reactor.core.publisher.Mono interface TransactionManager { - fun start(undo: T): Mono + fun start(undo: T): Mono - fun start(undo: T, event: S): Mono + fun start(undo: T, event: S): Mono - fun syncStart(undo: T): String + fun syncStart(undo: T): String - fun syncStart(undo: T, event: S): String + fun syncStart(undo: T, event: S): String - fun join(transactionId: String, undo: T): Mono + fun join(transactionId: String, undo: T): Mono - fun join(transactionId: String, undo: T, event: S): Mono + fun join(transactionId: String, undo: T, event: S): Mono - fun syncJoin(transactionId: String, undo: T): String + fun syncJoin(transactionId: String, undo: T): String - fun syncJoin(transactionId: String, undo: T, event: S): String + fun syncJoin(transactionId: String, undo: T, event: S): String fun exists(transactionId: String): Mono @@ -26,18 +26,18 @@ interface TransactionManager { fun commit(transactionId: String): Mono - fun commit(transactionId: String, event: T): Mono + fun commit(transactionId: String, event: T): Mono fun syncCommit(transactionId: String): String - fun syncCommit(transactionId: String, event: T): String + fun syncCommit(transactionId: String, event: T): String fun rollback(transactionId: String, cause: String): Mono - fun rollback(transactionId: String, cause: String, event: T): Mono + fun rollback(transactionId: String, cause: String, event: T): Mono fun syncRollback(transactionId: String, cause: String): String - fun syncRollback(transactionId: String, cause: String, event: T): String + fun syncRollback(transactionId: String, cause: String, event: T): String } diff --git a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt index 5cd1029..087d047 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/AbstractTransactionManager.kt @@ -18,22 +18,22 @@ abstract class AbstractTransactionManager( private val transactionIdGenerator: TransactionIdGenerator, ) : TransactionManager { - final override fun syncStart(undo: T): String { + final override fun syncStart(undo: T): String { return start(undo).block() ?: throw TransactionException("Cannot start transaction \"$undo\"") } - override fun syncStart(undo: T, event: S): String { + override fun syncStart(undo: T, event: S): String { return start(undo, event).block() ?: throw TransactionException("Cannot start transaction \"$undo\" \"$event\"") } - final override fun syncJoin(transactionId: String, undo: T): String { + final override fun syncJoin(transactionId: String, undo: T): String { return join(transactionId, undo).block() ?: throw TransactionException("Cannot join transaction \"$transactionId\", \"$undo\"") } - override fun syncJoin(transactionId: String, undo: T, event: S): String { + override fun syncJoin(transactionId: String, undo: T, event: S): String { return join(transactionId, undo, event).block() ?: throw TransactionException("Cannot join transaction \"$transactionId\", \"$undo\", \"$event\"") } @@ -48,7 +48,7 @@ abstract class AbstractTransactionManager( ?: throw TransactionException("Cannot commit transaction \"$transactionId\"") } - override fun syncCommit(transactionId: String, event: T): String { + override fun syncCommit(transactionId: String, event: T): String { return commit(transactionId, event).block() ?: throw TransactionException("Cannot commit transaction \"$transactionId\" \"$event\"") } @@ -58,12 +58,12 @@ abstract class AbstractTransactionManager( ?: throw TransactionException("Cannot rollback transaction \"$transactionId\", \"$cause\"") } - override fun syncRollback(transactionId: String, cause: String, event: T): String { + override fun syncRollback(transactionId: String, cause: String, event: T): String { return rollback(transactionId, cause, event).block() ?: throw TransactionException("Cannot rollback transaction \"$transactionId\", \"$cause\" \"$event\"") } - final override fun start(undo: T): Mono { + final override fun start(undo: T): Mono { return Mono.fromCallable { codec.encode(undo) } .flatMap { encodedUndo -> startTransaction(encodedUndo, null) @@ -72,7 +72,7 @@ abstract class AbstractTransactionManager( .contextWrite { it.put(CONTEXT_TX_KEY, transactionIdGenerator.generate()) } } - override fun start(undo: T, event: S): Mono { + override fun start(undo: T, event: S): Mono { return Mono.fromCallable { codec.encode(undo) } .map { it to codec.encode(event) } .flatMap { (encodedUndo, encodedEvent) -> @@ -98,7 +98,7 @@ abstract class AbstractTransactionManager( } } - override fun join(transactionId: String, undo: T): Mono { + override fun join(transactionId: String, undo: T): Mono { return getAnyTransaction(transactionId) .map { if (it == TransactionState.COMMIT) { @@ -114,7 +114,7 @@ abstract class AbstractTransactionManager( } } - override fun join(transactionId: String, undo: T, event: S): Mono { + override fun join(transactionId: String, undo: T, event: S): Mono { return getAnyTransaction(transactionId) .map { if (it == TransactionState.COMMIT) { @@ -153,7 +153,7 @@ abstract class AbstractTransactionManager( .contextWrite { it.put(CONTEXT_TX_KEY, transactionId) } } - override fun rollback(transactionId: String, cause: String, event: T): Mono { + override fun rollback(transactionId: String, cause: String, event: T): Mono { return exists(transactionId) .infoOnError("Cannot rollback transaction cause, transaction \"$transactionId\" is not exists") .map { codec.encode(event) } @@ -189,7 +189,7 @@ abstract class AbstractTransactionManager( .contextWrite { it.put(CONTEXT_TX_KEY, transactionId) } } - override fun commit(transactionId: String, event: T): Mono { + override fun commit(transactionId: String, event: T): Mono { return exists(transactionId) .infoOnError("Cannot commit transaction cause, transaction \"$transactionId\" is not exists") .map { codec.encode(event) } diff --git a/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt b/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt index 8e091a1..603b3ac 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/JsonCodec.kt @@ -1,24 +1,17 @@ package org.rooftop.netx.engine -import com.fasterxml.jackson.annotation.JsonCreator import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.kotlin.KotlinModule -import com.fasterxml.jackson.module.paramnames.ParameterNamesModule import org.rooftop.netx.api.Codec import org.rooftop.netx.api.DecodeException import org.rooftop.netx.api.EncodeException import kotlin.reflect.KClass -class JsonCodec : Codec { +class JsonCodec( + private val objectMapper: ObjectMapper, +) : Codec { - private val objectMapper: ObjectMapper by lazy { - ObjectMapper().registerModule(ParameterNamesModule(JsonCreator.Mode.PROPERTIES)) - .registerModule(KotlinModule.Builder().build()) - } - - override fun encode(data: T): String { - requireNotNull(data) { "Data cannot be null" } - require(data!!::class != Any::class) { "Data cannot be Any" } + override fun encode(data: T): String { + require(data::class != Any::class) { "Data cannot be Any" } return runCatching { objectMapper.writeValueAsString(data) } .getOrElse { throw EncodeException("Cannot encode \"${data}\" to \"${String::class}\"", it) diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateResultHolder.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateResultHolder.kt index 40064db..18e1567 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/OrchestrateResultHolder.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestrateResultHolder.kt @@ -9,5 +9,5 @@ interface OrchestrateResultHolder { fun getResult(timeout: Duration, transactionId: String): Mono - fun setResult(transactionId: String, state: TransactionState, result: T): Mono + fun setResult(transactionId: String, state: TransactionState, result: T): Mono } diff --git a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt index 0af0be5..f5d7b11 100644 --- a/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt +++ b/src/main/kotlin/org/rooftop/netx/engine/OrchestratorManager.kt @@ -22,7 +22,7 @@ class OrchestratorManager( } override fun transaction(request: T): Mono { - return transaction(ONE_MINUTES_TO_TIME_OUT, request) + return transaction(TEN_SECONDS_TO_TIME_OUT, request) } override fun transaction(timeoutMillis: Long, request: T): Mono { @@ -39,6 +39,6 @@ class OrchestratorManager( private companion object { private const val UNDO = "Orchestrate mode"; - private const val ONE_MINUTES_TO_TIME_OUT = 60000L + private const val TEN_SECONDS_TO_TIME_OUT = 10000L } } diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisOrchestrateResultHolder.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisOrchestrateResultHolder.kt index 0b01cbd..0189abe 100644 --- a/src/main/kotlin/org/rooftop/netx/redis/RedisOrchestrateResultHolder.kt +++ b/src/main/kotlin/org/rooftop/netx/redis/RedisOrchestrateResultHolder.kt @@ -7,13 +7,17 @@ import org.rooftop.netx.api.ResultTimeoutException import org.rooftop.netx.engine.OrchestrateResultHolder import org.rooftop.netx.engine.core.Transaction import org.rooftop.netx.engine.core.TransactionState +import org.rooftop.netx.engine.logging.info import org.springframework.data.redis.core.ReactiveRedisTemplate import reactor.core.publisher.Mono +import reactor.pool.PoolBuilder import java.util.concurrent.TimeoutException import kotlin.time.Duration import kotlin.time.toJavaDuration + class RedisOrchestrateResultHolder( + poolSize: Int, private val codec: Codec, private val serverId: String, private val group: String, @@ -21,43 +25,45 @@ class RedisOrchestrateResultHolder( private val reactiveRedisTemplate: ReactiveRedisTemplate, ) : OrchestrateResultHolder { + private val pool = PoolBuilder.from(Mono.just(reactiveRedisTemplate.opsForList())) + .sizeBetween(1, poolSize) + .maxPendingAcquireUnbounded() + .buildPool() + override fun getResult(timeout: Duration, transactionId: String): Mono { - return reactiveRedisTemplate.listenToChannel(CHANNEL) - .timeout(timeout.toJavaDuration()) - .onErrorMap { - if (it::class == TimeoutException::class) { - return@onErrorMap ResultTimeoutException( - "Can't get result in \"$timeout\" time", - it, + return pool.withPoolable { + it.leftPop("Result:$transactionId", timeout.toJavaDuration()) + .switchIfEmpty(Mono.error { + ResultTimeoutException( + "Cannot get result in \"$timeout\" time", + TimeoutException() ) - } - it - } - .filter { it.message.id == transactionId } - .map { - OrchestrateResult( - isSuccess = it.message.state == TransactionState.COMMIT, - codec = codec, - result = it.message.event - ?: throw NullPointerException("OrchestrateResult message cannot be null") - ) - } - .next() - } - - override fun setResult(transactionId: String, state: TransactionState, result: T): Mono { - return reactiveRedisTemplate.convertAndSend( - CHANNEL, Transaction( - transactionId, - serverId, - group, - state, - event = objectMapper.writeValueAsString(result) + }) + }.single().map { + OrchestrateResult( + isSuccess = it.state == TransactionState.COMMIT, + codec = codec, + result = it.event + ?: throw NullPointerException("OrchestrateResult message cannot be null") ) - ).map { result } + }.doOnNext { info("Get result $it") } } - private companion object { - private const val CHANNEL = "ORCHESTRATE_RESULT_CHANNEL" + override fun setResult( + transactionId: String, + state: TransactionState, + result: T + ): Mono { + return reactiveRedisTemplate.opsForList() + .leftPush( + "Result:$transactionId", Transaction( + id = transactionId, + serverId = serverId, + group = group, + state = state, + event = objectMapper.writeValueAsString(result) + ) + ).map { result } + .doOnNext { info("Set result $it") } } } diff --git a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt index fa248f5..b1a0326 100644 --- a/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt +++ b/src/main/kotlin/org/rooftop/netx/redis/RedisTransactionConfigurer.kt @@ -2,6 +2,7 @@ package org.rooftop.netx.redis import com.fasterxml.jackson.annotation.JsonCreator import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.KotlinModule import com.fasterxml.jackson.module.paramnames.ParameterNamesModule import org.rooftop.netx.api.TransactionManager @@ -18,6 +19,8 @@ import org.springframework.context.ApplicationContext import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory +import org.springframework.data.redis.connection.RedisPassword +import org.springframework.data.redis.connection.RedisStandaloneConfiguration import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory import org.springframework.data.redis.core.ReactiveRedisTemplate import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer @@ -29,6 +32,7 @@ import org.springframework.data.redis.serializer.StringRedisSerializer class RedisTransactionConfigurer( @Value("\${netx.host}") private val host: String, @Value("\${netx.port}") private val port: String, + @Value("\${netx.password}") private val password: String?, @Value("\${netx.group}") private val nodeGroup: String, @Value("\${netx.node-id}") private val nodeId: Int, @Value("\${netx.node-name}") private val nodeName: String, @@ -36,6 +40,7 @@ class RedisTransactionConfigurer( @Value("\${netx.orphan-milli:60000}") private val orphanMilli: Long, @Value("\${netx.backpressure:40}") private val backpressureSize: Int, @Value("\${netx.logging.level:off}") loggingLevel: String, + @Value("\${netx.pool-size:10}") private val poolSize: Int, private val applicationContext: ApplicationContext, ) { @@ -80,6 +85,7 @@ class RedisTransactionConfigurer( @Bean @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") fun redisOrchestrateResultHolder(): OrchestrateResultHolder = RedisOrchestrateResultHolder( + poolSize, jsonCodec(), nodeName, nodeGroup, @@ -87,11 +93,16 @@ class RedisTransactionConfigurer( reactiveRedisTemplate(), ) + @Bean + @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") + fun jsonCodec(): JsonCodec = JsonCodec(netxObjectMapper()) + @Bean @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") fun netxObjectMapper(): ObjectMapper = ObjectMapper().registerModule(ParameterNamesModule(JsonCreator.Mode.PROPERTIES)) .registerModule(KotlinModule.Builder().build()) + .registerModule(JavaTimeModule()) @Bean @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") @@ -122,10 +133,6 @@ class RedisTransactionConfigurer( info("RedisStreamTransactionDispatcher connect to host : \"$host\" port : \"$port\" nodeName : \"$nodeName\" nodeGroup : \"$nodeGroup\"") } - @Bean - @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") - fun jsonCodec(): JsonCodec = JsonCodec() - @Bean @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") fun reactiveRedisTemplate(): ReactiveRedisTemplate { @@ -146,6 +153,11 @@ class RedisTransactionConfigurer( fun reactiveRedisConnectionFactory(): ReactiveRedisConnectionFactory { val port: String = System.getProperty("netx.port") ?: port - return LettuceConnectionFactory(host, port.toInt()) + val redisStandaloneConfiguration = RedisStandaloneConfiguration() + redisStandaloneConfiguration.hostName = host + redisStandaloneConfiguration.port = port.toInt() + redisStandaloneConfiguration.password = RedisPassword.of(password) + + return LettuceConnectionFactory(redisStandaloneConfiguration) } } diff --git a/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt b/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt index aa54422..5812cf0 100644 --- a/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt +++ b/src/test/kotlin/org/rooftop/netx/client/NetxLoadTest.kt @@ -5,7 +5,6 @@ import io.kotest.core.annotation.DisplayName import io.kotest.core.spec.style.FunSpec import io.kotest.data.forAll import io.kotest.data.row -import org.rooftop.netx.api.Orchestrator import org.rooftop.netx.meta.EnableDistributedTransaction import org.rooftop.netx.redis.RedisContainer import org.springframework.boot.test.context.SpringBootTest @@ -18,7 +17,6 @@ import kotlin.time.Duration.Companion.minutes LoadRunner::class, NetxClient::class, TransactionReceiveStorage::class, - OrchestratorConfigurer::class, ] ) @EnableDistributedTransaction diff --git a/src/test/kotlin/org/rooftop/netx/client/NetxOrchestratorLoadTest.kt b/src/test/kotlin/org/rooftop/netx/client/NetxOrchestratorLoadTest.kt new file mode 100644 index 0000000..994373c --- /dev/null +++ b/src/test/kotlin/org/rooftop/netx/client/NetxOrchestratorLoadTest.kt @@ -0,0 +1,60 @@ +package org.rooftop.netx.client + +import io.kotest.assertions.nondeterministic.eventually +import io.kotest.core.annotation.DisplayName +import io.kotest.core.spec.style.FunSpec +import io.kotest.data.forAll +import io.kotest.data.row +import io.kotest.matchers.equals.shouldBeEqual +import org.rooftop.netx.api.Orchestrator +import org.rooftop.netx.meta.EnableDistributedTransaction +import org.rooftop.netx.redis.RedisContainer +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.TestPropertySource +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import kotlin.time.Duration.Companion.minutes + +@DisplayName("Netx orchestartor 부하테스트") +@SpringBootTest( + classes = [ + RedisContainer::class, + LoadRunner::class, + OrchestratorConfigurer::class, + ] +) +@EnableDistributedTransaction +@TestPropertySource("classpath:fast-recover-mode.properties") +class NetxOrchestratorLoadTest( + private val loadRunner: LoadRunner, + private val orchestrator: Orchestrator, +) : FunSpec({ + + test("Netx의 Orcehstrator는 부하가 가중되어도, 결과적 일관성을 보장한다.") { + forAll( + row(1), + row(10), + row(100), + row(1_000), + row(10_000), + ) { count -> + val resultStorage = ConcurrentHashMap.newKeySet(count) + + val atomicInt = AtomicInteger(0) + loadRunner.load(count) { + orchestrator.transaction(THREE_MINUTES_MILLIS, atomicInt.getAndIncrement()) + .map { resultStorage.add(it.decodeResult(Int::class)) } + .subscribe() + } + + eventually(10.minutes) { + resultStorage.size shouldBeEqual count + } + } + } +}) { + private companion object { + private const val THREE_MINUTES_MILLIS = 1000 * 60 * 3L + } +} + diff --git a/src/test/kotlin/org/rooftop/netx/engine/NetxCodecSupportsTest.kt b/src/test/kotlin/org/rooftop/netx/engine/NetxCodecSupportsTest.kt index f0459b8..d18b44a 100644 --- a/src/test/kotlin/org/rooftop/netx/engine/NetxCodecSupportsTest.kt +++ b/src/test/kotlin/org/rooftop/netx/engine/NetxCodecSupportsTest.kt @@ -24,7 +24,7 @@ class NetxCodecSupportsTest( private val transactionReceiveStorage: TransactionReceiveStorage, ) : StringSpec({ - fun startAndRollbackTransaction(undo: T) { + fun startAndRollbackTransaction(undo: T) { val transactionId = transactionManager.syncStart(undo) transactionManager.syncRollback(transactionId, "for codec test") diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt index 5a3eac9..9a8da16 100644 --- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt +++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorConfigurer.kt @@ -103,4 +103,14 @@ class OrchestratorConfigurer : AbstractOrchestratorConfigurer() { "" }.build() } + + @Bean + fun instantOrchestrator(): Orchestrator { + return newOrchestrator() + .startSync { + it.decodeEvent(OrchestratorTest.InstantWrapper::class) + }.commitSync { + it.decodeEvent(OrchestratorTest.InstantWrapper::class) + }.build() + } } diff --git a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt index 353866c..735f63e 100644 --- a/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt +++ b/src/test/kotlin/org/rooftop/netx/engine/OrchestratorTest.kt @@ -13,6 +13,7 @@ import org.rooftop.netx.redis.RedisContainer import org.springframework.beans.factory.annotation.Qualifier import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource +import java.time.Instant @EnableDistributedTransaction @ContextConfiguration( @@ -29,6 +30,7 @@ class OrchestratorTest( @Qualifier("rollbackOrchestrator") private val rollbackOrchestrator: Orchestrator, @Qualifier("noRollbackForOrchestrator") private val noRollbackForOrchestrator: Orchestrator, @Qualifier("timeOutOrchestrator") private val timeOutOrchestrator: Orchestrator, + private val instantOrchestrator: Orchestrator, ) : DescribeSpec({ describe("numberOrchestrator 구현채는") { @@ -93,6 +95,17 @@ class OrchestratorTest( } } } + + describe("instantOrchestrator 구현채는") { + context("InstantWrapper를 입력받아서,") { + val request = InstantWrapper(Instant.now()) + it("같은 InstantWrapper를 반환한다.") { + val result = instantOrchestrator.transactionSync(request) + + result.decodeResult(InstantWrapper::class) shouldBeEqualToComparingFields request + } + } + } }) { data class Home( val address: String, @@ -104,4 +117,8 @@ class OrchestratorTest( } data class Person(val name: String) + + data class InstantWrapper( + val time: Instant, + ) } diff --git a/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt b/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt index c0d5509..bf5d3d2 100644 --- a/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt +++ b/src/test/kotlin/org/rooftop/netx/redis/NoAckRedisTransactionConfigurer.kt @@ -6,9 +6,9 @@ import com.fasterxml.jackson.module.kotlin.KotlinModule import com.fasterxml.jackson.module.paramnames.ParameterNamesModule import org.rooftop.netx.api.TransactionManager import org.rooftop.netx.engine.JsonCodec +import org.rooftop.netx.engine.OrchestrateResultHolder import org.rooftop.netx.engine.TransactionIdGenerator import org.rooftop.netx.engine.core.Transaction -import org.rooftop.netx.engine.OrchestrateResultHolder import org.rooftop.netx.engine.logging.logger import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Value @@ -34,6 +34,7 @@ class NoAckRedisTransactionConfigurer( @Value("\${netx.orphan-milli:10000}") private val orphanMilli: Long, @Value("\${netx.backpressure:10}") private val backpressureSize: Int, @Value("\${netx.logging.level:off}") loggingLevel: String, + @Value("\${netx.pool-size:100}") private val poolSize: Int, private val applicationContext: ApplicationContext, ) { @@ -70,6 +71,10 @@ class NoAckRedisTransactionConfigurer( objectMapper = netxObjectMapper(), ).also { it.subscribeStream() } + @Bean + @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") + fun jsonCodec(): JsonCodec = JsonCodec(netxObjectMapper()) + @Bean @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") fun netxObjectMapper(): ObjectMapper = @@ -79,6 +84,7 @@ class NoAckRedisTransactionConfigurer( @Bean @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") fun redisOrchestrateResultHolder(): OrchestrateResultHolder = RedisOrchestrateResultHolder( + poolSize, jsonCodec(), nodeName, nodeGroup, @@ -119,10 +125,6 @@ class NoAckRedisTransactionConfigurer( codec = jsonCodec(), ) - @Bean - @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") - fun jsonCodec(): JsonCodec = JsonCodec() - @Bean @ConditionalOnProperty(prefix = "netx", name = ["mode"], havingValue = "redis") fun reactiveRedisTemplate(): ReactiveRedisTemplate { diff --git a/src/test/resources/fast-recover-mode.properties b/src/test/resources/fast-recover-mode.properties index a7a7600..b606fab 100644 --- a/src/test/resources/fast-recover-mode.properties +++ b/src/test/resources/fast-recover-mode.properties @@ -8,3 +8,4 @@ netx.recovery-milli=100 netx.orphan-milli=100 netx.backpressure=40 netx.logging.level=info +netx.pool-size=10