Skip to content

Commit

Permalink
Implement target gzip request compression
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Aug 26, 2024
1 parent 656bdb0 commit 1733301
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 18 deletions.
5 changes: 3 additions & 2 deletions DocumentsFromSnapshotMigration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The snapshot the application extracts the documents from can be local or in S3.

## How to use the tool

You can kick off the locally tool using Gradle.
You can kick off the tool locally using Gradle.

### S3 Snapshot

Expand Down Expand Up @@ -71,6 +71,7 @@ To see the default shard size, use the `--help` CLI option:
| --target-aws-region | The AWS region for the target cluster. Required if using SigV4 authentication |
| --target-aws-service-signing-name | The AWS service signing name (e.g. 'es' for Amazon OpenSearch Service, 'aoss' for Amazon OpenSearch Serverless). Required if using SigV4 authentication |
| --target-insecure | Flag to allow untrusted SSL certificates for target cluster |
| --target-compression | Flag to enable request compression for target cluster |
| --documents-per-bulk-request | Optional. The number of documents to be included within each bulk request sent. Default: no max (controlled by documents size) |
| --documents-size-per-bulk-request | Optional. The maximum aggregate document size to be used in bulk requests in bytes. Default: 10 MiB |
| --max-connections | Optional. The maximum number of connections to simultaneously used to communicate to the target. Default: 20 |
| --max-connections | Optional. The maximum number of connections to be used simultaneously to communicate with the target. Default: 10 |
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,24 @@ public static Stream<Arguments> makeDocumentMigrationArgs() {
.map(SearchClusterContainer.Version::getImageName)
.collect(Collectors.toList());
var numWorkersList = List.of(1, 3, 40);
var compressionEnabledList = List.of(true, false);
return sourceImageArgs.stream()
.flatMap(
sourceParams -> targetImageNames.stream()
.flatMap(
targetImage -> numWorkersList.stream()
.map(
numWorkers -> Arguments.of(
numWorkers,
targetImage,
sourceParams[0],
sourceParams[1],
sourceParams[2]
)
.flatMap(
numWorkers -> compressionEnabledList.stream()
.map(
compressionEnabled -> Arguments.of(
numWorkers,
targetImage,
sourceParams[0],
sourceParams[1],
sourceParams[2],
compressionEnabled
)
)
)
)
);
Expand All @@ -110,7 +115,8 @@ public void testDocumentMigration(
String targetImageName,
SearchClusterContainer.Version baseSourceImageVersion,
String generatorImage,
String[] generatorArgs
String[] generatorArgs,
boolean compressionEnabled
) throws Exception {
var executorService = Executors.newFixedThreadPool(numWorkers);
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();
Expand Down Expand Up @@ -176,7 +182,8 @@ public void testDocumentMigration(
osTargetContainer.getHttpHostAddress(),
runCounter,
clockJitter,
testDocMigrationContext
testDocMigrationContext,
compressionEnabled
),
executorService
)
Expand Down Expand Up @@ -304,7 +311,8 @@ private int migrateDocumentsSequentially(
String targetAddress,
AtomicInteger runCounter,
Random clockJitter,
DocumentMigrationTestContext testContext
DocumentMigrationTestContext testContext,
boolean compressionEnabled
) {
for (int runNumber = 1;; ++runNumber) {
try {
Expand All @@ -314,7 +322,8 @@ private int migrateDocumentsSequentially(
indexAllowlist,
targetAddress,
clockJitter,
testContext
testContext,
compressionEnabled
);
if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) {
return runNumber;
Expand Down Expand Up @@ -362,7 +371,8 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
List<String> indexAllowlist,
String targetAddress,
Random clockJitter,
DocumentMigrationTestContext context
DocumentMigrationTestContext context,
boolean compressionEnabled
) throws RfsMigrateDocuments.NoWorkLeftException {
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
var shouldThrow = new AtomicBoolean();
Expand Down Expand Up @@ -399,6 +409,7 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
readerFactory,
new DocumentReindexer(new OpenSearchClient(ConnectionContextTestParams.builder()
.host(targetAddress)
.compressionEnabled(compressionEnabled)
.build()
.toConnectionContext()), 1000, Long.MAX_VALUE, 1),
new OpenSearchWorkCoordinator(
Expand Down
27 changes: 27 additions & 0 deletions RFS/src/main/java/com/rfs/common/http/CompositeTransformer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.rfs.common.http;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Getter;
import reactor.core.publisher.Mono;

@AllArgsConstructor
@Getter
public class CompositeTransformer implements RequestTransformer {
private final RequestTransformer firstTransformer;
private final RequestTransformer secondTransformer;

Check warning on line 16 in RFS/src/main/java/com/rfs/common/http/CompositeTransformer.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/http/CompositeTransformer.java#L15-L16

Added lines #L15 - L16 were not covered by tests

@Override
public Mono<TransformedRequest> transform(String method, String path, Map<String, List<String>> headers, Mono<ByteBuffer> body) {
return firstTransformer.transform(method, path, headers, body)
.flatMap(firstResult -> secondTransformer.transform(method, path, firstResult.getHeaders(), firstResult.getBody()));
}

public List<RequestTransformer> getTransformers() {
return Arrays.asList(firstTransformer, secondTransformer);
}
}
24 changes: 21 additions & 3 deletions RFS/src/main/java/com/rfs/common/http/ConnectionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,26 @@ private ConnectionContext(IParams params) {
throw new IllegalArgumentException("Cannot have both Basic Auth and SigV4 Auth enabled.");
}

RequestTransformer authTransformer;
if (basicAuthEnabled) {
this.requestTransformer = new BasicAuthTransformer(params.getUsername(), params.getPassword());
authTransformer = new BasicAuthTransformer(params.getUsername(), params.getPassword());
}
else if (sigv4Enabled) {
this.requestTransformer = new SigV4AuthTransformer(
authTransformer = new SigV4AuthTransformer(
DefaultCredentialsProvider.create(),
params.getAwsServiceSigningName(),
params.getAwsRegion(),
protocol.name(),
Clock::systemUTC);
}
else {
this.requestTransformer = NoAuthTransformer.INSTANCE;
authTransformer = NoAuthTransformer.INSTANCE;
}

if (!params.isCompressionEnabled()) {
this.requestTransformer = authTransformer;
} else {
this.requestTransformer = new CompositeTransformer(new GzipRequestTransformer(), authTransformer);
}
}

Expand All @@ -83,6 +90,8 @@ public interface IParams {

String getAwsServiceSigningName();

boolean isCompressionEnabled();

boolean isInsecure();

default ConnectionContext toConnectionContext() {
Expand Down Expand Up @@ -115,6 +124,10 @@ public static class TargetArgs implements IParams {
@Parameter(names = {
"--target-insecure" }, description = "Allow untrusted SSL certificates for target", required = false)
public boolean insecure = false;

@Parameter(names = {

Check warning on line 128 in RFS/src/main/java/com/rfs/common/http/ConnectionContext.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/http/ConnectionContext.java#L128

Added line #L128 was not covered by tests
"--target-compression" }, description = "Allow request compression to target", required = false)
public boolean compressionEnabled = false;

Check warning on line 130 in RFS/src/main/java/com/rfs/common/http/ConnectionContext.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/http/ConnectionContext.java#L130

Added line #L130 was not covered by tests
}

@Getter
Expand Down Expand Up @@ -142,5 +155,10 @@ public static class SourceArgs implements IParams {
@Parameter(names = {
"--source-insecure" }, description = "Allow untrusted SSL certificates for source", required = false)
public boolean insecure = false;

public boolean isCompressionEnabled() {
// No compression on source due to no ingestion
return false;

Check warning on line 161 in RFS/src/main/java/com/rfs/common/http/ConnectionContext.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/http/ConnectionContext.java#L161

Added line #L161 was not covered by tests
}
}
}
75 changes: 75 additions & 0 deletions RFS/src/main/java/com/rfs/common/http/GzipRequestTransformer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.rfs.common.http;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.Deflater;
import java.util.zip.GZIPOutputStream;

import io.netty.handler.codec.http.HttpHeaderNames;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@AllArgsConstructor
@Slf4j
public class GzipRequestTransformer implements RequestTransformer {
private static final String CONTENT_ENCODING_HEADER_NAME = HttpHeaderNames.CONTENT_ENCODING.toString();
private static final String GZIP_CONTENT_ENCODING_HEADER_VALUE = "gzip";
private static final int READ_BUFFER_SIZE = 256 * 1024; // Arbitrary, 256KB

// Local benchmarks show 15% throughput improvement with this setting
private static final int COMPRESSION_LEVEL = Deflater.BEST_SPEED;

@Override
public Mono<TransformedRequest> transform(String method, String path, Map<String, List<String>> headers, Mono<ByteBuffer> body) {
return body
.map(this::gzipByteBufferSimple)
.singleOptional()
.flatMap(
bodyOp -> {
Map<String, List<String>> newHeaders = new HashMap<>(headers);
if (bodyOp.isPresent()) {
newHeaders.put(CONTENT_ENCODING_HEADER_NAME, List.of(GZIP_CONTENT_ENCODING_HEADER_VALUE));
}
return Mono.just(new TransformedRequest(newHeaders, Mono.justOrEmpty(bodyOp)));
}
);
}

@SneakyThrows

Check warning on line 45 in RFS/src/main/java/com/rfs/common/http/GzipRequestTransformer.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/http/GzipRequestTransformer.java#L45

Added line #L45 was not covered by tests
private ByteBuffer gzipByteBufferSimple(final ByteBuffer inputBuffer) {
var readbuffer = inputBuffer.duplicate();
var baos = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOutputStream = new FastGzipOutputStream(baos, READ_BUFFER_SIZE, false)) {
if (readbuffer.hasArray()) {
gzipOutputStream.write(readbuffer.array(), readbuffer.arrayOffset() + readbuffer.position(), readbuffer.remaining());
} else {
byte[] buffer = new byte[READ_BUFFER_SIZE];

Check warning on line 53 in RFS/src/main/java/com/rfs/common/http/GzipRequestTransformer.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/http/GzipRequestTransformer.java#L53

Added line #L53 was not covered by tests
while (readbuffer.hasRemaining()) {
int bytesRead = Math.min(buffer.length, readbuffer.remaining());
readbuffer.get(buffer, 0, bytesRead);
gzipOutputStream.write(buffer, 0, bytesRead);
}

Check warning on line 58 in RFS/src/main/java/com/rfs/common/http/GzipRequestTransformer.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/http/GzipRequestTransformer.java#L55-L58

Added lines #L55 - L58 were not covered by tests
}
}
if (inputBuffer.remaining() > 0) {
log.atDebug().setMessage("Gzip compression ratio: {}")
.addArgument(() -> String.format("%.2f%%", (double) baos.size() / inputBuffer.remaining() * 100))
.log();
}
return ByteBuffer.wrap(baos.toByteArray());
}

private static class FastGzipOutputStream extends GZIPOutputStream {
public FastGzipOutputStream(OutputStream out, int size, boolean syncFlush) throws IOException {
super(out, size, syncFlush);
def.setLevel(COMPRESSION_LEVEL);
}
}
}
44 changes: 44 additions & 0 deletions RFS/src/test/java/com/rfs/common/ConnectionContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import org.junit.jupiter.params.provider.MethodSource;

import com.rfs.common.http.BasicAuthTransformer;
import com.rfs.common.http.CompositeTransformer;
import com.rfs.common.http.ConnectionContext;
import com.rfs.common.http.ConnectionContextTestParams;
import com.rfs.common.http.GzipRequestTransformer;
import com.rfs.common.http.NoAuthTransformer;
import com.rfs.common.http.SigV4AuthTransformer;

Expand Down Expand Up @@ -134,4 +136,46 @@ private static Stream<Arguments> invalidConnectionParams() {
)
);
}

@ParameterizedTest
@MethodSource("compressionEnabledParams")
void testCompressionEnabledBeforeAuth(ConnectionContextTestParams params, Class<?> expectedAuthTransformerClass) {
    // With compression enabled, the context must build a composite whose first
    // stage gzips the body and whose second stage applies authentication.
    ConnectionContext context = params.toConnectionContext();
    assertTrue(context.getRequestTransformer() instanceof CompositeTransformer);

    var transformers = ((CompositeTransformer) context.getRequestTransformer()).getTransformers();
    assertEquals(2, transformers.size());
    assertTrue(transformers.get(0) instanceof GzipRequestTransformer);
    assertTrue(expectedAuthTransformerClass.isInstance(transformers.get(1)));
}

/**
 * One case per auth mode (none, basic, SigV4), each with compression turned
 * on, paired with the auth transformer class expected in the second slot.
 */
private static Stream<Arguments> compressionEnabledParams() {
    var noAuthParams = ConnectionContextTestParams.builder()
        .host("http://localhost:9200")
        .compressionEnabled(true)
        .build();
    var basicAuthParams = ConnectionContextTestParams.builder()
        .host("https://example.com:443")
        .username("user")
        .password("pass")
        .compressionEnabled(true)
        .build();
    var sigV4Params = ConnectionContextTestParams.builder()
        .host("https://opensearch.us-east-1.amazonaws.com")
        .awsRegion("us-east-1")
        .awsServiceSigningName("es")
        .compressionEnabled(true)
        .build();
    return Stream.of(
        Arguments.of(noAuthParams, NoAuthTransformer.class),
        Arguments.of(basicAuthParams, BasicAuthTransformer.class),
        Arguments.of(sigV4Params, SigV4AuthTransformer.class)
    );
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.rfs.common.http;

import java.nio.ByteBuffer;
import java.util.Collections;

import org.junit.jupiter.api.Test;

import org.mockito.Mockito;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

public class CompositeTransformerTest {

    /**
     * Verifies that CompositeTransformer invokes both wrapped transformers and
     * that the second transformer's result is what subscribers observe.
     */
    @Test
    public void testCompositeTransformer() {
        // Stub out both stages of the pipeline.
        RequestTransformer firstTransformer = Mockito.mock(RequestTransformer.class);
        RequestTransformer secondTransformer = Mockito.mock(RequestTransformer.class);

        // The intermediate result (stage 1) and the result the caller should see (stage 2).
        var firstResult = new TransformedRequest(Collections.emptyMap(), Mono.empty());
        var finalResult = new TransformedRequest(Collections.emptyMap(), Mono.just(ByteBuffer.wrap("test".getBytes())));
        when(firstTransformer.transform(any(), any(), any(), any())).thenReturn(Mono.just(firstResult));
        when(secondTransformer.transform(any(), any(), any(), any())).thenReturn(Mono.just(finalResult));

        var compositeTransformer = new CompositeTransformer(firstTransformer, secondTransformer);

        // The composite should surface only the second transformer's output.
        StepVerifier.create(compositeTransformer.transform("GET", "/test", Collections.emptyMap(), Mono.empty()))
            .expectNext(finalResult)
            .verifyComplete();

        // Both stages must have been invoked exactly once.
        Mockito.verify(firstTransformer).transform(any(), any(), any(), any());
        Mockito.verify(secondTransformer).transform(any(), any(), any(), any());
    }
}
Loading

0 comments on commit 1733301

Please sign in to comment.