Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update settings #249

Merged
merged 2 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions elastic/core/src/main/scala/Evolver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import com.sksamuel.elastic4s.streams.ReactiveElastic._
import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess}
import io.circe.Codec
import io.circe.generic.extras.semiauto.deriveConfiguredCodec
import Evolver.Command.{AddMappings, AliasUpdated, BatchUpdatesFinished, CheckTaskCompletion, DocumentsEvolved, EvolveDocuments, EvolveNextIndex, IndexMigrated, IndexMigrationFailed, IndexMigrationStarted, MappingsAdded, MigrateIndex, MigrateIndices, MigrateNextIndex, OldIndexDeleted, RunBatchUpdates, TaskStatus}
import net.sc8s.akka.circe.CirceSerializer
import net.sc8s.akka.components.ClusterComponent
import net.sc8s.akka.stream.RateLogger
import net.sc8s.circe.CodecConfiguration._
import net.sc8s.elastic.Evolver.Command.{AliasUpdated, BatchUpdatesFinished, OldIndexDeleted, TaskStatus}
import net.sc8s.elastic.Evolver.Command.{AddMappings, AliasUpdated, BatchUpdatesFinished, CheckTaskCompletion, DocumentsEvolved, EvolveDocuments, EvolveNextIndex, IndexMigrated, IndexMigrationFailed, IndexMigrationStarted, IndexSettingsUpdated, MappingsAdded, MigrateIndex, MigrateIndices, MigrateNextIndex, OldIndexDeleted, RunBatchUpdates, TaskStatus, UpdateIndexSettings}
import net.sc8s.logstage.elastic.Logging.IzLoggerTags

import java.time.LocalDateTime
import scala.concurrent.Future
Expand All @@ -35,6 +35,9 @@ object Evolver extends ClusterComponent.Singleton {
private[Evolver] case class MigrateNextIndex(pendingIndices: Seq[Index]) extends Command
private[Evolver] case class MigrateIndex(index: Index, oldIndexName: String, newIndexName: String, pendingIndices: Seq[Index]) extends Command

private[Evolver] case class UpdateIndexSettings(index: Index, pendingIndices: Seq[Index]) extends Command
private[Evolver] case class IndexSettingsUpdated(result: Try[Done]) extends Command

private[Evolver] case class AddMappings(index: Index, mappings: Seq[ElasticField], pendingIndices: Seq[Index]) extends Command
private[Evolver] case class MappingsAdded(result: Try[Done]) extends Command

Expand Down Expand Up @@ -140,6 +143,7 @@ object Evolver extends ClusterComponent.Singleton {
MappingDefinition(meta = Map(
mappingsHashField -> index.mappingsHash,
analysisHashField -> index.analysisHash,
settingsHashField -> index.settingsHash,
), properties = index.mappings :+ KeywordField(Index.discriminator))
)
.analysis(index.analysis)
Expand Down Expand Up @@ -173,17 +177,20 @@ object Evolver extends ClusterComponent.Singleton {
val maybeHashes = for {
analysisHash <- existingIndex.mappings.meta.get(analysisHashField)
mappingsHash <- existingIndex.mappings.meta.get(mappingsHashField)
} yield analysisHash -> mappingsHash
settingsHash <- existingIndex.mappings.meta.get(settingsHashField)
} yield (analysisHash, mappingsHash, settingsHash)

maybeHashes match {
case _ if forceReindex =>
log.warn(s"${"forcingReindex" -> "tag"} of ${index.name -> "index"}")
migrateIndex

case Some((existingAnalysisHash, existingMappingsHash)) =>
case Some((existingAnalysisHash, existingMappingsHash, existingSettingsHash)) =>
// it's possible but hard to check whether mappings have only been added (in which case a reindex would not be necessary), or existing have changed. we just take the easy route...
if (existingAnalysisHash != index.analysisHash || existingMappingsHash != index.mappingsHash)
migrateIndex
else if (existingSettingsHash != index.settingsHash)
Future.successful(UpdateIndexSettings(index, updatedPendingIndices))
else {
log.info(s"${"skippingMigration" -> "tag"} of ${index.name -> "index"}")
Future.successful(MigrateNextIndex(updatedPendingIndices))
Expand All @@ -195,7 +202,7 @@ object Evolver extends ClusterComponent.Singleton {
}
}

context.pipeToSelf(eventualCommand)(_.fold(e => IndexMigrationFailed(index, e), identity))
context.pipeToSelf(eventualCommand)(_.fold(IndexMigrationFailed(index, _), identity))
Behaviors.same
}

Expand All @@ -207,6 +214,13 @@ object Evolver extends ClusterComponent.Singleton {
)(triedDone => MappingsAdded(triedDone))
addingMappings(index, pendingIndices)

case UpdateIndexSettings(index, pendingIndices) =>
log.infoT("updatingSettings", s"${index.name -> "index"} ${index.settings}")
context.pipeToSelf(
elasticClient.execute(updateSettings(index.name, index.settings.view.mapValues(_.toString).toMap)).map(_.result).map(_ => Done)
)(triedDone => IndexSettingsUpdated(triedDone))
updatingIndexSettings(index, pendingIndices)

case MigrateIndex(index, oldIndexName, newIndexName, pendingIndices) =>
log.info(s"${"migratingIndex" -> "tag"} ${index.name -> "index"} from $oldIndexName to $newIndexName")
val eventualCommand = for {
Expand Down Expand Up @@ -242,6 +256,17 @@ object Evolver extends ClusterComponent.Singleton {
idle
}

def updatingIndexSettings(index: Index, pendingIndices: Seq[Index]) = Behaviors.receiveMessagePartial[Command] {
case IndexSettingsUpdated(Success(_)) =>
log.infoT("indexSettingsUpdated", s"${index.name -> "index"}")
context.self ! MigrateNextIndex(pendingIndices)
migratingIndices(forceReindex)

case IndexSettingsUpdated(Failure(exception)) =>
log.errorT("updatingIndexSettingsFailed", s"${index.name -> "index"} with $exception, aborting")
idle
}

def migratingIndex(index: Index, oldIndexName: String, newIndexName: String, pendingIndices: Seq[Index]) = Behaviors.withTimers[Command](timerScheduler => Behaviors.receiveMessagePartial {
case IndexMigrationStarted(nodeId, taskId) =>
log.info(s"${"indexMigrationQueued" -> "tag"} of ${index.name -> "index"} at $nodeId with $taskId")
Expand Down Expand Up @@ -377,6 +402,8 @@ object Evolver extends ClusterComponent.Singleton {

private val analysisHashField = "analysisHash"

private val settingsHashField = "settingsHash"

override val name = "elastic-evolver"

override val commandSerializer = CirceSerializer()
Expand Down
2 changes: 2 additions & 0 deletions elastic/core/src/main/scala/Index.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ abstract class Index(

final lazy val analysisHash = analysis.toString.hashCode.toString

final lazy val settingsHash = settings.toString.hashCode.toString

def hitIdFromId(id: Id): Json

lazy implicit val latestTraitIndexable: Indexable[Latest] = indexableWithCirce(implicitly)
Expand Down