Skip to content

Commit

Permalink
Bugfix: shard divide sort (not bug-free yet)
Browse files Browse the repository at this point in the history
  • Loading branch information
tokee committed Oct 11, 2023
1 parent 5b6b597 commit ba0fd4b
Show file tree
Hide file tree
Showing 5 changed files with 252 additions and 44 deletions.
37 changes: 31 additions & 6 deletions src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public enum CHOICE {always, never, auto}
* Default sort used when exporting. Ends with tie breaking on id.
*/
public static final String DEFAULT_SORT = "score desc, id asc";
/**
* The sort param as given by the caller. It is recommended to call {@link #getFullSort()} instead of using this
* value directly as {@code getFullSort()} extends the sort with relevant clauses depending on other attributes
* in the current {@code SRequest}.
*/
public String sort = DEFAULT_SORT;
public String query = null;
public Stream<String> queries = null;
Expand Down Expand Up @@ -789,12 +794,7 @@ public SolrQuery getMergedSolrQuery() {
solrQuery.setFilterQueries(filterQueries.toArray(new String[0]));
}

if (idealTime != null) {
sort = String.format(Locale.ROOT, "%s asc, abs(sub(ms(%s), crawl_date)) asc", deduplicateField, idealTime);
} else if (deduplicateField != null) {
sort = String.format(Locale.ROOT, "%s asc", deduplicateField);
}
solrQuery.set(CommonParams.SORT, sort);
solrQuery.set(CommonParams.SORT, getFullSort());

Set<String> fl = new LinkedHashSet<>();
if (fields != null) {
Expand All @@ -814,6 +814,31 @@ public SolrQuery getMergedSolrQuery() {
return solrQuery;
}

/**
 * Resolve the sort param from {@link #DEFAULT_SORT}, {@link #solrQuery}-sort, {@link #sort}, {@link #idealTime},
 * {@link #deduplicateField} and tie-breaking on {@code id}.
 * <p>
 * Precedence: a non-null {@link #sort} overrides the sort from {@link #solrQuery}, which falls back to
 * {@link #DEFAULT_SORT}. An {@code id asc} tie breaker is appended unless the chain already ends with an
 * {@code id} clause. Finally {@link #deduplicateField} (optionally with {@link #idealTime} proximity) is prepended.
 * @return fully resolved {@code sort} for use with Solr requests.
 */
public String getFullSort() {
    String sort = solrQuery == null ?
            SRequest.DEFAULT_SORT :
            solrQuery.get(CommonParams.SORT, SRequest.DEFAULT_SORT);
    if (this.sort != null) {
        // NOTE(review): this.sort is initialized to DEFAULT_SORT (non-null), so this branch normally wins
        // over the solrQuery-provided sort. Confirm that this precedence is intended.
        sort = this.sort;
    }
    // A tie breaker is needed for cursorMark and ensures deterministic order.
    // Check for a full trailing "id asc"/"id desc" clause: a bare endsWith("id asc") would
    // false-positive on fields that merely end in "id", e.g. "grid asc".
    if (!(sort.equals("id asc") || sort.equals("id desc") ||
          sort.endsWith(" id asc") || sort.endsWith(" id desc") ||
          sort.endsWith(",id asc") || sort.endsWith(",id desc"))) {
        sort = sort + ", id asc";
    }
    if (idealTime != null) {
        // Primary: group by deduplication field, secondary: temporal distance to idealTime within each group
        sort = String.format(Locale.ROOT, "%s asc, abs(sub(ms(%s), crawl_date)) asc, %s",
                deduplicateField, idealTime, sort);
    } else if (deduplicateField != null) {
        sort = String.format(Locale.ROOT, "%s asc, %s",
                deduplicateField, sort);
    }
    return sort;
}

/**
* @return expandResourcesFilterQueries as an array. Empty if no filters has been assigned.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,10 @@ protected SolrGenericStreaming(SRequest request) {
* If {@code fl} is not already set in solrQuery it will be set to {@code source_file_path,source_file_offset}.
* If {@code cursorMark} is not already set in solrQuery it will be set to {@code *}.
* If {@code rows} is not already set in solrQuery it will be set to {@link #DEFAULT_PAGESIZE}.
* If {@code sort} is not already set in solrQuery it will be set to {@link SRequest#DEFAULT_SORT}.
* If {@code sort} does not end with {@code id asc} or {@code id desc}, {@code id asc} will be appended.
* If expandResources is true and {@code fl} in solrQuery does not contain {@code content_type_norm},
* {@code source_file_path} and {@code source_file_offset}, they will be added.
* If ensureUnique is true and {@code fl} in solrQuery does not contain the field {@code id} it will be added.
* If deduplicateField is specified and {@code fl} in solrQuery does not already contain the field it will be added.
* If deduplicateField is specified and {@code sort} does not already have it as primary sort field it will be added.
* {@code facets}, {@code stats} and {@code hl} will always be set to false, no matter their initial value.
* If uniqueFields are specified, they are added to fields.
*
Expand Down Expand Up @@ -272,16 +269,6 @@ public static void adjustSolrQuery(
}
solrQuery.set(CommonParams.FL, String.join(",", fl));

// Adjust sort to ensure presence of tie breaker and deduplication field (if enabled)
String sort = solrQuery.get(CommonParams.SORT, SRequest.DEFAULT_SORT);
if (!(sort.endsWith("id asc") || sort.endsWith("id desc"))) {
sort = sort + ", id asc"; // A tie breaker is needed when using cursormark
}
if (deduplicateField != null && !sort.startsWith(deduplicateField)) {
solrQuery.set(CommonParams.SORT, deduplicateField + " asc, " + sort);
}
solrQuery.set(CommonParams.SORT, sort);

// Disable irrelevant processing
solrQuery.set(FacetParams.FACET, false);
solrQuery.set(StatsParams.STATS, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ protected static CollectionUtils.CloseableIterator<SolrDocument> iterateSharded(
// Ensure sort fields are delivered by the shard divided streams
Set<String> fields = new LinkedHashSet<>(request.fields);
fields.addAll(getSortFieldNames(base));
request.forceFields(new ArrayList<>(fields)); // TODO: Reduce to original fields before delivering merged result
// TODO: Reduce to original fields before delivering merged result
request.forceFields(new ArrayList<>(fields));

// TODO: Resolve adjustedFields (by moving it into SRequest?)
String adjustedFields = String.join(",", fields);
Expand All @@ -209,15 +210,22 @@ protected static CollectionUtils.CloseableIterator<SolrDocument> iterateSharded(
List<Iterator<SolrDocument>> documentIterators = shards.stream()
.map(shard -> base.deepCopy().shards(shard))
.map(SolrGenericStreaming::new)
// Basic "raw results"
.map(SolrGenericStreaming::iterator)
// Limit hammering on the Solr Cloud
.map(iterator -> CollectionUtils.SharedConstraintIterator.of(iterator, gatekeeper))
// Not necessary but speeds up processing by threading most of the deduplication
// Speed up processing by threading most of the deduplication
.map(iterator -> makeDeduplicatingIfStated(iterator, request))
// Speed up processing by reading ahead
.map(iterator -> CollectionUtils.BufferingIterator.of(iterator, executor, request.pageSize, continueProcessing))
.collect(Collectors.toList());
// Merge all shard divisions to one iterator
Iterator<SolrDocument> docs = CollectionUtils.mergeIterators(documentIterators, getDocumentComparator(request));
// Limit the amount of results
docs = CollectionUtils.CloseableIterator.of(docs, continueProcessing, request.maxResults);
// Remove duplicates, add resources...
docs = SolrStreamDecorators.addPostProcessors(docs, request, adjustedFields);
// TODO: Add limiting iterator that supports continueProcessing to signal feeders
// Ensure that close() propagates to the BufferingIterator to avoid Thread & buffer leaks
return CollectionUtils.CloseableIterator.of(docs, continueProcessing);
}

Expand All @@ -233,11 +241,18 @@ private static Iterator<SolrDocument> makeDeduplicatingIfStated(Iterator<SolrDoc
* @return all field names needed by {@link SRequest#sort}.
*/
private static Set<String> getSortFieldNames(SRequest request) {
return Arrays.stream(request.sort.split(", *"))
log.debug("Constructing shard divide sort from '{}'", request.getFullSort());
Set<String> fields = Arrays.stream(request.getFullSort().split(", *"))
.map(SORT_FIELD_PATTERN::matcher)
.filter(Matcher::matches)
.map(matcher -> matcher.group(1))
.collect(Collectors.toCollection(HashSet::new));
Arrays.stream(request.getFullSort().split(", *"))
.map(SORT_FIELD_TIME_PROXIMITY_PATTERN::matcher)
.filter(Matcher::matches)
.map(matcher -> matcher.group(2))
.forEach(fields::add);
return fields;
}

/**
Expand All @@ -246,19 +261,21 @@ private static Set<String> getSortFieldNames(SRequest request) {
* @param request a request with a comma separates sort chain in {@link SRequest#sort}.
* @return a chained comparator for the sort elements.
*/
private static Comparator<SolrDocument> getDocumentComparator(SRequest request) {
public static Comparator<SolrDocument> getDocumentComparator(SRequest request) {
// https://solr.apache.org/guide/solr/latest/query-guide/common-query-parameters.html#sort-parameter
String[] sortElements = request.sort.split(", *");
Comparator<SolrDocument> comparator = null;
for (String sortElement: sortElements) {
Matcher clauseMatcher = SORT_CLAUSES_PATTERN.matcher(request.getFullSort());
while (clauseMatcher.find()) {
String clause = clauseMatcher.group(1);
if (comparator == null) {
comparator = getSingleComparator(sortElement);
comparator = getSingleComparator(clause);
} else {
comparator = comparator.thenComparing(getSingleComparator(sortElement));
comparator = comparator.thenComparing(getSingleComparator(clause));
}
}
return comparator;
}
public static final Pattern SORT_CLAUSES_PATTERN = Pattern.compile(" *(.*? (?:asc|desc)),? *");

/**
* Limited recreation of Solr sort. Basic score & field-based sorting is supported as well as the time-proximity
Expand Down Expand Up @@ -290,7 +307,7 @@ private static Comparator<SolrDocument> getSingleComparator(String sortElement)
if (proximityMatcher.matches()) {
String origoS = proximityMatcher.group(1);
String field = proximityMatcher.group(2);
int dir = "asc".equals(fieldMatcher.group(3)) ? 1 : -1;
int dir = "asc".equals(proximityMatcher.group(3)) ? 1 : -1;
final long origoEpoch = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(origoS)).getEpochSecond();
return (doc1, doc2) -> {
Object o1 = doc1.getFieldValue(field);
Expand All @@ -308,8 +325,8 @@ private static Comparator<SolrDocument> getSingleComparator(String sortElement)
}

// score and plain fields work the same from a sorting perspective
/** Matches a plain sort clause: {@code <field> asc|desc}; group 1 = field, group 2 = direction. */
public static final Pattern SORT_FIELD_PATTERN = Pattern.compile("^([a-zA-Z_][a-zA-Z0-9_]*) (asc|desc)$");
/**
 * Matches a time-proximity sort clause: {@code abs(sub(ms(<origo>), <field>)) asc|desc};
 * group 1 = origo timestamp, group 2 = field, group 3 = direction.
 */
public static final Pattern SORT_FIELD_TIME_PROXIMITY_PATTERN = Pattern.compile(
        "^abs *\\( *sub *\\( *ms *\\(([^)]+)\\) *, *([a-zA-Z_][a-zA-Z0-9_]*) *\\) *\\) *(asc|desc)$");

}
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,6 @@ public static <T> Iterator<T> mergeIterators(Collection<Iterator<T>> iterators,
.filter(Iterator::hasNext)
.map(PeekableIterator::new)
.forEach(pq::add);
log.debug("Created merging priority queue with {} elements from {} iterators", pq.size(), iterators.size());
// Create a new iterator that
// 1) pop iterator from the top of the priority queue
// 2) deliver the next()element from the iterator
Expand All @@ -677,10 +676,14 @@ public T next() {
/**
* Special purpose Iterator used by {@link #mergeIteratorsBuffered(Collection, Comparator, Executor, Semaphore, int)}
* for signalling cancellation in case of early iterator termination.
* <p>
* This Iterator optionally auto-closes after a given limit.
*/
public static class CloseableIterator<T> implements Iterator<T>, Closeable {
private final Iterator<T> inner;
private final AtomicBoolean continueProcessing;
private final long limit;
private long delivered = 0;

/**
* Create a closeable iterator with a shared state. If {@code continueProcessing} is set to false, either
Expand All @@ -694,6 +697,24 @@ public static <T> CloseableIterator<T> of(Iterator<T> inner, AtomicBoolean conti
return new CloseableIterator<>(inner, continueProcessing);
}

/**
 * Create a closeable iterator with a shared state and a delivery limit.
 * If {@code continueProcessing} is set to false, either
 * by a call to {@link #close()} or externally (typically by a call to {@code close} on another
 * {@code CloseableIterator}), no further elements will be delivered.
 * <p>
 * Important: When the {@code limit} is hit, {@code continueProcessing} is set to {@code false}.
 * This sends an <strong>immediate</strong> stop signal to all iterators sharing the flag.
 * It is recommended only to use {@code limit} if the iterator is the last in the chain.
 * @param inner any iterator.
 * @param continueProcessing if true, standard processing commences. If false, processing is stopped.
 * @param limit the maximum amount of elements to deliver.
 *              If this limit is hit, {@code continueProcessing} is set to false.
 * @return a {@code CloseableIterator}.
 */
public static <T> CloseableIterator<T> of(Iterator<T> inner, AtomicBoolean continueProcessing, long limit) {
    return new CloseableIterator<>(inner, continueProcessing, limit);
}

/**
* Create a closeable iterator without shared state. If {@code continueProcessing} is set to false
* by a call to {@link #close()}, no further elements will be delivered.
Expand All @@ -708,9 +729,34 @@ public static CloseableIterator<SolrDocument> single(Iterator<SolrDocument> inne
return of(inner, new AtomicBoolean(true));
}

/**
 * Create a closeable iterator with a shared state and no delivery limit.
 * If {@code continueProcessing} is set to false, either
 * by a call to {@link #close()} or externally (typically by a call to {@code close} on another
 * {@code CloseableIterator}), no further elements will be delivered.
 * @param inner any iterator.
 * @param continueProcessing if true, standard processing commences. If false, processing is stopped.
 */
public CloseableIterator(Iterator<T> inner, AtomicBoolean continueProcessing) {
    this(inner, continueProcessing, Long.MAX_VALUE); // Long.MAX_VALUE = effectively unlimited delivery
}

/**
 * Create a closeable iterator with a shared state and a delivery limit.
 * If {@code continueProcessing} is set to false, either
 * by a call to {@link #close()} or externally (typically by a call to {@code close} on another
 * {@code CloseableIterator}), no further elements will be delivered.
 * <p>
 * Important: When the {@code limit} is hit, {@code continueProcessing} is set to {@code false}.
 * This sends an <strong>immediate</strong> stop signal to all iterators sharing the flag.
 * It is recommended only to use {@code limit} if the iterator is the last in the chain.
 * @param inner any iterator.
 * @param continueProcessing if true, standard processing commences. If false, processing is stopped.
 * @param limit the maximum amount of elements to deliver.
 *              If this limit is hit, {@code continueProcessing} is set to false.
 */
public CloseableIterator(Iterator<T> inner, AtomicBoolean continueProcessing, long limit) {
    this.inner = inner;
    this.continueProcessing = continueProcessing;
    this.limit = limit;
}

/**
Expand All @@ -735,7 +781,11 @@ public T next() {
// iterators with shared goals.
throw new IllegalStateException("continueProcessing == false");
}
return inner.next();
T innerNext = inner.next();
if (++delivered == limit) {
continueProcessing.set(false);
}
return innerNext;
}

@Override
Expand Down
Loading

0 comments on commit ba0fd4b

Please sign in to comment.