Skip to content

Commit

Permalink
Move id converter inside of router phase4 (linkedin#2899)
Browse files Browse the repository at this point in the history
* Move id converter inside of router phase4

* Move ttl update inside of router for named blob put request

* Address comments

---------

Co-authored-by: Sophie Guo <[email protected]>
  • Loading branch information
SophieGuo410 and Sophie Guo authored Sep 26, 2024
1 parent 4b88e9d commit 5399282
Show file tree
Hide file tree
Showing 23 changed files with 267 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Call
}

@Override
public Future<String> putBlob(BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback, RestRequest restRequest) {
public Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback) {
lock.lock();
try {
FutureResult<String> future = new FutureResult<>();
Expand Down Expand Up @@ -166,7 +166,7 @@ public Future<Void> deleteBlob(String blobId, String serviceId, Callback<Void> c
}

@Override
public Future<Void> updateBlobTtl(String blobId, String serviceId, long expiresAtMs, Callback<Void> callback,
public Future<Void> updateBlobTtl(RestRequest restRequest, String blobId, String serviceId, long expiresAtMs, Callback<Void> callback,
QuotaChargeCallback quotaChargeCallback) {
throw new UnsupportedOperationException("updateBlobTtl is not supported by this mock");
}
Expand Down
24 changes: 13 additions & 11 deletions ambry-api/src/main/java/com/github/ambry/router/Router.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Callback<Ge
/**
* Requests for a new blob to be put asynchronously and invokes the {@link Callback} when the request completes.
*
* @param restRequest The {@link RestRequest} to put the blob.
* @param blobProperties The properties of the blob. Note that the size specified in the properties is ignored.
* The channel is consumed fully, and the size of the blob is the number of bytes read from
* it.
Expand All @@ -58,11 +59,10 @@ Future<GetBlobResult> getBlob(String blobId, GetBlobOptions options, Callback<Ge
* @param options The {@link PutBlobOptions} associated with the request. This cannot be null.
* @param callback The {@link Callback} which will be invoked on the completion of the request .
* @param quotaChargeCallback Listener interface to charge quota cost for the operation.
* @param restRequest The {@link RestRequest} to put the blob.
* @return A future that would contain the BlobId eventually.
*/
Future<String> putBlob(BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback, RestRequest restRequest);
Future<String> putBlob(RestRequest restRequest, BlobProperties blobProperties, byte[] userMetadata, ReadableStreamChannel channel,
PutBlobOptions options, Callback<String> callback, QuotaChargeCallback quotaChargeCallback);

/**
* Requests for a new metadata blob to be put asynchronously and invokes the {@link Callback} when the request
Expand Down Expand Up @@ -96,15 +96,17 @@ Future<Void> deleteBlob(String blobId, String serviceId, Callback<Void> callback
/**
* Requests that a blob's TTL be updated asynchronously and returns a future that will eventually contain information
* about whether the request succeeded or not.
* @param blobId The ID of the blob that needs its TTL updated.
* @param serviceId The service ID of the service updating the blob. This can be null if unknown.
* @param expiresAtMs The new expiry time (in ms) of the blob. Using {@link Utils#Infinite_Time} makes the blob
* permanent
* @param callback The {@link Callback} which will be invoked on the completion of a request.
*
* @param restRequest The {@link RestRequest} to update blob ttl.
* @param blobId The ID of the blob that needs its TTL updated.
* @param serviceId The service ID of the service updating the blob. This can be null if unknown.
* @param expiresAtMs The new expiry time (in ms) of the blob. Using {@link Utils#Infinite_Time} makes the
* blob permanent
* @param callback The {@link Callback} which will be invoked on the completion of a request.
* @param quotaChargeCallback Listener interface to charge quota cost for the operation.
* @return A future that would contain information about whether the update succeeded or not, eventually.
*/
Future<Void> updateBlobTtl(String blobId, String serviceId, long expiresAtMs, Callback<Void> callback,
Future<Void> updateBlobTtl(RestRequest restRequest, String blobId, String serviceId, long expiresAtMs, Callback<Void> callback,
QuotaChargeCallback quotaChargeCallback);

/**
Expand Down Expand Up @@ -197,7 +199,7 @@ default CompletableFuture<String> stitchBlob(BlobProperties blobProperties, byte
default CompletableFuture<String> putBlob(BlobProperties blobProperties, byte[] userMetadata,
ReadableStreamChannel channel, PutBlobOptions options) {
CompletableFuture<String> future = new CompletableFuture<>();
putBlob(blobProperties, userMetadata, channel, options, CallbackUtils.fromCompletableFuture(future), null, null);
putBlob(null, blobProperties, userMetadata, channel, options, CallbackUtils.fromCompletableFuture(future), null);
return future;
}

Expand Down Expand Up @@ -225,7 +227,7 @@ default CompletableFuture<Void> deleteBlob(String blobId, String serviceId) {
*/
default CompletableFuture<Void> updateBlobTtl(String blobId, String serviceId, long expiresAtMs) {
CompletableFuture<Void> future = new CompletableFuture<>();
updateBlobTtl(blobId, serviceId, expiresAtMs, CallbackUtils.fromCompletableFuture(future), null);
updateBlobTtl(null, blobId, serviceId, expiresAtMs, CallbackUtils.fromCompletableFuture(future), null);
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ public void handlePost(RestRequest restRequest, RestResponseChannel restResponse
restRequest.setArg(RestUtils.InternalKeys.TARGET_CONTAINER_KEY, Container.UNKNOWN_CONTAINER);
BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs());
byte[] usermetadata = RestUtils.buildUserMetadata(restRequest.getArgs());
router.putBlob(blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(),
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null, null);
router.putBlob(null, blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(),
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null);
} catch (RestServiceException e) {
handleResponse(restRequest, restResponseChannel, null, e);
}
Expand Down Expand Up @@ -158,8 +158,8 @@ public void handlePut(RestRequest restRequest, RestResponseChannel restResponseC
restRequest.setArg(RestUtils.InternalKeys.TARGET_CONTAINER_KEY, Container.UNKNOWN_CONTAINER);
BlobProperties blobProperties = RestUtils.buildBlobProperties(restRequest.getArgs());
byte[] usermetadata = RestUtils.buildUserMetadata(restRequest.getArgs());
router.putBlob(blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(),
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null, null);
router.putBlob(null, blobProperties, usermetadata, restRequest, new PutBlobOptionsBuilder().build(),
new MockPostCallback(this, restRequest, restResponseChannel, blobProperties), null);
} catch (RestServiceException e) {
handleResponse(restRequest, restResponseChannel, null, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.github.ambry.rest.RestUtils.InternalKeys.*;


/**
* Factory that instantiates an {@link IdConverter} implementation for the frontend.
Expand Down Expand Up @@ -176,32 +178,44 @@ private CompletionStage<String> convertId(String input, RestRequest restRequest,
//and do update ttl in routerCallBack.
conversionFuture = getNamedBlobDb().get(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), getOption).thenApply(result -> {
restRequest.setArg(RestUtils.InternalKeys.NAMED_BLOB_VERSION, result.getVersion());
restRequest.setArg(NAMED_BLOB_VERSION, result.getVersion());
return result.getBlobId();
});
} else {
conversionFuture = getNamedBlobDb().get(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), getOption).thenApply(NamedBlobRecord::getBlobId);
}
} else if (isNamedBlobPutRequest(restRequest) || isS3MultipartUploadCompleteRequest(restRequest)) {
Objects.requireNonNull(blobProperties, "blobProperties cannot be null.");
NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());
String blobId = RestUtils.stripSlashAndExtensionFromId(input);
long expirationTimeMs =
Utils.addSecondsToEpochTime(blobProperties.getCreationTimeInMs(), blobProperties.getTimeToLiveInSeconds());
// Please note that the modified_ts column in DB will be auto-populated by the DB server.
NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), blobId, expirationTimeMs, 0, blobProperties.getBlobSize());
NamedBlobState state = NamedBlobState.READY;
if (blobProperties.getTimeToLiveInSeconds() == Utils.Infinite_Time) {
// Set named blob state as 'IN_PROGRESS', will set the state to be 'READY' in the ttlUpdate success callback: routerTtlUpdateCallback
state = NamedBlobState.IN_PROGRESS;
if (restRequest.getArgs().containsKey(NAMED_BLOB_VERSION)) {
long namedBlobVersion = (long) restRequest.getArgs().get(NAMED_BLOB_VERSION);
String blobIdClean = RestUtils.stripSlashAndExtensionFromId(input);
NamedBlobPath namedBlobPath =
NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());
NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), blobIdClean, Utils.Infinite_Time, namedBlobVersion);
conversionFuture = namedBlobDb.updateBlobTtlAndStateToReady(record).thenApply(result -> {
return result.getInsertedRecord().getBlobId();
});
} else {
Objects.requireNonNull(blobProperties, "blobProperties cannot be null.");
NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());
String blobId = RestUtils.stripSlashAndExtensionFromId(input);
long expirationTimeMs =
Utils.addSecondsToEpochTime(blobProperties.getCreationTimeInMs(), blobProperties.getTimeToLiveInSeconds());
// Please note that the modified_ts column in DB will be auto-populated by the DB server.
NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), blobId, expirationTimeMs, 0, blobProperties.getBlobSize());
NamedBlobState state = NamedBlobState.READY;
if (blobProperties.getTimeToLiveInSeconds() == Utils.Infinite_Time) {
// Set named blob state as 'IN_PROGRESS', will set the state to be 'READY' in the ttlUpdate success callback: routerTtlUpdateCallback
state = NamedBlobState.IN_PROGRESS;
}
conversionFuture = getNamedBlobDb().put(record, state, RestUtils.isUpsertForNamedBlob(restRequest.getArgs())).thenApply(
result -> {
restRequest.setArg(NAMED_BLOB_VERSION, result.getInsertedRecord().getVersion());
return result.getInsertedRecord().getBlobId();
});
}
conversionFuture = getNamedBlobDb().put(record, state, RestUtils.isUpsertForNamedBlob(restRequest.getArgs())).thenApply(
result -> {
restRequest.setArg(RestUtils.InternalKeys.NAMED_BLOB_VERSION, result.getInsertedRecord().getVersion());
return result.getInsertedRecord().getBlobId();
});
} else {
String decryptedInput =
parseSignedIdIfRequired(restRequest, input.startsWith("/") ? input.substring(1) : input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.github.ambry.messageformat.BlobInfo;
import com.github.ambry.messageformat.BlobProperties;
import com.github.ambry.named.NamedBlobDb;
import com.github.ambry.named.NamedBlobRecord;
import com.github.ambry.protocol.DatasetVersionState;
import com.github.ambry.quota.QuotaManager;
import com.github.ambry.quota.QuotaUtils;
Expand Down Expand Up @@ -234,9 +233,8 @@ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
addDatasetVersion(blobInfo.getBlobProperties(), restRequest);
}
PutBlobOptions options = getPutBlobOptionsFromRequest();
router.putBlob(getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true),
restRequest);
router.putBlob(restRequest, getPropertiesForRouterUpload(blobInfo), blobInfo.getUserMetadata(), restRequest, options,
routerPutBlobCallback(blobInfo), QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, true));
}
}, uri, LOGGER, deleteDatasetCallback);
}
Expand All @@ -250,8 +248,26 @@ private Callback<Void> securityPostProcessRequestCallback(BlobInfo blobInfo) {
private Callback<String> routerPutBlobCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.putRouterPutBlobMetrics, blobId -> {
restResponseChannel.setHeader(RestUtils.Headers.BLOB_SIZE, restRequest.getBlobBytesReceived());
restResponseChannel.setHeader(RestUtils.Headers.LOCATION, blobId);
blobInfo.getBlobProperties().setBlobSize(restRequest.getBlobBytesReceived());
idConverter.convert(restRequest, blobId, blobInfo.getBlobProperties(), idConverterCallback(blobInfo, blobId));
if (blobInfo.getBlobProperties().getTimeToLiveInSeconds() == Utils.Infinite_Time) {
// Do ttl update with retryExecutor. Use the blob ID returned from the router instead of the converted ID
// since the converted ID may be changed by the ID converter.
String serviceId = blobInfo.getBlobProperties().getServiceId();
retryExecutor.runWithRetries(retryPolicy,
callback -> router.updateBlobTtl(restRequest, blobId, serviceId, Utils.Infinite_Time, callback,
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false)),
this::isRetriable, routerTtlUpdateCallback(blobInfo));
} else {
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
//Make sure to process response after delete finished
updateVersionStateAndDeleteDatasetVersionOutOfRetentionCount(
deleteDatasetVersionOutOfRetentionCallback(blobInfo));
} else {
securityService.processResponse(restRequest, restResponseChannel, blobInfo,
securityProcessResponseCallback());
}
}
}, uri, LOGGER, deleteDatasetCallback);
}

Expand Down Expand Up @@ -293,20 +309,20 @@ private Callback<String> routerStitchBlobCallback(BlobInfo blobInfo,
* After {@link IdConverter#convert} finishes, call {@link SecurityService#postProcessRequest} to perform
* request time security checks that rely on the request being fully parsed and any additional arguments set.
* @param blobInfo the {@link BlobInfo} to use for security checks.
* @param blobId the blob ID returned by the router (without decoration or obfuscation by id converter).
* @return a {@link Callback} to be used with {@link IdConverter#convert}.
*/
private Callback<String> idConverterCallback(BlobInfo blobInfo, String blobId) {
return buildCallback(frontendMetrics.putIdConversionMetrics, convertedBlobId -> {
restResponseChannel.setHeader(RestUtils.Headers.LOCATION, convertedBlobId);
blobInfo.getBlobProperties().setBlobSize(restRequest.getBlobBytesReceived());
if (blobInfo.getBlobProperties().getTimeToLiveInSeconds() == Utils.Infinite_Time) {
// Do ttl update with retryExecutor. Use the blob ID returned from the router instead of the converted ID
// since the converted ID may be changed by the ID converter.
String serviceId = blobInfo.getBlobProperties().getServiceId();
retryExecutor.runWithRetries(retryPolicy,
callback -> router.updateBlobTtl(blobId, serviceId, Utils.Infinite_Time, callback,
callback -> router.updateBlobTtl(null, blobId, serviceId, Utils.Infinite_Time, callback,
QuotaUtils.buildQuotaChargeCallback(restRequest, quotaManager, false)), this::isRetriable,
routerTtlUpdateCallback(blobInfo, blobId));
routerTtlUpdateCallback(blobInfo));
} else {
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
//Make sure to process response after delete finished
Expand All @@ -330,25 +346,14 @@ private boolean isRetriable(Throwable throwable) {
}

/**
* After TTL update finishes, call {@link SecurityService#postProcessRequest} to perform
* request time security checks that rely on the request being fully parsed and any additional arguments set.
* After TTL update finishes, call {@link SecurityService#postProcessRequest} to perform request time security
* checks that rely on the request being fully parsed and any additional arguments set.
*
* @param blobInfo the {@link BlobInfo} to use for security checks.
* @param blobId the {@link String} to use for blob id.
* @return a {@link Callback} to be used with {@link Router#updateBlobTtl(String, String, long)}.
*/
private Callback<Void> routerTtlUpdateCallback(BlobInfo blobInfo, String blobId) {
private Callback<Void> routerTtlUpdateCallback(BlobInfo blobInfo) {
return buildCallback(frontendMetrics.updateBlobTtlRouterMetrics, convertedBlobId -> {
// Set the named blob state to be 'READY' after the Ttl update succeed
if (!restRequest.getArgs().containsKey(RestUtils.InternalKeys.NAMED_BLOB_VERSION)) {
throw new RestServiceException("Internal key " + RestUtils.InternalKeys.NAMED_BLOB_VERSION
+ " is required in Named Blob TTL update callback!", RestServiceErrorCode.InternalServerError);
}
long namedBlobVersion = (long) restRequest.getArgs().get(NAMED_BLOB_VERSION);
String blobIdClean = RestUtils.stripSlashAndExtensionFromId(blobId);
NamedBlobPath namedBlobPath = NamedBlobPath.parse(RestUtils.getRequestPath(restRequest), restRequest.getArgs());
NamedBlobRecord record = new NamedBlobRecord(namedBlobPath.getAccountName(), namedBlobPath.getContainerName(),
namedBlobPath.getBlobName(), blobIdClean, Utils.Infinite_Time, namedBlobVersion);
namedBlobDb.updateBlobTtlAndStateToReady(record).get();
if (RestUtils.isDatasetVersionQueryEnabled(restRequest.getArgs())) {
//Make sure to process response after delete finished
updateVersionStateAndDeleteDatasetVersionOutOfRetentionCount(
Expand Down
Loading

0 comments on commit 5399282

Please sign in to comment.