Skip to content

Commit

Permalink
WIP to reformat log statements so that they aren't susceptible to for…
Browse files Browse the repository at this point in the history
…matting errors and worse when {} are in the message.

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Oct 19, 2024
1 parent 30185a2 commit 8ebd9f8
Show file tree
Hide file tree
Showing 83 changed files with 514 additions and 672 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void run() {
SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
}
} catch (Exception e) {
log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log();
log.atError().setCause(e).setMessage("Unexpected error running RfsWorker").log();
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public static void main(String[] args) throws Exception {
log.atWarn().setMessage("No work left to acquire. Exiting with error code to signal that.").log();
System.exit(NO_WORK_LEFT_EXIT_CODE);
} catch (Exception e) {
log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log();
log.atError().setCause(e).setMessage("Unexpected error running RfsWorker").log();
throw e;
}
}
Expand Down Expand Up @@ -343,16 +343,10 @@ private static void confirmShardPrepIsComplete(
} catch (IWorkCoordinator.LeaseLockHeldElsewhereException e) {
long finalLockRenegotiationMillis = lockRenegotiationMillis;
int finalShardSetupAttemptNumber = shardSetupAttemptNumber;
log.atInfo()
.setMessage(
() -> "After "
+ finalShardSetupAttemptNumber
+ "another process holds the lock"
+ " for setting up the shard work items. "
+ "Waiting "
+ finalLockRenegotiationMillis
+ "ms before trying again."
)
log.atInfo().setMessage("{}")
.addArgument(() -> "After " + finalShardSetupAttemptNumber + "another process holds the lock"
+ " for setting up the shard work items. " + "Waiting " + finalLockRenegotiationMillis
+ "ms before trying again.")
.log();
Thread.sleep(lockRenegotiationMillis);
lockRenegotiationMillis *= 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private void testProcess(int expectedExitCode, Function<RunData, Integer> proces
esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

int actualExitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer));
log.atInfo().setMessage("Process exited with code: " + actualExitCode).log();
log.atInfo().setMessage("Process exited with code: {}").addArgument(actualExitCode).log();

// Check if the exit code is as expected
Assertions.assertEquals(
Expand Down Expand Up @@ -228,7 +228,7 @@ private static ProcessBuilder setupProcess(
failHow == FailHow.NEVER ? "PT10M" : "PT1S" };

// Kick off the doc migration process
log.atInfo().setMessage("Running RfsMigrateDocuments with args: " + Arrays.toString(args)).log();
log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}").addArgument(Arrays.toString(args)).log();
ProcessBuilder processBuilder = new ProcessBuilder(
javaExecutable,
"-cp",
Expand All @@ -245,7 +245,7 @@ private static ProcessBuilder setupProcess(
private static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException {
var process = processBuilder.start();

log.atInfo().setMessage("Process started with ID: " + process.toHandle().pid()).log();
log.atInfo().setMessage("Process started with ID: {}").addArgument(process.toHandle().pid()).log();

BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
var readerThread = new Thread(() -> {
Expand All @@ -259,8 +259,8 @@ private static Process runAndMonitorProcess(ProcessBuilder processBuilder) throw
}
String finalLine = line;
log.atInfo()
.setMessage(() -> "from sub-process [" + process.toHandle().pid() + "]: " + finalLine)
.log();
.setMessage("from sub-process [{}]: {}")
.addArgument(process.toHandle().pid()).addArgument(finalLine).log();
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,8 @@ public static int migrateDocumentsSequentially(
);
throw new ExpectedMigrationWorkTerminationException(e, runNumber);
} catch (Exception e) {
log.atError()
.setCause(e)
.setMessage(
() -> "Caught an exception, "
+ "but just going to run again with this worker to simulate task/container recycling"
)
.log();
log.atError().setCause(e).setMessage("Caught an exception, " +
"but just going to run again with this worker to simulate task/container recycling").log();
}
}
}
Expand Down Expand Up @@ -166,7 +161,8 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
var shouldThrow = new AtomicBoolean();
try (var processManager = new LeaseExpireTrigger(workItemId -> {
log.atDebug().setMessage("Lease expired for " + workItemId + " making next document get throw").log();
log.atDebug().setMessage("Lease expired for {} making next document get throw")
.addArgument(workItemId).log();
shouldThrow.set(true);
})) {
UnaryOperator<RfsLuceneDocument> terminatingDocumentFilter = d -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ public EvaluateResult execute(RootMetadataMigrationContext context) {
var items = migrateAllItems(migrationMode, clusters, transformer, context);
evaluateResult.items(items);
} catch (ParameterException pe) {
log.atError().setMessage("Invalid parameter").setCause(pe).log();
log.atError().setCause(pe).setMessage("Invalid parameter").log();
evaluateResult
.exitCode(INVALID_PARAMETER_CODE)
.errorMessage("Invalid parameter: " + pe.getMessage())
.build();
} catch (Throwable e) {
log.atError().setMessage("Unexpected failure").setCause(e).log();
log.atError().setCause(e).setMessage("Unexpected failure").log();
evaluateResult
.exitCode(UNEXPECTED_FAILURE_CODE)
.errorMessage("Unexpected failure: " + e.getMessage())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ public MigrateResult execute(RootMetadataMigrationContext context) {
var items = migrateAllItems(migrationMode, clusters, transformer, context);
migrateResult.items(items);
} catch (ParameterException pe) {
log.atError().setMessage("Invalid parameter").setCause(pe).log();
log.atError().setCause(pe).setMessage("Invalid parameter").log();
migrateResult
.exitCode(INVALID_PARAMETER_CODE)
.errorMessage("Invalid parameter: " + pe.getMessage())
.build();
} catch (Throwable e) {
log.atError().setMessage("Unexpected failure").setCause(e).log();
log.atError().setCause(e).setMessage("Unexpected failure").log();
migrateResult
.exitCode(UNEXPECTED_FAILURE_CODE)
.errorMessage("Unexpected failure: " + e.getMessage())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexN

Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
return client.sendBulkRequest(indexName, docsBatch, context.createBulkRequest()) // Send the request
.doFirst(() -> log.atInfo().log("Batch Id:{}, {} documents in current bulk request.", batchId, docsBatch.size()))
.doOnSuccess(unused -> log.atDebug().log("Batch Id:{}, succeeded", batchId))
.doOnError(error -> log.atError().log("Batch Id:{}, failed {}", batchId, error.getMessage()))
.doFirst(() -> log.atInfo().setMessage("Batch Id:{}, {} documents in current bulk request.")
.addArgument(batchId).addArgument(docsBatch::size).log())
.doOnSuccess(unused -> log.atDebug().setMessage("Batch Id:{}, succeeded").addArgument(batchId).log())
.doOnError(error -> log.atError().setMessage("Batch Id:{}, failed {}")
.addArgument(batchId).addArgument(error::getMessage).log())
// Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest
.onErrorResume(e -> Mono.empty())
.then() // Discard the response object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i
try {
document = reader.document(docId);
} catch (IOException e) {
log.atError().setMessage("Failed to read document at Lucene index location {}").addArgument(docId).setCause(e).log();
log.atError().setCause(e).setMessage("Failed to read document at Lucene index location {}")
.addArgument(docId).log();
return null;
}

Expand Down Expand Up @@ -188,12 +189,14 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i
}
}
if (id == null) {
log.atError().setMessage("Document with index" + docId + " does not have an id. Skipping").log();
log.atError().setMessage("Document with index {} does not have an id. Skipping")
.addArgument(docId).log();
return null; // Skip documents with missing id
}

if (sourceBytes == null || sourceBytes.bytes.length == 0) {
log.atWarn().setMessage("Document {} doesn't have the _source field enabled").addArgument(id).log();
log.atWarn().setMessage("Document {} doesn't have the _source field enabled")
.addArgument(id).log();
return null; // Skip these
}

Expand All @@ -202,7 +205,7 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Unable to parse Document id from Document. The Document's Fields: ");
document.getFields().forEach(f -> errorMessage.append(f.name()).append(", "));
log.atError().setMessage(errorMessage.toString()).setCause(e).log();
log.atError().setCause(e).setMessage("{}").addArgument(errorMessage).log();
return null; // Skip documents with invalid id
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,7 @@ public Mono<BulkResponse> sendBulkRequest(String indexName, List<DocumentReindex
final var docsMap = docs.stream().collect(Collectors.toMap(d -> d.getDocId(), d -> d));
return Mono.defer(() -> {
final String targetPath = indexName + "/_bulk";
log.atTrace()
.setMessage("Creating bulk body with document ids {}")
.addArgument(() -> docsMap.keySet())
.log();
log.atTrace().setMessage("Creating bulk body with document ids {}").addArgument(docsMap::keySet).log();
var body = DocumentReindexer.BulkDocSection.convertToBulkRequestBody(docsMap.values());
var additionalHeaders = new HashMap<String, List<String>>();
// Reduce network bandwidth by attempting request and response compression
Expand Down Expand Up @@ -395,11 +392,8 @@ public Mono<BulkResponse> sendBulkRequest(String indexName, List<DocumentReindex
error
);
} else {
log.atError()
.setMessage("Unexpected empty document map for bulk request on index {}")
.addArgument(indexName)
.setCause(error)
.log();
log.atError().setCause(error).setMessage("Unexpected empty document map for bulk request on index {}")
.addArgument(indexName).log();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void prepBlobFiles(ShardMetadata shardMetadata) {
+ shardMetadata.getShardId()
+ "/";

log.atInfo().setMessage("Downloading blob files from S3: s3://%s/%s to %s")
log.atInfo().setMessage("Downloading blob files from S3: s3://{}/{} to {}")
.addArgument(s3RepoUri.bucketName)
.addArgument(blobFilesS3Prefix)
.addArgument(shardDirPath).log();
Expand All @@ -207,7 +207,7 @@ public void prepBlobFiles(ShardMetadata shardMetadata) {
// Wait for the transfer to complete
CompletedDirectoryDownload completedDirectoryDownload = directoryDownload.completionFuture().join();

log.atInfo().setMessage(()->"Blob file download(s) complete").log();
log.atInfo().setMessage("Blob file download(s) complete").log();

// Print out any failed downloads
completedDirectoryDownload.failedTransfers().forEach(x->log.error("{}", x));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void registerRepo() {
client.registerSnapshotRepo(getRepoName(), settings, context);
log.atInfo().setMessage("Snapshot repo registration successful").log();
} catch (Exception e) {
log.atError().setMessage("Snapshot repo registration failed").setCause(e).log();
log.atError().setCause(e).setMessage("Snapshot repo registration failed").log();
throw new RepoRegistrationFailed(getRepoName());
}
}
Expand All @@ -72,7 +72,7 @@ public void createSnapshot() {
client.createSnapshot(getRepoName(), snapshotName, body, context);
log.atInfo().setMessage("Snapshot {} creation initiated").addArgument(snapshotName).log();
} catch (Exception e) {
log.atError().setMessage("Snapshot {} creation failed").addArgument(snapshotName).setCause(e).log();
log.atError().setCause(e).setMessage("Snapshot {} creation failed").addArgument(snapshotName).log();
throw new SnapshotCreationFailed(snapshotName);
}
}
Expand All @@ -82,7 +82,7 @@ public boolean isSnapshotFinished() {
try {
response = client.getSnapshotStatus(getRepoName(), snapshotName, context);
} catch (Exception e) {
log.atError().setMessage("Failed to get snapshot status").setCause(e).log();
log.atError().setCause(e).setMessage("Failed to get snapshot status").log();
throw new SnapshotStatusCheckFailed(snapshotName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ private ByteBuffer gzipByteBufferSimple(final ByteBuffer inputBuffer) {
}
}
if (inputBuffer.remaining() > 0) {
log.atDebug()
.setMessage("Gzip compression ratio: {}")
.addArgument(() -> String.format("%.2f%%", (double) baos.size() / inputBuffer.remaining() * 100))
.log();
log.atDebug().setMessage("Gzip compression ratio: {}")
.addArgument(() -> String.format("%.2f%%", (double) baos.size() / inputBuffer.remaining() * 100)).log();
}
return ByteBuffer.wrap(baos.toByteArray());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public IndexMetadata transformIndexMetadata(IndexMetadata index) {
}

private void transformIndex(Index index, IndexType type) {
log.atDebug().setMessage(()->"Original Object: {}").addArgument(index.getRawJson().toString()).log();
log.atDebug().setMessage("Original Object: {}").addArgument(index::getRawJson).log(();
var newRoot = index.getRawJson();

switch (type) {
Expand All @@ -85,7 +85,7 @@ private void transformIndex(Index index, IndexType type) {
TransformFunctions.removeIntermediateIndexSettingsLevel(newRoot); // run before fixNumberOfReplicas
TransformFunctions.fixReplicasForDimensionality(newRoot, awarenessAttributeDimensionality);

log.atDebug().setMessage(()->"Transformed Object: {}").addArgument(newRoot.toString()).log();
log.atDebug().setMessage("Transformed Object: {}").addArgument(newRoot).log(();
}

private enum IndexType {
Expand Down
Loading

0 comments on commit 8ebd9f8

Please sign in to comment.