Skip to content

Commit

Permalink
Fix behavior for null document ids
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Aug 15, 2024
1 parent 9352128 commit 725b083
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 12 deletions.
11 changes: 4 additions & 7 deletions RFS/src/main/java/com/rfs/common/DocumentReindexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,12 @@ public Mono<Void> reindex(
) {
// Create elastic scheduler for long-lived i/o bound tasks
Scheduler elasticScheduler = Schedulers.newBoundedElastic(maxConcurrentWorkItems, Integer.MAX_VALUE, "documentReindexerElastic");
// Create parallel scheduler for short-lived CPU bound tasks
Scheduler parallelScheduler = Schedulers.newParallel( "documentReindexerParallel");
// Create scheduler for short-lived CPU bound tasks
Scheduler genericScheduler = Schedulers.newParallel( "documentReindexer");

return documentStream
.parallel()
.runOn(parallelScheduler)
.publishOn(genericScheduler)
.map(BulkDocSection::new)
.sequential()
.publishOn(parallelScheduler)
.bufferUntil(new Predicate<>() {
private int currentItemCount = 0;
private long currentSize = 0;
Expand Down Expand Up @@ -84,7 +81,7 @@ public boolean test(BulkDocSection next) {
.then()
.doFinally(unused -> {
elasticScheduler.dispose();
parallelScheduler.dispose();
genericScheduler.dispose();
});
}

Expand Down
14 changes: 9 additions & 5 deletions RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.Objects;
import java.util.function.Function;

import org.apache.lucene.document.Document;
Expand All @@ -16,6 +15,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;
Expand Down Expand Up @@ -83,8 +83,7 @@ public Flux<Document> readDocuments() {
)
.parallel(luceneReaderThreadCount)
.runOn(luceneReaderScheduler)
.map(tuple -> getDocument(tuple.getT1(), tuple.getT2(), true)) // Retrieve the document
.filter(Objects::nonNull) // Skip malformed docs
.flatMap(tuple -> Mono.justOrEmpty(getDocument(tuple.getT1(), tuple.getT2(), true))) // Retrieve the document skipping malformed docs
.sequential() // Merge parallel streams
.doFinally(unused -> luceneReaderScheduler.dispose());
}, reader -> { // Close the DirectoryReader when done
Expand All @@ -110,14 +109,19 @@ protected Document getDocument(IndexReader reader, int docId, boolean isLive) {
BytesRef sourceBytes = document.getBinaryValue("_source");
String id;
try {
id = Uid.decodeId(document.getBinaryValue("_id").bytes);
var idValue = document.getBinaryValue("_id");
if(idValue == null) {
log.atError().setMessage("Document with index" + docId + " does not have an id. Skipping").log();
return null; // Skip documents with missing id

Check warning on line 115 in RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java#L114-L115

Added lines #L114 - L115 were not covered by tests
}
id = Uid.decodeId(idValue.bytes);
log.atDebug().setMessage("Reading document {}").addArgument(id).log();
} catch (Exception e) {
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Unable to parse Document id from Document. The Document's Fields: ");
document.getFields().forEach(f -> errorMessage.append(f.name()).append(", "));
log.atError().setMessage(errorMessage.toString()).setCause(e).log();
return null; // Skip documents with missing id
return null; // Skip documents with invalid id

Check warning on line 124 in RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/LuceneDocumentsReader.java#L124

Added line #L124 was not covered by tests
}

if (!isLive) {
Expand Down

0 comments on commit 725b083

Please sign in to comment.