diff --git a/elastic/core/src/main/scala/Evolver.scala b/elastic/core/src/main/scala/Evolver.scala index a11282e..12eda11 100644 --- a/elastic/core/src/main/scala/Evolver.scala +++ b/elastic/core/src/main/scala/Evolver.scala @@ -12,12 +12,12 @@ import com.sksamuel.elastic4s.streams.ReactiveElastic._ import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess} import io.circe.Codec import io.circe.generic.extras.semiauto.deriveConfiguredCodec -import Evolver.Command.{AddMappings, AliasUpdated, BatchUpdatesFinished, CheckTaskCompletion, DocumentsEvolved, EvolveDocuments, EvolveNextIndex, IndexMigrated, IndexMigrationFailed, IndexMigrationStarted, MappingsAdded, MigrateIndex, MigrateIndices, MigrateNextIndex, OldIndexDeleted, RunBatchUpdates, TaskStatus} import net.sc8s.akka.circe.CirceSerializer import net.sc8s.akka.components.ClusterComponent import net.sc8s.akka.stream.RateLogger import net.sc8s.circe.CodecConfiguration._ -import net.sc8s.elastic.Evolver.Command.{AliasUpdated, BatchUpdatesFinished, OldIndexDeleted, TaskStatus} +import net.sc8s.elastic.Evolver.Command.{AddMappings, AliasUpdated, BatchUpdatesFinished, CheckTaskCompletion, DocumentsEvolved, EvolveDocuments, EvolveNextIndex, IndexMigrated, IndexMigrationFailed, IndexMigrationStarted, IndexSettingsUpdated, MappingsAdded, MigrateIndex, MigrateIndices, MigrateNextIndex, OldIndexDeleted, RunBatchUpdates, TaskStatus, UpdateIndexSettings} +import net.sc8s.logstage.elastic.Logging.IzLoggerTags import java.time.LocalDateTime import scala.concurrent.Future @@ -35,6 +35,9 @@ object Evolver extends ClusterComponent.Singleton { private[Evolver] case class MigrateNextIndex(pendingIndices: Seq[Index]) extends Command private[Evolver] case class MigrateIndex(index: Index, oldIndexName: String, newIndexName: String, pendingIndices: Seq[Index]) extends Command + private[Evolver] case class UpdateIndexSettings(index: Index, pendingIndices: Seq[Index]) extends Command + private[Evolver] case class IndexSettingsUpdated(result: Try[Done]) extends Command + private[Evolver] case class AddMappings(index: Index, mappings: Seq[ElasticField], pendingIndices: Seq[Index]) extends Command private[Evolver] case class MappingsAdded(result: Try[Done]) extends Command @@ -140,6 +143,7 @@ object Evolver extends ClusterComponent.Singleton { MappingDefinition(meta = Map( mappingsHashField -> index.mappingsHash, analysisHashField -> index.analysisHash, + settingsHashField -> index.settingsHash, ), properties = index.mappings :+ KeywordField(Index.discriminator)) ) .analysis(index.analysis) @@ -173,17 +177,20 @@ object Evolver extends ClusterComponent.Singleton { val maybeHashes = for { analysisHash <- existingIndex.mappings.meta.get(analysisHashField) mappingsHash <- existingIndex.mappings.meta.get(mappingsHashField) - } yield analysisHash -> mappingsHash + settingsHash <- existingIndex.mappings.meta.get(settingsHashField) + } yield (analysisHash, mappingsHash, settingsHash) maybeHashes match { case _ if forceReindex => log.warn(s"${"forcingReindex" -> "tag"} of ${index.name -> "index"}") migrateIndex - case Some((existingAnalysisHash, existingMappingsHash)) => + case Some((existingAnalysisHash, existingMappingsHash, existingSettingsHash)) => // it's possible but hard to check whether mappings have only been added (in which case a reindex would not be necessary), or existing have changed. we just take the easy route... if (existingAnalysisHash != index.analysisHash || existingMappingsHash != index.mappingsHash) migrateIndex + else if (existingSettingsHash != index.settingsHash) + Future.successful(UpdateIndexSettings(index, updatedPendingIndices)) else { log.info(s"${"skippingMigration" -> "tag"} of ${index.name -> "index"}") Future.successful(MigrateNextIndex(updatedPendingIndices)) @@ -195,7 +202,7 @@ object Evolver extends ClusterComponent.Singleton { } } - context.pipeToSelf(eventualCommand)(_.fold(e => IndexMigrationFailed(index, e), identity)) + context.pipeToSelf(eventualCommand)(_.fold(IndexMigrationFailed(index, _), identity)) Behaviors.same } @@ -207,6 +214,13 @@ object Evolver extends ClusterComponent.Singleton { )(triedDone => MappingsAdded(triedDone)) addingMappings(index, pendingIndices) + case UpdateIndexSettings(index, pendingIndices) => + log.infoT("updatingSettings", s"${index.name -> "index"} ${index.settings}") + context.pipeToSelf( + elasticClient.execute(updateSettings(index.name, index.settings.view.mapValues(_.toString).toMap)).map(_.result).map(_ => Done) + )(triedDone => IndexSettingsUpdated(triedDone)) + updatingIndexSettings(index, pendingIndices) + case MigrateIndex(index, oldIndexName, newIndexName, pendingIndices) => log.info(s"${"migratingIndex" -> "tag"} ${index.name -> "index"} from $oldIndexName to $newIndexName") val eventualCommand = for { @@ -242,6 +256,17 @@ object Evolver extends ClusterComponent.Singleton { idle } + def updatingIndexSettings(index: Index, pendingIndices: Seq[Index]) = Behaviors.receiveMessagePartial[Command] { + case IndexSettingsUpdated(Success(_)) => + log.infoT("indexSettingsUpdated", s"${index.name -> "index"}") + context.self ! MigrateNextIndex(pendingIndices) + migratingIndices(forceReindex) + + case IndexSettingsUpdated(Failure(exception)) => + log.errorT("updatingIndexSettingsFailed", s"${index.name -> "index"} with $exception, aborting") + idle + } + def migratingIndex(index: Index, oldIndexName: String, newIndexName: String, pendingIndices: Seq[Index]) = Behaviors.withTimers[Command](timerScheduler => Behaviors.receiveMessagePartial { case IndexMigrationStarted(nodeId, taskId) => log.info(s"${"indexMigrationQueued" -> "tag"} of ${index.name -> "index"} at $nodeId with $taskId") @@ -377,6 +402,8 @@ object Evolver extends ClusterComponent.Singleton { private val analysisHashField = "analysisHash" + private val settingsHashField = "settingsHash" + override val name = "elastic-evolver" override val commandSerializer = CirceSerializer() diff --git a/elastic/core/src/main/scala/Index.scala b/elastic/core/src/main/scala/Index.scala index 426167e..0dd21c7 100644 --- a/elastic/core/src/main/scala/Index.scala +++ b/elastic/core/src/main/scala/Index.scala @@ -55,6 +55,8 @@ abstract class Index( final lazy val analysisHash = analysis.toString.hashCode.toString + final lazy val settingsHash = settings.toString.hashCode.toString + def hitIdFromId(id: Id): Json lazy implicit val latestTraitIndexable: Indexable[Latest] = indexableWithCirce(implicitly)