Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFS] Improved RFS Worker exception logging #676

Merged
merged 6 commits into from
May 29, 2024

Conversation

chelma
Copy link
Member

@chelma chelma commented May 22, 2024

Description

  • Perform some refactoring of the RFS Worker and select, underlying libraries to better log exceptional situations

Issues Resolved

Testing

  • Ran the ReindexFromSnapshot.java all-in-one script against the docker compose test setup
  • Updated unit tests
  • Provoked an exception to demonstrate the new logging behavior, displayed below:
chelma@3c22fba4e266 RFS % gradle runRfsWorker --args='--snapshot-name reindex-from-snapshot --s3-local-dir /tmp/s3_files --s3-repo-uri s3://chelma-iad-rfs-local-testing --s3-region us-east-1 --source-host http://localhost:19200 --target-host http://localhost:29200 --min-replicas 0 --index-template-whitelist "my_index_template" --component-template-whitelist "my_component_template"'

> Task :runRfsWorker
19:21:31.379 INFO  Running RfsWorker
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
19:21:31.644 INFO  Checking if work remains in the Snapshot Phase...
19:21:32.743 INFO  Snapshot not yet completed, entering Snapshot Phase...
19:21:32.744 INFO  Snapshot CMS Entry not found, attempting to create it...
19:21:32.813 INFO  Snapshot CMS Entry created
19:21:32.813 INFO  Attempting to initiate the snapshot...
19:21:33.936 INFO  Snapshot repo registration successful
19:21:34.008 ERROR Could not create snapshot: _snapshot/migration_assistant_repo/reindex-from-snapshot. Response Code: 400, Response Message: Bad Request, Response Body: {"error":{"root_cause":[{"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"}],"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"},"status":400}
19:21:35.259 ERROR Could not create snapshot: _snapshot/migration_assistant_repo/reindex-from-snapshot. Response Code: 400, Response Message: Bad Request, Response Body: {"error":{"root_cause":[{"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"}],"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"},"status":400}
19:21:37.341 ERROR Could not create snapshot: _snapshot/migration_assistant_repo/reindex-from-snapshot. Response Code: 400, Response Message: Bad Request, Response Body: {"error":{"root_cause":[{"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"}],"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"},"status":400}
19:21:42.080 ERROR Could not create snapshot: _snapshot/migration_assistant_repo/reindex-from-snapshot. Response Code: 400, Response Message: Bad Request, Response Body: {"error":{"root_cause":[{"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"}],"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"},"status":400}
19:21:42.081 ERROR Snapshot reindex-from-snapshot creation failed
reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 3/3
        at reactor.core.Exceptions.retryExhausted(Exceptions.java:308) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:68) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.util.retry.RetryBackoffSpec.lambda$null$4(RetryBackoffSpec.java:560) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:183) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:476) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.SinkManyEmitterProcessor.tryEmitNext(SinkManyEmitterProcessor.java:273) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onError(FluxRetryWhen.java:194) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:149) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:194) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:118) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.5.jar:3.6.5]
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.5.jar:3.6.5]
Exception in thread "main"      at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415) ~[reactor-netty-core-1.1.18.jar:1.1.18]
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.18.jar:1.1.18]
        at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:500) ~[reactor-netty-core-1.1.18.jar:1.1.18]
com.rfs.worker.SnapshotRunner$SnapshotPhaseFailed: Snapshot Phase failed        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:793) ~[reactor-netty-http-1.1.18.jar:1.1.18]

        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.18.jar:1.1.18]
        at com.rfs.worker.SnapshotRunner.runInternal(SnapshotRunner.java:42)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at com.rfs.worker.Runner.run(Runner.java:21)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at com.rfs.RunRfsWorker.main(RunRfsWorker.java:133)     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]

        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.108.Final.jar:4.1.108.Final]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[?:?]
        Suppressed: java.lang.Exception: #block terminated with an error
                at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104) ~[reactor-core-3.6.5.jar:3.6.5]
                at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.6.5.jar:3.6.5]
                at com.rfs.common.OpenSearchClient.createSnapshot(OpenSearchClient.java:132) ~[main/:?]
                at com.rfs.common.SnapshotCreator.createSnapshot(SnapshotCreator.java:55) [main/:?]
                at com.rfs.worker.SnapshotStep$InitiateSnapshot.run(SnapshotStep.java:112) [main/:?]
                at com.rfs.worker.SnapshotRunner.runInternal(SnapshotRunner.java:34) [main/:?]
                at com.rfs.worker.Runner.run(Runner.java:21) [main/:?]
                at com.rfs.RunRfsWorker.main(RunRfsWorker.java:133) [main/:?]
Caused by: com.rfs.common.OpenSearchClient$OperationFailed: Could not create snapshot: _snapshot/migration_assistant_repo/reindex-from-snapshot. Response Code: 400, Response Message: Bad Request, Response Body: {"error":{"root_cause":[{"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"}],"type":"invalid_snapshot_name_exception","reason":"[migration_assistant_repo:reindex-from-snapshot] Invalid snapshot name [reindex-from-snapshot], snapshot with the same name already exists"},"status":400}
        at com.rfs.common.OpenSearchClient.lambda$createSnapshot$4(OpenSearchClient.java:127) ~[main/:?]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132) ~[reactor-core-3.6.5.jar:3.6.5]
        ... 42 more
19:21:42.095 ERROR Snapshot Phase failed w/ an exception
19:21:42.095 ERROR {"exceptionMessage":"Failed to create snapshot reindex-from-snapshot","exceptionClass":"SnapshotCreationFailed","exceptionTrace":"[com.rfs.common.SnapshotCreator.createSnapshot(SnapshotCreator.java:59), com.rfs.worker.SnapshotStep$InitiateSnapshot.run(SnapshotStep.java:112), com.rfs.worker.SnapshotRunner.runInternal(SnapshotRunner.java:34), com.rfs.worker.Runner.run(Runner.java:21), com.rfs.RunRfsWorker.main(RunRfsWorker.java:133)]","phase":"SNAPSHOT_IN_PROGRESS","currentStep":"InitiateSnapshot","cmsEntry":"null"}

Check List

  • New functionality includes testing
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Copy link

codecov bot commented May 22, 2024

Codecov Report

Attention: Patch coverage is 43.22034% with 134 lines in your changes are missing coverage. Please review.

Project coverage is 63.44%. Comparing base (c8be42b) to head (8ab93f8).
Report is 48 commits behind head on main.

Files Patch % Lines
...src/main/java/com/rfs/common/OpenSearchClient.java 0.00% 47 Missing ⚠️
...src/main/java/com/rfs/cms/OpenSearchCmsClient.java 0.00% 17 Missing ⚠️
RFS/src/main/java/com/rfs/RunRfsWorker.java 0.00% 15 Missing ⚠️
.../src/main/java/com/rfs/common/SnapshotCreator.java 0.00% 14 Missing ⚠️
.../src/main/java/com/rfs/cms/OpenSearchCmsEntry.java 0.00% 13 Missing ⚠️
...S/src/main/java/com/rfs/worker/MetadataRunner.java 57.89% 8 Missing ⚠️
...S/src/main/java/com/rfs/worker/SnapshotRunner.java 55.55% 8 Missing ⚠️
RFS/src/main/java/com/rfs/worker/MetadataStep.java 86.11% 4 Missing and 1 partial ⚠️
RFS/src/main/java/com/rfs/worker/Runner.java 71.42% 4 Missing ⚠️
RFS/src/main/java/com/rfs/worker/SnapshotStep.java 91.42% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##               main     #676       +/-   ##
=============================================
- Coverage     75.51%   63.44%   -12.08%     
- Complexity     1422     1561      +139     
=============================================
  Files           150      216       +66     
  Lines          6147     8414     +2267     
  Branches        561      758      +197     
=============================================
+ Hits           4642     5338      +696     
- Misses         1135     2674     +1539     
- Partials        370      402       +32     
Flag Coverage Δ
unittests 63.44% <43.22%> (-12.08%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Member

@peternied peternied left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chelma Thanks for making progress in this space, I've got some overall concerns about the data access pattern associated with what the CMS client is returning captured in OpenSearchCmsClient.java

Since this PR is focused around exception logging - lets add test case to make sure the exception logging is propagating as you expect. It might be worthwhile to refactor how some of these methods are configured and called to make test cases of this nature easier to author.

RFS/src/main/java/com/rfs/cms/CmsEntry.java Outdated Show resolved Hide resolved
RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java Outdated Show resolved Hide resolved
RFS/src/main/java/com/rfs/worker/MetadataStep.java Outdated Show resolved Hide resolved
@chelma
Copy link
Member Author

chelma commented May 27, 2024

Still to-do - test cases around logging exceptions.

@chelma
Copy link
Member Author

chelma commented May 28, 2024

@peternied OK - Refactored a bit more and added some tests. Curious to see what you think now.

Copy link
Member

@peternied peternied left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chelma Thanks for updating the interfaces, these are looking better; however, I am seeing some issues in repeating patterns in how this code works - I've called out what looks like types of usage that look like they need adjustment and then applied throughout the codebase.

Let me know if you've got questions

Comment on lines 24 to 28
if (createdEntry.isPresent()) {
return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(createdEntry.get()));
} else {
return null;
return Optional.empty();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Operations on Optional<> will not be handled unless there is a value inside them, so this block can be simplify this greatly by using map to the following:

return createdEntry.map(OpenSearchCmsEntry.Snapshot::fromJson)

Comment on lines 33 to 39
Optional<ObjectNode> document = client.getDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID);
if (document.isEmpty()) {
return Optional.empty();
}
return OpenSearchCmsEntry.Snapshot.fromJsonString(response.body);

ObjectNode sourceNode = (ObjectNode) document.get().get("_source");
return Optional.of(OpenSearchCmsEntry.Snapshot.fromJson(sourceNode));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map simplifies this method greatly too:

    return client.getDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID)
                 .map(document -> (ObjectNode) document.get("_source"))
                 .map(OpenSearchCmsEntry.Snapshot::fromJson);

}
return OpenSearchCmsEntry.Snapshot.fromJsonString(response.body);

ObjectNode sourceNode = (ObjectNode) document.get().get("_source");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we alter the object model so this casting isn't needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, not really; this is a Jackson thing. When you .get() a field on an ObjectNode, it returns a JsonNode (its parent class). The returned object might be an ObjectNode, or it might not, depending on what that field contains. It feels like this is the right spot to determine if this is an ObjectNode or something like a ArrayNode.

@@ -99,11 +108,16 @@ public RestClient.Response registerSnapshotRepo(String repoName, ObjectNode sett
.doOnError(e -> logger.error(e.getMessage()))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(10)))
.block();

return Optional.of(settings);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem correct, if the registration fails, the method should not return an instance of the settings - but of an empty object, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct - this behavior is wrong, and doesn't match the comment for the method.

Copy link
Member

@peternied peternied left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much better, great work @chelma!

@chelma chelma merged commit 80f9e69 into opensearch-project:main May 29, 2024
4 of 7 checks passed
@chelma chelma deleted the MIGRATIONS-1731 branch October 23, 2024 17:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants