From 7d330e9c632cb89cc8cebea746dbce4675878af2 Mon Sep 17 00:00:00 2001 From: Andreas Gabor Date: Fri, 7 Jun 2024 14:27:21 +0200 Subject: [PATCH] add r2dbc cleanup --- ...terComponentsR2dbcPersistenceService.scala | 21 +++++ ...omponentsR2dbcPersistenceServiceImpl.scala | 83 +++++++++++++++++++ build.sbt | 29 ++++++- project/Dependencies.scala | 2 + 4 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 akka-components-persistence-r2dbc-lagom-api/src/main/scala/ClusterComponentsR2dbcPersistenceService.scala create mode 100644 akka-components-persistence-r2dbc-lagom/src/main/scala/ClusterComponentsR2dbcPersistenceServiceImpl.scala diff --git a/akka-components-persistence-r2dbc-lagom-api/src/main/scala/ClusterComponentsR2dbcPersistenceService.scala b/akka-components-persistence-r2dbc-lagom-api/src/main/scala/ClusterComponentsR2dbcPersistenceService.scala new file mode 100644 index 0000000..9ca2a1c --- /dev/null +++ b/akka-components-persistence-r2dbc-lagom-api/src/main/scala/ClusterComponentsR2dbcPersistenceService.scala @@ -0,0 +1,21 @@ +package net.sc8s.akka.components.persistence.r2dbc.lagom.api + +import akka.{Done, NotUsed} +import com.lightbend.lagom.scaladsl.api.transport.Method +import com.lightbend.lagom.scaladsl.api.{Service, ServiceCall} + +trait ClusterComponentsR2dbcPersistenceService extends Service { + val apiPrefix: String + + def deleteSingletonEntity(name: String): ServiceCall[NotUsed, Done] + + def deleteShardedEntities(name: String): ServiceCall[NotUsed, Done] + + abstract override def descriptor = { + import Service._ + super.descriptor.addCalls( + restCall(Method.DELETE, s"$apiPrefix/entity/singleton/:name", deleteSingletonEntity _), + restCall(Method.DELETE, s"$apiPrefix/entity/sharded/:name", deleteShardedEntities _), + ) + } +} diff --git a/akka-components-persistence-r2dbc-lagom/src/main/scala/ClusterComponentsR2dbcPersistenceServiceImpl.scala b/akka-components-persistence-r2dbc-lagom/src/main/scala/ClusterComponentsR2dbcPersistenceServiceImpl.scala new file mode 100644 index 0000000..96a80ea --- /dev/null +++ b/akka-components-persistence-r2dbc-lagom/src/main/scala/ClusterComponentsR2dbcPersistenceServiceImpl.scala @@ -0,0 +1,83 @@ +package net.sc8s.akka.components.persistence.cassandra.lagom + +import akka.Done +import akka.actor.typed.ActorSystem +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.scaladsl.CurrentPersistenceIdsQuery +import akka.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup +import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal +import akka.stream.Materializer.matFromSystem +import akka.stream.scaladsl.Sink +import com.lightbend.lagom.scaladsl.api.ServiceCall +import net.sc8s.akka.circe.CirceSerializer +import net.sc8s.akka.components.ClusterComponent +import net.sc8s.akka.components.persistence.r2dbc.lagom.api.ClusterComponentsR2dbcPersistenceService +import net.sc8s.logstage.elastic.Logging + +import scala.util.Success + +trait ClusterComponentsR2dbcPersistenceServiceImpl extends ClusterComponentsR2dbcPersistenceService with Logging { + val clusterComponents: Set[ClusterComponent.Component[_]] + + implicit val actorSystem: ActorSystem[_] + + import actorSystem.executionContext + + val queries = PersistenceQuery(actorSystem).readJournalFor[CurrentPersistenceIdsQuery](R2dbcReadJournal.Identifier) + val cleanup = new EventSourcedCleanup(actorSystem) + + override def deleteSingletonEntity(name: String) = ServiceCall { _ => + val maybeSingletonPersistenceId = clusterComponents + .map(wiredComponent => wiredComponent.component -> wiredComponent.innerComponent) + .collectFirst { case (outerComponent: ClusterComponent.Singleton.EventSourced, innerComponent) if outerComponent.name == name => + innerComponent.asInstanceOf[outerComponent.BaseComponent].persistenceId + } + + lazy val singletonEntities = clusterComponents.map(_.component).collect { + case outerComponent: ClusterComponent.Singleton.EventSourced => outerComponent.name + } + + maybeSingletonPersistenceId.fold( + throw new Exception(s"singleton with name=$name not found, existing singletonEntities=$singletonEntities") + ) { singletonPersistenceId => + log.infoT("deleteSingletonEntity", s"$name") + cleanup + .deleteAll(singletonPersistenceId.id, resetSequenceNumber = false) + .andThen { + case Success(_) => + log.infoT("singletonEntityDeleted", s"$name") + } + } + } + + override def deleteShardedEntities(name: String) = ServiceCall { _ => + val maybeTypeKey = clusterComponents + .map(wiredComponent => wiredComponent.component -> wiredComponent.innerComponent) + .collectFirst { case (outerComponent: ClusterComponent.Sharded.EventSourced, innerComponent) if outerComponent.name == name => + innerComponent.asInstanceOf[outerComponent.BaseComponent].typeKey + } + + lazy val shardedEntities = clusterComponents.map(_.component).collect { + case outerComponent: ClusterComponent.Sharded.EventSourced => outerComponent.name + } + + maybeTypeKey.fold( + throw new Exception(s"shardedEntity with name=$name not found, existing shardedEntities=$shardedEntities") + ) { typeKey => + log.infoT("deleteShardedEntities", s"$name") + + queries + .currentPersistenceIds() + .filter(_.startsWith(s"${typeKey.name}|")) + .mapAsync(10) { id => + log.infoT("deleteShardedEntity", s"$id") + cleanup.deleteAll(id, resetSequenceNumber = false) + } + .runWith(Sink.fold(0)((i, _) => i + 1)) + .map { deletedEntities => + log.infoT("shardedEntitiesDeleted", s"$name with $deletedEntities") + Done + } + } + } +} diff --git a/build.sbt b/build.sbt index 974f060..f0d8712 100644 --- a/build.sbt +++ b/build.sbt @@ -19,6 +19,12 @@ lazy val sc8s = (project in file(".")) `akka-components-persistence-projection-r2dbc`, `akka-components-persistence-projection-api`.js, `akka-components-persistence-projection-api`.jvm, + `akka-components-persistence-cassandra-lagom-api`.js, + `akka-components-persistence-cassandra-lagom-api`.jvm, + `akka-components-persistence-cassandra-lagom`, + `akka-components-persistence-r2dbc-lagom-api`.js, + `akka-components-persistence-r2dbc-lagom-api`.jvm, + `akka-components-persistence-r2dbc-lagom`, `akka-components-persistence-projection-lagom-api`.js, `akka-components-persistence-projection-lagom-api`.jvm, `akka-components-persistence-projection-lagom`, @@ -164,6 +170,26 @@ lazy val `akka-components-persistence-cassandra-lagom` = (project in file("akka- ) .dependsOn(`akka-components`, `akka-components-persistence-cassandra-lagom-api`.jvm) +lazy val `akka-components-persistence-r2dbc-lagom-api` = crossProject(JSPlatform, JVMPlatform) + .crossType(CrossType.Pure) + .in(file("akka-components-persistence-r2dbc-lagom-api")) + .jvmSettings(libraryDependencies += lagom.scaladslApi) + .jsSettings(libraryDependencies += lagom.js.scalaDslApi.value) + .settings( + idePackagePrefix := Some("net.sc8s.akka.components.persistence.r2dbc.lagom.api") + ) + .dependsOn(`common-circe`, `lagom-api-circe`) + +lazy val `akka-components-persistence-r2dbc-lagom` = (project in file("akka-components-persistence-r2dbc-lagom")) + .settings( + libraryDependencies ++= Seq( + lagom.scaladslServer, + akka.persistenceR2dbcLicensed, + ), + idePackagePrefix := Some("net.sc8s.akka.components.persistence.r2dbc.lagom") + ) + .dependsOn(`akka-components`, `akka-components-persistence-r2dbc-lagom-api`.jvm) + lazy val `akka-components-persistence-projection` = (project in file("akka-components-persistence-projection")) .settings( libraryDependencies ++= Seq( @@ -421,7 +447,8 @@ inThisBuild(Seq( "org.scala-lang.modules" %% "scala-java8-compat" % "always", ), resolvers ++= Seq( - "antex public" at "https://mymavenrepo.com/repo/zeKhQjbzBED1vIds46Kj/" + "antex public" at "https://mymavenrepo.com/repo/zeKhQjbzBED1vIds46Kj/", + "Akka library repository" at "https://repo.akka.io/maven" ), scmInfo := Some(ScmInfo(url("https://github.com/an-tex/sc8s"), "scm:git:git://github.com/an-tex/sc8s.git")), githubWorkflowJavaVersions := Seq(JavaSpec(Adopt, "11.0.13+8")), diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 2700571..bef4ad3 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -51,12 +51,14 @@ object Dependencies { private val akkaHttpVersion = "10.2.10" private val akkaJs = "2.2.6.14" private val r2dbcVersion = "0.7.7" + private val r2dbcLicensedVersion = "1.2.4" val actor = "com.typesafe.akka" %% "akka-actor" % akkaVersion val clusterShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion val http = "com.typesafe.akka" %% "akka-http" % akkaHttpVersion val persistenceCassandra = "com.typesafe.akka" %% "akka-persistence-cassandra" % "1.0.6" val persistenceR2dbc = "com.lightbend.akka" %% "akka-persistence-r2dbc" % r2dbcVersion + val persistenceR2dbcLicensed = "com.lightbend.akka" %% "akka-persistence-r2dbc" % r2dbcLicensedVersion val persistenceTestkit = "com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion val persistenceTyped = "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion val stream = "com.typesafe.akka" %% "akka-stream" % akkaVersion