diff --git a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java index 8ccd981ca..2cd3dbb0b 100644 --- a/RFS/src/main/java/com/rfs/common/DocumentReindexer.java +++ b/RFS/src/main/java/com/rfs/common/DocumentReindexer.java @@ -32,15 +32,12 @@ public Mono reindex( ) { // Create elastic scheduler for long-lived i/o bound tasks Scheduler elasticScheduler = Schedulers.newBoundedElastic(maxConcurrentWorkItems, Integer.MAX_VALUE, "documentReindexerElastic"); - // Create parallel scheduler for short-lived CPU bound tasks - Scheduler parallelScheduler = Schedulers.newParallel( "documentReindexerParallel"); + // Create scheduler for short-lived CPU bound tasks + Scheduler genericScheduler = Schedulers.newParallel( "documentReindexer"); return documentStream - .parallel() - .runOn(parallelScheduler) + .publishOn(genericScheduler) .map(BulkDocSection::new) - .sequential() - .publishOn(parallelScheduler) .bufferUntil(new Predicate<>() { private int currentItemCount = 0; private long currentSize = 0; @@ -84,7 +81,7 @@ public boolean test(BulkDocSection next) { .then() .doFinally(unused -> { elasticScheduler.dispose(); - parallelScheduler.dispose(); + genericScheduler.dispose(); }); } diff --git a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java index 7cc1de162..cfb827b4c 100644 --- a/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java +++ b/RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java @@ -2,7 +2,6 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.Objects; import java.util.function.Function; import org.apache.lucene.document.Document; @@ -16,6 +15,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import 
reactor.util.function.Tuples; @@ -83,8 +83,7 @@ public Flux readDocuments() { ) .parallel(luceneReaderThreadCount) .runOn(luceneReaderScheduler) - .map(tuple -> getDocument(tuple.getT1(), tuple.getT2(), true)) // Retrieve the document - .filter(Objects::nonNull) // Skip malformed docs + .flatMap(tuple -> Mono.justOrEmpty(getDocument(tuple.getT1(), tuple.getT2(), true))) // Retrieve the document skipping malformed docs .sequential() // Merge parallel streams .doFinally(unused -> luceneReaderScheduler.dispose()); }, reader -> { // Close the DirectoryReader when done @@ -110,14 +109,19 @@ protected Document getDocument(IndexReader reader, int docId, boolean isLive) { BytesRef sourceBytes = document.getBinaryValue("_source"); String id; try { - id = Uid.decodeId(document.getBinaryValue("_id").bytes); + var idValue = document.getBinaryValue("_id"); + if (idValue == null) { + log.atError().setMessage("Document with index {} does not have an id. Skipping").addArgument(docId).log(); + return null; // Skip documents with missing id + } + id = Uid.decodeId(idValue.bytes); log.atDebug().setMessage("Reading document {}").addArgument(id).log(); } catch (Exception e) { StringBuilder errorMessage = new StringBuilder(); errorMessage.append("Unable to parse Document id from Document. The Document's Fields: "); document.getFields().forEach(f -> errorMessage.append(f.name()).append(", ")); log.atError().setMessage(errorMessage.toString()).setCause(e).log(); - return null; // Skip documents with missing id + return null; // Skip documents with invalid id } if (!isLive) {