From 2e5c55c6b4492e3a91148397f1c9ff5788eb72ad Mon Sep 17 00:00:00 2001 From: Toke Eskildsen Date: Fri, 13 Oct 2023 07:51:09 +0200 Subject: [PATCH] Enable strategy with possible shard divide where it makes sense Note: This does not yet guarantee that the stream/iterator is closed. This must be fixed! --- .../netarchivesuite/solrwayback/export/ContentStreams.java | 4 ++++ .../dk/kb/netarchivesuite/solrwayback/facade/Facade.java | 6 +++++- .../solrwayback/solr/NetarchiveSolrClient.java | 3 ++- .../solrwayback/solr/SolrGenericStreaming.java | 2 +- .../solrwayback/solr/SolrStreamingExportClient.java | 4 +++- .../solr/SolrStreamingLinkGraphCSVExportClient.java | 4 +++- 6 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/main/java/dk/kb/netarchivesuite/solrwayback/export/ContentStreams.java b/src/main/java/dk/kb/netarchivesuite/solrwayback/export/ContentStreams.java index ad5f44dfd..f01f0a6d3 100644 --- a/src/main/java/dk/kb/netarchivesuite/solrwayback/export/ContentStreams.java +++ b/src/main/java/dk/kb/netarchivesuite/solrwayback/export/ContentStreams.java @@ -104,6 +104,7 @@ public static Stream findImages( SRequest directImagesReq = SRequest.builder(). query(query). + shardDivide("never"). // We are conservative here filterQueries(SolrUtils.extend("content_type_norm:image", filterQueries)). fields(SolrUtils.arcEntryDescriptorFieldList); if (goFast) { // TODO: Consider if this should be an explicit option instead @@ -115,6 +116,7 @@ public static Stream findImages( SRequest htmlRequest = SRequest.builder(). query(query). + shardDivide("never"). // The conservative choice filterQueries(SolrUtils.extend("content_type_norm:html", filterQueries)). fields("crawl_date, links_images"). pageSize(100); // The links-field can be heavy and we want low latency @@ -125,6 +127,7 @@ public static Stream findImages( if (goFast) { htmlImages = resolveImagesFromPageRequest(htmlPages, maxImagesPerPage, sharedLinkPruner). useCachingClient(true). + shardDivide("never"). // Conservative choice stream(); } else { Stream>> htmlCallbacks = htmlPages. @@ -172,6 +175,7 @@ public static Callable> createHTMLImageCallback(SolrDocumen "image_size:[2000 TO *]"). // No small images. (fillers etc.) fields(SolrUtils.arcEntryDescriptorFieldList). // Contains hash used for uniqueness timeProximityDeduplication(isotime, "url_norm"). + shardDivide("never"). // Conservatice choise. Maybe too conservative? maxResults(maxImages); // No sense in returning more than maxImages from a sub-request // The strange construction where the stream is collected and then re-streamed is to ensure that the diff --git a/src/main/java/dk/kb/netarchivesuite/solrwayback/facade/Facade.java b/src/main/java/dk/kb/netarchivesuite/solrwayback/facade/Facade.java index b28b25c8e..480c304ee 100644 --- a/src/main/java/dk/kb/netarchivesuite/solrwayback/facade/Facade.java +++ b/src/main/java/dk/kb/netarchivesuite/solrwayback/facade/Facade.java @@ -41,8 +41,10 @@ import dk.kb.netarchivesuite.solrwayback.solr.SolrGenericStreaming; import dk.kb.netarchivesuite.solrwayback.solr.SolrStats; import dk.kb.netarchivesuite.solrwayback.solr.SolrStreamDecorators; +import dk.kb.netarchivesuite.solrwayback.solr.SolrStreamShard; import dk.kb.netarchivesuite.solrwayback.solr.SolrStreamingExportClient; import dk.kb.netarchivesuite.solrwayback.solr.SolrStreamingLinkGraphCSVExportClient; +import dk.kb.netarchivesuite.solrwayback.util.CollectionUtils; import dk.kb.netarchivesuite.solrwayback.util.DateUtils; import dk.kb.netarchivesuite.solrwayback.util.FileUtil; import dk.kb.netarchivesuite.solrwayback.util.SolrUtils; @@ -636,7 +638,9 @@ public static InputStream exportFields( ensureUnique(ensureUnique); // Create stream - Stream docs = SolrGenericStreaming.stream(request); + //Stream docs = SolrGenericStreaming.stream(request); + // TODO: Figure out how to handle the CloseableStream-problem + Stream docs = SolrStreamShard.streamStrategy(request); if (Boolean.TRUE.equals(flatten)) { docs = docs.flatMap(SolrStreamDecorators::flatten); } diff --git a/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/NetarchiveSolrClient.java b/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/NetarchiveSolrClient.java index 8e18b0702..233aaf6c7 100644 --- a/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/NetarchiveSolrClient.java +++ b/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/NetarchiveSolrClient.java @@ -398,7 +398,8 @@ public ArrayList findImagesForTimestamp(String searchString, timeProximityDeduplication(timeStamp, "url_norm"). maxResults(50); // TODO: Make this an argument instead - return SolrGenericStreaming.stream(request) + // TODO: Figure out how to handle the CloseableStream-problem + return SolrStreamShard.streamStrategy(request) .map(SolrUtils::solrDocument2ArcEntryDescriptor) .collect(Collectors.toCollection(ArrayList::new)); } diff --git a/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrGenericStreaming.java b/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrGenericStreaming.java index d842a7938..58a4e0561 100644 --- a/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrGenericStreaming.java +++ b/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrGenericStreaming.java @@ -199,7 +199,7 @@ public static Stream stream(List fields, String query, Str * @see SolrStreamShard#iterateSharded(SRequest, List) */ public static Iterator iterate(List fields, String query, String... filterQueries) { - return iterate(SRequest.create(query, fields).filterQueries(filterQueries)); + return iterate(SRequest.create(query, fields).filterQueries(filterQueries).shardDivide("never")); } /** diff --git a/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrStreamingExportClient.java b/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrStreamingExportClient.java index 94863ada3..a3d63f4c3 100644 --- a/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrStreamingExportClient.java +++ b/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrStreamingExportClient.java @@ -39,7 +39,9 @@ public SolrStreamingExportClient( } this.solrFieldsArray = solrFields.split(", *"); this.csvFieldsArray = csvFields.split(", *"); - solrDocs = SolrGenericStreaming.iterate( + // TODO: Handle closing in case of exceptions + //solrDocs = SolrGenericStreaming.iterate( + solrDocs = SolrStreamShard.iterateStrategy( SRequest.builder(). solrClient(solrClient). query(query).filterQueries(filterQueries). diff --git a/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrStreamingLinkGraphCSVExportClient.java b/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrStreamingLinkGraphCSVExportClient.java index 443179243..5be8e166d 100644 --- a/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrStreamingLinkGraphCSVExportClient.java +++ b/src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SolrStreamingLinkGraphCSVExportClient.java @@ -38,7 +38,9 @@ public SolrStreamingLinkGraphCSVExportClient( this.solrFieldsArray = solrFields.split(", *"); this.csvFieldsArray = csvFields.split(", *"); - solrDocs = SolrGenericStreaming.iterate( + // TODO: Handle closing in case of exceptions + //solrDocs = SolrGenericStreaming.iterate( + solrDocs = SolrStreamShard.iterateStrategy( SRequest.builder(). solrClient(solrClient). query(query).filterQueries(filters).