Skip to content

Commit

Permalink
feat(openapi): Enhance batch processing operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Ahoo-Wang committed Nov 17, 2024
1 parent 95e8330 commit b68461c
Show file tree
Hide file tree
Showing 28 changed files with 114 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class DelaySnapshotRepository(
return delegate.save(snapshot).delaySubscription(delaySupplier())
}

override fun scanAggregateId(namedAggregate: NamedAggregate, cursorId: String, limit: Int): Flux<AggregateId> {
return delegate.scanAggregateId(namedAggregate, cursorId, limit).delaySubscription(delaySupplier())
override fun scanAggregateId(namedAggregate: NamedAggregate, afterId: String, limit: Int): Flux<AggregateId> {
return delegate.scanAggregateId(namedAggregate, afterId, limit).delaySubscription(delaySupplier())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ abstract class SnapshotRepositorySpec {
.test()
.verifyComplete()

snapshotRepository.scanAggregateId(snapshot.aggregateId, cursorId = cursorId, limit = 1)
snapshotRepository.scanAggregateId(snapshot.aggregateId, afterId = cursorId, limit = 1)
.test()
.expectNextCount(1)
.verifyComplete()
snapshotRepository.scanAggregateId(snapshot.aggregateId, cursorId = snapshot.aggregateId.id, limit = 1)
snapshotRepository.scanAggregateId(snapshot.aggregateId, afterId = snapshot.aggregateId.id, limit = 1)
.test()
.expectNextCount(0)
.verifyComplete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import reactor.core.publisher.Flux

interface AggregateIdScanner {
companion object {
const val FIRST_CURSOR_ID = "(0)"
const val FIRST_ID = "(0)"
}

fun scanAggregateId(
namedAggregate: NamedAggregate,
cursorId: String = FIRST_CURSOR_ID,
afterId: String = FIRST_ID,
limit: Int = 10
): Flux<AggregateId>
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class InMemorySnapshotRepository : SnapshotRepository {
}
}

override fun scanAggregateId(namedAggregate: NamedAggregate, cursorId: String, limit: Int): Flux<AggregateId> {
override fun scanAggregateId(namedAggregate: NamedAggregate, afterId: String, limit: Int): Flux<AggregateId> {
return aggregateIdMapSnapshot.keys.sortedBy { it.id }.toFlux()
.filter {
it.id > cursorId
it.id > afterId
}
.take(limit.toLong())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object NoOpSnapshotRepository : SnapshotRepository {

override fun scanAggregateId(
namedAggregate: NamedAggregate,
cursorId: String,
afterId: String,
limit: Int
): Flux<AggregateId> {
return Flux.empty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ class MetricSnapshotRepository(delegate: SnapshotRepository) :

override fun scanAggregateId(
namedAggregate: NamedAggregate,
cursorId: String,
afterId: String,
limit: Int
): Flux<AggregateId> {
return delegate.scanAggregateId(namedAggregate, cursorId, limit)
return delegate.scanAggregateId(namedAggregate, afterId, limit)
.name(Wow.WOW_PREFIX + "snapshot.save")
.tagSource()
.tag(Metrics.AGGREGATE_KEY, namedAggregate.aggregateName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ internal class SnapshotDispatcherTest {

override fun scanAggregateId(
namedAggregate: NamedAggregate,
cursorId: String,
afterId: String,
limit: Int
): Flux<AggregateId> {
return inMemorySnapshotRepository.scanAggregateId(namedAggregate, cursorId, limit)
return inMemorySnapshotRepository.scanAggregateId(namedAggregate, afterId, limit)
}
}
val snapshotStrategy = SimpleSnapshotStrategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ class ElasticsearchSnapshotRepository(

override fun scanAggregateId(
namedAggregate: NamedAggregate,
cursorId: String,
afterId: String,
limit: Int
): Flux<AggregateId> {
return elasticsearchClient.search({
it.index(namedAggregate.toSnapshotIndexName())
.query {
it.range {
it.field(MessageRecords.AGGREGATE_ID)
.gt(JsonData.of(cursorId))
.gt(JsonData.of(afterId))
}
}
.source {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ class MongoSnapshotRepository(private val database: MongoDatabase) : SnapshotRep

override fun scanAggregateId(
namedAggregate: NamedAggregate,
cursorId: String,
afterId: String,
limit: Int
): Flux<AggregateId> {
val snapshotCollectionName = namedAggregate.toSnapshotCollectionName()
return database.getCollection(snapshotCollectionName)
.find(Filters.gt(Documents.ID_FIELD, cursorId))
.find(Filters.gt(Documents.ID_FIELD, afterId))
.projection(Projections.include(MessageRecords.TENANT_ID))
.limit(limit)
.batchSize(limit)
Expand Down
16 changes: 13 additions & 3 deletions wow-openapi/src/main/kotlin/me/ahoo/wow/openapi/BatchResult.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,17 @@

package me.ahoo.wow.openapi

import me.ahoo.wow.api.exception.ErrorInfo

/**
* The result of a batch operation.
*
* @param afterId The ID of the last record successfully executed in batch processing.
* @param size Number of records successfully processed in the batch.
*/
data class BatchResult(
val cursorId: String,
val size: Int
)
val afterId: String,
val size: Int,
override val errorCode: String = ErrorInfo.SUCCEEDED,
override val errorMsg: String = ErrorInfo.SUCCEEDED_MESSAGE
) : ErrorInfo
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import me.ahoo.wow.api.Wow
import me.ahoo.wow.openapi.BatchRouteSpecFactory.Companion.BATCH_RESULT_RESPONSE
import me.ahoo.wow.openapi.ResponseRef.Companion.toResponse
import me.ahoo.wow.openapi.ResponseRef.Companion.with
import me.ahoo.wow.openapi.RoutePaths.BATCH_CURSOR_ID_PARAMETER
import me.ahoo.wow.openapi.RoutePaths.BATCH_AFTER_ID_PARAMETER
import me.ahoo.wow.openapi.RoutePaths.BATCH_LIMIT_PARAMETER

interface BatchRouteSpec : AggregateRouteSpec {
Expand All @@ -31,7 +31,7 @@ interface BatchRouteSpec : AggregateRouteSpec {

override val parameters: List<Parameter>
get() = super.parameters + listOf(
BATCH_CURSOR_ID_PARAMETER.ref,
BATCH_AFTER_ID_PARAMETER.ref,
BATCH_LIMIT_PARAMETER.ref
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import me.ahoo.wow.openapi.ResponseRef.Companion.withBadRequest
import me.ahoo.wow.openapi.ResponseRef.Companion.withNotFound
import me.ahoo.wow.openapi.ResponseRef.Companion.withRequestTimeout
import me.ahoo.wow.openapi.ResponseRef.Companion.withTooManyRequests
import me.ahoo.wow.openapi.RoutePaths.BATCH_CURSOR_ID_PARAMETER
import me.ahoo.wow.openapi.RoutePaths.BATCH_AFTER_ID_PARAMETER
import me.ahoo.wow.openapi.RoutePaths.BATCH_LIMIT_PARAMETER
import me.ahoo.wow.openapi.RoutePaths.HEAD_VERSION
import me.ahoo.wow.openapi.RoutePaths.TAIL_VERSION
Expand All @@ -47,7 +47,7 @@ class DefaultGlobalRouteSpecFactory : GlobalRouteSpecFactory {
components.parameters
.with(HEAD_VERSION)
.with(TAIL_VERSION)
.with(BATCH_CURSOR_ID_PARAMETER)
.with(BATCH_AFTER_ID_PARAMETER)
.with(BATCH_LIMIT_PARAMETER)
components.responses
.withBadRequest()
Expand Down
12 changes: 6 additions & 6 deletions wow-openapi/src/main/kotlin/me/ahoo/wow/openapi/RoutePaths.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object RoutePaths {
const val HEAD_VERSION_KEY = "headVersion"
const val TAIL_VERSION_KEY = "tailVersion"

const val BATCH_CURSOR_ID = "cursorId"
const val BATCH_AFTER_ID = "afterId"
const val BATCH_LIMIT = "limit"

val VERSION: Parameter = Parameter()
Expand All @@ -52,13 +52,13 @@ object RoutePaths {
.example(Int.MAX_VALUE).let {
ParameterRef("${Wow.WOW_PREFIX}$TAIL_VERSION_KEY", it)
}
val BATCH_CURSOR_ID_PARAMETER = Parameter()
.name(BATCH_CURSOR_ID)
val BATCH_AFTER_ID_PARAMETER = Parameter()
.name(BATCH_AFTER_ID)
.`in`(ParameterIn.PATH.toString())
.schema(StringSchema())
.example(AggregateIdScanner.FIRST_CURSOR_ID)
.description("The cursor id of batch.").let {
ParameterRef("${Wow.WOW_PREFIX}$BATCH_CURSOR_ID", it)
.example(AggregateIdScanner.FIRST_ID)
.description("The ID of the last record in the batch.").let {
ParameterRef("${Wow.WOW_PREFIX}$BATCH_AFTER_ID", it)
}
val BATCH_LIMIT_PARAMETER = Parameter()
.name(BATCH_LIMIT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ResendStateEventRouteSpec(
override val summary: String
get() = "Resend State Event"
override val appendPathSuffix: String
get() = "state/{${RoutePaths.BATCH_CURSOR_ID}}/{${RoutePaths.BATCH_LIMIT}}"
get() = "state/{${RoutePaths.BATCH_AFTER_ID}}/{${RoutePaths.BATCH_LIMIT}}"
}

class ResendStateEventRouteSpecFactory : BatchRouteSpecFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import me.ahoo.wow.openapi.BatchRouteSpec
import me.ahoo.wow.openapi.BatchRouteSpecFactory
import me.ahoo.wow.openapi.Https
import me.ahoo.wow.openapi.RouteIdSpec
import me.ahoo.wow.openapi.RoutePaths.BATCH_CURSOR_ID
import me.ahoo.wow.openapi.RoutePaths.BATCH_AFTER_ID
import me.ahoo.wow.openapi.RoutePaths.BATCH_LIMIT

class BatchRegenerateSnapshotRouteSpec(
Expand All @@ -38,7 +38,7 @@ class BatchRegenerateSnapshotRouteSpec(
override val method: String
get() = Https.Method.PUT
override val appendPathSuffix: String
get() = "snapshot/{$BATCH_CURSOR_ID}/{$BATCH_LIMIT}"
get() = "snapshot/{$BATCH_AFTER_ID}/{$BATCH_LIMIT}"
}

class BatchRegenerateSnapshotRouteSpecFactory : BatchRouteSpecFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ScanAggregateRouteSpec(
get() = Https.Method.GET

override val appendPathSuffix: String
get() = "state/{${RoutePaths.BATCH_CURSOR_ID}}/{${RoutePaths.BATCH_LIMIT}}"
get() = "state/{${RoutePaths.BATCH_AFTER_ID}}/{${RoutePaths.BATCH_LIMIT}}"

override val summary: String
get() = "Scan state aggregate"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class BatchResultTest {
@Test
fun test() {
val batchResult = BatchResult("cursorId", 1)
assertThat(batchResult.cursorId, equalTo("cursorId"))
assertThat(batchResult.afterId, equalTo("cursorId"))
assertThat(batchResult.size, equalTo(1))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class TracingSnapshotRepository(override val delegate: SnapshotRepository) :

override fun scanAggregateId(
namedAggregate: NamedAggregate,
cursorId: String,
afterId: String,
limit: Int
): Flux<AggregateId> {
return delegate.scanAggregateId(namedAggregate, cursorId, limit)
return delegate.scanAggregateId(namedAggregate, afterId, limit)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class R2dbcSnapshotRepository(

override fun scanAggregateId(
namedAggregate: NamedAggregate,
cursorId: String,
afterId: String,
limit: Int
): Flux<AggregateId> {
val aggregateId = namedAggregate.aggregateId("0")
Expand All @@ -138,7 +138,7 @@ class R2dbcSnapshotRepository(
/* resourceClosure = */
{
it.createStatement(snapshotSchema.scan(aggregateId))
.bind(0, cursorId)
.bind(0, afterId)
.bind(1, limit)
.execute()
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class RedisSnapshotRepository(

override fun scanAggregateId(
namedAggregate: NamedAggregate,
cursorId: String,
afterId: String,
limit: Int
): Flux<AggregateId> {
val keyPrefix = namedAggregate.toKeyPrefix()
Expand All @@ -59,7 +59,7 @@ class RedisSnapshotRepository(
.map {
DefaultSnapshotKeyConverter.toAggregateId(namedAggregate, it)
}.filter {
it.id > cursorId
it.id > afterId
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright [2021-present] [ahoo wang <[email protected]> (https://github.com/Ahoo-Wang)].
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package me.ahoo.wow.webflux.route

import me.ahoo.wow.api.modeling.AggregateId
import me.ahoo.wow.exception.toErrorInfo
import me.ahoo.wow.openapi.BatchResult
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

fun Flux<AggregateId>.toBatchResult(afterId: String): Mono<BatchResult> {
return this.materialize().reduce(BatchResult(afterId, 0)) { acc, signal ->
if (signal.isOnError) {
val error = signal.throwable!!.toErrorInfo()
return@reduce BatchResult(
afterId = acc.afterId,
size = acc.size,
errorCode = error.errorCode,
errorMsg = error.errorMsg
)
}
if (signal.isOnNext) {
val aggregateId = signal.get()!!
val nextAfterId = if (aggregateId.id > acc.afterId) {
aggregateId.id
} else {
acc.afterId
}
return@reduce BatchResult(nextAfterId, acc.size + 1)
}
acc
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ class ResendStateEventFunction(
ResendStateEventHandler(aggregateMetadata, snapshotRepository, stateEventCompensator)

override fun handle(request: ServerRequest): Mono<ServerResponse> {
val cursorId = request.pathVariable(RoutePaths.BATCH_CURSOR_ID)
val afterId = request.pathVariable(RoutePaths.BATCH_AFTER_ID)
val limit = request.pathVariable(RoutePaths.BATCH_LIMIT).toInt()
return handler.handle(cursorId, limit)
return handler.handle(afterId, limit)
.toServerResponse(request, exceptionHandler)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import me.ahoo.wow.eventsourcing.snapshot.SnapshotRepository
import me.ahoo.wow.messaging.compensation.CompensationTarget
import me.ahoo.wow.modeling.matedata.AggregateMetadata
import me.ahoo.wow.openapi.BatchResult
import me.ahoo.wow.webflux.route.toBatchResult
import reactor.core.publisher.Mono

class ResendStateEventHandler(
Expand All @@ -39,24 +40,16 @@ class ResendStateEventHandler(
)
}

fun handle(cursorId: String, limit: Int): Mono<BatchResult> {
fun handle(afterId: String, limit: Int): Mono<BatchResult> {
val target = CompensationTarget(function = RESEND_FUNCTION)
return snapshotRepository.scanAggregateId(aggregateMetadata.namedAggregate, cursorId, limit)
return snapshotRepository.scanAggregateId(aggregateMetadata.namedAggregate, afterId, limit)
.flatMap { aggregateId ->
stateEventCompensator.resend(
aggregateId = aggregateId,
target = target,
headVersion = DEFAULT_HEAD_VERSION,
tailVersion = Int.MAX_VALUE
).thenReturn(aggregateId)
}
.reduce(BatchResult(cursorId, 0)) { acc, aggregateId ->
val nextCursorId = if (aggregateId.id > acc.cursorId) {
aggregateId.id
} else {
acc.cursorId
}
BatchResult(nextCursorId, acc.size + 1)
}
}.toBatchResult(afterId)
}
}
Loading

0 comments on commit b68461c

Please sign in to comment.