Skip to content

Commit

Permalink
Merge branch 'main' into update/framework-8.14.3
Browse files Browse the repository at this point in the history
  • Loading branch information
an-tex committed Oct 2, 2024
2 parents c52068c + 87b1f17 commit 0c6b1f1
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ private[r2dbc] trait R2dbcProjection extends EventSourcedT.ProjectionT {
with EventSourcedT#BaseComponent =>

val numberOfProjectionInstances = 1

// override this if you e.g. want to use a readonly endpoint for the projections https://discuss.lightbend.com/t/r2dbc-projections-use-read-only-hot-standby-replicas-for-projections-query/10860
val readJournalPluginId = R2dbcReadJournal.Identifier
}

object R2dbcProjection {
Expand All @@ -44,7 +47,7 @@ trait R2dbcShardedProjection extends R2dbcProjection {
projection: Projection[EventT, ComponentContextS with ComponentContext.Projection],
actorSystem: ActorSystem[_]
): ManagedProjection[EventEnvelope[EventT]] = {
val sliceRanges = EventSourcedProvider.sliceRanges(actorSystem, R2dbcReadJournal.Identifier, numberOfProjectionInstances)
val sliceRanges = EventSourcedProvider.sliceRanges(actorSystem, readJournalPluginId, numberOfProjectionInstances)

val projectionIds = sliceRanges.map(sliceRange =>
ProjectionId(projection.name, s"${projection.name}-${sliceRange.min}-${sliceRange.max}")
Expand Down Expand Up @@ -84,7 +87,7 @@ trait R2dbcShardedProjection extends R2dbcProjection {
private[r2dbc] def createSourceProvider(minSlice: Int, maxSlice: Int, actorSystem: ActorSystem[_]): SourceProvider[Offset, EventEnvelope[EventT]] =
EventSourcedProvider.eventsBySlices[EventT](
actorSystem,
R2dbcReadJournal.Identifier,
readJournalPluginId,
typeKey.name,
minSlice,
maxSlice,
Expand All @@ -99,7 +102,7 @@ object R2dbcShardedProjection {
override private[r2dbc] def createSourceProvider(minSlice: Int, maxSlice: Int, actorSystem: ActorSystem[_]): SourceProvider[Offset, EventEnvelope[EventT]] =
EventSourcedProvider.eventsBySlicesStartingFromSnapshots(
actorSystem,
R2dbcReadJournal.Identifier,
readJournalPluginId,
typeKey.name,
minSlice,
maxSlice,
Expand All @@ -124,7 +127,7 @@ trait R2dbcSingletonProjection extends R2dbcProjection {
offset().map { offsetOpt =>
val sequence = offsetOpt.getOrElse(Sequence(0L))
val eventQueries = PersistenceQuery(system)
.readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
.readJournalFor[R2dbcReadJournal](readJournalPluginId)
createEventSource(persistenceId, sequence, eventQueries)
}

Expand Down

0 comments on commit 0c6b1f1

Please sign in to comment.