Skip to content

Commit

Permalink
feat(es): Support ElasticsearchEventStreamQueryService
Browse files Browse the repository at this point in the history
  • Loading branch information
Ahoo-Wang committed Nov 11, 2024
1 parent 270ca9b commit c065034
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 77 deletions.
1 change: 1 addition & 0 deletions test/wow-tck/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ description = "The Technology Compatibility Kit"

dependencies {
api(project(":wow-core"))
api(project(":wow-query"))
api("io.projectreactor:reactor-test")
api("me.ahoo.cosid:cosid-test")
api("org.hamcrest:hamcrest")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.tck.query

import me.ahoo.wow.eventsourcing.EventStore
import me.ahoo.wow.id.generateGlobalId
import me.ahoo.wow.metrics.Metrics.metrizable
import me.ahoo.wow.modeling.MaterializedNamedAggregate
import me.ahoo.wow.modeling.aggregateId
import me.ahoo.wow.query.dsl.listQuery
import me.ahoo.wow.query.event.EventStreamQueryService
import me.ahoo.wow.query.event.EventStreamQueryServiceFactory
import me.ahoo.wow.query.event.dynamicQuery
import me.ahoo.wow.query.event.query
import me.ahoo.wow.tck.event.MockDomainEventStreams.generateEventStream
import org.hamcrest.CoreMatchers.sameInstance
import org.hamcrest.MatcherAssert.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import reactor.kotlin.test.test

abstract class EventStreamQueryServiceSpec {
val namedAggregate = MaterializedNamedAggregate("tck", "event-stream-query-spec")
lateinit var eventStore: EventStore
lateinit var eventStreamQueryServiceFactory: EventStreamQueryServiceFactory
lateinit var eventStreamQueryService: EventStreamQueryService

@BeforeEach
open fun setup() {
eventStore = createEventStore().metrizable()
eventStreamQueryServiceFactory = createEventStreamQueryServiceFactory()
eventStreamQueryService = eventStreamQueryServiceFactory.create(namedAggregate)
}

protected abstract fun createEventStore(): EventStore
protected abstract fun createEventStreamQueryServiceFactory(): EventStreamQueryServiceFactory

@Test
fun createFromCache() {
val queryService1 = eventStreamQueryServiceFactory.create(namedAggregate)
val queryService2 = eventStreamQueryServiceFactory.create(namedAggregate)
assertThat(queryService1, sameInstance(queryService2))
}

@Test
fun list() {
val eventStream = generateEventStream(namedAggregate.aggregateId(tenantId = generateGlobalId()))
eventStore.append(eventStream).block()
listQuery {
condition {
tenantId(eventStream.aggregateId.tenantId)
}
}.query(eventStreamQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun dynamicList() {
val eventStream = generateEventStream(namedAggregate.aggregateId(tenantId = generateGlobalId()))
eventStore.append(eventStream).block()
listQuery {
condition {
tenantId(eventStream.aggregateId.tenantId)
}
}.dynamicQuery(eventStreamQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,4 @@ object IndexNameConverter {
fun NamedAggregate.toEventStreamIndexName(): String {
return "${Wow.WOW_PREFIX}${this.toStringWithAlias()}$EVENT_STREAM_SUFFIX"
}

fun NamedAggregate.toAggregateIdIndexName(): String {
return "${Wow.WOW_PREFIX}${this.toStringWithAlias()}$AGGREGATE_ID_SUFFIX"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.elasticsearch.query.event

import co.elastic.clients.elasticsearch.core.SearchRequest
import me.ahoo.wow.api.modeling.NamedAggregate
import me.ahoo.wow.api.query.DynamicDocument
import me.ahoo.wow.api.query.IListQuery
import me.ahoo.wow.api.query.SimpleDynamicDocument.Companion.toDynamicDocument
import me.ahoo.wow.elasticsearch.IndexNameConverter.toEventStreamIndexName
import me.ahoo.wow.elasticsearch.query.ElasticsearchConditionConverter.toQuery
import me.ahoo.wow.elasticsearch.query.ElasticsearchProjectionConverter.toSourceFilter
import me.ahoo.wow.elasticsearch.query.ElasticsearchSortConverter.toSortOptions
import me.ahoo.wow.event.DomainEventStream
import me.ahoo.wow.query.event.EventStreamQueryService
import me.ahoo.wow.serialization.toJsonString
import me.ahoo.wow.serialization.toObject
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchClient
import reactor.core.publisher.Flux

class ElasticsearchEventStreamQueryService(
override val namedAggregate: NamedAggregate,
private val elasticsearchClient: ReactiveElasticsearchClient
) : EventStreamQueryService {
private val eventStreamIndexName = namedAggregate.toEventStreamIndexName()
override fun list(listQuery: IListQuery): Flux<DomainEventStream> {
return dynamicList(listQuery)
.map { it.toJsonString().toObject<DomainEventStream>() }
}

@Suppress("UNCHECKED_CAST")
override fun dynamicList(listQuery: IListQuery): Flux<DynamicDocument> {
val searchRequest = SearchRequest.of {
it.index(eventStreamIndexName)
.query(listQuery.condition.toQuery())
.size(listQuery.limit)

if (listQuery.sort.isNotEmpty()) {
it.sort(listQuery.sort.toSortOptions())
}
if (!listQuery.projection.isEmpty()) {
it.source {
it.filter(listQuery.projection.toSourceFilter())
}
}
it
}
return elasticsearchClient.search(searchRequest, Map::class.java)
.flatMapIterable { result ->
result.hits()?.hits()?.map { hit ->
hit.source()?.let {
(it as Map<String, Any>).toDynamicDocument()
}
} as List<DynamicDocument>? ?: emptyList()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.elasticsearch.query.event

import me.ahoo.wow.api.modeling.NamedAggregate
import me.ahoo.wow.query.event.AbstractEventStreamQueryServiceFactory
import me.ahoo.wow.query.event.EventStreamQueryService
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchClient

class ElasticsearchEventStreamQueryServiceFactory(private val elasticsearchClient: ReactiveElasticsearchClient) :
AbstractEventStreamQueryServiceFactory() {
override fun createQueryService(namedAggregate: NamedAggregate): EventStreamQueryService {
return ElasticsearchEventStreamQueryService(namedAggregate, elasticsearchClient)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.elasticsearch

import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchClient
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchTemplate
import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext

object TemplateInitializer {

fun ReactiveElasticsearchClient.createElasticsearchTemplate(): ReactiveElasticsearchTemplate {
val mappingContext = SimpleElasticsearchMappingContext()
val converter = MappingElasticsearchConverter(mappingContext)
converter.setConversions(ElasticsearchCustomConversions(emptyList<Any>()))
return ReactiveElasticsearchTemplate(this, converter)
}

fun ReactiveElasticsearchClient.initEventStreamTemplate() {
val elasticsearchTemplate = createElasticsearchTemplate()
IndexTemplateInitializer(elasticsearchTemplate).initEventStreamTemplate().block()
}

fun ReactiveElasticsearchClient.initSnapshotTemplate() {
val elasticsearchTemplate = createElasticsearchTemplate()
IndexTemplateInitializer(elasticsearchTemplate).initSnapshotTemplate().block()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package me.ahoo.wow.elasticsearch.eventsourcing

import co.elastic.clients.transport.rest_client.RestClientTransport
import me.ahoo.wow.elasticsearch.IndexTemplateInitializer
import me.ahoo.wow.elasticsearch.TemplateInitializer.initEventStreamTemplate
import me.ahoo.wow.elasticsearch.WowJsonpMapper
import me.ahoo.wow.eventsourcing.EventStore
import me.ahoo.wow.tck.container.ElasticsearchLauncher
Expand All @@ -24,10 +24,6 @@ import org.junit.jupiter.api.BeforeAll
import org.springframework.data.elasticsearch.client.ClientConfiguration
import org.springframework.data.elasticsearch.client.elc.ElasticsearchClients
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchClient
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchTemplate
import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext
import java.time.Duration

class ElasticsearchEventStoreTest : EventStoreSpec() {
Expand All @@ -39,14 +35,6 @@ class ElasticsearchEventStoreTest : EventStoreSpec() {
}
}

private fun initTemplate(elasticsearchClient: ReactiveElasticsearchClient) {
val mappingContext = SimpleElasticsearchMappingContext()
val converter = MappingElasticsearchConverter(mappingContext)
converter.setConversions(ElasticsearchCustomConversions(emptyList<Any>()))
val elasticsearchTemplate = ReactiveElasticsearchTemplate(elasticsearchClient, converter)
IndexTemplateInitializer(elasticsearchTemplate).initEventStreamTemplate().block()
}

override fun createEventStore(): EventStore {
val clientConfiguration = ClientConfiguration.builder()
.connectedTo(ElasticsearchLauncher.ELASTICSEARCH_CONTAINER.httpHostAddress)
Expand All @@ -58,7 +46,7 @@ class ElasticsearchEventStoreTest : EventStoreSpec() {
val restClient = ElasticsearchClients.getRestClient(clientConfiguration)
val transport = RestClientTransport(restClient, WowJsonpMapper)
val elasticsearchClient = ReactiveElasticsearchClient(transport)
initTemplate(elasticsearchClient)
elasticsearchClient.initEventStreamTemplate()
return ElasticsearchEventStore(
elasticsearchClient = elasticsearchClient
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package me.ahoo.wow.elasticsearch.eventsourcing

import co.elastic.clients.transport.rest_client.RestClientTransport
import me.ahoo.wow.elasticsearch.IndexTemplateInitializer
import me.ahoo.wow.elasticsearch.TemplateInitializer.initSnapshotTemplate
import me.ahoo.wow.elasticsearch.WowJsonpMapper
import me.ahoo.wow.eventsourcing.snapshot.SnapshotRepository
import me.ahoo.wow.tck.container.ElasticsearchLauncher
Expand All @@ -23,10 +23,6 @@ import org.junit.jupiter.api.BeforeAll
import org.springframework.data.elasticsearch.client.ClientConfiguration
import org.springframework.data.elasticsearch.client.elc.ElasticsearchClients
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchClient
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchTemplate
import org.springframework.data.elasticsearch.core.convert.ElasticsearchCustomConversions
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter
import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext

internal class ElasticsearchSnapshotRepositoryTest : SnapshotRepositorySpec() {
companion object {
Expand All @@ -37,14 +33,6 @@ internal class ElasticsearchSnapshotRepositoryTest : SnapshotRepositorySpec() {
}
}

private fun initTemplate(elasticsearchClient: ReactiveElasticsearchClient) {
val mappingContext = SimpleElasticsearchMappingContext()
val converter = MappingElasticsearchConverter(mappingContext)
converter.setConversions(ElasticsearchCustomConversions(emptyList<Any>()))
val elasticsearchTemplate = ReactiveElasticsearchTemplate(elasticsearchClient, converter)
IndexTemplateInitializer(elasticsearchTemplate).initSnapshotTemplate().block()
}

override fun createSnapshotRepository(): SnapshotRepository {
val clientConfiguration = ClientConfiguration.builder()
.connectedTo(ElasticsearchLauncher.ELASTICSEARCH_CONTAINER.httpHostAddress)
Expand All @@ -54,7 +42,7 @@ internal class ElasticsearchSnapshotRepositoryTest : SnapshotRepositorySpec() {
val restClient = ElasticsearchClients.getRestClient(clientConfiguration)
val transport = RestClientTransport(restClient, WowJsonpMapper)
val elasticsearchClient = ReactiveElasticsearchClient(transport)
initTemplate(elasticsearchClient)
elasticsearchClient.initSnapshotTemplate()
return ElasticsearchSnapshotRepository(
elasticsearchClient = elasticsearchClient
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.elasticsearch.query.event

import co.elastic.clients.transport.rest_client.RestClientTransport
import me.ahoo.wow.elasticsearch.TemplateInitializer.initEventStreamTemplate
import me.ahoo.wow.elasticsearch.WowJsonpMapper
import me.ahoo.wow.elasticsearch.eventsourcing.ElasticsearchEventStore
import me.ahoo.wow.eventsourcing.EventStore
import me.ahoo.wow.query.event.EventStreamQueryServiceFactory
import me.ahoo.wow.tck.container.ElasticsearchLauncher
import me.ahoo.wow.tck.query.EventStreamQueryServiceSpec
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.springframework.data.elasticsearch.client.ClientConfiguration
import org.springframework.data.elasticsearch.client.elc.ElasticsearchClients
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchClient

class ElasticsearchEventStreamQueryServiceTest : EventStreamQueryServiceSpec() {
companion object {
@JvmStatic
@BeforeAll
fun waitLauncher() {
ElasticsearchLauncher.isRunning
}
}

lateinit var elasticsearchClient: ReactiveElasticsearchClient

@BeforeEach
override fun setup() {
val clientConfiguration = ClientConfiguration.builder()
.connectedTo(ElasticsearchLauncher.ELASTICSEARCH_CONTAINER.httpHostAddress)
.usingSsl(ElasticsearchLauncher.ELASTICSEARCH_CONTAINER.createSslContextFromCa())
.withBasicAuth("elastic", ElasticsearchLauncher.ELASTIC_PWD)
.build()
val restClient = ElasticsearchClients.getRestClient(clientConfiguration)
val transport = RestClientTransport(restClient, WowJsonpMapper)
elasticsearchClient = ReactiveElasticsearchClient(transport)
elasticsearchClient.initEventStreamTemplate()
super.setup()
}

override fun createEventStore(): EventStore {
return ElasticsearchEventStore(elasticsearchClient)
}

override fun createEventStreamQueryServiceFactory(): EventStreamQueryServiceFactory {
return ElasticsearchEventStreamQueryServiceFactory(elasticsearchClient)
}
}
Loading

0 comments on commit c065034

Please sign in to comment.