Skip to content

Commit

Permalink
Enable 100-continue for S3 only and control by configs (linkedin#2871)
Browse files Browse the repository at this point in the history
* Disable 100-continue for signed url and control by configs

* Enable for S3 only

* Use one config to control 100-continue

---------

Co-authored-by: Sophie Guo <[email protected]>
  • Loading branch information
SophieGuo410 and Sophie Guo authored Aug 23, 2024
1 parent 1afec63 commit e2b525a
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public class FrontendConfig {

private static final String DEFAULT_CONTAINER_METRICS_ENABLED_GET_REQUEST_TYPES = "GetBlob,GetBlobInfo,GetSignedUrl";


/**
* Cache validity in seconds for non-private blobs for GET.
*/
Expand Down Expand Up @@ -280,7 +279,13 @@ public class FrontendConfig {
@Default("")
public final List<String> containerMetricsExcludedAccounts;

/**
* This should be controlled by {@link NettyConfig}.nettyEnableOneHundredContinue
*/
public final boolean oneHundredContinueEnable;

public FrontendConfig(VerifiableProperties verifiableProperties) {
NettyConfig nettyConfig = new NettyConfig(verifiableProperties);
cacheValiditySeconds = verifiableProperties.getLong("frontend.cache.validity.seconds", 365 * 24 * 60 * 60);
optionsValiditySeconds = verifiableProperties.getLong("frontend.options.validity.seconds", 24 * 60 * 60);
enableNamedBlobCleanupTask = verifiableProperties.getBoolean("frontend.enable.named.blob.cleanup.task", false);
Expand Down Expand Up @@ -314,6 +319,7 @@ public FrontendConfig(VerifiableProperties verifiableProperties) {
DEFAULT_CONTAINER_METRICS_ENABLED_REQUEST_TYPES);
containerMetricsEnabledGetRequestTypes = verifiableProperties.getString(CONTAINER_METRICS_ENABLED_GET_REQUEST_TYPES,
DEFAULT_CONTAINER_METRICS_ENABLED_GET_REQUEST_TYPES);
oneHundredContinueEnable = nettyConfig.nettyEnableOneHundredContinue;
urlSignerEndpoints = verifiableProperties.getString(URL_SIGNER_ENDPOINTS, DEFAULT_ENDPOINTS_STRING);
urlSignerDefaultMaxUploadSizeBytes =
verifiableProperties.getLongInRange("frontend.url.signer.default.max.upload.size.bytes", 100 * 1024 * 1024, 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class NettyConfig {
public static final String NETTY_METRICS_REFRESH_INTERVAL_SECONDS = "netty.metrics.refresh.interval.seconds";
public static final String NETTY_METRICS_STOP_WAIT_TIMEOUT_SECONDS = "netty.metrics.stop.wait.timeout.seconds";
public static final String NETTY_SERVER_CLOSE_DELAY_TIMEOUT_MS = "netty.server.close.delay.timeout.ms";
public static final String NETTY_ENABLE_ONE_HUNDRED_CONTINUE = "netty.enable.one.hundred.continue";

/**
* Number of netty boss threads.
Expand Down Expand Up @@ -169,6 +170,10 @@ public class NettyConfig {
@Default("0")
public final int nettyServerCloseDelayTimeoutMs;

@Config(NETTY_ENABLE_ONE_HUNDRED_CONTINUE)
@Default("false")
public final boolean nettyEnableOneHundredContinue;

public NettyConfig(VerifiableProperties verifiableProperties) {
nettyServerBossThreadCount = verifiableProperties.getInt(NETTY_SERVER_BOSS_THREAD_COUNT, 1);
nettyServerIdleTimeSeconds = verifiableProperties.getInt(NETTY_SERVER_IDLE_TIME_SECONDS, 60);
Expand All @@ -194,5 +199,6 @@ public NettyConfig(VerifiableProperties verifiableProperties) {
verifiableProperties.getIntInRange(NETTY_METRICS_STOP_WAIT_TIMEOUT_SECONDS, 1, 0, Integer.MAX_VALUE);
nettyServerCloseDelayTimeoutMs =
verifiableProperties.getIntInRange(NETTY_SERVER_CLOSE_DELAY_TIMEOUT_MS, 0, 0, Integer.MAX_VALUE);
nettyEnableOneHundredContinue = verifiableProperties.getBoolean(NETTY_ENABLE_ONE_HUNDRED_CONTINUE, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class RequestPath {
private static final String PATH_SEPARATOR_STRING = String.valueOf(PATH_SEPARATOR_CHAR);
private static final String SEGMENT = SubResource.Segment.toString();
private static final Logger logger = LoggerFactory.getLogger(RequestPath.class);
private static final String S3_PATH = PATH_SEPARATOR_CHAR + Operations.S3;
static final String S3_PATH = PATH_SEPARATOR_CHAR + Operations.S3;

/**
* Parse the request path (and additional headers in some cases). The path will match the following regex-like
Expand Down
23 changes: 20 additions & 3 deletions ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.ambry.rest.RequestPath.*;
import static com.github.ambry.rest.RestUtils.Headers.*;
import static com.github.ambry.rest.RestUtils.InternalKeys.*;

Expand Down Expand Up @@ -827,13 +828,13 @@ public static RequestPath getRequestPath(RestRequest restRequest) {
}

/**
* Return true if the request set 100-continue in Except header and it's a named blob base put request or multi-upload post.
* Return true if the request set 100-continue in Except header and it's a S3 put request or S3 multi-upload post.
* @param restRequest the {@link RestRequest}.
* @return
*/
public static boolean isPutOrPostRequestAndExpectContinue(RestRequest restRequest) {
public static boolean isPutOrPostS3RequestAndExpectContinue(RestRequest restRequest) {
return CONTINUE.equals(restRequest.getArgs().get(EXPECT)) && (restRequest.getRestMethod().equals(RestMethod.PUT)
|| restRequest.getRestMethod().equals(RestMethod.POST));
|| restRequest.getRestMethod().equals(RestMethod.POST)) && isS3Prefix(restRequest);
}

/**
Expand Down Expand Up @@ -958,6 +959,22 @@ public static boolean isS3Request(RestRequest restRequest) {
return restRequest != null && restRequest.getArgs().containsKey(S3_REQUEST);
}

/**
* Determines if the request path is a S3 prefix request path.
* @param restRequest rest request
* @return {@code true} if the request path is a S3 prefix request path.
*/
public static boolean isS3Prefix(RestRequest restRequest) {
String path;
try {
path = restRequest.getPath();
} catch (IllegalArgumentException e) {
logger.error("Invalid URI path for request " + restRequest);
return false;
}
return path.startsWith(S3_PATH) || path.startsWith(Operations.S3);
}

/**
* Determines if the input is a S3 API request
* @param args map of the arguments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ private void start() {
*/
private Callback<Void> securityProcessRequestCallback() {
return buildCallback(frontendMetrics.putSecurityProcessRequestMetrics, securityCheckResult -> {
if (CONTINUE.equals(restRequest.getArgs().get(EXPECT))) {
if (frontendConfig.oneHundredContinueEnable && CONTINUE.equals(restRequest.getArgs().get(EXPECT))
&& RestUtils.isS3Request(restRequest)) {
restResponseChannel.setStatus(ResponseStatus.Continue);
//We need to set the content length in order to be a full http response in NettyResponseChannel::maybeWriteResponseMetadata.
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ private void start() {
*/
private Callback<Void> securityProcessRequestCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.postSecurityProcessRequestMetrics, securityCheckResult -> {
if (CONTINUE.equals(restRequest.getArgs().get(EXPECT))) {
if (frontendConfig.oneHundredContinueEnable && CONTINUE.equals(restRequest.getArgs().get(EXPECT))
&& RestUtils.isS3Request(restRequest)) {
restResponseChannel.setStatus(ResponseStatus.Continue);
//We need to set the content length in order to be a full http response in NettyResponseChannel::maybeWriteResponseMetadata.
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private void start() {
*/
private Callback<Void> securityProcessRequestCallback() {
return buildCallback(frontendMetrics.putSecurityProcessRequestMetrics, securityCheckResult -> {
if (CONTINUE.equals(restRequest.getArgs().get(EXPECT))) {
if (frontendConfig.oneHundredContinueEnable && CONTINUE.equals(restRequest.getArgs().get(EXPECT))) {
restResponseChannel.setStatus(ResponseStatus.Continue);
//We need to set the content length in order to be a full http response in NettyResponseChannel::maybeWriteResponseMetadata.
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private Callback<Void> securityProcessRequestCallback() {
*/
private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putSecurityPostProcessRequestMetrics, securityCheckResult -> {
if (CONTINUE.equals(restRequest.getArgs().get(EXPECT))) {
if (frontendConfig.oneHundredContinueEnable && CONTINUE.equals(restRequest.getArgs().get(EXPECT))) {
restResponseChannel.setStatus(ResponseStatus.Continue);
//We need to set the content length in order to be a full http response in NettyResponseChannel::maybeWriteResponseMetadata.
restResponseChannel.setHeader(RestUtils.Headers.CONTENT_LENGTH, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,9 @@ private boolean handleContent(HttpContent httpContent) throws RestServiceExcepti
}
boolean isPutOrPost = request.getRestMethod().equals(RestMethod.POST) || request.getRestMethod().equals(RestMethod.PUT);
boolean isMultipart = request.isMultipart() && requestContentFullyReceived;
boolean hasContinueAndIsPutOrPost = RestUtils.isPutOrPostRequestAndExpectContinue(request);
boolean hasContinueAndIsPutOrPost = RestUtils.isPutOrPostS3RequestAndExpectContinue(request);
if (success && (!isPutOrPost || isMultipart || hasContinueAndIsPutOrPost)) {
if (hasContinueAndIsPutOrPost) {
if (nettyConfig.nettyEnableOneHundredContinue && hasContinueAndIsPutOrPost) {
request.setArg(EXPECT, "");
removeInternalKeyFromRequest();
responseChannel = new NettyResponseChannel(ctx, nettyMetrics, performanceConfig, nettyConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,8 @@ public void operationComplete(ChannelFuture future) {
writeFuture.setSuccess();
//Don't close the request when we see 100-continue in EXPECT header.
completeRequest(!HttpUtil.isKeepAlive(finalResponseMetadata), false,
!RestUtils.isPutOrPostRequestAndExpectContinue(request));
!(nettyConfig.nettyEnableOneHundredContinue
&& RestUtils.isPutOrPostS3RequestAndExpectContinue(request)));
}
} else {
// otherwise there is some content to write.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,13 @@ public void multipartPostTest() throws Exception {
@Test
public void continueHeaderPutTest() throws Exception {
notificationSystem.reset();
EmbeddedChannel channel = createChannel();
Properties properties = new Properties();
properties.put(NettyConfig.NETTY_ENABLE_ONE_HUNDRED_CONTINUE, "true");
NettyConfig nettyConfig = new NettyConfig(new VerifiableProperties(properties));
EmbeddedChannel channel = createChannel(nettyConfig);
HttpHeaders headers = new DefaultHttpHeaders();
headers.set(EXPECT, CONTINUE);
HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.PUT, "/", headers);
HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.PUT, "/s3/", headers);
httpRequest.headers().set(RestUtils.Headers.SERVICE_ID, "rawBytesPostTest");
httpRequest.headers().set(RestUtils.Headers.AMBRY_CONTENT_TYPE, "application/octet-stream");
channel.writeInbound(httpRequest);
Expand All @@ -219,10 +222,13 @@ public void continueHeaderPutTest() throws Exception {
@Test
public void continueHeaderPostTest() throws Exception {
notificationSystem.reset();
EmbeddedChannel channel = createChannel();
Properties properties = new Properties();
properties.put(NettyConfig.NETTY_ENABLE_ONE_HUNDRED_CONTINUE, "true");
NettyConfig nettyConfig = new NettyConfig(new VerifiableProperties(properties));
EmbeddedChannel channel = createChannel(nettyConfig);
HttpHeaders headers = new DefaultHttpHeaders();
headers.set(EXPECT, CONTINUE);
HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.POST, "/", headers);
HttpRequest httpRequest = RestTestUtils.createRequest(HttpMethod.POST, "/s3/", headers);
httpRequest.headers().set(RestUtils.Headers.SERVICE_ID, "rawBytesPostTest");
httpRequest.headers().set(RestUtils.Headers.AMBRY_CONTENT_TYPE, "application/octet-stream");
channel.writeInbound(httpRequest);
Expand Down Expand Up @@ -352,6 +358,12 @@ private EmbeddedChannel createChannel() {
return new EmbeddedChannel(new ChunkedWriteHandler(), processor);
}

private EmbeddedChannel createChannel(NettyConfig nettyConfig) {
NettyMessageProcessor processor =
new NettyMessageProcessor(NETTY_METRICS, nettyConfig, PERFORMANCE_CONFIG, requestHandler);
return new EmbeddedChannel(new ChunkedWriteHandler(), processor);
}

/**
* Sends the provided {@code httpRequest} and verifies that the response is an echo of the {@code restMethod}.
* @param channel the {@link EmbeddedChannel} to send the request over.
Expand Down

0 comments on commit e2b525a

Please sign in to comment.