Skip to content

Commit

Permalink
Removed the old aggregation parsing path (#1117)
Browse files Browse the repository at this point in the history
* Removed the old aggregation parsing path in lieu of the new one

* PR fixes
  • Loading branch information
kyle-sammons authored Oct 16, 2024
1 parent 866d0e6 commit 0c61808
Show file tree
Hide file tree
Showing 62 changed files with 466 additions and 5,900 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,6 @@ public SearchResult<T> query(SearchQuery query) {
return logSearcher.search(
query.dataset,
query.howMany,
query.aggBuilder,
query.queryBuilder,
query.sourceFieldFilter,
query.aggregatorFactoriesBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ public SearchResult<T> query(SearchQuery query) {
return logSearcher.search(
query.dataset,
query.howMany,
query.aggBuilder,
query.queryBuilder,
query.sourceFieldFilter,
query.aggregatorFactoriesBuilder);
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.slack.astra.logstore.search;

import com.slack.astra.logstore.search.aggregations.AggBuilder;
import java.io.Closeable;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand All @@ -9,7 +8,6 @@ public interface LogIndexSearcher<T> extends Closeable {
SearchResult<T> search(
String dataset,
int howMany,
AggBuilder aggBuilder,
QueryBuilder queryBuilder,
SourceFieldFilter sourceFieldFilter,
AggregatorFactories.Builder aggregatorFactoriesBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import com.slack.astra.logstore.LogMessage.SystemField;
import com.slack.astra.logstore.LogWireMessage;
import com.slack.astra.logstore.opensearch.OpenSearchAdapter;
import com.slack.astra.logstore.search.aggregations.AggBuilder;
import com.slack.astra.metadata.schema.LuceneFieldDef;
import com.slack.astra.util.JsonUtil;
import java.io.IOException;
Expand Down Expand Up @@ -58,14 +57,6 @@ public class LogIndexSearcherImpl implements LogIndexSearcher<LogMessage> {

private final ReferenceManager.RefreshListener refreshListener;

// This feature flag enables using OpenSearch to parse our aggregations, rather than using the
// home-rolled
// aggregation parsing we're currently using
private static final Boolean useOpenSearchAggregationParsing =
Boolean.parseBoolean(
System.getProperty("astra.query.useOpenSearchAggregationParsing", "false"));
;

@VisibleForTesting
public static SearcherManager searcherManagerFromChunkId(String chunkId, BlobStore blobStore)
throws IOException {
Expand Down Expand Up @@ -106,14 +97,15 @@ public void afterRefresh(boolean didRefresh) {
public SearchResult<LogMessage> search(
String dataset,
int howMany,
AggBuilder aggBuilder,
QueryBuilder queryBuilder,
SourceFieldFilter sourceFieldFilter,
AggregatorFactories.Builder aggregatorFactoriesBuilder) {

ensureNonEmptyString(dataset, "dataset should be a non-empty string");
ensureTrue(howMany >= 0, "hits requested should not be negative.");
ensureTrue(howMany > 0 || aggBuilder != null, "Hits or aggregation should be requested.");
ensureTrue(
howMany > 0 || aggregatorFactoriesBuilder != null,
"Hits or aggregation should be requested.");

ScopedSpan span = Tracing.currentTracer().startScopedSpan("LogIndexSearcherImpl.search");
span.tag("dataset", dataset);
Expand All @@ -132,20 +124,16 @@ public SearchResult<LogMessage> search(

if (howMany > 0) {
CollectorManager<TopFieldCollector, TopFieldDocs> topFieldCollector =
buildTopFieldCollector(howMany, aggBuilder != null ? Integer.MAX_VALUE : howMany);
buildTopFieldCollector(
howMany, aggregatorFactoriesBuilder != null ? Integer.MAX_VALUE : howMany);
MultiCollectorManager collectorManager;

if (aggregatorFactoriesBuilder != null && useOpenSearchAggregationParsing) {
if (aggregatorFactoriesBuilder != null) {
collectorManager =
new MultiCollectorManager(
topFieldCollector,
openSearchAdapter.getCollectorManager(
aggregatorFactoriesBuilder, searcher, query));
} else if (aggBuilder != null) {
collectorManager =
new MultiCollectorManager(
topFieldCollector,
openSearchAdapter.getCollectorManager(aggBuilder, searcher, query));
} else {
collectorManager = new MultiCollectorManager(topFieldCollector);
}
Expand All @@ -156,14 +144,16 @@ public SearchResult<LogMessage> search(
for (ScoreDoc hit : hits) {
results.add(buildLogMessage(searcher, hit, sourceFieldFilter));
}
if (aggBuilder != null) {
if (aggregatorFactoriesBuilder != null) {
internalAggregation = (InternalAggregation) collector[1];
}
} else {
results = Collections.emptyList();
internalAggregation =
searcher.search(
query, openSearchAdapter.getCollectorManager(aggBuilder, searcher, query));
query,
openSearchAdapter.getCollectorManager(
aggregatorFactoriesBuilder, searcher, query));
}

elapsedTime.stop();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.slack.astra.logstore.search;

import com.slack.astra.logstore.search.aggregations.AggBuilder;
import java.util.List;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand All @@ -13,7 +12,6 @@ public class SearchQuery {
public final AggregatorFactories.Builder aggregatorFactoriesBuilder;
public final QueryBuilder queryBuilder;
public final int howMany;
public final AggBuilder aggBuilder;
public final List<String> chunkIds;
public final SourceFieldFilter sourceFieldFilter;
public final long startTimeEpochMs;
Expand All @@ -24,14 +22,12 @@ public SearchQuery(
long startTimeEpochMs,
long endTimeEpochMs,
int howMany,
AggBuilder aggBuilder,
List<String> chunkIds,
QueryBuilder queryBuilder,
SourceFieldFilter sourceFieldFilter,
AggregatorFactories.Builder aggregatorFactoriesBuilder) {
this.dataset = dataset;
this.howMany = howMany;
this.aggBuilder = aggBuilder;
this.chunkIds = chunkIds;
this.queryBuilder = queryBuilder;
this.sourceFieldFilter = sourceFieldFilter;
Expand All @@ -50,8 +46,6 @@ public String toString() {
+ howMany
+ ", chunkIds="
+ chunkIds
+ ", aggBuilder="
+ aggBuilder
+ ", queryBuilder="
+ queryBuilder
+ ", sourceFieldFilter="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import brave.Tracing;
import com.slack.astra.logstore.LogMessage;
import com.slack.astra.logstore.opensearch.AstraBigArrays;
import com.slack.astra.logstore.opensearch.OpenSearchAdapter;
import com.slack.astra.logstore.opensearch.ScriptServiceProvider;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -24,14 +23,6 @@ public class SearchResultAggregatorImpl<T extends LogMessage> implements SearchR

private final SearchQuery searchQuery;

// This feature flag enables using OpenSearch to parse our aggregations, rather than using the
// home-rolled
// aggregation parsing we're currently using
private static final Boolean useOpenSearchAggregationParsing =
Boolean.parseBoolean(
System.getProperty("astra.query.useOpenSearchAggregationParsing", "false"));
;

public SearchResultAggregatorImpl(SearchQuery searchQuery) {
this.searchQuery = searchQuery;
}
Expand Down Expand Up @@ -65,14 +56,10 @@ public SearchResult<T> aggregate(List<SearchResult<T>> searchResults, boolean fi
// The last aggregation should be indicated using the final aggregation boolean. This performs
// some final pass "destructive" actions, such as applying min doc count or extended bounds.
if (finalAggregation) {
if (searchQuery.aggregatorFactoriesBuilder != null
&& this.useOpenSearchAggregationParsing) {
if (searchQuery.aggregatorFactoriesBuilder != null) {
Collection<AggregationBuilder> aggregationBuilders =
searchQuery.aggregatorFactoriesBuilder.getAggregatorFactories();
pipelineTree = aggregationBuilders.iterator().next().buildPipelineTree();
} else {
pipelineTree =
OpenSearchAdapter.getAggregationBuilder(searchQuery.aggBuilder).buildPipelineTree();
}

reduceContext =
Expand Down
Loading

0 comments on commit 0c61808

Please sign in to comment.