Skip to content

Commit

Permalink
Update config async
Browse files Browse the repository at this point in the history
Signed-off-by: Nils Bandener <[email protected]>
  • Loading branch information
nibix committed Aug 28, 2024
1 parent 0252f5b commit 90ec441
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,7 @@ public Collection<Object> createComponents(
evaluator = new PrivilegesEvaluator(
clusterService,
clusterService::state,
threadPool,
threadPool.getThreadContext(),
cr,
resolver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand All @@ -32,6 +34,8 @@
import org.opensearch.cluster.metadata.IndexAbstraction;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
Expand All @@ -41,6 +45,7 @@
import org.opensearch.security.securityconf.impl.SecurityDynamicConfiguration;
import org.opensearch.security.securityconf.impl.v7.RoleV7;
import org.opensearch.security.support.WildcardMatcher;
import org.opensearch.threadpool.ThreadPool;

import com.selectivem.collections.CheckTable;
import com.selectivem.collections.CompactMapGroupBuilder;
Expand Down Expand Up @@ -109,7 +114,9 @@ public class ActionPrivileges {
private final ByteSizeValue statefulIndexMaxHeapSize;
private final WildcardMatcher statefulIndexIncludeIndices;

private volatile StatefulIndexPrivileges statefulIndex;
private final AtomicReference<StatefulIndexPrivileges> statefulIndex = new AtomicReference<>();

private Future<?> updateFuture;

/**
* TODO: It is not nice that we cannot use SecurityDynamicConfiguration<?> with a concrete generic parameter
Expand Down Expand Up @@ -190,7 +197,7 @@ public PrivilegesEvaluatorResponse hasIndexPrivilege(
actions
);

StatefulIndexPrivileges statefulIndex = this.statefulIndex;
StatefulIndexPrivileges statefulIndex = this.statefulIndex.get();
PrivilegesEvaluatorResponse resultFromStatefulIndex = null;

Map<String, IndexAbstraction> indexMetadata = this.indexMetadataSupplier.get();
Expand Down Expand Up @@ -220,18 +227,90 @@ public PrivilegesEvaluatorResponse hasExplicitIndexPrivilege(
return this.index.providesExplicitPrivilege(context, actions, resolvedIndices, checkTable, this.indexMetadataSupplier.get());
}

public void updateStatefulIndexPrivileges(Map<String, IndexAbstraction> indices) {
StatefulIndexPrivileges statefulIndex = this.statefulIndex;
public void updateStatefulIndexPrivileges(Map<String, IndexAbstraction> indices, long metadataVersion) {
StatefulIndexPrivileges statefulIndex = this.statefulIndex.get();

indices = StatefulIndexPrivileges.relevantOnly(indices, statefulIndexIncludeIndices);

if (statefulIndex == null || !statefulIndex.indices.equals(indices)) {
this.statefulIndex = new StatefulIndexPrivileges(roles, actionGroups, wellKnownIndexActions, indices, statefulIndexMaxHeapSize);
long start = System.currentTimeMillis();
this.statefulIndex.set(
new StatefulIndexPrivileges(roles, actionGroups, wellKnownIndexActions, indices, metadataVersion, statefulIndexMaxHeapSize)
);
long duration = System.currentTimeMillis() - start;
log.info("Updating StatefulIndexPrivileges took {} ms", duration);
} else {
synchronized (this) {
// Even if the indices did not change, update the metadataVersion in statefulIndex to reflect
// that the instance is up-to-date.
if (statefulIndex.metadataVersion < metadataVersion) {
statefulIndex.metadataVersion = metadataVersion;
}
}
}
}

/**
* Updates the stateful index configuration asynchronously with the index metadata from the current cluster state.
* As the update process can take some seconds for clusters with many indices, this method "de-bounces" the updates,
* i.e., a further update will be only initiated after the previous update has finished. This is okay as this class
* can handle the case that it do not have the most recent information. It will fall back to slower methods then.
*/
public synchronized void updateStatefulIndexPrivilegesAsync(ClusterService clusterService, ThreadPool threadPool) {
long currentMetadataVersion = clusterService.state().metadata().version();

StatefulIndexPrivileges statefulIndex = this.statefulIndex.get();

if (statefulIndex != null && currentMetadataVersion <= statefulIndex.metadataVersion) {
return;
}

if (this.updateFuture == null || this.updateFuture.isDone()) {
this.updateFuture = threadPool.generic().submit(() -> {
for (int i = 0;; i++) {
if (i > 10) {
try {
// In case we got many consecutive updates, let's sleep a little to let
// other operations catch up.
Thread.sleep(100);
} catch (InterruptedException e) {
return;
}
}

Metadata metadata = clusterService.state().metadata();

synchronized (ActionPrivileges.this) {
if (metadata.version() <= ActionPrivileges.this.statefulIndex.get().metadataVersion) {
return;
}
}

try {
log.debug("Updating ActionPrivileges with metadata version {}", metadata.version());
updateStatefulIndexPrivileges(metadata.getIndicesLookup(), metadata.version());
} catch (Exception e) {
log.error("Error while updating ActionPrivileges", e);
} finally {
synchronized (ActionPrivileges.this) {
if (ActionPrivileges.this.updateFuture.isCancelled()) {
return;
}
}
}
}
});
}
}

public synchronized void shutdown() {
if (this.updateFuture != null && !this.updateFuture.isDone()) {
this.updateFuture.cancel(true);
}
}

int getEstimatedStatefulIndexByteSize() {
StatefulIndexPrivileges statefulIndex = this.statefulIndex;
StatefulIndexPrivileges statefulIndex = this.statefulIndex.get();

if (statefulIndex != null) {
return statefulIndex.estimatedByteSize;
Expand Down Expand Up @@ -846,6 +925,8 @@ static class StatefulIndexPrivileges {

private final int estimatedByteSize;

private long metadataVersion;

/**
* Creates pre-computed index privileges based on the given parameters.
* <p>
Expand All @@ -858,6 +939,7 @@ static class StatefulIndexPrivileges {
FlattenedActionGroups actionGroups,
ImmutableSet<String> wellKnownIndexActions,
Map<String, IndexAbstraction> indices,
long metadataVersion,
ByteSizeValue statefulIndexMaxHeapSize
) {
Map<
Expand Down Expand Up @@ -951,6 +1033,7 @@ static class StatefulIndexPrivileges {
);

this.indices = ImmutableMap.copyOf(indices);
this.metadataVersion = metadataVersion;
this.wellKnownIndexActions = wellKnownIndexActions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -72,12 +73,11 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.termvectors.MultiTermVectorsAction;
import org.opensearch.action.update.UpdateAction;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand All @@ -103,6 +103,7 @@
import org.opensearch.security.support.WildcardMatcher;
import org.opensearch.security.user.User;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;

import org.greenrobot.eventbus.Subscribe;

Expand Down Expand Up @@ -150,11 +151,12 @@ public class PrivilegesEvaluator {
private DynamicConfigModel dcm;
private final NamedXContentRegistry namedXContentRegistry;
private final Settings settings;
private volatile ActionPrivileges actionPrivileges;
private final AtomicReference<ActionPrivileges> actionPrivileges = new AtomicReference<>();

public PrivilegesEvaluator(
final ClusterService clusterService,
Supplier<ClusterState> clusterStateSupplier,
ThreadPool threadPool,
final ThreadContext threadContext,
final ConfigurationRepository configurationRepository,
final IndexNameExpressionResolver resolver,
Expand Down Expand Up @@ -207,17 +209,10 @@ public PrivilegesEvaluator(
}

if (clusterService != null) {
clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
try {
ActionPrivileges actionPrivileges = PrivilegesEvaluator.this.actionPrivileges;
if (actionPrivileges != null) {
actionPrivileges.updateStatefulIndexPrivileges(event.state().metadata().getIndicesLookup());
}
} catch (Exception e) {
log.error("Error while updating ActionPrivileges object with new index metadata", e);
}
clusterService.addListener(event -> {
ActionPrivileges actionPrivileges = PrivilegesEvaluator.this.actionPrivileges.get();
if (actionPrivileges != null) {
actionPrivileges.updateStatefulIndexPrivilegesAsync(clusterService, threadPool);
}
});
}
Expand All @@ -240,8 +235,13 @@ void updateConfiguration(
() -> clusterStateSupplier.get().metadata().getIndicesLookup(),
settings
);
actionPrivileges.updateStatefulIndexPrivileges(clusterStateSupplier.get().metadata().getIndicesLookup());
this.actionPrivileges = actionPrivileges;
Metadata metadata = clusterStateSupplier.get().metadata();
actionPrivileges.updateStatefulIndexPrivileges(metadata.getIndicesLookup(), metadata.version());
ActionPrivileges oldInstance = this.actionPrivileges.getAndSet(actionPrivileges);

if (oldInstance != null) {
oldInstance.shutdown();
}
}
}

Expand All @@ -256,16 +256,16 @@ public void onDynamicConfigModelChanged(DynamicConfigModel dcm) {
}

public ActionPrivileges getActionPrivileges() {
return this.actionPrivileges;
return this.actionPrivileges.get();
}

public boolean hasRestAdminPermissions(final User user, final TransportAddress remoteAddress, final String permission) {
PrivilegesEvaluationContext context = createContext(user, permission);
return this.actionPrivileges.hasExplicitClusterPrivilege(context, permission).isAllowed();
return this.actionPrivileges.get().hasExplicitClusterPrivilege(context, permission).isAllowed();
}

public boolean isInitialized() {
return configModel != null && dcm != null && actionPrivileges != null;
return configModel != null && dcm != null && actionPrivileges.get() != null;
}

private void setUserInfoInThreadContext(User user) {
Expand Down Expand Up @@ -355,7 +355,7 @@ public PrivilegesEvaluatorResponse evaluate(PrivilegesEvaluationContext context)
log.debug("Mapped roles: {}", mappedRoles.toString());
}

ActionPrivileges actionPrivileges = this.actionPrivileges;
ActionPrivileges actionPrivileges = this.actionPrivileges.get();
if (actionPrivileges == null) {
throw new OpenSearchSecurityException("OpenSearch Security is not initialized: roles configuration is missing");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public IndicesAndAliases(IndexSpec indexSpec, ActionSpec actionSpec, Statefulnes
);

if (statefulness == Statefulness.STATEFUL) {
this.subject.updateStatefulIndexPrivileges(INDEX_METADATA);
this.subject.updateStatefulIndexPrivileges(INDEX_METADATA, 1);
}
}

Expand Down Expand Up @@ -446,7 +446,7 @@ public DataStreams(IndexSpec indexSpec, ActionSpec actionSpec, Statefulness stat
this.subject = new ActionPrivileges(roles, FlattenedActionGroups.EMPTY, () -> INDEX_METADATA, Settings.EMPTY);

if (statefulness == Statefulness.STATEFUL) {
this.subject.updateStatefulIndexPrivileges(INDEX_METADATA);
this.subject.updateStatefulIndexPrivileges(INDEX_METADATA, 1);
}
}

Expand Down Expand Up @@ -615,7 +615,7 @@ public static class StatefulIndexPrivilegesHeapSize {
public void estimatedSize() throws Exception {
ActionPrivileges subject = new ActionPrivileges(roles, FlattenedActionGroups.EMPTY, () -> indices, Settings.EMPTY);

subject.updateStatefulIndexPrivileges(indices);
subject.updateStatefulIndexPrivileges(indices, 1);

int lowerBound = (int) (expectedEstimatedNumberOfBytes * 0.9);
int upperBound = (int) (expectedEstimatedNumberOfBytes * 1.1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ PrivilegesEvaluator createPrivilegesEvaluator(SecurityDynamicConfiguration<RoleV
PrivilegesEvaluator privilegesEvaluator = new PrivilegesEvaluator(
clusterService,
() -> clusterService.state(),
null,
new ThreadContext(Settings.EMPTY),
null,
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
Expand Down

0 comments on commit 90ec441

Please sign in to comment.