Skip to content

Commit

Permalink
Add cluster setting to allow size>0 in request cache
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Oct 25, 2024
1 parent 936cdb9 commit 29a23c0
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
Expand Down Expand Up @@ -89,6 +90,7 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.opensearch.indices.IndicesRequestCache.ALLOW_SIZE_NONZERO_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.search.aggregations.AggregationBuilders.dateRange;
Expand Down Expand Up @@ -579,6 +581,20 @@ public void testCanCache() throws Exception {
OpenSearchAssertions.assertAllSuccessful(r4);
assertThat(r4.getHits().getTotalHits().value, equalTo(7L));
assertCacheState(client, index, 0, 4);

// If size > 0 we should cache if this is enabled via cluster setting
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(ALLOW_SIZE_NONZERO_SETTING.getKey(), true));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

final SearchResponse r7 = client.prepareSearch(index)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setSize(1)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-22").lte("2016-03-26"))
.get();
OpenSearchAssertions.assertAllSuccessful(r7);
assertThat(r7.getHits().getTotalHits().value, equalTo(5L));
assertCacheState(client, index, 0, 6);
}

public void testCacheWithFilteredAlias() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesRequestCache.INDICES_CACHE_QUERY_EXPIRE,
IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING,
IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING,
IndicesRequestCache.ALLOW_SIZE_NONZERO_SETTING,
HunspellService.HUNSPELL_LAZY_LOAD,
HunspellService.HUNSPELL_IGNORE_CASE,
HunspellService.HUNSPELL_DICTIONARY_OPTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<Indi
Property.NodeScope
);

/**
* If enabled, allows caching size > 0 queries.
*/
public static final Setting<Boolean> ALLOW_SIZE_NONZERO_SETTING = Setting.boolSetting(
"indices.requests.cache.allow_size_nonzero",
false,
Property.NodeScope,
Property.Dynamic
);

private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class);

private final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
Expand Down
12 changes: 11 additions & 1 deletion server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@
import static org.opensearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
import static org.opensearch.index.IndexService.IndexCreationContext.METADATA_VERIFICATION;
import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
import static org.opensearch.indices.IndicesRequestCache.ALLOW_SIZE_NONZERO_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.search.SearchService.ALLOW_EXPENSIVE_QUERIES;

Expand Down Expand Up @@ -360,6 +361,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private boolean canCacheSizeNonzeroRequests;

@Override
protected void doStart() {
Expand Down Expand Up @@ -507,6 +509,8 @@ protected void closeInternal() {
this.compositeIndexSettings = compositeIndexSettings;
this.fileCache = fileCache;
this.replicator = replicator;
this.canCacheSizeNonzeroRequests = ALLOW_SIZE_NONZERO_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_SIZE_NONZERO_SETTING, this::setCanCacheSizeNonzeroRequests);
}

public IndicesService(
Expand Down Expand Up @@ -1748,9 +1752,10 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
if (request.requestCache() == null) {
if (settings.getValue(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING) == false) {
return false;
} else if (context.size() != 0) {
} else if (context.size() != 0 && !canCacheSizeNonzeroRequests) {
// If no request cache query parameter and shard request cache
// is enabled in settings don't cache for requests with size > 0
// unless this is enabled via cluster setting
return false;
}
} else if (request.requestCache() == false) {
Expand Down Expand Up @@ -2118,4 +2123,9 @@ public RemoteStoreSettings getRemoteStoreSettings() {
public CompositeIndexSettings getCompositeIndexSettings() {
return this.compositeIndexSettings;
}

// Package-private for testing
void setCanCacheSizeNonzeroRequests(Boolean canCacheSizeNonzeroRequests) {
this.canCacheSizeNonzeroRequests = canCacheSizeNonzeroRequests;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,25 +641,68 @@ public void testDirectoryReaderWithoutDelegatingCacheHelperNotCacheable() throws
ShardSearchRequest request = mock(ShardSearchRequest.class);
when(request.requestCache()).thenReturn(true);

TestSearchContext context = new TestSearchContext(indexService.getBigArrays(), indexService) {
@Override
public SearchType searchType() {
return SearchType.QUERY_THEN_FETCH;
}
};
TestSearchContext context = getTestContext(indexService, 0);
IndexReader.CacheHelper notDelegatingCacheHelper = mock(IndexReader.CacheHelper.class);
DelegatingCacheHelper delegatingCacheHelper = mock(DelegatingCacheHelper.class);
for (boolean useDelegatingCacheHelper : new boolean[] { true, false }) {
IndexReader.CacheHelper cacheHelper = useDelegatingCacheHelper ? delegatingCacheHelper : notDelegatingCacheHelper;
setupMocksForCanCache(context, cacheHelper);
assertEquals(useDelegatingCacheHelper, indicesService.canCache(request, context));
}
}

public void testCanCacheSizeNonzero() {
// Size == 0 requests should always be cacheable (if they pass the other checks).
// Size > 0 requests should only be cacheable if ALLOW_SIZE_NONZERO_SETTING is true.

final IndexService indexService = createIndex("test");
ShardSearchRequest request = mock(ShardSearchRequest.class);
when(request.requestCache()).thenReturn(null);

TestSearchContext sizeZeroContext = getTestContext(indexService, 0);
TestSearchContext sizeNonzeroContext = getTestContext(indexService, 10);

// Test for an IndicesService with the default setting value of false
IndicesService indicesService = getIndicesService();
DelegatingCacheHelper cacheHelper = mock(DelegatingCacheHelper.class);
Map<TestSearchContext, Boolean> expectedResultMap = Map.of(sizeZeroContext, true, sizeNonzeroContext, false);

for (Map.Entry<TestSearchContext, Boolean> entry : expectedResultMap.entrySet()) {
TestSearchContext context = entry.getKey();
setupMocksForCanCache(context, cacheHelper);
assertEquals(entry.getValue(), indicesService.canCache(request, context));
}
// Simulate the cluster setting update by manually calling setCanCacheSizeNonzeroRequests
indicesService.setCanCacheSizeNonzeroRequests(true);
expectedResultMap = Map.of(sizeZeroContext, true, sizeNonzeroContext, true);

for (Map.Entry<TestSearchContext, Boolean> entry : expectedResultMap.entrySet()) {
TestSearchContext context = entry.getKey();
setupMocksForCanCache(context, cacheHelper);
assertEquals(entry.getValue(), indicesService.canCache(request, context));
}
}

private void setupMocksForCanCache(TestSearchContext context, IndexReader.CacheHelper cacheHelper) {
ContextIndexSearcher searcher = mock(ContextIndexSearcher.class);
context.setSearcher(searcher);
DirectoryReader reader = mock(DirectoryReader.class);
when(searcher.getDirectoryReader()).thenReturn(reader);
when(searcher.getIndexReader()).thenReturn(reader);
IndexReader.CacheHelper notDelegatingCacheHelper = mock(IndexReader.CacheHelper.class);
DelegatingCacheHelper delegatingCacheHelper = mock(DelegatingCacheHelper.class);
when(reader.getReaderCacheHelper()).thenReturn(cacheHelper);
}

for (boolean useDelegatingCacheHelper : new boolean[] { true, false }) {
IndexReader.CacheHelper cacheHelper = useDelegatingCacheHelper ? delegatingCacheHelper : notDelegatingCacheHelper;
when(reader.getReaderCacheHelper()).thenReturn(cacheHelper);
assertEquals(useDelegatingCacheHelper, indicesService.canCache(request, context));
}
private TestSearchContext getTestContext(IndexService indexService, int size) {
return new TestSearchContext(indexService.getBigArrays(), indexService) {
@Override
public SearchType searchType() {
return SearchType.QUERY_THEN_FETCH;
}

@Override
public int size() {
return size;
}
};
}
}

0 comments on commit 29a23c0

Please sign in to comment.