Skip to content

Commit

Permalink
feat(es): Support ElasticsearchEventStore (#983)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ahoo-Wang authored Nov 11, 2024
1 parent defb5ef commit b2c4716
Show file tree
Hide file tree
Showing 17 changed files with 409 additions and 51 deletions.
5 changes: 4 additions & 1 deletion deploy/example/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ data:
- org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration
- org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
- org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration
- org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration
- org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchClientAutoConfiguration
- org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
- org.springframework.boot.autoconfigure.elasticsearch.ReactiveElasticsearchClientAutoConfiguration
logging:
level:
me.ahoo.wow: warn
Expand Down
7 changes: 6 additions & 1 deletion deploy/example/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@ spec:
annotations:
instrumentation.opentelemetry.io/inject-java: "true"
spec:
tolerations:
- key: "test"
operator: "Equal"
value: service
effect: "NoSchedule"
containers:
- name: wow-example
image: registry.cn-shanghai.aliyuncs.com/ahoo/wow-example-server:3.5.3
image: registry.cn-shanghai.aliyuncs.com/ahoo/wow-example-server:3.15.5
env:
- name: LANG
value: C.utf8
Expand Down
6 changes: 5 additions & 1 deletion example/example-server/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ spring:
application:
name: example-service
autoconfigure:
exclude: org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration
exclude:
- org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration
- org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchClientAutoConfiguration
- org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration
- org.springframework.boot.autoconfigure.elasticsearch.ReactiveElasticsearchClientAutoConfiguration
data:
mongodb:
uri: mongodb://root:root@localhost:27017/wow_example_db?authSource=admin&maxIdleTimeMS=60000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ abstract class EventStoreSpec {
}

companion object {
const val TIMES = 4000
const val DEFAULT_PARALLELISM = 16
const val TIMES = 1000
const val DEFAULT_PARALLELISM = 2
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@ import me.ahoo.wow.api.Wow
import me.ahoo.wow.api.modeling.NamedAggregate
import me.ahoo.wow.modeling.toStringWithAlias

interface SnapshotIndexNameConverter {
fun convert(namedAggregate: NamedAggregate): String
}
object IndexNameConverter {
const val SNAPSHOT_SUFFIX = ".snapshot"
const val EVENT_STREAM_SUFFIX = ".es"
const val AGGREGATE_ID_SUFFIX = ".id"

fun NamedAggregate.toSnapshotIndexName(): String {
return "${Wow.WOW_PREFIX}${this.toStringWithAlias()}$SNAPSHOT_SUFFIX"
}

fun NamedAggregate.toEventStreamIndexName(): String {
return "${Wow.WOW_PREFIX}${this.toStringWithAlias()}$EVENT_STREAM_SUFFIX"
}

object DefaultSnapshotIndexNameConverter : SnapshotIndexNameConverter {
override fun convert(namedAggregate: NamedAggregate): String {
return "${Wow.WOW_PREFIX}${namedAggregate.toStringWithAlias()}.snapshot"
fun NamedAggregate.toAggregateIdIndexName(): String {
return "${Wow.WOW_PREFIX}${this.toStringWithAlias()}$AGGREGATE_ID_SUFFIX"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ package me.ahoo.wow.elasticsearch

import me.ahoo.wow.serialization.JsonSerializer

val SnapshotJsonpMapper = co.elastic.clients.json.jackson.JacksonJsonpMapper(JsonSerializer)
val WowJsonpMapper = co.elastic.clients.json.jackson.JacksonJsonpMapper(JsonSerializer)
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.elasticsearch.eventsourcing

import co.elastic.clients.elasticsearch._types.OpType
import co.elastic.clients.elasticsearch._types.Refresh
import co.elastic.clients.elasticsearch._types.SortOrder
import me.ahoo.wow.api.modeling.AggregateId
import me.ahoo.wow.api.modeling.NamedAggregate
import me.ahoo.wow.elasticsearch.IndexNameConverter.toEventStreamIndexName
import me.ahoo.wow.elasticsearch.query.ElasticsearchConditionConverter.toQuery
import me.ahoo.wow.elasticsearch.query.ElasticsearchSortConverter.toSortOptions
import me.ahoo.wow.event.DomainEventStream
import me.ahoo.wow.eventsourcing.AbstractEventStore
import me.ahoo.wow.eventsourcing.AggregateIdScanner.Companion.FIRST_CURSOR_ID
import me.ahoo.wow.eventsourcing.EventVersionConflictException
import me.ahoo.wow.query.dsl.condition
import me.ahoo.wow.query.dsl.sort
import me.ahoo.wow.serialization.MessageRecords
import org.elasticsearch.client.ResponseException
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchClient
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono

class ElasticsearchEventStore(
private val elasticsearchClient: ReactiveElasticsearchClient,
private val refreshPolicy: Refresh = Refresh.True
) : AbstractEventStore() {
companion object {
private const val VERSION_CONFLICT_CODE = 409
private const val DEFAULT_BATCH_SIZE = 100
}

private fun DomainEventStream.toDocId(): String {
return "${this.aggregateId.id}-${this.version}"
}

override fun appendStream(eventStream: DomainEventStream): Mono<Void> {
return elasticsearchClient.index {
it.index(eventStream.aggregateId.toEventStreamIndexName())
.id(eventStream.toDocId())
.document(eventStream)
.opType(OpType.Create)
.refresh(refreshPolicy)
}.onErrorResume {
if (it is ResponseException && it.response.statusLine.statusCode == VERSION_CONFLICT_CODE) {
return@onErrorResume EventVersionConflictException(
eventStream = eventStream,
cause = it,
).toMono()
}
Mono.error(it)
}.then()
}

override fun loadStream(
aggregateId: AggregateId,
headVersion: Int,
tailVersion: Int
): Flux<DomainEventStream> {
return loopStream(aggregateId, headVersion, tailVersion)
}

private fun loopStream(
aggregateId: AggregateId,
headVersion: Int,
tailVersion: Int
): Flux<DomainEventStream> {
var endVersion = headVersion + DEFAULT_BATCH_SIZE - 1
if (tailVersion < endVersion) {
endVersion = tailVersion
}
return findEventStream(aggregateId, headVersion, endVersion).flatMapMany {
val previousStreams = Flux.fromIterable(it)
val requestSize = endVersion - headVersion + 1
if (it.size < requestSize) {
return@flatMapMany previousStreams
}
val lastVersion = it.last().version
if (lastVersion >= tailVersion) {
return@flatMapMany previousStreams
}
val nextStreams = loopStream(aggregateId, lastVersion + 1, tailVersion)
return@flatMapMany previousStreams.concatWith(nextStreams)
}
}

private fun findEventStream(
aggregateId: AggregateId,
headVersion: Int,
tailVersion: Int
): Mono<List<DomainEventStream>> {
val query = condition {
tenantId(aggregateId.tenantId)
MessageRecords.AGGREGATE_ID eq aggregateId.id
MessageRecords.VERSION between headVersion to tailVersion
}.toQuery()
val sort = sort { MessageRecords.VERSION.asc() }.toSortOptions()
return elasticsearchClient.search<DomainEventStream>({
it.index(aggregateId.toEventStreamIndexName())
.query(query)
.size(DEFAULT_BATCH_SIZE)
.sort(sort)
}, DomainEventStream::class.java).map<List<DomainEventStream>> {
it.hits().hits().map { hit -> hit.source() as DomainEventStream }
}
}

override fun scanAggregateId(
namedAggregate: NamedAggregate,
cursorId: String,
limit: Int
): Flux<AggregateId> {
TODO("Not yet implemented")
}

override fun tailCursorId(namedAggregate: NamedAggregate): Mono<String> {
return elasticsearchClient.search({ searchBuilder ->
searchBuilder.index(namedAggregate.toEventStreamIndexName())
.size(1)
.sort {
it.field {
it.field(MessageRecords.AGGREGATE_ID).order(SortOrder.Desc)
}
}
.source {
it.fetch(false)
}
}, Map::class.java).map<String> {
val hit = it.hits().hits().firstOrNull() ?: return@map FIRST_CURSOR_ID
hit.sort().first().stringValue()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package me.ahoo.wow.elasticsearch
package me.ahoo.wow.elasticsearch.eventsourcing

import co.elastic.clients.elasticsearch._types.ElasticsearchException
import co.elastic.clients.elasticsearch._types.Refresh
import me.ahoo.wow.api.modeling.AggregateId
import me.ahoo.wow.api.modeling.NamedAggregate
import me.ahoo.wow.elasticsearch.IndexNameConverter.toSnapshotIndexName
import me.ahoo.wow.eventsourcing.snapshot.Snapshot
import me.ahoo.wow.eventsourcing.snapshot.SnapshotRepository
import org.springframework.data.elasticsearch.RestStatusException
Expand All @@ -24,31 +24,26 @@ import reactor.core.publisher.Mono

class ElasticsearchSnapshotRepository(
private val elasticsearchClient: ReactiveElasticsearchClient,
private val snapshotIndexNameConverter: SnapshotIndexNameConverter = DefaultSnapshotIndexNameConverter,
private val refreshPolicy: Refresh = Refresh.WaitFor
private val refreshPolicy: Refresh = Refresh.True
) : SnapshotRepository {
companion object {
private const val NOT_FOUND_STATUS = 404
}

private fun NamedAggregate.toIndexName(): String {
return snapshotIndexNameConverter.convert(namedAggregate = this)
private const val NOT_FOUND_CODE = 404
}

@Suppress("UNCHECKED_CAST")
override fun <S : Any> load(aggregateId: AggregateId): Mono<Snapshot<S>> {
return elasticsearchClient.get({
it.index(aggregateId.toIndexName())
it.index(aggregateId.toSnapshotIndexName())
.id(aggregateId.id)
}, Snapshot::class.java)
.mapNotNull<Snapshot<S>> {
it.source() as Snapshot<S>?
}
.onErrorResume {
if (it is RestStatusException && it.status == NOT_FOUND_STATUS) {
if (it is RestStatusException && it.status == NOT_FOUND_CODE) {
return@onErrorResume Mono.empty()
}
if (it is ElasticsearchException && it.response().status() == NOT_FOUND_STATUS) {
if (it is ElasticsearchException && it.response().status() == NOT_FOUND_CODE) {
return@onErrorResume Mono.empty()
}
Mono.error(it)
Expand All @@ -57,7 +52,7 @@ class ElasticsearchSnapshotRepository(

override fun <S : Any> save(snapshot: Snapshot<S>): Mono<Void> {
return elasticsearchClient.index {
it.index(snapshot.aggregateId.toIndexName())
it.index(snapshot.aggregateId.toSnapshotIndexName())
.id(snapshot.aggregateId.id)
.document(snapshot)
.refresh(refreshPolicy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import me.ahoo.wow.api.query.PagedQuery
import me.ahoo.wow.api.query.Pagination
import me.ahoo.wow.api.query.SimpleDynamicDocument.Companion.toDynamicDocument
import me.ahoo.wow.configuration.requiredAggregateType
import me.ahoo.wow.elasticsearch.DefaultSnapshotIndexNameConverter
import me.ahoo.wow.elasticsearch.SnapshotIndexNameConverter
import me.ahoo.wow.elasticsearch.IndexNameConverter.toSnapshotIndexName
import me.ahoo.wow.elasticsearch.query.ElasticsearchConditionConverter.toQuery
import me.ahoo.wow.elasticsearch.query.ElasticsearchProjectionConverter.toSourceFilter
import me.ahoo.wow.elasticsearch.query.ElasticsearchSortConverter.toSortOptions
Expand All @@ -43,10 +42,9 @@ import reactor.core.publisher.Mono

class ElasticsearchSnapshotQueryService<S : Any>(
override val namedAggregate: NamedAggregate,
private val elasticsearchClient: ReactiveElasticsearchClient,
private val snapshotIndexNameConverter: SnapshotIndexNameConverter = DefaultSnapshotIndexNameConverter
private val elasticsearchClient: ReactiveElasticsearchClient
) : SnapshotQueryService<S> {
private val snapshotIndexName = snapshotIndexNameConverter.convert(namedAggregate)
private val snapshotIndexName = namedAggregate.toSnapshotIndexName()
private val snapshotType = TypeFactory.defaultInstance()
.constructParametricType(
MaterializedSnapshot::class.java,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"index_patterns": [
"wow.*.id"
],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 2
},
"mappings": {
"properties": {
"tenantId": {
"type": "keyword"
}
},
"dynamic_templates": [
{
"string_as_keyword": {
"match_mapping_type": "string",
"mapping": {
"type": "keyword"
}
}
}
]
}
}
}
Loading

0 comments on commit b2c4716

Please sign in to comment.