diff --git a/.builder/actions/localhost_canary.py b/.builder/actions/localhost_canary.py
new file mode 100644
index 000000000..1530a98c6
--- /dev/null
+++ b/.builder/actions/localhost_canary.py
@@ -0,0 +1,17 @@
+import Builder
+import sys
+import os
+
+
+class LocalhostCanary(Builder.Action):
+
+ def run(self, env):
+ env.shell.setenv('AWS_CRT_MEMORY_TRACING', '2')
+ actions = [
+ "mvn install -DskipTests",
+ "cd ./samples/HttpClientCanary && mvn install",
+ "cd ./samples/HttpClientCanary && mvn exec:exec@netty",
+ "cd ./samples/HttpClientCanary && mvn exec:exec@crt"
+ ]
+
+ return Builder.Script(actions, name='aws-crt-java-test')
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c41a0dac5..0f0938bb8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -238,3 +238,58 @@ jobs:
Start-Process -NoNewWindow python .\non_tls_server.py
python -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
python builder.pyz localhost-test -p ${{ env.PACKAGE_NAME }} downstream
+
+ localhost-canary-linux:
+ runs-on: ubuntu-20.04 # latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ with:
+ submodules: true
+ - name: Configure local host
+ run: |
+ python3 -m pip install h2
+ cd crt/aws-c-http/tests/py_localhost/
+ python3 server.py &
+ python3 non_tls_server.py &
+ - name: Build and test
+ run: |
+ python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
+ python builder.pyz localhost-canary -p ${{ env.PACKAGE_NAME }} --spec=downstream
+
+ localhost-canary-mac:
+ runs-on: macos-11 # latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ with:
+ submodules: true
+ - name: Configure local host
+ run: |
+ python3 -m pip install h2
+ cd crt/aws-c-http/tests/py_localhost/
+ python3 server.py &
+ python3 non_tls_server.py &
+ - name: Build and test
+ run: |
+ python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder')"
+ chmod a+x builder
+ ./builder localhost-canary -p ${{ env.PACKAGE_NAME }} --spec=downstream
+
+ localhost-canary-win:
+ runs-on: windows-2022 # latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ with:
+ submodules: true
+ - name: Configure local host
+ run: |
+ python -m pip install h2
+ - name: Build and test
+ run: |
+ cd crt/aws-c-http/tests/py_localhost/
+ Start-Process -NoNewWindow python .\server.py
+ Start-Process -NoNewWindow python .\non_tls_server.py
+ python -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
+ python builder.pyz localhost-canary -p ${{ env.PACKAGE_NAME }} downstream
diff --git a/crt/aws-c-http b/crt/aws-c-http
index f81ee942f..a34e41031 160000
--- a/crt/aws-c-http
+++ b/crt/aws-c-http
@@ -1 +1 @@
-Subproject commit f81ee942fa9b149711d90b4a3f3af5c946151d16
+Subproject commit a34e41031936c0f1c7b41708f2a1da24735e41d6
diff --git a/samples/HttpClientCanary/pom.xml b/samples/HttpClientCanary/pom.xml
new file mode 100644
index 000000000..ca7b23d95
--- /dev/null
+++ b/samples/HttpClientCanary/pom.xml
@@ -0,0 +1,72 @@
+
+
+
+ 4.0.0
+
+ com.canary
+ canary
+ 1.0-SNAPSHOT
+
+
+ UTF-8
+ 1.8
+ 1.8
+
+
+
+
+ software.amazon.awssdk.crt
+ aws-crt
+ 1.0.0-SNAPSHOT
+
+
+ software.amazon.awssdk
+ netty-nio-client
+ 2.17.261
+ compile
+
+
+ io.reactivex.rxjava2
+ rxjava
+ 2.2.21
+ compile
+
+
+
+
+
+
+ org.codehaus.mojo
+
+ exec-maven-plugin
+ 1.6.0
+
+
+ netty
+
+ java
+
+ -classpath
+
+ com.canary.SDKNettyClientCanary
+
+
+
+
+ crt
+
+ java
+
+ -classpath
+
+ com.canary.Http2StreamManagerCanary
+
+
+
+
+
+
+
+
+
diff --git a/samples/HttpClientCanary/src/main/java/com/canary/CanaryUtils.java b/samples/HttpClientCanary/src/main/java/com/canary/CanaryUtils.java
new file mode 100644
index 000000000..4929c25d6
--- /dev/null
+++ b/samples/HttpClientCanary/src/main/java/com/canary/CanaryUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+package com.canary;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.ArrayList;
+
+public class CanaryUtils {
+ private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
+ private static final AtomicInteger numDataCollected = new AtomicInteger(0);
+
+ public static ScheduledExecutorService createDataCollector(int warmupLoops, int loops, long timerSecs,
+ AtomicInteger opts, AtomicBoolean done,
+ ArrayList warmupResults, ArrayList results) {
+ ScheduledFuture> task = scheduler.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ int numCollected = numDataCollected.incrementAndGet();
+ int collectedOpts = opts.getAndSet(0);
+ double result = (double)collectedOpts/(double)timerSecs;
+ if (numCollected <= warmupLoops) {
+ System.out.println("warm up: " + result);
+ warmupResults.add(result);
+ return;
+ }
+ if (numCollected > loops + warmupLoops) {
+ done.set(true);
+ return;
+ }
+ System.out.println("result: " + result);
+ results.add(result);
+ }
+ }, timerSecs, timerSecs, TimeUnit.SECONDS);
+ return scheduler;
+ }
+
+ public static double calculateAverage(ArrayList list) {
+ return list.stream().mapToDouble(d -> d).average().orElse(0.0);
+ }
+
+ public static double calculateSTD(ArrayList list) {
+ double avg = calculateAverage(list);
+
+ double variance = 0;
+ for (int i = 0; i < list.size(); i++) {
+ variance += Math.pow(list.get(i) - avg, 2);
+ }
+ variance /= (list.size()-1);
+ return Math.sqrt(variance);
+ }
+
+ /* Return avg of the list and print out the avg and std */
+ public static double printResult(ArrayList list) {
+ double avg = calculateAverage(list);
+ double std = calculateSTD(list);
+ System.out.println("Result collected has: " + list.size());
+ System.out.println("avg of all samples: " + avg);
+ System.out.println("Standard deviation of all samples: " + std);
+ return avg;
+ }
+}
diff --git a/samples/HttpClientCanary/src/main/java/com/canary/EmptyPublisher.java b/samples/HttpClientCanary/src/main/java/com/canary/EmptyPublisher.java
new file mode 100644
index 000000000..ba7d6f7b5
--- /dev/null
+++ b/samples/HttpClientCanary/src/main/java/com/canary/EmptyPublisher.java
@@ -0,0 +1,45 @@
+package com.canary;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
+
+public class EmptyPublisher implements SdkHttpContentPublisher {
+ @Override
+ public void subscribe(Subscriber super ByteBuffer> subscriber) {
+ subscriber.onSubscribe(new EmptySubscription(subscriber));
+ }
+
+ @Override
+ public Optional contentLength() {
+ return Optional.of(0L);
+ }
+
+ private static class EmptySubscription implements Subscription {
+ private final Subscriber subscriber;
+ private volatile boolean done;
+
+ EmptySubscription(Subscriber subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ @Override
+ public void request(long l) {
+ if (!done) {
+ done = true;
+ if (l <= 0) {
+ this.subscriber.onError(new IllegalArgumentException("Demand must be positive"));
+ } else {
+ this.subscriber.onComplete();
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ done = true;
+ }
+ }
+}
diff --git a/samples/HttpClientCanary/src/main/java/com/canary/Http2StreamManagerCanary.java b/samples/HttpClientCanary/src/main/java/com/canary/Http2StreamManagerCanary.java
new file mode 100644
index 000000000..93df80fbb
--- /dev/null
+++ b/samples/HttpClientCanary/src/main/java/com/canary/Http2StreamManagerCanary.java
@@ -0,0 +1,234 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+
+package com.canary;
+
+import software.amazon.awssdk.crt.CRT;
+import software.amazon.awssdk.crt.CrtResource;
+import software.amazon.awssdk.crt.CrtRuntimeException;
+import software.amazon.awssdk.crt.io.*;
+import software.amazon.awssdk.crt.http.*;
+import software.amazon.awssdk.crt.utils.ByteBufferUtils;
+import software.amazon.awssdk.crt.Log;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.FileReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyFactory;
+import java.security.PrivateKey;
+import java.security.Signature;
+import java.security.interfaces.RSAPrivateKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.canary.CanaryUtils.createDataCollector;
+import static com.canary.CanaryUtils.printResult;
+
+/**
+ * A sample for testing the custom private key operations. See the Java V2 SDK
+ * sample for a more in-depth
+ * sample with additional options that can be configured via the terminal. This
+ * is for testing primarily.
+ */
+public class Http2StreamManagerCanary {
+ private final AtomicInteger opts = new AtomicInteger(0);
+ private static final String HTTPS = "https";
+ private URI uri;
+ private int maxStreams = 100;
+ private int maxConnections = 50;
+ private int batchNum;
+ private String nettyResultPath;
+
+ private Http2StreamManager createStreamManager(URI uri, int numConnections) {
+
+ try (EventLoopGroup eventLoopGroup = new EventLoopGroup(0 /* default to number of cores */);
+ HostResolver resolver = new HostResolver(eventLoopGroup);
+ ClientBootstrap bootstrap = new ClientBootstrap(eventLoopGroup, resolver);
+ SocketOptions sockOpts = new SocketOptions();
+ TlsContextOptions tlsOpts = TlsContextOptions.createDefaultClient().withAlpnList("h2")
+ .withVerifyPeer(false);
+ TlsContext tlsContext = new TlsContext(tlsOpts)) {
+ boolean useTls = HTTPS.equals(uri.getScheme());
+ Http2StreamManagerOptions options = new Http2StreamManagerOptions();
+ HttpClientConnectionManagerOptions connectionManagerOptions = new HttpClientConnectionManagerOptions();
+ connectionManagerOptions.withClientBootstrap(bootstrap)
+ .withSocketOptions(sockOpts)
+ .withUri(uri)
+ .withMaxConnections(numConnections);
+ if (useTls) {
+ connectionManagerOptions.withTlsContext(tlsContext);
+ } else {
+ options.withPriorKnowledge(true);
+ }
+ options.withConnectionManagerOptions(connectionManagerOptions)
+ .withIdealConcurrentStreamsPerConnection(this.maxStreams)
+ .withMaxConcurrentStreamsPerConnection(this.maxStreams);
+
+ return Http2StreamManager.create(options);
+ }
+ }
+
+ private HttpRequestBodyStream createBodyStreamWithLength(long bodyLength) {
+ final long payloadSize = bodyLength;
+ final String payloadString = "This is CRT HTTP test.";
+
+ HttpRequestBodyStream payloadStream = new HttpRequestBodyStream() {
+
+ private long remainingBody = payloadSize;
+
+ @Override
+ public boolean sendRequestBody(ByteBuffer outBuffer) {
+
+ byte[] payloadBytes = null;
+
+ try {
+ payloadBytes = payloadString.getBytes("ASCII");
+ } catch (Exception ex) {
+ System.out.println("Encountered error trying to get payload bytes.");
+ return true;
+ }
+
+ while (remainingBody > 0 && outBuffer.remaining() > 0) {
+ long amtToTransfer = Math.min(remainingBody, (long) outBuffer.remaining());
+ amtToTransfer = Math.min(amtToTransfer, (long) payloadBytes.length);
+
+ // Transfer the data
+ outBuffer.put(payloadBytes, 0, (int) amtToTransfer);
+
+ remainingBody -= amtToTransfer;
+ }
+
+ return remainingBody == 0;
+ }
+
+ @Override
+ public boolean resetPosition() {
+ return true;
+ }
+
+ @Override
+ public long getLength() {
+ return payloadSize;
+ }
+ };
+ return payloadStream;
+ }
+
+ private Http2Request createHttp2Request(String method, URI uri, long bodyLength) {
+ HttpHeader[] requestHeaders = new HttpHeader[] {
+ new HttpHeader(":method", method),
+ new HttpHeader(":path", uri.getPath()),
+ new HttpHeader(":scheme", uri.getScheme()),
+ new HttpHeader(":authority", uri.getHost()),
+ };
+ HttpRequestBodyStream bodyStream = null;
+ if (bodyLength > 0) {
+ bodyStream = createBodyStreamWithLength(bodyLength);
+ }
+ Http2Request request = new Http2Request(requestHeaders, bodyStream);
+
+ return request;
+ }
+
+ private void concurrentRequests(Http2StreamManager streamManager, int concurrentNum,
+ AtomicInteger numStreamsFailures, AtomicInteger opts, AtomicBoolean done) throws Exception {
+ Http2Request request = createHttp2Request("GET", uri, 0);
+ while (!done.get()) {
+ final AtomicInteger requestCompleted = new AtomicInteger(0);
+ final CompletableFuture requestCompleteFuture = new CompletableFuture();
+ for (int i = 0; i < concurrentNum; i++) {
+ streamManager.acquireStream(request, new HttpStreamBaseResponseHandler() {
+ @Override
+ public void onResponseHeaders(HttpStreamBase stream, int responseStatusCode, int blockType,
+ HttpHeader[] nextHeaders) {
+ if (responseStatusCode != 200) {
+ numStreamsFailures.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
+ if (errorCode != CRT.AWS_CRT_SUCCESS) {
+ numStreamsFailures.incrementAndGet();
+ }
+ stream.close();
+
+ opts.incrementAndGet();
+ int requestCompletedNum = requestCompleted.incrementAndGet();
+ if (requestCompletedNum == concurrentNum) {
+ requestCompleteFuture.complete(null);
+ }
+ }
+ });
+ }
+ // Wait for all Requests to complete
+ requestCompleteFuture.get(30, TimeUnit.SECONDS);
+ }
+ }
+
+ private void runCanary(int warmupLoops, int loops, long timerSecs) throws Exception {
+ ArrayList warmupResults = new ArrayList<>();
+ ArrayList results = new ArrayList<>();
+ AtomicInteger streamFailed = new AtomicInteger(0);
+
+ System.out.println("batchNum: " + this.batchNum);
+ System.out.println("maxStreams: " + this.maxStreams);
+ System.out.println("maxConnections: " + this.maxConnections);
+ try (Http2StreamManager streamManager = createStreamManager(uri, this.maxConnections)) {
+ AtomicInteger opts = new AtomicInteger(0);
+ AtomicBoolean done = new AtomicBoolean(false);
+ ScheduledExecutorService scheduler = createDataCollector(warmupLoops, loops, timerSecs, opts, done,
+ warmupResults, results);
+ concurrentRequests(streamManager, batchNum, streamFailed, opts, done);
+ scheduler.shutdown();
+ }
+ System.out.println("Failed request num: " + streamFailed.get());
+ System.out.println("////////////// warmup results //////////////");
+ printResult(warmupResults);
+ System.out.println("////////////// real results //////////////");
+ double avg_result = printResult(results);
+ BufferedReader reader = new BufferedReader(new FileReader(nettyResultPath));
+ StringBuilder stringBuilder = new StringBuilder();
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ stringBuilder.append(line);
+ }
+ reader.close();
+ stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+ String content = stringBuilder.toString();
+ double netty_result = Double.parseDouble(content);
+ if (avg_result < netty_result) {
+ System.out.println("CRT result is smaller than netty. CRT: " + avg_result + " Netty: " + netty_result);
+ System.exit(-1);
+ }
+
+ CrtResource.logNativeResources();
+ CrtResource.waitForNoResources();
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.out.println("Current JVM version - " + System.getProperty("java.version"));
+
+ Http2StreamManagerCanary canary = new Http2StreamManagerCanary();
+
+ canary.uri = new URI(System.getProperty("aws.crt.http.canary.uri", "https://localhost:8443/echo"));
+ canary.maxConnections = Integer.parseInt(System.getProperty("aws.crt.http.canary.maxConnections", "8"));
+ canary.maxStreams = Integer.parseInt(System.getProperty("aws.crt.http.canary.maxStreams", "20"));
+ canary.nettyResultPath = System.getProperty("aws.crt.http.canary.nettyResultPath", "netty_result.txt");
+
+ canary.batchNum = canary.maxStreams * canary.maxConnections;
+ canary.runCanary(5, 5, 30);
+ }
+}
diff --git a/samples/HttpClientCanary/src/main/java/com/canary/SDKNettyClientCanary.java b/samples/HttpClientCanary/src/main/java/com/canary/SDKNettyClientCanary.java
new file mode 100644
index 000000000..724ed3d62
--- /dev/null
+++ b/samples/HttpClientCanary/src/main/java/com/canary/SDKNettyClientCanary.java
@@ -0,0 +1,168 @@
+package com.canary;
+
+import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;
+import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
+
+import org.reactivestreams.Publisher;
+import io.reactivex.Flowable;
+import software.amazon.awssdk.http.*;
+import software.amazon.awssdk.http.async.AsyncExecuteRequest;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.awssdk.http.nio.netty.Http2Configuration;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.canary.CanaryUtils.createDataCollector;
+import static com.canary.CanaryUtils.printResult;
+
+public class SDKNettyClientCanary {
+ private final AtomicInteger opts = new AtomicInteger(0);
+ private URI uri;
+ private int batchNum;
+ private int maxStreams = 100;
+ private int maxConnections = 50;
+ private String nettyResultPath;
+
+ public static AttributeMap.Builder trustAllTlsAttributeMapBuilder() {
+ return AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true);
+ }
+
+ private static class TestResponseHandler implements SdkAsyncHttpResponseHandler {
+ @Override
+ public void onHeaders(SdkHttpResponse headers) {
+ }
+
+ @Override
+ public void onStream(Publisher stream) {
+ Flowable.fromPublisher(stream).forEach(b -> {
+ });
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ }
+ }
+
+ private AsyncExecuteRequest createExecuteRequest(AtomicInteger numStreamsFailures) {
+ SdkHttpRequest request = createRequest(uri);
+
+ SdkAsyncHttpResponseHandler handler = new TestResponseHandler() {
+ @Override
+ public void onHeaders(SdkHttpResponse headers) {
+ opts.incrementAndGet();
+ if (!headers.isSuccessful()) {
+ numStreamsFailures.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ numStreamsFailures.incrementAndGet();
+ }
+ };
+ return AsyncExecuteRequest.builder()
+ .request(request)
+ .requestContentPublisher(new EmptyPublisher())
+ .responseHandler(handler)
+ .build();
+ }
+
+ private SdkHttpFullRequest createRequest(URI uri) {
+ return SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.GET)
+ .encodedPath("/echo")
+ .build();
+ }
+
+ private void concurrentRequests(SdkAsyncHttpClient sdkHttpClient, int concurrentNum,
+ AtomicInteger numStreamsFailures, AtomicInteger opts, AtomicBoolean done) throws Exception {
+
+ while (!done.get()) {
+
+ SdkHttpFullRequest request = SdkHttpFullRequest.builder()
+ .uri(uri)
+ .method(SdkHttpMethod.GET)
+ .build();
+ final AtomicInteger requestCompleted = new AtomicInteger(0);
+ final CompletableFuture requestCompleteFuture = new CompletableFuture();
+
+ for (int i = 0; i < concurrentNum; i++) {
+ try {
+ CompletableFuture httpClientFuture = sdkHttpClient
+ .execute(createExecuteRequest(numStreamsFailures))
+ .whenComplete((r, t) -> {
+ if (t != null) {
+ numStreamsFailures.incrementAndGet();
+ }
+ opts.incrementAndGet();
+ int requestCompletedNum = requestCompleted.incrementAndGet();
+ if (requestCompletedNum == concurrentNum) {
+ requestCompleteFuture.complete(null);
+ }
+ });
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+ }
+ // Wait for all Requests to complete
+ requestCompleteFuture.get(30, TimeUnit.SECONDS);
+ }
+ }
+
+ private void runCanary(int warmupLoops, int loops, long timerSecs) throws Exception {
+ ArrayList warmupResults = new ArrayList<>();
+ ArrayList results = new ArrayList<>();
+ AtomicInteger streamFailed = new AtomicInteger(0);
+
+ System.out.println("batchNum: "+ this.batchNum);
+ System.out.println("maxStreams: "+ this.maxStreams);
+ System.out.println("maxConnections: "+ this.maxConnections);
+ SdkAsyncHttpClient sdkHttpClient = NettyNioAsyncHttpClient.builder().http2Configuration(Http2Configuration.builder()
+ .maxStreams(new Long(this.maxStreams)).build())
+ .maxConcurrency(this.batchNum)
+ .buildWithDefaults(trustAllTlsAttributeMapBuilder()
+ .put(PROTOCOL, Protocol.HTTP2)
+ .build());
+
+ AtomicInteger opts = new AtomicInteger(0);
+ AtomicBoolean done = new AtomicBoolean(false);
+ ScheduledExecutorService scheduler = createDataCollector(warmupLoops, loops, timerSecs, opts, done,
+ warmupResults, results);
+ concurrentRequests(sdkHttpClient, batchNum, streamFailed, opts, done);
+ scheduler.shutdown();
+
+ System.out.println("Failed request num: " + streamFailed.get());
+ System.out.println("////////////// warmup results //////////////");
+ printResult(warmupResults);
+ System.out.println("////////////// real results //////////////");
+ double avg_result = printResult(results);
+ BufferedWriter writer = new BufferedWriter(new FileWriter(this.nettyResultPath, false));
+ writer.append(Double.toString(avg_result));
+ writer.close();
+ }
+
+ public static void main(String[] args) throws Exception {
+ SDKNettyClientCanary canary = new SDKNettyClientCanary();
+
+ canary.uri = new URI(System.getProperty("aws.crt.http.canary.uri", "https://localhost:8443/echo"));
+ canary.maxConnections = Integer.parseInt(System.getProperty("aws.crt.http.canary.maxConnections", "8"));
+ canary.maxStreams = Integer.parseInt(System.getProperty("aws.crt.http.canary.maxStreams", "20"));
+ canary.nettyResultPath = System.getProperty("aws.crt.http.canary.nettyResultPath", "netty_result.txt");
+
+ canary.batchNum = canary.maxStreams * canary.maxConnections;
+ canary.runCanary(5, 5, 30);
+ }
+}
diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandlerNativeAdapter.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandlerNativeAdapter.java
index 7a0db0f64..02bc91afe 100644
--- a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandlerNativeAdapter.java
+++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandlerNativeAdapter.java
@@ -41,9 +41,7 @@ void onResponseHeadersDone(HttpStreamBase stream, int blockType) {
}
}
- int onResponseBody(HttpStreamBase stream, ByteBuffer bodyBytesIn) {
- byte[] body = new byte[bodyBytesIn.limit()];
- bodyBytesIn.get(body);
+ int onResponseBody(HttpStreamBase stream, byte[] body) {
if (this.responseBaseHandler != null) {
return responseBaseHandler.onResponseBody(stream, body);
} else {
diff --git a/src/native/http_request_response.c b/src/native/http_request_response.c
index 037bf5031..45f8afb1f 100644
--- a/src/native/http_request_response.c
+++ b/src/native/http_request_response.c
@@ -214,7 +214,7 @@ int aws_java_http_stream_on_incoming_body_fn(
int result = AWS_OP_ERR;
- jobject jni_payload = aws_jni_direct_byte_buffer_from_raw_ptr(env, data->ptr, data->len);
+ jobject jni_payload = aws_jni_byte_array_from_cursor(env, data);
jint window_increment = (*env)->CallIntMethod(
env,
diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c
index a51c8389c..baeb9404e 100644
--- a/src/native/java_class_ids.c
+++ b/src/native/java_class_ids.c
@@ -475,8 +475,8 @@ static void s_cache_http_stream_response_handler_native_adapter(JNIEnv *env) {
(*env)->GetMethodID(env, cls, "onResponseHeadersDone", "(Lsoftware/amazon/awssdk/crt/http/HttpStreamBase;I)V");
AWS_FATAL_ASSERT(http_stream_response_handler_properties.onResponseHeadersDone);
- http_stream_response_handler_properties.onResponseBody = (*env)->GetMethodID(
- env, cls, "onResponseBody", "(Lsoftware/amazon/awssdk/crt/http/HttpStreamBase;Ljava/nio/ByteBuffer;)I");
+ http_stream_response_handler_properties.onResponseBody =
+ (*env)->GetMethodID(env, cls, "onResponseBody", "(Lsoftware/amazon/awssdk/crt/http/HttpStreamBase;[B)I");
AWS_FATAL_ASSERT(http_stream_response_handler_properties.onResponseBody);
http_stream_response_handler_properties.onResponseComplete =