Skip to content

Commit

Permalink
feat(tck): Support SnapshotQueryServiceSpec (#990)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ahoo-Wang authored Nov 12, 2024
1 parent 2f9d375 commit b16799c
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 255 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.tck.query

import me.ahoo.wow.eventsourcing.snapshot.SimpleSnapshot
import me.ahoo.wow.eventsourcing.snapshot.Snapshot
import me.ahoo.wow.eventsourcing.snapshot.SnapshotRepository
import me.ahoo.wow.id.generateGlobalId
import me.ahoo.wow.modeling.aggregateId
import me.ahoo.wow.modeling.state.ConstructorStateAggregateFactory
import me.ahoo.wow.query.dsl.condition
import me.ahoo.wow.query.dsl.listQuery
import me.ahoo.wow.query.dsl.pagedQuery
import me.ahoo.wow.query.dsl.singleQuery
import me.ahoo.wow.query.snapshot.SnapshotQueryService
import me.ahoo.wow.query.snapshot.SnapshotQueryServiceFactory
import me.ahoo.wow.query.snapshot.count
import me.ahoo.wow.query.snapshot.dynamicQuery
import me.ahoo.wow.query.snapshot.query
import me.ahoo.wow.tck.mock.MOCK_AGGREGATE_METADATA
import me.ahoo.wow.tck.mock.MockStateAggregate
import org.hamcrest.CoreMatchers.sameInstance
import org.hamcrest.MatcherAssert.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import reactor.kotlin.test.test
import java.time.Clock

abstract class SnapshotQueryServiceSpec {
lateinit var snapshotRepository: SnapshotRepository
lateinit var snapshotQueryServiceFactory: SnapshotQueryServiceFactory
lateinit var snapshotQueryService: SnapshotQueryService<MockStateAggregate>
lateinit var snapshot: Snapshot<MockStateAggregate>

@BeforeEach
open fun setup() {
snapshotRepository = createSnapshotRepository()
snapshotQueryServiceFactory = createSnapshotQueryServiceFactory()
snapshotQueryService = snapshotQueryServiceFactory.create<MockStateAggregate>(MOCK_AGGREGATE_METADATA)
val aggregateId = MOCK_AGGREGATE_METADATA.aggregateId(generateGlobalId())
val stateAggregate =
ConstructorStateAggregateFactory.create(MOCK_AGGREGATE_METADATA.state, aggregateId).block()!!
snapshot =
SimpleSnapshot(stateAggregate, Clock.systemUTC().millis())
snapshotRepository.save(snapshot)
.test()
.verifyComplete()
}

protected abstract fun createSnapshotRepository(): SnapshotRepository
protected abstract fun createSnapshotQueryServiceFactory(): SnapshotQueryServiceFactory

@Test
fun createFromCache() {
val queryService1 = snapshotQueryServiceFactory.create<MockStateAggregate>(MOCK_AGGREGATE_METADATA)
val queryService2 = snapshotQueryServiceFactory.create<MockStateAggregate>(MOCK_AGGREGATE_METADATA)
assertThat(queryService1, sameInstance(queryService2))
}

@Test
fun single() {
singleQuery {
condition {
id(snapshot.aggregateId.id)
}
}.query(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun dynamicSingle() {
singleQuery {
condition {
id(snapshot.aggregateId.id)
}
projection {
include("contextName")
}
sort {
"version".asc()
}
}.dynamicQuery(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun list() {
listQuery {
condition {
id(snapshot.aggregateId.id)
}
limit(10)
}.query(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun dynamicList() {
listQuery {
condition {
id(snapshot.aggregateId.id)
}
projection {
exclude("firstEventTime")
}
limit(10)
}.dynamicQuery(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun paged() {
pagedQuery {
condition {
id(snapshot.aggregateId.id)
}
}.query(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun dynamicPaged() {
pagedQuery {
condition {
id(snapshot.aggregateId.id)
}
}.dynamicQuery(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun count() {
condition {
id(snapshot.aggregateId.id)
}.count(snapshotQueryService)
.test()
.expectNext(1L)
.verifyComplete()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,13 @@
package me.ahoo.wow.elasticsearch.query.snapshot

import me.ahoo.wow.api.modeling.NamedAggregate
import me.ahoo.wow.modeling.materialize
import me.ahoo.wow.query.snapshot.AbstractSnapshotQueryServiceFactory
import me.ahoo.wow.query.snapshot.SnapshotQueryService
import me.ahoo.wow.query.snapshot.SnapshotQueryServiceFactory
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchClient
import java.util.concurrent.ConcurrentHashMap

class ElasticsearchSnapshotQueryServiceFactory(private val elasticsearchClient: ReactiveElasticsearchClient) :
SnapshotQueryServiceFactory {
private val queryServiceCache = ConcurrentHashMap<NamedAggregate, SnapshotQueryService<*>>()

@Suppress("UNCHECKED_CAST")
override fun <S : Any> create(namedAggregate: NamedAggregate): SnapshotQueryService<S> {
return queryServiceCache.computeIfAbsent(namedAggregate.materialize()) {
createQueryService(it)
} as SnapshotQueryService<S>
}

private fun createQueryService(namedAggregate: NamedAggregate): SnapshotQueryService<*> {
AbstractSnapshotQueryServiceFactory() {
override fun createQueryService(namedAggregate: NamedAggregate): SnapshotQueryService<*> {
return ElasticsearchSnapshotQueryService<Any>(namedAggregate, elasticsearchClient)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,20 @@
package me.ahoo.wow.elasticsearch.query.snapshot

import co.elastic.clients.transport.rest_client.RestClientTransport
import me.ahoo.wow.elasticsearch.TemplateInitializer.initSnapshotTemplate
import me.ahoo.wow.elasticsearch.WowJsonpMapper
import me.ahoo.wow.elasticsearch.eventsourcing.ElasticsearchSnapshotRepository
import me.ahoo.wow.eventsourcing.snapshot.SimpleSnapshot
import me.ahoo.wow.eventsourcing.snapshot.Snapshot
import me.ahoo.wow.eventsourcing.snapshot.SnapshotRepository
import me.ahoo.wow.id.generateGlobalId
import me.ahoo.wow.modeling.aggregateId
import me.ahoo.wow.modeling.state.ConstructorStateAggregateFactory
import me.ahoo.wow.query.dsl.condition
import me.ahoo.wow.query.dsl.listQuery
import me.ahoo.wow.query.dsl.pagedQuery
import me.ahoo.wow.query.dsl.singleQuery
import me.ahoo.wow.query.snapshot.SnapshotQueryService
import me.ahoo.wow.query.snapshot.count
import me.ahoo.wow.query.snapshot.dynamicQuery
import me.ahoo.wow.query.snapshot.query
import me.ahoo.wow.query.snapshot.SnapshotQueryServiceFactory
import me.ahoo.wow.tck.container.ElasticsearchLauncher
import me.ahoo.wow.tck.mock.MOCK_AGGREGATE_METADATA
import me.ahoo.wow.tck.mock.MockStateAggregate
import me.ahoo.wow.tck.query.SnapshotQueryServiceSpec
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.data.elasticsearch.client.ClientConfiguration
import org.springframework.data.elasticsearch.client.elc.ElasticsearchClients
import org.springframework.data.elasticsearch.client.elc.ReactiveElasticsearchClient
import reactor.kotlin.test.test
import java.time.Clock

class ElasticsearchSnapshotQueryServiceTest {
class ElasticsearchSnapshotQueryServiceTest : SnapshotQueryServiceSpec() {
companion object {
@JvmStatic
@BeforeAll
Expand All @@ -51,126 +36,27 @@ class ElasticsearchSnapshotQueryServiceTest {
}
}

private val aggregateMetadata = MOCK_AGGREGATE_METADATA
lateinit var snapshotQueryService: SnapshotQueryService<MockStateAggregate>
lateinit var snapshotRepository: SnapshotRepository
lateinit var snapshot: Snapshot<MockStateAggregate>
lateinit var elasticsearchClient: ReactiveElasticsearchClient

@BeforeEach
fun init() {
override fun setup() {
val clientConfiguration = ClientConfiguration.builder()
.connectedTo(ElasticsearchLauncher.ELASTICSEARCH_CONTAINER.httpHostAddress)
.usingSsl(ElasticsearchLauncher.ELASTICSEARCH_CONTAINER.createSslContextFromCa())
.withBasicAuth("elastic", ElasticsearchLauncher.ELASTIC_PWD)
.build()
val restClient = ElasticsearchClients.getRestClient(clientConfiguration)
val transport = RestClientTransport(restClient, WowJsonpMapper)
val elasticsearchClient = ReactiveElasticsearchClient(transport)
val factory = ElasticsearchSnapshotQueryServiceFactory(
elasticsearchClient = elasticsearchClient
)
snapshotQueryService = factory.create(aggregateMetadata)
snapshotRepository = ElasticsearchSnapshotRepository(
elasticsearchClient = elasticsearchClient
)

val aggregateId = aggregateMetadata.aggregateId(generateGlobalId())
val stateAggregate = ConstructorStateAggregateFactory.create(aggregateMetadata.state, aggregateId).block()!!
snapshot =
SimpleSnapshot(stateAggregate, Clock.systemUTC().millis())
snapshotRepository.save(snapshot)
.test()
.verifyComplete()
}

@Test
fun single() {
singleQuery {
condition {
id(snapshot.aggregateId.id)
}
}.query(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun dynamicSingle() {
singleQuery {
condition {
id(snapshot.aggregateId.id)
}
projection {
include("contextName")
exclude("firstEventTime")
}
sort {
"version".asc()
}
}.dynamicQuery(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun list() {
listQuery {
condition {
id(snapshot.aggregateId.id)
}
limit(10)
}.query(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun dynamicList() {
listQuery {
condition {
id(snapshot.aggregateId.id)
}
limit(10)
}.dynamicQuery(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
}

@Test
fun paged() {
pagedQuery {
condition {
id(snapshot.aggregateId.id)
}
}.query(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
elasticsearchClient = ReactiveElasticsearchClient(transport)
elasticsearchClient.initSnapshotTemplate()
super.setup()
}

@Test
fun dynamicPaged() {
pagedQuery {
condition {
id(snapshot.aggregateId.id)
}
}.dynamicQuery(snapshotQueryService)
.test()
.expectNextCount(1)
.verifyComplete()
override fun createSnapshotQueryServiceFactory(): SnapshotQueryServiceFactory {
return ElasticsearchSnapshotQueryServiceFactory(elasticsearchClient)
}

@Test
fun count() {
condition {
id(snapshot.aggregateId.id)
}.count(snapshotQueryService)
.test()
.expectNext(1L)
.verifyComplete()
override fun createSnapshotRepository(): SnapshotRepository {
return ElasticsearchSnapshotRepository(elasticsearchClient)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,11 @@ import com.mongodb.reactivestreams.client.MongoDatabase
import me.ahoo.wow.api.modeling.NamedAggregate
import me.ahoo.wow.modeling.materialize
import me.ahoo.wow.mongo.AggregateSchemaInitializer.toSnapshotCollectionName
import me.ahoo.wow.query.snapshot.AbstractSnapshotQueryServiceFactory
import me.ahoo.wow.query.snapshot.SnapshotQueryService
import me.ahoo.wow.query.snapshot.SnapshotQueryServiceFactory
import java.util.concurrent.ConcurrentHashMap

class MongoSnapshotQueryServiceFactory(private val database: MongoDatabase) : SnapshotQueryServiceFactory {
private val queryServiceCache = ConcurrentHashMap<NamedAggregate, SnapshotQueryService<*>>()

@Suppress("UNCHECKED_CAST")
override fun <S : Any> create(namedAggregate: NamedAggregate): SnapshotQueryService<S> {
return queryServiceCache.computeIfAbsent(namedAggregate.materialize()) {
createQueryService(it)
} as SnapshotQueryService<S>
}

private fun createQueryService(namedAggregate: NamedAggregate): SnapshotQueryService<*> {
class MongoSnapshotQueryServiceFactory(private val database: MongoDatabase) : AbstractSnapshotQueryServiceFactory() {
override fun createQueryService(namedAggregate: NamedAggregate): SnapshotQueryService<*> {
val collectionName = namedAggregate.toSnapshotCollectionName()
val collection = database.getCollection(collectionName)
return MongoSnapshotQueryService<Any>(namedAggregate.materialize(), collection)
Expand Down
Loading

0 comments on commit b16799c

Please sign in to comment.