From 90ec44155ed7e9d7e8eac8ca7ee6eb0bc1deef3d Mon Sep 17 00:00:00 2001 From: Nils Bandener Date: Wed, 28 Aug 2024 11:30:32 +0200 Subject: [PATCH] Update config async Signed-off-by: Nils Bandener --- .../security/OpenSearchSecurityPlugin.java | 1 + .../security/privileges/ActionPrivileges.java | 95 +++++++++++++++++-- .../privileges/PrivilegesEvaluator.java | 40 ++++---- .../privileges/ActionPrivilegesTest.java | 6 +- .../RestLayerPrivilegesEvaluatorTest.java | 1 + 5 files changed, 114 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java b/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java index 4ab41e5b18..0658886ff0 100644 --- a/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java +++ b/src/main/java/org/opensearch/security/OpenSearchSecurityPlugin.java @@ -1118,6 +1118,7 @@ public Collection createComponents( evaluator = new PrivilegesEvaluator( clusterService, clusterService::state, + threadPool, threadPool.getThreadContext(), cr, resolver, diff --git a/src/main/java/org/opensearch/security/privileges/ActionPrivileges.java b/src/main/java/org/opensearch/security/privileges/ActionPrivileges.java index 853abaf577..2b03dbb72b 100644 --- a/src/main/java/org/opensearch/security/privileges/ActionPrivileges.java +++ b/src/main/java/org/opensearch/security/privileges/ActionPrivileges.java @@ -17,6 +17,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -32,6 +34,8 @@ import org.opensearch.cluster.metadata.IndexAbstraction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; @@ -41,6 +45,7 @@ import org.opensearch.security.securityconf.impl.SecurityDynamicConfiguration; import org.opensearch.security.securityconf.impl.v7.RoleV7; import org.opensearch.security.support.WildcardMatcher; +import org.opensearch.threadpool.ThreadPool; import com.selectivem.collections.CheckTable; import com.selectivem.collections.CompactMapGroupBuilder; @@ -109,7 +114,9 @@ public class ActionPrivileges { private final ByteSizeValue statefulIndexMaxHeapSize; private final WildcardMatcher statefulIndexIncludeIndices; - private volatile StatefulIndexPrivileges statefulIndex; + private final AtomicReference statefulIndex = new AtomicReference<>(); + + private Future updateFuture; /** * TODO: It is not nice that we cannot use SecurityDynamicConfiguration with a concrete generic parameter @@ -190,7 +197,7 @@ public PrivilegesEvaluatorResponse hasIndexPrivilege( actions ); - StatefulIndexPrivileges statefulIndex = this.statefulIndex; + StatefulIndexPrivileges statefulIndex = this.statefulIndex.get(); PrivilegesEvaluatorResponse resultFromStatefulIndex = null; Map indexMetadata = this.indexMetadataSupplier.get(); @@ -220,18 +227,90 @@ public PrivilegesEvaluatorResponse hasExplicitIndexPrivilege( return this.index.providesExplicitPrivilege(context, actions, resolvedIndices, checkTable, this.indexMetadataSupplier.get()); } - public void updateStatefulIndexPrivileges(Map indices) { - StatefulIndexPrivileges statefulIndex = this.statefulIndex; + public void updateStatefulIndexPrivileges(Map indices, long metadataVersion) { + StatefulIndexPrivileges statefulIndex = this.statefulIndex.get(); indices = StatefulIndexPrivileges.relevantOnly(indices, statefulIndexIncludeIndices); if (statefulIndex == null || !statefulIndex.indices.equals(indices)) { - this.statefulIndex = new StatefulIndexPrivileges(roles, actionGroups, wellKnownIndexActions, indices, statefulIndexMaxHeapSize); + long start = System.currentTimeMillis(); + this.statefulIndex.set( + new StatefulIndexPrivileges(roles, actionGroups, wellKnownIndexActions, indices, metadataVersion, statefulIndexMaxHeapSize) + ); + long duration = System.currentTimeMillis() - start; + log.info("Updating StatefulIndexPrivileges took {} ms", duration); + } else { + synchronized (this) { + // Even if the indices did not change, update the metadataVersion in statefulIndex to reflect + // that the instance is up-to-date. + if (statefulIndex.metadataVersion < metadataVersion) { + statefulIndex.metadataVersion = metadataVersion; + } + } + } + } + + /** + * Updates the stateful index configuration asynchronously with the index metadata from the current cluster state. + * As the update process can take some seconds for clusters with many indices, this method "de-bounces" the updates, + * i.e., a further update will be only initiated after the previous update has finished. This is okay as this class + * can handle the case that it do not have the most recent information. It will fall back to slower methods then. + */ + public synchronized void updateStatefulIndexPrivilegesAsync(ClusterService clusterService, ThreadPool threadPool) { + long currentMetadataVersion = clusterService.state().metadata().version(); + + StatefulIndexPrivileges statefulIndex = this.statefulIndex.get(); + + if (statefulIndex != null && currentMetadataVersion <= statefulIndex.metadataVersion) { + return; + } + + if (this.updateFuture == null || this.updateFuture.isDone()) { + this.updateFuture = threadPool.generic().submit(() -> { + for (int i = 0;; i++) { + if (i > 10) { + try { + // In case we got many consecutive updates, let's sleep a little to let + // other operations catch up. + Thread.sleep(100); + } catch (InterruptedException e) { + return; + } + } + + Metadata metadata = clusterService.state().metadata(); + + synchronized (ActionPrivileges.this) { + if (metadata.version() <= ActionPrivileges.this.statefulIndex.get().metadataVersion) { + return; + } + } + + try { + log.debug("Updating ActionPrivileges with metadata version {}", metadata.version()); + updateStatefulIndexPrivileges(metadata.getIndicesLookup(), metadata.version()); + } catch (Exception e) { + log.error("Error while updating ActionPrivileges", e); + } finally { + synchronized (ActionPrivileges.this) { + if (ActionPrivileges.this.updateFuture.isCancelled()) { + return; + } + } + } + } + }); + } + } + + public synchronized void shutdown() { + if (this.updateFuture != null && !this.updateFuture.isDone()) { + this.updateFuture.cancel(true); } } int getEstimatedStatefulIndexByteSize() { - StatefulIndexPrivileges statefulIndex = this.statefulIndex; + StatefulIndexPrivileges statefulIndex = this.statefulIndex.get(); if (statefulIndex != null) { return statefulIndex.estimatedByteSize; @@ -846,6 +925,8 @@ static class StatefulIndexPrivileges { private final int estimatedByteSize; + private long metadataVersion; + /** * Creates pre-computed index privileges based on the given parameters. *

@@ -858,6 +939,7 @@ static class StatefulIndexPrivileges { FlattenedActionGroups actionGroups, ImmutableSet wellKnownIndexActions, Map indices, + long metadataVersion, ByteSizeValue statefulIndexMaxHeapSize ) { Map< @@ -951,6 +1033,7 @@ static class StatefulIndexPrivileges { ); this.indices = ImmutableMap.copyOf(indices); + this.metadataVersion = metadataVersion; this.wellKnownIndexActions = wellKnownIndexActions; } diff --git a/src/main/java/org/opensearch/security/privileges/PrivilegesEvaluator.java b/src/main/java/org/opensearch/security/privileges/PrivilegesEvaluator.java index b972392191..48e8d9a0b2 100644 --- a/src/main/java/org/opensearch/security/privileges/PrivilegesEvaluator.java +++ b/src/main/java/org/opensearch/security/privileges/PrivilegesEvaluator.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import java.util.StringJoiner; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import com.google.common.collect.ImmutableList; @@ -72,12 +73,11 @@ import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.termvectors.MultiTermVectorsAction; import org.opensearch.action.update.UpdateAction; -import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; @@ -103,6 +103,7 @@ import org.opensearch.security.support.WildcardMatcher; import org.opensearch.security.user.User; import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; import org.greenrobot.eventbus.Subscribe; @@ -150,11 +151,12 @@ public class PrivilegesEvaluator { private DynamicConfigModel dcm; private final NamedXContentRegistry namedXContentRegistry; private final Settings settings; - private volatile ActionPrivileges actionPrivileges; + private final AtomicReference actionPrivileges = new AtomicReference<>(); public PrivilegesEvaluator( final ClusterService clusterService, Supplier clusterStateSupplier, + ThreadPool threadPool, final ThreadContext threadContext, final ConfigurationRepository configurationRepository, final IndexNameExpressionResolver resolver, @@ -207,17 +209,10 @@ public PrivilegesEvaluator( } if (clusterService != null) { - clusterService.addListener(new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - try { - ActionPrivileges actionPrivileges = PrivilegesEvaluator.this.actionPrivileges; - if (actionPrivileges != null) { - actionPrivileges.updateStatefulIndexPrivileges(event.state().metadata().getIndicesLookup()); - } - } catch (Exception e) { - log.error("Error while updating ActionPrivileges object with new index metadata", e); - } + clusterService.addListener(event -> { + ActionPrivileges actionPrivileges = PrivilegesEvaluator.this.actionPrivileges.get(); + if (actionPrivileges != null) { + actionPrivileges.updateStatefulIndexPrivilegesAsync(clusterService, threadPool); } }); } @@ -240,8 +235,13 @@ void updateConfiguration( () -> clusterStateSupplier.get().metadata().getIndicesLookup(), settings ); - actionPrivileges.updateStatefulIndexPrivileges(clusterStateSupplier.get().metadata().getIndicesLookup()); - this.actionPrivileges = actionPrivileges; + Metadata metadata = clusterStateSupplier.get().metadata(); + actionPrivileges.updateStatefulIndexPrivileges(metadata.getIndicesLookup(), metadata.version()); + ActionPrivileges oldInstance = this.actionPrivileges.getAndSet(actionPrivileges); + + if (oldInstance != null) { + oldInstance.shutdown(); + } } } @@ -256,16 +256,16 @@ public void onDynamicConfigModelChanged(DynamicConfigModel dcm) { } public ActionPrivileges getActionPrivileges() { - return this.actionPrivileges; + return this.actionPrivileges.get(); } public boolean hasRestAdminPermissions(final User user, final TransportAddress remoteAddress, final String permission) { PrivilegesEvaluationContext context = createContext(user, permission); - return this.actionPrivileges.hasExplicitClusterPrivilege(context, permission).isAllowed(); + return this.actionPrivileges.get().hasExplicitClusterPrivilege(context, permission).isAllowed(); } public boolean isInitialized() { - return configModel != null && dcm != null && actionPrivileges != null; + return configModel != null && dcm != null && actionPrivileges.get() != null; } private void setUserInfoInThreadContext(User user) { @@ -355,7 +355,7 @@ public PrivilegesEvaluatorResponse evaluate(PrivilegesEvaluationContext context) log.debug("Mapped roles: {}", mappedRoles.toString()); } - ActionPrivileges actionPrivileges = this.actionPrivileges; + ActionPrivileges actionPrivileges = this.actionPrivileges.get(); if (actionPrivileges == null) { throw new OpenSearchSecurityException("OpenSearch Security is not initialized: roles configuration is missing"); } diff --git a/src/test/java/org/opensearch/security/privileges/ActionPrivilegesTest.java b/src/test/java/org/opensearch/security/privileges/ActionPrivilegesTest.java index e4ce2d4bec..371652287f 100644 --- a/src/test/java/org/opensearch/security/privileges/ActionPrivilegesTest.java +++ b/src/test/java/org/opensearch/security/privileges/ActionPrivilegesTest.java @@ -282,7 +282,7 @@ public IndicesAndAliases(IndexSpec indexSpec, ActionSpec actionSpec, Statefulnes ); if (statefulness == Statefulness.STATEFUL) { - this.subject.updateStatefulIndexPrivileges(INDEX_METADATA); + this.subject.updateStatefulIndexPrivileges(INDEX_METADATA, 1); } } @@ -446,7 +446,7 @@ public DataStreams(IndexSpec indexSpec, ActionSpec actionSpec, Statefulness stat this.subject = new ActionPrivileges(roles, FlattenedActionGroups.EMPTY, () -> INDEX_METADATA, Settings.EMPTY); if (statefulness == Statefulness.STATEFUL) { - this.subject.updateStatefulIndexPrivileges(INDEX_METADATA); + this.subject.updateStatefulIndexPrivileges(INDEX_METADATA, 1); } } @@ -615,7 +615,7 @@ public static class StatefulIndexPrivilegesHeapSize { public void estimatedSize() throws Exception { ActionPrivileges subject = new ActionPrivileges(roles, FlattenedActionGroups.EMPTY, () -> indices, Settings.EMPTY); - subject.updateStatefulIndexPrivileges(indices); + subject.updateStatefulIndexPrivileges(indices, 1); int lowerBound = (int) (expectedEstimatedNumberOfBytes * 0.9); int upperBound = (int) (expectedEstimatedNumberOfBytes * 1.1); diff --git a/src/test/java/org/opensearch/security/privileges/RestLayerPrivilegesEvaluatorTest.java b/src/test/java/org/opensearch/security/privileges/RestLayerPrivilegesEvaluatorTest.java index 25f398f66a..028efc4d2f 100644 --- a/src/test/java/org/opensearch/security/privileges/RestLayerPrivilegesEvaluatorTest.java +++ b/src/test/java/org/opensearch/security/privileges/RestLayerPrivilegesEvaluatorTest.java @@ -151,6 +151,7 @@ PrivilegesEvaluator createPrivilegesEvaluator(SecurityDynamicConfiguration clusterService.state(), + null, new ThreadContext(Settings.EMPTY), null, new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),