Skip to content

Commit

Permalink
use config for numberOfProjectionInstances instead of val override
Browse files Browse the repository at this point in the history
  • Loading branch information
an-tex committed Oct 10, 2024
1 parent 69e6a90 commit 9d7bd99
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# override this to use custom settings for the default query plugin
net.sc8s.akka.components.persistence.projection.r2dbc.default = ${akka.persistence.r2dbc}

net.sc8s.akka.components.persistence.projection.r2dbc.default.connection-factory = ${akka.persistence.r2dbc.postgres}
net.sc8s.akka.components.persistence.projection.r2dbc {
default = ${akka.persistence.r2dbc}
default {
connection-factory = ${akka.persistence.r2dbc.postgres}
# this is only used for sharded entities. singleton entities will always have only one instance
# must be a whole number divisor of numberOfSlices [by default 1024].
numberOfProjectionInstances = 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ private[r2dbc] trait R2dbcProjection extends EventSourcedT.ProjectionT {
_: EventSourcedT#EventSourcedBaseComponentT
with EventSourcedT#BaseComponent =>

val numberOfProjectionInstances = 1

// override this if you e.g. want to use a readonly endpoint for the projections https://discuss.lightbend.com/t/r2dbc-projections-use-read-only-hot-standby-replicas-for-projections-query/10860 . or override it in the config to customize all projections
val readJournalPluginId = "net.sc8s.akka.components.persistence.projection.r2dbc.default.query"
}
Expand All @@ -47,6 +45,7 @@ trait R2dbcShardedProjection extends R2dbcProjection {
projection: Projection[EventT, ComponentContextS with ComponentContext.Projection],
actorSystem: ActorSystem[_]
): ManagedProjection[EventEnvelope[EventT]] = {
val numberOfProjectionInstances = actorSystem.settings.config.getInt(s"${readJournalPluginId.stripSuffix(".query")}.numberOfProjectionInstances")
val sliceRanges = EventSourcedProvider.sliceRanges(actorSystem, readJournalPluginId, numberOfProjectionInstances)

val projectionIds = sliceRanges.map(sliceRange =>
Expand Down Expand Up @@ -151,7 +150,7 @@ trait R2dbcSingletonProjection extends R2dbcProjection {
new ManagedProjection[EventEnvelope[EventT]](
projection.name,
projectionIds,
numberOfProjectionInstances,
1, // singleton projection parallelism is currently limited to 1 due to the EventsByPersistenceIdSourceProvider
new ProjectionStatusObserver[EventEnvelope[EventT]]()(actorSystem) {
override def extractSequenceNr(envelope: EventEnvelope[EventT]) = envelope.sequenceNr

Expand Down

0 comments on commit 9d7bd99

Please sign in to comment.