diff --git a/wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/event/state/ResendStateEventHandler.kt b/wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/event/state/ResendStateEventHandler.kt index 562c6fffdb..211143777f 100644 --- a/wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/event/state/ResendStateEventHandler.kt +++ b/wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/event/state/ResendStateEventHandler.kt @@ -43,7 +43,7 @@ class ResendStateEventHandler( fun handle(afterId: String, limit: Int): Mono { val target = CompensationTarget(function = RESEND_FUNCTION) return snapshotRepository.scanAggregateId(aggregateMetadata.namedAggregate, afterId, limit) - .flatMap { aggregateId -> + .concatMap { aggregateId -> stateEventCompensator.resend( aggregateId = aggregateId, target = target, diff --git a/wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/snapshot/BatchRegenerateSnapshotHandlerFunction.kt b/wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/snapshot/BatchRegenerateSnapshotHandlerFunction.kt index 53e5834683..aa86043f57 100644 --- a/wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/snapshot/BatchRegenerateSnapshotHandlerFunction.kt +++ b/wow-webflux/src/main/kotlin/me/ahoo/wow/webflux/route/snapshot/BatchRegenerateSnapshotHandlerFunction.kt @@ -49,7 +49,7 @@ class BatchRegenerateSnapshotHandlerFunction( namedAggregate = aggregateMetadata.namedAggregate, afterId = afterId, limit = limit, - ).flatMap { aggregateId -> + ).concatMap { aggregateId -> handler.handle(aggregateId).thenReturn(aggregateId) }.toBatchResult(afterId).toServerResponse(request, exceptionHandler) }