Skip to content

Commit

Permalink
add r2dbc cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
an-tex committed Jun 7, 2024
1 parent 42216f3 commit 2cf13e1
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package net.sc8s.akka.components.persistence.r2dbc.lagom.api

import akka.{Done, NotUsed}
import com.lightbend.lagom.scaladsl.api.transport.Method
import com.lightbend.lagom.scaladsl.api.{Service, ServiceCall}

trait ClusterComponentsR2dbcPersistenceService extends Service {
val apiPrefix: String

def deleteSingletonEntity(name: String): ServiceCall[NotUsed, Done]

def deleteShardedEntities(name: String): ServiceCall[NotUsed, Done]

abstract override def descriptor = {
import Service._
super.descriptor.addCalls(
restCall(Method.DELETE, s"$apiPrefix/entity/singleton/:name", deleteSingletonEntity _),
restCall(Method.DELETE, s"$apiPrefix/entity/sharded/:name", deleteShardedEntities _),
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package net.sc8s.akka.components.persistence.cassandra.lagom

import akka.Done
import akka.actor.typed.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery
import akka.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.stream.Materializer.matFromSystem
import akka.stream.scaladsl.Sink
import com.lightbend.lagom.scaladsl.api.ServiceCall
import net.sc8s.akka.circe.CirceSerializer
import net.sc8s.akka.components.ClusterComponent
import net.sc8s.akka.components.persistence.r2dbc.lagom.api.ClusterComponentsR2dbcPersistenceService
import net.sc8s.logstage.elastic.Logging

import scala.util.Success

trait ClusterComponentsR2dbcPersistenceServiceImpl extends ClusterComponentsR2dbcPersistenceService with Logging {
val clusterComponents: Set[ClusterComponent.Component[_]]

implicit val actorSystem: ActorSystem[_]

import actorSystem.executionContext

val queries = PersistenceQuery(actorSystem).readJournalFor[CurrentPersistenceIdsQuery](R2dbcReadJournal.Identifier)
val cleanup = new EventSourcedCleanup(actorSystem)

override def deleteSingletonEntity(name: String) = ServiceCall { _ =>
val maybeSingletonPersistenceId = clusterComponents
.map(wiredComponent => wiredComponent.component -> wiredComponent.innerComponent)
.collectFirst { case (outerComponent: ClusterComponent.Singleton.EventSourced, innerComponent) if outerComponent.name == name =>
innerComponent.asInstanceOf[outerComponent.BaseComponent].persistenceId
}

lazy val singletonEntities = clusterComponents.map(_.component).collect {
case outerComponent: ClusterComponent.Singleton.EventSourced => outerComponent.name
}

maybeSingletonPersistenceId.fold(
throw new Exception(s"singleton with name=$name not found, existing singletonEntities=$singletonEntities")
) { singletonPersistenceId =>
log.infoT("deleteSingletonEntity", s"$name")
cleanup
.deleteAll(singletonPersistenceId.id, resetSequenceNumber = false)
.andThen {
case Success(_) =>
log.infoT("singletonEntityDeleted", s"$name")
}
}
}

override def deleteShardedEntities(name: String) = ServiceCall { _ =>
val maybeTypeKey = clusterComponents
.map(wiredComponent => wiredComponent.component -> wiredComponent.innerComponent)
.collectFirst { case (outerComponent: ClusterComponent.Sharded.EventSourced, innerComponent) if outerComponent.name == name =>
innerComponent.asInstanceOf[outerComponent.BaseComponent].typeKey
}

lazy val shardedEntities = clusterComponents.map(_.component).collect {
case outerComponent: ClusterComponent.Sharded.EventSourced => outerComponent.name
}

maybeTypeKey.fold(
throw new Exception(s"shardedEntity with name=$name not found, existing shardedEntities=$shardedEntities")
) { typeKey =>
log.infoT("deleteShardedEntities", s"$name")

queries
.currentPersistenceIds()
.filter(_.startsWith(s"${typeKey.name}|"))
.mapAsync(10) { id =>
log.infoT("deleteShardedEntity", s"$id")
cleanup.deleteAll(id, resetSequenceNumber = false)
}
.runWith(Sink.fold(0)((i, _) => i + 1))
.map { deletedEntities =>
log.infoT("shardedEntitiesDeleted", s"$name with $deletedEntities")
Done
}
}
}
}
29 changes: 28 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ lazy val sc8s = (project in file("."))
`akka-components-persistence-projection-r2dbc`,
`akka-components-persistence-projection-api`.js,
`akka-components-persistence-projection-api`.jvm,
`akka-components-persistence-cassandra-lagom-api`.js,
`akka-components-persistence-cassandra-lagom-api`.jvm,
`akka-components-persistence-cassandra-lagom`,
`akka-components-persistence-r2dbc-lagom-api`.js,
`akka-components-persistence-r2dbc-lagom-api`.jvm,
`akka-components-persistence-r2dbc-lagom`,
`akka-components-persistence-projection-lagom-api`.js,
`akka-components-persistence-projection-lagom-api`.jvm,
`akka-components-persistence-projection-lagom`,
Expand Down Expand Up @@ -164,6 +170,26 @@ lazy val `akka-components-persistence-cassandra-lagom` = (project in file("akka-
)
.dependsOn(`akka-components`, `akka-components-persistence-cassandra-lagom-api`.jvm)

lazy val `akka-components-persistence-r2dbc-lagom-api` = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Pure)
.in(file("akka-components-persistence-r2dbc-lagom-api"))
.jvmSettings(libraryDependencies += lagom.scaladslApi)
.jsSettings(libraryDependencies += lagom.js.scalaDslApi.value)
.settings(
idePackagePrefix := Some("net.sc8s.akka.components.persistence.r2dbc.lagom.api")
)
.dependsOn(`common-circe`, `lagom-api-circe`)

lazy val `akka-components-persistence-r2dbc-lagom` = (project in file("akka-components-persistence-r2dbc-lagom"))
.settings(
libraryDependencies ++= Seq(
lagom.scaladslServer,
akka.persistenceR2dbcLicensed,
),
idePackagePrefix := Some("net.sc8s.akka.components.persistence.r2dbc.lagom")
)
.dependsOn(`akka-components`, `akka-components-persistence-r2dbc-lagom-api`.jvm)

lazy val `akka-components-persistence-projection` = (project in file("akka-components-persistence-projection"))
.settings(
libraryDependencies ++= Seq(
Expand Down Expand Up @@ -421,7 +447,8 @@ inThisBuild(Seq(
"org.scala-lang.modules" %% "scala-java8-compat" % "always",
),
resolvers ++= Seq(
"antex public" at "https://mymavenrepo.com/repo/zeKhQjbzBED1vIds46Kj/"
"antex public" at "https://mymavenrepo.com/repo/zeKhQjbzBED1vIds46Kj/",
"Akka library repository" at "https://repo.akka.io/maven"
),
scmInfo := Some(ScmInfo(url("https://github.com/an-tex/sc8s"), "scm:git:git://github.com/an-tex/sc8s.git")),
githubWorkflowJavaVersions := Seq(JavaSpec(Adopt, "11.0.13+8")),
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ object Dependencies {
private val akkaHttpVersion = "10.2.10"
private val akkaJs = "2.2.6.14"
private val r2dbcVersion = "0.7.7"
private val r2dbcLicensedVersion = "1.2.4"

val actor = "com.typesafe.akka" %% "akka-actor" % akkaVersion
val clusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion
val http = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
val persistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % "1.0.6"
val persistenceR2dbc = "com.lightbend.akka" %% "akka-persistence-r2dbc" % r2dbcVersion
val persistenceR2dbcLicensed = "com.lightbend.akka" %% "akka-persistence-r2dbc" % r2dbcLicensedVersion
val persistenceTestkit = "com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion
val persistenceTyped = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion
val stream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
Expand Down

0 comments on commit 2cf13e1

Please sign in to comment.