Skip to content

Commit

Permalink
feat(es): Support AggregateIdScanner to SnapshotRepository
Browse files Browse the repository at this point in the history
  • Loading branch information
Ahoo-Wang committed Nov 11, 2024
1 parent 99ca0e5 commit 7709c88
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,20 @@ package me.ahoo.wow.redis.eventsourcing

import me.ahoo.wow.api.modeling.AggregateId
import me.ahoo.wow.api.modeling.NamedAggregate
import me.ahoo.wow.modeling.aggregateId
import me.ahoo.wow.modeling.toStringWithAlias
import me.ahoo.wow.redis.eventsourcing.RedisWrappedKey.unwrap
import me.ahoo.wow.redis.eventsourcing.RedisWrappedKey.wrap

object EventStreamKeyConverter : AggregateKeyConverter {
private const val ID_DELIMITER = "@"
const val ID_DELIMITER = "@"
fun NamedAggregate.toKeyPrefix(): String {
return "${toStringWithAlias()}${DELIMITER}es$DELIMITER"
}

fun toAggregateIdKey(aggregateId: AggregateId): String {
return "${aggregateId.id}$ID_DELIMITER${aggregateId.tenantId}".wrap()
}

fun toAggregateId(namedAggregate: NamedAggregate, key: String): AggregateId {
val prefix = namedAggregate.toKeyPrefix()
val idWithTenantId = key.removePrefix(prefix).unwrap()
idWithTenantId.split(ID_DELIMITER).let {
return namedAggregate.aggregateId(it[0], it[1])
}
fun AggregateId.toKey(): String {
return "${id}$ID_DELIMITER$tenantId".wrap()
}

override fun convert(aggregateId: AggregateId): String {
return "${aggregateId.toKeyPrefix()}${toAggregateIdKey(aggregateId)}"
return "${aggregateId.toKeyPrefix()}${aggregateId.toKey()}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import me.ahoo.wow.eventsourcing.AbstractEventStore
import me.ahoo.wow.eventsourcing.EventVersionConflictException
import me.ahoo.wow.exception.ErrorCodes
import me.ahoo.wow.naming.getContextAlias
import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.toKey
import me.ahoo.wow.serialization.toJsonString
import me.ahoo.wow.serialization.toObject
import org.springframework.core.io.ClassPathResource
Expand All @@ -41,7 +42,7 @@ class RedisEventStore(
}

override fun appendStream(eventStream: DomainEventStream): Mono<Void> {
val aggregateKey = EventStreamKeyConverter.toAggregateIdKey(eventStream.aggregateId)
val aggregateKey = eventStream.aggregateId.toKey()
return redisTemplate.execute(
SCRIPT_EVENT_STEAM_APPEND,
listOf(aggregateKey),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@ import me.ahoo.wow.api.modeling.AggregateId
import me.ahoo.wow.api.modeling.NamedAggregate
import me.ahoo.wow.eventsourcing.snapshot.Snapshot
import me.ahoo.wow.eventsourcing.snapshot.SnapshotRepository
import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.toKeyPrefix
import me.ahoo.wow.redis.eventsourcing.DefaultSnapshotKeyConverter.toKeyPrefix
import me.ahoo.wow.serialization.toJsonString
import me.ahoo.wow.serialization.toObject
import org.springframework.data.redis.connection.DataType
import org.springframework.data.redis.core.ReactiveStringRedisTemplate
import org.springframework.data.redis.core.ScanOptions
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

class RedisSnapshotRepository(
private val redisTemplate: ReactiveStringRedisTemplate,
private val keyConverter: SnapshotKeyConverter = DefaultSnapshotKeyConverter
private val keyConverter: AggregateKeyConverter = DefaultSnapshotKeyConverter
) : SnapshotRepository {

override fun <S : Any> load(aggregateId: AggregateId): Mono<Snapshot<S>> {
Expand All @@ -51,10 +52,14 @@ class RedisSnapshotRepository(
): Flux<AggregateId> {
val keyPrefix = namedAggregate.toKeyPrefix()
val keyPattern = "$keyPrefix*"
val options = ScanOptions.scanOptions().match(keyPattern).count(limit.toLong()).build()
val options = ScanOptions.scanOptions().match(keyPattern)
.type(DataType.STRING)
.count(limit.toLong()).build()
return redisTemplate.scan(options)
.map {
EventStreamKeyConverter.toAggregateId(namedAggregate, it)
DefaultSnapshotKeyConverter.toAggregateId(namedAggregate, it)
}.filter {
it.id > cursorId
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,28 @@
package me.ahoo.wow.redis.eventsourcing

import me.ahoo.wow.api.modeling.AggregateId
import me.ahoo.wow.api.modeling.NamedAggregate
import me.ahoo.wow.modeling.aggregateId
import me.ahoo.wow.modeling.toStringWithAlias
import me.ahoo.wow.redis.eventsourcing.RedisWrappedKey.wrap
import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.ID_DELIMITER
import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.toKey
import me.ahoo.wow.redis.eventsourcing.RedisWrappedKey.unwrap

fun interface SnapshotKeyConverter : AggregateKeyConverter
object DefaultSnapshotKeyConverter : AggregateKeyConverter {
fun NamedAggregate.toKeyPrefix(): String {
return "${toStringWithAlias()}${DELIMITER}snapshot$DELIMITER"
}

object DefaultSnapshotKeyConverter : SnapshotKeyConverter {
override fun convert(aggregateId: AggregateId): String {
return "${aggregateId.toStringWithAlias()}:snapshot:${aggregateId.id.wrap()}"
return "${aggregateId.toKeyPrefix()}${aggregateId.toKey()}"
}

fun toAggregateId(namedAggregate: NamedAggregate, key: String): AggregateId {
val prefix = namedAggregate.toKeyPrefix()
val idWithTenantId = key.removePrefix(prefix).unwrap()
idWithTenantId.split(ID_DELIMITER).let {
require(it.size == 2) { "Invalid key:$key" }
return namedAggregate.aggregateId(it[0], it[1])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ class DefaultSnapshotKeyConverterTest {
fun convert() {
val aggregateId = MOCK_AGGREGATE_METADATA.aggregateId("id", "tenantId")
val actual = DefaultSnapshotKeyConverter.convert(aggregateId)
assertThat(actual, equalTo("tck.mock_aggregate:snapshot:{id}"))
assertThat(actual, equalTo("tck.mock_aggregate:snapshot:{id@tenantId}"))
}

@Test
fun toAggregateId() {
val aggregateId = MOCK_AGGREGATE_METADATA.aggregateId("id", "tenantId")
val actual = DefaultSnapshotKeyConverter.toAggregateId(
MOCK_AGGREGATE_METADATA,
"tck.mock_aggregate:snapshot:{id@tenantId}"
)
assertThat(actual, equalTo(aggregateId))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package me.ahoo.wow.redis.eventsourcing

import me.ahoo.wow.modeling.aggregateId
import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.toKey
import me.ahoo.wow.redis.eventsourcing.EventStreamKeyConverter.toKeyPrefix
import me.ahoo.wow.tck.mock.MOCK_AGGREGATE_METADATA
import org.hamcrest.MatcherAssert.*
Expand All @@ -31,16 +32,10 @@ class EventStreamKeyConverterTest {

@Test
fun toAggregateIdKey() {
val actual = EventStreamKeyConverter.toAggregateIdKey(aggregateId)
val actual = aggregateId.toKey()
assertThat(actual, equalTo("{id@tenantId}"))
}

@Test
fun toAggregateId() {
val actual = EventStreamKeyConverter.toAggregateId(MOCK_AGGREGATE_METADATA, "{id@tenantId}")
assertThat(actual, equalTo(aggregateId))
}

@Test
fun converter() {
val actual = EventStreamKeyConverter.convert(aggregateId)
Expand Down

0 comments on commit 7709c88

Please sign in to comment.