diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f0d22e7..44ae181 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -40,6 +40,8 @@ jobs: java-version: 17 cache: sbt + - run: docker compose up -d + - name: Check that workflows are up to date run: sbt '++ ${{ matrix.scala }}' githubWorkflowCheck @@ -47,7 +49,7 @@ jobs: run: sbt '++ ${{ matrix.scala }}' test - name: Compress target directories - run: tar cf targets.tar akka-components-persistence-projection-lagom-api/.js/target akka-components-testkit/target akka-components-persistence-projection-api/.jvm/target target schevo/.js/target akka-components-persistence-projection-api/.js/target lagom-api-circe/.jvm/target common-circe/.js/target akka-stream-utils/jvm/target akka-components-persistence-projection/target akka-components-persistence-projection-r2dbc/target akka-components-persistence-projection-cassandra/target akka-stream-utils/js/target schevo-circe/.js/target lagom-server-circe/target schevo/.jvm/target akka-components-persistence-utils/target akka-components-persistence-projection-lagom-api/.jvm/target akka-components-persistence-cassandra-lagom-api/.jvm/target common-circe/.jvm/target akka-components-persistence-cassandra-lagom-api/.js/target akka-circe/target lagom-api-circe/.js/target akka-components-persistence-projection-lagom/target akka-components/target lagom-server-circe-testkit/target akka-components-lagom/target schevo-circe-example-akka/target schevo-circe/.jvm/target logstage-elastic/target common-tzdb/.js/target akka-components-persistence-cassandra-lagom/target project/target + run: tar cf targets.tar akka-components-persistence-projection-lagom-api/.js/target akka-components-testkit/target akka-components-persistence-projection-api/.jvm/target target schevo/.js/target elastic/core/target akka-components-persistence-projection-api/.js/target lagom-api-circe/.jvm/target common-circe/.js/target akka-stream-utils/jvm/target akka-components-persistence-projection/target akka-components-persistence-projection-r2dbc/target akka-components-persistence-projection-cassandra/target akka-stream-utils/js/target schevo-circe/.js/target lagom-server-circe/target schevo/.jvm/target akka-components-persistence-utils/target elastic/lagom/service/target akka-components-persistence-projection-lagom-api/.jvm/target akka-components-persistence-cassandra-lagom-api/.jvm/target common-circe/.jvm/target akka-components-persistence-cassandra-lagom-api/.js/target akka-circe/target lagom-api-circe/.js/target akka-components-persistence-projection-lagom/target akka-components/target lagom-server-circe-testkit/target akka-components-lagom/target schevo-circe-example-akka/target schevo-circe/.jvm/target elastic/lagom/api/.jvm/target logstage-elastic/target common-tzdb/.js/target akka-components-persistence-cassandra-lagom/target elastic/lagom/api/.js/target project/target - name: Upload target directories uses: actions/upload-artifact@v3 @@ -79,6 +81,8 @@ jobs: java-version: 17 cache: sbt + - run: docker compose up -d + - name: Download target directories (2.13.11) uses: actions/download-artifact@v3 with: diff --git a/build.sbt b/build.sbt index 6ff4462..a278181 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,4 @@ -import Dependencies._ +import Dependencies.* import sbtcrossproject.CrossType import sbtghactions.JavaSpec.Distribution.Adopt @@ -28,6 +28,10 @@ lazy val sc8s = (project in file(".")) `common-circe`.js, `common-circe`.jvm, `common-tzdb`.js, + `elastic-core`, + `elastic-lagom-api`.js, + 
`elastic-lagom-api`.jvm, + `elastic-lagom-service`, `lagom-api-circe`.js, `lagom-api-circe`.jvm, `lagom-server-circe-testkit`, @@ -320,6 +324,47 @@ lazy val `akka-stream-utils` = crossProject(JSPlatform, JVMPlatform) idePackagePrefix := Some("net.sc8s.akka.stream") ) +lazy val `elastic-core` = (project in file("elastic/core")) + .settings( + libraryDependencies ++= Seq( + scalaTest.value % Test, + akka.actor, + akka.typed, + akka.stream, + akka.testkitTyped % Test, + elastic4s.core, + elastic4s.jsonCirce, + elastic4s.clientAkka % Test, + elastic4s.testkit % Test, + elastic4s.elasticTestFramework % Test, + elastic4s.httpStreams, + circe.core.value, + circe.parser.value, + circe.generic.value, + circe.genericExtras.value, + nameOf, + macwire.macros, + ), + ) + .dependsOn(`logstage-elastic`, `schevo-circe`.jvm, `akka-stream-utils`.jvm, `akka-components`) + +lazy val `elastic-lagom-api` = + crossProject(JSPlatform, JVMPlatform) + .crossType(CrossType.Pure) + .in(file("elastic/lagom/api")) + .jvmSettings(libraryDependencies += lagom.scaladslApi) + .jsSettings(libraryDependencies += lagom.js.scalaDslApi.value) + +lazy val `elastic-lagom-service` = (project in file("elastic/lagom/service")) + .settings( + libraryDependencies ++= Seq( + elastic4s.core, + elastic4s.clientAkka, + macwire.macros + ) + ) + .dependsOn(`elastic-core`, `elastic-lagom-api`.jvm) + // empty project to avoid regeneration in other projects https://github.com/cquiroz/sbt-tzdb/issues/88 lazy val `common-tzdb` = crossProject(JSPlatform) .crossType(CrossType.Pure) @@ -346,6 +391,9 @@ inThisBuild(Seq( libraryDependencySchemes ++= Seq( "org.scala-lang.modules" %% "scala-java8-compat" % "always", ), + resolvers ++= Seq( + "antex public" at "https://mymavenrepo.com/repo/zeKhQjbzBED1vIds46Kj/" + ), scmInfo := Some(ScmInfo(url("https://github.com/an-tex/sc8s"), "scm:git:git://github.com/an-tex/sc8s.git")), githubWorkflowJavaVersions := Seq(JavaSpec(Adopt, "11.0.13+8")), githubWorkflowTargetTags := Seq("*"), @@ -358,6 +406,7 @@ inThisBuild(Seq( "SONATYPE_USERNAME" -> "${{ secrets.SONATYPE_USERNAME }}" ) )), + githubWorkflowJobSetup += WorkflowStep.Run(List("docker compose up -d")), githubWorkflowPublishTargetBranches := Seq(RefPredicate.StartsWith(Ref.Tag("v"))), versionScheme := Some("early-semver"), dependencyOverrides ++= Dependencies.overrides ++ Seq( @@ -368,4 +417,5 @@ inThisBuild(Seq( githubWorkflowJavaVersions := Seq(JavaSpec.temurin("17")) )) -Global / excludeLintKeys += idePackagePrefix \ No newline at end of file +Global / excludeLintKeys += idePackagePrefix +Global / onChangedBuildSource := ReloadOnSourceChanges \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..382268d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,18 @@ +# used for tests only + +services: + elastic8: + image: docker.elastic.co/elasticsearch/elasticsearch:8.8.2 + restart: always + environment: + discovery.type: single-node + network.host: 0.0.0.0 + xpack.security.http.ssl.enabled: false + xpack.security.enabled: false + ports: + - 9210:9200 + - 9310:9300 + deploy: + resources: + limits: + memory: 1024M diff --git a/elastic/core/src/main/resources/logback-elastic-evolution.xml b/elastic/core/src/main/resources/logback-elastic-evolution.xml new file mode 100644 index 0000000..f18de0c --- /dev/null +++ b/elastic/core/src/main/resources/logback-elastic-evolution.xml @@ -0,0 +1,3 @@ + + + diff --git a/elastic/core/src/main/scala/mu/moin/elastic/evolution/Evolver.scala 
b/elastic/core/src/main/scala/mu/moin/elastic/evolution/Evolver.scala new file mode 100644 index 0000000..0394842 --- /dev/null +++ b/elastic/core/src/main/scala/mu/moin/elastic/evolution/Evolver.scala @@ -0,0 +1,381 @@ +package mu.moin.elastic.evolution + +import akka.Done +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter._ +import akka.stream.scaladsl.{Sink, Source} +import com.sksamuel.elastic4s.ElasticDsl._ +import com.sksamuel.elastic4s.fields.{ElasticField, KeywordField} +import com.sksamuel.elastic4s.requests.mappings.MappingDefinition +import com.sksamuel.elastic4s.requests.task.GetTaskResponse +import com.sksamuel.elastic4s.streams.ReactiveElastic._ +import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess} +import io.circe.Codec +import io.circe.generic.extras.semiauto.deriveConfiguredCodec +import mu.moin.elastic.evolution.Evolver.Command.{AddMappings, AliasUpdated, BatchUpdatesFinished, CheckTaskCompletion, DocumentsEvolved, EvolveDocuments, EvolveNextIndex, IndexMigrated, IndexMigrationFailed, IndexMigrationStarted, MappingsAdded, MigrateIndex, MigrateIndices, MigrateNextIndex, OldIndexDeleted, RunBatchUpdates, TaskStatus} +import net.sc8s.akka.circe.CirceSerializer +import net.sc8s.akka.components.ClusterComponent +import net.sc8s.akka.stream.RateLogger +import net.sc8s.circe.CodecConfiguration._ + +import java.time.LocalDateTime +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Success, Try} + +object Evolver extends ClusterComponent.Singleton { + sealed trait Command + sealed trait SerializableCommand extends Command + object Command { + case class MigrateIndices(indices: Seq[String], forceReindex: Boolean) extends SerializableCommand + case class EvolveDocuments(indices: Seq[String]) extends SerializableCommand + case class RunBatchUpdates(index: String, job: String) extends SerializableCommand + + private[Evolver] case class MigrateNextIndex(pendingIndices: Seq[Index]) extends Command + private[Evolver] case class MigrateIndex(index: Index, oldIndexName: String, newIndexName: String, pendingIndices: Seq[Index]) extends Command + + private[Evolver] case class AddMappings(index: Index, mappings: Seq[ElasticField], pendingIndices: Seq[Index]) extends Command + private[Evolver] case class MappingsAdded(result: Try[Done]) extends Command + + private[Evolver] case class IndexMigrationStarted(nodeId: String, taskId: String) extends Command + private[Evolver] case class CheckTaskCompletion(nodeId: String, taskId: String) extends Command + private[Evolver] case class TaskStatus(status: Try[GetTaskResponse]) extends Command + private[Evolver] case object IndexMigrated extends Command + private[Evolver] case class IndexMigrationFailed(index: Index, exception: Throwable) extends Command + private[Evolver] case class AliasUpdated(index: Try[Done]) extends Command + private[Evolver] case class OldIndexDeleted(deleted: Try[Done]) extends Command + + private[Evolver] case object EvolveNextIndex extends Command + private[Evolver] case class DocumentsEvolved(index: Index, result: Try[Seq[Index]]) extends Command + private[Evolver] case class BatchUpdatesFinished(result: Try[Done]) extends Command + + implicit val codec: Codec[SerializableCommand] = deriveConfiguredCodec + } + + class Component( + elasticClient: ElasticClient, + elasticIndices: Set[Index] + ) extends BaseComponent { + + override val behavior = { ctx => + import ctx.{log, materializer, actorContext => context} + import 
context.executionContext + implicit val classicActorSystem = context.system.toClassic + + def resolveElasticIndices(indices: Seq[String]) = + if (indices.nonEmpty) indices.map(index => elasticIndices.find(_.name == index).get) + else elasticIndices.toSeq + + def idle = Behaviors.receiveMessagePartial[Command] { + case MigrateIndices(indices, forceReindex) => + val indicesToMigrate = resolveElasticIndices(indices) + log.info(s"${"startingIndicesMigration" -> "tag"} for ${indicesToMigrate.map(_.name) -> "indices"}") + context.self ! MigrateNextIndex(indicesToMigrate) + migratingIndices(forceReindex) + + case EvolveDocuments(indices) => + val indicesToMigrate = resolveElasticIndices(indices) + log.info(s"${"startingIndicesEvolution" -> "tag"} for ${indicesToMigrate.map(_.name) -> "indices"}") + context.self ! EvolveNextIndex + evolvingDocuments(indicesToMigrate) + + case RunBatchUpdates(index, batchUpdateName) => + resolveElasticIndices(Seq(index)).headOption match { + case Some(elasticIndex) => + elasticIndex.batchUpdates.find(_.job == batchUpdateName) match { + case Some(batchUpdate) => + val batchUpdates = elasticClient.execute(count(elasticIndex.name)).flatMap { countResponse => + val count = countResponse.result.count + + import elasticIndex.{latestTraitIndexable, versionedHitReader} + + Source + .fromPublisher(elasticClient.publisher(search(elasticIndex.name) keepAlive "1m")) + .via(RateLogger(s"runningBatchUpdates|${elasticIndex.name}|$batchUpdateName", total = Some(count -> count))) + .groupedWithin(1000, 3.seconds) + .mapAsyncUnordered(8) { hits => + val bulkRequest = hits.map(hit => + // use indexInto instead of update to remove removed fields + indexInto(elasticIndex.name) id hit.id source batchUpdate.update(hit.to[elasticIndex.Latest]) + ) + elasticClient.execute(bulk(bulkRequest)) + } + .map(response => response.result.failures.toList match { + case firstError :: _ => + throw new Exception(s"some bulk inserts failed, first failure: $firstError") + case Nil => + response.result.successes.length + }) + .runWith(Sink.ignore) + } + + context.pipeToSelf(batchUpdates)(BatchUpdatesFinished) + runningBatchUpdates(elasticIndex, batchUpdateName) + + case None => + log.error(s"${"batchUpdateNotFound" -> "tag"} on ${elasticIndex.name -> "index"} $batchUpdateName") + Behaviors.same + } + case None => + log.error(s"${"indexNotFound" -> "tag"} $index $batchUpdateName") + Behaviors.same + } + } + + def runningBatchUpdates(index: Index, batchUpdate: String): Behaviors.Receive[Command] = Behaviors.receiveMessagePartial { + case BatchUpdatesFinished(Success(_)) => + log.info(s"${"batchUpdatesFinished" -> "tag"} of ${index.name -> "index"} for $batchUpdate") + idle + + case BatchUpdatesFinished(Failure(exception)) => + log.info(s"${"batchUpdatesFailed" -> "tag"} of ${index.name -> "index"} for $batchUpdate with $exception") + idle + } + + def migratingIndices(forceReindex: Boolean): Behaviors.Receive[Command] = { + def createIndexWithMappings(index: Index, indexName: String) = { + createIndex(indexName) + .mapping( + MappingDefinition(meta = Map( + mappingsHashField -> index.mappingsHash, + analysisHashField -> index.analysisHash, + ), properties = index.mappings :+ KeywordField(Index.discriminator)) + ) + .analysis(index.analysis) + } + + def default: Behaviors.Receive[Command] = Behaviors.receiveMessagePartial { + case MigrateNextIndex(pendingIndices) => + pendingIndices.toList match { + case Nil => + log.info(s"${"allIndicesMigrated" -> "tag"}") + idle + case index :: updatedPendingIndices => + 
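+ // migration scheme: the physical index is named `<alias>-<timestamp>` while `index.name` itself only ever exists as an alias pointing at it.
+ // a migration reindexes into a fresh timestamped index, flips the alias and finally deletes the old index; the mappings/analysis
+ // hashes stored in the mapping meta are compared below to decide whether a reindex is needed at all.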
val newIndexName = s"${index.name}-${LocalDateTime.now.format(Index.indexNameSuffixFormatter)}" + + val eventualCommand = elasticClient.execute(getIndex(index.name)).flatMap { + case RequestFailure(_, _, _, error) => + if (error.`type` != "index_not_found_exception") Future.failed(error.asException) + else { + log.info(s"${"create" -> "tag"} ${index.name -> "index"}") + for { + _ <- elasticClient.execute(createIndexWithMappings(index, newIndexName)).map(_.result) + _ <- elasticClient.execute(addAlias(index.name, newIndexName)).map(_.result) + } yield MigrateNextIndex(updatedPendingIndices) + } + + case RequestSuccess(_, _, _, result) => + val (existingIndexOriginalName, existingIndex) = result.head + + lazy val migrateIndex = Future.successful(MigrateIndex(index, existingIndexOriginalName, newIndexName, updatedPendingIndices)) + val maybeHashes = for { + analysisHash <- existingIndex.mappings.meta.get(analysisHashField) + mappingsHash <- existingIndex.mappings.meta.get(mappingsHashField) + } yield analysisHash -> mappingsHash + + maybeHashes match { + case _ if forceReindex => + log.warn(s"${"forcingReindex" -> "tag"} of ${index.name -> "index"}") + migrateIndex + + case Some((existingAnalysisHash, existingMappingsHash)) => + // it's possible but hard to check whether mappings have only been added (in which case a reindex would not be necessary), or existing have changed. we just take the easy route... + if (existingAnalysisHash != index.analysisHash || existingMappingsHash != index.mappingsHash) + migrateIndex + else { + log.info(s"${"skippingMigration" -> "tag"} of ${index.name -> "index"}") + Future.successful(MigrateNextIndex(updatedPendingIndices)) + } + + case None => + log.warn(s"${"missingMetaData" -> "tag"} in ${index.name -> "index"}, initial migration?") + migrateIndex + } + } + + context.pipeToSelf(eventualCommand)(_.fold(e => IndexMigrationFailed(index, e), identity)) + Behaviors.same + } + + // not used any more as we do a full migration in any case + case AddMappings(index, mappings, pendingIndices) => + log.info(s"${"addingMappings" -> "tag"} to ${index.name -> "index"} $mappings") + context.pipeToSelf( + elasticClient.execute(putMapping(index.name) properties mappings meta Map(mappingsHashField -> index.mappingsHash)).map(_.result).map(_ => Done) + )(triedDone => MappingsAdded(triedDone)) + addingMappings(index, pendingIndices) + + case MigrateIndex(index, oldIndexName, newIndexName, pendingIndices) => + log.info(s"${"migratingIndex" -> "tag"} ${index.name -> "index"} from $oldIndexName to $newIndexName") + val eventualCommand = for { + _ <- elasticClient.execute(createIndexWithMappings(index, newIndexName)).map(_.result) + result <- elasticClient.execute(reindex(oldIndexName, newIndexName) waitForCompletion false shouldStoreResult true).map(_.result) + } yield result match { + case Right(createTaskResponse) => + IndexMigrationStarted(createTaskResponse.nodeId, createTaskResponse.taskId) + + // this case should not be called due to waitForCompletion=false + case Left(reindexResponse) => + log.info(s"${"indexMigrationFinished" -> "tag"} with ${reindexResponse -> "stats"}") + IndexMigrated + } + + context.pipeToSelf(eventualCommand)(_.fold(e => IndexMigrationFailed(index, e), identity)) + + migratingIndex(index, oldIndexName, newIndexName, pendingIndices) + + case IndexMigrationFailed(index, exception) => + log.error(s"${"indexMigrationFailed" -> "tag"} of ${index.name -> "index"} with $exception") + idle + } + + def addingMappings(index: Index, pendingIndices: Seq[Index]) = 
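+ // transitional state for the in-place AddMappings path; currently unreachable, since changed mappings always trigger a full reindex (see comment above)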
Behaviors.receiveMessagePartial[Command] { + case MappingsAdded(Success(_)) => + log.info(s"${"mappingsAdded" -> "tag"} to ${index.name -> "index"}") + context.self ! MigrateNextIndex(pendingIndices) + migratingIndices(forceReindex) + + case MappingsAdded(Failure(exception)) => + log.info(s"${"addingMappingsFailed" -> "tag"} to ${index.name -> "index"} with $exception, aborting") + idle + } + + def migratingIndex(index: Index, oldIndexName: String, newIndexName: String, pendingIndices: Seq[Index]) = Behaviors.withTimers[Command](timerScheduler => Behaviors.receiveMessagePartial { + case IndexMigrationStarted(nodeId, taskId) => + log.info(s"${"indexMigrationQueued" -> "tag"} of ${index.name -> "index"} at $nodeId with $taskId") + timerScheduler.startTimerAtFixedRate(CheckTaskCompletion(nodeId, taskId), 1.second) + Behaviors.same + + case IndexMigrationFailed(index, exception) => + log.error(s"${"indexMigrationFailed" -> "tag"} of ${index.name -> "index"} with $exception") + idle + + case CheckTaskCompletion(nodeId, taskId) => + val eventualTaskResponse = elasticClient.execute(getTask(nodeId, taskId)).map(_.result) + context.pipeToSelf(eventualTaskResponse)(TaskStatus) + Behaviors.same + + case TaskStatus(Success(getTaskResponse)) => + val status = getTaskResponse.task.status + val total = status.total + val left = status.total - status.created + val percent = ((status.created.toDouble / total) * 100).toLong + val rate = status.created / Math.max(1, getTaskResponse.task.runningTime.toSeconds) + val secondsRemaining = left / Math.max(1, rate) + + log.debug(s"${"indexMigrationStatus" -> "tag"} of ${index.name -> "index"} $total $left $rate/s $secondsRemaining ${percent -> "done"}%") + if (getTaskResponse.completed) context.self ! IndexMigrated + Behaviors.same + + case TaskStatus(Failure(exception)) => + log.error(s"${"getTaskStatusFailed" -> "tag"} of ${index.name -> "index"} with $exception, aborting") + idle + + case IndexMigrated => + log.info(s"${"indexMigrated" -> "tag"} ${index.name -> "index"} from $oldIndexName to $newIndexName") + + val aliasUpdated = for { + _ <- elasticClient.execute(addAlias(index.name, newIndexName)).map(_.result) + _ <- elasticClient.execute(removeAlias(index.name, oldIndexName)).map(_.result) + } yield Done + + context.pipeToSelf(aliasUpdated)(AliasUpdated) + Behaviors.same + + case AliasUpdated(Success(_)) => + log.info(s"${"aliasUpdated" -> "tag"} of ${index.name -> "index"} from $oldIndexName to $newIndexName") + context.pipeToSelf(elasticClient.execute(deleteIndex(oldIndexName)).map(_.result).map(_ => Done))(OldIndexDeleted) + Behaviors.same + + case AliasUpdated(Failure(exception)) => + log.error(s"${"aliasUpdateFailed" -> "tag"} of ${index.name -> "index"} from $oldIndexName to $newIndexName with $exception, aborting") + idle + + case OldIndexDeleted(Success(_)) => + log.info(s"${"oldIndexDeleted" -> "tag"} of ${index.name -> "index"} $oldIndexName") + context.self ! 
MigrateNextIndex(pendingIndices) + migratingIndices(forceReindex) + + case OldIndexDeleted(Failure(exception)) => + log.error(s"${"deletingOldIndexFailed" -> "tag"} of ${index.name -> "index"} $oldIndexName with $exception, aborting") + idle + }) + + default + } + + def evolvingDocuments(pendingIndices: Seq[Index]): Behaviors.Receive[Command] = Behaviors.receiveMessagePartial { + case EvolveNextIndex => + pendingIndices.toList match { + case Nil => + log.info(s"${"allIndicesEvolved" -> "tag"}") + idle + + case index :: updatedPendingIndices => + val counts = for { + needEvolution <- elasticClient.execute(count(index.name) query not(termQuery(Index.discriminator, index.latestVersion))) + alreadyEvolved <- elasticClient.execute(count(index.name) query termQuery(Index.discriminator, index.latestVersion)) + } yield needEvolution -> alreadyEvolved + + val indexEvolved = counts.flatMap { + case (needEvolutionCountResponse, alreadyEvolvedCountResponse) => + val documentsToEvolve = needEvolutionCountResponse.result.count + val alreadyEvolvedDocuments = alreadyEvolvedCountResponse.result.count + + if (documentsToEvolve > 0) { + log.info(s"${"evolvingDocuments" -> "tag"} of ${index.name -> "index"} with $documentsToEvolve to ${index.latestVersion -> "latestDocumentVersion"} having $alreadyEvolvedDocuments") + import index.{latestTraitIndexable, versionedHitReader} + + Source + .fromPublisher(elasticClient.publisher(search(index.name) query not(termQuery(Index.discriminator, index.latestVersion)) keepAlive "1m")) + .via(RateLogger(s"evolvingDocuments|${index.name}", total = Some(documentsToEvolve -> (documentsToEvolve + alreadyEvolvedDocuments)))) + .groupedWithin(1000, 3.seconds) + .mapAsyncUnordered(8) { hits => + val bulkRequest = hits.map(hit => + // use indexInto instead of update to remove removed fields + indexInto(index.name) id hit.id source hit.to[index.Latest] + ) + elasticClient.execute(bulk(bulkRequest)) + } + .map(response => response.result.failures.toList match { + case firstError :: _ => + throw new Exception(s"some bulk inserts failed, first failure: $firstError") + case Nil => + response.result.successes.length + } + ) + .runWith(Sink.ignore) + } else { + log.info(s"${"skippingDocumentsEvolution" -> "tag"} of ${index.name -> "index"} with $alreadyEvolvedDocuments at ${index.latestVersion -> "latestDocumentVersion"}") + Future.successful(Done) + } + }.map(_ => updatedPendingIndices) + context.pipeToSelf(indexEvolved)(tried => DocumentsEvolved(index, tried)) + Behaviors.same + } + + case DocumentsEvolved(index, Success(pendingIndices)) => + log.info(s"${"indexEvolved" -> "tag"} of ${index.name -> "index"}") + + context.self ! EvolveNextIndex + evolvingDocuments(pendingIndices) + + case DocumentsEvolved(index, Failure(exception)) => + log.error(s"${"indexEvolutionFailed" -> "tag"} of ${index.name -> "index"} with $exception, aborting") + idle + } + + context.self ! 
MigrateIndices(Nil, forceReindex = false) + + idle + } + } + + private val mappingsHashField = "mappingHash" + + private val analysisHashField = "analysisHash" + + override val name = "elastic-evolver" + + override val commandSerializer = CirceSerializer() +} diff --git a/elastic/core/src/main/scala/mu/moin/elastic/evolution/HitMeta.scala b/elastic/core/src/main/scala/mu/moin/elastic/evolution/HitMeta.scala new file mode 100644 index 0000000..c9bf96c --- /dev/null +++ b/elastic/core/src/main/scala/mu/moin/elastic/evolution/HitMeta.scala @@ -0,0 +1,5 @@ +package mu.moin.elastic.evolution + +trait HitMeta { + val elasticId: ElasticId +} diff --git a/elastic/core/src/main/scala/mu/moin/elastic/evolution/Index.scala b/elastic/core/src/main/scala/mu/moin/elastic/evolution/Index.scala new file mode 100644 index 0000000..434aaff --- /dev/null +++ b/elastic/core/src/main/scala/mu/moin/elastic/evolution/Index.scala @@ -0,0 +1,157 @@ +package mu.moin.elastic.evolution + +import com.github.dwickern.macros.NameOf.qualifiedNameOf +import com.github.dwickern.macros.NameOfImpl +import com.sksamuel.elastic4s.ElasticDsl._ +import com.sksamuel.elastic4s._ +import com.sksamuel.elastic4s.analysis.Analysis +import com.sksamuel.elastic4s.circe._ +import com.sksamuel.elastic4s.fields.ElasticField +import com.sksamuel.elastic4s.requests.searches.SearchRequest +import com.sksamuel.elastic4s.requests.update.UpdateRequest +import io.circe.generic.extras.Configuration +import io.circe.syntax.EncoderOps +import io.circe.{Codec, Json} +import mu.moin.elastic.evolution.Index.BatchUpdate +import net.sc8s.schevo.circe.SchevoCirce + +import java.time.format.DateTimeFormatter +import scala.concurrent.Future +import scala.language.experimental.macros +import scala.reflect.runtime.universe.{TypeTag, typeOf} + +abstract class Index( + // baseName without prefixes; should not be accessible from outside, to avoid accidental access of non-prefixed indices + baseName: String + ) extends SchevoCirce { + + val indexSetup: IndexSetup + + final lazy val name = s"${indexSetup.indexNamePrefix.getOrElse("")}$baseName" + + // mix in StringId for defaults + type Id + /* don't do this, as Id could be an "external", non-extendable class + type Id <: { + val hitId: String + } + */ + + override type Latest <: LatestT with Version { + // the id occurs twice: in hit.id and inside the document itself. originally the id was saved only in hit.id, but for e.g. JsonIds this makes subsets of the id non-queryable (as elastic handles the JsonId as a plain String), so we live with the duplication + val id: Id + } + + // mappings deletion is not supported (but can still happen if a reindex occurs due to a changed mapping) + val mappings = Seq.empty[ElasticField] + + val analysis = Analysis(Nil) + + val batchUpdates = Seq.empty[BatchUpdate[Latest]] + + final lazy val mappingsHash = mappings.toString.hashCode.toString + + final lazy val analysisHash = analysis.toString.hashCode.toString + + def hitIdFromId(id: Id): Json + + lazy implicit val latestTraitIndexable: Indexable[Latest] = indexableWithCirce(implicitly) + + lazy implicit val versionedHitReader: HitReader[Latest] = hitReaderWithCirce(codec) + // copy & paste this into every concrete index; unclear how to define it generically in here (macros?)
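+ // hypothetical concrete index, mirroring IndexSpec further down in this diff (names like UserIndex/UserV1/UserV2 are invented):
+ //   class UserIndex(implicit val indexSetup: IndexSetup) extends Index.StringId("users") {
+ //     override type LatestCaseClass = UserV2
+ //     sealed trait Version extends VersionT
+ //     sealed trait Latest extends LatestT with Version { val id: String; val name: String }
+ //     case class UserV2(id: String, name: String) extends Latest {
+ //       override def caseClass = this
+ //       override def evolve = this
+ //     }
+ //     case class UserV1(id: String, firstName: String, lastName: String) extends Version {
+ //       override def evolve = UserV2(id, s"$firstName $lastName").evolve
+ //     }
+ //     override val latestVersion = latestVersionHelper[LatestCaseClass]
+ //     override implicit val codec: Codec[Latest] = evolvingCodec {
+ //       import io.circe.generic.extras.auto._
+ //       deriveConfiguredCodec[Version]
+ //     }
+ //   }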
+ //override val latestVersion = latestVersionHelper[LatestCaseClass] + val latestVersion: String + + def latestVersionHelper[T <: LatestCaseClass : TypeTag] = typeOf[T].typeSymbol.name.decodedName.toString + + implicit val configuration = Index.configuration + + implicit val codec: Codec[Latest] + + import indexSetup.actorSystem.executionContext + import indexSetup.elasticClient + + private def execute[T, U](t: T)(implicit + handler: Handler[T, U], + manifest: Manifest[U], + options: CommonRequestOptions + ): Future[U] = elasticClient.execute(t).map(_.result) + + def encodeId(id: Id) = hitIdFromId(id).noSpacesSortKeys + + def index(latest: Latest) = execute(indexRequest(latest)) + + def indexRequest(latest: Latest) = { + import scala.language.reflectiveCalls + indexInto(name) id encodeId(latest.id) doc latest refresh indexSetup.refreshPolicy + } + + def get(id: Id): Future[Option[Latest]] = + execute(getRequest(id)).map(_.toOpt[Latest]) + + private def getRequest(id: Id) = ElasticDsl.get(name, encodeId(id)) + + def delete(id: Id) = + execute(deleteRequest(id)) + + def deleteAll() = + execute(deleteAllRequest()) + + def deleteRequest(id: Id) = deleteById(name, encodeId(id)) refresh indexSetup.refreshPolicy + + def deleteAllRequest() = deleteByQuery(name, matchAllQuery()) refresh indexSetup.refreshPolicy + + def update(id: Id, transformUpdateRequest: UpdateRequest => UpdateRequest) = execute( + updateRequest(id, transformUpdateRequest) + ) + + def updateRequest(id: Id, transformRequest: UpdateRequest => UpdateRequest) = transformRequest(updateById(name, encodeId(id)) refresh indexSetup.refreshPolicy) + + def updateField(id: Id, field: Latest => Any, value: Any) = + execute(updateFieldRequest(id, field, value)) + + def updateFieldRequest(id: Id, field: Latest => Any, value: Any) = updateRequest(id, _ doc qualifiedNameOf[Latest](field) -> value) + + def fieldName(expr: Latest => Any): String = macro NameOfImpl.qualifiedNameOf + + def search(searchRequest: SearchRequest => SearchRequest = identity) = execute(searchRequest(ElasticDsl.search(name))).map(_.hits.hits.toSeq.map(_.to[Latest])) + + def searchHits(searchRequest: SearchRequest => SearchRequest = identity) = execute(searchRequest(ElasticDsl.search(name))).map(_.hits.hits.toSeq.map(hit => hit -> hit.to[Latest])) + + def searchResponse(searchRequest: SearchRequest => SearchRequest = identity) = execute(searchRequest(ElasticDsl.search(name))) +} + +object Index { + val indexNameSuffixFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss_SSS") + + val discriminator = "class" + implicit val configuration = Configuration.default.withDiscriminator(discriminator) + + case class BatchUpdate[T](job: String, update: T => T) + + abstract class StringId(baseName: String) extends Index(baseName) { + type Id = String + + override def hitIdFromId(id: Id) = id.asJson + } + + abstract class LongId(baseName: String) extends Index(baseName) { + type Id = Long + + override def hitIdFromId(id: Id) = id.asJson + } + + abstract class JsonId(baseName: String) extends Index(baseName) { + implicit val idCodec: Codec[Id] + + override def hitIdFromId(id: Id) = id.asJson + } + + abstract class ExternalJsonId[IdT](baseName: String)(override implicit val idCodec: Codec[IdT]) extends JsonId(baseName) { + type Id = IdT + } + + implicit class KeywordSuffix(field: String) { + def keyword = field + ".keyword" + } +} \ No newline at end of file diff --git a/elastic/core/src/main/scala/mu/moin/elastic/evolution/IndexSetup.scala 
b/elastic/core/src/main/scala/mu/moin/elastic/evolution/IndexSetup.scala new file mode 100644 index 0000000..ead3c2d --- /dev/null +++ b/elastic/core/src/main/scala/mu/moin/elastic/evolution/IndexSetup.scala @@ -0,0 +1,15 @@ +package mu.moin.elastic.evolution + +import akka.actor.typed.ActorSystem +import com.sksamuel.elastic4s.ElasticClient +import com.sksamuel.elastic4s.requests.common.RefreshPolicy + +case class IndexSetup( + elasticClient: ElasticClient, + actorSystem: ActorSystem[_], + // mainly for tests + indexNamePrefix: Option[String] = None, + refreshImmediately: Boolean = false + ) { + val refreshPolicy = if (refreshImmediately) RefreshPolicy.Immediate else RefreshPolicy.None +} diff --git a/elastic/core/src/main/scala/mu/moin/elastic/evolution/package.scala b/elastic/core/src/main/scala/mu/moin/elastic/evolution/package.scala new file mode 100644 index 0000000..bd938d9 --- /dev/null +++ b/elastic/core/src/main/scala/mu/moin/elastic/evolution/package.scala @@ -0,0 +1,5 @@ +package mu.moin.elastic + +package object evolution { + type ElasticId = String +} diff --git a/elastic/core/src/test/scala/mu/moin/elastic/evolution/ElasticIndexTesting.scala b/elastic/core/src/test/scala/mu/moin/elastic/evolution/ElasticIndexTesting.scala new file mode 100644 index 0000000..a73ad07 --- /dev/null +++ b/elastic/core/src/test/scala/mu/moin/elastic/evolution/ElasticIndexTesting.scala @@ -0,0 +1,83 @@ +package mu.moin.elastic.evolution + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.scaladsl.adapter.TypedActorSystemOps +import cats.implicits.toTraverseOps +import com.sksamuel.elastic4s.ElasticClient +import com.sksamuel.elastic4s.ElasticDsl._ +import com.sksamuel.elastic4s.akka.{AkkaHttpClient, AkkaHttpClientSettings} +import com.sksamuel.elastic4s.requests.mappings.MappingDefinition +import org.scalatest.Inspectors.forAll +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, EitherValues, Suite} + +import scala.util.Random + +trait ElasticIndexTesting extends BeforeAndAfterEach with BeforeAndAfterAll with EitherValues { + _: Suite with ScalaTestWithActorTestKit => + + val elasticIndices: Set[Index] + + implicit lazy val elasticClient = ElasticClient(AkkaHttpClient(AkkaHttpClientSettings(Seq("localhost:9210")))(system.toClassic)) + + implicit lazy val indexSetup: IndexSetup = IndexSetup( + elasticClient, + system, + Some(s"test-${Random.alphanumeric.take(8).mkString.toLowerCase}_").filter(_ => createTemporaryIndices), + refreshImmediately = true + ) + + // set to false when e.g. 
querying a non-local environment + val createTemporaryIndices = true + + // set to true to recreate indices instead of just deleting all documents; deleting documents is faster but might lead to changing scores + // note: neither refresh nor forceMerge helped + val recreateIndices = false + + override protected def beforeAll() = { + if (createTemporaryIndices && !recreateIndices) { + implicit val executionContext = system.executionContext + forAll(elasticIndices.map(index => + elasticClient.execute( + createIndex(index.name) + .mapping(MappingDefinition(properties = index.mappings)) + .analysis(index.analysis) + )).toList.sequence.futureValue + )(_.result.acknowledged shouldBe true) + } + super.beforeAll() + } + + override protected def beforeEach() = { + if (createTemporaryIndices) { + implicit val executionContext = system.executionContext + if (recreateIndices) + forAll( + elasticIndices.map(recreateIndex).toList.sequence.futureValue + )( + _.result.acknowledged shouldBe true + ) + else + forAll( + elasticIndices.map(_.deleteAll()).toList.sequence.futureValue + )( + _.isLeft shouldBe true + ) + } + super.beforeEach() + } + + protected override def afterAll() = { + if (createTemporaryIndices) + elasticClient.execute(deleteIndex(elasticIndices.map(_.name))) + super.afterAll() + } + + def recreateIndex(index: Index) = { + elasticClient.execute(deleteIndex(index.name)).futureValue + elasticClient.execute( + createIndex(index.name) + .mapping(MappingDefinition(properties = index.mappings)) + .analysis(index.analysis) + ) + } +} diff --git a/elastic/core/src/test/scala/mu/moin/elastic/evolution/EvolverSpec.scala b/elastic/core/src/test/scala/mu/moin/elastic/evolution/EvolverSpec.scala new file mode 100644 index 0000000..760b0a0 --- /dev/null +++ b/elastic/core/src/test/scala/mu/moin/elastic/evolution/EvolverSpec.scala @@ -0,0 +1,348 @@ +//package mu.moin.elastic.evolution + +// TODO disabled for now as it sometimes fails in GitLab without any error, possibly an issue with mixing scalatest and specs2? it should probably be moved into a separate repo anyway... 
+/* +import java.time.LocalDateTime +import java.time.temporal.ChronoUnit + +import akka.actor.ActorSystem +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.scaladsl.adapter._ +import com.sksamuel.elastic4s.ElasticClient +import com.sksamuel.elastic4s.ElasticDsl.{RichFuture => _, _} +import com.sksamuel.elastic4s.akka.{AkkaHttpClient, AkkaHttpClientSettings} +import com.sksamuel.elastic4s.circe._ +import com.sksamuel.elastic4s.requests.indexes.Field +import com.sksamuel.elastic4s.requests.mappings.{FieldDefinition, KeywordField, TextField} +import com.typesafe.config.ConfigFactory +import io.circe.generic.extras.semiauto._ +import io.circe.parser._ +import io.circe.syntax._ +import io.circe.{Decoder, Encoder, Json} +import mu.moin.elastic.evolution.Evolver.{EvolveDocuments, MigrateIndices, RunBatchUpdates} +import mu.moin.elastic.evolution.EvolverSpec._ +import mu.moin.elastic.evolution.Index.BatchUpdate +import mu.moin.schemaevolution.Schema +import org.scalatest.Inspectors._ +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} + +class EvolverSpec extends AkkaTypedSpec with Matchers with Eventually { + + private val elasticClient = ElasticClient(AkkaHttpClient(AkkaHttpClientSettings(Seq(sys.env.getOrElse("ELASTIC_URL", "localhost:9210"))))) + + private val testIndices = Seq(TestIndex1, TestIndex2) + + "Evolver" should { + "create indices, mappings and aliases" in new EvolverContext { + evolver ! MigrateIndices(testIndices.map(_.name), forceReindex = false) + + eventually { + val response = elasticClient.execute(catAliases()).awaitResult + response.result.map(_.alias) should contain theSameElementsAs testIndices.map(_.name) + forAll(response.result.map(_.index)) { index => + val _index :: dateTime :: Nil = index.split("-", 2).toList + testIndices.map(_.name) should contain(_index) + + LocalDateTime.parse(dateTime, Index.indexNameSuffixFormatter).until(LocalDateTime.now, ChronoUnit.SECONDS) should be <= 10L + } + } + + elasticClient.execute(getIndex(TestIndex1.name)).awaitResult.result.head._2.mappings.properties shouldBe Map( + "name" -> Field(Some("keyword")), + "class" -> Field(Some("keyword")) + ) + } + "keep existing aliases and indices" in new EvolverContext { + evolver ! MigrateIndices(testIndices.map(_.name), forceReindex = false) + + val existingAliases = eventually { + val response = elasticClient.execute(catAliases()).awaitResult + response.result should have length testIndices.length + response.result + } + + evolver ! MigrateIndices(testIndices.map(_.name), forceReindex = false) + process() + + elasticClient.execute(catAliases()).awaitResult.result shouldBe existingAliases + } + + def addDocumentV2() = { + elasticClient.execute(indexInto(TestIndex1.name) source TestIndex1.DocumentV2("name1").asJson { + import Index.configuration + deriveConfiguredEncoder[EvolverSpec.TestIndex1.DocumentV2] + }).awaitResult + eventually(elasticClient.execute(count(TestIndex1.name)).awaitResult.result.count shouldBe 1) + } + + "migrate to updated mapping by reindexing" in new EvolverContext(TestIndex1) { + evolver ! 
MigrateIndices(Seq(TestIndex1.name), forceReindex = false) + val originalIndex = eventually { + val aliases = elasticClient.execute(catAliases()).awaitResult.result.find(_.alias == TestIndex1.name) + aliases should not be empty + aliases.get.index + } + addDocumentV2() + + spawnEvolver(TestIndex1MappingUpdated) ! MigrateIndices(Seq(TestIndex1MappingUpdated.name), forceReindex = false) + + eventually { + elasticClient.execute(catAliases()).awaitResult.result.filter(_.alias == TestIndex1.name).map(_.index) should not contain originalIndex + val mappings = elasticClient.execute(getIndex(TestIndex1.name)).awaitResult.result.head._2.mappings + mappings.properties shouldBe Map("name" -> Field(Some("text")), "class" -> Field(Some("keyword"))) + mappings.meta.get(Evolver.mappingsHashField) should contain(TestIndex1MappingUpdated.mappingsHash) + elasticClient.execute(count(TestIndex1.name)).awaitResult.result.count shouldBe 1 + } + } + "migrate by adding mapping" in new EvolverContext(TestIndex1) { + evolver ! MigrateIndices(Nil, forceReindex = false) + val originalIndex = eventually { + val aliases = elasticClient.execute(catAliases()).awaitResult.result.find(_.alias == TestIndex1.name) + aliases should not be empty + aliases.get.index + } + + addDocumentV2() + + spawnEvolver(TestIndex1MappingAdded) ! MigrateIndices(Nil, forceReindex = false) + + eventually { + elasticClient.execute(catAliases()).awaitResult.result.filter(_.alias == TestIndex1.name).map(_.index) should contain(originalIndex) + val mappings = elasticClient.execute(getIndex(TestIndex1.name)).awaitResult.result.head._2.mappings + mappings.properties shouldBe Map("name" -> Field(Some("keyword")), "added" -> Field(Some("keyword")), "class" -> Field(Some("keyword"))) + mappings.meta.get(Evolver.mappingsHashField) should contain(TestIndex1MappingAdded.mappingsHash) + elasticClient.execute(count(TestIndex1.name)).awaitResult.result.count shouldBe 1 + } + } + "skip migration of mappings" in new EvolverContext(TestIndex1) { + evolver ! MigrateIndices(Nil, forceReindex = false) + + eventually(elasticClient.execute(catAliases()).awaitResult.result should not be empty) + elasticClient.execute(putMapping(TestIndex1.name) fields Seq(KeywordField("deleted"))).awaitResult + evolver ! MigrateIndices(Nil, forceReindex = false) + process() + + elasticClient.execute(getIndex(TestIndex1.name)).awaitResult.result.head._2.mappings.properties === Map("name" -> Field(Some("keyword")), "deleted" -> Field(Some("keyword"))) + } + "evolve documents with older version" in new EvolverContext(IndexV1) { + evolver ! MigrateIndices(Nil, forceReindex = false) + eventually(elasticClient.execute(catAliases()).awaitResult.result should not be empty) + + val document1_V1 = IndexV1.DocumentV1("first1", "last1", Some(1)) + val document2_V2 = Json.obj( + "class" -> IndexV2.latestVersion.asJson, + "skip this" -> "otherwise it would fail evolution".asJson + ) + val document3_V1 = IndexV1.DocumentV1("first3", "last3", Some(3)) + + elasticClient.execute(indexInto(IndexV1.name) source document1_V1.latestTrait).awaitResult + elasticClient.execute(indexInto(IndexV1.name) source document2_V2).awaitResult + elasticClient.execute(indexInto(IndexV1.name) source document3_V1.latestTrait).awaitResult + eventually(elasticClient.execute(count(IndexV1.name)).awaitResult.result.count shouldBe 3) + + spawnEvolver(IndexV2) ! 
EvolveDocuments(Nil) + + val document1_V2 = IndexV2.DocumentV1(document1_V1.firstName, document1_V1.lastName, None).migrate + val document3_V2 = IndexV2.DocumentV1(document3_V1.firstName, document3_V1.lastName, None).migrate + + eventually(elasticClient.execute(search(IndexV2.name)).awaitResult.result.hits.hits.map(_.sourceAsString).map(parse(_).right.get) should contain theSameElementsAs Seq( + document1_V2.latestTrait.asJson, + document2_V2, + document3_V2.latestTrait.asJson + )) + } + "run batch updates" in new EvolverContext(IndexV1) { + evolver ! MigrateIndices(Nil, forceReindex = false) + eventually(elasticClient.execute(catAliases()).awaitResult.result should not be empty) + + val document = IndexV1.DocumentV1("first1", "last1", Some(1)) + elasticClient.execute(indexInto(IndexV1.name) source document.latestTrait).awaitResult + eventually(elasticClient.execute(count(IndexV1.name)).awaitResult.result.count shouldBe 1) + + evolver ! RunBatchUpdates(IndexV1.name, "clearAge") + + eventually(elasticClient.execute(search(IndexV2.name)).awaitResult.result.hits.hits.map(_.sourceAsString).map(parse(_).right.get) should contain theSameElementsAs Seq( + document.latestTrait.asJson.mapObject(_.remove("age")) + )) + } + } + + private def deleteAllIndicesAndAliases() = + elasticClient.execute(catAliases()).flatMap(response => Future.sequence( + response.result.map(aliasResponse => elasticClient.execute(removeAlias(aliasResponse.alias, aliasResponse.index))) + )).flatMap(_ => + elasticClient.execute(catIndices()).flatMap(response => elasticClient.execute(deleteIndex(response.result.map(_.index): _*))) + ).awaitResult + + //private def cleanAllIndices() = elasticClient.execute(catIndices()).flatMap(response => elasticClient.execute(clearIndex(response.result.map(_.index)))).awaitResult + // + + class EvolverContext(indices: Index*) { + def spawnEvolver(indices: Index*) = testKit.spawn(Evolver(elasticClient, if (indices.isEmpty) testIndices else indices)) + + val evolver = spawnEvolver(indices: _ *) + + deleteAllIndicesAndAliases() + } + +} + +object EvolverSpec { + trait TestIndex1Base extends Index { + override val name = "test_index1" + + override val latestVersion = latestVersionHelper[LatestCaseClass] + + override val mappings: Seq[FieldDefinition] = Seq(KeywordField("name")) + + sealed trait Versioned extends Schema.Versioned { + override type LatestTrait = Document + } + + sealed trait Document extends Versioned with Schema.Latest { + override type LatestCaseClass = DocumentV3 + + val name: String + val deleted: Boolean + } + + case class DocumentV3(name: String, deleted: Boolean) extends Document { + override def migrate = this + + override def caseClass = this + } + + case class DocumentV2(name: String) extends Versioned { + override def migrate = DocumentV3(name, deleted = false).migrate + } + + case class DocumentV1(firstName: String, lastName: String) extends Versioned { + override def migrate = DocumentV2(s"$firstName $lastName").migrate + } + override implicit val versionedDecoder: Decoder[Versioned] = deriveConfiguredDecoder + override implicit val latestInterfaceEncoder: Encoder[Document] = deriveConfiguredEncoder + } + + object TestIndex1 extends TestIndex1Base + + object TestIndex1MappingUpdated extends TestIndex1Base { + override val mappings: Seq[FieldDefinition] = Seq(TextField("name")) + } + + object TestIndex1MappingAdded extends TestIndex1Base { + override val mappings = Seq(KeywordField("name"), KeywordField("added")) + } + + object TestIndex2 extends Index { + override val 
name = "test_index2" + + override val latestVersion = latestVersionHelper[LatestCaseClass] + + case class Document(parameter: String) extends Schema.Versioned with Schema.Latest { + + override type LatestTrait = Document + override type LatestCaseClass = Document + + override def migrate = this + + override def caseClass = this + } + override type Versioned = Document + override implicit val versionedDecoder: Decoder[Versioned] = deriveConfiguredDecoder + override implicit val latestInterfaceEncoder: Encoder[Document] = deriveConfiguredEncoder + } + + object IndexV1 extends Index { + override val name = "index" + + override val latestVersion = latestVersionHelper[LatestCaseClass] + + sealed trait Versioned extends Schema.Versioned { + override type LatestTrait = Document + } + + sealed trait Document extends Versioned with Schema.Latest { + val firstName: String + val lastName: String + val age: Option[Int] + + override type LatestCaseClass = DocumentV1 + } + + case class DocumentV1(firstName: String, lastName: String, age: Option[Int] = None) extends Document { + override def migrate: Document = this + + override def caseClass = this + } + + override val batchUpdates = Seq( + BatchUpdate("clearAge", _.caseClass.copy(age = None).latestTrait) + ) + + override implicit val versionedDecoder: Decoder[Versioned] = deriveConfiguredDecoder + override implicit val latestInterfaceEncoder: Encoder[Document] = deriveConfiguredEncoder + } + + object IndexV2 extends Index { + override val name = IndexV1.name + + override val latestVersion = latestVersionHelper[LatestCaseClass] + + override val mappings = Seq(KeywordField("name")) + + sealed trait Versioned extends Schema.Versioned { + override type LatestTrait = Document + } + + sealed trait Document extends Versioned with Schema.Latest { + val firstName: String + val lastName: String + val fullName: String + override type LatestCaseClass = DocumentV2 + } + + case class DocumentV2(firstName: String, lastName: String, fullName: String) extends Document { + override def migrate = this + + override def caseClass = this + } + + case class DocumentV1(firstName: String, lastName: String, age: Option[Int] = None) extends Versioned { + override def migrate = DocumentV2(firstName, lastName, s"$firstName $lastName").migrate + } + override implicit val versionedDecoder: Decoder[Versioned] = deriveConfiguredDecoder + override implicit val latestInterfaceEncoder: Encoder[Document] = deriveConfiguredEncoder + } +} + +abstract class AkkaTypedSpec extends ScalaTestWithActorTestKit( + // creating own ActorSystem to allow system.actorOf inside behavior (e.g. 
in elasticClient.publisher) + ActorSystem("scalatest", { + val debug = false + + ConfigFactory.parseString( + if (debug) + """ + |akka { + | log-dead-letters = 10 + | log-dead-letters-during-shutdown = on + | loglevel = "DEBUG" + |} + |""".stripMargin else "") + .withFallback(ConfigFactory.load()) + }).toTyped) with AnyWordSpecLike { + implicit val classicActorSystem: ActorSystem = system.toClassic + implicit val executionContext: ExecutionContext = system.executionContext + + def process() = Thread.sleep(3000) + + implicit class RichFuture[T](future: Future[T]) { + def awaitResult(implicit duration: Duration = 3.seconds): T = Await.result(future, duration) + } +} +*/ diff --git a/elastic/core/src/test/scala/mu/moin/elastic/evolution/IndexSpec.scala b/elastic/core/src/test/scala/mu/moin/elastic/evolution/IndexSpec.scala new file mode 100644 index 0000000..fe324bb --- /dev/null +++ b/elastic/core/src/test/scala/mu/moin/elastic/evolution/IndexSpec.scala @@ -0,0 +1,92 @@ +package mu.moin.elastic.evolution + +import akka.actor.Status.Success +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import com.sksamuel.elastic4s.{Hit, Indexable} +import com.softwaremill.macwire.wireSet +import io.circe.Codec +import io.circe.generic.extras.semiauto._ +import io.circe.syntax._ +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +class IndexSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with Matchers with ElasticIndexTesting { + private val index = new IndexSpec.Index + + import index._ + + "Index" should { + val documentV1 = DocumentV1("id", "first", "last") + val documentV2 = DocumentV2("id", "first last") + + "migrate" in { + documentV1.evolve shouldBe documentV2 + } + "index as latest" in { + def source[T](t: T)(implicit indexable: Indexable[T]) = indexable.json(t) + + source(documentV2.asLatest) === documentV2.asLatest.asJson.noSpaces + } + "hit read versioned with migration to latest" in { + versionedHitReader.read(new Hit { + override def id = "id" + + override def index = ??? + + override def version = ??? + + override def sourceAsString = documentV1.asInstanceOf[Version].asJson { + import Index.configuration + import io.circe.generic.extras.auto._ + deriveConfiguredEncoder[Version] + }.noSpaces + + override def sourceAsMap = ??? + + override def exists = ??? + + override def score = ??? + + override def seqNo = ??? + + override def primaryTerm = ??? 
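+ // only id and sourceAsString are implemented, as that is all the circe-based hit reader consumes; the remaining members stay ??? and would throw if accessed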
+ }).toString === Success(documentV1.evolve).toString + } + "latest version" in { + index.asInstanceOf[Index].latestVersion === "DocumentV2" + } + } + + override val elasticIndices = wireSet +} + +object IndexSpec { + + class Index(implicit val indexSetup: IndexSetup) extends Index.StringId("index") { + override type LatestCaseClass = DocumentV2 + + sealed trait Version extends VersionT + + sealed trait Latest extends LatestT with Version { + val id: String + val name: String + } + + case class DocumentV2(id: String, name: String) extends Latest { + override def caseClass = this + + override def evolve = this + } + + case class DocumentV1(id: String, firstName: String, lastName: String) extends Version { + override def evolve = DocumentV2(id, s"$firstName $lastName").evolve + } + + override val latestVersion = latestVersionHelper[LatestCaseClass] + + override implicit val codec: Codec[Latest] = evolvingCodec { + import io.circe.generic.extras.auto._ + deriveConfiguredCodec[Version] + } + } +} diff --git a/elastic/lagom/api/src/main/scala/mu/moin/elastic/api/ElasticService.scala b/elastic/lagom/api/src/main/scala/mu/moin/elastic/api/ElasticService.scala new file mode 100644 index 0000000..4e10868 --- /dev/null +++ b/elastic/lagom/api/src/main/scala/mu/moin/elastic/api/ElasticService.scala @@ -0,0 +1,24 @@ +package mu.moin.elastic.api + +import akka.NotUsed +import com.lightbend.lagom.scaladsl.api.transport.Method +import com.lightbend.lagom.scaladsl.api.{Descriptor, Service, ServiceCall} + +trait ElasticService extends Service { + def migrateIndices(indices: Seq[String] = Nil, forceReindex: Option[Boolean]): ServiceCall[NotUsed, NotUsed] + + def evolveDocuments(indices: Seq[String] = Nil): ServiceCall[NotUsed, NotUsed] + + def batchUpdate(index: String, job: String): ServiceCall[NotUsed, NotUsed] + + val apiPrefix: String + + abstract override def descriptor: Descriptor = super.descriptor.addCalls({ + import Service._ + Seq( + restCall(Method.POST, s"$apiPrefix/elastic/index/migrate?indices&forceReindex", migrateIndices _), + restCall(Method.POST, s"$apiPrefix/elastic/documents/evolve?indices", evolveDocuments _), + restCall(Method.POST, s"$apiPrefix/elastic/documents/batch-update?index&job", batchUpdate _), + ) + }: _ *) +} diff --git a/elastic/lagom/service/src/main/resources/logback-elastic-common.xml b/elastic/lagom/service/src/main/resources/logback-elastic-common.xml new file mode 100644 index 0000000..f18de0c --- /dev/null +++ b/elastic/lagom/service/src/main/resources/logback-elastic-common.xml @@ -0,0 +1,3 @@ + + + diff --git a/elastic/lagom/service/src/main/resources/reference.conf b/elastic/lagom/service/src/main/resources/reference.conf new file mode 100644 index 0000000..c2bb276 --- /dev/null +++ b/elastic/lagom/service/src/main/resources/reference.conf @@ -0,0 +1 @@ +com.sksamuel.elastic4s.akka.hosts = ["localhost:9210"] diff --git a/elastic/lagom/service/src/main/scala/mu/moin/elastic/ElasticComponents.scala b/elastic/lagom/service/src/main/scala/mu/moin/elastic/ElasticComponents.scala new file mode 100644 index 0000000..2dcec2c --- /dev/null +++ b/elastic/lagom/service/src/main/scala/mu/moin/elastic/ElasticComponents.scala @@ -0,0 +1,21 @@ +package mu.moin.elastic + +import akka.actor.ActorSystem +import akka.actor.typed.scaladsl.adapter._ +import com.sksamuel.elastic4s.ElasticClient +import com.sksamuel.elastic4s.akka.{AkkaHttpClient, AkkaHttpClientSettings} +import com.softwaremill.macwire.wire +import mu.moin.elastic.evolution.{Evolver, Index, IndexSetup} + +trait 
ElasticComponents { + val actorSystem: ActorSystem + + val elasticIndices: Set[Index] + + implicit val indexSetup: IndexSetup = IndexSetup(elasticClient, actorSystem.toTyped) + + lazy implicit val elasticClient = ElasticClient(AkkaHttpClient(AkkaHttpClientSettings())(actorSystem)) + + lazy val evolver: Evolver.Wiring = Evolver.init(wire[Evolver.Component])(actorSystem.toTyped) +} + diff --git a/elastic/lagom/service/src/main/scala/mu/moin/elastic/ElasticServiceImpl.scala b/elastic/lagom/service/src/main/scala/mu/moin/elastic/ElasticServiceImpl.scala new file mode 100644 index 0000000..906d391 --- /dev/null +++ b/elastic/lagom/service/src/main/scala/mu/moin/elastic/ElasticServiceImpl.scala @@ -0,0 +1,30 @@ +package mu.moin.elastic + +import akka.NotUsed +import com.lightbend.lagom.scaladsl.api.ServiceCall +import mu.moin.elastic.api.ElasticService +import mu.moin.elastic.evolution.Evolver + +import scala.concurrent.Future + +trait ElasticServiceImpl extends ElasticService { + val evolver: Evolver.Wiring + + override def migrateIndices(indices: Seq[String], forceReindex: Option[Boolean]) = ServiceCall { _ => + evolver.actorRef ! Evolver.Command.MigrateIndices( + indices, + forceReindex.getOrElse(false) + ) + Future.successful(NotUsed) + } + + override def evolveDocuments(indices: Seq[String]) = ServiceCall { _ => + evolver.actorRef ! Evolver.Command.EvolveDocuments(indices) + Future.successful(NotUsed) + } + + override def batchUpdate(index: String, job: String) = ServiceCall { _ => + evolver.actorRef ! Evolver.Command.RunBatchUpdates(index, job) + Future.successful(NotUsed) + } +} diff --git a/elastic/lagom/service/src/test/scala/mu/moin/elastic/worksheets/EsQuery.sc b/elastic/lagom/service/src/test/scala/mu/moin/elastic/worksheets/EsQuery.sc new file mode 100644 index 0000000..e011e04 --- /dev/null +++ b/elastic/lagom/service/src/test/scala/mu/moin/elastic/worksheets/EsQuery.sc @@ -0,0 +1,21 @@ +import akka.actor.ActorSystem +import com.sksamuel.elastic4s.ElasticClient +import com.sksamuel.elastic4s.ElasticDsl._ +import com.sksamuel.elastic4s.akka.{AkkaHttpClient, AkkaHttpClientSettings} + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ + +implicit val actorSystem = ActorSystem() + +//val elasticClient= ElasticClient(JavaClient(ElasticProperties("http://localhost:9210"))) + +val elasticClient = ElasticClient(AkkaHttpClient(AkkaHttpClientSettings(Seq("localhost:9210")))) +Await.result(elasticClient.execute( + putMapping("image_signatures").meta(Map("moin" -> "digga")) +), 5.minutes).result + +Await.result(elasticClient.execute( + getIndex("image_signatures") +), 5.minutes).result.head._2.mappings.meta diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 8cc6afc..78c5f1d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -5,10 +5,12 @@ object Dependencies { val scala213 = "2.13.11" val scalaTest = Def.setting("org.scalatest" %%% "scalatest" % "3.2.16") + val specs2 = Def.setting("org.scalatest" %%% "scalatest" % "3.2.16") val scalamock = "org.scalamock" %% "scalamock" % "5.2.0" % Test val slf4j = "org.slf4j" % "slf4j-api" % "2.0.7" val scalaJavaTime = Def.setting("io.github.cquiroz" %%% "scala-java-time" % "2.5.0") val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "2.1.0" + val nameOf = "com.github.dwickern" %% "scala-nameof" % "3.0.0-antex" val overrides = akka.overrides ++ Seq( /* @@ -46,7 +48,7 @@ object Dependencies { object akka { private val akkaVersion = 
"2.6.20" - private val akkaHttpVersion = "10.1.13" + private val akkaHttpVersion = "10.2.10" private val akkaJs = "2.2.6.14" private val r2dbcVersion = "0.7.7" @@ -147,4 +149,19 @@ object Dependencies { val core = "ch.qos.logback" % "logback-core" % logback val classic = "ch.qos.logback" % "logback-classic" % logback } + + object elastic4s { + private val elastic4s = "8.8.0" + private val elasticsearch = "7.14.1" + val clientAkka = "com.sksamuel.elastic4s" %% "elastic4s-client-akka" % elastic4s + val clientJava = "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % elastic4s + val core = "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4s + val elasticTestFramework = "org.elasticsearch.test" % "framework" % elasticsearch + val httpStreams = "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4s + val jsonCirce = "com.sksamuel.elastic4s" %% "elastic4s-json-circe" % elastic4s + val jsonPlay = "com.sksamuel.elastic4s" %% "elastic4s-json-play" % elastic4s + // does not work, use httpStreams instead + val streamsAkka = "com.sksamuel.elastic4s" %% "elastic4s-streams-akka" % elastic4s + val testkit = "com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4s + } }