Replayer retries #848
Conversation
…the replayer. Rename TransformedPackets to simply ByteBufList and pass that around in more places where a ByteBuf stream may need to be replayed. Introduce a new visitor for scheduleRequest that will pass the result response through the visitor to determine if additional retries should be made or if the request should be deemed to be complete. Notice that only the interfaces are in place, not the retries or even determinations in the calling code of when that should happen. The TrafficReplayerCore implements a visitor with a lambda that will need to take into account the existing status code. Since the response may not have been present when the target's response comes in, the TrafficReplayer needs to thread the future for the aggregation result (from the capture stream) and make the returned future for the visitor dependent upon that in some cases. Notice that the predicate to determine if retries are necessary also consumes/transforms results making it up to the caller to determine how multiple responses should be handled. Signed-off-by: Greg Schohn <[email protected]>
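The visitor contract described above (a response-consuming predicate that decides whether to retry, returned as a future that may depend on the source aggregation) can be sketched roughly as follows. This is a stdlib-only illustration; the names `RetryVisitor`, `scheduleWithRetries`, and `RetryDirective` mirror the description but are not the actual interfaces in the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of the retry-visitor idea described above.
public class RetryVisitorSketch {
    enum RetryDirective { DONE, RETRY }

    // The visitor sees each target response and (asynchronously, since it may
    // need to wait on the source aggregation) decides whether to retry.
    interface RetryVisitor<R> {
        CompletableFuture<RetryDirective> onResponse(R response);
    }

    static <R> CompletableFuture<R> scheduleWithRetries(
            Supplier<R> sendRequest, RetryVisitor<R> visitor) {
        R response = sendRequest.get();
        return visitor.onResponse(response).thenCompose(directive ->
            directive == RetryDirective.DONE
                ? CompletableFuture.completedFuture(response)
                : scheduleWithRetries(sendRequest, visitor));
    }

    public static void main(String[] args) {
        var attempts = new AtomicInteger();
        // Fake target: returns 503 twice, then 200; retry while status >= 300.
        var result = scheduleWithRetries(
            () -> attempts.incrementAndGet() < 3 ? 503 : 200,
            status -> CompletableFuture.completedFuture(
                status < 300 ? RetryDirective.DONE : RetryDirective.RETRY)).join();
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}
```

Note that, as in the PR's description, the visitor both consumes the response and produces the directive, so the caller decides how multiple responses get combined.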
Signed-off-by: Greg Schohn <[email protected]>
Specifically:
ResultsToLogsConsumerTest > testOutputterForGet() FAILED
    org.opentest4j.AssertionFailedError at ResultsToLogsConsumerTest.java:302
ResultsToLogsConsumerTest > testOutputterForPost() FAILED
    org.opentest4j.AssertionFailedError at ResultsToLogsConsumerTest.java:302
ResultsToLogsConsumerTest > testOutputterWithException() FAILED
    org.opentest4j.AssertionFailedError at ResultsToLogsConsumerTest.java:145
OpenSearchDefaultRetryTest > testStatusCodeMatches(int, int, RetryDirective)[1] FAILED
    io.netty.handler.codec.http.TooLongHttpContentException at OpenSearchDefaultRetryTest.java:67
OpenSearchDefaultRetryTest > testStatusCodeMatches(int, int, RetryDirective)[2] FAILED
    io.netty.handler.codec.http.TooLongHttpContentException at OpenSearchDefaultRetryTest.java:67
OpenSearchDefaultRetryTest > testStatusCodeMatches(int, int, RetryDirective)[3] FAILED
    io.netty.handler.codec.http.TooLongHttpContentException at OpenSearchDefaultRetryTest.java:67
OpenSearchDefaultRetryTest > testStatusCodeMatches(int, int, RetryDirective)[4] FAILED
    io.netty.handler.codec.http.TooLongHttpContentException at OpenSearchDefaultRetryTest.java:67
OpenSearchDefaultRetryTest > testStatusCodeMatches(int, int, RetryDirective)[5] FAILED
    io.netty.handler.codec.http.TooLongHttpContentException at OpenSearchDefaultRetryTest.java:67
982 tests completed, 8 failed
Signed-off-by: Greg Schohn <[email protected]>
…a surprising exception in how HttpObjectAggregator didn't truncate messages when they were too long. Now the HttpObjectAggregation done in HttpByteBufFormatter DOES properly truncate them rather than throwing an exception, and there's also a test for it. Signed-off-by: Greg Schohn <[email protected]>
…d fixed a number of exposed bugs. How requests were truncated was still inadequate. The HttpObjectAggregator still wasn't performing well, so I wrote my own simple aggregator to combine an HttpMessage object with a content ByteBuf that I aggregate on the fly. I had to fix other minor exposed bugs, like in the regex to find a bulk request and in how to pump data to the json parser for bulk bodies. Because the http message aggregation is now done with our own implementation, and more literally, the auto-adding of header values for length has been removed. See the documentation of parseHttpMessageFromBufs for how headers are left intact other than the potential addition of payloadBytesDropped, which will be added when the contents are truncated. Signed-off-by: Greg Schohn <[email protected]>
…tried the right number of times before failing, aggregating the results together appropriately. During testing/development, it also became apparent that transformation exceptions in the transformation status weren't being used for anything. To make sure that the exceptions flowed through w/ the flag, I modified the enum to be a discriminated union. I also fixed some bugs in refcounting in newly written code (ByteBufList.asCompositeByteBufRetained). Signed-off-by: Greg Schohn <[email protected]>
…through the kafka topic upon misconfiguration. This change forces the user to rectify a configuration error on the client (replayer) or the target server before proceeding. If the replayer is restarted with the same parameters/target state, it should do the same thing. The replayer sends requests to a target server; if there's no connection, there's no replay, and it wouldn't be a fair run in these misconfigured cases. See https://opensearch.atlassian.net/browse/MIGRATIONS-1311 Signed-off-by: Greg Schohn <[email protected]>
Signed-off-by: Greg Schohn <[email protected]>
for the NettyPacketToHttpConsumer out from the ConnectionReplaySession into NettyPacketToHttpConsumer itself by having the ConnectionReplaySession take a factory ((eventLoop, context) -> ChannelFuture), which in the end is defined by the NettyPacketToHttpConsumer class, alongside the code that initializes the channel. That reduces the coupling between the ConnectionReplaySession and the HTTP subsystem, allowing the ConnectionReplaySession to act much more as a plain old datatype/cached value. Signed-off-by: Greg Schohn <[email protected]>
…o reduce the number of NPEs Signed-off-by: Greg Schohn <[email protected]>
…ally for tests. I've refactored some toxiproxy code into a simple wrapper that is suitable for simple use cases. I'd still like to do more. Signed-off-by: Greg Schohn <[email protected]>
Signed-off-by: Greg Schohn <[email protected]>
# Conflicts:
#	TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java
…ied requests for the Replayer Signed-off-by: Greg Schohn <[email protected]>
… scheduled on eventLoops that were already stopping. Signed-off-by: Greg Schohn <[email protected]>
… responses independent of the number of retries. Signed-off-by: Greg Schohn <[email protected]>
Some metrics are still held by the channel context, but the spans for each connection are useful to indicate the exception count and the duration that we spent. Tests have been updated so that all tests pass. Signed-off-by: Greg Schohn <[email protected]>
Signed-off-by: Greg Schohn <[email protected]>
Signed-off-by: Greg Schohn <[email protected]>
The deadlock was caused because TrafficReplayerTopLevel.close() waited for the CompletableFuture returned by shutdown to return. The CompletableFuture in shutdown() wraps the netty future that shuts down the event loop group. That event loop group is shutdown when the client connection pool is terminated. In an "abundance of caution", each of the cached connections are closed down (in case there's other cleanup to do on the connection, though there isn't currently). However, closeClientConnectionChannel calls submit on the EventLoop, which in turn will throw if the group has already been shut down. That causes an exception to be propagated upward. The deadlock occurs because the future for ClientConnectionPool.shutdownNow() is never returned since an exception is thrown instead. That causes a hang in close() since there's never a signal that the netty future was completed. The solution is to wrap the invocations of each closeClientConnectionChannel in a catch statement. Another potential solution could be to ignore the cached connections since the event loop group is gone. Another protection was also put in place to capture and propagate any Throwables through the returned CompletableFuture so that close() can throw in get(). Signed-off-by: Greg Schohn <[email protected]>
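The fix described above (wrapping each per-connection close so a rejected task can't prevent the shutdown future from completing) can be illustrated with a stdlib-only sketch. An ExecutorService stands in for netty's event loop group here; the class and method names are illustrative, not the PR's actual code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class ShutdownSketch {
    // Analogous to closeClientConnectionChannel: submitting work to an
    // already shut-down executor throws RejectedExecutionException.
    static void closeConnection(ExecutorService loop, String connection) {
        loop.submit(() -> System.out.println("closed " + connection));
    }

    static CompletableFuture<Void> shutdownNow(ExecutorService loop, List<String> cached) {
        var future = new CompletableFuture<Void>();
        try {
            loop.shutdown(); // in the deadlock scenario the group was already gone
            for (var c : cached) {
                try {
                    closeConnection(loop, c);
                } catch (RejectedExecutionException e) {
                    // Swallow: the event loop is gone, so there's nothing left to close.
                }
            }
            future.complete(null);
        } catch (Throwable t) {
            // Propagate anything else through the future so close() can throw in get().
            future.completeExceptionally(t);
        }
        return future;
    }

    public static void main(String[] args) {
        var loop = Executors.newSingleThreadExecutor();
        // Without the inner catch, the first rejected submit would escape and the
        // future would never be completed - the hang described above.
        shutdownNow(loop, List.of("conn-1", "conn-2")).join();
        System.out.println("shutdown future completed");
    }
}
```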
…wnNow() method. One test is disabled because it's actually a terrible idea in practice (see disabled description). Another test was broken because a trailing newline didn't follow request headers, which caused the server to get the content of two requests in one pass. The count for the requestConnecting value was also off. Signed-off-by: Greg Schohn <[email protected]>
25% done with this review, here are my notes, nothing blocking
return Optional.ofNullable(tuple.targetResponseData)
        .filter(r -> !r.isEmpty())
        .map(d -> convertResponse(tuple.context, d, tuple.targetResponseDuration));
private static List<Map<String, Object>> getTargetResponseOp(SourceTargetCaptureTuple tuple) {
nit: rename to getTargetResponseList
@@ -34,11 +34,12 @@
public class ParsedHttpMessagesAsDicts {
    public static final String STATUS_CODE_KEY = "Status-Code";
    public static final String RESPONSE_TIME_MS_KEY = "response_time_ms";
    public static final int MAX_PAYLOAD_BYTES_TO_CAPTURE = 256 * 1024 * 1024;
Is this too high? Why this value?
}
if (dtr.directive == RetryDirective.RETRY) {
    var newStartTime = referenceStartTime.plus(nextRetryDelay);
    log.atInfo().setMessage(() -> "Making request scheduled at " + newStartTime).log();
nit: info -> debug
        () -> "sendRequestWithRetries is failing due to the pending shutdown of the EventLoop");
}
return sendPackets(senderSupplier.get(), eventLoop,
        byteBufList.streamRetained().iterator(), referenceStartTime, interval, new AtomicInteger())
I'd prefer this to use refSafeMap to retain the stream and ensure a stream close at the end of the future
sb.append(responseList.stream()
        .map(AggregatedRawResponse::toString)
        .collect(Collectors.joining("\n", "[", "]")));
return sb.toString();
Did you intend to close this with `}`? Also, I don't see needing the `,` in transformStatus on line 53.
Removed the leading comma. I left the prefix/suffix as `[` / `]` since the contents are of a list. Why were you asking about `}`?
Why were you asking about `}`? The start of this string is `TransformedTargetRequestAndResponse{` and that `{` is never closed.
Signed-off-by: Greg Schohn <[email protected]>
Signed-off-by: Greg Schohn <[email protected]>
public TextTrackedFuture<RequestSenderOrchestrator.RetryDirective>
getRetryDirectiveUnlessExceededMaxRetries(List<AggregatedRawResponse> previousResponses) {
    var d = previousResponses.size() > MAX_RETRIES ?
nit: is the current response included in previousResponses? If not, this should be a >= (4 previousResponses + current response == 5 calls == 1 original call and 4 retries), meaning we should break.
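The boundary condition being debated can be pinned down with a small sketch. `MAX_RETRIES` and the `>=` comparison mirror the discussion above; the loop and names are illustrative, not the PR's code.

```java
import java.util.ArrayList;

public class RetryLimitSketch {
    static final int MAX_RETRIES = 4;

    // previousResponses counts completed attempts, not the in-flight one.
    // With '>' the caller would make one extra retry; '>=' stops at exactly
    // MAX_RETRIES retries (1 original call + 4 retries = 5 calls total).
    static boolean exceededMaxRetries(int previousResponseCount) {
        return previousResponseCount >= MAX_RETRIES;
    }

    public static void main(String[] args) {
        int calls = 0;
        var previousResponses = new ArrayList<Integer>();
        while (true) {
            calls++;                      // simulate one (failing) attempt
            if (exceededMaxRetries(previousResponses.size())) break;
            previousResponses.add(503);   // record the failed attempt
        }
        System.out.println(calls + " calls, " + previousResponses.size() + " recorded");
    }
}
```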
Discussed offline. A few notes that I'd like to see:
- There's no retry limit for failed bulk requests
- There's no comparison between source and target status code for non-bulk requests
- (Unit tests for the above)
- (Existing comment) there's an off-by-one on the retry count
Addressed each of these in the latest commits
* There's now a retry limit for failed bulk requests.
* For non-bulk requests, compare the status codes of the responses.
* Fix an off-by-one bug for the number of retries.
Signed-off-by: Greg Schohn <[email protected]>
…nse bodies Signed-off-by: Greg Schohn <[email protected]>
…ogs. I believe that there's still an issue when any different casing is used for content header keys. Signed-off-by: Greg Schohn <[email protected]>
}

private RequestSenderOrchestrator.RetryDirective shouldRetry(int sourceStatus, int targetStatus) {
    return targetStatus >= 300 && sourceStatus < 300 ?
A case that's interesting to me here: if the source is 404 and the target is 500, we won't retry. This is probably fine from a side-effect perspective, but we should at least document this with a test case and in written documentation.
This can either come in this PR or a follow up
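The case raised above (source 404, target 500: no retry) follows directly from the predicate in the diff. A minimal, self-contained sketch makes the decision table explicit; the class and enum wrappers are illustrative.

```java
public class ShouldRetrySketch {
    enum RetryDirective { RETRY, DONE }

    // Mirrors the predicate in the diff: retry only when the target did worse
    // than the source (target errored while the source succeeded).
    static RetryDirective shouldRetry(int sourceStatus, int targetStatus) {
        return targetStatus >= 300 && sourceStatus < 300
            ? RetryDirective.RETRY : RetryDirective.DONE;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(200, 500)); // target regressed: retry
        System.out.println(shouldRetry(404, 500)); // source also failed: done
        System.out.println(shouldRetry(404, 404)); // statuses agree: done
        System.out.println(shouldRetry(200, 201)); // both succeeded: done
    }
}
```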
Please update the PR overview before merging
… more defensive. Signed-off-by: Greg Schohn <[email protected]>
Description
Add support for RequestSenderOrchestrator.scheduleRequest() to cause retries when connections couldn't be made (retrying indefinitely, until a connection CAN be made) or when the target response is worse than the source response (limited to 4 retries). This precipitated other changes throughout the system.
Below are those other changes, which are mostly relevant for developers. However, some of these changes will have minor user-impact.
Output tuple responses are now lists of responses rather than a single response. The humanReadableLogs script has been updated to decode each of the list elements.
HttpByteBufFormatter relied upon HttpObjectAggregator to parse headers (and the status code) but would throw an exception when the body was larger than the pre-configured size. Multiple areas of code (often for outputting/diagnostics) have needed to clip the contents of HTTP messages, but we hadn't tested behavior beyond those capacities. Now an internal 'TruncatingAggregator' class accepts the headers and then accumulates initial bytes until the limit is satisfied, consuming/dropping the remaining ones. Other helper methods have been added to HttpByteBufFormatter to facilitate easily setting up an EmbeddedChannel/HTTP pipeline to which additional custom handlers can be applied. One such custom handler is the BulkErrorFindingHandler, which uses a json streaming parser to determine whether bulk responses had any errors (used to decide if a retry is necessary).
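The core of the truncation behavior can be sketched without netty: keep the first N content bytes, drop the rest, and record how much was dropped (surfaced in the PR as the payloadBytesDropped header). This sketch uses byte arrays where the real aggregator works on ByteBufs; all names here are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Netty-free sketch of the idea behind the internal 'TruncatingAggregator'.
public class TruncatingAggregatorSketch {
    private final int maxBytes;
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();
    private long payloadBytesDropped = 0;

    TruncatingAggregatorSketch(int maxBytes) { this.maxBytes = maxBytes; }

    // Called once per incoming content chunk (a ByteBuf in the real code):
    // accumulate until the limit is satisfied, then silently drop the rest
    // instead of throwing (as HttpObjectAggregator would).
    void accumulate(byte[] chunk) {
        int room = maxBytes - body.size();
        int keep = Math.min(Math.max(room, 0), chunk.length);
        if (keep > 0) body.write(chunk, 0, keep);
        payloadBytesDropped += chunk.length - keep;
    }

    public static void main(String[] args) {
        var agg = new TruncatingAggregatorSketch(10);
        agg.accumulate("hello ".getBytes(StandardCharsets.UTF_8));
        agg.accumulate("truncated world".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(agg.body.toByteArray(), StandardCharsets.UTF_8)
            + " | dropped=" + agg.payloadBytesDropped);
    }
}
```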
HttpRequestTransformationStatus has been converted from an enum to a class so that causal Exceptions can be tied to the result and propagated through the system more easily. Those causal exceptions were easy to drop/ignore before.
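The enum-to-class conversion can be sketched as a small discriminated union: the success value is a shared constant, while each error instance carries the exception that caused it. The class and method names below follow the description but are a hypothetical sketch, not the PR's actual API.

```java
// Sketch of converting a status enum into a discriminated union so a causal
// Throwable travels with the failure instead of being dropped.
public final class TransformationStatusSketch {
    private final Throwable cause; // null means success

    private TransformationStatusSketch(Throwable cause) { this.cause = cause; }

    public static final TransformationStatusSketch COMPLETED =
        new TransformationStatusSketch(null);

    // Every error carries its cause, so callers can log or rethrow it later.
    public static TransformationStatusSketch makeError(Throwable cause) {
        return new TransformationStatusSketch(cause);
    }

    public boolean isError() { return cause != null; }
    public Throwable getCause() { return cause; }

    public static void main(String[] args) {
        var ok = COMPLETED;
        var failed = makeError(new IllegalStateException("transform blew up"));
        System.out.println(ok.isError() + " " + failed.isError()
            + " " + failed.getCause().getMessage());
    }
}
```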
To support retrying connection failures, how/when/where connections are created has been updated. ConnectionReplaySession has less responsibility to create connections. The supplier previously passed to it is now a BiFunction that takes an EventLoop and an ITargetRequestContext. getChannelFutureInActiveState() now delegates to the BiFunction unless the cached channel is still active. It's up to the caller to deal with any exception that may have been propagated by the BiFunction. The BiFunction is provided by NettyPacketToHttpConsumer, which reduces the coupling between the ConnectionReplaySession and the HTTP subsystem. That allows the ConnectionReplaySession to act much more as a plain old datatype/cached value.
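The decoupling pattern (the session caches a channel but delegates creation to a factory supplied from the HTTP side) can be shown with stdlib types. Strings stand in for EventLoop, ITargetRequestContext, and ChannelFuture; the names are illustrative, not the PR's actual signatures.

```java
import java.util.function.BiFunction;

// Stdlib-only sketch of the factory-based decoupling described above.
public class ConnectionSessionSketch {
    static class ReplaySession {
        private final BiFunction<String, String, String> channelFactory;
        private String cachedChannel; // reused while still "active"

        ReplaySession(BiFunction<String, String, String> channelFactory) {
            this.channelFactory = channelFactory;
        }

        // Delegates to the factory only when nothing usable is cached,
        // analogous to getChannelFutureInActiveState().
        String getChannelInActiveState(String eventLoop, String context) {
            if (cachedChannel == null) {
                cachedChannel = channelFactory.apply(eventLoop, context);
            }
            return cachedChannel;
        }
    }

    public static void main(String[] args) {
        // The factory (owned by the HTTP side, like NettyPacketToHttpConsumer)
        // is the only party that knows how to build a connection; the session
        // stays a plain cached value.
        var session = new ReplaySession((loop, ctx) ->
            "channel(" + loop + "," + ctx + ")");
        System.out.println(session.getChannelInActiveState("loop-1", "req-1"));
        // Second request reuses the cached channel; the factory isn't re-invoked.
        System.out.println(session.getChannelInActiveState("loop-1", "req-2"));
    }
}
```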
Since connections are reused between requests, but created due to a request, there's a new context (IRequestConnectingContext) under the target request. This won't be present when a connection has been reused, but otherwise will be when a new connection is required.
Remove AdaptiveRateLimiter. It was meant for some of the retry stuff that this PR pertains to. Since it wasn't used even here, it seemed better to kill the dependency altogether.
I've got yet another toxiproxy wrapper. Notice that there is also one for CaptureProxy tests (ToxiproxyContainerTestBase). This is just test code, but merging their behaviors/implementations together would be a great future improvement.
Be more careful about how connections are terminated when the system is being shutdown (in NettyFutureBinders). This probably never impacted the replayer, but we shut the replayer down a lot in unit tests. Some of the changes in connection handling exacerbated deadlocks.
Category: Enhancement, New feature
Why these changes are required? To make the traffic to a target more reliable.
What is the old behavior before changes and new behavior after changes? No retries for any kind of transient error before. Now we'll try harder :)
Issues Resolved
https://opensearch.atlassian.net/browse/MIGRATIONS-1214
https://opensearch.atlassian.net/browse/MIGRATIONS-1311
Is this a backport? If so, please add backport PR # and/or commits #
Testing
Gradle unit testing, manual docker-compose OSB tests + CICD tests
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.