Skip to content

Commit

Permalink
Enable strategy with possible shard divide where it makes sense
Browse files Browse the repository at this point in the history
Note: This does not yet guarantee that the stream/iterator is closed. This must be fixed!
  • Loading branch information
tokee committed Oct 13, 2023
1 parent 83f136e commit 2e5c55c
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public static Stream<SolrDocument> findImages(

SRequest directImagesReq = SRequest.builder().
query(query).
shardDivide("never"). // We are conservative here
filterQueries(SolrUtils.extend("content_type_norm:image", filterQueries)).
fields(SolrUtils.arcEntryDescriptorFieldList);
if (goFast) { // TODO: Consider if this should be an explicit option instead
Expand All @@ -115,6 +116,7 @@ public static Stream<SolrDocument> findImages(

SRequest htmlRequest = SRequest.builder().
query(query).
shardDivide("never"). // The conservative choice
filterQueries(SolrUtils.extend("content_type_norm:html", filterQueries)).
fields("crawl_date, links_images").
pageSize(100); // The links-field can be heavy and we want low latency
Expand All @@ -125,6 +127,7 @@ public static Stream<SolrDocument> findImages(
if (goFast) {
htmlImages = resolveImagesFromPageRequest(htmlPages, maxImagesPerPage, sharedLinkPruner).
useCachingClient(true).
shardDivide("never"). // Conservative choice
stream();
} else {
Stream<Callable<Stream<SolrDocument>>> htmlCallbacks = htmlPages.
Expand Down Expand Up @@ -172,6 +175,7 @@ public static Callable<Stream<SolrDocument>> createHTMLImageCallback(SolrDocumen
"image_size:[2000 TO *]"). // No small images. (fillers etc.)
fields(SolrUtils.arcEntryDescriptorFieldList). // Contains hash used for uniqueness
timeProximityDeduplication(isotime, "url_norm").
shardDivide("never"). // Conservatice choise. Maybe too conservative?
maxResults(maxImages); // No sense in returning more than maxImages from a sub-request

// The strange construction where the stream is collected and then re-streamed is to ensure that the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import dk.kb.netarchivesuite.solrwayback.solr.SolrGenericStreaming;
import dk.kb.netarchivesuite.solrwayback.solr.SolrStats;
import dk.kb.netarchivesuite.solrwayback.solr.SolrStreamDecorators;
import dk.kb.netarchivesuite.solrwayback.solr.SolrStreamShard;
import dk.kb.netarchivesuite.solrwayback.solr.SolrStreamingExportClient;
import dk.kb.netarchivesuite.solrwayback.solr.SolrStreamingLinkGraphCSVExportClient;
import dk.kb.netarchivesuite.solrwayback.util.CollectionUtils;
import dk.kb.netarchivesuite.solrwayback.util.DateUtils;
import dk.kb.netarchivesuite.solrwayback.util.FileUtil;
import dk.kb.netarchivesuite.solrwayback.util.SolrUtils;
Expand Down Expand Up @@ -636,7 +638,9 @@ public static InputStream exportFields(
ensureUnique(ensureUnique);

// Create stream
Stream<SolrDocument> docs = SolrGenericStreaming.stream(request);
//Stream<SolrDocument> docs = SolrGenericStreaming.stream(request);
// TODO: Figure out how to handle the CloseableStream-problem
Stream<SolrDocument> docs = SolrStreamShard.streamStrategy(request);
if (Boolean.TRUE.equals(flatten)) {
docs = docs.flatMap(SolrStreamDecorators::flatten);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ public ArrayList<ArcEntryDescriptor> findImagesForTimestamp(String searchString,
timeProximityDeduplication(timeStamp, "url_norm").
maxResults(50); // TODO: Make this an argument instead

return SolrGenericStreaming.stream(request)
// TODO: Figure out how to handle the CloseableStream-problem
return SolrStreamShard.streamStrategy(request)
.map(SolrUtils::solrDocument2ArcEntryDescriptor)
.collect(Collectors.toCollection(ArrayList::new));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public static Stream<SolrDocument> stream(List<String> fields, String query, Str
* @see SolrStreamShard#iterateSharded(SRequest, List)
*/
public static Iterator<SolrDocument> iterate(List<String> fields, String query, String... filterQueries) {
return iterate(SRequest.create(query, fields).filterQueries(filterQueries));
return iterate(SRequest.create(query, fields).filterQueries(filterQueries).shardDivide("never"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public SolrStreamingExportClient(
}
this.solrFieldsArray = solrFields.split(", *");
this.csvFieldsArray = csvFields.split(", *");
solrDocs = SolrGenericStreaming.iterate(
// TODO: Handle closing in case of exceptions
//solrDocs = SolrGenericStreaming.iterate(
solrDocs = SolrStreamShard.iterateStrategy(
SRequest.builder().
solrClient(solrClient).
query(query).filterQueries(filterQueries).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public SolrStreamingLinkGraphCSVExportClient(
this.solrFieldsArray = solrFields.split(", *");
this.csvFieldsArray = csvFields.split(", *");

solrDocs = SolrGenericStreaming.iterate(
// TODO: Handle closing in case of exceptions
//solrDocs = SolrGenericStreaming.iterate(
solrDocs = SolrStreamShard.iterateStrategy(
SRequest.builder().
solrClient(solrClient).
query(query).filterQueries(filters).
Expand Down

0 comments on commit 2e5c55c

Please sign in to comment.