From 7709c8870de64dcc8096c2cc5dfaee081ab86112 Mon Sep 17 00:00:00 2001 From: Ahoo Wang Date: Mon, 11 Nov 2024 20:04:41 +0800 Subject: [PATCH] feat(es): Support AggregateIdScanner to SnapshotRepository --- .../eventsourcing/EventStreamKeyConverter.kt | 18 ++++----------- .../redis/eventsourcing/RedisEventStore.kt | 3 ++- .../eventsourcing/RedisSnapshotRepository.kt | 13 +++++++---- .../eventsourcing/SnapshotKeyConverter.kt | 23 +++++++++++++++---- .../DefaultSnapshotKeyConverterTest.kt | 12 +++++++++- .../EventStreamKeyConverterTest.kt | 9 ++------ 6 files changed, 47 insertions(+), 31 deletions(-) diff --git a/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/EventStreamKeyConverter.kt b/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/EventStreamKeyConverter.kt index 569d4eef940..48f94e168b4 100644 --- a/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/EventStreamKeyConverter.kt +++ b/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/EventStreamKeyConverter.kt @@ -15,30 +15,20 @@ package me.ahoo.wow.redis.eventsourcing import me.ahoo.wow.api.modeling.AggregateId import me.ahoo.wow.api.modeling.NamedAggregate -import me.ahoo.wow.modeling.aggregateId import me.ahoo.wow.modeling.toStringWithAlias -import me.ahoo.wow.redis.eventsourcing.RedisWrappedKey.unwrap import me.ahoo.wow.redis.eventsourcing.RedisWrappedKey.wrap object EventStreamKeyConverter : AggregateKeyConverter { - private const val ID_DELIMITER = "@" + const val ID_DELIMITER = "@" fun NamedAggregate.toKeyPrefix(): String { return "${toStringWithAlias()}${DELIMITER}es$DELIMITER" } - fun toAggregateIdKey(aggregateId: AggregateId): String { - return "${aggregateId.id}$ID_DELIMITER${aggregateId.tenantId}".wrap() - } - - fun toAggregateId(namedAggregate: NamedAggregate, key: String): AggregateId { - val prefix = namedAggregate.toKeyPrefix() - val idWithTenantId = key.removePrefix(prefix).unwrap() - idWithTenantId.split(ID_DELIMITER).let { - return namedAggregate.aggregateId(it[0], it[1]) - } + fun AggregateId.toKey(): String { + return "${id}$ID_DELIMITER$tenantId".wrap() } override fun convert(aggregateId: AggregateId): String { - return "${aggregateId.toKeyPrefix()}${toAggregateIdKey(aggregateId)}" + return "${aggregateId.toKeyPrefix()}${aggregateId.toKey()}" } } diff --git a/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/RedisEventStore.kt b/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/RedisEventStore.kt index a43ecdeef93..bea4113000d 100644 --- a/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/RedisEventStore.kt +++ b/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/RedisEventStore.kt @@ -20,6 +20,7 @@ import me.ahoo.wow.eventsourcing.AbstractEventStore import me.ahoo.wow.eventsourcing.EventVersionConflictException import me.ahoo.wow.exception.ErrorCodes import me.ahoo.wow.naming.getContextAlias +import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.toKey import me.ahoo.wow.serialization.toJsonString import me.ahoo.wow.serialization.toObject import org.springframework.core.io.ClassPathResource @@ -41,7 +42,7 @@ class RedisEventStore( } override fun appendStream(eventStream: DomainEventStream): Mono { - val aggregateKey = EventStreamKeyConverter.toAggregateIdKey(eventStream.aggregateId) + val aggregateKey = eventStream.aggregateId.toKey() return redisTemplate.execute( SCRIPT_EVENT_STEAM_APPEND, listOf(aggregateKey), diff --git a/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/RedisSnapshotRepository.kt b/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/RedisSnapshotRepository.kt index 5a4e6abaaa6..e530bafd493 100644 --- a/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/RedisSnapshotRepository.kt +++ b/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/RedisSnapshotRepository.kt @@ -17,9 +17,10 @@ import me.ahoo.wow.api.modeling.AggregateId import me.ahoo.wow.api.modeling.NamedAggregate import me.ahoo.wow.eventsourcing.snapshot.Snapshot import me.ahoo.wow.eventsourcing.snapshot.SnapshotRepository -import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.toKeyPrefix +import me.ahoo.wow.redis.eventsourcing.DefaultSnapshotKeyConverter.toKeyPrefix import me.ahoo.wow.serialization.toJsonString import me.ahoo.wow.serialization.toObject +import org.springframework.data.redis.connection.DataType import org.springframework.data.redis.core.ReactiveStringRedisTemplate import org.springframework.data.redis.core.ScanOptions import reactor.core.publisher.Flux @@ -27,7 +28,7 @@ import reactor.core.publisher.Mono class RedisSnapshotRepository( private val redisTemplate: ReactiveStringRedisTemplate, - private val keyConverter: SnapshotKeyConverter = DefaultSnapshotKeyConverter + private val keyConverter: AggregateKeyConverter = DefaultSnapshotKeyConverter ) : SnapshotRepository { override fun load(aggregateId: AggregateId): Mono> { @@ -51,10 +52,14 @@ class RedisSnapshotRepository( ): Flux { val keyPrefix = namedAggregate.toKeyPrefix() val keyPattern = "$keyPrefix*" - val options = ScanOptions.scanOptions().match(keyPattern).count(limit.toLong()).build() + val options = ScanOptions.scanOptions().match(keyPattern) + .type(DataType.STRING) + .count(limit.toLong()).build() return redisTemplate.scan(options) .map { - EventStreamKeyConverter.toAggregateId(namedAggregate, it) + DefaultSnapshotKeyConverter.toAggregateId(namedAggregate, it) + }.filter { + it.id > cursorId } } } diff --git a/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/SnapshotKeyConverter.kt b/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/SnapshotKeyConverter.kt index 521877ba4b8..30e28e1d673 100644 --- a/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/SnapshotKeyConverter.kt +++ b/wow-redis/src/main/kotlin/me/ahoo/wow/redis/eventsourcing/SnapshotKeyConverter.kt @@ -14,13 +14,28 @@ package me.ahoo.wow.redis.eventsourcing import me.ahoo.wow.api.modeling.AggregateId +import me.ahoo.wow.api.modeling.NamedAggregate +import me.ahoo.wow.modeling.aggregateId import me.ahoo.wow.modeling.toStringWithAlias -import me.ahoo.wow.redis.eventsourcing.RedisWrappedKey.wrap +import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.ID_DELIMITER +import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.toKey +import me.ahoo.wow.redis.eventsourcing.RedisWrappedKey.unwrap -fun interface SnapshotKeyConverter : AggregateKeyConverter +object DefaultSnapshotKeyConverter : AggregateKeyConverter { + fun NamedAggregate.toKeyPrefix(): String { + return "${toStringWithAlias()}${DELIMITER}snapshot$DELIMITER" + } -object DefaultSnapshotKeyConverter : SnapshotKeyConverter { override fun convert(aggregateId: AggregateId): String { - return "${aggregateId.toStringWithAlias()}:snapshot:${aggregateId.id.wrap()}" + return "${aggregateId.toKeyPrefix()}${aggregateId.toKey()}" + } + + fun toAggregateId(namedAggregate: NamedAggregate, key: String): AggregateId { + val prefix = namedAggregate.toKeyPrefix() + val idWithTenantId = key.removePrefix(prefix).unwrap() + idWithTenantId.split(ID_DELIMITER).let { + require(it.size == 2) { "Invalid key:$key" } + return namedAggregate.aggregateId(it[0], it[1]) + } } } diff --git a/wow-redis/src/test/kotlin/me/ahoo/wow/redis/eventsourcing/DefaultSnapshotKeyConverterTest.kt b/wow-redis/src/test/kotlin/me/ahoo/wow/redis/eventsourcing/DefaultSnapshotKeyConverterTest.kt index f02577a42f0..574aba59d46 100644 --- a/wow-redis/src/test/kotlin/me/ahoo/wow/redis/eventsourcing/DefaultSnapshotKeyConverterTest.kt +++ b/wow-redis/src/test/kotlin/me/ahoo/wow/redis/eventsourcing/DefaultSnapshotKeyConverterTest.kt @@ -25,6 +25,16 @@ class DefaultSnapshotKeyConverterTest { fun convert() { val aggregateId = MOCK_AGGREGATE_METADATA.aggregateId("id", "tenantId") val actual = DefaultSnapshotKeyConverter.convert(aggregateId) - assertThat(actual, equalTo("tck.mock_aggregate:snapshot:{id}")) + assertThat(actual, equalTo("tck.mock_aggregate:snapshot:{id@tenantId}")) + } + + @Test + fun toAggregateId() { + val aggregateId = MOCK_AGGREGATE_METADATA.aggregateId("id", "tenantId") + val actual = DefaultSnapshotKeyConverter.toAggregateId( + MOCK_AGGREGATE_METADATA, + "tck.mock_aggregate:snapshot:{id@tenantId}" + ) + assertThat(actual, equalTo(aggregateId)) } } diff --git a/wow-redis/src/test/kotlin/me/ahoo/wow/redis/eventsourcing/EventStreamKeyConverterTest.kt b/wow-redis/src/test/kotlin/me/ahoo/wow/redis/eventsourcing/EventStreamKeyConverterTest.kt index 9c0c804eda4..913b7be1d3f 100644 --- a/wow-redis/src/test/kotlin/me/ahoo/wow/redis/eventsourcing/EventStreamKeyConverterTest.kt +++ b/wow-redis/src/test/kotlin/me/ahoo/wow/redis/eventsourcing/EventStreamKeyConverterTest.kt @@ -14,6 +14,7 @@ package me.ahoo.wow.redis.eventsourcing import me.ahoo.wow.modeling.aggregateId +import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.toKey import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.toKeyPrefix import me.ahoo.wow.tck.mock.MOCK_AGGREGATE_METADATA import org.hamcrest.MatcherAssert.* @@ -31,16 +32,10 @@ class EventStreamKeyConverterTest { @Test fun toAggregateIdKey() { - val actual = EventStreamKeyConverter.toAggregateIdKey(aggregateId) + val actual = aggregateId.toKey() assertThat(actual, equalTo("{id@tenantId}")) } - @Test - fun toAggregateId() { - val actual = EventStreamKeyConverter.toAggregateId(MOCK_AGGREGATE_METADATA, "{id@tenantId}") - assertThat(actual, equalTo(aggregateId)) - } - @Test fun converter() { val actual = EventStreamKeyConverter.convert(aggregateId)