Merge pull request #916 from AndreKurait/TargetRequestCompression

Implement target gzip request compression

AndreKurait authored Sep 4, 2024
2 parents c6320ac + eb44aca commit e1aa766
Showing 15 changed files with 676 additions and 29 deletions.
16 changes: 12 additions & 4 deletions DocumentsFromSnapshotMigration/README.md
@@ -8,7 +8,7 @@ The snapshot the application extracts the documents from can be local or in S3.

## How to use the tool

You can kick off the locally tool using Gradle.
You can kick off locally using Gradle.

### S3 Snapshot

@@ -70,7 +70,15 @@ To see the default shard size, use the `--help` CLI option:
| --target-password | The password for target cluster authentication |
| --target-aws-region | The AWS region for the target cluster. Required if using SigV4 authentication |
| --target-aws-service-signing-name | The AWS service signing name (e.g. 'es' for Amazon OpenSearch Service, 'aoss' for Amazon OpenSearch Serverless). Required if using SigV4 authentication |
| --target-insecure | Flag to allow untrusted SSL certificates for target cluster |
| --documents-per-bulk-request | Optional. The number of documents to be included within each bulk request sent. Default: no max (controlled by documents size) |
| --documents-size-per-bulk-request | Optional. The maximum aggregate document size to be used in bulk requests in bytes. Default: 10 MiB |
| --max-connections | Optional. The maximum number of simultaneous connections used to communicate with the target. Default: 20 |

## Advanced Arguments

These arguments should be carefully considered before being set; they may enable experimental features and can affect the security posture of the solution. Use them with caution. An example invocation using them is sketched below the table.

| Argument | Description |
|-----------------------------------|:-----------------------------------------------------------------------------------------------------------------------|
| --target-compression | Flag to enable request compression for target cluster. Default: false |
| --documents-per-bulk-request | The number of documents to be included within each bulk request sent. Default: no max (controlled by documents size) |
| --max-connections                 | The maximum number of simultaneous connections used to communicate with the target. Default: 10                         |
| --target-insecure | Flag to allow untrusted SSL certificates for target cluster. Default: false |
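
A hypothetical invocation that opts into these advanced flags is sketched below. The Gradle task name and the host value are illustrative assumptions, and the snapshot/source arguments are omitted for brevity:

```shell
./gradlew DocumentsFromSnapshotMigration:run --args='--target-host https://target-cluster:9200 --target-compression --max-connections 10'
```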
@@ -111,7 +111,8 @@ public void migrateFrom_ES_v6_8(final SearchClusterContainer.Version targetVersi
targetCluster.getUrl(),
clockJitter,
testDocMigrationContext,
sourceCluster.getVersion().getSourceVersion()
sourceCluster.getVersion().getSourceVersion(),
false
);
assertThat(result, equalTo(DocumentsRunner.CompletionStatus.WORK_COMPLETED));

@@ -224,7 +225,8 @@ private void migrateFrom_ES_v7_X(
targetCluster.getUrl(),
clockJitter,
testDocMigrationContext,
sourceCluster.getVersion().getSourceVersion()
sourceCluster.getVersion().getSourceVersion(),
false
);
assertThat(result, equalTo(DocumentsRunner.CompletionStatus.WORK_COMPLETED));

@@ -37,13 +37,11 @@
@Tag("longTest")
@Slf4j
public class ParallelDocumentMigrationsTest extends SourceTestBase {
final static long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600;
final static List<SearchClusterContainer.Version> SOURCE_IMAGES = List.of(
SearchClusterContainer.ES_V7_10_2,
SearchClusterContainer.ES_V7_17
);
final static List<SearchClusterContainer.Version> TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0);
public static final int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024;

public static Stream<Arguments> makeDocumentMigrationArgs() {
List<Object[]> sourceImageArgs = SOURCE_IMAGES.stream()
@@ -52,19 +50,20 @@ public static Stream<Arguments> makeDocumentMigrationArgs() {
var targetImageNames = TARGET_IMAGES.stream()
.collect(Collectors.toList());
var numWorkersList = List.of(1, 3, 40);
var compressionEnabledList = List.of(true, false);
return sourceImageArgs.stream()
.flatMap(
sourceParams -> targetImageNames.stream()
.flatMap(
targetImage -> numWorkersList.stream()
.map(
numWorkers -> Arguments.of(
.flatMap(numWorkers -> compressionEnabledList.stream().map(compression -> Arguments.of(
numWorkers,
targetImage,
sourceParams[0],
sourceParams[1],
sourceParams[2]
)
sourceParams[2],
compression
))
)
)
);
@@ -77,7 +76,8 @@ public void testDocumentMigration(
SearchClusterContainer.Version targetVersion,
SearchClusterContainer.Version baseSourceImageVersion,
String generatorImage,
String[] generatorArgs
String[] generatorArgs,
boolean compressionEnabled
) throws Exception {
var executorService = Executors.newFixedThreadPool(numWorkers);
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();
@@ -144,7 +144,8 @@ public void testDocumentMigration(
runCounter,
clockJitter,
testDocMigrationContext,
baseSourceImageVersion.getSourceVersion()
baseSourceImageVersion.getSourceVersion(),
compressionEnabled
),
executorService
)
@@ -146,7 +146,8 @@ public static int migrateDocumentsSequentially(
AtomicInteger runCounter,
Random clockJitter,
DocumentMigrationTestContext testContext,
ClusterVersion parserVersion
ClusterVersion parserVersion,
boolean compressionEnabled
) {
for (int runNumber = 1;; ++runNumber) {
try {
@@ -157,7 +158,8 @@ public static int migrateDocumentsSequentially(
targetAddress,
clockJitter,
testContext,
parserVersion
parserVersion,
compressionEnabled
);
if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) {
return runNumber;
@@ -206,7 +208,8 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
String targetAddress,
Random clockJitter,
DocumentMigrationTestContext context,
ClusterVersion parserVersion
ClusterVersion parserVersion,
boolean compressionEnabled
) throws RfsMigrateDocuments.NoWorkLeftException {
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
var shouldThrow = new AtomicBoolean();
@@ -245,6 +248,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
readerFactory,
new DocumentReindexer(new OpenSearchClient(ConnectionContextTestParams.builder()
.host(targetAddress)
.compressionEnabled(compressionEnabled)
.build()
.toConnectionContext()), 1000, Long.MAX_VALUE, 1),
new OpenSearchWorkCoordinator(
9 changes: 8 additions & 1 deletion RFS/src/main/java/com/rfs/common/OpenSearchClient.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -293,7 +294,13 @@ public Mono<BulkResponse> sendBulkRequest(String indexName, List<BulkDocSection>
.addArgument(() -> docsMap.keySet())
.log();
var body = BulkDocSection.convertToBulkRequestBody(docsMap.values());
return client.postAsync(targetPath, body, context)
var additionalHeaders = new HashMap<String, List<String>>();
// Reduce network bandwidth by attempting request and response compression
if (client.supportsGzipCompression()) {
RestClient.addGzipRequestHeaders(additionalHeaders);
RestClient.addGzipResponseHeaders(additionalHeaders);
}
return client.postAsync(targetPath, body, additionalHeaders, context)
.flatMap(response -> {
var resp = new BulkResponse(response.statusCode, response.statusText, response.headers, response.body);
if (!resp.hasBadStatusCode() && !resp.hasFailedOperations()) {
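
For illustration, the sketch below builds the same additional-headers map that `sendBulkRequest` passes to `postAsync` when the client reports gzip support; the literal header values noted in the comments are assumptions based on the Netty `HttpHeaderNames` constants used in `RestClient`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.rfs.common.RestClient;

class GzipBulkHeadersSketch {
    // Sketch only: mirrors the headers attached when client.supportsGzipCompression() is true.
    static Map<String, List<String>> gzipBulkHeaders() {
        var additionalHeaders = new HashMap<String, List<String>>();
        RestClient.addGzipRequestHeaders(additionalHeaders);   // assumed to yield "content-encoding" -> ["gzip"]
        RestClient.addGzipResponseHeaders(additionalHeaders);  // assumed to yield "accept-encoding" -> ["gzip"]
        return additionalHeaders;
    }
}
```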
42 changes: 41 additions & 1 deletion RFS/src/main/java/com/rfs/common/RestClient.java
@@ -12,7 +12,9 @@
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;

import com.rfs.common.http.CompositeTransformer;
import com.rfs.common.http.ConnectionContext;
import com.rfs.common.http.GzipPayloadRequestTransformer;
import com.rfs.common.http.HttpResponse;
import com.rfs.netty.ReadMeteringHandler;
import com.rfs.netty.WriteMeteringHandler;
@@ -45,10 +47,13 @@ public class RestClient {

private static final String USER_AGENT_HEADER_NAME = HttpHeaderNames.USER_AGENT.toString();
private static final String CONTENT_TYPE_HEADER_NAME = HttpHeaderNames.CONTENT_TYPE.toString();
private static final String CONTENT_ENCODING_HEADER_NAME = HttpHeaderNames.CONTENT_ENCODING.toString();
private static final String ACCEPT_ENCODING_HEADER_NAME = HttpHeaderNames.ACCEPT_ENCODING.toString();
private static final String HOST_HEADER_NAME = HttpHeaderNames.HOST.toString();

private static final String USER_AGENT = "RfsWorker-1.0";
private static final String JSON_CONTENT_TYPE = "application/json";
private static final String GZIP_TYPE = "gzip";

public RestClient(ConnectionContext connectionContext) {
this(connectionContext, 0);
@@ -140,12 +145,17 @@ public Mono<HttpResponse> asyncRequest(HttpMethod method, String path, String bo
});
}
var contextCleanupRef = new AtomicReference<Runnable>(() -> {});
return connectionContext.getRequestTransformer().transform(method.name(), path, headers, Mono.justOrEmpty(body)
// Auto-compress the payload when the headers indicate gzip support and the payload is not already compressed
return new CompositeTransformer(
new GzipPayloadRequestTransformer(),
connectionContext.getRequestTransformer()
).transform(method.name(), path, headers, Mono.justOrEmpty(body)
.map(b -> ByteBuffer.wrap(b.getBytes(StandardCharsets.UTF_8)))
)
.flatMap(transformedRequest ->
client.doOnRequest((r, conn) -> contextCleanupRef.set(addSizeMetricsHandlersAndGetCleanup(context).apply(r, conn)))
.headers(h -> transformedRequest.getHeaders().forEach(h::add))
.compress(hasGzipResponseHeaders(transformedRequest.getHeaders()))
.request(method)
.uri("/" + path)
.send(transformedRequest.getBody().map(Unpooled::wrappedBuffer))
@@ -168,6 +178,27 @@ public Mono<HttpResponse> asyncRequest(HttpMethod method, String path, String bo
.doOnTerminate(() -> contextCleanupRef.get().run());
}


public boolean supportsGzipCompression() {
return connectionContext.isCompressionSupported();
}

public static void addGzipResponseHeaders(Map<String, List<String>> headers) {
headers.put(ACCEPT_ENCODING_HEADER_NAME, List.of(GZIP_TYPE));
}
public static boolean hasGzipResponseHeaders(Map<String, List<String>> headers) {
return headers.getOrDefault(ACCEPT_ENCODING_HEADER_NAME, List.of()).contains(GZIP_TYPE);
}
public static void addGzipRequestHeaders(Map<String, List<String>> headers) {
headers.put(GzipPayloadRequestTransformer.CONTENT_ENCODING_HEADER_NAME,
List.of(GzipPayloadRequestTransformer.GZIP_CONTENT_ENCODING_HEADER_VALUE));
}
public static boolean hasGzipRequestHeaders(Map<String, List<String>> headers) {
return headers.getOrDefault(GzipPayloadRequestTransformer.CONTENT_ENCODING_HEADER_NAME, List.of())
.contains(GzipPayloadRequestTransformer.GZIP_CONTENT_ENCODING_HEADER_VALUE);
}


private Map<String, String> extractHeaders(HttpHeaders headers) {
return headers.entries().stream()
.collect(java.util.stream.Collectors.toMap(
@@ -185,6 +216,15 @@ public Mono<HttpResponse> getAsync(String path, IRfsContexts.IRequestContext con
return asyncRequest(HttpMethod.GET, path, null, null, context);
}

public Mono<HttpResponse> postAsync(
String path,
String body,
Map<String, List<String>> additionalHeaders,
IRfsContexts.IRequestContext context
) {
return asyncRequest(HttpMethod.POST, path, body, additionalHeaders, context);
}

public Mono<HttpResponse> postAsync(String path, String body, IRfsContexts.IRequestContext context) {
return asyncRequest(HttpMethod.POST, path, body, null, context);
}
31 changes: 31 additions & 0 deletions RFS/src/main/java/com/rfs/common/http/CompositeTransformer.java
@@ -0,0 +1,31 @@
package com.rfs.common.http;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Getter;
import reactor.core.publisher.Mono;

@AllArgsConstructor
@Getter
public class CompositeTransformer implements RequestTransformer {
private final RequestTransformer firstTransformer;
private final RequestTransformer secondTransformer;

@Override
public Mono<TransformedRequest> transform(
String method,
String path,
Map<String, List<String>> headers,
Mono<ByteBuffer> body
) {
return firstTransformer.transform(method, path, headers, body)
.flatMap(firstResult -> secondTransformer.transform(method,
path,
firstResult.getHeaders(),
firstResult.getBody()
));
}
}
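
A minimal wiring sketch follows, mirroring how `RestClient.asyncRequest` now composes transformers: compress the payload first, then apply the connection's auth transformer. The credentials and request values are placeholders; the constructor signatures come from this diff, and the assumption that `GzipPayloadRequestTransformer` only compresses when the gzip `Content-Encoding` header is present comes from the comment in `RestClient`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;

import com.rfs.common.http.BasicAuthTransformer;
import com.rfs.common.http.CompositeTransformer;
import com.rfs.common.http.GzipPayloadRequestTransformer;
import com.rfs.common.http.TransformedRequest;
import reactor.core.publisher.Mono;

class CompositeTransformerSketch {
    // Sketch: gzip the body first, then add basic-auth headers, as RestClient now does.
    static Mono<TransformedRequest> gzipThenAuth(String bulkBody) {
        var headers = new HashMap<String, List<String>>();
        headers.put("Content-Type", List.of("application/json"));
        // Opt in to payload compression (assumed trigger for GzipPayloadRequestTransformer).
        headers.put(GzipPayloadRequestTransformer.CONTENT_ENCODING_HEADER_NAME,
            List.of(GzipPayloadRequestTransformer.GZIP_CONTENT_ENCODING_HEADER_VALUE));

        var transformer = new CompositeTransformer(
            new GzipPayloadRequestTransformer(),        // first: compress the body
            new BasicAuthTransformer("admin", "admin")  // second: placeholder credentials
        );
        return transformer.transform(
            "POST",
            "_bulk",
            headers,
            Mono.just(ByteBuffer.wrap(bulkBody.getBytes(StandardCharsets.UTF_8)))
        );
    }
}
```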
32 changes: 29 additions & 3 deletions RFS/src/main/java/com/rfs/common/http/ConnectionContext.java
@@ -5,6 +5,7 @@
import java.time.Clock;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
import lombok.Getter;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

@@ -22,6 +23,7 @@ public enum Protocol {
private final Protocol protocol;
private final boolean insecure;
private final RequestTransformer requestTransformer;
private final boolean compressionSupported;

private ConnectionContext(IParams params) {
assert params.getHost() != null : "host is null";
@@ -57,19 +59,20 @@ private ConnectionContext(IParams params) {
}

if (basicAuthEnabled) {
this.requestTransformer = new BasicAuthTransformer(params.getUsername(), params.getPassword());
requestTransformer = new BasicAuthTransformer(params.getUsername(), params.getPassword());
}
else if (sigv4Enabled) {
this.requestTransformer = new SigV4AuthTransformer(
requestTransformer = new SigV4AuthTransformer(
DefaultCredentialsProvider.create(),
params.getAwsServiceSigningName(),
params.getAwsRegion(),
protocol.name(),
Clock::systemUTC);
}
else {
this.requestTransformer = NoAuthTransformer.INSTANCE;
requestTransformer = new NoAuthTransformer();
}
compressionSupported = params.isCompressionEnabled();
}

public interface IParams {
@@ -83,6 +86,8 @@ public interface IParams {

String getAwsServiceSigningName();

boolean isCompressionEnabled();

boolean isInsecure();

default ConnectionContext toConnectionContext() {
@@ -115,9 +120,25 @@ public static class TargetArgs implements IParams {
@Parameter(names = {
"--target-insecure" }, description = "Allow untrusted SSL certificates for target", required = false)
public boolean insecure = false;

@ParametersDelegate
TargetAdvancedArgs advancedArgs = new TargetAdvancedArgs();

@Override
public boolean isCompressionEnabled() {
return advancedArgs.isCompressionEnabled();
}
}

// Flags that require more testing and validation before recommendations are made
@Getter
public static class TargetAdvancedArgs {
@Parameter(names = {
"--target-compression" }, description = "**Advanced**. Allow request compression to target", required = false)
public boolean compressionEnabled = false;
}

@Getter
public static class SourceArgs implements IParams {
@Parameter(names = {
"--source-host" }, description = "The source host and port (e.g. http://localhost:9200)", required = false)
@@ -142,5 +163,10 @@ public static class SourceArgs implements IParams {
@Parameter(names = {
"--source-insecure" }, description = "Allow untrusted SSL certificates for source", required = false)
public boolean insecure = false;

public boolean isCompressionEnabled() {
// Compression is not supported for the source cluster since no data is ingested into it
return false;
}
}
}
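
To make the plumbing concrete, here is a hedged sketch of how `--target-compression` travels from the command line into a `ConnectionContext` via the `@ParametersDelegate`; the JCommander wiring is illustrative, and the `--target-host` flag name is assumed from the README table above.

```java
import com.beust.jcommander.JCommander;
import com.rfs.common.http.ConnectionContext;

class TargetCompressionArgsSketch {
    public static void main(String[] args) {
        var targetArgs = new ConnectionContext.TargetArgs();
        // --target-compression is declared on the delegated TargetAdvancedArgs instance.
        JCommander.newBuilder().addObject(targetArgs).build().parse(
            "--target-host", "https://localhost:9200",
            "--target-compression"
        );
        ConnectionContext ctx = targetArgs.toConnectionContext();
        // The constructor copies advancedArgs.isCompressionEnabled() into compressionSupported.
        System.out.println("compression supported: " + ctx.isCompressionSupported());
    }
}
```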